Add channel benchmarks #4546
Open
murfel wants to merge 12 commits into develop from channel-benchmarks
Changes from 1 commit
Commits (12, all by murfel):
c5c72a3  add benchmarks
37be033  avoid allocating (avoid list.take(count))
0383992  rename send to sendManyItems to avoid false hinting at Channel.send
33f9e72  make senders+receivers sum up to cores == 4
bf9af79  add prefill
c406fbd  remove Dispatchers.Default, it's in the runSendReceive already
8425561  sendManyItems simplify and comment
be3f273  replace consume with receive
b5c7bf7  change units from ms to ns
f82a493  fix
939c1d5  attempt to align comment on top of @Param
4647c52  align another entry
104 changes: 104 additions & 0 deletions
benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt

```kotlin
package benchmarks

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import org.openjdk.jmh.annotations.*
import java.util.concurrent.*

@Warmup(iterations = 7, time = 1)
@Measurement(iterations = 10, time = 1)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Benchmark)
@Fork(1)
open class ChannelBenchmark {
    // max coroutines launched per benchmark
    // to allow for true parallelism
    val cores = 4

    // 4 KB, 40 KB, 400 KB, 4 MB, 40 MB, 400 MB
    @Param("1000", "10000", "100000", "1000000", "10000000", "100000000")
    var count: Int = 0

    // 1. Preallocate.
    // 2. Different values to avoid helping the cache.
    val list = ArrayList<Int>(100000000).apply {
        repeat(100000000) { add(it) }
    }

    @Benchmark
    fun sendUnlimited() = runBlocking {
        runSend(count, Channel.UNLIMITED)
    }

    @Benchmark
    fun sendConflated() = runBlocking {
        runSend(count, Channel.CONFLATED)
    }

    @Benchmark
    fun sendReceiveUnlimited() = runBlocking(Dispatchers.Default) {
        runSendReceive(count, Channel.UNLIMITED)
    }

    @Benchmark
    fun sendReceiveConflated() = runBlocking(Dispatchers.Default) {
        runSendReceive(count, Channel.CONFLATED)
    }

    @Benchmark
    fun sendReceiveRendezvous() = runBlocking(Dispatchers.Default) {
        // NB: Rendezvous is partly benchmarking the scheduler, not the channel alone.
        // So don't trust the Rendezvous results too much.
        runSendReceive(count, Channel.RENDEZVOUS)
    }

    @Benchmark
    fun oneSenderManyReceivers() = runBlocking {
        runSendReceive(count, Channel.UNLIMITED, 1, cores)
    }

    @Benchmark
    fun manySendersOneReceiver() = runBlocking {
        runSendReceive(count, Channel.UNLIMITED, cores, 1)
    }

    @Benchmark
    fun manySendersManyReceivers() = runBlocking {
        runSendReceive(count, Channel.UNLIMITED, cores / 2, cores / 2)
    }

    private suspend fun send(count: Int, channel: Channel<Int>) = coroutineScope {
        list.take(count).forEach { channel.send(it) }
    }

    private suspend fun runSend(count: Int, capacity: Int) {
        Channel<Int>(capacity).also {
            send(count, it)
        }
    }

    // NB: not all parameter combinations make sense in general.
    // E.g., for the rendezvous channel, senders should be equal to receivers.
    // If they are non-equal, it's a special case of performance under contention.
    private suspend inline fun runSendReceive(count: Int, capacity: Int, senders: Int = 1, receivers: Int = 1) =
        withContext(Dispatchers.Default) {
            require(senders > 0 && receivers > 0)

            val channel = Channel<Int>(capacity)
            repeat(receivers) {
                launch {
                    channel.consumeEach { }
                }
            }

            coroutineScope {
                repeat(senders) {
                    launch {
                        send(count / senders, channel)
                    }
                }
            }
            channel.close()
        }
}
```
---
Could you please elaborate on what exactly you're trying to measure with these benchmarks?
Right now, it looks like "time required to create a new channel, send N messages into it (and, optionally, receive them), and then close the channel". However, I thought the initial idea was to measure the latency of sending (and receiving) a single message into the channel.
---
I do measure that, indirectly. Do you suggest literally sending/receiving only one message per benchmark? Is that reliable?
---
Direct measurements are always better than indirect. If the goal is to measure send/recv timing, let's measure it.
What makes you think it will be unreliable?
---
Not always. See below. Also depends on how you define "better".
Again, I am measuring that. My way of measuring is a valid way of measuring. Having to assert that makes me feel dismissed.
We can explore other ways to measure, for sure.
What setup did you have in mind, something like this?
Or this? (no suspension, trySend/tryReceive)
---
The internal structure of the channel may be worth taking into account. For a prefilled channel with 32+ elements (32 is the default channel segment size), we can expect `send` and `receive` not to interact with one another at all, that is, the duration of `send` followed by `receive` should be roughly the sum of the durations of `send` and `receive` invoked independently. I imagine `wrapper.channel.send(42)` and `blackhole.consume(wrapper.channel.receive())` could stay in different benchmarks without affecting the results too much.
For an empty channel, we could also try racing `send` and `receive`.
Using `runBlocking` in a benchmark that's only doing `send` doesn't seem optimal to me; I can imagine the run time getting dominated by the `runBlocking` machinery. I don't know what the proper way of doing this in JMH is, but I'd try a scheme like this: (haven't actually tested the code). Then, the scheme would be:
@fzhinkin , is there a standard mechanism that encapsulates this?
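(The snippets referenced in the comment above are not reproduced here. As an illustration of the prefilled-channel idea, here is a minimal sketch; the class name, prefill size, and JMH settings are chosen arbitrarily. The channel is prefilled past the 32-element segment size, and the send/receive pair is measured per invocation via the non-suspending `trySend`/`tryReceive`, so `runBlocking` stays out of the measured code. Splitting send-only and receive-only into separate benchmark methods, as suggested, would additionally require keeping the channel from growing or draining within an iteration.)

```kotlin
import kotlinx.coroutines.channels.*
import org.openjdk.jmh.annotations.*
import org.openjdk.jmh.infra.Blackhole
import java.util.concurrent.TimeUnit

@State(Scope.Thread)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Fork(1)
open class PrefilledChannelBenchmark {
    private lateinit var channel: Channel<Int>

    @Setup(Level.Iteration)
    fun prefill() {
        // Prefill well past the 32-element segment size so the sender and the
        // receiver operate on different segments and never meet.
        channel = Channel(Channel.UNLIMITED)
        repeat(1024) { check(channel.trySend(it).isSuccess) }
    }

    @Benchmark
    fun sendReceivePair(bh: Blackhole) {
        // trySend never suspends or fails on an open UNLIMITED channel, and
        // tryReceive always finds an element thanks to the prefill, so the
        // channel size stays constant across invocations.
        channel.trySend(42)
        bh.consume(channel.tryReceive().getOrNull())
    }
}
```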
---
?
---
Running them in parallel.
---
I mean, if our goal is to measure the latency of a certain operation and we have facilities to do so, then it's better to do it directly (to an extent; benchmark results are averages anyway). By doing so, we can ensure that all unnecessary setup and teardown code (like creating a channel) won't skew the results.
On the other hand, if the goal is to measure end-to-end latency, like "time to create a channel and send 100k messages over it", then sure, the current approach works for that (moreover, I don't see how to measure it otherwise).
See the comment above. I was under the impression that the typical use case for a channel is to be used indirectly (within a flow, for example), so for channels as they are, we decided to measure the latency of a single operation to see how it would be affected by potential changes in the implementation.
I'm not saying that the way you're measuring it is invalid, but if there are facilities to measure the latency of a single operation (well, the send-receive pair of operations), I'm voting for using them (unless there is evidence that such a measurement is impossible or makes no sense).
Setup (and teardown) actions performed before (after) the whole run (or an individual iteration) should not affect measurements, as they are performed outside of the measurement scope; they will affect the measurements when performed for each benchmark function invocation.
I'm not sure if sending 400MB of data is a typical usage either. ;)
The benchmark function is continuously invoked over a configured period of time (you set it to 1 second).
If we reuse the same channel in each invocation, results will average over data structure amortization.
It's easier to focus on memory footprint, as it is something we control directly (how many bytes we're allocating when performing an operation), rather than on GC pauses (they are subject to various factors).
Both approaches look sane (assuming the wrapper is not recreated for every benchmark call) and we can do both.
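(As a reference for the setup/teardown point above, a small sketch of the standard JMH fixture levels; the state class and field names are illustrative. Trial- and iteration-level fixtures run outside the measured region, while invocation-level fixtures wrap every call of the benchmark method, which is why JMH warns against them for very short operations.)

```kotlin
import kotlinx.coroutines.channels.*
import org.openjdk.jmh.annotations.*

@State(Scope.Benchmark)
open class FixtureLevelsState {
    lateinit var channel: Channel<Int>

    // Runs once before the whole benchmark run: never measured.
    @Setup(Level.Trial)
    fun beforeRun() { /* e.g., preallocate the source list */ }

    // Runs before each measurement iteration: not measured either.
    @Setup(Level.Iteration)
    fun beforeIteration() {
        channel = Channel(Channel.UNLIMITED)
    }

    // Runs before every single benchmark invocation; JMH then has to timestamp
    // each invocation separately, which can perturb nanosecond-scale results.
    @Setup(Level.Invocation)
    fun beforeInvocation() { /* per-invocation preparation, if unavoidable */ }
}
```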
---
@dkhalanskyjb, it feels like I didn't get you, but nevertheless: JMH provides some facilities for running benchmark methods concurrently and synchronizing their execution:
https://github.com/openjdk/jmh/blob/master/jmh-samples/src/main/java/org/openjdk/jmh/samples/JMHSample_15_Asymmetric.java
https://github.com/openjdk/jmh/blob/master/jmh-samples/src/main/java/org/openjdk/jmh/samples/JMHSample_17_SyncIterations.java
As for `runBlocking`, it would be nice to have a kx-benchmarks maintainer here who would solve the problem of benchmarking suspend APIs for us. Oh, wait... 😄
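(As an illustration of what the asymmetric approach from JMHSample_15_Asymmetric might look like for a channel, here is a rough sketch with illustrative names; the non-suspending `trySend`/`tryReceive` are used so that no `runBlocking` is needed in the measured methods. One thread plays the sender and another the receiver, and JMH keeps them running concurrently within the same group.)

```kotlin
import kotlinx.coroutines.channels.*
import org.openjdk.jmh.annotations.*
import org.openjdk.jmh.infra.Blackhole

@State(Scope.Group)
open class AsymmetricChannelBenchmark {
    // A bounded buffer keeps the channel from growing without limit
    // if the sender outpaces the receiver.
    private val channel = Channel<Int>(Channel.BUFFERED)

    @Benchmark
    @Group("sendReceive")
    @GroupThreads(1)
    fun sender() {
        // Spin instead of suspending when the buffer is full.
        while (!channel.trySend(42).isSuccess) { /* buffer full, retry */ }
    }

    @Benchmark
    @Group("sendReceive")
    @GroupThreads(1)
    fun receiver(bh: Blackhole) {
        // Spin instead of suspending when the channel is momentarily empty.
        var value = channel.tryReceive().getOrNull()
        while (value == null) value = channel.tryReceive().getOrNull()
        bh.consume(value)
    }
}
```

The spinning biases the numbers somewhat (each method also measures how long it waits for its counterpart), so the per-operation results would need careful interpretation.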
---
Indeed, but that's because I had the wrong assumptions. I hadn't realized one important aspect:
My goal was to reduce the influence of `runBlocking` on the performance, and to that end, I wanted to spawn the actual computation in a separate thread beforehand. In the benchmark itself, instead of wasting time on scheduling a computation, initializing and deinitializing structured concurrency, etc., it would only send a signal "you can actually start now" and then wait for the computation to signal "okay, done".
This is all moot if the preparatory work in `@Setup(Level.Invocation)` objects is included in the time measurements. It's not viable to start as many threads as there are computations beforehand.
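(For the single-computation case, a sketch of the signal-based scheme described above, with illustrative names and a hard-coded message count: the worker thread and its `runBlocking` loop are started once per trial, so the benchmark method only exchanges the two signals and waits for the actual work.)

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import org.openjdk.jmh.annotations.*
import java.util.concurrent.SynchronousQueue

@State(Scope.Benchmark)
open class SignalledWorkerBenchmark {
    private val startSignal = SynchronousQueue<Int>()  // benchmark thread -> worker: "you can actually start now"
    private val doneSignal = SynchronousQueue<Unit>()  // worker -> benchmark thread: "okay, done"
    private lateinit var worker: Thread

    @Setup(Level.Trial)
    fun startWorker() {
        worker = Thread {
            // runBlocking is paid once here, outside of the measurement.
            runBlocking {
                while (true) {
                    val count = startSignal.take()
                    if (count < 0) break  // poison pill from tearDown
                    val channel = Channel<Int>(Channel.UNLIMITED)
                    repeat(count) { channel.send(it) }  // the actual work being timed
                    doneSignal.put(Unit)
                }
            }
        }.apply { start() }
    }

    @TearDown(Level.Trial)
    fun stopWorker() {
        startSignal.put(-1)
        worker.join()
    }

    @Benchmark
    fun sendManyItems() {
        startSignal.put(1000)  // hard-coded count, for illustration only
        doneSignal.take()
    }
}
```

Whether this removes enough overhead for nanosecond-scale operations would need to be validated, and, as noted above, it does not scale to benchmarks that launch many coroutines.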