4545import com .uber .cadence .common .RetryOptions ;
4646import com .uber .cadence .converter .JsonDataConverter ;
4747import com .uber .cadence .internal .sync .DeterministicRunnerTest ;
48+ import com .uber .cadence .internal .worker .PollerOptions ;
4849import com .uber .cadence .serviceclient .WorkflowServiceTChannel ;
4950import com .uber .cadence .testing .TestEnvironmentOptions ;
5051import com .uber .cadence .testing .TestWorkflowEnvironment ;
7879import org .junit .After ;
7980import org .junit .Assert ;
8081import org .junit .Before ;
82+ import org .junit .Ignore ;
8183import org .junit .Rule ;
8284import org .junit .Test ;
8385import org .junit .rules .TestName ;
@@ -184,21 +186,21 @@ private static WorkflowOptions.Builder newWorkflowOptionsBuilder(String taskList
184186
185187 private static ActivityOptions newActivityOptions1 (String taskList ) {
186188 if (DEBUGGER_TIMEOUTS ) {
187- return new ActivityOptions .Builder ()
188- .setTaskList (taskList )
189- .setScheduleToCloseTimeout (Duration .ofSeconds (5 ))
190- .setHeartbeatTimeout (Duration .ofSeconds (5 ))
191- .setScheduleToStartTimeout (Duration .ofSeconds (5 ))
192- .setStartToCloseTimeout (Duration .ofSeconds (10 ))
193- .build ();
194- } else {
195189 return new ActivityOptions .Builder ()
196190 .setTaskList (taskList )
197191 .setScheduleToCloseTimeout (Duration .ofSeconds (1000 ))
198192 .setHeartbeatTimeout (Duration .ofSeconds (1000 ))
199193 .setScheduleToStartTimeout (Duration .ofSeconds (1000 ))
200194 .setStartToCloseTimeout (Duration .ofSeconds (10000 ))
201195 .build ();
196+ } else {
197+ return new ActivityOptions .Builder ()
198+ .setTaskList (taskList )
199+ .setScheduleToCloseTimeout (Duration .ofSeconds (5 ))
200+ .setHeartbeatTimeout (Duration .ofSeconds (5 ))
201+ .setScheduleToStartTimeout (Duration .ofSeconds (5 ))
202+ .setStartToCloseTimeout (Duration .ofSeconds (10 ))
203+ .build ();
202204 }
203205 }
204206
@@ -223,7 +225,11 @@ public void setUp() {
223225 .build ();
224226 workerFactory = new Worker .Factory (service , DOMAIN , factoryOptions );
225227 WorkerOptions workerOptions =
226- new WorkerOptions .Builder ().setInterceptorFactory (tracer ).build ();
228+ new WorkerOptions .Builder ()
229+ .setActivityPollerOptions (new PollerOptions .Builder ().setPollThreadCount (5 ).build ())
230+ .setMaxConcurrentActivityExecutionSize (1000 )
231+ .setInterceptorFactory (tracer )
232+ .build ();
227233 worker = workerFactory .newWorker (taskList , workerOptions );
228234 workflowClient = WorkflowClient .newInstance (service , DOMAIN );
229235 WorkflowClientOptions clientOptions =
@@ -3645,6 +3651,56 @@ public void testGenericParametersWorkflow() throws ExecutionException, Interrupt
36453651 assertEquals (expectedResult , result );
36463652 }
36473653
3654+ public interface TestLargeWorkflow {
3655+
3656+ @ WorkflowMethod
3657+ String execute (int activityCount , String taskList );
3658+ }
3659+
3660+ public interface TestLargeWorkflowActivity {
3661+ String activity ();
3662+ }
3663+
3664+ public static class TestLargeWorkflowActivityImpl implements TestLargeWorkflowActivity {
3665+
3666+ @ Override
3667+ public String activity () {
3668+ return "done" ;
3669+ }
3670+ }
3671+
3672+ public static class TestLargeHistory implements TestLargeWorkflow {
3673+
3674+ @ Override
3675+ public String execute (int activityCount , String taskList ) {
3676+ TestLargeWorkflowActivity activities =
3677+ Workflow .newActivityStub (TestLargeWorkflowActivity .class , newActivityOptions1 (taskList ));
3678+ List <Promise <String >> results = new ArrayList <>();
3679+ for (int i = 0 ; i < activityCount ; i ++) {
3680+ Promise <String > result = Async .function (activities ::activity );
3681+ results .add (result );
3682+ }
3683+ Promise .allOf (results ).get ();
3684+ return "done" ;
3685+ }
3686+ }
3687+
3688+ @ Test
3689+ @ Ignore // Requires DEBUG_TIMEOUTS=true
3690+ public void testLargeHistory () {
3691+ final int activityCount = 1000 ;
3692+ worker .registerActivitiesImplementations (new TestLargeWorkflowActivityImpl ());
3693+ startWorkerFor (TestLargeHistory .class );
3694+ TestLargeWorkflow workflowStub =
3695+ workflowClient .newWorkflowStub (
3696+ TestLargeWorkflow .class , newWorkflowOptionsBuilder (taskList ).build ());
3697+ long start = System .currentTimeMillis ();
3698+ String result = workflowStub .execute (activityCount , taskList );
3699+ long duration = System .currentTimeMillis () - start ;
3700+ log .info (testName .toString () + " duration is " + duration );
3701+ assertEquals ("done" , result );
3702+ }
3703+
36483704 private static class FilteredTrace {
36493705
36503706 private final List <String > impl = Collections .synchronizedList (new ArrayList <>());
0 commit comments