From 22d9c63fdff9cefe40d2982ce4f21acfd2edfa87 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Mon, 30 Sep 2024 14:57:43 +0200 Subject: [PATCH 01/24] Add test with concurrent delta append --- tests/integration/test_backends.py | 34 ++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/tests/integration/test_backends.py b/tests/integration/test_backends.py index 6930aec2..ee35c72a 100644 --- a/tests/integration/test_backends.py +++ b/tests/integration/test_backends.py @@ -1,6 +1,7 @@ import pytest from databricks.labs.blueprint.commands import CommandExecutor from databricks.labs.blueprint.installation import Installation +from databricks.labs.blueprint.parallel import Threads from databricks.labs.blueprint.wheels import ProductInfo, WheelsV2 from databricks.labs.lsql import Row @@ -186,3 +187,36 @@ def test_runtime_backend_use_statements(ws): """ result = commands.run(permission_denied_query) assert result == "PASSED" + + +def test_runtime_backend_handles_concurrent_append(sql_backend, make_schema, make_random) -> None: + + def wait_until_10s_rollover() -> None: + import math + import time + + # First figure out how long until rollover. + now = time.clock_gettime_ns(time.CLOCK_REALTIME) + target = math.ceil(now // 1e9 / 10) * 10 * 1e9 + # Sleep until just before the rollover. + nanos_until_almost_target = ((target - now) - 1e7) + if 0 < nanos_until_almost_target: + time.sleep(nanos_until_almost_target / 1e9) + # Busy-wait until the rollover occurs. 
+ while time.clock_gettime_ns(time.CLOCK_REALTIME) < target: + pass + + schema = make_schema(name=f"lsql_{make_random(8)}") + table_full_name = f"{schema.full_name}.concurrent_append" + sql_backend.execute(f"CREATE TABLE IF NOT EXISTS {table_full_name} (x int, y float)") + sql_backend.execute( + f"INSERT INTO {table_full_name} BY NAME " + "SELECT r.id AS x, random() AS y FROM range(100000000) r" + ) + + def task() -> None: + wait_until_10s_rollover() + sql_backend.execute(f"UPDATE {table_full_name} SET y = y * 2 WHERE (x % 2 = 0)") + + + Threads.strict("concurrent appends", [task, task]) \ No newline at end of file From 1f4bb5a276c80064d74781d7a3c63bfb1f415156 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Mon, 30 Sep 2024 15:07:36 +0200 Subject: [PATCH 02/24] Move roll over method out --- tests/integration/test_backends.py | 42 ++++++++++++++++++------------ 1 file changed, 26 insertions(+), 16 deletions(-) diff --git a/tests/integration/test_backends.py b/tests/integration/test_backends.py index ee35c72a..38845e10 100644 --- a/tests/integration/test_backends.py +++ b/tests/integration/test_backends.py @@ -1,3 +1,6 @@ +import math +import time + import pytest from databricks.labs.blueprint.commands import CommandExecutor from databricks.labs.blueprint.installation import Installation @@ -189,23 +192,30 @@ def test_runtime_backend_use_statements(ws): assert result == "PASSED" -def test_runtime_backend_handles_concurrent_append(sql_backend, make_schema, make_random) -> None: +def wait_until_seconds_rollover(*, rollover_seconds: int = 10) -> None: + """Wait until the next rollover. + + Useful to align concurrent writes. + + Args: + rollover_seconds (int) : The multiple of seconds to wait until the next rollover. + """ + nanoseconds = 1e9 + microseconds = 1e6 - def wait_until_10s_rollover() -> None: - import math - import time - - # First figure out how long until rollover. 
- now = time.clock_gettime_ns(time.CLOCK_REALTIME) - target = math.ceil(now // 1e9 / 10) * 10 * 1e9 - # Sleep until just before the rollover. - nanos_until_almost_target = ((target - now) - 1e7) - if 0 < nanos_until_almost_target: - time.sleep(nanos_until_almost_target / 1e9) - # Busy-wait until the rollover occurs. - while time.clock_gettime_ns(time.CLOCK_REALTIME) < target: - pass + now = time.clock_gettime_ns(time.CLOCK_REALTIME) + target = math.ceil(now / nanoseconds // rollover_seconds) * nanoseconds * rollover_seconds + # To hit the rollover more accurate, first sleep until almost target + nanoseconds_until_almost_target = ((target - now) - microseconds) + time.sleep(max(nanoseconds_until_almost_target / 1e9, 0)) + + # Then busy-wait until the rollover occurs + while time.clock_gettime_ns(time.CLOCK_REALTIME) < target: + pass + + +def test_runtime_backend_handles_concurrent_append(sql_backend, make_schema, make_random) -> None: schema = make_schema(name=f"lsql_{make_random(8)}") table_full_name = f"{schema.full_name}.concurrent_append" sql_backend.execute(f"CREATE TABLE IF NOT EXISTS {table_full_name} (x int, y float)") @@ -215,7 +225,7 @@ def wait_until_10s_rollover() -> None: ) def task() -> None: - wait_until_10s_rollover() + wait_until_seconds_rollover() sql_backend.execute(f"UPDATE {table_full_name} SET y = y * 2 WHERE (x % 2 = 0)") From 17459117dd5268e0ccab30b2a3acd25d13c3e723 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Mon, 30 Sep 2024 15:08:18 +0200 Subject: [PATCH 03/24] Rename table --- tests/integration/test_backends.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/integration/test_backends.py b/tests/integration/test_backends.py index 38845e10..fe8aae8c 100644 --- a/tests/integration/test_backends.py +++ b/tests/integration/test_backends.py @@ -224,9 +224,9 @@ def test_runtime_backend_handles_concurrent_append(sql_backend, make_schema, mak "SELECT r.id AS x, random() AS y FROM range(100000000) r" ) - def task() 
-> None: + def update_table() -> None: wait_until_seconds_rollover() sql_backend.execute(f"UPDATE {table_full_name} SET y = y * 2 WHERE (x % 2 = 0)") - Threads.strict("concurrent appends", [task, task]) \ No newline at end of file + Threads.strict("concurrent appends", [update_table, update_table]) \ No newline at end of file From 92394eafb8ae70113cebbacc1ece7c2a338ac836 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Mon, 30 Sep 2024 15:11:26 +0200 Subject: [PATCH 04/24] Assert the right way --- tests/integration/test_backends.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/tests/integration/test_backends.py b/tests/integration/test_backends.py index fe8aae8c..2cb22ba5 100644 --- a/tests/integration/test_backends.py +++ b/tests/integration/test_backends.py @@ -6,6 +6,7 @@ from databricks.labs.blueprint.installation import Installation from databricks.labs.blueprint.parallel import Threads from databricks.labs.blueprint.wheels import ProductInfo, WheelsV2 +from databricks.sdk.errors import BadRequest from databricks.labs.lsql import Row from databricks.labs.lsql.backends import SqlBackend, StatementExecutionBackend @@ -228,5 +229,10 @@ def update_table() -> None: wait_until_seconds_rollover() sql_backend.execute(f"UPDATE {table_full_name} SET y = y * 2 WHERE (x % 2 = 0)") - - Threads.strict("concurrent appends", [update_table, update_table]) \ No newline at end of file + try: + Threads.strict("concurrent appends", [update_table, update_table]) + except BadRequest as e: + if "[DELTA_CONCURRENT_APPEND]" in str(e): + assert False, str(e) + else: + raise # Raise in case of unexpected error \ No newline at end of file From 10102366a3ffc141dd8a993bebdf5e6dacc34ad9 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Mon, 30 Sep 2024 15:12:49 +0200 Subject: [PATCH 05/24] Use test table --- tests/integration/test_backends.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/tests/integration/test_backends.py 
b/tests/integration/test_backends.py index 2cb22ba5..1e42d511 100644 --- a/tests/integration/test_backends.py +++ b/tests/integration/test_backends.py @@ -216,18 +216,17 @@ def wait_until_seconds_rollover(*, rollover_seconds: int = 10) -> None: pass -def test_runtime_backend_handles_concurrent_append(sql_backend, make_schema, make_random) -> None: - schema = make_schema(name=f"lsql_{make_random(8)}") - table_full_name = f"{schema.full_name}.concurrent_append" - sql_backend.execute(f"CREATE TABLE IF NOT EXISTS {table_full_name} (x int, y float)") +def test_runtime_backend_handles_concurrent_append(sql_backend, make_random) -> None: + table_name = f"lsql_test_{make_random(8)}" + sql_backend.execute(f"CREATE TABLE IF NOT EXISTS {table_name} (x int, y float)") sql_backend.execute( - f"INSERT INTO {table_full_name} BY NAME " + f"INSERT INTO {table_name} BY NAME " "SELECT r.id AS x, random() AS y FROM range(100000000) r" ) def update_table() -> None: wait_until_seconds_rollover() - sql_backend.execute(f"UPDATE {table_full_name} SET y = y * 2 WHERE (x % 2 = 0)") + sql_backend.execute(f"UPDATE {table_name} SET y = y * 2 WHERE (x % 2 = 0)") try: Threads.strict("concurrent appends", [update_table, update_table]) From 65dca14ad20a40cce6666c1ca09d691af1f4d564 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Mon, 30 Sep 2024 15:14:25 +0200 Subject: [PATCH 06/24] Add comment explaining rollover --- tests/integration/test_backends.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_backends.py b/tests/integration/test_backends.py index 1e42d511..cf6ae3b3 100644 --- a/tests/integration/test_backends.py +++ b/tests/integration/test_backends.py @@ -225,7 +225,7 @@ def test_runtime_backend_handles_concurrent_append(sql_backend, make_random) -> ) def update_table() -> None: - wait_until_seconds_rollover() + wait_until_seconds_rollover() # Update the table at the same time sql_backend.execute(f"UPDATE {table_name} SET y = y * 2 WHERE (x % 2 = 
0)") try: From bffd2d52392135a580ca6891a9609ccf5f57c8c9 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Mon, 30 Sep 2024 15:36:01 +0200 Subject: [PATCH 07/24] Retry concurrent append --- src/databricks/labs/lsql/core.py | 10 +++++++++- tests/integration/test_backends.py | 9 +++------ 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/src/databricks/labs/lsql/core.py b/src/databricks/labs/lsql/core.py index 856315b1..27799038 100644 --- a/src/databricks/labs/lsql/core.py +++ b/src/databricks/labs/lsql/core.py @@ -13,7 +13,8 @@ import requests import sqlglot from databricks.sdk import WorkspaceClient, errors -from databricks.sdk.errors import DataLoss, NotFound +from databricks.sdk.errors import BadRequest, DataLoss, NotFound +from databricks.sdk.retries import retried from databricks.sdk.service.sql import ( ColumnInfoTypeName, Disposition, @@ -119,6 +120,12 @@ def __repr__(self): return f"Row({', '.join(f'{k}={v!r}' for (k, v) in zip(self.__columns__, self, strict=True))})" +def _is_retryable_delta_concurrent_append(e: BaseException) -> str: + """Retry a concurrent append to a delta table""" + if isinstance(e, BadRequest) and "DELTA_CONCURRENT_APPEND" in str(e): + return "Concurrent append" + + class StatementExecutionExt: """Execute SQL statements in a stateless manner. 
@@ -182,6 +189,7 @@ def __init__( # pylint: disable=too-many-arguments,too-many-positional-argument ColumnInfoTypeName.TIMESTAMP: self._parse_timestamp, } + @retried(is_retryable=_is_retryable_delta_concurrent_append, timeout=timedelta(seconds=10)) def execute( self, statement: str, diff --git a/tests/integration/test_backends.py b/tests/integration/test_backends.py index cf6ae3b3..f0556577 100644 --- a/tests/integration/test_backends.py +++ b/tests/integration/test_backends.py @@ -208,7 +208,7 @@ def wait_until_seconds_rollover(*, rollover_seconds: int = 10) -> None: target = math.ceil(now / nanoseconds // rollover_seconds) * nanoseconds * rollover_seconds # To hit the rollover more accurate, first sleep until almost target - nanoseconds_until_almost_target = ((target - now) - microseconds) + nanoseconds_until_almost_target = (target - now) - microseconds time.sleep(max(nanoseconds_until_almost_target / 1e9, 0)) # Then busy-wait until the rollover occurs @@ -219,10 +219,7 @@ def wait_until_seconds_rollover(*, rollover_seconds: int = 10) -> None: def test_runtime_backend_handles_concurrent_append(sql_backend, make_random) -> None: table_name = f"lsql_test_{make_random(8)}" sql_backend.execute(f"CREATE TABLE IF NOT EXISTS {table_name} (x int, y float)") - sql_backend.execute( - f"INSERT INTO {table_name} BY NAME " - "SELECT r.id AS x, random() AS y FROM range(100000000) r" - ) + sql_backend.execute(f"INSERT INTO {table_name} BY NAME " "SELECT r.id AS x, random() AS y FROM range(100000000) r") def update_table() -> None: wait_until_seconds_rollover() # Update the table at the same time @@ -234,4 +231,4 @@ def update_table() -> None: if "[DELTA_CONCURRENT_APPEND]" in str(e): assert False, str(e) else: - raise # Raise in case of unexpected error \ No newline at end of file + raise # Raise in case of unexpected error From 1f0b0a89da98c2f0ae3e5f8961293d4a9b26c203 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Mon, 30 Sep 2024 15:55:29 +0200 Subject: [PATCH 08/24] Fix 
string concat --- tests/integration/test_backends.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_backends.py b/tests/integration/test_backends.py index f0556577..03ba4d05 100644 --- a/tests/integration/test_backends.py +++ b/tests/integration/test_backends.py @@ -219,7 +219,7 @@ def wait_until_seconds_rollover(*, rollover_seconds: int = 10) -> None: def test_runtime_backend_handles_concurrent_append(sql_backend, make_random) -> None: table_name = f"lsql_test_{make_random(8)}" sql_backend.execute(f"CREATE TABLE IF NOT EXISTS {table_name} (x int, y float)") - sql_backend.execute(f"INSERT INTO {table_name} BY NAME " "SELECT r.id AS x, random() AS y FROM range(100000000) r") + sql_backend.execute(f"INSERT INTO {table_name} BY NAME SELECT r.id AS x, random() AS y FROM range(100000000) r") def update_table() -> None: wait_until_seconds_rollover() # Update the table at the same time From 44513b344ec3521ee4cf2d5f92ab9d930c327166 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Mon, 30 Sep 2024 15:56:19 +0200 Subject: [PATCH 09/24] Fix return type hint of retryable --- src/databricks/labs/lsql/core.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/databricks/labs/lsql/core.py b/src/databricks/labs/lsql/core.py index 27799038..8217f574 100644 --- a/src/databricks/labs/lsql/core.py +++ b/src/databricks/labs/lsql/core.py @@ -120,10 +120,11 @@ def __repr__(self): return f"Row({', '.join(f'{k}={v!r}' for (k, v) in zip(self.__columns__, self, strict=True))})" -def _is_retryable_delta_concurrent_append(e: BaseException) -> str: +def _is_retryable_delta_concurrent_append(e: BaseException) -> str | None: """Retry a concurrent append to a delta table""" if isinstance(e, BadRequest) and "DELTA_CONCURRENT_APPEND" in str(e): return "Concurrent append" + return None class StatementExecutionExt: From 75050ad11cd3492f3b965f12199154cfe519fdc1 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Mon, 30 Sep 2024 15:59:01 
+0200 Subject: [PATCH 10/24] Rename variables --- tests/integration/test_backends.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/tests/integration/test_backends.py b/tests/integration/test_backends.py index 03ba4d05..34a60b76 100644 --- a/tests/integration/test_backends.py +++ b/tests/integration/test_backends.py @@ -201,18 +201,17 @@ def wait_until_seconds_rollover(*, rollover_seconds: int = 10) -> None: Args: rollover_seconds (int) : The multiple of seconds to wait until the next rollover. """ - nanoseconds = 1e9 - microseconds = 1e6 + nano, micro = 1e9, 1e6 - now = time.clock_gettime_ns(time.CLOCK_REALTIME) - target = math.ceil(now / nanoseconds // rollover_seconds) * nanoseconds * rollover_seconds + nanoseconds_now = time.clock_gettime_ns(time.CLOCK_REALTIME) + nanoseconds_target = math.ceil(nanoseconds_now / nano // rollover_seconds) * nano * rollover_seconds # To hit the rollover more accurate, first sleep until almost target - nanoseconds_until_almost_target = (target - now) - microseconds - time.sleep(max(nanoseconds_until_almost_target / 1e9, 0)) + nanoseconds_until_almost_target = (nanoseconds_target - nanoseconds_now) - micro + time.sleep(max(nanoseconds_until_almost_target / nano, 0)) # Then busy-wait until the rollover occurs - while time.clock_gettime_ns(time.CLOCK_REALTIME) < target: + while time.clock_gettime_ns(time.CLOCK_REALTIME) < nanoseconds_target: pass From ac3e37cb1764476328623f7d80e6800b12b16e56 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Mon, 30 Sep 2024 16:41:09 +0200 Subject: [PATCH 11/24] Remove if delta missing raise as data loss --- src/databricks/labs/lsql/core.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/databricks/labs/lsql/core.py b/src/databricks/labs/lsql/core.py index 8217f574..ef365e02 100644 --- a/src/databricks/labs/lsql/core.py +++ b/src/databricks/labs/lsql/core.py @@ -476,8 +476,6 @@ def _raise_if_needed(status: StatementStatus): raise NotFound(error_message) if "does 
not exist" in error_message: raise NotFound(error_message) - if "DELTA_MISSING_TRANSACTION_LOG" in error_message: - raise DataLoss(error_message) mapping = { ServiceErrorCode.ABORTED: errors.Aborted, ServiceErrorCode.ALREADY_EXISTS: errors.AlreadyExists, From 3a7d5b3447a41acb7c3a5f69748911feff2d6907 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Mon, 30 Sep 2024 16:41:18 +0200 Subject: [PATCH 12/24] Simplify create table --- tests/integration/test_backends.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/integration/test_backends.py b/tests/integration/test_backends.py index 34a60b76..66aee38e 100644 --- a/tests/integration/test_backends.py +++ b/tests/integration/test_backends.py @@ -217,8 +217,7 @@ def wait_until_seconds_rollover(*, rollover_seconds: int = 10) -> None: def test_runtime_backend_handles_concurrent_append(sql_backend, make_random) -> None: table_name = f"lsql_test_{make_random(8)}" - sql_backend.execute(f"CREATE TABLE IF NOT EXISTS {table_name} (x int, y float)") - sql_backend.execute(f"INSERT INTO {table_name} BY NAME SELECT r.id AS x, random() AS y FROM range(100000000) r") + sql_backend.execute(f"CREATE TABLE {table_name} AS (SELECT r.id AS x, random() AS y FROM range(1000000) r)") def update_table() -> None: wait_until_seconds_rollover() # Update the table at the same time From c83f117bce3c30e60a81c2005ea98cfe850b7798 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Mon, 30 Sep 2024 16:44:41 +0200 Subject: [PATCH 13/24] Use `make_table` fixture --- tests/integration/test_backends.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/tests/integration/test_backends.py b/tests/integration/test_backends.py index 66aee38e..70b5f700 100644 --- a/tests/integration/test_backends.py +++ b/tests/integration/test_backends.py @@ -215,13 +215,15 @@ def wait_until_seconds_rollover(*, rollover_seconds: int = 10) -> None: pass -def test_runtime_backend_handles_concurrent_append(sql_backend, make_random) 
-> None: - table_name = f"lsql_test_{make_random(8)}" - sql_backend.execute(f"CREATE TABLE {table_name} AS (SELECT r.id AS x, random() AS y FROM range(1000000) r)") +def test_runtime_backend_handles_concurrent_append(sql_backend, make_random, make_table) -> None: + table = make_table( + name=f"lsql_test_{make_random()}", + ctas="SELECT r.id AS x, random() AS y FROM range(1000000) r" + ) def update_table() -> None: wait_until_seconds_rollover() # Update the table at the same time - sql_backend.execute(f"UPDATE {table_name} SET y = y * 2 WHERE (x % 2 = 0)") + sql_backend.execute(f"UPDATE {table.full_name} SET y = y * 2 WHERE (x % 2 = 0)") try: Threads.strict("concurrent appends", [update_table, update_table]) From 4844218345b4bde44147d81a3edb6291a1114643 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Tue, 1 Oct 2024 08:38:17 +0200 Subject: [PATCH 14/24] Remove wait until roll over --- tests/integration/test_backends.py | 23 ----------------------- 1 file changed, 23 deletions(-) diff --git a/tests/integration/test_backends.py b/tests/integration/test_backends.py index 70b5f700..8459d331 100644 --- a/tests/integration/test_backends.py +++ b/tests/integration/test_backends.py @@ -193,28 +193,6 @@ def test_runtime_backend_use_statements(ws): assert result == "PASSED" -def wait_until_seconds_rollover(*, rollover_seconds: int = 10) -> None: - """Wait until the next rollover. - - Useful to align concurrent writes. - - Args: - rollover_seconds (int) : The multiple of seconds to wait until the next rollover. 
- """ - nano, micro = 1e9, 1e6 - - nanoseconds_now = time.clock_gettime_ns(time.CLOCK_REALTIME) - nanoseconds_target = math.ceil(nanoseconds_now / nano // rollover_seconds) * nano * rollover_seconds - - # To hit the rollover more accurate, first sleep until almost target - nanoseconds_until_almost_target = (nanoseconds_target - nanoseconds_now) - micro - time.sleep(max(nanoseconds_until_almost_target / nano, 0)) - - # Then busy-wait until the rollover occurs - while time.clock_gettime_ns(time.CLOCK_REALTIME) < nanoseconds_target: - pass - - def test_runtime_backend_handles_concurrent_append(sql_backend, make_random, make_table) -> None: table = make_table( name=f"lsql_test_{make_random()}", @@ -222,7 +200,6 @@ def test_runtime_backend_handles_concurrent_append(sql_backend, make_random, mak ) def update_table() -> None: - wait_until_seconds_rollover() # Update the table at the same time sql_backend.execute(f"UPDATE {table.full_name} SET y = y * 2 WHERE (x % 2 = 0)") try: From f5e4db0b4da940d6fd86cfd2c586141e027e5d2c Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Tue, 1 Oct 2024 08:54:45 +0200 Subject: [PATCH 15/24] Remove unused import --- src/databricks/labs/lsql/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/databricks/labs/lsql/core.py b/src/databricks/labs/lsql/core.py index ef365e02..313bd9f4 100644 --- a/src/databricks/labs/lsql/core.py +++ b/src/databricks/labs/lsql/core.py @@ -13,7 +13,7 @@ import requests import sqlglot from databricks.sdk import WorkspaceClient, errors -from databricks.sdk.errors import BadRequest, DataLoss, NotFound +from databricks.sdk.errors import BadRequest, NotFound from databricks.sdk.retries import retried from databricks.sdk.service.sql import ( ColumnInfoTypeName, From 3f1c0043e0c98c12c3caa80ca759e2ebaac5df4d Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Tue, 1 Oct 2024 08:56:15 +0200 Subject: [PATCH 16/24] Move integration test to the appropriate module --- tests/integration/test_backends.py 
| 23 ----------------------- tests/integration/test_core.py | 17 +++++++++++++++++ 2 files changed, 17 insertions(+), 23 deletions(-) diff --git a/tests/integration/test_backends.py b/tests/integration/test_backends.py index 8459d331..6930aec2 100644 --- a/tests/integration/test_backends.py +++ b/tests/integration/test_backends.py @@ -1,12 +1,7 @@ -import math -import time - import pytest from databricks.labs.blueprint.commands import CommandExecutor from databricks.labs.blueprint.installation import Installation -from databricks.labs.blueprint.parallel import Threads from databricks.labs.blueprint.wheels import ProductInfo, WheelsV2 -from databricks.sdk.errors import BadRequest from databricks.labs.lsql import Row from databricks.labs.lsql.backends import SqlBackend, StatementExecutionBackend @@ -191,21 +186,3 @@ def test_runtime_backend_use_statements(ws): """ result = commands.run(permission_denied_query) assert result == "PASSED" - - -def test_runtime_backend_handles_concurrent_append(sql_backend, make_random, make_table) -> None: - table = make_table( - name=f"lsql_test_{make_random()}", - ctas="SELECT r.id AS x, random() AS y FROM range(1000000) r" - ) - - def update_table() -> None: - sql_backend.execute(f"UPDATE {table.full_name} SET y = y * 2 WHERE (x % 2 = 0)") - - try: - Threads.strict("concurrent appends", [update_table, update_table]) - except BadRequest as e: - if "[DELTA_CONCURRENT_APPEND]" in str(e): - assert False, str(e) - else: - raise # Raise in case of unexpected error diff --git a/tests/integration/test_core.py b/tests/integration/test_core.py index 375a03b7..be8f15d3 100644 --- a/tests/integration/test_core.py +++ b/tests/integration/test_core.py @@ -1,6 +1,8 @@ import logging import pytest +from databricks.labs.blueprint.parallel import Threads +from databricks.sdk.errors import BadRequest from databricks.sdk.service.sql import Disposition from databricks.labs.lsql.core import Row, StatementExecutionExt @@ -83,3 +85,18 @@ def 
test_fetch_value(ws): see = StatementExecutionExt(ws) count = see.fetch_value("SELECT COUNT(*) FROM samples.nyctaxi.trips") assert count == 21932 + + +def test_runtime_backend_handles_concurrent_append(sql_backend, make_random, make_table) -> None: + table = make_table(name=f"lsql_test_{make_random()}", ctas="SELECT r.id AS x, random() AS y FROM range(1000000) r") + + def update_table() -> None: + sql_backend.execute(f"UPDATE {table.full_name} SET y = y * 2 WHERE (x % 2 = 0)") + + try: + Threads.strict("concurrent appends", [update_table, update_table]) + except BadRequest as e: + if "[DELTA_CONCURRENT_APPEND]" in str(e): + assert False, str(e) + else: + raise # Raise in case of unexpected error From ae1ee5bbf799d6816f02a89f3a3b1a76995f9471 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Tue, 1 Oct 2024 08:58:40 +0200 Subject: [PATCH 17/24] Put back raise error for missing delta transaction log --- src/databricks/labs/lsql/core.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/databricks/labs/lsql/core.py b/src/databricks/labs/lsql/core.py index 313bd9f4..8217f574 100644 --- a/src/databricks/labs/lsql/core.py +++ b/src/databricks/labs/lsql/core.py @@ -13,7 +13,7 @@ import requests import sqlglot from databricks.sdk import WorkspaceClient, errors -from databricks.sdk.errors import BadRequest, NotFound +from databricks.sdk.errors import BadRequest, DataLoss, NotFound from databricks.sdk.retries import retried from databricks.sdk.service.sql import ( ColumnInfoTypeName, @@ -476,6 +476,8 @@ def _raise_if_needed(status: StatementStatus): raise NotFound(error_message) if "does not exist" in error_message: raise NotFound(error_message) + if "DELTA_MISSING_TRANSACTION_LOG" in error_message: + raise DataLoss(error_message) mapping = { ServiceErrorCode.ABORTED: errors.Aborted, ServiceErrorCode.ALREADY_EXISTS: errors.AlreadyExists, From 7c0d4f4add129b562f8f378812ea0fda13d00808 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Tue, 1 Oct 2024 
09:10:06 +0200 Subject: [PATCH 18/24] Introduce custom `DeltaConcurrentAppend` error --- src/databricks/labs/lsql/backends.py | 4 +++- src/databricks/labs/lsql/core.py | 13 ++++++------- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/src/databricks/labs/lsql/backends.py b/src/databricks/labs/lsql/backends.py index 52378224..ef1e6066 100644 --- a/src/databricks/labs/lsql/backends.py +++ b/src/databricks/labs/lsql/backends.py @@ -21,7 +21,7 @@ from databricks.sdk.retries import retried from databricks.sdk.service.compute import Language -from databricks.labs.lsql.core import Row, StatementExecutionExt +from databricks.labs.lsql.core import DeltaConcurrentAppend, Row, StatementExecutionExt from databricks.labs.lsql.structs import StructInference logger = logging.getLogger(__name__) @@ -117,6 +117,8 @@ def _api_error_from_message(error_message: str) -> DatabricksError: return BadRequest(error_message) if "Operation not allowed" in error_message: return PermissionDenied(error_message) + if "DELTA_CONCURRENT_APPEND" in error_message: + return DeltaConcurrentAppend(error_message) return Unknown(error_message) diff --git a/src/databricks/labs/lsql/core.py b/src/databricks/labs/lsql/core.py index 8217f574..2ac80df6 100644 --- a/src/databricks/labs/lsql/core.py +++ b/src/databricks/labs/lsql/core.py @@ -13,7 +13,7 @@ import requests import sqlglot from databricks.sdk import WorkspaceClient, errors -from databricks.sdk.errors import BadRequest, DataLoss, NotFound +from databricks.sdk.errors import BadRequest, DatabricksError, DataLoss, NotFound from databricks.sdk.retries import retried from databricks.sdk.service.sql import ( ColumnInfoTypeName, @@ -120,11 +120,8 @@ def __repr__(self): return f"Row({', '.join(f'{k}={v!r}' for (k, v) in zip(self.__columns__, self, strict=True))})" -def _is_retryable_delta_concurrent_append(e: BaseException) -> str | None: - """Retry a concurrent append to a delta table""" - if isinstance(e, BadRequest) and 
"DELTA_CONCURRENT_APPEND" in str(e): - return "Concurrent append" - return None +class DeltaConcurrentAppend(DatabricksError): + """Error raised when appending concurrent to a Delta table.""" class StatementExecutionExt: @@ -190,7 +187,7 @@ def __init__( # pylint: disable=too-many-arguments,too-many-positional-argument ColumnInfoTypeName.TIMESTAMP: self._parse_timestamp, } - @retried(is_retryable=_is_retryable_delta_concurrent_append, timeout=timedelta(seconds=10)) + @retried(on=[DeltaConcurrentAppend], timeout=timedelta(seconds=10)) def execute( self, statement: str, @@ -478,6 +475,8 @@ def _raise_if_needed(status: StatementStatus): raise NotFound(error_message) if "DELTA_MISSING_TRANSACTION_LOG" in error_message: raise DataLoss(error_message) + if "DELTA_CONCURRENT_APPEND" in error_message: + raise DeltaConcurrentAppend(error_message) mapping = { ServiceErrorCode.ABORTED: errors.Aborted, ServiceErrorCode.ALREADY_EXISTS: errors.AlreadyExists, From c31b679b3ca1af65a8e6d1abb4f5562f108c8075 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Tue, 1 Oct 2024 09:12:47 +0200 Subject: [PATCH 19/24] Unit test `DeltaConcurrentAppend` error on statement execution backend --- tests/integration/test_core.py | 2 +- tests/unit/test_core.py | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/integration/test_core.py b/tests/integration/test_core.py index be8f15d3..865d9364 100644 --- a/tests/integration/test_core.py +++ b/tests/integration/test_core.py @@ -96,7 +96,7 @@ def update_table() -> None: try: Threads.strict("concurrent appends", [update_table, update_table]) except BadRequest as e: - if "[DELTA_CONCURRENT_APPEND]" in str(e): + if "DELTA_CONCURRENT_APPEND" in str(e): assert False, str(e) else: raise # Raise in case of unexpected error diff --git a/tests/unit/test_core.py b/tests/unit/test_core.py index 18e93549..c3de67dc 100644 --- a/tests/unit/test_core.py +++ b/tests/unit/test_core.py @@ -22,7 +22,7 @@ timedelta, ) -from databricks.labs.lsql.core 
import Row, StatementExecutionExt +from databricks.labs.lsql.core import DeltaConcurrentAppend, Row, StatementExecutionExt @pytest.mark.parametrize( @@ -196,6 +196,7 @@ def test_execute_poll_succeeds(): (ServiceError(message="... DELTA_TABLE_NOT_FOUND ..."), errors.NotFound), (ServiceError(message="... DELTA_TABLE_NOT_FOUND ..."), errors.NotFound), (ServiceError(message="... DELTA_MISSING_TRANSACTION_LOG ..."), errors.DataLoss), + (ServiceError(message="... DELTA_CONCURRENT_APPEND ..."), DeltaConcurrentAppend), ], ) def test_execute_fails(status_error, platform_error_type): From 664796bf73fbf3f9b93e1350fa12cd01659ce486 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Tue, 1 Oct 2024 09:15:40 +0200 Subject: [PATCH 20/24] Narrow test --- tests/unit/test_core.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/tests/unit/test_core.py b/tests/unit/test_core.py index c3de67dc..a11eb593 100644 --- a/tests/unit/test_core.py +++ b/tests/unit/test_core.py @@ -199,17 +199,15 @@ def test_execute_poll_succeeds(): (ServiceError(message="... 
DELTA_CONCURRENT_APPEND ..."), DeltaConcurrentAppend), ], ) -def test_execute_fails(status_error, platform_error_type): +def test_execute_fails(status_error, platform_error_type) -> None: ws = create_autospec(WorkspaceClient) - ws.statement_execution.execute_statement.return_value = StatementResponse( status=StatementStatus(state=StatementState.FAILED, error=status_error), statement_id="bcd", ) - see = StatementExecutionExt(ws, warehouse_id="abc") - with pytest.raises(platform_error_type): + with pytest.raises(platform_error_type, match=status_error.message if status_error is not None else None): see.execute("SELECT 2+2") From 4f944ade35c69e9ba9a4ecb61c6ee5d4a1da1cfe Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Tue, 1 Oct 2024 09:19:38 +0200 Subject: [PATCH 21/24] Test `DeltaConcurrentAppend` error on `RuntimeBackend` --- tests/unit/test_backends.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/unit/test_backends.py b/tests/unit/test_backends.py index 71db5207..753ff5fa 100644 --- a/tests/unit/test_backends.py +++ b/tests/unit/test_backends.py @@ -32,6 +32,7 @@ RuntimeBackend, StatementExecutionBackend, ) +from databricks.labs.lsql.core import DeltaConcurrentAppend # pylint: disable=protected-access @@ -364,6 +365,7 @@ def test_save_table_with_not_null_constraint_violated(): ("PARSE_SYNTAX_ERROR foo", BadRequest), ("foo Operation not allowed", PermissionDenied), ("foo error failure", Unknown), + ("[DELTA_CONCURRENT_APPEND] ConcurrentAppendException: Files were added ...", DeltaConcurrentAppend) ], ) def test_runtime_backend_error_mapping_similar_to_statement_execution(msg, err_t): From 79cb07a9d5166bdfc79076852f90e438a8aa528e Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Tue, 1 Oct 2024 09:21:09 +0200 Subject: [PATCH 22/24] Narrow tests --- tests/unit/test_backends.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/unit/test_backends.py b/tests/unit/test_backends.py index 753ff5fa..9d6c9557 100644 --- 
a/tests/unit/test_backends.py +++ b/tests/unit/test_backends.py @@ -1,5 +1,6 @@ import datetime import os +import re import sys from dataclasses import dataclass from unittest import mock @@ -368,7 +369,7 @@ def test_save_table_with_not_null_constraint_violated(): ("[DELTA_CONCURRENT_APPEND] ConcurrentAppendException: Files were added ...", DeltaConcurrentAppend) ], ) -def test_runtime_backend_error_mapping_similar_to_statement_execution(msg, err_t): +def test_runtime_backend_error_mapping_similar_to_statement_execution(msg, err_t) -> None: with mock.patch.dict(os.environ, {"DATABRICKS_RUNTIME_VERSION": "14.0"}): pyspark_sql_session = MagicMock() sys.modules["pyspark.sql.session"] = pyspark_sql_session @@ -378,10 +379,10 @@ def test_runtime_backend_error_mapping_similar_to_statement_execution(msg, err_t runtime_backend = RuntimeBackend() - with pytest.raises(err_t): + with pytest.raises(err_t, match=re.escape(msg)): runtime_backend.execute("SELECT * from bar") - with pytest.raises(err_t): + with pytest.raises(err_t, match=re.escape(msg)): list(runtime_backend.fetch("SELECT * from bar")) From b2f2c8cfa138912650b98d4aee4a1ece174213dc Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Tue, 1 Oct 2024 10:06:52 +0200 Subject: [PATCH 23/24] Format --- tests/unit/test_backends.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/test_backends.py b/tests/unit/test_backends.py index 9d6c9557..04700be0 100644 --- a/tests/unit/test_backends.py +++ b/tests/unit/test_backends.py @@ -366,7 +366,7 @@ def test_save_table_with_not_null_constraint_violated(): ("PARSE_SYNTAX_ERROR foo", BadRequest), ("foo Operation not allowed", PermissionDenied), ("foo error failure", Unknown), - ("[DELTA_CONCURRENT_APPEND] ConcurrentAppendException: Files were added ...", DeltaConcurrentAppend) + ("[DELTA_CONCURRENT_APPEND] ConcurrentAppendException: Files were added ...", DeltaConcurrentAppend), ], ) def 
test_runtime_backend_error_mapping_similar_to_statement_execution(msg, err_t) -> None: From 7f3d72e0df5670426c3c4b453ce593112f9aa833 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Tue, 1 Oct 2024 10:36:27 +0200 Subject: [PATCH 24/24] Add integration test for concurrent write through runtime backend --- tests/integration/test_backends.py | 56 +++++++++++++++++++++++++++++- 1 file changed, 55 insertions(+), 1 deletion(-) diff --git a/tests/integration/test_backends.py b/tests/integration/test_backends.py index 6930aec2..ad853c8c 100644 --- a/tests/integration/test_backends.py +++ b/tests/integration/test_backends.py @@ -1,7 +1,10 @@ import pytest from databricks.labs.blueprint.commands import CommandExecutor from databricks.labs.blueprint.installation import Installation +from databricks.labs.blueprint.parallel import Threads from databricks.labs.blueprint.wheels import ProductInfo, WheelsV2 +from databricks.sdk.errors import BadRequest +from databricks.sdk.service import compute from databricks.labs.lsql import Row from databricks.labs.lsql.backends import SqlBackend, StatementExecutionBackend @@ -74,7 +77,6 @@ return "PASSED" """ - UNKNOWN_ERROR = """ from databricks.labs.lsql.backends import RuntimeBackend from databricks.sdk.errors import Unknown @@ -86,6 +88,37 @@ return "PASSED" """ +CONCURRENT_APPEND = ''' +import math +import time + + +def wait_until_seconds_rollover(*, rollover_seconds: int = 10) -> None: + """Wait until the next rollover. + + Useful to align concurrent writes. + + Args: + rollover_seconds (int) : The multiple of seconds to wait until the next rollover. 
+ """ + nano, micro = 1e9, 1e6 + + nanoseconds_now = time.clock_gettime_ns(time.CLOCK_BOOTTIME) + nanoseconds_target = math.ceil(nanoseconds_now / nano / rollover_seconds) * nano * rollover_seconds + + # To hit the rollover more accurately, first sleep until almost the target + nanoseconds_until_almost_target = (nanoseconds_target - nanoseconds_now) - micro + time.sleep(max(nanoseconds_until_almost_target / nano, 0)) + + # Then busy-wait until the rollover occurs + while time.clock_gettime_ns(time.CLOCK_BOOTTIME) < nanoseconds_target: + pass + + +wait_until_seconds_rollover() +spark.sql("UPDATE {table_full_name} SET y = y * 2 WHERE (x % 2 = 0)") +''' + @pytest.mark.xfail def test_runtime_backend_works_maps_permission_denied(ws): @@ -139,6 +172,27 @@ def test_runtime_backend_errors_handled(ws, query): assert result == "PASSED" +def test_runtime_backend_handles_concurrent_append(ws, make_random, make_table) -> None: + commands = CommandExecutor( + ws.clusters, + ws.command_execution, + lambda: ws.config.cluster_id, + language=compute.Language.PYTHON, + ) + table = make_table(name=f"lsql_test_{make_random()}", ctas="SELECT r.id AS x, random() AS y FROM range(1000000) r") + + def update_table() -> None: + commands.run(CONCURRENT_APPEND.format(table_full_name=table.full_name)) + + try: + Threads.strict("concurrent appends", [update_table, update_table]) + except BadRequest as e: + if "DELTA_CONCURRENT_APPEND" in str(e): + assert False, str(e) + else: + raise # Raise in case of unexpected error + + def test_statement_execution_backend_works(ws, env_or_skip): sql_backend = StatementExecutionBackend(ws, env_or_skip("TEST_DEFAULT_WAREHOUSE_ID")) rows = list(sql_backend.fetch("SELECT * FROM samples.nyctaxi.trips LIMIT 10"))