Skip to content

Commit 3e930cb

Browse files
authored
refactor: Flatten Flow Control queue plugin directory structure (#1824)
* refactor: Flatten plugins/queue package structure Moves queue implementations (ListQueue, MaxMinHeap) directly into the `plugins/queue` package, removing the unnecessary nested directories. This simplifies the import paths and project structure. This is a no-op refactoring. * chore: Apply some modernization best practices Changes generated via: ``` go run golang.org/x/tools/gopls/internal/analysis/modernize/cmd/modernize@latest \ -fix \ -test ./pkg/epp/flowcontrol/framework/plugins/queue/... ```
1 parent ae8adbc commit 3e930cb

File tree

8 files changed

+18
-36
lines changed

8 files changed

+18
-36
lines changed

pkg/epp/flowcontrol/framework/plugins/queue/benchmark_test.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,14 @@ See the License for the specific language governing permissions and
1414
limitations under the License.
1515
*/
1616

17-
package queue_test
17+
package queue
1818

1919
import (
2020
"fmt"
2121
"sync"
2222
"testing"
2323

2424
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework"
25-
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/queue"
2625
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
2726
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types/mocks"
2827
)
@@ -31,7 +30,7 @@ var benchmarkFlowKey = types.FlowKey{ID: "benchmark-flow"}
3130

3231
// BenchmarkQueues runs a series of benchmarks against all registered queue implementations.
3332
func BenchmarkQueues(b *testing.B) {
34-
for queueName, constructor := range queue.RegisteredQueues {
33+
for queueName, constructor := range RegisteredQueues {
3534
b.Run(string(queueName), func(b *testing.B) {
3635
// All queue implementations must support the default enqueue time comparator.
3736
q, err := constructor(enqueueTimeComparator)
@@ -94,7 +93,7 @@ func benchmarkAddPeekRemove(b *testing.B, q framework.SafeQueue) {
9493

9594
b.ReportAllocs()
9695

97-
for i := 0; i < b.N; i++ {
96+
for b.Loop() {
9897
item := mocks.NewMockQueueItemAccessor(1, "item", benchmarkFlowKey)
9998
err := q.Add(item)
10099
if err != nil {
@@ -120,7 +119,7 @@ func benchmarkAddPeekRemove(b *testing.B, q framework.SafeQueue) {
120119
func benchmarkBulkAddThenBulkRemove(b *testing.B, q framework.SafeQueue) {
121120
b.ReportAllocs()
122121

123-
for i := 0; i < b.N; i++ {
122+
for i := 0; b.Loop(); i++ {
124123
// Add a batch of items
125124
items := make([]types.QueueItemAccessor, 100)
126125
for j := range items {
@@ -155,7 +154,7 @@ func benchmarkAddPeekTailRemove(b *testing.B, q framework.SafeQueue) {
155154

156155
b.ReportAllocs()
157156

158-
for i := 0; i < b.N; i++ {
157+
for b.Loop() {
159158
item := mocks.NewMockQueueItemAccessor(1, "item", benchmarkFlowKey)
160159
err := q.Add(item)
161160
if err != nil {

pkg/epp/flowcontrol/framework/plugins/queue/functional_test.go

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
1414
limitations under the License.
1515
*/
1616

17-
package queue_test
17+
package queue
1818

1919
import (
2020
"fmt"
@@ -30,9 +30,6 @@ import (
3030

3131
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework"
3232
frameworkmocks "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/mocks"
33-
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/queue"
34-
_ "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/queue/listqueue"
35-
_ "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/queue/maxminheap"
3633
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
3734
typesmocks "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types/mocks"
3835
)
@@ -181,7 +178,7 @@ func testLifecycleAndOrdering(
181178
func TestQueueConformance(t *testing.T) {
182179
t.Parallel()
183180

184-
for queueName, constructor := range queue.RegisteredQueues {
181+
for queueName, constructor := range RegisteredQueues {
185182
t.Run(string(queueName), func(t *testing.T) {
186183
t.Parallel()
187184
flowKey := types.FlowKey{ID: "test-flow-1", Priority: 0}
@@ -511,7 +508,7 @@ func TestQueueConformance(t *testing.T) {
511508
handleChan := make(chan types.QueueItemHandle, initialItems+(numGoroutines*opsPerGoroutine))
512509

513510
// Pre-populate the queue with an initial set of items.
514-
for i := 0; i < initialItems; i++ {
511+
for i := range initialItems {
515512
item := typesmocks.NewMockQueueItemAccessor(1, fmt.Sprintf("%s_conc_init_%d", flowKey, i), flowKey)
516513
err := q.Add(item)
517514
require.NoError(t, err, "Setup: pre-populating the queue should not fail")
@@ -526,7 +523,7 @@ func TestQueueConformance(t *testing.T) {
526523
for i := range numGoroutines {
527524
go func(routineID int) {
528525
defer wg.Done()
529-
for j := 0; j < opsPerGoroutine; j++ {
526+
for j := range opsPerGoroutine {
530527
opType := (j + routineID) % 4 // Vary operations more across goroutines
531528
switch opType {
532529
case 0: // Add

pkg/epp/flowcontrol/framework/plugins/queue/listqueue/listqueue.go renamed to pkg/epp/flowcontrol/framework/plugins/queue/listqueue.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,14 @@ limitations under the License.
1616

1717
// Package listqueue provides a high-performance, concurrent-safe FIFO (First-In, First-Out) implementation of
1818
// implementation of the `framework.SafeQueue` based on the standard library's `container/list`.
19-
package listqueue
19+
package queue
2020

2121
import (
2222
"container/list"
2323
"sync"
2424
"sync/atomic"
2525

2626
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework"
27-
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/queue"
2827
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
2928
)
3029

@@ -53,7 +52,7 @@ import (
5352
const ListQueueName = "ListQueue"
5453

5554
func init() {
56-
queue.MustRegisterQueue(queue.RegisteredQueueName(ListQueueName),
55+
MustRegisterQueue(RegisteredQueueName(ListQueueName),
5756
func(_ framework.ItemComparator) (framework.SafeQueue, error) {
5857
// The list queue is a simple FIFO queue and does not use a comparator.
5958
return newListQueue(), nil

pkg/epp/flowcontrol/framework/plugins/queue/maxminheap/maxminheap.go renamed to pkg/epp/flowcontrol/framework/plugins/queue/maxminheap.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,23 +23,22 @@ limitations under the License.
2323
//
2424
// The core heap maintenance logic (up, down, and grandchild finding) is adapted from the public domain implementation
2525
// at https://github.com/esote/minmaxheap, which is licensed under CC0-1.0.
26-
package maxminheap
26+
package queue
2727

2828
import (
2929
"math"
3030
"sync"
3131
"sync/atomic"
3232

3333
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework"
34-
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/queue"
3534
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
3635
)
3736

3837
// MaxMinHeapName is the name of the max-min heap queue implementation.
3938
const MaxMinHeapName = "MaxMinHeap"
4039

4140
func init() {
42-
queue.MustRegisterQueue(queue.RegisteredQueueName(MaxMinHeapName),
41+
MustRegisterQueue(RegisteredQueueName(MaxMinHeapName),
4342
func(comparator framework.ItemComparator) (framework.SafeQueue, error) {
4443
return newMaxMinHeap(comparator), nil
4544
})

pkg/epp/flowcontrol/framework/plugins/queue/maxminheap/maxminheap_test.go renamed to pkg/epp/flowcontrol/framework/plugins/queue/maxminheap_test.go

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
1414
limitations under the License.
1515
*/
1616

17-
package maxminheap
17+
package queue
1818

1919
import (
2020
"math"
@@ -23,19 +23,10 @@ import (
2323

2424
"github.com/stretchr/testify/require"
2525

26-
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/mocks"
2726
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
2827
typesmocks "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types/mocks"
2928
)
3029

31-
// enqueueTimeComparator orders items by their enqueue time (earlier first).
32-
var enqueueTimeComparator = &mocks.MockItemComparator{
33-
ScoreTypeV: "enqueue_time_ns_asc",
34-
FuncV: func(a, b types.QueueItemAccessor) bool {
35-
return a.EnqueueTime().After(b.EnqueueTime())
36-
},
37-
}
38-
3930
// TestMaxMinHeap_InternalProperty validates that the max-min heap property is maintained after a series of `Add` and
4031
// `Remove` operations. This is a white-box test to ensure the internal data structure is always in a valid state.
4132
func TestMaxMinHeap_InternalProperty(t *testing.T) {

pkg/epp/flowcontrol/registry/config.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ import (
2828
intra "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch"
2929
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/fcfs"
3030
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/queue"
31-
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/queue/listqueue"
3231
)
3332

3433
// =============================================================================
@@ -44,7 +43,7 @@ const (
4443
// defaultInterFlowDispatchPolicy is the default policy for selecting which flow's queue to service next.
4544
defaultInterFlowDispatchPolicy inter.RegisteredPolicyName = besthead.BestHeadPolicyName
4645
// defaultQueue is the default queue implementation for flows.
47-
defaultQueue queue.RegisteredQueueName = listqueue.ListQueueName
46+
defaultQueue queue.RegisteredQueueName = queue.ListQueueName
4847
// defaultInitialShardCount is the default number of parallel shards to create when the registry is initialized.
4948
defaultInitialShardCount int = 1
5049
// defaultFlowGCTimeout is the default duration of inactivity after which an idle flow is garbage collected.

pkg/epp/flowcontrol/registry/config_test.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ import (
3232
intra "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch"
3333
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/fcfs"
3434
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/queue"
35-
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/queue/listqueue"
3635
)
3736

3837
func TestConfig_ValidateAndApplyDefaults(t *testing.T) {
@@ -84,7 +83,7 @@ func TestConfig_ValidateAndApplyDefaults(t *testing.T) {
8483
PriorityName: "Critical",
8584
IntraFlowDispatchPolicy: fcfs.FCFSPolicyName,
8685
InterFlowDispatchPolicy: besthead.BestHeadPolicyName,
87-
Queue: listqueue.ListQueueName,
86+
Queue: queue.ListQueueName,
8887
MaxBytes: 500,
8988
}},
9089
},
@@ -195,7 +194,7 @@ func TestConfig_ValidateAndApplyDefaults(t *testing.T) {
195194
Priority: 1,
196195
PriorityName: "High",
197196
IntraFlowDispatchPolicy: intra.RegisteredPolicyName("policy-with-req"),
198-
Queue: listqueue.ListQueueName,
197+
Queue: queue.ListQueueName,
199198
}},
200199
},
201200
opts: []configOption{withIntraFlowDispatchPolicyFactory(

pkg/epp/flowcontrol/registry/managedqueue_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ import (
3030
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework"
3131
frameworkmocks "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/mocks"
3232
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/queue"
33-
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/queue/listqueue"
3433
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
3534
typesmocks "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types/mocks"
3635
)
@@ -56,7 +55,7 @@ func newMockedMqHarness(t *testing.T, queue *frameworkmocks.MockSafeQueue, key t
5655
// This is essential for integration and concurrency tests.
5756
func newRealMqHarness(t *testing.T, key types.FlowKey) *mqTestHarness {
5857
t.Helper()
59-
q, err := queue.NewQueueFromName(listqueue.ListQueueName, nil)
58+
q, err := queue.NewQueueFromName(queue.ListQueueName, nil)
6059
require.NoError(t, err, "Test setup: creating a real ListQueue implementation should not fail")
6160
return newMqHarness(t, q, key, false)
6261
}

0 commit comments

Comments
 (0)