Skip to content

Commit 0ed8c8e

Browse files
feat: sync pools added to all internal structs
1 parent 5a18bbc commit 0ed8c8e

File tree

33 files changed

+1624
-389
lines changed

33 files changed

+1624
-389
lines changed

core/providers/anthropic.go

Lines changed: 12 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"io"
1212
"net/http"
1313
"strings"
14-
"sync"
1514
"time"
1615

1716
"github.com/bytedance/sonic"
@@ -31,48 +30,6 @@ type AnthropicProvider struct {
3130
customProviderConfig *schemas.CustomProviderConfig // Custom provider config
3231
}
3332

34-
// anthropicChatResponsePool provides a pool for Anthropic chat response objects.
35-
var anthropicChatResponsePool = sync.Pool{
36-
New: func() interface{} {
37-
return &anthropic.AnthropicMessageResponse{}
38-
},
39-
}
40-
41-
// anthropicTextResponsePool provides a pool for Anthropic text response objects.
42-
var anthropicTextResponsePool = sync.Pool{
43-
New: func() interface{} {
44-
return &anthropic.AnthropicTextResponse{}
45-
},
46-
}
47-
48-
// acquireAnthropicChatResponse gets an Anthropic chat response from the pool and resets it.
49-
func acquireAnthropicChatResponse() *anthropic.AnthropicMessageResponse {
50-
resp := anthropicChatResponsePool.Get().(*anthropic.AnthropicMessageResponse)
51-
*resp = anthropic.AnthropicMessageResponse{} // Reset the struct
52-
return resp
53-
}
54-
55-
// releaseAnthropicChatResponse returns an Anthropic chat response to the pool.
56-
func releaseAnthropicChatResponse(resp *anthropic.AnthropicMessageResponse) {
57-
if resp != nil {
58-
anthropicChatResponsePool.Put(resp)
59-
}
60-
}
61-
62-
// acquireAnthropicTextResponse gets an Anthropic text response from the pool and resets it.
63-
func acquireAnthropicTextResponse() *anthropic.AnthropicTextResponse {
64-
resp := anthropicTextResponsePool.Get().(*anthropic.AnthropicTextResponse)
65-
*resp = anthropic.AnthropicTextResponse{} // Reset the struct
66-
return resp
67-
}
68-
69-
// releaseAnthropicTextResponse returns an Anthropic text response to the pool.
70-
func releaseAnthropicTextResponse(resp *anthropic.AnthropicTextResponse) {
71-
if resp != nil {
72-
anthropicTextResponsePool.Put(resp)
73-
}
74-
}
75-
7633
// NewAnthropicProvider creates a new Anthropic provider instance.
7734
// It initializes the HTTP client with the provided configuration and sets up response pools.
7835
// The client is configured with timeouts, concurrency limits, and optional proxy settings.
@@ -92,8 +49,8 @@ func NewAnthropicProvider(config *schemas.ProviderConfig, logger schemas.Logger)
9249

9350
// Pre-warm response pools
9451
for i := 0; i < config.ConcurrencyAndBufferSize.Concurrency; i++ {
95-
anthropicTextResponsePool.Put(&anthropic.AnthropicTextResponse{})
96-
anthropicChatResponsePool.Put(&anthropic.AnthropicMessageResponse{})
52+
anthropic.ReleaseTextResponse(&anthropic.AnthropicTextResponse{})
53+
anthropic.ReleaseChatResponse(&anthropic.AnthropicMessageResponse{})
9754
}
9855

9956
// Configure proxy if provided
@@ -186,6 +143,7 @@ func (provider *AnthropicProvider) TextCompletion(ctx context.Context, key schem
186143
if reqBody == nil {
187144
return nil, newBifrostOperationError("text completion input is not provided", nil, provider.GetProviderKey())
188145
}
146+
defer anthropic.ReleaseTextRequest(reqBody)
189147

190148
// Use struct directly for JSON marshaling
191149
responseBody, err := provider.completeRequest(ctx, reqBody, provider.networkConfig.BaseURL+"/v1/complete", key.Value)
@@ -194,8 +152,8 @@ func (provider *AnthropicProvider) TextCompletion(ctx context.Context, key schem
194152
}
195153

196154
// Create response object from pool
197-
response := acquireAnthropicTextResponse()
198-
defer releaseAnthropicTextResponse(response)
155+
response := anthropic.AcquireTextResponse()
156+
defer anthropic.ReleaseTextResponse(response)
199157

200158
rawResponse, bifrostErr := handleProviderResponse(responseBody, response, provider.sendBackRawResponse)
201159
if bifrostErr != nil {
@@ -237,6 +195,7 @@ func (provider *AnthropicProvider) ChatCompletion(ctx context.Context, key schem
237195
if reqBody == nil {
238196
return nil, newBifrostOperationError("chat completion input is not provided", nil, provider.GetProviderKey())
239197
}
198+
defer anthropic.ReleaseChatRequest(reqBody)
240199

241200
// Use struct directly for JSON marshaling
242201
responseBody, err := provider.completeRequest(ctx, reqBody, provider.networkConfig.BaseURL+"/v1/messages", key.Value)
@@ -245,8 +204,8 @@ func (provider *AnthropicProvider) ChatCompletion(ctx context.Context, key schem
245204
}
246205

247206
// Create response object from pool
248-
response := acquireAnthropicChatResponse()
249-
defer releaseAnthropicChatResponse(response)
207+
response := anthropic.AcquireChatResponse()
208+
defer anthropic.ReleaseChatResponse(response)
250209

251210
rawResponse, bifrostErr := handleProviderResponse(responseBody, response, provider.sendBackRawResponse)
252211
if bifrostErr != nil {
@@ -282,6 +241,7 @@ func (provider *AnthropicProvider) Responses(ctx context.Context, key schemas.Ke
282241
if reqBody == nil {
283242
return nil, newBifrostOperationError("responses input is not provided", nil, provider.GetProviderKey())
284243
}
244+
defer anthropic.ReleaseChatRequest(reqBody) // ToAnthropicResponsesRequest returns *AnthropicMessageRequest
285245

286246
// Use struct directly for JSON marshaling
287247
responseBody, err := provider.completeRequest(ctx, reqBody, provider.networkConfig.BaseURL+"/v1/messages", key.Value)
@@ -290,8 +250,8 @@ func (provider *AnthropicProvider) Responses(ctx context.Context, key schemas.Ke
290250
}
291251

292252
// Create response object from pool
293-
response := acquireAnthropicChatResponse()
294-
defer releaseAnthropicChatResponse(response)
253+
response := anthropic.AcquireChatResponse()
254+
defer anthropic.ReleaseChatResponse(response)
295255

296256
rawResponse, bifrostErr := handleProviderResponse(responseBody, response, provider.sendBackRawResponse)
297257
if bifrostErr != nil {
@@ -332,6 +292,7 @@ func (provider *AnthropicProvider) ChatCompletionStream(ctx context.Context, pos
332292
if reqBody == nil {
333293
return nil, newBifrostOperationError("failed to convert request", fmt.Errorf("conversion returned nil"), provider.GetProviderKey())
334294
}
295+
defer anthropic.ReleaseChatRequest(reqBody)
335296
reqBody.Stream = schemas.Ptr(true)
336297

337298
// Prepare Anthropic headers

core/providers/azure.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,13 @@ func NewAzureProvider(config *schemas.ProviderConfig, logger schemas.Logger) (*A
4343
Timeout: time.Second * time.Duration(config.NetworkConfig.DefaultRequestTimeoutInSeconds),
4444
}
4545

46+
// Pre-warm OpenAI pools since Azure uses OpenAI schema objects
47+
for i := 0; i < config.ConcurrencyAndBufferSize.Concurrency; i++ {
48+
openai.ReleaseTextRequest(&openai.OpenAITextCompletionRequest{})
49+
openai.ReleaseChatRequest(&openai.OpenAIChatRequest{})
50+
openai.ReleaseEmbeddingRequest(&openai.OpenAIEmbeddingRequest{})
51+
}
52+
4653
// Configure proxy if provided
4754
client = configureProxy(client, config.ProxyConfig, logger)
4855

@@ -151,6 +158,7 @@ func (provider *AzureProvider) TextCompletion(ctx context.Context, key schemas.K
151158
if reqBody == nil {
152159
return nil, newBifrostOperationError("text completion input is not provided", nil, schemas.Azure)
153160
}
161+
defer openai.ReleaseTextRequest(reqBody)
154162

155163
responseBody, err := provider.completeRequest(ctx, reqBody, "completions", key, request.Model)
156164
if err != nil {
@@ -240,6 +248,7 @@ func (provider *AzureProvider) ChatCompletion(ctx context.Context, key schemas.K
240248
if reqBody == nil {
241249
return nil, newBifrostOperationError("chat completion input is not provided", nil, schemas.Azure)
242250
}
251+
defer openai.ReleaseChatRequest(reqBody)
243252

244253
responseBody, err := provider.completeRequest(ctx, reqBody, "chat/completions", key, request.Model)
245254
if err != nil {
@@ -296,6 +305,7 @@ func (provider *AzureProvider) Embedding(ctx context.Context, key schemas.Key, r
296305
if reqBody == nil {
297306
return nil, newBifrostOperationError("embedding input is not provided", nil, schemas.Azure)
298307
}
308+
defer openai.ReleaseEmbeddingRequest(reqBody)
299309

300310
responseBody, err := provider.completeRequest(ctx, reqBody, "embeddings", key, request.Model)
301311
if err != nil {

core/providers/cohere.go

Lines changed: 7 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"fmt"
1111
"io"
1212
"strings"
13-
"sync"
1413
"time"
1514

1615
"net/http"
@@ -21,26 +20,7 @@ import (
2120
"github.com/valyala/fasthttp"
2221
)
2322

24-
// cohereResponsePool provides a pool for Cohere v2 response objects.
25-
var cohereResponsePool = sync.Pool{
26-
New: func() interface{} {
27-
return &cohere.CohereChatResponse{}
28-
},
29-
}
30-
31-
// acquireCohereResponse gets a Cohere v2 response from the pool and resets it.
32-
func acquireCohereResponse() *cohere.CohereChatResponse {
33-
resp := cohereResponsePool.Get().(*cohere.CohereChatResponse)
34-
*resp = cohere.CohereChatResponse{} // Reset the struct
35-
return resp
36-
}
37-
38-
// releaseCohereResponse returns a Cohere v2 response to the pool.
39-
func releaseCohereResponse(resp *cohere.CohereChatResponse) {
40-
if resp != nil {
41-
cohereResponsePool.Put(resp)
42-
}
43-
}
23+
// Removed deprecated cohereResponsePool - now using schema-level pools
4424

4525
// CohereProvider implements the Provider interface for Cohere.
4626
type CohereProvider struct {
@@ -71,7 +51,8 @@ func NewCohereProvider(config *schemas.ProviderConfig, logger schemas.Logger) *C
7151

7252
// Pre-warm request pools (chat and embedding)
7353
for i := 0; i < config.ConcurrencyAndBufferSize.Concurrency; i++ {
74-
cohereResponsePool.Put(&cohere.CohereChatResponse{})
54+
cohere.ReleaseChatRequest(&cohere.CohereChatRequest{})
55+
cohere.ReleaseEmbeddingRequest(&cohere.CohereEmbeddingRequest{})
7556
}
7657

7758
// Set default BaseURL if not provided
@@ -124,6 +105,7 @@ func (provider *CohereProvider) ChatCompletion(ctx context.Context, key schemas.
124105
if reqBody == nil {
125106
return nil, newBifrostOperationError("chat completion input is not provided", nil, providerName)
126107
}
108+
defer cohere.ReleaseChatRequest(reqBody)
127109

128110
cohereResponse, rawResponse, err := provider.handleCohereChatCompletionRequest(ctx, reqBody, key)
129111
if err != nil {
@@ -235,6 +217,7 @@ func (provider *CohereProvider) Responses(ctx context.Context, key schemas.Key,
235217
if reqBody == nil {
236218
return nil, newBifrostOperationError("responses input is not provided", nil, providerName)
237219
}
220+
defer cohere.ReleaseChatRequest(reqBody) // ToCohereResponsesRequest returns *CohereChatRequest
238221

239222
cohereResponse, rawResponse, err := provider.handleCohereChatCompletionRequest(ctx, reqBody, key)
240223
if err != nil {
@@ -271,6 +254,7 @@ func (provider *CohereProvider) Embedding(ctx context.Context, key schemas.Key,
271254
if reqBody == nil {
272255
return nil, newBifrostOperationError("embedding input is not provided", nil, providerName)
273256
}
257+
defer cohere.ReleaseEmbeddingRequest(reqBody)
274258

275259
// Marshal request body
276260
jsonBody, err := sonic.Marshal(reqBody)
@@ -353,6 +337,7 @@ func (provider *CohereProvider) ChatCompletionStream(ctx context.Context, postHo
353337
if reqBody == nil {
354338
return nil, newBifrostOperationError("chat completion input is not provided", nil, providerName)
355339
}
340+
defer cohere.ReleaseChatRequest(reqBody)
356341
reqBody.Stream = schemas.Ptr(true)
357342

358343
jsonBody, err := sonic.Marshal(reqBody)

core/providers/openai.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ func handleOpenAITextCompletionRequest(
111111
if reqBody == nil {
112112
return nil, newBifrostOperationError("text completion input is not provided", nil, providerName)
113113
}
114+
defer openai.ReleaseTextRequest(reqBody)
114115
// Create request
115116
req := fasthttp.AcquireRequest()
116117
resp := fasthttp.AcquireResponse()
@@ -206,6 +207,7 @@ func handleOpenAITextCompletionStreaming(
206207
if reqBody == nil {
207208
return nil, newBifrostOperationError("text completion input is not provided", nil, providerName)
208209
}
210+
defer openai.ReleaseTextRequest(reqBody)
209211
reqBody.Stream = schemas.Ptr(true)
210212
reqBody.StreamOptions = &schemas.ChatStreamOptions{
211213
IncludeUsage: schemas.Ptr(true),
@@ -436,6 +438,7 @@ func handleOpenAIChatCompletionRequest(
436438
if reqBody == nil {
437439
return nil, newBifrostOperationError("chat completion input is not provided", nil, providerName)
438440
}
441+
defer openai.ReleaseChatRequest(reqBody)
439442

440443
jsonBody, err := sonic.Marshal(reqBody)
441444
if err != nil {
@@ -530,6 +533,7 @@ func handleOpenAIResponsesRequest(
530533
if reqBody == nil {
531534
return nil, newBifrostOperationError("responses input is not provided", nil, providerName)
532535
}
536+
defer openai.ReleaseResponsesRequest(reqBody)
533537

534538
jsonBody, err := sonic.Marshal(reqBody)
535539
if err != nil {
@@ -630,6 +634,7 @@ func handleOpenAIEmbeddingRequest(
630634
if reqBody == nil {
631635
return nil, newBifrostOperationError("embedding input is not provided", nil, providerName)
632636
}
637+
defer openai.ReleaseEmbeddingRequest(reqBody)
633638

634639
jsonBody, err := sonic.Marshal(reqBody)
635640
if err != nil {
@@ -726,6 +731,7 @@ func handleOpenAIStreaming(
726731
if reqBody == nil {
727732
return nil, newBifrostOperationError("chat completion input is not provided", nil, providerName)
728733
}
734+
defer openai.ReleaseChatRequest(reqBody)
729735
reqBody.Stream = schemas.Ptr(true)
730736
reqBody.StreamOptions = &schemas.ChatStreamOptions{
731737
IncludeUsage: schemas.Ptr(true),
@@ -946,6 +952,7 @@ func (provider *OpenAIProvider) Speech(ctx context.Context, key schemas.Key, req
946952
if reqBody == nil {
947953
return nil, newBifrostOperationError("speech input is not provided", nil, providerName)
948954
}
955+
defer openai.ReleaseSpeechRequest(reqBody)
949956

950957
jsonBody, err := sonic.Marshal(reqBody)
951958
if err != nil {
@@ -1017,6 +1024,7 @@ func (provider *OpenAIProvider) SpeechStream(ctx context.Context, postHookRunner
10171024
if reqBody == nil {
10181025
return nil, newBifrostOperationError("speech input is not provided", nil, providerName)
10191026
}
1027+
defer openai.ReleaseSpeechRequest(reqBody)
10201028
reqBody.StreamFormat = schemas.Ptr("sse")
10211029

10221030
jsonBody, err := sonic.Marshal(reqBody)
@@ -1197,6 +1205,7 @@ func (provider *OpenAIProvider) Transcription(ctx context.Context, key schemas.K
11971205
if reqBody == nil {
11981206
return nil, newBifrostOperationError("transcription input is not provided", nil, providerName)
11991207
}
1208+
defer openai.ReleaseTranscriptionRequest(reqBody)
12001209

12011210
// Create multipart form
12021211
var body bytes.Buffer
@@ -1283,6 +1292,7 @@ func (provider *OpenAIProvider) TranscriptionStream(ctx context.Context, postHoo
12831292
if reqBody == nil {
12841293
return nil, newBifrostOperationError("transcription input is not provided", nil, providerName)
12851294
}
1295+
defer openai.ReleaseTranscriptionRequest(reqBody)
12861296
reqBody.Stream = schemas.Ptr(true)
12871297

12881298
// Create multipart form

core/providers/vertex.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,8 @@ func NewVertexProvider(config *schemas.ProviderConfig, logger schemas.Logger) (*
7070

7171
// Pre-warm embedding request and response pools
7272
for range config.ConcurrencyAndBufferSize.Concurrency {
73-
// openAIResponsePool.Put(&schemas.BifrostResponse{})
74-
anthropicChatResponsePool.Put(&anthropic.AnthropicMessageResponse{})
75-
73+
vertex.ReleaseEmbeddingRequest(&vertex.VertexEmbeddingRequest{})
74+
vertex.ReleaseEmbeddingResponse(&vertex.VertexEmbeddingResponse{})
7675
}
7776

7877
return &VertexProvider{
@@ -222,7 +221,7 @@ func (provider *VertexProvider) ChatCompletion(ctx context.Context, key schemas.
222221
},
223222
}
224223
}
225-
if errors.Is(err, fasthttp.ErrTimeout) || errors.Is(err, context.DeadlineExceeded) {
224+
if errors.Is(err, fasthttp.ErrTimeout) || errors.Is(err, context.DeadlineExceeded) {
226225
return nil, newBifrostOperationError(schemas.ErrProviderRequestTimedOut, err, schemas.Vertex)
227226
}
228227
return nil, &schemas.BifrostError{
@@ -259,7 +258,7 @@ func (provider *VertexProvider) ChatCompletion(ctx context.Context, key schemas.
259258
},
260259
}
261260
}
262-
if errors.Is(err, fasthttp.ErrTimeout) || errors.Is(err, context.DeadlineExceeded) {
261+
if errors.Is(err, fasthttp.ErrTimeout) || errors.Is(err, context.DeadlineExceeded) {
263262
return nil, newBifrostOperationError(schemas.ErrProviderRequestTimedOut, err, schemas.Vertex)
264263
}
265264
// Remove client from pool for non-context errors (could be auth/network issues)
@@ -300,8 +299,8 @@ func (provider *VertexProvider) ChatCompletion(ctx context.Context, key schemas.
300299

301300
if strings.Contains(request.Model, "claude") {
302301
// Create response object from pool
303-
response := acquireAnthropicChatResponse()
304-
defer releaseAnthropicChatResponse(response)
302+
response := anthropic.AcquireChatResponse()
303+
defer anthropic.ReleaseChatResponse(response)
305304

306305
rawResponse, bifrostErr := handleProviderResponse(body, response, provider.sendBackRawResponse)
307306
if bifrostErr != nil {
@@ -383,6 +382,7 @@ func (provider *VertexProvider) Embedding(ctx context.Context, key schemas.Key,
383382
if reqBody == nil {
384383
return nil, newConfigurationError("embedding input texts are empty", schemas.Vertex)
385384
}
385+
defer vertex.ReleaseEmbeddingRequest(reqBody)
386386

387387
// All Vertex AI embedding models use the same native Vertex embedding API
388388
return provider.handleVertexEmbedding(ctx, request.Model, key, reqBody, request.Params)
@@ -414,7 +414,7 @@ func (provider *VertexProvider) handleVertexEmbedding(ctx context.Context, model
414414
},
415415
}
416416
}
417-
if errors.Is(err, fasthttp.ErrTimeout) || errors.Is(err, context.DeadlineExceeded) {
417+
if errors.Is(err, fasthttp.ErrTimeout) || errors.Is(err, context.DeadlineExceeded) {
418418
return nil, newBifrostOperationError(schemas.ErrProviderRequestTimedOut, err, schemas.Vertex)
419419
}
420420
return nil, newBifrostOperationError(schemas.ErrProviderRequest, err, schemas.Vertex)
@@ -445,7 +445,7 @@ func (provider *VertexProvider) handleVertexEmbedding(ctx context.Context, model
445445
},
446446
}
447447
}
448-
if errors.Is(err, fasthttp.ErrTimeout) || errors.Is(err, context.DeadlineExceeded) {
448+
if errors.Is(err, fasthttp.ErrTimeout) || errors.Is(err, context.DeadlineExceeded) {
449449
return nil, newBifrostOperationError(schemas.ErrProviderRequestTimedOut, err, schemas.Vertex)
450450
}
451451
// Remove client from pool for non-context errors (could be auth/network issues)

core/schemas/pool.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
package schemas

0 commit comments

Comments
 (0)