Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 35 additions & 2 deletions src/query/storages/common/table_meta/src/meta/v4/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
use std::io::Cursor;
use std::io::Read;
use std::sync::Arc;
Expand Down Expand Up @@ -143,6 +144,8 @@ impl TableSnapshot {
return Err(ErrorCode::TransactionTimeout(err_msg));
}

ensure_segments_unique(&segments)?;

Ok(Self {
format_version: TableSnapshot::VERSION,
snapshot_id: uuid_from_date_time(snapshot_timestamp_adjusted),
Expand Down Expand Up @@ -244,8 +247,11 @@ impl TableSnapshot {
let compression = MetaCompression::try_from(r.read_scalar::<u8>()?)?;
let snapshot_size: u64 = r.read_scalar::<u64>()?;

read_and_deserialize(&mut r, snapshot_size, &encoding, &compression)
.map_err(|x| x.add_message("fail to deserialize table snapshot"))
let snapshot: TableSnapshot =
read_and_deserialize(&mut r, snapshot_size, &encoding, &compression)
.map_err(|x| x.add_message("fail to deserialize table snapshot"))?;
snapshot.ensure_segments_unique()?;
Ok(snapshot)
}

#[inline]
Expand All @@ -257,11 +263,36 @@ impl TableSnapshot {
pub fn table_statistics_location(&self) -> Option<String> {
self.table_statistics_location.clone()
}

#[inline]
pub fn ensure_segments_unique(&self) -> Result<()> {
    // Thin wrapper: run the module-level uniqueness check over this
    // snapshot's own segment list.
    ensure_segments_unique(self.segments.as_slice())
}
}

/// Verifies that no segment location appears twice in `segments`.
///
/// Returns `ErrorCode::Internal` naming the first duplicated location path;
/// `Ok(())` when all locations are distinct. Uniqueness is judged on the
/// location path (`loc.0`) only.
fn ensure_segments_unique(segments: &[Location]) -> Result<()> {
    // Zero or one segment cannot contain a duplicate; skip the allocation.
    if segments.len() < 2 {
        return Ok(());
    }

    let mut seen = HashSet::with_capacity(segments.len());
    // `insert` returns false on a repeat, so the first duplicate stops the scan.
    match segments.iter().find(|loc| !seen.insert(loc.0.as_str())) {
        Some(dup) => Err(ErrorCode::Internal(format!(
            "duplicate segment location {} detected while constructing snapshot",
            dup.0.as_str()
        ))),
        None => Ok(()),
    }
}

// use the chain of converters, for versions before v3
impl From<v2::TableSnapshot> for TableSnapshot {
fn from(s: v2::TableSnapshot) -> Self {
ensure_segments_unique(&s.segments)
.expect("duplicate segment location found while converting snapshot from v2");
Self {
// NOTE: it is important to let the format_version return from here
// carries the format_version of snapshot being converted.
Expand All @@ -284,6 +315,8 @@ where T: Into<v3::TableSnapshot>
{
fn from(s: T) -> Self {
let s: v3::TableSnapshot = s.into();
ensure_segments_unique(&s.segments)
.expect("duplicate segment location found while converting snapshot from v3");
Self {
// NOTE: it is important to let the format_version return from here
// carries the format_version of snapshot being converted.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ async fn build_update_table_meta_req(
table_meta_timestamps,
table_stats_gen,
)?;
snapshot.ensure_segments_unique()?;

// write snapshot
let dal = fuse_table.get_operator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,7 @@ where F: SnapshotGenerator + Send + Sync + 'static
snapshot,
table_info,
} => {
snapshot.ensure_segments_unique()?;
let location = self
.location_gen
.snapshot_location_from_uuid(&snapshot.snapshot_id, TableSnapshot::VERSION)?;
Expand Down
118 changes: 106 additions & 12 deletions src/query/storages/fuse/src/retry/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;

Expand Down Expand Up @@ -39,6 +40,8 @@ use crate::statistics::merge_statistics;
use crate::statistics::reducers::deduct_statistics;
use crate::FuseTable;

const FUSE_ENGINE: &str = "FUSE";

pub async fn commit_with_backoff(
ctx: Arc<dyn TableContext>,
mut req: UpdateMultiTableMetaReq,
Expand All @@ -47,6 +50,13 @@ pub async fn commit_with_backoff(
let mut backoff = set_backoff(None, None, None);
let mut retries = 0;

// Compute segments diff for all tables before entering the retry loop.
// This diff represents the actual changes made by the transaction (base -> txn_generated),
// and remains constant across all retries.
// Also cache the original snapshots for statistics merging.
let (table_segments_diffs, table_original_snapshots) =
compute_table_segments_diffs(ctx.clone(), &req).await?;

loop {
let ret = catalog
.retryable_update_multi_table_meta(req.clone())
Expand All @@ -63,14 +73,88 @@ pub async fn commit_with_backoff(
};
sleep(duration).await;
retries += 1;
try_rebuild_req(ctx.clone(), &mut req, update_failed_tbls).await?;
try_rebuild_req(
ctx.clone(),
&mut req,
update_failed_tbls,
&table_segments_diffs,
&table_original_snapshots,
)
.await?;
}
}

/// Pre-computes, for every Fuse table touched by `req`, the segments diff
/// between the transaction's base snapshot and the snapshot the transaction
/// generated, plus the generated snapshot itself.
///
/// The diff (base -> txn-generated) represents the logical changes made by
/// the transaction and stays constant across commit retries, so it is
/// computed once before the retry loop. Tables whose engine is not FUSE are
/// skipped. Both returned maps are keyed by table id:
/// - the segments diff, and
/// - the txn-generated snapshot (cached for later statistics merging).
async fn compute_table_segments_diffs(
    ctx: Arc<dyn TableContext>,
    req: &UpdateMultiTableMetaReq,
) -> Result<(
    HashMap<u64, SegmentsDiff>,
    HashMap<u64, Option<Arc<TableSnapshot>>>,
)> {
    let txn_mgr = ctx.txn_mgr();
    let storage_class = ctx.get_settings().get_s3_storage_class()?;
    let mut table_segments_diffs = HashMap::new();
    let mut table_original_snapshots = HashMap::new();

    for (update_table_meta_req, _) in &req.update_table_metas {
        let tid = update_table_meta_req.table_id;
        let engine = update_table_meta_req.new_table_meta.engine.as_str();

        // Only Fuse tables have snapshots/segments; skip everything else.
        // NOTE: use the bare `info!` macro consistently (the file's other
        // call sites use it), rather than mixing in `log::info!`.
        if engine != FUSE_ENGINE {
            info!(
                "Skipping segments diff pre-compute for table {} with engine {}",
                tid,
                engine
            );
            continue;
        }

        // Location of the snapshot the table pointed at when the
        // transaction began.
        let base_snapshot_location = txn_mgr.lock().get_base_snapshot_location(tid);

        // Rebuild a FuseTable from the new table meta so both the base
        // snapshot and the transaction-generated snapshot can be read.
        let new_table = FuseTable::from_table_meta(
            update_table_meta_req.table_id,
            0,
            update_table_meta_req.new_table_meta.clone(),
            storage_class,
        )?;

        let base_snapshot = new_table
            .read_table_snapshot_with_location(base_snapshot_location)
            .await?;
        let new_snapshot = new_table.read_table_snapshot().await?;

        // A missing snapshot (e.g. a freshly created table) contributes an
        // empty segment list.
        let base_segments = base_snapshot
            .as_ref()
            .map(|s| s.segments.as_slice())
            .unwrap_or(&[]);
        let new_segments = new_snapshot
            .as_ref()
            .map(|s| s.segments.as_slice())
            .unwrap_or(&[]);

        info!(
            "Computing segments diff for table {} (base: {} segments, txn: {} segments)",
            tid,
            base_segments.len(),
            new_segments.len()
        );

        let diff = SegmentsDiff::new(base_segments, new_segments);
        table_segments_diffs.insert(tid, diff);
        table_original_snapshots.insert(tid, new_snapshot);
    }

    Ok((table_segments_diffs, table_original_snapshots))
}

async fn try_rebuild_req(
ctx: Arc<dyn TableContext>,
req: &mut UpdateMultiTableMetaReq,
update_failed_tbls: Vec<(u64, u64, TableMeta)>,
table_segments_diffs: &HashMap<u64, SegmentsDiff>,
table_original_snapshots: &HashMap<u64, Option<Arc<TableSnapshot>>>,
) -> Result<()> {
info!(
"try_rebuild_req: update_failed_tbls={:?}",
Expand Down Expand Up @@ -98,26 +182,35 @@ async fn try_rebuild_req(
.iter_mut()
.find(|(meta, _)| meta.table_id == tid)
.unwrap();
let new_table = FuseTable::from_table_meta(
update_table_meta_req.table_id,
0,
update_table_meta_req.new_table_meta.clone(),
storage_class,
)?;
let new_snapshot = new_table.read_table_snapshot().await?;

let base_snapshot_location = txn_mgr.lock().get_base_snapshot_location(tid);
let base_snapshot = new_table
.read_table_snapshot_with_location(base_snapshot_location)
let base_snapshot = latest_table
.read_table_snapshot_with_location(base_snapshot_location.clone())
.await?;

let segments_diff = SegmentsDiff::new(base_snapshot.segments(), new_snapshot.segments());
let Some(merged_segments) = segments_diff.apply(latest_snapshot.segments().to_vec()) else {
// Get the pre-computed segments diff for this table (computed before retry loop)
let segments_diff = table_segments_diffs.get(&tid).ok_or_else(|| {
ErrorCode::Internal(format!("Missing segments diff for table {}", tid))
})?;

let Some(merged_segments) = segments_diff
.clone()
.apply(latest_snapshot.segments().to_vec())
else {
return Err(ErrorCode::UnresolvableConflict(format!(
"Unresolvable conflict detected for table {}",
tid
)));
};

// Read the original transaction-generated snapshot from cache for statistics merging
let new_snapshot = table_original_snapshots
.get(&tid)
.ok_or_else(|| {
ErrorCode::Internal(format!("Missing original snapshot for table {}", tid))
})?
.clone();

let s = merge_statistics(
new_snapshot.summary(),
&latest_snapshot.summary(),
Expand Down Expand Up @@ -214,6 +307,7 @@ async fn try_rebuild_req(
latest_snapshot.table_statistics_location(),
table_meta_timestamps,
)?;
merged_snapshot.ensure_segments_unique()?;

// write snapshot
let dal = latest_table.get_operator();
Expand Down
1 change: 1 addition & 0 deletions src/query/storages/fuse/src/retry/diff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::collections::HashSet;

use databend_storages_common_table_meta::meta::Location;

#[derive(Clone)]
pub struct SegmentsDiff {
appended: Vec<Location>,
replaced: HashMap<Location, Vec<Location>>,
Expand Down
Loading