diff --git a/cmd/rpcdaemon/README.md b/cmd/rpcdaemon/README.md index 267f5064698e3eb30199900125ea2f1f6afaf20c..15335ff7d72a0bb35fb7411afe9673320beb915a 100644 --- a/cmd/rpcdaemon/README.md +++ b/cmd/rpcdaemon/README.md @@ -387,11 +387,10 @@ Reduce `--private.api.ratelimit` ### Batch requests Currently batch requests are spawn multiple goroutines and process all sub-requests in parallel. -But to limit impact of 1 huge batch to other users - max amount of goroutines hardcoded to 50. -If submit 1 batch with 200 requests - RPCDaemon will spawn 50 goroutines and will use them to process 200 requests. -We can make it configurable if your use-case need it. +To limit impact of 1 huge batch to other users - added flag `--rpc.batch.concurrency` (default: 50). +Increase it to process large batches faster. -But if at least 1 request is "stremable" (has parameter of type *jsoniter.Stream) - then whole batch will processed sequentially (on 1 goroutine). +Known Issue: if at least 1 request is "stremable" (has parameter of type *jsoniter.Stream) - then whole batch will processed sequentially (on 1 goroutine). ## For Developers diff --git a/cmd/rpcdaemon/cli/config.go b/cmd/rpcdaemon/cli/config.go index 8836920cf664467bcdc1d4e6447a535478a96e23..ddc570aceb503835b7fc4d7931cf22ab000ec27f 100644 --- a/cmd/rpcdaemon/cli/config.go +++ b/cmd/rpcdaemon/cli/config.go @@ -45,6 +45,7 @@ type Flags struct { WebsocketEnabled bool WebsocketCompression bool RpcAllowListFilePath string + RpcBatchConcurrency uint TraceCompatibility bool // Bug for bug compatibility for trace_ routines with OpenEthereum } @@ -81,6 +82,7 @@ func RootCommand() (*cobra.Command, *Flags) { rootCmd.PersistentFlags().BoolVar(&cfg.WebsocketEnabled, "ws", false, "Enable Websockets") rootCmd.PersistentFlags().BoolVar(&cfg.WebsocketCompression, "ws.compression", false, "Enable Websocket compression (RFC 7692)") rootCmd.PersistentFlags().StringVar(&cfg.RpcAllowListFilePath, "rpc.accessList", "", "Specify granular (method-by-method) API allowlist") + rootCmd.PersistentFlags().UintVar(&cfg.RpcBatchConcurrency, "rpc.batch.concurrency", 50, "Does limit amount of goroutines to process 1 batch request. Means 1 bach request can't overload server. 1 batch still can have unlimited amount of request") rootCmd.PersistentFlags().BoolVar(&cfg.TraceCompatibility, "trace.compat", false, "Bug for bug compatibility with OE for trace_ routines") if err := rootCmd.MarkPersistentFlagFilename("rpc.accessList", "json"); err != nil { @@ -223,7 +225,7 @@ func StartRpcServer(ctx context.Context, cfg Flags, rpcAPI []rpc.API) error { // register apis and create handler stack httpEndpoint := fmt.Sprintf("%s:%d", cfg.HttpListenAddress, cfg.HttpPort) - srv := rpc.NewServer() + srv := rpc.NewServer(cfg.RpcBatchConcurrency) allowListForRPC, err := parseAllowListForRPC(cfg.RpcAllowListFilePath) if err != nil { diff --git a/node/node.go b/node/node.go index 333364ec50c66afab62b5d7533081d2b58e11d01..273b66961c7d25047408f760f119f165cd12ee40 100644 --- a/node/node.go +++ b/node/node.go @@ -94,7 +94,7 @@ func New(conf *Config) (*Node, error) { node := &Node{ config: conf, - inprocHandler: rpc.NewServer(), + inprocHandler: rpc.NewServer(50), log: conf.Logger, stop: make(chan struct{}), databases: make([]ethdb.Closer, 0), diff --git a/node/rpcstack.go b/node/rpcstack.go index ca5a94095dec59c0d17314b9254abd2e76e42dc6..d2d18a65037c70ac9da88bd46a04ce8c9c27d968 100644 --- a/node/rpcstack.go +++ b/node/rpcstack.go @@ -282,7 +282,7 @@ func (h *httpServer) enableRPC(apis []rpc.API, config httpConfig, allowList rpc. } // Create RPC server and handler. - srv := rpc.NewServer() + srv := rpc.NewServer(50) srv.SetAllowList(allowList) if err := RegisterApisFromWhitelist(apis, config.Modules, srv, false); err != nil { return err @@ -315,7 +315,7 @@ func (h *httpServer) enableWS(apis []rpc.API, config wsConfig, allowList rpc.All } // Create RPC server and handler. - srv := rpc.NewServer() + srv := rpc.NewServer(50) srv.SetAllowList(allowList) if err := RegisterApisFromWhitelist(apis, config.Modules, srv, false); err != nil { return err diff --git a/rpc/client.go b/rpc/client.go index 50600912b4ecb3b66f716ef9a9cab0200580ef05..13fcbf6c293b8f12e10acc8597fccad1f43fed8e 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -112,7 +112,7 @@ type clientConn struct { func (c *Client) newClientConn(conn ServerCodec) *clientConn { ctx := context.WithValue(context.Background(), clientContextKey{}, c) - handler := newHandler(ctx, conn, c.idgen, c.services, c.methodAllowList) + handler := newHandler(ctx, conn, c.idgen, c.services, c.methodAllowList, 50) return &clientConn{conn, handler} } diff --git a/rpc/endpoints.go b/rpc/endpoints.go index fb28443903e1e6f5634a36caf32e0a3ee458c76c..1b606240590a80b9d2044ab232e55f650cd95170 100644 --- a/rpc/endpoints.go +++ b/rpc/endpoints.go @@ -27,7 +27,7 @@ import ( func StartIPCEndpoint(ipcEndpoint string, apis []API) (net.Listener, *Server, error) { // Register all the APIs exposed by the services. var ( - handler = NewServer() + handler = NewServer(50) regMap = make(map[string]struct{}) registered []string ) diff --git a/rpc/handler.go b/rpc/handler.go index 52227f600d62b20dbbd48d9d93c52028948bed7e..c9a5133e0f79d30ef75548986836e2ba102f88c2 100644 --- a/rpc/handler.go +++ b/rpc/handler.go @@ -30,8 +30,6 @@ import ( "github.com/ledgerwatch/erigon/log" ) -const MaxGoroutinesPerBatchRequest = 50 - // handler handles JSON-RPC messages. There is one handler per connection. Note that // handler is not safe for concurrent use. Message handling never blocks indefinitely // because RPCs are processed on background goroutines launched by handler. @@ -68,8 +66,9 @@ type handler struct { allowList AllowList // a list of explicitly allowed methods, if empty -- everything is allowed - subLock sync.Mutex - serverSubs map[ID]*Subscription + subLock sync.Mutex + serverSubs map[ID]*Subscription + maxBatchConcurrency uint } type callProc struct { @@ -77,7 +76,7 @@ type callProc struct { notifiers []*Notifier } -func newHandler(connCtx context.Context, conn jsonWriter, idgen func() ID, reg *serviceRegistry, allowList AllowList) *handler { +func newHandler(connCtx context.Context, conn jsonWriter, idgen func() ID, reg *serviceRegistry, allowList AllowList, maxBatchConcurrency uint) *handler { rootCtx, cancelRoot := context.WithCancel(connCtx) h := &handler{ reg: reg, @@ -91,6 +90,8 @@ func newHandler(connCtx context.Context, conn jsonWriter, idgen func() ID, reg * serverSubs: make(map[ID]*Subscription), log: log.Root(), allowList: allowList, + + maxBatchConcurrency: maxBatchConcurrency, } if conn.remoteAddr() != "" { h.log = h.log.New("conn", conn.remoteAddr()) @@ -146,7 +147,7 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage, stream *jsoniter.Stream) { // All goroutines will place results right to this array. Because requests order must match reply orders. answersWithNils := make([]*jsonrpcMessage, len(msgs)) // Bounded parallelism pattern explanation https://blog.golang.org/pipelines#TOC_9. - boundedConcurrency := make(chan struct{}, MaxGoroutinesPerBatchRequest) + boundedConcurrency := make(chan struct{}, h.maxBatchConcurrency) defer close(boundedConcurrency) wg := sync.WaitGroup{} wg.Add(len(msgs)) diff --git a/rpc/http_test.go b/rpc/http_test.go index c06ddb6c5a63ae4ee1a86d565b37b9b5f241bc25..c1f7f7f486468b7ad45746023f11ea0dd76f78e9 100644 --- a/rpc/http_test.go +++ b/rpc/http_test.go @@ -103,7 +103,7 @@ func TestHTTPResponseWithEmptyGet(t *testing.T) { func TestHTTPRespBodyUnlimited(t *testing.T) { const respLength = maxRequestContentLength * 3 - s := NewServer() + s := NewServer(50) defer s.Stop() if err := s.RegisterName("test", largeRespService{respLength}); err != nil { t.Fatal(err) diff --git a/rpc/server.go b/rpc/server.go index 247ae6693c195668ecc792cd16daec31e69a0031..4fec17e35a0dfc2fc597eccf38cf4f3bd9924487 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -48,14 +48,16 @@ type Server struct { idgen func() ID run int32 codecs mapset.Set + + batchConcurrency uint } // NewServer creates a new server instance with no registered handlers. -func NewServer() *Server { - server := &Server{idgen: randomIDGenerator(), codecs: mapset.NewSet(), run: 1} +func NewServer(batchConcurrency uint) *Server { + server := &Server{idgen: randomIDGenerator(), codecs: mapset.NewSet(), run: 1, batchConcurrency: batchConcurrency} // Register the default service providing meta information about the RPC service such // as the services and methods it offers. - rpcService := &RPCService{server} + rpcService := &RPCService{server: server} server.RegisterName(MetadataApi, rpcService) return server } @@ -104,7 +106,7 @@ func (s *Server) serveSingleRequest(ctx context.Context, codec ServerCodec, stre return } - h := newHandler(ctx, codec, s.idgen, &s.services, s.methodAllowList) + h := newHandler(ctx, codec, s.idgen, &s.services, s.methodAllowList, s.batchConcurrency) h.allowSubscribe = false defer h.close(io.EOF, nil) diff --git a/rpc/server_test.go b/rpc/server_test.go index 6a2b09e44940ab4be6adaf84df5f0624b6c57ccd..7c226cd7497fc1d5c4133d4255222a212e482514 100644 --- a/rpc/server_test.go +++ b/rpc/server_test.go @@ -29,7 +29,7 @@ import ( ) func TestServerRegisterName(t *testing.T) { - server := NewServer() + server := NewServer(50) service := new(testService) if err := server.RegisterName("test", service); err != nil { diff --git a/rpc/subscription_test.go b/rpc/subscription_test.go index 54a053dba8052c425a2b2a1c92b1ed39bc7b3e06..f96b60c067019b60a2cfdc5a7bb06f9218315f1c 100644 --- a/rpc/subscription_test.go +++ b/rpc/subscription_test.go @@ -53,7 +53,7 @@ func TestSubscriptions(t *testing.T) { subCount = len(namespaces) notificationCount = 3 - server = NewServer() + server = NewServer(50) clientConn, serverConn = net.Pipe() out = json.NewEncoder(clientConn) in = json.NewDecoder(clientConn) diff --git a/rpc/testservice_test.go b/rpc/testservice_test.go index 62afc1df44f4889be4debd0e48d614a5d2e75965..e9e16f4202f109a7f0510b5f9834f926ecf02349 100644 --- a/rpc/testservice_test.go +++ b/rpc/testservice_test.go @@ -26,7 +26,7 @@ import ( ) func newTestServer() *Server { - server := NewServer() + server := NewServer(50) server.idgen = sequentialIDGenerator() if err := server.RegisterName("test", new(testService)); err != nil { panic(err) diff --git a/rpc/websocket_test.go b/rpc/websocket_test.go index f03b599174be935b1744045d9d39e5517869e9ec..6d0ba3c2bd3e27187aa7d7b215760a2a3cd70ea7 100644 --- a/rpc/websocket_test.go +++ b/rpc/websocket_test.go @@ -163,7 +163,7 @@ func TestClientWebsocketPing(t *testing.T) { // This checks that the websocket transport can deal with large messages. func TestClientWebsocketLargeMessage(t *testing.T) { var ( - srv = NewServer() + srv = NewServer(50) httpsrv = httptest.NewServer(srv.WebsocketHandler(nil, false)) wsURL = "ws:" + strings.TrimPrefix(httpsrv.URL, "http:") )