mirror of
https://github.com/rust-lang/rust.git
synced 2024-12-03 04:04:06 +00:00
auto merge of #5909 : brson/rust/rt4, r=graydon
This is just a bunch of minor changes and simplifications to the structure of core::rt. It makes ownership of the ~Scheduler more strict (though it is still mutably aliased sometimes), turns the scheduler cleanup_jobs vector into just a single job, shunts the thread-local scheduler code off to its own file.
This commit is contained in:
commit
16652c9c17
@ -32,6 +32,7 @@ macro_rules! rtdebug (
|
||||
($( $arg:expr),+) => ( $(let _ = $arg)*; )
|
||||
)
|
||||
|
||||
#[path = "sched/mod.rs"]
|
||||
mod sched;
|
||||
mod rtio;
|
||||
pub mod uvll;
|
||||
|
100
src/libcore/rt/sched/local.rs
Normal file
100
src/libcore/rt/sched/local.rs
Normal file
@ -0,0 +1,100 @@
|
||||
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
|
||||
// file at the top-level directory of this distribution and at
|
||||
// http://rust-lang.org/COPYRIGHT.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
|
||||
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
|
||||
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
|
||||
// option. This file may not be copied, modified, or distributed
|
||||
// except according to those terms.
|
||||
|
||||
//! Access to the thread-local Scheduler
|
||||
|
||||
use ptr::mut_null;
|
||||
use libc::c_void;
|
||||
use cast::transmute;
|
||||
|
||||
use super::Scheduler;
|
||||
use tls = super::super::thread_local_storage;
|
||||
#[cfg(test)] use super::super::uvio::UvEventLoop;
|
||||
|
||||
/// Give the Scheduler to thread-local storage
|
||||
pub fn put(sched: ~Scheduler) {
|
||||
unsafe {
|
||||
let key = tls_key();
|
||||
let void_sched: *mut c_void = transmute::<~Scheduler, *mut c_void>(sched);
|
||||
tls::set(key, void_sched);
|
||||
}
|
||||
}
|
||||
|
||||
/// Take ownership of the Scheduler from thread-local storage
|
||||
pub fn take() -> ~Scheduler {
|
||||
unsafe {
|
||||
let key = tls_key();
|
||||
let void_sched: *mut c_void = tls::get(key);
|
||||
assert!(void_sched.is_not_null());
|
||||
let sched = transmute::<*mut c_void, ~Scheduler>(void_sched);
|
||||
tls::set(key, mut_null());
|
||||
return sched;
|
||||
}
|
||||
}
|
||||
|
||||
/// Borrow a mutable reference to the thread-local Scheduler
|
||||
/// # Safety Note
|
||||
/// Because this leaves the Scheduler in thread-local storage it is possible
|
||||
/// For the Scheduler pointer to be aliased
|
||||
pub unsafe fn borrow() -> &mut Scheduler {
|
||||
unsafe {
|
||||
let key = tls_key();
|
||||
let mut void_sched: *mut c_void = tls::get(key);
|
||||
assert!(void_sched.is_not_null());
|
||||
{
|
||||
let void_sched_ptr = &mut void_sched;
|
||||
let sched: &mut ~Scheduler = {
|
||||
transmute::<&mut *mut c_void, &mut ~Scheduler>(void_sched_ptr)
|
||||
};
|
||||
let sched: &mut Scheduler = &mut **sched;
|
||||
return sched;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn tls_key() -> tls::Key {
|
||||
unsafe {
|
||||
let key: *mut c_void = rust_get_sched_tls_key();
|
||||
let key: &mut tls::Key = transmute(key);
|
||||
return *key;
|
||||
}
|
||||
}
|
||||
|
||||
extern {
|
||||
fn rust_get_sched_tls_key() -> *mut c_void;
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn thread_local_scheduler_smoke_test() {
|
||||
let scheduler = ~UvEventLoop::new_scheduler();
|
||||
put(scheduler);
|
||||
let _scheduler = take();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn thread_local_scheduler_two_instances() {
|
||||
let scheduler = ~UvEventLoop::new_scheduler();
|
||||
put(scheduler);
|
||||
let _scheduler = take();
|
||||
let scheduler = ~UvEventLoop::new_scheduler();
|
||||
put(scheduler);
|
||||
let _scheduler = take();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn borrow_smoke_test() {
|
||||
let scheduler = ~UvEventLoop::new_scheduler();
|
||||
put(scheduler);
|
||||
unsafe {
|
||||
let _scheduler = borrow();
|
||||
}
|
||||
let _scheduler = take();
|
||||
}
|
||||
|
@ -11,18 +11,18 @@
|
||||
use option::*;
|
||||
use sys;
|
||||
use cast::transmute;
|
||||
use libc::c_void;
|
||||
use ptr::mut_null;
|
||||
|
||||
use super::work_queue::WorkQueue;
|
||||
use super::stack::{StackPool, StackSegment};
|
||||
use super::rtio::{EventLoop, EventLoopObject};
|
||||
use super::rtio::{EventLoop, EventLoopObject, IoFactoryObject};
|
||||
use super::context::Context;
|
||||
use tls = super::thread_local_storage;
|
||||
|
||||
#[cfg(test)] use super::uvio::UvEventLoop;
|
||||
#[cfg(test)] use unstable::run_in_bare_thread;
|
||||
#[cfg(test)] use int;
|
||||
#[cfg(test)] use cell::Cell;
|
||||
|
||||
mod local;
|
||||
|
||||
/// The Scheduler is responsible for coordinating execution of Tasks
|
||||
/// on a single thread. When the scheduler is running it is owned by
|
||||
@ -38,31 +38,25 @@ pub struct Scheduler {
|
||||
priv saved_context: Context,
|
||||
/// The currently executing task
|
||||
priv current_task: Option<~Task>,
|
||||
/// A queue of jobs to perform immediately upon return from task
|
||||
/// context to scheduler context.
|
||||
/// XXX: This probably should be a single cleanup action and it
|
||||
/// should run after a context switch, not on return from the
|
||||
/// scheduler
|
||||
priv cleanup_jobs: ~[CleanupJob]
|
||||
/// An action performed after a context switch on behalf of the
|
||||
/// code running before the context switch
|
||||
priv cleanup_job: Option<CleanupJob>
|
||||
}
|
||||
|
||||
// XXX: Some hacks to put a &fn in Scheduler without borrowck
|
||||
// complaining
|
||||
type UnsafeTaskReceiver = sys::Closure;
|
||||
trait HackAroundBorrowCk {
|
||||
fn from_fn(&fn(&mut Scheduler, ~Task)) -> Self;
|
||||
fn to_fn(self) -> &fn(&mut Scheduler, ~Task);
|
||||
trait ClosureConverter {
|
||||
fn from_fn(&fn(~Task)) -> Self;
|
||||
fn to_fn(self) -> &fn(~Task);
|
||||
}
|
||||
impl HackAroundBorrowCk for UnsafeTaskReceiver {
|
||||
fn from_fn(f: &fn(&mut Scheduler, ~Task)) -> UnsafeTaskReceiver {
|
||||
unsafe { transmute(f) }
|
||||
}
|
||||
fn to_fn(self) -> &fn(&mut Scheduler, ~Task) {
|
||||
unsafe { transmute(self) }
|
||||
}
|
||||
impl ClosureConverter for UnsafeTaskReceiver {
|
||||
fn from_fn(f: &fn(~Task)) -> UnsafeTaskReceiver { unsafe { transmute(f) } }
|
||||
fn to_fn(self) -> &fn(~Task) { unsafe { transmute(self) } }
|
||||
}
|
||||
|
||||
enum CleanupJob {
|
||||
DoNothing,
|
||||
RescheduleTask(~Task),
|
||||
RecycleTask(~Task),
|
||||
GiveTask(~Task, UnsafeTaskReceiver)
|
||||
@ -84,7 +78,7 @@ pub impl Scheduler {
|
||||
stack_pool: StackPool::new(),
|
||||
saved_context: Context::empty(),
|
||||
current_task: None,
|
||||
cleanup_jobs: ~[]
|
||||
cleanup_job: None
|
||||
}
|
||||
}
|
||||
|
||||
@ -96,39 +90,52 @@ pub impl Scheduler {
|
||||
assert!(!self.in_task_context());
|
||||
|
||||
// Give ownership of the scheduler (self) to the thread
|
||||
do self.install |scheduler| {
|
||||
fn run_scheduler_once() {
|
||||
do Scheduler::local |scheduler| {
|
||||
if scheduler.resume_task_from_queue() {
|
||||
// Ok, a task ran. Nice! We'll do it again later
|
||||
scheduler.event_loop.callback(run_scheduler_once);
|
||||
}
|
||||
local::put(self);
|
||||
|
||||
let scheduler = unsafe { local::borrow() };
|
||||
fn run_scheduler_once() {
|
||||
let scheduler = Scheduler::take_local();
|
||||
if scheduler.resume_task_from_queue() {
|
||||
// Ok, a task ran. Nice! We'll do it again later
|
||||
do Scheduler::borrow_local |scheduler| {
|
||||
scheduler.event_loop.callback(run_scheduler_once);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
scheduler.event_loop.callback(run_scheduler_once);
|
||||
scheduler.event_loop.run();
|
||||
scheduler.event_loop.callback(run_scheduler_once);
|
||||
scheduler.event_loop.run();
|
||||
|
||||
return local::take();
|
||||
}
|
||||
|
||||
/// Get a mutable pointer to the thread-local I/O
|
||||
/// # Safety Note
|
||||
/// This allows other mutable aliases to the scheduler, both in the current
|
||||
/// execution context and other execution contexts.
|
||||
unsafe fn borrow_local_io() -> &mut IoFactoryObject {
|
||||
unsafe {
|
||||
let io = local::borrow().event_loop.io().unwrap();
|
||||
transmute::<&mut IoFactoryObject, &mut IoFactoryObject>(io)
|
||||
}
|
||||
}
|
||||
|
||||
fn install(~self, f: &fn(&mut Scheduler)) -> ~Scheduler {
|
||||
let mut tlsched = ThreadLocalScheduler::new();
|
||||
tlsched.put_scheduler(self);
|
||||
{
|
||||
let sched = tlsched.get_scheduler();
|
||||
f(sched);
|
||||
}
|
||||
return tlsched.take_scheduler();
|
||||
/// Borrow the thread-local scheduler from thread-local storage.
|
||||
/// While the scheduler is borrowed it is not available in TLS.
|
||||
fn borrow_local(f: &fn(&mut Scheduler)) {
|
||||
let mut sched = local::take();
|
||||
f(sched);
|
||||
local::put(sched);
|
||||
}
|
||||
|
||||
fn local(f: &fn(&mut Scheduler)) {
|
||||
let mut tlsched = ThreadLocalScheduler::new();
|
||||
f(tlsched.get_scheduler());
|
||||
/// Take ownership of the scheduler from thread local storage
|
||||
fn take_local() -> ~Scheduler {
|
||||
local::take()
|
||||
}
|
||||
|
||||
// * Scheduler-context operations
|
||||
|
||||
fn resume_task_from_queue(&mut self) -> bool {
|
||||
fn resume_task_from_queue(~self) -> bool {
|
||||
assert!(!self.in_task_context());
|
||||
|
||||
let mut self = self;
|
||||
@ -139,24 +146,38 @@ pub impl Scheduler {
|
||||
}
|
||||
None => {
|
||||
rtdebug!("no tasks in queue");
|
||||
local::put(self);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn resume_task_immediately(&mut self, task: ~Task) {
|
||||
fn resume_task_immediately(~self, task: ~Task) {
|
||||
let mut self = self;
|
||||
assert!(!self.in_task_context());
|
||||
|
||||
rtdebug!("scheduling a task");
|
||||
|
||||
// Store the task in the scheduler so it can be grabbed later
|
||||
self.current_task = Some(task);
|
||||
self.swap_in_task();
|
||||
self.enqueue_cleanup_job(DoNothing);
|
||||
|
||||
local::put(self);
|
||||
|
||||
// Take pointers to both the task and scheduler's saved registers.
|
||||
let sched = unsafe { local::borrow() };
|
||||
let (sched_context, _, next_task_context) = sched.get_contexts();
|
||||
let next_task_context = next_task_context.unwrap();
|
||||
// Context switch to the task, restoring it's registers
|
||||
// and saving the scheduler's
|
||||
Context::swap(sched_context, next_task_context);
|
||||
|
||||
let sched = unsafe { local::borrow() };
|
||||
// The running task should have passed ownership elsewhere
|
||||
assert!(self.current_task.is_none());
|
||||
assert!(sched.current_task.is_none());
|
||||
|
||||
// Running tasks may have asked us to do some cleanup
|
||||
self.run_cleanup_jobs();
|
||||
sched.run_cleanup_job();
|
||||
}
|
||||
|
||||
|
||||
@ -164,15 +185,23 @@ pub impl Scheduler {
|
||||
|
||||
/// Called by a running task to end execution, after which it will
|
||||
/// be recycled by the scheduler for reuse in a new task.
|
||||
fn terminate_current_task(&mut self) {
|
||||
fn terminate_current_task(~self) {
|
||||
let mut self = self;
|
||||
assert!(self.in_task_context());
|
||||
|
||||
rtdebug!("ending running task");
|
||||
|
||||
let dead_task = self.current_task.swap_unwrap();
|
||||
self.enqueue_cleanup_job(RecycleTask(dead_task));
|
||||
let dead_task = self.task_from_last_cleanup_job();
|
||||
self.swap_out_task(dead_task);
|
||||
|
||||
local::put(self);
|
||||
|
||||
let sched = unsafe { local::borrow() };
|
||||
let (sched_context, last_task_context, _) = sched.get_contexts();
|
||||
let last_task_context = last_task_context.unwrap();
|
||||
Context::swap(last_task_context, sched_context);
|
||||
|
||||
// Control never reaches here
|
||||
}
|
||||
|
||||
/// Block a running task, context switch to the scheduler, then pass the
|
||||
@ -183,127 +212,120 @@ pub impl Scheduler {
|
||||
/// The closure here is a *stack* closure that lives in the
|
||||
/// running task. It gets transmuted to the scheduler's lifetime
|
||||
/// and called while the task is blocked.
|
||||
fn block_running_task_and_then(&mut self, f: &fn(&mut Scheduler, ~Task)) {
|
||||
fn deschedule_running_task_and_then(~self, f: &fn(~Task)) {
|
||||
let mut self = self;
|
||||
assert!(self.in_task_context());
|
||||
|
||||
rtdebug!("blocking task");
|
||||
|
||||
let blocked_task = self.current_task.swap_unwrap();
|
||||
let f_fake_region = unsafe {
|
||||
transmute::<&fn(&mut Scheduler, ~Task), &fn(&mut Scheduler, ~Task)>(f)
|
||||
};
|
||||
let f_opaque = HackAroundBorrowCk::from_fn(f_fake_region);
|
||||
let f_fake_region = unsafe { transmute::<&fn(~Task), &fn(~Task)>(f) };
|
||||
let f_opaque = ClosureConverter::from_fn(f_fake_region);
|
||||
self.enqueue_cleanup_job(GiveTask(blocked_task, f_opaque));
|
||||
let blocked_task = self.task_from_last_cleanup_job();
|
||||
|
||||
self.swap_out_task(blocked_task);
|
||||
local::put(self);
|
||||
|
||||
let sched = unsafe { local::borrow() };
|
||||
let (sched_context, last_task_context, _) = sched.get_contexts();
|
||||
let last_task_context = last_task_context.unwrap();
|
||||
Context::swap(last_task_context, sched_context);
|
||||
|
||||
// We could be executing in a different thread now
|
||||
let sched = unsafe { local::borrow() };
|
||||
sched.run_cleanup_job();
|
||||
}
|
||||
|
||||
/// Switch directly to another task, without going through the scheduler.
|
||||
/// You would want to think hard about doing this, e.g. if there are
|
||||
/// pending I/O events it would be a bad idea.
|
||||
fn resume_task_from_running_task_direct(&mut self, next_task: ~Task) {
|
||||
fn resume_task_from_running_task_direct(~self, next_task: ~Task) {
|
||||
let mut self = self;
|
||||
assert!(self.in_task_context());
|
||||
|
||||
rtdebug!("switching tasks");
|
||||
|
||||
let old_running_task = self.current_task.swap_unwrap();
|
||||
self.enqueue_cleanup_job(RescheduleTask(old_running_task));
|
||||
let old_running_task = self.task_from_last_cleanup_job();
|
||||
|
||||
self.current_task = Some(next_task);
|
||||
self.swap_in_task_from_running_task(old_running_task);
|
||||
|
||||
local::put(self);
|
||||
|
||||
let sched = unsafe { local::borrow() };
|
||||
let (_, last_task_context, next_task_context) = sched.get_contexts();
|
||||
let last_task_context = last_task_context.unwrap();
|
||||
let next_task_context = next_task_context.unwrap();
|
||||
Context::swap(last_task_context, next_task_context);
|
||||
|
||||
// We could be executing in a different thread now
|
||||
let sched = unsafe { local::borrow() };
|
||||
sched.run_cleanup_job();
|
||||
}
|
||||
|
||||
|
||||
// * Context switching
|
||||
|
||||
// NB: When switching to a task callers are expected to first set
|
||||
// self.running_task. When switching away from a task likewise move
|
||||
// out of the self.running_task
|
||||
|
||||
priv fn swap_in_task(&mut self) {
|
||||
// Take pointers to both the task and scheduler's saved registers.
|
||||
let running_task: &~Task = self.current_task.get_ref();
|
||||
let task_context = &running_task.saved_context;
|
||||
let scheduler_context = &mut self.saved_context;
|
||||
|
||||
// Context switch to the task, restoring it's registers
|
||||
// and saving the scheduler's
|
||||
Context::swap(scheduler_context, task_context);
|
||||
}
|
||||
|
||||
priv fn swap_out_task(&mut self, running_task: &mut Task) {
|
||||
let task_context = &mut running_task.saved_context;
|
||||
let scheduler_context = &self.saved_context;
|
||||
Context::swap(task_context, scheduler_context);
|
||||
}
|
||||
|
||||
priv fn swap_in_task_from_running_task(&mut self, running_task: &mut Task) {
|
||||
let running_task_context = &mut running_task.saved_context;
|
||||
let next_context = &self.current_task.get_ref().saved_context;
|
||||
Context::swap(running_task_context, next_context);
|
||||
}
|
||||
|
||||
|
||||
// * Other stuff
|
||||
|
||||
fn in_task_context(&self) -> bool { self.current_task.is_some() }
|
||||
|
||||
fn enqueue_cleanup_job(&mut self, job: CleanupJob) {
|
||||
self.cleanup_jobs.unshift(job);
|
||||
assert!(self.cleanup_job.is_none());
|
||||
self.cleanup_job = Some(job);
|
||||
}
|
||||
|
||||
fn run_cleanup_jobs(&mut self) {
|
||||
assert!(!self.in_task_context());
|
||||
rtdebug!("running cleanup jobs");
|
||||
fn run_cleanup_job(&mut self) {
|
||||
rtdebug!("running cleanup job");
|
||||
|
||||
while !self.cleanup_jobs.is_empty() {
|
||||
match self.cleanup_jobs.pop() {
|
||||
RescheduleTask(task) => {
|
||||
// NB: Pushing to the *front* of the queue
|
||||
self.task_queue.push_front(task);
|
||||
}
|
||||
RecycleTask(task) => task.recycle(&mut self.stack_pool),
|
||||
GiveTask(task, f) => (f.to_fn())(self, task)
|
||||
assert!(self.cleanup_job.is_some());
|
||||
|
||||
let cleanup_job = self.cleanup_job.swap_unwrap();
|
||||
match cleanup_job {
|
||||
DoNothing => { }
|
||||
RescheduleTask(task) => {
|
||||
// NB: Pushing to the *front* of the queue
|
||||
self.task_queue.push_front(task);
|
||||
}
|
||||
RecycleTask(task) => task.recycle(&mut self.stack_pool),
|
||||
GiveTask(task, f) => (f.to_fn())(task)
|
||||
}
|
||||
}
|
||||
|
||||
// XXX: Hack. This should return &'self mut but I don't know how to
|
||||
// make the borrowcheck happy
|
||||
#[cfg(stage0)]
|
||||
fn task_from_last_cleanup_job(&mut self) -> &mut Task {
|
||||
assert!(!self.cleanup_jobs.is_empty());
|
||||
let last_job: &'self mut CleanupJob = &mut self.cleanup_jobs[0];
|
||||
let last_task: &'self Task = match last_job {
|
||||
&RescheduleTask(~ref task) => task,
|
||||
&RecycleTask(~ref task) => task,
|
||||
&GiveTask(~ref task, _) => task,
|
||||
/// Get mutable references to all the contexts that may be involved in a
|
||||
/// context switch.
|
||||
///
|
||||
/// Returns (the scheduler context, the optional context of the
|
||||
/// task in the cleanup list, the optional context of the task in
|
||||
/// the current task slot). When context switching to a task,
|
||||
/// callers should first arrange for that task to be located in the
|
||||
/// Scheduler's current_task slot and set up the
|
||||
/// post-context-switch cleanup job.
|
||||
fn get_contexts<'a>(&'a mut self) -> (&'a mut Context,
|
||||
Option<&'a mut Context>,
|
||||
Option<&'a mut Context>) {
|
||||
let last_task = match self.cleanup_job {
|
||||
Some(RescheduleTask(~ref task)) |
|
||||
Some(RecycleTask(~ref task)) |
|
||||
Some(GiveTask(~ref task, _)) => {
|
||||
Some(task)
|
||||
}
|
||||
Some(DoNothing) => {
|
||||
None
|
||||
}
|
||||
None => fail!(fmt!("all context switches should have a cleanup job"))
|
||||
};
|
||||
// XXX: Pattern matching mutable pointers above doesn't work
|
||||
// because borrowck thinks the three patterns are conflicting
|
||||
// borrows
|
||||
return unsafe { transmute::<&Task, &mut Task>(last_task) };
|
||||
}
|
||||
|
||||
// XXX: Hack. This should return &'self mut but I don't know how to
|
||||
// make the borrowcheck happy
|
||||
#[cfg(stage1)]
|
||||
#[cfg(stage2)]
|
||||
#[cfg(stage3)]
|
||||
fn task_from_last_cleanup_job<'a>(&'a mut self) -> &mut Task {
|
||||
assert!(!self.cleanup_jobs.is_empty());
|
||||
let last_job: &'a mut CleanupJob = &mut self.cleanup_jobs[0];
|
||||
let last_task: &'a Task = match last_job {
|
||||
&RescheduleTask(~ref task) => task,
|
||||
&RecycleTask(~ref task) => task,
|
||||
&GiveTask(~ref task, _) => task,
|
||||
};
|
||||
// XXX: Pattern matching mutable pointers above doesn't work
|
||||
// because borrowck thinks the three patterns are conflicting
|
||||
// borrows
|
||||
return unsafe { transmute::<&Task, &mut Task>(last_task) };
|
||||
unsafe {
|
||||
let last_task = transmute::<Option<&Task>, Option<&mut Task>>(last_task);
|
||||
let last_task_context = match last_task {
|
||||
Some(ref t) => Some(&mut t.saved_context), None => None
|
||||
};
|
||||
let next_task_context = match self.current_task {
|
||||
Some(ref mut t) => Some(&mut t.saved_context), None => None
|
||||
};
|
||||
// XXX: These transmutes can be removed after snapshot
|
||||
return (transmute(&mut self.saved_context),
|
||||
last_task_context,
|
||||
transmute(next_task_context));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -333,10 +355,15 @@ pub impl Task {
|
||||
priv fn build_start_wrapper(start: ~fn()) -> ~fn() {
|
||||
// XXX: The old code didn't have this extra allocation
|
||||
let wrapper: ~fn() = || {
|
||||
// This is the first code to execute after the initial
|
||||
// context switch to the task. The previous context may
|
||||
// have asked us to do some cleanup.
|
||||
let sched = unsafe { local::borrow() };
|
||||
sched.run_cleanup_job();
|
||||
|
||||
start();
|
||||
|
||||
let mut sched = ThreadLocalScheduler::new();
|
||||
let sched = sched.get_scheduler();
|
||||
let sched = Scheduler::take_local();
|
||||
sched.terminate_current_task();
|
||||
};
|
||||
return wrapper;
|
||||
@ -352,112 +379,6 @@ pub impl Task {
|
||||
}
|
||||
}
|
||||
|
||||
// NB: This is a type so we can use make use of the &self region.
|
||||
struct ThreadLocalScheduler(tls::Key);
|
||||
|
||||
impl ThreadLocalScheduler {
|
||||
fn new() -> ThreadLocalScheduler {
|
||||
unsafe {
|
||||
// NB: This assumes that the TLS key has been created prior.
|
||||
// Currently done in rust_start.
|
||||
let key: *mut c_void = rust_get_sched_tls_key();
|
||||
let key: &mut tls::Key = transmute(key);
|
||||
ThreadLocalScheduler(*key)
|
||||
}
|
||||
}
|
||||
|
||||
fn put_scheduler(&mut self, scheduler: ~Scheduler) {
|
||||
unsafe {
|
||||
let key = match self { &ThreadLocalScheduler(key) => key };
|
||||
let value: *mut c_void = transmute::<~Scheduler, *mut c_void>(scheduler);
|
||||
tls::set(key, value);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(stage0)]
|
||||
fn get_scheduler(&mut self) -> &'self mut Scheduler {
|
||||
unsafe {
|
||||
let key = match self { &ThreadLocalScheduler(key) => key };
|
||||
let mut value: *mut c_void = tls::get(key);
|
||||
assert!(value.is_not_null());
|
||||
{
|
||||
let value_ptr = &mut value;
|
||||
let sched: &mut ~Scheduler = {
|
||||
transmute::<&mut *mut c_void, &mut ~Scheduler>(value_ptr)
|
||||
};
|
||||
let sched: &mut Scheduler = &mut **sched;
|
||||
return sched;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(stage1)]
|
||||
#[cfg(stage2)]
|
||||
#[cfg(stage3)]
|
||||
fn get_scheduler<'a>(&'a mut self) -> &'a mut Scheduler {
|
||||
unsafe {
|
||||
let key = match self { &ThreadLocalScheduler(key) => key };
|
||||
let mut value: *mut c_void = tls::get(key);
|
||||
assert!(value.is_not_null());
|
||||
{
|
||||
let value_ptr = &mut value;
|
||||
let sched: &mut ~Scheduler = {
|
||||
transmute::<&mut *mut c_void, &mut ~Scheduler>(value_ptr)
|
||||
};
|
||||
let sched: &mut Scheduler = &mut **sched;
|
||||
return sched;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn take_scheduler(&mut self) -> ~Scheduler {
|
||||
unsafe {
|
||||
let key = match self { &ThreadLocalScheduler(key) => key };
|
||||
let value: *mut c_void = tls::get(key);
|
||||
assert!(value.is_not_null());
|
||||
let sched = transmute(value);
|
||||
tls::set(key, mut_null());
|
||||
return sched;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
extern {
|
||||
fn rust_get_sched_tls_key() -> *mut c_void;
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn thread_local_scheduler_smoke_test() {
|
||||
let scheduler = ~UvEventLoop::new_scheduler();
|
||||
let mut tls_scheduler = ThreadLocalScheduler::new();
|
||||
tls_scheduler.put_scheduler(scheduler);
|
||||
{
|
||||
let _scheduler = tls_scheduler.get_scheduler();
|
||||
}
|
||||
let _scheduler = tls_scheduler.take_scheduler();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn thread_local_scheduler_two_instances() {
|
||||
let scheduler = ~UvEventLoop::new_scheduler();
|
||||
let mut tls_scheduler = ThreadLocalScheduler::new();
|
||||
tls_scheduler.put_scheduler(scheduler);
|
||||
{
|
||||
|
||||
let _scheduler = tls_scheduler.get_scheduler();
|
||||
}
|
||||
{
|
||||
let scheduler = tls_scheduler.take_scheduler();
|
||||
tls_scheduler.put_scheduler(scheduler);
|
||||
}
|
||||
|
||||
let mut tls_scheduler = ThreadLocalScheduler::new();
|
||||
{
|
||||
let _scheduler = tls_scheduler.get_scheduler();
|
||||
}
|
||||
let _scheduler = tls_scheduler.take_scheduler();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_simple_scheduling() {
|
||||
do run_in_bare_thread {
|
||||
@ -502,13 +423,12 @@ fn test_swap_tasks() {
|
||||
let mut sched = ~UvEventLoop::new_scheduler();
|
||||
let task1 = ~do Task::new(&mut sched.stack_pool) {
|
||||
unsafe { *count_ptr = *count_ptr + 1; }
|
||||
do Scheduler::local |sched| {
|
||||
let task2 = ~do Task::new(&mut sched.stack_pool) {
|
||||
unsafe { *count_ptr = *count_ptr + 1; }
|
||||
};
|
||||
// Context switch directly to the new task
|
||||
sched.resume_task_from_running_task_direct(task2);
|
||||
}
|
||||
let mut sched = Scheduler::take_local();
|
||||
let task2 = ~do Task::new(&mut sched.stack_pool) {
|
||||
unsafe { *count_ptr = *count_ptr + 1; }
|
||||
};
|
||||
// Context switch directly to the new task
|
||||
sched.resume_task_from_running_task_direct(task2);
|
||||
unsafe { *count_ptr = *count_ptr + 1; }
|
||||
};
|
||||
sched.task_queue.push_back(task1);
|
||||
@ -535,7 +455,7 @@ fn test_run_a_lot_of_tasks_queued() {
|
||||
assert!(count == MAX);
|
||||
|
||||
fn run_task(count_ptr: *mut int) {
|
||||
do Scheduler::local |sched| {
|
||||
do Scheduler::borrow_local |sched| {
|
||||
let task = ~do Task::new(&mut sched.stack_pool) {
|
||||
unsafe {
|
||||
*count_ptr = *count_ptr + 1;
|
||||
@ -568,18 +488,17 @@ fn test_run_a_lot_of_tasks_direct() {
|
||||
assert!(count == MAX);
|
||||
|
||||
fn run_task(count_ptr: *mut int) {
|
||||
do Scheduler::local |sched| {
|
||||
let task = ~do Task::new(&mut sched.stack_pool) {
|
||||
unsafe {
|
||||
*count_ptr = *count_ptr + 1;
|
||||
if *count_ptr != MAX {
|
||||
run_task(count_ptr);
|
||||
}
|
||||
let mut sched = Scheduler::take_local();
|
||||
let task = ~do Task::new(&mut sched.stack_pool) {
|
||||
unsafe {
|
||||
*count_ptr = *count_ptr + 1;
|
||||
if *count_ptr != MAX {
|
||||
run_task(count_ptr);
|
||||
}
|
||||
};
|
||||
// Context switch directly to the new task
|
||||
sched.resume_task_from_running_task_direct(task);
|
||||
}
|
||||
}
|
||||
};
|
||||
// Context switch directly to the new task
|
||||
sched.resume_task_from_running_task_direct(task);
|
||||
};
|
||||
}
|
||||
}
|
||||
@ -589,11 +508,13 @@ fn test_block_task() {
|
||||
do run_in_bare_thread {
|
||||
let mut sched = ~UvEventLoop::new_scheduler();
|
||||
let task = ~do Task::new(&mut sched.stack_pool) {
|
||||
do Scheduler::local |sched| {
|
||||
assert!(sched.in_task_context());
|
||||
do sched.block_running_task_and_then() |sched, task| {
|
||||
let sched = Scheduler::take_local();
|
||||
assert!(sched.in_task_context());
|
||||
do sched.deschedule_running_task_and_then() |task| {
|
||||
let task = Cell(task);
|
||||
do Scheduler::borrow_local |sched| {
|
||||
assert!(!sched.in_task_context());
|
||||
sched.task_queue.push_back(task);
|
||||
sched.task_queue.push_back(task.take());
|
||||
}
|
||||
}
|
||||
};
|
@ -120,37 +120,37 @@ impl IoFactory for UvIoFactory {
|
||||
let result_cell = empty_cell();
|
||||
let result_cell_ptr: *Cell<Option<~StreamObject>> = &result_cell;
|
||||
|
||||
do Scheduler::local |scheduler| {
|
||||
assert!(scheduler.in_task_context());
|
||||
let scheduler = Scheduler::take_local();
|
||||
assert!(scheduler.in_task_context());
|
||||
|
||||
// Block this task and take ownership, switch to scheduler context
|
||||
do scheduler.block_running_task_and_then |scheduler, task| {
|
||||
// Block this task and take ownership, switch to scheduler context
|
||||
do scheduler.deschedule_running_task_and_then |task| {
|
||||
|
||||
rtdebug!("connect: entered scheduler context");
|
||||
rtdebug!("connect: entered scheduler context");
|
||||
do Scheduler::borrow_local |scheduler| {
|
||||
assert!(!scheduler.in_task_context());
|
||||
let mut tcp_watcher = TcpWatcher::new(self.uv_loop());
|
||||
let task_cell = Cell(task);
|
||||
}
|
||||
let mut tcp_watcher = TcpWatcher::new(self.uv_loop());
|
||||
let task_cell = Cell(task);
|
||||
|
||||
// Wait for a connection
|
||||
do tcp_watcher.connect(addr) |stream_watcher, status| {
|
||||
rtdebug!("connect: in connect callback");
|
||||
let maybe_stream = if status.is_none() {
|
||||
rtdebug!("status is none");
|
||||
Some(~UvStream(stream_watcher))
|
||||
} else {
|
||||
rtdebug!("status is some");
|
||||
stream_watcher.close(||());
|
||||
None
|
||||
};
|
||||
// Wait for a connection
|
||||
do tcp_watcher.connect(addr) |stream_watcher, status| {
|
||||
rtdebug!("connect: in connect callback");
|
||||
let maybe_stream = if status.is_none() {
|
||||
rtdebug!("status is none");
|
||||
Some(~UvStream(stream_watcher))
|
||||
} else {
|
||||
rtdebug!("status is some");
|
||||
stream_watcher.close(||());
|
||||
None
|
||||
};
|
||||
|
||||
// Store the stream in the task's stack
|
||||
unsafe { (*result_cell_ptr).put_back(maybe_stream); }
|
||||
// Store the stream in the task's stack
|
||||
unsafe { (*result_cell_ptr).put_back(maybe_stream); }
|
||||
|
||||
// Context switch
|
||||
do Scheduler::local |scheduler| {
|
||||
scheduler.resume_task_immediately(task_cell.take());
|
||||
}
|
||||
}
|
||||
// Context switch
|
||||
let scheduler = Scheduler::take_local();
|
||||
scheduler.resume_task_immediately(task_cell.take());
|
||||
}
|
||||
}
|
||||
|
||||
@ -194,33 +194,31 @@ impl TcpListener for UvTcpListener {
|
||||
|
||||
let server_tcp_watcher = self.watcher();
|
||||
|
||||
do Scheduler::local |scheduler| {
|
||||
assert!(scheduler.in_task_context());
|
||||
let scheduler = Scheduler::take_local();
|
||||
assert!(scheduler.in_task_context());
|
||||
|
||||
do scheduler.block_running_task_and_then |_, task| {
|
||||
let task_cell = Cell(task);
|
||||
let mut server_tcp_watcher = server_tcp_watcher;
|
||||
do server_tcp_watcher.listen |server_stream_watcher, status| {
|
||||
let maybe_stream = if status.is_none() {
|
||||
let mut server_stream_watcher = server_stream_watcher;
|
||||
let mut loop_ = loop_from_watcher(&server_stream_watcher);
|
||||
let mut client_tcp_watcher = TcpWatcher::new(&mut loop_);
|
||||
let mut client_tcp_watcher = client_tcp_watcher.as_stream();
|
||||
// XXX: Need's to be surfaced in interface
|
||||
server_stream_watcher.accept(client_tcp_watcher);
|
||||
Some(~UvStream::new(client_tcp_watcher))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
do scheduler.deschedule_running_task_and_then |task| {
|
||||
let task_cell = Cell(task);
|
||||
let mut server_tcp_watcher = server_tcp_watcher;
|
||||
do server_tcp_watcher.listen |server_stream_watcher, status| {
|
||||
let maybe_stream = if status.is_none() {
|
||||
let mut server_stream_watcher = server_stream_watcher;
|
||||
let mut loop_ = loop_from_watcher(&server_stream_watcher);
|
||||
let mut client_tcp_watcher = TcpWatcher::new(&mut loop_);
|
||||
let mut client_tcp_watcher = client_tcp_watcher.as_stream();
|
||||
// XXX: Need's to be surfaced in interface
|
||||
server_stream_watcher.accept(client_tcp_watcher);
|
||||
Some(~UvStream::new(client_tcp_watcher))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
unsafe { (*result_cell_ptr).put_back(maybe_stream); }
|
||||
unsafe { (*result_cell_ptr).put_back(maybe_stream); }
|
||||
|
||||
rtdebug!("resuming task from listen");
|
||||
// Context switch
|
||||
do Scheduler::local |scheduler| {
|
||||
scheduler.resume_task_immediately(task_cell.take());
|
||||
}
|
||||
}
|
||||
rtdebug!("resuming task from listen");
|
||||
// Context switch
|
||||
let scheduler = Scheduler::take_local();
|
||||
scheduler.resume_task_immediately(task_cell.take());
|
||||
}
|
||||
}
|
||||
|
||||
@ -259,42 +257,42 @@ impl Stream for UvStream {
|
||||
let result_cell = empty_cell();
|
||||
let result_cell_ptr: *Cell<Result<uint, ()>> = &result_cell;
|
||||
|
||||
do Scheduler::local |scheduler| {
|
||||
assert!(scheduler.in_task_context());
|
||||
let watcher = self.watcher();
|
||||
let buf_ptr: *&mut [u8] = &buf;
|
||||
do scheduler.block_running_task_and_then |scheduler, task| {
|
||||
rtdebug!("read: entered scheduler context");
|
||||
let scheduler = Scheduler::take_local();
|
||||
assert!(scheduler.in_task_context());
|
||||
let watcher = self.watcher();
|
||||
let buf_ptr: *&mut [u8] = &buf;
|
||||
do scheduler.deschedule_running_task_and_then |task| {
|
||||
rtdebug!("read: entered scheduler context");
|
||||
do Scheduler::borrow_local |scheduler| {
|
||||
assert!(!scheduler.in_task_context());
|
||||
}
|
||||
let mut watcher = watcher;
|
||||
let task_cell = Cell(task);
|
||||
// XXX: We shouldn't reallocate these callbacks every
|
||||
// call to read
|
||||
let alloc: AllocCallback = |_| unsafe {
|
||||
slice_to_uv_buf(*buf_ptr)
|
||||
};
|
||||
do watcher.read_start(alloc) |watcher, nread, _buf, status| {
|
||||
|
||||
// Stop reading so that no read callbacks are
|
||||
// triggered before the user calls `read` again.
|
||||
// XXX: Is there a performance impact to calling
|
||||
// stop here?
|
||||
let mut watcher = watcher;
|
||||
let task_cell = Cell(task);
|
||||
// XXX: We shouldn't reallocate these callbacks every
|
||||
// call to read
|
||||
let alloc: AllocCallback = |_| unsafe {
|
||||
slice_to_uv_buf(*buf_ptr)
|
||||
watcher.read_stop();
|
||||
|
||||
let result = if status.is_none() {
|
||||
assert!(nread >= 0);
|
||||
Ok(nread as uint)
|
||||
} else {
|
||||
Err(())
|
||||
};
|
||||
do watcher.read_start(alloc) |watcher, nread, _buf, status| {
|
||||
|
||||
// Stop reading so that no read callbacks are
|
||||
// triggered before the user calls `read` again.
|
||||
// XXX: Is there a performance impact to calling
|
||||
// stop here?
|
||||
let mut watcher = watcher;
|
||||
watcher.read_stop();
|
||||
unsafe { (*result_cell_ptr).put_back(result); }
|
||||
|
||||
let result = if status.is_none() {
|
||||
assert!(nread >= 0);
|
||||
Ok(nread as uint)
|
||||
} else {
|
||||
Err(())
|
||||
};
|
||||
|
||||
unsafe { (*result_cell_ptr).put_back(result); }
|
||||
|
||||
do Scheduler::local |scheduler| {
|
||||
scheduler.resume_task_immediately(task_cell.take());
|
||||
}
|
||||
}
|
||||
let scheduler = Scheduler::take_local();
|
||||
scheduler.resume_task_immediately(task_cell.take());
|
||||
}
|
||||
}
|
||||
|
||||
@ -305,29 +303,27 @@ impl Stream for UvStream {
|
||||
fn write(&mut self, buf: &[u8]) -> Result<(), ()> {
|
||||
let result_cell = empty_cell();
|
||||
let result_cell_ptr: *Cell<Result<(), ()>> = &result_cell;
|
||||
do Scheduler::local |scheduler| {
|
||||
assert!(scheduler.in_task_context());
|
||||
let watcher = self.watcher();
|
||||
let buf_ptr: *&[u8] = &buf;
|
||||
do scheduler.block_running_task_and_then |_, task| {
|
||||
let mut watcher = watcher;
|
||||
let task_cell = Cell(task);
|
||||
let buf = unsafe { &*buf_ptr };
|
||||
// XXX: OMGCOPIES
|
||||
let buf = buf.to_vec();
|
||||
do watcher.write(buf) |_watcher, status| {
|
||||
let result = if status.is_none() {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(())
|
||||
};
|
||||
let scheduler = Scheduler::take_local();
|
||||
assert!(scheduler.in_task_context());
|
||||
let watcher = self.watcher();
|
||||
let buf_ptr: *&[u8] = &buf;
|
||||
do scheduler.deschedule_running_task_and_then |task| {
|
||||
let mut watcher = watcher;
|
||||
let task_cell = Cell(task);
|
||||
let buf = unsafe { &*buf_ptr };
|
||||
// XXX: OMGCOPIES
|
||||
let buf = buf.to_vec();
|
||||
do watcher.write(buf) |_watcher, status| {
|
||||
let result = if status.is_none() {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(())
|
||||
};
|
||||
|
||||
unsafe { (*result_cell_ptr).put_back(result); }
|
||||
unsafe { (*result_cell_ptr).put_back(result); }
|
||||
|
||||
do Scheduler::local |scheduler| {
|
||||
scheduler.resume_task_immediately(task_cell.take());
|
||||
}
|
||||
}
|
||||
let scheduler = Scheduler::take_local();
|
||||
scheduler.resume_task_immediately(task_cell.take());
|
||||
}
|
||||
}
|
||||
|
||||
@ -342,12 +338,10 @@ fn test_simple_io_no_connect() {
|
||||
do run_in_bare_thread {
|
||||
let mut sched = ~UvEventLoop::new_scheduler();
|
||||
let task = ~do Task::new(&mut sched.stack_pool) {
|
||||
do Scheduler::local |sched| {
|
||||
let io = sched.event_loop.io().unwrap();
|
||||
let addr = Ipv4(127, 0, 0, 1, 2926);
|
||||
let maybe_chan = io.connect(addr);
|
||||
assert!(maybe_chan.is_none());
|
||||
}
|
||||
let io = unsafe { Scheduler::borrow_local_io() };
|
||||
let addr = Ipv4(127, 0, 0, 1, 2926);
|
||||
let maybe_chan = io.connect(addr);
|
||||
assert!(maybe_chan.is_none());
|
||||
};
|
||||
sched.task_queue.push_back(task);
|
||||
sched.run();
|
||||
@ -362,29 +356,25 @@ fn test_simple_tcp_server_and_client() {
|
||||
let addr = Ipv4(127, 0, 0, 1, 2929);
|
||||
|
||||
let client_task = ~do Task::new(&mut sched.stack_pool) {
|
||||
do Scheduler::local |sched| {
|
||||
let io = sched.event_loop.io().unwrap();
|
||||
let mut stream = io.connect(addr).unwrap();
|
||||
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
|
||||
stream.close();
|
||||
}
|
||||
let io = unsafe { Scheduler::borrow_local_io() };
|
||||
let mut stream = io.connect(addr).unwrap();
|
||||
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
|
||||
stream.close();
|
||||
};
|
||||
|
||||
let server_task = ~do Task::new(&mut sched.stack_pool) {
|
||||
do Scheduler::local |sched| {
|
||||
let io = sched.event_loop.io().unwrap();
|
||||
let mut listener = io.bind(addr).unwrap();
|
||||
let mut stream = listener.listen().unwrap();
|
||||
let mut buf = [0, .. 2048];
|
||||
let nread = stream.read(buf).unwrap();
|
||||
assert!(nread == 8);
|
||||
for uint::range(0, nread) |i| {
|
||||
rtdebug!("%u", buf[i] as uint);
|
||||
assert!(buf[i] == i as u8);
|
||||
}
|
||||
stream.close();
|
||||
listener.close();
|
||||
let io = unsafe { Scheduler::borrow_local_io() };
|
||||
let mut listener = io.bind(addr).unwrap();
|
||||
let mut stream = listener.listen().unwrap();
|
||||
let mut buf = [0, .. 2048];
|
||||
let nread = stream.read(buf).unwrap();
|
||||
assert!(nread == 8);
|
||||
for uint::range(0, nread) |i| {
|
||||
rtdebug!("%u", buf[i] as uint);
|
||||
assert!(buf[i] == i as u8);
|
||||
}
|
||||
stream.close();
|
||||
listener.close();
|
||||
};
|
||||
|
||||
// Start the server first so it listens before the client connects
|
||||
@ -401,53 +391,51 @@ fn test_read_and_block() {
|
||||
let addr = Ipv4(127, 0, 0, 1, 2930);
|
||||
|
||||
let client_task = ~do Task::new(&mut sched.stack_pool) {
|
||||
do Scheduler::local |sched| {
|
||||
let io = sched.event_loop.io().unwrap();
|
||||
let mut stream = io.connect(addr).unwrap();
|
||||
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
|
||||
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
|
||||
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
|
||||
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
|
||||
stream.close();
|
||||
}
|
||||
let io = unsafe { Scheduler::borrow_local_io() };
|
||||
let mut stream = io.connect(addr).unwrap();
|
||||
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
|
||||
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
|
||||
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
|
||||
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
|
||||
stream.close();
|
||||
};
|
||||
|
||||
let server_task = ~do Task::new(&mut sched.stack_pool) {
|
||||
do Scheduler::local |sched| {
|
||||
let io = sched.event_loop.io().unwrap();
|
||||
let mut listener = io.bind(addr).unwrap();
|
||||
let mut stream = listener.listen().unwrap();
|
||||
let mut buf = [0, .. 2048];
|
||||
let io = unsafe { Scheduler::borrow_local_io() };
|
||||
let mut listener = io.bind(addr).unwrap();
|
||||
let mut stream = listener.listen().unwrap();
|
||||
let mut buf = [0, .. 2048];
|
||||
|
||||
let expected = 32;
|
||||
let mut current = 0;
|
||||
let mut reads = 0;
|
||||
let expected = 32;
|
||||
let mut current = 0;
|
||||
let mut reads = 0;
|
||||
|
||||
while current < expected {
|
||||
let nread = stream.read(buf).unwrap();
|
||||
for uint::range(0, nread) |i| {
|
||||
let val = buf[i] as uint;
|
||||
assert!(val == current % 8);
|
||||
current += 1;
|
||||
}
|
||||
reads += 1;
|
||||
while current < expected {
|
||||
let nread = stream.read(buf).unwrap();
|
||||
for uint::range(0, nread) |i| {
|
||||
let val = buf[i] as uint;
|
||||
assert!(val == current % 8);
|
||||
current += 1;
|
||||
}
|
||||
reads += 1;
|
||||
|
||||
do Scheduler::local |scheduler| {
|
||||
// Yield to the other task in hopes that it
|
||||
// will trigger a read callback while we are
|
||||
// not ready for it
|
||||
do scheduler.block_running_task_and_then |scheduler, task| {
|
||||
scheduler.task_queue.push_back(task);
|
||||
}
|
||||
let scheduler = Scheduler::take_local();
|
||||
// Yield to the other task in hopes that it
|
||||
// will trigger a read callback while we are
|
||||
// not ready for it
|
||||
do scheduler.deschedule_running_task_and_then |task| {
|
||||
let task = Cell(task);
|
||||
do Scheduler::borrow_local |scheduler| {
|
||||
scheduler.task_queue.push_back(task.take());
|
||||
}
|
||||
}
|
||||
|
||||
// Make sure we had multiple reads
|
||||
assert!(reads > 1);
|
||||
|
||||
stream.close();
|
||||
listener.close();
|
||||
}
|
||||
|
||||
// Make sure we had multiple reads
|
||||
assert!(reads > 1);
|
||||
|
||||
stream.close();
|
||||
listener.close();
|
||||
};
|
||||
|
||||
// Start the server first so it listens before the client connects
|
||||
@ -464,19 +452,17 @@ fn test_read_read_read() {
|
||||
let addr = Ipv4(127, 0, 0, 1, 2931);
|
||||
|
||||
let client_task = ~do Task::new(&mut sched.stack_pool) {
|
||||
do Scheduler::local |sched| {
|
||||
let io = sched.event_loop.io().unwrap();
|
||||
let mut stream = io.connect(addr).unwrap();
|
||||
let mut buf = [0, .. 2048];
|
||||
let mut total_bytes_read = 0;
|
||||
while total_bytes_read < 500000000 {
|
||||
let nread = stream.read(buf).unwrap();
|
||||
rtdebug!("read %u bytes", nread as uint);
|
||||
total_bytes_read += nread;
|
||||
}
|
||||
rtdebug_!("read %u bytes total", total_bytes_read as uint);
|
||||
stream.close();
|
||||
let io = unsafe { Scheduler::borrow_local_io() };
|
||||
let mut stream = io.connect(addr).unwrap();
|
||||
let mut buf = [0, .. 2048];
|
||||
let mut total_bytes_read = 0;
|
||||
while total_bytes_read < 500000000 {
|
||||
let nread = stream.read(buf).unwrap();
|
||||
rtdebug!("read %u bytes", nread as uint);
|
||||
total_bytes_read += nread;
|
||||
}
|
||||
rtdebug_!("read %u bytes total", total_bytes_read as uint);
|
||||
stream.close();
|
||||
};
|
||||
|
||||
sched.task_queue.push_back(client_task);
|
||||
|
Loading…
Reference in New Issue
Block a user