[wgpu-core] Return submission index for map_async and on_submitted_work_done to track down completion of async callbacks (#6360)

This commit is contained in:
Elie Michel 2024-11-04 15:21:09 +01:00 committed by GitHub
parent f3db183f1b
commit 6a75a73300
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 56 additions and 73 deletions

View File

@ -47,6 +47,10 @@ Bottom level categories:
- Parse `diagnostic(…)` directives, but don't implement any triggering rules yet. By @ErichDonGubler in [#6456](https://github.com/gfx-rs/wgpu/pull/6456).
- Fix an issue where `naga` CLI would incorrectly skip the first positional argument when `--stdin-file-path` was specified. By @ErichDonGubler in [#6480](https://github.com/gfx-rs/wgpu/pull/6480).
#### General
- Return submission index in `map_async` and `on_submitted_work_done` to track down completion of async callbacks. By @eliemichel in [#6360](https://github.com/gfx-rs/wgpu/pull/6360).
### Changes
#### HAL

View File

@ -2147,33 +2147,27 @@ impl Global {
offset: BufferAddress,
size: Option<BufferAddress>,
op: BufferMapOperation,
) -> BufferAccessResult {
) -> Result<crate::SubmissionIndex, BufferAccessError> {
profiling::scope!("Buffer::map_async");
api_log!("Buffer::map_async {buffer_id:?} offset {offset:?} size {size:?} op: {op:?}");
let hub = &self.hub;
let op_and_err = 'error: {
let buffer = match hub.buffers.get(buffer_id).get() {
Ok(buffer) => buffer,
Err(e) => break 'error Some((op, e.into())),
};
buffer.map_async(offset, size, op).err()
let map_result = match hub.buffers.get(buffer_id).get() {
Ok(buffer) => buffer.map_async(offset, size, op),
Err(e) => Err((op, e.into())),
};
// User callbacks must not be called while holding `buffer.map_async`'s locks, so we
// defer the error callback if it needs to be called immediately (typically when running
// into errors).
if let Some((mut operation, err)) = op_and_err {
if let Some(callback) = operation.callback.take() {
callback.call(Err(err.clone()));
match map_result {
Ok(submission_index) => Ok(submission_index),
Err((mut operation, err)) => {
if let Some(callback) = operation.callback.take() {
callback.call(Err(err.clone()));
}
log::error!("Buffer::map_async error: {err}");
Err(err)
}
log::error!("Buffer::map_async error: {err}");
return Err(err);
}
Ok(())
}
pub fn buffer_get_mapped_range(

View File

@ -126,35 +126,20 @@ pub enum WaitIdleError {
/// - Each buffer's `ResourceInfo::submission_index` records the index of the
/// most recent queue submission that uses that buffer.
///
/// - Calling `Global::buffer_map_async` adds the buffer to
/// `self.mapped`, and changes `Buffer::map_state` to prevent it
/// from being used in any new submissions.
///
/// - When the device is polled, the following `LifetimeTracker` methods decide
/// what should happen next:
///
/// 1) `triage_mapped` drains `self.mapped`, checking the submission index
/// of each buffer against the queue submissions that have finished
/// execution. Buffers used by submissions still in flight go in
/// `self.active[index].mapped`, and the rest go into
/// `self.ready_to_map`.
///
/// 2) `triage_submissions` moves entries in `self.active[i]` for completed
/// 1) `triage_submissions` moves entries in `self.active[i]` for completed
/// submissions to `self.ready_to_map`. At this point, both
/// `self.active` and `self.ready_to_map` are up to date with the given
/// submission index.
///
/// 3) `handle_mapping` drains `self.ready_to_map` and actually maps the
/// 2) `handle_mapping` drains `self.ready_to_map` and actually maps the
/// buffers, collecting a list of notification closures to call.
///
/// Only calling `Global::buffer_map_async` clones a new `Arc` for the
/// buffer. This new `Arc` is only dropped by `handle_mapping`.
pub(crate) struct LifetimeTracker {
/// Buffers for which a call to [`Buffer::map_async`] has succeeded, but
/// which haven't been examined by `triage_mapped` yet to decide when they
/// can be mapped.
mapped: Vec<Arc<Buffer>>,
/// Resources used by queue submissions still in flight. One entry per
/// submission, with older submissions appearing before younger.
///
@ -182,7 +167,6 @@ pub(crate) struct LifetimeTracker {
impl LifetimeTracker {
pub fn new() -> Self {
Self {
mapped: Vec::new(),
active: Vec::new(),
ready_to_map: Vec::new(),
work_done_closures: SmallVec::new(),
@ -211,8 +195,21 @@ impl LifetimeTracker {
});
}
pub(crate) fn map(&mut self, value: &Arc<Buffer>) {
self.mapped.push(value.clone());
pub(crate) fn map(&mut self, buffer: &Arc<Buffer>) -> Option<SubmissionIndex> {
// Determine which buffers are ready to map, and which must wait for the GPU.
let submission = self
.active
.iter_mut()
.rev()
.find(|a| a.contains_buffer(buffer));
let maybe_submission_index = submission.as_ref().map(|s| s.index);
submission
.map_or(&mut self.ready_to_map, |a| &mut a.mapped)
.push(buffer.clone());
maybe_submission_index
}
/// Returns the submission index of the most recent submission that uses the
@ -304,41 +301,24 @@ impl LifetimeTracker {
}
}
pub fn add_work_done_closure(&mut self, closure: SubmittedWorkDoneClosure) {
pub fn add_work_done_closure(
&mut self,
closure: SubmittedWorkDoneClosure,
) -> Option<SubmissionIndex> {
match self.active.last_mut() {
Some(active) => {
active.work_done_closures.push(closure);
Some(active.index)
}
// We must defer the closure until all previously occurring map_async closures
// have fired. This is required by the spec.
None => {
self.work_done_closures.push(closure);
None
}
}
}
/// Determine which buffers are ready to map, and which must wait for the
/// GPU.
///
/// See the documentation for [`LifetimeTracker`] for details.
pub(crate) fn triage_mapped(&mut self) {
if self.mapped.is_empty() {
return;
}
for buffer in self.mapped.drain(..) {
let submission = self
.active
.iter_mut()
.rev()
.find(|a| a.contains_buffer(&buffer));
submission
.map_or(&mut self.ready_to_map, |a| &mut a.mapped)
.push(buffer);
}
}
/// Map the buffers in `self.ready_to_map`.
///
/// Return a list of mapping notifications to send.

View File

@ -1256,10 +1256,13 @@ impl Queue {
unsafe { self.raw().get_timestamp_period() }
}
pub fn on_submitted_work_done(&self, closure: SubmittedWorkDoneClosure) {
pub fn on_submitted_work_done(
&self,
closure: SubmittedWorkDoneClosure,
) -> Option<SubmissionIndex> {
api_log!("Queue::on_submitted_work_done");
//TODO: flush pending writes
self.device.lock_life().add_work_done_closure(closure);
self.device.lock_life().add_work_done_closure(closure)
}
}
@ -1402,9 +1405,13 @@ impl Global {
&self,
queue_id: QueueId,
closure: SubmittedWorkDoneClosure,
) {
) -> SubmissionIndex {
api_log!("Queue::on_submitted_work_done {queue_id:?}");
//TODO: flush pending writes
let queue = self.hub.queues.get(queue_id);
queue.on_submitted_work_done(closure);
let result = queue.on_submitted_work_done(closure);
result.unwrap_or(0) // '0' means no wait is necessary
}
}

View File

@ -493,8 +493,6 @@ impl Device {
let submission_closures =
life_tracker.triage_submissions(submission_index, &self.command_allocator);
life_tracker.triage_mapped();
let mapping_closures = life_tracker.handle_mapping(self.raw(), &snatch_guard);
let queue_empty = life_tracker.queue_empty();

View File

@ -15,7 +15,7 @@ use crate::{
snatch::{SnatchGuard, Snatchable},
track::{SharedTrackerIndexAllocator, TextureSelector, TrackerIndex},
weak_vec::WeakVec,
Label, LabelHelpers,
Label, LabelHelpers, SubmissionIndex,
};
use smallvec::SmallVec;
@ -304,7 +304,7 @@ impl BufferMapCallback {
// SAFETY: the contract of the call to from_c says that this unsafe is sound.
BufferMapCallbackInner::C { inner } => unsafe {
let status = match result {
Ok(()) => BufferMapAsyncStatus::Success,
Ok(_) => BufferMapAsyncStatus::Success,
Err(BufferAccessError::Device(_)) => BufferMapAsyncStatus::ContextLost,
Err(BufferAccessError::InvalidResource(_))
| Err(BufferAccessError::DestroyedResource(_)) => BufferMapAsyncStatus::Invalid,
@ -546,7 +546,7 @@ impl Buffer {
offset: wgt::BufferAddress,
size: Option<wgt::BufferAddress>,
op: BufferMapOperation,
) -> Result<(), (BufferMapOperation, BufferAccessError)> {
) -> Result<SubmissionIndex, (BufferMapOperation, BufferAccessError)> {
let range_size = if let Some(size) = size {
size
} else if offset > self.size {
@ -633,9 +633,9 @@ impl Buffer {
.buffers
.set_single(self, internal_use);
device.lock_life().map(self);
let submit_index = device.lock_life().map(self).unwrap_or(0); // '0' means no wait is necessary
Ok(())
Ok(submit_index)
}
// Note: This must not be called while holding a lock.

View File

@ -1411,7 +1411,7 @@ impl crate::Context for ContextWgpuCore {
Some(range.end - range.start),
operation,
) {
Ok(()) => (),
Ok(_) => (),
Err(cause) => {
self.handle_error_nolabel(&buffer_data.error_sink, cause, "Buffer::map_async")
}