Skip to content

Commit 082e720

Browse files
authored
Fix(test): resolve data race in StreamedRequest (#1727)
Replaces the boolean flag and timer goroutine with a channel and select statement to prevent a data race on the timeout in integration tests. This ensures safe concurrent access when handling test timeouts.
1 parent 042d529 commit 082e720

File tree

1 file changed

+32
-18
lines changed

1 file changed

+32
-18
lines changed

test/integration/util.go

Lines changed: 32 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -54,35 +54,49 @@ func SendRequest(t *testing.T, client extProcPb.ExternalProcessor_ProcessClient,
5454
return res, err
5555
}
5656

57-
func StreamedRequest(t *testing.T, client extProcPb.ExternalProcessor_ProcessClient, requests []*extProcPb.ProcessingRequest, expectedResponses int) ([]*extProcPb.ProcessingResponse, error) {
57+
// StreamedRequest sends a series of requests and collects the specified number of responses.
58+
func StreamedRequest(
59+
t *testing.T,
60+
client extProcPb.ExternalProcessor_ProcessClient,
61+
requests []*extProcPb.ProcessingRequest,
62+
expectedResponses int,
63+
) ([]*extProcPb.ProcessingResponse, error) {
5864
for _, req := range requests {
5965
t.Logf("Sending request: %v", req)
6066
if err := client.Send(req); err != nil {
6167
t.Logf("Failed to send request %+v: %v", req, err)
6268
return nil, err
6369
}
6470
}
71+
6572
responses := []*extProcPb.ProcessingResponse{}
73+
for i := range expectedResponses {
74+
type recvResult struct {
75+
res *extProcPb.ProcessingResponse
76+
err error
77+
}
78+
recvChan := make(chan recvResult, 1)
6679

67-
// Make an incredible simple timeout func in the case where
68-
// there is less than the expected amount of responses; bail and fail.
69-
var simpleTimeout bool
70-
go func() {
71-
time.Sleep(10 * time.Second)
72-
simpleTimeout = true
73-
}()
80+
go func() {
81+
res, err := client.Recv()
82+
recvChan <- recvResult{res, err}
83+
}()
7484

75-
for range expectedResponses {
76-
if simpleTimeout {
77-
break
78-
}
79-
res, err := client.Recv()
80-
if err != nil && err != io.EOF {
81-
t.Logf("Failed to receive: %v", err)
82-
return nil, err
85+
select {
86+
case <-time.After(10 * time.Second):
87+
t.Logf("Timeout waiting for response %d of %d", i+1, expectedResponses)
88+
return responses, nil
89+
case result := <-recvChan:
90+
if result.err != nil {
91+
if result.err == io.EOF {
92+
return responses, nil
93+
}
94+
t.Logf("Failed to receive: %v", result.err)
95+
return nil, result.err
96+
}
97+
t.Logf("Received response %+v", result.res)
98+
responses = append(responses, result.res)
8399
}
84-
t.Logf("Received response %+v", res)
85-
responses = append(responses, res)
86100
}
87101
return responses, nil
88102
}

0 commit comments

Comments
 (0)