Rust Threads, Channels, and Async
Rust's concurrency model is built on two pillars: the type system prevents data races at compile time, and the ecosystem gives you both OS threads and async I/O. This chapter digs into Send, Sync, scoped threads, channels, and Arc<Mutex<T>>, then introduces async/await with tokio.
Send and Sync
Two marker traits control what can cross thread boundaries.
- Send: A type is Send if ownership of it can be transferred to another thread. Most types are Send; raw pointers are not.
- Sync: A type is Sync if it can be shared (via &T) between threads. Formally, T is Sync if &T is Send.
+---------------------+--------+--------+
| Type | Send? | Sync? |
+---------------------+--------+--------+
| i32, String, Vec<T> | Yes | Yes |
| Mutex<T> | Yes | Yes |
| Rc<T> | No | No |
| Arc<T> | Yes | Yes |
| Cell<T> | Yes | No |
| *mut T | No | No |
+---------------------+--------+--------+
If you try to send an Rc<T> to another thread, the compiler stops you:
// send_error.rs -- WILL NOT COMPILE
use std::rc::Rc;
use std::thread;

fn main() {
    let data = Rc::new(42);
    thread::spawn(move || {
        println!("{}", data);
    });
}
error: `Rc<i32>` cannot be sent between threads safely
Rust Note: These traits are implemented automatically by the compiler (they are auto traits, not derives). You almost never implement them manually. They exist so the compiler can reason about thread safety without runtime checks.
Channels: mpsc in Depth
Rust's standard library provides std::sync::mpsc -- multi-producer, single-consumer channels.
// channel_types.rs
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    // Unbounded channel (infinite buffer)
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let messages = vec!["hello", "from", "the", "thread"];
        for msg in messages {
            tx.send(msg).unwrap();
            thread::sleep(Duration::from_millis(200));
        }
    });

    // recv() blocks until a message arrives
    // When the sender drops, recv() returns Err
    loop {
        match rx.recv() {
            Ok(msg) => println!("Got: {}", msg),
            Err(_) => {
                println!("Channel closed");
                break;
            }
        }
    }
}
For bounded channels (backpressure):
// sync_channel.rs
use std::sync::mpsc;
use std::thread;

fn main() {
    // Buffer holds at most 2 messages
    let (tx, rx) = mpsc::sync_channel(2);

    let producer = thread::spawn(move || {
        for i in 0..5 {
            println!("Sending {}", i);
            tx.send(i).unwrap(); // blocks if buffer full
            println!("Sent {}", i);
        }
    });

    for val in rx {
        println!("Received: {}", val);
    }
    producer.join().unwrap();
}
Try It: Change the buffer size to 0. This creates a rendezvous channel where send blocks until the receiver calls recv. Run it and observe the interleaving.
Arc<Mutex> for Shared Mutable State
When multiple threads need to read and write the same data, combine Arc (atomic reference counting) with Mutex (mutual exclusion).
// arc_mutex.rs
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let data = Arc::new(Mutex::new(vec![1, 2, 3]));
    let mut handles = vec![];

    for i in 0..3 {
        let data = Arc::clone(&data);
        handles.push(thread::spawn(move || {
            let mut vec = data.lock().unwrap();
            vec.push(i * 10);
            println!("Thread {} pushed {}", i, i * 10);
        }));
    }

    for h in handles {
        h.join().unwrap();
    }
    println!("Final: {:?}", *data.lock().unwrap());
}
The ownership diagram:
main thread thread 0 thread 1
| | |
v v v
Arc -------> [strong count = 3] <-------- Arc
|
v
Mutex<Vec<i32>>
|
v
Vec [1, 2, 3, ...]
Each Arc::clone increments the atomic reference count. When the last Arc is dropped, the Mutex and Vec are freed.
Scoped Threads
std::thread::scope (stable since Rust 1.63) lets threads borrow from the parent stack. No 'static requirement, no Arc needed.
// scoped.rs
use std::thread;

fn main() {
    let mut data = vec![1, 2, 3, 4, 5];

    thread::scope(|s| {
        s.spawn(|| {
            let sum: i32 = data.iter().sum();
            println!("Sum: {}", sum);
        });
        s.spawn(|| {
            let len = data.len();
            println!("Length: {}", len);
        });
    });
    // All scoped threads are joined here automatically

    // We can mutate data again -- threads are done
    data.push(6);
    println!("After scope: {:?}", data);
}
Rust Note: Scoped threads solve the problem of needing Arc just to share a reference. The scope guarantees all threads finish before the borrow ends. This is similar to OpenMP parallel regions.
With scoped threads you can even have one thread borrow mutably while others read different data:
// scoped_mut.rs
use std::thread;

fn main() {
    let mut a = 10;
    let b = 20;

    thread::scope(|s| {
        s.spawn(|| {
            a += b; // mutable borrow of a
        });
        s.spawn(|| {
            println!("b = {}", b); // shared borrow of b only
        });
    });

    println!("a = {}", a);
}
Introduction to Async: Why and When
Threads are great for CPU-bound work. But for I/O-bound work (network servers, file I/O), OS threads are heavy: each one costs a kernel stack (typically 8-16KB), a user stack (megabytes by default), and scheduling overhead. A server handling 10,000 connections would need 10,000 threads.
Async I/O uses cooperative multitasking: tasks yield when they would block, and a runtime multiplexes many tasks onto a few OS threads.
Threads (preemptive): Async (cooperative):
Thread 1 [==BLOCK=====RUN==] Task 1 [==yield--RUN==yield--]
Thread 2 [=RUN===BLOCK==RUN] Task 2 [--RUN====yield--RUN==]
Thread 3 [BLOCK=======RUN==] Task 3 [--yield--RUN========-]
^
3 OS threads 1 OS thread, 3 tasks
Rule of thumb:
- CPU-bound (number crunching, compression): use threads
- I/O-bound (network, disk): use async
- Mixed: use async with spawn_blocking for CPU work
The Future Trait
An async function returns a Future. A Future is a state machine that can be polled.
// This is conceptual -- you don't implement Future manually for most code
trait Future {
    type Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

enum Poll<T> {
    Ready(T),
    Pending,
}
When you write async fn, the compiler transforms your function into a state machine that implements Future. The .await points are where the state machine yields.
Tokio Basics
Tokio is the most widely used async runtime for Rust. Add it to Cargo.toml:
[dependencies]
tokio = { version = "1", features = ["full"] }
A minimal async program:
// tokio_hello.rs
#[tokio::main]
async fn main() {
    println!("Hello from async main");
    let result = compute().await;
    println!("Result: {}", result);
}

async fn compute() -> i32 {
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    42
}
The #[tokio::main] macro sets up the runtime. Without it, async fn main would return a Future that nobody polls.
Spawning Async Tasks
// tokio_spawn.rs
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let handle1 = tokio::spawn(async {
        sleep(Duration::from_millis(100)).await;
        println!("Task 1 done");
        1
    });
    let handle2 = tokio::spawn(async {
        sleep(Duration::from_millis(50)).await;
        println!("Task 2 done");
        2
    });

    let r1 = handle1.await.unwrap();
    let r2 = handle2.await.unwrap();
    println!("Results: {} + {} = {}", r1, r2, r1 + r2);
}
Tasks run concurrently on the thread pool. tokio::spawn is like thread::spawn but for async tasks.
select!: Racing Tasks
// tokio_select.rs
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    tokio::select! {
        _ = sleep(Duration::from_secs(1)) => {
            println!("1 second elapsed");
        }
        _ = sleep(Duration::from_millis(500)) => {
            println!("500ms elapsed first");
        }
    }
}
select! waits for the first future to complete and cancels the rest. Useful for timeouts, shutdown signals, and multiplexing.
Async TCP Echo Server
Here is a complete async echo server -- the kind of thing that would need threads-per-connection in C:
// echo_server.rs
use tokio::net::TcpListener;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let listener = TcpListener::bind("127.0.0.1:8080").await?;
    println!("Listening on 127.0.0.1:8080");

    loop {
        let (mut socket, addr) = listener.accept().await?;
        println!("New connection from {}", addr);

        tokio::spawn(async move {
            let mut buf = [0u8; 1024];
            loop {
                let n = match socket.read(&mut buf).await {
                    Ok(0) => {
                        println!("{} disconnected", addr);
                        return;
                    }
                    Ok(n) => n,
                    Err(e) => {
                        eprintln!("Read error from {}: {}", addr, e);
                        return;
                    }
                };
                if let Err(e) = socket.write_all(&buf[..n]).await {
                    eprintln!("Write error to {}: {}", addr, e);
                    return;
                }
            }
        });
    }
}
Try It: Run the echo server, then connect with nc 127.0.0.1 8080 from multiple terminals. Each connection is handled by a lightweight task, not an OS thread.
C Comparison: Threaded Echo Server
For contrast, here is the same server in C using one thread per connection:
/* echo_server_threaded.c */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/socket.h>
#include <netinet/in.h>
void *handle_client(void *arg) {
int fd = *(int *)arg;
free(arg);
char buf[1024];
ssize_t n;
while ((n = read(fd, buf, sizeof(buf))) > 0) {
write(fd, buf, n);
}
close(fd);
return NULL;
}
int main(void) {
int srv = socket(AF_INET, SOCK_STREAM, 0);
int opt = 1;
setsockopt(srv, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
struct sockaddr_in addr = {
.sin_family = AF_INET,
.sin_port = htons(8080),
.sin_addr.s_addr = INADDR_ANY
};
bind(srv, (struct sockaddr *)&addr, sizeof(addr));
listen(srv, 128);
printf("Listening on port 8080\n");
while (1) {
int *client = malloc(sizeof(int));
*client = accept(srv, NULL, NULL);
pthread_t t;
pthread_create(&t, NULL, handle_client, client);
pthread_detach(t);
}
}
This works but creates an OS thread per connection. At 10,000 connections, you have 10,000 threads. The async version uses a small thread pool regardless of connection count.
Driver Prep: Kernel drivers do not use async/await, but they use a similar concept: workqueues and tasklets defer work without creating new threads. The kernel's io_uring interface is the closest thing to async I/O at the syscall level.
Threads vs Async: Decision Guide
+------------------+-------------------+--------------------+
| Factor | OS Threads | Async Tasks |
+------------------+-------------------+--------------------+
| Scheduling | Preemptive (OS) | Cooperative (user) |
| Stack size | ~8KB kernel stack | Few hundred bytes |
| Creation cost | Moderate | Very cheap |
| Best for | CPU-bound work | I/O-bound work |
| Max concurrency | ~thousands | ~millions |
| Blocking calls | OK | MUST NOT block |
| Debugging | Easier | Harder (state mc.) |
+------------------+-------------------+--------------------+
Caution: Never call blocking functions (like std::thread::sleep or synchronous file I/O) inside an async task. Use tokio::time::sleep, tokio::fs, or tokio::task::spawn_blocking instead. Blocking an async task blocks the entire runtime thread.
Knowledge Check
- What is the difference between Send and Sync?
- Why does Rc<T> fail to compile when sent to another thread?
- When should you use tokio::task::spawn_blocking instead of tokio::spawn?
Common Pitfalls
- Using Rc instead of Arc across threads -- compile error, but confusing for beginners.
- Forgetting move on closures passed to thread::spawn -- the closure borrows from the stack, which the thread may outlive.
- Holding a MutexGuard across an .await point -- this blocks the async runtime. Use tokio::sync::Mutex if you must hold a lock across await.
- Calling .await outside an async function -- futures are lazy; they do nothing until polled.
- Mixing std::sync::Mutex with async code -- it works if the critical section is short and never crosses an await, but tokio::sync::Mutex is safer for async contexts.
- Not dropping the original sender when using mpsc::channel with cloned senders -- the receiver never terminates.