Make the uv bindings resilient to linked failure

In an ideal world, uv I/O could be canceled safely at any time. In reality,
however, we cannot do this today. Right now linked failure is fairly flaky as
implemented in the runtime, which makes it very difficult to test whether the
linked failure mechanisms inside the uv bindings are ready for this kind of
interaction.

For now, all constructors execute inside a task::unkillable block, and all
homing I/O operations prevent linked failure for the duration of the homing
operation. This means that tasks which perform I/O are still susceptible to
linked failure, but the I/O operations themselves will never be interrupted.
Instead, the linked failure is received at the edge of the I/O operation.
Author: Alex Crichton
Date:   2013-11-07 15:13:06 -08:00
Parent: 5e6bbc6bfa
Commit: df4c0b8e43
12 changed files with 839 additions and 821 deletions
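
The central idea is that a blocking uv operation must never be torn down by an
unwind while its callback is still outstanding; the patch enforces this with a
ForbidUnwind guard (alongside the wait_until_woken_after helper), so failure is
only observed once the operation has completed. Below is a rough, modern-Rust
sketch of that guard, using std::thread::panicking() in place of the old
task::failing(); the names and the surrounding function are illustrative, not
the runtime's actual API.

use std::thread;

// Hypothetical modern analogue of the ForbidUnwind guard added in this patch
// (the 2013 runtime used task::failing() rather than thread::panicking()).
// It records whether an unwind was already in progress when the guarded
// region began and asserts on drop that nothing changed, i.e. no failure was
// delivered in the middle of the protected I/O operation.
struct ForbidUnwind {
    msg: &'static str,
    failing_before: bool,
}

impl ForbidUnwind {
    fn new(msg: &'static str) -> ForbidUnwind {
        ForbidUnwind { msg, failing_before: thread::panicking() }
    }
}

impl Drop for ForbidUnwind {
    fn drop(&mut self) {
        // Failure may only be observed at the edge of the I/O operation,
        // after this guard has gone out of scope.
        assert_eq!(self.failing_before, thread::panicking(),
                   "unwound across a protected I/O region: {}", self.msg);
    }
}

// Illustrative stand-in for a homing I/O operation: the guard is alive for
// exactly the span during which interruption would be unsafe.
fn homing_io_operation() {
    let _guard = ForbidUnwind::new("homing I/O");
    // ... issue the uv request and block until its callback wakes us ...
}

fn main() {
    homing_io_operation();
}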


@ -9,7 +9,6 @@
// except according to those terms.
use ai = std::rt::io::net::addrinfo;
use std::cast;
use std::libc::c_int;
use std::ptr::null;
use std::rt::BlockedTask;
@ -17,7 +16,7 @@ use std::rt::local::Local;
use std::rt::sched::Scheduler;
use net;
use super::{Loop, UvError, Request};
use super::{Loop, UvError, Request, wait_until_woken_after};
use uvll;
struct Addrinfo {
@ -76,7 +75,7 @@ impl GetAddrInfoRequest {
}
});
let hint_ptr = hint.as_ref().map_default(null(), |x| x as *uvll::addrinfo);
let req = Request::new(uvll::UV_GETADDRINFO);
let mut req = Request::new(uvll::UV_GETADDRINFO);
return match unsafe {
uvll::uv_getaddrinfo(loop_.handle, req.handle,
@ -84,12 +83,11 @@ impl GetAddrInfoRequest {
hint_ptr)
} {
0 => {
req.defuse(); // uv callback now owns this request
let mut cx = Ctx { slot: None, status: 0, addrinfo: None };
req.set_data(&cx);
req.defuse();
let scheduler: ~Scheduler = Local::take();
do scheduler.deschedule_running_task_and_then |_, task| {
cx.slot = Some(task);
do wait_until_woken_after(&mut cx.slot) {
req.set_data(&cx);
}
match cx.status {
@ -105,8 +103,8 @@ impl GetAddrInfoRequest {
status: c_int,
res: *uvll::addrinfo) {
let req = Request::wrap(req);
if status == uvll::ECANCELED { return }
let cx: &mut Ctx = unsafe { cast::transmute(req.get_data()) };
assert!(status != uvll::ECANCELED);
let cx: &mut Ctx = unsafe { req.get_data() };
cx.status = status;
cx.addrinfo = Some(Addrinfo { handle: res });
@ -191,25 +189,23 @@ pub fn accum_addrinfo(addr: &Addrinfo) -> ~[ai::Info] {
mod test {
use std::rt::io::net::ip::{SocketAddr, Ipv4Addr};
use super::*;
use super::super::run_uv_loop;
use super::super::local_loop;
#[test]
fn getaddrinfo_test() {
do run_uv_loop |l| {
match GetAddrInfoRequest::run(l, Some("localhost"), None, None) {
Ok(infos) => {
let mut found_local = false;
let local_addr = &SocketAddr {
ip: Ipv4Addr(127, 0, 0, 1),
port: 0
};
for addr in infos.iter() {
found_local = found_local || addr.address == *local_addr;
}
assert!(found_local);
match GetAddrInfoRequest::run(local_loop(), Some("localhost"), None, None) {
Ok(infos) => {
let mut found_local = false;
let local_addr = &SocketAddr {
ip: Ipv4Addr(127, 0, 0, 1),
port: 0
};
for addr in infos.iter() {
found_local = found_local || addr.address == *local_addr;
}
Err(e) => fail!("{:?}", e),
assert!(found_local);
}
Err(e) => fail!("{:?}", e),
}
}
}


@ -131,11 +131,12 @@ mod test_remote {
use std::rt::tube::Tube;
use super::*;
use super::super::run_uv_loop;
use super::super::local_loop;
// Make sure that we can fire watchers in remote threads
// Make sure that we can fire watchers in remote threads and that they
// actually trigger what they say they will.
#[test]
fn test_uv_remote() {
fn smoke_test() {
struct MyCallback(Option<Tube<int>>);
impl Callback for MyCallback {
fn call(&mut self) {
@ -147,35 +148,15 @@ mod test_remote {
}
}
do run_uv_loop |l| {
let mut tube = Tube::new();
let cb = ~MyCallback(Some(tube.clone()));
let watcher = Cell::new(AsyncWatcher::new(l, cb as ~Callback));
let mut tube = Tube::new();
let cb = ~MyCallback(Some(tube.clone()));
let watcher = Cell::new(AsyncWatcher::new(local_loop(), cb as ~Callback));
let thread = do Thread::start {
watcher.take().fire();
};
let thread = do Thread::start {
watcher.take().fire();
};
assert_eq!(tube.recv(), 1);
thread.join();
}
}
#[test]
fn smoke_test() {
static mut hits: uint = 0;
struct MyCallback;
impl Callback for MyCallback {
fn call(&mut self) {
unsafe { hits += 1; }
}
}
do run_uv_loop |l| {
let mut watcher = AsyncWatcher::new(l, ~MyCallback as ~Callback);
watcher.fire();
}
assert!(unsafe { hits > 0 });
assert_eq!(tube.recv(), 1);
thread.join();
}
}


@ -15,14 +15,14 @@ use std::cast;
use std::libc::{c_int, c_char, c_void, c_uint};
use std::libc;
use std::rt::BlockedTask;
use std::rt::io;
use std::rt::io::{FileStat, IoError};
use std::rt::rtio;
use std::rt::io;
use std::rt::local::Local;
use std::rt::rtio;
use std::rt::sched::{Scheduler, SchedHandle};
use std::vec;
use super::{Loop, UvError, uv_error_to_io_error};
use super::{Loop, UvError, uv_error_to_io_error, wait_until_woken_after};
use uvio::HomingIO;
use uvll;
@ -305,10 +305,8 @@ fn execute(f: &fn(*uvll::uv_fs_t, uvll::uv_fs_cb) -> c_int)
0 => {
req.fired = true;
let mut slot = None;
unsafe { uvll::set_data_for_req(req.req, &slot) }
let sched: ~Scheduler = Local::take();
do sched.deschedule_running_task_and_then |_, task| {
slot = Some(task);
do wait_until_woken_after(&mut slot) {
unsafe { uvll::set_data_for_req(req.req, &slot) }
}
match req.get_result() {
n if n < 0 => Err(UvError(n)),
@ -454,123 +452,113 @@ mod test {
use std::str;
use std::vec;
use super::*;
use super::super::{run_uv_loop};
use l = super::super::local_loop;
#[test]
fn file_test_full_simple_sync() {
do run_uv_loop |l| {
let create_flags = O_RDWR | O_CREAT;
let read_flags = O_RDONLY;
let mode = S_IWUSR | S_IRUSR;
let path_str = "./tmp/file_full_simple_sync.txt";
let create_flags = O_RDWR | O_CREAT;
let read_flags = O_RDONLY;
let mode = S_IWUSR | S_IRUSR;
let path_str = "./tmp/file_full_simple_sync.txt";
{
// open/create
let result = FsRequest::open(l, &path_str.to_c_str(),
create_flags as int, mode as int);
assert!(result.is_ok());
let result = result.unwrap();
let fd = result.fd;
{
// open/create
let result = FsRequest::open(l(), &path_str.to_c_str(),
create_flags as int, mode as int);
assert!(result.is_ok());
let result = result.unwrap();
let fd = result.fd;
// write
let result = FsRequest::write(l, fd, "hello".as_bytes(), -1);
assert!(result.is_ok());
}
{
// re-open
let result = FsRequest::open(l, &path_str.to_c_str(),
read_flags as int, 0);
assert!(result.is_ok());
let result = result.unwrap();
let fd = result.fd;
// read
let mut read_mem = vec::from_elem(1000, 0u8);
let result = FsRequest::read(l, fd, read_mem, 0);
assert!(result.is_ok());
let nread = result.unwrap();
assert!(nread > 0);
let read_str = str::from_utf8(read_mem.slice(0, nread as uint));
assert_eq!(read_str, ~"hello");
}
// unlink
let result = FsRequest::unlink(l, &path_str.to_c_str());
// write
let result = FsRequest::write(l(), fd, "hello".as_bytes(), -1);
assert!(result.is_ok());
}
{
// re-open
let result = FsRequest::open(l(), &path_str.to_c_str(),
read_flags as int, 0);
assert!(result.is_ok());
let result = result.unwrap();
let fd = result.fd;
// read
let mut read_mem = vec::from_elem(1000, 0u8);
let result = FsRequest::read(l(), fd, read_mem, 0);
assert!(result.is_ok());
let nread = result.unwrap();
assert!(nread > 0);
let read_str = str::from_utf8(read_mem.slice(0, nread as uint));
assert_eq!(read_str, ~"hello");
}
// unlink
let result = FsRequest::unlink(l(), &path_str.to_c_str());
assert!(result.is_ok());
}
#[test]
fn file_test_stat() {
do run_uv_loop |l| {
let path = &"./tmp/file_test_stat_simple".to_c_str();
let create_flags = (O_RDWR | O_CREAT) as int;
let mode = (S_IWUSR | S_IRUSR) as int;
let path = &"./tmp/file_test_stat_simple".to_c_str();
let create_flags = (O_RDWR | O_CREAT) as int;
let mode = (S_IWUSR | S_IRUSR) as int;
let result = FsRequest::open(l, path, create_flags, mode);
assert!(result.is_ok());
let file = result.unwrap();
let result = FsRequest::open(l(), path, create_flags, mode);
assert!(result.is_ok());
let file = result.unwrap();
let result = FsRequest::write(l, file.fd, "hello".as_bytes(), 0);
assert!(result.is_ok());
let result = FsRequest::write(l(), file.fd, "hello".as_bytes(), 0);
assert!(result.is_ok());
let result = FsRequest::stat(l, path);
assert!(result.is_ok());
assert_eq!(result.unwrap().size, 5);
let result = FsRequest::stat(l(), path);
assert!(result.is_ok());
assert_eq!(result.unwrap().size, 5);
fn free<T>(_: T) {}
free(file);
fn free<T>(_: T) {}
free(file);
let result = FsRequest::unlink(l, path);
assert!(result.is_ok());
}
let result = FsRequest::unlink(l(), path);
assert!(result.is_ok());
}
#[test]
fn file_test_mk_rm_dir() {
do run_uv_loop |l| {
let path = &"./tmp/mk_rm_dir".to_c_str();
let mode = S_IWUSR | S_IRUSR;
let path = &"./tmp/mk_rm_dir".to_c_str();
let mode = S_IWUSR | S_IRUSR;
let result = FsRequest::mkdir(l, path, mode);
assert!(result.is_ok());
let result = FsRequest::mkdir(l(), path, mode);
assert!(result.is_ok());
let result = FsRequest::stat(l, path);
assert!(result.is_ok());
assert!(result.unwrap().kind == io::TypeDirectory);
let result = FsRequest::stat(l(), path);
assert!(result.is_ok());
assert!(result.unwrap().kind == io::TypeDirectory);
let result = FsRequest::rmdir(l, path);
assert!(result.is_ok());
let result = FsRequest::rmdir(l(), path);
assert!(result.is_ok());
let result = FsRequest::stat(l, path);
assert!(result.is_err());
}
let result = FsRequest::stat(l(), path);
assert!(result.is_err());
}
#[test]
fn file_test_mkdir_chokes_on_double_create() {
do run_uv_loop |l| {
let path = &"./tmp/double_create_dir".to_c_str();
let mode = S_IWUSR | S_IRUSR;
let path = &"./tmp/double_create_dir".to_c_str();
let mode = S_IWUSR | S_IRUSR;
let result = FsRequest::stat(l, path);
assert!(result.is_err(), "{:?}", result);
let result = FsRequest::mkdir(l, path, mode as c_int);
assert!(result.is_ok(), "{:?}", result);
let result = FsRequest::mkdir(l, path, mode as c_int);
assert!(result.is_err(), "{:?}", result);
let result = FsRequest::rmdir(l, path);
assert!(result.is_ok(), "{:?}", result);
}
let result = FsRequest::stat(l(), path);
assert!(result.is_err(), "{:?}", result);
let result = FsRequest::mkdir(l(), path, mode as c_int);
assert!(result.is_ok(), "{:?}", result);
let result = FsRequest::mkdir(l(), path, mode as c_int);
assert!(result.is_err(), "{:?}", result);
let result = FsRequest::rmdir(l(), path);
assert!(result.is_ok(), "{:?}", result);
}
#[test]
fn file_test_rmdir_chokes_on_nonexistant_path() {
do run_uv_loop |l| {
let path = &"./tmp/never_existed_dir".to_c_str();
let result = FsRequest::rmdir(l, path);
assert!(result.is_err());
}
let path = &"./tmp/never_existed_dir".to_c_str();
let result = FsRequest::rmdir(l(), path);
assert!(result.is_err());
}
}


@ -83,7 +83,6 @@ impl UvHandle<uvll::uv_idle_t> for IdleWatcher {
}
extern fn idle_cb(handle: *uvll::uv_idle_t, status: c_int) {
if status == uvll::ECANCELED { return }
assert_eq!(status, 0);
let idle: &mut IdleWatcher = unsafe { UvHandle::from_uv_handle(&handle) };
idle.callback.call();
@ -101,7 +100,7 @@ mod test {
use super::*;
use std::rt::tube::Tube;
use std::rt::rtio::{Callback, PausibleIdleCallback};
use super::super::run_uv_loop;
use super::super::local_loop;
struct MyCallback(Tube<int>, int);
impl Callback for MyCallback {
@ -114,55 +113,47 @@ mod test {
#[test]
fn not_used() {
do run_uv_loop |l| {
let cb = ~MyCallback(Tube::new(), 1);
let _idle = IdleWatcher::new(l, cb as ~Callback);
}
let cb = ~MyCallback(Tube::new(), 1);
let _idle = IdleWatcher::new(local_loop(), cb as ~Callback);
}
#[test]
fn smoke_test() {
do run_uv_loop |l| {
let mut tube = Tube::new();
let cb = ~MyCallback(tube.clone(), 1);
let mut idle = IdleWatcher::new(l, cb as ~Callback);
idle.resume();
tube.recv();
}
let mut tube = Tube::new();
let cb = ~MyCallback(tube.clone(), 1);
let mut idle = IdleWatcher::new(local_loop(), cb as ~Callback);
idle.resume();
tube.recv();
}
#[test]
fn fun_combinations_of_methods() {
do run_uv_loop |l| {
let mut tube = Tube::new();
let cb = ~MyCallback(tube.clone(), 1);
let mut idle = IdleWatcher::new(l, cb as ~Callback);
idle.resume();
tube.recv();
idle.pause();
idle.resume();
idle.resume();
tube.recv();
idle.pause();
idle.pause();
idle.resume();
tube.recv();
}
let mut tube = Tube::new();
let cb = ~MyCallback(tube.clone(), 1);
let mut idle = IdleWatcher::new(local_loop(), cb as ~Callback);
idle.resume();
tube.recv();
idle.pause();
idle.resume();
idle.resume();
tube.recv();
idle.pause();
idle.pause();
idle.resume();
tube.recv();
}
#[test]
fn pause_pauses() {
do run_uv_loop |l| {
let mut tube = Tube::new();
let cb = ~MyCallback(tube.clone(), 1);
let mut idle1 = IdleWatcher::new(l, cb as ~Callback);
let cb = ~MyCallback(tube.clone(), 2);
let mut idle2 = IdleWatcher::new(l, cb as ~Callback);
idle2.resume();
assert_eq!(tube.recv(), 2);
idle2.pause();
idle1.resume();
assert_eq!(tube.recv(), 1);
}
let mut tube = Tube::new();
let cb = ~MyCallback(tube.clone(), 1);
let mut idle1 = IdleWatcher::new(local_loop(), cb as ~Callback);
let cb = ~MyCallback(tube.clone(), 2);
let mut idle2 = IdleWatcher::new(local_loop(), cb as ~Callback);
idle2.resume();
assert_eq!(tube.recv(), 2);
idle2.pause();
idle1.resume();
assert_eq!(tube.recv(), 1);
}
}


@ -45,15 +45,19 @@ via `close` and `delete` methods.
#[feature(macro_rules, globs)];
use std::cast;
use std::str::raw::from_c_str;
use std::vec;
use std::ptr;
use std::str;
use std::libc::{c_void, c_int, malloc, free};
use std::cast::transmute;
use std::cast;
use std::libc::{c_int, malloc, free};
use std::ptr::null;
use std::ptr;
use std::rt::BlockedTask;
use std::rt::local::Local;
use std::rt::sched::Scheduler;
use std::str::raw::from_c_str;
use std::str;
use std::task;
use std::unstable::finally::Finally;
use std::vec;
use std::rt::io::IoError;
@ -124,27 +128,90 @@ pub trait UvHandle<T> {
uvll::uv_close(self.uv_handle() as *uvll::uv_handle_t, close_cb)
}
}
fn close(&mut self) {
let mut slot = None;
unsafe {
uvll::uv_close(self.uv_handle() as *uvll::uv_handle_t, close_cb);
uvll::set_data_for_uv_handle(self.uv_handle(), ptr::null::<()>());
do wait_until_woken_after(&mut slot) {
uvll::set_data_for_uv_handle(self.uv_handle(), &slot);
}
}
extern fn close_cb(handle: *uvll::uv_handle_t) {
unsafe {
let data = uvll::get_data_for_uv_handle(handle);
uvll::free_handle(handle);
if data == ptr::null() { return }
let slot: &mut Option<BlockedTask> = cast::transmute(data);
let sched: ~Scheduler = Local::take();
sched.resume_blocked_task_immediately(slot.take_unwrap());
}
}
}
}
pub struct ForbidUnwind {
msg: &'static str,
failing_before: bool,
}
impl ForbidUnwind {
fn new(s: &'static str) -> ForbidUnwind {
ForbidUnwind {
msg: s, failing_before: task::failing(),
}
}
}
impl Drop for ForbidUnwind {
fn drop(&mut self) {
assert!(self.failing_before == task::failing(),
"failing sadface {}", self.msg);
}
}
fn wait_until_woken_after(slot: *mut Option<BlockedTask>, f: &fn()) {
let _f = ForbidUnwind::new("wait_until_woken_after");
unsafe {
assert!((*slot).is_none());
let sched: ~Scheduler = Local::take();
do sched.deschedule_running_task_and_then |_, task| {
f();
*slot = Some(task);
}
}
}
pub struct Request {
handle: *uvll::uv_req_t,
priv defused: bool,
}
impl Request {
pub fn new(ty: uvll::uv_req_type) -> Request {
Request::wrap(unsafe { uvll::malloc_req(ty) })
unsafe {
let handle = uvll::malloc_req(ty);
uvll::set_data_for_req(handle, null::<()>());
Request::wrap(handle)
}
}
pub fn wrap(handle: *uvll::uv_req_t) -> Request {
Request { handle: handle }
Request { handle: handle, defused: false }
}
pub fn set_data<T>(&self, t: *T) {
unsafe { uvll::set_data_for_req(self.handle, t) }
}
pub fn get_data(&self) -> *c_void {
unsafe { uvll::get_data_for_req(self.handle) }
pub unsafe fn get_data<T>(&self) -> &'static mut T {
let data = uvll::get_data_for_req(self.handle);
assert!(data != null());
cast::transmute(data)
}
// This function should be used when the request handle has been given to an
@ -155,17 +222,15 @@ impl Request {
// This is still a problem in blocking situations due to linked failure. In
// the connection callback the handle should be re-wrapped with the `wrap`
// function to ensure its destruction.
pub fn defuse(mut self) {
self.handle = ptr::null();
pub fn defuse(&mut self) {
self.defused = true;
}
}
impl Drop for Request {
fn drop(&mut self) {
unsafe {
if self.handle != ptr::null() {
uvll::free_req(self.handle)
}
if !self.defused {
unsafe { uvll::free_req(self.handle) }
}
}
}
@ -300,23 +365,18 @@ pub fn slice_to_uv_buf(v: &[u8]) -> Buf {
uvll::uv_buf_t { base: data, len: v.len() as uvll::uv_buf_len_t }
}
fn run_uv_loop(f: proc(&mut Loop)) {
use std::rt::local::Local;
use std::rt::test::run_in_uv_task;
use std::rt::sched::Scheduler;
use std::cell::Cell;
let f = Cell::new(f);
do run_in_uv_task {
let mut io = None;
do Local::borrow |sched: &mut Scheduler| {
sched.event_loop.io(|i| unsafe {
#[cfg(test)]
fn local_loop() -> &'static mut Loop {
unsafe {
cast::transmute(do Local::borrow |sched: &mut Scheduler| {
let mut io = None;
do sched.event_loop.io |i| {
let (_vtable, uvio): (uint, &'static mut uvio::UvIoFactory) =
cast::transmute(i);
io = Some(uvio);
});
}
f.take()(io.unwrap().uv_loop());
}
io.unwrap()
}.uv_loop())
}
}

File diff suppressed because it is too large.


@ -9,7 +9,6 @@
// except according to those terms.
use std::c_str::CString;
use std::cast;
use std::libc;
use std::rt::BlockedTask;
use std::rt::io::IoError;
@ -17,9 +16,11 @@ use std::rt::local::Local;
use std::rt::rtio::{RtioPipe, RtioUnixListener, RtioUnixAcceptor};
use std::rt::sched::{Scheduler, SchedHandle};
use std::rt::tube::Tube;
use std::task;
use stream::StreamWatcher;
use super::{Loop, UvError, UvHandle, Request, uv_error_to_io_error};
use super::{Loop, UvError, UvHandle, Request, uv_error_to_io_error,
wait_until_woken_after};
use uvio::HomingIO;
use uvll;
@ -32,7 +33,6 @@ pub struct PipeWatcher {
pub struct PipeListener {
home: SchedHandle,
pipe: *uvll::uv_pipe_t,
priv closing_task: Option<BlockedTask>,
priv outgoing: Tube<Result<~RtioPipe, IoError>>,
}
@ -74,36 +74,35 @@ impl PipeWatcher {
pub fn connect(loop_: &Loop, name: &CString) -> Result<PipeWatcher, UvError>
{
struct Ctx { task: Option<BlockedTask>, result: libc::c_int, }
let mut cx = Ctx { task: None, result: 0 };
let req = Request::new(uvll::UV_CONNECT);
let pipe = PipeWatcher::new(loop_, false);
unsafe {
uvll::set_data_for_req(req.handle, &cx as *Ctx);
uvll::uv_pipe_connect(req.handle,
pipe.handle(),
name.with_ref(|p| p),
connect_cb)
}
req.defuse();
return do task::unkillable {
let mut cx = Ctx { task: None, result: 0 };
let mut req = Request::new(uvll::UV_CONNECT);
let pipe = PipeWatcher::new(loop_, false);
do wait_until_woken_after(&mut cx.task) {
unsafe {
uvll::uv_pipe_connect(req.handle,
pipe.handle(),
name.with_ref(|p| p),
connect_cb)
}
req.set_data(&cx);
req.defuse(); // uv callback now owns this request
}
match cx.result {
0 => Ok(pipe),
n => Err(UvError(n))
}
let sched: ~Scheduler = Local::take();
do sched.deschedule_running_task_and_then |_, task| {
cx.task = Some(task);
}
return match cx.result {
0 => Ok(pipe),
n => Err(UvError(n))
};
extern fn connect_cb(req: *uvll::uv_connect_t, status: libc::c_int) {
let _req = Request::wrap(req);
if status == uvll::ECANCELED { return }
unsafe {
let cx: &mut Ctx = cast::transmute(uvll::get_data_for_req(req));
cx.result = status;
let sched: ~Scheduler = Local::take();
sched.resume_blocked_task_immediately(cx.task.take_unwrap());
}
extern fn connect_cb(req: *uvll::uv_connect_t, status: libc::c_int) {;
let req = Request::wrap(req);
assert!(status != uvll::ECANCELED);
let cx: &mut Ctx = unsafe { req.get_data() };
cx.result = status;
let sched: ~Scheduler = Local::take();
sched.resume_blocked_task_immediately(cx.task.take_unwrap());
}
}
@ -133,11 +132,15 @@ impl HomingIO for PipeWatcher {
fn home<'a>(&'a mut self) -> &'a mut SchedHandle { &mut self.home }
}
impl UvHandle<uvll::uv_pipe_t> for PipeWatcher {
fn uv_handle(&self) -> *uvll::uv_pipe_t { self.stream.handle }
}
impl Drop for PipeWatcher {
fn drop(&mut self) {
if !self.defused {
let _m = self.fire_homing_missile();
self.stream.close();
self.close();
}
}
}
@ -150,21 +153,24 @@ extern fn pipe_close_cb(handle: *uvll::uv_handle_t) {
impl PipeListener {
pub fn bind(loop_: &Loop, name: &CString) -> Result<~PipeListener, UvError> {
let pipe = PipeWatcher::new(loop_, false);
match unsafe { uvll::uv_pipe_bind(pipe.handle(), name.with_ref(|p| p)) } {
0 => {
// If successful, unwrap the PipeWatcher because we control how
// we close the pipe differently. We can't rely on
// StreamWatcher's default close method.
let p = ~PipeListener {
home: get_handle_to_current_scheduler!(),
pipe: pipe.unwrap(),
closing_task: None,
outgoing: Tube::new(),
};
Ok(p.install())
do task::unkillable {
let pipe = PipeWatcher::new(loop_, false);
match unsafe {
uvll::uv_pipe_bind(pipe.handle(), name.with_ref(|p| p))
} {
0 => {
// If successful, unwrap the PipeWatcher because we control how
// we close the pipe differently. We can't rely on
// StreamWatcher's default close method.
let p = ~PipeListener {
home: get_handle_to_current_scheduler!(),
pipe: pipe.unwrap(),
outgoing: Tube::new(),
};
Ok(p.install())
}
n => Err(UvError(n))
}
n => Err(UvError(n))
}
}
}
@ -196,6 +202,7 @@ impl UvHandle<uvll::uv_pipe_t> for PipeListener {
}
extern fn listen_cb(server: *uvll::uv_stream_t, status: libc::c_int) {
assert!(status != uvll::ECANCELED);
let msg = match status {
0 => {
let loop_ = Loop::wrap(unsafe {
@ -205,7 +212,6 @@ extern fn listen_cb(server: *uvll::uv_stream_t, status: libc::c_int) {
assert_eq!(unsafe { uvll::uv_accept(server, client.handle()) }, 0);
Ok(~client as ~RtioPipe)
}
uvll::ECANCELED => return,
n => Err(uv_error_to_io_error(UvError(n)))
};
@ -215,23 +221,11 @@ extern fn listen_cb(server: *uvll::uv_stream_t, status: libc::c_int) {
impl Drop for PipeListener {
fn drop(&mut self) {
let (_m, sched) = self.fire_homing_missile_sched();
do sched.deschedule_running_task_and_then |_, task| {
self.closing_task = Some(task);
unsafe { uvll::uv_close(self.pipe, listener_close_cb) }
}
let _m = self.fire_homing_missile();
self.close();
}
}
extern fn listener_close_cb(handle: *uvll::uv_handle_t) {
let pipe: &mut PipeListener = unsafe { UvHandle::from_uv_handle(&handle) };
unsafe { uvll::free_handle(handle) }
let sched: ~Scheduler = Local::take();
sched.resume_blocked_task_immediately(pipe.closing_task.take_unwrap());
}
// PipeAcceptor implementation and traits
impl RtioUnixAcceptor for PipeAcceptor {


@ -19,7 +19,8 @@ use std::rt::rtio::RtioProcess;
use std::rt::sched::{Scheduler, SchedHandle};
use std::vec;
use super::{Loop, UvHandle, UvError, uv_error_to_io_error};
use super::{Loop, UvHandle, UvError, uv_error_to_io_error,
wait_until_woken_after};
use uvio::HomingIO;
use uvll;
use pipe::PipeWatcher;
@ -222,11 +223,7 @@ impl RtioProcess for Process {
// If there's no exit code previously listed, then the
// process's exit callback has yet to be invoked. We just
// need to deschedule ourselves and wait to be reawoken.
let scheduler: ~Scheduler = Local::take();
do scheduler.deschedule_running_task_and_then |_, task| {
assert!(self.to_wake.is_none());
self.to_wake = Some(task);
}
wait_until_woken_after(&mut self.to_wake, || {});
assert!(self.exit_status.is_some());
}
}


@ -9,12 +9,14 @@
// except according to those terms.
use std::cast;
use std::libc::{c_int, size_t, ssize_t, c_void};
use std::libc::{c_int, size_t, ssize_t};
use std::ptr;
use std::rt::BlockedTask;
use std::rt::local::Local;
use std::rt::sched::Scheduler;
use super::{UvError, Buf, slice_to_uv_buf, Request};
use super::{UvError, Buf, slice_to_uv_buf, Request, wait_until_woken_after,
ForbidUnwind};
use uvll;
// This is a helper structure which is intended to get embedded into other
@ -63,6 +65,10 @@ impl StreamWatcher {
}
pub fn read(&mut self, buf: &mut [u8]) -> Result<uint, UvError> {
// This read operation needs to get canceled on an unwind via libuv's
// uv_read_stop function
let _f = ForbidUnwind::new("stream read");
// Send off the read request, but don't block until we're sure that the
// read request is queued.
match unsafe {
@ -74,12 +80,10 @@ impl StreamWatcher {
result: 0,
task: None,
};
unsafe {
uvll::set_data_for_uv_handle(self.handle, &rcx)
}
let scheduler: ~Scheduler = Local::take();
do scheduler.deschedule_running_task_and_then |_sched, task| {
rcx.task = Some(task);
do wait_until_woken_after(&mut rcx.task) {
unsafe {
uvll::set_data_for_uv_handle(self.handle, &rcx)
}
}
match rcx.result {
n if n < 0 => Err(UvError(n as c_int)),
@ -91,12 +95,17 @@ impl StreamWatcher {
}
pub fn write(&mut self, buf: &[u8]) -> Result<(), UvError> {
// The ownership of the write request is dubious if this function
// unwinds. I believe that if the write_cb fails to re-schedule the task
// then the write request will be leaked.
let _f = ForbidUnwind::new("stream write");
// Prepare the write request, either using a cached one or allocating a
// new one
if self.last_write_req.is_none() {
self.last_write_req = Some(Request::new(uvll::UV_WRITE));
}
let req = self.last_write_req.get_ref();
let mut req = match self.last_write_req.take() {
Some(req) => req, None => Request::new(uvll::UV_WRITE),
};
req.set_data(ptr::null::<()>());
// Send off the request, but be careful to not block until we're sure
// that the write request is queued. If the request couldn't be queued,
@ -107,11 +116,12 @@ impl StreamWatcher {
} {
0 => {
let mut wcx = WriteContext { result: 0, task: None, };
req.set_data(&wcx);
let scheduler: ~Scheduler = Local::take();
do scheduler.deschedule_running_task_and_then |_sched, task| {
wcx.task = Some(task);
req.defuse(); // uv callback now owns this request
do wait_until_woken_after(&mut wcx.task) {
req.set_data(&wcx);
}
self.last_write_req = Some(Request::wrap(req.handle));
match wcx.result {
0 => Ok(()),
n => Err(UvError(n)),
@ -120,50 +130,24 @@ impl StreamWatcher {
n => Err(UvError(n)),
}
}
// This will deallocate an internally used memory, along with closing the
// handle (and freeing it).
pub fn close(&mut self) {
let mut closing_task = None;
unsafe {
uvll::set_data_for_uv_handle(self.handle, &closing_task);
}
// Wait for this stream to close because it possibly represents a remote
// connection which may have consequences if we close asynchronously.
let sched: ~Scheduler = Local::take();
do sched.deschedule_running_task_and_then |_, task| {
closing_task = Some(task);
unsafe { uvll::uv_close(self.handle, close_cb) }
}
extern fn close_cb(handle: *uvll::uv_handle_t) {
let data: *c_void = unsafe { uvll::get_data_for_uv_handle(handle) };
unsafe { uvll::free_handle(handle) }
let closing_task: &mut Option<BlockedTask> = unsafe {
cast::transmute(data)
};
let task = closing_task.take_unwrap();
let scheduler: ~Scheduler = Local::take();
scheduler.resume_blocked_task_immediately(task);
}
}
}
// This allocation callback expects to be invoked once and only once. It will
// unwrap the buffer in the ReadContext stored in the stream and return it. This
// will fail if it is called more than once.
extern fn alloc_cb(stream: *uvll::uv_stream_t, _hint: size_t) -> Buf {
uvdebug!("alloc_cb");
let rcx: &mut ReadContext = unsafe {
cast::transmute(uvll::get_data_for_uv_handle(stream))
};
rcx.buf.take().expect("alloc_cb called more than once")
rcx.buf.take().expect("stream alloc_cb called more than once")
}
// When a stream has read some data, we will always forcibly stop reading and
// return all the data read (even if it didn't fill the whole buffer).
extern fn read_cb(handle: *uvll::uv_stream_t, nread: ssize_t, _buf: Buf) {
uvdebug!("read_cb {}", nread);
assert!(nread != uvll::ECANCELED as ssize_t);
let rcx: &mut ReadContext = unsafe {
cast::transmute(uvll::get_data_for_uv_handle(handle))
};
@ -182,11 +166,11 @@ extern fn read_cb(handle: *uvll::uv_stream_t, nread: ssize_t, _buf: Buf) {
// reading, however, all this does is wake up the blocked task after squirreling
// away the error code as a result.
extern fn write_cb(req: *uvll::uv_write_t, status: c_int) {
if status == uvll::ECANCELED { return }
let mut req = Request::wrap(req);
assert!(status != uvll::ECANCELED);
// Remember to not free the request because it is re-used between writes on
// the same stream.
let req = Request::wrap(req);
let wcx: &mut WriteContext = unsafe { cast::transmute(req.get_data()) };
let wcx: &mut WriteContext = unsafe { req.get_data() };
wcx.result = status;
req.defuse();


@ -16,7 +16,7 @@ use std::rt::rtio::RtioTimer;
use std::rt::sched::{Scheduler, SchedHandle};
use uvll;
use super::{Loop, UvHandle};
use super::{Loop, UvHandle, ForbidUnwind};
use uvio::HomingIO;
pub struct TimerWatcher {
@ -67,6 +67,11 @@ impl UvHandle<uvll::uv_timer_t> for TimerWatcher {
impl RtioTimer for TimerWatcher {
fn sleep(&mut self, msecs: u64) {
let (_m, sched) = self.fire_homing_missile_sched();
// If the descheduling operation unwinds after the timer has been
// started, then we need to call stop on the timer.
let _f = ForbidUnwind::new("timer");
do sched.deschedule_running_task_and_then |_sched, task| {
self.action = Some(WakeTask(task));
self.start(msecs, 0);
@ -124,51 +129,43 @@ impl Drop for TimerWatcher {
mod test {
use super::*;
use std::rt::rtio::RtioTimer;
use super::super::run_uv_loop;
use super::super::local_loop;
#[test]
fn oneshot() {
do run_uv_loop |l| {
let mut timer = TimerWatcher::new(l);
let port = timer.oneshot(1);
port.recv();
let port = timer.oneshot(1);
port.recv();
}
let mut timer = TimerWatcher::new(local_loop());
let port = timer.oneshot(1);
port.recv();
let port = timer.oneshot(1);
port.recv();
}
#[test]
fn override() {
do run_uv_loop |l| {
let mut timer = TimerWatcher::new(l);
let oport = timer.oneshot(1);
let pport = timer.period(1);
timer.sleep(1);
assert_eq!(oport.try_recv(), None);
assert_eq!(pport.try_recv(), None);
timer.oneshot(1).recv();
}
let mut timer = TimerWatcher::new(local_loop());
let oport = timer.oneshot(1);
let pport = timer.period(1);
timer.sleep(1);
assert_eq!(oport.try_recv(), None);
assert_eq!(pport.try_recv(), None);
timer.oneshot(1).recv();
}
#[test]
fn period() {
do run_uv_loop |l| {
let mut timer = TimerWatcher::new(l);
let port = timer.period(1);
port.recv();
port.recv();
let port = timer.period(1);
port.recv();
port.recv();
}
let mut timer = TimerWatcher::new(local_loop());
let port = timer.period(1);
port.recv();
port.recv();
let port = timer.period(1);
port.recv();
port.recv();
}
#[test]
fn sleep() {
do run_uv_loop |l| {
let mut timer = TimerWatcher::new(l);
timer.sleep(1);
timer.sleep(1);
}
let mut timer = TimerWatcher::new(local_loop());
timer.sleep(1);
timer.sleep(1);
}
}


@ -103,6 +103,6 @@ impl HomingIO for TtyWatcher {
impl Drop for TtyWatcher {
fn drop(&mut self) {
let _m = self.fire_homing_missile();
self.stream.close();
self.close();
}
}


@ -9,7 +9,7 @@
// except according to those terms.
use std::c_str::CString;
use std::comm::{SharedChan, GenericChan};
use std::comm::SharedChan;
use std::libc::c_int;
use std::libc;
use std::path::Path;
@ -26,7 +26,7 @@ use std::libc::{O_CREAT, O_APPEND, O_TRUNC, O_RDWR, O_RDONLY, O_WRONLY,
use std::rt::io::{FileMode, FileAccess, Open, Append, Truncate, Read, Write,
ReadWrite, FileStat};
use std::rt::io::signal::Signum;
use std::task;
use std::util;
use ai = std::rt::io::net::addrinfo;
#[cfg(test)] use std::unstable::run_in_bare_thread;
@ -44,6 +44,13 @@ pub trait HomingIO {
fn go_to_IO_home(&mut self) -> uint {
use std::rt::sched::RunOnce;
unsafe {
let task: *mut Task = Local::unsafe_borrow();
(*task).death.inhibit_kill((*task).unwinder.unwinding);
}
let _f = ForbidUnwind::new("going home");
let current_sched_id = do Local::borrow |sched: &mut Scheduler| {
sched.sched_id()
};
@ -51,22 +58,17 @@ pub trait HomingIO {
// Only need to invoke a context switch if we're not on the right
// scheduler.
if current_sched_id != self.home().sched_id {
do task::unkillable { // FIXME(#8674)
let scheduler: ~Scheduler = Local::take();
do scheduler.deschedule_running_task_and_then |_, task| {
/* FIXME(#8674) if the task was already killed then wake
* will return None. In that case, the home pointer will
* never be set.
*
* RESOLUTION IDEA: Since the task is dead, we should
* just abort the IO action.
*/
do task.wake().map |task| {
self.home().send(RunOnce(task));
};
}
let scheduler: ~Scheduler = Local::take();
do scheduler.deschedule_running_task_and_then |_, task| {
do task.wake().map |task| {
self.home().send(RunOnce(task));
};
}
}
let current_sched_id = do Local::borrow |sched: &mut Scheduler| {
sched.sched_id()
};
assert!(current_sched_id == self.home().sched_id);
self.home().sched_id
}
@ -98,25 +100,38 @@ struct HomingMissile {
priv io_home: uint,
}
impl HomingMissile {
pub fn check(&self, msg: &'static str) {
let local_id = Local::borrow(|sched: &mut Scheduler| sched.sched_id());
assert!(local_id == self.io_home, "{}", msg);
}
}
impl Drop for HomingMissile {
fn drop(&mut self) {
let f = ForbidUnwind::new("leaving home");
// It would truly be a sad day if we had moved off the home I/O
// scheduler while we were doing I/O.
assert_eq!(Local::borrow(|sched: &mut Scheduler| sched.sched_id()),
self.io_home);
self.check("task moved away from the home scheduler");
// If we were a homed task, then we must send ourselves back to the
// original scheduler. Otherwise, we can just return and keep running
if !Task::on_appropriate_sched() {
do task::unkillable { // FIXME(#8674)
let scheduler: ~Scheduler = Local::take();
do scheduler.deschedule_running_task_and_then |_, task| {
do task.wake().map |task| {
Scheduler::run_task(task);
};
}
let scheduler: ~Scheduler = Local::take();
do scheduler.deschedule_running_task_and_then |_, task| {
do task.wake().map |task| {
Scheduler::run_task(task);
};
}
}
util::ignore(f);
unsafe {
let task: *mut Task = Local::unsafe_borrow();
(*task).death.allow_kill((*task).unwinder.unwinding);
}
}
}