diff --git a/pkg/vmcp/composer/composer.go b/pkg/vmcp/composer/composer.go index febdfafe2..60c5c7299 100644 --- a/pkg/vmcp/composer/composer.go +++ b/pkg/vmcp/composer/composer.go @@ -64,6 +64,7 @@ type WorkflowDefinition struct { // - .params.* - Input parameters // - .steps.*.output - Step outputs // - .steps.*.status - Step status + // - .vars.* - Workflow variables // - .workflow.* - Workflow metadata (id, duration, timestamps) // // The template must produce valid JSON. If omitted, defaults to returning @@ -324,6 +325,21 @@ type TemplateExpander interface { // EvaluateCondition evaluates a condition template to a boolean. EvaluateCondition(ctx context.Context, condition string, workflowCtx *WorkflowContext) (bool, error) + + // ExpandOutputFormat evaluates the output_format template with workflow context. + // Returns a map representing the expanded JSON output structure. + // The template has access to: + // - .params.* - Input parameters + // - .steps.*.output - Step outputs + // - .steps.*.status - Step status + // - .vars.* - Workflow variables + // - .workflow.* - Workflow metadata (id, duration, timestamps, step_count) + ExpandOutputFormat( + ctx context.Context, + outputFormatTemplate string, + workflowCtx *WorkflowContext, + startTime, endTime int64, + ) (map[string]any, error) } // WorkflowContext contains the execution context for a workflow. diff --git a/pkg/vmcp/composer/template_expander.go b/pkg/vmcp/composer/template_expander.go index f55f2df39..74a7dc8ef 100644 --- a/pkg/vmcp/composer/template_expander.go +++ b/pkg/vmcp/composer/template_expander.go @@ -215,6 +215,81 @@ func (e *defaultTemplateExpander) EvaluateCondition( } } +// ExpandOutputFormat evaluates the output_format template with workflow context. +// Returns a map representing the expanded JSON output structure. +func (e *defaultTemplateExpander) ExpandOutputFormat( + ctx context.Context, + outputFormatTemplate string, + workflowCtx *WorkflowContext, + startTime, endTime int64, +) (map[string]any, error) { + // Check context cancellation before expensive operations + if err := ctx.Err(); err != nil { + return nil, fmt.Errorf("context cancelled before output format expansion: %w", err) + } + + // Build template context with params, steps, vars, and workflow metadata + tmplCtx := map[string]any{ + "params": workflowCtx.Params, + "steps": e.buildStepsContext(workflowCtx), + "vars": workflowCtx.Variables, + "workflow": e.buildWorkflowMetadata(workflowCtx, startTime, endTime), + } + + // Parse template + tmpl, err := template.New("output").Funcs(e.funcMap).Parse(outputFormatTemplate) + if err != nil { + return nil, fmt.Errorf("invalid output format template: %w", err) + } + + // Execute template to string + var buf bytes.Buffer + buf.Grow(4096) // Pre-allocate reasonable buffer size + + if err := tmpl.Execute(&buf, tmplCtx); err != nil { + return nil, fmt.Errorf("output format expansion failed: %w", err) + } + + // Enforce output size limit + if buf.Len() > maxTemplateOutputSize { + return nil, fmt.Errorf("output format result too large: %d bytes (max %d)", + buf.Len(), maxTemplateOutputSize) + } + + // Parse JSON result + var output map[string]any + if err := json.Unmarshal(buf.Bytes(), &output); err != nil { + return nil, fmt.Errorf("output format must produce valid JSON: %w", err) + } + + return output, nil +} + +// buildWorkflowMetadata creates workflow metadata for template expansion. +func (*defaultTemplateExpander) buildWorkflowMetadata( + workflowCtx *WorkflowContext, + startTime, endTime int64, +) map[string]any { + // Acquire read lock to safely access Steps map + workflowCtx.mu.RLock() + stepCount := len(workflowCtx.Steps) + workflowCtx.mu.RUnlock() + + // Calculate duration in milliseconds + durationMs := int64(0) + if endTime > 0 && startTime > 0 && endTime >= startTime { + durationMs = endTime - startTime + } + + return map[string]any{ + "id": workflowCtx.WorkflowID, + "step_count": stepCount, + "duration_ms": durationMs, + "start_time": startTime, + "end_time": endTime, + } +} + // jsonEncode is a template function that encodes a value as JSON. func jsonEncode(v any) (string, error) { b, err := json.Marshal(v) diff --git a/pkg/vmcp/composer/template_expander_test.go b/pkg/vmcp/composer/template_expander_test.go index ebbd055fb..03cbeaea2 100644 --- a/pkg/vmcp/composer/template_expander_test.go +++ b/pkg/vmcp/composer/template_expander_test.go @@ -2,6 +2,7 @@ package composer import ( "context" + "fmt" "testing" "time" @@ -218,3 +219,261 @@ func TestWorkflowContext_Clone(t *testing.T) { assert.NotEqual(t, original.Params, clone.Params) assert.NotEqual(t, len(original.Steps), len(clone.Steps)) } + +func TestTemplateExpander_ExpandOutputFormat(t *testing.T) { + t.Parallel() + + expander := NewTemplateExpander() + startTime := time.Now().UnixMilli() + endTime := startTime + 1500 // 1.5 seconds later + + tests := []struct { + name string + template string + params map[string]any + steps map[string]*StepResult + workflowID string + expectedFields map[string]any // Fields to check in output + wantErr bool + errContains string + }{ + { + name: "simple step output aggregation", + template: `{"logs": {{json .steps.fetch_logs.output}}, "metrics": {{json .steps.fetch_metrics.output}}}`, + steps: map[string]*StepResult{ + "fetch_logs": {Status: StepStatusCompleted, Output: map[string]any{"count": 100}}, + "fetch_metrics": {Status: StepStatusCompleted, Output: map[string]any{"cpu": "50%"}}, + }, + expectedFields: map[string]any{ + "logs": map[string]any{"count": float64(100)}, // JSON unmarshal converts to float64 + "metrics": map[string]any{"cpu": "50%"}, + }, + }, + { + name: "with workflow metadata", + template: `{ + "data": {{json .steps.fetch_data.output}}, + "metadata": { + "workflow_id": "{{.workflow.id}}", + "duration_ms": {{.workflow.duration_ms}}, + "step_count": {{.workflow.step_count}} + } + }`, + workflowID: "test-wf-123", + steps: map[string]*StepResult{ + "fetch_data": {Status: StepStatusCompleted, Output: map[string]any{"result": "ok"}}, + }, + expectedFields: map[string]any{ + "data": map[string]any{"result": "ok"}, + "metadata": map[string]any{ + "workflow_id": "test-wf-123", + "duration_ms": float64(1500), + "step_count": float64(1), + }, + }, + }, + { + name: "with parameters", + template: `{ + "incident_id": "{{.params.incident_id}}", + "data": {{json .steps.fetch.output}} + }`, + params: map[string]any{"incident_id": "INC-12345"}, + steps: map[string]*StepResult{ + "fetch": {Status: StepStatusCompleted, Output: map[string]any{"status": "resolved"}}, + }, + expectedFields: map[string]any{ + "incident_id": "INC-12345", + "data": map[string]any{"status": "resolved"}, + }, + }, + { + name: "multi-step with status", + template: `{ + "results": { + "step1": { + "status": "{{.steps.step1.status}}", + "output": {{json .steps.step1.output}} + }, + "step2": { + "status": "{{.steps.step2.status}}", + "output": {{json .steps.step2.output}} + } + } + }`, + steps: map[string]*StepResult{ + "step1": {Status: StepStatusCompleted, Output: map[string]any{"a": 1}}, + "step2": {Status: StepStatusCompleted, Output: map[string]any{"b": 2}}, + }, + expectedFields: map[string]any{ + "results": map[string]any{ + "step1": map[string]any{ + "status": "completed", + "output": map[string]any{"a": float64(1)}, + }, + "step2": map[string]any{ + "status": "completed", + "output": map[string]any{"b": float64(2)}, + }, + }, + }, + }, + { + name: "nested data structures", + template: `{ + "pages": { + "overview": {{json .steps.fetch_overview.output}}, + "details": {{json .steps.fetch_details.output}} + }, + "summary": { + "total_pages": 2, + "completed_at": {{.workflow.end_time}} + } + }`, + steps: map[string]*StepResult{ + "fetch_overview": { + Status: StepStatusCompleted, + Output: map[string]any{"title": "Overview", "content": "..."}, + }, + "fetch_details": { + Status: StepStatusCompleted, + Output: map[string]any{"title": "Details", "sections": []any{"intro", "body"}}, + }, + }, + expectedFields: map[string]any{ + "pages": map[string]any{ + "overview": map[string]any{"title": "Overview", "content": "..."}, + "details": map[string]any{"title": "Details", "sections": []any{"intro", "body"}}, + }, + "summary": map[string]any{ + "total_pages": float64(2), + "completed_at": float64(endTime), + }, + }, + }, + { + name: "invalid template syntax", + template: `{"data": {{.steps.fetch.output}`, + wantErr: true, + errContains: "invalid output format template", + }, + { + name: "template references missing field", + template: `{"data": {{.nonexistent.field}}}`, + wantErr: true, + errContains: "output format must produce valid JSON", + }, + { + name: "non-JSON output", + template: `This is not JSON`, + wantErr: true, + errContains: "output format must produce valid JSON", + }, + { + name: "invalid JSON structure", + template: `{"unclosed": "bracket"`, + wantErr: true, + errContains: "output format must produce valid JSON", + }, + { + name: "empty steps", + template: `{ + "workflow_id": "{{.workflow.id}}", + "step_count": {{.workflow.step_count}} + }`, + workflowID: "empty-wf", + steps: map[string]*StepResult{}, + expectedFields: map[string]any{ + "workflow_id": "empty-wf", + "step_count": float64(0), + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + ctx := newWorkflowContext(tt.params) + if tt.workflowID != "" { + ctx.WorkflowID = tt.workflowID + } else { + ctx.WorkflowID = "test-workflow" + } + if tt.steps != nil { + ctx.Steps = tt.steps + } + + result, err := expander.ExpandOutputFormat(context.Background(), tt.template, ctx, startTime, endTime) + + if tt.wantErr { + require.Error(t, err) + if tt.errContains != "" { + assert.Contains(t, err.Error(), tt.errContains) + } + return + } + + require.NoError(t, err) + require.NotNil(t, result) + + // Verify expected fields + for key, expectedValue := range tt.expectedFields { + actualValue, exists := result[key] + require.True(t, exists, "expected key %q not found in result", key) + assert.Equal(t, expectedValue, actualValue, "mismatch for key %q", key) + } + }) + } +} + +func TestTemplateExpander_ExpandOutputFormat_SizeLimits(t *testing.T) { + t.Parallel() + + expander := NewTemplateExpander() + ctx := newWorkflowContext(nil) + ctx.WorkflowID = "test" + + // Create a large step output that will exceed 10MB size limit + // Each entry is ~100 bytes, so we need >100k entries + largeString := make([]byte, 100) + for i := range largeString { + largeString[i] = 'x' + } + + largeData := make(map[string]any) + for i := 0; i < 120000; i++ { // 120k entries * ~100 bytes > 10MB + largeData[fmt.Sprintf("key_%d", i)] = string(largeString) + } + + ctx.Steps = map[string]*StepResult{ + "large_step": {Status: StepStatusCompleted, Output: largeData}, + } + + template := `{"data": {{json .steps.large_step.output}}}` + + _, err := expander.ExpandOutputFormat(context.Background(), template, ctx, 0, 0) + require.Error(t, err) + assert.Contains(t, err.Error(), "too large") +} + +func TestTemplateExpander_ExpandOutputFormat_ContextCancellation(t *testing.T) { + t.Parallel() + + expander := NewTemplateExpander() + ctx := newWorkflowContext(nil) + ctx.WorkflowID = "test" + ctx.Steps = map[string]*StepResult{ + "step1": {Status: StepStatusCompleted, Output: map[string]any{"result": "ok"}}, + } + + // Create cancelled context + cancelledCtx, cancel := context.WithCancel(context.Background()) + cancel() + + template := `{"data": {{json .steps.step1.output}}}` + + _, err := expander.ExpandOutputFormat(cancelledCtx, template, ctx, 0, 0) + require.Error(t, err) + assert.Contains(t, err.Error(), "context cancelled") +} diff --git a/pkg/vmcp/composer/workflow_engine.go b/pkg/vmcp/composer/workflow_engine.go index 714c91e76..8e0a6148e 100644 --- a/pkg/vmcp/composer/workflow_engine.go +++ b/pkg/vmcp/composer/workflow_engine.go @@ -176,9 +176,9 @@ func (e *workflowEngine) ExecuteWorkflow( if e.stateStore != nil { finalState := e.buildWorkflowStatus(workflowCtx, WorkflowStatusTimedOut) finalState.StartTime = result.StartTime - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + saveCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - _ = e.stateStore.SaveState(ctx, workflowCtx.WorkflowID, finalState) + _ = e.stateStore.SaveState(saveCtx, workflowCtx.WorkflowID, finalState) } logger.Warnf("Workflow %s timed out after %v", def.Name, result.Duration) @@ -195,9 +195,10 @@ func (e *workflowEngine) ExecuteWorkflow( if e.stateStore != nil { finalState := e.buildWorkflowStatus(workflowCtx, WorkflowStatusFailed) finalState.StartTime = result.StartTime - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + finalState.LastUpdateTime = result.EndTime + saveCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - _ = e.stateStore.SaveState(ctx, workflowCtx.WorkflowID, finalState) + _ = e.stateStore.SaveState(saveCtx, workflowCtx.WorkflowID, finalState) } return result, result.Error @@ -205,17 +206,50 @@ func (e *workflowEngine) ExecuteWorkflow( // Workflow completed successfully result.Status = WorkflowStatusCompleted - result.Output = workflowCtx.GetLastStepOutput() result.EndTime = time.Now() result.Duration = result.EndTime.Sub(result.StartTime) + // Determine workflow output based on OutputFormat configuration + if def.OutputFormat != "" { + // Use custom output format template + expandedOutput, err := e.templateExpander.ExpandOutputFormat( + ctx, // Use original context, not execCtx which may be cancelled + def.OutputFormat, + workflowCtx, + result.StartTime.UnixMilli(), + result.EndTime.UnixMilli(), + ) + if err != nil { + // Output format expansion failed - treat as workflow failure + result.Status = WorkflowStatusFailed + result.Error = fmt.Errorf("output format expansion failed: %w", err) + logger.Errorf("Workflow %s output format expansion failed: %v", def.Name, err) + + // Save failure state + if e.stateStore != nil { + finalState := e.buildWorkflowStatus(workflowCtx, WorkflowStatusFailed) + finalState.StartTime = result.StartTime + finalState.LastUpdateTime = result.EndTime + saveCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _ = e.stateStore.SaveState(saveCtx, workflowCtx.WorkflowID, finalState) + } + + return result, result.Error + } + result.Output = expandedOutput + } else { + // Default behavior: return last step output (backward compatible) + result.Output = workflowCtx.GetLastStepOutput() + } + // Save final workflow state if e.stateStore != nil { finalState := e.buildWorkflowStatus(workflowCtx, WorkflowStatusCompleted) finalState.StartTime = result.StartTime - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + saveCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - if err := e.stateStore.SaveState(ctx, workflowCtx.WorkflowID, finalState); err != nil { + if err := e.stateStore.SaveState(saveCtx, workflowCtx.WorkflowID, finalState); err != nil { logger.Warnf("Failed to save final workflow state: %v", err) } } diff --git a/pkg/vmcp/composer/workflow_engine_test.go b/pkg/vmcp/composer/workflow_engine_test.go index f826987bd..7db41e4b0 100644 --- a/pkg/vmcp/composer/workflow_engine_test.go +++ b/pkg/vmcp/composer/workflow_engine_test.go @@ -421,3 +421,164 @@ func TestWorkflowEngine_ParallelExecution(t *testing.T) { "fetch_metrics should complete before create_report starts") } } + +func TestWorkflowEngine_ExecuteWorkflow_WithOutputFormat(t *testing.T) { + t.Parallel() + te := newTestEngine(t) + + // Workflow with output_format for aggregation + def := &WorkflowDefinition{ + Name: "aggregate-workflow", + Steps: []WorkflowStep{ + toolStep("fetch_logs", "logs.fetch", nil), + toolStep("fetch_metrics", "metrics.fetch", nil), + }, + OutputFormat: `{ + "logs": {{json .steps.fetch_logs.output}}, + "metrics": {{json .steps.fetch_metrics.output}}, + "workflow_id": "{{.workflow.id}}" + }`, + } + + // Set up expectations + te.expectToolCall("logs.fetch", nil, map[string]any{"count": 100}) + te.expectToolCall("metrics.fetch", nil, map[string]any{"cpu": "50%"}) + + result, err := execute(t, te.Engine, def, nil) + + require.NoError(t, err) + assert.Equal(t, WorkflowStatusCompleted, result.Status) + + // Verify aggregated output + require.NotNil(t, result.Output) + + // Check aggregated fields + logs, exists := result.Output["logs"] + require.True(t, exists, "logs field should exist") + assert.Equal(t, map[string]any{"count": float64(100)}, logs) + + metrics, exists := result.Output["metrics"] + require.True(t, exists, "metrics field should exist") + assert.Equal(t, map[string]any{"cpu": "50%"}, metrics) + + workflowID, exists := result.Output["workflow_id"] + require.True(t, exists, "workflow_id field should exist") + assert.NotEmpty(t, workflowID) +} + +func TestWorkflowEngine_ExecuteWorkflow_WithoutOutputFormat(t *testing.T) { + t.Parallel() + te := newTestEngine(t) + + // Workflow WITHOUT output_format (backward compatibility) + // Using sequential steps (step2 depends on step1) to ensure order + def := &WorkflowDefinition{ + Name: "backward-compat", + Steps: []WorkflowStep{ + toolStep("step1", "tool.one", nil), + toolStepWithDeps("step2", "tool.two", nil, []string{"step1"}), + }, + } + + te.expectToolCall("tool.one", nil, map[string]any{"result": "first"}) + te.expectToolCall("tool.two", nil, map[string]any{"result": "last"}) + + result, err := execute(t, te.Engine, def, nil) + + require.NoError(t, err) + assert.Equal(t, WorkflowStatusCompleted, result.Status) + + // Should return last step output (backward compatible) + // Since step2 depends on step1, step2 will always complete last + assert.Equal(t, map[string]any{"result": "last"}, result.Output) +} + +func TestWorkflowEngine_ExecuteWorkflow_OutputFormatWithMetadata(t *testing.T) { + t.Parallel() + te := newTestEngine(t) + + def := &WorkflowDefinition{ + Name: "metadata-workflow", + Steps: []WorkflowStep{ + toolStep("fetch_data", "data.fetch", nil), + }, + OutputFormat: `{ + "data": {{json .steps.fetch_data.output}}, + "metadata": { + "workflow_id": "{{.workflow.id}}", + "step_count": {{.workflow.step_count}}, + "duration_ms": {{.workflow.duration_ms}} + } + }`, + } + + te.expectToolCall("data.fetch", nil, map[string]any{"value": "test"}) + + result, err := execute(t, te.Engine, def, nil) + + require.NoError(t, err) + assert.Equal(t, WorkflowStatusCompleted, result.Status) + + // Check data field + data := result.Output["data"] + assert.Equal(t, map[string]any{"value": "test"}, data) + + // Check metadata + metadata, exists := result.Output["metadata"] + require.True(t, exists) + metadataMap, ok := metadata.(map[string]any) + require.True(t, ok) + + assert.NotEmpty(t, metadataMap["workflow_id"]) + assert.Equal(t, float64(1), metadataMap["step_count"]) + assert.GreaterOrEqual(t, metadataMap["duration_ms"], float64(0)) +} + +func TestWorkflowEngine_ExecuteWorkflow_OutputFormatInvalid(t *testing.T) { + t.Parallel() + te := newTestEngine(t) + + // Workflow with invalid output_format template + def := &WorkflowDefinition{ + Name: "invalid-format", + Steps: []WorkflowStep{ + toolStep("step1", "tool.one", nil), + }, + OutputFormat: `{"invalid json syntax`, + } + + te.expectToolCall("tool.one", nil, map[string]any{"result": "ok"}) + + result, err := execute(t, te.Engine, def, nil) + + // Should fail due to invalid output format + require.Error(t, err) + assert.Contains(t, err.Error(), "output format") + assert.Equal(t, WorkflowStatusFailed, result.Status) +} + +func TestWorkflowEngine_ExecuteWorkflow_OutputFormatWithParameters(t *testing.T) { + t.Parallel() + te := newTestEngine(t) + + def := &WorkflowDefinition{ + Name: "params-workflow", + Steps: []WorkflowStep{ + toolStep("fetch", "data.fetch", nil), + }, + OutputFormat: `{ + "incident_id": "{{.params.incident_id}}", + "data": {{json .steps.fetch.output}} + }`, + } + + te.expectToolCall("data.fetch", nil, map[string]any{"status": "resolved"}) + + result, err := execute(t, te.Engine, def, map[string]any{"incident_id": "INC-123"}) + + require.NoError(t, err) + assert.Equal(t, WorkflowStatusCompleted, result.Status) + + assert.Equal(t, "INC-123", result.Output["incident_id"]) + assert.Equal(t, map[string]any{"status": "resolved"}, result.Output["data"]) +}