5151import java .util .concurrent .CancellationException ;
5252import java .util .concurrent .TimeUnit ;
5353import java .util .concurrent .atomic .AtomicReference ;
54+ import java .util .concurrent .locks .Lock ;
55+ import java .util .concurrent .locks .ReentrantLock ;
5456import java .util .function .BiFunction ;
5557import java .util .function .Consumer ;
5658import org .apache .thrift .TException ;
5961 * Implements decider that relies on replay of a workflow code. An instance of this class is created
6062 * per decision.
6163 */
62- class ReplayDecider implements Decider , Consumer < HistoryEvent > {
64+ class ReplayDecider implements Decider {
6365
6466 private static final int MAXIMUM_PAGE_SIZE = 10000 ;
6567
@@ -75,6 +77,8 @@ class ReplayDecider implements Decider, Consumer<HistoryEvent> {
7577 private final Scope metricsScope ;
7678 private final long wfStartTimeNanos ;
7779 private final WorkflowExecutionStartedEventAttributes startedEvent ;
80+ private final Lock lock = new ReentrantLock ();
81+ private final Consumer <HistoryEvent > localActivityCompletionSink ;
7882
7983 ReplayDecider (
8084 IWorkflowService service ,
@@ -100,6 +104,19 @@ class ReplayDecider implements Decider, Consumer<HistoryEvent> {
100104 context =
101105 new DecisionContextImpl (
102106 decisionsHelper , domain , decisionTask , startedEvent , options , laTaskPoller , this );
107+ localActivityCompletionSink =
108+ historyEvent -> {
109+ lock .lock ();
110+ try {
111+ processEvent (historyEvent );
112+ } finally {
113+ lock .unlock ();
114+ }
115+ };
116+ }
117+
118+ Lock getLock () {
119+ return lock ;
103120 }
104121
105122 private void handleWorkflowExecutionStarted (HistoryEvent event ) {
@@ -356,8 +373,13 @@ private void handleWorkflowExecutionSignaled(HistoryEvent event) {
356373
357374 @ Override
358375 public DecisionResult decide (PollForDecisionTaskResponse decisionTask ) throws Throwable {
359- boolean forceCreateNewDecisionTask = decideImpl (decisionTask , null );
360- return new DecisionResult (decisionsHelper .getDecisions (), forceCreateNewDecisionTask );
376+ lock .lock ();
377+ try {
378+ boolean forceCreateNewDecisionTask = decideImpl (decisionTask , null );
379+ return new DecisionResult (decisionsHelper .getDecisions (), forceCreateNewDecisionTask );
380+ } finally {
381+ lock .unlock ();
382+ }
361383 }
362384
363385 // Returns boolean to indicate whether we need to force create new decision task for local
@@ -536,19 +558,28 @@ int getDecisionTimeoutSeconds() {
536558
537559 @ Override
538560 public void close () {
539- workflow .close ();
561+ lock .lock ();
562+ try {
563+ workflow .close ();
564+ } finally {
565+ lock .unlock ();
566+ }
540567 }
541568
542569 @ Override
543570 public byte [] query (PollForDecisionTaskResponse response , WorkflowQuery query ) throws Throwable {
544- AtomicReference <byte []> result = new AtomicReference <>();
545- decideImpl (response , () -> result .set (workflow .query (query )));
546- return result .get ();
571+ lock .lock ();
572+ try {
573+ AtomicReference <byte []> result = new AtomicReference <>();
574+ decideImpl (response , () -> result .set (workflow .query (query )));
575+ return result .get ();
576+ } finally {
577+ lock .unlock ();
578+ }
547579 }
548580
549- @ Override
550- public void accept (HistoryEvent event ) {
551- processEvent (event );
581+ public Consumer <HistoryEvent > getLocalActivityCompletionSink () {
582+ return localActivityCompletionSink ;
552583 }
553584
554585 private class DecisionTaskWithHistoryIteratorImpl implements DecisionTaskWithHistoryIterator {
@@ -580,7 +611,12 @@ private final Duration retryServiceOperationExpirationInterval() {
580611
581612 @ Override
582613 public PollForDecisionTaskResponse getDecisionTask () {
583- return task ;
614+ lock .lock ();
615+ try {
616+ return task ;
617+ } finally {
618+ lock .unlock ();
619+ }
584620 }
585621
586622 @ Override
0 commit comments