
Rust Basics - Concurrency


Hi, this is the fourth part of a Rust Basics series, where we’ll dive into Rust’s concurrency capabilities including threads, channels, shared state, and async programming.

Threading

As briefly mentioned in the Memory post, Rust is multi-threaded and ensures thread safety through its memory model.

Rust’s standard library provides a threading API in the std::thread module that allows you to spawn threads for concurrent execution. The ownership and borrowing rules ensure that threads can’t access invalid memory or cause data races.

Example:

use std::thread;
use std::time::Duration;

fn main() {
    // Spawn a new thread
    let handle = thread::spawn(|| {
        // This code runs in a separate thread
        for i in 1..10 {
            println!("Hi number {} from the spawned thread!", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    // This code runs in the main thread
    for i in 1..5 {
        println!("Hi number {} from the main thread!", i);
        thread::sleep(Duration::from_millis(1));
    }

    // Wait for the spawned thread to finish
    // Without this, the main thread might exit before the spawned thread completes
    handle.join().unwrap();
}

Capturing Variables in Threads

To use variables from the main thread in a spawned thread, you need to use the move keyword to transfer ownership:

use std::thread;

fn main() {
    let v = vec![1, 2, 3];
    
    // Use 'move' to transfer ownership of v to the thread
    let handle = thread::spawn(move || {
        println!("Here's a vector: {:?}", v);
    });
    
    // v is no longer accessible here because its ownership was moved to the thread
    handle.join().unwrap();
}

Channels

Channels provide a way for threads to communicate by sending messages to each other. Rust’s standard library includes a multiple-producer, single-consumer channel implementation in the std::sync::mpsc module.

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    // Create a channel
    let (tx, rx) = mpsc::channel();
    
    // Clone the transmitter for multiple senders
    let tx1 = tx.clone();
    
    // First sender thread
    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];
        
        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });
    
    // Second sender thread
    thread::spawn(move || {
        let vals = vec![
            String::from("more"),
            String::from("messages"),
            String::from("for"),
            String::from("you"),
        ];
        
        for val in vals {
            tx1.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });
    
    // Receiver in the main thread
    for received in rx {
        println!("Got: {}", received);
    }
}

The channel will automatically close when all transmitters are dropped, causing the for loop over the receiver to terminate.
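
To make this concrete, here is a minimal sketch (using only the standard library): once the last transmitter is dropped, messages already sent are still delivered, but further calls to recv return an error.

use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();

    tx.send(String::from("last message")).unwrap();

    // Dropping the only transmitter closes the channel
    drop(tx);

    // The buffered message is still delivered...
    println!("Got: {}", rx.recv().unwrap());

    // ...but the next call reports that all senders are gone
    assert!(rx.recv().is_err());
}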

Shared State Between Threads

Sometimes you need to share data between threads rather than passing ownership. Rust’s type system and ownership rules guarantee memory safety, but you need synchronization primitives like mutexes to prevent data races.

Using Mutex for Shared Access

The Mutex<T> type provides mutual exclusion, ensuring only one thread can access the data at a time:

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    // Wrap the mutex in an Arc (Atomic Reference Counted) to share ownership
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];
    
    for _ in 0..10 {
        // Clone the Arc to increase the reference count
        let counter = Arc::clone(&counter);
        
        let handle = thread::spawn(move || {
            // Lock the mutex to get exclusive access
            let mut num = counter.lock().unwrap();
            
            // Modify the value
            *num += 1;
            // Lock is automatically released when num goes out of scope
        });
        
        handles.push(handle);
    }
    
    // Wait for all threads to complete
    for handle in handles {
        handle.join().unwrap();
    }
    
    // Print the final value
    println!("Final count: {}", *counter.lock().unwrap());
}

Deadlock Prevention

Mutexes can cause deadlocks if not used carefully. A deadlock occurs when two threads each hold a lock the other needs. To avoid deadlocks:

  1. Acquire locks in a consistent order (see the sketch after this list)
  2. Keep critical sections small
  3. Use timeouts when acquiring locks
  4. Consider lock-free data structures for simple cases
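
As a minimal sketch of the first point, both threads below acquire the locks in the same order (lock_a before lock_b), so neither can end up holding one lock while waiting for the other:

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let lock_a = Arc::new(Mutex::new(0));
    let lock_b = Arc::new(Mutex::new(0));

    let (a1, b1) = (Arc::clone(&lock_a), Arc::clone(&lock_b));
    let t1 = thread::spawn(move || {
        // Always lock a before b
        let mut a = a1.lock().unwrap();
        let mut b = b1.lock().unwrap();
        *a += 1;
        *b += 1;
    });

    let (a2, b2) = (Arc::clone(&lock_a), Arc::clone(&lock_b));
    let t2 = thread::spawn(move || {
        // Same order here; reversing it could deadlock
        let mut a = a2.lock().unwrap();
        let mut b = b2.lock().unwrap();
        *a += 1;
        *b += 1;
    });

    t1.join().unwrap();
    t2.join().unwrap();

    println!("a = {}, b = {}", *lock_a.lock().unwrap(), *lock_b.lock().unwrap());
}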

Synchronous vs. Asynchronous

By default, Rust is synchronous (code executes sequentially unless explicitly threaded).

However, Rust supports asynchronous programming via async/await, which allows non-blocking concurrency. This enables programs to make progress on other tasks while waiting for I/O or other operations to complete. The async ecosystem in Rust relies on community-driven runtimes such as Tokio, async-std, and smol.

Async/Await Basics

The async/await pattern in Rust works by:

  1. Marking functions with async to make them return a Future
  2. Using the await keyword inside async functions to wait for other futures to complete

Example of async code:

use tokio::net::TcpStream;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use std::error::Error;

async fn fetch_data() -> Result<String, Box<dyn Error>> {
    // Asynchronous operations
    let mut stream = TcpStream::connect("example.com:80").await?;
    
    let request = b"GET / HTTP/1.1\r\nHost: example.com\r\nConnection: close\r\n\r\n";
    stream.write_all(request).await?;
    
    let mut buffer = Vec::new();
    stream.read_to_end(&mut buffer).await?;
    
    Ok(String::from_utf8(buffer)?)
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // This will not block the thread
    let data = fetch_data().await?;
    println!("Received data: {:.100}...", data);
    Ok(())
}

Concurrency vs. Parallelism

It’s important to understand the difference: concurrency is about structuring a program so that many tasks can make progress by interleaving (possibly on a single thread), while parallelism is about executing work simultaneously on multiple CPU cores.

Async Rust excels at concurrency (handling many tasks efficiently), while threading provides parallelism (utilizing multiple CPU cores).
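
As a rough sketch (assuming the same tokio runtime used in the examples here), the tasks below are concurrent but not parallel: the runtime is restricted to a single worker thread, and the tasks simply interleave at their await points.

use tokio::time::{sleep, Duration};

// A single-threaded runtime: tasks interleave but never run on separate cores
#[tokio::main(flavor = "current_thread")]
async fn main() {
    let mut handles = Vec::new();

    for i in 0..5 {
        handles.push(tokio::spawn(async move {
            // Each task yields at this await point, letting the others run
            sleep(Duration::from_millis(10)).await;
            println!("Task {} finished", i);
        }));
    }

    for handle in handles {
        handle.await.unwrap();
    }
}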

When to Use Async vs. Threads

As a rough guideline: reach for async when the workload is I/O-bound and you need to juggle many concurrent tasks (network servers, clients, timers), and reach for threads (or a thread pool) when the workload is CPU-bound and benefits from running on multiple cores. The two can also be combined, as shown below.

Combining Async and Threads

For optimal performance in real-world applications, you might combine both approaches:

use tokio::task;

#[tokio::main]
async fn main() {
    // Spawn a CPU-intensive task in a separate thread pool
    let computation_handle = task::spawn_blocking(|| {
        // This runs in a thread pool for CPU-bound tasks
        // Perform intensive calculation...
        let result = (0..10_000_000).sum::<u64>();
        result
    });
    
    // Meanwhile, in the async context, we can do other work
    let io_handle = tokio::spawn(async {
        // This would be an async I/O operation
        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
        "I/O operation completed"
    });
    
    // Wait for both to complete
    let (computation_result, io_result) = tokio::join!(
        computation_handle,
        io_handle
    );
    
    println!("Computation result: {}", computation_result.unwrap());
    println!("I/O result: {}", io_result.unwrap());
}

This approach gives you the best of both worlds: async for efficient I/O handling and threads for CPU-bound work.


