diff --git a/src/libstd/rt/io/mod.rs b/src/libstd/rt/io/mod.rs index 116d240308a..c56b20453e5 100644 --- a/src/libstd/rt/io/mod.rs +++ b/src/libstd/rt/io/mod.rs @@ -474,17 +474,43 @@ pub trait Seek { fn seek(&mut self, pos: i64, style: SeekStyle); } -/// A listener is a value that listens for connections -pub trait Listener { - /// Wait for and accept an incoming connection - /// - /// Returns `None` on timeout. +/// A listener is a value that can consume itself to start listening for connections. +/// Doing so produces some sort of Acceptor. +pub trait Listener> { + /// Spin up the listener and start queueing incoming connections /// /// # Failure /// /// Raises `io_error` condition. If the condition is handled, + /// then `listen` returns `None`. + fn listen(self) -> Option; +} + +/// An acceptor is a value that presents incoming connections +pub trait Acceptor { + /// Wait for and accept an incoming connection + /// + /// # Failure + /// Raise `io_error` condition. If the condition is handled, /// then `accept` returns `None`. - fn accept(&mut self) -> Option; + fn accept(&mut self) -> Option; + + /// Create an iterator over incoming connections + fn incoming<'r>(&'r mut self) -> IncomingIterator<'r, Self> { + IncomingIterator { inc: self } + } +} + +/// An infinite iterator over incoming connection attempts. +/// Calling `next` will block the task until a connection is attempted. +struct IncomingIterator<'self, A> { + priv inc: &'self mut A, +} + +impl<'self, T, A: Acceptor> Iterator for IncomingIterator<'self, A> { + fn next(&mut self) -> Option { + self.inc.accept() + } } /// Common trait for decorator types. diff --git a/src/libstd/rt/io/net/tcp.rs b/src/libstd/rt/io/net/tcp.rs index ce66cd0de59..b7cb703eb25 100644 --- a/src/libstd/rt/io/net/tcp.rs +++ b/src/libstd/rt/io/net/tcp.rs @@ -11,12 +11,13 @@ use option::{Option, Some, None}; use result::{Ok, Err}; use rt::io::net::ip::SocketAddr; -use rt::io::{Reader, Writer, Listener}; +use rt::io::{Reader, Writer, Listener, Acceptor}; use rt::io::{io_error, read_error, EndOfFile}; use rt::rtio::{IoFactory, IoFactoryObject, - RtioSocket, RtioTcpListener, - RtioTcpListenerObject, RtioTcpStream, - RtioTcpStreamObject}; + RtioSocket, + RtioTcpListener, RtioTcpListenerObject, + RtioTcpAcceptor, RtioTcpAcceptorObject, + RtioTcpStream, RtioTcpStreamObject}; use rt::local::Local; pub struct TcpStream(~RtioTcpStreamObject); @@ -124,13 +125,27 @@ impl TcpListener { } } -impl Listener for TcpListener { +impl Listener for TcpListener { + fn listen(self) -> Option { + match (**self).listen() { + Ok(acceptor) => Some(TcpAcceptor(acceptor)), + Err(ioerr) => { + io_error::cond.raise(ioerr); + None + } + } + } +} + +pub struct TcpAcceptor(~RtioTcpAcceptorObject); + +impl Acceptor for TcpAcceptor { fn accept(&mut self) -> Option { match (**self).accept() { Ok(s) => Some(TcpStream::new(s)), Err(ioerr) => { io_error::cond.raise(ioerr); - return None; + None } } } @@ -184,8 +199,8 @@ mod test { let addr = next_test_ip4(); do spawntask { - let mut listener = TcpListener::bind(addr); - let mut stream = listener.accept(); + let mut acceptor = TcpListener::bind(addr).listen(); + let mut stream = acceptor.accept(); let mut buf = [0]; stream.read(buf); assert!(buf[0] == 99); @@ -204,8 +219,8 @@ mod test { let addr = next_test_ip6(); do spawntask { - let mut listener = TcpListener::bind(addr); - let mut stream = listener.accept(); + let mut acceptor = TcpListener::bind(addr).listen(); + let mut stream = acceptor.accept(); let mut buf = [0]; stream.read(buf); assert!(buf[0] == 99); @@ -224,8 +239,8 @@ mod test { let addr = next_test_ip4(); do spawntask { - let mut listener = TcpListener::bind(addr); - let mut stream = listener.accept(); + let mut acceptor = TcpListener::bind(addr).listen(); + let mut stream = acceptor.accept(); let mut buf = [0]; let nread = stream.read(buf); assert!(nread.is_none()); @@ -244,8 +259,8 @@ mod test { let addr = next_test_ip6(); do spawntask { - let mut listener = TcpListener::bind(addr); - let mut stream = listener.accept(); + let mut acceptor = TcpListener::bind(addr).listen(); + let mut stream = acceptor.accept(); let mut buf = [0]; let nread = stream.read(buf); assert!(nread.is_none()); @@ -265,8 +280,8 @@ mod test { let addr = next_test_ip4(); do spawntask { - let mut listener = TcpListener::bind(addr); - let mut stream = listener.accept(); + let mut acceptor = TcpListener::bind(addr).listen(); + let mut stream = acceptor.accept(); let mut buf = [0]; let nread = stream.read(buf); assert!(nread.is_none()); @@ -288,8 +303,8 @@ mod test { let addr = next_test_ip6(); do spawntask { - let mut listener = TcpListener::bind(addr); - let mut stream = listener.accept(); + let mut acceptor = TcpListener::bind(addr).listen(); + let mut stream = acceptor.accept(); let mut buf = [0]; let nread = stream.read(buf); assert!(nread.is_none()); @@ -311,8 +326,8 @@ mod test { let addr = next_test_ip4(); do spawntask { - let mut listener = TcpListener::bind(addr); - let mut stream = listener.accept(); + let mut acceptor = TcpListener::bind(addr).listen(); + let mut stream = acceptor.accept(); let buf = [0]; loop { let mut stop = false; @@ -341,8 +356,8 @@ mod test { let addr = next_test_ip6(); do spawntask { - let mut listener = TcpListener::bind(addr); - let mut stream = listener.accept(); + let mut acceptor = TcpListener::bind(addr).listen(); + let mut stream = acceptor.accept(); let buf = [0]; loop { let mut stop = false; @@ -371,9 +386,8 @@ mod test { let max = 10; do spawntask { - let mut listener = TcpListener::bind(addr); - do max.times { - let mut stream = listener.accept(); + let mut acceptor = TcpListener::bind(addr).listen(); + for ref mut stream in acceptor.incoming().take(max) { let mut buf = [0]; stream.read(buf); assert_eq!(buf[0], 99); @@ -396,9 +410,8 @@ mod test { let max = 10; do spawntask { - let mut listener = TcpListener::bind(addr); - do max.times { - let mut stream = listener.accept(); + let mut acceptor = TcpListener::bind(addr).listen(); + for ref mut stream in acceptor.incoming().take(max) { let mut buf = [0]; stream.read(buf); assert_eq!(buf[0], 99); @@ -421,10 +434,9 @@ mod test { static MAX: int = 10; do spawntask { - let mut listener = TcpListener::bind(addr); - for i in range(0, MAX) { - let stream = Cell::new(listener.accept()); - rtdebug!("accepted"); + let mut acceptor = TcpListener::bind(addr).listen(); + for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) { + let stream = Cell::new(stream); // Start another task to handle the connection do spawntask { let mut stream = stream.take(); @@ -460,10 +472,9 @@ mod test { static MAX: int = 10; do spawntask { - let mut listener = TcpListener::bind(addr); - for i in range(0, MAX) { - let stream = Cell::new(listener.accept()); - rtdebug!("accepted"); + let mut acceptor = TcpListener::bind(addr).listen(); + for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) { + let stream = Cell::new(stream); // Start another task to handle the connection do spawntask { let mut stream = stream.take(); @@ -499,10 +510,9 @@ mod test { static MAX: int = 10; do spawntask { - let mut listener = TcpListener::bind(addr); - for _ in range(0, MAX) { - let stream = Cell::new(listener.accept()); - rtdebug!("accepted"); + let mut acceptor = TcpListener::bind(addr).listen(); + for stream in acceptor.incoming().take(MAX as uint) { + let stream = Cell::new(stream); // Start another task to handle the connection do spawntask_later { let mut stream = stream.take(); @@ -537,10 +547,9 @@ mod test { static MAX: int = 10; do spawntask { - let mut listener = TcpListener::bind(addr); - for _ in range(0, MAX) { - let stream = Cell::new(listener.accept()); - rtdebug!("accepted"); + let mut acceptor = TcpListener::bind(addr).listen(); + for stream in acceptor.incoming().take(MAX as uint) { + let stream = Cell::new(stream); // Start another task to handle the connection do spawntask_later { let mut stream = stream.take(); @@ -573,10 +582,7 @@ mod test { fn socket_name(addr: SocketAddr) { do run_in_newsched_task { do spawntask { - let listener = TcpListener::bind(addr); - - assert!(listener.is_some()); - let mut listener = listener.unwrap(); + let mut listener = TcpListener::bind(addr).unwrap(); // Make sure socket_name gives // us the socket we binded to. @@ -592,9 +598,9 @@ mod test { fn peer_name(addr: SocketAddr) { do run_in_newsched_task { do spawntask { - let mut listener = TcpListener::bind(addr); + let mut acceptor = TcpListener::bind(addr).listen(); - listener.accept(); + acceptor.accept(); } do spawntask { diff --git a/src/libstd/rt/io/net/unix.rs b/src/libstd/rt/io/net/unix.rs index b85b7dd059d..1771a963ba7 100644 --- a/src/libstd/rt/io/net/unix.rs +++ b/src/libstd/rt/io/net/unix.rs @@ -40,6 +40,12 @@ impl UnixListener { } } -impl Listener for UnixListener { +impl Listener for UnixListener { + fn listen(self) -> Option { fail!() } +} + +pub struct UnixAcceptor; + +impl Acceptor for UnixAcceptor { fn accept(&mut self) -> Option { fail!() } } diff --git a/src/libstd/rt/io/option.rs b/src/libstd/rt/io/option.rs index 7dadc653e6c..098433f299c 100644 --- a/src/libstd/rt/io/option.rs +++ b/src/libstd/rt/io/option.rs @@ -17,7 +17,7 @@ //! # XXX Seek and Close use option::*; -use super::{Reader, Writer, Listener}; +use super::{Reader, Writer, Listener, Acceptor}; use super::{standard_error, PreviousIoError, io_error, read_error, IoError}; fn prev_io_error() -> IoError { @@ -62,10 +62,22 @@ impl Reader for Option { } } -impl, S> Listener for Option { - fn accept(&mut self) -> Option { +impl, L: Listener> Listener for Option { + fn listen(self) -> Option { + match self { + Some(listener) => listener.listen(), + None => { + io_error::cond.raise(prev_io_error()); + None + } + } + } +} + +impl> Acceptor for Option { + fn accept(&mut self) -> Option { match *self { - Some(ref mut listener) => listener.accept(), + Some(ref mut acceptor) => acceptor.accept(), None => { io_error::cond.raise(prev_io_error()); None diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs index 1788b7a04e3..6f1b33d1e21 100644 --- a/src/libstd/rt/rtio.rs +++ b/src/libstd/rt/rtio.rs @@ -26,6 +26,7 @@ pub type EventLoopObject = uvio::UvEventLoop; pub type RemoteCallbackObject = uvio::UvRemoteCallback; pub type IoFactoryObject = uvio::UvIoFactory; pub type RtioTcpStreamObject = uvio::UvTcpStream; +pub type RtioTcpAcceptorObject = uvio::UvTcpAcceptor; pub type RtioTcpListenerObject = uvio::UvTcpListener; pub type RtioUdpSocketObject = uvio::UvUdpSocket; pub type RtioTimerObject = uvio::UvTimer; @@ -75,6 +76,10 @@ pub trait IoFactory { } pub trait RtioTcpListener : RtioSocket { + fn listen(self) -> Result<~RtioTcpAcceptorObject, IoError>; +} + +pub trait RtioTcpAcceptor : RtioSocket { fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError>; fn accept_simultaneously(&mut self) -> Result<(), IoError>; fn dont_accept_simultaneously(&mut self) -> Result<(), IoError>; diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs index f282432dcb5..e37dfba0cc1 100644 --- a/src/libstd/rt/uv/uvio.rs +++ b/src/libstd/rt/uv/uvio.rs @@ -26,6 +26,7 @@ use rt::local::Local; use rt::rtio::*; use rt::sched::{Scheduler, SchedHandle}; use rt::tube::Tube; +use rt::task::SchedHome; use rt::uv::*; use rt::uv::idle::IdleWatcher; use rt::uv::net::{UvIpv4SocketAddr, UvIpv6SocketAddr}; @@ -47,76 +48,80 @@ use task; // XXX we should not be calling uvll functions in here. trait HomingIO { + fn home<'r>(&'r mut self) -> &'r mut SchedHandle; + /* XXX This will move pinned tasks to do IO on the proper scheduler * and then move them back to their home. */ - fn home_for_io(&mut self, io: &fn(&mut Self) -> A) -> A { - use rt::sched::{PinnedTask, TaskFromFriend}; - // go home - let old_home = Cell::new_empty(); - let old_home_ptr = &old_home; + fn go_to_IO_home(&mut self) -> SchedHome { + use rt::sched::PinnedTask; + do task::unkillable { // FIXME(#8674) - let scheduler: ~Scheduler = Local::take(); - do scheduler.deschedule_running_task_and_then |_, task| { - // get the old home first - do task.wake().map_move |mut task| { - old_home_ptr.put_back(task.take_unwrap_home()); - self.home().send(PinnedTask(task)); - }; + let mut old = None; + { + let ptr = &mut old; + let scheduler: ~Scheduler = Local::take(); + do scheduler.deschedule_running_task_and_then |_, task| { + /* FIXME(#8674) if the task was already killed then wake + * will return None. In that case, the home pointer will never be set. + * + * RESOLUTION IDEA: Since the task is dead, we should just abort the IO action. + */ + do task.wake().map_move |mut task| { + *ptr = Some(task.take_unwrap_home()); + self.home().send(PinnedTask(task)); + }; + } } + old.expect("No old home because task had already been killed.") } + } - // do IO - let a = io(self); + // XXX dummy self param + fn restore_original_home(_dummy_self: Option, old: SchedHome) { + use rt::sched::TaskFromFriend; - // unhome home + let old = Cell::new(old); do task::unkillable { // FIXME(#8674) let scheduler: ~Scheduler = Local::take(); do scheduler.deschedule_running_task_and_then |scheduler, task| { + /* FIXME(#8674) if the task was already killed then wake + * will return None. In that case, the home pointer will never be restored. + * + * RESOLUTION IDEA: Since the task is dead, we should just abort the IO action. + */ do task.wake().map_move |mut task| { - task.give_home(old_home.take()); + task.give_home(old.take()); scheduler.make_handle().send(TaskFromFriend(task)); }; } } + } - // return the result of the IO - a + fn home_for_io(&mut self, io: &fn(&mut Self) -> A) -> A { + let home = self.go_to_IO_home(); + let a = io(self); // do IO + HomingIO::restore_original_home(None:: /* XXX dummy self */, home); + a // return the result of the IO + } + + fn home_for_io_consume(self, io: &fn(Self) -> A) -> A { + let mut this = self; + let home = this.go_to_IO_home(); + let a = io(this); // do IO + HomingIO::restore_original_home(None:: /* XXX dummy self */, home); + a // return the result of the IO } fn home_for_io_with_sched(&mut self, io_sched: &fn(&mut Self, ~Scheduler) -> A) -> A { - use rt::sched::{PinnedTask, TaskFromFriend}; - - do task::unkillable { // FIXME(#8674) - // go home - let old_home = Cell::new_empty(); - let old_home_ptr = &old_home; + let home = self.go_to_IO_home(); + let a = do task::unkillable { // FIXME(#8674) let scheduler: ~Scheduler = Local::take(); - do scheduler.deschedule_running_task_and_then |_, task| { - // get the old home first - do task.wake().map_move |mut task| { - old_home_ptr.put_back(task.take_unwrap_home()); - self.home().send(PinnedTask(task)); - }; - } - - // do IO - let scheduler: ~Scheduler = Local::take(); - let a = io_sched(self, scheduler); - - // unhome home - let scheduler: ~Scheduler = Local::take(); - do scheduler.deschedule_running_task_and_then |scheduler, task| { - do task.wake().map_move |mut task| { - task.give_home(old_home.take()); - scheduler.make_handle().send(TaskFromFriend(task)); - }; - } - - // return the result of the IO - a - } + io_sched(self, scheduler) // do IO and scheduling action + }; + HomingIO::restore_original_home(None:: /* XXX dummy self */, home); + a // return result of IO } } @@ -594,9 +599,7 @@ impl IoFactory for UvIoFactory { } pub struct UvTcpListener { - watcher: TcpWatcher, - listening: bool, - incoming_streams: Tube>, + watcher : TcpWatcher, home: SchedHandle, } @@ -606,15 +609,8 @@ impl HomingIO for UvTcpListener { impl UvTcpListener { fn new(watcher: TcpWatcher, home: SchedHandle) -> UvTcpListener { - UvTcpListener { - watcher: watcher, - listening: false, - incoming_streams: Tube::new(), - home: home, - } + UvTcpListener { watcher: watcher, home: home } } - - fn watcher(&self) -> TcpWatcher { self.watcher } } impl Drop for UvTcpListener { @@ -623,10 +619,10 @@ impl Drop for UvTcpListener { let self_ = unsafe { transmute::<&UvTcpListener, &mut UvTcpListener>(self) }; do self_.home_for_io_with_sched |self_, scheduler| { do scheduler.deschedule_running_task_and_then |_, task| { - let task_cell = Cell::new(task); - do self_.watcher().as_stream().close { + let task = Cell::new(task); + do self_.watcher.as_stream().close { let scheduler: ~Scheduler = Local::take(); - scheduler.resume_blocked_task_immediately(task_cell.take()); + scheduler.resume_blocked_task_immediately(task.take()); } } } @@ -636,50 +632,71 @@ impl Drop for UvTcpListener { impl RtioSocket for UvTcpListener { fn socket_name(&mut self) -> Result { do self.home_for_io |self_| { - socket_name(Tcp, self_.watcher) + socket_name(Tcp, self_.watcher) } } } impl RtioTcpListener for UvTcpListener { - - fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError> { - do self.home_for_io |self_| { - - if !self_.listening { - self_.listening = true; - - let incoming_streams_cell = Cell::new(self_.incoming_streams.clone()); - - do self_.watcher().listen |mut server, status| { - let stream = match status { + fn listen(self) -> Result<~RtioTcpAcceptorObject, IoError> { + do self.home_for_io_consume |self_| { + let mut acceptor = ~UvTcpAcceptor::new(self_); + let incoming = Cell::new(acceptor.incoming.clone()); + do acceptor.listener.watcher.listen |mut server, status| { + do incoming.with_mut_ref |incoming| { + let inc = match status { Some(_) => Err(standard_error(OtherIoError)), None => { - let client = TcpWatcher::new(&server.event_loop()); - // XXX: needs to be surfaced in interface - server.accept(client.as_stream()); + let inc = TcpWatcher::new(&server.event_loop()); + // first accept call in the callback guarenteed to succeed + server.accept(inc.as_stream()); let home = get_handle_to_current_scheduler!(); - Ok(~UvTcpStream { watcher: client, home: home }) + Ok(~UvTcpStream { watcher: inc, home: home }) } }; - - let mut incoming_streams = incoming_streams_cell.take(); - incoming_streams.send(stream); - incoming_streams_cell.put_back(incoming_streams); + incoming.send(inc); } - - } - self_.incoming_streams.recv() + }; + Ok(acceptor) } } +} + +pub struct UvTcpAcceptor { + listener: UvTcpListener, + incoming: Tube>, +} + +impl HomingIO for UvTcpAcceptor { + fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.listener.home() } +} + +impl UvTcpAcceptor { + fn new(listener: UvTcpListener) -> UvTcpAcceptor { + UvTcpAcceptor { listener: listener, incoming: Tube::new() } + } +} + +impl RtioSocket for UvTcpAcceptor { + fn socket_name(&mut self) -> Result { + do self.home_for_io |self_| { + socket_name(Tcp, self_.listener.watcher) + } + } +} + +impl RtioTcpAcceptor for UvTcpAcceptor { + fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError> { + self.incoming.recv() + } fn accept_simultaneously(&mut self) -> Result<(), IoError> { do self.home_for_io |self_| { let r = unsafe { - uvll::tcp_simultaneous_accepts(self_.watcher().native_handle(), 1 as c_int) + uvll::tcp_simultaneous_accepts(self_.listener.watcher.native_handle(), 1 as c_int) }; - match status_to_maybe_uv_error(self_.watcher(), r) { + match status_to_maybe_uv_error(self_.listener.watcher, r) { Some(err) => Err(uv_error_to_io_error(err)), None => Ok(()) } @@ -689,10 +706,10 @@ impl RtioTcpListener for UvTcpListener { fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> { do self.home_for_io |self_| { let r = unsafe { - uvll::tcp_simultaneous_accepts(self_.watcher().native_handle(), 0 as c_int) + uvll::tcp_simultaneous_accepts(self_.listener.watcher.native_handle(), 0 as c_int) }; - match status_to_maybe_uv_error(self_.watcher(), r) { + match status_to_maybe_uv_error(self_.listener.watcher, r) { Some(err) => Err(uv_error_to_io_error(err)), None => Ok(()) } @@ -1435,8 +1452,9 @@ fn test_simple_tcp_server_and_client() { do spawntask { unsafe { let io: *mut IoFactoryObject = Local::unsafe_borrow(); - let mut listener = (*io).tcp_bind(addr).unwrap(); - let mut stream = listener.accept().unwrap(); + let listener = (*io).tcp_bind(addr).unwrap(); + let mut acceptor = listener.listen().unwrap(); + let mut stream = acceptor.accept().unwrap(); let mut buf = [0, .. 2048]; let nread = stream.read(buf).unwrap(); assert_eq!(nread, 8); @@ -1493,11 +1511,10 @@ fn test_simple_tcp_server_and_client_on_diff_threads() { }; let server_fn: ~fn() = || { - let io: *mut IoFactoryObject = unsafe { - Local::unsafe_borrow() - }; - let mut listener = unsafe { (*io).tcp_bind(server_addr).unwrap() }; - let mut stream = listener.accept().unwrap(); + let io: *mut IoFactoryObject = unsafe { Local::unsafe_borrow() }; + let listener = unsafe { (*io).tcp_bind(server_addr).unwrap() }; + let mut acceptor = listener.listen().unwrap(); + let mut stream = acceptor.accept().unwrap(); let mut buf = [0, .. 2048]; let nread = stream.read(buf).unwrap(); assert_eq!(nread, 8); @@ -1578,8 +1595,9 @@ fn test_read_and_block() { do spawntask { let io: *mut IoFactoryObject = unsafe { Local::unsafe_borrow() }; - let mut listener = unsafe { (*io).tcp_bind(addr).unwrap() }; - let mut stream = listener.accept().unwrap(); + let listener = unsafe { (*io).tcp_bind(addr).unwrap() }; + let mut acceptor = listener.listen().unwrap(); + let mut stream = acceptor.accept().unwrap(); let mut buf = [0, .. 2048]; let expected = 32; @@ -1634,8 +1652,9 @@ fn test_read_read_read() { do spawntask { unsafe { let io: *mut IoFactoryObject = Local::unsafe_borrow(); - let mut listener = (*io).tcp_bind(addr).unwrap(); - let mut stream = listener.accept().unwrap(); + let listener = (*io).tcp_bind(addr).unwrap(); + let mut acceptor = listener.listen().unwrap(); + let mut stream = acceptor.accept().unwrap(); let buf = [1, .. 2048]; let mut total_bytes_written = 0; while total_bytes_written < MAX {