Convert core::pipes to camel case

This commit is contained in:
Brian Anderson 2012-08-28 11:11:15 -07:00
parent e9b7ce6f57
commit cfbc7cbdc7
30 changed files with 318 additions and 236 deletions

View File

@ -2966,7 +2966,7 @@ computations in parallel. We might write something like:
~~~~
import task::spawn;
import pipes::{stream, port, chan};
import pipes::{stream, Port, Chan};
let (chan, port) = stream();
@ -3016,7 +3016,7 @@ some other expensive computation and then waiting for the child's result
to arrive on the port:
~~~~
# import pipes::{stream, port, chan};
# import pipes::{stream, Port, Chan};
# fn some_other_expensive_computation() {}
# let (chan, port) = stream::<int>();
# chan.send(0);
@ -3038,7 +3038,7 @@ Here is the function that implements the child task:
~~~~
# import std::comm::DuplexStream;
# import pipes::{port, chan};
# import pipes::{Port, Chan};
fn stringifier(channel: DuplexStream<~str, uint>) {
let mut value: uint;
loop {
@ -3061,7 +3061,7 @@ Here is the code for the parent task:
~~~~
# import std::comm::DuplexStream;
# import pipes::{port, chan};
# import pipes::{Port, Chan};
# import task::spawn;
# fn stringifier(channel: DuplexStream<~str, uint>) {
# let mut value: uint;

View File

@ -241,6 +241,7 @@ mod comm;
mod task;
//#[warn(non_camel_ase_types)] pipec code continues to trip this warning
mod future;
#[warn(non_camel_case_types)]
mod pipes;
// Runtime and language-primitive support

View File

@ -82,24 +82,32 @@ import option::unwrap;
// Things used by code generated by the pipe compiler.
export entangle, get_buffer, drop_buffer;
export send_packet_buffered, recv_packet_buffered;
export packet, mk_packet, entangle_buffer, has_buffer, buffer_header;
export SendPacketBuffered, RecvPacketBuffered;
export Packet, packet, mk_packet, entangle_buffer, HasBuffer, BufferHeader;
// export these so we can find them in the buffer_resource
// destructor. This is probably a symptom of #3005.
export atomic_add_acq, atomic_sub_rel;
// User-level things
export send_packet, recv_packet, send, recv, try_recv, peek;
export SendPacket, RecvPacket, send, recv, try_recv, peek;
export select, select2, selecti, select2i, selectable;
export spawn_service, spawn_service_recv;
export stream, port, chan, SharedChan, PortSet, channel;
export oneshot, chan_one, port_one;
export stream, Port, Chan, SharedChan, PortSet, Channel;
export oneshot, ChanOne, PortOne;
export recv_one, try_recv_one, send_one, try_send_one;
// Functions used by the protocol compiler
export rt;
// XXX remove me
#[cfg(stage0)]
export has_buffer, buffer_header, packet;
#[cfg(stage0)]
export recv_packet_buffered, send_packet_buffered;
#[cfg(stage0)]
export send_packet, recv_packet, buffer_header;
#[doc(hidden)]
const SPIN_COUNT: uint = 0;
@ -108,14 +116,14 @@ macro_rules! move_it (
)
#[doc(hidden)]
enum state {
empty,
full,
blocked,
terminated
enum State {
Empty,
Full,
Blocked,
Terminated
}
struct buffer_header {
struct BufferHeader {
// Tracks whether this buffer needs to be freed. We can probably
// get away with restricting it to 0 or 1, if we're careful.
let mut ref_count: int;
@ -126,15 +134,19 @@ struct buffer_header {
// thing along.
}
// XXX remove me
#[cfg(stage0)]
fn buffer_header() -> BufferHeader { BufferHeader() }
// This is for protocols to associate extra data to thread around.
#[doc(hidden)]
type buffer<T: send> = {
header: buffer_header,
type Buffer<T: send> = {
header: BufferHeader,
data: T,
};
struct packet_header {
let mut state: state;
struct PacketHeader {
let mut state: State;
let mut blocked_task: *rust_task;
// This is a reinterpret_cast of a ~buffer, that can also be cast
@ -142,53 +154,73 @@ struct packet_header {
let mut buffer: *libc::c_void;
new() {
self.state = empty;
self.state = Empty;
self.blocked_task = ptr::null();
self.buffer = ptr::null();
}
// Returns the old state.
unsafe fn mark_blocked(this: *rust_task) -> state {
unsafe fn mark_blocked(this: *rust_task) -> State {
rustrt::rust_task_ref(this);
let old_task = swap_task(&mut self.blocked_task, this);
assert old_task.is_null();
swap_state_acq(&mut self.state, blocked)
swap_state_acq(&mut self.state, Blocked)
}
unsafe fn unblock() {
let old_task = swap_task(&mut self.blocked_task, ptr::null());
if !old_task.is_null() { rustrt::rust_task_deref(old_task) }
match swap_state_acq(&mut self.state, empty) {
empty | blocked => (),
terminated => self.state = terminated,
full => self.state = full
match swap_state_acq(&mut self.state, Empty) {
Empty | Blocked => (),
Terminated => self.state = Terminated,
Full => self.state = Full
}
}
// unsafe because this can do weird things to the space/time
// continuum. It ends making multiple unique pointers to the same
// thing. You'll proobably want to forget them when you're done.
unsafe fn buf_header() -> ~buffer_header {
unsafe fn buf_header() -> ~BufferHeader {
assert self.buffer.is_not_null();
reinterpret_cast(self.buffer)
}
fn set_buffer<T: send>(b: ~buffer<T>) unsafe {
fn set_buffer<T: send>(b: ~Buffer<T>) unsafe {
self.buffer = reinterpret_cast(b);
}
}
#[doc(hidden)]
type packet<T: send> = {
header: packet_header,
type Packet<T: send> = {
header: PacketHeader,
mut payload: Option<T>,
};
// XXX remove me
#[cfg(stage0)]
#[allow(non_camel_case_types)]
type packet<T: send> = Packet<T>;
#[doc(hidden)]
trait HasBuffer {
// XXX This should not have a trailing underscore
fn set_buffer_(b: *libc::c_void);
}
impl<T: send> Packet<T>: HasBuffer {
fn set_buffer_(b: *libc::c_void) {
self.header.buffer = b;
}
}
#[cfg(stage0)] // XXX remove me
#[doc(hidden)]
#[allow(non_camel_case_types)]
trait has_buffer {
fn set_buffer(b: *libc::c_void);
}
#[cfg(stage0)] // XXX remove me
impl<T: send> packet<T>: has_buffer {
fn set_buffer(b: *libc::c_void) {
self.header.buffer = b;
@ -196,19 +228,19 @@ impl<T: send> packet<T>: has_buffer {
}
#[doc(hidden)]
fn mk_packet<T: send>() -> packet<T> {
fn mk_packet<T: send>() -> Packet<T> {
{
header: packet_header(),
header: PacketHeader(),
mut payload: None
}
}
#[doc(hidden)]
fn unibuffer<T: send>() -> ~buffer<packet<T>> {
fn unibuffer<T: send>() -> ~Buffer<Packet<T>> {
let b = ~{
header: buffer_header(),
header: BufferHeader(),
data: {
header: packet_header(),
header: PacketHeader(),
mut payload: None,
}
};
@ -221,7 +253,7 @@ fn unibuffer<T: send>() -> ~buffer<packet<T>> {
}
#[doc(hidden)]
fn packet<T: send>() -> *packet<T> {
fn packet<T: send>() -> *Packet<T> {
let b = unibuffer();
let p = ptr::addr_of(b.data);
// We'll take over memory management from here.
@ -231,13 +263,13 @@ fn packet<T: send>() -> *packet<T> {
#[doc(hidden)]
fn entangle_buffer<T: send, Tstart: send>(
+buffer: ~buffer<T>,
init: fn(*libc::c_void, x: &T) -> *packet<Tstart>)
-> (send_packet_buffered<Tstart, T>, recv_packet_buffered<Tstart, T>)
+buffer: ~Buffer<T>,
init: fn(*libc::c_void, x: &T) -> *Packet<Tstart>)
-> (SendPacketBuffered<Tstart, T>, RecvPacketBuffered<Tstart, T>)
{
let p = init(unsafe { reinterpret_cast(buffer) }, &buffer.data);
unsafe { forget(buffer) }
(send_packet_buffered(p), recv_packet_buffered(p))
(SendPacketBuffered(p), RecvPacketBuffered(p))
}
#[abi = "rust-intrinsic"]
@ -278,6 +310,7 @@ fn swap_task(+dst: &mut *rust_task, src: *rust_task) -> *rust_task {
}
#[doc(hidden)]
#[allow(non_camel_case_types)]
type rust_task = libc::c_void;
#[doc(hidden)]
@ -307,28 +340,28 @@ fn wait_event(this: *rust_task) -> *libc::c_void {
}
#[doc(hidden)]
fn swap_state_acq(+dst: &mut state, src: state) -> state {
fn swap_state_acq(+dst: &mut State, src: State) -> State {
unsafe {
transmute(rusti::atomic_xchg_acq(transmute(dst), src as int))
}
}
#[doc(hidden)]
fn swap_state_rel(+dst: &mut state, src: state) -> state {
fn swap_state_rel(+dst: &mut State, src: State) -> State {
unsafe {
transmute(rusti::atomic_xchg_rel(transmute(dst), src as int))
}
}
#[doc(hidden)]
unsafe fn get_buffer<T: send>(p: *packet_header) -> ~buffer<T> {
unsafe fn get_buffer<T: send>(p: *PacketHeader) -> ~Buffer<T> {
transmute((*p).buf_header())
}
// This could probably be done with SharedMutableState to avoid move_it!().
struct buffer_resource<T: send> {
let buffer: ~buffer<T>;
new(+b: ~buffer<T>) {
struct BufferResource<T: send> {
let buffer: ~Buffer<T>;
new(+b: ~Buffer<T>) {
//let p = ptr::addr_of(*b);
//error!("take %?", p);
atomic_add_acq(&mut b.header.ref_count, 1);
@ -353,7 +386,7 @@ struct buffer_resource<T: send> {
}
#[doc(hidden)]
fn send<T: send, Tbuffer: send>(+p: send_packet_buffered<T, Tbuffer>,
fn send<T: send, Tbuffer: send>(+p: SendPacketBuffered<T, Tbuffer>,
+payload: T) -> bool {
let header = p.header();
let p_ = p.unwrap();
@ -361,17 +394,17 @@ fn send<T: send, Tbuffer: send>(+p: send_packet_buffered<T, Tbuffer>,
assert ptr::addr_of(p.header) == header;
assert p.payload.is_none();
p.payload <- Some(payload);
let old_state = swap_state_rel(&mut p.header.state, full);
let old_state = swap_state_rel(&mut p.header.state, Full);
match old_state {
empty => {
Empty => {
// Yay, fastpath.
// The receiver will eventually clean this up.
//unsafe { forget(p); }
return true;
}
full => fail ~"duplicate send",
blocked => {
Full => fail ~"duplicate send",
Blocked => {
debug!("waking up task for %?", p_);
let old_task = swap_task(&mut p.header.blocked_task, ptr::null());
if !old_task.is_null() {
@ -384,7 +417,7 @@ fn send<T: send, Tbuffer: send>(+p: send_packet_buffered<T, Tbuffer>,
//unsafe { forget(p); }
return true;
}
terminated => {
Terminated => {
// The receiver will never receive this. Rely on drop_glue
// to clean everything up.
return false;
@ -397,7 +430,7 @@ fn send<T: send, Tbuffer: send>(+p: send_packet_buffered<T, Tbuffer>,
Fails if the sender closes the connection.
*/
fn recv<T: send, Tbuffer: send>(+p: recv_packet_buffered<T, Tbuffer>) -> T {
fn recv<T: send, Tbuffer: send>(+p: RecvPacketBuffered<T, Tbuffer>) -> T {
option::unwrap_expect(try_recv(p), "connection closed")
}
@ -407,18 +440,18 @@ Returns `none` if the sender has closed the connection without sending
a message, or `Some(T)` if a message was received.
*/
fn try_recv<T: send, Tbuffer: send>(+p: recv_packet_buffered<T, Tbuffer>)
fn try_recv<T: send, Tbuffer: send>(+p: RecvPacketBuffered<T, Tbuffer>)
-> Option<T>
{
let p_ = p.unwrap();
let p = unsafe { &*p_ };
struct drop_state {
p: &packet_header;
struct DropState {
p: &PacketHeader;
drop {
if task::failing() {
self.p.state = terminated;
self.p.state = Terminated;
let old_task = swap_task(&mut self.p.blocked_task,
ptr::null());
if !old_task.is_null() {
@ -428,17 +461,17 @@ fn try_recv<T: send, Tbuffer: send>(+p: recv_packet_buffered<T, Tbuffer>)
}
};
let _drop_state = drop_state { p: &p.header };
let _drop_state = DropState { p: &p.header };
// optimistic path
match p.header.state {
full => {
Full => {
let mut payload = None;
payload <-> p.payload;
p.header.state = empty;
p.header.state = Empty;
return Some(option::unwrap(payload))
},
terminated => return None,
Terminated => return None,
_ => {}
}
@ -453,9 +486,9 @@ fn try_recv<T: send, Tbuffer: send>(+p: recv_packet_buffered<T, Tbuffer>)
loop {
rustrt::task_clear_event_reject(this);
let old_state = swap_state_acq(&mut p.header.state,
blocked);
Blocked);
match old_state {
empty => {
Empty => {
debug!("no data available on %?, going to sleep.", p_);
if count == 0 {
wait_event(this);
@ -471,23 +504,23 @@ fn try_recv<T: send, Tbuffer: send>(+p: recv_packet_buffered<T, Tbuffer>)
}
debug!("woke up, p.state = %?", copy p.header.state);
}
blocked => if first {
Blocked => if first {
fail ~"blocking on already blocked packet"
},
full => {
Full => {
let mut payload = None;
payload <-> p.payload;
let old_task = swap_task(&mut p.header.blocked_task, ptr::null());
if !old_task.is_null() {
rustrt::rust_task_deref(old_task);
}
p.header.state = empty;
p.header.state = Empty;
return Some(option::unwrap(payload))
}
terminated => {
Terminated => {
// This assert detects when we've accidentally unsafely
// casted too big of a number to a state.
assert old_state == terminated;
assert old_state == Terminated;
let old_task = swap_task(&mut p.header.blocked_task, ptr::null());
if !old_task.is_null() {
@ -501,28 +534,28 @@ fn try_recv<T: send, Tbuffer: send>(+p: recv_packet_buffered<T, Tbuffer>)
}
/// Returns true if messages are available.
pure fn peek<T: send, Tb: send>(p: &recv_packet_buffered<T, Tb>) -> bool {
pure fn peek<T: send, Tb: send>(p: &RecvPacketBuffered<T, Tb>) -> bool {
match unsafe {(*p.header()).state} {
empty => false,
blocked => fail ~"peeking on blocked packet",
full | terminated => true
Empty => false,
Blocked => fail ~"peeking on blocked packet",
Full | Terminated => true
}
}
impl<T: send, Tb: send> recv_packet_buffered<T, Tb> {
impl<T: send, Tb: send> RecvPacketBuffered<T, Tb> {
pure fn peek() -> bool {
peek(&self)
}
}
#[doc(hidden)]
fn sender_terminate<T: send>(p: *packet<T>) {
fn sender_terminate<T: send>(p: *Packet<T>) {
let p = unsafe { &*p };
match swap_state_rel(&mut p.header.state, terminated) {
empty => {
match swap_state_rel(&mut p.header.state, Terminated) {
Empty => {
// The receiver will eventually clean up.
}
blocked => {
Blocked => {
// wake up the target
let old_task = swap_task(&mut p.header.blocked_task, ptr::null());
if !old_task.is_null() {
@ -533,11 +566,11 @@ fn sender_terminate<T: send>(p: *packet<T>) {
}
// The receiver will eventually clean up.
}
full => {
Full => {
// This is impossible
fail ~"you dun goofed"
}
terminated => {
Terminated => {
assert p.header.blocked_task.is_null();
// I have to clean up, use drop_glue
}
@ -545,21 +578,21 @@ fn sender_terminate<T: send>(p: *packet<T>) {
}
#[doc(hidden)]
fn receiver_terminate<T: send>(p: *packet<T>) {
fn receiver_terminate<T: send>(p: *Packet<T>) {
let p = unsafe { &*p };
match swap_state_rel(&mut p.header.state, terminated) {
empty => {
match swap_state_rel(&mut p.header.state, Terminated) {
Empty => {
assert p.header.blocked_task.is_null();
// the sender will clean up
}
blocked => {
Blocked => {
let old_task = swap_task(&mut p.header.blocked_task, ptr::null());
if !old_task.is_null() {
rustrt::rust_task_deref(old_task);
assert old_task == rustrt::rust_get_task();
}
}
terminated | full => {
Terminated | Full => {
assert p.header.blocked_task.is_null();
// I have to clean up, use drop_glue
}
@ -576,7 +609,7 @@ that vector. The index points to an endpoint that has either been
closed by the sender or has a message waiting to be received.
*/
fn wait_many<T: selectable>(pkts: &[T]) -> uint {
fn wait_many<T: Selectable>(pkts: &[T]) -> uint {
let this = rustrt::rust_get_task();
rustrt::task_clear_event_reject(this);
@ -586,20 +619,20 @@ fn wait_many<T: selectable>(pkts: &[T]) -> uint {
let p = unsafe { &*p.header() };
let old = p.mark_blocked(this);
match old {
full | terminated => {
Full | Terminated => {
data_avail = true;
ready_packet = i;
(*p).state = old;
break;
}
blocked => fail ~"blocking on blocked packet",
empty => ()
Blocked => fail ~"blocking on blocked packet",
Empty => ()
}
}
while !data_avail {
debug!("sleeping on %? packets", pkts.len());
let event = wait_event(this) as *packet_header;
let event = wait_event(this) as *PacketHeader;
let pos = vec::position(pkts, |p| p.header() == event);
match pos {
@ -618,8 +651,8 @@ fn wait_many<T: selectable>(pkts: &[T]) -> uint {
debug!("%?, %?", ready_packet, pkts[ready_packet]);
unsafe {
assert (*pkts[ready_packet].header()).state == full
|| (*pkts[ready_packet].header()).state == terminated;
assert (*pkts[ready_packet].header()).state == Full
|| (*pkts[ready_packet].header()).state == Terminated;
}
ready_packet
@ -654,10 +687,10 @@ this case, `select2` may return either `left` or `right`.
*/
fn select2<A: send, Ab: send, B: send, Bb: send>(
+a: recv_packet_buffered<A, Ab>,
+b: recv_packet_buffered<B, Bb>)
-> Either<(Option<A>, recv_packet_buffered<B, Bb>),
(recv_packet_buffered<A, Ab>, Option<B>)>
+a: RecvPacketBuffered<A, Ab>,
+b: RecvPacketBuffered<B, Bb>)
-> Either<(Option<A>, RecvPacketBuffered<B, Bb>),
(RecvPacketBuffered<A, Ab>, Option<B>)>
{
let i = wait_many([a.header(), b.header()]/_);
@ -669,21 +702,21 @@ fn select2<A: send, Ab: send, B: send, Bb: send>(
}
#[doc(hidden)]
trait selectable {
pure fn header() -> *packet_header;
trait Selectable {
pure fn header() -> *PacketHeader;
}
impl *packet_header: selectable {
pure fn header() -> *packet_header { self }
impl *PacketHeader: Selectable {
pure fn header() -> *PacketHeader { self }
}
/// Returns the index of an endpoint that is ready to receive.
fn selecti<T: selectable>(endpoints: &[T]) -> uint {
fn selecti<T: Selectable>(endpoints: &[T]) -> uint {
wait_many(endpoints)
}
/// Returns 0 or 1 depending on which endpoint is ready to receive
fn select2i<A: selectable, B: selectable>(a: &A, b: &B) -> Either<(), ()> {
fn select2i<A: Selectable, B: Selectable>(a: &A, b: &B) -> Either<(), ()> {
match wait_many([a.header(), b.header()]/_) {
0 => Left(()),
1 => Right(()),
@ -695,8 +728,8 @@ fn select2i<A: selectable, B: selectable>(a: &A, b: &B) -> Either<(), ()> {
list of the remaining endpoints.
*/
fn select<T: send, Tb: send>(+endpoints: ~[recv_packet_buffered<T, Tb>])
-> (uint, Option<T>, ~[recv_packet_buffered<T, Tb>])
fn select<T: send, Tb: send>(+endpoints: ~[RecvPacketBuffered<T, Tb>])
-> (uint, Option<T>, ~[RecvPacketBuffered<T, Tb>])
{
let ready = wait_many(endpoints.map(|p| p.header()));
let mut remaining = endpoints;
@ -709,22 +742,33 @@ fn select<T: send, Tb: send>(+endpoints: ~[recv_packet_buffered<T, Tb>])
message.
*/
type send_packet<T: send> = send_packet_buffered<T, packet<T>>;
type SendPacket<T: send> = SendPacketBuffered<T, Packet<T>>;
#[doc(hidden)]
fn send_packet<T: send>(p: *packet<T>) -> send_packet<T> {
send_packet_buffered(p)
fn SendPacket<T: send>(p: *Packet<T>) -> SendPacket<T> {
SendPacketBuffered(p)
}
struct send_packet_buffered<T: send, Tbuffer: send> {
let mut p: Option<*packet<T>>;
let mut buffer: Option<buffer_resource<Tbuffer>>;
new(p: *packet<T>) {
// XXX remove me
#[cfg(stage0)]
#[allow(non_camel_case_types)]
type send_packet<T: send> = SendPacket<T>;
// XXX remove me
#[cfg(stage0)]
fn send_packet<T: send>(p: *packet<T>) -> SendPacket<T> {
SendPacket(p)
}
struct SendPacketBuffered<T: send, Tbuffer: send> {
let mut p: Option<*Packet<T>>;
let mut buffer: Option<BufferResource<Tbuffer>>;
new(p: *Packet<T>) {
//debug!("take send %?", p);
self.p = Some(p);
unsafe {
self.buffer = Some(
buffer_resource(
BufferResource(
get_buffer(ptr::addr_of((*p).header))));
};
}
@ -742,13 +786,13 @@ struct send_packet_buffered<T: send, Tbuffer: send> {
// "none"
// } else { "some" }); }
}
fn unwrap() -> *packet<T> {
fn unwrap() -> *Packet<T> {
let mut p = None;
p <-> self.p;
option::unwrap(p)
}
pure fn header() -> *packet_header {
pure fn header() -> *PacketHeader {
match self.p {
Some(packet) => unsafe {
let packet = &*packet;
@ -760,7 +804,7 @@ struct send_packet_buffered<T: send, Tbuffer: send> {
}
}
fn reuse_buffer() -> buffer_resource<Tbuffer> {
fn reuse_buffer() -> BufferResource<Tbuffer> {
//error!("send reuse_buffer");
let mut tmp = None;
tmp <-> self.buffer;
@ -768,24 +812,41 @@ struct send_packet_buffered<T: send, Tbuffer: send> {
}
}
// XXX remove me
#[cfg(stage0)]
#[allow(non_camel_case_types)]
type send_packet_buffered<T: send, Tbuffer: send> =
SendPacketBuffered<T, Tbuffer>;
/// Represents the receive end of a pipe. It can receive exactly one
/// message.
type recv_packet<T: send> = recv_packet_buffered<T, packet<T>>;
type RecvPacket<T: send> = RecvPacketBuffered<T, Packet<T>>;
#[doc(hidden)]
fn recv_packet<T: send>(p: *packet<T>) -> recv_packet<T> {
recv_packet_buffered(p)
fn RecvPacket<T: send>(p: *Packet<T>) -> RecvPacket<T> {
RecvPacketBuffered(p)
}
struct recv_packet_buffered<T: send, Tbuffer: send> : selectable {
let mut p: Option<*packet<T>>;
let mut buffer: Option<buffer_resource<Tbuffer>>;
new(p: *packet<T>) {
// XXX remove me
#[cfg(stage0)]
#[allow(non_camel_case_types)]
type recv_packet<T: send> = RecvPacket<T>;
// XXX remove me
#[cfg(stage0)]
fn recv_packet<T: send>(p: *packet<T>) -> RecvPacket<T> {
RecvPacket(p)
}
struct RecvPacketBuffered<T: send, Tbuffer: send> : Selectable {
let mut p: Option<*Packet<T>>;
let mut buffer: Option<BufferResource<Tbuffer>>;
new(p: *Packet<T>) {
//debug!("take recv %?", p);
self.p = Some(p);
unsafe {
self.buffer = Some(
buffer_resource(
BufferResource(
get_buffer(ptr::addr_of((*p).header))));
};
}
@ -803,13 +864,13 @@ struct recv_packet_buffered<T: send, Tbuffer: send> : selectable {
// "none"
// } else { "some" }); }
}
fn unwrap() -> *packet<T> {
fn unwrap() -> *Packet<T> {
let mut p = None;
p <-> self.p;
option::unwrap(p)
}
pure fn header() -> *packet_header {
pure fn header() -> *PacketHeader {
match self.p {
Some(packet) => unsafe {
let packet = &*packet;
@ -821,7 +882,7 @@ struct recv_packet_buffered<T: send, Tbuffer: send> : selectable {
}
}
fn reuse_buffer() -> buffer_resource<Tbuffer> {
fn reuse_buffer() -> BufferResource<Tbuffer> {
//error!("recv reuse_buffer");
let mut tmp = None;
tmp <-> self.buffer;
@ -829,10 +890,16 @@ struct recv_packet_buffered<T: send, Tbuffer: send> : selectable {
}
}
// XXX remove me
#[cfg(stage0)]
#[allow(non_camel_case_types)]
type recv_packet_buffered<T: send, Tbuffer: send> =
RecvPacketBuffered<T, Tbuffer>;
#[doc(hidden)]
fn entangle<T: send>() -> (send_packet<T>, recv_packet<T>) {
fn entangle<T: send>() -> (SendPacket<T>, RecvPacket<T>) {
let p = packet();
(send_packet(p), recv_packet(p))
(SendPacket(p), RecvPacket(p))
}
/** Spawn a task to provide a service.
@ -843,10 +910,10 @@ endpoint is passed to the new task.
*/
fn spawn_service<T: send, Tb: send>(
init: extern fn() -> (send_packet_buffered<T, Tb>,
recv_packet_buffered<T, Tb>),
+service: fn~(+recv_packet_buffered<T, Tb>))
-> send_packet_buffered<T, Tb>
init: extern fn() -> (SendPacketBuffered<T, Tb>,
RecvPacketBuffered<T, Tb>),
+service: fn~(+RecvPacketBuffered<T, Tb>))
-> SendPacketBuffered<T, Tb>
{
let (client, server) = init();
@ -867,10 +934,10 @@ receive state.
*/
fn spawn_service_recv<T: send, Tb: send>(
init: extern fn() -> (recv_packet_buffered<T, Tb>,
send_packet_buffered<T, Tb>),
+service: fn~(+send_packet_buffered<T, Tb>))
-> recv_packet_buffered<T, Tb>
init: extern fn() -> (RecvPacketBuffered<T, Tb>,
SendPacketBuffered<T, Tb>),
+service: fn~(+SendPacketBuffered<T, Tb>))
-> RecvPacketBuffered<T, Tb>
{
let (client, server) = init();
@ -889,13 +956,13 @@ fn spawn_service_recv<T: send, Tb: send>(
// Streams - Make pipes a little easier in general.
proto! streamp (
open:send<T: send> {
data(T) -> open<T>
Open:send<T: send> {
data(T) -> Open<T>
}
)
/// A trait for things that can send multiple messages.
trait channel<T: send> {
trait Channel<T: send> {
// It'd be nice to call this send, but it'd conflict with the
// built in send kind.
@ -907,7 +974,7 @@ trait channel<T: send> {
}
/// A trait for things that can receive multiple messages.
trait recv<T: send> {
trait Recv<T: send> {
/// Receives a message, or fails if the connection closes.
fn recv() -> T;
@ -925,19 +992,19 @@ trait recv<T: send> {
}
#[doc(hidden)]
type chan_<T:send> = { mut endp: Option<streamp::client::open<T>> };
type Chan_<T:send> = { mut endp: Option<streamp::client::Open<T>> };
/// An endpoint that can send many messages.
enum chan<T:send> {
chan_(chan_<T>)
enum Chan<T:send> {
Chan_(Chan_<T>)
}
#[doc(hidden)]
type port_<T:send> = { mut endp: Option<streamp::server::open<T>> };
type Port_<T:send> = { mut endp: Option<streamp::server::Open<T>> };
/// An endpoint that can receive many messages.
enum port<T:send> {
port_(port_<T>)
enum Port<T:send> {
Port_(Port_<T>)
}
/** Creates a `(chan, port)` pair.
@ -945,13 +1012,13 @@ enum port<T:send> {
These allow sending or receiving an unlimited number of messages.
*/
fn stream<T:send>() -> (chan<T>, port<T>) {
fn stream<T:send>() -> (Chan<T>, Port<T>) {
let (c, s) = streamp::init();
(chan_({ mut endp: Some(c) }), port_({ mut endp: Some(s) }))
(Chan_({ mut endp: Some(c) }), Port_({ mut endp: Some(s) }))
}
impl<T: send> chan<T>: channel<T> {
impl<T: send> Chan<T>: Channel<T> {
fn send(+x: T) {
let mut endp = None;
endp <-> self.endp;
@ -972,7 +1039,7 @@ impl<T: send> chan<T>: channel<T> {
}
}
impl<T: send> port<T>: recv<T> {
impl<T: send> Port<T>: Recv<T> {
fn recv() -> T {
let mut endp = None;
endp <-> self.endp;
@ -1006,16 +1073,16 @@ impl<T: send> port<T>: recv<T> {
}
/// Treat many ports as one.
struct PortSet<T: send> : recv<T> {
let mut ports: ~[pipes::port<T>];
struct PortSet<T: send> : Recv<T> {
let mut ports: ~[pipes::Port<T>];
new() { self.ports = ~[]; }
fn add(+port: pipes::port<T>) {
fn add(+port: pipes::Port<T>) {
vec::push(self.ports, port)
}
fn chan() -> chan<T> {
fn chan() -> Chan<T> {
let (ch, po) = stream();
self.add(po);
ch
@ -1057,8 +1124,8 @@ struct PortSet<T: send> : recv<T> {
}
}
impl<T: send> port<T>: selectable {
pure fn header() -> *packet_header unchecked {
impl<T: send> Port<T>: Selectable {
pure fn header() -> *PacketHeader unchecked {
match self.endp {
Some(endp) => endp.header(),
None => fail ~"peeking empty stream"
@ -1067,9 +1134,9 @@ impl<T: send> port<T>: selectable {
}
/// A channel that can be shared between many senders.
type SharedChan<T: send> = unsafe::Exclusive<chan<T>>;
type SharedChan<T: send> = unsafe::Exclusive<Chan<T>>;
impl<T: send> SharedChan<T>: channel<T> {
impl<T: send> SharedChan<T>: Channel<T> {
fn send(+x: T) {
let mut xx = Some(x);
do self.with |chan| {
@ -1090,20 +1157,20 @@ impl<T: send> SharedChan<T>: channel<T> {
}
/// Converts a `chan` into a `shared_chan`.
fn SharedChan<T:send>(+c: chan<T>) -> SharedChan<T> {
fn SharedChan<T:send>(+c: Chan<T>) -> SharedChan<T> {
unsafe::exclusive(c)
}
/// Receive a message from one of two endpoints.
trait select2<T: send, U: send> {
trait Select2<T: send, U: send> {
/// Receive a message or return `none` if a connection closes.
fn try_select() -> Either<Option<T>, Option<U>>;
/// Receive a message or fail if a connection closes.
fn select() -> Either<T, U>;
}
impl<T: send, U: send, Left: selectable recv<T>, Right: selectable recv<U>>
(Left, Right): select2<T, U> {
impl<T: send, U: send, Left: Selectable Recv<T>, Right: Selectable Recv<U>>
(Left, Right): Select2<T, U> {
fn select() -> Either<T, U> {
match self {
@ -1125,18 +1192,18 @@ impl<T: send, U: send, Left: selectable recv<T>, Right: selectable recv<U>>
}
proto! oneshot (
oneshot:send<T:send> {
Oneshot:send<T:send> {
send(T) -> !
}
)
/// The send end of a oneshot pipe.
type chan_one<T: send> = oneshot::client::oneshot<T>;
type ChanOne<T: send> = oneshot::client::Oneshot<T>;
/// The receive end of a oneshot pipe.
type port_one<T: send> = oneshot::server::oneshot<T>;
type PortOne<T: send> = oneshot::server::Oneshot<T>;
/// Initialiase a (send-endpoint, recv-endpoint) oneshot pipe pair.
fn oneshot<T: send>() -> (chan_one<T>, port_one<T>) {
fn oneshot<T: send>() -> (ChanOne<T>, PortOne<T>) {
oneshot::init()
}
@ -1144,13 +1211,13 @@ fn oneshot<T: send>() -> (chan_one<T>, port_one<T>) {
* Receive a message from a oneshot pipe, failing if the connection was
* closed.
*/
fn recv_one<T: send>(+port: port_one<T>) -> T {
fn recv_one<T: send>(+port: PortOne<T>) -> T {
let oneshot::send(message) = recv(port);
message
}
/// Receive a message from a oneshot pipe unless the connection was closed.
fn try_recv_one<T: send> (+port: port_one<T>) -> Option<T> {
fn try_recv_one<T: send> (+port: PortOne<T>) -> Option<T> {
let message = try_recv(port);
if message.is_none() { None }
@ -1161,7 +1228,7 @@ fn try_recv_one<T: send> (+port: port_one<T>) -> Option<T> {
}
/// Send a message on a oneshot pipe, failing if the connection was closed.
fn send_one<T: send>(+chan: chan_one<T>, +data: T) {
fn send_one<T: send>(+chan: ChanOne<T>, +data: T) {
oneshot::client::send(chan, data);
}
@ -1169,7 +1236,7 @@ fn send_one<T: send>(+chan: chan_one<T>, +data: T) {
* Send a message on a oneshot pipe, or return false if the connection was
* closed.
*/
fn try_send_one<T: send>(+chan: chan_one<T>, +data: T)
fn try_send_one<T: send>(+chan: ChanOne<T>, +data: T)
-> bool {
oneshot::client::try_send(chan, data).is_some()
}

View File

@ -83,7 +83,7 @@ unsafe fn copy_lifetime<S,T>(_ptr: &a/S, ptr: &T) -> &a/T {
// An unwrapper uses this protocol to communicate with the "other" task that
// drops the last refcount on an arc. Unfortunately this can't be a proper
// pipe protocol because the unwrapper has to access both stages at once.
type UnwrapProto = ~mut Option<(pipes::chan_one<()>, pipes::port_one<bool>)>;
type UnwrapProto = ~mut Option<(pipes::ChanOne<()>, pipes::PortOne<bool>)>;
struct ArcData<T> {
mut count: libc::intptr_t;
@ -136,7 +136,7 @@ unsafe fn unwrap_shared_mutable_state<T: send>(+rc: SharedMutableState<T>)
-> T {
struct DeathThroes<T> {
mut ptr: Option<~ArcData<T>>;
mut response: Option<pipes::chan_one<bool>>;
mut response: Option<pipes::ChanOne<bool>>;
drop unsafe {
let response = option::swap_unwrap(&mut self.response);
// In case we get killed early, we need to tell the person who

View File

@ -11,14 +11,14 @@ Higher level communication abstractions.
// Make sure we follow the new conventions
#[forbid(non_camel_case_types)];
import pipes::{channel, recv, chan, port, selectable};
import pipes::{Channel, Recv, Chan, Port, Selectable};
export DuplexStream;
/// An extension of `pipes::stream` that allows both sending and receiving.
struct DuplexStream<T: send, U: send> : channel<T>, recv<U>, selectable {
priv chan: chan<T>;
priv port: port<U>;
struct DuplexStream<T: send, U: send> : Channel<T>, Recv<U>, Selectable {
priv chan: Chan<T>;
priv port: Port <U>;
fn send(+x: T) {
self.chan.send(x)
@ -40,7 +40,7 @@ struct DuplexStream<T: send, U: send> : channel<T>, recv<U>, selectable {
self.port.peek()
}
pure fn header() -> *pipes::packet_header {
pure fn header() -> *pipes::PacketHeader {
self.port.header()
}
}

View File

@ -19,13 +19,13 @@ import unsafe::{Exclusive, exclusive};
// Each waiting task receives on one of these.
#[doc(hidden)]
type WaitEnd = pipes::port_one<()>;
type WaitEnd = pipes::PortOne<()>;
#[doc(hidden)]
type SignalEnd = pipes::chan_one<()>;
type SignalEnd = pipes::ChanOne<()>;
// A doubly-ended queue of waiting tasks.
#[doc(hidden)]
struct Waitqueue { head: pipes::port<SignalEnd>;
tail: pipes::chan<SignalEnd>; }
struct Waitqueue { head: pipes::Port<SignalEnd>;
tail: pipes::Chan<SignalEnd>; }
fn new_waitqueue() -> Waitqueue {
let (block_tail, block_head) = pipes::stream();
@ -928,8 +928,8 @@ mod tests {
assert woken == 0;
}
struct SendOnFailure {
c: pipes::chan<()>;
new(+c: pipes::chan<()>) { self.c = c; }
c: pipes::Chan<()>;
new(+c: pipes::Chan<()>) { self.c = c; }
drop { self.c.send(()); }
}
}

View File

@ -4,6 +4,7 @@
// something smarter.
import ast::{ident, node_id};
import ast_util::respan;
import codemap::span;
import ext::base::mk_ctxt;
@ -209,8 +210,21 @@ impl ext_ctxt: ext_ctxt_ast_builder {
fn item(name: ident,
span: span,
+node: ast::item_) -> @ast::item {
// XXX: Would be nice if our generated code didn't violate
// Rust coding conventions
let non_camel_case_attribute = respan(self.empty_span(), {
style: ast::attr_outer,
value: respan(self.empty_span(),
ast::meta_list(~"allow", ~[
@respan(self.empty_span(),
ast::meta_word(~"non_camel_case_types"))
])),
is_sugared_doc: false
});
@{ident: name,
attrs: ~[],
attrs: ~[non_camel_case_attribute],
id: self.next_id(),
node: node,
vis: ast::public,

View File

@ -72,10 +72,10 @@ impl message: gen_send {
};
body += ~"let b = pipe.reuse_buffer();\n";
body += fmt!("let %s = pipes::send_packet_buffered(\
body += fmt!("let %s = pipes::SendPacketBuffered(\
ptr::addr_of(b.buffer.data.%s));\n",
sp, next.name);
body += fmt!("let %s = pipes::recv_packet_buffered(\
body += fmt!("let %s = pipes::RecvPacketBuffered(\
ptr::addr_of(b.buffer.data.%s));\n",
rp, next.name);
}
@ -258,7 +258,7 @@ impl state: to_type_decls {
self.span,
cx.ty_path_ast_builder(
path(~[cx.ident_of(~"pipes"),
cx.ident_of(dir.to_str() + ~"_packet")],
cx.ident_of(dir.to_str() + ~"Packet")],
empty_span())
.add_ty(cx.ty_path_ast_builder(
path(~[cx.ident_of(self.proto.name),
@ -275,7 +275,7 @@ impl state: to_type_decls {
cx.ty_path_ast_builder(
path(~[cx.ident_of(~"pipes"),
cx.ident_of(dir.to_str()
+ ~"_packet_buffered")],
+ ~"PacketBuffered")],
empty_span())
.add_tys(~[cx.ty_path_ast_builder(
path(~[cx.ident_of(self.proto.name),
@ -321,7 +321,7 @@ impl protocol: gen_init {
};
cx.parse_item(fmt!("fn init%s() -> (client::%s, server::%s)\
{ import pipes::has_buffer; %s }",
{ import pipes::HasBuffer; %s }",
start_state.ty_params.to_source(cx),
start_state.to_ty(cx).to_source(cx),
start_state.to_ty(cx).to_source(cx),
@ -341,7 +341,7 @@ impl protocol: gen_init {
let buffer_fields = self.gen_buffer_init(ext_cx);
let buffer = #ast {
~{header: pipes::buffer_header(),
~{header: pipes::BufferHeader(),
data: $(buffer_fields)}
};
@ -349,7 +349,7 @@ impl protocol: gen_init {
ext_cx.block(
self.states.map_to_vec(
|s| ext_cx.parse_stmt(
fmt!("data.%s.set_buffer(buffer)",
fmt!("data.%s.set_buffer_(buffer)",
s.name))),
ext_cx.parse_expr(
fmt!("ptr::addr_of(data.%s)",
@ -390,7 +390,7 @@ impl protocol: gen_init {
}
let ty = s.to_ty(cx);
let fty = #ast[ty] {
pipes::packet<$(ty)>
pipes::Packet<$(ty)>
};
cx.ty_field_imm(cx.ident_of(s.name), fty)
};

View File

@ -10,8 +10,8 @@ enum direction {
impl direction: ToStr {
fn to_str() -> ~str {
match self {
send => ~"send",
recv => ~"recv"
send => ~"Send",
recv => ~"Recv"
}
}
}

View File

@ -14,7 +14,7 @@ use std;
import io::Writer;
import io::WriterUtil;
import pipes::{port, chan, SharedChan};
import pipes::{Port, Chan, SharedChan};
macro_rules! move_out (
{ $x:expr } => { unsafe { let y <- *ptr::addr_of($x); y } }
@ -26,7 +26,7 @@ enum request {
stop
}
fn server(requests: port<request>, responses: pipes::chan<uint>) {
fn server(requests: Port<request>, responses: pipes::Chan<uint>) {
let mut count = 0u;
let mut done = false;
while !done {

View File

@ -10,7 +10,7 @@ use std;
import io::Writer;
import io::WriterUtil;
import pipes::{port, PortSet, chan};
import pipes::{Port, PortSet, Chan};
macro_rules! move_out (
{ $x:expr } => { unsafe { let y <- *ptr::addr_of($x); y } }
@ -22,7 +22,7 @@ enum request {
stop
}
fn server(requests: PortSet<request>, responses: pipes::chan<uint>) {
fn server(requests: PortSet<request>, responses: pipes::Chan<uint>) {
let mut count = 0u;
let mut done = false;
while !done {

View File

@ -64,7 +64,7 @@ macro_rules! follow (
)
)
fn switch<T: send, Tb: send, U>(+endp: pipes::recv_packet_buffered<T, Tb>,
fn switch<T: send, Tb: send, U>(+endp: pipes::RecvPacketBuffered<T, Tb>,
f: fn(+Option<T>) -> U) -> U {
f(pipes::try_recv(endp))
}

View File

@ -7,7 +7,7 @@ import std::map;
import std::map::hashmap;
import std::sort;
import io::ReaderUtil;
import pipes::{stream, port, chan};
import pipes::{stream, Port, Chan};
// given a map, print a sorted version of it
fn sort_and_fmt(mm: hashmap<~[u8], uint>, total: uint) -> ~str {
@ -88,8 +88,8 @@ fn windows_with_carry(bb: &[u8], nn: uint,
return vec::slice(bb, len - (nn - 1u), len);
}
fn make_sequence_processor(sz: uint, from_parent: pipes::port<~[u8]>,
to_parent: pipes::chan<~str>) {
fn make_sequence_processor(sz: uint, from_parent: pipes::Port<~[u8]>,
to_parent: pipes::Chan<~str>) {
let freqs: hashmap<~[u8], uint> = map::bytes_hash();
let mut carry: ~[u8] = ~[];

View File

@ -15,8 +15,8 @@ use std;
import std::{time, getopts};
import io::WriterUtil;
import int::range;
import pipes::port;
import pipes::chan;
import pipes::Port;
import pipes::Chan;
import pipes::send;
import pipes::recv;
@ -24,7 +24,7 @@ import core::result;
import result::{Ok, Err};
fn fib(n: int) -> int {
fn pfib(c: chan<int>, n: int) {
fn pfib(c: Chan<int>, n: int) {
if n == 0 {
c.send(0);
} else if n <= 2 {

View File

@ -5,7 +5,7 @@
//
// The filename is a song reference; google it in quotes.
fn child_generation(gens_left: uint, -c: pipes::chan<()>) {
fn child_generation(gens_left: uint, -c: pipes::Chan<()>) {
// This used to be O(n^2) in the number of generations that ever existed.
// With this code, only as many generations are alive at a time as tasks
// alive at a time,

View File

@ -14,7 +14,7 @@ mod stream {
recv
}
}
type stream<T: send> = pipes::recv_packet<stream::stream<T>>;
type stream<T: send> = pipes::RecvPacket<stream::stream<T>>;
}
}

View File

@ -1,6 +1,6 @@
// xfail-fast
import pipes::{select2, selectable};
import pipes::{Select2, Selectable};
fn main() {
let (c,p) = pipes::stream();

View File

@ -36,7 +36,7 @@ macro_rules! move_it (
{ $x:expr } => { unsafe { let y <- *ptr::addr_of($x); y } }
)
fn switch<T: send, U>(+endp: pipes::recv_packet<T>,
fn switch<T: send, U>(+endp: pipes::RecvPacket<T>,
f: fn(+Option<T>) -> U) -> U {
f(pipes::try_recv(endp))
}

View File

@ -12,21 +12,21 @@ mod pingpong {
type packets = {
// This is probably a resolve bug, I forgot to export packet,
// but since I didn't import pipes::*, it worked anyway.
ping: packet<ping>,
pong: packet<pong>,
ping: Packet<ping>,
pong: Packet<pong>,
};
fn init() -> (client::ping, server::ping) {
let buffer = ~{
header: buffer_header(),
header: BufferHeader(),
data: {
ping: mk_packet::<ping>(),
pong: mk_packet::<pong>()
}
};
do pipes::entangle_buffer(buffer) |buffer, data| {
data.ping.set_buffer(buffer);
data.pong.set_buffer(buffer);
data.ping.set_buffer_(buffer);
data.pong.set_buffer_(buffer);
ptr::addr_of(data.ping)
}
}
@ -36,32 +36,32 @@ mod pingpong {
fn ping(+pipe: ping) -> pong {
{
let b = pipe.reuse_buffer();
let s = send_packet_buffered(ptr::addr_of(b.buffer.data.pong));
let c = recv_packet_buffered(ptr::addr_of(b.buffer.data.pong));
let s = SendPacketBuffered(ptr::addr_of(b.buffer.data.pong));
let c = RecvPacketBuffered(ptr::addr_of(b.buffer.data.pong));
let message = pingpong::ping(s);
pipes::send(pipe, message);
c
}
}
type ping = pipes::send_packet_buffered<pingpong::ping,
type ping = pipes::SendPacketBuffered<pingpong::ping,
pingpong::packets>;
type pong = pipes::recv_packet_buffered<pingpong::pong,
type pong = pipes::RecvPacketBuffered<pingpong::pong,
pingpong::packets>;
}
mod server {
type ping = pipes::recv_packet_buffered<pingpong::ping,
type ping = pipes::RecvPacketBuffered<pingpong::ping,
pingpong::packets>;
fn pong(+pipe: pong) -> ping {
{
let b = pipe.reuse_buffer();
let s = send_packet_buffered(ptr::addr_of(b.buffer.data.ping));
let c = recv_packet_buffered(ptr::addr_of(b.buffer.data.ping));
let s = SendPacketBuffered(ptr::addr_of(b.buffer.data.ping));
let c = RecvPacketBuffered(ptr::addr_of(b.buffer.data.ping));
let message = pingpong::pong(s);
pipes::send(pipe, message);
c
}
}
type pong = pipes::send_packet_buffered<pingpong::pong,
type pong = pipes::SendPacketBuffered<pingpong::pong,
pingpong::packets>;
}
}

View File

@ -1,13 +1,13 @@
use std;
import pipes;
import pipes::chan;
import pipes::port;
import pipes::Chan;
import pipes::Port;
import task;
fn main() { test05(); }
fn test05_start(ch : chan<int>) {
fn test05_start(ch : Chan<int>) {
ch.send(10);
error!("sent 10");
ch.send(20);

View File

@ -2,7 +2,7 @@ use std;
import task;
import pipes;
fn start(c: pipes::chan<pipes::chan<~str>>) {
fn start(c: pipes::Chan<pipes::Chan<~str>>) {
let (ch, p) = pipes::stream();
c.send(ch);

View File

@ -2,7 +2,7 @@ use std;
import pipes;
import task;
fn start(c: pipes::chan<pipes::chan<int>>) {
fn start(c: pipes::Chan<pipes::Chan<int>>) {
let (ch, p) = pipes::stream();
c.send(ch);
}

View File

@ -3,7 +3,7 @@ import task;
import pipes;
import pipes::send;
fn start(c: pipes::chan<int>, start: int, number_of_messages: int) {
fn start(c: pipes::Chan<int>, start: int, number_of_messages: int) {
let mut i: int = 0;
while i < number_of_messages { c.send(start + i); i += 1; }
}

View File

@ -26,7 +26,7 @@ fn main() {
debug!("main thread exiting");
}
fn child(x: int, ch: pipes::chan<int>) {
fn child(x: int, ch: pipes::Chan<int>) {
log(debug, x);
ch.send(x);
}

View File

@ -2,7 +2,7 @@
use std;
import task;
fn start(c: pipes::chan<int>, i0: int) {
fn start(c: pipes::Chan<int>, i0: int) {
let mut i = i0;
while i > 0 {
c.send(0);

View File

@ -3,9 +3,9 @@
use std;
import pipes;
import pipes::send;
import pipes::port;
import pipes::Port;
import pipes::recv;
import pipes::chan;
import pipes::Chan;
// Tests of ports and channels on various types
fn test_rec() {

View File

@ -1,13 +1,13 @@
use std;
import task;
import pipes;
import pipes::chan;
import pipes::Chan;
import pipes::send;
import pipes::recv;
fn main() { debug!("===== WITHOUT THREADS ====="); test00(); }
fn test00_start(ch: chan<int>, message: int, count: int) {
fn test00_start(ch: Chan<int>, message: int, count: int) {
debug!("Starting test00_start");
let mut i: int = 0;
while i < count {

View File

@ -3,7 +3,7 @@ import task;
fn main() { test00(); }
fn test00_start(c: pipes::chan<int>, start: int, number_of_messages: int) {
fn test00_start(c: pipes::Chan<int>, start: int, number_of_messages: int) {
let mut i: int = 0;
while i < number_of_messages { c.send(start + i); i += 1; }
}

View File

@ -3,7 +3,7 @@ import task;
fn main() { test00(); }
fn test00_start(c: pipes::chan<int>, number_of_messages: int) {
fn test00_start(c: pipes::Chan<int>, number_of_messages: int) {
let mut i: int = 0;
while i < number_of_messages { c.send(i + 0); i += 1; }
}

View File

@ -1,4 +1,4 @@
import pipes::{port, chan};
import pipes::{Port, Chan};
/*
This is about the simplest program that can successfully send a