|
12 | 12 |
|
13 | 13 | from runtype import dataclass |
14 | 14 |
|
| 15 | +from .tracking import create_end_event_json, create_start_event_json, send_event_json, is_tracking_enabled |
15 | 16 | from .sql import Select, Checksum, Compare, Count, TableName, Time, Value |
16 | | -from .utils import CaseAwareMapping, CaseInsensitiveDict, safezip, split_space, CaseSensitiveDict, ArithString |
| 17 | +from .utils import ( |
| 18 | + CaseAwareMapping, |
| 19 | + CaseInsensitiveDict, |
| 20 | + safezip, |
| 21 | + split_space, |
| 22 | + CaseSensitiveDict, |
| 23 | + ArithString, |
| 24 | + run_as_daemon, |
| 25 | +) |
17 | 26 | from .databases.base import Database |
18 | 27 | from .databases.database_types import ( |
19 | 28 | DbPath, |
@@ -225,11 +234,11 @@ def count(self) -> Tuple[int, int]: |
225 | 234 |
|
226 | 235 | def count_and_checksum(self) -> Tuple[int, int]: |
227 | 236 | """Count and checksum the rows in the segment, in one pass.""" |
228 | | - start = time.time() |
| 237 | + start = time.monotonic() |
229 | 238 | count, checksum = self.database.query( |
230 | 239 | self._make_select(columns=[Count(), Checksum(self._relevant_columns_repr)]), tuple |
231 | 240 | ) |
232 | | - duration = time.time() - start |
| 241 | + duration = time.monotonic() - start |
233 | 242 | if duration > RECOMMENDED_CHECKSUM_DURATION: |
234 | 243 | logger.warning( |
235 | 244 | f"Checksum is taking longer than expected ({duration:.2f}s). " |
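
The switch from time.time() to time.monotonic() matters because the wall clock can jump (NTP sync, DST, manual changes), which can make a measured duration inaccurate or even negative. A minimal standalone sketch of the pattern, where time.sleep merely stands in for the checksum query being timed:

    import time

    start = time.monotonic()             # monotonic clock: immune to system clock jumps
    time.sleep(0.1)                      # stand-in for the measured work
    duration = time.monotonic() - start
    print(f"elapsed: {duration:.3f}s")   # always >= 0, even if the wall clock changed
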
@@ -260,6 +269,11 @@ def query_key_range(self) -> Tuple[int, int]: |
260 | 269 | def is_bounded(self): |
261 | 270 | return self.min_key is not None and self.max_key is not None |
262 | 271 |
|
| 272 | + def approximate_size(self): |
| 273 | + if not self.is_bounded: |
| 274 | + raise RuntimeError("Cannot approximate the size of an unbounded segment. Must have min_key and max_key.") |
| 275 | + return self.max_key - self.min_key |
| 276 | + |
263 | 277 |
|
264 | 278 | def diff_sets(a: set, b: set) -> Iterator: |
265 | 279 | s1 = set(a) |
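
The new approximate_size() simply returns max_key - min_key, i.e. an upper bound on the row count when keys are dense. As a hedged sketch only (should_download and bisection_threshold are illustrative names, not part of this patch), such an estimate can drive the bisect-or-download decision:

    def should_download(segment, bisection_threshold: int) -> bool:
        # If the key range is already small enough, download and compare rows
        # directly; otherwise keep bisecting into smaller segments.
        return segment.approximate_size() <= bisection_threshold
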
@@ -325,45 +339,79 @@ def diff_tables(self, table1: TableSegment, table2: TableSegment) -> DiffResult: |
325 | 339 | if self.bisection_factor < 2: |
326 | 340 | raise ValueError("Must have at least two segments per iteration (i.e. bisection_factor >= 2)") |
327 | 341 |
|
328 | | - # Query and validate schema |
329 | | - table1, table2 = self._threaded_call("with_schema", [table1, table2]) |
330 | | - self._validate_and_adjust_columns(table1, table2) |
331 | | - |
332 | | - key_type = table1._schema[table1.key_column] |
333 | | - key_type2 = table2._schema[table2.key_column] |
334 | | - if not isinstance(key_type, IKey): |
335 | | - raise NotImplementedError(f"Cannot use column of type {key_type} as a key") |
336 | | - if not isinstance(key_type2, IKey): |
337 | | - raise NotImplementedError(f"Cannot use column of type {key_type2} as a key") |
338 | | - assert key_type.python_type is key_type2.python_type |
| 342 | + if is_tracking_enabled(): |
| 343 | + options = dict(self) |
| 344 | + event_json = create_start_event_json(options) |
| 345 | + run_as_daemon(send_event_json, event_json) |
339 | 346 |
|
340 | | - # Query min/max values |
341 | | - key_ranges = self._threaded_call_as_completed("query_key_range", [table1, table2]) |
| 347 | + self.stats["diff_count"] = 0 |
| 348 | + start = time.monotonic() |
| 349 | + try: |
342 | 350 |
|
343 | | - # Start with the first completed value, so we don't waste time waiting |
344 | | - min_key1, max_key1 = self._parse_key_range_result(key_type, next(key_ranges)) |
| 351 | + # Query and validate schema |
| 352 | + table1, table2 = self._threaded_call("with_schema", [table1, table2]) |
| 353 | + self._validate_and_adjust_columns(table1, table2) |
345 | 354 |
|
346 | | - table1, table2 = [t.new(min_key=min_key1, max_key=max_key1) for t in (table1, table2)] |
| 355 | + key_type = table1._schema[table1.key_column] |
| 356 | + key_type2 = table2._schema[table2.key_column] |
| 357 | + if not isinstance(key_type, IKey): |
| 358 | + raise NotImplementedError(f"Cannot use column of type {key_type} as a key") |
| 359 | + if not isinstance(key_type2, IKey): |
| 360 | + raise NotImplementedError(f"Cannot use column of type {key_type2} as a key") |
| 361 | + assert key_type.python_type is key_type2.python_type |
347 | 362 |
|
348 | | - logger.info( |
349 | | - f"Diffing tables | segments: {self.bisection_factor}, bisection threshold: {self.bisection_threshold}. " |
350 | | - f"key-range: {table1.min_key}..{table2.max_key}, " |
351 | | - f"size: {table2.max_key-table1.min_key}" |
352 | | - ) |
| 363 | + # Query min/max values |
| 364 | + key_ranges = self._threaded_call_as_completed("query_key_range", [table1, table2]) |
353 | 365 |
|
354 | | - # Bisect (split) the table into segments, and diff them recursively. |
355 | | - yield from self._bisect_and_diff_tables(table1, table2) |
| 366 | + # Start with the first completed value, so we don't waste time waiting |
| 367 | + min_key1, max_key1 = self._parse_key_range_result(key_type, next(key_ranges)) |
356 | 368 |
|
357 | | - # Now we check for the second min-max, to diff the portions we "missed". |
358 | | - min_key2, max_key2 = self._parse_key_range_result(key_type, next(key_ranges)) |
| 369 | + table1, table2 = [t.new(min_key=min_key1, max_key=max_key1) for t in (table1, table2)] |
359 | 370 |
|
360 | | - if min_key2 < min_key1: |
361 | | - pre_tables = [t.new(min_key=min_key2, max_key=min_key1) for t in (table1, table2)] |
362 | | - yield from self._bisect_and_diff_tables(*pre_tables) |
| 371 | + logger.info( |
| 372 | + f"Diffing tables | segments: {self.bisection_factor}, bisection threshold: {self.bisection_threshold}. " |
| 373 | + f"key-range: {table1.min_key}..{table2.max_key}, " |
| 374 | + f"size: {table1.approximate_size()}" |
| 375 | + ) |
363 | 376 |
|
364 | | - if max_key2 > max_key1: |
365 | | - post_tables = [t.new(min_key=max_key1, max_key=max_key2) for t in (table1, table2)] |
366 | | - yield from self._bisect_and_diff_tables(*post_tables) |
| 377 | + # Bisect (split) the table into segments, and diff them recursively. |
| 378 | + yield from self._bisect_and_diff_tables(table1, table2) |
| 379 | + |
| 380 | + # Now we check for the second min-max, to diff the portions we "missed". |
| 381 | + min_key2, max_key2 = self._parse_key_range_result(key_type, next(key_ranges)) |
| 382 | + |
| 383 | + if min_key2 < min_key1: |
| 384 | + pre_tables = [t.new(min_key=min_key2, max_key=min_key1) for t in (table1, table2)] |
| 385 | + yield from self._bisect_and_diff_tables(*pre_tables) |
| 386 | + |
| 387 | + if max_key2 > max_key1: |
| 388 | + post_tables = [t.new(min_key=max_key1, max_key=max_key2) for t in (table1, table2)] |
| 389 | + yield from self._bisect_and_diff_tables(*post_tables) |
| 390 | + |
| 391 | + error = None |
| 392 | + except BaseException as e: # Catch KeyboardInterrupt too |
| 393 | + error = e |
| 394 | + finally: |
| 395 | + if is_tracking_enabled(): |
| 396 | + runtime = time.monotonic() - start |
| 397 | + table1_count = self.stats.get("table1_count") |
| 398 | + table2_count = self.stats.get("table2_count") |
| 399 | + diff_count = self.stats.get("diff_count") |
| 400 | + err_message = str(error)[:20] if error else None  # Truncate possibly sensitive information. |
| 401 | + event_json = create_end_event_json( |
| 402 | + error is None, |
| 403 | + runtime, |
| 404 | + table1.database.name, |
| 405 | + table2.database.name, |
| 406 | + table1_count, |
| 407 | + table2_count, |
| 408 | + diff_count, |
| 409 | + err_message, |
| 410 | + ) |
| 411 | + send_event_json(event_json) |
| 412 | + |
| 413 | + if error: |
| 414 | + raise error |
367 | 415 |
|
368 | 416 | def _parse_key_range_result(self, key_type, key_range): |
369 | 417 | mn, mx = key_range |
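
The start event is sent via run_as_daemon(send_event_json, event_json) so telemetry never blocks the diff. As a rough sketch only (an assumption about the helper, not the project's actual utils code), such a helper can be as small as:

    import threading

    def run_as_daemon(func, *args):
        # Fire-and-forget: a daemon thread neither blocks the caller nor keeps
        # the process alive once the main thread exits.
        thread = threading.Thread(target=func, args=args, daemon=True)
        thread.start()
        return thread
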
@@ -441,6 +489,8 @@ def _bisect_and_diff_tables(self, table1, table2, level=0, max_rows=None): |
441 | 489 | self.stats["table1_count"] = len(rows1) |
442 | 490 | self.stats["table2_count"] = len(rows2) |
443 | 491 |
|
| 492 | + self.stats["diff_count"] += len(diff) |
| 493 | + |
444 | 494 | logger.info(". " * level + f"Diff found {len(diff)} different rows.") |
445 | 495 | self.stats["rows_downloaded"] = self.stats.get("rows_downloaded", 0) + max(len(rows1), len(rows2)) |
446 | 496 | yield from diff |
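
Stepping back, the diff_tables changes follow a generic measure-report-reraise wrap: time the run with a monotonic clock, record any exception (including KeyboardInterrupt), report in finally, then propagate the error. A condensed, standalone sketch of that pattern (run_with_telemetry and report are hypothetical names):

    import time

    def run_with_telemetry(work, report):
        # Time `work`, report success/failure in all cases, and re-raise failures.
        start = time.monotonic()
        error = None
        try:
            return work()
        except BaseException as e:  # include KeyboardInterrupt, as the patch does
            error = e
            raise
        finally:
            report(
                success=error is None,
                runtime=time.monotonic() - start,
                err_message=str(error)[:20] if error else None,
            )
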
|