Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Feature | Java Library | Python Library | Notes
Serialization and deserialization using schema registry | ✔️ | ✔️
Avro message format | ✔️ | ✔️
JSON Schema message format | ✔️ | ✔️
Protobuf V3 message format | ✔️ | ✔️
Kafka Streams support | ✔️ | | N/A for Python, Kafka Streams is Java-only
Compression | ✔️ | ✔️ |
Local schema cache | ✔️ | ✔️
Expand Down Expand Up @@ -205,4 +206,4 @@ Assuming pypi permissions:
python -m build
twine upload -r testpypi dist/*
twine upload dist/*
```
```
3 changes: 3 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ install_requires =
orjson~=3.6.0;python_version<"3.11"
orjson>=3.7.7;python_version>="3.11"
fastjsonschema~=2.15
protobuf~=4.21.9
grpcio~=1.54.0
grpcio-tools~=1.54.0
setup_requires =
setuptools

Expand Down
67 changes: 67 additions & 0 deletions src/aws_schema_registry/protobufv3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
from __future__ import annotations

import json
import re
import tempfile
from os import path

from .schema import DataFormat, Schema, ValidationError
from .utils_protobufv3 import load_pb_file, load_pb_str


class ProtobufV3Schema(Schema):
"""Implementation of the `Schema` protocol for Protobuf V3 schemas.

Arguments:
definition: the schema, either as a file path string or (perspectively) a string
message_class_name: the class name of the message to be
serialized / deserialized
"""

def __init__(self, definition: str,
message_class_name: str):
# distinguish: protobuf schema file vs. protbuf schema string
if re.match(r"^(.+)/([^/]+)$", definition):
self._parsed = load_pb_file(definition)
else:
temp_file = tempfile.NamedTemporaryFile(delete=True)
self._parsed = load_pb_str(definition, f"{path.basename(temp_file.name)}.proto")
self._msg_obj = getattr(self._parsed, message_class_name)()
self._message_class_name = message_class_name

def __hash__(self):
return hash(str(self))

def __eq__(self, other):
return isinstance(other, ProtobufV3Schema) and \
self._parsed == other._parsed

def __str__(self):
return self._parsed

def __repr__(self):
return '<ProtobufV3Schema %s>' % self._parsed

@property
def data_format(self) -> DataFormat:
return 'PROTOBUFV3'

@property
def fqn(self) -> str:
return ""

def read(self, bytestr: bytes):
self._msg_obj.ParseFromString(bytestr)
return self._msg_obj

def write(self, data) -> bytes:
return data.SerializeToString()

def validate(self, data):
try:
data.SerializeToString()
except Exception as e:
# the message will contain space characters, json.loads + str is a
# (relatively inefficient) way to remove them
detail: list[str] = json.loads(str(e))
raise ValidationError(str(detail)) from e
2 changes: 1 addition & 1 deletion src/aws_schema_registry/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
else:
from typing import Literal

DataFormat = Literal['AVRO', 'JSON']
DataFormat = Literal['AVRO', 'JSON', 'PROTOBUFV3']

CompatibilityMode = Literal['NONE', 'DISABLED', 'BACKWARD', 'BACKWARD_ALL',
'FORWARD', 'FORWARD_ALL', 'FULL', 'FULL_ALL']
Expand Down
75 changes: 75 additions & 0 deletions src/aws_schema_registry/utils_protobufv3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
from __future__ import annotations

import tempfile
from importlib.machinery import SourceFileLoader
from os import path, makedirs
from types import ModuleType

from grpc_tools import protoc


def load_pb_file(proto_schema_file: str) -> ModuleType:
"""
Helper function to load a protobuf schema from a given file.

:param proto_schema_file: file path and name as string (ends with ".proto")
"""
with tempfile.TemporaryDirectory() as temp_dir:
schema_file_name = path.basename(proto_schema_file)
proto_dir = path.dirname(proto_schema_file)
proto_name = schema_file_name[:-6]

compiled_dir = path.join(temp_dir, 'protol', proto_name)

return compile_pb_schema(proto_dir, proto_name, proto_schema_file, compiled_dir)


def load_pb_str(proto_schema_str: str, schema_file_name: str) -> ModuleType:
"""
Helper function to load a protobuf schema from a given string.

:param proto_schema_str: protobuf schema as string
:param schema_file_name: protobuf schema file name (ends with ".proto")
"""
with tempfile.TemporaryDirectory() as proto_dir:
proto_name = schema_file_name[:-6]

# create protobuf schema file in temp folder
proto_schema_file = path.join(proto_dir, schema_file_name)
with open(proto_schema_file, 'w') as f:
f.write(proto_schema_str)

compiled_dir = path.join(proto_dir, 'protol', proto_name)

return compile_pb_schema(proto_dir, proto_name, proto_schema_file, compiled_dir)


def compile_pb_schema(proto_dir, proto_name, proto_schema_file, compiled_dir):
"""
Compile protobuf schema to Python classes.

:param proto_dir: directory of the protobuf schema file name
:param proto_name: protobuf schema file name (without extension)
:param proto_schema_file: the given protobuf schema file
:param compiled_dir: directory containing the compiled Python classes
"""
pb2_name = f'{proto_name}_pb2'
pb2_file = path.join(compiled_dir, f'{pb2_name}.py')

if path.isdir(compiled_dir):
if path.exists(pb2_file):
return SourceFileLoader(pb2_name, pb2_file).load_module()
else:
makedirs(compiled_dir)

proto_include = protoc.pkg_resources.resource_filename('grpc_tools', '_proto')
compile_arguments = [
f'-I{proto_dir}',
f'--proto_path={proto_dir}',
f'--python_out={compiled_dir}',
proto_schema_file,
f'-I{proto_include}'
]
protoc.main(compile_arguments)

return SourceFileLoader(pb2_name, pb2_file).load_module()
23 changes: 16 additions & 7 deletions tests/integration/java/test_java_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@
import subprocess

import pytest
from google.protobuf.json_format import ParseDict

from aws_schema_registry import DataAndSchema, SchemaRegistryClient
from aws_schema_registry.avro import AvroSchema
from aws_schema_registry.jsonschema import JsonSchema
from aws_schema_registry.adapter.kafka import (
KafkaDeserializer, KafkaSerializer
)
from aws_schema_registry.protobufv3 import ProtobufV3Schema

LOG = logging.getLogger(__name__)

Expand All @@ -19,32 +21,39 @@
'target',
'java-integration-test.jar'
)
DATA = {
'name': 'John Doe',
'favorite_number': 6,
'favorite_color': 'red'
}

with open(os.path.join(os.path.dirname(__file__), 'user.avsc'), 'r') as f:
SCHEMA = AvroSchema(f.read())

with open(os.path.join(os.path.dirname(__file__), 'user.json'), 'r') as f:
JSON_SCHEMA = JsonSchema(f.read())

with open(os.path.join(os.path.dirname(__file__), 'user.proto'), 'r') as f:
PROTOBUFV3_SCHEMA = ProtobufV3Schema(f.read(), 'User')


def _topic_name_schema_type_name_strategy(topic, is_key, schema):
return f"{topic}-{'key' if is_key else 'value'}-{schema.data_format}"


@pytest.mark.parametrize("schema", [SCHEMA, JSON_SCHEMA])
@pytest.mark.parametrize("schema, data", [
(SCHEMA, DATA),
(JSON_SCHEMA, DATA),
(PROTOBUFV3_SCHEMA, ParseDict(DATA, PROTOBUFV3_SCHEMA._msg_obj))
])
def test_interop_with_java_library(glue_client, registry,
boto_session, schema):
boto_session, schema, data):
client = SchemaRegistryClient(glue_client, registry_name=registry)
serializer = KafkaSerializer(
client,
schema_naming_strategy=_topic_name_schema_type_name_strategy)
deserializer = KafkaDeserializer(client)

data = {
'name': 'John Doe',
'favorite_number': 6,
'favorite_color': 'red'
}
serialized: bytes = serializer.serialize(
'test', DataAndSchema(data, schema)
)
Expand Down
8 changes: 8 additions & 0 deletions tests/integration/java/user.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
syntax = "proto3";
package aws_schema_registry.integrationtests;

message User {
string name = 1;
int32 favorite_number = 2;
string favorite_color = 3;
}
7 changes: 7 additions & 0 deletions tests/resources/example.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
syntax = "proto3";
package alpha.beta;

message MyMessage {
string text = 1;
int32 number = 2;
}
25 changes: 25 additions & 0 deletions tests/test_protobufv3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from aws_schema_registry.protobufv3 import ProtobufV3Schema


def test_fully_qualified_name():
s = ProtobufV3Schema('./resources/example.proto', 'MyMessage')
assert s.fqn == ""


def test_readwrite():
s = ProtobufV3Schema('./resources/example.proto', 'MyMessage')
d = s._parsed.MyMessage(text = 'Hello World!', number = 42)
assert s.read(s.write(d)) == d


def test_readwrite_schema_str():
s = ProtobufV3Schema("""syntax = "proto3";
package gamma.delta;

message MySecondMessage {
string text = 1;
int32 number = 2;
}
""", 'MySecondMessage')
d = s._parsed.MySecondMessage(text = 'Hello World!', number = 42)
assert s.read(s.write(d)) == d