Files
k8s-cilium-node-label/internal/informer.go
Tobias Trabelsi a0593f7873
Some checks failed
Build and Test / Test (push) Failing after 18s
Build and Test / Build_Image_arm64 (push) Successful in 2m9s
Build and Test / Build_Image_amd64 (push) Has been cancelled
fix(): actually just compare the holder identity and not the resource version
2026-01-20 22:19:25 +01:00

126 lines
3.5 KiB
Go

package internal
import (
"fmt"
"strings"
"time"
log "github.com/sirupsen/logrus"
v1 "k8s.io/api/coordination/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
)
// StartLeaseInformer starts a shared informer on coordination.k8s.io/v1 Leases
// in the kube-system namespace and blocks until its cache has synced.
//
// Every added Lease, and every updated Lease whose holder identity changed, is
// passed to handleLease together with cfg. Delete events are ignored.
//
// The informer is started with stopCh, so it runs until stopCh is closed. An
// error is returned when the client cannot be created, when stopCh is closed
// before the initial cache sync has completed, or when that sync takes longer
// than 15 seconds.
func StartLeaseInformer(cfg *Config, stopCh <-chan struct{}) error {
	// Upper bound for the initial cache sync.
	const cacheSyncTimeout = 15 * time.Second

	log.WithFields(log.Fields{"Caller": "StartLeaseInformer"}).Info("Starting lease informer")

	client, err := generateClient()
	if err != nil {
		// %w keeps the cause inspectable via errors.Is / errors.As.
		return fmt.Errorf("Could not generate client for informer: %w", err)
	}

	// A resync period of 0 disables periodic resyncs, so update events are
	// only delivered for real changes reported by the API server.
	factory := informers.NewSharedInformerFactoryWithOptions(client, 0, informers.WithNamespace("kube-system"))
	leaseInformer := factory.Coordination().V1().Leases().Informer()

	// holderOf returns the lease holder identity, or "" when it is unset.
	holderOf := func(l *v1.Lease) string {
		if l.Spec.HolderIdentity == nil {
			return ""
		}
		return *l.Spec.HolderIdentity
	}

	leaseInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			if l, ok := obj.(*v1.Lease); ok {
				log.WithFields(log.Fields{"Caller": "lease.Add", "Lease": l.Name, "RV": l.ResourceVersion}).Debug("Add event for lease")
			}
			handleLease(obj, cfg)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			oldL, okOld := oldObj.(*v1.Lease)
			newL, okNew := newObj.(*v1.Lease)
			if okOld {
				log.WithFields(log.Fields{"Caller": "lease.Update.old", "Lease": oldL.Name, "RV": oldL.ResourceVersion}).Debug("Old lease")
			}
			if okNew {
				log.WithFields(log.Fields{"Caller": "lease.Update.new", "Lease": newL.Name, "RV": newL.ResourceVersion}).Debug("New lease")
			}
			// When both objects are Leases, skip the event if nothing was
			// written (same resourceVersion) or if the holder stayed the same
			// (e.g. a plain lease renewal). In every other case, including
			// objects that are not Leases, defer to handleLease, which
			// validates its input itself.
			if okOld && okNew {
				if oldL.ResourceVersion == newL.ResourceVersion {
					return
				}
				if holderOf(oldL) == holderOf(newL) {
					return
				}
			}
			handleLease(newObj, cfg)
		},
		DeleteFunc: func(obj interface{}) {
			// nothing to do on delete for now
		},
	})

	factory.Start(stopCh)

	// Wait for the initial cache sync, bounded by cacheSyncTimeout and by
	// stopCh. waitStop is closed on every return path so the waiter goroutine
	// never outlives this function; result is buffered so the goroutine can
	// always deliver its value and exit.
	waitStop := make(chan struct{})
	defer close(waitStop)

	result := make(chan bool, 1)
	go func() {
		result <- cache.WaitForCacheSync(waitStop, leaseInformer.HasSynced)
	}()

	timer := time.NewTimer(cacheSyncTimeout)
	defer timer.Stop()

	select {
	case ok := <-result:
		if !ok {
			return fmt.Errorf("informer cache sync did not complete")
		}
	case <-stopCh:
		return fmt.Errorf("informer stopped before cache sync completed")
	case <-timer.C:
		return fmt.Errorf("timed out waiting for informer cache sync")
	}

	log.WithFields(log.Fields{"Caller": "StartLeaseInformer"}).Info("Lease informer running")
	return nil
}
// handleLease processes a single informer object.
//
// Objects that are not *v1.Lease are logged with a warning and dropped. Nil
// leases and leases whose name does not start with "cilium-l2announce" are
// ignored silently. For the remaining leases the node name is obtained via
// GetNodeNameFromLease; then, unless cfg.DryRun is set (in which case the
// intended action is only logged), the lease is handed to LabelNode. Errors
// from either call are logged and not returned.
func handleLease(obj interface{}, cfg *Config) {
	l, isLease := obj.(*v1.Lease)
	if !isLease {
		log.WithFields(log.Fields{"Caller": "handleLease"}).Warn("Could not cast object to Lease")
		return
	}

	// The prefix test also rejects an empty name, so no separate check is needed.
	if l == nil || !strings.HasPrefix(l.Name, "cilium-l2announce") {
		return
	}

	nodeName, parseErr := GetNodeNameFromLease(*l)
	if parseErr != nil {
		log.WithFields(log.Fields{"Caller": "handleLease", "Lease": l.Name}).Errorf("Error parsing lease: %s", parseErr.Error())
		return
	}

	if cfg.DryRun {
		log.WithFields(log.Fields{"Caller": "handleLease"}).Infof("Dry run: would label node %s with %s=true (lease %s)", nodeName, cfg.CiliumLabel, l.Name)
		return
	}

	if labelErr := LabelNode(nodeName, l.Name, cfg); labelErr != nil {
		log.WithFields(log.Fields{"Caller": "handleLease", "Node": nodeName, "Lease": l.Name}).Errorf("Error labeling node: %s", labelErr.Error())
	}
}