Skip to content

Commit 1b8e47c

Browse files
committed
added e2e test to verify circuit breaker
Signed-off-by: Nikhil Suri <nikhil.suri@databricks.com>
1 parent d9e7c89 commit 1b8e47c

File tree

2 files changed

+351
-0
lines changed

2 files changed

+351
-0
lines changed

src/databricks/sql/utils.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -922,4 +922,7 @@ def build_client_context(server_hostname: str, version: str, **kwargs):
922922
proxy_auth_method=kwargs.get("_proxy_auth_method"),
923923
pool_connections=kwargs.get("_pool_connections"),
924924
pool_maxsize=kwargs.get("_pool_maxsize"),
925+
telemetry_circuit_breaker_enabled=kwargs.get(
926+
"_telemetry_circuit_breaker_enabled"
927+
),
925928
)

tests/e2e/test_circuit_breaker.py

Lines changed: 348 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,348 @@
1+
"""
2+
E2E tests for circuit breaker functionality in telemetry.
3+
4+
This test suite verifies:
5+
1. Circuit breaker opens after rate limit failures (429/503)
6+
2. Circuit breaker blocks subsequent calls while open
7+
3. Circuit breaker transitions through states correctly
8+
4. Circuit breaker does not trigger for non-rate-limit errors
9+
5. Circuit breaker can be disabled via configuration flag
10+
6. Circuit breaker closes after reset timeout
11+
12+
Run with:
13+
pytest tests/e2e/test_circuit_breaker.py -v -s
14+
"""
15+
16+
import time
17+
from unittest.mock import patch, MagicMock
18+
19+
import pytest
20+
from pybreaker import STATE_OPEN, STATE_CLOSED, STATE_HALF_OPEN
21+
from urllib3 import HTTPResponse
22+
23+
import databricks.sql as sql
24+
from databricks.sql.telemetry.circuit_breaker_manager import CircuitBreakerManager
25+
26+
27+
@pytest.fixture(autouse=True)
28+
def aggressive_circuit_breaker_config():
29+
"""
30+
Configure circuit breaker to be aggressive for faster testing.
31+
Opens after 2 failures instead of 20, with 5 second timeout.
32+
"""
33+
from databricks.sql.telemetry import circuit_breaker_manager
34+
35+
# Store original values
36+
original_minimum_calls = circuit_breaker_manager.MINIMUM_CALLS
37+
original_reset_timeout = circuit_breaker_manager.RESET_TIMEOUT
38+
39+
# Patch with aggressive test values
40+
circuit_breaker_manager.MINIMUM_CALLS = 2
41+
circuit_breaker_manager.RESET_TIMEOUT = 5
42+
43+
# Reset all circuit breakers before test
44+
CircuitBreakerManager._instances.clear()
45+
46+
yield
47+
48+
# Cleanup: restore original values and reset breakers
49+
circuit_breaker_manager.MINIMUM_CALLS = original_minimum_calls
50+
circuit_breaker_manager.RESET_TIMEOUT = original_reset_timeout
51+
CircuitBreakerManager._instances.clear()
52+
53+
54+
class TestCircuitBreakerTelemetry:
55+
"""Tests for circuit breaker functionality with telemetry"""
56+
57+
@pytest.fixture(autouse=True)
58+
def get_details(self, connection_details):
59+
"""Get connection details from pytest fixture"""
60+
self.arguments = connection_details.copy()
61+
62+
def test_circuit_breaker_opens_after_rate_limit_errors(self):
63+
"""
64+
Verify circuit breaker opens after 429/503 errors and blocks subsequent calls.
65+
"""
66+
request_count = {"count": 0}
67+
68+
def mock_rate_limited_request(*args, **kwargs):
69+
"""Mock that returns 429 rate limit response"""
70+
request_count["count"] += 1
71+
response = MagicMock(spec=HTTPResponse)
72+
response.status = 429
73+
response.data = b"Too Many Requests"
74+
return response
75+
76+
with patch(
77+
"databricks.sql.telemetry.telemetry_push_client.TelemetryPushClient.request",
78+
side_effect=mock_rate_limited_request,
79+
):
80+
with sql.connect(
81+
server_hostname=self.arguments["host"],
82+
http_path=self.arguments["http_path"],
83+
access_token=self.arguments.get("access_token"),
84+
force_enable_telemetry=True,
85+
telemetry_batch_size=1,
86+
_telemetry_circuit_breaker_enabled=True,
87+
) as conn:
88+
circuit_breaker = CircuitBreakerManager.get_circuit_breaker(
89+
self.arguments["host"]
90+
)
91+
92+
# Initial state should be CLOSED
93+
assert circuit_breaker.current_state == STATE_CLOSED
94+
95+
cursor = conn.cursor()
96+
97+
# Execute queries to trigger telemetry failures
98+
cursor.execute("SELECT 1")
99+
cursor.fetchone()
100+
time.sleep(1)
101+
102+
cursor.execute("SELECT 2")
103+
cursor.fetchone()
104+
time.sleep(2)
105+
106+
# Circuit should now be OPEN after 2 failures
107+
assert circuit_breaker.current_state == STATE_OPEN
108+
assert circuit_breaker.fail_counter == 2
109+
110+
# Track requests before executing another query
111+
requests_before = request_count["count"]
112+
113+
# Execute another query - circuit breaker should block telemetry
114+
cursor.execute("SELECT 3")
115+
cursor.fetchone()
116+
time.sleep(1)
117+
118+
requests_after = request_count["count"]
119+
120+
# No new telemetry requests should be made (circuit is open)
121+
assert (
122+
requests_after == requests_before
123+
), "Circuit breaker should block requests while OPEN"
124+
125+
def test_circuit_breaker_does_not_trigger_for_non_rate_limit_errors(self):
126+
"""
127+
Verify circuit breaker does NOT open for errors other than 429/503.
128+
Only rate limit errors should trigger the circuit breaker.
129+
"""
130+
request_count = {"count": 0}
131+
132+
def mock_server_error_request(*args, **kwargs):
133+
"""Mock that returns 500 server error (not rate limit)"""
134+
request_count["count"] += 1
135+
response = MagicMock(spec=HTTPResponse)
136+
response.status = 500 # Server error - should NOT trigger CB
137+
response.data = b"Internal Server Error"
138+
return response
139+
140+
with patch(
141+
"databricks.sql.telemetry.telemetry_push_client.TelemetryPushClient.request",
142+
side_effect=mock_server_error_request,
143+
):
144+
with sql.connect(
145+
server_hostname=self.arguments["host"],
146+
http_path=self.arguments["http_path"],
147+
access_token=self.arguments.get("access_token"),
148+
force_enable_telemetry=True,
149+
telemetry_batch_size=1,
150+
_telemetry_circuit_breaker_enabled=True,
151+
) as conn:
152+
circuit_breaker = CircuitBreakerManager.get_circuit_breaker(
153+
self.arguments["host"]
154+
)
155+
156+
cursor = conn.cursor()
157+
158+
# Execute multiple queries with 500 errors
159+
for i in range(5):
160+
cursor.execute(f"SELECT {i}")
161+
cursor.fetchone()
162+
time.sleep(0.5)
163+
164+
# Circuit should remain CLOSED (500 errors don't trigger CB)
165+
assert (
166+
circuit_breaker.current_state == STATE_CLOSED
167+
), "Circuit should stay CLOSED for non-rate-limit errors"
168+
assert (
169+
circuit_breaker.fail_counter == 0
170+
), "Non-rate-limit errors should not increment fail counter"
171+
172+
# Requests should still go through
173+
assert request_count["count"] >= 5, "Requests should not be blocked"
174+
175+
def test_circuit_breaker_disabled_allows_all_calls(self):
176+
"""
177+
Verify that when circuit breaker is disabled, all calls go through
178+
even with rate limit errors.
179+
"""
180+
request_count = {"count": 0}
181+
182+
def mock_rate_limited_request(*args, **kwargs):
183+
"""Mock that returns 429"""
184+
request_count["count"] += 1
185+
response = MagicMock(spec=HTTPResponse)
186+
response.status = 429
187+
response.data = b"Too Many Requests"
188+
return response
189+
190+
with patch(
191+
"databricks.sql.telemetry.telemetry_push_client.TelemetryPushClient.request",
192+
side_effect=mock_rate_limited_request,
193+
):
194+
with sql.connect(
195+
server_hostname=self.arguments["host"],
196+
http_path=self.arguments["http_path"],
197+
access_token=self.arguments.get("access_token"),
198+
force_enable_telemetry=True,
199+
telemetry_batch_size=1,
200+
_telemetry_circuit_breaker_enabled=False, # Disabled
201+
) as conn:
202+
cursor = conn.cursor()
203+
204+
# Execute multiple queries
205+
for i in range(5):
206+
cursor.execute(f"SELECT {i}")
207+
cursor.fetchone()
208+
time.sleep(0.3)
209+
210+
# All requests should go through (no circuit breaker)
211+
assert (
212+
request_count["count"] >= 5
213+
), "All requests should go through when CB disabled"
214+
215+
def test_circuit_breaker_recovers_after_reset_timeout(self):
216+
"""
217+
Verify circuit breaker transitions to HALF_OPEN after reset timeout
218+
and eventually CLOSES if requests succeed.
219+
"""
220+
request_count = {"count": 0}
221+
fail_requests = {"enabled": True}
222+
223+
def mock_conditional_request(*args, **kwargs):
224+
"""Mock that fails initially, then succeeds"""
225+
request_count["count"] += 1
226+
response = MagicMock(spec=HTTPResponse)
227+
228+
if fail_requests["enabled"]:
229+
# Return 429 to trigger circuit breaker
230+
response.status = 429
231+
response.data = b"Too Many Requests"
232+
else:
233+
# Return success
234+
response.status = 200
235+
response.data = b"OK"
236+
237+
return response
238+
239+
with patch(
240+
"databricks.sql.telemetry.telemetry_push_client.TelemetryPushClient.request",
241+
side_effect=mock_conditional_request,
242+
):
243+
with sql.connect(
244+
server_hostname=self.arguments["host"],
245+
http_path=self.arguments["http_path"],
246+
access_token=self.arguments.get("access_token"),
247+
force_enable_telemetry=True,
248+
telemetry_batch_size=1,
249+
_telemetry_circuit_breaker_enabled=True,
250+
) as conn:
251+
circuit_breaker = CircuitBreakerManager.get_circuit_breaker(
252+
self.arguments["host"]
253+
)
254+
255+
cursor = conn.cursor()
256+
257+
# Trigger failures to open circuit
258+
cursor.execute("SELECT 1")
259+
cursor.fetchone()
260+
time.sleep(1)
261+
262+
cursor.execute("SELECT 2")
263+
cursor.fetchone()
264+
time.sleep(2)
265+
266+
# Circuit should be OPEN
267+
assert circuit_breaker.current_state == STATE_OPEN
268+
269+
# Wait for reset timeout (5 seconds in test)
270+
time.sleep(6)
271+
272+
# Now make requests succeed
273+
fail_requests["enabled"] = False
274+
275+
# Execute query to trigger HALF_OPEN state
276+
cursor.execute("SELECT 3")
277+
cursor.fetchone()
278+
time.sleep(1)
279+
280+
# Circuit should be HALF_OPEN or CLOSED (testing recovery)
281+
assert circuit_breaker.current_state in [
282+
STATE_HALF_OPEN,
283+
STATE_CLOSED,
284+
], f"Circuit should be recovering, but is {circuit_breaker.current_state}"
285+
286+
# Execute more queries to fully recover
287+
cursor.execute("SELECT 4")
288+
cursor.fetchone()
289+
time.sleep(1)
290+
291+
# Eventually should be CLOSED if requests succeed
292+
# (may take a few successful requests to close from HALF_OPEN)
293+
current_state = circuit_breaker.current_state
294+
assert current_state in [
295+
STATE_CLOSED,
296+
STATE_HALF_OPEN,
297+
], f"Circuit should recover to CLOSED or HALF_OPEN, got {current_state}"
298+
299+
def test_circuit_breaker_503_also_triggers_circuit(self):
300+
"""
301+
Verify circuit breaker opens for 503 Service Unavailable errors
302+
in addition to 429 rate limit errors.
303+
"""
304+
request_count = {"count": 0}
305+
306+
def mock_service_unavailable_request(*args, **kwargs):
307+
"""Mock that returns 503 service unavailable"""
308+
request_count["count"] += 1
309+
response = MagicMock(spec=HTTPResponse)
310+
response.status = 503 # Service unavailable - should trigger CB
311+
response.data = b"Service Unavailable"
312+
return response
313+
314+
with patch(
315+
"databricks.sql.telemetry.telemetry_push_client.TelemetryPushClient.request",
316+
side_effect=mock_service_unavailable_request,
317+
):
318+
with sql.connect(
319+
server_hostname=self.arguments["host"],
320+
http_path=self.arguments["http_path"],
321+
access_token=self.arguments.get("access_token"),
322+
force_enable_telemetry=True,
323+
telemetry_batch_size=1,
324+
_telemetry_circuit_breaker_enabled=True,
325+
) as conn:
326+
circuit_breaker = CircuitBreakerManager.get_circuit_breaker(
327+
self.arguments["host"]
328+
)
329+
330+
cursor = conn.cursor()
331+
332+
# Execute queries to trigger 503 failures
333+
cursor.execute("SELECT 1")
334+
cursor.fetchone()
335+
time.sleep(1)
336+
337+
cursor.execute("SELECT 2")
338+
cursor.fetchone()
339+
time.sleep(2)
340+
341+
# Circuit should be OPEN after 2 x 503 errors
342+
assert (
343+
circuit_breaker.current_state == STATE_OPEN
344+
), "503 errors should trigger circuit breaker"
345+
346+
347+
if __name__ == "__main__":
348+
pytest.main([__file__, "-v", "-s"])

0 commit comments

Comments
 (0)