1+ from concurrent .futures import wait
12import random
23import threading
34import time
@@ -35,6 +36,7 @@ def telemetry_setup_teardown(self):
3536 if TelemetryClientFactory ._executor :
3637 TelemetryClientFactory ._executor .shutdown (wait = True )
3738 TelemetryClientFactory ._executor = None
39+ TelemetryClientFactory ._stop_flush_thread ()
3840 TelemetryClientFactory ._initialized = False
3941
4042 def test_concurrent_queries_sends_telemetry (self ):
@@ -47,8 +49,7 @@ def test_concurrent_queries_sends_telemetry(self):
4749 captured_telemetry = []
4850 captured_session_ids = []
4951 captured_statement_ids = []
50- captured_responses = []
51- captured_exceptions = []
52+ captured_futures = []
5253
5354 original_send_telemetry = TelemetryClient ._send_telemetry
5455 original_callback = TelemetryClient ._telemetry_request_callback
@@ -63,18 +64,9 @@ def callback_wrapper(self_client, future, sent_count):
6364 Wraps the original callback to capture the server's response
6465 or any exceptions from the async network call.
6566 """
66- try :
67- original_callback (self_client , future , sent_count )
68-
69- # Now, capture the result for our assertions
70- response = future .result ()
71- response .raise_for_status () # Raise an exception for 4xx/5xx errors
72- telemetry_response = response .json ()
73- with capture_lock :
74- captured_responses .append (telemetry_response )
75- except Exception as e :
76- with capture_lock :
77- captured_exceptions .append (e )
67+ with capture_lock :
68+ captured_futures .append (future )
69+ original_callback (self_client , future , sent_count )
7870
7971 with patch .object (TelemetryClient , "_send_telemetry" , send_telemetry_wrapper ), \
8072 patch .object (TelemetryClient , "_telemetry_request_callback" , callback_wrapper ):
@@ -101,10 +93,26 @@ def execute_query_worker(thread_id):
10193 # Run the workers concurrently
10294 run_in_threads (execute_query_worker , num_threads , pass_index = True )
10395
104- if TelemetryClientFactory ._executor :
105- TelemetryClientFactory ._executor .shutdown (wait = True )
96+ timeout_seconds = 60
97+ start_time = time .time ()
98+ expected_event_count = num_threads
99+
100+ while len (captured_futures ) < expected_event_count and time .time () - start_time < timeout_seconds :
101+ time .sleep (0.1 )
102+
103+ done , not_done = wait (captured_futures , timeout = timeout_seconds )
104+ assert not not_done
105+
106+ captured_exceptions = []
107+ captured_responses = []
108+ for future in done :
109+ try :
110+ response = future .result ()
111+ response .raise_for_status ()
112+ captured_responses .append (response .json ())
113+ except Exception as e :
114+ captured_exceptions .append (e )
106115
107- # --- VERIFICATION ---
108116 assert not captured_exceptions
109117 assert len (captured_responses ) > 0
110118
0 commit comments