Skip to content

Commit a42a27c

Browse files
authored
Disable go-metrics for Kafka to prevent memory leak (#53)
1 parent 78c4d5c commit a42a27c

File tree

4 files changed

+8
-20
lines changed

4 files changed

+8
-20
lines changed

util/kafka/kafka_consumer.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -416,10 +416,6 @@ func NewKafkaConsumerGroup(cfg KafkaConsumerConfig) (*KafkaConsumerGroup, error)
416416
config := sarama.NewConfig()
417417
config.Consumer.Return.Errors = true
418418

419-
// Disable go-metrics to prevent memory leak from exponential decay sample heap
420-
// See: https://github.com/IBM/sarama/issues/1321
421-
config.MetricRegistry = nil
422-
423419
// Enable Sarama debug logging for consumer diagnostics (only if configured)
424420
// By default, SARAMA logs are too verbose and not needed in production
425421
if cfg.EnableDebugLogging {

util/kafka/kafka_health.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,6 @@ func HealthChecker(_ context.Context, brokersURL []string) func(ctx context.Cont
5252
config.Net.ReadTimeout = 100 * time.Millisecond
5353
config.Net.WriteTimeout = 100 * time.Millisecond
5454
config.Metadata.Retry.Max = 0
55-
56-
// Disable go-metrics to prevent memory leak from exponential decay sample heap
57-
// See: https://github.com/IBM/sarama/issues/1321
58-
config.MetricRegistry = nil
5955
config.Metadata.Full = true
6056
config.Metadata.AllowAutoTopicCreation = false
6157

util/kafka/kafka_producer.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -119,10 +119,6 @@ func NewKafkaProducer(kafkaURL *url.URL, kafkaSettings *settings.KafkaSettings)
119119
config := sarama.NewConfig()
120120
config.Version = sarama.V2_1_0_0
121121

122-
// Disable go-metrics to prevent memory leak from exponential decay sample heap
123-
// See: https://github.com/IBM/sarama/issues/1321
124-
config.MetricRegistry = nil
125-
126122
// Note: Debug logging not supported for sync producer as it doesn't have a logger parameter
127123
// If needed, add a logger parameter to NewKafkaProducer function
128124

@@ -203,10 +199,6 @@ func ConnectProducer(brokersURL []string, topic string, partitions int32, kafkaS
203199
config.Producer.Retry.Max = 5
204200
config.Producer.Partitioner = sarama.NewManualPartitioner
205201

206-
// Disable go-metrics to prevent memory leak from exponential decay sample heap
207-
// See: https://github.com/IBM/sarama/issues/1321
208-
config.MetricRegistry = nil
209-
210202
// Apply authentication settings if kafkaSettings provided and TLS is enabled
211203
if kafkaSettings != nil && kafkaSettings.EnableTLS {
212204
if err := configureKafkaAuthFromFields(config, kafkaSettings.EnableTLS, kafkaSettings.TLSSkipVerify,

util/kafka/kafka_producer_async.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,16 @@ import (
2121
"github.com/bsv-blockchain/teranode/util"
2222
inmemorykafka "github.com/bsv-blockchain/teranode/util/kafka/in_memory_kafka"
2323
"github.com/bsv-blockchain/teranode/util/retry"
24+
"github.com/rcrowley/go-metrics"
2425
)
2526

27+
// init disables go-metrics globally to prevent memory leak from exponential decay sample heap.
28+
// This must be set before any Sarama clients are created.
29+
// See: https://github.com/IBM/sarama/issues/1321
30+
func init() {
31+
metrics.UseNilMetrics = true
32+
}
33+
2634
// KafkaAsyncProducerI defines the interface for asynchronous Kafka producer operations.
2735
type KafkaAsyncProducerI interface {
2836
// Start begins the async producer operation with the given message channel
@@ -191,10 +199,6 @@ func NewKafkaAsyncProducer(logger ulogger.Logger, cfg KafkaProducerConfig) (*Kaf
191199
config.Producer.Flush.Frequency = cfg.FlushFrequency
192200
// config.Producer.Return.Successes = true
193201

194-
// Disable go-metrics to prevent memory leak from exponential decay sample heap
195-
// See: https://github.com/IBM/sarama/issues/1321
196-
config.MetricRegistry = nil
197-
198202
// Enable Sarama debug logging if configured
199203
if cfg.EnableDebugLogging {
200204
sarama.Logger = &saramaLoggerAdapter{logger: logger}

0 commit comments

Comments
 (0)