diff --git a/codec/errors.go b/codec/errors.go index a175fab7c25e4dea3fb046882ba32774f3cf4ab3..a74c4950ea879dada7309066e279b775e17262d9 100644 --- a/codec/errors.go +++ b/codec/errors.go @@ -1,7 +1,11 @@ package codec import ( + "encoding/json" "fmt" + "io" + + "github.com/go-faster/jx" ) // HTTPError is returned by client operations when the HTTP status code of the @@ -28,9 +32,36 @@ type Error interface { // A DataError contains some data in addition to the error message. type DataError interface { Error() string // returns the message + ErrorCode() int // returns the error code ErrorData() any // returns the error data } +func EncodeError(i io.Writer, err error) error { + enc := jx.GetEncoder() + defer jx.PutEncoder(enc) + enc.Obj(func(e *jx.Encoder) { + switch er := err.(type) { + case Error: + e.Field("code", func(e *jx.Encoder) { e.Int(er.ErrorCode()) }) + e.Field("message", func(e *jx.Encoder) { e.Str(er.Error()) }) + case DataError: + data, err := json.Marshal(er.ErrorData()) + if err != nil { + data = []byte(`"failed to marshal error data"`) + } + e.Field("code", func(e *jx.Encoder) { e.Int(er.ErrorCode()) }) + e.Field("message", func(e *jx.Encoder) { e.Str(er.Error()) }) + e.Field("data", func(e *jx.Encoder) { + e.Raw(data) + }) + default: + e.Field("code", func(e *jx.Encoder) { e.Int(-32000) }) + e.Field("message", func(e *jx.Encoder) { e.Str(er.Error()) }) + } + }) + return nil +} + type JrpcErr struct { Data any } diff --git a/codec/transport.go b/codec/transport.go index 5155b6fa4f7019d155c3228925228c27e23d3350..12a26e0545b01cedfa563e6155889dfea31a8e10 100644 --- a/codec/transport.go +++ b/codec/transport.go @@ -3,6 +3,7 @@ package codec import ( "context" "encoding/json" + "io" ) type ReaderWriter interface { @@ -21,11 +22,11 @@ type Reader interface { Close() error } -// Writer can write JSON messages to its underlying connection. +// Writer can write bytes messages to their underlying connection. // Implementations must be safe for concurrent use. type Writer interface { // write json blob to stream - WriteJSON(context.Context, any) error + io.Writer // Closed returns a channel which is closed when the connection is closed. Closed() <-chan any // RemoteAddr returns the peer address of the connection. diff --git a/go.mod b/go.mod index 951d120ad8e7d960b5c7dacd913630daf266d75b..a038fdb8638e5e5b4584c536bbc4641db03b1e69 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/davecgh/go-spew v1.1.1 github.com/deckarep/golang-set v1.8.0 github.com/ethereum/go-ethereum v1.11.1 + github.com/go-faster/jx v1.0.0 github.com/gobuffalo/packr/v2 v2.8.3 github.com/goccy/go-json v0.10.0 github.com/iancoleman/strcase v0.2.0 @@ -24,6 +25,7 @@ require ( github.com/aead/chacha20 v0.0.0-20180709150244-8b13a72661da // indirect github.com/deckarep/golang-set/v2 v2.1.0 // indirect github.com/gin-gonic/gin v1.7.7 // indirect + github.com/go-faster/errors v0.6.1 // indirect github.com/go-ole/go-ole v1.2.6 // indirect github.com/go-stack/stack v1.8.1 // indirect github.com/gobuffalo/logger v1.0.7 // indirect @@ -40,6 +42,7 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/rs/zerolog v1.29.0 // indirect + github.com/segmentio/asm v1.2.0 // indirect github.com/shirou/gopsutil v3.21.11+incompatible // indirect github.com/sirupsen/logrus v1.9.0 // indirect github.com/tklauser/go-sysconf v0.3.11 // indirect diff --git a/go.sum b/go.sum index ac374c050c1d2f9ac72668d864557a270a83664e..4cb13c261a9275e435a0ca316dafd68c5c612a87 100644 --- a/go.sum +++ b/go.sum @@ -92,6 +92,10 @@ github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm github.com/gin-gonic/gin v1.6.3/go.mod h1:75u5sXoLsGZoRN5Sgbi1eraJ4GU3++wFwWzhwvtwp4M= github.com/gin-gonic/gin v1.7.7 h1:3DoBmSbJbZAWqXJC3SLjAPfutPJJRN1U5pALB7EeTTs= github.com/gin-gonic/gin v1.7.7/go.mod h1:axIBovoeJpVj8S3BwE0uPMTeReE4+AfFtqpqaZ1qq1U= +github.com/go-faster/errors v0.6.1 h1:nNIPOBkprlKzkThvS/0YaX8Zs9KewLCOSFQS5BU06FI= +github.com/go-faster/errors v0.6.1/go.mod h1:5MGV2/2T9yvlrbhe9pD9LO5Z/2zCSq2T8j+Jpi2LAyY= +github.com/go-faster/jx v1.0.0 h1:HE+ms2e6ZGkZ6u13t8u+onBinrPvIPI+0hWXGELm74g= +github.com/go-faster/jx v1.0.0/go.mod h1:zm8SlkwK+H0TYNKYtVJ/7cWFS7soJBQWhcPctKyYL/4= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= @@ -298,6 +302,8 @@ github.com/rs/zerolog v1.29.0/go.mod h1:NILgTygv/Uej1ra5XxGf82ZFSLk58MFGAUS2o6us github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= +github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys= +github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs= github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI= github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= @@ -322,7 +328,7 @@ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= github.com/tklauser/go-sysconf v0.3.11 h1:89WgdJhk5SNwJfu+GKyYveZ4IaJ7xAkecBo+KdJV0CM= github.com/tklauser/go-sysconf v0.3.11/go.mod h1:GqXfhXY3kiPa0nAXPDIQIWzJbMCB7AmcWpGR8lSZfqI= @@ -372,6 +378,7 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= +golang.org/x/exp v0.0.0-20230206171751-46f607a40771 h1:xP7rWLUr1e1n2xkK5YB4LI0hPEy3LJC6Wk+D4pGlOJg= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= diff --git a/handler.go b/handler.go index beb507c43b8548ba5d28f3700063a4e91a5c0c4d..0832b345d6474bf50a7cb8cb5fe34e8ec83c2d09 100644 --- a/handler.go +++ b/handler.go @@ -1,19 +1,3 @@ -// Copyright 2019 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - package jrpc import ( diff --git a/server.go b/server.go index 7113c8e335b02eaea7251efbab9c6d6465978ba5..99770879b61094f5116a217cf5571ceb01f588e0 100644 --- a/server.go +++ b/server.go @@ -2,10 +2,16 @@ package jrpc import ( "context" + "io" + "net/http" "sync/atomic" "gfx.cafe/open/jrpc/codec" + "gfx.cafe/util/go/bufpool" + mapset "github.com/deckarep/golang-set" + "github.com/go-faster/jx" + "github.com/goccy/go-json" ) // Server is an RPC server. @@ -13,6 +19,11 @@ type Server struct { services Handler run int32 codecs mapset.Set + Tracing Tracing +} + +type Tracing struct { + ErrorLogger func(remote codec.ReaderWriter, err error) } // NewServer creates a new server instance with no registered handlers. @@ -27,24 +38,212 @@ func NewServer(r Handler) *Server { return server } +func (s *Server) printError(remote codec.ReaderWriter, err error) { + if err != nil { + if s.Tracing.ErrorLogger != nil { + s.Tracing.ErrorLogger(remote, err) + } + } +} + // ServeCodec reads incoming requests from codec, calls the appropriate callback and writes // the response back using the given codec. It will block until the codec is closed or the // server is stopped. In either case the codec is closed. -func (s *Server) ServeCodec(codec codec.ReaderWriter) { - defer codec.Close() +func (s *Server) ServeCodec(pctx context.Context, remote codec.ReaderWriter) { + defer remote.Close() // Don't serve if server is stopped. if atomic.LoadInt32(&s.run) == 0 { return } // Add the codec to the set so it can be closed by Stop. - s.codecs.Add(codec) - defer s.codecs.Remove(codec) + s.codecs.Add(remote) + defer s.codecs.Remove(remote) + + responder := &callResponder{ + toSend: make(chan *callEnv, 8), + toNotify: make(chan *notifyEnv, 8), + remote: remote, + } + + ctx, cn := context.WithCancel(pctx) + defer cn() + go func() { + defer cn() + err := responder.run(ctx) + if err != nil { + s.printError(remote, err) + } + // lose + err = remote.Close() + if err != nil { + s.printError(remote, err) + } + }() + + for { + msgs, err := remote.ReadBatch(ctx) + if err != nil { + s.printError(remote, err) + return + } + msg, batch := codec.ParseMessage(msgs) + env := &callEnv{ + batch: batch, + } + for _, v := range msg { + env.responses = append(env.responses, + &callRespWriter{ + id: v.ID, + notifications: responder.toNotify, + header: remote.PeerInfo().HTTP.Headers, + }, + ) + } + responder.toSend <- env + } +} + +type callResponder struct { + toSend chan *callEnv + toNotify chan *notifyEnv + remote codec.ReaderWriter +} + +func (c *callResponder) run(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return nil + case env := <-c.toSend: + err := c.send(ctx, env) + if err != nil { + return err + } + case env := <-c.toNotify: + err := c.notify(ctx, env) + if err != nil { + return err + } + } + } +} +func (c *callResponder) notify(ctx context.Context, env *notifyEnv) error { + buf := bufpool.GetStd() + defer bufpool.PutStd(buf) + enc := jx.GetEncoder() + enc.ResetWriter(c.remote) + defer jx.PutEncoder(enc) + buf.Reset() + enc.ObjStart() + enc.FieldStart("jsonrpc") + enc.Str("2.0") + err := env.dat(buf) + if err != nil { + enc.FieldStart("error") + err := codec.EncodeError(enc, err) + if err != nil { + return err + } + } else { + enc.FieldStart("data") + enc.Raw(buf.Bytes()) + } + enc.ObjEnd() + err = enc.Close() + if err != nil { + return err + } + return nil +} + +func (c *callResponder) send(ctx context.Context, env *callEnv) error { + buf := bufpool.GetStd() + defer bufpool.PutStd(buf) + enc := jx.GetEncoder() + enc.ResetWriter(c.remote) + defer jx.PutEncoder(enc) + if env.batch { + enc.ArrStart() + } + for _, v := range env.responses { + if v.skip { + continue + } + buf.Reset() + enc.ObjStart() + enc.FieldStart("jsonrpc") + enc.Str("2.0") + enc.FieldStart("id") + enc.Raw(v.id.RawMessage()) + err := v.dat(buf) + if err != nil { + enc.FieldStart("error") + err := codec.EncodeError(enc, err) + if err != nil { + return err + } + } else { + enc.FieldStart("data") + enc.Raw(buf.Bytes()) + } + enc.ObjEnd() + } + if env.batch { + enc.ArrEnd() + } + err := enc.Close() + if err != nil { + return err + } + return nil +} - // TODO: handle this - // c := initClient(codec, s.services) - // <-codec.Closed() - // c.Close() +type callEnv struct { + responses []*callRespWriter + batch bool +} + +type notifyEnv struct { + dat func(io.Writer) error +} + +type callRespWriter struct { + id *codec.ID + dat func(io.Writer) error + err error + skip bool + header http.Header + + notifications chan *notifyEnv +} + +func (c *callRespWriter) Send(v any, err error) error { + if err != nil { + c.err = err + return nil + } + c.dat = func(w io.Writer) error { + return json.NewEncoder(w).Encode(v) + } + return nil +} + +func (c *callRespWriter) Option(k string, v any) { + // no options for now +} + +func (c *callRespWriter) Header() http.Header { + return c.header +} + +func (c *callRespWriter) Notify(v any) error { + c.notifications <- ¬ifyEnv{ + dat: func(w io.Writer) error { + return json.NewEncoder(w).Encode(v) + }, + } + return nil } // Stop stops reading new requests, waits for stopPendingRequestTimeout to allow pending diff --git a/session.go b/session.go new file mode 100644 index 0000000000000000000000000000000000000000..66cdccc0d83a425b60492c10e74e643323a0fe83 --- /dev/null +++ b/session.go @@ -0,0 +1,37 @@ +package jrpc + +import ( + "context" + + "gfx.cafe/open/jrpc/codec" +) + +type session struct { +} + +func (s *session) MaxProcs() int { + return 8 +} + +func (s *session) handle(ctx context.Context, stream codec.ReaderWriter, service Handler) error { + msg, err := stream.ReadBatch(ctx) + if err != nil { + //TODO: deal with this error + return err + } + messages, batch := codec.ParseMessage(msg) + totalReplies := 0 + for _, v := range messages { + if v.ID != nil && !v.ID.IsNull() { + totalReplies = totalReplies + 1 + } + } + for _, msg := range messages { + s.handleMessage(err) + } + return nil +} + +func (s *session) handleMessage(ctx context.Context, stream codec.ReaderWriter, service Handler, msg *codec.Message) func() { + return nil +}