Skip to content

Commit 11e317e

Browse files
committed
api: replaced Future.done with a sync.Cond
This commit reduces allocations. Future.done allocation replaced with - Future.cond (*sync.Cond) - Future.finished (atomic.Bool) Other code use Future.isDone() instead (Future.done == nil) check. Added Future.finish() marks Future as done. Future.WaitChan() now creates channel on demand. Closes #496
1 parent 4231d9b commit 11e317e

File tree

4 files changed

+57
-32
lines changed

4 files changed

+57
-32
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
1212

1313
* New types for MessagePack extensions compatible with go-option (#459).
1414
* Added `box.MustNew` wrapper for `box.New` without an error (#448).
15+
* Added Future.cond (sync.Cond) and Future.finished flag. Added Future.finish() marks Future as done.
1516

1617
### Changed
1718

@@ -25,6 +26,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
2526
Future.pushes[], Future.ready (#480, #497).
2627
* `LogAppendPushFailed` replaced with `LogBoxSessionPushUnsupported` (#480)
2728
* Removed deprecated `Connection` methods, related interfaces and tests are updated (#479)
29+
* Future.done replaced with Future.cond (*sync.Cond) + Future.finished flag.
2830

2931
### Fixed
3032

MIGRATION.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ TODO
1515
* Removed `box.session.push()` support: Future.AppendPush() and Future.GetIterator()
1616
methods, ResponseIterator and TimeoutResponseIterator types.
1717
* Removed deprecated `Connection` methods, related interfaces and tests are updated.
18+
1819
*NOTE*: due to Future.GetTyped() doesn't decode SelectRequest into structure, substitute Connection.GetTyped() following the example:
1920
```Go
2021
var singleTpl = Tuple{}
@@ -30,6 +31,7 @@ TODO
3031
).GetTyped(&tpl)
3132
singleTpl := tpl[0]
3233
```
34+
* Future.done replaced with Future.cond (*sync.Cond) + Future.finished flag.
3335

3436
## Migration from v1.x.x to v2.x.x
3537

connection.go

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -934,7 +934,8 @@ func (conn *Connection) newFuture(req Request) (fut *Future) {
934934
ErrRateLimited,
935935
"Request is rate limited on client",
936936
}
937-
fut.done = nil
937+
938+
fut.finish()
938939
return
939940
}
940941
}
@@ -948,23 +949,23 @@ func (conn *Connection) newFuture(req Request) (fut *Future) {
948949
ErrConnectionClosed,
949950
"using closed connection",
950951
}
951-
fut.done = nil
952+
fut.finish()
952953
shard.rmut.Unlock()
953954
return
954955
case connDisconnected:
955956
fut.err = ClientError{
956957
ErrConnectionNotReady,
957958
"client connection is not ready",
958959
}
959-
fut.done = nil
960+
fut.finish()
960961
shard.rmut.Unlock()
961962
return
962963
case connShutdown:
963964
fut.err = ClientError{
964965
ErrConnectionShutdown,
965966
"server shutdown in progress",
966967
}
967-
fut.done = nil
968+
fut.finish()
968969
shard.rmut.Unlock()
969970
return
970971
}
@@ -993,9 +994,10 @@ func (conn *Connection) newFuture(req Request) (fut *Future) {
993994
runtime.Gosched()
994995
select {
995996
case conn.rlimit <- struct{}{}:
996-
case <-fut.done:
997+
default:
998+
<-fut.WaitChan()
997999
if fut.err == nil {
998-
panic("fut.done is closed, but err is nil")
1000+
panic("future is done, but err is nil")
9991001
}
10001002
}
10011003
}
@@ -1007,17 +1009,16 @@ func (conn *Connection) newFuture(req Request) (fut *Future) {
10071009
// is "done" before the response is come.
10081010
func (conn *Connection) contextWatchdog(fut *Future, ctx context.Context) {
10091011
select {
1010-
case <-fut.done:
1012+
case <-fut.WaitChan():
10111013
case <-ctx.Done():
10121014
}
10131015

1014-
select {
1015-
case <-fut.done:
1016+
if fut.isDone() {
10161017
return
1017-
default:
1018-
conn.cancelFuture(fut, fmt.Errorf("context is done (request ID %d): %w",
1019-
fut.requestId, context.Cause(ctx)))
10201018
}
1019+
1020+
conn.cancelFuture(fut, fmt.Errorf("context is done (request ID %d): %w",
1021+
fut.requestId, context.Cause(ctx)))
10211022
}
10221023

10231024
func (conn *Connection) incrementRequestCnt() {
@@ -1034,7 +1035,7 @@ func (conn *Connection) send(req Request, streamId uint64) *Future {
10341035
conn.incrementRequestCnt()
10351036

10361037
fut := conn.newFuture(req)
1037-
if fut.done == nil {
1038+
if fut.isDone() {
10381039
conn.decrementRequestCnt()
10391040
return fut
10401041
}
@@ -1057,12 +1058,12 @@ func (conn *Connection) putFuture(fut *Future, req Request, streamId uint64) {
10571058
shardn := fut.requestId & (conn.opts.Concurrency - 1)
10581059
shard := &conn.shard[shardn]
10591060
shard.bufmut.Lock()
1060-
select {
1061-
case <-fut.done:
1061+
1062+
if fut.isDone() {
10621063
shard.bufmut.Unlock()
10631064
return
1064-
default:
10651065
}
1066+
10661067
firstWritten := shard.buf.Len() == 0
10671068
if shard.buf.Cap() == 0 {
10681069
shard.buf.b = make([]byte, 0, 128)

future.go

Lines changed: 36 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package tarantool
33
import (
44
"io"
55
"sync"
6+
"sync/atomic"
67
"time"
78
)
89

@@ -15,32 +16,38 @@ type Future struct {
1516
mutex sync.Mutex
1617
resp Response
1718
err error
19+
cond *sync.Cond
20+
finished atomic.Bool
1821
done chan struct{}
1922
}
2023

2124
func (fut *Future) wait() {
22-
if fut.done == nil {
23-
return
25+
fut.mutex.Lock()
26+
defer fut.mutex.Unlock()
27+
28+
for !fut.isDone() {
29+
fut.cond.Wait()
2430
}
25-
<-fut.done
2631
}
2732

2833
func (fut *Future) isDone() bool {
29-
if fut.done == nil {
30-
return true
31-
}
32-
select {
33-
case <-fut.done:
34-
return true
35-
default:
36-
return false
37-
}
34+
return fut.finished.Load()
35+
}
36+
37+
func (fut *Future) finish() {
38+
fut.mutex.Lock()
39+
defer fut.mutex.Unlock()
40+
41+
fut.finished.Store(true)
42+
fut.cond.Broadcast()
3843
}
3944

4045
// NewFuture creates a new empty Future for a given Request.
4146
func NewFuture(req Request) (fut *Future) {
4247
fut = &Future{}
43-
fut.done = make(chan struct{})
48+
fut.done = nil
49+
fut.finished.Store(false)
50+
fut.cond = sync.NewCond(&fut.mutex)
4451
fut.req = req
4552
return fut
4653
}
@@ -60,7 +67,12 @@ func (fut *Future) SetResponse(header Header, body io.Reader) error {
6067
}
6168
fut.resp = resp
6269

63-
close(fut.done)
70+
if fut.done != nil {
71+
close(fut.done)
72+
}
73+
74+
fut.finished.Store(true)
75+
fut.cond.Broadcast()
6476
return nil
6577
}
6678

@@ -74,7 +86,12 @@ func (fut *Future) SetError(err error) {
7486
}
7587
fut.err = err
7688

77-
close(fut.done)
89+
if fut.done != nil {
90+
close(fut.done)
91+
}
92+
93+
fut.finished.Store(true)
94+
fut.cond.Broadcast()
7895
}
7996

8097
// GetResponse waits for Future to be filled and returns Response and error.
@@ -122,8 +139,11 @@ func init() {
122139

123140
// WaitChan returns channel which becomes closed when response arrived or error occurred.
124141
func (fut *Future) WaitChan() <-chan struct{} {
125-
if fut.done == nil {
142+
if fut.isDone() {
126143
return closedChan
127144
}
145+
if fut.done == nil {
146+
fut.done = make(chan struct{})
147+
}
128148
return fut.done
129149
}

0 commit comments

Comments
 (0)