diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java index 6aadf211..cf0e72ce 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java @@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Supplier; public class WorkflowMutableInstance implements WorkflowInstance { @@ -44,6 +45,8 @@ public class WorkflowMutableInstance implements WorkflowInstance { protected final WorkflowContext workflowContext; protected Instant startedAt; + protected final Map additionalObjects = new ConcurrentHashMap(); + protected AtomicReference> futureRef = new AtomicReference<>(); protected Instant completedAt; @@ -279,4 +282,8 @@ public boolean cancel() { } public void restoreContext(WorkflowContext workflow, TaskContext context) {} + + public T additionalObject(String key, Supplier supplier) { + return (T) additionalObjects.computeIfAbsent(key, k -> supplier.get()); + } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/config/AbstractConfigManager.java b/impl/core/src/main/java/io/serverlessworkflow/impl/config/AbstractConfigManager.java index f12c341f..ddb7b5dd 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/config/AbstractConfigManager.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/config/AbstractConfigManager.java @@ -17,6 +17,9 @@ import java.time.Instant; import java.time.OffsetDateTime; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.Optional; public abstract class AbstractConfigManager implements ConfigManager { @@ -56,5 +59,19 @@ protected T convert(String value, Class propClass) { return propClass.cast(result); } + @Override + public Collection multiConfig(String propName, Class propClass) { + String multiValue = get(propName); + if (multiValue != null) { + Collection result = new ArrayList<>(); + for (String value : multiValue.split(",")) { + result.add(convert(value, propClass)); + } + return result; + } else { + return Collections.emptyList(); + } + } + protected abstract T convertComplex(String value, Class propClass); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/config/ConfigManager.java b/impl/core/src/main/java/io/serverlessworkflow/impl/config/ConfigManager.java index 08f2fb0e..d8daa2ae 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/config/ConfigManager.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/config/ConfigManager.java @@ -16,11 +16,14 @@ package io.serverlessworkflow.impl.config; import io.serverlessworkflow.impl.ServicePriority; +import java.util.Collection; import java.util.Optional; public interface ConfigManager extends ServicePriority { Optional config(String propName, Class propClass); + Collection multiConfig(String propName, Class propClass); + Iterable names(); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/scripts/AbstractScriptRunner.java b/impl/core/src/main/java/io/serverlessworkflow/impl/scripts/AbstractScriptRunner.java new file mode 100644 index 00000000..37921aad --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/scripts/AbstractScriptRunner.java @@ -0,0 +1,79 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.scripts; + +import io.serverlessworkflow.api.types.RunTaskConfiguration; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowError; +import io.serverlessworkflow.impl.WorkflowException; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowModelFactory; +import io.serverlessworkflow.impl.executors.ProcessResult; +import java.io.ByteArrayOutputStream; + +public abstract class AbstractScriptRunner implements ScriptRunner { + + @Override + public WorkflowModel runScript( + ScriptContext scriptContext, + WorkflowContext workflowContext, + TaskContext taskContext, + WorkflowModel input) { + ByteArrayOutputStream stderr = new ByteArrayOutputStream(); + ByteArrayOutputStream stdout = new ByteArrayOutputStream(); + try { + runScript(scriptContext, stdout, stderr, workflowContext, taskContext); + return scriptContext + .returnType() + .map( + type -> + modelFromOutput( + type, + workflowContext.definition().application().modelFactory(), + stdout, + stderr)) + .orElse(input); + } catch (Exception ex) { + throw new WorkflowException(WorkflowError.runtime(taskContext, ex).build()); + } + } + + protected abstract void runScript( + ScriptContext scriptContext, + ByteArrayOutputStream stdout, + ByteArrayOutputStream stderr, + WorkflowContext workflowContext, + TaskContext taskContext); + + protected WorkflowModel modelFromOutput( + RunTaskConfiguration.ProcessReturnType returnType, + WorkflowModelFactory modelFactory, + ByteArrayOutputStream stdout, + ByteArrayOutputStream stderr) { + return switch (returnType) { + case ALL -> modelFactory.fromAny(new ProcessResult(0, toString(stdout), toString(stderr))); + case NONE -> modelFactory.fromNull(); + case CODE -> modelFactory.from(0); + case STDOUT -> modelFactory.from(toString(stdout)); + case STDERR -> modelFactory.from(toString(stderr)); + }; + } + + private String toString(ByteArrayOutputStream stream) { + return stream.toString().trim(); + } +} diff --git a/impl/pom.xml b/impl/pom.xml index b052dd52..ef67f386 100644 --- a/impl/pom.xml +++ b/impl/pom.xml @@ -17,6 +17,7 @@ 9.2.1 3.7.0 25.0.1 + 4.2.0 @@ -142,6 +143,11 @@ serverlessworkflow-impl-script-js ${project.version} + + io.serverlessworkflow + serverlessworkflow-impl-script-python + ${project.version} + com.cronutils cron-utils @@ -168,6 +174,11 @@ polyglot ${version.org.graalvm.polyglot} + + black.ninia + jep + ${version.black.ninia} + @@ -187,5 +198,6 @@ container test script-js + python diff --git a/impl/python/pom.xml b/impl/python/pom.xml new file mode 100644 index 00000000..cb232423 --- /dev/null +++ b/impl/python/pom.xml @@ -0,0 +1,20 @@ + + 4.0.0 + + io.serverlessworkflow + serverlessworkflow-impl + 8.0.0-SNAPSHOT + + serverlessworkflow-impl-script-python + Serverless Workflow :: Impl :: Script Python + + + io.serverlessworkflow + serverlessworkflow-impl-core + + + black.ninia + jep + + + \ No newline at end of file diff --git a/impl/python/src/main/java/io/serverlessworkflow/impl/executors/script/python/PythonScriptTaskRunner.java b/impl/python/src/main/java/io/serverlessworkflow/impl/executors/script/python/PythonScriptTaskRunner.java new file mode 100644 index 00000000..da6c25d6 --- /dev/null +++ b/impl/python/src/main/java/io/serverlessworkflow/impl/executors/script/python/PythonScriptTaskRunner.java @@ -0,0 +1,66 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.script.python; + +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.config.ConfigManager; +import io.serverlessworkflow.impl.scripts.AbstractScriptRunner; +import io.serverlessworkflow.impl.scripts.ScriptContext; +import io.serverlessworkflow.impl.scripts.ScriptLanguageId; +import java.io.ByteArrayOutputStream; +import java.util.Collection; +import jep.Interpreter; +import jep.SharedInterpreter; + +public class PythonScriptTaskRunner extends AbstractScriptRunner { + + @Override + public ScriptLanguageId identifier() { + return ScriptLanguageId.PYTHON; + } + + private static final String PYTHON_SYS_PATH = "sys.path.append('%s')\n"; + private static final String SEARCH_PATH_PROPERTY = "io.serverlessworkflow.impl."; + + @Override + protected void runScript( + ScriptContext scriptContext, + ByteArrayOutputStream stdout, + ByteArrayOutputStream stderr, + WorkflowContext workflowContext, + TaskContext taskContext) { + Interpreter py = + workflowContext + .instance() + .additionalObject( + "pyInterpreter", + () -> interpreter(workflowContext.definition().application().configManager())); + scriptContext.args().forEach(py::set); + py.exec(scriptContext.code()); + } + + protected Interpreter interpreter(ConfigManager configManager) { + Interpreter py = new SharedInterpreter(); + Collection searchPaths = configManager.multiConfig(SEARCH_PATH_PROPERTY, String.class); + if (!searchPaths.isEmpty()) { + StringBuilder sb = new StringBuilder("import sys\n"); + searchPaths.forEach(path -> sb.append(String.format(PYTHON_SYS_PATH, path))); + py.exec(sb.toString()); + } + return py; + } +} diff --git a/impl/python/src/main/resources/META-INF/services/io.serverlessworkflow.impl.scripts.ScriptRunner b/impl/python/src/main/resources/META-INF/services/io.serverlessworkflow.impl.scripts.ScriptRunner new file mode 100644 index 00000000..347fa206 --- /dev/null +++ b/impl/python/src/main/resources/META-INF/services/io.serverlessworkflow.impl.scripts.ScriptRunner @@ -0,0 +1 @@ +io.serverlessworkflow.impl.executors.script.python.PythonScriptTaskRunner \ No newline at end of file diff --git a/impl/script-js/src/main/java/io/serverlessworkflow/impl/executors/script/js/JavaScriptScriptTaskRunner.java b/impl/script-js/src/main/java/io/serverlessworkflow/impl/executors/script/js/JavaScriptScriptTaskRunner.java index 5b986cd4..632c494e 100644 --- a/impl/script-js/src/main/java/io/serverlessworkflow/impl/executors/script/js/JavaScriptScriptTaskRunner.java +++ b/impl/script-js/src/main/java/io/serverlessworkflow/impl/executors/script/js/JavaScriptScriptTaskRunner.java @@ -15,23 +15,15 @@ */ package io.serverlessworkflow.impl.executors.script.js; -import io.serverlessworkflow.api.types.RunTaskConfiguration; import io.serverlessworkflow.impl.TaskContext; -import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowContext; -import io.serverlessworkflow.impl.WorkflowError; -import io.serverlessworkflow.impl.WorkflowException; -import io.serverlessworkflow.impl.WorkflowModel; -import io.serverlessworkflow.impl.WorkflowModelFactory; -import io.serverlessworkflow.impl.executors.ProcessResult; +import io.serverlessworkflow.impl.scripts.AbstractScriptRunner; import io.serverlessworkflow.impl.scripts.ScriptContext; import io.serverlessworkflow.impl.scripts.ScriptLanguageId; import io.serverlessworkflow.impl.scripts.ScriptRunner; import java.io.ByteArrayOutputStream; import java.util.Map; -import java.util.function.Supplier; import org.graalvm.polyglot.Context; -import org.graalvm.polyglot.PolyglotException; import org.graalvm.polyglot.Source; import org.graalvm.polyglot.Value; @@ -39,88 +31,13 @@ * JavaScript implementation of the {@link ScriptRunner} interface that executes JavaScript scripts * using GraalVM Polyglot API. */ -public class JavaScriptScriptTaskRunner implements ScriptRunner { +public class JavaScriptScriptTaskRunner extends AbstractScriptRunner { @Override public ScriptLanguageId identifier() { return ScriptLanguageId.JS; } - @Override - public WorkflowModel runScript( - ScriptContext script, - WorkflowContext workflowContext, - TaskContext taskContext, - WorkflowModel input) { - WorkflowApplication application = workflowContext.definition().application(); - ByteArrayOutputStream stderr = new ByteArrayOutputStream(); - ByteArrayOutputStream stdout = new ByteArrayOutputStream(); - try (Context ctx = - Context.newBuilder() - .err(stderr) - .out(stdout) - .useSystemExit(true) - .allowCreateProcess(false) - .option("engine.WarnInterpreterOnly", "false") - .build()) { - - script - .args() - .forEach( - (key, val) -> { - ctx.getBindings(identifier().getLang()).putMember(key, val); - }); - configureProcessEnv(ctx, script.envs()); - ctx.eval(Source.create(identifier().getLang(), script.code())); - return script - .returnType() - .map( - type -> - modelFromOutput( - type, application.modelFactory(), stdout, () -> stderr.toString())) - .orElse(input); - } catch (PolyglotException e) { - if (e.getExitStatus() != 0 || e.isSyntaxError()) { - throw new WorkflowException(WorkflowError.runtime(taskContext, e).build()); - } else { - return script - .returnType() - .map( - type -> - modelFromOutput( - type, application.modelFactory(), stdout, () -> buildStderr(e, stderr))) - .orElse(input); - } - } - } - - private WorkflowModel modelFromOutput( - RunTaskConfiguration.ProcessReturnType returnType, - WorkflowModelFactory modelFactory, - ByteArrayOutputStream stdout, - Supplier stderr) { - return switch (returnType) { - case ALL -> - modelFactory.fromAny(new ProcessResult(0, stdout.toString().trim(), stderr.get().trim())); - case NONE -> modelFactory.fromNull(); - case CODE -> modelFactory.from(0); - case STDOUT -> modelFactory.from(stdout.toString().trim()); - case STDERR -> modelFactory.from(stderr.get().trim()); - }; - } - - /* - * Gets the stderr message from the PolyglotException or the stderr stream. - * - * @param e the {@link PolyglotException} thrown during script execution - * @param stderr the stderr stream - * @return the stderr message - */ - private String buildStderr(PolyglotException e, ByteArrayOutputStream stderr) { - String err = stderr.toString(); - return err.isBlank() ? e.getMessage() : err.trim(); - } - /* * Configures the process.env object in the JavaScript context with the provided environment * variables. @@ -138,4 +55,25 @@ private void configureProcessEnv(Context context, Map envs) { } bindings.putMember("process", process); } + + @Override + protected void runScript( + ScriptContext scriptContext, + ByteArrayOutputStream stdout, + ByteArrayOutputStream stderr, + WorkflowContext workflowContext, + TaskContext taskContext) { + try (Context ctx = + Context.newBuilder() + .err(stderr) + .out(stdout) + .useSystemExit(true) + .allowCreateProcess(false) + .option("engine.WarnInterpreterOnly", "false") + .build()) { + scriptContext.args().forEach(ctx.getBindings(identifier().getLang())::putMember); + configureProcessEnv(ctx, scriptContext.envs()); + ctx.eval(Source.create(identifier().getLang(), scriptContext.code())); + } + } } diff --git a/impl/test/pom.xml b/impl/test/pom.xml index 444ae5c1..fd47aae9 100644 --- a/impl/test/pom.xml +++ b/impl/test/pom.xml @@ -37,6 +37,10 @@ io.serverlessworkflow serverlessworkflow-impl-script-js + + io.serverlessworkflow + serverlessworkflow-impl-script-python + org.glassfish.jersey.media jersey-media-json-jackson diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/CustomFunctionTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/CustomFunctionTest.java new file mode 100644 index 00000000..387e83d6 --- /dev/null +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/CustomFunctionTest.java @@ -0,0 +1,69 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.test; + +import static io.serverlessworkflow.api.WorkflowReader.readWorkflowFromClasspath; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowException; +import java.io.IOException; +import java.util.Map; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +public class CustomFunctionTest { + + private static WorkflowApplication app; + + @BeforeAll + static void init() { + app = WorkflowApplication.builder().build(); + } + + @AfterAll + static void cleanup() { + app.close(); + } + + @Test + void testCustomFunction() { + assertThatThrownBy( + () -> + app.workflowDefinition( + readWorkflowFromClasspath( + "workflows-samples/call-custom-function-inline.yaml")) + .instance(Map.of()) + .start() + .join()) + .hasCauseInstanceOf(WorkflowException.class) + .extracting(w -> ((WorkflowException) w.getCause()).getWorkflowError().status()) + .isEqualTo(404); + } + + @ParameterizedTest + @ValueSource( + strings = { + "workflows-samples/call-custom-function-cataloged.yaml", + "workflows-samples/call-custom-function-cataloged-global.yaml" + }) + void testCustomCatalogFunction(String fileName) throws IOException { + app.workflowDefinition(readWorkflowFromClasspath(fileName)).instance(Map.of()).start().join(); + } +} diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/RetryTimeoutTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/RetryTimeoutTest.java index c60e5b73..2a217fc0 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/RetryTimeoutTest.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/RetryTimeoutTest.java @@ -121,36 +121,4 @@ void testTimeout() throws IOException { .orElseThrow(); assertThat(result.get("message")).isEqualTo("Viva er Beti Balompie"); } - - @Test - void testCustomFunction() { - assertThatThrownBy( - () -> - app.workflowDefinition( - readWorkflowFromClasspath( - "workflows-samples/call-custom-function-inline.yaml")) - .instance(Map.of()) - .start() - .join()) - .hasCauseInstanceOf(WorkflowException.class) - .extracting(w -> ((WorkflowException) w.getCause()).getWorkflowError().status()) - .isEqualTo(404); - } - - @ParameterizedTest - @ValueSource( - strings = { - "workflows-samples/call-custom-function-cataloged.yaml", - "workflows-samples/call-custom-function-cataloged-global.yaml" - }) - void testCustomCatalogFunction(String fileName) throws IOException { - assertThatThrownBy( - () -> - app.workflowDefinition(readWorkflowFromClasspath(fileName)) - .instance(Map.of()) - .start() - .join()) - .isInstanceOf(IllegalStateException.class) - .hasMessageContaining("No script runner implementation found for language PYTHON"); - } } diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/RunScriptJavaScriptTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/RunScriptJavaScriptTest.java index 24832ebf..7ec221c6 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/RunScriptJavaScriptTest.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/RunScriptJavaScriptTest.java @@ -15,11 +15,13 @@ */ package io.serverlessworkflow.impl.test; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + import io.serverlessworkflow.api.WorkflowReader; import io.serverlessworkflow.api.types.Workflow; import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowException; import io.serverlessworkflow.impl.WorkflowModel; -import io.serverlessworkflow.impl.executors.ProcessResult; import java.io.IOException; import java.util.Map; import okhttp3.mockwebserver.MockResponse; @@ -126,13 +128,9 @@ void testFunctionThrowingError() throws IOException { WorkflowReader.readWorkflowFromClasspath( "workflows-samples/run-script/function-with-throw.yaml"); try (WorkflowApplication appl = WorkflowApplication.builder().build()) { - WorkflowModel model = appl.workflowDefinition(workflow).instance(Map.of()).start().join(); - - SoftAssertions.assertSoftly( - softly -> { - softly.assertThat(model.asText()).isPresent(); - softly.assertThat(model.asText().get()).isEqualTo("Error: This is a test error"); - }); + assertThatThrownBy(() -> appl.workflowDefinition(workflow).instance(Map.of()).start().join()) + .hasCauseInstanceOf(WorkflowException.class) + .hasMessageContaining("test error"); } } @@ -142,15 +140,9 @@ void testFunctionThrowingErrorAndReturnAll() throws IOException { WorkflowReader.readWorkflowFromClasspath( "workflows-samples/run-script/function-with-throw-all.yaml"); try (WorkflowApplication appl = WorkflowApplication.builder().build()) { - WorkflowModel model = appl.workflowDefinition(workflow).instance(Map.of()).start().join(); - - SoftAssertions.assertSoftly( - softly -> { - ProcessResult r = model.as(ProcessResult.class).orElseThrow(); - softly.assertThat(r.stderr()).isEqualTo("Error: This is a test error"); - softly.assertThat(r.stdout()).isEqualTo("logged before the 'throw' statement"); - softly.assertThat(r.code()).isEqualTo(0); - }); + assertThatThrownBy(() -> appl.workflowDefinition(workflow).instance(Map.of()).start().join()) + .hasCauseInstanceOf(WorkflowException.class) + .hasMessageContaining("test error"); } }