good morning!!!!

Skip to content
Snippets Groups Projects
Commit 4ef72066 authored by Garet Halliday's avatar Garet Halliday
Browse files

improve discovery for zalando operator discovery

parent 20ea084f
Branches
Tags
No related merge requests found
package discovery
// Discoverer looks up and returns the servers. It must implement either Clusters or Added, Updated, and Removed.
// Both can be implemented for extra robustness.
// Discoverer looks up and returns the servers. It must implement Clusters. Optionally, it can implement Added
// and Removed for faster updating. For updates, just send to Added.
type Discoverer interface {
Clusters() ([]Cluster, error)
Added() <-chan Cluster
Updated() <-chan Cluster
Removed() <-chan string
}
......@@ -100,10 +100,6 @@ func (T *Discoverer) Added() <-chan discovery.Cluster {
return nil
}
func (T *Discoverer) Updated() <-chan discovery.Cluster {
return nil
}
func (T *Discoverer) Removed() <-chan string {
return nil
}
......
......@@ -208,10 +208,6 @@ func (T *Discoverer) Added() <-chan discovery.Cluster {
return nil
}
func (T *Discoverer) Updated() <-chan discovery.Cluster {
return nil
}
func (T *Discoverer) Removed() <-chan string {
return nil
}
......
......@@ -36,7 +36,6 @@ type Discoverer struct {
k8s k8sutil.KubernetesClient
added chan discovery.Cluster
updated chan discovery.Cluster
removed chan string
done chan struct{}
......@@ -107,7 +106,6 @@ func (T *Discoverer) Provision(ctx caddy.Context) error {
)
T.added = make(chan discovery.Cluster)
T.updated = make(chan discovery.Cluster)
T.removed = make(chan string)
_, err = T.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
......@@ -131,7 +129,7 @@ func (T *Discoverer) Provision(ctx caddy.Context) error {
if err != nil {
return
}
T.updated <- cluster
T.added <- cluster
},
DeleteFunc: func(obj interface{}) {
psql, ok := obj.(*acidv1.Postgresql)
......@@ -207,15 +205,25 @@ func (T *Discoverer) postgresqlToCluster(cluster acidv1.Postgresql) (discovery.C
}
func (T *Discoverer) Clusters() ([]discovery.Cluster, error) {
return nil, nil
clusters, err := T.k8s.Postgresqls(T.Namespace).List(context.Background(), metav1.ListOptions{})
if err != nil {
return nil, err
}
func (T *Discoverer) Added() <-chan discovery.Cluster {
return T.added
res := make([]discovery.Cluster, 0, len(clusters.Items))
for _, cluster := range clusters.Items {
r, err := T.postgresqlToCluster(cluster)
if err != nil {
return nil, err
}
res = append(res, r)
}
return res, nil
}
func (T *Discoverer) Updated() <-chan discovery.Cluster {
return T.updated
func (T *Discoverer) Added() <-chan discovery.Cluster {
return T.added
}
func (T *Discoverer) Removed() <-chan string {
......
......@@ -125,6 +125,10 @@ func (T *Module) creds(user User) (primary, replica auth.Credentials) {
}
func (T *Module) added(cluster Cluster) {
if prev, ok := T.clusters[cluster.ID]; ok {
T.updated(prev, cluster)
return
}
if T.clusters == nil {
T.clusters = make(map[string]Cluster)
}
......@@ -475,8 +479,6 @@ func (T *Module) discoverLoop() {
T.added(cluster)
case id := <-T.discoverer.Removed():
T.removed(id)
case next := <-T.discoverer.Updated():
T.updated(T.clusters[next.ID], next)
case <-reconcile:
err := T.reconcile()
if err != nil {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment