Skip to content

Commit e21ecb0

Browse files
YQ-4735 Json format settings / skip jsons errors / to stable (ydb-platform#28564)
Co-authored-by: Pisarenko Grigoriy <grigoriypisar@ydb.tech>
1 parent 5951eb7 commit e21ecb0

File tree

17 files changed

+118
-42
lines changed

17 files changed

+118
-42
lines changed

ydb/core/external_sources/object_storage.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,10 @@ struct TObjectStorageExternalSource : public IExternalSource {
262262
continue;
263263
}
264264

265+
if (key == "skip.json.errors"sv) {
266+
continue;
267+
}
268+
265269
if (matchAllSettings) {
266270
issues.AddIssue(MakeErrorIssue(Ydb::StatusIds::BAD_REQUEST, "unknown format setting " + key));
267271
}

ydb/core/fq/libs/config/protos/row_dispatcher.proto

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ message TJsonParserConfig {
2323
uint64 BatchSizeBytes = 1; // default 1 MiB
2424
uint64 BatchCreationTimeoutMs = 2;
2525
uint64 BufferCellCount = 3; // (number rows) * (number columns) limit, default 10^6
26-
bool SkipErrors = 4;
2726
}
2827

2928
message TCompileServiceConfig {

ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -678,10 +678,10 @@ ITopicFormatHandler::TPtr CreateTopicFormatHandler(const NActors::TActorContext&
678678
return ITopicFormatHandler::TPtr(handler);
679679
}
680680

681-
TFormatHandlerConfig CreateFormatHandlerConfig(const NConfig::TRowDispatcherConfig& rowDispatcherConfig, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, NActors::TActorId compileServiceId) {
681+
TFormatHandlerConfig CreateFormatHandlerConfig(const NConfig::TRowDispatcherConfig& rowDispatcherConfig, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, NActors::TActorId compileServiceId, bool skipJsonErrors) {
682682
return {
683683
.FunctionRegistry = functionRegistry,
684-
.JsonParserConfig = CreateJsonParserConfig(rowDispatcherConfig.GetJsonParser(), functionRegistry),
684+
.JsonParserConfig = CreateJsonParserConfig(rowDispatcherConfig.GetJsonParser(), functionRegistry, skipJsonErrors),
685685
.FiltersConfig = {
686686
.CompileServiceId = compileServiceId
687687
}

ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ struct TFormatHandlerConfig {
8181
};
8282

8383
ITopicFormatHandler::TPtr CreateTopicFormatHandler(const NActors::TActorContext& owner, const TFormatHandlerConfig& config, const ITopicFormatHandler::TSettings& settings, const TCountersDesc& counters);
84-
TFormatHandlerConfig CreateFormatHandlerConfig(const NConfig::TRowDispatcherConfig& rowDispatcherConfig, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, NActors::TActorId compileServiceId);
84+
TFormatHandlerConfig CreateFormatHandlerConfig(const NConfig::TRowDispatcherConfig& rowDispatcherConfig, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, NActors::TActorId compileServiceId, bool skipJsonErrors);
8585

8686
namespace NTests {
8787

ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -736,7 +736,7 @@ TValueStatus<ITopicParser::TPtr> CreateJsonParser(IParsedDataConsumer::TPtr cons
736736
return ITopicParser::TPtr(parser);
737737
}
738738

739-
TJsonParserConfig CreateJsonParserConfig(const NConfig::TJsonParserConfig& parserConfig, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry) {
739+
TJsonParserConfig CreateJsonParserConfig(const NConfig::TJsonParserConfig& parserConfig, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, bool skipErrors) {
740740
TJsonParserConfig result = {.FunctionRegistry = functionRegistry};
741741
if (const auto batchSize = parserConfig.GetBatchSizeBytes()) {
742742
result.BatchSize = batchSize;
@@ -745,7 +745,7 @@ TJsonParserConfig CreateJsonParserConfig(const NConfig::TJsonParserConfig& parse
745745
result.BufferCellCount = bufferCellCount;
746746
}
747747
result.LatencyLimit = TDuration::MilliSeconds(parserConfig.GetBatchCreationTimeoutMs());
748-
result.SkipErrors = parserConfig.GetSkipErrors();
748+
result.SkipErrors = skipErrors;
749749
return result;
750750
}
751751

ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,6 @@ struct TJsonParserConfig {
2323
};
2424

2525
TValueStatus<ITopicParser::TPtr> CreateJsonParser(IParsedDataConsumer::TPtr consumer, const TJsonParserConfig& config, const TCountersDesc& counters);
26-
TJsonParserConfig CreateJsonParserConfig(const NConfig::TJsonParserConfig& parserConfig, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry);
26+
TJsonParserConfig CreateJsonParserConfig(const NConfig::TJsonParserConfig& parserConfig, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, bool skipErrors);
2727

2828
} // namespace NFq::NRowDispatcher

ydb/core/fq/libs/row_dispatcher/topic_session.cpp

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,8 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
263263
const NYql::IPqGateway::TPtr PqGateway;
264264
const std::shared_ptr<NYdb::ICredentialsProviderFactory> CredentialsProviderFactory;
265265
const NConfig::TRowDispatcherConfig Config;
266-
const TFormatHandlerConfig FormatHandlerConfig;
266+
const TActorId CompileServiceActorId;
267+
const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry;
267268
const i64 BufferSize;
268269
TString LogPrefix;
269270

@@ -281,6 +282,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
281282
ui64 QueuedBytes = 0;
282283
TMaybe<TString> ConsumerName;
283284
TInstant StartingMessageTimestamp;
285+
TMaybe<bool> SkipJsonErrors;
284286

285287
// Metrics
286288
TInstant WaitEventStartedAt;
@@ -401,7 +403,8 @@ TTopicSession::TTopicSession(
401403
, PqGateway(pqGateway)
402404
, CredentialsProviderFactory(credentialsProviderFactory)
403405
, Config(config)
404-
, FormatHandlerConfig(CreateFormatHandlerConfig(config, functionRegistry, compileServiceActorId))
406+
, CompileServiceActorId(compileServiceActorId)
407+
, FunctionRegistry(functionRegistry)
405408
, BufferSize(maxBufferSize)
406409
, LogPrefix("TopicSession")
407410
, Counters(counters)
@@ -767,9 +770,10 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
767770
auto clientInfo = Clients.insert({ev->Sender, MakeIntrusive<TClientsInfo>(*this, LogPrefix, handlerSettings, ev, Counters, ReadGroup, offset)}).first->second;
768771
auto formatIt = FormatHandlers.find(handlerSettings);
769772
if (formatIt == FormatHandlers.end()) {
773+
auto config = CreateFormatHandlerConfig(Config, FunctionRegistry, CompileServiceActorId, source.GetSkipJsonErrors());
770774
formatIt = FormatHandlers.emplace(handlerSettings, CreateTopicFormatHandler(
771775
ActorContext(),
772-
FormatHandlerConfig,
776+
config,
773777
handlerSettings,
774778
{.CountersRoot = CountersRoot, .CountersSubgroup = Metrics.PartitionGroup}
775779
)).first;
@@ -781,6 +785,7 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
781785
}
782786

783787
ConsumerName = source.GetConsumerName();
788+
SkipJsonErrors = source.GetSkipJsonErrors();
784789
SendStatistics();
785790
}
786791

@@ -965,6 +970,11 @@ bool TTopicSession::CheckNewClient(NFq::TEvRowDispatcher::TEvStartSession::TPtr&
965970
return false;
966971
}
967972

973+
if (SkipJsonErrors && SkipJsonErrors != source.GetSkipJsonErrors()) {
974+
LOG_ROW_DISPATCHER_INFO("Different skip json errors mode, expected " << SkipJsonErrors << ", actual " << source.GetSkipJsonErrors() << ", send error");
975+
SendSessionError(ev->Sender, TStatus::Fail(EStatusId::PRECONDITION_FAILED, TStringBuilder() << "Use the same skip json errors settings in all queries via RD (current mode " << SkipJsonErrors << ")"), false);
976+
return false;
977+
}
968978
return true;
969979
}
970980

ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,12 @@ class TFixture : public NTests::TBaseFixture {
4848
RowDispatcherActorId = Runtime.AllocateEdgeActor();
4949
}
5050

51-
void Init(const TString& topicPath, ui64 maxSessionUsedMemory = std::numeric_limits<ui64>::max(), bool skipErrors = false) {
51+
void Init(const TString& topicPath, ui64 maxSessionUsedMemory = std::numeric_limits<ui64>::max()) {
5252
TopicPath = topicPath;
5353
Config.SetTimeoutBeforeStartSessionSec(TimeoutBeforeStartSessionSec);
5454
Config.SetMaxSessionUsedMemory(maxSessionUsedMemory);
5555
Config.SetSendStatusPeriodSec(2);
5656
Config.SetWithoutConsumer(false);
57-
Config.MutableJsonParser()->SetSkipErrors(skipErrors);
5857
Config.MutableJsonParser()->SetBatchCreationTimeoutMs(100);
5958

6059
auto credFactory = NKikimr::CreateYdbCredentialsProviderFactory;
@@ -122,7 +121,7 @@ class TFixture : public NTests::TBaseFixture {
122121
}
123122
}
124123

125-
NYql::NPq::NProto::TDqPqTopicSource BuildSource(bool emptyPredicate = false, const TString& consumer = DefaultPqConsumer) {
124+
NYql::NPq::NProto::TDqPqTopicSource BuildSource(bool emptyPredicate = false, const TString& consumer = DefaultPqConsumer, bool skipErrors = false) {
126125
NYql::NPq::NProto::TDqPqTopicSource settings;
127126
settings.SetEndpoint(GetDefaultPqEndpoint());
128127
settings.SetTopicPath(TopicPath);
@@ -137,6 +136,7 @@ class TFixture : public NTests::TBaseFixture {
137136
if (!emptyPredicate) {
138137
settings.SetPredicate("TRUE");
139138
}
139+
settings.SetSkipJsonErrors(skipErrors);
140140
return settings;
141141
}
142142

@@ -648,8 +648,8 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) {
648648
Y_UNIT_TEST_F(WrongJson, TRealTopicFixture) {
649649
const TString topicName = "wrong_json";
650650
PQCreateStream(topicName);
651-
Init(topicName, std::numeric_limits<ui64>::max(), true);
652-
auto source = BuildSource();
651+
Init(topicName);
652+
auto source = BuildSource(false, DefaultPqConsumer, true);
653653
StartSession(ReadActorId1, source);
654654

655655
auto writeRead = [&](const std::vector<TString>& input, const TBatch& output) {
@@ -686,8 +686,8 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) {
686686
Y_UNIT_TEST_F(WrongJsonOffset, TRealTopicFixture) {
687687
const TString topicName = "wrong_json_offset";
688688
PQCreateStream(topicName);
689-
Init(topicName, std::numeric_limits<ui64>::max(), true);
690-
auto source = BuildSource();
689+
Init(topicName);
690+
auto source = BuildSource(false, DefaultPqConsumer, true);
691691
StartSession(ReadActorId1, source);
692692

693693
TString wrongJson{"wrong"};

ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -539,9 +539,8 @@ TDqPqRdReadActor::TDqPqRdReadActor(
539539
, CredentialsProviderFactory(std::move(credentialsProviderFactory))
540540
, MaxBufferSize(bufferSize)
541541
{
542-
543542
SRC_LOG_I("Start read actor, local row dispatcher " << LocalRowDispatcherActorId.ToString() << ", metadatafields: " << JoinSeq(',', SourceParams.GetMetadataFields())
544-
<< ", partitions: " << JoinSeq(',', GetPartitionsToRead()));
543+
<< ", partitions: " << JoinSeq(',', GetPartitionsToRead()) << ", skip json errors: " << SourceParams.GetSkipJsonErrors());
545544
if (Parent != this) {
546545
return;
547546
}

ydb/library/yql/providers/pq/common/yql_names.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,5 +18,6 @@ constexpr TStringBuf WatermarksLateArrivalDelayUsSetting = "WatermarksLateArriva
1818
constexpr TStringBuf WatermarksIdlePartitionsSetting = "WatermarksIdlePartitions";
1919
constexpr TStringBuf ReconnectPeriod = "ReconnectPeriod";
2020
constexpr TStringBuf ReadGroup = "ReadGroup";
21+
constexpr TStringBuf SkipJsonErrors = "SkipJsonErrors";
2122

2223
} // namespace NYql

0 commit comments

Comments
 (0)