Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

public class WorkflowMutableInstance implements WorkflowInstance {

Expand All @@ -44,6 +45,8 @@ public class WorkflowMutableInstance implements WorkflowInstance {
protected final WorkflowContext workflowContext;
protected Instant startedAt;

protected final Map<String, Object> additionalObjects = new ConcurrentHashMap<String, Object>();

protected AtomicReference<CompletableFuture<WorkflowModel>> futureRef = new AtomicReference<>();
protected Instant completedAt;

Expand Down Expand Up @@ -279,4 +282,8 @@ public boolean cancel() {
}

public void restoreContext(WorkflowContext workflow, TaskContext context) {}

public <T> T additionalObject(String key, Supplier<T> supplier) {
return (T) additionalObjects.computeIfAbsent(key, k -> supplier.get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

import java.time.Instant;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;

public abstract class AbstractConfigManager implements ConfigManager {
Expand Down Expand Up @@ -56,5 +59,19 @@ protected <T> T convert(String value, Class<T> propClass) {
return propClass.cast(result);
}

@Override
public <T> Collection<T> multiConfig(String propName, Class<T> propClass) {
String multiValue = get(propName);
if (multiValue != null) {
Collection<T> result = new ArrayList<>();
for (String value : multiValue.split(",")) {
result.add(convert(value, propClass));
}
return result;
} else {
return Collections.emptyList();
}
}

protected abstract <T> T convertComplex(String value, Class<T> propClass);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@
package io.serverlessworkflow.impl.config;

import io.serverlessworkflow.impl.ServicePriority;
import java.util.Collection;
import java.util.Optional;

public interface ConfigManager extends ServicePriority {

<T> Optional<T> config(String propName, Class<T> propClass);

<T> Collection<T> multiConfig(String propName, Class<T> propClass);

Iterable<String> names();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.scripts;

import io.serverlessworkflow.api.types.RunTaskConfiguration;
import io.serverlessworkflow.impl.TaskContext;
import io.serverlessworkflow.impl.WorkflowContext;
import io.serverlessworkflow.impl.WorkflowError;
import io.serverlessworkflow.impl.WorkflowException;
import io.serverlessworkflow.impl.WorkflowModel;
import io.serverlessworkflow.impl.WorkflowModelFactory;
import io.serverlessworkflow.impl.executors.ProcessResult;
import java.io.ByteArrayOutputStream;

public abstract class AbstractScriptRunner implements ScriptRunner {

@Override
public WorkflowModel runScript(
ScriptContext scriptContext,
WorkflowContext workflowContext,
TaskContext taskContext,
WorkflowModel input) {
ByteArrayOutputStream stderr = new ByteArrayOutputStream();
ByteArrayOutputStream stdout = new ByteArrayOutputStream();
try {
runScript(scriptContext, stdout, stderr, workflowContext, taskContext);
return scriptContext
.returnType()
.map(
type ->
modelFromOutput(
type,
workflowContext.definition().application().modelFactory(),
stdout,
stderr))
.orElse(input);
} catch (Exception ex) {
throw new WorkflowException(WorkflowError.runtime(taskContext, ex).build());
}
}

protected abstract void runScript(
ScriptContext scriptContext,
ByteArrayOutputStream stdout,
ByteArrayOutputStream stderr,
WorkflowContext workflowContext,
TaskContext taskContext);

protected WorkflowModel modelFromOutput(
RunTaskConfiguration.ProcessReturnType returnType,
WorkflowModelFactory modelFactory,
ByteArrayOutputStream stdout,
ByteArrayOutputStream stderr) {
return switch (returnType) {
case ALL -> modelFactory.fromAny(new ProcessResult(0, toString(stdout), toString(stderr)));
case NONE -> modelFactory.fromNull();
case CODE -> modelFactory.from(0);
case STDOUT -> modelFactory.from(toString(stdout));
case STDERR -> modelFactory.from(toString(stderr));
};
}

private String toString(ByteArrayOutputStream stream) {
return stream.toString().trim();
}
}
12 changes: 12 additions & 0 deletions impl/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
<version.com.cronutils>9.2.1</version.com.cronutils>
<version.docker.java>3.7.0</version.docker.java>
<version.org.graalvm.polyglot>25.0.1</version.org.graalvm.polyglot>
<version.black.ninia>4.2.0</version.black.ninia>
</properties>
<dependencyManagement>
<dependencies>
Expand Down Expand Up @@ -142,6 +143,11 @@
<artifactId>serverlessworkflow-impl-script-js</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.serverlessworkflow</groupId>
<artifactId>serverlessworkflow-impl-script-python</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.cronutils</groupId>
<artifactId>cron-utils</artifactId>
Expand All @@ -168,6 +174,11 @@
<artifactId>polyglot</artifactId>
<version>${version.org.graalvm.polyglot}</version>
</dependency>
<dependency>
<groupId>black.ninia</groupId>
<artifactId>jep</artifactId>
<version>${version.black.ninia}</version>
</dependency>
</dependencies>
</dependencyManagement>
<modules>
Expand All @@ -187,5 +198,6 @@
<module>container</module>
<module>test</module>
<module>script-js</module>
<module>python</module>
</modules>
</project>
20 changes: 20 additions & 0 deletions impl/python/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.serverlessworkflow</groupId>
<artifactId>serverlessworkflow-impl</artifactId>
<version>8.0.0-SNAPSHOT</version>
</parent>
<artifactId>serverlessworkflow-impl-script-python</artifactId>
<name>Serverless Workflow :: Impl :: Script Python</name>
<dependencies>
<dependency>
<groupId>io.serverlessworkflow</groupId>
<artifactId>serverlessworkflow-impl-core</artifactId>
</dependency>
<dependency>
<groupId>black.ninia</groupId>
<artifactId>jep</artifactId>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.executors.script.python;

import io.serverlessworkflow.impl.TaskContext;
import io.serverlessworkflow.impl.WorkflowContext;
import io.serverlessworkflow.impl.config.ConfigManager;
import io.serverlessworkflow.impl.scripts.AbstractScriptRunner;
import io.serverlessworkflow.impl.scripts.ScriptContext;
import io.serverlessworkflow.impl.scripts.ScriptLanguageId;
import java.io.ByteArrayOutputStream;
import java.util.Collection;
import jep.Interpreter;
import jep.SharedInterpreter;

public class PythonScriptTaskRunner extends AbstractScriptRunner {

@Override
public ScriptLanguageId identifier() {
return ScriptLanguageId.PYTHON;
}

private static final String PYTHON_SYS_PATH = "sys.path.append('%s')\n";
private static final String SEARCH_PATH_PROPERTY = "io.serverlessworkflow.impl.";

@Override
protected void runScript(
ScriptContext scriptContext,
ByteArrayOutputStream stdout,
ByteArrayOutputStream stderr,
WorkflowContext workflowContext,
TaskContext taskContext) {
Interpreter py =
workflowContext
.instance()
.additionalObject(
"pyInterpreter",
() -> interpreter(workflowContext.definition().application().configManager()));
scriptContext.args().forEach(py::set);
py.exec(scriptContext.code());
}

protected Interpreter interpreter(ConfigManager configManager) {
Interpreter py = new SharedInterpreter();
Collection<String> searchPaths = configManager.multiConfig(SEARCH_PATH_PROPERTY, String.class);
if (!searchPaths.isEmpty()) {
StringBuilder sb = new StringBuilder("import sys\n");
searchPaths.forEach(path -> sb.append(String.format(PYTHON_SYS_PATH, path)));
py.exec(sb.toString());
}
return py;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
io.serverlessworkflow.impl.executors.script.python.PythonScriptTaskRunner
Loading
Loading