diff --git a/dd-java-agent/agent-llmobs/build.gradle b/dd-java-agent/agent-llmobs/build.gradle index 541dbeb7ec5..faea293da33 100644 --- a/dd-java-agent/agent-llmobs/build.gradle +++ b/dd-java-agent/agent-llmobs/build.gradle @@ -17,6 +17,8 @@ dependencies { implementation project(':communication') implementation project(':components:json') implementation project(':internal-api') + implementation project(':utils:queue-utils') + testImplementation project(':dd-java-agent:testing') } diff --git a/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/EvalProcessingWorker.java b/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/EvalProcessingWorker.java index 1e17f90b22c..20ba18b5507 100644 --- a/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/EvalProcessingWorker.java +++ b/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/EvalProcessingWorker.java @@ -6,6 +6,8 @@ import com.squareup.moshi.JsonAdapter; import com.squareup.moshi.Moshi; +import datadog.common.queue.BlockingConsumerNonBlockingQueue; +import datadog.common.queue.Queues; import datadog.communication.ddagent.DDAgentFeaturesDiscovery; import datadog.communication.ddagent.SharedCommunicationObjects; import datadog.communication.http.HttpRetryPolicy; @@ -20,7 +22,6 @@ import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.RequestBody; -import org.jctools.queues.MpscBlockingConsumerArrayQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,7 +35,7 @@ public class EvalProcessingWorker implements AutoCloseable { private static final Logger log = LoggerFactory.getLogger(EvalProcessingWorker.class); - private final MpscBlockingConsumerArrayQueue queue; + private final BlockingConsumerNonBlockingQueue queue; private final Thread serializerThread; public EvalProcessingWorker( @@ -43,7 +44,7 @@ public EvalProcessingWorker( final TimeUnit timeUnit, final SharedCommunicationObjects sco, Config config) { - this.queue = new MpscBlockingConsumerArrayQueue<>(capacity); + this.queue = Queues.mpscBlockingConsumerArrayQueue(capacity); boolean isAgentless = config.isLlmObsAgentlessEnabled(); if (isAgentless && (config.getApiKey() == null || config.getApiKey().isEmpty())) { @@ -98,7 +99,7 @@ public static class EvalSerializingHandler implements Runnable { private static final Logger log = LoggerFactory.getLogger(EvalSerializingHandler.class); private static final int FLUSH_THRESHOLD = 50; - private final MpscBlockingConsumerArrayQueue queue; + private final BlockingConsumerNonBlockingQueue queue; private final long ticksRequiredToFlush; private long lastTicks; @@ -111,7 +112,7 @@ public static class EvalSerializingHandler implements Runnable { private final List buffer = new ArrayList<>(); public EvalSerializingHandler( - final MpscBlockingConsumerArrayQueue queue, + final BlockingConsumerNonBlockingQueue queue, final long flushInterval, final TimeUnit timeUnit, final HttpUrl submissionUrl, diff --git a/dd-java-agent/build.gradle b/dd-java-agent/build.gradle index 8fe70ddf077..7b45aab54d8 100644 --- a/dd-java-agent/build.gradle +++ b/dd-java-agent/build.gradle @@ -353,6 +353,9 @@ dependencies { sharedShadowInclude project(':utils:socket-utils'), { transitive = false } + sharedShadowInclude project(':utils:queue-utils'), { + transitive = false + } sharedShadowInclude project(':utils:version-utils'), { transitive = false } diff --git a/dd-trace-core/build.gradle b/dd-trace-core/build.gradle index 7b111ed4e38..212125b23c3 100644 --- a/dd-trace-core/build.gradle +++ 
b/dd-trace-core/build.gradle @@ -69,6 +69,7 @@ dependencies { implementation project(':components:json') implementation project(':utils:container-utils') implementation project(':utils:socket-utils') + implementation project(':utils:queue-utils') // for span exception debugging compileOnly project(':dd-java-agent:agent-debugger:debugger-bootstrap') diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java index 049ab311a97..f72be366e7c 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java @@ -2,6 +2,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; +import datadog.common.queue.NonBlockingQueue; import datadog.trace.common.metrics.SignalItem.StopSignal; import datadog.trace.core.util.LRUCache; import java.util.Iterator; @@ -10,8 +11,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; -import org.jctools.queues.MessagePassingQueue; -import org.jctools.queues.MpscCompoundQueue; +import java.util.function.Consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -22,7 +22,7 @@ final class Aggregator implements Runnable { private static final Logger log = LoggerFactory.getLogger(Aggregator.class); private final Queue batchPool; - private final MpscCompoundQueue inbox; + private final NonBlockingQueue inbox; private final LRUCache aggregates; private final ConcurrentMap pending; private final Set commonKeys; @@ -39,7 +39,7 @@ final class Aggregator implements Runnable { Aggregator( MetricWriter writer, Queue batchPool, - MpscCompoundQueue inbox, + NonBlockingQueue inbox, ConcurrentMap pending, final Set commonKeys, int maxAggregates, @@ -60,7 +60,7 @@ final class Aggregator implements Runnable { Aggregator( MetricWriter writer, Queue batchPool, - MpscCompoundQueue inbox, + NonBlockingQueue inbox, ConcurrentMap pending, final Set commonKeys, int maxAggregates, @@ -103,7 +103,7 @@ public void run() { log.debug("metrics aggregator exited"); } - private final class Drainer implements MessagePassingQueue.Consumer { + private final class Drainer implements Consumer { boolean stopped = false; diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java index 9ca468c24b4..cda4b05bb9d 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java @@ -19,6 +19,8 @@ import static java.util.Collections.unmodifiableSet; import static java.util.concurrent.TimeUnit.SECONDS; +import datadog.common.queue.NonBlockingQueue; +import datadog.common.queue.Queues; import datadog.communication.ddagent.DDAgentFeaturesDiscovery; import datadog.communication.ddagent.SharedCommunicationObjects; import datadog.trace.api.Config; @@ -46,8 +48,6 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.function.Function; -import org.jctools.queues.MpscCompoundQueue; -import org.jctools.queues.SpmcArrayQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -93,7 +93,7 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve private final ConcurrentHashMap pending; private final ConcurrentHashMap 
keys; private final Thread thread; - private final MpscCompoundQueue inbox; + private final NonBlockingQueue inbox; private final Sink sink; private final Aggregator aggregator; private final long reportingInterval; @@ -176,8 +176,8 @@ public ConflatingMetricsAggregator( long reportingInterval, TimeUnit timeUnit) { this.ignoredResources = ignoredResources; - this.inbox = new MpscCompoundQueue<>(queueSize); - this.batchPool = new SpmcArrayQueue<>(maxAggregates); + this.inbox = Queues.mpscArrayQueue(queueSize); + this.batchPool = Queues.spmcArrayQueue(maxAggregates); this.pending = new ConcurrentHashMap<>(maxAggregates * 4 / 3); this.keys = new ConcurrentHashMap<>(); this.features = features; diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/OkHttpSink.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/OkHttpSink.java index aa7a735f57d..0143d84cc3b 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/OkHttpSink.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/OkHttpSink.java @@ -9,6 +9,8 @@ import static datadog.trace.common.metrics.EventListener.EventType.OK; import static java.util.concurrent.TimeUnit.SECONDS; +import datadog.common.queue.NonBlockingQueue; +import datadog.common.queue.Queues; import datadog.trace.util.AgentTaskScheduler; import java.io.IOException; import java.nio.ByteBuffer; @@ -23,7 +25,6 @@ import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.RequestBody; -import org.jctools.queues.SpscArrayQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,7 +37,7 @@ public final class OkHttpSink implements Sink, EventListener { private final OkHttpClient client; private final HttpUrl metricsUrl; private final List listeners; - private final SpscArrayQueue enqueuedRequests = new SpscArrayQueue<>(10); + private final NonBlockingQueue enqueuedRequests = Queues.spscArrayQueue(16); private final AtomicLong lastRequestTime = new AtomicLong(); private final AtomicLong asyncRequestCounter = new AtomicLong(); private final boolean bufferingEnabled; @@ -157,9 +158,9 @@ private void handleFailure(okhttp3.Response response) throws IOException { private static final class Sender implements AgentTaskScheduler.Task { - private final SpscArrayQueue inbox; + private final NonBlockingQueue inbox; - private Sender(SpscArrayQueue inbox) { + private Sender(NonBlockingQueue inbox) { this.inbox = inbox; } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/writer/SpanSamplingWorker.java b/dd-trace-core/src/main/java/datadog/trace/common/writer/SpanSamplingWorker.java index 111fcd9e1cc..36635684c3e 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/writer/SpanSamplingWorker.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/writer/SpanSamplingWorker.java @@ -5,6 +5,9 @@ import static datadog.trace.util.AgentThreadFactory.newAgentThread; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import datadog.common.queue.BlockingConsumerNonBlockingQueue; +import datadog.common.queue.NonBlockingQueue; +import datadog.common.queue.Queues; import datadog.communication.ddagent.DroppingPolicy; import datadog.trace.common.sampling.SingleSpanSampler; import datadog.trace.core.DDSpan; @@ -12,8 +15,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Queue; -import org.jctools.queues.MessagePassingQueue; -import org.jctools.queues.MpscBlockingConsumerArrayQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,7 +46,7 @@ class 
DefaultSpanSamplingWorker implements SpanSamplingWorker { private final Thread spanSamplingThread; private final SamplingHandler samplingHandler; - private final MpscBlockingConsumerArrayQueue spanSamplingQueue; + private final BlockingConsumerNonBlockingQueue spanSamplingQueue; private final Queue primaryQueue; private final Queue secondaryQueue; private final SingleSpanSampler singleSpanSampler; @@ -62,7 +63,7 @@ protected DefaultSpanSamplingWorker( DroppingPolicy droppingPolicy) { this.samplingHandler = new SamplingHandler(); this.spanSamplingThread = newAgentThread(SPAN_SAMPLING_PROCESSOR, samplingHandler); - this.spanSamplingQueue = new MpscBlockingConsumerArrayQueue<>(capacity); + this.spanSamplingQueue = Queues.mpscBlockingConsumerArrayQueue(capacity); this.primaryQueue = primaryQueue; this.secondaryQueue = secondaryQueue; this.singleSpanSampler = singleSpanSampler; @@ -172,7 +173,7 @@ public void onEvent(Object event) { } } - private void consumeBatch(MessagePassingQueue queue) { + private void consumeBatch(NonBlockingQueue queue) { queue.drain(this::onEvent, queue.size()); } } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/writer/TraceProcessingWorker.java b/dd-trace-core/src/main/java/datadog/trace/common/writer/TraceProcessingWorker.java index e5bddd5c48d..42202f04e28 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/writer/TraceProcessingWorker.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/writer/TraceProcessingWorker.java @@ -5,6 +5,9 @@ import static datadog.trace.util.AgentThreadFactory.newAgentThread; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import datadog.common.queue.BlockingConsumerNonBlockingQueue; +import datadog.common.queue.NonBlockingQueue; +import datadog.common.queue.Queues; import datadog.communication.ddagent.DroppingPolicy; import datadog.trace.api.Config; import datadog.trace.bootstrap.instrumentation.api.SpanPostProcessor; @@ -19,8 +22,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.function.BooleanSupplier; -import org.jctools.queues.MessagePassingQueue; -import org.jctools.queues.MpscBlockingConsumerArrayQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,8 +37,8 @@ public class TraceProcessingWorker implements AutoCloseable { private static final Logger log = LoggerFactory.getLogger(TraceProcessingWorker.class); private final PrioritizationStrategy prioritizationStrategy; - private final MpscBlockingConsumerArrayQueue primaryQueue; - private final MpscBlockingConsumerArrayQueue secondaryQueue; + private final BlockingConsumerNonBlockingQueue primaryQueue; + private final BlockingConsumerNonBlockingQueue secondaryQueue; private final TraceSerializingHandler serializingHandler; private final Thread serializerThread; private final int capacity; @@ -121,14 +122,14 @@ public long getRemainingCapacity() { return primaryQueue.remainingCapacity(); } - private static MpscBlockingConsumerArrayQueue createQueue(int capacity) { - return new MpscBlockingConsumerArrayQueue<>(capacity); + private static BlockingConsumerNonBlockingQueue createQueue(int capacity) { + return Queues.mpscBlockingConsumerArrayQueue(capacity); } public static class TraceSerializingHandler implements Runnable { - private final MpscBlockingConsumerArrayQueue primaryQueue; - private final MpscBlockingConsumerArrayQueue secondaryQueue; + private final BlockingConsumerNonBlockingQueue primaryQueue; + private final BlockingConsumerNonBlockingQueue secondaryQueue; 
private final HealthMetrics healthMetrics; private final long ticksRequiredToFlush; private final boolean doTimeFlush; @@ -136,8 +137,8 @@ public static class TraceSerializingHandler implements Runnable { private long lastTicks; public TraceSerializingHandler( - final MpscBlockingConsumerArrayQueue primaryQueue, - final MpscBlockingConsumerArrayQueue secondaryQueue, + final BlockingConsumerNonBlockingQueue primaryQueue, + final BlockingConsumerNonBlockingQueue secondaryQueue, final HealthMetrics healthMetrics, final PayloadDispatcher payloadDispatcher, final long flushInterval, @@ -238,7 +239,7 @@ private boolean shouldFlush() { return false; } - private void consumeBatch(MessagePassingQueue queue) { + private void consumeBatch(NonBlockingQueue queue) { queue.drain(this::onEvent, queue.size()); } diff --git a/dd-trace-core/src/main/java/datadog/trace/core/PendingTraceBuffer.java b/dd-trace-core/src/main/java/datadog/trace/core/PendingTraceBuffer.java index 0057eb2ce7d..8f67c6c662a 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/PendingTraceBuffer.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/PendingTraceBuffer.java @@ -6,6 +6,8 @@ import static datadog.trace.util.AgentThreadFactory.newAgentThread; import static java.util.Comparator.comparingLong; +import datadog.common.queue.BlockingConsumerNonBlockingQueue; +import datadog.common.queue.Queues; import datadog.communication.ddagent.SharedCommunicationObjects; import datadog.trace.api.Config; import datadog.trace.api.flare.TracerFlare; @@ -18,10 +20,10 @@ import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; import java.util.function.Predicate; +import java.util.function.Supplier; import java.util.zip.ZipOutputStream; -import org.jctools.queues.MessagePassingQueue; -import org.jctools.queues.MpscBlockingConsumerArrayQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,7 +63,7 @@ private static class DelayingPendingTraceBuffer extends PendingTraceBuffer { private static final CommandElement DUMP_ELEMENT = new CommandElement(); private static final CommandElement STAND_IN_ELEMENT = new CommandElement(); - private final MpscBlockingConsumerArrayQueue queue; + private final BlockingConsumerNonBlockingQueue queue; private final Thread worker; private final TimeSource timeSource; @@ -136,7 +138,7 @@ public void flush() { } } - private static final class WriteDrain implements MessagePassingQueue.Consumer { + private static final class WriteDrain implements Consumer { private static final WriteDrain WRITE_DRAIN = new WriteDrain(); @Override @@ -145,8 +147,7 @@ public void accept(Element pendingTrace) { } } - private static final class DumpDrain - implements MessagePassingQueue.Consumer, MessagePassingQueue.Supplier { + private static final class DumpDrain implements Consumer, Supplier { private static final Logger LOGGER = LoggerFactory.getLogger(DumpDrain.class); private static final DumpDrain DUMP_DRAIN = new DumpDrain(); private static final int MAX_DUMPED_TRACES = 50; @@ -292,7 +293,7 @@ public DelayingPendingTraceBuffer( Config config, SharedCommunicationObjects sharedCommunicationObjects, HealthMetrics healthMetrics) { - this.queue = new MpscBlockingConsumerArrayQueue<>(bufferSize); + this.queue = Queues.mpscBlockingConsumerArrayQueue(bufferSize); this.worker = newAgentThread(TRACE_MONITOR, new Worker()); this.timeSource = timeSource; boolean runningSpansEnabled = config.isLongRunningTraceEnabled(); diff --git 
a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java index 931d3583afe..4b1a65cb364 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java @@ -11,6 +11,8 @@ import static datadog.trace.util.AgentThreadFactory.THREAD_JOIN_TIMOUT_MS; import static datadog.trace.util.AgentThreadFactory.newAgentThread; +import datadog.common.queue.NonBlockingQueue; +import datadog.common.queue.Queues; import datadog.communication.ddagent.DDAgentFeaturesDiscovery; import datadog.communication.ddagent.SharedCommunicationObjects; import datadog.context.propagation.Propagator; @@ -38,7 +40,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; -import org.jctools.queues.MpscArrayQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,7 +54,7 @@ public class DefaultDataStreamsMonitoring implements DataStreamsMonitoring, Even new StatsPoint(DataStreamsTags.EMPTY, 0, 0, 0, 0, 0, 0, 0, null); private final Map> timeToBucket = new HashMap<>(); - private final MpscArrayQueue inbox = new MpscArrayQueue<>(1024); + private final NonBlockingQueue inbox = Queues.mpscArrayQueue(1024); private final DatastreamsPayloadWriter payloadWriter; private final DDAgentFeaturesDiscovery features; private final TimeSource timeSource; diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/writer/SpanSamplingWorkerTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/writer/SpanSamplingWorkerTest.groovy index b5f265168f7..e9cfc737297 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/writer/SpanSamplingWorkerTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/writer/SpanSamplingWorkerTest.groovy @@ -104,9 +104,9 @@ class SpanSamplingWorkerTest extends DDSpecification { singleSpanSampler.setSamplingPriority(span7) >> true when: - worker.getSpanSamplingQueue().offer([span1, span2, span3]) - worker.getSpanSamplingQueue().offer([span4, span5]) - worker.getSpanSamplingQueue().offer([span6, span7]) + assert worker.getSpanSamplingQueue().offer([span1, span2, span3]) + assert worker.getSpanSamplingQueue().offer([span4, span5]) + assert worker.getSpanSamplingQueue().offer([span6, span7]) then: primaryQueue.take() == [span1, span3] diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 5c04909b194..ea844ca1ad1 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -27,6 +27,7 @@ final class CachedData { exclude(project(':utils:config-utils')) exclude(project(':utils:container-utils')) exclude(project(':utils:filesystem-utils')) + exclude(project(':utils:queue-utils')) exclude(project(':utils:socket-utils')) exclude(project(':utils:time-utils')) exclude(project(':utils:version-utils')) diff --git a/internal-api/src/main/java/datadog/trace/util/BitUtils.java b/internal-api/src/main/java/datadog/trace/util/BitUtils.java new file mode 100644 index 00000000000..8426c4e1cc4 --- /dev/null +++ b/internal-api/src/main/java/datadog/trace/util/BitUtils.java @@ -0,0 +1,34 @@ +package datadog.trace.util; + +public final class BitUtils { + private BitUtils() {} + + /** + * Returns the next power of two greater than or equal to the given value. 
If the input is zero or + * negative, this method returns 1; + * + * @param value the input value + * @return the next power of two ≥ {@code value} + */ + public static int nextPowerOfTwo(int value) { + // The next power of two for 0 or 1 is 1. + if (value <= 1) { + return 1; + } + + // Compute how many leading zero bits there are in (value - 1). This gives us information about + // where the highest set bit is. + int n = Integer.numberOfLeadingZeros(value - 1); + + // -1 in two's complement = 0xFFFF_FFFF (all bits set to 1). Unsigned right-shifting by n: (-1 + // >>> n) produces a mask of (32 - n) one-bits. + int result = (-1 >>> n) + 1; + + // If result overflowed clamp it to the largest unsigned power of two fitting an int. + if (result <= 0) { + return 1 << 30; + } + + return result; + } +} diff --git a/internal-api/src/test/groovy/datadog/trace/util/BitUtilsTest.groovy b/internal-api/src/test/groovy/datadog/trace/util/BitUtilsTest.groovy new file mode 100644 index 00000000000..fb85a2100d9 --- /dev/null +++ b/internal-api/src/test/groovy/datadog/trace/util/BitUtilsTest.groovy @@ -0,0 +1,42 @@ +package datadog.trace.util + +import static datadog.trace.util.BitUtils.nextPowerOfTwo + +import datadog.trace.test.util.DDSpecification + +class BitUtilsTest extends DDSpecification { + def "nextPowerOfTwo(#input) should return #expected"() { + expect: + nextPowerOfTwo(input) == expected + + where: + input | expected + 0 | 1 // smallest case + 1 | 1 // already power of two + 2 | 2 + 3 | 4 + 4 | 4 + 5 | 8 + 6 | 8 + 7 | 8 + 8 | 8 + 9 | 16 + 15 | 16 + 16 | 16 + 17 | 32 + 31 | 32 + 32 | 32 + 33 | 64 + 63 | 64 + 64 | 64 + 65 | 128 + 1000 | 1024 + 1023 | 1024 + 1024 | 1024 + 1025 | 2048 + 4096 | 4096 + 4097 | 8192 + -1 | 1 // negative input edge case + Integer.MAX_VALUE | (1 << 30) // largest safe power of two + } +} diff --git a/settings.gradle.kts b/settings.gradle.kts index 8f99a3a324d..1412dce4fe0 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -157,6 +157,7 @@ include( ":utils:container-utils", ":utils:filesystem-utils", ":utils:flare-utils", + ":utils:queue-utils", ":utils:socket-utils", ":utils:test-agent-utils:decoder", ":utils:test-utils", diff --git a/utils/queue-utils/build.gradle.kts b/utils/queue-utils/build.gradle.kts new file mode 100644 index 00000000000..081e2cdcd47 --- /dev/null +++ b/utils/queue-utils/build.gradle.kts @@ -0,0 +1,51 @@ +import groovy.lang.Closure +import org.gradle.kotlin.dsl.extra + +plugins { + `java-library` + id("de.thetaphi.forbiddenapis") version "3.8" + id("me.champeau.jmh") + idea +} + +val minJavaVersionForTests by extra(JavaVersion.VERSION_11) + +apply(from = "$rootDir/gradle/java.gradle") + +java { + toolchain { + languageVersion = JavaLanguageVersion.of(11) + } +} + +tasks.withType().configureEach() { + javadocTool = javaToolchains.javadocToolFor(java.toolchain) +} + +fun AbstractCompile.configureCompiler(javaVersionInteger: Int, compatibilityVersion: JavaVersion? = null, unsetReleaseFlagReason: String? 
= null) { + (project.extra["configureCompiler"] as Closure<*>).call(this, javaVersionInteger, compatibilityVersion, unsetReleaseFlagReason) +} + +listOf(JavaCompile::class.java, GroovyCompile::class.java).forEach { compileTaskType -> + tasks.withType(compileTaskType).configureEach { + configureCompiler(11, JavaVersion.VERSION_1_8) + } +} + +dependencies { + api(project(":internal-api")) + api(libs.jctools) + + testImplementation(project(":dd-java-agent:testing")) + testImplementation(libs.slf4j) +} + +tasks.forbiddenApisMain { + failOnMissingClasses = false +} + +idea { + module { + jdkName = "11" + } +} diff --git a/utils/queue-utils/src/jmh/java/datadog/common/queue/JctoolsMPSCBlockingConsumerQueueBenchmark.java b/utils/queue-utils/src/jmh/java/datadog/common/queue/JctoolsMPSCBlockingConsumerQueueBenchmark.java new file mode 100644 index 00000000000..ebc6347c131 --- /dev/null +++ b/utils/queue-utils/src/jmh/java/datadog/common/queue/JctoolsMPSCBlockingConsumerQueueBenchmark.java @@ -0,0 +1,68 @@ +package datadog.common.queue; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.jctools.queues.MpscBlockingConsumerArrayQueue; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Group; +import org.openjdk.jmh.annotations.GroupThreads; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +/* +Benchmark (capacity) Mode Cnt Score Error Units +JctoolsMPSCBlockingConsumerQueueBenchmark.queueTest 1024 thrpt 41,149 ops/us +JctoolsMPSCBlockingConsumerQueueBenchmark.queueTest:consume 1024 thrpt 30,661 ops/us +JctoolsMPSCBlockingConsumerQueueBenchmark.queueTest:produce 1024 thrpt 10,488 ops/us +JctoolsMPSCBlockingConsumerQueueBenchmark.queueTest 65536 thrpt 32,413 ops/us +JctoolsMPSCBlockingConsumerQueueBenchmark.queueTest:consume 65536 thrpt 24,680 ops/us +JctoolsMPSCBlockingConsumerQueueBenchmark.queueTest:produce 65536 thrpt 7,733 ops/us +*/ +@BenchmarkMode(Mode.Throughput) +@Warmup(iterations = 1, time = 30) +@Measurement(iterations = 1, time = 30) +@Fork(1) +@OutputTimeUnit(TimeUnit.MICROSECONDS) +@State(Scope.Benchmark) +public class JctoolsMPSCBlockingConsumerQueueBenchmark { + @State(Scope.Group) + public static class QueueState { + MpscBlockingConsumerArrayQueue queue; + CountDownLatch consumerReady; + + @Param({"1024", "65536"}) + int capacity; + + @Setup(Level.Iteration) + public void setup() { + queue = new MpscBlockingConsumerArrayQueue<>(capacity); + } + } + + @Benchmark + @Group("queueTest") + @GroupThreads(4) + public void produce(QueueState state, Blackhole bh) { + bh.consume(state.queue.offer(1)); + } + + @Benchmark + @Group("queueTest") + @GroupThreads(1) + public void consume(QueueState state, Blackhole bh) { + Integer v = state.queue.poll(); + if (v != null) { + bh.consume(v); + } + } +} diff --git a/utils/queue-utils/src/jmh/java/datadog/common/queue/JctoolsMPSCQueueBenchmark.java b/utils/queue-utils/src/jmh/java/datadog/common/queue/JctoolsMPSCQueueBenchmark.java new file mode 100644 index 00000000000..a7ba1fa433e --- /dev/null 
+++ b/utils/queue-utils/src/jmh/java/datadog/common/queue/JctoolsMPSCQueueBenchmark.java @@ -0,0 +1,66 @@ +package datadog.common.queue; + +import java.util.concurrent.TimeUnit; +import org.jctools.queues.MpscArrayQueue; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Group; +import org.openjdk.jmh.annotations.GroupThreads; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +/* +Benchmark (capacity) Mode Cnt Score Error Units +JctoolsMPSCQueueBenchmark.queueTest 1024 thrpt 41,784 ops/us +JctoolsMPSCQueueBenchmark.queueTest:consume 1024 thrpt 31,070 ops/us +JctoolsMPSCQueueBenchmark.queueTest:produce 1024 thrpt 10,715 ops/us +JctoolsMPSCQueueBenchmark.queueTest 65536 thrpt 39,589 ops/us +JctoolsMPSCQueueBenchmark.queueTest:consume 65536 thrpt 32,370 ops/us +JctoolsMPSCQueueBenchmark.queueTest:produce 65536 thrpt 7,219 ops/us + */ +@BenchmarkMode(Mode.Throughput) +@Warmup(iterations = 1, time = 30) +@Measurement(iterations = 1, time = 30) +@Fork(1) +@OutputTimeUnit(TimeUnit.MICROSECONDS) +@State(Scope.Benchmark) +public class JctoolsMPSCQueueBenchmark { + @State(Scope.Group) + public static class QueueState { + MpscArrayQueue queue; + + @Param({"1024", "65536"}) + int capacity; + + @Setup(Level.Iteration) + public void setup() { + queue = new MpscArrayQueue<>(capacity); + } + } + + @Benchmark + @Group("queueTest") + @GroupThreads(4) + public void produce(QueueState state, Blackhole blackhole) { + blackhole.consume(state.queue.offer(0)); + } + + @Benchmark + @Group("queueTest") + @GroupThreads(1) + public void consume(QueueState state, Blackhole bh) { + Integer v = state.queue.poll(); + if (v != null) { + bh.consume(v); + } + } +} diff --git a/utils/queue-utils/src/jmh/java/datadog/common/queue/JctoolsSPSCQueueBenchmark.java b/utils/queue-utils/src/jmh/java/datadog/common/queue/JctoolsSPSCQueueBenchmark.java new file mode 100644 index 00000000000..8ecb13329fd --- /dev/null +++ b/utils/queue-utils/src/jmh/java/datadog/common/queue/JctoolsSPSCQueueBenchmark.java @@ -0,0 +1,66 @@ +package datadog.common.queue; + +import java.util.concurrent.TimeUnit; +import org.jctools.queues.SpscArrayQueue; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Group; +import org.openjdk.jmh.annotations.GroupThreads; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +/* +Benchmark (capacity) Mode Cnt Score Error Units +JctoolsSPSCQueueBenchmark.queueTest 1024 thrpt 259,418 ops/us +JctoolsSPSCQueueBenchmark.queueTest:consume 1024 thrpt 129,694 ops/us 
+JctoolsSPSCQueueBenchmark.queueTest:produce 1024 thrpt 129,724 ops/us +JctoolsSPSCQueueBenchmark.queueTest 65536 thrpt 537,111 ops/us +JctoolsSPSCQueueBenchmark.queueTest:consume 65536 thrpt 268,577 ops/us +JctoolsSPSCQueueBenchmark.queueTest:produce 65536 thrpt 268,534 ops/us + */ +@BenchmarkMode(Mode.Throughput) +@Warmup(iterations = 3, time = 10) +@Measurement(iterations = 1, time = 30) +@Fork(1) +@OutputTimeUnit(TimeUnit.MICROSECONDS) +@State(Scope.Benchmark) +public class JctoolsSPSCQueueBenchmark { + @State(Scope.Group) + public static class QueueState { + SpscArrayQueue queue; + + @Param({"1024", "65536"}) + int capacity; + + @Setup(Level.Iteration) + public void setup() { + queue = new SpscArrayQueue<>(capacity); + } + } + + @Benchmark + @Group("queueTest") + @GroupThreads(1) + public void produce(QueueState state, Blackhole bh) { + bh.consume(state.queue.offer(0)); + } + + @Benchmark + @Group("queueTest") + @GroupThreads(1) + public void consume(QueueState state, Blackhole bh) { + Integer v = state.queue.poll(); + if (v != null) { + bh.consume(v); + } + } +} diff --git a/utils/queue-utils/src/jmh/java/datadog/common/queue/MPSCBlockingConsumerQueueBenchmark.java b/utils/queue-utils/src/jmh/java/datadog/common/queue/MPSCBlockingConsumerQueueBenchmark.java new file mode 100644 index 00000000000..4bb42274b00 --- /dev/null +++ b/utils/queue-utils/src/jmh/java/datadog/common/queue/MPSCBlockingConsumerQueueBenchmark.java @@ -0,0 +1,67 @@ +package datadog.common.queue; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Group; +import org.openjdk.jmh.annotations.GroupThreads; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +/* +Benchmark (capacity) Mode Cnt Score Error Units +MPSCBlockingConsumerQueueBenchmark.queueTest 1024 thrpt 258,074 ops/us +MPSCBlockingConsumerQueueBenchmark.queueTest:consume 1024 thrpt 246,683 ops/us +MPSCBlockingConsumerQueueBenchmark.queueTest:produce 1024 thrpt 11,391 ops/us +MPSCBlockingConsumerQueueBenchmark.queueTest 65536 thrpt 224,982 ops/us +MPSCBlockingConsumerQueueBenchmark.queueTest:consume 65536 thrpt 217,498 ops/us +MPSCBlockingConsumerQueueBenchmark.queueTest:produce 65536 thrpt 7,485 ops/us +*/ +@BenchmarkMode(Mode.Throughput) +@Warmup(iterations = 1, time = 30) +@Measurement(iterations = 1, time = 30) +@Fork(1) +@OutputTimeUnit(TimeUnit.MICROSECONDS) +@State(Scope.Benchmark) +public class MPSCBlockingConsumerQueueBenchmark { + @State(Scope.Group) + public static class QueueState { + MpscBlockingConsumerArrayQueueVarHandle queue; + CountDownLatch consumerReady; + + @Param({"1024", "65536"}) + int capacity; + + @Setup(Level.Iteration) + public void setup() { + queue = new MpscBlockingConsumerArrayQueueVarHandle<>(capacity); + } + } + + @Benchmark + @Group("queueTest") + @GroupThreads(4) + public void produce(QueueState state, Blackhole bh) { + bh.consume(state.queue.offer(1)); + } + + @Benchmark + @Group("queueTest") + @GroupThreads(1) + public void 
consume(QueueState state, Blackhole bh) { + Integer v = state.queue.poll(); + if (v != null) { + bh.consume(v); + } + } +} diff --git a/utils/queue-utils/src/jmh/java/datadog/common/queue/MPSCQueueBenchmark.java b/utils/queue-utils/src/jmh/java/datadog/common/queue/MPSCQueueBenchmark.java new file mode 100644 index 00000000000..66b096e4ece --- /dev/null +++ b/utils/queue-utils/src/jmh/java/datadog/common/queue/MPSCQueueBenchmark.java @@ -0,0 +1,65 @@ +package datadog.common.queue; + +import java.util.concurrent.TimeUnit; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Group; +import org.openjdk.jmh.annotations.GroupThreads; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +/* +Benchmark (capacity) Mode Cnt Score Error Units +MPSCQueueBenchmark.queueTest 1024 thrpt 238,609 ops/us +MPSCQueueBenchmark.queueTest:consume 1024 thrpt 222,383 ops/us +MPSCQueueBenchmark.queueTest:produce 1024 thrpt 16,226 ops/us +MPSCQueueBenchmark.queueTest 65536 thrpt 262,729 ops/us +MPSCQueueBenchmark.queueTest:consume 65536 thrpt 250,627 ops/us +MPSCQueueBenchmark.queueTest:produce 65536 thrpt 12,102 ops/us + */ +@BenchmarkMode(Mode.Throughput) +@Warmup(iterations = 1, time = 30) +@Measurement(iterations = 1, time = 30) +@Fork(1) +@OutputTimeUnit(TimeUnit.MICROSECONDS) +@State(Scope.Benchmark) +public class MPSCQueueBenchmark { + @State(Scope.Group) + public static class QueueState { + MpscArrayQueueVarHandle queue; + + @Param({"1024", "65536"}) + int capacity; + + @Setup(Level.Iteration) + public void setup() { + queue = new MpscArrayQueueVarHandle<>(capacity); + } + } + + @Benchmark + @Group("queueTest") + @GroupThreads(4) + public void produce(QueueState state, Blackhole blackhole) { + blackhole.consume(state.queue.offer(0)); + } + + @Benchmark + @Group("queueTest") + @GroupThreads(1) + public void consume(QueueState state, Blackhole bh) { + Integer v = state.queue.poll(); + if (v != null) { + bh.consume(v); + } + } +} diff --git a/utils/queue-utils/src/jmh/java/datadog/common/queue/SPSCQueueBenchmark.java b/utils/queue-utils/src/jmh/java/datadog/common/queue/SPSCQueueBenchmark.java new file mode 100644 index 00000000000..784d78f77c1 --- /dev/null +++ b/utils/queue-utils/src/jmh/java/datadog/common/queue/SPSCQueueBenchmark.java @@ -0,0 +1,65 @@ +package datadog.common.queue; + +import java.util.concurrent.TimeUnit; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Group; +import org.openjdk.jmh.annotations.GroupThreads; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +/* +Benchmark 
(capacity) Mode Cnt Score Error Units +SPSCQueueBenchmark.queueTest 1024 thrpt 101,007 ops/us +SPSCQueueBenchmark.queueTest:consume 1024 thrpt 72,542 ops/us +SPSCQueueBenchmark.queueTest:produce 1024 thrpt 28,465 ops/us +SPSCQueueBenchmark.queueTest 65536 thrpt 353,161 ops/us +SPSCQueueBenchmark.queueTest:consume 65536 thrpt 191,188 ops/us +SPSCQueueBenchmark.queueTest:produce 65536 thrpt 161,973 ops/us + */ +@BenchmarkMode(Mode.Throughput) +@Warmup(iterations = 3, time = 10) +@Measurement(iterations = 1, time = 30) +@Fork(1) +@OutputTimeUnit(TimeUnit.MICROSECONDS) +@State(Scope.Benchmark) +public class SPSCQueueBenchmark { + @State(Scope.Group) + public static class QueueState { + SpscArrayQueueVarHandle queue; + + @Param({"1024", "65536"}) + int capacity; + + @Setup(Level.Iteration) + public void setup() { + queue = new SpscArrayQueueVarHandle<>(capacity); + } + } + + @Benchmark + @Group("queueTest") + @GroupThreads(1) + public void produce(QueueState state, Blackhole bh) { + bh.consume(state.queue.offer(0)); + } + + @Benchmark + @Group("queueTest") + @GroupThreads(1) + public void consume(QueueState state, Blackhole bh) { + Integer v = state.queue.poll(); + if (v != null) { + bh.consume(v); + } + } +} diff --git a/utils/queue-utils/src/main/java/datadog/common/queue/BaseQueue.java b/utils/queue-utils/src/main/java/datadog/common/queue/BaseQueue.java new file mode 100644 index 00000000000..fec2d5bbc8b --- /dev/null +++ b/utils/queue-utils/src/main/java/datadog/common/queue/BaseQueue.java @@ -0,0 +1,128 @@ +package datadog.common.queue; + +import static datadog.trace.util.BitUtils.nextPowerOfTwo; + +import datadog.common.queue.padding.PaddedSequence; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.VarHandle; +import java.util.AbstractQueue; +import java.util.Iterator; +import java.util.function.Consumer; +import java.util.function.Supplier; +import javax.annotation.Nonnull; + +/** + * Base class for non-blocking queuing operations. 
+ * + * @param the type of elements held by this queue + */ +abstract class BaseQueue extends AbstractQueue implements NonBlockingQueue { + private static final int CACHE_LINE_LONGS = 8; // 64 bytes / 8 bytes per long/ref + protected static final VarHandle ARRAY_HANDLE; + + static { + try { + ARRAY_HANDLE = MethodHandles.arrayElementVarHandle(Object[].class); + } catch (Throwable t) { + throw new ExceptionInInitializerError(t); + } + } + + /** The capacity of the queue (must be a power of two) */ + protected final int capacity; + + /** Mask for fast modulo operation (index = pos & mask) */ + protected final int mask; + + /** The backing array (plain Java array for VarHandle access) */ + protected final Object[] buffer; + + // Padding to avoid false sharing + @SuppressWarnings("unused") + private long p0, p1, p2, p3, p4, p5, p6; + + /** Next free slot for producer (single-threaded) */ + protected final PaddedSequence tail = new PaddedSequence(); + + /** Next slot to consume (multi-threaded) */ + protected final PaddedSequence head = new PaddedSequence(); + + // Padding around head + @SuppressWarnings("unused") + private long r0, r1, r2, r3, r4, r5, r6; + + public BaseQueue(int requestedCapacity) { + this.capacity = nextPowerOfTwo(requestedCapacity); + this.mask = this.capacity - 1; + this.buffer = new Object[capacity + 2 * CACHE_LINE_LONGS]; + } + + protected final int arrayIndex(long sequence) { + return (int) (sequence & mask) + CACHE_LINE_LONGS; + } + + @Override + public int drain(Consumer consumer) { + return drain(consumer, Integer.MAX_VALUE); + } + + @Override + public int drain(@Nonnull Consumer consumer, int limit) { + int count = 0; + E e; + while (count < limit && (e = poll()) != null) { + consumer.accept(e); + count++; + } + return count; + } + + @Override + public int fill(@Nonnull Supplier supplier, int limit) { + if (limit <= 0) { + return 0; + } + + int added = 0; + while (added < limit) { + E e = supplier.get(); + if (e == null) { + break; // stop if supplier exhausted + } + + if (offer(e)) { + added++; + } else { + break; // queue is full + } + } + return added; + } + + /** + * Iterator is not supported. + * + * @throws UnsupportedOperationException always + */ + @Override + public final Iterator iterator() { + throw new UnsupportedOperationException(); + } + + @Override + public final int remainingCapacity() { + return capacity - size(); + } + + @Override + public final int capacity() { + return capacity; + } + + @Override + public final int size() { + long currentTail = tail.getVolatile(); + long currentHead = head.getVolatile(); + return (int) (currentTail - currentHead); + } +} diff --git a/utils/queue-utils/src/main/java/datadog/common/queue/BlockingConsumerNonBlockingQueue.java b/utils/queue-utils/src/main/java/datadog/common/queue/BlockingConsumerNonBlockingQueue.java new file mode 100644 index 00000000000..159483073bd --- /dev/null +++ b/utils/queue-utils/src/main/java/datadog/common/queue/BlockingConsumerNonBlockingQueue.java @@ -0,0 +1,47 @@ +package datadog.common.queue; + +import java.util.concurrent.TimeUnit; +import javax.annotation.Nonnull; + +/** + * A hybrid queue interface combining non-blocking producer semantics with blocking consumer + * operations. + * + *

This interface extends {@link NonBlockingQueue} and adds methods that allow consumers to block + * while waiting for elements to become available. It is intended for use in scenarios where: + + *

    + *
  • Multiple or single producers enqueue elements using non-blocking operations (e.g., + * {@link #offer(Object)}). + *
  • A single consumer that may block until elements are ready (i.e., using {@link + * #take()} or {@link #poll(long, TimeUnit)}). + *
+ * + * @param the type of elements held in this queue + */ +public interface BlockingConsumerNonBlockingQueue extends NonBlockingQueue { + + /** + * Retrieves and removes the head of this queue, waiting up to the specified wait time if + * necessary for an element to become available. + * + * @param timeout how long to wait before giving up, in units of {@code unit} + * @param unit the time unit of the {@code timeout} argument; must not be {@code null} + * @return the head of this queue, or {@code null} if the specified waiting time elapses before an + * element becomes available + * @throws InterruptedException if interrupted while waiting + */ + E poll(long timeout, @Nonnull TimeUnit unit) throws InterruptedException; + + /** + * Retrieves and removes the head of this queue, waiting if necessary until an element becomes + * available. + * + *

This operation blocks the consumer thread if the queue is empty, while producers continue to + * operate in a non-blocking manner. + * + * @return the head of this queue + * @throws InterruptedException if interrupted while waiting + */ + E take() throws InterruptedException; +} diff --git a/utils/queue-utils/src/main/java/datadog/common/queue/JctoolsMpscBlockingConsumerWrappedQueue.java b/utils/queue-utils/src/main/java/datadog/common/queue/JctoolsMpscBlockingConsumerWrappedQueue.java new file mode 100644 index 00000000000..ce4e2b79cc0 --- /dev/null +++ b/utils/queue-utils/src/main/java/datadog/common/queue/JctoolsMpscBlockingConsumerWrappedQueue.java @@ -0,0 +1,37 @@ +package datadog.common.queue; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nonnull; +import org.jctools.queues.MpscBlockingConsumerArrayQueue; + +/** + * A {@link BlockingConsumerNonBlockingQueue} implementation that wraps a JCTools {@link + * MpscBlockingConsumerArrayQueue}. + * + *

All operations delegate directly to the underlying JCTools queue to preserve performance and + * memory semantics. + * + * @param the type of elements held in this queue + */ +class JctoolsMpscBlockingConsumerWrappedQueue extends JctoolsWrappedQueue + implements BlockingConsumerNonBlockingQueue { + + private final BlockingQueue blockingQueueDelegate; + + public JctoolsMpscBlockingConsumerWrappedQueue( + @Nonnull MpscBlockingConsumerArrayQueue delegate) { + super(delegate); + this.blockingQueueDelegate = delegate; + } + + @Override + public E take() throws InterruptedException { + return blockingQueueDelegate.take(); + } + + @Override + public E poll(long timeout, @Nonnull TimeUnit unit) throws InterruptedException { + return blockingQueueDelegate.poll(timeout, unit); + } +} diff --git a/utils/queue-utils/src/main/java/datadog/common/queue/JctoolsWrappedQueue.java b/utils/queue-utils/src/main/java/datadog/common/queue/JctoolsWrappedQueue.java new file mode 100644 index 00000000000..5e72a4a7331 --- /dev/null +++ b/utils/queue-utils/src/main/java/datadog/common/queue/JctoolsWrappedQueue.java @@ -0,0 +1,75 @@ +package datadog.common.queue; + +import java.util.AbstractQueue; +import java.util.Iterator; +import java.util.function.Consumer; +import java.util.function.Supplier; +import javax.annotation.Nonnull; +import org.jctools.queues.MessagePassingQueue; + +/** + * A {@link NonBlockingQueue} implementation that wraps a {@link MessagePassingQueue} from the + * JCTools library to provide a consistent, framework-independent interface. + * + *

This adapter bridges JCTools’ queue APIs with the {@link NonBlockingQueue} abstraction used by + * this library. All operations are directly delegated to the underlying {@code MessagePassingQueue} + * + * @param the type of elements held in this queue + */ +class JctoolsWrappedQueue extends AbstractQueue implements NonBlockingQueue { + private final MessagePassingQueue delegate; + + public JctoolsWrappedQueue(@Nonnull MessagePassingQueue delegate) { + this.delegate = delegate; + } + + @Override + public int drain(Consumer consumer) { + return delegate.drain(consumer::accept); + } + + @Override + public int drain(Consumer consumer, int limit) { + return delegate.drain(consumer::accept, limit); + } + + @Override + public int fill(@Nonnull Supplier supplier, int limit) { + return delegate.fill(supplier::get, limit); + } + + @Override + public int remainingCapacity() { + return capacity() - size(); + } + + @Override + public int capacity() { + return delegate.capacity(); + } + + @Override + public Iterator iterator() { + throw new UnsupportedOperationException(); + } + + @Override + public int size() { + return delegate.size(); + } + + @Override + public boolean offer(E e) { + return delegate.offer(e); + } + + @Override + public E poll() { + return delegate.poll(); + } + + @Override + public E peek() { + return delegate.peek(); + } +} diff --git a/utils/queue-utils/src/main/java/datadog/common/queue/MpscArrayQueueVarHandle.java b/utils/queue-utils/src/main/java/datadog/common/queue/MpscArrayQueueVarHandle.java new file mode 100644 index 00000000000..850b842a07d --- /dev/null +++ b/utils/queue-utils/src/main/java/datadog/common/queue/MpscArrayQueueVarHandle.java @@ -0,0 +1,112 @@ +package datadog.common.queue; + +import datadog.common.queue.padding.PaddedSequence; +import java.util.Objects; +import java.util.concurrent.locks.LockSupport; + +/** + * A Multiple-Producer, Single-Consumer (MPSC) bounded lock-free queue using a circular array and + * VarHandles. + * + *

All operations are wait-free for the consumer and lock-free for producers. + * + * @param the type of elements stored + */ +class MpscArrayQueueVarHandle extends BaseQueue { + /** Cached producer limit to reduce volatile head reads */ + protected final PaddedSequence producerLimit; + + /** + * Creates a new MPSC queue. + * + * @param requestedCapacity the desired capacity, rounded up to next power of two + */ + public MpscArrayQueueVarHandle(int requestedCapacity) { + super(requestedCapacity); + this.producerLimit = new PaddedSequence(capacity); + ; + } + + @Override + public boolean offer(E e) { + Objects.requireNonNull(e); + + // jctools does the same local copy to have the jitter optimise the accesses + final Object[] localBuffer = this.buffer; + + long localProducerLimit = producerLimit.getVolatile(); + long cachedHead = 0L; // Local cache of head to reduce volatile reads + + int spinCycles = 0; + boolean parkOnSpin = (Thread.currentThread().getId() & 1) == 0; + + while (true) { + long currentTail = tail.getVolatile(); + + // Check if producer limit exceeded + if (currentTail >= localProducerLimit) { + // Refresh head only when necessary + cachedHead = head.getVolatile(); + localProducerLimit = cachedHead + capacity; + + if (currentTail >= localProducerLimit) { + return false; // queue full + } + + // Update producerLimit so other producers also benefit + producerLimit.setVolatile(localProducerLimit); + } + + // Attempt to claim a slot + if (tail.compareAndSet(currentTail, currentTail + 1)) { + final int index = arrayIndex(currentTail); + + // Release-store ensures producer's write is visible to consumer + ARRAY_HANDLE.setRelease(localBuffer, index, e); + return true; + } + + // Backoff to reduce contention + if ((spinCycles & 1) == 0) { + Thread.onSpinWait(); + } else { + if (parkOnSpin) { + LockSupport.parkNanos(1); + } else { + Thread.yield(); + } + } + spinCycles++; + } + } + + @Override + @SuppressWarnings("unchecked") + public final E poll() { + final Object[] localBuffer = this.buffer; + + long currentHead = head.getOpaque(); + final int index = arrayIndex(currentHead); + + // Acquire-load ensures visibility of producer write + Object value = ARRAY_HANDLE.getAcquire(localBuffer, index); + if (value == null) { + return null; + } + + // Clear the slot without additional fence + ARRAY_HANDLE.setOpaque(localBuffer, index, null); + + // Advance head using opaque write (consumer-only) + head.setOpaque(currentHead + 1); + + return (E) value; + } + + @Override + @SuppressWarnings("unchecked") + public final E peek() { + final int index = arrayIndex(head.getOpaque()); + return (E) ARRAY_HANDLE.getVolatile(buffer, index); + } +} diff --git a/utils/queue-utils/src/main/java/datadog/common/queue/MpscBlockingConsumerArrayQueueVarHandle.java b/utils/queue-utils/src/main/java/datadog/common/queue/MpscBlockingConsumerArrayQueueVarHandle.java new file mode 100644 index 00000000000..186873260d0 --- /dev/null +++ b/utils/queue-utils/src/main/java/datadog/common/queue/MpscBlockingConsumerArrayQueueVarHandle.java @@ -0,0 +1,146 @@ +package datadog.common.queue; + +import datadog.common.queue.padding.PaddedThread; +import java.util.Objects; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.LockSupport; +import javax.annotation.Nonnull; + +/** + * A Multiple-Producer, Single-Consumer (MPSC) bounded lock-free queue using a circular array and + * VarHandles. It adds blocking capabilities for a single consumer (take, timed offer). + * + *

All operations are wait-free for the consumer and lock-free for producers. + * + * @param the type of elements stored + */ +class MpscBlockingConsumerArrayQueueVarHandle extends MpscArrayQueueVarHandle + implements BlockingConsumerNonBlockingQueue { + + /** Reference to the waiting consumer thread (set atomically). */ + private final PaddedThread consumerThread = new PaddedThread(); + + /** + * Creates a new MPSC queue. + * + * @param requestedCapacity the desired capacity, rounded up to next power of two + */ + public MpscBlockingConsumerArrayQueueVarHandle(int requestedCapacity) { + super(requestedCapacity); + } + + @Override + public final boolean offer(E e) { + Objects.requireNonNull(e); + + // jctools does the same local copy to have the jitter optimise the accesses + final Object[] localBuffer = this.buffer; + + long localProducerLimit = producerLimit.getVolatile(); + long cachedHead = 0L; // Local cache of head to reduce volatile reads + + int spinCycles = 0; + boolean parkOnSpin = (Thread.currentThread().getId() & 1) == 0; + + while (true) { + long currentTail = tail.getVolatile(); + + // Check if producer limit exceeded + if (currentTail >= localProducerLimit) { + // Refresh head only when necessary + cachedHead = head.getVolatile(); + localProducerLimit = cachedHead + capacity; + + if (currentTail >= localProducerLimit) { + return false; // queue full + } + + // Update producerLimit so other producers also benefit + producerLimit.setVolatile(localProducerLimit); + } + + // Attempt to claim a slot + if (tail.compareAndSet(currentTail, currentTail + 1)) { + final int index = arrayIndex(currentTail); + + // Release-store ensures producer's write is visible to consumer + ARRAY_HANDLE.setRelease(localBuffer, index, e); + + // Atomically clear and unpark the consumer if waiting + Thread c = consumerThread.getAndSet(null); + if (c != null) { + LockSupport.unpark(c); + } + + return true; + } + + // Backoff to reduce contention + if ((spinCycles & 1) == 0) { + Thread.onSpinWait(); + } else { + if (parkOnSpin) { + LockSupport.parkNanos(1); + } else { + Thread.yield(); + } + } + spinCycles++; + } + } + + @Override + public final E poll(long timeout, @Nonnull TimeUnit unit) throws InterruptedException { + E e = poll(); + if (e != null) { + return e; + } + + final long parkNanos = unit.toNanos(timeout); + if (parkNanos <= 0) { + return null; + } + + parkUntilNext(parkNanos); + + return poll(); + } + + @Override + public E take() throws InterruptedException { + consumerThread.setVolatile(Thread.currentThread()); + E e; + while ((e = poll()) == null) { + parkUntilNext(-1); + } + return e; + } + + /** + * Blocks (parks) until an element becomes available or until the specified timeout elapses. + * + *

It is safe if only one thread is waiting (which is the case for this single consumer + * implementation). + * + * @param nanos max wait time in nanoseconds. If negative, it will park indefinitely until woken or + * interrupted + * @throws InterruptedException if interrupted while waiting + */ + private void parkUntilNext(long nanos) throws InterruptedException { + Thread current = Thread.currentThread(); + // Publish the consumer thread (no ordering required) + consumerThread.setOpaque(current); + if (nanos <= 0) { + LockSupport.park(this); + } else { + LockSupport.parkNanos(this, nanos); + } + + if (Thread.interrupted()) { + throw new InterruptedException(); + } + + // Cleanup (no fence needed, single consumer) + consumerThread.setOpaque(null); + } +} diff --git a/utils/queue-utils/src/main/java/datadog/common/queue/NonBlockingQueue.java b/utils/queue-utils/src/main/java/datadog/common/queue/NonBlockingQueue.java new file mode 100644 index 00000000000..52a50b3e6d6 --- /dev/null +++ b/utils/queue-utils/src/main/java/datadog/common/queue/NonBlockingQueue.java @@ -0,0 +1,72 @@ +package datadog.common.queue; + +import java.util.Queue; +import java.util.function.Consumer; +import java.util.function.Supplier; +import javax.annotation.Nonnull; + +/** + * A non-blocking, concurrent queue supporting high-performance operations for producer-consumer + * scenarios. This interface extends {@link Queue} and adds specialized methods for bulk draining + * and filling, as well as querying the queue’s fixed capacity. + * + *

Unlike typical {@link java.util.concurrent.BlockingQueue} implementations, this interface does + * not provide blocking operations. Instead, producers and consumers are expected to retry or yield + * when the queue is full or empty, respectively. + * + *
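A rough sketch of the retry-or-yield contract described above (not code from this patch): the producer spins on offer() when the queue is full and the consumer yields when poll() returns null. The generic signatures and the chosen capacity are assumptions for illustration.

```java
import datadog.common.queue.NonBlockingQueue;
import datadog.common.queue.Queues;

public class RetryOrYieldExample {
  public static void main(String[] args) throws InterruptedException {
    // Assumed generic signature; the flattened javadoc in this patch elides type parameters.
    NonBlockingQueue<Integer> queue = Queues.mpscArrayQueue(1024);

    Thread producer = new Thread(() -> {
      for (int i = 0; i < 100; i++) {
        while (!queue.offer(i)) {
          Thread.yield(); // queue full: back off instead of blocking
        }
      }
    });

    Thread consumer = new Thread(() -> {
      int received = 0;
      while (received < 100) {
        Integer item = queue.poll();
        if (item == null) {
          Thread.yield(); // queue empty: back off instead of blocking
        } else {
          received++;
        }
      }
    });

    producer.start();
    consumer.start();
    producer.join();
    consumer.join();
    System.out.println("done");
  }
}
```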

Implementations are typically array-backed and rely on non-blocking atomic operations (such as + * VarHandles or Unsafe-based CAS) to achieve concurrent performance without locks. + * + * @param the type of elements held in this queue + * @see java.util.Queue + * @see java.util.concurrent.ConcurrentLinkedQueue + */ +public interface NonBlockingQueue extends Queue { + + /** + * Drains all available elements from this queue, passing each to the given {@link Consumer}. + * + *

This method will consume as many elements as are currently available, up to the queue’s size + * at the time of the call. + * + * @param consumer the consumer that will process each element; must not be {@code null} + * @return the number of elements drained + * @throws NullPointerException if {@code consumer} is {@code null} + */ + int drain(Consumer consumer); + + /** + * Drains up to the specified number of elements from this queue, passing each to the given {@link + * Consumer}. + * + * @param consumer the consumer that will process each element; must not be {@code null} + * @param limit the maximum number of elements to drain + * @return the actual number of elements drained (may be less than {@code limit}) + */ + int drain(Consumer consumer, int limit); + + /** + * Fills the queue with elements supplied by the given {@link Supplier}, up to the specified limit + * or until the queue becomes full. + * + * @param supplier the supplier that provides elements to insert; must not be {@code null} + * @param limit the maximum number of elements to insert + * @return the number of elements successfully added (may be less than {@code limit}) + */ + int fill(@Nonnull Supplier supplier, int limit); + + /** + * Returns the number of additional elements that can be inserted into this queue without + * exceeding its capacity. + * + * @return the number of remaining slots available for insertion + */ + int remainingCapacity(); + + /** + * Returns the total fixed capacity of this queue. + * + * @return the maximum number of elements this queue can hold + */ + int capacity(); +} diff --git a/utils/queue-utils/src/main/java/datadog/common/queue/Queues.java b/utils/queue-utils/src/main/java/datadog/common/queue/Queues.java new file mode 100644 index 00000000000..997a7159c95 --- /dev/null +++ b/utils/queue-utils/src/main/java/datadog/common/queue/Queues.java @@ -0,0 +1,89 @@ +package datadog.common.queue; + +import datadog.environment.JavaVirtualMachine; +import org.jctools.queues.MpscArrayQueue; +import org.jctools.queues.MpscBlockingConsumerArrayQueue; +import org.jctools.queues.SpmcArrayQueue; +import org.jctools.queues.SpscArrayQueue; + +/** + * A utility class for creating various high-performance queue implementations used for inter-thread + * communication. This class provides factory methods for creating non-blocking and + * partially-blocking queues optimized for different producer-consumer configurations. + * + *
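To make the bulk drain/fill contract documented above concrete, here is a minimal sketch. The element type, capacity, and generic signatures are assumptions for illustration, not something this patch prescribes.

```java
import datadog.common.queue.NonBlockingQueue;
import datadog.common.queue.Queues;
import java.util.ArrayList;
import java.util.List;

public class DrainFillExample {
  public static void main(String[] args) {
    // Assumed generic signature; capacity 8 is already a power of two.
    NonBlockingQueue<Integer> queue = Queues.spscArrayQueue(8);

    // fill(): pull elements from a Supplier until the limit or the queue is full.
    int[] next = {0};
    int added = queue.fill(() -> next[0]++, 5);
    System.out.println("added=" + added); // expected: 5

    // drain(limit): hand at most 'limit' elements to the Consumer, in FIFO order.
    List<Integer> first = new ArrayList<>();
    int drained = queue.drain(first::add, 2);
    System.out.println(drained + " -> " + first); // expected: 2 -> [0, 1]

    // drain(): consume whatever is left.
    List<Integer> rest = new ArrayList<>();
    queue.drain(rest::add);
    System.out.println(rest); // expected: [2, 3, 4]

    System.out.println("remaining=" + queue.remainingCapacity() + "/" + queue.capacity());
  }
}
```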

Depending on the Java runtime version, this class will choose the most efficient + * implementation available: + * + *

    + *
  • For Java 9 and above, {@code VarHandle}-based queue implementations are used for improved performance without relying on {@code sun.misc.Unsafe}.
  • For Java 8, {@code JCTools}-based wrappers are used instead. + *
+ */ +public final class Queues { + + private static final boolean CAN_USE_VARHANDLES = JavaVirtualMachine.isJavaVersionAtLeast(9); + + private Queues() {} + + /** + * Creates a Multiple Producer, Single Consumer (MPSC) array-backed queue. + * + * @param requestedCapacity the requested capacity of the queue. Will be rounded to the next power + * of two. + * @return a new {@link NonBlockingQueue} instance suitable for MPSC usage + */ + public static NonBlockingQueue mpscArrayQueue(int requestedCapacity) { + if (CAN_USE_VARHANDLES) { + return new MpscArrayQueueVarHandle<>(requestedCapacity); + } + return new JctoolsWrappedQueue<>(new MpscArrayQueue<>(requestedCapacity)); + } + + /** + * Creates a Single Producer, Multiple Consumer (SPMC) array-backed queue. + * + *

 * @param requestedCapacity the requested capacity of the queue. Will be rounded to the + * next power of two. + * @return a new {@link NonBlockingQueue} instance suitable for SPMC usage + */ + public static NonBlockingQueue spmcArrayQueue(int requestedCapacity) { + if (CAN_USE_VARHANDLES) { + return new SpmcArrayQueueVarHandle<>(requestedCapacity); + } + return new JctoolsWrappedQueue<>(new SpmcArrayQueue<>(requestedCapacity)); + } + + /** + * Creates a Multiple Producer, Single Consumer (MPSC) array-backed queue that allows blocking + * behavior for the consumer. + * + * @param requestedCapacity the requested capacity of the queue. Will be rounded to the next power + * of two. + * @return a new {@link BlockingConsumerNonBlockingQueue} instance suitable for MPSC usage with + * blocking consumption + */ + public static BlockingConsumerNonBlockingQueue mpscBlockingConsumerArrayQueue( + int requestedCapacity) { + if (CAN_USE_VARHANDLES) { + return new MpscBlockingConsumerArrayQueueVarHandle<>(requestedCapacity); + } + return new JctoolsMpscBlockingConsumerWrappedQueue<>( + new MpscBlockingConsumerArrayQueue<>(requestedCapacity)); + } + + /** + * Creates a Single Producer, Single Consumer (SPSC) array-backed queue. + * + * @param requestedCapacity the requested capacity of the queue. Will be rounded to the next power + * of two. + * @return a new {@link NonBlockingQueue} instance suitable for SPSC usage + */ + public static NonBlockingQueue spscArrayQueue(int requestedCapacity) { + if (CAN_USE_VARHANDLES) { + return new SpscArrayQueueVarHandle<>(requestedCapacity); + } + return new JctoolsWrappedQueue<>(new SpscArrayQueue<>(requestedCapacity)); + } +} diff --git a/utils/queue-utils/src/main/java/datadog/common/queue/SpmcArrayQueueVarHandle.java b/utils/queue-utils/src/main/java/datadog/common/queue/SpmcArrayQueueVarHandle.java new file mode 100644 index 00000000000..46d8c9914ad --- /dev/null +++ b/utils/queue-utils/src/main/java/datadog/common/queue/SpmcArrayQueueVarHandle.java @@ -0,0 +1,110 @@ +package datadog.common.queue; + +import datadog.common.queue.padding.PaddedSequence; +import java.util.Objects; +import java.util.concurrent.locks.LockSupport; + +/** + * A Single-Producer, Multiple-Consumer (SPMC) bounded, lock-free queue based on a circular array. + * + *
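A brief usage sketch of the factory methods above, assuming the generic signatures that the flattened javadoc elides; on Java 8 the same calls return the JCTools-backed wrappers instead of the VarHandle implementations.

```java
import datadog.common.queue.BlockingConsumerNonBlockingQueue;
import datadog.common.queue.NonBlockingQueue;
import datadog.common.queue.Queues;

public class QueuesFactoryExample {
  public static void main(String[] args) throws InterruptedException {
    // Many producers, one consumer; never blocks.
    NonBlockingQueue<String> events = Queues.mpscArrayQueue(256);
    events.offer("event");
    System.out.println(events.poll());

    // One producer, many consumers.
    NonBlockingQueue<String> work = Queues.spmcArrayQueue(256);
    work.offer("task");
    System.out.println(work.poll());

    // One producer, one consumer; the cheapest ordering guarantees.
    NonBlockingQueue<String> handoff = Queues.spscArrayQueue(64);
    handoff.offer("ping");
    System.out.println(handoff.poll());

    // MPSC variant whose single consumer may block in take()/poll(timeout).
    BlockingConsumerNonBlockingQueue<Runnable> inbox =
        Queues.mpscBlockingConsumerArrayQueue(1024);
    inbox.offer(() -> System.out.println("hello"));
    inbox.take().run(); // parks only if the queue is empty
  }
}
```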

All operations are wait-free for the single producer and lock-free for consumers. + * + * @param the element type + */ +final class SpmcArrayQueueVarHandle extends BaseQueue { + /** Cached consumer limit to avoid repeated volatile tail reads */ + private final PaddedSequence consumerLimit = new PaddedSequence(); + + /** + * Creates a new SPMC queue. + * + * @param requestedCapacity the desired capacity, rounded up to next power of two + */ + public SpmcArrayQueueVarHandle(int requestedCapacity) { + super(requestedCapacity); + } + + @Override + public boolean offer(E e) { + Objects.requireNonNull(e); + + long currentTail = tail.getVolatile(); + long wrapPoint = currentTail - capacity; + long currentHead = head.getVolatile(); + + if (wrapPoint >= currentHead) { + return false; // queue full + } + + int index = arrayIndex(currentTail); + + // Release-store ensures that the element is visible to consumers + ARRAY_HANDLE.setRelease(this.buffer, index, e); + + // Single-producer: simple volatile write to advance tail + tail.setVolatile(currentTail + 1); + return true; + } + + @Override + @SuppressWarnings("unchecked") + public E poll() { + final Object[] localBuffer = this.buffer; + + int spinCycles = 0; + boolean parkOnSpin = (Thread.currentThread().getId() & 1) == 0; + + while (true) { + long currentHead = head.getVolatile(); + long limit = consumerLimit.getVolatile(); // cached tail + + if (currentHead >= limit) { + // refresh limit once from tail volatile + limit = tail.getVolatile(); + if (currentHead >= limit) { + return null; // queue empty + } + consumerLimit.setVolatile(limit); // update local cache + } + + // Attempt to claim this slot + if (!head.compareAndSet(currentHead, currentHead + 1)) { + // CAS failed. Backoff to reduce contention + if ((spinCycles & 1) == 0) { + Thread.onSpinWait(); + } else { + if (parkOnSpin) { + LockSupport.parkNanos(1); + } else { + Thread.yield(); + } + } + spinCycles++; + continue; + } + + int index = arrayIndex(currentHead); + Object value; + + // Spin-wait until producer publishes + while ((value = ARRAY_HANDLE.getAcquire(localBuffer, index)) == null) { + Thread.onSpinWait(); + } + + // Clear slot for GC + ARRAY_HANDLE.setOpaque(localBuffer, index, null); + return (E) value; + } + } + + @Override + @SuppressWarnings("unchecked") + public E peek() { + long currentHead = head.getVolatile(); + long currentTail = tail.getVolatile(); + + if (currentHead >= currentTail) return null; + + return (E) ARRAY_HANDLE.getAcquire(buffer, currentHead); // acquire-load ensures visibility + } +} diff --git a/utils/queue-utils/src/main/java/datadog/common/queue/SpscArrayQueueVarHandle.java b/utils/queue-utils/src/main/java/datadog/common/queue/SpscArrayQueueVarHandle.java new file mode 100644 index 00000000000..34462af8c99 --- /dev/null +++ b/utils/queue-utils/src/main/java/datadog/common/queue/SpscArrayQueueVarHandle.java @@ -0,0 +1,69 @@ +package datadog.common.queue; + +import java.util.Objects; + +/** + * A high-performance Single-Producer, Single-Consumer (SPSC) bounded queue using a circular buffer. + * + * @param the type of elements held in this queue + */ +final class SpscArrayQueueVarHandle extends BaseQueue { + // These caches eliminate redundant volatile reads + private long cachedHead = 0L; // visible only to producer + private long cachedTail = 0L; // visible only to consumer + + /** + * Creates a new SPSC queue with the specified capacity. Capacity must be a power of two. 
+ * + * @param requestedCapacity the desired capacity, rounded up to the next power of two if needed + */ + public SpscArrayQueueVarHandle(int requestedCapacity) { + super(requestedCapacity); + } + + @Override + public boolean offer(E e) { + Objects.requireNonNull(e); + + final long currentTail = tail.getOpaque(); + final int index = arrayIndex(currentTail); + + if (currentTail - cachedHead >= capacity) { + // Refresh cached head (read from consumer side) + cachedHead = (long) head.getVolatile(); + if (currentTail - cachedHead >= capacity) { + return false; // still full + } + } + + ARRAY_HANDLE.setRelease(buffer, index, e); // publish value + tail.setOpaque(currentTail + 1); // relaxed tail update + return true; + } + + @Override + @SuppressWarnings("unchecked") + public E poll() { + final long currentHead = head.getOpaque(); + final int index = arrayIndex(currentHead); + + if (currentHead >= cachedTail) { + // refresh tail cache + cachedTail = tail.getVolatile(); + if (currentHead >= cachedTail) { + return null; // still empty + } + } + + Object value = ARRAY_HANDLE.getAcquire(buffer, index); + ARRAY_HANDLE.setOpaque(buffer, index, null); // clear slot + head.setOpaque(currentHead + 1); // relaxed head update + return (E) value; + } + + @Override + @SuppressWarnings("unchecked") + public E peek() { + return (E) ARRAY_HANDLE.getVolatile(buffer, arrayIndex(head.getOpaque())); + } +} diff --git a/utils/queue-utils/src/main/java/datadog/common/queue/padding/LhsPadding.java b/utils/queue-utils/src/main/java/datadog/common/queue/padding/LhsPadding.java new file mode 100644 index 00000000000..aa7a681728c --- /dev/null +++ b/utils/queue-utils/src/main/java/datadog/common/queue/padding/LhsPadding.java @@ -0,0 +1,8 @@ +package datadog.common.queue.padding; + +/** Left-hand-side (LHS) padding to prevent false sharing. */ +class LhsPadding { + /** 7 bytes padding field to occupy space on the left side of the value. */ + @SuppressWarnings("unused") + private long p1, p2, p3, p4, p5, p6, p7; +} diff --git a/utils/queue-utils/src/main/java/datadog/common/queue/padding/LongRhsPadding.java b/utils/queue-utils/src/main/java/datadog/common/queue/padding/LongRhsPadding.java new file mode 100644 index 00000000000..75fec8a656c --- /dev/null +++ b/utils/queue-utils/src/main/java/datadog/common/queue/padding/LongRhsPadding.java @@ -0,0 +1,8 @@ +package datadog.common.queue.padding; + +/** Right-hand-side (RHS) padding to prevent false sharing. */ +class LongRhsPadding extends LongValue { + /** 7 bytes fields to occupy space on the right side of the value. */ + @SuppressWarnings("unused") + private long p9, p10, p11, p12, p13, p14, p15; +} diff --git a/utils/queue-utils/src/main/java/datadog/common/queue/padding/LongValue.java b/utils/queue-utils/src/main/java/datadog/common/queue/padding/LongValue.java new file mode 100644 index 00000000000..b481cddc650 --- /dev/null +++ b/utils/queue-utils/src/main/java/datadog/common/queue/padding/LongValue.java @@ -0,0 +1,10 @@ +package datadog.common.queue.padding; + +/** + * Holds the actual sequence value, padded on the left to prevent false sharing with preceding + * fields. + */ +class LongValue extends LhsPadding { + /** The volatile value being protected from false sharing. 
*/ + protected volatile long value; +} diff --git a/utils/queue-utils/src/main/java/datadog/common/queue/padding/PaddedSequence.java b/utils/queue-utils/src/main/java/datadog/common/queue/padding/PaddedSequence.java new file mode 100644 index 00000000000..45d0dfbdc6f --- /dev/null +++ b/utils/queue-utils/src/main/java/datadog/common/queue/padding/PaddedSequence.java @@ -0,0 +1,173 @@ +package datadog.common.queue.padding; + +import java.lang.invoke.MethodHandles; +import java.lang.invoke.VarHandle; + +/** A padded, volatile long sequence value designed to minimize false sharing. */ +public final class PaddedSequence extends LongRhsPadding { + + /** VarHandle providing atomic access to the {@code value} field. */ + public static final VarHandle VALUE_HANDLE; + + static { + try { + VALUE_HANDLE = + MethodHandles.lookup().findVarHandle(PaddedSequence.class, "value", long.class); + } catch (Throwable t) { + throw new ExceptionInInitializerError(t); + } + } + + /** Creates a new {@code PaddedSequence} with initial value {@code 0}. */ + public PaddedSequence() { + this(0L); + } + + /** + * Creates a new {@code PaddedSequence} with the specified initial value. + * + * @param initialValue the initial value of the sequence + */ + public PaddedSequence(long initialValue) { + setPlain(initialValue); + } + + /** + * Returns the current value using a volatile read. + * + *

This provides full read visibility: all writes published via release/volatile stores by + * other threads are guaranteed visible to this load. + * + * @return the current sequence value + */ + public long getVolatile() { + return (long) VALUE_HANDLE.getVolatile(this); + } + + /** + * Returns the current value using acquire semantics. + * + *

This read guarantees that subsequent loads and stores cannot move before this operation. It + * is typically paired with {@code setRelease(long)} on the publishing side for efficient + * message-passing. + * + * @return the current sequence value with acquire ordering + */ + public long getAcquire() { + return (long) VALUE_HANDLE.getAcquire(this); + } + + /** + * Returns the current value using a plain (non-volatile) load. + * + *
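To illustrate the acquire/release pairing mentioned above, a minimal single-slot publication sketch. The payload field and thread choreography are hypothetical; the queues in this patch publish into an array slot instead, but the ordering argument is the same.

```java
import datadog.common.queue.padding.PaddedSequence;

public class AcquireReleaseExample {
  // Plain field whose visibility is piggybacked on the sequence below.
  private static int payload;
  private static final PaddedSequence PUBLISHED = new PaddedSequence(0);

  public static void main(String[] args) throws InterruptedException {
    Thread producer = new Thread(() -> {
      payload = 42;             // ordinary write
      PUBLISHED.setRelease(1);  // release: the payload write cannot move after this store
    });

    Thread consumer = new Thread(() -> {
      while (PUBLISHED.getAcquire() == 0) { // acquire: pairs with the release store
        Thread.onSpinWait();
      }
      System.out.println(payload);          // guaranteed to observe 42
    });

    consumer.start();
    producer.start();
    producer.join();
    consumer.join();
  }
}
```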

No memory ordering or visibility guarantees are provided. Useful for single-threaded readers + * or cases where explicit fencing is used separately. + * + * @return the current sequence value with no memory barrier + */ + public long getPlain() { + return (long) VALUE_HANDLE.get(this); + } + + /** + * Returns the current value using an opaque read. + * + *

Opaque mode guarantees that the read is not reordered with respect to other opaque or + * stronger operations on the same variable, but allows the JVM considerable freedom to reorder + * relative to unrelated memory accesses. + * + *

It provides weaker semantics than acquire but stronger than a plain read, making it useful + * for performance-sensitive code paths where minimal ordering is required but visibility should + * not be completely relaxed. + * + * @return the current value using opaque semantics + */ + public long getOpaque() { + return (long) VALUE_HANDLE.getOpaque(this); + } + + /** + * Stores a new value using a volatile write. + * + *
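A hedged example of the kind of hot-loop flag check where an opaque read is sufficient; the stop flag is illustrative and not taken from this patch. Opaque mode still guarantees that the writer's update is eventually observed, without paying for a full volatile read on every iteration.

```java
import datadog.common.queue.padding.PaddedSequence;

public class OpaqueFlagExample {
  private static final PaddedSequence STOP = new PaddedSequence(0);

  public static void main(String[] args) throws InterruptedException {
    Thread worker = new Thread(() -> {
      long iterations = 0;
      // Opaque read: eventually sees the writer's update, with minimal ordering cost.
      while (STOP.getOpaque() == 0) {
        iterations++; // stand-in for real work
      }
      System.out.println("stopped after " + iterations + " iterations");
    });
    worker.start();

    Thread.sleep(100);
    STOP.setOpaque(1); // only the flag itself needs to become visible
    worker.join();
  }
}
```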

This provides full write visibility and prevents reorderings of preceding + * operations. Equivalent to a traditional {@code volatile long} assignment. + * + * @param newValue the value to store + */ + public void setVolatile(long newValue) { + VALUE_HANDLE.setVolatile(this, newValue); + } + + /** + * Stores a new value using release semantics. + * + *

This guarantees that all prior writes cannot be reordered after this store, but does not + * impose the full cost of a volatile write. Functionally equivalent to {@code lazySet} or ordered + * write in {@code Unsafe}. + * + * @param newValue the value to store with release ordering + */ + public void setRelease(long newValue) { + VALUE_HANDLE.setRelease(this, newValue); + } + + /** + * Stores a value using a plain (non-volatile) write. + * + *

No ordering guarantees are provided. Useful for single-threaded initialization or when + * explicit fences are applied externally. + * + * @param newValue the value to store with no memory barrier + */ + public void setPlain(long newValue) { + VALUE_HANDLE.set(this, newValue); + } + + /** + * Stores a value using an opaque write. + * + *

Opaque mode prevents the write from being reordered with respect to other opaque or stronger + * operations on the same variable, but does not provide the ordering guarantees of release or + * volatile stores. + * + *

Opaque writes are typically used when the value does not participate in a strict + * happens-before relationship but still must not be fully reordered with respect to adjacent + * accesses to the same field. + * + * @param newValue the value to store using opaque semantics + */ + public void setOpaque(long newValue) { + VALUE_HANDLE.setOpaque(this, newValue); + } + + /** + * Atomically updates the value to {@code newValue} if the current value equals {@code expected}. + * + * @param expected the value the field must currently hold + * @param newValue the new value to set + * @return {@code true} on success, {@code false} otherwise + */ + public boolean compareAndSet(long expected, long newValue) { + return VALUE_HANDLE.compareAndSet(this, expected, newValue); + } + + /** + * Atomically updates the value to {@code newValue} if the current value equals {@code expected}, + * but may fail spuriously even if the current value equals {@code expected}. + * + *

This is a weak CAS operation. Weak CAS is intrinsified in modern JVMs and + * generally faster than strong CAS. It is suitable for usage inside CAS loops, where the loop + * retries until success. + * + *

Important: This method may return {@code false} spuriously, even if no + * other thread has modified the value. Do not rely on weak CAS for single-shot atomic + * updates that must succeed or fail deterministically. + * + * @param expected the value the field must currently hold + * @param newValue the new value to set + * @return {@code true} if the CAS succeeded, {@code false} otherwise (may be spurious) + */ + public boolean weakCompareAndSet(long expected, long newValue) { + return VALUE_HANDLE.weakCompareAndSet(this, expected, newValue); + } +} diff --git a/utils/queue-utils/src/main/java/datadog/common/queue/padding/PaddedThread.java b/utils/queue-utils/src/main/java/datadog/common/queue/padding/PaddedThread.java new file mode 100644 index 00000000000..c64db4396a9 --- /dev/null +++ b/utils/queue-utils/src/main/java/datadog/common/queue/padding/PaddedThread.java @@ -0,0 +1,59 @@ +package datadog.common.queue.padding; + +import java.lang.invoke.MethodHandles; +import java.lang.invoke.VarHandle; + +/** A padded, volatile Thread value designed to minimize false sharing. */ +public final class PaddedThread extends ThreadRhsPadding { + + /** VarHandle providing atomic access to the {@code value} field. */ + public static final VarHandle VALUE_HANDLE; + + static { + try { + VALUE_HANDLE = + MethodHandles.lookup().findVarHandle(PaddedThread.class, "value", Thread.class); + } catch (Throwable t) { + throw new ExceptionInInitializerError(t); + } + } + + /** + * Stores a value using an opaque write. + * + *
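A small retry-loop sketch of how weakCompareAndSet is intended to be used; the increment operation is an arbitrary stand-in. Spurious failures are harmless because the loop re-reads and retries.

```java
import datadog.common.queue.padding.PaddedSequence;

public class WeakCasLoopExample {
  /** Atomically increments the sequence and returns the previous value. */
  static long getAndIncrement(PaddedSequence seq) {
    while (true) {
      long current = seq.getVolatile();
      if (seq.weakCompareAndSet(current, current + 1)) {
        return current;
      }
      // Either another thread won the race or the weak CAS failed spuriously: retry.
    }
  }

  public static void main(String[] args) {
    PaddedSequence seq = new PaddedSequence(0);
    for (int i = 0; i < 5; i++) {
      System.out.println(getAndIncrement(seq)); // prints 0..4
    }
  }
}
```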

Opaque mode prevents the write from being reordered with respect to other opaque or stronger + * operations on the same variable, but does not provide the ordering guarantees of release or + * volatile stores. + * + *

Opaque writes are typically used when the value does not participate in a strict + * happens-before relationship but still must not be fully reordered with respect to adjacent + * accesses to the same field. + * + * @param newValue the value to store using opaque semantics + */ + public void setOpaque(Thread newValue) { + VALUE_HANDLE.setOpaque(this, newValue); + } + + /** + * Stores a new value using a volatile write. + * + *
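A stripped-down sketch of the park/unpark handshake that PaddedThread supports in the blocking-consumer queue earlier in this patch: the consumer publishes itself before parking, and the producer claims the waiter with getAndSet before unparking. The AtomicInteger here only stands in for "an element became available" and is not part of the patch.

```java
import datadog.common.queue.padding.PaddedThread;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

public class ParkUnparkHandshakeExample {
  private static final PaddedThread WAITER = new PaddedThread();
  private static final AtomicInteger AVAILABLE = new AtomicInteger();

  public static void main(String[] args) throws InterruptedException {
    Thread consumer = new Thread(() -> {
      while (AVAILABLE.get() == 0) {
        WAITER.setVolatile(Thread.currentThread()); // publish "I am about to sleep"
        if (AVAILABLE.get() == 0) {                 // re-check to avoid a lost wake-up
          LockSupport.park();
        }
        WAITER.setOpaque(null);                     // single consumer: plain cleanup suffices
      }
      System.out.println("woken, available=" + AVAILABLE.get());
    });
    consumer.start();

    Thread.sleep(100);
    AVAILABLE.incrementAndGet();
    Thread waiting = WAITER.getAndSet(null);        // producer side: claim and wake the waiter
    if (waiting != null) {
      LockSupport.unpark(waiting);
    }
    consumer.join();
  }
}
```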

This provides full write visibility and prevents reorderings of preceding + * operations. + * + * @param newValue the value to store + */ + public void setVolatile(Thread newValue) { + VALUE_HANDLE.setVolatile(this, newValue); + } + + /** + * Atomically sets the value to {@code newValue} and returns the previous value. + * + * @param newValue the value to set + * @return the previous value + */ + public Thread getAndSet(Thread newValue) { + return (Thread) VALUE_HANDLE.getAndSet(this, newValue); + } +} diff --git a/utils/queue-utils/src/main/java/datadog/common/queue/padding/ThreadRhsPadding.java b/utils/queue-utils/src/main/java/datadog/common/queue/padding/ThreadRhsPadding.java new file mode 100644 index 00000000000..92da61dd0b9 --- /dev/null +++ b/utils/queue-utils/src/main/java/datadog/common/queue/padding/ThreadRhsPadding.java @@ -0,0 +1,8 @@ +package datadog.common.queue.padding; + +/** Right-hand-side (RHS) padding to prevent false sharing. */ +public class ThreadRhsPadding extends ThreadValue { + /** 7 bytes fields to occupy space on the right side of the value. */ + @SuppressWarnings("unused") + private long p9, p10, p11, p12, p13, p14, p15; +} diff --git a/utils/queue-utils/src/main/java/datadog/common/queue/padding/ThreadValue.java b/utils/queue-utils/src/main/java/datadog/common/queue/padding/ThreadValue.java new file mode 100644 index 00000000000..bd7b6048167 --- /dev/null +++ b/utils/queue-utils/src/main/java/datadog/common/queue/padding/ThreadValue.java @@ -0,0 +1,9 @@ +package datadog.common.queue.padding; + +/** + * Holds the actual thread value, padded on the left to prevent false sharing with preceding fields. + */ +class ThreadValue extends LhsPadding { + /** A left padded {@link Thread} */ + protected volatile Thread value; +} diff --git a/utils/queue-utils/src/test/java/datadog/common/queue/AbstractQueueTest.groovy b/utils/queue-utils/src/test/java/datadog/common/queue/AbstractQueueTest.groovy new file mode 100644 index 00000000000..139f992ce86 --- /dev/null +++ b/utils/queue-utils/src/test/java/datadog/common/queue/AbstractQueueTest.groovy @@ -0,0 +1,116 @@ +package datadog.common.queue + + +import datadog.trace.test.util.DDSpecification +import java.util.function.Consumer + +abstract class AbstractQueueTest> extends DDSpecification { + abstract T createQueue(int capacity) + protected T queue = createQueue(8) + + def "offer and poll should preserve FIFO order"() { + when: + queue.offer(1) + queue.offer(2) + queue.offer(3) + + then: + queue.poll() == 1 + queue.poll() == 2 + queue.poll() == 3 + queue.poll() == null + } + + def "offer should return false when queue is full"() { + given: + queue.clear() + (1..8).each { queue.offer(it) } + + expect: + !queue.offer(999) + queue.size() == 8 + } + + def "peek should return head element without removing it"() { + given: + queue.clear() + queue.offer(10) + queue.offer(20) + + expect: + queue.peek() == 10 + queue.peek() == 10 + queue.size() == 2 + } + + def "poll should return null when empty"() { + given: + queue.clear() + + expect: + queue.poll() == null + } + + def "size should reflect current number of items"() { + when: + queue.clear() + queue.offer(1) + queue.offer(2) + + then: + queue.size() == 2 + + when: + queue.poll() + queue.poll() + + then: + queue.size() == 0 + } + + def "drain should consume all available elements"() { + given: + queue.clear() + (1..5).each { queue.offer(it) } + def drained = [] + + when: + def count = queue.drain({ drained << it } as Consumer) + + then: + count == 5 + drained == [1, 2, 3, 4, 5] + 
queue.isEmpty() + } + + def "drain with limit should only consume that many elements"() { + given: + queue.clear() + (1..6).each { queue.offer(it) } + def drained = [] + + when: + def count = queue.drain({ drained << it } as Consumer, 3) + + then: + count == 3 + drained == [1, 2, 3] + queue.size() == 3 + } + + def "remainingCapacity should reflect current occupancy"() { + given: + def q = createQueue(4) + q.offer(1) + q.offer(2) + + expect: + q.remainingCapacity() == 2 + + when: + q.poll() + + then: + q.remainingCapacity() == 3 + } +} diff --git a/utils/queue-utils/src/test/java/datadog/common/queue/JctoolsWrapppersTest.groovy b/utils/queue-utils/src/test/java/datadog/common/queue/JctoolsWrapppersTest.groovy new file mode 100644 index 00000000000..e763004c6e8 --- /dev/null +++ b/utils/queue-utils/src/test/java/datadog/common/queue/JctoolsWrapppersTest.groovy @@ -0,0 +1,42 @@ +package datadog.common.queue + +import datadog.trace.test.util.DDSpecification +import java.util.concurrent.TimeUnit +import java.util.function.Consumer +import java.util.function.Supplier +import org.jctools.queues.MpscBlockingConsumerArrayQueue + +class JctoolsWrapppersTest extends DDSpecification { + + def "should wrap the method #method to the jctools delegate #wrapperClass"() { + setup: + // will work for both wrapper classes + def delegate = Mock(MpscBlockingConsumerArrayQueue) + def queue = wrapperClass.newInstance(delegate) as NonBlockingQueue + + when: + queue.invokeMethod(method, args.toArray()) + + then: + 1 * delegate."$method"(*_) + + where: + method | args | wrapperClass + "poll" | [] | JctoolsWrappedQueue + "offer" | ["test"] | JctoolsWrappedQueue + "capacity" | [] | JctoolsWrappedQueue + "peek" | [] | JctoolsWrappedQueue + "drain" | [Mock(Consumer)] | JctoolsWrappedQueue + "drain" | [Mock(Consumer), 1] | JctoolsWrappedQueue + "fill" | [Mock(Supplier), 1] | JctoolsWrappedQueue + "poll" | [] | JctoolsMpscBlockingConsumerWrappedQueue + "offer" | ["test"] | JctoolsMpscBlockingConsumerWrappedQueue + "capacity" | [] | JctoolsMpscBlockingConsumerWrappedQueue + "peek" | [] | JctoolsMpscBlockingConsumerWrappedQueue + "drain" | [Mock(Consumer)] | JctoolsMpscBlockingConsumerWrappedQueue + "drain" | [Mock(Consumer), 1] | JctoolsMpscBlockingConsumerWrappedQueue + "fill" | [Mock(Supplier), 1] | JctoolsMpscBlockingConsumerWrappedQueue + "poll" | [1, TimeUnit.SECONDS] | JctoolsMpscBlockingConsumerWrappedQueue + "take" | [] | JctoolsMpscBlockingConsumerWrappedQueue + } +} diff --git a/utils/queue-utils/src/test/java/datadog/common/queue/MpscArrayQueueVarHandleTest.groovy b/utils/queue-utils/src/test/java/datadog/common/queue/MpscArrayQueueVarHandleTest.groovy new file mode 100644 index 00000000000..e8484492f28 --- /dev/null +++ b/utils/queue-utils/src/test/java/datadog/common/queue/MpscArrayQueueVarHandleTest.groovy @@ -0,0 +1,61 @@ +package datadog.common.queue + + +import java.util.concurrent.CountDownLatch +import java.util.concurrent.Executors +import spock.lang.Timeout + +class MpscArrayQueueVarHandleTest extends AbstractQueueTest { + + @Timeout(10) + def "multiple producers single consumer should consume all elements without duplication or loss"() { + given: + int total = 1000 + int producers = 4 + queue = new MpscArrayQueueVarHandle<>(1024) + def results = Collections.synchronizedList([]) + def executor = Executors.newFixedThreadPool(producers) + def latch = new CountDownLatch(producers) + def consumerDone = new CountDownLatch(1) + + when: "multiple producers enqueue concurrently" + (1..producers).each { id -> + 
executor.submit { + for (int i = 0; i < total / producers; i++) { + int value = (id * 10000) + i + while (!queue.offer(value)) { + Thread.yield() + } + } + latch.countDown() + } + } + + and: "a single consumer drains all elements" + Thread consumer = new Thread({ + while (results.size() < total) { + def v = queue.poll() + if (v != null) { + results << v + } else { + Thread.yield() + } + } + consumerDone.countDown() + }) + consumer.start() + + latch.await() + consumerDone.await() + executor.shutdown() + + then: + results.size() == total + results.toSet().size() == total // all unique + } + + @Override + MpscArrayQueueVarHandle createQueue(int capacity) { + return new MpscArrayQueueVarHandle(capacity) + } +} diff --git a/utils/queue-utils/src/test/java/datadog/common/queue/MpscBlockingConsumerArrayQueueVarHandleTest.groovy b/utils/queue-utils/src/test/java/datadog/common/queue/MpscBlockingConsumerArrayQueueVarHandleTest.groovy new file mode 100644 index 00000000000..9404a850da9 --- /dev/null +++ b/utils/queue-utils/src/test/java/datadog/common/queue/MpscBlockingConsumerArrayQueueVarHandleTest.groovy @@ -0,0 +1,170 @@ +package datadog.common.queue + +import static java.util.concurrent.TimeUnit.NANOSECONDS + +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicReference +import java.util.function.Consumer +import java.util.function.Supplier +import spock.lang.Timeout + +class MpscBlockingConsumerArrayQueueVarHandleTest extends AbstractQueueTest> { + + @Override + MpscBlockingConsumerArrayQueueVarHandle createQueue(int capacity) { + return new MpscBlockingConsumerArrayQueueVarHandle(capacity) + } + + def "drain should consume all elements in order"() { + given: + queue.clear() + (1..5).each { queue.offer(it) } + def drained = [] + + when: + def count = queue.drain({ drained << it } as Consumer) + + then: + count == 5 + drained == [1, 2, 3, 4, 5] + queue.isEmpty() + } + + def "drain with limit should consume only limited number"() { + given: + queue.clear() + (1..6).each { queue.offer(it) } + def drained = [] + + when: + def count = queue.drain({ drained << it } as Consumer, 3) + + then: + count == 3 + drained == [1, 2, 3] + queue.size() == 3 + } + + @Timeout(10) + def "multiple producers single consumer should consume all elements without duplicates"() { + given: + int total = 1000 + int producers = 4 + queue = new MpscBlockingConsumerArrayQueueVarHandle<>(1024) + def results = Collections.synchronizedList([]) + def latch = new CountDownLatch(producers) + + when: + // Multiple producers + (1..producers).each { id -> + Thread.start { + for (int i = 0; i < total / producers; i++) { + int val = id * 10_000 + i + while (!queue.offer(val)) { + Thread.yield() + } + } + latch.countDown() + } + } + + // Single consumer + Thread consumer = Thread.start { + while (results.size() < total) { + def v = queue.poll() + if (v != null) { + results << v + } + else { + Thread.yield() + } + } + } + + latch.await() + consumer.join() + + then: + results.size() == total + results.toSet().size() == total // all unique + } + + def "blocking take should wake up when producer offers"() { + given: + queue = new MpscBlockingConsumerArrayQueueVarHandle<>(4) + def result = new AtomicReference<>() + + when: + Thread consumer = Thread.start { + try { + result.set(queue.take()) + } catch (InterruptedException ignored) { + } + } + Thread.sleep(100) + queue.offer(123) + consumer.join(1000) + + then: + 
result.get() == 123 + queue.isEmpty() + } + + def "fill inserts up to capacity"() { + given: + def counter = 0 + def supplier = { counter < 10 ? counter++ : null } as Supplier + + when: + def filled = queue.fill(supplier, 10) + + then: + filled == 8 + queue.size() == 8 + } + + def "poll with timeout returns null if no element becomes available"() { + when: + def start = System.nanoTime() + def value = queue.poll(200, TimeUnit.MILLISECONDS) + def elapsedMs = NANOSECONDS.toMillis(System.nanoTime() - start) + + then: + value == null + elapsedMs >= 200 // waited approximately the timeout + } + + def "poll with zero timeout behaves like immediate poll"() { + expect: + queue.poll(0, TimeUnit.MILLISECONDS) == null + + when: + queue.offer(99) + + then: + queue.poll(0, TimeUnit.MILLISECONDS) == 99 + } + + def "poll throws InterruptedException if interrupted"() { + given: + def thrown = new AtomicBoolean() + def thread = Thread.start { + try { + queue.poll(500, TimeUnit.MILLISECONDS) + } catch (InterruptedException ie) { + thrown.set(true) + Thread.currentThread().interrupt() + } + } + + when: + Thread.sleep(50) + thread.interrupt() + thread.join() + + then: + thrown.get() + } +} diff --git a/utils/queue-utils/src/test/java/datadog/common/queue/SpmcArrayQueueVarHandleTest.groovy b/utils/queue-utils/src/test/java/datadog/common/queue/SpmcArrayQueueVarHandleTest.groovy new file mode 100644 index 00000000000..996d9c42d95 --- /dev/null +++ b/utils/queue-utils/src/test/java/datadog/common/queue/SpmcArrayQueueVarHandleTest.groovy @@ -0,0 +1,59 @@ +package datadog.common.queue + + +import java.util.concurrent.CountDownLatch +import java.util.concurrent.Executors +import spock.lang.Timeout + +class SpmcArrayQueueVarHandleTest extends AbstractQueueTest> { + + @Override + SpmcArrayQueueVarHandle createQueue(int capacity) { + return new SpmcArrayQueueVarHandle(capacity) + } + + @Timeout(10) + def "single producer multiple consumers should consume all elements without duplication or loss"() { + given: + int total = 1000 + int consumers = 4 + queue = new SpmcArrayQueueVarHandle<>(1024) + def results = Collections.synchronizedList([]) + def executor = Executors.newFixedThreadPool(consumers) + def latch = new CountDownLatch(consumers) + + when: "one producer fills the queue" + Thread producer = new Thread({ + for (int i = 0; i < total; i++) { + while (!queue.offer(i)) { + Thread.yield() + } + } + }) + producer.start() + + and: "multiple consumers drain concurrently" + (1..consumers).each { + executor.submit { + while (results.size() < total) { + def v = queue.poll() + if (v != null) { + results << v + } else { + Thread.yield() + } + } + latch.countDown() + } + } + + latch.await() + producer.join() + executor.shutdown() + + then: + results.size() == total + results.toSet().size() == total // no duplicates + results.containsAll((0..> { + + def "single producer single consumer concurrency"() { + given: + def queue = new SpscArrayQueueVarHandle(1024) + def producerCount = 1000 + def consumed = new AtomicInteger(0) + def consumedValues = [] + + def producer = Thread.start { + (1..producerCount).each { queue.offer(it) } + } + + def consumer = Thread.start { + while (consumed.get() < producerCount) { + def v = queue.poll() + if (v != null) { + consumedValues << v + consumed.incrementAndGet() + } + } + } + + when: + producer.join() + consumer.join() + + then: + consumed.get() == producerCount + consumedValues.toSet().size() == producerCount // all values unique + } + + @Override + SpscArrayQueueVarHandle createQueue(int 
capacity) { + return new SpscArrayQueueVarHandle(capacity) + } +}