Skip to content

Commit 5e19d8e

Browse files
committed
vmcp composition - Template Expander Enhancement
Successfully implemented the ExpandOutputFormat() method in the template expander to support output aggregation for multi-step vMCP composite tool workflows. Also integrate with the workflow
1 parent b2e7e32 commit 5e19d8e

File tree

5 files changed

+542
-1
lines changed

5 files changed

+542
-1
lines changed

pkg/vmcp/composer/composer.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,20 @@ type TemplateExpander interface {
324324

325325
// EvaluateCondition evaluates a condition template to a boolean.
326326
EvaluateCondition(ctx context.Context, condition string, workflowCtx *WorkflowContext) (bool, error)
327+
328+
// ExpandOutputFormat evaluates the output_format template with workflow context.
329+
// Returns a map representing the expanded JSON output structure.
330+
// The template has access to:
331+
// - .params.* - Input parameters
332+
// - .steps.*.output - Step outputs
333+
// - .steps.*.status - Step status
334+
// - .workflow.* - Workflow metadata (id, duration, timestamps, step_count)
335+
ExpandOutputFormat(
336+
ctx context.Context,
337+
outputFormatTemplate string,
338+
workflowCtx *WorkflowContext,
339+
startTime, endTime int64,
340+
) (map[string]any, error)
327341
}
328342

329343
// WorkflowContext contains the execution context for a workflow.

pkg/vmcp/composer/template_expander.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,81 @@ func (e *defaultTemplateExpander) EvaluateCondition(
215215
}
216216
}
217217

218+
// ExpandOutputFormat evaluates the output_format template with workflow context.
219+
// Returns a map representing the expanded JSON output structure.
220+
func (e *defaultTemplateExpander) ExpandOutputFormat(
221+
ctx context.Context,
222+
outputFormatTemplate string,
223+
workflowCtx *WorkflowContext,
224+
startTime, endTime int64,
225+
) (map[string]any, error) {
226+
// Check context cancellation before expensive operations
227+
if err := ctx.Err(); err != nil {
228+
return nil, fmt.Errorf("context cancelled before output format expansion: %w", err)
229+
}
230+
231+
// Build template context with params, steps, and workflow metadata
232+
tmplCtx := map[string]any{
233+
"params": workflowCtx.Params,
234+
"steps": e.buildStepsContext(workflowCtx),
235+
"workflow": e.buildWorkflowMetadata(workflowCtx, startTime, endTime),
236+
}
237+
238+
// Parse template
239+
tmpl, err := template.New("output").Funcs(e.funcMap).Parse(outputFormatTemplate)
240+
if err != nil {
241+
return nil, fmt.Errorf("invalid output format template: %w", err)
242+
}
243+
244+
// Execute template to string
245+
var buf bytes.Buffer
246+
buf.Grow(4096) // Pre-allocate reasonable buffer size
247+
248+
if err := tmpl.Execute(&buf, tmplCtx); err != nil {
249+
return nil, fmt.Errorf("output format expansion failed: %w", err)
250+
}
251+
252+
// Enforce output size limit
253+
if buf.Len() > maxTemplateOutputSize {
254+
return nil, fmt.Errorf("output format result too large: %d bytes (max %d)",
255+
buf.Len(), maxTemplateOutputSize)
256+
}
257+
258+
// Parse JSON result
259+
var output map[string]any
260+
if err := json.Unmarshal(buf.Bytes(), &output); err != nil {
261+
return nil, fmt.Errorf("output format must produce valid JSON: %w (template output: %s)",
262+
err, buf.String())
263+
}
264+
265+
return output, nil
266+
}
267+
268+
// buildWorkflowMetadata creates workflow metadata for template expansion.
269+
func (*defaultTemplateExpander) buildWorkflowMetadata(
270+
workflowCtx *WorkflowContext,
271+
startTime, endTime int64,
272+
) map[string]any {
273+
// Acquire read lock to safely access Steps map
274+
workflowCtx.mu.RLock()
275+
stepCount := len(workflowCtx.Steps)
276+
workflowCtx.mu.RUnlock()
277+
278+
// Calculate duration in milliseconds
279+
durationMs := int64(0)
280+
if endTime > 0 && startTime > 0 {
281+
durationMs = endTime - startTime
282+
}
283+
284+
return map[string]any{
285+
"id": workflowCtx.WorkflowID,
286+
"step_count": stepCount,
287+
"duration_ms": durationMs,
288+
"start_time": startTime,
289+
"end_time": endTime,
290+
}
291+
}
292+
218293
// jsonEncode is a template function that encodes a value as JSON.
219294
func jsonEncode(v any) (string, error) {
220295
b, err := json.Marshal(v)

pkg/vmcp/composer/template_expander_test.go

Lines changed: 259 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package composer
22

33
import (
44
"context"
5+
"fmt"
56
"testing"
67
"time"
78

@@ -218,3 +219,261 @@ func TestWorkflowContext_Clone(t *testing.T) {
218219
assert.NotEqual(t, original.Params, clone.Params)
219220
assert.NotEqual(t, len(original.Steps), len(clone.Steps))
220221
}
222+
223+
func TestTemplateExpander_ExpandOutputFormat(t *testing.T) {
224+
t.Parallel()
225+
226+
expander := NewTemplateExpander()
227+
startTime := time.Now().UnixMilli()
228+
endTime := startTime + 1500 // 1.5 seconds later
229+
230+
tests := []struct {
231+
name string
232+
template string
233+
params map[string]any
234+
steps map[string]*StepResult
235+
workflowID string
236+
expectedFields map[string]any // Fields to check in output
237+
wantErr bool
238+
errContains string
239+
}{
240+
{
241+
name: "simple step output aggregation",
242+
template: `{"logs": {{json .steps.fetch_logs.output}}, "metrics": {{json .steps.fetch_metrics.output}}}`,
243+
steps: map[string]*StepResult{
244+
"fetch_logs": {Status: StepStatusCompleted, Output: map[string]any{"count": 100}},
245+
"fetch_metrics": {Status: StepStatusCompleted, Output: map[string]any{"cpu": "50%"}},
246+
},
247+
expectedFields: map[string]any{
248+
"logs": map[string]any{"count": float64(100)}, // JSON unmarshal converts to float64
249+
"metrics": map[string]any{"cpu": "50%"},
250+
},
251+
},
252+
{
253+
name: "with workflow metadata",
254+
template: `{
255+
"data": {{json .steps.fetch_data.output}},
256+
"metadata": {
257+
"workflow_id": "{{.workflow.id}}",
258+
"duration_ms": {{.workflow.duration_ms}},
259+
"step_count": {{.workflow.step_count}}
260+
}
261+
}`,
262+
workflowID: "test-wf-123",
263+
steps: map[string]*StepResult{
264+
"fetch_data": {Status: StepStatusCompleted, Output: map[string]any{"result": "ok"}},
265+
},
266+
expectedFields: map[string]any{
267+
"data": map[string]any{"result": "ok"},
268+
"metadata": map[string]any{
269+
"workflow_id": "test-wf-123",
270+
"duration_ms": float64(1500),
271+
"step_count": float64(1),
272+
},
273+
},
274+
},
275+
{
276+
name: "with parameters",
277+
template: `{
278+
"incident_id": "{{.params.incident_id}}",
279+
"data": {{json .steps.fetch.output}}
280+
}`,
281+
params: map[string]any{"incident_id": "INC-12345"},
282+
steps: map[string]*StepResult{
283+
"fetch": {Status: StepStatusCompleted, Output: map[string]any{"status": "resolved"}},
284+
},
285+
expectedFields: map[string]any{
286+
"incident_id": "INC-12345",
287+
"data": map[string]any{"status": "resolved"},
288+
},
289+
},
290+
{
291+
name: "multi-step with status",
292+
template: `{
293+
"results": {
294+
"step1": {
295+
"status": "{{.steps.step1.status}}",
296+
"output": {{json .steps.step1.output}}
297+
},
298+
"step2": {
299+
"status": "{{.steps.step2.status}}",
300+
"output": {{json .steps.step2.output}}
301+
}
302+
}
303+
}`,
304+
steps: map[string]*StepResult{
305+
"step1": {Status: StepStatusCompleted, Output: map[string]any{"a": 1}},
306+
"step2": {Status: StepStatusCompleted, Output: map[string]any{"b": 2}},
307+
},
308+
expectedFields: map[string]any{
309+
"results": map[string]any{
310+
"step1": map[string]any{
311+
"status": "completed",
312+
"output": map[string]any{"a": float64(1)},
313+
},
314+
"step2": map[string]any{
315+
"status": "completed",
316+
"output": map[string]any{"b": float64(2)},
317+
},
318+
},
319+
},
320+
},
321+
{
322+
name: "nested data structures",
323+
template: `{
324+
"pages": {
325+
"overview": {{json .steps.fetch_overview.output}},
326+
"details": {{json .steps.fetch_details.output}}
327+
},
328+
"summary": {
329+
"total_pages": 2,
330+
"completed_at": {{.workflow.end_time}}
331+
}
332+
}`,
333+
steps: map[string]*StepResult{
334+
"fetch_overview": {
335+
Status: StepStatusCompleted,
336+
Output: map[string]any{"title": "Overview", "content": "..."},
337+
},
338+
"fetch_details": {
339+
Status: StepStatusCompleted,
340+
Output: map[string]any{"title": "Details", "sections": []any{"intro", "body"}},
341+
},
342+
},
343+
expectedFields: map[string]any{
344+
"pages": map[string]any{
345+
"overview": map[string]any{"title": "Overview", "content": "..."},
346+
"details": map[string]any{"title": "Details", "sections": []any{"intro", "body"}},
347+
},
348+
"summary": map[string]any{
349+
"total_pages": float64(2),
350+
"completed_at": float64(endTime),
351+
},
352+
},
353+
},
354+
{
355+
name: "invalid template syntax",
356+
template: `{"data": {{.steps.fetch.output}`,
357+
wantErr: true,
358+
errContains: "invalid output format template",
359+
},
360+
{
361+
name: "template references missing field",
362+
template: `{"data": {{.nonexistent.field}}}`,
363+
wantErr: true,
364+
errContains: "output format must produce valid JSON",
365+
},
366+
{
367+
name: "non-JSON output",
368+
template: `This is not JSON`,
369+
wantErr: true,
370+
errContains: "output format must produce valid JSON",
371+
},
372+
{
373+
name: "invalid JSON structure",
374+
template: `{"unclosed": "bracket"`,
375+
wantErr: true,
376+
errContains: "output format must produce valid JSON",
377+
},
378+
{
379+
name: "empty steps",
380+
template: `{
381+
"workflow_id": "{{.workflow.id}}",
382+
"step_count": {{.workflow.step_count}}
383+
}`,
384+
workflowID: "empty-wf",
385+
steps: map[string]*StepResult{},
386+
expectedFields: map[string]any{
387+
"workflow_id": "empty-wf",
388+
"step_count": float64(0),
389+
},
390+
},
391+
}
392+
393+
for _, tt := range tests {
394+
t.Run(tt.name, func(t *testing.T) {
395+
t.Parallel()
396+
397+
ctx := newWorkflowContext(tt.params)
398+
if tt.workflowID != "" {
399+
ctx.WorkflowID = tt.workflowID
400+
} else {
401+
ctx.WorkflowID = "test-workflow"
402+
}
403+
if tt.steps != nil {
404+
ctx.Steps = tt.steps
405+
}
406+
407+
result, err := expander.ExpandOutputFormat(context.Background(), tt.template, ctx, startTime, endTime)
408+
409+
if tt.wantErr {
410+
require.Error(t, err)
411+
if tt.errContains != "" {
412+
assert.Contains(t, err.Error(), tt.errContains)
413+
}
414+
return
415+
}
416+
417+
require.NoError(t, err)
418+
require.NotNil(t, result)
419+
420+
// Verify expected fields
421+
for key, expectedValue := range tt.expectedFields {
422+
actualValue, exists := result[key]
423+
require.True(t, exists, "expected key %q not found in result", key)
424+
assert.Equal(t, expectedValue, actualValue, "mismatch for key %q", key)
425+
}
426+
})
427+
}
428+
}
429+
430+
func TestTemplateExpander_ExpandOutputFormat_SizeLimits(t *testing.T) {
431+
t.Parallel()
432+
433+
expander := NewTemplateExpander()
434+
ctx := newWorkflowContext(nil)
435+
ctx.WorkflowID = "test"
436+
437+
// Create a large step output that will exceed 10MB size limit
438+
// Each entry is ~100 bytes, so we need >100k entries
439+
largeString := make([]byte, 100)
440+
for i := range largeString {
441+
largeString[i] = 'x'
442+
}
443+
444+
largeData := make(map[string]any)
445+
for i := 0; i < 120000; i++ { // 120k entries * ~100 bytes > 10MB
446+
largeData[fmt.Sprintf("key_%d", i)] = string(largeString)
447+
}
448+
449+
ctx.Steps = map[string]*StepResult{
450+
"large_step": {Status: StepStatusCompleted, Output: largeData},
451+
}
452+
453+
template := `{"data": {{json .steps.large_step.output}}}`
454+
455+
_, err := expander.ExpandOutputFormat(context.Background(), template, ctx, 0, 0)
456+
require.Error(t, err)
457+
assert.Contains(t, err.Error(), "too large")
458+
}
459+
460+
func TestTemplateExpander_ExpandOutputFormat_ContextCancellation(t *testing.T) {
461+
t.Parallel()
462+
463+
expander := NewTemplateExpander()
464+
ctx := newWorkflowContext(nil)
465+
ctx.WorkflowID = "test"
466+
ctx.Steps = map[string]*StepResult{
467+
"step1": {Status: StepStatusCompleted, Output: map[string]any{"result": "ok"}},
468+
}
469+
470+
// Create cancelled context
471+
cancelledCtx, cancel := context.WithCancel(context.Background())
472+
cancel()
473+
474+
template := `{"data": {{json .steps.step1.output}}}`
475+
476+
_, err := expander.ExpandOutputFormat(cancelledCtx, template, ctx, 0, 0)
477+
require.Error(t, err)
478+
assert.Contains(t, err.Error(), "context cancelled")
479+
}

0 commit comments

Comments
 (0)