diff --git a/Makefile b/Makefile index 99ea42ccac920d019012c59573a5634ecae8e971..bf0459ca3ffad0a2cbff6de6a667fb900deb7716 100644 --- a/Makefile +++ b/Makefile @@ -3,13 +3,9 @@ all: runotel -devenv: - export GFX_CORE_ALLOCATION=0 - -otelenv: - export OTEL_RESOURCE_ATTRIBUTES=deployment.environment=local,service.version=0.1.0,service.instance.id=$(HOSTNAME) - export OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=http://localhost:4318/v1/traces - -runotel: devenv otelenv +runotel: export GFX_CORE_ALLOCATION=0 +runotel: export OTEL_RESOURCE_ATTRIBUTES=deployment.environment=local,service.version=0.1.0,service.instance.id=$(HOSTNAME) +runotel: export OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=http://localhost:4318/v1/traces +runotel: go run ./cmd/pggat run pool basic transaction diff --git a/cmd/pggat/main.go b/cmd/pggat/main.go index 640985955af3a108b37b7c3e1e0cce50104c85db..64a67e5a626da9299eaa6c048ba7845be7167738 100644 --- a/cmd/pggat/main.go +++ b/cmd/pggat/main.go @@ -3,10 +3,9 @@ package main import ( "context" caddycmd "gfx.cafe/gfx/pggat/cmd" - "gfx.cafe/util/go/gotel" - _ "gfx.cafe/gfx/pggat/lib/gat/gatcaddyfile" _ "gfx.cafe/gfx/pggat/lib/gat/standard" + "gfx.cafe/util/go/gotel" ) func main() { diff --git a/lib/fed/conn.go b/lib/fed/conn.go index 81efee66ed1c29f7759beb2f4641ea3882946395..adabdf2dcae26dd1fcb597ef118a114bdf989961 100644 --- a/lib/fed/conn.go +++ b/lib/fed/conn.go @@ -19,7 +19,6 @@ type Conn struct { noCopy decorator.NoCopy codec PacketCodec - Ctx context.Context Middleware []Middleware @@ -34,9 +33,8 @@ type Conn struct { Ready bool } -func NewConn(ctx context.Context, codec PacketCodec) *Conn { +func NewConn(codec PacketCodec) *Conn { c := &Conn{ - Ctx: ctx, codec: codec, } return c @@ -60,7 +58,7 @@ func (T *Conn) ReadPacket(typed bool) (Packet, error) { for i := 0; i < len(T.Middleware); i++ { middleware := T.Middleware[i] for { - packet, err := middleware.PreRead(T.Ctx, typed) + packet, err := middleware.PreRead(context.Background(), typed) if err != nil { return nil, err } @@ -70,7 +68,7 @@ func (T *Conn) ReadPacket(typed bool) (Packet, error) { } for j := i; j < len(T.Middleware); j++ { - packet, err = T.Middleware[j].ReadPacket(T.Ctx, packet) + packet, err = T.Middleware[j].ReadPacket(context.Background(), packet) if err != nil { return nil, err } @@ -90,7 +88,7 @@ func (T *Conn) ReadPacket(typed bool) (Packet, error) { return nil, err } for _, middleware := range T.Middleware { - packet, err = middleware.ReadPacket(T.Ctx, packet) + packet, err = middleware.ReadPacket(context.Background(), packet) if err != nil { return nil, err } @@ -113,7 +111,7 @@ func (T *Conn) WritePacket(packet Packet) error { middleware := T.Middleware[i] var err error - packet, err = middleware.WritePacket(T.Ctx, packet) + packet, err = middleware.WritePacket(context.Background(), packet) if err != nil { return err } @@ -133,7 +131,7 @@ func (T *Conn) WritePacket(packet Packet) error { for { var err error - packet, err = middleware.PostWrite(T.Ctx) + packet, err = middleware.PostWrite(context.Background()) if err != nil { return err } @@ -143,7 +141,7 @@ func (T *Conn) WritePacket(packet Packet) error { } for j := i; j >= 0; j-- { - packet, err = T.Middleware[j].WritePacket(T.Ctx, packet) + packet, err = T.Middleware[j].WritePacket(context.Background(), packet) if err != nil { return err } diff --git a/lib/fed/listeners/netconnlistener/listener.go b/lib/fed/listeners/netconnlistener/listener.go index 6ce55f35544374da035e0e1800924216db07bf69..0d6c0c55759bf2b482d96d8b1431c7e2c85da80d 100644 --- a/lib/fed/listeners/netconnlistener/listener.go +++ b/lib/fed/listeners/netconnlistener/listener.go @@ -1,7 +1,6 @@ package netconnlistener import ( - "context" "net" "gfx.cafe/gfx/pggat/lib/fed" @@ -17,7 +16,7 @@ func (listener *Listener) Accept(fn func(*fed.Conn)) error { if err != nil { return err } - fedConn := fed.NewConn(context.Background(), netconncodec.NewCodec(raw)) + fedConn := fed.NewConn(netconncodec.NewCodec(raw)) go func() { fn(fedConn) }() diff --git a/lib/fed/middlewares/tracing/oteltracer.go b/lib/fed/middlewares/tracing/oteltrace.go similarity index 97% rename from lib/fed/middlewares/tracing/oteltracer.go rename to lib/fed/middlewares/tracing/oteltrace.go index ee63a6e83321e92905c68f49b65ca14c8df59768..ce86e5824a77d1468c46b51bdd4ae963ac30acfc 100644 --- a/lib/fed/middlewares/tracing/oteltracer.go +++ b/lib/fed/middlewares/tracing/oteltrace.go @@ -9,7 +9,6 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" - "log/slog" ) type queryState int @@ -95,7 +94,7 @@ func getStateName(state queryState) (str string) { } func (t *otelTrace) setState(ctx context.Context, state queryState) { - slog.Warn(fmt.Sprintf("State Change: %s => %s", getStateName(t.state), getStateName(state))) + // slog.Warn(fmt.Sprintf("State Change: %s => %s", getStateName(t.state), getStateName(state))) t.state = state } diff --git a/lib/fed/middlewares/tracing/pgtrace.go b/lib/fed/middlewares/tracing/pgtrace.go index 5731d3d8c5ec3d1ea31c3ff4afc4b648058ea737..0f000cbd1750c69f2b7f26e55eb51f0b711ca8a8 100644 --- a/lib/fed/middlewares/tracing/pgtrace.go +++ b/lib/fed/middlewares/tracing/pgtrace.go @@ -5,26 +5,26 @@ import ( "gfx.cafe/gfx/pggat/lib/fed" ) -type pgtrace struct{} +type packetTrace struct{} -func NewPgTrace(ctx context.Context) fed.Middleware { - return &pgtrace{} +func NewPacketTrace(ctx context.Context) fed.Middleware { + return &packetTrace{} } -func (t *pgtrace) ReadPacket(ctx context.Context, packet fed.Packet) (fed.Packet, error) { +func (t *packetTrace) ReadPacket(ctx context.Context, packet fed.Packet) (fed.Packet, error) { logPacket("Read ", packet) return packet, nil } -func (t *pgtrace) WritePacket(ctx context.Context, packet fed.Packet) (fed.Packet, error) { +func (t *packetTrace) WritePacket(ctx context.Context, packet fed.Packet) (fed.Packet, error) { logPacket("Write", packet) return packet, nil } -func (t *pgtrace) PreRead(ctx context.Context, _ bool) (fed.Packet, error) { +func (t *packetTrace) PreRead(ctx context.Context, _ bool) (fed.Packet, error) { return nil, nil } -func (t *pgtrace) PostWrite(ctx context.Context) (fed.Packet, error) { +func (t *packetTrace) PostWrite(ctx context.Context) (fed.Packet, error) { return nil, nil } diff --git a/lib/gat/gatcaddyfile/pool.go b/lib/gat/gatcaddyfile/pool.go index 4babf8ce34e71d46c2f5ec53a98c6c83cd8bb9ff..2baba1eb853034223e77114e61bfdc4d22e6d9cf 100644 --- a/lib/gat/gatcaddyfile/pool.go +++ b/lib/gat/gatcaddyfile/pool.go @@ -1,6 +1,7 @@ package gatcaddyfile import ( + "strconv" "time" "github.com/caddyserver/caddy/v2" @@ -115,6 +116,26 @@ func init() { } else { module.ExtendedQuerySync = true } + case "packet_tracing_option": + if d.NextArg() { + val, err := strconv.Atoi(d.Val()) + if err != nil { + return nil, err + } + module.PacketTracingOption = basic.TracingOption(val) + } else { + module.PacketTracingOption = basic.TracingOptionDisabled + } + case "otel_tracing_option": + if d.NextArg() { + val, err := strconv.Atoi(d.Val()) + if err != nil { + return nil, err + } + module.OtelTracingOption = basic.TracingOption(val) + } else { + module.OtelTracingOption = basic.TracingOptionDisabled + } case "reset_query": if !d.NextArg() { return nil, d.ArgErr() diff --git a/lib/gat/handlers/pool/dialer.go b/lib/gat/handlers/pool/dialer.go index 49ec224a8a8a332773f69863153b497b49c13e27..1da5d9fea194640190cd085290e8c104ce083623 100644 --- a/lib/gat/handlers/pool/dialer.go +++ b/lib/gat/handlers/pool/dialer.go @@ -1,7 +1,6 @@ package pool import ( - "context" "crypto/tls" "encoding/json" "fmt" @@ -67,7 +66,7 @@ func (T *Dialer) Dial() (*fed.Conn, error) { if err != nil { return nil, err } - conn := fed.NewConn(context.Background(), netconncodec.NewCodec(c)) + conn := fed.NewConn(netconncodec.NewCodec(c)) conn.User = T.Username conn.Database = T.Database err = backends.Accept( @@ -91,7 +90,7 @@ func (T *Dialer) Cancel(key fed.BackendKey) { if err != nil { return } - conn := fed.NewConn(context.Background(), netconncodec.NewCodec(c)) + conn := fed.NewConn(netconncodec.NewCodec(c)) defer func() { _ = conn.Close() }() diff --git a/lib/gat/handlers/pool/pools/basic/config.go b/lib/gat/handlers/pool/pools/basic/config.go index 6eb560721bec93c72533494505a431f2f9b3aaea..b968bd6d697748fd1e35b3b12759ed51ffe0b5e4 100644 --- a/lib/gat/handlers/pool/pools/basic/config.go +++ b/lib/gat/handlers/pool/pools/basic/config.go @@ -28,6 +28,20 @@ const ( ParameterStatusSyncDynamic = "dynamic" ) +type TracingOption int + +const ( + // TracingOptionDisabled indicates tracing is disabled + TracingOptionDisabled TracingOption = iota + // TracingOptionClient indicates tracing is enabled for client connections + TracingOptionClient = 1 << (iota - 1) + // TracingOptionServer indicates tracing is enabled for server connections + TracingOptionServer = 1 << (iota - 1) + // TracingOptionClientAndServer indicates tracing is enabled for both + // client and server connections + TracingOptionClientAndServer = TracingOptionClient | TracingOptionServer +) + type Config struct { RawPoolerFactory json.RawMessage `json:"pooler" caddy:"namespace=pggat.handlers.pool.poolers inline_key=pooler"` @@ -46,6 +60,14 @@ type Config struct { // Use true for transaction pooling ExtendedQuerySync bool `json:"extended_query_sync,omitempty"` + // PacketTracingOption enables/disables packet debug tracing for client and/or + // server connections + PacketTracingOption TracingOption `json:"packet_tracing_option,omitempty"` + + // OtelTracingOption enables/disables Open Telemetry tracing for client and/or + // server connections + OtelTracingOption TracingOption `json:"otel_tracing_option,omitempty"` + ServerResetQuery string `json:"server_reset_query,omitempty"` // ClientAcquireTimeout defines how long a client may be in AWAITING_SERVER state before it is disconnected @@ -76,6 +98,8 @@ func (T Config) Spool() spool.Config { PoolerFactory: T.PoolerFactory, UsePS: T.ParameterStatusSync == ParameterStatusSyncDynamic, UseEQP: T.ExtendedQuerySync, + UseOtelTracing: (T.OtelTracingOption & TracingOptionServer) != 0, + UsePacketTracing: (T.PacketTracingOption & TracingOptionServer) != 0, ResetQuery: T.ServerResetQuery, AcquireTimeout: time.Duration(T.ClientAcquireTimeout), IdleTimeout: time.Duration(T.ServerIdleTimeout), @@ -100,6 +124,8 @@ var Session = Config{ ParameterStatusSync: ParameterStatusSyncInitial, ExtendedQuerySync: false, ServerResetQuery: "DISCARD ALL", + OtelTracingOption: TracingOptionClient, + PacketTracingOption: TracingOptionDisabled, } var Transaction = Config{ @@ -113,4 +139,6 @@ var Transaction = Config{ ReleaseAfterTransaction: true, ParameterStatusSync: ParameterStatusSyncDynamic, ExtendedQuerySync: true, + OtelTracingOption: TracingOptionClient, + PacketTracingOption: TracingOptionDisabled, } diff --git a/lib/gat/handlers/pool/pools/basic/pool.go b/lib/gat/handlers/pool/pools/basic/pool.go index cbdb1854525d2bc03a83f2c4dc8e114980dfb49b..072640cd7504cecfc7fac55aa1307cf61a34055c 100644 --- a/lib/gat/handlers/pool/pools/basic/pool.go +++ b/lib/gat/handlers/pool/pools/basic/pool.go @@ -1,6 +1,7 @@ package basic import ( + "context" "fmt" "gfx.cafe/gfx/pggat/lib/fed/middlewares/tracing" "sync" @@ -157,10 +158,17 @@ func (T *Pool) removeClient(client *Client) { } func (T *Pool) Serve(conn *fed.Conn) error { - conn.Middleware = append( - conn.Middleware, - tracing.NewPgTrace(conn.Ctx), - tracing.NewOtelTrace(conn.Ctx)) + if (T.config.PacketTracingOption & TracingOptionClient) != 0 { + conn.Middleware = append( + conn.Middleware, + tracing.NewPacketTrace(context.Background())) + } + + if (T.config.OtelTracingOption & TracingOptionClient) != 0 { + conn.Middleware = append( + conn.Middleware, + tracing.NewOtelTrace(context.Background())) + } if T.config.ParameterStatusSync == ParameterStatusSyncDynamic { conn.Middleware = append( diff --git a/lib/gat/handlers/pool/spool/config.go b/lib/gat/handlers/pool/spool/config.go index ec58e1c6da3a6bc2a758ad4ba4187469e26c2502..f707f5f75b3356101c727d5779581ae5e1834494 100644 --- a/lib/gat/handlers/pool/spool/config.go +++ b/lib/gat/handlers/pool/spool/config.go @@ -16,6 +16,9 @@ type Config struct { // UseEQP controls whether to add the eqp middleware to servers UseEQP bool + UseOtelTracing bool + UsePacketTracing bool + ResetQuery string AcquireTimeout time.Duration diff --git a/lib/gat/handlers/pool/spool/pool.go b/lib/gat/handlers/pool/spool/pool.go index 7589ae16ea073585b4a480993c909d517d9d82f6..6d1157d49d281ed40bc05d802afb2c7095226ca2 100644 --- a/lib/gat/handlers/pool/spool/pool.go +++ b/lib/gat/handlers/pool/spool/pool.go @@ -1,6 +1,8 @@ package spool import ( + "context" + "gfx.cafe/gfx/pggat/lib/fed/middlewares/tracing" "sync" "time" @@ -53,6 +55,18 @@ func NewPool(config Config) *Pool { } func (T *Pool) addServer(conn *fed.Conn) { + if T.config.UsePacketTracing { + conn.Middleware = append( + conn.Middleware, + tracing.NewPacketTrace(context.Background())) + } + + if T.config.UseOtelTracing { + conn.Middleware = append( + conn.Middleware, + tracing.NewOtelTrace(context.Background())) + } + if T.config.UsePS { conn.Middleware = append( conn.Middleware, diff --git a/lib/gsql/pair.go b/lib/gsql/pair.go index 857f8fe13d3411efcfee2691ba8bab21788424ee..2fcaff95d81d73f27868478ff58ab7c557e7aacb 100644 --- a/lib/gsql/pair.go +++ b/lib/gsql/pair.go @@ -1,7 +1,6 @@ package gsql import ( - "context" "net" "gfx.cafe/gfx/pggat/lib/fed" @@ -14,14 +13,12 @@ func NewPair() (*fed.Conn, *fed.Conn, net.Conn, net.Conn) { in := mio.InwardConn{Conn: conn} out := mio.OutwardConn{Conn: conn} inward := fed.NewConn( - context.Background(), netconncodec.NewCodec( in, ), ) inward.Ready = true outward := fed.NewConn( - context.Background(), netconncodec.NewCodec( out, ),