Skip to content

Commit 46f24ce

Browse files
authored
DGS-22077 Handle evolution during field transformation (#2121)
* DGS-22077 Handle evolution during field transformation
* Minor fix
* Minor cleanup
* Add new header
1 parent 3886a30 commit 46f24ce

File tree

5 files changed: +128 -2 lines changed

src/confluent_kafka/schema_registry/_async/schema_registry_client.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -457,7 +457,7 @@ async def send_request(
457457
body_str = json.dumps(body)
458458
headers = {'Content-Length': str(len(body_str)),
459459
'Content-Type': "application/vnd.schemaregistry.v1+json",
460-
'Accept-Version': "8.0"}
460+
'Confluent-Accept-Unknown-Properties': "true"}
461461

462462
if self.bearer_auth_credentials_source:
463463
await self.handle_bearer_auth(headers)

src/confluent_kafka/schema_registry/_sync/schema_registry_client.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -457,7 +457,7 @@ def send_request(
457457
body_str = json.dumps(body)
458458
headers = {'Content-Length': str(len(body_str)),
459459
'Content-Type': "application/vnd.schemaregistry.v1+json",
460-
'Accept-Version': "8.0"}
460+
'Confluent-Accept-Unknown-Properties': "true"}
461461

462462
if self.bearer_auth_credentials_source:
463463
self.handle_bearer_auth(headers)

src/confluent_kafka/schema_registry/common/avro.py

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -133,6 +133,8 @@ def transform(
133133
return message
134134
fields = schema["fields"]
135135
for field in fields:
136+
if field["name"] not in message:
137+
continue
136138
_transform_field(ctx, schema, field, message, field_transform)
137139
return message
138140

tests/schema_registry/_async/test_avro_serdes.py

Lines changed: 62 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -737,6 +737,68 @@ async def test_avro_cel_field_transform():
737737
assert obj2 == newobj
738738

739739

740+
async def test_avro_cel_field_transform_missing_prop():
741+
conf = {'url': _BASE_URL}
742+
client = AsyncSchemaRegistryClient.new_client(conf)
743+
ser_conf = {'auto.register.schemas': False, 'use.latest.version': True}
744+
schema = {
745+
'type': 'record',
746+
'name': 'test',
747+
'fields': [
748+
{'name': 'intField', 'type': 'int'},
749+
{'name': 'doubleField', 'type': 'double'},
750+
{'name': 'stringField', 'type': 'string'},
751+
{'name': 'booleanField', 'type': 'boolean'},
752+
{'name': 'bytesField', 'type': 'bytes'},
753+
{'name': 'missing', 'type': ['null', 'string'], 'default': None},
754+
]
755+
}
756+
757+
rule = Rule(
758+
"test-cel",
759+
"",
760+
RuleKind.TRANSFORM,
761+
RuleMode.WRITEREAD,
762+
"CEL_FIELD",
763+
None,
764+
None,
765+
"name == 'stringField' ; value + '-suffix'",
766+
None,
767+
None,
768+
False
769+
)
770+
await client.register_schema(_SUBJECT, Schema(
771+
json.dumps(schema),
772+
"AVRO",
773+
[],
774+
None,
775+
RuleSet(None, [rule])
776+
))
777+
778+
obj = {
779+
'intField': 123,
780+
'doubleField': 45.67,
781+
'stringField': 'hi',
782+
'booleanField': True,
783+
'bytesField': b'foobar',
784+
}
785+
ser = await AsyncAvroSerializer(client, schema_str=None, conf=ser_conf)
786+
ser_ctx = SerializationContext(_TOPIC, MessageField.VALUE)
787+
obj_bytes = await ser(obj, ser_ctx)
788+
789+
obj2 = {
790+
'intField': 123,
791+
'doubleField': 45.67,
792+
'stringField': 'hi-suffix-suffix',
793+
'booleanField': True,
794+
'bytesField': b'foobar',
795+
'missing': None,
796+
}
797+
deser = await AsyncAvroDeserializer(client)
798+
newobj = await deser(obj_bytes, ser_ctx)
799+
assert obj2 == newobj
800+
801+
740802
async def test_avro_cel_field_transform_disable():
741803
conf = {'url': _BASE_URL}
742804
client = AsyncSchemaRegistryClient.new_client(conf)

tests/schema_registry/_sync/test_avro_serdes.py

Lines changed: 62 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -737,6 +737,68 @@ def test_avro_cel_field_transform():
737737
assert obj2 == newobj
738738

739739

740+
def test_avro_cel_field_transform_missing_prop():
741+
conf = {'url': _BASE_URL}
742+
client = SchemaRegistryClient.new_client(conf)
743+
ser_conf = {'auto.register.schemas': False, 'use.latest.version': True}
744+
schema = {
745+
'type': 'record',
746+
'name': 'test',
747+
'fields': [
748+
{'name': 'intField', 'type': 'int'},
749+
{'name': 'doubleField', 'type': 'double'},
750+
{'name': 'stringField', 'type': 'string'},
751+
{'name': 'booleanField', 'type': 'boolean'},
752+
{'name': 'bytesField', 'type': 'bytes'},
753+
{'name': 'missing', 'type': ['null', 'string'], 'default': None},
754+
]
755+
}
756+
757+
rule = Rule(
758+
"test-cel",
759+
"",
760+
RuleKind.TRANSFORM,
761+
RuleMode.WRITEREAD,
762+
"CEL_FIELD",
763+
None,
764+
None,
765+
"name == 'stringField' ; value + '-suffix'",
766+
None,
767+
None,
768+
False
769+
)
770+
client.register_schema(_SUBJECT, Schema(
771+
json.dumps(schema),
772+
"AVRO",
773+
[],
774+
None,
775+
RuleSet(None, [rule])
776+
))
777+
778+
obj = {
779+
'intField': 123,
780+
'doubleField': 45.67,
781+
'stringField': 'hi',
782+
'booleanField': True,
783+
'bytesField': b'foobar',
784+
}
785+
ser = AvroSerializer(client, schema_str=None, conf=ser_conf)
786+
ser_ctx = SerializationContext(_TOPIC, MessageField.VALUE)
787+
obj_bytes = ser(obj, ser_ctx)
788+
789+
obj2 = {
790+
'intField': 123,
791+
'doubleField': 45.67,
792+
'stringField': 'hi-suffix-suffix',
793+
'booleanField': True,
794+
'bytesField': b'foobar',
795+
'missing': None,
796+
}
797+
deser = AvroDeserializer(client)
798+
newobj = deser(obj_bytes, ser_ctx)
799+
assert obj2 == newobj
800+
801+
740802
def test_avro_cel_field_transform_disable():
741803
conf = {'url': _BASE_URL}
742804
client = SchemaRegistryClient.new_client(conf)

0 commit comments

Comments
 (0)