Skip to content

Commit 346abfc

Browse files
authored
[ES|QL] Modify AggregateMetricDoubleBlockLoader to load partial metrics (#137949)
This commit modifies the existing AggregateMetricDoubleBlockLoader to accept an EnumMap containing the submetrics that should be loaded (any of min, max, sum, value_count). It also moves the BlockLoader into its own file.
1 parent 6851803 commit 346abfc

File tree

7 files changed

+454
-137
lines changed

7 files changed

+454
-137
lines changed

server/src/main/java/org/elasticsearch/index/mapper/BlockLoader.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -512,6 +512,8 @@ interface BlockFactory {
512512

513513
AggregateMetricDoubleBuilder aggregateMetricDoubleBuilder(int count);
514514

515+
Block buildAggregateMetricDoubleDirect(Block minBlock, Block maxBlock, Block sumBlock, Block countBlock);
516+
515517
ExponentialHistogramBuilder exponentialHistogramBlockBuilder(int count);
516518

517519
Block buildExponentialHistogramBlockDirect(

test/framework/src/main/java/org/elasticsearch/index/mapper/TestBlock.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -449,6 +449,21 @@ public BlockLoader.AggregateMetricDoubleBuilder aggregateMetricDoubleBuilder(int
449449
return new AggregateMetricDoubleBlockBuilder(expectedSize);
450450
}
451451

452+
@Override
453+
public BlockLoader.Block buildAggregateMetricDoubleDirect(
454+
BlockLoader.Block minBlock,
455+
BlockLoader.Block maxBlock,
456+
BlockLoader.Block sumBlock,
457+
BlockLoader.Block countBlock
458+
) {
459+
return AggregateMetricDoubleBlockBuilder.parseAggMetricsToBlock(
460+
(TestBlock) minBlock,
461+
(TestBlock) maxBlock,
462+
(TestBlock) sumBlock,
463+
(TestBlock) countBlock
464+
);
465+
}
466+
452467
@Override
453468
public BlockLoader.ExponentialHistogramBuilder exponentialHistogramBlockBuilder(int count) {
454469
return new ExponentialHistogramBlockBuilder(this, count);
@@ -644,6 +659,10 @@ public BlockLoader.Block build() {
644659
var sumBlock = sum.build();
645660
var countBlock = count.build();
646661

662+
return parseAggMetricsToBlock(minBlock, maxBlock, sumBlock, countBlock);
663+
}
664+
665+
public static TestBlock parseAggMetricsToBlock(TestBlock minBlock, TestBlock maxBlock, TestBlock sumBlock, TestBlock countBlock) {
647666
assert minBlock.size() == maxBlock.size();
648667
assert maxBlock.size() == sumBlock.size();
649668
assert sumBlock.size() == countBlock.size();

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockFactory.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -438,6 +438,20 @@ public AggregateMetricDoubleBlockBuilder newAggregateMetricDoubleBlockBuilder(in
438438
return new AggregateMetricDoubleBlockBuilder(estimatedSize, this);
439439
}
440440

441+
public final AggregateMetricDoubleBlock newAggregateMetricDoubleBlock(
442+
Block minBlock,
443+
Block maxBlock,
444+
Block sumBlock,
445+
Block countBlock
446+
) {
447+
return new AggregateMetricDoubleArrayBlock(
448+
(DoubleBlock) minBlock,
449+
(DoubleBlock) maxBlock,
450+
(DoubleBlock) sumBlock,
451+
(IntBlock) countBlock
452+
);
453+
}
454+
441455
public final AggregateMetricDoubleBlock newConstantAggregateMetricDoubleBlock(
442456
AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral value,
443457
int positions
@@ -469,6 +483,15 @@ public final AggregateMetricDoubleBlock newConstantAggregateMetricDoubleBlock(
469483
}
470484
}
471485

486+
public BlockLoader.Block newAggregateMetricDoubleBlockFromDocValues(
487+
DoubleBlock minBlock,
488+
DoubleBlock maxBlock,
489+
DoubleBlock sumBlock,
490+
IntBlock countBlock
491+
) {
492+
return new AggregateMetricDoubleArrayBlock(minBlock, maxBlock, sumBlock, countBlock);
493+
}
494+
472495
public ExponentialHistogramBlockBuilder newExponentialHistogramBlockBuilder(int estimatedSize) {
473496
return new ExponentialHistogramBlockBuilder(estimatedSize, this);
474497
}

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/DelegatingBlockLoaderFactory.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.elasticsearch.compute.data.BytesRefVector;
1818
import org.elasticsearch.compute.data.DoubleBlock;
1919
import org.elasticsearch.compute.data.ElementType;
20+
import org.elasticsearch.compute.data.IntBlock;
2021
import org.elasticsearch.compute.data.IntVector;
2122
import org.elasticsearch.compute.data.LongBlock;
2223
import org.elasticsearch.compute.data.OrdinalBytesRefVector;
@@ -151,6 +152,21 @@ public BlockLoader.AggregateMetricDoubleBuilder aggregateMetricDoubleBuilder(int
151152
return factory.newAggregateMetricDoubleBlockBuilder(count);
152153
}
153154

155+
@Override
156+
public BlockLoader.Block buildAggregateMetricDoubleDirect(
157+
BlockLoader.Block minBlock,
158+
BlockLoader.Block maxBlock,
159+
BlockLoader.Block sumBlock,
160+
BlockLoader.Block countBlock
161+
) {
162+
return factory.newAggregateMetricDoubleBlockFromDocValues(
163+
(DoubleBlock) minBlock,
164+
(DoubleBlock) maxBlock,
165+
(DoubleBlock) sumBlock,
166+
(IntBlock) countBlock
167+
);
168+
}
169+
154170
@Override
155171
public BlockLoader.ExponentialHistogramBuilder exponentialHistogramBlockBuilder(int count) {
156172
return factory.newExponentialHistogramBlockBuilder(count);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.aggregatemetric.mapper;
9+
10+
import org.apache.lucene.index.DocValues;
11+
import org.apache.lucene.index.LeafReader;
12+
import org.apache.lucene.index.LeafReaderContext;
13+
import org.apache.lucene.index.NumericDocValues;
14+
import org.apache.lucene.util.NumericUtils;
15+
import org.elasticsearch.core.Releasables;
16+
import org.elasticsearch.index.mapper.NumberFieldMapper;
17+
import org.elasticsearch.index.mapper.blockloader.docvalues.BlockDocValuesReader;
18+
19+
import java.io.IOException;
20+
import java.util.EnumMap;
21+
22+
public class AggregateMetricDoubleBlockLoader extends BlockDocValuesReader.DocValuesBlockLoader {
23+
final NumberFieldMapper.NumberFieldType minFieldType;
24+
final NumberFieldMapper.NumberFieldType maxFieldType;
25+
final NumberFieldMapper.NumberFieldType sumFieldType;
26+
final NumberFieldMapper.NumberFieldType countFieldType;
27+
28+
AggregateMetricDoubleBlockLoader(EnumMap<AggregateMetricDoubleFieldMapper.Metric, NumberFieldMapper.NumberFieldType> metricsRequested) {
29+
minFieldType = metricsRequested.getOrDefault(AggregateMetricDoubleFieldMapper.Metric.min, null);
30+
maxFieldType = metricsRequested.getOrDefault(AggregateMetricDoubleFieldMapper.Metric.max, null);
31+
sumFieldType = metricsRequested.getOrDefault(AggregateMetricDoubleFieldMapper.Metric.sum, null);
32+
countFieldType = metricsRequested.getOrDefault(AggregateMetricDoubleFieldMapper.Metric.value_count, null);
33+
}
34+
35+
private static NumericDocValues getNumericDocValues(NumberFieldMapper.NumberFieldType field, LeafReader leafReader) throws IOException {
36+
if (field == null) {
37+
return null;
38+
}
39+
String fieldName = field.name();
40+
var values = leafReader.getNumericDocValues(fieldName);
41+
if (values != null) {
42+
return values;
43+
}
44+
45+
var sortedValues = leafReader.getSortedNumericDocValues(fieldName);
46+
return DocValues.unwrapSingleton(sortedValues);
47+
}
48+
49+
@Override
50+
public AllReader reader(LeafReaderContext context) throws IOException {
51+
NumericDocValues minValues = getNumericDocValues(minFieldType, context.reader());
52+
NumericDocValues maxValues = getNumericDocValues(maxFieldType, context.reader());
53+
NumericDocValues sumValues = getNumericDocValues(sumFieldType, context.reader());
54+
NumericDocValues valueCountValues = getNumericDocValues(countFieldType, context.reader());
55+
56+
return new BlockDocValuesReader() {
57+
58+
private int docID = -1;
59+
60+
@Override
61+
protected int docId() {
62+
return docID;
63+
}
64+
65+
@Override
66+
public String toString() {
67+
return "BlockDocValuesReader.AggregateMetricDouble";
68+
}
69+
70+
@Override
71+
public Block read(BlockFactory factory, Docs docs, int offset, boolean nullsFiltered) throws IOException {
72+
boolean success = false;
73+
Block minBlock = null;
74+
Block maxBlock = null;
75+
Block sumBlock = null;
76+
Block countBlock = null;
77+
try {
78+
minBlock = readDoubleSubblock(factory, docs, offset, minValues);
79+
maxBlock = readDoubleSubblock(factory, docs, offset, maxValues);
80+
sumBlock = readDoubleSubblock(factory, docs, offset, sumValues);
81+
countBlock = readIntSubblock(factory, docs, offset, valueCountValues);
82+
Block block = factory.buildAggregateMetricDoubleDirect(minBlock, maxBlock, sumBlock, countBlock);
83+
success = true;
84+
return block;
85+
} finally {
86+
if (success == false) {
87+
Releasables.closeExpectNoException(minBlock, maxBlock, sumBlock, countBlock);
88+
}
89+
}
90+
}
91+
92+
private Block readDoubleSubblock(BlockFactory factory, Docs docs, int offset, NumericDocValues values) throws IOException {
93+
int count = docs.count() - offset;
94+
if (values == null) {
95+
return factory.constantNulls(count);
96+
}
97+
try (DoubleBuilder builder = factory.doubles(count)) {
98+
copyDoubleValuesToBuilder(docs, offset, builder, values);
99+
return builder.build();
100+
}
101+
}
102+
103+
private Block readIntSubblock(BlockFactory factory, Docs docs, int offset, NumericDocValues values) throws IOException {
104+
int count = docs.count() - offset;
105+
if (values == null) {
106+
return factory.constantNulls(count);
107+
}
108+
try (IntBuilder builder = factory.ints(count)) {
109+
copyIntValuesToBuilder(docs, offset, builder, values);
110+
return builder.build();
111+
}
112+
}
113+
114+
private void copyDoubleValuesToBuilder(Docs docs, int offset, DoubleBuilder builder, NumericDocValues values)
115+
throws IOException {
116+
int lastDoc = -1;
117+
for (int i = offset; i < docs.count(); i++) {
118+
int doc = docs.get(i);
119+
if (doc < lastDoc) {
120+
throw new IllegalStateException("docs within same block must be in order");
121+
}
122+
if (values == null || values.advanceExact(doc) == false) {
123+
builder.appendNull();
124+
} else {
125+
double value = NumericUtils.sortableLongToDouble(values.longValue());
126+
lastDoc = doc;
127+
this.docID = doc;
128+
builder.appendDouble(value);
129+
}
130+
}
131+
}
132+
133+
private void copyIntValuesToBuilder(Docs docs, int offset, IntBuilder builder, NumericDocValues values) throws IOException {
134+
int lastDoc = -1;
135+
for (int i = offset; i < docs.count(); i++) {
136+
int doc = docs.get(i);
137+
if (doc < lastDoc) {
138+
throw new IllegalStateException("docs within same block must be in order");
139+
}
140+
if (values == null || values.advanceExact(doc) == false) {
141+
builder.appendNull();
142+
} else {
143+
int value = Math.toIntExact(values.longValue());
144+
lastDoc = doc;
145+
this.docID = doc;
146+
builder.appendInt(value);
147+
}
148+
}
149+
}
150+
151+
@Override
152+
public void read(int docId, StoredFields storedFields, Builder builder) throws IOException {
153+
var blockBuilder = (AggregateMetricDoubleBuilder) builder;
154+
this.docID = docId;
155+
readSingleRow(docId, blockBuilder);
156+
}
157+
158+
private void readSingleRow(int docId, AggregateMetricDoubleBuilder builder) throws IOException {
159+
if (minValues != null && minValues.advanceExact(docId)) {
160+
builder.min().appendDouble(NumericUtils.sortableLongToDouble(minValues.longValue()));
161+
} else {
162+
builder.min().appendNull();
163+
}
164+
if (maxValues != null && maxValues.advanceExact(docId)) {
165+
builder.max().appendDouble(NumericUtils.sortableLongToDouble(maxValues.longValue()));
166+
} else {
167+
builder.max().appendNull();
168+
}
169+
if (sumValues != null && sumValues.advanceExact(docId)) {
170+
builder.sum().appendDouble(NumericUtils.sortableLongToDouble(sumValues.longValue()));
171+
} else {
172+
builder.sum().appendNull();
173+
}
174+
if (valueCountValues != null && valueCountValues.advanceExact(docId)) {
175+
builder.count().appendInt(Math.toIntExact(valueCountValues.longValue()));
176+
} else {
177+
builder.count().appendNull();
178+
}
179+
}
180+
};
181+
}
182+
183+
@Override
184+
public Builder builder(BlockFactory factory, int expectedCount) {
185+
return factory.aggregateMetricDoubleBuilder(expectedCount);
186+
}
187+
}

0 commit comments

Comments
 (0)