Skip to content

Commit 47a2a80

Browse files
dajacTaiJuWu
authored andcommitted
MINOR: Update GroupCoordinator interface to received MetadataImage/Delta directly (apache#21008)
In apache#20061, we introduced the `CoordinatorMetadataImage` and `CoordinatorMetadataDelta` interfaces to basically contain/control how metadata is used within the `GroupCoordinatorService`, more precisely within the `GroupCoordinatorShard`. When we did the change, we directly changed the `GroupCoordinator` interface to take implementation of those interfaces, requiring to wrap the `MetadataImage` and the `MetadataDelta` in the `BrokerMetadataPublisher`. While looking at it now, I find this not idea for a couple a reasons: 1) From a `BrokerMetadataPublisher` perspective, propagating metadata should be standardized. Basically, all the internal components should work the same from his point of view. If a component wants to be more restrictive within his scope, it is fine but the publisher should not be aware of this. 2) From a `GroupCoordinator` perspective, requiring `CoordinatorMetadataImage` and `CoordinatorMetadataDelta` limits what we can do. For instance, we could move more functionality such as electing shards from the `BrokerMetadataPublisher` to the `GroupCoordinatorService` to further simplify the metadata publishing and improving the encapsulation of the components. 3) From a `ShareCoordinator` perspective, the abstraction failed short as we require `CoordinatorMetadataImage` and `CoordinatorMetadataDelta` in the interface but we still require `FeaturesImage` as well. The abstraction fails short here. This patch is an attempt to change this by moving back to requiring `MetadataImage` and `MetadataDelta` in the `GroupCoordinator` interface and to wrap them within the service itself. It does not change the `ShareCoordinator` yet. I will do this as a follow-up if people agree with the general approach. Reviewers: Sean Quah <squah@confluent.io>, Lianet Magrans <lmagrans@confluent.io>
1 parent edf5947 commit 47a2a80

File tree

8 files changed

+54
-57
lines changed

8 files changed

+54
-57
lines changed

coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2535,12 +2535,12 @@ public void scheduleUnloadOperation(
25352535
/**
25362536
* A new metadata image is available.
25372537
*
2538-
* @param newImage The new metadata image.
2539-
* @param delta The metadata delta.
2538+
* @param delta The metadata delta.
2539+
* @param newImage The new metadata image.
25402540
*/
2541-
public void onNewMetadataImage(
2542-
CoordinatorMetadataImage newImage,
2543-
CoordinatorMetadataDelta delta
2541+
public void onMetadataUpdate(
2542+
CoordinatorMetadataDelta delta,
2543+
CoordinatorMetadataImage newImage
25442544
) {
25452545
throwIfNotRunning();
25462546
log.debug("Scheduling applying of a new metadata image with version {}.", newImage.version());

coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1968,7 +1968,7 @@ public void testClose() throws Exception {
19681968
}
19691969

19701970
@Test
1971-
public void testOnNewMetadataImage() {
1971+
public void testOnMetadataUpdate() {
19721972
TopicPartition tp0 = new TopicPartition("__consumer_offsets", 0);
19731973
TopicPartition tp1 = new TopicPartition("__consumer_offsets", 1);
19741974

@@ -2029,7 +2029,7 @@ public void testOnNewMetadataImage() {
20292029
// Publish a new image.
20302030
CoordinatorMetadataDelta delta = new KRaftCoordinatorMetadataDelta(new MetadataDelta(MetadataImage.EMPTY));
20312031
CoordinatorMetadataImage newImage = CoordinatorMetadataImage.EMPTY;
2032-
runtime.onNewMetadataImage(newImage, delta);
2032+
runtime.onMetadataUpdate(delta, newImage);
20332033

20342034
// Coordinator 0 should be notified about it.
20352035
verify(coordinator0).onNewMetadataImage(newImage, delta);

core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ class BrokerMetadataPublisher(
234234

235235
try {
236236
// Propagate the new image to the group coordinator.
237-
groupCoordinator.onNewMetadataImage(new KRaftCoordinatorMetadataImage(newImage), new KRaftCoordinatorMetadataDelta(delta))
237+
groupCoordinator.onMetadataUpdate(delta, newImage)
238238
} catch {
239239
case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating group " +
240240
s"coordinator with local changes in $deltaName", t)

core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ import org.apache.kafka.common.internals.Topic
3535
import org.apache.kafka.common.metadata.{FeatureLevelRecord, PartitionRecord, RemoveTopicRecord, TopicRecord}
3636
import org.apache.kafka.common.test.{KafkaClusterTestKit, TestKitNodes}
3737
import org.apache.kafka.common.utils.Exit
38-
import org.apache.kafka.coordinator.common.runtime.{KRaftCoordinatorMetadataDelta, KRaftCoordinatorMetadataImage}
3938
import org.apache.kafka.coordinator.group.GroupCoordinator
4039
import org.apache.kafka.coordinator.share.ShareCoordinator
4140
import org.apache.kafka.image.{AclsImage, ClientQuotasImage, ClusterImageTest, ConfigurationsImage, DelegationTokenImage, FeaturesImage, MetadataDelta, MetadataImage, MetadataImageTest, MetadataProvenance, ProducerIdsImage, ScramImage, TopicsImage}
@@ -292,7 +291,7 @@ class BrokerMetadataPublisherTest {
292291
.numBytes(42)
293292
.build())
294293

295-
verify(groupCoordinator).onNewMetadataImage(new KRaftCoordinatorMetadataImage(image), new KRaftCoordinatorMetadataDelta(delta))
294+
verify(groupCoordinator).onMetadataUpdate(delta, image)
296295
}
297296

298297
@Test

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,9 @@
5353
import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
5454
import org.apache.kafka.common.requests.TransactionResult;
5555
import org.apache.kafka.common.utils.BufferSupplier;
56-
import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataDelta;
57-
import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
5856
import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult;
57+
import org.apache.kafka.image.MetadataDelta;
58+
import org.apache.kafka.image.MetadataImage;
5959
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
6060

6161
import java.time.Duration;
@@ -452,12 +452,12 @@ void onResignation(
452452
/**
453453
* A new metadata image is available.
454454
*
455-
* @param newImage The new metadata image.
456455
* @param delta The metadata delta.
456+
* @param newImage The new metadata image.
457457
*/
458-
void onNewMetadataImage(
459-
CoordinatorMetadataImage newImage,
460-
CoordinatorMetadataDelta delta
458+
void onMetadataUpdate(
459+
MetadataDelta delta,
460+
MetadataImage newImage
461461
);
462462

463463
/**

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -85,18 +85,21 @@
8585
import org.apache.kafka.common.utils.Utils;
8686
import org.apache.kafka.coordinator.common.runtime.CoordinatorEventProcessor;
8787
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader;
88-
import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataDelta;
8988
import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
9089
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
9190
import org.apache.kafka.coordinator.common.runtime.CoordinatorResult;
9291
import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime;
9392
import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetrics;
9493
import org.apache.kafka.coordinator.common.runtime.CoordinatorShardBuilderSupplier;
94+
import org.apache.kafka.coordinator.common.runtime.KRaftCoordinatorMetadataDelta;
95+
import org.apache.kafka.coordinator.common.runtime.KRaftCoordinatorMetadataImage;
9596
import org.apache.kafka.coordinator.common.runtime.MultiThreadedEventProcessor;
9697
import org.apache.kafka.coordinator.common.runtime.PartitionWriter;
9798
import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor;
9899
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
99100
import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult;
101+
import org.apache.kafka.image.MetadataDelta;
102+
import org.apache.kafka.image.MetadataImage;
100103
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
101104
import org.apache.kafka.server.authorizer.Authorizer;
102105
import org.apache.kafka.server.record.BrokerCompressionType;
@@ -344,9 +347,9 @@ public GroupCoordinatorService build() {
344347

345348
/**
346349
* The metadata image to extract topic id to names map.
347-
* This is initialised when the {@link GroupCoordinator#onNewMetadataImage(CoordinatorMetadataImage, CoordinatorMetadataDelta)} is called
350+
* This is initialised when the {@link GroupCoordinator#onMetadataUpdate(MetadataDelta, MetadataImage)} is called
348351
*/
349-
private CoordinatorMetadataImage metadataImage = null;
352+
private volatile CoordinatorMetadataImage metadataImage = null;
350353

351354
/**
352355
*
@@ -2299,16 +2302,18 @@ public void onResignation(
22992302
}
23002303

23012304
/**
2302-
* See {@link GroupCoordinator#onNewMetadataImage(CoordinatorMetadataImage, CoordinatorMetadataDelta)}.
2305+
* See {@link GroupCoordinator#onMetadataUpdate(MetadataDelta, MetadataImage)}.
23032306
*/
23042307
@Override
2305-
public void onNewMetadataImage(
2306-
CoordinatorMetadataImage newImage,
2307-
CoordinatorMetadataDelta delta
2308+
public void onMetadataUpdate(
2309+
MetadataDelta delta,
2310+
MetadataImage newImage
23082311
) {
23092312
throwIfNotActive();
2310-
metadataImage = newImage;
2311-
runtime.onNewMetadataImage(newImage, delta);
2313+
var wrappedImage = newImage == null ? null : new KRaftCoordinatorMetadataImage(newImage);
2314+
var wrappedDelta = delta == null ? null : new KRaftCoordinatorMetadataDelta(delta);
2315+
metadataImage = wrappedImage;
2316+
runtime.onMetadataUpdate(wrappedDelta, wrappedImage);
23122317
}
23132318

23142319
/**

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java

Lines changed: 24 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -88,11 +88,8 @@
8888
import org.apache.kafka.common.utils.BufferSupplier;
8989
import org.apache.kafka.common.utils.LogContext;
9090
import org.apache.kafka.common.utils.Utils;
91-
import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
9291
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
9392
import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime;
94-
import org.apache.kafka.coordinator.common.runtime.KRaftCoordinatorMetadataDelta;
95-
import org.apache.kafka.coordinator.common.runtime.KRaftCoordinatorMetadataImage;
9693
import org.apache.kafka.coordinator.common.runtime.MetadataImageBuilder;
9794
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
9895
import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult;
@@ -3166,7 +3163,7 @@ public void testOnPartitionsDeleted() {
31663163
.addTopic(Uuid.randomUuid(), "foo", 1)
31673164
.build();
31683165

3169-
service.onNewMetadataImage(new KRaftCoordinatorMetadataImage(image), new KRaftCoordinatorMetadataDelta(new MetadataDelta(image)));
3166+
service.onMetadataUpdate(new MetadataDelta(image), image);
31703167

31713168
when(runtime.scheduleWriteAllOperation(
31723169
ArgumentMatchers.eq("on-partition-deleted"),
@@ -3224,7 +3221,7 @@ public void testOnPartitionsDeletedCleanupShareGroupState() {
32243221
.addTopic(Uuid.randomUuid(), "foo", 1)
32253222
.build();
32263223

3227-
service.onNewMetadataImage(new KRaftCoordinatorMetadataImage(image), new KRaftCoordinatorMetadataDelta(new MetadataDelta(image)));
3224+
service.onMetadataUpdate(new MetadataDelta(image), image);
32283225

32293226
// No error in partition deleted callback
32303227
when(runtime.scheduleWriteAllOperation(
@@ -3271,10 +3268,10 @@ public void testOnPartitionsDeletedCleanupShareGroupStateEmptyMetadata() {
32713268
.build();
32723269
service.startup(() -> 3);
32733270

3274-
CoordinatorMetadataImage image = new MetadataImageBuilder()
3271+
MetadataImage image = new MetadataImageBuilder()
32753272
.addTopic(Uuid.randomUuid(), "bar", 1)
3276-
.buildCoordinatorMetadataImage();
3277-
service.onNewMetadataImage(image, image.emptyDelta());
3273+
.build();
3274+
service.onMetadataUpdate(new MetadataDelta(image), image);
32783275

32793276
// No error in partition deleted callback
32803277
when(runtime.scheduleWriteAllOperation(
@@ -3321,8 +3318,8 @@ public void testOnPartitionsDeletedCleanupShareGroupStateTopicsNotInMetadata() {
33213318
.build();
33223319
service.startup(() -> 3);
33233320

3324-
CoordinatorMetadataImage image = CoordinatorMetadataImage.EMPTY;
3325-
service.onNewMetadataImage(image, image.emptyDelta());
3321+
MetadataImage image = MetadataImage.EMPTY;
3322+
service.onMetadataUpdate(new MetadataDelta(image), image);
33263323

33273324
// No error in partition deleted callback
33283325
when(runtime.scheduleWriteAllOperation(
@@ -4149,7 +4146,7 @@ public void testDescribeShareGroupAllOffsetsLatestOffsetError() throws Interrupt
41494146
.addTopic(TOPIC_ID, TOPIC_NAME, 3)
41504147
.build();
41514148

4152-
service.onNewMetadataImage(new KRaftCoordinatorMetadataImage(image), null);
4149+
service.onMetadataUpdate(new MetadataDelta(image), image);
41534150

41544151
int partition = 1;
41554152

@@ -4232,7 +4229,7 @@ public void testDescribeShareGroupOffsetsMetadataImageNull() throws ExecutionExc
42324229
.build(true);
42334230

42344231
// Forcing a null Metadata Image
4235-
service.onNewMetadataImage(null, null);
4232+
service.onMetadataUpdate(null, null);
42364233

42374234
int partition = 1;
42384235
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup requestData = new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup()
@@ -4276,7 +4273,7 @@ public void testDescribeShareGroupAllOffsets() throws InterruptedException, Exec
42764273
.addTopic(TOPIC_ID, TOPIC_NAME, 3)
42774274
.build();
42784275

4279-
service.onNewMetadataImage(new KRaftCoordinatorMetadataImage(image), null);
4276+
service.onMetadataUpdate(null, image);
42804277

42814278
int partition = 1;
42824279

@@ -4347,7 +4344,7 @@ public void testDescribeShareGroupAllOffsetsThrowsError() {
43474344
.addTopic(TOPIC_ID, TOPIC_NAME, 3)
43484345
.build();
43494346

4350-
service.onNewMetadataImage(new KRaftCoordinatorMetadataImage(image), null);
4347+
service.onMetadataUpdate(null, image);
43514348

43524349
int partition = 1;
43534350

@@ -4383,7 +4380,7 @@ public void testDescribeShareGroupAllOffsetsNullResult() {
43834380
.addTopic(TOPIC_ID, TOPIC_NAME, 3)
43844381
.build();
43854382

4386-
service.onNewMetadataImage(new KRaftCoordinatorMetadataImage(image), null);
4383+
service.onMetadataUpdate(null, image);
43874384

43884385
int partition = 1;
43894386

@@ -4420,7 +4417,7 @@ public void testDescribeShareGroupAllOffsetsReadSummaryPartitionError() throws I
44204417
.addTopic(TOPIC_ID, TOPIC_NAME, 3)
44214418
.build();
44224419

4423-
service.onNewMetadataImage(new KRaftCoordinatorMetadataImage(image), null);
4420+
service.onMetadataUpdate(null, image);
44244421

44254422
int partition = 1;
44264423

@@ -4510,7 +4507,7 @@ public void testDescribeShareGroupAllOffsetsMetadataImageNull() throws Execution
45104507
.build(true);
45114508

45124509
// Forcing a null Metadata Image
4513-
service.onNewMetadataImage(null, null);
4510+
service.onMetadataUpdate(null, null);
45144511

45154512
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup requestData = new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup()
45164513
.setGroupId("share-group-id")
@@ -4707,7 +4704,7 @@ public void testDeleteShareGroupOffsetsMetadataImageNull() throws ExecutionExcep
47074704
.build(true);
47084705

47094706
// Forcing a null Metadata Image
4710-
service.onNewMetadataImage(null, null);
4707+
service.onMetadataUpdate(null, null);
47114708

47124709
DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData()
47134710
.setGroupId("share-group-id")
@@ -5584,7 +5581,7 @@ public void testPersisterInitializeSuccess() {
55845581
.addTopic(topicId, "topic-name", 3)
55855582
.build();
55865583

5587-
service.onNewMetadataImage(new KRaftCoordinatorMetadataImage(image), null);
5584+
service.onMetadataUpdate(null, image);
55885585

55895586
when(mockPersister.initializeState(ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(
55905587
new InitializeShareGroupStateResult.Builder()
@@ -5759,7 +5756,7 @@ public void testPersisterInitializeGroupInitializeFailure() {
57595756
.addTopic(topicId, "topic-name", 3)
57605757
.build();
57615758

5762-
service.onNewMetadataImage(new KRaftCoordinatorMetadataImage(image), null);
5759+
service.onMetadataUpdate(null, image);
57635760

57645761
when(mockPersister.initializeState(ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(
57655762
new InitializeShareGroupStateResult.Builder()
@@ -5821,7 +5818,7 @@ public void testAlterShareGroupOffsetsMetadataImageNull() throws ExecutionExcept
58215818
.build(true);
58225819

58235820
// Forcing a null Metadata Image
5824-
service.onNewMetadataImage(null, null);
5821+
service.onMetadataUpdate(null, null);
58255822

58265823
String groupId = "share-group";
58275824
AlterShareGroupOffsetsRequestData request = new AlterShareGroupOffsetsRequestData()
@@ -5977,7 +5974,7 @@ public void testPersisterInitializeForAlterShareGroupOffsetsResponseSuccess() {
59775974
.addTopic(topicId, "topic-name", 1)
59785975
.build();
59795976

5980-
service.onNewMetadataImage(new KRaftCoordinatorMetadataImage(image), null);
5977+
service.onMetadataUpdate(null, image);
59815978

59825979
when(mockPersister.initializeState(ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(
59835980
new InitializeShareGroupStateResult.Builder()
@@ -6026,7 +6023,7 @@ private static class GroupCoordinatorServiceBuilder {
60266023
private CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime;
60276024
private GroupCoordinatorMetrics metrics = new GroupCoordinatorMetrics();
60286025
private Persister persister = new NoOpStatePersister();
6029-
private CoordinatorMetadataImage metadataImage = null;
6026+
private MetadataImage metadataImage = null;
60306027
private PartitionMetadataClient partitionMetadataClient = null;
60316028

60326029
GroupCoordinatorService build() {
@@ -6035,7 +6032,9 @@ GroupCoordinatorService build() {
60356032

60366033
GroupCoordinatorService build(boolean serviceStartup) {
60376034
if (metadataImage == null) {
6038-
metadataImage = mock(CoordinatorMetadataImage.class);
6035+
metadataImage = new MetadataImageBuilder()
6036+
.addTopic(TOPIC_ID, TOPIC_NAME, 1)
6037+
.build();
60396038
}
60406039

60416040
GroupCoordinatorService service = new GroupCoordinatorService(
@@ -6051,14 +6050,8 @@ GroupCoordinatorService build(boolean serviceStartup) {
60516050

60526051
if (serviceStartup) {
60536052
service.startup(() -> 1);
6054-
service.onNewMetadataImage(metadataImage, null);
6053+
service.onMetadataUpdate(null, metadataImage);
60556054
}
6056-
when(metadataImage.topicNames()).thenReturn(Set.of(TOPIC_NAME));
6057-
var topicMetadata = mock(CoordinatorMetadataImage.TopicMetadata.class);
6058-
when(topicMetadata.name()).thenReturn(TOPIC_NAME);
6059-
when(topicMetadata.id()).thenReturn(TOPIC_ID);
6060-
when(metadataImage.topicMetadata(TOPIC_ID)).thenReturn(Optional.of(topicMetadata));
6061-
when(metadataImage.topicMetadata(TOPIC_NAME)).thenReturn(Optional.of(topicMetadata));
60626055

60636056
return service;
60646057
}

share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1100,7 +1100,7 @@ public void onTopicsDeleted(Set<Uuid> deletedTopicIds, BufferSupplier bufferSupp
11001100
@Override
11011101
public void onNewMetadataImage(CoordinatorMetadataImage newImage, FeaturesImage newFeaturesImage, CoordinatorMetadataDelta delta) {
11021102
throwIfNotActive();
1103-
this.runtime.onNewMetadataImage(newImage, delta);
1103+
this.runtime.onMetadataUpdate(delta, newImage);
11041104
boolean enabled = isShareGroupsEnabled(newFeaturesImage);
11051105
// enabled shouldRunJob result (XOR)
11061106
// 0 0 no op on flag, do not call jobs

0 commit comments

Comments
 (0)