Skip to content

Commit 897b858

Browse files
committed
simplified CB config
1 parent 2ec79a3 commit 897b858

File tree

8 files changed

+506
-843
lines changed

8 files changed

+506
-843
lines changed

src/databricks/sql/telemetry/circuit_breaker_manager.py

Lines changed: 9 additions & 132 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,15 @@
1717
logger = logging.getLogger(__name__)
1818

1919
# Circuit Breaker Configuration Constants
20-
DEFAULT_FAILURE_THRESHOLD = 0.5
21-
DEFAULT_MINIMUM_CALLS = 20
22-
DEFAULT_TIMEOUT = 30
23-
DEFAULT_RESET_TIMEOUT = 30
24-
DEFAULT_EXPECTED_EXCEPTION = (Exception,)
25-
DEFAULT_NAME = "telemetry-circuit-breaker"
20+
MINIMUM_CALLS = 20
21+
RESET_TIMEOUT = 30
22+
CIRCUIT_BREAKER_NAME = "telemetry-circuit-breaker"
2623

2724
# Circuit Breaker State Constants
2825
CIRCUIT_BREAKER_STATE_OPEN = "open"
2926
CIRCUIT_BREAKER_STATE_CLOSED = "closed"
3027
CIRCUIT_BREAKER_STATE_HALF_OPEN = "half-open"
3128
CIRCUIT_BREAKER_STATE_DISABLED = "disabled"
32-
CIRCUIT_BREAKER_STATE_NOT_INITIALIZED = "not_initialized"
3329

3430
# Logging Message Constants
3531
LOG_CIRCUIT_BREAKER_STATE_CHANGED = "Circuit breaker state changed from %s to %s for %s"
@@ -76,56 +72,18 @@ def state_change(self, cb: CircuitBreaker, old_state, new_state) -> None:
7672
logger.info(LOG_CIRCUIT_BREAKER_HALF_OPEN, cb.name)
7773

7874

79-
@dataclass(frozen=True)
80-
class CircuitBreakerConfig:
81-
"""Configuration for circuit breaker behavior.
82-
83-
This class is immutable to prevent modification of circuit breaker settings.
84-
All configuration values are set to constants defined at the module level.
85-
"""
86-
87-
# Failure threshold percentage (0.0 to 1.0)
88-
failure_threshold: float = DEFAULT_FAILURE_THRESHOLD
89-
90-
# Minimum number of calls before circuit can open
91-
minimum_calls: int = DEFAULT_MINIMUM_CALLS
92-
93-
# Time window for counting failures (in seconds)
94-
timeout: int = DEFAULT_TIMEOUT
95-
96-
# Time to wait before trying to close circuit (in seconds)
97-
reset_timeout: int = DEFAULT_RESET_TIMEOUT
98-
99-
# Expected exception types that should trigger circuit breaker
100-
expected_exception: tuple = DEFAULT_EXPECTED_EXCEPTION
101-
102-
# Name for the circuit breaker (for logging)
103-
name: str = DEFAULT_NAME
104-
105-
10675
class CircuitBreakerManager:
10776
"""
10877
Manages circuit breaker instances for telemetry requests.
10978
11079
This class provides a singleton pattern to manage circuit breaker instances
11180
per host, ensuring that telemetry failures don't impact main SQL operations.
81+
82+
Circuit breaker configuration is fixed and cannot be overridden.
11283
"""
11384

11485
_instances: Dict[str, CircuitBreaker] = {}
11586
_lock = threading.RLock()
116-
_config: Optional[CircuitBreakerConfig] = None
117-
118-
@classmethod
119-
def initialize(cls, config: CircuitBreakerConfig) -> None:
120-
"""
121-
Initialize the circuit breaker manager with configuration.
122-
123-
Args:
124-
config: Circuit breaker configuration
125-
"""
126-
with cls._lock:
127-
cls._config = config
128-
logger.debug("CircuitBreakerManager initialized with config: %s", config)
12987

13088
@classmethod
13189
def get_circuit_breaker(cls, host: str) -> CircuitBreaker:
@@ -138,10 +96,6 @@ def get_circuit_breaker(cls, host: str) -> CircuitBreaker:
13896
Returns:
13997
CircuitBreaker instance for the host
14098
"""
141-
if not cls._config:
142-
# Return a no-op circuit breaker if not initialized
143-
return cls._create_noop_circuit_breaker()
144-
14599
with cls._lock:
146100
if host not in cls._instances:
147101
cls._instances[host] = cls._create_circuit_breaker(host)
@@ -160,93 +114,16 @@ def _create_circuit_breaker(cls, host: str) -> CircuitBreaker:
160114
Returns:
161115
New CircuitBreaker instance
162116
"""
163-
config = cls._config
164-
if config is None:
165-
raise RuntimeError("CircuitBreakerManager not initialized")
166-
167-
# Create circuit breaker with configuration
117+
# Create circuit breaker with fixed configuration
168118
breaker = CircuitBreaker(
169-
fail_max=config.minimum_calls, # Number of failures before circuit opens
170-
reset_timeout=config.reset_timeout,
171-
name=f"{config.name}-{host}",
119+
fail_max=MINIMUM_CALLS,
120+
reset_timeout=RESET_TIMEOUT,
121+
name=f"{CIRCUIT_BREAKER_NAME}-{host}",
172122
)
173-
174-
# Add state change listeners for logging
175123
breaker.add_listener(CircuitBreakerStateListener())
176124

177125
return breaker
178126

179-
@classmethod
180-
def _create_noop_circuit_breaker(cls) -> CircuitBreaker:
181-
"""
182-
Create a no-op circuit breaker that always allows calls.
183-
184-
Returns:
185-
CircuitBreaker that never opens
186-
"""
187-
# Create a circuit breaker with very high thresholds so it never opens
188-
breaker = CircuitBreaker(
189-
fail_max=1000000, # Very high threshold
190-
reset_timeout=1, # Short reset time
191-
name="noop-circuit-breaker",
192-
)
193-
return breaker
194-
195-
@classmethod
196-
def get_circuit_breaker_state(cls, host: str) -> str:
197-
"""
198-
Get the current state of the circuit breaker for a host.
199-
200-
Args:
201-
host: The hostname
202-
203-
Returns:
204-
Current state of the circuit breaker
205-
"""
206-
if not cls._config:
207-
return CIRCUIT_BREAKER_STATE_DISABLED
208-
209-
with cls._lock:
210-
if host not in cls._instances:
211-
return CIRCUIT_BREAKER_STATE_NOT_INITIALIZED
212-
213-
breaker = cls._instances[host]
214-
return breaker.current_state
215-
216-
@classmethod
217-
def reset_circuit_breaker(cls, host: str) -> None:
218-
"""
219-
Reset the circuit breaker for a host to closed state.
220-
221-
Args:
222-
host: The hostname
223-
"""
224-
with cls._lock:
225-
if host in cls._instances:
226-
# pybreaker doesn't have a reset method, we need to recreate the breaker
227-
del cls._instances[host]
228-
logger.info("Reset circuit breaker for host: %s", host)
229-
230-
@classmethod
231-
def clear_circuit_breaker(cls, host: str) -> None:
232-
"""
233-
Remove the circuit breaker instance for a host.
234-
235-
Args:
236-
host: The hostname
237-
"""
238-
with cls._lock:
239-
if host in cls._instances:
240-
del cls._instances[host]
241-
logger.debug("Cleared circuit breaker for host: %s", host)
242-
243-
@classmethod
244-
def clear_all_circuit_breakers(cls) -> None:
245-
"""Clear all circuit breaker instances."""
246-
with cls._lock:
247-
cls._instances.clear()
248-
logger.debug("Cleared all circuit breakers")
249-
250127

251128
def is_circuit_breaker_error(exception: Exception) -> bool:
252129
"""

src/databricks/sql/telemetry/telemetry_client.py

Lines changed: 1 addition & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@
4747
CircuitBreakerTelemetryPushClient,
4848
)
4949
from databricks.sql.telemetry.circuit_breaker_manager import (
50-
CircuitBreakerConfig,
5150
is_circuit_breaker_error,
5251
)
5352

@@ -200,34 +199,15 @@ def __init__(
200199

201200
# Create telemetry push client based on circuit breaker enabled flag
202201
if client_context.telemetry_circuit_breaker_enabled:
203-
# Create circuit breaker configuration from client context or use defaults
204-
self._circuit_breaker_config = CircuitBreakerConfig(
205-
failure_threshold=getattr(
206-
client_context, "telemetry_circuit_breaker_failure_threshold", 0.5
207-
),
208-
minimum_calls=getattr(
209-
client_context, "telemetry_circuit_breaker_minimum_calls", 20
210-
),
211-
timeout=getattr(
212-
client_context, "telemetry_circuit_breaker_timeout", 30
213-
),
214-
reset_timeout=getattr(
215-
client_context, "telemetry_circuit_breaker_reset_timeout", 30
216-
),
217-
name=f"telemetry-circuit-breaker-{session_id_hex}",
218-
)
219-
220-
# Create circuit breaker telemetry push client
202+
# Create circuit breaker telemetry push client with fixed configuration
221203
self._telemetry_push_client: ITelemetryPushClient = (
222204
CircuitBreakerTelemetryPushClient(
223205
TelemetryPushClient(self._http_client),
224206
host_url,
225-
self._circuit_breaker_config,
226207
)
227208
)
228209
else:
229210
# Circuit breaker disabled - use direct telemetry push client
230-
self._circuit_breaker_config = None
231211
self._telemetry_push_client: ITelemetryPushClient = TelemetryPushClient(
232212
self._http_client
233213
)
@@ -410,18 +390,6 @@ def close(self):
410390
logger.debug("Closing TelemetryClient for connection %s", self._session_id_hex)
411391
self._flush()
412392

413-
def get_circuit_breaker_state(self) -> str:
414-
"""Get the current state of the circuit breaker."""
415-
return self._telemetry_push_client.get_circuit_breaker_state()
416-
417-
def is_circuit_breaker_open(self) -> bool:
418-
"""Check if the circuit breaker is currently open."""
419-
return self._telemetry_push_client.is_circuit_breaker_open()
420-
421-
def reset_circuit_breaker(self) -> None:
422-
"""Reset the circuit breaker."""
423-
self._telemetry_push_client.reset_circuit_breaker()
424-
425393

426394
class TelemetryClientFactory:
427395
"""

src/databricks/sql/telemetry/telemetry_push_client.py

Lines changed: 3 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,8 @@
2020
from databricks.sql.common.unified_http_client import UnifiedHttpClient
2121
from databricks.sql.common.http import HttpMethod
2222
from databricks.sql.telemetry.circuit_breaker_manager import (
23-
CircuitBreakerConfig,
2423
CircuitBreakerManager,
2524
is_circuit_breaker_error,
26-
CIRCUIT_BREAKER_STATE_OPEN,
2725
)
2826

2927
logger = logging.getLogger(__name__)
@@ -55,21 +53,6 @@ def request_context(
5553
"""Context manager for making HTTP requests."""
5654
pass
5755

58-
@abstractmethod
59-
def get_circuit_breaker_state(self) -> str:
60-
"""Get the current state of the circuit breaker."""
61-
pass
62-
63-
@abstractmethod
64-
def is_circuit_breaker_open(self) -> bool:
65-
"""Check if the circuit breaker is currently open."""
66-
pass
67-
68-
@abstractmethod
69-
def reset_circuit_breaker(self) -> None:
70-
"""Reset the circuit breaker to closed state."""
71-
pass
72-
7356

7457
class TelemetryPushClient(ITelemetryPushClient):
7558
"""Direct HTTP client implementation for telemetry requests."""
@@ -108,47 +91,27 @@ def request_context(
10891
) as response:
10992
yield response
11093

111-
def get_circuit_breaker_state(self) -> str:
112-
"""Circuit breaker is not available in direct implementation."""
113-
return "not_available"
114-
115-
def is_circuit_breaker_open(self) -> bool:
116-
"""Circuit breaker is not available in direct implementation."""
117-
return False
118-
119-
def reset_circuit_breaker(self) -> None:
120-
"""Circuit breaker is not available in direct implementation."""
121-
pass
122-
12394

12495
class CircuitBreakerTelemetryPushClient(ITelemetryPushClient):
12596
"""Circuit breaker wrapper implementation for telemetry requests."""
12697

127-
def __init__(
128-
self, delegate: ITelemetryPushClient, host: str, config: CircuitBreakerConfig
129-
):
98+
def __init__(self, delegate: ITelemetryPushClient, host: str):
13099
"""
131100
Initialize the circuit breaker telemetry push client.
132101
133102
Args:
134103
delegate: The underlying telemetry push client to wrap
135104
host: The hostname for circuit breaker identification
136-
config: Circuit breaker configuration
137105
"""
138106
self._delegate = delegate
139107
self._host = host
140-
self._config = config
141108

142-
# Initialize circuit breaker manager with config
143-
CircuitBreakerManager.initialize(config)
144-
145-
# Get circuit breaker for this host
109+
# Get circuit breaker for this host (creates if doesn't exist)
146110
self._circuit_breaker = CircuitBreakerManager.get_circuit_breaker(host)
147111

148112
logger.debug(
149-
"CircuitBreakerTelemetryPushClient initialized for host %s with config: %s",
113+
"CircuitBreakerTelemetryPushClient initialized for host %s",
150114
host,
151-
config,
152115
)
153116

154117
def request(
@@ -208,15 +171,3 @@ def _make_request():
208171
# Re-raise non-circuit breaker exceptions
209172
logger.debug("Telemetry request failed for host %s: %s", self._host, e)
210173
raise
211-
212-
def get_circuit_breaker_state(self) -> str:
213-
"""Get the current state of the circuit breaker."""
214-
return CircuitBreakerManager.get_circuit_breaker_state(self._host)
215-
216-
def is_circuit_breaker_open(self) -> bool:
217-
"""Check if the circuit breaker is currently open."""
218-
return self.get_circuit_breaker_state() == CIRCUIT_BREAKER_STATE_OPEN
219-
220-
def reset_circuit_breaker(self) -> None:
221-
"""Reset the circuit breaker to closed state."""
222-
CircuitBreakerManager.reset_circuit_breaker(self._host)

0 commit comments

Comments
 (0)