Skip to content

Commit 290c61b

Browse files
feat: sync pools added to all internal structs
1 parent d80798a commit 290c61b

File tree

33 files changed

+1622
-389
lines changed

33 files changed

+1622
-389
lines changed

core/providers/anthropic.go

Lines changed: 12 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"io"
1212
"net/http"
1313
"strings"
14-
"sync"
1514
"time"
1615

1716
"github.com/bytedance/sonic"
@@ -31,48 +30,6 @@ type AnthropicProvider struct {
3130
customProviderConfig *schemas.CustomProviderConfig // Custom provider config
3231
}
3332

34-
// anthropicChatResponsePool provides a pool for Anthropic chat response objects.
35-
var anthropicChatResponsePool = sync.Pool{
36-
New: func() interface{} {
37-
return &anthropic.AnthropicMessageResponse{}
38-
},
39-
}
40-
41-
// anthropicTextResponsePool provides a pool for Anthropic text response objects.
42-
var anthropicTextResponsePool = sync.Pool{
43-
New: func() interface{} {
44-
return &anthropic.AnthropicTextResponse{}
45-
},
46-
}
47-
48-
// acquireAnthropicChatResponse gets an Anthropic chat response from the pool and resets it.
49-
func acquireAnthropicChatResponse() *anthropic.AnthropicMessageResponse {
50-
resp := anthropicChatResponsePool.Get().(*anthropic.AnthropicMessageResponse)
51-
*resp = anthropic.AnthropicMessageResponse{} // Reset the struct
52-
return resp
53-
}
54-
55-
// releaseAnthropicChatResponse returns an Anthropic chat response to the pool.
56-
func releaseAnthropicChatResponse(resp *anthropic.AnthropicMessageResponse) {
57-
if resp != nil {
58-
anthropicChatResponsePool.Put(resp)
59-
}
60-
}
61-
62-
// acquireAnthropicTextResponse gets an Anthropic text response from the pool and resets it.
63-
func acquireAnthropicTextResponse() *anthropic.AnthropicTextResponse {
64-
resp := anthropicTextResponsePool.Get().(*anthropic.AnthropicTextResponse)
65-
*resp = anthropic.AnthropicTextResponse{} // Reset the struct
66-
return resp
67-
}
68-
69-
// releaseAnthropicTextResponse returns an Anthropic text response to the pool.
70-
func releaseAnthropicTextResponse(resp *anthropic.AnthropicTextResponse) {
71-
if resp != nil {
72-
anthropicTextResponsePool.Put(resp)
73-
}
74-
}
75-
7633
// NewAnthropicProvider creates a new Anthropic provider instance.
7734
// It initializes the HTTP client with the provided configuration and sets up response pools.
7835
// The client is configured with timeouts, concurrency limits, and optional proxy settings.
@@ -92,8 +49,8 @@ func NewAnthropicProvider(config *schemas.ProviderConfig, logger schemas.Logger)
9249

9350
// Pre-warm response pools
9451
for i := 0; i < config.ConcurrencyAndBufferSize.Concurrency; i++ {
95-
anthropicTextResponsePool.Put(&anthropic.AnthropicTextResponse{})
96-
anthropicChatResponsePool.Put(&anthropic.AnthropicMessageResponse{})
52+
anthropic.ReleaseTextResponse(&anthropic.AnthropicTextResponse{})
53+
anthropic.ReleaseChatResponse(&anthropic.AnthropicMessageResponse{})
9754
}
9855

9956
// Configure proxy if provided
@@ -186,6 +143,7 @@ func (provider *AnthropicProvider) TextCompletion(ctx context.Context, key schem
186143
if reqBody == nil {
187144
return nil, newBifrostOperationError("text completion input is not provided", nil, provider.GetProviderKey())
188145
}
146+
defer anthropic.ReleaseTextRequest(reqBody)
189147

190148
// Use struct directly for JSON marshaling
191149
responseBody, latency, err := provider.completeRequest(ctx, reqBody, provider.networkConfig.BaseURL+"/v1/complete", key.Value)
@@ -194,8 +152,8 @@ func (provider *AnthropicProvider) TextCompletion(ctx context.Context, key schem
194152
}
195153

196154
// Create response object from pool
197-
response := acquireAnthropicTextResponse()
198-
defer releaseAnthropicTextResponse(response)
155+
response := anthropic.AcquireTextResponse()
156+
defer anthropic.ReleaseTextResponse(response)
199157

200158
rawResponse, bifrostErr := handleProviderResponse(responseBody, response, provider.sendBackRawResponse)
201159
if bifrostErr != nil {
@@ -238,6 +196,7 @@ func (provider *AnthropicProvider) ChatCompletion(ctx context.Context, key schem
238196
if reqBody == nil {
239197
return nil, newBifrostOperationError("chat completion input is not provided", nil, provider.GetProviderKey())
240198
}
199+
defer anthropic.ReleaseChatRequest(reqBody)
241200

242201
// Use struct directly for JSON marshaling
243202
responseBody, latency, err := provider.completeRequest(ctx, reqBody, provider.networkConfig.BaseURL+"/v1/messages", key.Value)
@@ -246,8 +205,8 @@ func (provider *AnthropicProvider) ChatCompletion(ctx context.Context, key schem
246205
}
247206

248207
// Create response object from pool
249-
response := acquireAnthropicChatResponse()
250-
defer releaseAnthropicChatResponse(response)
208+
response := anthropic.AcquireChatResponse()
209+
defer anthropic.ReleaseChatResponse(response)
251210

252211
rawResponse, bifrostErr := handleProviderResponse(responseBody, response, provider.sendBackRawResponse)
253212
if bifrostErr != nil {
@@ -284,6 +243,7 @@ func (provider *AnthropicProvider) Responses(ctx context.Context, key schemas.Ke
284243
if reqBody == nil {
285244
return nil, newBifrostOperationError("responses input is not provided", nil, provider.GetProviderKey())
286245
}
246+
defer anthropic.ReleaseChatRequest(reqBody) // ToAnthropicResponsesRequest returns *AnthropicMessageRequest
287247

288248
// Use struct directly for JSON marshaling
289249
responseBody, latency, err := provider.completeRequest(ctx, reqBody, provider.networkConfig.BaseURL+"/v1/messages", key.Value)
@@ -292,8 +252,8 @@ func (provider *AnthropicProvider) Responses(ctx context.Context, key schemas.Ke
292252
}
293253

294254
// Create response object from pool
295-
response := acquireAnthropicChatResponse()
296-
defer releaseAnthropicChatResponse(response)
255+
response := anthropic.AcquireChatResponse()
256+
defer anthropic.ReleaseChatResponse(response)
297257

298258
rawResponse, bifrostErr := handleProviderResponse(responseBody, response, provider.sendBackRawResponse)
299259
if bifrostErr != nil {
@@ -335,6 +295,7 @@ func (provider *AnthropicProvider) ChatCompletionStream(ctx context.Context, pos
335295
if reqBody == nil {
336296
return nil, newBifrostOperationError("failed to convert request", fmt.Errorf("conversion returned nil"), provider.GetProviderKey())
337297
}
298+
defer anthropic.ReleaseChatRequest(reqBody)
338299
reqBody.Stream = schemas.Ptr(true)
339300

340301
// Prepare Anthropic headers

core/providers/azure.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,13 @@ func NewAzureProvider(config *schemas.ProviderConfig, logger schemas.Logger) (*A
4343
Timeout: time.Second * time.Duration(config.NetworkConfig.DefaultRequestTimeoutInSeconds),
4444
}
4545

46+
// Pre-warm OpenAI pools since Azure uses OpenAI schema objects
47+
for i := 0; i < config.ConcurrencyAndBufferSize.Concurrency; i++ {
48+
openai.ReleaseTextRequest(&openai.OpenAITextCompletionRequest{})
49+
openai.ReleaseChatRequest(&openai.OpenAIChatRequest{})
50+
openai.ReleaseEmbeddingRequest(&openai.OpenAIEmbeddingRequest{})
51+
}
52+
4653
// Configure proxy if provided
4754
client = configureProxy(client, config.ProxyConfig, logger)
4855

@@ -151,6 +158,7 @@ func (provider *AzureProvider) TextCompletion(ctx context.Context, key schemas.K
151158
if reqBody == nil {
152159
return nil, newBifrostOperationError("text completion input is not provided", nil, schemas.Azure)
153160
}
161+
defer openai.ReleaseTextRequest(reqBody)
154162

155163
responseBody, latency, err := provider.completeRequest(ctx, reqBody, "completions", key, request.Model)
156164
if err != nil {
@@ -241,6 +249,7 @@ func (provider *AzureProvider) ChatCompletion(ctx context.Context, key schemas.K
241249
if reqBody == nil {
242250
return nil, newBifrostOperationError("chat completion input is not provided", nil, schemas.Azure)
243251
}
252+
defer openai.ReleaseChatRequest(reqBody)
244253

245254
responseBody, latency, err := provider.completeRequest(ctx, reqBody, "chat/completions", key, request.Model)
246255
if err != nil {
@@ -297,6 +306,7 @@ func (provider *AzureProvider) Embedding(ctx context.Context, key schemas.Key, r
297306
if reqBody == nil {
298307
return nil, newBifrostOperationError("embedding input is not provided", nil, schemas.Azure)
299308
}
309+
defer openai.ReleaseEmbeddingRequest(reqBody)
300310

301311
responseBody, latency, err := provider.completeRequest(ctx, reqBody, "embeddings", key, request.Model)
302312
if err != nil {

core/providers/cohere.go

Lines changed: 7 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"fmt"
1111
"io"
1212
"strings"
13-
"sync"
1413
"time"
1514

1615
"net/http"
@@ -21,26 +20,7 @@ import (
2120
"github.com/valyala/fasthttp"
2221
)
2322

24-
// cohereResponsePool provides a pool for Cohere v2 response objects.
25-
var cohereResponsePool = sync.Pool{
26-
New: func() interface{} {
27-
return &cohere.CohereChatResponse{}
28-
},
29-
}
30-
31-
// acquireCohereResponse gets a Cohere v2 response from the pool and resets it.
32-
func acquireCohereResponse() *cohere.CohereChatResponse {
33-
resp := cohereResponsePool.Get().(*cohere.CohereChatResponse)
34-
*resp = cohere.CohereChatResponse{} // Reset the struct
35-
return resp
36-
}
37-
38-
// releaseCohereResponse returns a Cohere v2 response to the pool.
39-
func releaseCohereResponse(resp *cohere.CohereChatResponse) {
40-
if resp != nil {
41-
cohereResponsePool.Put(resp)
42-
}
43-
}
23+
// Removed deprecated cohereResponsePool - now using schema-level pools
4424

4525
// CohereProvider implements the Provider interface for Cohere.
4626
type CohereProvider struct {
@@ -71,7 +51,8 @@ func NewCohereProvider(config *schemas.ProviderConfig, logger schemas.Logger) *C
7151

7252
// Pre-warm request pools
7353
for i := 0; i < config.ConcurrencyAndBufferSize.Concurrency; i++ {
74-
cohereResponsePool.Put(&cohere.CohereChatResponse{})
54+
cohere.ReleaseChatRequest(&cohere.CohereChatRequest{})
55+
cohere.ReleaseEmbeddingRequest(&cohere.CohereEmbeddingRequest{})
7556
}
7657

7758
// Set default BaseURL if not provided
@@ -124,6 +105,7 @@ func (provider *CohereProvider) ChatCompletion(ctx context.Context, key schemas.
124105
if reqBody == nil {
125106
return nil, newBifrostOperationError("chat completion input is not provided", nil, providerName)
126107
}
108+
defer cohere.ReleaseChatRequest(reqBody)
127109

128110
cohereResponse, rawResponse, latency, err := provider.handleCohereChatCompletionRequest(ctx, reqBody, key)
129111
if err != nil {
@@ -236,6 +218,7 @@ func (provider *CohereProvider) Responses(ctx context.Context, key schemas.Key,
236218
if reqBody == nil {
237219
return nil, newBifrostOperationError("responses input is not provided", nil, providerName)
238220
}
221+
defer cohere.ReleaseChatRequest(reqBody) // ToCohereResponsesRequest returns *CohereChatRequest
239222

240223
cohereResponse, rawResponse, latency, err := provider.handleCohereChatCompletionRequest(ctx, reqBody, key)
241224
if err != nil {
@@ -273,6 +256,7 @@ func (provider *CohereProvider) Embedding(ctx context.Context, key schemas.Key,
273256
if reqBody == nil {
274257
return nil, newBifrostOperationError("embedding input is not provided", nil, providerName)
275258
}
259+
defer cohere.ReleaseEmbeddingRequest(reqBody)
276260

277261
// Marshal request body
278262
jsonBody, err := sonic.Marshal(reqBody)
@@ -357,6 +341,7 @@ func (provider *CohereProvider) ChatCompletionStream(ctx context.Context, postHo
357341
if reqBody == nil {
358342
return nil, newBifrostOperationError("chat completion input is not provided", nil, providerName)
359343
}
344+
defer cohere.ReleaseChatRequest(reqBody)
360345
reqBody.Stream = schemas.Ptr(true)
361346

362347
jsonBody, err := sonic.Marshal(reqBody)

core/providers/openai.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ func handleOpenAITextCompletionRequest(
111111
if reqBody == nil {
112112
return nil, newBifrostOperationError("text completion input is not provided", nil, providerName)
113113
}
114+
defer openai.ReleaseTextRequest(reqBody)
114115
// Create request
115116
req := fasthttp.AcquireRequest()
116117
resp := fasthttp.AcquireResponse()
@@ -207,6 +208,7 @@ func handleOpenAITextCompletionStreaming(
207208
if reqBody == nil {
208209
return nil, newBifrostOperationError("text completion input is not provided", nil, providerName)
209210
}
211+
defer openai.ReleaseTextRequest(reqBody)
210212
reqBody.Stream = schemas.Ptr(true)
211213
reqBody.StreamOptions = &schemas.ChatStreamOptions{
212214
IncludeUsage: schemas.Ptr(true),
@@ -437,6 +439,7 @@ func handleOpenAIChatCompletionRequest(
437439
if reqBody == nil {
438440
return nil, newBifrostOperationError("chat completion input is not provided", nil, providerName)
439441
}
442+
defer openai.ReleaseChatRequest(reqBody)
440443

441444
jsonBody, err := sonic.Marshal(reqBody)
442445
if err != nil {
@@ -532,6 +535,7 @@ func handleOpenAIResponsesRequest(
532535
if reqBody == nil {
533536
return nil, newBifrostOperationError("responses input is not provided", nil, providerName)
534537
}
538+
defer openai.ReleaseResponsesRequest(reqBody)
535539

536540
jsonBody, err := sonic.Marshal(reqBody)
537541
if err != nil {
@@ -633,6 +637,7 @@ func handleOpenAIEmbeddingRequest(
633637
if reqBody == nil {
634638
return nil, newBifrostOperationError("embedding input is not provided", nil, providerName)
635639
}
640+
defer openai.ReleaseEmbeddingRequest(reqBody)
636641

637642
jsonBody, err := sonic.Marshal(reqBody)
638643
if err != nil {
@@ -730,6 +735,7 @@ func handleOpenAIStreaming(
730735
if reqBody == nil {
731736
return nil, newBifrostOperationError("chat completion input is not provided", nil, providerName)
732737
}
738+
defer openai.ReleaseChatRequest(reqBody)
733739
reqBody.Stream = schemas.Ptr(true)
734740
reqBody.StreamOptions = &schemas.ChatStreamOptions{
735741
IncludeUsage: schemas.Ptr(true),
@@ -950,6 +956,7 @@ func (provider *OpenAIProvider) Speech(ctx context.Context, key schemas.Key, req
950956
if reqBody == nil {
951957
return nil, newBifrostOperationError("speech input is not provided", nil, providerName)
952958
}
959+
defer openai.ReleaseSpeechRequest(reqBody)
953960

954961
jsonBody, err := sonic.Marshal(reqBody)
955962
if err != nil {
@@ -1022,6 +1029,7 @@ func (provider *OpenAIProvider) SpeechStream(ctx context.Context, postHookRunner
10221029
if reqBody == nil {
10231030
return nil, newBifrostOperationError("speech input is not provided", nil, providerName)
10241031
}
1032+
defer openai.ReleaseSpeechRequest(reqBody)
10251033
reqBody.StreamFormat = schemas.Ptr("sse")
10261034

10271035
jsonBody, err := sonic.Marshal(reqBody)
@@ -1202,6 +1210,7 @@ func (provider *OpenAIProvider) Transcription(ctx context.Context, key schemas.K
12021210
if reqBody == nil {
12031211
return nil, newBifrostOperationError("transcription input is not provided", nil, providerName)
12041212
}
1213+
defer openai.ReleaseTranscriptionRequest(reqBody)
12051214

12061215
// Create multipart form
12071216
var body bytes.Buffer
@@ -1289,6 +1298,7 @@ func (provider *OpenAIProvider) TranscriptionStream(ctx context.Context, postHoo
12891298
if reqBody == nil {
12901299
return nil, newBifrostOperationError("transcription input is not provided", nil, providerName)
12911300
}
1301+
defer openai.ReleaseTranscriptionRequest(reqBody)
12921302
reqBody.Stream = schemas.Ptr(true)
12931303

12941304
// Create multipart form

core/providers/vertex.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,8 @@ func NewVertexProvider(config *schemas.ProviderConfig, logger schemas.Logger) (*
7070

7171
// Pre-warm embedding request and response pools
7272
for range config.ConcurrencyAndBufferSize.Concurrency {
73-
// openAIResponsePool.Put(&schemas.BifrostResponse{})
74-
anthropicChatResponsePool.Put(&anthropic.AnthropicMessageResponse{})
75-
73+
vertex.ReleaseEmbeddingRequest(&vertex.VertexEmbeddingRequest{})
74+
vertex.ReleaseEmbeddingResponse(&vertex.VertexEmbeddingResponse{})
7675
}
7776

7877
return &VertexProvider{
@@ -222,7 +221,7 @@ func (provider *VertexProvider) ChatCompletion(ctx context.Context, key schemas.
222221
},
223222
}
224223
}
225-
if errors.Is(err, fasthttp.ErrTimeout) || errors.Is(err, context.DeadlineExceeded) {
224+
if errors.Is(err, fasthttp.ErrTimeout) || errors.Is(err, context.DeadlineExceeded) {
226225
return nil, newBifrostOperationError(schemas.ErrProviderRequestTimedOut, err, schemas.Vertex)
227226
}
228227
return nil, &schemas.BifrostError{
@@ -259,7 +258,7 @@ func (provider *VertexProvider) ChatCompletion(ctx context.Context, key schemas.
259258
},
260259
}
261260
}
262-
if errors.Is(err, fasthttp.ErrTimeout) || errors.Is(err, context.DeadlineExceeded) {
261+
if errors.Is(err, fasthttp.ErrTimeout) || errors.Is(err, context.DeadlineExceeded) {
263262
return nil, newBifrostOperationError(schemas.ErrProviderRequestTimedOut, err, schemas.Vertex)
264263
}
265264
// Remove client from pool for non-context errors (could be auth/network issues)
@@ -300,8 +299,8 @@ func (provider *VertexProvider) ChatCompletion(ctx context.Context, key schemas.
300299

301300
if strings.Contains(request.Model, "claude") {
302301
// Create response object from pool
303-
response := acquireAnthropicChatResponse()
304-
defer releaseAnthropicChatResponse(response)
302+
response := anthropic.AcquireChatResponse()
303+
defer anthropic.ReleaseChatResponse(response)
305304

306305
rawResponse, bifrostErr := handleProviderResponse(body, response, provider.sendBackRawResponse)
307306
if bifrostErr != nil {
@@ -383,6 +382,7 @@ func (provider *VertexProvider) Embedding(ctx context.Context, key schemas.Key,
383382
if reqBody == nil {
384383
return nil, newConfigurationError("embedding input texts are empty", schemas.Vertex)
385384
}
385+
defer vertex.ReleaseEmbeddingRequest(reqBody)
386386

387387
// All Vertex AI embedding models use the same native Vertex embedding API
388388
return provider.handleVertexEmbedding(ctx, request.Model, key, reqBody, request.Params)
@@ -414,7 +414,7 @@ func (provider *VertexProvider) handleVertexEmbedding(ctx context.Context, model
414414
},
415415
}
416416
}
417-
if errors.Is(err, fasthttp.ErrTimeout) || errors.Is(err, context.DeadlineExceeded) {
417+
if errors.Is(err, fasthttp.ErrTimeout) || errors.Is(err, context.DeadlineExceeded) {
418418
return nil, newBifrostOperationError(schemas.ErrProviderRequestTimedOut, err, schemas.Vertex)
419419
}
420420
return nil, newBifrostOperationError(schemas.ErrProviderRequest, err, schemas.Vertex)
@@ -445,7 +445,7 @@ func (provider *VertexProvider) handleVertexEmbedding(ctx context.Context, model
445445
},
446446
}
447447
}
448-
if errors.Is(err, fasthttp.ErrTimeout) || errors.Is(err, context.DeadlineExceeded) {
448+
if errors.Is(err, fasthttp.ErrTimeout) || errors.Is(err, context.DeadlineExceeded) {
449449
return nil, newBifrostOperationError(schemas.ErrProviderRequestTimedOut, err, schemas.Vertex)
450450
}
451451
// Remove client from pool for non-context errors (could be auth/network issues)

core/schemas/pool.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
package schemas

0 commit comments

Comments
 (0)