Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl;

import io.serverlessworkflow.api.types.Task;
import io.serverlessworkflow.impl.resources.ExternalResourceHandler;
import java.util.function.Function;

public interface FunctionReader extends Function<ExternalResourceHandler, Task> {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl;

public interface NamedObject {
String name();
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public class WorkflowApplication implements AutoCloseable {
private final SecretManager secretManager;
private final SchedulerListener schedulerListener;
private final Optional<URITemplateResolver> templateResolver;
private final Optional<FunctionReader> functionReader;

private WorkflowApplication(Builder builder) {
this.taskFactory = builder.taskFactory;
Expand All @@ -98,6 +99,7 @@ private WorkflowApplication(Builder builder) {
this.configManager = builder.configManager;
this.secretManager = builder.secretManager;
this.templateResolver = builder.templateResolver;
this.functionReader = builder.functionReader;
}

public TaskExecutorFactory taskFactory() {
Expand Down Expand Up @@ -178,6 +180,7 @@ public SchemaValidator getValidator(SchemaInline inline) {
private ConfigManager configManager;
private SchedulerListener schedulerListener;
private Optional<URITemplateResolver> templateResolver;
private Optional<FunctionReader> functionReader;

private Builder() {
ServiceLoader.load(NamedWorkflowAdditionalObject.class)
Expand Down Expand Up @@ -329,6 +332,7 @@ public WorkflowApplication build() {
.orElseGet(() -> new ConfigSecretManager(configManager));
}
templateResolver = ServiceLoader.load(URITemplateResolver.class).findFirst();
functionReader = ServiceLoader.load(FunctionReader.class).findFirst();
return new WorkflowApplication(this);
}
}
Expand Down Expand Up @@ -406,6 +410,10 @@ public Optional<URITemplateResolver> templateResolver() {
return templateResolver;
}

public Optional<FunctionReader> functionReader() {
return functionReader;
}

public <T> Optional<T> additionalObject(
String name, WorkflowContext workflowContext, TaskContext taskContext) {
return Optional.ofNullable(additionalObjects.get(name))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,8 @@ public static Optional<SchemaValidator> getSchemaValidator(
return Optional.of(validatorFactory.getValidator(schema.getSchemaInline()));
} else if (schema.getSchemaExternal() != null) {
return Optional.of(
resourceLoader.load(
schema.getSchemaExternal().getResource(),
validatorFactory::getValidator,
null,
null,
null));
resourceLoader.loadStatic(
schema.getSchemaExternal().getResource(), validatorFactory::getValidator));
}
}
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.serverlessworkflow.impl.additional;

public interface NamedWorkflowAdditionalObject<T> extends WorkflowAdditionalObject<T> {
String name();
}
import io.serverlessworkflow.impl.NamedObject;

public interface NamedWorkflowAdditionalObject<T>
extends WorkflowAdditionalObject<T>, NamedObject {}
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,20 @@
package io.serverlessworkflow.impl.executors;

import io.serverlessworkflow.api.types.CallFunction;
import io.serverlessworkflow.api.types.Catalog;
import io.serverlessworkflow.api.types.FunctionArguments;
import io.serverlessworkflow.api.types.Task;
import io.serverlessworkflow.api.types.TaskBase;
import io.serverlessworkflow.api.types.Use;
import io.serverlessworkflow.api.types.UseCatalogs;
import io.serverlessworkflow.api.types.UseFunctions;
import io.serverlessworkflow.impl.WorkflowDefinition;
import io.serverlessworkflow.impl.WorkflowMutablePosition;
import io.serverlessworkflow.impl.WorkflowUtils;
import io.serverlessworkflow.impl.WorkflowValueResolver;
import io.serverlessworkflow.impl.resources.ExternalResourceHandler;
import io.serverlessworkflow.impl.resources.ResourceLoader;
import java.net.URI;
import java.util.Map;
import java.util.Optional;

Expand All @@ -35,25 +42,71 @@ public class CallFunctionExecutor implements CallableTaskBuilder<CallFunction> {
public void init(
CallFunction task, WorkflowDefinition definition, WorkflowMutablePosition position) {
String functionName = task.getCall();
Use use = definition.workflow().getUse();
Task function = null;
if (use != null) {
UseFunctions functions = use.getFunctions();
if (functions != null) {
function = functions.getAdditionalProperties().get(functionName);
}
if (function == null) {
int indexOf = functionName.indexOf('@');
if (indexOf > 0) {
String catalogName = functionName.substring(indexOf + 1);
UseCatalogs catalogs = use.getCatalogs();
if (catalogs != null) {
Catalog catalog = catalogs.getAdditionalProperties().get(catalogName);
ResourceLoader loader = definition.resourceLoader();
function =
definition
.resourceLoader()
.loadURI(
WorkflowUtils.concatURI(
loader.uri(catalog.getEndpoint()),
pathFromFunctionName(functionName.substring(0, indexOf))),
h -> from(definition, h));
}
}
}
}
if (function == null) {
function =
definition.resourceLoader().loadURI(URI.create(functionName), h -> from(definition, h));
}
executorBuilder =
definition.application().taskFactory().getTaskExecutor(position, function, definition);
FunctionArguments functionArgs = task.getWith();
args =
functionArgs != null
? WorkflowUtils.buildMapResolver(
definition.application(), functionArgs.getAdditionalProperties())
: (w, t, m) -> Map.of();
Task function = null;
if (definition.workflow().getUse() != null
&& definition.workflow().getUse().getFunctions() != null
&& definition.workflow().getUse().getFunctions().getAdditionalProperties() != null) {
function =
definition.workflow().getUse().getFunctions().getAdditionalProperties().get(functionName);
}
if (function == null) {
// TODO search in catalog
throw new UnsupportedOperationException("Function Catalog not supported yet");
}

private String pathFromFunctionName(String functionName) {
int sep = functionName.indexOf(":");
if (sep < 0) {
throw new IllegalArgumentException(
"Invalid function name "
+ functionName
+ ". It has to be of the format <function name>:<function version>");
}
executorBuilder =
definition.application().taskFactory().getTaskExecutor(position, function, definition);
StringBuilder sb = new StringBuilder(functionName);
sb.setCharAt(sep, '/');
sb.insert(0, "main/functions/");
sb.append("/function.yaml");
return sb.toString();
}

private Task from(WorkflowDefinition definition, ExternalResourceHandler handler) {
return definition
.application()
.functionReader()
.map(v -> v.apply(handler))
.orElseThrow(
() ->
new IllegalStateException(
"No converter from external resource to function found. Make sure a dependency that includes an implementation of FunctionReader is included"));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,7 @@
*/
package io.serverlessworkflow.impl.resources;

import static io.serverlessworkflow.impl.WorkflowUtils.getURISupplier;

import io.serverlessworkflow.api.types.Endpoint;
import io.serverlessworkflow.api.types.EndpointUri;
import io.serverlessworkflow.api.types.ExternalResource;
import io.serverlessworkflow.impl.TaskContext;
import io.serverlessworkflow.impl.WorkflowApplication;
import io.serverlessworkflow.impl.WorkflowContext;
import io.serverlessworkflow.impl.WorkflowModel;
import io.serverlessworkflow.impl.WorkflowValueResolver;
import io.serverlessworkflow.impl.expressions.ExpressionDescriptor;
import java.net.MalformedURLException;
import java.net.URI;
import java.nio.file.Path;
Expand All @@ -35,18 +25,31 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

public class DefaultResourceLoader implements ResourceLoader {
public class DefaultResourceLoader extends ResourceLoader {

private final Optional<Path> workflowPath;
private final WorkflowApplication application;

private Map<ExternalResourceHandler, CachedResource> resourceCache = new ConcurrentHashMap<>();

protected DefaultResourceLoader(WorkflowApplication application, Path workflowPath) {
this.application = application;
super(application);
this.workflowPath = Optional.ofNullable(workflowPath);
}

@Override
public <T> T loadURI(URI uri, Function<ExternalResourceHandler, T> function) {
ExternalResourceHandler resourceHandler = buildFromURI(uri);
return (T)
resourceCache
.compute(
resourceHandler,
(k, v) ->
v == null || k.shouldReload(v.lastReload())
? new CachedResource(Instant.now(), function.apply(k))
: v)
.content();
}

private ExternalResourceHandler fileResource(String pathStr) {
Path path = Path.of(pathStr);
if (path.isAbsolute()) {
Expand Down Expand Up @@ -74,66 +77,6 @@ private ExternalResourceHandler buildFromURI(URI uri) {
}

@Override
public <T> T load(
ExternalResource resource,
Function<ExternalResourceHandler, T> function,
WorkflowContext workflowContext,
TaskContext taskContext,
WorkflowModel model) {
ExternalResourceHandler resourceHandler =
buildFromURI(
uriSupplier(resource.getEndpoint())
.apply(
workflowContext,
taskContext,
model == null ? application.modelFactory().fromNull() : model));
return (T)
resourceCache
.compute(
resourceHandler,
(k, v) ->
v == null || k.shouldReload(v.lastReload())
? new CachedResource(Instant.now(), function.apply(k))
: v)
.content();
}

@Override
public WorkflowValueResolver<URI> uriSupplier(Endpoint endpoint) {
if (endpoint.getEndpointConfiguration() != null) {
EndpointUri uri = endpoint.getEndpointConfiguration().getUri();
if (uri.getLiteralEndpointURI() != null) {
return getURISupplier(application, uri.getLiteralEndpointURI());
} else if (uri.getExpressionEndpointURI() != null) {
return new ExpressionURISupplier(
application
.expressionFactory()
.resolveString(ExpressionDescriptor.from(uri.getExpressionEndpointURI())));
}
} else if (endpoint.getRuntimeExpression() != null) {
return new ExpressionURISupplier(
application
.expressionFactory()
.resolveString(ExpressionDescriptor.from(endpoint.getRuntimeExpression())));
} else if (endpoint.getUriTemplate() != null) {
return getURISupplier(application, endpoint.getUriTemplate());
}
throw new IllegalArgumentException("Invalid endpoint definition " + endpoint);
}

private class ExpressionURISupplier implements WorkflowValueResolver<URI> {
private WorkflowValueResolver<String> expr;

public ExpressionURISupplier(WorkflowValueResolver<String> expr) {
this.expr = expr;
}

@Override
public URI apply(WorkflowContext workflow, TaskContext task, WorkflowModel node) {
return URI.create(expr.apply(workflow, task, node));
}
}

public void close() {
resourceCache.clear();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.resources;

import java.io.UncheckedIOException;
import java.net.MalformedURLException;
import java.net.URL;

public class GitHubHelper {

private GitHubHelper() {}

private static final String BLOB = "blob/";

public static URL handleURL(URL url) {
if (url.getHost().equals("github.com")) {
try {
String path = url.getPath();
if (path.startsWith(BLOB)) {
path = path.substring(BLOB.length());
}
return new URL(url.getProtocol(), "raw.githubusercontent.com", url.getPort(), path);
} catch (MalformedURLException e) {
throw new UncheckedIOException(e);
}
} else {
return url;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class HttpResource implements ExternalResourceHandler {
private URL url;

public HttpResource(URL url) {
this.url = url;
this.url = GitHubHelper.handleURL(url);
}

@Override
Expand Down
Loading