Mqueue IPC example in shell script

Intro

The idea of message passing dates back to the beginning of computer history, and it is mainly used for inter-process communication. Curiously, I haven't seen anyone using message queues on Linux, even though mqueue is probably the easiest IPC interface there is. A nice feature is that every queue has a path through which you can access it. And because it acts as a queue, you can open it and read a message with a minimal amount of code. The disadvantage is that you don't know who sent you a message.

Advantages

* simple API
* no need to verify message size; each message arrives as a single packet
* persistent: if one program crashes, the message stays in the queue

Disadvantages:

* can't be sure who received the message
* bidirectional communication can be harder to implement

One of the bash hacks that can be achieved with it is inter-process communication between two bash scripts. An example is provided below!

Linux mqueue API

Most systems have message queues compiled into the kernel. You can check whether your system supports them:

$ cat /proc/filesystems | grep mq
nodev   mqueue

and whether the mqueue filesystem is mounted with

$ mount | grep mque
mqueue on /dev/mqueue type mqueue (rw,nosuid,nodev,noexec,relatime)

The API for using mqueue is similar to other UNIX interfaces. To check the status of an mqueue, use the cat command:

$ cat /dev/mqueue/stack-0 
QSIZE:0          NOTIFY:0     SIGNO:0     NOTIFY_PID:0

The API calls and an API usage template are described below.

Library interfaces and system calls

Library interface    System call
mq_close(3)          close(2)
mq_getattr(3)        mq_getsetattr(2)
mq_notify(3)         mq_notify(2)
mq_open(3)           mq_open(2)
mq_receive(3)        mq_timedreceive(2)
mq_send(3)           mq_timedsend(2)
mq_setattr(3)        mq_getsetattr(2)
mq_timedreceive(3)   mq_timedreceive(2)
mq_timedsend(3)      mq_timedsend(2)
mq_unlink(3)         mq_unlink(2)

Create mqueue

Mqueues are located in a mount point directory. Use the mount command to check where it is; on the majority of systems it is /dev/mqueue. Mqueue names passed to the API must start with "/", and the mount point path must not be put in front of them.

Create mqueue by opening it

mq_open("/name",O_RDWR | O_CREAT, 0666, NULL);

Send mqueue message

If the mqueue already exists it is opened; otherwise it is created.

mq_open("/name",O_RDWR | O_CREAT, 0666, NULL);
mq_send(fd, "packet", strlen("packet"), set_mesg_priority);

Receive mqueue message

If the mqueue already exists it is opened; otherwise it is created.

mq_open("/name",O_RDWR | O_CREAT, 0666, NULL);
mq_receive(mq_fd, ptr_to_str, ptr_size, &get_priority);

Remove mqueue

Unlink the queue. Its name is removed immediately, and the queue itself is destroyed once all processes that still have it open close their descriptors.

mq_open("/name",O_RDWR | O_CREAT, 0666, NULL);
mq_unlink("/name");

Example bash IPC

The sources of the utilities for working with mqueues are given further below. Let's create an example in which one script creates a message queue, loops checking the queue for received messages, and executes the commands it receives.

Server script that checks the mqueue for commands

The loop reads the queue with "./mqueue_pop -m $QUEUE_NAME". If the read succeeds it receives a non-empty string; otherwise the string is empty. The queue is created with "./mqueue_create -m $QUEUE_NAME" if it does not exist yet. After receiving the stop command, the script exits the loop and unlinks the queue.

Three commands can be sent: stop, ls and aloha.

daemon.sh

#!/bin/sh
# Poll a POSIX message queue once per second and execute the commands
# received on it: "stop" ends the loop, "ls" lists /dev, "aloha" prints a
# greeting. Any other message (including the empty string returned when the
# queue has no message) is ignored.
# The mqueue_create / mqueue_pop / mqueue_remove helpers are expected in the
# current directory.
QUEUE_NAME=/music-player-daemon

# Print the next message from the queue; prints nothing when it is empty.
POP_CMD()
{
    ./mqueue_pop -m "$QUEUE_NAME"
}

# Create the queue.
CREATE_QUEUE()
{
    ./mqueue_create -m "$QUEUE_NAME"
}

# Unlink the queue.
REMOVE_QUEUE()
{
    ./mqueue_remove -m "$QUEUE_NAME"
}

# Queues show up as regular files under the mqueue mount point.
if [ ! -f "/dev/mqueue$QUEUE_NAME" ]; then
    CREATE_QUEUE
fi

state="1"
while [ "$state" -ne 0 ]; do
    get_cmd=$(POP_CMD)
    case "$get_cmd" in
        stop)
            state="0"
            ;;
        ls)
            ls /dev
            ;;
        aloha)
            echo "aloha"
            ;;
    esac
    sleep 1
done

if [ -f "/dev/mqueue$QUEUE_NAME" ]; then
    REMOVE_QUEUE
fi

Send commands to script

Send commands to the running server script. The first one lists the /dev directory, the second prints aloha, and the third terminates the script.

./send_command.sh ls
./send_command.sh aloha
./send_command.sh stop

send_command.sh

#!/bin/sh
# Send one command (the first script argument) to the daemon's message queue.
# The mqueue_push helper is expected in the current directory.
QUEUE_NAME=/music-player-daemon

# Push the given string onto the queue as a single message.
SEND_CMD()
{
    ./mqueue_push -m "$QUEUE_NAME" -p "$1"
}

# Without an argument there is nothing meaningful to send.
if [ $# -lt 1 ]; then
    echo "Usage: $0 COMMAND" >&2
    exit 1
fi

echo "Send command to daemon"
if [ -f "/dev/mqueue$QUEUE_NAME" ]; then
    # Quoted so a command containing whitespace stays one message.
    SEND_CMD "$1"
else
    echo "No queue to send"
fi

Source of example utilities for using mqueue

All source located at http://git.main.lv/cgit.cgi/mqueue_examples.git/

The -lrt flag is required when compiling.

mqueue_create.c - Create mqueue

mqueue_create.c

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <getopt.h>
#include <unistd.h>
#include <mqueue.h>
#include <errno.h>

typedef struct {
    char *f_mqname;
    int   f_verbose;
    int   f_helper;
} mqueue_params;

mqueue_params g_params;

/* Print the usage summary of mqueue_create to stdout; progname is shown as the program name. */
void helper(char *progname)
{
    printf("Usage: %s [OPTS]\n\n", progname);
    fputs("Version: 0.0.1 \n"
          "-m mqueue name\n"
          "-v verbose output\n"
          "-h help options\n"
          "\n",
          stdout);
}

#define full_mqpath_len 1024

/*
 * Create the POSIX message queue named by -m (or open it if it already
 * exists) and print its name on stdout.
 *
 * Options:
 *   -m NAME  queue name; defaults to "/stack-0"
 *   -v       verbose output, including the errno text when mq_open() fails
 *   -h       print usage and exit with status 1
 *
 * Returns 0 on success and -1 when the queue cannot be opened.
 */
int main(int argc, char **argv)
{
    int c;
    int err;
    mqd_t mq_fd;
    char full_mqpath[full_mqpath_len];

    memset(&g_params, 0, sizeof(g_params));

    //process arguments
    while ((c = getopt(argc, argv, "m:vh")) != -1)
    {
        switch (c)
        {
            case 'm':
                g_params.f_mqname = optarg;
                break;
            case 'v':
                g_params.f_verbose = 1;
                break;
            case 'h':
            default:
                helper(argv[0]);
                exit(1);
        }
    }

    if (g_params.f_helper)
    {
        helper(argv[0]);
        return 0;
    }

    //set mqueue name
    if (g_params.f_mqname == NULL)
    {
        g_params.f_mqname = "/stack-0";
    }

    //compose string (only used for the verbose trace)
    memset(full_mqpath, 0, sizeof(full_mqpath));
    snprintf(full_mqpath, sizeof(full_mqpath), "%s", g_params.f_mqname);
    if (g_params.f_verbose)
    {
        //pass the array itself: "%s" expects char *, not a pointer to the array
        printf("Full path: %s\n", full_mqpath);
    }

    mq_fd = mq_open(g_params.f_mqname, O_RDWR | O_CREAT, 0666, NULL);
    if (mq_fd == (mqd_t)-1)
    {
        //save errno before any other library call can overwrite it
        err = errno;
        if (g_params.f_verbose)
        {
            printf("ERROR:%d,%s\n", err, strerror(err));
        }
        return -1;
    }
    printf("%s\n", g_params.f_mqname);

    //message queue descriptors are released with mq_close(), not close()
    mq_close(mq_fd);

    return 0;
}

mqueue_push.c - Push values to mqueue

mqueue_push.c

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <getopt.h>
#include <unistd.h>
#include <mqueue.h>
#include <errno.h>

typedef struct {
    char *f_mqname;
    char *f_push;
    int   f_verbose;
    int   f_helper;
} mqueue_params;

mqueue_params g_params;

/* Print the usage summary of mqueue_push to stdout; progname is shown as the program name. */
void helper(char *progname)
{
    printf("Usage: %s [OPTS] VALUE\n\n", progname);
    fputs("Version: 0.0.1 \n"
          "-m mqueue name\n"
          "-v verbose output\n"
          "-p push value\n"
          "-h help options\n"
          "\n",
          stdout);
}

#define full_mqpath_len 1024

/*
 * Send one message to the POSIX message queue named by -m, creating the
 * queue if it does not exist, and print the queue name on stdout.
 *
 * Options:
 *   -m NAME   queue name; defaults to "/stack-0"
 *   -p VALUE  message text; defaults to "NONE"
 *   -v        verbose output, including the errno text on failure
 *   -h        print usage and exit with status 1
 *
 * The message is sent with priority 10.
 * Returns 0 on success and -1 when opening the queue or sending fails.
 */
int main(int argc, char **argv)
{
    int c;
    int err;
    int ret = 0;
    mqd_t mq_fd;
    char full_mqpath[full_mqpath_len];
    unsigned int prio = 10;
    const char *push_val = "NONE";

    memset(&g_params, 0, sizeof(g_params));

    //process arguments
    while ((c = getopt(argc, argv, "m:vhp:")) != -1)
    {
        switch (c)
        {
            case 'm':
                g_params.f_mqname = optarg;
                break;
            case 'v':
                g_params.f_verbose = 1;
                break;
            case 'p':
                g_params.f_push = optarg;
                break;
            case 'h':
            default:
                helper(argv[0]);
                exit(1);
        }
    }

    if (g_params.f_helper)
    {
        helper(argv[0]);
        return 0;
    }

    //set mqueue name
    if (g_params.f_mqname == NULL)
    {
        g_params.f_mqname = "/stack-0";
    }

    //compose string (only used for the verbose trace)
    memset(full_mqpath, 0, sizeof(full_mqpath));
    snprintf(full_mqpath, sizeof(full_mqpath), "%s", g_params.f_mqname);
    if (g_params.f_verbose)
    {
        //pass the array itself: "%s" expects char *, not a pointer to the array
        printf("Full path: %s\n", full_mqpath);
    }

    mq_fd = mq_open(g_params.f_mqname, O_RDWR | O_CREAT, 0666, NULL);
    if (mq_fd == (mqd_t)-1)
    {
        //save errno before any other library call can overwrite it
        err = errno;
        if (g_params.f_verbose)
        {
            printf("ERROR:%d,%s\n", err, strerror(err));
        }
        return -1;
    }
    printf("%s\n", g_params.f_mqname);

    if (g_params.f_push != NULL)
    {
        push_val = g_params.f_push;
    }

    //report a failed send instead of silently pretending it worked
    if (mq_send(mq_fd, push_val, strlen(push_val), prio) == -1)
    {
        err = errno;
        if (g_params.f_verbose)
        {
            printf("ERROR:%d,%s\n", err, strerror(err));
        }
        ret = -1;
    }

    //message queue descriptors are released with mq_close(), not close()
    mq_close(mq_fd);

    return ret;
}

mqueue_pop.c - Get value from mqueue

mqueue_pop.c

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <getopt.h>
#include <unistd.h>
#include <mqueue.h>
#include <errno.h>

typedef struct {
    char *f_mqname;
    int   f_verbose;
    int   f_helper;
} mqueue_params;

mqueue_params g_params;

/*
 * Print the usage summary of mqueue_pop to stdout; progname is shown as the
 * program name. The program reads no positional operand, so none is
 * advertised.
 */
void helper(char *progname)
{
    printf("Usage: %s [OPTS]\n\n"
        "Version: 0.0.1 \n"
        "-m mqueue name\n"
        "-v verbose output\n"
        "-h help options\n"
        "\n"
        , progname);
}

#define full_mqpath_len 1024

/*
 * Take one message from the POSIX message queue named by -m and print it,
 * followed by a newline, on stdout. The queue is created if it does not
 * exist.
 *
 * Options:
 *   -m NAME  queue name; defaults to "/stack-0"
 *   -v       verbose output, including the errno text on failure
 *   -h       print usage and exit with status 1
 *
 * Returns 0 when a message was printed and -1 when the queue is empty or
 * any step fails; nothing is written to stdout for an empty queue.
 */
int main(int argc, char **argv)
{
    int ret = 0;
    int c;
    int err;
    mqd_t mq_fd;
    char full_mqpath[full_mqpath_len];
    unsigned int prio;
    struct mq_attr attr;
    char *pop_val = NULL;
    ssize_t pop_size;

    memset(&g_params, 0, sizeof(g_params));

    //process arguments
    while ((c = getopt(argc, argv, "m:vh")) != -1)
    {
        switch (c)
        {
            case 'm':
                g_params.f_mqname = optarg;
                break;
            case 'v':
                g_params.f_verbose = 1;
                break;
            case 'h':
            default:
                helper(argv[0]);
                exit(1);
        }
    }

    if (g_params.f_helper)
    {
        helper(argv[0]);
        return 0;
    }

    //set mqueue name
    if (g_params.f_mqname == NULL)
    {
        g_params.f_mqname = "/stack-0";
    }

    //compose string (only used for the verbose trace)
    memset(full_mqpath, 0, sizeof(full_mqpath));
    snprintf(full_mqpath, sizeof(full_mqpath), "%s", g_params.f_mqname);
    if (g_params.f_verbose)
    {
        //pass the array itself: "%s" expects char *, not a pointer to the array
        printf("Full path: %s\n", full_mqpath);
    }

    //O_NONBLOCK: an empty queue makes mq_receive() fail with EAGAIN instead
    //of blocking, so no separate "is there a message" check can race with
    //another reader
    mq_fd = mq_open(g_params.f_mqname, O_RDWR | O_CREAT | O_NONBLOCK, 0666, NULL);
    if (mq_fd == (mqd_t)-1)
    {
        //save errno before any other library call can overwrite it
        err = errno;
        if (g_params.f_verbose)
        {
            printf("ERROR:%d,%s\n", err, strerror(err));
        }
        return -1;
    }

    //the receive buffer must be at least mq_msgsize bytes long
    if (mq_getattr(mq_fd, &attr) == -1)
    {
        err = errno;
        if (g_params.f_verbose)
        {
            printf("ERROR:%d,%s\n", err, strerror(err));
        }
        mq_close(mq_fd);
        return -1;
    }

    //one extra zeroed byte keeps the message NUL-terminated for printf
    pop_val = calloc((size_t)attr.mq_msgsize + 1, 1);
    if (pop_val == NULL)
    {
        err = errno;
        if (g_params.f_verbose)
        {
            printf("ERROR:%d,%s\n", err, strerror(err));
        }
        mq_close(mq_fd);
        return -1;
    }

    pop_size = mq_receive(mq_fd, pop_val, (size_t)attr.mq_msgsize, &prio);
    if (pop_size >= 0)
    {
        printf("%s\n", pop_val);
    }
    else
    {
        err = errno;
        //EAGAIN only means "queue empty", which is not worth reporting
        if (g_params.f_verbose && err != EAGAIN)
        {
            printf("ERROR:%d,%s\n", err, strerror(err));
        }
        ret = -1;
    }

    //message queue descriptors are released with mq_close(), not close()
    mq_close(mq_fd);
    free(pop_val);

    return ret;
}

mqueue_remove.c - Remove mqueue

mqueue_remove.c

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <getopt.h>
#include <unistd.h>
#include <mqueue.h>
#include <errno.h>

typedef struct {
    char *f_mqname;
    int   f_verbose;
    int   f_helper;
} mqueue_params;

mqueue_params g_params;

/*
 * Print the usage summary of mqueue_remove to stdout; progname is shown as
 * the program name. The program reads no positional operand, so none is
 * advertised.
 */
void helper(char *progname)
{
    printf("Usage: %s [OPTS]\n\n"
        "Version: 0.0.1 \n"
        "-m mqueue name\n"
        "-v verbose output\n"
        "-h help options\n"
        "\n"
        , progname);
}

#define full_mqpath_len 1024

/*
 * Unlink the POSIX message queue named by -m.
 *
 * Options:
 *   -m NAME  queue name; defaults to "/stack-0"
 *   -v       verbose output, including the errno text when unlinking fails
 *   -h       print usage and exit with status 1
 *
 * Returns 0 on success and -1 when mq_unlink() fails.
 */
int main(int argc, char **argv)
{
    int ret = 0;
    int c;
    int err;
    char full_mqpath[full_mqpath_len];

    memset(&g_params, 0, sizeof(g_params));

    //process arguments
    while ((c = getopt(argc, argv, "m:vh")) != -1)
    {
        switch (c)
        {
            case 'm':
                g_params.f_mqname = optarg;
                break;
            case 'v':
                g_params.f_verbose = 1;
                break;
            case 'h':
            default:
                helper(argv[0]);
                exit(1);
        }
    }

    if (g_params.f_helper)
    {
        helper(argv[0]);
        return 0;
    }

    //set mqueue name
    if (g_params.f_mqname == NULL)
    {
        g_params.f_mqname = "/stack-0";
    }

    //compose string (only used for the verbose trace)
    memset(full_mqpath, 0, sizeof(full_mqpath));
    snprintf(full_mqpath, sizeof(full_mqpath), "%s", g_params.f_mqname);
    if (g_params.f_verbose)
    {
        //pass the array itself: "%s" expects char *, not a pointer to the array
        printf("Full path: %s\n", full_mqpath);
    }

    //no queue is ever opened here, so there is no descriptor to close
    if (mq_unlink(g_params.f_mqname) == -1)
    {
        //save errno before any other library call can overwrite it
        err = errno;
        if (g_params.f_verbose)
        {
            printf("ERROR:%d,%s\n", err, strerror(err));
        }
        ret = -1;
    }

    return ret;
}

mqueue_info.c - Get stats from mqueue

mqueue_info.c

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <getopt.h>
#include <unistd.h>
#include <mqueue.h>
#include <errno.h>

typedef struct {
    char *f_mqname;
    int   f_verbose;
    int   f_helper;
} mqueue_params;

mqueue_params g_params;

/*
 * Print the usage summary of mqueue_info to stdout; progname is shown as
 * the program name. The program reads no positional operand, so none is
 * advertised.
 */
void helper(char *progname)
{
    printf("Usage: %s [OPTS]\n\n"
        "Version: 0.0.1 \n"
        "-m mqueue name\n"
        "-v verbose output\n"
        "-h help options\n"
        "\n"
        , progname);
}

#define full_mqpath_len 1024

/*
 * Print the number of messages currently held in the POSIX message queue
 * named by -m, followed by a newline, on stdout.
 *
 * Options:
 *   -m NAME  queue name; defaults to "/stack-0"
 *   -v       verbose output, including the errno text on failure
 *   -h       print usage and exit with status 1
 *
 * Returns 0 on success and -1 when the queue cannot be opened or queried.
 */
int main(int argc, char **argv)
{
    int ret = 0;
    int c;
    int err;
    mqd_t mq_fd;
    char full_mqpath[full_mqpath_len];
    struct mq_attr attr;

    memset(&g_params, 0, sizeof(g_params));

    //process arguments
    while ((c = getopt(argc, argv, "m:vh")) != -1)
    {
        switch (c)
        {
            case 'm':
                g_params.f_mqname = optarg;
                break;
            case 'v':
                g_params.f_verbose = 1;
                break;
            case 'h':
            default:
                helper(argv[0]);
                exit(1);
        }
    }

    if (g_params.f_helper)
    {
        helper(argv[0]);
        return 0;
    }

    //set mqueue name
    if (g_params.f_mqname == NULL)
    {
        g_params.f_mqname = "/stack-0";
    }

    //compose string (only used for the verbose trace)
    memset(full_mqpath, 0, sizeof(full_mqpath));
    snprintf(full_mqpath, sizeof(full_mqpath), "%s", g_params.f_mqname);
    if (g_params.f_verbose)
    {
        //pass the array itself: "%s" expects char *, not a pointer to the array
        printf("Full path: %s\n", full_mqpath);
    }

    //NOTE(review): O_CREAT means that querying a missing queue creates it;
    //kept as is so existing callers see the same behavior
    mq_fd = mq_open(g_params.f_mqname, O_RDWR | O_CREAT, 0666, NULL);
    if (mq_fd == (mqd_t)-1)
    {
        //save errno before any other library call can overwrite it
        err = errno;
        if (g_params.f_verbose)
        {
            printf("ERROR:%d,%s\n", err, strerror(err));
        }
        return -1;
    }

    if (mq_getattr(mq_fd, &attr) == -1)
    {
        err = errno;
        if (g_params.f_verbose)
        {
            printf("ERROR:%d,%s\n", err, strerror(err));
        }
        ret = -1;
    }
    else
    {
        //mq_curmsgs is a long, so it needs %ld rather than %d
        printf("%ld\n", attr.mq_curmsgs);
    }

    //message queue descriptors are released with mq_close(), not close()
    mq_close(mq_fd);

    return ret;
}

Links

[1] https://www.man7.org/linux/man-pages/man7/mq_overview.7.html
[2] https://en.wikipedia.org/wiki/Message_queue