core: Wire up stream to newsched

This commit is contained in:
Brian Anderson 2013-05-17 17:47:10 -07:00
parent 26becc308e
commit df9e41278e
3 changed files with 366 additions and 251 deletions

View File

@ -25,7 +25,7 @@ use unstable::sync::{Exclusive, exclusive};
use rtcomm = rt::comm;
use rt;
use pipes::{recv, try_recv, wait_many, peek, PacketHeader};
use pipes::{wait_many, PacketHeader};
// FIXME #5160: Making this public exposes some plumbing from
// pipes. Needs some refactoring
@ -61,76 +61,14 @@ pub trait Peekable<T> {
fn peek(&self) -> bool;
}
// Streams - Make pipes a little easier in general.
/*proto! streamp (
Open:send<T: Owned> {
data(T) -> Open<T>
}
)*/
#[allow(non_camel_case_types)]
pub mod streamp {
priv use core::kinds::Owned;
pub fn init<T: Owned>() -> (client::Open<T>, server::Open<T>) {
pub use core::pipes::HasBuffer;
::core::pipes::entangle()
}
#[allow(non_camel_case_types)]
pub enum Open<T> { pub data(T, server::Open<T>), }
#[allow(non_camel_case_types)]
pub mod client {
priv use core::kinds::Owned;
#[allow(non_camel_case_types)]
pub fn try_data<T: Owned>(pipe: Open<T>, x_0: T) ->
::core::option::Option<Open<T>> {
{
use super::data;
let (c, s) = ::core::pipes::entangle();
let message = data(x_0, s);
if ::core::pipes::send(pipe, message) {
::core::pipes::rt::make_some(c)
} else { ::core::pipes::rt::make_none() }
}
}
#[allow(non_camel_case_types)]
pub fn data<T: Owned>(pipe: Open<T>, x_0: T) -> Open<T> {
{
use super::data;
let (c, s) = ::core::pipes::entangle();
let message = data(x_0, s);
::core::pipes::send(pipe, message);
c
}
}
#[allow(non_camel_case_types)]
pub type Open<T> = ::core::pipes::SendPacket<super::Open<T>>;
}
#[allow(non_camel_case_types)]
pub mod server {
#[allow(non_camel_case_types)]
pub type Open<T> = ::core::pipes::RecvPacket<super::Open<T>>;
}
}
/// An endpoint that can send many messages.
#[unsafe_mut_field(endp)]
pub struct Chan<T> {
endp: Option<streamp::client::Open<T>>
inner: Either<pipesy::Chan<T>, rtcomm::Chan<T>>
}
/// An endpoint that can receive many messages.
#[unsafe_mut_field(endp)]
pub struct Port<T> {
endp: Option<streamp::server::Open<T>>,
inner: Either<pipesy::Port<T>, rtcomm::Port<T>>
}
/** Creates a `(Port, Chan)` pair.
@ -139,100 +77,75 @@ These allow sending or receiving an unlimited number of messages.
*/
pub fn stream<T:Owned>() -> (Port<T>, Chan<T>) {
let (c, s) = streamp::init();
(Port {
endp: Some(s)
}, Chan {
endp: Some(c)
})
let (port, chan) = match rt::context() {
rt::OldTaskContext => match pipesy::stream() {
(p, c) => (Left(p), Left(c))
},
_ => match rtcomm::stream() {
(p, c) => (Right(p), Right(c))
}
};
let port = Port { inner: port };
let chan = Chan { inner: chan };
return (port, chan);
}
impl<T: Owned> GenericChan<T> for Chan<T> {
#[inline(always)]
fn send(&self, x: T) {
unsafe {
let self_endp = transmute_mut(&self.endp);
let endp = replace(self_endp, None);
*self_endp = Some(streamp::client::data(endp.unwrap(), x))
match self.inner {
Left(ref chan) => chan.send(x),
Right(ref chan) => chan.send(x)
}
}
}
impl<T: Owned> GenericSmartChan<T> for Chan<T> {
#[inline(always)]
fn try_send(&self, x: T) -> bool {
unsafe {
let self_endp = transmute_mut(&self.endp);
let endp = replace(self_endp, None);
match streamp::client::try_data(endp.unwrap(), x) {
Some(next) => {
*self_endp = Some(next);
true
}
None => false
}
match self.inner {
Left(ref chan) => chan.try_send(x),
Right(ref chan) => chan.try_send(x)
}
}
}
impl<T: Owned> GenericPort<T> for Port<T> {
#[inline(always)]
fn recv(&self) -> T {
unsafe {
let self_endp = transmute_mut(&self.endp);
let endp = replace(self_endp, None);
let streamp::data(x, endp) = recv(endp.unwrap());
*self_endp = Some(endp);
x
match self.inner {
Left(ref port) => port.recv(),
Right(ref port) => port.recv()
}
}
#[inline(always)]
fn try_recv(&self) -> Option<T> {
unsafe {
let self_endp = transmute_mut(&self.endp);
let endp = replace(self_endp, None);
match try_recv(endp.unwrap()) {
Some(streamp::data(x, endp)) => {
*self_endp = Some(endp);
Some(x)
}
None => None
}
match self.inner {
Left(ref port) => port.try_recv(),
Right(ref port) => port.try_recv()
}
}
}
impl<T: Owned> Peekable<T> for Port<T> {
#[inline(always)]
fn peek(&self) -> bool {
unsafe {
let self_endp = transmute_mut(&self.endp);
let mut endp = replace(self_endp, None);
let peek = match endp {
Some(ref mut endp) => peek(endp),
None => fail!("peeking empty stream")
};
*self_endp = endp;
peek
match self.inner {
Left(ref port) => port.peek(),
Right(ref port) => port.peek()
}
}
}
impl<T: Owned> Selectable for Port<T> {
fn header(&mut self) -> *mut PacketHeader {
match self.endp {
Some(ref mut endp) => endp.header(),
None => fail!("peeking empty stream")
}
match self.inner {
Left(ref mut port) => port.header(),
Right(_) => fail!("can't select on newsched ports")
}
}
}
/// Treat many ports as one.
#[unsafe_mut_field(ports)]
pub struct PortSet<T> {
ports: ~[Port<T>],
ports: ~[pipesy::Port<T>],
}
pub impl<T: Owned> PortSet<T> {
@ -243,6 +156,11 @@ pub impl<T: Owned> PortSet<T> {
}
fn add(&self, port: Port<T>) {
let Port { inner } = port;
let port = match inner {
Left(p) => p,
Right(_) => fail!("PortSet not implemented")
};
unsafe {
let self_ports = transmute_mut(&self.ports);
self_ports.push(port)
@ -290,7 +208,7 @@ impl<T: Owned> Peekable<T> for PortSet<T> {
// It'd be nice to use self.port.each, but that version isn't
// pure.
for uint::range(0, vec::uniq_len(&const self.ports)) |i| {
let port: &Port<T> = &self.ports[i];
let port: &pipesy::Port<T> = &self.ports[i];
if port.peek() {
return true;
}
@ -301,12 +219,17 @@ impl<T: Owned> Peekable<T> for PortSet<T> {
/// A channel that can be shared between many senders.
pub struct SharedChan<T> {
ch: Exclusive<Chan<T>>
ch: Exclusive<pipesy::Chan<T>>
}
impl<T: Owned> SharedChan<T> {
/// Converts a `chan` into a `shared_chan`.
pub fn new(c: Chan<T>) -> SharedChan<T> {
let Chan { inner } = c;
let c = match inner {
Left(c) => c,
Right(_) => fail!("SharedChan not implemented")
};
SharedChan { ch: exclusive(c) }
}
}
@ -354,12 +277,8 @@ pub fn oneshot<T: Owned>() -> (PortOne<T>, ChanOne<T>) {
(p, c) => (Right(p), Right(c))
}
};
let port = PortOne {
inner: port
};
let chan = ChanOne {
inner: chan
};
let port = PortOne { inner: port };
let chan = ChanOne { inner: chan };
return (port, chan);
}
@ -435,7 +354,10 @@ mod pipesy {
use kinds::Owned;
use option::{Option, Some, None};
use pipes::{recv, try_recv};
use pipes::{recv, try_recv, peek, PacketHeader};
use super::{GenericChan, GenericSmartChan, GenericPort, Peekable, Selectable};
use cast::transmute_mut;
use util::replace;
/*proto! oneshot (
Oneshot:send<T:Owned> {
@ -610,6 +532,173 @@ mod pipesy {
}
}
// Streams - Make pipes a little easier in general.
/*proto! streamp (
Open:send<T: Owned> {
data(T) -> Open<T>
}
)*/
#[allow(non_camel_case_types)]
pub mod streamp {
priv use core::kinds::Owned;
pub fn init<T: Owned>() -> (client::Open<T>, server::Open<T>) {
pub use core::pipes::HasBuffer;
::core::pipes::entangle()
}
#[allow(non_camel_case_types)]
pub enum Open<T> { pub data(T, server::Open<T>), }
#[allow(non_camel_case_types)]
pub mod client {
priv use core::kinds::Owned;
#[allow(non_camel_case_types)]
pub fn try_data<T: Owned>(pipe: Open<T>, x_0: T) ->
::core::option::Option<Open<T>> {
{
use super::data;
let (c, s) = ::core::pipes::entangle();
let message = data(x_0, s);
if ::core::pipes::send(pipe, message) {
::core::pipes::rt::make_some(c)
} else { ::core::pipes::rt::make_none() }
}
}
#[allow(non_camel_case_types)]
pub fn data<T: Owned>(pipe: Open<T>, x_0: T) -> Open<T> {
{
use super::data;
let (c, s) = ::core::pipes::entangle();
let message = data(x_0, s);
::core::pipes::send(pipe, message);
c
}
}
#[allow(non_camel_case_types)]
pub type Open<T> = ::core::pipes::SendPacket<super::Open<T>>;
}
#[allow(non_camel_case_types)]
pub mod server {
#[allow(non_camel_case_types)]
pub type Open<T> = ::core::pipes::RecvPacket<super::Open<T>>;
}
}
/// An endpoint that can send many messages.
#[unsafe_mut_field(endp)]
pub struct Chan<T> {
endp: Option<streamp::client::Open<T>>
}
/// An endpoint that can receive many messages.
#[unsafe_mut_field(endp)]
pub struct Port<T> {
endp: Option<streamp::server::Open<T>>,
}
/** Creates a `(Port, Chan)` pair.
These allow sending or receiving an unlimited number of messages.
*/
pub fn stream<T:Owned>() -> (Port<T>, Chan<T>) {
let (c, s) = streamp::init();
(Port {
endp: Some(s)
}, Chan {
endp: Some(c)
})
}
impl<T: Owned> GenericChan<T> for Chan<T> {
#[inline(always)]
fn send(&self, x: T) {
unsafe {
let self_endp = transmute_mut(&self.endp);
let endp = replace(self_endp, None);
*self_endp = Some(streamp::client::data(endp.unwrap(), x))
}
}
}
impl<T: Owned> GenericSmartChan<T> for Chan<T> {
#[inline(always)]
fn try_send(&self, x: T) -> bool {
unsafe {
let self_endp = transmute_mut(&self.endp);
let endp = replace(self_endp, None);
match streamp::client::try_data(endp.unwrap(), x) {
Some(next) => {
*self_endp = Some(next);
true
}
None => false
}
}
}
}
impl<T: Owned> GenericPort<T> for Port<T> {
#[inline(always)]
fn recv(&self) -> T {
unsafe {
let self_endp = transmute_mut(&self.endp);
let endp = replace(self_endp, None);
let streamp::data(x, endp) = recv(endp.unwrap());
*self_endp = Some(endp);
x
}
}
#[inline(always)]
fn try_recv(&self) -> Option<T> {
unsafe {
let self_endp = transmute_mut(&self.endp);
let endp = replace(self_endp, None);
match try_recv(endp.unwrap()) {
Some(streamp::data(x, endp)) => {
*self_endp = Some(endp);
Some(x)
}
None => None
}
}
}
}
impl<T: Owned> Peekable<T> for Port<T> {
#[inline(always)]
fn peek(&self) -> bool {
unsafe {
let self_endp = transmute_mut(&self.endp);
let mut endp = replace(self_endp, None);
let peek = match endp {
Some(ref mut endp) => peek(endp),
None => fail!("peeking empty stream")
};
*self_endp = endp;
peek
}
}
}
impl<T: Owned> Selectable for Port<T> {
fn header(&mut self) -> *mut PacketHeader {
match self.endp {
Some(ref mut endp) => endp.header(),
None => fail!("peeking empty stream")
}
}
}
}
/// Returns the index of an endpoint that is ready to receive.

View File

@ -8,6 +8,13 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
//! Ports and channels.
//!
//! XXX: Carefully consider whether the sequentially consistent
//! atomics here can be converted to acq/rel. I'm not sure they can,
//! because there is data being transerred in both directions (the payload
//! goes from sender to receiver and the task pointer goes the other way).
use option::*;
use cast;
use util;
@ -29,33 +36,37 @@ use cell::Cell;
///
/// * 2 - both endpoints are alive
/// * 1 - either the sender or the receiver is dead, determined by context
/// * <ptr> - A pointer to a Task that can be transmuted to ~Task
/// * <ptr> - A pointer to a blocked Task that can be transmuted to ~Task
type State = int;
static STATE_BOTH: State = 2;
static STATE_ONE: State = 1;
/// The heap-allocated structure shared between two endpoints.
struct Packet<T> {
state: State,
payload: Option<T>,
}
pub struct PortOne<T> {
// XXX: Hack extra allocation to make by-val self work
inner: ~PortOneHack<T>
}
/// A one-shot channel.
pub struct ChanOne<T> {
// XXX: Hack extra allocation to make by-val self work
inner: ~ChanOneHack<T>
}
pub struct PortOneHack<T> {
/// A one-shot port.
pub struct PortOne<T> {
// XXX: Hack extra allocation to make by-val self work
inner: ~PortOneHack<T>
}
pub struct ChanOneHack<T> {
void_packet: *mut Void,
suppress_finalize: bool
}
pub struct ChanOneHack<T> {
pub struct PortOneHack<T> {
void_packet: *mut Void,
suppress_finalize: bool
}
@ -84,6 +95,54 @@ pub fn oneshot<T: Owned>() -> (PortOne<T>, ChanOne<T>) {
}
}
impl<T> ChanOne<T> {
pub fn send(self, val: T) {
self.try_send(val);
}
pub fn try_send(self, val: T) -> bool {
let mut this = self;
let mut recvr_active = true;
let packet = this.inner.packet();
unsafe {
// Install the payload
assert!((*packet).payload.is_none());
(*packet).payload = Some(val);
// Atomically swap out the old state to figure out what
// the port's up to, issuing a release barrier to prevent
// reordering of the payload write. This also issues an
// acquire barrier that keeps the subsequent access of the
// ~Task pointer from being reordered.
let oldstate = atomic_xchg(&mut (*packet).state, STATE_ONE);
match oldstate {
STATE_BOTH => {
// Port is not waiting yet. Nothing to do
}
STATE_ONE => {
// Port has closed. Need to clean up.
let _packet: ~Packet<T> = cast::transmute(this.inner.void_packet);
recvr_active = false;
}
task_as_state => {
// Port is blocked. Wake it up.
let recvr: ~Coroutine = cast::transmute(task_as_state);
let sched = local_sched::take();
sched.schedule_task(recvr);
}
}
}
// Suppress the synchronizing actions in the finalizer. We're done with the packet.
this.inner.suppress_finalize = true;
return recvr_active;
}
}
impl<T> PortOne<T> {
pub fn recv(self) -> T {
match self.try_recv() {
@ -96,30 +155,31 @@ impl<T> PortOne<T> {
pub fn try_recv(self) -> Option<T> {
let mut this = self;
let packet = this.inner.packet();
{
let self_ptr: *mut PortOne<T> = &mut this;
// XXX: Optimize this to not require the two context switches when data is available
// XXX: Optimize this to not require the two context switches when data is available
// Switch to the scheduler
let sched = local_sched::take();
do sched.deschedule_running_task_and_then |task| {
unsafe {
let task_as_state: State = cast::transmute(task);
let oldstate = atomic_xchg(&mut (*(*self_ptr).inner.packet()).state, task_as_state);
match oldstate {
STATE_BOTH => {
// Data has not been sent. Now we're blocked.
}
STATE_ONE => {
// Channel is closed. Switch back and check the data.
let task: ~Coroutine = cast::transmute(task_as_state);
let sched = local_sched::take();
sched.resume_task_immediately(task);
}
_ => util::unreachable()
// Switch to the scheduler to put the ~Task into the Packet state.
let sched = local_sched::take();
do sched.deschedule_running_task_and_then |task| {
unsafe {
// Atomically swap the task pointer into the Packet state, issuing
// an acquire barrier to prevent reordering of the subsequent read
// of the payload. Also issues a release barrier to prevent reordering
// of any previous writes to the task structure.
let task_as_state: State = cast::transmute(task);
let oldstate = atomic_xchg(&mut (*packet).state, task_as_state);
match oldstate {
STATE_BOTH => {
// Data has not been sent. Now we're blocked.
}
STATE_ONE => {
// Channel is closed. Switch back and check the data.
let task: ~Coroutine = cast::transmute(task_as_state);
let sched = local_sched::take();
sched.resume_task_immediately(task);
}
_ => util::unreachable()
}
}
}
@ -130,20 +190,20 @@ impl<T> PortOne<T> {
// payload. Some scenarios:
//
// 1) We encountered STATE_ONE above - the atomic_xchg was the acq barrier. We're fine.
// 2) We encountered STATE_BOTH above and blocked. The sending task work-stole us
// and ran on its thread. The work stealing had a memory barrier.
// 2) We encountered STATE_BOTH above and blocked. The sending task then ran us
// and ran on its thread. The sending task issued a read barrier when taking the
// pointer to the receiving task.
// 3) We encountered STATE_BOTH above and blocked, but the receiving task (this task)
// is pinned to some other scheduler, so the sending task had to give us to
// a different scheduler for resuming. That send synchronized memory.
unsafe {
let payload = util::replace(&mut (*this.inner.packet()).payload, None);
let payload = util::replace(&mut (*packet).payload, None);
// The sender has closed up shop. Drop the packet.
let _packet: ~Packet<T> = cast::transmute(this.inner.void_packet);
// Supress the finalizer. We're done here.
// Suppress the synchronizing actions in the finalizer. We're done with the packet.
this.inner.suppress_finalize = true;
return payload;
}
}
@ -167,42 +227,30 @@ impl<T> Peekable<T> for PortOne<T> {
}
}
impl<T> ChanOne<T> {
pub fn send(self, val: T) {
self.try_send(val);
}
pub fn try_send(self, val: T) -> bool {
let mut this = self;
let mut recvr_active = true;
#[unsafe_destructor]
impl<T> Drop for ChanOneHack<T> {
fn finalize(&self) {
if self.suppress_finalize { return }
unsafe {
assert!((*this.inner.packet()).payload.is_none());
(*this.inner.packet()).payload = Some(val);
let oldstate = atomic_xchg(&mut (*this.inner.packet()).state, STATE_ONE);
let this = cast::transmute_mut(self);
let oldstate = atomic_xchg(&mut (*this.packet()).state, STATE_ONE);
match oldstate {
STATE_BOTH => {
// Port is not recving yet. Nothing to do
}
// Port still active. It will destroy the Packet.
},
STATE_ONE => {
// Port has closed. Need to clean up.
let _packet: ~Packet<T> = cast::transmute(this.inner.void_packet);
recvr_active = false;
}
_ => {
// Port is blocked. Wake it up.
let recvr: ~Coroutine = cast::transmute(oldstate);
let _packet: ~Packet<T> = cast::transmute(this.void_packet);
},
task_as_state => {
// The port is blocked waiting for a message we will never send. Wake it.
assert!((*this.packet()).payload.is_none());
let recvr: ~Coroutine = cast::transmute(task_as_state);
let sched = local_sched::take();
sched.schedule_task(recvr);
}
}
}
// Suppress the finalizer. We're done here.
this.inner.suppress_finalize = true;
return recvr_active;
}
}
@ -216,7 +264,7 @@ impl<T> Drop for PortOneHack<T> {
let oldstate = atomic_xchg(&mut (*this.packet()).state, STATE_ONE);
match oldstate {
STATE_BOTH => {
/* cleanup is the chan's responsibility */
// Chan still active. It will destroy the packet.
},
STATE_ONE => {
let _packet: ~Packet<T> = cast::transmute(this.void_packet);
@ -229,43 +277,6 @@ impl<T> Drop for PortOneHack<T> {
}
}
#[unsafe_destructor]
impl<T> Drop for ChanOneHack<T> {
fn finalize(&self) {
if self.suppress_finalize { return }
unsafe {
let this = cast::transmute_mut(self);
let oldstate = atomic_xchg(&mut (*this.packet()).state, STATE_ONE);
match oldstate {
STATE_BOTH => {
/* cleanup is the port's responsibility */
},
STATE_ONE => {
let _packet: ~Packet<T> = cast::transmute(this.void_packet);
},
_ => {
// The port is blocked recving for a message we will never send. Wake it.
assert!((*this.packet()).payload.is_none());
let recvr: ~Coroutine = cast::transmute(oldstate);
let sched = local_sched::take();
sched.schedule_task(recvr);
}
}
}
}
}
impl<T> PortOneHack<T> {
fn packet(&self) -> *mut Packet<T> {
unsafe {
let p: *mut ~Packet<T> = cast::transmute(&self.void_packet);
let p: *mut Packet<T> = &mut **p;
return p;
}
}
}
impl<T> ChanOneHack<T> {
fn packet(&self) -> *mut Packet<T> {
unsafe {
@ -276,18 +287,33 @@ impl<T> ChanOneHack<T> {
}
}
struct StreamPayload<T>(T, PortOne<StreamPayload<T>>);
pub struct Port<T> {
// FIXME #5372. Using Cell because we don't take &mut self
next: Cell<PortOne<StreamPayload<T>>>
impl<T> PortOneHack<T> {
fn packet(&self) -> *mut Packet<T> {
unsafe {
let p: *mut ~Packet<T> = cast::transmute(&self.void_packet);
let p: *mut Packet<T> = &mut **p;
return p;
}
}
}
struct StreamPayload<T> {
val: T,
next: PortOne<StreamPayload<T>>
}
/// A channel with unbounded size.
pub struct Chan<T> {
// FIXME #5372. Using Cell because we don't take &mut self
next: Cell<ChanOne<StreamPayload<T>>>
}
/// An port with unbounded size.
pub struct Port<T> {
// FIXME #5372. Using Cell because we don't take &mut self
next: Cell<PortOne<StreamPayload<T>>>
}
pub fn stream<T: Owned>() -> (Port<T>, Chan<T>) {
let (pone, cone) = oneshot();
let port = Port { next: Cell(pone) };
@ -295,6 +321,21 @@ pub fn stream<T: Owned>() -> (Port<T>, Chan<T>) {
return (port, chan);
}
impl<T: Owned> GenericChan<T> for Chan<T> {
fn send(&self, val: T) {
self.try_send(val);
}
}
impl<T: Owned> GenericSmartChan<T> for Chan<T> {
fn try_send(&self, val: T) -> bool {
let (next_pone, next_cone) = oneshot();
let cone = self.next.take();
self.next.put_back(next_cone);
cone.try_send(StreamPayload { val: val, next: next_pone })
}
}
impl<T> GenericPort<T> for Port<T> {
fn recv(&self) -> T {
match self.try_recv() {
@ -308,7 +349,7 @@ impl<T> GenericPort<T> for Port<T> {
fn try_recv(&self) -> Option<T> {
let pone = self.next.take();
match pone.try_recv() {
Some(StreamPayload(val, next)) => {
Some(StreamPayload { val, next }) => {
self.next.put_back(next);
Some(val)
}
@ -323,21 +364,6 @@ impl<T> Peekable<T> for Port<T> {
}
}
impl<T: Owned> GenericChan<T> for Chan<T> {
fn send(&self, val: T) {
self.try_send(val);
}
}
impl<T: Owned> GenericSmartChan<T> for Chan<T> {
fn try_send(&self, val: T) -> bool {
let (next_pone, next_cone) = oneshot();
let cone = self.next.take();
self.next.put_back(next_cone);
cone.try_send(StreamPayload(val, next_pone))
}
}
#[cfg(test)]
mod test {
use super::*;
@ -563,7 +589,7 @@ mod test {
}
#[test]
fn stream_send_recv() {
fn stream_send_recv_stress() {
for stress_factor().times {
do run_in_newsched_task {
let (port, chan) = stream::<~int>();

View File

@ -261,7 +261,7 @@ mod test {
use comm::*;
do run_in_newsched_task() {
let (port, chan) = oneshot();
let (port, chan) = stream();
chan.send(10);
assert!(port.recv() == 10);
}