From 360735318b53a9b8ad2f97b048bf139da0ec6cf5 Mon Sep 17 00:00:00 2001
From: Alex Sharov <AskAlexSharov@gmail.com>
Date: Wed, 7 Jul 2021 10:48:21 +0700
Subject: [PATCH] rpc: add --rpc.batch.concurrency flag (#2308)

---
 cmd/rpcdaemon/README.md     |  7 +++----
 cmd/rpcdaemon/cli/config.go |  4 +++-
 node/node.go                |  2 +-
 node/rpcstack.go            |  4 ++--
 rpc/client.go               |  2 +-
 rpc/endpoints.go            |  2 +-
 rpc/handler.go              | 13 +++++++------
 rpc/http_test.go            |  2 +-
 rpc/server.go               | 10 ++++++----
 rpc/server_test.go          |  2 +-
 rpc/subscription_test.go    |  2 +-
 rpc/testservice_test.go     |  2 +-
 rpc/websocket_test.go       |  2 +-
 13 files changed, 29 insertions(+), 25 deletions(-)

diff --git a/cmd/rpcdaemon/README.md b/cmd/rpcdaemon/README.md
index 267f506469..15335ff7d7 100644
--- a/cmd/rpcdaemon/README.md
+++ b/cmd/rpcdaemon/README.md
@@ -387,11 +387,10 @@ Reduce `--private.api.ratelimit`
 ### Batch requests
 
 Currently batch requests are spawn multiple goroutines and process all sub-requests in parallel.
-But to limit impact of 1 huge batch to other users - max amount of goroutines hardcoded to 50.
-If submit 1 batch with 200 requests - RPCDaemon will spawn 50 goroutines and will use them to process 200 requests.
-We can make it configurable if your use-case need it.
+To limit the impact of 1 huge batch on other users, the flag `--rpc.batch.concurrency` was added (default: 50).
+Increase it to process large batches faster.
 
-But if at least 1 request is "stremable" (has parameter of type *jsoniter.Stream) - then whole batch will processed sequentially (on 1 goroutine).
+Known Issue: if at least 1 request is "streamable" (has a parameter of type *jsoniter.Stream), the whole batch will be processed sequentially (on 1 goroutine).
 
 ## For Developers
 
diff --git a/cmd/rpcdaemon/cli/config.go b/cmd/rpcdaemon/cli/config.go
index 8836920cf6..ddc570aceb 100644
--- a/cmd/rpcdaemon/cli/config.go
+++ b/cmd/rpcdaemon/cli/config.go
@@ -45,6 +45,7 @@ type Flags struct {
 	WebsocketEnabled bool
 	WebsocketCompression bool
 	RpcAllowListFilePath string
+	RpcBatchConcurrency uint
 	TraceCompatibility bool // Bug for bug compatibility for trace_ routines with OpenEthereum
 }
 
@@ -81,6 +82,7 @@ func RootCommand() (*cobra.Command, *Flags) {
 	rootCmd.PersistentFlags().BoolVar(&cfg.WebsocketEnabled, "ws", false, "Enable Websockets")
 	rootCmd.PersistentFlags().BoolVar(&cfg.WebsocketCompression, "ws.compression", false, "Enable Websocket compression (RFC 7692)")
 	rootCmd.PersistentFlags().StringVar(&cfg.RpcAllowListFilePath, "rpc.accessList", "", "Specify granular (method-by-method) API allowlist")
+	rootCmd.PersistentFlags().UintVar(&cfg.RpcBatchConcurrency, "rpc.batch.concurrency", 50, "Limits the number of goroutines used to process 1 batch request, so 1 batch request can't overload the server. 1 batch can still contain an unlimited number of requests")
 	rootCmd.PersistentFlags().BoolVar(&cfg.TraceCompatibility, "trace.compat", false, "Bug for bug compatibility with OE for trace_ routines")
 
 	if err := rootCmd.MarkPersistentFlagFilename("rpc.accessList", "json"); err != nil {
@@ -223,7 +225,7 @@ func StartRpcServer(ctx context.Context, cfg Flags, rpcAPI []rpc.API) error {
 
 	// register apis and create handler stack
 	httpEndpoint := fmt.Sprintf("%s:%d", cfg.HttpListenAddress, cfg.HttpPort)
-	srv := rpc.NewServer()
+	srv := rpc.NewServer(cfg.RpcBatchConcurrency)
 
 	allowListForRPC, err := parseAllowListForRPC(cfg.RpcAllowListFilePath)
 	if err != nil {
diff --git a/node/node.go b/node/node.go
index 333364ec50..273b66961c 100644
--- a/node/node.go
+++ b/node/node.go
@@ -94,7 +94,7 @@ func New(conf *Config) (*Node, error) {
 
 	node := &Node{
 		config: conf,
-		inprocHandler: rpc.NewServer(),
+		inprocHandler: rpc.NewServer(50),
 		log: conf.Logger,
 		stop: make(chan struct{}),
 		databases: make([]ethdb.Closer, 0),
diff --git a/node/rpcstack.go b/node/rpcstack.go
index ca5a94095d..d2d18a6503 100644
--- a/node/rpcstack.go
+++ b/node/rpcstack.go
@@ -282,7 +282,7 @@ func (h *httpServer) enableRPC(apis []rpc.API, config httpConfig, allowList rpc.
 	}
 
 	// Create RPC server and handler.
-	srv := rpc.NewServer()
+	srv := rpc.NewServer(50)
 	srv.SetAllowList(allowList)
 	if err := RegisterApisFromWhitelist(apis, config.Modules, srv, false); err != nil {
 		return err
@@ -315,7 +315,7 @@ func (h *httpServer) enableWS(apis []rpc.API, config wsConfig, allowList rpc.All
 	}
 
 	// Create RPC server and handler.
-	srv := rpc.NewServer()
+	srv := rpc.NewServer(50)
 	srv.SetAllowList(allowList)
 	if err := RegisterApisFromWhitelist(apis, config.Modules, srv, false); err != nil {
 		return err
diff --git a/rpc/client.go b/rpc/client.go
index 50600912b4..13fcbf6c29 100644
--- a/rpc/client.go
+++ b/rpc/client.go
@@ -112,7 +112,7 @@ type clientConn struct {
 
 func (c *Client) newClientConn(conn ServerCodec) *clientConn {
 	ctx := context.WithValue(context.Background(), clientContextKey{}, c)
-	handler := newHandler(ctx, conn, c.idgen, c.services, c.methodAllowList)
+	handler := newHandler(ctx, conn, c.idgen, c.services, c.methodAllowList, 50)
 	return &clientConn{conn, handler}
 }
 
diff --git a/rpc/endpoints.go b/rpc/endpoints.go
index fb28443903..1b60624059 100644
--- a/rpc/endpoints.go
+++ b/rpc/endpoints.go
@@ -27,7 +27,7 @@ import (
 func StartIPCEndpoint(ipcEndpoint string, apis []API) (net.Listener, *Server, error) {
 	// Register all the APIs exposed by the services.
 	var (
-		handler = NewServer()
+		handler = NewServer(50)
 		regMap = make(map[string]struct{})
 		registered []string
 	)
diff --git a/rpc/handler.go b/rpc/handler.go
index 52227f600d..c9a5133e0f 100644
--- a/rpc/handler.go
+++ b/rpc/handler.go
@@ -30,8 +30,6 @@ import (
 	"github.com/ledgerwatch/erigon/log"
 )
 
-const MaxGoroutinesPerBatchRequest = 50
-
 // handler handles JSON-RPC messages. There is one handler per connection. Note that
 // handler is not safe for concurrent use. Message handling never blocks indefinitely
 // because RPCs are processed on background goroutines launched by handler.
@@ -68,8 +66,9 @@ type handler struct {
 
 	allowList AllowList // a list of explicitly allowed methods, if empty -- everything is allowed
 
-	subLock sync.Mutex
-	serverSubs map[ID]*Subscription
+	subLock sync.Mutex
+	serverSubs map[ID]*Subscription
+	maxBatchConcurrency uint
 }
 
 type callProc struct {
@@ -77,7 +76,7 @@ type callProc struct {
 	notifiers []*Notifier
 }
 
-func newHandler(connCtx context.Context, conn jsonWriter, idgen func() ID, reg *serviceRegistry, allowList AllowList) *handler {
+func newHandler(connCtx context.Context, conn jsonWriter, idgen func() ID, reg *serviceRegistry, allowList AllowList, maxBatchConcurrency uint) *handler {
 	rootCtx, cancelRoot := context.WithCancel(connCtx)
 	h := &handler{
 		reg: reg,
@@ -91,6 +90,8 @@ func newHandler(connCtx context.Context, conn jsonWriter, idgen func() ID, reg *
 		serverSubs: make(map[ID]*Subscription),
 		log: log.Root(),
 		allowList: allowList,
+
+		maxBatchConcurrency: maxBatchConcurrency,
 	}
 	if conn.remoteAddr() != "" {
 		h.log = h.log.New("conn", conn.remoteAddr())
 	}
@@ -146,7 +147,7 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage, stream *jsoniter.Stream) {
 	// All goroutines will place results right to this array. Because requests order must match reply orders.
 	answersWithNils := make([]*jsonrpcMessage, len(msgs))
 	// Bounded parallelism pattern explanation https://blog.golang.org/pipelines#TOC_9.
-	boundedConcurrency := make(chan struct{}, MaxGoroutinesPerBatchRequest)
+	boundedConcurrency := make(chan struct{}, h.maxBatchConcurrency)
 	defer close(boundedConcurrency)
 	wg := sync.WaitGroup{}
 	wg.Add(len(msgs))
diff --git a/rpc/http_test.go b/rpc/http_test.go
index c06ddb6c5a..c1f7f7f486 100644
--- a/rpc/http_test.go
+++ b/rpc/http_test.go
@@ -103,7 +103,7 @@ func TestHTTPResponseWithEmptyGet(t *testing.T) {
 func TestHTTPRespBodyUnlimited(t *testing.T) {
 	const respLength = maxRequestContentLength * 3
-	s := NewServer()
+	s := NewServer(50)
 	defer s.Stop()
 	if err := s.RegisterName("test", largeRespService{respLength}); err != nil {
 		t.Fatal(err)
 	}
diff --git a/rpc/server.go b/rpc/server.go
index 247ae6693c..4fec17e35a 100644
--- a/rpc/server.go
+++ b/rpc/server.go
@@ -48,14 +48,16 @@ type Server struct {
 	idgen func() ID
 	run int32
 	codecs mapset.Set
+
+	batchConcurrency uint
 }
 
 // NewServer creates a new server instance with no registered handlers.
-func NewServer() *Server {
-	server := &Server{idgen: randomIDGenerator(), codecs: mapset.NewSet(), run: 1}
+func NewServer(batchConcurrency uint) *Server {
+	server := &Server{idgen: randomIDGenerator(), codecs: mapset.NewSet(), run: 1, batchConcurrency: batchConcurrency}
 	// Register the default service providing meta information about the RPC service such
 	// as the services and methods it offers.
-	rpcService := &RPCService{server}
+	rpcService := &RPCService{server: server}
 	server.RegisterName(MetadataApi, rpcService)
 	return server
 }
@@ -104,7 +106,7 @@ func (s *Server) serveSingleRequest(ctx context.Context, codec ServerCodec, stre
 		return
 	}
 
-	h := newHandler(ctx, codec, s.idgen, &s.services, s.methodAllowList)
+	h := newHandler(ctx, codec, s.idgen, &s.services, s.methodAllowList, s.batchConcurrency)
 	h.allowSubscribe = false
 	defer h.close(io.EOF, nil)
 
diff --git a/rpc/server_test.go b/rpc/server_test.go
index 6a2b09e449..7c226cd749 100644
--- a/rpc/server_test.go
+++ b/rpc/server_test.go
@@ -29,7 +29,7 @@ import (
 )
 
 func TestServerRegisterName(t *testing.T) {
-	server := NewServer()
+	server := NewServer(50)
 	service := new(testService)
 
 	if err := server.RegisterName("test", service); err != nil {
diff --git a/rpc/subscription_test.go b/rpc/subscription_test.go
index 54a053dba8..f96b60c067 100644
--- a/rpc/subscription_test.go
+++ b/rpc/subscription_test.go
@@ -53,7 +53,7 @@ func TestSubscriptions(t *testing.T) {
 		subCount = len(namespaces)
 		notificationCount = 3
 
-		server = NewServer()
+		server = NewServer(50)
 		clientConn, serverConn = net.Pipe()
 		out = json.NewEncoder(clientConn)
 		in = json.NewDecoder(clientConn)
diff --git a/rpc/testservice_test.go b/rpc/testservice_test.go
index 62afc1df44..e9e16f4202 100644
--- a/rpc/testservice_test.go
+++ b/rpc/testservice_test.go
@@ -26,7 +26,7 @@ import (
 )
 
 func newTestServer() *Server {
-	server := NewServer()
+	server := NewServer(50)
 	server.idgen = sequentialIDGenerator()
 	if err := server.RegisterName("test", new(testService)); err != nil {
 		panic(err)
diff --git a/rpc/websocket_test.go b/rpc/websocket_test.go
index f03b599174..6d0ba3c2bd 100644
--- a/rpc/websocket_test.go
+++ b/rpc/websocket_test.go
@@ -163,7 +163,7 @@ func TestClientWebsocketPing(t *testing.T) {
 // This checks that the websocket transport can deal with large messages.
 func TestClientWebsocketLargeMessage(t *testing.T) {
 	var (
-		srv = NewServer()
+		srv = NewServer(50)
 		httpsrv = httptest.NewServer(srv.WebsocketHandler(nil, false))
 		wsURL = "ws:" + strings.TrimPrefix(httpsrv.URL, "http:")
 	)
-- 
GitLab
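
For readers new to the bounded-parallelism pattern referenced in rpc/handler.go above (https://blog.golang.org/pipelines#TOC_9): handleBatch starts one goroutine per sub-request, uses a buffered channel as a semaphore so at most maxBatchConcurrency of them run at once, and has each goroutine write its reply into the slice slot matching the request's index so reply order matches request order. Below is a minimal, self-contained sketch of that idea; the name processBatch and the fake request strings are illustrative only and do not exist in the Erigon codebase.

package main

import (
	"fmt"
	"sync"
)

// processBatch starts one goroutine per request but allows at most
// `concurrency` of them to run simultaneously: the buffered channel acts as a
// counting semaphore. Each goroutine stores its answer at the slot matching
// the request's index, so the returned slice preserves request order.
func processBatch(requests []string, concurrency uint) []string {
	answers := make([]string, len(requests))
	semaphore := make(chan struct{}, concurrency)
	defer close(semaphore)

	var wg sync.WaitGroup
	wg.Add(len(requests))
	for i := range requests {
		go func(idx int) {
			defer wg.Done()
			semaphore <- struct{}{}        // acquire a slot; blocks while `concurrency` goroutines are busy
			defer func() { <-semaphore }() // release the slot when this goroutine finishes

			answers[idx] = "reply to " + requests[idx] // stand-in for handling one sub-request
		}(i)
	}
	wg.Wait()
	return answers
}

func main() {
	// With concurrency 2, at most 2 of the 4 requests are handled at a time,
	// yet the replies still come back in request order.
	fmt.Println(processBatch([]string{"req1", "req2", "req3", "req4"}, 2))
}

In the patch, --rpc.batch.concurrency simply sets the capacity of that semaphore channel: a larger value lets more sub-requests of one batch execute in parallel, while, as the README change notes, a batch containing a streamable request is still processed on a single goroutine.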