From 0a5268a41d8a890bbf2969d5d6fbe858dd226374 Mon Sep 17 00:00:00 2001 From: Luke Van Drie Date: Thu, 6 Nov 2025 20:26:47 +0000 Subject: [PATCH 1/2] refactor: Simplify SafeQueue interface This commit refactors the `framework.SafeQueue` interface to remove error returns from methods where failure is not expected in normal operation (Add, PeekHead, PeekTail, Drain, Cleanup). The interface now relies on the following premises: - Queues are unbounded and in-memory. - OOM is a panic, not a handled error. - Internal callers are trusted not to provide nil items. - PeekHead/PeekTail return nil for empty queues. This change simplifies the interface and reduces unnecessary error checking on the hot path. This commit updates the queue implementations (ListQueue, MaxMinHeap), mocks, and all callers within the `pkg/epp/flowcontrol/framework/...` tree to conform to the new interface. A subsequent commit will address callers outside this tree. --- pkg/epp/flowcontrol/framework/errors.go | 7 -- pkg/epp/flowcontrol/framework/mocks/mocks.go | 39 +++--- .../interflow/dispatch/besthead/besthead.go | 4 +- .../dispatch/besthead/besthead_test.go | 30 ++--- .../interflow/dispatch/functional_test.go | 6 +- .../policies/intraflow/dispatch/fcfs/fcfs.go | 8 +- .../intraflow/dispatch/functional_test.go | 5 +- .../framework/plugins/queue/benchmark_test.go | 62 ++++----- .../plugins/queue/functional_test.go | 118 ++++++------------ .../framework/plugins/queue/listqueue.go | 27 ++-- .../framework/plugins/queue/maxminheap.go | 33 +++-- .../plugins/queue/maxminheap_test.go | 9 +- pkg/epp/flowcontrol/framework/queue.go | 59 ++++----- 13 files changed, 149 insertions(+), 258 deletions(-) diff --git a/pkg/epp/flowcontrol/framework/errors.go b/pkg/epp/flowcontrol/framework/errors.go index fb19c4aab..79d62acc8 100644 --- a/pkg/epp/flowcontrol/framework/errors.go +++ b/pkg/epp/flowcontrol/framework/errors.go @@ -26,13 +26,6 @@ import ( // and might be handled or wrapped by the `contracts.FlowRegistry`'s `contracts.ManagedQueue` or the // `controller.FlowController`. var ( - // ErrNilQueueItem indicates that a nil `types.QueueItemAccessor` was passed to `SafeQueue.Add()`. - ErrNilQueueItem = errors.New("queue item cannot be nil") - - // ErrQueueEmpty indicates an attempt to perform an operation on an empty `SafeQueue` that requires one or more items - // (e.g., calling `SafeQueue.PeekHead()`). - ErrQueueEmpty = errors.New("queue is empty") - // ErrInvalidQueueItemHandle indicates that a `types.QueueItemHandle` provided to a `SafeQueue` operation (e.g., // `SafeQueue.Remove()`) is not valid for that queue, has been invalidated, or does not correspond to an actual item // in the queue. diff --git a/pkg/epp/flowcontrol/framework/mocks/mocks.go b/pkg/epp/flowcontrol/framework/mocks/mocks.go index ff8441fde..29eead472 100644 --- a/pkg/epp/flowcontrol/framework/mocks/mocks.go +++ b/pkg/epp/flowcontrol/framework/mocks/mocks.go @@ -38,9 +38,7 @@ type MockFlowQueueAccessor struct { LenV int ByteSizeV uint64 PeekHeadV types.QueueItemAccessor - PeekHeadErrV error PeekTailV types.QueueItemAccessor - PeekTailErrV error FlowKeyV types.FlowKey ComparatorV framework.ItemComparator CapabilitiesV []framework.QueueCapability @@ -53,12 +51,12 @@ func (m *MockFlowQueueAccessor) Comparator() framework.ItemComparator { ret func (m *MockFlowQueueAccessor) FlowKey() types.FlowKey { return m.FlowKeyV } func (m *MockFlowQueueAccessor) Capabilities() []framework.QueueCapability { return m.CapabilitiesV } -func (m *MockFlowQueueAccessor) PeekHead() (types.QueueItemAccessor, error) { - return m.PeekHeadV, m.PeekHeadErrV +func (m *MockFlowQueueAccessor) PeekHead() types.QueueItemAccessor { + return m.PeekHeadV } -func (m *MockFlowQueueAccessor) PeekTail() (types.QueueItemAccessor, error) { - return m.PeekTailV, m.PeekTailErrV +func (m *MockFlowQueueAccessor) PeekTail() types.QueueItemAccessor { + return m.PeekTailV } var _ framework.FlowQueueAccessor = &MockFlowQueueAccessor{} @@ -108,13 +106,11 @@ type MockSafeQueue struct { LenV int ByteSizeV uint64 PeekHeadV types.QueueItemAccessor - PeekHeadErrV error PeekTailV types.QueueItemAccessor - PeekTailErrV error - AddFunc func(item types.QueueItemAccessor) error + AddFunc func(item types.QueueItemAccessor) RemoveFunc func(handle types.QueueItemHandle) (types.QueueItemAccessor, error) - CleanupFunc func(predicate framework.PredicateFunc) ([]types.QueueItemAccessor, error) - DrainFunc func() ([]types.QueueItemAccessor, error) + CleanupFunc func(predicate framework.PredicateFunc) []types.QueueItemAccessor + DrainFunc func() []types.QueueItemAccessor } func (m *MockSafeQueue) Name() string { return m.NameV } @@ -122,19 +118,18 @@ func (m *MockSafeQueue) Capabilities() []framework.QueueCapability { return m.Ca func (m *MockSafeQueue) Len() int { return m.LenV } func (m *MockSafeQueue) ByteSize() uint64 { return m.ByteSizeV } -func (m *MockSafeQueue) PeekHead() (types.QueueItemAccessor, error) { - return m.PeekHeadV, m.PeekHeadErrV +func (m *MockSafeQueue) PeekHead() types.QueueItemAccessor { + return m.PeekHeadV } -func (m *MockSafeQueue) PeekTail() (types.QueueItemAccessor, error) { - return m.PeekTailV, m.PeekTailErrV +func (m *MockSafeQueue) PeekTail() types.QueueItemAccessor { + return m.PeekTailV } -func (m *MockSafeQueue) Add(item types.QueueItemAccessor) error { +func (m *MockSafeQueue) Add(item types.QueueItemAccessor) { if m.AddFunc != nil { - return m.AddFunc(item) + m.AddFunc(item) } - return nil } func (m *MockSafeQueue) Remove(handle types.QueueItemHandle) (types.QueueItemAccessor, error) { @@ -144,18 +139,18 @@ func (m *MockSafeQueue) Remove(handle types.QueueItemHandle) (types.QueueItemAcc return nil, nil } -func (m *MockSafeQueue) Cleanup(predicate framework.PredicateFunc) ([]types.QueueItemAccessor, error) { +func (m *MockSafeQueue) Cleanup(predicate framework.PredicateFunc) []types.QueueItemAccessor { if m.CleanupFunc != nil { return m.CleanupFunc(predicate) } - return nil, nil + return nil } -func (m *MockSafeQueue) Drain() ([]types.QueueItemAccessor, error) { +func (m *MockSafeQueue) Drain() []types.QueueItemAccessor { if m.DrainFunc != nil { return m.DrainFunc() } - return nil, nil + return nil } var _ framework.SafeQueue = &MockSafeQueue{} diff --git a/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/besthead/besthead.go b/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/besthead/besthead.go index 41033e755..ae4cbd606 100644 --- a/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/besthead/besthead.go +++ b/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/besthead/besthead.go @@ -68,8 +68,8 @@ func (p *bestHead) SelectQueue(band framework.PriorityBandAccessor) (framework.F return true } - item, err := queue.PeekHead() - if err != nil || item == nil { + item := queue.PeekHead() + if item == nil { return true } diff --git a/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/besthead/besthead_test.go b/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/besthead/besthead_test.go index 5971d3f0c..288560366 100644 --- a/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/besthead/besthead_test.go +++ b/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/besthead/besthead_test.go @@ -17,7 +17,6 @@ limitations under the License. package besthead import ( - "errors" "testing" "time" @@ -105,10 +104,10 @@ func TestBestHead_SelectQueue(t *testing.T) { ComparatorV: newTestComparator(), } queueEmpty := &frameworkmocks.MockFlowQueueAccessor{ - LenV: 0, - PeekHeadErrV: framework.ErrQueueEmpty, - FlowKeyV: types.FlowKey{ID: "flowEmpty"}, - ComparatorV: newTestComparator(), + LenV: 0, + PeekHeadV: nil, + FlowKeyV: types.FlowKey{ID: "flowEmpty"}, + ComparatorV: newTestComparator(), } testCases := []struct { @@ -151,19 +150,6 @@ func TestBestHead_SelectQueue(t *testing.T) { ), expectedErr: framework.ErrIncompatiblePriorityType, }, - { - name: "QueuePeekHeadErrors", - band: newTestBand( - &frameworkmocks.MockFlowQueueAccessor{ - LenV: 1, - PeekHeadErrV: errors.New("peek error"), - FlowKeyV: flow1Key, - ComparatorV: newTestComparator(), - }, - queue2, - ), - expectedQueueID: flow2ID, - }, { name: "QueueComparatorIsNil", band: newTestBand( @@ -195,10 +181,10 @@ func TestBestHead_SelectQueue(t *testing.T) { band: newTestBand( queueEmpty, &frameworkmocks.MockFlowQueueAccessor{ - LenV: 0, - PeekHeadErrV: framework.ErrQueueEmpty, - FlowKeyV: types.FlowKey{ID: "flowEmpty2"}, - ComparatorV: newTestComparator(), + LenV: 0, + PeekHeadV: nil, + FlowKeyV: types.FlowKey{ID: "flowEmpty2"}, + ComparatorV: newTestComparator(), }, ), }, diff --git a/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/functional_test.go b/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/functional_test.go index 7649b324e..3dba8b7ea 100644 --- a/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/functional_test.go +++ b/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/functional_test.go @@ -63,9 +63,9 @@ func runSelectQueueConformanceTests(t *testing.T, policy framework.InterFlowDisp flowIDEmpty := "flow-empty" mockQueueEmpty := &frameworkmocks.MockFlowQueueAccessor{ - LenV: 0, - PeekHeadErrV: framework.ErrQueueEmpty, - FlowKeyV: types.FlowKey{ID: flowIDEmpty}, + LenV: 0, + PeekHeadV: nil, + FlowKeyV: types.FlowKey{ID: flowIDEmpty}, } testCases := []struct { diff --git a/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/fcfs/fcfs.go b/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/fcfs/fcfs.go index 7addb9d13..63ffe31b9 100644 --- a/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/fcfs/fcfs.go +++ b/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/fcfs/fcfs.go @@ -18,8 +18,6 @@ limitations under the License. package fcfs import ( - "errors" - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types" @@ -86,11 +84,7 @@ func (p *fcfs) SelectItem(queue framework.FlowQueueAccessor) (types.QueueItemAcc if queue == nil { return nil, nil } - item, err := queue.PeekHead() - if errors.Is(err, framework.ErrQueueEmpty) { - return nil, nil - } - return item, err + return queue.PeekHead(), nil } // Comparator returns a `framework.ItemComparator` based on enqueue time. diff --git a/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/functional_test.go b/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/functional_test.go index 01e8a1c8e..4088b4da2 100644 --- a/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/functional_test.go +++ b/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/functional_test.go @@ -22,7 +22,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework" frameworkmocks "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/mocks" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch" _ "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/fcfs" @@ -66,8 +65,8 @@ func TestIntraFlowDispatchPolicyConformance(t *testing.T) { t.Run("SelectItemFromEmptyQueue", func(t *testing.T) { t.Parallel() mockQueue := &frameworkmocks.MockFlowQueueAccessor{ - PeekHeadErrV: framework.ErrQueueEmpty, - LenV: 0, + PeekHeadV: nil, + LenV: 0, } item, err := policy.SelectItem(mockQueue) require.NoError(t, err, "SelectItem from an empty queue for %s should not return an error", policyName) diff --git a/pkg/epp/flowcontrol/framework/plugins/queue/benchmark_test.go b/pkg/epp/flowcontrol/framework/plugins/queue/benchmark_test.go index 24f4a9238..b9461ebf1 100644 --- a/pkg/epp/flowcontrol/framework/plugins/queue/benchmark_test.go +++ b/pkg/epp/flowcontrol/framework/plugins/queue/benchmark_test.go @@ -70,11 +70,8 @@ func benchmarkAddRemove(b *testing.B, q framework.SafeQueue) { b.RunParallel(func(pb *testing.PB) { for pb.Next() { item := mocks.NewMockQueueItemAccessor(1, "item", benchmarkFlowKey) - err := q.Add(item) - if err != nil { - b.Fatalf("Add failed: %v", err) - } - _, err = q.Remove(item.Handle()) + q.Add(item) + _, err := q.Remove(item.Handle()) if err != nil { b.Fatalf("Remove failed: %v", err) } @@ -87,27 +84,21 @@ func benchmarkAddRemove(b *testing.B, q framework.SafeQueue) { func benchmarkAddPeekRemove(b *testing.B, q framework.SafeQueue) { // Pre-add one item so PeekHead doesn't fail on the first iteration. initialItem := mocks.NewMockQueueItemAccessor(1, "initial", benchmarkFlowKey) - if err := q.Add(initialItem); err != nil { - b.Fatalf("Failed to add initial item: %v", err) - } + q.Add(initialItem) b.ReportAllocs() for b.Loop() { item := mocks.NewMockQueueItemAccessor(1, "item", benchmarkFlowKey) - err := q.Add(item) - if err != nil { - b.Fatalf("Add failed: %v", err) - } - - peeked, err := q.PeekHead() - if err != nil { + q.Add(item) + peeked := q.PeekHead() + if peeked == nil { // In a concurrent benchmark, this could happen if the queue becomes empty. // In a serial one, it's a fatal error. - b.Fatalf("PeekHead failed: %v", err) + b.Fatal("PeekHead failed") } - _, err = q.Remove(peeked.Handle()) + _, err := q.Remove(peeked.Handle()) if err != nil { b.Fatalf("Remove failed: %v", err) } @@ -125,16 +116,14 @@ func benchmarkBulkAddThenBulkRemove(b *testing.B, q framework.SafeQueue) { for j := range items { item := mocks.NewMockQueueItemAccessor(1, fmt.Sprintf("bulk-%d-%d", i, j), benchmarkFlowKey) items[j] = item - if err := q.Add(item); err != nil { - b.Fatalf("Add failed: %v", err) - } + q.Add(item) } // Remove the same number of items for range items { - peeked, err := q.PeekHead() - if err != nil { - b.Fatalf("PeekHead failed: %v", err) + peeked := q.PeekHead() + if peeked == nil { + b.Fatal("PeekHead failed") } if _, err := q.Remove(peeked.Handle()); err != nil { b.Fatalf("Remove failed: %v", err) @@ -148,25 +137,20 @@ func benchmarkBulkAddThenBulkRemove(b *testing.B, q framework.SafeQueue) { func benchmarkAddPeekTailRemove(b *testing.B, q framework.SafeQueue) { // Pre-add one item so PeekTail doesn't fail on the first iteration. initialItem := mocks.NewMockQueueItemAccessor(1, "initial", benchmarkFlowKey) - if err := q.Add(initialItem); err != nil { - b.Fatalf("Failed to add initial item: %v", err) - } + q.Add(initialItem) b.ReportAllocs() for b.Loop() { item := mocks.NewMockQueueItemAccessor(1, "item", benchmarkFlowKey) - err := q.Add(item) - if err != nil { - b.Fatalf("Add failed: %v", err) - } + q.Add(item) - peeked, err := q.PeekTail() - if err != nil { - b.Fatalf("PeekTail failed: %v", err) + peeked := q.PeekTail() + if peeked == nil { + b.Fatal("PeekTail failed") } - _, err = q.Remove(peeked.Handle()) + _, err := q.Remove(peeked.Handle()) if err != nil { b.Fatalf("Remove failed: %v", err) } @@ -179,9 +163,7 @@ func benchmarkHighContention(b *testing.B, q framework.SafeQueue) { // Pre-fill the queue to ensure consumers have work to do immediately. for i := range 1000 { item := mocks.NewMockQueueItemAccessor(1, fmt.Sprintf("prefill-%d", i), benchmarkFlowKey) - if err := q.Add(item); err != nil { - b.Fatalf("Failed to pre-fill queue: %v", err) - } + q.Add(item) } stopCh := make(chan struct{}) @@ -198,7 +180,7 @@ func benchmarkHighContention(b *testing.B, q framework.SafeQueue) { return default: item := mocks.NewMockQueueItemAccessor(1, "item", benchmarkFlowKey) - _ = q.Add(item) + q.Add(item) } } }() @@ -210,8 +192,8 @@ func benchmarkHighContention(b *testing.B, q framework.SafeQueue) { // Consumers drive the benchmark. b.RunParallel(func(pb *testing.PB) { for pb.Next() { - peeked, err := q.PeekHead() - if err == nil { + peeked := q.PeekHead() + if peeked != nil { _, _ = q.Remove(peeked.Handle()) } } diff --git a/pkg/epp/flowcontrol/framework/plugins/queue/functional_test.go b/pkg/epp/flowcontrol/framework/plugins/queue/functional_test.go index d632c1dc0..2282b8efd 100644 --- a/pkg/epp/flowcontrol/framework/plugins/queue/functional_test.go +++ b/pkg/epp/flowcontrol/framework/plugins/queue/functional_test.go @@ -74,22 +74,16 @@ func testLifecycleAndOrdering( t.Helper() // PeekHead/PeekTail on empty queue - peeked, err := q.PeekHead() - assert.ErrorIs(t, err, framework.ErrQueueEmpty, - "[%s] PeekHead on empty queue should return ErrQueueEmpty", comparatorName) + peeked := q.PeekHead() assert.Nil(t, peeked, "[%s] PeekHead on empty queue should return a nil item", comparatorName) - peeked, err = q.PeekTail() - assert.ErrorIs(t, err, framework.ErrQueueEmpty, - "[%s] PeekTail on empty queue should return ErrQueueEmpty", comparatorName) + peeked = q.PeekTail() assert.Nil(t, peeked, "[%s] PeekTail on empty queue should return a nil item", comparatorName) // Add items currentExpectedLen := 0 var currentExpectedByteSize uint64 for i, item := range itemsInOrder { - addErr := q.Add(item) - require.NoError(t, addErr, "[%s] Add should not fail for a valid item (item %d, ID: %s)", - comparatorName, i, item.OriginalRequest().ID()) + q.Add(item) require.NotNil(t, item.Handle(), "[%s] Add must assign a non-nil handle to the item (item %d, ID: %s)", comparatorName, i, item.OriginalRequest().ID()) require.False(t, item.Handle().IsInvalidated(), @@ -121,8 +115,7 @@ func testLifecycleAndOrdering( expectedByteSize := expectedTotalByteSize for i, expectedItem := range itemsInOrder { // Verify PeekHead - peeked, err = q.PeekHead() - require.NoError(t, err, "[%s] PeekHead should not error on a non-empty queue (iteration %d)", comparatorName, i) + peeked = q.PeekHead() require.NotNil(t, peeked, "[%s] PeekHead should return a non-nil item (iteration %d)", comparatorName, i) assert.Equal(t, expectedItem.OriginalRequest().ID(), peeked.OriginalRequest().ID(), "[%s] PeekHead must return the item (ID: %s) at the head of the queue (iteration %d)", @@ -137,8 +130,7 @@ func testLifecycleAndOrdering( "[%s] ByteSize() must be unchanged after PeekHead (iteration %d)", comparatorName, i) // Verify PeekTail - peekedTail, err := q.PeekTail() - require.NoError(t, err, "[%s] PeekTail should not error on a non-empty queue (iteration %d)", comparatorName, i) + peekedTail := q.PeekTail() require.NotNil(t, peekedTail, "[%s] PeekTail should return a non-nil item (iteration %d)", comparatorName, i) // The tail is the last item in the *remaining* ordered slice. expectedTailItem := itemsInOrder[len(itemsInOrder)-1] @@ -166,9 +158,7 @@ func testLifecycleAndOrdering( assert.Zero(t, q.Len(), "[%s] Queue length should be 0 after all items are removed", comparatorName) assert.Zero(t, q.ByteSize(), "[%s] Queue byte size should be 0 after all items are removed", comparatorName) - peeked, err = q.PeekHead() - assert.ErrorIs(t, err, framework.ErrQueueEmpty, "[%s] PeekHead on an empty queue should return ErrQueueEmpty again", - comparatorName) + peeked = q.PeekHead() assert.Nil(t, peeked, "[%s] PeekHead on an empty queue should return a nil item again", comparatorName) } @@ -247,33 +237,18 @@ func TestQueueConformance(t *testing.T) { }) } - t.Run("Add_NilItem", func(t *testing.T) { - t.Parallel() - q, err := constructor(enqueueTimeComparator) - require.NoError(t, err, "Setup: creating queue for test should not fail") - - currentLen := q.Len() - currentByteSize := q.ByteSize() - err = q.Add(nil) - assert.ErrorIs(t, err, framework.ErrNilQueueItem, "Add(nil) must return ErrNilQueueItem") - assert.Equal(t, currentLen, q.Len(), "The queue's length must not change after a failed Add") - assert.Equal(t, currentByteSize, q.ByteSize(), "The queue's byte size must not change after a failed Add") - }) - t.Run("Remove_InvalidHandle", func(t *testing.T) { t.Parallel() q, err := constructor(enqueueTimeComparator) require.NoError(t, err, "Setup: creating queue for test should not fail") item := typesmocks.NewMockQueueItemAccessor(100, "item", flowKey) - err = q.Add(item) - require.NoError(t, err, "Setup: adding an item should succeed") + q.Add(item) otherQ, err := constructor(enqueueTimeComparator) // A different queue instance require.NoError(t, err, "Setup: creating otherQ should succeed") otherItem := typesmocks.NewMockQueueItemAccessor(10, "other_item", types.FlowKey{ID: "other-flow"}) - err = otherQ.Add(otherItem) - require.NoError(t, err, "Setup: adding item to otherQ should succeed") + otherQ.Add(otherItem) alienHandle := otherItem.Handle() require.NotNil(t, alienHandle, "Setup: alien handle should not be nil") @@ -322,9 +297,9 @@ func TestQueueConformance(t *testing.T) { item3 := typesmocks.NewMockQueueItemAccessor(30, "item3_nonhead", flowKey) item3.EnqueueTimeV = now.Add(-1 * time.Second) - _ = q.Add(item1) - _ = q.Add(item2) - _ = q.Add(item3) + q.Add(item1) + q.Add(item2) + q.Add(item3) require.Equal(t, 3, q.Len(), "Queue should have 3 items before removing non-head") handleNonHead := item2.Handle() @@ -351,8 +326,7 @@ func TestQueueConformance(t *testing.T) { t.Run("Cleanup_EmptyQueue", func(t *testing.T) { t.Parallel() emptyQ, _ := constructor(enqueueTimeComparator) - cleanedItems, err := emptyQ.Cleanup(predicateRemoveOddSizes) - require.NoError(t, err, "Cleanup on an empty queue should not return an error") + cleanedItems := emptyQ.Cleanup(predicateRemoveOddSizes) assert.Empty(t, cleanedItems, "Cleanup on an empty queue should return an empty slice") assert.Zero(t, emptyQ.Len(), "Len() should be 0 after Cleanup on an empty queue") assert.Zero(t, emptyQ.ByteSize(), "ByteSize() should be 0 after Cleanup on an empty queue") @@ -363,13 +337,12 @@ func TestQueueConformance(t *testing.T) { q, _ := constructor(enqueueTimeComparator) itemK1 := typesmocks.NewMockQueueItemAccessor(10, "k1_matchNone", flowKey) itemK2 := typesmocks.NewMockQueueItemAccessor(12, "k2_matchNone", flowKey) - _ = q.Add(itemK1) - _ = q.Add(itemK2) + q.Add(itemK1) + q.Add(itemK2) initialLen := q.Len() initialBs := q.ByteSize() - cleanedItems, err := q.Cleanup(func(item types.QueueItemAccessor) bool { return false }) - require.NoError(t, err, "Cleanup should not return an error") + cleanedItems := q.Cleanup(func(item types.QueueItemAccessor) bool { return false }) assert.Empty(t, cleanedItems, "Cleanup should return an empty slice when no items match the predicate") assert.Equal(t, initialLen, q.Len(), "Len() should not change after Cleanup when no items match thepredicate") assert.Equal(t, initialBs, q.ByteSize(), @@ -383,11 +356,10 @@ func TestQueueConformance(t *testing.T) { q, _ := constructor(enqueueTimeComparator) itemR1 := typesmocks.NewMockQueueItemAccessor(11, "r1_matchAll", flowKey) itemR2 := typesmocks.NewMockQueueItemAccessor(13, "r2_matchAll", flowKey) - _ = q.Add(itemR1) - _ = q.Add(itemR2) + q.Add(itemR1) + q.Add(itemR2) - cleanedItems, err := q.Cleanup(func(item types.QueueItemAccessor) bool { return true }) - require.NoError(t, err, "Cleanup should not return an error") + cleanedItems := q.Cleanup(func(item types.QueueItemAccessor) bool { return true }) assert.Len(t, cleanedItems, 2, "Cleanup should return all items that matched the predicate") assert.Zero(t, q.Len(), "Len() should be 0 after Cleanup") assert.Zero(t, q.ByteSize(), "ByteSize() should be 0 after Cleanup") @@ -402,15 +374,14 @@ func TestQueueConformance(t *testing.T) { iR1 := typesmocks.NewMockQueueItemAccessor(11, "r1_subset", flowKey) iK2 := typesmocks.NewMockQueueItemAccessor(22, "k2_subset", flowKey) iR2 := typesmocks.NewMockQueueItemAccessor(33, "r2_subset", flowKey) - _ = q.Add(iK1) - _ = q.Add(iR1) - _ = q.Add(iK2) - _ = q.Add(iR2) + q.Add(iK1) + q.Add(iR1) + q.Add(iK2) + q.Add(iR2) expectedKeptByteSize := iK1.OriginalRequest().ByteSize() + iK2.OriginalRequest().ByteSize() - cleanedItems, err := q.Cleanup(predicateRemoveOddSizes) - require.NoError(t, err, "Cleanup should not return an error") + cleanedItems := q.Cleanup(predicateRemoveOddSizes) assert.Len(t, cleanedItems, 2, "Cleanup should return 2 items that matched the predicate") assert.Equal(t, 2, q.Len(), "Len() should be 2 after Cleanup") assert.Equal(t, expectedKeptByteSize, q.ByteSize(), "ByteSize() should be sum of kept items after Cleanup") @@ -435,7 +406,7 @@ func TestQueueConformance(t *testing.T) { // Verify remaining items are correct var remainingIDs []string for q.Len() > 0 { - peeked, _ := q.PeekHead() + peeked := q.PeekHead() item, _ := q.Remove(peeked.Handle()) remainingIDs = append(remainingIDs, item.OriginalRequest().ID()) } @@ -452,11 +423,10 @@ func TestQueueConformance(t *testing.T) { itemD1 := typesmocks.NewMockQueueItemAccessor(10, "ditem1", flowKey) itemD2 := typesmocks.NewMockQueueItemAccessor(20, "ditem2", flowKey) - _ = q.Add(itemD1) - _ = q.Add(itemD2) + q.Add(itemD1) + q.Add(itemD2) - drainedItems, err := q.Drain() - require.NoError(t, err, "Drain on a non-empty queue should not fail") + drainedItems := q.Drain() assert.Len(t, drainedItems, 2, "Drain should return all items that were in the queue") assert.Zero(t, q.Len(), "Queue length must be 0 after Drain") assert.Zero(t, q.ByteSize(), "Queue byte size must be 0 after Drain") @@ -482,12 +452,10 @@ func TestQueueConformance(t *testing.T) { q, err := constructor(enqueueTimeComparator) require.NoError(t, err, "Setup: creating queue for empty drain test should not fail") - drainedItems, err := q.Drain() // First drain on empty - require.NoError(t, err, "Drain on an empty queue should not fail") + drainedItems := q.Drain() // First drain on empty assert.Empty(t, drainedItems, "Drain on an empty queue should return an empty slice") - drainedAgain, err := q.Drain() // Second drain on already empty - require.NoError(t, err, "Second drain on an already empty queue should not fail") + drainedAgain := q.Drain() // Second drain on already empty assert.Empty(t, drainedAgain, "Second drain on an already empty queue should return an empty slice") assert.Zero(t, q.Len()) assert.Zero(t, q.ByteSize()) @@ -510,7 +478,7 @@ func TestQueueConformance(t *testing.T) { // Pre-populate the queue with an initial set of items. for i := range initialItems { item := typesmocks.NewMockQueueItemAccessor(1, fmt.Sprintf("%s_conc_init_%d", flowKey, i), flowKey) - err := q.Add(item) + q.Add(item) require.NoError(t, err, "Setup: pre-populating the queue should not fail") handleChan <- item.Handle() } @@ -529,11 +497,9 @@ func TestQueueConformance(t *testing.T) { case 0: // Add item := typesmocks.NewMockQueueItemAccessor(1, fmt.Sprintf("%s_conc_init_%d_%d", flowKey, routineID, j), flowKey) - err := q.Add(item) - if assert.NoError(t, err, "Add must be goroutine-safe") { - successfulAdds.Add(1) - handleChan <- item.Handle() - } + q.Add(item) + successfulAdds.Add(1) + handleChan <- item.Handle() case 1: // Remove select { case handle := <-handleChan: @@ -553,17 +519,16 @@ func TestQueueConformance(t *testing.T) { case 2: // Inspect _ = q.Len() _ = q.ByteSize() - _, err := q.PeekHead() - if q.Len() == 0 { // Only expect ErrQueueEmpty if Len is 0 - assert.ErrorIs(t, err, framework.ErrQueueEmpty, "Peek on empty queue expected ErrQueueEmpty") + peeked := q.PeekHead() + if q.Len() == 0 { + assert.Nil(t, peeked, "PeekHead on empty queue expected nil") } - _, err = q.PeekTail() - if q.Len() == 0 { // Only expect ErrQueueEmpty if Len is 0 - assert.ErrorIs(t, err, framework.ErrQueueEmpty, "PeekTail on empty queue expected ErrQueueEmpty") + peeked = q.PeekTail() + if q.Len() == 0 { + assert.Nil(t, peeked, "PeekTail on empty queue expected nil") } case 3: // Cleanup - _, cleanupErr := q.Cleanup(func(item types.QueueItemAccessor) bool { return false }) - assert.NoError(t, cleanupErr, "Cleanup (no-op) should be goroutine-safe") + q.Cleanup(func(item types.QueueItemAccessor) bool { return false }) } } }(i) @@ -573,8 +538,7 @@ func TestQueueConformance(t *testing.T) { close(handleChan) // Drain the queue to verify all handles are invalidated and to count remaining items accurately. - drainedItems, drainErr := q.Drain() - require.NoError(t, drainErr, "Draining queue at the end of concurrency test should not fail") + drainedItems := q.Drain() for _, item := range drainedItems { require.True(t, item.Handle().IsInvalidated(), "All handles from final drain must be invalidated") diff --git a/pkg/epp/flowcontrol/framework/plugins/queue/listqueue.go b/pkg/epp/flowcontrol/framework/plugins/queue/listqueue.go index 082510e71..0d241cf97 100644 --- a/pkg/epp/flowcontrol/framework/plugins/queue/listqueue.go +++ b/pkg/epp/flowcontrol/framework/plugins/queue/listqueue.go @@ -102,18 +102,13 @@ func newListQueue() *listQueue { // --- `framework.SafeQueue` Interface Implementation --- // Add enqueues an item to the back of the list. -func (lq *listQueue) Add(item types.QueueItemAccessor) error { +func (lq *listQueue) Add(item types.QueueItemAccessor) { lq.mu.Lock() defer lq.mu.Unlock() - if item == nil { - return framework.ErrNilQueueItem - } - element := lq.requests.PushBack(item) lq.byteSize.Add(item.OriginalRequest().ByteSize()) item.SetHandle(&listItemHandle{element: element, owner: lq}) - return nil } // Remove removes an item identified by the given handle from the queue. @@ -142,7 +137,7 @@ func (lq *listQueue) Remove(handle types.QueueItemHandle) (types.QueueItemAccess } // Cleanup removes items from the queue that satisfy the predicate. -func (lq *listQueue) Cleanup(predicate framework.PredicateFunc) (cleanedItems []types.QueueItemAccessor, err error) { +func (lq *listQueue) Cleanup(predicate framework.PredicateFunc) (cleanedItems []types.QueueItemAccessor) { lq.mu.Lock() defer lq.mu.Unlock() @@ -162,11 +157,11 @@ func (lq *listQueue) Cleanup(predicate framework.PredicateFunc) (cleanedItems [] removedItems = append(removedItems, item) } } - return removedItems, nil + return removedItems } // Drain removes all items from the queue and returns them. -func (lq *listQueue) Drain() (removedItems []types.QueueItemAccessor, err error) { +func (lq *listQueue) Drain() (removedItems []types.QueueItemAccessor) { lq.mu.Lock() defer lq.mu.Unlock() @@ -182,7 +177,7 @@ func (lq *listQueue) Drain() (removedItems []types.QueueItemAccessor, err error) lq.requests.Init() lq.byteSize.Store(0) - return removedItems, nil + return removedItems } // Name returns the name of the queue. @@ -208,25 +203,25 @@ func (lq *listQueue) ByteSize() uint64 { } // PeekHead returns the item at the front of the queue without removing it. -func (lq *listQueue) PeekHead() (head types.QueueItemAccessor, err error) { +func (lq *listQueue) PeekHead() types.QueueItemAccessor { lq.mu.RLock() defer lq.mu.RUnlock() if lq.requests.Len() == 0 { - return nil, framework.ErrQueueEmpty + return nil } element := lq.requests.Front() - return element.Value.(types.QueueItemAccessor), nil + return element.Value.(types.QueueItemAccessor) } // PeekTail returns the item at the back of the queue without removing it. -func (lq *listQueue) PeekTail() (tail types.QueueItemAccessor, err error) { +func (lq *listQueue) PeekTail() types.QueueItemAccessor { lq.mu.RLock() defer lq.mu.RUnlock() if lq.requests.Len() == 0 { - return nil, framework.ErrQueueEmpty + return nil } element := lq.requests.Back() - return element.Value.(types.QueueItemAccessor), nil + return element.Value.(types.QueueItemAccessor) } diff --git a/pkg/epp/flowcontrol/framework/plugins/queue/maxminheap.go b/pkg/epp/flowcontrol/framework/plugins/queue/maxminheap.go index 41a15fa73..4a3bc8773 100644 --- a/pkg/epp/flowcontrol/framework/plugins/queue/maxminheap.go +++ b/pkg/epp/flowcontrol/framework/plugins/queue/maxminheap.go @@ -115,56 +115,51 @@ func (h *maxMinHeap) ByteSize() uint64 { // PeekHead returns the item with the highest priority (max value) without removing it. // Time complexity: O(1). -func (h *maxMinHeap) PeekHead() (types.QueueItemAccessor, error) { +func (h *maxMinHeap) PeekHead() types.QueueItemAccessor { h.mu.RLock() defer h.mu.RUnlock() if len(h.items) == 0 { - return nil, framework.ErrQueueEmpty + return nil } // The root of the max-min heap is always the maximum element. - return h.items[0], nil + return h.items[0] } // PeekTail returns the item with the lowest priority (min value) without removing it. // Time complexity: O(1). -func (h *maxMinHeap) PeekTail() (types.QueueItemAccessor, error) { +func (h *maxMinHeap) PeekTail() types.QueueItemAccessor { h.mu.RLock() defer h.mu.RUnlock() n := len(h.items) if n == 0 { - return nil, framework.ErrQueueEmpty + return nil } if n == 1 { - return h.items[0], nil + return h.items[0] } if n == 2 { // With two items, the root is max, the second is min. - return h.items[1], nil + return h.items[1] } // With three or more items, the minimum element is guaranteed to be one of the two children of the root (at indices 1 // and 2). We must compare them to find the true minimum. if h.comparator.Func()(h.items[1], h.items[2]) { - return h.items[2], nil + return h.items[2] } - return h.items[1], nil + return h.items[1] } // Add adds an item to the queue. // Time complexity: O(log n). -func (h *maxMinHeap) Add(item types.QueueItemAccessor) error { +func (h *maxMinHeap) Add(item types.QueueItemAccessor) { h.mu.Lock() defer h.mu.Unlock() - if item == nil { - return framework.ErrNilQueueItem - } - h.push(item) h.byteSize.Add(item.OriginalRequest().ByteSize()) - return nil } // push adds an item to the heap and restores the heap property. @@ -425,7 +420,7 @@ func isMinLevel(i int) bool { } // Cleanup removes items from the queue that satisfy the predicate. -func (h *maxMinHeap) Cleanup(predicate framework.PredicateFunc) ([]types.QueueItemAccessor, error) { +func (h *maxMinHeap) Cleanup(predicate framework.PredicateFunc) []types.QueueItemAccessor { h.mu.Lock() defer h.mu.Unlock() @@ -459,11 +454,11 @@ func (h *maxMinHeap) Cleanup(predicate framework.PredicateFunc) ([]types.QueueIt } } - return removedItems, nil + return removedItems } // Drain removes all items from the queue. -func (h *maxMinHeap) Drain() ([]types.QueueItemAccessor, error) { +func (h *maxMinHeap) Drain() []types.QueueItemAccessor { h.mu.Lock() defer h.mu.Unlock() @@ -482,5 +477,5 @@ func (h *maxMinHeap) Drain() ([]types.QueueItemAccessor, error) { h.handles = make(map[types.QueueItemHandle]*heapItem) h.byteSize.Store(0) - return drainedItems, nil + return drainedItems } diff --git a/pkg/epp/flowcontrol/framework/plugins/queue/maxminheap_test.go b/pkg/epp/flowcontrol/framework/plugins/queue/maxminheap_test.go index b4e22ab74..ee6fab708 100644 --- a/pkg/epp/flowcontrol/framework/plugins/queue/maxminheap_test.go +++ b/pkg/epp/flowcontrol/framework/plugins/queue/maxminheap_test.go @@ -39,8 +39,7 @@ func TestMaxMinHeap_InternalProperty(t *testing.T) { // Add items in a somewhat random order of enqueue times items[i] = typesmocks.NewMockQueueItemAccessor(10, "item", types.FlowKey{ID: "flow"}) items[i].EnqueueTimeV = now.Add(time.Duration((i%5-2)*10) * time.Second) - err := q.Add(items[i]) - require.NoError(t, err, "Add should not fail") + q.Add(items[i]) assertHeapProperty(t, q, "after adding item %d", i) } @@ -54,9 +53,9 @@ func TestMaxMinHeap_InternalProperty(t *testing.T) { // Remove remaining items from the head and validate each time for q.Len() > 0 { - head, err := q.PeekHead() - require.NoError(t, err) - _, err = q.Remove(head.Handle()) + head := q.PeekHead() + require.NotNil(t, head) + _, err := q.Remove(head.Handle()) require.NoError(t, err) assertHeapProperty(t, q, "after removing head item") } diff --git a/pkg/epp/flowcontrol/framework/queue.go b/pkg/epp/flowcontrol/framework/queue.go index d00cccfaf..3f33df75c 100644 --- a/pkg/epp/flowcontrol/framework/queue.go +++ b/pkg/epp/flowcontrol/framework/queue.go @@ -20,30 +20,23 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types" ) -// QueueCapability defines a functional capability that a `SafeQueue` implementation can provide. -// These capabilities allow policies (like `IntraFlowDispatchPolicy`) to declare their operational requirements, -// ensuring that a policy is always paired with a compatible queue. -// -// For example, a policy that requires a priority-ordered queue would declare `CapabilityPriorityConfigurable`, and the -// `contracts.FlowRegistry` would ensure it is paired with a queue implementation (like a heap) that provides this -// capability. -// -// While a simpler boolean method (e.g., `IsPriorityConfigurable()`) could satisfy current needs, this slice-based -// approach is intentionally chosen for future extensibility. It allows a queue to advertise multiple features (e.g., -// dynamic priority reshuffling, size bounds, etc.) without future breaking changes to the `SafeQueue` interface. +// QueueCapability defines a functional capability that a SafeQueue implementation can provide. +// These capabilities allow policies to declare their operational requirements, ensuring that a policy is always paired +// with a compatible queue. type QueueCapability string const ( // CapabilityFIFO indicates that the queue operates in a First-In, First-Out manner. - // `PeekHead()` will return the oldest item (by logical enqueue time). + // PeekHead() will return the oldest item (by logical enqueue time). CapabilityFIFO QueueCapability = "FIFO" - // CapabilityPriorityConfigurable indicates that the queue's ordering is determined by an `ItemComparator`. - // `PeekHead()` will return the highest priority item according to this comparator. + // CapabilityPriorityConfigurable indicates that the queue's ordering is determined by an ItemComparator. + // PeekHead() will return the highest priority item, and PeekTail() will return the lowest priority item according to + // this comparator. CapabilityPriorityConfigurable QueueCapability = "PriorityConfigurable" ) -// QueueInspectionMethods defines `SafeQueue`'s read-only methods. +// QueueInspectionMethods defines SafeQueue's read-only methods. type QueueInspectionMethods interface { // Name returns a string identifier for the concrete queue implementation type (e.g., "ListQueue"). Name() string @@ -59,48 +52,44 @@ type QueueInspectionMethods interface { // PeekHead returns the item at the "head" of the queue (the item with the highest priority according to the queue's // ordering) without removing it. - // - // Returns `ErrQueueEmpty` if the queue is empty. - PeekHead() (peekedItem types.QueueItemAccessor, err error) + // Returns nil if the queue is empty. + PeekHead() types.QueueItemAccessor // PeekTail returns the item at the "tail" of the queue (the item with the lowest priority according to the queue's // ordering) without removing it. - // - // Returns `ErrQueueEmpty` if the queue is empty. - PeekTail() (peekedItem types.QueueItemAccessor, err error) + // Returns nil if the queue is empty. + PeekTail() types.QueueItemAccessor } // PredicateFunc defines a function that returns true if a given item matches a certain condition. -// It is used by `SafeQueue.Cleanup` to filter items. +// It is used by SafeQueue.Cleanup to filter items. type PredicateFunc func(item types.QueueItemAccessor) bool // SafeQueue defines the contract for a single, concurrent-safe queue implementation. -// +// This interface is designed for in-memory, synchronous flow control. Implementations are expected to be unbounded; +// capacity management occurs outside the queue implementation. // All implementations MUST be goroutine-safe. type SafeQueue interface { QueueInspectionMethods - // Add attempts to enqueue an item. On success, it must associate a new, unique `types.QueueItemHandle` with the item - // by calling `item.SetHandle()`. - Add(item types.QueueItemAccessor) error + // Add enqueues an item. It must associate a new, unique types.QueueItemHandle with the item by calling + // item.SetHandle(). + // Contract: The caller MUST NOT provide a nil item. + Add(item types.QueueItemAccessor) // Remove atomically finds and removes the item identified by the given handle. - // - // On success, implementations MUST invalidate the provided handle by calling `handle.Invalidate()`. - // + // On success, implementations MUST invalidate the provided handle by calling handle.Invalidate(). // Returns the removed item. - // Returns `ErrInvalidQueueItemHandle` if the handle is invalid (e.g., nil, wrong type, already invalidated). - // Returns `ErrQueueItemNotFound` if the handle is valid but the item is not in the queue. + // Returns ErrInvalidQueueItemHandle if the handle is invalid. + // Returns ErrQueueItemNotFound if the handle is valid but the item is not in the queue. Remove(handle types.QueueItemHandle) (removedItem types.QueueItemAccessor, err error) // Cleanup iterates through the queue and atomically removes all items for which the predicate returns true, returning // them in a slice. - // // The handle for each removed item MUST be invalidated. - Cleanup(predicate PredicateFunc) (cleanedItems []types.QueueItemAccessor, err error) + Cleanup(predicate PredicateFunc) (cleanedItems []types.QueueItemAccessor) // Drain atomically removes all items from the queue and returns them in a slice. - // // The handle for all removed items MUST be invalidated. The queue MUST be empty after this operation. - Drain() (drainedItems []types.QueueItemAccessor, err error) + Drain() (drainedItems []types.QueueItemAccessor) } From 23741847f4c98d3fcd0dba024c8fe2ce914b6131 Mon Sep 17 00:00:00 2001 From: Luke Van Drie Date: Thu, 6 Nov 2025 22:01:07 +0000 Subject: [PATCH 2/2] refactor: Decouple ManagedQueue from SafeQueue This commit refactors the `ManagedQueue` contract after the preceding commit which altered the `SafeQueue` contract. Motivation: The preceding commit made the generic `framework.SafeQueue.Add` method infallible. This created a design conflict, as the higher-level `ManagedQueue` decorator requires a *fallible* `Add` operation to atomically reject requests when its parent shard is draining. This commit addresses this by refactoring `ManagedQueue` to favor composition over embedding: - `ManagedQueue` no longer embeds `SafeQueue`; it now contains it as a field, correctly modeling the "has-a" relationship. - The `ManagedQueue.Add` method now has its own explicit, fallible contract, returning `contracts.ErrShardDraining`. - The `Remove` and `Cleanup` operations follow the new, simpler `SafeQueue` contract. This change also includes the necessary updates to all callers in the `controller` and `registry` packages to conform to these new, more robust contracts as well as updating tests and mocks. --- pkg/epp/flowcontrol/contracts/mocks/mocks.go | 22 ++--- pkg/epp/flowcontrol/contracts/registry.go | 25 +++-- pkg/epp/flowcontrol/controller/controller.go | 2 +- .../flowcontrol/controller/controller_test.go | 2 +- .../controller/internal/processor.go | 10 +- .../controller/internal/processor_test.go | 2 +- pkg/epp/flowcontrol/registry/managedqueue.go | 91 ++++++++---------- .../flowcontrol/registry/managedqueue_test.go | 94 ++++--------------- 8 files changed, 90 insertions(+), 158 deletions(-) diff --git a/pkg/epp/flowcontrol/contracts/mocks/mocks.go b/pkg/epp/flowcontrol/contracts/mocks/mocks.go index 10f093b11..49205ee1e 100644 --- a/pkg/epp/flowcontrol/contracts/mocks/mocks.go +++ b/pkg/epp/flowcontrol/contracts/mocks/mocks.go @@ -149,9 +149,9 @@ type MockManagedQueue struct { // RemoveFunc allows a test to completely override the default Remove behavior. RemoveFunc func(handle types.QueueItemHandle) (types.QueueItemAccessor, error) // CleanupFunc allows a test to completely override the default Cleanup behavior. - CleanupFunc func(predicate framework.PredicateFunc) ([]types.QueueItemAccessor, error) + CleanupFunc func(predicate framework.PredicateFunc) []types.QueueItemAccessor // DrainFunc allows a test to completely override the default Drain behavior. - DrainFunc func() ([]types.QueueItemAccessor, error) + DrainFunc func() []types.QueueItemAccessor // mu protects access to the internal `items` map. mu sync.Mutex @@ -209,7 +209,7 @@ func (m *MockManagedQueue) Remove(handle types.QueueItemHandle) (types.QueueItem } // Cleanup removes items matching a predicate. It checks for a test override before locking. -func (m *MockManagedQueue) Cleanup(predicate framework.PredicateFunc) ([]types.QueueItemAccessor, error) { +func (m *MockManagedQueue) Cleanup(predicate framework.PredicateFunc) []types.QueueItemAccessor { if m.CleanupFunc != nil { return m.CleanupFunc(predicate) } @@ -223,11 +223,11 @@ func (m *MockManagedQueue) Cleanup(predicate framework.PredicateFunc) ([]types.Q delete(m.items, handle) } } - return removed, nil + return removed } // Drain removes all items from the queue. It checks for a test override before locking. -func (m *MockManagedQueue) Drain() ([]types.QueueItemAccessor, error) { +func (m *MockManagedQueue) Drain() []types.QueueItemAccessor { if m.DrainFunc != nil { return m.DrainFunc() } @@ -239,7 +239,7 @@ func (m *MockManagedQueue) Drain() ([]types.QueueItemAccessor, error) { drained = append(drained, item) } m.items = make(map[types.QueueItemHandle]types.QueueItemAccessor) - return drained, nil + return drained } func (m *MockManagedQueue) FlowKey() types.FlowKey { return m.FlowKeyV } @@ -268,17 +268,17 @@ func (m *MockManagedQueue) ByteSize() uint64 { } // PeekHead returns the first item found in the mock queue. Note: map iteration order is not guaranteed. -func (m *MockManagedQueue) PeekHead() (types.QueueItemAccessor, error) { +func (m *MockManagedQueue) PeekHead() types.QueueItemAccessor { m.mu.Lock() defer m.mu.Unlock() m.init() for _, item := range m.items { - return item, nil // Return first item found + return item // Return first item found } - return nil, nil // Queue is empty + return nil // Queue is empty } // PeekTail is not implemented for this mock. -func (m *MockManagedQueue) PeekTail() (types.QueueItemAccessor, error) { - return nil, nil +func (m *MockManagedQueue) PeekTail() types.QueueItemAccessor { + return nil } diff --git a/pkg/epp/flowcontrol/contracts/registry.go b/pkg/epp/flowcontrol/contracts/registry.go index fe0b790b9..addc2c1f0 100644 --- a/pkg/epp/flowcontrol/contracts/registry.go +++ b/pkg/epp/flowcontrol/contracts/registry.go @@ -142,17 +142,24 @@ type RegistryShard interface { } // ManagedQueue defines the interface for a flow's queue on a specific shard. -// It acts as a stateful decorator around an underlying `framework.SafeQueue`, augmenting it with statistics tracking. +// It acts as a stateful decorator that *use an underlying framework.SafeQueue, augmenting it with statistics tracking +// and lifecycle awareness (e.g., rejecting adds when a shard is draining). // -// # Conformance -// -// - Implementations MUST be goroutine-safe. -// - All mutating methods MUST ensure that the underlying queue state and the public statistics (`Len`, `ByteSize`) -// are updated as a single atomic transaction. -// - The `Add` method MUST return an error wrapping `ErrShardDraining` if the queue instance belongs to a parent shard -// that is no longer Active. +// Conformance: Implementations MUST be goroutine-safe. type ManagedQueue interface { - framework.SafeQueue + // Add attempts to enqueue an item, performing an atomic check on the parent shard's lifecycle state before adding + // the item to the underlying queue. + // Returns ErrShardDraining if the parent shard is no longer Active. + Add(item types.QueueItemAccessor) error + + // Remove atomically finds and removes an item from the underlying queue using its handle. + Remove(handle types.QueueItemHandle) (types.QueueItemAccessor, error) + + // Cleanup removes all items from the underlying queue that satisfy the predicate. + Cleanup(predicate framework.PredicateFunc) []types.QueueItemAccessor + + // Drain removes all items from the underlying queue. + Drain() []types.QueueItemAccessor // FlowQueueAccessor returns a read-only, flow-aware accessor for this queue, used by policy plugins. // Conformance: This method MUST NOT return nil. diff --git a/pkg/epp/flowcontrol/controller/controller.go b/pkg/epp/flowcontrol/controller/controller.go index a11ef26a5..751a420f4 100644 --- a/pkg/epp/flowcontrol/controller/controller.go +++ b/pkg/epp/flowcontrol/controller/controller.go @@ -374,7 +374,7 @@ func (fc *FlowController) selectDistributionCandidates(key types.FlowKey) ([]can "flowKey", key, "shardID", shard.ID()) continue } - candidates = append(candidates, candidate{worker.processor, shard.ID(), mq.ByteSize()}) + candidates = append(candidates, candidate{worker.processor, shard.ID(), mq.FlowQueueAccessor().ByteSize()}) } return nil }) diff --git a/pkg/epp/flowcontrol/controller/controller_test.go b/pkg/epp/flowcontrol/controller/controller_test.go index 917a5a1e5..9f4637446 100644 --- a/pkg/epp/flowcontrol/controller/controller_test.go +++ b/pkg/epp/flowcontrol/controller/controller_test.go @@ -1128,7 +1128,7 @@ func setupRegistryForConcurrency(t *testing.T, numShards int, flowKey types.Flow IntraFlowDispatchPolicyFunc: func(_ types.FlowKey) (framework.IntraFlowDispatchPolicy, error) { return &frameworkmocks.MockIntraFlowDispatchPolicy{ SelectItemFunc: func(qa framework.FlowQueueAccessor) (types.QueueItemAccessor, error) { - return qa.PeekHead() + return qa.PeekHead(), nil }, }, nil }, diff --git a/pkg/epp/flowcontrol/controller/internal/processor.go b/pkg/epp/flowcontrol/controller/internal/processor.go index 2370fd646..7fb74aa26 100644 --- a/pkg/epp/flowcontrol/controller/internal/processor.go +++ b/pkg/epp/flowcontrol/controller/internal/processor.go @@ -408,10 +408,7 @@ func (sp *ShardProcessor) sweepFinalizedItems() { predicate := func(itemAcc types.QueueItemAccessor) bool { return itemAcc.(*FlowItem).FinalState() != nil } - removedItems, err := managedQ.Cleanup(predicate) - if err != nil { - logger.Error(err, "Error during ManagedQueue Cleanup", "flowKey", key) - } + removedItems := managedQ.Cleanup(predicate) logger.V(logutil.DEBUG).Info("Swept finalized items and released capacity.", "flowKey", key, "count", len(removedItems)) } @@ -449,10 +446,7 @@ func (sp *ShardProcessor) shutdown() { func (sp *ShardProcessor) evictAll() { processFn := func(managedQ contracts.ManagedQueue, logger logr.Logger) { key := managedQ.FlowQueueAccessor().FlowKey() - removedItems, err := managedQ.Drain() - if err != nil { - logger.Error(err, "Error during ManagedQueue Drain", "flowKey", key) - } + removedItems := managedQ.Drain() outcome := types.QueueOutcomeEvictedOther errShutdown := fmt.Errorf("%w: %w", types.ErrEvicted, types.ErrFlowControllerNotRunning) diff --git a/pkg/epp/flowcontrol/controller/internal/processor_test.go b/pkg/epp/flowcontrol/controller/internal/processor_test.go index 73fc5b13d..a4efd3c5a 100644 --- a/pkg/epp/flowcontrol/controller/internal/processor_test.go +++ b/pkg/epp/flowcontrol/controller/internal/processor_test.go @@ -282,7 +282,7 @@ func (h *testHarness) intraFlowDispatchPolicy(types.FlowKey) (framework.IntraFlo // Otherwise, use a default implementation that selects the head of the queue. policy.SelectItemFunc = func(fqa framework.FlowQueueAccessor) (types.QueueItemAccessor, error) { - return fqa.PeekHead() + return fqa.PeekHead(), nil } return policy, nil } diff --git a/pkg/epp/flowcontrol/registry/managedqueue.go b/pkg/epp/flowcontrol/registry/managedqueue.go index e56cdd821..b141dff39 100644 --- a/pkg/epp/flowcontrol/registry/managedqueue.go +++ b/pkg/epp/flowcontrol/registry/managedqueue.go @@ -100,7 +100,7 @@ func newManagedQueue( "flowKey", key, "queueType", queue.Name(), ) - mq := &managedQueue{ + return &managedQueue{ queue: queue, dispatchPolicy: dispatchPolicy, key: key, @@ -108,37 +108,30 @@ func newManagedQueue( logger: mqLogger, isDraining: isDraining, } - return mq } // FlowQueueAccessor returns a read-only, flow-aware view of this queue. -// This accessor is primarily used by policy plugins to inspect the queue's state in a structured way. func (mq *managedQueue) FlowQueueAccessor() framework.FlowQueueAccessor { return &flowQueueAccessor{mq: mq} } -// Add wraps the underlying `framework.SafeQueue.Add` call and atomically updates the queue's and the parent shard's -// statistics. +// Add performs an atomic check on the parent shard's lifecycle state before adding the item to the underlying queue. +// This is the critical enforcement point that prevents new requests from entering a draining shard. func (mq *managedQueue) Add(item types.QueueItemAccessor) error { - // Enforce the system's routing contract by rejecting new work for a Draining shard. - // This prevents a race where a caller could route a request to a shard just as it begins to drain. - if mq.isDraining() { - return contracts.ErrShardDraining - } - mq.mu.Lock() defer mq.mu.Unlock() - if err := mq.queue.Add(item); err != nil { - return err + if mq.isDraining() { + return contracts.ErrShardDraining } + mq.queue.Add(item) + mq.propagateStatsDeltaLocked(1, int64(item.OriginalRequest().ByteSize())) mq.logger.V(logging.TRACE).Info("Request added to queue", "requestID", item.OriginalRequest().ID()) return nil } -// Remove wraps the underlying `framework.SafeQueue.Remove` call and atomically updates statistics upon successful -// removal. +// Remove wraps the underlying framework.SafeQueue.Remove and updates statistics. func (mq *managedQueue) Remove(handle types.QueueItemHandle) (types.QueueItemAccessor, error) { mq.mu.Lock() defer mq.mu.Unlock() @@ -152,52 +145,43 @@ func (mq *managedQueue) Remove(handle types.QueueItemHandle) (types.QueueItemAcc return removedItem, nil } -// Cleanup wraps the underlying `framework.SafeQueue.Cleanup` call and atomically updates statistics for all removed -// items. -func (mq *managedQueue) Cleanup(predicate framework.PredicateFunc) (cleanedItems []types.QueueItemAccessor, err error) { +// Cleanup wraps the underlying framework.SafeQueue.Cleanup and updates statistics. +func (mq *managedQueue) Cleanup(predicate framework.PredicateFunc) []types.QueueItemAccessor { mq.mu.Lock() defer mq.mu.Unlock() - cleanedItems, err = mq.queue.Cleanup(predicate) - if err != nil { - return nil, err - } + cleanedItems := mq.queue.Cleanup(predicate) if len(cleanedItems) == 0 { - return cleanedItems, nil + return nil } mq.propagateStatsDeltaForRemovedItemsLocked(cleanedItems) mq.logger.V(logging.DEBUG).Info("Cleaned up queue", "removedItemCount", len(cleanedItems)) - return cleanedItems, nil + return cleanedItems } -// Drain wraps the underlying `framework.SafeQueue.Drain` call and atomically updates statistics for all removed items. -func (mq *managedQueue) Drain() ([]types.QueueItemAccessor, error) { +// Drain wraps the underlying framework.SafeQueue.Drain and updates statistics. +func (mq *managedQueue) Drain() []types.QueueItemAccessor { mq.mu.Lock() defer mq.mu.Unlock() - drainedItems, err := mq.queue.Drain() - if err != nil { - return nil, err - } + drainedItems := mq.queue.Drain() if len(drainedItems) == 0 { - return drainedItems, nil + return nil } mq.propagateStatsDeltaForRemovedItemsLocked(drainedItems) mq.logger.V(logging.DEBUG).Info("Drained queue", "itemCount", len(drainedItems)) - return drainedItems, nil + return drainedItems } -// --- Pass-through and Accessor Methods --- - -func (mq *managedQueue) Name() string { return mq.queue.Name() } -func (mq *managedQueue) Capabilities() []framework.QueueCapability { return mq.queue.Capabilities() } -func (mq *managedQueue) Len() int { return int(mq.len.Load()) } -func (mq *managedQueue) ByteSize() uint64 { return uint64(mq.byteSize.Load()) } -func (mq *managedQueue) PeekHead() (types.QueueItemAccessor, error) { return mq.queue.PeekHead() } -func (mq *managedQueue) PeekTail() (types.QueueItemAccessor, error) { return mq.queue.PeekTail() } -func (mq *managedQueue) Comparator() framework.ItemComparator { return mq.dispatchPolicy.Comparator() } +// Len returns the current number of items in the queue. +func (mq *managedQueue) Len() int { + return int(mq.len.Load()) +} -// --- Internal Methods --- +// ByteSize returns the current total byte size of all items in the queue. +func (mq *managedQueue) ByteSize() uint64 { + return uint64(mq.byteSize.Load()) +} // propagateStatsDeltaLocked updates the queue's statistics and propagates the delta to the parent shard. // It must be called while holding the `managedQueue.mu` lock. @@ -243,11 +227,18 @@ type flowQueueAccessor struct { var _ framework.FlowQueueAccessor = &flowQueueAccessor{} -func (a *flowQueueAccessor) Name() string { return a.mq.Name() } -func (a *flowQueueAccessor) Capabilities() []framework.QueueCapability { return a.mq.Capabilities() } -func (a *flowQueueAccessor) Len() int { return a.mq.Len() } -func (a *flowQueueAccessor) ByteSize() uint64 { return a.mq.ByteSize() } -func (a *flowQueueAccessor) PeekHead() (types.QueueItemAccessor, error) { return a.mq.PeekHead() } -func (a *flowQueueAccessor) PeekTail() (types.QueueItemAccessor, error) { return a.mq.PeekTail() } -func (a *flowQueueAccessor) Comparator() framework.ItemComparator { return a.mq.Comparator() } -func (a *flowQueueAccessor) FlowKey() types.FlowKey { return a.mq.key } +// --- Read-only pass-through methods to the underlying SafeQueue --- +func (a *flowQueueAccessor) Name() string { return a.mq.queue.Name() } +func (a *flowQueueAccessor) Capabilities() []framework.QueueCapability { + return a.mq.queue.Capabilities() +} +func (a *flowQueueAccessor) PeekHead() types.QueueItemAccessor { return a.mq.queue.PeekHead() } +func (a *flowQueueAccessor) PeekTail() types.QueueItemAccessor { return a.mq.queue.PeekTail() } + +// --- Read-only methods from the managedQueue wrapper --- +func (a *flowQueueAccessor) Len() int { return a.mq.Len() } +func (a *flowQueueAccessor) ByteSize() uint64 { return a.mq.ByteSize() } +func (a *flowQueueAccessor) Comparator() framework.ItemComparator { + return a.mq.dispatchPolicy.Comparator() +} +func (a *flowQueueAccessor) FlowKey() types.FlowKey { return a.mq.key } diff --git a/pkg/epp/flowcontrol/registry/managedqueue_test.go b/pkg/epp/flowcontrol/registry/managedqueue_test.go index b4d32fba4..1e0afaca7 100644 --- a/pkg/epp/flowcontrol/registry/managedqueue_test.go +++ b/pkg/epp/flowcontrol/registry/managedqueue_test.go @@ -132,23 +132,13 @@ func TestManagedQueue_Add(t *testing.T) { { name: "ShouldSucceed_AndIncrementStats", setupMock: func(q *frameworkmocks.MockSafeQueue) { - q.AddFunc = func(types.QueueItemAccessor) error { return nil } + q.AddFunc = func(types.QueueItemAccessor) {} }, isDraining: false, expectErr: false, expectedLenDelta: 1, expectedByteSizeDelta: 100, }, - { - name: "ShouldFail_AndNotChangeStats_WhenUnderlyingQueueFails", - setupMock: func(q *frameworkmocks.MockSafeQueue) { - q.AddFunc = func(types.QueueItemAccessor) error { return errors.New("add failed") } - }, - isDraining: false, - expectErr: true, - expectedLenDelta: 0, - expectedByteSizeDelta: 0, - }, { name: "ShouldFail_AndNotChangeStats_WhenQueueIsDraining", setupMock: func(q *frameworkmocks.MockSafeQueue) {}, @@ -255,40 +245,26 @@ func TestManagedQueue_Cleanup(t *testing.T) { testCases := []struct { name string setupMock func(q *frameworkmocks.MockSafeQueue, items []types.QueueItemAccessor) - expectErr bool expectedLenDelta int64 expectedByteSizeDelta int64 }{ { name: "ShouldSucceed_AndDecrementStats_WhenItemsRemoved", setupMock: func(q *frameworkmocks.MockSafeQueue, items []types.QueueItemAccessor) { - q.CleanupFunc = func(_ framework.PredicateFunc) ([]types.QueueItemAccessor, error) { - return items, nil + q.CleanupFunc = func(_ framework.PredicateFunc) []types.QueueItemAccessor { + return items } }, - expectErr: false, expectedLenDelta: -2, expectedByteSizeDelta: -125, }, { name: "ShouldSucceed_AndNotChangeStats_WhenNoItemsRemoved", setupMock: func(q *frameworkmocks.MockSafeQueue, items []types.QueueItemAccessor) { - q.CleanupFunc = func(_ framework.PredicateFunc) ([]types.QueueItemAccessor, error) { - return nil, nil // Simulate no items matching predicate. + q.CleanupFunc = func(_ framework.PredicateFunc) []types.QueueItemAccessor { + return nil // Simulate no items matching predicate. } }, - expectErr: false, - expectedLenDelta: 0, - expectedByteSizeDelta: 0, - }, - { - name: "ShouldFail_AndNotChangeStats_WhenUnderlyingQueueFails", - setupMock: func(q *frameworkmocks.MockSafeQueue, items []types.QueueItemAccessor) { - q.CleanupFunc = func(_ framework.PredicateFunc) ([]types.QueueItemAccessor, error) { - return nil, errors.New("cleanup failed") - } - }, - expectErr: true, expectedLenDelta: 0, expectedByteSizeDelta: 0, }, @@ -305,14 +281,7 @@ func TestManagedQueue_Cleanup(t *testing.T) { } h.setupWithItems(items...) tc.setupMock(q, items) - - _, err := h.mq.Cleanup(func(_ types.QueueItemAccessor) bool { return true }) - - if tc.expectErr { - require.Error(t, err, "Cleanup operation must fail if the underlying queue implementation encounters an error") - } else { - require.NoError(t, err, "Cleanup operation must succeed if the underlying queue implementation succeeds") - } + h.mq.Cleanup(func(_ types.QueueItemAccessor) bool { return true }) assert.Equal(t, tc.expectedLenDelta, h.propagator.lenDelta.Load(), "The propagated length delta must exactly match the total number of items removed during cleanup") assert.Equal(t, tc.expectedByteSizeDelta, h.propagator.byteSizeDelta.Load(), @@ -328,32 +297,19 @@ func TestManagedQueue_Drain(t *testing.T) { testCases := []struct { name string setupMock func(q *frameworkmocks.MockSafeQueue, items []types.QueueItemAccessor) - expectErr bool expectedLenDelta int64 expectedByteSizeDelta int64 }{ { name: "ShouldSucceed_AndDecrementStats", setupMock: func(q *frameworkmocks.MockSafeQueue, items []types.QueueItemAccessor) { - q.DrainFunc = func() ([]types.QueueItemAccessor, error) { - return items, nil + q.DrainFunc = func() []types.QueueItemAccessor { + return items } }, - expectErr: false, expectedLenDelta: -2, expectedByteSizeDelta: -125, }, - { - name: "ShouldFail_AndNotChangeStats_WhenUnderlyingQueueFails", - setupMock: func(q *frameworkmocks.MockSafeQueue, items []types.QueueItemAccessor) { - q.DrainFunc = func() ([]types.QueueItemAccessor, error) { - return nil, errors.New("drain failed") - } - }, - expectErr: true, - expectedLenDelta: 0, - expectedByteSizeDelta: 0, - }, } for _, tc := range testCases { @@ -368,13 +324,7 @@ func TestManagedQueue_Drain(t *testing.T) { h.setupWithItems(items...) tc.setupMock(q, items) - _, err := h.mq.Drain() - - if tc.expectErr { - require.Error(t, err, "Drain operation must fail if the underlying queue implementation encounters an error") - } else { - require.NoError(t, err, "Drain operation must succeed if the underlying queue implementation succeeds") - } + h.mq.Drain() assert.Equal(t, tc.expectedLenDelta, h.propagator.lenDelta.Load(), "The propagated length delta must exactly match the total number of items drained") assert.Equal(t, tc.expectedByteSizeDelta, h.propagator.byteSizeDelta.Load(), @@ -401,8 +351,8 @@ func TestManagedQueue_FlowQueueAccessor(t *testing.T) { accessor := harness.mq.FlowQueueAccessor() require.NotNil(t, accessor, "FlowQueueAccessor must return a non-nil instance (guaranteed by contract)") - assert.Equal(t, harness.mq.Name(), accessor.Name(), "Accessor Name() must proxy the underlying queue's name") - assert.Equal(t, harness.mq.Capabilities(), accessor.Capabilities(), + assert.Equal(t, harness.mq.queue.Name(), accessor.Name(), "Accessor Name() must proxy the underlying queue's name") + assert.Equal(t, harness.mq.queue.Capabilities(), accessor.Capabilities(), "Accessor Capabilities() must proxy the underlying queue's capabilities") assert.Equal(t, harness.mq.Len(), accessor.Len(), "Accessor Len() must reflect the managed queue's current length") assert.Equal(t, harness.mq.ByteSize(), accessor.ByteSize(), @@ -410,15 +360,11 @@ func TestManagedQueue_FlowQueueAccessor(t *testing.T) { assert.Equal(t, flowKey, accessor.FlowKey(), "Accessor FlowKey() must return the correct identifier for the flow") assert.Equal(t, harness.mockPolicy.Comparator(), accessor.Comparator(), "Accessor Comparator() must return the comparator provided by the configured intra-flow policy") - assert.Equal(t, harness.mockPolicy.Comparator(), harness.mq.Comparator(), - "ManagedQueue Comparator() must also return the comparator provided by the configured intra-flow policy") - peekedHead, err := accessor.PeekHead() - require.NoError(t, err, "Accessor PeekHead() must succeed when the underlying queue succeeds") + peekedHead := accessor.PeekHead() assert.Same(t, item, peekedHead, "Accessor PeekHead() must return the exact item instance at the head") - peekedTail, err := accessor.PeekTail() - require.NoError(t, err, "Accessor PeekTail() must succeed when the underlying queue succeeds") + peekedTail := accessor.PeekTail() assert.Same(t, item, peekedTail, "Accessor PeekTail() must return the exact item instance at the tail") }) @@ -426,14 +372,10 @@ func TestManagedQueue_FlowQueueAccessor(t *testing.T) { t.Parallel() flowKey := types.FlowKey{ID: "flow", Priority: 1} q := &frameworkmocks.MockSafeQueue{} - expectedErr := errors.New("queue is empty") - q.PeekHeadErrV = expectedErr + q.PeekHeadV = nil harness := newMockedMqHarness(t, q, flowKey) accessor := harness.mq.FlowQueueAccessor() - - _, err := accessor.PeekHead() - require.Error(t, err, "Accessor PeekHead() should return an error on an empty queue") - assert.ErrorIs(t, err, expectedErr, "Accessor should proxy the specific error from the underlying queue") + assert.Nil(t, accessor.PeekHead(), "Accessor PeekHead() should return an nil on an empty queue") }) } @@ -470,9 +412,7 @@ func TestManagedQueue_Concurrency_StatsIntegrity(t *testing.T) { // After all operations, the queue should ideally be empty, but we drain any remaining items to get a definitive final // state. - _, err := h.mq.Drain() - require.NoError(t, err, "Final drain operation must succeed to finalize the test state") - + h.mq.Drain() assert.Zero(t, h.mq.Len(), "Final queue length must be zero after draining all remaining items") assert.Zero(t, h.mq.ByteSize(), "Final queue byte size must be zero after draining all remaining items") assert.Equal(t, int64(0), h.propagator.lenDelta.Load(), @@ -488,7 +428,7 @@ func TestManagedQueue_InvariantPanics_OnUnderflow(t *testing.T) { flowKey := types.FlowKey{ID: "flow", Priority: 1} item := typesmocks.NewMockQueueItemAccessor(100, "req", flowKey) q := &frameworkmocks.MockSafeQueue{} - q.AddFunc = func(types.QueueItemAccessor) error { return nil } + q.AddFunc = func(types.QueueItemAccessor) {} q.RemoveFunc = func(types.QueueItemHandle) (types.QueueItemAccessor, error) { return item, nil }