auto merge of #8954 : anasazi/rust/tcp-acceptor, r=catamorphism

The Listener trait takes two type parameters, the type of connection and the type of Acceptor,
and specifies only one method, listen, which consumes the listener and produces an Acceptor.

The Acceptor trait takes one type parameter, the type of connection, and defines two methods.
The accept() method waits for an incoming connection attempt and returns the result.
The incoming() method creates an iterator over incoming connections and is a default method.

Example:

```rust
let listener = TcpListener.bind(addr); // Bind to a socket
let acceptor = listener.listen(); // Start the listener
for stream in acceptor.incoming() {
    // Process incoming connections forever (a failure will kill the task).
}
```

Closes #8689
This commit is contained in:
bors 2013-09-03 14:56:22 -07:00
commit 3c4e943881
6 changed files with 235 additions and 161 deletions

View File

@ -474,17 +474,43 @@ pub trait Seek {
fn seek(&mut self, pos: i64, style: SeekStyle);
}
/// A listener is a value that listens for connections
pub trait Listener<S> {
/// Wait for and accept an incoming connection
///
/// Returns `None` on timeout.
/// A listener is a value that can consume itself to start listening for connections.
/// Doing so produces some sort of Acceptor.
pub trait Listener<T, A: Acceptor<T>> {
/// Spin up the listener and start queueing incoming connections
///
/// # Failure
///
/// Raises `io_error` condition. If the condition is handled,
/// then `listen` returns `None`.
fn listen(self) -> Option<A>;
}
/// An acceptor is a value that presents incoming connections
pub trait Acceptor<T> {
/// Wait for and accept an incoming connection
///
/// # Failure
/// Raise `io_error` condition. If the condition is handled,
/// then `accept` returns `None`.
fn accept(&mut self) -> Option<S>;
fn accept(&mut self) -> Option<T>;
/// Create an iterator over incoming connections
fn incoming<'r>(&'r mut self) -> IncomingIterator<'r, Self> {
IncomingIterator { inc: self }
}
}
/// An infinite iterator over incoming connection attempts.
/// Calling `next` will block the task until a connection is attempted.
struct IncomingIterator<'self, A> {
priv inc: &'self mut A,
}
impl<'self, T, A: Acceptor<T>> Iterator<T> for IncomingIterator<'self, A> {
fn next(&mut self) -> Option<T> {
self.inc.accept()
}
}
/// Common trait for decorator types.

View File

@ -11,12 +11,13 @@
use option::{Option, Some, None};
use result::{Ok, Err};
use rt::io::net::ip::SocketAddr;
use rt::io::{Reader, Writer, Listener};
use rt::io::{Reader, Writer, Listener, Acceptor};
use rt::io::{io_error, read_error, EndOfFile};
use rt::rtio::{IoFactory, IoFactoryObject,
RtioSocket, RtioTcpListener,
RtioTcpListenerObject, RtioTcpStream,
RtioTcpStreamObject};
RtioSocket,
RtioTcpListener, RtioTcpListenerObject,
RtioTcpAcceptor, RtioTcpAcceptorObject,
RtioTcpStream, RtioTcpStreamObject};
use rt::local::Local;
pub struct TcpStream(~RtioTcpStreamObject);
@ -124,13 +125,27 @@ impl TcpListener {
}
}
impl Listener<TcpStream> for TcpListener {
impl Listener<TcpStream, TcpAcceptor> for TcpListener {
fn listen(self) -> Option<TcpAcceptor> {
match (**self).listen() {
Ok(acceptor) => Some(TcpAcceptor(acceptor)),
Err(ioerr) => {
io_error::cond.raise(ioerr);
None
}
}
}
}
pub struct TcpAcceptor(~RtioTcpAcceptorObject);
impl Acceptor<TcpStream> for TcpAcceptor {
fn accept(&mut self) -> Option<TcpStream> {
match (**self).accept() {
Ok(s) => Some(TcpStream::new(s)),
Err(ioerr) => {
io_error::cond.raise(ioerr);
return None;
None
}
}
}
@ -184,8 +199,8 @@ mod test {
let addr = next_test_ip4();
do spawntask {
let mut listener = TcpListener::bind(addr);
let mut stream = listener.accept();
let mut acceptor = TcpListener::bind(addr).listen();
let mut stream = acceptor.accept();
let mut buf = [0];
stream.read(buf);
assert!(buf[0] == 99);
@ -204,8 +219,8 @@ mod test {
let addr = next_test_ip6();
do spawntask {
let mut listener = TcpListener::bind(addr);
let mut stream = listener.accept();
let mut acceptor = TcpListener::bind(addr).listen();
let mut stream = acceptor.accept();
let mut buf = [0];
stream.read(buf);
assert!(buf[0] == 99);
@ -224,8 +239,8 @@ mod test {
let addr = next_test_ip4();
do spawntask {
let mut listener = TcpListener::bind(addr);
let mut stream = listener.accept();
let mut acceptor = TcpListener::bind(addr).listen();
let mut stream = acceptor.accept();
let mut buf = [0];
let nread = stream.read(buf);
assert!(nread.is_none());
@ -244,8 +259,8 @@ mod test {
let addr = next_test_ip6();
do spawntask {
let mut listener = TcpListener::bind(addr);
let mut stream = listener.accept();
let mut acceptor = TcpListener::bind(addr).listen();
let mut stream = acceptor.accept();
let mut buf = [0];
let nread = stream.read(buf);
assert!(nread.is_none());
@ -265,8 +280,8 @@ mod test {
let addr = next_test_ip4();
do spawntask {
let mut listener = TcpListener::bind(addr);
let mut stream = listener.accept();
let mut acceptor = TcpListener::bind(addr).listen();
let mut stream = acceptor.accept();
let mut buf = [0];
let nread = stream.read(buf);
assert!(nread.is_none());
@ -288,8 +303,8 @@ mod test {
let addr = next_test_ip6();
do spawntask {
let mut listener = TcpListener::bind(addr);
let mut stream = listener.accept();
let mut acceptor = TcpListener::bind(addr).listen();
let mut stream = acceptor.accept();
let mut buf = [0];
let nread = stream.read(buf);
assert!(nread.is_none());
@ -311,8 +326,8 @@ mod test {
let addr = next_test_ip4();
do spawntask {
let mut listener = TcpListener::bind(addr);
let mut stream = listener.accept();
let mut acceptor = TcpListener::bind(addr).listen();
let mut stream = acceptor.accept();
let buf = [0];
loop {
let mut stop = false;
@ -341,8 +356,8 @@ mod test {
let addr = next_test_ip6();
do spawntask {
let mut listener = TcpListener::bind(addr);
let mut stream = listener.accept();
let mut acceptor = TcpListener::bind(addr).listen();
let mut stream = acceptor.accept();
let buf = [0];
loop {
let mut stop = false;
@ -371,9 +386,8 @@ mod test {
let max = 10;
do spawntask {
let mut listener = TcpListener::bind(addr);
do max.times {
let mut stream = listener.accept();
let mut acceptor = TcpListener::bind(addr).listen();
for ref mut stream in acceptor.incoming().take(max) {
let mut buf = [0];
stream.read(buf);
assert_eq!(buf[0], 99);
@ -396,9 +410,8 @@ mod test {
let max = 10;
do spawntask {
let mut listener = TcpListener::bind(addr);
do max.times {
let mut stream = listener.accept();
let mut acceptor = TcpListener::bind(addr).listen();
for ref mut stream in acceptor.incoming().take(max) {
let mut buf = [0];
stream.read(buf);
assert_eq!(buf[0], 99);
@ -421,10 +434,9 @@ mod test {
static MAX: int = 10;
do spawntask {
let mut listener = TcpListener::bind(addr);
for i in range(0, MAX) {
let stream = Cell::new(listener.accept());
rtdebug!("accepted");
let mut acceptor = TcpListener::bind(addr).listen();
for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) {
let stream = Cell::new(stream);
// Start another task to handle the connection
do spawntask {
let mut stream = stream.take();
@ -460,10 +472,9 @@ mod test {
static MAX: int = 10;
do spawntask {
let mut listener = TcpListener::bind(addr);
for i in range(0, MAX) {
let stream = Cell::new(listener.accept());
rtdebug!("accepted");
let mut acceptor = TcpListener::bind(addr).listen();
for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) {
let stream = Cell::new(stream);
// Start another task to handle the connection
do spawntask {
let mut stream = stream.take();
@ -499,10 +510,9 @@ mod test {
static MAX: int = 10;
do spawntask {
let mut listener = TcpListener::bind(addr);
for _ in range(0, MAX) {
let stream = Cell::new(listener.accept());
rtdebug!("accepted");
let mut acceptor = TcpListener::bind(addr).listen();
for stream in acceptor.incoming().take(MAX as uint) {
let stream = Cell::new(stream);
// Start another task to handle the connection
do spawntask_later {
let mut stream = stream.take();
@ -537,10 +547,9 @@ mod test {
static MAX: int = 10;
do spawntask {
let mut listener = TcpListener::bind(addr);
for _ in range(0, MAX) {
let stream = Cell::new(listener.accept());
rtdebug!("accepted");
let mut acceptor = TcpListener::bind(addr).listen();
for stream in acceptor.incoming().take(MAX as uint) {
let stream = Cell::new(stream);
// Start another task to handle the connection
do spawntask_later {
let mut stream = stream.take();
@ -573,10 +582,7 @@ mod test {
fn socket_name(addr: SocketAddr) {
do run_in_newsched_task {
do spawntask {
let listener = TcpListener::bind(addr);
assert!(listener.is_some());
let mut listener = listener.unwrap();
let mut listener = TcpListener::bind(addr).unwrap();
// Make sure socket_name gives
// us the socket we binded to.
@ -592,9 +598,9 @@ mod test {
fn peer_name(addr: SocketAddr) {
do run_in_newsched_task {
do spawntask {
let mut listener = TcpListener::bind(addr);
let mut acceptor = TcpListener::bind(addr).listen();
listener.accept();
acceptor.accept();
}
do spawntask {

View File

@ -40,6 +40,12 @@ impl UnixListener {
}
}
impl Listener<UnixStream> for UnixListener {
impl Listener<UnixStream, UnixAcceptor> for UnixListener {
fn listen(self) -> Option<UnixAcceptor> { fail!() }
}
pub struct UnixAcceptor;
impl Acceptor<UnixStream> for UnixAcceptor {
fn accept(&mut self) -> Option<UnixStream> { fail!() }
}

View File

@ -17,7 +17,7 @@
//! # XXX Seek and Close
use option::*;
use super::{Reader, Writer, Listener};
use super::{Reader, Writer, Listener, Acceptor};
use super::{standard_error, PreviousIoError, io_error, read_error, IoError};
fn prev_io_error() -> IoError {
@ -62,10 +62,22 @@ impl<R: Reader> Reader for Option<R> {
}
}
impl<L: Listener<S>, S> Listener<S> for Option<L> {
fn accept(&mut self) -> Option<S> {
impl<T, A: Acceptor<T>, L: Listener<T, A>> Listener<T, A> for Option<L> {
fn listen(self) -> Option<A> {
match self {
Some(listener) => listener.listen(),
None => {
io_error::cond.raise(prev_io_error());
None
}
}
}
}
impl<T, A: Acceptor<T>> Acceptor<T> for Option<A> {
fn accept(&mut self) -> Option<T> {
match *self {
Some(ref mut listener) => listener.accept(),
Some(ref mut acceptor) => acceptor.accept(),
None => {
io_error::cond.raise(prev_io_error());
None

View File

@ -26,6 +26,7 @@ pub type EventLoopObject = uvio::UvEventLoop;
pub type RemoteCallbackObject = uvio::UvRemoteCallback;
pub type IoFactoryObject = uvio::UvIoFactory;
pub type RtioTcpStreamObject = uvio::UvTcpStream;
pub type RtioTcpAcceptorObject = uvio::UvTcpAcceptor;
pub type RtioTcpListenerObject = uvio::UvTcpListener;
pub type RtioUdpSocketObject = uvio::UvUdpSocket;
pub type RtioTimerObject = uvio::UvTimer;
@ -75,6 +76,10 @@ pub trait IoFactory {
}
pub trait RtioTcpListener : RtioSocket {
fn listen(self) -> Result<~RtioTcpAcceptorObject, IoError>;
}
pub trait RtioTcpAcceptor : RtioSocket {
fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError>;
fn accept_simultaneously(&mut self) -> Result<(), IoError>;
fn dont_accept_simultaneously(&mut self) -> Result<(), IoError>;

View File

@ -26,6 +26,7 @@ use rt::local::Local;
use rt::rtio::*;
use rt::sched::{Scheduler, SchedHandle};
use rt::tube::Tube;
use rt::task::SchedHome;
use rt::uv::*;
use rt::uv::idle::IdleWatcher;
use rt::uv::net::{UvIpv4SocketAddr, UvIpv6SocketAddr};
@ -47,76 +48,80 @@ use task;
// XXX we should not be calling uvll functions in here.
trait HomingIO {
fn home<'r>(&'r mut self) -> &'r mut SchedHandle;
/* XXX This will move pinned tasks to do IO on the proper scheduler
* and then move them back to their home.
*/
fn home_for_io<A>(&mut self, io: &fn(&mut Self) -> A) -> A {
use rt::sched::{PinnedTask, TaskFromFriend};
// go home
let old_home = Cell::new_empty();
let old_home_ptr = &old_home;
fn go_to_IO_home(&mut self) -> SchedHome {
use rt::sched::PinnedTask;
do task::unkillable { // FIXME(#8674)
let scheduler: ~Scheduler = Local::take();
do scheduler.deschedule_running_task_and_then |_, task| {
// get the old home first
do task.wake().map_move |mut task| {
old_home_ptr.put_back(task.take_unwrap_home());
self.home().send(PinnedTask(task));
};
let mut old = None;
{
let ptr = &mut old;
let scheduler: ~Scheduler = Local::take();
do scheduler.deschedule_running_task_and_then |_, task| {
/* FIXME(#8674) if the task was already killed then wake
* will return None. In that case, the home pointer will never be set.
*
* RESOLUTION IDEA: Since the task is dead, we should just abort the IO action.
*/
do task.wake().map_move |mut task| {
*ptr = Some(task.take_unwrap_home());
self.home().send(PinnedTask(task));
};
}
}
old.expect("No old home because task had already been killed.")
}
}
// do IO
let a = io(self);
// XXX dummy self param
fn restore_original_home(_dummy_self: Option<Self>, old: SchedHome) {
use rt::sched::TaskFromFriend;
// unhome home
let old = Cell::new(old);
do task::unkillable { // FIXME(#8674)
let scheduler: ~Scheduler = Local::take();
do scheduler.deschedule_running_task_and_then |scheduler, task| {
/* FIXME(#8674) if the task was already killed then wake
* will return None. In that case, the home pointer will never be restored.
*
* RESOLUTION IDEA: Since the task is dead, we should just abort the IO action.
*/
do task.wake().map_move |mut task| {
task.give_home(old_home.take());
task.give_home(old.take());
scheduler.make_handle().send(TaskFromFriend(task));
};
}
}
}
// return the result of the IO
a
fn home_for_io<A>(&mut self, io: &fn(&mut Self) -> A) -> A {
let home = self.go_to_IO_home();
let a = io(self); // do IO
HomingIO::restore_original_home(None::<Self> /* XXX dummy self */, home);
a // return the result of the IO
}
fn home_for_io_consume<A>(self, io: &fn(Self) -> A) -> A {
let mut this = self;
let home = this.go_to_IO_home();
let a = io(this); // do IO
HomingIO::restore_original_home(None::<Self> /* XXX dummy self */, home);
a // return the result of the IO
}
fn home_for_io_with_sched<A>(&mut self, io_sched: &fn(&mut Self, ~Scheduler) -> A) -> A {
use rt::sched::{PinnedTask, TaskFromFriend};
do task::unkillable { // FIXME(#8674)
// go home
let old_home = Cell::new_empty();
let old_home_ptr = &old_home;
let home = self.go_to_IO_home();
let a = do task::unkillable { // FIXME(#8674)
let scheduler: ~Scheduler = Local::take();
do scheduler.deschedule_running_task_and_then |_, task| {
// get the old home first
do task.wake().map_move |mut task| {
old_home_ptr.put_back(task.take_unwrap_home());
self.home().send(PinnedTask(task));
};
}
// do IO
let scheduler: ~Scheduler = Local::take();
let a = io_sched(self, scheduler);
// unhome home
let scheduler: ~Scheduler = Local::take();
do scheduler.deschedule_running_task_and_then |scheduler, task| {
do task.wake().map_move |mut task| {
task.give_home(old_home.take());
scheduler.make_handle().send(TaskFromFriend(task));
};
}
// return the result of the IO
a
}
io_sched(self, scheduler) // do IO and scheduling action
};
HomingIO::restore_original_home(None::<Self> /* XXX dummy self */, home);
a // return result of IO
}
}
@ -594,9 +599,7 @@ impl IoFactory for UvIoFactory {
}
pub struct UvTcpListener {
watcher: TcpWatcher,
listening: bool,
incoming_streams: Tube<Result<~RtioTcpStreamObject, IoError>>,
watcher : TcpWatcher,
home: SchedHandle,
}
@ -606,15 +609,8 @@ impl HomingIO for UvTcpListener {
impl UvTcpListener {
fn new(watcher: TcpWatcher, home: SchedHandle) -> UvTcpListener {
UvTcpListener {
watcher: watcher,
listening: false,
incoming_streams: Tube::new(),
home: home,
}
UvTcpListener { watcher: watcher, home: home }
}
fn watcher(&self) -> TcpWatcher { self.watcher }
}
impl Drop for UvTcpListener {
@ -623,10 +619,10 @@ impl Drop for UvTcpListener {
let self_ = unsafe { transmute::<&UvTcpListener, &mut UvTcpListener>(self) };
do self_.home_for_io_with_sched |self_, scheduler| {
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
do self_.watcher().as_stream().close {
let task = Cell::new(task);
do self_.watcher.as_stream().close {
let scheduler: ~Scheduler = Local::take();
scheduler.resume_blocked_task_immediately(task_cell.take());
scheduler.resume_blocked_task_immediately(task.take());
}
}
}
@ -636,50 +632,71 @@ impl Drop for UvTcpListener {
impl RtioSocket for UvTcpListener {
fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
do self.home_for_io |self_| {
socket_name(Tcp, self_.watcher)
socket_name(Tcp, self_.watcher)
}
}
}
impl RtioTcpListener for UvTcpListener {
fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError> {
do self.home_for_io |self_| {
if !self_.listening {
self_.listening = true;
let incoming_streams_cell = Cell::new(self_.incoming_streams.clone());
do self_.watcher().listen |mut server, status| {
let stream = match status {
fn listen(self) -> Result<~RtioTcpAcceptorObject, IoError> {
do self.home_for_io_consume |self_| {
let mut acceptor = ~UvTcpAcceptor::new(self_);
let incoming = Cell::new(acceptor.incoming.clone());
do acceptor.listener.watcher.listen |mut server, status| {
do incoming.with_mut_ref |incoming| {
let inc = match status {
Some(_) => Err(standard_error(OtherIoError)),
None => {
let client = TcpWatcher::new(&server.event_loop());
// XXX: needs to be surfaced in interface
server.accept(client.as_stream());
let inc = TcpWatcher::new(&server.event_loop());
// first accept call in the callback guarenteed to succeed
server.accept(inc.as_stream());
let home = get_handle_to_current_scheduler!();
Ok(~UvTcpStream { watcher: client, home: home })
Ok(~UvTcpStream { watcher: inc, home: home })
}
};
let mut incoming_streams = incoming_streams_cell.take();
incoming_streams.send(stream);
incoming_streams_cell.put_back(incoming_streams);
incoming.send(inc);
}
}
self_.incoming_streams.recv()
};
Ok(acceptor)
}
}
}
pub struct UvTcpAcceptor {
listener: UvTcpListener,
incoming: Tube<Result<~RtioTcpStreamObject, IoError>>,
}
impl HomingIO for UvTcpAcceptor {
fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.listener.home() }
}
impl UvTcpAcceptor {
fn new(listener: UvTcpListener) -> UvTcpAcceptor {
UvTcpAcceptor { listener: listener, incoming: Tube::new() }
}
}
impl RtioSocket for UvTcpAcceptor {
fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
do self.home_for_io |self_| {
socket_name(Tcp, self_.listener.watcher)
}
}
}
impl RtioTcpAcceptor for UvTcpAcceptor {
fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError> {
self.incoming.recv()
}
fn accept_simultaneously(&mut self) -> Result<(), IoError> {
do self.home_for_io |self_| {
let r = unsafe {
uvll::tcp_simultaneous_accepts(self_.watcher().native_handle(), 1 as c_int)
uvll::tcp_simultaneous_accepts(self_.listener.watcher.native_handle(), 1 as c_int)
};
match status_to_maybe_uv_error(self_.watcher(), r) {
match status_to_maybe_uv_error(self_.listener.watcher, r) {
Some(err) => Err(uv_error_to_io_error(err)),
None => Ok(())
}
@ -689,10 +706,10 @@ impl RtioTcpListener for UvTcpListener {
fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
do self.home_for_io |self_| {
let r = unsafe {
uvll::tcp_simultaneous_accepts(self_.watcher().native_handle(), 0 as c_int)
uvll::tcp_simultaneous_accepts(self_.listener.watcher.native_handle(), 0 as c_int)
};
match status_to_maybe_uv_error(self_.watcher(), r) {
match status_to_maybe_uv_error(self_.listener.watcher, r) {
Some(err) => Err(uv_error_to_io_error(err)),
None => Ok(())
}
@ -1435,8 +1452,9 @@ fn test_simple_tcp_server_and_client() {
do spawntask {
unsafe {
let io: *mut IoFactoryObject = Local::unsafe_borrow();
let mut listener = (*io).tcp_bind(addr).unwrap();
let mut stream = listener.accept().unwrap();
let listener = (*io).tcp_bind(addr).unwrap();
let mut acceptor = listener.listen().unwrap();
let mut stream = acceptor.accept().unwrap();
let mut buf = [0, .. 2048];
let nread = stream.read(buf).unwrap();
assert_eq!(nread, 8);
@ -1493,11 +1511,10 @@ fn test_simple_tcp_server_and_client_on_diff_threads() {
};
let server_fn: ~fn() = || {
let io: *mut IoFactoryObject = unsafe {
Local::unsafe_borrow()
};
let mut listener = unsafe { (*io).tcp_bind(server_addr).unwrap() };
let mut stream = listener.accept().unwrap();
let io: *mut IoFactoryObject = unsafe { Local::unsafe_borrow() };
let listener = unsafe { (*io).tcp_bind(server_addr).unwrap() };
let mut acceptor = listener.listen().unwrap();
let mut stream = acceptor.accept().unwrap();
let mut buf = [0, .. 2048];
let nread = stream.read(buf).unwrap();
assert_eq!(nread, 8);
@ -1578,8 +1595,9 @@ fn test_read_and_block() {
do spawntask {
let io: *mut IoFactoryObject = unsafe { Local::unsafe_borrow() };
let mut listener = unsafe { (*io).tcp_bind(addr).unwrap() };
let mut stream = listener.accept().unwrap();
let listener = unsafe { (*io).tcp_bind(addr).unwrap() };
let mut acceptor = listener.listen().unwrap();
let mut stream = acceptor.accept().unwrap();
let mut buf = [0, .. 2048];
let expected = 32;
@ -1634,8 +1652,9 @@ fn test_read_read_read() {
do spawntask {
unsafe {
let io: *mut IoFactoryObject = Local::unsafe_borrow();
let mut listener = (*io).tcp_bind(addr).unwrap();
let mut stream = listener.accept().unwrap();
let listener = (*io).tcp_bind(addr).unwrap();
let mut acceptor = listener.listen().unwrap();
let mut stream = acceptor.accept().unwrap();
let buf = [1, .. 2048];
let mut total_bytes_written = 0;
while total_bytes_written < MAX {