From 298ee75891530161be5f4cb9fbb692203c408a48 Mon Sep 17 00:00:00 2001 From: Mateus Junges Date: Tue, 9 Sep 2025 00:38:29 -0300 Subject: [PATCH 1/6] Remove queueable handlers (#363) * Remove queueable handlers * Un-document queueable handlers * Remove tests * Linting --- composer.json | 2 +- docs/consuming-messages/queueable-handlers.md | 61 --------------- src/Concerns/HandleConsumedMessage.php | 28 ------- src/Concerns/PrepareMiddlewares.php | 24 ------ src/Consumers/CallableConsumer.php | 77 ++++++------------- src/Consumers/DispatchQueuedHandler.php | 37 --------- tests/Consumers/ConsumerTest.php | 30 -------- tests/Consumers/SimpleQueueableHandler.php | 16 ---- 8 files changed, 24 insertions(+), 251 deletions(-) delete mode 100644 docs/consuming-messages/queueable-handlers.md delete mode 100644 src/Concerns/HandleConsumedMessage.php delete mode 100644 src/Concerns/PrepareMiddlewares.php delete mode 100644 src/Consumers/DispatchQueuedHandler.php delete mode 100644 tests/Consumers/SimpleQueueableHandler.php diff --git a/composer.json b/composer.json index 740ab8f3..cc9a0587 100644 --- a/composer.json +++ b/composer.json @@ -37,7 +37,7 @@ } ], "scripts": { - "test": "vendor/bin/phpunit tests", + "test": "vendor/bin/phpunit", "format": "vendor/bin/pint" }, "extra": { diff --git a/docs/consuming-messages/queueable-handlers.md b/docs/consuming-messages/queueable-handlers.md deleted file mode 100644 index 7fa1208c..00000000 --- a/docs/consuming-messages/queueable-handlers.md +++ /dev/null @@ -1,61 +0,0 @@ ---- -title: Queueable handlers -weight: 11 ---- - -Queueable handlers allow you to handle your kafka messages in a queue. This will put a job into the Laravel queue system for each message received by your Kafka consumer. - -```+parse - -``` - -This only requires you to implements the `Illuminate\Contracts\Queue\ShouldQueue` interface in your Handler. - -This is how a queueable handler looks like: - -```php -use Illuminate\Contracts\Queue\ShouldQueue; -use Junges\Kafka\Contracts\Handler as HandlerContract; -use Junges\Kafka\Contracts\KafkaConsumerMessage; - -class Handler implements HandlerContract, ShouldQueue -{ - public function __invoke(KafkaConsumerMessage $message): void - { - // Handle the consumed message. - } -} -``` - -As you can see on the `__invoke` method, queued handlers does not have access to a `MessageConsumer` instance when handling the message, -because it's running on a laravel queue and there are no actions that can be performed asynchronously on Kafka message consumer. - -You can specify which queue connection and queue name to use for your handler by implementing the `onConnection` and `onQueue` methods: - -```php -use Illuminate\Contracts\Queue\ShouldQueue; -use Junges\Kafka\Contracts\Handler as HandlerContract; -use Junges\Kafka\Contracts\KafkaConsumerMessage; - -class Handler implements HandlerContract, ShouldQueue -{ - public function __invoke(KafkaConsumerMessage $message): void - { - // Handle the consumed message. - } - - public function onConnection(): string - { - return 'sqs'; // Specify your queue connection - } - - public function onQueue(): string - { - return 'kafka-handlers'; // Specify your queue name - } -} -``` - -After creating your handler class, you can use it just as a normal handler, and `laravel-kafka` will know how to handle it under the hoods 😄. - - diff --git a/src/Concerns/HandleConsumedMessage.php b/src/Concerns/HandleConsumedMessage.php deleted file mode 100644 index e5f112a0..00000000 --- a/src/Concerns/HandleConsumedMessage.php +++ /dev/null @@ -1,28 +0,0 @@ - $this->wrapMiddleware($middleware, $consumer), $middlewares); - $middlewares = array_reverse($middlewares); - - foreach ($middlewares as $middleware) { - $handler = $middleware($handler); - } - - $handler($message, $consumer); - } -} diff --git a/src/Concerns/PrepareMiddlewares.php b/src/Concerns/PrepareMiddlewares.php deleted file mode 100644 index 0ddd4d64..00000000 --- a/src/Concerns/PrepareMiddlewares.php +++ /dev/null @@ -1,24 +0,0 @@ - new $middleware, - $middleware instanceof Middleware => $middleware, - is_callable($middleware) => $middleware, - default => throw new LogicException('Invalid middleware.') - }; - - return static fn (callable $handler) => static fn ($message) => $middleware($message, fn ($message) => $handler($message, $consumer)); - } -} diff --git a/src/Consumers/CallableConsumer.php b/src/Consumers/CallableConsumer.php index 430d220d..bbd7323b 100644 --- a/src/Consumers/CallableConsumer.php +++ b/src/Consumers/CallableConsumer.php @@ -3,80 +3,49 @@ namespace Junges\Kafka\Consumers; use Closure; -use Illuminate\Contracts\Bus\Dispatcher; -use Illuminate\Contracts\Queue\ShouldQueue; -use Illuminate\Support\Facades\App; -use Junges\Kafka\Concerns\HandleConsumedMessage; -use Junges\Kafka\Concerns\PrepareMiddlewares; use Junges\Kafka\Contracts\Consumer; use Junges\Kafka\Contracts\ConsumerMessage; use Junges\Kafka\Contracts\Handler; use Junges\Kafka\Contracts\MessageConsumer; +use Junges\Kafka\Contracts\Middleware; +use LogicException; class CallableConsumer extends Consumer { - use HandleConsumedMessage; - use PrepareMiddlewares; - - private Dispatcher $dispatcher; - public function __construct(private Closure|Handler $handler, private readonly array $middlewares) { - $this->handler = $this->handler instanceof Handler - ? $handler - : $handler(...); - - $this->dispatcher = App::make(Dispatcher::class); + $this->handler = match (true) { + $handler instanceof Handler => $handler, + default => $handler(...), + }; } - /** Handle the received message. */ public function handle(ConsumerMessage $message, MessageConsumer $consumer): void - { - // If the message handler should be queued, we will dispatch a job to handle this message. - // Otherwise, the message will be handled synchronously. - if ($this->shouldQueueHandler()) { - $this->queueHandler($this->handler, $message, $this->middlewares); - - return; - } - - $this->handleMessageSynchronously($message, $consumer); - } - - private function shouldQueueHandler(): bool - { - return $this->handler instanceof ShouldQueue; - } - - private function handleMessageSynchronously(ConsumerMessage $message, MessageConsumer $consumer): void { $this->handleConsumedMessage($message, $this->handler, $consumer, $this->middlewares); } - /** - * This method dispatches a job to handle the consumed message. You can customize the connection and - * queue in which it will be dispatched using the onConnection and onQueue methods. If this - * methods doesn't exist in the handler class, we will use the default configuration accordingly to - * your queue.php config file. - */ - private function queueHandler(Handler $handler, ConsumerMessage $message, array $middlewares): void + private function handleConsumedMessage(ConsumerMessage $message, Handler|Closure $handler, ?MessageConsumer $consumer = null, array $middlewares = []): void { - $connection = config('queue.default'); + $middlewares = array_map(fn ($middleware) => $this->wrapMiddleware($middleware, $consumer), $middlewares); + $middlewares = array_reverse($middlewares); - if (method_exists($handler, 'onConnection')) { - $connection = $handler->onConnection(); + foreach ($middlewares as $middleware) { + $handler = $middleware($handler); } - $queue = config("queue.$connection.queue", 'default'); - - if (method_exists($handler, 'onQueue')) { - $queue = $handler->onQueue(); - } + $handler($message, $consumer); + } - $this->dispatcher->dispatch( - (new DispatchQueuedHandler($handler, $message, $middlewares)) - ->onQueue($queue) - ->onConnection($connection) - ); + private function wrapMiddleware(Middleware|string|callable $middleware, ?MessageConsumer $consumer = null): callable + { + $middleware = match (true) { + is_string($middleware) && is_subclass_of($middleware, Middleware::class) => new $middleware, + $middleware instanceof Middleware => $middleware, + is_callable($middleware) => $middleware, + default => throw new LogicException('Invalid middleware.') + }; + + return static fn (callable $handler) => static fn ($message) => $middleware($message, fn ($message) => $handler($message, $consumer)); } } diff --git a/src/Consumers/DispatchQueuedHandler.php b/src/Consumers/DispatchQueuedHandler.php deleted file mode 100644 index 6c2fdbed..00000000 --- a/src/Consumers/DispatchQueuedHandler.php +++ /dev/null @@ -1,37 +0,0 @@ -handleConsumedMessage( - message: $this->message, - handler: $this->handler, - middlewares: $this->middlewares - ); - } -} diff --git a/tests/Consumers/ConsumerTest.php b/tests/Consumers/ConsumerTest.php index 62ec9a40..7ef1ae24 100644 --- a/tests/Consumers/ConsumerTest.php +++ b/tests/Consumers/ConsumerTest.php @@ -2,13 +2,11 @@ namespace Junges\Kafka\Tests\Consumers; -use Illuminate\Support\Facades\Bus; use Illuminate\Support\Facades\Event; use Junges\Kafka\Commit\VoidCommitter; use Junges\Kafka\Config\Config; use Junges\Kafka\Consumers\CallableConsumer; use Junges\Kafka\Consumers\Consumer; -use Junges\Kafka\Consumers\DispatchQueuedHandler; use Junges\Kafka\Contracts\CommitterFactory; use Junges\Kafka\Contracts\ConsumerMessage; use Junges\Kafka\Contracts\Handler; @@ -105,34 +103,6 @@ public function it_can_consume_messages(): void Event::assertDispatched(MessageConsumed::class, fn (MessageConsumed $e) => $e->message === $fakeConsumer->getMessage()); } - #[Test] - public function it_can_consume_messages_with_queueable_handlers(): void - { - Bus::fake(); - $message = new Message; - $message->err = 0; - $message->key = 'key'; - $message->topic_name = 'test'; - $message->payload = '{"body": "message payload"}'; - $message->headers = []; - $message->partition = 1; - $message->offset = 0; - - $this->mockConsumerWithMessage($message); - - $this->mockProducer(); - - $consumer = Kafka::consumer(['test']) - ->withHandler($fakeConsumer = new SimpleQueueableHandler) - ->withAutoCommit() - ->withMaxMessages(1) - ->build(); - - $consumer->consume(); - - Bus::assertDispatched(DispatchQueuedHandler::class); - } - #[Test] public function consume_message_with_error(): void { diff --git a/tests/Consumers/SimpleQueueableHandler.php b/tests/Consumers/SimpleQueueableHandler.php deleted file mode 100644 index 9e01307c..00000000 --- a/tests/Consumers/SimpleQueueableHandler.php +++ /dev/null @@ -1,16 +0,0 @@ - Date: Tue, 9 Sep 2025 00:41:40 -0300 Subject: [PATCH 2/6] Drop support for php 8.2 (#364) * Drop support for php 8.2 * wip --- .github/workflows/run-tests.yml | 2 +- composer.json | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/run-tests.yml b/.github/workflows/run-tests.yml index ada77e28..50d37aca 100644 --- a/.github/workflows/run-tests.yml +++ b/.github/workflows/run-tests.yml @@ -8,7 +8,7 @@ jobs: strategy: matrix: - php: [8.4, 8.3, 8.2] + php: [8.4, 8.3] laravel: [12.*, 11.*, 10.*] dependency-version: [prefer-stable] include: diff --git a/composer.json b/composer.json index cc9a0587..d96d01a2 100644 --- a/composer.json +++ b/composer.json @@ -3,7 +3,7 @@ "description": "A kafka driver for laravel", "type": "library", "require": { - "php": "^8.2|^8.3|^8.4", + "php": "^8.3|^8.4", "ext-rdkafka": "^6.0", "monolog/monolog": "^3", "mateusjunges/avro-serde-php": "^3.0", From 18b144ff6eb1cd43acb71641c6c23410b4f774e2 Mon Sep 17 00:00:00 2001 From: Mateus Junges Date: Tue, 9 Sep 2025 00:45:03 -0300 Subject: [PATCH 3/6] Drop support for laravel 10 (#365) * Drop support for laravel 10 * wip --- .github/workflows/run-tests.yml | 4 +--- composer.json | 4 ++-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/.github/workflows/run-tests.yml b/.github/workflows/run-tests.yml index 50d37aca..79f6353c 100644 --- a/.github/workflows/run-tests.yml +++ b/.github/workflows/run-tests.yml @@ -9,11 +9,9 @@ jobs: strategy: matrix: php: [8.4, 8.3] - laravel: [12.*, 11.*, 10.*] + laravel: [12.*, 11.*] dependency-version: [prefer-stable] include: - - laravel: 10.* - testbench: 8.* - laravel: 11.* testbench: 9.* - laravel: 12.* diff --git a/composer.json b/composer.json index d96d01a2..438184d2 100644 --- a/composer.json +++ b/composer.json @@ -7,8 +7,8 @@ "ext-rdkafka": "^6.0", "monolog/monolog": "^3", "mateusjunges/avro-serde-php": "^3.0", - "illuminate/support": "^10.0|^11.0|^12.0", - "illuminate/contracts": "^10.0|^11.0|^12.0" + "illuminate/support": "^11.0|^12.0", + "illuminate/contracts": "^11.0|^12.0" }, "require-dev": { "phpunit/phpunit": "^10.5|^11.5.3", From 232d9ad10d2999b2a5312fdd8b2985ed758187df Mon Sep 17 00:00:00 2001 From: Mateus Junges Date: Tue, 9 Sep 2025 01:05:38 -0300 Subject: [PATCH 4/6] Make publishing async by default (#366) * Make publishing async by default * Fix tests * Update docs * Linting --- docs/producing-messages/producing-messages.md | 18 ++++-- .../producing-messages/publishing-to-kafka.md | 15 ++++- docs/upgrade-guide.md | 30 ++++++++++ src/Contracts/Manager.php | 5 +- src/Facades/Kafka.php | 2 +- src/Factory.php | 37 +++++------- src/Support/Testing/Fakes/KafkaFake.php | 2 +- tests/KafkaFakeTest.php | 2 +- tests/KafkaTest.php | 57 ++++++++++++++++++- 9 files changed, 130 insertions(+), 38 deletions(-) diff --git a/docs/producing-messages/producing-messages.md b/docs/producing-messages/producing-messages.md index 25b12a89..70adca49 100644 --- a/docs/producing-messages/producing-messages.md +++ b/docs/producing-messages/producing-messages.md @@ -14,16 +14,24 @@ Kafka::publish('broker')->onTopic('topic-name') This method returns a `ProducerBuilder` instance, which contains a few methods to configure your kafka producer. The following lines describes these methods. -If you are going to produce a lot of messages to different topics, please use the `asyncPublish` method on the `Junges\Kafka\Facades\Kafka` class: +The default `publish()` method now uses asynchronous publishing for better performance. Messages are queued and flushed when the application terminates: ```php use Junges\Kafka\Facades\Kafka; -Kafka::asyncPublish('broker')->onTopic('topic-name') +Kafka::publish('broker')->onTopic('topic-name') ``` -The main difference is that the Async Producer is a singleton and will only flush the producer when the application is shutting down, instead of after each send. -This reduces the overhead when you want to send a lot of messages in your request handlers. +The async producer is a singleton and will only flush messages when the application is shutting down, instead of after each send. +This reduces overhead when you want to send a lot of messages in your request handlers. + +If you need immediate message flushing (synchronous publishing), use the `publishSync()` method: + +```php +use Junges\Kafka\Facades\Kafka; + +Kafka::publishSync('broker')->onTopic('topic-name') +``` ```+parse @@ -37,6 +45,6 @@ available on the `Kafka` facade (added in v2.2.0). This method will return a fre use Junges\Kafka\Facades\Kafka; Kafka::fresh() - ->asyncPublish('broker') + ->publish('broker') ->onTopic('topic-name') ``` \ No newline at end of file diff --git a/docs/producing-messages/publishing-to-kafka.md b/docs/producing-messages/publishing-to-kafka.md index e95d473f..03c65878 100644 --- a/docs/producing-messages/publishing-to-kafka.md +++ b/docs/producing-messages/publishing-to-kafka.md @@ -18,8 +18,19 @@ $producer = Kafka::publish('broker') $producer->send(); ``` -If you want to send multiple messages, consider using the async producer instead. The default `send` method is recommended for low-throughput systems only, as it -flushes the producer after every message that is sent. +The `publish()` method uses asynchronous publishing for better performance, batching messages and flushing them when the application terminates. +If you need immediate message flushing, use `publishSync()` instead: + +```php +use Junges\Kafka\Facades\Kafka; + +// For immediate flush (synchronous) +$producer = Kafka::publishSync('broker') + ->onTopic('topic') + ->withKafkaKey('kafka-key'); + +$producer->send(); +``` ```+parse diff --git a/docs/upgrade-guide.md b/docs/upgrade-guide.md index 7e28b71f..3699e10d 100644 --- a/docs/upgrade-guide.md +++ b/docs/upgrade-guide.md @@ -3,6 +3,36 @@ title: Upgrade guide weight: 6 --- +## Upgrade to v3.0 from v2.9 + +### Breaking Changes + +- `publish()` is now asynchronous by default. Messages are queued and flushed when the application terminates for better performance +- Removed `asyncPublish()` and `publishAsync()` methods - use `publish()` for async behavior (default) or `publishSync()` for immediate flushing +- Minimum PHP version raised to 8.3 +- Minimum Laravel version raised to 11.0 +- **NEW**: Added `publishSync()` method for synchronous message publishing with immediate flush + +### Migration Guide + +**Before (v2.9):** +```php +// Async publishing +Kafka::asyncPublish()->onTopic('topic')->withBody(['data' => 'value'])->send(); + +// Sync publishing +Kafka::publish()->onTopic('topic')->withBody(['data' => 'value'])->send(); +``` + +**After (v3.0):** +```php +// Async publishing (default behavior) +Kafka::publish()->onTopic('topic')->withBody(['data' => 'value'])->send(); + +// Sync publishing (immediate flush) +Kafka::publishSync()->onTopic('topic')->withBody(['data' => 'value'])->send(); +``` + ## Upgrade to v2.9 from v2.8 - **BREAKING CHANGE**: Deprecated producer batch messages feature has been removed (`MessageBatch`, `sendBatch`, `produceBatch`). Use `Kafka::asyncPublish()` instead for better performance diff --git a/src/Contracts/Manager.php b/src/Contracts/Manager.php index a651e97f..87621d94 100644 --- a/src/Contracts/Manager.php +++ b/src/Contracts/Manager.php @@ -7,9 +7,12 @@ interface Manager /** Returns a new fresh instance of the Manager. */ public function fresh(): self; - /** Creates a new ProducerBuilder instance, setting brokers and topic. */ + /** Creates a new async ProducerBuilder instance, setting brokers and topic. */ public function publish(?string $broker = null): MessageProducer; + /** Creates a synchronous ProducerBuilder instance for immediate message flushing. */ + public function publishSync(?string $broker = null): MessageProducer; + /** Return a ConsumerBuilder instance. */ public function consumer(array $topics = [], ?string $groupId = null, ?string $brokers = null): ConsumerBuilder; diff --git a/src/Facades/Kafka.php b/src/Facades/Kafka.php index 032aaa69..7eb9a7f6 100644 --- a/src/Facades/Kafka.php +++ b/src/Facades/Kafka.php @@ -8,7 +8,7 @@ /** * @method static \Junges\Kafka\Contracts\MessageProducer publish(string $broker = null) - * @method static \Junges\Kafka\Contracts\MessageProducer asyncPublish(string $broker = null) + * @method static \Junges\Kafka\Contracts\MessageProducer publishSync(string $broker = null) * @method static \Junges\Kafka\Factory fresh(string $broker = null) * @method static \Junges\Kafka\Consumers\Builder consumer(array $topics = [], string $groupId = null, string $brokers = null) * @method static void assertPublished(ProducerMessage $expectedMessage = null, callable $callback = null) diff --git a/src/Factory.php b/src/Factory.php index d7eedebe..ac0b7e3f 100644 --- a/src/Factory.php +++ b/src/Factory.php @@ -28,44 +28,33 @@ public function publish(?string $broker = null): MessageProducer return Kafka::fake()->publish($broker); } + if ($this->builder instanceof ProducerBuilder) { + return $this->builder; + } + return new ProducerBuilder( - broker: $broker ?? config('kafka.brokers') + broker: $broker ?? config('kafka.brokers'), + asyncProducer: true, ); } - /** Returns a fresh factory instance. */ - public function fresh(): self - { - return new self; - } - - /** - * Creates a new ProducerBuilder instance, optionally setting the brokers. - * The producer will be flushed only when the application terminates, - * and doing SEND does not mean that the message was flushed! - */ - public function asyncPublish(?string $broker = null): MessageProducer + /** Creates a synchronous ProducerBuilder instance for immediate message flushing. */ + public function publishSync(?string $broker = null): MessageProducer { if ($this->shouldFake) { return Kafka::fake()->publish($broker); } - if ($this->builder instanceof ProducerBuilder) { - return $this->builder; - } - - $this->builder = new ProducerBuilder( + return new ProducerBuilder( broker: $broker ?? config('kafka.brokers'), - asyncProducer: true + asyncProducer: false ); - - return $this->builder; } - /** This is an alias for the asyncPublish method. */ - public function publishAsync(?string $broker = null): MessageProducer + /** Returns a fresh factory instance. */ + public function fresh(): self { - return $this->asyncPublish($broker); + return new self; } /** Return a ConsumerBuilder instance. */ diff --git a/src/Support/Testing/Fakes/KafkaFake.php b/src/Support/Testing/Fakes/KafkaFake.php index e44289ae..faa4550f 100644 --- a/src/Support/Testing/Fakes/KafkaFake.php +++ b/src/Support/Testing/Fakes/KafkaFake.php @@ -46,7 +46,7 @@ public function publish(?string $broker = null): ProducerBuilderFake return $this->makeProducerBuilderFake($broker); } - public function asyncPublish(?string $broker = null): ProducerBuilderFake + public function publishSync(?string $broker = null): ProducerBuilderFake { return $this->publish($broker); } diff --git a/tests/KafkaFakeTest.php b/tests/KafkaFakeTest.php index a680e86c..018e1c24 100644 --- a/tests/KafkaFakeTest.php +++ b/tests/KafkaFakeTest.php @@ -58,7 +58,7 @@ public function it_stores_multiple_messages(): void public function it_stores_multiple_messages_when_publishing_async(): void { for ($i = 0; $i < 3; $i++) { - $this->fake->asyncPublish() + $this->fake->publish() ->onTopic('topic') ->withBody('test') ->send(); diff --git a/tests/KafkaTest.php b/tests/KafkaTest.php index f2154e5e..21d58abd 100644 --- a/tests/KafkaTest.php +++ b/tests/KafkaTest.php @@ -56,6 +56,57 @@ public function it_can_publish_messages_to_kafka(): void $this->assertTrue($test); } + #[Test] + public function it_can_publish_messages_synchronously(): void + { + Event::fake(); + + $mockedProducerTopic = m::mock(ProducerTopic::class) + ->shouldReceive('producev')->twice() + ->andReturn(m::self()) + ->getMock(); + + $mockedProducer = m::mock(Producer::class) + ->shouldReceive('newTopic')->with('test')->twice()->andReturn($mockedProducerTopic) + ->shouldReceive('poll')->twice() + ->shouldReceive('flush')->twice() + ->andReturn(RD_KAFKA_RESP_ERR_NO_ERROR) + ->getMock(); + + $this->app->bind(Producer::class, fn () => $mockedProducer); + + $test1 = Kafka::publishSync() + ->onTopic('test') + ->withConfigOptions([ + 'metadata.broker.list' => 'broker', + ]) + ->withKafkaKey(Str::uuid()->toString()) + ->withBodyKey('test', ['test']) + ->withHeaders(['custom' => 'header']) + ->withDebugEnabled() + ->send(); + + $test2 = Kafka::publishSync() + ->onTopic('test') + ->withConfigOptions([ + 'metadata.broker.list' => 'broker', + ]) + ->withKafkaKey(Str::uuid()->toString()) + ->withBodyKey('test', ['test']) + ->withHeaders(['custom' => 'header']) + ->withDebugEnabled() + ->send(); + + Event::assertDispatched(MessagePublished::class); + + $this->assertTrue($test1); + $this->assertTrue($test2); + + Kafka::clearResolvedInstances(); + + Event::assertDispatched(MessagePublished::class); + } + #[Test] public function it_can_publish_messages_asynchronously(): void { @@ -69,13 +120,13 @@ public function it_can_publish_messages_asynchronously(): void $mockedProducer = m::mock(Producer::class) ->shouldReceive('newTopic')->with('test')->twice()->andReturn($mockedProducerTopic) ->shouldReceive('poll')->twice() - ->shouldReceive('flush')->once() + ->shouldReceive('flush')->atLeast()->once() ->andReturn(RD_KAFKA_RESP_ERR_NO_ERROR) ->getMock(); $this->app->bind(Producer::class, fn () => $mockedProducer); - $test1 = Kafka::asyncPublish() + $test1 = Kafka::publish() ->onTopic('test') ->withConfigOptions([ 'metadata.broker.list' => 'broker', @@ -86,7 +137,7 @@ public function it_can_publish_messages_asynchronously(): void ->withDebugEnabled() ->send(); - $test2 = Kafka::asyncPublish() + $test2 = Kafka::publish() ->onTopic('test') ->withConfigOptions([ 'metadata.broker.list' => 'broker', From b32d3a078e2624d0bd740aa7879df7014faf1e04 Mon Sep 17 00:00:00 2001 From: Mateus Junges Date: Tue, 9 Sep 2025 01:17:40 -0300 Subject: [PATCH 5/6] Upgrade to modern php 8.3 (#368) --- composer.json | 4 +-- rector.php | 3 +- src/Commit/RetryableCommitter.php | 2 +- src/Config/Config.php | 8 ++--- src/Consumers/Consumer.php | 14 ++++---- src/Events/CouldNotPublishMessage.php | 8 ++--- src/Events/MessageConsumed.php | 4 +-- src/Events/MessagePublished.php | 4 +-- src/Events/PublishingMessage.php | 4 +-- src/Exceptions/SchemaRegistryException.php | 2 +- .../Serializers/AvroSerializerException.php | 4 +-- .../TransactionFatalErrorException.php | 2 +- .../TransactionShouldBeAbortedException.php | 2 +- .../TransactionShouldBeRetriedException.php | 2 +- src/Message/Message.php | 2 ++ src/Providers/LaravelKafkaServiceProvider.php | 2 ++ src/Support/InfiniteTimer.php | 3 ++ src/Support/Testing/Fakes/BuilderFake.php | 3 ++ tests/Commit/KafkaCommitterTest.php | 8 ++--- tests/Commit/RetryableCommitterTest.php | 2 +- .../SeekToCurrentErrorCommitterTest.php | 2 +- tests/Console/Consumers/OptionsTest.php | 2 ++ tests/Consumers/ConsumerTest.php | 6 ++-- tests/Consumers/ManualCommitTest.php | 4 +-- tests/FailingCommitter.php | 10 +----- tests/KafkaFakeTest.php | 22 ++++--------- tests/KafkaTest.php | 32 +++++++------------ tests/LaravelKafkaTestCase.php | 15 +++------ tests/Message/MessageTest.php | 2 ++ .../Registry/AvroSchemaRegistryTest.php | 2 -- 30 files changed, 77 insertions(+), 103 deletions(-) diff --git a/composer.json b/composer.json index 438184d2..c424d2aa 100644 --- a/composer.json +++ b/composer.json @@ -12,9 +12,9 @@ }, "require-dev": { "phpunit/phpunit": "^10.5|^11.5.3", - "orchestra/testbench": "^7.16|^8.0|^9.0|^10.0", + "orchestra/testbench": "^9.0|^10.0", "predis/predis": "^1", - "rector/rector": "^0.19.8", + "rector/rector": "^2.1", "laravel/pint": "dev-main" }, "minimum-stability": "dev", diff --git a/rector.php b/rector.php index 6f75d570..4d459280 100644 --- a/rector.php +++ b/rector.php @@ -7,7 +7,6 @@ return static function (RectorConfig $rectorConfig): void { $rectorConfig->paths([ __DIR__.'/config', - __DIR__.'/dev', __DIR__.'/src', __DIR__.'/tests', ]); @@ -17,6 +16,6 @@ // define sets of rules $rectorConfig->sets([ - LevelSetList::UP_TO_PHP_81, + LevelSetList::UP_TO_PHP_83, ]); }; diff --git a/src/Commit/RetryableCommitter.php b/src/Commit/RetryableCommitter.php index 6ec60567..df1adc89 100644 --- a/src/Commit/RetryableCommitter.php +++ b/src/Commit/RetryableCommitter.php @@ -10,7 +10,7 @@ class RetryableCommitter implements Committer { - private const RETRYABLE_ERRORS = [ + private const array RETRYABLE_ERRORS = [ RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION, RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT, RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS, diff --git a/src/Config/Config.php b/src/Config/Config.php index c6b60522..cc6a66c7 100644 --- a/src/Config/Config.php +++ b/src/Config/Config.php @@ -9,11 +9,11 @@ class Config { - final public const SASL_PLAINTEXT = 'SASL_PLAINTEXT'; + final public const string SASL_PLAINTEXT = 'SASL_PLAINTEXT'; - final public const SASL_SSL = 'SASL_SSL'; + final public const string SASL_SSL = 'SASL_SSL'; - final public const PRODUCER_ONLY_CONFIG_OPTIONS = [ + final public const array PRODUCER_ONLY_CONFIG_OPTIONS = [ 'transactional.id', 'transaction.timeout.ms', 'enable.idempotence', @@ -36,7 +36,7 @@ class Config 'sticky.partitioning.linger.ms', ]; - final public const CONSUMER_ONLY_CONFIG_OPTIONS = [ + final public const array CONSUMER_ONLY_CONFIG_OPTIONS = [ 'partition.assignment.strategy', 'session.timeout.ms', 'heartbeat.interval.ms', diff --git a/src/Consumers/Consumer.php b/src/Consumers/Consumer.php index a6ef021e..311d6ae2 100644 --- a/src/Consumers/Consumer.php +++ b/src/Consumers/Consumer.php @@ -33,23 +33,23 @@ class Consumer implements MessageConsumer { - private const IGNORABLE_CONSUMER_ERRORS = [ + private const array IGNORABLE_CONSUMER_ERRORS = [ RD_KAFKA_RESP_ERR__PARTITION_EOF, RD_KAFKA_RESP_ERR__TRANSPORT, RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT, RD_KAFKA_RESP_ERR__TIMED_OUT, ]; - private const CONSUME_STOP_EOF_ERRORS = [ + private const array CONSUME_STOP_EOF_ERRORS = [ RD_KAFKA_RESP_ERR__PARTITION_EOF, RD_KAFKA_RESP_ERR__TIMED_OUT, ]; - private const TIMEOUT_ERRORS = [ + private const array TIMEOUT_ERRORS = [ RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT, ]; - private const IGNORABLE_COMMIT_ERRORS = [ + private const array IGNORABLE_COMMIT_ERRORS = [ RD_KAFKA_RESP_ERR__NO_OFFSET, ]; @@ -73,9 +73,9 @@ class Consumer implements MessageConsumer private bool $stopRequested = false; - private ?Closure $whenStopConsuming; + private readonly ?Closure $whenStopConsuming; - private Dispatcher $dispatcher; + private readonly Dispatcher $dispatcher; public function __construct(private readonly Config $config, private readonly MessageDeserializer $deserializer, ?CommitterFactory $committerFactory = null) { @@ -390,7 +390,7 @@ private function buildHeadersForDlq(Message $message, ?Throwable $throwable = nu $throwableHeaders['kafka_throwable_message'] = $throwable->getMessage(); $throwableHeaders['kafka_throwable_code'] = $throwable->getCode(); - $throwableHeaders['kafka_throwable_class_name'] = get_class($throwable); + $throwableHeaders['kafka_throwable_class_name'] = $throwable::class; return array_merge($message->headers ?? [], $throwableHeaders); } diff --git a/src/Events/CouldNotPublishMessage.php b/src/Events/CouldNotPublishMessage.php index 6acfddee..654fcb1f 100644 --- a/src/Events/CouldNotPublishMessage.php +++ b/src/Events/CouldNotPublishMessage.php @@ -4,11 +4,11 @@ use Throwable; -final class CouldNotPublishMessage +final readonly class CouldNotPublishMessage { public function __construct( - public readonly int $errorCode, - public readonly string $message, - public readonly Throwable $throwable, + public int $errorCode, + public string $message, + public Throwable $throwable, ) {} } diff --git a/src/Events/MessageConsumed.php b/src/Events/MessageConsumed.php index f2a07874..521d9b33 100644 --- a/src/Events/MessageConsumed.php +++ b/src/Events/MessageConsumed.php @@ -4,10 +4,10 @@ use Junges\Kafka\Contracts\ConsumerMessage; -final class MessageConsumed +final readonly class MessageConsumed { public function __construct( - public readonly ConsumerMessage $message + public ConsumerMessage $message ) {} public function getMessageIdentifier(): string diff --git a/src/Events/MessagePublished.php b/src/Events/MessagePublished.php index 390d5cef..ad4abc92 100644 --- a/src/Events/MessagePublished.php +++ b/src/Events/MessagePublished.php @@ -4,10 +4,10 @@ use Junges\Kafka\Contracts\ProducerMessage; -final class MessagePublished +final readonly class MessagePublished { public function __construct( - public readonly ProducerMessage $message, + public ProducerMessage $message, ) {} public function getMessageIdentifier(): string diff --git a/src/Events/PublishingMessage.php b/src/Events/PublishingMessage.php index b89ee37d..df348d78 100644 --- a/src/Events/PublishingMessage.php +++ b/src/Events/PublishingMessage.php @@ -4,10 +4,10 @@ use Junges\Kafka\Contracts\ProducerMessage; -final class PublishingMessage +final readonly class PublishingMessage { public function __construct( - public readonly ProducerMessage $message, + public ProducerMessage $message, ) {} public function getMessageIdentifier(): string diff --git a/src/Exceptions/SchemaRegistryException.php b/src/Exceptions/SchemaRegistryException.php index 2b0d2798..366b88cc 100644 --- a/src/Exceptions/SchemaRegistryException.php +++ b/src/Exceptions/SchemaRegistryException.php @@ -4,5 +4,5 @@ class SchemaRegistryException extends LaravelKafkaException { - final public const SCHEMA_MAPPING_NOT_FOUND = 'There is no schema mapping topic: %s, type: %s'; + final public const string SCHEMA_MAPPING_NOT_FOUND = 'There is no schema mapping topic: %s, type: %s'; } diff --git a/src/Exceptions/Serializers/AvroSerializerException.php b/src/Exceptions/Serializers/AvroSerializerException.php index 92d86a49..15f7c5bb 100644 --- a/src/Exceptions/Serializers/AvroSerializerException.php +++ b/src/Exceptions/Serializers/AvroSerializerException.php @@ -6,7 +6,7 @@ class AvroSerializerException extends LaravelKafkaException { - final public const NO_SCHEMA_FOR_TOPIC_MESSAGE = 'There is no %s avro schema defined for the topic %s'; + final public const string NO_SCHEMA_FOR_TOPIC_MESSAGE = 'There is no %s avro schema defined for the topic %s'; - final public const UNABLE_TO_LOAD_DEFINITION_MESSAGE = 'Was unable to load definition for schema %s'; + final public const string UNABLE_TO_LOAD_DEFINITION_MESSAGE = 'Was unable to load definition for schema %s'; } diff --git a/src/Exceptions/Transactions/TransactionFatalErrorException.php b/src/Exceptions/Transactions/TransactionFatalErrorException.php index 08bb7e79..b3430597 100644 --- a/src/Exceptions/Transactions/TransactionFatalErrorException.php +++ b/src/Exceptions/Transactions/TransactionFatalErrorException.php @@ -7,7 +7,7 @@ final class TransactionFatalErrorException extends LaravelKafkaException { - private const FATAL_EXCEPTION_MESSAGE = 'Transaction failed with a fatal error. You must create a new producer as this one can not be used anymore. [%s]'; + private const string FATAL_EXCEPTION_MESSAGE = 'Transaction failed with a fatal error. You must create a new producer as this one can not be used anymore. [%s]'; public static function new(KafkaErrorException $baseException): self { diff --git a/src/Exceptions/Transactions/TransactionShouldBeAbortedException.php b/src/Exceptions/Transactions/TransactionShouldBeAbortedException.php index f03cbdcb..944b8562 100644 --- a/src/Exceptions/Transactions/TransactionShouldBeAbortedException.php +++ b/src/Exceptions/Transactions/TransactionShouldBeAbortedException.php @@ -7,7 +7,7 @@ final class TransactionShouldBeAbortedException extends LaravelKafkaException { - private const ABORTABLE_EXCEPTION_MESSAGE = 'Transaction failed. You must abort your current transaction and start a new one. [%s]'; + private const string ABORTABLE_EXCEPTION_MESSAGE = 'Transaction failed. You must abort your current transaction and start a new one. [%s]'; public static function new(KafkaErrorException $baseException): self { diff --git a/src/Exceptions/Transactions/TransactionShouldBeRetriedException.php b/src/Exceptions/Transactions/TransactionShouldBeRetriedException.php index fd780236..51622582 100644 --- a/src/Exceptions/Transactions/TransactionShouldBeRetriedException.php +++ b/src/Exceptions/Transactions/TransactionShouldBeRetriedException.php @@ -7,7 +7,7 @@ final class TransactionShouldBeRetriedException extends LaravelKafkaException { - private const RETRIABLE_EXCEPTION_MESSAGE = 'This transaction failed, but can be retried. [%s]'; + private const string RETRIABLE_EXCEPTION_MESSAGE = 'This transaction failed, but can be retried. [%s]'; public static function new(KafkaErrorException $baseException): self { diff --git a/src/Message/Message.php b/src/Message/Message.php index ca2c71eb..0c431b9c 100644 --- a/src/Message/Message.php +++ b/src/Message/Message.php @@ -8,6 +8,7 @@ use JetBrains\PhpStorm\Pure; use Junges\Kafka\AbstractMessage; use Junges\Kafka\Contracts\ProducerMessage; +use Override; class Message extends AbstractMessage implements Arrayable, ProducerMessage { @@ -81,6 +82,7 @@ public function withHeader(string $key, string|int|float $value): ProducerMessag return $this; } + #[Override] public function getHeaders(): ?array { // Here we insert an uuid to be used to uniquely identify this message. If the diff --git a/src/Providers/LaravelKafkaServiceProvider.php b/src/Providers/LaravelKafkaServiceProvider.php index 08c08738..4a782af8 100644 --- a/src/Providers/LaravelKafkaServiceProvider.php +++ b/src/Providers/LaravelKafkaServiceProvider.php @@ -17,6 +17,7 @@ use Junges\Kafka\Message\Deserializers\JsonDeserializer; use Junges\Kafka\Message\Message; use Junges\Kafka\Message\Serializers\JsonSerializer; +use Override; class LaravelKafkaServiceProvider extends ServiceProvider { @@ -32,6 +33,7 @@ public function boot(): void } } + #[Override] public function register(): void { $this->app->bind(MessageSerializer::class, fn () => new JsonSerializer); diff --git a/src/Support/InfiniteTimer.php b/src/Support/InfiniteTimer.php index 8dd75281..849b2e0f 100644 --- a/src/Support/InfiniteTimer.php +++ b/src/Support/InfiniteTimer.php @@ -2,8 +2,11 @@ namespace Junges\Kafka\Support; +use Override; + class InfiniteTimer extends Timer { + #[Override] public function isTimedOut(): bool { return false; diff --git a/src/Support/Testing/Fakes/BuilderFake.php b/src/Support/Testing/Fakes/BuilderFake.php index 65b79d29..dc202bb2 100644 --- a/src/Support/Testing/Fakes/BuilderFake.php +++ b/src/Support/Testing/Fakes/BuilderFake.php @@ -7,6 +7,7 @@ use Junges\Kafka\Consumers\CallableConsumer; use Junges\Kafka\Contracts\ConsumerBuilder as ConsumerBuilderContract; use Junges\Kafka\Contracts\MessageConsumer; +use Override; class BuilderFake extends Builder implements ConsumerBuilderContract { @@ -14,6 +15,7 @@ class BuilderFake extends Builder implements ConsumerBuilderContract private array $messages = []; /** {@inheritDoc} */ + #[Override] public static function create(?string $brokers, array $topics = [], ?string $groupId = null): self { return new self( @@ -32,6 +34,7 @@ public function setMessages(array $messages): self } /** Build the Kafka consumer. */ + #[Override] public function build(): MessageConsumer { $config = new Config( diff --git a/tests/Commit/KafkaCommitterTest.php b/tests/Commit/KafkaCommitterTest.php index 43b162a4..9f00765e 100644 --- a/tests/Commit/KafkaCommitterTest.php +++ b/tests/Commit/KafkaCommitterTest.php @@ -24,9 +24,7 @@ public function it_can_commit(): void ->shouldReceive('commit')->once() ->andReturnSelf(); - $this->app->bind(KafkaConsumer::class, function () use ($kafkaConsumer) { - return $kafkaConsumer->getMock(); - }); + $this->app->bind(KafkaConsumer::class, fn () => $kafkaConsumer->getMock()); $config = new Config( broker: 'broker', @@ -54,9 +52,7 @@ public function it_can_commit_to_dlq(): void ->shouldReceive('commit')->once() ->andReturnSelf(); - $this->app->bind(KafkaConsumer::class, function () use ($kafkaConsumer) { - return $kafkaConsumer->getMock(); - }); + $this->app->bind(KafkaConsumer::class, fn () => $kafkaConsumer->getMock()); $config = new Config( broker: 'broker', diff --git a/tests/Commit/RetryableCommitterTest.php b/tests/Commit/RetryableCommitterTest.php index 090c255a..dce1bbe1 100644 --- a/tests/Commit/RetryableCommitterTest.php +++ b/tests/Commit/RetryableCommitterTest.php @@ -68,7 +68,7 @@ public function it_should_progressively_wait_for_the_next_retry(): void try { $retryableCommitter->commitMessage(new Message, true); - } catch (RdKafkaException $exception) { + } catch (RdKafkaException) { } $expectedSleeps = [1e6, 2e6, 4e6, 8e6, 16e6, 32e6]; diff --git a/tests/Commit/SeekToCurrentErrorCommitterTest.php b/tests/Commit/SeekToCurrentErrorCommitterTest.php index 2341a92c..cbb79529 100644 --- a/tests/Commit/SeekToCurrentErrorCommitterTest.php +++ b/tests/Commit/SeekToCurrentErrorCommitterTest.php @@ -60,6 +60,6 @@ public function it_passes_dlq_commits(): void $seekToCurrentErrorCommitter = new SeekToCurrentErrorCommitter($mockedKafkaConsumer, $mockedCommitter); - $seekToCurrentErrorCommitter->commitDlq(new Message, true); + $seekToCurrentErrorCommitter->commitDlq(new Message); } } diff --git a/tests/Console/Consumers/OptionsTest.php b/tests/Console/Consumers/OptionsTest.php index 87437c5d..9ffa0e86 100644 --- a/tests/Console/Consumers/OptionsTest.php +++ b/tests/Console/Consumers/OptionsTest.php @@ -5,12 +5,14 @@ use Junges\Kafka\Console\Commands\KafkaConsumer\Options; use Junges\Kafka\Tests\Fakes\FakeHandler; use Junges\Kafka\Tests\LaravelKafkaTestCase; +use Override; use PHPUnit\Framework\Attributes\Test; class OptionsTest extends LaravelKafkaTestCase { private array $config; + #[Override] protected function setUp(): void { parent::setUp(); diff --git a/tests/Consumers/ConsumerTest.php b/tests/Consumers/ConsumerTest.php index 7ef1ae24..d089d51a 100644 --- a/tests/Consumers/ConsumerTest.php +++ b/tests/Consumers/ConsumerTest.php @@ -559,7 +559,7 @@ public function commitAsync(mixed $messageOrOffsets = null): void } }; - $customCommitterFactory = new class($customCommitter) implements CommitterFactory + $customCommitterFactory = new readonly class($customCommitter) implements CommitterFactory { public function __construct(private \Junges\Kafka\Contracts\Committer $committer) {} @@ -701,8 +701,6 @@ private function mockConsumerWithMessageAndPartitions(Message $message, array $p ->andReturn($partitions) ->getMock(); - $this->app->bind(KafkaConsumer::class, function () use ($mockedKafkaConsumer) { - return $mockedKafkaConsumer; - }); + $this->app->bind(KafkaConsumer::class, fn () => $mockedKafkaConsumer); } } diff --git a/tests/Consumers/ManualCommitTest.php b/tests/Consumers/ManualCommitTest.php index 5ff2c89a..d7ecf934 100644 --- a/tests/Consumers/ManualCommitTest.php +++ b/tests/Consumers/ManualCommitTest.php @@ -440,7 +440,7 @@ public function it_handles_commit_errors_gracefully(): void function (ConsumerMessage $message, Consumer $consumer) use (&$exceptionThrown) { try { $consumer->commit($message); - } catch (Throwable $e) { + } catch (Throwable) { $exceptionThrown = true; } }, @@ -495,7 +495,7 @@ public function it_ignores_no_offset_commit_errors(): void function (ConsumerMessage $message, Consumer $consumer) use (&$noExceptionThrown) { try { $consumer->commit($message); - } catch (\RdKafka\Exception $e) { + } catch (\RdKafka\Exception) { $noExceptionThrown = false; } }, diff --git a/tests/FailingCommitter.php b/tests/FailingCommitter.php index 601b3ada..c78d3aa6 100644 --- a/tests/FailingCommitter.php +++ b/tests/FailingCommitter.php @@ -8,21 +8,13 @@ final class FailingCommitter implements Committer { - private int $timesToFail; - - private Exception $failure; - private int $timesTriedToCommitMessage = 0; private int $timesTriedToCommitDlq = 0; private int $commitCount = 0; - public function __construct(Exception $failure, int $timesToFail) - { - $this->failure = $failure; - $this->timesToFail = $timesToFail; - } + public function __construct(private readonly Exception $failure, private readonly int $timesToFail) {} /** * @throws Exception diff --git a/tests/KafkaFakeTest.php b/tests/KafkaFakeTest.php index 018e1c24..411b20bc 100644 --- a/tests/KafkaFakeTest.php +++ b/tests/KafkaFakeTest.php @@ -11,6 +11,7 @@ use Junges\Kafka\Message\ConsumedMessage; use Junges\Kafka\Message\Message; use Junges\Kafka\Support\Testing\Fakes\KafkaFake; +use Override; use PHPUnit\Framework\Attributes\Test; use PHPUnit\Framework\Constraint\ExceptionMessageIsOrContains; use PHPUnit\Framework\ExpectationFailedException; @@ -21,6 +22,7 @@ final class KafkaFakeTest extends LaravelKafkaTestCase private MessageConsumer $consumer; + #[Override] protected function setUp(): void { parent::setUp(); @@ -129,18 +131,12 @@ public function it_can_perform_assertions_on_published_messages(): void $this->fake->assertPublished($producer->getMessage()); - $this->fake->assertPublished($producer->getMessage(), function ($message) use ($uuid) { - return $message->getKey() === $uuid; - }); + $this->fake->assertPublished($producer->getMessage(), fn ($message) => $message->getKey() === $uuid); - $this->fake->assertPublished($message = $producer->getMessage(), function () use ($message, $uuid) { - return $message->getKey() === $uuid; - }); + $this->fake->assertPublished($message = $producer->getMessage(), fn () => $message->getKey() === $uuid); try { - $this->fake->assertPublished($message = $producer->getMessage(), function () use ($message) { - return $message->getKey() === 'not-published-uuid'; - }); + $this->fake->assertPublished($message = $producer->getMessage(), fn () => $message->getKey() === 'not-published-uuid'); } catch (ExpectationFailedException $exception) { $this->assertThat($exception, new ExceptionMessageIsOrContains('The expected message was not published.')); } @@ -226,16 +222,12 @@ public function i_can_perform_assertions_using_assert_published_on(): void $this->fake->assertPublishedOn('topic', $producer->getMessage()); try { - $this->fake->assertPublishedOn('topic', $producer->getMessage(), function ($message) { - return $message->getKey() === 'different-key'; - }); + $this->fake->assertPublishedOn('topic', $producer->getMessage(), fn ($message) => $message->getKey() === 'different-key'); } catch (ExpectationFailedException $exception) { $this->assertThat($exception, new ExceptionMessageIsOrContains('The expected message was not published.')); } - $this->fake->assertPublishedOn('topic', $producer->getMessage(), function ($message) use ($uuid) { - return $message->getKey() === $uuid; - }); + $this->fake->assertPublishedOn('topic', $producer->getMessage(), fn ($message) => $message->getKey() === $uuid); } #[Test] diff --git a/tests/KafkaTest.php b/tests/KafkaTest.php index 21d58abd..1cfb7d0b 100644 --- a/tests/KafkaTest.php +++ b/tests/KafkaTest.php @@ -239,9 +239,7 @@ public function i_can_set_the_entire_message_with_message_object(): void ->andReturn(RD_KAFKA_RESP_ERR_NO_ERROR) ->getMock(); - $this->app->bind(Producer::class, function () use ($mockedProducer) { - return $mockedProducer; - }); + $this->app->bind(Producer::class, fn () => $mockedProducer); $message = Message::create() ->withHeaders(['foo' => 'bar']) @@ -292,9 +290,7 @@ public function i_can_disable_debug_using_with_debug_disabled_method(): void ->andReturn(RD_KAFKA_RESP_ERR_NO_ERROR) ->getMock(); - $this->app->bind(Producer::class, function () use ($mockedProducer) { - return $mockedProducer; - }); + $this->app->bind(Producer::class, fn () => $mockedProducer); /** @var ProducerBuilder $producer */ $producer = Kafka::publish() @@ -383,11 +379,9 @@ public function producer_throws_exception_if_message_could_not_be_published(): v Kafka::publish()->onTopic('test')->withBodyKey('foo', 'bar')->send(); - Event::assertDispatched(CouldNotPublishMessageEvent::class, function (CouldNotPublishMessageEvent $event) use ($expectedMessage) { - return $event->throwable instanceof CouldNotPublishMessage - && $event->errorCode === RD_KAFKA_RESP_ERR__FAIL - && $event->message === $expectedMessage; - }); + Event::assertDispatched(CouldNotPublishMessageEvent::class, fn (CouldNotPublishMessageEvent $event) => $event->throwable instanceof CouldNotPublishMessage + && $event->errorCode === RD_KAFKA_RESP_ERR__FAIL + && $event->message === $expectedMessage); } #[Test] @@ -395,13 +389,11 @@ public function macro(): void { $sasl = new Sasl(username: 'username', password: 'password', mechanisms: 'mechanisms'); - Kafka::macro('defaultProducer', function () { - return $this->publish()->withSasl( - username: 'username', - password: 'password', - mechanisms: 'mechanisms', - ); - }); + Kafka::macro('defaultProducer', fn () => $this->publish()->withSasl( + username: 'username', + password: 'password', + mechanisms: 'mechanisms', + )); $producer = Kafka::defaultProducer(); @@ -418,9 +410,7 @@ public function it_stores_published_messages_when_using_macros(): void ->onTopic('topic') ->withKey($uuid = Str::uuid()->toString()); - Kafka::macro('testProducer', function () use ($expectedMessage) { - return $this->publish()->withMessage($expectedMessage); - }); + Kafka::macro('testProducer', fn () => $this->publish()->withMessage($expectedMessage)); Kafka::fake(); Kafka::testProducer()->send(); diff --git a/tests/LaravelKafkaTestCase.php b/tests/LaravelKafkaTestCase.php index c70d0eb8..9fbcadd8 100644 --- a/tests/LaravelKafkaTestCase.php +++ b/tests/LaravelKafkaTestCase.php @@ -8,6 +8,7 @@ use Junges\Kafka\Providers\LaravelKafkaServiceProvider; use Mockery as m; use Orchestra\Testbench\TestCase as Orchestra; +use Override; use RdKafka\Conf; use RdKafka\KafkaConsumer; use RdKafka\Message; @@ -16,6 +17,7 @@ abstract class LaravelKafkaTestCase extends Orchestra { + #[Override] protected function setUp(): void { parent::setUp(); @@ -57,9 +59,7 @@ protected function mockProducer(): void ->shouldReceive('produce') ->andReturn(); - $this->app->bind(Producer::class, function () use ($mockedProducer) { - return $mockedProducer->getMock(); - }); + $this->app->bind(Producer::class, fn () => $mockedProducer->getMock()); $this->mockKafkaProducer(); } @@ -82,9 +82,7 @@ protected function mockKafkaProducer(): void ->andReturn(RD_KAFKA_RESP_ERR_NO_ERROR) ->getMock(); - $this->app->bind(KafkaProducer::class, function () use ($mockedKafkaProducer) { - return $mockedKafkaProducer; - }); + $this->app->bind(KafkaProducer::class, fn () => $mockedKafkaProducer); } protected function mockConsumerWithMessageFailingCommit(Message $message): void @@ -99,9 +97,7 @@ protected function mockConsumerWithMessageFailingCommit(Message $message): void ->never() ->getMock(); - $this->app->bind(KafkaConsumer::class, function () use ($mockedKafkaConsumer) { - return $mockedKafkaConsumer; - }); + $this->app->bind(KafkaConsumer::class, fn () => $mockedKafkaConsumer); } protected function mockConsumerWithMessage(Message ...$message): void @@ -125,7 +121,6 @@ protected function getPropertyWithReflection(string $property, object $object): { $reflection = new ReflectionClass($object); $reflectionProperty = $reflection->getProperty($property); - $reflectionProperty->setAccessible(true); return $reflectionProperty->getValue($object); } diff --git a/tests/Message/MessageTest.php b/tests/Message/MessageTest.php index bed221d1..0f20db35 100644 --- a/tests/Message/MessageTest.php +++ b/tests/Message/MessageTest.php @@ -5,12 +5,14 @@ use Illuminate\Support\Str; use Junges\Kafka\Message\Message; use Junges\Kafka\Tests\LaravelKafkaTestCase; +use Override; use PHPUnit\Framework\Attributes\Test; final class MessageTest extends LaravelKafkaTestCase { private Message $message; + #[Override] protected function setUp(): void { parent::setUp(); diff --git a/tests/Message/Registry/AvroSchemaRegistryTest.php b/tests/Message/Registry/AvroSchemaRegistryTest.php index b5ba113e..b2f72c26 100644 --- a/tests/Message/Registry/AvroSchemaRegistryTest.php +++ b/tests/Message/Registry/AvroSchemaRegistryTest.php @@ -25,7 +25,6 @@ public function add_body_schema_mapping_for_topic(): void $registry->addBodySchemaMappingForTopic('test', $schema); $reflectionProperty = new ReflectionProperty($registry, 'schemaMapping'); - $reflectionProperty->setAccessible(true); $schemaMapping = $reflectionProperty->getValue($registry); @@ -46,7 +45,6 @@ public function add_key_schema_mapping_for_topic(): void $registry->addKeySchemaMappingForTopic('test2', $schema); $reflectionProperty = new ReflectionProperty($registry, 'schemaMapping'); - $reflectionProperty->setAccessible(true); $schemaMapping = $reflectionProperty->getValue($registry); From a36e3deb341f8575c3406f401e0f8d656ae98d6e Mon Sep 17 00:00:00 2001 From: Mateus Junges Date: Thu, 11 Sep 2025 17:10:45 -0300 Subject: [PATCH 6/6] wip --- rector.php | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/rector.php b/rector.php index 4d459280..529cb230 100644 --- a/rector.php +++ b/rector.php @@ -3,6 +3,7 @@ use Rector\CodeQuality\Rector\Class_\InlineConstructorDefaultToPropertyRector; use Rector\Config\RectorConfig; use Rector\Set\ValueObject\LevelSetList; +use Rector\TypeDeclaration\Rector\ClassMethod\ReturnNeverTypeRector; return static function (RectorConfig $rectorConfig): void { $rectorConfig->paths([ @@ -11,11 +12,13 @@ __DIR__.'/tests', ]); - // register a single rule $rectorConfig->rule(InlineConstructorDefaultToPropertyRector::class); - // define sets of rules $rectorConfig->sets([ LevelSetList::UP_TO_PHP_83, ]); + + $rectorConfig->skip([ + ReturnNeverTypeRector::class, + ]); };