Skip to content

Commit 2ae7490

Browse files
authored
Merge pull request #13383 from valentijnscholten/pghistory-backfill-improvements
pghistory improvements: backfill and "empty" changes
2 parents dbb4950 + 39b51a1 commit 2ae7490

File tree

4 files changed

+970
-54
lines changed

4 files changed

+970
-54
lines changed

dojo/management/commands/pghistory_backfill.py

Lines changed: 157 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
This command creates initial snapshots for all existing records in tracked models.
55
"""
66
import logging
7+
import time
78

89
from django.apps import apps
910
from django.conf import settings
@@ -33,6 +34,16 @@ def add_arguments(self, parser):
3334
action="store_true",
3435
help="Show what would be done without actually creating events",
3536
)
37+
parser.add_argument(
38+
"--log-queries",
39+
action="store_true",
40+
help="Enable database query logging (default: enabled)",
41+
)
42+
parser.add_argument(
43+
"--no-log-queries",
44+
action="store_true",
45+
help="Disable database query logging",
46+
)
3647

3748
def get_excluded_fields(self, model_name):
3849
"""Get the list of excluded fields for a specific model from pghistory configuration."""
@@ -45,6 +56,89 @@ def get_excluded_fields(self, model_name):
4556
}
4657
return excluded_fields_map.get(model_name, [])
4758

59+
def process_batch(self, event_model, event_records, model_name, dry_run, batch_start_time, processed, backfill_count, *, is_final_batch=False):
    """Persist one batch of prepared event records and report its timing.

    Returns a tuple of (number of events created, timestamp taken right
    after the database write) so the caller can reset its batch timer.
    In dry-run mode nothing is written and the whole batch counts as created.
    """
    # An empty batch is a no-op; leave the caller's timer untouched.
    if not event_records:
        return 0, batch_start_time

    attempted = len(event_records)
    if dry_run:
        created = attempted
    else:
        try:
            # The caller already slices work into batches, so bulk_create
            # is invoked without an extra batch_size argument.
            inserted = event_model.objects.bulk_create(event_records)
            created = len(inserted) if inserted else 0

            if created != attempted:
                logger.warning(
                    f"bulk_create for {model_name}: attempted {attempted}, "
                    f"actually created {created} ({attempted - created} skipped)",
                )
        except Exception:
            logger.exception(f"Failed to bulk create events for {model_name}")
            raise

    # Measure timing only after the database round-trip has finished.
    finished_at = time.time()
    elapsed = finished_at - batch_start_time
    rate = attempted / elapsed if elapsed > 0 else 0

    # Emit a per-batch progress line.
    if is_final_batch:
        self.stdout.write(f" Final batch: {elapsed:.2f}s ({rate:.1f} records/sec)")
    else:
        done_so_far = processed + created
        pct = done_so_far / backfill_count * 100
        self.stdout.write(f" Processed {done_so_far:,}/{backfill_count:,} records needing backfill ({pct:.1f}%) - "
                          f"Last batch: {elapsed:.2f}s ({rate:.1f} records/sec)")

    return created, finished_at
96+
97+
def enable_db_logging(self):
    """Enable verbose database query logging for this command run.

    Raises Django's SQL loggers to DEBUG, attaches a stream handler where
    none exists, and switches settings.DEBUG on so the connection actually
    records queries. The original DEBUG value is stashed on the instance
    so disable_db_logging() can restore it afterwards.
    """
    # Store original DEBUG setting so disable_db_logging() can restore it.
    self.original_debug = settings.DEBUG

    # Configure database query logging
    db_logger = logging.getLogger("django.db.backends")
    db_logger.setLevel(logging.DEBUG)

    def _make_handler():
        # Build a stream handler with the command's standard log format.
        h = logging.StreamHandler()
        h.setFormatter(logging.Formatter(
            "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        ))
        return h

    # Bug fix: previously `handler` was only defined when db_logger had no
    # handlers, so reaching the sql_logger branch with an already-handled
    # db_logger raised NameError. Track the shared handler explicitly and
    # create one lazily where needed.
    handler = None
    if not db_logger.handlers:
        handler = _make_handler()
        db_logger.addHandler(handler)

    # Also enable the SQL logger specifically
    sql_logger = logging.getLogger("django.db.backends.sql")
    sql_logger.setLevel(logging.DEBUG)
    if not sql_logger.handlers:
        sql_logger.addHandler(handler or _make_handler())

    # Enable query logging in Django settings — Django only records
    # executed queries on the connection when DEBUG is on.
    settings.DEBUG = True

    self.stdout.write(
        self.style.SUCCESS("Database query logging enabled"),
    )
129+
130+
def disable_db_logging(self):
    """Disable database query logging and restore the DEBUG setting.

    Robustness fix: safe to call even when enable_db_logging() was never
    invoked — settings.DEBUG is then left unchanged instead of raising
    AttributeError on the missing `original_debug` attribute.
    """
    # Restore the DEBUG value saved by enable_db_logging(), if any.
    settings.DEBUG = getattr(self, "original_debug", settings.DEBUG)

    # Disable query logging by setting a higher level
    logging.getLogger("django.db.backends").setLevel(logging.INFO)
    logging.getLogger("django.db.backends.sql").setLevel(logging.INFO)
    self.stdout.write(
        self.style.SUCCESS("Database query logging disabled"),
    )
141+
48142
def handle(self, *args, **options):
49143
if not settings.ENABLE_AUDITLOG or settings.AUDITLOG_TYPE != "django-pghistory":
50144
self.stdout.write(
@@ -55,6 +149,17 @@ def handle(self, *args, **options):
55149
)
56150
return
57151

152+
# Enable database query logging based on options
153+
# Default to enabled unless explicitly disabled
154+
enable_query_logging = not options.get("no_log_queries")
155+
156+
if enable_query_logging:
157+
self.enable_db_logging()
158+
else:
159+
self.stdout.write(
160+
self.style.WARNING("Database query logging disabled"),
161+
)
162+
58163
# Models that are tracked by pghistory
59164
tracked_models = [
60165
"Dojo_User", "Endpoint", "Engagement", "Finding", "Finding_Group",
@@ -83,9 +188,11 @@ def handle(self, *args, **options):
83188
)
84189

85190
total_processed = 0
191+
total_start_time = time.time()
86192
self.stdout.write(f"Starting backfill for {len(tracked_models)} model(s)...")
87193

88194
for model_name in tracked_models:
195+
model_start_time = time.time()
89196
self.stdout.write(f"\nProcessing {model_name}...")
90197

91198
try:
@@ -143,6 +250,7 @@ def handle(self, *args, **options):
143250
processed = 0
144251
event_records = []
145252
failed_records = []
253+
batch_start_time = time.time()
146254

147255
for instance in records_needing_backfill.iterator():
148256
try:
@@ -156,8 +264,17 @@ def handle(self, *args, **options):
156264
for field in instance._meta.fields:
157265
field_name = field.name
158266
if field_name not in excluded_fields:
159-
field_value = getattr(instance, field_name)
160-
event_data[field_name] = field_value
267+
# Handle foreign key fields differently
268+
if field.many_to_one: # ForeignKey field
269+
# For foreign keys, use the _id field to get the raw ID value
270+
# Store it under the _id field name for the Event model
271+
field_id_name = f"{field_name}_id"
272+
field_value = getattr(instance, field_id_name)
273+
event_data[field_id_name] = field_value
274+
else:
275+
# For non-foreign key fields, use value_from_object() to avoid queries
276+
field_value = field.value_from_object(instance)
277+
event_data[field_name] = field_value
161278

162279
# Explicitly preserve created timestamp from the original instance
163280
# Only if not excluded and exists
@@ -178,57 +295,32 @@ def handle(self, *args, **options):
178295

179296
event_records.append(EventModel(**event_data))
180297

181-
except Exception as e:
298+
except Exception:
182299
failed_records.append(instance.id)
183-
logger.error(
184-
f"Failed to prepare event for {model_name} ID {instance.id}: {e}",
300+
logger.exception(
301+
f"Failed to prepare event for {model_name} ID {instance.id}",
185302
)
186303

187304
# Bulk create when we hit batch_size records
188305
if len(event_records) >= batch_size:
189-
if not dry_run and event_records:
190-
try:
191-
attempted = len(event_records)
192-
created_objects = EventModel.objects.bulk_create(event_records, batch_size=batch_size)
193-
actually_created = len(created_objects) if created_objects else 0
194-
processed += actually_created
195-
196-
if actually_created != attempted:
197-
logger.warning(
198-
f"bulk_create for {model_name}: attempted {attempted}, "
199-
f"actually created {actually_created} ({attempted - actually_created} skipped)",
200-
)
201-
except Exception as e:
202-
logger.error(f"Failed to bulk create events for {model_name}: {e}")
203-
raise
204-
elif dry_run:
205-
processed += len(event_records)
306+
# Process the batch
307+
batch_processed, batch_start_time = self.process_batch(
308+
EventModel, event_records, model_name, dry_run,
309+
batch_start_time, processed, backfill_count,
310+
)
311+
processed += batch_processed
206312

207313
event_records = [] # Reset for next batch
208-
209-
# Progress update
210-
progress = (processed / backfill_count) * 100
211-
self.stdout.write(f" Processed {processed:,}/{backfill_count:,} records needing backfill ({progress:.1f}%)")
314+
batch_start_time = time.time() # Reset batch timer
212315

213316
# Handle remaining records
214317
if event_records:
215-
if not dry_run:
216-
try:
217-
attempted = len(event_records)
218-
created_objects = EventModel.objects.bulk_create(event_records, batch_size=batch_size)
219-
actually_created = len(created_objects) if created_objects else 0
220-
processed += actually_created
221-
222-
if actually_created != attempted:
223-
logger.warning(
224-
f"bulk_create final batch for {model_name}: attempted {attempted}, "
225-
f"actually created {actually_created} ({attempted - actually_created} skipped)",
226-
)
227-
except Exception as e:
228-
logger.error(f"Failed to bulk create final batch for {model_name}: {e}")
229-
raise
230-
else:
231-
processed += len(event_records)
318+
# Process the final batch
319+
batch_processed, _ = self.process_batch(
320+
EventModel, event_records, model_name, dry_run,
321+
batch_start_time, processed, backfill_count, is_final_batch=True,
322+
)
323+
processed += batch_processed
232324

233325
# Final progress update
234326
if backfill_count > 0:
@@ -237,29 +329,46 @@ def handle(self, *args, **options):
237329

238330
total_processed += processed
239331

240-
# Show completion summary
332+
# Calculate timing for this model
333+
model_end_time = time.time()
334+
model_duration = model_end_time - model_start_time
335+
records_per_second = processed / model_duration if model_duration > 0 else 0
336+
337+
# Show completion summary with timing
241338
if failed_records:
242339
self.stdout.write(
243340
self.style.WARNING(
244341
f" ⚠ Completed {model_name}: {processed:,} records processed, "
245-
f"{len(failed_records)} records failed",
342+
f"{len(failed_records)} records failed in {model_duration:.2f}s "
343+
f"({records_per_second:.1f} records/sec)",
246344
),
247345
)
248346
else:
249347
self.stdout.write(
250348
self.style.SUCCESS(
251-
f" ✓ Completed {model_name}: {processed:,} records",
349+
f" ✓ Completed {model_name}: {processed:,} records in {model_duration:.2f}s "
350+
f"({records_per_second:.1f} records/sec)",
252351
),
253352
)
254353

255354
except Exception as e:
256355
self.stdout.write(
257356
self.style.ERROR(f" ✗ Failed to process {model_name}: {e}"),
258357
)
259-
logger.error(f"Error processing {model_name}: {e}")
358+
logger.exception(f"Error processing {model_name}")
359+
360+
# Calculate total timing
361+
total_end_time = time.time()
362+
total_duration = total_end_time - total_start_time
363+
total_records_per_second = total_processed / total_duration if total_duration > 0 else 0
364+
365+
# Disable database query logging if it was enabled
366+
if enable_query_logging:
367+
self.disable_db_logging()
260368

261369
self.stdout.write(
262370
self.style.SUCCESS(
263-
f"\nBACKFILL COMPLETE: Processed {total_processed:,} records",
371+
f"\nBACKFILL COMPLETE: Processed {total_processed:,} records in {total_duration:.2f}s "
372+
f"({total_records_per_second:.1f} records/sec)",
264373
),
265374
)

0 commit comments

Comments
 (0)