Message Queue

Message queue definitions for the QuantumRT Real-Time Kernel.

Part of QuantumRT Real-Time Kernel.

Typedefs

typedef void *mqd_t

Message queue descriptor.

Functions

mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr)

Open a connection between the calling thread and a message queue, and return a message queue descriptor that refers to it.

Error numbers:

  • EINVAL if the name is NULL.

  • EINVAL if the oflag is invalid.

  • EINVAL if attr is not NULL and mq_maxmsg or mq_msgsize is <= 0 when creating a new queue.

  • ENAMETOOLONG if the name length exceeds QRT_MQ_NAME_LIMIT.

  • EEXIST if the queue exists and both O_CREAT and O_EXCL are specified.

  • ENOMEM if there is insufficient memory for the descriptor, message queue, or name.

  • ENOSPC if the system limit on the number of message queues is reached.

  • ENOENT if the queue does not exist and O_CREAT is not specified.

Parameters:
  • name – Name of the message queue.

  • oflag – One of the access mode flags:

    • O_RDONLY : Open the queue to receive messages only.

    • O_WRONLY : Open the queue to send messages only.

    • O_RDWR : Open the queue to both send and receive messages.

    Optionally OR'ed with any of:

    • O_CREAT : Create the queue if it does not exist.

    • O_EXCL : If O_CREAT is also specified, and a queue with the given name already exists, then mq_open() will fail.

    • O_NONBLOCK : Open the queue in non-blocking mode.

  • mode – Not supported.

  • attr – Message queue attributes. NULL for default attributes.

Returns:

Message queue descriptor (mqd_t) on success, (mqd_t)-1 on error and errno set.

// Example: MQTT client thread communication

#include "mqueue.h"
#include "pthread.h"
#include "unistd.h"
#include <string.h>

struct mqtt_msg
{
    char topic[32u];
    char payload[64u];
};

// Application-provided MQTT transport hooks (not part of QuantumRT).
// The prototypes are assumed from how the threads below call them.
int read_mqtt_packet(struct mqtt_msg *msg);
int send_mqtt_packet(const struct mqtt_msg *msg);

void *mqtt_subscriber_thread(void *arg)
{
    struct mqtt_msg msg;
    mqd_t mq_mqtt = mq_open("/mqtt_subscriber", O_WRONLY, 0, NULL);

    for (;;)
    {
        if (read_mqtt_packet(&msg) == 0)
        {
            mq_send(mq_mqtt, (const char *)&msg, sizeof(msg), 0);
        }
    }
    return NULL;
}

void *mqtt_publisher_thread(void *arg)
{
    struct mqtt_msg msg;
    mqd_t mq_mqtt = mq_open("/mqtt_publisher", O_RDONLY, 0, NULL);

    for (;;)
    {
        if (mq_receive(mq_mqtt, (char *)&msg, sizeof(msg), NULL) != -1)
        {
            send_mqtt_packet(&msg);
        }
    }
    return NULL;
}

void *mqtt_app_thread(void *arg)
{
    struct mqtt_msg tx;
    struct mqtt_msg rx;
    struct mq_attr attr =
    {
        .mq_maxmsg  = 8,
        .mq_msgsize = sizeof(struct mqtt_msg)
    };

    mqd_t publisher  = mq_open("/mqtt_publisher",  O_CREAT | O_EXCL | O_WRONLY, 0, &attr);
    mqd_t subscriber = mq_open("/mqtt_subscriber", O_CREAT | O_EXCL | O_RDONLY, 0, &attr);

    for (;;)
    {
        // Process incoming messages
        if (mq_receive(subscriber, (char *)&rx, sizeof(rx), NULL) != -1)
        {
            // Handle received message
        }

        // Send command to MQTT broker
        strcpy(tx.topic, "command");
        strcpy(tx.payload, "toggle_led");
        mq_send(publisher, (const char *)&tx, sizeof(tx), 0);
    }
    return NULL;
}
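
The example above ignores the return value of mq_open(). A minimal sketch of how the documented (mqd_t)-1 return value and errno could be checked is shown below. The helper name open_queue and its return convention are hypothetical, and the angle-bracket headers assume a POSIX-style host toolchain; a QuantumRT project would include "mqueue.h" as in the example above.

```c
#include <errno.h>
#include <fcntl.h>
#include <mqueue.h>
#include <stdio.h>
#include <string.h>

// Hypothetical helper (not part of the API): open or create a queue.
// Returns 0 on success, or the errno value reported by mq_open().
static int open_queue(const char *name, int oflag,
                      long maxmsg, long msgsize, mqd_t *out)
{
    struct mq_attr attr;

    memset(&attr, 0, sizeof(attr));
    attr.mq_maxmsg  = maxmsg;
    attr.mq_msgsize = msgsize;

    // mode is documented as not supported, so 0 is passed.
    mqd_t mq = mq_open(name, oflag, 0, &attr);
    if (mq == (mqd_t)-1)
    {
        int err = errno; // save before another call can overwrite it
        fprintf(stderr, "mq_open(\"%s\") failed: %s\n", name, strerror(err));
        return err;
    }

    *out = mq;
    return 0;
}
```

With such a helper, mqtt_app_thread could stop early when open_queue("/mqtt_publisher", O_CREAT | O_EXCL | O_WRONLY, 8, sizeof(struct mqtt_msg), &publisher) returns a nonzero value, for example EEXIST when the queue already exists.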

int mq_notify(mqd_t mqdes, const struct sigevent *notification)

Not supported.

Returns:

-1 and sets errno to ENOSYS.

int mq_setattr(mqd_t mqdes, const struct mq_attr *mqstat, struct mq_attr *omqstat)

Set message queue attributes.

Error numbers:

  • EBADF if the descriptor is invalid.

Parameters:
  • mqdes – Message queue descriptor.

  • mqstat – New message queue attributes.

  • omqstat – Old message queue attributes.

Returns:

0 on success, -1 on error and errno set.

int mq_getattr(mqd_t mqdes, struct mq_attr *mqstat)

Get message queue attributes.

Error numbers:

  • EBADF if the descriptor is invalid.

Parameters:
  • mqdes – Message queue descriptor.

  • mqstat – Message queue attributes.

Returns:

0 on success, -1 on error and errno set.
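
As an illustration of how mq_getattr() and mq_setattr() can work together, the sketch below reads the current attributes, changes only mq_flags, and writes them back. It assumes that mq_setattr() applies O_NONBLOCK from mq_flags, as POSIX specifies; the reference above does not state which fields QuantumRT honors. The helper name set_nonblocking is hypothetical.

```c
#include <fcntl.h>
#include <mqueue.h>

// Hypothetical helper: switch O_NONBLOCK on or off for an open descriptor.
// Returns 0 on success, -1 on error with errno set by the failing call.
static int set_nonblocking(mqd_t mq, int enable)
{
    struct mq_attr attr;
    struct mq_attr old; // receives the previous attributes, unused here

    if (mq_getattr(mq, &attr) == -1)
    {
        return -1; // for example EBADF for an invalid descriptor
    }

    if (enable)
    {
        attr.mq_flags |= O_NONBLOCK;
    }
    else
    {
        attr.mq_flags &= ~(long)O_NONBLOCK;
    }

    return mq_setattr(mq, &attr, &old);
}
```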

int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio)

Send a message to the message queue.

Error numbers:

  • EAGAIN if the queue is full and O_NONBLOCK is set.

  • EINVAL if the message length is invalid or msg_ptr is NULL.

  • EMSGSIZE if the message length exceeds the maximum message size.

  • EBADF if the descriptor is invalid or not opened for writing.

Parameters:
  • mqdes – Message queue descriptor.

  • msg_ptr – Pointer to the message to be sent.

  • msg_len – Length of the message.

  • msg_prio – Not supported.

Returns:

0 on success, -1 on error and errno set.
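
The error list above separates a full queue in non-blocking mode (EAGAIN) from real failures. A caller might distinguish the two roughly as follows; the enum and the helper name try_send are hypothetical and only serve the illustration.

```c
#include <errno.h>
#include <mqueue.h>
#include <stddef.h>

enum send_result
{
    SEND_OK         = 0,  // message was queued
    SEND_QUEUE_FULL = 1,  // queue full and O_NONBLOCK set, retry later
    SEND_ERROR      = -1  // any other error, errno holds the cause
};

// Hypothetical helper: send one message without treating a full queue
// as a hard error. msg_prio is documented as not supported, so 0 is passed.
static enum send_result try_send(mqd_t mq, const void *msg, size_t len)
{
    if (mq_send(mq, (const char *)msg, len, 0) == 0)
    {
        return SEND_OK;
    }

    if (errno == EAGAIN)
    {
        return SEND_QUEUE_FULL;
    }

    return SEND_ERROR;
}
```

A producer that must not block can drop or buffer the message on SEND_QUEUE_FULL and log errno on SEND_ERROR.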

int mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio, const struct timespec *abstime)

Send a message to the message queue, waiting at most until the absolute timeout abstime if the queue is full.

Error numbers:

  • EAGAIN if the queue is full and O_NONBLOCK is set.

  • EINVAL if the message length is invalid or msg_ptr is NULL.

  • EINVAL if abstime is invalid; tv_nsec must be >= 0 and less than 1,000,000,000.

  • EMSGSIZE if the message length exceeds the maximum message size.

  • EBADF if the descriptor is invalid or not opened for writing.

  • ETIMEDOUT if the wait timed out.

Parameters:
  • mqdes – Message queue descriptor.

  • msg_ptr – Pointer to the message to be sent.

  • msg_len – Length of the message.

  • msg_prio – Not supported.

  • abstime – Absolute time at which the wait for free space times out.

Returns:

0 on success, -1 on error and errno set.
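
Because abstime is an absolute time, a relative timeout has to be added to the current time and normalized so that tv_nsec stays below one second. The sketch below shows one way to do that. The helper name timespec_add_ms is hypothetical, and it assumes a valid base time and a non-negative millisecond count.

```c
#include <time.h>

#define NSEC_PER_SEC 1000000000L

// Hypothetical helper: return base + ms milliseconds with tv_nsec
// normalized into the range [0, 1,000,000,000).
// Assumes base is already normalized and ms >= 0.
static struct timespec timespec_add_ms(struct timespec base, long ms)
{
    base.tv_sec  += ms / 1000;
    base.tv_nsec += (ms % 1000) * 1000000L;

    // At most one carry is needed because both addends are below 1 s.
    if (base.tv_nsec >= NSEC_PER_SEC)
    {
        base.tv_sec  += 1;
        base.tv_nsec -= NSEC_PER_SEC;
    }

    return base;
}
```

A caller would typically read the current time, for example with clock_gettime(CLOCK_REALTIME, &now), pass timespec_add_ms(now, 250) as abstime, and treat ETIMEDOUT as "queue still full after 250 ms". Which clock QuantumRT compares abstime against is an assumption here; POSIX uses CLOCK_REALTIME.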

ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio)

Receive a message from the message queue.

Error numbers:

  • EAGAIN if the queue is empty and O_NONBLOCK is set.

  • EINVAL if the message length is invalid or msg_ptr is NULL.

  • EMSGSIZE if the message length exceeds the maximum message size.

  • EBADF if the descriptor is invalid or not opened for reading.

Parameters:
  • mqdes – Message queue descriptor.

  • msg_ptr – Pointer to the buffer that receives the message.

  • msg_len – Size of the buffer in bytes.

  • msg_prio – Not supported.

Returns:

Message length in bytes on success, -1 on error and errno set.

ssize_t mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio, const struct timespec *abstime)

Receive a message from the message queue, waiting at most until the absolute timeout abstime if the queue is empty.

Error numbers:

  • EAGAIN if the queue is empty and O_NONBLOCK is set.

  • EINVAL if the message length is invalid or msg_ptr is NULL.

  • EINVAL if abstime is invalid; tv_nsec must be >= 0 and less than 1,000,000,000.

  • EMSGSIZE if the message length exceeds the maximum message size.

  • EBADF if the descriptor is invalid or not opened for reading.

  • ETIMEDOUT if the wait timed out.

Parameters:
  • mqdes – Message queue descriptor.

  • msg_ptr – Pointer to the buffer that receives the message.

  • msg_len – Size of the buffer in bytes.

  • msg_prio – Not supported.

  • abstime – Absolute time at which the wait for a message times out.

Returns:

Message length in bytes on success, -1 on error and errno set.
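
The EINVAL rule for abstime listed above can be checked by the caller before a timed call is made. The following sketch mirrors the documented nanosecond range. The helper name abstime_is_valid is hypothetical, and rejecting a NULL pointer is an additional assumption that the reference does not state.

```c
#include <stdbool.h>
#include <stddef.h>
#include <time.h>

// Hypothetical helper: true if ts satisfies the documented range,
// that is 0 <= tv_nsec < 1,000,000,000.
// Treating NULL as invalid is an assumption of this sketch.
static bool abstime_is_valid(const struct timespec *ts)
{
    if (ts == NULL)
    {
        return false;
    }

    return ts->tv_nsec >= 0 && ts->tv_nsec < 1000000000L;
}
```

Checking the value up front makes it easier to tell a caller bug (EINVAL) apart from an expired wait (ETIMEDOUT).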

int mq_unlink(const char *name)

Unlink a message queue.

Error numbers:

  • ENOENT if the message queue does not exist.

Parameters:
  • name – Name of the message queue.

Returns:

0 on success, -1 on error and errno set.

int mq_close(mqd_t mqdes)

Close a message queue descriptor.

Error numbers:

  • EBADF if the descriptor is invalid.

Parameters:
  • mqdes – Message queue descriptor.

Returns:

0 on success, -1 on error and errno set.
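
Closing a descriptor and unlinking the queue name are separate steps. A minimal teardown sketch is shown below. The helper name destroy_queue is hypothetical. It closes the descriptor first and then removes the name, because the reference does not say how QuantumRT treats descriptors that are still open when a queue is unlinked.

```c
#include <mqueue.h>

// Hypothetical helper: release a descriptor and remove the queue name.
// Both steps are always attempted. Returns 0 if both succeed, otherwise
// -1 with errno set by the last failing call.
static int destroy_queue(mqd_t mq, const char *name)
{
    int rc = 0;

    if (mq_close(mq) == -1)
    {
        rc = -1; // for example EBADF for an invalid descriptor
    }

    if (mq_unlink(name) == -1)
    {
        rc = -1; // for example ENOENT if the queue does not exist
    }

    return rc;
}
```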

struct mq_attr

Public Members

long mq_flags

Message queue flags.

long mq_maxmsg

Maximum number of messages.

long mq_msgsize

Maximum message size.

long mq_curmsgs

Number of messages currently queued.
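
The mq_open() error list requires mq_maxmsg and mq_msgsize to be greater than zero when a queue is created with explicit attributes. A caller can apply the same check before opening the queue, as in the sketch below. The helper name attr_is_usable is hypothetical.

```c
#include <mqueue.h>
#include <stdbool.h>
#include <stddef.h>

// Hypothetical helper: mirror the documented EINVAL rule of mq_open().
// A NULL pointer is accepted because it selects the default attributes.
static bool attr_is_usable(const struct mq_attr *attr)
{
    if (attr == NULL)
    {
        return true;
    }

    return attr->mq_maxmsg > 0 && attr->mq_msgsize > 0;
}
```

For the MQTT example, an attribute set with mq_maxmsg = 8 and mq_msgsize = sizeof(struct mqtt_msg) passes this check, while a zero in either field would be rejected before mq_open() is called.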