From a0fa24d7a25249e635a3383d3f287fb3994a2d26 Mon Sep 17 00:00:00 2001 From: mirkoCrobu Date: Wed, 12 Nov 2025 16:02:48 +0100 Subject: [PATCH 1/2] move update to eventstream, adding progress type --- cmd/arduino-app-cli/system/system.go | 3 +- cmd/gendoc/docs.go | 8 +-- internal/api/handlers/update.go | 3 +- .../event.go => eventstream/event_stream.go} | 12 +++-- internal/update/apt/service.go | 41 ++++++++-------- internal/update/apt/service_test.go | 13 ++--- internal/update/arduino/arduino.go | 49 ++++++++++--------- internal/update/update.go | 42 ++++++++-------- 8 files changed, 91 insertions(+), 80 deletions(-) rename internal/{update/event.go => eventstream/event_stream.go} (88%) diff --git a/cmd/arduino-app-cli/system/system.go b/cmd/arduino-app-cli/system/system.go index 54c89128..7dc1d96e 100644 --- a/cmd/arduino-app-cli/system/system.go +++ b/cmd/arduino-app-cli/system/system.go @@ -25,6 +25,7 @@ import ( "github.com/arduino/arduino-app-cli/cmd/arduino-app-cli/internal/servicelocator" "github.com/arduino/arduino-app-cli/cmd/feedback" + "github.com/arduino/arduino-app-cli/internal/eventstream" "github.com/arduino/arduino-app-cli/internal/helpers" "github.com/arduino/arduino-app-cli/internal/orchestrator" "github.com/arduino/arduino-app-cli/internal/orchestrator/config" @@ -115,7 +116,7 @@ func newUpdateCmd() *cobra.Command { for event := range events { feedback.Printf("[%s] %s", event.Type.String(), event.Data) - if event.Type == update.DoneEvent { + if event.Type == eventstream.DoneEvent { break } } diff --git a/cmd/gendoc/docs.go b/cmd/gendoc/docs.go index 8e85e2a8..d0ed882d 100644 --- a/cmd/gendoc/docs.go +++ b/cmd/gendoc/docs.go @@ -31,10 +31,10 @@ import ( "github.com/arduino/arduino-app-cli/internal/api/handlers" "github.com/arduino/arduino-app-cli/internal/api/models" + "github.com/arduino/arduino-app-cli/internal/eventstream" "github.com/arduino/arduino-app-cli/internal/orchestrator" "github.com/arduino/arduino-app-cli/internal/orchestrator/app" "github.com/arduino/arduino-app-cli/internal/orchestrator/bricks" - "github.com/arduino/arduino-app-cli/internal/update" ) type Tag string @@ -81,10 +81,10 @@ func NewOpenApiGenerator(version string) *Generator { openapi3.SchemaOrRef{ Schema: &openapi3.Schema{ UniqueItems: f.Ptr(true), - Enum: f.Map(update.PackageType("").AllowedStatuses(), func(v update.PackageType) interface{} { return v }), + Enum: f.Map(eventstream.PackageType("").AllowedStatuses(), func(v eventstream.PackageType) interface{} { return v }), Type: f.Ptr(openapi3.SchemaTypeString), Description: f.Ptr("Package type"), - ReflectType: reflect.TypeOf(update.PackageType("")), + ReflectType: reflect.TypeOf(eventstream.PackageType("")), }, }, ) @@ -216,7 +216,7 @@ func NewOpenApiGenerator(version string) *Generator { }) } - if params.Value.Type() == reflect.TypeOf(update.PackageType("")) { + if params.Value.Type() == reflect.TypeOf(eventstream.PackageType("")) { params.Schema.WithRef("#/components/schemas/PackageType") return true, nil } diff --git a/internal/api/handlers/update.go b/internal/api/handlers/update.go index 41ac992b..4ffa21ec 100644 --- a/internal/api/handlers/update.go +++ b/internal/api/handlers/update.go @@ -23,6 +23,7 @@ import ( "log/slog" "github.com/arduino/arduino-app-cli/internal/api/models" + "github.com/arduino/arduino-app-cli/internal/eventstream" "github.com/arduino/arduino-app-cli/internal/render" "github.com/arduino/arduino-app-cli/internal/update" ) @@ -127,7 +128,7 @@ func HandleUpdateEvents(updater *update.Manager) http.HandlerFunc { slog.Info("APT event channel closed, stopping SSE stream") return } - if event.Type == update.ErrorEvent { + if event.Type == eventstream.ErrorEvent { sseStream.SendError(render.SSEErrorData{ Code: render.InternalServiceErr, Message: event.Data, diff --git a/internal/update/event.go b/internal/eventstream/event_stream.go similarity index 88% rename from internal/update/event.go rename to internal/eventstream/event_stream.go index 0f6c1a51..3dfcaaf6 100644 --- a/internal/update/event.go +++ b/internal/eventstream/event_stream.go @@ -13,7 +13,7 @@ // Arduino software without disclosing the source code of your own applications. // To purchase a commercial license, send an email to license@arduino.cc. -package update +package eventstream // EventType defines the type of upgrade event. type EventType int @@ -24,13 +24,15 @@ const ( RestartEvent DoneEvent ErrorEvent + ProgressEvent ) // Event represents a single event in the upgrade process. type Event struct { - Type EventType - Data string - Err error // Optional error field for error events + Type EventType + Data string + Err error // Optional error field for error events + Progress float32 } func (t EventType) String() string { @@ -45,6 +47,8 @@ func (t EventType) String() string { return "done" case ErrorEvent: return "error" + case ProgressEvent: + return "progress" default: panic("unreachable") } diff --git a/internal/update/apt/service.go b/internal/update/apt/service.go index f3d3984e..b790ad08 100644 --- a/internal/update/apt/service.go +++ b/internal/update/apt/service.go @@ -30,6 +30,7 @@ import ( "github.com/arduino/go-paths-helper" "go.bug.st/f" + "github.com/arduino/arduino-app-cli/internal/eventstream" "github.com/arduino/arduino-app-cli/internal/orchestrator" "github.com/arduino/arduino-app-cli/internal/update" ) @@ -74,11 +75,11 @@ func (s *Service) ListUpgradablePackages(ctx context.Context, matcher func(updat // UpgradePackages upgrades the specified packages using the `apt-get upgrade` command. // It publishes events to subscribers during the upgrade process. // It returns an error if the upgrade is already in progress or if the upgrade command fails. -func (s *Service) UpgradePackages(ctx context.Context, names []string) (<-chan update.Event, error) { +func (s *Service) UpgradePackages(ctx context.Context, names []string) (<-chan eventstream.Event, error) { if !s.lock.TryLock() { return nil, update.ErrOperationAlreadyInProgress } - eventsCh := make(chan update.Event, 100) + eventsCh := make(chan eventstream.Event, 100) go func() { defer s.lock.Unlock() @@ -87,37 +88,37 @@ func (s *Service) UpgradePackages(ctx context.Context, names []string) (<-chan u ctx, cancel := context.WithTimeout(ctx, 10*time.Minute) defer cancel() - eventsCh <- update.Event{Type: update.StartEvent, Data: "Upgrade is starting"} + eventsCh <- eventstream.Event{Type: eventstream.StartEvent, Data: "Upgrade is starting"} stream := runUpgradeCommand(ctx, names) for line, err := range stream { if err != nil { - eventsCh <- update.Event{ - Type: update.ErrorEvent, + eventsCh <- eventstream.Event{ + Type: eventstream.ErrorEvent, Err: err, Data: "Error running upgrade command", } slog.Error("error processing upgrade command output", "error", err) return } - eventsCh <- update.Event{Type: update.UpgradeLineEvent, Data: line} + eventsCh <- eventstream.Event{Type: eventstream.UpgradeLineEvent, Data: line} } - eventsCh <- update.Event{Type: update.StartEvent, Data: "apt cleaning cache is starting"} + eventsCh <- eventstream.Event{Type: eventstream.StartEvent, Data: "apt cleaning cache is starting"} for line, err := range runAptCleanCommand(ctx) { if err != nil { - eventsCh <- update.Event{ - Type: update.ErrorEvent, + eventsCh <- eventstream.Event{ + Type: eventstream.ErrorEvent, Err: err, Data: "Error running apt clean command", } slog.Error("error processing apt clean command output", "error", err) return } - eventsCh <- update.Event{Type: update.UpgradeLineEvent, Data: line} + eventsCh <- eventstream.Event{Type: eventstream.UpgradeLineEvent, Data: line} } // TEMPORARY PATCH: stopping and destroying docker containers and images since IDE does not implement it yet. // TODO: Remove this workaround once IDE implements it. // Tracking issue: https://github.com/arduino/arduino-app-cli/issues/623 - eventsCh <- update.Event{Type: update.UpgradeLineEvent, Data: "Stop and destroy docker containers and images ..."} + eventsCh <- eventstream.Event{Type: eventstream.UpgradeLineEvent, Data: "Stop and destroy docker containers and images ..."} streamCleanup := cleanupDockerContainers(ctx) for line, err := range streamCleanup { if err != nil { @@ -125,7 +126,7 @@ func (s *Service) UpgradePackages(ctx context.Context, names []string) (<-chan u // currently, we just log the error and continue considenring not blocking slog.Error("Error stopping and destroying docker containers", "error", err) } - eventsCh <- update.Event{Type: update.UpgradeLineEvent, Data: line} + eventsCh <- eventstream.Event{Type: eventstream.UpgradeLineEvent, Data: line} } // TEMPORARY PATCH: Install the latest docker images and show the logs to the users. @@ -133,26 +134,26 @@ func (s *Service) UpgradePackages(ctx context.Context, names []string) (<-chan u // Tracking issue: https://github.com/arduino/arduino-app-cli/issues/600 // Currently, we need to launch `arduino-app-cli system init` to pull the latest docker images because // the version of the docker images are hardcoded in the (new downloaded) version of the arduino-app-cli. - eventsCh <- update.Event{Type: update.UpgradeLineEvent, Data: "Pulling the latest docker images ..."} + eventsCh <- eventstream.Event{Type: eventstream.UpgradeLineEvent, Data: "Pulling the latest docker images ..."} streamDocker := pullDockerImages(ctx) for line, err := range streamDocker { if err != nil { - eventsCh <- update.Event{ - Type: update.ErrorEvent, + eventsCh <- eventstream.Event{ + Type: eventstream.ErrorEvent, Err: err, Data: "Error upgrading docker images", } slog.Error("error upgrading docker images", "error", err) return } - eventsCh <- update.Event{Type: update.UpgradeLineEvent, Data: line} + eventsCh <- eventstream.Event{Type: eventstream.UpgradeLineEvent, Data: line} } - eventsCh <- update.Event{Type: update.RestartEvent, Data: "Upgrade completed. Restarting ..."} + eventsCh <- eventstream.Event{Type: eventstream.RestartEvent, Data: "Upgrade completed. Restarting ..."} err := restartServices(ctx) if err != nil { - eventsCh <- update.Event{ - Type: update.ErrorEvent, + eventsCh <- eventstream.Event{ + Type: eventstream.ErrorEvent, Err: err, Data: "Error restart services after upgrade", } @@ -361,7 +362,7 @@ func parseListUpgradableOutput(r io.Reader) []update.UpgradablePackage { name := strings.Split(matches[1], "/")[0] pkg := update.UpgradablePackage{ - Type: update.Debian, + Type: eventstream.Debian, Name: name, ToVersion: matches[2], Architecture: matches[3], diff --git a/internal/update/apt/service_test.go b/internal/update/apt/service_test.go index b2d8f76a..6a763a1a 100644 --- a/internal/update/apt/service_test.go +++ b/internal/update/apt/service_test.go @@ -21,6 +21,7 @@ import ( "github.com/stretchr/testify/require" + "github.com/arduino/arduino-app-cli/internal/eventstream" "github.com/arduino/arduino-app-cli/internal/update" ) @@ -46,7 +47,7 @@ func TestParseListUpgradableOutput(t *testing.T) { input: "nano/bionic-updates 2.9.3-2 amd64", expected: []update.UpgradablePackage{ { - Type: update.Debian, + Type: eventstream.Debian, Name: "nano", ToVersion: "2.9.3-2", FromVersion: "", @@ -59,7 +60,7 @@ func TestParseListUpgradableOutput(t *testing.T) { input: "apt/focal-updates 2.0.11 amd64 [upgradable from: 2.0.10]", expected: []update.UpgradablePackage{ { - Type: update.Debian, + Type: eventstream.Debian, Name: "apt", ToVersion: "2.0.11", FromVersion: "2.0.10", @@ -77,28 +78,28 @@ containerd.io/focal 1.7.27-1 amd64 [upgradable from: 1.7.25-1] `, expected: []update.UpgradablePackage{ { - Type: update.Debian, + Type: eventstream.Debian, Name: "distro-info-data", ToVersion: "0.43ubuntu1.18", FromVersion: "0.43ubuntu1.16", Architecture: "all", }, { - Type: update.Debian, + Type: eventstream.Debian, Name: "apt", ToVersion: "2.0.11", FromVersion: "2.0.10", Architecture: "amd64", }, { - Type: update.Debian, + Type: eventstream.Debian, Name: "code", ToVersion: "1.100.3-1748872405", FromVersion: "1.100.2-1747260578", Architecture: "amd64", }, { - Type: update.Debian, + Type: eventstream.Debian, Name: "containerd.io", ToVersion: "1.7.27-1", FromVersion: "1.7.25-1", diff --git a/internal/update/arduino/arduino.go b/internal/update/arduino/arduino.go index 01076dff..2618ff7c 100644 --- a/internal/update/arduino/arduino.go +++ b/internal/update/arduino/arduino.go @@ -27,6 +27,7 @@ import ( rpc "github.com/arduino/arduino-cli/rpc/cc/arduino/cli/commands/v1" "github.com/sirupsen/logrus" + "github.com/arduino/arduino-app-cli/internal/eventstream" "github.com/arduino/arduino-app-cli/internal/helpers" "github.com/arduino/arduino-app-cli/internal/orchestrator" "github.com/arduino/arduino-app-cli/internal/update" @@ -117,7 +118,7 @@ func (a *ArduinoPlatformUpdater) ListUpgradablePackages(ctx context.Context, _ f } return []update.UpgradablePackage{{ - Type: update.Arduino, + Type: eventstream.Arduino, Name: "arduino:zephyr", FromVersion: platformSummary.GetInstalledVersion(), ToVersion: platformSummary.GetLatestVersion(), @@ -125,21 +126,21 @@ func (a *ArduinoPlatformUpdater) ListUpgradablePackages(ctx context.Context, _ f } // UpgradePackages implements ServiceUpdater. -func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []string) (<-chan update.Event, error) { +func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []string) (<-chan eventstream.Event, error) { if !a.lock.TryLock() { return nil, update.ErrOperationAlreadyInProgress } - eventsCh := make(chan update.Event, 100) + eventsCh := make(chan eventstream.Event, 100) downloadProgressCB := func(curr *rpc.DownloadProgress) { data := helpers.ArduinoCLIDownloadProgressToString(curr) slog.Debug("Download progress", slog.String("download_progress", data)) - eventsCh <- update.Event{Type: update.UpgradeLineEvent, Data: data} + eventsCh <- eventstream.Event{Type: eventstream.UpgradeLineEvent, Data: data} } taskProgressCB := func(msg *rpc.TaskProgress) { data := helpers.ArduinoCLITaskProgressToString(msg) slog.Debug("Task progress", slog.String("task_progress", data)) - eventsCh <- update.Event{Type: update.UpgradeLineEvent, Data: data} + eventsCh <- eventstream.Event{Type: eventstream.UpgradeLineEvent, Data: data} } go func() { @@ -149,14 +150,14 @@ func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []st ctx, cancel := context.WithTimeout(ctx, 10*time.Minute) defer cancel() - eventsCh <- update.Event{Type: update.StartEvent, Data: "Upgrade is starting"} + eventsCh <- eventstream.Event{Type: eventstream.StartEvent, Data: "Upgrade is starting"} logrus.SetLevel(logrus.ErrorLevel) // Reduce the log level of arduino-cli srv := commands.NewArduinoCoreServer() if err := setConfig(ctx, srv); err != nil { - eventsCh <- update.Event{ - Type: update.ErrorEvent, + eventsCh <- eventstream.Event{ + Type: eventstream.ErrorEvent, Err: err, Data: "Error setting additional URLs", } @@ -165,8 +166,8 @@ func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []st var inst *rpc.Instance if resp, err := srv.Create(ctx, &rpc.CreateRequest{}); err != nil { - eventsCh <- update.Event{ - Type: update.ErrorEvent, + eventsCh <- eventstream.Event{ + Type: eventstream.ErrorEvent, Err: err, Data: "Error creating Arduino instance", } @@ -185,16 +186,16 @@ func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []st { stream, _ := commands.UpdateIndexStreamResponseToCallbackFunction(ctx, downloadProgressCB) if err := srv.UpdateIndex(&rpc.UpdateIndexRequest{Instance: inst}, stream); err != nil { - eventsCh <- update.Event{ - Type: update.ErrorEvent, + eventsCh <- eventstream.Event{ + Type: eventstream.ErrorEvent, Err: err, Data: "Error updating index", } return } if err := srv.Init(&rpc.InitRequest{Instance: inst}, commands.InitStreamResponseToCallbackFunction(ctx, nil)); err != nil { - eventsCh <- update.Event{ - Type: update.ErrorEvent, + eventsCh <- eventstream.Event{ + Type: eventstream.ErrorEvent, Err: err, Data: "Error initializing Arduino instance", } @@ -219,14 +220,14 @@ func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []st ); err != nil { var alreadyPresent *cmderrors.PlatformAlreadyAtTheLatestVersionError if errors.As(err, &alreadyPresent) { - eventsCh <- update.Event{Type: update.UpgradeLineEvent, Data: alreadyPresent.Error()} + eventsCh <- eventstream.Event{Type: eventstream.UpgradeLineEvent, Data: alreadyPresent.Error()} return } var notFound *cmderrors.PlatformNotFoundError if !errors.As(err, ¬Found) { - eventsCh <- update.Event{ - Type: update.ErrorEvent, + eventsCh <- eventstream.Event{ + Type: eventstream.ErrorEvent, Err: err, Data: "Error upgrading platform", } @@ -246,23 +247,23 @@ func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []st ), ) if err != nil { - eventsCh <- update.Event{ - Type: update.ErrorEvent, + eventsCh <- eventstream.Event{ + Type: eventstream.ErrorEvent, Err: err, Data: "Error installing platform", } return } } else if respCB().GetPlatform() == nil { - eventsCh <- update.Event{ - Type: update.ErrorEvent, + eventsCh <- eventstream.Event{ + Type: eventstream.ErrorEvent, Data: "platform upgrade failed", } return } cbw := orchestrator.NewCallbackWriter(func(line string) { - eventsCh <- update.Event{Type: update.UpgradeLineEvent, Data: line} + eventsCh <- eventstream.Event{Type: eventstream.UpgradeLineEvent, Data: line} }) err := srv.BurnBootloader( @@ -274,8 +275,8 @@ func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []st commands.BurnBootloaderToServerStreams(ctx, cbw, cbw), ) if err != nil { - eventsCh <- update.Event{ - Type: update.ErrorEvent, + eventsCh <- eventstream.Event{ + Type: eventstream.ErrorEvent, Err: err, Data: "Error burning bootloader", } diff --git a/internal/update/update.go b/internal/update/update.go index 7a254478..b7812e33 100644 --- a/internal/update/update.go +++ b/internal/update/update.go @@ -26,6 +26,8 @@ import ( "time" "golang.org/x/sync/errgroup" + + "github.com/arduino/arduino-app-cli/internal/eventstream" ) var ErrOperationAlreadyInProgress = errors.New("an operation is already in progress") @@ -40,16 +42,16 @@ var MatchAllPackages = func(p UpgradablePackage) bool { } type UpgradablePackage struct { - Type PackageType `json:"type"` // e.g., "arduino", "deb" - Name string `json:"name"` // Package name without repository information - Architecture string `json:"-"` - FromVersion string `json:"from_version"` - ToVersion string `json:"to_version"` + Type eventstream.PackageType `json:"type"` // e.g., "arduino", "deb" + Name string `json:"name"` // Package name without repository information + Architecture string `json:"-"` + FromVersion string `json:"from_version"` + ToVersion string `json:"to_version"` } type ServiceUpdater interface { ListUpgradablePackages(ctx context.Context, matcher func(UpgradablePackage) bool) ([]UpgradablePackage, error) - UpgradePackages(ctx context.Context, names []string) (<-chan Event, error) + UpgradePackages(ctx context.Context, names []string) (<-chan eventstream.Event, error) } type Manager struct { @@ -58,14 +60,14 @@ type Manager struct { arduinoPlatformUpdateService ServiceUpdater mu sync.RWMutex - subs map[chan Event]struct{} + subs map[chan eventstream.Event]struct{} } func NewManager(debUpdateService ServiceUpdater, arduinoPlatformUpdateService ServiceUpdater) *Manager { return &Manager{ debUpdateService: debUpdateService, arduinoPlatformUpdateService: arduinoPlatformUpdateService, - subs: make(map[chan Event]struct{}), + subs: make(map[chan eventstream.Event]struct{}), } } @@ -123,9 +125,9 @@ func (m *Manager) UpgradePackages(ctx context.Context, pkgs []UpgradablePackage) var arduinoPlatform []string for _, v := range pkgs { switch v.Type { - case Arduino: + case eventstream.Arduino: arduinoPlatform = append(arduinoPlatform, v.Name) - case Debian: + case eventstream.Debian: debPkgs = append(debPkgs, v.Name) default: return fmt.Errorf("unknown package type %s", v.Type) @@ -142,8 +144,8 @@ func (m *Manager) UpgradePackages(ctx context.Context, pkgs []UpgradablePackage) arduinoEvents, err := m.arduinoPlatformUpdateService.UpgradePackages(ctx, arduinoPlatform) if err != nil { m.broadcast( - Event{ - Type: ErrorEvent, + eventstream.Event{ + Type: eventstream.ErrorEvent, Data: "failed to upgrade Arduino packages", Err: err, }) @@ -156,8 +158,8 @@ func (m *Manager) UpgradePackages(ctx context.Context, pkgs []UpgradablePackage) aptEvents, err := m.debUpdateService.UpgradePackages(ctx, debPkgs) if err != nil { m.broadcast( - Event{ - Type: ErrorEvent, + eventstream.Event{ + Type: eventstream.ErrorEvent, Data: "failed to upgrade APT packages", Err: err, }) @@ -166,14 +168,14 @@ func (m *Manager) UpgradePackages(ctx context.Context, pkgs []UpgradablePackage) for e := range aptEvents { m.broadcast(e) } - m.broadcast(Event{Type: DoneEvent, Data: "Upgrade completed successfully"}) + m.broadcast(eventstream.Event{Type: eventstream.DoneEvent, Data: "Upgrade completed successfully"}) }() return nil } // Subscribe creates a new channel for receiving APT events. -func (b *Manager) Subscribe() chan Event { - eventCh := make(chan Event, 100) +func (b *Manager) Subscribe() chan eventstream.Event { + eventCh := make(chan eventstream.Event, 100) b.mu.Lock() b.subs[eventCh] = struct{}{} b.mu.Unlock() @@ -181,18 +183,18 @@ func (b *Manager) Subscribe() chan Event { } // Unsubscribe removes the channel from the list of subscribers and closes it. -func (b *Manager) Unsubscribe(eventCh chan Event) { +func (b *Manager) Unsubscribe(eventCh chan eventstream.Event) { b.mu.Lock() delete(b.subs, eventCh) close(eventCh) b.mu.Unlock() } -func (b *Manager) broadcast(event Event) { +func (b *Manager) broadcast(event eventstream.Event) { b.mu.RLock() defer b.mu.RUnlock() - if event.Type == ErrorEvent { + if event.Type == eventstream.ErrorEvent { slog.Error("An error occurred", slog.Any("event", event)) } for ch := range b.subs { From f4812ea703ceeaed68ef1032a472421444f1093c Mon Sep 17 00:00:00 2001 From: mirkoCrobu Date: Wed, 12 Nov 2025 17:00:04 +0100 Subject: [PATCH 2/2] add progress events for update process --- internal/api/handlers/update.go | 11 +++-- internal/update/apt/service.go | 6 ++- internal/update/arduino/arduino.go | 67 ++++++++++++++++++++++-------- internal/update/update.go | 22 +++++++++- 4 files changed, 82 insertions(+), 24 deletions(-) diff --git a/internal/api/handlers/update.go b/internal/api/handlers/update.go index 4ffa21ec..236efd81 100644 --- a/internal/api/handlers/update.go +++ b/internal/api/handlers/update.go @@ -128,12 +128,17 @@ func HandleUpdateEvents(updater *update.Manager) http.HandlerFunc { slog.Info("APT event channel closed, stopping SSE stream") return } - if event.Type == eventstream.ErrorEvent { + switch event.Type { + case eventstream.ErrorEvent: sseStream.SendError(render.SSEErrorData{ Code: render.InternalServiceErr, - Message: event.Data, + Message: event.Data}) + case eventstream.ProgressEvent: + sseStream.Send(render.SSEEvent{ + Type: event.Type.String(), + Data: event.Progress, }) - } else { + default: sseStream.Send(render.SSEEvent{ Type: event.Type.String(), Data: event.Data, diff --git a/internal/update/apt/service.go b/internal/update/apt/service.go index b790ad08..6ea725ca 100644 --- a/internal/update/apt/service.go +++ b/internal/update/apt/service.go @@ -103,6 +103,8 @@ func (s *Service) UpgradePackages(ctx context.Context, names []string) (<-chan e eventsCh <- eventstream.Event{Type: eventstream.UpgradeLineEvent, Data: line} } eventsCh <- eventstream.Event{Type: eventstream.StartEvent, Data: "apt cleaning cache is starting"} + eventsCh <- eventstream.Event{Type: eventstream.ProgressEvent, Progress: 80.0} + eventsCh <- eventstream.Event{Type: eventstream.StartEvent, Data: "apt cleaning cache is starting"} for line, err := range runAptCleanCommand(ctx) { if err != nil { eventsCh <- eventstream.Event{ @@ -115,6 +117,7 @@ func (s *Service) UpgradePackages(ctx context.Context, names []string) (<-chan e } eventsCh <- eventstream.Event{Type: eventstream.UpgradeLineEvent, Data: line} } + eventsCh <- eventstream.Event{Type: eventstream.ProgressEvent, Progress: 85.0} // TEMPORARY PATCH: stopping and destroying docker containers and images since IDE does not implement it yet. // TODO: Remove this workaround once IDE implements it. // Tracking issue: https://github.com/arduino/arduino-app-cli/issues/623 @@ -128,7 +131,7 @@ func (s *Service) UpgradePackages(ctx context.Context, names []string) (<-chan e } eventsCh <- eventstream.Event{Type: eventstream.UpgradeLineEvent, Data: line} } - + eventsCh <- eventstream.Event{Type: eventstream.ProgressEvent, Progress: 90.0} // TEMPORARY PATCH: Install the latest docker images and show the logs to the users. // TODO: Remove this workaround once docker image versions are no longer hardcoded in arduino-app-cli. // Tracking issue: https://github.com/arduino/arduino-app-cli/issues/600 @@ -148,6 +151,7 @@ func (s *Service) UpgradePackages(ctx context.Context, names []string) (<-chan e } eventsCh <- eventstream.Event{Type: eventstream.UpgradeLineEvent, Data: line} } + eventsCh <- eventstream.Event{Type: eventstream.ProgressEvent, Progress: 100.0} eventsCh <- eventstream.Event{Type: eventstream.RestartEvent, Data: "Upgrade completed. Restarting ..."} err := restartServices(ctx) diff --git a/internal/update/arduino/arduino.go b/internal/update/arduino/arduino.go index 2618ff7c..20557617 100644 --- a/internal/update/arduino/arduino.go +++ b/internal/update/arduino/arduino.go @@ -132,27 +132,51 @@ func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []st } eventsCh := make(chan eventstream.Event, 100) - downloadProgressCB := func(curr *rpc.DownloadProgress) { - data := helpers.ArduinoCLIDownloadProgressToString(curr) - slog.Debug("Download progress", slog.String("download_progress", data)) - eventsCh <- eventstream.Event{Type: eventstream.UpgradeLineEvent, Data: data} - } - taskProgressCB := func(msg *rpc.TaskProgress) { - data := helpers.ArduinoCLITaskProgressToString(msg) - slog.Debug("Task progress", slog.String("task_progress", data)) - eventsCh <- eventstream.Event{Type: eventstream.UpgradeLineEvent, Data: data} - } - go func() { defer a.lock.Unlock() defer close(eventsCh) - + const indexWeight float32 = 30.0 + const indexBase float32 = 0.0 + const upgradeBase float32 = 30.0 + const upgradeWeight float32 = 60.0 + + makeDownloadProgressCallback := func(basePercentage, phaseWeight float32) func(*rpc.DownloadProgress) { + return func(curr *rpc.DownloadProgress) { + data := helpers.ArduinoCLIDownloadProgressToString(curr) + eventsCh <- eventstream.Event{Type: eventstream.UpgradeLineEvent, Data: data} + if updateInfo := curr.GetUpdate(); updateInfo != nil { + if updateInfo.GetTotalSize() <= 0 { + return + } + localProgress := (float32(updateInfo.GetDownloaded()) / float32(updateInfo.GetTotalSize())) * 100.0 + totalArduinoProgress := basePercentage + (localProgress/100.0)*phaseWeight + eventsCh <- eventstream.Event{ + Type: eventstream.ProgressEvent, + Progress: totalArduinoProgress, + } + } + } + } + makeTaskProgressCallback := func(basePercentage, phaseWeight float32) func(*rpc.TaskProgress) { + return func(msg *rpc.TaskProgress) { + data := helpers.ArduinoCLITaskProgressToString(msg) + eventsCh <- eventstream.Event{Type: eventstream.UpgradeLineEvent, Data: data} + if !msg.GetCompleted() { + localProgress := msg.GetPercent() + totalArduinoProgress := basePercentage + (localProgress/100.0)*phaseWeight + eventsCh <- eventstream.Event{ + Type: eventstream.ProgressEvent, + Progress: totalArduinoProgress, + } + } + } + } ctx, cancel := context.WithTimeout(ctx, 10*time.Minute) defer cancel() eventsCh <- eventstream.Event{Type: eventstream.StartEvent, Data: "Upgrade is starting"} - logrus.SetLevel(logrus.ErrorLevel) // Reduce the log level of arduino-cli + logrus.SetLevel(logrus.ErrorLevel) srv := commands.NewArduinoCoreServer() if err := setConfig(ctx, srv); err != nil { @@ -184,7 +208,8 @@ func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []st }() { - stream, _ := commands.UpdateIndexStreamResponseToCallbackFunction(ctx, downloadProgressCB) + updateIndexProgressCB := makeDownloadProgressCallback(indexBase, indexWeight) + stream, _ := commands.UpdateIndexStreamResponseToCallbackFunction(ctx, updateIndexProgressCB) if err := srv.UpdateIndex(&rpc.UpdateIndexRequest{Instance: inst}, stream); err != nil { eventsCh <- eventstream.Event{ Type: eventstream.ErrorEvent, @@ -193,6 +218,8 @@ func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []st } return } + eventsCh <- eventstream.Event{Type: eventstream.ProgressEvent, Progress: indexBase + indexWeight} + if err := srv.Init(&rpc.InitRequest{Instance: inst}, commands.InitStreamResponseToCallbackFunction(ctx, nil)); err != nil { eventsCh <- eventstream.Event{ Type: eventstream.ErrorEvent, @@ -203,10 +230,13 @@ func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []st } } + platformDownloadCB := makeDownloadProgressCallback(upgradeBase, upgradeWeight) + platformTaskCB := makeTaskProgressCallback(upgradeBase, upgradeWeight) + stream, respCB := commands.PlatformUpgradeStreamResponseToCallbackFunction( ctx, - downloadProgressCB, - taskProgressCB, + platformDownloadCB, + platformTaskCB, ) if err := srv.PlatformUpgrade( &rpc.PlatformUpgradeRequest{ @@ -242,8 +272,8 @@ func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []st }, commands.PlatformInstallStreamResponseToCallbackFunction( ctx, - downloadProgressCB, - taskProgressCB, + platformDownloadCB, + platformTaskCB, ), ) if err != nil { @@ -282,6 +312,7 @@ func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []st } return } + eventsCh <- eventstream.Event{Type: eventstream.ProgressEvent, Progress: 100.0} }() return eventsCh, nil diff --git a/internal/update/update.go b/internal/update/update.go index b7812e33..b1c289ed 100644 --- a/internal/update/update.go +++ b/internal/update/update.go @@ -141,6 +141,12 @@ func (m *Manager) UpgradePackages(ctx context.Context, pkgs []UpgradablePackage) // update of the cores we will end up with inconsistent state, or // we need to re run the upgrade because the orchestrator interrupted // in the middle the upgrade of the cores. + + const arduinoWeight float32 = 20.0 + const aptWeight float32 = 80.0 + + m.broadcast(eventstream.Event{Type: eventstream.ProgressEvent, Progress: 0.0}) + arduinoEvents, err := m.arduinoPlatformUpdateService.UpgradePackages(ctx, arduinoPlatform) if err != nil { m.broadcast( @@ -152,9 +158,15 @@ func (m *Manager) UpgradePackages(ctx context.Context, pkgs []UpgradablePackage) return } for e := range arduinoEvents { - m.broadcast(e) + if e.Type == eventstream.ProgressEvent { + globalProgress := (e.Progress / 100.0) * arduinoWeight + m.broadcast(eventstream.Event{Type: eventstream.ProgressEvent, Progress: globalProgress}) + } else { + m.broadcast(e) + } } + m.broadcast(eventstream.Event{Type: eventstream.ProgressEvent, Progress: arduinoWeight}) aptEvents, err := m.debUpdateService.UpgradePackages(ctx, debPkgs) if err != nil { m.broadcast( @@ -166,8 +178,14 @@ func (m *Manager) UpgradePackages(ctx context.Context, pkgs []UpgradablePackage) return } for e := range aptEvents { - m.broadcast(e) + if e.Type == eventstream.ProgressEvent { + globalProgress := arduinoWeight + (e.Progress/100.0)*aptWeight + m.broadcast(eventstream.Event{Type: eventstream.ProgressEvent, Progress: globalProgress}) + } else { + m.broadcast(e) + } } + m.broadcast(eventstream.Event{Type: eventstream.ProgressEvent, Progress: 100.0}) m.broadcast(eventstream.Event{Type: eventstream.DoneEvent, Data: "Upgrade completed successfully"}) }() return nil