Skip to content
This repository was archived by the owner on Jun 10, 2022. It is now read-only.
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/Config.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
* @method int getMetadataRequestTimeoutMs()
* @method int getMetadataRefreshIntervalMs()
* @method int getMetadataMaxAgeMs()
* @method bool getAutoCreateTopicsEnable()
* @method string getSecurityProtocol()
* @method bool getSslEnable()
* @method void setSslEnable(bool $sslEnable)
Expand Down Expand Up @@ -84,6 +85,7 @@ abstract class Config
'metadataRequestTimeoutMs' => 60000,
'metadataRefreshIntervalMs' => 300000,
'metadataMaxAgeMs' => -1,
'autoCreateTopicsEnable' => true,
'securityProtocol' => self::SECURITY_PROTOCOL_PLAINTEXT,
'sslEnable' => false, // this config item will override, don't config it.
'sslLocalCert' => '',
Expand Down Expand Up @@ -241,6 +243,11 @@ public function setMetadataMaxAgeMs(int $metadataMaxAgeMs): void
static::$options['metadataMaxAgeMs'] = $metadataMaxAgeMs;
}

public function setAutoCreateTopicsEnable(bool $flag = true): void
{
static::$options['autoCreateTopicsEnable'] = $flag;
}

/**
* @throws Exception\Config
*/
Expand Down
6 changes: 5 additions & 1 deletion src/Producer/RecordValidator.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
namespace Kafka\Producer;

use Kafka\Exception;
use Kafka\ProducerConfig;
use function is_string;
use function trim;

Expand All @@ -30,7 +31,10 @@ public function validate(array $record, array $topicList): void
throw Exception\InvalidRecordInSet::missingTopic();
}

if (! isset($topicList[$record['topic']])) {
/** @var ProducerConfig $config */
$config = ProducerConfig::getInstance();

if (! isset($topicList[$record['topic']]) && ! $config->getAutoCreateTopicsEnable()) {
throw Exception\InvalidRecordInSet::nonExististingTopic($record['topic']);
}

Expand Down
18 changes: 18 additions & 0 deletions tests/Base/Producer/RecordValidatorTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

use Kafka\Exception\InvalidRecordInSet;
use Kafka\Producer\RecordValidator;
use Kafka\ProducerConfig;
use PHPUnit\Framework\TestCase;

final class RecordValidatorTest extends TestCase
Expand All @@ -16,6 +17,10 @@ public function setUp(): void
{
$this->recordValidator = new RecordValidator();

/** @var ProducerConfig $config */
$config = ProducerConfig::getInstance();
$config->setAutoCreateTopicsEnable(false);

parent::setUp();
}

Expand Down Expand Up @@ -61,4 +66,17 @@ public function invalidRecordThrowsExceptionDataProvider(): array
],
];
}

/**
* @doesNotPerformAssertions
* @phpcsSuppress SlevomatCodingStandard.TypeHints.TypeHintDeclaration.UselessDocComment
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please don't suppress CS violations

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I took it from the test above

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ohh I get it, it's because we're using an old version of doctrine/coding-standard. Let's keep it for now and I created #216 to handle that. Thanks!

*/
public function testValidateNonExistingTopicWhenAutoCreateTopicsEnabled(): void
{
/** @var ProducerConfig $config */
$config = ProducerConfig::getInstance();
$config->setAutoCreateTopicsEnable(true);

$this->recordValidator->validate(['topic' => 'test', 'value' => 'a value'], []);
}
}
2 changes: 2 additions & 0 deletions tests/Functional/Ticket/GH181Test.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ public function setUp(): void
);
}

/** @var ProducerConfig $config */
$config = ProducerConfig::getInstance();
$config->setMetadataBrokerList($brokers);
$config->setBrokerVersion($version);
$config->setAutoCreateTopicsEnable(false);

parent::setUp();
}
Expand Down