Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions pkg/vmcp/composer/composer.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ type WorkflowDefinition struct {
// - .params.* - Input parameters
// - .steps.*.output - Step outputs
// - .steps.*.status - Step status
// - .vars.* - Workflow variables
// - .workflow.* - Workflow metadata (id, duration, timestamps)
//
// The template must produce valid JSON. If omitted, defaults to returning
Expand Down Expand Up @@ -324,6 +325,21 @@ type TemplateExpander interface {

// EvaluateCondition evaluates a condition template to a boolean.
EvaluateCondition(ctx context.Context, condition string, workflowCtx *WorkflowContext) (bool, error)

// ExpandOutputFormat evaluates the output_format template with workflow context.
// Returns a map representing the expanded JSON output structure.
// The template has access to:
// - .params.* - Input parameters
// - .steps.*.output - Step outputs
// - .steps.*.status - Step status
// - .vars.* - Workflow variables
// - .workflow.* - Workflow metadata (id, duration, timestamps, step_count)
ExpandOutputFormat(
ctx context.Context,
outputFormatTemplate string,
workflowCtx *WorkflowContext,
startTime, endTime int64,
) (map[string]any, error)
}

// WorkflowContext contains the execution context for a workflow.
Expand Down
75 changes: 75 additions & 0 deletions pkg/vmcp/composer/template_expander.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,81 @@ func (e *defaultTemplateExpander) EvaluateCondition(
}
}

// ExpandOutputFormat evaluates the output_format template with workflow context.
// Returns a map representing the expanded JSON output structure.
func (e *defaultTemplateExpander) ExpandOutputFormat(
ctx context.Context,
outputFormatTemplate string,
workflowCtx *WorkflowContext,
startTime, endTime int64,
) (map[string]any, error) {
// Check context cancellation before expensive operations
if err := ctx.Err(); err != nil {
return nil, fmt.Errorf("context cancelled before output format expansion: %w", err)
}

// Build template context with params, steps, vars, and workflow metadata
tmplCtx := map[string]any{
"params": workflowCtx.Params,
"steps": e.buildStepsContext(workflowCtx),
"vars": workflowCtx.Variables,
"workflow": e.buildWorkflowMetadata(workflowCtx, startTime, endTime),
}

// Parse template
tmpl, err := template.New("output").Funcs(e.funcMap).Parse(outputFormatTemplate)
if err != nil {
return nil, fmt.Errorf("invalid output format template: %w", err)
}

// Execute template to string
var buf bytes.Buffer
buf.Grow(4096) // Pre-allocate reasonable buffer size

if err := tmpl.Execute(&buf, tmplCtx); err != nil {
return nil, fmt.Errorf("output format expansion failed: %w", err)
}

// Enforce output size limit
if buf.Len() > maxTemplateOutputSize {
return nil, fmt.Errorf("output format result too large: %d bytes (max %d)",
buf.Len(), maxTemplateOutputSize)
}

// Parse JSON result
var output map[string]any
if err := json.Unmarshal(buf.Bytes(), &output); err != nil {
return nil, fmt.Errorf("output format must produce valid JSON: %w", err)
}

return output, nil
}

// buildWorkflowMetadata creates workflow metadata for template expansion.
func (*defaultTemplateExpander) buildWorkflowMetadata(
workflowCtx *WorkflowContext,
startTime, endTime int64,
) map[string]any {
// Acquire read lock to safely access Steps map
workflowCtx.mu.RLock()
stepCount := len(workflowCtx.Steps)
workflowCtx.mu.RUnlock()

// Calculate duration in milliseconds
durationMs := int64(0)
if endTime > 0 && startTime > 0 && endTime >= startTime {
durationMs = endTime - startTime
}

return map[string]any{
"id": workflowCtx.WorkflowID,
"step_count": stepCount,
"duration_ms": durationMs,
"start_time": startTime,
"end_time": endTime,
}
}

// jsonEncode is a template function that encodes a value as JSON.
func jsonEncode(v any) (string, error) {
b, err := json.Marshal(v)
Expand Down
259 changes: 259 additions & 0 deletions pkg/vmcp/composer/template_expander_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package composer

import (
"context"
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -218,3 +219,261 @@ func TestWorkflowContext_Clone(t *testing.T) {
assert.NotEqual(t, original.Params, clone.Params)
assert.NotEqual(t, len(original.Steps), len(clone.Steps))
}

func TestTemplateExpander_ExpandOutputFormat(t *testing.T) {
t.Parallel()

expander := NewTemplateExpander()
startTime := time.Now().UnixMilli()
endTime := startTime + 1500 // 1.5 seconds later

tests := []struct {
name string
template string
params map[string]any
steps map[string]*StepResult
workflowID string
expectedFields map[string]any // Fields to check in output
wantErr bool
errContains string
}{
{
name: "simple step output aggregation",
template: `{"logs": {{json .steps.fetch_logs.output}}, "metrics": {{json .steps.fetch_metrics.output}}}`,
steps: map[string]*StepResult{
"fetch_logs": {Status: StepStatusCompleted, Output: map[string]any{"count": 100}},
"fetch_metrics": {Status: StepStatusCompleted, Output: map[string]any{"cpu": "50%"}},
},
expectedFields: map[string]any{
"logs": map[string]any{"count": float64(100)}, // JSON unmarshal converts to float64
"metrics": map[string]any{"cpu": "50%"},
},
},
{
name: "with workflow metadata",
template: `{
"data": {{json .steps.fetch_data.output}},
"metadata": {
"workflow_id": "{{.workflow.id}}",
"duration_ms": {{.workflow.duration_ms}},
"step_count": {{.workflow.step_count}}
}
}`,
workflowID: "test-wf-123",
steps: map[string]*StepResult{
"fetch_data": {Status: StepStatusCompleted, Output: map[string]any{"result": "ok"}},
},
expectedFields: map[string]any{
"data": map[string]any{"result": "ok"},
"metadata": map[string]any{
"workflow_id": "test-wf-123",
"duration_ms": float64(1500),
"step_count": float64(1),
},
},
},
{
name: "with parameters",
template: `{
"incident_id": "{{.params.incident_id}}",
"data": {{json .steps.fetch.output}}
}`,
params: map[string]any{"incident_id": "INC-12345"},
steps: map[string]*StepResult{
"fetch": {Status: StepStatusCompleted, Output: map[string]any{"status": "resolved"}},
},
expectedFields: map[string]any{
"incident_id": "INC-12345",
"data": map[string]any{"status": "resolved"},
},
},
{
name: "multi-step with status",
template: `{
"results": {
"step1": {
"status": "{{.steps.step1.status}}",
"output": {{json .steps.step1.output}}
},
"step2": {
"status": "{{.steps.step2.status}}",
"output": {{json .steps.step2.output}}
}
}
}`,
steps: map[string]*StepResult{
"step1": {Status: StepStatusCompleted, Output: map[string]any{"a": 1}},
"step2": {Status: StepStatusCompleted, Output: map[string]any{"b": 2}},
},
expectedFields: map[string]any{
"results": map[string]any{
"step1": map[string]any{
"status": "completed",
"output": map[string]any{"a": float64(1)},
},
"step2": map[string]any{
"status": "completed",
"output": map[string]any{"b": float64(2)},
},
},
},
},
{
name: "nested data structures",
template: `{
"pages": {
"overview": {{json .steps.fetch_overview.output}},
"details": {{json .steps.fetch_details.output}}
},
"summary": {
"total_pages": 2,
"completed_at": {{.workflow.end_time}}
}
}`,
steps: map[string]*StepResult{
"fetch_overview": {
Status: StepStatusCompleted,
Output: map[string]any{"title": "Overview", "content": "..."},
},
"fetch_details": {
Status: StepStatusCompleted,
Output: map[string]any{"title": "Details", "sections": []any{"intro", "body"}},
},
},
expectedFields: map[string]any{
"pages": map[string]any{
"overview": map[string]any{"title": "Overview", "content": "..."},
"details": map[string]any{"title": "Details", "sections": []any{"intro", "body"}},
},
"summary": map[string]any{
"total_pages": float64(2),
"completed_at": float64(endTime),
},
},
},
{
name: "invalid template syntax",
template: `{"data": {{.steps.fetch.output}`,
wantErr: true,
errContains: "invalid output format template",
},
{
name: "template references missing field",
template: `{"data": {{.nonexistent.field}}}`,
wantErr: true,
errContains: "output format must produce valid JSON",
},
{
name: "non-JSON output",
template: `This is not JSON`,
wantErr: true,
errContains: "output format must produce valid JSON",
},
{
name: "invalid JSON structure",
template: `{"unclosed": "bracket"`,
wantErr: true,
errContains: "output format must produce valid JSON",
},
{
name: "empty steps",
template: `{
"workflow_id": "{{.workflow.id}}",
"step_count": {{.workflow.step_count}}
}`,
workflowID: "empty-wf",
steps: map[string]*StepResult{},
expectedFields: map[string]any{
"workflow_id": "empty-wf",
"step_count": float64(0),
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()

ctx := newWorkflowContext(tt.params)
if tt.workflowID != "" {
ctx.WorkflowID = tt.workflowID
} else {
ctx.WorkflowID = "test-workflow"
}
if tt.steps != nil {
ctx.Steps = tt.steps
}

result, err := expander.ExpandOutputFormat(context.Background(), tt.template, ctx, startTime, endTime)

if tt.wantErr {
require.Error(t, err)
if tt.errContains != "" {
assert.Contains(t, err.Error(), tt.errContains)
}
return
}

require.NoError(t, err)
require.NotNil(t, result)

// Verify expected fields
for key, expectedValue := range tt.expectedFields {
actualValue, exists := result[key]
require.True(t, exists, "expected key %q not found in result", key)
assert.Equal(t, expectedValue, actualValue, "mismatch for key %q", key)
}
})
}
}

func TestTemplateExpander_ExpandOutputFormat_SizeLimits(t *testing.T) {
t.Parallel()

expander := NewTemplateExpander()
ctx := newWorkflowContext(nil)
ctx.WorkflowID = "test"

// Create a large step output that will exceed 10MB size limit
// Each entry is ~100 bytes, so we need >100k entries
largeString := make([]byte, 100)
for i := range largeString {
largeString[i] = 'x'
}

largeData := make(map[string]any)
for i := 0; i < 120000; i++ { // 120k entries * ~100 bytes > 10MB
largeData[fmt.Sprintf("key_%d", i)] = string(largeString)
}

ctx.Steps = map[string]*StepResult{
"large_step": {Status: StepStatusCompleted, Output: largeData},
}

template := `{"data": {{json .steps.large_step.output}}}`

_, err := expander.ExpandOutputFormat(context.Background(), template, ctx, 0, 0)
require.Error(t, err)
assert.Contains(t, err.Error(), "too large")
}

func TestTemplateExpander_ExpandOutputFormat_ContextCancellation(t *testing.T) {
t.Parallel()

expander := NewTemplateExpander()
ctx := newWorkflowContext(nil)
ctx.WorkflowID = "test"
ctx.Steps = map[string]*StepResult{
"step1": {Status: StepStatusCompleted, Output: map[string]any{"result": "ok"}},
}

// Create cancelled context
cancelledCtx, cancel := context.WithCancel(context.Background())
cancel()

template := `{"data": {{json .steps.step1.output}}}`

_, err := expander.ExpandOutputFormat(cancelledCtx, template, ctx, 0, 0)
require.Error(t, err)
assert.Contains(t, err.Error(), "context cancelled")
}
Loading
Loading