auto merge of #8645 : alexcrichton/rust/issue-6436-run-non-blocking, r=brson

This overhauls `std::run` to instead run on top of libuv. This is *not* in a mergeable state, I've been attempting to diagnose failures in the compiletest suite. I've managed to find a fair number of bugs so far, but I still  don't seem to be done yet.

Notable changes:
* This requires upgrading libuv. From the discussion on #6567, I took libuv master from a few days ago, applied one patch to fix process spawning with multiple event loops in libuv, and pushed to my own fork
* The build system for libuv has changed since we last used it. There's some extra checkout from a google build system which apparently does all the magic if you don't want to require autotools, and the google system just requires python. I updated the Makefile to get this build system and build libuv with it instead. This is untested on windows and arm, and both will probably need to see some improvement.
* This required adding some pipe bindings to libuv as well. Currently the support is pretty simple and probably completely unsafe for pipes, but you at least get read/write methods. This is necessary for capturing output of processes.
* I didn't redesign `std::run` at all, I simply tried to reimplement all the existing functionality on top of libuv. Some functions ended up dying, but nothing major. All uses of `std::run` in the compiler still work just fine.

I'm not quite sure how the rest of the runtime deals with this, but I marked process structures as `no_send` because the waiting/waking up has to happen in the same event loop right now. If processes start migrating between event loops then very bad things can happen. This may be what threadsafe I/O would fix, and I would be more than willing to rebase on that if it lands first.

Anyway, for now I wanted to put this up for review, I'm still investigating the corruption/deadlock bugs, but this is in an *almost* workable state. Once I find the bugs I'll also rebase on the current master.
This commit is contained in:
bors 2013-08-27 21:55:42 -07:00
commit b8d1fa3994
27 changed files with 1153 additions and 1128 deletions

2
.gitmodules vendored
View File

@ -4,5 +4,5 @@
branch = master
[submodule "src/libuv"]
path = src/libuv
url = https://github.com/brson/libuv.git
url = https://github.com/alexcrichton/libuv.git
branch = master

View File

@ -163,34 +163,49 @@ LIBUV_DEPS := $$(wildcard \
$$(S)src/libuv/*/*/*/*)
endif
LIBUV_GYP := $$(S)src/libuv/build/gyp
LIBUV_MAKEFILE_$(1)_$(2) := $$(CFG_BUILD_DIR)rt/$(1)/stage$(2)/libuv/Makefile
LIBUV_NO_LOAD = run-benchmarks.target.mk run-tests.target.mk \
uv_dtrace_header.target.mk uv_dtrace_provider.target.mk
$$(LIBUV_MAKEFILE_$(1)_$(2)): $$(LIBUV_GYP)
(cd $(S)src/libuv/ && \
./gyp_uv -f make -Dtarget_arch=$$(HOST_$(1)) -D ninja \
-Goutput_dir=$$(@D) --generator-output $$(@D))
# XXX: Shouldn't need platform-specific conditions here
ifdef CFG_WINDOWSY_$(1)
$$(LIBUV_LIB_$(1)_$(2)): $$(LIBUV_DEPS)
$$(Q)$$(MAKE) -C $$(S)src/libuv/ \
builddir_name="$$(CFG_BUILD_DIR)/rt/$(1)/stage$(2)/libuv" \
OS=mingw \
$$(Q)rm -f $$(S)src/libuv/libuv.a
$$(Q)$$(MAKE) -C $$(S)src/libuv -f Makefile.mingw \
CFLAGS="$$(CFG_GCCISH_CFLAGS) $$(LIBUV_FLAGS_$$(HOST_$(1))) $$(SNAP_DEFINES)" \
AR="$$(AR_$(1))" \
V=$$(VERBOSE)
$$(Q)cp $$(S)src/libuv/libuv.a $$@
else ifeq ($(OSTYPE_$(1)), linux-androideabi)
$$(LIBUV_LIB_$(1)_$(2)): $$(LIBUV_DEPS)
$$(Q)$$(MAKE) -C $$(S)src/libuv/ \
$$(LIBUV_LIB_$(1)_$(2)): $$(LIBUV_DEPS) $$(LIBUV_MAKEFILE_$(1)_$(2))
$$(Q)$$(MAKE) -C $$(@D) \
CFLAGS="$$(CFG_GCCISH_CFLAGS) $$(LIBUV_FLAGS_$$(HOST_$(1))) $$(SNAP_DEFINES)" \
LDFLAGS="$$(CFG_GCCISH_LINK_FLAGS) $$(LIBUV_FLAGS_$$(HOST_$(1)))" \
CC="$$(CC_$(1))" \
CXX="$$(CXX_$(1))" \
AR="$$(AR_$(1))" \
BUILDTYPE=Release \
builddir_name="$$(CFG_BUILD_DIR)/rt/$(1)/stage$(2)/libuv" \
host=android OS=linux \
builddir="." \
BUILDTYPE=Release \
NO_LOAD="$$(LIBUV_NO_LOAD)" \
V=$$(VERBOSE)
else
$$(LIBUV_LIB_$(1)_$(2)): $$(LIBUV_DEPS)
$$(Q)$$(MAKE) -C $$(S)src/libuv/ \
$$(LIBUV_LIB_$(1)_$(2)): $$(LIBUV_DEPS) $$(LIBUV_MAKEFILE_$(1)_$(2))
$$(Q)$$(MAKE) -C $$(@D) \
CFLAGS="$$(CFG_GCCISH_CFLAGS) $$(LIBUV_FLAGS_$$(HOST_$(1))) $$(SNAP_DEFINES)" \
LDFLAGS="$$(CFG_GCCISH_LINK_FLAGS) $$(LIBUV_FLAGS_$$(HOST_$(1)))" \
CC="$$(CC_$(1))" \
CXX="$$(CXX_$(1))" \
AR="$$(AR_$(1))" \
builddir_name="$$(CFG_BUILD_DIR)/rt/$(1)/stage$(2)/libuv" \
builddir="." \
BUILDTYPE=Release \
NO_LOAD="$$(LIBUV_NO_LOAD)" \
V=$$(VERBOSE)
endif
@ -254,3 +269,7 @@ endef
$(foreach stage,$(STAGES), \
$(foreach target,$(CFG_TARGET_TRIPLES), \
$(eval $(call DEF_RUNTIME_TARGETS,$(target),$(stage)))))
$(LIBUV_GYP):
mkdir -p $(S)src/libuv/build
git clone https://git.chromium.org/external/gyp.git $(S)src/libuv/build/gyp

View File

@ -54,10 +54,10 @@ pub fn run(lib_path: &str,
in_fd: None,
out_fd: None,
err_fd: None
});
}).unwrap();
for input in input.iter() {
proc.input().write_str(*input);
proc.input().write(input.as_bytes());
}
let output = proc.finish_with_output();

View File

@ -20,41 +20,16 @@ use procsrv;
use util;
use util::logv;
use std::cell::Cell;
use std::io;
use std::os;
use std::str;
use std::task::{spawn_sched, SingleThreaded};
use std::vec;
use std::unstable::running_on_valgrind;
use extra::test::MetricMap;
pub fn run(config: config, testfile: ~str) {
let config = Cell::new(config);
let testfile = Cell::new(testfile);
// FIXME #6436: Creating another thread to run the test because this
// is going to call waitpid. The new scheduler has some strange
// interaction between the blocking tasks and 'friend' schedulers
// that destroys parallelism if we let normal schedulers block.
// It should be possible to remove this spawn once std::run is
// rewritten to be non-blocking.
//
// We do _not_ create another thread if we're running on V because
// it serializes all threads anyways.
if running_on_valgrind() {
let config = config.take();
let testfile = testfile.take();
let mut _mm = MetricMap::new();
run_metrics(config, testfile, &mut _mm);
} else {
do spawn_sched(SingleThreaded) {
let config = config.take();
let testfile = testfile.take();
let mut _mm = MetricMap::new();
run_metrics(config, testfile, &mut _mm);
}
}
}
pub fn run_metrics(config: config, testfile: ~str, mm: &mut MetricMap) {

View File

@ -104,14 +104,14 @@ fn pandoc_writer(
];
do generic_writer |markdown| {
use std::io::WriterUtil;
debug!("pandoc cmd: %s", pandoc_cmd);
debug!("pandoc args: %s", pandoc_args.connect(" "));
let mut proc = run::Process::new(pandoc_cmd, pandoc_args, run::ProcessOptions::new());
let proc = run::Process::new(pandoc_cmd, pandoc_args,
run::ProcessOptions::new());
let mut proc = proc.unwrap();
proc.input().write_str(markdown);
proc.input().write(markdown.as_bytes());
let output = proc.finish_with_output();
debug!("pandoc result: %i", output.status);

View File

@ -89,7 +89,7 @@ pub fn git_clone_general(source: &str, target: &Path, v: &Version) -> bool {
fn process_output_in_cwd(prog: &str, args: &[~str], cwd: &Path) -> ProcessOutput {
let mut prog = Process::new(prog, args, ProcessOptions{ dir: Some(cwd)
,..ProcessOptions::new()});
,..ProcessOptions::new()}).unwrap();
prog.finish_with_output()
}

View File

@ -112,13 +112,14 @@ fn mk_temp_workspace(short_name: &Path, version: &Version) -> Path {
fn run_git(args: &[~str], env: Option<~[(~str, ~str)]>, cwd: &Path, err_msg: &str) {
let cwd = (*cwd).clone();
let mut prog = run::Process::new("git", args, run::ProcessOptions {
let prog = run::Process::new("git", args, run::ProcessOptions {
env: env,
dir: Some(&cwd),
in_fd: None,
out_fd: None,
err_fd: None
});
let mut prog = prog.unwrap();
let rslt = prog.finish_with_output();
if rslt.status != 0 {
fail!("%s [git returned %?, output = %s, error = %s]", err_msg,
@ -226,7 +227,7 @@ fn command_line_test_with_env(args: &[~str], cwd: &Path, env: Option<~[(~str, ~s
in_fd: None,
out_fd: None,
err_fd: None
});
}).unwrap();
let output = prog.finish_with_output();
debug!("Output from command %s with args %? was %s {%s}[%?]",
cmd, args, str::from_bytes(output.output),
@ -1027,7 +1028,7 @@ fn test_extern_mod() {
test_sysroot().to_str(),
exec_file.to_str());
let mut prog = run::Process::new(rustc.to_str(), [main_file.to_str(),
let prog = run::Process::new(rustc.to_str(), [main_file.to_str(),
~"--sysroot", test_sysroot().to_str(),
~"-o", exec_file.to_str()],
run::ProcessOptions {
@ -1037,6 +1038,7 @@ fn test_extern_mod() {
out_fd: None,
err_fd: None
});
let mut prog = prog.unwrap();
let outp = prog.finish_with_output();
if outp.status != 0 {
fail!("output was %s, error was %s",

View File

@ -71,9 +71,6 @@ pub struct FileStream {
last_nread: int,
}
impl FileStream {
}
impl Reader for FileStream {
fn read(&mut self, buf: &mut [u8]) -> Option<uint> {
match self.fd.read(buf) {

View File

@ -268,6 +268,9 @@ pub use self::extensions::WriterByteConversions;
/// Synchronous, non-blocking file I/O.
pub mod file;
/// Synchronous, in-memory I/O.
pub mod pipe;
/// Synchronous, non-blocking network I/O.
pub mod net {
pub mod tcp;

View File

@ -16,7 +16,7 @@ use rt::io::{io_error, read_error, EndOfFile};
use rt::rtio::{IoFactory, IoFactoryObject,
RtioSocket, RtioTcpListener,
RtioTcpListenerObject, RtioTcpStream,
RtioTcpStreamObject};
RtioTcpStreamObject, RtioStream};
use rt::local::Local;
pub struct TcpStream(~RtioTcpStreamObject);
@ -69,7 +69,7 @@ impl TcpStream {
impl Reader for TcpStream {
fn read(&mut self, buf: &mut [u8]) -> Option<uint> {
match (**self).read(buf) {
match (***self).read(buf) {
Ok(read) => Some(read),
Err(ioerr) => {
// EOF is indicated by returning None
@ -86,7 +86,7 @@ impl Reader for TcpStream {
impl Writer for TcpStream {
fn write(&mut self, buf: &[u8]) {
match (**self).write(buf) {
match (***self).write(buf) {
Ok(_) => (),
Err(ioerr) => io_error::cond.raise(ioerr),
}
@ -166,7 +166,7 @@ mod test {
do run_in_newsched_task {
let mut called = false;
do io_error::cond.trap(|e| {
assert!(e.kind == ConnectionRefused);
assert_eq!(e.kind, ConnectionRefused);
called = true;
}).inside {
let addr = SocketAddr { ip: Ipv4Addr(0, 0, 0, 0), port: 1 };

77
src/libstd/rt/io/pipe.rs Normal file
View File

@ -0,0 +1,77 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
//! Synchronous, in-memory pipes.
//!
//! Currently these aren't particularly useful, there only exists bindings
//! enough so that pipes can be created to child processes.
use prelude::*;
use super::{Reader, Writer};
use rt::io::{io_error, read_error, EndOfFile};
use rt::local::Local;
use rt::rtio::{RtioPipeObject, RtioStream, IoFactoryObject, IoFactory};
use rt::uv::pipe;
pub struct PipeStream(~RtioPipeObject);
impl PipeStream {
/// Creates a new pipe initialized, but not bound to any particular
/// source/destination
pub fn new() -> Option<PipeStream> {
let pipe = unsafe {
let io: *mut IoFactoryObject = Local::unsafe_borrow();
(*io).pipe_init(false)
};
match pipe {
Ok(p) => Some(PipeStream(p)),
Err(ioerr) => {
io_error::cond.raise(ioerr);
None
}
}
}
/// Extracts the underlying libuv pipe to be bound to another source.
pub fn uv_pipe(&self) -> pipe::Pipe {
// Did someone say multiple layers of indirection?
(**self).uv_pipe()
}
}
impl Reader for PipeStream {
fn read(&mut self, buf: &mut [u8]) -> Option<uint> {
match (***self).read(buf) {
Ok(read) => Some(read),
Err(ioerr) => {
// EOF is indicated by returning None
if ioerr.kind != EndOfFile {
read_error::cond.raise(ioerr);
}
return None;
}
}
}
fn eof(&mut self) -> bool { fail!() }
}
impl Writer for PipeStream {
fn write(&mut self, buf: &[u8]) {
match (***self).write(buf) {
Ok(_) => (),
Err(ioerr) => {
io_error::cond.raise(ioerr);
}
}
}
fn flush(&mut self) { fail!() }
}

View File

@ -8,12 +8,14 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use libc;
use option::*;
use result::*;
use libc::c_int;
use rt::io::IoError;
use super::io::net::ip::{IpAddr, SocketAddr};
use rt::uv;
use rt::uv::uvio;
use path::Path;
use super::io::support::PathLike;
@ -30,6 +32,9 @@ pub type RtioTcpListenerObject = uvio::UvTcpListener;
pub type RtioUdpSocketObject = uvio::UvUdpSocket;
pub type RtioTimerObject = uvio::UvTimer;
pub type PausibleIdleCallback = uvio::UvPausibleIdleCallback;
pub type RtioPipeObject = uvio::UvPipeStream;
pub type RtioProcessObject = uvio::UvProcess;
pub type RtioProcessConfig<'self> = uv::process::Config<'self>;
pub trait EventLoop {
fn run(&mut self);
@ -72,6 +77,13 @@ pub trait IoFactory {
fn fs_open<P: PathLike>(&mut self, path: &P, fm: FileMode, fa: FileAccess)
-> Result<~RtioFileStream, IoError>;
fn fs_unlink<P: PathLike>(&mut self, path: &P) -> Result<(), IoError>;
fn pipe_init(&mut self, ipc: bool) -> Result<~RtioPipeObject, IoError>;
fn spawn(&mut self, config: &RtioProcessConfig) -> Result<~RtioProcessObject, IoError>;
}
pub trait RtioStream {
fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError>;
fn write(&mut self, buf: &[u8]) -> Result<(), IoError>;
}
pub trait RtioTcpListener : RtioSocket {
@ -80,9 +92,7 @@ pub trait RtioTcpListener : RtioSocket {
fn dont_accept_simultaneously(&mut self) -> Result<(), IoError>;
}
pub trait RtioTcpStream : RtioSocket {
fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError>;
fn write(&mut self, buf: &[u8]) -> Result<(), IoError>;
pub trait RtioTcpStream : RtioSocket + RtioStream {
fn peer_name(&mut self) -> Result<SocketAddr, IoError>;
fn control_congestion(&mut self) -> Result<(), IoError>;
fn nodelay(&mut self) -> Result<(), IoError>;
@ -124,3 +134,9 @@ pub trait RtioFileStream {
fn tell(&self) -> Result<u64, IoError>;
fn flush(&mut self) -> Result<(), IoError>;
}
pub trait RtioProcess {
fn id(&self) -> libc::pid_t;
fn kill(&mut self, signal: int) -> Result<(), IoError>;
fn wait(&mut self) -> int;
}

View File

@ -34,7 +34,7 @@ impl AsyncWatcher {
extern fn async_cb(handle: *uvll::uv_async_t, status: c_int) {
let mut watcher: AsyncWatcher = NativeHandle::from_native_handle(handle);
let status = status_to_maybe_uv_error(watcher, status);
let status = status_to_maybe_uv_error(status);
let data = watcher.get_watcher_data();
let cb = data.async_cb.get_ref();
(*cb)(watcher, status);

View File

@ -11,8 +11,8 @@
use prelude::*;
use ptr::null;
use libc::c_void;
use rt::uv::{Request, NativeHandle, Loop, FsCallback, Buf,
status_to_maybe_uv_error_with_loop, UvError};
use rt::uv::{Request, NativeHandle, Loop, FsCallback, Buf, UvError};
use rt::uv::status_to_maybe_uv_error;
use rt::uv::uvll;
use rt::uv::uvll::*;
use super::super::io::support::PathLike;
@ -62,7 +62,7 @@ impl FsRequest {
pub fn open_sync<P: PathLike>(loop_: &Loop, path: &P, flags: int, mode: int)
-> Result<int, UvError> {
let result = FsRequest::open_common(loop_, path, flags, mode, None);
sync_cleanup(loop_, result)
sync_cleanup(result)
}
fn unlink_common<P: PathLike>(loop_: &Loop, path: &P, cb: Option<FsCallback>) -> int {
@ -83,11 +83,11 @@ impl FsRequest {
}
pub fn unlink<P: PathLike>(loop_: &Loop, path: &P, cb: FsCallback) {
let result = FsRequest::unlink_common(loop_, path, Some(cb));
sync_cleanup(loop_, result);
sync_cleanup(result);
}
pub fn unlink_sync<P: PathLike>(loop_: &Loop, path: &P) -> Result<int, UvError> {
let result = FsRequest::unlink_common(loop_, path, None);
sync_cleanup(loop_, result)
sync_cleanup(result)
}
pub fn install_req_data(&self, cb: Option<FsCallback>) {
@ -139,9 +139,8 @@ impl NativeHandle<*uvll::uv_fs_t> for FsRequest {
match self { &FsRequest(ptr) => ptr }
}
}
fn sync_cleanup(loop_: &Loop, result: int)
-> Result<int, UvError> {
match status_to_maybe_uv_error_with_loop(loop_.native_handle(), result as i32) {
fn sync_cleanup(result: int) -> Result<int, UvError> {
match status_to_maybe_uv_error(result as i32) {
Some(err) => Err(err),
None => Ok(result)
}
@ -184,7 +183,7 @@ impl FileDescriptor {
pub fn write_sync(&mut self, loop_: &Loop, buf: Buf, offset: i64)
-> Result<int, UvError> {
let result = self.write_common(loop_, buf, offset, None);
sync_cleanup(loop_, result)
sync_cleanup(result)
}
fn read_common(&mut self, loop_: &Loop, buf: Buf,
@ -212,7 +211,7 @@ impl FileDescriptor {
pub fn read_sync(&mut self, loop_: &Loop, buf: Buf, offset: i64)
-> Result<int, UvError> {
let result = self.read_common(loop_, buf, offset, None);
sync_cleanup(loop_, result)
sync_cleanup(result)
}
fn close_common(self, loop_: &Loop, cb: Option<FsCallback>) -> int {
@ -234,12 +233,11 @@ impl FileDescriptor {
}
pub fn close_sync(self, loop_: &Loop) -> Result<int, UvError> {
let result = self.close_common(loop_, None);
sync_cleanup(loop_, result)
sync_cleanup(result)
}
}
extern fn compl_cb(req: *uv_fs_t) {
let mut req: FsRequest = NativeHandle::from_native_handle(req);
let loop_ = req.get_loop();
// pull the user cb out of the req data
let cb = {
let data = req.get_req_data();
@ -250,8 +248,7 @@ extern fn compl_cb(req: *uv_fs_t) {
// in uv_fs_open calls, the result will be the fd in the
// case of success, otherwise it's -1 indicating an error
let result = req.get_result();
let status = status_to_maybe_uv_error_with_loop(
loop_.native_handle(), result);
let status = status_to_maybe_uv_error(result);
// we have a req and status, call the user cb..
// only giving the user a ref to the FsRequest, as we
// have to clean it up, afterwards (and they aren't really

View File

@ -43,7 +43,7 @@ impl IdleWatcher {
let mut idle_watcher: IdleWatcher = NativeHandle::from_native_handle(handle);
let data = idle_watcher.get_watcher_data();
let cb: &IdleCallback = data.idle_cb.get_ref();
let status = status_to_maybe_uv_error(idle_watcher, status);
let status = status_to_maybe_uv_error(status);
(*cb)(idle_watcher, status);
}
}
@ -57,7 +57,7 @@ impl IdleWatcher {
let mut idle_watcher: IdleWatcher = NativeHandle::from_native_handle(handle);
let data = idle_watcher.get_watcher_data();
let cb: &IdleCallback = data.idle_cb.get_ref();
let status = status_to_maybe_uv_error(idle_watcher, status);
let status = status_to_maybe_uv_error(status);
(*cb)(idle_watcher, status);
}
}

View File

@ -58,6 +58,8 @@ pub use self::net::{StreamWatcher, TcpWatcher, UdpWatcher};
pub use self::idle::IdleWatcher;
pub use self::timer::TimerWatcher;
pub use self::async::AsyncWatcher;
pub use self::process::Process;
pub use self::pipe::Pipe;
/// The implementation of `rtio` for libuv
pub mod uvio;
@ -70,6 +72,8 @@ pub mod net;
pub mod idle;
pub mod timer;
pub mod async;
pub mod process;
pub mod pipe;
/// XXX: Loop(*handle) is buggy with destructors. Normal structs
/// with dtors may not be destructured, but tuple structs can,
@ -126,6 +130,8 @@ pub type NullCallback = ~fn();
pub type IdleCallback = ~fn(IdleWatcher, Option<UvError>);
pub type ConnectionCallback = ~fn(StreamWatcher, Option<UvError>);
pub type FsCallback = ~fn(&mut FsRequest, Option<UvError>);
// first int is exit_status, second is term_signal
pub type ExitCallback = ~fn(Process, int, int, Option<UvError>);
pub type TimerCallback = ~fn(TimerWatcher, Option<UvError>);
pub type AsyncCallback = ~fn(AsyncWatcher, Option<UvError>);
pub type UdpReceiveCallback = ~fn(UdpWatcher, int, Buf, SocketAddr, uint, Option<UvError>);
@ -143,7 +149,8 @@ struct WatcherData {
timer_cb: Option<TimerCallback>,
async_cb: Option<AsyncCallback>,
udp_recv_cb: Option<UdpReceiveCallback>,
udp_send_cb: Option<UdpSendCallback>
udp_send_cb: Option<UdpSendCallback>,
exit_cb: Option<ExitCallback>,
}
pub trait WatcherInterop {
@ -175,7 +182,8 @@ impl<H, W: Watcher + NativeHandle<*H>> WatcherInterop for W {
timer_cb: None,
async_cb: None,
udp_recv_cb: None,
udp_send_cb: None
udp_send_cb: None,
exit_cb: None,
};
let data = transmute::<~WatcherData, *c_void>(data);
uvll::set_data_for_uv_handle(self.native_handle(), data);
@ -202,12 +210,12 @@ impl<H, W: Watcher + NativeHandle<*H>> WatcherInterop for W {
// XXX: Need to define the error constants like EOF so they can be
// compared to the UvError type
pub struct UvError(uvll::uv_err_t);
pub struct UvError(c_int);
impl UvError {
pub fn name(&self) -> ~str {
unsafe {
let inner = match self { &UvError(ref a) => a };
let inner = match self { &UvError(a) => a };
let name_str = uvll::err_name(inner);
assert!(name_str.is_not_null());
from_c_str(name_str)
@ -216,7 +224,7 @@ impl UvError {
pub fn desc(&self) -> ~str {
unsafe {
let inner = match self { &UvError(ref a) => a };
let inner = match self { &UvError(a) => a };
let desc_str = uvll::strerror(inner);
assert!(desc_str.is_not_null());
from_c_str(desc_str)
@ -224,7 +232,7 @@ impl UvError {
}
pub fn is_eof(&self) -> bool {
self.code == uvll::EOF
**self == uvll::EOF
}
}
@ -236,18 +244,10 @@ impl ToStr for UvError {
#[test]
fn error_smoke_test() {
let err = uvll::uv_err_t { code: 1, sys_errno_: 1 };
let err: UvError = UvError(err);
let err: UvError = UvError(uvll::EOF);
assert_eq!(err.to_str(), ~"EOF: end of file");
}
pub fn last_uv_error<H, W: Watcher + NativeHandle<*H>>(watcher: &W) -> UvError {
unsafe {
let loop_ = watcher.event_loop();
UvError(uvll::last_error(loop_.native_handle()))
}
}
pub fn uv_error_to_io_error(uverr: UvError) -> IoError {
unsafe {
// Importing error constants
@ -255,10 +255,10 @@ pub fn uv_error_to_io_error(uverr: UvError) -> IoError {
use rt::io::*;
// uv error descriptions are static
let c_desc = uvll::strerror(&*uverr);
let c_desc = uvll::strerror(*uverr);
let desc = str::raw::c_str_to_static_slice(c_desc);
let kind = match uverr.code {
let kind = match *uverr {
UNKNOWN => OtherIoError,
OK => OtherIoError,
EOF => EndOfFile,
@ -266,8 +266,8 @@ pub fn uv_error_to_io_error(uverr: UvError) -> IoError {
ECONNREFUSED => ConnectionRefused,
ECONNRESET => ConnectionReset,
EPIPE => BrokenPipe,
_ => {
rtdebug!("uverr.code %u", uverr.code as uint);
err => {
rtdebug!("uverr.code %d", err as int);
// XXX: Need to map remaining uv error types
OtherIoError
}
@ -281,31 +281,12 @@ pub fn uv_error_to_io_error(uverr: UvError) -> IoError {
}
}
/// Given a uv handle, convert a callback status to a UvError
pub fn status_to_maybe_uv_error_with_loop(
loop_: *uvll::uv_loop_t,
status: c_int) -> Option<UvError> {
if status != -1 {
/// Convert a callback status to a UvError
pub fn status_to_maybe_uv_error(status: c_int) -> Option<UvError> {
if status >= 0 {
None
} else {
unsafe {
rtdebug!("loop: %x", loop_ as uint);
let err = uvll::last_error(loop_);
Some(UvError(err))
}
}
}
/// Given a uv handle, convert a callback status to a UvError
pub fn status_to_maybe_uv_error<T, U: Watcher + NativeHandle<*T>>(handle: U,
status: c_int) -> Option<UvError> {
if status != -1 {
None
} else {
unsafe {
rtdebug!("handle: %x", handle.native_handle() as uint);
let loop_ = uvll::get_loop_for_uv_handle(handle.native_handle());
status_to_maybe_uv_error_with_loop(loop_, status)
}
Some(UvError(status))
}
}

View File

@ -16,7 +16,6 @@ use rt::uv::{AllocCallback, ConnectionCallback, ReadCallback, UdpReceiveCallback
use rt::uv::{Loop, Watcher, Request, UvError, Buf, NativeHandle, NullCallback,
status_to_maybe_uv_error};
use rt::io::net::ip::{SocketAddr, Ipv4Addr, Ipv6Addr};
use rt::uv::last_uv_error;
use vec;
use str;
use from_str::{FromStr};
@ -137,7 +136,7 @@ impl StreamWatcher {
rtdebug!("buf len: %d", buf.len as int);
let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream);
let cb = stream_watcher.get_watcher_data().read_cb.get_ref();
let status = status_to_maybe_uv_error(stream_watcher, nread as c_int);
let status = status_to_maybe_uv_error(nread as c_int);
(*cb)(stream_watcher, nread as int, buf, status);
}
}
@ -167,7 +166,7 @@ impl StreamWatcher {
let mut stream_watcher = write_request.stream();
write_request.delete();
let cb = stream_watcher.get_watcher_data().write_cb.take_unwrap();
let status = status_to_maybe_uv_error(stream_watcher, status);
let status = status_to_maybe_uv_error(status);
cb(stream_watcher, status);
}
}
@ -232,7 +231,7 @@ impl TcpWatcher {
};
match result {
0 => Ok(()),
_ => Err(last_uv_error(self)),
_ => Err(UvError(result)),
}
}
}
@ -260,7 +259,7 @@ impl TcpWatcher {
let mut stream_watcher = connect_request.stream();
connect_request.delete();
let cb = stream_watcher.get_watcher_data().connect_cb.take_unwrap();
let status = status_to_maybe_uv_error(stream_watcher, status);
let status = status_to_maybe_uv_error(status);
cb(stream_watcher, status);
}
}
@ -283,7 +282,7 @@ impl TcpWatcher {
rtdebug!("connection_cb");
let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle);
let cb = stream_watcher.get_watcher_data().connect_cb.get_ref();
let status = status_to_maybe_uv_error(stream_watcher, status);
let status = status_to_maybe_uv_error(status);
(*cb)(stream_watcher, status);
}
}
@ -327,7 +326,7 @@ impl UdpWatcher {
};
match result {
0 => Ok(()),
_ => Err(last_uv_error(self)),
_ => Err(UvError(result)),
}
}
}
@ -360,7 +359,7 @@ impl UdpWatcher {
rtdebug!("buf len: %d", buf.len as int);
let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
let cb = udp_watcher.get_watcher_data().udp_recv_cb.get_ref();
let status = status_to_maybe_uv_error(udp_watcher, nread as c_int);
let status = status_to_maybe_uv_error(nread as c_int);
let addr = uv_socket_addr_to_socket_addr(sockaddr_to_UvSocketAddr(addr));
(*cb)(udp_watcher, nread as int, buf, addr, flags as uint, status);
}
@ -395,7 +394,7 @@ impl UdpWatcher {
let mut udp_watcher = send_request.handle();
send_request.delete();
let cb = udp_watcher.get_watcher_data().udp_send_cb.take_unwrap();
let status = status_to_maybe_uv_error(udp_watcher, status);
let status = status_to_maybe_uv_error(status);
cb(udp_watcher, status);
}
}

66
src/libstd/rt/uv/pipe.rs Normal file
View File

@ -0,0 +1,66 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use prelude::*;
use libc;
use rt::uv;
use rt::uv::net;
use rt::uv::uvll;
pub struct Pipe(*uvll::uv_pipe_t);
impl uv::Watcher for Pipe {}
impl Pipe {
pub fn new(loop_: &uv::Loop, ipc: bool) -> Pipe {
unsafe {
let handle = uvll::malloc_handle(uvll::UV_NAMED_PIPE);
assert!(handle.is_not_null());
let ipc = ipc as libc::c_int;
assert_eq!(uvll::pipe_init(loop_.native_handle(), handle, ipc), 0);
let mut ret: Pipe =
uv::NativeHandle::from_native_handle(handle);
ret.install_watcher_data();
ret
}
}
pub fn as_stream(&self) -> net::StreamWatcher {
net::StreamWatcher(**self as *uvll::uv_stream_t)
}
pub fn close(self, cb: uv::NullCallback) {
{
let mut this = self;
let data = this.get_watcher_data();
assert!(data.close_cb.is_none());
data.close_cb = Some(cb);
}
unsafe { uvll::close(self.native_handle(), close_cb); }
extern fn close_cb(handle: *uvll::uv_pipe_t) {
let mut process: Pipe = uv::NativeHandle::from_native_handle(handle);
process.get_watcher_data().close_cb.take_unwrap()();
process.drop_watcher_data();
unsafe { uvll::free_handle(handle as *libc::c_void) }
}
}
}
impl uv::NativeHandle<*uvll::uv_pipe_t> for Pipe {
fn from_native_handle(handle: *uvll::uv_pipe_t) -> Pipe {
Pipe(handle)
}
fn native_handle(&self) -> *uvll::uv_pipe_t {
match self { &Pipe(ptr) => ptr }
}
}

264
src/libstd/rt/uv/process.rs Normal file
View File

@ -0,0 +1,264 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use prelude::*;
use libc;
use ptr;
use vec;
use cell::Cell;
use rt::uv;
use rt::uv::net;
use rt::uv::pipe;
use rt::uv::uvll;
/// A process wraps the handle of the underlying uv_process_t.
pub struct Process(*uvll::uv_process_t);
/// This configuration describes how a new process should be spawned. This is
/// translated to libuv's own configuration
pub struct Config<'self> {
/// Path to the program to run
program: &'self str,
/// Arguments to pass to the program (doesn't include the program itself)
args: &'self [~str],
/// Optional environment to specify for the program. If this is None, then
/// it will inherit the current process's environment.
env: Option<&'self [(~str, ~str)]>,
/// Optional working directory for the new process. If this is None, then
/// the current directory of the running process is inherited.
cwd: Option<&'self str>,
/// Any number of streams/file descriptors/pipes may be attached to this
/// process. This list enumerates the file descriptors and such for the
/// process to be spawned, and the file descriptors inherited will start at
/// 0 and go to the length of this array.
///
/// Standard file descriptors are:
///
/// 0 - stdin
/// 1 - stdout
/// 2 - stderr
io: &'self [StdioContainer]
}
/// Describes what to do with a standard io stream for a child process.
pub enum StdioContainer {
/// This stream will be ignored. This is the equivalent of attaching the
/// stream to `/dev/null`
Ignored,
/// The specified file descriptor is inherited for the stream which it is
/// specified for.
InheritFd(libc::c_int),
/// The specified libuv stream is inherited for the corresponding file
/// descriptor it is assigned to.
InheritStream(net::StreamWatcher),
/// Creates a pipe for the specified file descriptor which will be directed
/// into the previously-initialized pipe passed in.
///
/// The first boolean argument is whether the pipe is readable, and the
/// second is whether it is writable. These properties are from the view of
/// the *child* process, not the parent process.
CreatePipe(pipe::Pipe, bool /* readable */, bool /* writable */),
}
impl uv::Watcher for Process {}
impl Process {
/// Creates a new process, ready to spawn inside an event loop
pub fn new() -> Process {
let handle = unsafe { uvll::malloc_handle(uvll::UV_PROCESS) };
assert!(handle.is_not_null());
let mut ret: Process = uv::NativeHandle::from_native_handle(handle);
ret.install_watcher_data();
return ret;
}
/// Spawn a new process inside the specified event loop.
///
/// The `config` variable will be passed down to libuv, and the `exit_cb`
/// will be run only once, when the process exits.
///
/// Returns either the corresponding process object or an error which
/// occurred.
pub fn spawn(&mut self, loop_: &uv::Loop, config: &Config,
exit_cb: uv::ExitCallback) -> Result<(), uv::UvError> {
let cwd = config.cwd.map_move(|s| s.to_c_str());
extern fn on_exit(p: *uvll::uv_process_t,
exit_status: libc::c_int,
term_signal: libc::c_int) {
let mut p: Process = uv::NativeHandle::from_native_handle(p);
let err = match exit_status {
0 => None,
_ => uv::status_to_maybe_uv_error(-1)
};
p.get_watcher_data().exit_cb.take_unwrap()(p,
exit_status as int,
term_signal as int,
err);
}
let mut stdio = vec::with_capacity::<uvll::uv_stdio_container_t>(
config.io.len());
unsafe {
vec::raw::set_len(&mut stdio, config.io.len());
for (slot, &other) in stdio.iter().zip(config.io.iter()) {
set_stdio(slot as *uvll::uv_stdio_container_t, other);
}
}
let exit_cb = Cell::new(exit_cb);
do with_argv(config.program, config.args) |argv| {
do with_env(config.env) |envp| {
let options = uvll::uv_process_options_t {
exit_cb: on_exit,
file: unsafe { *argv },
args: argv,
env: envp,
cwd: match cwd {
Some(ref cwd) => cwd.with_ref(|p| p),
None => ptr::null(),
},
flags: 0,
stdio_count: stdio.len() as libc::c_int,
stdio: stdio.as_imm_buf(|p, _| p),
uid: 0,
gid: 0,
};
match unsafe {
uvll::spawn(loop_.native_handle(), **self, options)
} {
0 => {
(*self).get_watcher_data().exit_cb = Some(exit_cb.take());
Ok(())
}
err => Err(uv::UvError(err))
}
}
}
}
/// Sends a signal to this process.
///
/// This is a wrapper around `uv_process_kill`
pub fn kill(&self, signum: int) -> Result<(), uv::UvError> {
match unsafe {
uvll::process_kill(self.native_handle(), signum as libc::c_int)
} {
0 => Ok(()),
err => Err(uv::UvError(err))
}
}
/// Returns the process id of a spawned process
pub fn pid(&self) -> libc::pid_t {
unsafe { uvll::process_pid(**self) as libc::pid_t }
}
/// Closes this handle, invoking the specified callback once closed
pub fn close(self, cb: uv::NullCallback) {
{
let mut this = self;
let data = this.get_watcher_data();
assert!(data.close_cb.is_none());
data.close_cb = Some(cb);
}
unsafe { uvll::close(self.native_handle(), close_cb); }
extern fn close_cb(handle: *uvll::uv_process_t) {
let mut process: Process = uv::NativeHandle::from_native_handle(handle);
process.get_watcher_data().close_cb.take_unwrap()();
process.drop_watcher_data();
unsafe { uvll::free_handle(handle as *libc::c_void) }
}
}
}
unsafe fn set_stdio(dst: *uvll::uv_stdio_container_t, io: StdioContainer) {
match io {
Ignored => { uvll::set_stdio_container_flags(dst, uvll::STDIO_IGNORE); }
InheritFd(fd) => {
uvll::set_stdio_container_flags(dst, uvll::STDIO_INHERIT_FD);
uvll::set_stdio_container_fd(dst, fd);
}
InheritStream(stream) => {
uvll::set_stdio_container_flags(dst, uvll::STDIO_INHERIT_STREAM);
uvll::set_stdio_container_stream(dst, stream.native_handle());
}
CreatePipe(pipe, readable, writable) => {
let mut flags = uvll::STDIO_CREATE_PIPE as libc::c_int;
if readable {
flags |= uvll::STDIO_READABLE_PIPE as libc::c_int;
}
if writable {
flags |= uvll::STDIO_WRITABLE_PIPE as libc::c_int;
}
uvll::set_stdio_container_flags(dst, flags);
uvll::set_stdio_container_stream(dst,
pipe.as_stream().native_handle());
}
}
}
/// Converts the program and arguments to the argv array expected by libuv
fn with_argv<T>(prog: &str, args: &[~str], f: &fn(**libc::c_char) -> T) -> T {
// First, allocation space to put all the C-strings (we need to have
// ownership of them somewhere
let mut c_strs = vec::with_capacity(args.len() + 1);
c_strs.push(prog.to_c_str());
for arg in args.iter() {
c_strs.push(arg.to_c_str());
}
// Next, create the char** array
let mut c_args = vec::with_capacity(c_strs.len() + 1);
for s in c_strs.iter() {
c_args.push(s.with_ref(|p| p));
}
c_args.push(ptr::null());
c_args.as_imm_buf(|buf, _| f(buf))
}
/// Converts the environment to the env array expected by libuv
fn with_env<T>(env: Option<&[(~str, ~str)]>, f: &fn(**libc::c_char) -> T) -> T {
let env = match env {
Some(s) => s,
None => { return f(ptr::null()); }
};
// As with argv, create some temporary storage and then the actual array
let mut envp = vec::with_capacity(env.len());
for &(ref key, ref value) in env.iter() {
envp.push(fmt!("%s=%s", *key, *value).to_c_str());
}
let mut c_envp = vec::with_capacity(envp.len() + 1);
for s in envp.iter() {
c_envp.push(s.with_ref(|p| p));
}
c_envp.push(ptr::null());
c_envp.as_imm_buf(|buf, _| f(buf))
}
impl uv::NativeHandle<*uvll::uv_process_t> for Process {
fn from_native_handle(handle: *uvll::uv_process_t) -> Process {
Process(handle)
}
fn native_handle(&self) -> *uvll::uv_process_t {
match self { &Process(ptr) => ptr }
}
}

View File

@ -43,7 +43,7 @@ impl TimerWatcher {
let mut watcher: TimerWatcher = NativeHandle::from_native_handle(handle);
let data = watcher.get_watcher_data();
let cb = data.timer_cb.get_ref();
let status = status_to_maybe_uv_error(watcher, status);
let status = status_to_maybe_uv_error(status);
(*cb)(watcher, status);
}
}

View File

@ -13,7 +13,7 @@ use cast::transmute;
use cast;
use cell::Cell;
use clone::Clone;
use libc::{c_int, c_uint, c_void};
use libc::{c_int, c_uint, c_void, pid_t};
use ops::Drop;
use option::*;
use ptr;
@ -22,6 +22,7 @@ use result::*;
use rt::io::IoError;
use rt::io::net::ip::{SocketAddr, IpAddr};
use rt::io::{standard_error, OtherIoError, SeekStyle, SeekSet, SeekCur, SeekEnd};
use rt::kill::BlockedTask;
use rt::local::Local;
use rt::rtio::*;
use rt::sched::{Scheduler, SchedHandle};
@ -148,7 +149,7 @@ fn socket_name<T, U: Watcher + NativeHandle<*T>>(sk: SocketNameKind,
};
if r != 0 {
let status = status_to_maybe_uv_error(handle, r);
let status = status_to_maybe_uv_error(r);
return Err(uv_error_to_io_error(status.unwrap()));
}
@ -591,6 +592,63 @@ impl IoFactory for UvIoFactory {
assert!(!result_cell.is_empty());
return result_cell.take();
}
fn pipe_init(&mut self, ipc: bool) -> Result<~RtioPipeObject, IoError> {
let home = get_handle_to_current_scheduler!();
Ok(~UvPipeStream { pipe: Pipe::new(self.uv_loop(), ipc), home: home })
}
fn spawn(&mut self,
config: &process::Config) -> Result<~RtioProcessObject, IoError> {
// Sadly, we must create the UvProcess before we actually call uv_spawn
// so that the exit_cb can close over it and notify it when the process
// has exited.
let mut ret = ~UvProcess {
process: Process::new(),
home: None,
exit_status: None,
term_signal: None,
exit_error: None,
descheduled: None,
};
let ret_ptr = unsafe {
*cast::transmute::<&~UvProcess, &*mut UvProcess>(&ret)
};
// The purpose of this exit callback is to record the data about the
// exit and then wake up the task which may be waiting for the process
// to exit. This is all performed in the current io-loop, and the
// implementation of UvProcess ensures that reading these fields always
// occurs on the current io-loop.
let exit_cb: ExitCallback = |_, exit_status, term_signal, error| {
unsafe {
assert!((*ret_ptr).exit_status.is_none());
(*ret_ptr).exit_status = Some(exit_status);
(*ret_ptr).term_signal = Some(term_signal);
(*ret_ptr).exit_error = error;
match (*ret_ptr).descheduled.take() {
Some(task) => {
let scheduler: ~Scheduler = Local::take();
scheduler.resume_blocked_task_immediately(task);
}
None => {}
}
}
};
match ret.process.spawn(self.uv_loop(), config, exit_cb) {
Ok(()) => {
// Only now do we actually get a handle to this scheduler.
ret.home = Some(get_handle_to_current_scheduler!());
Ok(ret)
}
Err(uverr) => {
// We still need to close the process handle we created, but
// that's taken care for us in the destructor of UvProcess
Err(uv_error_to_io_error(uverr))
}
}
}
}
pub struct UvTcpListener {
@ -679,7 +737,7 @@ impl RtioTcpListener for UvTcpListener {
uvll::tcp_simultaneous_accepts(self_.watcher().native_handle(), 1 as c_int)
};
match status_to_maybe_uv_error(self_.watcher(), r) {
match status_to_maybe_uv_error(r) {
Some(err) => Err(uv_error_to_io_error(err)),
None => Ok(())
}
@ -692,7 +750,7 @@ impl RtioTcpListener for UvTcpListener {
uvll::tcp_simultaneous_accepts(self_.watcher().native_handle(), 0 as c_int)
};
match status_to_maybe_uv_error(self_.watcher(), r) {
match status_to_maybe_uv_error(r) {
Some(err) => Err(uv_error_to_io_error(err)),
None => Ok(())
}
@ -700,40 +758,15 @@ impl RtioTcpListener for UvTcpListener {
}
}
pub struct UvTcpStream {
watcher: TcpWatcher,
home: SchedHandle,
trait UvStream: HomingIO {
fn as_stream(&mut self) -> StreamWatcher;
}
impl HomingIO for UvTcpStream {
fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
}
impl Drop for UvTcpStream {
fn drop(&self) {
// XXX need mutable finalizer
let this = unsafe { transmute::<&UvTcpStream, &mut UvTcpStream>(self) };
do this.home_for_io_with_sched |self_, scheduler| {
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
do self_.watcher.as_stream().close {
let scheduler: ~Scheduler = Local::take();
scheduler.resume_blocked_task_immediately(task_cell.take());
}
}
}
}
}
impl RtioSocket for UvTcpStream {
fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
do self.home_for_io |self_| {
socket_name(Tcp, self_.watcher)
}
}
}
impl RtioTcpStream for UvTcpStream {
// FIXME(#3429) I would rather this be `impl<T: UvStream> RtioStream for T` but
// that has conflicts with other traits that also have methods
// called `read` and `write`
macro_rules! rtiostream(($t:ident) => {
impl RtioStream for $t {
fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
do self.home_for_io_with_sched |self_, scheduler| {
let result_cell = Cell::new_empty();
@ -747,7 +780,7 @@ impl RtioTcpStream for UvTcpStream {
let alloc: AllocCallback = |_| unsafe {
slice_to_uv_buf(*buf_ptr)
};
let mut watcher = self_.watcher.as_stream();
let mut watcher = self_.as_stream();
do watcher.read_start(alloc) |mut watcher, nread, _buf, status| {
// Stop reading so that no read callbacks are
@ -783,7 +816,7 @@ impl RtioTcpStream for UvTcpStream {
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
let mut watcher = self_.watcher.as_stream();
let mut watcher = self_.as_stream();
do watcher.write(buf) |_watcher, status| {
let result = if status.is_none() {
Ok(())
@ -802,7 +835,85 @@ impl RtioTcpStream for UvTcpStream {
result_cell.take()
}
}
}
})
rtiostream!(UvPipeStream)
rtiostream!(UvTcpStream)
pub struct UvPipeStream {
pipe: Pipe,
home: SchedHandle,
}
impl UvStream for UvPipeStream {
fn as_stream(&mut self) -> StreamWatcher { self.pipe.as_stream() }
}
impl HomingIO for UvPipeStream {
fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
}
impl Drop for UvPipeStream {
fn drop(&self) {
// FIXME(#4330): should not need a transmute
let this = unsafe { cast::transmute_mut(self) };
do this.home_for_io |self_| {
let scheduler: ~Scheduler = Local::take();
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
do self_.pipe.close {
let scheduler: ~Scheduler = Local::take();
scheduler.resume_blocked_task_immediately(task_cell.take());
}
}
}
}
}
impl UvPipeStream {
pub fn uv_pipe(&self) -> Pipe { self.pipe }
}
pub struct UvTcpStream {
watcher: TcpWatcher,
home: SchedHandle,
}
impl HomingIO for UvTcpStream {
fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
}
impl Drop for UvTcpStream {
fn drop(&self) {
// FIXME(#4330): should not need a transmute
let this = unsafe { cast::transmute_mut(self) };
do this.home_for_io |self_| {
let scheduler: ~Scheduler = Local::take();
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
do self_.watcher.as_stream().close {
let scheduler: ~Scheduler = Local::take();
scheduler.resume_blocked_task_immediately(task_cell.take());
}
}
}
}
}
impl UvStream for UvTcpStream {
fn as_stream(&mut self) -> StreamWatcher { self.watcher.as_stream() }
}
impl RtioSocket for UvTcpStream {
fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
do self.home_for_io |self_| {
socket_name(Tcp, self_.watcher)
}
}
}
impl RtioTcpStream for UvTcpStream {
fn peer_name(&mut self) -> Result<SocketAddr, IoError> {
do self.home_for_io |self_| {
socket_name(TcpPeer, self_.watcher)
@ -813,7 +924,7 @@ impl RtioTcpStream for UvTcpStream {
do self.home_for_io |self_| {
let r = unsafe { uvll::tcp_nodelay(self_.watcher.native_handle(), 0 as c_int) };
match status_to_maybe_uv_error(self_.watcher, r) {
match status_to_maybe_uv_error(r) {
Some(err) => Err(uv_error_to_io_error(err)),
None => Ok(())
}
@ -824,7 +935,7 @@ impl RtioTcpStream for UvTcpStream {
do self.home_for_io |self_| {
let r = unsafe { uvll::tcp_nodelay(self_.watcher.native_handle(), 1 as c_int) };
match status_to_maybe_uv_error(self_.watcher, r) {
match status_to_maybe_uv_error(r) {
Some(err) => Err(uv_error_to_io_error(err)),
None => Ok(())
}
@ -838,7 +949,7 @@ impl RtioTcpStream for UvTcpStream {
delay_in_seconds as c_uint)
};
match status_to_maybe_uv_error(self_.watcher, r) {
match status_to_maybe_uv_error(r) {
Some(err) => Err(uv_error_to_io_error(err)),
None => Ok(())
}
@ -851,7 +962,7 @@ impl RtioTcpStream for UvTcpStream {
uvll::tcp_keepalive(self_.watcher.native_handle(), 0 as c_int, 0 as c_uint)
};
match status_to_maybe_uv_error(self_.watcher, r) {
match status_to_maybe_uv_error(r) {
Some(err) => Err(uv_error_to_io_error(err)),
None => Ok(())
}
@ -963,7 +1074,7 @@ impl RtioUdpSocket for UvUdpSocket {
}
};
match status_to_maybe_uv_error(self_.watcher, r) {
match status_to_maybe_uv_error(r) {
Some(err) => Err(uv_error_to_io_error(err)),
None => Ok(())
}
@ -979,7 +1090,7 @@ impl RtioUdpSocket for UvUdpSocket {
}
};
match status_to_maybe_uv_error(self_.watcher, r) {
match status_to_maybe_uv_error(r) {
Some(err) => Err(uv_error_to_io_error(err)),
None => Ok(())
}
@ -993,7 +1104,7 @@ impl RtioUdpSocket for UvUdpSocket {
uvll::udp_set_multicast_loop(self_.watcher.native_handle(), 1 as c_int)
};
match status_to_maybe_uv_error(self_.watcher, r) {
match status_to_maybe_uv_error(r) {
Some(err) => Err(uv_error_to_io_error(err)),
None => Ok(())
}
@ -1007,7 +1118,7 @@ impl RtioUdpSocket for UvUdpSocket {
uvll::udp_set_multicast_loop(self_.watcher.native_handle(), 0 as c_int)
};
match status_to_maybe_uv_error(self_.watcher, r) {
match status_to_maybe_uv_error(r) {
Some(err) => Err(uv_error_to_io_error(err)),
None => Ok(())
}
@ -1021,7 +1132,7 @@ impl RtioUdpSocket for UvUdpSocket {
uvll::udp_set_multicast_ttl(self_.watcher.native_handle(), ttl as c_int)
};
match status_to_maybe_uv_error(self_.watcher, r) {
match status_to_maybe_uv_error(r) {
Some(err) => Err(uv_error_to_io_error(err)),
None => Ok(())
}
@ -1035,7 +1146,7 @@ impl RtioUdpSocket for UvUdpSocket {
uvll::udp_set_ttl(self_.watcher.native_handle(), ttl as c_int)
};
match status_to_maybe_uv_error(self_.watcher, r) {
match status_to_maybe_uv_error(r) {
Some(err) => Err(uv_error_to_io_error(err)),
None => Ok(())
}
@ -1049,7 +1160,7 @@ impl RtioUdpSocket for UvUdpSocket {
uvll::udp_set_broadcast(self_.watcher.native_handle(), 1 as c_int)
};
match status_to_maybe_uv_error(self_.watcher, r) {
match status_to_maybe_uv_error(r) {
Some(err) => Err(uv_error_to_io_error(err)),
None => Ok(())
}
@ -1063,7 +1174,7 @@ impl RtioUdpSocket for UvUdpSocket {
uvll::udp_set_broadcast(self_.watcher.native_handle(), 0 as c_int)
};
match status_to_maybe_uv_error(self_.watcher, r) {
match status_to_maybe_uv_error(r) {
Some(err) => Err(uv_error_to_io_error(err)),
None => Ok(())
}
@ -1250,6 +1361,89 @@ impl RtioFileStream for UvFileStream {
}
}
pub struct UvProcess {
process: process::Process,
// Sadly, this structure must be created before we return it, so in that
// brief interim the `home` is None.
home: Option<SchedHandle>,
// All None until the process exits (exit_error may stay None)
priv exit_status: Option<int>,
priv term_signal: Option<int>,
priv exit_error: Option<UvError>,
// Used to store which task to wake up from the exit_cb
priv descheduled: Option<BlockedTask>,
}
impl HomingIO for UvProcess {
fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.home.get_mut_ref() }
}
impl Drop for UvProcess {
fn drop(&self) {
// FIXME(#4330): should not need a transmute
let this = unsafe { cast::transmute_mut(self) };
let close = |self_: &mut UvProcess| {
let scheduler: ~Scheduler = Local::take();
do scheduler.deschedule_running_task_and_then |_, task| {
let task = Cell::new(task);
do self_.process.close {
let scheduler: ~Scheduler = Local::take();
scheduler.resume_blocked_task_immediately(task.take());
}
}
};
// If home is none, then this process never actually successfully
// spawned, so there's no need to switch event loops
if this.home.is_none() {
close(this)
} else {
this.home_for_io(close)
}
}
}
impl RtioProcess for UvProcess {
fn id(&self) -> pid_t {
self.process.pid()
}
fn kill(&mut self, signal: int) -> Result<(), IoError> {
do self.home_for_io |self_| {
match self_.process.kill(signal) {
Ok(()) => Ok(()),
Err(uverr) => Err(uv_error_to_io_error(uverr))
}
}
}
fn wait(&mut self) -> int {
// Make sure (on the home scheduler) that we have an exit status listed
do self.home_for_io |self_| {
match self_.exit_status {
Some(*) => {}
None => {
// If there's no exit code previously listed, then the
// process's exit callback has yet to be invoked. We just
// need to deschedule ourselves and wait to be reawoken.
let scheduler: ~Scheduler = Local::take();
do scheduler.deschedule_running_task_and_then |_, task| {
assert!(self_.descheduled.is_none());
self_.descheduled = Some(task);
}
assert!(self_.exit_status.is_some());
}
}
}
self.exit_status.unwrap()
}
}
#[test]
fn test_simple_io_no_connect() {
do run_in_newsched_task {

View File

@ -37,28 +37,74 @@ use libc::{malloc, free};
use libc;
use prelude::*;
use ptr;
use str;
use vec;
pub static UNKNOWN: c_int = -1;
pub static OK: c_int = 0;
pub static EOF: c_int = 1;
pub static EADDRINFO: c_int = 2;
pub static EACCES: c_int = 3;
pub static ECONNREFUSED: c_int = 12;
pub static ECONNRESET: c_int = 13;
pub static EPIPE: c_int = 36;
pub use self::errors::*;
pub struct uv_err_t {
code: c_int,
sys_errno_: c_int
pub static OK: c_int = 0;
pub static EOF: c_int = -4095;
pub static UNKNOWN: c_int = -4094;
// uv-errno.h redefines error codes for windows, but not for unix...
#[cfg(windows)]
pub mod errors {
use libc::c_int;
pub static EACCES: c_int = -4093;
pub static ECONNREFUSED: c_int = -4079;
pub static ECONNRESET: c_int = -4078;
pub static EPIPE: c_int = -4048;
}
#[cfg(not(windows))]
pub mod errors {
use libc;
use libc::c_int;
pub static EACCES: c_int = -libc::EACCES;
pub static ECONNREFUSED: c_int = -libc::ECONNREFUSED;
pub static ECONNRESET: c_int = -libc::ECONNRESET;
pub static EPIPE: c_int = -libc::EPIPE;
}
pub static PROCESS_SETUID: c_int = 1 << 0;
pub static PROCESS_SETGID: c_int = 1 << 1;
pub static PROCESS_WINDOWS_VERBATIM_ARGUMENTS: c_int = 1 << 2;
pub static PROCESS_DETACHED: c_int = 1 << 3;
pub static PROCESS_WINDOWS_HIDE: c_int = 1 << 4;
pub static STDIO_IGNORE: c_int = 0x00;
pub static STDIO_CREATE_PIPE: c_int = 0x01;
pub static STDIO_INHERIT_FD: c_int = 0x02;
pub static STDIO_INHERIT_STREAM: c_int = 0x04;
pub static STDIO_READABLE_PIPE: c_int = 0x10;
pub static STDIO_WRITABLE_PIPE: c_int = 0x20;
pub struct uv_buf_t {
base: *u8,
len: libc::size_t,
}
pub struct uv_process_options_t {
exit_cb: uv_exit_cb,
file: *libc::c_char,
args: **libc::c_char,
env: **libc::c_char,
cwd: *libc::c_char,
flags: libc::c_uint,
stdio_count: libc::c_int,
stdio: *uv_stdio_container_t,
uid: uv_uid_t,
gid: uv_gid_t,
}
// These fields are private because they must be interfaced with through the
// functions below.
pub struct uv_stdio_container_t {
priv flags: libc::c_int,
priv stream: *uv_stream_t,
}
pub type uv_handle_t = c_void;
pub type uv_loop_t = c_void;
pub type uv_idle_t = c_void;
@ -72,6 +118,8 @@ pub type uv_timer_t = c_void;
pub type uv_stream_t = c_void;
pub type uv_fs_t = c_void;
pub type uv_udp_send_t = c_void;
pub type uv_process_t = c_void;
pub type uv_pipe_t = c_void;
#[cfg(stage0)]
pub type uv_idle_cb = *u8;
@ -97,6 +145,8 @@ pub type uv_connection_cb = *u8;
pub type uv_timer_cb = *u8;
#[cfg(stage0)]
pub type uv_write_cb = *u8;
#[cfg(stage0)]
pub type uv_exit_cb = *u8;
#[cfg(not(stage0))]
pub type uv_idle_cb = extern "C" fn(handle: *uv_idle_t,
@ -137,12 +187,21 @@ pub type uv_timer_cb = extern "C" fn(handle: *uv_timer_t,
#[cfg(not(stage0))]
pub type uv_write_cb = extern "C" fn(handle: *uv_write_t,
status: c_int);
#[cfg(not(stage0))]
pub type uv_exit_cb = extern "C" fn(handle: *uv_process_t,
exit_status: c_int,
term_signal: c_int);
pub type sockaddr = c_void;
pub type sockaddr_in = c_void;
pub type sockaddr_in6 = c_void;
pub type sockaddr_storage = c_void;
#[cfg(unix)] pub type uv_uid_t = libc::types::os::arch::posix88::uid_t;
#[cfg(unix)] pub type uv_gid_t = libc::types::os::arch::posix88::gid_t;
#[cfg(windows)] pub type uv_uid_t = libc::c_uchar;
#[cfg(windows)] pub type uv_gid_t = libc::c_uchar;
#[deriving(Eq)]
pub enum uv_handle_type {
UV_UNKNOWN_HANDLE,
@ -487,20 +546,12 @@ pub unsafe fn read_stop(stream: *uv_stream_t) -> c_int {
return rust_uv_read_stop(stream as *c_void);
}
pub unsafe fn last_error(loop_handle: *c_void) -> uv_err_t {
pub unsafe fn strerror(err: c_int) -> *c_char {
#[fixed_stack_segment]; #[inline(never)];
return rust_uv_last_error(loop_handle);
}
pub unsafe fn strerror(err: *uv_err_t) -> *c_char {
#[fixed_stack_segment]; #[inline(never)];
return rust_uv_strerror(err);
}
pub unsafe fn err_name(err: *uv_err_t) -> *c_char {
pub unsafe fn err_name(err: c_int) -> *c_char {
#[fixed_stack_segment]; #[inline(never)];
return rust_uv_err_name(err);
}
@ -654,6 +705,45 @@ pub unsafe fn fs_req_cleanup(req: *uv_fs_t) {
rust_uv_fs_req_cleanup(req);
}
pub unsafe fn spawn(loop_ptr: *c_void, result: *uv_process_t,
options: uv_process_options_t) -> c_int {
#[fixed_stack_segment]; #[inline(never)];
return rust_uv_spawn(loop_ptr, result, options);
}
pub unsafe fn process_kill(p: *uv_process_t, signum: c_int) -> c_int {
#[fixed_stack_segment]; #[inline(never)];
return rust_uv_process_kill(p, signum);
}
pub unsafe fn process_pid(p: *uv_process_t) -> c_int {
#[fixed_stack_segment]; #[inline(never)];
return rust_uv_process_pid(p);
}
pub unsafe fn set_stdio_container_flags(c: *uv_stdio_container_t,
flags: libc::c_int) {
#[fixed_stack_segment]; #[inline(never)];
rust_set_stdio_container_flags(c, flags);
}
pub unsafe fn set_stdio_container_fd(c: *uv_stdio_container_t,
fd: libc::c_int) {
#[fixed_stack_segment]; #[inline(never)];
rust_set_stdio_container_fd(c, fd);
}
pub unsafe fn set_stdio_container_stream(c: *uv_stdio_container_t,
stream: *uv_stream_t) {
#[fixed_stack_segment]; #[inline(never)];
rust_set_stdio_container_stream(c, stream);
}
pub unsafe fn pipe_init(loop_ptr: *c_void, p: *uv_pipe_t, ipc: c_int) -> c_int {
#[fixed_stack_segment]; #[inline(never)];
rust_uv_pipe_init(loop_ptr, p, ipc)
}
// data access helpers
pub unsafe fn get_result_from_fs_req(req: *uv_fs_t) -> c_int {
#[fixed_stack_segment]; #[inline(never)];
@ -720,22 +810,6 @@ pub unsafe fn get_len_from_buf(buf: uv_buf_t) -> size_t {
return rust_uv_get_len_from_buf(buf);
}
pub unsafe fn get_last_err_info(uv_loop: *c_void) -> ~str {
let err = last_error(uv_loop);
let err_ptr = ptr::to_unsafe_ptr(&err);
let err_name = str::raw::from_c_str(err_name(err_ptr));
let err_msg = str::raw::from_c_str(strerror(err_ptr));
return fmt!("LIBUV ERROR: name: %s msg: %s",
err_name, err_msg);
}
pub unsafe fn get_last_err_data(uv_loop: *c_void) -> uv_err_data {
let err = last_error(uv_loop);
let err_ptr = ptr::to_unsafe_ptr(&err);
let err_name = str::raw::from_c_str(err_name(err_ptr));
let err_msg = str::raw::from_c_str(strerror(err_ptr));
uv_err_data { err_name: err_name, err_msg: err_msg }
}
pub struct uv_err_data {
err_name: ~str,
@ -768,9 +842,8 @@ extern {
cb: uv_async_cb) -> c_int;
fn rust_uv_tcp_init(loop_handle: *c_void, handle_ptr: *uv_tcp_t) -> c_int;
fn rust_uv_buf_init(out_buf: *uv_buf_t, base: *u8, len: size_t);
fn rust_uv_last_error(loop_handle: *c_void) -> uv_err_t;
fn rust_uv_strerror(err: *uv_err_t) -> *c_char;
fn rust_uv_err_name(err: *uv_err_t) -> *c_char;
fn rust_uv_strerror(err: c_int) -> *c_char;
fn rust_uv_err_name(err: c_int) -> *c_char;
fn rust_uv_ip4_addrp(ip: *u8, port: c_int) -> *sockaddr_in;
fn rust_uv_ip6_addrp(ip: *u8, port: c_int) -> *sockaddr_in6;
fn rust_uv_free_ip4_addr(addr: *sockaddr_in);
@ -856,4 +929,13 @@ extern {
fn rust_uv_set_data_for_req(req: *c_void, data: *c_void);
fn rust_uv_get_base_from_buf(buf: uv_buf_t) -> *u8;
fn rust_uv_get_len_from_buf(buf: uv_buf_t) -> size_t;
fn rust_uv_spawn(loop_ptr: *c_void, outptr: *uv_process_t,
options: uv_process_options_t) -> c_int;
fn rust_uv_process_kill(p: *uv_process_t, signum: c_int) -> c_int;
fn rust_uv_process_pid(p: *uv_process_t) -> c_int;
fn rust_set_stdio_container_flags(c: *uv_stdio_container_t, flags: c_int);
fn rust_set_stdio_container_fd(c: *uv_stdio_container_t, fd: c_int);
fn rust_set_stdio_container_stream(c: *uv_stdio_container_t,
stream: *uv_stream_t);
fn rust_uv_pipe_init(loop_ptr: *c_void, p: *uv_pipe_t, ipc: c_int) -> c_int;
}

File diff suppressed because it is too large Load Diff

@ -1 +1 @@
Subproject commit dfae9c3e958dc086d9c0ab068cd76d196c95a433
Subproject commit ef2bcd134164adcaa072dcb56e62b737fdcb075e

View File

@ -329,20 +329,13 @@ rust_uv_get_len_from_buf(uv_buf_t buf) {
return buf.len;
}
extern "C" uv_err_t
rust_uv_last_error(uv_loop_t* loop) {
return uv_last_error(loop);
}
extern "C" const char*
rust_uv_strerror(uv_err_t* err_ptr) {
uv_err_t err = *err_ptr;
rust_uv_strerror(int err) {
return uv_strerror(err);
}
extern "C" const char*
rust_uv_err_name(uv_err_t* err_ptr) {
uv_err_t err = *err_ptr;
rust_uv_err_name(int err) {
return uv_err_name(err);
}
@ -553,3 +546,37 @@ extern "C" uv_loop_t*
rust_uv_get_loop_from_fs_req(uv_fs_t* req) {
return req->loop;
}
extern "C" int
rust_uv_spawn(uv_loop_t *loop, uv_process_t *p, uv_process_options_t options) {
return uv_spawn(loop, p, options);
}
extern "C" int
rust_uv_process_kill(uv_process_t *p, int signum) {
return uv_process_kill(p, signum);
}
extern "C" void
rust_set_stdio_container_flags(uv_stdio_container_t *c, int flags) {
c->flags = (uv_stdio_flags) flags;
}
extern "C" void
rust_set_stdio_container_fd(uv_stdio_container_t *c, int fd) {
c->data.fd = fd;
}
extern "C" void
rust_set_stdio_container_stream(uv_stdio_container_t *c, uv_stream_t *stream) {
c->data.stream = stream;
}
extern "C" int
rust_uv_process_pid(uv_process_t* p) {
return p->pid;
}
extern "C" int
rust_uv_pipe_init(uv_loop_t *loop, uv_pipe_t* p, int ipc) {
return uv_pipe_init(loop, p, ipc);
}

View File

@ -47,7 +47,6 @@ rust_uv_timer_start
rust_uv_timer_stop
rust_uv_tcp_init
rust_uv_buf_init
rust_uv_last_error
rust_uv_strerror
rust_uv_err_name
rust_uv_ip4_addr
@ -192,3 +191,10 @@ rust_take_change_dir_lock
rust_drop_change_dir_lock
rust_get_test_int
rust_get_task
rust_uv_spawn
rust_uv_process_kill
rust_set_stdio_container_flags
rust_set_stdio_container_fd
rust_set_stdio_container_stream
rust_uv_process_pid
rust_uv_pipe_init

View File

@ -22,13 +22,15 @@ use std::str;
#[test]
fn test_destroy_once() {
let mut p = run::Process::new("echo", [], run::ProcessOptions::new());
let p = run::Process::new("echo", [], run::ProcessOptions::new());
let mut p = p.unwrap();
p.destroy(); // this shouldn't crash (and nor should the destructor)
}
#[test]
fn test_destroy_twice() {
let mut p = run::Process::new("echo", [], run::ProcessOptions::new());
let p = run::Process::new("echo", [], run::ProcessOptions::new());
let mut p = p.unwrap();
p.destroy(); // this shouldnt crash...
p.destroy(); // ...and nor should this (and nor should the destructor)
}
@ -74,7 +76,8 @@ fn test_destroy_actually_kills(force: bool) {
}
// this process will stay alive indefinitely trying to read from stdin
let mut p = run::Process::new(BLOCK_COMMAND, [], run::ProcessOptions::new());
let p = run::Process::new(BLOCK_COMMAND, [], run::ProcessOptions::new());
let mut p = p.unwrap();
assert!(process_exists(p.get_id()));