Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 109 additions & 0 deletions sdks/community/go/pkg/core/events/activity_events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package events

import (
"encoding/json"
"fmt"
)

// ActivitySnapshotEvent contains a snapshot of an activity message.
type ActivitySnapshotEvent struct {
*BaseEvent
MessageID string `json:"messageId"`
ActivityType string `json:"activityType"`
Content any `json:"content"`
Replace *bool `json:"replace,omitempty"`
}

// NewActivitySnapshotEvent creates a new activity snapshot event.
func NewActivitySnapshotEvent(messageID, activityType string, content any) *ActivitySnapshotEvent {
replace := true
return &ActivitySnapshotEvent{
BaseEvent: NewBaseEvent(EventTypeActivitySnapshot),
MessageID: messageID,
ActivityType: activityType,
Content: content,
Replace: &replace,
}
}

// WithReplace sets the replace flag for the snapshot event.
func (e *ActivitySnapshotEvent) WithReplace(replace bool) *ActivitySnapshotEvent {
e.Replace = &replace
return e
}

// Validate validates the activity snapshot event.
func (e *ActivitySnapshotEvent) Validate() error {
if err := e.BaseEvent.Validate(); err != nil {
return err
}

if e.MessageID == "" {
return fmt.Errorf("ActivitySnapshotEvent validation failed: messageId field is required")
}

if e.ActivityType == "" {
return fmt.Errorf("ActivitySnapshotEvent validation failed: activityType field is required")
}

if e.Content == nil {
return fmt.Errorf("ActivitySnapshotEvent validation failed: content field is required")
}

return nil
}

// ToJSON serializes the event to JSON.
func (e *ActivitySnapshotEvent) ToJSON() ([]byte, error) {
return json.Marshal(e)
}

// ActivityDeltaEvent contains incremental updates for an activity message.
type ActivityDeltaEvent struct {
*BaseEvent
MessageID string `json:"messageId"`
ActivityType string `json:"activityType"`
Patch []JSONPatchOperation `json:"patch"`
}

// NewActivityDeltaEvent creates a new activity delta event.
func NewActivityDeltaEvent(messageID, activityType string, patch []JSONPatchOperation) *ActivityDeltaEvent {
return &ActivityDeltaEvent{
BaseEvent: NewBaseEvent(EventTypeActivityDelta),
MessageID: messageID,
ActivityType: activityType,
Patch: patch,
}
}

// Validate validates the activity delta event.
func (e *ActivityDeltaEvent) Validate() error {
if err := e.BaseEvent.Validate(); err != nil {
return err
}

if e.MessageID == "" {
return fmt.Errorf("ActivityDeltaEvent validation failed: messageId field is required")
}

if e.ActivityType == "" {
return fmt.Errorf("ActivityDeltaEvent validation failed: activityType field is required")
}

if len(e.Patch) == 0 {
return fmt.Errorf("ActivityDeltaEvent validation failed: patch field must contain at least one operation")
}

for i, op := range e.Patch {
if err := validateJSONPatchOperation(op); err != nil {
return fmt.Errorf("ActivityDeltaEvent validation failed: invalid patch operation at index %d: %w", i, err)
}
}

return nil
}

// ToJSON serializes the event to JSON.
func (e *ActivityDeltaEvent) ToJSON() ([]byte, error) {
return json.Marshal(e)
}
109 changes: 109 additions & 0 deletions sdks/community/go/pkg/core/events/activity_events_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package events

import (
"encoding/json"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestActivitySnapshotEventBasics(t *testing.T) {
content := map[string]any{"status": "draft"}

event := NewActivitySnapshotEvent("activity-1", "PLAN", content)

assert.Equal(t, EventTypeActivitySnapshot, event.Type())
assert.Equal(t, "activity-1", event.MessageID)
assert.Equal(t, "PLAN", event.ActivityType)
require.NotNil(t, event.Replace)
assert.True(t, *event.Replace)
assert.NoError(t, event.Validate())

event = event.WithReplace(false)
require.NotNil(t, event.Replace)
assert.False(t, *event.Replace)
}

func TestActivitySnapshotEventValidationAndJSON(t *testing.T) {
event := NewActivitySnapshotEvent("activity-1", "PLAN", map[string]any{"status": "draft"})

data, err := event.ToJSON()
require.NoError(t, err)

var decoded map[string]any
require.NoError(t, json.Unmarshal(data, &decoded))

assert.Equal(t, string(EventTypeActivitySnapshot), decoded["type"])
assert.Equal(t, "activity-1", decoded["messageId"])
assert.Equal(t, "PLAN", decoded["activityType"])
content, ok := decoded["content"].(map[string]any)
require.True(t, ok)
assert.Equal(t, "draft", content["status"])

event.MessageID = ""
assert.Error(t, event.Validate())

event.MessageID = "activity-1"
event.ActivityType = ""
assert.Error(t, event.Validate())

event.ActivityType = "PLAN"
event.Content = nil
assert.Error(t, event.Validate())

event.Content = map[string]any{"status": "draft"}
event.BaseEvent.EventType = ""
assert.Error(t, event.Validate())
}

func TestActivitySnapshotEvent_MissingActivityType(t *testing.T) {
event := NewActivitySnapshotEvent("activity-1", "", map[string]any{"status": "draft"})
err := event.Validate()
assert.Error(t, err)
}

func TestActivityDeltaEventValidationAndJSON(t *testing.T) {
patch := []JSONPatchOperation{{Op: "replace", Path: "/status", Value: "done"}}
event := NewActivityDeltaEvent("activity-1", "PLAN", patch)

assert.Equal(t, EventTypeActivityDelta, event.Type())
assert.NoError(t, event.Validate())

data, err := event.ToJSON()
require.NoError(t, err)

var decoded map[string]any
require.NoError(t, json.Unmarshal(data, &decoded))

assert.Equal(t, string(EventTypeActivityDelta), decoded["type"])
assert.Equal(t, "activity-1", decoded["messageId"])
assert.Equal(t, "PLAN", decoded["activityType"])
items, ok := decoded["patch"].([]any)
require.True(t, ok)
assert.Len(t, items, 1)

event.MessageID = ""
assert.Error(t, event.Validate())

event.MessageID = "activity-1"
event.Patch = []JSONPatchOperation{}
assert.Error(t, event.Validate())

event.Patch = []JSONPatchOperation{{Op: "invalid", Path: "/status"}}
assert.Error(t, event.Validate())

event.Patch = []JSONPatchOperation{{Op: "replace", Path: "/status", Value: "ok"}}
event.ActivityType = ""
assert.Error(t, event.Validate())

event.ActivityType = "PLAN"
event.BaseEvent.EventType = ""
assert.Error(t, event.Validate())
}

func TestActivityDeltaEvent_MissingActivityType(t *testing.T) {
event := NewActivityDeltaEvent("activity-1", "", []JSONPatchOperation{{Op: "replace", Path: "/status", Value: "done"}})
err := event.Validate()
assert.Error(t, err)
}
14 changes: 14 additions & 0 deletions sdks/community/go/pkg/core/events/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,20 @@ func (ed *EventDecoder) DecodeEvent(eventName string, data []byte) (Event, error
}
return &evt, nil

case EventTypeActivitySnapshot:
var evt ActivitySnapshotEvent
if err := json.Unmarshal(data, &evt); err != nil {
return nil, fmt.Errorf("failed to decode ACTIVITY_SNAPSHOT: %w", err)
}
return &evt, nil

case EventTypeActivityDelta:
var evt ActivityDeltaEvent
if err := json.Unmarshal(data, &evt); err != nil {
return nil, fmt.Errorf("failed to decode ACTIVITY_DELTA: %w", err)
}
return &evt, nil

case EventTypeStepStarted:
var evt StepStartedEvent
if err := json.Unmarshal(data, &evt); err != nil {
Expand Down
35 changes: 35 additions & 0 deletions sdks/community/go/pkg/core/events/decoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,41 @@ func TestEventDecoder(t *testing.T) {
assert.Equal(t, "msg-1", msgEvent.Messages[0].ID)
})

t.Run("DecodeEvent_ActivitySnapshot", func(t *testing.T) {
decoder := NewEventDecoder(nil)
data := []byte(`{"messageId": "activity-1", "activityType": "PLAN", "content": {"status": "draft"}, "replace": false}`)

event, err := decoder.DecodeEvent("ACTIVITY_SNAPSHOT", data)
require.NoError(t, err)
require.NotNil(t, event)

activityEvent, ok := event.(*ActivitySnapshotEvent)
require.True(t, ok)
assert.Equal(t, "activity-1", activityEvent.MessageID)
assert.Equal(t, "PLAN", activityEvent.ActivityType)
require.NotNil(t, activityEvent.Replace)
assert.False(t, *activityEvent.Replace)
content, ok := activityEvent.Content.(map[string]any)
require.True(t, ok)
assert.Equal(t, "draft", content["status"])
})

t.Run("DecodeEvent_ActivityDelta", func(t *testing.T) {
decoder := NewEventDecoder(nil)
data := []byte(`{"messageId": "activity-1", "activityType": "PLAN", "patch": [{"op": "replace", "path": "/status", "value": "streaming"}]}`)

event, err := decoder.DecodeEvent("ACTIVITY_DELTA", data)
require.NoError(t, err)
require.NotNil(t, event)

activityEvent, ok := event.(*ActivityDeltaEvent)
require.True(t, ok)
assert.Equal(t, "activity-1", activityEvent.MessageID)
assert.Equal(t, "PLAN", activityEvent.ActivityType)
assert.Len(t, activityEvent.Patch, 1)
assert.Equal(t, "replace", activityEvent.Patch[0].Op)
})

t.Run("DecodeEvent_StepStarted", func(t *testing.T) {
decoder := NewEventDecoder(nil)
data := []byte(`{"stepName": "step-1"}`)
Expand Down
16 changes: 16 additions & 0 deletions sdks/community/go/pkg/core/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ const (
EventTypeStateSnapshot EventType = "STATE_SNAPSHOT"
EventTypeStateDelta EventType = "STATE_DELTA"
EventTypeMessagesSnapshot EventType = "MESSAGES_SNAPSHOT"
EventTypeActivitySnapshot EventType = "ACTIVITY_SNAPSHOT"
EventTypeActivityDelta EventType = "ACTIVITY_DELTA"
EventTypeRaw EventType = "RAW"
EventTypeCustom EventType = "CUSTOM"
EventTypeRunStarted EventType = "RUN_STARTED"
Expand Down Expand Up @@ -57,6 +59,8 @@ var validEventTypes = map[EventType]bool{
EventTypeStateSnapshot: true,
EventTypeStateDelta: true,
EventTypeMessagesSnapshot: true,
EventTypeActivitySnapshot: true,
EventTypeActivityDelta: true,
EventTypeRaw: true,
EventTypeCustom: true,
EventTypeRunStarted: true,
Expand Down Expand Up @@ -318,6 +322,14 @@ func ValidateSequence(events []Event) error {
// They represent complete message state at any point in time
// Additional validation could be added if needed (e.g., consistency checks)

case EventTypeActivitySnapshot:
// Activity snapshot events are always valid in sequence context
// They represent complete activity state at any point in time

case EventTypeActivityDelta:
// Activity delta events are always valid in sequence context
// They represent incremental activity changes at any point in time

case EventTypeRaw:
// Raw events are always valid in sequence context
// They contain external data that should be passed through
Expand Down Expand Up @@ -381,6 +393,10 @@ func EventFromJSON(data []byte) (Event, error) {
event = &StateDeltaEvent{}
case EventTypeMessagesSnapshot:
event = &MessagesSnapshotEvent{}
case EventTypeActivitySnapshot:
event = &ActivitySnapshotEvent{}
case EventTypeActivityDelta:
event = &ActivityDeltaEvent{}
case EventTypeRaw:
event = &RawEvent{}
case EventTypeCustom:
Expand Down
Loading