Rust Threads, Channels, and Async

Rust's concurrency model rests on two pillars: the type system prevents data races at compile time, and the ecosystem gives you both OS threads and async I/O. This chapter digs into Send and Sync, scoped threads, channels, and Arc<Mutex<T>>, then introduces async/await with tokio.

Send and Sync

Two marker traits control what can cross thread boundaries.

  • Send: A type is Send if ownership of it can be transferred to another thread. Most types are Send; raw pointers and Rc<T> are not.
  • Sync: A type is Sync if it is safe to share between threads via a shared reference -- formally, T is Sync if and only if &T is Send.

+---------------------+--------+--------+
| Type                | Send?  | Sync?  |
+---------------------+--------+--------+
| i32, String, Vec<T> | Yes    | Yes    |
| Mutex<T>            | Yes    | Yes    |
| Rc<T>               | No     | No     |
| Arc<T>              | Yes    | Yes    |
| Cell<T>             | Yes    | No     |
| *mut T              | No     | No     |
+---------------------+--------+--------+

If you try to send an Rc<T> to another thread, the compiler stops you:

// send_error.rs -- WILL NOT COMPILE
use std::rc::Rc;
use std::thread;

fn main() {
    let data = Rc::new(42);
    thread::spawn(move || {
        println!("{}", data);
    });
}
error: `Rc<i32>` cannot be sent between threads safely

Rust Note: Send and Sync are auto traits -- the compiler implements them automatically for any type whose components qualify. You almost never implement them manually. They exist so the compiler can reason about thread safety without runtime checks.
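You can check these bounds yourself with a pair of generic helper functions; if a type does not satisfy the bound, the call simply fails to compile. A minimal sketch (assert_send and assert_sync are illustrative names, not standard library items):

```rust
// send_sync_check.rs
use std::sync::{Arc, Mutex};

// Each call compiles only if the type argument satisfies the bound.
fn assert_send<T: Send>() {}
fn assert_sync<T: Sync>() {}

fn main() {
    assert_send::<String>();
    assert_sync::<Vec<i32>>();
    assert_send::<Arc<Mutex<i32>>>();
    // Uncommenting either line below is a compile error:
    // assert_send::<std::rc::Rc<i32>>();
    // assert_sync::<std::cell::Cell<i32>>();
    println!("all bounds hold");
}
```

This trick is handy in test suites to lock in thread-safety guarantees of your own types.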

Channels: mpsc in Depth

Rust's standard library provides std::sync::mpsc -- multi-producer, single-consumer channels.

// channel_types.rs
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    // Unbounded channel (infinite buffer)
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let messages = vec!["hello", "from", "the", "thread"];
        for msg in messages {
            tx.send(msg).unwrap();
            thread::sleep(Duration::from_millis(200));
        }
    });

    // recv() blocks until a message arrives
    // When the sender drops, recv() returns Err
    loop {
        match rx.recv() {
            Ok(msg) => println!("Got: {}", msg),
            Err(_) => {
                println!("Channel closed");
                break;
            }
        }
    }
}

For bounded channels (backpressure):

// sync_channel.rs
use std::sync::mpsc;
use std::thread;

fn main() {
    // Buffer holds at most 2 messages
    let (tx, rx) = mpsc::sync_channel(2);

    let producer = thread::spawn(move || {
        for i in 0..5 {
            println!("Sending {}", i);
            tx.send(i).unwrap();  // blocks if buffer full
            println!("Sent {}", i);
        }
    });

    for val in rx {
        println!("Received: {}", val);
    }

    producer.join().unwrap();
}

Try It: Change the buffer size to 0. This creates a rendezvous channel where send blocks until the receiver calls recv. Run it and observe the interleaving.

Arc<Mutex> for Shared Mutable State

When multiple threads need to read and write the same data, combine Arc (atomic reference counting) with Mutex (mutual exclusion).

// arc_mutex.rs
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let data = Arc::new(Mutex::new(vec![1, 2, 3]));
    let mut handles = vec![];

    for i in 0..3 {
        let data = Arc::clone(&data);
        handles.push(thread::spawn(move || {
            let mut vec = data.lock().unwrap();
            vec.push(i * 10);
            println!("Thread {} pushed {}", i, i * 10);
        }));
    }

    for h in handles {
        h.join().unwrap();
    }

    println!("Final: {:?}", *data.lock().unwrap());
}

The ownership diagram:

main thread       thread 0       thread 1       thread 2
    |                 |              |              |
   Arc               Arc            Arc            Arc
    \                 |              |              /
     +-------> [strong count = 4] <----------------+
                       |
                       v
                Mutex<Vec<i32>>
                       |
                       v
                Vec [1, 2, 3, ...]

Each Arc::clone increments the atomic reference count. When the last Arc is dropped, the Mutex and Vec are freed.

Scoped Threads

std::thread::scope (stable since Rust 1.63) lets threads borrow from the parent stack. No 'static requirement, no Arc needed.

// scoped.rs
use std::thread;

fn main() {
    let mut data = vec![1, 2, 3, 4, 5];

    thread::scope(|s| {
        s.spawn(|| {
            let sum: i32 = data.iter().sum();
            println!("Sum: {}", sum);
        });

        s.spawn(|| {
            let len = data.len();
            println!("Length: {}", len);
        });
    });
    // All scoped threads are joined here automatically

    // We can mutate data again -- threads are done
    data.push(6);
    println!("After scope: {:?}", data);
}

Rust Note: Scoped threads solve the problem of needing Arc just to share a reference. The scope guarantees all threads finish before the borrow ends. This is similar to OpenMP parallel regions.

With scoped threads you can even have one thread borrow mutably while others read different data:

// scoped_mut.rs
use std::thread;

fn main() {
    let mut a = 10;
    let b = 20;

    thread::scope(|s| {
        s.spawn(|| {
            a += b;  // mutable borrow of a
        });

        s.spawn(|| {
            println!("b = {}", b);  // shared borrow of b only
        });
    });

    println!("a = {}", a);
}

Introduction to Async: Why and When

Threads are great for CPU-bound work. But for I/O-bound work (network servers, file I/O), OS threads are comparatively heavy: each one needs a kernel stack (typically 8-16 KB) plus a user-space stack (8 MB of virtual address space by default on Linux) and OS scheduling overhead. A thread-per-connection server handling 10,000 connections needs 10,000 threads.

Async I/O uses cooperative multitasking: tasks yield when they would block, and a runtime multiplexes many tasks onto a few OS threads.

Threads (preemptive):            Async (cooperative):

Thread 1 [==BLOCK=====RUN==]    Task 1 [==yield--RUN==yield--]
Thread 2 [=RUN===BLOCK==RUN]    Task 2 [--RUN====yield--RUN==]
Thread 3 [BLOCK=======RUN==]    Task 3 [--yield--RUN========-]
3 OS threads                    1 OS thread, 3 tasks

Rule of thumb:

  • CPU-bound (number crunching, compression): use threads
  • I/O-bound (network, disk): use async
  • Mixed: use async with spawn_blocking for CPU work

The Future Trait

An async function returns a Future. A Future is a state machine that can be polled.

// future_concept.rs -- conceptual; you rarely implement Future by hand
use std::pin::Pin;
use std::task::Context;

trait Future {
    type Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

enum Poll<T> {
    Ready(T),
    Pending,
}

When you write async fn, the compiler transforms your function into a state machine that implements Future. The .await points are where the state machine yields.

Tokio Basics

Tokio is the most widely used async runtime for Rust. Add it to Cargo.toml:

[dependencies]
tokio = { version = "1", features = ["full"] }

A minimal async program:

// tokio_hello.rs
#[tokio::main]
async fn main() {
    println!("Hello from async main");

    let result = compute().await;
    println!("Result: {}", result);
}

async fn compute() -> i32 {
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    42
}

The #[tokio::main] macro sets up the runtime. Without it, async fn main would return a Future that nobody polls.

Spawning Async Tasks

// tokio_spawn.rs
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let handle1 = tokio::spawn(async {
        sleep(Duration::from_millis(100)).await;
        println!("Task 1 done");
        1
    });

    let handle2 = tokio::spawn(async {
        sleep(Duration::from_millis(50)).await;
        println!("Task 2 done");
        2
    });

    let r1 = handle1.await.unwrap();
    let r2 = handle2.await.unwrap();
    println!("Results: {} + {} = {}", r1, r2, r1 + r2);
}

Tasks run concurrently on the thread pool. tokio::spawn is like thread::spawn but for async tasks.

select!: Racing Tasks

// tokio_select.rs
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    tokio::select! {
        _ = sleep(Duration::from_secs(1)) => {
            println!("1 second elapsed");
        }
        _ = sleep(Duration::from_millis(500)) => {
            println!("500ms elapsed first");
        }
    }
}

select! waits for the first future to complete and drops (cancels) the rest. It is useful for timeouts, shutdown signals, and multiplexing several event sources.

Async TCP Echo Server

Here is a complete async echo server -- the kind of thing that would need threads-per-connection in C:

// echo_server.rs
use tokio::net::TcpListener;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let listener = TcpListener::bind("127.0.0.1:8080").await?;
    println!("Listening on 127.0.0.1:8080");

    loop {
        let (mut socket, addr) = listener.accept().await?;
        println!("New connection from {}", addr);

        tokio::spawn(async move {
            let mut buf = [0u8; 1024];
            loop {
                let n = match socket.read(&mut buf).await {
                    Ok(0) => {
                        println!("{} disconnected", addr);
                        return;
                    }
                    Ok(n) => n,
                    Err(e) => {
                        eprintln!("Read error from {}: {}", addr, e);
                        return;
                    }
                };

                if let Err(e) = socket.write_all(&buf[..n]).await {
                    eprintln!("Write error to {}: {}", addr, e);
                    return;
                }
            }
        });
    }
}

Try It: Run the echo server, then connect with nc 127.0.0.1 8080 from multiple terminals. Each connection is handled by a lightweight task, not an OS thread.

C Comparison: Threaded Echo Server

For contrast, here is the same server in C using one thread per connection:

/* echo_server_threaded.c */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/socket.h>
#include <netinet/in.h>

void *handle_client(void *arg) {
    int fd = *(int *)arg;
    free(arg);
    char buf[1024];
    ssize_t n;
    while ((n = read(fd, buf, sizeof(buf))) > 0) {
        write(fd, buf, n);
    }
    close(fd);
    return NULL;
}

int main(void) {
    int srv = socket(AF_INET, SOCK_STREAM, 0);
    int opt = 1;
    setsockopt(srv, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));

    struct sockaddr_in addr = {
        .sin_family = AF_INET,
        .sin_port = htons(8080),
        .sin_addr.s_addr = INADDR_ANY
    };
    bind(srv, (struct sockaddr *)&addr, sizeof(addr));
    listen(srv, 128);
    printf("Listening on port 8080\n");

    while (1) {
        int *client = malloc(sizeof(int));
        *client = accept(srv, NULL, NULL);
        pthread_t t;
        pthread_create(&t, NULL, handle_client, client);
        pthread_detach(t);
    }
}

This works but creates an OS thread per connection. At 10,000 connections, you have 10,000 threads. The async version uses a small thread pool regardless of connection count.

Driver Prep: Kernel drivers do not use async/await, but they use a similar concept: workqueues and tasklets defer work without creating new threads. The kernel's io_uring interface is the closest thing to async I/O at the syscall level.

Threads vs Async: Decision Guide

+------------------+-------------------+--------------------+
| Factor           | OS Threads        | Async Tasks        |
+------------------+-------------------+--------------------+
| Scheduling       | Preemptive (OS)   | Cooperative (user) |
| Stack size       | ~8KB kernel stack | Few hundred bytes  |
| Creation cost    | Moderate          | Very cheap         |
| Best for         | CPU-bound work    | I/O-bound work     |
| Max concurrency  | ~thousands        | ~millions          |
| Blocking calls   | OK                | MUST NOT block     |
| Debugging        | Easier            | Harder (state mc.) |
+------------------+-------------------+--------------------+

Caution: Never call blocking functions (like std::thread::sleep or synchronous file I/O) inside an async task. Use tokio::time::sleep, tokio::fs, or tokio::task::spawn_blocking instead. Blocking an async task blocks the entire runtime thread.

Knowledge Check

  1. What is the difference between Send and Sync?
  2. Why does Rc<T> fail to compile when sent to another thread?
  3. When should you use tokio::task::spawn_blocking instead of tokio::spawn?

Common Pitfalls

  • Using Rc instead of Arc across threads -- compile error, but confusing for beginners.
  • Forgetting move on closures passed to thread::spawn -- the closure borrows from the stack, which the thread may outlive.
  • Holding a std::sync::MutexGuard across an .await point -- the future becomes !Send (usually a compile error on a multi-threaded runtime) and can deadlock the executor. Use tokio::sync::Mutex if you must hold a lock across an await.
  • Calling .await outside an async function -- futures are lazy; they do nothing until polled.
  • Mixing std::sync::Mutex with async code -- it works if the critical section is short and never crosses an await, but tokio::sync::Mutex is safer for async contexts.
  • Not dropping the original sender when using mpsc::channel with cloned senders -- the receiver never terminates.