diff --git a/cmd/prometheus/prometheus.yml b/cmd/prometheus/prometheus.yml
index 706c214a09abbc849ec262345f7a2d1b290ec42d..1d0e0750eae2b15bb883b2e643ad5ada1c21258b 100644
--- a/cmd/prometheus/prometheus.yml
+++ b/cmd/prometheus/prometheus.yml
@@ -22,16 +22,19 @@ scrape_configs:
         - host.docker.internal:6060 # this is how docker-for-mac allow to access host machine
         - host.docker.internal:6061
         - host.docker.internal:6062
+        - 192.168.255.138:6060
+        - 192.168.255.138:6061
 
   - job_name: turbo-geth2
     metrics_path: /debug/metrics/prometheus2
     scheme: http
     static_configs:
       - targets:
-        - turbo-geth:6060
-        - turbo-geth:6061
-        - turbo-geth:6062
-        - host.docker.internal:6060
-        - host.docker.internal:6061
-        - host.docker.internal:6062
-
+        - turbo-geth:6060
+        - turbo-geth:6061
+        - turbo-geth:6062
+        - host.docker.internal:6060
+        - host.docker.internal:6061
+        - host.docker.internal:6062
+        - 192.168.255.138:6060
+        - 192.168.255.138:6061
diff --git a/cmd/rpcdaemon/cli/config.go b/cmd/rpcdaemon/cli/config.go
index b37db531603a847f7f5cbc2e749b9f8143eb9895..485f44da1ce0ee71395841008d1c216d423634d7 100644
--- a/cmd/rpcdaemon/cli/config.go
+++ b/cmd/rpcdaemon/cli/config.go
@@ -4,6 +4,7 @@ import (
     "context"
     "fmt"
     "net/http"
+    "time"
 
     "github.com/ledgerwatch/turbo-geth/cmd/utils"
     "github.com/ledgerwatch/turbo-geth/ethdb"
@@ -121,7 +122,6 @@ func StartRpcServer(ctx context.Context, cfg Flags, rpcAPI []rpc.API) error {
     })
 
     listener, _, err := node.StartHTTPEndpoint(httpEndpoint, rpc.DefaultHTTPTimeouts, handler)
-
     if err != nil {
         return fmt.Errorf("could not start RPC api: %w", err)
     }
@@ -132,10 +132,13 @@ func StartRpcServer(ctx context.Context, cfg Flags, rpcAPI []rpc.API) error {
     log.Info("HTTP endpoint opened", "url", httpEndpoint, "ws", cfg.WebsocketEnabled)
 
     defer func() {
-        listener.Close()
+        srv.Stop()
+        shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+        defer cancel()
+        _ = listener.Shutdown(shutdownCtx)
         log.Info("HTTP endpoint closed", "url", httpEndpoint)
     }()
-    sig := <-ctx.Done()
-    log.Info("Exiting...", "signal", sig)
+    <-ctx.Done()
+    log.Info("Exiting...")
     return nil
 }
diff --git a/cmd/rpcdaemon/main.go b/cmd/rpcdaemon/main.go
index 4346744f18c540047f5b41c755d98ea9c64174cc..1fd3d5944760ab6118c701591e0d360fa58ee8e4 100644
--- a/cmd/rpcdaemon/main.go
+++ b/cmd/rpcdaemon/main.go
@@ -3,10 +3,9 @@ package main
 import (
     "os"
 
-    "github.com/ledgerwatch/turbo-geth/cmd/utils"
-
     "github.com/ledgerwatch/turbo-geth/cmd/rpcdaemon/cli"
     "github.com/ledgerwatch/turbo-geth/cmd/rpcdaemon/commands"
+    "github.com/ledgerwatch/turbo-geth/cmd/utils"
     "github.com/ledgerwatch/turbo-geth/log"
     "github.com/spf13/cobra"
 )
@@ -23,6 +22,7 @@ func main() {
             log.Error("Could not connect to remoteDb", "error", err)
             return nil
         }
+        defer db.Close()
 
         var apiList = commands.APIList(db, backend, *cfg, nil)
         return cli.StartRpcServer(cmd.Context(), *cfg, apiList)
diff --git a/cmd/rpcdaemon/test.http b/cmd/rpcdaemon/test.http
index 81fabf30d64031d660a57f91aeaea761727b3ce6..aa34022ad8ee9ec6debe6a93ed2db53dfbc66bf4 100644
--- a/cmd/rpcdaemon/test.http
+++ b/cmd/rpcdaemon/test.http
@@ -78,7 +78,7 @@ Content-Type: application/json
 
 ###
 
-POST localhost:8545
+POST 192.168.255.138:8545
 Content-Type: application/json
 
 {
diff --git a/cmd/rpctest/Readme.md b/cmd/rpctest/Readme.md
index ee647366c52d7e49d4aa648cbab23154424333d5..d52c5ad853f8a2926ff19f7e67efd86ad3a50169 100644
--- a/cmd/rpctest/Readme.md
+++ b/cmd/rpctest/Readme.md
@@ -20,7 +20,7 @@ go get -u github.com/tsenart/vegeta
 ### Run vegeta
 ```
"/var/folders/x_/1mnbt25s3291zr5_fxhjfnq9n86kng/T/" -cat $(tmpDir)/turbo_geth_stress_test/vegeta_geth_debug_storageRangeAt.csv | vegeta attack -rate=600 -format=json -duration=20s | vegeta plot > plot.html +cat $(tmpDir)/turbo_geth_stress_test/vegeta_geth_debug_storageRangeAt.csv | vegeta attack -rate=600 -format=json -duration=20s -timeout=300s | vegeta plot > plot.html open plot.html ``` @@ -37,7 +37,7 @@ GODEBUG=remotedb.debug=1 go run ./cmd/rpcdaemon --rpcapi eth,debug,net --rpcport On simple requests `eth_getBlockByNumber` RPC Daemon looks well: ``` -cat /tmp/turbo_geth_stress_test/vegeta_turbo_geth_eth_getBlockByNumber.txt | vegeta attack -rate=1000 -format=json -duration=20s | vegeta report +cat /tmp/turbo_geth_stress_test/vegeta_turbo_geth_eth_getBlockByNumber.txt | vegeta attack -rate=1000 -format=json -duration=20s -timeout=300s | vegeta report 300rps: - Geth Alone: 80% of CPU, 95-Latency 2ms @@ -61,7 +61,7 @@ cat /tmp/turbo_geth_stress_test/vegeta_turbo_geth_eth_getBlockByNumber.txt | veg On complex request - `debug_storageRangeAt` producing >600 db.View calls with twice more .Bucket/.Cursor/.Seek calls: ``` -echo "POST http://localhost:9545 \n Content-Type: application/json \n @$(pwd)/cmd/rpctest/heavyStorageRangeAt.json" | vegeta attack -rate=20 -duration=20s | vegeta report +echo "POST http://localhost:8545 \n Content-Type: application/json \n @$(pwd)/cmd/rpctest/heavyStorageRangeAt.json" | vegeta attack -rate=20 -duration=20s -timeout=300s | vegeta report 10rps, batchSize 10K: - Geth Alone: 100% of CPU, 95-Latency 15ms diff --git a/cmd/utils/cmd.go b/cmd/utils/cmd.go index 45ee81482f4f61c90b73487b0628cdb131a5d131..5a4c419fa77dc345bd7c63a3efc268d7954f0f3a 100644 --- a/cmd/utils/cmd.go +++ b/cmd/utils/cmd.go @@ -342,8 +342,8 @@ func RootContext() context.Context { defer signal.Stop(ch) select { - case <-ch: - log.Info("Got interrupt, shutting down...") + case sig := <-ch: + log.Info("Got interrupt, shutting down...", "sig", sig) case <-ctx.Done(): } }() diff --git a/docker-compose.yml b/docker-compose.yml index 45d1c671fc646d9a212557d59e66b7ea7589dac1..27928ace80711b6bb055928abb2605d61b8e0e5a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -21,7 +21,7 @@ services: - ${XDG_DATA_HOME:-~/.local/share}/tg-prometheus:/prometheus grafana: - image: grafana/grafana:7.1.5 + image: grafana/grafana:7.2.1 ports: - 3000:3000 volumes: diff --git a/eth/backend.go b/eth/backend.go index 1c2c41b4549798e89ad5fc3e052234859750bc0b..8f79cd089343f817301d36d05150ae44b67f82db 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -31,6 +31,7 @@ import ( "sync/atomic" ethereum "github.com/ledgerwatch/turbo-geth" + "google.golang.org/grpc" "google.golang.org/grpc/credentials" "github.com/ledgerwatch/turbo-geth/accounts" @@ -74,8 +75,9 @@ type Ethereum struct { dialCandidates enode.Iterator // DB interfaces - chainDb *ethdb.ObjectDatabase // Block chain database - chainKV ethdb.KV // Same as chainDb, but different interface + chainDb *ethdb.ObjectDatabase // Block chain database + chainKV ethdb.KV // Same as chainDb, but different interface + privateAPI *grpc.Server eventMux *event.TypeMux engine consensus.Engine @@ -261,9 +263,15 @@ func New(stack *node.Node, config *Config) (*Ethereum, error) { if err != nil { return nil, err } - remotedbserver.StartGrpc(chainDb.KV(), eth, stack.Config().PrivateApiAddr, &creds) + eth.privateAPI, err = remotedbserver.StartGrpc(chainDb.KV(), eth, stack.Config().PrivateApiAddr, &creds) + if err != nil { + return nil, err + } } else { - 
-        remotedbserver.StartGrpc(chainDb.KV(), eth, stack.Config().PrivateApiAddr, nil)
+        eth.privateAPI, err = remotedbserver.StartGrpc(chainDb.KV(), eth, stack.Config().PrivateApiAddr, nil)
+        if err != nil {
+            return nil, err
+        }
     }
 }
 
@@ -704,6 +712,9 @@ func (s *Ethereum) StopTxPool() error {
 func (s *Ethereum) Stop() error {
     // Stop all the peer-related stuff first.
     s.protocolManager.Stop()
+    if s.privateAPI != nil {
+        s.privateAPI.GracefulStop()
+    }
 
     // Then stop everything else.
     s.bloomIndexer.Close()
@@ -718,6 +729,5 @@ func (s *Ethereum) Stop() error {
     if s.txPool != nil {
         s.txPool.Stop()
     }
-    //s.chainDb.Close()
     return nil
 }
diff --git a/ethdb/kv_remote.go b/ethdb/kv_remote.go
index d7b43223ee495d2d83d11985e06203574eb5cf46..2d04249a526745c6b3c87465c82e93380fb6f2f2 100644
--- a/ethdb/kv_remote.go
+++ b/ethdb/kv_remote.go
@@ -5,7 +5,9 @@ import (
     "context"
     "crypto/tls"
     "crypto/x509"
+    "errors"
     "fmt"
+    "io"
     "io/ioutil"
     "net"
     "time"
@@ -18,6 +20,7 @@ import (
     "google.golang.org/grpc"
     "google.golang.org/grpc/backoff"
     "google.golang.org/grpc/credentials"
+    "google.golang.org/grpc/keepalive"
     "google.golang.org/grpc/test/bufconn"
 )
 
@@ -97,12 +100,15 @@ func (opts remoteOpts) InMem(listener *bufconn.Listener) remoteOpts {
 
 func (opts remoteOpts) Open(certFile, keyFile, caCert string) (KV, Backend, error) {
     var dialOpts []grpc.DialOption
+    dialOpts = []grpc.DialOption{
+        grpc.WithConnectParams(grpc.ConnectParams{Backoff: backoff.DefaultConfig, MinConnectTimeout: 10 * time.Minute}),
+        grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(int(5 * datasize.MB))),
+        grpc.WithKeepaliveParams(keepalive.ClientParameters{
+            Timeout: 10 * time.Minute,
+        }),
+    }
     if certFile == "" {
-        dialOpts = []grpc.DialOption{
-            grpc.WithConnectParams(grpc.ConnectParams{Backoff: backoff.DefaultConfig}),
-            grpc.WithInsecure(),
-            grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(int(5 * datasize.MB))),
-        }
+        dialOpts = append(dialOpts, grpc.WithInsecure())
     } else {
         var creds credentials.TransportCredentials
         var err error
@@ -135,11 +141,7 @@ func (opts remoteOpts) Open(certFile, keyFile, caCert string) (KV, Backend, erro
             })
         }
 
-        dialOpts = []grpc.DialOption{
-            grpc.WithConnectParams(grpc.ConnectParams{Backoff: backoff.DefaultConfig}),
-            grpc.WithTransportCredentials(creds),
-            grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(int(5 * datasize.MB))),
-        }
+        dialOpts = append(dialOpts, grpc.WithTransportCredentials(creds))
     }
 
     if opts.inMemConn != nil {
@@ -244,10 +246,7 @@ func (tx *remoteTx) Commit(ctx context.Context) error {
 
 func (tx *remoteTx) Rollback() {
     for _, c := range tx.cursors {
-        if c.stream != nil {
-            c.streamCancelFn()
-            c.stream = nil
-        }
+        c.Close()
     }
 }
 
@@ -271,15 +270,7 @@ func (tx *remoteTx) BucketSize(name string) (uint64, error) {
 
 func (tx *remoteTx) Get(bucket string, key []byte) (val []byte, err error) {
     c := tx.Cursor(bucket)
-    defer func() {
-        if v, ok := c.(*remoteCursor); ok {
-            if v.stream == nil {
-                return
-            }
-            v.streamCancelFn()
-        }
-    }()
-
+    defer c.Close()
     return c.SeekExact(key)
 }
 
@@ -326,11 +317,7 @@ func (c *remoteCursor) First() ([]byte, []byte, error) {
 // Seek - doesn't start streaming (because much of code does only several .Seek calls without reading sequence of data)
 // .Next() - does request streaming (if configured by user)
 func (c *remoteCursor) Seek(seek []byte) ([]byte, []byte, error) {
-    if c.stream != nil {
-        c.streamCancelFn() // This will close the stream and free resources
-        c.stream = nil
-        c.streamingRequested = false
-    }
+    c.closeGrpcStream()
     c.initialized = true
 
     var err error
@@ -382,12 +369,37 @@ func (c *remoteCursor) Last() ([]byte, []byte, error) {
     panic("not implemented yet")
 }
 
-func (c *remoteCursor) Close() {
-    if c.stream != nil {
+func (c *remoteCursor) closeGrpcStream() {
+    if c.stream == nil {
+        return
+    }
+    defer c.streamCancelFn() // hard cancel stream if graceful wasn't successful
+
+    if c.streamingRequested {
+        // if streaming is in progress, can't use `CloseSend` - because
+        // server will not read it right now - it is busy with streaming data
+        // TODO: set flag 'c.streamingRequested' to false when got terminator from server (nil key or io.EOF)
         c.streamCancelFn()
-        c.stream = nil
-        c.streamingRequested = false
+    } else {
+        // try graceful close stream
+        err := c.stream.CloseSend()
+        if err != nil {
+            if !errors.Is(err, io.EOF) && !errors.Is(err, context.Canceled) {
+                log.Warn("couldn't send msg CloseSend to server", "err", err)
+            }
+        } else {
+            _, err = c.stream.Recv()
+            if err != nil && !errors.Is(err, io.EOF) && !errors.Is(err, context.Canceled) {
+                log.Warn("received unexpected error from server after CloseSend", "err", err)
+            }
+        }
     }
+    c.stream = nil
+    c.streamingRequested = false
+}
+
+func (c *remoteCursor) Close() {
+    c.closeGrpcStream()
 }
 
 func (tx *remoteTx) CursorDupSort(bucket string) CursorDupSort {
@@ -420,11 +432,7 @@ func (c *remoteCursorDupSort) SeekBothExact(key, value []byte) ([]byte, []byte,
 }
 
 func (c *remoteCursorDupSort) SeekBothRange(key, value []byte) ([]byte, []byte, error) {
-    if c.stream != nil {
-        c.streamCancelFn() // This will close the stream and free resources
-        c.stream = nil
-        c.streamingRequested = false
-    }
+    c.closeGrpcStream() // TODO: if streaming not requested then no reason to close
     c.initialized = true
 
     var err error
diff --git a/ethdb/remote/remotedbserver/server.go b/ethdb/remote/remotedbserver/server.go
index 1fd2d0e6256803d71b2f63b6f54b9eae05589667..9c16a5e4e7bdeacb78bd72673e7087ef8cee89d8 100644
--- a/ethdb/remote/remotedbserver/server.go
+++ b/ethdb/remote/remotedbserver/server.go
@@ -1,6 +1,7 @@
 package remotedbserver
 
 import (
+    "fmt"
     "io"
     "net"
     "time"
@@ -8,15 +9,15 @@ import (
     grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
     grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
     grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
-    "google.golang.org/grpc"
-    "google.golang.org/grpc/credentials"
-
     "github.com/ledgerwatch/turbo-geth/common"
     "github.com/ledgerwatch/turbo-geth/core"
     "github.com/ledgerwatch/turbo-geth/ethdb"
     "github.com/ledgerwatch/turbo-geth/ethdb/remote"
     "github.com/ledgerwatch/turbo-geth/log"
     "github.com/ledgerwatch/turbo-geth/metrics"
+    "google.golang.org/grpc"
+    "google.golang.org/grpc/credentials"
+    "google.golang.org/grpc/keepalive"
 )
 
 const MaxTxTTL = 30 * time.Second
@@ -27,12 +28,11 @@ type KvServer struct {
     kv ethdb.KV
 }
 
-func StartGrpc(kv ethdb.KV, eth core.Backend, addr string, creds *credentials.TransportCredentials) {
+func StartGrpc(kv ethdb.KV, eth core.Backend, addr string, creds *credentials.TransportCredentials) (*grpc.Server, error) {
     log.Info("Starting private RPC server", "on", addr)
     lis, err := net.Listen("tcp", addr)
     if err != nil {
-        log.Error("Could not create listener", "address", addr, "err", err)
-        return
+        return nil, fmt.Errorf("could not create listener: %w, addr=%s", err, addr)
     }
 
     kvSrv := NewKvServer(kv)
@@ -49,26 +49,22 @@ func StartGrpc(kv ethdb.KV, eth core.Backend, addr string, creds *credentials.Tr
     streamInterceptors = append(streamInterceptors, grpc_recovery.StreamServerInterceptor())
     unaryInterceptors = append(unaryInterceptors, grpc_recovery.UnaryServerInterceptor())
     var grpcServer *grpc.Server
+    opts := []grpc.ServerOption{
+        grpc.WriteBufferSize(1024), // reduce buffers to save mem
+        grpc.ReadBufferSize(1024),
+        grpc.MaxConcurrentStreams(60), // to force clients reduce concurrency level
+        grpc.KeepaliveParams(keepalive.ServerParameters{
+            Time: 10 * time.Minute,
+        }),
+        grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(streamInterceptors...)),
+        grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(unaryInterceptors...)),
+    }
     if creds == nil {
-        grpcServer = grpc.NewServer(
-            grpc.NumStreamWorkers(20),  // reduce amount of goroutines
-            grpc.WriteBufferSize(1024), // reduce buffers to save mem
-            grpc.ReadBufferSize(1024),
-            grpc.MaxConcurrentStreams(40), // to force clients reduce concurency level
-            grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(streamInterceptors...)),
-            grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(unaryInterceptors...)),
-        )
+        // no specific opts
     } else {
-        grpcServer = grpc.NewServer(
-            grpc.NumStreamWorkers(20),  // reduce amount of goroutines
-            grpc.WriteBufferSize(1024), // reduce buffers to save mem
-            grpc.ReadBufferSize(1024),
-            grpc.MaxConcurrentStreams(40), // to force clients reduce concurency level
-            grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(streamInterceptors...)),
-            grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(unaryInterceptors...)),
-            grpc.Creds(*creds),
-        )
+        opts = append(opts, grpc.Creds(*creds))
     }
+    grpcServer = grpc.NewServer(opts...)
     remote.RegisterKVService(grpcServer, remote.NewKVService(kvSrv))
     remote.RegisterDBService(grpcServer, remote.NewDBService(dbSrv))
     remote.RegisterETHBACKENDService(grpcServer, remote.NewETHBACKENDService(ethBackendSrv))
@@ -82,6 +78,8 @@ func StartGrpc(kv ethdb.KV, eth core.Backend, addr string, creds *credentials.Tr
             log.Error("private RPC server fail", "err", err)
         }
     }()
+
+    return grpcServer, nil
 }
 
 func NewKvServer(kv ethdb.KV) *KvServer {
@@ -95,7 +93,7 @@ func (s *KvServer) Seek(stream remote.KV_SeekServer) error {
     }
     tx, err := s.kv.Begin(stream.Context(), nil, false)
     if err != nil {
-        return err
+        return fmt.Errorf("server-side error: %w", err)
     }
     rollback := func() {
         tx.Rollback()
@@ -115,18 +113,18 @@ func (s *KvServer) Seek(stream remote.KV_SeekServer) error {
         c = tx.Cursor(bucketName).Prefix(prefix)
         k, v, err = c.Seek(in.SeekKey)
         if err != nil {
-            return err
+            return fmt.Errorf("server-side error: %w", err)
         }
     } else {
         cd := tx.CursorDupSort(bucketName)
         k, v, err = cd.SeekBothRange(in.SeekKey, in.SeekValue)
         if err != nil {
-            return err
+            return fmt.Errorf("server-side error: %w", err)
         }
         if k == nil { // it may happen that key where we stopped disappeared after transaction reopen, then just move to next key
             k, v, err = cd.Next()
             if err != nil {
-                return err
+                return fmt.Errorf("server-side error: %w", err)
             }
         }
         c = cd
@@ -136,7 +134,7 @@ func (s *KvServer) Seek(stream remote.KV_SeekServer) error {
     for {
         err = stream.Send(&remote.Pair{Key: common.CopyBytes(k), Value: common.CopyBytes(v)})
         if err != nil {
-            return err
+            return fmt.Errorf("server-side error: %w", err)
         }
         if k == nil {
             return nil
@@ -149,35 +147,35 @@ func (s *KvServer) Seek(stream remote.KV_SeekServer) error {
             if err == io.EOF {
                 return nil
             }
-            return err
+            return fmt.Errorf("server-side error: %w", err)
         }
         if len(in.SeekValue) > 0 {
             k, v, err = c.(ethdb.CursorDupSort).SeekBothRange(in.SeekKey, in.SeekValue)
             if err != nil {
-                return err
+                return fmt.Errorf("server-side error: %w", err)
             }
             if k == nil { // it may happen that key where we stopped disappeared after transaction reopen, then just move to next key
                 k, v, err = c.Next()
                 if err != nil {
-                    return err
+                    return fmt.Errorf("server-side error: %w", err)
                 }
             }
         } else if len(in.SeekKey) > 0 {
             k, v, err = c.Seek(in.SeekKey)
             if err != nil {
-                return err
+                return fmt.Errorf("server-side error: %w", err)
             }
         } else {
            k, v, err = c.Next()
            if err != nil {
-                return err
+                return fmt.Errorf("server-side error: %w", err)
            }
         }
     } else {
         k, v, err = c.Next()
         if err != nil {
-            return err
+            return fmt.Errorf("server-side error: %w", err)
         }
     }
 
@@ -188,18 +186,19 @@ func (s *KvServer) Seek(stream remote.KV_SeekServer) error {
         tx.Rollback()
         tx, err = s.kv.Begin(stream.Context(), nil, false)
         if err != nil {
-            return err
+            return fmt.Errorf("server-side error: %w", err)
         }
+
         if isDupsort {
             dc := tx.CursorDupSort(bucketName)
             k, v, err = dc.SeekBothRange(k, v)
             if err != nil {
-                return err
+                return fmt.Errorf("server-side error: %w", err)
             }
             if k == nil { // it may happen that key where we stopped disappeared after transaction reopen, then just move to next key
                 k, v, err = dc.Next()
                 if err != nil {
-                    return err
+                    return fmt.Errorf("server-side error: %w", err)
                 }
             }
             c = dc
@@ -207,7 +206,7 @@ func (s *KvServer) Seek(stream remote.KV_SeekServer) error {
             c = tx.Cursor(bucketName).Prefix(prefix)
             k, v, err = c.Seek(k)
             if err != nil {
-                return err
+                return fmt.Errorf("server-side error: %w", err)
             }
         }
     }
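
Note on the shutdown pattern above (not part of the patch): StartGrpc now returns the *grpc.Server so Ethereum.Stop can call GracefulStop, and the rpcdaemon replaces listener.Close() with srv.Stop() plus a time-bounded listener.Shutdown. The standalone sketch below shows the same idea in isolation; runServers and the addresses are illustrative names only, not code from this repository.

```go
package main

import (
	"context"
	"net"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"

	"google.golang.org/grpc"
)

// runServers is a hypothetical helper: it starts a gRPC server and an HTTP
// server, then drains both when ctx is cancelled instead of closing their
// listeners abruptly.
func runServers(ctx context.Context, grpcAddr, httpAddr string) error {
	lis, err := net.Listen("tcp", grpcAddr)
	if err != nil {
		return err
	}
	grpcSrv := grpc.NewServer()
	go func() { _ = grpcSrv.Serve(lis) }()

	httpSrv := &http.Server{Addr: httpAddr}
	go func() { _ = httpSrv.ListenAndServe() }()

	<-ctx.Done()

	// Let in-flight RPCs finish instead of dropping them.
	grpcSrv.GracefulStop()

	// Bound the HTTP drain with a timeout, as the rpcdaemon defer does.
	shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	return httpSrv.Shutdown(shutdownCtx)
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Mirror cmd/utils.RootContext: the first interrupt triggers the graceful path.
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
	go func() {
		<-ch
		cancel()
	}()

	_ = runServers(ctx, "127.0.0.1:9090", "127.0.0.1:8545")
}
```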