Skip to content

Commit 4800d4b

Browse files
authored
Merge branch 'master' into make-hubspot-incremental
2 parents 0e96158 + 00a045f commit 4800d4b

File tree

9 files changed

+427
-208
lines changed

9 files changed

+427
-208
lines changed

sources/facebook_ads/__init__.py

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
)
4040
from .settings import (
4141
FACEBOOK_INSIGHTS_RETENTION_PERIOD,
42-
ALL_ACTION_BREAKDOWNS,
4342
ALL_ACTION_ATTRIBUTION_WINDOWS,
4443
DEFAULT_INSIGHT_FIELDS,
4544
INSIGHT_FIELDS_TYPES,
@@ -126,8 +125,8 @@ def facebook_insights_source(
126125
fields: Sequence[str] = DEFAULT_INSIGHT_FIELDS,
127126
attribution_window_days_lag: int = 7,
128127
time_increment_days: int = 1,
129-
breakdowns: TInsightsBreakdownOptions = "ads_insights",
130-
action_breakdowns: Sequence[str] = ALL_ACTION_BREAKDOWNS,
128+
breakdowns: TInsightsBreakdownOptions = None,
129+
action_breakdowns: Sequence[str] = None,
131130
level: TInsightsLevels = "ad",
132131
action_attribution_windows: Sequence[str] = ALL_ACTION_ATTRIBUTION_WINDOWS,
133132
batch_size: int = 50,
@@ -149,8 +148,8 @@ def facebook_insights_source(
149148
fields (Sequence[str], optional): A list of fields to include in each report. Note that the `breakdowns` option adds fields automatically. Defaults to DEFAULT_INSIGHT_FIELDS.
150149
attribution_window_days_lag (int, optional): Attribution window in days. The reports in the attribution window are refreshed on each run. Defaults to 7.
151150
time_increment_days (int, optional): The report aggregation window in days. Use 7 for weekly aggregation. Defaults to 1.
152-
breakdowns (TInsightsBreakdownOptions, optional): A presents with common aggregations. See settings.py for details. Defaults to "ads_insights_age_and_gender".
153-
action_breakdowns (Sequence[str], optional): Action aggregation types. See settings.py for details. Defaults to ALL_ACTION_BREAKDOWNS.
151+
breakdowns (TInsightsBreakdownOptions, optional): A preset of common aggregations. See settings.py for details. Defaults to None (no breakdowns).
152+
action_breakdowns (Sequence[str], optional): Action aggregation types. See settings.py for details. Defaults to None (no action breakdowns).
154153
level (TInsightsLevels, optional): The granularity level. Defaults to "ad".
155154
action_attribution_windows (Sequence[str], optional): Attribution windows for actions. Defaults to ALL_ACTION_ATTRIBUTION_WINDOWS.
156155
batch_size (int, optional): Page size when reading data from particular report. Defaults to 50.
@@ -186,16 +185,7 @@ def facebook_insights(
186185
while start_date <= end_date:
187186
query = {
188187
"level": level,
189-
"action_breakdowns": list(action_breakdowns),
190-
"breakdowns": list(
191-
INSIGHTS_BREAKDOWNS_OPTIONS[breakdowns]["breakdowns"]
192-
),
193188
"limit": batch_size,
194-
"fields": list(
195-
set(fields)
196-
.union(INSIGHTS_BREAKDOWNS_OPTIONS[breakdowns]["fields"])
197-
.difference(INVALID_INSIGHTS_FIELDS)
198-
),
199189
"time_increment": time_increment_days,
200190
"action_attribution_windows": list(action_attribution_windows),
201191
"time_ranges": [
@@ -207,6 +197,22 @@ def facebook_insights(
207197
}
208198
],
209199
}
200+
201+
fields_to_use = set(fields)
202+
# Only add breakdowns if explicitly provided
203+
if breakdowns is not None:
204+
query["breakdowns"] = list(
205+
INSIGHTS_BREAKDOWNS_OPTIONS[breakdowns]["breakdowns"]
206+
)
207+
fields_to_use = fields_to_use.union(
208+
INSIGHTS_BREAKDOWNS_OPTIONS[breakdowns]["fields"]
209+
)
210+
query["fields"] = list(fields_to_use.difference(INVALID_INSIGHTS_FIELDS))
211+
212+
# Only add action_breakdowns if explicitly provided
213+
if action_breakdowns is not None:
214+
query["action_breakdowns"] = list(action_breakdowns)
215+
210216
job = execute_job(account.get_insights(params=query, is_async=True))
211217
yield list(map(process_report_item, job.get_result()))
212218
start_date = start_date.add(days=time_increment_days)

sources/facebook_ads_pipeline.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,10 +117,30 @@ def load_insights() -> None:
117117
print(info)
118118

119119

120+
def load_insights_with_breakdowns() -> None:
121+
"""Shows how to load insights with custom breakdowns and action breakdowns"""
122+
pipeline = dlt.pipeline(
123+
pipeline_name="facebook_insights_breakdowns",
124+
destination="duckdb",
125+
dataset_name="facebook_insights_data",
126+
dev_mode=True,
127+
)
128+
# Load insights with age and gender breakdowns
129+
i_with_breakdowns = facebook_insights_source(
130+
initial_load_past_days=7,
131+
breakdowns="ads_insights_age_and_gender",
132+
# Uncomment to add action breakdowns:
133+
# action_breakdowns=["action_type", "action_target_id"]
134+
)
135+
info = pipeline.run(i_with_breakdowns)
136+
print(info)
137+
138+
120139
if __name__ == "__main__":
121140
# load_all_ads_objects()
122141
merge_ads_objects()
123142
# load_ads_with_custom_fields()
124143
# load_only_disapproved_ads()
125144
# load_and_enrich_objects()
126145
# load_insights()
146+
# load_insights_with_breakdowns()

sources/pg_replication/README.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ Resources that can be loaded using this verified source are:
66
| Name | Description |
77
|----------------------|-------------------------------------------------|
88
| replication_resource | Load published messages from a replication slot |
9+
| init_replication | Initialize replication and optionally return snapshot resources for initial data load |
910

1011
## Initialize the pipeline
1112

@@ -29,6 +30,13 @@ It also needs `CREATE` privilege on the database:
2930
GRANT CREATE ON DATABASE dlt_data TO replication_user;
3031
```
3132

33+
If not a superuser, the user must have ownership of the tables that need to be replicated:
34+
35+
```sql
36+
ALTER TABLE your_table OWNER TO replication_user;
37+
```
38+
39+
3240
### Set up RDS
3341
1. You must enable replication for RDS Postgres instance via **Parameter Group**: https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_PostgreSQL.Replication.ReadReplicas.html
3442
2. `WITH LOGIN REPLICATION;` does not work on RDS, instead do:

sources/pg_replication/helpers.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
from dlt.sources.credentials import ConnectionStringCredentials
4343
from dlt.sources.sql_database import (
4444
sql_table as core_sql_table,
45-
sql_database as core_sql_datbase,
45+
sql_database as core_sql_database,
4646
)
4747

4848
from .schema_types import _to_dlt_column_schema, _to_dlt_val
@@ -114,7 +114,8 @@ def init_replication(
114114
table_names (Optional[Union[str, Sequence[str]]]): Name(s) of the table(s)
115115
to include in the publication. If not provided, the whole schema with `schema_name` will be replicated
116116
(also tables added to the schema after the publication was created). You need superuser privileges
117-
for the schema replication.
117+
for the whole schema replication. When specifying individual table names, the database role must
118+
own the tables if it's not a superuser.
118119
credentials (ConnectionStringCredentials): Postgres database credentials.
119120
publish (str): Comma-separated string of DML operations. Can be used to
120121
control which changes are included in the publication. Allowed operations
@@ -184,7 +185,7 @@ def init_replication(
184185
# do not include dlt tables
185186
table_names = [
186187
table_name
187-
for table_name in core_sql_datbase(
188+
for table_name in core_sql_database(
188189
credentials, schema=schema_name, reflection_level="minimal"
189190
).resources.keys()
190191
if not table_name.lower().startswith(DLT_NAME_PREFIX)

0 commit comments

Comments
 (0)