Hi, this is the fourth part of a Rust Basics series, where we’ll dive into Rust’s concurrency capabilities including threads, channels, shared state, and async programming.
Threading
As briefly mentioned in the Memory post, Rust supports multi-threading and enforces thread safety through its ownership model and type system.
Rust’s standard library provides the std::thread module, which lets you spawn threads for concurrent execution. The ownership and borrowing rules ensure that threads can’t access invalid memory or cause data races.
Example:
use std::thread;
use std::time::Duration;

fn main() {
    // Spawn a new thread
    let handle = thread::spawn(|| {
        // This code runs in a separate thread
        for i in 1..10 {
            println!("Hi number {} from the spawned thread!", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    // This code runs in the main thread
    for i in 1..5 {
        println!("Hi number {} from the main thread!", i);
        thread::sleep(Duration::from_millis(1));
    }

    // Wait for the spawned thread to finish
    // Without this, the main thread might exit before the spawned thread completes
    handle.join().unwrap();
}
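A join handle can also carry a result back from the thread. The sketch below is a minimal illustration of that, using only the standard library; the helper name sum_in_thread is made up for this example.

```rust
use std::thread;

// Hypothetical helper: sums 1..=n on a separate thread and returns the result.
fn sum_in_thread(n: u64) -> u64 {
    // The closure's return value becomes the value yielded by join()
    let handle = thread::spawn(move || (1..=n).sum::<u64>());
    // join() returns a Result; Err means the spawned thread panicked
    handle.join().expect("worker thread panicked")
}

fn main() {
    println!("Sum: {}", sum_in_thread(100));
}
```

Handling the Result from join() explicitly, rather than calling unwrap(), lets the main thread decide what to do when a worker panics.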
Capturing Variables in Threads
To use variables from the main thread in a spawned thread, you need to use the move keyword to transfer ownership:
use std::thread;

fn main() {
    let v = vec![1, 2, 3];

    // Use 'move' to transfer ownership of v to the thread
    let handle = thread::spawn(move || {
        println!("Here's a vector: {:?}", v);
    });

    // v is no longer accessible here as its ownership was moved
    handle.join().unwrap();
}
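When the main thread still needs the data afterwards, moving it is not the only option. As a possible alternative, the standard library's thread::scope (stable since Rust 1.63) lets threads borrow local variables, because every scoped thread is joined before the scope returns. The helper name sum_and_max below is hypothetical.

```rust
use std::thread;

// Hypothetical helper: computes the sum and the maximum of a slice on two
// scoped threads that both borrow `data` instead of taking ownership.
fn sum_and_max(data: &[i32]) -> (i32, Option<i32>) {
    thread::scope(|s| {
        let sum = s.spawn(|| data.iter().sum::<i32>());
        let max = s.spawn(|| data.iter().copied().max());
        (sum.join().unwrap(), max.join().unwrap())
    })
}

fn main() {
    let v = vec![1, 2, 3];
    let (sum, max) = sum_and_max(&v);
    // v is still usable here because the threads only borrowed it
    println!("{:?}: sum = {}, max = {:?}", v, sum, max);
}
```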
Channels
Channels provide a way for threads to communicate by sending messages to each other. Rust’s standard library includes a multiple-producer, single-consumer channel implementation in the mpsc module.
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    // Create a channel
    let (tx, rx) = mpsc::channel();

    // Clone the transmitter for multiple senders
    let tx1 = tx.clone();

    // First sender thread
    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];
        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    // Second sender thread
    thread::spawn(move || {
        let vals = vec![
            String::from("more"),
            String::from("messages"),
            String::from("for"),
            String::from("you"),
        ];
        for val in vals {
            tx1.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    // Receiver in the main thread
    for received in rx {
        println!("Got: {}", received);
    }
}
The channel will automatically close when all transmitters are dropped, causing the for loop with the receiver to terminate.
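A common pitfall follows from this rule: if the original transmitter stays alive in the receiving thread, the channel never closes. The sketch below illustrates one way to handle that by dropping it explicitly; the helper name collect_squares is invented for this example.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical helper: spawns `workers` threads that each send their id
// squared, then collects everything the receiver yields.
fn collect_squares(workers: u32) -> Vec<u32> {
    let (tx, rx) = mpsc::channel();
    for id in 0..workers {
        let tx = tx.clone();
        thread::spawn(move || {
            tx.send(id * id).unwrap();
            // This clone of tx is dropped when the thread ends
        });
    }
    // Drop the original transmitter; otherwise the channel never closes
    // and the iteration below would block forever.
    drop(tx);

    // iter() ends once every transmitter has been dropped
    let mut results: Vec<u32> = rx.iter().collect();
    // Arrival order depends on scheduling, so sort for a stable result
    results.sort();
    results
}

fn main() {
    println!("{:?}", collect_squares(4));
}
```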
Shared State between threads
Sometimes you need to share data between threads rather than passing ownership. Rust’s type system and ownership rules rule out data races at compile time, so data that several threads mutate must be wrapped in a synchronization primitive such as a mutex.
Using Mutex for Shared Access
The Mutex<T> type provides mutual exclusion, ensuring only one thread can access the data at a time:
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    // Wrap the mutex in an Arc (Atomic Reference Counted) to share ownership
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        // Clone the Arc to increase the reference count
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            // Lock the mutex to get exclusive access
            let mut num = counter.lock().unwrap();
            // Modify the value
            *num += 1;
            // Lock is automatically released when num goes out of scope
        });
        handles.push(handle);
    }

    // Wait for all threads to complete
    for handle in handles {
        handle.join().unwrap();
    }

    // Print the final value
    println!("Final count: {}", *counter.lock().unwrap());
}
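Because the lock is released when the guard goes out of scope, an inner block can be used to keep the locked region short. The following is a minimal sketch of that idea with a shared Vec; the helper name record_ids is hypothetical.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical helper: each of `threads` workers appends its id to a shared log.
fn record_ids(threads: usize) -> Vec<usize> {
    let log = Arc::new(Mutex::new(Vec::new()));
    let handles: Vec<_> = (0..threads)
        .map(|id| {
            let log = Arc::clone(&log);
            thread::spawn(move || {
                {
                    // The guard lives only inside this block
                    let mut guard = log.lock().unwrap();
                    guard.push(id);
                } // guard dropped here: the lock is released
                // Work placed here would run without holding the lock
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }

    // Every clone is gone once the workers are joined, so the Arc can be
    // unwrapped and the Mutex consumed to take the data back out.
    let mut ids = Arc::try_unwrap(log)
        .expect("all worker threads have been joined")
        .into_inner()
        .unwrap();
    // Push order depends on scheduling, so sort for a stable result
    ids.sort();
    ids
}

fn main() {
    println!("{:?}", record_ids(4));
}
```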
Deadlock Prevention
Mutexes can cause deadlocks if not used carefully. A deadlock occurs when two threads each hold a lock the other needs. To avoid deadlocks:
- Acquire locks in a consistent order
- Keep critical sections small
- Use non-blocking attempts such as try_lock instead of waiting indefinitely (the standard library’s Mutex has no timed lock)
- Consider lock-free data structures for simple cases
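The first point, acquiring locks in a consistent order, can be sketched as follows. The Account type, its id field, and the function names are assumptions made up for this illustration; the id simply serves as a global ordering key.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical account type; `id` defines the global lock order.
struct Account {
    id: u32,
    balance: Mutex<i64>,
}

// Moves `amount` between two distinct accounts, always locking the account
// with the smaller id first, no matter which direction the money flows.
fn transfer(from: &Account, to: &Account, amount: i64) {
    assert_ne!(from.id, to.id, "locking the same mutex twice would deadlock");
    let (first, second) = if from.id < to.id { (from, to) } else { (to, from) };
    let mut first_guard = first.balance.lock().unwrap();
    let mut second_guard = second.balance.lock().unwrap();
    // Map the guards back to their roles in the transfer
    let (from_balance, to_balance) = if from.id < to.id {
        (&mut *first_guard, &mut *second_guard)
    } else {
        (&mut *second_guard, &mut *first_guard)
    };
    *from_balance -= amount;
    *to_balance += amount;
}

// Runs opposing transfers on two threads and returns the final balances.
fn run_opposing_transfers() -> (i64, i64) {
    let a = Arc::new(Account { id: 1, balance: Mutex::new(100) });
    let b = Arc::new(Account { id: 2, balance: Mutex::new(100) });
    let (a1, b1) = (Arc::clone(&a), Arc::clone(&b));
    let (a2, b2) = (Arc::clone(&a), Arc::clone(&b));
    // One thread moves money a -> b, the other b -> a
    let t1 = thread::spawn(move || for _ in 0..1000 { transfer(&a1, &b1, 1) });
    let t2 = thread::spawn(move || for _ in 0..1000 { transfer(&b2, &a2, 1) });
    t1.join().unwrap();
    t2.join().unwrap();
    let a_balance = *a.balance.lock().unwrap();
    let b_balance = *b.balance.lock().unwrap();
    (a_balance, b_balance)
}

fn main() {
    println!("{:?}", run_opposing_transfers());
}
```

If each thread instead locked its own "from" account first, the two threads could each hold one lock while waiting for the other, which is exactly the deadlock described above.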
Synchronous vs. Asynchronous
By default, Rust is synchronous (code executes sequentially unless explicitly threaded).
However, Rust supports asynchronous programming via async/await, which allows non-blocking concurrency. This enables programs to make progress on other tasks while waiting for I/O or other operations to complete. The async ecosystem in Rust relies on community-driven runtimes such as:
- Tokio: A comprehensive async runtime with I/O, scheduling, and networking capabilities
- async-std: An async version of the standard library
- smol: A small, simple async runtime
Async/Await Basics
The async/await pattern in Rust works by:
- Marking functions with async to make them return a Future
- Using the await keyword inside async functions to wait for other futures to complete
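To make these two points concrete without pulling in a runtime, here is a minimal sketch that uses only the standard library. The block_on function is a toy executor written for this illustration (real runtimes such as Tokio are far more sophisticated), and the names ThreadWaker, double, and double_twice are hypothetical.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Wakes the blocked thread when the future can make progress.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// Toy executor: polls a single future to completion on the current thread.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            // Sleep until the waker signals that polling is worthwhile again
            Poll::Pending => thread::park(),
        }
    }
}

// An async fn returns a Future whose output is u32
async fn double(x: u32) -> u32 {
    x * 2
}

async fn double_twice(x: u32) -> u32 {
    // .await suspends this function until the inner future completes
    let once = double(x).await;
    double(once).await
}

fn main() {
    // Calling an async fn only builds a future; nothing has run yet
    let fut = double_twice(5);
    // The body runs once an executor polls the future
    println!("{}", block_on(fut));
}
```

The key observation is that futures are lazy: double_twice(5) does no work until something polls it, which is the job a runtime performs in real applications.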
Example of async code:
use tokio::net::TcpStream;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use std::error::Error;

async fn fetch_data() -> Result<String, Box<dyn Error>> {
    // Asynchronous operations
    let mut stream = TcpStream::connect("example.com:80").await?;
    let request = b"GET / HTTP/1.1\r\nHost: example.com\r\nConnection: close\r\n\r\n";
    stream.write_all(request).await?;

    let mut buffer = Vec::new();
    stream.read_to_end(&mut buffer).await?;
    Ok(String::from_utf8(buffer)?)
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // This will not block the thread
    let data = fetch_data().await?;
    println!("Received data: {:.100}...", data);
    Ok(())
}
Concurrency vs. Parallelism
It’s important to understand the difference:
- Concurrency: Managing multiple tasks and making progress on all of them
- Parallelism: Executing multiple tasks simultaneously on different CPU cores
Async Rust excels at concurrency (handling many tasks efficiently), while threading provides parallelism (utilizing multiple CPU cores).
When to Use Async vs. Threads
- Use async/await for I/O-bound tasks:
  - Network operations
  - File I/O
  - High-connection services (web servers, API services)
- Use threads for CPU-bound tasks:
  - Complex calculations
  - Data processing
  - Tasks that need to utilize multiple cores
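As an illustration of the CPU-bound case, the sketch below splits a summation across several threads using only the standard library. The helper name parallel_sum and the chunking strategy are assumptions for this example; production code would more likely reach for a dedicated library.

```rust
use std::thread;

// Hypothetical helper: sums a slice by splitting it into one chunk per worker.
fn parallel_sum(data: &[u64], workers: usize) -> u64 {
    if data.is_empty() {
        return 0;
    }
    let workers = workers.max(1);
    // Ceiling division so every element lands in some chunk
    let chunk_size = (data.len() + workers - 1) / workers;
    thread::scope(|s| {
        // Each scoped thread borrows its own chunk of the input
        let handles: Vec<_> = data
            .chunks(chunk_size)
            .map(|chunk| s.spawn(move || chunk.iter().sum::<u64>()))
            .collect();
        // Combine the partial sums from all workers
        handles.into_iter().map(|h| h.join().unwrap()).sum::<u64>()
    })
}

fn main() {
    let data: Vec<u64> = (1..=1_000).collect();
    // available_parallelism() is only a hint and can fail, so fall back to 1
    let workers = thread::available_parallelism().map(|n| n.get()).unwrap_or(1);
    println!("sum = {}", parallel_sum(&data, workers));
}
```

For a workload this small the thread overhead outweighs any gain; the pattern only pays off when each chunk involves substantial computation.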
Combining Async and Threads
For optimal performance in real-world applications, you might combine both approaches:
use tokio::task;

#[tokio::main]
async fn main() {
    // Spawn a CPU-intensive task in a separate thread pool
    let computation_handle = task::spawn_blocking(|| {
        // This runs in a thread pool for CPU-bound tasks
        // Perform intensive calculation...
        let result = (0..10_000_000).sum::<u64>();
        result
    });

    // Meanwhile, in the async context, we can do other work
    let io_handle = tokio::spawn(async {
        // This would be an async I/O operation
        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
        "I/O operation completed"
    });

    // Wait for both to complete
    let (computation_result, io_result) = tokio::join!(
        computation_handle,
        io_handle
    );

    println!("Computation result: {}", computation_result.unwrap());
    println!("I/O result: {}", io_result.unwrap());
}
This approach gives you the best of both worlds: async for efficient I/O handling and threads for CPU-bound work.