From 8cf18e0508db2b9f86d20995b2c35bf5a0e347e8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20St=C3=B6r?= Date: Thu, 27 Nov 2025 13:41:27 +0100 Subject: [PATCH 1/7] Rename all rest-controller endpoints --- .../nodelibrary/types/ClusterRestRouteConfigurations.java | 6 +++--- .../types/ClusterStorageBinaryDataDistributorKafka.java | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/ClusterRestRouteConfigurations.java b/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/ClusterRestRouteConfigurations.java index 0f8e6ce..8ff090c 100644 --- a/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/ClusterRestRouteConfigurations.java +++ b/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/ClusterRestRouteConfigurations.java @@ -94,7 +94,7 @@ private GetMicrostreamStorageBytes() public static final class PostMicrostreamBackup { - public static final String PATH = "/microstream-backup"; + public static final String PATH = "/eclipse-datagrid-backup"; public static final String CONSUMES = MediaTypes.APPLICATION_JSON; public static final String PRODUCES = MediaTypes.APPLICATION_JSON; @@ -120,7 +120,7 @@ private PostMicrostreamBackup() public static final class GetMicrostreamBackup { - public static final String PATH = "/microstream-backup"; + public static final String PATH = "/eclipse-datagrid-backup"; public static final String PRODUCES = MediaTypes.APPLICATION_JSON; @@ -152,7 +152,7 @@ private GetMicrostreamUpdates() public static final class PostMicrostreamResumeUpdates { - public static final String PATH = "/microstream-resume-updates"; + public static final String PATH = "/eclipse-datagrid-resume-updates"; public static final String CONSUMES = MediaTypes.WILDCARD; public static final String PRODUCES = MediaTypes.WILDCARD; diff --git a/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/ClusterStorageBinaryDataDistributorKafka.java b/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/ClusterStorageBinaryDataDistributorKafka.java index 9568cf0..a46792d 100644 --- a/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/ClusterStorageBinaryDataDistributorKafka.java +++ b/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/ClusterStorageBinaryDataDistributorKafka.java @@ -257,7 +257,7 @@ private Async(final String topicName, final KafkaPropertiesProvider kafkaPropert private Thread createThread(final Runnable runnable) { final Thread thread = new Thread(runnable); - thread.setName("MicroStream-StorageDistributor-Kafka"); + thread.setName("Eclipse-Datagrid-StorageDistributor-Kafka"); return thread; } From 5b84cc387630806e7cca92401d311e743ac659c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20St=C3=B6r?= Date: Thu, 27 Nov 2025 15:09:15 +0100 Subject: [PATCH 2/7] Rename rest route configurations --- .../helidon/HelidonClusterController.java | 122 ++++++++-------- .../types/MicronautClusterController.java | 132 +++++++++--------- .../exceptions/NotADistributorException.java | 2 +- .../types/ClusterRestRequestController.java | 120 ++++++++-------- ...> StorageNodeRestRouteConfigurations.java} | 57 ++++---- .../SpringBootClusterController.java | 108 +++++++------- 6 files changed, 270 insertions(+), 271 
deletions(-) rename cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/{ClusterRestRouteConfigurations.java => StorageNodeRestRouteConfigurations.java} (72%) diff --git a/cluster/nodelibrary/helidon/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/helidon/HelidonClusterController.java b/cluster/nodelibrary/helidon/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/helidon/HelidonClusterController.java index 8845837..52a0737 100644 --- a/cluster/nodelibrary/helidon/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/helidon/HelidonClusterController.java +++ b/cluster/nodelibrary/helidon/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/helidon/HelidonClusterController.java @@ -18,14 +18,14 @@ import jakarta.ws.rs.*; import org.eclipse.datagrid.cluster.nodelibrary.exceptions.HttpResponseException; import org.eclipse.datagrid.cluster.nodelibrary.types.ClusterRestRequestController; -import org.eclipse.datagrid.cluster.nodelibrary.types.ClusterRestRouteConfigurations; +import org.eclipse.datagrid.cluster.nodelibrary.types.StorageNodeRestRouteConfigurations; import org.eclipse.microprofile.openapi.annotations.parameters.RequestBody; -import static org.eclipse.datagrid.cluster.nodelibrary.types.ClusterRestRouteConfigurations.*; +import static org.eclipse.datagrid.cluster.nodelibrary.types.StorageNodeRestRouteConfigurations.*; @ApplicationScoped -@Path(ClusterRestRouteConfigurations.ROOT_PATH) +@Path(StorageNodeRestRouteConfigurations.ROOT_PATH) public class HelidonClusterController { private final ClusterRestRequestController requestController; @@ -36,112 +36,112 @@ public HelidonClusterController(final ClusterRestRequestController requestContro } @GET - @Path(GetMicrostreamDistributor.PATH) - @Produces(GetMicrostreamDistributor.PRODUCES) - public boolean getMicrostreamDistributor() throws HttpResponseException + @Path(GetDistributor.PATH) + @Produces(GetDistributor.PRODUCES) + public boolean getDataGridDistributor() throws HttpResponseException { - return this.requestController.getMicrostreamDistributor(); + return this.requestController.getDataGridDistributor(); } @POST - @Path(PostMicrostreamActivateDistributorStart.PATH) - @Consumes(PostMicrostreamActivateDistributorStart.CONSUMES) - @Produces(PostMicrostreamActivateDistributorStart.PRODUCES) - public void postMicrostreamActivateDistributorStart() throws HttpResponseException + @Path(PostActivateDistributorStart.PATH) + @Consumes(PostActivateDistributorStart.CONSUMES) + @Produces(PostActivateDistributorStart.PRODUCES) + public void postDataGridActivateDistributorStart() throws HttpResponseException { - this.requestController.postMicrostreamActivateDistributorStart(); + this.requestController.postDataGridActivateDistributorStart(); } @POST - @Path(PostMicrostreamActivateDistributorFinish.PATH) - @Consumes(PostMicrostreamActivateDistributorFinish.CONSUMES) - @Produces(PostMicrostreamActivateDistributorFinish.PRODUCES) - public boolean postMicrostreamActivateDistributorFinish() throws HttpResponseException + @Path(PostActivateDistributorFinish.PATH) + @Consumes(PostActivateDistributorFinish.CONSUMES) + @Produces(PostActivateDistributorFinish.PRODUCES) + public boolean postDataGridActivateDistributorFinish() throws HttpResponseException { - return this.requestController.postMicrostreamActivateDistributorFinish(); + return this.requestController.postDataGridActivateDistributorFinish(); } @GET - @Path(GetMicrostreamHealth.PATH) - @Produces(GetMicrostreamHealth.PRODUCES) - public void 
getMicrostreamHealth() throws HttpResponseException + @Path(GetHealth.PATH) + @Produces(GetHealth.PRODUCES) + public void getDataGridHealth() throws HttpResponseException { - this.requestController.getMicrostreamHealth(); + this.requestController.getDataGridHealth(); } @GET - @Path(GetMicrostreamHealthReady.PATH) - @Produces(GetMicrostreamHealthReady.PRODUCES) - public void getMicrostreamHealthReady() throws HttpResponseException + @Path(GetHealthReady.PATH) + @Produces(GetHealthReady.PRODUCES) + public void getDataGridHealthReady() throws HttpResponseException { - this.requestController.getMicrostreamHealthReady(); + this.requestController.getDataGridHealthReady(); } @GET - @Path(GetMicrostreamStorageBytes.PATH) - @Produces(GetMicrostreamStorageBytes.PRODUCES) - public String getMicrostreamStorageBytes() throws HttpResponseException + @Path(GetStorageBytes.PATH) + @Produces(GetStorageBytes.PRODUCES) + public String getDataGridStorageBytes() throws HttpResponseException { - return this.requestController.getMicrostreamStorageBytes(); + return this.requestController.getDataGridStorageBytes(); } @POST - @Path(PostMicrostreamBackup.PATH) - @Consumes(PostMicrostreamBackup.CONSUMES) - @Produces(PostMicrostreamBackup.PRODUCES) - public void postMicrostreamBackup(@RequestBody final PostMicrostreamBackup.Body body) throws HttpResponseException + @Path(PostBackup.PATH) + @Consumes(PostBackup.CONSUMES) + @Produces(PostBackup.PRODUCES) + public void postDataGridBackup(@RequestBody final PostBackup.Body body) throws HttpResponseException { - this.requestController.postMicrostreamBackup(body); + this.requestController.postDataGridBackup(body); } @GET - @Path(GetMicrostreamBackup.PATH) - @Produces(GetMicrostreamBackup.PRODUCES) - public boolean getMicrostreamBackup() throws HttpResponseException + @Path(GetBackup.PATH) + @Produces(GetBackup.PRODUCES) + public boolean getDataGridBackup() throws HttpResponseException { - return this.requestController.getMicrostreamBackup(); + return this.requestController.getDataGridBackup(); } @POST - @Path(PostMicrostreamUpdates.PATH) - @Consumes(PostMicrostreamUpdates.CONSUMES) - @Produces(PostMicrostreamUpdates.PRODUCES) - public void postMicrostreamUpdates() throws HttpResponseException + @Path(PostUpdates.PATH) + @Consumes(PostUpdates.CONSUMES) + @Produces(PostUpdates.PRODUCES) + public void postDataGridUpdates() throws HttpResponseException { - this.requestController.postMicrostreamUpdates(); + this.requestController.postDataGridUpdates(); } @GET - @Path(GetMicrostreamUpdates.PATH) - @Produces(GetMicrostreamUpdates.PRODUCES) - public boolean getMicrostreamUpdates() throws HttpResponseException + @Path(GetUpdates.PATH) + @Produces(GetUpdates.PRODUCES) + public boolean getDataGridUpdates() throws HttpResponseException { - return this.requestController.getMicrostreamUpdates(); + return this.requestController.getDataGridUpdates(); } @POST - @Path(PostMicrostreamResumeUpdates.PATH) - @Consumes(PostMicrostreamResumeUpdates.CONSUMES) - @Produces(PostMicrostreamResumeUpdates.PRODUCES) - public void postMicrostreamResumeUpdates() throws HttpResponseException + @Path(PostResumeUpdates.PATH) + @Consumes(PostResumeUpdates.CONSUMES) + @Produces(PostResumeUpdates.PRODUCES) + public void postDataGridResumeUpdates() throws HttpResponseException { - this.requestController.postMicrostreamResumeUpdates(); + this.requestController.postDataGridResumeUpdates(); } @POST - @Path(PostMicrostreamGc.PATH) - @Consumes(PostMicrostreamGc.CONSUMES) - @Produces(PostMicrostreamGc.PRODUCES) - public 
void postMicrostreamGc() throws HttpResponseException + @Path(PostGc.PATH) + @Consumes(PostGc.CONSUMES) + @Produces(PostGc.PRODUCES) + public void postDataGridGc() throws HttpResponseException { - this.requestController.postMicrostreamGc(); + this.requestController.postDataGridGc(); } @GET - @Path(GetMicrostreamGc.PATH) - @Produces(GetMicrostreamGc.PRODUCES) - public boolean getMicrostreamGc() throws HttpResponseException + @Path(GetGc.PATH) + @Produces(GetGc.PRODUCES) + public boolean getDataGridGc() throws HttpResponseException { - return this.requestController.getMicrostreamGc(); + return this.requestController.getDataGridGc(); } } diff --git a/cluster/nodelibrary/micronaut/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/micronaut/types/MicronautClusterController.java b/cluster/nodelibrary/micronaut/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/micronaut/types/MicronautClusterController.java index 2d89b56..036598c 100644 --- a/cluster/nodelibrary/micronaut/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/micronaut/types/MicronautClusterController.java +++ b/cluster/nodelibrary/micronaut/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/micronaut/types/MicronautClusterController.java @@ -16,20 +16,20 @@ import org.eclipse.datagrid.cluster.nodelibrary.exceptions.HttpResponseException; import org.eclipse.datagrid.cluster.nodelibrary.types.ClusterRestRequestController; -import org.eclipse.datagrid.cluster.nodelibrary.types.ClusterRestRouteConfigurations; -import org.eclipse.datagrid.cluster.nodelibrary.types.ClusterRestRouteConfigurations.GetMicrostreamBackup; -import org.eclipse.datagrid.cluster.nodelibrary.types.ClusterRestRouteConfigurations.GetMicrostreamDistributor; -import org.eclipse.datagrid.cluster.nodelibrary.types.ClusterRestRouteConfigurations.GetMicrostreamGc; -import org.eclipse.datagrid.cluster.nodelibrary.types.ClusterRestRouteConfigurations.GetMicrostreamHealth; -import org.eclipse.datagrid.cluster.nodelibrary.types.ClusterRestRouteConfigurations.GetMicrostreamHealthReady; -import org.eclipse.datagrid.cluster.nodelibrary.types.ClusterRestRouteConfigurations.GetMicrostreamStorageBytes; -import org.eclipse.datagrid.cluster.nodelibrary.types.ClusterRestRouteConfigurations.GetMicrostreamUpdates; -import org.eclipse.datagrid.cluster.nodelibrary.types.ClusterRestRouteConfigurations.PostMicrostreamActivateDistributorFinish; -import org.eclipse.datagrid.cluster.nodelibrary.types.ClusterRestRouteConfigurations.PostMicrostreamActivateDistributorStart; -import org.eclipse.datagrid.cluster.nodelibrary.types.ClusterRestRouteConfigurations.PostMicrostreamBackup; -import org.eclipse.datagrid.cluster.nodelibrary.types.ClusterRestRouteConfigurations.PostMicrostreamGc; -import org.eclipse.datagrid.cluster.nodelibrary.types.ClusterRestRouteConfigurations.PostMicrostreamResumeUpdates; -import org.eclipse.datagrid.cluster.nodelibrary.types.ClusterRestRouteConfigurations.PostMicrostreamUpdates; +import org.eclipse.datagrid.cluster.nodelibrary.types.StorageNodeRestRouteConfigurations; +import org.eclipse.datagrid.cluster.nodelibrary.types.StorageNodeRestRouteConfigurations.GetBackup; +import org.eclipse.datagrid.cluster.nodelibrary.types.StorageNodeRestRouteConfigurations.GetDistributor; +import org.eclipse.datagrid.cluster.nodelibrary.types.StorageNodeRestRouteConfigurations.GetGc; +import org.eclipse.datagrid.cluster.nodelibrary.types.StorageNodeRestRouteConfigurations.GetHealth; +import 
org.eclipse.datagrid.cluster.nodelibrary.types.StorageNodeRestRouteConfigurations.GetHealthReady; +import org.eclipse.datagrid.cluster.nodelibrary.types.StorageNodeRestRouteConfigurations.GetStorageBytes; +import org.eclipse.datagrid.cluster.nodelibrary.types.StorageNodeRestRouteConfigurations.GetUpdates; +import org.eclipse.datagrid.cluster.nodelibrary.types.StorageNodeRestRouteConfigurations.PostActivateDistributorFinish; +import org.eclipse.datagrid.cluster.nodelibrary.types.StorageNodeRestRouteConfigurations.PostActivateDistributorStart; +import org.eclipse.datagrid.cluster.nodelibrary.types.StorageNodeRestRouteConfigurations.PostBackup; +import org.eclipse.datagrid.cluster.nodelibrary.types.StorageNodeRestRouteConfigurations.PostGc; +import org.eclipse.datagrid.cluster.nodelibrary.types.StorageNodeRestRouteConfigurations.PostResumeUpdates; +import org.eclipse.datagrid.cluster.nodelibrary.types.StorageNodeRestRouteConfigurations.PostUpdates; import io.micronaut.core.annotation.Introspected; import io.micronaut.http.HttpResponse; @@ -43,9 +43,9 @@ import io.micronaut.scheduling.annotation.ExecuteOn; import io.micronaut.serde.annotation.SerdeImport; -@Controller(ClusterRestRouteConfigurations.ROOT_PATH) -@Introspected(classes = PostMicrostreamBackup.Body.class) -@SerdeImport(PostMicrostreamBackup.Body.class) +@Controller(StorageNodeRestRouteConfigurations.ROOT_PATH) +@Introspected(classes = PostBackup.Body.class) +@SerdeImport(PostBackup.Body.class) public class MicronautClusterController { private final ClusterRestRequestController controller; @@ -66,103 +66,103 @@ public HttpResponse handleNodelibraryException(final HttpResponseException return response; } - @Get(value = GetMicrostreamDistributor.PATH, produces = GetMicrostreamDistributor.PRODUCES) - public boolean getMicrostreamDistributor() throws HttpResponseException + @Get(value = GetDistributor.PATH, produces = GetDistributor.PRODUCES) + public boolean getDataGridDistributor() throws HttpResponseException { - return this.controller.getMicrostreamDistributor(); + return this.controller.getDataGridDistributor(); } @Post( - value = PostMicrostreamActivateDistributorStart.PATH, - consumes = PostMicrostreamActivateDistributorStart.CONSUMES, - produces = PostMicrostreamActivateDistributorStart.PRODUCES + value = PostActivateDistributorStart.PATH, + consumes = PostActivateDistributorStart.CONSUMES, + produces = PostActivateDistributorStart.PRODUCES ) - public void postMicrostreamActivateDistributorStart() throws HttpResponseException + public void postDataGridActivateDistributorStart() throws HttpResponseException { - this.controller.postMicrostreamActivateDistributorStart(); + this.controller.postDataGridActivateDistributorStart(); } @Post( - value = PostMicrostreamActivateDistributorFinish.PATH, - consumes = PostMicrostreamActivateDistributorFinish.CONSUMES, - produces = PostMicrostreamActivateDistributorFinish.PRODUCES + value = PostActivateDistributorFinish.PATH, + consumes = PostActivateDistributorFinish.CONSUMES, + produces = PostActivateDistributorFinish.PRODUCES ) - public boolean postMicrostreamActivateDistributorFinish() throws HttpResponseException + public boolean postDataGridActivateDistributorFinish() throws HttpResponseException { - return this.controller.postMicrostreamActivateDistributorFinish(); + return this.controller.postDataGridActivateDistributorFinish(); } - @Get(value = GetMicrostreamHealth.PATH, produces = GetMicrostreamHealth.PRODUCES) - public void getMicrostreamHealth() throws HttpResponseException + 
@Get(value = GetHealth.PATH, produces = GetHealth.PRODUCES) + public void getDataGridHealth() throws HttpResponseException { - this.controller.getMicrostreamHealth(); + this.controller.getDataGridHealth(); } - @Get(value = GetMicrostreamHealthReady.PATH, produces = GetMicrostreamHealthReady.PRODUCES) + @Get(value = GetHealthReady.PATH, produces = GetHealthReady.PRODUCES) @ExecuteOn(TaskExecutors.IO) - public void getMicrostreamHealthReady() throws HttpResponseException + public void getDataGridHealthReady() throws HttpResponseException { - this.controller.getMicrostreamHealthReady(); + this.controller.getDataGridHealthReady(); } - @Get(value = GetMicrostreamStorageBytes.PATH, produces = GetMicrostreamStorageBytes.PRODUCES) + @Get(value = GetStorageBytes.PATH, produces = GetStorageBytes.PRODUCES) @ExecuteOn(TaskExecutors.IO) - public String getMicrostreamStorageBytes() throws HttpResponseException + public String getDataGridStorageBytes() throws HttpResponseException { - return this.controller.getMicrostreamStorageBytes(); + return this.controller.getDataGridStorageBytes(); } @Post( - value = PostMicrostreamBackup.PATH, - consumes = PostMicrostreamBackup.CONSUMES, - produces = PostMicrostreamBackup.PRODUCES + value = PostBackup.PATH, + consumes = PostBackup.CONSUMES, + produces = PostBackup.PRODUCES ) - public void postMicrostreamBackup(@Body final PostMicrostreamBackup.Body body) throws HttpResponseException + public void postDataGridBackup(@Body final PostBackup.Body body) throws HttpResponseException { - this.controller.postMicrostreamBackup(body); + this.controller.postDataGridBackup(body); } - @Get(value = GetMicrostreamBackup.PATH, produces = GetMicrostreamBackup.PRODUCES) - public boolean getMicrostreamBackup() throws HttpResponseException + @Get(value = GetBackup.PATH, produces = GetBackup.PRODUCES) + public boolean getDataGridBackup() throws HttpResponseException { - return this.controller.getMicrostreamBackup(); + return this.controller.getDataGridBackup(); } @Post( - value = PostMicrostreamUpdates.PATH, - consumes = PostMicrostreamUpdates.CONSUMES, - produces = PostMicrostreamUpdates.PRODUCES + value = PostUpdates.PATH, + consumes = PostUpdates.CONSUMES, + produces = PostUpdates.PRODUCES ) - public void postMicrostreamUpdates() throws HttpResponseException + public void postDataGridUpdates() throws HttpResponseException { - this.controller.postMicrostreamUpdates(); + this.controller.postDataGridUpdates(); } - @Get(value = GetMicrostreamUpdates.PATH, produces = GetMicrostreamUpdates.PRODUCES) - public boolean getMicrostreamUpdates() throws HttpResponseException + @Get(value = GetUpdates.PATH, produces = GetUpdates.PRODUCES) + public boolean getDataGridUpdates() throws HttpResponseException { - return this.controller.getMicrostreamUpdates(); + return this.controller.getDataGridUpdates(); } @Post( - value = PostMicrostreamResumeUpdates.PATH, - consumes = PostMicrostreamResumeUpdates.CONSUMES, - produces = PostMicrostreamResumeUpdates.PRODUCES + value = PostResumeUpdates.PATH, + consumes = PostResumeUpdates.CONSUMES, + produces = PostResumeUpdates.PRODUCES ) - public void postMicrostreamResumeUpdates() throws HttpResponseException + public void postDataGridResumeUpdates() throws HttpResponseException { - this.controller.postMicrostreamResumeUpdates(); + this.controller.postDataGridResumeUpdates(); } - @Post(value = PostMicrostreamGc.PATH, consumes = PostMicrostreamGc.CONSUMES, produces = PostMicrostreamGc.PRODUCES) - public void postMicrostreamGc() throws HttpResponseException + 
@Post(value = PostGc.PATH, consumes = PostGc.CONSUMES, produces = PostGc.PRODUCES) + public void postDataGridGc() throws HttpResponseException { - this.controller.postMicrostreamGc(); + this.controller.postDataGridGc(); } - @Get(value = GetMicrostreamGc.PATH, produces = GetMicrostreamGc.PRODUCES) - public boolean getMicrostreamGc() throws HttpResponseException + @Get(value = GetGc.PATH, produces = GetGc.PRODUCES) + public boolean getDataGridGc() throws HttpResponseException { - return this.controller.getMicrostreamGc(); + return this.controller.getDataGridGc(); } } diff --git a/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/exceptions/NotADistributorException.java b/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/exceptions/NotADistributorException.java index 1d8ff5e..2e98c17 100644 --- a/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/exceptions/NotADistributorException.java +++ b/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/exceptions/NotADistributorException.java @@ -22,7 +22,7 @@ public class NotADistributorException extends BadRequestException { - public static final String NAD_HEADER_KEY = "Microstream-NAD"; + public static final String NAD_HEADER_KEY = "DataGrid-NAD"; public static final String NAD_HEADER_VALUE = Boolean.TRUE.toString(); public NotADistributorException() diff --git a/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/ClusterRestRequestController.java b/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/ClusterRestRequestController.java index 0dcf741..cb491d9 100644 --- a/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/ClusterRestRequestController.java +++ b/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/ClusterRestRequestController.java @@ -17,7 +17,7 @@ import org.eclipse.datagrid.cluster.nodelibrary.exceptions.BadRequestException; import org.eclipse.datagrid.cluster.nodelibrary.exceptions.HttpResponseException; import org.eclipse.datagrid.cluster.nodelibrary.exceptions.InternalServerErrorException; -import org.eclipse.datagrid.cluster.nodelibrary.types.ClusterRestRouteConfigurations.PostMicrostreamBackup; +import org.eclipse.datagrid.cluster.nodelibrary.types.StorageNodeRestRouteConfigurations.PostBackup; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,32 +29,32 @@ public interface ClusterRestRequestController extends AutoCloseable { - boolean getMicrostreamDistributor() throws HttpResponseException; + boolean getDataGridDistributor() throws HttpResponseException; - void postMicrostreamActivateDistributorStart() throws HttpResponseException; + void postDataGridActivateDistributorStart() throws HttpResponseException; - boolean postMicrostreamActivateDistributorFinish() throws HttpResponseException; + boolean postDataGridActivateDistributorFinish() throws HttpResponseException; - void getMicrostreamHealth() throws HttpResponseException; + void getDataGridHealth() throws HttpResponseException; - void getMicrostreamHealthReady() throws HttpResponseException; + void getDataGridHealthReady() throws HttpResponseException; // TODO: Rename to get statistics or monitoring etc. 
- String getMicrostreamStorageBytes() throws HttpResponseException; + String getDataGridStorageBytes() throws HttpResponseException; - void postMicrostreamBackup(PostMicrostreamBackup.Body body) throws HttpResponseException; + void postDataGridBackup(PostBackup.Body body) throws HttpResponseException; - boolean getMicrostreamBackup() throws HttpResponseException; + boolean getDataGridBackup() throws HttpResponseException; - void postMicrostreamUpdates() throws HttpResponseException; + void postDataGridUpdates() throws HttpResponseException; - boolean getMicrostreamUpdates() throws HttpResponseException; + boolean getDataGridUpdates() throws HttpResponseException; - void postMicrostreamResumeUpdates() throws HttpResponseException; + void postDataGridResumeUpdates() throws HttpResponseException; - void postMicrostreamGc() throws HttpResponseException; + void postDataGridGc() throws HttpResponseException; - boolean getMicrostreamGc() throws HttpResponseException; + boolean getDataGridGc() throws HttpResponseException; @Override void close(); @@ -102,21 +102,21 @@ protected Abstract(final ClusterNodeManager nodeManager, final NodelibraryProper } @Override - public void postMicrostreamGc() throws HttpResponseException + public void postDataGridGc() throws HttpResponseException { - LOG.trace("Handling postMicrostreamGc request"); + LOG.trace("Handling postDataGridGc request"); this.handleRequest(this.nodeManager::startStorageChecks); } @Override - public boolean getMicrostreamGc() throws HttpResponseException + public boolean getDataGridGc() throws HttpResponseException { - LOG.trace("Handling getMicrostreamGc request"); + LOG.trace("Handling getDataGridGc request"); return this.handleRequest(this.nodeManager::isRunningStorageChecks); } @Override - public void getMicrostreamHealth() throws HttpResponseException + public void getDataGridHealth() throws HttpResponseException { this.handleRequest(() -> { @@ -128,7 +128,7 @@ public void getMicrostreamHealth() throws HttpResponseException } @Override - public void getMicrostreamHealthReady() throws HttpResponseException + public void getDataGridHealthReady() throws HttpResponseException { this.handleRequest(() -> { @@ -140,7 +140,7 @@ public void getMicrostreamHealthReady() throws HttpResponseException } @Override - public String getMicrostreamStorageBytes() throws HttpResponseException + public String getDataGridStorageBytes() throws HttpResponseException { return this.handleRequest(() -> { @@ -158,49 +158,49 @@ public String getMicrostreamStorageBytes() throws HttpResponseException } @Override - public boolean postMicrostreamActivateDistributorFinish() throws HttpResponseException + public boolean postDataGridActivateDistributorFinish() throws HttpResponseException { throw new BadRequestException(); } @Override - public boolean getMicrostreamDistributor() throws HttpResponseException + public boolean getDataGridDistributor() throws HttpResponseException { throw new BadRequestException(); } @Override - public boolean getMicrostreamUpdates() throws HttpResponseException + public boolean getDataGridUpdates() throws HttpResponseException { throw new BadRequestException(); } @Override - public void postMicrostreamResumeUpdates() throws HttpResponseException + public void postDataGridResumeUpdates() throws HttpResponseException { throw new BadRequestException(); } @Override - public void postMicrostreamActivateDistributorStart() throws HttpResponseException + public void postDataGridActivateDistributorStart() throws HttpResponseException { throw new 
BadRequestException(); } @Override - public void postMicrostreamBackup(PostMicrostreamBackup.Body body) throws HttpResponseException + public void postDataGridBackup(PostBackup.Body body) throws HttpResponseException { throw new BadRequestException(); } @Override - public boolean getMicrostreamBackup() throws HttpResponseException + public boolean getDataGridBackup() throws HttpResponseException { throw new BadRequestException(); } @Override - public void postMicrostreamUpdates() throws HttpResponseException + public void postDataGridUpdates() throws HttpResponseException { throw new BadRequestException(); } @@ -256,21 +256,21 @@ private StorageNode(final StorageNodeManager storageNodeManager, final Nodelibra } @Override - public boolean postMicrostreamActivateDistributorFinish() throws HttpResponseException + public boolean postDataGridActivateDistributorFinish() throws HttpResponseException { return this.handleRequest(this.storageNodeManager::finishDistributonSwitch); } @Override - public boolean getMicrostreamDistributor() throws HttpResponseException + public boolean getDataGridDistributor() throws HttpResponseException { return this.handleRequest(this.storageNodeManager::isDistributor); } @Override - public void postMicrostreamActivateDistributorStart() throws HttpResponseException + public void postDataGridActivateDistributorStart() throws HttpResponseException { - LOG.trace("Handling postMicrostreamActivateDistributorStart request"); + LOG.trace("Handling postDataGridActivateDistributorStart request"); this.handleRequest(() -> { if (!this.storageNodeManager.isDistributor()) @@ -299,28 +299,28 @@ private BackupNode(final BackupNodeManager backupNodeManager, final NodelibraryP } @Override - public void postMicrostreamBackup(PostMicrostreamBackup.Body body) throws HttpResponseException + public void postDataGridBackup(PostBackup.Body body) throws HttpResponseException { - LOG.trace("Handling postMicrostreamBackup request"); + LOG.trace("Handling postDataGridBackup request"); this.handleRequest(() -> this.backupNodeManager.createStorageBackup(unbox(body.getUseManualSlot()))); } @Override - public boolean getMicrostreamBackup() throws HttpResponseException + public boolean getDataGridBackup() throws HttpResponseException { return this.handleRequest(this.backupNodeManager::isBackupRunning); } @Override - public void postMicrostreamUpdates() throws HttpResponseException + public void postDataGridUpdates() throws HttpResponseException { - LOG.trace("Handling postMicrostreamUpdates request"); + LOG.trace("Handling postDataGridUpdates request"); this.handleRequest(this.backupNodeManager::stopReadingAtLatestOffset); } @Override - public boolean getMicrostreamUpdates() throws HttpResponseException + public boolean getDataGridUpdates() throws HttpResponseException { return this.handleRequest(() -> { @@ -330,9 +330,9 @@ public boolean getMicrostreamUpdates() throws HttpResponseException } @Override - public void postMicrostreamResumeUpdates() throws HttpResponseException + public void postDataGridResumeUpdates() throws HttpResponseException { - LOG.trace("Handling postMicrostreamResumeUpdates request"); + LOG.trace("Handling postDataGridResumeUpdates request"); this.handleRequest(this.backupNodeManager::resumeReading); } @@ -356,34 +356,34 @@ private MicroNode(final MicroNodeManager microNodeManager, final NodelibraryProp } @Override - public boolean postMicrostreamActivateDistributorFinish() throws HttpResponseException + public boolean postDataGridActivateDistributorFinish() throws 
HttpResponseException { // micro nodes are always the distributor return true; } @Override - public boolean getMicrostreamDistributor() throws HttpResponseException + public boolean getDataGridDistributor() throws HttpResponseException { // micro nodes are always the distributor return true; } @Override - public void postMicrostreamActivateDistributorStart() throws HttpResponseException + public void postDataGridActivateDistributorStart() throws HttpResponseException { // micro nodes are always the distributor } @Override - public void postMicrostreamBackup(PostMicrostreamBackup.Body body) throws HttpResponseException + public void postDataGridBackup(PostBackup.Body body) throws HttpResponseException { - LOG.trace("Handling postMicrostreamBackup request"); + LOG.trace("Handling postDataGridBackup request"); this.handleRequest(this.microNodeManager::createStorageBackup); } @Override - public boolean getMicrostreamBackup() throws HttpResponseException + public boolean getDataGridBackup() throws HttpResponseException { return this.handleRequest(this.microNodeManager::isBackupRunning); } @@ -406,79 +406,79 @@ private DevNode() } @Override - public boolean getMicrostreamDistributor() throws HttpResponseException + public boolean getDataGridDistributor() throws HttpResponseException { throw new BadRequestException(); } @Override - public void postMicrostreamActivateDistributorStart() throws HttpResponseException + public void postDataGridActivateDistributorStart() throws HttpResponseException { throw new BadRequestException(); } @Override - public boolean postMicrostreamActivateDistributorFinish() throws HttpResponseException + public boolean postDataGridActivateDistributorFinish() throws HttpResponseException { throw new BadRequestException(); } @Override - public void getMicrostreamHealth() throws HttpResponseException + public void getDataGridHealth() throws HttpResponseException { throw new BadRequestException(); } @Override - public void getMicrostreamHealthReady() throws HttpResponseException + public void getDataGridHealthReady() throws HttpResponseException { throw new BadRequestException(); } @Override - public String getMicrostreamStorageBytes() throws HttpResponseException + public String getDataGridStorageBytes() throws HttpResponseException { throw new BadRequestException(); } @Override - public void postMicrostreamBackup(PostMicrostreamBackup.Body body) throws HttpResponseException + public void postDataGridBackup(PostBackup.Body body) throws HttpResponseException { throw new BadRequestException(); } @Override - public boolean getMicrostreamBackup() throws HttpResponseException + public boolean getDataGridBackup() throws HttpResponseException { throw new BadRequestException(); } @Override - public void postMicrostreamUpdates() throws HttpResponseException + public void postDataGridUpdates() throws HttpResponseException { throw new BadRequestException(); } @Override - public boolean getMicrostreamUpdates() throws HttpResponseException + public boolean getDataGridUpdates() throws HttpResponseException { throw new BadRequestException(); } @Override - public void postMicrostreamResumeUpdates() throws HttpResponseException + public void postDataGridResumeUpdates() throws HttpResponseException { throw new BadRequestException(); } @Override - public void postMicrostreamGc() throws HttpResponseException + public void postDataGridGc() throws HttpResponseException { throw new BadRequestException(); } @Override - public boolean getMicrostreamGc() throws HttpResponseException + public boolean 
getDataGridGc() throws HttpResponseException { throw new BadRequestException(); } diff --git a/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/ClusterRestRouteConfigurations.java b/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/StorageNodeRestRouteConfigurations.java similarity index 72% rename from cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/ClusterRestRouteConfigurations.java rename to cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/StorageNodeRestRouteConfigurations.java index 8ff090c..58e24a7 100644 --- a/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/ClusterRestRouteConfigurations.java +++ b/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/StorageNodeRestRouteConfigurations.java @@ -14,13 +14,12 @@ * #L% */ -public final class ClusterRestRouteConfigurations +public final class StorageNodeRestRouteConfigurations { public static final class MediaTypes { private static final String WILDCARD = "*/*"; private static final String APPLICATION_JSON = "application/json"; - private static final String APPLICATION_OCTET_STREAM = "application/octet-stream"; private static final String TEXT_PLAIN = "text/plain"; private MediaTypes() @@ -30,69 +29,69 @@ private MediaTypes() public static final String ROOT_PATH = "/eclipse-datagrid-cluster-controller"; - public static final class GetMicrostreamDistributor + public static final class GetDistributor { public static final String PATH = "/eclipse-datagrid-distributor"; public static final String PRODUCES = MediaTypes.APPLICATION_JSON; - private GetMicrostreamDistributor() + private GetDistributor() { } } - public static final class PostMicrostreamActivateDistributorStart + public static final class PostActivateDistributorStart { public static final String PATH = "/eclipse-datagrid-activate-distributor/start"; public static final String CONSUMES = MediaTypes.WILDCARD; public static final String PRODUCES = MediaTypes.WILDCARD; - private PostMicrostreamActivateDistributorStart() + private PostActivateDistributorStart() { } } - public static final class PostMicrostreamActivateDistributorFinish + public static final class PostActivateDistributorFinish { public static final String PATH = "/eclipse-datagrid-activate-distributor/finish"; public static final String CONSUMES = MediaTypes.WILDCARD; public static final String PRODUCES = MediaTypes.APPLICATION_JSON; - private PostMicrostreamActivateDistributorFinish() + private PostActivateDistributorFinish() { } } - public static final class GetMicrostreamHealth + public static final class GetHealth { public static final String PATH = "/eclipse-datagrid-health"; public static final String PRODUCES = MediaTypes.WILDCARD; - private GetMicrostreamHealth() + private GetHealth() { } } - public static final class GetMicrostreamHealthReady + public static final class GetHealthReady { public static final String PATH = "/eclipse-datagrid-health/ready"; public static final String PRODUCES = MediaTypes.WILDCARD; - private GetMicrostreamHealthReady() + private GetHealthReady() { } } - public static final class GetMicrostreamStorageBytes + public static final class GetStorageBytes { public static final String PATH = "/eclipse-datagrid-storage-bytes"; public static final String PRODUCES = MediaTypes.TEXT_PLAIN; - private GetMicrostreamStorageBytes() + private 
GetStorageBytes() { } } - public static final class PostMicrostreamBackup + public static final class PostBackup { public static final String PATH = "/eclipse-datagrid-backup"; public static final String CONSUMES = MediaTypes.APPLICATION_JSON; @@ -113,76 +112,76 @@ public void setUseManualSlot(final Boolean useManualSlot) } } - private PostMicrostreamBackup() + private PostBackup() { } } - public static final class GetMicrostreamBackup + public static final class GetBackup { public static final String PATH = "/eclipse-datagrid-backup"; public static final String PRODUCES = MediaTypes.APPLICATION_JSON; - private GetMicrostreamBackup() + private GetBackup() { } } - public static final class PostMicrostreamUpdates + public static final class PostUpdates { public static final String PATH = "/eclipse-datagrid-updates"; public static final String CONSUMES = MediaTypes.WILDCARD; public static final String PRODUCES = MediaTypes.WILDCARD; - private PostMicrostreamUpdates() + private PostUpdates() { } } - public static final class GetMicrostreamUpdates + public static final class GetUpdates { public static final String PATH = "/eclipse-datagrid-updates"; public static final String PRODUCES = MediaTypes.APPLICATION_JSON; - private GetMicrostreamUpdates() + private GetUpdates() { } } - public static final class PostMicrostreamResumeUpdates + public static final class PostResumeUpdates { public static final String PATH = "/eclipse-datagrid-resume-updates"; public static final String CONSUMES = MediaTypes.WILDCARD; public static final String PRODUCES = MediaTypes.WILDCARD; - private PostMicrostreamResumeUpdates() + private PostResumeUpdates() { } } - public static final class PostMicrostreamGc + public static final class PostGc { public static final String PATH = "/eclipse-datagrid-gc"; public static final String CONSUMES = MediaTypes.WILDCARD; public static final String PRODUCES = MediaTypes.WILDCARD; - private PostMicrostreamGc() + private PostGc() { } } - public static final class GetMicrostreamGc + public static final class GetGc { public static final String PATH = "/eclipse-datagrid-gc"; public static final String PRODUCES = MediaTypes.APPLICATION_JSON; - private GetMicrostreamGc() + private GetGc() { } } - private ClusterRestRouteConfigurations() + private StorageNodeRestRouteConfigurations() { } } diff --git a/cluster/nodelibrary/springboot/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/springboot/SpringBootClusterController.java b/cluster/nodelibrary/springboot/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/springboot/SpringBootClusterController.java index 1ea17f1..93a4fa8 100644 --- a/cluster/nodelibrary/springboot/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/springboot/SpringBootClusterController.java +++ b/cluster/nodelibrary/springboot/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/springboot/SpringBootClusterController.java @@ -16,8 +16,8 @@ import org.eclipse.datagrid.cluster.nodelibrary.exceptions.HttpResponseException; import org.eclipse.datagrid.cluster.nodelibrary.types.ClusterRestRequestController; -import org.eclipse.datagrid.cluster.nodelibrary.types.ClusterRestRouteConfigurations; -import org.eclipse.datagrid.cluster.nodelibrary.types.ClusterRestRouteConfigurations.*; +import org.eclipse.datagrid.cluster.nodelibrary.types.StorageNodeRestRouteConfigurations; +import org.eclipse.datagrid.cluster.nodelibrary.types.StorageNodeRestRouteConfigurations.*; import org.eclipse.serializer.util.X; import org.springframework.http.HttpStatus; import 
org.springframework.scheduling.annotation.Async; @@ -28,7 +28,7 @@ import java.util.function.Supplier; @RestController -@RequestMapping(ClusterRestRouteConfigurations.ROOT_PATH) +@RequestMapping(StorageNodeRestRouteConfigurations.ROOT_PATH) public class SpringBootClusterController { private final ClusterRestRequestController controller; @@ -38,109 +38,109 @@ public SpringBootClusterController(final ClusterRestRequestController controller this.controller = controller; } - @GetMapping(value = GetMicrostreamDistributor.PATH, produces = GetMicrostreamDistributor.PRODUCES) - public boolean getMicrostreamDistributor() throws HttpResponseException + @GetMapping(value = GetDistributor.PATH, produces = GetDistributor.PRODUCES) + public boolean getDataGridDistributor() throws HttpResponseException { - return this.call(this.controller::getMicrostreamDistributor); + return this.call(this.controller::getDataGridDistributor); } @PostMapping( - value = PostMicrostreamActivateDistributorStart.PATH, - consumes = PostMicrostreamActivateDistributorStart.CONSUMES, - produces = PostMicrostreamActivateDistributorStart.PRODUCES + value = PostActivateDistributorStart.PATH, + consumes = PostActivateDistributorStart.CONSUMES, + produces = PostActivateDistributorStart.PRODUCES ) - public void postMicrostreamActivateDistributorStart() throws HttpResponseException + public void postDataGridActivateDistributorStart() throws HttpResponseException { - this.call(this.controller::postMicrostreamActivateDistributorStart); + this.call(this.controller::postDataGridActivateDistributorStart); } @PostMapping( - value = PostMicrostreamActivateDistributorFinish.PATH, - consumes = PostMicrostreamActivateDistributorFinish.CONSUMES, - produces = PostMicrostreamActivateDistributorFinish.PRODUCES + value = PostActivateDistributorFinish.PATH, + consumes = PostActivateDistributorFinish.CONSUMES, + produces = PostActivateDistributorFinish.PRODUCES ) - public boolean postMicrostreamActivateDistributorFinish() throws HttpResponseException + public boolean postDataGridActivateDistributorFinish() throws HttpResponseException { - return this.call(this.controller::postMicrostreamActivateDistributorFinish); + return this.call(this.controller::postDataGridActivateDistributorFinish); } - @GetMapping(value = GetMicrostreamHealth.PATH, produces = GetMicrostreamHealth.PRODUCES) - public void getMicrostreamHealth() throws HttpResponseException + @GetMapping(value = GetHealth.PATH, produces = GetHealth.PRODUCES) + public void getDataGridHealth() throws HttpResponseException { - this.call(this.controller::getMicrostreamHealth); + this.call(this.controller::getDataGridHealth); } - @GetMapping(value = GetMicrostreamHealthReady.PATH, produces = GetMicrostreamHealthReady.PRODUCES) + @GetMapping(value = GetHealthReady.PATH, produces = GetHealthReady.PRODUCES) @Async - public CompletableFuture getMicrostreamHealthReady() throws HttpResponseException + public CompletableFuture getDataGridHealthReady() throws HttpResponseException { - this.call(this.controller::getMicrostreamHealthReady); + this.call(this.controller::getDataGridHealthReady); return CompletableFuture.completedFuture(null); } - @GetMapping(value = GetMicrostreamStorageBytes.PATH, produces = GetMicrostreamStorageBytes.PRODUCES) + @GetMapping(value = GetStorageBytes.PATH, produces = GetStorageBytes.PRODUCES) @Async - public CompletableFuture getMicrostreamStorageBytes() throws HttpResponseException + public CompletableFuture getDataGridStorageBytes() throws HttpResponseException { - return 
CompletableFuture.completedFuture(this.call(this.controller::getMicrostreamStorageBytes)); + return CompletableFuture.completedFuture(this.call(this.controller::getDataGridStorageBytes)); } @PostMapping( - value = PostMicrostreamBackup.PATH, - consumes = PostMicrostreamBackup.CONSUMES, - produces = PostMicrostreamBackup.PRODUCES + value = PostBackup.PATH, + consumes = PostBackup.CONSUMES, + produces = PostBackup.PRODUCES ) - public void postMicrostreamBackup(@RequestBody final PostMicrostreamBackup.Body body) throws HttpResponseException + public void postDataGridBackup(@RequestBody final PostBackup.Body body) throws HttpResponseException { - this.call(() -> this.controller.postMicrostreamBackup(body)); + this.call(() -> this.controller.postDataGridBackup(body)); } @PostMapping( - value = PostMicrostreamUpdates.PATH, - consumes = PostMicrostreamUpdates.CONSUMES, - produces = PostMicrostreamUpdates.PRODUCES + value = PostUpdates.PATH, + consumes = PostUpdates.CONSUMES, + produces = PostUpdates.PRODUCES ) - public void postMicrostreamUpdates() throws HttpResponseException + public void postDataGridUpdates() throws HttpResponseException { - this.call(this.controller::postMicrostreamUpdates); + this.call(this.controller::postDataGridUpdates); } - @GetMapping(value = GetMicrostreamBackup.PATH, produces = GetMicrostreamBackup.PRODUCES) - public boolean getMicrostreamBackup() throws HttpResponseException + @GetMapping(value = GetBackup.PATH, produces = GetBackup.PRODUCES) + public boolean getDataGridBackup() throws HttpResponseException { - return this.call(this.controller::getMicrostreamBackup); + return this.call(this.controller::getDataGridBackup); } - @GetMapping(value = GetMicrostreamUpdates.PATH, produces = GetMicrostreamUpdates.PRODUCES) - public boolean getMicrostreamUpdates() throws HttpResponseException + @GetMapping(value = GetUpdates.PATH, produces = GetUpdates.PRODUCES) + public boolean getDataGridUpdates() throws HttpResponseException { - return this.call(this.controller::getMicrostreamUpdates); + return this.call(this.controller::getDataGridUpdates); } @PostMapping( - value = PostMicrostreamResumeUpdates.PATH, - consumes = PostMicrostreamResumeUpdates.CONSUMES, - produces = PostMicrostreamResumeUpdates.PRODUCES + value = PostResumeUpdates.PATH, + consumes = PostResumeUpdates.CONSUMES, + produces = PostResumeUpdates.PRODUCES ) - public void postMicrostreamResumeUpdates() throws HttpResponseException + public void postDataGridResumeUpdates() throws HttpResponseException { - this.call(this.controller::postMicrostreamResumeUpdates); + this.call(this.controller::postDataGridResumeUpdates); } @PostMapping( - value = PostMicrostreamGc.PATH, - consumes = PostMicrostreamGc.CONSUMES, - produces = PostMicrostreamGc.PRODUCES + value = PostGc.PATH, + consumes = PostGc.CONSUMES, + produces = PostGc.PRODUCES ) - public void postMicrostreamGc() throws HttpResponseException + public void postDataGridGc() throws HttpResponseException { - this.call(this.controller::postMicrostreamGc); + this.call(this.controller::postDataGridGc); } - @GetMapping(value = GetMicrostreamGc.PATH, produces = GetMicrostreamGc.PRODUCES) - public boolean getMicrostreamGc() throws HttpResponseException + @GetMapping(value = GetGc.PATH, produces = GetGc.PRODUCES) + public boolean getDataGridGc() throws HttpResponseException { - return this.call(this.controller::getMicrostreamGc); + return this.call(this.controller::getDataGridGc); } private T call(final Supplier s) From d57e687fbad182963049859a1e2210a65eb908ef Mon Sep 17 
00:00:00 2001 From: =?UTF-8?q?Manuel=20St=C3=B6r?= Date: Thu, 27 Nov 2025 15:18:14 +0100 Subject: [PATCH 3/7] Simplify endpoint paths --- .../StorageNodeRestRouteConfigurations.java | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/StorageNodeRestRouteConfigurations.java b/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/StorageNodeRestRouteConfigurations.java index 58e24a7..4a8f108 100644 --- a/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/StorageNodeRestRouteConfigurations.java +++ b/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/StorageNodeRestRouteConfigurations.java @@ -27,11 +27,11 @@ private MediaTypes() } } - public static final String ROOT_PATH = "/eclipse-datagrid-cluster-controller"; + public static final String ROOT_PATH = "/eclipse-datagrid"; public static final class GetDistributor { - public static final String PATH = "/eclipse-datagrid-distributor"; + public static final String PATH = "/distributor"; public static final String PRODUCES = MediaTypes.APPLICATION_JSON; private GetDistributor() @@ -41,7 +41,7 @@ private GetDistributor() public static final class PostActivateDistributorStart { - public static final String PATH = "/eclipse-datagrid-activate-distributor/start"; + public static final String PATH = "/activate-distributor/start"; public static final String CONSUMES = MediaTypes.WILDCARD; public static final String PRODUCES = MediaTypes.WILDCARD; @@ -52,7 +52,7 @@ private PostActivateDistributorStart() public static final class PostActivateDistributorFinish { - public static final String PATH = "/eclipse-datagrid-activate-distributor/finish"; + public static final String PATH = "/activate-distributor/finish"; public static final String CONSUMES = MediaTypes.WILDCARD; public static final String PRODUCES = MediaTypes.APPLICATION_JSON; @@ -63,7 +63,7 @@ private PostActivateDistributorFinish() public static final class GetHealth { - public static final String PATH = "/eclipse-datagrid-health"; + public static final String PATH = "/health"; public static final String PRODUCES = MediaTypes.WILDCARD; private GetHealth() @@ -73,7 +73,7 @@ private GetHealth() public static final class GetHealthReady { - public static final String PATH = "/eclipse-datagrid-health/ready"; + public static final String PATH = "/health/ready"; public static final String PRODUCES = MediaTypes.WILDCARD; private GetHealthReady() @@ -83,7 +83,7 @@ private GetHealthReady() public static final class GetStorageBytes { - public static final String PATH = "/eclipse-datagrid-storage-bytes"; + public static final String PATH = "/storage-bytes"; public static final String PRODUCES = MediaTypes.TEXT_PLAIN; private GetStorageBytes() @@ -93,7 +93,7 @@ private GetStorageBytes() public static final class PostBackup { - public static final String PATH = "/eclipse-datagrid-backup"; + public static final String PATH = "/backup"; public static final String CONSUMES = MediaTypes.APPLICATION_JSON; public static final String PRODUCES = MediaTypes.APPLICATION_JSON; @@ -119,7 +119,7 @@ private PostBackup() public static final class GetBackup { - public static final String PATH = "/eclipse-datagrid-backup"; + public static final String PATH = "/backup"; public static final String PRODUCES = MediaTypes.APPLICATION_JSON; @@ -130,7 +130,7 @@ private GetBackup() 
public static final class PostUpdates { - public static final String PATH = "/eclipse-datagrid-updates"; + public static final String PATH = "/updates"; public static final String CONSUMES = MediaTypes.WILDCARD; public static final String PRODUCES = MediaTypes.WILDCARD; @@ -141,7 +141,7 @@ private PostUpdates() public static final class GetUpdates { - public static final String PATH = "/eclipse-datagrid-updates"; + public static final String PATH = "/updates"; public static final String PRODUCES = MediaTypes.APPLICATION_JSON; private GetUpdates() @@ -151,7 +151,7 @@ private GetUpdates() public static final class PostResumeUpdates { - public static final String PATH = "/eclipse-datagrid-resume-updates"; + public static final String PATH = "/resume-updates"; public static final String CONSUMES = MediaTypes.WILDCARD; public static final String PRODUCES = MediaTypes.WILDCARD; @@ -162,7 +162,7 @@ private PostResumeUpdates() public static final class PostGc { - public static final String PATH = "/eclipse-datagrid-gc"; + public static final String PATH = "/gc"; public static final String CONSUMES = MediaTypes.WILDCARD; public static final String PRODUCES = MediaTypes.WILDCARD; @@ -173,7 +173,7 @@ private PostGc() public static final class GetGc { - public static final String PATH = "/eclipse-datagrid-gc"; + public static final String PATH = "/gc"; public static final String PRODUCES = MediaTypes.APPLICATION_JSON; private GetGc() From 158e1c74496fad088f0b9abcdaaba5bd236271eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20St=C3=B6r?= Date: Thu, 27 Nov 2025 15:22:52 +0100 Subject: [PATCH 4/7] Also simplify rest controller method names --- .../helidon/HelidonClusterController.java | 52 ++++----- .../types/MicronautClusterController.java | 52 ++++----- .../types/ClusterRestRequestController.java | 104 +++++++++--------- .../SpringBootClusterController.java | 52 ++++----- 4 files changed, 130 insertions(+), 130 deletions(-) diff --git a/cluster/nodelibrary/helidon/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/helidon/HelidonClusterController.java b/cluster/nodelibrary/helidon/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/helidon/HelidonClusterController.java index 52a0737..45c585c 100644 --- a/cluster/nodelibrary/helidon/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/helidon/HelidonClusterController.java +++ b/cluster/nodelibrary/helidon/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/helidon/HelidonClusterController.java @@ -38,110 +38,110 @@ public HelidonClusterController(final ClusterRestRequestController requestContro @GET @Path(GetDistributor.PATH) @Produces(GetDistributor.PRODUCES) - public boolean getDataGridDistributor() throws HttpResponseException + public boolean getDistributor() throws HttpResponseException { - return this.requestController.getDataGridDistributor(); + return this.requestController.getDistributor(); } @POST @Path(PostActivateDistributorStart.PATH) @Consumes(PostActivateDistributorStart.CONSUMES) @Produces(PostActivateDistributorStart.PRODUCES) - public void postDataGridActivateDistributorStart() throws HttpResponseException + public void postActivateDistributorStart() throws HttpResponseException { - this.requestController.postDataGridActivateDistributorStart(); + this.requestController.postActivateDistributorStart(); } @POST @Path(PostActivateDistributorFinish.PATH) @Consumes(PostActivateDistributorFinish.CONSUMES) @Produces(PostActivateDistributorFinish.PRODUCES) - public boolean postDataGridActivateDistributorFinish() throws 
HttpResponseException + public boolean postActivateDistributorFinish() throws HttpResponseException { - return this.requestController.postDataGridActivateDistributorFinish(); + return this.requestController.postActivateDistributorFinish(); } @GET @Path(GetHealth.PATH) @Produces(GetHealth.PRODUCES) - public void getDataGridHealth() throws HttpResponseException + public void getHealth() throws HttpResponseException { - this.requestController.getDataGridHealth(); + this.requestController.getHealth(); } @GET @Path(GetHealthReady.PATH) @Produces(GetHealthReady.PRODUCES) - public void getDataGridHealthReady() throws HttpResponseException + public void getHealthReady() throws HttpResponseException { - this.requestController.getDataGridHealthReady(); + this.requestController.getHealthReady(); } @GET @Path(GetStorageBytes.PATH) @Produces(GetStorageBytes.PRODUCES) - public String getDataGridStorageBytes() throws HttpResponseException + public String getStorageBytes() throws HttpResponseException { - return this.requestController.getDataGridStorageBytes(); + return this.requestController.getStorageBytes(); } @POST @Path(PostBackup.PATH) @Consumes(PostBackup.CONSUMES) @Produces(PostBackup.PRODUCES) - public void postDataGridBackup(@RequestBody final PostBackup.Body body) throws HttpResponseException + public void postBackup(@RequestBody final PostBackup.Body body) throws HttpResponseException { - this.requestController.postDataGridBackup(body); + this.requestController.postBackup(body); } @GET @Path(GetBackup.PATH) @Produces(GetBackup.PRODUCES) - public boolean getDataGridBackup() throws HttpResponseException + public boolean getBackup() throws HttpResponseException { - return this.requestController.getDataGridBackup(); + return this.requestController.getBackup(); } @POST @Path(PostUpdates.PATH) @Consumes(PostUpdates.CONSUMES) @Produces(PostUpdates.PRODUCES) - public void postDataGridUpdates() throws HttpResponseException + public void postUpdates() throws HttpResponseException { - this.requestController.postDataGridUpdates(); + this.requestController.postUpdates(); } @GET @Path(GetUpdates.PATH) @Produces(GetUpdates.PRODUCES) - public boolean getDataGridUpdates() throws HttpResponseException + public boolean getUpdates() throws HttpResponseException { - return this.requestController.getDataGridUpdates(); + return this.requestController.getUpdates(); } @POST @Path(PostResumeUpdates.PATH) @Consumes(PostResumeUpdates.CONSUMES) @Produces(PostResumeUpdates.PRODUCES) - public void postDataGridResumeUpdates() throws HttpResponseException + public void postResumeUpdates() throws HttpResponseException { - this.requestController.postDataGridResumeUpdates(); + this.requestController.postResumeUpdates(); } @POST @Path(PostGc.PATH) @Consumes(PostGc.CONSUMES) @Produces(PostGc.PRODUCES) - public void postDataGridGc() throws HttpResponseException + public void postGc() throws HttpResponseException { - this.requestController.postDataGridGc(); + this.requestController.postGc(); } @GET @Path(GetGc.PATH) @Produces(GetGc.PRODUCES) - public boolean getDataGridGc() throws HttpResponseException + public boolean getGc() throws HttpResponseException { - return this.requestController.getDataGridGc(); + return this.requestController.getGc(); } } diff --git a/cluster/nodelibrary/micronaut/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/micronaut/types/MicronautClusterController.java b/cluster/nodelibrary/micronaut/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/micronaut/types/MicronautClusterController.java index 
036598c..c74b540 100644 --- a/cluster/nodelibrary/micronaut/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/micronaut/types/MicronautClusterController.java +++ b/cluster/nodelibrary/micronaut/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/micronaut/types/MicronautClusterController.java @@ -67,9 +67,9 @@ public HttpResponse handleNodelibraryException(final HttpResponseException } @Get(value = GetDistributor.PATH, produces = GetDistributor.PRODUCES) - public boolean getDataGridDistributor() throws HttpResponseException + public boolean getDistributor() throws HttpResponseException { - return this.controller.getDataGridDistributor(); + return this.controller.getDistributor(); } @Post( @@ -77,9 +77,9 @@ public boolean getDataGridDistributor() throws HttpResponseException consumes = PostActivateDistributorStart.CONSUMES, produces = PostActivateDistributorStart.PRODUCES ) - public void postDataGridActivateDistributorStart() throws HttpResponseException + public void postActivateDistributorStart() throws HttpResponseException { - this.controller.postDataGridActivateDistributorStart(); + this.controller.postActivateDistributorStart(); } @Post( @@ -87,29 +87,29 @@ public void postDataGridActivateDistributorStart() throws HttpResponseException consumes = PostActivateDistributorFinish.CONSUMES, produces = PostActivateDistributorFinish.PRODUCES ) - public boolean postDataGridActivateDistributorFinish() throws HttpResponseException + public boolean postActivateDistributorFinish() throws HttpResponseException { - return this.controller.postDataGridActivateDistributorFinish(); + return this.controller.postActivateDistributorFinish(); } @Get(value = GetHealth.PATH, produces = GetHealth.PRODUCES) - public void getDataGridHealth() throws HttpResponseException + public void getHealth() throws HttpResponseException { - this.controller.getDataGridHealth(); + this.controller.getHealth(); } @Get(value = GetHealthReady.PATH, produces = GetHealthReady.PRODUCES) @ExecuteOn(TaskExecutors.IO) - public void getDataGridHealthReady() throws HttpResponseException + public void getHealthReady() throws HttpResponseException { - this.controller.getDataGridHealthReady(); + this.controller.getHealthReady(); } @Get(value = GetStorageBytes.PATH, produces = GetStorageBytes.PRODUCES) @ExecuteOn(TaskExecutors.IO) - public String getDataGridStorageBytes() throws HttpResponseException + public String getStorageBytes() throws HttpResponseException { - return this.controller.getDataGridStorageBytes(); + return this.controller.getStorageBytes(); } @Post( @@ -117,15 +117,15 @@ public String getDataGridStorageBytes() throws HttpResponseException consumes = PostBackup.CONSUMES, produces = PostBackup.PRODUCES ) - public void postDataGridBackup(@Body final PostBackup.Body body) throws HttpResponseException + public void postBackup(@Body final PostBackup.Body body) throws HttpResponseException { - this.controller.postDataGridBackup(body); + this.controller.postBackup(body); } @Get(value = GetBackup.PATH, produces = GetBackup.PRODUCES) - public boolean getDataGridBackup() throws HttpResponseException + public boolean getBackup() throws HttpResponseException { - return this.controller.getDataGridBackup(); + return this.controller.getBackup(); } @Post( @@ -133,15 +133,15 @@ public boolean getDataGridBackup() throws HttpResponseException consumes = PostUpdates.CONSUMES, produces = PostUpdates.PRODUCES ) - public void postDataGridUpdates() throws HttpResponseException + public void postUpdates() throws HttpResponseException { - 
this.controller.postDataGridUpdates(); + this.controller.postUpdates(); } @Get(value = GetUpdates.PATH, produces = GetUpdates.PRODUCES) - public boolean getDataGridUpdates() throws HttpResponseException + public boolean getUpdates() throws HttpResponseException { - return this.controller.getDataGridUpdates(); + return this.controller.getUpdates(); } @Post( @@ -149,20 +149,20 @@ public boolean getDataGridUpdates() throws HttpResponseException consumes = PostResumeUpdates.CONSUMES, produces = PostResumeUpdates.PRODUCES ) - public void postDataGridResumeUpdates() throws HttpResponseException + public void postResumeUpdates() throws HttpResponseException { - this.controller.postDataGridResumeUpdates(); + this.controller.postResumeUpdates(); } @Post(value = PostGc.PATH, consumes = PostGc.CONSUMES, produces = PostGc.PRODUCES) - public void postDataGridGc() throws HttpResponseException + public void postGc() throws HttpResponseException { - this.controller.postDataGridGc(); + this.controller.postGc(); } @Get(value = GetGc.PATH, produces = GetGc.PRODUCES) - public boolean getDataGridGc() throws HttpResponseException + public boolean getGc() throws HttpResponseException { - return this.controller.getDataGridGc(); + return this.controller.getGc(); } } diff --git a/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/ClusterRestRequestController.java b/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/ClusterRestRequestController.java index cb491d9..ad33da0 100644 --- a/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/ClusterRestRequestController.java +++ b/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/ClusterRestRequestController.java @@ -29,32 +29,32 @@ public interface ClusterRestRequestController extends AutoCloseable { - boolean getDataGridDistributor() throws HttpResponseException; + boolean getDistributor() throws HttpResponseException; - void postDataGridActivateDistributorStart() throws HttpResponseException; + void postActivateDistributorStart() throws HttpResponseException; - boolean postDataGridActivateDistributorFinish() throws HttpResponseException; + boolean postActivateDistributorFinish() throws HttpResponseException; - void getDataGridHealth() throws HttpResponseException; + void getHealth() throws HttpResponseException; - void getDataGridHealthReady() throws HttpResponseException; + void getHealthReady() throws HttpResponseException; // TODO: Rename to get statistics or monitoring etc. 
- String getDataGridStorageBytes() throws HttpResponseException; + String getStorageBytes() throws HttpResponseException; - void postDataGridBackup(PostBackup.Body body) throws HttpResponseException; + void postBackup(PostBackup.Body body) throws HttpResponseException; - boolean getDataGridBackup() throws HttpResponseException; + boolean getBackup() throws HttpResponseException; - void postDataGridUpdates() throws HttpResponseException; + void postUpdates() throws HttpResponseException; - boolean getDataGridUpdates() throws HttpResponseException; + boolean getUpdates() throws HttpResponseException; - void postDataGridResumeUpdates() throws HttpResponseException; + void postResumeUpdates() throws HttpResponseException; - void postDataGridGc() throws HttpResponseException; + void postGc() throws HttpResponseException; - boolean getDataGridGc() throws HttpResponseException; + boolean getGc() throws HttpResponseException; @Override void close(); @@ -102,21 +102,21 @@ protected Abstract(final ClusterNodeManager nodeManager, final NodelibraryProper } @Override - public void postDataGridGc() throws HttpResponseException + public void postGc() throws HttpResponseException { LOG.trace("Handling postDataGridGc request"); this.handleRequest(this.nodeManager::startStorageChecks); } @Override - public boolean getDataGridGc() throws HttpResponseException + public boolean getGc() throws HttpResponseException { LOG.trace("Handling getDataGridGc request"); return this.handleRequest(this.nodeManager::isRunningStorageChecks); } @Override - public void getDataGridHealth() throws HttpResponseException + public void getHealth() throws HttpResponseException { this.handleRequest(() -> { @@ -128,7 +128,7 @@ public void getDataGridHealth() throws HttpResponseException } @Override - public void getDataGridHealthReady() throws HttpResponseException + public void getHealthReady() throws HttpResponseException { this.handleRequest(() -> { @@ -140,7 +140,7 @@ public void getDataGridHealthReady() throws HttpResponseException } @Override - public String getDataGridStorageBytes() throws HttpResponseException + public String getStorageBytes() throws HttpResponseException { return this.handleRequest(() -> { @@ -158,49 +158,49 @@ public String getDataGridStorageBytes() throws HttpResponseException } @Override - public boolean postDataGridActivateDistributorFinish() throws HttpResponseException + public boolean postActivateDistributorFinish() throws HttpResponseException { throw new BadRequestException(); } @Override - public boolean getDataGridDistributor() throws HttpResponseException + public boolean getDistributor() throws HttpResponseException { throw new BadRequestException(); } @Override - public boolean getDataGridUpdates() throws HttpResponseException + public boolean getUpdates() throws HttpResponseException { throw new BadRequestException(); } @Override - public void postDataGridResumeUpdates() throws HttpResponseException + public void postResumeUpdates() throws HttpResponseException { throw new BadRequestException(); } @Override - public void postDataGridActivateDistributorStart() throws HttpResponseException + public void postActivateDistributorStart() throws HttpResponseException { throw new BadRequestException(); } @Override - public void postDataGridBackup(PostBackup.Body body) throws HttpResponseException + public void postBackup(PostBackup.Body body) throws HttpResponseException { throw new BadRequestException(); } @Override - public boolean getDataGridBackup() throws HttpResponseException + public boolean 
getBackup() throws HttpResponseException { throw new BadRequestException(); } @Override - public void postDataGridUpdates() throws HttpResponseException + public void postUpdates() throws HttpResponseException { throw new BadRequestException(); } @@ -256,19 +256,19 @@ private StorageNode(final StorageNodeManager storageNodeManager, final Nodelibra } @Override - public boolean postDataGridActivateDistributorFinish() throws HttpResponseException + public boolean postActivateDistributorFinish() throws HttpResponseException { return this.handleRequest(this.storageNodeManager::finishDistributonSwitch); } @Override - public boolean getDataGridDistributor() throws HttpResponseException + public boolean getDistributor() throws HttpResponseException { return this.handleRequest(this.storageNodeManager::isDistributor); } @Override - public void postDataGridActivateDistributorStart() throws HttpResponseException + public void postActivateDistributorStart() throws HttpResponseException { LOG.trace("Handling postDataGridActivateDistributorStart request"); this.handleRequest(() -> @@ -299,28 +299,28 @@ private BackupNode(final BackupNodeManager backupNodeManager, final NodelibraryP } @Override - public void postDataGridBackup(PostBackup.Body body) throws HttpResponseException + public void postBackup(PostBackup.Body body) throws HttpResponseException { LOG.trace("Handling postDataGridBackup request"); this.handleRequest(() -> this.backupNodeManager.createStorageBackup(unbox(body.getUseManualSlot()))); } @Override - public boolean getDataGridBackup() throws HttpResponseException + public boolean getBackup() throws HttpResponseException { return this.handleRequest(this.backupNodeManager::isBackupRunning); } @Override - public void postDataGridUpdates() throws HttpResponseException + public void postUpdates() throws HttpResponseException { LOG.trace("Handling postDataGridUpdates request"); this.handleRequest(this.backupNodeManager::stopReadingAtLatestOffset); } @Override - public boolean getDataGridUpdates() throws HttpResponseException + public boolean getUpdates() throws HttpResponseException { return this.handleRequest(() -> { @@ -330,7 +330,7 @@ public boolean getDataGridUpdates() throws HttpResponseException } @Override - public void postDataGridResumeUpdates() throws HttpResponseException + public void postResumeUpdates() throws HttpResponseException { LOG.trace("Handling postDataGridResumeUpdates request"); this.handleRequest(this.backupNodeManager::resumeReading); @@ -356,34 +356,34 @@ private MicroNode(final MicroNodeManager microNodeManager, final NodelibraryProp } @Override - public boolean postDataGridActivateDistributorFinish() throws HttpResponseException + public boolean postActivateDistributorFinish() throws HttpResponseException { // micro nodes are always the distributor return true; } @Override - public boolean getDataGridDistributor() throws HttpResponseException + public boolean getDistributor() throws HttpResponseException { // micro nodes are always the distributor return true; } @Override - public void postDataGridActivateDistributorStart() throws HttpResponseException + public void postActivateDistributorStart() throws HttpResponseException { // micro nodes are always the distributor } @Override - public void postDataGridBackup(PostBackup.Body body) throws HttpResponseException + public void postBackup(PostBackup.Body body) throws HttpResponseException { LOG.trace("Handling postDataGridBackup request"); this.handleRequest(this.microNodeManager::createStorageBackup); } @Override - 
public boolean getDataGridBackup() throws HttpResponseException + public boolean getBackup() throws HttpResponseException { return this.handleRequest(this.microNodeManager::isBackupRunning); } @@ -406,79 +406,79 @@ private DevNode() } @Override - public boolean getDataGridDistributor() throws HttpResponseException + public boolean getDistributor() throws HttpResponseException { throw new BadRequestException(); } @Override - public void postDataGridActivateDistributorStart() throws HttpResponseException + public void postActivateDistributorStart() throws HttpResponseException { throw new BadRequestException(); } @Override - public boolean postDataGridActivateDistributorFinish() throws HttpResponseException + public boolean postActivateDistributorFinish() throws HttpResponseException { throw new BadRequestException(); } @Override - public void getDataGridHealth() throws HttpResponseException + public void getHealth() throws HttpResponseException { throw new BadRequestException(); } @Override - public void getDataGridHealthReady() throws HttpResponseException + public void getHealthReady() throws HttpResponseException { throw new BadRequestException(); } @Override - public String getDataGridStorageBytes() throws HttpResponseException + public String getStorageBytes() throws HttpResponseException { throw new BadRequestException(); } @Override - public void postDataGridBackup(PostBackup.Body body) throws HttpResponseException + public void postBackup(PostBackup.Body body) throws HttpResponseException { throw new BadRequestException(); } @Override - public boolean getDataGridBackup() throws HttpResponseException + public boolean getBackup() throws HttpResponseException { throw new BadRequestException(); } @Override - public void postDataGridUpdates() throws HttpResponseException + public void postUpdates() throws HttpResponseException { throw new BadRequestException(); } @Override - public boolean getDataGridUpdates() throws HttpResponseException + public boolean getUpdates() throws HttpResponseException { throw new BadRequestException(); } @Override - public void postDataGridResumeUpdates() throws HttpResponseException + public void postResumeUpdates() throws HttpResponseException { throw new BadRequestException(); } @Override - public void postDataGridGc() throws HttpResponseException + public void postGc() throws HttpResponseException { throw new BadRequestException(); } @Override - public boolean getDataGridGc() throws HttpResponseException + public boolean getGc() throws HttpResponseException { throw new BadRequestException(); } diff --git a/cluster/nodelibrary/springboot/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/springboot/SpringBootClusterController.java b/cluster/nodelibrary/springboot/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/springboot/SpringBootClusterController.java index 93a4fa8..27a0146 100644 --- a/cluster/nodelibrary/springboot/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/springboot/SpringBootClusterController.java +++ b/cluster/nodelibrary/springboot/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/springboot/SpringBootClusterController.java @@ -39,9 +39,9 @@ public SpringBootClusterController(final ClusterRestRequestController controller } @GetMapping(value = GetDistributor.PATH, produces = GetDistributor.PRODUCES) - public boolean getDataGridDistributor() throws HttpResponseException + public boolean getDistributor() throws HttpResponseException { - return this.call(this.controller::getDataGridDistributor); + return 
this.call(this.controller::getDistributor); } @PostMapping( @@ -49,9 +49,9 @@ public boolean getDataGridDistributor() throws HttpResponseException consumes = PostActivateDistributorStart.CONSUMES, produces = PostActivateDistributorStart.PRODUCES ) - public void postDataGridActivateDistributorStart() throws HttpResponseException + public void postActivateDistributorStart() throws HttpResponseException { - this.call(this.controller::postDataGridActivateDistributorStart); + this.call(this.controller::postActivateDistributorStart); } @PostMapping( @@ -59,30 +59,30 @@ public void postDataGridActivateDistributorStart() throws HttpResponseException consumes = PostActivateDistributorFinish.CONSUMES, produces = PostActivateDistributorFinish.PRODUCES ) - public boolean postDataGridActivateDistributorFinish() throws HttpResponseException + public boolean postActivateDistributorFinish() throws HttpResponseException { - return this.call(this.controller::postDataGridActivateDistributorFinish); + return this.call(this.controller::postActivateDistributorFinish); } @GetMapping(value = GetHealth.PATH, produces = GetHealth.PRODUCES) - public void getDataGridHealth() throws HttpResponseException + public void getHealth() throws HttpResponseException { - this.call(this.controller::getDataGridHealth); + this.call(this.controller::getHealth); } @GetMapping(value = GetHealthReady.PATH, produces = GetHealthReady.PRODUCES) @Async - public CompletableFuture getDataGridHealthReady() throws HttpResponseException + public CompletableFuture getHealthReady() throws HttpResponseException { - this.call(this.controller::getDataGridHealthReady); + this.call(this.controller::getHealthReady); return CompletableFuture.completedFuture(null); } @GetMapping(value = GetStorageBytes.PATH, produces = GetStorageBytes.PRODUCES) @Async - public CompletableFuture getDataGridStorageBytes() throws HttpResponseException + public CompletableFuture getStorageBytes() throws HttpResponseException { - return CompletableFuture.completedFuture(this.call(this.controller::getDataGridStorageBytes)); + return CompletableFuture.completedFuture(this.call(this.controller::getStorageBytes)); } @PostMapping( @@ -90,9 +90,9 @@ public CompletableFuture getDataGridStorageBytes() throws HttpResponseEx consumes = PostBackup.CONSUMES, produces = PostBackup.PRODUCES ) - public void postDataGridBackup(@RequestBody final PostBackup.Body body) throws HttpResponseException + public void postBackup(@RequestBody final PostBackup.Body body) throws HttpResponseException { - this.call(() -> this.controller.postDataGridBackup(body)); + this.call(() -> this.controller.postBackup(body)); } @PostMapping( @@ -100,21 +100,21 @@ public void postDataGridBackup(@RequestBody final PostBackup.Body body) throws H consumes = PostUpdates.CONSUMES, produces = PostUpdates.PRODUCES ) - public void postDataGridUpdates() throws HttpResponseException + public void postUpdates() throws HttpResponseException { - this.call(this.controller::postDataGridUpdates); + this.call(this.controller::postUpdates); } @GetMapping(value = GetBackup.PATH, produces = GetBackup.PRODUCES) - public boolean getDataGridBackup() throws HttpResponseException + public boolean getBackup() throws HttpResponseException { - return this.call(this.controller::getDataGridBackup); + return this.call(this.controller::getBackup); } @GetMapping(value = GetUpdates.PATH, produces = GetUpdates.PRODUCES) - public boolean getDataGridUpdates() throws HttpResponseException + public boolean getUpdates() throws HttpResponseException { - 
return this.call(this.controller::getDataGridUpdates); + return this.call(this.controller::getUpdates); } @PostMapping( @@ -122,9 +122,9 @@ public boolean getDataGridUpdates() throws HttpResponseException consumes = PostResumeUpdates.CONSUMES, produces = PostResumeUpdates.PRODUCES ) - public void postDataGridResumeUpdates() throws HttpResponseException + public void postResumeUpdates() throws HttpResponseException { - this.call(this.controller::postDataGridResumeUpdates); + this.call(this.controller::postResumeUpdates); } @PostMapping( @@ -132,15 +132,15 @@ public void postDataGridResumeUpdates() throws HttpResponseException consumes = PostGc.CONSUMES, produces = PostGc.PRODUCES ) - public void postDataGridGc() throws HttpResponseException + public void postGc() throws HttpResponseException { - this.call(this.controller::postDataGridGc); + this.call(this.controller::postGc); } @GetMapping(value = GetGc.PATH, produces = GetGc.PRODUCES) - public boolean getDataGridGc() throws HttpResponseException + public boolean getGc() throws HttpResponseException { - return this.call(this.controller::getDataGridGc); + return this.call(this.controller::getGc); } private T call(final Supplier s) From 68f1c9fa27407b8c71a3250d861d8d6346916782 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20St=C3=B6r?= Date: Fri, 28 Nov 2025 07:38:51 +0100 Subject: [PATCH 5/7] Rename microstreamOffset->messageIndex --- .../types/ClusterStorageBinaryDataClient.java | 8 ++++---- .../types/ClusterStorageBinaryDataPacket.java | 16 ++++++++-------- .../ClusterStorageBinaryDistributedKafka.java | 12 ++++++------ .../nodelibrary/types/KafkaOffsetProvider.java | 12 ++++++------ .../nodelibrary/types/StorageNodeManager.java | 8 ++++---- 5 files changed, 28 insertions(+), 28 deletions(-) diff --git a/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/ClusterStorageBinaryDataClient.java b/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/ClusterStorageBinaryDataClient.java index 2d2cb21..a0535cb 100644 --- a/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/ClusterStorageBinaryDataClient.java +++ b/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/ClusterStorageBinaryDataClient.java @@ -368,19 +368,19 @@ private void consumeFullMessage( for (final var packet : packets) { - if (this.cachedOffset >= packet.microstreamOffset()) + if (this.cachedOffset >= packet.messageIndex()) { LOG.warn( "Skipping packet with offset {} (current: {})", - packet.microstreamOffset(), + packet.messageIndex(), this.cachedOffset ); continue; } - this.cachedOffset = packet.microstreamOffset(); + this.cachedOffset = packet.messageIndex(); @@ -417,7 +417,7 @@ private ClusterStorageBinaryDataPacket createDataPacket( ClusterStorageBinaryDistributedKafka.messageLength(headers), ClusterStorageBinaryDistributedKafka.packetIndex(headers), ClusterStorageBinaryDistributedKafka.packetCount(headers), - ClusterStorageBinaryDistributedKafka.microstreamOffset(headers), + ClusterStorageBinaryDistributedKafka.messageIndex(headers), ByteBuffer.wrap(record.value()) ); } diff --git a/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/ClusterStorageBinaryDataPacket.java b/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/ClusterStorageBinaryDataPacket.java index c3278b2..ee194c2 100644 --- 
a/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/ClusterStorageBinaryDataPacket.java +++ b/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/ClusterStorageBinaryDataPacket.java @@ -25,11 +25,11 @@ public interface ClusterStorageBinaryDataPacket extends StorageBinaryDataPacket { - long microstreamOffset(); + long messageIndex(); - public static ClusterStorageBinaryDataPacket New(final MessageType messageType, final int messageLength, final int packetIndex, final int packetCount, final long microstreamOffset, final ByteBuffer buffer) + public static ClusterStorageBinaryDataPacket New(final MessageType messageType, final int messageLength, final int packetIndex, final int packetCount, final long messageIndex, final ByteBuffer buffer) { - return new Default(notNull(messageType), notNegative(messageLength), notNegative(packetIndex), positive(packetCount), microstreamOffset, notNull(buffer)); + return new Default(notNull(messageType), notNegative(messageLength), notNegative(packetIndex), positive(packetCount), messageIndex, notNull(buffer)); } public static class Default implements ClusterStorageBinaryDataPacket @@ -38,16 +38,16 @@ public static class Default implements ClusterStorageBinaryDataPacket private final int messageLength; private final int packetIndex; private final int packetCount; - private final long microstreamOffset; + private final long messageIndex; private final ByteBuffer buffer; - private Default(final MessageType messageType, final int messageLength, final int packetIndex, final int packetCount, final long microstreamOffset, final ByteBuffer buffer) + private Default(final MessageType messageType, final int messageLength, final int packetIndex, final int packetCount, final long messageIndex, final ByteBuffer buffer) { this.messageType = messageType; this.messageLength = messageLength; this.packetIndex = packetIndex; this.packetCount = packetCount; - this.microstreamOffset = microstreamOffset; + this.messageIndex = messageIndex; this.buffer = buffer; } @@ -76,9 +76,9 @@ public int packetCount() } @Override - public long microstreamOffset() + public long messageIndex() { - return this.microstreamOffset; + return this.messageIndex; } @Override diff --git a/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/ClusterStorageBinaryDistributedKafka.java b/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/ClusterStorageBinaryDistributedKafka.java index a074d6d..d8b5a9d 100644 --- a/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/ClusterStorageBinaryDistributedKafka.java +++ b/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/ClusterStorageBinaryDistributedKafka.java @@ -37,9 +37,9 @@ public static String keyPacketCount() return "packet-count"; } - public static String keyMicrostreamOffset() + public static String keyMessageIndex() { - return "microstreamOffset"; + return "messageIndex"; } public static String keyPacketIndex() @@ -63,14 +63,14 @@ public static void addPacketHeaders( final int messageLength, final int packetIndex, final int packetCount, - final long microstreamOffset + final long messageIndex ) { headers.add(keyMessageType(), serializeString(messageType.name())); headers.add(keyMessageLength(), serializeInt(messageLength)); headers.add(keyPacketIndex(), serializeInt(packetIndex)); headers.add(keyPacketCount(), 
serializeInt(packetCount)); - headers.add(keyMicrostreamOffset(), serializeLong(microstreamOffset)); + headers.add(keyMessageIndex(), serializeLong(messageIndex)); } public static MessageType messageType(final Headers headers) @@ -93,9 +93,9 @@ public static int packetCount(final Headers headers) return deserializeInt(headers.lastHeader(keyPacketCount()).value()); } - public static long microstreamOffset(final Headers headers) + public static long messageIndex(final Headers headers) { - return deserializeLong(headers.lastHeader(keyMicrostreamOffset()).value()); + return deserializeLong(headers.lastHeader(keyMessageIndex()).value()); } public static byte[] serializeString(final String value) diff --git a/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/KafkaOffsetProvider.java b/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/KafkaOffsetProvider.java index d62aef5..b2e8ccf 100644 --- a/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/KafkaOffsetProvider.java +++ b/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/KafkaOffsetProvider.java @@ -110,23 +110,23 @@ public void init() throws KafkaException */ public long provideLatestOffset() throws KafkaException { - long lastMicrostreamOffset = Long.MIN_VALUE; + long lastMessageIndex = Long.MIN_VALUE; this.seekToLastOffsets(); // LOG.trace("Polling latest messages for topic {}", this.topic); for (final var rec : this.kafka.poll(POLL_TIMEOUT)) { - final long microstreamOffset = ClusterStorageBinaryDistributedKafka.deserializeLong( - rec.headers().lastHeader(ClusterStorageBinaryDistributedKafka.keyMicrostreamOffset()).value() + final long messageIndex = ClusterStorageBinaryDistributedKafka.deserializeLong( + rec.headers().lastHeader(ClusterStorageBinaryDistributedKafka.keyMessageIndex()).value() ); - if (microstreamOffset > lastMicrostreamOffset) + if (messageIndex > lastMessageIndex) { - lastMicrostreamOffset = microstreamOffset; + lastMessageIndex = messageIndex; } } - return lastMicrostreamOffset; + return lastMessageIndex; } public OffsetInfo provideLatestOffsetInfo() diff --git a/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/StorageNodeManager.java b/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/StorageNodeManager.java index f415952..b33d587 100644 --- a/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/StorageNodeManager.java +++ b/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/StorageNodeManager.java @@ -31,9 +31,9 @@ public interface StorageNodeManager extends ClusterNodeManager boolean finishDistributonSwitch() throws NotADistributorException; - long getCurrentMicrostreamOffset(); + long getCurrentMessageIndex(); - long getLatestMicrostreamOffset(); + long getLatestMessageIndex(); static StorageNodeManager New( @@ -185,7 +185,7 @@ public boolean finishDistributonSwitch() throws NotADistributorException } @Override - public long getCurrentMicrostreamOffset() + public long getCurrentMessageIndex() { if (this.isDistributor()) { @@ -198,7 +198,7 @@ public long getCurrentMicrostreamOffset() } @Override - public long getLatestMicrostreamOffset() + public long getLatestMessageIndex() { return this.kafkaOffsetProvider.provideLatestOffset(); } From a4ef7f90cacd396df1e5409b3f74b21ed6e87a33 Mon 
Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20St=C3=B6r?= Date: Fri, 28 Nov 2025 07:43:04 +0100 Subject: [PATCH 6/7] Rename NAD header --- .../nodelibrary/exceptions/NotADistributorException.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/exceptions/NotADistributorException.java b/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/exceptions/NotADistributorException.java index 2e98c17..b6c847c 100644 --- a/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/exceptions/NotADistributorException.java +++ b/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/exceptions/NotADistributorException.java @@ -22,7 +22,7 @@ public class NotADistributorException extends BadRequestException { - public static final String NAD_HEADER_KEY = "DataGrid-NAD"; + public static final String NAD_HEADER_KEY = "StorageNode-NAD"; public static final String NAD_HEADER_VALUE = Boolean.TRUE.toString(); public NotADistributorException() From 1b5435a2205d20b397bba445bde728cc0d1f5b4a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20St=C3=B6r?= Date: Fri, 28 Nov 2025 13:35:18 +0100 Subject: [PATCH 7/7] refactor!: Rename ms-offset refs to message-idx BREAKING CHANGE: Rename many types and fields referencing the msOffset and OffsetInfo --- .../AfterDataMessageConsumedListener.java | 2 +- .../nodelibrary/types/BackupNodeManager.java | 6 +- .../nodelibrary/types/ClusterFoundation.java | 102 ++++---- .../types/ClusterRestRequestController.java | 2 +- .../types/ClusterStorageBinaryDataClient.java | 80 +++---- .../ClusterStorageBinaryDataDistributor.java | 12 +- ...sterStorageBinaryDataDistributorKafka.java | 20 +- .../types/FilesystemVolumeBackupBackend.java | 16 +- ...der.java => KafkaMessageInfoProvider.java} | 110 +++++---- .../{OffsetInfo.java => MessageInfo.java} | 24 +- .../types/NetworkArchiveBackupBackend.java | 22 +- .../types/StorageBackupBackend.java | 2 +- .../types/StorageBackupManager.java | 14 +- .../types/StorageNodeHealthCheck.java | 24 +- .../nodelibrary/types/StorageNodeManager.java | 22 +- .../types/StoredMessageIndexManager.java | 226 ++++++++++++++++++ .../types/StoredOffsetManager.java | 224 ----------------- 17 files changed, 457 insertions(+), 451 deletions(-) rename cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/{KafkaOffsetProvider.java => KafkaMessageInfoProvider.java} (64%) rename cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/{OffsetInfo.java => MessageInfo.java} (67%) create mode 100644 cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/StoredMessageIndexManager.java delete mode 100644 cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/StoredOffsetManager.java diff --git a/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/AfterDataMessageConsumedListener.java b/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/AfterDataMessageConsumedListener.java index 76662e1..1fd9266 100644 --- a/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/AfterDataMessageConsumedListener.java +++ 
b/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/AfterDataMessageConsumedListener.java @@ -18,7 +18,7 @@ public interface AfterDataMessageConsumedListener extends AutoCloseable { - void onChange(OffsetInfo offsetInfo) throws NodelibraryException; + void onChange(MessageInfo messageInfo) throws NodelibraryException; @Override void close(); diff --git a/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/BackupNodeManager.java b/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/BackupNodeManager.java index 7546d4a..ecb0749 100644 --- a/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/BackupNodeManager.java +++ b/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/BackupNodeManager.java @@ -24,7 +24,7 @@ public interface BackupNodeManager extends ClusterNodeManager { - void stopReadingAtLatestOffset(); + void stopReadingAtLatestMessage(); void resumeReading() throws NodelibraryException; @@ -83,10 +83,10 @@ private Default( } @Override - public void stopReadingAtLatestOffset() + public void stopReadingAtLatestMessage() { this.validateRunning(); - this.dataClient.stopAtLatestOffset(); + this.dataClient.stopAtLatestMessage(); } @Override diff --git a/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/ClusterFoundation.java b/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/ClusterFoundation.java index 72073f1..177e497 100644 --- a/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/ClusterFoundation.java +++ b/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/ClusterFoundation.java @@ -101,9 +101,9 @@ public interface ClusterFoundation> extends Insta F setAfterDataMessageConsumedListener(AfterDataMessageConsumedListener listener); - StoredOffsetManager getStoredOffsetManager(); + StoredMessageIndexManager getStoredMessageIndexManager(); - F setStoredOffsetManager(StoredOffsetManager manager); + F setStoredMessageIndexManager(StoredMessageIndexManager manager); StorageBackupManager getStorageBackupManager(); @@ -157,9 +157,9 @@ public interface ClusterFoundation> extends Insta F setEnableAsyncDistribution(boolean enable); - KafkaOffsetProvider getKafkaOffsetProvider(); + KafkaMessageInfoProvider getKafkaMessageInfoProvider(); - F setKafkaOffsetProvider(KafkaOffsetProvider provider); + F setKafkaMessageInfoProvider(KafkaMessageInfoProvider provider); static ClusterFoundation New() { @@ -201,9 +201,9 @@ class Default> extends InstanceDispatcher.Default implement private AfterDataMessageConsumedListener afterDataMessageConsumedListener; private ClusterStorageBinaryDataMerger dataMerger; private ClusterStorageBinaryDataPacketAcceptor dataPacketAcceptor; - private StoredOffsetManager storedOffsetManager; + private StoredMessageIndexManager storedMessageInfoManager; private KafkaPropertiesProvider kafkaPropertiesProvider; - private KafkaOffsetProvider kafkaOffsetProvider; + private KafkaMessageInfoProvider kafkaMessageInfoProvider; // cached created types private ClusterStorageManager clusterStorageManager; @@ -223,7 +223,7 @@ protected StorageBackupBackend ensureBackupBackend() { // TODO: Hardcoded path final var props = this.getNodelibraryPropertiesProvider(); - final StoredOffsetManager.Creator offsetManagerCreator = 
StoredOffsetManager::New; + final StoredMessageIndexManager.Creator messageIndexManagerCreator = StoredMessageIndexManager::New; if (props.backupTarget() == BackupTarget.SAAS) { @@ -242,12 +242,12 @@ protected StorageBackupBackend ensureBackupBackend() return NetworkArchiveBackupBackend.New( scratchSpace, this.getBackupProxyHttpClient(), - offsetManagerCreator + messageIndexManagerCreator ); } else { - return FilesystemVolumeBackupBackend.New(Paths.get("/backups"), offsetManagerCreator); + return FilesystemVolumeBackupBackend.New(Paths.get("/backups"), messageIndexManagerCreator); } } @@ -309,7 +309,7 @@ protected StorageLimitCheckerQuartzCronJobManager ensureStorageLimitCheckerManag ); } - protected KafkaOffsetProvider ensureKafkaOffsetProvider() + protected KafkaMessageInfoProvider ensureKafkaMessageInfoProvider() { final var nodelibProps = this.getNodelibraryPropertiesProvider(); final var kafkaProps = this.getKafkaPropertiesProvider(); @@ -317,7 +317,7 @@ protected KafkaOffsetProvider ensureKafkaOffsetProvider() final String topic = nodelibProps.kafkaTopicName(); final String groupId = String.format("%s-%s-offsetgetter", topic, nodelibProps.myPodName()); - return KafkaOffsetProvider.New(topic, groupId, kafkaProps); + return KafkaMessageInfoProvider.New(topic, groupId, kafkaProps); } protected KafkaPropertiesProvider ensureKafkaPropertiesProvider() @@ -325,29 +325,29 @@ protected KafkaPropertiesProvider ensureKafkaPropertiesProvider() return KafkaPropertiesProvider.New(this.getNodelibraryPropertiesProvider()); } - protected StoredOffsetManager ensureStoredOffsetManager() + protected StoredMessageIndexManager ensureStoredMessageIndexManager() { // TODO: Hardcoded path - final var offsetPath = Paths.get("/storage/offset"); - LOG.trace("Creating stored offset manager for offset file at {}", offsetPath); - return StoredOffsetManager.New(NioFileSystem.New().ensureFile(offsetPath).tryUseWriting()); + final var messageInfoPath = Paths.get("/storage/offset"); + LOG.trace("Creating stored offset manager for offset file at {}", messageInfoPath); + return StoredMessageIndexManager.New(NioFileSystem.New().ensureFile(messageInfoPath).tryUseWriting()); } protected AfterDataMessageConsumedListener ensureAfterDataMessageConsumedListener() { final var props = this.getNodelibraryPropertiesProvider(); - final var storedOffsetUpdater = new AfterDataMessageConsumedListener() + final var storedMessageInfoUpdater = new AfterDataMessageConsumedListener() { - final StoredOffsetManager delegate = ClusterFoundation.Default.this.getStoredOffsetManager(); + final StoredMessageIndexManager delegate = ClusterFoundation.Default.this.getStoredMessageIndexManager(); @Override - public void onChange(final OffsetInfo offsetInfo) throws NodelibraryException + public void onChange(final MessageInfo messageInfo) throws NodelibraryException { if (props.isBackupNode()) { - // only backup nodes shall update the stored offset - this.delegate.set(offsetInfo); + // only backup nodes shall update the stored message index + this.delegate.set(messageInfo); } } @@ -358,10 +358,10 @@ public void close() } }; LOG.trace( - "Created AfterDataMessageConsumedListener->StoredOffsetManager delegate. WillRun={}", + "Created AfterDataMessageConsumedListener->StoredMessageInfoManager delegate. 
WillRun={}", props.isBackupNode() ); - return storedOffsetUpdater; + return storedMessageInfoUpdater; } protected StorageBackupManager ensureStorageBackupManager() @@ -369,14 +369,14 @@ protected StorageBackupManager ensureStorageBackupManager() final var props = this.getNodelibraryPropertiesProvider(); final int maxBackupCount = props.keptBackupsCount(); - final Supplier offsetProvider = this.getClusterStorageBinaryDataClient()::offsetInfo; + final Supplier messageInfoProvider = this.getClusterStorageBinaryDataClient()::messageInfo; return StorageBackupManager.New( this.clusterStorageManager, maxBackupCount, this.getStorageBackupBackend(), - offsetProvider, + messageInfoProvider, this.getClusterStorageBinaryDataClient() ); } @@ -431,7 +431,7 @@ protected ClusterStorageBinaryDataClient ensureClusterStorageBinaryDataClient() topic, groupId, this.getAfterDataMessageConsumedListener(), - this.getStoredOffsetManager().get(), + this.getStoredMessageIndexManager().get(), this.getKafkaPropertiesProvider(), doCommitOffset ); @@ -483,7 +483,7 @@ protected StorageNodeManager ensureStorageNodeManager() this.getStorageNodeHealthCheck(), this.clusterStorageManager, this.getStorageDiskSpaceReader(), - this.getKafkaOffsetProvider() + this.getKafkaMessageInfoProvider() ); } @@ -961,36 +961,36 @@ public F setClusterStorageBinaryDataPacketAcceptor(final ClusterStorageBinaryDat } @Override - public StoredOffsetManager getStoredOffsetManager() + public StoredMessageIndexManager getStoredMessageIndexManager() { - if (this.storedOffsetManager == null) + if (this.storedMessageInfoManager == null) { - this.storedOffsetManager = this.dispatch(this.ensureStoredOffsetManager()); + this.storedMessageInfoManager = this.dispatch(this.ensureStoredMessageIndexManager()); } - return this.storedOffsetManager; + return this.storedMessageInfoManager; } @Override - public F setStoredOffsetManager(final StoredOffsetManager manager) + public F setStoredMessageIndexManager(final StoredMessageIndexManager manager) { - this.storedOffsetManager = manager; + this.storedMessageInfoManager = manager; return this.$(); } @Override - public KafkaOffsetProvider getKafkaOffsetProvider() + public KafkaMessageInfoProvider getKafkaMessageInfoProvider() { - if (this.kafkaOffsetProvider == null) + if (this.kafkaMessageInfoProvider == null) { - this.kafkaOffsetProvider = this.dispatch(this.ensureKafkaOffsetProvider()); + this.kafkaMessageInfoProvider = this.dispatch(this.ensureKafkaMessageInfoProvider()); } - return this.kafkaOffsetProvider; + return this.kafkaMessageInfoProvider; } @Override - public F setKafkaOffsetProvider(final KafkaOffsetProvider provider) + public F setKafkaMessageInfoProvider(final KafkaMessageInfoProvider provider) { - this.kafkaOffsetProvider = provider; + this.kafkaMessageInfoProvider = provider; return this.$(); } @@ -1042,15 +1042,15 @@ protected void startBackupNode() throws NodelibraryException { LOG.info("Starting backup cluster node"); - this.getKafkaOffsetProvider().init(); + this.getKafkaMessageInfoProvider().init(); // TODO: Hardcoded paths final var storageParentPath = Paths.get("/storage/"); final var storageRootPath = storageParentPath.resolve("storage"); - // if we use a downloaded storage, always scroll to the latest offset so we don't read old messages - boolean useLatestOffset = false; + // if we use a downloaded storage, always scroll to the latest message so we don't read old messages + boolean useLatestMessageIndex = false; boolean requiresStorageUpload = false; // don't send messages generated by 
starting the storage and storing the empty root @@ -1070,7 +1070,7 @@ protected void startBackupNode() throws NodelibraryException { LOG.info("Downloading user uploaded storage"); - useLatestOffset = true; + useLatestMessageIndex = true; // since the storage is now different than before the storage nodes // also need the exact same storage requiresStorageUpload = true; @@ -1088,11 +1088,11 @@ else if (containsBackups && !Files.exists(storageRootPath)) LOG.info("Starting with local storage"); } - if (useLatestOffset) + if (useLatestMessageIndex) { - final var offsets = this.getKafkaOffsetProvider().provideLatestOffsetInfo(); - LOG.debug("Set starting offset to offsets: {}", offsets); - this.getStoredOffsetManager().set(offsets); + final var info = this.getKafkaMessageInfoProvider().provideLatestMessageInfo(); + LOG.debug("Set starting message info to: {}", info); + this.getStoredMessageIndexManager().set(info); } LOG.info("Creating nodelibrary cluster controller"); @@ -1190,7 +1190,7 @@ protected void startStorageNode() throws NodelibraryException // TODO: Hardcoded paths final var storageParentPath = Paths.get("/storage/"); final var storageRootPath = storageParentPath.resolve("storage"); - final var offsetPath = storageParentPath.resolve("offset"); + final var messageInfoPath = storageParentPath.resolve("offset"); if (Files.exists(storageRootPath)) { @@ -1198,15 +1198,15 @@ protected void startStorageNode() throws NodelibraryException this.deleteDirectory(storageRootPath); } - if (Files.exists(offsetPath)) + if (Files.exists(messageInfoPath)) { try { - Files.delete(offsetPath); + Files.delete(messageInfoPath); } catch (final IOException e) { - throw new NodelibraryException("Failed to delete offset file", e); + throw new NodelibraryException("Failed to delete message info file", e); } } @@ -1233,7 +1233,7 @@ protected void startStorageNode() throws NodelibraryException // don't send messages generated by starting the storage and storing the empty root this.getClusterStorageBinaryDataDistributor().ignoreDistribution(true); - this.getKafkaOffsetProvider().init(); + this.getKafkaMessageInfoProvider().init(); final var embeddedStorageFoundation = this.getEmbeddedStorageFoundation(); // replace the storage live file provider from the provided embedded storage foundation diff --git a/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/ClusterRestRequestController.java b/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/ClusterRestRequestController.java index ad33da0..21c5ab6 100644 --- a/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/ClusterRestRequestController.java +++ b/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/ClusterRestRequestController.java @@ -316,7 +316,7 @@ public boolean getBackup() throws HttpResponseException public void postUpdates() throws HttpResponseException { LOG.trace("Handling postDataGridUpdates request"); - this.handleRequest(this.backupNodeManager::stopReadingAtLatestOffset); + this.handleRequest(this.backupNodeManager::stopReadingAtLatestMessage); } @Override diff --git a/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/ClusterStorageBinaryDataClient.java b/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/ClusterStorageBinaryDataClient.java index a0535cb..cad8ca1 100644 --- 
a/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/ClusterStorageBinaryDataClient.java +++ b/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/ClusterStorageBinaryDataClient.java @@ -41,9 +41,9 @@ public interface ClusterStorageBinaryDataClient extends StorageBinaryDataClient { - void stopAtLatestOffset(); + void stopAtLatestMessage(); - OffsetInfo offsetInfo(); + MessageInfo messageInfo(); boolean isRunning(); @@ -55,7 +55,7 @@ static ClusterStorageBinaryDataClient New( final String topicName, final String groupId, final AfterDataMessageConsumedListener offsetChangedListener, - final OffsetInfo startingOffsetInfo, + final MessageInfo startingMessageInfo, final KafkaPropertiesProvider kafkaPropertiesProvider, final boolean doCommitOffset ) @@ -65,7 +65,7 @@ static ClusterStorageBinaryDataClient New( notNull(topicName), notNull(groupId), notNull(offsetChangedListener), - notNull(startingOffsetInfo), + notNull(startingMessageInfo), notNull(kafkaPropertiesProvider), doCommitOffset ); @@ -90,9 +90,9 @@ final class Default implements ClusterStorageBinaryDataClient private final KafkaPropertiesProvider kafkaPropertiesProvider; private final boolean doCommitOffset; - private final AtomicReference offsetInfo; - private long cachedOffset; - private final AtomicBoolean stopAtLatestOffset = new AtomicBoolean(); + private final AtomicReference messageInfo; + private long cachedMessageIndex; + private final AtomicBoolean stopAtLatestMessage = new AtomicBoolean(); private final AtomicBoolean requestStop = new AtomicBoolean(); private final AtomicBoolean running = new AtomicBoolean(); @@ -103,7 +103,7 @@ private Default( final String topicName, final String groupId, final AfterDataMessageConsumedListener offsetChangedListener, - final OffsetInfo startingOffsetInfo, + final MessageInfo startingMessageInfo, final KafkaPropertiesProvider kafkaPropertiesProvider, final boolean doCommitOffset ) @@ -112,8 +112,8 @@ private Default( this.topicName = topicName; this.groupId = groupId; this.offsetChangedListener = offsetChangedListener; - this.offsetInfo = new AtomicReference<>(startingOffsetInfo); - this.cachedOffset = startingOffsetInfo.msOffset(); + this.messageInfo = new AtomicReference<>(startingMessageInfo); + this.cachedMessageIndex = startingMessageInfo.messageIndex(); this.kafkaPropertiesProvider = kafkaPropertiesProvider; this.doCommitOffset = doCommitOffset; } @@ -125,9 +125,9 @@ public boolean isRunning() } @Override - public OffsetInfo offsetInfo() + public MessageInfo messageInfo() { - return this.offsetInfo.get(); + return this.messageInfo.get(); } @Override @@ -135,7 +135,7 @@ public void start() { if (LOG.isInfoEnabled()) { - LOG.info("Starting kafka data client at offsets {}", this.offsetInfo.get().msOffset()); + LOG.info("Starting kafka data client at message index {}", this.messageInfo.get().messageIndex()); } this.runner = new Thread(this::tryRun); this.runner.start(); @@ -185,8 +185,8 @@ private void run() } // Seek to correct offsets - final var cachedOffsetInfo = this.offsetInfo.get(); - for (final var entry : cachedOffsetInfo.kafkaPartitionOffsets()) + final var cachedMessageInfo = this.messageInfo.get(); + for (final var entry : cachedMessageInfo.kafkaPartitionOffsets()) { final var partition = entry.key(); final long offset = entry.value(); @@ -195,7 +195,7 @@ private void run() } final var missingPartitions = consumer.assignment() .stream() - .filter(a -> 
!cachedOffsetInfo.kafkaPartitionOffsets().containsSearched(kv -> kv.key().equals(a))) + .filter(a -> !cachedMessageInfo.kafkaPartitionOffsets().containsSearched(kv -> kv.key().equals(a))) .toList(); if (!missingPartitions.isEmpty()) { @@ -209,16 +209,16 @@ private void run() boolean run = true; while (run && !this.requestStop.get()) { - if (!this.stopAtLatestOffset.get()) + if (!this.stopAtLatestMessage.get()) { this.pollAndConsume(consumer); } else { - LOG.info("Data client is now stopping at latest offset."); + LOG.info("Data client is now stopping at latest message."); final long stopAt; try ( - final var offsetProvider = KafkaOffsetProvider.New( + final var offsetProvider = KafkaMessageInfoProvider.New( this.topicName, this.groupId + "-offsetgetter", this.kafkaPropertiesProvider @@ -226,17 +226,17 @@ private void run() ) { offsetProvider.init(); - stopAt = offsetProvider.provideLatestOffset(); + stopAt = offsetProvider.provideLatestMessageIndex(); } - LOG.info("Stopping at offset {}", stopAt); + LOG.info("Stopping at message index {}", stopAt); - while (this.cachedOffset < stopAt && !this.requestStop.get()) + while (this.cachedMessageIndex < stopAt && !this.requestStop.get()) { this.pollAndConsume(consumer); } - LOG.info("Data client is now at latest offset ({})", this.cachedOffset); + LOG.info("Data client is now at latest offset ({})", this.cachedMessageIndex); - this.stopAtLatestOffset.set(false); + this.stopAtLatestMessage.set(false); run = false; } @@ -251,11 +251,11 @@ private void run() LOG.info("DataClient run finished"); } - private OffsetInfo updateOffsets(final KafkaConsumer consumer) + private MessageInfo updateOffsets(final KafkaConsumer consumer) { - if (LOG.isDebugEnabled() && this.cachedOffset % 10_000 == 0) + if (LOG.isDebugEnabled() && this.cachedMessageIndex % 10_000 == 0) { - LOG.debug("Polling and updating offset info for offset {}", this.cachedOffset); + LOG.debug("Polling and updating message info for message index {}", this.cachedMessageIndex); } final EqHashTable map = EqHashTable.New(); for (final var partition : consumer.assignment()) @@ -266,8 +266,8 @@ private OffsetInfo updateOffsets(final KafkaConsumer consumer) offset = Math.max(offset - this.cachedPackets.size(), 0); map.put(partition, offset); } - final var info = OffsetInfo.New(this.cachedOffset, map.immure()); - this.offsetInfo.set(info); + final var info = MessageInfo.New(this.cachedMessageIndex, map.immure()); + this.messageInfo.set(info); return info; } @@ -368,25 +368,25 @@ private void consumeFullMessage( for (final var packet : packets) { - if (this.cachedOffset >= packet.messageIndex()) + if (this.cachedMessageIndex >= packet.messageIndex()) { LOG.warn( "Skipping packet with offset {} (current: {})", packet.messageIndex(), - this.cachedOffset + this.cachedMessageIndex ); continue; } - this.cachedOffset = packet.messageIndex(); + this.cachedMessageIndex = packet.messageIndex(); - if (LOG.isTraceEnabled() && this.cachedOffset % 10_000 == 0) + if (LOG.isTraceEnabled() && this.cachedMessageIndex % 10_000 == 0) { - LOG.trace("Consuming packet with offset {}", this.cachedOffset); + LOG.trace("Consuming packet with offset {}", this.cachedMessageIndex); } newPackets.add(packet); @@ -397,9 +397,9 @@ private void consumeFullMessage( if (!newPackets.isEmpty()) { - if (LOG.isDebugEnabled() && this.cachedOffset % 10_000 == 0) + if (LOG.isDebugEnabled() && this.cachedMessageIndex % 10_000 == 0) { - LOG.debug("Applying packets at offset {}", this.cachedOffset); + LOG.debug("Applying packets at offset {}", 
this.cachedMessageIndex); } this.packetAcceptor.accept(newPackets); final var newInfo = this.updateOffsets(consumer); @@ -427,19 +427,19 @@ private ClusterStorageBinaryDataPacket createDataPacket( * reached. */ @Override - public void stopAtLatestOffset() + public void stopAtLatestMessage() { LOG.info("DataClient will stop at latest offset"); - this.stopAtLatestOffset.set(true); + this.stopAtLatestMessage.set(true); } @Override public void resume() throws NodelibraryException { - if (this.stopAtLatestOffset.get()) + if (this.stopAtLatestMessage.get()) { throw new NodelibraryException( - new IllegalStateException("Client is sill reading up to the latest offset") + new IllegalStateException("Client is still reading up to the latest message index") ); } if (this.isRunning()) diff --git a/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/ClusterStorageBinaryDataDistributor.java b/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/ClusterStorageBinaryDataDistributor.java index 673f660..aaf0443 100644 --- a/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/ClusterStorageBinaryDataDistributor.java +++ b/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/ClusterStorageBinaryDataDistributor.java @@ -22,9 +22,9 @@ public interface ClusterStorageBinaryDataDistributor extends StorageBinaryDataDistributor { - void offset(long offset); + void messageIndex(long index); - long offset(); + long messageIndex(); void ignoreDistribution(boolean ignore); @@ -63,15 +63,15 @@ public synchronized void distributeTypeDictionary(final String typeDictionaryDat } @Override - public void offset(final long offset) + public void messageIndex(final long index) { - this.delegate.offset(offset); + this.delegate.messageIndex(index); } @Override - public long offset() + public long messageIndex() { - return this.delegate.offset(); + return this.delegate.messageIndex(); } @Override diff --git a/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/ClusterStorageBinaryDataDistributorKafka.java b/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/ClusterStorageBinaryDataDistributorKafka.java index a46792d..754da08 100644 --- a/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/ClusterStorageBinaryDataDistributorKafka.java +++ b/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/ClusterStorageBinaryDataDistributorKafka.java @@ -64,7 +64,7 @@ abstract class Abstract implements ClusterStorageBinaryDataDistributorKafka private final String topicName; private final KafkaProducer producer; - private long offset = Long.MIN_VALUE; + private long messageIndex = Long.MIN_VALUE; private boolean ignoreDistribution = false; protected Abstract(final String topicName, final KafkaPropertiesProvider kafkaPropertiesProvider) @@ -147,7 +147,7 @@ private void executeDistribution(final MessageType messageType, final Binary dat final var kafkaRecord = new ProducerRecord(this.topicName, packet); - ++this.offset; + ++this.messageIndex; ClusterStorageBinaryDistributedKafka.addPacketHeaders( kafkaRecord.headers(), @@ -155,12 +155,12 @@ private void executeDistribution(final MessageType messageType, final Binary dat messageSize, packetIndex, packetCount, - this.offset + this.messageIndex ); - if (LOG.isDebugEnabled() &&
this.offset % 10_000 == 0) + if (LOG.isDebugEnabled() && this.messageIndex % 10_000 == 0) { - LOG.debug("Sending kafka packet at offset {}", this.offset); + LOG.debug("Sending kafka packet at message index {}", this.messageIndex); } this.producer.send(kafkaRecord); @@ -198,16 +198,16 @@ public void distributeTypeDictionary(final String typeDictionaryData) } @Override - public void offset(final long offset) + public void messageIndex(final long index) { - LOG.info("Setting distributor offset to {}", offset); - this.offset = offset; + LOG.info("Setting distributor message index to {}", index); + this.messageIndex = index; } @Override - public long offset() + public long messageIndex() { - return this.offset; + return this.messageIndex; } @Override diff --git a/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/FilesystemVolumeBackupBackend.java b/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/FilesystemVolumeBackupBackend.java index e2ba998..d8934eb 100644 --- a/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/FilesystemVolumeBackupBackend.java +++ b/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/FilesystemVolumeBackupBackend.java @@ -32,23 +32,23 @@ public interface FilesystemVolumeBackupBackend extends StorageBackupBackend { static FilesystemVolumeBackupBackend New( final Path backupVolumePath, - final StoredOffsetManager.Creator storedOffsetManagerCreator + final StoredMessageIndexManager.Creator storedmessageIndexManagerCreator ) { - return new Default(notNull(backupVolumePath), notNull(storedOffsetManagerCreator)); + return new Default(notNull(backupVolumePath), notNull(storedmessageIndexManagerCreator)); } static class Default implements FilesystemVolumeBackupBackend { private final Path backupVolumePath; private final Path userUploadedStorageFolderPath; - private final StoredOffsetManager.Creator offsetManagerCreator; + private final StoredMessageIndexManager.Creator messageIndexManagerCreator; - private Default(final Path backupVolumePath, final StoredOffsetManager.Creator storedOffsetManagerCreator) + private Default(final Path backupVolumePath, final StoredMessageIndexManager.Creator storedMessageIndexManagerCreator) { this.backupVolumePath = backupVolumePath; this.userUploadedStorageFolderPath = backupVolumePath.resolve("user-uploaded-storage"); - this.offsetManagerCreator = storedOffsetManagerCreator; + this.messageIndexManagerCreator = storedMessageIndexManagerCreator; } @Override @@ -70,7 +70,7 @@ public void deleteBackup(final BackupMetadata backup) throws NodelibraryExceptio @Override public void createAndUploadBackup( final StorageConnection connection, - final OffsetInfo offsetInfo, + final MessageInfo messageInfo, final BackupMetadata backup ) throws NodelibraryException { @@ -80,12 +80,12 @@ public void createAndUploadBackup( connection.issueFullBackup(fs.ensureDirectory(backupRootPath.resolve("storage"))); // TODO: Hardcoded offset file name try ( - final var offsetWriter = this.offsetManagerCreator.create( + final var infoWriter = this.messageIndexManagerCreator.create( fs.ensureFile(backupRootPath.resolve("offset")).tryUseWriting() ) ) { - offsetWriter.set(offsetInfo); + infoWriter.set(messageInfo); } try { diff --git a/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/KafkaOffsetProvider.java 
b/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/KafkaMessageInfoProvider.java similarity index 64% rename from cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/KafkaOffsetProvider.java rename to cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/KafkaMessageInfoProvider.java index b2e8ccf..a5dd684 100644 --- a/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/KafkaOffsetProvider.java +++ b/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/KafkaMessageInfoProvider.java @@ -32,41 +32,45 @@ import static org.eclipse.serializer.util.X.notNull; /** - * Tool to ask kafka for the last microstream offset available + * Tool to ask kafka for the last message info available */ -public class KafkaOffsetProvider implements AutoCloseable +public class KafkaMessageInfoProvider implements AutoCloseable { - public static KafkaOffsetProvider New( - final String topic, - final String groupInstanceId, - final KafkaPropertiesProvider kafkaPropertiesProvider - ) - { - return new KafkaOffsetProvider(notNull(topic), notNull(groupInstanceId), notNull(kafkaPropertiesProvider)); - } - - private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetProvider.class); + public static KafkaMessageInfoProvider New( + final String topic, + final String groupInstanceId, + final KafkaPropertiesProvider kafkaPropertiesProvider + ) + { + return new KafkaMessageInfoProvider( + notNull(topic), + notNull(groupInstanceId), + notNull(kafkaPropertiesProvider) + ); + } + + private static final Logger LOG = LoggerFactory.getLogger(KafkaMessageInfoProvider.class); private static final long PARTITION_ASSIGNMENT_TIMEOUT_MS = 10_000L; private static final Duration POLL_TIMEOUT = Duration.ofMillis(250L); - + private final KafkaConsumer kafka; - private final KafkaPropertiesProvider kafkaPropertiesProvider; + private final KafkaPropertiesProvider kafkaPropertiesProvider; private final String topic; - private KafkaOffsetProvider( - final String topic, - final String groupInstanceId, - final KafkaPropertiesProvider kafkaPropertiesProvider - ) - { - this.topic = topic; - this.kafkaPropertiesProvider = kafkaPropertiesProvider; - this.kafka = this.createKafkaConsumer(groupInstanceId); - } - + private KafkaMessageInfoProvider( + final String topic, + final String groupInstanceId, + final KafkaPropertiesProvider kafkaPropertiesProvider + ) + { + this.topic = topic; + this.kafkaPropertiesProvider = kafkaPropertiesProvider; + this.kafka = this.createKafkaConsumer(groupInstanceId); + } + private KafkaConsumer createKafkaConsumer(final String groupInstanceId) { - final var properties = this.kafkaPropertiesProvider.provide(); + final var properties = this.kafkaPropertiesProvider.provide(); properties.setProperty(GROUP_ID_CONFIG, groupInstanceId); properties.setProperty(GROUP_INSTANCE_ID_CONFIG, groupInstanceId); properties.setProperty(CLIENT_ID_CONFIG, groupInstanceId); @@ -78,19 +82,19 @@ private KafkaConsumer createKafkaConsumer(final String groupInst properties.setProperty(ISOLATION_LEVEL_CONFIG, READ_COMMITTED.toString().toLowerCase(Locale.ROOT)); return new KafkaConsumer<>(properties); } - + @Override public void close() throws KafkaException { - LOG.trace("Closing KafkaOffsetProvider."); + LOG.trace("Closing KafkaMessageInfoProvider"); this.kafka.close(); } - + public void init() throws KafkaException { - 
LOG.trace("Initializing kafka offset provider. Subscribing to topic {}", this.topic); + LOG.trace("Initializing KafkaMessageInfoProvider. Subscribing to topic {}", this.topic); this.kafka.subscribe(Collections.singleton(this.topic)); - + LOG.trace("Polling consumer until we have partitions assigned."); final long startMs = System.currentTimeMillis(); final long endMs = startMs + PARTITION_ASSIGNMENT_TIMEOUT_MS; @@ -103,46 +107,46 @@ public void init() throws KafkaException this.kafka.poll(POLL_TIMEOUT); } } - + /** * Creates a new kafka consumer and asks for the last message in the topic, - * returns the microstream offset of that message + * returns the message index of that message */ - public long provideLatestOffset() throws KafkaException + public long provideLatestMessageIndex() throws KafkaException { long lastMessageIndex = Long.MIN_VALUE; - + this.seekToLastOffsets(); - -// LOG.trace("Polling latest messages for topic {}", this.topic); + + // LOG.trace("Polling latest messages for topic {}", this.topic); for (final var rec : this.kafka.poll(POLL_TIMEOUT)) { - final long messageIndex = ClusterStorageBinaryDistributedKafka.deserializeLong( - rec.headers().lastHeader(ClusterStorageBinaryDistributedKafka.keyMessageIndex()).value() - ); + final long messageIndex = ClusterStorageBinaryDistributedKafka.deserializeLong( + rec.headers().lastHeader(ClusterStorageBinaryDistributedKafka.keyMessageIndex()).value() + ); if (messageIndex > lastMessageIndex) { lastMessageIndex = messageIndex; } } - + return lastMessageIndex; } - public OffsetInfo provideLatestOffsetInfo() - { - final var msOffset = this.provideLatestOffset(); + public MessageInfo provideLatestMessageInfo() + { + final var messageIndex = this.provideLatestMessageIndex(); - final EqHashTable kafkaOffsets = EqHashTable.New(); - for (final var partition : this.kafka.assignment()) - { - final long offset = this.kafka.position(partition); - kafkaOffsets.put(partition, offset); - } + final EqHashTable kafkaOffsets = EqHashTable.New(); + for (final var partition : this.kafka.assignment()) + { + final long offset = this.kafka.position(partition); + kafkaOffsets.put(partition, offset); + } + + return MessageInfo.New(messageIndex, kafkaOffsets.immure()); + } - return OffsetInfo.New(msOffset, kafkaOffsets.immure()); - } - private void seekToLastOffsets() throws KafkaException { LOG.trace("Seeking to last partition messages"); diff --git a/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/OffsetInfo.java b/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/MessageInfo.java similarity index 67% rename from cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/OffsetInfo.java rename to cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/MessageInfo.java index 6a7ec76..a60adb5 100644 --- a/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/OffsetInfo.java +++ b/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/MessageInfo.java @@ -20,25 +20,25 @@ import org.eclipse.serializer.chars.VarString; import org.eclipse.serializer.collections.types.XImmutableMap; -public interface OffsetInfo +public interface MessageInfo { - long msOffset(); + long messageIndex(); XImmutableMap kafkaPartitionOffsets(); - static OffsetInfo New(final long msOffset, final XImmutableMap kafkaPartitionOffsets) + static MessageInfo 
New(final long messageIndex, final XImmutableMap kafkaPartitionOffsets) { - return new Default(msOffset, notNull(kafkaPartitionOffsets)); + return new Default(messageIndex, notNull(kafkaPartitionOffsets)); } - final class Default implements OffsetInfo + final class Default implements MessageInfo { - private final long msOffset; + private final long messageIndex; private final XImmutableMap kafkaPartitionOffsets; - private Default(final long msOffset, final XImmutableMap kafkaPartitionOffsets) + private Default(final long messageIndex, final XImmutableMap kafkaPartitionOffsets) { - this.msOffset = msOffset; + this.messageIndex = messageIndex; this.kafkaPartitionOffsets = kafkaPartitionOffsets; } @@ -49,17 +49,17 @@ public XImmutableMap kafkaPartitionOffsets() } @Override - public long msOffset() + public long messageIndex() { - return this.msOffset; + return this.messageIndex; } @Override public String toString() { return VarString.New() - .add("OffsetInfo{msOffset=") - .add(this.msOffset) + .add("MessageInfo{messageIndex=") + .add(this.messageIndex) .add(",kafkaPartitionOffsets=") .add(this.kafkaPartitionOffsets.toString()) .add('}') diff --git a/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/NetworkArchiveBackupBackend.java b/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/NetworkArchiveBackupBackend.java index 8351a70..afe6f8a 100644 --- a/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/NetworkArchiveBackupBackend.java +++ b/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/NetworkArchiveBackupBackend.java @@ -35,13 +35,13 @@ public interface NetworkArchiveBackupBackend extends StorageBackupBackend static NetworkArchiveBackupBackend New( final Path storageExportScratchSpacePath, final BackupProxyHttpClient backupProxyHttpClient, - final StoredOffsetManager.Creator storedOffsetManagerCreator + final StoredMessageIndexManager.Creator storedMessageInfoManagerCreator ) { return new Default( notNull(storageExportScratchSpacePath), notNull(backupProxyHttpClient), - notNull(storedOffsetManagerCreator) + notNull(storedMessageInfoManagerCreator) ); } @@ -52,17 +52,17 @@ final class Default implements NetworkArchiveBackupBackend private final Path storageExportScratchSpacePath; private final BackupProxyHttpClient http; - private final StoredOffsetManager.Creator offsetManagerCreator; + private final StoredMessageIndexManager.Creator messageInfoManagerCreator; private Default( final Path storageExportScratchSpacePath, final BackupProxyHttpClient backupProxyHttpClient, - final StoredOffsetManager.Creator storedOffsetManagerCreator + final StoredMessageIndexManager.Creator storedMessageInfoManagerCreator ) { this.storageExportScratchSpacePath = storageExportScratchSpacePath; this.http = backupProxyHttpClient; - this.offsetManagerCreator = storedOffsetManagerCreator; + this.messageInfoManagerCreator = storedMessageInfoManagerCreator; } @Override @@ -96,7 +96,7 @@ public void deleteBackup(final BackupMetadata backup) throws NodelibraryExceptio @Override public void createAndUploadBackup( final StorageConnection connection, - final OffsetInfo offsetInfo, + final MessageInfo messageInfo, final BackupMetadata backup ) throws NodelibraryException { @@ -112,12 +112,12 @@ public void createAndUploadBackup( connection.issueFullBackup(fs.ensureDirectory(this.storageExportScratchSpacePath.resolve("storage"))); // TODO: Hardcoded offset 
file name try ( - final var offsetWriter = this.offsetManagerCreator.create( + final var infoWriter = this.messageInfoManagerCreator.create( fs.ensureFile(this.storageExportScratchSpacePath.resolve("offset")).tryUseWriting() ) ) { - offsetWriter.set(offsetInfo); + infoWriter.set(messageInfo); } this.compressStorage(this.storageExportScratchSpacePath.toString(), archiveFilePath); @@ -175,16 +175,16 @@ public boolean hasUserUploadedStorage() throws NodelibraryException private void clearScratchSpace() { final Path storagePath = this.storageExportScratchSpacePath.resolve("storage"); - final Path offsetPath = this.storageExportScratchSpacePath.resolve("offset"); + final Path messageInfoPath = this.storageExportScratchSpacePath.resolve("offset"); if (Files.exists(storagePath)) { this.deleteDirectory(storagePath); } - if(Files.exists(offsetPath)) + if(Files.exists(messageInfoPath)) { - this.deleteFile(offsetPath); + this.deleteFile(messageInfoPath); } } diff --git a/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/StorageBackupBackend.java b/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/StorageBackupBackend.java index 9a20417..2d81372 100644 --- a/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/StorageBackupBackend.java +++ b/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/StorageBackupBackend.java @@ -43,7 +43,7 @@ default BackupMetadata latestBackup(final boolean ignoreManualSlot) throws Nodel void deleteBackup(BackupMetadata backup) throws NodelibraryException; - void createAndUploadBackup(StorageConnection connection, final OffsetInfo offsetInfo, BackupMetadata backup) + void createAndUploadBackup(StorageConnection connection, final MessageInfo messageInfo, BackupMetadata backup) throws NodelibraryException; void downloadBackup(Path storageDestinationParentPath, BackupMetadata backup) throws NodelibraryException; diff --git a/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/StorageBackupManager.java b/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/StorageBackupManager.java index 969e192..6f24e19 100644 --- a/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/StorageBackupManager.java +++ b/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/StorageBackupManager.java @@ -59,7 +59,7 @@ static StorageBackupManager New( final int maxBackupCount, final StorageBackupBackend storageBackupBackend, - final Supplier offsetSupplier, + final Supplier messageInfoSupplier, final ClusterStorageBinaryDataClient dataClient ) { @@ -67,7 +67,7 @@ static StorageBackupManager New( notNull(storageConnection), positive(maxBackupCount), notNull(storageBackupBackend), - notNull(offsetSupplier), + notNull(messageInfoSupplier), notNull(dataClient) ); } @@ -79,7 +79,7 @@ class Default implements StorageBackupManager private final StorageConnection storageConnection; private final int maxBackupCount; private final StorageBackupBackend backend; - private final Supplier offsetSupplier; + private final Supplier messageInfoSupplier; private final ClusterStorageBinaryDataClient dataClient; private Default( @@ -87,14 +87,14 @@ private Default( final int maxBackupCount, final StorageBackupBackend backupBackend, - final Supplier offsetSupplier, + final Supplier 
messageInfoSupplier, final ClusterStorageBinaryDataClient dataClient ) { this.storageConnection = storageConnection; this.maxBackupCount = maxBackupCount; this.backend = backupBackend; - this.offsetSupplier = offsetSupplier; + this.messageInfoSupplier = messageInfoSupplier; this.dataClient = dataClient; } @@ -127,7 +127,7 @@ public void createStorageBackup(final boolean useManualSlot) throws NodelibraryE this.stopDataClient(); - this.backend.createAndUploadBackup(this.storageConnection, this.offsetSupplier.get(), newBackup); + this.backend.createAndUploadBackup(this.storageConnection, this.messageInfoSupplier.get(), newBackup); this.dataClient.resume(); } @@ -183,7 +183,7 @@ public List listBackups() throws NodelibraryException private void stopDataClient() { LOG.trace("Waiting for data client to stop reading"); - this.dataClient.stopAtLatestOffset(); + this.dataClient.stopAtLatestMessage(); while (this.dataClient.isRunning()) { XThreads.sleep(500); diff --git a/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/StorageNodeHealthCheck.java b/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/StorageNodeHealthCheck.java index 5973b5b..2ee0973 100644 --- a/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/StorageNodeHealthCheck.java +++ b/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/StorageNodeHealthCheck.java @@ -58,7 +58,7 @@ final class Default implements StorageNodeHealthCheck private static final Logger LOG = LoggerFactory.getLogger(StorageNodeHealthCheck.class); private static final long KAFKA_MESSAGE_LAG_TOLERANCE = 10; - private final KafkaOffsetProvider kafka; + private final KafkaMessageInfoProvider kafka; private final StorageController storageController; private final ClusterStorageBinaryDataClient dataClient; @@ -72,7 +72,7 @@ private Default( final KafkaPropertiesProvider kafkaPropertiesProvider ) { - this.kafka = KafkaOffsetProvider.New(topic, groupId, kafkaPropertiesProvider); + this.kafka = KafkaMessageInfoProvider.New(topic, groupId, kafkaPropertiesProvider); this.storageController = storageController; this.dataClient = dataClient; } @@ -135,25 +135,25 @@ private synchronized boolean isKafkaReady() throws KafkaException return this.tryRun(() -> { - final long latestOffset = this.kafka.provideLatestOffset(); - final long currentOffset = this.dataClient.offsetInfo().msOffset(); + final long latestMessageIndex = this.kafka.provideLatestMessageIndex(); + final long currentMessageIndex = this.dataClient.messageInfo().messageIndex(); - if (LOG.isTraceEnabled() && latestOffset - currentOffset > 1_000) + if (LOG.isTraceEnabled() && latestMessageIndex - currentMessageIndex > 1_000) { LOG.trace( - "Current Offset: {}, Latest Offset: {}, Difference: {}", - currentOffset, - latestOffset, - latestOffset - currentOffset + "Current MessageIndex: {}, Latest MessageIndex: {}, Difference: {}", + currentMessageIndex, + latestMessageIndex, + latestMessageIndex - currentMessageIndex ); } - if (currentOffset >= latestOffset) + if (currentMessageIndex >= latestMessageIndex) { return true; } - return latestOffset - currentOffset <= KAFKA_MESSAGE_LAG_TOLERANCE; + return latestMessageIndex - currentMessageIndex <= KAFKA_MESSAGE_LAG_TOLERANCE; }); } @@ -173,7 +173,7 @@ public synchronized void close() } catch (final KafkaException e) { - LOG.error("Failed to close kafka offset getter", e); + LOG.error("Failed to close 
KafkaMessageInfoProvider", e); } this.isActive = false; diff --git a/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/StorageNodeManager.java b/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/StorageNodeManager.java index b33d587..9b281a4 100644 --- a/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/StorageNodeManager.java +++ b/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/StorageNodeManager.java @@ -43,7 +43,7 @@ static StorageNodeManager New( final StorageNodeHealthCheck healthCheck, final StorageController storageController, final StorageDiskSpaceReader storageDiskSpaceReader, - final KafkaOffsetProvider kafkaOffsetProvider + final KafkaMessageInfoProvider kafkaMessageInfoProvider ) { return new Default( @@ -53,7 +53,7 @@ static StorageNodeManager New( notNull(healthCheck), notNull(storageController), notNull(storageDiskSpaceReader), - notNull(kafkaOffsetProvider) + notNull(kafkaMessageInfoProvider) ); } @@ -67,7 +67,7 @@ final class Default implements StorageNodeManager private final StorageNodeHealthCheck healthCheck; private final StorageController storageController; private final StorageDiskSpaceReader storageDiskSpaceReader; - private final KafkaOffsetProvider kafkaOffsetProvider; + private final KafkaMessageInfoProvider kafkaMessageInfoProvider; private boolean isDistributor; private boolean isSwitchingToDistributor; @@ -79,7 +79,7 @@ public Default( final StorageNodeHealthCheck healthCheck, final StorageController storageController, final StorageDiskSpaceReader storageDiskSpaceReader, - final KafkaOffsetProvider kafkaOffsetProvider + final KafkaMessageInfoProvider kafkaMessageInfoProvider ) { this.dataDistributor = dataDistributor; @@ -88,7 +88,7 @@ public Default( this.storageController = storageController; this.storageDiskSpaceReader = storageDiskSpaceReader; this.storageTaskExecutor = storageTaskExecutor; - this.kafkaOffsetProvider = kafkaOffsetProvider; + this.kafkaMessageInfoProvider = kafkaMessageInfoProvider; } @Override @@ -151,7 +151,7 @@ public void switchToDistribution() LOG.info("Turning on distribution."); this.isSwitchingToDistributor = true; - this.dataClient.stopAtLatestOffset(); + this.dataClient.stopAtLatestMessage(); } @Override @@ -172,12 +172,12 @@ public boolean finishDistributonSwitch() throws NotADistributorException return false; } - final var offsetInfo = this.dataClient.offsetInfo(); + final var messageInfo = this.dataClient.messageInfo(); // once a node has been switched to distribution it will never become a reader node anymore this.healthCheck.close(); this.dataClient.dispose(); - this.dataDistributor.offset(offsetInfo.msOffset()); + this.dataDistributor.messageIndex(messageInfo.messageIndex()); this.isDistributor = true; this.isSwitchingToDistributor = false; @@ -189,18 +189,18 @@ public long getCurrentMessageIndex() { if (this.isDistributor()) { - return this.dataDistributor.offset(); + return this.dataDistributor.messageIndex(); } else { - return this.dataClient.offsetInfo().msOffset(); + return this.dataClient.messageInfo().messageIndex(); } } @Override public long getLatestMessageIndex() { - return this.kafkaOffsetProvider.provideLatestOffset(); + return this.kafkaMessageInfoProvider.provideLatestMessageIndex(); } @Override diff --git a/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/StoredMessageIndexManager.java 
b/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/StoredMessageIndexManager.java
new file mode 100644
index 0000000..cffaf32
--- /dev/null
+++ b/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/StoredMessageIndexManager.java
@@ -0,0 +1,226 @@
+package org.eclipse.datagrid.cluster.nodelibrary.types;
+
+/*-
+ * #%L
+ * Eclipse Data Grid Cluster Nodelibrary
+ * %%
+ * Copyright (C) 2025 MicroStream Software
+ * %%
+ * This program and the accompanying materials are made
+ * available under the terms of the Eclipse Public License 2.0
+ * which is available at https://www.eclipse.org/legal/epl-2.0/
+ *
+ * SPDX-License-Identifier: EPL-2.0
+ * #L%
+ */
+
+import static org.eclipse.serializer.util.X.notNull;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.kafka.common.TopicPartition;
+import org.eclipse.datagrid.cluster.nodelibrary.exceptions.NodelibraryException;
+import org.eclipse.serializer.afs.types.AWritableFile;
+import org.eclipse.serializer.chars.VarString;
+import org.eclipse.serializer.collections.EqHashTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public interface StoredMessageIndexManager extends AutoCloseable
+{
+	MessageInfo get() throws NodelibraryException;
+
+	void set(MessageInfo messageInfo) throws NodelibraryException;
+
+	@Override
+	void close();
+
+	static StoredMessageIndexManager New(final AWritableFile messageInfoFile)
+	{
+		return new Default(notNull(messageInfoFile));
+	}
+
+	@FunctionalInterface
+	interface Creator
+	{
+		StoredMessageIndexManager create(AWritableFile offsetFile);
+	}
+
+	final class Default implements StoredMessageIndexManager
+	{
+		private static final Logger LOG = LoggerFactory.getLogger(StoredMessageIndexManager.class);
+
+		private final AWritableFile messageInfoFile;
+
+		private boolean closed = false;
+		private boolean initialized = false;
+
+		private MessageInfo messageInfo;
+
+		private Default(final AWritableFile messageInfoFile)
+		{
+			this.messageInfoFile = messageInfoFile;
+		}
+
+		@Override
+		public MessageInfo get() throws NodelibraryException
+		{
+			this.ensureInit();
+			return this.messageInfo;
+		}
+
+		@Override
+		public void set(final MessageInfo messageInfo) throws NodelibraryException
+		{
+			this.ensureInit();
+
+			final var str = VarString.New();
+			str.add(messageInfo.messageIndex()).lf();
+			messageInfo.kafkaPartitionOffsets()
+				.forEach(
+					entry -> str.add(entry.key().topic())
+						.add(',')
+						.add(entry.key().partition())
+						.add(',')
+						.add(entry.value().longValue())
+						.lf()
+				);
+
+			final var buffer = ByteBuffer.wrap(str.encode());
+
+			try
+			{
+				this.messageInfoFile.truncate(0);
+				// for some reason 0x0a was added to the end once, maybe some afs weirdness?
+				final long written = this.messageInfoFile.writeBytes(buffer);
+				if (LOG.isDebugEnabled() && messageInfo.messageIndex() % 10_000 == 0)
+				{
+					LOG.debug("Stored message index {}, written {} bytes", messageInfo.messageIndex(), written);
+				}
+			}
+			catch (final RuntimeException e)
+			{
+				throw new NodelibraryException("Failed to write message info file", e);
+			}
+
+			this.messageInfo = messageInfo;
+		}
+
+		private void ensureInit() throws NodelibraryException
+		{
+			if (this.initialized)
+			{
+				return;
+			}
+
+			LOG.info("Initializing StoredMessageIndexManager");
+
+			final boolean createdNew = this.messageInfoFile.ensureExists();
+
+			if (createdNew)
+			{
+				LOG.debug("New message info file has been created.");
+				final EqHashTable<TopicPartition, Long> partitionOffsets = EqHashTable.New();
+				this.messageInfo = MessageInfo.New(Long.MIN_VALUE, partitionOffsets.immure());
+			}
+			else
+			{
+				LOG.debug("Reading existing message info file.");
+				final ByteBuffer fileBytesBuffer;
+				try
+				{
+					fileBytesBuffer = this.messageInfoFile.readBytes();
+				}
+				catch (final NodelibraryException e)
+				{
+					throw new NodelibraryException("Failed to read message info file", e);
+				}
+
+				if (fileBytesBuffer.remaining() == 0)
+				{
+					LOG.debug("Previous message info file is empty");
+					final EqHashTable<TopicPartition, Long> partitionOffsets = EqHashTable.New();
+					this.messageInfo = MessageInfo.New(Long.MIN_VALUE, partitionOffsets.immure());
+				}
+				else
+				{
+					this.messageInfo = this.parseMessageInfo(fileBytesBuffer);
+					LOG.debug("Read previous message index at {}", this.messageInfo.messageIndex());
+				}
+			}
+
+			this.initialized = true;
+		}
+
+		private MessageInfo parseMessageInfo(final ByteBuffer buffer) throws NodelibraryException
+		{
+			final var bytes = new byte[buffer.remaining()];
+			buffer.get(bytes);
+			final String[] rows = new String(bytes, StandardCharsets.UTF_8).trim().split("\n");
+			try
+			{
+				// parse message index
+				final long messageIndex = Long.parseLong(rows[0]);
+
+				// parse partition offsets
+				final EqHashTable<TopicPartition, Long> partitionOffsets = EqHashTable.New();
+				for (int i = 1; i < rows.length; i++)
+				{
+					final String[] cols = rows[i].split(",");
+					if (cols.length != 3)
+					{
+						throw new NodelibraryException(
+							"Offset Partition column formatting wrong, expected 3 comma separated columns: " + rows[i]
+						);
+					}
+
+					final String topic = cols[0];
+					final int partition = Integer.parseInt(cols[1]);
+					final long offset = Long.parseLong(cols[2]);
+
+					final var topicPartition = new TopicPartition(topic, partition);
+
+					LOG.debug("Parsed partition {} at offset {}", topicPartition, offset);
+					if (partitionOffsets.get(topicPartition) != null)
+					{
+						throw new NodelibraryException("Offset file contains duplicate partition " + partition);
+					}
+					partitionOffsets.put(topicPartition, offset);
+				}
+
+				return MessageInfo.New(messageIndex, partitionOffsets.immure());
+			}
+			catch (final NumberFormatException | IndexOutOfBoundsException | NodelibraryException e)
+			{
+				if (e instanceof NodelibraryException)
+				{
+					throw e;
+				}
+				throw new NodelibraryException("Failed to parse message info file", e);
+			}
+		}
+
+		@Override
+		public void close()
+		{
+			if (this.closed)
+			{
+				return;
+
+			}
+			LOG.trace("Closing StoredMessageIndexManager");
+
+			try
+			{
+				this.messageInfoFile.release();
+			}
+			catch (final RuntimeException e)
+			{
+				LOG.error("Failed to release message info file", e);
+			}
+
+			this.closed = true;
+		}
+	}
+}
diff --git a/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/StoredOffsetManager.java
b/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/StoredOffsetManager.java deleted file mode 100644 index 26aa0c4..0000000 --- a/cluster/nodelibrary/nodelibrary/src/main/java/org/eclipse/datagrid/cluster/nodelibrary/types/StoredOffsetManager.java +++ /dev/null @@ -1,224 +0,0 @@ -package org.eclipse.datagrid.cluster.nodelibrary.types; - -/*- - * #%L - * Eclipse Data Grid Cluster Nodelibrary - * %% - * Copyright (C) 2025 MicroStream Software - * %% - * This program and the accompanying materials are made - * available under the terms of the Eclipse Public License 2.0 - * which is available at https://www.eclipse.org/legal/epl-2.0/ - * - * SPDX-License-Identifier: EPL-2.0 - * #L% - */ - -import static org.eclipse.serializer.util.X.notNull; - -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; - -import org.apache.kafka.common.TopicPartition; -import org.eclipse.datagrid.cluster.nodelibrary.exceptions.NodelibraryException; -import org.eclipse.serializer.afs.types.AWritableFile; -import org.eclipse.serializer.chars.VarString; -import org.eclipse.serializer.collections.EqHashTable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -public interface StoredOffsetManager extends AutoCloseable -{ - OffsetInfo get() throws NodelibraryException; - - void set(OffsetInfo offsetInfo) throws NodelibraryException; - - @Override - void close(); - - static StoredOffsetManager New(final AWritableFile offsetFile) - { - return new Default(notNull(offsetFile)); - } - - @FunctionalInterface - interface Creator - { - StoredOffsetManager create(AWritableFile offsetFile); - } - - final class Default implements StoredOffsetManager - { - private static final Logger LOG = LoggerFactory.getLogger(StoredOffsetManager.class); - - private final AWritableFile offsetFile; - - private boolean closed = false; - private boolean initialized = false; - - private OffsetInfo offsetInfo; - - private Default(final AWritableFile offsetFile) - { - this.offsetFile = offsetFile; - } - - @Override - public OffsetInfo get() throws NodelibraryException - { - this.ensureInit(); - return this.offsetInfo; - } - - @Override - public void set(final OffsetInfo offsetInfo) throws NodelibraryException - { - this.ensureInit(); - - final var str = VarString.New(); - str.add(offsetInfo.msOffset()).lf(); - offsetInfo.kafkaPartitionOffsets() - .forEach( - entry -> str.add(entry.key().topic()) - .add(',') - .add(entry.key().partition()) - .add(',') - .add(entry.value().longValue()) - .lf() - ); - - final var buffer = ByteBuffer.wrap(str.encode()); - - try - { - this.offsetFile.truncate(0); - // for some reason 0x0a was added to the end once, maybe some afs weirdness? 
- final long written = this.offsetFile.writeBytes(buffer); - if (LOG.isDebugEnabled() && offsetInfo.msOffset() % 10_000 == 0) - { - LOG.debug("Stored offset {}, written {} bytes", offsetInfo.msOffset(), written); - } - } - catch (final RuntimeException e) - { - throw new NodelibraryException("Failed to write offset file", e); - } - - this.offsetInfo = offsetInfo; - } - - private void ensureInit() throws NodelibraryException - { - if (this.initialized) - { - return; - } - - LOG.info("Initializing stored offset manager."); - - final boolean createdNew = this.offsetFile.ensureExists(); - - if (createdNew) - { - LOG.debug("New offset file has been created."); - final EqHashTable partitionOffsets = EqHashTable.New(); - this.offsetInfo = OffsetInfo.New(Long.MIN_VALUE, partitionOffsets.immure()); - } - else - { - LOG.debug("Reading existing offset file."); - final ByteBuffer fileBytesBuffer; - try - { - fileBytesBuffer = this.offsetFile.readBytes(); - } - catch (final NodelibraryException e) - { - throw new NodelibraryException("Failed to read offset file", e); - } - - if (fileBytesBuffer.remaining() == 0) - { - LOG.debug("Previous offset file is empty"); - final EqHashTable partitionOffsets = EqHashTable.New(); - this.offsetInfo = OffsetInfo.New(Long.MIN_VALUE, partitionOffsets.immure()); - } - else - { - this.offsetInfo = this.parseOffsetInfo(fileBytesBuffer); - LOG.debug("Read previous offset at {}", this.offsetInfo.msOffset()); - } - } - - this.initialized = true; - } - - private OffsetInfo parseOffsetInfo(final ByteBuffer buffer) throws NodelibraryException - { - final var bytes = new byte[buffer.remaining()]; - buffer.get(bytes); - final String[] rows = new String(bytes, StandardCharsets.UTF_8).trim().split("\n"); - try - { - final long msOffset = Long.parseLong(rows[0]); - final EqHashTable partitionOffsets = EqHashTable.New(); - for (int i = 1; i < rows.length; i++) - { - final String[] cols = rows[i].split(","); - if (cols.length != 3) - { - throw new NodelibraryException( - "Offset Partition column formatting wrong, excpeted 3 comma separated columns: " + rows[i] - ); - } - - final String topic = cols[0]; - final int partition = Integer.parseInt(cols[1]); - final long offset = Long.parseLong(cols[2]); - - final var topicPartition = new TopicPartition(topic, partition); - - LOG.debug("Parsed partition {} at offset {}", topicPartition, offset); - if (partitionOffsets.get(topicPartition) != null) - { - throw new NodelibraryException("Offset file contains duplicate partition " + partition); - } - partitionOffsets.put(topicPartition, offset); - } - - return OffsetInfo.New(msOffset, partitionOffsets.immure()); - } - catch (final NumberFormatException | IndexOutOfBoundsException | NodelibraryException e) - { - if (e instanceof NodelibraryException) - { - throw e; - } - throw new NodelibraryException("Failed to parse offset file", e); - } - } - - @Override - public void close() - { - if (this.closed) - { - return; - - } - LOG.trace("Closing stored offset manager"); - - try - { - this.offsetFile.release(); - } - catch (final RuntimeException e) - { - LOG.error("Failed to release offset file", e); - } - - this.closed = true; - } - } -}