Skip to content

Commit f4812ea

Browse files
committed
add progress events for update process
1 parent a0fa24d commit f4812ea

File tree

4 files changed

+82
-24
lines changed

4 files changed

+82
-24
lines changed

internal/api/handlers/update.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -128,12 +128,17 @@ func HandleUpdateEvents(updater *update.Manager) http.HandlerFunc {
128128
slog.Info("APT event channel closed, stopping SSE stream")
129129
return
130130
}
131-
if event.Type == eventstream.ErrorEvent {
131+
switch event.Type {
132+
case eventstream.ErrorEvent:
132133
sseStream.SendError(render.SSEErrorData{
133134
Code: render.InternalServiceErr,
134-
Message: event.Data,
135+
Message: event.Data})
136+
case eventstream.ProgressEvent:
137+
sseStream.Send(render.SSEEvent{
138+
Type: event.Type.String(),
139+
Data: event.Progress,
135140
})
136-
} else {
141+
default:
137142
sseStream.Send(render.SSEEvent{
138143
Type: event.Type.String(),
139144
Data: event.Data,

internal/update/apt/service.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,8 @@ func (s *Service) UpgradePackages(ctx context.Context, names []string) (<-chan e
103103
eventsCh <- eventstream.Event{Type: eventstream.UpgradeLineEvent, Data: line}
104104
}
105105
eventsCh <- eventstream.Event{Type: eventstream.StartEvent, Data: "apt cleaning cache is starting"}
106+
eventsCh <- eventstream.Event{Type: eventstream.ProgressEvent, Progress: 80.0}
107+
eventsCh <- eventstream.Event{Type: eventstream.StartEvent, Data: "apt cleaning cache is starting"}
106108
for line, err := range runAptCleanCommand(ctx) {
107109
if err != nil {
108110
eventsCh <- eventstream.Event{
@@ -115,6 +117,7 @@ func (s *Service) UpgradePackages(ctx context.Context, names []string) (<-chan e
115117
}
116118
eventsCh <- eventstream.Event{Type: eventstream.UpgradeLineEvent, Data: line}
117119
}
120+
eventsCh <- eventstream.Event{Type: eventstream.ProgressEvent, Progress: 85.0}
118121
// TEMPORARY PATCH: stopping and destroying docker containers and images since IDE does not implement it yet.
119122
// TODO: Remove this workaround once IDE implements it.
120123
// Tracking issue: https://github.com/arduino/arduino-app-cli/issues/623
@@ -128,7 +131,7 @@ func (s *Service) UpgradePackages(ctx context.Context, names []string) (<-chan e
128131
}
129132
eventsCh <- eventstream.Event{Type: eventstream.UpgradeLineEvent, Data: line}
130133
}
131-
134+
eventsCh <- eventstream.Event{Type: eventstream.ProgressEvent, Progress: 90.0}
132135
// TEMPORARY PATCH: Install the latest docker images and show the logs to the users.
133136
// TODO: Remove this workaround once docker image versions are no longer hardcoded in arduino-app-cli.
134137
// Tracking issue: https://github.com/arduino/arduino-app-cli/issues/600
@@ -148,6 +151,7 @@ func (s *Service) UpgradePackages(ctx context.Context, names []string) (<-chan e
148151
}
149152
eventsCh <- eventstream.Event{Type: eventstream.UpgradeLineEvent, Data: line}
150153
}
154+
eventsCh <- eventstream.Event{Type: eventstream.ProgressEvent, Progress: 100.0}
151155
eventsCh <- eventstream.Event{Type: eventstream.RestartEvent, Data: "Upgrade completed. Restarting ..."}
152156

153157
err := restartServices(ctx)

internal/update/arduino/arduino.go

Lines changed: 49 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -132,27 +132,51 @@ func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []st
132132
}
133133
eventsCh := make(chan eventstream.Event, 100)
134134

135-
downloadProgressCB := func(curr *rpc.DownloadProgress) {
136-
data := helpers.ArduinoCLIDownloadProgressToString(curr)
137-
slog.Debug("Download progress", slog.String("download_progress", data))
138-
eventsCh <- eventstream.Event{Type: eventstream.UpgradeLineEvent, Data: data}
139-
}
140-
taskProgressCB := func(msg *rpc.TaskProgress) {
141-
data := helpers.ArduinoCLITaskProgressToString(msg)
142-
slog.Debug("Task progress", slog.String("task_progress", data))
143-
eventsCh <- eventstream.Event{Type: eventstream.UpgradeLineEvent, Data: data}
144-
}
145-
146135
go func() {
147136
defer a.lock.Unlock()
148137
defer close(eventsCh)
149-
138+
const indexWeight float32 = 30.0
139+
const indexBase float32 = 0.0
140+
const upgradeBase float32 = 30.0
141+
const upgradeWeight float32 = 60.0
142+
143+
makeDownloadProgressCallback := func(basePercentage, phaseWeight float32) func(*rpc.DownloadProgress) {
144+
return func(curr *rpc.DownloadProgress) {
145+
data := helpers.ArduinoCLIDownloadProgressToString(curr)
146+
eventsCh <- eventstream.Event{Type: eventstream.UpgradeLineEvent, Data: data}
147+
if updateInfo := curr.GetUpdate(); updateInfo != nil {
148+
if updateInfo.GetTotalSize() <= 0 {
149+
return
150+
}
151+
localProgress := (float32(updateInfo.GetDownloaded()) / float32(updateInfo.GetTotalSize())) * 100.0
152+
totalArduinoProgress := basePercentage + (localProgress/100.0)*phaseWeight
153+
eventsCh <- eventstream.Event{
154+
Type: eventstream.ProgressEvent,
155+
Progress: totalArduinoProgress,
156+
}
157+
}
158+
}
159+
}
160+
makeTaskProgressCallback := func(basePercentage, phaseWeight float32) func(*rpc.TaskProgress) {
161+
return func(msg *rpc.TaskProgress) {
162+
data := helpers.ArduinoCLITaskProgressToString(msg)
163+
eventsCh <- eventstream.Event{Type: eventstream.UpgradeLineEvent, Data: data}
164+
if !msg.GetCompleted() {
165+
localProgress := msg.GetPercent()
166+
totalArduinoProgress := basePercentage + (localProgress/100.0)*phaseWeight
167+
eventsCh <- eventstream.Event{
168+
Type: eventstream.ProgressEvent,
169+
Progress: totalArduinoProgress,
170+
}
171+
}
172+
}
173+
}
150174
ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
151175
defer cancel()
152176

153177
eventsCh <- eventstream.Event{Type: eventstream.StartEvent, Data: "Upgrade is starting"}
154178

155-
logrus.SetLevel(logrus.ErrorLevel) // Reduce the log level of arduino-cli
179+
logrus.SetLevel(logrus.ErrorLevel)
156180
srv := commands.NewArduinoCoreServer()
157181

158182
if err := setConfig(ctx, srv); err != nil {
@@ -184,7 +208,8 @@ func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []st
184208
}()
185209

186210
{
187-
stream, _ := commands.UpdateIndexStreamResponseToCallbackFunction(ctx, downloadProgressCB)
211+
updateIndexProgressCB := makeDownloadProgressCallback(indexBase, indexWeight)
212+
stream, _ := commands.UpdateIndexStreamResponseToCallbackFunction(ctx, updateIndexProgressCB)
188213
if err := srv.UpdateIndex(&rpc.UpdateIndexRequest{Instance: inst}, stream); err != nil {
189214
eventsCh <- eventstream.Event{
190215
Type: eventstream.ErrorEvent,
@@ -193,6 +218,8 @@ func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []st
193218
}
194219
return
195220
}
221+
eventsCh <- eventstream.Event{Type: eventstream.ProgressEvent, Progress: indexBase + indexWeight}
222+
196223
if err := srv.Init(&rpc.InitRequest{Instance: inst}, commands.InitStreamResponseToCallbackFunction(ctx, nil)); err != nil {
197224
eventsCh <- eventstream.Event{
198225
Type: eventstream.ErrorEvent,
@@ -203,10 +230,13 @@ func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []st
203230
}
204231
}
205232

233+
platformDownloadCB := makeDownloadProgressCallback(upgradeBase, upgradeWeight)
234+
platformTaskCB := makeTaskProgressCallback(upgradeBase, upgradeWeight)
235+
206236
stream, respCB := commands.PlatformUpgradeStreamResponseToCallbackFunction(
207237
ctx,
208-
downloadProgressCB,
209-
taskProgressCB,
238+
platformDownloadCB,
239+
platformTaskCB,
210240
)
211241
if err := srv.PlatformUpgrade(
212242
&rpc.PlatformUpgradeRequest{
@@ -242,8 +272,8 @@ func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []st
242272
},
243273
commands.PlatformInstallStreamResponseToCallbackFunction(
244274
ctx,
245-
downloadProgressCB,
246-
taskProgressCB,
275+
platformDownloadCB,
276+
platformTaskCB,
247277
),
248278
)
249279
if err != nil {
@@ -282,6 +312,7 @@ func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []st
282312
}
283313
return
284314
}
315+
eventsCh <- eventstream.Event{Type: eventstream.ProgressEvent, Progress: 100.0}
285316
}()
286317

287318
return eventsCh, nil

internal/update/update.go

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,12 @@ func (m *Manager) UpgradePackages(ctx context.Context, pkgs []UpgradablePackage)
141141
// update of the cores we will end up with inconsistent state, or
142142
// we need to re run the upgrade because the orchestrator interrupted
143143
// in the middle the upgrade of the cores.
144+
145+
const arduinoWeight float32 = 20.0
146+
const aptWeight float32 = 80.0
147+
148+
m.broadcast(eventstream.Event{Type: eventstream.ProgressEvent, Progress: 0.0})
149+
144150
arduinoEvents, err := m.arduinoPlatformUpdateService.UpgradePackages(ctx, arduinoPlatform)
145151
if err != nil {
146152
m.broadcast(
@@ -152,9 +158,15 @@ func (m *Manager) UpgradePackages(ctx context.Context, pkgs []UpgradablePackage)
152158
return
153159
}
154160
for e := range arduinoEvents {
155-
m.broadcast(e)
161+
if e.Type == eventstream.ProgressEvent {
162+
globalProgress := (e.Progress / 100.0) * arduinoWeight
163+
m.broadcast(eventstream.Event{Type: eventstream.ProgressEvent, Progress: globalProgress})
164+
} else {
165+
m.broadcast(e)
166+
}
156167
}
157168

169+
m.broadcast(eventstream.Event{Type: eventstream.ProgressEvent, Progress: arduinoWeight})
158170
aptEvents, err := m.debUpdateService.UpgradePackages(ctx, debPkgs)
159171
if err != nil {
160172
m.broadcast(
@@ -166,8 +178,14 @@ func (m *Manager) UpgradePackages(ctx context.Context, pkgs []UpgradablePackage)
166178
return
167179
}
168180
for e := range aptEvents {
169-
m.broadcast(e)
181+
if e.Type == eventstream.ProgressEvent {
182+
globalProgress := arduinoWeight + (e.Progress/100.0)*aptWeight
183+
m.broadcast(eventstream.Event{Type: eventstream.ProgressEvent, Progress: globalProgress})
184+
} else {
185+
m.broadcast(e)
186+
}
170187
}
188+
m.broadcast(eventstream.Event{Type: eventstream.ProgressEvent, Progress: 100.0})
171189
m.broadcast(eventstream.Event{Type: eventstream.DoneEvent, Data: "Upgrade completed successfully"})
172190
}()
173191
return nil

0 commit comments

Comments
 (0)