Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.

* New types for MessagePack extensions compatible with go-option (#459).
* Added `box.MustNew` wrapper for `box.New` without an error (#448).
* Added Future.cond (sync.Cond) and Future.finished bool. Added Future.finish() marks Future as done (#496).

### Changed

Expand All @@ -23,7 +24,9 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
* Removed deprecated `box.session.push()` support: Future.AppendPush()
and Future.GetIterator() methods, ResponseIterator and TimeoutResponseIterator types,
Future.pushes[], Future.ready (#480, #497).
* `LogAppendPushFailed` replaced with `LogBoxSessionPushUnsupported` (#480)
* `LogAppendPushFailed` replaced with `LogBoxSessionPushUnsupported` (#480).
* Removed deprecated `Connection` methods, related interfaces and tests are updated (#479).
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like rebase artifacts.

* Future.done replaced with Future.cond (*sync.Cond) + Future.finished bool (#496).

### Fixed

Expand Down
18 changes: 18 additions & 0 deletions MIGRATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,24 @@ TODO
* Removed deprecated `pool` methods, related interfaces and tests are updated.
* Removed `box.session.push()` support: Future.AppendPush() and Future.GetIterator()
methods, ResponseIterator and TimeoutResponseIterator types.
* Removed deprecated `Connection` methods, related interfaces and tests are updated.

*NOTE*: due to Future.GetTyped() doesn't decode SelectRequest into structure, substitute Connection.GetTyped() following the example:
```Go
var singleTpl = Tuple{}
err = conn.GetTyped(space, index, key, &singleTpl)
```
At now became:
```Go
var tpl []Tuple
err = conn.Do(NewSelectRequest(space).
Index(index).
Limit(1).
Key(key)
).GetTyped(&tpl)
singleTpl := tpl[0]
```
Comment on lines +19 to +33
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And here.

* Future.done replaced with Future.cond (*sync.Cond) + Future.finished bool.

## Migration from v1.x.x to v2.x.x

Expand Down
46 changes: 30 additions & 16 deletions boxerror_test.go
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

etc

Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,9 @@ func TestErrorTypeEval(t *testing.T) {

for name, testcase := range tupleCases {
t.Run(name, func(t *testing.T) {
data, err := conn.Eval("return ...", []interface{}{&testcase.tuple.val})
data, err := conn.Do(NewEvalRequest("return ...").
Args([]interface{}{&testcase.tuple.val}),
).Get()
require.Nil(t, err)
require.NotNil(t, data)
require.Equal(t, len(data), 1)
Expand All @@ -324,7 +326,9 @@ func TestErrorTypeEvalTyped(t *testing.T) {
for name, testcase := range tupleCases {
t.Run(name, func(t *testing.T) {
var res []BoxError
err := conn.EvalTyped("return ...", []interface{}{&testcase.tuple.val}, &res)
err := conn.Do(NewEvalRequest("return ...").
Args([]interface{}{&testcase.tuple.val}),
).GetTyped(&res)
require.Nil(t, err)
require.NotNil(t, res)
require.Equal(t, len(res), 1)
Expand All @@ -340,12 +344,12 @@ func TestErrorTypeInsert(t *testing.T) {
defer conn.Close()

truncateEval := fmt.Sprintf("box.space[%q]:truncate()", space)
_, err := conn.Eval(truncateEval, []interface{}{})
_, err := conn.Do(NewEvalRequest(truncateEval).Args([]interface{}{})).Get()
require.Nil(t, err)

for name, testcase := range tupleCases {
t.Run(name, func(t *testing.T) {
_, err = conn.Insert(space, &testcase.tuple)
_, err = conn.Do(NewInsertRequest(space).Tuple(&testcase.tuple)).Get()
require.Nil(t, err)

checkEval := fmt.Sprintf(`
Expand All @@ -365,7 +369,7 @@ func TestErrorTypeInsert(t *testing.T) {
// of connector BoxError are equal to the Tarantool ones
// since they may differ between different Tarantool versions
// and editions.
_, err := conn.Eval(checkEval, []interface{}{})
_, err := conn.Do(NewEvalRequest(checkEval).Args([]interface{}{})).Get()
require.Nilf(t, err, "Tuple has been successfully inserted")
})
}
Expand All @@ -378,13 +382,13 @@ func TestErrorTypeInsertTyped(t *testing.T) {
defer conn.Close()

truncateEval := fmt.Sprintf("box.space[%q]:truncate()", space)
_, err := conn.Eval(truncateEval, []interface{}{})
_, err := conn.Do(NewEvalRequest(truncateEval).Args([]interface{}{})).Get()
require.Nil(t, err)

for name, testcase := range tupleCases {
t.Run(name, func(t *testing.T) {
var res []TupleBoxError
err = conn.InsertTyped(space, &testcase.tuple, &res)
err = conn.Do(NewInsertRequest(space).Tuple(&testcase.tuple)).GetTyped(&res)
require.Nil(t, err)
require.NotNil(t, res)
require.Equal(t, len(res), 1)
Expand All @@ -407,7 +411,7 @@ func TestErrorTypeInsertTyped(t *testing.T) {
// of connector BoxError are equal to the Tarantool ones
// since they may differ between different Tarantool versions
// and editions.
_, err := conn.Eval(checkEval, []interface{}{})
_, err := conn.Do(NewEvalRequest(checkEval).Args([]interface{}{})).Get()
require.Nilf(t, err, "Tuple has been successfully inserted")
})
}
Expand All @@ -420,7 +424,7 @@ func TestErrorTypeSelect(t *testing.T) {
defer conn.Close()

truncateEval := fmt.Sprintf("box.space[%q]:truncate()", space)
_, err := conn.Eval(truncateEval, []interface{}{})
_, err := conn.Do(NewEvalRequest(truncateEval).Args([]interface{}{})).Get()
require.Nil(t, err)

for name, testcase := range tupleCases {
Expand All @@ -433,13 +437,18 @@ func TestErrorTypeSelect(t *testing.T) {
assert(tuple ~= nil)
`, testcase.ttObj, space, testcase.tuple.pk)

_, err := conn.Eval(insertEval, []interface{}{})
_, err := conn.Do(NewEvalRequest(insertEval).Args([]interface{}{})).Get()
require.Nilf(t, err, "Tuple has been successfully inserted")

var offset uint32 = 0
var limit uint32 = 1
data, err := conn.Select(space, index, offset, limit, IterEq,
[]interface{}{testcase.tuple.pk})
data, err := conn.Do(NewSelectRequest(space).
Index(index).
Offset(offset).
Limit(limit).
Iterator(IterEq).
Key([]interface{}{testcase.tuple.pk}),
).Get()
require.Nil(t, err)
require.NotNil(t, data)
require.Equalf(t, len(data), 1, "Exactly one tuple had been found")
Expand All @@ -464,7 +473,7 @@ func TestErrorTypeSelectTyped(t *testing.T) {
defer conn.Close()

truncateEval := fmt.Sprintf("box.space[%q]:truncate()", space)
_, err := conn.Eval(truncateEval, []interface{}{})
_, err := conn.Do(NewEvalRequest(truncateEval).Args([]interface{}{})).Get()
require.Nil(t, err)

for name, testcase := range tupleCases {
Expand All @@ -477,14 +486,19 @@ func TestErrorTypeSelectTyped(t *testing.T) {
assert(tuple ~= nil)
`, testcase.ttObj, space, testcase.tuple.pk)

_, err := conn.Eval(insertEval, []interface{}{})
_, err := conn.Do(NewEvalRequest(insertEval).Args([]interface{}{})).Get()
require.Nilf(t, err, "Tuple has been successfully inserted")

var offset uint32 = 0
var limit uint32 = 1
var resp []TupleBoxError
err = conn.SelectTyped(space, index, offset, limit, IterEq,
[]interface{}{testcase.tuple.pk}, &resp)
err = conn.Do(NewSelectRequest(space).
Index(index).
Offset(offset).
Limit(limit).
Iterator(IterEq).
Key([]interface{}{testcase.tuple.pk}),
).GetTyped(&resp)
require.Nil(t, err)
require.NotNil(t, resp)
require.Equalf(t, len(resp), 1, "Exactly one tuple had been found")
Expand Down
34 changes: 17 additions & 17 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,7 @@ func (conn *Connection) pinger() {
return
case <-t.C:
}
conn.Ping()
conn.Do(NewPingRequest())
}
}

Expand Down Expand Up @@ -934,7 +934,7 @@ func (conn *Connection) newFuture(req Request) (fut *Future) {
ErrRateLimited,
"Request is rate limited on client",
}
fut.done = nil
fut.finish()
return
}
}
Expand All @@ -948,23 +948,23 @@ func (conn *Connection) newFuture(req Request) (fut *Future) {
ErrConnectionClosed,
"using closed connection",
}
fut.done = nil
fut.finish()
shard.rmut.Unlock()
return
case connDisconnected:
fut.err = ClientError{
ErrConnectionNotReady,
"client connection is not ready",
}
fut.done = nil
fut.finish()
shard.rmut.Unlock()
return
case connShutdown:
fut.err = ClientError{
ErrConnectionShutdown,
"server shutdown in progress",
}
fut.done = nil
fut.finish()
shard.rmut.Unlock()
return
}
Expand Down Expand Up @@ -993,9 +993,10 @@ func (conn *Connection) newFuture(req Request) (fut *Future) {
runtime.Gosched()
select {
case conn.rlimit <- struct{}{}:
case <-fut.done:
default:
<-fut.WaitChan()
Copy link
Collaborator

@oleg-jukovec oleg-jukovec Dec 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should look like this:

Suggested change
<-fut.WaitChan()
default:
select{
case conn.rlimit <- struct{}{}:
case <-fut.WaitChan():
if fut.err == nil {
panic("fut.done is closed, but err is nil")
}
}

if fut.err == nil {
panic("fut.done is closed, but err is nil")
panic("future WaitChan is closed, but err is nil")
}
}
}
Expand All @@ -1007,17 +1008,16 @@ func (conn *Connection) newFuture(req Request) (fut *Future) {
// is "done" before the response is come.
func (conn *Connection) contextWatchdog(fut *Future, ctx context.Context) {
select {
case <-fut.done:
case <-fut.WaitChan():
case <-ctx.Done():
}

select {
case <-fut.done:
if fut.isDone() {
return
default:
conn.cancelFuture(fut, fmt.Errorf("context is done (request ID %d): %w",
fut.requestId, context.Cause(ctx)))
}

conn.cancelFuture(fut, fmt.Errorf("context is done (request ID %d): %w",
fut.requestId, context.Cause(ctx)))
}

func (conn *Connection) incrementRequestCnt() {
Expand All @@ -1034,7 +1034,7 @@ func (conn *Connection) send(req Request, streamId uint64) *Future {
conn.incrementRequestCnt()

fut := conn.newFuture(req)
if fut.done == nil {
if fut.isDone() {
conn.decrementRequestCnt()
return fut
}
Expand All @@ -1057,12 +1057,12 @@ func (conn *Connection) putFuture(fut *Future, req Request, streamId uint64) {
shardn := fut.requestId & (conn.opts.Concurrency - 1)
shard := &conn.shard[shardn]
shard.bufmut.Lock()
select {
case <-fut.done:

if fut.isDone() {
shard.bufmut.Unlock()
return
default:
}

firstWritten := shard.buf.Len() == 0
if shard.buf.Cap() == 0 {
shard.buf.b = make([]byte, 0, 128)
Expand Down
60 changes: 45 additions & 15 deletions future.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package tarantool
import (
"io"
"sync"
"sync/atomic"
"time"
)

Expand All @@ -15,32 +16,43 @@ type Future struct {
mutex sync.Mutex
resp Response
err error
cond *sync.Cond
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
cond *sync.Cond
cond sync.Cond

finished atomic.Bool
done chan struct{}
}

func (fut *Future) wait() {
if fut.done == nil {
return
fut.mutex.Lock()
defer fut.mutex.Unlock()

for !fut.isDone() {
fut.cond.Wait()
}
<-fut.done
}

func (fut *Future) isDone() bool {
if fut.done == nil {
return true
}
select {
case <-fut.done:
return true
default:
return false
return fut.finished.Load()
}

func (fut *Future) finish() {
fut.mutex.Lock()
defer fut.mutex.Unlock()

fut.finished.Store(true)

if fut.done != nil {
close(fut.done)
}

fut.cond.Broadcast()
}

// NewFuture creates a new empty Future for a given Request.
func NewFuture(req Request) (fut *Future) {
fut = &Future{}
fut.done = make(chan struct{})
fut.done = nil
fut.finished.Store(false)
fut.cond = sync.NewCond(&fut.mutex)
fut.req = req
return fut
}
Expand All @@ -60,7 +72,13 @@ func (fut *Future) SetResponse(header Header, body io.Reader) error {
}
fut.resp = resp

close(fut.done)
fut.finished.Store(true)

if fut.done != nil {
close(fut.done)
}

fut.cond.Broadcast()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
fut.cond.Broadcast()
fut.cond.Broadcast()

return nil
}

Expand All @@ -74,7 +92,13 @@ func (fut *Future) SetError(err error) {
}
fut.err = err

close(fut.done)
fut.finished.Store(true)

if fut.done != nil {
close(fut.done)
}

fut.cond.Broadcast()
}

// GetResponse waits for Future to be filled and returns Response and error.
Expand Down Expand Up @@ -122,8 +146,14 @@ func init() {

// WaitChan returns channel which becomes closed when response arrived or error occurred.
func (fut *Future) WaitChan() <-chan struct{} {
if fut.done == nil {
fut.mutex.Lock()
defer fut.mutex.Unlock()

if fut.isDone() {
return closedChan
}
if fut.done == nil {
fut.done = make(chan struct{})
}
Comment on lines +155 to +157
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a code-style issue.

Suggested change
if fut.done == nil {
fut.done = make(chan struct{})
}
if fut.done == nil {
fut.done = make(chan struct{})
}

return fut.done
}
Loading
Loading