diff --git a/pkg/epp/flowcontrol/contracts/mocks/mocks.go b/pkg/epp/flowcontrol/contracts/mocks/mocks.go index 10f093b11..49205ee1e 100644 --- a/pkg/epp/flowcontrol/contracts/mocks/mocks.go +++ b/pkg/epp/flowcontrol/contracts/mocks/mocks.go @@ -149,9 +149,9 @@ type MockManagedQueue struct { // RemoveFunc allows a test to completely override the default Remove behavior. RemoveFunc func(handle types.QueueItemHandle) (types.QueueItemAccessor, error) // CleanupFunc allows a test to completely override the default Cleanup behavior. - CleanupFunc func(predicate framework.PredicateFunc) ([]types.QueueItemAccessor, error) + CleanupFunc func(predicate framework.PredicateFunc) []types.QueueItemAccessor // DrainFunc allows a test to completely override the default Drain behavior. - DrainFunc func() ([]types.QueueItemAccessor, error) + DrainFunc func() []types.QueueItemAccessor // mu protects access to the internal `items` map. mu sync.Mutex @@ -209,7 +209,7 @@ func (m *MockManagedQueue) Remove(handle types.QueueItemHandle) (types.QueueItem } // Cleanup removes items matching a predicate. It checks for a test override before locking. -func (m *MockManagedQueue) Cleanup(predicate framework.PredicateFunc) ([]types.QueueItemAccessor, error) { +func (m *MockManagedQueue) Cleanup(predicate framework.PredicateFunc) []types.QueueItemAccessor { if m.CleanupFunc != nil { return m.CleanupFunc(predicate) } @@ -223,11 +223,11 @@ func (m *MockManagedQueue) Cleanup(predicate framework.PredicateFunc) ([]types.Q delete(m.items, handle) } } - return removed, nil + return removed } // Drain removes all items from the queue. It checks for a test override before locking. -func (m *MockManagedQueue) Drain() ([]types.QueueItemAccessor, error) { +func (m *MockManagedQueue) Drain() []types.QueueItemAccessor { if m.DrainFunc != nil { return m.DrainFunc() } @@ -239,7 +239,7 @@ func (m *MockManagedQueue) Drain() ([]types.QueueItemAccessor, error) { drained = append(drained, item) } m.items = make(map[types.QueueItemHandle]types.QueueItemAccessor) - return drained, nil + return drained } func (m *MockManagedQueue) FlowKey() types.FlowKey { return m.FlowKeyV } @@ -268,17 +268,17 @@ func (m *MockManagedQueue) ByteSize() uint64 { } // PeekHead returns the first item found in the mock queue. Note: map iteration order is not guaranteed. -func (m *MockManagedQueue) PeekHead() (types.QueueItemAccessor, error) { +func (m *MockManagedQueue) PeekHead() types.QueueItemAccessor { m.mu.Lock() defer m.mu.Unlock() m.init() for _, item := range m.items { - return item, nil // Return first item found + return item // Return first item found } - return nil, nil // Queue is empty + return nil // Queue is empty } // PeekTail is not implemented for this mock. -func (m *MockManagedQueue) PeekTail() (types.QueueItemAccessor, error) { - return nil, nil +func (m *MockManagedQueue) PeekTail() types.QueueItemAccessor { + return nil } diff --git a/pkg/epp/flowcontrol/contracts/registry.go b/pkg/epp/flowcontrol/contracts/registry.go index fe0b790b9..addc2c1f0 100644 --- a/pkg/epp/flowcontrol/contracts/registry.go +++ b/pkg/epp/flowcontrol/contracts/registry.go @@ -142,17 +142,24 @@ type RegistryShard interface { } // ManagedQueue defines the interface for a flow's queue on a specific shard. -// It acts as a stateful decorator around an underlying `framework.SafeQueue`, augmenting it with statistics tracking. +// It acts as a stateful decorator that *use an underlying framework.SafeQueue, augmenting it with statistics tracking +// and lifecycle awareness (e.g., rejecting adds when a shard is draining). // -// # Conformance -// -// - Implementations MUST be goroutine-safe. -// - All mutating methods MUST ensure that the underlying queue state and the public statistics (`Len`, `ByteSize`) -// are updated as a single atomic transaction. -// - The `Add` method MUST return an error wrapping `ErrShardDraining` if the queue instance belongs to a parent shard -// that is no longer Active. +// Conformance: Implementations MUST be goroutine-safe. type ManagedQueue interface { - framework.SafeQueue + // Add attempts to enqueue an item, performing an atomic check on the parent shard's lifecycle state before adding + // the item to the underlying queue. + // Returns ErrShardDraining if the parent shard is no longer Active. + Add(item types.QueueItemAccessor) error + + // Remove atomically finds and removes an item from the underlying queue using its handle. + Remove(handle types.QueueItemHandle) (types.QueueItemAccessor, error) + + // Cleanup removes all items from the underlying queue that satisfy the predicate. + Cleanup(predicate framework.PredicateFunc) []types.QueueItemAccessor + + // Drain removes all items from the underlying queue. + Drain() []types.QueueItemAccessor // FlowQueueAccessor returns a read-only, flow-aware accessor for this queue, used by policy plugins. // Conformance: This method MUST NOT return nil. diff --git a/pkg/epp/flowcontrol/controller/controller.go b/pkg/epp/flowcontrol/controller/controller.go index a11ef26a5..751a420f4 100644 --- a/pkg/epp/flowcontrol/controller/controller.go +++ b/pkg/epp/flowcontrol/controller/controller.go @@ -374,7 +374,7 @@ func (fc *FlowController) selectDistributionCandidates(key types.FlowKey) ([]can "flowKey", key, "shardID", shard.ID()) continue } - candidates = append(candidates, candidate{worker.processor, shard.ID(), mq.ByteSize()}) + candidates = append(candidates, candidate{worker.processor, shard.ID(), mq.FlowQueueAccessor().ByteSize()}) } return nil }) diff --git a/pkg/epp/flowcontrol/controller/controller_test.go b/pkg/epp/flowcontrol/controller/controller_test.go index 917a5a1e5..9f4637446 100644 --- a/pkg/epp/flowcontrol/controller/controller_test.go +++ b/pkg/epp/flowcontrol/controller/controller_test.go @@ -1128,7 +1128,7 @@ func setupRegistryForConcurrency(t *testing.T, numShards int, flowKey types.Flow IntraFlowDispatchPolicyFunc: func(_ types.FlowKey) (framework.IntraFlowDispatchPolicy, error) { return &frameworkmocks.MockIntraFlowDispatchPolicy{ SelectItemFunc: func(qa framework.FlowQueueAccessor) (types.QueueItemAccessor, error) { - return qa.PeekHead() + return qa.PeekHead(), nil }, }, nil }, diff --git a/pkg/epp/flowcontrol/controller/internal/processor.go b/pkg/epp/flowcontrol/controller/internal/processor.go index 2370fd646..7fb74aa26 100644 --- a/pkg/epp/flowcontrol/controller/internal/processor.go +++ b/pkg/epp/flowcontrol/controller/internal/processor.go @@ -408,10 +408,7 @@ func (sp *ShardProcessor) sweepFinalizedItems() { predicate := func(itemAcc types.QueueItemAccessor) bool { return itemAcc.(*FlowItem).FinalState() != nil } - removedItems, err := managedQ.Cleanup(predicate) - if err != nil { - logger.Error(err, "Error during ManagedQueue Cleanup", "flowKey", key) - } + removedItems := managedQ.Cleanup(predicate) logger.V(logutil.DEBUG).Info("Swept finalized items and released capacity.", "flowKey", key, "count", len(removedItems)) } @@ -449,10 +446,7 @@ func (sp *ShardProcessor) shutdown() { func (sp *ShardProcessor) evictAll() { processFn := func(managedQ contracts.ManagedQueue, logger logr.Logger) { key := managedQ.FlowQueueAccessor().FlowKey() - removedItems, err := managedQ.Drain() - if err != nil { - logger.Error(err, "Error during ManagedQueue Drain", "flowKey", key) - } + removedItems := managedQ.Drain() outcome := types.QueueOutcomeEvictedOther errShutdown := fmt.Errorf("%w: %w", types.ErrEvicted, types.ErrFlowControllerNotRunning) diff --git a/pkg/epp/flowcontrol/controller/internal/processor_test.go b/pkg/epp/flowcontrol/controller/internal/processor_test.go index 73fc5b13d..a4efd3c5a 100644 --- a/pkg/epp/flowcontrol/controller/internal/processor_test.go +++ b/pkg/epp/flowcontrol/controller/internal/processor_test.go @@ -282,7 +282,7 @@ func (h *testHarness) intraFlowDispatchPolicy(types.FlowKey) (framework.IntraFlo // Otherwise, use a default implementation that selects the head of the queue. policy.SelectItemFunc = func(fqa framework.FlowQueueAccessor) (types.QueueItemAccessor, error) { - return fqa.PeekHead() + return fqa.PeekHead(), nil } return policy, nil } diff --git a/pkg/epp/flowcontrol/framework/errors.go b/pkg/epp/flowcontrol/framework/errors.go index fb19c4aab..79d62acc8 100644 --- a/pkg/epp/flowcontrol/framework/errors.go +++ b/pkg/epp/flowcontrol/framework/errors.go @@ -26,13 +26,6 @@ import ( // and might be handled or wrapped by the `contracts.FlowRegistry`'s `contracts.ManagedQueue` or the // `controller.FlowController`. var ( - // ErrNilQueueItem indicates that a nil `types.QueueItemAccessor` was passed to `SafeQueue.Add()`. - ErrNilQueueItem = errors.New("queue item cannot be nil") - - // ErrQueueEmpty indicates an attempt to perform an operation on an empty `SafeQueue` that requires one or more items - // (e.g., calling `SafeQueue.PeekHead()`). - ErrQueueEmpty = errors.New("queue is empty") - // ErrInvalidQueueItemHandle indicates that a `types.QueueItemHandle` provided to a `SafeQueue` operation (e.g., // `SafeQueue.Remove()`) is not valid for that queue, has been invalidated, or does not correspond to an actual item // in the queue. diff --git a/pkg/epp/flowcontrol/framework/mocks/mocks.go b/pkg/epp/flowcontrol/framework/mocks/mocks.go index ff8441fde..29eead472 100644 --- a/pkg/epp/flowcontrol/framework/mocks/mocks.go +++ b/pkg/epp/flowcontrol/framework/mocks/mocks.go @@ -38,9 +38,7 @@ type MockFlowQueueAccessor struct { LenV int ByteSizeV uint64 PeekHeadV types.QueueItemAccessor - PeekHeadErrV error PeekTailV types.QueueItemAccessor - PeekTailErrV error FlowKeyV types.FlowKey ComparatorV framework.ItemComparator CapabilitiesV []framework.QueueCapability @@ -53,12 +51,12 @@ func (m *MockFlowQueueAccessor) Comparator() framework.ItemComparator { ret func (m *MockFlowQueueAccessor) FlowKey() types.FlowKey { return m.FlowKeyV } func (m *MockFlowQueueAccessor) Capabilities() []framework.QueueCapability { return m.CapabilitiesV } -func (m *MockFlowQueueAccessor) PeekHead() (types.QueueItemAccessor, error) { - return m.PeekHeadV, m.PeekHeadErrV +func (m *MockFlowQueueAccessor) PeekHead() types.QueueItemAccessor { + return m.PeekHeadV } -func (m *MockFlowQueueAccessor) PeekTail() (types.QueueItemAccessor, error) { - return m.PeekTailV, m.PeekTailErrV +func (m *MockFlowQueueAccessor) PeekTail() types.QueueItemAccessor { + return m.PeekTailV } var _ framework.FlowQueueAccessor = &MockFlowQueueAccessor{} @@ -108,13 +106,11 @@ type MockSafeQueue struct { LenV int ByteSizeV uint64 PeekHeadV types.QueueItemAccessor - PeekHeadErrV error PeekTailV types.QueueItemAccessor - PeekTailErrV error - AddFunc func(item types.QueueItemAccessor) error + AddFunc func(item types.QueueItemAccessor) RemoveFunc func(handle types.QueueItemHandle) (types.QueueItemAccessor, error) - CleanupFunc func(predicate framework.PredicateFunc) ([]types.QueueItemAccessor, error) - DrainFunc func() ([]types.QueueItemAccessor, error) + CleanupFunc func(predicate framework.PredicateFunc) []types.QueueItemAccessor + DrainFunc func() []types.QueueItemAccessor } func (m *MockSafeQueue) Name() string { return m.NameV } @@ -122,19 +118,18 @@ func (m *MockSafeQueue) Capabilities() []framework.QueueCapability { return m.Ca func (m *MockSafeQueue) Len() int { return m.LenV } func (m *MockSafeQueue) ByteSize() uint64 { return m.ByteSizeV } -func (m *MockSafeQueue) PeekHead() (types.QueueItemAccessor, error) { - return m.PeekHeadV, m.PeekHeadErrV +func (m *MockSafeQueue) PeekHead() types.QueueItemAccessor { + return m.PeekHeadV } -func (m *MockSafeQueue) PeekTail() (types.QueueItemAccessor, error) { - return m.PeekTailV, m.PeekTailErrV +func (m *MockSafeQueue) PeekTail() types.QueueItemAccessor { + return m.PeekTailV } -func (m *MockSafeQueue) Add(item types.QueueItemAccessor) error { +func (m *MockSafeQueue) Add(item types.QueueItemAccessor) { if m.AddFunc != nil { - return m.AddFunc(item) + m.AddFunc(item) } - return nil } func (m *MockSafeQueue) Remove(handle types.QueueItemHandle) (types.QueueItemAccessor, error) { @@ -144,18 +139,18 @@ func (m *MockSafeQueue) Remove(handle types.QueueItemHandle) (types.QueueItemAcc return nil, nil } -func (m *MockSafeQueue) Cleanup(predicate framework.PredicateFunc) ([]types.QueueItemAccessor, error) { +func (m *MockSafeQueue) Cleanup(predicate framework.PredicateFunc) []types.QueueItemAccessor { if m.CleanupFunc != nil { return m.CleanupFunc(predicate) } - return nil, nil + return nil } -func (m *MockSafeQueue) Drain() ([]types.QueueItemAccessor, error) { +func (m *MockSafeQueue) Drain() []types.QueueItemAccessor { if m.DrainFunc != nil { return m.DrainFunc() } - return nil, nil + return nil } var _ framework.SafeQueue = &MockSafeQueue{} diff --git a/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/besthead/besthead.go b/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/besthead/besthead.go index 41033e755..ae4cbd606 100644 --- a/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/besthead/besthead.go +++ b/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/besthead/besthead.go @@ -68,8 +68,8 @@ func (p *bestHead) SelectQueue(band framework.PriorityBandAccessor) (framework.F return true } - item, err := queue.PeekHead() - if err != nil || item == nil { + item := queue.PeekHead() + if item == nil { return true } diff --git a/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/besthead/besthead_test.go b/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/besthead/besthead_test.go index 5971d3f0c..288560366 100644 --- a/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/besthead/besthead_test.go +++ b/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/besthead/besthead_test.go @@ -17,7 +17,6 @@ limitations under the License. package besthead import ( - "errors" "testing" "time" @@ -105,10 +104,10 @@ func TestBestHead_SelectQueue(t *testing.T) { ComparatorV: newTestComparator(), } queueEmpty := &frameworkmocks.MockFlowQueueAccessor{ - LenV: 0, - PeekHeadErrV: framework.ErrQueueEmpty, - FlowKeyV: types.FlowKey{ID: "flowEmpty"}, - ComparatorV: newTestComparator(), + LenV: 0, + PeekHeadV: nil, + FlowKeyV: types.FlowKey{ID: "flowEmpty"}, + ComparatorV: newTestComparator(), } testCases := []struct { @@ -151,19 +150,6 @@ func TestBestHead_SelectQueue(t *testing.T) { ), expectedErr: framework.ErrIncompatiblePriorityType, }, - { - name: "QueuePeekHeadErrors", - band: newTestBand( - &frameworkmocks.MockFlowQueueAccessor{ - LenV: 1, - PeekHeadErrV: errors.New("peek error"), - FlowKeyV: flow1Key, - ComparatorV: newTestComparator(), - }, - queue2, - ), - expectedQueueID: flow2ID, - }, { name: "QueueComparatorIsNil", band: newTestBand( @@ -195,10 +181,10 @@ func TestBestHead_SelectQueue(t *testing.T) { band: newTestBand( queueEmpty, &frameworkmocks.MockFlowQueueAccessor{ - LenV: 0, - PeekHeadErrV: framework.ErrQueueEmpty, - FlowKeyV: types.FlowKey{ID: "flowEmpty2"}, - ComparatorV: newTestComparator(), + LenV: 0, + PeekHeadV: nil, + FlowKeyV: types.FlowKey{ID: "flowEmpty2"}, + ComparatorV: newTestComparator(), }, ), }, diff --git a/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/functional_test.go b/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/functional_test.go index 7649b324e..3dba8b7ea 100644 --- a/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/functional_test.go +++ b/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/functional_test.go @@ -63,9 +63,9 @@ func runSelectQueueConformanceTests(t *testing.T, policy framework.InterFlowDisp flowIDEmpty := "flow-empty" mockQueueEmpty := &frameworkmocks.MockFlowQueueAccessor{ - LenV: 0, - PeekHeadErrV: framework.ErrQueueEmpty, - FlowKeyV: types.FlowKey{ID: flowIDEmpty}, + LenV: 0, + PeekHeadV: nil, + FlowKeyV: types.FlowKey{ID: flowIDEmpty}, } testCases := []struct { diff --git a/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/fcfs/fcfs.go b/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/fcfs/fcfs.go index 7addb9d13..63ffe31b9 100644 --- a/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/fcfs/fcfs.go +++ b/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/fcfs/fcfs.go @@ -18,8 +18,6 @@ limitations under the License. package fcfs import ( - "errors" - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types" @@ -86,11 +84,7 @@ func (p *fcfs) SelectItem(queue framework.FlowQueueAccessor) (types.QueueItemAcc if queue == nil { return nil, nil } - item, err := queue.PeekHead() - if errors.Is(err, framework.ErrQueueEmpty) { - return nil, nil - } - return item, err + return queue.PeekHead(), nil } // Comparator returns a `framework.ItemComparator` based on enqueue time. diff --git a/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/functional_test.go b/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/functional_test.go index 01e8a1c8e..4088b4da2 100644 --- a/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/functional_test.go +++ b/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/functional_test.go @@ -22,7 +22,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework" frameworkmocks "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/mocks" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch" _ "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/fcfs" @@ -66,8 +65,8 @@ func TestIntraFlowDispatchPolicyConformance(t *testing.T) { t.Run("SelectItemFromEmptyQueue", func(t *testing.T) { t.Parallel() mockQueue := &frameworkmocks.MockFlowQueueAccessor{ - PeekHeadErrV: framework.ErrQueueEmpty, - LenV: 0, + PeekHeadV: nil, + LenV: 0, } item, err := policy.SelectItem(mockQueue) require.NoError(t, err, "SelectItem from an empty queue for %s should not return an error", policyName) diff --git a/pkg/epp/flowcontrol/framework/plugins/queue/benchmark_test.go b/pkg/epp/flowcontrol/framework/plugins/queue/benchmark_test.go index 24f4a9238..b9461ebf1 100644 --- a/pkg/epp/flowcontrol/framework/plugins/queue/benchmark_test.go +++ b/pkg/epp/flowcontrol/framework/plugins/queue/benchmark_test.go @@ -70,11 +70,8 @@ func benchmarkAddRemove(b *testing.B, q framework.SafeQueue) { b.RunParallel(func(pb *testing.PB) { for pb.Next() { item := mocks.NewMockQueueItemAccessor(1, "item", benchmarkFlowKey) - err := q.Add(item) - if err != nil { - b.Fatalf("Add failed: %v", err) - } - _, err = q.Remove(item.Handle()) + q.Add(item) + _, err := q.Remove(item.Handle()) if err != nil { b.Fatalf("Remove failed: %v", err) } @@ -87,27 +84,21 @@ func benchmarkAddRemove(b *testing.B, q framework.SafeQueue) { func benchmarkAddPeekRemove(b *testing.B, q framework.SafeQueue) { // Pre-add one item so PeekHead doesn't fail on the first iteration. initialItem := mocks.NewMockQueueItemAccessor(1, "initial", benchmarkFlowKey) - if err := q.Add(initialItem); err != nil { - b.Fatalf("Failed to add initial item: %v", err) - } + q.Add(initialItem) b.ReportAllocs() for b.Loop() { item := mocks.NewMockQueueItemAccessor(1, "item", benchmarkFlowKey) - err := q.Add(item) - if err != nil { - b.Fatalf("Add failed: %v", err) - } - - peeked, err := q.PeekHead() - if err != nil { + q.Add(item) + peeked := q.PeekHead() + if peeked == nil { // In a concurrent benchmark, this could happen if the queue becomes empty. // In a serial one, it's a fatal error. - b.Fatalf("PeekHead failed: %v", err) + b.Fatal("PeekHead failed") } - _, err = q.Remove(peeked.Handle()) + _, err := q.Remove(peeked.Handle()) if err != nil { b.Fatalf("Remove failed: %v", err) } @@ -125,16 +116,14 @@ func benchmarkBulkAddThenBulkRemove(b *testing.B, q framework.SafeQueue) { for j := range items { item := mocks.NewMockQueueItemAccessor(1, fmt.Sprintf("bulk-%d-%d", i, j), benchmarkFlowKey) items[j] = item - if err := q.Add(item); err != nil { - b.Fatalf("Add failed: %v", err) - } + q.Add(item) } // Remove the same number of items for range items { - peeked, err := q.PeekHead() - if err != nil { - b.Fatalf("PeekHead failed: %v", err) + peeked := q.PeekHead() + if peeked == nil { + b.Fatal("PeekHead failed") } if _, err := q.Remove(peeked.Handle()); err != nil { b.Fatalf("Remove failed: %v", err) @@ -148,25 +137,20 @@ func benchmarkBulkAddThenBulkRemove(b *testing.B, q framework.SafeQueue) { func benchmarkAddPeekTailRemove(b *testing.B, q framework.SafeQueue) { // Pre-add one item so PeekTail doesn't fail on the first iteration. initialItem := mocks.NewMockQueueItemAccessor(1, "initial", benchmarkFlowKey) - if err := q.Add(initialItem); err != nil { - b.Fatalf("Failed to add initial item: %v", err) - } + q.Add(initialItem) b.ReportAllocs() for b.Loop() { item := mocks.NewMockQueueItemAccessor(1, "item", benchmarkFlowKey) - err := q.Add(item) - if err != nil { - b.Fatalf("Add failed: %v", err) - } + q.Add(item) - peeked, err := q.PeekTail() - if err != nil { - b.Fatalf("PeekTail failed: %v", err) + peeked := q.PeekTail() + if peeked == nil { + b.Fatal("PeekTail failed") } - _, err = q.Remove(peeked.Handle()) + _, err := q.Remove(peeked.Handle()) if err != nil { b.Fatalf("Remove failed: %v", err) } @@ -179,9 +163,7 @@ func benchmarkHighContention(b *testing.B, q framework.SafeQueue) { // Pre-fill the queue to ensure consumers have work to do immediately. for i := range 1000 { item := mocks.NewMockQueueItemAccessor(1, fmt.Sprintf("prefill-%d", i), benchmarkFlowKey) - if err := q.Add(item); err != nil { - b.Fatalf("Failed to pre-fill queue: %v", err) - } + q.Add(item) } stopCh := make(chan struct{}) @@ -198,7 +180,7 @@ func benchmarkHighContention(b *testing.B, q framework.SafeQueue) { return default: item := mocks.NewMockQueueItemAccessor(1, "item", benchmarkFlowKey) - _ = q.Add(item) + q.Add(item) } } }() @@ -210,8 +192,8 @@ func benchmarkHighContention(b *testing.B, q framework.SafeQueue) { // Consumers drive the benchmark. b.RunParallel(func(pb *testing.PB) { for pb.Next() { - peeked, err := q.PeekHead() - if err == nil { + peeked := q.PeekHead() + if peeked != nil { _, _ = q.Remove(peeked.Handle()) } } diff --git a/pkg/epp/flowcontrol/framework/plugins/queue/functional_test.go b/pkg/epp/flowcontrol/framework/plugins/queue/functional_test.go index d632c1dc0..2282b8efd 100644 --- a/pkg/epp/flowcontrol/framework/plugins/queue/functional_test.go +++ b/pkg/epp/flowcontrol/framework/plugins/queue/functional_test.go @@ -74,22 +74,16 @@ func testLifecycleAndOrdering( t.Helper() // PeekHead/PeekTail on empty queue - peeked, err := q.PeekHead() - assert.ErrorIs(t, err, framework.ErrQueueEmpty, - "[%s] PeekHead on empty queue should return ErrQueueEmpty", comparatorName) + peeked := q.PeekHead() assert.Nil(t, peeked, "[%s] PeekHead on empty queue should return a nil item", comparatorName) - peeked, err = q.PeekTail() - assert.ErrorIs(t, err, framework.ErrQueueEmpty, - "[%s] PeekTail on empty queue should return ErrQueueEmpty", comparatorName) + peeked = q.PeekTail() assert.Nil(t, peeked, "[%s] PeekTail on empty queue should return a nil item", comparatorName) // Add items currentExpectedLen := 0 var currentExpectedByteSize uint64 for i, item := range itemsInOrder { - addErr := q.Add(item) - require.NoError(t, addErr, "[%s] Add should not fail for a valid item (item %d, ID: %s)", - comparatorName, i, item.OriginalRequest().ID()) + q.Add(item) require.NotNil(t, item.Handle(), "[%s] Add must assign a non-nil handle to the item (item %d, ID: %s)", comparatorName, i, item.OriginalRequest().ID()) require.False(t, item.Handle().IsInvalidated(), @@ -121,8 +115,7 @@ func testLifecycleAndOrdering( expectedByteSize := expectedTotalByteSize for i, expectedItem := range itemsInOrder { // Verify PeekHead - peeked, err = q.PeekHead() - require.NoError(t, err, "[%s] PeekHead should not error on a non-empty queue (iteration %d)", comparatorName, i) + peeked = q.PeekHead() require.NotNil(t, peeked, "[%s] PeekHead should return a non-nil item (iteration %d)", comparatorName, i) assert.Equal(t, expectedItem.OriginalRequest().ID(), peeked.OriginalRequest().ID(), "[%s] PeekHead must return the item (ID: %s) at the head of the queue (iteration %d)", @@ -137,8 +130,7 @@ func testLifecycleAndOrdering( "[%s] ByteSize() must be unchanged after PeekHead (iteration %d)", comparatorName, i) // Verify PeekTail - peekedTail, err := q.PeekTail() - require.NoError(t, err, "[%s] PeekTail should not error on a non-empty queue (iteration %d)", comparatorName, i) + peekedTail := q.PeekTail() require.NotNil(t, peekedTail, "[%s] PeekTail should return a non-nil item (iteration %d)", comparatorName, i) // The tail is the last item in the *remaining* ordered slice. expectedTailItem := itemsInOrder[len(itemsInOrder)-1] @@ -166,9 +158,7 @@ func testLifecycleAndOrdering( assert.Zero(t, q.Len(), "[%s] Queue length should be 0 after all items are removed", comparatorName) assert.Zero(t, q.ByteSize(), "[%s] Queue byte size should be 0 after all items are removed", comparatorName) - peeked, err = q.PeekHead() - assert.ErrorIs(t, err, framework.ErrQueueEmpty, "[%s] PeekHead on an empty queue should return ErrQueueEmpty again", - comparatorName) + peeked = q.PeekHead() assert.Nil(t, peeked, "[%s] PeekHead on an empty queue should return a nil item again", comparatorName) } @@ -247,33 +237,18 @@ func TestQueueConformance(t *testing.T) { }) } - t.Run("Add_NilItem", func(t *testing.T) { - t.Parallel() - q, err := constructor(enqueueTimeComparator) - require.NoError(t, err, "Setup: creating queue for test should not fail") - - currentLen := q.Len() - currentByteSize := q.ByteSize() - err = q.Add(nil) - assert.ErrorIs(t, err, framework.ErrNilQueueItem, "Add(nil) must return ErrNilQueueItem") - assert.Equal(t, currentLen, q.Len(), "The queue's length must not change after a failed Add") - assert.Equal(t, currentByteSize, q.ByteSize(), "The queue's byte size must not change after a failed Add") - }) - t.Run("Remove_InvalidHandle", func(t *testing.T) { t.Parallel() q, err := constructor(enqueueTimeComparator) require.NoError(t, err, "Setup: creating queue for test should not fail") item := typesmocks.NewMockQueueItemAccessor(100, "item", flowKey) - err = q.Add(item) - require.NoError(t, err, "Setup: adding an item should succeed") + q.Add(item) otherQ, err := constructor(enqueueTimeComparator) // A different queue instance require.NoError(t, err, "Setup: creating otherQ should succeed") otherItem := typesmocks.NewMockQueueItemAccessor(10, "other_item", types.FlowKey{ID: "other-flow"}) - err = otherQ.Add(otherItem) - require.NoError(t, err, "Setup: adding item to otherQ should succeed") + otherQ.Add(otherItem) alienHandle := otherItem.Handle() require.NotNil(t, alienHandle, "Setup: alien handle should not be nil") @@ -322,9 +297,9 @@ func TestQueueConformance(t *testing.T) { item3 := typesmocks.NewMockQueueItemAccessor(30, "item3_nonhead", flowKey) item3.EnqueueTimeV = now.Add(-1 * time.Second) - _ = q.Add(item1) - _ = q.Add(item2) - _ = q.Add(item3) + q.Add(item1) + q.Add(item2) + q.Add(item3) require.Equal(t, 3, q.Len(), "Queue should have 3 items before removing non-head") handleNonHead := item2.Handle() @@ -351,8 +326,7 @@ func TestQueueConformance(t *testing.T) { t.Run("Cleanup_EmptyQueue", func(t *testing.T) { t.Parallel() emptyQ, _ := constructor(enqueueTimeComparator) - cleanedItems, err := emptyQ.Cleanup(predicateRemoveOddSizes) - require.NoError(t, err, "Cleanup on an empty queue should not return an error") + cleanedItems := emptyQ.Cleanup(predicateRemoveOddSizes) assert.Empty(t, cleanedItems, "Cleanup on an empty queue should return an empty slice") assert.Zero(t, emptyQ.Len(), "Len() should be 0 after Cleanup on an empty queue") assert.Zero(t, emptyQ.ByteSize(), "ByteSize() should be 0 after Cleanup on an empty queue") @@ -363,13 +337,12 @@ func TestQueueConformance(t *testing.T) { q, _ := constructor(enqueueTimeComparator) itemK1 := typesmocks.NewMockQueueItemAccessor(10, "k1_matchNone", flowKey) itemK2 := typesmocks.NewMockQueueItemAccessor(12, "k2_matchNone", flowKey) - _ = q.Add(itemK1) - _ = q.Add(itemK2) + q.Add(itemK1) + q.Add(itemK2) initialLen := q.Len() initialBs := q.ByteSize() - cleanedItems, err := q.Cleanup(func(item types.QueueItemAccessor) bool { return false }) - require.NoError(t, err, "Cleanup should not return an error") + cleanedItems := q.Cleanup(func(item types.QueueItemAccessor) bool { return false }) assert.Empty(t, cleanedItems, "Cleanup should return an empty slice when no items match the predicate") assert.Equal(t, initialLen, q.Len(), "Len() should not change after Cleanup when no items match thepredicate") assert.Equal(t, initialBs, q.ByteSize(), @@ -383,11 +356,10 @@ func TestQueueConformance(t *testing.T) { q, _ := constructor(enqueueTimeComparator) itemR1 := typesmocks.NewMockQueueItemAccessor(11, "r1_matchAll", flowKey) itemR2 := typesmocks.NewMockQueueItemAccessor(13, "r2_matchAll", flowKey) - _ = q.Add(itemR1) - _ = q.Add(itemR2) + q.Add(itemR1) + q.Add(itemR2) - cleanedItems, err := q.Cleanup(func(item types.QueueItemAccessor) bool { return true }) - require.NoError(t, err, "Cleanup should not return an error") + cleanedItems := q.Cleanup(func(item types.QueueItemAccessor) bool { return true }) assert.Len(t, cleanedItems, 2, "Cleanup should return all items that matched the predicate") assert.Zero(t, q.Len(), "Len() should be 0 after Cleanup") assert.Zero(t, q.ByteSize(), "ByteSize() should be 0 after Cleanup") @@ -402,15 +374,14 @@ func TestQueueConformance(t *testing.T) { iR1 := typesmocks.NewMockQueueItemAccessor(11, "r1_subset", flowKey) iK2 := typesmocks.NewMockQueueItemAccessor(22, "k2_subset", flowKey) iR2 := typesmocks.NewMockQueueItemAccessor(33, "r2_subset", flowKey) - _ = q.Add(iK1) - _ = q.Add(iR1) - _ = q.Add(iK2) - _ = q.Add(iR2) + q.Add(iK1) + q.Add(iR1) + q.Add(iK2) + q.Add(iR2) expectedKeptByteSize := iK1.OriginalRequest().ByteSize() + iK2.OriginalRequest().ByteSize() - cleanedItems, err := q.Cleanup(predicateRemoveOddSizes) - require.NoError(t, err, "Cleanup should not return an error") + cleanedItems := q.Cleanup(predicateRemoveOddSizes) assert.Len(t, cleanedItems, 2, "Cleanup should return 2 items that matched the predicate") assert.Equal(t, 2, q.Len(), "Len() should be 2 after Cleanup") assert.Equal(t, expectedKeptByteSize, q.ByteSize(), "ByteSize() should be sum of kept items after Cleanup") @@ -435,7 +406,7 @@ func TestQueueConformance(t *testing.T) { // Verify remaining items are correct var remainingIDs []string for q.Len() > 0 { - peeked, _ := q.PeekHead() + peeked := q.PeekHead() item, _ := q.Remove(peeked.Handle()) remainingIDs = append(remainingIDs, item.OriginalRequest().ID()) } @@ -452,11 +423,10 @@ func TestQueueConformance(t *testing.T) { itemD1 := typesmocks.NewMockQueueItemAccessor(10, "ditem1", flowKey) itemD2 := typesmocks.NewMockQueueItemAccessor(20, "ditem2", flowKey) - _ = q.Add(itemD1) - _ = q.Add(itemD2) + q.Add(itemD1) + q.Add(itemD2) - drainedItems, err := q.Drain() - require.NoError(t, err, "Drain on a non-empty queue should not fail") + drainedItems := q.Drain() assert.Len(t, drainedItems, 2, "Drain should return all items that were in the queue") assert.Zero(t, q.Len(), "Queue length must be 0 after Drain") assert.Zero(t, q.ByteSize(), "Queue byte size must be 0 after Drain") @@ -482,12 +452,10 @@ func TestQueueConformance(t *testing.T) { q, err := constructor(enqueueTimeComparator) require.NoError(t, err, "Setup: creating queue for empty drain test should not fail") - drainedItems, err := q.Drain() // First drain on empty - require.NoError(t, err, "Drain on an empty queue should not fail") + drainedItems := q.Drain() // First drain on empty assert.Empty(t, drainedItems, "Drain on an empty queue should return an empty slice") - drainedAgain, err := q.Drain() // Second drain on already empty - require.NoError(t, err, "Second drain on an already empty queue should not fail") + drainedAgain := q.Drain() // Second drain on already empty assert.Empty(t, drainedAgain, "Second drain on an already empty queue should return an empty slice") assert.Zero(t, q.Len()) assert.Zero(t, q.ByteSize()) @@ -510,7 +478,7 @@ func TestQueueConformance(t *testing.T) { // Pre-populate the queue with an initial set of items. for i := range initialItems { item := typesmocks.NewMockQueueItemAccessor(1, fmt.Sprintf("%s_conc_init_%d", flowKey, i), flowKey) - err := q.Add(item) + q.Add(item) require.NoError(t, err, "Setup: pre-populating the queue should not fail") handleChan <- item.Handle() } @@ -529,11 +497,9 @@ func TestQueueConformance(t *testing.T) { case 0: // Add item := typesmocks.NewMockQueueItemAccessor(1, fmt.Sprintf("%s_conc_init_%d_%d", flowKey, routineID, j), flowKey) - err := q.Add(item) - if assert.NoError(t, err, "Add must be goroutine-safe") { - successfulAdds.Add(1) - handleChan <- item.Handle() - } + q.Add(item) + successfulAdds.Add(1) + handleChan <- item.Handle() case 1: // Remove select { case handle := <-handleChan: @@ -553,17 +519,16 @@ func TestQueueConformance(t *testing.T) { case 2: // Inspect _ = q.Len() _ = q.ByteSize() - _, err := q.PeekHead() - if q.Len() == 0 { // Only expect ErrQueueEmpty if Len is 0 - assert.ErrorIs(t, err, framework.ErrQueueEmpty, "Peek on empty queue expected ErrQueueEmpty") + peeked := q.PeekHead() + if q.Len() == 0 { + assert.Nil(t, peeked, "PeekHead on empty queue expected nil") } - _, err = q.PeekTail() - if q.Len() == 0 { // Only expect ErrQueueEmpty if Len is 0 - assert.ErrorIs(t, err, framework.ErrQueueEmpty, "PeekTail on empty queue expected ErrQueueEmpty") + peeked = q.PeekTail() + if q.Len() == 0 { + assert.Nil(t, peeked, "PeekTail on empty queue expected nil") } case 3: // Cleanup - _, cleanupErr := q.Cleanup(func(item types.QueueItemAccessor) bool { return false }) - assert.NoError(t, cleanupErr, "Cleanup (no-op) should be goroutine-safe") + q.Cleanup(func(item types.QueueItemAccessor) bool { return false }) } } }(i) @@ -573,8 +538,7 @@ func TestQueueConformance(t *testing.T) { close(handleChan) // Drain the queue to verify all handles are invalidated and to count remaining items accurately. - drainedItems, drainErr := q.Drain() - require.NoError(t, drainErr, "Draining queue at the end of concurrency test should not fail") + drainedItems := q.Drain() for _, item := range drainedItems { require.True(t, item.Handle().IsInvalidated(), "All handles from final drain must be invalidated") diff --git a/pkg/epp/flowcontrol/framework/plugins/queue/listqueue.go b/pkg/epp/flowcontrol/framework/plugins/queue/listqueue.go index 082510e71..0d241cf97 100644 --- a/pkg/epp/flowcontrol/framework/plugins/queue/listqueue.go +++ b/pkg/epp/flowcontrol/framework/plugins/queue/listqueue.go @@ -102,18 +102,13 @@ func newListQueue() *listQueue { // --- `framework.SafeQueue` Interface Implementation --- // Add enqueues an item to the back of the list. -func (lq *listQueue) Add(item types.QueueItemAccessor) error { +func (lq *listQueue) Add(item types.QueueItemAccessor) { lq.mu.Lock() defer lq.mu.Unlock() - if item == nil { - return framework.ErrNilQueueItem - } - element := lq.requests.PushBack(item) lq.byteSize.Add(item.OriginalRequest().ByteSize()) item.SetHandle(&listItemHandle{element: element, owner: lq}) - return nil } // Remove removes an item identified by the given handle from the queue. @@ -142,7 +137,7 @@ func (lq *listQueue) Remove(handle types.QueueItemHandle) (types.QueueItemAccess } // Cleanup removes items from the queue that satisfy the predicate. -func (lq *listQueue) Cleanup(predicate framework.PredicateFunc) (cleanedItems []types.QueueItemAccessor, err error) { +func (lq *listQueue) Cleanup(predicate framework.PredicateFunc) (cleanedItems []types.QueueItemAccessor) { lq.mu.Lock() defer lq.mu.Unlock() @@ -162,11 +157,11 @@ func (lq *listQueue) Cleanup(predicate framework.PredicateFunc) (cleanedItems [] removedItems = append(removedItems, item) } } - return removedItems, nil + return removedItems } // Drain removes all items from the queue and returns them. -func (lq *listQueue) Drain() (removedItems []types.QueueItemAccessor, err error) { +func (lq *listQueue) Drain() (removedItems []types.QueueItemAccessor) { lq.mu.Lock() defer lq.mu.Unlock() @@ -182,7 +177,7 @@ func (lq *listQueue) Drain() (removedItems []types.QueueItemAccessor, err error) lq.requests.Init() lq.byteSize.Store(0) - return removedItems, nil + return removedItems } // Name returns the name of the queue. @@ -208,25 +203,25 @@ func (lq *listQueue) ByteSize() uint64 { } // PeekHead returns the item at the front of the queue without removing it. -func (lq *listQueue) PeekHead() (head types.QueueItemAccessor, err error) { +func (lq *listQueue) PeekHead() types.QueueItemAccessor { lq.mu.RLock() defer lq.mu.RUnlock() if lq.requests.Len() == 0 { - return nil, framework.ErrQueueEmpty + return nil } element := lq.requests.Front() - return element.Value.(types.QueueItemAccessor), nil + return element.Value.(types.QueueItemAccessor) } // PeekTail returns the item at the back of the queue without removing it. -func (lq *listQueue) PeekTail() (tail types.QueueItemAccessor, err error) { +func (lq *listQueue) PeekTail() types.QueueItemAccessor { lq.mu.RLock() defer lq.mu.RUnlock() if lq.requests.Len() == 0 { - return nil, framework.ErrQueueEmpty + return nil } element := lq.requests.Back() - return element.Value.(types.QueueItemAccessor), nil + return element.Value.(types.QueueItemAccessor) } diff --git a/pkg/epp/flowcontrol/framework/plugins/queue/maxminheap.go b/pkg/epp/flowcontrol/framework/plugins/queue/maxminheap.go index 41a15fa73..4a3bc8773 100644 --- a/pkg/epp/flowcontrol/framework/plugins/queue/maxminheap.go +++ b/pkg/epp/flowcontrol/framework/plugins/queue/maxminheap.go @@ -115,56 +115,51 @@ func (h *maxMinHeap) ByteSize() uint64 { // PeekHead returns the item with the highest priority (max value) without removing it. // Time complexity: O(1). -func (h *maxMinHeap) PeekHead() (types.QueueItemAccessor, error) { +func (h *maxMinHeap) PeekHead() types.QueueItemAccessor { h.mu.RLock() defer h.mu.RUnlock() if len(h.items) == 0 { - return nil, framework.ErrQueueEmpty + return nil } // The root of the max-min heap is always the maximum element. - return h.items[0], nil + return h.items[0] } // PeekTail returns the item with the lowest priority (min value) without removing it. // Time complexity: O(1). -func (h *maxMinHeap) PeekTail() (types.QueueItemAccessor, error) { +func (h *maxMinHeap) PeekTail() types.QueueItemAccessor { h.mu.RLock() defer h.mu.RUnlock() n := len(h.items) if n == 0 { - return nil, framework.ErrQueueEmpty + return nil } if n == 1 { - return h.items[0], nil + return h.items[0] } if n == 2 { // With two items, the root is max, the second is min. - return h.items[1], nil + return h.items[1] } // With three or more items, the minimum element is guaranteed to be one of the two children of the root (at indices 1 // and 2). We must compare them to find the true minimum. if h.comparator.Func()(h.items[1], h.items[2]) { - return h.items[2], nil + return h.items[2] } - return h.items[1], nil + return h.items[1] } // Add adds an item to the queue. // Time complexity: O(log n). -func (h *maxMinHeap) Add(item types.QueueItemAccessor) error { +func (h *maxMinHeap) Add(item types.QueueItemAccessor) { h.mu.Lock() defer h.mu.Unlock() - if item == nil { - return framework.ErrNilQueueItem - } - h.push(item) h.byteSize.Add(item.OriginalRequest().ByteSize()) - return nil } // push adds an item to the heap and restores the heap property. @@ -425,7 +420,7 @@ func isMinLevel(i int) bool { } // Cleanup removes items from the queue that satisfy the predicate. -func (h *maxMinHeap) Cleanup(predicate framework.PredicateFunc) ([]types.QueueItemAccessor, error) { +func (h *maxMinHeap) Cleanup(predicate framework.PredicateFunc) []types.QueueItemAccessor { h.mu.Lock() defer h.mu.Unlock() @@ -459,11 +454,11 @@ func (h *maxMinHeap) Cleanup(predicate framework.PredicateFunc) ([]types.QueueIt } } - return removedItems, nil + return removedItems } // Drain removes all items from the queue. -func (h *maxMinHeap) Drain() ([]types.QueueItemAccessor, error) { +func (h *maxMinHeap) Drain() []types.QueueItemAccessor { h.mu.Lock() defer h.mu.Unlock() @@ -482,5 +477,5 @@ func (h *maxMinHeap) Drain() ([]types.QueueItemAccessor, error) { h.handles = make(map[types.QueueItemHandle]*heapItem) h.byteSize.Store(0) - return drainedItems, nil + return drainedItems } diff --git a/pkg/epp/flowcontrol/framework/plugins/queue/maxminheap_test.go b/pkg/epp/flowcontrol/framework/plugins/queue/maxminheap_test.go index b4e22ab74..ee6fab708 100644 --- a/pkg/epp/flowcontrol/framework/plugins/queue/maxminheap_test.go +++ b/pkg/epp/flowcontrol/framework/plugins/queue/maxminheap_test.go @@ -39,8 +39,7 @@ func TestMaxMinHeap_InternalProperty(t *testing.T) { // Add items in a somewhat random order of enqueue times items[i] = typesmocks.NewMockQueueItemAccessor(10, "item", types.FlowKey{ID: "flow"}) items[i].EnqueueTimeV = now.Add(time.Duration((i%5-2)*10) * time.Second) - err := q.Add(items[i]) - require.NoError(t, err, "Add should not fail") + q.Add(items[i]) assertHeapProperty(t, q, "after adding item %d", i) } @@ -54,9 +53,9 @@ func TestMaxMinHeap_InternalProperty(t *testing.T) { // Remove remaining items from the head and validate each time for q.Len() > 0 { - head, err := q.PeekHead() - require.NoError(t, err) - _, err = q.Remove(head.Handle()) + head := q.PeekHead() + require.NotNil(t, head) + _, err := q.Remove(head.Handle()) require.NoError(t, err) assertHeapProperty(t, q, "after removing head item") } diff --git a/pkg/epp/flowcontrol/framework/queue.go b/pkg/epp/flowcontrol/framework/queue.go index d00cccfaf..3f33df75c 100644 --- a/pkg/epp/flowcontrol/framework/queue.go +++ b/pkg/epp/flowcontrol/framework/queue.go @@ -20,30 +20,23 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types" ) -// QueueCapability defines a functional capability that a `SafeQueue` implementation can provide. -// These capabilities allow policies (like `IntraFlowDispatchPolicy`) to declare their operational requirements, -// ensuring that a policy is always paired with a compatible queue. -// -// For example, a policy that requires a priority-ordered queue would declare `CapabilityPriorityConfigurable`, and the -// `contracts.FlowRegistry` would ensure it is paired with a queue implementation (like a heap) that provides this -// capability. -// -// While a simpler boolean method (e.g., `IsPriorityConfigurable()`) could satisfy current needs, this slice-based -// approach is intentionally chosen for future extensibility. It allows a queue to advertise multiple features (e.g., -// dynamic priority reshuffling, size bounds, etc.) without future breaking changes to the `SafeQueue` interface. +// QueueCapability defines a functional capability that a SafeQueue implementation can provide. +// These capabilities allow policies to declare their operational requirements, ensuring that a policy is always paired +// with a compatible queue. type QueueCapability string const ( // CapabilityFIFO indicates that the queue operates in a First-In, First-Out manner. - // `PeekHead()` will return the oldest item (by logical enqueue time). + // PeekHead() will return the oldest item (by logical enqueue time). CapabilityFIFO QueueCapability = "FIFO" - // CapabilityPriorityConfigurable indicates that the queue's ordering is determined by an `ItemComparator`. - // `PeekHead()` will return the highest priority item according to this comparator. + // CapabilityPriorityConfigurable indicates that the queue's ordering is determined by an ItemComparator. + // PeekHead() will return the highest priority item, and PeekTail() will return the lowest priority item according to + // this comparator. CapabilityPriorityConfigurable QueueCapability = "PriorityConfigurable" ) -// QueueInspectionMethods defines `SafeQueue`'s read-only methods. +// QueueInspectionMethods defines SafeQueue's read-only methods. type QueueInspectionMethods interface { // Name returns a string identifier for the concrete queue implementation type (e.g., "ListQueue"). Name() string @@ -59,48 +52,44 @@ type QueueInspectionMethods interface { // PeekHead returns the item at the "head" of the queue (the item with the highest priority according to the queue's // ordering) without removing it. - // - // Returns `ErrQueueEmpty` if the queue is empty. - PeekHead() (peekedItem types.QueueItemAccessor, err error) + // Returns nil if the queue is empty. + PeekHead() types.QueueItemAccessor // PeekTail returns the item at the "tail" of the queue (the item with the lowest priority according to the queue's // ordering) without removing it. - // - // Returns `ErrQueueEmpty` if the queue is empty. - PeekTail() (peekedItem types.QueueItemAccessor, err error) + // Returns nil if the queue is empty. + PeekTail() types.QueueItemAccessor } // PredicateFunc defines a function that returns true if a given item matches a certain condition. -// It is used by `SafeQueue.Cleanup` to filter items. +// It is used by SafeQueue.Cleanup to filter items. type PredicateFunc func(item types.QueueItemAccessor) bool // SafeQueue defines the contract for a single, concurrent-safe queue implementation. -// +// This interface is designed for in-memory, synchronous flow control. Implementations are expected to be unbounded; +// capacity management occurs outside the queue implementation. // All implementations MUST be goroutine-safe. type SafeQueue interface { QueueInspectionMethods - // Add attempts to enqueue an item. On success, it must associate a new, unique `types.QueueItemHandle` with the item - // by calling `item.SetHandle()`. - Add(item types.QueueItemAccessor) error + // Add enqueues an item. It must associate a new, unique types.QueueItemHandle with the item by calling + // item.SetHandle(). + // Contract: The caller MUST NOT provide a nil item. + Add(item types.QueueItemAccessor) // Remove atomically finds and removes the item identified by the given handle. - // - // On success, implementations MUST invalidate the provided handle by calling `handle.Invalidate()`. - // + // On success, implementations MUST invalidate the provided handle by calling handle.Invalidate(). // Returns the removed item. - // Returns `ErrInvalidQueueItemHandle` if the handle is invalid (e.g., nil, wrong type, already invalidated). - // Returns `ErrQueueItemNotFound` if the handle is valid but the item is not in the queue. + // Returns ErrInvalidQueueItemHandle if the handle is invalid. + // Returns ErrQueueItemNotFound if the handle is valid but the item is not in the queue. Remove(handle types.QueueItemHandle) (removedItem types.QueueItemAccessor, err error) // Cleanup iterates through the queue and atomically removes all items for which the predicate returns true, returning // them in a slice. - // // The handle for each removed item MUST be invalidated. - Cleanup(predicate PredicateFunc) (cleanedItems []types.QueueItemAccessor, err error) + Cleanup(predicate PredicateFunc) (cleanedItems []types.QueueItemAccessor) // Drain atomically removes all items from the queue and returns them in a slice. - // // The handle for all removed items MUST be invalidated. The queue MUST be empty after this operation. - Drain() (drainedItems []types.QueueItemAccessor, err error) + Drain() (drainedItems []types.QueueItemAccessor) } diff --git a/pkg/epp/flowcontrol/registry/managedqueue.go b/pkg/epp/flowcontrol/registry/managedqueue.go index e56cdd821..b141dff39 100644 --- a/pkg/epp/flowcontrol/registry/managedqueue.go +++ b/pkg/epp/flowcontrol/registry/managedqueue.go @@ -100,7 +100,7 @@ func newManagedQueue( "flowKey", key, "queueType", queue.Name(), ) - mq := &managedQueue{ + return &managedQueue{ queue: queue, dispatchPolicy: dispatchPolicy, key: key, @@ -108,37 +108,30 @@ func newManagedQueue( logger: mqLogger, isDraining: isDraining, } - return mq } // FlowQueueAccessor returns a read-only, flow-aware view of this queue. -// This accessor is primarily used by policy plugins to inspect the queue's state in a structured way. func (mq *managedQueue) FlowQueueAccessor() framework.FlowQueueAccessor { return &flowQueueAccessor{mq: mq} } -// Add wraps the underlying `framework.SafeQueue.Add` call and atomically updates the queue's and the parent shard's -// statistics. +// Add performs an atomic check on the parent shard's lifecycle state before adding the item to the underlying queue. +// This is the critical enforcement point that prevents new requests from entering a draining shard. func (mq *managedQueue) Add(item types.QueueItemAccessor) error { - // Enforce the system's routing contract by rejecting new work for a Draining shard. - // This prevents a race where a caller could route a request to a shard just as it begins to drain. - if mq.isDraining() { - return contracts.ErrShardDraining - } - mq.mu.Lock() defer mq.mu.Unlock() - if err := mq.queue.Add(item); err != nil { - return err + if mq.isDraining() { + return contracts.ErrShardDraining } + mq.queue.Add(item) + mq.propagateStatsDeltaLocked(1, int64(item.OriginalRequest().ByteSize())) mq.logger.V(logging.TRACE).Info("Request added to queue", "requestID", item.OriginalRequest().ID()) return nil } -// Remove wraps the underlying `framework.SafeQueue.Remove` call and atomically updates statistics upon successful -// removal. +// Remove wraps the underlying framework.SafeQueue.Remove and updates statistics. func (mq *managedQueue) Remove(handle types.QueueItemHandle) (types.QueueItemAccessor, error) { mq.mu.Lock() defer mq.mu.Unlock() @@ -152,52 +145,43 @@ func (mq *managedQueue) Remove(handle types.QueueItemHandle) (types.QueueItemAcc return removedItem, nil } -// Cleanup wraps the underlying `framework.SafeQueue.Cleanup` call and atomically updates statistics for all removed -// items. -func (mq *managedQueue) Cleanup(predicate framework.PredicateFunc) (cleanedItems []types.QueueItemAccessor, err error) { +// Cleanup wraps the underlying framework.SafeQueue.Cleanup and updates statistics. +func (mq *managedQueue) Cleanup(predicate framework.PredicateFunc) []types.QueueItemAccessor { mq.mu.Lock() defer mq.mu.Unlock() - cleanedItems, err = mq.queue.Cleanup(predicate) - if err != nil { - return nil, err - } + cleanedItems := mq.queue.Cleanup(predicate) if len(cleanedItems) == 0 { - return cleanedItems, nil + return nil } mq.propagateStatsDeltaForRemovedItemsLocked(cleanedItems) mq.logger.V(logging.DEBUG).Info("Cleaned up queue", "removedItemCount", len(cleanedItems)) - return cleanedItems, nil + return cleanedItems } -// Drain wraps the underlying `framework.SafeQueue.Drain` call and atomically updates statistics for all removed items. -func (mq *managedQueue) Drain() ([]types.QueueItemAccessor, error) { +// Drain wraps the underlying framework.SafeQueue.Drain and updates statistics. +func (mq *managedQueue) Drain() []types.QueueItemAccessor { mq.mu.Lock() defer mq.mu.Unlock() - drainedItems, err := mq.queue.Drain() - if err != nil { - return nil, err - } + drainedItems := mq.queue.Drain() if len(drainedItems) == 0 { - return drainedItems, nil + return nil } mq.propagateStatsDeltaForRemovedItemsLocked(drainedItems) mq.logger.V(logging.DEBUG).Info("Drained queue", "itemCount", len(drainedItems)) - return drainedItems, nil + return drainedItems } -// --- Pass-through and Accessor Methods --- - -func (mq *managedQueue) Name() string { return mq.queue.Name() } -func (mq *managedQueue) Capabilities() []framework.QueueCapability { return mq.queue.Capabilities() } -func (mq *managedQueue) Len() int { return int(mq.len.Load()) } -func (mq *managedQueue) ByteSize() uint64 { return uint64(mq.byteSize.Load()) } -func (mq *managedQueue) PeekHead() (types.QueueItemAccessor, error) { return mq.queue.PeekHead() } -func (mq *managedQueue) PeekTail() (types.QueueItemAccessor, error) { return mq.queue.PeekTail() } -func (mq *managedQueue) Comparator() framework.ItemComparator { return mq.dispatchPolicy.Comparator() } +// Len returns the current number of items in the queue. +func (mq *managedQueue) Len() int { + return int(mq.len.Load()) +} -// --- Internal Methods --- +// ByteSize returns the current total byte size of all items in the queue. +func (mq *managedQueue) ByteSize() uint64 { + return uint64(mq.byteSize.Load()) +} // propagateStatsDeltaLocked updates the queue's statistics and propagates the delta to the parent shard. // It must be called while holding the `managedQueue.mu` lock. @@ -243,11 +227,18 @@ type flowQueueAccessor struct { var _ framework.FlowQueueAccessor = &flowQueueAccessor{} -func (a *flowQueueAccessor) Name() string { return a.mq.Name() } -func (a *flowQueueAccessor) Capabilities() []framework.QueueCapability { return a.mq.Capabilities() } -func (a *flowQueueAccessor) Len() int { return a.mq.Len() } -func (a *flowQueueAccessor) ByteSize() uint64 { return a.mq.ByteSize() } -func (a *flowQueueAccessor) PeekHead() (types.QueueItemAccessor, error) { return a.mq.PeekHead() } -func (a *flowQueueAccessor) PeekTail() (types.QueueItemAccessor, error) { return a.mq.PeekTail() } -func (a *flowQueueAccessor) Comparator() framework.ItemComparator { return a.mq.Comparator() } -func (a *flowQueueAccessor) FlowKey() types.FlowKey { return a.mq.key } +// --- Read-only pass-through methods to the underlying SafeQueue --- +func (a *flowQueueAccessor) Name() string { return a.mq.queue.Name() } +func (a *flowQueueAccessor) Capabilities() []framework.QueueCapability { + return a.mq.queue.Capabilities() +} +func (a *flowQueueAccessor) PeekHead() types.QueueItemAccessor { return a.mq.queue.PeekHead() } +func (a *flowQueueAccessor) PeekTail() types.QueueItemAccessor { return a.mq.queue.PeekTail() } + +// --- Read-only methods from the managedQueue wrapper --- +func (a *flowQueueAccessor) Len() int { return a.mq.Len() } +func (a *flowQueueAccessor) ByteSize() uint64 { return a.mq.ByteSize() } +func (a *flowQueueAccessor) Comparator() framework.ItemComparator { + return a.mq.dispatchPolicy.Comparator() +} +func (a *flowQueueAccessor) FlowKey() types.FlowKey { return a.mq.key } diff --git a/pkg/epp/flowcontrol/registry/managedqueue_test.go b/pkg/epp/flowcontrol/registry/managedqueue_test.go index b4d32fba4..1e0afaca7 100644 --- a/pkg/epp/flowcontrol/registry/managedqueue_test.go +++ b/pkg/epp/flowcontrol/registry/managedqueue_test.go @@ -132,23 +132,13 @@ func TestManagedQueue_Add(t *testing.T) { { name: "ShouldSucceed_AndIncrementStats", setupMock: func(q *frameworkmocks.MockSafeQueue) { - q.AddFunc = func(types.QueueItemAccessor) error { return nil } + q.AddFunc = func(types.QueueItemAccessor) {} }, isDraining: false, expectErr: false, expectedLenDelta: 1, expectedByteSizeDelta: 100, }, - { - name: "ShouldFail_AndNotChangeStats_WhenUnderlyingQueueFails", - setupMock: func(q *frameworkmocks.MockSafeQueue) { - q.AddFunc = func(types.QueueItemAccessor) error { return errors.New("add failed") } - }, - isDraining: false, - expectErr: true, - expectedLenDelta: 0, - expectedByteSizeDelta: 0, - }, { name: "ShouldFail_AndNotChangeStats_WhenQueueIsDraining", setupMock: func(q *frameworkmocks.MockSafeQueue) {}, @@ -255,40 +245,26 @@ func TestManagedQueue_Cleanup(t *testing.T) { testCases := []struct { name string setupMock func(q *frameworkmocks.MockSafeQueue, items []types.QueueItemAccessor) - expectErr bool expectedLenDelta int64 expectedByteSizeDelta int64 }{ { name: "ShouldSucceed_AndDecrementStats_WhenItemsRemoved", setupMock: func(q *frameworkmocks.MockSafeQueue, items []types.QueueItemAccessor) { - q.CleanupFunc = func(_ framework.PredicateFunc) ([]types.QueueItemAccessor, error) { - return items, nil + q.CleanupFunc = func(_ framework.PredicateFunc) []types.QueueItemAccessor { + return items } }, - expectErr: false, expectedLenDelta: -2, expectedByteSizeDelta: -125, }, { name: "ShouldSucceed_AndNotChangeStats_WhenNoItemsRemoved", setupMock: func(q *frameworkmocks.MockSafeQueue, items []types.QueueItemAccessor) { - q.CleanupFunc = func(_ framework.PredicateFunc) ([]types.QueueItemAccessor, error) { - return nil, nil // Simulate no items matching predicate. + q.CleanupFunc = func(_ framework.PredicateFunc) []types.QueueItemAccessor { + return nil // Simulate no items matching predicate. } }, - expectErr: false, - expectedLenDelta: 0, - expectedByteSizeDelta: 0, - }, - { - name: "ShouldFail_AndNotChangeStats_WhenUnderlyingQueueFails", - setupMock: func(q *frameworkmocks.MockSafeQueue, items []types.QueueItemAccessor) { - q.CleanupFunc = func(_ framework.PredicateFunc) ([]types.QueueItemAccessor, error) { - return nil, errors.New("cleanup failed") - } - }, - expectErr: true, expectedLenDelta: 0, expectedByteSizeDelta: 0, }, @@ -305,14 +281,7 @@ func TestManagedQueue_Cleanup(t *testing.T) { } h.setupWithItems(items...) tc.setupMock(q, items) - - _, err := h.mq.Cleanup(func(_ types.QueueItemAccessor) bool { return true }) - - if tc.expectErr { - require.Error(t, err, "Cleanup operation must fail if the underlying queue implementation encounters an error") - } else { - require.NoError(t, err, "Cleanup operation must succeed if the underlying queue implementation succeeds") - } + h.mq.Cleanup(func(_ types.QueueItemAccessor) bool { return true }) assert.Equal(t, tc.expectedLenDelta, h.propagator.lenDelta.Load(), "The propagated length delta must exactly match the total number of items removed during cleanup") assert.Equal(t, tc.expectedByteSizeDelta, h.propagator.byteSizeDelta.Load(), @@ -328,32 +297,19 @@ func TestManagedQueue_Drain(t *testing.T) { testCases := []struct { name string setupMock func(q *frameworkmocks.MockSafeQueue, items []types.QueueItemAccessor) - expectErr bool expectedLenDelta int64 expectedByteSizeDelta int64 }{ { name: "ShouldSucceed_AndDecrementStats", setupMock: func(q *frameworkmocks.MockSafeQueue, items []types.QueueItemAccessor) { - q.DrainFunc = func() ([]types.QueueItemAccessor, error) { - return items, nil + q.DrainFunc = func() []types.QueueItemAccessor { + return items } }, - expectErr: false, expectedLenDelta: -2, expectedByteSizeDelta: -125, }, - { - name: "ShouldFail_AndNotChangeStats_WhenUnderlyingQueueFails", - setupMock: func(q *frameworkmocks.MockSafeQueue, items []types.QueueItemAccessor) { - q.DrainFunc = func() ([]types.QueueItemAccessor, error) { - return nil, errors.New("drain failed") - } - }, - expectErr: true, - expectedLenDelta: 0, - expectedByteSizeDelta: 0, - }, } for _, tc := range testCases { @@ -368,13 +324,7 @@ func TestManagedQueue_Drain(t *testing.T) { h.setupWithItems(items...) tc.setupMock(q, items) - _, err := h.mq.Drain() - - if tc.expectErr { - require.Error(t, err, "Drain operation must fail if the underlying queue implementation encounters an error") - } else { - require.NoError(t, err, "Drain operation must succeed if the underlying queue implementation succeeds") - } + h.mq.Drain() assert.Equal(t, tc.expectedLenDelta, h.propagator.lenDelta.Load(), "The propagated length delta must exactly match the total number of items drained") assert.Equal(t, tc.expectedByteSizeDelta, h.propagator.byteSizeDelta.Load(), @@ -401,8 +351,8 @@ func TestManagedQueue_FlowQueueAccessor(t *testing.T) { accessor := harness.mq.FlowQueueAccessor() require.NotNil(t, accessor, "FlowQueueAccessor must return a non-nil instance (guaranteed by contract)") - assert.Equal(t, harness.mq.Name(), accessor.Name(), "Accessor Name() must proxy the underlying queue's name") - assert.Equal(t, harness.mq.Capabilities(), accessor.Capabilities(), + assert.Equal(t, harness.mq.queue.Name(), accessor.Name(), "Accessor Name() must proxy the underlying queue's name") + assert.Equal(t, harness.mq.queue.Capabilities(), accessor.Capabilities(), "Accessor Capabilities() must proxy the underlying queue's capabilities") assert.Equal(t, harness.mq.Len(), accessor.Len(), "Accessor Len() must reflect the managed queue's current length") assert.Equal(t, harness.mq.ByteSize(), accessor.ByteSize(), @@ -410,15 +360,11 @@ func TestManagedQueue_FlowQueueAccessor(t *testing.T) { assert.Equal(t, flowKey, accessor.FlowKey(), "Accessor FlowKey() must return the correct identifier for the flow") assert.Equal(t, harness.mockPolicy.Comparator(), accessor.Comparator(), "Accessor Comparator() must return the comparator provided by the configured intra-flow policy") - assert.Equal(t, harness.mockPolicy.Comparator(), harness.mq.Comparator(), - "ManagedQueue Comparator() must also return the comparator provided by the configured intra-flow policy") - peekedHead, err := accessor.PeekHead() - require.NoError(t, err, "Accessor PeekHead() must succeed when the underlying queue succeeds") + peekedHead := accessor.PeekHead() assert.Same(t, item, peekedHead, "Accessor PeekHead() must return the exact item instance at the head") - peekedTail, err := accessor.PeekTail() - require.NoError(t, err, "Accessor PeekTail() must succeed when the underlying queue succeeds") + peekedTail := accessor.PeekTail() assert.Same(t, item, peekedTail, "Accessor PeekTail() must return the exact item instance at the tail") }) @@ -426,14 +372,10 @@ func TestManagedQueue_FlowQueueAccessor(t *testing.T) { t.Parallel() flowKey := types.FlowKey{ID: "flow", Priority: 1} q := &frameworkmocks.MockSafeQueue{} - expectedErr := errors.New("queue is empty") - q.PeekHeadErrV = expectedErr + q.PeekHeadV = nil harness := newMockedMqHarness(t, q, flowKey) accessor := harness.mq.FlowQueueAccessor() - - _, err := accessor.PeekHead() - require.Error(t, err, "Accessor PeekHead() should return an error on an empty queue") - assert.ErrorIs(t, err, expectedErr, "Accessor should proxy the specific error from the underlying queue") + assert.Nil(t, accessor.PeekHead(), "Accessor PeekHead() should return an nil on an empty queue") }) } @@ -470,9 +412,7 @@ func TestManagedQueue_Concurrency_StatsIntegrity(t *testing.T) { // After all operations, the queue should ideally be empty, but we drain any remaining items to get a definitive final // state. - _, err := h.mq.Drain() - require.NoError(t, err, "Final drain operation must succeed to finalize the test state") - + h.mq.Drain() assert.Zero(t, h.mq.Len(), "Final queue length must be zero after draining all remaining items") assert.Zero(t, h.mq.ByteSize(), "Final queue byte size must be zero after draining all remaining items") assert.Equal(t, int64(0), h.propagator.lenDelta.Load(), @@ -488,7 +428,7 @@ func TestManagedQueue_InvariantPanics_OnUnderflow(t *testing.T) { flowKey := types.FlowKey{ID: "flow", Priority: 1} item := typesmocks.NewMockQueueItemAccessor(100, "req", flowKey) q := &frameworkmocks.MockSafeQueue{} - q.AddFunc = func(types.QueueItemAccessor) error { return nil } + q.AddFunc = func(types.QueueItemAccessor) {} q.RemoveFunc = func(types.QueueItemHandle) (types.QueueItemAccessor, error) { return item, nil }