Homed UDP sockets

This commit is contained in:
Eric Reed 2013-08-07 04:05:06 -07:00
parent d7b6fcba29
commit d09412ab89
2 changed files with 213 additions and 24 deletions

View File

@ -22,7 +22,7 @@ pub type RemoteCallbackObject = uvio::UvRemoteCallback;
pub type IoFactoryObject = uvio::UvIoFactory;
pub type RtioTcpStreamObject = uvio::UvTcpStream;
pub type RtioTcpListenerObject = uvio::UvTcpListener;
pub type RtioUdpSocketObject = uvio::UvUdpSocket;
pub type RtioUdpSocketObject = uvio::HomedUvUdpSocket; //uvio::UvUdpSocket;
pub type RtioTimerObject = uvio::UvTimer;
pub trait EventLoop {

View File

@ -239,27 +239,6 @@ impl UvIoFactory {
pub fn uv_loop<'a>(&'a mut self) -> &'a mut Loop {
match self { &UvIoFactory(ref mut ptr) => ptr }
}
pub fn homed_udp_bind(&mut self, addr: SocketAddr) -> Result<~HomedUvUdpSocket, IoError> {
let mut watcher = UdpWatcher::new(self.uv_loop());
match watcher.bind(addr) {
Ok(_) => {
let home = do Local::borrow::<Scheduler, SchedHandle> |sched| {sched.make_handle()};
Ok(~HomedUvUdpSocket { watcher: watcher, home: home })
}
Err(uverr) => {
let scheduler = Local::take::<Scheduler>();
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
do watcher.close {
let scheduler = Local::take::<Scheduler>();
scheduler.resume_blocked_task_immediately(task_cell.take());
}
}
Err(uv_error_to_io_error(uverr))
}
}
}
}
impl IoFactory for UvIoFactory {
@ -331,6 +310,7 @@ impl IoFactory for UvIoFactory {
}
}
/*
fn udp_bind(&mut self, addr: SocketAddr) -> Result<~RtioUdpSocketObject, IoError> {
let mut watcher = UdpWatcher::new(self.uv_loop());
match watcher.bind(addr) {
@ -348,6 +328,28 @@ impl IoFactory for UvIoFactory {
}
}
}
*/
pub fn /*homed_*/udp_bind(&mut self, addr: SocketAddr) -> Result<~/*HomedUvUdpSocket*/RtioUdpSocketObject, IoError> {
let mut watcher = UdpWatcher::new(self.uv_loop());
match watcher.bind(addr) {
Ok(_) => {
let home = do Local::borrow::<Scheduler, SchedHandle> |sched| {sched.make_handle()};
Ok(~HomedUvUdpSocket { watcher: watcher, home: home })
}
Err(uverr) => {
let scheduler = Local::take::<Scheduler>();
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
do watcher.close {
let scheduler = Local::take::<Scheduler>();
scheduler.resume_blocked_task_immediately(task_cell.take());
}
}
Err(uv_error_to_io_error(uverr))
}
}
}
fn timer_init(&mut self) -> Result<~RtioTimerObject, IoError> {
Ok(~UvTimer(TimerWatcher::new(self.uv_loop())))
@ -640,18 +642,205 @@ impl Drop for HomedUvUdpSocket {
impl RtioSocket for HomedUvUdpSocket {
fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
// first go home
self.go_home();
socket_name(Udp, self.watcher)
}
}
impl RtioUdpSocket for HomedUvUdpSocket {
fn recvfrom(&mut self, buf: &mut [u8]) -> Result<(uint, SocketAddr), IoError> {
// first go home
self.go_home();
let result_cell = Cell::new_empty();
let result_cell_ptr: *Cell<Result<(uint, SocketAddr), IoError>> = &result_cell;
let scheduler = Local::take::<Scheduler>();
let buf_ptr: *&mut [u8] = &buf;
do scheduler.deschedule_running_task_and_then |_, task| {
rtdebug!("recvfrom: entered scheduler context");
let task_cell = Cell::new(task);
let alloc: AllocCallback = |_| unsafe { slice_to_uv_buf(*buf_ptr) };
do self.watcher.recv_start(alloc) |mut watcher, nread, _buf, addr, flags, status| {
let _ = flags; // /XXX add handling for partials?
watcher.recv_stop();
let result = match status {
None => {
assert!(nread >= 0);
Ok((nread as uint, addr))
}
Some(err) => Err(uv_error_to_io_error(err)),
};
unsafe { (*result_cell_ptr).put_back(result); }
let scheduler = Local::take::<Scheduler>();
scheduler.resume_blocked_task_immediately(task_cell.take());
}
}
assert!(!result_cell.is_empty());
return result_cell.take();
}
fn sendto(&mut self, buf: &[u8], dst: SocketAddr) -> Result<(), IoError> {
// first go home
self.go_home();
let result_cell = Cell::new_empty();
let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
let scheduler = Local::take::<Scheduler>();
let buf_ptr: *&[u8] = &buf;
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
do self.watcher.send(buf, dst) |_watcher, status| {
let result = match status {
None => Ok(()),
Some(err) => Err(uv_error_to_io_error(err)),
};
unsafe { (*result_cell_ptr).put_back(result); }
let scheduler = Local::take::<Scheduler>();
scheduler.resume_blocked_task_immediately(task_cell.take());
}
}
assert!(!result_cell.is_empty());
return result_cell.take();
}
fn join_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
// first go home
self.go_home();
let r = unsafe {
do multi.to_str().as_c_str |m_addr| {
uvll::udp_set_membership(self.watcher.native_handle(), m_addr,
ptr::null(), uvll::UV_JOIN_GROUP)
}
};
match status_to_maybe_uv_error(self.watcher, r) {
Some(err) => Err(uv_error_to_io_error(err)),
None => Ok(())
}
}
fn leave_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
// first go home
self.go_home();
let r = unsafe {
do multi.to_str().as_c_str |m_addr| {
uvll::udp_set_membership(self.watcher.native_handle(), m_addr,
ptr::null(), uvll::UV_LEAVE_GROUP)
}
};
match status_to_maybe_uv_error(self.watcher, r) {
Some(err) => Err(uv_error_to_io_error(err)),
None => Ok(())
}
}
fn loop_multicast_locally(&mut self) -> Result<(), IoError> {
// first go home
self.go_home();
let r = unsafe {
uvll::udp_set_multicast_loop(self.watcher.native_handle(), 1 as c_int)
};
match status_to_maybe_uv_error(self.watcher, r) {
Some(err) => Err(uv_error_to_io_error(err)),
None => Ok(())
}
}
fn dont_loop_multicast_locally(&mut self) -> Result<(), IoError> {
// first go home
self.go_home();
let r = unsafe {
uvll::udp_set_multicast_loop(self.watcher.native_handle(), 0 as c_int)
};
match status_to_maybe_uv_error(self.watcher, r) {
Some(err) => Err(uv_error_to_io_error(err)),
None => Ok(())
}
}
fn multicast_time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
// first go home
self.go_home();
let r = unsafe {
uvll::udp_set_multicast_ttl(self.watcher.native_handle(), ttl as c_int)
};
match status_to_maybe_uv_error(self.watcher, r) {
Some(err) => Err(uv_error_to_io_error(err)),
None => Ok(())
}
}
fn time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
// first go home
self.go_home();
let r = unsafe {
uvll::udp_set_ttl(self.watcher.native_handle(), ttl as c_int)
};
match status_to_maybe_uv_error(self.watcher, r) {
Some(err) => Err(uv_error_to_io_error(err)),
None => Ok(())
}
}
fn hear_broadcasts(&mut self) -> Result<(), IoError> {
// first go home
self.go_home();
let r = unsafe {
uvll::udp_set_broadcast(self.watcher.native_handle(), 1 as c_int)
};
match status_to_maybe_uv_error(self.watcher, r) {
Some(err) => Err(uv_error_to_io_error(err)),
None => Ok(())
}
}
fn ignore_broadcasts(&mut self) -> Result<(), IoError> {
// first go home
self.go_home();
let r = unsafe {
uvll::udp_set_broadcast(self.watcher.native_handle(), 0 as c_int)
};
match status_to_maybe_uv_error(self.watcher, r) {
Some(err) => Err(uv_error_to_io_error(err)),
None => Ok(())
}
}
}
#[test]
fn test_simple_homed_udp_io_bind_only() {
do run_in_newsched_task {
unsafe {
let io = Local::unsafe_borrow::<IoFactoryObject>();
let addr = next_test_ip4();
let maybe_socket = (*io).homed_udp_bind(addr);
let maybe_socket = (*io)./*homed_*/udp_bind(addr);
assert!(maybe_socket.is_ok());
}
}
@ -688,7 +877,7 @@ fn test_simple_homed_udp_io_bind_then_move_then_home_and_close() {
let test_function: ~fn() = || {
let io = unsafe { Local::unsafe_borrow::<IoFactoryObject>() };
let addr = next_test_ip4();
let maybe_socket = unsafe { (*io).homed_udp_bind(addr) };
let maybe_socket = unsafe { (*io)./*homed_*/udp_bind(addr) };
// this socket is bound to this event loop
assert!(maybe_socket.is_ok());