good morning!!!!

Skip to content
Snippets Groups Projects
Verified Commit 2e539c85 authored by a's avatar a
Browse files

ok

parent 9474309e
Branches
Tags
No related merge requests found
package k8s
import (
"context"
"fmt"
"pggat2/lib/gat"
"tuxpa.in/a/zlog/log"
"golang.org/x/sync/errgroup"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
type PodWatcher struct {
BaseRecipe gat.Recipe
Namespace string
ListOptions metav1.ListOptions
pods map[string]*v1.Pod
}
func (p *PodWatcher) Start(
pctx context.Context,
c *kubernetes.Clientset,
pool gat.Pool,
) error {
p.pods = make(map[string]*v1.Pod)
egg, ctx := errgroup.WithContext(pctx)
egg.Go(func() error {
return p.startWatching(ctx, c, pool)
})
err := p.getInitialPods(ctx, c, pool)
if err != nil {
return err
}
return egg.Wait()
}
func (p *PodWatcher) getInitialPods(
ctx context.Context,
c *kubernetes.Clientset,
pool gat.Pool,
) error {
pods, err := c.CoreV1().Pods(p.Namespace).List(ctx, p.ListOptions)
if err != nil {
return err
}
for _, pod := range pods.Items {
if isPodReady(&pod) {
p.pods[pod.Name] = &pod
}
}
return nil
}
func (p *PodWatcher) startWatching(
ctx context.Context,
c *kubernetes.Clientset,
pool gat.Pool,
) error {
watcher, err := c.CoreV1().Pods(p.Namespace).Watch(ctx, p.ListOptions)
if err != nil {
return err
}
defer watcher.Stop()
for event := range watcher.ResultChan() {
pod, ok := event.Object.(*v1.Pod)
if !ok {
continue
}
podName := pod.Name
podIp := pod.Status.PodIP
podReady := isPodReady(pod)
shouldDelete := false
shouldCreate := false
// Log raw event stream to debug log
switch event.Type {
case "ADDED":
log.Printf("ADDED pod %s with ip %s. Ready = %v", podName, podIp, podReady)
if podReady {
shouldCreate = true
} else {
shouldDelete = true
}
case "MODIFIED":
log.Printf("MODIFIED pod %s with ip %s. Ready = %v", podName, podIp, podReady)
if podReady {
shouldCreate = true
} else {
shouldDelete = true
}
case "DELETED":
log.Printf("DELETED pod %s with ip %s. Ready = %v", podName, podIp, podReady)
shouldDelete = true
}
if shouldDelete {
pool.RemoveRecipe(podName)
delete(p.pods, podName)
} else if shouldCreate {
r := p.BaseRecipe
r.Address = fmt.Sprintf(r.Address, pod.Status.PodIP)
pool.AddRecipe(podName, r)
}
}
return nil
}
func isPodReady(pod *v1.Pod) bool {
for _, condition := range pod.Status.Conditions {
if condition.Type == v1.PodReady && condition.Status == v1.ConditionTrue {
return true
}
}
return false
}
...@@ -3,9 +3,53 @@ module pggat2 ...@@ -3,9 +3,53 @@ module pggat2
go 1.20 go 1.20
require ( require (
github.com/google/uuid v1.3.0 // indirect github.com/google/uuid v1.3.0
github.com/xdg-go/scram v1.1.2
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4
k8s.io/api v0.27.4
k8s.io/apimachinery v0.27.4
k8s.io/client-go v0.27.4
tuxpa.in/a/zlog v1.61.0
)
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/emicklei/go-restful/v3 v3.9.0 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-openapi/jsonpointer v0.19.6 // indirect
github.com/go-openapi/jsonreference v0.20.1 // indirect
github.com/go-openapi/swag v0.22.3 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/gnostic v0.5.7-v3refs // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/gofuzz v1.1.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-colorable v0.1.12 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/rs/zerolog v1.28.0 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.2 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect github.com/xdg-go/stringprep v1.0.4 // indirect
golang.org/x/text v0.3.8 // indirect golang.org/x/net v0.8.0 // indirect
golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/term v0.6.0 // indirect
golang.org/x/text v0.8.0 // indirect
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/klog/v2 v2.90.1 // indirect
k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f // indirect
k8s.io/utils v0.0.0-20230209194617-a36077c30491 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
) )
This diff is collapsed.
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment