Skip to content

Commit cabc8e5

Browse files
GrigoriyPACopilot
authored andcommitted
YQ-4860 supported automatic access revert after secure FF disabled (ydb-platform#28202)
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent 50323ee commit cabc8e5

File tree

24 files changed

+555
-178
lines changed

24 files changed

+555
-178
lines changed

ydb/core/kqp/common/events/script_executions.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -342,7 +342,21 @@ struct TEvSaveScriptPhysicalGraphResponse : public TEventLocal<TEvSaveScriptPhys
342342
NYql::TIssues Issues;
343343
};
344344

345+
struct TEvGetScriptExecutionPhysicalGraph : public TEventWithDatabaseId<TEvGetScriptExecutionPhysicalGraph, TKqpScriptExecutionEvents::EvGetScriptExecutionPhysicalGraph> {
346+
TEvGetScriptExecutionPhysicalGraph(const TString& database, const TString& executionId)
347+
: TEventWithDatabaseId(database)
348+
, ExecutionId(executionId)
349+
{}
350+
351+
const TString ExecutionId;
352+
};
353+
345354
struct TEvGetScriptPhysicalGraphResponse : public TEventLocal<TEvGetScriptPhysicalGraphResponse, TKqpScriptExecutionEvents::EvGetScriptPhysicalGraphResponse> {
355+
TEvGetScriptPhysicalGraphResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues issues)
356+
: Status(status)
357+
, Issues(std::move(issues))
358+
{}
359+
346360
TEvGetScriptPhysicalGraphResponse(Ydb::StatusIds::StatusCode status, NKikimrKqp::TQueryPhysicalGraph&& physicalGraph, i64 generation, NYql::TIssues issues)
347361
: Status(status)
348362
, PhysicalGraph(std::move(physicalGraph))

ydb/core/kqp/common/simple/kqp_event_ids.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ struct TKqpScriptExecutionEvents {
176176
EvGetScriptPhysicalGraphResponse,
177177
EvSaveScriptProgressResponse,
178178
EvResetScriptExecutionRetriesResponse,
179+
EvGetScriptExecutionPhysicalGraph,
179180
};
180181
};
181182

ydb/core/kqp/gateway/behaviour/streaming_query/behaviour.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ NMetadata::NInitializer::IInitializationBehaviour::TPtr TStreamingQueryBehaviour
1414
}
1515

1616
TString TStreamingQueryBehaviour::GetInternalStorageTablePath() const {
17-
return "streaming/queries";
17+
return TStreamingQueryConfig::InternalTablesPath;
1818
}
1919

2020
NMetadata::NModifications::IOperationsManager::TPtr TStreamingQueryBehaviour::ConstructOperationsManager() const {

ydb/core/kqp/gateway/behaviour/streaming_query/common/utils.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,16 @@
11
#include "utils.h"
22

3+
#include <ydb/core/base/path.h>
34
#include <ydb/core/protos/flat_scheme_op.pb.h>
45

56
#include <yql/essentials/sql/v1/node.h>
67

78
namespace NKikimr::NKqp {
89

10+
TString TStreamingQueryMeta::GetTablesPath() {
11+
return JoinPath({".metadata", InternalTablesPath});
12+
}
13+
914
TStreamingQuerySettings& TStreamingQuerySettings::FromProto(const NKikimrSchemeOp::TStreamingQueryProperties& info) {
1015
for (const auto& [name, value] : info.GetProperties()) {
1116
if (name == TStreamingQueryMeta::TSqlSettings::QUERY_TEXT_FEATURE) {

ydb/core/kqp/gateway/behaviour/streaming_query/common/utils.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ class TStreamingQueryMeta {
3131
// Internal query info
3232
static inline constexpr char QueryTextRevision[] = "__query_text_revision";
3333
};
34+
35+
static inline constexpr char InternalTablesPath[] = "streaming/queries";
36+
37+
static TString GetTablesPath();
3438
};
3539

3640
// Used for properties parsing after describing streaming query

ydb/core/kqp/gateway/behaviour/streaming_query/common/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ SRCS(
55
)
66

77
PEERDIR(
8+
ydb/core/base
89
ydb/core/protos
910
yql/essentials/sql/v1
1011
)
Lines changed: 64 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,81 @@
11
#include "initializer.h"
22
#include "object.h"
33

4+
#include <ydb/library/table_creator/table_creator.h>
5+
46
namespace NKikimr::NKqp {
57

68
namespace {
79

8-
void AddColumn(Ydb::Table::CreateTableRequest& request, const TString& name, Ydb::Type::PrimitiveTypeId type, bool primary = false) {
9-
if (primary) {
10-
request.add_primary_key(name);
11-
}
10+
using namespace NMetadata::NInitializer;
1211

13-
auto& column = *request.add_columns();
14-
column.set_name(name);
15-
column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(type);
16-
}
12+
class TStreamingQueriesTablesCreator final : public NTableCreator::TMultiTableCreator {
13+
using TBase = NTableCreator::TMultiTableCreator;
1714

18-
} // anonymous namespace
15+
public:
16+
TStreamingQueriesTablesCreator(const TString& modificationId, IModifierExternalController::TPtr externalController)
17+
: TBase({GetStreamingQueriesCreator()})
18+
, ModificationId(modificationId)
19+
, ExternalController(externalController)
20+
{}
1921

20-
void TStreamingQueryInitializer::DoPrepare(NMetadata::NInitializer::IInitializerInput::TPtr controller) const {
21-
TVector<NMetadata::NInitializer::ITableModifier::TPtr> result;
22-
{
23-
Ydb::Table::CreateTableRequest request;
24-
request.set_path(TStreamingQueryConfig::GetBehaviour()->GetStorageTablePath());
25-
AddColumn(request, TStreamingQueryConfig::TColumns::DatabaseId, Ydb::Type::UTF8, true);
26-
AddColumn(request, TStreamingQueryConfig::TColumns::QueryPath, Ydb::Type::UTF8, true);
27-
AddColumn(request, TStreamingQueryConfig::TColumns::State, Ydb::Type::JSON);
28-
result.emplace_back(std::make_shared<NMetadata::NInitializer::TGenericTableModifier<NMetadata::NRequest::TDialogCreateTable>>(request, "create"));
22+
protected:
23+
static IActor* GetStreamingQueriesCreator() {
24+
NACLib::TDiffACL acl;
25+
acl.ClearAccess();
26+
acl.SetInterruptInheritance(AppData()->FeatureFlags.GetEnableSecureScriptExecutions());
27+
28+
return CreateTableCreator(
29+
SplitPath(TStreamingQueryConfig::GetTablesPath()),
30+
{
31+
Col(TStreamingQueryConfig::TColumns::DatabaseId, NScheme::NTypeIds::Utf8),
32+
Col(TStreamingQueryConfig::TColumns::QueryPath, NScheme::NTypeIds::Utf8),
33+
Col(TStreamingQueryConfig::TColumns::State, NScheme::NTypeIds::Json),
34+
},
35+
{ TStreamingQueryConfig::TColumns::DatabaseId, TStreamingQueryConfig::TColumns::QueryPath },
36+
NKikimrServices::KQP_PROXY,
37+
{},
38+
{},
39+
/* isSystemUser */ true,
40+
Nothing(),
41+
acl
42+
);
2943
}
3044

31-
if (AppData()->FeatureFlags.GetEnableSecureScriptExecutions()) {
32-
result.emplace_back(NMetadata::NInitializer::TACLModifierConstructor::GetNoAccessModifier(TStreamingQueryConfig::GetBehaviour()->GetStorageTablePath(), "acl"));
33-
} else {
34-
result.emplace_back(NMetadata::NInitializer::TACLModifierConstructor::GetReadOnlyModifier(TStreamingQueryConfig::GetBehaviour()->GetStorageTablePath(), "acl"));
45+
void OnTablesCreated(bool success, NYql::TIssues issues) final {
46+
if (success) {
47+
ExternalController->OnModificationFinished(ModificationId);
48+
} else {
49+
ExternalController->OnModificationFailed(Ydb::StatusIds::INTERNAL_ERROR, issues.ToString(), ModificationId);
50+
}
3551
}
3652

37-
controller->OnPreparationFinished(result);
53+
private:
54+
const TString ModificationId;
55+
const IModifierExternalController::TPtr ExternalController;
56+
};
57+
58+
class TStreamingQueriesTablesInitializer final : public ITableModifier {
59+
static constexpr char MODIFICATION_ID[] = "create-generic";
60+
61+
public:
62+
TStreamingQueriesTablesInitializer()
63+
: ITableModifier(MODIFICATION_ID, /* supportDbCache */ false)
64+
{}
65+
66+
protected:
67+
bool DoExecute(IModifierExternalController::TPtr externalController, const NMetadata::NRequest::TConfig& config) const final {
68+
Y_UNUSED(config);
69+
70+
TActivationContext::Register(new TStreamingQueriesTablesCreator(MODIFICATION_ID, externalController));
71+
return true;
72+
}
73+
};
74+
75+
} // anonymous namespace
76+
77+
void TStreamingQueryInitializer::DoPrepare(IInitializerInput::TPtr controller) const {
78+
controller->OnPreparationFinished({std::make_shared<TStreamingQueriesTablesInitializer>()});
3879
}
3980

4081
} // namespace NKikimr::NKqp

ydb/core/kqp/gateway/behaviour/streaming_query/queries.cpp

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
#include <ydb/core/kqp/common/simple/services.h>
1414
#include <ydb/core/kqp/gateway/utils/scheme_helpers.h>
1515
#include <ydb/core/kqp/provider/yql_kikimr_gateway.h>
16-
#include <ydb/core/kqp/proxy_service/kqp_script_executions.h>
1716
#include <ydb/core/protos/schemeshard/operations.pb.h>
1817
#include <ydb/core/resource_pools/resource_pool_settings.h>
1918
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
@@ -1704,8 +1703,8 @@ class TStartStreamingQueryTableActor final : public TActionActorBase<TStartStrea
17041703
if (!State.GetPreviousExecutionIds().empty() && !StateLoaded) {
17051704
StateLoaded = true;
17061705
const auto& executionId = *State.GetPreviousExecutionIds().rbegin();
1707-
const auto& fetcherId = Register(CreateGetScriptExecutionPhysicalGraphActor(SelfId(), Context.GetDatabase(), executionId));
1708-
LOG_D("Load previous query state from execution: " << executionId << " with fetcher " << fetcherId);
1706+
SendToKqpProxy(std::make_unique<TEvGetScriptExecutionPhysicalGraph>(Context.GetDatabase(), executionId));
1707+
LOG_D("Load previous query state from execution: " << executionId);
17091708
return;
17101709
}
17111710

ydb/core/kqp/gateway/behaviour/streaming_query/ya.make

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ PEERDIR(
2020
ydb/core/kqp/gateway/behaviour/streaming_query/common
2121
ydb/core/kqp/gateway/utils
2222
ydb/core/kqp/provider
23-
ydb/core/kqp/proxy_service
2423
ydb/core/protos
2524
ydb/core/protos/schemeshard
2625
ydb/core/resource_pools
@@ -29,6 +28,7 @@ PEERDIR(
2928
ydb/core/tx/tx_proxy
3029
ydb/library/conclusion
3130
ydb/library/query_actor
31+
ydb/library/table_creator
3232
ydb/services/metadata
3333
ydb/services/metadata/abstract
3434
ydb/services/metadata/manager

ydb/core/kqp/proxy_service/kqp_proxy_service.cpp

Lines changed: 56 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -2,51 +2,52 @@
22
#include "kqp_proxy_service_impl.h"
33
#include "kqp_script_executions.h"
44

5+
#include <ydb/core/actorlib_impl/long_timer.h>
56
#include <ydb/core/base/appdata.h>
6-
#include <ydb/core/base/path.h>
7-
#include <ydb/core/base/location.h>
87
#include <ydb/core/base/feature_flags.h>
8+
#include <ydb/core/base/location.h>
9+
#include <ydb/core/base/path.h>
910
#include <ydb/core/base/statestorage.h>
1011
#include <ydb/core/cms/console/configs_dispatcher.h>
1112
#include <ydb/core/cms/console/console.h>
12-
#include <ydb/core/kqp/counters/kqp_counters.h>
13+
#include <ydb/core/fq/libs/checkpoint_storage/storage_service.h>
14+
#include <ydb/core/fq/libs/row_dispatcher/events/data_plane.h>
15+
#include <ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.h>
1316
#include <ydb/core/kqp/common/events/script_executions.h>
1417
#include <ydb/core/kqp/common/events/workload_service.h>
1518
#include <ydb/core/kqp/common/kqp_lwtrace_probes.h>
1619
#include <ydb/core/kqp/common/kqp_timeouts.h>
1720
#include <ydb/core/kqp/compile_service/kqp_compile_service.h>
21+
#include <ydb/core/kqp/compute_actor/kqp_compute_actor.h>
22+
#include <ydb/core/kqp/counters/kqp_counters.h>
1823
#include <ydb/core/kqp/executer_actor/kqp_executer.h>
19-
#include <ydb/core/kqp/session_actor/kqp_worker_common.h>
24+
#include <ydb/core/kqp/gateway/behaviour/streaming_query/behaviour.h>
2025
#include <ydb/core/kqp/node_service/kqp_node_service.h>
26+
#include <ydb/core/kqp/session_actor/kqp_worker_common.h>
2127
#include <ydb/core/kqp/workload_service/kqp_workload_service.h>
22-
#include <ydb/core/resource_pools/resource_pool_settings.h>
23-
#include <ydb/core/tx/schemeshard/schemeshard.h>
24-
#include <ydb/library/yql/dq/actors/compute/dq_checkpoints.h>
25-
#include <ydb/library/yql/dq/actors/spilling/spilling_file.h>
26-
#include <ydb/library/yql/dq/actors/spilling/spilling.h>
27-
#include <ydb/core/actorlib_impl/long_timer.h>
28-
#include <ydb/public/sdk/cpp/src/library/operation_id/protos/operation_id.pb.h>
29-
#include <ydb/core/node_whiteboard/node_whiteboard.h>
30-
#include <ydb/core/ydb_convert/ydb_convert.h>
31-
#include <ydb/core/kqp/compute_actor/kqp_compute_actor.h>
3228
#include <ydb/core/mon/mon.h>
33-
#include <ydb/library/ydb_issue/issue_helpers.h>
29+
#include <ydb/core/node_whiteboard/node_whiteboard.h>
3430
#include <ydb/core/protos/workload_manager_config.pb.h>
31+
#include <ydb/core/resource_pools/resource_pool_settings.h>
3532
#include <ydb/core/sys_view/common/registry.h>
36-
#include <ydb/core/fq/libs/checkpoint_storage/storage_service.h>
37-
#include <ydb/core/fq/libs/row_dispatcher/events/data_plane.h>
38-
#include <ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.h>
39-
40-
#include <ydb/library/yql/utils/actor_log/log.h>
41-
#include <yql/essentials/core/services/mounts/yql_mounts.h>
42-
#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
43-
33+
#include <ydb/core/tx/schemeshard/schemeshard.h>
34+
#include <ydb/core/ydb_convert/ydb_convert.h>
4435
#include <ydb/library/actors/core/actor_bootstrapped.h>
4536
#include <ydb/library/actors/core/interconnect.h>
4637
#include <ydb/library/actors/core/hfunc.h>
4738
#include <ydb/library/actors/core/log.h>
4839
#include <ydb/library/actors/http/http.h>
4940
#include <ydb/library/actors/interconnect/interconnect.h>
41+
#include <ydb/library/ydb_issue/issue_helpers.h>
42+
#include <ydb/library/yql/dq/actors/compute/dq_checkpoints.h>
43+
#include <ydb/library/yql/dq/actors/spilling/spilling_file.h>
44+
#include <ydb/library/yql/dq/actors/spilling/spilling.h>
45+
#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
46+
#include <ydb/library/yql/utils/actor_log/log.h>
47+
#include <ydb/public/sdk/cpp/src/library/operation_id/protos/operation_id.pb.h>
48+
49+
#include <yql/essentials/core/services/mounts/yql_mounts.h>
50+
5051
#include <library/cpp/lwtrace/mon/mon_lwtrace.h>
5152
#include <library/cpp/monlib/service/pages/templates.h>
5253
#include <library/cpp/resource/resource.h>
@@ -174,6 +175,7 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
174175
GetScriptExecutionOperation,
175176
ListScriptExecutionOperations,
176177
CancelScriptExecutionOperation,
178+
GetScriptExecutionPhysicalGraph,
177179
};
178180

179181
public:
@@ -447,7 +449,13 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
447449
LogConfig.Swap(event.MutableConfig()->MutableLogConfig());
448450
UpdateYqlLogLevels();
449451

450-
FeatureFlags.Swap(event.MutableConfig()->MutableFeatureFlags());
452+
auto* newFeatureFlags = event.MutableConfig()->MutableFeatureFlags();
453+
if (newFeatureFlags->GetEnableSecureScriptExecutions() != FeatureFlags.GetEnableSecureScriptExecutions()) {
454+
ScriptExecutionsCreationStatus = EScriptExecutionsCreationStatus::NotStarted;
455+
Send(NMetadata::NProvider::MakeServiceId(SelfId().NodeId()), new NMetadata::NProvider::TEvResetManagerRegistration(TStreamingQueryBehaviour::GetInstance()));
456+
}
457+
458+
FeatureFlags.Swap(newFeatureFlags);
451459
WorkloadManagerConfig.Swap(event.MutableConfig()->MutableWorkloadManagerConfig());
452460
ResourcePoolsCache.UpdateConfig(FeatureFlags, WorkloadManagerConfig, ActorContext());
453461

@@ -1247,6 +1255,7 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
12471255
hFunc(NKqp::TEvGetScriptExecutionOperation, Handle);
12481256
hFunc(NKqp::TEvListScriptExecutionOperations, Handle);
12491257
hFunc(NKqp::TEvCancelScriptExecutionOperation, Handle);
1258+
hFunc(NKqp::TEvGetScriptExecutionPhysicalGraph, Handle);
12501259
hFunc(TEvInterconnect::TEvNodeConnected, Handle);
12511260
hFunc(TEvInterconnect::TEvNodeDisconnected, Handle);
12521261
hFunc(TEvKqp::TEvListSessionsRequest, Handle);
@@ -1549,6 +1558,10 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
15491558
case EDelayedRequestType::CancelScriptExecutionOperation:
15501559
HandleDelayedScriptRequestError<TEvCancelScriptExecutionOperationResponse>(std::move(requestEvent), status, std::move(issues));
15511560
break;
1561+
1562+
case EDelayedRequestType::GetScriptExecutionPhysicalGraph:
1563+
HandleDelayedScriptRequestError<TEvGetScriptPhysicalGraphResponse>(std::move(requestEvent), status, std::move(issues));
1564+
break;
15521565
}
15531566
}
15541567

@@ -1557,9 +1570,14 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
15571570
Send(requestEvent->Sender, new TResponse(status, std::move(issues)), 0, requestEvent->Cookie);
15581571
}
15591572

1573+
void StartScriptExecutionsTablesCreation() {
1574+
ScriptExecutionsCreationStatus = EScriptExecutionsCreationStatus::Pending;
1575+
Register(CreateScriptExecutionsTablesCreator(FeatureFlags), TMailboxType::HTSwap, AppData()->SystemPoolId);
1576+
}
1577+
15601578
template<typename TEvent>
15611579
bool CheckScriptExecutionsTablesReady(TEvent& ev, EDelayedRequestType requestType) {
1562-
if (!AppData()->FeatureFlags.GetEnableScriptExecutionOperations()) {
1580+
if (!FeatureFlags.GetEnableScriptExecutionOperations()) {
15631581
NYql::TIssues issues;
15641582
issues.AddIssue("ExecuteScript feature is not enabled");
15651583
HandleDelayedRequestError(requestType, std::move(ev), Ydb::StatusIds::UNSUPPORTED, std::move(issues));
@@ -1574,8 +1592,7 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
15741592

15751593
switch (ScriptExecutionsCreationStatus) {
15761594
case EScriptExecutionsCreationStatus::NotStarted:
1577-
ScriptExecutionsCreationStatus = EScriptExecutionsCreationStatus::Pending;
1578-
Register(CreateScriptExecutionsTablesCreator(FeatureFlags), TMailboxType::HTSwap, AppData()->SystemPoolId);
1595+
StartScriptExecutionsTablesCreation();
15791596
[[fallthrough]];
15801597
case EScriptExecutionsCreationStatus::Pending:
15811598
if (DelayedEventsQueue.size() < 10000) {
@@ -1595,6 +1612,11 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
15951612
}
15961613

15971614
void Handle(TEvScriptExecutionsTablesCreationFinished::TPtr& ev) {
1615+
if (ScriptExecutionsCreationStatus == EScriptExecutionsCreationStatus::NotStarted) {
1616+
StartScriptExecutionsTablesCreation();
1617+
return;
1618+
}
1619+
15981620
ScriptExecutionsCreationStatus = EScriptExecutionsCreationStatus::Finished;
15991621

16001622
NYql::TIssue rootIssue;
@@ -1641,6 +1663,12 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
16411663
}
16421664
}
16431665

1666+
void Handle(TEvGetScriptExecutionPhysicalGraph::TPtr& ev) {
1667+
if (CheckScriptExecutionsTablesReady(ev, EDelayedRequestType::GetScriptExecutionPhysicalGraph)) {
1668+
Register(CreateGetScriptExecutionPhysicalGraphActor(ev->Sender, ev->Get()->Database, ev->Get()->ExecutionId));
1669+
}
1670+
}
1671+
16441672
void Handle(TEvInterconnect::TEvNodeConnected::TPtr& ev) {
16451673
TNodeId nodeId = ev->Get()->NodeId;
16461674
auto sessions = LocalSessions->FindSessions(nodeId);
@@ -1867,7 +1895,7 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
18671895
}
18681896
};
18691897

1870-
} // namespace
1898+
} // anonymous namespace
18711899

18721900
IActor* CreateKqpProxyService(const NKikimrConfig::TLogConfig& logConfig,
18731901
const NKikimrConfig::TTableServiceConfig& tableServiceConfig,

0 commit comments

Comments
 (0)