diff --git a/Cargo.lock b/Cargo.lock index de46032f7b3f3..c6239d136efd5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4039,6 +4039,7 @@ dependencies = [ "databend-common-base", "databend-common-exception", "databend-common-expression", + "databend-common-tracing", "fastrace", "futures", "log", @@ -4069,6 +4070,7 @@ dependencies = [ "databend-common-license", "databend-common-pipeline", "databend-common-sql", + "databend-common-tracing", "databend-storages-common-cache", "enum-as-inner", "fastrace", @@ -4191,6 +4193,7 @@ dependencies = [ "databend-common-settings", "databend-common-storage", "databend-common-storages-basic", + "databend-common-tracing", "databend-common-users", "databend-enterprise-data-mask-feature", "databend-enterprise-row-access-policy-feature", @@ -4370,6 +4373,7 @@ dependencies = [ "databend-common-pipeline-transforms", "databend-common-sql", "databend-common-storage", + "databend-common-tracing", "databend-common-users", "databend-enterprise-fail-safe", "databend-enterprise-vacuum-handler", @@ -4699,6 +4703,7 @@ dependencies = [ "defer", "fastrace", "fastrace-opentelemetry", + "inventory", "itertools 0.13.0", "jiff", "libc", diff --git a/Cargo.toml b/Cargo.toml index 1548088d75883..eeef2fd943d79 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -547,6 +547,7 @@ cargo_metadata = "0.19" fast-float2 = "0.2.3" gix = "0.71.0" indent = "0.1.1" +inventory = "0.3.15" logos = "0.12.1" nom = "8.0.0" nom-rule = "0.5.1" diff --git a/src/common/tracing/Cargo.toml b/src/common/tracing/Cargo.toml index 4dbd568d0f0ca..45e8ed36d56dd 100644 --- a/src/common/tracing/Cargo.toml +++ b/src/common/tracing/Cargo.toml @@ -19,6 +19,7 @@ databend-common-meta-app-storage = { workspace = true } defer = { workspace = true } fastrace = { workspace = true } fastrace-opentelemetry = { workspace = true } +inventory = { workspace = true } itertools = { workspace = true } jiff = { workspace = true } libc = { workspace = true } diff --git a/src/common/tracing/src/lib.rs b/src/common/tracing/src/lib.rs index 6a891e5a7b4d7..48b4634315a48 100644 --- a/src/common/tracing/src/lib.rs +++ b/src/common/tracing/src/lib.rs @@ -22,6 +22,7 @@ mod crash_hook; mod filter; mod init; mod loggers; +pub mod module_tag; mod panic_hook; mod remote_log; mod structlog; diff --git a/src/common/tracing/src/loggers.rs b/src/common/tracing/src/loggers.rs index 2d6f8c91f0560..856366ab3641e 100644 --- a/src/common/tracing/src/loggers.rs +++ b/src/common/tracing/src/loggers.rs @@ -36,6 +36,8 @@ use logforth::Layout; use serde_json::Map; use serde_json::Value as JsonValue; +use crate::module_tag::label_for_module; + const PRINTER: DateTimePrinter = DateTimePrinter::new().precision(Some(6)); pub fn format_timestamp(zdt: &Zoned) -> String { @@ -123,18 +125,15 @@ impl Layout for TextLayout { PRINTER.print_timestamp_with_offset(&timestamp, offset, &mut buf)?; } - write!( - buf, - " {:>5} {}: {}:{} {}", - record.level(), - record.module_path().unwrap_or(""), - Path::new(record.file().unwrap_or_default()) - .file_name() - .and_then(|name| name.to_str()) - .unwrap_or_default(), - record.line().unwrap_or(0), - record.args(), - )?; + let level = record.level(); + let module = record.module_path().map(label_for_module).unwrap_or(""); + let log_file = Path::new(record.file().unwrap_or_default()) + .file_name() + .and_then(|name| name.to_str()) + .unwrap_or_default(); + let line = record.line().unwrap_or(0); + let msg = record.args(); + write!(buf, " {level:>5} {module}: {log_file}:{line} {msg}")?; record.key_values().visit(&mut KvWriter(&mut buf))?;
Ok(buf) diff --git a/src/common/tracing/src/module_tag.rs b/src/common/tracing/src/module_tag.rs new file mode 100644 index 0000000000000..2ab8f34a2ac56 --- /dev/null +++ b/src/common/tracing/src/module_tag.rs @@ -0,0 +1,238 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::OnceLock; + +pub use inventory::submit; + +/// A mapping between a module path prefix and a short log label. +#[derive(Debug, Clone, Copy)] +pub struct ModuleTag { + pub prefix: &'static str, + pub label: &'static str, +} + +inventory::collect!(ModuleTag); + +static MODULE_TAG_TRIE: OnceLock<Trie<'static>> = OnceLock::new(); + +/// Resolve the best matching label for the provided module path. +pub fn label_for_module(full_name: &str) -> &str { + if let Some(label) = MODULE_TAG_TRIE + .get_or_init(|| Trie::build(inventory::iter::<ModuleTag>.into_iter().copied())) + .search(full_name) + { + label + } else { + full_name + } +} + +#[macro_export] +macro_rules! register_module_tag { + ($label:expr $(,)?) => { + $crate::module_tag::submit! { + $crate::module_tag::ModuleTag { + prefix: module_path!(), + label: $label, + } + } + }; + ($label:expr, $prefix:expr $(,)?) => { + $crate::module_tag::submit! { + $crate::module_tag::ModuleTag { + prefix: $prefix, + label: $label, + } + } + }; +} + +struct Trie<'a> { + root: TrieNode<'a>, +} + +impl<'a> Trie<'a> { + fn build<I>(iter: I) -> Self + where I: Iterator<Item = ModuleTag> { + let mut root = TrieNode::default(); + for entry in iter { + root.insert(entry.prefix, entry.label); + } + Trie { root } + } + + fn search(&self, query: &str) -> Option<&'a str> { + self.root.search(query.as_bytes()) + } +} + +#[derive(Default)] +struct TrieNode<'a> { + label: Option<&'a str>, + segment: &'a [u8], + children: Vec<TrieNode<'a>>, +} + +impl<'a> TrieNode<'a> { + fn insert(&mut self, prefix: &'a str, label: &'a str) -> bool { + self.insert_inner(prefix.as_bytes(), label) + } + + fn insert_inner(&mut self, bytes: &'a [u8], label: &'a str) -> bool { + if bytes.is_empty() { + return match &self.label { + Some(_) => false, + None => { + self.label = Some(label); + true + } + }; + } + + for child in &mut self.children { + let lcp = child.segment_match_len(bytes); + if lcp == 0 { + continue; + } + + if lcp < child.segment.len() { + child.split_at(lcp); + } + + return child.insert_inner(&bytes[lcp..], label); + } + + self.children.push(TrieNode { + label: Some(label), + segment: bytes, + children: Vec::new(), + }); + true + } + + fn search(&self, query: &[u8]) -> Option<&'a str> { + let mut best = self.label; + let mut current = self; + let mut offset = 0; + + loop { + let mut matched = false; + for child in &current.children { + let matched_len = child.segment_match_len(&query[offset..]); + if matched_len == child.segment.len() { + offset += matched_len; + current = child; + if current.label.is_some() { + best = current.label; + } + matched = true; + break; + } + } + + if !matched { + break; + } + } + + best + } + + fn split_at(&mut self, mid: usize) { + let prefix = &self.segment[..mid]; + let suffix =
&self.segment[mid..]; + + let suffix_node = TrieNode { + segment: suffix, + ..std::mem::take(self) + }; + + self.segment = prefix; + self.children = vec![suffix_node]; + } + + fn segment_match_len(&self, bytes: &[u8]) -> usize { + let mut idx = 0; + while idx < bytes.len() && idx < self.segment.len() && bytes[idx] == self.segment[idx] { + idx += 1; + } + idx + } +} + +#[cfg(test)] +mod tests { + use super::*; + + inventory::submit! { + ModuleTag { + prefix: "module::submodule", + label: "[SUB]", + } + } + + inventory::submit! { + ModuleTag { + prefix: "module", + label: "[MODULE]", + } + } + + #[test] + fn chooses_longest_prefix() { + assert_eq!(label_for_module("module::submodule::leaf"), "[SUB]"); + } + + #[test] + fn falls_back_to_full_module_name() { + assert_eq!(label_for_module("unknown::module"), "unknown::module"); + } + + #[test] + fn trie_search_returns_none_without_prefix() { + let trie = Trie::build( + [ + ModuleTag { + prefix: "foo", + label: "foo", + }, + ModuleTag { + prefix: "baraa", + label: "baraa", + }, + ModuleTag { + prefix: "bar", + label: "bar", + }, + ModuleTag { + prefix: "ba", + label: "ba", + }, + ModuleTag { + prefix: "b", + label: "b", + }, + ] + .into_iter(), + ); + + assert_eq!(trie.search("baz"), Some("ba")); + assert_eq!(trie.search("bar"), Some("bar")); + assert_eq!(trie.search("baa"), Some("ba")); + assert_eq!(trie.search("barista"), Some("bar")); + assert_eq!(trie.search("food"), Some("foo")); + assert_eq!(trie.search("qux"), None); + } +} diff --git a/src/meta/client/src/lib.rs b/src/meta/client/src/lib.rs index 205452b61d64a..24ffae65e292b 100644 --- a/src/meta/client/src/lib.rs +++ b/src/meta/client/src/lib.rs @@ -16,6 +16,8 @@ extern crate core; +databend_common_tracing::register_module_tag!("[META_CLIENT]"); + mod channel_manager; mod client_handle; pub mod endpoints; diff --git a/src/query/ee/src/storages/fuse/operations/vacuum_table_v2.rs b/src/query/ee/src/storages/fuse/operations/vacuum_table_v2.rs index cf90670b90fbe..12c89d2e1bd76 100644 --- a/src/query/ee/src/storages/fuse/operations/vacuum_table_v2.rs +++ b/src/query/ee/src/storages/fuse/operations/vacuum_table_v2.rs @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +// Logs from this module will show up as "[FUSE-VACUUM2] ...". +databend_common_tracing::register_module_tag!("[FUSE-VACUUM2]"); + use std::collections::HashSet; use std::sync::Arc; @@ -89,7 +92,7 @@ pub async fn do_vacuum2( { if ctx.txn_mgr().lock().is_active() { info!( - "[FUSE-VACUUM2] Transaction is active, skipping vacuum, target table {}", + "Transaction is active, skipping vacuum, target table {}", table.get_table_info().desc ); return Ok(vec![]); @@ -100,7 +103,7 @@ pub async fn do_vacuum2( let Some(latest_snapshot) = fuse_table.read_table_snapshot().await? 
else { info!( - "[FUSE-VACUUM2] Table {} has no snapshot, stopping vacuum", + "Table {} has no snapshot, stopping vacuum", fuse_table.get_table_info().desc ); return Ok(vec![]); @@ -116,10 +119,7 @@ pub async fn do_vacuum2( let snapshots_before_lvt = match retention_policy { RetentionPolicy::ByTimePeriod(delta_duration) => { - info!( - "[FUSE-VACUUM2] Using ByTimePeriod policy {:?}", - delta_duration - ); + info!("Using ByTimePeriod policy {:?}", delta_duration); let retention_period = if fuse_table.is_transient() { // For transient table, keep no history data TimeDelta::zero() @@ -141,7 +141,7 @@ pub async fn do_vacuum2( } ctx.set_status_info(&format!( - "[FUSE-VACUUM2] Set LVT for table {}, elapsed: {:?}, LVT: {:?}", + "Set LVT for table {}, elapsed: {:?}, LVT: {:?}", fuse_table.get_table_info().desc, start.elapsed(), lvt @@ -153,7 +153,7 @@ pub async fn do_vacuum2( } RetentionPolicy::ByNumOfSnapshotsToKeep(num_snapshots_to_keep) => { info!( - "[FUSE-VACUUM2] Using ByNumOfSnapshotsToKeep policy {:?}", + "Using ByNumOfSnapshotsToKeep policy {:?}", num_snapshots_to_keep ); // List the snapshot order by timestamp asc, till the current snapshot(inclusively). @@ -195,7 +195,7 @@ pub async fn do_vacuum2( let elapsed = start.elapsed(); ctx.set_status_info(&format!( - "[FUSE-VACUUM2] Listed snapshots for table {}, elapsed: {:?}, snapshots_dir: {:?}, snapshots: {:?}", + "Listed snapshots for table {}, elapsed: {:?}, snapshots_dir: {:?}, snapshots: {:?}", fuse_table.get_table_info().desc, elapsed, fuse_table @@ -217,7 +217,7 @@ pub async fn do_vacuum2( return Ok(vec![]); }; ctx.set_status_info(&format!( - "[FUSE-VACUUM2] Selected gc_root for table {}, elapsed: {:?}, gc_root: {:?}, snapshots_to_gc: {:?}", + "Selected gc_root for table {}, elapsed: {:?}, gc_root: {:?}, snapshots_to_gc: {:?}", fuse_table.get_table_info().desc, start.elapsed(), gc_root, @@ -246,7 +246,7 @@ pub async fn do_vacuum2( .collect::>(); ctx.set_status_info(&format!( - "[FUSE-VACUUM2] Listed segments before gc_root for table {}, elapsed: {:?}, segment_dir: {:?}, gc_root_timestamp: {:?}, segments: {:?}", + "Listed segments before gc_root for table {}, elapsed: {:?}, segment_dir: {:?}, gc_root_timestamp: {:?}, segments: {:?}", fuse_table.get_table_info().desc, start.elapsed(), fuse_table.meta_location_generator().segment_location_prefix(), @@ -264,7 +264,7 @@ pub async fn do_vacuum2( .map(|v| TableMetaLocationGenerator::gen_segment_stats_location_from_segment_location(v)) .collect::>(); ctx.set_status_info(&format!( - "[FUSE-VACUUM2] Filtered segments_to_gc for table {}, elapsed: {:?}, segments_to_gc: {:?}, stats_to_gc: {:?}", + "Filtered segments_to_gc for table {}, elapsed: {:?}, segments_to_gc: {:?}, stats_to_gc: {:?}", fuse_table.get_table_info().desc, start.elapsed(), slice_summary(&segments_to_gc), @@ -282,7 +282,7 @@ pub async fn do_vacuum2( gc_root_blocks.extend(segment?.block_metas()?.iter().map(|b| b.location.0.clone())); } ctx.set_status_info(&format!( - "[FUSE-VACUUM2] Read segments for table {}, elapsed: {:?}", + "Read segments for table {}, elapsed: {:?}", fuse_table.get_table_info().desc, start.elapsed(), )); @@ -301,7 +301,7 @@ pub async fn do_vacuum2( .collect::>(); ctx.set_status_info(&format!( - "[FUSE-VACUUM2] Listed blocks before gc_root for table {}, elapsed: {:?}, block_dir: {:?}, gc_root_timestamp: {:?}, blocks: {:?}", + "Listed blocks before gc_root for table {}, elapsed: {:?}, block_dir: {:?}, gc_root_timestamp: {:?}, blocks: {:?}", fuse_table.get_table_info().desc, start.elapsed(), 
fuse_table.meta_location_generator().block_location_prefix(), @@ -315,7 +315,7 @@ pub async fn do_vacuum2( .filter(|b| !gc_root_blocks.contains(b)) .collect(); ctx.set_status_info(&format!( - "[FUSE-VACUUM2] Filtered blocks_to_gc for table {}, elapsed: {:?}, blocks_to_gc: {:?}", + "Filtered blocks_to_gc for table {}, elapsed: {:?}, blocks_to_gc: {:?}", fuse_table.get_table_info().desc, start.elapsed(), slice_summary(&blocks_to_gc) @@ -355,7 +355,7 @@ pub async fn do_vacuum2( } ctx.set_status_info(&format!( - "[FUSE-VACUUM2] Collected indexes_to_gc for table {}, elapsed: {:?}, indexes_to_gc: {:?}", + "Collected indexes_to_gc for table {}, elapsed: {:?}, indexes_to_gc: {:?}", fuse_table.get_table_info().desc, start.elapsed(), slice_summary(&indexes_to_gc) @@ -399,7 +399,7 @@ pub async fn do_vacuum2( .chain(indexes_to_gc.into_iter()) .collect(); ctx.set_status_info(&format!( - "[FUSE-VACUUM2] Removed files for table {}, elapsed: {:?}, files_to_gc: {:?}", + "Removed files for table {}, elapsed: {:?}, files_to_gc: {:?}", fuse_table.get_table_info().desc, start.elapsed(), slice_summary(&files_to_gc) @@ -450,7 +450,7 @@ async fn set_lvt( ) -> Result>> { if !is_uuid_v7(&latest_snapshot.snapshot_id) { info!( - "[FUSE-VACUUM2] Latest snapshot is not v7, stopping vacuum: {:?}", + "Latest snapshot is not v7, stopping vacuum: {:?}", latest_snapshot.snapshot_id ); return Ok(None); @@ -482,7 +482,7 @@ async fn list_until_prefix( need_one_more: bool, gc_root_meta_ts: Option>, ) -> Result> { - info!("[FUSE-VACUUM2] Listing files until prefix: {}", until); + info!("Listing files until prefix: {}", until); let dal = fuse_table.get_operator_ref(); match dal.info().scheme() { diff --git a/src/query/pipeline/Cargo.toml b/src/query/pipeline/Cargo.toml index 31aed0a486a2a..a25856d1ea022 100644 --- a/src/query/pipeline/Cargo.toml +++ b/src/query/pipeline/Cargo.toml @@ -12,6 +12,7 @@ async-trait = { workspace = true } databend-common-base = { workspace = true } databend-common-exception = { workspace = true } databend-common-expression = { workspace = true } +databend-common-tracing = { workspace = true } fastrace = { workspace = true } futures = { workspace = true } log = { workspace = true } diff --git a/src/query/pipeline/src/core/processor.rs b/src/query/pipeline/src/core/processor.rs index cc3ab1cbdc533..375ac68d83e4f 100644 --- a/src/query/pipeline/src/core/processor.rs +++ b/src/query/pipeline/src/core/processor.rs @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +// Logs from this module will show up as "[PIPELINE-EXECUTOR] ...". 
+databend_common_tracing::register_module_tag!("[PIPELINE-EXECUTOR]"); + use std::any::Any; use std::cell::UnsafeCell; use std::ops::Deref; @@ -176,7 +179,7 @@ impl ProcessorPtr { ("error.message", err.display_text()), ] }); - log::info!(error = err.to_string(); "[PIPELINE-EXECUTOR] Error in process"); + log::info!(error = err.to_string(); "Error in process"); Err(err) } } @@ -216,7 +219,7 @@ impl ProcessorPtr { ("error.message", err.display_text()), ] }); - log::info!(error = err.to_string(); "[PIPELINE-EXECUTOR] Error in process"); + log::info!(error = err.to_string(); "Error in process"); Err(err) } } diff --git a/src/query/pipeline/transforms/Cargo.toml b/src/query/pipeline/transforms/Cargo.toml index eab90fbba0864..04d0c739a83fc 100644 --- a/src/query/pipeline/transforms/Cargo.toml +++ b/src/query/pipeline/transforms/Cargo.toml @@ -21,6 +21,7 @@ databend-common-functions = { workspace = true } databend-common-license = { workspace = true } databend-common-pipeline = { workspace = true } databend-common-sql = { workspace = true } +databend-common-tracing = { workspace = true } databend-storages-common-cache = { workspace = true } enum-as-inner = { workspace = true } fastrace = { workspace = true } diff --git a/src/query/pipeline/transforms/src/lib.rs b/src/query/pipeline/transforms/src/lib.rs index bd5cd9f716288..33c5f2f7e5d10 100644 --- a/src/query/pipeline/transforms/src/lib.rs +++ b/src/query/pipeline/transforms/src/lib.rs @@ -21,5 +21,8 @@ #![feature(assert_matches)] #![feature(debug_closure_helpers)] +// Logs from this crate will show up as "...". +databend_common_tracing::register_module_tag!("[TRANSFORMS]"); + pub mod processors; pub use processors::*; diff --git a/src/query/service/src/auth.rs b/src/query/service/src/auth.rs index e00f08ae342b4..6138b5fd87a44 100644 --- a/src/query/service/src/auth.rs +++ b/src/query/service/src/auth.rs @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +// Logs from this module will show up as "[AUTH] ...". 
+databend_common_tracing::register_module_tag!("[AUTH]"); + use std::sync::Arc; use databend_common_base::base::BuildInfoRef; @@ -127,14 +130,15 @@ impl AuthMgr { token: t, client_ip, } => { - let jwt_auth = self - .jwt_auth - .as_ref() - .ok_or_else(|| ErrorCode::AuthenticateFailure("[AUTH] JWT authentication failed: JWT auth is not configured on this server"))?; + let jwt_auth = self.jwt_auth.as_ref().ok_or_else(|| { + ErrorCode::AuthenticateFailure( + "JWT authentication failed: JWT auth is not configured on this server", + ) + })?; let jwt = jwt_auth.parse_jwt_claims(t.as_str()).await?; let user_name = jwt.subject.ok_or_else(|| { ErrorCode::AuthenticateFailure( - "[AUTH] JWT authentication failed: subject claim (user name) is missing in the token", + "JWT authentication failed: subject claim (user name) is missing in the token", ) })?; @@ -154,7 +158,7 @@ impl AuthMgr { { Ok(mut user_info) => { if user_info.auth_info != AuthInfo::JWT { - return Err(ErrorCode::AuthenticateFailure("[AUTH] Authentication failed: user exists but is not configured for JWT authentication")); + return Err(ErrorCode::AuthenticateFailure("Authentication failed: user exists but is not configured for JWT authentication")); } if let Some(ensure_user) = jwt.custom.ensure_user { let current_roles = user_info.grants.roles(); @@ -163,7 +167,7 @@ impl AuthMgr { for role in roles.iter() { if !current_roles.contains(role) { info!( - "[AUTH] JWT grant role to user: {} -> {}", + "JWT grant role to user: {} -> {}", user_info.name, role ); user_api @@ -182,7 +186,7 @@ impl AuthMgr { // grant default role to user if not exists if !current_roles.contains(jwt_default_role) { info!( - "[AUTH] JWT grant default role to user: {} -> {}", + "JWT grant default role to user: {} -> {}", user_info.name, jwt_default_role ); user_api @@ -197,7 +201,7 @@ impl AuthMgr { // ensure default role to jwt role if user_info.option.default_role() != Some(jwt_default_role) { info!( - "[AUTH] JWT update default role for user: {} -> {}", + "JWT update default role for user: {} -> {}", user_info.name, jwt_default_role ); user_api @@ -223,7 +227,7 @@ impl AuthMgr { } _ => { return Err(ErrorCode::AuthenticateFailure(format!( - "[AUTH] Authentication failed: {}", + "Authentication failed: {}", e.message() ))) } @@ -231,7 +235,7 @@ impl AuthMgr { let ensure_user = jwt .custom .ensure_user - .ok_or_else(|| ErrorCode::AuthenticateFailure(format!("[AUTH] JWT authentication failed: ensure_user claim is missing and user does not exist: {}", e.message())))?; + .ok_or_else(|| ErrorCode::AuthenticateFailure(format!("JWT authentication failed: ensure_user claim is missing and user does not exist: {}", e.message())))?; // create a new user if not exists let mut user_info = UserInfo::new(&user_name, "%", AuthInfo::JWT); if let Some(ref roles) = ensure_user.roles { @@ -247,7 +251,7 @@ impl AuthMgr { .option .set_default_role(Some(default_role.clone())); } - info!("[AUTH] JWT create user: {}", user_info.name); + info!("JWT create user: {}", user_info.name); user_api .add_user(&tenant, user_info.clone(), &CreateOption::CreateIfNotExists) .await?; @@ -306,16 +310,16 @@ impl AuthMgr { hash_method: t, .. 
} => match p { - None => Err(ErrorCode::AuthenticateFailure("[AUTH] Authentication failed: password is required but was not provided")), + None => Err(ErrorCode::AuthenticateFailure("Authentication failed: password is required but was not provided")), Some(p) => { if *h == t.hash(p) { Ok(()) } else { - Err(ErrorCode::AuthenticateFailure("[AUTH] Authentication failed: incorrect password")) + Err(ErrorCode::AuthenticateFailure("Authentication failed: incorrect password")) } } }, - _ => Err(ErrorCode::AuthenticateFailure("[AUTH] Authentication failed: user exists but is not configured for password authentication")), + _ => Err(ErrorCode::AuthenticateFailure("Authentication failed: user exists but is not configured for password authentication")), }; UserApiProvider::instance() .update_user_login_result(tenant, identity, authed.is_ok(), &user) diff --git a/src/query/service/src/history_tables/alter_table.rs b/src/query/service/src/history_tables/alter_table.rs index 58084e1bbc9d0..2e1193b598128 100644 --- a/src/query/service/src/history_tables/alter_table.rs +++ b/src/query/service/src/history_tables/alter_table.rs @@ -125,7 +125,7 @@ pub async fn should_reset( // Internal -> External if current_storage_params.is_none() && connection.is_some() { info!( - "[HISTORY-TABLES] Converting internal table to external table, current None vs new {:?}", + "Converting internal table to external table, current None vs new {:?}", connection ); return Ok(true); @@ -134,7 +134,7 @@ pub async fn should_reset( // External -> Internal if current_storage_params.is_some() && connection.is_none() { info!( - "[HISTORY-TABLES] Converting external table to internal table, current {:?} vs new None", + "Converting external table to internal table, current {:?} vs new None", current_storage_params ); return Ok(false); @@ -156,11 +156,11 @@ pub async fn should_reset( // return error to prevent cyclic conversion if current_storage_params != Some(&new_storage_params) { info!( - "[HISTORY-TABLES] Storage parameters have changed, current {:?} vs new {:?}", + "Storage parameters have changed, current {:?} vs new {:?}", current_storage_params, new_storage_params ); return Err(ErrorCode::InvalidConfig( - "[HISTORY-TABLES] Cannot change storage parameters of external history table, please drop the tables and stage first." + "Cannot change storage parameters of external history table, please drop the tables and stage first." )); } } diff --git a/src/query/service/src/history_tables/global_history_log.rs b/src/query/service/src/history_tables/global_history_log.rs index 3196ee2d6e109..7a6cea19c05c0 100644 --- a/src/query/service/src/history_tables/global_history_log.rs +++ b/src/query/service/src/history_tables/global_history_log.rs @@ -115,9 +115,7 @@ impl GlobalHistoryLog { }); GlobalInstance::set(instance); if cfg.log.history.log_only { - info!( - "[HISTORY-TABLES] History tables transform is disabled, only logging is enabled." 
- ); + info!("History tables transform is disabled, only logging is enabled."); return Ok(()); } runtime.spawn(async move { @@ -152,7 +150,7 @@ impl GlobalHistoryLog { self.wait_for_initialization().await; self.prepare().await?; self.update_operator(true).await?; - info!("[HISTORY-TABLES] System history prepared successfully"); + info!("System history prepared successfully"); let mut handles = vec![]; @@ -255,7 +253,7 @@ impl GlobalHistoryLog { get_alter_table_sql(self.create_context().await?, &create_table, &table.name) .await?; for alter_sql in get_alter_sql { - info!("[HISTORY-TABLES] executing alter table: {}", alter_sql); + info!("executing alter table: {}", alter_sql); self.execute_sql(&alter_sql).await?; } } @@ -263,7 +261,7 @@ impl GlobalHistoryLog { } pub async fn reset(&self) -> Result<()> { - info!("[HISTORY-TABLES] Resetting system history tables"); + info!("Resetting system history tables"); let drop_stage = format!("DROP STAGE IF EXISTS {}", self.stage_name); self.execute_sql(&drop_stage).await?; @@ -330,9 +328,7 @@ impl GlobalHistoryLog { { let mut params = self.current_params.lock(); if params.as_ref() != params_from_meta.as_ref() || force { - info!( - "[HISTORY-TABLES] log_history table storage params changed, update log operator" - ); + info!("log_history table storage params changed, update log operator"); *params = params_from_meta.clone(); } else { return Ok(()); @@ -362,7 +358,7 @@ impl GlobalHistoryLog { let vacuum = format!("VACUUM TABLE system_history.{}", table.name); self.execute_sql(&vacuum).await?; info!( - "[HISTORY-TABLES] periodic VACUUM operation on history log table '{}' completed successfully.", + "periodic VACUUM operation on history log table '{}' completed successfully.", table.name ); } @@ -391,19 +387,13 @@ impl GlobalHistoryLog { continue; } Err(e) => { - error!( - "[HISTORY-TABLES] {} failed to create heartbeat, retry: {}", - table.name, e - ); + error!("{} failed to create heartbeat, retry: {}", table.name, e); sleep(self.transform_sleep_duration()).await; continue; } }; - debug!( - "[HISTORY-TABLES] {} acquired heartbeat, starting work loop", - table.name - ); + debug!("{} acquired heartbeat, starting work loop", table.name); // 2. 
Start to work on the task, hold the heartbeat guard during the work let mut transform_cnt = 0; @@ -411,10 +401,7 @@ impl GlobalHistoryLog { // Check if heartbeat is lost or cancelled, indicating another instance took over // or the task should be terminated if heartbeat_guard.exited() { - info!( - "[HISTORY-TABLES] {} lost heartbeat, releasing and retrying", - table.name - ); + info!("{} lost heartbeat, releasing and retrying", table.name); break; } match self.transform(&table, &meta_key).await { @@ -428,19 +415,19 @@ impl GlobalHistoryLog { let temp_count = error_counters.increment_temporary(); let backoff_second = error_counters.calculate_temp_backoff(); warn!( - "[HISTORY-TABLES] {} log transform failed with temporary error {}, count {}, next retry in {} seconds", + "{} log transform failed with temporary error {}, count {}, next retry in {} seconds", table.name, e, temp_count, backoff_second ); sleep(Duration::from_secs(backoff_second)).await; } else { let persistent_count = error_counters.increment_persistent(); error!( - "[HISTORY-TABLES] {} log transform failed with persistent error {}, retry count {}", + "{} log transform failed with persistent error {}, retry count {}", table.name, e, persistent_count ); if error_counters.persistent_exceeded_limit() { error!( - "[HISTORY-TABLES] {} log transform failed too many times, giving up", + "{} log transform failed too many times, giving up", table.name ); break; @@ -461,10 +448,7 @@ impl GlobalHistoryLog { self.meta_handle.is_heartbeat_valid(&heartbeat_key).await { if !valid { - info!( - "[HISTORY-TABLES] {} heartbeat lost during transform", - table.name - ); + info!("{} heartbeat lost during transform", table.name); break; } } @@ -477,7 +461,7 @@ impl GlobalHistoryLog { break; } } - debug!("[HISTORY-TABLES] {} released heartbeat", table.name); + debug!("{} released heartbeat", table.name); if error_counters.persistent_exceeded_limit() { return; @@ -493,7 +477,7 @@ impl GlobalHistoryLog { ) { loop { if let Err(e) = self.clean(&table, &meta_key).await { - error!("[HISTORY-TABLES] {} log clean failed {}", table.name, e); + error!("{} log clean failed {}", table.name, e); } sleep(sleep_time).await; } diff --git a/src/query/service/src/history_tables/meta.rs b/src/query/service/src/history_tables/meta.rs index c096a51d4f674..b9f1b5706b589 100644 --- a/src/query/service/src/history_tables/meta.rs +++ b/src/query/service/src/history_tables/meta.rs @@ -122,7 +122,7 @@ impl HeartbeatTask { return Ok(None); } - debug!("[HISTORY-TABLES] Heartbeat key created: {}", &heartbeat_key); + debug!("Heartbeat key created: {}", &heartbeat_key); let (cancel_tx, cancel_rx) = oneshot::channel::<()>(); let loop_fut = HeartbeatTask::heartbeat_loop( @@ -142,10 +142,7 @@ impl HeartbeatTask { async move { let result = loop_fut.await; if let Err(e) = result { - warn!( - "[HISTORY-TABLES] {} loop exited with error: {}", - meta_key, e - ); + warn!("{} loop exited with error: {}", meta_key, e); } exited_clone.store(true, std::sync::atomic::Ordering::SeqCst); }, @@ -212,7 +209,7 @@ impl HeartbeatTask { }; let _resp = meta_client.transaction(txn_req).await?; - debug!("[HISTORY-TABLES] Heartbeat key delete: {}", &heartbeat_key); + debug!("Heartbeat key delete: {}", &heartbeat_key); return Ok(()) } diff --git a/src/query/service/src/history_tables/mod.rs b/src/query/service/src/history_tables/mod.rs index 3eedae10a0103..3fd699288b8d1 100644 --- a/src/query/service/src/history_tables/mod.rs +++ b/src/query/service/src/history_tables/mod.rs @@ -12,6 +12,9 @@ // See the 
License for the specific language governing permissions and // limitations under the License. +// Logs from this module will show up as "[HISTORY-TABLES] ...". +databend_common_tracing::register_module_tag!("[HISTORY-TABLES]"); + mod alter_table; pub mod error_handling; mod external; diff --git a/src/query/service/src/interpreters/hook/analyze_hook.rs b/src/query/service/src/interpreters/hook/analyze_hook.rs index 6b7bc013e9d6a..51e31ee13bea4 100644 --- a/src/query/service/src/interpreters/hook/analyze_hook.rs +++ b/src/query/service/src/interpreters/hook/analyze_hook.rs @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +// Logs from this module will show up as "[ANALYZE-HOOK] ...". +databend_common_tracing::register_module_tag!("[ANALYZE-HOOK]"); + use std::collections::HashMap; use std::sync::Arc; @@ -42,17 +45,17 @@ pub async fn hook_analyze(ctx: Arc, pipeline: &mut Pipeline, desc: pipeline.set_on_finished(move |info: &ExecutionInfo| { if info.res.is_ok() { - info!("[ANALYZE-HOOK] Pipeline execution completed successfully, starting analyze job"); + info!("Pipeline execution completed successfully, starting analyze job"); if !ctx.get_enable_auto_analyze() { return Ok(()); } match GlobalIORuntime::instance().block_on(do_analyze(ctx, desc)) { Ok(_) => { - info!("[ANALYZE-HOOK] Analyze job completed successfully"); + info!("Analyze job completed successfully"); } Err(e) => { - info!("[ANALYZE-HOOK] Analyze job failed: {:?}", e); + info!("Analyze job failed: {:?}", e); } } } diff --git a/src/query/service/src/interpreters/hook/compact_hook.rs b/src/query/service/src/interpreters/hook/compact_hook.rs index c2ee3101c6cad..7ce94db6dd884 100644 --- a/src/query/service/src/interpreters/hook/compact_hook.rs +++ b/src/query/service/src/interpreters/hook/compact_hook.rs @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +// Logs from this module will show up as "[COMPACT-HOOK] ...". 
+databend_common_tracing::register_module_tag!("[COMPACT-HOOK]"); + use std::sync::Arc; use std::time::Instant; @@ -66,10 +69,7 @@ pub async fn hook_compact( ) { let op_name = trace_ctx.operation_name.clone(); if let Err(e) = do_hook_compact(ctx, pipeline, compact_target, trace_ctx, lock_opt).await { - info!( - "[COMPACT-HOOK] Operation {} failed with error (ignored): {}", - op_name, e - ); + info!("Operation {} failed with error (ignored): {}", op_name, e); } } @@ -88,14 +88,21 @@ async fn do_hook_compact( pipeline.set_on_finished(move |info: &ExecutionInfo| { if info.res.is_ok() { let op_name = &trace_ctx.operation_name; - metrics_inc_compact_hook_main_operation_time_ms(op_name, trace_ctx.start.elapsed().as_millis() as u64); - info!("[COMPACT-HOOK] Operation {op_name} completed successfully, starting table optimization job."); + metrics_inc_compact_hook_main_operation_time_ms( + op_name, + trace_ctx.start.elapsed().as_millis() as u64, + ); + info!("Operation {op_name} completed successfully, starting table optimization job."); let compact_start_at = Instant::now(); let compaction_limits = match compact_target.mutation_kind { MutationKind::Insert => { - let compaction_num_block_hint = ctx.get_compaction_num_block_hint(&compact_target.table); - info!("[COMPACT-HOOK] Table {} requires compaction of {} blocks", compact_target.table, compaction_num_block_hint); + let compaction_num_block_hint = + ctx.get_compaction_num_block_hint(&compact_target.table); + info!( + "Table {} requires compaction of {} blocks", + compact_target.table, compaction_num_block_hint + ); if compaction_num_block_hint == 0 { return Ok(()); } @@ -105,7 +112,8 @@ async fn do_hook_compact( } } _ => { - let auto_compaction_segments_limit = ctx.get_settings().get_auto_compaction_segments_limit()?; + let auto_compaction_segments_limit = + ctx.get_settings().get_auto_compaction_segments_limit()?; CompactionLimits { segment_limit: Some(auto_compaction_segments_limit as usize), block_limit: None, @@ -119,19 +127,30 @@ async fn do_hook_compact( let scan_progress = ctx.get_scan_progress(); let scan_progress_value = scan_progress.as_ref().get_values(); - match GlobalIORuntime::instance().block_on({ - compact_table(ctx, compact_target, compaction_limits, lock_opt) - }) { + match GlobalIORuntime::instance().block_on(compact_table( + ctx, + compact_target, + compaction_limits, + lock_opt, + )) { Ok(_) => { - info!("[COMPACT-HOOK] Operation {op_name} and table optimization job completed successfully."); + info!("Operation {op_name} and table optimization job completed successfully."); + } + Err(e) => { + info!( + "Operation {op_name} completed but table optimization job failed: {:?}", + e + ); } - Err(e) => { info!("[COMPACT-HOOK] Operation {op_name} completed but table optimization job failed: {:?}", e); } } // reset the progress value write_progress.set(&write_progress_value); scan_progress.set(&scan_progress_value); - metrics_inc_compact_hook_compact_time_ms(&trace_ctx.operation_name, compact_start_at.elapsed().as_millis() as u64); + metrics_inc_compact_hook_compact_time_ms( + &trace_ctx.operation_name, + compact_start_at.elapsed().as_millis() as u64, + ); } Ok(()) diff --git a/src/query/service/src/interpreters/hook/hook.rs b/src/query/service/src/interpreters/hook/hook.rs index 118a8a079a380..e0ef4835dce06 100644 --- a/src/query/service/src/interpreters/hook/hook.rs +++ b/src/query/service/src/interpreters/hook/hook.rs @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the 
License. +// Logs from this module will show up as "[TABLE-HOOK] ...". +databend_common_tracing::register_module_tag!("[TABLE-HOOK]"); + use std::sync::Arc; use std::time::Instant; @@ -79,12 +82,15 @@ impl HookOperator { pub async fn execute_compact(&self, pipeline: &mut Pipeline) { match self.ctx.get_settings().get_enable_compact_after_write() { Ok(false) => { - info!("[TABLE-HOOK] Auto compaction is disabled"); + info!("Auto compaction is disabled"); return; } Err(e) => { // swallow the exception, compaction hook should not prevent the main operation. - warn!("[TABLE-HOOK] Failed to retrieve compaction settings, continuing without compaction: {}", e); + warn!( + "Failed to retrieve compaction settings, continuing without compaction: {}", + e + ); return; } Ok(true) => { diff --git a/src/query/service/src/interpreters/hook/refresh_hook.rs b/src/query/service/src/interpreters/hook/refresh_hook.rs index 1d58b4eaf52e1..3fe2eecb39176 100644 --- a/src/query/service/src/interpreters/hook/refresh_hook.rs +++ b/src/query/service/src/interpreters/hook/refresh_hook.rs @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +// Logs from this module will show up as "[REFRESH-HOOK] ...". +databend_common_tracing::register_module_tag!("[REFRESH-HOOK]"); + use std::sync::Arc; use databend_common_ast::ast; @@ -63,13 +66,13 @@ pub async fn hook_refresh(ctx: Arc, pipeline: &mut Pipeline, desc: pipeline.set_on_finished(move |info: &ExecutionInfo| { if info.res.is_ok() { - info!("[REFRESH-HOOK] Pipeline execution completed successfully, starting refresh job"); + info!("Pipeline execution completed successfully, starting refresh job"); match GlobalIORuntime::instance().block_on(do_refresh(ctx, desc)) { Ok(_) => { - info!("[REFRESH-HOOK] Refresh job completed successfully"); + info!("Refresh job completed successfully"); } Err(e) => { - info!("[REFRESH-HOOK] Refresh job failed: {:?}", e); + info!("Refresh job failed: {:?}", e); } } } diff --git a/src/query/service/src/interpreters/hook/vacuum_hook.rs b/src/query/service/src/interpreters/hook/vacuum_hook.rs index c3eed71fc475e..33a9ad89f9866 100644 --- a/src/query/service/src/interpreters/hook/vacuum_hook.rs +++ b/src/query/service/src/interpreters/hook/vacuum_hook.rs @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +// Logs from this module will show up as "[VACUUM-HOOK] ...". 
+databend_common_tracing::register_module_tag!("[VACUUM-HOOK]"); + use std::collections::HashMap; use std::sync::Arc; @@ -60,10 +63,7 @@ pub fn hook_vacuum_temp_files(query_ctx: &Arc) -> Result<()> { return Ok(()); } - log::info!( - "[VACUUM-HOOK] Cleaning temporary files from nodes: {:?}", - node_files - ); + log::info!("Cleaning temporary files from nodes: {:?}", node_files); let nodes = node_files.keys().cloned().collect::>(); let _ = GlobalIORuntime::instance().block_on::<(), ErrorCode, _>(async move { @@ -77,7 +77,7 @@ pub fn hook_vacuum_temp_files(query_ctx: &Arc) -> Result<()> { .await; if let Err(cause) = &removed_files { - log::warn!("[VACUUM-HOOK] Failed to clean temporary files: {:?}", cause); + log::warn!("Failed to clean temporary files: {:?}", cause); } Ok(()) @@ -96,10 +96,7 @@ pub fn hook_disk_temp_dir(query_ctx: &Arc) -> Result<()> { .get_spilling_to_disk_vacuum_unknown_temp_dirs_limit()?; let deleted = mgr.drop_disk_spill_dir_unknown(limit)?; if !deleted.is_empty() { - warn!( - "[VACUUM-HOOK] Removed residual temporary directories: {:?}", - deleted - ) + warn!("Removed residual temporary directories: {:?}", deleted) } } diff --git a/src/query/service/src/interpreters/interpreter.rs b/src/query/service/src/interpreters/interpreter.rs index f7d76e1a25e76..94c7601529839 100644 --- a/src/query/service/src/interpreters/interpreter.rs +++ b/src/query/service/src/interpreters/interpreter.rs @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +// Logs from this module will show up as "[INTERPRETER] ...". +databend_common_tracing::register_module_tag!("[INTERPRETER]"); + use std::collections::BTreeMap; use std::sync::Arc; use std::time::SystemTime; @@ -112,7 +115,7 @@ pub trait Interpreter: Sync + Send { let make_error = || "failed to execute interpreter"; - ctx.set_status_info("[INTERPRETER] Building execution pipeline"); + ctx.set_status_info("Building execution pipeline"); ctx.check_aborting().with_context(make_error)?; let allow_disk_cache = { @@ -121,7 +124,7 @@ pub trait Interpreter: Sync + Send { Ok(_) => true, Err(e) if !license_key.is_empty() => { let msg = - format!("[INTERPRETER] CRITICAL ALERT: License validation FAILED - enterprise features DISABLED, System may operate in DEGRADED MODE with LIMITED CAPABILITIES and REDUCED PERFORMANCE. Please contact us at https://www.databend.com/contact-us/ or email hi@databend.com to restore full functionality: {}", + format!("CRITICAL ALERT: License validation FAILED - enterprise features DISABLED, System may operate in DEGRADED MODE with LIMITED CAPABILITIES and REDUCED PERFORMANCE. Please contact us at https://www.databend.com/contact-us/ or email hi@databend.com to restore full functionality: {}", e); log::error!("{msg}"); @@ -155,7 +158,7 @@ pub trait Interpreter: Sync + Send { on_execution_finished(info, query_ctx) })); - ctx.set_status_info("[INTERPRETER] Executing pipeline"); + ctx.set_status_info("Executing pipeline"); let settings = ctx.get_settings(); build_res.set_max_threads(settings.get_max_threads()? as usize); @@ -208,7 +211,7 @@ fn log_query_start(ctx: &QueryContext) { } if let Err(error) = InterpreterQueryLog::log_start(ctx, now, None) { - error!("[INTERPRETER] Failed to log query start: {:?}", error) + error!("Failed to log query start: {:?}", error) } } @@ -237,7 +240,7 @@ fn log_query_finished(ctx: &QueryContext, error: Option, has_profiles log::info!(memory:? 
= ctx.get_node_peek_memory_usage(); "total memory usage"); if let Err(error) = InterpreterQueryLog::log_finish(ctx, now, error, has_profiles) { - error!("[INTERPRETER] Failed to log query finish: {:?}", error) + error!("Failed to log query finish: {:?}", error) } } @@ -280,7 +283,7 @@ pub async fn auto_commit_if_not_allowed_in_transaction( } if !stmt.is_transaction_command() && ctx.txn_mgr().lock().is_fail() { let err = ErrorCode::CurrentTransactionIsAborted( - "[INTERPRETER] Current transaction is aborted, commands ignored until end of transaction block", + "Current transaction is aborted, commands ignored until end of transaction block", ); return Err(err); } diff --git a/src/query/service/src/interpreters/interpreter_select.rs b/src/query/service/src/interpreters/interpreter_select.rs index 75d2a068f1809..3ccbc819d51cf 100644 --- a/src/query/service/src/interpreters/interpreter_select.rs +++ b/src/query/service/src/interpreters/interpreter_select.rs @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +// Logs from this module will show up as "[SELECT-INTERP] ...". +databend_common_tracing::register_module_tag!("[SELECT-INTERP]"); + use std::sync::Arc; use databend_common_base::runtime::GlobalIORuntime; @@ -114,8 +117,7 @@ impl SelectInterpreter { #[async_backtrace::framed] pub async fn build_physical_plan(&self) -> Result { let mut builder = PhysicalPlanBuilder::new(self.metadata.clone(), self.ctx.clone(), false); - self.ctx - .set_status_info("[SELECT-INTERP] Building physical plan"); + self.ctx.set_status_info("Building physical plan"); builder .build(&self.s_expr, self.bind_context.column_set()) .await @@ -155,7 +157,7 @@ impl SelectInterpreter { update_table_metas: streams.update_table_metas, ..Default::default() }; - info!("[SELECT-INTERP] Updating stream metadata to consume data"); + info!("Updating stream metadata to consume data"); catalog.update_multi_table_meta(r).await.map(|_| ()) } None => Ok(()), @@ -243,7 +245,7 @@ impl SelectInterpreter { if t.name().eq_ignore_ascii_case("result_scan") { return if tables.len() > 1 { Err(ErrorCode::Unimplemented( - "[SELECT-INTERP] RESULT_SCAN currently supports only single table queries", + "RESULT_SCAN currently supports only single table queries", )) } else { Ok(Some(t.table())) @@ -283,8 +285,7 @@ impl Interpreter for SelectInterpreter { async fn execute2(&self) -> Result { self.attach_tables_to_ctx(); - self.ctx - .set_status_info("[SELECT-INTERP] Preparing execution plan"); + self.ctx.set_status_info("Preparing execution plan"); // 0. Need to build physical plan first to get the partitions. let physical_plan = self.build_physical_plan().await?; @@ -296,7 +297,7 @@ impl Interpreter for SelectInterpreter { .format_pretty()? }; - info!("[SELECT-INTERP] Query physical plan:\n{}", query_plan); + info!("Query physical plan:\n{}", query_plan); if self.ctx.get_settings().get_enable_query_result_cache()? && self.ctx.get_cacheable() @@ -353,7 +354,7 @@ impl Interpreter for SelectInterpreter { } Err(e) => { // 2.3 If an error occurs, turn back to the normal pipeline. 
- error!("[SELECT-INTERP] Failed to read query result cache: {}", e); + error!("Failed to read query result cache: {}", e); } } } diff --git a/src/query/service/src/pipelines/builders/mod.rs b/src/query/service/src/pipelines/builders/mod.rs index 229465b2ad902..a61b30b0716a6 100644 --- a/src/query/service/src/pipelines/builders/mod.rs +++ b/src/query/service/src/pipelines/builders/mod.rs @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +// Logs from this module will show up as "[BUILD-PIPELINES] ...". +databend_common_tracing::register_module_tag!("[BUILD-PIPELINES]"); + mod builder_aggregate; mod builder_append_table; diff --git a/src/query/service/src/pipelines/executor/pipeline_executor.rs b/src/query/service/src/pipelines/executor/pipeline_executor.rs index 702373648b3bf..46a8d45bc9fcd 100644 --- a/src/query/service/src/pipelines/executor/pipeline_executor.rs +++ b/src/query/service/src/pipelines/executor/pipeline_executor.rs @@ -161,7 +161,7 @@ impl PipelineExecutor { } info!(query_id, elapsed:? = instant.elapsed(); - "[PIPELINE-EXECUTOR] Pipeline initialized successfully", + "Pipeline initialized successfully", ); } Ok(()) @@ -171,10 +171,7 @@ impl PipelineExecutor { pub fn execute(&self) -> Result<()> { let instants = Instant::now(); let _guard = defer(move || { - info!( - "[PIPELINE-EXECUTOR] Execution completed, elapsed: {:?}", - instants.elapsed() - ); + info!("Execution completed, elapsed: {:?}", instants.elapsed()); }); match self { PipelineExecutor::QueryPipelineExecutor(executor) => executor.execute(), @@ -226,9 +223,11 @@ impl PipelineExecutor { let max_execute_future = Box::pin(tokio::time::sleep(max_execute_time_in_seconds)); if let Either::Left(_) = select(max_execute_future, finished_future).await { if let Some(graph) = this_graph.upgrade() { - graph.should_finish(Err(ErrorCode::AbortedQuery( - "[PIPELINE-EXECUTOR] Query aborted due to execution time exceeding maximum limit", - ))).expect("[PIPELINE-EXECUTOR] Failed to send timeout error message"); + graph + .should_finish(Err(ErrorCode::AbortedQuery( + "Query aborted due to execution time exceeding maximum limit", + ))) + .expect("Failed to send timeout error message"); } } }); @@ -245,13 +244,13 @@ impl PipelineExecutor { query_wrapper .graph .should_finish(Err(may_error)) - .expect("[PIPELINE-EXECUTOR] Failed to send error message"); + .expect("Failed to send error message"); } None => { query_wrapper .graph .should_finish::<()>(Ok(())) - .expect("[PIPELINE-EXECUTOR] Failed to send completion message"); + .expect("Failed to send completion message"); } }, } @@ -290,7 +289,7 @@ impl PipelineExecutor { pub fn change_priority(&self, priority: u8) { match self { PipelineExecutor::QueryPipelineExecutor(_) => { - unreachable!("[PIPELINE-EXECUTOR] Logic error: cannot change priority for QueryPipelineExecutor") + unreachable!("Logic error: cannot change priority for QueryPipelineExecutor") } PipelineExecutor::QueriesPipelineExecutor(query_wrapper) => { query_wrapper.graph.change_priority(priority as u64); diff --git a/src/query/service/src/pipelines/executor/processor_async_task.rs b/src/query/service/src/pipelines/executor/processor_async_task.rs index 5a33318b1504d..1ee3b47fdac31 100644 --- a/src/query/service/src/pipelines/executor/processor_async_task.rs +++ b/src/query/service/src/pipelines/executor/processor_async_task.rs @@ -116,7 +116,7 @@ impl ProcessorAsyncTask { match futures::future::select(left, right).await { Either::Left((res, _)) => res, 
Either::Right((_, _)) => Err(ErrorCode::AbortedQuery( - "[PROCESSOR-ASYNC-TASK] Query aborted due to server shutdown or query termination", + "Query aborted due to server shutdown or query termination", )), } }; @@ -136,7 +136,7 @@ impl ProcessorAsyncTask { let elapsed = start.elapsed(); let active_workers = queue_clone.active_workers(); warn!( - "[PROCESSOR-ASYNC-TASK] Slow async task detected - query: {:?}, processor: {:?} ({}), elapsed: {:?}, active workers: {:?}", + "Slow async task detected - query: {:?}, processor: {:?} ({}), elapsed: {:?}, active workers: {:?}", query_id, processor_id, processor_name, elapsed, active_workers ); } diff --git a/src/query/service/src/pipelines/executor/query_pipeline_executor.rs b/src/query/service/src/pipelines/executor/query_pipeline_executor.rs index c9e58fec2aefe..940bd20e9ee93 100644 --- a/src/query/service/src/pipelines/executor/query_pipeline_executor.rs +++ b/src/query/service/src/pipelines/executor/query_pipeline_executor.rs @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +// Logs from this module will show up as "[PIPELINE-EXECUTOR] ...". +databend_common_tracing::register_module_tag!("[PIPELINE-EXECUTOR]"); + use std::collections::HashMap; use std::sync::Arc; use std::time::Instant; @@ -75,9 +78,7 @@ impl QueryPipelineExecutor { let threads_num = pipeline.get_max_threads(); if threads_num.is_zero() { - return Err(ErrorCode::Internal( - "[PIPELINE-EXECUTOR] Pipeline max threads cannot be zero", - )); + return Err(ErrorCode::Internal("Pipeline max threads cannot be zero")); } let on_init_callback = pipeline.take_on_init(); @@ -108,9 +109,7 @@ impl QueryPipelineExecutor { settings: ExecutorSettings, ) -> Result> { if pipelines.is_empty() { - return Err(ErrorCode::Internal( - "[PIPELINE-EXECUTOR] Executor pipelines cannot be empty", - )); + return Err(ErrorCode::Internal("Executor pipelines cannot be empty")); } let threads_num = pipelines @@ -120,9 +119,7 @@ impl QueryPipelineExecutor { .unwrap_or(0); if threads_num.is_zero() { - return Err(ErrorCode::Internal( - "[PIPELINE-EXECUTOR] Pipeline max threads cannot be zero", - )); + return Err(ErrorCode::Internal("Pipeline max threads cannot be zero")); } let on_init_callback = { @@ -204,8 +201,7 @@ impl QueryPipelineExecutor { #[fastrace::trace(name = "QueryPipelineExecutor::finish")] pub fn finish(&self, cause: Option>) { - let cause = - cause.map(|err| err.with_context("[PIPELINE-EXECUTOR] Pipeline executor finished")); + let cause = cause.map(|err| err.with_context("Pipeline executor finished")); let mut finished_error = self.finished_error.lock(); if let Some(cause) = cause { @@ -290,7 +286,7 @@ impl QueryPipelineExecutor { } info!(query_id = self.settings.query_id, elapsed:? 
= instant.elapsed(); - "[PIPELINE-EXECUTOR] Pipeline initialized successfully", + "Pipeline initialized successfully", ); } @@ -331,7 +327,7 @@ impl QueryPipelineExecutor { if let Either::Left(_) = select(max_execute_future, finished_future).await { if let Some(executor) = this.upgrade() { executor.finish(Some(ErrorCode::AbortedQuery( - "[PIPELINE-EXECUTOR] Query aborted due to execution time exceeding maximum limit", + "Query aborted due to execution time exceeding maximum limit", ))); } } @@ -467,9 +463,7 @@ impl Drop for QueryPipelineExecutor { let cause = match self.finished_error.lock().as_ref() { Some(cause) => cause.clone(), - None => ErrorCode::Internal( - "[PIPELINE-EXECUTOR] Pipeline illegal state: not successfully shutdown", - ), + None => ErrorCode::Internal("Pipeline illegal state: not successfully shutdown"), }; let mut on_finished_chain = self.on_finished_chain.lock(); @@ -480,7 +474,7 @@ impl Drop for QueryPipelineExecutor { let profiling = self.fetch_plans_profile(true); let info = ExecutionInfo::create(Err(cause), profiling); if let Err(cause) = on_finished_chain.apply(info) { - warn!("[PIPELINE-EXECUTOR] Shutdown failure: {:?}", cause); + warn!("Shutdown failure: {:?}", cause); } } } diff --git a/src/query/service/src/pipelines/processors/transforms/mod.rs b/src/query/service/src/pipelines/processors/transforms/mod.rs index f1d74da1383a3..50f5e2fc32d4e 100644 --- a/src/query/service/src/pipelines/processors/transforms/mod.rs +++ b/src/query/service/src/pipelines/processors/transforms/mod.rs @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +// Logs from this module will show up as "[TRANSFORMS] ...". +databend_common_tracing::register_module_tag!("[TRANSFORMS]"); + pub mod aggregator; mod broadcast; mod hash_join; diff --git a/src/query/service/src/servers/http/middleware/session.rs b/src/query/service/src/servers/http/middleware/session.rs index e4117de3f6c7c..584646cafff6c 100644 --- a/src/query/service/src/servers/http/middleware/session.rs +++ b/src/query/service/src/servers/http/middleware/session.rs @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +// Logs from this module will show up as "[HTTP-SESSION] ...". 
+databend_common_tracing::register_module_tag!("[HTTP-SESSION]"); + use std::collections::HashMap; use std::sync::Arc; @@ -133,7 +136,7 @@ impl EndpointKind { } } EndpointKind::Login | EndpointKind::Clickhouse => Err(ErrorCode::AuthenticateFailure( - format!("[HTTP-SESSION] Invalid token usage: databend token cannot be used for {self:?}",), + format!("Invalid token usage: databend token cannot be used for {self:?}",), )), } } @@ -198,7 +201,7 @@ fn get_credential( let std_auth_headers: Vec<_> = req.headers().get_all(AUTHORIZATION).iter().collect(); if std_auth_headers.len() > 1 { let msg = &format!( - "[HTTP-SESSION] Authentication error: multiple {} headers detected", + "Authentication error: multiple {} headers detected", AUTHORIZATION ); return Err(ErrorCode::AuthenticateFailure(msg)); @@ -209,7 +212,7 @@ fn get_credential( get_clickhouse_name_password(req, client_ip) } else { Err(ErrorCode::AuthenticateFailure( - "[HTTP-SESSION] Authentication error: no authorization header provided", + "Authentication error: no authorization header provided", )) } } else { @@ -264,7 +267,7 @@ fn get_credential_from_header( Ok(c) } None => Err(ErrorCode::AuthenticateFailure( - "[HTTP-SESSION] Authentication error: invalid Basic auth header format", + "Authentication error: invalid Basic auth header format", )), } } else if value.as_bytes().starts_with(b"Bearer ") { @@ -274,7 +277,9 @@ fn get_credential_from_header( if SessionClaim::is_databend_token(&token) { if let Some(t) = endpoint_kind.require_databend_token_type()? { if t != SessionClaim::get_type(&token)? { - return Err(ErrorCode::AuthenticateFailure("[HTTP-SESSION] Authentication error: incorrect token type for this endpoint")); + return Err(ErrorCode::AuthenticateFailure( + "Authentication error: incorrect token type for this endpoint", + )); } } Ok(Credential::DatabendToken { token }) @@ -283,12 +288,12 @@ fn get_credential_from_header( } } None => Err(ErrorCode::AuthenticateFailure( - "[HTTP-SESSION] Authentication error: invalid Bearer auth header format", + "Authentication error: invalid Bearer auth header format", )), } } else { Err(ErrorCode::AuthenticateFailure( - "[HTTP-SESSION] Authentication error: unsupported authorization header format", + "Authentication error: unsupported authorization header format", )) } } @@ -309,10 +314,7 @@ fn get_clickhouse_name_password(req: &Request, client_ip: Option) -> Res let query_str = req.uri().query().unwrap_or_default(); let query_params = serde_urlencoded::from_str::>(query_str) .map_err(|e| { - ErrorCode::BadArguments(format!( - "[HTTP-SESSION] Failed to parse query parameters: {}", - e - )) + ErrorCode::BadArguments(format!("Failed to parse query parameters: {}", e)) })?; let (user, key) = (query_params.get("user"), query_params.get("password")); if let (Some(name), Some(password)) = (user, key) { @@ -323,7 +325,7 @@ fn get_clickhouse_name_password(req: &Request, client_ip: Option) -> Res }) } else { Err(ErrorCode::AuthenticateFailure( - "[HTTP-SESSION] Authentication error: no credentials found in headers or query parameters", + "Authentication error: no credentials found in headers or query parameters", )) } } @@ -428,7 +430,7 @@ impl HTTPSessionEndpoint { }; if client_session.is_none() && !matches!(self.endpoint_kind, EndpointKind::PollQuery) { info!( - "[HTTP-SESSION] got request without session, url={}, headers={:?}", + "got request without session, url={}, headers={:?}", req.uri(), &req.headers() ); @@ -437,7 +439,7 @@ impl HTTPSessionEndpoint { if let (Some(id1), Some(c)) = 
(&authed_client_session_id, &client_session) { if *id1 != c.header.id { return Err(ErrorCode::AuthenticateFailure(format!( - "[HTTP-SESSION] Session ID mismatch: token session ID '{}' does not match header session ID '{}'", + "Session ID mismatch: token session ID '{}' does not match header session ID '{}'", id1, c.header.id ))); } @@ -602,7 +604,7 @@ impl Endpoint for HTTPSessionEndpoint { .map_err(HttpErrorCode::server_error)? { log::info!( - "[HTTP-SESSION] forwarding /v1{} from {local_id} to {sticky_node_id}", + "forwarding /v1{} from {local_id} to {sticky_node_id}", req.uri() ); forward_request(req, node).await @@ -611,7 +613,7 @@ impl Endpoint for HTTPSessionEndpoint { "Sticky session state lost: node '{sticky_node_id}' not found in cluster, request={}", get_request_info(req) ); - warn!("[HTTP-SESSION] {}", msg); + warn!("{}", msg); Err(Error::from(HttpErrorCode::bad_request( ErrorCode::BadArguments(msg), ))) @@ -644,7 +646,7 @@ impl Endpoint for HTTPSessionEndpoint { } Ok(None) => { let msg = format!("Not find the '{}' warehouse; it is possible that all nodes of the warehouse have gone offline. Please exit the client and reconnect, or use `use warehouse `. request = {}.", warehouse, get_request_info(req)); - warn!("[HTTP-SESSION] {}", msg); + warn!("{}", msg); return Err(Error::from(HttpErrorCode::bad_request( ErrorCode::UnknownWarehouse(msg), ))); @@ -665,9 +667,7 @@ impl Endpoint for HTTPSessionEndpoint { } } - log::warn!( - "[HTTP-SESSION] Ignoring warehouse header: {HEADER_WAREHOUSE}={warehouse:?}" - ); + log::warn!("Ignoring warehouse header: {HEADER_WAREHOUSE}={warehouse:?}"); } }; @@ -709,13 +709,13 @@ impl Endpoint for HTTPSessionEndpoint { let err = HttpErrorCode::error_code(err); if err.status() == StatusCode::UNAUTHORIZED { warn!( - "[HTTP-SESSION] Authentication failure: {method} {uri}, headers={:?}, error={}", + "Authentication failure: {method} {uri}, headers={:?}, error={}", sanitize_request_headers(&headers), err ); } else { error!( - "[HTTP-SESSION] Request error: {method} {uri}, headers={:?}, error={}", + "Request error: {method} {uri}, headers={:?}, error={}", sanitize_request_headers(&headers), err ); diff --git a/src/query/service/src/servers/http/middleware/session_header.rs b/src/query/service/src/servers/http/middleware/session_header.rs index 1c35a2f07d9b2..e1198c818a703 100644 --- a/src/query/service/src/servers/http/middleware/session_header.rs +++ b/src/query/service/src/servers/http/middleware/session_header.rs @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +// Logs from this module will show up as "[HTTP-SESSION] ...". 
+databend_common_tracing::register_module_tag!("[HTTP-SESSION]"); + use std::time::Duration; use std::time::SystemTime; use std::time::UNIX_EPOCH; @@ -113,7 +116,7 @@ impl ClientSession { fn new_session(typ: ClientSessionType) -> Self { let id = Uuid::now_v7().to_string(); - info!("[HTTP-SESSION] Created new session with ID: {}", id); + info!("Created new session with ID: {}", id); ClientSession { header: ClientSessionHeader { id, @@ -229,7 +232,7 @@ impl ClientSession { } Err(err) => { warn!( - "[HTTP-SESSION] Invalid last_refresh_time: detected clock drift or incorrect timestamp, difference: {:?}", + "Invalid last_refresh_time: detected clock drift or incorrect timestamp, difference: {:?}", err.duration() ); } diff --git a/src/query/service/src/servers/http/v1/http_query_handlers.rs b/src/query/service/src/servers/http/v1/http_query_handlers.rs index b19ce223b24bf..27254059956f0 100644 --- a/src/query/service/src/servers/http/v1/http_query_handlers.rs +++ b/src/query/service/src/servers/http/v1/http_query_handlers.rs @@ -376,7 +376,7 @@ async fn query_final_handler( let _t = SlowRequestLogTracker::new(ctx); async { info!( - "[HTTP-QUERY] Query {} received final request at {}, completing query execution", + "Query {} received final request at {}, completing query execution", query_id, make_final_uri(&query_id) ); @@ -422,7 +422,7 @@ async fn query_cancel_handler( let _t = SlowRequestLogTracker::new(ctx); async { info!( - "[HTTP-QUERY] Query {} received cancel request at {}, terminating execution", + "Query {} received cancel request at {}, terminating execution", query_id, make_kill_uri(&query_id) ); @@ -496,7 +496,7 @@ async fn query_page_handler( if query.user_name != ctx.user_name { return Err(poem::error::Error::from_string( format!( - "[HTTP-QUERY] Authentication error: query {} expected user {}, but got {}", + "Authentication error: query {} expected user {}, but got {}", query_id, query.user_name, ctx.user_name ), StatusCode::UNAUTHORIZED, @@ -506,7 +506,7 @@ async fn query_page_handler( query.check_client_session_id(&ctx.client_session_id)?; if let Some(st) = query.check_closed() { info!( - "[HTTP-QUERY] /query/{}/page/{} - query is close (reason: {:?})", + "/query/{}/page/{} - query is close (reason: {:?})", query_id, page_no, st ); Err(query_id_closed(&query_id, st.reason)) @@ -514,15 +514,12 @@ async fn query_page_handler( query.update_expire_time(true, false).await; let resp = query.get_response_page(page_no).await.map_err(|err| { info!( - "[HTTP-QUERY] /query/{}/page/{} - get response page error (reason: {})", + "/query/{}/page/{} - get response page error (reason: {})", query_id, page_no, err.message() ); - poem::Error::from_string( - format!("[HTTP-QUERY] {}", err.message()), - StatusCode::NOT_FOUND, - ) + poem::Error::from_string(format!("{}", err.message()), StatusCode::NOT_FOUND) })?; query .update_expire_time(false, resp.is_data_drained()) @@ -575,7 +572,7 @@ pub(crate) async fn query_handler( .map(|s| format!("(client_session_id={s})")) .unwrap_or("".to_string()); info!( - "[HTTP-QUERY] New query request{}{}: {}", + "New query request{}{}: {}", agent_info, client_session_id_info, mask_connection_info(&format!("{:?}", req)) @@ -585,14 +582,14 @@ pub(crate) async fn query_handler( match HttpQuery::try_create(ctx, req.clone()).await { Err(err) => { let err = err.display_with_sql(&sql); - error!("[HTTP-QUERY] Failed to start SQL query, error: {:?}", err); + error!("Failed to start SQL query, error: {:?}", err); ctx.set_fail(); Ok(req.fail_to_start_sql(err).into_response()) 
} Ok(mut query) => { if let Err(err) = query.start_query(sql.clone()).await { let err = err.display_with_sql(&sql); - error!("[HTTP-QUERY] Failed to start SQL query, error: {:?}", err); + error!("Failed to start SQL query, error: {:?}", err); ctx.set_fail(); return Ok(req.fail_to_start_sql(err).into_response()); } @@ -606,7 +603,7 @@ pub(crate) async fn query_handler( .map_err(|err| err.display_with_sql(&sql)) .map_err(|err| { poem::Error::from_string( - format!("[HTTP-QUERY] {}", err.message()), + format!("{}", err.message()), StatusCode::NOT_FOUND, ) })?; @@ -619,7 +616,7 @@ pub(crate) async fn query_handler( None => (0, None), Some(p) => (p.page.data.num_rows(), p.next_page_no), }; - info!("[HTTP-QUERY] Initial response for query_id={}, state={:?}, rows={}, next_page={:?}, sql='{}'", + info!("Initial response for query_id={}, state={:?}, rows={}, next_page={:?}, sql='{}'", &query.id, &resp.state, rows, next_page, mask_connection_info(&sql) ); query @@ -767,12 +764,12 @@ pub async fn heartbeat_handler( .unwrap(), ) } else { - warn!("[HTTP-QUERY] Heartbeat forward failed: {:?}", resp); + warn!("Heartbeat forward failed: {:?}", resp); None } } Err(e) => { - warn!("[HTTP-QUERY] Heartbeat forward error: {:?}", e); + warn!("Heartbeat forward error: {:?}", e); None } } @@ -920,17 +917,14 @@ pub fn query_route() -> Route { fn query_id_closed(query_id: &str, closed_reason: CloseReason) -> PoemError { PoemError::from_string( - format!( - "[HTTP-QUERY] Query {query_id} is closed for {}", - closed_reason - ), + format!("Query {query_id} is closed for {}", closed_reason), StatusCode::BAD_REQUEST, ) } fn query_id_not_found(query_id: &str, node_id: &str) -> PoemError { PoemError::from_string( - format!("[HTTP-QUERY] Query ID {query_id} not found on node {node_id}"), + format!("Query ID {query_id} not found on node {node_id}"), StatusCode::NOT_FOUND, ) } @@ -965,7 +959,7 @@ impl Drop for SlowRequestLogTracker { let elapsed = self.started_at.elapsed(); if elapsed.as_secs_f64() > 60.0 { warn!( - "[HTTP-QUERY] Slow request detected on {} {}, elapsed time: {:.2}s", + "Slow request detected on {} {}, elapsed time: {:.2}s", self.method, self.uri, elapsed.as_secs_f64() @@ -990,7 +984,7 @@ pub(crate) fn get_http_tracing_span( .with_properties(|| ctx.to_fastrace_properties()); } None => { - warn!("[HTTP-QUERY] Failed to decode trace parent: {}", trace); + warn!("Failed to decode trace parent: {}", trace); } } } diff --git a/src/query/service/src/servers/http/v1/query/execute_state.rs b/src/query/service/src/servers/http/v1/query/execute_state.rs index 4f7674b1b4134..fd0b1d3b13b0f 100644 --- a/src/query/service/src/servers/http/v1/query/execute_state.rs +++ b/src/query/service/src/servers/http/v1/query/execute_state.rs @@ -298,7 +298,7 @@ impl Executor { Starting(s) => { info!( query_id = guard.query_id, reason:? 
= reason; - "[HTTP-QUERY] Query state transitioning from Starting to Stopped" + "Query state transitioning from Starting to Stopped" ); s.ctx.get_abort_notify().notify_waiters(); @@ -309,7 +309,7 @@ impl Executor { Some(e.clone()), false, ) - .unwrap_or_else(|e| error!("[HTTP-QUERY] Failed to write query_log: {:?}", e)); + .unwrap_or_else(|e| error!("Failed to write query_log: {:?}", e)); } if let Err(e) = &reason { if e.code() != ErrorCode::CLOSED_QUERY { @@ -330,7 +330,7 @@ impl Executor { } Running(r) => { info!( - "[HTTP-QUERY] Query {} state transitioning from Running to Stopped, reason: {:?}", + "Query {} state transitioning from Running to Stopped, reason: {:?}", &guard.query_id, reason, ); if let Err(e) = &reason { @@ -353,14 +353,14 @@ impl Executor { } Stopped(s) => { debug!( - "[HTTP-QUERY] Query {} already in Stopped state, original reason: {:?}, new reason: {:?}", + "Query {} already in Stopped state, original reason: {:?}, new reason: {:?}", &guard.query_id, s.reason, reason ); return; } }; info!( - "[HTTP-QUERY] Query {} state changed to Stopped, reason: {:?}", + "Query {} state changed to Stopped, reason: {:?}", &guard.query_id, reason ); guard.state = Stopped(Box::new(state)); @@ -379,7 +379,7 @@ impl ExecuteState { ) -> Result<(), ExecutionError> { let make_error = || format!("failed to start query: {sql}"); - info!("[HTTP-QUERY] Preparing to plan SQL query"); + info!("Preparing to plan SQL query"); // Use interpreter_plan_sql, we can write the query log if an error occurs. let (plan, _, queue_guard) = interpreter_plan_sql(ctx.clone(), &sql, true) @@ -420,7 +420,7 @@ impl ExecuteState { result_format_settings, has_result_set, }; - info!("[HTTP-QUERY] Query state changed to Running"); + info!("Query state changed to Running"); Executor::start_to_running(&executor, Running(running_state)); let executor_clone = executor.clone(); @@ -481,7 +481,7 @@ impl ExecuteState { if is_dynamic_schema { if let Some(schema) = interpreter.get_dynamic_schema().await { info!( - "[HTTP-QUERY] Dynamic schema detected, updating schema to have {} fields", + "Dynamic schema detected, updating schema to have {} fields", schema.fields().len() ); Executor::update_schema(&executor, schema); diff --git a/src/query/service/src/servers/http/v1/query/http_query.rs b/src/query/service/src/servers/http/v1/query/http_query.rs index 0359669786f1c..480398ada9698 100644 --- a/src/query/service/src/servers/http/v1/query/http_query.rs +++ b/src/query/service/src/servers/http/v1/query/http_query.rs @@ -395,7 +395,7 @@ impl HttpSessionConf { .set_setting(k.to_string(), v.to_string()) .or_else(|e| { if e.code() == ErrorCode::UNKNOWN_VARIABLE { - warn!("[HTTP-QUERY] Unknown session setting ignored: {}", k); + warn!("Unknown session setting ignored: {}", k); Ok(()) } else { Err(e) @@ -412,7 +412,7 @@ impl HttpSessionConf { let state = *last_query.client_state.lock(); if !matches!(state, ClientState::Closed(_)) { warn!( - "[HTTP-QUERY] Last query id not finished yet, id = {}, state = {:?}", + "Last query id not finished yet, id = {}, state = {:?}", id, state ); } @@ -594,19 +594,20 @@ fn try_set_txn( http_query_manager.check_sticky_for_txn(&internal_state.last_node_id)?; let last_query_id = internal_state.last_query_ids.first().ok_or_else(|| { ErrorCode::InvalidSessionState( - "[HTTP-QUERY] Invalid transaction state: transaction is active but last_query_ids is empty".to_string(), + "Invalid transaction state: transaction is active but last_query_ids is empty" + .to_string(), ) })?; if let Some(txn_mgr) = 
http_query_manager.get_txn(last_query_id) { session.set_txn_mgr(txn_mgr); info!( - "[HTTP-QUERY] Query {} continuing transaction from previous query {}", + "Query {} continuing transaction from previous query {}", query_id, last_query_id ); } else { // the returned TxnState should be Fail return Err(ErrorCode::TransactionTimeout(format!( - "[HTTP-QUERY] Transaction timeout: last_query_id {} not found on this server", + "Transaction timeout: last_query_id {} not found on this server", last_query_id ))); } @@ -632,7 +633,7 @@ impl HttpQuery { let client_session_id = http_ctx.client_session_id.as_deref().unwrap_or("None"); let sql = &req.sql; let node_id = &GlobalConfig::instance().query.node_id; - info!(query_id = query_id, session_id = client_session_id, node_id = node_id, sql = sql; "[HTTP-QUERY] Creating new query"); + info!(query_id = query_id, session_id = client_session_id, node_id = node_id, sql = sql; "Creating new query"); // Stage attachment is used to carry the data payload to the INSERT/REPLACE statements. // When stage attachment is specified, the query may look like `INSERT INTO mytbl VALUES;`, @@ -818,7 +819,7 @@ impl HttpQuery { let state = &mut self.execute_state.lock().state; let ExecuteState::Starting(state) = state else { return Err(ErrorCode::Internal( - "[HTTP-QUERY] Invalid query state: expected Starting state", + "Invalid query state: expected Starting state", )); }; @@ -860,10 +861,7 @@ impl HttpQuery { warnings: query_context.pop_warnings(), }; - error!( - "[HTTP-QUERY] Query state changed to Stopped, failed to start: {:?}", - e - ); + error!("Query state changed to Stopped, failed to start: {:?}", e); Executor::start_to_stop(&query_state, ExecuteState::Stopped(Box::new(state))); block_sender_closer.abort(); } @@ -975,7 +973,7 @@ impl HttpQuery { if *id != self.client_session_id { return Err(poem::error::Error::from_string( format!( - "[HTTP-QUERY] Authentication error: wrong client_session_id, expected {:?}, got {id:?}", + "Authentication error: wrong client_session_id, expected {:?}, got {id:?}", &self.client_session_id ), StatusCode::UNAUTHORIZED, diff --git a/src/query/service/src/servers/http/v1/query/http_query_context.rs b/src/query/service/src/servers/http/v1/query/http_query_context.rs index 964597a6400cd..35c066ad31041 100644 --- a/src/query/service/src/servers/http/v1/query/http_query_context.rs +++ b/src/query/service/src/servers/http/v1/query/http_query_context.rs @@ -106,7 +106,7 @@ impl HttpQueryContext { let start_time = manager.server_info.start_time.clone(); let uptime = (Instant::now() - manager.start_instant).as_secs_f32(); let msg = format!( - "[HTTP-QUERY] Routing error: query {query_id} should be on server {expected_node_id}, but current server is {}, which started at {start_time} ({uptime} secs ago)", + "Routing error: query {query_id} should be on server {expected_node_id}, but current server is {}, which started at {start_time} ({uptime} secs ago)", self.node_id ); warn!("{}", msg); @@ -171,9 +171,9 @@ impl HttpQueryContext { http_session_conf: &Option, session_type: SessionType, ) -> databend_common_exception::Result<(Arc, Arc)> { - let session = self.upgrade_session(session_type).map_err(|err| { - ErrorCode::Internal(format!("[HTTP-QUERY] Failed to upgrade session: {err}")) - })?; + let session = self + .upgrade_session(session_type) + .map_err(|err| ErrorCode::Internal(format!("Failed to upgrade session: {err}")))?; if let Some(cid) = session.get_client_session_id() { ClientSessionManager::instance().on_query_start(&cid, &self.user_name, 
&session); diff --git a/src/query/service/src/servers/http/v1/query/http_query_manager.rs b/src/query/service/src/servers/http/v1/query/http_query_manager.rs index 1ae1b37c38f91..f5c1529c1fb1c 100644 --- a/src/query/service/src/servers/http/v1/query/http_query_manager.rs +++ b/src/query/service/src/servers/http/v1/query/http_query_manager.rs @@ -94,7 +94,7 @@ impl Queries { if let Some(st) = closed_state { self.last_query_end_at = Some(now); self.num_active_queries = self.num_active_queries.saturating_sub(1); - log::info!("[HTTP-QUERY] Query {query_id} closed: {st:?}"); + log::info!("Query {query_id} closed: {st:?}"); } return Ok((Some(q), closed_state)); } @@ -176,7 +176,7 @@ impl HttpQueryManager { TimeoutResult::Remove => { let mut queries = self_clone.queries.write(); queries.remove(&query_id_clone); - log::info!("[HTTP-QUERY] Query {query_id_clone} removed"); + log::info!("Query {query_id_clone} removed"); break; } } @@ -227,7 +227,7 @@ impl HttpQueryManager { sleep(Duration::from_secs(timeout_secs)).await; if self_clone.get_txn(&last_query_id_clone).is_some() { log::info!( - "[HTTP-QUERY] Transaction timed out after {} seconds, last_query_id = {}", + "Transaction timed out after {} seconds, last_query_id = {}", timeout_secs, last_query_id_clone ); @@ -252,13 +252,13 @@ impl HttpQueryManager { if let Some(id) = last_node_id { if self.server_info.id != *id { return Err(ErrorCode::SessionLost(format!( - "[HTTP-QUERY] Transaction aborted because server restart or route error: expecting server {}, current one is {} started at {} ", + "Transaction aborted because server restart or route error: expecting server {}, current one is {} started at {} ", id, self.server_info.id, self.server_info.start_time ))); } } else { return Err(ErrorCode::InvalidSessionState( - "[HTTP-QUERY] Transaction is active but missing server_info".to_string(), + "Transaction is active but missing server_info".to_string(), )); } Ok(()) diff --git a/src/query/service/src/servers/http/v1/query/mod.rs b/src/query/service/src/servers/http/v1/query/mod.rs index 358d8ad20ddb0..847aa70947447 100644 --- a/src/query/service/src/servers/http/v1/query/mod.rs +++ b/src/query/service/src/servers/http/v1/query/mod.rs @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +// Logs from this module will show up as "[HTTP-QUERY] ...". +databend_common_tracing::register_module_tag!("[HTTP-QUERY]"); + pub mod blocks_serializer; pub mod execute_state; pub mod expirable; diff --git a/src/query/service/src/servers/http/v1/query/page_manager.rs b/src/query/service/src/servers/http/v1/query/page_manager.rs index 6164db6e309d7..a5eac36d5ac60 100644 --- a/src/query/service/src/servers/http/v1/query/page_manager.rs +++ b/src/query/service/src/servers/http/v1/query/page_manager.rs @@ -121,9 +121,7 @@ impl PageManager { Ok(self .last_page .as_ref() - .ok_or_else(|| { - ErrorCode::Internal("[HTTP-QUERY] Failed to retrieve last page: page is None") - })? + .ok_or_else(|| ErrorCode::Internal("Failed to retrieve last page: page is None"))? 
.clone()) } else { let message = format!( diff --git a/src/query/service/src/servers/http/v1/query/sized_spsc.rs b/src/query/service/src/servers/http/v1/query/sized_spsc.rs index 1f695116b5b64..f5c1911940185 100644 --- a/src/query/service/src/servers/http/v1/query/sized_spsc.rs +++ b/src/query/service/src/servers/http/v1/query/sized_spsc.rs @@ -375,11 +375,11 @@ where S: DataBlockSpill match tokio::time::timeout_at((*t).into(), self.chan.recv()).await { Ok(true) => self.try_take_page().await?, Ok(false) => { - info!("[HTTP-QUERY] Reached end of data blocks"); + info!("Reached end of data blocks"); return Ok((BlocksSerializer::empty(), true)); } Err(_) => { - debug!("[HTTP-QUERY] Long polling timeout reached"); + debug!("Long polling timeout reached"); return Ok((BlocksSerializer::empty(), self.chan.is_close())); } } diff --git a/src/query/service/src/servers/http/v1/session/client_session_manager.rs b/src/query/service/src/servers/http/v1/session/client_session_manager.rs index 87f5614b2cb61..8021ec31f332f 100644 --- a/src/query/service/src/servers/http/v1/session/client_session_manager.rs +++ b/src/query/service/src/servers/http/v1/session/client_session_manager.rs @@ -177,7 +177,7 @@ impl ClientSessionManager { if self.refresh_in_memory_states(sid, user_name) { self.refresh_session_handle(tenant, user_name.to_string(), sid) .await?; - info!("[HTTP-SESSION] refreshing session {}", sid); + info!("refreshing session {}", sid); } Ok(()) } @@ -298,10 +298,10 @@ impl ClientSessionManager { if SystemTime::now() > claim.expire_at() + TTL_GRACE_PERIOD_QUERY { return match token_type { TokenType::Refresh => Err(ErrorCode::RefreshTokenExpired( - "[HTTP-SESSION] Authentication failed: refresh token has expired", + "Authentication failed: refresh token has expired", )), TokenType::Session => Err(ErrorCode::SessionTokenExpired( - "[HTTP-SESSION] Authentication failed: session token has expired", + "Authentication failed: session token has expired", )), }; } @@ -324,12 +324,12 @@ impl ClientSessionManager { } _ => { return match token_type { - TokenType::Refresh => { - Err(ErrorCode::RefreshTokenNotFound("[HTTP-SESSION] Authentication failed: refresh token not found in database")) - } - TokenType::Session => { - Err(ErrorCode::SessionTokenNotFound("[HTTP-SESSION] Authentication failed: session token not found in database")) - } + TokenType::Refresh => Err(ErrorCode::RefreshTokenNotFound( + "Authentication failed: refresh token not found in database", + )), + TokenType::Session => Err(ErrorCode::SessionTokenNotFound( + "Authentication failed: session token not found in database", + )), }; } }; diff --git a/src/query/service/src/servers/http/v1/session/login_handler.rs b/src/query/service/src/servers/http/v1/session/login_handler.rs index 5855b3619c31f..dcc876a7f274f 100644 --- a/src/query/service/src/servers/http/v1/session/login_handler.rs +++ b/src/query/service/src/servers/http/v1/session/login_handler.rs @@ -128,8 +128,6 @@ pub async fn login_handler( }), })) } - _ => unreachable!( - "[HTTP-SESSION] /session/login endpoint requires password or JWT authentication" - ), + _ => unreachable!("/session/login endpoint requires password or JWT authentication"), } } diff --git a/src/query/service/src/servers/http/v1/session/logout_handler.rs b/src/query/service/src/servers/http/v1/session/logout_handler.rs index 73975c474a2b2..d163fc5de675d 100644 --- a/src/query/service/src/servers/http/v1/session/logout_handler.rs +++ b/src/query/service/src/servers/http/v1/session/logout_handler.rs @@ -34,7 +34,7 @@ pub 
struct LogoutResponse { pub async fn logout_handler(ctx: &HttpQueryContext) -> PoemResult { if let Some(id) = &ctx.client_session_id { info!( - "[HTTP-SESSION] Logout request: user={}, client_session_id={}, credential_type={:?}", + "Logout request: user={}, client_session_id={}, credential_type={:?}", ctx.user_name, id, ctx.credential.type_name() diff --git a/src/query/service/src/servers/http/v1/session/mod.rs b/src/query/service/src/servers/http/v1/session/mod.rs index 6fc2193c45e8e..89c42a74858a7 100644 --- a/src/query/service/src/servers/http/v1/session/mod.rs +++ b/src/query/service/src/servers/http/v1/session/mod.rs @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +// Logs from this module will show up as "[HTTP-SESSION] ...". +databend_common_tracing::register_module_tag!("[HTTP-SESSION]"); + mod client_session_manager; mod consts; pub mod login_handler; diff --git a/src/query/service/src/servers/http/v1/session/refresh_handler.rs b/src/query/service/src/servers/http/v1/session/refresh_handler.rs index 16f05bbffe5f3..1997c56b697ef 100644 --- a/src/query/service/src/servers/http/v1/session/refresh_handler.rs +++ b/src/query/service/src/servers/http/v1/session/refresh_handler.rs @@ -44,7 +44,7 @@ pub async fn refresh_handler( let client_session_id = ctx .client_session_id .as_ref() - .expect("[HTTP-SESSION] Refresh handler requires session ID in context") + .expect("Refresh handler requires session ID in context") .clone(); let mgr = ClientSessionManager::instance(); match &ctx.credential { @@ -69,7 +69,9 @@ pub async fn refresh_handler( })) } _ => { - unreachable!("[HTTP-SESSION] /v1/session/refresh endpoint requires authentication with databend refresh token") + unreachable!( + "/v1/session/refresh endpoint requires authentication with databend refresh token" + ) } } } diff --git a/src/query/service/src/servers/http/v1/session/token.rs b/src/query/service/src/servers/http/v1/session/token.rs index 73e433220e4b2..d757f6c425262 100644 --- a/src/query/service/src/servers/http/v1/session/token.rs +++ b/src/query/service/src/servers/http/v1/session/token.rs @@ -83,7 +83,7 @@ impl SessionClaim { pub fn decode(token: &str) -> Result<(Self, TokenType)> { let fmt_err = |reason: String| { ErrorCode::AuthenticateFailure(format!( - "[HTTP-SESSION] Failed to decode token: {reason}, token: {token}" + "Failed to decode token: {reason}, token: {token}" )) }; let t = Self::get_type(token)?; diff --git a/src/query/service/src/servers/http/v1/streaming_load.rs b/src/query/service/src/servers/http/v1/streaming_load.rs index ca774e703d7ad..fdd649dff3456 100644 --- a/src/query/service/src/servers/http/v1/streaming_load.rs +++ b/src/query/service/src/servers/http/v1/streaming_load.rs @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +// Logs from this module will show up as "[HTTP-STREAMING-LOAD] ...". 
+databend_common_tracing::register_module_tag!("[HTTP-STREAMING-LOAD]"); + use std::future::Future; use std::sync::Arc; @@ -115,13 +118,15 @@ pub async fn streaming_load_handler( let s = v.to_str().unwrap().to_string(); match serde_json::from_str(s.trim()) { Ok(s) => Some(s), - Err(e) => return poem::Error::from_string( - format!( - "[HTTP-STREAMING-LOAD] invalid value for header {HEADER_QUERY_CONTEXT}({s}) in request: {e}" - ), - StatusCode::BAD_REQUEST, - ) - .into_response(), + Err(e) => { + return poem::Error::from_string( + format!( + "invalid value for header {HEADER_QUERY_CONTEXT}({s}) in request: {e}" + ), + StatusCode::BAD_REQUEST, + ) + .into_response() + } } } None => None, @@ -179,7 +184,7 @@ async fn streaming_load_handler_inner( session_conf: &Option, ) -> PoemResult> { info!( - "[HTTP-STREAMING-LOAD] New streaming load request, headers={:?}", + "New streaming load request, headers={:?}", sanitize_request_headers(req.headers()), ); let (_, query_context) = http_context @@ -191,15 +196,12 @@ async fn streaming_load_handler_inner( .headers() .get(HEADER_SQL) .ok_or(poem::Error::from_string( - format!("[HTTP-STREAMING-LOAD] Missing required header {HEADER_SQL} in request"), + format!("Missing required header {HEADER_SQL} in request"), StatusCode::BAD_REQUEST, ))?; let sql = sql.to_str().map_err(|e| { poem::Error::from_string( - format!( - "[HTTP-STREAMING-LOAD] Invalid UTF-8 in value of header {HEADER_SQL}: {}", - e - ), + format!("Invalid UTF-8 in value of header {HEADER_SQL}: {}", e), StatusCode::BAD_REQUEST, ) })?; @@ -230,7 +232,7 @@ async fn streaming_load_handler_inner( InsertInputSource::StreamingLoad(streaming_load) => { if !streaming_load.file_format.support_streaming_load() { - return Err(poem::Error::from_string( format!( "[HTTP-STREAMING-LOAD] Unsupported file format: {}", streaming_load.file_format.get_type() ), StatusCode::BAD_REQUEST)); + return Err(poem::Error::from_string( format!( "Unsupported file format: {}", streaming_load.file_format.get_type() ), StatusCode::BAD_REQUEST)); } let (tx, rx) = tokio::sync::mpsc::channel(1); *streaming_load.receiver.lock() = Some(rx); @@ -245,18 +247,18 @@ async fn streaming_load_handler_inner( stats: query_context.get_write_progress().get_values(), })), Ok(Err(cause)) => { - info!("[HTTP-STREAMING-LOAD] Query execution failed: {:?}", cause); + info!("Query execution failed: {:?}", cause); Err(poem::Error::from_string( format!( - "[HTTP-STREAMING-LOAD] Query execution failed: {}", + "Query execution failed: {}", cause.display_with_sql(sql).message() ), StatusCode::BAD_REQUEST, ))}, Err(err) => { - info!("[HTTP-STREAMING-LOAD] Internal server error: {:?}", err); + info!("Internal server error: {:?}", err); Err(poem::Error::from_string( - "[HTTP-STREAMING-LOAD] Internal server error: execution thread panicked", + "Internal server error: execution thread panicked", StatusCode::INTERNAL_SERVER_ERROR, )) }, @@ -264,7 +266,7 @@ async fn streaming_load_handler_inner( } _non_supported_source => Err(poem::Error::from_string( format!( - "[HTTP-STREAMING-LOAD] Unsupported INSERT source. Streaming upload only supports 'INSERT INTO $table FILE_FORMAT = (type = ...)'. Got: {}", + "Unsupported INSERT source. Streaming upload only supports 'INSERT INTO $table FILE_FORMAT = (type = ...)'. Got: {}", plan ), StatusCode::BAD_REQUEST, @@ -273,7 +275,7 @@ async fn streaming_load_handler_inner( } non_insert_plan => Err(poem::Error::from_string( format!( - "[HTTP-STREAMING-LOAD] Only INSERT statements are supported in streaming load. 
Got: {}", + "Only INSERT statements are supported in streaming load. Got: {}", non_insert_plan ), StatusCode::BAD_REQUEST, @@ -292,15 +294,12 @@ async fn read_multi_part( Err(cause) => { if let Err(cause) = tx .send(Err(ErrorCode::BadBytes(format!( - "[HTTP-STREAMING-LOAD] Failed to parse multipart data: {:?}", + "Failed to parse multipart data: {:?}", cause )))) .await { - warn!( - "[HTTP-STREAMING-LOAD] Multipart channel disconnected: {}", - cause - ); + warn!("Multipart channel disconnected: {}", cause); } return Err(cause.into()); } @@ -312,12 +311,12 @@ async fn read_multi_part( // resolve the ability to utilize name if name != Some("upload") { return Err(poem::Error::from_string( - "[HTTP-STREAMING-LOAD] Invalid field name in form-data: must be 'upload'", + "Invalid field name in form-data: must be 'upload'", StatusCode::BAD_REQUEST, )); } let filename = field.file_name().unwrap_or("file_with_no_name").to_string(); - debug!("[HTTP-STREAMING-LOAD] Started reading file: {}", &filename); + debug!("Started reading file: {}", &filename); let mut reader = field.into_async_read(); match file_format { FileFormatParams::Parquet(_) @@ -333,11 +332,7 @@ async fn read_multi_part( break; } } - debug!( - "[HTTP-STREAMING-LOAD] Read file {} with {} bytes", - filename, - data.len() - ); + debug!("Read file {} with {} bytes", filename, data.len()); let batch = BytesBatch { data, path: filename.clone(), @@ -346,10 +341,7 @@ async fn read_multi_part( }; let block = DataBlock::empty_with_meta(Box::new(batch)); if let Err(e) = tx.send(Ok(block)).await { - warn!( - "[HTTP-STREAMING-LOAD] Failed to send data to pipeline: {}", - e - ); + warn!("Failed to send data to pipeline: {}", e); break; } } @@ -372,15 +364,15 @@ async fn read_multi_part( }; let block = DataBlock::empty_with_meta(Box::new(batch)); if let Err(e) = tx.send(Ok(block)).await { - warn!( - "[HTTP-STREAMING-LOAD] Failed to send data to pipeline: {}", - e - ); + warn!("Failed to send data to pipeline: {}", e); // the caller get the actual error from interpreter return Ok(()); } if n == 0 { - debug!("[HTTP-STREAMING-LOAD] Finished reading file: {}, total size: {} bytes", &filename, offset); + debug!( + "Finished reading file: {}, total size: {} bytes", + &filename, offset + ); break; } offset += n; @@ -388,7 +380,10 @@ async fn read_multi_part( } _ => { return Err(poem::Error::from_string( - format!("[HTTP-STREAMING-LOAD] Unsupported file format for streaming load: {}", file_format), + format!( + "Unsupported file format for streaming load: {}", + file_format + ), StatusCode::BAD_REQUEST, )); } diff --git a/src/query/service/src/sessions/mod.rs b/src/query/service/src/sessions/mod.rs index 34d9603e6348c..399ebfbe53b7a 100644 --- a/src/query/service/src/sessions/mod.rs +++ b/src/query/service/src/sessions/mod.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +databend_common_tracing::register_module_tag!("[SESSION]"); + mod query_affect; pub mod query_ctx; mod query_ctx_shared; diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs index 560a63f27326a..e7e15c6bc5b0f 100644 --- a/src/query/service/src/sessions/query_ctx.rs +++ b/src/query/service/src/sessions/query_ctx.rs @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +// Logs from this module will show up as "[QUERY-CTX] ...". 
+databend_common_tracing::register_module_tag!("[QUERY-CTX]"); + use std::any::Any; use std::cmp::min; use std::collections::hash_map::Entry; @@ -192,7 +195,7 @@ impl QueryContext { } pub fn create_from_shared(shared: Arc) -> Arc { - debug!("[QUERY-CTX] Creating new QueryContext instance"); + debug!("Creating new QueryContext instance"); let tenant = GlobalConfig::instance().query.tenant_id.clone(); let query_settings = Settings::create(tenant); @@ -233,7 +236,7 @@ impl QueryContext { Ok(table_function.as_table()) } (Some(_), false) => Err(ErrorCode::InvalidArgument( - "[QUERY-CTX] Table args not supported in non-default catalog", + "Table args not supported in non-default catalog", )), // Load table first, if not found, try to load table function. (None, true) => { @@ -290,7 +293,7 @@ impl QueryContext { } Err(_) => { return Err(ErrorCode::UnknownDatabase(format!( - "[QUERY-CTX] Cannot use database '{}': database does not exist", + "Cannot use database '{}': database does not exist", new_database_name ))); } @@ -731,7 +734,7 @@ impl QueryContext { let _ = op.write(&meta_path, joined_contents).await?; Ok(()) }) { - log::error!("[QUERY-CTX] Failed to create spill meta file: {}", e); + log::error!("Failed to create spill meta file: {}", e); } } @@ -884,7 +887,7 @@ impl TableContext for QueryContext { fn set_status_info(&self, info: &str) { // set_status_info is not called frequently, so we can use info! here. // make it easier to match the status to the log. - info!("[QUERY-CTX] Status update: {}", info); + info!("Status update: {}", info); let mut status = self.shared.status.write(); *status = info.to_string(); } @@ -1005,7 +1008,7 @@ impl TableContext for QueryContext { .lock() .insert(table_name.to_string(), hint); info!( - "[QUERY-CTX] Set compaction hint for table '{}': old={:?}, new={}", + "Set compaction hint for table '{}': old={:?}, new={}", table_name, old, hint ); } @@ -1147,14 +1150,10 @@ impl TableContext for QueryContext { fn get_format_settings(&self) -> Result { let tz = self.get_settings().get_timezone()?; let timezone = tz.parse::().map_err(|_| { - ErrorCode::InvalidTimezone( - "[QUERY-CTX] Invalid timezone format - timezone validation failed", - ) + ErrorCode::InvalidTimezone("Invalid timezone format - timezone validation failed") })?; let jiff_timezone = TimeZone::get(&tz).map_err(|_| { - ErrorCode::InvalidTimezone( - "[QUERY-CTX] Invalid timezone format - jiff timezone parsing failed", - ) + ErrorCode::InvalidTimezone("Invalid timezone format - jiff timezone parsing failed") })?; let geometry_format = self.get_settings().get_geometry_output_format()?; let format_null_as_str = self.get_settings().get_format_null_as_str()?; @@ -1182,7 +1181,7 @@ impl TableContext for QueryContext { let tz_string = settings.get_timezone()?; let tz = TimeZone::get(&tz_string).map_err(|e| { - ErrorCode::InvalidTimezone(format!("[QUERY-CTX] Timezone validation failed: {}", e)) + ErrorCode::InvalidTimezone(format!("Timezone validation failed: {}", e)) })?; let now = Zoned::now().with_time_zone(TimeZone::UTC); let numeric_cast_option = settings.get_numeric_cast_option()?; @@ -1448,10 +1447,7 @@ impl TableContext for QueryContext { } None => { if let Some(v) = self.get_settings().get_stream_consume_batch_size_hint()? 
{ - info!( - "[QUERY-CTX] Overriding stream max_batch_size with setting value: {}", - v - ); + info!("Overriding stream max_batch_size with setting value: {}", v); Some(v) } else { None @@ -1469,14 +1465,14 @@ impl TableContext for QueryContext { if actual_batch_limit != max_batch_size { return Err(ErrorCode::StorageUnsupported( format!( - "[QUERY-CTX] Stream batch size must be consistent within transaction: actual={:?}, requested={:?}", + "Stream batch size must be consistent within transaction: actual={:?}, requested={:?}", actual_batch_limit, max_batch_size ) )); } } else if max_batch_size.is_some() { return Err(ErrorCode::StorageUnsupported( - "[QUERY-CTX] MAX_BATCH_SIZE parameter only supported for STREAM tables", + "MAX_BATCH_SIZE parameter only supported for STREAM tables", )); } Ok(table) @@ -1493,7 +1489,7 @@ impl TableContext for QueryContext { max_files: Option, ) -> Result { if files.is_empty() { - info!("[QUERY-CTX] No files to filter for copy operation"); + info!("No files to filter for copy operation"); return Ok(FilteredCopyFiles::default()); } @@ -1552,7 +1548,7 @@ impl TableContext for QueryContext { } if result_size > COPY_MAX_FILES_PER_COMMIT { return Err(ErrorCode::Internal(format!( - "[QUERY-CTX] {}", + "{}", COPY_MAX_FILES_COMMIT_MSG ))); } @@ -1846,7 +1842,7 @@ impl TableContext for QueryContext { chrono::Duration::from_std(duration).map_err(|e| { ErrorCode::Internal(format!( - "[QUERY-CTX] Unable to construct delta duration of table meta timestamp: {e}", + "Unable to construct delta duration of table meta timestamp: {e}", )) })? }; @@ -1878,7 +1874,7 @@ impl TableContext for QueryContext { // the safety of vacuum operation. if table_meta_timestamps.segment_block_timestamp < existing_ts { return Err(ErrorCode::Internal(format!( - "[QUERY-CTX] Transaction timestamp violation: table_id = {}, new segment timestamp {:?} is lesser than existing transaction timestamp {:?}", + "Transaction timestamp violation: table_id = {}, new segment timestamp {:?} is lesser than existing transaction timestamp {:?}", table_id, table_meta_timestamps.segment_block_timestamp, existing_ts ))); } @@ -1946,7 +1942,7 @@ impl TableContext for QueryContext { } // TODO: iceberg doesn't support load from storage directly. _ => Err(ErrorCode::Internal( - "[QUERY-CTX] Unsupported datalake type for schema loading", + "Unsupported datalake type for schema loading", )), } } @@ -1971,7 +1967,7 @@ impl TableContext for QueryContext { FileFormatParams::Parquet(fmt) => { if max_column_position > 1 { Err(ErrorCode::SemanticError( - "[QUERY-CTX] Query from parquet file only support $1 as column position", + "Query from parquet file only support $1 as column position", )) } else if max_column_position == 0 { let settings = self.get_settings(); @@ -2021,14 +2017,15 @@ impl TableContext for QueryContext { } } FileFormatParams::Orc(..) => { - let is_variant = - match max_column_position { - 0 => false, - 1 => true, - _ => return Err(ErrorCode::SemanticError( - "[QUERY-CTX] Query from ORC file only support $1 as column position", - )), - }; + let is_variant = match max_column_position { + 0 => false, + 1 => true, + _ => { + return Err(ErrorCode::SemanticError( + "Query from ORC file only support $1 as column position", + )) + } + }; let schema = Arc::new(TableSchema::empty()); let info = StageTableInfo { schema, @@ -2072,7 +2069,7 @@ impl TableContext for QueryContext { }; return Err(ErrorCode::SemanticError(format!( - "[QUERY-CTX] Query from {} file lacks column positions. 
Specify as $1, $2, etc.", + "Query from {} file lacks column positions. Specify as $1, $2, etc.", file_type ))); } @@ -2102,7 +2099,7 @@ impl TableContext for QueryContext { } _ => { return Err(ErrorCode::Unimplemented(format!( - "[QUERY-CTX] Unsupported file format in query stage. Supported formats: Parquet, NDJson, AVRO, CSV, TSV. Provided: '{}'", + "Unsupported file format in query stage. Supported formats: Parquet, NDJson, AVRO, CSV, TSV. Provided: '{}'", stage_info.file_format_params ))); } @@ -2232,9 +2229,9 @@ impl TableContext for QueryContext { if query && !consume { continue; } - let stream = tables.get(stream_key).ok_or_else(|| { - ErrorCode::Internal("[QUERY-CTX] Stream reference not found in tables cache") - })?; + let stream = tables + .get(stream_key) + .ok_or_else(|| ErrorCode::Internal("Stream reference not found in tables cache"))?; streams_meta.push(stream.clone()); } Ok(streams_meta) diff --git a/src/query/service/src/sessions/queue_mgr.rs b/src/query/service/src/sessions/queue_mgr.rs index 65da111f33d54..bee182c37b675 100644 --- a/src/query/service/src/sessions/queue_mgr.rs +++ b/src/query/service/src/sessions/queue_mgr.rs @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +// Logs from this module will show up as "[QUERY-QUEUE] ...". +databend_common_tracing::register_module_tag!("[QUERY-QUEUE]"); + use std::collections::HashMap; use std::fmt::Display; use std::future::Future; @@ -118,17 +121,11 @@ impl QueueManager { )); provider.create_meta_store().await.map_err(|e| { - ErrorCode::MetaServiceError(format!( - "[QUERY-QUEUE] Failed to create meta store: {}", - e - )) + ErrorCode::MetaServiceError(format!("Failed to create meta store: {}", e)) })? }; - info!( - "[QUERY-QUEUE] Queue manager initialized with permits: {:?}", - permits - ); + info!("Queue manager initialized with permits: {:?}", permits); GlobalInstance::set(Self::create( permits, metastore, @@ -195,20 +192,18 @@ impl QueueManager { let acquire = Box::pin(self.acquire_inner(data)); match futures::future::select(acquire, watch_abort_notify).await { Either::Left((left, _)) => left, - Either::Right((_, _)) => Err(ErrorCode::AbortedQuery( - "[QUERY-QUEUE] recv query abort notify.", - )), + Either::Right((_, _)) => Err(ErrorCode::AbortedQuery("recv query abort notify.")), } } async fn acquire_inner(self: &Arc, data: Data) -> Result { if !data.need_acquire_to_queue() { - info!("[QUERY-QUEUE] Non-heavy queries skip the query queue and execute directly."); + info!("Non-heavy queries skip the query queue and execute directly."); return Ok(AcquireQueueGuard::create(vec![])); } info!( - "[QUERY-QUEUE] Preparing to acquire from query queue, current length: {}", + "Preparing to acquire from query queue, current length: {}", self.length() ); @@ -274,7 +269,7 @@ impl QueueManager { .await?; info!( - "[QUERY-QUEUE] Successfully acquired from global workload semaphore. elapsed: {:?}", + "Successfully acquired from global workload semaphore. elapsed: {:?}", instant.elapsed() ); timeout -= instant.elapsed(); @@ -522,12 +517,10 @@ macro_rules! 
impl_acquire_queue_future { Poll::Ready(match res { Ok(Ok(v)) => Ok(AcquireQueueGuardInner::$fn_name(Some(v))), - Ok(Err(_)) => Err(ErrorCode::TokioError( - "[QUERY-QUEUE] Queue acquisition failed", - )), - Err(_elapsed) => Err(ErrorCode::Timeout( - "[QUERY-QUEUE] Query queuing timeout exceeded", - )), + Ok(Err(_)) => Err(ErrorCode::TokioError("Queue acquisition failed")), + Err(_elapsed) => { + Err(ErrorCode::Timeout("Query queuing timeout exceeded")) + } }) } Poll::Pending => { @@ -717,11 +710,10 @@ impl QueueData for QueryEntry { fn remove_error_message(key: Option) -> ErrorCode { match key { - None => ErrorCode::AbortedQuery("[QUERY-QUEUE] Query was killed while in queue"), - Some(key) => ErrorCode::AbortedQuery(format!( - "[QUERY-QUEUE] Query {} was killed while in queue", - key - )), + None => ErrorCode::AbortedQuery("Query was killed while in queue"), + Some(key) => { + ErrorCode::AbortedQuery(format!("Query {} was killed while in queue", key)) + } } } @@ -747,11 +739,7 @@ impl QueueData for QueryEntry { fn exit_wait_pending(&self, wait_time: Duration) { self.ctx.set_status_info( - format!( - "[QUERY-QUEUE] Resource scheduling completed, elapsed: {:?}", - wait_time - ) - .as_str(), + format!("Resource scheduling completed, elapsed: {:?}", wait_time).as_str(), ); self.ctx.set_query_queued_duration(wait_time) } diff --git a/src/query/service/src/sessions/session_ctx.rs b/src/query/service/src/sessions/session_ctx.rs index 9b4b6552cc2ea..a2418d523962f 100644 --- a/src/query/service/src/sessions/session_ctx.rs +++ b/src/query/service/src/sessions/session_ctx.rs @@ -140,7 +140,7 @@ impl SessionContext { // Set current catalog. pub fn set_current_catalog(&self, catalog_name: String) { if catalog_name.is_empty() { - log::error!("[HTTP-QUERY] set_current_catalog, catalog_name is empty"); + log::error!("set_current_catalog, catalog_name is empty"); return; } let mut lock = self.current_catalog.write(); @@ -156,7 +156,7 @@ impl SessionContext { // Set current database. 
pub fn set_current_database(&self, db: String) { if db.is_empty() { - log::error!("[HTTP-QUERY] set_current_database, db is empty"); + log::error!("set_current_database, db is empty"); return; } let mut lock = self.current_database.write(); @@ -182,7 +182,7 @@ impl SessionContext { pub fn set_current_warehouse(&self, w: Option) { if w.as_ref().is_some_and(|w| w.is_empty()) { - log::error!("[HTTP-QUERY] set_current_warehouse, w is empty"); + log::error!("set_current_warehouse, w is empty"); return; } let mut lock = self.current_warehouse.write(); @@ -220,7 +220,7 @@ impl SessionContext { pub(in crate::sessions) fn set_current_tenant(&mut self, tenant: Tenant) { if tenant.tenant.is_empty() { - log::error!("[HTTP-QUERY] set_current_tenant, tenant is empty"); + log::error!("set_current_tenant, tenant is empty"); return; } self.current_tenant = Some(tenant); @@ -408,7 +408,7 @@ impl SessionContext { pub fn set_current_workload_group(&self, workload_group: String) { if workload_group.is_empty() { - log::error!("[HTTP-QUERY] set_current_workload_group, workload_group is empty"); + log::error!("set_current_workload_group, workload_group is empty"); return; } *self.current_workload_group.write() = Some(workload_group) diff --git a/src/query/service/src/task/service.rs b/src/query/service/src/task/service.rs index c71322b1a4788..4fc406540a283 100644 --- a/src/query/service/src/task/service.rs +++ b/src/query/service/src/task/service.rs @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +// Logs from this module will show up as "[PRIVATE-TASKS] ...". +databend_common_tracing::register_module_tag!("[PRIVATE-TASKS]"); + use std::collections::BTreeMap; use std::collections::HashMap; use std::ops::Deref; @@ -184,10 +187,10 @@ impl TaskService { } } if let Err(err) = task_service.prepare().await { - error!("[PRIVATE-TASKS] prepare failed due to {}", err); + error!("prepare failed due to {}", err); } if let Err(err) = task_service.work(&tenant, runtime).await { - error!("[PRIVATE-TASKS] prepare failed due to {}", err); + error!("work failed due to {}", err); } }, None, @@ -303,7 +306,7 @@ impl TaskService { Result::Ok(()) }; if let Err(err) = fn_work().await { - error!("[PRIVATE-TASKS] interval schedule failed due to {}", err); + error!("interval schedule failed due to {}", err); } }); } @@ -351,7 +354,7 @@ impl TaskService { Result::Ok(()) }; if let Err(err) = fn_work().await { - error!("[PRIVATE-TASKS] cron schedule failed due to {}", err); + error!("cron schedule failed due to {}", err); } }); } @@ -450,7 +453,7 @@ impl TaskService { Result::Ok(()) }; if let Err(err) = fn_work().await { - error!("[PRIVATE-TASKS] execute failed due to {}", err); + error!("execute failed due to {}", err); } }, None, diff --git a/src/query/service/tests/it/pipelines/executor/pipeline_executor.rs b/src/query/service/tests/it/pipelines/executor/pipeline_executor.rs index 7e8933de93db1..4a1630ed34214 100644 --- a/src/query/service/tests/it/pipelines/executor/pipeline_executor.rs +++ b/src/query/service/tests/it/pipelines/executor/pipeline_executor.rs @@ -59,7 +59,7 @@ async fn test_always_call_on_finished() -> Result<()> { assert_eq!(error.code(), 1001); assert_eq!( error.message().as_str(), - "[PIPELINE-EXECUTOR] Pipeline max threads cannot be zero" + "Pipeline max threads cannot be zero" ); assert!(called_finished.load(Ordering::SeqCst)); } diff --git a/src/query/service/tests/it/servers/http/http_query_handlers.rs
b/src/query/service/tests/it/servers/http/http_query_handlers.rs index 3319aada71616..12f5319748254 100644 --- a/src/query/service/tests/it/servers/http/http_query_handlers.rs +++ b/src/query/service/tests/it/servers/http/http_query_handlers.rs @@ -377,7 +377,7 @@ async fn test_simple_sql() -> Result<()> { let body = response.into_body().into_string().await.unwrap(); assert_eq!( body, - r#"{"error":{"code":404,"message":"[HTTP-QUERY] Invalid page number: requested 2, current page is 1"}}"# + r#"{"error":{"code":404,"message":"Invalid page number: requested 2, current page is 1"}}"# ); // final @@ -554,7 +554,7 @@ async fn test_active_sessions() -> Result<()> { .map(|(_status, resp)| (resp.error.map(|e| e.message).unwrap_or_default())) .collect::>(); results.sort(); - let msg = "[HTTP-QUERY] Failed to upgrade session: Current active sessions (2) has exceeded the max_active_sessions limit (2)"; + let msg = "Failed to upgrade session: Current active sessions (2) has exceeded the max_active_sessions limit (2)"; let expect = vec!["", "", msg]; assert_eq!(results, expect); Ok(()) @@ -658,7 +658,7 @@ async fn test_pagination() -> Result<()> { let body = response.into_body().into_string().await.unwrap(); assert_eq!( body, - r#"{"error":{"code":404,"message":"[HTTP-QUERY] Invalid page number: requested 6, current page is 1"}}"# + r#"{"error":{"code":404,"message":"Invalid page number: requested 6, current page is 1"}}"# ); let mut next_uri = result.next_uri.clone().unwrap(); @@ -1723,7 +1723,7 @@ async fn test_txn_timeout() -> Result<()> { assert_eq!( reply.last().1.error.unwrap().message, format!( - "[HTTP-QUERY] Transaction timeout: last_query_id {} not found on this server", + "Transaction timeout: last_query_id {} not found on this server", last_query_id ) ); diff --git a/src/query/sql/Cargo.toml b/src/query/sql/Cargo.toml index d4439203bb2c8..144f4a155e18f 100644 --- a/src/query/sql/Cargo.toml +++ b/src/query/sql/Cargo.toml @@ -28,6 +28,7 @@ databend-common-pipeline = { workspace = true } databend-common-settings = { workspace = true } databend-common-storage = { workspace = true } databend-common-storages-basic = { workspace = true } +databend-common-tracing = { workspace = true } databend-common-users = { workspace = true } databend-enterprise-data-mask-feature = { workspace = true } databend-enterprise-row-access-policy-feature = { workspace = true } diff --git a/src/query/sql/src/planner/binder/location.rs b/src/query/sql/src/planner/binder/location.rs index 798b4cd9f40ab..db0bb4a3c6f90 100644 --- a/src/query/sql/src/planner/binder/location.rs +++ b/src/query/sql/src/planner/binder/location.rs @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +// Logs from this module will show up as "[HISTORY-TABLES] ...". 
+databend_common_tracing::register_module_tag!("[HISTORY-TABLES]"); + use std::collections::BTreeMap; use std::io::Error; use std::io::ErrorKind; @@ -178,7 +181,7 @@ fn parse_s3_params(l: &mut UriLocation, root: String) -> Result { let in_history_table_scope = ThreadTracker::capture_log_settings() .is_some_and(|settings| settings.level == LevelFilter::Off); if in_history_table_scope { - info!("[HISTORY-TABLES] Enable credential loader for history tables"); + info!("Enable credential loader for history tables"); disable_credential_loader = false; } diff --git a/src/query/sql/src/planner/optimizer/mod.rs b/src/query/sql/src/planner/optimizer/mod.rs index ad61a79c23b4a..ace3f3e86a213 100644 --- a/src/query/sql/src/planner/optimizer/mod.rs +++ b/src/query/sql/src/planner/optimizer/mod.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +databend_common_tracing::register_module_tag!("[OPTIMIZER]"); + mod cost; pub mod ir; #[allow(clippy::module_inception)] diff --git a/src/query/sql/src/planner/planner.rs b/src/query/sql/src/planner/planner.rs index 700a591637eae..f3f900ee9a47c 100644 --- a/src/query/sql/src/planner/planner.rs +++ b/src/query/sql/src/planner/planner.rs @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +// Logs from this module will show up as "[SQL-PLANNER] ...". +databend_common_tracing::register_module_tag!("[SQL-PLANNER]"); + use std::sync::Arc; use std::time::Instant; @@ -101,14 +104,12 @@ impl Planner { let options = prqlc::Options::default(); match prqlc::compile(sql, &options) { Ok(res) => { - info!("[SQL-PLANNER] PRQL to SQL conversion successful: {}", &res); + info!("PRQL to SQL conversion successful: {}", &res); prql_converted = true; res } Err(e) => { - warn!( - "[SQL-PLANNER] PRQL to SQL conversion failed, fallback to raw SQL parsing: {e}" - ); + warn!("PRQL to SQL conversion failed, fallback to raw SQL parsing: {e}"); sql.to_string() } } @@ -167,9 +168,7 @@ impl Planner { && sql_dialect == Dialect::PRQL && !prql_converted { - return Err(ErrorCode::SyntaxException( - "[SQL-PLANNER] PRQL to SQL conversion failed", - )); + return Err(ErrorCode::SyntaxException("PRQL to SQL conversion failed")); } self.replace_stmt(&mut stmt)?; @@ -249,7 +248,7 @@ impl Planner { ); if let Some(plan) = plan { info!( - "[SQL-PLANNER] Logical plan retrieved from cache, elapsed: {:?}", + "Logical plan retrieved from cache, elapsed: {:?}", start.elapsed() ); // update for clickhouse handler @@ -292,7 +291,7 @@ impl Planner { } info!( - "[SQL-PLANNER] Logical plan construction completed, elapsed: {:?}", + "Logical plan construction completed, elapsed: {:?}", start.elapsed() ); Ok(optimized_plan) @@ -331,7 +330,7 @@ impl Planner { let max_set_ops = self.ctx.get_settings().get_max_set_operator_count()?; if max_set_ops < set_ops_counter.count as u64 { return Err(ErrorCode::SyntaxException(format!( - "[SQL-PLANNER] Set operations count {} exceeds maximum limit {}", + "Set operations count {} exceeds maximum limit {}", set_ops_counter.count, max_set_ops ))); } diff --git a/src/query/storages/fuse/Cargo.toml b/src/query/storages/fuse/Cargo.toml index a1ed42e8d5268..68ebf2b0bd762 100644 --- a/src/query/storages/fuse/Cargo.toml +++ b/src/query/storages/fuse/Cargo.toml @@ -26,6 +26,7 @@ databend-common-pipeline = { workspace = true } databend-common-pipeline-transforms = { workspace = true } databend-common-sql = { workspace = true } 
databend-common-storage = { workspace = true } +databend-common-tracing = { workspace = true } databend-common-users = { workspace = true } databend-enterprise-fail-safe = { workspace = true } databend-enterprise-vacuum-handler = { workspace = true } diff --git a/src/query/storages/fuse/src/io/snapshots.rs b/src/query/storages/fuse/src/io/snapshots.rs index 70f3f30e9fa16..64dc1895505e4 100644 --- a/src/query/storages/fuse/src/io/snapshots.rs +++ b/src/query/storages/fuse/src/io/snapshots.rs @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +// Logs from this module will show up as "[FUSE-SNAPSHOT] ...". +databend_common_tracing::register_module_tag!("[FUSE-SNAPSHOT]"); + use std::collections::HashMap; use std::collections::HashSet; use std::path::Path; @@ -84,10 +87,7 @@ impl SnapshotsIO { ver, put_cache: true, }; - info!( - "[FUSE-SNAPSHOT] Reading snapshot with parameters: {:?}", - load_params - ); + info!("Reading snapshot with parameters: {:?}", load_params); let snapshot = reader.read(&load_params).await?; Ok((snapshot, ver)) } @@ -118,7 +118,7 @@ impl SnapshotsIO { // Error is directly returned, since it can be ignored through flatten // in read_snapshot_lites_ext. return Err(ErrorCode::StorageOther( - "[FUSE-SNAPSHOT] Invalid snapshot: timestamp must be less than min_snapshot_timestamp", + "Invalid snapshot: timestamp must be less than min_snapshot_timestamp", )); } Ok(TableSnapshotLite::from((snapshot.as_ref(), ver))) @@ -200,7 +200,7 @@ impl SnapshotsIO { snapshot_files.len(), start.elapsed() ); - info!("[FUSE-SNAPSHOT] {}", status); + info!("{}", status); (status_callback)(status); } } @@ -264,7 +264,7 @@ impl SnapshotsIO { // members of precedents of the current snapshot, though. // Error is directly returned, since we can be ignored through flatten. return Err(ErrorCode::StorageOther( - "[FUSE-SNAPSHOT] Invalid snapshot: timestamp must be less than root snapshot timestamp", + "Invalid snapshot: timestamp must be less than root snapshot timestamp", )); } let mut segments = HashSet::new(); @@ -382,7 +382,7 @@ impl SnapshotsIO { }, _ => { warn!( - "[FUSE-SNAPSHOT] Non-file entry found in prefix '{}', entry: {:?}", + "Non-file entry found in prefix '{}', entry: {:?}", prefix, de ); continue; diff --git a/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs b/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs index 9d0efe15aa718..8bb5462b4d347 100644 --- a/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs +++ b/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +// Logs from this module will show up as "[SINK-COMMIT] ...". 
+databend_common_tracing::register_module_tag!("[SINK-COMMIT]"); + use std::any::Any; use std::collections::HashMap; use std::sync::Arc; @@ -344,7 +347,7 @@ where F: SnapshotGenerator + Send + Sync + 'static let respect_flash_back = true; vacuum_table(tbl, self.ctx.clone(), vacuum_handler, respect_flash_back).await; } else { - info!("[SINK-COMMIT] No vacuum handler available for auto vacuuming, please verify your license"); + info!("No vacuum handler available for auto vacuuming, please verify your license"); } Ok(()) @@ -599,7 +602,7 @@ where F: SnapshotGenerator + Send + Sync + 'static { let elapsed_time = self.start_time.elapsed(); let status = format!( - "[SINK-COMMIT] Mutation committed successfully after {} retries in {:?}", + "Mutation committed successfully after {} retries in {:?}", self.retries, elapsed_time ); metrics_inc_commit_milliseconds(elapsed_time.as_millis()); @@ -627,7 +630,7 @@ where F: SnapshotGenerator + Send + Sync + 'static (tbl, stream_descriptions) }; info!( - "[SINK-COMMIT] Mutation committed successfully, targets: {:?}", + "Mutation committed successfully, targets: {:?}", target_descriptions ); self.state = State::Finish; @@ -638,7 +641,7 @@ where F: SnapshotGenerator + Send + Sync + 'static Some(d) => { let name = table_info.name.clone(); debug!( - "[SINK-COMMIT] TableVersionMismatched error detected, transaction will retry in {} ms. Table: {}, ID: {}", + "TableVersionMismatched error detected, transaction will retry in {} ms. Table: {}, ID: {}", d.as_millis(), name.as_str(), table_info.ident diff --git a/src/query/storages/fuse/src/operations/read_partitions.rs b/src/query/storages/fuse/src/operations/read_partitions.rs index 765d5af8ec77a..1a6d721783909 100644 --- a/src/query/storages/fuse/src/operations/read_partitions.rs +++ b/src/query/storages/fuse/src/operations/read_partitions.rs @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +// Logs from this module will show up as "[FUSE-PARTITIONS] ...". +databend_common_tracing::register_module_tag!("[FUSE-PARTITIONS]"); + use std::collections::HashMap; use std::collections::HashSet; use std::sync::Arc; @@ -128,7 +131,7 @@ impl FuseTable { let snapshot = self.read_table_snapshot().await?; info!( - "[FUSE-PARTITIONS] Reading partitions for table {}, push downs: {:?}, snapshot: {:?}", + "Reading partitions for table {}, push downs: {:?}, snapshot: {:?}", self.name(), push_downs, snapshot.as_ref().map(|sn| sn.snapshot_id) @@ -244,7 +247,7 @@ impl FuseTable { if let Some((stat, part)) = Self::check_prune_cache(&derterministic_cache_key) { ctx.set_pruned_partitions_stats(plan_id, stat); let sender = part_info_tx.clone(); - info!("[FUSE-PARTITIONS] Retrieved pruning result from cache"); + info!("Retrieved pruning result from cache"); source_pipeline.set_on_init(move || { // We cannot use the runtime associated with the query to avoid increasing its lifetime. 
                 GlobalIORuntime::instance().spawn(async move {
@@ -262,10 +265,7 @@ impl FuseTable {
                     });
 
                     if let Err(cause) = join_handler.await {
-                        log::warn!(
-                            "[FUSE-PARTITIONS] Join error in prune pipeline: {:?}",
-                            cause
-                        );
+                        log::warn!("Join error in prune pipeline: {:?}", cause);
                     }
 
                     Result::Ok(())
@@ -324,10 +324,7 @@ impl FuseTable {
             });
 
             if let Err(cause) = join_handler.await {
-                log::warn!(
-                    "[FUSE-PARTITIONS] Join error in prune pipeline: {:?}",
-                    cause
-                );
+                log::warn!("Join error in prune pipeline: {:?}", cause);
             }
             Ok::<_, ErrorCode>(())
         });
@@ -350,7 +347,7 @@ impl FuseTable {
         let num_segments_to_prune = segments_location.len();
         let start = Instant::now();
         info!(
-            "[FUSE-PARTITIONS] prune snapshot block start, {} segment to be processed, at node {}",
+            "prune snapshot block start, {} segment to be processed, at node {}",
             num_segments_to_prune,
             ctx.get_cluster().local_id,
         );
@@ -371,7 +368,7 @@ impl FuseTable {
         });
 
         if let Some(cached_result) = Self::check_prune_cache(&derterministic_cache_key) {
-            info!("[FUSE-PARTITIONS] Retrieved snapshot block pruning result from cache");
+            info!("Retrieved snapshot block pruning result from cache");
             return Ok(cached_result);
         }
 
@@ -382,7 +379,7 @@ impl FuseTable {
 
         let pruning_stats = pruner.pruning_stats();
         info!(
-            "[FUSE-PARTITIONS] prune snapshot block end, final block numbers:{}, out of {} segments, cost:{:?}, at node {}",
+            "prune snapshot block end, final block numbers:{}, out of {} segments, cost:{:?}, at node {}",
             block_metas.len(),
             num_segments_to_prune,
             start.elapsed(),
@@ -547,7 +544,7 @@ impl FuseTable {
             let pruned_part_stats = send_part_state.get_pruned_stats();
             ctx.set_pruned_partitions_stats(plan_id, pruned_part_stats);
             if enable_prune_cache {
-                info!("[FUSE-PARTITIONS] Prune cache enabled");
+                info!("Prune cache enabled");
                 send_part_state.populating_cache();
             }
         }
diff --git a/src/query/storages/fuse/src/operations/vacuum.rs b/src/query/storages/fuse/src/operations/vacuum.rs
index dd16e5d3dc24e..c6a4ba62d3960 100644
--- a/src/query/storages/fuse/src/operations/vacuum.rs
+++ b/src/query/storages/fuse/src/operations/vacuum.rs
@@ -12,6 +12,9 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+// Logs from this module will show up as "[VACUUM] ...".
+databend_common_tracing::register_module_tag!("[VACUUM]"); + // src/query/storages/fuse/src/vacuum/mod.rs use std::sync::Arc; @@ -33,7 +36,7 @@ pub async fn vacuum_table( respect_flash_back: bool, ) { warn!( - "[VACUUM] Vacuuming table: {}, ident: {}", + "Vacuuming table: {}, ident: {}", fuse_table.table_info.name, fuse_table.table_info.ident ); @@ -42,12 +45,9 @@ pub async fn vacuum_table( .await { // Vacuum in a best-effort manner, errors are ignored - warn!( - "[VACUUM] Vacuum table {} failed : {}", - fuse_table.table_info.name, e - ); + warn!("Vacuum table {} failed : {}", fuse_table.table_info.name, e); } else { - info!("[VACUUM] Vacuum table {} done", fuse_table.table_info.name); + info!("Vacuum table {} done", fuse_table.table_info.name); } } diff --git a/src/query/storages/fuse/src/pruning/fuse_pruner.rs b/src/query/storages/fuse/src/pruning/fuse_pruner.rs index 03e6cbfd51d94..54b2653605af8 100644 --- a/src/query/storages/fuse/src/pruning/fuse_pruner.rs +++ b/src/query/storages/fuse/src/pruning/fuse_pruner.rs @@ -257,7 +257,7 @@ impl FusePruner { let v = std::cmp::max(max_io_requests, 10); if v > max_io_requests { warn!( - "[FUSE-PRUNER] max_io_requests setting too low ({}), automatically increased to {} for optimal performance", + "max_io_requests setting too low ({}), automatically increased to {} for optimal performance", max_io_requests, v ) } @@ -265,7 +265,7 @@ impl FusePruner { }; info!( - "[FUSE-PRUNER] Pruning max concurrency configured to {} threads", + "Pruning max concurrency configured to {} threads", max_concurrency ); @@ -631,7 +631,7 @@ pub fn table_sample(push_down_info: &Option) -> Result if let Some(block_sample_value) = sample.block_level { if block_sample_value > 100.0 { return Err(ErrorCode::SyntaxException(format!( - "[FUSE-PRUNER] Invalid sample value: {} exceeds maximum allowed value of 100", + "Invalid sample value: {} exceeds maximum allowed value of 100", block_sample_value ))); } diff --git a/src/query/storages/fuse/src/pruning/vector_index_pruner.rs b/src/query/storages/fuse/src/pruning/vector_index_pruner.rs index 69a2b4201f2dd..a0d32f65f1f14 100644 --- a/src/query/storages/fuse/src/pruning/vector_index_pruner.rs +++ b/src/query/storages/fuse/src/pruning/vector_index_pruner.rs @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +// Logs from this module will show up as "[FUSE-PRUNER] ...". 
+databend_common_tracing::register_module_tag!("[FUSE-PRUNER]"); + use std::cmp::Ordering; use std::collections::HashMap; use std::collections::HashSet; @@ -203,9 +206,9 @@ impl VectorIndexPruner { metrics_inc_block_vector_index_pruning_milliseconds(elapsed); } if !param.has_filter && param.asc { - info!("[FUSE-PRUNER] Vector index hnsw topn prune elapsed: {elapsed}"); + info!("Vector index hnsw topn prune elapsed: {elapsed}"); } else { - info!("[FUSE-PRUNER] Vector index calculate score topn prune elapsed: {elapsed}"); + info!("Vector index calculate score topn prune elapsed: {elapsed}"); } return Ok(pruned_metas); @@ -391,7 +394,7 @@ impl VectorIndexPruner { { metrics_inc_block_vector_index_pruning_milliseconds(elapsed); } - info!("[FUSE-PRUNER] Vector index calculate score elapsed: {elapsed}"); + info!("Vector index calculate score elapsed: {elapsed}"); Ok(new_metas) } diff --git a/src/query/storages/fuse/src/pruning_pipeline/vector_index_prune_transform.rs b/src/query/storages/fuse/src/pruning_pipeline/vector_index_prune_transform.rs index 513e683dfe00f..23d655e4cdb4b 100644 --- a/src/query/storages/fuse/src/pruning_pipeline/vector_index_prune_transform.rs +++ b/src/query/storages/fuse/src/pruning_pipeline/vector_index_prune_transform.rs @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +// Logs from this module will show up as "[PROCESSOR-ASYNC-TASK] ...". +databend_common_tracing::register_module_tag!("[PROCESSOR-ASYNC-TASK]"); + use std::sync::Arc; use std::time::Instant; @@ -79,7 +82,7 @@ impl VectorIndexPruneTransform { let start = Instant::now(); let pruned = self.vector_index_pruner.prune(self.metas.clone()).await?; let elapsed = start.elapsed().as_millis() as u64; - info!("[PROCESSOR-ASYNC-TASK] Vector index prune transform elapsed: {elapsed}"); + info!("Vector index prune transform elapsed: {elapsed}"); if pruned.is_empty() { Ok(None) diff --git a/src/query/storages/orc/src/read_pipeline.rs b/src/query/storages/orc/src/read_pipeline.rs index 9295525a3423f..d5336d4f43261 100644 --- a/src/query/storages/orc/src/read_pipeline.rs +++ b/src/query/storages/orc/src/read_pipeline.rs @@ -100,7 +100,7 @@ impl OrcTable { let settings = ctx.get_settings(); let tz_string = settings.get_timezone()?; let tz = TimeZone::get(&tz_string).map_err(|e| { - ErrorCode::InvalidTimezone(format!("[QUERY-CTX] Timezone validation failed: {}", e)) + ErrorCode::InvalidTimezone(format!("Timezone validation failed: {}", e)) })?; pipeline.add_accumulating_transformer(|| { StripeDecoderForVariantTable::new(ctx.clone(), tz.clone(), internal_columns.clone()) diff --git a/src/query/storages/parquet/src/parquet_variant_table/source.rs b/src/query/storages/parquet/src/parquet_variant_table/source.rs index 40af13ed5dbda..bc74be4c4f5d7 100644 --- a/src/query/storages/parquet/src/parquet_variant_table/source.rs +++ b/src/query/storages/parquet/src/parquet_variant_table/source.rs @@ -107,7 +107,7 @@ impl ParquetVariantSource { let settings = ctx.get_settings(); let tz_string = settings.get_timezone()?; let tz = TimeZone::get(&tz_string).map_err(|e| { - ErrorCode::InvalidTimezone(format!("[QUERY-CTX] Timezone validation failed: {}", e)) + ErrorCode::InvalidTimezone(format!("Timezone validation failed: {}", e)) })?; Ok(ProcessorPtr::create(Box::new(Self { diff --git a/tests/nox/suites/1_stateful/09_http_handler/test_09_0006_route_error.py b/tests/nox/suites/1_stateful/09_http_handler/test_09_0006_route_error.py index a9b0d5d6b59be..ed1a32c4a3cbd 
100644 --- a/tests/nox/suites/1_stateful/09_http_handler/test_09_0006_route_error.py +++ b/tests/nox/suites/1_stateful/09_http_handler/test_09_0006_route_error.py @@ -49,11 +49,11 @@ def get_query_final(query_id, node_id=None): @comparison_output( """# error ## page -{"error":{"code":404,"message":"[HTTP-QUERY] Routing error: query QID should be on server XXX, but current server is NODE, which started ... ago)"}} +{"error":{"code":404,"message":"Routing error: query QID should be on server XXX, but current server is NODE, which started ... ago)"}} ## kill -{"error":{"code":404,"message":"[HTTP-QUERY] Routing error: query QID should be on server XXX, but current server is NODE, which started ... ago)"}} +{"error":{"code":404,"message":"Routing error: query QID should be on server XXX, but current server is NODE, which started ... ago)"}} ## final -{"error":{"code":404,"message":"[HTTP-QUERY] Routing error: query QID should be on server XXX, but current server is NODE, which started ... ago)"}} +{"error":{"code":404,"message":"Routing error: query QID should be on server XXX, but current server is NODE, which started ... ago)"}} # ok ## page diff --git a/tests/nox/suites/1_stateful/09_http_handler/test_09_0012_token.py b/tests/nox/suites/1_stateful/09_http_handler/test_09_0012_token.py index fdf626e79cf20..c0b45c12d002a 100755 --- a/tests/nox/suites/1_stateful/09_http_handler/test_09_0012_token.py +++ b/tests/nox/suites/1_stateful/09_http_handler/test_09_0012_token.py @@ -170,22 +170,21 @@ def fake_expired_token(ty): ---- do_query('select 2',) 401 {'code': 5100, - 'message': '[AUTH] JWT authentication failed: JWT auth is not configured on ' - 'this server'} + 'message': 'JWT authentication failed: JWT auth is not configured on this ' + 'server'} ---- do_query('select 3',) 401 {'code': 5100, - 'message': '[HTTP-SESSION] Failed to decode token: base64 decode error: ' - 'Invalid padding, token: bend-v1-s-xxx'} + 'message': 'Failed to decode token: base64 decode error: Invalid padding, ' + 'token: bend-v1-s-xxx'} ---- do_query('select 4',) 401 {'code': 5101, - 'message': '[HTTP-SESSION] Authentication failed: session token has expired'} + 'message': 'Authentication failed: session token has expired'} ---- do_query('select 5',) 401 {'code': 5100, - 'message': '[HTTP-SESSION] Authentication error: incorrect token type for ' - 'this endpoint'} + 'message': 'Authentication error: incorrect token type for this endpoint'} ---- do_refresh(1,) 200 ['tokens'] @@ -198,39 +197,35 @@ def fake_expired_token(ty): ---- do_refresh(2,) 401 {'code': 5100, - 'message': '[AUTH] JWT authentication failed: JWT auth is not configured on ' - 'this server'} + 'message': 'JWT authentication failed: JWT auth is not configured on this ' + 'server'} ---- do_refresh(3,) 401 {'code': 5100, 'message': "invalid token type 'x'"} ---- do_refresh(4,) 401 {'code': 5102, - 'message': '[HTTP-SESSION] Authentication failed: refresh token has expired'} + 'message': 'Authentication failed: refresh token has expired'} ---- do_refresh(5,) 401 {'code': 5100, - 'message': '[HTTP-SESSION] Authentication error: incorrect token type for ' - 'this endpoint'} + 'message': 'Authentication error: incorrect token type for this endpoint'} ---- do_refresh(6,) 200 ---- do_logout(0,) 401 {'code': 5100, - 'message': '[HTTP-SESSION] Authentication error: incorrect token type for ' - 'this endpoint'} + 'message': 'Authentication error: incorrect token type for this endpoint'} ---- do_logout(1,) 200 ---- do_query("select 'after logout'",) 401 {'code': 
5103, - 'message': '[HTTP-SESSION] Authentication failed: session token not found in ' - 'database'} + 'message': 'Authentication failed: session token not found in database'} ---- do_refresh('after_logout',) 401 {'code': 5104, - 'message': '[HTTP-SESSION] Authentication failed: refresh token not found in ' - 'database'} + 'message': 'Authentication failed: refresh token not found in database'} """ ) def test_token(): diff --git a/tests/nox/suites/1_stateful/09_http_handler/test_09_0014_query_lifecycle.py b/tests/nox/suites/1_stateful/09_http_handler/test_09_0014_query_lifecycle.py index 3ea94ae35358e..06b768861ef76 100644 --- a/tests/nox/suites/1_stateful/09_http_handler/test_09_0014_query_lifecycle.py +++ b/tests/nox/suites/1_stateful/09_http_handler/test_09_0014_query_lifecycle.py @@ -230,8 +230,8 @@ def test_query_lifecycle_finalized(rows): assert do_hb([resp0]) == (200, {"queries_to_remove": [query_id]}) # Fetch the page 0 result in 404 - assert get_next_page(f"/v1/query/{query_id}/page/0", node_id) == (404, {"error":{"code":404,"message":"[HTTP-QUERY] Invalid page number: requested 0, current page is 2"}}) - assert get_next_page(f"/v1/query/{query_id}/page/{large_page_no}", node_id) == (404, {"error":{"code":404,"message":f"[HTTP-QUERY] Invalid page number: requested {large_page_no}, current page is 2"}}) + assert get_next_page(f"/v1/query/{query_id}/page/0", node_id) == (404, {"error":{"code":404,"message":"Invalid page number: requested 0, current page is 2"}}) + assert get_next_page(f"/v1/query/{query_id}/page/{large_page_no}", node_id) == (404, {"error":{"code":404,"message":f"Invalid page number: requested {large_page_no}, current page is 2"}}) # Finalize the query (CloseReason::Finalized), support retry exp["next_uri"] = None @@ -244,7 +244,7 @@ def test_query_lifecycle_finalized(rows): assert cancel_resp.status_code == 200 # Fetch the page 0 result in 400 - assert get_next_page(f"/v1/query/{query_id}/page/0", node_id) == (400, {"error":{"code":400,"message":f"[HTTP-QUERY] Query {query_id} is closed for finalized"}}) + assert get_next_page(f"/v1/query/{query_id}/page/0", node_id) == (400, {"error":{"code":400,"message":f"Query {query_id} is closed for finalized"}}) assert do_hb([resp0]) == (200, {"queries_to_remove": [query_id]}) # assert get_query_state(query_id, node_id) == (200, exp_state) @@ -283,7 +283,7 @@ def test_query_lifecycle_canceled(): assert cancel_resp.status_code == 200, f"Cancel failed: {cancel_resp.text}" assert cancel_resp.text == "", f"Cancel failed: {cancel_resp.text}" - assert get_next_page(f"/v1/query/{query_id}/page/0", node_id) == (400, {"error":{"code":400,"message":f"[HTTP-QUERY] Query {query_id} is closed for canceled"}}) + assert get_next_page(f"/v1/query/{query_id}/page/0", node_id) == (400, {"error":{"code":400,"message":f"Query {query_id} is closed for canceled"}}) status_code, resp_state = get_query_state(query_id, node_id) assert status_code == 200 assert resp_state.get("state") == "Failed" @@ -342,7 +342,7 @@ def test_query_lifecycle_timeout(rows): node_id = resp.get("node_id") query_id = resp.get("id") uri_page_0 = f"/v1/query/{query_id}/page/0" - assert get_next_page(uri_page_0, node_id) == (400, {"error":{"code":400,"message":f"[HTTP-QUERY] Query {query_id} is closed for timed out"}}) + assert get_next_page(uri_page_0, node_id) == (400, {"error":{"code":400,"message":f"Query {query_id} is closed for timed out"}}) assert do_hb([resp]) == (200, {"queries_to_remove": [query_id]}) diff --git 
a/tests/suites/1_stateful/00_stage/00_0004_copy_with_max_files.result b/tests/suites/1_stateful/00_stage/00_0004_copy_with_max_files.result index 2f33caa8d7f63..6ef9cda03100b 100644 --- a/tests/suites/1_stateful/00_stage/00_0004_copy_with_max_files.result +++ b/tests/suites/1_stateful/00_stage/00_0004_copy_with_max_files.result @@ -33,7 +33,7 @@ copied 0 files with 6 rows, remain 0 files >>>> drop table if exists test_max_files_limit >>>> create table test_max_files_limit (a int, b int) >>>> copy into test_max_files_limit from 'fs:///tmp/00_0004_2/' FILE_FORMAT = (type = CSV) -Error: APIError: QueryFailed: [1001][QUERY-CTX] Commit limit reached: 15,000 files for 'copy into table'. To handle more files, adjust 'CopyOption' with 'max_files='(e.g., 'max_files=10000') and perform several operations until all files are processed. +Error: APIError: QueryFailed: [1001]Commit limit reached: 15,000 files for 'copy into table'. To handle more files, adjust 'CopyOption' with 'max_files='(e.g., 'max_files=10000') and perform several operations until all files are processed. <<<< >>>> copy into test_max_files_limit from 'fs:///tmp/00_0004_2/' FILE_FORMAT = (type = CSV) force=true Error: APIError: QueryFailed: [1001][COPY-PLANNER] Commit limit reached: 15,000 files for 'copy into table'. To handle more files, adjust 'CopyOption' with 'max_files='(e.g., 'max_files=10000') and perform several operations until all files are processed. diff --git a/tests/suites/1_stateful/01_streaming_load/01_0006_streaming_load_parquet.result b/tests/suites/1_stateful/01_streaming_load/01_0006_streaming_load_parquet.result index af42ad6f64917..e50553d2cb9f1 100755 --- a/tests/suites/1_stateful/01_streaming_load/01_0006_streaming_load_parquet.result +++ b/tests/suites/1_stateful/01_streaming_load/01_0006_streaming_load_parquet.result @@ -16,7 +16,7 @@ ok 1 2021-01-01 q2.parquet 426 1 >>>> streaming load: q2.parquet error : + curl -sS -H x-databend-query-id:load-q2 -H 'X-Databend-SQL:insert into streaming_load_parquet(c2,c3) from @_databend_load file_format = (type='\''parquet'\'', missing_field_as=error, null_if=())' -F upload=@/tmp/streaming_load_parquet/q2.parquet -u root: -XPUT http://localhost:8000/v1/streaming_load -{"error":{"code":400,"message":"[HTTP-STREAMING-LOAD] Query execution failed: file q2.parquet missing column `c2`"}} +{"error":{"code":400,"message":"Query execution failed: file q2.parquet missing column `c2`"}} <<<< >>>> select * from streaming_load_parquet; <<<<
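
Usage sketch (not part of the diff above): a module in any crate that already depends on databend-common-tracing can opt into a short log label the same way the files in this change do. The crate path and label below are hypothetical examples, and the exact rendered layout is determined by the text logger.

// src/my_component/mod.rs (hypothetical module)
// Logs from this module are expected to show up with the "[MY-COMPONENT]" label
// in place of the full module path.
databend_common_tracing::register_module_tag!("[MY-COMPONENT]");

use log::info;

pub fn do_work(elapsed_ms: u64) {
    // No hardcoded "[MY-COMPONENT]" prefix in the message; the logger resolves the
    // label registered above from this call site's module path when formatting.
    info!("Work finished, elapsed: {elapsed_ms} ms");
}

The crate's Cargo.toml also needs databend-common-tracing = { workspace = true }, as added for the fuse storage crate above.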