From b088b71dd89817288079eec56d62f46f6bfddbca Mon Sep 17 00:00:00 2001 From: Marco Ferrer <35935108+marcoferrer@users.noreply.github.com> Date: Thu, 24 Apr 2025 16:44:16 -0400 Subject: [PATCH 1/2] make ErrMsgTimedOut count as timeout error --- kafka/error.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/error.go b/kafka/error.go index 06c94bb4e..a75877293 100644 --- a/kafka/error.go +++ b/kafka/error.go @@ -130,7 +130,7 @@ func (e Error) IsRetriable() bool { // IsTimeout returns true if the error is a timeout error. // A timeout error indicates that the operation timed out locally. func (e Error) IsTimeout() bool { - return e.code == ErrTimedOut || e.code == ErrTimedOutQueue + return e.code == ErrTimedOut || e.code == ErrTimedOutQueue || e.code == ErrMsgTimedOut } // TxnRequiresAbort returns true if the error is an abortable transaction error From bd61136aa5548ca5eb36b974a061ee0154eb2286 Mon Sep 17 00:00:00 2001 From: Marco Ferrer <35935108+marcoferrer@users.noreply.github.com> Date: Thu, 24 Apr 2025 17:14:58 -0400 Subject: [PATCH 2/2] Update error_test.go --- kafka/error_test.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/kafka/error_test.go b/kafka/error_test.go index a2756bf56..50a8ee32b 100644 --- a/kafka/error_test.go +++ b/kafka/error_test.go @@ -37,6 +37,15 @@ func TestFatalError(t *testing.T) { t.Logf("%v", normalErr) } +// TestIsTimeoutError tests timeout errors +func TestIsTimeoutError(t *testing.T) { + err := newErrorFromString(ErrMsgTimedOut, "Testing timeout error") + if !err.IsTimeout() { + t.Errorf("Expected IsTimeout() to return true for %v", err) + } + t.Logf("%v", err) +} + // TestFatalErrorClient tests fatal errors using a client instance func TestFatalErrorClient(t *testing.T) { p, err := NewProducer(&ConfigMap{})