Rewrite task-comm-NN to use pipes

This commit is contained in:
Eric Holk 2012-07-25 14:05:06 -07:00
parent 60d682b577
commit 08a77e06a8
15 changed files with 151 additions and 210 deletions

View File

@ -30,59 +30,6 @@ macro_rules! move {
// places. Once there is unary move, it can be removed.
fn move<T>(-x: T) -> T { x }
/**
Some thoughts about fixed buffers.
The idea is if a protocol is bounded, we will synthesize a record that
has a field for each state. Each of these states contains a packet for
the messages that are legal to be sent in that state. Then, instead of
allocating, the send code just finds a pointer to the right field and
uses that instead.
Unforunately, this makes things kind of tricky. We need to be able to
find the buffer, which means we need to pass it around. This could
either be associated with the (send|recv)_packet classes, or with the
packet itself. We will also need some form of reference counting so we
can track who has the responsibility of freeing the buffer.
We want to preserve the ability to do things like optimistic buffer
re-use, and skipping over to a new buffer when necessary. What I mean
is, suppose we had the typical stream protocol. It'd make sense to
amortize allocation costs by allocating a buffer with say 16
messages. When the sender gets to the end of the buffer, it could
check if the receiver is done with the packet in slot 0. If so, it can
just reuse that one, checking if the receiver is done with the next
one in each case. If it is ever not done, it just allocates a new
buffer and skips over to that.
Also, since protocols are in libcore, we have to do this in a way that
maintains backwards compatibility.
buffer header and buffer. Cast as c_void when necessary.
===
Okay, here are some new ideas.
It'd be nice to keep the bounded/unbounded case as uniform as
possible. It leads to less code duplication, and less things that can
go sublty wrong. For the bounded case, we could either have a struct
with a bunch of unique pointers to pre-allocated packets, or we could
lay them out inline. Inline layout is better, if for no other reason
than that we don't have to allocate each packet
individually. Currently we pass unique packets around as unsafe
pointers, but they are actually unique pointers. We should instead use
real unsafe pointers. This makes freeing data and running destructors
trickier though. Thus, we should allocate all packets in parter of a
higher level buffer structure. Packets can maintain a pointer to their
buffer, and this is the part that gets freed.
It might be helpful to have some idea of a semi-unique pointer (like
being partially pregnant, also like an ARC).
*/
enum state {
empty,
full,
@ -805,6 +752,12 @@ class port_set<T: send> : recv<T> {
vec::push(self.ports, port)
}
fn chan() -> chan<T> {
let (ch, po) = stream();
self.add(po);
ch
}
fn try_recv() -> option<T> {
let mut result = none;
while result == none && self.ports.len() > 0 {

View File

@ -1,31 +1,29 @@
use std;
import comm;
import comm::chan;
import comm::send;
import pipes;
import pipes::chan;
import pipes::port;
import task;
fn main() { test05(); }
fn test05_start(ch : chan<int>) {
log(error, ch);
send(ch, 10);
ch.send(10);
#error("sent 10");
send(ch, 20);
ch.send(20);
#error("sent 20");
send(ch, 30);
ch.send(30);
#error("sent 30");
}
fn test05() {
let po = comm::port();
let ch = comm::chan(po);
let (ch, po) = pipes::stream();
task::spawn(|| test05_start(ch) );
let mut value = comm::recv(po);
let mut value = po.recv();
log(error, value);
value = comm::recv(po);
value = po.recv();
log(error, value);
value = comm::recv(po);
value = po.recv();
log(error, value);
assert (value == 30);
}

View File

@ -1,28 +1,27 @@
use std;
import task;
import comm;
import pipes;
fn start(c: comm::chan<comm::chan<~str>>) {
let p = comm::port();
comm::send(c, comm::chan(p));
fn start(c: pipes::chan<pipes::chan<~str>>) {
let (ch, p) = pipes::stream();
c.send(ch);
let mut a;
let mut b;
a = comm::recv(p);
a = p.recv();
assert a == ~"A";
log(error, a);
b = comm::recv(p);
b = p.recv();
assert b == ~"B";
log(error, b);
}
fn main() {
let p = comm::port();
let ch = comm::chan(p);
let (ch, p) = pipes::stream();
let child = task::spawn(|| start(ch) );
let c = comm::recv(p);
comm::send(c, ~"A");
comm::send(c, ~"B");
let c = p.recv();
c.send(~"A");
c.send(~"B");
task::yield();
}

View File

@ -1,15 +1,14 @@
use std;
import comm;
import pipes;
import task;
fn start(c: comm::chan<comm::chan<int>>) {
let p: comm::port<int> = comm::port();
comm::send(c, comm::chan(p));
fn start(c: pipes::chan<pipes::chan<int>>) {
let (ch, p) = pipes::stream();
c.send(ch);
}
fn main() {
let p = comm::port();
let ch = comm::chan(p);
let (ch, p) = pipes::stream();
let child = task::spawn(|| start(ch) );
let c = comm::recv(p);
let c = p.recv();
}

View File

@ -1,17 +1,16 @@
use std;
import task;
import comm;
import comm::send;
import pipes;
import pipes::send;
fn start(c: comm::chan<int>, start: int, number_of_messages: int) {
fn start(c: pipes::chan<int>, start: int, number_of_messages: int) {
let mut i: int = 0;
while i < number_of_messages { send(c, start + i); i += 1; }
while i < number_of_messages { c.send(start + i); i += 1; }
}
fn main() {
#debug("Check that we don't deadlock.");
let p = comm::port::<int>();
let ch = comm::chan(p);
let (ch, p) = pipes::stream();
task::try(|| start(ch, 0, 10) );
#debug("Joined task");
}

View File

@ -1,15 +1,14 @@
use std;
import comm;
import task;
fn main() {
let po = comm::port::<int>();
let ch = comm::chan(po);
let po = pipes::port_set();
// Spawn 10 tasks each sending us back one int.
let mut i = 10;
while (i > 0) {
log(debug, i);
let (ch, p) = pipes::stream();
po.add(p);
task::spawn(|copy i| child(i, ch) );
i = i - 1;
}
@ -18,17 +17,16 @@ fn main() {
// anything back, so we deadlock here.
i = 10;
let mut value = 0;
while (i > 0) {
log(debug, i);
value = comm::recv(po);
po.recv();
i = i - 1;
}
#debug("main thread exiting");
}
fn child(x: int, ch: comm::chan<int>) {
fn child(x: int, ch: pipes::chan<int>) {
log(debug, x);
comm::send(ch, copy x);
ch.send(x);
}

View File

@ -1,23 +1,21 @@
// xfail-win32
use std;
import comm;
import task;
fn start(c: comm::chan<int>, i0: int) {
fn start(c: pipes::chan<int>, i0: int) {
let mut i = i0;
while i > 0 {
comm::send(c, 0);
c.send(0);
i = i - 1;
}
}
fn main() {
let p = comm::port();
// Spawn a task that sends us back messages. The parent task
// is likely to terminate before the child completes, so from
// the child's point of view the receiver may die. We should
// drop messages on the floor in this case, and not crash!
let ch = comm::chan(p);
let child = task::spawn(|| start(ch, 10) );
let c = comm::recv(p);
let (ch, p) = pipes::stream();
task::spawn(|| start(ch, 10));
p.recv();
}

View File

@ -1,44 +1,41 @@
// -*- rust -*-
use std;
import comm;
import comm::send;
import comm::port;
import comm::recv;
import comm::chan;
import pipes;
import pipes::send;
import pipes::port;
import pipes::recv;
import pipes::chan;
// Tests of ports and channels on various types
fn test_rec() {
type r = {val0: int, val1: u8, val2: char};
let po = comm::port();
let ch = chan(po);
let (ch, po) = pipes::stream();
let r0: r = {val0: 0, val1: 1u8, val2: '2'};
send(ch, r0);
ch.send(r0);
let mut r1: r;
r1 = recv(po);
r1 = po.recv();
assert (r1.val0 == 0);
assert (r1.val1 == 1u8);
assert (r1.val2 == '2');
}
fn test_vec() {
let po = port();
let ch = chan(po);
let (ch, po) = pipes::stream();
let v0: ~[int] = ~[0, 1, 2];
send(ch, v0);
let v1 = recv(po);
ch.send(v0);
let v1 = po.recv();
assert (v1[0] == 0);
assert (v1[1] == 1);
assert (v1[2] == 2);
}
fn test_str() {
let po = port();
let ch = chan(po);
let s0 = ~"test";
send(ch, s0);
let s1 = recv(po);
let (ch, po) = pipes::stream();
let s0 = "test";
ch.send(s0);
let s1 = po.recv();
assert (s1[0] == 't' as u8);
assert (s1[1] == 'e' as u8);
assert (s1[2] == 's' as u8);
@ -47,33 +44,36 @@ fn test_str() {
fn test_tag() {
enum t { tag1, tag2(int), tag3(int, u8, char), }
let po = port();
let ch = chan(po);
send(ch, tag1);
send(ch, tag2(10));
send(ch, tag3(10, 11u8, 'A'));
let (ch, po) = pipes::stream();
ch.send(tag1);
ch.send(tag2(10));
ch.send(tag3(10, 11u8, 'A'));
let mut t1: t;
t1 = recv(po);
t1 = po.recv();
assert (t1 == tag1);
t1 = recv(po);
t1 = po.recv();
assert (t1 == tag2(10));
t1 = recv(po);
t1 = po.recv();
assert (t1 == tag3(10, 11u8, 'A'));
}
fn test_chan() {
let po = port();
let ch = chan(po);
let po0 = port();
let ch0 = chan(po0);
send(ch, ch0);
let ch1 = recv(po);
let (ch, po) = pipes::stream();
let (ch0, po0) = pipes::stream();
ch.send(ch0);
let ch1 = po.recv();
// Does the transmitted channel still work?
send(ch1, 10);
ch1.send(10);
let mut i: int;
i = recv(po0);
i = po0.recv();
assert (i == 10);
}
fn main() { test_rec(); test_vec(); test_str(); test_tag(); test_chan(); }
fn main() {
test_rec();
test_vec();
test_str();
test_tag();
test_chan();
}

View File

@ -1,9 +1,9 @@
use std;
import task;
import comm;
import comm::chan;
import comm::send;
import comm::recv;
import pipes;
import pipes::chan;
import pipes::send;
import pipes::recv;
fn main() { #debug("===== WITHOUT THREADS ====="); test00(); }
@ -12,7 +12,7 @@ fn test00_start(ch: chan<int>, message: int, count: int) {
let mut i: int = 0;
while i < count {
#debug("Sending Message");
send(ch, message + 0);
ch.send(message + 0);
i = i + 1;
}
#debug("Ending test00_start");
@ -24,14 +24,14 @@ fn test00() {
#debug("Creating tasks");
let po = comm::port();
let ch = chan(po);
let po = pipes::port_set();
let mut i: int = 0;
// Create and spawn tasks...
let mut results = ~[];
while i < number_of_tasks {
let ch = po.chan();
do task::task().future_result(|-r| {
results += ~[r];
}).spawn |copy i| {
@ -45,7 +45,7 @@ fn test00() {
for results.each |r| {
i = 0;
while i < number_of_messages {
let value = recv(po);
let value = po.recv();
sum += value;
i = i + 1;
}

View File

@ -1,44 +1,43 @@
use std;
import comm;
import comm::send;
import pipes;
import pipes::send;
fn main() { test00(); }
fn test00() {
let mut r: int = 0;
let mut sum: int = 0;
let p = comm::port();
let c = comm::chan(p);
send(c, 1);
send(c, 2);
send(c, 3);
send(c, 4);
r = comm::recv(p);
let (c, p) = pipes::stream();
c.send(1);
c.send(2);
c.send(3);
c.send(4);
r = p.recv();
sum += r;
log(debug, r);
r = comm::recv(p);
r = p.recv();
sum += r;
log(debug, r);
r = comm::recv(p);
r = p.recv();
sum += r;
log(debug, r);
r = comm::recv(p);
r = p.recv();
sum += r;
log(debug, r);
send(c, 5);
send(c, 6);
send(c, 7);
send(c, 8);
r = comm::recv(p);
c.send(5);
c.send(6);
c.send(7);
c.send(8);
r = p.recv();
sum += r;
log(debug, r);
r = comm::recv(p);
r = p.recv();
sum += r;
log(debug, r);
r = comm::recv(p);
r = p.recv();
sum += r;
log(debug, r);
r = comm::recv(p);
r = p.recv();
sum += r;
log(debug, r);
assert (sum == 1 + 2 + 3 + 4 + 5 + 6 + 7 + 8);

View File

@ -1,17 +1,16 @@
use std;
import comm;
import pipes;
fn main() { test00(); }
fn test00() {
let r: int = 0;
let mut sum: int = 0;
let p = comm::port();
let c = comm::chan(p);
let (c, p) = pipes::stream();
let number_of_messages: int = 1000;
let mut i: int = 0;
while i < number_of_messages { comm::send(c, i + 0); i += 1; }
while i < number_of_messages { c.send(i + 0); i += 1; }
i = 0;
while i < number_of_messages { sum += comm::recv(p); i += 1; }
while i < number_of_messages { sum += p.recv(); i += 1; }
assert (sum == number_of_messages * (number_of_messages - 1) / 2);
}

View File

@ -1,37 +1,37 @@
use std;
import comm;
import comm::send;
import comm::chan;
import comm::recv;
import pipes;
import pipes::send;
import pipes::chan;
import pipes::recv;
fn main() { test00(); }
fn test00() {
let mut r: int = 0;
let mut sum: int = 0;
let p = comm::port();
let c0 = chan(p);
let c1 = chan(p);
let c2 = chan(p);
let c3 = chan(p);
let p = pipes::port_set();
let c0 = p.chan();
let c1 = p.chan();
let c2 = p.chan();
let c3 = p.chan();
let number_of_messages: int = 1000;
let mut i: int = 0;
while i < number_of_messages {
send(c0, i + 0);
send(c1, i + 0);
send(c2, i + 0);
send(c3, i + 0);
c0.send(i + 0);
c1.send(i + 0);
c2.send(i + 0);
c3.send(i + 0);
i += 1;
}
i = 0;
while i < number_of_messages {
r = recv(p);
r = p.recv();
sum += r;
r = recv(p);
r = p.recv();
sum += r;
r = recv(p);
r = p.recv();
sum += r;
r = recv(p);
r = p.recv();
sum += r;
i += 1;
}

View File

@ -1,43 +1,45 @@
use std;
import task;
import comm;
fn main() { test00(); }
fn test00_start(c: comm::chan<int>, start: int, number_of_messages: int) {
fn test00_start(c: pipes::chan<int>, start: int, number_of_messages: int) {
let mut i: int = 0;
while i < number_of_messages { comm::send(c, start + i); i += 1; }
while i < number_of_messages { c.send(start + i); i += 1; }
}
fn test00() {
let mut r: int = 0;
let mut sum: int = 0;
let p = comm::port();
let p = pipes::port_set();
let number_of_messages: int = 10;
let c = comm::chan(p);
let c = p.chan();
do task::spawn {
test00_start(c, number_of_messages * 0, number_of_messages);
}
let c = p.chan();
do task::spawn {
test00_start(c, number_of_messages * 1, number_of_messages);
}
let c = p.chan();
do task::spawn {
test00_start(c, number_of_messages * 2, number_of_messages);
}
let c = p.chan();
do task::spawn {
test00_start(c, number_of_messages * 3, number_of_messages);
}
let mut i: int = 0;
while i < number_of_messages {
r = comm::recv(p);
r = p.recv();
sum += r;
r = comm::recv(p);
r = p.recv();
sum += r;
r = comm::recv(p);
r = p.recv();
sum += r;
r = comm::recv(p);
r = p.recv();
sum += r;
i += 1;
}

View File

@ -1,20 +1,19 @@
use std;
import task;
import comm;
fn main() { test00(); }
fn test00_start(c: comm::chan<int>, number_of_messages: int) {
fn test00_start(c: pipes::chan<int>, number_of_messages: int) {
let mut i: int = 0;
while i < number_of_messages { comm::send(c, i + 0); i += 1; }
while i < number_of_messages { c.send(i + 0); i += 1; }
}
fn test00() {
let r: int = 0;
let mut sum: int = 0;
let p = comm::port();
let p = pipes::port_set();
let number_of_messages: int = 10;
let ch = comm::chan(p);
let ch = p.chan();
let mut result = none;
do task::task().future_result(|-r| { result = some(r); }).spawn {
@ -23,7 +22,7 @@ fn test00() {
let mut i: int = 0;
while i < number_of_messages {
sum += comm::recv(p);
sum += p.recv();
log(debug, r);
i += 1;
}

View File

@ -1,14 +1,12 @@
use std;
import comm::*;
import pipes::{port, chan}
/*
This is about the simplest program that can successfully send a
message.
*/
fn main() {
let po = port();
let ch = chan(po);
send(ch, 42);
let r = recv(po);
let (ch, po) = pipes::stream();
ch.send(42);
let r = po.recv();
log(error, r);
}