Skip to content

Commit d6773ef

Browse files
author
meiliang86
authored
Fix deadlock in sticky decider cache eviction (#272)
1 parent a711af1 commit d6773ef

File tree

6 files changed

+209
-126
lines changed

6 files changed

+209
-126
lines changed

src/main/java/com/uber/cadence/internal/replay/DeciderCache.java

Lines changed: 55 additions & 78 deletions
Original file line number | Diff line number | Diff line change
@@ -18,34 +18,22 @@
1818
package com.uber.cadence.internal.replay;
1919

2020
import com.google.common.base.Preconditions;
21-
import com.google.common.base.Throwables;
2221
import com.google.common.cache.CacheBuilder;
2322
import com.google.common.cache.CacheLoader;
2423
import com.google.common.cache.LoadingCache;
25-
import com.google.common.util.concurrent.ExecutionError;
26-
import com.google.common.util.concurrent.UncheckedExecutionException;
2724
import com.uber.cadence.PollForDecisionTaskResponse;
28-
import com.uber.cadence.internal.common.ThrowableFunc1;
2925
import com.uber.cadence.internal.metrics.MetricsType;
3026
import com.uber.m3.tally.Scope;
31-
import java.util.Iterator;
32-
import java.util.Objects;
33-
import java.util.Random;
34-
import java.util.Set;
35-
import java.util.concurrent.TimeUnit;
36-
import java.util.concurrent.atomic.AtomicBoolean;
27+
import java.util.*;
28+
import java.util.concurrent.Callable;
3729
import java.util.concurrent.locks.Lock;
3830
import java.util.concurrent.locks.ReentrantLock;
39-
import org.slf4j.Logger;
40-
import org.slf4j.LoggerFactory;
4131

4232
public final class DeciderCache {
4333
private final Scope metricsScope;
4434
private LoadingCache<String, Decider> cache;
45-
private Lock evictionLock = new ReentrantLock();
46-
Random rand = new Random();
47-
48-
private static final Logger log = LoggerFactory.getLogger(DeciderCache.class);
35+
private Lock cacheLock = new ReentrantLock();
36+
private Set<String> inProcessing = new HashSet<>();
4937

5038
public DeciderCache(int maxCacheSize, Scope scope) {
5139
Preconditions.checkArgument(maxCacheSize > 0, "Max cache size must be greater than 0");
@@ -70,83 +58,79 @@ public Decider load(String key) {
7058
}
7159

7260
public Decider getOrCreate(
73-
PollForDecisionTaskResponse decisionTask,
74-
ThrowableFunc1<PollForDecisionTaskResponse, Decider, Exception> createReplayDecider)
75-
throws Exception {
61+
PollForDecisionTaskResponse decisionTask, Callable<Decider> deciderFunc) throws Exception {
7662
String runId = decisionTask.getWorkflowExecution().getRunId();
77-
metricsScope.gauge(MetricsType.STICKY_CACHE_SIZE).update(size());
7863
if (isFullHistory(decisionTask)) {
79-
invalidate(decisionTask);
80-
return cache.get(runId, () -> createReplayDecider.apply(decisionTask));
64+
invalidate(runId);
65+
return deciderFunc.call();
66+
}
67+
68+
Decider decider = getForProcessing(runId);
69+
if (decider != null) {
70+
return decider;
8171
}
82-
AtomicBoolean miss = new AtomicBoolean();
83-
Decider result = null;
72+
return deciderFunc.call();
73+
}
74+
75+
private Decider getForProcessing(String runId) throws Exception {
76+
cacheLock.lock();
8477
try {
85-
result =
86-
cache.get(
87-
runId,
88-
() -> {
89-
miss.set(true);
90-
return createReplayDecider.apply(decisionTask);
91-
});
92-
} catch (UncheckedExecutionException | ExecutionError e) {
93-
Throwables.throwIfUnchecked(e.getCause());
78+
Decider decider = cache.get(runId);
79+
inProcessing.add(runId);
80+
metricsScope.counter(MetricsType.STICKY_CACHE_HIT).inc(1);
81+
return decider;
82+
} catch (CacheLoader.InvalidCacheLoadException e) {
83+
// We don't have a default loader and don't want to have one. So it's ok to get null value.
84+
metricsScope.counter(MetricsType.STICKY_CACHE_MISS).inc(1);
85+
return null;
9486
} finally {
95-
if (miss.get()) {
96-
metricsScope.counter(MetricsType.STICKY_CACHE_MISS).inc(1);
97-
} else {
98-
metricsScope.counter(MetricsType.STICKY_CACHE_HIT).inc(1);
99-
}
87+
cacheLock.unlock();
10088
}
101-
return result;
10289
}
10390

104-
public void evictAny(String runId) throws InterruptedException {
105-
// Timeout is to guard against workflows trying to evict each other.
106-
if (!evictionLock.tryLock(rand.nextInt(4), TimeUnit.SECONDS)) {
107-
return;
91+
void markProcessingDone(PollForDecisionTaskResponse decisionTask) {
92+
String runId = decisionTask.getWorkflowExecution().getRunId();
93+
94+
cacheLock.lock();
95+
try {
96+
inProcessing.remove(runId);
97+
} finally {
98+
cacheLock.unlock();
10899
}
100+
}
101+
102+
public void addToCache(PollForDecisionTaskResponse decisionTask, Decider decider) {
103+
String runId = decisionTask.getWorkflowExecution().getRunId();
104+
cache.put(runId, decider);
105+
}
106+
107+
public boolean evictAnyNotInProcessing(String runId) {
108+
cacheLock.lock();
109109
try {
110110
metricsScope.gauge(MetricsType.STICKY_CACHE_SIZE).update(size());
111-
Set<String> set = cache.asMap().keySet();
112-
if (set.isEmpty()) {
113-
return;
114-
}
115-
Iterator<String> iter = cache.asMap().keySet().iterator();
116-
String key = "";
117-
while (iter.hasNext()) {
118-
key = iter.next();
119-
if (!key.equals(runId)) {
120-
break;
111+
for (String key : cache.asMap().keySet()) {
112+
if (!key.equals(runId) && !inProcessing.contains(key)) {
113+
cache.invalidate(key);
114+
metricsScope.gauge(MetricsType.STICKY_CACHE_SIZE).update(size());
115+
metricsScope.counter(MetricsType.STICKY_CACHE_THREAD_FORCED_EVICTION).inc(1);
116+
return true;
121117
}
122118
}
123119

124-
if (key.equals(runId)) {
125-
log.warn(String.format("%s attempted to self evict. Ignoring eviction", runId));
126-
return;
127-
}
128-
cache.invalidate(key);
129-
metricsScope.gauge(MetricsType.STICKY_CACHE_SIZE).update(size());
130-
metricsScope.counter(MetricsType.STICKY_CACHE_THREAD_FORCED_EVICTION).inc(1);
120+
return false;
131121
} finally {
132-
evictionLock.unlock();
122+
cacheLock.unlock();
133123
}
134124
}
135125

136-
public void invalidate(PollForDecisionTaskResponse decisionTask) throws InterruptedException {
137-
String runId = decisionTask.getWorkflowExecution().getRunId();
138-
invalidate(runId);
139-
}
140-
141-
private void invalidate(String runId) throws InterruptedException {
142-
if (!evictionLock.tryLock(rand.nextInt(4), TimeUnit.SECONDS)) {
143-
return;
144-
}
126+
void invalidate(String runId) {
127+
cacheLock.lock();
145128
try {
146129
cache.invalidate(runId);
130+
inProcessing.remove(runId);
147131
metricsScope.counter(MetricsType.STICKY_CACHE_TOTAL_FORCED_EVICTION).inc(1);
148132
} finally {
149-
evictionLock.unlock();
133+
cacheLock.unlock();
150134
}
151135
}
152136

@@ -163,11 +147,4 @@ private boolean isFullHistory(PollForDecisionTaskResponse decisionTask) {
163147
public void invalidateAll() {
164148
cache.invalidateAll();
165149
}
166-
167-
public static class EvictedException extends Exception {
168-
169-
public EvictedException(String runId) {
170-
super(String.format("cache was evicted for the decisionTask. RunId: %s", runId));
171-
}
172-
}
173150
}

src/main/java/com/uber/cadence/internal/replay/ReplayDecisionTaskHandler.java

Lines changed: 40 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -32,6 +32,7 @@
3232
import java.time.Duration;
3333
import java.util.List;
3434
import java.util.Objects;
35+
import java.util.concurrent.atomic.AtomicBoolean;
3536
import org.slf4j.Logger;
3637
import org.slf4j.LoggerFactory;
3738

@@ -115,12 +116,26 @@ private Result handleDecisionTaskImpl(PollForDecisionTaskResponse decisionTask)
115116

116117
private Result processDecision(PollForDecisionTaskResponse decisionTask) throws Throwable {
117118
Decider decider = null;
119+
AtomicBoolean createdNew = new AtomicBoolean();
118120
try {
119-
decider =
120-
stickyTaskListName == null
121-
? createDecider(decisionTask)
122-
: cache.getOrCreate(decisionTask, this::createDecider);
121+
if (stickyTaskListName == null) {
122+
decider = createDecider(decisionTask);
123+
} else {
124+
decider =
125+
cache.getOrCreate(
126+
decisionTask,
127+
() -> {
128+
createdNew.set(true);
129+
return createDecider(decisionTask);
130+
});
131+
}
132+
123133
List<Decision> decisions = decider.decide(decisionTask);
134+
135+
if (stickyTaskListName != null && createdNew.get()) {
136+
cache.addToCache(decisionTask, decider);
137+
}
138+
124139
if (log.isTraceEnabled()) {
125140
WorkflowExecution execution = decisionTask.getWorkflowExecution();
126141
log.trace(
@@ -148,12 +163,14 @@ private Result processDecision(PollForDecisionTaskResponse decisionTask) throws
148163
return createCompletedRequest(decisionTask, decisions);
149164
} catch (Throwable e) {
150165
if (stickyTaskListName != null) {
151-
cache.invalidate(decisionTask);
166+
cache.invalidate(decisionTask.getWorkflowExecution().getRunId());
152167
}
153168
throw e;
154169
} finally {
155170
if (stickyTaskListName == null && decider != null) {
156171
decider.close();
172+
} else {
173+
cache.markProcessingDone(decisionTask);
157174
}
158175
}
159176
}
@@ -162,12 +179,24 @@ private Result processQuery(PollForDecisionTaskResponse decisionTask) {
162179
RespondQueryTaskCompletedRequest queryCompletedRequest = new RespondQueryTaskCompletedRequest();
163180
queryCompletedRequest.setTaskToken(decisionTask.getTaskToken());
164181
Decider decider = null;
182+
AtomicBoolean createdNew = new AtomicBoolean();
165183
try {
166-
decider =
167-
stickyTaskListName == null
168-
? createDecider(decisionTask)
169-
: cache.getOrCreate(decisionTask, this::createDecider);
184+
if (stickyTaskListName == null) {
185+
decider = createDecider(decisionTask);
186+
} else {
187+
decider =
188+
cache.getOrCreate(
189+
decisionTask,
190+
() -> {
191+
createdNew.set(true);
192+
return createDecider(decisionTask);
193+
});
194+
}
195+
170196
byte[] queryResult = decider.query(decisionTask, decisionTask.getQuery());
197+
if (stickyTaskListName != null && createdNew.get()) {
198+
cache.addToCache(decisionTask, decider);
199+
}
171200
queryCompletedRequest.setQueryResult(queryResult);
172201
queryCompletedRequest.setCompletedType(QueryTaskCompletedType.COMPLETED);
173202
} catch (Throwable e) {
@@ -180,6 +209,8 @@ private Result processQuery(PollForDecisionTaskResponse decisionTask) {
180209
} finally {
181210
if (stickyTaskListName == null && decider != null) {
182211
decider.close();
212+
} else {
213+
cache.markProcessingDone(decisionTask);
183214
}
184215
}
185216
return new Result(null, null, queryCompletedRequest, null);

src/main/java/com/uber/cadence/internal/sync/WorkflowThreadImpl.java

Lines changed: 16 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -236,31 +236,27 @@ public void start() {
236236
.gauge(MetricsType.WORKFLOW_ACTIVE_THREAD_COUNT)
237237
.update(((ThreadPoolExecutor) threadPool).getActiveCount());
238238

239-
try {
240-
taskFuture = threadPool.submit(task);
241-
return;
242-
} catch (RejectedExecutionException e) {
243-
getDecisionContext()
244-
.getMetricsScope()
245-
.counter(MetricsType.STICKY_CACHE_THREAD_FORCED_EVICTION)
246-
.inc(1);
239+
while (true) {
247240
try {
241+
taskFuture = threadPool.submit(task);
242+
return;
243+
} catch (RejectedExecutionException e) {
244+
getDecisionContext()
245+
.getMetricsScope()
246+
.counter(MetricsType.STICKY_CACHE_THREAD_FORCED_EVICTION)
247+
.inc(1);
248248
if (cache != null) {
249-
cache.evictAny(this.runner.getDecisionContext().getContext().getRunId());
249+
boolean evicted =
250+
cache.evictAnyNotInProcessing(
251+
this.runner.getDecisionContext().getContext().getRunId());
252+
if (!evicted) {
253+
throw e;
254+
}
255+
} else {
256+
throw e;
250257
}
251-
} catch (InterruptedException e1) {
252-
log.warn("Unable to evict cache", e1);
253258
}
254259
}
255-
256-
try {
257-
taskFuture = threadPool.submit(task);
258-
} catch (RejectedExecutionException e) {
259-
throw new Error(
260-
"Not enough threads to execute workflows. "
261-
+ "If this message appears consistently either WorkerOptions.maxConcurrentWorkflowExecutionSize "
262-
+ "should be decreased or WorkerOptions.maxWorkflowThreads increased.");
263-
}
264260
}
265261

266262
public WorkflowThreadContext getContext() {

0 commit comments

Comments
 (0)