From 7ceddd4f2c4383ba20544327da63b73d29f2dc6d Mon Sep 17 00:00:00 2001 From: a <a@tuxpa.in> Date: Wed, 25 Oct 2023 04:38:19 -0500 Subject: [PATCH] remove state from the server, finally --- contrib/codecs/http/http_test.go | 4 - contrib/codecs/websocket/websocket_test.go | 4 - pkg/jrpctest/suites.go | 2 - pkg/server/server.go | 119 ++++++++------------- pkg/server/server_test.go | 1 - 5 files changed, 42 insertions(+), 88 deletions(-) diff --git a/contrib/codecs/http/http_test.go b/contrib/codecs/http/http_test.go index f18dbe5..b8e5c78 100644 --- a/contrib/codecs/http/http_test.go +++ b/contrib/codecs/http/http_test.go @@ -95,7 +95,6 @@ func TestHTTPErrorResponseWithValidRequest(t *testing.T) { func confirmHTTPRequestYieldsStatusCode(t *testing.T, method, contentType, body string, expectedStatusCode int) { t.Helper() s := server.NewServer(jmux.NewMux()) - defer s.Stop() ts := httptest.NewServer(&Server{Server: s}) defer ts.Close() @@ -120,7 +119,6 @@ func TestHTTPResponseWithEmptyGet(t *testing.T) { // This checks that maxRequestContentLength is not applied to the response of a request. func TestHTTPRespBodyUnlimited(t *testing.T) { s := jrpctest.NewServer() - defer s.Stop() ts := httptest.NewServer(&Server{Server: s}) defer ts.Close() @@ -180,7 +178,6 @@ func TestHTTPErrorResponse(t *testing.T) { func TestHTTPPeerInfo(t *testing.T) { s := jrpctest.NewServer() - defer s.Stop() ts := httptest.NewServer(&Server{Server: s}) defer ts.Close() @@ -215,7 +212,6 @@ func TestHTTPPeerInfo(t *testing.T) { } func TestClientHTTP(t *testing.T) { s := jrpctest.NewServer() - defer s.Stop() ts := httptest.NewServer(&Server{Server: s}) defer ts.Close() c, err := DialHTTP(ts.URL) diff --git a/contrib/codecs/websocket/websocket_test.go b/contrib/codecs/websocket/websocket_test.go index 0979b5a..4517153 100644 --- a/contrib/codecs/websocket/websocket_test.go +++ b/contrib/codecs/websocket/websocket_test.go @@ -40,7 +40,6 @@ func TestWebsocketOriginCheck(t *testing.T) { httpsrv = httptest.NewServer(websocket.WebsocketHandler(srv, []string{"http://example.com"})) wsURL = "ws:" + strings.TrimPrefix(httpsrv.URL, "http:") ) - defer srv.Stop() defer httpsrv.Close() client, err := websocket.DialWebsocket(context.Background(), wsURL, "http://ekzample.com") @@ -70,7 +69,6 @@ func TestWebsocketLargeCall(t *testing.T) { httpsrv = httptest.NewServer(websocket.WebsocketHandler(srv, []string{"*"})) wsURL = "ws:" + strings.TrimPrefix(httpsrv.URL, "http:") ) - defer srv.Stop() defer httpsrv.Close() client, err := websocket.DialWebsocket(context.Background(), wsURL, "") @@ -103,7 +101,6 @@ func TestWebsocketPeerInfo(t *testing.T) { ts = httptest.NewServer(websocket.WebsocketHandler(s, []string{"origin.example.com"})) tsurl = "ws:" + strings.TrimPrefix(ts.URL, "http:") ) - defer s.Stop() defer ts.Close() ctx := context.Background() @@ -140,7 +137,6 @@ func TestClientWebsocketLargeMessage(t *testing.T) { httpsrv = httptest.NewServer(websocket.WebsocketHandler(srv, nil)) wsURL = "ws:" + strings.TrimPrefix(httpsrv.URL, "http:") ) - defer srv.Stop() defer httpsrv.Close() respLength := websocket.WsMessageSizeLimit - 50 diff --git a/pkg/jrpctest/suites.go b/pkg/jrpctest/suites.go index ddb4325..9adb7da 100644 --- a/pkg/jrpctest/suites.go +++ b/pkg/jrpctest/suites.go @@ -30,7 +30,6 @@ func TestExecutor(sm ServerMaker) func(t *testing.T, c TestContext) { return func(t *testing.T, c TestContext) { server, dialer, cn := sm() defer cn() - defer server.Stop() client := dialer() defer client.Close() c(t, server, client) @@ -40,7 +39,6 @@ func BenchExecutor(sm ServerMaker) func(t *testing.B, c BenchContext) { return func(t *testing.B, c BenchContext) { server, dialer, cn := sm() defer cn() - defer server.Stop() client := dialer() defer client.Close() c(t, server, client) diff --git a/pkg/server/server.go b/pkg/server/server.go index 9e63162..28fa148 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -3,12 +3,9 @@ package server import ( "bytes" "context" - "fmt" "sync" - "sync/atomic" "gfx.cafe/open/jrpc/pkg/codec" - "gfx.cafe/open/jrpc/pkg/util/mapset" "gfx.cafe/util/go/bufpool" @@ -21,24 +18,55 @@ import ( // a server can be used to listenandserve multiple codecs at a time type Server struct { services codec.Handler - run int32 - codecs *mapset.Set[codec.ReaderWriter] - Tracing Tracing -} - -type Tracing struct { } // NewServer creates a new server instance with no registered handlers. func NewServer(r codec.Handler) *Server { - server := &Server{ - codecs: mapset.NewSet[codec.ReaderWriter](), - run: 1, - } - server.services = r + server := &Server{services: r} return server } +// ServeCodec reads incoming requests from codec, calls the appropriate callback and writes +// the response back using the given codec. It will block until the codec is closed +func (s *Server) ServeCodec(ctx context.Context, remote codec.ReaderWriter) error { + defer remote.Close() + responder := &callResponder{ + remote: remote, + } + // add a cancel to the context so we can cancel all the child tasks on return + ctx, cn := context.WithCancel(ContextWithPeerInfo(ctx, remote.PeerInfo())) + defer cn() + + errch := make(chan error) + go func() { + for { + // read messages from the stream synchronously + incoming, batch, err := remote.ReadBatch(ctx) + if err != nil { + errch <- err + return + } + go func() { + err = s.serveBatch(ctx, incoming, batch, remote, responder) + if err != nil { + errch <- err + return + } + }() + } + }() + // exit on either the first error, or the context closing. + select { + case <-ctx.Done(): + return nil + case err := <-errch: + // perform a flush on error just in case there are dangling things to be sent, states to be cleaned up, etc. + // the connection is already dead, so at this point there are no rules, so this is okay to do i think + remote.Flush() + return err + } +} + func (s *Server) serveBatch(ctx context.Context, incoming []*codec.Message, batch bool, @@ -115,69 +143,6 @@ func (s *Server) serveBatch(ctx context.Context, return responder.send(ctx, env) } -// ServeCodec reads incoming requests from codec, calls the appropriate callback and writes -// the response back using the given codec. It will block until the codec is closed or the -// server is stopped. In either case the codec is closed when this function returns. -func (s *Server) ServeCodec(pctx context.Context, remote codec.ReaderWriter) error { - defer remote.Close() - // Don't serve if server is stopped. - if atomic.LoadInt32(&s.run) == 0 { - return fmt.Errorf("Server stopped") - } - // Add the codec to the set so it can be closed by Stop. - s.codecs.Add(remote) - defer s.codecs.Remove(remote) - responder := &callResponder{ - remote: remote, - } - // add a cancel to the context so we can cancel all the child tasks on return - ctx, cn := context.WithCancel(ContextWithPeerInfo(pctx, remote.PeerInfo())) - defer cn() - - errch := make(chan error) - go func() { - for { - // read messages from the stream synchronously - incoming, batch, err := remote.ReadBatch(ctx) - if err != nil { - errch <- err - return - } - // process each in a goroutine - go func() { - // the only reason this should error is if - err = s.serveBatch(ctx, incoming, batch, remote, responder) - if err != nil { - errch <- err - return - } - }() - } - }() - // exit on either the first error, or the context closing. - select { - case <-ctx.Done(): - return nil - case err := <-errch: - // perform a flush on error just in case there are dangling things to be sent, states to be cleaned up, etc. - // the connection is already dead, so at this point there are no rules, so this is okay to do i think - remote.Flush() - return err - } -} - -// Stop stops reading new requests, waits for stopPendingRequestTimeout to allow pending -// requests to finish, then closes all codecs which will cancel pending requests and -// subscriptions. -func (s *Server) Stop() { - if atomic.CompareAndSwapInt32(&s.run, 1, 0) { - s.codecs.Each(func(c codec.ReaderWriter) bool { - c.Close() - return true - }) - } -} - type callResponder struct { remote codec.ReaderWriter mu sync.Mutex diff --git a/pkg/server/server_test.go b/pkg/server/server_test.go index a980047..33295ee 100644 --- a/pkg/server/server_test.go +++ b/pkg/server/server_test.go @@ -23,7 +23,6 @@ func TestGoEthereumTestScripts(t *testing.T) { srv := jrpctest.NewServer() c := rdwr.NewCodec(wr, wr) go srv.ServeCodec(context.TODO(), c) - defer srv.Stop() for _, act := range tf.Action { switch act.Direction { case jrpctest.DirectionRecv: -- GitLab