diff --git a/README.md b/README.md
index 52fd197..168c0f1 100644
--- a/README.md
+++ b/README.md
@@ -15,6 +15,7 @@ Feature | Java Library | Python Library | Notes
 Serialization and deserialization using schema registry | ✔️ | ✔️
 Avro message format | ✔️ | ✔️
 JSON Schema message format | ✔️ | ✔️
+Protobuf V3 message format | ✔️ | ✔️
 Kafka Streams support | ✔️ | | N/A for Python, Kafka Streams is Java-only
 Compression | ✔️ | ✔️ |
 Local schema cache | ✔️ | ✔️
@@ -205,4 +206,4 @@ Assuming pypi permissions:
 python -m build
 twine upload -r testpypi dist/*
 twine upload dist/*
-```
\ No newline at end of file
+```
diff --git a/setup.cfg b/setup.cfg
index c4e7e12..14d88e3 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -38,6 +38,9 @@ install_requires =
     orjson~=3.6.0;python_version<"3.11"
     orjson>=3.7.7;python_version>="3.11"
     fastjsonschema~=2.15
+    protobuf~=4.21.9
+    grpcio~=1.54.0
+    grpcio-tools~=1.54.0
 
 setup_requires =
     setuptools
diff --git a/src/aws_schema_registry/protobufv3.py b/src/aws_schema_registry/protobufv3.py
new file mode 100644
index 0000000..9f9148e
--- /dev/null
+++ b/src/aws_schema_registry/protobufv3.py
@@ -0,0 +1,67 @@
+from __future__ import annotations
+
+import re
+import tempfile
+from os import path
+
+from .schema import DataFormat, Schema, ValidationError
+from .utils_protobufv3 import load_pb_file, load_pb_str
+
+
+class ProtobufV3Schema(Schema):
+    """Implementation of the `Schema` protocol for Protobuf V3 schemas.
+
+    Arguments:
+        definition: the schema, given either as the path to a `.proto`
+            file or as the schema definition itself (a string)
+        message_class_name: the class name of the message to be
+            serialized / deserialized
+    """
+
+    def __init__(self, definition: str,
+                 message_class_name: str):
+        # distinguish: protobuf schema file vs. protobuf schema string
+        if re.match(r"^(.+)/([^/]+)$", definition):
+            self._parsed = load_pb_file(definition)
+        else:
+            # a throwaway temporary file is used only to obtain a unique
+            # name for the generated .proto module
+            temp_file = tempfile.NamedTemporaryFile(delete=True)
+            self._parsed = load_pb_str(
+                definition, f"{path.basename(temp_file.name)}.proto")
+        self._msg_obj = getattr(self._parsed, message_class_name)()
+        self._message_class_name = message_class_name
+
+    def __hash__(self):
+        return hash(str(self))
+
+    def __eq__(self, other):
+        return isinstance(other, ProtobufV3Schema) and \
+            self._parsed == other._parsed
+
+    def __str__(self):
+        return str(self._parsed)
+
+    def __repr__(self):
+        return '<ProtobufV3Schema %s>' % self._parsed
+
+    @property
+    def data_format(self) -> DataFormat:
+        return 'PROTOBUFV3'
+
+    @property
+    def fqn(self) -> str:
+        return ""
+
+    def read(self, bytestr: bytes):
+        self._msg_obj.ParseFromString(bytestr)
+        return self._msg_obj
+
+    def write(self, data) -> bytes:
+        return data.SerializeToString()
+
+    def validate(self, data):
+        try:
+            data.SerializeToString()
+        except Exception as e:
+            raise ValidationError(str(e)) from e
diff --git a/src/aws_schema_registry/schema.py b/src/aws_schema_registry/schema.py
index 6e91184..bb0adcf 100644
--- a/src/aws_schema_registry/schema.py
+++ b/src/aws_schema_registry/schema.py
@@ -11,7 +11,7 @@
 else:
     from typing import Literal
 
-DataFormat = Literal['AVRO', 'JSON']
+DataFormat = Literal['AVRO', 'JSON', 'PROTOBUFV3']
 
 CompatibilityMode = Literal['NONE', 'DISABLED', 'BACKWARD', 'BACKWARD_ALL',
                             'FORWARD', 'FORWARD_ALL', 'FULL', 'FULL_ALL']
diff --git a/src/aws_schema_registry/utils_protobufv3.py b/src/aws_schema_registry/utils_protobufv3.py
new file mode 100644
index 0000000..c8c355c
--- /dev/null
+++ b/src/aws_schema_registry/utils_protobufv3.py
@@ -0,0 +1,75 @@
+from __future__ import annotations
+
+import tempfile
+from importlib.machinery import SourceFileLoader
+from os import path, makedirs
+from types import ModuleType
+
+from grpc_tools import protoc
+
+
+def load_pb_file(proto_schema_file: str) -> ModuleType:
+    """
+    Helper function to load a protobuf schema from a given file.
+
+    :param proto_schema_file: file path and name as string (ends with ".proto")
+    """
+    with tempfile.TemporaryDirectory() as temp_dir:
+        schema_file_name = path.basename(proto_schema_file)
+        proto_dir = path.dirname(proto_schema_file)
+        proto_name = schema_file_name[:-6]
+
+        compiled_dir = path.join(temp_dir, 'protol', proto_name)
+
+        return compile_pb_schema(proto_dir, proto_name, proto_schema_file, compiled_dir)
+
+
+def load_pb_str(proto_schema_str: str, schema_file_name: str) -> ModuleType:
+    """
+    Helper function to load a protobuf schema from a given string.
+
+    :param proto_schema_str: protobuf schema as string
+    :param schema_file_name: protobuf schema file name (ends with ".proto")
+    """
+    with tempfile.TemporaryDirectory() as proto_dir:
+        proto_name = schema_file_name[:-6]
+
+        # create protobuf schema file in temp folder
+        proto_schema_file = path.join(proto_dir, schema_file_name)
+        with open(proto_schema_file, 'w') as f:
+            f.write(proto_schema_str)
+
+        compiled_dir = path.join(proto_dir, 'protol', proto_name)
+
+        return compile_pb_schema(proto_dir, proto_name, proto_schema_file, compiled_dir)
+
+
+def compile_pb_schema(proto_dir, proto_name, proto_schema_file, compiled_dir):
+    """
+    Compile a protobuf schema to Python classes.
+
+    :param proto_dir: directory containing the protobuf schema file
+    :param proto_name: protobuf schema file name (without extension)
+    :param proto_schema_file: the given protobuf schema file
+    :param compiled_dir: directory containing the compiled Python classes
+    """
+    pb2_name = f'{proto_name}_pb2'
+    pb2_file = path.join(compiled_dir, f'{pb2_name}.py')
+
+    if path.isdir(compiled_dir):
+        if path.exists(pb2_file):
+            return SourceFileLoader(pb2_name, pb2_file).load_module()
+    else:
+        makedirs(compiled_dir)
+
+    proto_include = protoc.pkg_resources.resource_filename('grpc_tools', '_proto')
+    compile_arguments = [
+        f'-I{proto_dir}',
+        f'--proto_path={proto_dir}',
+        f'--python_out={compiled_dir}',
+        proto_schema_file,
+        f'-I{proto_include}'
+    ]
+    protoc.main(compile_arguments)
+
+    return SourceFileLoader(pb2_name, pb2_file).load_module()
diff --git a/tests/integration/java/test_java_integration.py b/tests/integration/java/test_java_integration.py
index 43caec0..78802f4 100644
--- a/tests/integration/java/test_java_integration.py
+++ b/tests/integration/java/test_java_integration.py
@@ -3,6 +3,7 @@
 import subprocess
 
 import pytest
+from google.protobuf.json_format import ParseDict
 
 from aws_schema_registry import DataAndSchema, SchemaRegistryClient
 from aws_schema_registry.avro import AvroSchema
@@ -10,6 +11,7 @@
 from aws_schema_registry.adapter.kafka import (
     KafkaDeserializer, KafkaSerializer
 )
+from aws_schema_registry.protobufv3 import ProtobufV3Schema
 
 LOG = logging.getLogger(__name__)
 
@@ -19,6 +21,11 @@
     'target', 'java-integration-test.jar'
 )
 
+DATA = {
+    'name': 'John Doe',
+    'favorite_number': 6,
+    'favorite_color': 'red'
+}
 
 with open(os.path.join(os.path.dirname(__file__), 'user.avsc'), 'r') as f:
     SCHEMA = AvroSchema(f.read())
 
@@ -26,25 +33,27 @@
 with open(os.path.join(os.path.dirname(__file__), 'user.json'), 'r') as f:
     JSON_SCHEMA = JsonSchema(f.read())
 
+with open(os.path.join(os.path.dirname(__file__), 'user.proto'), 'r') as f:
+    PROTOBUFV3_SCHEMA = ProtobufV3Schema(f.read(), 'User')
+
 
 def _topic_name_schema_type_name_strategy(topic, is_key, schema):
     return f"{topic}-{'key' if is_key else 'value'}-{schema.data_format}"
 
 
-@pytest.mark.parametrize("schema", [SCHEMA, JSON_SCHEMA])
+@pytest.mark.parametrize("schema, data", [
+    (SCHEMA, DATA),
+    (JSON_SCHEMA, DATA),
+    (PROTOBUFV3_SCHEMA, ParseDict(DATA, PROTOBUFV3_SCHEMA._msg_obj))
+])
 def test_interop_with_java_library(glue_client, registry,
-                                   boto_session, schema):
+                                   boto_session, schema, data):
     client = SchemaRegistryClient(glue_client, registry_name=registry)
     serializer = KafkaSerializer(
         client,
         schema_naming_strategy=_topic_name_schema_type_name_strategy)
     deserializer = KafkaDeserializer(client)
-    data = {
-        'name': 'John Doe',
-        'favorite_number': 6,
-        'favorite_color': 'red'
-    }
     serialized: bytes = serializer.serialize(
         'test',
         DataAndSchema(data, schema)
     )
diff --git a/tests/integration/java/user.proto b/tests/integration/java/user.proto
new file mode 100644
index 0000000..c205aa4
--- /dev/null
+++ b/tests/integration/java/user.proto
@@ -0,0 +1,8 @@
+syntax = "proto3";
+package aws_schema_registry.integrationtests;
+
+message User {
+  string name = 1;
+  int32 favorite_number = 2;
+  string favorite_color = 3;
+}
diff --git a/tests/resources/example.proto b/tests/resources/example.proto
new file mode 100644
index 0000000..8865e1b
--- /dev/null
+++ b/tests/resources/example.proto
@@ -0,0 +1,7 @@
+syntax = "proto3";
+package alpha.beta;
+
+message MyMessage {
+  string text = 1;
+  int32 number = 2;
+}
diff --git a/tests/test_protobufv3.py b/tests/test_protobufv3.py
new file mode 100644
index 0000000..d394c41
--- /dev/null
+++ b/tests/test_protobufv3.py
@@ -0,0 +1,25 @@
+from aws_schema_registry.protobufv3 import ProtobufV3Schema
+
+
+def test_fully_qualified_name():
+    s = ProtobufV3Schema('./resources/example.proto', 'MyMessage')
+    assert s.fqn == ""
+
+
+def test_readwrite():
+    s = ProtobufV3Schema('./resources/example.proto', 'MyMessage')
+    d = s._parsed.MyMessage(text='Hello World!', number=42)
+    assert s.read(s.write(d)) == d
+
+
+def test_readwrite_schema_str():
+    s = ProtobufV3Schema("""syntax = "proto3";
+package gamma.delta;
+
+message MySecondMessage {
+  string text = 1;
+  int32 number = 2;
+}
+""", 'MySecondMessage')
+    d = s._parsed.MySecondMessage(text='Hello World!', number=42)
+    assert s.read(s.write(d)) == d
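
For reviewers, a minimal, untested sketch of how the new Protobuf V3 support is meant to be used, mirroring the existing Avro/JSON Schema usage pattern. The boto3 Glue client, registry name and topic below are placeholders and not part of this change; only the `ProtobufV3Schema`, `SchemaRegistryClient`, `KafkaSerializer` and `DataAndSchema` calls come from this diff and the existing library.

```python
import boto3

from aws_schema_registry import DataAndSchema, SchemaRegistryClient
from aws_schema_registry.adapter.kafka import KafkaSerializer
from aws_schema_registry.protobufv3 import ProtobufV3Schema

# The schema can be given inline (as here) or as a path to a .proto file.
USER_SCHEMA = ProtobufV3Schema("""syntax = "proto3";
package example;

message User {
  string name = 1;
  int32 favorite_number = 2;
  string favorite_color = 3;
}
""", 'User')

# Placeholder AWS wiring: client, registry name and topic are illustrative.
glue_client = boto3.client('glue')
client = SchemaRegistryClient(glue_client, registry_name='my-registry')
serializer = KafkaSerializer(client)

# Build a protobuf message from the compiled module and serialize it with
# schema-registry framing, ready to hand to a Kafka producer.
user = USER_SCHEMA._parsed.User(name='John Doe', favorite_number=6,
                                favorite_color='red')
serialized: bytes = serializer.serialize(
    'my-topic', DataAndSchema(user, USER_SCHEMA))
```

As in the new tests, the generated message class is reached through the schema's `_parsed` module attribute.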