Skip to content

Commit 43c08a1

Browse files
committed
Revised structure for lint errors and making it simpler
1 parent 122479d commit 43c08a1

File tree

6 files changed

+131
-96
lines changed

6 files changed

+131
-96
lines changed

ai/cd

Whitespace-only changes.

ai/git

Whitespace-only changes.

sources/pipedrive/__init__.py

Lines changed: 8 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -22,30 +22,21 @@
2222
from dlt.common.time import ensure_pendulum_datetime
2323
from dlt.sources import DltResource, TDataItems
2424

25-
from .rest_v2 import rest_v2_resources
26-
from .settings import ENTITIES_V2, NESTED_ENTITIES_V2
25+
# Export v2 source for easy access
26+
from .rest_v2 import pipedrive_v2_source
2727

2828

2929
@dlt.source(name="pipedrive")
3030
def pipedrive_source(
3131
pipedrive_api_key: str = dlt.secrets.value,
32-
company_domain: str = dlt.secrets.value,
3332
since_timestamp: Optional[Union[pendulum.DateTime, str]] = "1970-01-01 00:00:00",
34-
use_v2_endpoints: Optional[List[str]] = None,
35-
v2_prefix: str = "v2_",
36-
v2_only: bool = False,
3733
) -> Iterator[DltResource]:
3834
"""
3935
Get data from the Pipedrive API. Supports incremental loading and custom fields mapping.
40-
Can switch between v1 and v2 endpoints.
4136
4237
Args:
4338
pipedrive_api_key: https://pipedrive.readme.io/docs/how-to-find-the-api-token
44-
company_domain: The domain of your company in Pipedrive
4539
since_timestamp: Starting timestamp for incremental loading. By default complete history is loaded on first run.
46-
use_v2_endpoints: A list of resource names to load using the v2 API. If None, all resources are loaded from v1.
47-
v2_prefix: A prefix to add to the table names of resources loaded from v2 endpoints.
48-
v2_only: When True, only v2 resources are yielded (v1 resources are skipped).
4940
5041
Returns resources:
5142
custom_fields_mapping
@@ -72,51 +63,22 @@ def pipedrive_source(
7263
Resources that depend on another resource are implemented as transformers
7364
so they can re-use the original resource data without re-downloading.
7465
Examples: deals_participants, deals_flow
75-
"""
76-
# create v2 resources
77-
use_v2_endpoints = use_v2_endpoints or []
78-
v2_resources_config = {
79-
resource: ENTITIES_V2[resource]
80-
for resource in use_v2_endpoints
81-
if resource in ENTITIES_V2 # this ensures that resource is supported by v2 api
82-
}
83-
if v2_resources_config:
84-
# Only include nested endpoints if their parent is in the v2 endpoints list
85-
nested_configs_to_create = {
86-
nested_name: nested_config
87-
for nested_name, nested_config in NESTED_ENTITIES_V2.items()
88-
if nested_config["parent"] in v2_resources_config
89-
}
90-
91-
v2_resources = rest_v2_resources(
92-
pipedrive_api_key,
93-
company_domain,
94-
v2_resources_config,
95-
nested_configs_to_create,
96-
v2_prefix,
97-
)
98-
for resource in v2_resources:
99-
yield resource
10066
101-
if v2_only:
102-
return
103-
elif v2_only:
104-
raise ValueError(
105-
"v2_only was set but no valid v2 endpoints were supplied via use_v2_endpoints."
106-
)
67+
Note: For v2 API endpoints, use pipedrive_v2_source from pipedrive.rest_v2
68+
"""
10769

108-
# yield nice rename mapping - always from v1
70+
# yield nice rename mapping
10971
yield create_state(pipedrive_api_key) | parsed_mapping
11072

111-
# parse timestamp and build kwargs for v1
73+
# parse timestamp and build kwargs
11274
since_timestamp = ensure_pendulum_datetime(since_timestamp).strftime(
11375
"%Y-%m-%d %H:%M:%S"
11476
)
11577
resource_kwargs: Any = (
11678
{"since_timestamp": since_timestamp} if since_timestamp else {}
11779
)
11880

119-
# create resources for all v1 endpoints
81+
# create resources for all endpoints
12082
endpoints_resources = {}
12183
for entity, resource_name in RECENTS_ENTITIES.items():
12284
endpoints_resources[resource_name] = dlt.resource(
@@ -128,7 +90,7 @@ def pipedrive_source(
12890

12991
yield from endpoints_resources.values()
13092

131-
# create transformers for deals to participants and flows, attached to v1 deals resource
93+
# create transformers for deals to participants and flows
13294
yield endpoints_resources["deals"] | dlt.transformer(
13395
name="deals_participants", write_disposition="merge", primary_key="id"
13496
)(_get_deals_participants)(pipedrive_api_key)
Lines changed: 94 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,66 @@
1-
from typing import Iterable, Dict, Any, List, cast
1+
from typing import Iterable, Dict, Any, List, Optional, Iterator, Union, cast
22

33
import dlt
44
from dlt.sources import DltResource
5-
from dlt.sources.rest_api import rest_api_source
5+
from dlt.sources.rest_api import rest_api_source, RESTAPIConfig
6+
from dlt.sources.rest_api.typing import EndpointResource
7+
8+
from ..settings import ENTITIES_V2, NESTED_ENTITIES_V2
9+
10+
11+
@dlt.source(name="pipedrive_v2")
12+
def pipedrive_v2_source(
13+
pipedrive_api_key: str = dlt.secrets.value,
14+
company_domain: str = dlt.secrets.value,
15+
resources: Optional[List[str]] = None,
16+
prefix: str = "v2_",
17+
) -> Iterator[DltResource]:
18+
"""
19+
Get data from the Pipedrive API v2.
20+
21+
Args:
22+
pipedrive_api_key: API token for authentication
23+
company_domain: Your Pipedrive company domain
24+
resources: List of resource names to load (e.g., ["deals", "persons"]). If None, loads all available v2 resources.
25+
prefix: Prefix for table names (default: "v2_")
26+
27+
Returns:
28+
Resources for v2 endpoints. Nested endpoints (e.g., deal_products, deal_followers) are automatically included when their parent resource is selected.
29+
30+
See also: https://pipedrive.readme.io/docs/pipedrive-api-v2#api-v2-availability
31+
"""
32+
resources = resources or list(ENTITIES_V2.keys())
33+
34+
# Filter valid v2 endpoints
35+
v2_resources_config = {
36+
resource: ENTITIES_V2[resource]
37+
for resource in resources
38+
if resource in ENTITIES_V2 # this ensures that resource is supported by v2 api
39+
}
40+
41+
if not v2_resources_config:
42+
raise ValueError(
43+
f"No valid v2 endpoints found in: {resources}. "
44+
f"Available endpoints: {list(ENTITIES_V2.keys())}"
45+
)
46+
47+
# Only include nested endpoints if their parent is in the v2 endpoints list
48+
nested_configs_to_create = {
49+
nested_name: nested_config
50+
for nested_name, nested_config in NESTED_ENTITIES_V2.items()
51+
if nested_config["parent"] in v2_resources_config
52+
}
53+
54+
# Create and yield v2 resources
55+
v2_resources = rest_v2_resources(
56+
pipedrive_api_key,
57+
company_domain,
58+
v2_resources_config,
59+
nested_configs_to_create,
60+
prefix,
61+
)
62+
for resource in v2_resources:
63+
yield resource
664

765

866
def rest_v2_resources(
@@ -16,40 +74,9 @@ def rest_v2_resources(
1674
Build and yield REST v2 resources for the given resource configurations.
1775
Includes nested endpoints that depend on parent resources.
1876
"""
19-
77+
# Build resources list
2078
resources: List[Dict[str, Any]] = []
2179

22-
config: Dict[str, Any] = {
23-
"client": {
24-
"base_url": f"https://{company_domain}.pipedrive.com/api/v2/",
25-
"auth": {
26-
"type": "api_key",
27-
"name": "api_token",
28-
"api_key": pipedrive_api_key,
29-
"location": "query",
30-
},
31-
},
32-
"resource_defaults": {
33-
"primary_key": "id",
34-
"write_disposition": "merge",
35-
"endpoint": {
36-
"params": {
37-
"limit": 500,
38-
"sort_by": "update_time",
39-
"sort_direction": "desc",
40-
},
41-
"data_selector": "data",
42-
"paginator": {
43-
"type": "cursor",
44-
"cursor_path": "additional_data.next_cursor",
45-
"cursor_param": "cursor",
46-
},
47-
},
48-
},
49-
# IMPORTANT: bind the typed list here
50-
"resources": resources,
51-
}
52-
5380
# Build the resources list for the config from the provided resource configs
5481
for resource_name, endpoint_config in resource_configs.items():
5582
resource_def: Dict[str, Any] = {
@@ -63,7 +90,7 @@ def rest_v2_resources(
6390
parent_name = nested_config["parent"]
6491
endpoint_path = nested_config["endpoint_path"]
6592
params = nested_config.get("params", {})
66-
primary_key = nested_config.get("primary_key", "id")
93+
primary_key: Union[str, List[str]] = nested_config.get("primary_key", "id")
6794
include_from_parent = nested_config.get("include_from_parent")
6895

6996
# Use native rest_api_source nested endpoint syntax: {resources.parent_name.id}
@@ -80,10 +107,40 @@ def rest_v2_resources(
80107
nested_resource_def["include_from_parent"] = include_from_parent
81108
if primary_key != "id":
82109
nested_resource_def["primary_key"] = primary_key
83-
84110
resources.append(nested_resource_def)
85111

86-
api_source = rest_api_source(cast(Any, config))
112+
# Create config with proper typing
113+
# Cast resources to the expected type since our Dict[str, Any] matches EndpointResource structure
114+
config: RESTAPIConfig = {
115+
"client": {
116+
"base_url": f"https://{company_domain}.pipedrive.com/api/v2/",
117+
"auth": {
118+
"type": "api_key",
119+
"name": "api_token",
120+
"api_key": pipedrive_api_key,
121+
"location": "query",
122+
},
123+
},
124+
"resource_defaults": {
125+
"primary_key": "id",
126+
"write_disposition": "merge",
127+
"endpoint": {
128+
"params": {
129+
"limit": 500,
130+
"sort_by": "update_time",
131+
"sort_direction": "desc",
132+
},
133+
"data_selector": "data",
134+
"paginator": {
135+
"type": "cursor",
136+
"cursor_path": "additional_data.next_cursor",
137+
"cursor_param": "cursor",
138+
},
139+
},
140+
},
141+
"resources": cast(List[Union[str, EndpointResource, DltResource]], resources),
142+
}
87143

144+
api_source = rest_api_source(config)
88145
for resource in api_source.resources.values():
89146
yield resource.with_name(f"{prefix}{resource.name}")

sources/pipedrive/settings.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
"user": "users",
2929
}
3030

31+
3132
"""
3233
Available Pipedrive API v2 endpoints for configuration.
3334
@@ -36,7 +37,6 @@
3637
3738
# For more details, see: https://developers.pipedrive.com/docs/api/v2
3839
"""
39-
4040
ENTITIES_V2 = {
4141
"activities": {},
4242
"deals": {

sources/pipedrive_pipeline.py

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,19 @@
11
from typing import Optional, Sequence
22

33
import dlt
4-
from pipedrive import pipedrive_source
4+
from pipedrive import pipedrive_source, pipedrive_v2_source
55
from pipedrive.settings import DEFAULT_V2_RESOURCES
66

77

88
def load_pipedrive() -> None:
99
"""Constructs a pipeline that will load all pipedrive data"""
1010
# configure the pipeline with your destination details
1111
pipeline = dlt.pipeline(
12-
pipeline_name="pipedrive", destination="duckdb", dataset_name="pipedrive_data"
12+
pipeline_name="pipedrive",
13+
destination="duckdb",
14+
dataset_name="pipedrive_data",
15+
progress="log",
16+
dev_mode=True,
1317
)
1418
load_info = pipeline.run(pipedrive_source())
1519
print(load_info)
@@ -19,13 +23,18 @@ def load_pipedrive() -> None:
1923
def load_selected_data() -> None:
2024
"""Shows how to load just selected tables using `with_resources`"""
2125
pipeline = dlt.pipeline(
22-
pipeline_name="pipedrive", destination="duckdb", dataset_name="pipedrive_data"
26+
pipeline_name="pipedrive",
27+
destination="duckdb",
28+
dataset_name="pipedrive_data",
29+
progress="log",
30+
dev_mode=True,
2331
)
2432
# Use with_resources to select which entities to load
2533
# Note: `custom_fields_mapping` must be included to translate custom field hashes to corresponding names
2634
load_info = pipeline.run(
2735
pipedrive_source().with_resources(
28-
"products", "deals", "deals_participants", "custom_fields_mapping"
36+
# "products", "deals", "deals_participants", "custom_fields_mapping"
37+
"deals"
2938
)
3039
)
3140
print(load_info)
@@ -47,7 +56,11 @@ def load_selected_data() -> None:
4756
def load_from_start_date() -> None:
4857
"""Example to incrementally load activities limited to items updated after a given date"""
4958
pipeline = dlt.pipeline(
50-
pipeline_name="pipedrive", destination="duckdb", dataset_name="pipedrive_data"
59+
pipeline_name="pipedrive",
60+
destination="duckdb",
61+
dataset_name="pipedrive_data",
62+
progress="log",
63+
dev_mode=True,
5164
)
5265

5366
# First source configure to load everything except activities from the beginning
@@ -65,16 +78,19 @@ def load_from_start_date() -> None:
6578

6679

6780
def load_v2_resources(resources: Optional[Sequence[str]] = None) -> None:
68-
"""Load v2 entities using the default configuration."""
81+
"""Load v2 entities using the separate v2 source.
82+
83+
Note: company_domain will be read from dlt secrets if not provided.
84+
"""
6985
resources = list(resources or DEFAULT_V2_RESOURCES)
7086
pipeline = dlt.pipeline(
71-
pipeline_name="pipedrive_v2232",
87+
pipeline_name="pipedrive",
7288
destination="duckdb",
73-
dataset_name="pipedrive_v2_data",
89+
dataset_name="pipedrive_data",
7490
progress="log",
7591
dev_mode=True,
7692
)
77-
source = pipedrive_source(use_v2_endpoints=resources, v2_only=True)
93+
source = pipedrive_v2_source(resources=resources)
7894
load_info = pipeline.run(source)
7995
print(load_info)
8096
print(pipeline.last_trace.last_normalize_info)
@@ -83,13 +99,13 @@ def load_v2_resources(resources: Optional[Sequence[str]] = None) -> None:
8399
def load_selected_v2_data(resources: Sequence[str]) -> None:
84100
"""Load only the specified v2 entities (and their nested resources)."""
85101
pipeline = dlt.pipeline(
86-
pipeline_name="pipedrive_v2_selected",
102+
pipeline_name="pipedrive",
87103
destination="duckdb",
88-
dataset_name="pipedrive_v2_data",
104+
dataset_name="pipedrive_data",
89105
progress="log",
90106
dev_mode=True,
91107
)
92-
source = pipedrive_source(use_v2_endpoints=list(resources), v2_only=True)
108+
source = pipedrive_v2_source(resources=list(resources))
93109
load_info = pipeline.run(source)
94110
print(load_info)
95111
print(pipeline.last_trace.last_normalize_info)

0 commit comments

Comments
 (0)