Skip to content

Commit e24d1e8

Browse files
authored
fix: show continuous stats during batch updates (#1204)
* fix: show continuous stats during batch updates * feat: improve stats display with indicatif
1 parent 8e17156 commit e24d1e8

File tree

3 files changed

+105
-2
lines changed

3 files changed

+105
-2
lines changed

Cargo.lock

Lines changed: 45 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ blake2 = "0.10.6"
6565
pgvector = { version = "0.4.1", features = ["sqlx", "halfvec"] }
6666
phf = { version = "0.12.1", features = ["macros"] }
6767
indenter = "0.3.4"
68+
indicatif = "0.17.9"
6869
itertools = "0.14.0"
6970
derivative = "2.2.0"
7071
hex = "0.4.3"

src/execution/live_updater.rs

Lines changed: 59 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use crate::{
88

99
use super::stats;
1010
use futures::future::try_join_all;
11+
use indicatif::ProgressBar;
1112
use sqlx::PgPool;
1213
use tokio::{sync::watch, task::JoinSet, time::MissedTickBehavior};
1314

@@ -329,7 +330,47 @@ impl SourceUpdateTask {
329330
update_options: super::source_indexer::UpdateOptions,
330331
) -> Result<()> {
331332
let update_stats = Arc::new(stats::UpdateStats::default());
332-
source_indexing_context
333+
334+
// Spawn periodic stats reporting task if print_stats is enabled
335+
let (reporting_handle, progress_bar) = if self.options.print_stats {
336+
let update_stats_clone = update_stats.clone();
337+
let update_title_owned = update_title.to_string();
338+
let flow_name = self.flow.flow_instance.name.clone();
339+
let import_op_name = self.import_op().name.clone();
340+
341+
// Create a progress bar that will overwrite the same line
342+
let pb = ProgressBar::new_spinner();
343+
pb.set_style(
344+
indicatif::ProgressStyle::default_spinner()
345+
.template("{msg}")
346+
.unwrap(),
347+
);
348+
let pb_clone = pb.clone();
349+
350+
let report_task = async move {
351+
let mut interval = tokio::time::interval(REPORT_INTERVAL);
352+
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
353+
interval.tick().await; // Skip first tick
354+
355+
loop {
356+
interval.tick().await;
357+
let current_stats = update_stats_clone.as_ref().clone();
358+
if current_stats.has_any_change() {
359+
// Show cumulative stats (always show latest total, not delta)
360+
pb_clone.set_message(format!(
361+
"{}.{} ({update_title_owned}): {}",
362+
flow_name, import_op_name, current_stats
363+
));
364+
}
365+
}
366+
};
367+
(Some(tokio::spawn(report_task)), Some(pb))
368+
} else {
369+
(None, None)
370+
};
371+
372+
// Run the actual update
373+
let update_result = source_indexing_context
333374
.update(&self.pool, &update_stats, update_options)
334375
.await
335376
.with_context(|| {
@@ -338,12 +379,28 @@ impl SourceUpdateTask {
338379
self.flow.flow_instance.name,
339380
self.import_op().name
340381
)
341-
})?;
382+
});
383+
384+
// Cancel the reporting task if it was spawned
385+
if let Some(handle) = reporting_handle {
386+
handle.abort();
387+
}
388+
389+
// Clear the progress bar to ensure final stats appear on a new line
390+
if let Some(pb) = progress_bar {
391+
pb.finish_and_clear();
392+
}
393+
394+
// Check update result
395+
update_result?;
396+
342397
if update_stats.has_any_change() {
343398
self.status_tx.send_modify(|update| {
344399
update.source_updates_num[self.source_idx] += 1;
345400
});
346401
}
402+
403+
// Report final stats
347404
self.report_stats(&update_stats, update_title);
348405
Ok(())
349406
}

0 commit comments

Comments
 (0)