Skip to content

Commit d41c3e0

Browse files
authored
Fix: Disable WAP support by default for Spark and Iceberg (#5415)
1 parent 50aee2c commit d41c3e0

File tree

5 files changed

+38
-15
lines changed

5 files changed

+38
-15
lines changed

sqlmesh/core/config/connection.py

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1755,6 +1755,7 @@ class SparkConnectionConfig(ConnectionConfig):
17551755
config_dir: t.Optional[str] = None
17561756
catalog: t.Optional[str] = None
17571757
config: t.Dict[str, t.Any] = {}
1758+
wap_enabled: bool = False
17581759

17591760
concurrent_tasks: int = 4
17601761
register_comments: bool = True
@@ -1801,6 +1802,10 @@ def _static_connection_kwargs(self) -> t.Dict[str, t.Any]:
18011802
.getOrCreate(),
18021803
}
18031804

1805+
@property
1806+
def _extra_engine_config(self) -> t.Dict[str, t.Any]:
1807+
return {"wap_enabled": self.wap_enabled}
1808+
18041809

18051810
class TrinoAuthenticationMethod(str, Enum):
18061811
NO_AUTH = "no-auth"

sqlmesh/core/engine_adapter/base.py

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -2357,6 +2357,11 @@ def fetch_pyspark_df(
23572357
"""Fetches a PySpark DataFrame from the cursor"""
23582358
raise NotImplementedError(f"Engine does not support PySpark DataFrames: {type(self)}")
23592359

2360+
@property
2361+
def wap_enabled(self) -> bool:
2362+
"""Returns whether WAP is enabled for this engine."""
2363+
return self._extra_config.get("wap_enabled", False)
2364+
23602365
def wap_supported(self, table_name: TableName) -> bool:
23612366
"""Returns whether WAP for the target table is supported."""
23622367
return False

sqlmesh/core/engine_adapter/spark.py

Lines changed: 8 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -457,12 +457,14 @@ def _create_table(
457457
if wap_id.startswith(f"{self.BRANCH_PREFIX}{self.WAP_PREFIX}"):
458458
table_name.set("this", table_name.this.this)
459459

460-
wap_supported = (
461-
kwargs.get("storage_format") or ""
462-
).lower() == "iceberg" or self.wap_supported(table_name)
463-
do_dummy_insert = (
464-
False if not wap_supported or not exists else not self.table_exists(table_name)
465-
)
460+
do_dummy_insert = False
461+
if self.wap_enabled:
462+
wap_supported = (
463+
kwargs.get("storage_format") or ""
464+
).lower() == "iceberg" or self.wap_supported(table_name)
465+
do_dummy_insert = (
466+
False if not wap_supported or not exists else not self.table_exists(table_name)
467+
)
466468
super()._create_table(
467469
table_name_or_schema,
468470
expression,

sqlmesh/core/snapshot/evaluator.py

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -793,6 +793,7 @@ def _evaluate_snapshot(
793793
if (
794794
snapshot.is_materialized
795795
and target_table_exists
796+
and adapter.wap_enabled
796797
and (model.wap_supported or adapter.wap_supported(target_table_name))
797798
):
798799
wap_id = random_id()[0:8]

tests/core/engine_adapter/test_spark.py

Lines changed: 19 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -66,14 +66,15 @@ def test_create_table_properties(make_mocked_engine_adapter: t.Callable):
6666
)
6767

6868

69+
@pytest.mark.parametrize("wap_enabled", [True, False])
6970
def test_replace_query_table_properties_not_exists(
70-
mocker: MockerFixture, make_mocked_engine_adapter: t.Callable
71+
mocker: MockerFixture, make_mocked_engine_adapter: t.Callable, wap_enabled: bool
7172
):
7273
mocker.patch(
7374
"sqlmesh.core.engine_adapter.spark.SparkEngineAdapter.table_exists",
7475
return_value=False,
7576
)
76-
adapter = make_mocked_engine_adapter(SparkEngineAdapter)
77+
adapter = make_mocked_engine_adapter(SparkEngineAdapter, wap_enabled=wap_enabled)
7778

7879
columns_to_types = {
7980
"cola": exp.DataType.build("INT"),
@@ -89,10 +90,13 @@ def test_replace_query_table_properties_not_exists(
8990
table_properties={"a": exp.convert(1)},
9091
)
9192

92-
assert to_sql_calls(adapter) == [
93+
expected_sql_calls = [
9394
"CREATE TABLE IF NOT EXISTS `test_table` USING ICEBERG PARTITIONED BY (`colb`) TBLPROPERTIES ('a'=1) AS SELECT CAST(`cola` AS INT) AS `cola`, CAST(`colb` AS STRING) AS `colb`, CAST(`colc` AS STRING) AS `colc` FROM (SELECT 1 AS `cola`, '2' AS `colb`, '3' AS `colc`) AS `_subquery`",
94-
"INSERT INTO `test_table` SELECT * FROM `test_table`",
9595
]
96+
if wap_enabled:
97+
expected_sql_calls.append("INSERT INTO `test_table` SELECT * FROM `test_table`")
98+
99+
assert to_sql_calls(adapter) == expected_sql_calls
96100

97101

98102
def test_replace_query_table_properties_exists(
@@ -825,13 +829,16 @@ def test_wap_publish(make_mocked_engine_adapter: t.Callable, mocker: MockerFixtu
825829
)
826830

827831

828-
def test_create_table_iceberg(mocker: MockerFixture, make_mocked_engine_adapter: t.Callable):
832+
@pytest.mark.parametrize("wap_enabled", [True, False])
833+
def test_create_table_iceberg(
834+
mocker: MockerFixture, make_mocked_engine_adapter: t.Callable, wap_enabled: bool
835+
):
829836
mocker.patch(
830837
"sqlmesh.core.engine_adapter.spark.SparkEngineAdapter.table_exists",
831838
return_value=False,
832839
)
833840

834-
adapter = make_mocked_engine_adapter(SparkEngineAdapter)
841+
adapter = make_mocked_engine_adapter(SparkEngineAdapter, wap_enabled=wap_enabled)
835842

836843
columns_to_types = {
837844
"cola": exp.DataType.build("INT"),
@@ -846,10 +853,13 @@ def test_create_table_iceberg(mocker: MockerFixture, make_mocked_engine_adapter:
846853
storage_format="ICEBERG",
847854
)
848855

849-
assert to_sql_calls(adapter) == [
856+
expected_sql_calls = [
850857
"CREATE TABLE IF NOT EXISTS `test_table` (`cola` INT, `colb` STRING, `colc` STRING) USING ICEBERG PARTITIONED BY (`colb`)",
851-
"INSERT INTO `test_table` SELECT * FROM `test_table`",
852858
]
859+
if wap_enabled:
860+
expected_sql_calls.append("INSERT INTO `test_table` SELECT * FROM `test_table`")
861+
862+
assert to_sql_calls(adapter) == expected_sql_calls
853863

854864

855865
def test_comments_hive(mocker: MockerFixture, make_mocked_engine_adapter: t.Callable):
@@ -973,7 +983,7 @@ def test_create_table_with_wap(make_mocked_engine_adapter: t.Callable, mocker: M
973983
"sqlmesh.core.engine_adapter.spark.SparkEngineAdapter.table_exists",
974984
return_value=False,
975985
)
976-
adapter = make_mocked_engine_adapter(SparkEngineAdapter)
986+
adapter = make_mocked_engine_adapter(SparkEngineAdapter, wap_enabled=True)
977987

978988
adapter.create_table(
979989
"catalog.schema.table.branch_wap_12345",

0 commit comments

Comments (0)