Progress towards pipes.

This commit is contained in:
Eric Holk 2012-06-25 17:17:20 -07:00
parent 61be3cc19e
commit c2d3cdc3df

View File

@ -68,7 +68,8 @@ mod pipes {
}
}
fn send<T: send>(p: *packet<T>, -payload: T) {
fn send<T: send>(-p: send_packet<T>, -payload: T) {
let p = p.unwrap();
let p = unsafe { uniquify(p) };
assert (*p).payload == none;
(*p).payload <- some(payload);
@ -95,7 +96,8 @@ mod pipes {
}
}
fn recv<T: send>(p: *packet<T>) -> option<T> {
fn recv<T: send>(-p: recv_packet<T>) -> option<T> {
let p = p.unwrap();
let p = unsafe { uniquify(p) };
loop {
let old_state = swap_state_acq((*p).state,
@ -148,6 +150,45 @@ mod pipes {
}
}
}
class send_packet<T: send> {
let mut p: option<*packet<T>>;
new(p: *packet<T>) { self.p = some(p); }
drop {
if self.p != none {
let mut p = none;
p <-> self.p;
sender_terminate(option::unwrap(p))
}
}
fn unwrap() -> *packet<T> {
let mut p = none;
p <-> self.p;
option::unwrap(p)
}
}
class recv_packet<T: send> {
let mut p: option<*packet<T>>;
new(p: *packet<T>) { self.p = some(p); }
drop {
if self.p != none {
let mut p = none;
p <-> self.p;
receiver_terminate(option::unwrap(p))
}
}
fn unwrap() -> *packet<T> {
let mut p = none;
p <-> self.p;
option::unwrap(p)
}
}
fn entangle<T: send>() -> (send_packet<T>, recv_packet<T>) {
let p = packet();
(send_packet(p), recv_packet(p))
}
}
mod pingpong {
@ -155,60 +196,45 @@ mod pingpong {
enum pong = *pipes::packet<ping>;
fn init() -> (client::ping, server::ping) {
let p = pipes::packet();
let p = pingpong::ping(p);
let client = client::ping(p);
let server = server::ping(p);
(client, server)
pipes::entangle()
}
mod client {
enum ping = pingpong::ping;
enum pong = pingpong::pong;
type ping = pipes::send_packet<pingpong::ping>;
type pong = pipes::recv_packet<pingpong::pong>;
fn do_ping(-c: ping) -> pong {
let packet = pipes::packet();
let packet = pingpong::pong(packet);
let p = pipes::packet();
pipes::send(**c, copy packet);
pong(packet)
pipes::send(c, pingpong::ping(p));
pipes::recv_packet(p)
}
fn do_pong(-c: pong) -> (ping, ()) {
let packet = pipes::recv(**c);
alt packet {
none {
let packet = pipes::recv(c);
if packet == none {
fail "sender closed the connection"
}
some(new_packet) {
(ping(new_packet), ())
}
}
(pipes::send_packet(*option::unwrap(packet)), ())
}
}
mod server {
enum ping = pingpong::ping;
enum pong = pingpong::pong;
type ping = pipes::recv_packet<pingpong::ping>;
type pong = pipes::send_packet<pingpong::pong>;
fn do_ping(-c: ping) -> (pong, ()) {
let packet = pipes::recv(**c);
alt packet {
none { fail "sender closed the connection" }
some(new_packet) {
(pong(new_packet), ())
}
let packet = pipes::recv(c);
if packet == none {
fail "sender closed the connection"
}
(pipes::send_packet(*option::unwrap(packet)), ())
}
fn do_pong(-c: pong) -> ping {
let packet = pipes::packet();
let packet = pingpong::ping(packet);
pipes::send(**c, copy packet);
ping(packet)
let p = pipes::packet();
pipes::send(c, pingpong::pong(p));
pipes::recv_packet(p)
}
}
}
@ -218,7 +244,6 @@ fn client(-chan: pingpong::client::ping) {
log(error, "Sent ping");
let (chan, _data) = pingpong::client::do_pong(chan);
log(error, "Received pong");
pipes::sender_terminate(**chan);
}
fn server(-chan: pingpong::server::ping) {
@ -226,7 +251,6 @@ fn server(-chan: pingpong::server::ping) {
log(error, "Received ping");
let chan = pingpong::server::do_pong(chan);
log(error, "Sent pong");
pipes::receiver_terminate(**chan);
}
fn main() {