Skip to content

Commit 6b20319

Browse files
committed
refactor(update): use iterators instead channels
1 parent ec13ed1 commit 6b20319

File tree

3 files changed

+72
-59
lines changed

3 files changed

+72
-59
lines changed

internal/update/apt/service.go

Lines changed: 37 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -73,70 +73,78 @@ func (s *Service) ListUpgradablePackages(ctx context.Context, matcher func(updat
7373
// UpgradePackages upgrades the specified packages using the `apt-get upgrade` command.
7474
// It publishes events to subscribers during the upgrade process.
7575
// It returns an error if the upgrade is already in progress or if the upgrade command fails.
76-
func (s *Service) UpgradePackages(ctx context.Context, names []string) (<-chan update.Event, error) {
76+
func (s *Service) UpgradePackages(ctx context.Context, names []string) (iter.Seq[update.Event], error) {
7777
if !s.lock.TryLock() {
7878
return nil, update.ErrOperationAlreadyInProgress
7979
}
80-
eventsCh := make(chan update.Event, 100)
8180

82-
go func() {
81+
return func(yield func(update.Event) bool) {
8382
defer s.lock.Unlock()
84-
defer close(eventsCh)
8583

86-
eventsCh <- update.NewDataEvent(update.StartEvent, "Upgrade is starting")
87-
stream := runUpgradeCommand(ctx, names)
88-
for line, err := range stream {
84+
if !yield(update.NewDataEvent(update.StartEvent, "Upgrade is starting")) {
85+
return
86+
}
87+
for line, err := range runUpgradeCommand(ctx, names) {
8988
if err != nil {
90-
eventsCh <- update.NewErrorEvent(fmt.Errorf("error running upgrade command: %w", err))
89+
_ = yield(update.NewErrorEvent(fmt.Errorf("error running upgrade command: %w", err)))
90+
return
91+
}
92+
if !yield(update.NewDataEvent(update.UpgradeLineEvent, line)) {
9193
return
9294
}
93-
eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, line)
9495
}
9596

96-
eventsCh <- update.NewDataEvent(update.StartEvent, "apt cleaning cache is starting")
97+
if !yield(update.NewDataEvent(update.StartEvent, "apt cleaning cache is starting")) {
98+
return
99+
}
97100
for line, err := range runAptCleanCommand(ctx) {
98101
if err != nil {
99-
eventsCh <- update.NewErrorEvent(fmt.Errorf("error running apt clean command: %w", err))
102+
_ = yield(update.NewErrorEvent(fmt.Errorf("error running apt clean command: %w", err)))
103+
return
104+
}
105+
if !yield(update.NewDataEvent(update.UpgradeLineEvent, line)) {
100106
return
101107
}
102-
eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, line)
103108
}
104109

105-
eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, "Stop and destroy docker containers and images ....")
106-
streamCleanup := cleanupDockerContainers(ctx)
107-
for line, err := range streamCleanup {
110+
if !yield(update.NewDataEvent(update.UpgradeLineEvent, "Stop and destroy docker containers and images ....")) {
111+
return
112+
}
113+
for line, err := range cleanupDockerContainers(ctx) {
108114
if err != nil {
109115
// TODO: maybe we should retun an error or a better feedback to the user?
110116
// currently, we just log the error and continue considenring not blocking
111117
slog.Warn("Error stopping and destroying docker containers", "error", err)
112-
} else {
113-
eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, line)
118+
} else if !yield(update.NewDataEvent(update.UpgradeLineEvent, line)) {
119+
return
114120
}
115121
}
116122

117123
// TODO: Remove this workaround once docker image versions are no longer hardcoded in arduino-app-cli.
118124
// Tracking issue: https://github.com/arduino/arduino-app-cli/issues/600
119125
// Currently, we need to launch `arduino-app-cli system init` to pull the latest docker images because
120126
// the version of the docker images are hardcoded in the (new downloaded) version of the arduino-app-cli.
121-
eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, "Pulling the latest docker images ...")
122-
streamDocker := pullDockerImages(ctx)
123-
for line, err := range streamDocker {
127+
if !yield(update.NewDataEvent(update.UpgradeLineEvent, "Pulling the latest docker images ...")) {
128+
return
129+
}
130+
for line, err := range pullDockerImages(ctx) {
124131
if err != nil {
125-
eventsCh <- update.NewErrorEvent(fmt.Errorf("error pulling docker images: %w", err))
132+
_ = yield(update.NewErrorEvent(fmt.Errorf("error pulling docker images: %w", err)))
133+
return
134+
}
135+
if !yield(update.NewDataEvent(update.UpgradeLineEvent, line)) {
126136
return
127137
}
128-
eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, line)
129138
}
130-
eventsCh <- update.NewDataEvent(update.RestartEvent, "Upgrade completed. Restarting ...")
131-
132-
err := restartServices(ctx)
133-
if err != nil {
134-
eventsCh <- update.NewErrorEvent(fmt.Errorf("error restarting services after upgrade: %w", err))
139+
if !yield(update.NewDataEvent(update.RestartEvent, "Upgrade completed. Restarting ...")) {
135140
return
136141
}
137-
}()
138142

139-
return eventsCh, nil
143+
if err := restartServices(ctx); err != nil {
144+
_ = yield(update.NewErrorEvent(fmt.Errorf("error restarting services after upgrade: %w", err)))
145+
return
146+
}
147+
}, nil
140148
}
141149

142150
// runDpkgConfigureCommand is need in case an upgrade was interrupted in the middle

internal/update/arduino/arduino.go

Lines changed: 33 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"context"
2020
"errors"
2121
"fmt"
22+
"iter"
2223
"log/slog"
2324
"sync"
2425

@@ -125,40 +126,42 @@ func (a *ArduinoPlatformUpdater) ListUpgradablePackages(ctx context.Context, _ f
125126
}
126127

127128
// UpgradePackages implements ServiceUpdater.
128-
func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []string) (<-chan update.Event, error) {
129+
func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []string) (iter.Seq[update.Event], error) {
129130
if !a.lock.TryLock() {
130131
return nil, update.ErrOperationAlreadyInProgress
131132
}
132-
eventsCh := make(chan update.Event, 100)
133133

134-
downloadProgressCB := func(curr *rpc.DownloadProgress) {
135-
data := helpers.ArduinoCLIDownloadProgressToString(curr)
136-
slog.Debug("Download progress", slog.String("download_progress", data))
137-
eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, data)
138-
}
139-
taskProgressCB := func(msg *rpc.TaskProgress) {
140-
data := helpers.ArduinoCLITaskProgressToString(msg)
141-
slog.Debug("Task progress", slog.String("task_progress", data))
142-
eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, data)
143-
}
144-
145-
go func() {
134+
return func(yield func(update.Event) bool) {
146135
defer a.lock.Unlock()
147-
defer close(eventsCh)
148136

149-
eventsCh <- update.NewDataEvent(update.StartEvent, "Upgrade is starting")
137+
downloadProgressCB := func(curr *rpc.DownloadProgress) {
138+
data := helpers.ArduinoCLIDownloadProgressToString(curr)
139+
slog.Debug("Download progress", slog.String("download_progress", data))
140+
// TODO: add termination
141+
_ = yield(update.NewDataEvent(update.UpgradeLineEvent, data))
142+
}
143+
taskProgressCB := func(msg *rpc.TaskProgress) {
144+
data := helpers.ArduinoCLITaskProgressToString(msg)
145+
slog.Debug("Task progress", slog.String("task_progress", data))
146+
// TODO: add termination
147+
_ = yield(update.NewDataEvent(update.UpgradeLineEvent, data))
148+
}
149+
150+
if !yield(update.NewDataEvent(update.StartEvent, "Upgrade is starting")) {
151+
return
152+
}
150153

151154
logrus.SetLevel(logrus.ErrorLevel) // Reduce the log level of arduino-cli
152155
srv := commands.NewArduinoCoreServer()
153156

154157
if err := setConfig(ctx, srv); err != nil {
155-
eventsCh <- update.NewErrorEvent(fmt.Errorf("error setting config: %w", err))
158+
_ = yield(update.NewErrorEvent(fmt.Errorf("error setting config: %w", err)))
156159
return
157160
}
158161

159162
var inst *rpc.Instance
160163
if resp, err := srv.Create(ctx, &rpc.CreateRequest{}); err != nil {
161-
eventsCh <- update.NewErrorEvent(fmt.Errorf("error creating arduino-cli instance: %w", err))
164+
_ = yield(update.NewErrorEvent(fmt.Errorf("error creating arduino-cli instance: %w", err)))
162165
return
163166
} else {
164167
inst = resp.GetInstance()
@@ -174,11 +177,11 @@ func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []st
174177
{
175178
stream, _ := commands.UpdateIndexStreamResponseToCallbackFunction(ctx, downloadProgressCB)
176179
if err := srv.UpdateIndex(&rpc.UpdateIndexRequest{Instance: inst}, stream); err != nil {
177-
eventsCh <- update.NewErrorEvent(fmt.Errorf("error updating index: %w", err))
180+
_ = yield(update.NewErrorEvent(fmt.Errorf("error updating index: %w", err)))
178181
return
179182
}
180183
if err := srv.Init(&rpc.InitRequest{Instance: inst}, commands.InitStreamResponseToCallbackFunction(ctx, nil)); err != nil {
181-
eventsCh <- update.NewErrorEvent(fmt.Errorf("error initializing instance: %w", err))
184+
_ = yield(update.NewErrorEvent(fmt.Errorf("error initializing instance: %w", err)))
182185
return
183186
}
184187
}
@@ -200,13 +203,13 @@ func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []st
200203
); err != nil {
201204
var alreadyPresent *cmderrors.PlatformAlreadyAtTheLatestVersionError
202205
if errors.As(err, &alreadyPresent) {
203-
eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, alreadyPresent.Error())
206+
_ = yield(update.NewDataEvent(update.UpgradeLineEvent, alreadyPresent.Error()))
204207
return
205208
}
206209

207210
var notFound *cmderrors.PlatformNotFoundError
208211
if !errors.As(err, &notFound) {
209-
eventsCh <- update.NewErrorEvent(fmt.Errorf("error upgrading platform: %w", err))
212+
_ = yield(update.NewErrorEvent(fmt.Errorf("error upgrading platform: %w", err)))
210213
return
211214
}
212215
// If the platform is not found, we will try to install it
@@ -223,16 +226,19 @@ func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []st
223226
),
224227
)
225228
if err != nil {
226-
eventsCh <- update.NewErrorEvent(fmt.Errorf("error installing platform: %w", err))
229+
_ = yield(update.NewErrorEvent(fmt.Errorf("error installing platform: %w", err)))
227230
return
228231
}
229232
} else if respCB().GetPlatform() == nil {
230-
eventsCh <- update.NewErrorEvent(fmt.Errorf("platform upgrade failed"))
233+
_ = yield(update.NewErrorEvent(fmt.Errorf("platform upgrade failed")))
231234
return
232235
}
233236

234237
cbw := orchestrator.NewCallbackWriter(func(line string) {
235-
eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, line)
238+
// TODO: add termination
239+
if !yield(update.NewDataEvent(update.UpgradeLineEvent, line)) {
240+
return
241+
}
236242
})
237243

238244
err := srv.BurnBootloader(
@@ -244,10 +250,8 @@ func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []st
244250
commands.BurnBootloaderToServerStreams(ctx, cbw, cbw),
245251
)
246252
if err != nil {
247-
eventsCh <- update.NewErrorEvent(fmt.Errorf("error burning bootloader: %w", err))
253+
_ = yield(update.NewErrorEvent(fmt.Errorf("error burning bootloader: %w", err)))
248254
return
249255
}
250-
}()
251-
252-
return eventsCh, nil
256+
}, nil
253257
}

internal/update/update.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package update
1818
import (
1919
"context"
2020
"fmt"
21+
"iter"
2122
"log/slog"
2223
"net/http"
2324
"strings"
@@ -46,7 +47,7 @@ type UpgradablePackage struct {
4647

4748
type ServiceUpdater interface {
4849
ListUpgradablePackages(ctx context.Context, matcher func(UpgradablePackage) bool) ([]UpgradablePackage, error)
49-
UpgradePackages(ctx context.Context, names []string) (<-chan Event, error)
50+
UpgradePackages(ctx context.Context, names []string) (iter.Seq[Event], error)
5051
}
5152

5253
type Manager struct {

0 commit comments

Comments
 (0)