Skip to content

Commit 4d25135

Browse files
kardymonds and GrigoriyPA
authored and committed
YQ-4735 Json format settings / skip jsons errors (ydb-platform#28329)
Co-authored-by: Pisarenko Grigoriy <grigoriypisar@ydb.tech>
1 parent 798efb5 commit 4d25135

File tree

20 files changed

+148
-44
lines changed

20 files changed

+148
-44
lines changed

ydb/core/external_sources/object_storage.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,10 @@ struct TObjectStorageExternalSource : public IExternalSource {
272272
continue;
273273
}
274274

275+
if (key == "skip.json.errors"sv) {
276+
continue;
277+
}
278+
275279
if (matchAllSettings) {
276280
issues.AddIssue(MakeErrorIssue(Ydb::StatusIds::BAD_REQUEST, "unknown format setting " + key));
277281
}

ydb/core/fq/libs/config/protos/row_dispatcher.proto

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ message TJsonParserConfig {
2323
uint64 BatchSizeBytes = 1; // default 1 MiB
2424
uint64 BatchCreationTimeoutMs = 2;
2525
uint64 BufferCellCount = 3; // (number rows) * (number columns) limit, default 10^6
26-
bool SkipErrors = 4;
2726
}
2827

2928
message TCompileServiceConfig {

ydb/core/fq/libs/row_dispatcher/common/row_dispatcher_settings.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ namespace NFq {
99

1010
TRowDispatcherSettings::TJsonParserSettings::TJsonParserSettings(const NConfig::TJsonParserConfig& config)
1111
: BatchCreationTimeout(TDuration::MilliSeconds(config.GetBatchCreationTimeoutMs()))
12-
, SkipErrors(config.GetSkipErrors())
1312
{
1413
if (config.GetBatchSizeBytes()) {
1514
BatchSizeBytes = config.GetBatchSizeBytes();

ydb/core/fq/libs/row_dispatcher/common/row_dispatcher_settings.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ class TRowDispatcherSettings {
3333
YDB_ACCESSOR(ui64, BatchSizeBytes, 1_MB);
3434
YDB_ACCESSOR(TDuration, BatchCreationTimeout, TDuration::Seconds(1));
3535
YDB_ACCESSOR(ui64, BufferCellCount, 1000'000);
36-
YDB_ACCESSOR(bool, SkipErrors, false);
3736
};
3837

3938
class TCompileServiceSettings {

ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -677,10 +677,10 @@ ITopicFormatHandler::TPtr CreateTopicFormatHandler(const NActors::TActorContext&
677677
return ITopicFormatHandler::TPtr(handler);
678678
}
679679

680-
TFormatHandlerConfig CreateFormatHandlerConfig(const TRowDispatcherSettings& rowDispatcherConfig, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, NActors::TActorId compileServiceId) {
680+
TFormatHandlerConfig CreateFormatHandlerConfig(const TRowDispatcherSettings& rowDispatcherConfig, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, NActors::TActorId compileServiceId, bool skipJsonErrors) {
681681
return {
682682
.FunctionRegistry = functionRegistry,
683-
.JsonParserConfig = CreateJsonParserConfig(rowDispatcherConfig.GetJsonParser(), functionRegistry),
683+
.JsonParserConfig = CreateJsonParserConfig(rowDispatcherConfig.GetJsonParser(), functionRegistry, skipJsonErrors),
684684
.FiltersConfig = {
685685
.CompileServiceId = compileServiceId
686686
}

ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ struct TFormatHandlerConfig {
7878
};
7979

8080
ITopicFormatHandler::TPtr CreateTopicFormatHandler(const NActors::TActorContext& owner, const TFormatHandlerConfig& config, const ITopicFormatHandler::TSettings& settings, const TCountersDesc& counters);
81-
TFormatHandlerConfig CreateFormatHandlerConfig(const TRowDispatcherSettings& rowDispatcherConfig, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, NActors::TActorId compileServiceId);
81+
TFormatHandlerConfig CreateFormatHandlerConfig(const TRowDispatcherSettings& rowDispatcherConfig, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, NActors::TActorId compileServiceId, bool skipJsonErrors);
8282

8383
namespace NTests {
8484

ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -743,13 +743,13 @@ TValueStatus<ITopicParser::TPtr> CreateJsonParser(IParsedDataConsumer::TPtr cons
743743
return ITopicParser::TPtr(parser);
744744
}
745745

746-
TJsonParserConfig CreateJsonParserConfig(const TRowDispatcherSettings::TJsonParserSettings& parserConfig, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry) {
746+
TJsonParserConfig CreateJsonParserConfig(const TRowDispatcherSettings::TJsonParserSettings& parserConfig, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, bool skipErrors) {
747747
return {
748748
.FunctionRegistry = functionRegistry,
749749
.BatchSize = parserConfig.GetBatchSizeBytes(),
750750
.LatencyLimit = parserConfig.GetBatchCreationTimeout(),
751751
.BufferCellCount = parserConfig.GetBufferCellCount(),
752-
.SkipErrors = parserConfig.GetSkipErrors()
752+
.SkipErrors = skipErrors
753753
};
754754
}
755755

ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,6 @@ struct TJsonParserConfig {
2121
};
2222

2323
TValueStatus<ITopicParser::TPtr> CreateJsonParser(IParsedDataConsumer::TPtr consumer, const TJsonParserConfig& config, const TCountersDesc& counters);
24-
TJsonParserConfig CreateJsonParserConfig(const TRowDispatcherSettings::TJsonParserSettings& parserConfig, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry);
24+
TJsonParserConfig CreateJsonParserConfig(const TRowDispatcherSettings::TJsonParserSettings& parserConfig, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, bool skipErrors);
2525

2626
} // namespace NFq::NRowDispatcher

ydb/core/fq/libs/row_dispatcher/topic_session.cpp

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,8 @@ class TTopicSession : public TActorBootstrapped<TTopicSession>, NYql::TTopicEven
267267
const NYql::IPqGateway::TPtr PqGateway;
268268
const std::shared_ptr<NYdb::ICredentialsProviderFactory> CredentialsProviderFactory;
269269
const TRowDispatcherSettings Config;
270-
const TFormatHandlerConfig FormatHandlerConfig;
270+
const TActorId CompileServiceActorId;
271+
const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry;
271272
const i64 BufferSize;
272273
TString LogPrefix;
273274

@@ -285,6 +286,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession>, NYql::TTopicEven
285286
ui64 QueuedBytes = 0;
286287
TMaybe<TString> ConsumerName;
287288
TInstant StartingMessageTimestamp;
289+
TMaybe<bool> SkipJsonErrors;
288290

289291
// Metrics
290292
TInstant WaitEventStartedAt;
@@ -407,7 +409,8 @@ TTopicSession::TTopicSession(
407409
, PqGateway(pqGateway)
408410
, CredentialsProviderFactory(credentialsProviderFactory)
409411
, Config(config)
410-
, FormatHandlerConfig(CreateFormatHandlerConfig(config, functionRegistry, compileServiceActorId))
412+
, CompileServiceActorId(compileServiceActorId)
413+
, FunctionRegistry(functionRegistry)
411414
, BufferSize(maxBufferSize)
412415
, LogPrefix("TopicSession")
413416
, Counters(counters)
@@ -783,9 +786,10 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
783786
auto clientInfo = Clients.insert({ev->Sender, MakeIntrusive<TClientsInfo>(*this, LogPrefix, handlerSettings, ev, Counters, ReadGroup, offset)}).first->second;
784787
auto formatIt = FormatHandlers.find(handlerSettings);
785788
if (formatIt == FormatHandlers.end()) {
789+
auto config = CreateFormatHandlerConfig(Config, FunctionRegistry, CompileServiceActorId, source.GetSkipJsonErrors());
786790
formatIt = FormatHandlers.emplace(handlerSettings, CreateTopicFormatHandler(
787791
ActorContext(),
788-
FormatHandlerConfig,
792+
config,
789793
handlerSettings,
790794
{.CountersRoot = CountersRoot, .CountersSubgroup = Metrics.PartitionGroup}
791795
)).first;
@@ -797,6 +801,7 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
797801
}
798802

799803
ConsumerName = source.GetConsumerName();
804+
SkipJsonErrors = source.GetSkipJsonErrors();
800805
SendStatistics();
801806
}
802807

@@ -981,6 +986,11 @@ bool TTopicSession::CheckNewClient(NFq::TEvRowDispatcher::TEvStartSession::TPtr&
981986
return false;
982987
}
983988

989+
if (SkipJsonErrors && SkipJsonErrors != source.GetSkipJsonErrors()) {
990+
LOG_ROW_DISPATCHER_INFO("Different skip json errors mode, expected " << SkipJsonErrors << ", actual " << source.GetSkipJsonErrors() << ", send error");
991+
SendSessionError(ev->Sender, TStatus::Fail(EStatusId::PRECONDITION_FAILED, TStringBuilder() << "Use the same skip json errors settings in all queries via RD (current mode " << SkipJsonErrors << ")"), false);
992+
return false;
993+
}
984994
return true;
985995
}
986996

ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -49,13 +49,12 @@ class TFixture : public NTests::TBaseFixture {
4949
RowDispatcherActorId = Runtime.AllocateEdgeActor();
5050
}
5151

52-
void Init(const TString& topicPath, ui64 maxSessionUsedMemory = std::numeric_limits<ui64>::max(), bool skipErrors = false) {
52+
void Init(const TString& topicPath, ui64 maxSessionUsedMemory = std::numeric_limits<ui64>::max()) {
5353
TopicPath = topicPath;
5454
Config.SetTimeoutBeforeStartSessionSec(TimeoutBeforeStartSessionSec);
5555
Config.SetMaxSessionUsedMemory(maxSessionUsedMemory);
5656
Config.SetSendStatusPeriodSec(2);
5757
Config.SetWithoutConsumer(false);
58-
Config.MutableJsonParser()->SetSkipErrors(skipErrors);
5958
Config.MutableJsonParser()->SetBatchCreationTimeoutMs(100);
6059

6160
auto credFactory = NKikimr::CreateYdbCredentialsProviderFactory;
@@ -124,7 +123,7 @@ class TFixture : public NTests::TBaseFixture {
124123
}
125124
}
126125

127-
NYql::NPq::NProto::TDqPqTopicSource BuildSource(bool emptyPredicate = false, const TString& consumer = DefaultPqConsumer) {
126+
NYql::NPq::NProto::TDqPqTopicSource BuildSource(bool emptyPredicate = false, const TString& consumer = DefaultPqConsumer, bool skipErrors = false) {
128127
NYql::NPq::NProto::TDqPqTopicSource settings;
129128
settings.SetEndpoint(GetDefaultPqEndpoint());
130129
settings.SetTopicPath(TopicPath);
@@ -139,6 +138,7 @@ class TFixture : public NTests::TBaseFixture {
139138
if (!emptyPredicate) {
140139
settings.SetPredicate("TRUE");
141140
}
141+
settings.SetSkipJsonErrors(skipErrors);
142142
return settings;
143143
}
144144

@@ -631,8 +631,8 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) {
631631
Y_UNIT_TEST_F(WrongJson, TRealTopicFixture) {
632632
const TString topicName = "wrong_json";
633633
PQCreateStream(topicName);
634-
Init(topicName, std::numeric_limits<ui64>::max(), true);
635-
auto source = BuildSource();
634+
Init(topicName);
635+
auto source = BuildSource(false, DefaultPqConsumer, true);
636636
StartSession(ReadActorId1, source);
637637

638638
auto writeRead = [&](const std::vector<TString>& input, const TBatch& output) {
@@ -669,8 +669,8 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) {
669669
Y_UNIT_TEST_F(WrongJsonOffset, TRealTopicFixture) {
670670
const TString topicName = "wrong_json_offset";
671671
PQCreateStream(topicName);
672-
Init(topicName, std::numeric_limits<ui64>::max(), true);
673-
auto source = BuildSource();
672+
Init(topicName);
673+
auto source = BuildSource(false, DefaultPqConsumer, true);
674674
StartSession(ReadActorId1, source);
675675

676676
TString wrongJson{"wrong"};

0 commit comments

Comments
 (0)