Skip to content

Commit 44a1f42

Browse files
authored
Add telemetry for downsampling method (#138187)
1 parent c8cc2e0 commit 44a1f42

File tree

6 files changed

+152
-24
lines changed

6 files changed

+152
-24
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
9223000
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
jina_ai_configurable_late_chunking,9222000
1+
add_downsample_method_telemetry,9223000

x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/xpack/core/action/TimeSeriesUsageTransportActionIT.java

Lines changed: 76 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
package org.elasticsearch.xpack.core.action;
99

10+
import org.elasticsearch.action.downsample.DownsampleConfig;
1011
import org.elasticsearch.action.support.PlainActionFuture;
1112
import org.elasticsearch.cluster.ClusterState;
1213
import org.elasticsearch.cluster.ClusterStateUpdateTask;
@@ -121,6 +122,9 @@ public void testAction() throws Exception {
121122
var dlmRoundsSum = new AtomicInteger(0);
122123
var dlmRoundsMin = new AtomicInteger(Integer.MAX_VALUE);
123124
var dlmRoundsMax = new AtomicInteger(Integer.MIN_VALUE);
125+
var dlmAggregateSamplingMethodCount = new AtomicInteger(0);
126+
var dlmLastValueSamplingMethodCount = new AtomicInteger(0);
127+
var dlmUndefinedSamplingMethodCount = new AtomicInteger(0);
124128

125129
// ... with ILM
126130
var ilmDownsampledDataStreamCount = new AtomicInteger(0);
@@ -129,8 +133,11 @@ public void testAction() throws Exception {
129133
var ilmRoundsSum = new AtomicInteger(0);
130134
var ilmRoundsMin = new AtomicInteger(Integer.MAX_VALUE);
131135
var ilmRoundsMax = new AtomicInteger(Integer.MIN_VALUE);
136+
var ilmAggregateSamplingMethodCount = new AtomicInteger(0);
137+
var ilmLastValueSamplingMethodCount = new AtomicInteger(0);
138+
var ilmUndefinedSamplingMethodCount = new AtomicInteger(0);
132139
Set<String> usedPolicies = new HashSet<>();
133-
var forceMergeEnabled = new AtomicReference<IlmForceMergeInPolicies>();
140+
var ilmPolicySpecs = new AtomicReference<IlmPolicySpecs>();
134141

135142
/*
136143
* We now add a number of simulated data streams to the cluster state. We mix different combinations of:
@@ -140,7 +147,7 @@ public void testAction() throws Exception {
140147
*/
141148
updateClusterState(clusterState -> {
142149
Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata());
143-
forceMergeEnabled.set(addIlmPolicies(metadataBuilder));
150+
ilmPolicySpecs.set(addIlmPolicies(metadataBuilder));
144151

145152
Map<String, DataStream> dataStreamMap = new HashMap<>();
146153
for (int dataStreamCount = 0; dataStreamCount < randomIntBetween(10, 100); dataStreamCount++) {
@@ -164,6 +171,14 @@ public void testAction() throws Exception {
164171
if (downsamplingConfiguredBy == DownsampledBy.DLM) {
165172
dlmDownsampledDataStreamCount.incrementAndGet();
166173
updateRounds(lifecycle.downsamplingRounds().size(), dlmRoundsCount, dlmRoundsSum, dlmRoundsMin, dlmRoundsMax);
174+
DownsampleConfig.SamplingMethod samplingMethod = lifecycle.downsamplingMethod();
175+
if (samplingMethod == null) {
176+
dlmUndefinedSamplingMethodCount.incrementAndGet();
177+
} else if (samplingMethod == DownsampleConfig.SamplingMethod.AGGREGATE) {
178+
dlmAggregateSamplingMethodCount.incrementAndGet();
179+
} else if (samplingMethod == DownsampleConfig.SamplingMethod.LAST_VALUE) {
180+
dlmLastValueSamplingMethodCount.incrementAndGet();
181+
}
167182
} else if (downsamplingConfiguredBy == DownsampledBy.ILM) {
168183
ilmDownsampledDataStreamCount.incrementAndGet();
169184
}
@@ -230,6 +245,17 @@ public void testAction() throws Exception {
230245
ilmRoundsMin,
231246
ilmRoundsMax
232247
);
248+
249+
DownsampleConfig.SamplingMethod samplingMethod = ilmPolicySpecs.get()
250+
.samplingMethodByPolicy()
251+
.get(policy);
252+
if (samplingMethod == null) {
253+
ilmUndefinedSamplingMethodCount.incrementAndGet();
254+
} else if (samplingMethod == DownsampleConfig.SamplingMethod.AGGREGATE) {
255+
ilmAggregateSamplingMethodCount.incrementAndGet();
256+
} else if (samplingMethod == DownsampleConfig.SamplingMethod.LAST_VALUE) {
257+
ilmLastValueSamplingMethodCount.incrementAndGet();
258+
}
233259
}
234260
} else if (downsamplingConfiguredBy == DownsampledBy.DLM) {
235261
dlmDownsampledIndexCount.incrementAndGet();
@@ -299,7 +325,10 @@ public void testAction() throws Exception {
299325
dlmRoundsCount.get(),
300326
dlmRoundsSum.get(),
301327
dlmRoundsMin.get(),
302-
dlmRoundsMax.get()
328+
dlmRoundsMax.get(),
329+
dlmAggregateSamplingMethodCount.get(),
330+
dlmLastValueSamplingMethodCount.get(),
331+
dlmUndefinedSamplingMethodCount.get()
303332
);
304333

305334
// ILM
@@ -312,23 +341,26 @@ public void testAction() throws Exception {
312341
ilmRoundsCount.get(),
313342
ilmRoundsSum.get(),
314343
ilmRoundsMin.get(),
315-
ilmRoundsMax.get()
344+
ilmRoundsMax.get(),
345+
ilmAggregateSamplingMethodCount.get(),
346+
ilmLastValueSamplingMethodCount.get(),
347+
ilmUndefinedSamplingMethodCount.get()
316348
);
317349
var explicitlyEnabled = new AtomicInteger(0);
318350
var explicitlyDisabled = new AtomicInteger(0);
319351
var undefined = new AtomicInteger(0);
320352
Map<String, Object> phasesStats = (Map<String, Object>) ilmStats.get("phases_in_use");
321353
if (usedPolicies.contains(DOWNSAMPLING_IN_HOT_POLICY)) {
322354
assertThat(phasesStats.get("hot"), equalTo(1));
323-
updateForceMergeCounters(forceMergeEnabled.get().enabledInHot, explicitlyEnabled, explicitlyDisabled, undefined);
355+
updateForceMergeCounters(ilmPolicySpecs.get().enabledInHot, explicitlyEnabled, explicitlyDisabled, undefined);
324356
} else {
325357
assertThat(phasesStats.get("hot"), nullValue());
326358
}
327359
if (usedPolicies.contains(DOWNSAMPLING_IN_WARM_COLD_POLICY)) {
328360
assertThat(phasesStats.get("warm"), equalTo(1));
329-
updateForceMergeCounters(forceMergeEnabled.get().enabledInWarm, explicitlyEnabled, explicitlyDisabled, undefined);
361+
updateForceMergeCounters(ilmPolicySpecs.get().enabledInWarm, explicitlyEnabled, explicitlyDisabled, undefined);
330362
assertThat(phasesStats.get("cold"), equalTo(1));
331-
updateForceMergeCounters(forceMergeEnabled.get().enabledInCold, explicitlyEnabled, explicitlyDisabled, undefined);
363+
updateForceMergeCounters(ilmPolicySpecs.get().enabledInCold, explicitlyEnabled, explicitlyDisabled, undefined);
332364
} else {
333365
assertThat(phasesStats.get("warm"), nullValue());
334366
assertThat(phasesStats.get("cold"), nullValue());
@@ -377,19 +409,28 @@ private void assertDownsamplingStats(
377409
Integer roundsCount,
378410
Integer roundsSum,
379411
Integer roundsMin,
380-
Integer roundsMax
412+
Integer roundsMax,
413+
Integer aggregateSamplingMethod,
414+
Integer lastValueSamplingMethod,
415+
Integer undefinedSamplingMethod
381416
) {
382417
assertThat(stats.get("downsampled_data_stream_count"), equalTo(downsampledDataStreamCount));
383418
if (downsampledDataStreamCount == 0) {
384419
assertThat(stats.get("downsampled_index_count"), nullValue());
385420
assertThat(stats.get("rounds_per_data_stream"), nullValue());
421+
assertThat(stats.get("sampling_method"), nullValue());
386422
} else {
387423
assertThat(stats.get("downsampled_index_count"), equalTo(downsampledIndexCount));
388424
assertThat(stats.containsKey("rounds_per_data_stream"), equalTo(true));
389425
Map<String, Object> roundsMap = (Map<String, Object>) stats.get("rounds_per_data_stream");
390426
assertThat(roundsMap.get("average"), equalTo((double) roundsSum / roundsCount));
391427
assertThat(roundsMap.get("min"), equalTo(roundsMin));
392428
assertThat(roundsMap.get("max"), equalTo(roundsMax));
429+
assertThat(stats.containsKey("sampling_method"), equalTo(true));
430+
Map<String, Object> samplingMethodMap = (Map<String, Object>) stats.get("sampling_method");
431+
assertThat(samplingMethodMap.get("aggregate"), equalTo(aggregateSamplingMethod));
432+
assertThat(samplingMethodMap.get("last_value"), equalTo(lastValueSamplingMethod));
433+
assertThat(samplingMethodMap.get("undefined"), equalTo(undefinedSamplingMethod));
393434
}
394435
}
395436

@@ -477,10 +518,12 @@ private List<DataStreamLifecycle.DownsamplingRound> randomDownsamplingRounds() {
477518
return rounds;
478519
}
479520

480-
private IlmForceMergeInPolicies addIlmPolicies(Metadata.Builder metadataBuilder) {
521+
private IlmPolicySpecs addIlmPolicies(Metadata.Builder metadataBuilder) {
481522
Boolean hotForceMergeEnabled = randomBoolean() ? randomBoolean() : null;
482523
Boolean warmForceMergeEnabled = randomBoolean() ? randomBoolean() : null;
483524
Boolean coldForceMergeEnabled = randomBoolean() ? randomBoolean() : null;
525+
DownsampleConfig.SamplingMethod samplingMethod1 = randomeSamplingMethod();
526+
DownsampleConfig.SamplingMethod samplingMethod2 = randomeSamplingMethod();
484527
List<LifecyclePolicy> policies = List.of(
485528
new LifecyclePolicy(
486529
DOWNSAMPLING_IN_HOT_POLICY,
@@ -489,7 +532,10 @@ private IlmForceMergeInPolicies addIlmPolicies(Metadata.Builder metadataBuilder)
489532
new Phase(
490533
"hot",
491534
TimeValue.ZERO,
492-
Map.of("downsample", new DownsampleAction(DateHistogramInterval.MINUTE, null, hotForceMergeEnabled, null))
535+
Map.of(
536+
"downsample",
537+
new DownsampleAction(DateHistogramInterval.MINUTE, null, hotForceMergeEnabled, samplingMethod1)
538+
)
493539
)
494540
)
495541
),
@@ -500,13 +546,13 @@ private IlmForceMergeInPolicies addIlmPolicies(Metadata.Builder metadataBuilder)
500546
new Phase(
501547
"warm",
502548
TimeValue.ZERO,
503-
Map.of("downsample", new DownsampleAction(DateHistogramInterval.HOUR, null, warmForceMergeEnabled, null))
549+
Map.of("downsample", new DownsampleAction(DateHistogramInterval.HOUR, null, warmForceMergeEnabled, samplingMethod2))
504550
),
505551
"cold",
506552
new Phase(
507553
"cold",
508554
TimeValue.timeValueDays(3),
509-
Map.of("downsample", new DownsampleAction(DateHistogramInterval.DAY, null, coldForceMergeEnabled, null))
555+
Map.of("downsample", new DownsampleAction(DateHistogramInterval.DAY, null, coldForceMergeEnabled, samplingMethod2))
510556
)
511557
)
512558
),
@@ -524,10 +570,26 @@ private IlmForceMergeInPolicies addIlmPolicies(Metadata.Builder metadataBuilder)
524570
);
525571
IndexLifecycleMetadata newMetadata = new IndexLifecycleMetadata(policyMetadata, OperationMode.RUNNING);
526572
metadataBuilder.putCustom(IndexLifecycleMetadata.TYPE, newMetadata);
527-
return new IlmForceMergeInPolicies(hotForceMergeEnabled, warmForceMergeEnabled, coldForceMergeEnabled);
573+
var samplingMethods = new HashMap<String, DownsampleConfig.SamplingMethod>(2);
574+
if (samplingMethod1 != null) {
575+
samplingMethods.put(DOWNSAMPLING_IN_HOT_POLICY, samplingMethod1);
576+
}
577+
if (samplingMethod2 != null) {
578+
samplingMethods.put(DOWNSAMPLING_IN_WARM_COLD_POLICY, samplingMethod2);
579+
}
580+
return new IlmPolicySpecs(hotForceMergeEnabled, warmForceMergeEnabled, coldForceMergeEnabled, samplingMethods);
581+
}
582+
583+
private static DownsampleConfig.SamplingMethod randomeSamplingMethod() {
584+
return randomBoolean() ? null : randomFrom(DownsampleConfig.SamplingMethod.values());
528585
}
529586

530-
private record IlmForceMergeInPolicies(Boolean enabledInHot, Boolean enabledInWarm, Boolean enabledInCold) {}
587+
private record IlmPolicySpecs(
588+
Boolean enabledInHot,
589+
Boolean enabledInWarm,
590+
Boolean enabledInCold,
591+
Map<String, DownsampleConfig.SamplingMethod> samplingMethodByPolicy
592+
) {}
531593

532594
private static String randomIlmPolicy(DownsampledBy downsampledBy, boolean ovewrittenDlm) {
533595
if (downsampledBy == DownsampledBy.ILM || (downsampledBy == DownsampledBy.DLM && ovewrittenDlm)) {

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/TimeSeriesUsageTransportAction.java

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
package org.elasticsearch.xpack.core.action;
99

1010
import org.elasticsearch.action.ActionListener;
11+
import org.elasticsearch.action.downsample.DownsampleConfig;
1112
import org.elasticsearch.action.support.ActionFilters;
1213
import org.elasticsearch.cluster.ClusterState;
1314
import org.elasticsearch.cluster.metadata.DataStream;
@@ -102,21 +103,25 @@ protected void localClusterStateOperation(
102103
if (ds.isIndexManagedByDataStreamLifecycle(indexMetadata.getIndex(), ignored -> indexMetadata) && dlmRounds != null) {
103104
dlmStats.trackIndex(ds, indexMetadata);
104105
dlmStats.trackRounds(dlmRounds, ds, indexMetadata);
106+
dlmStats.trackSamplingMethod(ds.getDataLifecycle().downsamplingMethod(), ds, indexMetadata);
105107
} else if (ilmAvailable && projectMetadata.isIndexManagedByILM(indexMetadata)) {
106108
LifecyclePolicyMetadata policyMetadata = ilmMetadata.getPolicyMetadatas().get(indexMetadata.getLifecyclePolicyName());
107109
if (policyMetadata == null) {
108110
continue;
109111
}
110112
int rounds = 0;
113+
DownsampleConfig.SamplingMethod samplingMethod = null;
111114
for (Phase phase : policyMetadata.getPolicy().getPhases().values()) {
112115
if (phase.getActions().containsKey(DownsampleAction.NAME)) {
113116
rounds++;
117+
samplingMethod = ((DownsampleAction) phase.getActions().get(DownsampleAction.NAME)).samplingMethod();
114118
}
115119
}
116120
if (rounds > 0) {
117121
ilmStats.trackPolicy(policyMetadata.getPolicy());
118122
ilmStats.trackIndex(ds, indexMetadata);
119123
ilmStats.trackRounds(rounds, ds, indexMetadata);
124+
ilmStats.trackSamplingMethod(samplingMethod, ds, indexMetadata);
120125
}
121126
}
122127
String interval = indexMetadata.getSettings().get(IndexMetadata.INDEX_DOWNSAMPLE_INTERVAL.getKey());
@@ -143,6 +148,9 @@ protected void localClusterStateOperation(
143148
private static class DownsamplingStatsTracker {
144149
private long downsampledDataStreams = 0;
145150
private long downsampledIndices = 0;
151+
private long aggregateSamplingMethod = 0;
152+
private long lastValueSamplingMethod = 0;
153+
private long undefinedSamplingMethod = 0;
146154
private final LongSummaryStatistics rounds = new LongSummaryStatistics();
147155

148156
void trackIndex(DataStream ds, IndexMetadata indexMetadata) {
@@ -160,13 +168,31 @@ void trackRounds(int rounds, DataStream ds, IndexMetadata indexMetadata) {
160168
}
161169
}
162170

171+
void trackSamplingMethod(DownsampleConfig.SamplingMethod samplingMethod, DataStream ds, IndexMetadata indexMetadata) {
172+
// We want to track the sampling method per data stream,
173+
// so we use the write index to determine the active lifecycle configuration
174+
if (Objects.equals(indexMetadata.getIndex(), ds.getWriteIndex())) {
175+
if (samplingMethod == null) {
176+
undefinedSamplingMethod++;
177+
return;
178+
}
179+
switch (samplingMethod) {
180+
case DownsampleConfig.SamplingMethod.AGGREGATE -> aggregateSamplingMethod++;
181+
case DownsampleConfig.SamplingMethod.LAST_VALUE -> lastValueSamplingMethod++;
182+
}
183+
}
184+
}
185+
163186
TimeSeriesFeatureSetUsage.DownsamplingFeatureStats getDownsamplingStats() {
164187
return new TimeSeriesFeatureSetUsage.DownsamplingFeatureStats(
165188
downsampledDataStreams,
166189
downsampledIndices,
167190
rounds.getMin(),
168191
rounds.getAverage(),
169-
rounds.getMax()
192+
rounds.getMax(),
193+
aggregateSamplingMethod,
194+
lastValueSamplingMethod,
195+
undefinedSamplingMethod
170196
);
171197
}
172198
}

0 commit comments

Comments
 (0)