Skip to content

Commit 4a7807f

Browse files
committed
This commit introduces the ability to select specific v2 API endpoints
to be used in the Pipedrive source. Users can now configure this by passing a list of endpoint names to the `use_v2_endpoints` parameter. This allows for a gradual migration to v2 without enabling all endpoints at once. fix: clean whitespace and update pipedrive v2 source Revised structure for lint errors and making it simpler minor fixes
1 parent 78525b5 commit 4a7807f

File tree

4 files changed

+294
-11
lines changed

4 files changed

+294
-11
lines changed

sources/pipedrive/__init__.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,25 +9,29 @@
99
To get an api key: https://pipedrive.readme.io/docs/how-to-find-the-api-token
1010
"""
1111

12-
from typing import Any, Dict, Iterator, List, Optional, Union, Iterator
12+
from typing import Any, Dict, Iterator, List, Optional, Union, Iterable
1313

1414
import dlt
15+
from dlt.common import pendulum
16+
from dlt.common.time import ensure_pendulum_datetime
17+
from dlt.common.typing import TDataItems
18+
from dlt.sources import DltResource
1519

1620
from .helpers.custom_fields_munger import update_fields_mapping, rename_fields
1721
from .helpers.pages import get_recent_items_incremental, get_pages
1822
from .helpers import group_deal_flows
1923
from .typing import TDataPage
2024
from .settings import ENTITY_MAPPINGS, RECENTS_ENTITIES
21-
from dlt.common import pendulum
22-
from dlt.common.time import ensure_pendulum_datetime
23-
from dlt.sources import DltResource, TDataItems
25+
26+
# Export v2 source for easy access
27+
from .rest_v2 import pipedrive_v2_source
2428

2529

2630
@dlt.source(name="pipedrive")
2731
def pipedrive_source(
2832
pipedrive_api_key: str = dlt.secrets.value,
2933
since_timestamp: Optional[Union[pendulum.DateTime, str]] = "1970-01-01 00:00:00",
30-
) -> Iterator[DltResource]:
34+
) -> Iterable[DltResource]:
3135
"""
3236
Get data from the Pipedrive API. Supports incremental loading and custom fields mapping.
3337
@@ -60,6 +64,8 @@ def pipedrive_source(
6064
Resources that depend on another resource are implemented as transformers
6165
so they can re-use the original resource data without re-downloading.
6266
Examples: deals_participants, deals_flow
67+
68+
Note: For v2 API endpoints, use pipedrive_v2_source from pipedrive.rest_v2
6369
"""
6470

6571
# yield nice rename mapping
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
from typing import Iterable, Dict, Any, List, Optional, Union, cast
2+
3+
import dlt
4+
from dlt.sources import DltResource
5+
from dlt.sources.rest_api import rest_api_source, RESTAPIConfig
6+
from dlt.sources.rest_api.typing import EndpointResource
7+
8+
from ..settings import ENTITIES_V2, NESTED_ENTITIES_V2
9+
10+
11+
@dlt.source(name="pipedrive_v2")
12+
def pipedrive_v2_source(
13+
pipedrive_api_key: str = dlt.secrets.value,
14+
company_domain: str = dlt.secrets.value,
15+
resources: Optional[List[str]] = None,
16+
prefix: str = "v2_",
17+
) -> Iterable[DltResource]:
18+
"""
19+
Get data from the Pipedrive API v2.
20+
21+
Args:
22+
pipedrive_api_key: API token for authentication
23+
company_domain: Your Pipedrive company domain
24+
resources: List of resource names to load (e.g., ["deals", "persons"]). If None, loads all available v2 resources.
25+
prefix: Prefix for table names (default: "v2_")
26+
27+
Returns:
28+
Resources for v2 endpoints. Nested endpoints (e.g., deal_products, deal_followers) are automatically included when their parent resource is selected.
29+
30+
See also: https://pipedrive.readme.io/docs/pipedrive-api-v2#api-v2-availability
31+
"""
32+
resources = resources or list(ENTITIES_V2.keys())
33+
34+
# Filter valid v2 endpoints
35+
v2_resources_config = {
36+
resource: ENTITIES_V2[resource]
37+
for resource in resources
38+
if resource in ENTITIES_V2 # this ensures that resource is supported by v2 api
39+
}
40+
41+
if not v2_resources_config:
42+
raise ValueError(
43+
f"No valid v2 endpoints found in: {resources}. "
44+
f"Available endpoints: {list(ENTITIES_V2.keys())}"
45+
)
46+
47+
# Only include nested endpoints if their parent is in the v2 endpoints list
48+
nested_configs_to_create = {
49+
nested_name: nested_config
50+
for nested_name, nested_config in NESTED_ENTITIES_V2.items()
51+
if nested_config["parent"] in v2_resources_config
52+
}
53+
54+
# Create and yield v2 resources
55+
v2_resources = rest_v2_resources(
56+
pipedrive_api_key,
57+
company_domain,
58+
v2_resources_config,
59+
nested_configs_to_create,
60+
prefix,
61+
)
62+
for resource in v2_resources:
63+
yield resource
64+
65+
66+
def rest_v2_resources(
67+
pipedrive_api_key: str,
68+
company_domain: str,
69+
resource_configs: Dict[str, Any],
70+
nested_configs: Dict[str, Dict[str, Any]],
71+
prefix: str,
72+
) -> Iterable[DltResource]:
73+
"""
74+
Build and yield REST v2 resources for the given resource configurations.
75+
Includes nested endpoints that depend on parent resources.
76+
"""
77+
# Build resources list
78+
resources: List[Dict[str, Any]] = []
79+
80+
# Build the resources list for the config from the provided resource configs
81+
for resource_name, endpoint_config in resource_configs.items():
82+
resource_def: Dict[str, Any] = {
83+
"name": resource_name,
84+
"endpoint": endpoint_config,
85+
}
86+
resources.append(resource_def)
87+
88+
# Add nested resources using native rest_api_source support
89+
for nested_name, nested_config in nested_configs.items():
90+
parent_name = nested_config["parent"]
91+
endpoint_path = nested_config["endpoint_path"]
92+
params = nested_config.get("params", {})
93+
primary_key: Union[str, List[str]] = nested_config.get("primary_key", "id")
94+
include_from_parent = nested_config.get("include_from_parent")
95+
96+
# Use native rest_api_source nested endpoint syntax: {resources.parent_name.id}
97+
nested_resource_def: Dict[str, Any] = {
98+
"name": nested_name,
99+
"endpoint": {
100+
"path": endpoint_path.replace(
101+
"{id}", f"{{resources.{parent_name}.id}}"
102+
),
103+
"params": params,
104+
},
105+
}
106+
if include_from_parent:
107+
nested_resource_def["include_from_parent"] = include_from_parent
108+
if primary_key != "id":
109+
nested_resource_def["primary_key"] = primary_key
110+
resources.append(nested_resource_def)
111+
112+
config: RESTAPIConfig = {
113+
"client": {
114+
"base_url": f"https://{company_domain}.pipedrive.com/api/v2/",
115+
"auth": {
116+
"type": "api_key",
117+
"name": "api_token",
118+
"api_key": pipedrive_api_key,
119+
"location": "query",
120+
},
121+
},
122+
"resource_defaults": {
123+
"primary_key": "id",
124+
"write_disposition": "merge",
125+
"endpoint": {
126+
"params": {
127+
"limit": 500,
128+
"sort_by": "update_time",
129+
"sort_direction": "desc",
130+
},
131+
"data_selector": "data",
132+
"paginator": {
133+
"type": "cursor",
134+
"cursor_path": "additional_data.next_cursor",
135+
"cursor_param": "cursor",
136+
},
137+
},
138+
},
139+
"resources": cast(List[Union[str, EndpointResource, DltResource]], resources),
140+
}
141+
142+
api_source = rest_api_source(config)
143+
for resource in api_source.resources.values():
144+
yield resource.with_name(f"{prefix}{resource.name}")

sources/pipedrive/settings.py

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,3 +27,82 @@
2727
"task": "tasks",
2828
"user": "users",
2929
}
30+
31+
32+
"""
33+
Available Pipedrive API v2 endpoints for configuration.
34+
35+
Note: Some endpoints (e.g., followers, deal_products) require nested configuration.
36+
See NESTED_ENTITIES_V2 for examples.
37+
38+
# For more details, see: https://developers.pipedrive.com/docs/api/v2
39+
"""
40+
ENTITIES_V2 = {
41+
"activities": {},
42+
"deals": {
43+
"params": {
44+
"include_fields": (
45+
"next_activity_id,last_activity_id,first_won_time,products_count,"
46+
"files_count,notes_count,followers_count,email_messages_count,"
47+
"activities_count,done_activities_count,undone_activities_count,"
48+
"participants_count,last_incoming_mail_time,last_outgoing_mail_time,"
49+
"smart_bcc_email"
50+
)
51+
}
52+
},
53+
"persons": {
54+
"params": {
55+
"include_fields": (
56+
"next_activity_id,last_activity_id,open_deals_count,"
57+
"related_open_deals_count,closed_deals_count,related_closed_deals_count,"
58+
"participant_open_deals_count,participant_closed_deals_count,"
59+
"email_messages_count,activities_count,done_activities_count,"
60+
"undone_activities_count,files_count,notes_count,followers_count,"
61+
"won_deals_count,related_won_deals_count,lost_deals_count,"
62+
"related_lost_deals_count,last_incoming_mail_time,last_outgoing_mail_time"
63+
)
64+
}
65+
},
66+
"organizations": {
67+
"params": {
68+
"include_fields": (
69+
"next_activity_id,last_activity_id,open_deals_count,"
70+
"related_open_deals_count,closed_deals_count,related_closed_deals_count,"
71+
"email_messages_count,activities_count,done_activities_count,"
72+
"undone_activities_count,files_count,notes_count,followers_count,"
73+
"won_deals_count,related_won_deals_count,lost_deals_count,"
74+
"related_lost_deals_count"
75+
)
76+
}
77+
},
78+
"products": {},
79+
"pipelines": {},
80+
"stages": {},
81+
}
82+
83+
# Nested V2 API Endpoints Configuration
84+
# Automatically loaded when their parent resource is included in use_v2_endpoints.
85+
NESTED_ENTITIES_V2 = {
86+
"deal_products": {
87+
"parent": "deals",
88+
"endpoint_path": "deals/{id}/products",
89+
"params": {
90+
"limit": 500,
91+
},
92+
},
93+
"deal_followers": {
94+
"parent": "deals",
95+
"endpoint_path": "deals/{id}/followers",
96+
"primary_key": ["user_id", "_deals_id"], # Followers don't have 'id', use composite key
97+
"include_from_parent": ["id"], # Include deal id from parent
98+
"params": {
99+
"limit": 500,
100+
},
101+
},
102+
}
103+
104+
# Default v2 resources to load when none are specified
105+
# This curated set includes the most commonly used endpoints.
106+
# Users can customize this list to match their needs.
107+
# See ENTITIES_V2 above for all available v2 endpoints.
108+
DEFAULT_V2_RESOURCES = ["deals", "persons", "organizations" ,"products", "pipelines", "stages"]

sources/pipedrive_pipeline.py

Lines changed: 60 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,19 @@
1+
from typing import Optional, Sequence
2+
13
import dlt
2-
from pipedrive import pipedrive_source
4+
from pipedrive import pipedrive_source, pipedrive_v2_source
5+
from pipedrive.settings import DEFAULT_V2_RESOURCES
36

47

58
def load_pipedrive() -> None:
69
"""Constructs a pipeline that will load all pipedrive data"""
710
# configure the pipeline with your destination details
811
pipeline = dlt.pipeline(
9-
pipeline_name="pipedrive", destination="duckdb", dataset_name="pipedrive_data"
12+
pipeline_name="pipedrive",
13+
destination="duckdb",
14+
dataset_name="pipedrive_data",
15+
progress="log",
16+
dev_mode=True,
1017
)
1118
load_info = pipeline.run(pipedrive_source())
1219
print(load_info)
@@ -16,13 +23,18 @@ def load_pipedrive() -> None:
1623
def load_selected_data() -> None:
1724
"""Shows how to load just selected tables using `with_resources`"""
1825
pipeline = dlt.pipeline(
19-
pipeline_name="pipedrive", destination="duckdb", dataset_name="pipedrive_data"
26+
pipeline_name="pipedrive",
27+
destination="duckdb",
28+
dataset_name="pipedrive_data",
29+
progress="log",
30+
dev_mode=True,
2031
)
2132
# Use with_resources to select which entities to load
2233
# Note: `custom_fields_mapping` must be included to translate custom field hashes to corresponding names
2334
load_info = pipeline.run(
2435
pipedrive_source().with_resources(
25-
"products", "deals", "deals_participants", "custom_fields_mapping"
36+
# "products", "deals", "deals_participants", "custom_fields_mapping"
37+
"deals"
2638
)
2739
)
2840
print(load_info)
@@ -44,7 +56,11 @@ def load_selected_data() -> None:
4456
def load_from_start_date() -> None:
4557
"""Example to incrementally load activities limited to items updated after a given date"""
4658
pipeline = dlt.pipeline(
47-
pipeline_name="pipedrive", destination="duckdb", dataset_name="pipedrive_data"
59+
pipeline_name="pipedrive",
60+
destination="duckdb",
61+
dataset_name="pipedrive_data",
62+
progress="log",
63+
dev_mode=True,
4864
)
4965

5066
# First source configure to load everything except activities from the beginning
@@ -61,10 +77,48 @@ def load_from_start_date() -> None:
6177
print(load_info)
6278

6379

80+
def load_v2_resources(resources: Optional[Sequence[str]] = None) -> None:
81+
"""Load v2 entities using the separate v2 source.
82+
83+
Note: company_domain will be read from dlt secrets if not provided.
84+
"""
85+
resources = list(resources or DEFAULT_V2_RESOURCES)
86+
pipeline = dlt.pipeline(
87+
pipeline_name="pipedrive",
88+
destination="duckdb",
89+
dataset_name="pipedrive_data",
90+
progress="log",
91+
dev_mode=True,
92+
)
93+
source = pipedrive_v2_source(resources=resources)
94+
load_info = pipeline.run(source)
95+
print(load_info)
96+
print(pipeline.last_trace.last_normalize_info)
97+
98+
99+
def load_selected_v2_data(resources: Sequence[str]) -> None:
100+
"""Load only the specified v2 entities (and their nested resources)."""
101+
pipeline = dlt.pipeline(
102+
pipeline_name="pipedrive",
103+
destination="duckdb",
104+
dataset_name="pipedrive_data",
105+
progress="log",
106+
dev_mode=True,
107+
)
108+
source = pipedrive_v2_source(resources=list(resources))
109+
load_info = pipeline.run(source)
110+
print(load_info)
111+
print(pipeline.last_trace.last_normalize_info)
112+
113+
64114
if __name__ == "__main__":
65115
# run our main example
66-
load_pipedrive()
116+
# load_pipedrive()
67117
# load selected tables and display resource info
68118
# load_selected_data()
69119
# load activities updated since given date
70120
# load_from_start_date()
121+
# load v2 resources (optional addon)
122+
load_v2_resources()
123+
# load only selected v2 resources
124+
# load_selected_v2_data(["deals", "stages"])

0 commit comments

Comments
 (0)