Skip to content

Commit f5ce75f

Browse files
committed
api: replaced Future.done with a sync.Cond
This commit reduces allocations. Future.done allocation replaced with - Future.cond (*sync.Cond) - Future.finished (atomic.Bool) Other code use Future.isDone() instead (Future.done == nil) check. Added Future.finish() marks Future as done. Future.WaitChan() now creates channel on demand. Closes #496
1 parent 713316b commit f5ce75f

File tree

8 files changed

+292
-544
lines changed

8 files changed

+292
-544
lines changed

CHANGELOG.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
1212

1313
* New types for MessagePack extensions compatible with go-option (#459).
1414
* Added `box.MustNew` wrapper for `box.New` without an error (#448).
15+
* Added Future.cond (sync.Cond) and Future.finished bool. Added Future.finish() marks Future as done (#496).
1516

1617
### Changed
1718

@@ -23,7 +24,9 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
2324
* Removed deprecated `box.session.push()` support: Future.AppendPush()
2425
and Future.GetIterator() methods, ResponseIterator and TimeoutResponseIterator types,
2526
Future.pushes[], Future.ready (#480, #497).
26-
* `LogAppendPushFailed` replaced with `LogBoxSessionPushUnsupported` (#480)
27+
* `LogAppendPushFailed` replaced with `LogBoxSessionPushUnsupported` (#480).
28+
* Removed deprecated `Connection` methods, related interfaces and tests are updated (#479).
29+
* Future.done replaced with Future.cond (*sync.Cond) + Future.finished bool (#496).
2730

2831
### Fixed
2932

MIGRATION.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,24 @@ TODO
1414
* Removed deprecated `pool` methods, related interfaces and tests are updated.
1515
* Removed `box.session.push()` support: Future.AppendPush() and Future.GetIterator()
1616
methods, ResponseIterator and TimeoutResponseIterator types.
17+
* Removed deprecated `Connection` methods, related interfaces and tests are updated.
18+
19+
*NOTE*: due to Future.GetTyped() doesn't decode SelectRequest into structure, substitute Connection.GetTyped() following the example:
20+
```Go
21+
var singleTpl = Tuple{}
22+
err = conn.GetTyped(space, index, key, &singleTpl)
23+
```
24+
At now became:
25+
```Go
26+
var tpl []Tuple
27+
err = conn.Do(NewSelectRequest(space).
28+
Index(index).
29+
Limit(1).
30+
Key(key)
31+
).GetTyped(&tpl)
32+
singleTpl := tpl[0]
33+
```
34+
* Future.done replaced with Future.cond (*sync.Cond) + Future.finished bool.
1735

1836
## Migration from v1.x.x to v2.x.x
1937

boxerror_test.go

Lines changed: 30 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,9 @@ func TestErrorTypeEval(t *testing.T) {
304304

305305
for name, testcase := range tupleCases {
306306
t.Run(name, func(t *testing.T) {
307-
data, err := conn.Eval("return ...", []interface{}{&testcase.tuple.val})
307+
data, err := conn.Do(NewEvalRequest("return ...").
308+
Args([]interface{}{&testcase.tuple.val}),
309+
).Get()
308310
require.Nil(t, err)
309311
require.NotNil(t, data)
310312
require.Equal(t, len(data), 1)
@@ -324,7 +326,9 @@ func TestErrorTypeEvalTyped(t *testing.T) {
324326
for name, testcase := range tupleCases {
325327
t.Run(name, func(t *testing.T) {
326328
var res []BoxError
327-
err := conn.EvalTyped("return ...", []interface{}{&testcase.tuple.val}, &res)
329+
err := conn.Do(NewEvalRequest("return ...").
330+
Args([]interface{}{&testcase.tuple.val}),
331+
).GetTyped(&res)
328332
require.Nil(t, err)
329333
require.NotNil(t, res)
330334
require.Equal(t, len(res), 1)
@@ -340,12 +344,12 @@ func TestErrorTypeInsert(t *testing.T) {
340344
defer conn.Close()
341345

342346
truncateEval := fmt.Sprintf("box.space[%q]:truncate()", space)
343-
_, err := conn.Eval(truncateEval, []interface{}{})
347+
_, err := conn.Do(NewEvalRequest(truncateEval).Args([]interface{}{})).Get()
344348
require.Nil(t, err)
345349

346350
for name, testcase := range tupleCases {
347351
t.Run(name, func(t *testing.T) {
348-
_, err = conn.Insert(space, &testcase.tuple)
352+
_, err = conn.Do(NewInsertRequest(space).Tuple(&testcase.tuple)).Get()
349353
require.Nil(t, err)
350354

351355
checkEval := fmt.Sprintf(`
@@ -365,7 +369,7 @@ func TestErrorTypeInsert(t *testing.T) {
365369
// of connector BoxError are equal to the Tarantool ones
366370
// since they may differ between different Tarantool versions
367371
// and editions.
368-
_, err := conn.Eval(checkEval, []interface{}{})
372+
_, err := conn.Do(NewEvalRequest(checkEval).Args([]interface{}{})).Get()
369373
require.Nilf(t, err, "Tuple has been successfully inserted")
370374
})
371375
}
@@ -378,13 +382,13 @@ func TestErrorTypeInsertTyped(t *testing.T) {
378382
defer conn.Close()
379383

380384
truncateEval := fmt.Sprintf("box.space[%q]:truncate()", space)
381-
_, err := conn.Eval(truncateEval, []interface{}{})
385+
_, err := conn.Do(NewEvalRequest(truncateEval).Args([]interface{}{})).Get()
382386
require.Nil(t, err)
383387

384388
for name, testcase := range tupleCases {
385389
t.Run(name, func(t *testing.T) {
386390
var res []TupleBoxError
387-
err = conn.InsertTyped(space, &testcase.tuple, &res)
391+
err = conn.Do(NewInsertRequest(space).Tuple(&testcase.tuple)).GetTyped(&res)
388392
require.Nil(t, err)
389393
require.NotNil(t, res)
390394
require.Equal(t, len(res), 1)
@@ -407,7 +411,7 @@ func TestErrorTypeInsertTyped(t *testing.T) {
407411
// of connector BoxError are equal to the Tarantool ones
408412
// since they may differ between different Tarantool versions
409413
// and editions.
410-
_, err := conn.Eval(checkEval, []interface{}{})
414+
_, err := conn.Do(NewEvalRequest(checkEval).Args([]interface{}{})).Get()
411415
require.Nilf(t, err, "Tuple has been successfully inserted")
412416
})
413417
}
@@ -420,7 +424,7 @@ func TestErrorTypeSelect(t *testing.T) {
420424
defer conn.Close()
421425

422426
truncateEval := fmt.Sprintf("box.space[%q]:truncate()", space)
423-
_, err := conn.Eval(truncateEval, []interface{}{})
427+
_, err := conn.Do(NewEvalRequest(truncateEval).Args([]interface{}{})).Get()
424428
require.Nil(t, err)
425429

426430
for name, testcase := range tupleCases {
@@ -433,13 +437,18 @@ func TestErrorTypeSelect(t *testing.T) {
433437
assert(tuple ~= nil)
434438
`, testcase.ttObj, space, testcase.tuple.pk)
435439

436-
_, err := conn.Eval(insertEval, []interface{}{})
440+
_, err := conn.Do(NewEvalRequest(insertEval).Args([]interface{}{})).Get()
437441
require.Nilf(t, err, "Tuple has been successfully inserted")
438442

439443
var offset uint32 = 0
440444
var limit uint32 = 1
441-
data, err := conn.Select(space, index, offset, limit, IterEq,
442-
[]interface{}{testcase.tuple.pk})
445+
data, err := conn.Do(NewSelectRequest(space).
446+
Index(index).
447+
Offset(offset).
448+
Limit(limit).
449+
Iterator(IterEq).
450+
Key([]interface{}{testcase.tuple.pk}),
451+
).Get()
443452
require.Nil(t, err)
444453
require.NotNil(t, data)
445454
require.Equalf(t, len(data), 1, "Exactly one tuple had been found")
@@ -464,7 +473,7 @@ func TestErrorTypeSelectTyped(t *testing.T) {
464473
defer conn.Close()
465474

466475
truncateEval := fmt.Sprintf("box.space[%q]:truncate()", space)
467-
_, err := conn.Eval(truncateEval, []interface{}{})
476+
_, err := conn.Do(NewEvalRequest(truncateEval).Args([]interface{}{})).Get()
468477
require.Nil(t, err)
469478

470479
for name, testcase := range tupleCases {
@@ -477,14 +486,19 @@ func TestErrorTypeSelectTyped(t *testing.T) {
477486
assert(tuple ~= nil)
478487
`, testcase.ttObj, space, testcase.tuple.pk)
479488

480-
_, err := conn.Eval(insertEval, []interface{}{})
489+
_, err := conn.Do(NewEvalRequest(insertEval).Args([]interface{}{})).Get()
481490
require.Nilf(t, err, "Tuple has been successfully inserted")
482491

483492
var offset uint32 = 0
484493
var limit uint32 = 1
485494
var resp []TupleBoxError
486-
err = conn.SelectTyped(space, index, offset, limit, IterEq,
487-
[]interface{}{testcase.tuple.pk}, &resp)
495+
err = conn.Do(NewSelectRequest(space).
496+
Index(index).
497+
Offset(offset).
498+
Limit(limit).
499+
Iterator(IterEq).
500+
Key([]interface{}{testcase.tuple.pk}),
501+
).GetTyped(&resp)
488502
require.Nil(t, err)
489503
require.NotNil(t, resp)
490504
require.Equalf(t, len(resp), 1, "Exactly one tuple had been found")

connection.go

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -739,7 +739,7 @@ func (conn *Connection) pinger() {
739739
return
740740
case <-t.C:
741741
}
742-
conn.Ping()
742+
conn.Do(NewPingRequest())
743743
}
744744
}
745745

@@ -934,7 +934,7 @@ func (conn *Connection) newFuture(req Request) (fut *Future) {
934934
ErrRateLimited,
935935
"Request is rate limited on client",
936936
}
937-
fut.done = nil
937+
fut.finish()
938938
return
939939
}
940940
}
@@ -948,23 +948,23 @@ func (conn *Connection) newFuture(req Request) (fut *Future) {
948948
ErrConnectionClosed,
949949
"using closed connection",
950950
}
951-
fut.done = nil
951+
fut.finish()
952952
shard.rmut.Unlock()
953953
return
954954
case connDisconnected:
955955
fut.err = ClientError{
956956
ErrConnectionNotReady,
957957
"client connection is not ready",
958958
}
959-
fut.done = nil
959+
fut.finish()
960960
shard.rmut.Unlock()
961961
return
962962
case connShutdown:
963963
fut.err = ClientError{
964964
ErrConnectionShutdown,
965965
"server shutdown in progress",
966966
}
967-
fut.done = nil
967+
fut.finish()
968968
shard.rmut.Unlock()
969969
return
970970
}
@@ -993,9 +993,10 @@ func (conn *Connection) newFuture(req Request) (fut *Future) {
993993
runtime.Gosched()
994994
select {
995995
case conn.rlimit <- struct{}{}:
996-
case <-fut.done:
996+
default:
997+
<-fut.WaitChan()
997998
if fut.err == nil {
998-
panic("fut.done is closed, but err is nil")
999+
panic("future WaitChan is closed, but err is nil")
9991000
}
10001001
}
10011002
}
@@ -1007,17 +1008,16 @@ func (conn *Connection) newFuture(req Request) (fut *Future) {
10071008
// is "done" before the response is come.
10081009
func (conn *Connection) contextWatchdog(fut *Future, ctx context.Context) {
10091010
select {
1010-
case <-fut.done:
1011+
case <-fut.WaitChan():
10111012
case <-ctx.Done():
10121013
}
10131014

1014-
select {
1015-
case <-fut.done:
1015+
if fut.isDone() {
10161016
return
1017-
default:
1018-
conn.cancelFuture(fut, fmt.Errorf("context is done (request ID %d): %w",
1019-
fut.requestId, context.Cause(ctx)))
10201017
}
1018+
1019+
conn.cancelFuture(fut, fmt.Errorf("context is done (request ID %d): %w",
1020+
fut.requestId, context.Cause(ctx)))
10211021
}
10221022

10231023
func (conn *Connection) incrementRequestCnt() {
@@ -1034,7 +1034,7 @@ func (conn *Connection) send(req Request, streamId uint64) *Future {
10341034
conn.incrementRequestCnt()
10351035

10361036
fut := conn.newFuture(req)
1037-
if fut.done == nil {
1037+
if fut.isDone() {
10381038
conn.decrementRequestCnt()
10391039
return fut
10401040
}
@@ -1057,12 +1057,12 @@ func (conn *Connection) putFuture(fut *Future, req Request, streamId uint64) {
10571057
shardn := fut.requestId & (conn.opts.Concurrency - 1)
10581058
shard := &conn.shard[shardn]
10591059
shard.bufmut.Lock()
1060-
select {
1061-
case <-fut.done:
1060+
1061+
if fut.isDone() {
10621062
shard.bufmut.Unlock()
10631063
return
1064-
default:
10651064
}
1065+
10661066
firstWritten := shard.buf.Len() == 0
10671067
if shard.buf.Cap() == 0 {
10681068
shard.buf.b = make([]byte, 0, 128)

future.go

Lines changed: 45 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package tarantool
33
import (
44
"io"
55
"sync"
6+
"sync/atomic"
67
"time"
78
)
89

@@ -15,32 +16,43 @@ type Future struct {
1516
mutex sync.Mutex
1617
resp Response
1718
err error
19+
cond *sync.Cond
20+
finished atomic.Bool
1821
done chan struct{}
1922
}
2023

2124
func (fut *Future) wait() {
22-
if fut.done == nil {
23-
return
25+
fut.mutex.Lock()
26+
defer fut.mutex.Unlock()
27+
28+
for !fut.isDone() {
29+
fut.cond.Wait()
2430
}
25-
<-fut.done
2631
}
2732

2833
func (fut *Future) isDone() bool {
29-
if fut.done == nil {
30-
return true
31-
}
32-
select {
33-
case <-fut.done:
34-
return true
35-
default:
36-
return false
34+
return fut.finished.Load()
35+
}
36+
37+
func (fut *Future) finish() {
38+
fut.mutex.Lock()
39+
defer fut.mutex.Unlock()
40+
41+
fut.finished.Store(true)
42+
43+
if fut.done != nil {
44+
close(fut.done)
3745
}
46+
47+
fut.cond.Broadcast()
3848
}
3949

4050
// NewFuture creates a new empty Future for a given Request.
4151
func NewFuture(req Request) (fut *Future) {
4252
fut = &Future{}
43-
fut.done = make(chan struct{})
53+
fut.done = nil
54+
fut.finished.Store(false)
55+
fut.cond = sync.NewCond(&fut.mutex)
4456
fut.req = req
4557
return fut
4658
}
@@ -60,7 +72,13 @@ func (fut *Future) SetResponse(header Header, body io.Reader) error {
6072
}
6173
fut.resp = resp
6274

63-
close(fut.done)
75+
fut.finished.Store(true)
76+
77+
if fut.done != nil {
78+
close(fut.done)
79+
}
80+
81+
fut.cond.Broadcast()
6482
return nil
6583
}
6684

@@ -74,7 +92,13 @@ func (fut *Future) SetError(err error) {
7492
}
7593
fut.err = err
7694

77-
close(fut.done)
95+
fut.finished.Store(true)
96+
97+
if fut.done != nil {
98+
close(fut.done)
99+
}
100+
101+
fut.cond.Broadcast()
78102
}
79103

80104
// GetResponse waits for Future to be filled and returns Response and error.
@@ -122,8 +146,14 @@ func init() {
122146

123147
// WaitChan returns channel which becomes closed when response arrived or error occurred.
124148
func (fut *Future) WaitChan() <-chan struct{} {
125-
if fut.done == nil {
149+
fut.mutex.Lock()
150+
defer fut.mutex.Unlock()
151+
152+
if fut.isDone() {
126153
return closedChan
127154
}
155+
if fut.done == nil {
156+
fut.done = make(chan struct{})
157+
}
128158
return fut.done
129159
}

0 commit comments

Comments
 (0)