Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 20 additions & 27 deletions crates/bevy_asset/src/io/processor_gated.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
io::{AssetReader, AssetReaderError, AssetSourceId, PathStream, Reader},
processor::{AssetProcessorData, ProcessStatus},
processor::{ProcessStatus, ProcessingState},
AssetPath,
};
use alloc::{borrow::ToOwned, boxed::Box, sync::Arc, vec::Vec};
Expand All @@ -16,46 +16,33 @@ use super::{AsyncSeekForward, ErasedAssetReader};
/// given path until that path has been processed by [`AssetProcessor`].
///
/// [`AssetProcessor`]: crate::processor::AssetProcessor
pub struct ProcessorGatedReader {
reader: Box<dyn ErasedAssetReader>,
pub(crate) struct ProcessorGatedReader {
reader: Arc<dyn ErasedAssetReader>,
source: AssetSourceId<'static>,
processor_data: Arc<AssetProcessorData>,
processing_state: Arc<ProcessingState>,
}

impl ProcessorGatedReader {
/// Creates a new [`ProcessorGatedReader`].
pub fn new(
pub(crate) fn new(
source: AssetSourceId<'static>,
reader: Box<dyn ErasedAssetReader>,
processor_data: Arc<AssetProcessorData>,
reader: Arc<dyn ErasedAssetReader>,
processing_state: Arc<ProcessingState>,
) -> Self {
Self {
source,
processor_data,
reader,
processing_state,
}
}

/// Gets a "transaction lock" that can be used to ensure no writes to asset or asset meta occur
/// while it is held.
async fn get_transaction_lock(
&self,
path: &AssetPath<'static>,
) -> Result<RwLockReadGuardArc<()>, AssetReaderError> {
let infos = self.processor_data.asset_infos.read().await;
let info = infos
.get(path)
.ok_or_else(|| AssetReaderError::NotFound(path.path().to_owned()))?;
Ok(info.file_transaction_lock.read_arc().await)
}
}

impl AssetReader for ProcessorGatedReader {
async fn read<'a>(&'a self, path: &'a Path) -> Result<impl Reader + 'a, AssetReaderError> {
let asset_path = AssetPath::from(path.to_path_buf()).with_source(self.source.clone());
trace!("Waiting for processing to finish before reading {asset_path}");
let process_result = self
.processor_data
.processing_state
.wait_until_processed(asset_path.clone())
.await;
match process_result {
Expand All @@ -65,7 +52,10 @@ impl AssetReader for ProcessorGatedReader {
}
}
trace!("Processing finished with {asset_path}, reading {process_result:?}",);
let lock = self.get_transaction_lock(&asset_path).await?;
let lock = self
.processing_state
.get_transaction_lock(&asset_path)
.await?;
let asset_reader = self.reader.read(path).await?;
let reader = TransactionLockedReader::new(asset_reader, lock);
Ok(reader)
Expand All @@ -75,7 +65,7 @@ impl AssetReader for ProcessorGatedReader {
let asset_path = AssetPath::from(path.to_path_buf()).with_source(self.source.clone());
trace!("Waiting for processing to finish before reading meta for {asset_path}",);
let process_result = self
.processor_data
.processing_state
.wait_until_processed(asset_path.clone())
.await;
match process_result {
Expand All @@ -85,7 +75,10 @@ impl AssetReader for ProcessorGatedReader {
}
}
trace!("Processing finished with {process_result:?}, reading meta for {asset_path}",);
let lock = self.get_transaction_lock(&asset_path).await?;
let lock = self
.processing_state
.get_transaction_lock(&asset_path)
.await?;
let meta_reader = self.reader.read_meta(path).await?;
let reader = TransactionLockedReader::new(meta_reader, lock);
Ok(reader)
Expand All @@ -99,7 +92,7 @@ impl AssetReader for ProcessorGatedReader {
"Waiting for processing to finish before reading directory {:?}",
path
);
self.processor_data.wait_until_finished().await;
self.processing_state.wait_until_finished().await;
trace!("Processing finished, reading directory {:?}", path);
let result = self.reader.read_directory(path).await?;
Ok(result)
Expand All @@ -110,7 +103,7 @@ impl AssetReader for ProcessorGatedReader {
"Waiting for processing to finish before reading directory {:?}",
path
);
self.processor_data.wait_until_finished().await;
self.processing_state.wait_until_finished().await;
trace!("Processing finished, getting directory status {:?}", path);
let result = self.reader.is_directory(path).await?;
Ok(result)
Expand Down
30 changes: 22 additions & 8 deletions crates/bevy_asset/src/io/source.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
io::{processor_gated::ProcessorGatedReader, AssetSourceEvent, AssetWatcher},
processor::AssetProcessorData,
processor::ProcessingState,
};
use alloc::{
boxed::Box,
Expand Down Expand Up @@ -180,7 +180,12 @@ impl AssetSourceBuilder {
id: id.clone(),
reader,
writer,
processed_reader: self.processed_reader.as_mut().map(|r| r()),
processed_reader: self
.processed_reader
.as_mut()
.map(|r| r())
.map(Into::<Arc<_>>::into),
Comment on lines +186 to +187
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't the AssetSourceBuilder functions generate Arcs instead of always converting the Box they make?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They could do. I didn't want to change too much since it's not fully necessary, and also returning a box means we own the reader rather than being given a cloned arc they got from somewhere else. I'd say that's malicious, but it lines up!

In the future, we might refactor this even more since I think the builder API mostly only made sense with AssetSourceBuilders which I'm getting rid of in the next PR.

ungated_processed_reader: None,
processed_writer,
event_receiver: None,
watcher: None,
Expand Down Expand Up @@ -386,7 +391,8 @@ pub struct AssetSource {
id: AssetSourceId<'static>,
reader: Box<dyn ErasedAssetReader>,
writer: Option<Box<dyn ErasedAssetWriter>>,
processed_reader: Option<Box<dyn ErasedAssetReader>>,
processed_reader: Option<Arc<dyn ErasedAssetReader>>,
ungated_processed_reader: Option<Arc<dyn ErasedAssetReader>>,
processed_writer: Option<Box<dyn ErasedAssetWriter>>,
watcher: Option<Box<dyn AssetWatcher>>,
processed_watcher: Option<Box<dyn AssetWatcher>>,
Expand Down Expand Up @@ -425,6 +431,13 @@ impl AssetSource {
.ok_or_else(|| MissingProcessedAssetReaderError(self.id.clone_owned()))
}

/// Return's this source's ungated processed [`AssetReader`](crate::io::AssetReader), if it
/// exists.
#[inline]
pub(crate) fn ungated_processed_reader(&self) -> Option<&dyn ErasedAssetReader> {
self.ungated_processed_reader.as_deref()
}

/// Return's this source's processed [`AssetWriter`](crate::io::AssetWriter), if it exists.
#[inline]
pub fn processed_writer(
Expand Down Expand Up @@ -560,12 +573,13 @@ impl AssetSource {

/// This will cause processed [`AssetReader`](crate::io::AssetReader) futures (such as [`AssetReader::read`](crate::io::AssetReader::read)) to wait until
/// the [`AssetProcessor`](crate::AssetProcessor) has finished processing the requested asset.
pub fn gate_on_processor(&mut self, processor_data: Arc<AssetProcessorData>) {
pub(crate) fn gate_on_processor(&mut self, processing_state: Arc<ProcessingState>) {
if let Some(reader) = self.processed_reader.take() {
self.processed_reader = Some(Box::new(ProcessorGatedReader::new(
self.ungated_processed_reader = Some(reader.clone());
self.processed_reader = Some(Arc::new(ProcessorGatedReader::new(
self.id(),
reader,
processor_data,
processing_state,
)));
}
}
Expand Down Expand Up @@ -622,9 +636,9 @@ impl AssetSources {

/// This will cause processed [`AssetReader`](crate::io::AssetReader) futures (such as [`AssetReader::read`](crate::io::AssetReader::read)) to wait until
/// the [`AssetProcessor`](crate::AssetProcessor) has finished processing the requested asset.
pub fn gate_on_processor(&mut self, processor_data: Arc<AssetProcessorData>) {
pub(crate) fn gate_on_processor(&mut self, processing_state: Arc<ProcessingState>) {
for source in self.iter_processed_mut() {
source.gate_on_processor(processor_data.clone());
source.gate_on_processor(processing_state.clone());
}
}
}
Expand Down
8 changes: 3 additions & 5 deletions crates/bevy_asset/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ impl Plugin for AssetPlugin {
let sources = builders.build_sources(watch, false);

app.insert_resource(AssetServer::new_with_meta_check(
sources,
Arc::new(sources),
AssetServerMode::Unprocessed,
self.meta_check.clone(),
watch,
Expand All @@ -388,9 +388,7 @@ impl Plugin for AssetPlugin {
.unwrap_or(cfg!(feature = "asset_processor"));
if use_asset_processor {
let mut builders = app.world_mut().resource_mut::<AssetSourceBuilders>();
let processor = AssetProcessor::new(&mut builders);
let mut sources = builders.build_sources(false, watch);
sources.gate_on_processor(processor.data.clone());
let (processor, sources) = AssetProcessor::new(&mut builders, watch);
// the main asset server shares loaders with the processor asset server
app.insert_resource(AssetServer::new_with_loaders(
sources,
Expand All @@ -406,7 +404,7 @@ impl Plugin for AssetPlugin {
let mut builders = app.world_mut().resource_mut::<AssetSourceBuilders>();
let sources = builders.build_sources(false, watch);
app.insert_resource(AssetServer::new_with_meta_check(
sources,
Arc::new(sources),
AssetServerMode::Processed,
AssetMetaCheck::Always,
watch,
Expand Down
Loading