From 4b676e27d47f0320e2b9d553419003d723f5b5ed Mon Sep 17 00:00:00 2001 From: Vasilis Tsiolkas Date: Tue, 2 Sep 2025 10:54:28 +0300 Subject: [PATCH 1/3] Adds support for gRPC aio server that allows for async handlers Signed-off-by: Vasilis Tsiolkas --- .../dapr/ext/grpc/aio/__init__.py | 35 +++ .../dapr/ext/grpc/aio/_health_servicer.py | 32 ++ .../dapr/ext/grpc/aio/_servicer.py | 280 ++++++++++++++++++ ext/dapr-ext-grpc/dapr/ext/grpc/aio/app.py | 224 ++++++++++++++ ext/dapr-ext-grpc/tests/aio/__init__.py | 0 ext/dapr-ext-grpc/tests/aio/test_app.py | 90 ++++++ .../tests/aio/test_health_servicer.py | 20 ++ ext/dapr-ext-grpc/tests/aio/test_servicier.py | 229 ++++++++++++++ tox.ini | 1 + 9 files changed, 911 insertions(+) create mode 100644 ext/dapr-ext-grpc/dapr/ext/grpc/aio/__init__.py create mode 100644 ext/dapr-ext-grpc/dapr/ext/grpc/aio/_health_servicer.py create mode 100644 ext/dapr-ext-grpc/dapr/ext/grpc/aio/_servicer.py create mode 100644 ext/dapr-ext-grpc/dapr/ext/grpc/aio/app.py create mode 100644 ext/dapr-ext-grpc/tests/aio/__init__.py create mode 100644 ext/dapr-ext-grpc/tests/aio/test_app.py create mode 100644 ext/dapr-ext-grpc/tests/aio/test_health_servicer.py create mode 100644 ext/dapr-ext-grpc/tests/aio/test_servicier.py diff --git a/ext/dapr-ext-grpc/dapr/ext/grpc/aio/__init__.py b/ext/dapr-ext-grpc/dapr/ext/grpc/aio/__init__.py new file mode 100644 index 000000000..56aa3d17a --- /dev/null +++ b/ext/dapr-ext-grpc/dapr/ext/grpc/aio/__init__.py @@ -0,0 +1,35 @@ +# -*- coding: utf-8 -*- + +""" +Copyright 2023 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +from dapr.clients.grpc._request import InvokeMethodRequest, BindingRequest, JobEvent +from dapr.clients.grpc._response import InvokeMethodResponse, TopicEventResponse +from dapr.clients.grpc._jobs import Job, FailurePolicy, DropFailurePolicy, ConstantFailurePolicy + +from dapr.ext.grpc.aio.app import App, Rule # type:ignore + + +__all__ = [ + 'App', + 'Rule', + 'InvokeMethodRequest', + 'InvokeMethodResponse', + 'BindingRequest', + 'TopicEventResponse', + 'Job', + 'JobEvent', + 'FailurePolicy', + 'DropFailurePolicy', + 'ConstantFailurePolicy', +] diff --git a/ext/dapr-ext-grpc/dapr/ext/grpc/aio/_health_servicer.py b/ext/dapr-ext-grpc/dapr/ext/grpc/aio/_health_servicer.py new file mode 100644 index 000000000..5f0780c5e --- /dev/null +++ b/ext/dapr-ext-grpc/dapr/ext/grpc/aio/_health_servicer.py @@ -0,0 +1,32 @@ +import grpc +from typing import Callable, Optional + +from dapr.proto import appcallback_service_v1 +from dapr.proto.runtime.v1.appcallback_pb2 import HealthCheckResponse + +HealthCheckCallable = Optional[Callable[[], None]] + + +class _AioHealthCheckServicer(appcallback_service_v1.AppCallbackHealthCheckServicer): + """The implementation of HealthCheck Server. + + :class:`App` provides useful decorators to register method, topic, input bindings. + """ + + def __init__(self): + self._health_check_cb: Optional[HealthCheckCallable] = None + + def register_health_check(self, cb: HealthCheckCallable) -> None: + if not cb: + raise ValueError('health check callback must be defined') + self._health_check_cb = cb + + async def HealthCheck(self, request, context): + """Health check.""" + + if not self._health_check_cb: + context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + await self._health_check_cb() + return HealthCheckResponse() diff --git a/ext/dapr-ext-grpc/dapr/ext/grpc/aio/_servicer.py b/ext/dapr-ext-grpc/dapr/ext/grpc/aio/_servicer.py new file mode 100644 index 000000000..0276f9110 --- /dev/null +++ b/ext/dapr-ext-grpc/dapr/ext/grpc/aio/_servicer.py @@ -0,0 +1,280 @@ +# -*- coding: utf-8 -*- + +""" +Copyright 2023 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +import grpc.aio + +from cloudevents.sdk.event import v1 # type: ignore +from typing import Awaitable, Callable, Dict, List, Optional, Tuple, Union + +from google.protobuf import empty_pb2 +from google.protobuf.message import Message as GrpcMessage +from google.protobuf.struct_pb2 import Struct + +from dapr.proto import appcallback_service_v1, common_v1, appcallback_v1 +from dapr.proto.runtime.v1.appcallback_pb2 import ( + TopicEventRequest, + BindingEventRequest, + JobEventRequest, +) +from dapr.proto.common.v1.common_pb2 import InvokeRequest +from dapr.clients.base import DEFAULT_JSON_CONTENT_TYPE +from dapr.clients.grpc._request import InvokeMethodRequest, BindingRequest, JobEvent +from dapr.clients.grpc._response import InvokeMethodResponse, TopicEventResponse + +InvokeMethodCallable = Callable[ + [InvokeMethodRequest], Awaitable[Union[str, bytes, InvokeMethodResponse]] +] +TopicSubscribeCallable = Callable[[v1.Event], Awaitable[Optional[TopicEventResponse]]] +BindingCallable = Callable[[BindingRequest], Awaitable[None]] +JobEventCallable = Callable[[JobEvent], Awaitable[None]] + +DELIMITER = ':' + + +class Rule: + def __init__(self, match: str, priority: int) -> None: + self.match = match + self.priority = priority + + +class _RegisteredSubscription: + def __init__( + self, + subscription: appcallback_v1.TopicSubscription, + rules: List[Tuple[int, appcallback_v1.TopicRule]], + ): + self.subscription = subscription + self.rules = rules + + +class _AioCallbackServicer( + appcallback_service_v1.AppCallbackServicer, appcallback_service_v1.AppCallbackAlphaServicer +): + """The asyncio-native implementation of the AppCallback Server. + + This internal class implements application server and provides helpers to register + method, topic, and input bindings. It implements the routing handling logic to route + mulitple methods, topics, and bindings. + + :class:`App` provides useful decorators to register method, topic, input bindings. + """ + + def __init__(self): + self._invoke_method_map: Dict[str, InvokeMethodCallable] = {} + self._topic_map: Dict[str, TopicSubscribeCallable] = {} + self._binding_map: Dict[str, BindingCallable] = {} + self._job_event_map: Dict[str, JobEventCallable] = {} + + self._registered_topics_map: Dict[str, _RegisteredSubscription] = {} + self._registered_topics: List[appcallback_v1.TopicSubscription] = [] + self._registered_bindings: List[str] = [] + + def register_method(self, method: str, cb: InvokeMethodCallable) -> None: + """Registers method for service invocation.""" + if method in self._invoke_method_map: + raise ValueError(f'{method} is already registered') + self._invoke_method_map[method] = cb + + def register_topic( + self, + pubsub_name: str, + topic: str, + cb: TopicSubscribeCallable, + metadata: Optional[Dict[str, str]], + dead_letter_topic: Optional[str] = None, + rule: Optional[Rule] = None, + disable_topic_validation: Optional[bool] = False, + ) -> None: + """Registers topic subscription for pubsub.""" + if not disable_topic_validation: + topic_key = pubsub_name + DELIMITER + topic + else: + topic_key = pubsub_name + pubsub_topic = topic_key + DELIMITER + if rule is not None: + path = getattr(cb, '__name__', rule.match) + pubsub_topic = pubsub_topic + path + if pubsub_topic in self._topic_map: + raise ValueError(f'{topic} is already registered with {pubsub_name}') + self._topic_map[pubsub_topic] = cb + + registered_topic = self._registered_topics_map.get(topic_key) + sub: appcallback_v1.TopicSubscription = appcallback_v1.TopicSubscription() + rules: List[Tuple[int, appcallback_v1.TopicRule]] = [] + if not registered_topic: + sub = appcallback_v1.TopicSubscription( + pubsub_name=pubsub_name, + topic=topic, + metadata=metadata, + routes=appcallback_v1.TopicRoutes(), + ) + if dead_letter_topic: + sub.dead_letter_topic = dead_letter_topic + registered_topic = _RegisteredSubscription(sub, rules) + self._registered_topics_map[topic_key] = registered_topic + self._registered_topics.append(sub) + + sub = registered_topic.subscription + rules = registered_topic.rules + + if rule: + path = getattr(cb, '__name__', rule.match) + rules.append((rule.priority, appcallback_v1.TopicRule(match=rule.match, path=path))) + rules.sort(key=lambda x: x[0]) + rs = [rule for id, rule in rules] + del sub.routes.rules[:] + sub.routes.rules.extend(rs) + + def register_binding(self, name: str, cb: BindingCallable) -> None: + """Registers input bindings.""" + if name in self._binding_map: + raise ValueError(f'{name} is already registered') + self._binding_map[name] = cb + self._registered_bindings.append(name) + + def register_job_event(self, name: str, cb: JobEventCallable) -> None: + """Registers job event handler. + + Args: + name (str): The name of the job to handle events for. + cb (JobEventCallable): The callback function to handle job events. + """ + if name in self._job_event_map: + raise ValueError(f'Job event handler for {name} is already registered') + self._job_event_map[name] = cb + + async def OnInvoke(self, request: InvokeRequest, context): + """Invokes service method with InvokeRequest.""" + if request.method not in self._invoke_method_map: + context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore + raise NotImplementedError(f'{request.method} method not implemented!') + + req = InvokeMethodRequest(request.data, request.content_type) + req.metadata = context.invocation_metadata() + resp = await self._invoke_method_map[request.method](req) + + if not resp: + return common_v1.InvokeResponse() + + resp_data = InvokeMethodResponse() + if isinstance(resp, (bytes, str)): + resp_data.set_data(resp) + resp_data.content_type = DEFAULT_JSON_CONTENT_TYPE + elif isinstance(resp, GrpcMessage): + resp_data.set_data(resp) + elif isinstance(resp, InvokeMethodResponse): + resp_data = resp + else: + context.set_code(grpc.StatusCode.OUT_OF_RANGE) + context.set_details(f'{type(resp)} is the invalid return type.') + raise NotImplementedError(f'{request.method} method not implemented!') + + if len(resp_data.get_headers()) > 0: + context.send_initial_metadata(resp_data.get_headers()) + + content_type = '' + if resp_data.content_type: + content_type = resp_data.content_type + + return common_v1.InvokeResponse(data=resp_data.proto, content_type=content_type) + + async def ListTopicSubscriptions(self, request, context): + """Lists all topics subscribed by this app.""" + return appcallback_v1.ListTopicSubscriptionsResponse(subscriptions=self._registered_topics) + + async def OnTopicEvent(self, request: TopicEventRequest, context): + """Subscribes events from Pubsub.""" + pubsub_topic = request.pubsub_name + DELIMITER + request.topic + DELIMITER + request.path + no_validation_key = request.pubsub_name + DELIMITER + request.path + + if pubsub_topic not in self._topic_map: + if no_validation_key in self._topic_map: + pubsub_topic = no_validation_key + else: + context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore + raise NotImplementedError(f'topic {request.topic} is not implemented!') + + customdata: Struct = request.extensions + extensions = dict() + for k, v in customdata.items(): + extensions[k] = v + for k, v in context.invocation_metadata(): + extensions['_metadata_' + k] = v + + event = v1.Event() + event.SetEventType(request.type) + event.SetEventID(request.id) + event.SetSource(request.source) + event.SetData(request.data) + event.SetContentType(request.data_content_type) + event.SetSubject(request.topic) + event.SetExtensions(extensions) + + response = await self._topic_map[pubsub_topic](event) + if isinstance(response, TopicEventResponse): + return appcallback_v1.TopicEventResponse(status=response.status.value) + return empty_pb2.Empty() + + async def ListInputBindings(self, request, context): + """Lists all input bindings subscribed by this app.""" + return appcallback_v1.ListInputBindingsResponse(bindings=self._registered_bindings) + + async def OnBindingEvent(self, request: BindingEventRequest, context): + """Listens events from the input bindings + User application can save the states or send the events to the output + bindings optionally by returning BindingEventResponse. + """ + if request.name not in self._binding_map: + context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore + raise NotImplementedError(f'{request.name} binding not implemented!') + + req = BindingRequest(request.data, dict(request.metadata)) + req.metadata = context.invocation_metadata() + await self._binding_map[request.name](req) + + # TODO: support output bindings options + return appcallback_v1.BindingEventResponse() + + async def OnJobEventAlpha1(self, request: JobEventRequest, context): + """Handles job events from Dapr runtime. + + This method is called by Dapr when a scheduled job is triggered. + It routes the job event to the appropriate registered handler based on the job name. + + Args: + request (JobEventRequest): The job event request from Dapr. + context: The gRPC context. + + Returns: + appcallback_v1.JobEventResponse: Empty response indicating successful handling. + """ + job_name = request.name + + if job_name not in self._job_event_map: + context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore + raise NotImplementedError(f'Job event handler for {job_name} not implemented!') + + # Create a JobEvent object matching Go SDK's common.JobEvent + # Extract raw data bytes from the Any proto (matching Go implementation) + data_bytes = b'' + if request.HasField('data') and request.data.value: + data_bytes = request.data.value + + job_event = JobEvent(name=request.name, data=data_bytes) + + # Call the registered handler with the JobEvent object + await self._job_event_map[job_name](job_event) + + # Return empty response + return appcallback_v1.JobEventResponse() diff --git a/ext/dapr-ext-grpc/dapr/ext/grpc/aio/app.py b/ext/dapr-ext-grpc/dapr/ext/grpc/aio/app.py new file mode 100644 index 000000000..a0ce3ccc0 --- /dev/null +++ b/ext/dapr-ext-grpc/dapr/ext/grpc/aio/app.py @@ -0,0 +1,224 @@ +# -*- coding: utf-8 -*- + +""" +Copyright 2023 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +import grpc.aio + +from typing import Callable, Dict, Optional, cast + +from dapr.conf import settings +from dapr.ext.grpc.aio._health_servicer import _AioHealthCheckServicer # type: ignore +from dapr.ext.grpc.aio._servicer import Rule, _AioCallbackServicer # type: ignore +from dapr.proto import appcallback_service_v1 + + +class App: + """Asyncio-native App object for Dapr gRPC callbacks.""" + + def __init__(self, max_grpc_message_length: Optional[int] = None, **kwargs): + """Inits App object. Server creation is deferred. + + Args: + max_grpc_messsage_length (int, optional): The maximum grpc send and receive + message length in bytes. Only used when kwargs are not set. + kwargs: arguments to grpc.server() + """ + self._servicer = _AioCallbackServicer() + self._health_check_servicer = _AioHealthCheckServicer() + self._server: grpc.aio.Server | None = None + + if kwargs: + self._grpc_server_kwargs = kwargs + else: + options = [] + if max_grpc_message_length is not None: + options = [ + ('grpc.max_send_message_length', max_grpc_message_length), + ('grpc.max_receive_message_length', max_grpc_message_length), + ] + self._grpc_server_kwargs = {'options': options} + + def _create_server(self): + """Creates the gRPC server instance.""" + self._server = grpc.aio.server(**self._grpc_server_kwargs) # type: ignore + # Add the async servicers to the newly created server + appcallback_service_v1.add_AppCallbackServicer_to_server(self._servicer, self._server) + appcallback_service_v1.add_AppCallbackHealthCheckServicer_to_server( + self._health_check_servicer, self._server + ) + + async def run( + self, app_port: Optional[int] = None, listen_address: Optional[str] = None + ) -> None: + """Creates, starts the async app gRPC server and waits for termination. + + Args: + app_port (int, optional): The port on which to listen for incoming gRPC calls. + Defaults to settings.GRPC_APP_PORT. + listen_address (str, optional): The IP address on which to listen for incoming gRPC + calls. Defaults to [::] (all IP addresses). + """ + + # Initialize the gRPC aio server here to receive the correct asyncio loop + self._create_server() + self._server = cast(grpc.aio.Server, self._server) + + if app_port is None: + app_port = settings.GRPC_APP_PORT + self._server.add_insecure_port(f"{listen_address if listen_address else '[::]'}:{app_port}") + await self._server.start() + await self._server.wait_for_termination() + + async def stop(self, grace: Optional[float] = None) -> None: + """Stops the async app server gracefully.""" + if self._server is None: + return + await self._server.stop(grace) + + def add_external_service(self, servicer_callback, external_servicer): + servicer_callback(external_servicer, self._server) + + def register_health_check(self, health_check_callback: Callable): + self._health_check_servicer.register_health_check(health_check_callback) + + def method(self, name: str) -> Callable: + """A decorator that is used to register the method for the service invocation. + + Return JSON formatted data response:: + + @app.method('start') + async def start(request: InvokeMethodRequest): + + ... + + return json.dumps() + + Return Protocol buffer response:: + + @app.method('start') + async def start(request: InvokeMethodRequest): + + ... + + return CustomProtoResponse(data='hello world') + + + Specify Response header:: + + @app.method('start') + async def start(request: InvokeMethodRequest): + + ... + + resp = InvokeMethodResponse('hello world', 'text/plain') + resp.headers = ('key', 'value') + + return resp + + Args: + name (str): name of invoked method + """ + + def decorator(func: Callable) -> Callable: + self._servicer.register_method(name, func) + return func + + return decorator + + def subscribe( + self, + pubsub_name: str, + topic: str, + metadata: Optional[Dict[str, str]] = {}, + dead_letter_topic: Optional[str] = None, + rule: Optional[Rule] = None, + disable_topic_validation: Optional[bool] = False, + ) -> Callable: + """A decorator that is used to register the subscribing topic method. + + The below example registers 'topic' subscription topic and pass custom + metadata to pubsub component:: + + from cloudevents.sdk.event import v1 + + @app.subscribe('pubsub_name', 'topic', metadata=(('session-id', 'session-id-value'),)) + async def topic(event: v1.Event) -> None: + ... + + Args: + pubsub_name (str): the name of the pubsub component + topic (str): the topic name which is subscribed + metadata (dict, optional): metadata which will be passed to pubsub component + during initialization + dead_letter_topic (str, optional): the dead letter topic name for the subscription + """ + + def decorator(func: Callable) -> Callable: + self._servicer.register_topic( + pubsub_name, + topic, + func, + metadata, + dead_letter_topic, + rule, + disable_topic_validation, + ) + return func + + return decorator + + def binding(self, name: str) -> Callable: + """A decorator that is used to register input binding. + + The below registers input binding which this application subscribes: + + @app.binding('input') + async def input(request: BindingRequest) -> None: + ... + + Args: + name (str): the name of invoked method + """ + + def decorator(func: Callable) -> Callable: + self._servicer.register_binding(name, func) + return func + + return decorator + + def job_event(self, name: str): + """A decorator that is used to register job event handler. + + This decorator registers a handler for job events triggered by the Dapr scheduler. + The handler will be called when a job with the specified name is triggered. + + The below registers a job event handler for jobs named 'my-job': + + from dapr.ext.grpc import JobEvent + + @app.job_event('my-job') + def handle_my_job(job_event: JobEvent) -> None: + print(f"Job {job_event.name} triggered") + data_str = job_event.get_data_as_string() + print(f"Job data: {data_str}") + # Process the job... + + Args: + name (str): the name of the job to handle events for + """ + + def decorator(func): + self._servicer.register_job_event(name, func) + + return decorator diff --git a/ext/dapr-ext-grpc/tests/aio/__init__.py b/ext/dapr-ext-grpc/tests/aio/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/ext/dapr-ext-grpc/tests/aio/test_app.py b/ext/dapr-ext-grpc/tests/aio/test_app.py new file mode 100644 index 000000000..70484d8c3 --- /dev/null +++ b/ext/dapr-ext-grpc/tests/aio/test_app.py @@ -0,0 +1,90 @@ +# -*- coding: utf-8 -*- + +""" +Copyright 2023 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +import unittest + +from cloudevents.sdk.event import v1 +from dapr.ext.grpc.aio import App, Rule, InvokeMethodRequest, BindingRequest + + +class AppTests(unittest.TestCase): + def setUp(self): + self._app = App() + + def test_method_decorator(self): + @self._app.method('Method1') + async def method1(request: InvokeMethodRequest): + pass + + @self._app.method('Method2') + async def method2(request: InvokeMethodRequest): + pass + + method_map = self._app._servicer._invoke_method_map + self.assertIn('AppTests.test_method_decorator..method1', str(method_map['Method1'])) + self.assertIn('AppTests.test_method_decorator..method2', str(method_map['Method2'])) + + def test_binding_decorator(self): + @self._app.binding('binding1') + async def binding1(request: BindingRequest): + pass + + binding_map = self._app._servicer._binding_map + self.assertIn( + 'AppTests.test_binding_decorator..binding1', str(binding_map['binding1']) + ) + + def test_subscribe_decorator(self): + @self._app.subscribe(pubsub_name='pubsub', topic='topic') + async def handle_default(event: v1.Event) -> None: + pass + + @self._app.subscribe( + pubsub_name='pubsub', topic='topic', rule=Rule('event.type == "test"', 1) + ) + async def handle_test_event(event: v1.Event) -> None: + pass + + @self._app.subscribe(pubsub_name='pubsub', topic='topic2', dead_letter_topic='topic2_dead') + async def handle_dead_letter(event: v1.Event) -> None: + pass + + subscription_map = self._app._servicer._topic_map + self.assertIn( + 'AppTests.test_subscribe_decorator..handle_default', + str(subscription_map['pubsub:topic:']), + ) + self.assertIn( + 'AppTests.test_subscribe_decorator..handle_test_event', + str(subscription_map['pubsub:topic:handle_test_event']), + ) + self.assertIn( + 'AppTests.test_subscribe_decorator..handle_dead_letter', + str(subscription_map['pubsub:topic2:']), + ) + + def test_register_health_check(self): + async def health_check_cb(): + pass + + self._app.register_health_check(health_check_cb) + registered_cb = self._app._health_check_servicer._health_check_cb + self.assertIn( + 'AppTests.test_register_health_check..health_check_cb', str(registered_cb) + ) + + def test_no_health_check(self): + registered_cb = self._app._health_check_servicer._health_check_cb + self.assertIsNone(registered_cb) diff --git a/ext/dapr-ext-grpc/tests/aio/test_health_servicer.py b/ext/dapr-ext-grpc/tests/aio/test_health_servicer.py new file mode 100644 index 000000000..afc96ca23 --- /dev/null +++ b/ext/dapr-ext-grpc/tests/aio/test_health_servicer.py @@ -0,0 +1,20 @@ +import unittest +from unittest.mock import AsyncMock, Mock + +from dapr.ext.grpc.aio._health_servicer import _AioHealthCheckServicer + + +class OnInvokeTests(unittest.IsolatedAsyncioTestCase): + def setUp(self): + self._health_servicer = _AioHealthCheckServicer() + + async def test_healthcheck_cb_called(self): + health_cb = AsyncMock() + self._health_servicer.register_health_check(health_cb) + await self._health_servicer.HealthCheck(None, Mock()) + health_cb.assert_called_once() + + async def test_no_healthcheck_cb(self): + with self.assertRaises(NotImplementedError) as exception_context: + await self._health_servicer.HealthCheck(None, Mock()) + self.assertIn('Method not implemented!', exception_context.exception.args[0]) diff --git a/ext/dapr-ext-grpc/tests/aio/test_servicier.py b/ext/dapr-ext-grpc/tests/aio/test_servicier.py new file mode 100644 index 000000000..5f9eb47be --- /dev/null +++ b/ext/dapr-ext-grpc/tests/aio/test_servicier.py @@ -0,0 +1,229 @@ +# -*- coding: utf-8 -*- + +""" +Copyright 2023 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +import unittest + +from unittest.mock import AsyncMock, MagicMock + +from dapr.clients.grpc._request import InvokeMethodRequest +from dapr.clients.grpc._response import InvokeMethodResponse, TopicEventResponse +from dapr.ext.grpc.aio._servicer import _AioCallbackServicer +from dapr.proto import common_v1, appcallback_v1 + +from google.protobuf.any_pb2 import Any as GrpcAny + + +class OnInvokeTests(unittest.IsolatedAsyncioTestCase): + def setUp(self): + self._servicer = _AioCallbackServicer() + + async def _on_invoke(self, method_name, method_cb): + self._servicer.register_method(method_name, method_cb) + + # fake context + fake_context = MagicMock() + fake_context.invocation_metadata.return_value = ( + ('key1', 'value1'), + ('key2', 'value1'), + ) + + return await self._servicer.OnInvoke( + common_v1.InvokeRequest(method=method_name, data=GrpcAny()), + fake_context, + ) + + async def test_on_invoke_return_str(self): + async def method_cb(request: InvokeMethodRequest): + return 'method_str_cb' + + resp = await self._on_invoke('method_str', method_cb) + + self.assertEqual(b'method_str_cb', resp.data.value) + + async def test_on_invoke_return_bytes(self): + async def method_cb(request: InvokeMethodRequest): + return b'method_str_cb' + + resp = await self._on_invoke('method_bytes', method_cb) + + self.assertEqual(b'method_str_cb', resp.data.value) + + async def test_on_invoke_return_proto(self): + async def method_cb(request: InvokeMethodRequest): + return common_v1.StateItem(key='fake_key') + + resp = await self._on_invoke('method_proto', method_cb) + + state = common_v1.StateItem() + resp.data.Unpack(state) + + self.assertEqual('fake_key', state.key) + + async def test_on_invoke_return_invoke_method_response(self): + async def method_cb(request: InvokeMethodRequest): + return InvokeMethodResponse( + data='fake_data', + content_type='text/plain', + ) + + resp = await self._on_invoke('method_resp', method_cb) + + self.assertEqual(b'fake_data', resp.data.value) + self.assertEqual('text/plain', resp.content_type) + + async def test_on_invoke_invalid_response(self): + async def method_cb(request: InvokeMethodRequest): + return 1000 + + with self.assertRaises(NotImplementedError): + await self._on_invoke('method_resp', method_cb) + + +class TopicSubscriptionTests(unittest.IsolatedAsyncioTestCase): + def setUp(self): + self._servicer = _AioCallbackServicer() + self._topic1_method = AsyncMock() + self._topic2_method = AsyncMock() + self._topic3_method = AsyncMock() + self._topic3_method.return_value = TopicEventResponse('success') + self._topic4_method = AsyncMock() + + self._servicer.register_topic('pubsub1', 'topic1', self._topic1_method, {'session': 'key'}) + self._servicer.register_topic('pubsub1', 'topic3', self._topic3_method, {'session': 'key'}) + self._servicer.register_topic('pubsub2', 'topic2', self._topic2_method, {'session': 'key'}) + self._servicer.register_topic('pubsub2', 'topic3', self._topic3_method, {'session': 'key'}) + self._servicer.register_topic( + 'pubsub3', + 'topic4', + self._topic4_method, + {'session': 'key'}, + disable_topic_validation=True, + ) + + # fake context + self.fake_context = MagicMock() + self.fake_context.invocation_metadata.return_value = ( + ('key1', 'value1'), + ('key2', 'value1'), + ) + + def test_duplicated_topic(self): + with self.assertRaises(ValueError): + self._servicer.register_topic( + 'pubsub1', 'topic1', self._topic1_method, {'session': 'key'} + ) + + async def test_list_topic_subscription(self): + resp = await self._servicer.ListTopicSubscriptions(None, None) + self.assertEqual('pubsub1', resp.subscriptions[0].pubsub_name) + self.assertEqual('topic1', resp.subscriptions[0].topic) + self.assertEqual({'session': 'key'}, resp.subscriptions[0].metadata) + self.assertEqual('pubsub1', resp.subscriptions[1].pubsub_name) + self.assertEqual('topic3', resp.subscriptions[1].topic) + self.assertEqual({'session': 'key'}, resp.subscriptions[1].metadata) + self.assertEqual('pubsub2', resp.subscriptions[2].pubsub_name) + self.assertEqual('topic2', resp.subscriptions[2].topic) + self.assertEqual({'session': 'key'}, resp.subscriptions[2].metadata) + self.assertEqual('pubsub2', resp.subscriptions[3].pubsub_name) + self.assertEqual('topic3', resp.subscriptions[3].topic) + self.assertEqual({'session': 'key'}, resp.subscriptions[3].metadata) + self.assertEqual('topic4', resp.subscriptions[4].topic) + self.assertEqual({'session': 'key'}, resp.subscriptions[4].metadata) + + async def test_topic_event(self): + await self._servicer.OnTopicEvent( + appcallback_v1.TopicEventRequest(pubsub_name='pubsub1', topic='topic1'), + self.fake_context, + ) + + self._topic1_method.assert_called_once() + + async def test_topic3_event_called_once(self): + await self._servicer.OnTopicEvent( + appcallback_v1.TopicEventRequest(pubsub_name='pubsub1', topic='topic3'), + self.fake_context, + ) + + self._topic3_method.assert_called_once() + + async def test_topic3_event_response(self): + response = await self._servicer.OnTopicEvent( + appcallback_v1.TopicEventRequest(pubsub_name='pubsub1', topic='topic3'), + self.fake_context, + ) + self.assertIsInstance(response, appcallback_v1.TopicEventResponse) + self.assertEqual( + response.status, appcallback_v1.TopicEventResponse.TopicEventResponseStatus.SUCCESS + ) + + async def test_disable_topic_validation(self): + await self._servicer.OnTopicEvent( + appcallback_v1.TopicEventRequest(pubsub_name='pubsub3', topic='should_be_ignored'), + self.fake_context, + ) + + self._topic4_method.assert_called_once() + + async def test_non_registered_topic(self): + with self.assertRaises(NotImplementedError): + await self._servicer.OnTopicEvent( + appcallback_v1.TopicEventRequest(pubsub_name='pubsub1', topic='topic_non_existed'), + self.fake_context, + ) + + +class BindingTests(unittest.IsolatedAsyncioTestCase): + def setUp(self): + self._servicer = _AioCallbackServicer() + self._binding1_method = AsyncMock() + self._binding2_method = AsyncMock() + + self._servicer.register_binding('binding1', self._binding1_method) + self._servicer.register_binding('binding2', self._binding2_method) + + # fake context + self.fake_context = MagicMock() + self.fake_context.invocation_metadata.return_value = ( + ('key1', 'value1'), + ('key2', 'value1'), + ) + + def test_duplicated_binding(self): + with self.assertRaises(ValueError): + self._servicer.register_binding('binding1', self._binding1_method) + + async def test_list_bindings(self): + resp = await self._servicer.ListInputBindings(None, None) + self.assertEqual('binding1', resp.bindings[0]) + self.assertEqual('binding2', resp.bindings[1]) + + async def test_binding_event(self): + await self._servicer.OnBindingEvent( + appcallback_v1.BindingEventRequest(name='binding1'), + self.fake_context, + ) + + self._binding1_method.assert_called_once() + + async def test_non_registered_binding(self): + with self.assertRaises(NotImplementedError): + await self._servicer.OnBindingEvent( + appcallback_v1.BindingEventRequest(name='binding3'), + self.fake_context, + ) + + +if __name__ == '__main__': + unittest.main() diff --git a/tox.ini b/tox.ini index 0c9ebeabb..d1bbeea1e 100644 --- a/tox.ini +++ b/tox.ini @@ -15,6 +15,7 @@ commands = coverage run -m unittest discover -v ./tests coverage run -a -m unittest discover -v ./ext/dapr-ext-workflow/tests coverage run -a -m unittest discover -v ./ext/dapr-ext-grpc/tests + coverage run -a -m unittest discover -v ./ext/dapr-ext-grpc/tests/aio coverage run -a -m unittest discover -v ./ext/dapr-ext-fastapi/tests coverage run -a -m unittest discover -v ./ext/flask_dapr/tests coverage xml From c0f90f91cd124ab097f8b6a6c3013f146f5789f9 Mon Sep 17 00:00:00 2001 From: Vasilis Tsiolkas Date: Tue, 2 Sep 2025 11:28:05 +0300 Subject: [PATCH 2/3] Adds example for grpc.aio App. Signed-off-by: Vasilis Tsiolkas --- examples/invoke-aio/Dockerfile | 10 ++ examples/invoke-aio/README.md | 118 ++++++++++++++++++ examples/invoke-aio/deploy/invoke-caller.yaml | 37 ++++++ .../invoke-aio/deploy/invoke-receiver.yaml | 40 ++++++ examples/invoke-aio/invoke-caller.py | 22 ++++ examples/invoke-aio/invoke-receiver.py | 15 +++ examples/invoke-aio/requirements.txt | 2 + 7 files changed, 244 insertions(+) create mode 100644 examples/invoke-aio/Dockerfile create mode 100644 examples/invoke-aio/README.md create mode 100644 examples/invoke-aio/deploy/invoke-caller.yaml create mode 100644 examples/invoke-aio/deploy/invoke-receiver.yaml create mode 100644 examples/invoke-aio/invoke-caller.py create mode 100644 examples/invoke-aio/invoke-receiver.py create mode 100644 examples/invoke-aio/requirements.txt diff --git a/examples/invoke-aio/Dockerfile b/examples/invoke-aio/Dockerfile new file mode 100644 index 000000000..892d3f624 --- /dev/null +++ b/examples/invoke-aio/Dockerfile @@ -0,0 +1,10 @@ +FROM python:3.9-slim + +WORKDIR /app + +ADD requirements.txt . +RUN pip install -r requirements.txt + +COPY *.py /app/ + +CMD [ "python", "invoke-receiver.py" ] diff --git a/examples/invoke-aio/README.md b/examples/invoke-aio/README.md new file mode 100644 index 000000000..8eed2276e --- /dev/null +++ b/examples/invoke-aio/README.md @@ -0,0 +1,118 @@ +# Example - Invoke a service + +This example utilizes a receiver and a caller for the OnInvoke / Invoke functionality. It will create an aio gRPC server and bind the OnInvoke method to an async function, which gets called after a client sends a direct method invocation. + +> **Note:** Make sure to use the latest proto bindings + +## Pre-requisites + +- [Dapr CLI and initialized environment](https://docs.dapr.io/getting-started) +- [Install Python 3.9+](https://www.python.org/downloads/) + +## Install Dapr python-SDK + + + +```bash +pip3 install dapr dapr-ext-grpc +``` + +## Running in self-hosted mode + +Run the following command in a terminal/command-prompt: + + + +```bash +# 1. Start Receiver (expose gRPC server receiver on port 50051) +dapr run --app-id invoke-receiver --app-protocol grpc --app-port 50051 python3 invoke-receiver.py +``` + + + +In another terminal/command prompt run: + + + +```bash +# 2. Start Caller +dapr run --app-id invoke-caller --app-protocol grpc --dapr-http-port 3500 python3 invoke-caller.py +``` + + + +## Cleanup + + + +```bash +dapr stop --app-id invoke-caller +dapr stop --app-id invoke-receiver +``` + + + +## Running in Kubernetes mode + +1. Build docker image + + ``` + docker build -t [your registry]/invokesimple:latest . + ``` + +2. Push docker image + + ``` + docker push [your registry]/invokesimple:latest + ``` + +3. Edit image name to `[your registry]/invokesimple:latest` in deploy/*.yaml + +4. Deploy applications + + ``` + kubectl apply -f ./deploy/ + ``` + +5. See logs for the apps and sidecars + + Logs for caller sidecar: + ``` + dapr logs -a invoke-caller -k + ``` + + Logs for caller app: + ``` + kubectl logs -l app="invokecaller" -c invokecaller + ``` + + Logs for receiver sidecar: + ``` + dapr logs -a invoke-receiver -k + ``` + + Logs for receiver app: + ``` + kubectl logs -l app="invokereceiver" -c invokereceiver + ``` diff --git a/examples/invoke-aio/deploy/invoke-caller.yaml b/examples/invoke-aio/deploy/invoke-caller.yaml new file mode 100644 index 000000000..0c60fd6f8 --- /dev/null +++ b/examples/invoke-aio/deploy/invoke-caller.yaml @@ -0,0 +1,37 @@ +# Copyright 2021 The Dapr Authors +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: apps/v1 +kind: Deployment +metadata: + name: invokecaller + labels: + app: invokecaller +spec: + replicas: 1 + selector: + matchLabels: + app: invokecaller + template: + metadata: + labels: + app: invokecaller + annotations: + dapr.io/enabled: "true" + dapr.io/app-id: "invoke-caller" + dapr.io/app-protocol: "grpc" + spec: + containers: + - name: invokecaller + image: invokesimple:latest # EDIT HERE: Replace the image name + command: ["python"] + args: ["/app/invoke-caller.py"] + imagePullPolicy: Always diff --git a/examples/invoke-aio/deploy/invoke-receiver.yaml b/examples/invoke-aio/deploy/invoke-receiver.yaml new file mode 100644 index 000000000..07cd4804a --- /dev/null +++ b/examples/invoke-aio/deploy/invoke-receiver.yaml @@ -0,0 +1,40 @@ +# Copyright 2021 The Dapr Authors +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: apps/v1 +kind: Deployment +metadata: + name: invokereceiver + labels: + app: invokereceiver +spec: + replicas: 1 + selector: + matchLabels: + app: invokereceiver + template: + metadata: + labels: + app: invokereceiver + annotations: + dapr.io/enabled: "true" + dapr.io/app-id: "invoke-receiver" + dapr.io/app-protocol: "grpc" + dapr.io/app-port: "50051" + spec: + containers: + - name: invokereceiver + image: invokesimple:latest # EDIT HERE: Replace the image name + command: ["python"] + args: ["/app/invoke-receiver.py"] + ports: + - containerPort: 3000 + imagePullPolicy: Always diff --git a/examples/invoke-aio/invoke-caller.py b/examples/invoke-aio/invoke-caller.py new file mode 100644 index 000000000..5c5773ea2 --- /dev/null +++ b/examples/invoke-aio/invoke-caller.py @@ -0,0 +1,22 @@ +import json +import time + +from dapr.clients import DaprClient + +with DaprClient() as d: + req_data = {'id': 1, 'message': 'hello world'} + + while True: + # Create a typed message with content type and body + resp = d.invoke_method( + 'invoke-receiver', + 'my-method', + data=json.dumps(req_data), + ) + + # Print the response + print(resp.content_type, flush=True) + print(resp.text(), flush=True) + print(str(resp.status_code), flush=True) + + time.sleep(2) diff --git a/examples/invoke-aio/invoke-receiver.py b/examples/invoke-aio/invoke-receiver.py new file mode 100644 index 000000000..1b140e5e5 --- /dev/null +++ b/examples/invoke-aio/invoke-receiver.py @@ -0,0 +1,15 @@ +import asyncio +from dapr.ext.grpc.aio import App, InvokeMethodRequest, InvokeMethodResponse + +app = App() + + +@app.method(name='my-method') +async def mymethod(request: InvokeMethodRequest) -> InvokeMethodResponse: + print(request.metadata, flush=True) + print(request.text(), flush=True) + + return InvokeMethodResponse(b'INVOKE_RECEIVED', 'text/plain; charset=UTF-8') + + +asyncio.run(app.run(50051)) diff --git a/examples/invoke-aio/requirements.txt b/examples/invoke-aio/requirements.txt new file mode 100644 index 000000000..ee0ce7078 --- /dev/null +++ b/examples/invoke-aio/requirements.txt @@ -0,0 +1,2 @@ +dapr-ext-grpc-dev >= 1.15.0.dev +dapr-dev >= 1.15.0.dev From 3f2b9fd0fed7efea2358f174b67465624f437a21 Mon Sep 17 00:00:00 2001 From: Vasilis Tsiolkas Date: Thu, 11 Sep 2025 14:14:50 +0300 Subject: [PATCH 3/3] chore: Change copyright dates Signed-off-by: Vasilis Tsiolkas --- examples/invoke-aio/deploy/invoke-caller.yaml | 2 +- examples/invoke-aio/deploy/invoke-receiver.yaml | 2 +- ext/dapr-ext-grpc/dapr/ext/grpc/aio/__init__.py | 2 +- ext/dapr-ext-grpc/dapr/ext/grpc/aio/_servicer.py | 2 +- ext/dapr-ext-grpc/dapr/ext/grpc/aio/app.py | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/examples/invoke-aio/deploy/invoke-caller.yaml b/examples/invoke-aio/deploy/invoke-caller.yaml index 0c60fd6f8..84ac2a2ca 100644 --- a/examples/invoke-aio/deploy/invoke-caller.yaml +++ b/examples/invoke-aio/deploy/invoke-caller.yaml @@ -1,4 +1,4 @@ -# Copyright 2021 The Dapr Authors +# Copyright 2025 The Dapr Authors # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at diff --git a/examples/invoke-aio/deploy/invoke-receiver.yaml b/examples/invoke-aio/deploy/invoke-receiver.yaml index 07cd4804a..513a71cca 100644 --- a/examples/invoke-aio/deploy/invoke-receiver.yaml +++ b/examples/invoke-aio/deploy/invoke-receiver.yaml @@ -1,4 +1,4 @@ -# Copyright 2021 The Dapr Authors +# Copyright 2025 The Dapr Authors # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at diff --git a/ext/dapr-ext-grpc/dapr/ext/grpc/aio/__init__.py b/ext/dapr-ext-grpc/dapr/ext/grpc/aio/__init__.py index 56aa3d17a..e2a41b776 100644 --- a/ext/dapr-ext-grpc/dapr/ext/grpc/aio/__init__.py +++ b/ext/dapr-ext-grpc/dapr/ext/grpc/aio/__init__.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- """ -Copyright 2023 The Dapr Authors +Copyright 2025 The Dapr Authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at diff --git a/ext/dapr-ext-grpc/dapr/ext/grpc/aio/_servicer.py b/ext/dapr-ext-grpc/dapr/ext/grpc/aio/_servicer.py index 0276f9110..ed7b392b3 100644 --- a/ext/dapr-ext-grpc/dapr/ext/grpc/aio/_servicer.py +++ b/ext/dapr-ext-grpc/dapr/ext/grpc/aio/_servicer.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- """ -Copyright 2023 The Dapr Authors +Copyright 2025 The Dapr Authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at diff --git a/ext/dapr-ext-grpc/dapr/ext/grpc/aio/app.py b/ext/dapr-ext-grpc/dapr/ext/grpc/aio/app.py index a0ce3ccc0..6c7471bd7 100644 --- a/ext/dapr-ext-grpc/dapr/ext/grpc/aio/app.py +++ b/ext/dapr-ext-grpc/dapr/ext/grpc/aio/app.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- """ -Copyright 2023 The Dapr Authors +Copyright 2025 The Dapr Authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at