Skip to content

Commit 3dd29e7

Browse files
committed
Enhance bug report and replication logic in DbReplicator
- Updated the bug report for the critical replication issue, clarifying the status and latest findings regarding the infinite loop on the `api_key` table. - Improved logging in the `perform_initial_replication` method to track table processing and error handling, allowing for better diagnostics during replication. - Added exception handling to ensure that individual table failures do not halt the entire replication process, enhancing robustness. - Implemented detailed logging for worker processes, including primary key advancement tracking and iteration counts, to aid in debugging. - Enhanced SQL query logging in MySQLApi to provide better visibility into executed queries and parameters, improving overall error handling.
1 parent 6e764c5 commit 3dd29e7

File tree

8 files changed

+492
-106
lines changed

8 files changed

+492
-106
lines changed

docker_startup.log

Lines changed: 0 additions & 52 deletions
This file was deleted.

mysql_ch_replicator/db_replicator.py

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -208,18 +208,37 @@ def run(self):
208208
if self.target_database not in self.clickhouse_api.get_databases() and f"{self.target_database}_tmp" not in self.clickhouse_api.get_databases():
209209
logger.warning(f'database {self.target_database} missing in CH')
210210
logger.warning('will run replication from scratch')
211+
# 🔄 PHASE 1.2: Status transition logging
212+
old_status = self.state.status
211213
self.state.remove()
212214
self.state = self.create_state()
215+
logger.info(f"🔄 STATUS CHANGE: {old_status}{Status.NONE}, reason='database_missing_resetting_state'")
213216

214217
if self.state.status == Status.RUNNING_REALTIME_REPLICATION:
215218
self.run_realtime_replication()
216219
return
217220
if self.state.status == Status.PERFORMING_INITIAL_REPLICATION:
221+
logger.info(f'🔍 DEBUG: Starting initial replication (initial_only={self.initial_only})')
222+
logger.info(f'🔍 DEBUG: Current state status: {self.state.status}')
223+
logger.info(f'🔍 DEBUG: Process PID: {os.getpid()}')
224+
218225
self.initial_replicator.perform_initial_replication()
226+
227+
logger.info(f'🔍 DEBUG: Initial replication completed')
228+
logger.info(f'🔍 DEBUG: State status before update: {self.state.status}')
229+
219230
if not self.initial_only:
231+
logger.info(f'🔍 DEBUG: initial_only=False, transitioning to realtime replication')
220232
self.run_realtime_replication()
221233
else:
234+
logger.info(f'🔍 DEBUG: initial_only=True, will exit after state update')
222235
logger.info('initial_only mode enabled - exiting after initial replication')
236+
# FIX #1: Update status to indicate completion
237+
self.state.status = Status.RUNNING_REALTIME_REPLICATION
238+
self.state.save()
239+
logger.info('State updated: Initial replication completed successfully')
240+
logger.info(f'🔍 DEBUG: State status after update: {self.state.status}')
241+
logger.info(f'🔍 DEBUG: Process {os.getpid()} exiting normally')
223242
return
224243

225244
# If ignore_deletes is enabled, we don't create a temporary DB and don't swap DBs
@@ -251,8 +270,33 @@ def run(self):
251270
self.run_realtime_replication()
252271
else:
253272
logger.info('initial_only mode enabled - exiting after initial replication')
254-
except Exception:
255-
logger.error(f'unhandled exception', exc_info=True)
273+
except Exception as exc:
274+
# Build rich error context for debugging
275+
error_context = {
276+
'database': self.database,
277+
'table': getattr(self, 'table', None),
278+
'worker_id': self.worker_id,
279+
'total_workers': self.total_workers,
280+
'target_database': self.target_database,
281+
'is_worker': self.is_parallel_worker,
282+
'initial_only': self.initial_only,
283+
}
284+
logger.error(f'Worker {self.worker_id} unhandled exception: {error_context}', exc_info=True)
285+
286+
# Ensure exception info gets to stderr for parent process
287+
# This guarantees output even if logging fails
288+
import sys
289+
import traceback
290+
sys.stderr.write(f"\n{'='*60}\n")
291+
sys.stderr.write(f"WORKER FAILURE CONTEXT:\n")
292+
for key, value in error_context.items():
293+
sys.stderr.write(f" {key}: {value}\n")
294+
sys.stderr.write(f"{'='*60}\n")
295+
sys.stderr.write(f"Exception: {type(exc).__name__}: {exc}\n")
296+
sys.stderr.write(f"{'='*60}\n")
297+
traceback.print_exc(file=sys.stderr)
298+
sys.stderr.flush()
299+
256300
raise
257301

258302
def run_realtime_replication(self):

0 commit comments

Comments
 (0)