Skip to content

Commit 9415772

Browse files
authored
Clear SSE pending message when message is sent to queue (#2452)
Without this change messages were stuck in a pending state indefinitely once the queue was full. The same messages were added to the queue over and over again. Fix #2389
1 parent a1b4ed0 commit 9415772

File tree

2 files changed

+45
-2
lines changed

2 files changed

+45
-2
lines changed

pkg/transport/proxy/httpsse/http_proxy.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -488,7 +488,7 @@ func (p *HTTPSSEProxy) processPendingMessages(clientID string, messageCh chan<-
488488
}
489489

490490
// Find messages for this client (all messages for now)
491-
for _, pendingMsg := range p.pendingMessages {
491+
for i, pendingMsg := range p.pendingMessages {
492492
// Convert to SSE string
493493
sseString := pendingMsg.Message.ToSSEString()
494494

@@ -498,7 +498,10 @@ func (p *HTTPSSEProxy) processPendingMessages(clientID string, messageCh chan<-
498498
// Message sent successfully
499499
default:
500500
// Channel is full, stop sending
501-
logger.Errorf("Failed to send pending message to client %s (channel full)", clientID)
501+
logger.Errorf("Client %s channel full after sending %d/%d pending messages",
502+
clientID, i, len(p.pendingMessages))
503+
// Remove successfully sent messages and keep the rest
504+
p.pendingMessages = p.pendingMessages[i:]
502505
return
503506
}
504507
}

pkg/transport/proxy/httpsse/http_proxy_test.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,46 @@ func TestProcessPendingMessages(t *testing.T) {
298298
proxy.pendingMutex.Unlock()
299299
}
300300

301+
// TestProcessPendingMessages_ChannelFull tests partial delivery when channel is full
302+
//
303+
//nolint:paralleltest // Test modifies shared proxy state
304+
func TestProcessPendingMessages_ChannelFull(t *testing.T) {
305+
proxy := NewHTTPSSEProxy("localhost", 8080, false, nil)
306+
307+
// Add 10 pending messages
308+
for i := 0; i < 10; i++ {
309+
msg := ssecommon.NewSSEMessage("test", fmt.Sprintf("data-%d", i))
310+
proxy.pendingMutex.Lock()
311+
proxy.pendingMessages = append(proxy.pendingMessages, ssecommon.NewPendingSSEMessage(msg))
312+
proxy.pendingMutex.Unlock()
313+
}
314+
315+
// Create a client channel that can only hold 3 messages
316+
messageCh := make(chan string, 3)
317+
318+
// Process pending messages
319+
proxy.processPendingMessages("client-1", messageCh)
320+
321+
// Verify only 3 messages were sent
322+
assert.Len(t, messageCh, 3)
323+
324+
// Verify 7 messages remain pending for reconnection
325+
proxy.pendingMutex.Lock()
326+
assert.Len(t, proxy.pendingMessages, 7)
327+
proxy.pendingMutex.Unlock()
328+
329+
// Reconnected client should receive the remaining messages
330+
messageCh2 := make(chan string, 10)
331+
proxy.processPendingMessages("client-1", messageCh2)
332+
333+
assert.Len(t, messageCh2, 7)
334+
335+
// Verify all pending messages are now cleared
336+
proxy.pendingMutex.Lock()
337+
assert.Empty(t, proxy.pendingMessages)
338+
proxy.pendingMutex.Unlock()
339+
}
340+
301341
// TestHandleSSEConnection tests the SSE connection handler
302342
//
303343
//nolint:paralleltest // Test uses HTTP test server

0 commit comments

Comments
 (0)