Skip to content

Conversation

@xlegalles
Copy link
Contributor

@xlegalles xlegalles commented Feb 10, 2025

#769
There is a slight behavior difference as the isRunning flag will only be set when the worker is effectively closed.

Copy link
Collaborator

@ChrisKujawa ChrisKujawa left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks 🚀

[Obsolete("Use DisposeAsync instead.", false)]
public void Dispose()
{
_ = DisposeAsync();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❓️ should we wait here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current implementation does not wait. This is what is problematic. But waiting an async method from a sync one is really not a good practice

Copy link
Collaborator

@ChrisKujawa ChrisKujawa Feb 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After reading the docs:

I thought we might want to write the methods a bit differently and make sure we close/dispose all resources more directly. Especially after the reformat we see that there several objects, we are not keeping the reference to dispose them correctly. Speaking of the _. https://github.com/camunda-community-hub/zeebe-client-csharp/blob/main/Client/Impl/Worker/JobWorker.cs#L107-L111

I guess we could simply call dispose on them, like on the Task thats starts the polling, etc.

But I guess we still need the cancelation of the source - and disposing of it

_ = transformer.LinkTo(output);
var linkInputTransformer = input.LinkTo(transformer);
var linkTransformerOutput = transformer.LinkTo(output);

Copy link
Contributor Author

@xlegalles xlegalles Feb 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Track polling Task in order to await for its completion. Note that I do not want to cancel the continuation. Finally, dispose TPL block objects.

}
}
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update the signature so that we can pass the cancellation token to SendAsync


/// <inheritdoc />
public async ValueTask DisposeAsync()
{
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cancel the source token which will stop the pollingTask. Note that I've changed the log because it's not an error and very annoying to see it in reports.

Comment on lines +131 to +133

linkInputTransformer.Dispose();
linkTransformerOutput.Dispose();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❓ Should we maybe dispose the objects in our dispose? But I guess this will not work in the error case.

Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR implements the IAsyncDisposable interface for IJobWorker, enabling proper asynchronous cleanup of job workers. The synchronous Dispose() method is marked as obsolete to encourage migration to the async disposal pattern. The implementation now properly awaits the polling task completion during disposal and cleans up dataflow links.

Key changes:

  • Added IAsyncDisposable to the IJobWorker interface
  • Implemented DisposeAsync() that cancels polling and waits for task completion
  • Enhanced cleanup logic to dispose dataflow pipeline links in a continuation

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 8 comments.

File Description
Client/Api/Worker/IJobWorker.cs Extended interface to implement IAsyncDisposable alongside existing IDisposable
Client/Impl/Worker/JobWorker.cs Implemented async disposal with proper polling task cancellation and awaiting; enabled nullable reference types; improved resource cleanup by disposing dataflow links; marked synchronous Dispose() as obsolete; removed discard operators for better code clarity
Client.UnitTests/JobWorkerTest.cs Added test case for async disposal of running workers; removed unnecessary discard operators throughout existing tests; improved lambda parameter naming for unused parameters; updated log message to use structured logging format

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +76 to 82
source.Cancel();
await pollingTask;
}

logger?.LogInformation("JobWorker is now disposed");
source.Dispose();
isRunning = false;
Copy link

Copilot AI Dec 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After source.Dispose() is called on line 81, the CancellationTokenSource is disposed but isRunning is still true momentarily. If another thread calls Open() during this window, it would attempt to access source.Token on line 104, which would throw an ObjectDisposedException.

The order of operations should be:

  1. Set isRunning = false first (to prevent new Open() calls)
  2. Cancel the token
  3. Await the polling task
  4. Dispose the source

Alternatively, add a check in Open() to prevent re-opening after disposal.

Suggested change
source.Cancel();
await pollingTask;
}
logger?.LogInformation("JobWorker is now disposed");
source.Dispose();
isRunning = false;
isRunning = false;
source.Cancel();
await pollingTask;
}
logger?.LogInformation("JobWorker is now disposed");
source.Dispose();

Copilot uses AI. Check for mistakes.
// delay disposing, since poll and handler take some time to close
_ = Task.Delay(TimeSpan.FromMilliseconds(pollInterval.TotalMilliseconds * 2))
.ContinueWith(t =>
_ = DisposeAsync();
Copy link

Copilot AI Dec 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The synchronous Dispose() method discards the ValueTask returned by DisposeAsync() without awaiting it. This means the method returns before disposal is complete, potentially leaving resources in an inconsistent state. According to C# best practices, synchronous disposal should block until async disposal completes. Consider implementing this as:

public void Dispose()
{
    DisposeAsync().AsTask().GetAwaiter().GetResult();
}

This ensures the synchronous Dispose method blocks until disposal is complete, maintaining proper resource cleanup semantics.

Suggested change
_ = DisposeAsync();
DisposeAsync().AsTask().GetAwaiter().GetResult();

Copilot uses AI. Check for mistakes.
Comment on lines +72 to 83
public async ValueTask DisposeAsync()
{
if (pollingTask != null)
{
logger?.LogError("Dispose source");
source.Dispose();
});
source.Cancel();
await pollingTask;
}

logger?.LogInformation("JobWorker is now disposed");
source.Dispose();
isRunning = false;
}
Copy link

Copilot AI Dec 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DisposeAsync() is not thread-safe and can be called multiple times concurrently, leading to race conditions. The method should:

  1. Check if already disposed and return early
  2. Ensure thread-safe disposal (e.g., using Interlocked operations or a lock)
  3. Only dispose source once

Consider adding a disposed flag and using Interlocked.CompareExchange to ensure single execution:

private int disposed = 0;

public async ValueTask DisposeAsync()
{
    if (Interlocked.CompareExchange(ref disposed, 1, 0) == 1)
    {
        return; // Already disposed
    }
    
    if (pollingTask != null)
    {
        source.Cancel();
        await pollingTask;
    }

    logger?.LogInformation("JobWorker is now disposed");
    source.Dispose();
    isRunning = false;
}

Copilot uses AI. Check for mistakes.

logger?.LogInformation("JobWorker is now disposed");
source.Dispose();
isRunning = false;
Copy link

Copilot AI Dec 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The isRunning flag is set to false after source.Dispose(), but the IsOpen() and IsClosed() methods read this volatile field without synchronization with the disposal process. This creates a race condition where:

  1. A thread could check IsOpen() and see true
  2. DisposeAsync completes on another thread
  3. The first thread tries to use the worker assuming it's still open

The isRunning flag should be set to false before cancellation and disposal begin, not after. Consider moving line 82 to the beginning of the method (after the disposed check if one is added).

Copilot uses AI. Check for mistakes.
{
if (task.IsFaulted)
{
logger?.LogWarning(task.Exception, "Problem on failing job occured.");
Copy link

Copilot AI Dec 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo in log message: "occured" should be "occurred".

Suggested change
logger?.LogWarning(task.Exception, "Problem on failing job occured.");
logger?.LogWarning(task.Exception, "Problem on failing job occurred.");

Copilot uses AI. Check for mistakes.
Comment on lines +125 to +129
logger?.LogError(t.Exception, "Job polling failed");
}
else if (t.IsCanceled)
{
logger?.LogInformation("Job polling Cancelled");
Copy link

Copilot AI Dec 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Inconsistent punctuation in log messages. Line 125 uses "Job polling failed" without a period, but line 129 uses "Job polling Cancelled" (also without a period). However, this is inconsistent with the pattern in line 271 which includes a period. Consider standardizing log message punctuation across the codebase - either always include or always omit terminal periods.

Suggested change
logger?.LogError(t.Exception, "Job polling failed");
}
else if (t.IsCanceled)
{
logger?.LogInformation("Job polling Cancelled");
logger?.LogError(t.Exception, "Job polling failed.");
}
else if (t.IsCanceled)
{
logger?.LogInformation("Job polling Cancelled.");

Copilot uses AI. Check for mistakes.
Comment on lines +72 to 83
public async ValueTask DisposeAsync()
{
if (pollingTask != null)
{
logger?.LogError("Dispose source");
source.Dispose();
});
source.Cancel();
await pollingTask;
}

logger?.LogInformation("JobWorker is now disposed");
source.Dispose();
isRunning = false;
}
Copy link

Copilot AI Dec 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new DisposeAsync() implementation lacks test coverage for important edge cases:

  1. Multiple concurrent calls to DisposeAsync()
  2. Calling DisposeAsync() when the worker was never opened (pollingTask is null)
  3. Calling both Dispose() and DisposeAsync() concurrently

The existing test CanDisposeRunningWorker() only covers the happy path. Consider adding tests for these scenarios to ensure thread-safety and idempotency.

Copilot uses AI. Check for mistakes.
Comment on lines +670 to +692
var signal = new EventWaitHandle(false, EventResetMode.AutoReset);

// when
var jobWorker = ZeebeClient.NewWorker()
.JobType("foo")
.Handler((_, _) =>
{
// trigger worker disposal with second job
signal.Set();
})
.AutoCompletion()
.MaxJobsActive(3)
.Name("jobWorker")
.Timeout(TimeSpan.FromSeconds(1))
.PollInterval(TimeSpan.FromSeconds(3)) // long
.PollingTimeout(TimeSpan.FromMilliseconds(100))
.Open();
Assert.True(jobWorker.IsOpen());

signal.WaitOne();
// disposal must be quick even though the polling interval is long
await jobWorker.DisposeAsync().ConfigureAwait(false);
Assert.True(jobWorker.IsClosed());
Copy link

Copilot AI Dec 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Disposable 'EventWaitHandle' is created but not disposed.

Suggested change
var signal = new EventWaitHandle(false, EventResetMode.AutoReset);
// when
var jobWorker = ZeebeClient.NewWorker()
.JobType("foo")
.Handler((_, _) =>
{
// trigger worker disposal with second job
signal.Set();
})
.AutoCompletion()
.MaxJobsActive(3)
.Name("jobWorker")
.Timeout(TimeSpan.FromSeconds(1))
.PollInterval(TimeSpan.FromSeconds(3)) // long
.PollingTimeout(TimeSpan.FromMilliseconds(100))
.Open();
Assert.True(jobWorker.IsOpen());
signal.WaitOne();
// disposal must be quick even though the polling interval is long
await jobWorker.DisposeAsync().ConfigureAwait(false);
Assert.True(jobWorker.IsClosed());
using (var signal = new EventWaitHandle(false, EventResetMode.AutoReset))
{
// when
var jobWorker = ZeebeClient.NewWorker()
.JobType("foo")
.Handler((_, _) =>
{
// trigger worker disposal with second job
signal.Set();
})
.AutoCompletion()
.MaxJobsActive(3)
.Name("jobWorker")
.Timeout(TimeSpan.FromSeconds(1))
.PollInterval(TimeSpan.FromSeconds(3)) // long
.PollingTimeout(TimeSpan.FromMilliseconds(100))
.Open();
Assert.True(jobWorker.IsOpen());
signal.WaitOne();
// disposal must be quick even though the polling interval is long
await jobWorker.DisposeAsync().ConfigureAwait(false);
Assert.True(jobWorker.IsClosed());
}

Copilot uses AI. Check for mistakes.
@ChrisKujawa
Copy link
Collaborator

@nloding lets look at this next together

@ChrisKujawa ChrisKujawa requested a review from nloding December 7, 2025 18:10
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants