From 8dbcf871aef2b882871f183de0a895b37970baab Mon Sep 17 00:00:00 2001 From: Adrien Guillo Date: Thu, 20 Nov 2025 18:14:32 -0500 Subject: [PATCH] WIP --- .../src/actors/doc_processor.rs | 28 +++++++++++------ .../quickwit-indexing/src/actors/indexer.rs | 30 ++++++++++++++++++ .../src/actors/merge_executor.rs | 2 ++ .../quickwit-indexing/src/actors/packager.rs | 1 + .../quickwit-indexing/src/actors/publisher.rs | 20 ++++++++++-- .../quickwit-indexing/src/actors/uploader.rs | 5 +++ quickwit/quickwit-indexing/src/metrics.rs | 12 +++++-- .../src/models/indexed_split.rs | 1 + quickwit/quickwit-indexing/src/models/mod.rs | 2 +- .../src/models/processed_doc.rs | 1 + .../src/models/raw_doc_batch.rs | 31 ++++++++++++++----- .../src/models/split_attrs.rs | 6 ++++ .../src/source/doc_file_reader.rs | 4 +-- .../src/source/file_source.rs | 4 +-- .../src/source/ingest/mod.rs | 12 +++---- .../src/source/ingest_api_source.rs | 14 ++++----- .../src/source/kafka_source.rs | 17 ++++++++-- .../src/source/kinesis/kinesis_source.rs | 5 +-- quickwit/quickwit-indexing/src/source/mod.rs | 25 ++++++++++----- .../src/source/queue_sources/coordinator.rs | 17 +++++----- .../src/source/stdin_source.rs | 4 +-- .../src/source/vec_source.rs | 8 ++--- .../quickwit-metastore/src/split_metadata.rs | 23 +++++++++----- .../src/split_metadata_version.rs | 5 +++ 24 files changed, 204 insertions(+), 73 deletions(-) diff --git a/quickwit/quickwit-indexing/src/actors/doc_processor.rs b/quickwit/quickwit-indexing/src/actors/doc_processor.rs index 407c55ff526..d3f2b75833f 100644 --- a/quickwit/quickwit-indexing/src/actors/doc_processor.rs +++ b/quickwit/quickwit-indexing/src/actors/doc_processor.rs @@ -41,7 +41,8 @@ use tokio::runtime::Handle; use super::vrl_processing::*; use crate::actors::Indexer; use crate::models::{ - NewPublishLock, NewPublishToken, ProcessedDoc, ProcessedDocBatch, PublishLock, RawDocBatch, + NewPublishLock, NewPublishToken, ProcessedDoc, ProcessedDocBatch, PublishLock, RawDoc, + 
RawDocBatch, }; const PLAIN_TEXT: &str = "plain_text"; @@ -461,17 +462,20 @@ impl DocProcessor { Ok(Some(timestamp)) } - fn process_raw_doc(&mut self, raw_doc: Bytes, processed_docs: &mut Vec) { - let num_bytes = raw_doc.len(); + fn process_raw_doc(&mut self, raw_doc: RawDoc, processed_docs: &mut Vec) { + let num_bytes = raw_doc.doc.len(); #[cfg(feature = "vrl")] let transform_opt = self.transform_opt.as_mut(); #[cfg(not(feature = "vrl"))] let transform_opt: Option<&mut VrlProgram> = None; - for json_doc_result in parse_raw_doc(self.input_format, raw_doc, num_bytes, transform_opt) { - let processed_doc_result = - json_doc_result.and_then(|json_doc| self.process_json_doc(json_doc)); + for json_doc_result in + parse_raw_doc(self.input_format, raw_doc.doc, num_bytes, transform_opt) + { + let processed_doc_result = json_doc_result.and_then(|json_doc| { + self.process_json_doc(json_doc, raw_doc.arrival_timestamp_secs_opt) + }); match processed_doc_result { Ok(processed_doc) => { @@ -491,7 +495,11 @@ impl DocProcessor { } } - fn process_json_doc(&self, json_doc: JsonDoc) -> Result { + fn process_json_doc( + &self, + json_doc: JsonDoc, + arrival_timestamp_secs_opt: Option, + ) -> Result { let num_bytes = json_doc.num_bytes; let (partition, doc) = self @@ -503,6 +511,7 @@ impl DocProcessor { timestamp_opt, partition, num_bytes, + arrival_timestamp_secs_opt, }) } } @@ -572,9 +581,10 @@ impl Handler for DocProcessor { if self.publish_lock.is_dead() { return Ok(()); } - let mut processed_docs: Vec = Vec::with_capacity(raw_doc_batch.docs.len()); + let mut processed_docs: Vec = + Vec::with_capacity(raw_doc_batch.raw_docs.len()); - for raw_doc in raw_doc_batch.docs { + for raw_doc in raw_doc_batch.raw_docs { let _protected_zone_guard = ctx.protect_zone(); self.process_raw_doc(raw_doc, &mut processed_docs); ctx.record_progress(); diff --git a/quickwit/quickwit-indexing/src/actors/indexer.rs b/quickwit/quickwit-indexing/src/actors/indexer.rs index 84ba3987f4a..ca6a07fd33c 100644 
--- a/quickwit/quickwit-indexing/src/actors/indexer.rs +++ b/quickwit/quickwit-indexing/src/actors/indexer.rs @@ -305,6 +305,7 @@ impl IndexerState { timestamp_opt, partition, num_bytes, + arrival_timestamp_secs_opt, } = doc; counters.num_docs_in_workbench += 1; let (indexed_split, split_created) = self.get_or_create_indexed_split( @@ -326,6 +327,12 @@ impl IndexerState { if let Some(timestamp) = timestamp_opt { record_timestamp(timestamp, &mut indexed_split.split_attrs.time_range); } + if let Some(arrival_timestamp_millis) = arrival_timestamp_secs_opt { + record_arrival_timestamp( + arrival_timestamp_millis, + &mut indexed_split.split_attrs.min_arrival_timestamp_secs_opt, + ); + } let _protect_guard = ctx.protect_zone(); indexed_split .index_writer @@ -468,6 +475,14 @@ fn record_timestamp(timestamp: DateTime, time_range: &mut Option, +) { + let current_min = min_arrival_timestamp_secs_opt.get_or_insert(arrival_timestamp_millis); + *current_min = arrival_timestamp_millis.min(*current_min); +} + #[async_trait] impl Handler for Indexer { type Reply = (); @@ -797,6 +812,7 @@ mod tests { timestamp_opt: Some(DateTime::from_timestamp_secs(1_662_529_435)), partition: 1, num_bytes: 30, + arrival_timestamp_secs_opt: None, }, ProcessedDoc { doc: doc!( @@ -806,6 +822,7 @@ mod tests { timestamp_opt: Some(DateTime::from_timestamp_secs(1_662_529_435)), partition: 1, num_bytes: 30, + arrival_timestamp_secs_opt: None, }, ], SourceCheckpointDelta::from_range(4..6), @@ -823,6 +840,7 @@ mod tests { timestamp_opt: Some(DateTime::from_timestamp_secs(1_662_529_435i64)), partition: 1, num_bytes: 30, + arrival_timestamp_secs_opt: None, }, ProcessedDoc { doc: doc!( @@ -832,6 +850,7 @@ mod tests { timestamp_opt: Some(DateTime::from_timestamp_secs(1_662_529_435)), partition: 1, num_bytes: 30, + arrival_timestamp_secs_opt: None, }, ], SourceCheckpointDelta::from_range(6..8), @@ -848,6 +867,7 @@ mod tests { timestamp_opt: Some(DateTime::from_timestamp_secs(1_662_529_435)), partition: 1, 
num_bytes: 30, + arrival_timestamp_secs_opt: None, }], SourceCheckpointDelta::from_range(8..9), false, @@ -933,6 +953,7 @@ mod tests { timestamp_opt: None, partition: 0, num_bytes, + arrival_timestamp_secs_opt: None, } }; for i in 0..10_000 { @@ -1011,6 +1032,7 @@ mod tests { timestamp_opt: Some(DateTime::from_timestamp_secs(1_662_529_435)), partition: 1, num_bytes: 30, + arrival_timestamp_secs_opt: None, }], SourceCheckpointDelta::from_range(position..position + 1), false, @@ -1089,6 +1111,7 @@ mod tests { timestamp_opt: Some(DateTime::from_timestamp_secs(1_662_529_435)), partition: 1, num_bytes: 30, + arrival_timestamp_secs_opt: None, }], SourceCheckpointDelta::from_range(8..9), false, @@ -1176,6 +1199,7 @@ mod tests { timestamp_opt: Some(DateTime::from_timestamp_secs(1_662_529_435)), partition: 1, num_bytes: 30, + arrival_timestamp_secs_opt: None, }], SourceCheckpointDelta::from_range(8..9), false, @@ -1260,6 +1284,7 @@ mod tests { timestamp_opt: None, partition: 1, num_bytes: 30, + arrival_timestamp_secs_opt: None, }, ProcessedDoc { doc: doc!( @@ -1269,6 +1294,7 @@ mod tests { timestamp_opt: None, partition: 3, num_bytes: 30, + arrival_timestamp_secs_opt: None, }, ], SourceCheckpointDelta::from_range(8..9), @@ -1354,6 +1380,7 @@ mod tests { timestamp_opt: None, partition, num_bytes: 30, + arrival_timestamp_secs_opt: None, }], SourceCheckpointDelta::from_range(partition..partition + 1), false, @@ -1432,6 +1459,7 @@ mod tests { timestamp_opt: None, partition: 0, num_bytes: 30, + arrival_timestamp_secs_opt: None, }], SourceCheckpointDelta::from_range(0..1), false, @@ -1503,6 +1531,7 @@ mod tests { timestamp_opt: None, partition: 0, num_bytes: 30, + arrival_timestamp_secs_opt: None, }], SourceCheckpointDelta::from_range(0..1), false, @@ -1559,6 +1588,7 @@ mod tests { timestamp_opt: None, partition: 0, num_bytes: 30, + arrival_timestamp_secs_opt: None, }], SourceCheckpointDelta::from_range(0..1), true, diff --git 
a/quickwit/quickwit-indexing/src/actors/merge_executor.rs b/quickwit/quickwit-indexing/src/actors/merge_executor.rs index 3ca35a06c5b..0ec159bd855 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_executor.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_executor.rs @@ -287,6 +287,7 @@ pub fn merge_split_attrs( uncompressed_docs_size_in_bytes, delete_opstamp, num_merge_ops: max_merge_ops(splits) + 1, + min_arrival_timestamp_secs_opt: None, }) } @@ -470,6 +471,7 @@ impl MergeExecutor { uncompressed_docs_size_in_bytes, delete_opstamp: last_delete_opstamp, num_merge_ops: split.num_merge_ops, + min_arrival_timestamp_secs_opt: None, }, index: merged_index, split_scratch_directory: merge_scratch_directory, diff --git a/quickwit/quickwit-indexing/src/actors/packager.rs b/quickwit/quickwit-indexing/src/actors/packager.rs index 9dbb1fd8963..b5e1e4b0431 100644 --- a/quickwit/quickwit-indexing/src/actors/packager.rs +++ b/quickwit/quickwit-indexing/src/actors/packager.rs @@ -529,6 +529,7 @@ mod tests { replaced_split_ids: Vec::new(), delete_opstamp: 0, num_merge_ops: 0, + min_arrival_timestamp_secs_opt: None, }, index, split_scratch_directory, diff --git a/quickwit/quickwit-indexing/src/actors/publisher.rs b/quickwit/quickwit-indexing/src/actors/publisher.rs index b05081be706..fd05b7dd876 100644 --- a/quickwit/quickwit-indexing/src/actors/publisher.rs +++ b/quickwit/quickwit-indexing/src/actors/publisher.rs @@ -18,9 +18,11 @@ use fail::fail_point; use quickwit_actors::{Actor, ActorContext, Handler, Mailbox, QueueCapacity}; use quickwit_proto::metastore::{MetastoreService, MetastoreServiceClient, PublishSplitsRequest}; use serde::Serialize; +use time::OffsetDateTime; use tracing::{info, instrument, warn}; use crate::actors::MergePlanner; +use crate::metrics::INDEXER_METRICS; use crate::models::{NewSplits, SplitsUpdate}; use crate::source::{SourceActor, SuggestTruncate}; @@ -143,6 +145,10 @@ impl Handler for Publisher { .iter() .map(|split| split.split_id.clone()) 
.collect(); + let all_min_arrival_timestamp_secs: Vec = new_splits + .iter() + .flat_map(|split| split.min_arrival_timestamp_secs_opt) + .collect(); if let Some(_guard) = publish_lock.acquire().await { let publish_splits_request = PublishSplitsRequest { index_uid: Some(index_uid), @@ -163,7 +169,7 @@ impl Handler for Publisher { return Ok(()); } info!("publish-new-splits"); - if let Some(source_mailbox) = self.source_mailbox_opt.as_ref() + if let Some(source_mailbox) = &self.source_mailbox_opt && let Some(checkpoint) = checkpoint_delta_opt { // We voluntarily do not log anything here. @@ -182,7 +188,6 @@ impl Handler for Publisher { warn!(error=?send_truncate_err, "failed to send truncate message from publisher to source"); } } - if !new_splits.is_empty() { // The merge planner is not necessarily awake and this is not an error. // For instance, when a source reaches its end, and the last "new" split @@ -193,7 +198,6 @@ impl Handler for Publisher { .send_message(merge_planner_mailbox, NewSplits { new_splits }) .await; } - if replaced_split_ids.is_empty() { self.counters.num_published_splits += 1; } else { @@ -202,6 +206,16 @@ impl Handler for Publisher { } else { self.counters.num_empty_splits += 1; } + let now = OffsetDateTime::now_utc(); + let now_timestamp = now.unix_timestamp() as u64; + + for min_arrival_timestamp_secs in all_min_arrival_timestamp_secs { + if let Some(lag_secs) = now_timestamp.checked_sub(min_arrival_timestamp_secs) { + INDEXER_METRICS + .indexing_lag_seconds + .observe(lag_secs as f64); + } + } fail_point!("publisher:after"); Ok(()) } diff --git a/quickwit/quickwit-indexing/src/actors/uploader.rs b/quickwit/quickwit-indexing/src/actors/uploader.rs index d6fe2751aaa..75e9dd08a1d 100644 --- a/quickwit/quickwit-indexing/src/actors/uploader.rs +++ b/quickwit/quickwit-indexing/src/actors/uploader.rs @@ -593,6 +593,7 @@ mod tests { split_id: "test-split".to_string(), delete_opstamp: 10, num_merge_ops: 0, + min_arrival_timestamp_secs_opt: None, }, 
serialized_split_fields: Vec::new(), split_scratch_directory, @@ -707,6 +708,7 @@ mod tests { ], delete_opstamp: 0, num_merge_ops: 0, + min_arrival_timestamp_secs_opt: None, }, serialized_split_fields: Vec::new(), split_scratch_directory: split_scratch_directory_1, @@ -734,6 +736,7 @@ mod tests { ], delete_opstamp: 0, num_merge_ops: 0, + min_arrival_timestamp_secs_opt: None, }, serialized_split_fields: Vec::new(), split_scratch_directory: split_scratch_directory_2, @@ -854,6 +857,7 @@ mod tests { replaced_split_ids: Vec::new(), delete_opstamp: 10, num_merge_ops: 0, + min_arrival_timestamp_secs_opt: None, }, serialized_split_fields: Vec::new(), split_scratch_directory, @@ -1036,6 +1040,7 @@ mod tests { split_id: SPLIT_ULID_STR.to_string(), delete_opstamp: 10, num_merge_ops: 0, + min_arrival_timestamp_secs_opt: None, }, serialized_split_fields: Vec::new(), split_scratch_directory, diff --git a/quickwit/quickwit-indexing/src/metrics.rs b/quickwit/quickwit-indexing/src/metrics.rs index d3186616252..3d9ff7e69ce 100644 --- a/quickwit/quickwit-indexing/src/metrics.rs +++ b/quickwit/quickwit-indexing/src/metrics.rs @@ -14,8 +14,8 @@ use once_cell::sync::Lazy; use quickwit_common::metrics::{ - IntCounter, IntCounterVec, IntGauge, IntGaugeVec, new_counter, new_counter_vec, new_gauge, - new_gauge_vec, + Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, linear_buckets, + new_counter, new_counter_vec, new_gauge, new_gauge_vec, new_histogram, new_histogram_vec, }; pub struct IndexerMetrics { @@ -31,6 +31,7 @@ pub struct IndexerMetrics { // We use a lazy counter, as most users do not use Kafka. 
#[cfg_attr(not(feature = "kafka"), allow(dead_code))] pub kafka_rebalance_total: Lazy, + pub indexing_lag_seconds: Histogram, } impl Default for IndexerMetrics { @@ -106,6 +107,13 @@ impl Default for IndexerMetrics { &[], ) }), + indexing_lag_seconds: new_histogram( + "indexing_lag_seconds", + "FIXME", + "indexing", + linear_buckets(0.0, 5.0, 120) + .expect("buckets should have a width and count greater than 0"), + ), } } } diff --git a/quickwit/quickwit-indexing/src/models/indexed_split.rs b/quickwit/quickwit-indexing/src/models/indexed_split.rs index cd272bdc34c..31c3a96c45c 100644 --- a/quickwit/quickwit-indexing/src/models/indexed_split.rs +++ b/quickwit/quickwit-indexing/src/models/indexed_split.rs @@ -110,6 +110,7 @@ impl IndexedSplitBuilder { time_range: None, delete_opstamp: last_delete_opstamp, num_merge_ops: 0, + min_arrival_timestamp_secs_opt: None, }, index_writer, split_scratch_directory, diff --git a/quickwit/quickwit-indexing/src/models/mod.rs b/quickwit/quickwit-indexing/src/models/mod.rs index 9dfdfde1594..e8fd0c371a9 100644 --- a/quickwit/quickwit-indexing/src/models/mod.rs +++ b/quickwit/quickwit-indexing/src/models/mod.rs @@ -44,7 +44,7 @@ pub use processed_doc::{ProcessedDoc, ProcessedDocBatch}; pub use publish_lock::{NewPublishLock, PublishLock}; pub use publisher_message::SplitsUpdate; use quickwit_proto::types::PublishToken; -pub use raw_doc_batch::RawDocBatch; +pub use raw_doc_batch::{RawDoc, RawDocBatch}; pub(crate) use shard_positions::LocalShardPositionsUpdate; pub use shard_positions::ShardPositionsService; pub use split_attrs::{SplitAttrs, create_split_metadata}; diff --git a/quickwit/quickwit-indexing/src/models/processed_doc.rs b/quickwit/quickwit-indexing/src/models/processed_doc.rs index bed695aa1d4..9c20041aa0f 100644 --- a/quickwit/quickwit-indexing/src/models/processed_doc.rs +++ b/quickwit/quickwit-indexing/src/models/processed_doc.rs @@ -23,6 +23,7 @@ pub struct ProcessedDoc { pub timestamp_opt: Option, pub partition: u64, pub 
num_bytes: usize, + pub arrival_timestamp_secs_opt: Option, } impl fmt::Debug for ProcessedDoc { diff --git a/quickwit/quickwit-indexing/src/models/raw_doc_batch.rs b/quickwit/quickwit-indexing/src/models/raw_doc_batch.rs index f88d9fcac2b..77d0a2d3d4c 100644 --- a/quickwit/quickwit-indexing/src/models/raw_doc_batch.rs +++ b/quickwit/quickwit-indexing/src/models/raw_doc_batch.rs @@ -18,10 +18,16 @@ use bytes::Bytes; use quickwit_common::metrics::{GaugeGuard, MEMORY_METRICS}; use quickwit_metastore::checkpoint::SourceCheckpointDelta; +#[derive(Debug)] +pub struct RawDoc { + pub doc: Bytes, + pub arrival_timestamp_secs_opt: Option, +} + pub struct RawDocBatch { // Do not directly append documents to this vector; otherwise, in-flight metrics will be // incorrect. - pub docs: Vec, + pub raw_docs: Vec, pub checkpoint_delta: SourceCheckpointDelta, pub force_commit: bool, _gauge_guard: GaugeGuard<'static>, @@ -29,17 +35,20 @@ pub struct RawDocBatch { impl RawDocBatch { pub fn new( - docs: Vec, + raw_docs: Vec, checkpoint_delta: SourceCheckpointDelta, force_commit: bool, ) -> Self { - let delta = docs.iter().map(|doc| doc.len() as i64).sum::(); + let delta = raw_docs + .iter() + .map(|raw_doc| raw_doc.doc.len() as i64) + .sum::(); let mut gauge_guard = GaugeGuard::from_gauge(&MEMORY_METRICS.in_flight.doc_processor_mailbox); gauge_guard.add(delta); Self { - docs, + raw_docs, checkpoint_delta, force_commit, _gauge_guard: gauge_guard, @@ -48,9 +57,15 @@ impl RawDocBatch { #[cfg(test)] pub fn for_test(docs: &[&[u8]], range: std::ops::Range) -> Self { - let docs = docs.iter().map(|doc| Bytes::from(doc.to_vec())).collect(); + let raw_docs = docs + .iter() + .map(|doc| RawDoc { + doc: Bytes::from(doc.to_vec()), + arrival_timestamp_secs_opt: None, + }) + .collect(); let checkpoint_delta = SourceCheckpointDelta::from_range(range); - Self::new(docs, checkpoint_delta, false) + Self::new(raw_docs, checkpoint_delta, false) } } @@ -58,7 +73,7 @@ impl fmt::Debug for RawDocBatch { fn 
fmt(&self, formatter: &mut fmt::Formatter) -> fmt::Result { formatter .debug_struct("RawDocBatch") - .field("num_docs", &self.docs.len()) + .field("num_docs", &self.raw_docs.len()) .field("checkpoint_delta", &self.checkpoint_delta) .field("force_commit", &self.force_commit) .finish() @@ -69,7 +84,7 @@ impl Default for RawDocBatch { fn default() -> Self { let _gauge_guard = GaugeGuard::from_gauge(&MEMORY_METRICS.in_flight.doc_processor_mailbox); Self { - docs: Vec::new(), + raw_docs: Vec::new(), checkpoint_delta: SourceCheckpointDelta::default(), force_commit: false, _gauge_guard, diff --git a/quickwit/quickwit-indexing/src/models/split_attrs.rs b/quickwit/quickwit-indexing/src/models/split_attrs.rs index 2e0504bdd35..a6cd75ca07b 100644 --- a/quickwit/quickwit-indexing/src/models/split_attrs.rs +++ b/quickwit/quickwit-indexing/src/models/split_attrs.rs @@ -67,6 +67,11 @@ pub struct SplitAttrs { // Number of merge operation the split has been through so far. pub num_merge_ops: usize, + + /// Earliest arrival timestamp (in milliseconds since the Unix epoch) of all documents in the + /// split. In other words: `min(doc.arrival_timestamp_millis for doc in split)`. + /// This is used to track ingestion and indexing lag. + pub min_arrival_timestamp_secs_opt: Option, } impl fmt::Debug for SplitAttrs { @@ -125,6 +130,7 @@ pub fn create_split_metadata( footer_offsets, delete_opstamp: split_attrs.delete_opstamp, num_merge_ops: split_attrs.num_merge_ops, + min_arrival_timestamp_secs_opt: split_attrs.min_arrival_timestamp_secs_opt, } } diff --git a/quickwit/quickwit-indexing/src/source/doc_file_reader.rs b/quickwit/quickwit-indexing/src/source/doc_file_reader.rs index 90bc99c01ad..d7881115194 100644 --- a/quickwit/quickwit-indexing/src/source/doc_file_reader.rs +++ b/quickwit/quickwit-indexing/src/source/doc_file_reader.rs @@ -197,7 +197,7 @@ impl ObjectUriBatchReader { .await? 
{ new_offset = record.next_offset as usize; - batch_builder.add_doc(record.doc); + batch_builder.add_doc(record.doc, None); if record.is_last { self.is_eof = true; break; @@ -475,7 +475,7 @@ mod tests { .read_batch(&progress, SourceType::Unspecified) .await .unwrap(); - parsed_lines += batch.docs.len(); + parsed_lines += batch.raw_docs.len(); parsed_batches += 1; checkpoint_delta.extend(batch.checkpoint_delta).unwrap(); } diff --git a/quickwit/quickwit-indexing/src/source/file_source.rs b/quickwit/quickwit-indexing/src/source/file_source.rs index 9271e8bb3a6..f73346f614a 100644 --- a/quickwit/quickwit-indexing/src/source/file_source.rs +++ b/quickwit/quickwit-indexing/src/source/file_source.rs @@ -87,7 +87,7 @@ impl Source for FileSource { .read_batch(ctx.progress(), self.source_type) .await?; *num_bytes_processed += batch_builder.num_bytes; - *num_lines_processed += batch_builder.docs.len() as u64; + *num_lines_processed += batch_builder.raw_docs.len() as u64; doc_processor_mailbox .send_message(batch_builder.build()) .await?; @@ -388,7 +388,7 @@ mod tests { ); let indexer_messages: Vec = doc_processor_inbox.drain_for_test_typed(); assert_eq!( - indexer_messages[0].docs[0], + indexer_messages[0].raw_docs[0].doc, Bytes::from_static(b"0000002\n") ); } diff --git a/quickwit/quickwit-indexing/src/source/ingest/mod.rs b/quickwit/quickwit-indexing/src/source/ingest/mod.rs index d9e21affb87..5a054d40029 100644 --- a/quickwit/quickwit-indexing/src/source/ingest/mod.rs +++ b/quickwit/quickwit-indexing/src/source/ingest/mod.rs @@ -219,7 +219,7 @@ impl IngestSource { for mrecord in decoded_mrecords(mrecord_batch) { match mrecord { MRecord::Doc(doc) => { - batch_builder.add_doc(doc); + batch_builder.add_doc(doc, None); } MRecord::Commit => { batch_builder.force_commit(); @@ -496,7 +496,7 @@ impl Source for IngestSource { } if !batch_builder.checkpoint_delta.is_empty() { debug!( - num_docs=%batch_builder.docs.len(), + num_docs=%batch_builder.raw_docs.len(), 
num_bytes=%batch_builder.num_bytes, num_millis=%now.elapsed().as_millis(), "Sending doc batch to indexer." @@ -1476,10 +1476,10 @@ mod tests { .recv_typed_message::() .await .unwrap(); - assert_eq!(doc_batch.docs.len(), 3); - assert_eq!(doc_batch.docs[0], "test-doc-foo"); - assert_eq!(doc_batch.docs[1], "test-doc-bar"); - assert_eq!(doc_batch.docs[2], "test-doc-qux"); + assert_eq!(doc_batch.raw_docs.len(), 3); + assert_eq!(doc_batch.raw_docs[0].doc, "test-doc-foo"); + assert_eq!(doc_batch.raw_docs[1].doc, "test-doc-bar"); + assert_eq!(doc_batch.raw_docs[2].doc, "test-doc-qux"); assert!(doc_batch.force_commit); let partition_deltas = doc_batch diff --git a/quickwit/quickwit-indexing/src/source/ingest_api_source.rs b/quickwit/quickwit-indexing/src/source/ingest_api_source.rs index 96feaec8fb3..db1da04aae9 100644 --- a/quickwit/quickwit-indexing/src/source/ingest_api_source.rs +++ b/quickwit/quickwit-indexing/src/source/ingest_api_source.rs @@ -183,7 +183,7 @@ impl Source for IngestApiSource { BatchBuilder::with_capacity(doc_batch.num_docs(), SourceType::IngestV1); for doc in doc_batch.into_iter() { match doc { - DocCommand::Ingest { payload } => batch_builder.add_doc(payload), + DocCommand::Ingest { payload } => batch_builder.add_doc(payload, None), DocCommand::Commit => batch_builder.force_commit(), } } @@ -201,7 +201,7 @@ impl Source for IngestApiSource { ) .map_err(anyhow::Error::from)?; - self.update_counters(current_offset, batch_builder.docs.len() as u64); + self.update_counters(current_offset, batch_builder.raw_docs.len() as u64); ctx.send_message(batch_sink, batch_builder.build()).await?; Ok(Duration::default()) } @@ -344,7 +344,7 @@ mod tests { ); let doc_batches: Vec = doc_processor_inbox.drain_for_test_typed(); assert_eq!(doc_batches.len(), 2); - assert!(&doc_batches[1].docs[0].starts_with(b"037736")); + assert!(&doc_batches[1].raw_docs[0].doc.starts_with(b"037736")); // TODO: Source deadlocks and test hangs occasionally if we don't quit source first. 
ingest_api_source_handle.quit().await; universe.assert_quit().await; @@ -448,7 +448,7 @@ mod tests { ); let doc_batches: Vec = doc_processor_inbox.drain_for_test_typed(); assert_eq!(doc_batches.len(), 1); - assert!(&doc_batches[0].docs[0].starts_with(b"001201")); + assert!(&doc_batches[0].raw_docs[0].doc.starts_with(b"001201")); assert_eq!(doc_batches[0].checkpoint_delta.num_partitions(), 1); assert_eq!( doc_batches[0].checkpoint_delta.partitions().next().unwrap(), @@ -505,7 +505,7 @@ mod tests { ); let doc_batches: Vec = doc_processor_inbox.drain_for_test_typed(); assert_eq!(doc_batches.len(), 1); - assert!(&doc_batches[0].docs[0].starts_with(b"000000")); + assert!(&doc_batches[0].raw_docs[0].doc.starts_with(b"000000")); // TODO: Source deadlocks and test hangs occasionally if we don't quit source first. ingest_api_source_handle.quit().await; universe.assert_quit().await; @@ -556,7 +556,7 @@ mod tests { ); let doc_batches: Vec = doc_processor_inbox.drain_for_test_typed(); assert_eq!(doc_batches.len(), 2); - assert!(doc_batches[1].docs[0].starts_with(b"037736")); + assert!(doc_batches[1].raw_docs[0].doc.starts_with(b"037736")); assert!(doc_batches[0].force_commit); assert!(doc_batches[1].force_commit); ingest_api_service @@ -619,7 +619,7 @@ mod tests { ); let doc_batches: Vec = doc_processor_inbox.drain_for_test_typed(); assert_eq!(doc_batches.len(), 2); - assert!(doc_batches[1].docs[0].starts_with(b"037736")); + assert!(doc_batches[1].raw_docs[0].doc.starts_with(b"037736")); assert!(!doc_batches[0].force_commit); assert!(!doc_batches[1].force_commit); ingest_api_service diff --git a/quickwit/quickwit-indexing/src/source/kafka_source.rs b/quickwit/quickwit-indexing/src/source/kafka_source.rs index bc5b400a9bc..1a648d74262 100644 --- a/quickwit/quickwit-indexing/src/source/kafka_source.rs +++ b/quickwit/quickwit-indexing/src/source/kafka_source.rs @@ -33,7 +33,7 @@ use rdkafka::consumer::{ use rdkafka::error::KafkaError; use rdkafka::message::BorrowedMessage; use 
rdkafka::util::Timeout; -use rdkafka::{ClientContext, Message, Offset, TopicPartitionList}; +use rdkafka::{ClientContext, Message, Offset, Timestamp, TopicPartitionList}; use serde_json::{Value as JsonValue, json}; use tokio::sync::{mpsc, watch}; use tokio::task::{JoinHandle, spawn_blocking}; @@ -85,15 +85,25 @@ struct KafkaMessage { payload_len: u64, partition: i32, offset: i64, + /// Arrival timestamp (in milliseconds since the Unix epoch) of the record in the system. + /// This is used to track ingestion and indexing lag. + arrival_timestamp_secs_opt: Option, } impl From> for KafkaMessage { fn from(message: BorrowedMessage<'_>) -> Self { + let arrival_timestamp_secs_opt = match message.timestamp() { + Timestamp::LogAppendTime(timestamp_millis) if timestamp_millis > 0 => { + Some(timestamp_millis as u64 / 1_000) + } + _ => None, + }; Self { doc_opt: message_payload_to_doc(&message), payload_len: message.payload_len() as u64, partition: message.partition(), offset: message.offset(), + arrival_timestamp_secs_opt, } } } @@ -296,11 +306,12 @@ impl KafkaSource { payload_len, partition, offset, + arrival_timestamp_secs_opt, .. 
} = message; if let Some(doc) = doc_opt { - batch.add_doc(doc); + batch.add_doc(doc, arrival_timestamp_secs_opt); } else { self.state.num_invalid_messages += 1; } @@ -486,7 +497,7 @@ impl Source for KafkaSource { } if !batch_builder.checkpoint_delta.is_empty() { debug!( - num_docs=%batch_builder.docs.len(), + num_docs=%batch_builder.raw_docs.len(), num_bytes=%batch_builder.num_bytes, num_millis=%now.elapsed().as_millis(), "sending doc batch to indexer" diff --git a/quickwit/quickwit-indexing/src/source/kinesis/kinesis_source.rs b/quickwit/quickwit-indexing/src/source/kinesis/kinesis_source.rs index 5d2e2ec00c9..da43e3e235f 100644 --- a/quickwit/quickwit-indexing/src/source/kinesis/kinesis_source.rs +++ b/quickwit/quickwit-indexing/src/source/kinesis/kinesis_source.rs @@ -253,7 +253,8 @@ impl Source for KinesisSource { self.state.num_invalid_records += 1; continue; } - batch_builder.add_doc(Bytes::from(record_data)); + let arrival_timestamp_secs_opt = record.approximate_arrival_timestamp.and_then(|datetime| datetime.to_millis().ok()).map(|millis| millis as u64); + batch_builder.add_doc(Bytes::from(record_data), arrival_timestamp_secs_opt); if i == num_records - 1 { let shard_consumer_state = self @@ -310,7 +311,7 @@ impl Source for KinesisSource { } } self.state.num_bytes_processed += batch_builder.num_bytes; - self.state.num_records_processed += batch_builder.docs.len() as u64; + self.state.num_records_processed += batch_builder.raw_docs.len() as u64; if !batch_builder.checkpoint_delta.is_empty() { ctx.send_message(indexer_mailbox, batch_builder.build()) diff --git a/quickwit/quickwit-indexing/src/source/mod.rs b/quickwit/quickwit-indexing/src/source/mod.rs index 20e3effda87..1974d96e996 100644 --- a/quickwit/quickwit-indexing/src/source/mod.rs +++ b/quickwit/quickwit-indexing/src/source/mod.rs @@ -117,7 +117,7 @@ pub use void_source::{VoidSource, VoidSourceFactory}; use self::doc_file_reader::dir_and_filename; use self::stdin_source::StdinSourceFactory; use 
crate::actors::DocProcessor; -use crate::models::RawDocBatch; +use crate::models::{RawDoc, RawDocBatch}; use crate::source::ingest::IngestSourceFactory; use crate::source::ingest_api_source::IngestApiSourceFactory; @@ -510,7 +510,7 @@ impl Handler for SourceActor { pub(super) struct BatchBuilder { // Do not directly append documents to this vector; otherwise, in-flight metrics will be // incorrect. Use `add_doc` instead. - docs: Vec, + raw_docs: Vec, num_bytes: u64, checkpoint_delta: SourceCheckpointDelta, force_commit: bool, @@ -535,7 +535,7 @@ impl BatchBuilder { let gauge_guard = GaugeGuard::from_gauge(gauge); Self { - docs: Vec::with_capacity(capacity), + raw_docs: Vec::with_capacity(capacity), num_bytes: 0, checkpoint_delta: SourceCheckpointDelta::default(), force_commit: false, @@ -543,9 +543,13 @@ impl BatchBuilder { } } - pub fn add_doc(&mut self, doc: Bytes) { + pub fn add_doc(&mut self, doc: Bytes, arrival_timestamp_secs_opt: Option) { let num_bytes = doc.len(); - self.docs.push(doc); + let raw_doc = RawDoc { + doc, + arrival_timestamp_secs_opt, + }; + self.raw_docs.push(raw_doc); self.gauge_guard.add(num_bytes as i64); self.num_bytes += num_bytes as u64; } @@ -554,13 +558,20 @@ impl BatchBuilder { self.force_commit = true; } + // pub fn record_arrival_timestamp(&mut self, arrival_timestamp_millis: u64) { + // let current_min = self + // .min_arrival_timestamp_secs_opt + // .get_or_insert(arrival_timestamp_millis); + // *current_min = arrival_timestamp_millis.min(*current_min); + // } + pub fn build(self) -> RawDocBatch { - RawDocBatch::new(self.docs, self.checkpoint_delta, self.force_commit) + RawDocBatch::new(self.raw_docs, self.checkpoint_delta, self.force_commit) } #[cfg(feature = "kafka")] pub fn clear(&mut self) { - self.docs.clear(); + self.raw_docs.clear(); self.checkpoint_delta = SourceCheckpointDelta::default(); self.gauge_guard.sub(self.num_bytes as i64); self.num_bytes = 0; diff --git 
a/quickwit/quickwit-indexing/src/source/queue_sources/coordinator.rs b/quickwit/quickwit-indexing/src/source/queue_sources/coordinator.rs index fadb4282c37..a72dad26ff0 100644 --- a/quickwit/quickwit-indexing/src/source/queue_sources/coordinator.rs +++ b/quickwit/quickwit-indexing/src/source/queue_sources/coordinator.rs @@ -260,7 +260,7 @@ impl QueueCoordinator { .batch_reader .read_batch(ctx.progress(), self.source_type) .await?; - self.observable_state.num_lines_processed += batch_builder.docs.len() as u64; + self.observable_state.num_lines_processed += batch_builder.raw_docs.len() as u64; self.observable_state.num_bytes_processed += batch_builder.num_bytes; doc_processor_mailbox .send_message(batch_builder.build()) @@ -428,7 +428,7 @@ mod tests { let partition_id = PreProcessedPayload::ObjectUri(test_uri.clone()).partition_id(); let batches = process_messages(&mut coordinator, queue, &[(&test_uri, "ack-id")]).await; assert_eq!(batches.len(), 1); - assert_eq!(batches[0].docs.len(), 10); + assert_eq!(batches[0].raw_docs.len(), 10); assert!(coordinator.local_state.is_awaiting_commit(&partition_id)); } @@ -442,7 +442,10 @@ mod tests { let test_uri = Uri::from_str(dummy_doc_file.path().to_str().unwrap()).unwrap(); let batches = process_messages(&mut coordinator, queue, &[(&test_uri, "ack-id")]).await; assert_eq!(batches.len(), 2); - assert_eq!(batches.iter().map(|b| b.docs.len()).sum::(), lines); + assert_eq!( + batches.iter().map(|b| b.raw_docs.len()).sum::(), + lines + ); } #[tokio::test] @@ -461,7 +464,7 @@ mod tests { ) .await; // could be generated in 1 or 2 batches, it doesn't matter - assert_eq!(batches.iter().map(|b| b.docs.len()).sum::(), 20); + assert_eq!(batches.iter().map(|b| b.raw_docs.len()).sum::(), 20); } #[tokio::test] @@ -478,7 +481,7 @@ mod tests { ) .await; assert_eq!(batches.len(), 1); - assert_eq!(batches.iter().map(|b| b.docs.len()).sum::(), 10); + assert_eq!(batches.iter().map(|b| b.raw_docs.len()).sum::(), 10); } #[tokio::test] @@ -559,7 
+562,7 @@ mod tests { ) .await; assert_eq!(batches.len(), 2); - assert_eq!(batches.iter().map(|b| b.docs.len()).sum::(), 18); + assert_eq!(batches.iter().map(|b| b.raw_docs.len()).sum::(), 18); assert!(coordinator.local_state.is_awaiting_commit(&partition_id_1)); assert!(coordinator.local_state.is_awaiting_commit(&partition_id_2)); } @@ -578,7 +581,7 @@ mod tests { let batches_2 = process_messages(&mut coord_2, queue, &[(&test_uri, "ack2")]).await; assert_eq!(batches_1.len(), 1); - assert_eq!(batches_1[0].docs.len(), 10); + assert_eq!(batches_1[0].raw_docs.len(), 10); assert!(coord_1.local_state.is_awaiting_commit(&partition_id)); // proc_2 learns from shared state that the message is likely still // being processed and skips it diff --git a/quickwit/quickwit-indexing/src/source/stdin_source.rs b/quickwit/quickwit-indexing/src/source/stdin_source.rs index 881dca5770d..d6a6f0903db 100644 --- a/quickwit/quickwit-indexing/src/source/stdin_source.rs +++ b/quickwit/quickwit-indexing/src/source/stdin_source.rs @@ -48,7 +48,7 @@ impl StdinBatchReader { .protect_future(self.reader.read_line(&mut buf)) .await?; if bytes_read > 0 { - batch_builder.add_doc(buf.into()); + batch_builder.add_doc(buf.into(), None); } else { self.is_eof = true; break; @@ -84,7 +84,7 @@ impl Source for StdinSource { ) -> Result { let batch_builder = self.reader.read_batch(ctx.progress()).await?; self.num_bytes_processed += batch_builder.num_bytes; - self.num_lines_processed += batch_builder.docs.len() as u64; + self.num_lines_processed += batch_builder.raw_docs.len() as u64; doc_processor_mailbox .send_message(batch_builder.build()) .await?; diff --git a/quickwit/quickwit-indexing/src/source/vec_source.rs b/quickwit/quickwit-indexing/src/source/vec_source.rs index a66204b72fa..a4929020660 100644 --- a/quickwit/quickwit-indexing/src/source/vec_source.rs +++ b/quickwit/quickwit-indexing/src/source/vec_source.rs @@ -94,15 +94,15 @@ impl Source for VecSource { .take(self.source_params.batch_num_docs) 
.cloned() { - batch_builder.add_doc(doc); + batch_builder.add_doc(doc, None); } - if batch_builder.docs.is_empty() { + if batch_builder.raw_docs.is_empty() { info!("reached end of source"); ctx.send_exit_with_success(batch_sink).await?; return Err(ActorExitStatus::Success); } let from_item_idx = self.next_item_idx; - self.next_item_idx += batch_builder.docs.len(); + self.next_item_idx += batch_builder.raw_docs.len(); let to_item_idx = self.next_item_idx; batch_builder.checkpoint_delta = SourceCheckpointDelta::from_partition_delta( @@ -227,7 +227,7 @@ mod tests { assert_eq!(last_observation, json!({"next_item_idx": 10})); let messages = doc_processor_inbox.drain_for_test(); let batch = messages[0].downcast_ref::().unwrap(); - assert_eq!(&batch.docs[0], "2"); + assert_eq!(&batch.raw_docs[0].doc, "2"); Ok(()) } } diff --git a/quickwit/quickwit-metastore/src/split_metadata.rs b/quickwit/quickwit-metastore/src/split_metadata.rs index fe88fe379d3..66dd919fb99 100644 --- a/quickwit/quickwit-metastore/src/split_metadata.rs +++ b/quickwit/quickwit-metastore/src/split_metadata.rs @@ -132,6 +132,11 @@ pub struct SplitMetadata { /// Doc mapping UID used when creating this split. This split may only be merged with other /// splits using the same doc mapping UID. pub doc_mapping_uid: DocMappingUid, + + /// Earliest arrival timestamp (since the Unix epoch) of all documents in the split. In other + /// words: `min(doc.arrival_timestamp for doc in split)`. This is used to track ingestion lag. + /// NOTE(review): unit unconfirmed: the name says seconds, test fixtures look like milliseconds.
+ pub min_arrival_timestamp_secs_opt: Option, } impl fmt::Debug for SplitMetadata { @@ -281,6 +286,7 @@ impl quickwit_config::TestableForRegression for SplitMetadata { footer_offsets: 1000..2000, num_merge_ops: 3, doc_mapping_uid: DocMappingUid::default(), + min_arrival_timestamp_secs_opt: Some(1763681493923), } } @@ -421,16 +427,17 @@ mod tests { delete_opstamp: 0, num_merge_ops: 0, doc_mapping_uid: DocMappingUid::default(), + min_arrival_timestamp_secs_opt: Some(1763681493923), }; - let expected_output = "SplitMetadata { split_id: \"split-1\", index_uid: IndexUid { \ - index_id: \"00000000-0000-0000-0000-000000000000\", \ - incarnation_id: Ulid(0) }, partition_id: 0, source_id: \ - \"source-1\", node_id: \"node-1\", num_docs: 100, \ - uncompressed_docs_size_in_bytes: 1024, time_range: Some(0..=100), \ - create_timestamp: 1629867600, maturity: Mature, tags: \ - \"{\\\"🐱\\\", \\\"😻\\\", \\\"😼\\\", \\\"😿\\\", and 1 more}\", \ - footer_offsets: 0..1024, delete_opstamp: 0, num_merge_ops: 0 }"; + let expected_output = + "SplitMetadata { split_id: \"split-1\", index_uid: IndexUid { index_id: \ + \"00000000-0000-0000-0000-000000000000\", incarnation_id: Ulid(0) }, partition_id: \ + 0, source_id: \"source-1\", node_id: \"node-1\", num_docs: 100, \ + uncompressed_docs_size_in_bytes: 1024, time_range: Some(0..=100), create_timestamp: \ + 1629867600, maturity: Mature, tags: \"{\\\"🐱\\\", \\\"😻\\\", \\\"😼\\\", \ + \\\"😿\\\", and 1 more}\", footer_offsets: 0..1024, delete_opstamp: 0, \ + num_merge_ops: 0, min_arrival_timestamp_secs_opt: Some(1763681493923) }"; assert_eq!(format!("{split_metadata:?}"), expected_output); } diff --git a/quickwit/quickwit-metastore/src/split_metadata_version.rs b/quickwit/quickwit-metastore/src/split_metadata_version.rs index 8325290be92..97b24331d7a 100644 --- a/quickwit/quickwit-metastore/src/split_metadata_version.rs +++ b/quickwit/quickwit-metastore/src/split_metadata_version.rs @@ -92,6 +92,9 @@ pub(crate) struct SplitMetadataV0_8 { // splits 
before when updates first appeared are compatible with each other. #[serde(default)] doc_mapping_uid: DocMappingUid, + + #[serde(default)] + min_arrival_timestamp_secs_opt: Option, } impl From for SplitMetadata { @@ -128,6 +131,7 @@ impl From for SplitMetadata { footer_offsets: v8.footer_offsets, num_merge_ops: v8.num_merge_ops, doc_mapping_uid: v8.doc_mapping_uid, + min_arrival_timestamp_secs_opt: v8.min_arrival_timestamp_secs_opt, } } } @@ -150,6 +154,7 @@ impl From for SplitMetadataV0_8 { footer_offsets: split.footer_offsets, num_merge_ops: split.num_merge_ops, doc_mapping_uid: split.doc_mapping_uid, + min_arrival_timestamp_secs_opt: split.min_arrival_timestamp_secs_opt, } } }