feat(): switch to informer
This commit is contained in:
48
cmd/main.go
48
cmd/main.go
@@ -2,6 +2,9 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"os/signal"
|
||||||
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.uploadfilter24.eu/covidnetes/k8s-cilium-node-label/internal/utils"
|
"git.uploadfilter24.eu/covidnetes/k8s-cilium-node-label/internal/utils"
|
||||||
@@ -32,34 +35,23 @@ func main() {
|
|||||||
"Caller": "Main",
|
"Caller": "Main",
|
||||||
}).Info("Entering main event loop")
|
}).Info("Entering main event loop")
|
||||||
|
|
||||||
for {
|
stopCh := make(chan struct{})
|
||||||
leases, err := internal.GetCiliumL2Leases()
|
go func() {
|
||||||
|
err := internal.StartLeaseInformer(cfg, stopCh)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithFields(log.Fields{
|
log.WithFields(log.Fields{"Caller": "Main"}).Fatalf("Lease informer failed: %s", err.Error())
|
||||||
"Caller": "Main",
|
|
||||||
}).Errorf("Error getting Cilium L2 leases: %s", err.Error())
|
|
||||||
}
|
}
|
||||||
for lease := range leases {
|
}()
|
||||||
// We should probably group here as there may be multiple leases per node
|
|
||||||
node, err := internal.GetNodeNameFromLease(leases[lease])
|
sigCh := make(chan os.Signal, 1)
|
||||||
if err != nil {
|
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
|
||||||
log.WithFields(log.Fields{
|
s := <-sigCh
|
||||||
"Caller": "Main",
|
log.WithFields(log.Fields{
|
||||||
}).Errorf("Error getting node name from Cilium L2 lease: %s", err.Error())
|
"Caller": "Main",
|
||||||
}
|
"Signal": s.String(),
|
||||||
if cfg.DryRun {
|
}).Info("Signal received, shutting down")
|
||||||
log.WithFields(log.Fields{
|
|
||||||
"Caller": "Main",
|
close(stopCh)
|
||||||
}).Info(fmt.Sprintf("Dry run enabled, would label node %s with %s=true", node, cfg.CiliumLabel))
|
|
||||||
} else {
|
time.Sleep(2 * time.Second)
|
||||||
err := internal.LabelNode(node, leases[lease].Name, cfg)
|
|
||||||
if err != nil {
|
|
||||||
log.WithFields(log.Fields{
|
|
||||||
"Caller": "Main",
|
|
||||||
}).Error(fmt.Sprintf("Error labeling node: %s", err.Error()))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
time.Sleep(5 * time.Minute)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
2
go.mod
2
go.mod
@@ -21,6 +21,7 @@ require (
|
|||||||
github.com/go-openapi/jsonreference v0.20.2 // indirect
|
github.com/go-openapi/jsonreference v0.20.2 // indirect
|
||||||
github.com/go-openapi/swag v0.23.0 // indirect
|
github.com/go-openapi/swag v0.23.0 // indirect
|
||||||
github.com/google/gnostic-models v0.7.0 // indirect
|
github.com/google/gnostic-models v0.7.0 // indirect
|
||||||
|
github.com/google/go-cmp v0.7.0 // indirect
|
||||||
github.com/google/uuid v1.6.0 // indirect
|
github.com/google/uuid v1.6.0 // indirect
|
||||||
github.com/josharian/intern v1.0.0 // indirect
|
github.com/josharian/intern v1.0.0 // indirect
|
||||||
github.com/json-iterator/go v1.1.12 // indirect
|
github.com/json-iterator/go v1.1.12 // indirect
|
||||||
@@ -28,6 +29,7 @@ require (
|
|||||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
|
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
|
||||||
github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee // indirect
|
github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee // indirect
|
||||||
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
|
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
|
||||||
|
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||||
github.com/x448/float16 v0.8.4 // indirect
|
github.com/x448/float16 v0.8.4 // indirect
|
||||||
go.yaml.in/yaml/v2 v2.4.3 // indirect
|
go.yaml.in/yaml/v2 v2.4.3 // indirect
|
||||||
go.yaml.in/yaml/v3 v3.0.4 // indirect
|
go.yaml.in/yaml/v3 v3.0.4 // indirect
|
||||||
|
|||||||
2
go.sum
2
go.sum
@@ -81,6 +81,8 @@ github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu
|
|||||||
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
|
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
|
||||||
github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
|
github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
|
||||||
github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg=
|
github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg=
|
||||||
|
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
|
||||||
|
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
|
||||||
go.yaml.in/yaml/v2 v2.4.3 h1:6gvOSjQoTB3vt1l+CU+tSyi/HOjfOjRLJ4YwYZGwRO0=
|
go.yaml.in/yaml/v2 v2.4.3 h1:6gvOSjQoTB3vt1l+CU+tSyi/HOjfOjRLJ4YwYZGwRO0=
|
||||||
go.yaml.in/yaml/v2 v2.4.3/go.mod h1:zSxWcmIDjOzPXpjlTTbAsKokqkDNAVtZO0WOMiT90s8=
|
go.yaml.in/yaml/v2 v2.4.3/go.mod h1:zSxWcmIDjOzPXpjlTTbAsKokqkDNAVtZO0WOMiT90s8=
|
||||||
go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc=
|
go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc=
|
||||||
|
|||||||
97
internal/informer.go
Normal file
97
internal/informer.go
Normal file
@@ -0,0 +1,97 @@
|
|||||||
|
package internal
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
v1 "k8s.io/api/coordination/v1"
|
||||||
|
"k8s.io/client-go/informers"
|
||||||
|
"k8s.io/client-go/tools/cache"
|
||||||
|
)
|
||||||
|
|
||||||
|
// StartLeaseInformer starts a long-running informer watching Lease objects in the kube-system namespace.
|
||||||
|
// It will react to Add/Update events for leases whose name starts with "cilium-l2announce" and label
|
||||||
|
// the corresponding node.
|
||||||
|
func StartLeaseInformer(cfg *Config, stopCh <-chan struct{}) error {
|
||||||
|
log.WithFields(log.Fields{"Caller": "StartLeaseInformer"}).Info("Starting lease informer")
|
||||||
|
|
||||||
|
client, err := generateClient()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Could not generate client for informer: %s", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
factory := informers.NewSharedInformerFactoryWithOptions(&client, 0, informers.WithNamespace("kube-system"))
|
||||||
|
leaseInformer := factory.Coordination().V1().Leases().Informer()
|
||||||
|
|
||||||
|
leaseInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||||
|
AddFunc: func(obj interface{}) {
|
||||||
|
handleLease(obj, cfg)
|
||||||
|
},
|
||||||
|
UpdateFunc: func(oldObj, newObj interface{}) {
|
||||||
|
handleLease(newObj, cfg)
|
||||||
|
},
|
||||||
|
DeleteFunc: func(obj interface{}) {
|
||||||
|
// nothing to do on delete for now
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
factory.Start(stopCh)
|
||||||
|
|
||||||
|
// wait for cache sync with timeout
|
||||||
|
synced := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
if !cache.WaitForCacheSync(stopCh, leaseInformer.HasSynced) {
|
||||||
|
close(synced)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
close(synced)
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-synced:
|
||||||
|
// proceed
|
||||||
|
case <-time.After(15 * time.Second):
|
||||||
|
return fmt.Errorf("timed out waiting for informer cache sync")
|
||||||
|
}
|
||||||
|
|
||||||
|
log.WithFields(log.Fields{"Caller": "StartLeaseInformer"}).Info("Lease informer running")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func handleLease(obj interface{}, cfg *Config) {
|
||||||
|
lease, ok := obj.(*v1.Lease)
|
||||||
|
if !ok {
|
||||||
|
log.WithFields(log.Fields{"Caller": "handleLease"}).Warn("Could not cast object to Lease")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if lease == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if lease.Name == "" || len(lease.Name) < 1 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if !strings.HasPrefix(lease.Name, "cilium-l2announce") {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
node, err := GetNodeNameFromLease(*lease)
|
||||||
|
if err != nil {
|
||||||
|
log.WithFields(log.Fields{"Caller": "handleLease", "Lease": lease.Name}).Errorf("Error parsing lease: %s", err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if cfg.DryRun {
|
||||||
|
log.WithFields(log.Fields{"Caller": "handleLease"}).Infof("Dry run: would label node %s with %s=true (lease %s)", node, cfg.CiliumLabel, lease.Name)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
err = LabelNode(node, lease.Name, cfg)
|
||||||
|
if err != nil {
|
||||||
|
log.WithFields(log.Fields{"Caller": "handleLease", "Node": node, "Lease": lease.Name}).Errorf("Error labeling node: %s", err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -92,36 +92,36 @@ func RemoveLabelFromNode(nodeName string) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetCiliumL2Leases() ([]v1.Lease, error) {
|
// func GetCiliumL2Leases() ([]v1.Lease, error) {
|
||||||
client, err := generateClient()
|
// client, err := generateClient()
|
||||||
if err != nil {
|
// if err != nil {
|
||||||
log.WithFields(log.Fields{"Caller": "GetCiliumL2Leases"}).Errorf("Could not generate client: %s", err.Error())
|
// log.WithFields(log.Fields{"Caller": "GetCiliumL2Leases"}).Errorf("Could not generate client: %s", err.Error())
|
||||||
return nil, err
|
// return nil, err
|
||||||
}
|
// }
|
||||||
|
|
||||||
leases, err := client.CoordinationV1().Leases("kube-system").List(context.TODO(), metav1.ListOptions{})
|
// leases, err := client.CoordinationV1().Leases("kube-system").List(context.TODO(), metav1.ListOptions{})
|
||||||
if err != nil {
|
// if err != nil {
|
||||||
log.WithFields(log.Fields{"Caller": "GetCiliumL2Leases"}).Errorf("Could not list leases: %s", err.Error())
|
// log.WithFields(log.Fields{"Caller": "GetCiliumL2Leases"}).Errorf("Could not list leases: %s", err.Error())
|
||||||
return nil, err
|
// return nil, err
|
||||||
}
|
// }
|
||||||
|
|
||||||
ciliumLeases := []v1.Lease{}
|
// ciliumLeases := []v1.Lease{}
|
||||||
|
|
||||||
for _, l := range leases.Items {
|
// for _, l := range leases.Items {
|
||||||
if strings.HasPrefix(l.Name, "cilium-l2announce") {
|
// if strings.HasPrefix(l.Name, "cilium-l2announce") {
|
||||||
log.WithFields(log.Fields{
|
// log.WithFields(log.Fields{
|
||||||
"Caller": "GetCiliumL2Leases",
|
// "Caller": "GetCiliumL2Leases",
|
||||||
"Lease": l.Name,
|
// "Lease": l.Name,
|
||||||
}).Info("Found Cilium L2 lease")
|
// }).Info("Found Cilium L2 lease")
|
||||||
// Pretty-print a couple of fields for debugging
|
// // Pretty-print a couple of fields for debugging
|
||||||
if l.Spec.HolderIdentity != nil {
|
// if l.Spec.HolderIdentity != nil {
|
||||||
log.WithFields(log.Fields{"HolderIdentity": *l.Spec.HolderIdentity}).Debug("HolderIdentity")
|
// log.WithFields(log.Fields{"HolderIdentity": *l.Spec.HolderIdentity}).Debug("HolderIdentity")
|
||||||
}
|
// }
|
||||||
ciliumLeases = append(ciliumLeases, l)
|
// ciliumLeases = append(ciliumLeases, l)
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
return ciliumLeases, nil
|
// return ciliumLeases, nil
|
||||||
}
|
// }
|
||||||
|
|
||||||
func GetNodeNameFromLease(lease v1.Lease) (string, error) {
|
func GetNodeNameFromLease(lease v1.Lease) (string, error) {
|
||||||
if lease.Spec.HolderIdentity == nil {
|
if lease.Spec.HolderIdentity == nil {
|
||||||
|
|||||||
Reference in New Issue
Block a user