diff --git a/lib/gat/server.go b/lib/gat/server.go index f9b7662a4af6a114e870624ae4aa212035d57eea..584d6b54a6e61184577b250885727c7c2eecc652 100644 --- a/lib/gat/server.go +++ b/lib/gat/server.go @@ -5,6 +5,10 @@ import ( "crypto/tls" "errors" "fmt" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" "io" "net" @@ -30,19 +34,27 @@ type Server struct { routes []*Route cancellableHandlers []CancellableHandler metricsHandlers []MetricsHandler - - log *zap.Logger + tracer trace.Tracer + log *zap.Logger } -func (T *Server) Provision(ctx caddy.Context) error { - T.log = ctx.Logger() +func (T *Server) Provision(cdyctx caddy.Context) error { + T.log = cdyctx.Logger() + T.tracer = otel.Tracer("server", trace.WithInstrumentationAttributes( + attribute.String("component", "gfx.cafe/gfx/pggat/lib/gat/server.go"), + )) + + // note give Caddy, using the returned context in subsequent provision calls + // seems problematic + _, span := T.tracer.Start(cdyctx.Context, "provision", trace.WithSpanKind(trace.SpanKindServer)) + defer span.End() T.listen = make([]*Listener, 0, len(T.Listen)) for _, config := range T.Listen { listener := &Listener{ ListenerConfig: config, } - if err := listener.Provision(ctx); err != nil { + if err := listener.Provision(cdyctx); err != nil { return err } T.listen = append(T.listen, listener) @@ -53,7 +65,7 @@ func (T *Server) Provision(ctx caddy.Context) error { route := &Route{ RouteConfig: config, } - if err := route.Provision(ctx); err != nil { + if err := route.Provision(cdyctx); err != nil { return err } if cancellableHandler, ok := route.handle.(CancellableHandler); ok { @@ -69,6 +81,19 @@ func (T *Server) Provision(ctx caddy.Context) error { } func (T *Server) Start(ctx context.Context) error { + ctx, span := T.tracer.Start(ctx, "Start", trace.WithSpanKind(trace.SpanKindServer)) + defer span.End() + + err := T.start(ctx) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + + return err +} + +func (T *Server) start(_ context.Context) error { for _, listener := range T.listen { if err := listener.Start(); err != nil { return err @@ -76,7 +101,8 @@ func (T *Server) Start(ctx context.Context) error { go func(listener *Listener) { for { - if !T.acceptFrom(ctx, listener) { + // acceptFrom creates its own context + if !T.acceptFrom(listener) { break } } @@ -87,6 +113,19 @@ func (T *Server) Start(ctx context.Context) error { } func (T *Server) Stop(ctx context.Context) error { + ctx, span := T.tracer.Start(ctx, "Stop", trace.WithSpanKind(trace.SpanKindServer)) + defer span.End() + + err := T.stop(ctx) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + + return err +} + +func (T *Server) stop(_ context.Context) error { for _, listen := range T.listen { if err := listen.Stop(); err != nil { return err @@ -97,19 +136,26 @@ func (T *Server) Stop(ctx context.Context) error { } func (T *Server) Cancel(ctx context.Context, key fed.BackendKey) { + ctx, span := T.tracer.Start(ctx, "Cancel", trace.WithSpanKind(trace.SpanKindServer)) + defer span.End() + for _, cancellableHandler := range T.cancellableHandlers { cancellableHandler.Cancel(ctx, key) } } func (T *Server) ReadMetrics(ctx context.Context, m *metrics.Server) { + ctx, span := T.tracer.Start(ctx, "ReadMetrics", trace.WithSpanKind(trace.SpanKindServer)) + defer span.End() + for _, metricsHandler := range T.metricsHandlers { metricsHandler.ReadMetrics(ctx, &m.Handler) } } -func (T *Server) Serve(conn *fed.Conn) { - ctx := context.Background() +func (T *Server) Serve(ctx context.Context, conn *fed.Conn) { + ctx, span := T.tracer.Start(ctx, "Serve", trace.WithSpanKind(trace.SpanKindServer)) + defer span.End() composed := Router(RouterFunc(func(ctx context.Context, conn *fed.Conn) error { // database not found @@ -143,6 +189,10 @@ func (T *Server) Serve(conn *fed.Conn) { errResp := perror.ToPacket(perror.Wrap(err)) _ = conn.WritePacket(ctx, errResp) + + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return } } @@ -194,14 +244,20 @@ func (T *Server) accept(ctx context.Context, listener *Listener, conn *fed.Conn) return } prom.Listener.Accepted(labels).Inc() - T.Serve(conn) + T.Serve(ctx, conn) } -func (T *Server) acceptFrom(ctx context.Context, listener *Listener) bool { +func (T *Server) acceptFrom(listener *Listener) bool { + ctx, span := T.tracer.Start(context.Background(), "acceptFrom", trace.WithSpanKind(trace.SpanKindServer)) + defer span.End() + err := listener.listener.Accept(func(c *fed.Conn) { T.accept(ctx, listener, c) }) if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + if errors.Is(err, net.ErrClosed) { return false }