Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.

* New types for MessagePack extensions compatible with go-option (#459).
* Added `box.MustNew` wrapper for `box.New` without an error (#448).
* Method `Release` for `Future` and `Response` interface that allows
to free used data directly by calling (#493).

### Changed

Expand Down
41 changes: 33 additions & 8 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,18 @@ var (
"to the current connection or connection pool")
)

var smallBufPool = &sync.Pool{
New: func() interface{} {
return &smallBuf{}
},
}

var slicePool = &sync.Pool{
New: func() interface{} {
return make([]byte, 1024)
},
}

const (
// Connected signals that connection is established or reestablished.
Connected ConnEventKind = iota + 1
Expand Down Expand Up @@ -373,7 +385,6 @@ func Connect(ctx context.Context, dialer Dialer, opts Opts) (conn *Connection, e
}

conn.cond = sync.NewCond(&conn.mutex)

if conn.opts.Reconnect > 0 {
// We don't need these mutex.Lock()/mutex.Unlock() here, but
// runReconnects() expects mutex.Lock() to be set, so it's
Expand Down Expand Up @@ -849,7 +860,7 @@ func (conn *Connection) reader(r io.Reader, c Conn) {
go conn.eventer(events)

for atomic.LoadUint32(&conn.state) != connClosed {
respBytes, err := read(r, conn.lenbuf[:])
respBytes, err := read(r, conn.lenbuf[:], conn)
if err != nil {
err = ClientError{
ErrIoError,
Expand All @@ -858,8 +869,9 @@ func (conn *Connection) reader(r io.Reader, c Conn) {
conn.reconnect(err, c)
return
}
buf := smallBuf{b: respBytes}
header, code, err := decodeHeader(conn.dec, &buf)
buf := smallBufPool.Get().(*smallBuf)
buf.b = respBytes
header, code, err := decodeHeader(conn.dec, buf)
if err != nil {
err = ClientError{
ErrProtocolError,
Expand All @@ -871,7 +883,7 @@ func (conn *Connection) reader(r io.Reader, c Conn) {

var fut *Future = nil
if code == iproto.IPROTO_EVENT {
if event, err := readWatchEvent(&buf); err == nil {
if event, err := readWatchEvent(buf); err == nil {
events <- event
} else {
err = ClientError{
Expand All @@ -885,7 +897,7 @@ func (conn *Connection) reader(r io.Reader, c Conn) {
conn.opts.Logger.Report(LogBoxSessionPushUnsupported, conn, header)
} else {
if fut = conn.fetchFuture(header.RequestId); fut != nil {
if err := fut.SetResponse(header, &buf); err != nil {
if err := fut.SetResponse(header, buf); err != nil {
fut.SetError(fmt.Errorf("failed to set response: %w", err))
}
conn.markDone(fut)
Expand Down Expand Up @@ -1187,7 +1199,9 @@ func (conn *Connection) timeouts() {
}
}

func read(r io.Reader, lenbuf []byte) (response []byte, err error) {
// read uses args to allocate slices for responses using sync.Pool.
// data could be released later using Release.
func read(r io.Reader, lenbuf []byte, args ...interface{}) (response []byte, err error) {
var length uint64

if _, err = io.ReadFull(r, lenbuf); err != nil {
Expand All @@ -1211,7 +1225,18 @@ func read(r io.Reader, lenbuf []byte) (response []byte, err error) {
return
}

response = make([]byte, length)
if len(args) == 0 {
response = make([]byte, length)
} else {
ptr := slicePool.Get().([]byte)
if cap(ptr) < int(length) {
response = make([]byte, length)
slicePool.Put(ptr) // nolint
} else {
response = ptr
response = response[:length]
}
}
_, err = io.ReadFull(r, response)

return
Expand Down
1 change: 1 addition & 0 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1291,6 +1291,7 @@ func ExampleConnection_Do_failure() {

// We got a future, the request actually not performed yet.
future := conn.Do(req)
defer future.Release()

// When the future receives the response, the result of the Future is set
// and becomes available. We could wait for that moment with Future.Get(),
Expand Down
8 changes: 8 additions & 0 deletions future.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,3 +127,11 @@ func (fut *Future) WaitChan() <-chan struct{} {
}
return fut.done
}

// Release is freeing the Future resources.
// After this, using this Future becomes invalid.
func (fut *Future) Release() {
if fut.resp != nil {
fut.resp.Release()
}
}
4 changes: 4 additions & 0 deletions future_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ func (resp *futureMockResponse) Header() Header {
return resp.header
}

func (resp *futureMockResponse) Release() {
// Releasing futureMockResponse data.
}

func (resp *futureMockResponse) Decode() ([]interface{}, error) {
resp.decodeCnt++

Expand Down
4 changes: 2 additions & 2 deletions prepared.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (req *PrepareRequest) Response(header Header, body io.Reader) (Response, er
if err != nil {
return nil, err
}
return &PrepareResponse{baseResponse: baseResp}, nil
return &PrepareResponse{baseResponse: *baseResp}, nil
}

// UnprepareRequest helps you to create an unprepare request object for
Expand Down Expand Up @@ -204,5 +204,5 @@ func (req *ExecutePreparedRequest) Response(header Header, body io.Reader) (Resp
if err != nil {
return nil, err
}
return &ExecuteResponse{baseResponse: baseResp}, nil
return &ExecuteResponse{baseResponse: *baseResp}, nil
}
6 changes: 3 additions & 3 deletions request.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,11 +620,11 @@ func (req *SelectRequest) Context(ctx context.Context) *SelectRequest {

// Response creates a response for the SelectRequest.
func (req *SelectRequest) Response(header Header, body io.Reader) (Response, error) {
baseResp, err := createBaseResponse(header, body)
SelectResp, err := createSelectResponse(header, body)
if err != nil {
return nil, err
}
return &SelectResponse{baseResponse: baseResp}, nil
return SelectResp, nil
}

// InsertRequest helps you to create an insert request object for execution
Expand Down Expand Up @@ -1154,7 +1154,7 @@ func (req *ExecuteRequest) Response(header Header, body io.Reader) (Response, er
if err != nil {
return nil, err
}
return &ExecuteResponse{baseResponse: baseResp}, nil
return &ExecuteResponse{baseResponse: *baseResp}, nil
}

// WatchOnceRequest synchronously fetches the value currently associated with a
Expand Down
68 changes: 62 additions & 6 deletions response.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package tarantool
import (
"fmt"
"io"
"sync"

"github.com/tarantool/go-iproto"
"github.com/vmihailenco/msgpack/v5"
Expand All @@ -12,6 +13,8 @@ import (
type Response interface {
// Header returns a response header.
Header() Header
// Release free responses data.
Release()
// Decode decodes a response.
Decode() ([]interface{}, error)
// DecodeTyped decodes a response into a given container res.
Expand All @@ -31,24 +34,42 @@ type baseResponse struct {
err error
}

func createBaseResponse(header Header, body io.Reader) (baseResponse, error) {
func createBaseResponse(header Header, body io.Reader) (*baseResponse, error) {
resp := &baseResponse{}
if body == nil {
return baseResponse{header: header}, nil
resp.header = header
return resp, nil
}
if buf, ok := body.(*smallBuf); ok {
return baseResponse{header: header, buf: *buf}, nil
resp.header = header
resp.buf.b = buf.b
resp.buf.p = buf.p
return resp, nil
}
data, err := io.ReadAll(body)
if err != nil {
return baseResponse{}, err
return resp, err
}
return baseResponse{header: header, buf: smallBuf{b: data}}, nil
resp.header = header
resp.buf.b = data
return resp, nil
}

func (resp *baseResponse) clear() {
*resp = baseResponse{}
}

func (resp *baseResponse) Release() {
slicePool.Put(resp.buf.b) // nolint
resp.buf.b = nil
resp.buf.p = 0
smallBufPool.Put(&resp.buf)
}

// DecodeBaseResponse parse response header and body.
func DecodeBaseResponse(header Header, body io.Reader) (Response, error) {
resp, err := createBaseResponse(header, body)
return &resp, err
return resp, err
}

// SelectResponse is used for the select requests.
Expand Down Expand Up @@ -657,6 +678,41 @@ func (resp *baseResponse) Header() Header {
return resp.header
}

var selectResponsePool *sync.Pool = &sync.Pool{
New: func() interface{} {
return &SelectResponse{}
},
}

func createSelectResponse(header Header, body io.Reader) (*SelectResponse, error) {
resp := selectResponsePool.Get().(*SelectResponse)
if body == nil {
resp.header = header
return resp, nil
}
if buf, ok := body.(*smallBuf); ok {
resp.header = header
resp.buf.b = buf.b
resp.buf.p = buf.p
return resp, nil
}
data, err := io.ReadAll(body)
if err != nil {
return resp, err
}
resp.header = header
resp.buf.b = data
return resp, nil
}

func (resp *SelectResponse) Release() {
resp.baseResponse.Release()
resp.baseResponse.clear()
resp.pos = nil

selectResponsePool.Put(resp)
}

// Pos returns a position descriptor of the last selected tuple for the SelectResponse.
// If the response was not decoded, this method will call Decode().
func (resp *SelectResponse) Pos() ([]byte, error) {
Expand Down
5 changes: 3 additions & 2 deletions tarantool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,14 +220,15 @@ func BenchmarkSync_naive_with_custom_type(b *testing.B) {
b.ResetTimer()

for b.Loop() {
err := conn.Do(req).GetTyped(&tuple)
if err != nil {
fut := conn.Do(req)
if err := fut.GetTyped(&tuple); err != nil {
b.Errorf("request error: %s", err)
}

if tuple.id != 1111 {
b.Errorf("invalid result")
}
fut.Release()
}
}

Expand Down
4 changes: 4 additions & 0 deletions test_helpers/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ func (resp *MockResponse) Header() tarantool.Header {
return resp.header
}

func (resp *MockResponse) Release() {
// Releasing MockResponse data
}

// Decode returns the result of decoding the response data as slice.
func (resp *MockResponse) Decode() ([]interface{}, error) {
if resp.data == nil {
Expand Down
Loading