diff --git a/sources/pipedrive/__init__.py b/sources/pipedrive/__init__.py
index b7964731c..9f4e76582 100644
--- a/sources/pipedrive/__init__.py
+++ b/sources/pipedrive/__init__.py
@@ -9,25 +9,29 @@
 To get an api key: https://pipedrive.readme.io/docs/how-to-find-the-api-token
 """
 
-from typing import Any, Dict, Iterator, List, Optional, Union, Iterator
+from typing import Any, Dict, Iterator, List, Optional, Union, Iterable
 
 import dlt
 
+from dlt.common import pendulum
+from dlt.common.time import ensure_pendulum_datetime
+from dlt.common.typing import TDataItems
+from dlt.sources import DltResource
 from .helpers.custom_fields_munger import update_fields_mapping, rename_fields
 from .helpers.pages import get_recent_items_incremental, get_pages
 from .helpers import group_deal_flows
 from .typing import TDataPage
 from .settings import ENTITY_MAPPINGS, RECENTS_ENTITIES
-from dlt.common import pendulum
-from dlt.common.time import ensure_pendulum_datetime
-from dlt.sources import DltResource, TDataItems
+
+# Export v2 source for easy access
+from .rest_v2 import pipedrive_v2_source
 
 
 @dlt.source(name="pipedrive")
 def pipedrive_source(
     pipedrive_api_key: str = dlt.secrets.value,
     since_timestamp: Optional[Union[pendulum.DateTime, str]] = "1970-01-01 00:00:00",
-) -> Iterator[DltResource]:
+) -> Iterable[DltResource]:
     """
     Get data from the Pipedrive API. Supports incremental loading and custom fields mapping.
 
@@ -60,6 +64,8 @@ def pipedrive_source(
     Resources that depend on another resource are implemented as transformers
     so they can re-use the original resource data without re-downloading.
     Examples: deals_participants, deals_flow
+
+    Note: For v2 API endpoints, use pipedrive_v2_source from pipedrive.rest_v2
     """
 
     # yield nice rename mapping
diff --git a/sources/pipedrive/rest_v2/__init__.py b/sources/pipedrive/rest_v2/__init__.py
new file mode 100644
index 000000000..37c6c4472
--- /dev/null
+++ b/sources/pipedrive/rest_v2/__init__.py
@@ -0,0 +1,168 @@
+"""Pipedrive API v2 source. Loads data from Pipedrive API v2 endpoints with incremental support."""
+
+from typing import Iterable, Dict, Any, List, Optional, Union, cast
+import dlt
+from dlt.common import pendulum
+from dlt.common.time import ensure_pendulum_datetime
+from dlt.sources import DltResource
+from dlt.sources.rest_api import rest_api_resources, RESTAPIConfig, EndpointResource
+
+from ..settings import ENTITIES_V2, NESTED_ENTITIES_V2
+
+INCREMENTAL_API_RESOURCES = {
+    "activities",
+    "deals",
+    "persons",
+    "organizations",
+    "products",
+}
+
+
+@dlt.source(name="pipedrive_v2", section="pipedrive")
+def pipedrive_v2_source(
+    pipedrive_api_key: Optional[str] = dlt.secrets.value,
+    company_domain: Optional[str] = dlt.secrets.value,
+    resources: Optional[List[str]] = None,
+    prefix: str = "v2_",
+    since_timestamp: Optional[Union[pendulum.DateTime, str]] = "1970-01-01 00:00:00",
+) -> Iterable[DltResource]:
+    """Load Pipedrive API v2 data with incremental filtering.
+
+    Args:
+        pipedrive_api_key: The API key for authentication.
+        company_domain: The company domain for the Pipedrive account.
+        resources: List of resource names to load. If None, all available resources are loaded.
+        prefix: Prefix to add to resource names. Defaults to "v2_".
+        since_timestamp: Start timestamp for incremental loading. Defaults to "1970-01-01 00:00:00".
+
+    Returns:
+        Iterable[DltResource]: Resources for Pipedrive API v2 endpoints.
+ """ + resources = resources or list(ENTITIES_V2.keys()) + + v2_resources_config = { + res: ENTITIES_V2[res] for res in resources if res in ENTITIES_V2 + } + + if not v2_resources_config: + raise ValueError(f"No valid v2 endpoints found in: {resources}") + + nested_configs_to_create = { + name: conf + for name, conf in NESTED_ENTITIES_V2.items() + if conf["parent"] in v2_resources_config + } + # pipdrive API expects RFC3339 format for updated_since parameter + since_timestamp_rfc3339 = ensure_pendulum_datetime( + since_timestamp + ).to_rfc3339_string() + + v2_resources = rest_v2_resources( + pipedrive_api_key, + company_domain, + v2_resources_config, + nested_configs_to_create, + prefix, + since_timestamp_rfc3339, + ) + for resource in v2_resources: + yield resource + + +def rest_v2_resources( + pipedrive_api_key: str, + company_domain: str, + resource_configs: Dict[str, Any], + nested_configs: Dict[str, Dict[str, Any]], + prefix: str, + since_timestamp: str, +) -> Iterable[DltResource]: + """Build REST v2 resources with nested endpoints. + + Args: + pipedrive_api_key: The API key for authentication. + company_domain: The company domain for the Pipedrive account. + resource_configs: Configuration for main resources. + nested_configs: Configuration for nested/dependent resources. + prefix: Prefix to add to resource names. + since_timestamp: Timestamp for incremental filtering. + + Returns: + Iterable[DltResource]: Configured REST API resources. + """ + resources = [] + + for resource_name, endpoint_config in resource_configs.items(): + endpoint_def = { + **endpoint_config, + "path": resource_name, + "params": {**endpoint_config.get("params", {})}, + } + + if resource_name in INCREMENTAL_API_RESOURCES: + endpoint_def["params"]["updated_since"] = "{incremental.start_value}" + endpoint_def["incremental"] = { + "cursor_path": "update_time", + "initial_value": since_timestamp, + } + + resources.append( + { + "name": f"{prefix}{resource_name}", + "endpoint": endpoint_def, + } + ) + + for name, conf in nested_configs.items(): + nested_res = { + "name": f"{prefix}{name}", + "endpoint": { + "path": conf["endpoint_path"].replace( + "{id}", f"{{resources.{prefix}{conf['parent']}.id}}" + ), + "params": {**conf.get("params", {})}, + }, + } + + if "include_from_parent" in conf: + nested_res["include_from_parent"] = conf["include_from_parent"] + + pk = conf.get("primary_key", "id") + if isinstance(pk, list): + nested_res["primary_key"] = [k for k in pk if not k.startswith("_")] + elif pk != "id": + nested_res["primary_key"] = pk + + resources.append(nested_res) + + config: RESTAPIConfig = { + "client": { + "base_url": f"https://{company_domain}.pipedrive.com/api/v2/", + "auth": { + "type": "api_key", + "name": "api_token", + "api_key": pipedrive_api_key, + "location": "query", + }, + }, + "resource_defaults": { + "primary_key": "id", + "write_disposition": "merge", + "endpoint": { + "params": { + "limit": 500, + "sort_by": "update_time", + "sort_direction": "desc", + }, + "data_selector": "data", + "paginator": { + "type": "cursor", + "cursor_path": "additional_data.next_cursor", + "cursor_param": "cursor", + }, + }, + }, + "resources": cast(List[Union[str, EndpointResource, DltResource]], resources), + } + + yield from rest_api_resources(config) diff --git a/sources/pipedrive/settings.py b/sources/pipedrive/settings.py index 86c43bce5..5f960f0fd 100644 --- a/sources/pipedrive/settings.py +++ b/sources/pipedrive/settings.py @@ -27,3 +27,92 @@ "task": "tasks", "user": "users", } + + +""" +Available 
+
+Note: Some endpoints (e.g., followers, deal_products) require nested configuration.
+See NESTED_ENTITIES_V2 for examples.
+
+# For more details, see: https://developers.pipedrive.com/docs/api/v2
+"""
+ENTITIES_V2 = {
+    "activities": {},
+    "deals": {
+        "params": {
+            "include_fields": (
+                "next_activity_id,last_activity_id,first_won_time,products_count,"
+                "files_count,notes_count,followers_count,email_messages_count,"
+                "activities_count,done_activities_count,undone_activities_count,"
+                "participants_count,last_incoming_mail_time,last_outgoing_mail_time,"
+                "smart_bcc_email"
+            )
+        }
+    },
+    "persons": {
+        "params": {
+            "include_fields": (
+                "next_activity_id,last_activity_id,open_deals_count,"
+                "related_open_deals_count,closed_deals_count,related_closed_deals_count,"
+                "participant_open_deals_count,participant_closed_deals_count,"
+                "email_messages_count,activities_count,done_activities_count,"
+                "undone_activities_count,files_count,notes_count,followers_count,"
+                "won_deals_count,related_won_deals_count,lost_deals_count,"
+                "related_lost_deals_count,last_incoming_mail_time,last_outgoing_mail_time"
+            )
+        }
+    },
+    "organizations": {
+        "params": {
+            "include_fields": (
+                "next_activity_id,last_activity_id,open_deals_count,"
+                "related_open_deals_count,closed_deals_count,related_closed_deals_count,"
+                "email_messages_count,activities_count,done_activities_count,"
+                "undone_activities_count,files_count,notes_count,followers_count,"
+                "won_deals_count,related_won_deals_count,lost_deals_count,"
+                "related_lost_deals_count"
+            )
+        }
+    },
+    "products": {},
+    "pipelines": {},
+    "stages": {},
+}
+
+# Nested V2 API Endpoints Configuration
+# Automatically loaded when their parent resource is included in the selected `resources`.
+NESTED_ENTITIES_V2 = {
+    "deal_products": {
+        "parent": "deals",
+        "endpoint_path": "deals/{id}/products",
+        "params": {
+            "limit": 500,
+        },
+    },
+    "deal_followers": {
+        "parent": "deals",
+        "endpoint_path": "deals/{id}/followers",
+        "primary_key": [
+            "user_id",
+            "_deals_id",
+        ],  # Followers don't have 'id', use composite key
+        "include_from_parent": ["id"],  # Include deal id from parent
+        "params": {
+            "limit": 500,
+        },
+    },
+}
+
+# Default v2 resources to load when none are specified
+# This curated set includes the most commonly used endpoints.
+# Users can customize this list to match their needs.
+# See ENTITIES_V2 above for all available v2 endpoints.
+DEFAULT_V2_RESOURCES = [
+    "deals",
+    "persons",
+    "organizations",
+    "products",
+    "pipelines",
+    "stages",
+]
diff --git a/sources/pipedrive_pipeline.py b/sources/pipedrive_pipeline.py
index edc3d6e2e..de5320338 100644
--- a/sources/pipedrive_pipeline.py
+++ b/sources/pipedrive_pipeline.py
@@ -1,12 +1,18 @@
+from typing import Optional, Sequence
+
 import dlt
-from pipedrive import pipedrive_source
+from pipedrive import pipedrive_source, pipedrive_v2_source
+from pipedrive.settings import DEFAULT_V2_RESOURCES
 
 
 def load_pipedrive() -> None:
     """Constructs a pipeline that will load all pipedrive data"""
     # configure the pipeline with your destination details
     pipeline = dlt.pipeline(
-        pipeline_name="pipedrive", destination="duckdb", dataset_name="pipedrive_data"
+        pipeline_name="pipedrive",
+        destination="duckdb",
+        dataset_name="pipedrive_data",
+        progress="log",
     )
     load_info = pipeline.run(pipedrive_source())
     print(load_info)
@@ -16,7 +22,10 @@ def load_pipedrive() -> None:
 def load_selected_data() -> None:
     """Shows how to load just selected tables using `with_resources`"""
     pipeline = dlt.pipeline(
-        pipeline_name="pipedrive", destination="duckdb", dataset_name="pipedrive_data"
+        pipeline_name="pipedrive",
+        destination="duckdb",
+        dataset_name="pipedrive_data",
+        progress="log",
     )
     # Use with_resources to select which entities to load
     # Note: `custom_fields_mapping` must be included to translate custom field hashes to corresponding names
@@ -44,7 +53,10 @@ def load_selected_data() -> None:
 def load_from_start_date() -> None:
     """Example to incrementally load activities limited to items updated after a given date"""
     pipeline = dlt.pipeline(
-        pipeline_name="pipedrive", destination="duckdb", dataset_name="pipedrive_data"
+        pipeline_name="pipedrive",
+        destination="duckdb",
+        dataset_name="pipedrive_data",
+        progress="log",
     )
 
     # First source configure to load everything except activities from the beginning
@@ -61,6 +73,40 @@ def load_from_start_date() -> None:
     print(load_info)
 
 
+def load_v2_resources(resources: Optional[Sequence[str]] = None) -> None:
+    """Load v2 entities using the separate v2 source."""
+
+    resources = list(resources or DEFAULT_V2_RESOURCES)
+    pipeline = dlt.pipeline(
+        pipeline_name="pipedrive",
+        destination="duckdb",
+        dataset_name="pipedrive_data",
+        progress="log",
+    )
+    source = pipedrive_v2_source(resources=resources)
+    load_info = pipeline.run(source)
+    print(load_info)
+    print(pipeline.last_trace.last_normalize_info)
+
+
+def load_selected_v2_data() -> None:
+    """Load only the specified v2 entities using `with_resources`."""
+    pipeline = dlt.pipeline(
+        pipeline_name="pipedrive",
+        destination="duckdb",
+        dataset_name="pipedrive_data",
+        progress="log",
+    )
+    # Nested dependencies (e.g., v2_deal_products, v2_deal_followers) must be explicitly listed.
+    load_info = pipeline.run(
+        pipedrive_v2_source().with_resources(
+            "v2_deals", "v2_persons", "v2_organizations"
+        )
+    )
+    print(load_info)
+    print(pipeline.last_trace.last_normalize_info)
+
+
 if __name__ == "__main__":
     # run our main example
     load_pipedrive()
@@ -68,3 +114,7 @@ def load_from_start_date() -> None:
     # load_selected_data()
     # load activities updated since given date
     # load_from_start_date()
+    # load v2 resources (optional addon)
+    load_v2_resources()
+    # load only selected v2 resources (3 major endpoints: deals, persons, organizations)
+    # load_selected_v2_data()
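
Usage sketch (assumptions: `pipedrive_api_key` and `company_domain` are configured under the `[sources.pipedrive]` section of `.dlt/secrets.toml`; the domain value below is a placeholder). It runs the existing v1 source and the new v2 source into the same dataset; because v2 resource names carry the `v2_` prefix, their tables do not collide with the v1 tables.

import dlt

from pipedrive import pipedrive_source, pipedrive_v2_source

# assumed .dlt/secrets.toml (values are placeholders):
# [sources.pipedrive]
# pipedrive_api_key = "..."
# company_domain = "your-company"


def load_v1_and_v2() -> None:
    """Sketch: load v1 and v2 endpoints into one duckdb dataset."""
    pipeline = dlt.pipeline(
        pipeline_name="pipedrive",
        destination="duckdb",
        dataset_name="pipedrive_data",
    )
    # v1 tables keep their original names
    print(pipeline.run(pipedrive_source()))
    # v2 tables arrive as v2_deals, v2_persons, ... in the same dataset
    print(pipeline.run(pipedrive_v2_source()))


if __name__ == "__main__":
    load_v1_and_v2()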