• LOGIN
  • No products in the cart.

Basic Unix Queuing Techniques

It occasionally happens that our incoming or outgoing data cannot be processed as it is generated, or for some reason, we choose to process it at a later time.

By Mark Sitkowski C.Eng, M.I.E.E Design Simulation Systems Ltd

A typical example might be a client-server system, where it is necessary to queue the socket descriptors of incoming connections because of some limit on the number of active processes, or a message hub, which accepts data synchronously, but must rely on other processes to remove the data asynchronously.
Apart from the numerous commercially-available third party implementations of queuing systems, Unix has two highly efficient queuing mechanisms which can be used for extremely low overhead systems of queues.

Kernel Mode Queues

The kernel uses queues internally for the implementation of functions such as device drivers. The system call interface to this mechanism is available for the implementation of application programs.
The queues so produced are implemented in memory, so they are very fast. However, because there is no permanent storage of the data, these queues are also non-persistent. If the process or the machine crashes, all of the queued data will be lost and all incoming data will never be enqueued.
A queuing system, based on kernel queues, is the subject of ‘Advanced Queues’. Here, we will concentrate on disk-based user mode queues.

User Mode Queues

The kernel mode queuing system is a little Spartan, and it is sometimes more convenient to use the user mode queue library functions which offer a little more functionality, namely:

• Notification of message arrival by sending a signal to the monitoring process.
• Prioritization of messages.

There are only four fundamental commands to remember:

• mq_open() – opens an existing queue, or creates a new queue
• mq_send() – enqueues a message
• mq_receive() – dequeues a message
• mq_notify() – notifies a process of the arrival of a message

The remaining four commands perform housekeeping tasks:

mq_close() – closes a queue
mq_unlink() – deletes a queue from the disk
mq_getattr() – interrogates a queue’s characteristics
mq_setattr() – sets a queue’s characteristics

A single structure definition is used to set and get the queues’ attributes, and is defined as:

struct mq_attr {
 long mq_flags /* message queue flags */
 long mq_maxmsg /* maximum number of messages */
 long mq_msgsize /* maximum message size */
 long mq_curmsgs /* number of messages currently queued */
 };

The mq series of commands all relate to disk based queues. The queues themselves are created in the /tmp directory, and are always referred to in the commands as if they were situated below the root directory.
Thus, to create a queue called ‘zq’, we would call mq_open(), like this:

Int qd;
 struct mq_attr atr;

atr.mq_maxmsg = 100;
 atr.mq_msgsize = 255;

if((qd = mq_open(“/zq”, O_RDWR|O_CREAT, 0755, &atr)) == (mqd_t)-1){
 perror(“mq_open”);
 }

Notice the similarity between the above syntax and that of the open() command, for a file. The returned value is the queue descriptor while the flags are exactly the same, as defined in fcntl.h for those relating to a file. The pointer to the ‘atr’ structure permits the setting of the maximum number of messages and the maximum message size, prior to calling mq_open.
Enqueuing a message is analogous to a write() on a file:

char *msg = “xyz”;
 int priority = 5;

if(mq_send(qd, msg, strlen(msg), priority) == -1){
 perror(“mq_send”);
 }

The extra parameter, ‘priority’ determines the order in which the message will be removed from the queue, when it is dequeued, with ‘1’ being the highest priority.
The dequeuing is performed by mq_receive():

unsigned char data[8192];
 int priority;
 int n;

If((n = mq_receive(qd, (char *)data, sizeof(data), &priority)) > 0){
 Printf(“Received %d byte message >%s< with %d priority\n”, n, data, priority); }

Messages are taken off the queue, in order of their priority, which is returned by mq_receive(), into the variable passed to it. The return value of the function is the number of bytes in the message. In normal operation, this function would be called in a ‘while’ loop, and the queue length would be checked at each iteration of the loop. The checking is done by the mq_getattr() function, called with the queue descriptor and the atr structure defined above:

if(mq_getattr(qd, &atr) == 0){ if(atr.mq_curmsgs == 0){ printf(“No more messages\n”); mq_close(qd); } }

The following code extract puts this all together:

while((rval = mq_receive(qd, (char *)data, sizeof(data), &priority)) > 0){
 printf(“Client received: >%s< priority %d\n”, data, priority);
 memset(data, ‘\0’, sizeof(data));
 if(mq_getattr(qd, &atr) == 0){
 if(atr.mq_curmsgs == 0){
 printf(“No more messages\n”);
 mq_close(qd);
 break;
 }
 }
 }

We now have all the information we need to write a little test program which exercises all of these queuing functions. Instead of attempting to re-create MQ Series from scratch (which we will leave for the ‘Advanced Queues’ article), this program merely does the following:

• Creates a queue whose descriptor is ‘qd’.
• Launches a child process, child() which asks to be notified of the arrival of a message.
• Enqueues four messages in ascending order of priority.
• The child pulls the messages off the queue, in the order that they arrived, i.e, in order of priority. It then quits.
• Launches another child process, client(), which merely performs a blocking read of the queue.
• Enqueues four more messages in descending order of priority
• The child, again, pulls the messages off in order of priority, which means the reverse of the order of their arrival. It does not quit.

The notification mechanism is by means of a software interrupt, defined by means of the sigevent structure. We first create the variable:

struct sigevent ev;

The interesting parts of this structure (defined fully in siginfo.h) are:

struct sigevent {
 int sigev_notify;
 int sigev_signo;
 }

Where sigev_notify has the values:

SIGEV_NONE
 SIGEV_SIGNAL
 SIGEV_THREAD

We will choose SIGEV_SIGNAL since we want to catch an interrupt with the arrival of each message on our queue. Later, if we need to turn off notification, we can do it by passing in SIGEV_NONE.
Since sigev_signo lets us choose which signal can be sent to us, we’ll choose something safe that isn’t used by other processes. SIGURG is normally sent out when an urgent condition exists on a socket or other I/O device and, in that capacity, is of no interest to us. Therefore, we will use SIGURG and register it, together with our interrupt handler in main():

 signal(SIGURG, interrupt);

Then, in our child() function, when our child process is running, we define the kind of event we need and the signal number that we’re expecting as follows:

ev.sigev_notify = SIGEV_SIGNAL;
 ev.sigev_signo = SIGURG;

Immediately after these lines, we call pause() which puts the process into a catatonic state, waiting for the arrival of an interrupt.
In reality, the server and client code would probably be in separate files and run in unrelated processes. Since this is merely an exercise, all of the code is in one file, as follows.

September 14, 2017

Leave a Reply

avatar

This site uses Akismet to reduce spam. Learn how your comment data is processed.

  Subscribe  
Notify of
© HAKIN9 MEDIA SP. Z O.O. SP. K. 2013