Skip to content

Commit 1cbc491

Browse files
kardymondsCopilot
andauthored
YQ-4893 Fix precompute #29150 / to stable (#29230)
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent e21ecb0 commit 1cbc491

File tree

2 files changed

+90
-12
lines changed

2 files changed

+90
-12
lines changed

ydb/core/fq/libs/actors/run_actor.cpp

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,7 @@ struct TEvaluationGraphInfo {
323323
NActors::TActorId ResultId;
324324
NThreading::TPromise<NYql::IDqGateway::TResult> Result;
325325
ui64 Index = 0;
326+
TMaybe<NCommon::TResultFormatSettings> ResultFormatSettings;
326327
};
327328

328329
class TRunActor : public NActors::TActorBootstrapped<TRunActor> {
@@ -1279,7 +1280,7 @@ class TRunActor : public NActors::TActorBootstrapped<TRunActor> {
12791280

12801281
LOG_D("Query evaluation " << NYql::NDqProto::StatusIds_StatusCode_Name(QueryEvalStatusCode)
12811282
<< ". " << it->second.Index << " response. Issues count: " << result.IssuesSize()
1282-
<< ". Rows count: " << result.GetRowsCount());
1283+
<< ". Rows count: " << result.GetRowsCount() << ", Sample count: " << result.SampleSize() << ", Truncated: " << result.GetTruncated());
12831284

12841285
TVector<NDq::TDqSerializedBatch> rows;
12851286
for (const auto& s : result.GetSample()) {
@@ -1288,11 +1289,12 @@ class TRunActor : public NActors::TActorBootstrapped<TRunActor> {
12881289
rows.emplace_back(std::move(batch));
12891290
}
12901291

1291-
TProtoBuilder protoBuilder(ResultFormatSettings->ResultType, ResultFormatSettings->Columns);
1292+
const auto& resultFormatSettings = it->second.ResultFormatSettings;
1293+
TProtoBuilder protoBuilder(resultFormatSettings->ResultType, resultFormatSettings->Columns);
12921294

12931295
bool ysonTruncated = false;
1294-
queryResult.Data = protoBuilder.BuildYson(std::move(rows), ResultFormatSettings->SizeLimit.GetOrElse(Max<ui64>()),
1295-
ResultFormatSettings->RowsLimit.GetOrElse(Max<ui64>()), &ysonTruncated);
1296+
queryResult.Data = protoBuilder.BuildYson(std::move(rows), resultFormatSettings->SizeLimit.GetOrElse(Max<ui64>()),
1297+
resultFormatSettings->RowsLimit.GetOrElse(Max<ui64>()), &ysonTruncated);
12961298

12971299
queryResult.RowsCount = result.GetRowsCount();
12981300
queryResult.Truncated = result.GetTruncated() || ysonTruncated;
@@ -1546,7 +1548,7 @@ class TRunActor : public NActors::TActorBootstrapped<TRunActor> {
15461548
*request.MutableSettings() = dqGraphParams.GetSettings();
15471549
*request.MutableSecureParams() = dqGraphParams.GetSecureParams();
15481550
*request.MutableColumns() = dqGraphParams.GetColumns();
1549-
PrepareResultFormatSettings(dqGraphParams, *dqConfiguration);
1551+
PrepareResultFormatSettings(info.ResultFormatSettings, dqGraphParams, *dqConfiguration);
15501552
NTasksPacker::UnPack(*request.MutableTask(), dqGraphParams.GetTasks(), dqGraphParams.GetStageProgram());
15511553
Send(info.ExecuterId, new NYql::NDqs::TEvGraphRequest(request, info.ControlId, info.ResultId));
15521554
LOG_D("Evaluation Executer: " << info.ExecuterId << ", Controller: " << info.ControlId << ", ResultActor: " << info.ResultId);
@@ -1585,7 +1587,7 @@ class TRunActor : public NActors::TActorBootstrapped<TRunActor> {
15851587
ExecuterId, dqGraphParams.GetResultType(),
15861588
writerResultId, columns, dqGraphParams.GetSession(), Params.Deadline, Params.ResultBytesLimit));
15871589

1588-
PrepareResultFormatSettings(dqGraphParams, *dqConfiguration);
1590+
PrepareResultFormatSettings(ResultFormatSettings, dqGraphParams, *dqConfiguration);
15891591
} else {
15901592
LOG_D("ResultWriter was NOT CREATED since ResultType is empty");
15911593
resultId = ExecuterId;
@@ -1652,15 +1654,15 @@ class TRunActor : public NActors::TActorBootstrapped<TRunActor> {
16521654
LOG_D("Executer: " << ExecuterId << ", Controller: " << ControlId << ", ResultIdActor: " << resultId);
16531655
}
16541656

1655-
void PrepareResultFormatSettings(NFq::NProto::TGraphParams& dqGraphParams, const TDqConfiguration& dqConfiguration) {
1656-
ResultFormatSettings.ConstructInPlace();
1657+
void PrepareResultFormatSettings(TMaybe<NCommon::TResultFormatSettings>& resultFormatSettings, NFq::NProto::TGraphParams& dqGraphParams, const TDqConfiguration& dqConfiguration) {
1658+
resultFormatSettings.ConstructInPlace();
16571659
for (const auto& c : dqGraphParams.GetColumns()) {
1658-
ResultFormatSettings->Columns.push_back(c);
1660+
resultFormatSettings->Columns.push_back(c);
16591661
}
16601662

1661-
ResultFormatSettings->ResultType = dqGraphParams.GetResultType();
1662-
ResultFormatSettings->SizeLimit = dqConfiguration._AllResultsBytesLimit.Get();
1663-
ResultFormatSettings->RowsLimit = dqConfiguration._RowsLimitPerWrite.Get();
1663+
resultFormatSettings->ResultType = dqGraphParams.GetResultType();
1664+
resultFormatSettings->SizeLimit = dqConfiguration._AllResultsBytesLimit.Get();
1665+
resultFormatSettings->RowsLimit = dqConfiguration._RowsLimitPerWrite.Get();
16641666
}
16651667

16661668
void ClearResultFormatSettings() {

ydb/tests/fq/s3/test_s3_0.py

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1167,3 +1167,79 @@ def test_double_optional_types_validation(self, kikimr, s3, client, unique_prefi
11671167
issues = str(client.describe_query(query_id).result.query.issue)
11681168

11691169
assert "Double optional types are not supported" in issues, "Incorrect issues: " + issues
1170+
1171+
@yq_v1
1172+
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
1173+
def test_precompute_with_different_result_types(self, kikimr, s3, client, unique_prefix):
1174+
resource = boto3.resource(
1175+
"s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
1176+
)
1177+
1178+
bucket = resource.Bucket("fbucket")
1179+
bucket.create(ACL='public-read')
1180+
bucket.objects.all().delete()
1181+
1182+
s3_client = boto3.client(
1183+
"s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
1184+
)
1185+
1186+
fruits = '''f1,f2,f3
1187+
Banana,3,100'''
1188+
s3_client.put_object(Body=fruits, Bucket='fbucket', Key='file1.csv', ContentType='text/plain')
1189+
fruits = '''f3,f4
1190+
Banana,3'''
1191+
s3_client.put_object(Body=fruits, Bucket='fbucket', Key='file2.csv', ContentType='text/plain')
1192+
1193+
kikimr.control_plane.wait_bootstrap(1)
1194+
storage_connection_name = unique_prefix + "fruitbucket"
1195+
client.create_storage_connection(storage_connection_name, "fbucket")
1196+
1197+
sql = f'''
1198+
$input1 =
1199+
SELECT AGGREGATE_LIST( AsStruct(f1 AS dns_mining_pool))
1200+
FROM `{storage_connection_name}`.`file1.csv`
1201+
WITH (format=csv_with_names, SCHEMA (
1202+
f1 String NOT NULL,
1203+
f2 Int NOT NULL,
1204+
f3 Int NOT NULL
1205+
));
1206+
1207+
$input2 =
1208+
SELECT AGGREGATE_LIST( AsStruct(f3 AS dns_f_query, f4 AS dns_query1wewqwer) )
1209+
FROM `{storage_connection_name}`.`file2.csv`
1210+
WITH (format=csv_with_names, SCHEMA (
1211+
f3 String NOT NULL,
1212+
f4 Int NOT NULL
1213+
));
1214+
1215+
$f1 = () -> {{
1216+
RETURN ListHead(ListMap(
1217+
$input1,
1218+
($r) -> (
1219+
AsStruct("1" AS dns_mining_pool)
1220+
)
1221+
))
1222+
}};
1223+
1224+
$f2 = () -> {{
1225+
RETURN ListHead(ListMap(
1226+
$input2,
1227+
($r) -> (
1228+
AsStruct("2" AS dns_f_query)
1229+
)
1230+
))
1231+
}};
1232+
1233+
$parsed = SELECT $f1() AS f1, $f2() AS f2;
1234+
1235+
$parsed =
1236+
SELECT
1237+
p.f1.dns_mining_pool AS f1, p.f2.dns_f_query AS f2
1238+
FROM $parsed AS p;
1239+
1240+
SELECT SOME("MinersPoolsViaDNS") AS event_class FROM $parsed
1241+
WHERE (f1 == f2 )
1242+
'''
1243+
1244+
query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
1245+
client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)

0 commit comments

Comments
 (0)