diff --git a/crates/integrations/datafusion/src/extensions.rs b/crates/integrations/datafusion/src/extensions.rs index a161afea49..76e26af55c 100644 --- a/crates/integrations/datafusion/src/extensions.rs +++ b/crates/integrations/datafusion/src/extensions.rs @@ -4,6 +4,7 @@ use std::sync; use datafusion::arrow::datatypes; +use datafusion::error; use iceberg::table; impl crate::IcebergTableProvider { @@ -28,4 +29,16 @@ impl crate::IcebergTableProvider { catalog: Some(catalog), } } + + /// Returns table to be used in operations. + /// If the catalog implementer is provided, loads a fresh table from it, otherwise clones the inner value. + pub(crate) async fn table_to_use(&self) -> error::Result { + match self.catalog { + Some(ref catalog) => catalog + .load_table(self.table.identifier()) + .await + .map_err(|e| error::DataFusionError::Internal(e.to_string())), + None => Ok(self.table.clone()), + } + } } diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index ffe8e5df66..c5937d7fc4 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -154,7 +154,7 @@ impl TableProvider for IcebergTableProvider { _limit: Option, ) -> DFResult> { Ok(Arc::new(IcebergTableScan::new( - self.table.clone(), + self.table_to_use().await?, // This automatically updates available snapshots when starting a scan. self.snapshot_id, self.schema.clone(), projection,