Message Queues and Semaphores

Message queues give you structured, typed communication between processes without the byte-stream nature of pipes. Semaphores give you lightweight synchronization without the overhead of a full mutex. This chapter covers POSIX message queues and POSIX semaphores, then compares all IPC mechanisms.

POSIX Message Queues

A message queue is a kernel-managed list of messages. Each message has a body and a priority. Higher-priority messages are delivered first.

/* mq_sender.c */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <mqueue.h>

#define QUEUE_NAME "/my_queue"
#define MAX_MSG_SIZE 256
#define MAX_MSGS 10

/*
 * mq_sender: create (or open) QUEUE_NAME for writing and post three
 * messages with ascending priorities.
 *
 * Returns 0 on success, 1 if the queue cannot be opened, a send fails,
 * or the descriptor cannot be closed.
 */
int main(void) {
    /* mq_open() only consults mq_maxmsg and mq_msgsize, and only when it
     * actually creates the queue; an existing queue keeps its attributes. */
    struct mq_attr attr = {
        .mq_flags = 0,
        .mq_maxmsg = MAX_MSGS,
        .mq_msgsize = MAX_MSG_SIZE,
        .mq_curmsgs = 0
    };

    mqd_t mq = mq_open(QUEUE_NAME, O_CREAT | O_WRONLY, 0666, &attr);
    if (mq == (mqd_t)-1) {
        perror("mq_open");
        return 1;
    }

    const char *messages[] = {
        "Low priority message",
        "Medium priority message",
        "High priority message"
    };
    const unsigned int priorities[] = {1, 5, 10};
    /* Derive the count from the table so adding a message cannot
     * desynchronise the loop bound. */
    const size_t count = sizeof(messages) / sizeof(messages[0]);

    int status = 0;
    for (size_t i = 0; i < count; i++) {
        /* +1 sends the terminating NUL so the receiver gets a C string. */
        if (mq_send(mq, messages[i], strlen(messages[i]) + 1,
                    priorities[i]) == -1) {
            perror("mq_send");
            status = 1;
            break;  /* fall through to mq_close() instead of leaking mq */
        }
        printf("Sent (priority %u): %s\n", priorities[i], messages[i]);
    }

    if (mq_close(mq) == -1) {
        perror("mq_close");
        status = 1;
    }
    return status;
}
/* mq_receiver.c */
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <mqueue.h>

#define QUEUE_NAME "/my_queue"
#define MAX_MSG_SIZE 256

/*
 * mq_receiver: open the existing QUEUE_NAME read-only, drain three
 * messages (highest priority first), then remove the queue.
 *
 * Returns 0 on success, 1 if the queue cannot be opened or a receive fails.
 */
int main(void) {
    mqd_t mq = mq_open(QUEUE_NAME, O_RDONLY);
    if (mq == (mqd_t)-1) {
        perror("mq_open");
        return 1;
    }

    /* One spare byte beyond the largest message so the payload can always
     * be NUL-terminated below. */
    char buf[MAX_MSG_SIZE + 1];
    unsigned int priority;

    /* Receive 3 messages -- highest priority comes first */
    for (int i = 0; i < 3; i++) {
        /* Offer only sizeof(buf) - 1 bytes: n can then never exceed
         * MAX_MSG_SIZE, so buf[n] stays in bounds. If the queue was created
         * with a larger mq_msgsize this fails with EMSGSIZE instead of
         * overflowing. */
        ssize_t n = mq_receive(mq, buf, sizeof(buf) - 1, &priority);
        if (n == -1) {
            perror("mq_receive");
            mq_close(mq);  /* do not leak the descriptor on failure */
            return 1;
        }
        buf[n] = '\0';
        printf("Received (priority %u): %s\n", priority, buf);
    }

    mq_close(mq);
    /* The queue outlives the process unless it is explicitly unlinked. */
    if (mq_unlink(QUEUE_NAME) == -1)
        perror("mq_unlink");
    return 0;
}

Compile with:

gcc -o mq_sender mq_sender.c -lrt
gcc -o mq_receiver mq_receiver.c -lrt

Run the sender first, then the receiver. Notice the output order is by descending priority:

Received (priority 10): High priority message
Received (priority 5): Medium priority message
Received (priority 1): Low priority message

The message queue API:

mq_open(name, flags, mode, attr)  -- create or open
mq_send(mq, msg, len, priority)   -- send a message
mq_receive(mq, buf, len, &prio)   -- receive highest-priority message
mq_close(mq)                      -- close the descriptor
mq_unlink(name)                   -- remove the queue
mq_getattr(mq, &attr)             -- query attributes
mq_notify(mq, &sigevent)          -- register for async notification

Caution: The mq_receive buffer must be at least mq_msgsize bytes (as set in mq_attr). If it is smaller, mq_receive fails with EMSGSIZE. This is a common mistake.

Try It: Modify the sender to send 5 messages with the same priority. Verify that they arrive in FIFO order (first-in, first-out within the same priority level).

Message Queue Limits

Linux imposes system-wide limits on message queues:

/proc/sys/fs/mqueue/msg_max      -- max messages per queue (default 10)
/proc/sys/fs/mqueue/msgsize_max  -- max message size (default 8192)
/proc/sys/fs/mqueue/queues_max   -- max number of queues (default 256)

You can view these with cat, and modify them (as root) by writing a new value:

cat /proc/sys/fs/mqueue/msg_max
echo 32 | sudo tee /proc/sys/fs/mqueue/msg_max

Non-blocking and Timed Operations

/* mq_nonblock.c */
#include <stdio.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <mqueue.h>
#include <errno.h>
#include <time.h>
#include <string.h>

#define QUEUE_NAME "/nb_queue"
#define MAX_MSG_SIZE 256

/*
 * mq_nonblock: demonstrate a non-blocking receive on an empty queue,
 * then a send followed by a timed receive.
 *
 * Returns 0 on success, 1 if a queue operation fails unexpectedly.
 * The queue is closed and unlinked on every path after a successful open.
 */
int main(void) {
    struct mq_attr attr = {
        .mq_flags = 0,
        .mq_maxmsg = 10,
        .mq_msgsize = MAX_MSG_SIZE,
        .mq_curmsgs = 0
    };
    /* mq_setattr() only looks at mq_flags; 0 clears O_NONBLOCK. */
    struct mq_attr blocking = { .mq_flags = 0 };
    struct timespec deadline;
    /* One spare byte so a full-size payload can still be NUL-terminated. */
    char buf[MAX_MSG_SIZE + 1];
    unsigned int prio;
    ssize_t n;
    int status = 0;

    mqd_t mq = mq_open(QUEUE_NAME, O_CREAT | O_RDWR | O_NONBLOCK,
                        0666, &attr);
    if (mq == (mqd_t)-1) {
        perror("mq_open");
        return 1;
    }

    /* Non-blocking receive on empty queue: EAGAIN is the expected outcome,
     * anything else is a real error. */
    if (mq_receive(mq, buf, sizeof(buf) - 1, &prio) == -1) {
        if (errno == EAGAIN) {
            printf("No messages available (non-blocking)\n");
        } else {
            perror("mq_receive");
            status = 1;
            goto cleanup;
        }
    }

    /* Send a message */
    const char *msg = "test message";
    if (mq_send(mq, msg, strlen(msg) + 1, 0) == -1) {
        perror("mq_send");
        status = 1;
        goto cleanup;
    }

    /* A descriptor with O_NONBLOCK set never waits, so the timeout passed
     * to mq_timedreceive() would be ignored. Switch to blocking mode first
     * so the deadline below is actually honoured. */
    if (mq_setattr(mq, &blocking, NULL) == -1) {
        perror("mq_setattr");
        status = 1;
        goto cleanup;
    }

    /* Timed receive: wait up to 2 seconds (absolute CLOCK_REALTIME time) */
    if (clock_gettime(CLOCK_REALTIME, &deadline) == -1) {
        perror("clock_gettime");
        status = 1;
        goto cleanup;
    }
    deadline.tv_sec += 2;

    n = mq_timedreceive(mq, buf, sizeof(buf) - 1, &prio, &deadline);
    if (n == -1) {
        perror("mq_timedreceive");  /* ETIMEDOUT if nothing arrived */
        status = 1;
    } else {
        buf[n] = '\0';
        printf("Timed receive got: %s\n", buf);
    }

cleanup:
    mq_close(mq);
    mq_unlink(QUEUE_NAME);
    return status;
}

POSIX Semaphores

A semaphore is a counter that supports two atomic operations: wait (decrement) and post (increment). When the counter is zero, sem_wait blocks.

There are two kinds:

  • Named semaphores -- created with sem_open, accessible by unrelated processes via a filesystem name.
  • Unnamed semaphores -- created with sem_init, live in shared memory or within a single process.

Named Semaphore

/* sem_named.c */
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <semaphore.h>
#include <sys/wait.h>
#include <unistd.h>

#define SEM_NAME "/my_sem"

/*
 * sem_named: parent and child serialise a critical section through a
 * named semaphore used as a binary lock.
 *
 * Returns 0 on success, 1 if the semaphore cannot be opened, fork()
 * fails, or the parent cannot acquire the semaphore.
 */
int main(void) {
    /* Create semaphore with initial value 1 (binary semaphore = mutex).
     * Without O_EXCL, an already existing SEM_NAME is reused and the
     * mode/value arguments are ignored. */
    sem_t *sem = sem_open(SEM_NAME, O_CREAT, 0666, 1);
    if (sem == SEM_FAILED) {
        perror("sem_open");
        return 1;
    }

    pid_t pid = fork();
    if (pid == -1) {
        perror("fork");
        sem_close(sem);
        sem_unlink(SEM_NAME);
        return 1;
    }

    if (pid == 0) {
        /* Child */
        if (sem_wait(sem) == -1) {
            perror("sem_wait");
            _exit(1);
        }
        printf("Child: entered critical section\n");
        sleep(1);
        printf("Child: leaving critical section\n");
        sem_post(sem);
        sem_close(sem);
        /* _exit() bypasses stdio cleanup; flush explicitly or the child's
         * output can be lost when stdout is a pipe or a file. */
        fflush(stdout);
        _exit(0);
    }

    /* Parent. Which process enters first is up to the scheduler; the
     * semaphore only guarantees the two sections never overlap. */
    int status = 0;
    if (sem_wait(sem) == -1) {
        perror("sem_wait");
        status = 1;
    } else {
        printf("Parent: entered critical section\n");
        printf("Parent: leaving critical section\n");
        sem_post(sem);
    }

    wait(NULL);
    sem_close(sem);
    /* Remove the name so the semaphore does not persist after exit. */
    sem_unlink(SEM_NAME);
    return status;
}

Compile with:

gcc -o sem_named sem_named.c -pthread

Unnamed Semaphore in Shared Memory

/* sem_unnamed.c */
#include <stdio.h>
#include <stdlib.h>
#include <semaphore.h>
#include <sys/mman.h>
#include <sys/wait.h>
#include <unistd.h>

/*
 * sem_unnamed: parent and child serialise a critical section through an
 * unnamed, process-shared semaphore placed in anonymous shared memory.
 *
 * Returns 0 on success, 1 if the mapping, sem_init(), fork() or the
 * parent's sem_wait() fails.
 */
int main(void) {
    /* Allocate semaphore in shared memory: MAP_SHARED makes the same
     * sem_t visible to the child after fork(). */
    sem_t *sem = mmap(NULL, sizeof(sem_t), PROT_READ | PROT_WRITE,
                      MAP_SHARED | MAP_ANONYMOUS, -1, 0);
    if (sem == MAP_FAILED) {
        perror("mmap");
        return 1;
    }

    /* Initialize: pshared=1 (process-shared), value=1 */
    if (sem_init(sem, 1, 1) == -1) {
        perror("sem_init");
        munmap(sem, sizeof(sem_t));
        return 1;
    }

    pid_t pid = fork();
    if (pid == -1) {
        perror("fork");
        sem_destroy(sem);
        munmap(sem, sizeof(sem_t));
        return 1;
    }

    if (pid == 0) {
        if (sem_wait(sem) == -1) {
            perror("sem_wait");
            _exit(1);
        }
        printf("Child: in critical section\n");
        sleep(1);
        printf("Child: done\n");
        sem_post(sem);
        /* _exit() bypasses stdio cleanup; flush explicitly or the child's
         * output can be lost when stdout is a pipe or a file. */
        fflush(stdout);
        _exit(0);
    }

    int status = 0;
    if (sem_wait(sem) == -1) {
        perror("sem_wait");
        status = 1;
    } else {
        printf("Parent: in critical section\n");
        printf("Parent: done\n");
        sem_post(sem);
    }

    /* Reap the child before tearing down the semaphore it may still use. */
    wait(NULL);
    sem_destroy(sem);
    munmap(sem, sizeof(sem_t));
    return status;
}

Counting Semaphores: Resource Pools

A counting semaphore tracks the number of available resources.

/* sem_pool.c */
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h>

#define POOL_SIZE 3
#define NUM_WORKERS 8

static sem_t pool;

/*
 * Thread entry point. `arg` points to this worker's int id.
 * Takes one slot from the shared `pool` semaphore, holds it for a second
 * to simulate work, then gives it back. Always returns NULL.
 */
void *worker(void *arg) {
    const int *id_ptr = arg;
    const int worker_id = *id_ptr;

    /* Blocks here while every slot in the pool is taken. */
    sem_wait(&pool);
    printf("Worker %d: acquired resource (entering pool)\n", worker_id);

    sleep(1);  /* stand-in for real work done while holding the slot */

    printf("Worker %d: releasing resource\n", worker_id);
    sem_post(&pool);

    return NULL;
}

/*
 * sem_pool: start NUM_WORKERS threads that compete for POOL_SIZE slots
 * guarded by the counting semaphore `pool`.
 *
 * Returns 0 on success, 1 if the semaphore cannot be initialised or a
 * thread cannot be created (already started threads are still joined).
 */
int main(void) {
    /* pshared=0: the semaphore is shared between threads of this process. */
    if (sem_init(&pool, 0, POOL_SIZE) == -1) {
        perror("sem_init");
        return 1;
    }

    pthread_t threads[NUM_WORKERS];
    int ids[NUM_WORKERS];  /* one slot per thread so each id stays stable */
    int started = 0;
    int status = 0;

    for (int i = 0; i < NUM_WORKERS; i++) {
        ids[i] = i;
        /* pthread_create() returns the error number instead of setting errno. */
        int rc = pthread_create(&threads[i], NULL, worker, &ids[i]);
        if (rc != 0) {
            fprintf(stderr, "pthread_create: error %d\n", rc);
            status = 1;
            break;
        }
        started++;
    }

    /* Join only the threads that were actually created; joining an
     * uninitialised pthread_t is undefined behaviour. */
    for (int i = 0; i < started; i++)
        pthread_join(threads[i], NULL);

    sem_destroy(&pool);
    return status;
}

Output shows at most 3 workers in the pool at any time:

Worker 0: acquired resource (entering pool)
Worker 1: acquired resource (entering pool)
Worker 2: acquired resource (entering pool)
Worker 0: releasing resource
Worker 3: acquired resource (entering pool)
...

The semaphore value diagram:

event:      init  W0   W1   W2   W3      W4      W0   W5   W1   ...
            -     acq  acq  acq  blocks  blocks  rel  acq  rel
sem value:  3     2    1    0    0       0       1    0    1    ...

Driver Prep: The Linux kernel uses semaphores (struct semaphore) and counting semaphores for resource management. The down() and up() functions in the kernel correspond to sem_wait() and sem_post(). Modern kernel code prefers mutexes for binary locking and completions for signaling.

Semaphore vs Mutex

+------------------+-------------------+--------------------+
| Feature          | Mutex             | Semaphore          |
+------------------+-------------------+--------------------+
| Value range      | 0 or 1 (locked/   | 0 to N             |
|                  | unlocked)         |                    |
| Ownership        | Yes (only owner   | No (any thread     |
|                  | can unlock)       | can post)          |
| Use case         | Mutual exclusion  | Resource counting  |
| Priority inherit | Yes (on Linux)    | No                 |
| Cross-process    | With PSHARED attr | Named or in shm    |
+------------------+-------------------+--------------------+

Rust: Message Passing with Channels

Rust does not wrap POSIX message queues in the standard library. Instead, it provides channels (covered in Ch39-40) which serve the same purpose within a single process. For cross-process message queues, use the posixmq crate:

// rust_mq.rs
// Cargo.toml: posixmq = "1"
use posixmq::PosixMq;

fn main() {
    let name = "/rust_mq";

    // Open or create the queue
    let mq = PosixMq::create(name)
        .max_msg_len(256)
        .capacity(10)
        .open_or_create()
        .expect("Failed to open message queue");

    // Send messages with priorities
    mq.send(0, b"Low priority").unwrap();
    mq.send(5, b"Medium priority").unwrap();
    mq.send(10, b"High priority").unwrap();

    // Receive (highest priority first)
    let mut buf = vec![0u8; 256];
    for _ in 0..3 {
        let (priority, len) = mq.recv(&mut buf).unwrap();
        let msg = std::str::from_utf8(&buf[..len]).unwrap();
        println!("Received (priority {}): {}", priority, msg);
    }

    PosixMq::unlink(name).ok();
}

Rust: Semaphore Alternatives

Rust's standard library has no semaphore type. Use tokio's Semaphore for async code or build one from Mutex and Condvar:

// semaphore.rs
use std::sync::{Arc, Mutex, Condvar};
use std::thread;
use std::time::Duration;

/// Counting semaphore assembled from a mutex-guarded permit counter and a
/// condition variable used to park threads while no permit is available.
struct Semaphore {
    /// Number of permits currently available.
    count: Mutex<usize>,
    /// Signalled whenever a permit is returned.
    cvar: Condvar,
}

impl Semaphore {
    /// Creates a semaphore that starts with `initial` permits.
    fn new(initial: usize) -> Self {
        Self {
            count: Mutex::new(initial),
            cvar: Condvar::new(),
        }
    }

    /// Takes one permit, blocking the calling thread while the count is zero.
    fn acquire(&self) {
        let guard = self.count.lock().unwrap();
        // wait_while re-checks the predicate after every wake-up, so
        // spurious wake-ups cannot let a thread through without a permit.
        let mut permits = self
            .cvar
            .wait_while(guard, |available| *available == 0)
            .unwrap();
        *permits -= 1;
    }

    /// Returns one permit and wakes a single waiting thread, if any.
    fn release(&self) {
        let mut permits = self.count.lock().unwrap();
        *permits += 1;
        self.cvar.notify_one();
    }
}

/// Spawns eight worker threads that share a three-permit semaphore, so at
/// most three of them hold a "resource" at any moment, then waits for all
/// of them to finish.
fn main() {
    let pool = Arc::new(Semaphore::new(3));

    // Spawn every worker up front; collect() drives the iterator eagerly,
    // so all threads are running before the first join below.
    let workers: Vec<thread::JoinHandle<()>> = (0..8)
        .map(|id| {
            let pool = Arc::clone(&pool);
            thread::spawn(move || {
                pool.acquire();
                println!("Worker {}: acquired resource", id);
                thread::sleep(Duration::from_secs(1));
                println!("Worker {}: releasing", id);
                pool.release();
            })
        })
        .collect();

    for worker in workers {
        worker.join().unwrap();
    }
}

Rust Note: Rust's philosophy favors channels over semaphores for most use cases. As the slogan borrowed from the Go community (and quoted in The Rust Programming Language) puts it: "Do not communicate by sharing memory; share memory by communicating." Channels are easier to reason about and less prone to bugs.

IPC Decision Table

+------------------+-------+--------+--------+--------+--------+
| Feature          | Pipe  | FIFO   | Shm    | MsgQ   | Socket |
+------------------+-------+--------+--------+--------+--------+
| Related procs    | Yes   | Any    | Any    | Any    | Any    |
| Network capable  | No    | No     | No     | No     | Yes    |
| Message boundary | No    | No     | N/A    | Yes    | DGRAM  |
| Priority         | No    | No     | N/A    | Yes    | No     |
| Speed            | Med   | Med    | Fast   | Med    | Med    |
| Kernel copies    | 2     | 2      | 0      | 2      | 2      |
| Bidirectional    | No    | No     | Yes    | No*    | Yes    |
| Max data size    | 64KB  | 64KB   | RAM    | 8KB**  | Large  |
| Persistence      | No    | File   | /dev/  | /dev/  | No     |
|                  |       |        | shm    | mqueue |        |
+------------------+-------+--------+--------+--------+--------+
  * Two queues needed for bidirectional
  ** Default, configurable

Try It: Write a producer-consumer pair using POSIX message queues. The producer sends 10 numbered messages with alternating priorities (odd numbers get priority 1, even get priority 5). The consumer prints them and observes the ordering.

Knowledge Check

  1. What is the difference between a named semaphore and an unnamed semaphore?
  2. Why does mq_receive require a buffer of at least mq_msgsize bytes?
  3. In what situation would you choose a message queue over a pipe?

Common Pitfalls

  • mq_receive buffer too small -- fails with EMSGSIZE even if the actual message is short. The buffer must be mq_msgsize or larger.
  • Forgetting mq_unlink or sem_unlink -- the objects persist in /dev/mqueue/ and /dev/shm/ until explicitly removed.
  • Using sem_init with pshared=0 across processes -- the semaphore only works within one process. Set pshared=1 for cross-process use.
  • Deadlock with semaphores -- if sem_wait is called more times than the initial value plus the number of sem_post calls, the extra callers block forever.
  • Ignoring EINTR -- sem_wait and mq_receive can be interrupted by signals. Always check for EINTR and retry.
  • Message queue full -- mq_send blocks (or returns EAGAIN in non-blocking mode) when the queue is at capacity.