Skip to content

Commit 8b5d7e8

Browse files
authored
Disable go-metrics for Kafka to prevent memory leak (#51)
1 parent f61ec91 commit 8b5d7e8

File tree

4 files changed

+20
-0
lines changed

4 files changed

+20
-0
lines changed

util/kafka/kafka_consumer.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -416,6 +416,10 @@ func NewKafkaConsumerGroup(cfg KafkaConsumerConfig) (*KafkaConsumerGroup, error)
416416
config := sarama.NewConfig()
417417
config.Consumer.Return.Errors = true
418418

419+
// Disable go-metrics to prevent memory leak from exponential decay sample heap
420+
// See: https://github.com/IBM/sarama/issues/1321
421+
config.MetricRegistry = nil
422+
419423
// Enable Sarama debug logging for consumer diagnostics (only if configured)
420424
// By default, SARAMA logs are too verbose and not needed in production
421425
if cfg.EnableDebugLogging {

util/kafka/kafka_health.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,10 @@ func HealthChecker(_ context.Context, brokersURL []string) func(ctx context.Cont
5252
config.Net.ReadTimeout = 100 * time.Millisecond
5353
config.Net.WriteTimeout = 100 * time.Millisecond
5454
config.Metadata.Retry.Max = 0
55+
56+
// Disable go-metrics to prevent memory leak from exponential decay sample heap
57+
// See: https://github.com/IBM/sarama/issues/1321
58+
config.MetricRegistry = nil
5559
config.Metadata.Full = true
5660
config.Metadata.AllowAutoTopicCreation = false
5761

util/kafka/kafka_producer.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,10 @@ func NewKafkaProducer(kafkaURL *url.URL, kafkaSettings *settings.KafkaSettings)
119119
config := sarama.NewConfig()
120120
config.Version = sarama.V2_1_0_0
121121

122+
// Disable go-metrics to prevent memory leak from exponential decay sample heap
123+
// See: https://github.com/IBM/sarama/issues/1321
124+
config.MetricRegistry = nil
125+
122126
// Note: Debug logging not supported for sync producer as it doesn't have a logger parameter
123127
// If needed, add a logger parameter to NewKafkaProducer function
124128

@@ -199,6 +203,10 @@ func ConnectProducer(brokersURL []string, topic string, partitions int32, kafkaS
199203
config.Producer.Retry.Max = 5
200204
config.Producer.Partitioner = sarama.NewManualPartitioner
201205

206+
// Disable go-metrics to prevent memory leak from exponential decay sample heap
207+
// See: https://github.com/IBM/sarama/issues/1321
208+
config.MetricRegistry = nil
209+
202210
// Apply authentication settings if kafkaSettings provided and TLS is enabled
203211
if kafkaSettings != nil && kafkaSettings.EnableTLS {
204212
if err := configureKafkaAuthFromFields(config, kafkaSettings.EnableTLS, kafkaSettings.TLSSkipVerify,

util/kafka/kafka_producer_async.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,10 @@ func NewKafkaAsyncProducer(logger ulogger.Logger, cfg KafkaProducerConfig) (*Kaf
191191
config.Producer.Flush.Frequency = cfg.FlushFrequency
192192
// config.Producer.Return.Successes = true
193193

194+
// Disable go-metrics to prevent memory leak from exponential decay sample heap
195+
// See: https://github.com/IBM/sarama/issues/1321
196+
config.MetricRegistry = nil
197+
194198
// Enable Sarama debug logging if configured
195199
if cfg.EnableDebugLogging {
196200
sarama.Logger = &saramaLoggerAdapter{logger: logger}

0 commit comments

Comments
 (0)