Skip to content

Commit 104d293

Browse files
committed
api: updated response/future methods
Added Response's interface method Buffer. Connection now contains sync.Pool that was used to reduce allocations. Fixes #493
1 parent 713316b commit 104d293

File tree

6 files changed

+57
-13
lines changed

6 files changed

+57
-13
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
1212

1313
* New types for MessagePack extensions compatible with go-option (#459).
1414
* Added `box.MustNew` wrapper for `box.New` without an error (#448).
15+
* Method `Buffer` for `Response` interface(#493).
1516

1617
### Changed
1718

connection.go

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -175,11 +175,12 @@ func (d defaultLogger) Report(event ConnLogKind, conn *Connection, v ...interfac
175175
// More on graceful shutdown:
176176
// https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
177177
type Connection struct {
178-
addr net.Addr
179-
dialer Dialer
180-
c Conn
181-
mutex sync.Mutex
182-
cond *sync.Cond
178+
addr net.Addr
179+
dialer Dialer
180+
c Conn
181+
mutex sync.Mutex
182+
cond *sync.Cond
183+
slicePool *sync.Pool
183184
// schemaResolver contains a SchemaResolver implementation.
184185
schemaResolver SchemaResolver
185186
// requestId contains the last request ID for requests with nil context.
@@ -373,7 +374,12 @@ func Connect(ctx context.Context, dialer Dialer, opts Opts) (conn *Connection, e
373374
}
374375

375376
conn.cond = sync.NewCond(&conn.mutex)
376-
377+
conn.slicePool = &sync.Pool{
378+
New: func() any {
379+
buf := make([]byte, 0, 512)
380+
return &buf
381+
},
382+
}
377383
if conn.opts.Reconnect > 0 {
378384
// We don't need these mutex.Lock()/mutex.Unlock() here, but
379385
// runReconnects() expects mutex.Lock() to be set, so it's
@@ -848,8 +854,9 @@ func (conn *Connection) reader(r io.Reader, c Conn) {
848854

849855
go conn.eventer(events)
850856

857+
buf := smallBuf{}
851858
for atomic.LoadUint32(&conn.state) != connClosed {
852-
respBytes, err := read(r, conn.lenbuf[:])
859+
respBytes, err := read(r, conn.lenbuf[:], conn)
853860
if err != nil {
854861
err = ClientError{
855862
ErrIoError,
@@ -858,7 +865,7 @@ func (conn *Connection) reader(r io.Reader, c Conn) {
858865
conn.reconnect(err, c)
859866
return
860867
}
861-
buf := smallBuf{b: respBytes}
868+
buf.b, buf.p = respBytes, 0
862869
header, code, err := decodeHeader(conn.dec, &buf)
863870
if err != nil {
864871
err = ClientError{
@@ -925,7 +932,7 @@ func (conn *Connection) eventer(events <-chan connWatchEvent) {
925932

926933
func (conn *Connection) newFuture(req Request) (fut *Future) {
927934
ctx := req.Ctx()
928-
fut = NewFuture(req)
935+
fut = NewFuture(req, conn)
929936
if conn.rlimit != nil && conn.opts.RLimitAction == RLimitDrop {
930937
select {
931938
case conn.rlimit <- struct{}{}:
@@ -1187,7 +1194,7 @@ func (conn *Connection) timeouts() {
11871194
}
11881195
}
11891196

1190-
func read(r io.Reader, lenbuf []byte) (response []byte, err error) {
1197+
func read(r io.Reader, lenbuf []byte, conn ...*Connection) (response []byte, err error) {
11911198
var length uint64
11921199

11931200
if _, err = io.ReadFull(r, lenbuf); err != nil {
@@ -1211,7 +1218,15 @@ func read(r io.Reader, lenbuf []byte) (response []byte, err error) {
12111218
return
12121219
}
12131220

1214-
response = make([]byte, length)
1221+
if len(conn) == 0 {
1222+
response = make([]byte, length)
1223+
} else {
1224+
response = *conn[0].slicePool.Get().(*[]byte)
1225+
if cap(response) < int(length) {
1226+
response = make([]byte, length)
1227+
}
1228+
response = response[:length]
1229+
}
12151230
_, err = io.ReadFull(r, response)
12161231

12171232
return
@@ -1232,7 +1247,7 @@ func (conn *Connection) nextRequestId(context bool) (requestId uint32) {
12321247
func (conn *Connection) Do(req Request) *Future {
12331248
if connectedReq, ok := req.(ConnectedRequest); ok {
12341249
if connectedReq.Conn() != conn {
1235-
fut := NewFuture(req)
1250+
fut := NewFuture(req, conn)
12361251
fut.SetError(errUnknownRequest)
12371252
return fut
12381253
}

future.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ type Future struct {
1212
req Request
1313
next *Future
1414
timeout time.Duration
15+
pool *sync.Pool
1516
mutex sync.Mutex
1617
resp Response
1718
err error
@@ -38,10 +39,15 @@ func (fut *Future) isDone() bool {
3839
}
3940

4041
// NewFuture creates a new empty Future for a given Request.
41-
func NewFuture(req Request) (fut *Future) {
42+
func NewFuture(req Request, conn ...*Connection) (fut *Future) {
4243
fut = &Future{}
4344
fut.done = make(chan struct{})
4445
fut.req = req
46+
if len(conn) == 0 {
47+
fut.pool = nil
48+
} else {
49+
fut.pool = conn[0].slicePool
50+
}
4551
return fut
4652
}
4753

@@ -61,6 +67,7 @@ func (fut *Future) SetResponse(header Header, body io.Reader) error {
6167
fut.resp = resp
6268

6369
close(fut.done)
70+
fut.release()
6471
return nil
6572
}
6673

@@ -75,6 +82,7 @@ func (fut *Future) SetError(err error) {
7582
fut.err = err
7683

7784
close(fut.done)
85+
fut.release()
7886
}
7987

8088
// GetResponse waits for Future to be filled and returns Response and error.
@@ -127,3 +135,9 @@ func (fut *Future) WaitChan() <-chan struct{} {
127135
}
128136
return fut.done
129137
}
138+
139+
func (fut *Future) release() {
140+
if fut.pool != nil && fut.resp != nil {
141+
fut.pool.Put(fut.resp.Buffer())
142+
}
143+
}

future_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@ func (resp *futureMockResponse) Header() Header {
5353
return resp.header
5454
}
5555

56+
func (resp *futureMockResponse) Buffer() *[]byte {
57+
return &resp.data
58+
}
59+
5660
func (resp *futureMockResponse) Decode() ([]interface{}, error) {
5761
resp.decodeCnt++
5862

response.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ import (
1212
type Response interface {
1313
// Header returns a response header.
1414
Header() Header
15+
// Buffer returns a response buffer.
16+
Buffer() *[]byte
1517
// Decode decodes a response.
1618
Decode() ([]interface{}, error)
1719
// DecodeTyped decodes a response into a given container res.
@@ -657,6 +659,10 @@ func (resp *baseResponse) Header() Header {
657659
return resp.header
658660
}
659661

662+
func (resp *baseResponse) Buffer() *[]byte {
663+
return &resp.buf.b
664+
}
665+
660666
// Pos returns a position descriptor of the last selected tuple for the SelectResponse.
661667
// If the response was not decoded, this method will call Decode().
662668
func (resp *SelectResponse) Pos() ([]byte, error) {

test_helpers/response.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,10 @@ func (resp *MockResponse) Header() tarantool.Header {
5454
return resp.header
5555
}
5656

57+
func (resp *MockResponse) Buffer() *[]byte {
58+
return &resp.data
59+
}
60+
5761
// Decode returns the result of decoding the response data as slice.
5862
func (resp *MockResponse) Decode() ([]interface{}, error) {
5963
if resp.data == nil {

0 commit comments

Comments
 (0)