Skip to content

Commit 9a0c047

Browse files
committed
Separate all the processing state stuff into its own ProcessingState struct.
1 parent ac7f9b0 commit 9a0c047

File tree

2 files changed

+125
-56
lines changed

2 files changed

+125
-56
lines changed

crates/bevy_asset/src/io/processor_gated.rs

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -35,19 +35,6 @@ impl ProcessorGatedReader {
3535
reader,
3636
}
3737
}
38-
39-
/// Gets a "transaction lock" that can be used to ensure no writes to asset or asset meta occur
40-
/// while it is held.
41-
async fn get_transaction_lock(
42-
&self,
43-
path: &AssetPath<'static>,
44-
) -> Result<RwLockReadGuardArc<()>, AssetReaderError> {
45-
let infos = self.processor_data.asset_infos.read().await;
46-
let info = infos
47-
.get(path)
48-
.ok_or_else(|| AssetReaderError::NotFound(path.path().to_owned()))?;
49-
Ok(info.file_transaction_lock.read_arc().await)
50-
}
5138
}
5239

5340
impl AssetReader for ProcessorGatedReader {
@@ -56,6 +43,7 @@ impl AssetReader for ProcessorGatedReader {
5643
trace!("Waiting for processing to finish before reading {asset_path}");
5744
let process_result = self
5845
.processor_data
46+
.processing_state
5947
.wait_until_processed(asset_path.clone())
6048
.await;
6149
match process_result {
@@ -65,7 +53,11 @@ impl AssetReader for ProcessorGatedReader {
6553
}
6654
}
6755
trace!("Processing finished with {asset_path}, reading {process_result:?}",);
68-
let lock = self.get_transaction_lock(&asset_path).await?;
56+
let lock = self
57+
.processor_data
58+
.processing_state
59+
.get_transaction_lock(&asset_path)
60+
.await?;
6961
let asset_reader = self.reader.read(path).await?;
7062
let reader = TransactionLockedReader::new(asset_reader, lock);
7163
Ok(reader)
@@ -76,6 +68,7 @@ impl AssetReader for ProcessorGatedReader {
7668
trace!("Waiting for processing to finish before reading meta for {asset_path}",);
7769
let process_result = self
7870
.processor_data
71+
.processing_state
7972
.wait_until_processed(asset_path.clone())
8073
.await;
8174
match process_result {
@@ -85,7 +78,11 @@ impl AssetReader for ProcessorGatedReader {
8578
}
8679
}
8780
trace!("Processing finished with {process_result:?}, reading meta for {asset_path}",);
88-
let lock = self.get_transaction_lock(&asset_path).await?;
81+
let lock = self
82+
.processor_data
83+
.processing_state
84+
.get_transaction_lock(&asset_path)
85+
.await?;
8986
let meta_reader = self.reader.read_meta(path).await?;
9087
let reader = TransactionLockedReader::new(meta_reader, lock);
9188
Ok(reader)
@@ -99,7 +96,10 @@ impl AssetReader for ProcessorGatedReader {
9996
"Waiting for processing to finish before reading directory {:?}",
10097
path
10198
);
102-
self.processor_data.wait_until_finished().await;
99+
self.processor_data
100+
.processing_state
101+
.wait_until_finished()
102+
.await;
103103
trace!("Processing finished, reading directory {:?}", path);
104104
let result = self.reader.read_directory(path).await?;
105105
Ok(result)
@@ -110,7 +110,10 @@ impl AssetReader for ProcessorGatedReader {
110110
"Waiting for processing to finish before reading directory {:?}",
111111
path
112112
);
113-
self.processor_data.wait_until_finished().await;
113+
self.processor_data
114+
.processing_state
115+
.wait_until_finished()
116+
.await;
114117
trace!("Processing finished, getting directory status {:?}", path);
115118
let result = self.reader.is_directory(path).await?;
116119
Ok(result)

crates/bevy_asset/src/processor/mod.rs

Lines changed: 105 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
mod log;
4141
mod process;
4242

43+
use async_lock::RwLockReadGuardArc;
4344
pub use log::*;
4445
pub use process::*;
4546

@@ -103,7 +104,8 @@ pub struct AssetProcessor {
103104

104105
/// Internal data stored inside an [`AssetProcessor`].
105106
pub struct AssetProcessorData {
106-
pub(crate) asset_infos: async_lock::RwLock<ProcessorAssetInfos>,
107+
/// The state of processing.
108+
pub(crate) processing_state: ProcessingState,
107109
/// The factory that creates the transaction log.
108110
///
109111
/// Note: we use a regular Mutex instead of an async mutex since we expect users to only set
@@ -114,12 +116,21 @@ pub struct AssetProcessorData {
114116
processors: RwLock<HashMap<&'static str, Arc<dyn ErasedProcessor>>>,
115117
/// Default processors for file extensions
116118
default_processors: RwLock<HashMap<Box<str>, &'static str>>,
117-
state: async_lock::RwLock<ProcessorState>,
118119
sources: AssetSources,
120+
}
121+
122+
/// The current state of processing, including the overall state and the state of all assets.
123+
pub(crate) struct ProcessingState {
124+
/// The overall state of processing.
125+
state: async_lock::RwLock<ProcessorState>,
126+
/// The channel to broadcast when the processor has completed initialization.
119127
initialized_sender: async_broadcast::Sender<()>,
120128
initialized_receiver: async_broadcast::Receiver<()>,
129+
/// The channel to broadcast when the processor has completed processing.
121130
finished_sender: async_broadcast::Sender<()>,
122131
finished_receiver: async_broadcast::Receiver<()>,
132+
/// The current state of the assets.
133+
asset_infos: async_lock::RwLock<ProcessorAssetInfos>,
123134
}
124135

125136
impl AssetProcessor {
@@ -150,20 +161,9 @@ impl AssetProcessor {
150161
&self.server
151162
}
152163

153-
async fn set_state(&self, state: ProcessorState) {
154-
let mut state_guard = self.data.state.write().await;
155-
let last_state = *state_guard;
156-
*state_guard = state;
157-
if last_state != ProcessorState::Finished && state == ProcessorState::Finished {
158-
self.data.finished_sender.broadcast(()).await.unwrap();
159-
} else if last_state != ProcessorState::Processing && state == ProcessorState::Processing {
160-
self.data.initialized_sender.broadcast(()).await.unwrap();
161-
}
162-
}
163-
164164
/// Retrieves the current [`ProcessorState`]
165165
pub async fn get_state(&self) -> ProcessorState {
166-
*self.data.state.read().await
166+
self.data.processing_state.get_state().await
167167
}
168168

169169
/// Retrieves the [`AssetSource`] for this processor
@@ -325,7 +325,10 @@ impl AssetProcessor {
325325
// to the finished state (otherwise we'd be sitting around stuck in the `Initialized`
326326
// state).
327327
if new_task_receiver.is_empty() {
328-
self.set_state(ProcessorState::Finished).await;
328+
self.data
329+
.processing_state
330+
.set_state(ProcessorState::Finished)
331+
.await;
329332
}
330333
enum ProcessorTaskEvent {
331334
Start(AssetSourceId<'static>, PathBuf),
@@ -371,14 +374,20 @@ impl AssetProcessor {
371374
let _ = task_finished_sender.send(()).await;
372375
})
373376
.detach();
374-
self.set_state(ProcessorState::Processing).await;
377+
self.data
378+
.processing_state
379+
.set_state(ProcessorState::Processing)
380+
.await;
375381
}
376382
ProcessorTaskEvent::Finished => {
377383
pending_tasks -= 1;
378384
if pending_tasks == 0 {
379385
// clean up metadata in asset server
380386
self.server.write_infos().consume_handle_drop_events();
381-
self.set_state(ProcessorState::Finished).await;
387+
self.data
388+
.processing_state
389+
.set_state(ProcessorState::Finished)
390+
.await;
382391
}
383392
}
384393
}
@@ -628,7 +637,7 @@ impl AssetProcessor {
628637
async fn handle_removed_asset(&self, source: &AssetSource, path: PathBuf) {
629638
let asset_path = AssetPath::from(path).with_source(source.id());
630639
debug!("Removing processed {asset_path} because source was removed");
631-
let mut infos = self.data.asset_infos.write().await;
640+
let mut infos = self.data.processing_state.asset_infos.write().await;
632641
if let Some(info) = infos.get(&asset_path) {
633642
// we must wait for uncontested write access to the asset source to ensure existing readers / writers
634643
// can finish their operations
@@ -648,7 +657,7 @@ impl AssetProcessor {
648657
new: PathBuf,
649658
new_task_sender: &async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
650659
) {
651-
let mut infos = self.data.asset_infos.write().await;
660+
let mut infos = self.data.processing_state.asset_infos.write().await;
652661
let old = AssetPath::from(old).with_source(source.id());
653662
let new = AssetPath::from(new).with_source(source.id());
654663
let processed_writer = source.processed_writer().unwrap();
@@ -740,7 +749,7 @@ impl AssetProcessor {
740749
/// This will validate transactions and recover failed transactions when necessary.
741750
async fn initialize(&self) -> Result<(), InitializeError> {
742751
self.validate_transaction_log_and_recover().await;
743-
let mut asset_infos = self.data.asset_infos.write().await;
752+
let mut asset_infos = self.data.processing_state.asset_infos.write().await;
744753

745754
/// Retrieves asset paths recursively. If `clean_empty_folders_writer` is Some, it will be used to clean up empty
746755
/// folders when they are discovered.
@@ -855,7 +864,10 @@ impl AssetProcessor {
855864
}
856865
}
857866

858-
self.set_state(ProcessorState::Processing).await;
867+
self.data
868+
.processing_state
869+
.set_state(ProcessorState::Processing)
870+
.await;
859871

860872
Ok(())
861873
}
@@ -913,7 +925,7 @@ impl AssetProcessor {
913925
) {
914926
let asset_path = AssetPath::from(path).with_source(source.id());
915927
let result = self.process_asset_internal(source, &asset_path).await;
916-
let mut infos = self.data.asset_infos.write().await;
928+
let mut infos = self.data.processing_state.asset_infos.write().await;
917929
infos
918930
.finish_processing(asset_path, result, processor_task_event)
919931
.await;
@@ -1016,7 +1028,7 @@ impl AssetProcessor {
10161028
};
10171029

10181030
{
1019-
let infos = self.data.asset_infos.read().await;
1031+
let infos = self.data.processing_state.asset_infos.read().await;
10201032
if let Some(current_processed_info) = infos
10211033
.get(asset_path)
10221034
.and_then(|i| i.processed_info.as_ref())
@@ -1042,7 +1054,7 @@ impl AssetProcessor {
10421054
// Note: this lock must remain alive until all processed asset and meta writes have finished (or failed)
10431055
// See ProcessedAssetInfo::file_transaction_lock docs for more info
10441056
let _transaction_lock = {
1045-
let mut infos = self.data.asset_infos.write().await;
1057+
let mut infos = self.data.processing_state.asset_infos.write().await;
10461058
let info = infos.get_or_insert(asset_path.clone());
10471059
info.file_transaction_lock.write_arc().await
10481060
};
@@ -1199,24 +1211,12 @@ impl AssetProcessor {
11991211
impl AssetProcessorData {
12001212
/// Initializes a new [`AssetProcessorData`] using the given [`AssetSources`].
12011213
pub fn new(source: AssetSources) -> Self {
1202-
let (mut finished_sender, finished_receiver) = async_broadcast::broadcast(1);
1203-
let (mut initialized_sender, initialized_receiver) = async_broadcast::broadcast(1);
1204-
// allow overflow on these "one slot" channels to allow receivers to retrieve the "latest" state, and to allow senders to
1205-
// not block if there was older state present.
1206-
finished_sender.set_overflow(true);
1207-
initialized_sender.set_overflow(true);
1208-
12091214
AssetProcessorData {
1215+
processing_state: ProcessingState::new(),
12101216
sources: source,
1211-
finished_sender,
1212-
finished_receiver,
1213-
initialized_sender,
1214-
initialized_receiver,
1215-
state: async_lock::RwLock::new(ProcessorState::Initializing),
12161217
log_factory: Mutex::new(Some(Box::new(FileTransactionLogFactory::default()))),
12171218
log: Default::default(),
12181219
processors: Default::default(),
1219-
asset_infos: Default::default(),
12201220
default_processors: Default::default(),
12211221
}
12221222
}
@@ -1245,6 +1245,72 @@ impl AssetProcessorData {
12451245

12461246
/// Returns a future that will not finish until the path has been processed.
12471247
pub async fn wait_until_processed(&self, path: AssetPath<'static>) -> ProcessStatus {
1248+
self.processing_state.wait_until_processed(path).await
1249+
}
1250+
1251+
/// Returns a future that will not finish until the processor has been initialized.
1252+
pub async fn wait_until_initialized(&self) {
1253+
self.processing_state.wait_until_initialized().await
1254+
}
1255+
1256+
/// Returns a future that will not finish until processing has finished.
1257+
pub async fn wait_until_finished(&self) {
1258+
self.processing_state.wait_until_finished().await
1259+
}
1260+
}
1261+
1262+
impl ProcessingState {
1263+
/// Creates a new empty processing state.
1264+
fn new() -> Self {
1265+
let (mut initialized_sender, initialized_receiver) = async_broadcast::broadcast(1);
1266+
let (mut finished_sender, finished_receiver) = async_broadcast::broadcast(1);
1267+
// allow overflow on these "one slot" channels to allow receivers to retrieve the "latest" state, and to allow senders to
1268+
// not block if there was older state present.
1269+
initialized_sender.set_overflow(true);
1270+
finished_sender.set_overflow(true);
1271+
1272+
Self {
1273+
state: async_lock::RwLock::new(ProcessorState::Initializing),
1274+
initialized_sender,
1275+
initialized_receiver,
1276+
finished_sender,
1277+
finished_receiver,
1278+
asset_infos: Default::default(),
1279+
}
1280+
}
1281+
1282+
/// Sets the overall state of processing and broadcasts appropriate events.
1283+
async fn set_state(&self, state: ProcessorState) {
1284+
let mut state_guard = self.state.write().await;
1285+
let last_state = *state_guard;
1286+
*state_guard = state;
1287+
if last_state != ProcessorState::Finished && state == ProcessorState::Finished {
1288+
self.finished_sender.broadcast(()).await.unwrap();
1289+
} else if last_state != ProcessorState::Processing && state == ProcessorState::Processing {
1290+
self.initialized_sender.broadcast(()).await.unwrap();
1291+
}
1292+
}
1293+
1294+
/// Retrieves the current [`ProcessorState`]
1295+
pub(crate) async fn get_state(&self) -> ProcessorState {
1296+
*self.state.read().await
1297+
}
1298+
1299+
/// Gets a "transaction lock" that can be used to ensure no writes to asset or asset meta occur
1300+
/// while it is held.
1301+
pub(crate) async fn get_transaction_lock(
1302+
&self,
1303+
path: &AssetPath<'static>,
1304+
) -> Result<RwLockReadGuardArc<()>, AssetReaderError> {
1305+
let infos = self.asset_infos.read().await;
1306+
let info = infos
1307+
.get(path)
1308+
.ok_or_else(|| AssetReaderError::NotFound(path.path().to_owned()))?;
1309+
Ok(info.file_transaction_lock.read_arc().await)
1310+
}
1311+
1312+
/// Returns a future that will not finish until the path has been processed.
1313+
pub(crate) async fn wait_until_processed(&self, path: AssetPath<'static>) -> ProcessStatus {
12481314
self.wait_until_initialized().await;
12491315
let mut receiver = {
12501316
let infos = self.asset_infos.write().await;
@@ -1262,7 +1328,7 @@ impl AssetProcessorData {
12621328
}
12631329

12641330
/// Returns a future that will not finish until the processor has been initialized.
1265-
pub async fn wait_until_initialized(&self) {
1331+
pub(crate) async fn wait_until_initialized(&self) {
12661332
let receiver = {
12671333
let state = self.state.read().await;
12681334
match *state {
@@ -1280,7 +1346,7 @@ impl AssetProcessorData {
12801346
}
12811347

12821348
/// Returns a future that will not finish until processing has finished.
1283-
pub async fn wait_until_finished(&self) {
1349+
pub(crate) async fn wait_until_finished(&self) {
12841350
let receiver = {
12851351
let state = self.state.read().await;
12861352
match *state {

0 commit comments

Comments
 (0)