@@ -2,7 +2,6 @@ package dataplane
22
33import (
44 "context"
5- "errors"
65 "fmt"
76 "time"
87
@@ -11,21 +10,15 @@ import (
1110 "github.com/google/nftables/binaryutil"
1211 "github.com/google/nftables/expr"
1312 "github.com/mdlayher/netlink"
14- vishnetlink "github.com/vishvananda/netlink"
15- "github.com/vishvananda/netlink/nl"
1613 "golang.org/x/sys/unix"
1714
18- v1 "k8s.io/api/core/v1"
1915 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
20- "k8s.io/apimachinery/pkg/util/sets"
2116 "k8s.io/apimachinery/pkg/util/wait"
2217 "k8s.io/klog/v2"
2318
24- "sigs.k8s.io/kube-network-policies/pkg/dataplane/conntrack"
2519 "sigs.k8s.io/kube-network-policies/pkg/network"
2620 "sigs.k8s.io/kube-network-policies/pkg/networkpolicy"
2721 "sigs.k8s.io/kube-network-policies/pkg/runner"
28- "sigs.k8s.io/kube-network-policies/pkg/socket"
2922)
3023
3124// Network policies are hard to implement efficiently, and in large clusters this is
@@ -106,22 +99,9 @@ func newController(
10699 1 * time .Hour , // maxInterval
107100 )
108101
109- // The conntrack cleaner will run periodically to remove entries
110- // that might have been left behind after network policies changed.
111- // The deletion of the entries will cause the connections to be dropped
112- // and re-evaluated by the new network policies.
113- c .conntrackReconciler = runner .NewBoundedFrequencyRunner (
114- controllerName + "-conntrack" ,
115- func () error { return c .firewallEnforcer (context .Background ()) },
116- 60 * time .Second , // minInterval is conservative to avoid too much load on conntrack
117- 10 * time .Second , // retryInterval
118- 1 * time .Hour , // maxInterval
119- )
120-
121102 // The sync callback now triggers the runner.
122103 syncCallback := func () {
123104 c .syncRunner .Run ()
124- c .conntrackReconciler .Run ()
125105 }
126106 c .policyEngine .SetDataplaneSyncCallbacks (syncCallback )
127107
@@ -130,10 +110,9 @@ func newController(
130110
131111// Controller manages selector-based networkpolicy endpoints.
132112type Controller struct {
133- config Config
134- policyEngine * networkpolicy.PolicyEngine
135- syncRunner * runner.BoundedFrequencyRunner
136- conntrackReconciler * runner.BoundedFrequencyRunner
113+ config Config
114+ policyEngine * networkpolicy.PolicyEngine
115+ syncRunner * runner.BoundedFrequencyRunner
137116
138117 nfq * nfqueue.Nfqueue
139118 flushed bool
@@ -163,7 +142,7 @@ func (c *Controller) Run(ctx context.Context) error {
163142 registerMetrics (ctx )
164143 // collect metrics periodically
165144 go wait .UntilWithContext (ctx , func (ctx context.Context ) {
166- logger := klog .FromContext (ctx ). WithName ( "metrics-collector" )
145+ logger := klog .FromContext (ctx )
167146 queues , err := readNfnetlinkQueueStats ()
168147 if err != nil {
169148 logger .Error (err , "reading nfqueue stats" )
@@ -180,9 +159,8 @@ func (c *Controller) Run(ctx context.Context) error {
180159
181160 }, 30 * time .Second )
182161
183- // Start the BoundedFrequencyRunner's loops .
162+ // Start the BoundedFrequencyRunner's loop .
184163 go c .syncRunner .Loop (ctx .Done ())
185- go c .conntrackReconciler .Loop (ctx .Done ())
186164
187165 // Perform an initial sync to ensure rules are in place at startup.
188166 if err := c .syncNFTablesRules (ctx ); err != nil {
@@ -332,140 +310,11 @@ func (c *Controller) evaluatePacket(ctx context.Context, p *network.Packet) bool
332310 return allowed
333311}
334312
335- // firewallEnforcer ensures that existing connections comply with current network policies.
336- func (c * Controller ) firewallEnforcer (ctx context.Context ) error {
337- logger := klog .FromContext (ctx ).WithName ("firewall-enforcer" )
338-
339- logger .Info ("Enforcing firewall policies on existing connections" )
340- start := time .Now ()
341- defer func () {
342- logger .Info ("Enforcing firewall policies on existing connections" , "elapsed" , time .Since (start ))
343- }()
344-
345- flows , err := vishnetlink .ConntrackTableList (vishnetlink .ConntrackTable , vishnetlink .FAMILY_ALL )
346- if err != nil {
347- return err
348- }
349-
350- allPodIPs , divertAll , err := c .policyEngine .GetManagedIPs (ctx )
351- if err != nil {
352- return err
353- }
354-
355- ipset := sets.Set [string ]{}
356- if ! divertAll {
357- for _ , ip := range allPodIPs {
358- ipset .Insert (ip .String ())
359- }
360- }
361-
362- connectionsToFlush := []* vishnetlink.ConntrackFlow {}
363- connectionsToDestroy := []network.Packet {}
364-
365- for _ , flow := range flows {
366- // only UDP, SCTP or TCP connections in ESTABLISHED state are evaluated
367- if flow .Forward .Protocol != unix .IPPROTO_UDP &&
368- flow .Forward .Protocol != unix .IPPROTO_SCTP &&
369- flow .Forward .Protocol != unix .IPPROTO_TCP {
370- continue
371- }
372- if flow .ProtoInfo != nil {
373- if state , ok := flow .ProtoInfo .(* vishnetlink.ProtoInfoTCP ); ok && state .State != nl .TCP_CONNTRACK_ESTABLISHED {
374- continue
375- }
376- }
377-
378- // If divertAll is true, all pod IPs are managed by network policies.
379- if ! divertAll {
380- // Only evaluate flows that are affected by network policies.
381- // Translated packets will be evaluated on the destination node.
382- if ! ipset .Has (flow .Forward .SrcIP .String ()) && ! ipset .Has (flow .Reverse .SrcIP .String ()) {
383- klog .V (4 ).Info ("Skipping conntrack entry not involving managed IPs" , "flow" , flow )
384- continue
385- }
386- }
387-
388- // Evaluate both directions of the flow.
389- p := conntrack .PacketFromFlow (flow )
390- if p == nil {
391- continue
392- }
393- reverse := network .SwapPacket (p )
394- klog .V (4 ).Info ("Evaluating packet" , "packet" , p .String ())
395-
396- for _ , packet := range []* network.Packet {p , reverse } {
397- // Evaluate the packet against current network policies.
398- allowed , err := c .policyEngine .EvaluatePacket (ctx , packet )
399- if err != nil {
400- klog .Infof ("error evaluating conntrack entry %v: %v" , flow , err )
401- continue
402- }
403- // If the flow is not allowed, take action.
404- if allowed {
405- continue
406- }
407- klog .V (4 ).Info ("Connection no longer allowed by network policies" , "packet" , packet .String ())
408-
409- // For not TCP flows we flush the conntrack entries, this is a safe operation.
410- if packet .Proto != v1 .ProtocolTCP {
411- connectionsToFlush = append (connectionsToFlush , flow )
412- } else {
413- // For TCP we destroy the sockets, it is invasive but is also the most effective way
414- // and implemented in cilium https://github.com/cilium/cilium/blob/main/pkg/datapath/sockets/sockets.go
415- // The alternative is to send a RST packet, but it is more complex to implement since it requires/
416- // to get the seq/ack numbers from the conntrack entry, and also it is not guaranteed that the application
417- // will receive it.
418- connectionsToDestroy = append (connectionsToDestroy , * packet )
419- }
420- }
421- }
422-
423- var errorList []error
424- // UDP
425- if len (connectionsToFlush ) > 0 {
426- logger .Info ("Flushing UDP/SCTP conntrack entries" , "entries" , len (connectionsToFlush ))
427-
428- filter := conntrack .NewConntrackFilter (connectionsToFlush )
429- n , err := vishnetlink .ConntrackDeleteFilters (vishnetlink .ConntrackTable , unix .AF_INET , filter )
430- if err != nil {
431- errorList = append (errorList , err )
432- }
433- logger .V (4 ).Info ("Deleted IPv4 conntrack entries" , "entries" , n )
434-
435- n , err = vishnetlink .ConntrackDeleteFilters (vishnetlink .ConntrackTable , unix .AF_INET6 , filter )
436- if err != nil {
437- errorList = append (errorList , err )
438- }
439- logger .V (4 ).Info ("Deleted IPv6 conntrack entries" , "entries" , n )
440- }
441- // TCP
442- if len (connectionsToDestroy ) > 0 {
443- logger .Info ("Destroying TCP connections" , "connections" , len (connectionsToDestroy ))
444-
445- // Create an instance of the terminator.
446- terminator := socket .NewSocketTerminator ()
447- // connectionsToDestroy is a slice of network.Packet from your original code.
448- for _ , packet := range connectionsToDestroy {
449- // The packet needs to be a pointer for the function call.
450- p := packet
451- if err := terminator .TerminateSocket (& p ); err != nil {
452- // Log the error but continue processing others.
453- klog .Errorf ("Failed to terminate connection for packet %s: %v" , p .String (), err )
454- errorList = append (errorList , err )
455- }
456- }
457- }
458- if len (errorList ) > 0 {
459- return fmt .Errorf ("error cleaning conntrack entries: %v" , errorList )
460- }
461- return errors .Join (errorList ... )
462- }
463-
464313// syncNFTablesRules adds the necessary rules to process the first connection packets in userspace
465314// and check if network policies must apply.
466315// TODO: We can divert only the traffic affected by network policies using a set in nftables or an IPset.
467316func (c * Controller ) syncNFTablesRules (ctx context.Context ) error {
468- logger := klog .FromContext (ctx ). WithName ( "nftables-sync" )
317+ logger := klog .FromContext (ctx )
469318
470319 logger .Info ("Syncing nftables rules" )
471320 start := time .Now ()
@@ -731,7 +580,7 @@ func (c *Controller) syncNFTablesRules(ctx context.Context) error {
731580 }
732581
733582 if err := nft .Flush (); err != nil {
734- logger .Info ("syncing nftables rules" , "error" , err )
583+ klog . FromContext ( ctx ) .Info ("syncing nftables rules" , "error" , err )
735584 return err
736585 }
737586 return nil
0 commit comments