-
Notifications
You must be signed in to change notification settings - Fork 61
Implements IJobWorker.IAsyncDisposable #771
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
ChrisKujawa
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks 🚀
| [Obsolete("Use DisposeAsync instead.", false)] | ||
| public void Dispose() | ||
| { | ||
| _ = DisposeAsync(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
❓️ should we wait here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The current implementation does not wait. This is what is problematic. But waiting an async method from a sync one is really not a good practice
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After reading the docs:
- https://learn.microsoft.com/en-us/dotnet/standard/garbage-collection/implementing-dispose
- https://learn.microsoft.com/en-us/dotnet/api/system.iasyncdisposable?view=net-9.0
I thought we might want to write the methods a bit differently and make sure we close/dispose all resources more directly. Especially after the reformat we see that there several objects, we are not keeping the reference to dispose them correctly. Speaking of the _. https://github.com/camunda-community-hub/zeebe-client-csharp/blob/main/Client/Impl/Worker/JobWorker.cs#L107-L111
I guess we could simply call dispose on them, like on the Task thats starts the polling, etc.
But I guess we still need the cancelation of the source - and disposing of it
| _ = transformer.LinkTo(output); | ||
| var linkInputTransformer = input.LinkTo(transformer); | ||
| var linkTransformerOutput = transformer.LinkTo(output); | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Track polling Task in order to await for its completion. Note that I do not want to cancel the continuation. Finally, dispose TPL block objects.
| } | ||
| } | ||
| } | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Update the signature so that we can pass the cancellation token to SendAsync
|
|
||
| /// <inheritdoc /> | ||
| public async ValueTask DisposeAsync() | ||
| { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cancel the source token which will stop the pollingTask. Note that I've changed the log because it's not an error and very annoying to see it in reports.
|
|
||
| linkInputTransformer.Dispose(); | ||
| linkTransformerOutput.Dispose(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
❓ Should we maybe dispose the objects in our dispose? But I guess this will not work in the error case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR implements the IAsyncDisposable interface for IJobWorker, enabling proper asynchronous cleanup of job workers. The synchronous Dispose() method is marked as obsolete to encourage migration to the async disposal pattern. The implementation now properly awaits the polling task completion during disposal and cleans up dataflow links.
Key changes:
- Added
IAsyncDisposableto theIJobWorkerinterface - Implemented
DisposeAsync()that cancels polling and waits for task completion - Enhanced cleanup logic to dispose dataflow pipeline links in a continuation
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
Client/Api/Worker/IJobWorker.cs |
Extended interface to implement IAsyncDisposable alongside existing IDisposable |
Client/Impl/Worker/JobWorker.cs |
Implemented async disposal with proper polling task cancellation and awaiting; enabled nullable reference types; improved resource cleanup by disposing dataflow links; marked synchronous Dispose() as obsolete; removed discard operators for better code clarity |
Client.UnitTests/JobWorkerTest.cs |
Added test case for async disposal of running workers; removed unnecessary discard operators throughout existing tests; improved lambda parameter naming for unused parameters; updated log message to use structured logging format |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| source.Cancel(); | ||
| await pollingTask; | ||
| } | ||
|
|
||
| logger?.LogInformation("JobWorker is now disposed"); | ||
| source.Dispose(); | ||
| isRunning = false; |
Copilot
AI
Dec 7, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After source.Dispose() is called on line 81, the CancellationTokenSource is disposed but isRunning is still true momentarily. If another thread calls Open() during this window, it would attempt to access source.Token on line 104, which would throw an ObjectDisposedException.
The order of operations should be:
- Set
isRunning = falsefirst (to prevent new Open() calls) - Cancel the token
- Await the polling task
- Dispose the source
Alternatively, add a check in Open() to prevent re-opening after disposal.
| source.Cancel(); | |
| await pollingTask; | |
| } | |
| logger?.LogInformation("JobWorker is now disposed"); | |
| source.Dispose(); | |
| isRunning = false; | |
| isRunning = false; | |
| source.Cancel(); | |
| await pollingTask; | |
| } | |
| logger?.LogInformation("JobWorker is now disposed"); | |
| source.Dispose(); |
| // delay disposing, since poll and handler take some time to close | ||
| _ = Task.Delay(TimeSpan.FromMilliseconds(pollInterval.TotalMilliseconds * 2)) | ||
| .ContinueWith(t => | ||
| _ = DisposeAsync(); |
Copilot
AI
Dec 7, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The synchronous Dispose() method discards the ValueTask returned by DisposeAsync() without awaiting it. This means the method returns before disposal is complete, potentially leaving resources in an inconsistent state. According to C# best practices, synchronous disposal should block until async disposal completes. Consider implementing this as:
public void Dispose()
{
DisposeAsync().AsTask().GetAwaiter().GetResult();
}This ensures the synchronous Dispose method blocks until disposal is complete, maintaining proper resource cleanup semantics.
| _ = DisposeAsync(); | |
| DisposeAsync().AsTask().GetAwaiter().GetResult(); |
| public async ValueTask DisposeAsync() | ||
| { | ||
| if (pollingTask != null) | ||
| { | ||
| logger?.LogError("Dispose source"); | ||
| source.Dispose(); | ||
| }); | ||
| source.Cancel(); | ||
| await pollingTask; | ||
| } | ||
|
|
||
| logger?.LogInformation("JobWorker is now disposed"); | ||
| source.Dispose(); | ||
| isRunning = false; | ||
| } |
Copilot
AI
Dec 7, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DisposeAsync() is not thread-safe and can be called multiple times concurrently, leading to race conditions. The method should:
- Check if already disposed and return early
- Ensure thread-safe disposal (e.g., using Interlocked operations or a lock)
- Only dispose
sourceonce
Consider adding a disposed flag and using Interlocked.CompareExchange to ensure single execution:
private int disposed = 0;
public async ValueTask DisposeAsync()
{
if (Interlocked.CompareExchange(ref disposed, 1, 0) == 1)
{
return; // Already disposed
}
if (pollingTask != null)
{
source.Cancel();
await pollingTask;
}
logger?.LogInformation("JobWorker is now disposed");
source.Dispose();
isRunning = false;
}|
|
||
| logger?.LogInformation("JobWorker is now disposed"); | ||
| source.Dispose(); | ||
| isRunning = false; |
Copilot
AI
Dec 7, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The isRunning flag is set to false after source.Dispose(), but the IsOpen() and IsClosed() methods read this volatile field without synchronization with the disposal process. This creates a race condition where:
- A thread could check
IsOpen()and seetrue - DisposeAsync completes on another thread
- The first thread tries to use the worker assuming it's still open
The isRunning flag should be set to false before cancellation and disposal begin, not after. Consider moving line 82 to the beginning of the method (after the disposed check if one is added).
| { | ||
| if (task.IsFaulted) | ||
| { | ||
| logger?.LogWarning(task.Exception, "Problem on failing job occured."); |
Copilot
AI
Dec 7, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Typo in log message: "occured" should be "occurred".
| logger?.LogWarning(task.Exception, "Problem on failing job occured."); | |
| logger?.LogWarning(task.Exception, "Problem on failing job occurred."); |
| logger?.LogError(t.Exception, "Job polling failed"); | ||
| } | ||
| else if (t.IsCanceled) | ||
| { | ||
| logger?.LogInformation("Job polling Cancelled"); |
Copilot
AI
Dec 7, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] Inconsistent punctuation in log messages. Line 125 uses "Job polling failed" without a period, but line 129 uses "Job polling Cancelled" (also without a period). However, this is inconsistent with the pattern in line 271 which includes a period. Consider standardizing log message punctuation across the codebase - either always include or always omit terminal periods.
| logger?.LogError(t.Exception, "Job polling failed"); | |
| } | |
| else if (t.IsCanceled) | |
| { | |
| logger?.LogInformation("Job polling Cancelled"); | |
| logger?.LogError(t.Exception, "Job polling failed."); | |
| } | |
| else if (t.IsCanceled) | |
| { | |
| logger?.LogInformation("Job polling Cancelled."); |
| public async ValueTask DisposeAsync() | ||
| { | ||
| if (pollingTask != null) | ||
| { | ||
| logger?.LogError("Dispose source"); | ||
| source.Dispose(); | ||
| }); | ||
| source.Cancel(); | ||
| await pollingTask; | ||
| } | ||
|
|
||
| logger?.LogInformation("JobWorker is now disposed"); | ||
| source.Dispose(); | ||
| isRunning = false; | ||
| } |
Copilot
AI
Dec 7, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The new DisposeAsync() implementation lacks test coverage for important edge cases:
- Multiple concurrent calls to
DisposeAsync() - Calling
DisposeAsync()when the worker was never opened (pollingTask is null) - Calling both
Dispose()andDisposeAsync()concurrently
The existing test CanDisposeRunningWorker() only covers the happy path. Consider adding tests for these scenarios to ensure thread-safety and idempotency.
| var signal = new EventWaitHandle(false, EventResetMode.AutoReset); | ||
|
|
||
| // when | ||
| var jobWorker = ZeebeClient.NewWorker() | ||
| .JobType("foo") | ||
| .Handler((_, _) => | ||
| { | ||
| // trigger worker disposal with second job | ||
| signal.Set(); | ||
| }) | ||
| .AutoCompletion() | ||
| .MaxJobsActive(3) | ||
| .Name("jobWorker") | ||
| .Timeout(TimeSpan.FromSeconds(1)) | ||
| .PollInterval(TimeSpan.FromSeconds(3)) // long | ||
| .PollingTimeout(TimeSpan.FromMilliseconds(100)) | ||
| .Open(); | ||
| Assert.True(jobWorker.IsOpen()); | ||
|
|
||
| signal.WaitOne(); | ||
| // disposal must be quick even though the polling interval is long | ||
| await jobWorker.DisposeAsync().ConfigureAwait(false); | ||
| Assert.True(jobWorker.IsClosed()); |
Copilot
AI
Dec 7, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Disposable 'EventWaitHandle' is created but not disposed.
| var signal = new EventWaitHandle(false, EventResetMode.AutoReset); | |
| // when | |
| var jobWorker = ZeebeClient.NewWorker() | |
| .JobType("foo") | |
| .Handler((_, _) => | |
| { | |
| // trigger worker disposal with second job | |
| signal.Set(); | |
| }) | |
| .AutoCompletion() | |
| .MaxJobsActive(3) | |
| .Name("jobWorker") | |
| .Timeout(TimeSpan.FromSeconds(1)) | |
| .PollInterval(TimeSpan.FromSeconds(3)) // long | |
| .PollingTimeout(TimeSpan.FromMilliseconds(100)) | |
| .Open(); | |
| Assert.True(jobWorker.IsOpen()); | |
| signal.WaitOne(); | |
| // disposal must be quick even though the polling interval is long | |
| await jobWorker.DisposeAsync().ConfigureAwait(false); | |
| Assert.True(jobWorker.IsClosed()); | |
| using (var signal = new EventWaitHandle(false, EventResetMode.AutoReset)) | |
| { | |
| // when | |
| var jobWorker = ZeebeClient.NewWorker() | |
| .JobType("foo") | |
| .Handler((_, _) => | |
| { | |
| // trigger worker disposal with second job | |
| signal.Set(); | |
| }) | |
| .AutoCompletion() | |
| .MaxJobsActive(3) | |
| .Name("jobWorker") | |
| .Timeout(TimeSpan.FromSeconds(1)) | |
| .PollInterval(TimeSpan.FromSeconds(3)) // long | |
| .PollingTimeout(TimeSpan.FromMilliseconds(100)) | |
| .Open(); | |
| Assert.True(jobWorker.IsOpen()); | |
| signal.WaitOne(); | |
| // disposal must be quick even though the polling interval is long | |
| await jobWorker.DisposeAsync().ConfigureAwait(false); | |
| Assert.True(jobWorker.IsClosed()); | |
| } |
|
@nloding lets look at this next together |
#769
There is a slight behavior difference as the isRunning flag will only be set when the worker is effectively closed.