Skip to content

Commit 98d5876

Browse files
authored
Merge pull request #751 from superannotateai/get_annotations
update in get_annotations logic
2 parents 92e6695 + 64cc82d commit 98d5876

File tree

6 files changed

+124
-23
lines changed

6 files changed

+124
-23
lines changed

src/superannotate/lib/app/interface/sdk_interface.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -501,7 +501,7 @@ def get_component_config(self, project: Union[NotEmptyStr, int], component_id: s
501501
"""
502502

503503
def retrieve_context(
504-
component_data: List[dict], component_pk: str
504+
component_data: List[dict], component_pk: str
505505
) -> Tuple[bool, typing.Any]:
506506
try:
507507
for component in component_data:
@@ -512,9 +512,9 @@ def retrieve_context(
512512
if found:
513513
return found, val
514514
if (
515-
"id" in component and
516-
component["id"] == component_pk
517-
and component["type"] == "webComponent"
515+
"id" in component
516+
and component["id"] == component_pk
517+
and component["type"] == "webComponent"
518518
):
519519
return True, json.loads(component.get("context"))
520520

src/superannotate/lib/core/serviceproviders.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -490,6 +490,7 @@ def get_upload_chunks(
490490
self,
491491
project: entities.ProjectEntity,
492492
item_ids: List[int],
493+
chunk_size: int = 1000,
493494
) -> Dict[str, List]:
494495
raise NotImplementedError
495496

src/superannotate/lib/infrastructure/services/annotation.py

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
from lib.core.service_types import UploadAnnotationsResponse
1919
from lib.core.serviceproviders import BaseAnnotationService
2020
from lib.infrastructure.stream_data_handler import StreamedAnnotations
21+
from lib.infrastructure.utils import annotation_is_valid
22+
from lib.infrastructure.utils import divide_to_chunks
2123

2224
try:
2325
from pydantic.v1 import parse_obj_as
@@ -170,21 +172,29 @@ def get_upload_chunks(
170172
self,
171173
project: entities.ProjectEntity,
172174
item_ids: List[int],
175+
chunk_size: int = 1000,
173176
) -> Dict[str, List]:
174-
response_data = {"small": [], "large": []}
175-
response = self.client.request(
176-
url=urljoin(self.get_assets_provider_url(), self.URL_CLASSIFY_ITEM_SIZE),
177-
method="POST",
178-
params={"limit": len(item_ids)},
179-
data={"project_id": project.id, "item_ids": item_ids},
180-
)
181-
if not response.ok:
182-
raise AppException(response.error)
183-
response_data["small"] = [
184-
i["data"] for i in response.data.get("small", {}).values()
185-
]
186-
response_data["large"] = response.data.get("large", [])
187-
return response_data
177+
small = []
178+
large = []
179+
180+
chunks = divide_to_chunks(item_ids, chunk_size)
181+
for chunk in chunks:
182+
response = self.client.request(
183+
method="POST",
184+
url=urljoin(
185+
self.get_assets_provider_url(), self.URL_CLASSIFY_ITEM_SIZE
186+
),
187+
params={"limit": len(chunk)},
188+
data={
189+
"project_id": project.id,
190+
"item_ids": chunk,
191+
},
192+
)
193+
if not response.ok:
194+
raise AppException(response.error)
195+
small.extend([i["data"] for i in response.data.get("small", {}).values()])
196+
large.extend(response.data.get("large", []))
197+
return {"small": small, "large": large}
188198

189199
async def download_big_annotation(
190200
self,
@@ -218,6 +228,14 @@ async def download_big_annotation(
218228
) as session:
219229
start_response = await session.request("post", url, params=query_params)
220230
res = await start_response.json()
231+
if start_response.status > 299 or not annotation_is_valid(res):
232+
logger.debug(
233+
f"Failed to download large annotation; item_id [{item_id}];"
234+
f" response: {res}; http_status: {start_response.status}"
235+
)
236+
raise AppException(
237+
f"Failed to download large annotation, ID: {item_id}"
238+
)
221239
Path(download_path).mkdir(exist_ok=True, parents=True)
222240

223241
dest_path = Path(download_path) / (item_name + ".json")

src/superannotate/lib/infrastructure/stream_data_handler.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,12 @@
66
from typing import Callable
77

88
import aiohttp
9+
from lib.core.exceptions import AppException
10+
from lib.core.exceptions import BackendError
911
from lib.core.reporter import Reporter
1012
from lib.infrastructure.services.http_client import AIOHttpSession
13+
from lib.infrastructure.utils import annotation_is_valid
14+
from lib.infrastructure.utils import async_retry_on_generator
1115

1216
_seconds = 2**10
1317
TIMEOUT = aiohttp.ClientTimeout(
@@ -42,6 +46,7 @@ def get_json(self, data: bytes):
4246
self._reporter.log_error(f"Invalud chunk: {str(e)}")
4347
return None
4448

49+
@async_retry_on_generator((BackendError,))
4550
async def fetch(
4651
self,
4752
method: str,
@@ -59,6 +64,7 @@ async def fetch(
5964
buffer = ""
6065
line_groups = b""
6166
decoder = json.JSONDecoder()
67+
data_received = False
6268
async for line in response.content.iter_any():
6369
line_groups += line
6470
try:
@@ -71,6 +77,19 @@ async def fetch(
7177
if buffer.startswith(self.DELIMITER):
7278
buffer = buffer[self.DELIMITER_LEN :]
7379
json_obj, index = decoder.raw_decode(buffer)
80+
if not annotation_is_valid(json_obj):
81+
logger.warning(
82+
f"Invalid JSON detected in small annotations stream process, json: {json_obj}."
83+
)
84+
if data_received:
85+
raise AppException(
86+
"Invalid JSON detected in small annotations stream process."
87+
)
88+
else:
89+
raise BackendError(
90+
"Invalid JSON detected at the start of the small annotations stream process."
91+
)
92+
data_received = True
7493
yield json_obj
7594
if len(buffer[index:]) >= self.DELIMITER_LEN:
7695
buffer = buffer[index + self.DELIMITER_LEN :]

src/superannotate/lib/infrastructure/utils.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,17 @@
1+
import asyncio
2+
import logging
13
import time
24
from abc import ABC
35
from abc import abstractmethod
6+
from functools import wraps
47
from itertools import islice
58
from pathlib import Path
69
from typing import Any
10+
from typing import Callable
711
from typing import Dict
812
from typing import Optional
913
from typing import Tuple
14+
from typing import Type
1015
from typing import Union
1116

1217
from lib.core.entities import ProjectEntity
@@ -16,6 +21,9 @@
1621
from lib.infrastructure.services.work_management import WorkManagementService
1722

1823

24+
logger = logging.getLogger("sa")
25+
26+
1927
def divide_to_chunks(it, size):
2028
it = iter(it)
2129
return iter(lambda: tuple(islice(it, size)), ())
@@ -44,6 +52,66 @@ def extract_project_folder(user_input: Union[str, dict]) -> Tuple[str, Optional[
4452
raise PathError("Invalid project path")
4553

4654

55+
def async_retry_on_generator(
56+
exceptions: Tuple[Type[Exception]],
57+
retries: int = 3,
58+
delay: float = 0.3,
59+
backoff: float = 0.3,
60+
):
61+
"""
62+
An async retry decorator that retries a function only on specific exceptions.
63+
64+
Parameters:
65+
exceptions (tuple): Tuple of exception classes to retry on.
66+
retries (int): Number of retry attempts.
67+
delay (float): Initial delay between retries in seconds.
68+
backoff (float): Factor to increase the delay after each failure.
69+
"""
70+
71+
def decorator(func: Callable):
72+
@wraps(func)
73+
async def wrapper(*args, **kwargs):
74+
attempt = 0
75+
current_delay = delay
76+
raised_exception = None
77+
78+
while attempt < retries:
79+
try:
80+
async for v in func(*args, **kwargs):
81+
yield v
82+
return
83+
except exceptions as e:
84+
raised_exception = e
85+
logger.debug(
86+
f"Attempt {attempt + 1}/{retries} failed with error: {e}. "
87+
f"Retrying in {current_delay} seconds..."
88+
)
89+
await asyncio.sleep(current_delay)
90+
current_delay += backoff # Exponential backoff
91+
finally:
92+
attempt += 1
93+
if raised_exception:
94+
logger.error(
95+
f"All {retries} attempts failed due to {raised_exception}."
96+
)
97+
raise raised_exception
98+
99+
return wrapper
100+
101+
return decorator
102+
103+
104+
def annotation_is_valid(annotation: dict) -> bool:
105+
annotation_keys = annotation.keys()
106+
if (
107+
"errors" in annotation_keys
108+
or "error" in annotation_keys
109+
or "metadata" not in annotation_keys
110+
):
111+
return False
112+
return True
113+
114+
47115
class BaseCachedWorkManagementRepository(ABC):
48116
def __init__(self, ttl_seconds: int, work_management: WorkManagementService):
49117
self.ttl_seconds = ttl_seconds

tests/integration/annotations/test_upload_annotations_from_folder_to_project.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -166,8 +166,3 @@ def test_annotation_folder_upload_download(self):
166166
contents1 = f1.read()
167167
contents2 = f2.read()
168168
assert contents1 == contents2
169-
170-
171-
def test():
172-
a = sa.get_user_metadata(pk=244700, include=["custom_fields"])
173-
print(a)

0 commit comments

Comments (0)