Skip to content

Commit 1f9c4d3

Browse files
committed
Added interface layer top of http client to use circuit rbeaker
Signed-off-by: Nikhil Suri <nikhil.suri@databricks.com>
1 parent db93974 commit 1f9c4d3

File tree

7 files changed

+54
-98
lines changed

7 files changed

+54
-98
lines changed

docs/parameters.md

Lines changed: 0 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -254,73 +254,3 @@ You should only set `use_inline_params=True` in the following cases:
254254
4. Your client code uses [sequences as parameter values](#passing-sequences-as-parameter-values)
255255

256256
We expect limitations (1) and (2) to be addressed in a future Databricks Runtime release.
257-
258-
# Telemetry Circuit Breaker Configuration
259-
260-
The Databricks SQL connector includes a circuit breaker pattern for telemetry requests to prevent telemetry failures from impacting main SQL operations. This feature is enabled by default and can be controlled through a connection parameter.
261-
262-
## Overview
263-
264-
The circuit breaker monitors telemetry request failures and automatically blocks telemetry requests when the failure rate exceeds a configured threshold. This prevents telemetry service issues from affecting your main SQL operations.
265-
266-
## Configuration Parameter
267-
268-
| Parameter | Type | Default | Description |
269-
|-----------|------|---------|-------------|
270-
| `telemetry_circuit_breaker_enabled` | bool | `True` | Enable or disable the telemetry circuit breaker |
271-
272-
## Usage Examples
273-
274-
### Default Configuration (Circuit Breaker Enabled)
275-
276-
```python
277-
from databricks import sql
278-
279-
# Circuit breaker is enabled by default
280-
with sql.connect(
281-
server_hostname="your-host.cloud.databricks.com",
282-
http_path="/sql/1.0/warehouses/your-warehouse-id",
283-
access_token="your-token"
284-
) as conn:
285-
# Your SQL operations here
286-
pass
287-
```
288-
289-
### Disable Circuit Breaker
290-
291-
```python
292-
from databricks import sql
293-
294-
# Disable circuit breaker entirely
295-
with sql.connect(
296-
server_hostname="your-host.cloud.databricks.com",
297-
http_path="/sql/1.0/warehouses/your-warehouse-id",
298-
access_token="your-token",
299-
telemetry_circuit_breaker_enabled=False
300-
) as conn:
301-
# Your SQL operations here
302-
pass
303-
```
304-
305-
## Circuit Breaker States
306-
307-
The circuit breaker operates in three states:
308-
309-
1. **Closed**: Normal operation, telemetry requests are allowed
310-
2. **Open**: Circuit breaker is open, telemetry requests are blocked
311-
3. **Half-Open**: Testing state, limited telemetry requests are allowed
312-
313-
314-
## Performance Impact
315-
316-
The circuit breaker has minimal performance impact on SQL operations:
317-
318-
- Circuit breaker only affects telemetry requests, not SQL queries
319-
- When circuit breaker is open, telemetry requests are simply skipped
320-
- No additional latency is added to successful operations
321-
322-
## Best Practices
323-
324-
1. **Keep circuit breaker enabled**: The default configuration works well for most use cases
325-
2. **Don't disable unless necessary**: Circuit breaker provides important protection against telemetry failures
326-
3. **Monitor application logs**: Circuit breaker state changes are logged for troubleshooting

src/databricks/sql/auth/common.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@ def __init__(
5151
pool_connections: Optional[int] = None,
5252
pool_maxsize: Optional[int] = None,
5353
user_agent: Optional[str] = None,
54-
# Telemetry circuit breaker configuration
5554
telemetry_circuit_breaker_enabled: Optional[bool] = None,
5655
):
5756
self.hostname = hostname
@@ -85,9 +84,7 @@ def __init__(
8584
self.pool_connections = pool_connections or 10
8685
self.pool_maxsize = pool_maxsize or 20
8786
self.user_agent = user_agent
88-
89-
# Telemetry circuit breaker configuration
90-
self.telemetry_circuit_breaker_enabled = telemetry_circuit_breaker_enabled if telemetry_circuit_breaker_enabled is not None else True
87+
self.telemetry_circuit_breaker_enabled = telemetry_circuit_breaker_enabled if telemetry_circuit_breaker_enabled is not None else False
9188

9289

9390
def get_effective_azure_login_app_id(hostname) -> str:

src/databricks/sql/telemetry/circuit_breaker_manager.py

Lines changed: 42 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,28 +16,53 @@
1616

1717
logger = logging.getLogger(__name__)
1818

19+
# Circuit Breaker Configuration Constants
20+
DEFAULT_FAILURE_THRESHOLD = 0.5
21+
DEFAULT_MINIMUM_CALLS = 20
22+
DEFAULT_TIMEOUT = 30
23+
DEFAULT_RESET_TIMEOUT = 30
24+
DEFAULT_EXPECTED_EXCEPTION = (Exception,)
25+
DEFAULT_NAME = "telemetry-circuit-breaker"
1926

20-
@dataclass
27+
# Circuit Breaker State Constants
28+
CIRCUIT_BREAKER_STATE_OPEN = "open"
29+
CIRCUIT_BREAKER_STATE_CLOSED = "closed"
30+
CIRCUIT_BREAKER_STATE_HALF_OPEN = "half-open"
31+
CIRCUIT_BREAKER_STATE_DISABLED = "disabled"
32+
CIRCUIT_BREAKER_STATE_NOT_INITIALIZED = "not_initialized"
33+
34+
# Logging Message Constants
35+
LOG_CIRCUIT_BREAKER_STATE_CHANGED = "Circuit breaker state changed from %s to %s for %s"
36+
LOG_CIRCUIT_BREAKER_OPENED = "Circuit breaker opened for %s - telemetry requests will be blocked"
37+
LOG_CIRCUIT_BREAKER_CLOSED = "Circuit breaker closed for %s - telemetry requests will be allowed"
38+
LOG_CIRCUIT_BREAKER_HALF_OPEN = "Circuit breaker half-open for %s - testing telemetry requests"
39+
40+
41+
@dataclass(frozen=True)
2142
class CircuitBreakerConfig:
22-
"""Configuration for circuit breaker behavior."""
43+
"""Configuration for circuit breaker behavior.
44+
45+
This class is immutable to prevent modification of circuit breaker settings.
46+
All configuration values are set to constants defined at the module level.
47+
"""
2348

2449
# Failure threshold percentage (0.0 to 1.0)
25-
failure_threshold: float = 0.5
50+
failure_threshold: float = DEFAULT_FAILURE_THRESHOLD
2651

2752
# Minimum number of calls before circuit can open
28-
minimum_calls: int = 20
53+
minimum_calls: int = DEFAULT_MINIMUM_CALLS
2954

3055
# Time window for counting failures (in seconds)
31-
timeout: int = 30
56+
timeout: int = DEFAULT_TIMEOUT
3257

3358
# Time to wait before trying to close circuit (in seconds)
34-
reset_timeout: int = 30
59+
reset_timeout: int = DEFAULT_RESET_TIMEOUT
3560

3661
# Expected exception types that should trigger circuit breaker
37-
expected_exception: tuple = (Exception,)
62+
expected_exception: tuple = DEFAULT_EXPECTED_EXCEPTION
3863

3964
# Name for the circuit breaker (for logging)
40-
name: str = "telemetry-circuit-breaker"
65+
name: str = DEFAULT_NAME
4166

4267

4368
class CircuitBreakerManager:
@@ -142,23 +167,23 @@ def _on_state_change(cls, old_state: str, new_state: str, breaker: CircuitBreake
142167
breaker: The circuit breaker instance
143168
"""
144169
logger.info(
145-
"Circuit breaker state changed from %s to %s for %s",
170+
LOG_CIRCUIT_BREAKER_STATE_CHANGED,
146171
old_state, new_state, breaker.name
147172
)
148173

149-
if new_state == "open":
174+
if new_state == CIRCUIT_BREAKER_STATE_OPEN:
150175
logger.warning(
151-
"Circuit breaker opened for %s - telemetry requests will be blocked",
176+
LOG_CIRCUIT_BREAKER_OPENED,
152177
breaker.name
153178
)
154-
elif new_state == "closed":
179+
elif new_state == CIRCUIT_BREAKER_STATE_CLOSED:
155180
logger.info(
156-
"Circuit breaker closed for %s - telemetry requests will be allowed",
181+
LOG_CIRCUIT_BREAKER_CLOSED,
157182
breaker.name
158183
)
159-
elif new_state == "half-open":
184+
elif new_state == CIRCUIT_BREAKER_STATE_HALF_OPEN:
160185
logger.info(
161-
"Circuit breaker half-open for %s - testing telemetry requests",
186+
LOG_CIRCUIT_BREAKER_HALF_OPEN,
162187
breaker.name
163188
)
164189

@@ -174,11 +199,11 @@ def get_circuit_breaker_state(cls, host: str) -> str:
174199
Current state of the circuit breaker
175200
"""
176201
if not cls._config:
177-
return "disabled"
202+
return CIRCUIT_BREAKER_STATE_DISABLED
178203

179204
with cls._lock:
180205
if host not in cls._instances:
181-
return "not_initialized"
206+
return CIRCUIT_BREAKER_STATE_NOT_INITIALIZED
182207

183208
breaker = cls._instances[host]
184209
return breaker.current_state

src/databricks/sql/telemetry/telemetry_client.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -393,7 +393,6 @@ def close(self):
393393
"""Flush remaining events before closing"""
394394
logger.debug("Closing TelemetryClient for connection %s", self._session_id_hex)
395395
self._flush()
396-
397396

398397

399398
class TelemetryClientFactory:

src/databricks/sql/telemetry/telemetry_push_client.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,12 @@
1616

1717
from databricks.sql.common.unified_http_client import UnifiedHttpClient
1818
from databricks.sql.common.http import HttpMethod
19-
from databricks.sql.telemetry.circuit_breaker_manager import CircuitBreakerConfig, is_circuit_breaker_error
19+
from databricks.sql.telemetry.circuit_breaker_manager import (
20+
CircuitBreakerConfig,
21+
CircuitBreakerManager,
22+
is_circuit_breaker_error,
23+
CIRCUIT_BREAKER_STATE_OPEN
24+
)
2025

2126
logger = logging.getLogger(__name__)
2227

@@ -133,7 +138,6 @@ def __init__(
133138
self._config = config
134139

135140
# Initialize circuit breaker manager with config
136-
from databricks.sql.telemetry.circuit_breaker_manager import CircuitBreakerManager
137141
CircuitBreakerManager.initialize(config)
138142

139143
# Get circuit breaker for this host
@@ -200,14 +204,14 @@ def request_context(
200204

201205
def get_circuit_breaker_state(self) -> str:
202206
"""Get the current state of the circuit breaker."""
203-
from databricks.sql.telemetry.circuit_breaker_manager import CircuitBreakerManager
204207
return CircuitBreakerManager.get_circuit_breaker_state(self._host)
205208

206209
def is_circuit_breaker_open(self) -> bool:
207210
"""Check if the circuit breaker is currently open."""
208-
return self.get_circuit_breaker_state() == "open"
211+
return self.get_circuit_breaker_state() == CIRCUIT_BREAKER_STATE_OPEN
209212

210213
def reset_circuit_breaker(self) -> None:
211214
"""Reset the circuit breaker to closed state."""
212-
from databricks.sql.telemetry.circuit_breaker_manager import CircuitBreakerManager
213215
CircuitBreakerManager.reset_circuit_breaker(self._host)
216+
217+

tests/unit/test_circuit_breaker_http_client.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
import pytest
66
from unittest.mock import Mock, patch, MagicMock
7-
import urllib.parse
87

98
from databricks.sql.telemetry.telemetry_push_client import (
109
ITelemetryPushClient,

tests/unit/test_telemetry_circuit_breaker_integration.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,3 +279,5 @@ def make_request():
279279
assert len(results) + len(errors) == 5
280280
# Some should be CircuitBreakerError after circuit opens
281281
assert "CircuitBreakerError" in errors or len(errors) == 0
282+
283+

0 commit comments

Comments
 (0)