good morning!!!!

Skip to content
Snippets Groups Projects
Verified Commit d5afc247 authored by a's avatar a
Browse files

pooling

parent 4a3cb469
No related branches found
No related tags found
No related merge requests found
Pipeline #29745 failed with stage
in 15 minutes and 20 seconds
...@@ -7,27 +7,59 @@ import ( ...@@ -7,27 +7,59 @@ import (
"sync/atomic" "sync/atomic"
"gfx.cafe/open/jrpc" "gfx.cafe/open/jrpc"
"gfx.cafe/open/jrpc/contrib/extension/subscription"
"gfx.cafe/open/jrpc/pkg/codec" "gfx.cafe/open/jrpc/pkg/codec"
) )
var _ codec.Conn = (*Pooling)(nil) var _ codec.Conn = (*Pooling)(nil)
var _ subscription.Conn = (*Pooling)(nil)
type Pooling struct { type Pooling struct {
dialer func(ctx context.Context) (jrpc.Conn, error) dialer func(ctx context.Context) (jrpc.Conn, error)
conns chan codec.Conn conns chan codec.Conn
base codec.Conn base subscription.Conn
closed atomic.Bool closed atomic.Bool
middleware []codec.Middleware middleware []codec.Middleware
mu sync.Mutex mu sync.Mutex
} }
func NewPooling(dialer func(ctx context.Context) (jrpc.Conn, error), max int) *Pooling { func (p *Pooling) getBase(ctx context.Context) (subscription.Conn, error) {
p.mu.Lock()
defer p.mu.Unlock()
if p.base == nil {
conn, err := subscription.UpgradeConn(p.dialer(ctx))
if err != nil {
return nil, err
}
p.base = conn
}
return p.base, nil
}
func (p *Pooling) Notify(ctx context.Context, method string, params any) error {
base, err := p.getBase(ctx)
if err != nil {
return err
}
return base.Notify(ctx, method, params)
}
func (p *Pooling) Subscribe(ctx context.Context, namespace string, channel any, args any) (subscription.ClientSubscription, error) {
base, err := p.getBase(ctx)
if err != nil {
return nil, err
}
return base.Subscribe(ctx, namespace, channel, args)
}
func NewPooling(ctx context.Context, dialer func(ctx context.Context) (jrpc.Conn, error), max int) (*Pooling, error) {
r := &Pooling{ r := &Pooling{
dialer: dialer, dialer: dialer,
conns: make(chan codec.Conn, max), conns: make(chan codec.Conn, max),
} }
return r
return r, nil
} }
func (r *Pooling) Do(ctx context.Context, result any, method string, params any) error { func (r *Pooling) Do(ctx context.Context, result any, method string, params any) error {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment