28 changes: 19 additions & 9 deletions quickwit/quickwit-indexing/src/actors/doc_processor.rs
@@ -41,7 +41,8 @@ use tokio::runtime::Handle;
use super::vrl_processing::*;
use crate::actors::Indexer;
use crate::models::{
NewPublishLock, NewPublishToken, ProcessedDoc, ProcessedDocBatch, PublishLock, RawDocBatch,
NewPublishLock, NewPublishToken, ProcessedDoc, ProcessedDocBatch, PublishLock, RawDoc,
RawDocBatch,
};

const PLAIN_TEXT: &str = "plain_text";
@@ -461,17 +462,20 @@ impl DocProcessor {
Ok(Some(timestamp))
}

fn process_raw_doc(&mut self, raw_doc: Bytes, processed_docs: &mut Vec<ProcessedDoc>) {
let num_bytes = raw_doc.len();
fn process_raw_doc(&mut self, raw_doc: RawDoc, processed_docs: &mut Vec<ProcessedDoc>) {
let num_bytes = raw_doc.doc.len();

#[cfg(feature = "vrl")]
let transform_opt = self.transform_opt.as_mut();
#[cfg(not(feature = "vrl"))]
let transform_opt: Option<&mut VrlProgram> = None;

for json_doc_result in parse_raw_doc(self.input_format, raw_doc, num_bytes, transform_opt) {
let processed_doc_result =
json_doc_result.and_then(|json_doc| self.process_json_doc(json_doc));
for json_doc_result in
parse_raw_doc(self.input_format, raw_doc.doc, num_bytes, transform_opt)
{
let processed_doc_result = json_doc_result.and_then(|json_doc| {
self.process_json_doc(json_doc, raw_doc.arrival_timestamp_secs_opt)
});

match processed_doc_result {
Ok(processed_doc) => {
@@ -491,7 +495,11 @@ impl DocProcessor {
}
}

fn process_json_doc(&self, json_doc: JsonDoc) -> Result<ProcessedDoc, DocProcessorError> {
fn process_json_doc(
&self,
json_doc: JsonDoc,
arrival_timestamp_secs_opt: Option<u64>,
) -> Result<ProcessedDoc, DocProcessorError> {
let num_bytes = json_doc.num_bytes;

let (partition, doc) = self
@@ -503,6 +511,7 @@
timestamp_opt,
partition,
num_bytes,
arrival_timestamp_secs_opt,
})
}
}
@@ -572,9 +581,10 @@ impl Handler<RawDocBatch> for DocProcessor {
if self.publish_lock.is_dead() {
return Ok(());
}
let mut processed_docs: Vec<ProcessedDoc> = Vec::with_capacity(raw_doc_batch.docs.len());
let mut processed_docs: Vec<ProcessedDoc> =
Vec::with_capacity(raw_doc_batch.raw_docs.len());

for raw_doc in raw_doc_batch.docs {
for raw_doc in raw_doc_batch.raw_docs {
let _protected_zone_guard = ctx.protect_zone();
self.process_raw_doc(raw_doc, &mut processed_docs);
ctx.record_progress();
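Note on the model change above: `RawDoc` now pairs the raw payload with the time the document reached the indexer, and `process_json_doc` threads that timestamp into each `ProcessedDoc`. A minimal sketch of the stamping a source performs on arrival; only the `doc` and `arrival_timestamp_secs_opt` fields appear in this diff, and the helper below is hypothetical:

use bytes::Bytes;
use std::time::{SystemTime, UNIX_EPOCH};

struct RawDoc {
    doc: Bytes,
    arrival_timestamp_secs_opt: Option<u64>,
}

// Hypothetical helper: stamp a payload with the current UNIX time on arrival.
fn stamp_on_arrival(payload: Bytes) -> RawDoc {
    let now_secs = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock is set after the UNIX epoch")
        .as_secs();
    RawDoc {
        doc: payload,
        arrival_timestamp_secs_opt: Some(now_secs),
    }
}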
30 changes: 30 additions & 0 deletions quickwit/quickwit-indexing/src/actors/indexer.rs
@@ -305,6 +305,7 @@ impl IndexerState {
timestamp_opt,
partition,
num_bytes,
arrival_timestamp_secs_opt,
} = doc;
counters.num_docs_in_workbench += 1;
let (indexed_split, split_created) = self.get_or_create_indexed_split(
@@ -326,6 +327,12 @@
if let Some(timestamp) = timestamp_opt {
record_timestamp(timestamp, &mut indexed_split.split_attrs.time_range);
}
if let Some(arrival_timestamp_secs) = arrival_timestamp_secs_opt {
record_arrival_timestamp(
arrival_timestamp_secs,
&mut indexed_split.split_attrs.min_arrival_timestamp_secs_opt,
);
}
let _protect_guard = ctx.protect_zone();
indexed_split
.index_writer
@@ -468,6 +475,14 @@ fn record_timestamp(timestamp: DateTime, time_range: &mut Option<RangeInclusive<
*time_range = Some(new_timestamp_range);
}

fn record_arrival_timestamp(
arrival_timestamp_secs: u64,
min_arrival_timestamp_secs_opt: &mut Option<u64>,
) {
let current_min = min_arrival_timestamp_secs_opt.get_or_insert(arrival_timestamp_secs);
*current_min = arrival_timestamp_secs.min(*current_min);
}

#[async_trait]
impl Handler<CommitTimeout> for Indexer {
type Reply = ();
@@ -797,6 +812,7 @@ mod tests {
timestamp_opt: Some(DateTime::from_timestamp_secs(1_662_529_435)),
partition: 1,
num_bytes: 30,
arrival_timestamp_secs_opt: None,
},
ProcessedDoc {
doc: doc!(
@@ -806,6 +822,7 @@
timestamp_opt: Some(DateTime::from_timestamp_secs(1_662_529_435)),
partition: 1,
num_bytes: 30,
arrival_timestamp_secs_opt: None,
},
],
SourceCheckpointDelta::from_range(4..6),
@@ -823,6 +840,7 @@
timestamp_opt: Some(DateTime::from_timestamp_secs(1_662_529_435i64)),
partition: 1,
num_bytes: 30,
arrival_timestamp_secs_opt: None,
},
ProcessedDoc {
doc: doc!(
@@ -832,6 +850,7 @@
timestamp_opt: Some(DateTime::from_timestamp_secs(1_662_529_435)),
partition: 1,
num_bytes: 30,
arrival_timestamp_secs_opt: None,
},
],
SourceCheckpointDelta::from_range(6..8),
@@ -848,6 +867,7 @@
timestamp_opt: Some(DateTime::from_timestamp_secs(1_662_529_435)),
partition: 1,
num_bytes: 30,
arrival_timestamp_secs_opt: None,
}],
SourceCheckpointDelta::from_range(8..9),
false,
@@ -933,6 +953,7 @@ mod tests {
timestamp_opt: None,
partition: 0,
num_bytes,
arrival_timestamp_secs_opt: None,
}
};
for i in 0..10_000 {
@@ -1011,6 +1032,7 @@ mod tests {
timestamp_opt: Some(DateTime::from_timestamp_secs(1_662_529_435)),
partition: 1,
num_bytes: 30,
arrival_timestamp_secs_opt: None,
}],
SourceCheckpointDelta::from_range(position..position + 1),
false,
@@ -1089,6 +1111,7 @@ mod tests {
timestamp_opt: Some(DateTime::from_timestamp_secs(1_662_529_435)),
partition: 1,
num_bytes: 30,
arrival_timestamp_secs_opt: None,
}],
SourceCheckpointDelta::from_range(8..9),
false,
@@ -1176,6 +1199,7 @@ mod tests {
timestamp_opt: Some(DateTime::from_timestamp_secs(1_662_529_435)),
partition: 1,
num_bytes: 30,
arrival_timestamp_secs_opt: None,
}],
SourceCheckpointDelta::from_range(8..9),
false,
@@ -1260,6 +1284,7 @@ mod tests {
timestamp_opt: None,
partition: 1,
num_bytes: 30,
arrival_timestamp_secs_opt: None,
},
ProcessedDoc {
doc: doc!(
@@ -1269,6 +1294,7 @@
timestamp_opt: None,
partition: 3,
num_bytes: 30,
arrival_timestamp_secs_opt: None,
},
],
SourceCheckpointDelta::from_range(8..9),
@@ -1354,6 +1380,7 @@ mod tests {
timestamp_opt: None,
partition,
num_bytes: 30,
arrival_timestamp_secs_opt: None,
}],
SourceCheckpointDelta::from_range(partition..partition + 1),
false,
@@ -1432,6 +1459,7 @@ mod tests {
timestamp_opt: None,
partition: 0,
num_bytes: 30,
arrival_timestamp_secs_opt: None,
}],
SourceCheckpointDelta::from_range(0..1),
false,
@@ -1503,6 +1531,7 @@ mod tests {
timestamp_opt: None,
partition: 0,
num_bytes: 30,
arrival_timestamp_secs_opt: None,
}],
SourceCheckpointDelta::from_range(0..1),
false,
@@ -1559,6 +1588,7 @@ mod tests {
timestamp_opt: None,
partition: 0,
num_bytes: 30,
arrival_timestamp_secs_opt: None,
}],
SourceCheckpointDelta::from_range(0..1),
true,
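The `record_arrival_timestamp` helper added above folds each document's arrival time into a per-split minimum. A standalone sketch of that behavior; the test is illustrative, not part of the diff:

fn record_arrival_timestamp(arrival_timestamp_secs: u64, min_arrival_timestamp_secs_opt: &mut Option<u64>) {
    let current_min = min_arrival_timestamp_secs_opt.get_or_insert(arrival_timestamp_secs);
    *current_min = arrival_timestamp_secs.min(*current_min);
}

#[test]
fn min_arrival_timestamp_is_kept() {
    let mut min_opt = None;
    record_arrival_timestamp(100, &mut min_opt);
    assert_eq!(min_opt, Some(100)); // first observation seeds the min
    record_arrival_timestamp(50, &mut min_opt);
    assert_eq!(min_opt, Some(50)); // earlier arrivals lower it
    record_arrival_timestamp(200, &mut min_opt);
    assert_eq!(min_opt, Some(50)); // later arrivals never raise it
}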
2 changes: 2 additions & 0 deletions quickwit/quickwit-indexing/src/actors/merge_executor.rs
@@ -287,6 +287,7 @@ pub fn merge_split_attrs(
uncompressed_docs_size_in_bytes,
delete_opstamp,
num_merge_ops: max_merge_ops(splits) + 1,
min_arrival_timestamp_secs_opt: None,
})
}

@@ -470,6 +471,7 @@ impl MergeExecutor {
uncompressed_docs_size_in_bytes,
delete_opstamp: last_delete_opstamp,
num_merge_ops: split.num_merge_ops,
min_arrival_timestamp_secs_opt: None,
},
index: merged_index,
split_scratch_directory: merge_scratch_directory,
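Both merge paths above reset `min_arrival_timestamp_secs_opt` to `None`, so merged splits never contribute to the lag metric; the publisher's `flat_map` (see the publisher diff below) silently drops them. A tiny illustration with made-up values:

let new_split_mins: Vec<Option<u64>> = vec![Some(1_700_000_000), None, Some(1_700_000_060)];
// `flatten()` behaves like the publisher's `flat_map`: `None` entries vanish.
let observed: Vec<u64> = new_split_mins.into_iter().flatten().collect();
assert_eq!(observed, vec![1_700_000_000, 1_700_000_060]);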
1 change: 1 addition & 0 deletions quickwit/quickwit-indexing/src/actors/packager.rs
@@ -529,6 +529,7 @@ mod tests {
replaced_split_ids: Vec::new(),
delete_opstamp: 0,
num_merge_ops: 0,
min_arrival_timestamp_secs_opt: None,
},
index,
split_scratch_directory,
20 changes: 17 additions & 3 deletions quickwit/quickwit-indexing/src/actors/publisher.rs
@@ -18,9 +18,11 @@ use fail::fail_point;
use quickwit_actors::{Actor, ActorContext, Handler, Mailbox, QueueCapacity};
use quickwit_proto::metastore::{MetastoreService, MetastoreServiceClient, PublishSplitsRequest};
use serde::Serialize;
use time::OffsetDateTime;
use tracing::{info, instrument, warn};

use crate::actors::MergePlanner;
use crate::metrics::INDEXER_METRICS;
use crate::models::{NewSplits, SplitsUpdate};
use crate::source::{SourceActor, SuggestTruncate};

@@ -143,6 +145,10 @@ impl Handler<SplitsUpdate> for Publisher {
.iter()
.map(|split| split.split_id.clone())
.collect();
let all_min_arrival_timestamp_secs: Vec<u64> = new_splits
.iter()
.flat_map(|split| split.min_arrival_timestamp_secs_opt)
.collect();
if let Some(_guard) = publish_lock.acquire().await {
let publish_splits_request = PublishSplitsRequest {
index_uid: Some(index_uid),
@@ -163,7 +169,7 @@
return Ok(());
}
info!("publish-new-splits");
if let Some(source_mailbox) = self.source_mailbox_opt.as_ref()
if let Some(source_mailbox) = &self.source_mailbox_opt
&& let Some(checkpoint) = checkpoint_delta_opt
{
// We voluntarily do not log anything here.
Expand All @@ -182,7 +188,6 @@ impl Handler<SplitsUpdate> for Publisher {
warn!(error=?send_truncate_err, "failed to send truncate message from publisher to source");
}
}

if !new_splits.is_empty() {
// The merge planner is not necessarily awake and this is not an error.
// For instance, when a source reaches its end, and the last "new" split
Expand All @@ -193,7 +198,6 @@ impl Handler<SplitsUpdate> for Publisher {
.send_message(merge_planner_mailbox, NewSplits { new_splits })
.await;
}

if replaced_split_ids.is_empty() {
self.counters.num_published_splits += 1;
} else {
Expand All @@ -202,6 +206,16 @@ impl Handler<SplitsUpdate> for Publisher {
} else {
self.counters.num_empty_splits += 1;
}
let now = OffsetDateTime::now_utc();
let now_timestamp = now.unix_timestamp() as u64;

for min_arrival_timestamp_secs in all_min_arrival_timestamp_secs {
if let Some(lag_secs) = now_timestamp.checked_sub(min_arrival_timestamp_secs) {
INDEXER_METRICS
.indexing_lag_seconds
.observe(lag_secs as f64);
}
}
fail_point!("publisher:after");
Ok(())
}
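The lag observed for each split is simply the wall-clock time at publication minus the split's earliest arrival time, with `checked_sub` guarding against clock skew. A standalone sketch of that computation; the function name is assumed:

use time::OffsetDateTime;

// Returns None when the arrival timestamp is in the future relative to this
// node's clock (e.g. skewed clocks); in that case no observation is recorded.
fn indexing_lag_secs(min_arrival_timestamp_secs: u64) -> Option<u64> {
    let now_timestamp = OffsetDateTime::now_utc().unix_timestamp() as u64;
    now_timestamp.checked_sub(min_arrival_timestamp_secs)
}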
5 changes: 5 additions & 0 deletions quickwit/quickwit-indexing/src/actors/uploader.rs
@@ -593,6 +593,7 @@ mod tests {
split_id: "test-split".to_string(),
delete_opstamp: 10,
num_merge_ops: 0,
min_arrival_timestamp_secs_opt: None,
},
serialized_split_fields: Vec::new(),
split_scratch_directory,
@@ -707,6 +708,7 @@ mod tests {
],
delete_opstamp: 0,
num_merge_ops: 0,
min_arrival_timestamp_secs_opt: None,
},
serialized_split_fields: Vec::new(),
split_scratch_directory: split_scratch_directory_1,
@@ -734,6 +736,7 @@
],
delete_opstamp: 0,
num_merge_ops: 0,
min_arrival_timestamp_secs_opt: None,
},
serialized_split_fields: Vec::new(),
split_scratch_directory: split_scratch_directory_2,
@@ -854,6 +857,7 @@ mod tests {
replaced_split_ids: Vec::new(),
delete_opstamp: 10,
num_merge_ops: 0,
min_arrival_timestamp_secs_opt: None,
},
serialized_split_fields: Vec::new(),
split_scratch_directory,
@@ -1036,6 +1040,7 @@ mod tests {
split_id: SPLIT_ULID_STR.to_string(),
delete_opstamp: 10,
num_merge_ops: 0,
min_arrival_timestamp_secs_opt: None,
},
serialized_split_fields: Vec::new(),
split_scratch_directory,
12 changes: 10 additions & 2 deletions quickwit/quickwit-indexing/src/metrics.rs
@@ -14,8 +14,8 @@

use once_cell::sync::Lazy;
use quickwit_common::metrics::{
IntCounter, IntCounterVec, IntGauge, IntGaugeVec, new_counter, new_counter_vec, new_gauge,
new_gauge_vec,
Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, linear_buckets,
new_counter, new_counter_vec, new_gauge, new_gauge_vec, new_histogram, new_histogram_vec,
};

pub struct IndexerMetrics {
@@ -31,6 +31,7 @@ pub struct IndexerMetrics {
// We use a lazy counter, as most users do not use Kafka.
#[cfg_attr(not(feature = "kafka"), allow(dead_code))]
pub kafka_rebalance_total: Lazy<IntCounter>,
pub indexing_lag_seconds: Histogram,
}

impl Default for IndexerMetrics {
Expand Down Expand Up @@ -106,6 +107,13 @@ impl Default for IndexerMetrics {
&[],
)
}),
indexing_lag_seconds: new_histogram(
"indexing_lag_seconds",
"Duration in seconds between a document's arrival at the indexer and the publication of the split containing it.",
"indexing",
linear_buckets(0.0, 5.0, 120)
.expect("buckets should have a width and count greater than 0"),
),
}
}
}
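`linear_buckets(0.0, 5.0, 120)` produces 120 histogram bounds spaced 5 s apart, covering lags from 0 s to 595 s (roughly 10 minutes); slower publications land in the implicit `+Inf` bucket. A quick check, assuming `quickwit_common::metrics::linear_buckets` follows the prometheus crate's convention:

let buckets = prometheus::linear_buckets(0.0, 5.0, 120).unwrap();
assert_eq!(buckets.len(), 120);
assert_eq!(buckets.first(), Some(&0.0)); // first upper bound: 0 s
assert_eq!(buckets.last(), Some(&595.0)); // last finite bound: 0.0 + 5.0 * 119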
1 change: 1 addition & 0 deletions quickwit/quickwit-indexing/src/models/indexed_split.rs
@@ -110,6 +110,7 @@ impl IndexedSplitBuilder {
time_range: None,
delete_opstamp: last_delete_opstamp,
num_merge_ops: 0,
min_arrival_timestamp_secs_opt: None,
},
index_writer,
split_scratch_directory,