-
Notifications
You must be signed in to change notification settings - Fork 60
api: replaced Future.done with a sync.Cond #508
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -14,6 +14,24 @@ TODO | |
| * Removed deprecated `pool` methods, related interfaces and tests are updated. | ||
| * Removed `box.session.push()` support: Future.AppendPush() and Future.GetIterator() | ||
| methods, ResponseIterator and TimeoutResponseIterator types. | ||
| * Removed deprecated `Connection` methods, related interfaces and tests are updated. | ||
|
|
||
| *NOTE*: due to Future.GetTyped() doesn't decode SelectRequest into structure, substitute Connection.GetTyped() following the example: | ||
| ```Go | ||
| var singleTpl = Tuple{} | ||
| err = conn.GetTyped(space, index, key, &singleTpl) | ||
| ``` | ||
| At now became: | ||
| ```Go | ||
| var tpl []Tuple | ||
| err = conn.Do(NewSelectRequest(space). | ||
| Index(index). | ||
| Limit(1). | ||
| Key(key) | ||
| ).GetTyped(&tpl) | ||
| singleTpl := tpl[0] | ||
| ``` | ||
|
Comment on lines
+19
to
+33
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. And here. |
||
| * Future.done replaced with Future.cond (*sync.Cond) + Future.finished bool. | ||
|
|
||
| ## Migration from v1.x.x to v2.x.x | ||
|
|
||
|
|
||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. etc |
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -739,7 +739,7 @@ func (conn *Connection) pinger() { | |||||||||||||||||||
| return | ||||||||||||||||||||
| case <-t.C: | ||||||||||||||||||||
| } | ||||||||||||||||||||
| conn.Ping() | ||||||||||||||||||||
| conn.Do(NewPingRequest()) | ||||||||||||||||||||
| } | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
|
|
@@ -934,7 +934,7 @@ func (conn *Connection) newFuture(req Request) (fut *Future) { | |||||||||||||||||||
| ErrRateLimited, | ||||||||||||||||||||
| "Request is rate limited on client", | ||||||||||||||||||||
| } | ||||||||||||||||||||
| fut.done = nil | ||||||||||||||||||||
| fut.finish() | ||||||||||||||||||||
| return | ||||||||||||||||||||
| } | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
@@ -948,23 +948,23 @@ func (conn *Connection) newFuture(req Request) (fut *Future) { | |||||||||||||||||||
| ErrConnectionClosed, | ||||||||||||||||||||
| "using closed connection", | ||||||||||||||||||||
| } | ||||||||||||||||||||
| fut.done = nil | ||||||||||||||||||||
| fut.finish() | ||||||||||||||||||||
| shard.rmut.Unlock() | ||||||||||||||||||||
| return | ||||||||||||||||||||
| case connDisconnected: | ||||||||||||||||||||
| fut.err = ClientError{ | ||||||||||||||||||||
| ErrConnectionNotReady, | ||||||||||||||||||||
| "client connection is not ready", | ||||||||||||||||||||
| } | ||||||||||||||||||||
| fut.done = nil | ||||||||||||||||||||
| fut.finish() | ||||||||||||||||||||
| shard.rmut.Unlock() | ||||||||||||||||||||
| return | ||||||||||||||||||||
| case connShutdown: | ||||||||||||||||||||
| fut.err = ClientError{ | ||||||||||||||||||||
| ErrConnectionShutdown, | ||||||||||||||||||||
| "server shutdown in progress", | ||||||||||||||||||||
| } | ||||||||||||||||||||
| fut.done = nil | ||||||||||||||||||||
| fut.finish() | ||||||||||||||||||||
| shard.rmut.Unlock() | ||||||||||||||||||||
| return | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
@@ -993,9 +993,10 @@ func (conn *Connection) newFuture(req Request) (fut *Future) { | |||||||||||||||||||
| runtime.Gosched() | ||||||||||||||||||||
| select { | ||||||||||||||||||||
| case conn.rlimit <- struct{}{}: | ||||||||||||||||||||
| case <-fut.done: | ||||||||||||||||||||
| default: | ||||||||||||||||||||
| <-fut.WaitChan() | ||||||||||||||||||||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It should look like this:
Suggested change
|
||||||||||||||||||||
| if fut.err == nil { | ||||||||||||||||||||
| panic("fut.done is closed, but err is nil") | ||||||||||||||||||||
| panic("future WaitChan is closed, but err is nil") | ||||||||||||||||||||
| } | ||||||||||||||||||||
| } | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
@@ -1007,17 +1008,16 @@ func (conn *Connection) newFuture(req Request) (fut *Future) { | |||||||||||||||||||
| // is "done" before the response is come. | ||||||||||||||||||||
| func (conn *Connection) contextWatchdog(fut *Future, ctx context.Context) { | ||||||||||||||||||||
| select { | ||||||||||||||||||||
| case <-fut.done: | ||||||||||||||||||||
| case <-fut.WaitChan(): | ||||||||||||||||||||
| case <-ctx.Done(): | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| select { | ||||||||||||||||||||
| case <-fut.done: | ||||||||||||||||||||
| if fut.isDone() { | ||||||||||||||||||||
| return | ||||||||||||||||||||
| default: | ||||||||||||||||||||
| conn.cancelFuture(fut, fmt.Errorf("context is done (request ID %d): %w", | ||||||||||||||||||||
| fut.requestId, context.Cause(ctx))) | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| conn.cancelFuture(fut, fmt.Errorf("context is done (request ID %d): %w", | ||||||||||||||||||||
| fut.requestId, context.Cause(ctx))) | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| func (conn *Connection) incrementRequestCnt() { | ||||||||||||||||||||
|
|
@@ -1034,7 +1034,7 @@ func (conn *Connection) send(req Request, streamId uint64) *Future { | |||||||||||||||||||
| conn.incrementRequestCnt() | ||||||||||||||||||||
|
|
||||||||||||||||||||
| fut := conn.newFuture(req) | ||||||||||||||||||||
| if fut.done == nil { | ||||||||||||||||||||
| if fut.isDone() { | ||||||||||||||||||||
| conn.decrementRequestCnt() | ||||||||||||||||||||
| return fut | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
@@ -1057,12 +1057,12 @@ func (conn *Connection) putFuture(fut *Future, req Request, streamId uint64) { | |||||||||||||||||||
| shardn := fut.requestId & (conn.opts.Concurrency - 1) | ||||||||||||||||||||
| shard := &conn.shard[shardn] | ||||||||||||||||||||
| shard.bufmut.Lock() | ||||||||||||||||||||
| select { | ||||||||||||||||||||
| case <-fut.done: | ||||||||||||||||||||
|
|
||||||||||||||||||||
| if fut.isDone() { | ||||||||||||||||||||
| shard.bufmut.Unlock() | ||||||||||||||||||||
| return | ||||||||||||||||||||
| default: | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| firstWritten := shard.buf.Len() == 0 | ||||||||||||||||||||
| if shard.buf.Cap() == 0 { | ||||||||||||||||||||
| shard.buf.b = make([]byte, 0, 128) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -3,6 +3,7 @@ package tarantool | |||||||||||||||||
| import ( | ||||||||||||||||||
| "io" | ||||||||||||||||||
| "sync" | ||||||||||||||||||
| "sync/atomic" | ||||||||||||||||||
| "time" | ||||||||||||||||||
| ) | ||||||||||||||||||
|
|
||||||||||||||||||
|
|
@@ -15,32 +16,43 @@ type Future struct { | |||||||||||||||||
| mutex sync.Mutex | ||||||||||||||||||
| resp Response | ||||||||||||||||||
| err error | ||||||||||||||||||
| cond *sync.Cond | ||||||||||||||||||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||
| finished atomic.Bool | ||||||||||||||||||
| done chan struct{} | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| func (fut *Future) wait() { | ||||||||||||||||||
| if fut.done == nil { | ||||||||||||||||||
| return | ||||||||||||||||||
| fut.mutex.Lock() | ||||||||||||||||||
| defer fut.mutex.Unlock() | ||||||||||||||||||
|
|
||||||||||||||||||
| for !fut.isDone() { | ||||||||||||||||||
| fut.cond.Wait() | ||||||||||||||||||
| } | ||||||||||||||||||
| <-fut.done | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| func (fut *Future) isDone() bool { | ||||||||||||||||||
| if fut.done == nil { | ||||||||||||||||||
| return true | ||||||||||||||||||
| } | ||||||||||||||||||
| select { | ||||||||||||||||||
| case <-fut.done: | ||||||||||||||||||
| return true | ||||||||||||||||||
| default: | ||||||||||||||||||
| return false | ||||||||||||||||||
| return fut.finished.Load() | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| func (fut *Future) finish() { | ||||||||||||||||||
| fut.mutex.Lock() | ||||||||||||||||||
| defer fut.mutex.Unlock() | ||||||||||||||||||
|
|
||||||||||||||||||
| fut.finished.Store(true) | ||||||||||||||||||
|
|
||||||||||||||||||
| if fut.done != nil { | ||||||||||||||||||
| close(fut.done) | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| fut.cond.Broadcast() | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| // NewFuture creates a new empty Future for a given Request. | ||||||||||||||||||
| func NewFuture(req Request) (fut *Future) { | ||||||||||||||||||
| fut = &Future{} | ||||||||||||||||||
| fut.done = make(chan struct{}) | ||||||||||||||||||
| fut.done = nil | ||||||||||||||||||
| fut.finished.Store(false) | ||||||||||||||||||
| fut.cond = sync.NewCond(&fut.mutex) | ||||||||||||||||||
| fut.req = req | ||||||||||||||||||
| return fut | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
@@ -60,7 +72,13 @@ func (fut *Future) SetResponse(header Header, body io.Reader) error { | |||||||||||||||||
| } | ||||||||||||||||||
| fut.resp = resp | ||||||||||||||||||
|
|
||||||||||||||||||
| close(fut.done) | ||||||||||||||||||
| fut.finished.Store(true) | ||||||||||||||||||
|
|
||||||||||||||||||
| if fut.done != nil { | ||||||||||||||||||
| close(fut.done) | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| fut.cond.Broadcast() | ||||||||||||||||||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||
| return nil | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
|
|
@@ -74,7 +92,13 @@ func (fut *Future) SetError(err error) { | |||||||||||||||||
| } | ||||||||||||||||||
| fut.err = err | ||||||||||||||||||
|
|
||||||||||||||||||
| close(fut.done) | ||||||||||||||||||
| fut.finished.Store(true) | ||||||||||||||||||
|
|
||||||||||||||||||
| if fut.done != nil { | ||||||||||||||||||
| close(fut.done) | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| fut.cond.Broadcast() | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| // GetResponse waits for Future to be filled and returns Response and error. | ||||||||||||||||||
|
|
@@ -122,8 +146,14 @@ func init() { | |||||||||||||||||
|
|
||||||||||||||||||
| // WaitChan returns channel which becomes closed when response arrived or error occurred. | ||||||||||||||||||
| func (fut *Future) WaitChan() <-chan struct{} { | ||||||||||||||||||
| if fut.done == nil { | ||||||||||||||||||
| fut.mutex.Lock() | ||||||||||||||||||
| defer fut.mutex.Unlock() | ||||||||||||||||||
|
|
||||||||||||||||||
| if fut.isDone() { | ||||||||||||||||||
| return closedChan | ||||||||||||||||||
| } | ||||||||||||||||||
| if fut.done == nil { | ||||||||||||||||||
| fut.done = make(chan struct{}) | ||||||||||||||||||
| } | ||||||||||||||||||
|
Comment on lines
+155
to
+157
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just a code-style issue.
Suggested change
|
||||||||||||||||||
| return fut.done | ||||||||||||||||||
| } | ||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like rebase artifacts.