Skip to content

Commit 32e7022

Browse files
committed
fix(aci): Implement DataSource.normalize_before_relocation_import
1 parent 13fa062 commit 32e7022

File tree

6 files changed

+160
-2
lines changed

6 files changed

+160
-2
lines changed

src/sentry/monitors/models.py

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -814,6 +814,7 @@ class Meta:
814814

815815
@data_source_type_registry.register(DATA_SOURCE_CRON_MONITOR)
816816
class CronMonitorDataSourceHandler(DataSourceTypeHandler[Monitor]):
817+
@override
817818
@staticmethod
818819
def bulk_get_query_object(
819820
data_sources: list[DataSource],
@@ -834,6 +835,7 @@ def bulk_get_query_object(
834835
}
835836
return {ds.id: qs_lookup.get(ds.source_id) for ds in data_sources}
836837

838+
@override
837839
@staticmethod
838840
def related_model(instance) -> list[ModelRelation]:
839841
return [ModelRelation(Monitor, {"id": instance.source_id})]
@@ -848,3 +850,8 @@ def get_instance_limit(org: Organization) -> int | None:
848850
def get_current_instance_count(org: Organization) -> int:
849851
# We don't have a limit at the moment, so no need to count.
850852
raise NotImplementedError
853+
854+
@override
855+
@staticmethod
856+
def get_relocation_model_name() -> str:
857+
return "monitors.monitor"

src/sentry/snuba/models.py

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -182,6 +182,7 @@ def write_relocation_import(
182182

183183
@data_source_type_registry.register(DATA_SOURCE_SNUBA_QUERY_SUBSCRIPTION)
184184
class QuerySubscriptionDataSourceHandler(DataSourceTypeHandler[QuerySubscription]):
185+
@override
185186
@staticmethod
186187
def bulk_get_query_object(
187188
data_sources: list[DataSource],
@@ -203,6 +204,7 @@ def bulk_get_query_object(
203204
}
204205
return {ds.id: qs_lookup.get(ds.source_id) for ds in data_sources}
205206

207+
@override
206208
@staticmethod
207209
def related_model(instance) -> list[ModelRelation]:
208210
return [ModelRelation(QuerySubscription, {"id": instance.source_id})]
@@ -223,3 +225,8 @@ def get_current_instance_count(org: Organization) -> int:
223225
QuerySubscription.Status.UPDATING.value,
224226
),
225227
).count()
228+
229+
@override
230+
@staticmethod
231+
def get_relocation_model_name() -> str:
232+
return "sentry.querysubscription"

src/sentry/uptime/models.py

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -186,6 +186,7 @@ class UptimeRegionScheduleMode(enum.StrEnum):
186186

187187
@data_source_type_registry.register(DATA_SOURCE_UPTIME_SUBSCRIPTION)
188188
class UptimeSubscriptionDataSourceHandler(DataSourceTypeHandler[UptimeSubscription]):
189+
@override
189190
@staticmethod
190191
def bulk_get_query_object(
191192
data_sources: list[DataSource],
@@ -210,6 +211,7 @@ def bulk_get_query_object(
210211
}
211212
return {ds.id: qs_lookup.get(ds.source_id) for ds in data_sources}
212213

214+
@override
213215
@staticmethod
214216
def related_model(instance) -> list[ModelRelation]:
215217
return [ModelRelation(UptimeSubscription, {"id": instance.source_id})]
@@ -225,6 +227,11 @@ def get_current_instance_count(org: Organization) -> int:
225227
# We don't have a limit at the moment, so no need to count.
226228
raise NotImplementedError
227229

230+
@override
231+
@staticmethod
232+
def get_relocation_model_name() -> str:
233+
return "uptime.uptimesubscription"
234+
228235

229236
def get_detector(uptime_subscription: UptimeSubscription, prefetch_workflow_data=False) -> Detector:
230237
"""

src/sentry/workflow_engine/models/data_source.py

Lines changed: 50 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1,18 +1,23 @@
11
import builtins
22
import dataclasses
3+
import logging
34
from typing import Generic, TypeVar
45

56
from django.db import models
67
from django.db.models.signals import pre_save
78
from django.dispatch import receiver
89

9-
from sentry.backup.scopes import RelocationScope
10+
from sentry.backup.dependencies import NormalizedModelName, PrimaryKeyMap
11+
from sentry.backup.helpers import ImportFlags
12+
from sentry.backup.scopes import ImportScope, RelocationScope
1013
from sentry.db.models import DefaultFieldsModel, FlexibleForeignKey, region_silo_model
1114
from sentry.utils.registry import NoRegistrationExistsError
1215
from sentry.workflow_engine.models.data_source_detector import DataSourceDetector
1316
from sentry.workflow_engine.registry import data_source_type_registry
1417
from sentry.workflow_engine.types import DataSourceTypeHandler
1518

19+
logger = logging.getLogger(__name__)
20+
1621
T = TypeVar("T")
1722

1823

@@ -25,6 +30,13 @@ class DataPacket(Generic[T]):
2530
@region_silo_model
2631
class DataSource(DefaultFieldsModel):
2732
__relocation_scope__ = RelocationScope.Organization
33+
# DataSource.source_id dynamically references different models based on the 'type' field.
34+
# We declare all possible dependencies here to ensure proper import ordering.
35+
__relocation_dependencies__ = {
36+
"monitors.monitor", # For DATA_SOURCE_CRON_MONITOR
37+
"sentry.querysubscription", # For DATA_SOURCE_SNUBA_QUERY_SUBSCRIPTION
38+
"uptime.uptimesubscription", # For DATA_SOURCE_UPTIME_SUBSCRIPTION
39+
}
2840

2941
organization = FlexibleForeignKey("sentry.Organization")
3042

@@ -49,6 +61,43 @@ def type_handler(self) -> builtins.type[DataSourceTypeHandler]:
4961
raise ValueError(f"Unknown data source type: {self.type}")
5062
return handler
5163

64+
def normalize_before_relocation_import(
65+
self, pk_map: PrimaryKeyMap, scope: ImportScope, flags: ImportFlags
66+
) -> int | None:
67+
old_pk = super().normalize_before_relocation_import(pk_map, scope, flags)
68+
if old_pk is None:
69+
return None
70+
71+
# Map source_id based on the data source type
72+
try:
73+
handler = data_source_type_registry.get(self.type)
74+
model_name = NormalizedModelName(handler.get_relocation_model_name())
75+
old_source_id = int(self.source_id)
76+
new_source_id = pk_map.get_pk(model_name, old_source_id)
77+
78+
if new_source_id is not None:
79+
self.source_id = str(new_source_id)
80+
else:
81+
# Referenced model not in pk_map. This may be correct (reset_pks=False) or broken
82+
# (reset_pks=True but referenced model was filtered out or failed to import).
83+
logger.warning(
84+
"DataSource source_id not remapped - referenced model not in pk_map",
85+
extra={
86+
"data_source_id": old_pk,
87+
"type": self.type,
88+
"source_id": old_source_id,
89+
"model": str(model_name),
90+
},
91+
)
92+
except Exception:
93+
logger.exception(
94+
"DataSource.normalize_before_relocation_import failed",
95+
extra={"data_source_id": old_pk, "type": self.type, "source_id": self.source_id},
96+
)
97+
return None
98+
99+
return old_pk
100+
52101

53102
@receiver(pre_save, sender=DataSource)
54103
def ensure_type_handler_registered(sender, instance: DataSource, **kwargs):

src/sentry/workflow_engine/types.py

Lines changed: 15 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -108,8 +108,9 @@ def execute(event_data: WorkflowEventData, action: Action, detector: Detector) -
108108
raise NotImplementedError
109109

110110

111-
class DataSourceTypeHandler(Generic[T]):
111+
class DataSourceTypeHandler(ABC, Generic[T]):
112112
@staticmethod
113+
@abstractmethod
113114
def bulk_get_query_object(data_sources) -> dict[int, T | None]:
114115
"""
115116
Bulk fetch related data-source models returning a dict of the
@@ -118,6 +119,7 @@ def bulk_get_query_object(data_sources) -> dict[int, T | None]:
118119
raise NotImplementedError
119120

120121
@staticmethod
122+
@abstractmethod
121123
def related_model(instance) -> list[ModelRelation]:
122124
"""
123125
A list of deletion ModelRelations. The model relation query should map
@@ -127,6 +129,7 @@ def related_model(instance) -> list[ModelRelation]:
127129
raise NotImplementedError
128130

129131
@staticmethod
132+
@abstractmethod
130133
def get_instance_limit(org: Organization) -> int | None:
131134
"""
132135
Returns the maximum number of instances of this data source type for the organization.
@@ -135,13 +138,24 @@ def get_instance_limit(org: Organization) -> int | None:
135138
raise NotImplementedError
136139

137140
@staticmethod
141+
@abstractmethod
138142
def get_current_instance_count(org: Organization) -> int:
139143
"""
140144
Returns the current number of instances of this data source type for the organization.
141145
Only called if `get_instance_limit` returns a number >0
142146
"""
143147
raise NotImplementedError
144148

149+
@staticmethod
150+
@abstractmethod
151+
def get_relocation_model_name() -> str:
152+
"""
153+
Returns the normalized model name (e.g., "sentry.querysubscription") for the model that
154+
source_id references. This is used during backup/relocation to map old PKs to new PKs.
155+
The format is "app_label.model_name" in lowercase.
156+
"""
157+
raise NotImplementedError
158+
145159

146160
class DataConditionHandler(Generic[T]):
147161
class Group(StrEnum):

tests/sentry/workflow_engine/models/test_data_source.py

Lines changed: 74 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -2,6 +2,10 @@
22

33
import pytest
44

5+
from sentry.backup.dependencies import ImportKind, NormalizedModelName, PrimaryKeyMap
6+
from sentry.backup.helpers import ImportFlags
7+
from sentry.backup.scopes import ImportScope
8+
from sentry.monitors.types import DATA_SOURCE_CRON_MONITOR
59
from sentry.workflow_engine.registry import data_source_type_registry
610
from tests.sentry.workflow_engine.test_base import BaseWorkflowTest
711

@@ -18,3 +22,73 @@ def test_data_source_valid_type(self) -> None:
1822
data_source = self.create_data_source(type="test")
1923
assert data_source is not None
2024
assert data_source.type == "test"
25+
26+
def test_normalize_before_relocation_import(self) -> None:
27+
"""Test that normalize_before_relocation_import correctly maps source_id"""
28+
monitor = self.create_monitor(project=self.project)
29+
data_source = self.create_data_source(
30+
type=DATA_SOURCE_CRON_MONITOR,
31+
source_id=str(monitor.id),
32+
organization_id=self.organization.id,
33+
)
34+
35+
old_monitor_pk = monitor.id
36+
new_monitor_pk = 9999
37+
old_data_source_id = data_source.id
38+
old_org_id = data_source.organization_id
39+
40+
# Create a PrimaryKeyMap that maps the old monitor ID to a new one
41+
pk_map = PrimaryKeyMap()
42+
pk_map.insert(
43+
model_name=NormalizedModelName("monitors.monitor"),
44+
old=old_monitor_pk,
45+
new=new_monitor_pk,
46+
kind=ImportKind.Inserted,
47+
)
48+
pk_map.insert(
49+
model_name=NormalizedModelName("sentry.organization"),
50+
old=old_org_id,
51+
new=old_org_id,
52+
kind=ImportKind.Inserted,
53+
)
54+
55+
old_data_source_pk = data_source.normalize_before_relocation_import(
56+
pk_map, ImportScope.Organization, ImportFlags()
57+
)
58+
59+
assert (
60+
old_data_source_pk == old_data_source_id
61+
), f"Expected {old_data_source_id}, got {old_data_source_pk}"
62+
assert data_source.source_id == str(new_monitor_pk)
63+
assert data_source.pk is None
64+
65+
def test_normalize_before_relocation_import_missing_source(self) -> None:
66+
"""Test that normalize_before_relocation_import succeeds but doesn't update source_id if mapping not found"""
67+
monitor = self.create_monitor(project=self.project)
68+
data_source = self.create_data_source(
69+
type=DATA_SOURCE_CRON_MONITOR,
70+
source_id=str(monitor.id),
71+
organization_id=self.organization.id,
72+
)
73+
74+
old_source_id = data_source.source_id
75+
old_data_source_id = data_source.id
76+
old_org_id = data_source.organization_id
77+
78+
# Create a PrimaryKeyMap without the monitor mapping
79+
pk_map = PrimaryKeyMap()
80+
pk_map.insert(
81+
model_name=NormalizedModelName("sentry.organization"),
82+
old=old_org_id,
83+
new=old_org_id,
84+
kind=ImportKind.Inserted,
85+
)
86+
87+
result = data_source.normalize_before_relocation_import(
88+
pk_map, ImportScope.Organization, ImportFlags()
89+
)
90+
91+
# Should succeed but leave source_id unchanged
92+
assert result == old_data_source_id
93+
assert data_source.source_id == old_source_id
94+
assert data_source.pk is None

0 commit comments

Comments (0)