|
1 | 1 | import json |
2 | 2 | import os |
3 | | -import shutil |
4 | 3 | import time |
5 | 4 | from collections import defaultdict |
6 | 5 | from logging import getLogger |
7 | 6 |
|
8 | 7 | import pymysql.err |
9 | 8 |
|
| 9 | +from .binlog_recovery import recover_from_binlog_corruption |
10 | 10 | from .binlog_replicator import EventType, LogEvent |
11 | 11 | from .common import Status |
12 | 12 | from .converter import strip_sql_comments |
@@ -79,34 +79,12 @@ def run_realtime_replication(self): |
79 | 79 | except pymysql.err.OperationalError as e: |
80 | 80 | # Check if this is the binlog index file corruption error (Error 1236) |
81 | 81 | if e.args[0] == 1236: |
82 | | - logger.error( |
83 | | - "[binlogrepl] operational error (1236, 'Could not find first log file name in binary log index file')" |
84 | | - ) |
85 | | - logger.error(f"[binlogrepl] Full error: {e}") |
86 | | - logger.info("[binlogrepl] Attempting automatic recovery...") |
87 | | - |
88 | 82 | # Get binlog directory path for this database |
89 | 83 | binlog_dir = os.path.join( |
90 | 84 | self.replicator.config.binlog_replicator.data_dir, |
91 | 85 | self.replicator.database |
92 | 86 | ) |
93 | | - |
94 | | - # Delete the corrupted binlog directory |
95 | | - if os.path.exists(binlog_dir): |
96 | | - logger.warning(f"[binlogrepl] Deleting corrupted binlog directory: {binlog_dir}") |
97 | | - try: |
98 | | - shutil.rmtree(binlog_dir) |
99 | | - logger.info(f"[binlogrepl] Successfully deleted binlog directory: {binlog_dir}") |
100 | | - except Exception as delete_error: |
101 | | - logger.error(f"[binlogrepl] Failed to delete binlog directory: {delete_error}", exc_info=True) |
102 | | - raise RuntimeError("Failed to delete corrupted binlog directory") from delete_error |
103 | | - else: |
104 | | - logger.warning(f"[binlogrepl] Binlog directory does not exist: {binlog_dir}") |
105 | | - |
106 | | - # Exit process cleanly to trigger automatic restart by runner |
107 | | - logger.info("[binlogrepl] Exiting process for automatic restart by runner") |
108 | | - logger.info("[binlogrepl] The runner will automatically restart this process") |
109 | | - raise RuntimeError("Binlog corruption detected (Error 1236) - restarting for recovery") from e |
| 87 | + recover_from_binlog_corruption(binlog_dir, e) |
110 | 88 | else: |
111 | 89 | # Re-raise other OperationalErrors |
112 | 90 | logger.error(f"[binlogrepl] Unhandled OperationalError: {e}", exc_info=True) |
|
0 commit comments