Skip to content

Commit e994e25

Browse files
author
James Cor
committed
resplit
1 parent e551a02 commit e994e25

File tree

1 file changed

+30
-8
lines changed

1 file changed

+30
-8
lines changed

server/handler.go

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -803,7 +803,34 @@ func (h *Handler) resultForDefaultIter2(ctx *sql.Context, c *mysql.Conn, iter sq
803803
defer timer.Stop()
804804

805805
wg := sync.WaitGroup{}
806-
wg.Add(1)
806+
wg.Add(2)
807+
808+
// Read rows from iter and send them off
809+
var rowChan = make(chan sql.Row2, 512)
810+
eg.Go(func() (err error) {
811+
defer pan2err(&err)
812+
defer wg.Done()
813+
defer close(rowChan)
814+
for {
815+
select {
816+
case <-ctx.Done():
817+
return context.Cause(ctx)
818+
default:
819+
row, err := iter.Next2(ctx)
820+
if err == io.EOF {
821+
return nil
822+
}
823+
if err != nil {
824+
return err
825+
}
826+
select {
827+
case rowChan <- row:
828+
case <-ctx.Done():
829+
return nil
830+
}
831+
}
832+
}
833+
})
807834

808835
var res *sqltypes.Result
809836
var processedAtLeastOneBatch bool
@@ -831,20 +858,15 @@ func (h *Handler) resultForDefaultIter2(ctx *sql.Context, c *mysql.Conn, iter sq
831858
case <-ctx.Done():
832859
return context.Cause(ctx)
833860
case <-timer.C:
834-
// TODO: timer should probably go in its own thread, as rowChan is blocking
835861
if h.readTimeout != 0 {
836862
// Cancel and return so Vitess can call the CloseConnection callback
837863
ctx.GetLogger().Tracef("connection timeout")
838864
return ErrRowTimeout.New()
839865
}
840-
default:
841-
row, err := iter.Next2(ctx)
842-
if err == io.EOF {
866+
case row, ok := <-rowChan:
867+
if !ok {
843868
return nil
844869
}
845-
if err != nil {
846-
return err
847-
}
848870
ctx.GetLogger().Tracef("spooling result row %s", row)
849871
res.Rows = append(res.Rows, row)
850872
res.RowsAffected++

0 commit comments

Comments
 (0)