mirror of
https://github.com/rust-lang/rust.git
synced 2024-11-25 16:24:46 +00:00
A concurrency chapter to replace the tasks chapter.
Fixes #18936 Fixes #18938 Fixes #20038 Fixes #8395 Fixes #2080 Fixes #21194
This commit is contained in:
parent
ba2f13ef06
commit
5401f086f0
@ -27,7 +27,7 @@
|
|||||||
* [Iterators](iterators.md)
|
* [Iterators](iterators.md)
|
||||||
* [Generics](generics.md)
|
* [Generics](generics.md)
|
||||||
* [Traits](traits.md)
|
* [Traits](traits.md)
|
||||||
* [Threads](threads.md)
|
* [Concurrency](concurrency.md)
|
||||||
* [Error Handling](error-handling.md)
|
* [Error Handling](error-handling.md)
|
||||||
* [Documentation](documentation.md)
|
* [Documentation](documentation.md)
|
||||||
* [III: Advanced Topics](advanced.md)
|
* [III: Advanced Topics](advanced.md)
|
||||||
|
391
src/doc/trpl/concurrency.md
Normal file
391
src/doc/trpl/concurrency.md
Normal file
@ -0,0 +1,391 @@
|
|||||||
|
% Concurrency
|
||||||
|
|
||||||
|
Concurrency and parallelism are incredibly important topics in computer
|
||||||
|
science, and are also a hot topic in industry today. Computers are gaining more
|
||||||
|
and more cores, yet many programmers aren't prepared to fully utilize them.
|
||||||
|
|
||||||
|
Rust's memory safety features also apply to its concurrency story too. Even
|
||||||
|
concurrent Rust programs must be memory safe, having no data races. Rust's type
|
||||||
|
system is up to the task, and gives you powerful ways to reason about
|
||||||
|
concurrent code at compile time.
|
||||||
|
|
||||||
|
Before we talk about the concurrency features that come with Rust, it's important
|
||||||
|
to understand something: Rust is low-level enough that all of this is provided
|
||||||
|
by the standard library, not by the language. This means that if you don't like
|
||||||
|
some aspect of the way Rust handles concurrency, you can implement an alternative
|
||||||
|
way of doing things. [mio](https://github.com/carllerche/mio) is a real-world
|
||||||
|
example of this principle in action.
|
||||||
|
|
||||||
|
## Background: `Send` and `Sync`
|
||||||
|
|
||||||
|
Concurrency is difficult to reason about. In Rust, we have a strong, static
|
||||||
|
type system to help us reason about our code. As such, Rust gives us two traits
|
||||||
|
to help us make sense of code that can possibly be concurrent.
|
||||||
|
|
||||||
|
### `Send`
|
||||||
|
|
||||||
|
The first trait we're going to talk about is
|
||||||
|
[`Send`](../std/marker/trait.Send.html). When a type `T` implements `Send`, it indicates
|
||||||
|
to the compiler that something of this type is able to have ownership transferred
|
||||||
|
safely between threads.
|
||||||
|
|
||||||
|
This is important to enforce certain restrictions. For example, if we have a
|
||||||
|
channel connecting two threads, we would want to be able to send some data
|
||||||
|
down the channel and to the other thread. Therefore, we'd ensure that `Send` was
|
||||||
|
implemented for that type.
|
||||||
|
|
||||||
|
In the opposite way, if we were wrapping a library with FFI that isn't
|
||||||
|
threadsafe, we wouldn't want to implement `Send`, and so the compiler will help
|
||||||
|
us enforce that it can't leave the current thread.
|
||||||
|
|
||||||
|
### `Sync`
|
||||||
|
|
||||||
|
The second of these two trait is called [`Sync`](../std/marker/trait.Sync.html).
|
||||||
|
When a type `T` implements `Sync`, it indicates to the compiler that something
|
||||||
|
of this type has no possibility of introducing memory unsafety when used from
|
||||||
|
multiple threads concurrently.
|
||||||
|
|
||||||
|
For example, sharing immutable data with an atomic reference count is
|
||||||
|
threadsafe. Rust provides a type like this, `Arc<T>`, and it implements `Sync`,
|
||||||
|
so that it could be safely shared between threads.
|
||||||
|
|
||||||
|
These two traits allow you to use the type system to make strong guarantees
|
||||||
|
about the properties of your code under concurrency. Before we demonstrate
|
||||||
|
why, we need to learn how to create a concurrent Rust program in the first
|
||||||
|
place!
|
||||||
|
|
||||||
|
## Threads
|
||||||
|
|
||||||
|
Rust's standard library provides a library for 'threads', which allow you to
|
||||||
|
run Rust code in parallel. Here's a basic example of using `Thread`:
|
||||||
|
|
||||||
|
```
|
||||||
|
use std::thread::Thread;
|
||||||
|
|
||||||
|
fn main() {
|
||||||
|
Thread::scoped(|| {
|
||||||
|
println!("Hello from a thread!");
|
||||||
|
});
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
The `Thread::scoped()` method accepts a closure, which is executed in a new
|
||||||
|
thread. It's called `scoped` because this thread returns a join guard:
|
||||||
|
|
||||||
|
```
|
||||||
|
use std::thread::Thread;
|
||||||
|
|
||||||
|
fn main() {
|
||||||
|
let guard = Thread::scoped(|| {
|
||||||
|
println!("Hello from a thread!");
|
||||||
|
});
|
||||||
|
|
||||||
|
// guard goes out of scope here
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
When `guard` goes out of scope, it will block execution until the thread is
|
||||||
|
finished. If we didn't want this behaviour, we could use `Thread::spawn()`:
|
||||||
|
|
||||||
|
```
|
||||||
|
use std::thread::Thread;
|
||||||
|
use std::old_io::timer;
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
fn main() {
|
||||||
|
Thread::spawn(|| {
|
||||||
|
println!("Hello from a thread!");
|
||||||
|
});
|
||||||
|
|
||||||
|
timer::sleep(Duration::milliseconds(50));
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
Or call `.detach()`:
|
||||||
|
|
||||||
|
```
|
||||||
|
use std::thread::Thread;
|
||||||
|
use std::old_io::timer;
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
fn main() {
|
||||||
|
let guard = Thread::scoped(|| {
|
||||||
|
println!("Hello from a thread!");
|
||||||
|
});
|
||||||
|
|
||||||
|
guard.detach();
|
||||||
|
|
||||||
|
timer::sleep(Duration::milliseconds(50));
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
We need to `sleep` here because when `main()` ends, it kills all of the
|
||||||
|
running threads.
|
||||||
|
|
||||||
|
[`scoped`](std/thread/struct.Builder.html#method.scoped) has an interesting
|
||||||
|
type signature:
|
||||||
|
|
||||||
|
```text
|
||||||
|
fn scoped<'a, T, F>(self, f: F) -> JoinGuard<'a, T>
|
||||||
|
where T: Send + 'a,
|
||||||
|
F: FnOnce() -> T,
|
||||||
|
F: Send + 'a
|
||||||
|
```
|
||||||
|
|
||||||
|
Specifically, `F`, the closure that we pass to execute in the new thread. It
|
||||||
|
has two restrictions: It must be a `FnOnce` from `()` to `T`. Using `FnOnce`
|
||||||
|
allows the closure to take ownership of any data it mentions from the parent
|
||||||
|
thread. The other restriction is that `F` must be `Send`. We aren't allowed to
|
||||||
|
transfer this ownership unless the type thinks that's okay.
|
||||||
|
|
||||||
|
Many languages have the ability to execute threads, but it's wildly unsafe.
|
||||||
|
There are entire books about how to prevent errors that occur from shared
|
||||||
|
mutable state. Rust helps out with its type system here as well, by preventing
|
||||||
|
data races at compile time. Let's talk about how you actually share things
|
||||||
|
between threads.
|
||||||
|
|
||||||
|
## Safe Shared Mutable State
|
||||||
|
|
||||||
|
Due to Rust's type system, we have a concept that sounds like a lie: "safe
|
||||||
|
shared mutable state." Many programmers agree that shared mutable state is
|
||||||
|
very, very bad.
|
||||||
|
|
||||||
|
Someone once said this:
|
||||||
|
|
||||||
|
> Shared mutable state is the root of all evil. Most languages attempt to deal
|
||||||
|
> with this problem through the 'mutable' part, but Rust deals with it by
|
||||||
|
> solving the 'shared' part.
|
||||||
|
|
||||||
|
The same [ownership system](ownership.html) that helps prevent using pointers
|
||||||
|
incorrectly also helps rule out data races, one of the worst kinds of
|
||||||
|
concurrency bugs.
|
||||||
|
|
||||||
|
As an example, here is a Rust program that would have a data race in many
|
||||||
|
languages. It will not compile:
|
||||||
|
|
||||||
|
```ignore
|
||||||
|
use std::thread::Thread;
|
||||||
|
use std::old_io::timer;
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
fn main() {
|
||||||
|
let mut data = vec![1u32, 2, 3];
|
||||||
|
|
||||||
|
for i in 0 .. 2 {
|
||||||
|
Thread::spawn(move || {
|
||||||
|
data[i] += 1;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
timer::sleep(Duration::milliseconds(50));
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
This gives us an error:
|
||||||
|
|
||||||
|
```text
|
||||||
|
12:17 error: capture of moved value: `data`
|
||||||
|
data[i] += 1;
|
||||||
|
^~~~
|
||||||
|
```
|
||||||
|
|
||||||
|
In this case, we know that our code _should_ be safe, but Rust isn't sure. And
|
||||||
|
it's actually not safe: if we had a reference to `data` in each thread, and the
|
||||||
|
thread takes ownership of the reference, we have three owners! That's bad. We
|
||||||
|
can fix this by using the `Arc<T>` type, which is an atomic reference counted
|
||||||
|
pointer. The 'atomic' part means that it's safe to share across threads.
|
||||||
|
|
||||||
|
`Arc<T>` assumes one more property about its contents to ensure that it is safe
|
||||||
|
to share across threads: it assumes its contents are `Sync`. But in our
|
||||||
|
case, we want to be able to mutate the value. We need a type that can ensure
|
||||||
|
only one person at a time can mutate what's inside. For that, we can use the
|
||||||
|
`Mutex<T>` type. Here's the second version of our code. It still doesn't work,
|
||||||
|
but for a different reason:
|
||||||
|
|
||||||
|
```ignore
|
||||||
|
use std::thread::Thread;
|
||||||
|
use std::old_io::timer;
|
||||||
|
use std::time::Duration;
|
||||||
|
use std::sync::Mutex;
|
||||||
|
|
||||||
|
fn main() {
|
||||||
|
let mut data = Mutex::new(vec![1u32, 2, 3]);
|
||||||
|
|
||||||
|
for i in 0 .. 2 {
|
||||||
|
let data = data.lock().unwrap();
|
||||||
|
Thread::spawn(move || {
|
||||||
|
data[i] += 1;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
timer::sleep(Duration::milliseconds(50));
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
Here's the error:
|
||||||
|
|
||||||
|
```text
|
||||||
|
<anon>:11:9: 11:22 error: the trait `core::marker::Send` is not implemented for the type `std::sync::mutex::MutexGuard<'_, collections::vec::Vec<u32>>` [E0277]
|
||||||
|
<anon>:11 Thread::spawn(move || {
|
||||||
|
^~~~~~~~~~~~~
|
||||||
|
<anon>:11:9: 11:22 note: `std::sync::mutex::MutexGuard<'_, collections::vec::Vec<u32>>` cannot be sent between threads safely
|
||||||
|
<anon>:11 Thread::spawn(move || {
|
||||||
|
^~~~~~~~~~~~~
|
||||||
|
```
|
||||||
|
|
||||||
|
You see, [`Mutex`](std/sync/struct.Mutex.html) has a
|
||||||
|
[`lock`](http://doc.rust-lang.org/nightly/std/sync/struct.Mutex.html#method.lock)
|
||||||
|
method which has this signature:
|
||||||
|
|
||||||
|
```ignore
|
||||||
|
fn lock(&self) -> LockResult<MutexGuard<T>>
|
||||||
|
```
|
||||||
|
|
||||||
|
If we [look at the code for MutexGuard](https://github.com/rust-lang/rust/blob/ca4b9674c26c1de07a2042cb68e6a062d7184cef/src/libstd/sync/mutex.rs#L172), we'll see
|
||||||
|
this:
|
||||||
|
|
||||||
|
```ignore
|
||||||
|
__marker: marker::NoSend,
|
||||||
|
```
|
||||||
|
|
||||||
|
Because our guard is `NoSend`, it's not `Send`. Which means we can't actually
|
||||||
|
transfer the guard across thread boundaries, which gives us our error.
|
||||||
|
|
||||||
|
We can use `Arc<T>` to fix this. Here's the working version:
|
||||||
|
|
||||||
|
```
|
||||||
|
use std::sync::{Arc, Mutex};
|
||||||
|
use std::thread::Thread;
|
||||||
|
use std::old_io::timer;
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
fn main() {
|
||||||
|
let data = Arc::new(Mutex::new(vec![1u32, 2, 3]));
|
||||||
|
|
||||||
|
for i in (0us..2) {
|
||||||
|
let data = data.clone();
|
||||||
|
Thread::spawn(move || {
|
||||||
|
let mut data = data.lock().unwrap();
|
||||||
|
data[i] += 1;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
timer::sleep(Duration::milliseconds(50));
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
We now call `clone()` on our `Arc`, which increases the internal count. This
|
||||||
|
handle is then moved into the new thread. Let's examine the body of the
|
||||||
|
thread more closely:
|
||||||
|
|
||||||
|
```
|
||||||
|
# use std::sync::{Arc, Mutex};
|
||||||
|
# use std::thread::Thread;
|
||||||
|
# use std::old_io::timer;
|
||||||
|
# use std::time::Duration;
|
||||||
|
# fn main() {
|
||||||
|
# let data = Arc::new(Mutex::new(vec![1u32, 2, 3]));
|
||||||
|
# for i in (0us..2) {
|
||||||
|
# let data = data.clone();
|
||||||
|
Thread::spawn(move || {
|
||||||
|
let mut data = data.lock().unwrap();
|
||||||
|
data[i] += 1;
|
||||||
|
});
|
||||||
|
# }
|
||||||
|
# }
|
||||||
|
```
|
||||||
|
|
||||||
|
First, we call `lock()`, which acquires the mutex's lock. Because this may fail,
|
||||||
|
it returns an `Result<T, E>`, and because this is just an example, we `unwrap()`
|
||||||
|
it to get a reference to the data. Real code would have more robust error handling
|
||||||
|
here. We're then free to mutate it, since we have the lock.
|
||||||
|
|
||||||
|
This timer bit is a bit awkward, however. We have picked a reasonable amount of
|
||||||
|
time to wait, but it's entirely possible that we've picked too high, and that
|
||||||
|
we could be taking less time. It's also possible that we've picked too low,
|
||||||
|
and that we aren't actually finishing this computation.
|
||||||
|
|
||||||
|
Rust's standard library provides a few more mechanisms for two threads to
|
||||||
|
synchronize with each other. Let's talk about one: channels.
|
||||||
|
|
||||||
|
## Channels
|
||||||
|
|
||||||
|
Here's a version of our code that uses channels for synchronization, rather
|
||||||
|
than waiting for a specific time:
|
||||||
|
|
||||||
|
```
|
||||||
|
use std::sync::{Arc, Mutex};
|
||||||
|
use std::thread::Thread;
|
||||||
|
use std::sync::mpsc;
|
||||||
|
|
||||||
|
fn main() {
|
||||||
|
let data = Arc::new(Mutex::new(0u32));
|
||||||
|
|
||||||
|
let (tx, rx) = mpsc::channel();
|
||||||
|
|
||||||
|
for _ in (0..10) {
|
||||||
|
let (data, tx) = (data.clone(), tx.clone());
|
||||||
|
|
||||||
|
Thread::spawn(move || {
|
||||||
|
let mut data = data.lock().unwrap();
|
||||||
|
*data += 1;
|
||||||
|
|
||||||
|
tx.send(());
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
for _ in 0 .. 10 {
|
||||||
|
rx.recv();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
We use the `mpsc::channel()` method to construct a new channel. We just `send`
|
||||||
|
a simple `()` down the channel, and then wait for ten of them to come back.
|
||||||
|
|
||||||
|
While this channel is just sending a generic signal, we can send any data that
|
||||||
|
is `Send` over the channel!
|
||||||
|
|
||||||
|
```
|
||||||
|
use std::sync::{Arc, Mutex};
|
||||||
|
use std::thread::Thread;
|
||||||
|
use std::sync::mpsc;
|
||||||
|
|
||||||
|
fn main() {
|
||||||
|
let (tx, rx) = mpsc::channel();
|
||||||
|
|
||||||
|
for _ in range(0, 10) {
|
||||||
|
let tx = tx.clone();
|
||||||
|
|
||||||
|
Thread::spawn(move || {
|
||||||
|
let answer = 42u32;
|
||||||
|
|
||||||
|
tx.send(answer);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
rx.recv().ok().expect("Could not recieve answer");
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
A `u32` is `Send` because we can make a copy. So we create a thread, ask it to calculate
|
||||||
|
the answer, and then it `send()`s us the answer over the channel.
|
||||||
|
|
||||||
|
|
||||||
|
## Panics
|
||||||
|
|
||||||
|
A `panic!` will crash the currently executing thread. You can use Rust's
|
||||||
|
threads as a simple isolation mechanism:
|
||||||
|
|
||||||
|
```
|
||||||
|
use std::thread::Thread;
|
||||||
|
|
||||||
|
let result = Thread::scoped(move || {
|
||||||
|
panic!("oops!");
|
||||||
|
}).join();
|
||||||
|
|
||||||
|
assert!(result.is_err());
|
||||||
|
```
|
||||||
|
|
||||||
|
Our `Thread` gives us a `Result` back, which allows us to check if the thread
|
||||||
|
has panicked or not.
|
@ -1,396 +0,0 @@
|
|||||||
% The Rust Threads and Communication Guide
|
|
||||||
|
|
||||||
**NOTE** This guide is badly out of date and needs to be rewritten.
|
|
||||||
|
|
||||||
# Introduction
|
|
||||||
|
|
||||||
Rust provides safe concurrent abstractions through a number of core library
|
|
||||||
primitives. This guide will describe the concurrency model in Rust, how it
|
|
||||||
relates to the Rust type system, and introduce the fundamental library
|
|
||||||
abstractions for constructing concurrent programs.
|
|
||||||
|
|
||||||
Threads provide failure isolation and recovery. When a fatal error occurs in Rust
|
|
||||||
code as a result of an explicit call to `panic!()`, an assertion failure, or
|
|
||||||
another invalid operation, the runtime system destroys the entire thread. Unlike
|
|
||||||
in languages such as Java and C++, there is no way to `catch` an exception.
|
|
||||||
Instead, threads may monitor each other to see if they panic.
|
|
||||||
|
|
||||||
Threads use Rust's type system to provide strong memory safety guarantees. In
|
|
||||||
particular, the type system guarantees that threads cannot induce a data race
|
|
||||||
from shared mutable state.
|
|
||||||
|
|
||||||
# Basics
|
|
||||||
|
|
||||||
At its simplest, creating a thread is a matter of calling the `spawn` function
|
|
||||||
with a closure argument. `spawn` executes the closure in the new thread.
|
|
||||||
|
|
||||||
```{rust,ignore}
|
|
||||||
# use std::thread::spawn;
|
|
||||||
|
|
||||||
// Print something profound in a different thread using a named function
|
|
||||||
fn print_message() { println!("I am running in a different thread!"); }
|
|
||||||
spawn(print_message);
|
|
||||||
|
|
||||||
// Alternatively, use a `move ||` expression instead of a named function.
|
|
||||||
// `||` expressions evaluate to an unnamed closure. The `move` keyword
|
|
||||||
// indicates that the closure should take ownership of any variables it
|
|
||||||
// touches.
|
|
||||||
spawn(move || println!("I am also running in a different thread!"));
|
|
||||||
```
|
|
||||||
|
|
||||||
In Rust, a thread is not a concept that appears in the language semantics.
|
|
||||||
Instead, Rust's type system provides all the tools necessary to implement safe
|
|
||||||
concurrency: particularly, ownership. The language leaves the implementation
|
|
||||||
details to the standard library.
|
|
||||||
|
|
||||||
The `spawn` function has the type signature: `fn
|
|
||||||
spawn<F:FnOnce()+Send>(f: F)`. This indicates that it takes as
|
|
||||||
argument a closure (of type `F`) that it will run exactly once. This
|
|
||||||
closure is limited to capturing `Send`-able data from its environment
|
|
||||||
(that is, data which is deeply owned). Limiting the closure to `Send`
|
|
||||||
ensures that `spawn` can safely move the entire closure and all its
|
|
||||||
associated state into an entirely different thread for execution.
|
|
||||||
|
|
||||||
```rust
|
|
||||||
use std::thread::Thread;
|
|
||||||
|
|
||||||
fn generate_thread_number() -> i32 { 4 } // a very simple generation
|
|
||||||
|
|
||||||
// Generate some state locally
|
|
||||||
let child_thread_number = generate_thread_number();
|
|
||||||
|
|
||||||
Thread::spawn(move || {
|
|
||||||
// Capture it in the remote thread. The `move` keyword indicates
|
|
||||||
// that this closure should move `child_thread_number` into its
|
|
||||||
// environment, rather than capturing a reference into the
|
|
||||||
// enclosing stack frame.
|
|
||||||
println!("I am child number {}", child_thread_number);
|
|
||||||
});
|
|
||||||
```
|
|
||||||
|
|
||||||
## Communication
|
|
||||||
|
|
||||||
Now that we have spawned a new thread, it would be nice if we could communicate
|
|
||||||
with it. For this, we use *channels*. A channel is simply a pair of endpoints:
|
|
||||||
one for sending messages and another for receiving messages.
|
|
||||||
|
|
||||||
The simplest way to create a channel is to use the `channel` function to create a
|
|
||||||
`(Sender, Receiver)` pair. In Rust parlance, a *sender* is a sending endpoint
|
|
||||||
of a channel, and a *receiver* is the receiving endpoint. Consider the following
|
|
||||||
example of calculating two results concurrently:
|
|
||||||
|
|
||||||
```rust
|
|
||||||
use std::thread::Thread;
|
|
||||||
use std::sync::mpsc;
|
|
||||||
|
|
||||||
let (tx, rx): (mpsc::Sender<u32>, mpsc::Receiver<u32>) = mpsc::channel();
|
|
||||||
|
|
||||||
Thread::spawn(move || {
|
|
||||||
let result = some_expensive_computation();
|
|
||||||
tx.send(result);
|
|
||||||
});
|
|
||||||
|
|
||||||
some_other_expensive_computation();
|
|
||||||
let result = rx.recv();
|
|
||||||
|
|
||||||
fn some_expensive_computation() -> u32 { 42 } // very expensive ;)
|
|
||||||
fn some_other_expensive_computation() {} // even more so
|
|
||||||
```
|
|
||||||
|
|
||||||
Let's examine this example in detail. First, the `let` statement creates a
|
|
||||||
stream for sending and receiving integers (the left-hand side of the `let`,
|
|
||||||
`(tx, rx)`, is an example of a destructuring let: the pattern separates a tuple
|
|
||||||
into its component parts).
|
|
||||||
|
|
||||||
```rust
|
|
||||||
# use std::sync::mpsc;
|
|
||||||
let (tx, rx): (mpsc::Sender<u32>, mpsc::Receiver<u32>) = mpsc::channel();
|
|
||||||
```
|
|
||||||
|
|
||||||
The child thread will use the sender to send data to the parent thread, which will
|
|
||||||
wait to receive the data on the receiver. The next statement spawns the child
|
|
||||||
thread.
|
|
||||||
|
|
||||||
```rust
|
|
||||||
# use std::thread::Thread;
|
|
||||||
# use std::sync::mpsc;
|
|
||||||
# fn some_expensive_computation() -> u32 { 42 }
|
|
||||||
# let (tx, rx) = mpsc::channel();
|
|
||||||
Thread::spawn(move || {
|
|
||||||
let result = some_expensive_computation();
|
|
||||||
tx.send(result);
|
|
||||||
});
|
|
||||||
```
|
|
||||||
|
|
||||||
Notice that the creation of the thread closure transfers `tx` to the child thread
|
|
||||||
implicitly: the closure captures `tx` in its environment. Both `Sender` and
|
|
||||||
`Receiver` are sendable types and may be captured into threads or otherwise
|
|
||||||
transferred between them. In the example, the child thread runs an expensive
|
|
||||||
computation, then sends the result over the captured channel.
|
|
||||||
|
|
||||||
Finally, the parent continues with some other expensive computation, then waits
|
|
||||||
for the child's result to arrive on the receiver:
|
|
||||||
|
|
||||||
```rust
|
|
||||||
# use std::sync::mpsc;
|
|
||||||
# fn some_other_expensive_computation() {}
|
|
||||||
# let (tx, rx) = mpsc::channel::<u32>();
|
|
||||||
# tx.send(0);
|
|
||||||
some_other_expensive_computation();
|
|
||||||
let result = rx.recv();
|
|
||||||
```
|
|
||||||
|
|
||||||
The `Sender` and `Receiver` pair created by `channel` enables efficient
|
|
||||||
communication between a single sender and a single receiver, but multiple
|
|
||||||
senders cannot use a single `Sender` value, and multiple receivers cannot use a
|
|
||||||
single `Receiver` value. What if our example needed to compute multiple
|
|
||||||
results across a number of threads? The following program is ill-typed:
|
|
||||||
|
|
||||||
```{rust,ignore}
|
|
||||||
# use std::sync::mpsc;
|
|
||||||
# fn some_expensive_computation() -> u32 { 42 }
|
|
||||||
let (tx, rx) = mpsc::channel();
|
|
||||||
|
|
||||||
spawn(move || {
|
|
||||||
tx.send(some_expensive_computation());
|
|
||||||
});
|
|
||||||
|
|
||||||
// ERROR! The previous spawn statement already owns the sender,
|
|
||||||
// so the compiler will not allow it to be captured again
|
|
||||||
spawn(move || {
|
|
||||||
tx.send(some_expensive_computation());
|
|
||||||
});
|
|
||||||
```
|
|
||||||
|
|
||||||
Instead we can clone the `tx`, which allows for multiple senders.
|
|
||||||
|
|
||||||
```rust
|
|
||||||
use std::thread::Thread;
|
|
||||||
use std::sync::mpsc;
|
|
||||||
|
|
||||||
let (tx, rx) = mpsc::channel();
|
|
||||||
|
|
||||||
for init_val in 0 .. 3 {
|
|
||||||
// Create a new channel handle to distribute to the child thread
|
|
||||||
let child_tx = tx.clone();
|
|
||||||
Thread::spawn(move || {
|
|
||||||
child_tx.send(some_expensive_computation(init_val));
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
let result = rx.recv().unwrap() + rx.recv().unwrap() + rx.recv().unwrap();
|
|
||||||
# fn some_expensive_computation(_i: i32) -> i32 { 42 }
|
|
||||||
```
|
|
||||||
|
|
||||||
Cloning a `Sender` produces a new handle to the same channel, allowing multiple
|
|
||||||
threads to send data to a single receiver. It upgrades the channel internally in
|
|
||||||
order to allow this functionality, which means that channels that are not
|
|
||||||
cloned can avoid the overhead required to handle multiple senders. But this
|
|
||||||
fact has no bearing on the channel's usage: the upgrade is transparent.
|
|
||||||
|
|
||||||
Note that the above cloning example is somewhat contrived since you could also
|
|
||||||
simply use three `Sender` pairs, but it serves to illustrate the point. For
|
|
||||||
reference, written with multiple streams, it might look like the example below.
|
|
||||||
|
|
||||||
```rust
|
|
||||||
use std::thread::Thread;
|
|
||||||
use std::sync::mpsc;
|
|
||||||
|
|
||||||
// Create a vector of ports, one for each child thread
|
|
||||||
let rxs = (0 .. 3).map(|&:init_val| {
|
|
||||||
let (tx, rx) = mpsc::channel();
|
|
||||||
Thread::spawn(move || {
|
|
||||||
tx.send(some_expensive_computation(init_val));
|
|
||||||
});
|
|
||||||
rx
|
|
||||||
}).collect::<Vec<_>>();
|
|
||||||
|
|
||||||
// Wait on each port, accumulating the results
|
|
||||||
let result = rxs.iter().fold(0, |&:accum, rx| accum + rx.recv().unwrap() );
|
|
||||||
# fn some_expensive_computation(_i: i32) -> i32 { 42 }
|
|
||||||
```
|
|
||||||
|
|
||||||
## Backgrounding computations: Futures
|
|
||||||
|
|
||||||
With `sync::Future`, rust has a mechanism for requesting a computation and
|
|
||||||
getting the result later.
|
|
||||||
|
|
||||||
The basic example below illustrates this.
|
|
||||||
|
|
||||||
```{rust,ignore}
|
|
||||||
# #![allow(deprecated)]
|
|
||||||
use std::sync::Future;
|
|
||||||
|
|
||||||
# fn main() {
|
|
||||||
# fn make_a_sandwich() {};
|
|
||||||
fn fib(n: u64) -> u64 {
|
|
||||||
// lengthy computation returning an 64
|
|
||||||
12586269025
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut delayed_fib = Future::spawn(move || fib(50));
|
|
||||||
make_a_sandwich();
|
|
||||||
println!("fib(50) = {}", delayed_fib.get())
|
|
||||||
# }
|
|
||||||
```
|
|
||||||
|
|
||||||
The call to `future::spawn` immediately returns a `future` object regardless of
|
|
||||||
how long it takes to run `fib(50)`. You can then make yourself a sandwich while
|
|
||||||
the computation of `fib` is running. The result of the execution of the method
|
|
||||||
is obtained by calling `get` on the future. This call will block until the
|
|
||||||
value is available (*i.e.* the computation is complete). Note that the future
|
|
||||||
needs to be mutable so that it can save the result for next time `get` is
|
|
||||||
called.
|
|
||||||
|
|
||||||
Here is another example showing how futures allow you to background
|
|
||||||
computations. The workload will be distributed on the available cores.
|
|
||||||
|
|
||||||
```{rust,ignore}
|
|
||||||
# #![allow(deprecated)]
|
|
||||||
# use std::num::Float;
|
|
||||||
# use std::sync::Future;
|
|
||||||
fn partial_sum(start: u64) -> f64 {
|
|
||||||
let mut local_sum = 0f64;
|
|
||||||
for num in range(start*100000, (start+1)*100000) {
|
|
||||||
local_sum += (num as f64 + 1.0).powf(-2.0);
|
|
||||||
}
|
|
||||||
local_sum
|
|
||||||
}
|
|
||||||
|
|
||||||
fn main() {
|
|
||||||
let mut futures = Vec::from_fn(200, |ind| Future::spawn(move || partial_sum(ind)));
|
|
||||||
|
|
||||||
let mut final_res = 0f64;
|
|
||||||
for ft in futures.iter_mut() {
|
|
||||||
final_res += ft.get();
|
|
||||||
}
|
|
||||||
println!("π^2/6 is not far from : {}", final_res);
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
## Sharing without copying: Arc
|
|
||||||
|
|
||||||
To share data between threads, a first approach would be to only use channel as
|
|
||||||
we have seen previously. A copy of the data to share would then be made for
|
|
||||||
each thread. In some cases, this would add up to a significant amount of wasted
|
|
||||||
memory and would require copying the same data more than necessary.
|
|
||||||
|
|
||||||
To tackle this issue, one can use an Atomically Reference Counted wrapper
|
|
||||||
(`Arc`) as implemented in the `sync` library of Rust. With an Arc, the data
|
|
||||||
will no longer be copied for each thread. The Arc acts as a reference to the
|
|
||||||
shared data and only this reference is shared and cloned.
|
|
||||||
|
|
||||||
Here is a small example showing how to use Arcs. We wish to run concurrently
|
|
||||||
several computations on a single large vector of floats. Each thread needs the
|
|
||||||
full vector to perform its duty.
|
|
||||||
|
|
||||||
```{rust,ignore}
|
|
||||||
use std::num::Float;
|
|
||||||
use std::rand;
|
|
||||||
use std::sync::Arc;
|
|
||||||
|
|
||||||
fn pnorm(nums: &[f64], p: u64) -> f64 {
|
|
||||||
nums.iter().fold(0.0, |a, b| a + b.powf(p as f64)).powf(1.0 / (p as f64))
|
|
||||||
}
|
|
||||||
|
|
||||||
fn main() {
|
|
||||||
let numbers = Vec::from_fn(1000000, |_| rand::random::<f64>());
|
|
||||||
let numbers_arc = Arc::new(numbers);
|
|
||||||
|
|
||||||
for num in range(1, 10) {
|
|
||||||
let thread_numbers = numbers_arc.clone();
|
|
||||||
|
|
||||||
spawn(move || {
|
|
||||||
println!("{}-norm = {}", num, pnorm(thread_numbers.as_slice(), num));
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
The function `pnorm` performs a simple computation on the vector (it computes
|
|
||||||
the sum of its items at the power given as argument and takes the inverse power
|
|
||||||
of this value). The Arc on the vector is created by the line:
|
|
||||||
|
|
||||||
```{rust,ignore}
|
|
||||||
# use std::rand;
|
|
||||||
# use std::sync::Arc;
|
|
||||||
# fn main() {
|
|
||||||
# let numbers = Vec::from_fn(1000000, |_| rand::random::<f64>());
|
|
||||||
let numbers_arc = Arc::new(numbers);
|
|
||||||
# }
|
|
||||||
```
|
|
||||||
|
|
||||||
and a clone is captured for each thread via a procedure. This only copies
|
|
||||||
the wrapper and not its contents. Within the thread's procedure, the captured
|
|
||||||
Arc reference can be used as a shared reference to the underlying vector as
|
|
||||||
if it were local.
|
|
||||||
|
|
||||||
```{rust,ignore}
|
|
||||||
# use std::rand;
|
|
||||||
# use std::sync::Arc;
|
|
||||||
# fn pnorm(nums: &[f64], p: u64) -> f64 { 4.0 }
|
|
||||||
# fn main() {
|
|
||||||
# let numbers=Vec::from_fn(1000000, |_| rand::random::<f64>());
|
|
||||||
# let numbers_arc = Arc::new(numbers);
|
|
||||||
# let num = 4;
|
|
||||||
let thread_numbers = numbers_arc.clone();
|
|
||||||
spawn(move || {
|
|
||||||
// Capture thread_numbers and use it as if it was the underlying vector
|
|
||||||
println!("{}-norm = {}", num, pnorm(thread_numbers.as_slice(), num));
|
|
||||||
});
|
|
||||||
# }
|
|
||||||
```
|
|
||||||
|
|
||||||
# Handling thread panics
|
|
||||||
|
|
||||||
Rust has a built-in mechanism for raising exceptions. The `panic!()` macro
|
|
||||||
(which can also be written with an error string as an argument: `panic!(
|
|
||||||
~reason)`) and the `assert!` construct (which effectively calls `panic!()` if a
|
|
||||||
boolean expression is false) are both ways to raise exceptions. When a thread
|
|
||||||
raises an exception, the thread unwinds its stack—running destructors and
|
|
||||||
freeing memory along the way—and then exits. Unlike exceptions in C++,
|
|
||||||
exceptions in Rust are unrecoverable within a single thread: once a thread panics,
|
|
||||||
there is no way to "catch" the exception.
|
|
||||||
|
|
||||||
While it isn't possible for a thread to recover from panicking, threads may notify
|
|
||||||
each other if they panic. The simplest way of handling a panic is with the
|
|
||||||
`try` function, which is similar to `spawn`, but immediately blocks and waits
|
|
||||||
for the child thread to finish. `try` returns a value of type
|
|
||||||
`Result<T, Box<Any + Send>>`. `Result` is an `enum` type with two variants:
|
|
||||||
`Ok` and `Err`. In this case, because the type arguments to `Result` are `i32`
|
|
||||||
and `()`, callers can pattern-match on a result to check whether it's an `Ok`
|
|
||||||
result with an `i32` field (representing a successful result) or an `Err` result
|
|
||||||
(representing termination with an error).
|
|
||||||
|
|
||||||
```{rust,ignore}
|
|
||||||
# use std::thread::Thread;
|
|
||||||
# fn some_condition() -> bool { false }
|
|
||||||
# fn calculate_result() -> i32 { 0 }
|
|
||||||
let result: Result<i32, Box<std::any::Any + Send>> = Thread::spawn(move || {
|
|
||||||
if some_condition() {
|
|
||||||
calculate_result()
|
|
||||||
} else {
|
|
||||||
panic!("oops!");
|
|
||||||
}
|
|
||||||
}).join();
|
|
||||||
assert!(result.is_err());
|
|
||||||
```
|
|
||||||
|
|
||||||
Unlike `spawn`, the function spawned using `try` may return a value, which
|
|
||||||
`try` will dutifully propagate back to the caller in a [`Result`] enum. If the
|
|
||||||
child thread terminates successfully, `try` will return an `Ok` result; if the
|
|
||||||
child thread panics, `try` will return an `Error` result.
|
|
||||||
|
|
||||||
[`Result`]: ../std/result/index.html
|
|
||||||
|
|
||||||
> *Note:* A panicked thread does not currently produce a useful error
|
|
||||||
> value (`try` always returns `Err(())`). In the
|
|
||||||
> future, it may be possible for threads to intercept the value passed to
|
|
||||||
> `panic!()`.
|
|
||||||
|
|
||||||
But not all panics are created equal. In some cases you might need to abort
|
|
||||||
the entire program (perhaps you're writing an assert which, if it trips,
|
|
||||||
indicates an unrecoverable logic error); in other cases you might want to
|
|
||||||
contain the panic at a certain boundary (perhaps a small piece of input from
|
|
||||||
the outside world, which you happen to be processing in parallel, is malformed
|
|
||||||
such that the processing thread cannot proceed).
|
|
Loading…
Reference in New Issue
Block a user