Skip to content

Commit d784575

Browse files
committed
vmdriver(vz): Wait for Start() to complete on server side
Signed-off-by: Ansuman Sahoo <anshumansahoo500@gmail.com>
1 parent 8b72891 commit d784575

File tree

4 files changed

+46
-6
lines changed

4 files changed

+46
-6
lines changed

pkg/driver/external/client/methods.go

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,9 @@ func (d *DriverClient) CreateDisk(ctx context.Context) error {
5858
return nil
5959
}
6060

61+
// Start initiates the driver instance and receives streaming responses. It blocks until
62+
// receiving the initial success response, then spawns a goroutine to consume subsequent
63+
// error messages from the stream. Any errors from the driver are sent to the channel.
6164
func (d *DriverClient) Start(ctx context.Context) (chan error, error) {
6265
d.logger.Debug("Starting driver instance")
6366

@@ -67,19 +70,37 @@ func (d *DriverClient) Start(ctx context.Context) (chan error, error) {
6770
return nil, err
6871
}
6972

73+
// Blocking to receive an initial response to ensure Start() is initiated
74+
// at the server-side.
75+
initialResp, err := stream.Recv()
76+
if err != nil {
77+
d.logger.WithError(err).Error("Error receiving initial response from driver start")
78+
return nil, err
79+
}
80+
if !initialResp.Success {
81+
return nil, errors.New(initialResp.Error)
82+
}
83+
84+
go func() {
85+
<-ctx.Done()
86+
if closeErr := stream.CloseSend(); closeErr != nil {
87+
d.logger.WithError(closeErr).Warn("Failed to close stream")
88+
}
89+
}()
90+
7091
errCh := make(chan error, 1)
7192
go func() {
7293
for {
73-
errorStream, err := stream.Recv()
94+
respStream, err := stream.Recv()
7495
if err != nil {
75-
d.logger.Errorf("Error receiving response from driver: %v", err)
96+
d.logger.Infof("Error receiving response from driver: %v", err)
7697
return
7798
}
78-
d.logger.Debugf("Received response: %v", errorStream)
79-
if !errorStream.Success {
80-
errCh <- errors.New(errorStream.Error)
99+
d.logger.Debugf("Received response: %v", respStream)
100+
if !respStream.Success {
101+
errCh <- errors.New(respStream.Error)
81102
} else {
82-
errCh <- nil
103+
close(errCh)
83104
return
84105
}
85106
}

pkg/driver/external/driver.proto

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,11 @@ message InfoResponse{
4646
bytes info_json = 1;
4747
}
4848

49+
// StartResponse is a streamed response for Start() RPC. It tries to mimic
50+
// errChan from pkg/driver/driver.go. The server sends an initial response
51+
// with success=true when Start() is initiated. If errors occur, they are
52+
// sent as success=false with the error field populated. When the error channel
53+
// closes, a final success=true message is sent.
4954
message StartResponse {
5055
bool success = 1;
5156
string error = 2;

pkg/driver/external/server/methods.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,20 @@ func (s *DriverServer) Start(_ *emptypb.Empty, stream pb.Driver_StartServer) err
2626
errChan, err := s.driver.Start(stream.Context())
2727
if err != nil {
2828
s.logger.Errorf("Start failed: %v", err)
29+
if sendErr := stream.Send(&pb.StartResponse{Success: false, Error: err.Error()}); sendErr != nil {
30+
s.logger.Errorf("Failed to send error response: %v", sendErr)
31+
return status.Errorf(codes.Internal, "failed to send error response: %v", sendErr)
32+
}
2933
return status.Errorf(codes.Internal, "failed to start driver: %v", err)
3034
}
3135

36+
// First send a success response upon receiving the errChan to unblock the client
37+
// and start receiving errors (if any).
38+
if err := stream.Send(&pb.StartResponse{Success: true}); err != nil {
39+
s.logger.Errorf("Failed to send success response: %v", err)
40+
return status.Errorf(codes.Internal, "failed to send success response: %v", err)
41+
}
42+
3243
for {
3344
select {
3445
case err, ok := <-errChan:

pkg/driver/external/server/server.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,9 @@ func handlePreConfiguredDriverAction(ctx context.Context, driver driver.Driver)
207207
}
208208
}
209209

210+
// Start begins the driver startup process. It sends an initial response to unblock
211+
// the client and then streams subsequent errors(if any), as the driver initializes.
212+
// A final success message is streamed upon successful completion.
210213
func Start(extDriver *registry.ExternalDriver, instName string) error {
211214
extDriver.Logger.Debugf("Starting external driver at %s", extDriver.Path)
212215
if instName == "" {

0 commit comments

Comments
 (0)