good morning!!!!

Skip to content
Snippets Groups Projects
Verified Commit 55596806 authored by a's avatar a
Browse files

ok

parent 413679a7
No related branches found
No related tags found
1 merge request!23Whatthefuck
Pipeline #30097 failed
...@@ -12,6 +12,25 @@ import ( ...@@ -12,6 +12,25 @@ import (
"gfx.cafe/open/jrpc/pkg/server" "gfx.cafe/open/jrpc/pkg/server"
) )
func TestBenchmarkSuite(t *testing.T) {
var executeTest = jrpctest.TestExecutor(rdwr.ServerMaker)
var makeTest = func(name string, fm jrpctest.TestContext) {
t.Run(name, func(t *testing.T) {
executeTest(t, fm)
})
}
ctx := context.Background()
makeTest("SingleClient", func(t *testing.T, server *server.Server, client codec.Conn) {
for i := 0; i < 10; i++ {
err := client.Do(ctx, nil, "test_ping", nil)
if err != nil {
t.Error(err)
}
}
})
}
func runBenchmarkSuite(b *testing.B, sm jrpctest.ServerMaker) { func runBenchmarkSuite(b *testing.B, sm jrpctest.ServerMaker) {
ctx := context.Background() ctx := context.Background()
executeBench := jrpctest.BenchExecutor(sm) executeBench := jrpctest.BenchExecutor(sm)
......
...@@ -48,7 +48,8 @@ func (c *Codec) PeerInfo() codec.PeerInfo { ...@@ -48,7 +48,8 @@ func (c *Codec) PeerInfo() codec.PeerInfo {
func (c *Codec) decodeSingleMessage(ctx context.Context) (*serverutil.Bundle, error) { func (c *Codec) decodeSingleMessage(ctx context.Context) (*serverutil.Bundle, error) {
c.decLock.Lock() c.decLock.Lock()
defer c.decLock.Unlock() defer c.decLock.Unlock()
c.decBuf = c.decBuf[:0] //c.decBuf = c.decBuf[:0]
c.decBuf = json.RawMessage{}
err := c.dec.DecodeContext(ctx, &c.decBuf) err := c.dec.DecodeContext(ctx, &c.decBuf)
if err != nil { if err != nil {
return nil, err return nil, err
......
...@@ -3,7 +3,6 @@ package websocket ...@@ -3,7 +3,6 @@ package websocket
import ( import (
"context" "context"
"io" "io"
"log"
"net/http" "net/http"
"sync" "sync"
"time" "time"
...@@ -17,12 +16,6 @@ import ( ...@@ -17,12 +16,6 @@ import (
"gfx.cafe/open/jrpc/pkg/serverutil" "gfx.cafe/open/jrpc/pkg/serverutil"
) )
func init() {
go func() {
log.Println(http.ListenAndServe("localhost:6060", nil))
}()
}
type Codec struct { type Codec struct {
closed chan struct{} closed chan struct{}
conn *websocket.Conn conn *websocket.Conn
......
...@@ -31,6 +31,22 @@ func (i *IdReply) NextId() *codec.ID { ...@@ -31,6 +31,22 @@ func (i *IdReply) NextId() *codec.ID {
return codec.NewNumberIDPtr(i.id.Add(1)) return codec.NewNumberIDPtr(i.id.Add(1))
} }
func (i *IdReply) makeOrTake(id []byte) chan msgOrError {
i.mu.Lock()
defer i.mu.Unlock()
ch, ok := i.chs[string(id)]
if ok {
// take
delete(i.chs, string(id))
} else {
// make
ch = make(chan msgOrError, 1)
i.chs[string(id)] = ch
}
return ch
}
func (i *IdReply) make(id []byte) <-chan msgOrError { func (i *IdReply) make(id []byte) <-chan msgOrError {
i.mu.Lock() i.mu.Lock()
defer i.mu.Unlock() defer i.mu.Unlock()
...@@ -54,7 +70,7 @@ func (i *IdReply) remove(id []byte) { ...@@ -54,7 +70,7 @@ func (i *IdReply) remove(id []byte) {
} }
func (i *IdReply) Resolve(id []byte, msg json.RawMessage, err error) { func (i *IdReply) Resolve(id []byte, msg json.RawMessage, err error) {
ch := i.take(id) ch := i.makeOrTake(id)
if ch == nil { if ch == nil {
return return
} }
...@@ -73,7 +89,7 @@ func (i *IdReply) Resolve(id []byte, msg json.RawMessage, err error) { ...@@ -73,7 +89,7 @@ func (i *IdReply) Resolve(id []byte, msg json.RawMessage, err error) {
func (i *IdReply) Ask(ctx context.Context, id []byte) (json.RawMessage, error) { func (i *IdReply) Ask(ctx context.Context, id []byte) (json.RawMessage, error) {
select { select {
case resp := <-i.make(id): case resp := <-i.makeOrTake(id):
return resp.msg, resp.err return resp.msg, resp.err
case <-ctx.Done(): case <-ctx.Done():
i.remove(id) i.remove(id)
......
...@@ -2,10 +2,8 @@ package server ...@@ -2,10 +2,8 @@ package server
import ( import (
"context" "context"
"log"
"net/http" "net/http"
"sync" "sync"
"time"
"gfx.cafe/open/jrpc/pkg/codec" "gfx.cafe/open/jrpc/pkg/codec"
"github.com/goccy/go-json" "github.com/goccy/go-json"
...@@ -63,14 +61,10 @@ func (c *callRespWriter) Send(v any, e error) (err error) { ...@@ -63,14 +61,10 @@ func (c *callRespWriter) Send(v any, e error) (err error) {
} }
return nil return nil
} }
s := time.Now()
log.Println("try")
err = c.cr.mu.Acquire(c.ctx, 1) err = c.cr.mu.Acquire(c.ctx, 1)
if err != nil { if err != nil {
return err return err
} }
log.Println("release", time.Since(s))
s2 := time.Now()
defer c.cr.mu.Release(1) defer c.cr.mu.Release(1)
err = c.cr.send(c.ctx, &callEnv{ err = c.cr.send(c.ctx, &callEnv{
v: &v, v: &v,
...@@ -78,7 +72,6 @@ func (c *callRespWriter) Send(v any, e error) (err error) { ...@@ -78,7 +72,6 @@ func (c *callRespWriter) Send(v any, e error) (err error) {
id: c.msg.ID, id: c.msg.ID,
extrafields: c.msg.ExtraFields, extrafields: c.msg.ExtraFields,
}) })
log.Println("release", time.Since(s2))
err = c.cr.remote.Flush() err = c.cr.remote.Flush()
if err != nil { if err != nil {
return err return err
......
...@@ -68,7 +68,6 @@ func (s *Server) ServeCodec(ctx context.Context, remote codec.ReaderWriter) erro ...@@ -68,7 +68,6 @@ func (s *Server) ServeCodec(ctx context.Context, remote codec.ReaderWriter) erro
}() }()
} }
}() }()
wg.Wait()
allErrs = append(allErrs, err) allErrs = append(allErrs, err)
if len(allErrs) > 0 { if len(allErrs) > 0 {
return errors.Join(allErrs...) return errors.Join(allErrs...)
...@@ -143,13 +142,6 @@ func (s *Server) serveBatch(ctx context.Context, ...@@ -143,13 +142,6 @@ func (s *Server) serveBatch(ctx context.Context,
batchResults = append(batchResults, v) batchResults = append(batchResults, v)
} }
} }
// early respond to nil requests
if v.err != nil {
v.sendCalled = true
v.doneMu.Release(1)
wg.Done()
continue
}
// now process each request in its own goroutine // now process each request in its own goroutine
// TODO: stress test this. // TODO: stress test this.
go func() { go func() {
...@@ -162,9 +154,7 @@ func (s *Server) serveBatch(ctx context.Context, ...@@ -162,9 +154,7 @@ func (s *Server) serveBatch(ctx context.Context,
s.services.ServeRPC(v, req) s.services.ServeRPC(v, req)
}() }()
} }
if r.batch { if r.batch {
// we only need to do this if this is a batch call with requests
err = doneMu.Acquire(ctx, int64(totalRequests)) err = doneMu.Acquire(ctx, int64(totalRequests))
if err != nil { if err != nil {
return err return err
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment