From 43ae4b3301cc0605839778ecf59effb32b752e33 Mon Sep 17 00:00:00 2001 From: Aaron Turon Date: Sat, 6 Dec 2014 18:34:37 -0800 Subject: [PATCH] Fallout from new thread API --- src/compiletest/runtest.rs | 6 +- src/doc/guide-tasks.md | 6 +- src/libcore/borrow.rs | 2 +- src/librustc_driver/lib.rs | 15 ++- src/librustc_trans/back/write.rs | 15 ++- src/librustdoc/lib.rs | 5 +- src/librustdoc/test.rs | 3 +- src/libstd/c_str.rs | 4 +- src/libstd/comm/blocking.rs | 6 +- src/libstd/comm/mod.rs | 121 +++++------------- src/libstd/comm/oneshot.rs | 7 +- src/libstd/comm/select.rs | 23 ++-- src/libstd/comm/shared.rs | 2 +- src/libstd/comm/stream.rs | 5 +- src/libstd/comm/sync.rs | 40 +++--- src/libstd/failure.rs | 86 +++++-------- src/libstd/io/comm_adapters.rs | 10 +- src/libstd/io/net/pipe.rs | 2 +- src/libstd/io/net/tcp.rs | 4 +- src/libstd/io/process.rs | 11 +- src/libstd/io/stdio.rs | 33 ++--- src/libstd/lib.rs | 2 + src/libstd/os.rs | 2 +- src/libstd/path/posix.rs | 14 +- src/libstd/path/windows.rs | 14 +- src/libstd/rand/os.rs | 15 ++- src/libstd/rt/at_exit_imp.rs | 33 +++-- src/libstd/rt/backtrace.rs | 4 - src/libstd/rt/mod.rs | 38 ++---- src/libstd/rt/unwind.rs | 2 +- src/libstd/rt/util.rs | 12 +- src/libstd/sync/future.rs | 4 +- src/libstd/sync/mutex.rs | 10 +- src/libstd/sync/once.rs | 4 +- src/libstd/sync/poison.rs | 18 +-- src/libstd/sync/rwlock.rs | 34 ++--- src/libstd/sync/task_pool.rs | 7 +- src/libstd/sys/common/backtrace.rs | 2 +- src/libstd/sys/common/helper_thread.rs | 6 +- src/libstd/sys/common/mod.rs | 1 + src/libstd/sys/common/thread_info.rs | 15 +-- src/libstd/sys/common/thread_local.rs | 1 - src/libstd/sys/unix/backtrace.rs | 2 +- src/libstd/sys/unix/stack_overflow.rs | 34 ++--- src/libstd/sys/windows/stack_overflow.rs | 7 +- src/libstd/task.rs | 15 +++ src/libstd/thread.rs | 27 ++-- src/libstd/thread_local/mod.rs | 6 +- src/libtest/lib.rs | 17 ++- ...nup-rvalue-temp-during-incomplete-alloc.rs | 4 +- src/test/run-pass/task-stderr.rs | 6 +- 51 files changed, 323 insertions(+), 439 deletions(-) diff --git a/src/compiletest/runtest.rs b/src/compiletest/runtest.rs index ea6f180ec39..1b9d98823b0 100644 --- a/src/compiletest/runtest.rs +++ b/src/compiletest/runtest.rs @@ -32,7 +32,7 @@ use std::io; use std::os; use std::str; use std::string::String; -use std::task; +use std::thread::Thread; use std::time::Duration; use test::MetricMap; @@ -445,9 +445,9 @@ fn run_debuginfo_gdb_test(config: &Config, props: &TestProps, testfile: &Path) { loop { //waiting 1 second for gdbserver start timer::sleep(Duration::milliseconds(1000)); - let result = task::try(move || { + let result = Thread::with_join(move || { tcp::TcpStream::connect("127.0.0.1:5039").unwrap(); - }); + }).join(); if result.is_err() { continue; } diff --git a/src/doc/guide-tasks.md b/src/doc/guide-tasks.md index 4adca43be18..4753ba7fe47 100644 --- a/src/doc/guide-tasks.md +++ b/src/doc/guide-tasks.md @@ -347,16 +347,16 @@ result with an `int` field (representing a successful result) or an `Err` result (representing termination with an error). ```{rust} -# use std::task; +# use std::thread::Thread; # fn some_condition() -> bool { false } # fn calculate_result() -> int { 0 } -let result: Result> = task::try(move || { +let result: Result> = Thread::with_join(move || { if some_condition() { calculate_result() } else { panic!("oops!"); } -}); +}).join(); assert!(result.is_err()); ``` diff --git a/src/libcore/borrow.rs b/src/libcore/borrow.rs index b44b87bd938..4bf356fb552 100644 --- a/src/libcore/borrow.rs +++ b/src/libcore/borrow.rs @@ -92,7 +92,7 @@ impl<'a, T, Sized? B> BorrowFrom> for B where B: ToOwned { /// Trait for moving into a `Cow` pub trait IntoCow<'a, T, Sized? B> { - /// Moves `self` into `Cow` + /// Moves `serlf` into `Cow` fn into_cow(self) -> Cow<'a, T, B>; } diff --git a/src/librustc_driver/lib.rs b/src/librustc_driver/lib.rs index b0f8b3bdbe7..ba5157932b6 100644 --- a/src/librustc_driver/lib.rs +++ b/src/librustc_driver/lib.rs @@ -55,7 +55,7 @@ use rustc::DIAGNOSTICS; use std::any::AnyRefExt; use std::io; use std::os; -use std::task::TaskBuilder; +use std::thread; use rustc::session::early_error; @@ -475,18 +475,22 @@ pub fn monitor(f: F) { static STACK_SIZE: uint = 32000000; // 32MB let (tx, rx) = channel(); - let w = io::ChanWriter::new(tx); + let mut w = Some(io::ChanWriter::new(tx)); // option dance let mut r = io::ChanReader::new(rx); - let mut task = TaskBuilder::new().named("rustc").stderr(box w); + let mut cfg = thread::cfg().name("rustc".to_string()); // FIXME: Hacks on hacks. If the env is trying to override the stack size // then *don't* set it explicitly. if os::getenv("RUST_MIN_STACK").is_none() { - task = task.stack_size(STACK_SIZE); + cfg = cfg.stack_size(STACK_SIZE); } - match task.try(f) { + let f = proc() { + std::io::stdio::set_stderr(box w.take().unwrap()); + f() + }; + match cfg.with_join(f).join() { Ok(()) => { /* fallthrough */ } Err(value) => { // Task panicked without emitting a fatal diagnostic @@ -540,4 +544,3 @@ pub fn main() { let result = run(args); std::os::set_exit_status(result); } - diff --git a/src/librustc_trans/back/write.rs b/src/librustc_trans/back/write.rs index 24dfe600f2a..8ec010a8553 100644 --- a/src/librustc_trans/back/write.rs +++ b/src/librustc_trans/back/write.rs @@ -30,7 +30,7 @@ use std::ptr; use std::str; use std::mem; use std::sync::{Arc, Mutex}; -use std::task::TaskBuilder; +use std::thread; use libc::{c_uint, c_int, c_void}; #[deriving(Clone, PartialEq, PartialOrd, Ord, Eq)] @@ -896,7 +896,11 @@ fn run_work_multithreaded(sess: &Session, let diag_emitter = diag_emitter.clone(); let remark = sess.opts.cg.remark.clone(); - let future = TaskBuilder::new().named(format!("codegen-{}", i)).try_future(move |:| { + let (tx, rx) = channel(); + let mut tx = Some(tx); + futures.push(rx); + + thread::cfg().name(format!("codegen-{}", i)).spawn(move |:| { let diag_handler = mk_handler(box diag_emitter); // Must construct cgcx inside the proc because it has non-Send @@ -921,13 +925,14 @@ fn run_work_multithreaded(sess: &Session, None => break, } } + + tx.take().unwrap().send(()); }); - futures.push(future); } let mut panicked = false; - for future in futures.into_iter() { - match future.into_inner() { + for rx in futures.into_iter() { + match rx.recv_opt() { Ok(()) => {}, Err(_) => { panicked = true; diff --git a/src/librustdoc/lib.rs b/src/librustdoc/lib.rs index 3d51a6d6ab0..9e06b4e88f5 100644 --- a/src/librustdoc/lib.rs +++ b/src/librustdoc/lib.rs @@ -342,10 +342,11 @@ fn rust_input(cratefile: &str, externs: core::Externs, matches: &getopts::Matche let cr = Path::new(cratefile); info!("starting to run rustc"); - let (mut krate, analysis) = std::task::try(move |:| { + + let (mut krate, analysis) = std::thread::Thread::with_join(move |:| { let cr = cr; core::run_core(libs, cfgs, externs, &cr, triple) - }).map_err(|_| "rustc failed").unwrap(); + }).join().map_err(|_| "rustc failed").unwrap(); info!("finished with rustc"); let mut analysis = Some(analysis); ANALYSISKEY.with(|s| { diff --git a/src/librustdoc/test.rs b/src/librustdoc/test.rs index 0c64a0d29df..d004f030f36 100644 --- a/src/librustdoc/test.rs +++ b/src/librustdoc/test.rs @@ -16,6 +16,7 @@ use std::os; use std::str; use std::string::String; use std::thunk::Thunk; +use std::thread::Thread; use std::collections::{HashSet, HashMap}; use testing; @@ -143,7 +144,7 @@ fn runtest(test: &str, cratename: &str, libs: Vec, externs: core::Externs, let w1 = io::ChanWriter::new(tx); let w2 = w1.clone(); let old = io::stdio::set_stderr(box w1); - spawn(move |:| { + Thread::spawn(move |:| { let mut p = io::ChanReader::new(rx); let mut err = match old { Some(old) => { diff --git a/src/libstd/c_str.rs b/src/libstd/c_str.rs index 27a139835c9..6bfb219d9c4 100644 --- a/src/libstd/c_str.rs +++ b/src/libstd/c_str.rs @@ -536,7 +536,7 @@ pub unsafe fn from_c_multistring(buf: *const libc::c_char, mod tests { use prelude::*; use ptr; - use task; + use thread::Thread; use libc; use super::*; @@ -637,7 +637,7 @@ mod tests { #[test] fn test_to_c_str_fail() { - assert!(task::try(move|| { "he\x00llo".to_c_str() }).is_err()); + assert!(Thread::with_join(move|| { "he\x00llo".to_c_str() }).join().is_err()); } #[test] diff --git a/src/libstd/comm/blocking.rs b/src/libstd/comm/blocking.rs index 5e9a01d0151..bb097265756 100644 --- a/src/libstd/comm/blocking.rs +++ b/src/libstd/comm/blocking.rs @@ -32,7 +32,7 @@ pub struct WaitToken { no_send: NoSend, } -fn token() -> (WaitToken, SignalToken) { +pub fn tokens() -> (WaitToken, SignalToken) { let inner = Arc::new(Inner { thread: Thread::current(), woken: INIT_ATOMIC_BOOL, @@ -48,7 +48,7 @@ fn token() -> (WaitToken, SignalToken) { } impl SignalToken { - fn signal(&self) -> bool { + pub fn signal(&self) -> bool { let wake = !self.inner.woken.compare_and_swap(false, true, Ordering::SeqCst); if wake { self.inner.thread.unpark(); @@ -73,7 +73,7 @@ impl SignalToken { } impl WaitToken { - fn wait(self) { + pub fn wait(self) { while !self.inner.woken.load(Ordering::SeqCst) { Thread::park() } diff --git a/src/libstd/comm/mod.rs b/src/libstd/comm/mod.rs index e5ec0078c5e..236a055b91e 100644 --- a/src/libstd/comm/mod.rs +++ b/src/libstd/comm/mod.rs @@ -317,8 +317,10 @@ use core::kinds::marker; use core::mem; use core::cell::UnsafeCell; -pub use comm::select::{Select, Handle}; -use comm::select::StartResult::*; +pub use self::select::{Select, Handle}; +use self::select::StartResult; +use self::select::StartResult::*; +use self::blocking::SignalToken; macro_rules! test { { fn $name:ident() $b:block $(#[$a:meta])*} => ( @@ -330,7 +332,7 @@ macro_rules! test { use comm::*; use super::*; - use task; + use thread::Thread; $(#[$a])* #[test] fn f() { $b } } @@ -593,12 +595,12 @@ impl Sender { (a, ret) } oneshot::UpDisconnected => (a, Err(t)), - oneshot::UpWoke(task) => { - // This send cannot panic because the task is + oneshot::UpWoke(token) => { + // This send cannot panic because the thread is // asleep (we're looking at it), so the receiver // can't go away. (*a.get()).send(t).ok().unwrap(); - task.wake().map(|t| t.reawaken()); + token.signal(); (a, Ok(())) } } @@ -937,7 +939,7 @@ impl select::Packet for Receiver { } } - fn start_selection(&self, mut token: SignalToken) -> bool { + fn start_selection(&self, mut token: SignalToken) -> StartResult { loop { let (t, new_port) = match *unsafe { self.inner() } { Oneshot(ref p) => { @@ -1240,11 +1242,11 @@ mod test { test! { fn oneshot_single_thread_recv_chan_close() { // Receiving on a closed chan will panic - let res = task::try(move|| { + let res = Thread::with_join(move|| { let (tx, rx) = channel::(); drop(tx); rx.recv(); - }); + }).join(); // What is our res? assert!(res.is_err()); } } @@ -1312,9 +1314,9 @@ mod test { spawn(move|| { drop(tx); }); - let res = task::try(move|| { + let res = Thread::with_join(move|| { assert!(rx.recv() == box 10); - }); + }).join(); assert!(res.is_err()); } } @@ -1334,19 +1336,19 @@ mod test { spawn(move|| { drop(rx); }); - let _ = task::try(move|| { + let _ = Thread::with_join(move|| { tx.send(1); - }); + }).join(); } } } test! { fn oneshot_multi_thread_recv_close_stress() { for _ in range(0, stress_factor()) { let (tx, rx) = channel::(); - spawn(move|| { - let res = task::try(move|| { + spawn(proc() { + let res = Thread::with_join(move|| { rx.recv(); - }); + }).join(); assert!(res.is_err()); }); spawn(move|| { @@ -1495,7 +1497,7 @@ mod test { tx2.send(()); }); // make sure the other task has gone to sleep - for _ in range(0u, 5000) { task::deschedule(); } + for _ in range(0u, 5000) { Thread::yield_now(); } // upgrade to a shared chan and send a message let t = tx.clone(); @@ -1504,45 +1506,7 @@ mod test { // wait for the child task to exit before we exit rx2.recv(); - } } - - test! { fn sends_off_the_runtime() { - use rt::thread::Thread; - - let (tx, rx) = channel(); - let t = Thread::start(move|| { - for _ in range(0u, 1000) { - tx.send(()); - } - }); - for _ in range(0u, 1000) { - rx.recv(); - } - t.join(); - } } - - test! { fn try_recvs_off_the_runtime() { - use rt::thread::Thread; - - let (tx, rx) = channel(); - let (cdone, pdone) = channel(); - let t = Thread::start(move|| { - let mut hits = 0u; - while hits < 10 { - match rx.try_recv() { - Ok(()) => { hits += 1; } - Err(Empty) => { Thread::yield_now(); } - Err(Disconnected) => return, - } - } - cdone.send(()); - }); - for _ in range(0u, 10) { - tx.send(()); - } - t.join(); - pdone.recv(); - } } + }) } #[cfg(test)] @@ -1700,11 +1664,11 @@ mod sync_tests { test! { fn oneshot_single_thread_recv_chan_close() { // Receiving on a closed chan will panic - let res = task::try(move|| { + let res = Thread::with_join(move|| { let (tx, rx) = sync_channel::(0); drop(tx); rx.recv(); - }); + }).join(); // What is our res? assert!(res.is_err()); } } @@ -1777,9 +1741,9 @@ mod sync_tests { spawn(move|| { drop(tx); }); - let res = task::try(move|| { + let res = Thread::with_join(move|| { assert!(rx.recv() == box 10); - }); + }).join(); assert!(res.is_err()); } } @@ -1799,19 +1763,19 @@ mod sync_tests { spawn(move|| { drop(rx); }); - let _ = task::try(move|| { + let _ = Thread::with_join(move || { tx.send(1); - }); + }).join(); } } } test! { fn oneshot_multi_thread_recv_close_stress() { for _ in range(0, stress_factor()) { let (tx, rx) = sync_channel::(0); - spawn(move|| { - let res = task::try(move|| { + spawn(proc() { + let res = Thread::with_join(move|| { rx.recv(); - }); + }).join(); assert!(res.is_err()); }); spawn(move|| { @@ -1960,7 +1924,7 @@ mod sync_tests { tx2.send(()); }); // make sure the other task has gone to sleep - for _ in range(0u, 5000) { task::deschedule(); } + for _ in range(0u, 5000) { Thread::yield_now(); } // upgrade to a shared chan and send a message let t = tx.clone(); @@ -1971,29 +1935,6 @@ mod sync_tests { rx2.recv(); } } - test! { fn try_recvs_off_the_runtime() { - use rt::thread::Thread; - - let (tx, rx) = sync_channel::<()>(0); - let (cdone, pdone) = channel(); - let t = Thread::start(move|| { - let mut hits = 0u; - while hits < 10 { - match rx.try_recv() { - Ok(()) => { hits += 1; } - Err(Empty) => { Thread::yield_now(); } - Err(Disconnected) => return, - } - } - cdone.send(()); - }); - for _ in range(0u, 10) { - tx.send(()); - } - t.join(); - pdone.recv(); - } } - test! { fn send_opt1() { let (tx, rx) = sync_channel::(0); spawn(move|| { rx.recv(); }); @@ -2052,7 +1993,7 @@ mod sync_tests { test! { fn try_send4() { let (tx, rx) = sync_channel::(0); spawn(move|| { - for _ in range(0u, 1000) { task::deschedule(); } + for _ in range(0u, 1000) { Thread::yield_now(); } assert_eq!(tx.try_send(1), Ok(())); }); assert_eq!(rx.recv(), 1); diff --git a/src/libstd/comm/oneshot.rs b/src/libstd/comm/oneshot.rs index 68f3f229cb4..9c5a6518845 100644 --- a/src/libstd/comm/oneshot.rs +++ b/src/libstd/comm/oneshot.rs @@ -39,9 +39,8 @@ use self::MyUpgrade::*; use core::prelude::*; -use alloc::boxed::Box; use comm::Receiver; -use comm::blocking::{mod, WaitToken, SignalToken}; +use comm::blocking::{mod, SignalToken}; use core::mem; use sync::atomic; @@ -143,7 +142,7 @@ impl Packet { // Attempt to not block the task (it's a little expensive). If it looks // like we're not empty, then immediately go through to `try_recv`. if self.state.load(atomic::SeqCst) == EMPTY { - let (wait_token, signal_token) = blocking::token(); + let (wait_token, signal_token) = blocking::tokens(); let ptr = unsafe { signal_token.cast_to_uint() }; // race with senders to enter the blocking state @@ -332,7 +331,7 @@ impl Packet { // If we've got a blocked task, then use an atomic to gain ownership // of it (may fail) - BLOCKED => self.state.compare_and_swap(BLOCKED, EMPTY, atomic::SeqCst) + ptr => self.state.compare_and_swap(ptr, EMPTY, atomic::SeqCst) }; // Now that we've got ownership of our state, figure out what to do diff --git a/src/libstd/comm/select.rs b/src/libstd/comm/select.rs index 536d38c6e55..690b5861c22 100644 --- a/src/libstd/comm/select.rs +++ b/src/libstd/comm/select.rs @@ -54,7 +54,6 @@ use core::prelude::*; -use alloc::boxed::Box; use core::cell::Cell; use core::kinds::marker; use core::mem; @@ -63,8 +62,6 @@ use core::uint; use comm::Receiver; use comm::blocking::{mod, SignalToken}; -use self::StartResult::*; - /// The "receiver set" of the select interface. This structure is used to manage /// a set of receivers which are being selected over. pub struct Select { @@ -190,8 +187,8 @@ impl Select { let (wait_token, signal_token) = blocking::tokens(); for (i, handle) in self.iter().enumerate() { match (*handle).packet.start_selection(signal_token.clone()) { - Installed => {} - Abort => { + StartResult::Installed => {} + StartResult::Abort => { // Go back and abort the already-begun selections for handle in self.iter().take(i) { (*handle).packet.abort_selection(); @@ -417,10 +414,10 @@ mod test { let (tx3, rx3) = channel::(); spawn(move|| { - for _ in range(0u, 20) { task::deschedule(); } + for _ in range(0u, 20) { Thread::yield_now(); } tx1.send(1); rx3.recv(); - for _ in range(0u, 20) { task::deschedule(); } + for _ in range(0u, 20) { Thread::yield_now(); } }); select! { @@ -440,7 +437,7 @@ mod test { let (tx3, rx3) = channel::<()>(); spawn(move|| { - for _ in range(0u, 20) { task::deschedule(); } + for _ in range(0u, 20) { Thread::yield_now(); } tx1.send(1); tx2.send(2); rx3.recv(); @@ -541,7 +538,7 @@ mod test { tx3.send(()); }); - for _ in range(0u, 1000) { task::deschedule(); } + for _ in range(0u, 1000) { Thread::yield_now(); } drop(tx1.clone()); tx2.send(()); rx3.recv(); @@ -644,7 +641,7 @@ mod test { tx2.send(()); }); - for _ in range(0u, 100) { task::deschedule() } + for _ in range(0u, 100) { Thread::yield_now() } tx1.send(()); rx2.recv(); } } @@ -663,7 +660,7 @@ mod test { tx2.send(()); }); - for _ in range(0u, 100) { task::deschedule() } + for _ in range(0u, 100) { Thread::yield_now() } tx1.send(()); rx2.recv(); } } @@ -681,7 +678,7 @@ mod test { tx2.send(()); }); - for _ in range(0u, 100) { task::deschedule() } + for _ in range(0u, 100) { Thread::yield_now() } tx1.send(()); rx2.recv(); } } @@ -697,7 +694,7 @@ mod test { test! { fn sync2() { let (tx, rx) = sync_channel::(0); spawn(move|| { - for _ in range(0u, 100) { task::deschedule() } + for _ in range(0u, 100) { Thread::yield_now() } tx.send(1); }); select! { diff --git a/src/libstd/comm/shared.rs b/src/libstd/comm/shared.rs index 1f1ea2ca9a1..1022694e634 100644 --- a/src/libstd/comm/shared.rs +++ b/src/libstd/comm/shared.rs @@ -22,7 +22,6 @@ pub use self::Failure::*; use core::prelude::*; -use alloc::boxed::Box; use core::cmp; use core::int; @@ -31,6 +30,7 @@ use comm::mpsc_queue as mpsc; use comm::blocking::{mod, SignalToken}; use comm::select::StartResult; use comm::select::StartResult::*; +use thread::Thread; const DISCONNECTED: int = int::MIN; const FUDGE: int = 1024; diff --git a/src/libstd/comm/stream.rs b/src/libstd/comm/stream.rs index a15366d5ebc..b68f626060e 100644 --- a/src/libstd/comm/stream.rs +++ b/src/libstd/comm/stream.rs @@ -24,7 +24,6 @@ use self::Message::*; use core::prelude::*; -use alloc::boxed::Box; use core::cmp; use core::int; use thread::Thread; @@ -32,7 +31,7 @@ use thread::Thread; use sync::atomic; use comm::spsc_queue as spsc; use comm::Receiver; -use comm::blocking::{mod, WaitToken, SignalToken}; +use comm::blocking::{mod, SignalToken}; const DISCONNECTED: int = int::MIN; #[cfg(test)] @@ -147,7 +146,7 @@ impl Packet { let ptr = self.to_wake.load(atomic::SeqCst); self.to_wake.store(0, atomic::SeqCst); assert!(ptr != 0); - unsafe { SignaToken::cast_from_uint(ptr) } + unsafe { SignalToken::cast_from_uint(ptr) } } // Decrements the count on the channel for a sleeper, returning the sleeper diff --git a/src/libstd/comm/sync.rs b/src/libstd/comm/sync.rs index 9e4bdb15b00..b24c6d21fba 100644 --- a/src/libstd/comm/sync.rs +++ b/src/libstd/comm/sync.rs @@ -38,10 +38,8 @@ use core::prelude::*; pub use self::Failure::*; use self::Blocker::*; -use alloc::boxed::Box; use vec::Vec; use core::mem; -use core::cell::UnsafeCell; use sync::{atomic, Mutex, MutexGuard}; use comm::blocking::{mod, WaitToken, SignalToken}; @@ -105,10 +103,10 @@ pub enum Failure { /// Atomically blocks the current thread, placing it into `slot`, unlocking `lock` /// in the meantime. This re-locks the mutex upon returning. -fn wait<'a, 'b, T>(lock: &'a Mutex>, - guard: MutexGuard<'b, State>, - f: fn(BlockedTask) -> Blocker) - -> MutexGuard<'a, State> +fn wait<'a, 'b, T: Send>(lock: &'a Mutex>, + mut guard: MutexGuard<'b, State>, + f: fn(SignalToken) -> Blocker) + -> MutexGuard<'a, State> { let me: Box = Local::take(); me.deschedule(1, |task| { @@ -170,7 +168,7 @@ impl Packet { } pub fn send(&self, t: T) -> Result<(), T> { - let guard = self.acquire_send_slot(); + let mut guard = self.acquire_send_slot(); if guard.disconnected { return Err(t) } guard.buf.enqueue(t); @@ -183,7 +181,7 @@ impl Packet { let mut canceled = false; assert!(guard.canceled.is_none()); guard.canceled = Some(unsafe { mem::transmute(&mut canceled) }); - let guard = wait(&self.lock, guard, BlockedSender); + let mut guard = wait(&self.lock, guard, BlockedSender); if canceled {Err(guard.buf.dequeue())} else {Ok(())} } @@ -198,7 +196,7 @@ impl Packet { } pub fn try_send(&self, t: T) -> Result<(), super::TrySendError> { - let guard = self.lock.lock(); + let mut guard = self.lock.lock(); if guard.disconnected { Err(super::RecvDisconnected(t)) } else if guard.buf.size() == guard.buf.cap() { @@ -235,13 +233,13 @@ impl Packet { // When reading this, remember that there can only ever be one receiver at // time. pub fn recv(&self) -> Result { - let guard = self.lock.lock(); + let mut guard = self.lock.lock(); // Wait for the buffer to have something in it. No need for a while loop // because we're the only receiver. let mut waited = false; if !guard.disconnected && guard.buf.size() == 0 { - wait(&mut guard.blocker, BlockedReceiver, &self.lock); + guard = wait(&self.lock, guard, BlockedReceiver); waited = true; } if guard.disconnected && guard.buf.size() == 0 { return Err(()) } @@ -249,12 +247,12 @@ impl Packet { // Pick up the data, wake up our neighbors, and carry on assert!(guard.buf.size() > 0); let ret = guard.buf.dequeue(); - self.wakeup_senders(waited, guard, state); + self.wakeup_senders(waited, guard); return Ok(ret); } pub fn try_recv(&self) -> Result { - let guard = self.lock(); + let mut guard = self.lock.lock(); // Easy cases first if guard.disconnected { return Err(Disconnected) } @@ -262,7 +260,7 @@ impl Packet { // Be sure to wake up neighbors let ret = Ok(guard.buf.dequeue()); - self.wakeup_senders(false, guard, state); + self.wakeup_senders(false, guard); return ret; } @@ -272,7 +270,7 @@ impl Packet { // * `waited` - flag if the receiver blocked to receive some data, or if it // just picked up some data on the way out // * `guard` - the lock guard that is held over this channel's lock - fn wakeup_senders(&self, waited: bool, guard: MutexGuard>) { + fn wakeup_senders(&self, waited: bool, mut guard: MutexGuard>) { let pending_sender1: Option = guard.queue.dequeue(); // If this is a no-buffer channel (cap == 0), then if we didn't wait we @@ -311,7 +309,7 @@ impl Packet { } // Not much to do other than wake up a receiver if one's there - let guard = self.lock(); + let mut guard = self.lock.lock(); if guard.disconnected { return } guard.disconnected = true; match mem::replace(&mut guard.blocker, NoneBlocked) { @@ -322,7 +320,7 @@ impl Packet { } pub fn drop_port(&self) { - let guard = self.lock(); + let mut guard = self.lock.lock(); if guard.disconnected { return } guard.disconnected = true; @@ -368,14 +366,14 @@ impl Packet { // If Ok, the value is whether this port has data, if Err, then the upgraded // port needs to be checked instead of this one. pub fn can_recv(&self) -> bool { - let guard = self.lock(); + let guard = self.lock.lock(); guard.disconnected || guard.buf.size() > 0 } // Attempts to start selection on this port. This can either succeed or fail // because there is data waiting. pub fn start_selection(&self, token: SignalToken) -> StartResult { - let guard = self.lock(); + let mut guard = self.lock.lock(); if guard.disconnected || guard.buf.size() > 0 { Abort } else { @@ -393,7 +391,7 @@ impl Packet { // // The return value indicates whether there's data on this port. pub fn abort_selection(&self) -> bool { - let guard = self.lock(); + let mut guard = self.lock.lock(); match mem::replace(&mut guard.blocker, NoneBlocked) { NoneBlocked => true, BlockedSender(token) => { @@ -409,7 +407,7 @@ impl Packet { impl Drop for Packet { fn drop(&mut self) { assert_eq!(self.channels.load(atomic::SeqCst), 0); - let guard = self.lock(); + let mut guard = self.lock.lock(); assert!(guard.queue.dequeue().is_none()); assert!(guard.canceled.is_none()); } diff --git a/src/libstd/failure.rs b/src/libstd/failure.rs index 5438f1920d6..3934b0cf45f 100644 --- a/src/libstd/failure.rs +++ b/src/libstd/failure.rs @@ -21,10 +21,10 @@ use option::Option::{Some, None}; use result::Result::Ok; use rt::backtrace; use rt::util::{Stderr, Stdio}; -use rt::local::Local; -use rt::task::Task; use str::Str; use string::String; +use thread::Thread; +use sys_common::thread_info; // Defined in this module instead of io::stdio so that the unwinding thread_local! { @@ -52,63 +52,35 @@ pub fn on_fail(obj: &(Any+Send), file: &'static str, line: uint) { } }; let mut err = Stderr; - - // It is assumed that all reasonable rust code will have a local task at - // all times. This means that this `exists` will return true almost all of - // the time. There are border cases, however, when the runtime has - // *almost* set up the local task, but hasn't quite gotten there yet. In - // order to get some better diagnostics, we print on panic and - // immediately abort the whole process if there is no local task - // available. - if !Local::exists(None::) { - let _ = writeln!(&mut err, "panicked at '{}', {}:{}", msg, file, line); - if backtrace::log_enabled() { - let _ = backtrace::write(&mut err); - } else { - let _ = writeln!(&mut err, "run with `RUST_BACKTRACE=1` to \ - see a backtrace"); - } - return - } - - // Peel the name out of local task so we can print it. We've got to be sure - // that the local task is in TLS while we're printing as I/O may occur. - let (name, unwinding) = { - let mut t = Local::borrow(None::); - (t.name.take(), t.unwinder.unwinding()) - }; - { - let n = name.as_ref().map(|n| n.as_slice()).unwrap_or(""); - - let prev = LOCAL_STDERR.with(|s| s.borrow_mut().take()); - match prev { - Some(mut stderr) => { - // FIXME: what to do when the task printing panics? - let _ = writeln!(stderr, - "task '{}' panicked at '{}', {}:{}\n", - n, msg, file, line); - if backtrace::log_enabled() { - let _ = backtrace::write(&mut *stderr); - } - let mut s = Some(stderr); - LOCAL_STDERR.with(|slot| { - *slot.borrow_mut() = s.take(); - }); + let thread = Thread::current(); + let name = thread.name().unwrap_or(""); + let prev = LOCAL_STDERR.with(|s| s.borrow_mut().take()); + match prev { + Some(mut stderr) => { + // FIXME: what to do when the thread printing panics? + let _ = writeln!(stderr, + "thread '{}' panicked at '{}', {}:{}\n", + name, msg, file, line); + if backtrace::log_enabled() { + let _ = backtrace::write(&mut *stderr); } - None => { - let _ = writeln!(&mut err, "task '{}' panicked at '{}', {}:{}", - n, msg, file, line); - if backtrace::log_enabled() { - let _ = backtrace::write(&mut err); - } + let mut s = Some(stderr); + LOCAL_STDERR.with(|slot| { + *slot.borrow_mut() = s.take(); + }); + } + None => { + let _ = writeln!(&mut err, "thread '{}' panicked at '{}', {}:{}", + name, msg, file, line); + if backtrace::log_enabled() { + let _ = backtrace::write(&mut err); } } - - // If this is a double panic, make sure that we printed a backtrace - // for this panic. - if unwinding && !backtrace::log_enabled() { - let _ = backtrace::write(&mut err); - } } - Local::borrow(None::).name = name; + + // If this is a double panic, make sure that we printed a backtrace + // for this panic. + if thread_info::panicking() && !backtrace::log_enabled() { + let _ = backtrace::write(&mut err); + } } diff --git a/src/libstd/io/comm_adapters.rs b/src/libstd/io/comm_adapters.rs index 2aa7435d871..d2a9861737d 100644 --- a/src/libstd/io/comm_adapters.rs +++ b/src/libstd/io/comm_adapters.rs @@ -156,12 +156,12 @@ mod test { use prelude::*; use super::*; use io; - use task; + use thread::Thread; #[test] fn test_rx_reader() { let (tx, rx) = channel(); - task::spawn(move|| { + Thread::spawn(move|| { tx.send(vec![1u8, 2u8]); tx.send(vec![]); tx.send(vec![3u8, 4u8]); @@ -203,7 +203,7 @@ mod test { #[test] fn test_rx_buffer() { let (tx, rx) = channel(); - task::spawn(move|| { + Thread::spawn(move|| { tx.send(b"he".to_vec()); tx.send(b"llo wo".to_vec()); tx.send(b"".to_vec()); @@ -229,7 +229,11 @@ mod test { writer.write_be_u32(42).unwrap(); let wanted = vec![0u8, 0u8, 0u8, 42u8]; +<<<<<<< HEAD let got = match task::try(move|| { rx.recv() }) { +======= + let got = match Thread::with_join(proc() { rx.recv() }).join() { +>>>>>>> Fallout from new thread API Ok(got) => got, Err(_) => panic!(), }; diff --git a/src/libstd/io/net/pipe.rs b/src/libstd/io/net/pipe.rs index 9f2f41c0021..01eb33b44f9 100644 --- a/src/libstd/io/net/pipe.rs +++ b/src/libstd/io/net/pipe.rs @@ -549,7 +549,7 @@ mod tests { Err(ref e) if e.kind == TimedOut => {} Err(e) => panic!("error: {}", e), } - ::task::deschedule(); + ::thread::Thread::yield_now(); if i == 1000 { panic!("should have a pending connection") } } drop(l); diff --git a/src/libstd/io/net/tcp.rs b/src/libstd/io/net/tcp.rs index 3c38e23183f..0e9a93e4275 100644 --- a/src/libstd/io/net/tcp.rs +++ b/src/libstd/io/net/tcp.rs @@ -1155,7 +1155,7 @@ mod test { Err(ref e) if e.kind == TimedOut => {} Err(e) => panic!("error: {}", e), } - ::task::deschedule(); + ::thread::Thread::yield_now(); if i == 1000 { panic!("should have a pending connection") } } } @@ -1378,7 +1378,7 @@ mod test { // Try to ensure that the reading clone is indeed reading for _ in range(0i, 50) { - ::task::deschedule(); + ::thread::Thread::yield_now(); } // clone the handle again while it's reading, then let it finish the diff --git a/src/libstd/io/process.rs b/src/libstd/io/process.rs index 9ba6381c8c3..d9acb94714b 100644 --- a/src/libstd/io/process.rs +++ b/src/libstd/io/process.rs @@ -30,6 +30,7 @@ use hash::Hash; use std::hash::sip::SipState; use io::pipe::{PipeStream, PipePair}; use path::BytesContainer; +use thread::Thread; use sys; use sys::fs::FileDesc; @@ -693,10 +694,12 @@ impl Process { fn read(stream: Option) -> Receiver>> { let (tx, rx) = channel(); match stream { - Some(stream) => spawn(move |:| { - let mut stream = stream; - tx.send(stream.read_to_end()) - }), + Some(stream) => { + Thread::spawn(move |:| { + let mut stream = stream; + tx.send(stream.read_to_end()) + }); + } None => tx.send(Ok(Vec::new())) } rx diff --git a/src/libstd/io/stdio.rs b/src/libstd/io/stdio.rs index 7b5cbf7d58f..32ba6278a89 100644 --- a/src/libstd/io/stdio.rs +++ b/src/libstd/io/stdio.rs @@ -41,9 +41,6 @@ use option::Option; use option::Option::{Some, None}; use ops::{Deref, DerefMut, FnOnce}; use result::Result::{Ok, Err}; -use rt; -use rt::local::Local; -use rt::task::Task; use slice::SliceExt; use str::StrPrelude; use string::String; @@ -328,25 +325,17 @@ pub fn set_stderr(stderr: Box) -> Option> { // // io1 aliases io2 // }) // }) -fn with_task_stdout(f: F) where - F: FnOnce(&mut Writer) -> IoResult<()>, -{ - let result = if Local::exists(None::) { - let mut my_stdout = LOCAL_STDOUT.with(|slot| { - slot.borrow_mut().take() - }).unwrap_or_else(|| { - box stdout() as Box - }); - let result = f(&mut *my_stdout); - let mut var = Some(my_stdout); - LOCAL_STDOUT.with(|slot| { - *slot.borrow_mut() = var.take(); - }); - result - } else { - let mut io = rt::util::Stdout; - f(&mut io as &mut Writer) - }; +fn with_task_stdout(f: |&mut Writer| -> IoResult<()>) { + let mut my_stdout = LOCAL_STDOUT.with(|slot| { + slot.borrow_mut().take() + }).unwrap_or_else(|| { + box stdout() as Box + }); + let result = f(&mut *my_stdout); + let mut var = Some(my_stdout); + LOCAL_STDOUT.with(|slot| { + *slot.borrow_mut() = var.take(); + }); match result { Ok(()) => {} Err(e) => panic!("failed printing to stdout: {}", e), diff --git a/src/libstd/lib.rs b/src/libstd/lib.rs index a0939999c7c..7219fef3732 100644 --- a/src/libstd/lib.rs +++ b/src/libstd/lib.rs @@ -229,6 +229,8 @@ pub mod hash; /* Threads and communication */ +pub mod task; +#[allow(missing_docs)] pub mod thread; pub mod sync; pub mod comm; diff --git a/src/libstd/os.rs b/src/libstd/os.rs index 550c8d2faba..0ff41586e72 100644 --- a/src/libstd/os.rs +++ b/src/libstd/os.rs @@ -49,7 +49,7 @@ use ptr::RawPtr; use ptr; use result::Result; use result::Result::{Err, Ok}; -use slice::{AsSlice, SliceExt, PartialEqSliceExt}; +use slice::{AsSlice, SliceExt}; use slice::CloneSliceExt; use str::{Str, StrPrelude, StrAllocating}; use string::{String, ToString}; diff --git a/src/libstd/path/posix.rs b/src/libstd/path/posix.rs index f872aa8e9a4..5ab8eb6b942 100644 --- a/src/libstd/path/posix.rs +++ b/src/libstd/path/posix.rs @@ -514,20 +514,20 @@ mod tests { #[test] fn test_null_byte() { - use task; - let result = task::try(move|| { + use thread::Thread; + let result = Thread::with_join(move|| { Path::new(b"foo/bar\0") - }); + }).join(); assert!(result.is_err()); - let result = task::try(move|| { + let result = Thread::with_join(move|| { Path::new("test").set_filename(b"f\0o") - }); + }).join(); assert!(result.is_err()); - let result = task::try(move|| { + let result = Thread::with_join(move|| { Path::new("test").push(b"f\0o"); - }); + }).join(); assert!(result.is_err()); } diff --git a/src/libstd/path/windows.rs b/src/libstd/path/windows.rs index b376f6d0d5b..171707c10c5 100644 --- a/src/libstd/path/windows.rs +++ b/src/libstd/path/windows.rs @@ -1298,20 +1298,20 @@ mod tests { #[test] fn test_null_byte() { - use task; - let result = task::try(move|| { + use thread::Thread; + let result = Thread::with_join(move|| { Path::new(b"foo/bar\0") - }); + }).join(); assert!(result.is_err()); - let result = task::try(move|| { + let result = Thread::with_join(move|| { Path::new("test").set_filename(b"f\0o") - }); + }).join(); assert!(result.is_err()); - let result = task::try(move|| { + let result = Thread::with_join(move|| { Path::new("test").push(b"f\0o"); - }); + }).join(); assert!(result.is_err()); } diff --git a/src/libstd/rand/os.rs b/src/libstd/rand/os.rs index 6bccef07131..d2ae66d6578 100644 --- a/src/libstd/rand/os.rs +++ b/src/libstd/rand/os.rs @@ -340,7 +340,7 @@ mod test { use super::OsRng; use rand::Rng; - use task; + use thread::Thread; #[test] fn test_os_rng() { @@ -360,25 +360,26 @@ mod test { for _ in range(0u, 20) { let (tx, rx) = channel(); txs.push(tx); - task::spawn(move|| { + + Thread::spawn(move|| { // wait until all the tasks are ready to go. rx.recv(); // deschedule to attempt to interleave things as much // as possible (XXX: is this a good test?) let mut r = OsRng::new().unwrap(); - task::deschedule(); + Thread::yield_now(); let mut v = [0u8, .. 1000]; for _ in range(0u, 100) { r.next_u32(); - task::deschedule(); + Thread::yield_now(); r.next_u64(); - task::deschedule(); + Thread::yield_now(); r.fill_bytes(&mut v); - task::deschedule(); + Thread::yield_now(); } - }) + }); } // start all the tasks diff --git a/src/libstd/rt/at_exit_imp.rs b/src/libstd/rt/at_exit_imp.rs index b8012134c9e..1b97a01146c 100644 --- a/src/libstd/rt/at_exit_imp.rs +++ b/src/libstd/rt/at_exit_imp.rs @@ -14,7 +14,6 @@ use core::prelude::*; -use libc; use boxed::Box; use vec::Vec; use sync::{Mutex, atomic, Once, ONCE_INIT}; @@ -25,31 +24,30 @@ type Queue = Mutex>; static INIT: Once = ONCE_INIT; static QUEUE: atomic::AtomicUint = atomic::INIT_ATOMIC_UINT; -static RUNNING: atomic::AtomicBool = atomic::INIT_ATOMIC_BOOL; fn init() { let state: Box = box Mutex::new(Vec::new()); unsafe { QUEUE.store(mem::transmute(state), atomic::SeqCst); - libc::atexit(run); + + // FIXME: switch this to use atexit as below. Currently this + // segfaults (the queue's memory is mysteriously gone), so + // instead the cleanup is tied to the `std::rt` entry point. + // + // ::libc::atexit(cleanup); } } -// Note: this is private and so can only be called via atexit above, -// which guarantees initialization. -extern fn run() { - let cur = unsafe { - rtassert!(!RUNNING.load(atomic::SeqCst)); +pub fn cleanup() { + unsafe { let queue = QUEUE.swap(0, atomic::SeqCst); - rtassert!(queue != 0); - - let queue: Box = mem::transmute(queue); - let v = mem::replace(&mut *queue.lock(), Vec::new()); - v - }; - - for to_run in cur.into_iter() { - to_run.invoke(()); + if queue != 0 { + let queue: Box = mem::transmute(queue); + let v = mem::replace(&mut *queue.lock(), Vec::new()); + for to_run in v.into_iter() { + to_run.invoke(); + } + } } } @@ -60,7 +58,6 @@ pub fn push(f: Thunk) { // all with respect to `run`, meaning that this could theoretically be a // use-after-free. There's not much we can do to protect against that, // however. Let's just assume a well-behaved runtime and go from there! - rtassert!(!RUNNING.load(atomic::SeqCst)); let queue = QUEUE.load(atomic::SeqCst); rtassert!(queue != 0); (*(queue as *const Queue)).lock().push(f); diff --git a/src/libstd/rt/backtrace.rs b/src/libstd/rt/backtrace.rs index 40885823a05..4a692bccf9e 100644 --- a/src/libstd/rt/backtrace.rs +++ b/src/libstd/rt/backtrace.rs @@ -12,12 +12,8 @@ #![allow(non_camel_case_types)] -use io::{IoResult, Writer}; -use iter::{Iterator, IteratorExt}; use option::Option::{Some, None}; use os; -use result::Result::{Ok, Err}; -use str::{StrPrelude, from_str}; use sync::atomic; pub use sys::backtrace::write; diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs index 44794d2b957..022e73121d7 100644 --- a/src/libstd/rt/mod.rs +++ b/src/libstd/rt/mod.rs @@ -48,14 +48,14 @@ #![allow(dead_code)] -use borrow::IntoCow; use failure; use os; use thunk::Thunk; use kinds::Send; use thread::Thread; +use sys; use sys_common; -use sys_common::thread::{mod, NewThread}; +use sys_common::thread_info::{mod, NewThread}; // Reexport some of our utilities which are expected by other crates. pub use self::util::{default_sched_threads, min_stack, running_on_valgrind}; @@ -87,10 +87,9 @@ pub const DEFAULT_ERROR_CODE: int = 101; /// Initializes global state, including frobbing /// the crate's logging flags, registering GC /// metadata, and storing the process arguments. +// FIXME: this should be unsafe #[allow(experimental)] pub fn init(argc: int, argv: *const *const u8) { - // FIXME: Derefing these pointers is not safe. - // Need to propagate the unsafety to `start`. unsafe { args::init(argc, argv); thread::init(); @@ -122,8 +121,6 @@ fn lang_start(main: *const u8, argc: int, argv: *const *const u8) -> int { pub fn start(argc: int, argv: *const *const u8, main: Thunk) -> int { use prelude::*; use rt; - use rt::task::Task; - use str; let something_around_the_top_of_the_stack = 1; let addr = &something_around_the_top_of_the_stack as *const int; @@ -153,18 +150,19 @@ pub fn start(argc: int, argv: *const *const u8, main: Thunk) -> int { init(argc, argv); let mut exit_code = None; - let thread: std::Thread = NewThread::new(Some("
".into_string())); + let thread: Thread = NewThread::new(Some("
".into_string())); thread_info::set((my_stack_bottom, my_stack_top), unsafe { sys::thread::guard::main() }, thread); - unwind::try(|| { - unsafe { + let mut main_opt = Some(main); // option dance + unsafe { + let _ = unwind::try(|| { sys_common::stack::record_os_managed_stack_bounds(my_stack_bottom, my_stack_top); - } - (main.take().unwrap()).invoke(()); - exit_code = Some(os::get_exit_status()); - }); - unsafe { cleanup(); } + (main_opt.take().unwrap()).invoke(); + exit_code = Some(os::get_exit_status()); + }); + cleanup(); + } // If the exit code wasn't set, then the task block must have panicked. return exit_code.unwrap_or(rt::DEFAULT_ERROR_CODE); } @@ -197,14 +195,6 @@ pub fn at_exit(f: proc():Send) { /// undefined behavior. pub unsafe fn cleanup() { args::cleanup(); - thread::cleanup(); -} - -// FIXME: these probably shouldn't be public... -#[doc(hidden)] -pub mod shouldnt_be_public { - #[cfg(not(test))] - pub use super::local_ptr::native::maybe_tls_key; - #[cfg(all(not(windows), not(target_os = "android"), not(target_os = "ios")))] - pub use super::local_ptr::compiled::RT_TLS_PTR; + sys::stack_overflow::cleanup(); + at_exit_imp::cleanup(); } diff --git a/src/libstd/rt/unwind.rs b/src/libstd/rt/unwind.rs index decf7cfb60a..f9f76e35bd4 100644 --- a/src/libstd/rt/unwind.rs +++ b/src/libstd/rt/unwind.rs @@ -565,7 +565,7 @@ fn begin_unwind_inner(msg: Box, file_line: &(&'static str, uint)) -> // Now that we've run all the necessary unwind callbacks, we actually // perform the unwinding. - if thread_info::unwinding() { + if thread_info::panicking() { // If a thread panics while it's already unwinding then we // have limited options. Currently our preference is to // just abort. In the future we may consider resuming diff --git a/src/libstd/rt/util.rs b/src/libstd/rt/util.rs index d3cfccab9d0..86dbb6066f3 100644 --- a/src/libstd/rt/util.rs +++ b/src/libstd/rt/util.rs @@ -196,8 +196,7 @@ memory and partly incapable of presentation to others.", } pub unsafe fn report_overflow() { - use rt::task::Task; - use rt::local::Local; + use thread::Thread; // See the message below for why this is not emitted to the // ^ Where did the message below go? @@ -206,11 +205,6 @@ pub unsafe fn report_overflow() { // call would happen to initialized it (calling out to libuv), // and the FFI call needs 2MB of stack when we just ran out. - let task: Option<*mut Task> = Local::try_unsafe_borrow(); - - let name = task.and_then(|task| { - (*task).name.as_ref().map(|n| n.as_slice()) - }); - - rterrln!("\ntask '{}' has overflowed its stack", name.unwrap_or("")); + rterrln!("\nthread '{}' has overflowed its stack", + Thread::current().name().unwrap_or("")); } diff --git a/src/libstd/sync/future.rs b/src/libstd/sync/future.rs index e5a1e09967c..16f2cff5998 100644 --- a/src/libstd/sync/future.rs +++ b/src/libstd/sync/future.rs @@ -29,8 +29,8 @@ use core::mem::replace; use self::FutureState::*; use comm::{Receiver, channel}; -use task::spawn; use thunk::{Thunk}; +use thread::Thread; /// A type encapsulating the result of a computation which may not be complete pub struct Future { @@ -139,7 +139,7 @@ impl Future { let (tx, rx) = channel(); - spawn(move |:| { + Thread::spawn(move |:| { // Don't panic if the other end has hung up let _ = tx.send_opt(blk()); }); diff --git a/src/libstd/sync/mutex.rs b/src/libstd/sync/mutex.rs index 33f8d254c71..fc73e2957a5 100644 --- a/src/libstd/sync/mutex.rs +++ b/src/libstd/sync/mutex.rs @@ -274,7 +274,7 @@ impl Drop for StaticMutexGuard { mod test { use prelude::*; - use task; + use thread::Thread; use sync::{Arc, Mutex, StaticMutex, MUTEX_INIT, Condvar}; #[test] @@ -386,10 +386,10 @@ mod test { fn test_mutex_arc_poison() { let arc = Arc::new(Mutex::new(1i)); let arc2 = arc.clone(); - let _ = task::try(move|| { + let _ = Thread::with_join(move|| { let lock = arc2.lock(); assert_eq!(*lock, 2); - }); + }).join(); let lock = arc.lock(); assert_eq!(*lock, 1); } @@ -414,7 +414,7 @@ mod test { fn test_mutex_arc_access_in_unwind() { let arc = Arc::new(Mutex::new(1i)); let arc2 = arc.clone(); - let _ = task::try(move|| -> () { + let _ = Thread::with_join::<()>(move|| -> () { struct Unwinder { i: Arc>, } @@ -425,7 +425,7 @@ mod test { } let _u = Unwinder { i: arc2 }; panic!(); - }); + }).join(); let lock = arc.lock(); assert_eq!(*lock, 2); } diff --git a/src/libstd/sync/once.rs b/src/libstd/sync/once.rs index 263937c5cbe..a43f822e351 100644 --- a/src/libstd/sync/once.rs +++ b/src/libstd/sync/once.rs @@ -121,7 +121,7 @@ impl Once { mod test { use prelude::*; - use task; + use thread::Thread; use super::{ONCE_INIT, Once}; #[test] @@ -143,7 +143,7 @@ mod test { for _ in range(0u, 10) { let tx = tx.clone(); spawn(move|| { - for _ in range(0u, 4) { task::deschedule() } + for _ in range(0u, 4) { Thread::yield_now() } unsafe { O.doit(|| { assert!(!run); diff --git a/src/libstd/sync/poison.rs b/src/libstd/sync/poison.rs index ee151556620..ad08e9873fa 100644 --- a/src/libstd/sync/poison.rs +++ b/src/libstd/sync/poison.rs @@ -8,21 +8,19 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use option::Option::None; -use rustrt::task::Task; -use rustrt::local::Local; +use thread::Thread; pub struct Flag { pub failed: bool } impl Flag { pub fn borrow(&mut self) -> Guard { - Guard { flag: &mut self.failed, failing: failing() } + Guard { flag: &mut self.failed, panicking: Thread::panicking() } } } pub struct Guard<'a> { flag: &'a mut bool, - failing: bool, + panicking: bool, } impl<'a> Guard<'a> { @@ -33,16 +31,8 @@ impl<'a> Guard<'a> { } pub fn done(&mut self) { - if !self.failing && failing() { + if !self.panicking && Thread::panicking() { *self.flag = true; } } } - -fn failing() -> bool { - if Local::exists(None::) { - Local::borrow(None::).unwinder.unwinding() - } else { - false - } -} diff --git a/src/libstd/sync/rwlock.rs b/src/libstd/sync/rwlock.rs index b6d6aa989c5..1f1e9eea1d6 100644 --- a/src/libstd/sync/rwlock.rs +++ b/src/libstd/sync/rwlock.rs @@ -356,7 +356,7 @@ mod tests { use prelude::*; use rand::{mod, Rng}; - use task; + use thread::Thread; use sync::{Arc, RWLock, StaticRWLock, RWLOCK_INIT}; #[test] @@ -409,10 +409,10 @@ mod tests { fn test_rw_arc_poison_wr() { let arc = Arc::new(RWLock::new(1i)); let arc2 = arc.clone(); - let _ = task::try(move|| { + let _ = Thread::with_join(move|| { let lock = arc2.write(); assert_eq!(*lock, 2); - }); + }).join(); let lock = arc.read(); assert_eq!(*lock, 1); } @@ -422,10 +422,10 @@ mod tests { fn test_rw_arc_poison_ww() { let arc = Arc::new(RWLock::new(1i)); let arc2 = arc.clone(); - let _ = task::try(move|| { + let _ = Thread::with_join(move|| { let lock = arc2.write(); assert_eq!(*lock, 2); - }); + }).join(); let lock = arc.write(); assert_eq!(*lock, 1); } @@ -434,10 +434,10 @@ mod tests { fn test_rw_arc_no_poison_rr() { let arc = Arc::new(RWLock::new(1i)); let arc2 = arc.clone(); - let _ = task::try(move|| { + let _ = Thread::with_join(move|| { let lock = arc2.read(); assert_eq!(*lock, 2); - }); + }).join(); let lock = arc.read(); assert_eq!(*lock, 1); } @@ -445,10 +445,10 @@ mod tests { fn test_rw_arc_no_poison_rw() { let arc = Arc::new(RWLock::new(1i)); let arc2 = arc.clone(); - let _ = task::try(move|| { + let _ = Thread::with_join(move|| { let lock = arc2.read(); assert_eq!(*lock, 2); - }); + }).join(); let lock = arc.write(); assert_eq!(*lock, 1); } @@ -459,12 +459,12 @@ mod tests { let arc2 = arc.clone(); let (tx, rx) = channel(); - task::spawn(move|| { + Thread::spawn(move|| { let mut lock = arc2.write(); for _ in range(0u, 10) { let tmp = *lock; *lock = -1; - task::deschedule(); + Thread::yield_now(); *lock = tmp + 1; } tx.send(()); @@ -474,15 +474,15 @@ mod tests { let mut children = Vec::new(); for _ in range(0u, 5) { let arc3 = arc.clone(); - children.push(task::try_future(move|| { + children.push(Thread::with_join(move|| { let lock = arc3.read(); assert!(*lock >= 0); })); } // Wait for children to pass their asserts - for r in children.iter_mut() { - assert!(r.get_ref().is_ok()); + for r in children.into_iter() { + assert!(r.join().is_ok()); } // Wait for writer to finish @@ -495,7 +495,11 @@ mod tests { fn test_rw_arc_access_in_unwind() { let arc = Arc::new(RWLock::new(1i)); let arc2 = arc.clone(); +<<<<<<< HEAD let _ = task::try(move|| -> () { +======= + let _ = Thread::with_join::<()>(proc() { +>>>>>>> Fallout from new thread API struct Unwinder { i: Arc>, } @@ -507,7 +511,7 @@ mod tests { } let _u = Unwinder { i: arc2 }; panic!(); - }); + }).join(); let lock = arc.read(); assert_eq!(*lock, 2); } diff --git a/src/libstd/sync/task_pool.rs b/src/libstd/sync/task_pool.rs index fa5b62a202b..5e7944d5fe5 100644 --- a/src/libstd/sync/task_pool.rs +++ b/src/libstd/sync/task_pool.rs @@ -12,7 +12,7 @@ use core::prelude::*; -use task::{spawn}; +use thread::Thread; use comm::{channel, Sender, Receiver}; use sync::{Arc, Mutex}; use thunk::Thunk; @@ -105,7 +105,7 @@ impl TaskPool { } fn spawn_in_pool(jobs: Arc>>) { - spawn(move |:| { + Thread::spawn(move |:| { // Will spawn a new task on panic unless it is cancelled. let sentinel = Sentinel::new(&jobs); @@ -126,7 +126,7 @@ fn spawn_in_pool(jobs: Arc>>) { } sentinel.cancel(); - }) + }); } #[cfg(test)] @@ -206,4 +206,3 @@ mod test { waiter.wait(); } } - diff --git a/src/libstd/sys/common/backtrace.rs b/src/libstd/sys/common/backtrace.rs index 0c03060b314..38dc516bf3d 100644 --- a/src/libstd/sys/common/backtrace.rs +++ b/src/libstd/sys/common/backtrace.rs @@ -9,7 +9,7 @@ // except according to those terms. use io::{IoResult, Writer}; -use iter::Iterator; +use iter::{Iterator, IteratorExt}; use option::{Some, None}; use result::{Ok, Err}; use str::{StrPrelude, from_str}; diff --git a/src/libstd/sys/common/helper_thread.rs b/src/libstd/sys/common/helper_thread.rs index ef0181b72b0..8aa09d9bd30 100644 --- a/src/libstd/sys/common/helper_thread.rs +++ b/src/libstd/sys/common/helper_thread.rs @@ -28,7 +28,7 @@ use sync::{StaticMutex, StaticCondvar}; use rt; use sys::helper_signal; -use task; +use thread::Thread; /// A structure for management of a helper thread. /// @@ -82,7 +82,11 @@ impl Helper { *self.signal.get() = send as uint; let t = f(); +<<<<<<< HEAD task::spawn(move |:| { +======= + Thread::spawn(proc() { +>>>>>>> Fallout from new thread API helper(receive, rx, t); let _g = self.lock.lock(); *self.shutdown.get() = true; diff --git a/src/libstd/sys/common/mod.rs b/src/libstd/sys/common/mod.rs index aeee4cf01cd..dc0ad08cdbe 100644 --- a/src/libstd/sys/common/mod.rs +++ b/src/libstd/sys/common/mod.rs @@ -27,6 +27,7 @@ pub mod net; pub mod rwlock; pub mod stack; pub mod thread; +pub mod thread_info; pub mod thread_local; // common error constructors diff --git a/src/libstd/sys/common/thread_info.rs b/src/libstd/sys/common/thread_info.rs index f88fd49ab99..0612448cfa0 100644 --- a/src/libstd/sys/common/thread_info.rs +++ b/src/libstd/sys/common/thread_info.rs @@ -33,11 +33,11 @@ impl ThreadInfo { *c.borrow_mut() = Some(ThreadInfo { stack_bounds: (0, 0), stack_guard: 0, - unwinder: false, - thread: Thread::new(None), + unwinding: false, + thread: NewThread::new(None), }) } - f(c.borrow_mut().as_ref().unwrap()) + f(c.borrow_mut().as_mut().unwrap()) }) } } @@ -47,28 +47,25 @@ pub fn current_thread() -> Thread { } pub fn panicking() -> bool { - ThreadInfo::with(|info| info.unwinder.unwinding()) + ThreadInfo::with(|info| info.unwinding) } pub fn stack_guard() -> uint { ThreadInfo::with(|info| info.stack_guard) } -pub fn unwinding() -> bool { - ThreadInfo::with(|info| info.unwinder.unwinding) -} - pub fn set_unwinding(unwinding: bool) { ThreadInfo::with(|info| info.unwinding = unwinding) } pub fn set(stack_bounds: (uint, uint), stack_guard: uint, thread: Thread) { THREAD_INFO.with(|c| assert!(c.borrow().is_none())); + let mut thread_opt = Some(thread); // option dance THREAD_INFO.with(|c| *c.borrow_mut() = Some(ThreadInfo{ stack_bounds: stack_bounds, stack_guard: stack_guard, unwinding: false, - thread: thread, + thread: thread_opt.take().unwrap(), })); } diff --git a/src/libstd/sys/common/thread_local.rs b/src/libstd/sys/common/thread_local.rs index c3bf5cfc301..fe7a7d8d037 100644 --- a/src/libstd/sys/common/thread_local.rs +++ b/src/libstd/sys/common/thread_local.rs @@ -58,7 +58,6 @@ use prelude::*; -use rt; use sync::atomic::{mod, AtomicUint}; use sync::{Mutex, Once, ONCE_INIT}; diff --git a/src/libstd/sys/unix/backtrace.rs b/src/libstd/sys/unix/backtrace.rs index 2b36ece0e4b..5256e2cc809 100644 --- a/src/libstd/sys/unix/backtrace.rs +++ b/src/libstd/sys/unix/backtrace.rs @@ -141,7 +141,7 @@ pub fn write(w: &mut Writer) -> IoResult<()> { struct Context<'a> { idx: int, - writer: &'a mut Writer+'a, + writer: &'a mut (Writer+'a), last_error: Option, } diff --git a/src/libstd/sys/unix/stack_overflow.rs b/src/libstd/sys/unix/stack_overflow.rs index 73b98f762b4..340f9514241 100644 --- a/src/libstd/sys/unix/stack_overflow.rs +++ b/src/libstd/sys/unix/stack_overflow.rs @@ -45,8 +45,6 @@ mod imp { use self::signal::{siginfo, sigaction, SIGBUS, SIG_DFL, SA_SIGINFO, SA_ONSTACK, sigaltstack, SIGSTKSZ}; - use rt::local::Local; - use rt::task::Task; use libc; use libc::funcs::posix88::mman::{mmap, munmap}; use libc::consts::os::posix88::{SIGSEGV, @@ -56,20 +54,12 @@ mod imp { MAP_ANON, MAP_FAILED}; + use sys_common::thread_info; + // This is initialized in init() and only read from after static mut PAGE_SIZE: uint = 0; - // get_task_info is called from an exception / signal handler. - // It returns the guard page of the current task or 0 if that - // guard page doesn't exist. None is returned if there's currently - // no local task. - unsafe fn get_task_guard_page() -> Option { - let task: Option<*mut Task> = Local::try_unsafe_borrow(); - task.map(|task| (&*task).stack_guard().unwrap_or(0)) - } - - #[no_stack_check] unsafe extern fn signal_handler(signum: libc::c_int, info: *mut siginfo, @@ -89,20 +79,16 @@ mod imp { // We're calling into functions with stack checks stack::record_sp_limit(0); - match get_task_guard_page() { - Some(guard) => { - let addr = (*info).si_addr as uint; + let guard = thread_info::stack_guard(); + let addr = (*info).si_addr as uint; - if guard == 0 || addr < guard - PAGE_SIZE || addr >= guard { - term(signum); - } - - report_overflow(); - - intrinsics::abort() - } - None => term(signum) + if guard == 0 || addr < guard - PAGE_SIZE || addr >= guard { + term(signum); } + + report_overflow(); + + intrinsics::abort() } static mut MAIN_ALTSTACK: *mut libc::c_void = 0 as *mut libc::c_void; diff --git a/src/libstd/sys/windows/stack_overflow.rs b/src/libstd/sys/windows/stack_overflow.rs index e3d96a054f4..63b5b6f5863 100644 --- a/src/libstd/sys/windows/stack_overflow.rs +++ b/src/libstd/sys/windows/stack_overflow.rs @@ -8,15 +8,13 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use rt::local::Local; -use rt::task::Task; use rt::util::report_overflow; use core::prelude::*; use ptr; use mem; use libc; use libc::types::os::arch::extra::{LPVOID, DWORD, LONG, BOOL}; -use sys_common::stack; +use sys_common::{stack, thread_info}; pub struct Handler { _data: *mut libc::c_void @@ -37,8 +35,7 @@ impl Drop for Handler { // guard page doesn't exist. None is returned if there's currently // no local task. unsafe fn get_task_guard_page() -> Option { - let task: Option<*mut Task> = Local::try_unsafe_borrow(); - task.map(|task| (&*task).stack_guard().unwrap_or(0)) + thread_info::stack_guard() } // This is initialized in init() and only read from after diff --git a/src/libstd/task.rs b/src/libstd/task.rs index 6881a1adb25..13395fc9c76 100644 --- a/src/libstd/task.rs +++ b/src/libstd/task.rs @@ -12,8 +12,11 @@ #![deprecated = "use std::thread instead"] +use any::Any; +use boxed::Box; use thread; use kinds::Send; +use result::Result; /// Deprecate: use `std::thread::Cfg` instead. #[deprecated = "use std::thread::Cfg instead"] @@ -24,3 +27,15 @@ pub type TaskBuilder = thread::Cfg; pub fn spawn(f: proc(): Send) { thread::Thread::spawn(f); } + +/// Deprecated: use `std::thread::Thread::with_join instead`. +#[deprecated = "use std::thread::Thread::with_join instead"] +pub fn try(f: proc(): Send -> T) -> Result> { + thread::Thread::with_join(f).join() +} + +/// Deprecated: use `std::thread::Thread::yield_now instead`. +#[deprecated = "use std::thread::Thread::yield_now instead"] +pub fn deschedule() { + thread::Thread::yield_now() +} diff --git a/src/libstd/thread.rs b/src/libstd/thread.rs index a6e114bc2c3..f179219884f 100644 --- a/src/libstd/thread.rs +++ b/src/libstd/thread.rs @@ -231,7 +231,7 @@ impl Cfg { } thread_info::set( (my_stack_bottom, my_stack_top), - thread::current_guard_page(), + unsafe { imp::guard::current() }, their_thread ); @@ -261,7 +261,7 @@ impl Cfg { } } }; - (unsafe { imp::create(stack, box main) }, my_thread) + (unsafe { imp::create(stack_size, box main) }, my_thread) } /// Spawn a detached thread, and return a handle to it. @@ -278,19 +278,20 @@ impl Cfg { // We need the address of the packet to fill in to be stable so when // `main` fills it in it's still valid, so allocate an extra box to do // so. - let my_packet = box Err(box 0); // sentinel value + let any: Box = box 0u8; // sentinel value + let my_packet = box Err(any); let their_packet: *mut Result = unsafe { *mem::transmute::<&Box>, *const *mut Result>(&my_packet) }; let (native, thread) = self.core_spawn(f, proc(result) { - *their_packet = result; + unsafe { *their_packet = result; } }); JoinGuard { native: native, joined: false, - packet: my_packet, + packet: Some(my_packet), thread: thread, } } @@ -336,7 +337,7 @@ impl Thread { /// Gets a handle to the thread that invokes it. pub fn current() -> Thread { - ThreadInfo::current_thread() + thread_info::current_thread() } /// Cooperatively give up a timeslice to the OS scheduler. @@ -346,7 +347,7 @@ impl Thread { /// Determines whether the current thread is panicking. pub fn panicking() -> bool { - ThreadInfo::panicking() + thread_info::panicking() } // http://cr.openjdk.java.net/~stefank/6989984.1/raw_files/new/src/os/linux/vm/os_linux.cpp @@ -355,9 +356,9 @@ impl Thread { /// See the module doc for more detail. pub fn park() { let thread = Thread::current(); - let guard = thread.inner.lock.lock(); + let mut guard = thread.inner.lock.lock(); while !*guard { - thread.inner.cvar.wait(guard); + thread.inner.cvar.wait(&guard); } *guard = false; } @@ -366,7 +367,7 @@ impl Thread { /// /// See the module doc for more detail. pub fn unpark(&self) { - let guard = self.inner.lock(); + let mut guard = self.inner.lock.lock(); if !*guard { *guard = true; self.inner.cvar.notify_one(); @@ -375,7 +376,7 @@ impl Thread { /// Get the thread's name. pub fn name(&self) -> Option<&str> { - self.inner.name.as_ref() + self.inner.name.as_ref().map(|s| s.as_slice()) } } @@ -387,7 +388,7 @@ impl thread_info::NewThread for Thread { /// Indicates the manner in which a thread exited. /// /// A thread that completes without panicking is considered to exit successfully. -pub type Result = result::Result>; +pub type Result = ::result::Result>; #[must_use] /// An RAII guard that will block until thread termination when dropped. @@ -395,7 +396,7 @@ pub struct JoinGuard { native: imp::rust_thread, thread: Thread, joined: bool, - packet: Box>, + packet: Option>>, } impl JoinGuard { diff --git a/src/libstd/thread_local/mod.rs b/src/libstd/thread_local/mod.rs index b78428d69de..e2cedae1be6 100644 --- a/src/libstd/thread_local/mod.rs +++ b/src/libstd/thread_local/mod.rs @@ -446,7 +446,7 @@ mod tests { use prelude::*; use cell::UnsafeCell; - use rt::thread::Thread; + use thread::Thread; struct Foo(Sender<()>); @@ -534,7 +534,7 @@ mod tests { } } - Thread::start(move|| { + Thread::with_join(move|| { drop(S1); }).join(); } @@ -552,7 +552,7 @@ mod tests { } } - Thread::start(move|| unsafe { + Thread::with_join(move|| unsafe { K1.with(|s| *s.get() = Some(S1)); }).join(); } diff --git a/src/libtest/lib.rs b/src/libtest/lib.rs index d786409e895..044a4a173c6 100644 --- a/src/libtest/lib.rs +++ b/src/libtest/lib.rs @@ -69,7 +69,7 @@ use std::num::{Float, FloatMath, Int}; use std::os; use std::str::FromStr; use std::string::String; -use std::task::TaskBuilder; +use std::thread::{mod, Thread}; use std::time::Duration; use std::thunk::{Thunk, Invoke}; @@ -1121,28 +1121,27 @@ pub fn run_test(opts: &TestOpts, monitor_ch: Sender, nocapture: bool, testfn: Thunk) { - spawn(move || { + Thread::spawn(move || { let (tx, rx) = channel(); let mut reader = ChanReader::new(rx); let stdout = ChanWriter::new(tx.clone()); let stderr = ChanWriter::new(tx); - let mut task = TaskBuilder::new().named(match desc.name { + let mut cfg = thread::cfg().name(match desc.name { DynTestName(ref name) => name.clone().to_string(), StaticTestName(name) => name.to_string(), }); if nocapture { drop((stdout, stderr)); } else { - task = task.stdout(box stdout as Box); - task = task.stderr(box stderr as Box); + cfg = cfg.stdout(box stdout as Box); + cfg = cfg.stderr(box stderr as Box); } - let result_future = task.try_future(move || testfn.invoke(())); + let result_guard = cfg.with_join(testfn); let stdout = reader.read_to_end().unwrap().into_iter().collect(); - let task_result = result_future.into_inner(); - let test_result = calc_result(&desc, task_result); + let test_result = calc_result(&desc, result_guard.join()); monitor_ch.send((desc.clone(), test_result, stdout)); - }) + }); } match testfn { diff --git a/src/test/run-pass/cleanup-rvalue-temp-during-incomplete-alloc.rs b/src/test/run-pass/cleanup-rvalue-temp-during-incomplete-alloc.rs index 771b34159f2..b5acb55d4a1 100644 --- a/src/test/run-pass/cleanup-rvalue-temp-during-incomplete-alloc.rs +++ b/src/test/run-pass/cleanup-rvalue-temp-during-incomplete-alloc.rs @@ -24,7 +24,7 @@ // It's unclear how likely such a bug is to recur, but it seems like a // scenario worth testing. -use std::task; +use std::thread::Thread; enum Conzabble { Bickwick(Foo) @@ -45,5 +45,5 @@ pub fn fails() { } pub fn main() { - task::try(fails); + Thread::with_join(fails).join(); } diff --git a/src/test/run-pass/task-stderr.rs b/src/test/run-pass/task-stderr.rs index 72cf7599f89..8b54de6b805 100644 --- a/src/test/run-pass/task-stderr.rs +++ b/src/test/run-pass/task-stderr.rs @@ -9,16 +9,16 @@ // except according to those terms. use std::io::{ChanReader, ChanWriter}; -use std::task::TaskBuilder; +use std::thread; fn main() { let (tx, rx) = channel(); let mut reader = ChanReader::new(rx); let stderr = ChanWriter::new(tx); - let res = TaskBuilder::new().stderr(box stderr as Box).try(move|| -> () { + let res = thread::cfg().stderr(box stderr as Box).with_join(move|| -> () { panic!("Hello, world!") - }); + }).join(); assert!(res.is_err()); let output = reader.read_to_string().unwrap();