mirror of
https://github.com/rust-lang/rust.git
synced 2024-10-31 14:31:55 +00:00
std: leak remaining messages in bounded channel if message destructor panics
This commit is contained in:
parent
4e9e465bd4
commit
34aa87292c
@ -15,7 +15,7 @@ use super::utils::{Backoff, CachePadded};
|
||||
use super::waker::SyncWaker;
|
||||
|
||||
use crate::cell::UnsafeCell;
|
||||
use crate::mem::{self, MaybeUninit};
|
||||
use crate::mem::MaybeUninit;
|
||||
use crate::ptr;
|
||||
use crate::sync::atomic::{self, AtomicUsize, Ordering};
|
||||
use crate::time::Instant;
|
||||
@ -479,6 +479,10 @@ impl<T> Channel<T> {
|
||||
///
|
||||
/// `tail` should be the current (and therefore last) value of `tail`.
|
||||
///
|
||||
/// # Panicking
|
||||
/// If a destructor panics, the remaining messages are leaked, matching the
|
||||
/// behaviour of the unbounded channel.
|
||||
///
|
||||
/// # Safety
|
||||
/// This method must only be called when dropping the last receiver. The
|
||||
/// destruction of all other receivers must have been observed with acquire
|
||||
@ -486,75 +490,47 @@ impl<T> Channel<T> {
|
||||
unsafe fn discard_all_messages(&self, tail: usize) {
|
||||
debug_assert!(self.is_disconnected());
|
||||
|
||||
/// Use a helper struct with a custom `Drop` to ensure all messages are
|
||||
/// dropped, even if a destructor panicks.
|
||||
struct DiscardState<'a, T> {
|
||||
channel: &'a Channel<T>,
|
||||
head: usize,
|
||||
tail: usize,
|
||||
backoff: Backoff,
|
||||
}
|
||||
// Only receivers modify `head`, so since we are the last one,
|
||||
// this value will not change and will not be observed (since
|
||||
// no new messages can be sent after disconnection).
|
||||
let mut head = self.head.load(Ordering::Relaxed);
|
||||
let tail = tail & !self.mark_bit;
|
||||
|
||||
impl<'a, T> DiscardState<'a, T> {
|
||||
fn discard(&mut self) {
|
||||
loop {
|
||||
// Deconstruct the head.
|
||||
let index = self.head & (self.channel.mark_bit - 1);
|
||||
let lap = self.head & !(self.channel.one_lap - 1);
|
||||
let backoff = Backoff::new();
|
||||
loop {
|
||||
// Deconstruct the head.
|
||||
let index = head & (self.mark_bit - 1);
|
||||
let lap = head & !(self.one_lap - 1);
|
||||
|
||||
// Inspect the corresponding slot.
|
||||
debug_assert!(index < self.channel.buffer.len());
|
||||
let slot = unsafe { self.channel.buffer.get_unchecked(index) };
|
||||
let stamp = slot.stamp.load(Ordering::Acquire);
|
||||
// Inspect the corresponding slot.
|
||||
debug_assert!(index < self.buffer.len());
|
||||
let slot = unsafe { self.buffer.get_unchecked(index) };
|
||||
let stamp = slot.stamp.load(Ordering::Acquire);
|
||||
|
||||
// If the stamp is ahead of the head by 1, we may drop the message.
|
||||
if self.head + 1 == stamp {
|
||||
self.head = if index + 1 < self.channel.cap {
|
||||
// Same lap, incremented index.
|
||||
// Set to `{ lap: lap, mark: 0, index: index + 1 }`.
|
||||
self.head + 1
|
||||
} else {
|
||||
// One lap forward, index wraps around to zero.
|
||||
// Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
|
||||
lap.wrapping_add(self.channel.one_lap)
|
||||
};
|
||||
// If the stamp is ahead of the head by 1, we may drop the message.
|
||||
if head + 1 == stamp {
|
||||
head = if index + 1 < self.cap {
|
||||
// Same lap, incremented index.
|
||||
// Set to `{ lap: lap, mark: 0, index: index + 1 }`.
|
||||
head + 1
|
||||
} else {
|
||||
// One lap forward, index wraps around to zero.
|
||||
// Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
|
||||
lap.wrapping_add(self.one_lap)
|
||||
};
|
||||
|
||||
// We updated the head, so even if this descrutor panics,
|
||||
// we will not attempt to destroy the slot again.
|
||||
unsafe {
|
||||
(*slot.msg.get()).assume_init_drop();
|
||||
}
|
||||
// If the tail equals the head, that means the channel is empty.
|
||||
} else if self.tail == self.head {
|
||||
return;
|
||||
// Otherwise, a sender is about to write into the slot, so we need
|
||||
// to wait for it to update the stamp.
|
||||
} else {
|
||||
self.backoff.spin_heavy();
|
||||
}
|
||||
unsafe {
|
||||
(*slot.msg.get()).assume_init_drop();
|
||||
}
|
||||
// If the tail equals the head, that means the channel is empty.
|
||||
} else if tail == head {
|
||||
return;
|
||||
// Otherwise, a sender is about to write into the slot, so we need
|
||||
// to wait for it to update the stamp.
|
||||
} else {
|
||||
backoff.spin_heavy();
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T> Drop for DiscardState<'a, T> {
|
||||
fn drop(&mut self) {
|
||||
self.discard();
|
||||
}
|
||||
}
|
||||
|
||||
let mut state = DiscardState {
|
||||
channel: self,
|
||||
// Only receivers modify `head`, so since we are the last one,
|
||||
// this value will not change and will not be observed (since
|
||||
// no new messages can be sent after disconnection).
|
||||
head: self.head.load(Ordering::Relaxed),
|
||||
tail: tail & !self.mark_bit,
|
||||
backoff: Backoff::new(),
|
||||
};
|
||||
state.discard();
|
||||
// This point is only reached if no destructor panics, so all messages
|
||||
// have already been dropped.
|
||||
mem::forget(state);
|
||||
}
|
||||
|
||||
/// Returns `true` if the channel is disconnected.
|
||||
|
Loading…
Reference in New Issue
Block a user