Skip to content

Commit f37ec78

Browse files
authored
Ensure schemaId initialization is thread-safe (#2120)
* Ensure schemaId initialization is thread-safe * Minor fix
1 parent 06c0744 commit f37ec78

File tree

2 files changed

+12
-8
lines changed

2 files changed

+12
-8
lines changed

src/confluent_kafka/schema_registry/_async/protobuf.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -395,7 +395,8 @@ async def __serialize(self, message: Message, ctx: Optional[SerializationContext
395395
latest_schema = await self._get_reader_schema(subject, fmt='serialized')
396396

397397
if latest_schema is not None:
398-
self._schema_id = SchemaId(PROTOBUF_TYPE, latest_schema.schema_id, latest_schema.guid)
398+
self._schema_id = SchemaId(PROTOBUF_TYPE, latest_schema.schema_id,
399+
latest_schema.guid, self._index_array)
399400

400401
elif subject not in self._known_subjects and ctx is not None:
401402
references = await self._resolve_dependencies(ctx, message.DESCRIPTOR.file)
@@ -408,11 +409,13 @@ async def __serialize(self, message: Message, ctx: Optional[SerializationContext
408409
if self._auto_register:
409410
registered_schema = await self._registry.register_schema_full_response(
410411
subject, self._schema, normalize_schemas=self._normalize_schemas)
411-
self._schema_id = SchemaId(PROTOBUF_TYPE, registered_schema.schema_id, registered_schema.guid)
412+
self._schema_id = SchemaId(PROTOBUF_TYPE, registered_schema.schema_id,
413+
registered_schema.guid, self._index_array)
412414
else:
413415
registered_schema = await self._registry.lookup_schema(
414416
subject, self._schema, normalize_schemas=self._normalize_schemas)
415-
self._schema_id = SchemaId(PROTOBUF_TYPE, registered_schema.schema_id, registered_schema.guid)
417+
self._schema_id = SchemaId(PROTOBUF_TYPE, registered_schema.schema_id,
418+
registered_schema.guid, self._index_array)
416419

417420
self._known_subjects.add(subject)
418421

@@ -428,7 +431,6 @@ def field_transformer(rule_ctx, field_transform, msg): return ( # noqa: E731
428431

429432
with _ContextStringIO() as fo:
430433
fo.write(message.SerializeToString())
431-
self._schema_id.message_indexes = self._index_array
432434
buffer = fo.getvalue()
433435

434436
if latest_schema is not None:

src/confluent_kafka/schema_registry/_sync/protobuf.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -395,7 +395,8 @@ def __serialize(self, message: Message, ctx: Optional[SerializationContext] = No
395395
latest_schema = self._get_reader_schema(subject, fmt='serialized')
396396

397397
if latest_schema is not None:
398-
self._schema_id = SchemaId(PROTOBUF_TYPE, latest_schema.schema_id, latest_schema.guid)
398+
self._schema_id = SchemaId(PROTOBUF_TYPE, latest_schema.schema_id,
399+
latest_schema.guid, self._index_array)
399400

400401
elif subject not in self._known_subjects and ctx is not None:
401402
references = self._resolve_dependencies(ctx, message.DESCRIPTOR.file)
@@ -408,11 +409,13 @@ def __serialize(self, message: Message, ctx: Optional[SerializationContext] = No
408409
if self._auto_register:
409410
registered_schema = self._registry.register_schema_full_response(
410411
subject, self._schema, normalize_schemas=self._normalize_schemas)
411-
self._schema_id = SchemaId(PROTOBUF_TYPE, registered_schema.schema_id, registered_schema.guid)
412+
self._schema_id = SchemaId(PROTOBUF_TYPE, registered_schema.schema_id,
413+
registered_schema.guid, self._index_array)
412414
else:
413415
registered_schema = self._registry.lookup_schema(
414416
subject, self._schema, normalize_schemas=self._normalize_schemas)
415-
self._schema_id = SchemaId(PROTOBUF_TYPE, registered_schema.schema_id, registered_schema.guid)
417+
self._schema_id = SchemaId(PROTOBUF_TYPE, registered_schema.schema_id,
418+
registered_schema.guid, self._index_array)
416419

417420
self._known_subjects.add(subject)
418421

@@ -428,7 +431,6 @@ def field_transformer(rule_ctx, field_transform, msg): return ( # noqa: E731
428431

429432
with _ContextStringIO() as fo:
430433
fo.write(message.SerializeToString())
431-
self._schema_id.message_indexes = self._index_array
432434
buffer = fo.getvalue()
433435

434436
if latest_schema is not None:

0 commit comments

Comments
 (0)