Commit a1b4468

circuit breaker changes using pybreaker
Signed-off-by: Nikhil Suri <nikhil.suri@databricks.com>
1 parent fd65fd2 commit a1b4468

10 files changed: 1687 additions, 3 deletions

docs/parameters.md

Lines changed: 70 additions & 0 deletions
@@ -254,3 +254,73 @@ You should only set `use_inline_params=True` in the following cases:
 4. Your client code uses [sequences as parameter values](#passing-sequences-as-parameter-values)

 We expect limitations (1) and (2) to be addressed in a future Databricks Runtime release.
+
+# Telemetry Circuit Breaker Configuration
+
+The Databricks SQL connector includes a circuit breaker pattern for telemetry requests to prevent telemetry failures from impacting main SQL operations. This feature is enabled by default and can be controlled through a connection parameter.
+
+## Overview
+
+The circuit breaker monitors telemetry request failures and automatically blocks telemetry requests when the failure rate exceeds a configured threshold. This prevents telemetry service issues from affecting your main SQL operations.
+
+## Configuration Parameter
+
+| Parameter | Type | Default | Description |
+|-----------|------|---------|-------------|
+| `telemetry_circuit_breaker_enabled` | bool | `True` | Enable or disable the telemetry circuit breaker |
+
+## Usage Examples
+
+### Default Configuration (Circuit Breaker Enabled)
+
+```python
+from databricks import sql
+
+# Circuit breaker is enabled by default
+with sql.connect(
+    server_hostname="your-host.cloud.databricks.com",
+    http_path="/sql/1.0/warehouses/your-warehouse-id",
+    access_token="your-token"
+) as conn:
+    # Your SQL operations here
+    pass
+```
+
+### Disable Circuit Breaker
+
+```python
+from databricks import sql
+
+# Disable circuit breaker entirely
+with sql.connect(
+    server_hostname="your-host.cloud.databricks.com",
+    http_path="/sql/1.0/warehouses/your-warehouse-id",
+    access_token="your-token",
+    telemetry_circuit_breaker_enabled=False
+) as conn:
+    # Your SQL operations here
+    pass
+```
+
+## Circuit Breaker States
+
+The circuit breaker operates in three states:
+
+1. **Closed**: Normal operation, telemetry requests are allowed
+2. **Open**: Circuit breaker is open, telemetry requests are blocked
+3. **Half-Open**: Testing state, limited telemetry requests are allowed
+
+
+## Performance Impact
+
+The circuit breaker has minimal performance impact on SQL operations:
+
+- Circuit breaker only affects telemetry requests, not SQL queries
+- When circuit breaker is open, telemetry requests are simply skipped
+- No additional latency is added to successful operations
+
+## Best Practices
+
+1. **Keep circuit breaker enabled**: The default configuration works well for most use cases
+2. **Don't disable unless necessary**: Circuit breaker provides important protection against telemetry failures
+3. **Monitor application logs**: Circuit breaker state changes are logged for troubleshooting
pyproject.toml

Lines changed: 1 addition & 0 deletions
@@ -26,6 +26,7 @@ pyarrow = [
     { version = ">=18.0.0", python = ">=3.13", optional=true }
 ]
 pyjwt = "^2.0.0"
+pybreaker = "^1.0.0"
 requests-kerberos = {version = "^0.15.0", optional = true}


src/databricks/sql/auth/common.py

Lines changed: 5 additions & 0 deletions
@@ -51,6 +51,8 @@ def __init__(
         pool_connections: Optional[int] = None,
         pool_maxsize: Optional[int] = None,
         user_agent: Optional[str] = None,
+        # Telemetry circuit breaker configuration
+        telemetry_circuit_breaker_enabled: Optional[bool] = None,
     ):
         self.hostname = hostname
         self.access_token = access_token
@@ -83,6 +85,9 @@ def __init__(
         self.pool_connections = pool_connections or 10
         self.pool_maxsize = pool_maxsize or 20
         self.user_agent = user_agent
+
+        # Telemetry circuit breaker configuration
+        self.telemetry_circuit_breaker_enabled = telemetry_circuit_breaker_enabled if telemetry_circuit_breaker_enabled is not None else True


 def get_effective_azure_login_app_id(hostname) -> str:
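
Reviewer sketch (not part of the commit): the second hunk resolves the default with an explicit `is not None` check instead of the `or` idiom used for the pool sizes just above it. The two helper functions below are hypothetical names, written only to show why that matters for a boolean flag.

```python
from typing import Optional


def resolve_with_or(flag: Optional[bool]) -> bool:
    # `or` treats False like "unset", so an explicit False is silently lost
    return flag or True


def resolve_with_none_check(flag: Optional[bool]) -> bool:
    # Only None falls back to the default; an explicit False is preserved
    return flag if flag is not None else True
```

With the `or` form, passing `telemetry_circuit_breaker_enabled=False` could never disable the breaker, which is why the explicit check is the safer choice here.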
Lines changed: 231 additions & 0 deletions
@@ -0,0 +1,231 @@
+"""
+Circuit breaker implementation for telemetry requests.
+
+This module provides circuit breaker functionality to prevent telemetry failures
+from impacting the main SQL operations. It uses pybreaker library to implement
+the circuit breaker pattern with configurable thresholds and timeouts.
+"""
+
+import logging
+import threading
+from typing import Dict, Optional, Any
+from dataclasses import dataclass
+
+import pybreaker
+from pybreaker import CircuitBreaker, CircuitBreakerError
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class CircuitBreakerConfig:
+    """Configuration for circuit breaker behavior."""
+
+    # Failure threshold percentage (0.0 to 1.0)
+    failure_threshold: float = 0.5
+
+    # Minimum number of calls before circuit can open
+    minimum_calls: int = 20
+
+    # Time window for counting failures (in seconds)
+    timeout: int = 30
+
+    # Time to wait before trying to close circuit (in seconds)
+    reset_timeout: int = 30
+
+    # Expected exception types that should trigger circuit breaker
+    expected_exception: tuple = (Exception,)
+
+    # Name for the circuit breaker (for logging)
+    name: str = "telemetry-circuit-breaker"
+
+
+class CircuitBreakerManager:
+    """
+    Manages circuit breaker instances for telemetry requests.
+
+    This class provides a singleton pattern to manage circuit breaker instances
+    per host, ensuring that telemetry failures don't impact main SQL operations.
+    """
+
+    _instances: Dict[str, CircuitBreaker] = {}
+    _lock = threading.RLock()
+    _config: Optional[CircuitBreakerConfig] = None
+
+    @classmethod
+    def initialize(cls, config: CircuitBreakerConfig) -> None:
+        """
+        Initialize the circuit breaker manager with configuration.
+
+        Args:
+            config: Circuit breaker configuration
+        """
+        with cls._lock:
+            cls._config = config
+            logger.debug("CircuitBreakerManager initialized with config: %s", config)
+
+    @classmethod
+    def get_circuit_breaker(cls, host: str) -> CircuitBreaker:
+        """
+        Get or create a circuit breaker instance for the specified host.
+
+        Args:
+            host: The hostname for which to get the circuit breaker
+
+        Returns:
+            CircuitBreaker instance for the host
+        """
+        if not cls._config:
+            # Return a no-op circuit breaker if not initialized
+            return cls._create_noop_circuit_breaker()
+
+        with cls._lock:
+            if host not in cls._instances:
+                cls._instances[host] = cls._create_circuit_breaker(host)
+                logger.debug("Created circuit breaker for host: %s", host)
+
+            return cls._instances[host]
+
+    class _StateChangeLogger(pybreaker.CircuitBreakerListener):
+        """Adapts pybreaker's listener interface to _on_state_change."""
+
+        def state_change(self, cb, old_state, new_state):
+            # pybreaker passes state objects here; forward their string names
+            CircuitBreakerManager._on_state_change(
+                getattr(old_state, "name", "unknown"),
+                getattr(new_state, "name", "unknown"),
+                cb,
+            )
+
+    @classmethod
+    def _create_circuit_breaker(cls, host: str) -> CircuitBreaker:
+        """
+        Create a new circuit breaker instance for the specified host.
+
+        Args:
+            host: The hostname for the circuit breaker
+
+        Returns:
+            New CircuitBreaker instance
+        """
+        config = cls._config
+
+        # Create circuit breaker with configuration.
+        # pybreaker opens the circuit after `fail_max` consecutive failures,
+        # so minimum_calls acts as that failure count here.
+        breaker = CircuitBreaker(
+            fail_max=config.minimum_calls,
+            reset_timeout=config.reset_timeout,
+            name=f"{config.name}-{host}"
+        )
+
+        # Stored on the instance for reference only; pybreaker's CircuitBreaker
+        # does not define or read a `failure_threshold` attribute.
+        breaker.failure_threshold = config.failure_threshold
+
+        # Add a state change listener for logging. add_listener() expects a
+        # CircuitBreakerListener object, not a bare callable.
+        breaker.add_listener(cls._StateChangeLogger())
+
+        return breaker
+
+    @classmethod
+    def _create_noop_circuit_breaker(cls) -> CircuitBreaker:
+        """
+        Create a no-op circuit breaker that always allows calls.
+
+        Returns:
+            CircuitBreaker that never opens
+        """
+        # Create a circuit breaker with a very high fail_max so that, in
+        # practice, it never opens
+        breaker = CircuitBreaker(
+            fail_max=1000000,  # Very high threshold
+            reset_timeout=1,  # Short reset time
+            name="noop-circuit-breaker"
+        )
+        # Informational only; pybreaker does not read this attribute
+        breaker.failure_threshold = 1.0  # 100% failure threshold
+        return breaker
+
+    @classmethod
+    def _on_state_change(cls, old_state: str, new_state: str, breaker: CircuitBreaker) -> None:
+        """
+        Handle circuit breaker state changes.
+
+        Args:
+            old_state: Name of the previous state of the circuit breaker
+            new_state: Name of the new state of the circuit breaker
+            breaker: The circuit breaker instance
+        """
+        logger.info(
+            "Circuit breaker state changed from %s to %s for %s",
+            old_state, new_state, breaker.name
+        )
+
+        if new_state == "open":
+            logger.warning(
+                "Circuit breaker opened for %s - telemetry requests will be blocked",
+                breaker.name
+            )
+        elif new_state == "closed":
+            logger.info(
+                "Circuit breaker closed for %s - telemetry requests will be allowed",
+                breaker.name
+            )
+        elif new_state == "half-open":
+            logger.info(
+                "Circuit breaker half-open for %s - testing telemetry requests",
+                breaker.name
+            )
+
+    @classmethod
+    def get_circuit_breaker_state(cls, host: str) -> str:
+        """
+        Get the current state of the circuit breaker for a host.
+
+        Args:
+            host: The hostname
+
+        Returns:
+            Current state of the circuit breaker
+        """
+        if not cls._config:
+            return "disabled"
+
+        with cls._lock:
+            if host not in cls._instances:
+                return "not_initialized"
+
+            breaker = cls._instances[host]
+            return breaker.current_state
+
+    @classmethod
+    def reset_circuit_breaker(cls, host: str) -> None:
+        """
+        Reset the circuit breaker for a host to closed state.
+
+        Args:
+            host: The hostname
+        """
+        with cls._lock:
+            if host in cls._instances:
+                # Drop the instance; the next get_circuit_breaker() call creates a fresh, closed breaker
+                del cls._instances[host]
+                logger.info("Reset circuit breaker for host: %s", host)
+
+    @classmethod
+    def clear_circuit_breaker(cls, host: str) -> None:
+        """
+        Remove the circuit breaker instance for a host.
+
+        Args:
+            host: The hostname
+        """
+        with cls._lock:
+            if host in cls._instances:
+                del cls._instances[host]
+                logger.debug("Cleared circuit breaker for host: %s", host)
+
+    @classmethod
+    def clear_all_circuit_breakers(cls) -> None:
+        """Clear all circuit breaker instances."""
+        with cls._lock:
+            cls._instances.clear()
+            logger.debug("Cleared all circuit breakers")
+
+
+def is_circuit_breaker_error(exception: Exception) -> bool:
+    """
+    Check if an exception is a circuit breaker error.
+
+    Args:
+        exception: The exception to check
+
+    Returns:
+        True if the exception is a circuit breaker error
+    """
+    return isinstance(exception, CircuitBreakerError)
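
Reviewer sketch (not part of the commit): the manager above keeps one breaker per host and creates it lazily under a lock. The same get-or-create pattern can be shown in isolation with the standard library. `PerHostRegistry` and its `factory` argument are hypothetical names used only for this illustration.

```python
import threading
from typing import Callable, Dict, Generic, TypeVar

T = TypeVar("T")


class PerHostRegistry(Generic[T]):
    """Hypothetical lazily populated, lock-protected map of host -> object."""

    def __init__(self, factory: Callable[[str], T]) -> None:
        self._factory = factory
        self._items: Dict[str, T] = {}
        self._lock = threading.RLock()

    def get(self, host: str) -> T:
        # Check and create under the same lock so that concurrent callers
        # for one host always receive the same instance
        with self._lock:
            if host not in self._items:
                self._items[host] = self._factory(host)
            return self._items[host]

    def clear(self, host: str) -> None:
        # Dropping the entry means the next get() builds a fresh instance
        with self._lock:
            self._items.pop(host, None)
```

Holding the lock across both the membership check and the insertion is what prevents two threads from each creating a breaker for the same host.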
