
Commit 2654d19

LukeAVanDrie authored and BenjaminBraunDev committed
chore: Improve Flow Control docs and logging (kubernetes-sigs#1726)
* feat(flowcontrol): Introduce explicit context handling in Controller

  This commit completes the refactoring of the request lifecycle by making context management explicit and adapting the FlowController to the new internal components. This is the final commit in a series:

  1. Harden FlowItem for concurrent finalization.
  2. Adapt ShardProcessor to new FlowItem lifecycle.
  3. This commit: Introduce explicit context handling.

  Key changes:
  - The `FlowControlRequest` interface no longer includes `Context()`.
  - `FlowController.EnqueueAndWait` now accepts a `context.Context` as its first argument, making the request lifecycle explicit and caller-controlled.
  - The controller now actively monitors this context and will initiate an "asynchronous finalization" if the context expires after an item has been handed off to a processor.
  - Adds extensive integration and concurrency tests to validate the new end-to-end lifecycle management under contention.

* chore(fc): Improve controller docs and logging

  - Overhaul package-level documentation in doc.go files for controller and internal packages to clarify architecture, concurrency, and ownership.
  - Refine GoDoc and inline comments across all files for clarity, consistency, and to better explain intent.
  - Remove low signal-to-noise inline comments.
  - Adjust log levels for better signal-to-noise ratio (e.g., TRACE for high-frequency events).
  - Remove unnecessary logger.WithValues calls in hot paths within the ShardProcessor to reduce allocation overhead.
  - Remove overly defensive nil check in EnqueueAndWait.
  - Remove config deep copy in NewFlowController (we already pass by value).
  - Replace a panic with logger.Error in distributeRequest for a ManagedQueue invariant violation.
1 parent a15e402 commit 2654d19
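As an aid to reading the first change in this series, the sketch below shows how a caller might drive the new EnqueueAndWait(ctx, req) signature described above. It is illustrative only: the handler name, the five-second timeout, and the exact import path of the types package are assumptions, not part of this commit.

package example

import (
    "context"
    "fmt"
    "time"

    "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/controller"
    "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
)

// handleRequest is a hypothetical caller. It derives a bounded context, so the
// caller (not the request object) controls how long EnqueueAndWait may block.
func handleRequest(ctx context.Context, fc *controller.FlowController, req types.FlowControlRequest) error {
    ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
    defer cancel()

    // Blocks until the request reaches a terminal outcome: dispatch, rejection, or eviction.
    outcome, err := fc.EnqueueAndWait(ctx, req)
    if err != nil {
        return fmt.Errorf("request not dispatched (outcome: %v): %w", outcome, err)
    }
    return nil
}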

File tree

6 files changed: +150 additions, −183 deletions


pkg/epp/flowcontrol/controller/controller.go

Lines changed: 52 additions & 57 deletions
@@ -14,12 +14,11 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-// Package controller contains the implementation of the `FlowController` engine.
+// Package controller contains the implementation of the FlowController engine.
 //
-// The FlowController is the central processing engine of the Flow Control system. It is a sharded, high-throughput
+// The FlowController is the central processing engine of the Flow Control layer. It is a sharded, high-throughput
 // component responsible for managing the lifecycle of all incoming requests. It achieves this by acting as a stateless
-// supervisor that orchestrates a pool of stateful workers (`internal.ShardProcessor`), distributing incoming requests
-// among them using a sophisticated load-balancing algorithm.
+// supervisor that orchestrates a pool of stateful workers (ShardProcessors), distributing incoming requests among them.
 package controller
 
 import (
@@ -43,22 +42,20 @@ import (
     logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
 )
 
-// registryClient defines the minimal interface that the `FlowController` needs to interact with the `FlowRegistry`.
+// registryClient defines the minimal interface that the FlowController needs to interact with the FlowRegistry.
 type registryClient interface {
     contracts.FlowRegistryObserver
     contracts.FlowRegistryDataPlane
 }
 
-// shardProcessor is the minimal internal interface that the `FlowController` requires from its workers.
-// This abstraction allows for the injection of mock processors during testing.
+// shardProcessor is the minimal internal interface that the FlowController requires from its workers.
 type shardProcessor interface {
     Run(ctx context.Context)
     Submit(item *internal.FlowItem) error
     SubmitOrBlock(ctx context.Context, item *internal.FlowItem) error
 }
 
-// shardProcessorFactory defines the signature for a function that creates a `shardProcessor`.
-// This enables dependency injection for testing.
+// shardProcessorFactory defines the signature for creating a shardProcessor.
 type shardProcessorFactory func(
     ctx context.Context,
     shard contracts.RegistryShard,
@@ -74,15 +71,16 @@ var _ shardProcessor = &internal.ShardProcessor{}
 // managedWorker holds the state for a single supervised worker.
 type managedWorker struct {
     processor shardProcessor
-    cancel    context.CancelFunc
+    // cancel function for the worker-specific context. Used during shutdown and GC.
+    cancel context.CancelFunc
 }
 
-// FlowController is the central, high-throughput engine of the Flow Control system.
-// It is designed as a stateless distributor that orchestrates a pool of stateful workers (`internal.ShardProcessor`),
-// following a "supervisor-worker" pattern.
+// FlowController is the central, high-throughput engine of the Flow Control layer.
+// It is designed as a stateless distributor that orchestrates a pool of stateful workers (ShardProcessor), following a
+// supervisor-worker pattern.
 //
-// The controller's `Run` loop executes periodically, acting as a garbage collector that keeps the pool of running
-// workers synchronized with the dynamic shard topology of the `FlowRegistry`.
+// The controller's run loop executes periodically, acting as a garbage collector that keeps the pool of running
+// workers synchronized with the dynamic shard topology of the FlowRegistry.
 //
 // Request Lifecycle Management:
 //
@@ -103,25 +101,26 @@ type FlowController struct {
 
     // --- Lifecycle state ---
 
-    // parentCtx is the root context for the controller's lifecycle, established when `Run` is called.
+    // parentCtx is the root context for the controller's lifecycle, established when NewFlowController is called.
     // It is the parent for all long-lived worker goroutines.
     parentCtx context.Context
 
     // --- Concurrent state ---
 
-    // workers is a highly concurrent map storing the `managedWorker` for each shard.
+    // workers is a highly concurrent map storing the managedWorker for each shard.
     // It is the controller's source of truth for the worker pool.
-    // The key is the shard ID (`string`), and the value is a `*managedWorker`.
-    workers sync.Map
+    workers sync.Map // key: shard ID (string); value: *managedWorker
 
+    // wg waits for all worker goroutines to terminate during shutdown.
     wg sync.WaitGroup
 }
 
-// flowControllerOption is a function that applies a configuration change to a `FlowController`.
+// flowControllerOption is a function that applies a configuration change.
 // test-only
 type flowControllerOption func(*FlowController)
 
-// NewFlowController creates a new `FlowController` instance.
+// NewFlowController creates and starts a new FlowController instance.
+// The provided context governs the lifecycle of the controller and all its workers.
 func NewFlowController(
     ctx context.Context,
     config Config,
@@ -131,15 +130,14 @@ func NewFlowController(
     opts ...flowControllerOption,
 ) (*FlowController, error) {
     fc := &FlowController{
-        config:             *config.deepCopy(),
+        config:             config,
         registry:           registry,
         saturationDetector: sd,
         clock:              clock.RealClock{},
         logger:             logger.WithName("flow-controller"),
         parentCtx:          ctx,
     }
 
-    // Use the real shard processor implementation by default.
     fc.shardProcessorFactory = func(
         ctx context.Context,
         shard contracts.RegistryShard,
@@ -167,9 +165,9 @@ func NewFlowController(
     return fc, nil
 }
 
-// run starts the `FlowController`'s main reconciliation loop.
+// run starts the FlowController's main reconciliation loop (supervisor loop).
 // This loop is responsible for garbage collecting workers whose shards no longer exist in the registry.
-// This method blocks until the provided context is cancelled and ALL worker goroutines have fully terminated.
+// This method blocks until the provided context is cancelled and all worker goroutines have fully terminated.
 func (fc *FlowController) run(ctx context.Context) {
     fc.logger.Info("Starting FlowController reconciliation loop.")
     defer fc.logger.Info("FlowController reconciliation loop stopped.")
@@ -194,23 +192,19 @@ func (fc *FlowController) run(ctx context.Context) {
 // # Design Rationale: The Synchronous Model
 //
 // This blocking model is deliberately chosen for its simplicity and robustness, especially in the context of Envoy
-// External Processing (`ext_proc`), which operates on a stream-based protocol.
+// External Processing (ext_proc), which operates on a stream-based protocol.
 //
-// - `ext_proc` Alignment: A single goroutine typically manages the stream for a given HTTP request.
-//   `EnqueueAndWait` fits this perfectly: the request-handling goroutine calls it, blocks, and upon return, has a
+// - ext_proc Alignment: A single goroutine typically manages the stream for a given HTTP request.
+//   EnqueueAndWait fits this perfectly: the request-handling goroutine calls it, blocks, and upon return, has a
 //   definitive outcome to act upon.
 // - Simplified State Management: The state of a "waiting" request is implicitly managed by the blocked goroutine's
-//   stack and its `context.Context`. The system only needs to signal this specific goroutine to unblock it.
-// - Direct Backpressure: If queues are full, `EnqueueAndWait` returns an error immediately, providing direct
+//   stack and its Context. The system only needs to signal this specific goroutine to unblock it.
+// - Direct Backpressure: If queues are full, EnqueueAndWait returns an error immediately, providing direct
 //   backpressure to the caller.
 func (fc *FlowController) EnqueueAndWait(
     ctx context.Context,
     req types.FlowControlRequest,
 ) (types.QueueOutcome, error) {
-    if req == nil {
-        return types.QueueOutcomeRejectedOther, errors.New("request cannot be nil")
-    }
-
     flowKey := req.FlowKey()
     fairnessID := flowKey.ID
     priority := strconv.Itoa(flowKey.Priority)
@@ -365,17 +359,22 @@ type candidate struct {
 // of that flow's queue, from least to most loaded.
 func (fc *FlowController) selectDistributionCandidates(key types.FlowKey) ([]candidate, error) {
     var candidates []candidate
+
+    // Acquire a connection to the registry for the flow key. This ensures a consistent view of the ActiveShards for the
+    // duration of the shard selection process, preventing races with concurrent shard topology changes.
     err := fc.registry.WithConnection(key, func(conn contracts.ActiveFlowConnection) error {
         shards := conn.ActiveShards()
-        candidates = make([]candidate, len(shards))
-        for i, shard := range shards {
+        candidates = make([]candidate, 0, len(shards))
+        for _, shard := range shards {
             worker := fc.getOrStartWorker(shard)
             mq, err := shard.ManagedQueue(key)
             if err != nil {
-                panic(fmt.Sprintf("invariant violation: ManagedQueue for leased flow %s failed on shard %s: %v",
-                    key, shard.ID(), err))
+                fc.logger.Error(err,
+                    "Invariant violation. Failed to get ManagedQueue for a leased flow on an Active shard. Skipping shard.",
+                    "flowKey", key, "shardID", shard.ID())
+                continue
             }
-            candidates[i] = candidate{worker.processor, shard.ID(), mq.ByteSize()}
+            candidates = append(candidates, candidate{worker.processor, shard.ID(), mq.ByteSize()})
         }
         return nil
     })
@@ -435,15 +434,15 @@ func (fc *FlowController) distributeRequest(
 }
 
 // getOrStartWorker implements the lazy-loading and startup of shard processors.
-// It attempts to retrieve an existing worker for a shard. If one doesn't exist, it constructs a new worker and attempts
-// to register it atomically. The worker's processor goroutine is only started *after* it has successfully been
-// registered, preventing race conditions where multiple goroutines create and start the same worker.
+// It ensures that exactly one worker goroutine is started for each shard, using atomic operations
+// (sync.Map.LoadOrStore). The worker's processor goroutine is only started after it has successfully been registered,
+// preventing race conditions where multiple goroutines create and start the same worker.
 func (fc *FlowController) getOrStartWorker(shard contracts.RegistryShard) *managedWorker {
     if w, ok := fc.workers.Load(shard.ID()); ok {
         return w.(*managedWorker)
     }
 
-    // Construct a new worker, but do not start its processor goroutine yet.
+    // Construct a new worker, but do not start its goroutine yet.
     processorCtx, cancel := context.WithCancel(fc.parentCtx)
     processor := fc.shardProcessorFactory(
         processorCtx,
@@ -459,18 +458,17 @@ func (fc *FlowController) getOrStartWorker(shard contracts.RegistryShard) *manag
         cancel:    cancel,
     }
 
-    // Atomically load or store. This is the critical step for preventing race conditions.
+    // Atomically load or store. This is the critical synchronization step.
     actual, loaded := fc.workers.LoadOrStore(shard.ID(), newWorker)
     if loaded {
         // Another goroutine beat us to it. The `newWorker` we created was not stored.
-        // We must cancel the context we created for it to prevent a leak, but we do not need to do anything else, as its
-        // processor was never started.
+        // We must cancel the context we created to prevent a leak.
         cancel()
         return actual.(*managedWorker)
     }
 
-    // We won the race. The `newWorker` was successfully stored.
-    // Now, and only now, do we start the processor's long-running goroutine.
+    // We won the race. The newWorker was stored. Now, start the processor's long-running goroutine.
+    fc.logger.V(logutil.DEFAULT).Info("Starting new ShardProcessor worker.", "shardID", shard.ID())
     fc.wg.Add(1)
     go func() {
         defer fc.wg.Done()
@@ -481,24 +479,22 @@ func (fc *FlowController) getOrStartWorker(shard contracts.RegistryShard) *manag
 }
 
 // reconcileProcessors is the supervisor's core garbage collection loop.
-// It fetches the current list of Active shards from the registry and removes any workers whose corresponding shards
-// have been fully drained and garbage collected by the registry.
+// It identifies and stops workers whose corresponding shards have been removed from the registry.
 func (fc *FlowController) reconcileProcessors() {
     stats := fc.registry.ShardStats()
-    shards := make(map[string]struct{}, len(stats)) // `map[shardID] -> isActive`
+    shards := make(map[string]struct{}, len(stats)) // map[shardID] -> isActive
     for _, s := range stats {
         shards[s.ID] = struct{}{}
     }
 
     fc.workers.Range(func(key, value any) bool {
         shardID := key.(string)
         worker := value.(*managedWorker)
-
-        // GC check: Is the shard no longer in the registry at all?
         if _, exists := shards[shardID]; !exists {
-            fc.logger.Info("Stale worker detected for GC'd shard, shutting down.", "shardID", shardID)
-            worker.cancel()
-            fc.workers.Delete(shardID)
+            fc.logger.V(logutil.DEFAULT).Info("Stale worker detected for GC'd shard, initiating shutdown.",
+                "shardID", shardID)
+            worker.cancel()            // Cancel the worker's context, initiating the Processor's graceful shutdown sequence.
+            fc.workers.Delete(shardID) // Delete from the map so no new requests are routed to it.
         }
         return true
     })
@@ -515,7 +511,6 @@ func (fc *FlowController) shutdown() {
         worker.cancel()
         return true
     })
-
     fc.wg.Wait()
     fc.logger.Info("All shard processors have shut down.")
 }
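The getOrStartWorker change above is a standard lazy-start idiom built around sync.Map.LoadOrStore. As a self-contained illustration of that idiom only (the pool and worker types below stand in for the real FlowController and ShardProcessor and are not the project's code), the pattern looks roughly like this:

package example

import (
    "context"
    "sync"
)

type worker struct {
    cancel context.CancelFunc
}

type pool struct {
    parent  context.Context
    workers sync.Map // key: shard ID (string); value: *worker
    wg      sync.WaitGroup
}

// getOrStart mirrors the idiom in getOrStartWorker: construct a candidate worker,
// register it atomically with LoadOrStore, and start its goroutine only if this
// call won the race. The loser cancels its unused context to avoid a leak.
func (p *pool) getOrStart(shardID string, run func(ctx context.Context)) *worker {
    if w, ok := p.workers.Load(shardID); ok {
        return w.(*worker) // fast path: worker already exists
    }

    ctx, cancel := context.WithCancel(p.parent)
    candidate := &worker{cancel: cancel}

    actual, loaded := p.workers.LoadOrStore(shardID, candidate)
    if loaded {
        cancel() // another goroutine registered first; discard our candidate
        return actual.(*worker)
    }

    p.wg.Add(1)
    go func() {
        defer p.wg.Done()
        run(ctx) // started only after successful registration
    }()
    return candidate
}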

pkg/epp/flowcontrol/controller/controller_test.go

Lines changed: 7 additions & 17 deletions
@@ -317,17 +317,6 @@ func TestFlowController_EnqueueAndWait(t *testing.T) {
     t.Run("Rejections", func(t *testing.T) {
         t.Parallel()
 
-        t.Run("OnNilRequest", func(t *testing.T) {
-            t.Parallel()
-            h := newUnitHarness(t, t.Context(), Config{}, nil)
-
-            outcome, err := h.fc.EnqueueAndWait(context.Background(), nil)
-            require.Error(t, err, "EnqueueAndWait must reject a nil request")
-            assert.Equal(t, "request cannot be nil", err.Error(), "error message must be specific")
-            assert.Equal(t, types.QueueOutcomeRejectedOther, outcome,
-                "outcome should be QueueOutcomeRejectedOther for invalid inputs")
-        })
-
         t.Run("OnReqCtxExpiredBeforeDistribution", func(t *testing.T) {
             t.Parallel()
             // Test that if the request context provided to EnqueueAndWait is already expired, it returns immediately.
@@ -418,14 +407,13 @@ func TestFlowController_EnqueueAndWait(t *testing.T) {
                 "outcome should be QueueOutcomeRejectedOther for transient registry errors")
         })
 
-        // This test validates the documented invariant handling in distributeRequest.
-        t.Run("PanicsOnManagedQueueError", func(t *testing.T) {
+        t.Run("OnManagedQueueError", func(t *testing.T) {
             t.Parallel()
             mockRegistry := &mockRegistryClient{}
             h := newUnitHarness(t, t.Context(), Config{}, mockRegistry)
 
             // Create a faulty shard that successfully leases the flow but fails to return the
-            // ManagedQueue.
+            // ManagedQueue. This shard should be considered as unavailable.
             faultyShard := &mocks.MockRegistryShard{
                 IDFunc: func() string { return "faulty-shard" },
                 ManagedQueueFunc: func(_ types.FlowKey) (contracts.ManagedQueue, error) {
@@ -440,9 +428,11 @@ func TestFlowController_EnqueueAndWait(t *testing.T) {
             }
 
             req := newTestRequest(defaultFlowKey)
-            assert.Panics(t, func() {
-                _, _ = h.fc.EnqueueAndWait(context.Background(), req)
-            }, "EnqueueAndWait must panic when a registry implementation violates the ManagedQueue contract")
+            outcome, err := h.fc.EnqueueAndWait(context.Background(), req)
+            require.Error(t, err, "EnqueueAndWait must reject requests if no shards are available")
+            assert.ErrorIs(t, err, types.ErrRejected, "error should wrap ErrRejected")
+            assert.Equal(t, types.QueueOutcomeRejectedCapacity, outcome,
+                "outcome should be QueueOutcomeRejectedCapacity when no shards exist for the flow")
         })
     })
 

pkg/epp/flowcontrol/controller/doc.go

Lines changed: 37 additions & 12 deletions
@@ -14,23 +14,48 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-// Package controller contains the implementation of the `FlowController` engine.
+// Package controller contains the implementation of the FlowController engine.
 //
 // # Overview
 //
-// The `FlowController` is the central processing engine of the Flow Control system. It acts as a stateless supervisor
-// that orchestrates a pool of stateful workers (`internal.ShardProcessor`), managing the lifecycle of all incoming
-// requests from initial submission to a terminal outcome (dispatch, rejection, or eviction).
+// The FlowController is the central processing engine of the Flow Control layer. It acts as a stateless supervisor that
+// orchestrates a pool of stateful workers (internal.ShardProcessor), managing the lifecycle of all incoming requests
+// from initial submission to a terminal outcome (dispatch, rejection, or eviction).
 //
 // # Architecture: Supervisor-Worker Pattern
 //
-// This package implements a classic "supervisor-worker" pattern to achieve high throughput and dynamic scalability.
+// This package implements a supervisor-worker pattern to achieve high throughput and dynamic scalability.
 //
-// - The `FlowController` (Supervisor): The public-facing API of the system. Its primary responsibilities are to
-//   execute a distribution algorithm to select the optimal worker for a new request and to manage the lifecycle of
-//   the worker pool, ensuring it stays synchronized with the underlying shard topology defined by the
-//   `contracts.FlowRegistry`.
-// - The `internal.ShardProcessor` (Worker): A stateful, single-goroutine actor responsible for the entire lifecycle
-//   of requests on a single shard. The supervisor manages a pool of these workers, one for each
-//   `contracts.RegistryShard`.
+// - The FlowController (Supervisor): The public-facing API of the system. Its primary responsibilities are to execute
+//   a distribution algorithm to select the optimal worker for a new request and to manage the lifecycle of the worker
+//   pool, ensuring it stays synchronized with the underlying shard topology defined by the contracts.FlowRegistry.
+// - The internal.ShardProcessor (Worker): A stateful, single-goroutine actor responsible for the entire lifecycle of
+//   requests on a single shard. The supervisor manages a pool of these workers, one for each contracts.RegistryShard.
+//
+// # Concurrency Model
+//
+// The FlowController is designed to be highly concurrent and thread-safe. It acts primarily as a stateless distributor.
+//
+// - EnqueueAndWait: Can be called concurrently by many goroutines.
+// - Worker Management: Uses a sync.Map (workers) for concurrent access and lazy initialization of workers.
+// - Supervision: A single background goroutine (run) manages the worker pool lifecycle (garbage collection).
+//
+// It achieves high throughput by minimizing shared state and relying on the internal ShardProcessors to handle state
+// mutations serially (using an actor model).
+//
+// # Request Lifecycle and Ownership
+//
+// A request (represented internally as a FlowItem) has a lifecycle managed cooperatively by the Controller and a
+// Processor. Defining ownership is critical for ensuring an item is finalized exactly once.
+//
+//  1. Submission (Controller): The Controller attempts to hand off the item to a Processor.
+//  2. Handoff:
+//     - Success: Ownership transfers to the Processor, which is now responsible for Finalization.
+//     - Failure: Ownership remains with the Controller, which must Finalize the item.
+//  3. Processing (Processor): The Processor enqueues, manages, and eventually dispatches or rejects the item.
+//  4. Finalization: The terminal outcome is set. This can happen:
+//     - Synchronously: The Processor determines the outcome (e.g., Dispatch, Capacity Rejection).
+//     - Asynchronously: The Controller observes the request's Context expiry (TTL/Cancellation) and calls Finalize.
+//
+// The FlowItem uses atomic operations to safely coordinate the Finalization state across goroutines.
 package controller
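To make the "finalized exactly once" guarantee described above concrete, here is a minimal sketch of the coordination pattern it implies, using atomic.Bool. The type, field, and method names are illustrative assumptions; the real FlowItem API lives in the internal package and is not reproduced here.

package example

import (
    "sync/atomic"
)

// outcome stands in for the real types.QueueOutcome.
type outcome int

type item struct {
    finalized atomic.Bool  // flips to true exactly once
    done      chan outcome // delivers the terminal outcome to the waiting goroutine
}

func newItem() *item {
    return &item{done: make(chan outcome, 1)}
}

// finalize records the terminal outcome exactly once, regardless of whether the
// processor (synchronous path) or the controller watching context expiry
// (asynchronous path) gets here first.
func (it *item) finalize(o outcome) {
    if !it.finalized.CompareAndSwap(false, true) {
        return // another goroutine already finalized; drop this outcome
    }
    it.done <- o
    close(it.done)
}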
