From 6d1a262ddbca5cd631a0dd5e51d2ebcd1b50bf00 Mon Sep 17 00:00:00 2001 From: Dmitry Kardymon Date: Thu, 13 Nov 2025 17:11:15 +0300 Subject: [PATCH 1/7] YQ-4848 Rebalance partitions after new nodes connected (#28397) Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- ydb/core/fq/libs/actors/nodes_manager.cpp | 2 +- .../libs/config/protos/row_dispatcher.proto | 3 +- .../common/row_dispatcher_settings.cpp | 1 + .../common/row_dispatcher_settings.h | 1 + .../fq/libs/row_dispatcher/coordinator.cpp | 168 +++++++++++++++--- .../libs/row_dispatcher/events/data_plane.h | 6 + .../libs/row_dispatcher/protos/events.proto | 3 + .../libs/row_dispatcher/ut/coordinator_ut.cpp | 54 +++++- .../pq/async_io/dq_pq_rd_read_actor.cpp | 13 ++ .../pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp | 17 ++ ydb/tests/fq/yds/test_row_dispatcher.py | 46 ++++- ydb/tests/tools/fq_runner/kikimr_runner.py | 7 +- 12 files changed, 285 insertions(+), 36 deletions(-) diff --git a/ydb/core/fq/libs/actors/nodes_manager.cpp b/ydb/core/fq/libs/actors/nodes_manager.cpp index 3bd981829c3d..9bfdba23dbfe 100644 --- a/ydb/core/fq/libs/actors/nodes_manager.cpp +++ b/ydb/core/fq/libs/actors/nodes_manager.cpp @@ -297,7 +297,7 @@ class TNodesManagerActor : public NActors::TActorBootstrapped(); response->NodeIds.reserve(Peers.size()); for (const auto& info : Peers) { diff --git a/ydb/core/fq/libs/config/protos/row_dispatcher.proto b/ydb/core/fq/libs/config/protos/row_dispatcher.proto index e05dd9b82edc..68c0e3619cac 100644 --- a/ydb/core/fq/libs/config/protos/row_dispatcher.proto +++ b/ydb/core/fq/libs/config/protos/row_dispatcher.proto @@ -16,7 +16,8 @@ message TRowDispatcherCoordinatorConfig { // Topic partitions will be distributed uniformly up to TopicPartitionsLimitPerNode // if (number nodes) * TopicPartitionsLimitPerNode < (number topic partitions) // Request will hang up infinitely, disabled by default - uint64 TopicPartitionsLimitPerNode = 4; + uint64 TopicPartitionsLimitPerNode = 4; // deprecated + uint64 RebalancingTimeoutSec = 5; // Automatic rebalancing partition after new nodes connected / nodes disconnected. } message TJsonParserConfig { diff --git a/ydb/core/fq/libs/row_dispatcher/common/row_dispatcher_settings.cpp b/ydb/core/fq/libs/row_dispatcher/common/row_dispatcher_settings.cpp index 4bfa8dff23a3..dd2fcdde46fb 100644 --- a/ydb/core/fq/libs/row_dispatcher/common/row_dispatcher_settings.cpp +++ b/ydb/core/fq/libs/row_dispatcher/common/row_dispatcher_settings.cpp @@ -28,6 +28,7 @@ TRowDispatcherSettings::TCoordinatorSettings::TCoordinatorSettings(const NConfig : LocalMode(config.GetLocalMode()) , Database(config.GetDatabase()) , CoordinationNodePath(config.GetCoordinationNodePath()) + , RebalancingTimeout(TDuration::Seconds(config.GetRebalancingTimeoutSec())) {} TRowDispatcherSettings::TCoordinatorSettings::TCoordinatorSettings(const NKikimrConfig::TStreamingQueriesConfig::TExternalStorageConfig& config) diff --git a/ydb/core/fq/libs/row_dispatcher/common/row_dispatcher_settings.h b/ydb/core/fq/libs/row_dispatcher/common/row_dispatcher_settings.h index d8eadaec9843..9c733c44588f 100644 --- a/ydb/core/fq/libs/row_dispatcher/common/row_dispatcher_settings.h +++ b/ydb/core/fq/libs/row_dispatcher/common/row_dispatcher_settings.h @@ -54,6 +54,7 @@ class TRowDispatcherSettings { YDB_ACCESSOR(bool, LocalMode, false); YDB_ACCESSOR_MUTABLE(TExternalStorageSettings, Database, {}); YDB_ACCESSOR_DEF(TString, CoordinationNodePath); + YDB_ACCESSOR(TDuration, RebalancingTimeout, TDuration::Seconds(120)); }; enum class EConsumerMode { diff --git a/ydb/core/fq/libs/row_dispatcher/coordinator.cpp b/ydb/core/fq/libs/row_dispatcher/coordinator.cpp index 7d03ecc108bf..0be5c465b6ac 100644 --- a/ydb/core/fq/libs/row_dispatcher/coordinator.cpp +++ b/ydb/core/fq/libs/row_dispatcher/coordinator.cpp @@ -26,6 +26,8 @@ using NYql::TIssues; namespace { +const ui64 DefaultRebalancingTimeoutSec = 120; + //////////////////////////////////////////////////////////////////////////////// struct TCoordinatorMetrics { @@ -33,14 +35,14 @@ struct TCoordinatorMetrics { : Counters(counters) { IncomingRequests = Counters->GetCounter("IncomingRequests", true); LeaderChanged = Counters->GetCounter("LeaderChanged", true); - PartitionsLimitPerNode = Counters->GetCounter("PartitionsLimitPerNode"); + KnownRowDispatchers = Counters->GetCounter("KnownRowDispatchers"); } ::NMonitoring::TDynamicCounterPtr Counters; ::NMonitoring::TDynamicCounters::TCounterPtr IncomingRequests; ::NMonitoring::TDynamicCounters::TCounterPtr LeaderChanged; ::NMonitoring::TDynamicCounters::TCounterPtr IsActive; - ::NMonitoring::TDynamicCounters::TCounterPtr PartitionsLimitPerNode; + ::NMonitoring::TDynamicCounters::TCounterPtr KnownRowDispatchers; }; struct TEvPrivate { @@ -48,18 +50,22 @@ struct TEvPrivate { EvBegin = EventSpaceBegin(NActors::TEvents::ES_PRIVATE), EvPrintState = EvBegin, EvListNodes, + EvRebalancing, + EvStartingTimeout, EvEnd }; static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)"); struct TEvPrintState : public NActors::TEventLocal {}; struct TEvListNodes : public NActors::TEventLocal {}; + struct TEvRebalancing : public NActors::TEventLocal {}; + struct TEvStartingTimeout : public NActors::TEventLocal {}; }; class TActorCoordinator : public TActorBootstrapped { static constexpr ui64 PrintStatePeriodSec = 300; static constexpr ui64 PrintStateToLogSplitSize = 64000; - static constexpr TDuration NodesManagerRetryPeriod = TDuration::Seconds(1); + static constexpr TDuration NodesManagerRetryPeriod = TDuration::Seconds(10); struct TTopicKey { TString Endpoint; @@ -106,11 +112,18 @@ class TActorCoordinator : public TActorBootstrapped { } }; - struct RowDispatcherInfo { - RowDispatcherInfo(bool connected, bool isLocal) + enum class ENodeState { + Initializing, // wait timeout after connected + Started + }; + + struct TRowDispatcherInfo { + TRowDispatcherInfo(bool connected, ENodeState state, bool isLocal) : Connected(connected) + , State(state) , IsLocal(isLocal) {} bool Connected = false; + ENodeState State; bool IsLocal = false; THashSet Locations; }; @@ -182,14 +195,18 @@ class TActorCoordinator : public TActorBootstrapped { TActorId LocalRowDispatcherId; const TString LogPrefix; const TString Tenant; - TMap RowDispatchers; + TMap RowDispatchers; THashMap PartitionLocations; THashMap TopicsInfo; std::unordered_map PendingReadActors; + std::unordered_set KnownReadActors; TCoordinatorMetrics Metrics; THashSet InterconnectSessions; ui64 NodesCount = 0; NActors::TActorId NodesManagerId; + bool RebalancingScheduled = false; + ENodeState State = ENodeState::Initializing; + TDuration RebalancingTimeout; public: TActorCoordinator( @@ -211,6 +228,8 @@ class TActorCoordinator : public TActorBootstrapped { void Handle(NFq::TEvRowDispatcher::TEvCoordinatorRequest::TPtr& ev); void Handle(TEvPrivate::TEvPrintState::TPtr&); void Handle(TEvPrivate::TEvListNodes::TPtr&); + void Handle(TEvPrivate::TEvRebalancing::TPtr&); + void Handle(TEvPrivate::TEvStartingTimeout::TPtr&); void Handle(NKikimr::TEvTenantNodeEnumerator::TEvLookupResult::TPtr&); void Handle(NFq::TEvNodesManager::TEvGetNodesResponse::TPtr&); @@ -224,13 +243,15 @@ class TActorCoordinator : public TActorBootstrapped { hFunc(NFq::TEvRowDispatcher::TEvCoordinatorRequest, Handle); hFunc(TEvPrivate::TEvPrintState, Handle); hFunc(TEvPrivate::TEvListNodes, Handle); + hFunc(TEvPrivate::TEvRebalancing, Handle); + hFunc(TEvPrivate::TEvStartingTimeout, Handle); hFunc(NKikimr::TEvTenantNodeEnumerator::TEvLookupResult, Handle); hFunc(NFq::TEvNodesManager::TEvGetNodesResponse, Handle); }) private: - void AddRowDispatcher(NActors::TActorId actorId, bool isLocal); + void UpdateKnownRowDispatchers(NActors::TActorId actorId, bool isLocal); void PrintInternalState(); TTopicInfo& GetOrCreateTopicInfo(const TTopicKey& topic); std::optional GetAndUpdateLocation(const TPartitionKey& key, const TSet& filteredNodeIds); // std::nullopt if TopicPartitionsLimitPerNode reached @@ -238,9 +259,10 @@ class TActorCoordinator : public TActorBootstrapped { void UpdatePendingReadActors(); void UpdateInterconnectSessions(const NActors::TActorId& interconnectSession); TString GetInternalState(); - bool IsReady() const; + bool IsReadyPartitionDistribution() const; void SendError(TActorId readActorId, const TCoordinatorRequest& request, const TString& message); void ScheduleNodeInfoRequest() const; + void UpdateGlobalState(); }; TActorCoordinator::TActorCoordinator( @@ -255,21 +277,27 @@ TActorCoordinator::TActorCoordinator( , Tenant(tenant) , Metrics(counters) , NodesManagerId(nodesManagerId) + , RebalancingTimeout(Config.GetRebalancingTimeout() ? Config.GetRebalancingTimeout() : TDuration::Seconds(DefaultRebalancingTimeoutSec)) { - AddRowDispatcher(localRowDispatcherId, true); + UpdateKnownRowDispatchers(localRowDispatcherId, true); } void TActorCoordinator::Bootstrap() { Become(&TActorCoordinator::StateFunc); Send(LocalRowDispatcherId, new NFq::TEvRowDispatcher::TEvCoordinatorChangesSubscribe()); ScheduleNodeInfoRequest(); + Schedule(RebalancingTimeout, new TEvPrivate::TEvStartingTimeout()); // Schedule(TDuration::Seconds(PrintStatePeriodSec), new TEvPrivate::TEvPrintState()); // Logs (InternalState) is too big - LOG_ROW_DISPATCHER_DEBUG("Successfully bootstrapped coordinator, id " << SelfId() << ", NodesManagerId " << NodesManagerId); + LOG_ROW_DISPATCHER_DEBUG("Successfully bootstrapped coordinator, id " << SelfId() + << ", NodesManagerId " << NodesManagerId + << ", rebalancing timeout " << RebalancingTimeout); auto nodeGroup = Metrics.Counters->GetSubgroup("node", ToString(SelfId().NodeId())); Metrics.IsActive = nodeGroup->GetCounter("IsActive"); } -void TActorCoordinator::AddRowDispatcher(NActors::TActorId actorId, bool isLocal) { +void TActorCoordinator::UpdateKnownRowDispatchers(NActors::TActorId actorId, bool isLocal) { + LOG_ROW_DISPATCHER_TRACE("UpdateKnownRowDispatchers " << actorId.ToString()); + auto it = RowDispatchers.find(actorId); if (it != RowDispatchers.end()) { it->second.Connected = true; @@ -293,9 +321,23 @@ void TActorCoordinator::AddRowDispatcher(NActors::TActorId actorId, bool isLocal UpdatePendingReadActors(); return; } + auto nodeState = State == ENodeState::Initializing ? ENodeState::Started : ENodeState::Initializing; + if (PartitionLocations.empty()) { + nodeState = ENodeState::Started; + } + + LOG_ROW_DISPATCHER_TRACE("Add new row dispatcher to map (state " << static_cast(nodeState) << ")"); + RowDispatchers.emplace(actorId, TRowDispatcherInfo{true, nodeState, isLocal}); + UpdateGlobalState(); + + if (nodeState == ENodeState::Initializing && !RebalancingScheduled) { + LOG_ROW_DISPATCHER_TRACE("Schedule TEvRebalancing"); + Schedule(RebalancingTimeout, new TEvPrivate::TEvRebalancing()); + RebalancingScheduled = true; + } - RowDispatchers.emplace(actorId, RowDispatcherInfo{true, isLocal}); UpdatePendingReadActors(); + Metrics.KnownRowDispatchers->Set(RowDispatchers.size()); } void TActorCoordinator::UpdateInterconnectSessions(const NActors::TActorId& interconnectSession) { @@ -313,7 +355,7 @@ void TActorCoordinator::UpdateInterconnectSessions(const NActors::TActorId& inte void TActorCoordinator::Handle(NActors::TEvents::TEvPing::TPtr& ev) { LOG_ROW_DISPATCHER_TRACE("TEvPing received, " << ev->Sender); UpdateInterconnectSessions(ev->InterconnectSession); - AddRowDispatcher(ev->Sender, false); + UpdateKnownRowDispatchers(ev->Sender, false); LOG_ROW_DISPATCHER_TRACE("Send TEvPong to " << ev->Sender); Send(ev->Sender, new NActors::TEvents::TEvPong(), IEventHandle::FlagTrackDelivery); } @@ -323,7 +365,7 @@ TString TActorCoordinator::GetInternalState() { str << "Known row dispatchers:\n"; for (const auto& [actorId, info] : RowDispatchers) { - str << " " << actorId << ", connected " << info.Connected << "\n"; + str << " " << actorId << ", state " << static_cast(info.State) << "\n"; } str << "\nLocations:\n"; @@ -361,6 +403,12 @@ void TActorCoordinator::HandleDisconnected(TEvInterconnect::TEvNodeDisconnected: } Y_ENSURE(!info.IsLocal, "EvNodeDisconnected from local row dispatcher"); info.Connected = false; + + if (!RebalancingScheduled) { + LOG_ROW_DISPATCHER_TRACE("Schedule TEvRebalancing"); + Schedule(RebalancingTimeout, new TEvPrivate::TEvRebalancing()); + RebalancingScheduled = true; + } } } @@ -378,6 +426,12 @@ void TActorCoordinator::Handle(NActors::TEvents::TEvUndelivered::TPtr& ev) { continue; } info.Connected = false; + + if (!RebalancingScheduled) { + LOG_ROW_DISPATCHER_TRACE("Schedule TEvRebalancing"); + Schedule(RebalancingTimeout, new TEvPrivate::TEvRebalancing()); + RebalancingScheduled = true; + } return; } } @@ -403,7 +457,7 @@ std::optional TActorCoordinator::GetAndUpdateLocation(const TPartition auto& topicInfo = GetOrCreateTopicInfo(key.Topic); - if (!IsReady()) { + if (!IsReadyPartitionDistribution()) { topicInfo.AddPendingPartition(key); return std::nullopt; } @@ -411,7 +465,7 @@ std::optional TActorCoordinator::GetAndUpdateLocation(const TPartition TActorId bestLocation; ui64 bestNumberPartitions = std::numeric_limits::max(); for (auto& [location, info] : RowDispatchers) { - if (!info.Connected) { + if (info.State != ENodeState::Started) { continue; } if (!filteredNodeIds.empty() && !filteredNodeIds.contains(location.NodeId())) { @@ -445,6 +499,8 @@ std::optional TActorCoordinator::GetAndUpdateLocation(const TPartition void TActorCoordinator::Handle(NFq::TEvRowDispatcher::TEvCoordinatorRequest::TPtr& ev) { const auto& source = ev->Get()->Record.GetSource(); + KnownReadActors.insert(ev->Sender); + UpdateInterconnectSessions(ev->InterconnectSession); TStringStream str; @@ -509,7 +565,7 @@ bool TActorCoordinator::ComputeCoordinatorRequest(TActorId readActorId, const TC } void TActorCoordinator::UpdatePendingReadActors() { - if (!IsReady()) { + if (!IsReadyPartitionDistribution()) { return; } for (auto readActorIt = PendingReadActors.begin(); readActorIt != PendingReadActors.end();) { @@ -534,6 +590,7 @@ void TActorCoordinator::Handle(NKikimr::TEvTenantNodeEnumerator::TEvLookupResult } LOG_ROW_DISPATCHER_INFO("Updated node info, node count: " << ev->Get()->AssignedNodes.size() << ", AssignedNodes: " << JoinSeq(", ", ev->Get()->AssignedNodes)); NodesCount = ev->Get()->AssignedNodes.size(); + UpdateGlobalState(); UpdatePendingReadActors(); } @@ -547,14 +604,74 @@ void TActorCoordinator::Handle(TEvPrivate::TEvListNodes::TPtr&) { } } -bool TActorCoordinator::IsReady() const { +void TActorCoordinator::Handle(TEvPrivate::TEvRebalancing::TPtr&) { + LOG_ROW_DISPATCHER_DEBUG("Rebalancing..."); + RebalancingScheduled = false; + + bool needRebalance = false; + TSet toDelete; + + auto printState = [&](const TString& str){ + LOG_ROW_DISPATCHER_DEBUG(str); + for (auto& [actorId, info] : RowDispatchers) { + LOG_ROW_DISPATCHER_DEBUG(" node " << actorId.NodeId() << " (" << actorId << ") state " << (info.State == ENodeState::Initializing ? "Initializing" : "Started") << " connected " << info.Connected << " partitions count " << info.Locations.size()); + } + }; + + printState("Current state (rebalancing):"); + + for (auto& [actorId, info] : RowDispatchers) { + if (info.State == ENodeState::Initializing) { + if (info.Connected) { + info.State = ENodeState::Started; + needRebalance = true; + } else { + toDelete.insert(actorId); + } + } else { // Started + if (!info.Connected) { + toDelete.insert(actorId); + if (!info.Locations.empty()) { + needRebalance = true; + } + } + } + } + for (const auto& actorId : toDelete) { + RowDispatchers.erase(actorId); + } + if (!needRebalance) { + return; + } + + for (const auto& readActorId : KnownReadActors) { + LOG_ROW_DISPATCHER_TRACE("Send TEvCoordinatorDistributionReset to " << readActorId); + Send(readActorId, new TEvRowDispatcher::TEvCoordinatorDistributionReset(), IEventHandle::FlagTrackDelivery); + } + + for (auto& [actorId, info] : RowDispatchers) { + info.Locations.clear(); + } + PendingReadActors.clear(); + PartitionLocations.clear(); + TopicsInfo.clear(); + KnownReadActors.clear(); + + printState("Current state (after rebalancing):"); +} + +void TActorCoordinator::Handle(TEvPrivate::TEvStartingTimeout::TPtr&) { + if (State != ENodeState::Started) { + LOG_ROW_DISPATCHER_TRACE("Change global state to Started (by timeout)"); + State = ENodeState::Started; + } +} + +bool TActorCoordinator::IsReadyPartitionDistribution() const { if (Config.GetLocalMode()) { return true; } - if (!NodesCount) { - return false; - } - return RowDispatchers.size() >= NodesCount - 1; + return State == ENodeState::Started; } void TActorCoordinator::SendError(TActorId readActorId, const TCoordinatorRequest& request, const TString& message) { @@ -571,12 +688,19 @@ void TActorCoordinator::ScheduleNodeInfoRequest() const { void TActorCoordinator::Handle(NFq::TEvNodesManager::TEvGetNodesResponse::TPtr& ev) { NodesCount = ev->Get()->NodeIds.size(); LOG_ROW_DISPATCHER_INFO("Updated node info, node count: " << NodesCount); + UpdateGlobalState(); if (!NodesCount) { ScheduleNodeInfoRequest(); } UpdatePendingReadActors(); } +void TActorCoordinator::UpdateGlobalState() { + if (State != ENodeState::Started && NodesCount && RowDispatchers.size() >= NodesCount) { + LOG_ROW_DISPATCHER_TRACE("Change global state to Started (by nodes count)"); + State = ENodeState::Started; + } +} } // anonymous namespace //////////////////////////////////////////////////////////////////////////////// diff --git a/ydb/core/fq/libs/row_dispatcher/events/data_plane.h b/ydb/core/fq/libs/row_dispatcher/events/data_plane.h index 17474e0d2498..39bd6cf33db0 100644 --- a/ydb/core/fq/libs/row_dispatcher/events/data_plane.h +++ b/ydb/core/fq/libs/row_dispatcher/events/data_plane.h @@ -59,6 +59,7 @@ struct TEvRowDispatcher { EvPurecalcCompileRequest, EvPurecalcCompileResponse, EvPurecalcCompileAbort, + EvCoordinatorDistributionReset, EvEnd, }; @@ -91,6 +92,11 @@ struct TEvRowDispatcher { TEvCoordinatorResult() = default; }; + struct TEvCoordinatorDistributionReset : public NActors::TEventPB { + TEvCoordinatorDistributionReset() = default; + }; + // Session events (with seqNo checks) struct TEvStartSession : public NActors::TEventPB(readActorId, TDuration::Seconds(5)); + UNIT_ASSERT(eventPtr.Get() != nullptr); + } + void ProcessNodesManagerRequest(ui64 nodesCount) { TVector nodes; nodes.reserve(nodesCount); @@ -195,7 +201,7 @@ Y_UNIT_TEST_SUITE(CoordinatorTests) { Y_UNIT_TEST_F(WaitNodesConnected, TFixture) { ExpectCoordinatorChangesSubscribe(); - ProcessNodesManagerRequest(4); + ProcessNodesManagerRequest(3); Ping(RowDispatcher1Id); MockRequest(ReadActor1, "endpoint", "read_group", "topic1", {0}); @@ -224,6 +230,52 @@ Y_UNIT_TEST_SUITE(CoordinatorTests) { actorId = ActorIdFromProto(result2.GetPartitions(0).GetActorId()); UNIT_ASSERT_VALUES_EQUAL(actorId.NodeId(), RowDispatcher2Id.NodeId()); } + + Y_UNIT_TEST_F(RebalanceAfterNewNodeConnected, TFixture) { + ExpectCoordinatorChangesSubscribe(); + ProcessNodesManagerRequest(1); + TSet rowDispatcherIds{LocalRowDispatcherId}; + for (auto id : rowDispatcherIds) { + Ping(id); + } + MockRequest(ReadActor1, "endpoint", "read_group", "topic1", {0}); + auto rdActor1 = ActorIdFromProto(ExpectResult(ReadActor1).GetPartitions(0).GetActorId()); + MockRequest(ReadActor2, "endpoint", "read_group", "topic1", {1}); + auto rdActor2 = ActorIdFromProto(ExpectResult(ReadActor2).GetPartitions(0).GetActorId()); + UNIT_ASSERT_VALUES_EQUAL(rdActor1, rdActor2); + + Ping(RowDispatcher1Id); + ExpectDistributionReset(ReadActor1); + ExpectDistributionReset(ReadActor2); + + MockRequest(ReadActor1, "endpoint", "read_group", "topic1", {0}); + rdActor1 = ActorIdFromProto(ExpectResult(ReadActor1).GetPartitions(0).GetActorId()); + MockRequest(ReadActor2, "endpoint", "read_group", "topic1", {1}); + rdActor2 = ActorIdFromProto(ExpectResult(ReadActor2).GetPartitions(0).GetActorId()); + UNIT_ASSERT(rdActor1 != rdActor2); + } + + Y_UNIT_TEST_F(RebalanceAfterNodeDisconnected, TFixture) { + ExpectCoordinatorChangesSubscribe(); + ProcessNodesManagerRequest(3); + TSet rowDispatcherIds{RowDispatcher1Id, RowDispatcher2Id, LocalRowDispatcherId}; + for (auto id : rowDispatcherIds) { + Ping(id); + } + + MockRequest(ReadActor1, "endpoint1", "read_group", "topic1", {0, 1, 2}); + auto result1 = ExpectResult(ReadActor1); + UNIT_ASSERT(result1.PartitionsSize() == 3); + + auto event = new NActors::TEvInterconnect::TEvNodeDisconnected(RowDispatcher2Id.NodeId()); + Runtime.Send(new NActors::IEventHandle(Coordinator, RowDispatcher2Id, event)); + + ExpectDistributionReset(ReadActor1); + + MockRequest(ReadActor1, "endpoint1", "read_group", "topic1", {0, 1, 2}); + result1 = ExpectResult(ReadActor1); + UNIT_ASSERT(result1.PartitionsSize() == 2); + } } } diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp index a827664bfa9e..c1c83bc8c61f 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp @@ -352,6 +352,7 @@ class TDqPqRdReadActor : public NActors::TActor, public NYql:: void Handle(NFq::TEvRowDispatcher::TEvSessionError::TPtr& ev); void Handle(NFq::TEvRowDispatcher::TEvStatistics::TPtr& ev); void Handle(NFq::TEvRowDispatcher::TEvGetInternalStateRequest::TPtr& ev); + void Handle(NFq::TEvRowDispatcher::TEvCoordinatorDistributionReset::TPtr& ev); void HandleDisconnected(TEvInterconnect::TEvNodeDisconnected::TPtr& ev); void HandleConnected(TEvInterconnect::TEvNodeConnected::TPtr& ev); @@ -377,6 +378,7 @@ class TDqPqRdReadActor : public NActors::TActor, public NYql:: hFunc(NFq::TEvRowDispatcher::TEvSessionError, Handle); hFunc(NFq::TEvRowDispatcher::TEvStatistics, Handle); hFunc(NFq::TEvRowDispatcher::TEvGetInternalStateRequest, Handle); + hFunc(NFq::TEvRowDispatcher::TEvCoordinatorDistributionReset, Handle); hFunc(NActors::TEvents::TEvPong, Handle); hFunc(TEvInterconnect::TEvNodeConnected, HandleConnected); @@ -404,6 +406,7 @@ class TDqPqRdReadActor : public NActors::TActor, public NYql:: hFunc(NFq::TEvRowDispatcher::TEvSessionError, ReplyNoSession); hFunc(NFq::TEvRowDispatcher::TEvStatistics, ReplyNoSession); hFunc(NFq::TEvRowDispatcher::TEvGetInternalStateRequest, ReplyNoSession); + hFunc(NFq::TEvRowDispatcher::TEvCoordinatorDistributionReset, Handle); hFunc(NActors::TEvents::TEvPong, Handle); hFunc(TEvInterconnect::TEvNodeConnected, HandleConnected); @@ -913,6 +916,16 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvGetInternalStateRequest: Send(ev->Sender, response.release(), 0, ev->Cookie); } +void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvCoordinatorDistributionReset::TPtr& ev) { + if (CoordinatorActorId != ev->Sender) { + SRC_LOG_I("Ignore TEvCoordinatorDistributionReset, sender is not active coordinator (sender " << ev->Sender << ", current coordinator " << CoordinatorActorId << ")"); + return; + } + SRC_LOG_I("Received TEvCoordinatorDistributionReset from " << ev->Sender); + ReInit("Distribution changed"); + ScheduleProcessState(); +} + void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvNewDataArrived::TPtr& ev) { auto partitionId = ev->Get()->Record.GetPartitionId(); const NYql::NDqProto::TMessageTransportMeta& meta = ev->Get()->Record.GetTransportMeta(); diff --git a/ydb/tests/fq/pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp b/ydb/tests/fq/pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp index ba991a76c8a7..757ff49e9821 100644 --- a/ydb/tests/fq/pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp +++ b/ydb/tests/fq/pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp @@ -350,6 +350,13 @@ class TFixture : public TPqIoTestFixture { AssertDataWithWatermarks(expected, actual); } + void MockCoordinatorDistributionReset(NActors::TActorId coordinatorId) const { + CaSetup->Execute([&](TFakeActor& actor) { + auto event = new NFq::TEvRowDispatcher::TEvCoordinatorDistributionReset(); + CaSetup->Runtime->Send(new NActors::IEventHandle(*actor.DqAsyncInputActorId, coordinatorId, event, 0)); + }); + } + public: NYql::NPq::NProto::TDqPqTopicSource Settings = BuildPqTopicSourceSettings( "topic", @@ -888,6 +895,16 @@ Y_UNIT_TEST_SUITE(TDqPqRdReadActorTests) { f.ReadMessages(expected); } } + + Y_UNIT_TEST_F(RebalanceAfterDistributionReset, TFixture) { + StartSession(Settings); + MockCoordinatorDistributionReset(CoordinatorId1); + + auto req = ExpectCoordinatorRequest(CoordinatorId1); + MockCoordinatorResult(CoordinatorId1, {{RowDispatcherId2, PartitionId1}}, req->Cookie); + ExpectStartSession({}, RowDispatcherId2, 2); + MockAck(RowDispatcherId2, 2, PartitionId1); + } } } // namespace NYql::NDq diff --git a/ydb/tests/fq/yds/test_row_dispatcher.py b/ydb/tests/fq/yds/test_row_dispatcher.py index 43869c3e5a61..77987adb85f9 100644 --- a/ydb/tests/fq/yds/test_row_dispatcher.py +++ b/ydb/tests/fq/yds/test_row_dispatcher.py @@ -61,11 +61,10 @@ def wait_actor_count(kikimr, activity, expected_count): count = 0 for node_index in kikimr.compute_plane.kikimr_cluster.nodes: count = count + kikimr.compute_plane.get_actor_count(node_index, activity) - if count == expected_count: - break + if count == expected_count: + return node_index # return any node assert time.time() < deadline, f"Waiting actor {activity} count failed, current count {count}" time.sleep(1) - pass def wait_row_dispatcher_sensor_value(kikimr, sensor, expected_count, exact_match=True): @@ -617,7 +616,7 @@ def test_start_new_query(self, kikimr, client): @yq_v1 def test_stop_start(self, kikimr, client): - self.init(client, "test_stop_start") + self.init(client, "test_stop_start", 10) sql1 = Rf''' INSERT INTO {YDS_CONNECTION}.`{self.output_topic}` @@ -625,12 +624,12 @@ def test_stop_start(self, kikimr, client): WITH (format=json_each_row, SCHEMA (time Int32 NOT NULL));''' query_id = start_yds_query(kikimr, client, sql1) - wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1) + wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 10) data = ['{"time": 101}', '{"time": 102}'] self.write_stream(data) expected = ['101', '102'] - assert self.read_stream(len(expected), topic_path=self.output_topic) == expected + assert sorted(self.read_stream(len(expected), topic_path=self.output_topic)) == sorted(expected) kikimr.compute_plane.wait_completed_checkpoints( query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 2 @@ -652,7 +651,7 @@ def test_stop_start(self, kikimr, client): self.write_stream(data) expected = ['103', '104'] - assert self.read_stream(len(expected), topic_path=self.output_topic) == expected + assert sorted(self.read_stream(len(expected), topic_path=self.output_topic)) == sorted(expected) stop_yds_query(client, query_id) wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 0) @@ -667,7 +666,7 @@ def test_stop_start2(self, kikimr, client): wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1) self.write_stream(['{"time": 101}', '{"time": 102}']) expected = ['101', '102'] - assert self.read_stream(len(expected), topic_path=self.output_topic) == expected + assert sorted(self.read_stream(len(expected), topic_path=self.output_topic)) == sorted(expected) kikimr.compute_plane.wait_completed_checkpoints(query_id1, kikimr.compute_plane.get_completed_checkpoints(query_id1) + 2) stop_yds_query(client, query_id1) @@ -1282,3 +1281,34 @@ def test_delete_topic(self, kikimr, client): self.write_stream(data) expected = ['104'] assert self.read_stream(len(expected), topic_path=self.output_topic) == expected + + @yq_v1 + def test_redistribute_partition_after_timeout(self, kikimr, client): + partitions_count = 3 + self.init(client, "redistribute", partitions=partitions_count) + wait_row_dispatcher_sensor_value(kikimr, "KnownRowDispatchers", 2 * COMPUTE_NODE_COUNT - 1) + + sql = Rf''' + PRAGMA dq.Scheduler=@@{{"type": "single_node"}}@@; + INSERT INTO {YDS_CONNECTION}.`{self.output_topic}` + SELECT data FROM {YDS_CONNECTION}.`{self.input_topic}` + WITH (format=json_each_row, SCHEMA (time Int32 NOT NULL, data String NOT NULL));''' + + query_id = start_yds_query(kikimr, client, sql) + session_node_index = wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", partitions_count) + kikimr.compute_plane.wait_completed_checkpoints(query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 2) + + message_count = 10 + expected = "hello" + for i in range(message_count): + self.write_stream(['{"time": 100, "data": "hello"}'], topic_path=None, partition_key=str(i)) + assert self.read_stream(message_count, topic_path=self.output_topic) == [expected] * message_count + kikimr.compute_plane.wait_completed_checkpoints(query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 2) + + logging.debug(f"Stopping node: {session_node_index}") + kikimr.compute_plane.kikimr_cluster.nodes[session_node_index].stop() + + expected = "Relativitätstheorie" + for i in range(message_count): + self.write_stream(['{"time": 101, "data": "Relativitätstheorie"}'], topic_path=None, partition_key=str(i)) + assert self.read_stream(message_count, topic_path=self.output_topic) == [expected] * message_count diff --git a/ydb/tests/tools/fq_runner/kikimr_runner.py b/ydb/tests/tools/fq_runner/kikimr_runner.py index 920ec10caa54..98bc27c07252 100644 --- a/ydb/tests/tools/fq_runner/kikimr_runner.py +++ b/ydb/tests/tools/fq_runner/kikimr_runner.py @@ -363,15 +363,15 @@ def get_completed_checkpoints(self, query_id, expect_counters_exist=False): return self.get_checkpoint_coordinator_metric(query_id, "CompletedCheckpoints", expect_counters_exist=expect_counters_exist) - def wait_completed_checkpoints(self, query_id, checkpoints_count, + def wait_completed_checkpoints(self, query_id, expected, timeout=plain_or_under_sanitizer_wrapper(30, 150), expect_counters_exist=False): deadline = time.time() + timeout while True: completed = self.get_completed_checkpoints(query_id, expect_counters_exist=expect_counters_exist) - if completed >= checkpoints_count: + if completed >= expected: break - assert time.time() < deadline, "Wait zero checkpoint failed, actual completed: " + str(completed) + assert time.time() < deadline, "Wait checkpoint failed, actual current: " + str(completed) + ", expected " + str(expected) time.sleep(plain_or_under_sanitizer_wrapper(0.5, 2)) def wait_zero_checkpoint(self, query_id, timeout=plain_or_under_sanitizer_wrapper(30, 150), @@ -537,6 +537,7 @@ def fill_config(self, control_plane): 'max_session_used_memory': 1000000, 'without_consumer': True} fq_config['row_dispatcher']['coordinator'] = {'coordination_node_path': "row_dispatcher"} + fq_config['row_dispatcher']['coordinator'] = {'rebalancing_timeout_sec': "5"} fq_config['row_dispatcher']['coordinator']['database'] = {} self.fill_storage_config(fq_config['row_dispatcher']['coordinator']['database'], "RowDispatcher_" + self.uuid) From f7a757b72897621b49ce9390780472046dda7da7 Mon Sep 17 00:00:00 2001 From: Dmitry Kardymon Date: Thu, 20 Nov 2025 11:17:11 +0300 Subject: [PATCH 2/7] YQ-4864 Pragma ydb.DisableCheckpoints (#28729) --- .../kqp/executer_actor/kqp_data_executer.cpp | 4 ++- ydb/core/kqp/provider/yql_kikimr_settings.cpp | 1 + ydb/core/kqp/provider/yql_kikimr_settings.h | 1 + .../kqp/query_compiler/kqp_query_compiler.cpp | 1 + ydb/core/protos/kqp_physical.proto | 1 + ydb/tests/fq/streaming/test_streaming.py | 27 +++++++++++++++++++ 6 files changed, 34 insertions(+), 1 deletion(-) diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 1dcdfc1b57db..0ac38860a9ae 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -2794,9 +2794,11 @@ class TKqpDataExecuter : public TKqpExecuterBaseGetPreparedQuery().GetPhysicalQuery().GetDisableCheckpoints(); + bool enableCheckpointCoordinator = AppData()->FeatureFlags.GetEnableStreamingQueries() && (Request.SaveQueryPhysicalGraph || Request.QueryPhysicalGraph != nullptr) - && context && context->CheckpointId; + && context && context->CheckpointId && !disableCheckpoints; if (!enableCheckpointCoordinator) { return; } diff --git a/ydb/core/kqp/provider/yql_kikimr_settings.cpp b/ydb/core/kqp/provider/yql_kikimr_settings.cpp index 7641bcbb71cd..88629f973bf2 100644 --- a/ydb/core/kqp/provider/yql_kikimr_settings.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_settings.cpp @@ -130,6 +130,7 @@ TKikimrConfiguration::TKikimrConfiguration() { REGISTER_SETTING(*this, MaxSequentialReadsInFlight); REGISTER_SETTING(*this, KMeansTreeSearchTopSize); + REGISTER_SETTING(*this, DisableCheckpoints); /* Runtime */ REGISTER_SETTING(*this, ScanQuery); diff --git a/ydb/core/kqp/provider/yql_kikimr_settings.h b/ydb/core/kqp/provider/yql_kikimr_settings.h index ad04217b93ff..072ca3140e6d 100644 --- a/ydb/core/kqp/provider/yql_kikimr_settings.h +++ b/ydb/core/kqp/provider/yql_kikimr_settings.h @@ -98,6 +98,7 @@ struct TKikimrSettings { NCommon::TConfSetting MaxSequentialReadsInFlight; NCommon::TConfSetting KMeansTreeSearchTopSize; + NCommon::TConfSetting DisableCheckpoints; /* Runtime */ NCommon::TConfSetting ScanQuery; diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp index 846860897efe..d9ad9a6df433 100644 --- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp +++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp @@ -647,6 +647,7 @@ class TKqpQueryCompiler : public IKqpQueryCompiler { queryProto.SetForceImmediateEffectsExecution( Config->KqpForceImmediateEffectsExecution.Get().GetOrElse(false)); + queryProto.SetDisableCheckpoints(Config->DisableCheckpoints.Get().GetOrElse(false)); for (const auto& queryBlock : dataQueryBlocks) { auto queryBlockSettings = TKiDataQueryBlockSettings::Parse(queryBlock); if (queryBlockSettings.HasUncommittedChangesRead) { diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto index de44bde84a6e..6a14980c6300 100644 --- a/ydb/core/protos/kqp_physical.proto +++ b/ydb/core/protos/kqp_physical.proto @@ -652,4 +652,5 @@ message TKqpPhyQuery { optional bool EnableHtapTx = 14; optional bool ForceImmediateEffectsExecution = 15; uint32 LangVer = 16; + optional bool DisableCheckpoints = 17; } diff --git a/ydb/tests/fq/streaming/test_streaming.py b/ydb/tests/fq/streaming/test_streaming.py index ae17f4fa4592..6c3dc130b0be 100644 --- a/ydb/tests/fq/streaming/test_streaming.py +++ b/ydb/tests/fq/streaming/test_streaming.py @@ -8,6 +8,7 @@ from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase from ydb.tests.tools.fq_runner.kikimr_metrics import load_metrics +from ydb.tests.tools.datastreams_helpers.control_plane import create_read_rule logger = logging.getLogger(__name__) @@ -387,3 +388,29 @@ def test_restart_query_by_rescaling(self, kikimr): assert self.read_stream(message_count, topic_path=self.output_topic) == ["time to lunch" for i in range(message_count)] kikimr.YdbClient.query(f"ALTER STREAMING QUERY `{name}` SET (RUN = FALSE);") + + def test_pragma(self, kikimr): + sourceName = "test_pragma" + self.init_topics(sourceName, partitions_count=10) + self.create_source(kikimr, sourceName) + create_read_rule(self.input_topic, self.consumer_name) + + query_name = "test_pragma1" + sql = R''' + CREATE STREAMING QUERY `{query_name}` AS + DO BEGIN + PRAGMA ydb.DisableCheckpoints="true"; + PRAGMA ydb.MaxTasksPerStage = "1"; + PRAGMA pq.Consumer = "{consumer_name}"; + $in = SELECT time FROM {source_name}.`{input_topic}` + WITH ( + FORMAT="json_each_row", + SCHEMA=(time String NOT NULL)); + INSERT INTO {source_name}.`{output_topic}` SELECT time FROM $in; + END DO;''' + + kikimr.YdbClient.query(sql.format(query_name=query_name, consumer_name=self.consumer_name, source_name=sourceName, input_topic=self.input_topic, output_topic=self.output_topic)) + self.write_stream(['{"time": "lunch time"}']) + assert self.read_stream(1, topic_path=self.output_topic) == ['lunch time'] + + kikimr.YdbClient.query(f"DROP STREAMING QUERY `{query_name}`") From eef08cc5e08394eee06c9f535175ce967e7ec69c Mon Sep 17 00:00:00 2001 From: Dmitry Kardymon Date: Thu, 20 Nov 2025 15:19:46 +0300 Subject: [PATCH 3/7] YQ-4502 Streaming: type tests and compatibility (#28799) --- .../compatibility/streaming/test_streaming.py | 208 ++++++++++++++++++ ydb/tests/compatibility/streaming/ya.make | 32 +++ ydb/tests/compatibility/ya.make | 1 + ydb/tests/fq/streaming/test_streaming.py | 106 +++++++++ .../tools/datastreams_helpers/data_plane.py | 20 +- .../datastreams_helpers/test_yds_base.py | 8 +- 6 files changed, 365 insertions(+), 10 deletions(-) create mode 100644 ydb/tests/compatibility/streaming/test_streaming.py create mode 100644 ydb/tests/compatibility/streaming/ya.make diff --git a/ydb/tests/compatibility/streaming/test_streaming.py b/ydb/tests/compatibility/streaming/test_streaming.py new file mode 100644 index 000000000000..11703febbd04 --- /dev/null +++ b/ydb/tests/compatibility/streaming/test_streaming.py @@ -0,0 +1,208 @@ +# -*- coding: utf-8 -*- +import logging +import os +import pytest +import time + +from ydb.tests.library.compatibility.fixtures import MixedClusterFixture, RestartToAnotherVersionFixture, RollingUpgradeAndDowngradeFixture +from ydb.tests.library.harness.util import LogLevels +from ydb.tests.library.test_meta import link_test_case +from ydb.tests.oss.ydb_sdk_import import ydb +from ydb.tests.tools.datastreams_helpers.data_plane import write_stream, read_stream + +logger = logging.getLogger(__name__) + + +class StreamingTestBase: + def setup_cluster(self): + logger.debug(f"setup_cluster, versions {self.versions}") + + if min(self.versions) < (25, 4): + logger.debug("skip test, only available since 25-4") + pytest.skip("Only available since 25-4") + + os.environ["YDB_TEST_DEFAULT_CHECKPOINTING_PERIOD_MS"] = "200" + os.environ["YDB_TEST_LEASE_DURATION_SEC"] = "15" + yield from super().setup_cluster( + extra_feature_flags={ + "enable_external_data_sources": True, + "enable_streaming_queries": True + }, + additional_log_configs={ + 'KQP_COMPUTE': LogLevels.TRACE, + 'STREAMS_CHECKPOINT_COORDINATOR': LogLevels.TRACE, + 'STREAMS_STORAGE_SERVICE': LogLevels.TRACE, + 'FQ_ROW_DISPATCHER': LogLevels.TRACE, + 'KQP_PROXY': LogLevels.DEBUG, + 'KQP_EXECUTOR': LogLevels.DEBUG}, + ) + + def create_topics(self): + logger.debug("create_topics") + self.input_topic = 'streaming_recipe/input_topic' + self.output_topic = 'streaming_recipe/output_topic' + self.consumer_name = 'consumer_name' + with ydb.QuerySessionPool(self.driver) as session_pool: + query = f""" + CREATE TOPIC `{self.input_topic}`; + CREATE TOPIC `{self.output_topic}` (CONSUMER {self.consumer_name}); + """ + session_pool.execute_with_retries(query) + + def create_external_data_source(self): + logger.debug("create_external_data_source") + endpoint = f"localhost:{self.cluster.nodes[1].port}" + with ydb.QuerySessionPool(self.driver) as session_pool: + query = f""" + CREATE EXTERNAL DATA SOURCE source_name WITH ( + SOURCE_TYPE="Ydb", + LOCATION="{endpoint}", + DATABASE_NAME="{self.database_path}", + SHARED_READING="false", + AUTH_METHOD="NONE"); + """ + session_pool.execute_with_retries(query) + + def create_streaming_query(self): + logger.debug("create_streaming_query") + with ydb.QuerySessionPool(self.driver) as session_pool: + query = f""" + CREATE STREAMING QUERY `my_queries/query_name` AS DO BEGIN + $input = ( + SELECT * FROM + source_name.`{self.input_topic}` WITH ( + FORMAT = 'json_each_row', + SCHEMA (time String NOT NULL, level String NOT NULL, host String NOT NULL) + ) + ); + $filtered = (SELECT * FROM $input WHERE level == 'error'); + + $number_errors = ( + SELECT host, COUNT(*) AS error_count, CAST(HOP_START() AS String) AS ts + FROM $filtered + GROUP BY + HoppingWindow(CAST(time AS Timestamp), 'PT600S', 'PT600S'), + host + ); + + $json = (SELECT ToBytes(Unwrap(Yson::SerializeJson(Yson::From(TableRow())))) + FROM $number_errors + ); + + INSERT INTO source_name.`{self.output_topic}` + SELECT * FROM $json; + END DO; + + """ + session_pool.execute_with_retries(query) + + def create_simple_streaming_query(self): + logger.debug("create_simple_streaming_query") + with ydb.QuerySessionPool(self.driver) as session_pool: + query = f""" + CREATE STREAMING QUERY `my_queries/query_name` AS DO BEGIN + $input = ( + SELECT + * + FROM + source_name.`{self.input_topic}` WITH ( + FORMAT = 'json_each_row', + SCHEMA (time String NOT NULL, level String NOT NULL, host String NOT NULL) + ) + ); + + $json = (SELECT ToBytes(Unwrap(Yson::SerializeJson(Yson::From(TableRow())))) + FROM $input + ); + + INSERT INTO source_name.`{self.output_topic}` + SELECT * FROM $json; + END DO; + + """ + session_pool.execute_with_retries(query) + + def do_write_read(self, input, expected_output): + logger.debug("do_write_read") + endpoint = f"localhost:{self.cluster.nodes[1].port}" + time.sleep(2) + logger.debug("write data to stream") + write_stream(path=self.input_topic, data=input, database=self.database_path, endpoint=endpoint) + logger.debug("read data from stream") + assert sorted(read_stream( + path=self.output_topic, + messages_count=len(expected_output), + consumer_name=self.consumer_name, + database=self.database_path, + endpoint=endpoint)) == sorted(expected_output) + + def do_test_part1(self): + input = [ + '{"time": "2025-01-01T00:00:00.000000Z", "level": "error", "host": "host-1"}', + '{"time": "2025-01-01T00:04:00.000000Z", "level": "error", "host": "host-2"}', + '{"time": "2025-01-01T00:08:00.000000Z", "level": "error", "host": "host-1"}', + '{"time": "2025-01-01T00:12:00.000000Z", "level": "error", "host": "host-2"}', + '{"time": "2025-01-01T00:12:00.000000Z", "level": "error", "host": "host-1"}'] + expected_data = sorted([ + '{"error_count":1,"host":"host-2","ts":"2025-01-01T00:00:00Z"}', + '{"error_count":2,"host":"host-1","ts":"2025-01-01T00:00:00Z"}']) + self.do_write_read(input, expected_data) + + def do_test_part2(self): + input = [ + '{"time": "2025-01-01T00:15:00.000000Z", "level": "error", "host": "host-2"}', + '{"time": "2025-01-01T00:22:00.000000Z", "level": "error", "host": "host-1"}', + '{"time": "2025-01-01T00:22:00.000000Z", "level": "error", "host": "host-2"}'] + expected_data = sorted([ + '{"error_count":2,"host":"host-2","ts":"2025-01-01T00:10:00Z"}', + '{"error_count":1,"host":"host-1","ts":"2025-01-01T00:10:00Z"}']) + self.do_write_read(input, expected_data) + + +class TestStreamingMixedCluster(StreamingTestBase, MixedClusterFixture): + @pytest.fixture(autouse=True, scope="function") + def setup(self): + yield from self.setup_cluster() + + @link_test_case("#27924") + def test_mixed_cluster(self): + self.create_topics() + self.create_external_data_source() + self.create_streaming_query() + self.do_test_part1() + self.do_test_part2() + + +class TestStreamingRestartToAnotherVersion(StreamingTestBase, RestartToAnotherVersionFixture): + @pytest.fixture(autouse=True, scope="function") + def setup(self): + yield from self.setup_cluster() + + @link_test_case("#27924") + def test_restart_to_another_version(self): + self.create_topics() + self.create_external_data_source() + self.create_streaming_query() + self.do_test_part1() + self.change_cluster_version() + self.do_test_part2() + + +class TestStreamingRollingUpgradeAndDowngrade(StreamingTestBase, RollingUpgradeAndDowngradeFixture): + @pytest.fixture(autouse=True, scope="function") + def setup(self): + yield from self.setup_cluster() + + @link_test_case("#27924") + def test_rolling_upgrage(self): + self.create_topics() + self.create_external_data_source() + self.create_simple_streaming_query() + + for _ in self.roll(): # every iteration is a step in rolling upgrade process + # + # 2. check written data is correct during rolling upgrade + # + input = ['{"time": "2025-01-01T00:15:00.000000Z", "level": "error", "host": "host-2"}'] + expected_data = ['{"host":"host-2","level":"error","time":"2025-01-01T00:15:00.000000Z"}'] + self.do_write_read(input, expected_data) diff --git a/ydb/tests/compatibility/streaming/ya.make b/ydb/tests/compatibility/streaming/ya.make new file mode 100644 index 000000000000..47e5775e675e --- /dev/null +++ b/ydb/tests/compatibility/streaming/ya.make @@ -0,0 +1,32 @@ +PY3TEST() +INCLUDE(${ARCADIA_ROOT}/ydb/tests/ydbd_dep.inc) + + +FORK_TEST_FILES() +FORK_TESTS() +FORK_SUBTESTS() +SPLIT_FACTOR(10) + +TEST_SRCS( + test_streaming.py +) + +SIZE(LARGE) +REQUIREMENTS(cpu:16) +INCLUDE(${ARCADIA_ROOT}/ydb/tests/large.inc) + + +DEPENDS( + ydb/tests/library/compatibility/binaries + ydb/tests/tools/pq_read +) + +PEERDIR( + contrib/python/boto3 + ydb/tests/library + ydb/tests/library/compatibility + ydb/tests/library/test_meta + ydb/tests/tools/datastreams_helpers +) + +END() diff --git a/ydb/tests/compatibility/ya.make b/ydb/tests/compatibility/ya.make index 0b838af44a8b..6eca10ea3d61 100644 --- a/ydb/tests/compatibility/ya.make +++ b/ydb/tests/compatibility/ya.make @@ -52,4 +52,5 @@ END() RECURSE( s3_backups olap + streaming ) diff --git a/ydb/tests/fq/streaming/test_streaming.py b/ydb/tests/fq/streaming/test_streaming.py index 6c3dc130b0be..5e4d78241fa4 100644 --- a/ydb/tests/fq/streaming/test_streaming.py +++ b/ydb/tests/fq/streaming/test_streaming.py @@ -414,3 +414,109 @@ def test_pragma(self, kikimr): assert self.read_stream(1, topic_path=self.output_topic) == ['lunch time'] kikimr.YdbClient.query(f"DROP STREAMING QUERY `{query_name}`") + + def test_types(self, kikimr): + sourceName = "test_types" + self.init_topics(sourceName, partitions_count=1) + + self.create_source(kikimr, sourceName) + + query_name = "test_types1" + + def test_type(self, kikimr, type, input, expected_output): + sql = R''' + CREATE STREAMING QUERY `{query_name}` AS + DO BEGIN + $in = SELECT field_name FROM {source_name}.`{input_topic}` + WITH ( + FORMAT="json_each_row", + SCHEMA=(field_name {type_name} NOT NULL)); + INSERT INTO {source_name}.`{output_topic}` SELECT CAST(field_name as String) FROM $in; + END DO;''' + + kikimr.YdbClient.query(sql.format(query_name=query_name, source_name=sourceName, type_name=type, input_topic=self.input_topic, output_topic=self.output_topic)) + self.write_stream([f"{{\"field_name\": {input}}}"]) + assert self.read_stream(1, topic_path=self.output_topic) == [expected_output] + kikimr.YdbClient.query(f"DROP STREAMING QUERY `{query_name}`") + + test_type(self, kikimr, type="String", input='"lunch time"', expected_output='lunch time') + test_type(self, kikimr, type="Utf8", input='"Relativitätstheorie"', expected_output='Relativitätstheorie') + test_type(self, kikimr, type="Int8", input='42', expected_output='42') + test_type(self, kikimr, type="Uint64", input='777', expected_output='777') + test_type(self, kikimr, type="Float", input='1024.1024', expected_output='1024.1024') + test_type(self, kikimr, type="Double", input='-777.777', expected_output='-777.777') + test_type(self, kikimr, type="Bool", input='true', expected_output='true') + test_type(self, kikimr, type="Uuid", input='"3d6c7233-d082-4b25-83e2-10d271bbc911"', expected_output='3d6c7233-d082-4b25-83e2-10d271bbc911') + # Unsupported + # test_type(self, kikimr, type="Timestamp", input='"2025-08-25 10:49:00"', expected_output='2025-08-25T10:49:00Z') + # test_type(self, kikimr, type="Json", input='{"name": "value"}', expected_output='{"name": "value"}') + # test_type(self, kikimr, type="JsonDocument", input='{"name": "value"}', expected_output='lunch time') + + def test_raw_format(self, kikimr): + sourceName = "test_restart_query" + ''.join(random.choices(string.ascii_letters + string.digits, k=8)) + self.init_topics(sourceName, partitions_count=10) + self.create_source(kikimr, sourceName, False) + + query_name = "test_raw_format_string" + sql = R''' + CREATE STREAMING QUERY `{query_name}` AS + DO BEGIN + $input = SELECT CAST(data AS Json) AS json FROM {source_name}.`{input_topic}` + WITH ( + FORMAT="raw", + SCHEMA=(data String)); + $parsed = SELECT JSON_VALUE(json, "$.time") as k, JSON_VALUE(json, "$.value") as v FROM $input; + INSERT INTO {source_name}.`{output_topic}` SELECT ToBytes(Unwrap(Json::SerializeJson(Yson::From(TableRow())))) FROM $parsed; + END DO;''' + path = f"/Root/{query_name}" + kikimr.YdbClient.query(sql.format(query_name=query_name, source_name=sourceName, input_topic=self.input_topic, output_topic=self.output_topic)) + self.wait_completed_checkpoints(kikimr, path) + + data = ['{"time": "2020-01-01T13:00:00.000000Z", "value": "lunch time"}'] + expected_data = ['{"k":"2020-01-01T13:00:00.000000Z","v":"lunch time"}'] + self.write_stream(data) + + assert self.read_stream(len(expected_data), topic_path=self.output_topic) == expected_data + kikimr.YdbClient.query(f"DROP STREAMING QUERY `{query_name}`") + + query_name = "test_raw_format_default" + sql = R''' + CREATE STREAMING QUERY `{query_name}` AS + DO BEGIN + $input = SELECT CAST(Data AS Json) AS json FROM {source_name}.`{input_topic}`; + $parsed = SELECT JSON_VALUE(json, "$.time") as k, JSON_VALUE(json, "$.value") as v FROM $input; + INSERT INTO {source_name}.`{output_topic}` SELECT ToBytes(Unwrap(Json::SerializeJson(Yson::From(TableRow())))) FROM $parsed; + END DO;''' + path = f"/Root/{query_name}" + kikimr.YdbClient.query(sql.format(query_name=query_name, source_name=sourceName, input_topic=self.input_topic, output_topic=self.output_topic)) + self.wait_completed_checkpoints(kikimr, path) + + data = ['{"time": "2020-01-01T13:00:00.000000Z", "value": "lunch time"}'] + expected_data = ['{"k":"2020-01-01T13:00:00.000000Z","v":"lunch time"}'] + self.write_stream(data) + + assert self.read_stream(len(expected_data), topic_path=self.output_topic) == expected_data + kikimr.YdbClient.query(f"DROP STREAMING QUERY `{query_name}`") + + query_name = "test_raw_format_json" + sql = R''' + CREATE STREAMING QUERY `{query_name}` AS + DO BEGIN + $input = SELECT data AS json FROM {source_name}.`{input_topic}` + WITH ( + FORMAT="raw", + SCHEMA=(data Json)); + $parsed = SELECT JSON_VALUE(json, "$.time") as k, JSON_VALUE(json, "$.value") as v FROM $input; + INSERT INTO {source_name}.`{output_topic}` SELECT ToBytes(Unwrap(Json::SerializeJson(Yson::From(TableRow())))) FROM $parsed; + END DO;''' + path = f"/Root/{query_name}" + kikimr.YdbClient.query(sql.format(query_name=query_name, source_name=sourceName, input_topic=self.input_topic, output_topic=self.output_topic)) + self.wait_completed_checkpoints(kikimr, path) + + data = ['{"time": "2020-01-01T13:00:00.000000Z", "value": "lunch time"}'] + expected_data = ['{"k":"2020-01-01T13:00:00.000000Z","v":"lunch time"}'] + self.write_stream(data) + + assert self.read_stream(len(expected_data), topic_path=self.output_topic) == expected_data + + kikimr.YdbClient.query(f"DROP STREAMING QUERY `{query_name}`") diff --git a/ydb/tests/tools/datastreams_helpers/data_plane.py b/ydb/tests/tools/datastreams_helpers/data_plane.py index 4b66fe2bdfd1..c4555a88b554 100644 --- a/ydb/tests/tools/datastreams_helpers/data_plane.py +++ b/ydb/tests/tools/datastreams_helpers/data_plane.py @@ -17,9 +17,13 @@ READ_TOOL_TIMEOUT = plain_or_under_sanitizer(30, 300) -def write_stream(path, data, partition_key=None): - request_metadata = [("x-ydb-database", os.getenv("YDB_DATABASE"))] - channel = grpc.insecure_channel(os.getenv("YDB_ENDPOINT")) +def write_stream(path, data, partition_key=None, database=None, endpoint=None): + if database is None: + database = os.getenv("YDB_DATABASE") + if endpoint is None: + endpoint = os.getenv("YDB_ENDPOINT") + request_metadata = [("x-ydb-database", database)] + channel = grpc.insecure_channel(endpoint) stub = ydb_datastreams_v1_pb2_grpc.DataStreamsServiceStub(channel) request = datastreams_pb2.PutRecordsRequest() @@ -40,18 +44,22 @@ def write_stream(path, data, partition_key=None): # Data plane grpc API is not implemented in datastreams. -def read_stream(path, messages_count, commit_after_processing=True, consumer_name="test_client", timeout=READ_TOOL_TIMEOUT): +def read_stream(path, messages_count, commit_after_processing=True, consumer_name="test_client", timeout=READ_TOOL_TIMEOUT, database=None, endpoint=None): result_file_name = "{}-{}-read-result-{}-{}-out".format( os.getenv("PYTEST_CURRENT_TEST").replace(":", "_").replace(" (call)", ""), path.replace("/", "_"), consumer_name, uuid.uuid4() ) + if database is None: + database = os.getenv("YDB_DATABASE") + if endpoint is None: + endpoint = os.getenv("YDB_ENDPOINT") result_file = yatest.common.output_path(result_file_name) cmd = [ yatest.common.binary_path("ydb/tests/tools/pq_read/pq_read"), - "--endpoint", os.getenv("YDB_ENDPOINT"), - "--database", os.getenv("YDB_DATABASE"), + "--endpoint", endpoint, + "--database", database, "--topic-path", path, "--consumer-name", consumer_name, "--disable-cluster-discovery", diff --git a/ydb/tests/tools/datastreams_helpers/test_yds_base.py b/ydb/tests/tools/datastreams_helpers/test_yds_base.py index 23bfdb179cb0..e3a6fc3d26e9 100644 --- a/ydb/tests/tools/datastreams_helpers/test_yds_base.py +++ b/ydb/tests/tools/datastreams_helpers/test_yds_base.py @@ -21,13 +21,13 @@ def init_topics(self, prefix, create_input=True, create_output=True, partitions_ create_stream(self.output_topic, partitions_count=partitions_count) create_read_rule(self.output_topic, self.consumer_name) - def write_stream(self, data, topic_path=None, partition_key=None): + def write_stream(self, data, topic_path=None, partition_key=None, database=None, endpoint=None): topic = topic_path if topic_path else self.input_topic - write_stream(topic, data, partition_key=partition_key) + write_stream(topic, data, partition_key=partition_key, database=database, endpoint=endpoint) - def read_stream(self, messages_count, commit_after_processing=True, topic_path=None): + def read_stream(self, messages_count, commit_after_processing=True, topic_path=None, database=None, endpoint=None): topic = topic_path if topic_path else self.output_topic - return read_stream(topic, messages_count, commit_after_processing, self.consumer_name) + return read_stream(topic, messages_count, commit_after_processing, self.consumer_name, database=database, endpoint=endpoint) def wait_until(self, predicate, wait_time=plain_or_under_sanitizer(10, 50)): deadline = time.time() + wait_time From 578e83243cc154a32e6e5f850bc9ff51780bbbb2 Mon Sep 17 00:00:00 2001 From: Pisarenko Grigoriy Date: Tue, 18 Nov 2025 15:10:53 +0500 Subject: [PATCH 4/7] YQ-4899 fixed error resource pool 'default' not found for streaming queries (#29054) --- .../behaviour/streaming_query/queries.cpp | 4 +++- .../kqp/proxy_service/kqp_proxy_service.cpp | 2 ++ .../proxy_service/kqp_script_executions.cpp | 23 +++++++++++-------- .../run_script_actor/kqp_run_script_actor.cpp | 6 ++++- .../datastreams/datastreams_ut.cpp | 5 +++- 5 files changed, 28 insertions(+), 12 deletions(-) diff --git a/ydb/core/kqp/gateway/behaviour/streaming_query/queries.cpp b/ydb/core/kqp/gateway/behaviour/streaming_query/queries.cpp index 3174cce1dfc7..b53310e55451 100644 --- a/ydb/core/kqp/gateway/behaviour/streaming_query/queries.cpp +++ b/ydb/core/kqp/gateway/behaviour/streaming_query/queries.cpp @@ -1757,7 +1757,9 @@ class TStartStreamingQueryTableActor final : public TActionActorBaseRecord; record.SetTraceId(TStringBuilder() << "streaming-query-" << QueryPath << "-" << State.GetCurrentExecutionId()); if (const auto& token = Context.GetUserToken()) { - record.SetUserToken(token->SerializeAsString()); + if (const auto& serializedToken = token->GetSerializedToken()) { + record.SetUserToken(serializedToken); + } } auto& request = *record.MutableRequest(); diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp index 8a331ebabe40..33ea6f13f843 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp @@ -1295,6 +1295,8 @@ class TKqpProxyService : public TActorBootstrapped { return true; } + KQP_PROXY_LOG_W("Reply process error for request " << static_cast(request->EventType) << ", status: " << ydbStatus << ", issues: " << issues.ToOneLineString()); + if (request->EventType == TKqpEvents::EvPingSessionRequest) { auto response = std::make_unique(); response->Record.SetStatus(ydbStatus); diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp index dd14dfd499fe..3fc6335a9819 100644 --- a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp +++ b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp @@ -372,8 +372,8 @@ class TCreateScriptOperationQuery : public TQueryBase { DECLARE $lease_state AS Int32; DECLARE $execution_meta_ttl AS Interval; DECLARE $retry_state AS JsonDocument; - DECLARE $user_sid AS Text; - DECLARE $user_group_sids AS JsonDocument; + DECLARE $user_sid AS Optional; + DECLARE $user_group_sids AS Optional; DECLARE $parameters AS String; DECLARE $graph_compressed AS Optional; DECLARE $graph_compression_method AS Optional; @@ -400,7 +400,13 @@ class TCreateScriptOperationQuery : public TQueryBase { ); )"; - const auto token = NACLib::TUserToken(Request.GetUserToken()); + std::optional userSID; + std::optional userGroupSIDs; + if (Request.HasUserToken()) { + const NACLib::TUserToken token(Request.GetUserToken()); + userSID = token.GetUserSID(); + userGroupSIDs = SequenceToJsonString(token.GetGroupSIDs()); + } std::optional graphCompressionMethod; std::optional graphCompressed; @@ -447,10 +453,10 @@ class TCreateScriptOperationQuery : public TQueryBase { .Int32(static_cast(ELeaseState::ScriptRunning)) .Build() .AddParam("$user_sid") - .Utf8(token.GetUserSID()) + .OptionalUtf8(userSID) .Build() .AddParam("$user_group_sids") - .JsonDocument(SequenceToJsonString(token.GetGroupSIDs())) + .OptionalJsonDocument(userGroupSIDs) .Build() .AddParam("$parameters") .String(SerializeParameters()) @@ -1008,10 +1014,9 @@ class TRestartScriptOperationQuery : public TQueryBase { } } - queryRequest.SetUserToken(NACLib::TUserToken( - result.ColumnParser("user_token").GetOptionalUtf8().value_or(""), - userGroupSids - ).SerializeAsString()); + if (const std::optional& userSID = result.ColumnParser("user_token").GetOptionalUtf8()) { + queryRequest.SetUserToken(NACLib::TUserToken(*userSID, userGroupSids).SerializeAsString()); + } if (const auto serializedParameters = result.ColumnParser("parameters").GetOptionalString()) { NJson::TJsonValue value; diff --git a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp index d40227158804..4f94640cdd74 100644 --- a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp +++ b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp @@ -681,7 +681,11 @@ class TRunScriptActor : public NActors::TActorBootstrapped { NYql::IssuesFromMessage(issueMessage, issues); Issues.AddIssues(TruncateIssues(issues)); - LOG_I("Script query finished from " << ev->Sender << " " << record.GetYdbStatus() << ", Issues: " << Issues.ToOneLineString()); + if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) { + LOG_I("Script query successfully finished from " << ev->Sender << ", Issues: " << Issues.ToOneLineString()); + } else { + LOG_W("Script query failed from " << ev->Sender << " " << record.GetYdbStatus() << ", Issues: " << Issues.ToOneLineString()); + } if (record.GetYdbStatus() == Ydb::StatusIds::TIMEOUT) { const TDuration timeout = GetQueryTimeout(NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT, Request.GetRequest().GetTimeoutMs(), {}, QueryServiceConfig); diff --git a/ydb/core/kqp/ut/federated_query/datastreams/datastreams_ut.cpp b/ydb/core/kqp/ut/federated_query/datastreams/datastreams_ut.cpp index 79767d90f00b..df4251beed5b 100644 --- a/ydb/core/kqp/ut/federated_query/datastreams/datastreams_ut.cpp +++ b/ydb/core/kqp/ut/federated_query/datastreams/datastreams_ut.cpp @@ -161,7 +161,7 @@ class TStreamingTestFixture : public NUnitTest::TBaseFixture { std::shared_ptr GetQueryClient() { if (!QueryClient) { QueryClient = std::make_shared( - GetKikimrRunner()->GetQueryClient(TClientSettings().AuthToken(BUILTIN_ACL_ROOT)) + GetKikimrRunner()->GetQueryClient(QueryClientSettings) ); } @@ -736,6 +736,7 @@ class TStreamingTestFixture : public NUnitTest::TBaseFixture { protected: TDuration CheckpointPeriod = TDuration::MilliSeconds(200); TTestLogSettings LogSettings; + TClientSettings QueryClientSettings = TClientSettings().AuthToken(BUILTIN_ACL_ROOT); private: std::optional AppConfig; @@ -2672,6 +2673,8 @@ Y_UNIT_TEST_SUITE(KqpStreamingQueriesDdl) { } Y_UNIT_TEST_F(OffsetsAndStateRecoveryOnInternalRetry, TStreamingTestFixture) { + QueryClientSettings = TClientSettings(); + // Join with S3 used for introducing temporary failure and force retry on specific key constexpr char sourceBucket[] = "test_streaming_query_recovery_on_internal_retry"; From c9f2f7734543a1c11b9187c328f6810b863b622d Mon Sep 17 00:00:00 2001 From: Pisarenko Grigoriy Date: Wed, 19 Nov 2025 12:05:49 +0500 Subject: [PATCH 5/7] YQ-4900 fixed empty streaming queries sys view without user token (#29100) --- .../federated_query/datastreams/datastreams_ut.cpp | 14 ++++++++++++++ .../streaming_queries/streaming_queries.cpp | 5 ++++- 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/ydb/core/kqp/ut/federated_query/datastreams/datastreams_ut.cpp b/ydb/core/kqp/ut/federated_query/datastreams/datastreams_ut.cpp index df4251beed5b..4b9eace4e5dd 100644 --- a/ydb/core/kqp/ut/federated_query/datastreams/datastreams_ut.cpp +++ b/ydb/core/kqp/ut/federated_query/datastreams/datastreams_ut.cpp @@ -3396,6 +3396,20 @@ Y_UNIT_TEST_SUITE(KqpStreamingQueriesSysView) { }, "Path = '/Root/C'"); } + Y_UNIT_TEST_F(ReadWithoutAuth, TStreamingSysViewTestFixture) { + QueryClientSettings = TClientSettings(); + Setup(); + + StartQuery("A"); + StartQuery("B"); + StartQuery("C"); + Sleep(STATS_WAIT_DURATION); + + CheckSysView({ + {"A"}, {"B"}, {"C"} + }); + } + Y_UNIT_TEST_F(SortOrderForSysView, TStreamingSysViewTestFixture) { Setup(); diff --git a/ydb/core/sys_view/streaming_queries/streaming_queries.cpp b/ydb/core/sys_view/streaming_queries/streaming_queries.cpp index 5fa6fdabd020..590363841565 100644 --- a/ydb/core/sys_view/streaming_queries/streaming_queries.cpp +++ b/ydb/core/sys_view/streaming_queries/streaming_queries.cpp @@ -285,7 +285,10 @@ class TSchemeDescribeActorBase : public TActorBootstrapped { auto request = std::make_unique(); request->DatabaseName = Database; - request->UserToken = UserToken; + + if (UserToken && UserToken->GetSanitizedToken()) { + request->UserToken = UserToken; + } request->ResultSet.reserve(Paths.size()); for (const auto& path : Paths) { From 660a82eadde30a2bb2d2073e0e57a4c8e21d375f Mon Sep 17 00:00:00 2001 From: Pisarenko Grigoriy Date: Fri, 21 Nov 2025 13:56:11 +0500 Subject: [PATCH 6/7] YQ-4901 added mkql hopping stats into plan (#29101) --- .../kqp/executer_actor/kqp_executer_stats.cpp | 26 ++++++++++++++ .../kqp/executer_actor/kqp_executer_stats.h | 2 ++ ydb/core/kqp/opt/kqp_query_plan.cpp | 6 ++++ .../datastreams/datastreams_ut.cpp | 34 +++++++++++++++---- .../yql/dq/actors/protos/dq_stats.proto | 2 ++ ydb/tests/tools/kqprun/src/ydb_setup.cpp | 2 +- 6 files changed, 64 insertions(+), 8 deletions(-) diff --git a/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp b/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp index aca9cadf5185..b79cbc6c923b 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp +++ b/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp @@ -372,6 +372,8 @@ void TStageExecutionStats::Resize(ui32 taskCount) { for (auto& [_, f] : Filters) f.Resize(taskCount); for (auto& [_, a] : Aggregations) a.Resize(taskCount); + for (auto& [_, m] : Mkql) m.resize(taskCount); + MaxMemoryUsage.Resize(taskCount); Finished.resize(taskCount); } @@ -669,6 +671,24 @@ ui64 TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskS } } + for (const auto& mkqlStat : taskStats.GetMkqlStats()) { + if (const auto& name = mkqlStat.GetName()) { + std::vector* stats = nullptr; + const auto value = mkqlStat.GetValue(); + if (value) { + stats = &Mkql.emplace(name, TaskCount).first->second; + } else if (auto it = Mkql.find(name); it != Mkql.end()) { + stats = &it->second; + } else { + continue; + } + + AFL_ENSURE(stats); + AFL_ENSURE(index < stats->size()); + (*stats)[index] = value; + } + } + MaxMemoryUsage.SetNonZero(index, maxMemoryUsage); return baseTimeMs; @@ -1054,6 +1074,9 @@ void TQueryExecutionStats::AddComputeActorFullStatsByTask( UpdateAggr(tableStats.MutableReadRows(), tableStat.GetReadRows()); UpdateAggr(tableStats.MutableReadBytes(), tableStat.GetReadBytes()); } + for (const auto& mkqlStat : task.GetMkqlStats()) { + UpdateAggr(&(*stageStats->MutableMkql())[mkqlStat.GetName()], mkqlStat.GetValue()); + } } void TQueryExecutionStats::AddComputeActorProfileStatsByTask( @@ -1672,6 +1695,9 @@ void TQueryExecutionStats::ExportExecStats(NYql::NDqProto::TDqExecutionStats& st ExportAggStats(a.Bytes, *aggrStat.MutableBytes()); ExportAggStats(a.Rows, *aggrStat.MutableRows()); } + for (auto& [id, m] : stageStat.Mkql) { + ExportAggStats(m, (*stageStats.MutableMkql())[id]); + } } } diff --git a/ydb/core/kqp/executer_actor/kqp_executer_stats.h b/ydb/core/kqp/executer_actor/kqp_executer_stats.h index d3c23958c25d..10b99db9f3d0 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_stats.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_stats.h @@ -249,6 +249,8 @@ struct TStageExecutionStats { std::map Filters; std::map Aggregations; + std::unordered_map> Mkql; + TTimeSeriesStats MaxMemoryUsage; ui32 HistorySampleCount = 0; diff --git a/ydb/core/kqp/opt/kqp_query_plan.cpp b/ydb/core/kqp/opt/kqp_query_plan.cpp index 2d151b5d5a62..b607fe5d0736 100644 --- a/ydb/core/kqp/opt/kqp_query_plan.cpp +++ b/ydb/core/kqp/opt/kqp_query_plan.cpp @@ -3275,6 +3275,12 @@ TString AddExecStatsToTxPlan(const TString& txPlanJson, const NYql::NDqProto::TD } } } + if (const auto& mkqlStat = (*stat)->GetMkql(); !mkqlStat.empty()) { + auto& mkqlStats = stats.InsertValue("Mkql", NJson::JSON_MAP); + for (const auto& [name, m] : mkqlStat) { + FillAggrStat(mkqlStats, m, name); + } + } NKqpProto::TKqpStageExtraStats kqpStageStats; if ((*stat)->GetExtra().UnpackTo(&kqpStageStats)) { diff --git a/ydb/core/kqp/ut/federated_query/datastreams/datastreams_ut.cpp b/ydb/core/kqp/ut/federated_query/datastreams/datastreams_ut.cpp index 4b9eace4e5dd..87a85830063b 100644 --- a/ydb/core/kqp/ut/federated_query/datastreams/datastreams_ut.cpp +++ b/ydb/core/kqp/ut/federated_query/datastreams/datastreams_ut.cpp @@ -374,11 +374,12 @@ class TStreamingTestFixture : public NUnitTest::TBaseFixture { DATABASE_NAME = "{pq_database_name}", AUTH_METHOD = "BASIC", LOGIN = "root", - PASSWORD_SECRET_NAME = "secret_local_password" + PASSWORD_SECRET_NAME = "{secret_name}" );)", "pq_source"_a = pqSourceName, "pq_location"_a = YDB_ENDPOINT, - "pq_database_name"_a = YDB_DATABASE + "pq_database_name"_a = YDB_DATABASE, + "secret_name"_a = secretName )); } @@ -521,6 +522,7 @@ class TStreamingTestFixture : public NUnitTest::TBaseFixture { req.SetQuery(query); req.SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); req.SetType(NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT); + req.SetCollectStats(Ydb::Table::QueryStatsCollection::STATS_COLLECTION_PROFILE); auto ev = std::make_unique(); ev->Record = queryProto; @@ -530,6 +532,7 @@ class TStreamingTestFixture : public NUnitTest::TBaseFixture { ev->SaveQueryPhysicalGraph = settings.SaveState; ev->QueryPhysicalGraph = settings.PhysicalGraph; ev->CheckpointId = settings.CheckpointId; + ev->ProgressStatsPeriod = TDuration::Seconds(1); const auto edgeActor = runtime.AllocateEdgeActor(); runtime.Send(MakeKqpProxyID(runtime.GetNodeId()), edgeActor, ev.release()); @@ -1071,7 +1074,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQueryDatastreams) { CreatePqSourceBasicAuth(sourceName, useSchemaSecrets); const auto scriptExecutionOperation = ExecScript(fmt::format(R"( - SELECT * FROM `{source}`.`{topic}` + SELECT key || "{id}", value FROM `{source}`.`{topic}` WITH ( FORMAT="json_each_row", SCHEMA=( @@ -1081,19 +1084,25 @@ Y_UNIT_TEST_SUITE(KqpFederatedQueryDatastreams) { LIMIT 1; )", "source"_a=sourceName, - "topic"_a=topicName + "topic"_a=topicName, + "id"_a = i )); WriteTopicMessage(topicName, R"({"key": "key1", "value": "value1"})"); - CheckScriptResult(scriptExecutionOperation, 2, 1, [](TResultSetParser& result) { - UNIT_ASSERT_VALUES_EQUAL(result.ColumnParser(0).GetString(), "key1"); + CheckScriptResult(scriptExecutionOperation, 2, 1, [i](TResultSetParser& result) { + UNIT_ASSERT_VALUES_EQUAL(result.ColumnParser(0).GetString(), "key1" + ToString(i)); UNIT_ASSERT_VALUES_EQUAL(result.ColumnParser(1).GetString(), "value1"); }); + + const auto metadata = GetScriptExecutionOperation(scriptExecutionOperation).Metadata(); + const auto& plan = metadata.ExecStats.GetPlan(); + UNIT_ASSERT(plan); + UNIT_ASSERT_STRING_CONTAINS(*plan, "Mkql_TotalNodes"); } } - Y_UNIT_TEST_F(ExplainReadTopicBasic, TStreamingTestFixture) { + Y_UNIT_TEST_F(ReadTopicExplainBasic, TStreamingTestFixture) { const TString sourceName = "sourceName"; const TString topicName = "topicName"; CreateTopic(topicName); @@ -1485,6 +1494,17 @@ Y_UNIT_TEST_SUITE(KqpFederatedQueryDatastreams) { CheckScriptResult(result[0], 1, 1, [](TResultSetParser& resultSet) { UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUint64(), 2); }); + + WaitFor(TDuration::Seconds(5), "operation stats", [&](TString& error) { + const auto metadata = GetScriptExecutionOperation(operationId).Metadata(); + const auto& plan = metadata.ExecStats.GetPlan(); + if (plan && plan->contains("MultiHop_NewHopsCount")) { + return true; + } + + error = TStringBuilder() << "plan is not available, status: " << metadata.ExecStatus << ", plan: " << plan.value_or(""); + return false; + }); } Y_UNIT_TEST_F(CheckpointsOnNotDrainedChannels, TStreamingTestFixture) { diff --git a/ydb/library/yql/dq/actors/protos/dq_stats.proto b/ydb/library/yql/dq/actors/protos/dq_stats.proto index e249f2b0369b..ee904d9283fb 100644 --- a/ydb/library/yql/dq/actors/protos/dq_stats.proto +++ b/ydb/library/yql/dq/actors/protos/dq_stats.proto @@ -467,6 +467,8 @@ message TDqStageStats { map OperatorFilter = 44; map OperatorAggregation = 45; + map Mkql = 53; + TDqStatsAggr MaxMemoryUsage = 24; google.protobuf.Any Extra = 100; diff --git a/ydb/tests/tools/kqprun/src/ydb_setup.cpp b/ydb/tests/tools/kqprun/src/ydb_setup.cpp index 80c5ca796e67..f264ac4702bf 100644 --- a/ydb/tests/tools/kqprun/src/ydb_setup.cpp +++ b/ydb/tests/tools/kqprun/src/ydb_setup.cpp @@ -574,7 +574,7 @@ class TYdbSetup::TImpl : public TKikimrSetupBase { request->SetQuery(query.Query); request->SetType(type); request->SetAction(query.Action); - request->SetCollectStats(Ydb::Table::QueryStatsCollection::STATS_COLLECTION_FULL); + request->SetCollectStats(Ydb::Table::QueryStatsCollection::STATS_COLLECTION_PROFILE); request->SetDatabase(database); request->SetPoolId(query.PoolId); request->MutableYdbParameters()->insert(query.Params.begin(), query.Params.end()); From ad5d846303a4cb13307e8b7a7658580b2674165a Mon Sep 17 00:00:00 2001 From: Pisarenko Grigoriy Date: Tue, 25 Nov 2025 16:12:06 +0500 Subject: [PATCH 7/7] YQ-4911 disabled transparent sys columns in PQ source (#29332) --- ydb/core/kqp/host/kqp_host.cpp | 1 + .../datastreams/datastreams_ut.cpp | 56 ++++++++ .../pq/async_io/dq_pq_meta_extractor.cpp | 131 +++++++++--------- .../providers/pq/common/pq_meta_fields.cpp | 113 +++++++++++---- .../yql/providers/pq/common/pq_meta_fields.h | 18 +-- .../pq/provider/yql_pq_datasource.cpp | 6 +- .../provider/yql_pq_datasource_type_ann.cpp | 6 +- .../pq/provider/yql_pq_dq_integration.cpp | 6 +- .../providers/pq/provider/yql_pq_provider.h | 3 +- .../ast.txt | 4 +- 10 files changed, 229 insertions(+), 115 deletions(-) diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp index b1f2947174dc..570a3820192f 100644 --- a/ydb/core/kqp/host/kqp_host.cpp +++ b/ydb/core/kqp/host/kqp_host.cpp @@ -1885,6 +1885,7 @@ class TKqpHost : public IKqpHost { TString sessionId = CreateGuidAsString(); auto state = MakeIntrusive(sessionId); state->SupportRtmrMode = false; + state->AllowTransparentSystemColumns = false; state->Types = TypesCtx.Get(); state->DbResolver = FederatedQuerySetup->DatabaseAsyncResolver; state->FunctionRegistry = FuncRegistry; diff --git a/ydb/core/kqp/ut/federated_query/datastreams/datastreams_ut.cpp b/ydb/core/kqp/ut/federated_query/datastreams/datastreams_ut.cpp index 87a85830063b..d4257917f4fb 100644 --- a/ydb/core/kqp/ut/federated_query/datastreams/datastreams_ut.cpp +++ b/ydb/core/kqp/ut/federated_query/datastreams/datastreams_ut.cpp @@ -3365,6 +3365,62 @@ Y_UNIT_TEST_SUITE(KqpStreamingQueriesDdl) { CheckScriptExecutionsCount(0, 0); } } + + Y_UNIT_TEST_F(WritingInLocalYdbTablesWithProjection, TStreamingTestFixture) { + constexpr char pqSourceName[] = "pqSource"; + CreatePqSource(pqSourceName); + + for (const bool rowTables : {true, false}) { + const auto inputTopicName = TStringBuilder() << "writingInLocalYdbWithLimitInputTopicName" << rowTables; + CreateTopic(inputTopicName); + + const auto ydbTable = TStringBuilder() << "tableSink" << rowTables; + ExecQuery(fmt::format(R"( + CREATE TABLE `{table}` ( + Key String NOT NULL, + Value String NOT NULL, + PRIMARY KEY (Key) + ) {settings})", + "table"_a = ydbTable, + "settings"_a = rowTables ? "" : "WITH (STORE = COLUMN)" + )); + + const auto queryName = TStringBuilder() << "streamingQuery" << rowTables; + ExecQuery(fmt::format(R"( + CREATE STREAMING QUERY `{query_name}` AS + DO BEGIN + UPSERT INTO `{ydb_table}` + SELECT (Key || "x") AS Key, Value FROM `{pq_source}`.`{input_topic}` WITH ( + FORMAT = json_each_row, + SCHEMA ( + Key String NOT NULL, + Value String NOT NULL + ) + ) LIMIT 1 + END DO;)", + "query_name"_a = queryName, + "pq_source"_a = pqSourceName, + "input_topic"_a = inputTopicName, + "ydb_table"_a = ydbTable + )); + + CheckScriptExecutionsCount(1, 1); + Sleep(TDuration::Seconds(1)); + + WriteTopicMessage(inputTopicName, R"({"Key": "message1", "Value": "value1"})"); + Sleep(TDuration::Seconds(1)); + CheckTable(*this, ydbTable, {{"message1x", "value1"}}); + + Sleep(TDuration::Seconds(1)); + CheckScriptExecutionsCount(1, 0); + + ExecQuery(fmt::format( + "DROP STREAMING QUERY `{query_name}`", + "query_name"_a = queryName + )); + CheckScriptExecutionsCount(0, 0); + } + } } Y_UNIT_TEST_SUITE(KqpStreamingQueriesSysView) { diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_meta_extractor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_meta_extractor.cpp index 0d333df8c379..69e5e26e4f77 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_meta_extractor.cpp +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_meta_extractor.cpp @@ -3,79 +3,84 @@ #include #include +namespace NYql::NDq { + namespace { - const std::unordered_map ExtractorsMap = { - { - "_yql_sys_create_time", [](const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage& message){ - using TDataType = NYql::NUdf::TDataType; - return std::make_pair( - NYql::NUdf::TUnboxedValuePod(static_cast(message.GetCreateTime().MicroSeconds())), - NYql::NUdf::GetDataTypeInfo(TDataType::Slot).FixedSize - ); - } - }, - { - "_yql_sys_tsp_write_time", [](const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage& message){ - using TDataType = NYql::NUdf::TDataType; - return std::make_pair( - NYql::NUdf::TUnboxedValuePod(static_cast(message.GetWriteTime().MicroSeconds())), - NYql::NUdf::GetDataTypeInfo(TDataType::Slot).FixedSize - ); - } - }, - { - "_yql_sys_partition_id", [](const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage& message){ - using TDataType = NYql::NUdf::TDataType; - return std::make_pair( - NYql::NUdf::TUnboxedValuePod(static_cast(message.GetPartitionSession()->GetPartitionId())), - NYql::NUdf::GetDataTypeInfo(TDataType::Slot).FixedSize - ); - } - }, - { - "_yql_sys_offset", [](const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage& message){ - using TDataType = NYql::NUdf::TDataType; - return std::make_pair( - NYql::NUdf::TUnboxedValuePod(static_cast(message.GetOffset())), - NYql::NUdf::GetDataTypeInfo(TDataType::Slot).FixedSize); - } - }, - { - "_yql_sys_message_group_id", [](const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage& message){ - const auto& data = message.GetMessageGroupId(); - return std::make_pair( - NKikimr::NMiniKQL::MakeString(NYql::NUdf::TStringRef(data.data(), data.size())), - data.size() - ); - } - }, - { - "_yql_sys_seq_no", [](const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage& message){ - using TDataType = NYql::NUdf::TDataType; - return std::make_pair( - NYql::NUdf::TUnboxedValuePod(static_cast(message.GetSeqNo())), - NYql::NUdf::GetDataTypeInfo(TDataType::Slot).FixedSize - ); - } - }, - }; -} -namespace NYql::NDq { +const std::unordered_map ExtractorsMap = { + { + "create_time", [](const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage& message) { + using TDataType = NUdf::TDataType; + return std::make_pair( + NUdf::TUnboxedValuePod(static_cast(message.GetCreateTime().MicroSeconds())), + NUdf::GetDataTypeInfo(TDataType::Slot).FixedSize + ); + } + }, + { + "write_time", [](const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage& message) { + using TDataType = NUdf::TDataType; + return std::make_pair( + NUdf::TUnboxedValuePod(static_cast(message.GetWriteTime().MicroSeconds())), + NUdf::GetDataTypeInfo(TDataType::Slot).FixedSize + ); + } + }, + { + "partition_id", [](const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage& message) { + using TDataType = NUdf::TDataType; + return std::make_pair( + NUdf::TUnboxedValuePod(static_cast(message.GetPartitionSession()->GetPartitionId())), + NUdf::GetDataTypeInfo(TDataType::Slot).FixedSize + ); + } + }, + { + "offset", [](const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage& message) { + using TDataType = NUdf::TDataType; + return std::make_pair( + NUdf::TUnboxedValuePod(static_cast(message.GetOffset())), + NUdf::GetDataTypeInfo(TDataType::Slot).FixedSize); + } + }, + { + "message_group_id", [](const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage& message) { + const auto& data = message.GetMessageGroupId(); + return std::make_pair( + NKikimr::NMiniKQL::MakeString(NUdf::TStringRef(data.data(), data.size())), + data.size() + ); + } + }, + { + "seq_no", [](const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage& message) { + using TDataType = NUdf::TDataType; + return std::make_pair( + NUdf::TUnboxedValuePod(static_cast(message.GetSeqNo())), + NUdf::GetDataTypeInfo(TDataType::Slot).FixedSize + ); + } + }, +}; + +} // anonymous namespace TPqMetaExtractor::TPqMetaExtractor() { - for (const auto& key : AllowedPqMetaSysColumns()) { - Y_ENSURE( - ExtractorsMap.contains(key), - "Pq metadata field " << key << " hasn't valid runtime extractor. You should add it."); + for (const auto& sysColumn : AllowedPqMetaSysColumns(true)) { + const auto key = SkipPqSystemPrefix(sysColumn); + Y_ENSURE(key, sysColumn); + Y_ENSURE(ExtractorsMap.contains(*key), "Pq metadata field " << *key << " hasn't valid runtime extractor. You should add it."); } } TPqMetaExtractor::TPqMetaExtractorLambda TPqMetaExtractor::FindExtractorLambda(const TString& sysColumn) const { - auto iter = ExtractorsMap.find(sysColumn); + const auto key = SkipPqSystemPrefix(sysColumn); + Y_ENSURE(key, sysColumn); + + const auto iter = ExtractorsMap.find(*key); Y_ENSURE(iter != ExtractorsMap.end(), sysColumn); return iter->second; } -} +} // namespace NYql::NDq diff --git a/ydb/library/yql/providers/pq/common/pq_meta_fields.cpp b/ydb/library/yql/providers/pq/common/pq_meta_fields.cpp index b665a37e4f3c..92276fe46e54 100644 --- a/ydb/library/yql/providers/pq/common/pq_meta_fields.cpp +++ b/ydb/library/yql/providers/pq/common/pq_meta_fields.cpp @@ -3,52 +3,105 @@ #include +namespace NYql { + namespace { - const std::vector PqMetaFields = { - NYql::TMetaFieldDescriptor("create_time", "_yql_sys_create_time", NYql::NUdf::EDataSlot::Timestamp), - NYql::TMetaFieldDescriptor("write_time", "_yql_sys_tsp_write_time", NYql::NUdf::EDataSlot::Timestamp), - NYql::TMetaFieldDescriptor("partition_id", "_yql_sys_partition_id", NYql::NUdf::EDataSlot::Uint64), - NYql::TMetaFieldDescriptor("offset", "_yql_sys_offset", NYql::NUdf::EDataSlot::Uint64), - NYql::TMetaFieldDescriptor("message_group_id", "_yql_sys_message_group_id", NYql::NUdf::EDataSlot::String), - NYql::TMetaFieldDescriptor("seq_no", "_yql_sys_seq_no", NYql::NUdf::EDataSlot::Uint64), - }; -} -namespace NYql { +class TPqMetadataField { +public: + static constexpr char SYS_PREFIX[] = "_yql_sys_"; + static constexpr char TRANSPARENT_PREFIX[] = "tsp_"; + + explicit TPqMetadataField(NUdf::EDataSlot type, bool transparent = false) + : Type(type) + , Transparent(transparent) + {} + + TString GetSysColumn(const TString& key, bool allowTransparentColumns) const { + auto systemPrefix = TStringBuilder() << SYS_PREFIX; + if (Transparent && allowTransparentColumns) { + systemPrefix << TRANSPARENT_PREFIX; + } + + return systemPrefix << key; + } + + TMetaFieldDescriptor GetDescriptor(const TString& key, bool allowTransparentColumns) const { + return { + .Key = key, + .SysColumn = GetSysColumn(key, allowTransparentColumns), + .Type = Type, + }; + } + +public: + const NUdf::EDataSlot Type; + const bool Transparent; +}; + +const std::unordered_map PqMetaFields = { + {"create_time", TPqMetadataField(NUdf::EDataSlot::Timestamp)}, + {"write_time", TPqMetadataField(NUdf::EDataSlot::Timestamp, /* transparent */ true)}, + {"partition_id", TPqMetadataField(NUdf::EDataSlot::Uint64)}, + {"offset", TPqMetadataField(NUdf::EDataSlot::Uint64)}, + {"message_group_id", TPqMetadataField(NUdf::EDataSlot::String)}, + {"seq_no", TPqMetadataField(NUdf::EDataSlot::Uint64)}, +}; -const TMetaFieldDescriptor* FindPqMetaFieldDescriptorByKey(const TString& key) { - const auto iter = std::find_if( - PqMetaFields.begin(), - PqMetaFields.end(), - [&](const NYql::TMetaFieldDescriptor& item){ return item.Key == key; }); - if (iter != PqMetaFields.end()) { - return iter; +} // anonymous namespace + +std::optional SkipPqSystemPrefix(const TString& sysColumn, bool* isTransparent) { + TStringBuf keyBuf(sysColumn); + if (!keyBuf.SkipPrefix(TPqMetadataField::SYS_PREFIX)) { + return std::nullopt; + } + + const bool transparent = keyBuf.SkipPrefix(TPqMetadataField::TRANSPARENT_PREFIX); + if (isTransparent) { + *isTransparent = transparent; + } + + return TString(keyBuf); +} + +std::optional FindPqMetaFieldDescriptorByKey(const TString& key, bool allowTransparentColumns) { + const auto it = PqMetaFields.find(key); + if (it == PqMetaFields.end()) { + return std::nullopt; } - return nullptr; + return it->second.GetDescriptor(key, allowTransparentColumns); } -const TMetaFieldDescriptor* FindPqMetaFieldDescriptorBySysColumn(const TString& sysColumn) { - const auto iter = std::find_if( - PqMetaFields.begin(), - PqMetaFields.end(), - [&](const NYql::TMetaFieldDescriptor& item){ return item.SysColumn == sysColumn; }); - if (iter != PqMetaFields.end()) { - return iter; +std::optional FindPqMetaFieldDescriptorBySysColumn(const TString& sysColumn) { + bool transparent = false; + const auto key = SkipPqSystemPrefix(sysColumn, &transparent); + if (!key) { + return std::nullopt; + } + + const auto it = PqMetaFields.find(*key); + if (it == PqMetaFields.end()) { + return std::nullopt; } - return nullptr; + const auto& metadata = it->second; + if (transparent && !metadata.Transparent) { + return std::nullopt; + } + + return metadata.GetDescriptor(*key, transparent); } -std::vector AllowedPqMetaSysColumns() { +std::vector AllowedPqMetaSysColumns(bool allowTransparentColumns) { std::vector res; res.reserve(PqMetaFields.size()); - for (const auto& descriptor : PqMetaFields) { - res.emplace_back(descriptor.SysColumn); + for (const auto& [key, field] : PqMetaFields) { + res.emplace_back(field.GetSysColumn(key, allowTransparentColumns)); } return res; } -} +} // namespace NYql diff --git a/ydb/library/yql/providers/pq/common/pq_meta_fields.h b/ydb/library/yql/providers/pq/common/pq_meta_fields.h index 03fbaac753c5..1ac9095783ac 100644 --- a/ydb/library/yql/providers/pq/common/pq_meta_fields.h +++ b/ydb/library/yql/providers/pq/common/pq_meta_fields.h @@ -12,23 +12,17 @@ namespace NYql { struct TMetaFieldDescriptor { -public: - TMetaFieldDescriptor(TString key, TString sysColumn, NUdf::EDataSlot type) - : Key(key) - , SysColumn(sysColumn) - , Type(type) - { } - -public: const TString Key; const TString SysColumn; const NUdf::EDataSlot Type; }; -const TMetaFieldDescriptor* FindPqMetaFieldDescriptorByKey(const TString& key); +std::optional SkipPqSystemPrefix(const TString& sysColumn, bool* isTransparent = nullptr); -const TMetaFieldDescriptor* FindPqMetaFieldDescriptorBySysColumn(const TString& sysColumn); +std::optional FindPqMetaFieldDescriptorByKey(const TString& key, bool allowTransparentColumns); -std::vector AllowedPqMetaSysColumns(); +std::optional FindPqMetaFieldDescriptorBySysColumn(const TString& sysColumn); -} +std::vector AllowedPqMetaSysColumns(bool allowTransparentColumns); + +} // namespace NYql diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_datasource.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_datasource.cpp index 3cdeff74a856..a35f720675b5 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_datasource.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_datasource.cpp @@ -20,6 +20,8 @@ namespace NYql { using namespace NNodes; +namespace { + class TPqDataSourceProvider : public TDataProviderBase { public: TPqDataSourceProvider(TPqState::TPtr state, IPqGateway::TPtr gateway) @@ -104,7 +106,7 @@ class TPqDataSourceProvider : public TDataProviderBase { } TVector sourceMetadata; - for (auto sysColumn : AllowedPqMetaSysColumns()) { + for (auto sysColumn : AllowedPqMetaSysColumns(State_->AllowTransparentSystemColumns)) { sourceMetadata.push_back(Build(ctx, read.Pos()) .Name().Build("system") .Value().Build(sysColumn) @@ -277,6 +279,8 @@ class TPqDataSourceProvider : public TDataProviderBase { THolder IODiscoveryTransformer_; }; +} // anonymous namespace + TIntrusivePtr CreatePqDataSource(TPqState::TPtr state, IPqGateway::TPtr gateway) { return new TPqDataSourceProvider(state, gateway); } diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp index c4e6a034cf58..61f93f41da13 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp @@ -205,7 +205,7 @@ class TPqDataSourceTypeAnnotationTransformer : public TVisitorTransformerBase { if (!State_->IsRtmrMode() && !NCommon::ValidateFormatForInput( // Rtmr has 3 field (key/subkey/value). format->Content(), schema->Cast()->GetItemType()->Cast(), - [](TStringBuf fieldName) {return FindPqMetaFieldDescriptorBySysColumn(TString(fieldName)); }, + [](TStringBuf fieldName) {return FindPqMetaFieldDescriptorBySysColumn(TString(fieldName)).has_value(); }, ctx)) { return TStatus::Error; } @@ -407,7 +407,7 @@ class TPqDataSourceTypeAnnotationTransformer : public TVisitorTransformerBase { } const auto metadataKey = TString(key->TailPtr()->Content()); - const auto descriptor = FindPqMetaFieldDescriptorByKey(metadataKey); + const auto descriptor = FindPqMetaFieldDescriptorByKey(metadataKey, State_->AllowTransparentSystemColumns); if (!descriptor) { ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), TStringBuilder() << "Metadata key " << metadataKey << " wasn't found")); @@ -480,7 +480,7 @@ class TPqDataSourceTypeAnnotationTransformer : public TVisitorTransformerBase { TPqState::TPtr State_; }; -} +} // anonymous namespace THolder CreatePqDataSourceTypeAnnotationTransformer(TPqState::TPtr state) { return MakeHolder(state); diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp index 87ac32d98faf..6e95bc0d1e88 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp @@ -476,7 +476,7 @@ class TPqDqIntegration: public TDqIntegrationBase { .Done()); TExprNode::TListType metadataFieldsList; - for (const auto& sysColumn : AllowedPqMetaSysColumns()) { + for (const auto& sysColumn : AllowedPqMetaSysColumns(State_->AllowTransparentSystemColumns)) { metadataFieldsList.push_back(ctx.NewAtom(pos, sysColumn)); } @@ -531,10 +531,10 @@ class TPqDqIntegration: public TDqIntegrationBase { TPqState* State_; // State owns dq integration, so back reference must be not smart. }; -} +} // anonymous namespace THolder CreatePqDqIntegration(const TPqState::TPtr& state) { return MakeHolder(state); } -} +} // namespace NYql diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_provider.h b/ydb/library/yql/providers/pq/provider/yql_pq_provider.h index 5ffefca938f2..5201eecac1b7 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_provider.h +++ b/ydb/library/yql/providers/pq/provider/yql_pq_provider.h @@ -10,7 +10,7 @@ namespace NKikimr::NMiniKQL { class IFunctionRegistry; -} +} // namespace NKikimr::NMiniKQL namespace NYql { @@ -46,6 +46,7 @@ struct TPqState : public TThrRefBase { public: bool SupportRtmrMode = false; bool UseActorSystemThreadsInTopicClient = true; + bool AllowTransparentSystemColumns = true; const TString SessionId; THashMap, TTopicMeta> Topics; diff --git a/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_pq-ReadTopicWithMetadata-default.txt_/ast.txt b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_pq-ReadTopicWithMetadata-default.txt_/ast.txt index ae1bd17eddb1..2b203779bef4 100644 --- a/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_pq-ReadTopicWithMetadata-default.txt_/ast.txt +++ b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_pq-ReadTopicWithMetadata-default.txt_/ast.txt @@ -3,7 +3,7 @@ (let $2 (Configure! $1 (DataSource '"pq" '"$all") '"Attr" '"consumer" '"test_client")) (let $3 (DataSink 'result)) (let $4 (DataSource '"pq" '"pq")) -(let $5 '('('"system" '_yql_sys_create_time) '('"system" '_yql_sys_tsp_write_time) '('"system" '_yql_sys_partition_id) '('"system" '_yql_sys_offset) '('"system" '_yql_sys_message_group_id) '('"system" '_yql_sys_seq_no))) +(let $5 '('('"system" '_yql_sys_seq_no) '('"system" '_yql_sys_partition_id) '('"system" '_yql_sys_create_time) '('"system" '_yql_sys_message_group_id) '('"system" '_yql_sys_tsp_write_time) '('"system" '_yql_sys_offset))) (let $6 (DataType 'String)) (let $7 (OptionalType $6)) (let $8 '('"color" $7)) @@ -16,7 +16,7 @@ (let $15 (StructType '('_yql_sys_create_time $13) '('_yql_sys_message_group_id $6) '('_yql_sys_offset $14) '('_yql_sys_partition_id $14) '('_yql_sys_seq_no $14) '('_yql_sys_tsp_write_time $13) $8 $9)) (let $16 (DqPqTopicSource world $10 '('"color" '"value") $12 (SecureParam '"cluster:default_pq") '"" $15 '"")) (let $17 (DqStage '((DqSource $4 $16)) (lambda '($22) (block '( - (let $23 '('_yql_sys_create_time '_yql_sys_tsp_write_time '_yql_sys_partition_id '_yql_sys_offset '_yql_sys_message_group_id '_yql_sys_seq_no)) + (let $23 '('_yql_sys_seq_no '_yql_sys_partition_id '_yql_sys_create_time '_yql_sys_message_group_id '_yql_sys_tsp_write_time '_yql_sys_offset)) (let $24 '('('"format" '"json_each_row") '('"metadataColumns" $23) '('"formatSettings" '('('"data.datetime.formatname" '"POSIX") '('"data.timestamp.formatname" '"POSIX"))) '('"settings" '($11)))) (let $25 (DqSourceWideWrap $22 $4 $15 $24)) (return (FlatMap (NarrowMap $25 (lambda '($26 $27 $28 $29 $30 $31 $32 $33) (AsStruct '('_yql_sys_create_time $26) '('_yql_sys_message_group_id $27) '('_yql_sys_offset $28) '('_yql_sys_partition_id $29) '('_yql_sys_seq_no $30) '('_yql_sys_tsp_write_time $31) '('"color" $32) '('"value" $33)))) (lambda '($34) (Just (AsStruct '('"color" (Member $34 '"color")) '('"create_time" (Member $34 '_yql_sys_create_time)) '('"message_group_id" (Member $34 '_yql_sys_message_group_id)) '('"offset" (Member $34 '_yql_sys_offset)) '('"partition_id" (Member $34 '_yql_sys_partition_id)) '('"v" (Member $34 '_yql_sys_seq_no)) '('"value" (Member $34 '"value")) '('"write_time" (Member $34 '_yql_sys_tsp_write_time)))))))