File tree Expand file tree Collapse file tree 2 files changed +80
-1
lines changed
src/confluent_kafka/schema_registry Expand file tree Collapse file tree 2 files changed +80
-1
lines changed Original file line number Diff line number Diff line change @@ -752,7 +752,11 @@ def _get_inline_tags_recursively(
752752 return
753753 else :
754754 schema_type = schema .get ("type" )
755- if schema_type == 'record' :
755+ if schema_type == 'array' :
756+ _get_inline_tags_recursively (ns , name , schema .get ("items" ), tags )
757+ elif schema_type == 'map' :
758+ _get_inline_tags_recursively (ns , name , schema .get ("values" ), tags )
759+ elif schema_type == 'record' :
756760 record_ns = schema .get ("namespace" )
757761 record_name = schema .get ("name" )
758762 if record_ns is None :
Original file line number Diff line number Diff line change @@ -757,6 +757,81 @@ def test_avro_cel_field_transform_complex_with_none():
757757 assert obj2 == newobj
758758
759759
760+ def test_avro_cel_field_transform_complex_nested ():
761+ conf = {'url' : _BASE_URL }
762+ client = SchemaRegistryClient .new_client (conf )
763+ ser_conf = {'auto.register.schemas' : False , 'use.latest.version' : True }
764+ schema = {
765+ 'type' : 'record' ,
766+ 'name' : 'UnionTest' ,
767+ 'namespace' : 'test' ,
768+ 'fields' : [
769+ {
770+ 'name' : 'emails' ,
771+ 'type' : [
772+ 'null' ,
773+ {
774+ 'type' : 'array' ,
775+ 'items' : {
776+ 'type' : 'record' ,
777+ 'name' : 'Email' ,
778+ 'fields' : [
779+ {
780+ 'name' : 'email' ,
781+ 'type' : [
782+ 'null' ,
783+ 'string'
784+ ],
785+ 'doc' : 'Email address' ,
786+ 'confluent:tags' : [
787+ 'PII'
788+ ]
789+ }
790+ ]
791+ }
792+ }
793+ ],
794+ 'doc' : 'Communication Email' ,
795+ }
796+ ]
797+ }
798+
799+ rule = Rule (
800+ "test-cel" ,
801+ "" ,
802+ RuleKind .TRANSFORM ,
803+ RuleMode .WRITE ,
804+ "CEL_FIELD" ,
805+ None ,
806+ None ,
807+ "typeName == 'STRING' ; value + '-suffix'" ,
808+ None ,
809+ None ,
810+ False
811+ )
812+ client .register_schema (_SUBJECT , Schema (
813+ json .dumps (schema ),
814+ "AVRO" ,
815+ [],
816+ None ,
817+ RuleSet (None , [rule ])
818+ ))
819+
820+ obj = {
821+ 'emails' : [{'email' : 'john@acme.com' }]
822+ }
823+ ser = AvroSerializer (client , schema_str = None , conf = ser_conf )
824+ ser_ctx = SerializationContext (_TOPIC , MessageField .VALUE )
825+ obj_bytes = ser (obj , ser_ctx )
826+
827+ obj2 = {
828+ 'emails' : [{'email' : 'john@acme.com-suffix' }]
829+ }
830+ deser = AvroDeserializer (client )
831+ newobj = deser (obj_bytes , ser_ctx )
832+ assert obj2 == newobj
833+
834+
760835def test_avro_cel_field_condition ():
761836 conf = {'url' : _BASE_URL }
762837 client = SchemaRegistryClient .new_client (conf )
You can’t perform that action at this time.
0 commit comments