Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions sources/pipedrive/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,25 +9,29 @@
To get an api key: https://pipedrive.readme.io/docs/how-to-find-the-api-token
"""

from typing import Any, Dict, Iterator, List, Optional, Union, Iterator
from typing import Any, Dict, Iterator, List, Optional, Union, Iterable

import dlt
from dlt.common import pendulum
from dlt.common.time import ensure_pendulum_datetime
from dlt.common.typing import TDataItems
from dlt.sources import DltResource

from .helpers.custom_fields_munger import update_fields_mapping, rename_fields
from .helpers.pages import get_recent_items_incremental, get_pages
from .helpers import group_deal_flows
from .typing import TDataPage
from .settings import ENTITY_MAPPINGS, RECENTS_ENTITIES
from dlt.common import pendulum
from dlt.common.time import ensure_pendulum_datetime
from dlt.sources import DltResource, TDataItems

# Export v2 source for easy access
from .rest_v2 import pipedrive_v2_source


@dlt.source(name="pipedrive")
def pipedrive_source(
pipedrive_api_key: str = dlt.secrets.value,
since_timestamp: Optional[Union[pendulum.DateTime, str]] = "1970-01-01 00:00:00",
) -> Iterator[DltResource]:
) -> Iterable[DltResource]:
"""
Get data from the Pipedrive API. Supports incremental loading and custom fields mapping.

Expand Down Expand Up @@ -60,6 +64,8 @@ def pipedrive_source(
Resources that depend on another resource are implemented as transformers
so they can re-use the original resource data without re-downloading.
Examples: deals_participants, deals_flow

Note: For v2 API endpoints, use pipedrive_v2_source from pipedrive.rest_v2
"""

# yield nice rename mapping
Expand Down
168 changes: 168 additions & 0 deletions sources/pipedrive/rest_v2/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
"""Pipedrive API v2 source. Loads data from Pipedrive API v2 endpoints with incremental support."""

from typing import Iterable, Dict, Any, List, Optional, Union, cast
import dlt
from dlt.common import pendulum
from dlt.common.time import ensure_pendulum_datetime
from dlt.sources import DltResource
from dlt.sources.rest_api import rest_api_resources, RESTAPIConfig, EndpointResource

from ..settings import ENTITIES_V2, NESTED_ENTITIES_V2

# v2 endpoints that accept the ``updated_since`` filter and therefore get an
# incremental cursor on ``update_time``.
INCREMENTAL_API_RESOURCES = {"activities", "deals", "organizations", "persons", "products"}


@dlt.source(name="pipedrive_v2", section="pipedrive")
def pipedrive_v2_source(
    pipedrive_api_key: Optional[str] = dlt.secrets.value,
    company_domain: Optional[str] = dlt.secrets.value,
    resources: Optional[List[str]] = None,
    prefix: str = "v2_",
    since_timestamp: Optional[Union[pendulum.DateTime, str]] = "1970-01-01 00:00:00",
) -> Iterable[DltResource]:
    """Load Pipedrive API v2 data with incremental filtering.

    Args:
        pipedrive_api_key: The API key for authentication.
        company_domain: The company domain for the Pipedrive account.
        resources: List of resource names to load. If None, all available
            resources (every key of ENTITIES_V2) are loaded.
        prefix: Prefix to add to resource names. Defaults to "v2_".
        since_timestamp: Start timestamp for incremental loading.
            Defaults to "1970-01-01 00:00:00".

    Yields:
        DltResource: One resource per selected v2 endpoint, plus nested
        endpoints whose parent endpoint is selected.

    Raises:
        ValueError: If none of the requested names is a known v2 endpoint.
    """
    resources = resources or list(ENTITIES_V2.keys())

    # Keep only names that are actual v2 endpoints; unknown names are dropped.
    v2_resources_config = {
        res: ENTITIES_V2[res] for res in resources if res in ENTITIES_V2
    }

    if not v2_resources_config:
        raise ValueError(f"No valid v2 endpoints found in: {resources}")

    # Nested endpoints are created only when their parent endpoint is selected.
    nested_configs_to_create = {
        name: conf
        for name, conf in NESTED_ENTITIES_V2.items()
        if conf["parent"] in v2_resources_config
    }

    # The Pipedrive API expects RFC 3339 format for the updated_since parameter.
    since_timestamp_rfc3339 = ensure_pendulum_datetime(
        since_timestamp
    ).to_rfc3339_string()

    yield from rest_v2_resources(
        pipedrive_api_key,
        company_domain,
        v2_resources_config,
        nested_configs_to_create,
        prefix,
        since_timestamp_rfc3339,
    )


def rest_v2_resources(
    pipedrive_api_key: str,
    company_domain: str,
    resource_configs: Dict[str, Any],
    nested_configs: Dict[str, Dict[str, Any]],
    prefix: str,
    since_timestamp: str,
) -> Iterable[DltResource]:
    """Build dlt REST API resources for Pipedrive v2 endpoints.

    Args:
        pipedrive_api_key: API key, sent as the ``api_token`` query parameter.
        company_domain: Pipedrive subdomain used to build the base URL.
        resource_configs: Endpoint configuration per top-level resource name.
        nested_configs: Configuration for endpoints that depend on a parent.
        prefix: Prefix prepended to every generated resource name.
        since_timestamp: Start value for incremental cursors.

    Yields:
        DltResource: One configured resource per endpoint.
    """
    resource_list: List[Any] = []

    # Top-level endpoints: the API path is simply the resource name.
    for name, conf in resource_configs.items():
        endpoint: Dict[str, Any] = {**conf}
        endpoint["path"] = name
        endpoint["params"] = {**conf.get("params", {})}

        if name in INCREMENTAL_API_RESOURCES:
            # dlt interpolates the incremental cursor into the query string.
            endpoint["params"]["updated_since"] = "{incremental.start_value}"
            endpoint["incremental"] = {
                "cursor_path": "update_time",
                "initial_value": since_timestamp,
            }

        resource_list.append({"name": f"{prefix}{name}", "endpoint": endpoint})

    # Dependent endpoints: "{id}" in the path resolves against the parent row.
    for name, conf in nested_configs.items():
        parent_placeholder = f"{{resources.{prefix}{conf['parent']}.id}}"
        nested: Dict[str, Any] = {
            "name": f"{prefix}{name}",
            "endpoint": {
                "path": conf["endpoint_path"].replace("{id}", parent_placeholder),
                "params": {**conf.get("params", {})},
            },
        }

        if "include_from_parent" in conf:
            nested["include_from_parent"] = conf["include_from_parent"]

        # NOTE(review): list-valued keys drop "_"-prefixed components
        # (parent-derived columns) — confirm the remaining key is still
        # unique per row for merge loading.
        pk = conf.get("primary_key", "id")
        if isinstance(pk, list):
            nested["primary_key"] = [part for part in pk if not part.startswith("_")]
        elif pk != "id":
            nested["primary_key"] = pk

        resource_list.append(nested)

    config: RESTAPIConfig = {
        "client": {
            "base_url": f"https://{company_domain}.pipedrive.com/api/v2/",
            "auth": {
                "type": "api_key",
                "name": "api_token",
                "api_key": pipedrive_api_key,
                "location": "query",
            },
        },
        "resource_defaults": {
            "primary_key": "id",
            "write_disposition": "merge",
            "endpoint": {
                "params": {
                    "limit": 500,
                    "sort_by": "update_time",
                    "sort_direction": "desc",
                },
                "data_selector": "data",
                "paginator": {
                    "type": "cursor",
                    "cursor_path": "additional_data.next_cursor",
                    "cursor_param": "cursor",
                },
            },
        },
        "resources": cast(List[Union[str, EndpointResource, DltResource]], resource_list),
    }

    yield from rest_api_resources(config)
89 changes: 89 additions & 0 deletions sources/pipedrive/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,92 @@
"task": "tasks",
"user": "users",
}


# Available Pipedrive API v2 endpoints for configuration.
#
# Note: Some endpoints (e.g., followers, deal_products) require nested
# configuration. See NESTED_ENTITIES_V2 for examples.
#
# For more details, see: https://developers.pipedrive.com/docs/api/v2

# Optional rollup/counter fields that the v2 list endpoints return only when
# requested explicitly through the ``include_fields`` query parameter.
_DEAL_INCLUDE_FIELDS = ",".join(
    (
        "next_activity_id",
        "last_activity_id",
        "first_won_time",
        "products_count",
        "files_count",
        "notes_count",
        "followers_count",
        "email_messages_count",
        "activities_count",
        "done_activities_count",
        "undone_activities_count",
        "participants_count",
        "last_incoming_mail_time",
        "last_outgoing_mail_time",
        "smart_bcc_email",
    )
)

_PERSON_INCLUDE_FIELDS = ",".join(
    (
        "next_activity_id",
        "last_activity_id",
        "open_deals_count",
        "related_open_deals_count",
        "closed_deals_count",
        "related_closed_deals_count",
        "participant_open_deals_count",
        "participant_closed_deals_count",
        "email_messages_count",
        "activities_count",
        "done_activities_count",
        "undone_activities_count",
        "files_count",
        "notes_count",
        "followers_count",
        "won_deals_count",
        "related_won_deals_count",
        "lost_deals_count",
        "related_lost_deals_count",
        "last_incoming_mail_time",
        "last_outgoing_mail_time",
    )
)

_ORGANIZATION_INCLUDE_FIELDS = ",".join(
    (
        "next_activity_id",
        "last_activity_id",
        "open_deals_count",
        "related_open_deals_count",
        "closed_deals_count",
        "related_closed_deals_count",
        "email_messages_count",
        "activities_count",
        "done_activities_count",
        "undone_activities_count",
        "files_count",
        "notes_count",
        "followers_count",
        "won_deals_count",
        "related_won_deals_count",
        "lost_deals_count",
        "related_lost_deals_count",
    )
)

ENTITIES_V2 = {
    "activities": {},
    "deals": {"params": {"include_fields": _DEAL_INCLUDE_FIELDS}},
    "persons": {"params": {"include_fields": _PERSON_INCLUDE_FIELDS}},
    "organizations": {"params": {"include_fields": _ORGANIZATION_INCLUDE_FIELDS}},
    "products": {},
    "pipelines": {},
    "stages": {},
}

# Nested v2 API endpoints. Each entry is created automatically whenever its
# ``parent`` resource is part of the selected v2 resources.
NESTED_ENTITIES_V2 = {
    "deal_products": {
        "parent": "deals",
        "endpoint_path": "deals/{id}/products",
        "params": {"limit": 500},
    },
    "deal_followers": {
        "parent": "deals",
        "endpoint_path": "deals/{id}/followers",
        # Follower rows carry no "id" of their own, so a composite key of the
        # user and the parent deal is used instead.
        "primary_key": ["user_id", "_deals_id"],
        # Pull the deal id in from the parent resource.
        "include_from_parent": ["id"],
        "params": {"limit": 500},
    },
}

# Curated default set of v2 resources loaded when the caller does not name any
# explicitly. Customize as needed; ENTITIES_V2 lists everything available.
DEFAULT_V2_RESOURCES = [
    "deals",
    "persons",
    "organizations",
    "products",
    "pipelines",
    "stages",
]
58 changes: 54 additions & 4 deletions sources/pipedrive_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
from typing import Optional, Sequence

import dlt
from pipedrive import pipedrive_source
from pipedrive import pipedrive_source, pipedrive_v2_source
from pipedrive.settings import DEFAULT_V2_RESOURCES


def load_pipedrive() -> None:
    """Construct and run a pipeline that loads all pipedrive data."""
    # Configure the pipeline with your destination details.
    pipeline = dlt.pipeline(
        pipeline_name="pipedrive",
        destination="duckdb",
        dataset_name="pipedrive_data",
        progress="log",
    )
    print(pipeline.run(pipedrive_source()))
Expand All @@ -16,7 +22,10 @@ def load_pipedrive() -> None:
def load_selected_data() -> None:
"""Shows how to load just selected tables using `with_resources`"""
pipeline = dlt.pipeline(
pipeline_name="pipedrive", destination="duckdb", dataset_name="pipedrive_data"
pipeline_name="pipedrive",
destination="duckdb",
dataset_name="pipedrive_data",
progress="log",
)
# Use with_resources to select which entities to load
# Note: `custom_fields_mapping` must be included to translate custom field hashes to corresponding names
Expand Down Expand Up @@ -44,7 +53,10 @@ def load_selected_data() -> None:
def load_from_start_date() -> None:
"""Example to incrementally load activities limited to items updated after a given date"""
pipeline = dlt.pipeline(
pipeline_name="pipedrive", destination="duckdb", dataset_name="pipedrive_data"
pipeline_name="pipedrive",
destination="duckdb",
dataset_name="pipedrive_data",
progress="log",
)

# First source configure to load everything except activities from the beginning
Expand All @@ -61,10 +73,48 @@ def load_from_start_date() -> None:
print(load_info)


def load_v2_resources(resources: Optional[Sequence[str]] = None) -> None:
    """Load v2 entities using the separate v2 source.

    Args:
        resources: Optional resource names; defaults to DEFAULT_V2_RESOURCES.
    """
    selected = list(resources or DEFAULT_V2_RESOURCES)
    pipeline = dlt.pipeline(
        pipeline_name="pipedrive",
        destination="duckdb",
        dataset_name="pipedrive_data",
        progress="log",
    )
    load_info = pipeline.run(pipedrive_v2_source(resources=selected))
    print(load_info)
    print(pipeline.last_trace.last_normalize_info)


def load_selected_v2_data() -> None:
    """Load only the specified v2 entities using `with_resources`."""
    pipeline = dlt.pipeline(
        pipeline_name="pipedrive",
        destination="duckdb",
        dataset_name="pipedrive_data",
        progress="log",
    )
    # Nested dependencies (e.g., v2_deal_products, v2_deal_followers) must be
    # explicitly listed.
    source = pipedrive_v2_source().with_resources(
        "v2_deals", "v2_persons", "v2_organizations"
    )
    load_info = pipeline.run(source)
    print(load_info)
    print(pipeline.last_trace.last_normalize_info)


if __name__ == "__main__":
    # Run the main example: full v1 load of all pipedrive entities.
    load_pipedrive()
    # Load selected tables only and display resource info.
    # load_selected_data()
    # Incremental example: load activities updated since a given date.
    # load_from_start_date()
    # Load v2 API resources (optional addon).
    load_v2_resources()
    # Load only selected v2 resources (3 major endpoints: deals, persons, organizations).
    # load_selected_v2_data()