Skip to content

Commit 3001ac5

Browse files
committed
Make sink offset parameter nullable for consistent API
1 parent 41d8a9d commit 3001ac5

File tree

6 files changed

+11
-11
lines changed

6 files changed

+11
-11
lines changed

quixstreams/dataframe/dataframe.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1708,7 +1708,7 @@ def _sink_callback(
17081708
headers=headers,
17091709
partition=ctx.partition,
17101710
topic=ctx.topic,
1711-
offset=ctx.offset or 0,
1711+
offset=ctx.offset,
17121712
)
17131713

17141714
# uses apply without returning to make this operation terminal

quixstreams/sinks/base/batch.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from collections import deque
22
from itertools import islice
3-
from typing import Any, Deque, Iterable, Iterator
3+
from typing import Any, Deque, Iterable, Iterator, Optional
44

55
from quixstreams.models import HeadersTuples
66

@@ -39,7 +39,7 @@ def size(self) -> int:
3939
return len(self._buffer)
4040

4141
@property
42-
def start_offset(self) -> int:
42+
def start_offset(self) -> Optional[int]:
4343
return self._buffer[0].offset
4444

4545
def append(
@@ -48,7 +48,7 @@ def append(
4848
key: Any,
4949
timestamp: int,
5050
headers: HeadersTuples,
51-
offset: int,
51+
offset: Optional[int],
5252
):
5353
self._buffer.append(
5454
SinkItem(

quixstreams/sinks/base/item.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from typing import Any
1+
from typing import Any, Optional
22

33
from quixstreams.models import HeadersTuples
44

@@ -20,7 +20,7 @@ def __init__(
2020
key: Any,
2121
timestamp: int,
2222
headers: HeadersTuples,
23-
offset: int,
23+
offset: Optional[int],
2424
):
2525
self.key = key
2626
self.value = value

quixstreams/sinks/base/sink.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ def add(
7171
headers: HeadersTuples,
7272
topic: str,
7373
partition: int,
74-
offset: int,
74+
offset: Optional[int],
7575
):
7676
"""
7777
This method is triggered on every new processed record being sent to this sink.
@@ -164,7 +164,7 @@ def add(
164164
headers: HeadersTuples,
165165
topic: str,
166166
partition: int,
167-
offset: int,
167+
offset: Optional[int],
168168
):
169169
"""
170170
Add a new record to in-memory batch.

quixstreams/sinks/core/influxdb3.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ def add(
235235
headers: HeadersTuples,
236236
topic: str,
237237
partition: int,
238-
offset: int,
238+
offset: Optional[int],
239239
):
240240
if not isinstance(value, Mapping):
241241
raise TypeError(

quixstreams/sinks/core/list.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from collections import UserList
2-
from typing import Any
2+
from typing import Any, Optional
33

44
from quixstreams.models import HeadersTuples
55
from quixstreams.sinks.base import BaseSink
@@ -63,7 +63,7 @@ def add(
6363
headers: HeadersTuples,
6464
topic: str,
6565
partition: int,
66-
offset: int,
66+
offset: Optional[int],
6767
):
6868
if not isinstance(value, dict):
6969
value = {"value": value}

0 commit comments

Comments
 (0)