Merge remote-tracking branch 'brson/nocommupstream2'

This commit is contained in:
Brian Anderson 2013-01-26 14:57:58 -08:00
commit 83ca034d2e
25 changed files with 1682 additions and 1183 deletions

View File

@ -141,109 +141,35 @@ pub mod win32 {
}
}
pub fn getenv(n: &str) -> Option<~str> {
global_env::getenv(n)
}
/*
Accessing environment variables is not generally threadsafe.
This uses a per-runtime lock to serialize access.
XXX: It would probably be appropriate to make this a real global
*/
fn with_env_lock<T>(f: &fn() -> T) -> T {
use private::global::global_data_clone_create;
use private::{Exclusive, exclusive};
pub fn setenv(n: &str, v: &str) {
global_env::setenv(n, v)
struct SharedValue(());
type ValueMutex = Exclusive<SharedValue>;
fn key(_: ValueMutex) { }
unsafe {
let lock: ValueMutex = global_data_clone_create(key, || {
~exclusive(SharedValue(()))
});
lock.with_imm(|_| f() )
}
}
pub fn env() -> ~[(~str,~str)] {
global_env::env()
}
mod global_env {
//! Internal module for serializing access to getenv/setenv
use either;
use libc;
use oldcomm;
use option::Option;
use private;
use str;
use task;
extern mod rustrt {
unsafe fn rust_global_env_chan_ptr() -> *libc::uintptr_t;
}
enum Msg {
MsgGetEnv(~str, oldcomm::Chan<Option<~str>>),
MsgSetEnv(~str, ~str, oldcomm::Chan<()>),
MsgEnv(oldcomm::Chan<~[(~str,~str)]>)
}
pub fn getenv(n: &str) -> Option<~str> {
let env_ch = get_global_env_chan();
let po = oldcomm::Port();
oldcomm::send(env_ch, MsgGetEnv(str::from_slice(n),
oldcomm::Chan(&po)));
oldcomm::recv(po)
}
pub fn setenv(n: &str, v: &str) {
let env_ch = get_global_env_chan();
let po = oldcomm::Port();
oldcomm::send(env_ch, MsgSetEnv(str::from_slice(n),
str::from_slice(v),
oldcomm::Chan(&po)));
oldcomm::recv(po)
}
pub fn env() -> ~[(~str,~str)] {
let env_ch = get_global_env_chan();
let po = oldcomm::Port();
oldcomm::send(env_ch, MsgEnv(oldcomm::Chan(&po)));
oldcomm::recv(po)
}
fn get_global_env_chan() -> oldcomm::Chan<Msg> {
unsafe {
let global_ptr = rustrt::rust_global_env_chan_ptr();
private::chan_from_global_ptr(global_ptr, || {
// FIXME (#2621): This would be a good place to use a very
// small foreign stack
task::task().sched_mode(task::SingleThreaded).unlinked()
}, global_env_task)
}
}
fn global_env_task(msg_po: oldcomm::Port<Msg>) {
unsafe {
do private::weaken_task |weak_po| {
loop {
match oldcomm::select2(msg_po, weak_po) {
either::Left(MsgGetEnv(ref n, resp_ch)) => {
oldcomm::send(resp_ch, impl_::getenv(*n))
}
either::Left(MsgSetEnv(ref n, ref v, resp_ch)) => {
oldcomm::send(resp_ch, impl_::setenv(*n, *v))
}
either::Left(MsgEnv(resp_ch)) => {
oldcomm::send(resp_ch, impl_::env())
}
either::Right(_) => break
}
}
}
}
}
mod impl_ {
use cast;
use libc;
use option::Option;
use option;
use ptr;
use str;
use vec;
extern mod rustrt {
unsafe fn rust_env_pairs() -> ~[~str];
}
pub fn env() -> ~[(~str,~str)] {
unsafe {
do with_env_lock {
let mut pairs = ~[];
for vec::each(rustrt::rust_env_pairs()) |p| {
let vs = str::splitn_char(*p, '=', 1u);
@ -253,23 +179,27 @@ mod global_env {
move pairs
}
}
}
#[cfg(unix)]
pub fn getenv(n: &str) -> Option<~str> {
#[cfg(unix)]
pub fn getenv(n: &str) -> Option<~str> {
unsafe {
do with_env_lock {
let s = str::as_c_str(n, |s| libc::getenv(s));
return if ptr::null::<u8>() == cast::reinterpret_cast(&s) {
if ptr::null::<u8>() == cast::reinterpret_cast(&s) {
option::None::<~str>
} else {
let s = cast::reinterpret_cast(&s);
option::Some::<~str>(str::raw::from_buf(s))
};
}
}
}
}
#[cfg(windows)]
pub fn getenv(n: &str) -> Option<~str> {
#[cfg(windows)]
pub fn getenv(n: &str) -> Option<~str> {
unsafe {
do with_env_lock {
use os::win32::{as_utf16_p, fill_utf16_buf_and_decode};
do as_utf16_p(n) |u| {
do fill_utf16_buf_and_decode() |buf, sz| {
@ -278,11 +208,13 @@ mod global_env {
}
}
}
}
#[cfg(unix)]
pub fn setenv(n: &str, v: &str) {
#[cfg(unix)]
pub fn setenv(n: &str, v: &str) {
unsafe {
do with_env_lock {
do str::as_c_str(n) |nbuf| {
do str::as_c_str(v) |vbuf| {
libc::funcs::posix01::unistd::setenv(nbuf, vbuf, 1);
@ -290,11 +222,13 @@ mod global_env {
}
}
}
}
#[cfg(windows)]
pub fn setenv(n: &str, v: &str) {
#[cfg(windows)]
pub fn setenv(n: &str, v: &str) {
unsafe {
do with_env_lock {
use os::win32::as_utf16_p;
do as_utf16_p(n) |nbuf| {
do as_utf16_p(v) |vbuf| {
@ -303,8 +237,6 @@ mod global_env {
}
}
}
}
}
pub fn fdopen(fd: c_int) -> *FILE {

View File

@ -1286,6 +1286,16 @@ pub fn oneshot<T: Owned>() -> (PortOne<T>, ChanOne<T>) {
(port, chan)
}
impl<T: Owned> PortOne<T> {
fn recv(self) -> T { recv_one(self) }
fn try_recv(self) -> Option<T> { try_recv_one(self) }
}
impl<T: Owned> ChanOne<T> {
fn send(self, data: T) { send_one(self, data) }
fn try_send(self, data: T) -> bool { try_send_one(self, data) }
}
/**
* Receive a message from a oneshot pipe, failing if the connection was
* closed.

View File

@ -18,7 +18,6 @@
use cast;
use iter;
use libc;
use oldcomm;
use option;
use pipes;
use prelude::*;
@ -28,10 +27,17 @@ use task;
use task::{TaskBuilder, atomically};
use uint;
#[path = "private/at_exit.rs"]
pub mod at_exit;
#[path = "private/global.rs"]
pub mod global;
#[path = "private/finally.rs"]
pub mod finally;
#[path = "private/weak_task.rs"]
pub mod weak_task;
extern mod rustrt {
#[legacy_exports];
unsafe fn rust_task_weaken(ch: rust_port_id);
unsafe fn rust_task_unweaken(ch: rust_port_id);
unsafe fn rust_create_little_lock() -> rust_little_lock;
unsafe fn rust_destroy_little_lock(lock: rust_little_lock);
@ -87,11 +93,6 @@ fn test_run_in_bare_thread() {
}
}
#[allow(non_camel_case_types)] // runtime type
type rust_port_id = uint;
type GlobalPtr = *libc::uintptr_t;
fn compare_and_swap(address: &mut int, oldval: int, newval: int) -> bool {
unsafe {
let old = rusti::atomic_cxchg(address, oldval, newval);
@ -99,255 +100,6 @@ fn compare_and_swap(address: &mut int, oldval: int, newval: int) -> bool {
}
}
/**
* Atomically gets a channel from a pointer to a pointer-sized memory location
* or, if no channel exists creates and installs a new channel and sets up a
* new task to receive from it.
*/
pub unsafe fn chan_from_global_ptr<T: Owned>(
global: GlobalPtr,
task_fn: fn() -> task::TaskBuilder,
f: fn~(oldcomm::Port<T>)
) -> oldcomm::Chan<T> {
enum Msg {
Proceed,
Abort
}
log(debug,~"ENTERING chan_from_global_ptr, before is_prob_zero check");
let is_probably_zero = *global == 0u;
log(debug,~"after is_prob_zero check");
if is_probably_zero {
log(debug,~"is probably zero...");
// There's no global channel. We must make it
let (setup1_po, setup1_ch) = pipes::stream();
let (setup2_po, setup2_ch) = pipes::stream();
// FIXME #4422: Ugly type inference hint
let setup2_po: pipes::Port<Msg> = setup2_po;
do task_fn().spawn |move f, move setup1_ch, move setup2_po| {
let po = oldcomm::Port::<T>();
let ch = oldcomm::Chan(&po);
setup1_ch.send(ch);
// Wait to hear if we are the official instance of
// this global task
match setup2_po.recv() {
Proceed => f(move po),
Abort => ()
}
};
log(debug,~"before setup recv..");
// This is the proposed global channel
let ch = setup1_po.recv();
// 0 is our sentinal value. It is not a valid channel
assert *ch != 0;
// Install the channel
log(debug,~"BEFORE COMPARE AND SWAP");
let swapped = compare_and_swap(
cast::reinterpret_cast(&global),
0, cast::reinterpret_cast(&ch));
log(debug,fmt!("AFTER .. swapped? %?", swapped));
if swapped {
// Success!
setup2_ch.send(Proceed);
ch
} else {
// Somebody else got in before we did
setup2_ch.send(Abort);
cast::reinterpret_cast(&*global)
}
} else {
log(debug, ~"global != 0");
cast::reinterpret_cast(&*global)
}
}
#[test]
pub fn test_from_global_chan1() {
// This is unreadable, right?
// The global channel
let globchan = 0;
let globchanp = ptr::addr_of(&globchan);
// Create the global channel, attached to a new task
let ch = unsafe {
do chan_from_global_ptr(globchanp, task::task) |po| {
let ch = oldcomm::recv(po);
oldcomm::send(ch, true);
let ch = oldcomm::recv(po);
oldcomm::send(ch, true);
}
};
// Talk to it
let po = oldcomm::Port();
oldcomm::send(ch, oldcomm::Chan(&po));
assert oldcomm::recv(po) == true;
// This one just reuses the previous channel
let ch = unsafe {
do chan_from_global_ptr(globchanp, task::task) |po| {
let ch = oldcomm::recv(po);
oldcomm::send(ch, false);
}
};
// Talk to the original global task
let po = oldcomm::Port();
oldcomm::send(ch, oldcomm::Chan(&po));
assert oldcomm::recv(po) == true;
}
#[test]
pub fn test_from_global_chan2() {
for iter::repeat(100) {
// The global channel
let globchan = 0;
let globchanp = ptr::addr_of(&globchan);
let resultpo = oldcomm::Port();
let resultch = oldcomm::Chan(&resultpo);
// Spawn a bunch of tasks that all want to compete to
// create the global channel
for uint::range(0, 10) |i| {
do task::spawn {
let ch = unsafe {
do chan_from_global_ptr(
globchanp, task::task) |po| {
for uint::range(0, 10) |_j| {
let ch = oldcomm::recv(po);
oldcomm::send(ch, {i});
}
}
};
let po = oldcomm::Port();
oldcomm::send(ch, oldcomm::Chan(&po));
// We are The winner if our version of the
// task was installed
let winner = oldcomm::recv(po);
oldcomm::send(resultch, winner == i);
}
}
// There should be only one winner
let mut winners = 0u;
for uint::range(0u, 10u) |_i| {
let res = oldcomm::recv(resultpo);
if res { winners += 1u };
}
assert winners == 1u;
}
}
/**
* Convert the current task to a 'weak' task temporarily
*
* As a weak task it will not be counted towards the runtime's set
* of live tasks. When there are no more outstanding live (non-weak) tasks
* the runtime will send an exit message on the provided channel.
*
* This function is super-unsafe. Do not use.
*
* # Safety notes
*
* * Weak tasks must either die on their own or exit upon receipt of
* the exit message. Failure to do so will cause the runtime to never
* exit
* * Tasks must not call `weaken_task` multiple times. This will
* break the kernel's accounting of live tasks.
* * Weak tasks must not be supervised. A supervised task keeps
* a reference to its parent, so the parent will not die.
*/
pub unsafe fn weaken_task(f: fn(oldcomm::Port<()>)) {
let po = oldcomm::Port();
let ch = oldcomm::Chan(&po);
unsafe {
rustrt::rust_task_weaken(cast::reinterpret_cast(&ch));
}
let _unweaken = Unweaken(ch);
f(po);
struct Unweaken {
ch: oldcomm::Chan<()>,
drop {
unsafe {
rustrt::rust_task_unweaken(cast::reinterpret_cast(&self.ch));
}
}
}
fn Unweaken(ch: oldcomm::Chan<()>) -> Unweaken {
Unweaken {
ch: ch
}
}
}
#[test]
pub fn test_weaken_task_then_unweaken() {
do task::try {
unsafe {
do weaken_task |_po| {
}
}
};
}
#[test]
pub fn test_weaken_task_wait() {
do task::spawn_unlinked {
unsafe {
do weaken_task |po| {
oldcomm::recv(po);
}
}
}
}
#[test]
pub fn test_weaken_task_stress() {
// Create a bunch of weak tasks
for iter::repeat(100u) {
do task::spawn {
unsafe {
do weaken_task |_po| {
}
}
}
do task::spawn_unlinked {
unsafe {
do weaken_task |po| {
// Wait for it to tell us to die
oldcomm::recv(po);
}
}
}
}
}
#[test]
#[ignore(cfg(windows))]
pub fn test_weaken_task_fail() {
let res = do task::try {
unsafe {
do weaken_task |_po| {
fail;
}
}
};
assert result::is_err(&res);
}
/****************************************************************************
* Shared state & exclusive ARC
****************************************************************************/
@ -533,6 +285,14 @@ pub unsafe fn clone_shared_mutable_state<T: Owned>(rc: &SharedMutableState<T>)
ArcDestruct((*rc).data)
}
impl<T: Owned> SharedMutableState<T>: Clone {
fn clone(&self) -> SharedMutableState<T> {
unsafe {
clone_shared_mutable_state(self)
}
}
}
/****************************************************************************/
#[allow(non_camel_case_types)] // runtime type

View File

@ -0,0 +1,98 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use sys;
use cast;
use ptr;
use task;
use uint;
use vec;
use rand;
use libc::{c_void, size_t};
/**
Register a function to be run during runtime shutdown.
After all non-weak tasks have exited, registered exit functions will
execute, in random order, on the primary scheduler. Each function runs
in its own unsupervised task.
*/
pub fn at_exit(f: ~fn()) {
unsafe {
let runner: &fn(*ExitFunctions) = exit_runner;
let runner_pair: sys::Closure = cast::transmute(runner);
let runner_ptr = runner_pair.code;
let runner_ptr = cast::transmute(runner_ptr);
rustrt::rust_register_exit_function(runner_ptr, ~f);
}
}
// NB: The double pointer indirection here is because ~fn() is a fat
// pointer and due to FFI problems I am more comfortable making the
// interface use a normal pointer
extern mod rustrt {
fn rust_register_exit_function(runner: *c_void, f: ~~fn());
}
struct ExitFunctions {
// The number of exit functions
count: size_t,
// The buffer of exit functions
start: *~~fn()
}
fn exit_runner(exit_fns: *ExitFunctions) {
let exit_fns = unsafe { &*exit_fns };
let count = (*exit_fns).count;
let start = (*exit_fns).start;
// NB: from_buf memcpys from the source, which will
// give us ownership of the array of functions
let mut exit_fns_vec = unsafe { vec::from_buf(start, count as uint) };
// Let's not make any promises about execution order
rand::Rng().shuffle_mut(exit_fns_vec);
debug!("running %u exit functions", exit_fns_vec.len());
while !exit_fns_vec.is_empty() {
match exit_fns_vec.pop() {
~f => {
task::task().supervised().spawn(f);
}
}
}
}
#[abi = "rust-intrinsic"]
pub extern mod rusti {
fn move_val_init<T>(dst: &mut T, -src: T);
fn init<T>() -> T;
}
#[test]
fn test_at_exit() {
let i = 10;
do at_exit {
debug!("at_exit1");
assert i == 10;
}
}
#[test]
fn test_at_exit_many() {
let i = 10;
for uint::range(20, 100) |j| {
do at_exit {
debug!("at_exit2");
assert i == 10;
assert j > i;
}
}
}

View File

@ -0,0 +1,98 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
/*!
The Finally trait provides a method, `finally` on
stack closures that emulates Java-style try/finally blocks.
# Example
~~~
do || {
...
}.finally {
alway_run_this();
}
~~~
*/
use ops::Drop;
use task::{spawn, failing};
pub trait Finally<T> {
fn finally(&self, +dtor: &fn()) -> T;
}
impl<T> &fn() -> T: Finally<T> {
// XXX: Should not require a mode here
fn finally(&self, +dtor: &fn()) -> T {
let _d = Finallyalizer {
dtor: dtor
};
(*self)()
}
}
struct Finallyalizer {
dtor: &fn()
}
impl Finallyalizer: Drop {
fn finalize(&self) {
(self.dtor)();
}
}
#[test]
fn test_success() {
let mut i = 0;
do (|| {
i = 10;
}).finally {
assert !failing();
assert i == 10;
i = 20;
}
assert i == 20;
}
#[test]
#[ignore(cfg(windows))]
#[should_fail]
fn test_fail() {
let mut i = 0;
do (|| {
i = 10;
fail;
}).finally {
assert failing();
assert i == 10;
}
}
#[test]
fn test_retval() {
let i = do (fn&() -> int {
10
}).finally { };
assert i == 10;
}
#[test]
fn test_compact() {
// XXX Should be able to use a fn item instead
// of a closure for do_some_fallible_work,
// but it's a type error.
let do_some_fallible_work: &fn() = || { };
fn but_always_run_this_function() { }
do_some_fallible_work.finally(
but_always_run_this_function);
}

View File

@ -0,0 +1,296 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
/*!
Global data
An interface for creating and retrieving values with global
(per-runtime) scope.
Global values are stored in a map and protected by a single global
mutex. Operations are provided for accessing and cloning the value
under the mutex.
Because all globals go through a single mutex, they should be used
sparingly. The interface is intended to be used with clonable,
atomically reference counted synchronization types, like ARCs, in
which case the value should be cached locally whenever possible to
avoid hitting the mutex.
*/
use cast::{transmute, reinterpret_cast};
use clone::Clone;
use kinds::Owned;
use libc::{c_void, uintptr_t};
use option::{Option, Some, None};
use ops::Drop;
use pipes;
use private::{Exclusive, exclusive};
use private::{SharedMutableState, shared_mutable_state};
use private::{get_shared_immutable_state};
use private::at_exit::at_exit;
use hashmap::linear::LinearMap;
use sys::Closure;
use task::spawn;
use uint;
pub type GlobalDataKey<T: Owned> = &fn(v: T);
pub unsafe fn global_data_clone_create<T: Owned Clone>(
key: GlobalDataKey<T>, create: &fn() -> ~T) -> T {
/*!
* Clone a global value or, if it has not been created,
* first construct the value then return a clone.
*
* # Safety note
*
* Both the clone operation and the constructor are
* called while the global lock is held. Recursive
* use of the global interface in either of these
* operations will result in deadlock.
*/
global_data_clone_create_(key_ptr(key), create)
}
unsafe fn global_data_clone_create_<T: Owned Clone>(
key: uint, create: &fn() -> ~T) -> T {
let mut clone_value: Option<T> = None;
do global_data_modify_(key) |value: Option<~T>| {
match value {
None => {
let value = create();
clone_value = Some(value.clone());
Some(value)
}
Some(value) => {
clone_value = Some(value.clone());
Some(value)
}
}
}
return clone_value.unwrap();
}
unsafe fn global_data_modify<T: Owned>(
key: GlobalDataKey<T>, op: &fn(Option<~T>) -> Option<~T>) {
global_data_modify_(key_ptr(key), op)
}
unsafe fn global_data_modify_<T: Owned>(
key: uint, op: &fn(Option<~T>) -> Option<~T>) {
let mut old_dtor = None;
do get_global_state().with |gs| {
let (maybe_new_value, maybe_dtor) = match gs.map.pop(&key) {
Some((ptr, dtor)) => {
let value: ~T = transmute(ptr);
(op(Some(value)), Some(dtor))
}
None => {
(op(None), None)
}
};
match maybe_new_value {
Some(value) => {
let data: *c_void = transmute(value);
let dtor: ~fn() = match maybe_dtor {
Some(dtor) => dtor,
None => {
let dtor: ~fn() = || unsafe {
let _destroy_value: ~T = transmute(data);
};
dtor
}
};
let value = (data, dtor);
gs.map.insert(key, value);
}
None => {
match maybe_dtor {
Some(dtor) => old_dtor = Some(dtor),
None => ()
}
}
}
}
}
pub unsafe fn global_data_clone<T: Owned Clone>(
key: GlobalDataKey<T>) -> Option<T> {
let mut maybe_clone: Option<T> = None;
do global_data_modify(key) |current| {
match &current {
&Some(~ref value) => {
maybe_clone = Some(value.clone());
}
&None => ()
}
current
}
return maybe_clone;
}
// GlobalState is a map from keys to unique pointers and a
// destructor. Keys are pointers derived from the type of the
// global value. There is a single GlobalState instance per runtime.
struct GlobalState {
map: LinearMap<uint, (*c_void, ~fn())>
}
impl GlobalState: Drop {
fn finalize(&self) {
for self.map.each_value |v| {
match v {
&(_, ref dtor) => (*dtor)()
}
}
}
}
fn get_global_state() -> Exclusive<GlobalState> {
const POISON: int = -1;
// XXX: Doing atomic_cxchg to initialize the global state
// lazily, which wouldn't be necessary with a runtime written
// in Rust
let global_ptr = unsafe { rust_get_global_data_ptr() };
if unsafe { *global_ptr } == 0 {
// Global state doesn't exist yet, probably
// The global state object
let state = GlobalState {
map: LinearMap::new()
};
// It's under a reference-counted mutex
let state = ~exclusive(state);
// Convert it to an integer
let state_ptr: &Exclusive<GlobalState> = state;
let state_i: int = unsafe { transmute(state_ptr) };
// Swap our structure into the global pointer
let prev_i = unsafe { atomic_cxchg(&mut *global_ptr, 0, state_i) };
// Sanity check that we're not trying to reinitialize after shutdown
assert prev_i != POISON;
if prev_i == 0 {
// Successfully installed the global pointer
// Take a handle to return
let clone = state.clone();
// Install a runtime exit function to destroy the global object
do at_exit {
// Poison the global pointer
let prev_i = unsafe {
atomic_cxchg(&mut *global_ptr, state_i, POISON)
};
assert prev_i == state_i;
// Capture the global state object in the at_exit closure
// so that it is destroyed at the right time
let _capture_global_state = &state;
};
return clone;
} else {
// Somebody else initialized the globals first
let state: &Exclusive<GlobalState> = unsafe { transmute(prev_i) };
return state.clone();
}
} else {
let state: &Exclusive<GlobalState> = unsafe {
transmute(*global_ptr)
};
return state.clone();
}
}
fn key_ptr<T: Owned>(key: GlobalDataKey<T>) -> uint {
unsafe {
let closure: Closure = reinterpret_cast(&key);
return transmute(closure.code);
}
}
extern {
fn rust_get_global_data_ptr() -> *mut int;
}
#[abi = "rust-intrinsic"]
extern {
fn atomic_cxchg(dst: &mut int, old: int, src: int) -> int;
}
#[test]
fn test_clone_rc() {
type MyType = SharedMutableState<int>;
fn key(_v: SharedMutableState<int>) { }
for uint::range(0, 100) |_| {
do spawn {
unsafe {
let val = do global_data_clone_create(key) {
~shared_mutable_state(10)
};
assert get_shared_immutable_state(&val) == &10;
}
}
}
}
#[test]
fn test_modify() {
type MyType = SharedMutableState<int>;
fn key(_v: SharedMutableState<int>) { }
unsafe {
do global_data_modify(key) |v| {
match v {
None => {
unsafe {
Some(~shared_mutable_state(10))
}
}
_ => fail
}
}
do global_data_modify(key) |v| {
match v {
Some(sms) => {
let v = get_shared_immutable_state(sms);
assert *v == 10;
None
},
_ => fail
}
}
do global_data_modify(key) |v| {
match v {
None => {
unsafe {
Some(~shared_mutable_state(10))
}
}
_ => fail
}
}
}
}

View File

@ -0,0 +1,207 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
/*!
Weak tasks
Weak tasks are a runtime feature for building global services that
do not keep the runtime alive. Normally the runtime exits when all
tasks exits, but if a task is weak then the runtime may exit while
it is running, sending a notification to the task that the runtime
is trying to shut down.
*/
use option::{Some, None, swap_unwrap};
use private::at_exit::at_exit;
use private::global::global_data_clone_create;
use private::finally::Finally;
use pipes::{Port, Chan, SharedChan, stream};
use task::{Task, task, spawn};
use task::rt::{task_id, get_task_id};
use hashmap::linear::LinearMap;
use ops::Drop;
type ShutdownMsg = ();
// XXX: This could be a PortOne but I've experienced bugginess
// with oneshot pipes and try_send
pub unsafe fn weaken_task(f: &fn(Port<ShutdownMsg>)) {
let service = global_data_clone_create(global_data_key,
create_global_service);
let (shutdown_port, shutdown_chan) = stream::<ShutdownMsg>();
let shutdown_port = ~mut Some(shutdown_port);
let task = get_task_id();
// Expect the weak task service to be alive
assert service.try_send(RegisterWeakTask(task, shutdown_chan));
unsafe { rust_inc_weak_task_count(); }
do fn&() {
let shutdown_port = swap_unwrap(&mut *shutdown_port);
f(shutdown_port)
}.finally || {
unsafe { rust_dec_weak_task_count(); }
// Service my have already exited
service.send(UnregisterWeakTask(task));
}
}
type WeakTaskService = SharedChan<ServiceMsg>;
type TaskHandle = task_id;
fn global_data_key(_v: WeakTaskService) { }
enum ServiceMsg {
RegisterWeakTask(TaskHandle, Chan<ShutdownMsg>),
UnregisterWeakTask(TaskHandle),
Shutdown
}
fn create_global_service() -> ~WeakTaskService {
debug!("creating global weak task service");
let (port, chan) = stream::<ServiceMsg>();
let port = ~mut Some(port);
let chan = SharedChan(chan);
let chan_clone = chan.clone();
do task().unlinked().spawn {
debug!("running global weak task service");
let port = swap_unwrap(&mut *port);
let port = ~mut Some(port);
do fn&() {
let port = swap_unwrap(&mut *port);
// The weak task service is itself a weak task
debug!("weakening the weak service task");
unsafe { rust_inc_weak_task_count(); }
run_weak_task_service(port);
}.finally {
debug!("unweakening the weak service task");
unsafe { rust_dec_weak_task_count(); }
}
}
do at_exit {
debug!("shutting down weak task service");
chan.send(Shutdown);
}
return ~chan_clone;
}
fn run_weak_task_service(port: Port<ServiceMsg>) {
let mut shutdown_map = LinearMap::new();
loop {
match port.recv() {
RegisterWeakTask(task, shutdown_chan) => {
let previously_unregistered =
shutdown_map.insert(task, shutdown_chan);
assert previously_unregistered;
}
UnregisterWeakTask(task) => {
match shutdown_map.pop(&task) {
Some(shutdown_chan) => {
// Oneshot pipes must send, even though
// nobody will receive this
shutdown_chan.send(());
}
None => fail
}
}
Shutdown => break
}
}
do shutdown_map.consume |_, shutdown_chan| {
// Weak task may have already exited
shutdown_chan.send(());
}
}
extern {
unsafe fn rust_inc_weak_task_count();
unsafe fn rust_dec_weak_task_count();
}
#[test]
fn test_simple() {
let (port, chan) = stream();
do spawn {
unsafe {
do weaken_task |_signal| {
}
}
chan.send(());
}
port.recv();
}
#[test]
fn test_weak_weak() {
let (port, chan) = stream();
do spawn {
unsafe {
do weaken_task |_signal| {
}
do weaken_task |_signal| {
}
}
chan.send(());
}
port.recv();
}
#[test]
fn test_wait_for_signal() {
do spawn {
unsafe {
do weaken_task |signal| {
signal.recv();
}
}
}
}
#[test]
fn test_wait_for_signal_many() {
use uint;
for uint::range(0, 100) |_| {
do spawn {
unsafe {
do weaken_task |signal| {
signal.recv();
}
}
}
}
}
#[test]
fn test_select_stream_and_oneshot() {
use pipes::select2i;
use either::{Left, Right};
let (port, chan) = stream();
let (waitport, waitchan) = stream();
do spawn {
unsafe {
do weaken_task |signal| {
match select2i(&port, &signal) {
Left(*) => (),
Right(*) => fail
}
}
}
waitchan.send(());
}
chan.send(());
waitport.recv();
}

View File

@ -17,7 +17,7 @@ use io;
use io::ReaderUtil;
use libc;
use libc::{pid_t, c_void, c_int};
use oldcomm;
use pipes::{stream, SharedChan};
use option::{Some, None};
use os;
use prelude::*;
@ -336,22 +336,23 @@ pub fn program_output(prog: &str, args: &[~str]) ->
// in parallel so we don't deadlock while blocking on one
// or the other. FIXME (#2625): Surely there's a much more
// clever way to do this.
let p = oldcomm::Port();
let ch = oldcomm::Chan(&p);
let (p, ch) = stream();
let ch = SharedChan(ch);
let ch_clone = ch.clone();
do task::spawn_sched(task::SingleThreaded) {
let errput = readclose(pipe_err.in);
oldcomm::send(ch, (2, move errput));
ch.send((2, move errput));
};
do task::spawn_sched(task::SingleThreaded) {
let output = readclose(pipe_out.in);
oldcomm::send(ch, (1, move output));
ch_clone.send((1, move output));
};
let status = run::waitpid(pid);
let mut errs = ~"";
let mut outs = ~"";
let mut count = 2;
while count > 0 {
let stream = oldcomm::recv(p);
let stream = p.recv();
match stream {
(1, copy s) => {
outs = move s;

View File

@ -43,16 +43,15 @@ use cmp;
use cmp::Eq;
use iter;
use libc;
use oldcomm;
use option;
use result::Result;
use pipes::{stream, Chan, Port};
use pipes::{stream, Chan, Port, SharedChan};
use pipes;
use prelude::*;
use ptr;
use result;
use task::local_data_priv::{local_get, local_set};
use task::rt::{task_id, rust_task};
use task::rt::{task_id, sched_id, rust_task};
use task;
use util;
use util::replace;
@ -62,6 +61,12 @@ pub mod local_data;
pub mod rt;
pub mod spawn;
/// A handle to a scheduler
#[deriving_eq]
pub enum Scheduler {
SchedulerHandle(sched_id)
}
/// A handle to a task
#[deriving_eq]
pub enum Task {
@ -95,7 +100,21 @@ impl TaskResult : Eq {
}
/// Scheduler modes
#[deriving_eq]
pub enum SchedMode {
/// Run task on the default scheduler
DefaultScheduler,
/// Run task on the current scheduler
CurrentScheduler,
/// Run task on a specific scheduler
ExistingScheduler(Scheduler),
/**
* Tasks are scheduled on the main OS thread
*
* The main OS thread is the thread used to launch the runtime which,
* in most cases, is the process's initial thread as created by the OS.
*/
PlatformThread,
/// All tasks run in the same OS thread
SingleThreaded,
/// Tasks are distributed among available CPUs
@ -104,53 +123,6 @@ pub enum SchedMode {
ThreadPerTask,
/// Tasks are distributed among a fixed number of OS threads
ManualThreads(uint),
/**
* Tasks are scheduled on the main OS thread
*
* The main OS thread is the thread used to launch the runtime which,
* in most cases, is the process's initial thread as created by the OS.
*/
PlatformThread
}
impl SchedMode : cmp::Eq {
pure fn eq(&self, other: &SchedMode) -> bool {
match (*self) {
SingleThreaded => {
match (*other) {
SingleThreaded => true,
_ => false
}
}
ThreadPerCore => {
match (*other) {
ThreadPerCore => true,
_ => false
}
}
ThreadPerTask => {
match (*other) {
ThreadPerTask => true,
_ => false
}
}
ManualThreads(e0a) => {
match (*other) {
ManualThreads(e0b) => e0a == e0b,
_ => false
}
}
PlatformThread => {
match (*other) {
PlatformThread => true,
_ => false
}
}
}
}
pure fn ne(&self, other: &SchedMode) -> bool {
!(*self).eq(other)
}
}
/**
@ -204,7 +176,7 @@ pub struct TaskOpts {
linked: bool,
supervised: bool,
mut notify_chan: Option<Chan<TaskResult>>,
sched: Option<SchedOpts>,
sched: SchedOpts
}
/**
@ -369,11 +341,8 @@ impl TaskBuilder {
opts: TaskOpts {
linked: self.opts.linked,
supervised: self.opts.supervised,
notify_chan: notify_chan,
sched: Some(SchedOpts {
mode: mode,
foreign_stack_size: None,
})
notify_chan: move notify_chan,
sched: SchedOpts { mode: mode, foreign_stack_size: None}
},
can_not_copy: None,
.. self.consume()
@ -457,18 +426,17 @@ impl TaskBuilder {
* Fails if a future_result was already set for this task.
*/
fn try<T: Owned>(f: fn~() -> T) -> Result<T,()> {
let po = oldcomm::Port();
let ch = oldcomm::Chan(&po);
let (po, ch) = stream::<T>();
let mut result = None;
let fr_task_builder = self.future_result(|+r| {
result = Some(move r);
});
do fr_task_builder.spawn |move f| {
oldcomm::send(ch, f());
do fr_task_builder.spawn |move f, move ch| {
ch.send(f());
}
match option::unwrap(move result).recv() {
Success => result::Ok(oldcomm::recv(po)),
Success => result::Ok(po.recv()),
Failure => result::Err(())
}
}
@ -489,7 +457,10 @@ pub fn default_task_opts() -> TaskOpts {
linked: true,
supervised: false,
notify_chan: None,
sched: None
sched: SchedOpts {
mode: DefaultScheduler,
foreign_stack_size: None
}
}
}
@ -542,10 +513,9 @@ pub fn spawn_with<A:Owned>(arg: A, f: fn~(v: A)) {
pub fn spawn_sched(mode: SchedMode, f: fn~()) {
/*!
* Creates a new scheduler and executes a task on it
*
* Tasks subsequently spawned by that task will also execute on
* the new scheduler. When there are no more tasks to execute the
* Creates a new task on a new or existing scheduler
* When there are no more tasks to execute the
* scheduler terminates.
*
* # Failure
@ -599,6 +569,10 @@ pub fn get_task() -> Task {
}
}
pub fn get_scheduler() -> Scheduler {
SchedulerHandle(unsafe { rt::rust_get_sched_id() })
}
/**
* Temporarily make the task unkillable
*
@ -711,17 +685,18 @@ fn test_cant_dup_task_builder() {
#[test] #[ignore(cfg(windows))]
fn test_spawn_unlinked_unsup_no_fail_down() { // grandchild sends on a port
let po = oldcomm::Port();
let ch = oldcomm::Chan(&po);
let (po, ch) = stream();
let ch = SharedChan(ch);
do spawn_unlinked {
let ch = ch.clone();
do spawn_unlinked {
// Give middle task a chance to fail-but-not-kill-us.
for iter::repeat(16) { task::yield(); }
oldcomm::send(ch, ()); // If killed first, grandparent hangs.
ch.send(()); // If killed first, grandparent hangs.
}
fail; // Shouldn't kill either (grand)parent or (grand)child.
}
oldcomm::recv(po);
po.recv();
}
#[test] #[ignore(cfg(windows))]
fn test_spawn_unlinked_unsup_no_fail_up() { // child unlinked fails
@ -741,8 +716,7 @@ fn test_spawn_unlinked_sup_fail_down() {
#[test] #[should_fail] #[ignore(cfg(windows))]
fn test_spawn_linked_sup_fail_up() { // child fails; parent fails
let po = oldcomm::Port::<()>();
let _ch = oldcomm::Chan(&po);
let (po, _ch) = stream::<()>();
// Unidirectional "parenting" shouldn't override bidirectional linked.
// We have to cheat with opts - the interface doesn't support them because
// they don't make sense (redundant with task().supervised()).
@ -760,7 +734,7 @@ fn test_spawn_linked_sup_fail_up() { // child fails; parent fails
.. b0
};
do b1.spawn { fail; }
oldcomm::recv(po); // We should get punted awake
po.recv(); // We should get punted awake
}
#[test] #[should_fail] #[ignore(cfg(windows))]
fn test_spawn_linked_sup_fail_down() { // parent fails; child fails
@ -784,11 +758,10 @@ fn test_spawn_linked_sup_fail_down() { // parent fails; child fails
}
#[test] #[should_fail] #[ignore(cfg(windows))]
fn test_spawn_linked_unsup_fail_up() { // child fails; parent fails
let po = oldcomm::Port::<()>();
let _ch = oldcomm::Chan(&po);
let (po, _ch) = stream::<()>();
// Default options are to spawn linked & unsupervised.
do spawn { fail; }
oldcomm::recv(po); // We should get punted awake
po.recv(); // We should get punted awake
}
#[test] #[should_fail] #[ignore(cfg(windows))]
fn test_spawn_linked_unsup_fail_down() { // parent fails; child fails
@ -856,27 +829,25 @@ fn test_spawn_linked_sup_propagate_sibling() {
#[test]
fn test_run_basic() {
let po = oldcomm::Port();
let ch = oldcomm::Chan(&po);
let (po, ch) = stream::<()>();
do task().spawn {
oldcomm::send(ch, ());
ch.send(());
}
oldcomm::recv(po);
po.recv();
}
#[test]
fn test_add_wrapper() {
let po = oldcomm::Port();
let ch = oldcomm::Chan(&po);
let (po, ch) = stream::<()>();
let b0 = task();
let b1 = do b0.add_wrapper |body| {
fn~(move body) {
body();
oldcomm::send(ch, ());
ch.send(());
}
};
do b1.spawn { }
oldcomm::recv(po);
po.recv();
}
#[test]
@ -929,52 +900,46 @@ fn test_spawn_sched_no_threads() {
#[test]
fn test_spawn_sched() {
let po = oldcomm::Port();
let ch = oldcomm::Chan(&po);
let (po, ch) = stream::<()>();
let ch = SharedChan(ch);
fn f(i: int, ch: oldcomm::Chan<()>) {
unsafe {
let parent_sched_id = rt::rust_get_sched_id();
fn f(i: int, ch: SharedChan<()>) {
let parent_sched_id = unsafe { rt::rust_get_sched_id() };
do spawn_sched(SingleThreaded) {
unsafe {
let child_sched_id = rt::rust_get_sched_id();
let child_sched_id = unsafe { rt::rust_get_sched_id() };
assert parent_sched_id != child_sched_id;
if (i == 0) {
oldcomm::send(ch, ());
ch.send(());
} else {
f(i - 1, ch);
}
f(i - 1, ch.clone());
}
};
}
}
f(10, ch);
oldcomm::recv(po);
po.recv();
}
#[test]
fn test_spawn_sched_childs_on_same_sched() {
let po = oldcomm::Port();
let ch = oldcomm::Chan(&po);
fn test_spawn_sched_childs_on_default_sched() {
let (po, ch) = stream();
// Assuming tests run on the default scheduler
let default_id = unsafe { rt::rust_get_sched_id() };
do spawn_sched(SingleThreaded) {
unsafe {
let parent_sched_id = rt::rust_get_sched_id();
let parent_sched_id = unsafe { rt::rust_get_sched_id() };
do spawn {
unsafe {
let child_sched_id = rt::rust_get_sched_id();
// This should be on the same scheduler
assert parent_sched_id == child_sched_id;
oldcomm::send(ch, ());
}
let child_sched_id = unsafe { rt::rust_get_sched_id() };
assert parent_sched_id != child_sched_id;
assert child_sched_id == default_id;
ch.send(());
};
}
};
oldcomm::recv(po);
po.recv();
}
#[nolink]
@ -996,10 +961,8 @@ fn test_spawn_sched_blocking() {
// without affecting other schedulers
for iter::repeat(20u) {
let start_po = oldcomm::Port();
let start_ch = oldcomm::Chan(&start_po);
let fin_po = oldcomm::Port();
let fin_ch = oldcomm::Chan(&fin_po);
let (start_po, start_ch) = stream();
let (fin_po, fin_ch) = stream();
let lock = testrt::rust_dbg_lock_create();
@ -1007,44 +970,42 @@ fn test_spawn_sched_blocking() {
unsafe {
testrt::rust_dbg_lock_lock(lock);
oldcomm::send(start_ch, ());
start_ch.send(());
// Block the scheduler thread
testrt::rust_dbg_lock_wait(lock);
testrt::rust_dbg_lock_unlock(lock);
oldcomm::send(fin_ch, ());
fin_ch.send(());
}
};
// Wait until the other task has its lock
oldcomm::recv(start_po);
start_po.recv();
fn pingpong(po: oldcomm::Port<int>, ch: oldcomm::Chan<int>) {
fn pingpong(po: &Port<int>, ch: &Chan<int>) {
let mut val = 20;
while val > 0 {
val = oldcomm::recv(po);
oldcomm::send(ch, val - 1);
val = po.recv();
ch.send(val - 1);
}
}
let setup_po = oldcomm::Port();
let setup_ch = oldcomm::Chan(&setup_po);
let parent_po = oldcomm::Port();
let parent_ch = oldcomm::Chan(&parent_po);
let (setup_po, setup_ch) = stream();
let (parent_po, parent_ch) = stream();
do spawn {
let child_po = oldcomm::Port();
oldcomm::send(setup_ch, oldcomm::Chan(&child_po));
pingpong(child_po, parent_ch);
let (child_po, child_ch) = stream();
setup_ch.send(child_ch);
pingpong(&child_po, &parent_ch);
};
let child_ch = oldcomm::recv(setup_po);
oldcomm::send(child_ch, 20);
pingpong(parent_po, child_ch);
let child_ch = setup_po.recv();
child_ch.send(20);
pingpong(&parent_po, &child_ch);
testrt::rust_dbg_lock_lock(lock);
testrt::rust_dbg_lock_signal(lock);
testrt::rust_dbg_lock_unlock(lock);
oldcomm::recv(fin_po);
fin_po.recv();
testrt::rust_dbg_lock_destroy(lock);
}
}
@ -1052,18 +1013,17 @@ fn test_spawn_sched_blocking() {
#[cfg(test)]
fn avoid_copying_the_body(spawnfn: fn(v: fn~())) {
let p = oldcomm::Port::<uint>();
let ch = oldcomm::Chan(&p);
let (p, ch) = stream::<uint>();
let x = ~1;
let x_in_parent = ptr::addr_of(&(*x)) as uint;
do spawnfn |move x| {
let x_in_child = ptr::addr_of(&(*x)) as uint;
oldcomm::send(ch, x_in_child);
ch.send(x_in_child);
}
let x_in_child = oldcomm::recv(p);
let x_in_child = p.recv();
assert x_in_parent == x_in_child;
}
@ -1101,20 +1061,18 @@ fn test_avoid_copying_the_body_unlinked() {
#[test]
fn test_platform_thread() {
let po = oldcomm::Port();
let ch = oldcomm::Chan(&po);
let (po, ch) = stream();
do task().sched_mode(PlatformThread).spawn {
oldcomm::send(ch, ());
ch.send(());
}
oldcomm::recv(po);
po.recv();
}
#[test]
#[ignore(cfg(windows))]
#[should_fail]
fn test_unkillable() {
let po = oldcomm::Port();
let ch = po.chan();
let (po, ch) = stream();
// We want to do this after failing
do spawn_unlinked {
@ -1242,7 +1200,7 @@ fn test_spawn_thread_on_demand() {
let (port2, chan2) = pipes::stream();
do spawn() |move chan2| {
do spawn_sched(CurrentScheduler) |move chan2| {
chan2.send(());
}

View File

@ -74,9 +74,8 @@
#[warn(deprecated_mode)];
use cast;
use oldcomm;
use option;
use pipes::{Chan, Port};
use pipes::{stream, Chan, Port};
use pipes;
use prelude::*;
use private;
@ -88,6 +87,7 @@ use task::rt::rust_closure;
use task::rt;
use task::{Failure, ManualThreads, PlatformThread, SchedOpts, SingleThreaded};
use task::{Success, TaskOpts, TaskResult, ThreadPerCore, ThreadPerTask};
use task::{ExistingScheduler, SchedulerHandle};
use task::{default_task_opts, unkillable};
use uint;
use util;
@ -536,9 +536,9 @@ pub fn spawn_raw(opts: TaskOpts, f: fn~()) {
// Agh. Get move-mode items into the closure. FIXME (#2829)
let (child_tg, ancestors, f) = option::swap_unwrap(child_data);
// Create child task.
let new_task = match opts.sched {
None => rt::new_task(),
Some(sched_opts) => new_task_in_new_sched(sched_opts)
let new_task = match opts.sched.mode {
DefaultScheduler => rt::new_task(),
_ => new_task_in_sched(opts.sched)
};
assert !new_task.is_null();
// Getting killed after here would leak the task.
@ -642,15 +642,18 @@ pub fn spawn_raw(opts: TaskOpts, f: fn~()) {
}
}
fn new_task_in_new_sched(opts: SchedOpts) -> *rust_task {
unsafe {
fn new_task_in_sched(opts: SchedOpts) -> *rust_task {
if opts.foreign_stack_size != None {
fail ~"foreign_stack_size scheduler option unimplemented";
}
let num_threads = match opts.mode {
DefaultScheduler
| CurrentScheduler
| ExistingScheduler(*)
| PlatformThread => 0u, /* Won't be used */
SingleThreaded => 1u,
ThreadPerCore => rt::rust_num_threads(),
ThreadPerCore => unsafe { rt::rust_num_threads() },
ThreadPerTask => {
fail ~"ThreadPerTask scheduling mode unimplemented"
}
@ -660,13 +663,14 @@ pub fn spawn_raw(opts: TaskOpts, f: fn~()) {
}
threads
}
PlatformThread => 0u /* Won't be used */
};
let sched_id = if opts.mode != PlatformThread {
rt::rust_new_sched(num_threads)
} else {
rt::rust_osmain_sched_id()
unsafe {
let sched_id = match opts.mode {
CurrentScheduler => rt::rust_get_sched_id(),
ExistingScheduler(SchedulerHandle(id)) => id,
PlatformThread => rt::rust_osmain_sched_id(),
_ => rt::rust_new_sched(num_threads)
};
rt::rust_new_task_in_sched(sched_id)
}
@ -675,12 +679,11 @@ pub fn spawn_raw(opts: TaskOpts, f: fn~()) {
#[test]
fn test_spawn_raw_simple() {
let po = oldcomm::Port();
let ch = oldcomm::Chan(&po);
let (po, ch) = stream();
do spawn_raw(default_task_opts()) {
oldcomm::send(ch, ());
ch.send(());
}
oldcomm::recv(po);
po.recv();
}
#[test]

View File

@ -792,7 +792,6 @@ mod test {
let (finish_port, finish_chan) = pipes::stream();
let addr = ip::v4::parse_addr("127.0.0.1");
let iotask = uv::global_loop::get();
let begin_connect_chan = Cell(move begin_connect_chan);
let accept_chan = Cell(move accept_chan);
@ -800,6 +799,7 @@ mod test {
// The server task
do task::spawn |copy addr, move begin_connect_chan,
move accept_chan| {
let iotask = &uv::global_loop::get();
let begin_connect_chan = begin_connect_chan.take();
let accept_chan = accept_chan.take();
let listen_res = do tcp::listen(
@ -831,6 +831,7 @@ mod test {
begin_connect_port.recv();
debug!("connecting");
let iotask = &uv::global_loop::get();
let connect_result = tcp::connect(copy addr, port, iotask);
assert connect_result.is_ok();
let sock = result::unwrap(move connect_result);

View File

@ -114,7 +114,7 @@ enum IpGetAddrErr {
* a vector of `ip_addr` results, in the case of success, or an error
* object in the case of failure
*/
pub fn get_addr(node: &str, iotask: iotask)
pub fn get_addr(node: &str, iotask: &iotask)
-> result::Result<~[IpAddr], IpGetAddrErr> {
do oldcomm::listen |output_ch| {
do str::as_buf(node) |node_ptr, len| {
@ -419,7 +419,7 @@ mod test {
#[ignore(reason = "valgrind says it's leaky")]
fn test_ip_get_addr() {
let localhost_name = ~"localhost";
let iotask = uv::global_loop::get();
let iotask = &uv::global_loop::get();
let ga_result = get_addr(localhost_name, iotask);
if result::is_err(&ga_result) {
fail ~"got err result from net::ip::get_addr();"
@ -445,7 +445,7 @@ mod test {
#[ignore(reason = "valgrind says it's leaky")]
fn test_ip_get_addr_bad_input() {
let localhost_name = ~"sjkl234m,./sdf";
let iotask = uv::global_loop::get();
let iotask = &uv::global_loop::get();
let ga_result = get_addr(localhost_name, iotask);
assert result::is_err(&ga_result);
}

View File

@ -143,7 +143,7 @@ pub enum TcpConnectErrData {
* `net::tcp::tcp_connect_err_data` instance will be returned
*/
pub fn connect(input_ip: ip::IpAddr, port: uint,
iotask: IoTask)
iotask: &IoTask)
-> result::Result<TcpSocket, TcpConnectErrData> {
unsafe {
let result_po = oldcomm::Port::<ConnAttempt>();
@ -166,7 +166,7 @@ pub fn connect(input_ip: ip::IpAddr, port: uint,
ip::Ipv4(_) => { false }
ip::Ipv6(_) => { true }
},
iotask: iotask
iotask: iotask.clone()
};
let socket_data_ptr = ptr::addr_of(&(*socket_data));
log(debug, fmt!("tcp_connect result_ch %?", conn_data.result_ch));
@ -188,13 +188,16 @@ pub fn connect(input_ip: ip::IpAddr, port: uint,
let addr_str = ip::format_addr(&input_ip);
let connect_result = match input_ip {
ip::Ipv4(ref addr) => {
// have to "recreate" the sockaddr_in/6
// since the ip_addr discards the port
// info.. should probably add an additional
// rust type that actually is closer to
// what the libuv API expects (ip str + port num)
// have to "recreate" the
// sockaddr_in/6 since the ip_addr
// discards the port info.. should
// probably add an additional rust
// type that actually is closer to
// what the libuv API expects (ip str
// + port num)
log(debug, fmt!("addr: %?", addr));
let in_addr = uv::ll::ip4_addr(addr_str, port as int);
let in_addr = uv::ll::ip4_addr(addr_str,
port as int);
uv::ll::tcp_connect(
connect_req_ptr,
stream_handle_ptr,
@ -203,7 +206,8 @@ pub fn connect(input_ip: ip::IpAddr, port: uint,
}
ip::Ipv6(ref addr) => {
log(debug, fmt!("addr: %?", addr));
let in_addr = uv::ll::ip6_addr(addr_str, port as int);
let in_addr = uv::ll::ip6_addr(addr_str,
port as int);
uv::ll::tcp_connect6(
connect_req_ptr,
stream_handle_ptr,
@ -216,24 +220,29 @@ pub fn connect(input_ip: ip::IpAddr, port: uint,
log(debug, ~"tcp_connect successful");
// reusable data that we'll have for the
// duration..
uv::ll::set_data_for_uv_handle(stream_handle_ptr,
uv::ll::set_data_for_uv_handle(
stream_handle_ptr,
socket_data_ptr as
*libc::c_void);
// just so the connect_cb can send the
// outcome..
uv::ll::set_data_for_req(connect_req_ptr,
conn_data_ptr);
log(debug, ~"leaving tcp_connect interact cb...");
log(debug,
~"leaving tcp_connect interact cb...");
// let tcp_connect_on_connect_cb send on
// the result_ch, now..
}
_ => {
// immediate connect failure.. probably a garbage
// ip or somesuch
let err_data = uv::ll::get_last_err_data(loop_ptr);
// immediate connect
// failure.. probably a garbage ip or
// somesuch
let err_data =
uv::ll::get_last_err_data(loop_ptr);
oldcomm::send((*conn_data_ptr).result_ch,
ConnFailure(err_data));
uv::ll::set_data_for_uv_handle(stream_handle_ptr,
uv::ll::set_data_for_uv_handle(
stream_handle_ptr,
conn_data_ptr);
uv::ll::close(stream_handle_ptr,
stream_error_close_cb);
@ -248,7 +257,7 @@ pub fn connect(input_ip: ip::IpAddr, port: uint,
}
}
}
};
}
match oldcomm::recv(result_po) {
ConnSuccess => {
log(debug, ~"tcp::connect - received success on result_po");
@ -262,7 +271,8 @@ pub fn connect(input_ip: ip::IpAddr, port: uint,
as *libc::c_void);
let tcp_conn_err = match err_data.err_name {
~"ECONNREFUSED" => ConnectionRefused,
_ => GenericConnectErr(err_data.err_name, err_data.err_msg)
_ => GenericConnectErr(err_data.err_name,
err_data.err_msg)
};
result::Err(tcp_conn_err)
}
@ -506,24 +516,26 @@ fn read_future(sock: &TcpSocket, timeout_msecs: uint)
pub fn accept(new_conn: TcpNewConnection)
-> result::Result<TcpSocket, TcpErrData> {
unsafe {
match new_conn {
match new_conn{
NewTcpConn(server_handle_ptr) => {
let server_data_ptr = uv::ll::get_data_for_uv_handle(
server_handle_ptr) as *TcpListenFcData;
let reader_po = oldcomm::Port();
let iotask = (*server_data_ptr).iotask;
let iotask = &(*server_data_ptr).iotask;
let stream_handle_ptr = malloc_uv_tcp_t();
*(stream_handle_ptr as *mut uv::ll::uv_tcp_t) = uv::ll::tcp_t();
let client_socket_data = @TcpSocketData {
*(stream_handle_ptr as *mut uv::ll::uv_tcp_t) =
uv::ll::tcp_t();
let client_socket_data: @TcpSocketData = @TcpSocketData {
reader_po: reader_po,
reader_ch: oldcomm::Chan(&reader_po),
stream_handle_ptr : stream_handle_ptr,
connect_req : uv::ll::connect_t(),
write_req : uv::ll::write_t(),
ipv6: (*server_data_ptr).ipv6,
iotask : iotask
iotask : iotask.clone()
};
let client_socket_data_ptr = ptr::addr_of(&(*client_socket_data));
let client_socket_data_ptr = ptr::addr_of(
&(*client_socket_data));
let client_stream_handle_ptr =
(*client_socket_data_ptr).stream_handle_ptr;
@ -541,13 +553,17 @@ pub fn accept(new_conn: TcpNewConnection)
server_handle_ptr);
match uv::ll::tcp_init(loop_ptr, client_stream_handle_ptr) {
0i32 => {
log(debug, ~"uv_tcp_init successful for client stream");
log(debug, ~"uv_tcp_init successful for \
client stream");
match uv::ll::accept(
server_handle_ptr as *libc::c_void,
client_stream_handle_ptr as *libc::c_void) {
0i32 => {
log(debug, ~"successfully accepted client connection");
uv::ll::set_data_for_uv_handle(client_stream_handle_ptr,
log(debug,
~"successfully accepted client \
connection");
uv::ll::set_data_for_uv_handle(
client_stream_handle_ptr,
client_socket_data_ptr
as *libc::c_void);
oldcomm::send(result_ch, None);
@ -555,14 +571,16 @@ pub fn accept(new_conn: TcpNewConnection)
_ => {
log(debug, ~"failed to accept client conn");
oldcomm::send(result_ch, Some(
uv::ll::get_last_err_data(loop_ptr).to_tcp_err()));
uv::ll::get_last_err_data(
loop_ptr).to_tcp_err()));
}
}
}
_ => {
log(debug, ~"failed to init client stream");
log(debug, ~"failed to accept client stream");
oldcomm::send(result_ch, Some(
uv::ll::get_last_err_data(loop_ptr).to_tcp_err()));
uv::ll::get_last_err_data(
loop_ptr).to_tcp_err()));
}
}
// UNSAFE LIBUV INTERACTION END
@ -604,30 +622,27 @@ pub fn accept(new_conn: TcpNewConnection)
* of listen exiting because of an error
*/
pub fn listen(host_ip: ip::IpAddr, port: uint, backlog: uint,
iotask: IoTask,
iotask: &IoTask,
on_establish_cb: fn~(oldcomm::Chan<Option<TcpErrData>>),
new_connect_cb: fn~(TcpNewConnection,
oldcomm::Chan<Option<TcpErrData>>))
-> result::Result<(), TcpListenErrData> {
unsafe {
do listen_common(move host_ip, port, backlog, iotask,
move on_establish_cb)
// on_connect_cb
|move new_connect_cb, handle| {
unsafe {
let server_data_ptr =
uv::ll::get_data_for_uv_handle(handle)
let server_data_ptr = uv::ll::get_data_for_uv_handle(handle)
as *TcpListenFcData;
let new_conn = NewTcpConn(handle);
let kill_ch = (*server_data_ptr).kill_ch;
new_connect_cb(new_conn, kill_ch);
}
}
}
}
fn listen_common(host_ip: ip::IpAddr, port: uint, backlog: uint,
iotask: IoTask,
iotask: &IoTask,
on_establish_cb: fn~(oldcomm::Chan<Option<TcpErrData>>),
on_connect_cb: fn~(*uv::ll::uv_tcp_t))
-> result::Result<(), TcpListenErrData> {
@ -637,12 +652,12 @@ fn listen_common(host_ip: ip::IpAddr, port: uint, backlog: uint,
let kill_ch = oldcomm::Chan(&kill_po);
let server_stream = uv::ll::tcp_t();
let server_stream_ptr = ptr::addr_of(&server_stream);
let server_data = {
let server_data: TcpListenFcData = TcpListenFcData {
server_stream_ptr: server_stream_ptr,
stream_closed_ch: oldcomm::Chan(&stream_closed_po),
kill_ch: kill_ch,
on_connect_cb: move on_connect_cb,
iotask: iotask,
iotask: iotask.clone(),
ipv6: match &host_ip {
&ip::Ipv4(_) => { false }
&ip::Ipv6(_) => { true }
@ -670,14 +685,16 @@ fn listen_common(host_ip: ip::IpAddr, port: uint, backlog: uint,
let bind_result = match loc_ip {
ip::Ipv4(ref addr) => {
log(debug, fmt!("addr: %?", addr));
let in_addr = uv::ll::ip4_addr(addr_str,
let in_addr = uv::ll::ip4_addr(
addr_str,
port as int);
uv::ll::tcp_bind(server_stream_ptr,
ptr::addr_of(&in_addr))
}
ip::Ipv6(ref addr) => {
log(debug, fmt!("addr: %?", addr));
let in_addr = uv::ll::ip6_addr(addr_str,
let in_addr = uv::ll::ip6_addr(
addr_str,
port as int);
uv::ll::tcp_bind6(server_stream_ptr,
ptr::addr_of(&in_addr))
@ -685,12 +702,24 @@ fn listen_common(host_ip: ip::IpAddr, port: uint, backlog: uint,
};
match bind_result {
0i32 => {
match uv::ll::listen(server_stream_ptr,
match uv::ll::listen(
server_stream_ptr,
backlog as libc::c_int,
tcp_lfc_on_connection_cb) {
0i32 => oldcomm::send(setup_ch, None),
_ => {
log(debug, ~"failure to uv_listen()");
log(debug,
~"failure to uv_tcp_init");
let err_data =
uv::ll::get_last_err_data(
loop_ptr);
oldcomm::send(setup_ch,
Some(err_data));
}
}
}
_ => {
log(debug, ~"failure to uv_tcp_bind");
let err_data = uv::ll::get_last_err_data(
loop_ptr);
oldcomm::send(setup_ch, Some(err_data));
@ -705,13 +734,6 @@ fn listen_common(host_ip: ip::IpAddr, port: uint, backlog: uint,
}
}
}
_ => {
log(debug, ~"failure to uv_tcp_init");
let err_data = uv::ll::get_last_err_data(loop_ptr);
oldcomm::send(setup_ch, Some(err_data));
}
}
};
}
setup_ch.recv()
};
@ -740,7 +762,8 @@ fn listen_common(host_ip: ip::IpAddr, port: uint, backlog: uint,
log(debug, fmt!("Got '%s' '%s' libuv error",
err_data.err_name, err_data.err_msg));
result::Err(
GenericListenErr(err_data.err_name, err_data.err_msg))
GenericListenErr(err_data.err_name,
err_data.err_msg))
}
}
}
@ -770,6 +793,7 @@ fn listen_common(host_ip: ip::IpAddr, port: uint, backlog: uint,
}
}
/**
* Convert a `net::tcp::tcp_socket` to a `net::tcp::tcp_socket_buf`.
*
@ -936,7 +960,7 @@ fn tear_down_socket_data(socket_data: @TcpSocketData) {
};
let close_data_ptr = ptr::addr_of(&close_data);
let stream_handle_ptr = (*socket_data).stream_handle_ptr;
do iotask::interact((*socket_data).iotask) |loop_ptr| {
do iotask::interact(&(*socket_data).iotask) |loop_ptr| {
unsafe {
log(debug,
fmt!("interact dtor for tcp_socket stream %? loop %?",
@ -962,7 +986,7 @@ fn read_common_impl(socket_data: *TcpSocketData, timeout_msecs: uint)
use timer;
log(debug, ~"starting tcp::read");
let iotask = (*socket_data).iotask;
let iotask = &(*socket_data).iotask;
let rs_result = read_start_common_impl(socket_data);
if result::is_err(&rs_result) {
let err_data = result::get_err(&rs_result);
@ -1004,11 +1028,11 @@ fn read_stop_common_impl(socket_data: *TcpSocketData) ->
let stream_handle_ptr = (*socket_data).stream_handle_ptr;
let stop_po = oldcomm::Port::<Option<TcpErrData>>();
let stop_ch = oldcomm::Chan(&stop_po);
do iotask::interact((*socket_data).iotask) |loop_ptr| {
do iotask::interact(&(*socket_data).iotask) |loop_ptr| {
unsafe {
log(debug, ~"in interact cb for tcp::read_stop");
match uv::ll::read_stop(stream_handle_ptr as
*uv::ll::uv_stream_t) {
match uv::ll::read_stop(stream_handle_ptr
as *uv::ll::uv_stream_t) {
0i32 => {
log(debug, ~"successfully called uv_read_stop");
oldcomm::send(stop_ch, None);
@ -1020,8 +1044,7 @@ fn read_stop_common_impl(socket_data: *TcpSocketData) ->
}
}
}
};
}
match oldcomm::recv(stop_po) {
Some(move err_data) => Err(err_data),
None => Ok(())
@ -1038,12 +1061,12 @@ fn read_start_common_impl(socket_data: *TcpSocketData)
let start_po = oldcomm::Port::<Option<uv::ll::uv_err_data>>();
let start_ch = oldcomm::Chan(&start_po);
log(debug, ~"in tcp::read_start before interact loop");
do iotask::interact((*socket_data).iotask) |loop_ptr| {
do iotask::interact(&(*socket_data).iotask) |loop_ptr| {
unsafe {
log(debug,
fmt!("in tcp::read_start interact cb %?", loop_ptr));
match uv::ll::read_start(stream_handle_ptr as
*uv::ll::uv_stream_t,
log(debug, fmt!("in tcp::read_start interact cb %?",
loop_ptr));
match uv::ll::read_start(stream_handle_ptr
as *uv::ll::uv_stream_t,
on_alloc_cb,
on_tcp_read_cb) {
0i32 => {
@ -1057,7 +1080,7 @@ fn read_start_common_impl(socket_data: *TcpSocketData)
}
}
}
};
}
match oldcomm::recv(start_po) {
Some(ref err_data) => result::Err(err_data.to_tcp_err()),
None => result::Ok((*socket_data).reader_po)
@ -1084,7 +1107,7 @@ fn write_common_impl(socket_data_ptr: *TcpSocketData,
result_ch: oldcomm::Chan(&result_po)
};
let write_data_ptr = ptr::addr_of(&write_data);
do iotask::interact((*socket_data_ptr).iotask) |loop_ptr| {
do iotask::interact(&(*socket_data_ptr).iotask) |loop_ptr| {
unsafe {
log(debug, fmt!("in interact cb for tcp::write %?",
loop_ptr));
@ -1094,7 +1117,8 @@ fn write_common_impl(socket_data_ptr: *TcpSocketData,
tcp_write_complete_cb) {
0i32 => {
log(debug, ~"uv_write() invoked successfully");
uv::ll::set_data_for_req(write_req_ptr, write_data_ptr);
uv::ll::set_data_for_req(write_req_ptr,
write_data_ptr);
}
_ => {
log(debug, ~"error invoking uv_write()");
@ -1104,7 +1128,7 @@ fn write_common_impl(socket_data_ptr: *TcpSocketData,
}
}
}
};
}
// FIXME (#2656): Instead of passing unsafe pointers to local data,
// and waiting here for the write to complete, we should transfer
// ownership of everything to the I/O task and let it deal with the
@ -1473,7 +1497,7 @@ pub mod test {
}
}
pub fn impl_gl_tcp_ipv4_server_and_client() {
let hl_loop = uv::global_loop::get();
let hl_loop = &uv::global_loop::get();
let server_ip = ~"127.0.0.1";
let server_port = 8888u;
let expected_req = ~"ping";
@ -1485,6 +1509,7 @@ pub mod test {
let cont_po = oldcomm::Port::<()>();
let cont_ch = oldcomm::Chan(&cont_po);
// server
let hl_loop_clone = hl_loop.clone();
do task::spawn_sched(task::ManualThreads(1u)) {
let actual_req = do oldcomm::listen |server_ch| {
run_tcp_test_server(
@ -1493,7 +1518,7 @@ pub mod test {
expected_resp,
server_ch,
cont_ch,
hl_loop)
&hl_loop_clone)
};
server_result_ch.send(actual_req);
};
@ -1519,7 +1544,7 @@ pub mod test {
assert str::contains(actual_resp, expected_resp);
}
pub fn impl_gl_tcp_ipv4_get_peer_addr() {
let hl_loop = uv::global_loop::get();
let hl_loop = &uv::global_loop::get();
let server_ip = ~"127.0.0.1";
let server_port = 8887u;
let expected_resp = ~"pong";
@ -1530,6 +1555,7 @@ pub mod test {
let cont_po = oldcomm::Port::<()>();
let cont_ch = oldcomm::Chan(&cont_po);
// server
let hl_loop_clone = hl_loop.clone();
do task::spawn_sched(task::ManualThreads(1u)) {
let actual_req = do oldcomm::listen |server_ch| {
run_tcp_test_server(
@ -1538,7 +1564,7 @@ pub mod test {
expected_resp,
server_ch,
cont_ch,
hl_loop)
&hl_loop_clone)
};
server_result_ch.send(actual_req);
};
@ -1549,10 +1575,11 @@ pub mod test {
let server_ip_addr = ip::v4::parse_addr(server_ip);
let iotask = uv::global_loop::get();
let connect_result = connect(move server_ip_addr, server_port,
iotask);
&iotask);
let sock = result::unwrap(move connect_result);
debug!("testing peer address");
// This is what we are actually testing!
assert net::ip::format_addr(&sock.get_peer_addr()) ==
~"127.0.0.1";
@ -1561,12 +1588,14 @@ pub mod test {
// Fulfill the protocol the test server expects
let resp_bytes = str::to_bytes(~"ping");
tcp_write_single(&sock, resp_bytes);
debug!("message sent");
let read_result = sock.read(0u);
client_ch.send(str::from_bytes(read_result.get()));
debug!("result read");
};
}
pub fn impl_gl_tcp_ipv4_client_error_connection_refused() {
let hl_loop = uv::global_loop::get();
let hl_loop = &uv::global_loop::get();
let server_ip = ~"127.0.0.1";
let server_port = 8889u;
let expected_req = ~"ping";
@ -1586,7 +1615,7 @@ pub mod test {
}
}
pub fn impl_gl_tcp_ipv4_server_address_in_use() {
let hl_loop = uv::global_loop::get();
let hl_loop = &uv::global_loop::get();
let server_ip = ~"127.0.0.1";
let server_port = 8890u;
let expected_req = ~"ping";
@ -1598,6 +1627,7 @@ pub mod test {
let cont_po = oldcomm::Port::<()>();
let cont_ch = oldcomm::Chan(&cont_po);
// server
let hl_loop_clone = hl_loop.clone();
do task::spawn_sched(task::ManualThreads(1u)) {
let actual_req = do oldcomm::listen |server_ch| {
run_tcp_test_server(
@ -1606,7 +1636,7 @@ pub mod test {
expected_resp,
server_ch,
cont_ch,
hl_loop)
&hl_loop_clone)
};
server_result_ch.send(actual_req);
};
@ -1637,7 +1667,7 @@ pub mod test {
}
}
pub fn impl_gl_tcp_ipv4_server_access_denied() {
let hl_loop = uv::global_loop::get();
let hl_loop = &uv::global_loop::get();
let server_ip = ~"127.0.0.1";
let server_port = 80u;
// this one should fail..
@ -1657,7 +1687,7 @@ pub mod test {
}
pub fn impl_gl_tcp_ipv4_server_client_reader_writer() {
let iotask = uv::global_loop::get();
let iotask = &uv::global_loop::get();
let server_ip = ~"127.0.0.1";
let server_port = 8891u;
let expected_req = ~"ping";
@ -1669,6 +1699,7 @@ pub mod test {
let cont_po = oldcomm::Port::<()>();
let cont_ch = oldcomm::Chan(&cont_po);
// server
let iotask_clone = iotask.clone();
do task::spawn_sched(task::ManualThreads(1u)) {
let actual_req = do oldcomm::listen |server_ch| {
run_tcp_test_server(
@ -1677,7 +1708,7 @@ pub mod test {
expected_resp,
server_ch,
cont_ch,
iotask)
&iotask_clone)
};
server_result_ch.send(actual_req);
};
@ -1708,7 +1739,7 @@ pub mod test {
pub fn impl_tcp_socket_impl_reader_handles_eof() {
use core::io::{Reader,ReaderUtil};
let hl_loop = uv::global_loop::get();
let hl_loop = &uv::global_loop::get();
let server_ip = ~"127.0.0.1";
let server_port = 10041u;
let expected_req = ~"GET /";
@ -1720,6 +1751,7 @@ pub mod test {
let cont_po = oldcomm::Port::<()>();
let cont_ch = oldcomm::Chan(&cont_po);
// server
let hl_loop_clone = hl_loop.clone();
do task::spawn_sched(task::ManualThreads(1u)) {
let actual_req = do oldcomm::listen |server_ch| {
run_tcp_test_server(
@ -1728,7 +1760,7 @@ pub mod test {
expected_resp,
server_ch,
cont_ch,
hl_loop)
&hl_loop_clone)
};
server_result_ch.send(actual_req);
};
@ -1768,7 +1800,7 @@ pub mod test {
fn run_tcp_test_server(server_ip: &str, server_port: uint, resp: ~str,
server_ch: oldcomm::Chan<~str>,
cont_ch: oldcomm::Chan<()>,
iotask: IoTask) -> ~str {
iotask: &IoTask) -> ~str {
let server_ip_addr = ip::v4::parse_addr(server_ip);
let listen_result = listen(move server_ip_addr, server_port, 128,
iotask,
@ -1855,7 +1887,7 @@ pub mod test {
}
fn run_tcp_test_server_fail(server_ip: &str, server_port: uint,
iotask: IoTask) -> TcpListenErrData {
iotask: &IoTask) -> TcpListenErrData {
let server_ip_addr = ip::v4::parse_addr(server_ip);
let listen_result = listen(move server_ip_addr, server_port, 128,
iotask,
@ -1879,7 +1911,7 @@ pub mod test {
fn run_tcp_test_client(server_ip: &str, server_port: uint, resp: &str,
client_ch: oldcomm::Chan<~str>,
iotask: IoTask) -> result::Result<~str,
iotask: &IoTask) -> result::Result<~str,
TcpConnectErrData> {
let server_ip_addr = ip::v4::parse_addr(server_ip);

View File

@ -39,7 +39,7 @@ use core;
* * ch - a channel of type T to send a `val` on
* * val - a value of type T to send over the provided `ch`
*/
pub fn delayed_send<T: Owned>(iotask: IoTask,
pub fn delayed_send<T: Owned>(iotask: &IoTask,
msecs: uint,
ch: oldcomm::Chan<T>,
val: T) {
@ -92,7 +92,7 @@ pub fn delayed_send<T: Owned>(iotask: IoTask,
* * `iotask` - a `uv::iotask` that the tcp request will run on
* * msecs - an amount of time, in milliseconds, for the current task to block
*/
pub fn sleep(iotask: IoTask, msecs: uint) {
pub fn sleep(iotask: &IoTask, msecs: uint) {
let exit_po = oldcomm::Port::<()>();
let exit_ch = oldcomm::Chan(&exit_po);
delayed_send(iotask, msecs, exit_ch, ());
@ -119,7 +119,7 @@ pub fn sleep(iotask: IoTask, msecs: uint) {
* on the provided port in the allotted timeout period, then the result will
* be a `Some(T)`. If not, then `None` will be returned.
*/
pub fn recv_timeout<T: Copy Owned>(iotask: IoTask,
pub fn recv_timeout<T: Copy Owned>(iotask: &IoTask,
msecs: uint,
wait_po: oldcomm::Port<T>)
-> Option<T> {
@ -183,13 +183,13 @@ mod test {
#[test]
fn test_gl_timer_simple_sleep_test() {
let hl_loop = uv::global_loop::get();
let hl_loop = &uv::global_loop::get();
sleep(hl_loop, 1u);
}
#[test]
fn test_gl_timer_sleep_stress1() {
let hl_loop = uv::global_loop::get();
let hl_loop = &uv::global_loop::get();
for iter::repeat(50u) {
sleep(hl_loop, 1u);
}
@ -199,7 +199,7 @@ mod test {
fn test_gl_timer_sleep_stress2() {
let po = oldcomm::Port();
let ch = oldcomm::Chan(&po);
let hl_loop = uv::global_loop::get();
let hl_loop = &uv::global_loop::get();
let repeat = 20u;
let spec = {
@ -214,11 +214,12 @@ mod test {
for spec.each |spec| {
let (times, maxms) = *spec;
let hl_loop_clone = hl_loop.clone();
do task::spawn {
use rand::*;
let rng = Rng();
for iter::repeat(times) {
sleep(hl_loop, rng.next() as uint % maxms);
sleep(&hl_loop_clone, rng.next() as uint % maxms);
}
oldcomm::send(ch, ());
}
@ -277,12 +278,12 @@ mod test {
let expected = rand::Rng().gen_str(16u);
let test_po = oldcomm::Port::<~str>();
let test_ch = oldcomm::Chan(&test_po);
let hl_loop_clone = hl_loop.clone();
do task::spawn() {
delayed_send(hl_loop, 50u, test_ch, expected);
delayed_send(&hl_loop_clone, 50u, test_ch, expected);
};
match recv_timeout(hl_loop, 1u, test_po) {
match recv_timeout(&hl_loop, 1u, test_po) {
None => successes += 1,
_ => failures += 1
};

View File

@ -19,16 +19,16 @@ use uv_iotask::{IoTask, spawn_iotask};
use core::either::{Left, Right};
use core::libc;
use core::oldcomm::{Port, Chan, select2, listen};
use core::private::{chan_from_global_ptr, weaken_task};
use core::pipes::{Port, Chan, SharedChan, select2i};
use core::private::global::{global_data_clone_create,
global_data_clone};
use core::private::weak_task::weaken_task;
use core::str;
use core::task::TaskBuilder;
use core::task::{task, SingleThreaded, spawn};
use core::task;
use core::vec;
extern mod rustrt {
unsafe fn rust_uv_get_kernel_global_chan_ptr() -> *libc::uintptr_t;
}
use core::clone::Clone;
use core::option::{Some, None};
/**
* Race-free helper to get access to a global task where a libuv
@ -48,69 +48,64 @@ pub fn get() -> IoTask {
#[doc(hidden)]
fn get_monitor_task_gl() -> IoTask {
unsafe {
let monitor_loop_chan_ptr =
rustrt::rust_uv_get_kernel_global_chan_ptr();
debug!("ENTERING global_loop::get() loop chan: %?",
monitor_loop_chan_ptr);
debug!("before priv::chan_from_global_ptr");
type MonChan = Chan<IoTask>;
let monitor_ch =
do chan_from_global_ptr::<MonChan>(monitor_loop_chan_ptr,
|| {
task::task().sched_mode
(task::SingleThreaded)
.unlinked()
}) |msg_po| {
struct GlobalIoTask(IoTask);
impl GlobalIoTask: Clone {
fn clone(&self) -> GlobalIoTask {
GlobalIoTask((**self).clone())
}
}
fn key(_: GlobalIoTask) { }
match unsafe { global_data_clone(key) } {
Some(GlobalIoTask(iotask)) => iotask,
None => {
let iotask: IoTask = spawn_loop();
let mut installed = false;
let final_iotask = unsafe {
do global_data_clone_create(key) {
installed = true;
~GlobalIoTask(iotask.clone())
}
};
if installed {
do task().unlinked().spawn() {
unsafe {
debug!("global monitor task starting");
// As a weak task the runtime will notify us when to exit
do weaken_task() |weak_exit_po| {
debug!("global monitor task is now weak");
let hl_loop = spawn_loop();
loop {
debug!("in outer_loop...");
match select2(weak_exit_po, msg_po) {
Left(weak_exit) => {
// all normal tasks have ended, tell the
// libuv loop to tear_down, then exit
debug!("weak_exit_po recv'd msg: %?", weak_exit);
iotask::exit(hl_loop);
break;
}
Right(fetch_ch) => {
debug!("hl_loop req recv'd: %?", fetch_ch);
fetch_ch.send(hl_loop);
}
}
}
debug!("global monitor task is leaving weakend state");
// As a weak task the runtime will notify us
// when to exit
do weaken_task |weak_exit_po| {
debug!("global monitor task is weak");
weak_exit_po.recv();
iotask::exit(&iotask);
debug!("global monitor task is unweak");
};
debug!("global monitor task exiting");
}
};
}
} else {
iotask::exit(&iotask);
}
// once we have a chan to the monitor loop, we ask it for
// the libuv loop's async handle
do listen |fetch_ch| {
monitor_ch.send(fetch_ch);
fetch_ch.recv()
match final_iotask {
GlobalIoTask(iotask) => iotask
}
}
}
}
fn spawn_loop() -> IoTask {
let builder = do task::task().add_wrapper |task_body| {
let builder = do task().add_wrapper |task_body| {
fn~(move task_body) {
// The I/O loop task also needs to be weak so it doesn't keep
// the runtime alive
unsafe {
do weaken_task |weak_exit_po| {
debug!("global libuv task is now weak %?", weak_exit_po);
do weaken_task |_| {
debug!("global libuv task is now weak");
task_body();
// We don't wait for the exit message on weak_exit_po
@ -122,6 +117,7 @@ fn spawn_loop() -> IoTask {
}
}
};
let builder = builder.unlinked();
spawn_iotask(move builder)
}
@ -135,16 +131,18 @@ mod test {
use core::iter;
use core::libc;
use core::oldcomm;
use core::ptr;
use core::task;
use core::cast::transmute;
use core::libc::c_void;
use core::pipes::{stream, SharedChan, Chan};
extern fn simple_timer_close_cb(timer_ptr: *ll::uv_timer_t) {
unsafe {
let exit_ch_ptr = ll::get_data_for_uv_handle(
timer_ptr as *libc::c_void) as *oldcomm::Chan<bool>;
let exit_ch = *exit_ch_ptr;
oldcomm::send(exit_ch, true);
timer_ptr as *libc::c_void);
let exit_ch = transmute::<*c_void, ~Chan<bool>>(exit_ch_ptr);
exit_ch.send(true);
log(debug,
fmt!("EXIT_CH_PTR simple_timer_close_cb exit_ch_ptr: %?",
exit_ch_ptr));
@ -155,24 +153,23 @@ mod test {
unsafe {
log(debug, ~"in simple timer cb");
ll::timer_stop(timer_ptr);
let hl_loop = get_gl();
let hl_loop = &get_gl();
do iotask::interact(hl_loop) |_loop_ptr| {
unsafe {
log(debug, ~"closing timer");
unsafe {
ll::close(timer_ptr, simple_timer_close_cb);
}
log(debug, ~"about to deref exit_ch_ptr");
log(debug, ~"after msg sent on deref'd exit_ch");
}
};
log(debug, ~"exiting simple timer cb");
}
}
fn impl_uv_hl_simple_timer(iotask: IoTask) {
fn impl_uv_hl_simple_timer(iotask: &IoTask) {
unsafe {
let exit_po = oldcomm::Port::<bool>();
let exit_ch = oldcomm::Chan(&exit_po);
let exit_ch_ptr = ptr::addr_of(&exit_ch);
let (exit_po, exit_ch) = stream::<bool>();
let exit_ch_ptr: *libc::c_void = transmute(~exit_ch);
log(debug, fmt!("EXIT_CH_PTR newly created exit_ch_ptr: %?",
exit_ch_ptr));
let timer_handle = ll::timer_t();
@ -184,20 +181,22 @@ mod test {
if(init_status == 0i32) {
ll::set_data_for_uv_handle(
timer_ptr as *libc::c_void,
exit_ch_ptr as *libc::c_void);
exit_ch_ptr);
let start_status = ll::timer_start(timer_ptr,
simple_timer_cb,
1u,
0u);
if start_status != 0 {
1u, 0u);
if(start_status == 0i32) {
}
else {
fail ~"failure on ll::timer_start()";
}
} else {
}
else {
fail ~"failure on ll::timer_init()";
}
}
};
oldcomm::recv(exit_po);
exit_po.recv();
log(debug,
~"global_loop timer test: msg recv on exit_po, done..");
}
@ -205,17 +204,15 @@ mod test {
#[test]
fn test_gl_uv_global_loop_high_level_global_timer() {
unsafe {
let hl_loop = get_gl();
let exit_po = oldcomm::Port::<()>();
let exit_ch = oldcomm::Chan(&exit_po);
let hl_loop = &get_gl();
let (exit_po, exit_ch) = stream::<()>();
task::spawn_sched(task::ManualThreads(1u), || {
let hl_loop = &get_gl();
impl_uv_hl_simple_timer(hl_loop);
oldcomm::send(exit_ch, ());
exit_ch.send(());
});
impl_uv_hl_simple_timer(hl_loop);
oldcomm::recv(exit_po);
}
exit_po.recv();
}
// keeping this test ignored until some kind of stress-test-harness
@ -223,23 +220,21 @@ mod test {
#[test]
#[ignore]
fn test_stress_gl_uv_global_loop_high_level_global_timer() {
unsafe {
let hl_loop = get_gl();
let exit_po = oldcomm::Port::<()>();
let exit_ch = oldcomm::Chan(&exit_po);
let (exit_po, exit_ch) = stream::<()>();
let exit_ch = SharedChan(exit_ch);
let cycles = 5000u;
for iter::repeat(cycles) {
let exit_ch_clone = exit_ch.clone();
task::spawn_sched(task::ManualThreads(1u), || {
let hl_loop = &get_gl();
impl_uv_hl_simple_timer(hl_loop);
oldcomm::send(exit_ch, ());
exit_ch_clone.send(());
});
};
for iter::repeat(cycles) {
oldcomm::recv(exit_po);
exit_po.recv();
};
log(debug,
~"test_stress_gl_uv_global_loop_high_level_global_timer"+
log(debug, ~"test_stress_gl_uv_global_loop_high_level_global_timer"+
~" exiting sucessfully!");
}
}
}

View File

@ -20,7 +20,7 @@ use ll = uv_ll;
use core::libc::c_void;
use core::libc;
use core::oldcomm::{Port, Chan, listen};
use core::pipes::{stream, Port, Chan, SharedChan};
use core::prelude::*;
use core::ptr::addr_of;
use core::task::TaskBuilder;
@ -30,22 +30,30 @@ use core::task;
pub enum IoTask {
IoTask_({
async_handle: *ll::uv_async_t,
op_chan: Chan<IoTaskMsg>
op_chan: SharedChan<IoTaskMsg>
})
}
impl IoTask: Clone {
fn clone(&self) -> IoTask {
IoTask_({
async_handle: self.async_handle,
op_chan: self.op_chan.clone()
})
}
}
pub fn spawn_iotask(task: task::TaskBuilder) -> IoTask {
do listen |iotask_ch| {
let (iotask_port, iotask_chan) = stream();
do task.sched_mode(task::SingleThreaded).spawn {
debug!("entering libuv task");
run_loop(iotask_ch);
run_loop(&iotask_chan);
debug!("libuv task exiting");
};
iotask_ch.recv()
}
iotask_port.recv()
}
@ -71,7 +79,7 @@ pub fn spawn_iotask(task: task::TaskBuilder) -> IoTask {
* module. It is not safe to send the `loop_ptr` param to this callback out
* via ports/chans.
*/
pub unsafe fn interact(iotask: IoTask,
pub unsafe fn interact(iotask: &IoTask,
cb: fn~(*c_void)) {
send_msg(iotask, Interaction(move cb));
}
@ -83,7 +91,7 @@ pub unsafe fn interact(iotask: IoTask,
* async handle and do a sanity check to make sure that all other handles are
* closed, causing a failure otherwise.
*/
pub fn exit(iotask: IoTask) {
pub fn exit(iotask: &IoTask) {
unsafe {
send_msg(iotask, TeardownLoop);
}
@ -98,8 +106,10 @@ enum IoTaskMsg {
}
/// Run the loop and begin handling messages
fn run_loop(iotask_ch: Chan<IoTask>) {
fn run_loop(iotask_ch: &Chan<IoTask>) {
unsafe {
debug!("creating loop");
let loop_ptr = ll::loop_new();
// set up the special async handle we'll use to allow multi-task
@ -110,10 +120,12 @@ fn run_loop(iotask_ch: Chan<IoTask>) {
// associate the async handle with the loop
ll::async_init(loop_ptr, async_handle, wake_up_cb);
let (msg_po, msg_ch) = stream::<IoTaskMsg>();
// initialize our loop data and store it in the loop
let data = IoTaskLoopData {
let data: IoTaskLoopData = IoTaskLoopData {
async_handle: async_handle,
msg_po: Port()
msg_po: msg_po
};
ll::set_data_for_uv_handle(async_handle, addr_of(&data));
@ -121,7 +133,7 @@ fn run_loop(iotask_ch: Chan<IoTask>) {
// while we dwell in the I/O loop
let iotask = IoTask_({
async_handle: async_handle,
op_chan: data.msg_po.chan()
op_chan: SharedChan(msg_ch)
});
iotask_ch.send(iotask);
@ -139,9 +151,10 @@ struct IoTaskLoopData {
msg_po: Port<IoTaskMsg>,
}
fn send_msg(iotask: IoTask, msg: IoTaskMsg) {
unsafe {
fn send_msg(iotask: &IoTask,
msg: IoTaskMsg) {
iotask.op_chan.send(move msg);
unsafe {
ll::async_send(iotask.async_handle);
}
}
@ -149,14 +162,15 @@ fn send_msg(iotask: IoTask, msg: IoTaskMsg) {
/// Dispatch all pending messages
extern fn wake_up_cb(async_handle: *ll::uv_async_t,
status: int) {
unsafe {
log(debug, fmt!("wake_up_cb extern.. handle: %? status: %?",
async_handle, status));
unsafe {
let loop_ptr = ll::get_loop_for_uv_handle(async_handle);
let data = ll::get_data_for_uv_handle(async_handle)
as *IoTaskLoopData;
let msg_po = (*data).msg_po;
let data =
ll::get_data_for_uv_handle(async_handle) as *IoTaskLoopData;
let msg_po = &(*data).msg_po;
while msg_po.peek() {
match msg_po.recv() {
@ -216,27 +230,32 @@ mod test {
}
struct AhData {
iotask: IoTask,
exit_ch: oldcomm::Chan<()>,
exit_ch: oldcomm::Chan<()>
}
fn impl_uv_iotask_async(iotask: IoTask) {
fn impl_uv_iotask_async(iotask: &IoTask) {
unsafe {
let async_handle = ll::async_t();
let ah_ptr = ptr::addr_of(&async_handle);
let exit_po = oldcomm::Port::<()>();
let exit_ch = oldcomm::Chan(&exit_po);
let ah_data = {
iotask: iotask,
let ah_data = AhData {
iotask: iotask.clone(),
exit_ch: exit_ch
};
let ah_data_ptr = ptr::addr_of(&ah_data);
let ah_data_ptr: *AhData = unsafe {
ptr::to_unsafe_ptr(&ah_data)
};
debug!("about to interact");
do interact(iotask) |loop_ptr| {
unsafe {
debug!("interacting");
ll::async_init(loop_ptr, ah_ptr, async_handle_cb);
ll::set_data_for_uv_handle(ah_ptr,
ah_data_ptr as *libc::c_void);
ll::set_data_for_uv_handle(
ah_ptr, ah_data_ptr as *libc::c_void);
ll::async_send(ah_ptr);
}
};
debug!("waiting for async close");
oldcomm::recv(exit_po);
}
}
@ -244,13 +263,13 @@ mod test {
// this fn documents the bear minimum neccesary to roll your own
// high_level_loop
unsafe fn spawn_test_loop(exit_ch: oldcomm::Chan<()>) -> IoTask {
let iotask_port = oldcomm::Port::<IoTask>();
let iotask_ch = oldcomm::Chan(&iotask_port);
let (iotask_port, iotask_ch) = stream::<IoTask>();
do task::spawn_sched(task::ManualThreads(1u)) {
run_loop(iotask_ch);
debug!("about to run a test loop");
run_loop(&iotask_ch);
exit_ch.send(());
};
return oldcomm::recv(iotask_port);
return iotask_port.recv();
}
extern fn lifetime_handle_close(handle: *libc::c_void) {
@ -270,23 +289,30 @@ mod test {
unsafe {
let exit_po = oldcomm::Port::<()>();
let exit_ch = oldcomm::Chan(&exit_po);
let iotask = spawn_test_loop(exit_ch);
let iotask = &spawn_test_loop(exit_ch);
debug!("spawned iotask");
// using this handle to manage the lifetime of the
// high_level_loop, as it will exit the first time one of the
// impl_uv_hl_async() is cleaned up with no one ref'd handles on
// the loop (Which can happen under race-condition type
// situations.. this ensures that the loop lives until, at least,
// all of the impl_uv_hl_async() runs have been called, at least.
// high_level_loop, as it will exit the first time one of
// the impl_uv_hl_async() is cleaned up with no one ref'd
// handles on the loop (Which can happen under
// race-condition type situations.. this ensures that the
// loop lives until, at least, all of the
// impl_uv_hl_async() runs have been called, at least.
let work_exit_po = oldcomm::Port::<()>();
let work_exit_ch = oldcomm::Chan(&work_exit_po);
for iter::repeat(7u) {
let iotask_clone = iotask.clone();
do task::spawn_sched(task::ManualThreads(1u)) {
impl_uv_iotask_async(iotask);
debug!("async");
impl_uv_iotask_async(&iotask_clone);
debug!("done async");
oldcomm::send(work_exit_ch, ());
};
};
for iter::repeat(7u) {
debug!("waiting");
oldcomm::recv(work_exit_po);
};
log(debug, ~"sending teardown_loop msg..");

View File

@ -43,8 +43,8 @@ rust_start(uintptr_t main_fn, int argc, char **argv, void* crate_map) {
rust_kernel *kernel = new rust_kernel(env);
// Create the main scheduler and the main task
rust_sched_id sched_id = kernel->create_scheduler(env->num_sched_threads);
// Create the main task
rust_sched_id sched_id = kernel->main_sched_id();
rust_scheduler *sched = kernel->get_scheduler_by_id(sched_id);
assert(sched != NULL);
rust_task *root_task = sched->create_task(NULL, "main");

View File

@ -652,7 +652,10 @@ new_task_common(rust_scheduler *sched, rust_task *parent) {
extern "C" CDECL rust_task*
new_task() {
rust_task *task = rust_get_current_task();
return new_task_common(task->sched, task);
rust_sched_id sched_id = task->kernel->main_sched_id();
rust_scheduler *sched = task->kernel->get_scheduler_by_id(sched_id);
assert(sched != NULL && "should always have a main scheduler");
return new_task_common(sched, task);
}
extern "C" CDECL rust_task*
@ -855,24 +858,6 @@ rust_compare_and_swap_ptr(intptr_t *address,
return sync::compare_and_swap(address, oldval, newval);
}
extern "C" CDECL void
rust_task_weaken(rust_port_id chan) {
rust_task *task = rust_get_current_task();
task->kernel->weaken_task(chan);
}
extern "C" CDECL void
rust_task_unweaken(rust_port_id chan) {
rust_task *task = rust_get_current_task();
task->kernel->unweaken_task(chan);
}
extern "C" CDECL uintptr_t*
rust_global_env_chan_ptr() {
rust_task *task = rust_get_current_task();
return task->kernel->get_global_env_chan();
}
extern "C" void
rust_task_inhibit_kill(rust_task *task) {
task->inhibit_kill();
@ -1023,6 +1008,29 @@ rust_raw_thread_join_delete(raw_thread *thread) {
delete thread;
}
extern "C" void
rust_register_exit_function(spawn_fn runner, fn_env_pair *f) {
rust_task *task = rust_get_current_task();
task->kernel->register_exit_function(runner, f);
}
extern "C" void *
rust_get_global_data_ptr() {
rust_task *task = rust_get_current_task();
return &task->kernel->global_data;
}
extern "C" void
rust_inc_weak_task_count() {
rust_task *task = rust_get_current_task();
task->kernel->inc_weak_task_count();
}
extern "C" void
rust_dec_weak_task_count() {
rust_task *task = rust_get_current_task();
task->kernel->dec_weak_task_count();
}
//
// Local Variables:

View File

@ -30,21 +30,29 @@ rust_kernel::rust_kernel(rust_env *env) :
rval(0),
max_sched_id(1),
killed(false),
already_exiting(false),
sched_reaper(this),
osmain_driver(NULL),
non_weak_tasks(0),
global_loop_chan(0),
global_env_chan(0),
env(env)
at_exit_runner(NULL),
at_exit_started(false),
env(env),
global_data(0)
{
// Create the single threaded scheduler that will run on the platform's
// main thread
rust_manual_sched_launcher_factory *launchfac =
rust_manual_sched_launcher_factory *osmain_launchfac =
new rust_manual_sched_launcher_factory();
osmain_scheduler = create_scheduler(launchfac, 1, false);
osmain_driver = launchfac->get_driver();
osmain_scheduler = create_scheduler(osmain_launchfac, 1, false);
osmain_driver = osmain_launchfac->get_driver();
// Create the primary scheduler
rust_thread_sched_launcher_factory *main_launchfac =
new rust_thread_sched_launcher_factory();
main_scheduler = create_scheduler(main_launchfac,
env->num_sched_threads,
false);
sched_reaper.start();
}
@ -103,15 +111,22 @@ rust_kernel::create_scheduler(rust_sched_launcher_factory *launchfac,
{
scoped_lock with(sched_lock);
if (sched_table.size() == 1) {
// The OS main scheduler may not exit while there are other
// schedulers
KLOG_("Disallowing osmain scheduler to exit");
rust_scheduler *sched =
get_scheduler_by_id_nolock(osmain_scheduler);
assert(sched != NULL);
sched->disallow_exit();
/*if (sched_table.size() == 2) {
// The main and OS main schedulers may not exit while there are
// other schedulers
KLOG_("Disallowing main scheduler to exit");
rust_scheduler *main_sched =
get_scheduler_by_id_nolock(main_scheduler);
assert(main_sched != NULL);
main_sched->disallow_exit();
}
if (sched_table.size() == 1) {
KLOG_("Disallowing osmain scheduler to exit");
rust_scheduler *osmain_sched =
get_scheduler_by_id_nolock(osmain_scheduler);
assert(osmain_sched != NULL);
osmain_sched->disallow_exit();
}*/
id = max_sched_id++;
assert(id != INTPTR_MAX && "Hit the maximum scheduler id");
@ -175,14 +190,21 @@ rust_kernel::wait_for_schedulers()
sched_table.erase(iter);
sched->join_task_threads();
sched->deref();
/*if (sched_table.size() == 2) {
KLOG_("Allowing main scheduler to exit");
// It's only the main schedulers left. Tell them to exit
rust_scheduler *main_sched =
get_scheduler_by_id_nolock(main_scheduler);
assert(main_sched != NULL);
main_sched->allow_exit();
}
if (sched_table.size() == 1) {
KLOG_("Allowing osmain scheduler to exit");
// It's only the osmain scheduler left. Tell it to exit
rust_scheduler *sched =
rust_scheduler *osmain_sched =
get_scheduler_by_id_nolock(osmain_scheduler);
assert(sched != NULL);
sched->allow_exit();
}
assert(osmain_sched != NULL);
osmain_sched->allow_exit();
}*/
}
if (!sched_table.empty()) {
sched_lock.wait();
@ -318,60 +340,64 @@ rust_kernel::register_task() {
KLOG_("New non-weak tasks %" PRIdPTR, new_non_weak_tasks);
}
void
rust_kernel::allow_scheduler_exit() {
scoped_lock with(sched_lock);
KLOG_("Allowing main scheduler to exit");
// It's only the main schedulers left. Tell them to exit
rust_scheduler *main_sched =
get_scheduler_by_id_nolock(main_scheduler);
assert(main_sched != NULL);
main_sched->allow_exit();
KLOG_("Allowing osmain scheduler to exit");
rust_scheduler *osmain_sched =
get_scheduler_by_id_nolock(osmain_scheduler);
assert(osmain_sched != NULL);
osmain_sched->allow_exit();
}
void
rust_kernel::unregister_task() {
KLOG_("Unregistering task");
uintptr_t new_non_weak_tasks = sync::decrement(non_weak_tasks);
KLOG_("New non-weak tasks %" PRIdPTR, new_non_weak_tasks);
if (new_non_weak_tasks == 0) {
end_weak_tasks();
begin_shutdown();
}
}
void
rust_kernel::weaken_task(rust_port_id chan) {
{
scoped_lock with(weak_task_lock);
KLOG_("Weakening task with channel %" PRIdPTR, chan);
weak_task_chans.push_back(chan);
}
rust_kernel::inc_weak_task_count() {
uintptr_t new_non_weak_tasks = sync::decrement(non_weak_tasks);
KLOG_("New non-weak tasks %" PRIdPTR, new_non_weak_tasks);
if (new_non_weak_tasks == 0) {
end_weak_tasks();
begin_shutdown();
}
}
void
rust_kernel::unweaken_task(rust_port_id chan) {
rust_kernel::dec_weak_task_count() {
uintptr_t new_non_weak_tasks = sync::increment(non_weak_tasks);
KLOG_("New non-weak tasks %" PRIdPTR, new_non_weak_tasks);
{
scoped_lock with(weak_task_lock);
KLOG_("Unweakening task with channel %" PRIdPTR, chan);
std::vector<rust_port_id>::iterator iter =
std::find(weak_task_chans.begin(), weak_task_chans.end(), chan);
if (iter != weak_task_chans.end()) {
weak_task_chans.erase(iter);
}
}
}
void
rust_kernel::end_weak_tasks() {
std::vector<rust_port_id> chancopies;
rust_kernel::begin_shutdown() {
{
scoped_lock with(weak_task_lock);
chancopies = weak_task_chans;
weak_task_chans.clear();
scoped_lock with(sched_lock);
// FIXME #4410: This shouldn't be necessary, but because of
// unweaken_task this may end up getting called multiple times.
if (already_exiting) {
return;
} else {
already_exiting = true;
}
while (!chancopies.empty()) {
rust_port_id chan = chancopies.back();
chancopies.pop_back();
KLOG_("Notifying weak task " PRIdPTR, chan);
uintptr_t token = 0;
send_to_port(chan, &token);
}
run_exit_functions();
allow_scheduler_exit();
}
bool
@ -389,6 +415,47 @@ rust_kernel::send_to_port(rust_port_id chan, void *sptr) {
}
}
void
rust_kernel::register_exit_function(spawn_fn runner, fn_env_pair *f) {
scoped_lock with(at_exit_lock);
assert(!at_exit_started && "registering at_exit function after exit");
if (at_exit_runner) {
assert(runner == at_exit_runner
&& "there can be only one at_exit_runner");
}
at_exit_runner = runner;
at_exit_fns.push_back(f);
}
void
rust_kernel::run_exit_functions() {
rust_task *task;
{
scoped_lock with(at_exit_lock);
assert(!at_exit_started && "running exit functions twice?");
at_exit_started = true;
if (at_exit_runner == NULL) {
return;
}
rust_scheduler *sched = get_scheduler_by_id(main_sched_id());
assert(sched);
task = sched->create_task(NULL, "at_exit");
final_exit_fns.count = at_exit_fns.size();
final_exit_fns.start = at_exit_fns.data();
}
task->start(at_exit_runner, NULL, &final_exit_fns);
}
//
// Local Variables:
// mode: C++

View File

@ -49,6 +49,7 @@
#include "memory_region.h"
#include "rust_log.h"
#include "rust_sched_reaper.h"
#include "rust_type.h"
#include "util/hash_map.h"
class rust_scheduler;
@ -65,6 +66,13 @@ typedef intptr_t rust_port_id;
typedef std::map<rust_sched_id, rust_scheduler*> sched_map;
// This is defined as a struct only because we need a single pointer to pass
// to the Rust function that runs the at_exit functions
struct exit_functions {
size_t count;
fn_env_pair **start;
};
class rust_kernel {
memory_region _region;
rust_log _log;
@ -81,7 +89,8 @@ class rust_kernel {
lock_and_signal rval_lock;
int rval;
// Protects max_sched_id and sched_table, join_list, killed
// Protects max_sched_id and sched_table, join_list, killed,
// already_exiting
lock_and_signal sched_lock;
// The next scheduler id
rust_sched_id max_sched_id;
@ -94,8 +103,13 @@ class rust_kernel {
// task group fails). This propagates to all new schedulers and tasks
// created after it is set.
bool killed;
bool already_exiting;
rust_sched_reaper sched_reaper;
// The primary scheduler
rust_sched_id main_scheduler;
// The single-threaded scheduler that uses the main thread
rust_sched_id osmain_scheduler;
// Runs the single-threaded scheduler that executes tasks
@ -104,21 +118,22 @@ class rust_kernel {
// An atomically updated count of the live, 'non-weak' tasks
uintptr_t non_weak_tasks;
// Protects weak_task_chans
lock_and_signal weak_task_lock;
// A list of weak tasks that need to be told when to exit
std::vector<rust_port_id> weak_task_chans;
rust_scheduler* get_scheduler_by_id_nolock(rust_sched_id id);
void end_weak_tasks();
void allow_scheduler_exit();
void begin_shutdown();
// Used to communicate with the process-side, global libuv loop
uintptr_t global_loop_chan;
// Used to serialize access to getenv/setenv
uintptr_t global_env_chan;
lock_and_signal at_exit_lock;
spawn_fn at_exit_runner;
bool at_exit_started;
std::vector<fn_env_pair*> at_exit_fns;
exit_functions final_exit_fns;
void run_exit_functions();
public:
struct rust_env *env;
uintptr_t global_data;
rust_kernel(rust_env *env);
@ -154,17 +169,17 @@ public:
void set_exit_status(int code);
rust_sched_id main_sched_id() { return main_scheduler; }
rust_sched_id osmain_sched_id() { return osmain_scheduler; }
void register_task();
void unregister_task();
void weaken_task(rust_port_id chan);
void unweaken_task(rust_port_id chan);
void inc_weak_task_count();
void dec_weak_task_count();
bool send_to_port(rust_port_id chan, void *sptr);
uintptr_t* get_global_loop() { return &global_loop_chan; }
uintptr_t* get_global_env_chan() { return &global_env_chan; }
void register_exit_function(spawn_fn runner, fn_env_pair *f);
};
template <typename T> struct kernel_owned {

View File

@ -513,15 +513,6 @@ rust_uv_ip6_port(struct sockaddr_in6* src) {
return ntohs(src->sin6_port);
}
extern "C" uintptr_t*
rust_uv_get_kernel_global_chan_ptr() {
uintptr_t* result = rust_get_current_task()->kernel->get_global_loop();
rust_task* task = rust_get_current_task();
LOG(task, stdlib, "global loop: %lu", (unsigned long int)result);
LOG(task, stdlib,"global loop val: %lu", (unsigned long int)*result);
return result;
}
extern "C" void*
rust_uv_current_kernel_malloc(size_t size) {
return current_kernel_malloc(size, "rust_uv_current_kernel_malloc");

View File

@ -61,8 +61,6 @@ rust_task_yield
rust_task_is_unwinding
rust_get_task
rust_get_stack_segment
rust_task_weaken
rust_task_unweaken
rust_log_str
start_task
vec_reserve_shared_actual
@ -158,7 +156,6 @@ rust_uv_get_data_for_req
rust_uv_set_data_for_req
rust_uv_get_base_from_buf
rust_uv_get_len_from_buf
rust_uv_get_kernel_global_chan_ptr
rust_uv_current_kernel_malloc
rust_uv_current_kernel_free
rust_uv_getaddrinfo
@ -174,7 +171,6 @@ rust_dbg_do_nothing
rust_dbg_breakpoint
rust_osmain_sched_id
rust_compare_and_swap_ptr
rust_global_env_chan_ptr
rust_port_take
rust_port_drop
rust_port_task
@ -210,3 +206,7 @@ linenoiseHistorySave
linenoiseHistoryLoad
rust_raw_thread_start
rust_raw_thread_join_delete
rust_register_exit_function
rust_get_global_data_ptr
rust_inc_weak_task_count
rust_dec_weak_task_count

View File

@ -27,7 +27,7 @@ proto! oneshot (
)
fn main() {
let iotask = uv::global_loop::get();
let iotask = &uv::global_loop::get();
pipes::spawn_service(oneshot::init, |p| {
match try_recv(move p) {

View File

@ -35,7 +35,7 @@ fn main() {
use oneshot::client::*;
use stream::client::*;
let iotask = uv::global_loop::get();
let iotask = &uv::global_loop::get();
let c = pipes::spawn_service(stream::init, |p| {
error!("waiting for pipes");

View File

@ -27,7 +27,7 @@ fn main() {
let c = pipes::spawn_service(oneshot::init, |p| { recv(move p); });
let iotask = uv::global_loop::get();
let iotask = &uv::global_loop::get();
sleep(iotask, 500);
signal(move c);