From 66365b63782e7b6950e6a213397cc75eef75a2c7 Mon Sep 17 00:00:00 2001 From: Eric Reed Date: Tue, 20 Aug 2013 13:20:50 -0700 Subject: [PATCH] Added home_for_io_with_sched variant. Temporarily making IO unkillable. --- src/libstd/rt/uv/uvio.rs | 289 ++++++++++++++++++++++----------------- 1 file changed, 165 insertions(+), 124 deletions(-) diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs index f794c0a2bec..6e79a78e061 100644 --- a/src/libstd/rt/uv/uvio.rs +++ b/src/libstd/rt/uv/uvio.rs @@ -35,6 +35,7 @@ use libc::{lseek, off_t, O_CREAT, O_APPEND, O_TRUNC, O_RDWR, O_RDONLY, O_WRONLY, S_IRUSR, S_IWUSR}; use rt::io::{FileMode, FileAccess, OpenOrCreate, Open, Create, CreateOrTruncate, Append, Truncate, Read, Write, ReadWrite}; +use task; #[cfg(test)] use container::Container; #[cfg(test)] use unstable::run_in_bare_thread; @@ -55,30 +56,68 @@ trait HomingIO { // go home let old_home = Cell::new_empty(); let old_home_ptr = &old_home; - let scheduler = Local::take::(); - do scheduler.deschedule_running_task_and_then |_, task| { - // get the old home first - do task.wake().map_move |mut task| { - old_home_ptr.put_back(task.take_unwrap_home()); - self.home().send(PinnedTask(task)); - }; + do task::unkillable { // FIXME(#8674) + let scheduler = Local::take::(); + do scheduler.deschedule_running_task_and_then |_, task| { + // get the old home first + do task.wake().map_move |mut task| { + old_home_ptr.put_back(task.take_unwrap_home()); + self.home().send(PinnedTask(task)); + }; + } } // do IO let a = io(self); // unhome home - let scheduler = Local::take::(); - do scheduler.deschedule_running_task_and_then |scheduler, task| { - do task.wake().map_move |mut task| { - task.give_home(old_home.take()); - scheduler.make_handle().send(TaskFromFriend(task)); - }; + do task::unkillable { // FIXME(#8674) + let scheduler = Local::take::(); + do scheduler.deschedule_running_task_and_then |scheduler, task| { + do task.wake().map_move |mut task| { + task.give_home(old_home.take()); + scheduler.make_handle().send(TaskFromFriend(task)); + }; + } } // return the result of the IO a } + + fn home_for_io_with_sched(&mut self, io_sched: &fn(&mut Self, ~Scheduler) -> A) -> A { + use rt::sched::{PinnedTask, TaskFromFriend}; + + do task::unkillable { // FIXME(#8674) + // go home + let old_home = Cell::new_empty(); + let old_home_ptr = &old_home; + let scheduler = Local::take::(); + do scheduler.deschedule_running_task_and_then |_, task| { + // get the old home first + do task.wake().map_move |mut task| { + old_home_ptr.put_back(task.take_unwrap_home()); + self.home().send(PinnedTask(task)); + }; + } + + // do IO + let scheduler = Local::take::(); + let a = io_sched(self, scheduler); + + // unhome home + let scheduler = Local::take::(); + do scheduler.deschedule_running_task_and_then |scheduler, task| { + do task.wake().map_move |mut task| { + task.give_home(old_home.take()); + scheduler.make_handle().send(TaskFromFriend(task)); + }; + } + + // return the result of the IO + a + } + } } // get a handle for the current scheduler @@ -376,35 +415,37 @@ impl IoFactory for UvIoFactory { let result_cell_ptr: *Cell> = &result_cell; // Block this task and take ownership, switch to scheduler context - let scheduler = Local::take::(); - do scheduler.deschedule_running_task_and_then |_, task| { + do task::unkillable { // FIXME(#8674) + let scheduler = Local::take::(); + do scheduler.deschedule_running_task_and_then |_, task| { - let mut tcp = TcpWatcher::new(self.uv_loop()); - let task_cell = Cell::new(task); + let mut tcp = TcpWatcher::new(self.uv_loop()); + let task_cell = Cell::new(task); - // Wait for a connection - do tcp.connect(addr) |stream, status| { - match status { - None => { - let tcp = NativeHandle::from_native_handle(stream.native_handle()); - let home = get_handle_to_current_scheduler!(); - let res = Ok(~UvTcpStream { watcher: tcp, home: home }); + // Wait for a connection + do tcp.connect(addr) |stream, status| { + match status { + None => { + let tcp = NativeHandle::from_native_handle(stream.native_handle()); + let home = get_handle_to_current_scheduler!(); + let res = Ok(~UvTcpStream { watcher: tcp, home: home }); - // Store the stream in the task's stack - unsafe { (*result_cell_ptr).put_back(res); } - - // Context switch - let scheduler = Local::take::(); - scheduler.resume_blocked_task_immediately(task_cell.take()); - } - Some(_) => { - let task_cell = Cell::new(task_cell.take()); - do stream.close { - let res = Err(uv_error_to_io_error(status.unwrap())); + // Store the stream in the task's stack unsafe { (*result_cell_ptr).put_back(res); } + + // Context switch let scheduler = Local::take::(); scheduler.resume_blocked_task_immediately(task_cell.take()); } + Some(_) => { + let task_cell = Cell::new(task_cell.take()); + do stream.close { + let res = Err(uv_error_to_io_error(status.unwrap())); + unsafe { (*result_cell_ptr).put_back(res); } + let scheduler = Local::take::(); + scheduler.resume_blocked_task_immediately(task_cell.take()); + } + } } } } @@ -422,15 +463,17 @@ impl IoFactory for UvIoFactory { Ok(~UvTcpListener::new(watcher, home)) } Err(uverr) => { - let scheduler = Local::take::(); - do scheduler.deschedule_running_task_and_then |_, task| { - let task_cell = Cell::new(task); - do watcher.as_stream().close { - let scheduler = Local::take::(); - scheduler.resume_blocked_task_immediately(task_cell.take()); + do task::unkillable { // FIXME(#8674) + let scheduler = Local::take::(); + do scheduler.deschedule_running_task_and_then |_, task| { + let task_cell = Cell::new(task); + do watcher.as_stream().close { + let scheduler = Local::take::(); + scheduler.resume_blocked_task_immediately(task_cell.take()); + } } + Err(uv_error_to_io_error(uverr)) } - Err(uv_error_to_io_error(uverr)) } } } @@ -443,15 +486,17 @@ impl IoFactory for UvIoFactory { Ok(~UvUdpSocket { watcher: watcher, home: home }) } Err(uverr) => { - let scheduler = Local::take::(); - do scheduler.deschedule_running_task_and_then |_, task| { - let task_cell = Cell::new(task); - do watcher.close { - let scheduler = Local::take::(); - scheduler.resume_blocked_task_immediately(task_cell.take()); + do task::unkillable { // FIXME(#8674) + let scheduler = Local::take::(); + do scheduler.deschedule_running_task_and_then |_, task| { + let task_cell = Cell::new(task); + do watcher.close { + let scheduler = Local::take::(); + scheduler.resume_blocked_task_immediately(task_cell.take()); + } } + Err(uv_error_to_io_error(uverr)) } - Err(uv_error_to_io_error(uverr)) } } } @@ -493,30 +538,32 @@ impl IoFactory for UvIoFactory { let result_cell_ptr: *Cell> = &result_cell; let path_cell = Cell::new(path); - let scheduler = Local::take::(); - do scheduler.deschedule_running_task_and_then |_, task| { - let task_cell = Cell::new(task); - let path = path_cell.take(); - do file::FsRequest::open(self.uv_loop(), path, flags as int, create_mode as int) - |req,err| { - if err.is_none() { - let loop_ = Loop {handle: req.get_loop().native_handle()}; - let home = get_handle_to_current_scheduler!(); - let fd = file::FileDescriptor(req.get_result()); - let fs = ~UvFileStream::new( - loop_, fd, true, home) as ~RtioFileStream; - let res = Ok(fs); - unsafe { (*result_cell_ptr).put_back(res); } - let scheduler = Local::take::(); - scheduler.resume_blocked_task_immediately(task_cell.take()); - } else { - let res = Err(uv_error_to_io_error(err.unwrap())); - unsafe { (*result_cell_ptr).put_back(res); } - let scheduler = Local::take::(); - scheduler.resume_blocked_task_immediately(task_cell.take()); - } + do task::unkillable { // FIXME(#8674) + let scheduler = Local::take::(); + do scheduler.deschedule_running_task_and_then |_, task| { + let task_cell = Cell::new(task); + let path = path_cell.take(); + do file::FsRequest::open(self.uv_loop(), path, flags as int, create_mode as int) + |req,err| { + if err.is_none() { + let loop_ = Loop {handle: req.get_loop().native_handle()}; + let home = get_handle_to_current_scheduler!(); + let fd = file::FileDescriptor(req.get_result()); + let fs = ~UvFileStream::new( + loop_, fd, true, home) as ~RtioFileStream; + let res = Ok(fs); + unsafe { (*result_cell_ptr).put_back(res); } + let scheduler = Local::take::(); + scheduler.resume_blocked_task_immediately(task_cell.take()); + } else { + let res = Err(uv_error_to_io_error(err.unwrap())); + unsafe { (*result_cell_ptr).put_back(res); } + let scheduler = Local::take::(); + scheduler.resume_blocked_task_immediately(task_cell.take()); + } + }; }; - }; + } assert!(!result_cell.is_empty()); return result_cell.take(); } @@ -525,20 +572,22 @@ impl IoFactory for UvIoFactory { let result_cell = Cell::new_empty(); let result_cell_ptr: *Cell> = &result_cell; let path_cell = Cell::new(path); - let scheduler = Local::take::(); - do scheduler.deschedule_running_task_and_then |_, task| { - let task_cell = Cell::new(task); - let path = path_cell.take(); - do file::FsRequest::unlink(self.uv_loop(), path) |_, err| { - let res = match err { - None => Ok(()), - Some(err) => Err(uv_error_to_io_error(err)) + do task::unkillable { // FIXME(#8674) + let scheduler = Local::take::(); + do scheduler.deschedule_running_task_and_then |_, task| { + let task_cell = Cell::new(task); + let path = path_cell.take(); + do file::FsRequest::unlink(self.uv_loop(), path) |_, err| { + let res = match err { + None => Ok(()), + Some(err) => Err(uv_error_to_io_error(err)) + }; + unsafe { (*result_cell_ptr).put_back(res); } + let scheduler = Local::take::(); + scheduler.resume_blocked_task_immediately(task_cell.take()); }; - unsafe { (*result_cell_ptr).put_back(res); } - let scheduler = Local::take::(); - scheduler.resume_blocked_task_immediately(task_cell.take()); }; - }; + } assert!(!result_cell.is_empty()); return result_cell.take(); } @@ -572,8 +621,7 @@ impl Drop for UvTcpListener { fn drop(&self) { // XXX need mutable finalizer let self_ = unsafe { transmute::<&UvTcpListener, &mut UvTcpListener>(self) }; - do self_.home_for_io |self_| { - let scheduler = Local::take::(); + do self_.home_for_io_with_sched |self_, scheduler| { do scheduler.deschedule_running_task_and_then |_, task| { let task_cell = Cell::new(task); do self_.watcher().as_stream().close { @@ -665,8 +713,7 @@ impl Drop for UvTcpStream { fn drop(&self) { // XXX need mutable finalizer let this = unsafe { transmute::<&UvTcpStream, &mut UvTcpStream>(self) }; - do this.home_for_io |self_| { - let scheduler = Local::take::(); + do this.home_for_io_with_sched |self_, scheduler| { do scheduler.deschedule_running_task_and_then |_, task| { let task_cell = Cell::new(task); do self_.watcher.as_stream().close { @@ -688,11 +735,10 @@ impl RtioSocket for UvTcpStream { impl RtioTcpStream for UvTcpStream { fn read(&mut self, buf: &mut [u8]) -> Result { - do self.home_for_io |self_| { + do self.home_for_io_with_sched |self_, scheduler| { let result_cell = Cell::new_empty(); let result_cell_ptr: *Cell> = &result_cell; - let scheduler = Local::take::(); let buf_ptr: *&mut [u8] = &buf; do scheduler.deschedule_running_task_and_then |_sched, task| { let task_cell = Cell::new(task); @@ -730,10 +776,9 @@ impl RtioTcpStream for UvTcpStream { } fn write(&mut self, buf: &[u8]) -> Result<(), IoError> { - do self.home_for_io |self_| { + do self.home_for_io_with_sched |self_, scheduler| { let result_cell = Cell::new_empty(); let result_cell_ptr: *Cell> = &result_cell; - let scheduler = Local::take::(); let buf_ptr: *&[u8] = &buf; do scheduler.deschedule_running_task_and_then |_, task| { let task_cell = Cell::new(task); @@ -827,11 +872,10 @@ impl Drop for UvUdpSocket { fn drop(&self) { // XXX need mutable finalizer let this = unsafe { transmute::<&UvUdpSocket, &mut UvUdpSocket>(self) }; - do this.home_for_io |_| { - let scheduler = Local::take::(); + do this.home_for_io_with_sched |self_, scheduler| { do scheduler.deschedule_running_task_and_then |_, task| { let task_cell = Cell::new(task); - do this.watcher.close { + do self_.watcher.close { let scheduler = Local::take::(); scheduler.resume_blocked_task_immediately(task_cell.take()); } @@ -850,11 +894,10 @@ impl RtioSocket for UvUdpSocket { impl RtioUdpSocket for UvUdpSocket { fn recvfrom(&mut self, buf: &mut [u8]) -> Result<(uint, SocketAddr), IoError> { - do self.home_for_io |self_| { + do self.home_for_io_with_sched |self_, scheduler| { let result_cell = Cell::new_empty(); let result_cell_ptr: *Cell> = &result_cell; - let scheduler = Local::take::(); let buf_ptr: *&mut [u8] = &buf; do scheduler.deschedule_running_task_and_then |_, task| { let task_cell = Cell::new(task); @@ -885,10 +928,9 @@ impl RtioUdpSocket for UvUdpSocket { } fn sendto(&mut self, buf: &[u8], dst: SocketAddr) -> Result<(), IoError> { - do self.home_for_io |self_| { + do self.home_for_io_with_sched |self_, scheduler| { let result_cell = Cell::new_empty(); let result_cell_ptr: *Cell> = &result_cell; - let scheduler = Local::take::(); let buf_ptr: *&[u8] = &buf; do scheduler.deschedule_running_task_and_then |_, task| { let task_cell = Cell::new(task); @@ -1047,9 +1089,8 @@ impl UvTimer { impl Drop for UvTimer { fn drop(&self) { let self_ = unsafe { transmute::<&UvTimer, &mut UvTimer>(self) }; - do self_.home_for_io |self_| { + do self_.home_for_io_with_sched |self_, scheduler| { rtdebug!("closing UvTimer"); - let scheduler = Local::take::(); do scheduler.deschedule_running_task_and_then |_, task| { let task_cell = Cell::new(task); do self_.watcher.close { @@ -1063,8 +1104,7 @@ impl Drop for UvTimer { impl RtioTimer for UvTimer { fn sleep(&mut self, msecs: u64) { - do self.home_for_io |self_| { - let scheduler = Local::take::(); + do self.home_for_io_with_sched |self_, scheduler| { do scheduler.deschedule_running_task_and_then |_sched, task| { rtdebug!("sleep: entered scheduler context"); let task_cell = Cell::new(task); @@ -1104,8 +1144,7 @@ impl UvFileStream { let result_cell = Cell::new_empty(); let result_cell_ptr: *Cell> = &result_cell; let buf_ptr: *&mut [u8] = &buf; - do self.home_for_io |self_| { - let scheduler = Local::take::(); + do self.home_for_io_with_sched |self_, scheduler| { do scheduler.deschedule_running_task_and_then |_, task| { let buf = unsafe { slice_to_uv_buf(*buf_ptr) }; let task_cell = Cell::new(task); @@ -1126,8 +1165,7 @@ impl UvFileStream { let result_cell = Cell::new_empty(); let result_cell_ptr: *Cell> = &result_cell; let buf_ptr: *&[u8] = &buf; - do self.home_for_io |self_| { - let scheduler = Local::take::(); + do self.home_for_io_with_sched |self_, scheduler| { do scheduler.deschedule_running_task_and_then |_, task| { let buf = unsafe { slice_to_uv_buf(*buf_ptr) }; let task_cell = Cell::new(task); @@ -1166,8 +1204,7 @@ impl Drop for UvFileStream { fn drop(&self) { let self_ = unsafe { transmute::<&UvFileStream, &mut UvFileStream>(self) }; if self.close_on_drop { - do self_.home_for_io |self_| { - let scheduler = Local::take::(); + do self_.home_for_io_with_sched |self_, scheduler| { do scheduler.deschedule_running_task_and_then |_, task| { let task_cell = Cell::new(task); do self_.fd.close(&self.loop_) |_,_| { @@ -1273,14 +1310,16 @@ fn test_simple_homed_udp_io_bind_then_move_task_then_home_and_close() { assert!(maybe_socket.is_ok()); // block self on sched1 - let scheduler = Local::take::(); - do scheduler.deschedule_running_task_and_then |_, task| { - // unblock task - do task.wake().map_move |task| { - // send self to sched2 - tasksFriendHandle.take().send(TaskFromFriend(task)); - }; - // sched1 should now sleep since it has nothing else to do + do task::unkillable { // FIXME(#8674) + let scheduler = Local::take::(); + do scheduler.deschedule_running_task_and_then |_, task| { + // unblock task + do task.wake().map_move |task| { + // send self to sched2 + tasksFriendHandle.take().send(TaskFromFriend(task)); + }; + // sched1 should now sleep since it has nothing else to do + } } // sched2 will wake up and get the task // as we do nothing else, the function ends and the socket goes out of scope @@ -1548,13 +1587,15 @@ fn test_read_and_block() { } reads += 1; - let scheduler = Local::take::(); - // Yield to the other task in hopes that it - // will trigger a read callback while we are - // not ready for it - do scheduler.deschedule_running_task_and_then |sched, task| { - let task = Cell::new(task); - sched.enqueue_blocked_task(task.take()); + do task::unkillable { // FIXME(#8674) + let scheduler = Local::take::(); + // Yield to the other task in hopes that it + // will trigger a read callback while we are + // not ready for it + do scheduler.deschedule_running_task_and_then |sched, task| { + let task = Cell::new(task); + sched.enqueue_blocked_task(task.take()); + } } }