diff --git a/Makefile b/Makefile index 245923ef2484413a1a797293d1a20c8db7472c2b..c1a259bd6c60ad3add45e5f2b653cb21ec187b54 100644 --- a/Makefile +++ b/Makefile @@ -184,6 +184,7 @@ grpc: $(GOBUILD) -o $(GOBIN)/protoc-gen-go-grpc google.golang.org/grpc/cmd/protoc-gen-go-grpc # generates grpc services PATH=$(GOBIN):$(PATH) go generate ./ethdb PATH=$(GOBIN):$(PATH) go generate ./cmd/headers + PATH=$(GOBIN):$(PATH) go generate ./turbo/shards simulator-genesis: go run ./cmd/tester genesis > ./cmd/tester/simulator_genesis.json diff --git a/cmd/headers/download/sentry.go b/cmd/headers/download/sentry.go index e292ef48aa9d61966fb5e6bc3f036d2c84832d27..6c94a442ffb2ad16741b1f656fefc2ca8ed0cd5c 100644 --- a/cmd/headers/download/sentry.go +++ b/cmd/headers/download/sentry.go @@ -471,7 +471,7 @@ func Sentry(natSetting string, port int, sentryAddr string, coreAddr string) err } go func() { - if err1 := grpcServer.Serve(lis); err != nil { + if err1 := grpcServer.Serve(lis); err1 != nil { log.Error("Sentry P2P server fail", "err", err1) } }() diff --git a/cmd/integration/commands/flags.go b/cmd/integration/commands/flags.go index b6507899301e6409aa1ee2f5d932f8ed7f9f3e6e..8b74cde81815e081bb5b6aa27f7c64f261208889 100644 --- a/cmd/integration/commands/flags.go +++ b/cmd/integration/commands/flags.go @@ -23,6 +23,10 @@ var ( mapSizeStr string freelistReuse int migration string + dispatcherAddr string + dispatcherLatency int + shardBits int + shardID int ) func must(err error) { @@ -90,3 +94,15 @@ func withBatchSize(cmd *cobra.Command) { func withMigration(cmd *cobra.Command) { cmd.Flags().StringVar(&migration, "migration", "", "action to apply to given migration") } + +func withDispatcher(cmd *cobra.Command) { + cmd.Flags().StringVar(&dispatcherAddr, "dispatcher_addr", "", "address of shard dispatcher") + cmd.Flags().IntVar(&dispatcherLatency, "dispatcher_latency", 0, "artificial latency of dispatcher, in ms") + cmd.Flags().IntVar(&shardBits, "shard_bits", 2, "number of bits in the key used to derive shardID") +} + +func withShard(cmd *cobra.Command) { + cmd.Flags().StringVar(&dispatcherAddr, "dispatcher_addr", "", "address of shard dispatcher") + cmd.Flags().IntVar(&shardBits, "shard_bits", 2, "number of bits in the key used to derive shardID") + cmd.Flags().IntVar(&shardID, "shard_id", 0, "shard ID") +} diff --git a/cmd/integration/commands/stages.go b/cmd/integration/commands/stages.go index b0d7d087f2f3aafcc91ed96f1471a11b76567e60..412839a2539ea307da6ad41a9cafb007bb873009 100644 --- a/cmd/integration/commands/stages.go +++ b/cmd/integration/commands/stages.go @@ -2,28 +2,42 @@ package commands import ( "context" + "fmt" + "net" "path" "runtime" "sort" "strings" + "sync" + "syscall" "time" + "github.com/VictoriaMetrics/fastcache" "github.com/c2h5oh/datasize" + grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" + grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery" + grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/ledgerwatch/turbo-geth/cmd/utils" "github.com/ledgerwatch/turbo-geth/common/dbutils" "github.com/ledgerwatch/turbo-geth/common/etl" "github.com/ledgerwatch/turbo-geth/consensus/ethash" "github.com/ledgerwatch/turbo-geth/core" "github.com/ledgerwatch/turbo-geth/core/rawdb" + "github.com/ledgerwatch/turbo-geth/core/state" "github.com/ledgerwatch/turbo-geth/core/vm" "github.com/ledgerwatch/turbo-geth/eth/stagedsync" "github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages" "github.com/ledgerwatch/turbo-geth/ethdb" "github.com/ledgerwatch/turbo-geth/log" + "github.com/ledgerwatch/turbo-geth/metrics" "github.com/ledgerwatch/turbo-geth/migrations" "github.com/ledgerwatch/turbo-geth/params" + "github.com/ledgerwatch/turbo-geth/turbo/shards" "github.com/ledgerwatch/turbo-geth/turbo/torrent" "github.com/spf13/cobra" + "google.golang.org/grpc" + "google.golang.org/grpc/backoff" + "google.golang.org/grpc/keepalive" ) var cmdStageSenders = &cobra.Command{ @@ -210,6 +224,19 @@ var cmdRunMigrations = &cobra.Command{ }, } +var cmdShardDispatcher = &cobra.Command{ + Use: "shard_dispatcher", + Short: "", + RunE: func(cmd *cobra.Command, args []string) error { + ctx := utils.RootContext() + if err := shardDispatcher(ctx); err != nil { + log.Error("Starting Shard Dispatcher failed", "err", err) + return err + } + return nil + }, +} + func init() { withChaindata(cmdPrintStages) withLmdbFlags(cmdPrintStages) @@ -275,6 +302,7 @@ func init() { withBlock(cmdCallTraces) withUnwind(cmdCallTraces) withDatadir(cmdCallTraces) + withShard(cmdCallTraces) rootCmd.AddCommand(cmdCallTraces) @@ -299,6 +327,9 @@ func init() { withLmdbFlags(cmdRunMigrations) withDatadir(cmdRunMigrations) rootCmd.AddCommand(cmdRunMigrations) + + withDispatcher(cmdShardDispatcher) + rootCmd.AddCommand(cmdShardDispatcher) } func stageSenders(db ethdb.Database, ctx context.Context) error { @@ -496,7 +527,44 @@ func stageCallTraces(db ethdb.Database, ctx context.Context) error { return stagedsync.UnwindCallTraces(u, s, db, bc.Config(), bc, ch) } - if err := stagedsync.SpawnCallTraces(s, db, bc.Config(), bc, tmpdir, ch); err != nil { + var accessBuilder stagedsync.StateAccessBuilder + var toBlock uint64 + if dispatcherAddr != "" { + // CREATING GRPC CLIENT CONNECTION + var dialOpts []grpc.DialOption + dialOpts = []grpc.DialOption{ + grpc.WithConnectParams(grpc.ConnectParams{Backoff: backoff.DefaultConfig, MinConnectTimeout: 10 * time.Minute}), + grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(int(5 * datasize.MB))), + grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Timeout: 10 * time.Minute, + }), + } + + dialOpts = append(dialOpts, grpc.WithInsecure()) + + conn, err := grpc.DialContext(ctx, dispatcherAddr, dialOpts...) + if err != nil { + return fmt.Errorf("creating client connection to shard dispatcher: %w", err) + } + dispatcherClient := shards.NewDispatcherClient(conn) + var client shards.Dispatcher_StartDispatchClient + client, err = dispatcherClient.StartDispatch(ctx, &grpc.EmptyCallOption{}) + if err != nil { + return fmt.Errorf("starting shard dispatch: %w", err) + } + accessBuilder = func(db ethdb.Database, blockNumber uint64, accountCache, storageCache, codeCache, codeSizeCache *fastcache.Cache) (state.StateReader, state.WriterWithChangeSets) { + shard := shards.NewShard(db.(ethdb.HasTx).Tx(), blockNumber, client, accountCache, storageCache, codeCache, codeSizeCache, shardBits, byte(shardID)) + return shard, shard + } + toBlock = block + 10000 + } + + if err := stagedsync.SpawnCallTraces(s, db, bc.Config(), bc, tmpdir, ch, + stagedsync.CallTracesStageParams{ + AccessBuilder: accessBuilder, + PresetChanges: accessBuilder == nil, + ToBlock: toBlock, + }); err != nil { return err } return nil @@ -599,6 +667,122 @@ func removeMigration(db rawdb.DatabaseDeleter, _ context.Context) error { return nil } +func shardDispatcher(ctx context.Context) error { + // STARTING GRPC SERVER + log.Info("Starting Shard Dispatcher", "on", dispatcherAddr) + listenConfig := net.ListenConfig{ + Control: func(network, address string, _ syscall.RawConn) error { + log.Info("Shard Dispatcher received connection", "via", network, "from", address) + return nil + }, + } + lis, err := listenConfig.Listen(ctx, "tcp", dispatcherAddr) + if err != nil { + return fmt.Errorf("could not create Shard Dispatcher listener: %w, addr=%s", err, dispatcherAddr) + } + var ( + streamInterceptors []grpc.StreamServerInterceptor + unaryInterceptors []grpc.UnaryServerInterceptor + ) + if metrics.Enabled { + streamInterceptors = append(streamInterceptors, grpc_prometheus.StreamServerInterceptor) + unaryInterceptors = append(unaryInterceptors, grpc_prometheus.UnaryServerInterceptor) + } + streamInterceptors = append(streamInterceptors, grpc_recovery.StreamServerInterceptor()) + unaryInterceptors = append(unaryInterceptors, grpc_recovery.UnaryServerInterceptor()) + var grpcServer *grpc.Server + cpus := uint32(runtime.GOMAXPROCS(-1)) + opts := []grpc.ServerOption{ + grpc.NumStreamWorkers(cpus), // reduce amount of goroutines + grpc.WriteBufferSize(1024), // reduce buffers to save mem + grpc.ReadBufferSize(1024), + grpc.MaxConcurrentStreams(100), // to force clients reduce concurrency level + grpc.KeepaliveParams(keepalive.ServerParameters{ + Time: 10 * time.Minute, + }), + grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(streamInterceptors...)), + grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(unaryInterceptors...)), + } + grpcServer = grpc.NewServer(opts...) + dispatcherServer := NewDispatcherServerImpl(1 << shardBits) + shards.RegisterDispatcherServer(grpcServer, dispatcherServer) + if metrics.Enabled { + grpc_prometheus.Register(grpcServer) + } + + go func() { + if err1 := grpcServer.Serve(lis); err1 != nil { + log.Error("Shard Dispatcher failed", "err", err1) + } + }() + <-ctx.Done() + return nil +} + +type DispatcherServerImpl struct { + shards.UnimplementedDispatcherServer + connMutex sync.RWMutex + expectedConnections int + connections map[int]shards.Dispatcher_StartDispatchServer + nextConnID int +} + +func NewDispatcherServerImpl(expectedConnections int) *DispatcherServerImpl { + return &DispatcherServerImpl{ + expectedConnections: expectedConnections, + connections: make(map[int]shards.Dispatcher_StartDispatchServer), + } +} + +func (ds *DispatcherServerImpl) addConnection(connection shards.Dispatcher_StartDispatchServer) int { + ds.connMutex.Lock() + defer ds.connMutex.Unlock() + connID := ds.nextConnID + ds.nextConnID++ + ds.connections[connID] = connection + return connID +} + +func (ds *DispatcherServerImpl) removeConnection(connID int) { + ds.connMutex.Lock() + defer ds.connMutex.Unlock() + delete(ds.connections, connID) +} + +func (ds *DispatcherServerImpl) makeBroadcastList(connID int) []shards.Dispatcher_StartDispatchServer { + ds.connMutex.RLock() + defer ds.connMutex.RUnlock() + var list []shards.Dispatcher_StartDispatchServer + for id, conn := range ds.connections { + if id != connID { + list = append(list, conn) + } + } + return list +} + +func (ds *DispatcherServerImpl) StartDispatch(connection shards.Dispatcher_StartDispatchServer) error { + connID := ds.addConnection(connection) + defer ds.removeConnection(connID) + for stateRead, recvErr := connection.Recv(); recvErr == nil; stateRead, recvErr = connection.Recv() { + // Get list of connections to broadcast to + var broadcastList []shards.Dispatcher_StartDispatchServer + for broadcastList = ds.makeBroadcastList(connID); len(broadcastList) < ds.expectedConnections-1; broadcastList = ds.makeBroadcastList(connID) { + log.Info("Waiting for more connections before broadcasting", "connections left", ds.expectedConnections-len(broadcastList)-1) + time.Sleep(5 * time.Second) + } + if dispatcherLatency > 0 { + time.Sleep(time.Duration(dispatcherLatency) * time.Millisecond) + } + for _, conn := range broadcastList { + if err := conn.Send(stateRead); err != nil { + return fmt.Errorf("could not send broadcas for connection id %d: %w", connID, err) + } + } + } + return nil +} + type progressFunc func(stage stages.SyncStage) *stagedsync.StageState func newSync(quitCh <-chan struct{}, db ethdb.Database, tx ethdb.Database, hook stagedsync.ChangeSetHook) (*core.TinyChainContext, *core.BlockChain, *stagedsync.State, progressFunc) { diff --git a/eth/stagedsync/stage_call_traces.go b/eth/stagedsync/stage_call_traces.go index 09a6cefdd1a79139b4dc3bc878cae295cc69a031..d1133697ed29625f16aca6b0b7d30c17114a0fcf 100644 --- a/eth/stagedsync/stage_call_traces.go +++ b/eth/stagedsync/stage_call_traces.go @@ -33,7 +33,16 @@ const ( callIndicesCheckSizeEvery = 30 * time.Second ) -func SpawnCallTraces(s *StageState, db ethdb.Database, chainConfig *params.ChainConfig, chainContext core.ChainContext, tmpdir string, quit <-chan struct{}) error { +type StateAccessBuilder func(db ethdb.Database, blockNumber uint64, + accountCache, storageCache, codeCache, codeSizeCache *fastcache.Cache) (state.StateReader, state.WriterWithChangeSets) + +type CallTracesStageParams struct { + ToBlock uint64 // not setting this params means no limit + AccessBuilder StateAccessBuilder + PresetChanges bool // Whether to use changesets to pre-set values in the cache +} + +func SpawnCallTraces(s *StageState, db ethdb.Database, chainConfig *params.ChainConfig, chainContext core.ChainContext, tmpdir string, quit <-chan struct{}, params CallTracesStageParams) error { var tx ethdb.DbWithPendingMutations var useExternalTx bool if hasTx, ok := db.(ethdb.HasTx); ok && hasTx.Tx() != nil { @@ -49,6 +58,9 @@ func SpawnCallTraces(s *StageState, db ethdb.Database, chainConfig *params.Chain } endBlock, err := s.ExecutionAt(tx) + if params.ToBlock > 0 && params.ToBlock < endBlock { + endBlock = params.ToBlock + } logPrefix := s.state.LogPrefix() if err != nil { return fmt.Errorf("%s: getting last executed block: %w", logPrefix, err) @@ -58,7 +70,7 @@ func SpawnCallTraces(s *StageState, db ethdb.Database, chainConfig *params.Chain return nil } - if err := promoteCallTraces(logPrefix, tx, s.BlockNumber+1, endBlock, chainConfig, chainContext, tmpdir, quit); err != nil { + if err := promoteCallTraces(logPrefix, tx, s.BlockNumber+1, endBlock, chainConfig, chainContext, tmpdir, quit, params); err != nil { return err } @@ -74,7 +86,7 @@ func SpawnCallTraces(s *StageState, db ethdb.Database, chainConfig *params.Chain return nil } -func promoteCallTraces(logPrefix string, tx ethdb.Database, startBlock, endBlock uint64, chainConfig *params.ChainConfig, chainContext core.ChainContext, tmpdir string, quit <-chan struct{}) error { +func promoteCallTraces(logPrefix string, tx ethdb.Database, startBlock, endBlock uint64, chainConfig *params.ChainConfig, chainContext core.ChainContext, tmpdir string, quit <-chan struct{}, params CallTracesStageParams) error { logEvery := time.NewTicker(logInterval) defer logEvery.Stop() @@ -105,16 +117,20 @@ func promoteCallTraces(logPrefix string, tx ethdb.Database, startBlock, endBlock } prev := startBlock - accountCsKey, accountCsVal, errAcc := accountChangesCursor.Seek(dbutils.EncodeTimestamp(startBlock)) - if errAcc != nil { - return fmt.Errorf("%s: seeking in account changeset cursor: %v", logPrefix, errAcc) - } - accountsPreset := 0 - storageCsKey, storageCsVal, errSt := storageChangesCursor.Seek(dbutils.EncodeTimestamp(startBlock)) - if errSt != nil { - return fmt.Errorf("%s: seeking in storage changeset cursor: %v", logPrefix, errSt) + var accountCsKey, accountCsVal []byte + var errAcc error + var storageCsKey, storageCsVal []byte + var errSt error + if params.PresetChanges { + accountCsKey, accountCsVal, errAcc = accountChangesCursor.Seek(dbutils.EncodeTimestamp(startBlock)) + if errAcc != nil { + return fmt.Errorf("%s: seeking in account changeset cursor: %v", logPrefix, errAcc) + } + storageCsKey, storageCsVal, errSt = storageChangesCursor.Seek(dbutils.EncodeTimestamp(startBlock)) + if errSt != nil { + return fmt.Errorf("%s: seeking in storage changeset cursor: %v", logPrefix, errSt) + } } - storagePreset := 0 for blockNum := startBlock; blockNum <= endBlock; blockNum++ { if err := common.Stopped(quit); err != nil { return err @@ -138,13 +154,9 @@ func promoteCallTraces(logPrefix string, tx ethdb.Database, startBlock, endBlock log.Info(fmt.Sprintf("[%s] Progress", logPrefix), "number", blockNum, dbutils.CallFromIndex, common.StorageSize(sz), dbutils.CallToIndex, common.StorageSize(sz2), "blk/second", speed, - "accounts preset", accountsPreset, - "storage preset", storagePreset, "alloc", common.StorageSize(m.Alloc), "sys", common.StorageSize(m.Sys), "numGC", int(m.NumGC)) - accountsPreset = 0 - storagePreset = 0 case <-checkFlushEvery.C: if needFlush(froms, callIndicesMemLimit) { if err := flushBitmaps(collectorFrom, froms); err != nil { @@ -173,7 +185,29 @@ func promoteCallTraces(logPrefix string, tx ethdb.Database, startBlock, endBlock senders := rawdb.ReadSenders(tx, blockHash, blockNum) block.Body().SendersToTxs(senders) - if accountCsKey != nil { + var stateReader state.StateReader + var stateWriter state.WriterWithChangeSets + if params.AccessBuilder != nil { + reader, writer := params.AccessBuilder(tx, blockNum-1, accountCache, storageCache, codeCache, codeSizeCache) + stateReader = reader + stateWriter = writer + } else { + reader := state.NewPlainDBState(tx.(ethdb.HasTx).Tx(), blockNum-1) + writer := state.NewCacheStateWriter() + if caching { + reader.SetAccountCache(accountCache) + reader.SetStorageCache(storageCache) + reader.SetCodeCache(codeCache) + reader.SetCodeSizeCache(codeSizeCache) + writer.SetAccountCache(accountCache) + writer.SetStorageCache(storageCache) + writer.SetCodeCache(codeCache) + writer.SetCodeSizeCache(codeSizeCache) + } + stateReader = reader + stateWriter = writer + } + if params.PresetChanges && accountCsKey != nil { accountCsBlockNum, _ := dbutils.DecodeTimestamp(accountCsKey) if accountCsBlockNum == blockNum { cs := changeset.AccountChangeSetPlainBytes(accountCsVal) @@ -187,14 +221,13 @@ func promoteCallTraces(logPrefix string, tx ethdb.Database, startBlock, endBlock } else { accountCache.Set(k, v) } - accountsPreset++ return nil }); errAcc != nil { return fmt.Errorf("%s: walking in account changeset: %v", logPrefix, errAcc) } } } - if storageCsKey != nil { + if params.PresetChanges && storageCsKey != nil { storageCsBlockNum, _ := dbutils.DecodeTimestamp(storageCsKey) if storageCsBlockNum == blockNum { cs := changeset.StorageChangeSetPlainBytes(storageCsVal) @@ -208,26 +241,12 @@ func promoteCallTraces(logPrefix string, tx ethdb.Database, startBlock, endBlock } else { storageCache.Set(k, v) } - storagePreset++ return nil }); errSt != nil { return fmt.Errorf("%s: walking in storage changeset: %v", logPrefix, errSt) } } } - stateReader := state.NewPlainDBState(tx.(ethdb.HasTx).Tx(), blockNum-1) - stateWriter := state.NewCacheStateWriter() - - if caching { - stateReader.SetAccountCache(accountCache) - stateReader.SetStorageCache(storageCache) - stateReader.SetCodeCache(codeCache) - stateReader.SetCodeSizeCache(codeSizeCache) - stateWriter.SetAccountCache(accountCache) - stateWriter.SetStorageCache(storageCache) - stateWriter.SetCodeCache(codeCache) - stateWriter.SetCodeSizeCache(codeSizeCache) - } tracer := NewCallTracer() vmConfig := &vm.Config{Debug: true, NoReceipts: true, ReadOnly: false, Tracer: tracer} diff --git a/eth/stagedsync/stagebuilder.go b/eth/stagedsync/stagebuilder.go index e13be5789eb452f566b9f590888f15018f4389bc..48bd169cfd2fcdae9d7fdf0c22bda864b570208a 100644 --- a/eth/stagedsync/stagebuilder.go +++ b/eth/stagedsync/stagebuilder.go @@ -273,7 +273,8 @@ func DefaultStages() StageBuilders { Disabled: !world.storageMode.CallTraces, DisabledDescription: "Work In Progress", ExecFunc: func(s *StageState, u Unwinder) error { - return SpawnCallTraces(s, world.TX, world.chainConfig, world.chainContext, world.tmpdir, world.QuitCh) + return SpawnCallTraces(s, world.TX, world.chainConfig, world.chainContext, world.tmpdir, world.QuitCh, + CallTracesStageParams{}) }, UnwindFunc: func(u *UnwindState, s *StageState) error { return UnwindCallTraces(u, s, world.TX, world.chainConfig, world.chainContext, world.QuitCh) diff --git a/turbo/shards/shards.go b/turbo/shards/shards.go new file mode 100644 index 0000000000000000000000000000000000000000..239fa66a60785d46c061be0fa3deb440fa7485f2 --- /dev/null +++ b/turbo/shards/shards.go @@ -0,0 +1,355 @@ +package shards + +import ( + "bytes" + "context" + "encoding/binary" + "errors" + "fmt" + + "github.com/VictoriaMetrics/fastcache" + "github.com/holiman/uint256" + "github.com/ledgerwatch/turbo-geth/common" + "github.com/ledgerwatch/turbo-geth/common/dbutils" + "github.com/ledgerwatch/turbo-geth/core/state" + "github.com/ledgerwatch/turbo-geth/core/types/accounts" + "github.com/ledgerwatch/turbo-geth/crypto" + "github.com/ledgerwatch/turbo-geth/ethdb" + "github.com/ledgerwatch/turbo-geth/log" +) + +//go:generate protoc --proto_path=. --go_out=.. --go-grpc_out=.. "shards.proto" -I=. -I=./../../build/include/google + +var emptyCodeHash = crypto.Keccak256(nil) + +// Implements StateReader and StateWriter, for specified shard +type Shard struct { + tx ethdb.Tx + blockNr uint64 + accountCache *fastcache.Cache + storageCache *fastcache.Cache + codeCache *fastcache.Cache + codeSizeCache *fastcache.Cache + client Dispatcher_StartDispatchClient + shardBits int // Number of high bits in the key that determine the shard ID (maximum 7) + shardID byte // Shard ID in the lower bit +} + +func NewShard(tx ethdb.Tx, + blockNr uint64, + client Dispatcher_StartDispatchClient, + accountCache, storageCache, codeCache, codeSizeCache *fastcache.Cache, + shardBits int, + shardID byte, +) *Shard { + shard := &Shard{ + tx: tx, + blockNr: blockNr, + client: client, + shardBits: shardBits, + shardID: shardID, + } + shard.SetAccountCache(accountCache) + shard.SetStorageCache(storageCache) + shard.SetCodeCache(codeCache) + shard.SetCodeSizeCache(codeSizeCache) + return shard +} + +func (s *Shard) SetAccountCache(accountCache *fastcache.Cache) { + s.accountCache = accountCache +} + +func (s *Shard) SetStorageCache(storageCache *fastcache.Cache) { + s.storageCache = storageCache +} + +func (s *Shard) SetCodeCache(codeCache *fastcache.Cache) { + s.codeCache = codeCache +} + +func (s *Shard) SetCodeSizeCache(codeSizeCache *fastcache.Cache) { + s.codeSizeCache = codeSizeCache +} + +func (s *Shard) SetBlockNr(blockNr uint64) { + s.blockNr = blockNr +} + +func (s *Shard) GetBlockNr() uint64 { + return s.blockNr +} + +func (s *Shard) isMyShard(firstByte byte) bool { + return (firstByte >> (8 - s.shardBits)) == s.shardID +} + +func (s *Shard) ReadAccountData(address common.Address) (*accounts.Account, error) { + var enc []byte + var ok bool + if !s.isMyShard(address[0]) { + stateRead, err := s.client.Recv() + if err != nil { + log.Error("reading remote state for ReadAccountData", "address", address, "error", err) + return nil, fmt.Errorf("reading remote state for ReadAccountData %x: %w", address, err) + } + if !bytes.Equal(stateRead.K, address.Bytes()) { + log.Error("read mismatched key", "expected", address, "got", stateRead.K) + return nil, fmt.Errorf("read mismatched key, expected %x, got %x", address, stateRead.K) + } + //log.Info("ReadAccountData from remote shard", "key", stateRead.K, "val", stateRead.V) + enc = stateRead.V + } else { + if s.accountCache != nil { + enc, ok = s.accountCache.HasGet(nil, address[:]) + } + if !ok { + var err error + enc, err = state.GetAsOf(s.tx, false /* storage */, address[:], s.blockNr+1) + if err != nil && !errors.Is(err, ethdb.ErrKeyNotFound) { + return nil, err + } + } + if !ok && s.accountCache != nil { + s.accountCache.Set(address[:], enc) + } + //log.Info("sending ReadAccountData to remote shards", "address", address) + if err := s.client.Send(&StateRead{K: address.Bytes(), V: enc}); err != nil { + return nil, fmt.Errorf("sending remove state for ReadAccountData %x: %w", address, err) + } + } + if len(enc) == 0 { + return nil, nil + } + var acc accounts.Account + if err := acc.DecodeForStorage(enc); err != nil { + return nil, err + } + //restore codehash + if acc.Incarnation > 0 && acc.IsEmptyCodeHash() { + codeHash, err := s.tx.GetOne(dbutils.PlainContractCodeBucket, dbutils.PlainGenerateStoragePrefix(address[:], acc.Incarnation)) + if err != nil { + return nil, err + } + if len(codeHash) > 0 { + acc.CodeHash = common.BytesToHash(codeHash) + } + } + return &acc, nil +} + +func (s *Shard) ReadAccountStorage(address common.Address, incarnation uint64, key *common.Hash) ([]byte, error) { + compositeKey := dbutils.PlainGenerateCompositeStorageKey(address, incarnation, *key) + var enc []byte + var ok bool + if !s.isMyShard(address[0]) { + stateRead, err := s.client.Recv() + if err != nil { + return nil, fmt.Errorf("reading remote state for ReadAccountStorage %d: %w", compositeKey, err) + } + if !bytes.Equal(stateRead.K, compositeKey) { + return nil, fmt.Errorf("read mismatched key, expected %x, got %x", compositeKey, stateRead.K) + } + enc = stateRead.V + } else { + if s.storageCache != nil { + enc, ok = s.storageCache.HasGet(nil, compositeKey) + } + if !ok { + var err error + enc, err = state.GetAsOf(s.tx, true /* storage */, compositeKey, s.blockNr+1) + if err != nil && !errors.Is(err, ethdb.ErrKeyNotFound) { + return nil, err + } + } + if !ok && s.storageCache != nil { + s.storageCache.Set(compositeKey, enc) + } + if err := s.client.Send(&StateRead{K: compositeKey, V: enc}); err != nil { + return nil, fmt.Errorf("sending remove state for ReadAccountStorage %x: %w", compositeKey, err) + } + } + if len(enc) == 0 { + return nil, nil + } + return enc, nil +} + +func (s *Shard) ReadAccountCode(address common.Address, codeHash common.Hash) ([]byte, error) { + if bytes.Equal(codeHash[:], emptyCodeHash) { + return nil, nil + } + var code []byte + var ok bool + if !s.isMyShard(address[0]) { + stateRead, err := s.client.Recv() + if err != nil { + return nil, fmt.Errorf("reading remote state for ReadAccountCode %x: %w", address, err) + } + if !bytes.Equal(stateRead.K, address.Bytes()) { + return nil, fmt.Errorf("read mismatched key, expected %x, got %x", address, stateRead.K) + } + code = stateRead.V + } else { + if s.codeCache != nil { + code, ok = s.codeCache.HasGet(nil, address[:]) + } + if !ok { + var err error + code, err = ethdb.Get(s.tx, dbutils.CodeBucket, codeHash[:]) + if err != nil && !errors.Is(err, ethdb.ErrKeyNotFound) { + return nil, err + } + } + if !ok && s.codeCache != nil && len(code) <= 1024 { + s.codeCache.Set(address[:], code) + } + if !ok && s.codeSizeCache != nil { + var b [4]byte + binary.BigEndian.PutUint32(b[:], uint32(len(code))) + s.codeSizeCache.Set(address[:], b[:]) + } + if err := s.client.Send(&StateRead{K: address.Bytes(), V: code}); err != nil { + return nil, fmt.Errorf("sending remove state for ReadAccountCode %x: %w", address, err) + } + } + return code, nil +} + +func (s *Shard) ReadAccountCodeSize(address common.Address, codeHash common.Hash) (int, error) { + if bytes.Equal(codeHash[:], emptyCodeHash) { + return 0, nil + } + var size int + var ok bool + if !s.isMyShard(address[0]) { + stateRead, err := s.client.Recv() + if err != nil { + return 0, fmt.Errorf("reading remote state for ReadAccountCodeSize %x: %w", address, err) + } + if !bytes.Equal(stateRead.K, address.Bytes()) { + return 0, fmt.Errorf("read mismatched key, expected %x, got %x", address, stateRead.K) + } + size = int(binary.BigEndian.Uint32(stateRead.V)) + } else { + if s.codeSizeCache != nil { + var b []byte + if b, ok = s.codeSizeCache.HasGet(nil, address[:]); ok { + size = int(binary.BigEndian.Uint32(b)) + } + } + if !ok { + code, err := ethdb.Get(s.tx, dbutils.CodeBucket, codeHash[:]) + if err != nil && !errors.Is(err, ethdb.ErrKeyNotFound) { + return 0, err + } + size = len(code) + } + var b [4]byte + binary.BigEndian.PutUint32(b[:], uint32(size)) + if !ok && s.codeSizeCache != nil { + s.codeSizeCache.Set(address[:], b[:]) + } + if err := s.client.Send(&StateRead{K: address.Bytes(), V: b[:]}); err != nil { + return 0, fmt.Errorf("sending remove state for ReadAccountCodeSize %x: %w", address, err) + } + } + return size, nil +} + +func (s *Shard) ReadAccountIncarnation(address common.Address) (uint64, error) { + enc, err := state.GetAsOf(s.tx, false /* storage */, address[:], s.blockNr+2) + if err != nil && !errors.Is(err, ethdb.ErrKeyNotFound) { + return 0, err + } + if len(enc) == 0 { + return 0, nil + } + var acc accounts.Account + if err = acc.DecodeForStorage(enc); err != nil { + return 0, err + } + if acc.Incarnation == 0 { + return 0, nil + } + return acc.Incarnation - 1, nil +} + +func (s *Shard) UpdateAccountData(ctx context.Context, address common.Address, original, account *accounts.Account) error { + if !s.isMyShard(address[0]) { + return nil + } + if s.accountCache != nil { + value := make([]byte, account.EncodingLengthForStorage()) + account.EncodeForStorage(value) + s.accountCache.Set(address[:], value) + } + return nil +} + +func (s *Shard) UpdateAccountCode(address common.Address, incarnation uint64, codeHash common.Hash, code []byte) error { + if !s.isMyShard(address[0]) { + return nil + } + if s.codeCache != nil { + if len(code) <= 1024 { + s.codeCache.Set(address[:], code) + } else { + s.codeCache.Del(address[:]) + } + } + if s.codeSizeCache != nil { + var b [4]byte + binary.BigEndian.PutUint32(b[:], uint32(len(code))) + s.codeSizeCache.Set(address[:], b[:]) + } + return nil +} + +func (s *Shard) DeleteAccount(ctx context.Context, address common.Address, original *accounts.Account) error { + if !s.isMyShard(address[0]) { + return nil + } + if s.accountCache != nil { + s.accountCache.Set(address[:], nil) + } + if s.codeCache != nil { + s.codeCache.Set(address[:], nil) + } + if s.codeSizeCache != nil { + var b [4]byte + binary.BigEndian.PutUint32(b[:], 0) + s.codeSizeCache.Set(address[:], b[:]) + } + return nil +} + +func (s *Shard) WriteAccountStorage(ctx context.Context, address common.Address, incarnation uint64, key *common.Hash, original, value *uint256.Int) error { + if *original == *value { + return nil + } + if !s.isMyShard(address[0]) { + return nil + } + if s.storageCache != nil { + compositeKey := dbutils.PlainGenerateCompositeStorageKey(address, incarnation, *key) + s.storageCache.Set(compositeKey, value.Bytes()) + } + return nil +} + +func (s *Shard) CreateContract(address common.Address) error { + return nil +} + +func (s *Shard) WriteChangeSets() error { + return nil +} + +func (s *Shard) WriteHistory() error { + return nil +} + +func (s *Shard) ChangeSetWriter() *state.ChangeSetWriter { + return nil +} diff --git a/turbo/shards/shards.pb.go b/turbo/shards/shards.pb.go new file mode 100644 index 0000000000000000000000000000000000000000..2ccd5aa0febbd9ad2d77f5ed02b2da5df9d3ed3b --- /dev/null +++ b/turbo/shards/shards.pb.go @@ -0,0 +1,162 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.25.0 +// protoc v3.13.0 +// source: shards.proto + +package shards + +import ( + proto "github.com/golang/protobuf/proto" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// This is a compile-time assertion that a sufficiently up-to-date version +// of the legacy proto package is being used. +const _ = proto.ProtoPackageIsVersion4 + +type StateRead struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + K []byte `protobuf:"bytes,1,opt,name=k,proto3" json:"k,omitempty"` + V []byte `protobuf:"bytes,2,opt,name=v,proto3" json:"v,omitempty"` +} + +func (x *StateRead) Reset() { + *x = StateRead{} + if protoimpl.UnsafeEnabled { + mi := &file_shards_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *StateRead) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*StateRead) ProtoMessage() {} + +func (x *StateRead) ProtoReflect() protoreflect.Message { + mi := &file_shards_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use StateRead.ProtoReflect.Descriptor instead. +func (*StateRead) Descriptor() ([]byte, []int) { + return file_shards_proto_rawDescGZIP(), []int{0} +} + +func (x *StateRead) GetK() []byte { + if x != nil { + return x.K + } + return nil +} + +func (x *StateRead) GetV() []byte { + if x != nil { + return x.V + } + return nil +} + +var File_shards_proto protoreflect.FileDescriptor + +var file_shards_proto_rawDesc = []byte{ + 0x0a, 0x0c, 0x73, 0x68, 0x61, 0x72, 0x64, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, + 0x73, 0x68, 0x61, 0x72, 0x64, 0x73, 0x22, 0x27, 0x0a, 0x09, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, + 0x65, 0x61, 0x64, 0x12, 0x0c, 0x0a, 0x01, 0x6b, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x01, + 0x6b, 0x12, 0x0c, 0x0a, 0x01, 0x76, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x01, 0x76, 0x32, + 0x47, 0x0a, 0x0a, 0x44, 0x69, 0x73, 0x70, 0x61, 0x74, 0x63, 0x68, 0x65, 0x72, 0x12, 0x39, 0x0a, + 0x0d, 0x53, 0x74, 0x61, 0x72, 0x74, 0x44, 0x69, 0x73, 0x70, 0x61, 0x74, 0x63, 0x68, 0x12, 0x11, + 0x2e, 0x73, 0x68, 0x61, 0x72, 0x64, 0x73, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x65, 0x61, + 0x64, 0x1a, 0x11, 0x2e, 0x73, 0x68, 0x61, 0x72, 0x64, 0x73, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x65, + 0x52, 0x65, 0x61, 0x64, 0x28, 0x01, 0x30, 0x01, 0x42, 0x11, 0x5a, 0x0f, 0x2e, 0x2f, 0x73, 0x68, + 0x61, 0x72, 0x64, 0x73, 0x3b, 0x73, 0x68, 0x61, 0x72, 0x64, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x33, +} + +var ( + file_shards_proto_rawDescOnce sync.Once + file_shards_proto_rawDescData = file_shards_proto_rawDesc +) + +func file_shards_proto_rawDescGZIP() []byte { + file_shards_proto_rawDescOnce.Do(func() { + file_shards_proto_rawDescData = protoimpl.X.CompressGZIP(file_shards_proto_rawDescData) + }) + return file_shards_proto_rawDescData +} + +var file_shards_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_shards_proto_goTypes = []interface{}{ + (*StateRead)(nil), // 0: shards.StateRead +} +var file_shards_proto_depIdxs = []int32{ + 0, // 0: shards.Dispatcher.StartDispatch:input_type -> shards.StateRead + 0, // 1: shards.Dispatcher.StartDispatch:output_type -> shards.StateRead + 1, // [1:2] is the sub-list for method output_type + 0, // [0:1] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_shards_proto_init() } +func file_shards_proto_init() { + if File_shards_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_shards_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*StateRead); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_shards_proto_rawDesc, + NumEnums: 0, + NumMessages: 1, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_shards_proto_goTypes, + DependencyIndexes: file_shards_proto_depIdxs, + MessageInfos: file_shards_proto_msgTypes, + }.Build() + File_shards_proto = out.File + file_shards_proto_rawDesc = nil + file_shards_proto_goTypes = nil + file_shards_proto_depIdxs = nil +} diff --git a/turbo/shards/shards.proto b/turbo/shards/shards.proto new file mode 100644 index 0000000000000000000000000000000000000000..713c7684aa7de9bfbe5e4afe8e42f3a2176d9abc --- /dev/null +++ b/turbo/shards/shards.proto @@ -0,0 +1,15 @@ +syntax = "proto3"; + +package shards; + +option go_package = "./shards;shards"; + +// Provides methods to access key-value data +service Dispatcher { + rpc StartDispatch(stream StateRead) returns (stream StateRead); +} + +message StateRead { + bytes k = 1; + bytes v = 2; +} \ No newline at end of file diff --git a/turbo/shards/shards_grpc.pb.go b/turbo/shards/shards_grpc.pb.go new file mode 100644 index 0000000000000000000000000000000000000000..c71f387868142b9dc5b07d09a5bb40e49d790e04 --- /dev/null +++ b/turbo/shards/shards_grpc.pb.go @@ -0,0 +1,129 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. + +package shards + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion7 + +// DispatcherClient is the client API for Dispatcher service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type DispatcherClient interface { + StartDispatch(ctx context.Context, opts ...grpc.CallOption) (Dispatcher_StartDispatchClient, error) +} + +type dispatcherClient struct { + cc grpc.ClientConnInterface +} + +func NewDispatcherClient(cc grpc.ClientConnInterface) DispatcherClient { + return &dispatcherClient{cc} +} + +func (c *dispatcherClient) StartDispatch(ctx context.Context, opts ...grpc.CallOption) (Dispatcher_StartDispatchClient, error) { + stream, err := c.cc.NewStream(ctx, &_Dispatcher_serviceDesc.Streams[0], "/shards.Dispatcher/StartDispatch", opts...) + if err != nil { + return nil, err + } + x := &dispatcherStartDispatchClient{stream} + return x, nil +} + +type Dispatcher_StartDispatchClient interface { + Send(*StateRead) error + Recv() (*StateRead, error) + grpc.ClientStream +} + +type dispatcherStartDispatchClient struct { + grpc.ClientStream +} + +func (x *dispatcherStartDispatchClient) Send(m *StateRead) error { + return x.ClientStream.SendMsg(m) +} + +func (x *dispatcherStartDispatchClient) Recv() (*StateRead, error) { + m := new(StateRead) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// DispatcherServer is the server API for Dispatcher service. +// All implementations must embed UnimplementedDispatcherServer +// for forward compatibility +type DispatcherServer interface { + StartDispatch(Dispatcher_StartDispatchServer) error + mustEmbedUnimplementedDispatcherServer() +} + +// UnimplementedDispatcherServer must be embedded to have forward compatible implementations. +type UnimplementedDispatcherServer struct { +} + +func (UnimplementedDispatcherServer) StartDispatch(Dispatcher_StartDispatchServer) error { + return status.Errorf(codes.Unimplemented, "method StartDispatch not implemented") +} +func (UnimplementedDispatcherServer) mustEmbedUnimplementedDispatcherServer() {} + +// UnsafeDispatcherServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to DispatcherServer will +// result in compilation errors. +type UnsafeDispatcherServer interface { + mustEmbedUnimplementedDispatcherServer() +} + +func RegisterDispatcherServer(s grpc.ServiceRegistrar, srv DispatcherServer) { + s.RegisterService(&_Dispatcher_serviceDesc, srv) +} + +func _Dispatcher_StartDispatch_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(DispatcherServer).StartDispatch(&dispatcherStartDispatchServer{stream}) +} + +type Dispatcher_StartDispatchServer interface { + Send(*StateRead) error + Recv() (*StateRead, error) + grpc.ServerStream +} + +type dispatcherStartDispatchServer struct { + grpc.ServerStream +} + +func (x *dispatcherStartDispatchServer) Send(m *StateRead) error { + return x.ServerStream.SendMsg(m) +} + +func (x *dispatcherStartDispatchServer) Recv() (*StateRead, error) { + m := new(StateRead) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +var _Dispatcher_serviceDesc = grpc.ServiceDesc{ + ServiceName: "shards.Dispatcher", + HandlerType: (*DispatcherServer)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "StartDispatch", + Handler: _Dispatcher_StartDispatch_Handler, + ServerStreams: true, + ClientStreams: true, + }, + }, + Metadata: "shards.proto", +} diff --git a/turbo/shards/shards_test.go b/turbo/shards/shards_test.go new file mode 100644 index 0000000000000000000000000000000000000000..5cc5eb45e1888beb81a3232a48ad2240e765313a --- /dev/null +++ b/turbo/shards/shards_test.go @@ -0,0 +1,9 @@ +package shards + +import ( + "testing" +) + +func TestShards(t *testing.T) { + NewShard(nil, 0, nil, nil, nil, nil, nil, 2, 0) +}