From d09412ab893f54ef5309cf63d17bcb6110d582b9 Mon Sep 17 00:00:00 2001 From: Eric Reed Date: Wed, 7 Aug 2013 04:05:06 -0700 Subject: [PATCH] Homed UDP sockets --- src/libstd/rt/rtio.rs | 2 +- src/libstd/rt/uv/uvio.rs | 235 +++++++++++++++++++++++++++++++++++---- 2 files changed, 213 insertions(+), 24 deletions(-) diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs index a7c794fb5f1..2bec782847b 100644 --- a/src/libstd/rt/rtio.rs +++ b/src/libstd/rt/rtio.rs @@ -22,7 +22,7 @@ pub type RemoteCallbackObject = uvio::UvRemoteCallback; pub type IoFactoryObject = uvio::UvIoFactory; pub type RtioTcpStreamObject = uvio::UvTcpStream; pub type RtioTcpListenerObject = uvio::UvTcpListener; -pub type RtioUdpSocketObject = uvio::UvUdpSocket; +pub type RtioUdpSocketObject = uvio::HomedUvUdpSocket; //uvio::UvUdpSocket; pub type RtioTimerObject = uvio::UvTimer; pub trait EventLoop { diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs index 07ba44101c8..43be09434a4 100644 --- a/src/libstd/rt/uv/uvio.rs +++ b/src/libstd/rt/uv/uvio.rs @@ -239,27 +239,6 @@ impl UvIoFactory { pub fn uv_loop<'a>(&'a mut self) -> &'a mut Loop { match self { &UvIoFactory(ref mut ptr) => ptr } } - - pub fn homed_udp_bind(&mut self, addr: SocketAddr) -> Result<~HomedUvUdpSocket, IoError> { - let mut watcher = UdpWatcher::new(self.uv_loop()); - match watcher.bind(addr) { - Ok(_) => { - let home = do Local::borrow:: |sched| {sched.make_handle()}; - Ok(~HomedUvUdpSocket { watcher: watcher, home: home }) - } - Err(uverr) => { - let scheduler = Local::take::(); - do scheduler.deschedule_running_task_and_then |_, task| { - let task_cell = Cell::new(task); - do watcher.close { - let scheduler = Local::take::(); - scheduler.resume_blocked_task_immediately(task_cell.take()); - } - } - Err(uv_error_to_io_error(uverr)) - } - } - } } impl IoFactory for UvIoFactory { @@ -331,6 +310,7 @@ impl IoFactory for UvIoFactory { } } + /* fn udp_bind(&mut self, addr: SocketAddr) -> Result<~RtioUdpSocketObject, IoError> { let mut watcher = UdpWatcher::new(self.uv_loop()); match watcher.bind(addr) { @@ -348,6 +328,28 @@ impl IoFactory for UvIoFactory { } } } + */ + + pub fn /*homed_*/udp_bind(&mut self, addr: SocketAddr) -> Result<~/*HomedUvUdpSocket*/RtioUdpSocketObject, IoError> { + let mut watcher = UdpWatcher::new(self.uv_loop()); + match watcher.bind(addr) { + Ok(_) => { + let home = do Local::borrow:: |sched| {sched.make_handle()}; + Ok(~HomedUvUdpSocket { watcher: watcher, home: home }) + } + Err(uverr) => { + let scheduler = Local::take::(); + do scheduler.deschedule_running_task_and_then |_, task| { + let task_cell = Cell::new(task); + do watcher.close { + let scheduler = Local::take::(); + scheduler.resume_blocked_task_immediately(task_cell.take()); + } + } + Err(uv_error_to_io_error(uverr)) + } + } + } fn timer_init(&mut self) -> Result<~RtioTimerObject, IoError> { Ok(~UvTimer(TimerWatcher::new(self.uv_loop()))) @@ -640,18 +642,205 @@ impl Drop for HomedUvUdpSocket { impl RtioSocket for HomedUvUdpSocket { fn socket_name(&mut self) -> Result { + // first go home self.go_home(); socket_name(Udp, self.watcher) } } +impl RtioUdpSocket for HomedUvUdpSocket { + fn recvfrom(&mut self, buf: &mut [u8]) -> Result<(uint, SocketAddr), IoError> { + // first go home + self.go_home(); + + let result_cell = Cell::new_empty(); + let result_cell_ptr: *Cell> = &result_cell; + + let scheduler = Local::take::(); + let buf_ptr: *&mut [u8] = &buf; + do scheduler.deschedule_running_task_and_then |_, task| { + rtdebug!("recvfrom: entered scheduler context"); + let task_cell = Cell::new(task); + let alloc: AllocCallback = |_| unsafe { slice_to_uv_buf(*buf_ptr) }; + do self.watcher.recv_start(alloc) |mut watcher, nread, _buf, addr, flags, status| { + let _ = flags; // /XXX add handling for partials? + + watcher.recv_stop(); + + let result = match status { + None => { + assert!(nread >= 0); + Ok((nread as uint, addr)) + } + Some(err) => Err(uv_error_to_io_error(err)), + }; + + unsafe { (*result_cell_ptr).put_back(result); } + + let scheduler = Local::take::(); + scheduler.resume_blocked_task_immediately(task_cell.take()); + } + } + + assert!(!result_cell.is_empty()); + return result_cell.take(); + } + + fn sendto(&mut self, buf: &[u8], dst: SocketAddr) -> Result<(), IoError> { + // first go home + self.go_home(); + + let result_cell = Cell::new_empty(); + let result_cell_ptr: *Cell> = &result_cell; + let scheduler = Local::take::(); + let buf_ptr: *&[u8] = &buf; + do scheduler.deschedule_running_task_and_then |_, task| { + let task_cell = Cell::new(task); + let buf = unsafe { slice_to_uv_buf(*buf_ptr) }; + do self.watcher.send(buf, dst) |_watcher, status| { + + let result = match status { + None => Ok(()), + Some(err) => Err(uv_error_to_io_error(err)), + }; + + unsafe { (*result_cell_ptr).put_back(result); } + + let scheduler = Local::take::(); + scheduler.resume_blocked_task_immediately(task_cell.take()); + } + } + + assert!(!result_cell.is_empty()); + return result_cell.take(); + } + + fn join_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> { + // first go home + self.go_home(); + + let r = unsafe { + do multi.to_str().as_c_str |m_addr| { + uvll::udp_set_membership(self.watcher.native_handle(), m_addr, + ptr::null(), uvll::UV_JOIN_GROUP) + } + }; + + match status_to_maybe_uv_error(self.watcher, r) { + Some(err) => Err(uv_error_to_io_error(err)), + None => Ok(()) + } + } + + fn leave_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> { + // first go home + self.go_home(); + + let r = unsafe { + do multi.to_str().as_c_str |m_addr| { + uvll::udp_set_membership(self.watcher.native_handle(), m_addr, + ptr::null(), uvll::UV_LEAVE_GROUP) + } + }; + + match status_to_maybe_uv_error(self.watcher, r) { + Some(err) => Err(uv_error_to_io_error(err)), + None => Ok(()) + } + } + + fn loop_multicast_locally(&mut self) -> Result<(), IoError> { + // first go home + self.go_home(); + + let r = unsafe { + uvll::udp_set_multicast_loop(self.watcher.native_handle(), 1 as c_int) + }; + + match status_to_maybe_uv_error(self.watcher, r) { + Some(err) => Err(uv_error_to_io_error(err)), + None => Ok(()) + } + } + + fn dont_loop_multicast_locally(&mut self) -> Result<(), IoError> { + // first go home + self.go_home(); + + let r = unsafe { + uvll::udp_set_multicast_loop(self.watcher.native_handle(), 0 as c_int) + }; + + match status_to_maybe_uv_error(self.watcher, r) { + Some(err) => Err(uv_error_to_io_error(err)), + None => Ok(()) + } + } + + fn multicast_time_to_live(&mut self, ttl: int) -> Result<(), IoError> { + // first go home + self.go_home(); + + let r = unsafe { + uvll::udp_set_multicast_ttl(self.watcher.native_handle(), ttl as c_int) + }; + + match status_to_maybe_uv_error(self.watcher, r) { + Some(err) => Err(uv_error_to_io_error(err)), + None => Ok(()) + } + } + + fn time_to_live(&mut self, ttl: int) -> Result<(), IoError> { + // first go home + self.go_home(); + + let r = unsafe { + uvll::udp_set_ttl(self.watcher.native_handle(), ttl as c_int) + }; + + match status_to_maybe_uv_error(self.watcher, r) { + Some(err) => Err(uv_error_to_io_error(err)), + None => Ok(()) + } + } + + fn hear_broadcasts(&mut self) -> Result<(), IoError> { + // first go home + self.go_home(); + + let r = unsafe { + uvll::udp_set_broadcast(self.watcher.native_handle(), 1 as c_int) + }; + + match status_to_maybe_uv_error(self.watcher, r) { + Some(err) => Err(uv_error_to_io_error(err)), + None => Ok(()) + } + } + + fn ignore_broadcasts(&mut self) -> Result<(), IoError> { + // first go home + self.go_home(); + + let r = unsafe { + uvll::udp_set_broadcast(self.watcher.native_handle(), 0 as c_int) + }; + + match status_to_maybe_uv_error(self.watcher, r) { + Some(err) => Err(uv_error_to_io_error(err)), + None => Ok(()) + } + } +} + #[test] fn test_simple_homed_udp_io_bind_only() { do run_in_newsched_task { unsafe { let io = Local::unsafe_borrow::(); let addr = next_test_ip4(); - let maybe_socket = (*io).homed_udp_bind(addr); + let maybe_socket = (*io)./*homed_*/udp_bind(addr); assert!(maybe_socket.is_ok()); } } @@ -688,7 +877,7 @@ fn test_simple_homed_udp_io_bind_then_move_then_home_and_close() { let test_function: ~fn() = || { let io = unsafe { Local::unsafe_borrow::() }; let addr = next_test_ip4(); - let maybe_socket = unsafe { (*io).homed_udp_bind(addr) }; + let maybe_socket = unsafe { (*io)./*homed_*/udp_bind(addr) }; // this socket is bound to this event loop assert!(maybe_socket.is_ok());