% Rust Tasks and Communication Tutorial

# Introduction

Rust provides safe concurrency through a combination
of lightweight, memory-isolated tasks and message passing.
This tutorial will describe the concurrency model in Rust, how it
relates to the Rust type system, and introduce
the fundamental library abstractions for constructing concurrent programs.

Rust tasks are not the same as traditional threads: rather,
they are considered _green threads_, lightweight units of execution that the Rust
runtime schedules cooperatively onto a small number of operating system threads.
On a multi-core system Rust tasks will be scheduled in parallel by default.
Because tasks are significantly
cheaper to create than traditional threads, Rust can create hundreds of
thousands of concurrent tasks on a typical 32-bit system.
In general, all Rust code executes inside a task, including the `main` function.
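
To give a sense of scale, spawning one task per loop iteration is perfectly ordinary Rust. The following is a small illustrative sketch that uses only the `spawn` function introduced below in the Basics section:

~~~~
# use std::io::println;
# use std::task::spawn;
// Each spawned task starts with only a small stack, so creating
// many of them is inexpensive.
for child_id in range(0u, 100) {
    do spawn {
        println(fmt!("task %u checking in", child_id));
    }
}
~~~~
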
In order to make efficient use of memory, Rust tasks have dynamically sized stacks.
A task begins its life with a small
amount of stack space (currently in the low thousands of bytes, depending on
platform), and acquires more stack as needed.
Unlike in languages such as C, a Rust task cannot accidentally write to
memory beyond the end of the stack, causing crashes or worse.

Tasks provide failure isolation and recovery. When a fatal error occurs in Rust
code as a result of an explicit call to `fail!()`, an assertion failure, or
another invalid operation, the runtime system destroys the entire
task. Unlike in languages such as Java and C++, there is no way to `catch` an
exception. Instead, tasks may monitor each other for failure.

Tasks use Rust's type system to provide strong memory safety guarantees. In
particular, the type system guarantees that tasks cannot share mutable state
with each other. Tasks communicate with each other by transferring _owned_
data through the global _exchange heap_.
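
As a brief illustration of this model (a minimal sketch; `spawn` and message passing are introduced properly in the sections below), an owned value can be captured by a spawned closure, which moves it into the child task:

~~~~
# use std::io::println;
# use std::task::spawn;
// An owned vector is allocated in the exchange heap and has a single owner.
let numbers = ~[1, 2, 3];
do spawn {
    // Ownership of `numbers` moves into the child task here, so the
    // parent can no longer touch it and no state is ever shared.
    println(fmt!("%?", numbers));
}
~~~~
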
## A note about the libraries

While Rust's type system provides the building blocks needed for safe
and efficient tasks, all of the task functionality itself is implemented
in the standard and extra libraries, which are still under development
and do not always present a consistent or complete interface.

For your reference, these are the standard modules involved in Rust
concurrency at this writing:

* [`std::task`] - All code relating to tasks and task scheduling,
* [`std::comm`] - The message passing interface,
* [`std::pipes`] - The underlying messaging infrastructure,
* [`extra::comm`] - Additional messaging types based on `std::pipes`,
* [`extra::sync`] - More exotic synchronization tools, including locks,
* [`extra::arc`] - The Arc (atomically reference counted) type,
  for safely sharing immutable data,
* [`extra::future`] - A type representing values that may be computed concurrently and retrieved at a later time.

[`std::task`]: std/task.html
[`std::comm`]: std/comm.html
[`std::pipes`]: std/pipes.html
[`extra::comm`]: extra/comm.html
[`extra::sync`]: extra/sync.html
[`extra::arc`]: extra/arc.html
[`extra::future`]: extra/future.html

# Basics

The programming interface for creating and managing tasks lives
in the `task` module of the `std` library, and is thus available to all
Rust code by default. At its simplest, creating a task is a matter of
calling the `spawn` function with a closure argument. `spawn` executes the
closure in the new task.

~~~~
# use std::io::println;
# use std::task::spawn;

// Print something profound in a different task using a named function
fn print_message() { println("I am running in a different task!"); }
spawn(print_message);

// Print something more profound in a different task using a lambda expression
spawn(|| println("I am also running in a different task!"));

// The canonical way to spawn is using `do` notation
do spawn {
    println("I too am running in a different task!");
}
~~~~

In Rust, there is nothing special about creating tasks: a task is not a
concept that appears in the language semantics. Instead, Rust's type system
provides all the tools necessary to implement safe concurrency: particularly,
_owned types_. The language leaves the implementation details to the standard
library.

The `spawn` function has a very simple type signature: `fn spawn(f:
~fn())`. Because it accepts only owned closures, and owned closures
contain only owned data, `spawn` can safely move the entire closure
and all its associated state into an entirely different task for
execution. Like any closure, the function passed to `spawn` may capture
an environment that it carries across tasks.

~~~
# use std::io::println;
# use std::task::spawn;
# fn generate_task_number() -> int { 0 }
// Generate some state locally
let child_task_number = generate_task_number();

do spawn {
    // Capture it in the remote task
    println(fmt!("I am child number %d", child_task_number));
}
~~~

## Communication

Now that we have spawned a new task, it would be nice if we could
communicate with it. Recall that Rust does not have shared mutable
state, so one task may not manipulate variables owned by another task.
Instead we use *pipes*.

A pipe is simply a pair of endpoints: one for sending messages and another for
receiving messages. Pipes are low-level communication building-blocks and so
come in a variety of forms, each one appropriate for a different use case. In
what follows, we cover the most commonly used varieties.

The simplest way to create a pipe is to use the `std::comm::stream`
function to create a `(Port, Chan)` pair. In Rust parlance, a *channel*
is a sending endpoint of a pipe, and a *port* is the receiving
endpoint. Consider the following example of calculating two results
concurrently:

~~~~
# use std::task::spawn;
# use std::comm::{stream, Port, Chan};

let (port, chan): (Port<int>, Chan<int>) = stream();

do spawn || {
    let result = some_expensive_computation();
    chan.send(result);
}

some_other_expensive_computation();
let result = port.recv();
# fn some_expensive_computation() -> int { 42 }
# fn some_other_expensive_computation() {}
~~~~

Let's examine this example in detail. First, the `let` statement creates a
stream for sending and receiving integers (the left-hand side of the `let`,
`(port, chan)`, is an example of a *destructuring let*: the pattern separates
a tuple into its component parts).

~~~~
# use std::comm::{stream, Chan, Port};
let (port, chan): (Port<int>, Chan<int>) = stream();
~~~~

The child task will use the channel to send data to the parent task,
which will wait to receive the data on the port. The next statement
spawns the child task.

~~~~
# use std::task::spawn;
# use std::comm::stream;
# fn some_expensive_computation() -> int { 42 }
# let (port, chan) = stream();
do spawn || {
    let result = some_expensive_computation();
    chan.send(result);
}
~~~~

Notice that the creation of the task closure transfers `chan` to the child
task implicitly: the closure captures `chan` in its environment. Both `Chan`
and `Port` are sendable types and may be captured into tasks or otherwise
transferred between them. In the example, the child task runs an expensive
computation, then sends the result over the captured channel.

Finally, the parent continues with some other expensive
computation, then waits for the child's result to arrive on the
port:

~~~~
# use std::comm::{stream};
# fn some_other_expensive_computation() {}
# let (port, chan) = stream::<int>();
# chan.send(0);
some_other_expensive_computation();
let result = port.recv();
~~~~

The `Port` and `Chan` pair created by `stream` enables efficient communication
between a single sender and a single receiver, but multiple senders cannot use
a single `Chan`, and multiple receivers cannot use a single `Port`. What if our
example needed to compute multiple results across a number of tasks? The
following program is ill-typed:

~~~ {.xfail-test}
# use std::task::{spawn};
# use std::comm::{stream, Port, Chan};
# fn some_expensive_computation() -> int { 42 }
let (port, chan) = stream();

do spawn {
    chan.send(some_expensive_computation());
}

// ERROR! The previous spawn statement already owns the channel,
// so the compiler will not allow it to be captured again
do spawn {
    chan.send(some_expensive_computation());
}
~~~

Instead we can use a `SharedChan`, a type that allows a single
`Chan` to be shared by multiple senders.

~~~
# use std::task::spawn;
# use std::comm::{stream, SharedChan};

let (port, chan) = stream();
let chan = SharedChan::new(chan);

for init_val in range(0u, 3) {
    // Create a new channel handle to distribute to the child task
    let child_chan = chan.clone();
    do spawn {
        child_chan.send(some_expensive_computation(init_val));
    }
}

let result = port.recv() + port.recv() + port.recv();
# fn some_expensive_computation(_i: uint) -> int { 42 }
~~~

Here we transfer ownership of the channel into a new `SharedChan` value. Like
`Chan`, `SharedChan` is a non-copyable, owned type (sometimes also referred to
as an *affine* or *linear* type). Unlike with `Chan`, though, the programmer
may duplicate a `SharedChan`, with the `clone()` method. A cloned
`SharedChan` produces a new handle to the same channel, allowing multiple
tasks to send data to a single port. Between `spawn`, `stream` and
`SharedChan`, we have enough tools to implement many useful concurrency
patterns.

Note that the above `SharedChan` example is somewhat contrived since
you could also simply use three `stream` pairs, but it serves to
illustrate the point. For reference, written with multiple streams, it
might look like the example below.

~~~
# use std::task::spawn;
# use std::comm::stream;
# use std::vec;

// Create a vector of ports, one for each child task
let ports = do vec::from_fn(3) |init_val| {
    let (port, chan) = stream();
    do spawn {
        chan.send(some_expensive_computation(init_val));
    }
    port
};

// Wait on each port, accumulating the results
let result = ports.iter().fold(0, |accum, port| accum + port.recv());
# fn some_expensive_computation(_i: uint) -> int { 42 }
~~~

## Backgrounding computations: Futures

With `extra::future`, Rust has a mechanism for requesting a computation and getting the result
later.

The basic example below illustrates this.
~~~
# fn make_a_sandwich() {};
fn fib(n: uint) -> uint {
    // lengthy computation returning a uint
    12586269025
}

let mut delayed_fib = extra::future::spawn(|| fib(50));
make_a_sandwich();
println(fmt!("fib(50) = %?", delayed_fib.get()))
~~~

The call to `future::spawn` immediately returns a `future` object, regardless of how long it
takes to run `fib(50)`. You can then make yourself a sandwich while the computation of `fib` is
running. The result of the computation is obtained by calling `get` on the future.
This call will block until the value is available (*i.e.* the computation is complete). Note that
the future needs to be mutable so that it can save the result for the next time `get` is called.

Here is another example showing how futures allow you to background computations. The workload will
be distributed across the available cores.
~~~
# use std::vec;
fn partial_sum(start: uint) -> f64 {
    let mut local_sum = 0f64;
    for num in range(start*100000, (start+1)*100000) {
        local_sum += (num as f64 + 1.0).pow(&-2.0);
    }
    local_sum
}

fn main() {
    let mut futures = vec::from_fn(1000, |ind| do extra::future::spawn { partial_sum(ind) });

    let mut final_res = 0f64;
    for ft in futures.mut_iter() {
        final_res += ft.get();
    }
    println(fmt!("π^2/6 is not far from : %?", final_res));
}
~~~

## Sharing immutable data without copy: Arc

To share immutable data between tasks, a first approach would be to only use pipes as we have seen
previously. A copy of the data to share would then be made for each task. In some cases, this would
add up to a significant amount of wasted memory and would require copying the same data more than
necessary.

To tackle this issue, one can use an Atomically Reference Counted wrapper (`Arc`) as implemented in
the `extra` library of Rust. With an Arc, the data will no longer be copied for each task. The Arc
acts as a reference to the shared data and only this reference is shared and cloned.

Here is a small example showing how to use Arcs. We wish to run several computations concurrently on
a single large vector of floats. Each task needs the full vector to perform its duty.
~~~
# use std::vec;
# use std::rand;
use extra::arc::Arc;

fn pnorm(nums: &~[float], p: uint) -> float {
    nums.iter().fold(0.0, |a, b| a + (*b).pow(&(p as float))).pow(&(1f / (p as float)))
}

fn main() {
    let numbers = vec::from_fn(1000000, |_| rand::random::<float>());
    println(fmt!("Inf-norm = %?", *numbers.iter().max().unwrap()));

    let numbers_arc = Arc::new(numbers);

    for num in range(1u, 10) {
        let (port, chan) = stream();
        chan.send(numbers_arc.clone());

        do spawn {
            let local_arc: Arc<~[float]> = port.recv();
            let task_numbers = local_arc.get();
            println(fmt!("%u-norm = %?", num, pnorm(task_numbers, num)));
        }
    }
}
~~~

The function `pnorm` performs a simple computation on the vector (it raises each of its items
to the power given as an argument, sums these values, and takes the inverse power of that sum). The Arc on the vector is
created by the line
~~~
# use extra::arc::Arc;
# use std::vec;
# use std::rand;
# let numbers = vec::from_fn(1000000, |_| rand::random::<float>());
let numbers_arc = Arc::new(numbers);
~~~
and a clone of it is sent to each task
~~~
# use extra::arc::Arc;
# use std::vec;
# use std::rand;
# let numbers = vec::from_fn(1000000, |_| rand::random::<float>());
# let numbers_arc = Arc::new(numbers);
# let (port, chan) = stream();
chan.send(numbers_arc.clone());
~~~
copying only the wrapper and not its contents.

Each task recovers the underlying data by
~~~
# use extra::arc::Arc;
# use std::vec;
# use std::rand;
# let numbers = vec::from_fn(1000000, |_| rand::random::<float>());
# let numbers_arc = Arc::new(numbers);
# let (port, chan) = stream();
# chan.send(numbers_arc.clone());
# let local_arc: Arc<~[float]> = port.recv();
let task_numbers = local_arc.get();
~~~
and can use it as if it were local.

The `arc` module also implements Arcs around mutable data, but these are not covered here.

# Handling task failure

Rust has a built-in mechanism for raising exceptions. The `fail!()` macro
(which can also be written with an error string as an argument: `fail!(~reason)`)
and the `assert!` construct (which effectively calls `fail!()`
if a boolean expression is false) are both ways to raise exceptions. When a
task raises an exception the task unwinds its stack---running destructors and
freeing memory along the way---and then exits. Unlike exceptions in C++,
exceptions in Rust are unrecoverable within a single task: once a task fails,
there is no way to "catch" the exception.

All tasks are, by default, _linked_ to each other. That means that the fates
of all tasks are intertwined: if one fails, so do all the others.

~~~{.xfail-test .linked-failure}
# use std::task::spawn;
# use std::task;
# fn do_some_work() { loop { task::yield() } }
# do task::try {
// Create a child task that fails
do spawn { fail!() }

// This will also fail because the task we spawned failed
do_some_work();
# };
~~~

While it isn't possible for a task to recover from failure, tasks may notify
each other of failure. The simplest way of handling task failure is with the
`try` function, which is similar to `spawn`, but immediately blocks waiting
for the child task to finish. `try` returns a value of type `Result<int,
()>`. `Result` is an `enum` type with two variants: `Ok` and `Err`. In this
case, because the type arguments to `Result` are `int` and `()`, callers can
pattern-match on a result to check whether it's an `Ok` result with an `int`
field (representing a successful result) or an `Err` result (representing
termination with an error).

~~~{.xfail-test .linked-failure}
# use std::task;
# fn some_condition() -> bool { false }
# fn calculate_result() -> int { 0 }
let result: Result<int, ()> = do task::try {
    if some_condition() {
        calculate_result()
    } else {
        fail!("oops!");
    }
};
assert!(result.is_err());
~~~

Unlike `spawn`, the function spawned using `try` may return a value,
which `try` will dutifully propagate back to the caller in a [`Result`]
enum. If the child task terminates successfully, `try` will
return an `Ok` result; if the child task fails, `try` will return
an `Err` result.
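
For example (a short sketch in the same style as the example above), a caller might match on the result directly instead of merely testing it with `is_err`:

~~~{.xfail-test .linked-failure}
# use std::io::println;
# use std::task;
# fn calculate_result() -> int { 0 }
let result: Result<int, ()> = do task::try {
    calculate_result()
};
match result {
    Ok(value) => println(fmt!("child task returned %d", value)),
    Err(()) => println("child task failed")
}
~~~
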
[`Result`]: std/result.html

> ***Note:*** A failed task does not currently produce a useful error
> value (`try` always returns `Err(())`). In the
> future, it may be possible for tasks to intercept the value passed to
> `fail!()`.

TODO: Need discussion of `future_result` in order to make failure
modes useful.

But not all failures are created equal. In some cases you might need to
abort the entire program (perhaps you're writing an assert which, if
it trips, indicates an unrecoverable logic error); in other cases you
might want to contain the failure at a certain boundary (perhaps a
small piece of input from the outside world, which you happen to be
processing in parallel, is malformed and its processing task can't
proceed). Hence, you will need different _linked failure modes_.

## Failure modes

By default, task failure is _bidirectionally linked_, which means that if
either task fails, it kills the other one.

~~~{.xfail-test .linked-failure}
# use std::task::spawn;
# use std::task;
# use std::comm::oneshot;
# fn sleep_forever() { loop { let (p, c) = oneshot::<()>(); p.recv(); } }
# do task::try {
do spawn {
    do spawn {
        fail!(); // All three tasks will fail.
    }
    sleep_forever(); // Will get woken up by force, then fail
}
sleep_forever(); // Will get woken up by force, then fail
# };
~~~

If you want parent tasks to be able to kill their children, but do not want a
parent to fail automatically if one of its child tasks fails, you can call
`task::spawn_supervised` for _unidirectionally linked_ failure. The
function `task::try`, which we saw previously, uses `spawn_supervised`
internally, with additional logic to wait for the child task to finish
before returning. Hence:

~~~{.xfail-test .linked-failure}
# use std::comm::{stream, Chan, Port};
# use std::comm::oneshot;
# use std::task::{spawn, try};
# use std::task;
# fn sleep_forever() { loop { let (p, c) = oneshot::<()>(); p.recv(); } }
# do task::try {
let (receiver, sender): (Port<int>, Chan<int>) = stream();
do spawn { // Bidirectionally linked
    // Wait for the supervised child task to exist.
    let message = receiver.recv();
    // Kill both it and the parent task.
    assert!(message != 42);
}
do try { // Unidirectionally linked
    sender.send(42);
    sleep_forever(); // Will get woken up by force
}
// Flow never reaches here -- parent task was killed too.
# };
~~~

Supervised failure is useful in any situation where one task manages
multiple fallible child tasks, and the parent task can recover
if any child fails. On the other hand, if the _parent_ (supervisor) fails,
then there is nothing the children can do to recover, so they should
also fail.

Supervised task failure propagates across multiple generations even if
an intermediate generation has already exited:

~~~{.xfail-test .linked-failure}
# use std::task;
# use std::comm::oneshot;
# fn sleep_forever() { loop { let (p, c) = oneshot::<()>(); p.recv(); } }
# fn wait_for_a_while() { for _ in range(0, 1000u) { task::yield() } }
# do task::try::<int> {
do task::spawn_supervised {
    do task::spawn_supervised {
        sleep_forever(); // Will get woken up by force, then fail
    }
    // Intermediate task immediately exits
}
wait_for_a_while();
fail!(); // Will kill grandchild even if child has already exited
# };
~~~

Finally, tasks can be configured to not propagate failure to each
other at all, using `task::spawn_unlinked` for _isolated failure_.

~~~{.xfail-test .linked-failure}
# use std::task;
# fn random() -> uint { 100 }
# fn sleep_for(i: uint) { for _ in range(0, i) { task::yield() } }
# do task::try::<()> {
let (time1, time2) = (random(), random());
do task::spawn_unlinked {
    sleep_for(time2); // Won't get forced awake
    fail!();
}
sleep_for(time1); // Won't get forced awake
fail!();
// It will take MAX(time1,time2) for the program to finish.
# };
~~~

## Creating a task with a bi-directional communication path

A very common thing to do is to spawn a child task where the parent
and child both need to exchange messages with each other. The
function `extra::comm::DuplexStream()` supports this pattern. We'll
look briefly at how to use it.

To see how `DuplexStream()` works, we will create a child task
that repeatedly receives a `uint` message, converts it to a string, and sends
the string in response. The child terminates when it receives `0`.
Here is the function that implements the child task:

~~~{.xfail-test .linked-failure}
# use extra::comm::DuplexStream;
# use std::uint;
fn stringifier(channel: &DuplexStream<~str, uint>) {
    let mut value: uint;
    loop {
        value = channel.recv();
        channel.send(uint::to_str(value));
        if value == 0 { break; }
    }
}
~~~~

The implementation of `DuplexStream` supports both sending and
receiving. The `stringifier` function takes a `DuplexStream` that can
send strings (the first type parameter) and receive `uint` messages
(the second type parameter). The body itself simply loops, reading
from the channel and then sending its response back. The actual
response itself is simply the stringified version of the received value,
`uint::to_str(value)`.

Here is the code for the parent task:

~~~{.xfail-test .linked-failure}
# use std::task::spawn;
# use std::uint;
# use extra::comm::DuplexStream;
# fn stringifier(channel: &DuplexStream<~str, uint>) {
#     let mut value: uint;
#     loop {
#         value = channel.recv();
#         channel.send(uint::to_str(value));
#         if value == 0u { break; }
#     }
# }
# fn main() {

let (from_child, to_child) = DuplexStream();

do spawn {
    stringifier(&to_child);
};

from_child.send(22);
assert!(from_child.recv() == ~"22");

from_child.send(23);
from_child.send(0);

assert!(from_child.recv() == ~"23");
assert!(from_child.recv() == ~"0");

# }
~~~~

The parent task first calls `DuplexStream` to create a pair of bidirectional
endpoints. It then uses `task::spawn` to create the child task, which captures
one end of the communication channel. As a result, both parent and child can
send and receive data to and from the other.