good morning!!!!

Skip to content
Commits on Source (4)
......@@ -3,38 +3,30 @@ package fxriver
import (
"context"
"fmt"
"strings"
"gfx.cafe/util/go/gotel"
"github.com/riverqueue/river"
"github.com/riverqueue/river/rivertype"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"strings"
"time"
)
type TraceWorker[T river.JobArgs] struct {
worker river.Worker[T]
func NewTraceMiddleware() rivertype.WorkerMiddleware {
return &traceWorkerMiddleware{}
}
func NewTraceWorker[T river.JobArgs](worker river.Worker[T]) river.Worker[T] {
o := TraceWorker[T]{
worker: worker,
}
return o
type traceWorkerMiddleware struct {
// embed JobInsertMiddlewareDefaults for forward compatibility
// in case additional methods are added to the interface:
river.WorkerMiddlewareDefaults
}
func (o TraceWorker[T]) NextRetry(job *river.Job[T]) time.Time {
return o.worker.NextRetry(job)
}
func (o TraceWorker[T]) Timeout(job *river.Job[T]) time.Duration {
return o.worker.Timeout(job)
}
func (o TraceWorker[T]) Work(ctx context.Context, job *river.Job[T]) (err error) {
taskName := job.Args.Kind()
func (m *traceWorkerMiddleware) Work(ctx context.Context, job *rivertype.JobRow, doInner func(ctx context.Context) error) error {
// Extract the trace ID from the job metadata and log it.
taskName := job.Kind
pkg, _, _ := strings.Cut(taskName, ".")
tracer := otel.Tracer(pkg)
......@@ -44,8 +36,7 @@ func (o TraceWorker[T]) Work(ctx context.Context, job *river.Job[T]) (err error)
span.SetAttributes(gotel.MakeKeyValue("p.args", job))
err = o.worker.Work(ctx, job)
err := doInner(ctx)
if err != nil {
span.SetStatus(codes.Error, err.Error())
span.SetAttributes(
......@@ -56,6 +47,5 @@ func (o TraceWorker[T]) Work(ctx context.Context, job *river.Job[T]) (err error)
} else {
span.SetStatus(codes.Ok, "completed")
}
return
return err
}
......@@ -8,7 +8,7 @@ require (
anime.bike/hrd v0.0.0-20240702041851-931ec40bba56
github.com/c2h5oh/datasize v0.0.0-20231215233829-aa82cc1e6500
github.com/cloudevents/sdk-go/v2 v2.15.2
github.com/cristalhq/aconfig v0.18.6-0.20231226125657-6e7f9a54f85d
github.com/cristalhq/aconfig v0.18.6
github.com/fatih/structtag v1.2.0
github.com/georgysavva/scany/v2 v2.1.3
github.com/go-chi/chi/v5 v5.1.0
......@@ -17,17 +17,18 @@ require (
github.com/lmittmann/tint v1.0.5
github.com/modern-go/reflect2 v1.0.2
github.com/prometheus/client_golang v1.20.4
github.com/riverqueue/river v0.12.1
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.12.1
github.com/riverqueue/river/rivertype v0.12.1
github.com/riverqueue/river v0.13.0
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.13.0
github.com/riverqueue/river/rivertype v0.13.0
github.com/stretchr/testify v1.9.0
github.com/tidwall/sjson v1.2.5
go.opentelemetry.io/otel v1.30.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.30.0
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.30.0
go.opentelemetry.io/otel/sdk v1.30.0
go.opentelemetry.io/otel/trace v1.30.0
go.uber.org/fx v1.22.2
golang.org/x/exp v0.0.0-20240909161429-701f63a606c0
golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c
golang.org/x/sync v0.8.0
sigs.k8s.io/yaml v1.4.0
)
......@@ -53,12 +54,11 @@ require (
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.59.1 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/riverqueue/river/riverdriver v0.12.1 // indirect
github.com/riverqueue/river/rivershared v0.12.1 // indirect
github.com/tidwall/gjson v1.17.3 // indirect
github.com/riverqueue/river/riverdriver v0.13.0 // indirect
github.com/riverqueue/river/rivershared v0.13.0 // indirect
github.com/tidwall/gjson v1.18.0 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.1 // indirect
github.com/tidwall/sjson v1.2.5 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.30.0 // indirect
go.opentelemetry.io/otel/metric v1.30.0 // indirect
go.opentelemetry.io/proto/otlp v1.3.1 // indirect
......@@ -69,7 +69,7 @@ require (
golang.org/x/crypto v0.27.0 // indirect
golang.org/x/net v0.29.0 // indirect
golang.org/x/sys v0.25.0 // indirect
golang.org/x/text v0.18.0 // indirect
golang.org/x/text v0.19.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240930140551-af27646dc61f // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240930140551-af27646dc61f // indirect
google.golang.org/grpc v1.67.0 // indirect
......
......@@ -14,6 +14,8 @@ github.com/cockroachdb/cockroach-go/v2 v2.2.0 h1:/5znzg5n373N/3ESjHF5SMLxiW4RKB0
github.com/cockroachdb/cockroach-go/v2 v2.2.0/go.mod h1:u3MiKYGupPPjkn3ozknpMUpxPaNLTFWAya419/zv6eI=
github.com/cristalhq/aconfig v0.18.6-0.20231226125657-6e7f9a54f85d h1:Ywr0kz8Pf8uxAVedQ9iuFxtQwwSxozru2QCfAjxIdWo=
github.com/cristalhq/aconfig v0.18.6-0.20231226125657-6e7f9a54f85d/go.mod h1:9ogrGEt9yU5V4pif/ThkVUfhj8JkdV+iDeahZGgfnDU=
github.com/cristalhq/aconfig v0.18.6 h1:8KRBznzdjUUiaa7HeIpYbMx1uPE1/xOBEU1ajsnmNME=
github.com/cristalhq/aconfig v0.18.6/go.mod h1:9ogrGEt9yU5V4pif/ThkVUfhj8JkdV+iDeahZGgfnDU=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
......@@ -85,18 +87,18 @@ github.com/prometheus/common v0.59.1 h1:LXb1quJHWm1P6wq/U824uxYi4Sg0oGvNeUm1z5dJ
github.com/prometheus/common v0.59.1/go.mod h1:GpWM7dewqmVYcd7SmRaiWVe9SSqjf0UrwnYnpEZNuT0=
github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc=
github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
github.com/riverqueue/river v0.12.1 h1:TS3FVioPStlvb7yj1dYgtgX+zn/3JkLdPn+S6qNBcZ0=
github.com/riverqueue/river v0.12.1/go.mod h1:j7O42JlHo76YgXkAFX66E63Ke890/oSUUlui/ImyLuU=
github.com/riverqueue/river/riverdriver v0.12.1 h1:MqAh6mw9h/m/nNXImJTXXtCefTDPdmZSlgsUdSYUHe0=
github.com/riverqueue/river/riverdriver v0.12.1/go.mod h1:E4hf4wPidG0xYrwsez4R9u4LvLdjlDu9m4iJFpb1DfQ=
github.com/riverqueue/river/riverdriver/riverdatabasesql v0.12.1 h1:E2pYemeaaiqOqr1x1Cq872IdulGu5z/iIHChqxPJwfA=
github.com/riverqueue/river/riverdriver/riverdatabasesql v0.12.1/go.mod h1:+5DVUCfdPS3ZtsRm4V0GzQfXJI9MsFvq3BNqW/Nei3E=
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.12.1 h1:stodaBk+GKMU4Uwoj2tShG5L/EK/E5gWOQwZhsJ65QY=
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.12.1/go.mod h1:kwV0SdmvBYOj3hsI4sn3tQQQ5NqXrq68yDvKb1Jms1E=
github.com/riverqueue/river/rivershared v0.12.1 h1:7y03CM6iYrSoT1k6ylneTIoK74qQ27yi1aoT3dozU6Y=
github.com/riverqueue/river/rivershared v0.12.1/go.mod h1:IpJ63Jz/Rx61nKhJ45K9IdJR0VEHf3qnFlEPI9l11HM=
github.com/riverqueue/river/rivertype v0.12.1 h1:iTciVhZ/yQQQBMAouivPrSlrQH8MEK5uCVtzu3eITu8=
github.com/riverqueue/river/rivertype v0.12.1/go.mod h1:3WRQEDlLKZky/vGwFcZC3uKjC+/8izE6ucHwCsuir98=
github.com/riverqueue/river v0.13.0 h1:BvEJfXAnHJ7HwraoPZWiD271t2jDVvX1SPCtvLzojiA=
github.com/riverqueue/river v0.13.0/go.mod h1:SOG+j28RQpKDsTA8AlfxjFdYpoPm+MSOio+Ev4ljN2U=
github.com/riverqueue/river/riverdriver v0.13.0 h1:UVzMtNfp3R+Ehr/yaRqgF58YOFEWGVqIAamCeK7RMkA=
github.com/riverqueue/river/riverdriver v0.13.0/go.mod h1:pxmx6qmGl+dNCrfa+xuktg8zrrZO3AEqlUFlFWOy8U4=
github.com/riverqueue/river/riverdriver/riverdatabasesql v0.13.0 h1:xiiwQVFUoPv/7PQIsEIerpw2ux1lZ14oZScgiB4JHdE=
github.com/riverqueue/river/riverdriver/riverdatabasesql v0.13.0/go.mod h1:f7TWWD965tE6v96qi1Y40IP2shsAai0qJBHbqT7yFLM=
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.13.0 h1:wjLgea/eI5rIMh0+TCjS+/+dsULIst3Wu8bZQo2DHno=
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.13.0/go.mod h1:Vzt3E33kNks2vN9lTgLJL8VFrbcAWDbwzyZLo02FlBk=
github.com/riverqueue/river/rivershared v0.13.0 h1:AqRP54GgtwoLIvV5eoZmOGOCZXL8Ce5Zm8s60R8NKOA=
github.com/riverqueue/river/rivershared v0.13.0/go.mod h1:vzvawQpDy2Z1U5chkvh1NykzWNkRhc9RLcURsJRhlbE=
github.com/riverqueue/river/rivertype v0.13.0 h1:PkT3h9tP0ZV3h0EGy2MiwEhgZqpRMN4fXfj27UKc9Q0=
github.com/riverqueue/river/rivertype v0.13.0/go.mod h1:wVOhGBeay6+JcIi0pTFlF4KtUgHYFkhMYv8dpxU46W0=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
......@@ -109,8 +111,8 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/tidwall/gjson v1.14.2/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/gjson v1.17.3 h1:bwWLZU7icoKRG+C+0PNwIKC6FCJO/Q3p2pZvuP0jN94=
github.com/tidwall/gjson v1.17.3/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY=
github.com/tidwall/gjson v1.18.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
......@@ -148,14 +150,16 @@ golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A=
golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70=
golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 h1:e66Fs6Z+fZTbFBAxKfP3PALWBtpfqks2bwGcexMxgtk=
golang.org/x/exp v0.0.0-20240909161429-701f63a606c0/go.mod h1:2TbTHSBQa924w8M6Xs1QcRcFwyucIwBGpK1p2f1YFFY=
golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c h1:7dEasQXItcW1xKJ2+gg5VOiBnqWrJc+rq0DPKyvvdbY=
golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c/go.mod h1:NQtJDoLvd6faHhE7m4T/1IY708gDefGGjR/iUW8yQQ8=
golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo=
golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34=
golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224=
golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM=
golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
google.golang.org/genproto/googleapis/api v0.0.0-20240930140551-af27646dc61f h1:jTm13A2itBi3La6yTGqn8bVSrc3ZZ1r8ENHlIXBfnRA=
google.golang.org/genproto/googleapis/api v0.0.0-20240930140551-af27646dc61f/go.mod h1:CLGoBuH1VHxAUXVPP8FfPwPEVJB6lz3URE5mY2SuayE=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240930140551-af27646dc61f h1:cUMEy+8oS78BWIH9OWazBkzbr090Od9tWBNtZHkOhf0=
......