diff --git a/cmd/csi-provisioner/csi-provisioner.go b/cmd/csi-provisioner/csi-provisioner.go
index 4cb07e693b..40d39abcb7 100644
--- a/cmd/csi-provisioner/csi-provisioner.go
+++ b/cmd/csi-provisioner/csi-provisioner.go
@@ -467,6 +467,7 @@ func main() {
 	)
 
 	var capacityController *capacity.Controller
+	var topologyInformer topology.Informer
 	if *enableCapacity {
 		// Publishing storage capacity information uses its own client
 		// with separate rate limiting.
@@ -501,7 +502,6 @@ func main() {
 			klog.Infof("using %s/%s %s as owner of CSIStorageCapacity objects", controller.APIVersion, controller.Kind, controller.Name)
 		}
 
-		var topologyInformer topology.Informer
 		if nodeDeployment == nil {
 			topologyRateLimiter := workqueue.NewTypedItemExponentialFailureRateLimiter[string](*retryIntervalStart, *retryIntervalMax)
 			topologyInformer = topology.NewNodeTopology(
@@ -521,7 +521,6 @@ func main() {
 			klog.Infof("producing CSIStorageCapacity objects with fixed topology segment %s", segment)
 			topologyInformer = topology.NewFixedNodeTopology(&segment)
 		}
-		go topologyInformer.RunWorker(ctx)
 
 		managedByID := "external-provisioner"
 		if *enableNodeDeployment {
@@ -680,10 +679,13 @@ func main() {
 	factory.Start(ctx.Done())
 	if factoryForNamespace != nil {
-		// Starting is enough, the capacity controller will
+		// Starting is enough, the capacityController and topologyInformer will
 		// wait for sync.
 		factoryForNamespace.Start(ctx.Done())
 	}
+	if topologyInformer != nil {
+		go topologyInformer.RunWorker(ctx)
+	}
 
 	cacheSyncResult := factory.WaitForCacheSync(ctx.Done())
 	for _, v := range cacheSyncResult {
 		if !v {
diff --git a/pkg/capacity/topology/nodes.go b/pkg/capacity/topology/nodes.go
index d19dbef6b0..75efc41eb1 100644
--- a/pkg/capacity/topology/nodes.go
+++ b/pkg/capacity/topology/nodes.go
@@ -21,6 +21,7 @@ import (
 	"reflect"
 	"sort"
 	"sync"
+	"sync/atomic"
 
 	v1 "k8s.io/api/core/v1"
 	storagev1 "k8s.io/api/storage/v1"
@@ -159,6 +160,7 @@ type nodeTopology struct {
 	nodeInformer    coreinformersv1.NodeInformer
 	csiNodeInformer storageinformersv1.CSINodeInformer
 	queue           workqueue.TypedRateLimitingInterface[string]
+	hasSynced       atomic.Bool
 
 	mutex sync.Mutex
 	// segments hold a list of all currently known topology segments.
@@ -205,19 +207,22 @@ func (nt *nodeTopology) RunWorker(ctx context.Context) {
 	klog.Info("Started node topology worker")
 	defer klog.Info("Shutting node topology worker")
 
+	if !cache.WaitForCacheSync(ctx.Done(),
+		nt.nodeInformer.Informer().HasSynced, nt.csiNodeInformer.Informer().HasSynced) {
+		return
+	}
+
+	go func() {
+		<-ctx.Done()
+		nt.queue.ShutDown()
+	}()
+	nt.queue.Add("") // Initial sync to ensure HasSynced() will become true.
 	for nt.processNextWorkItem(ctx) {
 	}
 }
 
 func (nt *nodeTopology) HasSynced() bool {
-	if nt.nodeInformer.Informer().HasSynced() &&
-		nt.csiNodeInformer.Informer().HasSynced() {
-		// Now that both informers are up-to-date, use that
-		// information to update our own view of the world.
-		nt.sync(context.Background())
-		return true
-	}
-	return false
+	return nt.hasSynced.Load()
 }
 
 func (nt *nodeTopology) processNextWorkItem(ctx context.Context) bool {
@@ -227,6 +232,7 @@ func (nt *nodeTopology) processNextWorkItem(ctx context.Context) bool {
 	}
 	defer nt.queue.Done(obj)
 	nt.sync(ctx)
+	nt.hasSynced.Store(true)
 	return true
 }
 
diff --git a/pkg/capacity/topology/nodes_test.go b/pkg/capacity/topology/nodes_test.go
index 4ea2f39367..7cd9adfaa8 100644
--- a/pkg/capacity/topology/nodes_test.go
+++ b/pkg/capacity/topology/nodes_test.go
@@ -23,6 +23,7 @@ import (
 	"reflect"
 	"sort"
 	"testing"
+	"testing/synctest"
 	"time"
 
 	v1 "k8s.io/api/core/v1"
@@ -566,6 +567,39 @@ func TestNodeTopology(t *testing.T) {
 	}
 }
 
+func TestHasSynced(t *testing.T) {
+	synctest.Test(t, func(t *testing.T) {
+		client := fakeclientset.NewSimpleClientset()
+		informerFactory := informers.NewSharedInformerFactory(client, 1*time.Hour)
+		nodeInformer := informerFactory.Core().V1().Nodes()
+		csiNodeInformer := informerFactory.Storage().V1().CSINodes()
+		rateLimiter := workqueue.NewTypedItemExponentialFailureRateLimiter[string](time.Second, 2*time.Second)
+		queue := workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[string]{Name: "items"})
+
+		nt := NewNodeTopology(
+			driverName,
+			client,
+			nodeInformer,
+			csiNodeInformer,
+			queue,
+		).(*nodeTopology)
+
+		ctx := t.Context()
+		go nt.RunWorker(ctx)
+		time.Sleep(10 * time.Second)
+		if nt.HasSynced() {
+			t.Fatalf("upstream informer not started yet, expected HasSynced to return false")
+		}
+
+		informerFactory.Start(ctx.Done())
+		informerFactory.WaitForCacheSync(ctx.Done())
+		synctest.Wait()
+		if !nt.HasSynced() {
+			t.Fatalf("nt should be synced now")
+		}
+	})
+}
+
 type segmentsFound map[*Segment]bool
 
 func (sf segmentsFound) Found() []*Segment {
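Note for reviewers: TestHasSynced relies on Go's testing/synctest package (stable as of Go 1.25). The test body runs inside a synctest "bubble" with a fake clock, so time.Sleep(10 * time.Second) returns as soon as every other goroutine in the bubble is durably blocked rather than after real wall time, and synctest.Wait() gives the RunWorker goroutine a chance to finish its first sync before HasSynced is checked. Below is a minimal self-contained sketch of the same pattern; the worker type and TestSyncFlag are illustrative stand-ins for nodeTopology and are not part of this change.

package topology_test

import (
	"sync/atomic"
	"testing"
	"testing/synctest"
	"time"
)

// worker is a hypothetical stand-in for nodeTopology: it blocks until a
// readiness signal arrives, then flips an atomic flag that
// HasSynced-style callers poll.
type worker struct {
	ready  chan struct{}
	synced atomic.Bool
}

func (w *worker) run() {
	<-w.ready            // stands in for cache.WaitForCacheSync
	w.synced.Store(true) // stands in for the first completed sync
}

func TestSyncFlag(t *testing.T) {
	synctest.Test(t, func(t *testing.T) {
		w := &worker{ready: make(chan struct{})}
		go w.run()

		// Inside the bubble this sleep returns once all other goroutines
		// are durably blocked; no real ten seconds elapse.
		time.Sleep(10 * time.Second)
		if w.synced.Load() {
			t.Fatal("must not report sync before readiness")
		}

		close(w.ready)
		synctest.Wait() // block until w.run has finished reacting
		if !w.synced.Load() {
			t.Fatal("should report sync after readiness")
		}
	})
}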