Skip to content

Commit ad5d846

Browse files
committed
YQ-4911 disabled transparent sys columns in PQ source (#29332)
1 parent 660a82e commit ad5d846

File tree

10 files changed

+229
-115
lines changed

10 files changed

+229
-115
lines changed

ydb/core/kqp/host/kqp_host.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1885,6 +1885,7 @@ class TKqpHost : public IKqpHost {
18851885
TString sessionId = CreateGuidAsString();
18861886
auto state = MakeIntrusive<TPqState>(sessionId);
18871887
state->SupportRtmrMode = false;
1888+
state->AllowTransparentSystemColumns = false;
18881889
state->Types = TypesCtx.Get();
18891890
state->DbResolver = FederatedQuerySetup->DatabaseAsyncResolver;
18901891
state->FunctionRegistry = FuncRegistry;

ydb/core/kqp/ut/federated_query/datastreams/datastreams_ut.cpp

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3365,6 +3365,62 @@ Y_UNIT_TEST_SUITE(KqpStreamingQueriesDdl) {
33653365
CheckScriptExecutionsCount(0, 0);
33663366
}
33673367
}
3368+
3369+
Y_UNIT_TEST_F(WritingInLocalYdbTablesWithProjection, TStreamingTestFixture) {
3370+
constexpr char pqSourceName[] = "pqSource";
3371+
CreatePqSource(pqSourceName);
3372+
3373+
for (const bool rowTables : {true, false}) {
3374+
const auto inputTopicName = TStringBuilder() << "writingInLocalYdbWithLimitInputTopicName" << rowTables;
3375+
CreateTopic(inputTopicName);
3376+
3377+
const auto ydbTable = TStringBuilder() << "tableSink" << rowTables;
3378+
ExecQuery(fmt::format(R"(
3379+
CREATE TABLE `{table}` (
3380+
Key String NOT NULL,
3381+
Value String NOT NULL,
3382+
PRIMARY KEY (Key)
3383+
) {settings})",
3384+
"table"_a = ydbTable,
3385+
"settings"_a = rowTables ? "" : "WITH (STORE = COLUMN)"
3386+
));
3387+
3388+
const auto queryName = TStringBuilder() << "streamingQuery" << rowTables;
3389+
ExecQuery(fmt::format(R"(
3390+
CREATE STREAMING QUERY `{query_name}` AS
3391+
DO BEGIN
3392+
UPSERT INTO `{ydb_table}`
3393+
SELECT (Key || "x") AS Key, Value FROM `{pq_source}`.`{input_topic}` WITH (
3394+
FORMAT = json_each_row,
3395+
SCHEMA (
3396+
Key String NOT NULL,
3397+
Value String NOT NULL
3398+
)
3399+
) LIMIT 1
3400+
END DO;)",
3401+
"query_name"_a = queryName,
3402+
"pq_source"_a = pqSourceName,
3403+
"input_topic"_a = inputTopicName,
3404+
"ydb_table"_a = ydbTable
3405+
));
3406+
3407+
CheckScriptExecutionsCount(1, 1);
3408+
Sleep(TDuration::Seconds(1));
3409+
3410+
WriteTopicMessage(inputTopicName, R"({"Key": "message1", "Value": "value1"})");
3411+
Sleep(TDuration::Seconds(1));
3412+
CheckTable(*this, ydbTable, {{"message1x", "value1"}});
3413+
3414+
Sleep(TDuration::Seconds(1));
3415+
CheckScriptExecutionsCount(1, 0);
3416+
3417+
ExecQuery(fmt::format(
3418+
"DROP STREAMING QUERY `{query_name}`",
3419+
"query_name"_a = queryName
3420+
));
3421+
CheckScriptExecutionsCount(0, 0);
3422+
}
3423+
}
33683424
}
33693425

33703426
Y_UNIT_TEST_SUITE(KqpStreamingQueriesSysView) {

ydb/library/yql/providers/pq/async_io/dq_pq_meta_extractor.cpp

Lines changed: 68 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -3,79 +3,84 @@
33
#include <yql/essentials/minikql/mkql_string_util.h>
44
#include <ydb/library/yql/providers/pq/common/pq_meta_fields.h>
55

6+
namespace NYql::NDq {
7+
68
namespace {
7-
const std::unordered_map<TString, NYql::NDq::TPqMetaExtractor::TPqMetaExtractorLambda> ExtractorsMap = {
8-
{
9-
"_yql_sys_create_time", [](const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage& message){
10-
using TDataType = NYql::NUdf::TDataType<NYql::NUdf::TTimestamp>;
11-
return std::make_pair(
12-
NYql::NUdf::TUnboxedValuePod(static_cast<TDataType::TLayout>(message.GetCreateTime().MicroSeconds())),
13-
NYql::NUdf::GetDataTypeInfo(TDataType::Slot).FixedSize
14-
);
15-
}
16-
},
17-
{
18-
"_yql_sys_tsp_write_time", [](const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage& message){
19-
using TDataType = NYql::NUdf::TDataType<NYql::NUdf::TTimestamp>;
20-
return std::make_pair(
21-
NYql::NUdf::TUnboxedValuePod(static_cast<TDataType::TLayout>(message.GetWriteTime().MicroSeconds())),
22-
NYql::NUdf::GetDataTypeInfo(TDataType::Slot).FixedSize
23-
);
24-
}
25-
},
26-
{
27-
"_yql_sys_partition_id", [](const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage& message){
28-
using TDataType = NYql::NUdf::TDataType<ui64>;
29-
return std::make_pair(
30-
NYql::NUdf::TUnboxedValuePod(static_cast<TDataType::TLayout>(message.GetPartitionSession()->GetPartitionId())),
31-
NYql::NUdf::GetDataTypeInfo(TDataType::Slot).FixedSize
32-
);
33-
}
34-
},
35-
{
36-
"_yql_sys_offset", [](const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage& message){
37-
using TDataType = NYql::NUdf::TDataType<ui64>;
38-
return std::make_pair(
39-
NYql::NUdf::TUnboxedValuePod(static_cast<TDataType::TLayout>(message.GetOffset())),
40-
NYql::NUdf::GetDataTypeInfo(TDataType::Slot).FixedSize);
41-
}
42-
},
43-
{
44-
"_yql_sys_message_group_id", [](const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage& message){
45-
const auto& data = message.GetMessageGroupId();
46-
return std::make_pair(
47-
NKikimr::NMiniKQL::MakeString(NYql::NUdf::TStringRef(data.data(), data.size())),
48-
data.size()
49-
);
50-
}
51-
},
52-
{
53-
"_yql_sys_seq_no", [](const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage& message){
54-
using TDataType = NYql::NUdf::TDataType<ui64>;
55-
return std::make_pair(
56-
NYql::NUdf::TUnboxedValuePod(static_cast<TDataType::TLayout>(message.GetSeqNo())),
57-
NYql::NUdf::GetDataTypeInfo(TDataType::Slot).FixedSize
58-
);
59-
}
60-
},
61-
};
62-
}
639

64-
namespace NYql::NDq {
10+
const std::unordered_map<TString, TPqMetaExtractor::TPqMetaExtractorLambda> ExtractorsMap = {
11+
{
12+
"create_time", [](const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage& message) {
13+
using TDataType = NUdf::TDataType<NUdf::TTimestamp>;
14+
return std::make_pair(
15+
NUdf::TUnboxedValuePod(static_cast<TDataType::TLayout>(message.GetCreateTime().MicroSeconds())),
16+
NUdf::GetDataTypeInfo(TDataType::Slot).FixedSize
17+
);
18+
}
19+
},
20+
{
21+
"write_time", [](const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage& message) {
22+
using TDataType = NUdf::TDataType<NUdf::TTimestamp>;
23+
return std::make_pair(
24+
NUdf::TUnboxedValuePod(static_cast<TDataType::TLayout>(message.GetWriteTime().MicroSeconds())),
25+
NUdf::GetDataTypeInfo(TDataType::Slot).FixedSize
26+
);
27+
}
28+
},
29+
{
30+
"partition_id", [](const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage& message) {
31+
using TDataType = NUdf::TDataType<ui64>;
32+
return std::make_pair(
33+
NUdf::TUnboxedValuePod(static_cast<TDataType::TLayout>(message.GetPartitionSession()->GetPartitionId())),
34+
NUdf::GetDataTypeInfo(TDataType::Slot).FixedSize
35+
);
36+
}
37+
},
38+
{
39+
"offset", [](const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage& message) {
40+
using TDataType = NUdf::TDataType<ui64>;
41+
return std::make_pair(
42+
NUdf::TUnboxedValuePod(static_cast<TDataType::TLayout>(message.GetOffset())),
43+
NUdf::GetDataTypeInfo(TDataType::Slot).FixedSize);
44+
}
45+
},
46+
{
47+
"message_group_id", [](const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage& message) {
48+
const auto& data = message.GetMessageGroupId();
49+
return std::make_pair(
50+
NKikimr::NMiniKQL::MakeString(NUdf::TStringRef(data.data(), data.size())),
51+
data.size()
52+
);
53+
}
54+
},
55+
{
56+
"seq_no", [](const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage& message) {
57+
using TDataType = NUdf::TDataType<ui64>;
58+
return std::make_pair(
59+
NUdf::TUnboxedValuePod(static_cast<TDataType::TLayout>(message.GetSeqNo())),
60+
NUdf::GetDataTypeInfo(TDataType::Slot).FixedSize
61+
);
62+
}
63+
},
64+
};
65+
66+
} // anonymous namespace
6567

6668
TPqMetaExtractor::TPqMetaExtractor() {
67-
for (const auto& key : AllowedPqMetaSysColumns()) {
68-
Y_ENSURE(
69-
ExtractorsMap.contains(key),
70-
"Pq metadata field " << key << " hasn't valid runtime extractor. You should add it.");
69+
for (const auto& sysColumn : AllowedPqMetaSysColumns(true)) {
70+
const auto key = SkipPqSystemPrefix(sysColumn);
71+
Y_ENSURE(key, sysColumn);
72+
Y_ENSURE(ExtractorsMap.contains(*key), "Pq metadata field " << *key << " hasn't valid runtime extractor. You should add it.");
7173
}
7274
}
7375

7476
TPqMetaExtractor::TPqMetaExtractorLambda TPqMetaExtractor::FindExtractorLambda(const TString& sysColumn) const {
75-
auto iter = ExtractorsMap.find(sysColumn);
77+
const auto key = SkipPqSystemPrefix(sysColumn);
78+
Y_ENSURE(key, sysColumn);
79+
80+
const auto iter = ExtractorsMap.find(*key);
7681
Y_ENSURE(iter != ExtractorsMap.end(), sysColumn);
7782

7883
return iter->second;
7984
}
8085

81-
}
86+
} // namespace NYql::NDq

ydb/library/yql/providers/pq/common/pq_meta_fields.cpp

Lines changed: 83 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -3,52 +3,105 @@
33

44
#include <unordered_map>
55

6+
namespace NYql {
7+
68
namespace {
7-
const std::vector<NYql::TMetaFieldDescriptor> PqMetaFields = {
8-
NYql::TMetaFieldDescriptor("create_time", "_yql_sys_create_time", NYql::NUdf::EDataSlot::Timestamp),
9-
NYql::TMetaFieldDescriptor("write_time", "_yql_sys_tsp_write_time", NYql::NUdf::EDataSlot::Timestamp),
10-
NYql::TMetaFieldDescriptor("partition_id", "_yql_sys_partition_id", NYql::NUdf::EDataSlot::Uint64),
11-
NYql::TMetaFieldDescriptor("offset", "_yql_sys_offset", NYql::NUdf::EDataSlot::Uint64),
12-
NYql::TMetaFieldDescriptor("message_group_id", "_yql_sys_message_group_id", NYql::NUdf::EDataSlot::String),
13-
NYql::TMetaFieldDescriptor("seq_no", "_yql_sys_seq_no", NYql::NUdf::EDataSlot::Uint64),
14-
};
15-
}
169

17-
namespace NYql {
10+
class TPqMetadataField {
11+
public:
12+
static constexpr char SYS_PREFIX[] = "_yql_sys_";
13+
static constexpr char TRANSPARENT_PREFIX[] = "tsp_";
14+
15+
explicit TPqMetadataField(NUdf::EDataSlot type, bool transparent = false)
16+
: Type(type)
17+
, Transparent(transparent)
18+
{}
19+
20+
TString GetSysColumn(const TString& key, bool allowTransparentColumns) const {
21+
auto systemPrefix = TStringBuilder() << SYS_PREFIX;
22+
if (Transparent && allowTransparentColumns) {
23+
systemPrefix << TRANSPARENT_PREFIX;
24+
}
25+
26+
return systemPrefix << key;
27+
}
28+
29+
TMetaFieldDescriptor GetDescriptor(const TString& key, bool allowTransparentColumns) const {
30+
return {
31+
.Key = key,
32+
.SysColumn = GetSysColumn(key, allowTransparentColumns),
33+
.Type = Type,
34+
};
35+
}
36+
37+
public:
38+
const NUdf::EDataSlot Type;
39+
const bool Transparent;
40+
};
41+
42+
const std::unordered_map<TString, TPqMetadataField> PqMetaFields = {
43+
{"create_time", TPqMetadataField(NUdf::EDataSlot::Timestamp)},
44+
{"write_time", TPqMetadataField(NUdf::EDataSlot::Timestamp, /* transparent */ true)},
45+
{"partition_id", TPqMetadataField(NUdf::EDataSlot::Uint64)},
46+
{"offset", TPqMetadataField(NUdf::EDataSlot::Uint64)},
47+
{"message_group_id", TPqMetadataField(NUdf::EDataSlot::String)},
48+
{"seq_no", TPqMetadataField(NUdf::EDataSlot::Uint64)},
49+
};
1850

19-
const TMetaFieldDescriptor* FindPqMetaFieldDescriptorByKey(const TString& key) {
20-
const auto iter = std::find_if(
21-
PqMetaFields.begin(),
22-
PqMetaFields.end(),
23-
[&](const NYql::TMetaFieldDescriptor& item){ return item.Key == key; });
24-
if (iter != PqMetaFields.end()) {
25-
return iter;
51+
} // anonymous namespace
52+
53+
std::optional<TString> SkipPqSystemPrefix(const TString& sysColumn, bool* isTransparent) {
54+
TStringBuf keyBuf(sysColumn);
55+
if (!keyBuf.SkipPrefix(TPqMetadataField::SYS_PREFIX)) {
56+
return std::nullopt;
57+
}
58+
59+
const bool transparent = keyBuf.SkipPrefix(TPqMetadataField::TRANSPARENT_PREFIX);
60+
if (isTransparent) {
61+
*isTransparent = transparent;
62+
}
63+
64+
return TString(keyBuf);
65+
}
66+
67+
std::optional<TMetaFieldDescriptor> FindPqMetaFieldDescriptorByKey(const TString& key, bool allowTransparentColumns) {
68+
const auto it = PqMetaFields.find(key);
69+
if (it == PqMetaFields.end()) {
70+
return std::nullopt;
2671
}
2772

28-
return nullptr;
73+
return it->second.GetDescriptor(key, allowTransparentColumns);
2974
}
3075

31-
const TMetaFieldDescriptor* FindPqMetaFieldDescriptorBySysColumn(const TString& sysColumn) {
32-
const auto iter = std::find_if(
33-
PqMetaFields.begin(),
34-
PqMetaFields.end(),
35-
[&](const NYql::TMetaFieldDescriptor& item){ return item.SysColumn == sysColumn; });
36-
if (iter != PqMetaFields.end()) {
37-
return iter;
76+
std::optional<TMetaFieldDescriptor> FindPqMetaFieldDescriptorBySysColumn(const TString& sysColumn) {
77+
bool transparent = false;
78+
const auto key = SkipPqSystemPrefix(sysColumn, &transparent);
79+
if (!key) {
80+
return std::nullopt;
81+
}
82+
83+
const auto it = PqMetaFields.find(*key);
84+
if (it == PqMetaFields.end()) {
85+
return std::nullopt;
3886
}
3987

40-
return nullptr;
88+
const auto& metadata = it->second;
89+
if (transparent && !metadata.Transparent) {
90+
return std::nullopt;
91+
}
92+
93+
return metadata.GetDescriptor(*key, transparent);
4194
}
4295

43-
std::vector<TString> AllowedPqMetaSysColumns() {
96+
std::vector<TString> AllowedPqMetaSysColumns(bool allowTransparentColumns) {
4497
std::vector<TString> res;
4598
res.reserve(PqMetaFields.size());
4699

47-
for (const auto& descriptor : PqMetaFields) {
48-
res.emplace_back(descriptor.SysColumn);
100+
for (const auto& [key, field] : PqMetaFields) {
101+
res.emplace_back(field.GetSysColumn(key, allowTransparentColumns));
49102
}
50103

51104
return res;
52105
}
53106

54-
}
107+
} // namespace NYql

ydb/library/yql/providers/pq/common/pq_meta_fields.h

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,23 +12,17 @@
1212
namespace NYql {
1313

1414
struct TMetaFieldDescriptor {
15-
public:
16-
TMetaFieldDescriptor(TString key, TString sysColumn, NUdf::EDataSlot type)
17-
: Key(key)
18-
, SysColumn(sysColumn)
19-
, Type(type)
20-
{ }
21-
22-
public:
2315
const TString Key;
2416
const TString SysColumn;
2517
const NUdf::EDataSlot Type;
2618
};
2719

28-
const TMetaFieldDescriptor* FindPqMetaFieldDescriptorByKey(const TString& key);
20+
std::optional<TString> SkipPqSystemPrefix(const TString& sysColumn, bool* isTransparent = nullptr);
2921

30-
const TMetaFieldDescriptor* FindPqMetaFieldDescriptorBySysColumn(const TString& sysColumn);
22+
std::optional<TMetaFieldDescriptor> FindPqMetaFieldDescriptorByKey(const TString& key, bool allowTransparentColumns);
3123

32-
std::vector<TString> AllowedPqMetaSysColumns();
24+
std::optional<TMetaFieldDescriptor> FindPqMetaFieldDescriptorBySysColumn(const TString& sysColumn);
3325

34-
}
26+
std::vector<TString> AllowedPqMetaSysColumns(bool allowTransparentColumns);
27+
28+
} // namespace NYql

0 commit comments

Comments
 (0)