diff --git a/cmd/arduino-app-cli/system/system.go b/cmd/arduino-app-cli/system/system.go index 54c89128..c23d5317 100644 --- a/cmd/arduino-app-cli/system/system.go +++ b/cmd/arduino-app-cli/system/system.go @@ -113,7 +113,13 @@ func newUpdateCmd() *cobra.Command { events := updater.Subscribe() for event := range events { - feedback.Printf("[%s] %s", event.Type.String(), event.Data) + if event.Type == update.ErrorEvent { + // TODO: add colors to error messages + err := event.GetError() + feedback.Printf("Error: %s [%s]", err.Error(), update.GetUpdateErrorCode(err)) + } else { + feedback.Printf("[%s] %s", event.Type.String(), event.GetData()) + } if event.Type == update.DoneEvent { break diff --git a/internal/api/handlers/update.go b/internal/api/handlers/update.go index 41ac992b..839fdcf5 100644 --- a/internal/api/handlers/update.go +++ b/internal/api/handlers/update.go @@ -16,7 +16,6 @@ package handlers import ( - "errors" "net/http" "strings" @@ -43,14 +42,20 @@ func HandleCheckUpgradable(updater *update.Manager) http.HandlerFunc { pkgs, err := updater.ListUpgradablePackages(r.Context(), filterFunc) if err != nil { - if errors.Is(err, update.ErrOperationAlreadyInProgress) { - render.EncodeResponse(w, http.StatusConflict, models.ErrorResponse{Details: err.Error()}) + code := update.GetUpdateErrorCode(err) + if code == update.OperationInProgressCode { + render.EncodeResponse(w, http.StatusConflict, models.ErrorResponse{ + Code: string(code), + Details: err.Error(), + }) return } - render.EncodeResponse(w, http.StatusBadRequest, models.ErrorResponse{Details: "Error checking for upgradable packages: " + err.Error()}) + render.EncodeResponse(w, http.StatusBadRequest, models.ErrorResponse{ + Code: string(code), + Details: err.Error(), + }) return } - if len(pkgs) == 0 { render.EncodeResponse(w, http.StatusNoContent, nil) return @@ -79,27 +84,40 @@ func HandleUpdateApply(updater *update.Manager) http.HandlerFunc { pkgs, err := updater.ListUpgradablePackages(r.Context(), filterFunc) if err != nil { - if errors.Is(err, update.ErrOperationAlreadyInProgress) { - render.EncodeResponse(w, http.StatusConflict, models.ErrorResponse{Details: err.Error()}) + code := update.GetUpdateErrorCode(err) + if code == update.OperationInProgressCode { + render.EncodeResponse(w, http.StatusConflict, models.ErrorResponse{ + Code: string(code), + Details: err.Error(), + }) return } slog.Error("Unable to get upgradable packages", slog.String("error", err.Error())) - render.EncodeResponse(w, http.StatusInternalServerError, models.ErrorResponse{Details: "Error checking for upgradable packages"}) + render.EncodeResponse(w, http.StatusInternalServerError, models.ErrorResponse{ + Code: string(code), + Details: err.Error(), + }) return } - if len(pkgs) == 0 { - render.EncodeResponse(w, http.StatusNoContent, models.ErrorResponse{Details: "System is up to date, no upgradable packages found"}) + render.EncodeResponse(w, http.StatusNoContent, nil) return } err = updater.UpgradePackages(r.Context(), pkgs) if err != nil { - if errors.Is(err, update.ErrOperationAlreadyInProgress) { - render.EncodeResponse(w, http.StatusConflict, models.ErrorResponse{Details: err.Error()}) + code := update.GetUpdateErrorCode(err) + if code == update.OperationInProgressCode { + render.EncodeResponse(w, http.StatusConflict, models.ErrorResponse{ + Code: string(code), + Details: err.Error(), + }) return } - render.EncodeResponse(w, http.StatusInternalServerError, models.ErrorResponse{Details: "Error upgrading packages"}) + render.EncodeResponse(w, http.StatusInternalServerError, models.ErrorResponse{ + Code: string(code), + Details: err.Error(), + }) return } @@ -128,14 +146,19 @@ func HandleUpdateEvents(updater *update.Manager) http.HandlerFunc { return } if event.Type == update.ErrorEvent { + err := event.GetError() + code := render.InternalServiceErr + if c := update.GetUpdateErrorCode(err); c != update.UnknownErrorCode { + code = render.SSEErrCode(string(c)) + } sseStream.SendError(render.SSEErrorData{ - Code: render.InternalServiceErr, - Message: event.Data, + Code: code, + Message: err.Error(), }) } else { sseStream.Send(render.SSEEvent{ Type: event.Type.String(), - Data: event.Data, + Data: event.GetData(), }) } diff --git a/internal/api/models/errors.go b/internal/api/models/errors.go index e87aafe1..1bbe3ec5 100644 --- a/internal/api/models/errors.go +++ b/internal/api/models/errors.go @@ -16,5 +16,6 @@ package models type ErrorResponse struct { + Code string `json:"code,omitempty"` Details string `json:"details"` } diff --git a/internal/update/apt/service.go b/internal/update/apt/service.go index f3d3984e..3a6f7288 100644 --- a/internal/update/apt/service.go +++ b/internal/update/apt/service.go @@ -25,7 +25,6 @@ import ( "regexp" "strings" "sync" - "time" "github.com/arduino/go-paths-helper" "go.bug.st/f" @@ -74,94 +73,78 @@ func (s *Service) ListUpgradablePackages(ctx context.Context, matcher func(updat // UpgradePackages upgrades the specified packages using the `apt-get upgrade` command. // It publishes events to subscribers during the upgrade process. // It returns an error if the upgrade is already in progress or if the upgrade command fails. -func (s *Service) UpgradePackages(ctx context.Context, names []string) (<-chan update.Event, error) { +func (s *Service) UpgradePackages(ctx context.Context, names []string) (iter.Seq[update.Event], error) { if !s.lock.TryLock() { return nil, update.ErrOperationAlreadyInProgress } - eventsCh := make(chan update.Event, 100) - go func() { + return func(yield func(update.Event) bool) { defer s.lock.Unlock() - defer close(eventsCh) - ctx, cancel := context.WithTimeout(ctx, 10*time.Minute) - defer cancel() - - eventsCh <- update.Event{Type: update.StartEvent, Data: "Upgrade is starting"} - stream := runUpgradeCommand(ctx, names) - for line, err := range stream { + if !yield(update.NewDataEvent(update.StartEvent, "Upgrade is starting")) { + return + } + for line, err := range runUpgradeCommand(ctx, names) { if err != nil { - eventsCh <- update.Event{ - Type: update.ErrorEvent, - Err: err, - Data: "Error running upgrade command", - } - slog.Error("error processing upgrade command output", "error", err) + _ = yield(update.NewErrorEvent(fmt.Errorf("error running upgrade command: %w", err))) + return + } + if !yield(update.NewDataEvent(update.UpgradeLineEvent, line)) { return } - eventsCh <- update.Event{Type: update.UpgradeLineEvent, Data: line} } - eventsCh <- update.Event{Type: update.StartEvent, Data: "apt cleaning cache is starting"} + + if !yield(update.NewDataEvent(update.StartEvent, "apt cleaning cache is starting")) { + return + } for line, err := range runAptCleanCommand(ctx) { if err != nil { - eventsCh <- update.Event{ - Type: update.ErrorEvent, - Err: err, - Data: "Error running apt clean command", - } - slog.Error("error processing apt clean command output", "error", err) + _ = yield(update.NewErrorEvent(fmt.Errorf("error running apt clean command: %w", err))) return } - eventsCh <- update.Event{Type: update.UpgradeLineEvent, Data: line} + if !yield(update.NewDataEvent(update.UpgradeLineEvent, line)) { + return + } + } + + if !yield(update.NewDataEvent(update.UpgradeLineEvent, "Stop and destroy docker containers and images ....")) { + return } - // TEMPORARY PATCH: stopping and destroying docker containers and images since IDE does not implement it yet. - // TODO: Remove this workaround once IDE implements it. - // Tracking issue: https://github.com/arduino/arduino-app-cli/issues/623 - eventsCh <- update.Event{Type: update.UpgradeLineEvent, Data: "Stop and destroy docker containers and images ..."} - streamCleanup := cleanupDockerContainers(ctx) - for line, err := range streamCleanup { + for line, err := range cleanupDockerContainers(ctx) { if err != nil { // TODO: maybe we should retun an error or a better feedback to the user? // currently, we just log the error and continue considenring not blocking - slog.Error("Error stopping and destroying docker containers", "error", err) + slog.Warn("Error stopping and destroying docker containers", "error", err) + } else if !yield(update.NewDataEvent(update.UpgradeLineEvent, line)) { + return } - eventsCh <- update.Event{Type: update.UpgradeLineEvent, Data: line} } - // TEMPORARY PATCH: Install the latest docker images and show the logs to the users. // TODO: Remove this workaround once docker image versions are no longer hardcoded in arduino-app-cli. // Tracking issue: https://github.com/arduino/arduino-app-cli/issues/600 // Currently, we need to launch `arduino-app-cli system init` to pull the latest docker images because // the version of the docker images are hardcoded in the (new downloaded) version of the arduino-app-cli. - eventsCh <- update.Event{Type: update.UpgradeLineEvent, Data: "Pulling the latest docker images ..."} - streamDocker := pullDockerImages(ctx) - for line, err := range streamDocker { + if !yield(update.NewDataEvent(update.UpgradeLineEvent, "Pulling the latest docker images ...")) { + return + } + for line, err := range pullDockerImages(ctx) { if err != nil { - eventsCh <- update.Event{ - Type: update.ErrorEvent, - Err: err, - Data: "Error upgrading docker images", - } - slog.Error("error upgrading docker images", "error", err) + _ = yield(update.NewErrorEvent(fmt.Errorf("error pulling docker images: %w", err))) return } - eventsCh <- update.Event{Type: update.UpgradeLineEvent, Data: line} - } - eventsCh <- update.Event{Type: update.RestartEvent, Data: "Upgrade completed. Restarting ..."} - - err := restartServices(ctx) - if err != nil { - eventsCh <- update.Event{ - Type: update.ErrorEvent, - Err: err, - Data: "Error restart services after upgrade", + if !yield(update.NewDataEvent(update.UpgradeLineEvent, line)) { + return } - slog.Error("failed to restart services", "error", err) + } + if !yield(update.NewDataEvent(update.RestartEvent, "Upgrade completed. Restarting ...")) { return } - }() - return eventsCh, nil + if err := restartServices(ctx); err != nil { + _ = yield(update.NewErrorEvent(fmt.Errorf("error restarting services after upgrade: %w", err))) + return + } + }, nil } // runDpkgConfigureCommand is need in case an upgrade was interrupted in the middle diff --git a/internal/update/arduino/arduino.go b/internal/update/arduino/arduino.go index 01076dff..c3f2d699 100644 --- a/internal/update/arduino/arduino.go +++ b/internal/update/arduino/arduino.go @@ -18,9 +18,10 @@ package arduino import ( "context" "errors" + "fmt" + "iter" "log/slog" "sync" - "time" "github.com/arduino/arduino-cli/commands" "github.com/arduino/arduino-cli/commands/cmderrors" @@ -125,51 +126,42 @@ func (a *ArduinoPlatformUpdater) ListUpgradablePackages(ctx context.Context, _ f } // UpgradePackages implements ServiceUpdater. -func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []string) (<-chan update.Event, error) { +func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []string) (iter.Seq[update.Event], error) { if !a.lock.TryLock() { return nil, update.ErrOperationAlreadyInProgress } - eventsCh := make(chan update.Event, 100) - downloadProgressCB := func(curr *rpc.DownloadProgress) { - data := helpers.ArduinoCLIDownloadProgressToString(curr) - slog.Debug("Download progress", slog.String("download_progress", data)) - eventsCh <- update.Event{Type: update.UpgradeLineEvent, Data: data} - } - taskProgressCB := func(msg *rpc.TaskProgress) { - data := helpers.ArduinoCLITaskProgressToString(msg) - slog.Debug("Task progress", slog.String("task_progress", data)) - eventsCh <- update.Event{Type: update.UpgradeLineEvent, Data: data} - } - - go func() { + return func(yield func(update.Event) bool) { defer a.lock.Unlock() - defer close(eventsCh) - ctx, cancel := context.WithTimeout(ctx, 10*time.Minute) - defer cancel() + downloadProgressCB := func(curr *rpc.DownloadProgress) { + data := helpers.ArduinoCLIDownloadProgressToString(curr) + slog.Debug("Download progress", slog.String("download_progress", data)) + // TODO: add termination + _ = yield(update.NewDataEvent(update.UpgradeLineEvent, data)) + } + taskProgressCB := func(msg *rpc.TaskProgress) { + data := helpers.ArduinoCLITaskProgressToString(msg) + slog.Debug("Task progress", slog.String("task_progress", data)) + // TODO: add termination + _ = yield(update.NewDataEvent(update.UpgradeLineEvent, data)) + } - eventsCh <- update.Event{Type: update.StartEvent, Data: "Upgrade is starting"} + if !yield(update.NewDataEvent(update.StartEvent, "Upgrade is starting")) { + return + } logrus.SetLevel(logrus.ErrorLevel) // Reduce the log level of arduino-cli srv := commands.NewArduinoCoreServer() if err := setConfig(ctx, srv); err != nil { - eventsCh <- update.Event{ - Type: update.ErrorEvent, - Err: err, - Data: "Error setting additional URLs", - } + _ = yield(update.NewErrorEvent(fmt.Errorf("error setting config: %w", err))) return } var inst *rpc.Instance if resp, err := srv.Create(ctx, &rpc.CreateRequest{}); err != nil { - eventsCh <- update.Event{ - Type: update.ErrorEvent, - Err: err, - Data: "Error creating Arduino instance", - } + _ = yield(update.NewErrorEvent(fmt.Errorf("error creating arduino-cli instance: %w", err))) return } else { inst = resp.GetInstance() @@ -185,19 +177,11 @@ func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []st { stream, _ := commands.UpdateIndexStreamResponseToCallbackFunction(ctx, downloadProgressCB) if err := srv.UpdateIndex(&rpc.UpdateIndexRequest{Instance: inst}, stream); err != nil { - eventsCh <- update.Event{ - Type: update.ErrorEvent, - Err: err, - Data: "Error updating index", - } + _ = yield(update.NewErrorEvent(fmt.Errorf("error updating index: %w", err))) return } if err := srv.Init(&rpc.InitRequest{Instance: inst}, commands.InitStreamResponseToCallbackFunction(ctx, nil)); err != nil { - eventsCh <- update.Event{ - Type: update.ErrorEvent, - Err: err, - Data: "Error initializing Arduino instance", - } + _ = yield(update.NewErrorEvent(fmt.Errorf("error initializing instance: %w", err))) return } } @@ -219,17 +203,13 @@ func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []st ); err != nil { var alreadyPresent *cmderrors.PlatformAlreadyAtTheLatestVersionError if errors.As(err, &alreadyPresent) { - eventsCh <- update.Event{Type: update.UpgradeLineEvent, Data: alreadyPresent.Error()} + _ = yield(update.NewDataEvent(update.UpgradeLineEvent, alreadyPresent.Error())) return } var notFound *cmderrors.PlatformNotFoundError if !errors.As(err, ¬Found) { - eventsCh <- update.Event{ - Type: update.ErrorEvent, - Err: err, - Data: "Error upgrading platform", - } + _ = yield(update.NewErrorEvent(fmt.Errorf("error upgrading platform: %w", err))) return } // If the platform is not found, we will try to install it @@ -246,23 +226,17 @@ func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []st ), ) if err != nil { - eventsCh <- update.Event{ - Type: update.ErrorEvent, - Err: err, - Data: "Error installing platform", - } + _ = yield(update.NewErrorEvent(fmt.Errorf("error installing platform: %w", err))) return } } else if respCB().GetPlatform() == nil { - eventsCh <- update.Event{ - Type: update.ErrorEvent, - Data: "platform upgrade failed", - } + _ = yield(update.NewErrorEvent(fmt.Errorf("platform upgrade failed"))) return } cbw := orchestrator.NewCallbackWriter(func(line string) { - eventsCh <- update.Event{Type: update.UpgradeLineEvent, Data: line} + // TODO: add termination + _ = yield(update.NewDataEvent(update.UpgradeLineEvent, line)) }) err := srv.BurnBootloader( @@ -274,14 +248,8 @@ func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []st commands.BurnBootloaderToServerStreams(ctx, cbw, cbw), ) if err != nil { - eventsCh <- update.Event{ - Type: update.ErrorEvent, - Err: err, - Data: "Error burning bootloader", - } + _ = yield(update.NewErrorEvent(fmt.Errorf("error burning bootloader: %w", err))) return } - }() - - return eventsCh, nil + }, nil } diff --git a/internal/update/errors.go b/internal/update/errors.go new file mode 100644 index 00000000..9f65a63c --- /dev/null +++ b/internal/update/errors.go @@ -0,0 +1,55 @@ +package update + +import "errors" + +type ErrorCode string + +// TODO: add the error to the openAPI spec as an enum +const ( + NoInternetConnectionCode ErrorCode = "NO_INTERNET_CONNECTION" + OperationInProgressCode ErrorCode = "OPERATION_IN_PROGRESS" + UnknownErrorCode ErrorCode = "UNKNOWN_ERROR" +) + +var ( + ErrOperationAlreadyInProgress = &UpdateError{ + Code: OperationInProgressCode, + Details: "an operation is already in progress", + } + ErrNoInternetConnection = &UpdateError{ + Code: NoInternetConnectionCode, + Details: "no internet connection available", + } +) + +type UpdateError struct { + Code ErrorCode `json:"code"` + Details string `json:"details"` + + err error +} + +func (e *UpdateError) Error() string { + return e.Details +} + +func (e *UpdateError) Unwrap() error { + return e.err +} + +func NewUnkownError(err error) *UpdateError { + return &UpdateError{ + Details: err.Error(), + err: err, + } +} + +func GetUpdateErrorCode(err error) ErrorCode { + var updateError *UpdateError + if errors.As(err, &updateError) { + if updateError.Code != "" { + return updateError.Code + } + } + return UnknownErrorCode +} diff --git a/internal/update/errors_test.go b/internal/update/errors_test.go new file mode 100644 index 00000000..6b3a8bca --- /dev/null +++ b/internal/update/errors_test.go @@ -0,0 +1,32 @@ +package update + +import ( + "errors" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestUpdateError(t *testing.T) { + t.Run("known error", func(t *testing.T) { + var err error = &UpdateError{ + Code: NoInternetConnectionCode, + Details: "no internet connection available", + } + assert.Equal(t, "no internet connection available", err.Error()) + assert.Equal(t, "no internet connection available", fmt.Sprintf("%s", err)) + assert.Equal(t, NoInternetConnectionCode, GetUpdateErrorCode(err)) + }) + + t.Run("unknown error", func(t *testing.T) { + var underlyingErr = errors.New("underlying error") + var updateErr error = NewUnkownError(underlyingErr) + + assert.Equal(t, "underlying error", updateErr.Error()) + assert.Equal(t, "underlying error", fmt.Sprintf("%s", updateErr)) + assert.Equal(t, underlyingErr, errors.Unwrap(updateErr)) + assert.True(t, errors.Is(updateErr, underlyingErr)) + assert.Equal(t, UnknownErrorCode, GetUpdateErrorCode(updateErr)) + }) +} diff --git a/internal/update/event.go b/internal/update/event.go index 0f6c1a51..2aac04ee 100644 --- a/internal/update/event.go +++ b/internal/update/event.go @@ -15,6 +15,8 @@ package update +import "go.bug.st/f" + // EventType defines the type of upgrade event. type EventType int @@ -29,8 +31,9 @@ const ( // Event represents a single event in the upgrade process. type Event struct { Type EventType - Data string - Err error // Optional error field for error events + + data string + err error // error field for error events } func (t EventType) String() string { @@ -50,6 +53,30 @@ func (t EventType) String() string { } } +func NewDataEvent(t EventType, data string) Event { + return Event{ + Type: t, + data: data, + } +} + +func NewErrorEvent(err error) Event { + return Event{ + Type: ErrorEvent, + err: err, + } +} + +func (e Event) GetData() string { + f.Assert(e.Type != ErrorEvent, "not a data event") + return e.data +} + +func (e Event) GetError() error { + f.Assert(e.Type == ErrorEvent, "not an error event") + return e.err +} + type PackageType string const ( diff --git a/internal/update/update.go b/internal/update/update.go index 7a254478..feffe7a1 100644 --- a/internal/update/update.go +++ b/internal/update/update.go @@ -17,8 +17,8 @@ package update import ( "context" - "errors" "fmt" + "iter" "log/slog" "net/http" "strings" @@ -28,8 +28,6 @@ import ( "golang.org/x/sync/errgroup" ) -var ErrOperationAlreadyInProgress = errors.New("an operation is already in progress") - var MatchArduinoPackage = func(p UpgradablePackage) bool { return strings.HasPrefix(p.Name, "arduino-") || (p.Name == "adbd" && strings.Contains(p.ToVersion, "arduino")) // NOTE: changing this check could remove the adbd package, breaking the device access. @@ -49,7 +47,7 @@ type UpgradablePackage struct { type ServiceUpdater interface { ListUpgradablePackages(ctx context.Context, matcher func(UpgradablePackage) bool) ([]UpgradablePackage, error) - UpgradePackages(ctx context.Context, names []string) (<-chan Event, error) + UpgradePackages(ctx context.Context, names []string) (iter.Seq[Event], error) } type Manager struct { @@ -78,7 +76,7 @@ func (m *Manager) ListUpgradablePackages(ctx context.Context, matcher func(Upgra // Make sure to be connected to the internet, before checking for updates. // This is needed because the checks below work also when offline (using cached data). if !isConnected() { - return nil, errors.New("no internet connectivity") + return nil, ErrNoInternetConnection } // Get the list of upgradable packages from two sources (deb and platform) in parallel. @@ -141,12 +139,7 @@ func (m *Manager) UpgradePackages(ctx context.Context, pkgs []UpgradablePackage) // in the middle the upgrade of the cores. arduinoEvents, err := m.arduinoPlatformUpdateService.UpgradePackages(ctx, arduinoPlatform) if err != nil { - m.broadcast( - Event{ - Type: ErrorEvent, - Data: "failed to upgrade Arduino packages", - Err: err, - }) + m.broadcast(NewErrorEvent(fmt.Errorf("failed to upgrade Arduino packages: %w", err))) return } for e := range arduinoEvents { @@ -155,18 +148,14 @@ func (m *Manager) UpgradePackages(ctx context.Context, pkgs []UpgradablePackage) aptEvents, err := m.debUpdateService.UpgradePackages(ctx, debPkgs) if err != nil { - m.broadcast( - Event{ - Type: ErrorEvent, - Data: "failed to upgrade APT packages", - Err: err, - }) + m.broadcast(NewErrorEvent(fmt.Errorf("failed to upgrade APT packages: %w", err))) return } for e := range aptEvents { m.broadcast(e) } - m.broadcast(Event{Type: DoneEvent, Data: "Upgrade completed successfully"}) + + m.broadcast(NewDataEvent(DoneEvent, "Update completed")) }() return nil } @@ -175,17 +164,17 @@ func (m *Manager) UpgradePackages(ctx context.Context, pkgs []UpgradablePackage) func (b *Manager) Subscribe() chan Event { eventCh := make(chan Event, 100) b.mu.Lock() + defer b.mu.Unlock() b.subs[eventCh] = struct{}{} - b.mu.Unlock() return eventCh } // Unsubscribe removes the channel from the list of subscribers and closes it. func (b *Manager) Unsubscribe(eventCh chan Event) { b.mu.Lock() + defer b.mu.Unlock() delete(b.subs, eventCh) close(eventCh) - b.mu.Unlock() } func (b *Manager) broadcast(event Event) { @@ -201,8 +190,7 @@ func (b *Manager) broadcast(event Event) { default: slog.Warn("Discarding event (channel full)", slog.String("type", event.Type.String()), - slog.String("data", fmt.Sprintf("%v", event.Data)), - slog.Any("error", event.Err), + slog.Any("event", event), ) } }