Skip to content

Commit 3ab612d

Browse files
committed
[Fix #934] Implementing function catalog
Signed-off-by: fjtirado <ftirados@redhat.com>
1 parent 5e40aef commit 3ab612d

File tree

7 files changed

+155
-29
lines changed

7 files changed

+155
-29
lines changed

experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncDSL.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -327,7 +327,7 @@ public static <T, R> FuncCallStep<T, R> withInstanceId(
327327
}
328328

329329
/**
330-
* Named variant of {@link #withContext(JavaContextFunction, Class)}.
330+
* NamedObject variant of {@link #withContext(JavaContextFunction, Class)}.
331331
*
332332
* @param name task name
333333
* @param fn context-aware function
@@ -359,7 +359,7 @@ public static <T, R> FuncCallStep<T, R> withFilter(JavaFilterFunction<T, R> fn,
359359
}
360360

361361
/**
362-
* Named variant of {@link #withFilter(JavaFilterFunction, Class)}.
362+
* NamedObject variant of {@link #withFilter(JavaFilterFunction, Class)}.
363363
*
364364
* @param name task name
365365
* @param fn context-aware filter function
@@ -374,7 +374,7 @@ public static <T, R> FuncCallStep<T, R> withFilter(
374374
}
375375

376376
/**
377-
* Named variant of {@link #withInstanceId(InstanceIdBiFunction, Class)}.
377+
* NamedObject variant of {@link #withInstanceId(InstanceIdBiFunction, Class)}.
378378
*
379379
* @param name task name
380380
* @param fn instance-id-aware function
@@ -450,7 +450,7 @@ public static <T> ConsumeStep<T> consume(Consumer<T> consumer, Class<T> clazz) {
450450
}
451451

452452
/**
453-
* Named variant of {@link #consume(Consumer, Class)}.
453+
* NamedObject variant of {@link #consume(Consumer, Class)}.
454454
*
455455
* @param name task name
456456
* @param consumer side-effect function
@@ -480,7 +480,7 @@ public static <T, R> FuncCallStep<T, R> agent(UniqueIdBiFunction<T, R> fn, Class
480480
}
481481

482482
/**
483-
* Named agent-style sugar. See {@link #agent(UniqueIdBiFunction, Class)}.
483+
* NamedObject agent-style sugar. See {@link #agent(UniqueIdBiFunction, Class)}.
484484
*
485485
* <p>Signature expected: {@code (uniqueId, payload) -> result}
486486
*
@@ -511,7 +511,7 @@ public static <T, R> FuncCallStep<T, R> function(Function<T, R> fn) {
511511
}
512512

513513
/**
514-
* Named variant of {@link #function(Function)} with inferred input type.
514+
* NamedObject variant of {@link #function(Function)} with inferred input type.
515515
*
516516
* @param name task name
517517
* @param fn the function to execute
@@ -525,7 +525,7 @@ public static <T, R> FuncCallStep<T, R> function(String name, Function<T, R> fn)
525525
}
526526

527527
/**
528-
* Named variant of {@link #function(Function, Class)} with explicit input type.
528+
* NamedObject variant of {@link #function(Function, Class)} with explicit input type.
529529
*
530530
* @param name task name
531531
* @param fn the function to execute
@@ -565,7 +565,7 @@ public static EmitStep emit(Consumer<FuncEmitTaskBuilder> cfg) {
565565
}
566566

567567
/**
568-
* Named variant of {@link #emit(Consumer)}.
568+
* NamedObject variant of {@link #emit(Consumer)}.
569569
*
570570
* @param name task name
571571
* @param cfg emit builder configurer
@@ -588,7 +588,7 @@ public static <T> EmitStep emit(String type, Function<T, CloudEventData> fn) {
588588
}
589589

590590
/**
591-
* Named variant of {@link #emit(String, Function)}.
591+
* NamedObject variant of {@link #emit(String, Function)}.
592592
*
593593
* @param name task name
594594
* @param type CloudEvent type
@@ -664,7 +664,7 @@ public static ListenStep listen(FuncListenSpec spec) {
664664
}
665665

666666
/**
667-
* Named variant of {@link #listen(FuncListenSpec)}.
667+
* NamedObject variant of {@link #listen(FuncListenSpec)}.
668668
*
669669
* @param name task name
670670
* @param spec listen spec
@@ -709,7 +709,7 @@ public static FuncTaskConfigurer switchCase(SwitchCaseConfigurer... cases) {
709709
}
710710

711711
/**
712-
* Named variant of {@link #switchCase(SwitchCaseConfigurer...)}.
712+
* NamedObject variant of {@link #switchCase(SwitchCaseConfigurer...)}.
713713
*
714714
* @param taskName task name
715715
* @param cases case configurers
@@ -1078,7 +1078,7 @@ public static FuncCallOpenAPIStep openapi() {
10781078
}
10791079

10801080
/**
1081-
* Named variant of {@link #openapi()}.
1081+
* NamedObject variant of {@link #openapi()}.
10821082
*
10831083
* @param name task name to be used when the spec is attached via {@link
10841084
* #call(FuncCallOpenAPIStep)}
@@ -1109,7 +1109,7 @@ public static FuncCallHttpStep http() {
11091109
}
11101110

11111111
/**
1112-
* Named variant of {@link #http()}.
1112+
* NamedObject variant of {@link #http()}.
11131113
*
11141114
* @param name task name to be used when the spec is attached via {@link #call(FuncCallHttpStep)}
11151115
* @return a new named {@link FuncCallHttpStep}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl;
17+
18+
public interface NamedObject {
19+
String name();
20+
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import io.serverlessworkflow.api.types.Workflow;
2222
import io.serverlessworkflow.impl.additional.NamedWorkflowAdditionalObject;
2323
import io.serverlessworkflow.impl.additional.WorkflowAdditionalObject;
24+
import io.serverlessworkflow.impl.catalogs.CatalogNavigator;
25+
import io.serverlessworkflow.impl.catalogs.NamedCatalogNavigator;
2426
import io.serverlessworkflow.impl.config.ConfigManager;
2527
import io.serverlessworkflow.impl.config.ConfigSecretManager;
2628
import io.serverlessworkflow.impl.config.SecretManager;
@@ -76,6 +78,7 @@ public class WorkflowApplication implements AutoCloseable {
7678
private final SecretManager secretManager;
7779
private final SchedulerListener schedulerListener;
7880
private final Optional<URITemplateResolver> templateResolver;
81+
private final Map<String, CatalogNavigator> catalogRegistry;
7982

8083
private WorkflowApplication(Builder builder) {
8184
this.taskFactory = builder.taskFactory;
@@ -98,6 +101,7 @@ private WorkflowApplication(Builder builder) {
98101
this.configManager = builder.configManager;
99102
this.secretManager = builder.secretManager;
100103
this.templateResolver = builder.templateResolver;
104+
this.catalogRegistry = builder.catalogRegistry;
101105
}
102106

103107
public TaskExecutorFactory taskFactory() {
@@ -178,10 +182,13 @@ public SchemaValidator getValidator(SchemaInline inline) {
178182
private ConfigManager configManager;
179183
private SchedulerListener schedulerListener;
180184
private Optional<URITemplateResolver> templateResolver;
185+
private Map<String, CatalogNavigator> catalogRegistry = new HashMap<>();
181186

182187
private Builder() {
183188
ServiceLoader.load(NamedWorkflowAdditionalObject.class)
184189
.forEach(a -> additionalObjects.put(a.name(), a));
190+
ServiceLoader.load(NamedCatalogNavigator.class)
191+
.forEach(a -> catalogRegistry.put(a.name(), a));
185192
}
186193

187194
public Builder withListener(WorkflowExecutionListener listener) {
@@ -270,6 +277,11 @@ public Builder withModelFactory(WorkflowModelFactory modelFactory) {
270277
return this;
271278
}
272279

280+
public Builder withCatalogRegistry(String name, CatalogNavigator catalog) {
281+
catalogRegistry.put(name, catalog);
282+
return this;
283+
}
284+
273285
public WorkflowApplication build() {
274286
if (modelFactory == null) {
275287
modelFactory =
@@ -328,6 +340,7 @@ public WorkflowApplication build() {
328340
.findFirst()
329341
.orElseGet(() -> new ConfigSecretManager(configManager));
330342
}
343+
331344
templateResolver = ServiceLoader.load(URITemplateResolver.class).findFirst();
332345
return new WorkflowApplication(this);
333346
}
@@ -406,6 +419,10 @@ public Optional<URITemplateResolver> templateResolver() {
406419
return templateResolver;
407420
}
408421

422+
public Map<String, CatalogNavigator> catalogRegistry() {
423+
return catalogRegistry;
424+
}
425+
409426
public <T> Optional<T> additionalObject(
410427
String name, WorkflowContext workflowContext, TaskContext taskContext) {
411428
return Optional.ofNullable(additionalObjects.get(name))

impl/core/src/main/java/io/serverlessworkflow/impl/additional/NamedWorkflowAdditionalObject.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package io.serverlessworkflow.impl.additional;
1717

18-
public interface NamedWorkflowAdditionalObject<T> extends WorkflowAdditionalObject<T> {
19-
String name();
20-
}
18+
import io.serverlessworkflow.impl.NamedObject;
19+
20+
public interface NamedWorkflowAdditionalObject<T>
21+
extends WorkflowAdditionalObject<T>, NamedObject {}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.catalogs;
17+
18+
import io.serverlessworkflow.api.types.Catalog;
19+
import io.serverlessworkflow.api.types.Task;
20+
import io.serverlessworkflow.impl.ServicePriority;
21+
import java.util.Optional;
22+
23+
public interface CatalogNavigator extends ServicePriority {
24+
Optional<Task> resolveTask(Catalog catalog, String functionName);
25+
26+
Optional<Task> resolveTask(String functionEndpoint);
27+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.catalogs;
17+
18+
import io.serverlessworkflow.impl.NamedObject;
19+
20+
public interface NamedCatalogNavigator extends CatalogNavigator, NamedObject {}

impl/core/src/main/java/io/serverlessworkflow/impl/executors/CallFunctionExecutor.java

Lines changed: 54 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,44 +16,85 @@
1616
package io.serverlessworkflow.impl.executors;
1717

1818
import io.serverlessworkflow.api.types.CallFunction;
19+
import io.serverlessworkflow.api.types.Catalog;
1920
import io.serverlessworkflow.api.types.FunctionArguments;
2021
import io.serverlessworkflow.api.types.Task;
2122
import io.serverlessworkflow.api.types.TaskBase;
23+
import io.serverlessworkflow.api.types.Use;
24+
import io.serverlessworkflow.api.types.UseCatalogs;
25+
import io.serverlessworkflow.api.types.UseFunctions;
2226
import io.serverlessworkflow.impl.WorkflowDefinition;
2327
import io.serverlessworkflow.impl.WorkflowMutablePosition;
2428
import io.serverlessworkflow.impl.WorkflowUtils;
2529
import io.serverlessworkflow.impl.WorkflowValueResolver;
30+
import io.serverlessworkflow.impl.catalogs.CatalogNavigator;
2631
import java.util.Map;
2732
import java.util.Optional;
33+
import org.slf4j.Logger;
34+
import org.slf4j.LoggerFactory;
2835

2936
public class CallFunctionExecutor implements CallableTaskBuilder<CallFunction> {
3037

38+
private static final Logger logger = LoggerFactory.getLogger(CallFunctionExecutor.class);
3139
private TaskExecutorBuilder<? extends TaskBase> executorBuilder;
3240
private WorkflowValueResolver<Map<String, Object>> args;
3341

3442
@Override
3543
public void init(
3644
CallFunction task, WorkflowDefinition definition, WorkflowMutablePosition position) {
3745
String functionName = task.getCall();
38-
FunctionArguments functionArgs = task.getWith();
39-
args =
40-
functionArgs != null
41-
? WorkflowUtils.buildMapResolver(
42-
definition.application(), functionArgs.getAdditionalProperties())
43-
: (w, t, m) -> Map.of();
46+
47+
Use use = definition.workflow().getUse();
4448
Task function = null;
45-
if (definition.workflow().getUse() != null
46-
&& definition.workflow().getUse().getFunctions() != null
47-
&& definition.workflow().getUse().getFunctions().getAdditionalProperties() != null) {
48-
function =
49-
definition.workflow().getUse().getFunctions().getAdditionalProperties().get(functionName);
49+
if (use != null) {
50+
UseFunctions functions = use.getFunctions();
51+
if (functions != null) {
52+
function = functions.getAdditionalProperties().get(functionName);
53+
}
54+
if (function == null) {
55+
int indexOf = functionName.indexOf('@');
56+
if (indexOf > 0) {
57+
String catalogName = functionName.substring(indexOf + 1);
58+
UseCatalogs catalogs = use.getCatalogs();
59+
if (catalogs != null) {
60+
Catalog catalog = catalogs.getAdditionalProperties().get(catalogName);
61+
CatalogNavigator catalogNavigator =
62+
definition.application().catalogRegistry().get(catalogName);
63+
if (catalogNavigator == null) {
64+
throw new IllegalArgumentException(
65+
"There is not catalog registered for name " + catalogName);
66+
}
67+
function =
68+
catalogNavigator
69+
.resolveTask(catalog, functionName.substring(0, indexOf))
70+
.orElseThrow(
71+
() ->
72+
new IllegalArgumentException(
73+
"Cannot find function "
74+
+ functionName
75+
+ " in catalog "
76+
+ catalogName));
77+
}
78+
}
79+
}
5080
}
5181
if (function == null) {
52-
// TODO search in catalog
53-
throw new UnsupportedOperationException("Function Catalog not supported yet");
82+
// asume name is direct pointer to catalog file
83+
function =
84+
definition.application().catalogRegistry().values().stream()
85+
.flatMap(c -> c.resolveTask(functionName).stream())
86+
.findFirst()
87+
.orElseThrow(
88+
() -> new IllegalArgumentException("Cannot find function name " + functionName));
5489
}
5590
executorBuilder =
5691
definition.application().taskFactory().getTaskExecutor(position, function, definition);
92+
FunctionArguments functionArgs = task.getWith();
93+
args =
94+
functionArgs != null
95+
? WorkflowUtils.buildMapResolver(
96+
definition.application(), functionArgs.getAdditionalProperties())
97+
: (w, t, m) -> Map.of();
5798
}
5899

59100
@Override

0 commit comments

Comments
 (0)