mirror of
https://github.com/rust-lang/rust.git
synced 2024-12-02 19:53:46 +00:00
std: Reimplement std::comm without the scheduler
Like the librustuv refactoring, this refactors std::comm to sever all ties with the scheduler. This means that the entire `comm::imp` module can be deleted in favor of implementations outside of libstd.
This commit is contained in:
parent
daaec28c6f
commit
49e5493587
@ -1,337 +0,0 @@
|
|||||||
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
|
|
||||||
// file at the top-level directory of this distribution and at
|
|
||||||
// http://rust-lang.org/COPYRIGHT.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
|
|
||||||
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
|
|
||||||
// option. This file may not be copied, modified, or distributed
|
|
||||||
// except according to those terms.
|
|
||||||
|
|
||||||
//! One of the major goals behind this channel implementation is to work
|
|
||||||
//! seamlessly on and off the runtime. This also means that the code isn't
|
|
||||||
//! littered with "if is_green() { ... } else { ... }". Right now, the rest of
|
|
||||||
//! the runtime isn't quite ready to for this abstraction to be done very
|
|
||||||
//! nicely, so the conditional "if green" blocks are all contained in this inner
|
|
||||||
//! module.
|
|
||||||
//!
|
|
||||||
//! The goal of this module is to mirror what the runtime "should be", not the
|
|
||||||
//! state that it is currently in today. You'll notice that there is no mention
|
|
||||||
//! of schedulers or is_green inside any of the channel code, it is currently
|
|
||||||
//! entirely contained in this one module.
|
|
||||||
//!
|
|
||||||
//! In the ideal world, nothing in this module exists and it is all implemented
|
|
||||||
//! elsewhere in the runtime (in the proper location). All of this code is
|
|
||||||
//! structured in order to easily refactor this to the correct location whenever
|
|
||||||
//! we have the trait objects in place to serve as the boundary of the
|
|
||||||
//! abstraction.
|
|
||||||
|
|
||||||
use iter::{range, Iterator};
|
|
||||||
use ops::Drop;
|
|
||||||
use option::{Some, None, Option};
|
|
||||||
use rt::local::Local;
|
|
||||||
use rt::sched::{SchedHandle, Scheduler, TaskFromFriend};
|
|
||||||
use rt::thread::Thread;
|
|
||||||
use rt;
|
|
||||||
use unstable::mutex::Mutex;
|
|
||||||
use unstable::sync::UnsafeArc;
|
|
||||||
|
|
||||||
// A task handle is a method of waking up a blocked task. The handle itself
|
|
||||||
// is completely opaque and only has a wake() method defined on it. This
|
|
||||||
// method will wake the method regardless of the context of the thread which
|
|
||||||
// is currently calling wake().
|
|
||||||
//
|
|
||||||
// This abstraction should be able to be created when putting a task to
|
|
||||||
// sleep. This should basically be a method on whatever the local Task is,
|
|
||||||
// consuming the local Task.
|
|
||||||
|
|
||||||
pub struct TaskHandle {
|
|
||||||
priv inner: TaskRepr
|
|
||||||
}
|
|
||||||
enum TaskRepr {
|
|
||||||
Green(rt::BlockedTask, *mut SchedHandle),
|
|
||||||
Native(NativeWakeupStyle),
|
|
||||||
}
|
|
||||||
enum NativeWakeupStyle {
|
|
||||||
ArcWakeup(UnsafeArc<Mutex>), // shared mutex to synchronize on
|
|
||||||
LocalWakeup(*mut Mutex), // synchronize on the task-local mutex
|
|
||||||
}
|
|
||||||
|
|
||||||
impl TaskHandle {
|
|
||||||
// Signal that this handle should be woken up. The `can_resched`
|
|
||||||
// argument indicates whether the current task could possibly be
|
|
||||||
// rescheduled or not. This does not have a lot of meaning for the
|
|
||||||
// native case, but for an M:N case it indicates whether a context
|
|
||||||
// switch can happen or not.
|
|
||||||
pub fn wake(self, can_resched: bool) {
|
|
||||||
match self.inner {
|
|
||||||
Green(task, handle) => {
|
|
||||||
// If we have a local scheduler, then use that to run the
|
|
||||||
// blocked task, otherwise we can use the handle to send the
|
|
||||||
// task back to its home.
|
|
||||||
if rt::in_green_task_context() {
|
|
||||||
if can_resched {
|
|
||||||
task.wake().map(Scheduler::run_task);
|
|
||||||
} else {
|
|
||||||
let mut s: ~Scheduler = Local::take();
|
|
||||||
s.enqueue_blocked_task(task);
|
|
||||||
Local::put(s);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
let task = match task.wake() {
|
|
||||||
Some(task) => task, None => return
|
|
||||||
};
|
|
||||||
// XXX: this is not an easy section of code to refactor.
|
|
||||||
// If this handle is owned by the Task (which it
|
|
||||||
// should be), then this would be a use-after-free
|
|
||||||
// because once the task is pushed onto the message
|
|
||||||
// queue, the handle is gone.
|
|
||||||
//
|
|
||||||
// Currently the handle is instead owned by the
|
|
||||||
// Port/Chan pair, which means that because a
|
|
||||||
// channel is invoking this method the handle will
|
|
||||||
// continue to stay alive for the entire duration
|
|
||||||
// of this method. This will require thought when
|
|
||||||
// moving the handle into the task.
|
|
||||||
unsafe { (*handle).send(TaskFromFriend(task)) }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Note that there are no use-after-free races in this code. In
|
|
||||||
// the arc-case, we own the lock, and in the local case, we're
|
|
||||||
// using a lock so it's guranteed that they aren't running while
|
|
||||||
// we hold the lock.
|
|
||||||
Native(ArcWakeup(lock)) => {
|
|
||||||
unsafe {
|
|
||||||
let lock = lock.get();
|
|
||||||
(*lock).lock();
|
|
||||||
(*lock).signal();
|
|
||||||
(*lock).unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Native(LocalWakeup(lock)) => {
|
|
||||||
unsafe {
|
|
||||||
(*lock).lock();
|
|
||||||
(*lock).signal();
|
|
||||||
(*lock).unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Trashes handle to this task. This ensures that necessary memory is
|
|
||||||
// deallocated, and there may be some extra assertions as well.
|
|
||||||
pub fn trash(self) {
|
|
||||||
match self.inner {
|
|
||||||
Green(task, _) => task.assert_already_awake(),
|
|
||||||
Native(..) => {}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// This structure is an abstraction of what should be stored in the local
|
|
||||||
// task itself. This data is currently stored inside of each channel, but
|
|
||||||
// this should rather be stored in each task (and channels will still
|
|
||||||
// continue to lazily initialize this data).
|
|
||||||
|
|
||||||
pub struct TaskData {
|
|
||||||
priv handle: Option<SchedHandle>,
|
|
||||||
priv lock: Mutex,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl TaskData {
|
|
||||||
pub fn new() -> TaskData {
|
|
||||||
TaskData {
|
|
||||||
handle: None,
|
|
||||||
lock: unsafe { Mutex::empty() },
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Drop for TaskData {
|
|
||||||
fn drop(&mut self) {
|
|
||||||
unsafe { self.lock.destroy() }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Now this is the really fun part. This is where all the M:N/1:1-agnostic
|
|
||||||
// along with recv/select-agnostic blocking information goes. A "blocking
|
|
||||||
// context" is really just a stack-allocated structure (which is probably
|
|
||||||
// fine to be a stack-trait-object).
|
|
||||||
//
|
|
||||||
// This has some particularly strange interfaces, but the reason for all
|
|
||||||
// this is to support selection/recv/1:1/M:N all in one bundle.
|
|
||||||
|
|
||||||
pub struct BlockingContext<'a> {
|
|
||||||
priv inner: BlockingRepr<'a>
|
|
||||||
}
|
|
||||||
|
|
||||||
enum BlockingRepr<'a> {
|
|
||||||
GreenBlock(rt::BlockedTask, &'a mut Scheduler),
|
|
||||||
NativeBlock(Option<UnsafeArc<Mutex>>),
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<'a> BlockingContext<'a> {
|
|
||||||
// Creates one blocking context. The data provided should in theory be
|
|
||||||
// acquired from the local task, but it is instead acquired from the
|
|
||||||
// channel currently.
|
|
||||||
//
|
|
||||||
// This function will call `f` with a blocking context, plus the data
|
|
||||||
// that it is given. This function will then return whether this task
|
|
||||||
// should actually go to sleep or not. If `true` is returned, then this
|
|
||||||
// function does not return until someone calls `wake()` on the task.
|
|
||||||
// If `false` is returned, then this function immediately returns.
|
|
||||||
//
|
|
||||||
// # Safety note
|
|
||||||
//
|
|
||||||
// Note that this stack closure may not be run on the same stack as when
|
|
||||||
// this function was called. This means that the environment of this
|
|
||||||
// stack closure could be unsafely aliased. This is currently prevented
|
|
||||||
// through the guarantee that this function will never return before `f`
|
|
||||||
// finishes executing.
|
|
||||||
pub fn one(data: &mut TaskData,
|
|
||||||
f: |BlockingContext, &mut TaskData| -> bool) {
|
|
||||||
if rt::in_green_task_context() {
|
|
||||||
let sched: ~Scheduler = Local::take();
|
|
||||||
sched.deschedule_running_task_and_then(|sched, task| {
|
|
||||||
let ctx = BlockingContext { inner: GreenBlock(task, sched) };
|
|
||||||
// no need to do something on success/failure other than
|
|
||||||
// returning because the `block` function for a BlockingContext
|
|
||||||
// takes care of reawakening itself if the blocking procedure
|
|
||||||
// fails. If this function is successful, then we're already
|
|
||||||
// blocked, and if it fails, the task will already be
|
|
||||||
// rescheduled.
|
|
||||||
f(ctx, data);
|
|
||||||
});
|
|
||||||
} else {
|
|
||||||
unsafe { data.lock.lock(); }
|
|
||||||
let ctx = BlockingContext { inner: NativeBlock(None) };
|
|
||||||
if f(ctx, data) {
|
|
||||||
unsafe { data.lock.wait(); }
|
|
||||||
}
|
|
||||||
unsafe { data.lock.unlock(); }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Creates many blocking contexts. The intended use case for this
|
|
||||||
// function is selection over a number of ports. This will create `amt`
|
|
||||||
// blocking contexts, yielding them to `f` in turn. If `f` returns
|
|
||||||
// false, then this function aborts and returns immediately. If `f`
|
|
||||||
// repeatedly returns `true` `amt` times, then this function will block.
|
|
||||||
pub fn many(amt: uint, f: |BlockingContext| -> bool) {
|
|
||||||
if rt::in_green_task_context() {
|
|
||||||
let sched: ~Scheduler = Local::take();
|
|
||||||
sched.deschedule_running_task_and_then(|sched, task| {
|
|
||||||
for handle in task.make_selectable(amt) {
|
|
||||||
let ctx = BlockingContext {
|
|
||||||
inner: GreenBlock(handle, sched)
|
|
||||||
};
|
|
||||||
// see comment above in `one` for why no further action is
|
|
||||||
// necessary here
|
|
||||||
if !f(ctx) { break }
|
|
||||||
}
|
|
||||||
});
|
|
||||||
} else {
|
|
||||||
// In the native case, our decision to block must be shared
|
|
||||||
// amongst all of the channels. It may be possible to
|
|
||||||
// stack-allocate this mutex (instead of putting it in an
|
|
||||||
// UnsafeArc box), but for now in order to prevent
|
|
||||||
// use-after-free trivially we place this into a box and then
|
|
||||||
// pass that around.
|
|
||||||
unsafe {
|
|
||||||
let mtx = UnsafeArc::new(Mutex::new());
|
|
||||||
(*mtx.get()).lock();
|
|
||||||
let success = range(0, amt).all(|_| {
|
|
||||||
f(BlockingContext {
|
|
||||||
inner: NativeBlock(Some(mtx.clone()))
|
|
||||||
})
|
|
||||||
});
|
|
||||||
if success {
|
|
||||||
(*mtx.get()).wait();
|
|
||||||
}
|
|
||||||
(*mtx.get()).unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// This function will consume this BlockingContext, and optionally block
|
|
||||||
// if according to the atomic `decision` function. The semantics of this
|
|
||||||
// functions are:
|
|
||||||
//
|
|
||||||
// * `slot` is required to be a `None`-slot (which is owned by the
|
|
||||||
// channel)
|
|
||||||
// * The `slot` will be filled in with a blocked version of the current
|
|
||||||
// task (with `wake`-ability if this function is successful).
|
|
||||||
// * If the `decision` function returns true, then this function
|
|
||||||
// immediately returns having relinquished ownership of the task.
|
|
||||||
// * If the `decision` function returns false, then the `slot` is reset
|
|
||||||
// to `None` and the task is re-scheduled if necessary (remember that
|
|
||||||
// the task will not resume executing before the outer `one` or
|
|
||||||
// `many` function has returned. This function is expected to have a
|
|
||||||
// release memory fence in order for the modifications of `to_wake` to be
|
|
||||||
// visible to other tasks. Code which attempts to read `to_wake` should
|
|
||||||
// have an acquiring memory fence to guarantee that this write is
|
|
||||||
// visible.
|
|
||||||
//
|
|
||||||
// This function will return whether the blocking occurred or not.
|
|
||||||
pub fn block(self,
|
|
||||||
data: &mut TaskData,
|
|
||||||
slot: &mut Option<TaskHandle>,
|
|
||||||
decision: || -> bool) -> bool {
|
|
||||||
assert!(slot.is_none());
|
|
||||||
match self.inner {
|
|
||||||
GreenBlock(task, sched) => {
|
|
||||||
if data.handle.is_none() {
|
|
||||||
data.handle = Some(sched.make_handle());
|
|
||||||
}
|
|
||||||
let handle = data.handle.get_mut_ref() as *mut SchedHandle;
|
|
||||||
*slot = Some(TaskHandle { inner: Green(task, handle) });
|
|
||||||
|
|
||||||
if !decision() {
|
|
||||||
match slot.take_unwrap().inner {
|
|
||||||
Green(task, _) => sched.enqueue_blocked_task(task),
|
|
||||||
Native(..) => unreachable!()
|
|
||||||
}
|
|
||||||
false
|
|
||||||
} else {
|
|
||||||
true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
NativeBlock(shared) => {
|
|
||||||
*slot = Some(TaskHandle {
|
|
||||||
inner: Native(match shared {
|
|
||||||
Some(arc) => ArcWakeup(arc),
|
|
||||||
None => LocalWakeup(&mut data.lock as *mut Mutex),
|
|
||||||
})
|
|
||||||
});
|
|
||||||
|
|
||||||
if !decision() {
|
|
||||||
*slot = None;
|
|
||||||
false
|
|
||||||
} else {
|
|
||||||
true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Agnostic method of forcing a yield of the current task
|
|
||||||
pub fn yield_now() {
|
|
||||||
if rt::in_green_task_context() {
|
|
||||||
let sched: ~Scheduler = Local::take();
|
|
||||||
sched.yield_now();
|
|
||||||
} else {
|
|
||||||
Thread::yield_now();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Agnostic method of "maybe yielding" in order to provide fairness
|
|
||||||
pub fn maybe_yield() {
|
|
||||||
if rt::in_green_task_context() {
|
|
||||||
let sched: ~Scheduler = Local::take();
|
|
||||||
sched.maybe_yield();
|
|
||||||
} else {
|
|
||||||
// the OS decides fairness, nothing for us to do.
|
|
||||||
}
|
|
||||||
}
|
|
@ -233,14 +233,17 @@ use iter::Iterator;
|
|||||||
use kinds::Send;
|
use kinds::Send;
|
||||||
use ops::Drop;
|
use ops::Drop;
|
||||||
use option::{Option, Some, None};
|
use option::{Option, Some, None};
|
||||||
|
use result::{Ok, Err};
|
||||||
|
use rt::local::Local;
|
||||||
|
use rt::task::{Task, BlockedTask};
|
||||||
use rt::thread::Thread;
|
use rt::thread::Thread;
|
||||||
use unstable::atomics::{AtomicInt, AtomicBool, SeqCst, Relaxed};
|
use sync::atomics::{AtomicInt, AtomicBool, SeqCst, Relaxed};
|
||||||
|
use task;
|
||||||
use vec::{ImmutableVector, OwnedVector};
|
use vec::{ImmutableVector, OwnedVector};
|
||||||
|
|
||||||
use spsc = rt::spsc_queue;
|
use spsc = sync::spsc_queue;
|
||||||
use mpsc = rt::mpsc_queue;
|
use mpsc = sync::mpsc_queue;
|
||||||
|
|
||||||
use self::imp::{TaskHandle, TaskData, BlockingContext};
|
|
||||||
pub use self::select::Select;
|
pub use self::select::Select;
|
||||||
|
|
||||||
macro_rules! test (
|
macro_rules! test (
|
||||||
@ -265,7 +268,6 @@ macro_rules! test (
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
mod imp;
|
|
||||||
mod select;
|
mod select;
|
||||||
|
|
||||||
///////////////////////////////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////////////////////////////
|
||||||
@ -326,9 +328,7 @@ pub struct SharedChan<T> {
|
|||||||
struct Packet {
|
struct Packet {
|
||||||
cnt: AtomicInt, // How many items are on this channel
|
cnt: AtomicInt, // How many items are on this channel
|
||||||
steals: int, // How many times has a port received without blocking?
|
steals: int, // How many times has a port received without blocking?
|
||||||
to_wake: Option<TaskHandle>, // Task to wake up
|
to_wake: Option<BlockedTask>, // Task to wake up
|
||||||
|
|
||||||
data: TaskData,
|
|
||||||
|
|
||||||
// This lock is used to wake up native threads blocked in select. The
|
// This lock is used to wake up native threads blocked in select. The
|
||||||
// `lock` field is not used because the thread blocking in select must
|
// `lock` field is not used because the thread blocking in select must
|
||||||
@ -358,7 +358,6 @@ impl Packet {
|
|||||||
cnt: AtomicInt::new(0),
|
cnt: AtomicInt::new(0),
|
||||||
steals: 0,
|
steals: 0,
|
||||||
to_wake: None,
|
to_wake: None,
|
||||||
data: TaskData::new(),
|
|
||||||
channels: AtomicInt::new(1),
|
channels: AtomicInt::new(1),
|
||||||
|
|
||||||
selecting: AtomicBool::new(false),
|
selecting: AtomicBool::new(false),
|
||||||
@ -418,7 +417,10 @@ impl Packet {
|
|||||||
// This function must have had at least an acquire fence before it to be
|
// This function must have had at least an acquire fence before it to be
|
||||||
// properly called.
|
// properly called.
|
||||||
fn wakeup(&mut self, can_resched: bool) {
|
fn wakeup(&mut self, can_resched: bool) {
|
||||||
self.to_wake.take_unwrap().wake(can_resched);
|
match self.to_wake.take_unwrap().wake() {
|
||||||
|
Some(task) => task.reawaken(can_resched),
|
||||||
|
None => {}
|
||||||
|
}
|
||||||
self.selecting.store(false, Relaxed);
|
self.selecting.store(false, Relaxed);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -607,7 +609,7 @@ impl<T: Send> Chan<T> {
|
|||||||
n => {
|
n => {
|
||||||
assert!(n >= 0);
|
assert!(n >= 0);
|
||||||
if can_resched && n > 0 && n % RESCHED_FREQ == 0 {
|
if can_resched && n > 0 && n % RESCHED_FREQ == 0 {
|
||||||
imp::maybe_yield();
|
task::deschedule();
|
||||||
}
|
}
|
||||||
true
|
true
|
||||||
}
|
}
|
||||||
@ -700,7 +702,7 @@ impl<T: Send> SharedChan<T> {
|
|||||||
-1 => { (*packet).wakeup(can_resched); }
|
-1 => { (*packet).wakeup(can_resched); }
|
||||||
n => {
|
n => {
|
||||||
if can_resched && n > 0 && n % RESCHED_FREQ == 0 {
|
if can_resched && n > 0 && n % RESCHED_FREQ == 0 {
|
||||||
imp::maybe_yield();
|
task::deschedule();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -840,8 +842,15 @@ impl<T: Send> Port<T> {
|
|||||||
unsafe {
|
unsafe {
|
||||||
this = cast::transmute_mut(self);
|
this = cast::transmute_mut(self);
|
||||||
packet = this.queue.packet();
|
packet = this.queue.packet();
|
||||||
BlockingContext::one(&mut (*packet).data, |ctx, data| {
|
let task: ~Task = Local::take();
|
||||||
ctx.block(data, &mut (*packet).to_wake, || (*packet).decrement())
|
task.deschedule(1, |task| {
|
||||||
|
assert!((*packet).to_wake.is_none());
|
||||||
|
(*packet).to_wake = Some(task);
|
||||||
|
if (*packet).decrement() {
|
||||||
|
Ok(())
|
||||||
|
} else {
|
||||||
|
Err((*packet).to_wake.take_unwrap())
|
||||||
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -50,10 +50,13 @@ use kinds::Send;
|
|||||||
use ops::Drop;
|
use ops::Drop;
|
||||||
use option::{Some, None, Option};
|
use option::{Some, None, Option};
|
||||||
use ptr::RawPtr;
|
use ptr::RawPtr;
|
||||||
use super::imp::BlockingContext;
|
use result::{Ok, Err};
|
||||||
use super::{Packet, Port, imp};
|
use rt::thread::Thread;
|
||||||
|
use rt::local::Local;
|
||||||
|
use rt::task::Task;
|
||||||
|
use super::{Packet, Port};
|
||||||
|
use sync::atomics::{Relaxed, SeqCst};
|
||||||
use uint;
|
use uint;
|
||||||
use unstable::atomics::{Relaxed, SeqCst};
|
|
||||||
|
|
||||||
macro_rules! select {
|
macro_rules! select {
|
||||||
(
|
(
|
||||||
@ -184,19 +187,22 @@ impl Select {
|
|||||||
// Acquire a number of blocking contexts, and block on each one
|
// Acquire a number of blocking contexts, and block on each one
|
||||||
// sequentially until one fails. If one fails, then abort
|
// sequentially until one fails. If one fails, then abort
|
||||||
// immediately so we can go unblock on all the other ports.
|
// immediately so we can go unblock on all the other ports.
|
||||||
BlockingContext::many(amt, |ctx| {
|
let task: ~Task = Local::take();
|
||||||
|
task.deschedule(amt, |task| {
|
||||||
|
// Prepare for the block
|
||||||
let (i, packet) = iter.next().unwrap();
|
let (i, packet) = iter.next().unwrap();
|
||||||
|
assert!((*packet).to_wake.is_none());
|
||||||
|
(*packet).to_wake = Some(task);
|
||||||
(*packet).selecting.store(true, SeqCst);
|
(*packet).selecting.store(true, SeqCst);
|
||||||
if !ctx.block(&mut (*packet).data,
|
|
||||||
&mut (*packet).to_wake,
|
if (*packet).decrement() {
|
||||||
|| (*packet).decrement()) {
|
Ok(())
|
||||||
|
} else {
|
||||||
(*packet).abort_selection(false);
|
(*packet).abort_selection(false);
|
||||||
(*packet).selecting.store(false, SeqCst);
|
(*packet).selecting.store(false, SeqCst);
|
||||||
ready_index = i;
|
ready_index = i;
|
||||||
ready_id = (*packet).selection_id;
|
ready_id = (*packet).selection_id;
|
||||||
false
|
Err((*packet).to_wake.take_unwrap())
|
||||||
} else {
|
|
||||||
true
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
@ -225,7 +231,7 @@ impl Select {
|
|||||||
if (*packet).abort_selection(true) {
|
if (*packet).abort_selection(true) {
|
||||||
ready_id = (*packet).selection_id;
|
ready_id = (*packet).selection_id;
|
||||||
while (*packet).selecting.load(Relaxed) {
|
while (*packet).selecting.load(Relaxed) {
|
||||||
imp::yield_now();
|
task::deschedule();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user