diff --git a/CHANGELOG.md b/CHANGELOG.md index 42746418c..d581dbf0b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,8 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release. * New types for MessagePack extensions compatible with go-option (#459). * Added `box.MustNew` wrapper for `box.New` without an error (#448). +* Method `Release` for `Future` and `Response` interface that allows + to free used data directly by calling (#493). ### Changed diff --git a/connection.go b/connection.go index 3525bfda0..bdcf94f8e 100644 --- a/connection.go +++ b/connection.go @@ -39,6 +39,18 @@ var ( "to the current connection or connection pool") ) +var smallBufPool = &sync.Pool{ + New: func() interface{} { + return &smallBuf{} + }, +} + +var slicePool = &sync.Pool{ + New: func() interface{} { + return make([]byte, 1024) + }, +} + const ( // Connected signals that connection is established or reestablished. Connected ConnEventKind = iota + 1 @@ -373,7 +385,6 @@ func Connect(ctx context.Context, dialer Dialer, opts Opts) (conn *Connection, e } conn.cond = sync.NewCond(&conn.mutex) - if conn.opts.Reconnect > 0 { // We don't need these mutex.Lock()/mutex.Unlock() here, but // runReconnects() expects mutex.Lock() to be set, so it's @@ -849,7 +860,7 @@ func (conn *Connection) reader(r io.Reader, c Conn) { go conn.eventer(events) for atomic.LoadUint32(&conn.state) != connClosed { - respBytes, err := read(r, conn.lenbuf[:]) + respBytes, err := read(r, conn.lenbuf[:], conn) if err != nil { err = ClientError{ ErrIoError, @@ -858,8 +869,9 @@ func (conn *Connection) reader(r io.Reader, c Conn) { conn.reconnect(err, c) return } - buf := smallBuf{b: respBytes} - header, code, err := decodeHeader(conn.dec, &buf) + buf := smallBufPool.Get().(*smallBuf) + buf.b = respBytes + header, code, err := decodeHeader(conn.dec, buf) if err != nil { err = ClientError{ ErrProtocolError, @@ -871,7 +883,7 @@ func (conn *Connection) reader(r io.Reader, c Conn) { var fut *Future = nil if code == iproto.IPROTO_EVENT { - if event, err := readWatchEvent(&buf); err == nil { + if event, err := readWatchEvent(buf); err == nil { events <- event } else { err = ClientError{ @@ -885,7 +897,7 @@ func (conn *Connection) reader(r io.Reader, c Conn) { conn.opts.Logger.Report(LogBoxSessionPushUnsupported, conn, header) } else { if fut = conn.fetchFuture(header.RequestId); fut != nil { - if err := fut.SetResponse(header, &buf); err != nil { + if err := fut.SetResponse(header, buf); err != nil { fut.SetError(fmt.Errorf("failed to set response: %w", err)) } conn.markDone(fut) @@ -1187,7 +1199,9 @@ func (conn *Connection) timeouts() { } } -func read(r io.Reader, lenbuf []byte) (response []byte, err error) { +// read uses args to allocate slices for responses using sync.Pool. +// data could be released later using Release. +func read(r io.Reader, lenbuf []byte, args ...interface{}) (response []byte, err error) { var length uint64 if _, err = io.ReadFull(r, lenbuf); err != nil { @@ -1211,7 +1225,18 @@ func read(r io.Reader, lenbuf []byte) (response []byte, err error) { return } - response = make([]byte, length) + if len(args) == 0 { + response = make([]byte, length) + } else { + ptr := slicePool.Get().([]byte) + if cap(ptr) < int(length) { + response = make([]byte, length) + slicePool.Put(ptr) // nolint + } else { + response = ptr + response = response[:length] + } + } _, err = io.ReadFull(r, response) return diff --git a/example_test.go b/example_test.go index 9eadf5971..ef752d876 100644 --- a/example_test.go +++ b/example_test.go @@ -1291,6 +1291,7 @@ func ExampleConnection_Do_failure() { // We got a future, the request actually not performed yet. future := conn.Do(req) + defer future.Release() // When the future receives the response, the result of the Future is set // and becomes available. We could wait for that moment with Future.Get(), diff --git a/future.go b/future.go index 0f882014a..e7b227d55 100644 --- a/future.go +++ b/future.go @@ -127,3 +127,11 @@ func (fut *Future) WaitChan() <-chan struct{} { } return fut.done } + +// Release is freeing the Future resources. +// After this, using this Future becomes invalid. +func (fut *Future) Release() { + if fut.resp != nil { + fut.resp.Release() + } +} diff --git a/future_test.go b/future_test.go index 47f4e3c20..0fdf00b96 100644 --- a/future_test.go +++ b/future_test.go @@ -53,6 +53,10 @@ func (resp *futureMockResponse) Header() Header { return resp.header } +func (resp *futureMockResponse) Release() { + // Releasing futureMockResponse data. +} + func (resp *futureMockResponse) Decode() ([]interface{}, error) { resp.decodeCnt++ diff --git a/prepared.go b/prepared.go index 6f7ace911..035b59bd0 100644 --- a/prepared.go +++ b/prepared.go @@ -90,7 +90,7 @@ func (req *PrepareRequest) Response(header Header, body io.Reader) (Response, er if err != nil { return nil, err } - return &PrepareResponse{baseResponse: baseResp}, nil + return &PrepareResponse{baseResponse: *baseResp}, nil } // UnprepareRequest helps you to create an unprepare request object for @@ -204,5 +204,5 @@ func (req *ExecutePreparedRequest) Response(header Header, body io.Reader) (Resp if err != nil { return nil, err } - return &ExecuteResponse{baseResponse: baseResp}, nil + return &ExecuteResponse{baseResponse: *baseResp}, nil } diff --git a/request.go b/request.go index b01e1f92c..28e1ed5bd 100644 --- a/request.go +++ b/request.go @@ -620,11 +620,11 @@ func (req *SelectRequest) Context(ctx context.Context) *SelectRequest { // Response creates a response for the SelectRequest. func (req *SelectRequest) Response(header Header, body io.Reader) (Response, error) { - baseResp, err := createBaseResponse(header, body) + SelectResp, err := createSelectResponse(header, body) if err != nil { return nil, err } - return &SelectResponse{baseResponse: baseResp}, nil + return SelectResp, nil } // InsertRequest helps you to create an insert request object for execution @@ -1154,7 +1154,7 @@ func (req *ExecuteRequest) Response(header Header, body io.Reader) (Response, er if err != nil { return nil, err } - return &ExecuteResponse{baseResponse: baseResp}, nil + return &ExecuteResponse{baseResponse: *baseResp}, nil } // WatchOnceRequest synchronously fetches the value currently associated with a diff --git a/response.go b/response.go index 36aad66a0..0fe7e374b 100644 --- a/response.go +++ b/response.go @@ -3,6 +3,7 @@ package tarantool import ( "fmt" "io" + "sync" "github.com/tarantool/go-iproto" "github.com/vmihailenco/msgpack/v5" @@ -12,6 +13,8 @@ import ( type Response interface { // Header returns a response header. Header() Header + // Release free responses data. + Release() // Decode decodes a response. Decode() ([]interface{}, error) // DecodeTyped decodes a response into a given container res. @@ -31,24 +34,42 @@ type baseResponse struct { err error } -func createBaseResponse(header Header, body io.Reader) (baseResponse, error) { +func createBaseResponse(header Header, body io.Reader) (*baseResponse, error) { + resp := &baseResponse{} if body == nil { - return baseResponse{header: header}, nil + resp.header = header + return resp, nil } if buf, ok := body.(*smallBuf); ok { - return baseResponse{header: header, buf: *buf}, nil + resp.header = header + resp.buf.b = buf.b + resp.buf.p = buf.p + return resp, nil } data, err := io.ReadAll(body) if err != nil { - return baseResponse{}, err + return resp, err } - return baseResponse{header: header, buf: smallBuf{b: data}}, nil + resp.header = header + resp.buf.b = data + return resp, nil +} + +func (resp *baseResponse) clear() { + *resp = baseResponse{} +} + +func (resp *baseResponse) Release() { + slicePool.Put(resp.buf.b) // nolint + resp.buf.b = nil + resp.buf.p = 0 + smallBufPool.Put(&resp.buf) } // DecodeBaseResponse parse response header and body. func DecodeBaseResponse(header Header, body io.Reader) (Response, error) { resp, err := createBaseResponse(header, body) - return &resp, err + return resp, err } // SelectResponse is used for the select requests. @@ -657,6 +678,41 @@ func (resp *baseResponse) Header() Header { return resp.header } +var selectResponsePool *sync.Pool = &sync.Pool{ + New: func() interface{} { + return &SelectResponse{} + }, +} + +func createSelectResponse(header Header, body io.Reader) (*SelectResponse, error) { + resp := selectResponsePool.Get().(*SelectResponse) + if body == nil { + resp.header = header + return resp, nil + } + if buf, ok := body.(*smallBuf); ok { + resp.header = header + resp.buf.b = buf.b + resp.buf.p = buf.p + return resp, nil + } + data, err := io.ReadAll(body) + if err != nil { + return resp, err + } + resp.header = header + resp.buf.b = data + return resp, nil +} + +func (resp *SelectResponse) Release() { + resp.baseResponse.Release() + resp.baseResponse.clear() + resp.pos = nil + + selectResponsePool.Put(resp) +} + // Pos returns a position descriptor of the last selected tuple for the SelectResponse. // If the response was not decoded, this method will call Decode(). func (resp *SelectResponse) Pos() ([]byte, error) { diff --git a/tarantool_test.go b/tarantool_test.go index b3dab7543..3e1d7b3e4 100644 --- a/tarantool_test.go +++ b/tarantool_test.go @@ -220,14 +220,15 @@ func BenchmarkSync_naive_with_custom_type(b *testing.B) { b.ResetTimer() for b.Loop() { - err := conn.Do(req).GetTyped(&tuple) - if err != nil { + fut := conn.Do(req) + if err := fut.GetTyped(&tuple); err != nil { b.Errorf("request error: %s", err) } if tuple.id != 1111 { b.Errorf("invalid result") } + fut.Release() } } diff --git a/test_helpers/response.go b/test_helpers/response.go index 630ac7726..8e44aaf23 100644 --- a/test_helpers/response.go +++ b/test_helpers/response.go @@ -54,6 +54,10 @@ func (resp *MockResponse) Header() tarantool.Header { return resp.header } +func (resp *MockResponse) Release() { + // Releasing MockResponse data +} + // Decode returns the result of decoding the response data as slice. func (resp *MockResponse) Decode() ([]interface{}, error) { if resp.data == nil {