good morning!!!!
Skip to content
GitLab
Explore
Sign in
Commits on Source (4)
noot
· 6480afcb
a
authored
Oct 10, 2024
6480afcb
feat: add trace middleware
· 5b2fa130
a
authored
Oct 10, 2024
5b2fa130
fix: delete traceWorker
· a41fa6bb
a
authored
Oct 10, 2024
a41fa6bb
fix: update deps
· f3186500
a
authored
Oct 10, 2024
f3186500
Hide whitespace changes
Inline
Side-by-side
fxriver/trace
Worker
.go
→
fxriver/trace
Middleware
.go
View file @
f3186500
...
...
@@ -3,38 +3,30 @@ package fxriver
import
(
"context"
"fmt"
"strings"
"gfx.cafe/util/go/gotel"
"github.com/riverqueue/river"
"github.com/riverqueue/river/rivertype"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"strings"
"time"
)
type
TraceWorker
[
T
river
.
JobArgs
]
struct
{
worker
river
.
Worker
[
T
]
func
NewTraceMiddleware
()
rivertype
.
WorkerMiddleware
{
return
&
traceWorkerMiddleware
{}
}
func
NewTraceWorker
[
T
river
.
JobArgs
](
worker
river
.
Worker
[
T
])
river
.
Worker
[
T
]
{
o
:=
TraceWorker
[
T
]{
worker
:
worker
,
}
return
o
type
traceWorkerMiddleware
struct
{
// embed JobInsertMiddlewareDefaults for forward compatibility
// in case additional methods are added to the interface:
river
.
WorkerMiddlewareDefaults
}
func
(
o
TraceWorker
[
T
])
NextRetry
(
job
*
river
.
Job
[
T
])
time
.
Time
{
return
o
.
worker
.
NextRetry
(
job
)
}
func
(
o
TraceWorker
[
T
])
Timeout
(
job
*
river
.
Job
[
T
])
time
.
Duration
{
return
o
.
worker
.
Timeout
(
job
)
}
func
(
o
TraceWorker
[
T
])
Work
(
ctx
context
.
Context
,
job
*
river
.
Job
[
T
])
(
err
error
)
{
taskName
:=
job
.
Args
.
Kind
()
func
(
m
*
traceWorkerMiddleware
)
Work
(
ctx
context
.
Context
,
job
*
rivertype
.
JobRow
,
doInner
func
(
ctx
context
.
Context
)
error
)
error
{
// Extract the trace ID from the job metadata and log it.
taskName
:=
job
.
Kind
pkg
,
_
,
_
:=
strings
.
Cut
(
taskName
,
"."
)
tracer
:=
otel
.
Tracer
(
pkg
)
...
...
@@ -44,8 +36,7 @@ func (o TraceWorker[T]) Work(ctx context.Context, job *river.Job[T]) (err error)
span
.
SetAttributes
(
gotel
.
MakeKeyValue
(
"p.args"
,
job
))
err
=
o
.
worker
.
Work
(
ctx
,
job
)
err
:=
doInner
(
ctx
)
if
err
!=
nil
{
span
.
SetStatus
(
codes
.
Error
,
err
.
Error
())
span
.
SetAttributes
(
...
...
@@ -56,6 +47,5 @@ func (o TraceWorker[T]) Work(ctx context.Context, job *river.Job[T]) (err error)
}
else
{
span
.
SetStatus
(
codes
.
Ok
,
"completed"
)
}
return
return
err
}
go.mod
View file @
f3186500
...
...
@@ -8,7 +8,7 @@ require (
anime.bike/hrd
v0.0.0-20240702041851-931ec40bba56
github.com/c2h5oh/datasize
v0.0.0-20231215233829-aa82cc1e6500
github.com/cloudevents/sdk-go/v2
v2.15.2
github.com/cristalhq/aconfig
v0.18.6
-0.20231226125657-6e7f9a54f85d
github.com/cristalhq/aconfig
v0.18.6
github.com/fatih/structtag
v1.2.0
github.com/georgysavva/scany/v2
v2.1.3
github.com/go-chi/chi/v5
v5.1.0
...
...
@@ -17,17 +17,18 @@ require (
github.com/lmittmann/tint
v1.0.5
github.com/modern-go/reflect2
v1.0.2
github.com/prometheus/client_golang
v1.20.4
github.com/riverqueue/river
v0.1
2.1
github.com/riverqueue/river/riverdriver/riverpgxv5
v0.1
2.1
github.com/riverqueue/river/rivertype
v0.1
2.1
github.com/riverqueue/river
v0.1
3.0
github.com/riverqueue/river/riverdriver/riverpgxv5
v0.1
3.0
github.com/riverqueue/river/rivertype
v0.1
3.0
github.com/stretchr/testify
v1.9.0
github.com/tidwall/sjson
v1.2.5
go.opentelemetry.io/otel
v1.30.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp
v1.30.0
go.opentelemetry.io/otel/exporters/stdout/stdouttrace
v1.30.0
go.opentelemetry.io/otel/sdk
v1.30.0
go.opentelemetry.io/otel/trace
v1.30.0
go.uber.org/fx
v1.22.2
golang.org/x/exp
v0.0.0-20240
9
091
61429-701f63a606c0
golang.org/x/exp
v0.0.0-2024
1
0091
80824-f66d83c29e7c
golang.org/x/sync
v0.8.0
sigs.k8s.io/yaml
v1.4.0
)
...
...
@@ -53,12 +54,11 @@ require (
github.com/prometheus/client_model
v0.6.1 // indirect
github.com/prometheus/common
v0.59.1 // indirect
github.com/prometheus/procfs
v0.15.1 // indirect
github.com/riverqueue/river/riverdriver
v0.1
2.1
// indirect
github.com/riverqueue/river/rivershared
v0.1
2.1
// indirect
github.com/tidwall/gjson
v1.1
7.3
// indirect
github.com/riverqueue/river/riverdriver
v0.1
3.0
// indirect
github.com/riverqueue/river/rivershared
v0.1
3.0
// indirect
github.com/tidwall/gjson
v1.1
8.0
// indirect
github.com/tidwall/match
v1.1.1 // indirect
github.com/tidwall/pretty
v1.2.1 // indirect
github.com/tidwall/sjson
v1.2.5 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace
v1.30.0 // indirect
go.opentelemetry.io/otel/metric
v1.30.0 // indirect
go.opentelemetry.io/proto/otlp
v1.3.1 // indirect
...
...
@@ -69,7 +69,7 @@ require (
golang.org/x/crypto
v0.27.0 // indirect
golang.org/x/net
v0.29.0 // indirect
golang.org/x/sys
v0.25.0 // indirect
golang.org/x/text
v0.1
8
.0 // indirect
golang.org/x/text
v0.1
9
.0 // indirect
google.golang.org/genproto/googleapis/api
v0.0.0-20240930140551-af27646dc61f // indirect
google.golang.org/genproto/googleapis/rpc
v0.0.0-20240930140551-af27646dc61f // indirect
google.golang.org/grpc
v1.67.0 // indirect
...
...
go.sum
View file @
f3186500
...
...
@@ -14,6 +14,8 @@ github.com/cockroachdb/cockroach-go/v2 v2.2.0 h1:/5znzg5n373N/3ESjHF5SMLxiW4RKB0
github.com/cockroachdb/cockroach-go/v2
v2.2.0/go.mod h1:
u3MiKYGupPPjkn3ozknpMUpxPaNLTFWAya419/zv6eI=
github.com/cristalhq/aconfig
v0.18.6-0.20231226125657-6e7f9a54f85d h1:
Ywr0kz8Pf8uxAVedQ9iuFxtQwwSxozru2QCfAjxIdWo=
github.com/cristalhq/aconfig
v0.18.6-0.20231226125657-6e7f9a54f85d/go.mod h1:
9ogrGEt9yU5V4pif/ThkVUfhj8JkdV+iDeahZGgfnDU=
github.com/cristalhq/aconfig
v0.18.6 h1:
8KRBznzdjUUiaa7HeIpYbMx1uPE1/xOBEU1ajsnmNME=
github.com/cristalhq/aconfig
v0.18.6/go.mod h1:
9ogrGEt9yU5V4pif/ThkVUfhj8JkdV+iDeahZGgfnDU=
github.com/davecgh/go-spew
v1.1.0/go.mod h1:
J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew
v1.1.1 h1:
vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew
v1.1.1/go.mod h1:
J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
...
...
@@ -85,18 +87,18 @@ github.com/prometheus/common v0.59.1 h1:LXb1quJHWm1P6wq/U824uxYi4Sg0oGvNeUm1z5dJ
github.com/prometheus/common
v0.59.1/go.mod h1:
GpWM7dewqmVYcd7SmRaiWVe9SSqjf0UrwnYnpEZNuT0=
github.com/prometheus/procfs
v0.15.1 h1:
YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc=
github.com/prometheus/procfs
v0.15.1/go.mod h1:
fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
github.com/riverqueue/river
v0.1
2.1
h1:
TS3FVioPStlvb7yj1dYgtgX+zn/3JkLdPn+S6qNBcZ0
=
github.com/riverqueue/river
v0.1
2.1
/go.mod h1:
j7O42JlHo76YgXkAFX66E63Ke890/oSUUlui/ImyLu
U=
github.com/riverqueue/river/riverdriver
v0.1
2.1
h1:
MqAh6mw9h/m/nNXImJTXXtCefTDPdmZSlgsUdSYUHe0
=
github.com/riverqueue/river/riverdriver
v0.1
2.1
/go.mod h1:
E4hf4wPidG0xYrwsez4R9u4LvLdjlDu9m4iJFpb1DfQ
=
github.com/riverqueue/river/riverdriver/riverdatabasesql
v0.1
2.1
h1:
E2pYemeaaiqOqr1x1Cq872IdulGu5z/iIHChqxPJwfA
=
github.com/riverqueue/river/riverdriver/riverdatabasesql
v0.1
2.1
/go.mod h1:
+5DVUCfdPS3ZtsRm4V0GzQfXJI9MsFvq3BNqW/Nei3E
=
github.com/riverqueue/river/riverdriver/riverpgxv5
v0.1
2.1
h1:
stodaBk+GKMU4Uwoj2tShG5L/EK/E5gWOQwZhsJ65QY
=
github.com/riverqueue/river/riverdriver/riverpgxv5
v0.1
2.1
/go.mod h1:
kwV0SdmvBYOj3hsI4sn3tQQQ5NqXrq68yDvKb1Jms1E
=
github.com/riverqueue/river/rivershared
v0.1
2.1
h1:
7y03CM6iYrSoT1k6ylneTIoK74qQ27yi1aoT3dozU6Y
=
github.com/riverqueue/river/rivershared
v0.1
2.1
/go.mod h1:
IpJ63Jz/Rx61nKhJ45K9IdJR0VEHf3qnFlEPI9l11HM
=
github.com/riverqueue/river/rivertype
v0.1
2.1
h1:
iTciVhZ/yQQQBMAouivPrSlrQH8MEK5uCVtzu3eITu8
=
github.com/riverqueue/river/rivertype
v0.1
2.1
/go.mod h1:
3WRQEDlLKZky/vGwFcZC3uKjC+/8izE6ucHwCsuir98
=
github.com/riverqueue/river
v0.1
3.0
h1:
BvEJfXAnHJ7HwraoPZWiD271t2jDVvX1SPCtvLzojiA
=
github.com/riverqueue/river
v0.1
3.0
/go.mod h1:
SOG+j28RQpKDsTA8AlfxjFdYpoPm+MSOio+Ev4ljN2
U=
github.com/riverqueue/river/riverdriver
v0.1
3.0
h1:
UVzMtNfp3R+Ehr/yaRqgF58YOFEWGVqIAamCeK7RMkA
=
github.com/riverqueue/river/riverdriver
v0.1
3.0
/go.mod h1:
pxmx6qmGl+dNCrfa+xuktg8zrrZO3AEqlUFlFWOy8U4
=
github.com/riverqueue/river/riverdriver/riverdatabasesql
v0.1
3.0
h1:
xiiwQVFUoPv/7PQIsEIerpw2ux1lZ14oZScgiB4JHdE
=
github.com/riverqueue/river/riverdriver/riverdatabasesql
v0.1
3.0
/go.mod h1:
f7TWWD965tE6v96qi1Y40IP2shsAai0qJBHbqT7yFLM
=
github.com/riverqueue/river/riverdriver/riverpgxv5
v0.1
3.0
h1:
wjLgea/eI5rIMh0+TCjS+/+dsULIst3Wu8bZQo2DHno
=
github.com/riverqueue/river/riverdriver/riverpgxv5
v0.1
3.0
/go.mod h1:
Vzt3E33kNks2vN9lTgLJL8VFrbcAWDbwzyZLo02FlBk
=
github.com/riverqueue/river/rivershared
v0.1
3.0
h1:
AqRP54GgtwoLIvV5eoZmOGOCZXL8Ce5Zm8s60R8NKOA
=
github.com/riverqueue/river/rivershared
v0.1
3.0
/go.mod h1:
vzvawQpDy2Z1U5chkvh1NykzWNkRhc9RLcURsJRhlbE
=
github.com/riverqueue/river/rivertype
v0.1
3.0
h1:
PkT3h9tP0ZV3h0EGy2MiwEhgZqpRMN4fXfj27UKc9Q0
=
github.com/riverqueue/river/rivertype
v0.1
3.0
/go.mod h1:
wVOhGBeay6+JcIi0pTFlF4KtUgHYFkhMYv8dpxU46W0
=
github.com/robfig/cron/v3
v3.0.1 h1:
WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3
v3.0.1/go.mod h1:
eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/go-internal
v1.12.0 h1:
exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
...
...
@@ -109,8 +111,8 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify
v1.9.0 h1:
HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify
v1.9.0/go.mod h1:
r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/tidwall/gjson
v1.14.2/go.mod h1:
/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/gjson
v1.1
7.3
h1:
bwWLZU7icoKRG+C+0PNwIKC6FCJO/Q3p2pZvuP0jN94
=
github.com/tidwall/gjson
v1.1
7.3
/go.mod h1:
/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/gjson
v1.1
8.0
h1:
FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY
=
github.com/tidwall/gjson
v1.1
8.0
/go.mod h1:
/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/match
v1.1.1 h1:
+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
github.com/tidwall/match
v1.1.1/go.mod h1:
eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/pretty
v1.2.0/go.mod h1:
ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
...
...
@@ -148,14 +150,16 @@ golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A=
golang.org/x/crypto
v0.27.0/go.mod h1:
1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70=
golang.org/x/exp
v0.0.0-20240909161429-701f63a606c0 h1:
e66Fs6Z+fZTbFBAxKfP3PALWBtpfqks2bwGcexMxgtk=
golang.org/x/exp
v0.0.0-20240909161429-701f63a606c0/go.mod h1:
2TbTHSBQa924w8M6Xs1QcRcFwyucIwBGpK1p2f1YFFY=
golang.org/x/exp
v0.0.0-20241009180824-f66d83c29e7c h1:
7dEasQXItcW1xKJ2+gg5VOiBnqWrJc+rq0DPKyvvdbY=
golang.org/x/exp
v0.0.0-20241009180824-f66d83c29e7c/go.mod h1:
NQtJDoLvd6faHhE7m4T/1IY708gDefGGjR/iUW8yQQ8=
golang.org/x/net
v0.29.0 h1:
5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo=
golang.org/x/net
v0.29.0/go.mod h1:
gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0=
golang.org/x/sync
v0.8.0 h1:
3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync
v0.8.0/go.mod h1:
Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys
v0.25.0 h1:
r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34=
golang.org/x/sys
v0.25.0/go.mod h1:
/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text
v0.1
8
.0 h1:
XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224
=
golang.org/x/text
v0.1
8
.0/go.mod h1:
BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/text
v0.1
9
.0 h1:
kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM
=
golang.org/x/text
v0.1
9
.0/go.mod h1:
BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
google.golang.org/genproto/googleapis/api
v0.0.0-20240930140551-af27646dc61f h1:
jTm13A2itBi3La6yTGqn8bVSrc3ZZ1r8ENHlIXBfnRA=
google.golang.org/genproto/googleapis/api
v0.0.0-20240930140551-af27646dc61f/go.mod h1:
CLGoBuH1VHxAUXVPP8FfPwPEVJB6lz3URE5mY2SuayE=
google.golang.org/genproto/googleapis/rpc
v0.0.0-20240930140551-af27646dc61f h1:
cUMEy+8oS78BWIH9OWazBkzbr090Od9tWBNtZHkOhf0=
...
...