Skip to content

Commit beda545

Browse files
committed
api: updated response/future methods
Added method Release to Future and Response's interface, that allows to free used data directly by calling. Fixes #493
1 parent aff7842 commit beda545

File tree

10 files changed

+126
-21
lines changed

10 files changed

+126
-21
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
1212

1313
* New types for MessagePack extensions compatible with go-option (#459).
1414
* Added `box.MustNew` wrapper for `box.New` without an error (#448).
15+
* Method `Release` for `Future` and `Response` interface that allows
16+
to free used data directly by calling (#493).
1517

1618
### Changed
1719

connection.go

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,18 @@ var (
3939
"to the current connection or connection pool")
4040
)
4141

42+
var smallBufPool = &sync.Pool{
43+
New: func() interface{} {
44+
return &smallBuf{}
45+
},
46+
}
47+
48+
var slicePool = &sync.Pool{
49+
New: func() interface{} {
50+
return make([]byte, 1024)
51+
},
52+
}
53+
4254
const (
4355
// Connected signals that connection is established or reestablished.
4456
Connected ConnEventKind = iota + 1
@@ -373,7 +385,6 @@ func Connect(ctx context.Context, dialer Dialer, opts Opts) (conn *Connection, e
373385
}
374386

375387
conn.cond = sync.NewCond(&conn.mutex)
376-
377388
if conn.opts.Reconnect > 0 {
378389
// We don't need these mutex.Lock()/mutex.Unlock() here, but
379390
// runReconnects() expects mutex.Lock() to be set, so it's
@@ -849,7 +860,7 @@ func (conn *Connection) reader(r io.Reader, c Conn) {
849860
go conn.eventer(events)
850861

851862
for atomic.LoadUint32(&conn.state) != connClosed {
852-
respBytes, err := read(r, conn.lenbuf[:])
863+
respBytes, err := read(r, conn.lenbuf[:], conn)
853864
if err != nil {
854865
err = ClientError{
855866
ErrIoError,
@@ -858,8 +869,9 @@ func (conn *Connection) reader(r io.Reader, c Conn) {
858869
conn.reconnect(err, c)
859870
return
860871
}
861-
buf := smallBuf{b: respBytes}
862-
header, code, err := decodeHeader(conn.dec, &buf)
872+
buf := smallBufPool.Get().(*smallBuf)
873+
buf.b = respBytes
874+
header, code, err := decodeHeader(conn.dec, buf)
863875
if err != nil {
864876
err = ClientError{
865877
ErrProtocolError,
@@ -871,7 +883,7 @@ func (conn *Connection) reader(r io.Reader, c Conn) {
871883

872884
var fut *Future = nil
873885
if code == iproto.IPROTO_EVENT {
874-
if event, err := readWatchEvent(&buf); err == nil {
886+
if event, err := readWatchEvent(buf); err == nil {
875887
events <- event
876888
} else {
877889
err = ClientError{
@@ -885,7 +897,7 @@ func (conn *Connection) reader(r io.Reader, c Conn) {
885897
conn.opts.Logger.Report(LogBoxSessionPushUnsupported, conn, header)
886898
} else {
887899
if fut = conn.fetchFuture(header.RequestId); fut != nil {
888-
if err := fut.SetResponse(header, &buf); err != nil {
900+
if err := fut.SetResponse(header, buf); err != nil {
889901
fut.SetError(fmt.Errorf("failed to set response: %w", err))
890902
}
891903
conn.markDone(fut)
@@ -1187,7 +1199,7 @@ func (conn *Connection) timeouts() {
11871199
}
11881200
}
11891201

1190-
func read(r io.Reader, lenbuf []byte) (response []byte, err error) {
1202+
func read(r io.Reader, lenbuf []byte, conn ...*Connection) (response []byte, err error) {
11911203
var length uint64
11921204

11931205
if _, err = io.ReadFull(r, lenbuf); err != nil {
@@ -1211,7 +1223,18 @@ func read(r io.Reader, lenbuf []byte) (response []byte, err error) {
12111223
return
12121224
}
12131225

1214-
response = make([]byte, length)
1226+
if len(conn) == 0 {
1227+
response = make([]byte, length)
1228+
} else {
1229+
ptr := slicePool.Get().([]byte)
1230+
if cap(ptr) < int(length) {
1231+
response = make([]byte, length)
1232+
slicePool.Put(ptr) // nolint
1233+
} else {
1234+
response = ptr
1235+
response = response[:length]
1236+
}
1237+
}
12151238
_, err = io.ReadFull(r, response)
12161239

12171240
return

example_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1291,6 +1291,7 @@ func ExampleConnection_Do_failure() {
12911291

12921292
// We got a future, the request actually not performed yet.
12931293
future := conn.Do(req)
1294+
defer future.Release()
12941295

12951296
// When the future receives the response, the result of the Future is set
12961297
// and becomes available. We could wait for that moment with Future.Get(),

future.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ func (fut *Future) GetResponse() (Response, error) {
8989
}
9090

9191
// Get waits for Future to be filled and returns the data of the Response and error.
92+
// Also Release Future's data. After this, Future becomes invalid.
9293
//
9394
// The data will be []interface{}, so if you want more performance, use GetTyped method.
9495
//
@@ -105,6 +106,8 @@ func (fut *Future) Get() ([]interface{}, error) {
105106
// GetTyped waits for Future and calls msgpack.Decoder.Decode(result) if no error happens.
106107
// It is could be much faster than Get() function.
107108
//
109+
// Also Release Future's data. After this, Future becomes invalid.
110+
//
108111
// Note: Tarantool usually returns array of tuples (except for Eval and Call17 actions).
109112
func (fut *Future) GetTyped(result interface{}) error {
110113
fut.wait()
@@ -127,3 +130,11 @@ func (fut *Future) WaitChan() <-chan struct{} {
127130
}
128131
return fut.done
129132
}
133+
134+
// Release is freeing the Future resources.
135+
// After this, using this Future becomes invalid.
136+
func (fut *Future) Release() {
137+
if fut.resp != nil {
138+
fut.resp.Release()
139+
}
140+
}

future_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@ func (resp *futureMockResponse) Header() Header {
5353
return resp.header
5454
}
5555

56+
func (resp *futureMockResponse) Release() {
57+
// Releasing futureMockResponse data.
58+
}
59+
5660
func (resp *futureMockResponse) Decode() ([]interface{}, error) {
5761
resp.decodeCnt++
5862

prepared.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ func (req *PrepareRequest) Response(header Header, body io.Reader) (Response, er
9090
if err != nil {
9191
return nil, err
9292
}
93-
return &PrepareResponse{baseResponse: baseResp}, nil
93+
return &PrepareResponse{baseResponse: *baseResp}, nil
9494
}
9595

9696
// UnprepareRequest helps you to create an unprepare request object for
@@ -204,5 +204,5 @@ func (req *ExecutePreparedRequest) Response(header Header, body io.Reader) (Resp
204204
if err != nil {
205205
return nil, err
206206
}
207-
return &ExecuteResponse{baseResponse: baseResp}, nil
207+
return &ExecuteResponse{baseResponse: *baseResp}, nil
208208
}

request.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -620,11 +620,11 @@ func (req *SelectRequest) Context(ctx context.Context) *SelectRequest {
620620

621621
// Response creates a response for the SelectRequest.
622622
func (req *SelectRequest) Response(header Header, body io.Reader) (Response, error) {
623-
baseResp, err := createBaseResponse(header, body)
623+
SelectResp, err := createSelectResponse(header, body)
624624
if err != nil {
625625
return nil, err
626626
}
627-
return &SelectResponse{baseResponse: baseResp}, nil
627+
return SelectResp, nil
628628
}
629629

630630
// InsertRequest helps you to create an insert request object for execution
@@ -1154,7 +1154,7 @@ func (req *ExecuteRequest) Response(header Header, body io.Reader) (Response, er
11541154
if err != nil {
11551155
return nil, err
11561156
}
1157-
return &ExecuteResponse{baseResponse: baseResp}, nil
1157+
return &ExecuteResponse{baseResponse: *baseResp}, nil
11581158
}
11591159

11601160
// WatchOnceRequest synchronously fetches the value currently associated with a

response.go

Lines changed: 62 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package tarantool
33
import (
44
"fmt"
55
"io"
6+
"sync"
67

78
"github.com/tarantool/go-iproto"
89
"github.com/vmihailenco/msgpack/v5"
@@ -12,6 +13,8 @@ import (
1213
type Response interface {
1314
// Header returns a response header.
1415
Header() Header
16+
// Release free responses data and returns buffer's data.
17+
Release()
1518
// Decode decodes a response.
1619
Decode() ([]interface{}, error)
1720
// DecodeTyped decodes a response into a given container res.
@@ -31,24 +34,42 @@ type baseResponse struct {
3134
err error
3235
}
3336

34-
func createBaseResponse(header Header, body io.Reader) (baseResponse, error) {
37+
func createBaseResponse(header Header, body io.Reader) (*baseResponse, error) {
38+
resp := &baseResponse{}
3539
if body == nil {
36-
return baseResponse{header: header}, nil
40+
resp.header = header
41+
return resp, nil
3742
}
3843
if buf, ok := body.(*smallBuf); ok {
39-
return baseResponse{header: header, buf: *buf}, nil
44+
resp.header = header
45+
resp.buf.b = buf.b
46+
resp.buf.p = buf.p
47+
return resp, nil
4048
}
4149
data, err := io.ReadAll(body)
4250
if err != nil {
43-
return baseResponse{}, err
51+
return resp, err
4452
}
45-
return baseResponse{header: header, buf: smallBuf{b: data}}, nil
53+
resp.header = header
54+
resp.buf.b = data
55+
return resp, nil
56+
}
57+
58+
func (resp *baseResponse) clear() {
59+
*resp = baseResponse{}
60+
}
61+
62+
func (resp *baseResponse) Release() {
63+
slicePool.Put(resp.buf.b) // nolint
64+
resp.buf.b = nil
65+
resp.buf.p = 0
66+
smallBufPool.Put(&resp.buf)
4667
}
4768

4869
// DecodeBaseResponse parse response header and body.
4970
func DecodeBaseResponse(header Header, body io.Reader) (Response, error) {
5071
resp, err := createBaseResponse(header, body)
51-
return &resp, err
72+
return resp, err
5273
}
5374

5475
// SelectResponse is used for the select requests.
@@ -657,6 +678,41 @@ func (resp *baseResponse) Header() Header {
657678
return resp.header
658679
}
659680

681+
var selectResponsePool *sync.Pool = &sync.Pool{
682+
New: func() interface{} {
683+
return &SelectResponse{}
684+
},
685+
}
686+
687+
func createSelectResponse(header Header, body io.Reader) (*SelectResponse, error) {
688+
resp := selectResponsePool.Get().(*SelectResponse)
689+
if body == nil {
690+
resp.header = header
691+
return resp, nil
692+
}
693+
if buf, ok := body.(*smallBuf); ok {
694+
resp.header = header
695+
resp.buf.b = buf.b
696+
resp.buf.p = buf.p
697+
return resp, nil
698+
}
699+
data, err := io.ReadAll(body)
700+
if err != nil {
701+
return resp, err
702+
}
703+
resp.header = header
704+
resp.buf.b = data
705+
return resp, nil
706+
}
707+
708+
func (resp *SelectResponse) Release() {
709+
resp.baseResponse.Release()
710+
resp.baseResponse.clear()
711+
resp.pos = nil
712+
713+
selectResponsePool.Put(resp)
714+
}
715+
660716
// Pos returns a position descriptor of the last selected tuple for the SelectResponse.
661717
// If the response was not decoded, this method will call Decode().
662718
func (resp *SelectResponse) Pos() ([]byte, error) {

tarantool_test.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -220,14 +220,15 @@ func BenchmarkSync_naive_with_custom_type(b *testing.B) {
220220
b.ResetTimer()
221221

222222
for b.Loop() {
223-
err := conn.Do(req).GetTyped(&tuple)
224-
if err != nil {
223+
fut := conn.Do(req)
224+
if err := fut.GetTyped(&tuple); err != nil {
225225
b.Errorf("request error: %s", err)
226226
}
227227

228228
if tuple.id != 1111 {
229229
b.Errorf("invalid result")
230230
}
231+
fut.Release()
231232
}
232233
}
233234

@@ -529,6 +530,7 @@ func TestFutureMultipleGetGetTyped(t *testing.T) {
529530
defer conn.Close()
530531

531532
fut := conn.Do(NewCall17Request("simple_concat").Args([]interface{}{"1"}))
533+
defer fut.Release()
532534

533535
for i := 0; i < 30; i++ {
534536
// [0, 10) fut.Get()
@@ -567,6 +569,7 @@ func TestFutureMultipleGetWithError(t *testing.T) {
567569
defer conn.Close()
568570

569571
fut := conn.Do(NewCall17Request("non_exist").Args([]interface{}{"1"}))
572+
defer fut.Release()
570573

571574
for i := 0; i < 2; i++ {
572575
if _, err := fut.Get(); err == nil {
@@ -580,6 +583,7 @@ func TestFutureMultipleGetTypedWithError(t *testing.T) {
580583
defer conn.Close()
581584

582585
fut := conn.Do(NewCall17Request("simple_concat").Args([]interface{}{"1"}))
586+
defer fut.Release()
583587

584588
wrongTpl := struct {
585589
Val int

test_helpers/response.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,10 @@ func (resp *MockResponse) Header() tarantool.Header {
5454
return resp.header
5555
}
5656

57+
func (resp *MockResponse) Release() {
58+
// Releasing MockResponse data
59+
}
60+
5761
// Decode returns the result of decoding the response data as slice.
5862
func (resp *MockResponse) Decode() ([]interface{}, error) {
5963
if resp.data == nil {

0 commit comments

Comments
 (0)