Skip to content

Commit 68648f2

Browse files
authored
Merge pull request #752 from superannotateai/FRIDAY-3604
added Databricks support in attach_items_from_integrated_storage
2 parents 98d5876 + ee73f3a commit 68648f2

File tree

6 files changed

+191
-25
lines changed

6 files changed

+191
-25
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ coverage.xml
5454
*.py,cover
5555
.hypothesis/
5656
.pytest_cache/
57+
tests/tmp_test.py
5758

5859
# Translations
5960
*.mo

src/superannotate/lib/app/interface/sdk_interface.py

Lines changed: 54 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2919,32 +2919,80 @@ def attach_items_from_integrated_storage(
29192919
project: NotEmptyStr,
29202920
integration: Union[NotEmptyStr, IntegrationEntity],
29212921
folder_path: Optional[NotEmptyStr] = None,
2922+
*,
2923+
query: Optional[NotEmptyStr] = None,
2924+
item_name_column: Optional[NotEmptyStr] = None,
2925+
custom_item_name: Optional[NotEmptyStr] = None,
2926+
component_mapping: Optional[Dict[str, str]] = None,
29222927
):
2923-
"""Link images from integrated external storage to SuperAnnotate.
2928+
"""Link items from integrated external storage (AWS, GCP, Azure, or Databricks) to SuperAnnotate.
29242929
29252930
:param project: project name or folder path where items should be attached (e.g., “project1/folder1”).
29262931
:type project: str
29272932
2928-
:param integration: existing integration name or metadata dict to pull items from.
2929-
Mandatory keys in integration metadata’s dict is “name”.
2933+
:param integration: The existing integration name or metadata dict to pull items from.
2934+
The only mandatory key in the integration metadata dict is “name”.
29302935
:type integration: str or dict
29312936
29322937
:param folder_path: Points to an exact folder/directory within given storage.
2933-
If None, items are fetched from the root directory.
2938+
If None, items are fetched from the root directory.
29342939
:type folder_path: str
2940+
2941+
:param query: (Only for Databricks). The SQL query to retrieve specific columns from Databricks.
2942+
If provided, the function will execute the query and use the results for mapping and uploading.
2943+
:type query: Optional[str]
2944+
2945+
:param item_name_column: (Only for Databricks). The column name from the SQL query whose values
2946+
will be used as item names. If this is provided, custom_item_name cannot be used.
2947+
The column must exist in the query result.
2948+
:type item_name_column: Optional[str]
2949+
2950+
:param custom_item_name: (Only for Databricks). A manually defined prefix for item names.
2951+
A random 10-character suffix will be appended to ensure uniqueness.
2952+
If this is provided, item_name_column cannot be used.
2953+
:type custom_item_name: Optional[str]
2954+
2955+
:param component_mapping: (Only for Databricks). A dictionary mapping Databricks
2956+
columns to SuperAnnotate component IDs.
2957+
:type component_mapping: Optional[dict]
2958+
2959+
2960+
Request Example:
2961+
::
2962+
2963+
client.attach_items_from_integrated_storage(
2964+
project="project_name",
2965+
integration="databricks_integration",
2966+
query="SELECT * FROM integration_data LIMIT 10",
2967+
item_name_column="prompt",
2968+
component_mapping={
2969+
"category": "_item_category",
2970+
"prompt_id": "id",
2971+
"prompt": "prompt"
2972+
}
2973+
)
2974+
29352975
"""
29362976
project, folder = self.controller.get_project_folder_by_path(project)
29372977
_integration = None
29382978
if isinstance(integration, str):
29392979
integration = IntegrationEntity(name=integration)
29402980
for i in self.controller.integrations.list().data:
2941-
if integration.name == i.name:
2981+
if integration.name.lower() == i.name.lower():
29422982
_integration = i
29432983
break
29442984
else:
29452985
raise AppException("Integration not found.")
2986+
29462987
response = self.controller.integrations.attach_items(
2947-
project, folder, _integration, folder_path
2988+
project=project,
2989+
folder=folder,
2990+
integration=_integration,
2991+
folder_path=folder_path,
2992+
query=query,
2993+
item_name_column=item_name_column,
2994+
custom_item_name=custom_item_name,
2995+
component_mapping=component_mapping,
29482996
)
29492997
if response.errors:
29502998
raise AppException(response.errors)

src/superannotate/lib/core/serviceproviders.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -593,6 +593,7 @@ def attach_items(
593593
folder: entities.FolderEntity,
594594
integration: entities.IntegrationEntity,
595595
folder_name: str = None,
596+
options: Dict[str, str] = None,
596597
) -> ServiceResponse:
597598
raise NotImplementedError
598599

Lines changed: 121 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
1-
from typing import List
1+
from typing import Dict
2+
from typing import Optional
23

4+
from lib.core.conditions import Condition
5+
from lib.core.conditions import CONDITION_EQ as EQ
36
from lib.core.entities import FolderEntity
47
from lib.core.entities import IntegrationEntity
58
from lib.core.entities import ProjectEntity
9+
from lib.core.entities.integrations import IntegrationTypeEnum
10+
from lib.core.enums import ProjectType
611
from lib.core.exceptions import AppException
712
from lib.core.reporter import Reporter
813
from lib.core.response import Response
@@ -25,6 +30,11 @@ def execute(self) -> Response:
2530

2631

2732
class AttachIntegrations(BaseReportableUseCase):
33+
MULTIMODAL_INTEGRATIONS = [
34+
IntegrationTypeEnum.DATABRICKS,
35+
IntegrationTypeEnum.SNOWFLAKE,
36+
]
37+
2838
def __init__(
2939
self,
3040
reporter: Reporter,
@@ -33,46 +43,139 @@ def __init__(
3343
service_provider: BaseServiceProvider,
3444
integration: IntegrationEntity,
3545
folder_path: str = None,
46+
query: Optional[str] = None,
47+
item_name_column: Optional[str] = None,
48+
custom_item_name: Optional[str] = None,
49+
component_mapping: Optional[Dict[str, str]] = None,
3650
):
37-
3851
super().__init__(reporter)
3952
self._project = project
4053
self._folder = folder
4154
self._integration = integration
4255
self._service_provider = service_provider
4356
self._folder_path = folder_path
57+
self._query = query
58+
self._item_name_column = item_name_column
59+
self._custom_item_name = custom_item_name
60+
self._component_mapping = component_mapping
61+
self._options = {} # using only for Databricks and Snowflake
62+
self._item_category_column = None
4463

4564
@property
4665
def _upload_path(self):
4766
return f"{self._project.name}{f'/{self._folder.name}' if self._folder.name != 'root' else ''}"
4867

49-
def execute(self) -> Response:
50-
integrations: List[
51-
IntegrationEntity
52-
] = self._service_provider.integrations.list().data.integrations
53-
integration_name_lower = self._integration.name.lower()
54-
integration = next(
55-
(i for i in integrations if i.name.lower() == integration_name_lower), None
68+
def validate_integration(self):
69+
# TODO add support in next iterations
70+
if self._integration.type == IntegrationTypeEnum.SNOWFLAKE:
71+
raise AppException(
72+
"Attaching items is not supported with Snowflake integration."
73+
)
74+
75+
if self._integration.type in self.MULTIMODAL_INTEGRATIONS:
76+
if self._project.type != ProjectType.MULTIMODAL:
77+
raise AppException(
78+
f"{self._integration.name} integration is supported only for Multimodal projects."
79+
)
80+
81+
def validate_options_for_multimodal_integration(self):
82+
if self._integration.type in self.MULTIMODAL_INTEGRATIONS:
83+
if self._item_name_column and self._custom_item_name:
84+
raise AppException(
85+
"‘item_name_column and custom_item_name cannot be used simultaneously."
86+
)
87+
88+
if not self._item_name_column and not self._custom_item_name:
89+
raise AppException(
90+
"Either item_name_column or custom_item_name is required."
91+
)
92+
93+
if not all((self._query, self._component_mapping)):
94+
raise AppException(
95+
f"{self._integration.name} integration requires both a query and component_mapping."
96+
)
97+
98+
category_setting: bool = bool(
99+
next(
100+
(
101+
setting.value
102+
for setting in self._service_provider.projects.list_settings(
103+
self._project
104+
).data
105+
if setting.attribute == "CategorizeItems"
106+
),
107+
None,
108+
)
109+
)
110+
if (
111+
not category_setting
112+
and "_item_category" in self._component_mapping.values()
113+
):
114+
raise AppException(
115+
"Item Category must be enabled for a project to use _item_category"
116+
)
117+
118+
item_category_column = next(
119+
(
120+
k
121+
for k, v in self._component_mapping.items()
122+
if v == "_item_category"
123+
),
124+
None,
125+
)
126+
if item_category_column:
127+
self._item_category_column = self._component_mapping.pop(
128+
item_category_column
129+
)
130+
131+
sa_components = [
132+
c.name.lower()
133+
for c in self._service_provider.annotation_classes.list(
134+
condition=Condition("project_id", self._project.id, EQ)
135+
).data
136+
]
137+
138+
for i in self._component_mapping.values():
139+
if i.lower() not in sa_components:
140+
raise AppException(
141+
f"Component mapping contains invalid component ID: `{i}`"
142+
)
143+
144+
def generate_options_for_multimodal_integration(self):
145+
self._options["query"] = self._query
146+
self._options["item_name"] = (
147+
self._custom_item_name if self._custom_item_name else self._item_name_column
56148
)
57-
if integration:
149+
self._options["prefix"] = True if self._custom_item_name else False
150+
self._options["column_class_map"] = self._component_mapping
151+
if self._item_category_column:
152+
self._options["item_category"] = self._item_category_column
153+
154+
def execute(self) -> Response:
155+
if self.is_valid():
156+
if self._integration.type in self.MULTIMODAL_INTEGRATIONS:
157+
self.generate_options_for_multimodal_integration()
158+
58159
self.reporter.log_info(
59160
"Attaching file(s) from "
60-
f"{integration.root}{f'/{self._folder_path}' if self._folder_path else ''} "
161+
f"{self._integration.root}{f'/{self._folder_path}' if self._folder_path else ''} "
61162
f"to {self._upload_path}. This may take some time."
62163
)
63-
attached = self._service_provider.integrations.attach_items(
164+
165+
attache_response = self._service_provider.integrations.attach_items(
64166
project=self._project,
65167
folder=self._folder,
66-
integration=integration,
67-
folder_name=self._folder_path,
168+
integration=self._integration,
169+
folder_name=self._folder_path
170+
if self._integration.type not in self.MULTIMODAL_INTEGRATIONS
171+
else None,
172+
options=self._options if self._options else None,
68173
)
69-
if not attached:
174+
if not attache_response.ok:
70175
self._response.errors = AppException(
71176
f"An error occurred for {self._integration.name}. Please make sure: "
72177
"\n - The bucket exists."
73178
"\n - The connection is valid."
74179
"\n - The path to a specified directory is correct."
75180
)
76-
else:
77-
self._response.errors = AppException("Integration not found.")
78-
return self._response
181+
return self._response

src/superannotate/lib/infrastructure/controller.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1138,7 +1138,11 @@ def attach_items(
11381138
project: ProjectEntity,
11391139
folder: FolderEntity,
11401140
integration: IntegrationEntity,
1141-
folder_path: str,
1141+
folder_path: str = None,
1142+
query: Optional[str] = None,
1143+
item_name_column: Optional[str] = None,
1144+
custom_item_name: Optional[str] = None,
1145+
component_mapping: Optional[Dict[str, str]] = None,
11421146
):
11431147
use_case = usecases.AttachIntegrations(
11441148
reporter=Reporter(),
@@ -1147,6 +1151,10 @@ def attach_items(
11471151
folder=folder,
11481152
integration=integration,
11491153
folder_path=folder_path,
1154+
query=query,
1155+
item_name_column=item_name_column,
1156+
custom_item_name=custom_item_name,
1157+
component_mapping=component_mapping,
11501158
)
11511159
return use_case.execute()
11521160

src/superannotate/lib/infrastructure/services/integration.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from typing import Dict
2+
13
from lib.core import entities
24
from lib.core.service_types import IntegrationListResponse
35
from lib.core.serviceproviders import BaseIntegrationService
@@ -23,6 +25,7 @@ def attach_items(
2325
folder: entities.FolderEntity,
2426
integration: entities.IntegrationEntity,
2527
folder_name: str = None,
28+
options: Dict[str, str] = None,
2629
):
2730
data = {
2831
"team_id": project.team_id,
@@ -32,6 +35,8 @@ def attach_items(
3235
}
3336
if folder_name:
3437
data["customer_folder_name"] = folder_name
38+
if options:
39+
data["options"] = options
3540
return self.client.request(
3641
self.URL_ATTACH_INTEGRATIONS.format(project.team_id), "post", data=data
3742
)

0 commit comments

Comments
 (0)