Skip to content

Commit 2e9b8f1

Browse files
authored
allocation: add balancer round summary as metrics (#136043)
This commit adds the BalancerRoundSummary as a collection of APM/open telemetry metrics. (These are already logged.) The summary provided every balancing round is sent synchronously into a counter or histogram. The round counter is implicitly incremented with each report. The shards moved are recorded with a counter and histogram. The per-node metrics, shard count, disk usage, write load, and total weight, are recorded into histograms with node_id and node_name attributes set. This change also includes a minor refactoring of the BalancingRoundSummary to move its key from node name to DiscoveryNode. This preserves the node id and name for use in histogram per-node attributes.
1 parent 44a377f commit 2e9b8f1

File tree

14 files changed

+433
-70
lines changed

14 files changed

+433
-70
lines changed

docs/changelog/136043.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 136043
2+
summary: "Allocation: add balancer round summary as metrics"
3+
area: Allocation
4+
type: enhancement
5+
issues: []

server/src/main/java/org/elasticsearch/cluster/ClusterModule.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
4242
import org.elasticsearch.cluster.routing.allocation.ShardAllocationDecision;
4343
import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster;
44+
import org.elasticsearch.cluster.routing.allocation.allocator.AllocationBalancingRoundMetrics;
4445
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
4546
import org.elasticsearch.cluster.routing.allocation.allocator.BalancerSettings;
4647
import org.elasticsearch.cluster.routing.allocation.allocator.BalancingWeightsFactory;
@@ -141,6 +142,7 @@ public class ClusterModule extends AbstractModule {
141142
private final AllocationStatsService allocationStatsService;
142143
private final TelemetryProvider telemetryProvider;
143144
private final DesiredBalanceMetrics desiredBalanceMetrics;
145+
private final AllocationBalancingRoundMetrics balancingRoundMetrics;
144146

145147
public ClusterModule(
146148
Settings settings,
@@ -168,6 +170,7 @@ public ClusterModule(
168170
balancingWeightsFactory
169171
);
170172
this.desiredBalanceMetrics = new DesiredBalanceMetrics(telemetryProvider.getMeterRegistry());
173+
this.balancingRoundMetrics = new AllocationBalancingRoundMetrics(telemetryProvider.getMeterRegistry());
171174
this.shardsAllocator = createShardsAllocator(
172175
settings,
173176
clusterService.getClusterSettings(),
@@ -180,7 +183,8 @@ public ClusterModule(
180183
writeLoadForecaster,
181184
nodeAllocationStatsAndWeightsCalculator,
182185
this::explainShardAllocation,
183-
desiredBalanceMetrics
186+
desiredBalanceMetrics,
187+
balancingRoundMetrics
184188
);
185189
this.clusterService = clusterService;
186190
this.indexNameExpressionResolver = new IndexNameExpressionResolver(threadPool.getThreadContext(), systemIndices, projectResolver);
@@ -521,7 +525,8 @@ private static ShardsAllocator createShardsAllocator(
521525
WriteLoadForecaster writeLoadForecaster,
522526
NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator,
523527
ShardAllocationExplainer shardAllocationExplainer,
524-
DesiredBalanceMetrics desiredBalanceMetrics
528+
DesiredBalanceMetrics desiredBalanceMetrics,
529+
AllocationBalancingRoundMetrics balancingRoundMetrics
525530
) {
526531
Map<String, Supplier<ShardsAllocator>> allocators = new HashMap<>();
527532
allocators.put(
@@ -538,7 +543,8 @@ private static ShardsAllocator createShardsAllocator(
538543
reconciler,
539544
nodeAllocationStatsAndWeightsCalculator,
540545
shardAllocationExplainer,
541-
desiredBalanceMetrics
546+
desiredBalanceMetrics,
547+
balancingRoundMetrics
542548
)
543549
);
544550

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.cluster.routing.allocation.allocator;
11+
12+
import org.elasticsearch.cluster.node.DiscoveryNode;
13+
import org.elasticsearch.cluster.routing.allocation.allocator.BalancingRoundSummary.NodesWeightsChanges;
14+
import org.elasticsearch.core.SuppressForbidden;
15+
import org.elasticsearch.telemetry.metric.DoubleHistogram;
16+
import org.elasticsearch.telemetry.metric.LongCounter;
17+
import org.elasticsearch.telemetry.metric.LongHistogram;
18+
import org.elasticsearch.telemetry.metric.MeterRegistry;
19+
20+
import java.util.Map;
21+
22+
/**
23+
* A telemetry metrics sender for {@link BalancingRoundSummary}
24+
*/
25+
public class AllocationBalancingRoundMetrics {
26+
27+
// counters that measure rounds and moves from the last balancing round
28+
public static final String NUMBER_OF_BALANCING_ROUNDS_METRIC_NAME = "es.allocator.balancing_round.balancing_rounds.total";
29+
public static final String NUMBER_OF_SHARD_MOVES_METRIC_NAME = "es.allocator.balancing_round.shard_moves.total";
30+
public static final String NUMBER_OF_SHARD_MOVES_HISTOGRAM_METRIC_NAME = "es.allocator.balancing_round.shard_moves.histogram";
31+
32+
// histograms that measure current utilization
33+
public static final String NUMBER_OF_SHARDS_METRIC_NAME = "es.allocator.balancing_round.shard_count.histogram";
34+
public static final String DISK_USAGE_BYTES_METRIC_NAME = "es.allocator.balancing_round.disk_usage_bytes.histogram";
35+
public static final String WRITE_LOAD_METRIC_NAME = "es.allocator.balancing_round.write_load.histogram";
36+
public static final String TOTAL_WEIGHT_METRIC_NAME = "es.allocator.balancing_round.total_weight.histogram";
37+
38+
private final LongCounter balancingRoundCounter;
39+
private final LongCounter shardMovesCounter;
40+
private final LongHistogram shardMovesHistogram;
41+
42+
private final LongHistogram shardCountHistogram;
43+
private final DoubleHistogram diskUsageHistogram;
44+
private final DoubleHistogram writeLoadHistogram;
45+
private final DoubleHistogram totalWeightHistogram;
46+
47+
public static AllocationBalancingRoundMetrics NOOP = new AllocationBalancingRoundMetrics(MeterRegistry.NOOP);
48+
49+
public AllocationBalancingRoundMetrics(MeterRegistry meterRegistry) {
50+
this.balancingRoundCounter = meterRegistry.registerLongCounter(
51+
NUMBER_OF_BALANCING_ROUNDS_METRIC_NAME,
52+
"Total number of balancing rounds",
53+
"unit"
54+
);
55+
this.shardMovesCounter = meterRegistry.registerLongCounter(
56+
NUMBER_OF_SHARD_MOVES_METRIC_NAME,
57+
"Total number of shard moves",
58+
"unit"
59+
);
60+
61+
this.shardMovesHistogram = meterRegistry.registerLongHistogram(
62+
NUMBER_OF_SHARD_MOVES_HISTOGRAM_METRIC_NAME,
63+
"Number of shard movements executed in a balancing round",
64+
"unit"
65+
);
66+
this.shardCountHistogram = meterRegistry.registerLongHistogram(
67+
NUMBER_OF_SHARDS_METRIC_NAME,
68+
"change in node shard count per balancing round",
69+
"unit"
70+
);
71+
this.diskUsageHistogram = meterRegistry.registerDoubleHistogram(
72+
DISK_USAGE_BYTES_METRIC_NAME,
73+
"change in disk usage in bytes per balancing round",
74+
"unit"
75+
);
76+
this.writeLoadHistogram = meterRegistry.registerDoubleHistogram(
77+
WRITE_LOAD_METRIC_NAME,
78+
"change in write load per balancing round",
79+
"1.0"
80+
);
81+
this.totalWeightHistogram = meterRegistry.registerDoubleHistogram(
82+
TOTAL_WEIGHT_METRIC_NAME,
83+
"change in total weight per balancing round",
84+
"1.0"
85+
);
86+
}
87+
88+
@SuppressForbidden(reason = "ForbiddenAPIs bans Math.abs(long) because of overflow on Long.MIN_VALUE, but this is impossible here")
89+
private long longAbsNegativeSafe(long value) {
90+
assert value != Long.MIN_VALUE : "value must not be Long.MIN_VALUE";
91+
return Math.abs(value);
92+
}
93+
94+
public void addBalancingRoundSummary(BalancingRoundSummary summary) {
95+
balancingRoundCounter.increment();
96+
shardMovesCounter.incrementBy(summary.numberOfShardsToMove());
97+
shardMovesHistogram.record(summary.numberOfShardsToMove());
98+
99+
for (Map.Entry<DiscoveryNode, NodesWeightsChanges> changesEntry : summary.nodeToWeightChanges().entrySet()) {
100+
DiscoveryNode node = changesEntry.getKey();
101+
NodesWeightsChanges weightChanges = changesEntry.getValue();
102+
BalancingRoundSummary.NodeWeightsDiff weightsDiff = weightChanges.weightsDiff();
103+
104+
shardCountHistogram.record(longAbsNegativeSafe(weightsDiff.shardCountDiff()), getNodeAttributes(node));
105+
diskUsageHistogram.record(Math.abs(weightsDiff.diskUsageInBytesDiff()), getNodeAttributes(node));
106+
writeLoadHistogram.record(Math.abs(weightsDiff.writeLoadDiff()), getNodeAttributes(node));
107+
totalWeightHistogram.record(Math.abs(weightsDiff.totalWeightDiff()), getNodeAttributes(node));
108+
}
109+
}
110+
111+
private Map<String, Object> getNodeAttributes(DiscoveryNode node) {
112+
return Map.of("node_name", node.getName(), "node_id", node.getId());
113+
}
114+
}

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/AllocationBalancingRoundSummaryService.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
import org.apache.logging.log4j.LogManager;
1313
import org.apache.logging.log4j.Logger;
14+
import org.elasticsearch.cluster.node.DiscoveryNode;
1415
import org.elasticsearch.common.settings.ClusterSettings;
1516
import org.elasticsearch.common.settings.Setting;
1617
import org.elasticsearch.core.TimeValue;
@@ -58,6 +59,7 @@ public class AllocationBalancingRoundSummaryService {
5859
private final ThreadPool threadPool;
5960
private volatile boolean enableBalancerRoundSummaries;
6061
private volatile TimeValue summaryReportInterval;
62+
private final AllocationBalancingRoundMetrics balancingRoundMetrics;
6163

6264
/**
6365
* A concurrency-safe list of balancing round summaries. Balancer rounds are run and added here serially, so the queue will naturally
@@ -68,12 +70,17 @@ public class AllocationBalancingRoundSummaryService {
6870
/** This reference is set when reporting is scheduled. If it is null, then reporting is inactive. */
6971
private final AtomicReference<Scheduler.Cancellable> scheduledReportFuture = new AtomicReference<>();
7072

71-
public AllocationBalancingRoundSummaryService(ThreadPool threadPool, ClusterSettings clusterSettings) {
73+
public AllocationBalancingRoundSummaryService(
74+
ThreadPool threadPool,
75+
ClusterSettings clusterSettings,
76+
AllocationBalancingRoundMetrics balancingRoundMetrics
77+
) {
7278
this.threadPool = threadPool;
7379
// Initialize the local setting values to avoid a null access when ClusterSettings#initializeAndWatch is called on each setting:
7480
// updating enableBalancerRoundSummaries accesses summaryReportInterval.
7581
this.enableBalancerRoundSummaries = clusterSettings.get(ENABLE_BALANCER_ROUND_SUMMARIES_SETTING);
7682
this.summaryReportInterval = clusterSettings.get(BALANCER_ROUND_SUMMARIES_LOG_INTERVAL_SETTING);
83+
this.balancingRoundMetrics = balancingRoundMetrics;
7784

7885
clusterSettings.initializeAndWatch(ENABLE_BALANCER_ROUND_SUMMARIES_SETTING, value -> {
7986
this.enableBalancerRoundSummaries = value;
@@ -99,14 +106,14 @@ public static BalancingRoundSummary createBalancerRoundSummary(DesiredBalance ol
99106
* Creates a summary of the node weight changes from {@code oldDesiredBalance} to {@code newDesiredBalance}.
100107
* See {@link BalancingRoundSummary.NodesWeightsChanges} for content details.
101108
*/
102-
private static Map<String, BalancingRoundSummary.NodesWeightsChanges> createWeightsSummary(
109+
private static Map<DiscoveryNode, BalancingRoundSummary.NodesWeightsChanges> createWeightsSummary(
103110
DesiredBalance oldDesiredBalance,
104111
DesiredBalance newDesiredBalance
105112
) {
106113
var oldWeightsPerNode = oldDesiredBalance.weightsPerNode();
107114
var newWeightsPerNode = newDesiredBalance.weightsPerNode();
108115

109-
Map<String, BalancingRoundSummary.NodesWeightsChanges> nodeNameToWeightInfo = new HashMap<>(oldWeightsPerNode.size());
116+
Map<DiscoveryNode, BalancingRoundSummary.NodesWeightsChanges> nodeNameToWeightInfo = new HashMap<>(oldWeightsPerNode.size());
110117
for (var nodeAndWeights : oldWeightsPerNode.entrySet()) {
111118
var discoveryNode = nodeAndWeights.getKey();
112119
var oldNodeWeightStats = nodeAndWeights.getValue();
@@ -116,7 +123,7 @@ private static Map<String, BalancingRoundSummary.NodesWeightsChanges> createWeig
116123
var newNodeWeightStats = newWeightsPerNode.getOrDefault(discoveryNode, DesiredBalanceMetrics.NodeWeightStats.ZERO);
117124

118125
nodeNameToWeightInfo.put(
119-
discoveryNode.getName(),
126+
discoveryNode,
120127
new BalancingRoundSummary.NodesWeightsChanges(
121128
oldNodeWeightStats,
122129
BalancingRoundSummary.NodeWeightsDiff.create(oldNodeWeightStats, newNodeWeightStats)
@@ -128,11 +135,11 @@ private static Map<String, BalancingRoundSummary.NodesWeightsChanges> createWeig
128135
// the new DesiredBalance to check.
129136
for (var nodeAndWeights : newWeightsPerNode.entrySet()) {
130137
var discoveryNode = nodeAndWeights.getKey();
131-
if (nodeNameToWeightInfo.containsKey(discoveryNode.getName()) == false) {
138+
if (nodeNameToWeightInfo.containsKey(discoveryNode) == false) {
132139
// This node is new in the new DesiredBalance, there was no entry added during iteration of the nodes in the old
133140
// DesiredBalance. So we'll make a new entry with a base of zero value weights and a weights diff of the new node's weights.
134141
nodeNameToWeightInfo.put(
135-
discoveryNode.getName(),
142+
discoveryNode,
136143
new BalancingRoundSummary.NodesWeightsChanges(
137144
DesiredBalanceMetrics.NodeWeightStats.ZERO,
138145
BalancingRoundSummary.NodeWeightsDiff.create(DesiredBalanceMetrics.NodeWeightStats.ZERO, nodeAndWeights.getValue())
@@ -164,6 +171,7 @@ public void addBalancerRoundSummary(BalancingRoundSummary summary) {
164171
}
165172

166173
summaries.add(summary);
174+
balancingRoundMetrics.addBalancingRoundSummary(summary);
167175
}
168176

169177
/**

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancingRoundSummary.java

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,19 +9,22 @@
99

1010
package org.elasticsearch.cluster.routing.allocation.allocator;
1111

12+
import org.elasticsearch.cluster.node.DiscoveryNode;
13+
import org.elasticsearch.core.Strings;
14+
1215
import java.util.HashMap;
1316
import java.util.List;
1417
import java.util.Map;
1518

1619
/**
1720
* Summarizes the impact to the cluster as a result of a rebalancing round.
1821
*
19-
* @param nodeNameToWeightChanges The shard balance weight changes for each node (by name), comparing a previous DesiredBalance shard
22+
* @param nodeToWeightChanges The shard balance weight changes for each DiscoveryNode, comparing a previous DesiredBalance shard
2023
* allocation to a new DesiredBalance allocation.
2124
* @param numberOfShardsToMove The number of shard moves required to move from the previous desired balance to the new one. Does not include
2225
* new (index creation) or removed (index deletion) shard assignements.
2326
*/
24-
public record BalancingRoundSummary(Map<String, NodesWeightsChanges> nodeNameToWeightChanges, long numberOfShardsToMove) {
27+
public record BalancingRoundSummary(Map<DiscoveryNode, NodesWeightsChanges> nodeToWeightChanges, long numberOfShardsToMove) {
2528

2629
/**
2730
* Represents the change in weights for a node going from an old DesiredBalance to a new DesiredBalance
@@ -75,8 +78,8 @@ public NodeWeightsDiff combine(NodeWeightsDiff otherDiff) {
7578
@Override
7679
public String toString() {
7780
return "BalancingRoundSummary{"
78-
+ "nodeNameToWeightChanges"
79-
+ nodeNameToWeightChanges
81+
+ "nodeToWeightChanges"
82+
+ nodeToWeightChanges
8083
+ ", numberOfShardsToMove="
8184
+ numberOfShardsToMove
8285
+ '}';
@@ -93,17 +96,34 @@ public String toString() {
9396
* latest desired balance.
9497
*
9598
* @param numberOfBalancingRounds How many balancing round summaries are combined in this report.
96-
* @param nodeNameToWeightChanges
99+
* @param nodeToWeightChanges
97100
* @param numberOfShardMoves The sum of shard moves for each balancing round being combined into a single summary.
98101
*/
99102
public record CombinedBalancingRoundSummary(
100103
int numberOfBalancingRounds,
101-
Map<String, NodesWeightsChanges> nodeNameToWeightChanges,
104+
Map<DiscoveryNode, NodesWeightsChanges> nodeToWeightChanges,
102105
long numberOfShardMoves
103106
) {
104107

105108
public static final CombinedBalancingRoundSummary EMPTY_RESULTS = new CombinedBalancingRoundSummary(0, new HashMap<>(), 0);
106109

110+
/**
111+
* Serialize the CombinedBalancingRoundSummary to a compact log representation, where {@link DiscoveryNode#getName()} is used
112+
* instead of the entire {@link DiscoveryNode#toString()} method.
113+
*/
114+
@Override
115+
public String toString() {
116+
Map<String, NodesWeightsChanges> nodeNameToWeightChanges = new HashMap<>(nodeToWeightChanges.size());
117+
nodeToWeightChanges.forEach((node, nodesWeightChanges) -> nodeNameToWeightChanges.put(node.getName(), nodesWeightChanges));
118+
119+
return Strings.format(
120+
"CombinedBalancingRoundSummary[numberOfBalancingRounds=%d, nodeToWeightChange=%s, numberOfShardMoves=%d]",
121+
numberOfBalancingRounds,
122+
nodeNameToWeightChanges,
123+
numberOfShardMoves
124+
);
125+
}
126+
107127
/**
108128
* Merges multiple {@link BalancingRoundSummary} summaries into a single {@link CombinedBalancingRoundSummary}.
109129
*/
@@ -113,7 +133,7 @@ public static CombinedBalancingRoundSummary combine(List<BalancingRoundSummary>
113133
}
114134

115135
// We will loop through the summaries and sum the weight diffs for each node entry.
116-
Map<String, NodesWeightsChanges> combinedNodeNameToWeightChanges = new HashMap<>();
136+
Map<DiscoveryNode, NodesWeightsChanges> combinedNodeNameToWeightChanges = new HashMap<>();
117137

118138
// Number of shards moves are simply summed across summaries. Each new balancing round is built upon the last one, so it is
119139
// possible that a shard is reassigned back to a node before it even moves away, and that will still be counted as 2 moves here.
@@ -128,7 +148,7 @@ public static CombinedBalancingRoundSummary combine(List<BalancingRoundSummary>
128148

129149
// We'll build the weight changes by keeping the node weight base from the first summary in which a node appears and then
130150
// summing the weight diffs in each summary to get total weight diffs across summaries.
131-
for (var nodeNameAndWeights : summary.nodeNameToWeightChanges.entrySet()) {
151+
for (var nodeNameAndWeights : summary.nodeToWeightChanges.entrySet()) {
132152
var combined = combinedNodeNameToWeightChanges.get(nodeNameAndWeights.getKey());
133153
if (combined == null) {
134154
// Either this is the first summary, and combinedNodeNameToWeightChanges hasn't been initialized yet for this node;

0 commit comments

Comments
 (0)