Commit 489eb9c

feat(query): add table statistics admin api (#18967)
1 parent 4f221a0

5 files changed: +196 -120 lines changed


src/query/service/src/servers/admin/admin_service.rs

Lines changed: 4 additions & 0 deletions

@@ -110,6 +110,10 @@ impl AdminService {
             .at(
                 "/v1/tenants/:tenant/user_functions",
                 get(super::v1::user_functions::user_functions),
+            )
+            .at(
+                "/v1/tenants/:tenant/databases/:database/tables/:table/stats",
+                get(super::v1::table_statistics::get_table_stats_handler),
             );
     }

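
With this route in place, the per-column statistics of a single table can be fetched over the query node's admin API. The sketch below is not part of the commit: it assumes the admin API is reachable at 127.0.0.1:8080 (the address is configurable), uses the reqwest (with its json feature), tokio, and serde_json crates, and the tenant, database, and table names are placeholders.

// Hypothetical client for the new endpoint; host, port, tenant, database,
// and table below are placeholders, not values taken from this commit.
use serde_json::Value;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let url =
        "http://127.0.0.1:8080/v1/tenants/test_tenant/databases/default/tables/t/stats";
    // The handler returns Vec<TableColumnStatistics> wrapped in poem's Json,
    // so the body parses as a JSON array of per-column statistics objects.
    let stats: Value = reqwest::get(url).await?.json().await?;
    println!("{}", serde_json::to_string_pretty(&stats)?);
    Ok(())
}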

src/query/service/src/servers/admin/v1/mod.rs

Lines changed: 1 addition & 0 deletions

@@ -20,6 +20,7 @@ pub mod query_dump;
 pub mod settings;
 pub mod stream_status;
 pub mod system;
+pub mod table_statistics;
 pub mod tenant_table_stats;
 pub mod tenant_tables;
 pub mod user_functions;
src/query/service/src/servers/admin/v1/table_statistics.rs (new file)

Lines changed: 63 additions & 0 deletions

@@ -0,0 +1,63 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use databend_common_catalog::catalog::CatalogManager;
+use databend_common_catalog::session_type::SessionType;
+use databend_common_catalog::table_context::TableContext;
+use databend_common_exception::Result;
+use databend_common_meta_app::tenant::Tenant;
+use databend_common_storages_system::StatisticsTable;
+use databend_common_storages_system::TableColumnStatistics;
+use poem::web::Json;
+use poem::web::Path;
+use poem::IntoResponse;
+
+use crate::sessions::SessionManager;
+
+#[poem::handler]
+#[async_backtrace::framed]
+pub async fn get_table_stats_handler(
+    Path((tenant, database, table)): Path<(String, String, String)>,
+) -> poem::Result<impl IntoResponse> {
+    match dump_columns(&tenant, &database, &table).await {
+        Ok(columns) => Ok(Json(columns)),
+        Err(error) => Err(poem::error::InternalServerError(error)),
+    }
+}
+
+async fn dump_columns(
+    tenant: &str,
+    database: &str,
+    table: &str,
+) -> Result<Vec<TableColumnStatistics>> {
+    let dummy_session = SessionManager::instance()
+        .create_session(SessionType::Dummy)
+        .await?;
+
+    let session = SessionManager::instance().register_session(dummy_session)?;
+
+    let ctx: Arc<dyn TableContext> = session
+        .create_query_context(&databend_common_version::BUILD_INFO)
+        .await?;
+
+    let catalog = CatalogManager::instance().get_default_catalog(Default::default())?;
+
+    let table = catalog
+        .get_table(&Tenant::new_literal(tenant), database, table)
+        .await?;
+
+    StatisticsTable::dump_table_columns(&ctx, &catalog, database, &table).await
+}
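
The handler above serializes Vec<TableColumnStatistics> with poem's Json wrapper, so clients receive one JSON object per column. A rough client-side mirror for deserializing that payload is sketched below; the field names follow the struct in statistics_table.rs further down, while the concrete types are assumptions rather than values taken from this diff.

// Hypothetical mirror of TableColumnStatistics for client-side parsing only.
// Field names match the struct in statistics_table.rs; the numeric types are
// guesses (Option<u64> tolerates both plain integers and nulls).
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct ColumnStats {
    database_name: String,
    table_name: String,
    column_name: String,
    stats_row_count: Option<u64>,
    actual_row_count: Option<u64>,
    distinct_count: Option<u64>,
    null_count: Option<u64>,
    min: Option<String>,
    max: Option<String>,
    avg_size: Option<u64>,
    histogram: String,
}

fn parse_stats(body: &str) -> serde_json::Result<Vec<ColumnStats>> {
    serde_json::from_str(body)
}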

src/query/storages/system/src/lib.rs

Lines changed: 1 addition & 0 deletions

@@ -118,6 +118,7 @@ pub use roles_table::RolesTable;
 pub use settings_table::SettingsTable;
 pub use stages_table::StagesTable;
 pub use statistics_table::StatisticsTable;
+pub use statistics_table::TableColumnStatistics;
 pub use streams_table::FullStreamsTable;
 pub use streams_table::TerseStreamsTable;
 pub use table::SyncOneBlockSystemTable;

src/query/storages/system/src/statistics_table.rs

Lines changed: 127 additions & 120 deletions

@@ -51,8 +51,8 @@ pub struct StatisticsTable {
     table_info: TableInfo,
 }

-#[derive(Default)]
-struct TableColumnStatistics {
+#[derive(Default, serde::Serialize, serde::Deserialize)]
+pub struct TableColumnStatistics {
     database_name: String,
     table_name: String,
     column_name: String,

@@ -119,7 +119,7 @@ impl StatisticsTable {
     }

     #[async_backtrace::framed]
-    async fn dump_table_columns(
+    async fn dump_tables_columns(
         &self,
         ctx: Arc<dyn TableContext>,
         push_downs: Option<PushDownInfo>,

@@ -129,139 +129,146 @@
         let mut rows: Vec<TableColumnStatistics> = vec![];
         for (database, tables) in database_and_tables {
             for table in tables {
-                match table.engine() {
-                    VIEW_ENGINE => {
-                        let fields = if let Some(query) = table.options().get(QUERY) {
-                            let mut planner = Planner::new(ctx.clone());
-                            match planner.plan_sql(query).await {
-                                Ok((plan, _)) => {
-                                    infer_table_schema(&plan.schema())?.fields().clone()
-                                }
-                                Err(e) => {
-                                    // If VIEW SELECT QUERY plan err, should return empty. not destroy the query.
-                                    warn!(
-                                        "failed to get columns for {}: {}",
-                                        table.get_table_info().desc,
-                                        e
-                                    );
-                                    vec![]
-                                }
-                            }
-                        } else {
+                rows.extend(Self::dump_table_columns(&ctx, catalog, &database, &table).await?);
+            }
+        }
+        Ok(rows)
+    }
+
+    pub async fn dump_table_columns(
+        ctx: &Arc<dyn TableContext>,
+        catalog: &Arc<dyn Catalog>,
+        database: &str,
+        table: &Arc<dyn Table>,
+    ) -> Result<Vec<TableColumnStatistics>> {
+        let mut columns = vec![];
+        match table.engine() {
+            VIEW_ENGINE => {
+                let fields = if let Some(query) = table.options().get(QUERY) {
+                    let mut planner = Planner::new(ctx.clone());
+                    match planner.plan_sql(query).await {
+                        Ok((plan, _)) => infer_table_schema(&plan.schema())?.fields().clone(),
+                        Err(e) => {
+                            // If VIEW SELECT QUERY plan err, should return empty. not destroy the query.
+                            warn!(
+                                "failed to get columns for {}: {}",
+                                table.get_table_info().desc,
+                                e
+                            );
                             vec![]
-                        };
-                        for field in fields {
-                            rows.push(TableColumnStatistics {
-                                database_name: database.clone(),
-                                table_name: table.name().into(),
-                                column_name: field.name,
-                                ..Default::default()
-                            })
                         }
                     }
-                    _ => {
-                        let schema = table.schema();
-                        // attach table should not collect statistics, source table column already collect them.
-                        let columns_statistics = if !FuseTable::is_table_attached(
-                            &table.get_table_info().meta.options,
+                } else {
+                    vec![]
+                };
+                for field in fields {
+                    columns.push(TableColumnStatistics {
+                        database_name: database.to_string(),
+                        table_name: table.name().into(),
+                        column_name: field.name,
+                        ..Default::default()
+                    })
+                }
+            }
+            _ => {
+                let schema = table.schema();
+                // attach table should not collect statistics, source table column already collect them.
+                let columns_statistics =
+                    if !FuseTable::is_table_attached(&table.get_table_info().meta.options) {
+                        table
+                            .column_statistics_provider(ctx.clone())
+                            .await
+                            .unwrap_or_else(|e| {
+                                let msg = format!(
+                                    "Collect {}.{}.{} column statistics with error: {}",
+                                    catalog.name(),
+                                    database,
+                                    table.name(),
+                                    e
+                                );
+                                warn!("{}", msg);
+                                ctx.push_warning(msg);
+                                Box::new(DummyColumnStatisticsProvider)
+                            })
+                    } else {
+                        Box::new(DummyColumnStatisticsProvider)
+                    };
+                let stats_row_count = columns_statistics.stats_num_rows();
+                let actual_row_count = columns_statistics.num_rows();
+                for field in schema.fields() {
+                    let column_id = field.column_id;
+                    let column_statistics = columns_statistics.column_statistics(column_id);
+                    let his_info = columns_statistics.histogram(column_id);
+                    let histogram = if let Some(his_info) = his_info {
+                        let mut his_infos = vec![];
+                        for (i, bucket) in his_info.buckets.iter().enumerate() {
+                            let min = bucket.lower_bound().to_string()?;
+                            let max = bucket.upper_bound().to_string()?;
+                            let ndv = bucket.num_distinct();
+                            let count = bucket.num_values();
+                            let his_info = format!(
+                                "[bucket id: {:?}, min: {:?}, max: {:?}, ndv: {:?}, count: {:?}]",
+                                i, min, max, ndv, count
+                            );
+                            his_infos.push(his_info);
+                        }
+                        his_infos.join(", ")
+                    } else {
+                        "".to_string()
+                    };
+                    columns.push(TableColumnStatistics {
+                        database_name: database.to_string(),
+                        table_name: table.name().into(),
+                        column_name: field.name().clone(),
+                        stats_row_count,
+                        actual_row_count,
+                        distinct_count: column_statistics.and_then(|v| v.ndv),
+                        null_count: column_statistics.map(|v| v.null_count),
+                        min: column_statistics
+                            .and_then(|s| s.min.clone())
+                            .map(|v| v.to_string().unwrap()),
+                        max: column_statistics
+                            .and_then(|s| s.max.clone())
+                            .map(|v| v.to_string().unwrap()),
+                        avg_size: columns_statistics.average_size(column_id),
+                        histogram,
+                    })
+                }
+                // add virtual column statistics
+                let table_info = table.get_table_info();
+                if let Some(virtual_schema) = &table_info.meta.virtual_schema {
+                    for virtual_field in virtual_schema.fields() {
+                        if let (Ok(source_field), Some(column_statistics)) = (
+                            schema.field_of_column_id(virtual_field.source_column_id),
+                            columns_statistics.column_statistics(virtual_field.column_id),
                         ) {
-                            table
-                                .column_statistics_provider(ctx.clone())
-                                .await
-                                .unwrap_or_else(|e| {
-                                    let msg = format!(
-                                        "Collect {}.{}.{} column statistics with error: {}",
-                                        catalog.name(),
-                                        database,
-                                        table.name(),
-                                        e
-                                    );
-                                    warn!("{}", msg);
-                                    ctx.push_warning(msg);
-                                    Box::new(DummyColumnStatisticsProvider)
-                                })
-                        } else {
-                            Box::new(DummyColumnStatisticsProvider)
-                        };
-                        let stats_row_count = columns_statistics.stats_num_rows();
-                        let actual_row_count = columns_statistics.num_rows();
-                        for field in schema.fields() {
-                            let column_id = field.column_id;
-                            let column_statistics = columns_statistics.column_statistics(column_id);
-                            let his_info = columns_statistics.histogram(column_id);
-                            let histogram = if let Some(his_info) = his_info {
-                                let mut his_infos = vec![];
-                                for (i, bucket) in his_info.buckets.iter().enumerate() {
-                                    let min = bucket.lower_bound().to_string()?;
-                                    let max = bucket.upper_bound().to_string()?;
-                                    let ndv = bucket.num_distinct();
-                                    let count = bucket.num_values();
-                                    let his_info = format!(
-                                        "[bucket id: {:?}, min: {:?}, max: {:?}, ndv: {:?}, count: {:?}]",
-                                        i, min, max, ndv, count
-                                    );
-                                    his_infos.push(his_info);
-                                }
-                                his_infos.join(", ")
-                            } else {
-                                "".to_string()
-                            };
-                            rows.push(TableColumnStatistics {
-                                database_name: database.clone(),
+                            let column_name =
+                                format!("{}{}", source_field.name, virtual_field.name);
+                            columns.push(TableColumnStatistics {
+                                database_name: database.to_string(),
                                 table_name: table.name().into(),
-                                column_name: field.name().clone(),
+                                column_name,
                                 stats_row_count,
                                 actual_row_count,
-                                distinct_count: column_statistics.and_then(|v| v.ndv),
-                                null_count: column_statistics.map(|v| v.null_count),
+                                distinct_count: column_statistics.ndv,
+                                null_count: Some(column_statistics.null_count),
                                 min: column_statistics
-                                    .and_then(|s| s.min.clone())
+                                    .min
+                                    .clone()
                                     .map(|v| v.to_string().unwrap()),
                                 max: column_statistics
-                                    .and_then(|s| s.max.clone())
+                                    .max
+                                    .clone()
                                     .map(|v| v.to_string().unwrap()),
-                                avg_size: columns_statistics.average_size(column_id),
-                                histogram,
+                                avg_size: columns_statistics.average_size(virtual_field.column_id),
+                                histogram: "".to_string(),
                             })
                         }
-                        // add virtual column statistics
-                        let table_info = table.get_table_info();
-                        if let Some(virtual_schema) = &table_info.meta.virtual_schema {
-                            for virtual_field in virtual_schema.fields() {
-                                if let (Ok(source_field), Some(column_statistics)) = (
-                                    schema.field_of_column_id(virtual_field.source_column_id),
-                                    columns_statistics.column_statistics(virtual_field.column_id),
-                                ) {
-                                    let column_name =
-                                        format!("{}{}", source_field.name, virtual_field.name);
-                                    rows.push(TableColumnStatistics {
-                                        database_name: database.clone(),
-                                        table_name: table.name().into(),
-                                        column_name,
-                                        stats_row_count,
-                                        actual_row_count,
-                                        distinct_count: column_statistics.ndv,
-                                        null_count: Some(column_statistics.null_count),
-                                        min: column_statistics
-                                            .min
-                                            .clone()
-                                            .map(|v| v.to_string().unwrap()),
-                                        max: column_statistics
-                                            .max
-                                            .clone()
-                                            .map(|v| v.to_string().unwrap()),
-                                        avg_size: columns_statistics
-                                            .average_size(virtual_field.column_id),
-                                        histogram: "".to_string(),
-                                    })
-                                }
-                            }
-                        }
                     }
                 }
             }
         }
-        Ok(rows)
+        Ok(columns)
     }
 }

@@ -287,7 +294,7 @@ impl AsyncSystemTable for StatisticsTable {
             ctx.session_state()?,
         )
         .await?;
-        let rows = self.dump_table_columns(ctx, push_downs, &catalog).await?;
+        let rows = self.dump_tables_columns(ctx, push_downs, &catalog).await?;

         let mut names = Vec::with_capacity(rows.len());
         let mut databases = Vec::with_capacity(rows.len());
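
As a side note on the refactored dump_table_columns above: the histogram column is built by formatting each bucket with the format string shown in the diff and joining the pieces with ", ". A standalone illustration of that output, using made-up bucket values, follows.

// Illustration of the histogram string built in dump_table_columns.
// The bucket values here are invented for the example.
fn main() {
    let buckets = [(0usize, "1", "50", 40u64, 500u64), (1, "51", "100", 42, 500)];
    let his_infos: Vec<String> = buckets
        .iter()
        .map(|(i, min, max, ndv, count)| {
            format!(
                "[bucket id: {:?}, min: {:?}, max: {:?}, ndv: {:?}, count: {:?}]",
                i, min, max, ndv, count
            )
        })
        .collect();
    // Prints, e.g.:
    // [bucket id: 0, min: "1", max: "50", ndv: 40, count: 500], [bucket id: 1, ...]
    println!("{}", his_infos.join(", "));
}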
