Message Queues and Semaphores
Message queues give you structured, typed communication between processes without the byte-stream nature of pipes. Semaphores give you lightweight synchronization without the overhead of a full mutex. This chapter covers POSIX message queues and POSIX semaphores, then compares all IPC mechanisms.
POSIX Message Queues
A message queue is a kernel-managed list of messages. Each message has a body and a priority. Higher-priority messages are delivered first.
/* mq_sender.c */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <mqueue.h>
#define QUEUE_NAME "/my_queue"
#define MAX_MSG_SIZE 256
#define MAX_MSGS 10
/*
 * mq_sender: create (or open) QUEUE_NAME write-only and send three
 * NUL-terminated messages with priorities 1, 5 and 10.
 *
 * Returns 0 on success, 1 if the queue cannot be opened, a send fails,
 * or the descriptor cannot be closed.
 */
int main(void) {
    /* mq_maxmsg and mq_msgsize are the fields that matter at creation time. */
    struct mq_attr attr = {
        .mq_flags = 0,
        .mq_maxmsg = MAX_MSGS,
        .mq_msgsize = MAX_MSG_SIZE,
        .mq_curmsgs = 0
    };

    mqd_t mq = mq_open(QUEUE_NAME, O_CREAT | O_WRONLY, 0666, &attr);
    if (mq == (mqd_t)-1) {
        perror("mq_open");
        return 1;
    }

    const char *messages[] = {
        "Low priority message",
        "Medium priority message",
        "High priority message"
    };
    unsigned int priorities[] = {1, 5, 10};
    /* Derive the count from the table so adding a message cannot desync the loop. */
    const size_t count = sizeof(messages) / sizeof(messages[0]);

    int status = 0;
    for (size_t i = 0; i < count; i++) {
        /* +1 sends the terminating NUL so the receiver gets a C string. */
        if (mq_send(mq, messages[i], strlen(messages[i]) + 1,
                    priorities[i]) == -1) {
            perror("mq_send");
            status = 1;
            break; /* fall through to mq_close instead of leaking the descriptor */
        }
        printf("Sent (priority %u): %s\n", priorities[i], messages[i]);
    }

    if (mq_close(mq) == -1) {
        perror("mq_close");
        status = 1;
    }
    return status;
}
/* mq_receiver.c */
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <mqueue.h>
#define QUEUE_NAME "/my_queue"
#define MAX_MSG_SIZE 256
/*
 * mq_receiver: open the existing QUEUE_NAME read-only, receive three
 * messages (delivered highest priority first), then close and remove
 * the queue.
 *
 * Returns 0 on success, 1 on any failure. If a receive fails the queue is
 * closed but left in place, so pending messages are not discarded.
 */
int main(void) {
    enum { EXPECTED_MSGS = 3 };

    mqd_t mq = mq_open(QUEUE_NAME, O_RDONLY);
    if (mq == (mqd_t)-1) {
        perror("mq_open");
        return 1;
    }

    /* One spare byte so a full-size message can still be NUL-terminated. */
    char buf[MAX_MSG_SIZE + 1];
    unsigned int priority;
    int status = 0;

    /* Receive the messages -- highest priority comes first */
    for (int i = 0; i < EXPECTED_MSGS; i++) {
        /* Only MAX_MSG_SIZE bytes are offered, so n can never exceed it. */
        ssize_t n = mq_receive(mq, buf, MAX_MSG_SIZE, &priority);
        if (n == -1) {
            perror("mq_receive");
            status = 1;
            break; /* still close the descriptor below */
        }
        buf[n] = '\0';
        printf("Received (priority %u): %s\n", priority, buf);
    }

    if (mq_close(mq) == -1) {
        perror("mq_close");
        status = 1;
    }

    /* Remove the queue only after a clean run. */
    if (status == 0 && mq_unlink(QUEUE_NAME) == -1) {
        perror("mq_unlink");
        status = 1;
    }
    return status;
}
Compile with:
gcc -o mq_sender mq_sender.c -lrt
gcc -o mq_receiver mq_receiver.c -lrt
Run the sender first, then the receiver. Notice the output order is by descending priority:
Received (priority 10): High priority message
Received (priority 5): Medium priority message
Received (priority 1): Low priority message
The message queue API:
mq_open(name, flags, mode, attr) -- create or open
mq_send(mq, msg, len, priority) -- send a message
mq_receive(mq, buf, len, &prio) -- receive highest-priority message
mq_close(mq) -- close the descriptor
mq_unlink(name) -- remove the queue
mq_getattr(mq, &attr) -- query attributes
mq_notify(mq, &sigevent) -- register for async notification
Caution: The `mq_receive` buffer must be at least `mq_msgsize` bytes (as set in `mq_attr`). If it is smaller, `mq_receive` fails with `EMSGSIZE`. This is a common mistake.
Try It: Modify the sender to send 5 messages with the same priority. Verify that they arrive in FIFO order (first-in, first-out within the same priority level).
Message Queue Limits
Linux imposes system-wide limits on message queues:
/proc/sys/fs/mqueue/msg_max -- max messages per queue (default 10)
/proc/sys/fs/mqueue/msgsize_max -- max message size (default 8192)
/proc/sys/fs/mqueue/queues_max -- max number of queues (default 256)
You can view and modify these:
cat /proc/sys/fs/mqueue/msg_max
Non-blocking and Timed Operations
/* mq_nonblock.c */
#include <stdio.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <mqueue.h>
#include <errno.h>
#include <time.h>
#include <string.h>
#define QUEUE_NAME "/nb_queue"
#define MAX_MSG_SIZE 256
/*
 * Run the two receive demonstrations on an already-open queue:
 *   1. a non-blocking receive on an empty queue (expects EAGAIN);
 *   2. a timed receive with a 2-second absolute deadline.
 *
 * Returns 0 on success (including a timed-out receive), 1 on error.
 */
static int run_receive_demo(mqd_t mq) {
    char buf[MAX_MSG_SIZE + 1];
    unsigned int prio;

    /* Non-blocking receive on empty queue */
    if (mq_receive(mq, buf, MAX_MSG_SIZE, &prio) == -1) {
        if (errno != EAGAIN) {
            perror("mq_receive");
            return 1;
        }
        printf("No messages available (non-blocking)\n");
    }

    /* Send a message */
    const char *msg = "test message";
    if (mq_send(mq, msg, strlen(msg) + 1, 0) == -1) {
        perror("mq_send");
        return 1;
    }

    /*
     * While O_NONBLOCK is set, mq_timedreceive returns EAGAIN at once on an
     * empty queue and never consults the timeout. Clear the flag so the
     * timed receive really waits. mq_setattr only looks at mq_flags.
     */
    struct mq_attr blocking = { .mq_flags = 0 };
    if (mq_setattr(mq, &blocking, NULL) == -1) {
        perror("mq_setattr");
        return 1;
    }

    /* Timed receive: wait up to 2 seconds (absolute CLOCK_REALTIME deadline) */
    struct timespec ts;
    if (clock_gettime(CLOCK_REALTIME, &ts) == -1) {
        perror("clock_gettime");
        return 1;
    }
    ts.tv_sec += 2;

    ssize_t n = mq_timedreceive(mq, buf, MAX_MSG_SIZE, &prio, &ts);
    if (n == -1) {
        if (errno == ETIMEDOUT) {
            printf("Timed receive: no message within 2 seconds\n");
            return 0;
        }
        perror("mq_timedreceive");
        return 1;
    }
    buf[n] = '\0';
    printf("Timed receive got: %s\n", buf);
    return 0;
}

/*
 * mq_nonblock: create QUEUE_NAME in non-blocking mode, run the receive
 * demonstrations, then close and remove the queue on every path.
 *
 * Returns 0 on success, 1 on failure.
 */
int main(void) {
    struct mq_attr attr = {
        .mq_flags = 0,
        .mq_maxmsg = 10,
        .mq_msgsize = MAX_MSG_SIZE,
        .mq_curmsgs = 0
    };

    mqd_t mq = mq_open(QUEUE_NAME, O_CREAT | O_RDWR | O_NONBLOCK,
                       0666, &attr);
    if (mq == (mqd_t)-1) {
        perror("mq_open");
        return 1;
    }

    int status = run_receive_demo(mq);

    if (mq_close(mq) == -1) {
        perror("mq_close");
        status = 1;
    }
    if (mq_unlink(QUEUE_NAME) == -1) {
        perror("mq_unlink");
        status = 1;
    }
    return status;
}
POSIX Semaphores
A semaphore is a counter that supports two atomic operations: wait (decrement) and post (increment). When the counter is zero, sem_wait blocks.
There are two kinds:
- Named semaphores -- created with `sem_open`, accessible by unrelated processes via a filesystem name.
- Unnamed semaphores -- created with `sem_init`, live in shared memory or within a single process.
Named Semaphore
/* sem_named.c */
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <semaphore.h>
#include <sys/wait.h>
#include <unistd.h>
#define SEM_NAME "/my_sem"
/*
 * sem_named: parent and child take turns in a critical section guarded
 * by the named semaphore SEM_NAME, used as a binary semaphore.
 *
 * Returns 0 on success, 1 if the semaphore cannot be opened, fork fails,
 * or the parent cannot acquire the semaphore.
 */
int main(void) {
    /*
     * Create semaphore with initial value 1 (binary semaphore = mutex).
     * NOTE(review): without O_EXCL an existing semaphore of this name is
     * reused and the initial value is ignored -- confirm that is acceptable.
     */
    sem_t *sem = sem_open(SEM_NAME, O_CREAT, 0666, 1);
    if (sem == SEM_FAILED) {
        perror("sem_open");
        return 1;
    }

    pid_t pid = fork();
    if (pid == -1) {
        perror("fork");
        sem_close(sem);
        sem_unlink(SEM_NAME);
        return 1;
    }

    if (pid == 0) {
        /* Child */
        if (sem_wait(sem) == -1) {
            perror("sem_wait");
            _exit(1);
        }
        printf("Child: entered critical section\n");
        sleep(1);
        printf("Child: leaving critical section\n");
        /* _exit() skips stdio flushing; flush now or the output is lost
         * when stdout is a pipe or a file. */
        fflush(stdout);
        sem_post(sem);
        sem_close(sem);
        _exit(0);
    }

    /* Parent */
    /* Best-effort head start for the child. The semaphore, not this sleep,
     * is what guarantees mutual exclusion. */
    sleep(1);

    int status = 0;
    if (sem_wait(sem) == -1) {
        perror("sem_wait");
        status = 1;
    } else {
        printf("Parent: entered critical section\n");
        printf("Parent: leaving critical section\n");
        fflush(stdout);
        sem_post(sem);
    }

    wait(NULL);
    sem_close(sem);
    sem_unlink(SEM_NAME);
    return status;
}
Compile with:
gcc -o sem_named sem_named.c -pthread
Unnamed Semaphore in Shared Memory
/* sem_unnamed.c */
#include <stdio.h>
#include <stdlib.h>
#include <semaphore.h>
#include <sys/mman.h>
#include <sys/wait.h>
#include <unistd.h>
/*
 * sem_unnamed: place an unnamed, process-shared semaphore in an anonymous
 * shared mapping and use it to serialise a critical section between a
 * parent and its forked child.
 *
 * Returns 0 on success, 1 if mmap, sem_init, fork, or the parent's
 * sem_wait fails.
 */
int main(void) {
    /* Allocate semaphore in shared memory so the mapping survives fork()
     * as the same physical object in both processes. */
    sem_t *sem = mmap(NULL, sizeof *sem, PROT_READ | PROT_WRITE,
                      MAP_SHARED | MAP_ANONYMOUS, -1, 0);
    if (sem == MAP_FAILED) {
        perror("mmap");
        return 1;
    }

    /* Initialize: pshared=1 (process-shared), value=1 */
    if (sem_init(sem, 1, 1) == -1) {
        perror("sem_init");
        munmap(sem, sizeof *sem);
        return 1;
    }

    pid_t pid = fork();
    if (pid == -1) {
        perror("fork");
        sem_destroy(sem);
        munmap(sem, sizeof *sem);
        return 1;
    }

    if (pid == 0) {
        if (sem_wait(sem) == -1) {
            perror("sem_wait");
            _exit(1);
        }
        printf("Child: in critical section\n");
        sleep(1);
        printf("Child: done\n");
        /* _exit() does not flush stdio buffers; do it explicitly. */
        fflush(stdout);
        sem_post(sem);
        _exit(0);
    }

    int status = 0;
    if (sem_wait(sem) == -1) {
        perror("sem_wait");
        status = 1;
    } else {
        printf("Parent: in critical section\n");
        printf("Parent: done\n");
        fflush(stdout);
        sem_post(sem);
    }

    /* Reap the child before destroying the semaphore it may still be using. */
    wait(NULL);
    sem_destroy(sem);
    munmap(sem, sizeof *sem);
    return status;
}
Counting Semaphores: Resource Pools
A counting semaphore tracks the number of available resources.
/* sem_pool.c */
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h>
#define POOL_SIZE 3
#define NUM_WORKERS 8
static sem_t pool;
/*
 * Thread entry point: take one slot from the shared `pool` semaphore,
 * hold it for a second, then hand it back.
 *
 * arg points to an int holding this worker's id. Always returns NULL.
 */
void *worker(void *arg) {
    const int *id_slot = arg;
    const int worker_id = *id_slot;

    /* Blocks while every slot in the pool is taken. */
    sem_wait(&pool);
    printf("Worker %d: acquired resource (entering pool)\n", worker_id);

    /* simulate work with the resource */
    sleep(1);

    printf("Worker %d: releasing resource\n", worker_id);
    sem_post(&pool);

    return NULL;
}
/*
 * sem_pool: start NUM_WORKERS threads that compete for POOL_SIZE slots
 * guarded by the counting semaphore `pool`, then wait for all of them.
 *
 * Returns 0 on success, 1 if the semaphore cannot be initialised or a
 * thread cannot be created.
 */
int main(void) {
    /* pshared=0: the semaphore is shared between threads of this process. */
    if (sem_init(&pool, 0, POOL_SIZE) == -1) {
        perror("sem_init");
        return 1;
    }

    pthread_t threads[NUM_WORKERS];
    int ids[NUM_WORKERS];
    int started = 0;
    int status = 0;

    for (int i = 0; i < NUM_WORKERS; i++) {
        /* Each thread gets its own slot in ids[], so the pointer stays
         * valid and unshared for the thread's whole lifetime. */
        ids[i] = i;
        /* pthread_create reports failure through its return value, not errno. */
        int rc = pthread_create(&threads[i], NULL, worker, &ids[i]);
        if (rc != 0) {
            fprintf(stderr, "pthread_create failed (error %d)\n", rc);
            status = 1;
            break;
        }
        started++;
    }

    /* Join only the threads that actually exist; joining an uninitialised
     * pthread_t is undefined behaviour. */
    for (int i = 0; i < started; i++) {
        pthread_join(threads[i], NULL);
    }

    sem_destroy(&pool);
    return status;
}
Output shows at most 3 workers in the pool at any time:
Worker 0: acquired resource (entering pool)
Worker 1: acquired resource (entering pool)
Worker 2: acquired resource (entering pool)
Worker 0: releasing resource
Worker 3: acquired resource (entering pool)
...
The semaphore value diagram:
sem value: 3 2 1 0 0 0 1 0 1 ...
| | | | | | | |
W0 W1 W2 W3 W4 W0 W5 W1
acq acq blocks rel acq
Driver Prep: The Linux kernel uses semaphores (`struct semaphore`) and counting semaphores for resource management. The `down()` and `up()` functions in the kernel correspond to `sem_wait()` and `sem_post()`. Modern kernel code prefers mutexes for binary locking and completions for signaling.
Semaphore vs Mutex
+------------------+-------------------+--------------------+
| Feature | Mutex | Semaphore |
+------------------+-------------------+--------------------+
| Value range | 0 or 1 (locked/ | 0 to N |
| | unlocked) | |
| Ownership | Yes (only owner | No (any thread |
| | can unlock) | can post) |
| Use case | Mutual exclusion | Resource counting |
| Priority inherit | Yes (on Linux) | No |
| Cross-process | With PSHARED attr | Named or in shm |
+------------------+-------------------+--------------------+
Rust: Message Passing with Channels
Rust does not wrap POSIX message queues in the standard library. Instead, it provides channels (covered in Ch39-40) which serve the same purpose within a single process. For cross-process message queues, use the posixmq crate:
// rust_mq.rs // Cargo.toml: posixmq = "1" use posixmq::PosixMq; fn main() { let name = "/rust_mq"; // Open or create the queue let mq = PosixMq::create(name) .max_msg_len(256) .capacity(10) .open_or_create() .expect("Failed to open message queue"); // Send messages with priorities mq.send(0, b"Low priority").unwrap(); mq.send(5, b"Medium priority").unwrap(); mq.send(10, b"High priority").unwrap(); // Receive (highest priority first) let mut buf = vec![0u8; 256]; for _ in 0..3 { let (priority, len) = mq.recv(&mut buf).unwrap(); let msg = std::str::from_utf8(&buf[..len]).unwrap(); println!("Received (priority {}): {}", priority, msg); } PosixMq::unlink(name).ok(); }
Rust: Semaphore Alternatives
Rust's standard library has no semaphore type. Use tokio's Semaphore for async code or build one from Mutex and Condvar:
// semaphore.rs
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Counting semaphore built from a mutex-guarded counter and a condition
/// variable.
struct Semaphore {
    count: Mutex<usize>,
    cvar: Condvar,
}

impl Semaphore {
    /// Creates a semaphore holding `initial` permits.
    fn new(initial: usize) -> Self {
        Self {
            count: Mutex::new(initial),
            cvar: Condvar::new(),
        }
    }

    /// Takes one permit, blocking while none are available.
    fn acquire(&self) {
        let guard = self.count.lock().unwrap();
        // `wait_while` re-checks the predicate after every wakeup, so
        // spurious wakeups are handled.
        let mut permits = self
            .cvar
            .wait_while(guard, |available| *available == 0)
            .unwrap();
        *permits -= 1;
    }

    /// Returns one permit and wakes a single waiter, if any.
    fn release(&self) {
        let mut permits = self.count.lock().unwrap();
        *permits += 1;
        self.cvar.notify_one();
    }
}

/// Runs eight workers that share three permits.
fn main() {
    let sem = Arc::new(Semaphore::new(3));

    // `collect` forces every thread to be spawned before any join starts.
    let workers: Vec<_> = (0..8)
        .map(|id| {
            let sem = Arc::clone(&sem);
            thread::spawn(move || {
                sem.acquire();
                println!("Worker {}: acquired resource", id);
                thread::sleep(Duration::from_secs(1));
                println!("Worker {}: releasing", id);
                sem.release();
            })
        })
        .collect();

    for worker in workers {
        worker.join().unwrap();
    }
}
Rust Note: Rust's philosophy favors channels over semaphores for most use cases. "Do not communicate by sharing memory; share memory by communicating." Channels are easier to reason about and less prone to bugs.
IPC Decision Table
+------------------+-------+--------+--------+--------+--------+
| Feature | Pipe | FIFO | Shm | MsgQ | Socket |
+------------------+-------+--------+--------+--------+--------+
| Related procs | Yes | Any | Any | Any | Any |
| Network capable | No | No | No | No | Yes |
| Message boundary | No | No | N/A | Yes | DGRAM |
| Priority | No | No | N/A | Yes | No |
| Speed | Med | Med | Fast | Med | Med |
| Kernel copies | 2 | 2 | 0 | 2 | 2 |
| Bidirectional | No | No | Yes | No* | Yes |
| Max data size | 64KB | 64KB | RAM | 8KB** | Large |
| Persistence | No | File | /dev/ | /dev/ | No |
| | | | shm | mqueue | |
+------------------+-------+--------+--------+--------+--------+
* Two queues needed for bidirectional
** Default, configurable
Try It: Write a producer-consumer pair using POSIX message queues. The producer sends 10 numbered messages with alternating priorities (odd numbers get priority 1, even get priority 5). The consumer prints them and observes the ordering.
Knowledge Check
- What is the difference between a named semaphore and an unnamed semaphore?
- Why does `mq_receive` require a buffer of at least `mq_msgsize` bytes?
- In what situation would you choose a message queue over a pipe?
Common Pitfalls
- `mq_receive` buffer too small -- fails with `EMSGSIZE` even if the actual message is short. The buffer must be `mq_msgsize` or larger.
- Forgetting `mq_unlink` or `sem_unlink` -- the objects persist in `/dev/mqueue/` and `/dev/shm/` until explicitly removed.
- Using `sem_init` with `pshared=0` across processes -- the semaphore only works within one process. Set `pshared=1` for cross-process use.
- Deadlock with semaphores -- if `sem_wait` is called more times than `sem_post`, the semaphore blocks forever.
- Ignoring `EINTR` -- `sem_wait` and `mq_receive` can be interrupted by signals. Always check for `EINTR` and retry.
- Message queue full -- `mq_send` blocks (or returns `EAGAIN` in non-blocking mode) when the queue is at capacity.