diff --git a/Makefile b/Makefile index db975a22bd58aa6bc34d01b5b14309dff19197e1..f9cf23b2f305c250ca02b470f56b194002573807 100644 --- a/Makefile +++ b/Makefile @@ -86,7 +86,7 @@ lintci: lintci-deps: rm -f ./build/bin/golangci-lint - curl -sfL https://install.goreleaser.com/github.com/golangci/golangci-lint.sh | sh -s -- -b ./build/bin v1.20.1 + curl -sfL https://install.goreleaser.com/github.com/golangci/golangci-lint.sh | sh -s -- -b ./build/bin v1.21.0 clean: ./build/clean_go_build_cache.sh diff --git a/accounts/accounts.go b/accounts/accounts.go index 930033aa03861927eb4f5ce6ce0ed18306228aeb..25892650b3e859cb73e2cb4d2a7fa29a2cb5bb3c 100644 --- a/accounts/accounts.go +++ b/accounts/accounts.go @@ -169,6 +169,8 @@ type Backend interface { // Subscribe creates an async subscription to receive notifications when the // backend detects the arrival or departure of a wallet. Subscribe(sink chan<- WalletEvent) event.Subscription + + Close() } // TextHash is a helper function that calculates a hash for the given message that can be diff --git a/accounts/external/backend.go b/accounts/external/backend.go index c0e4a6a71f81b19fed3442ec9f22a5c48aeb3fd1..c1be78cc0254c0d60925e42af4250fa2e45a8571 100644 --- a/accounts/external/backend.go +++ b/accounts/external/backend.go @@ -58,6 +58,12 @@ func (eb *ExternalBackend) Subscribe(sink chan<- accounts.WalletEvent) event.Sub }) } +func (eb *ExternalBackend) Close() { + for _, w := range eb.signers { + w.Close() + } +} + // ExternalSigner provides an API to interact with an external signer (clef) // It proxies request to the external signer while forwarding relevant // request headers diff --git a/accounts/keystore/keystore.go b/accounts/keystore/keystore.go index 44d87122769f413dda6aa015b0b940b74d35a090..17ab96c003b06c013439c7bac807faab5d1571ae 100644 --- a/accounts/keystore/keystore.go +++ b/accounts/keystore/keystore.go @@ -29,7 +29,6 @@ import ( "os" "path/filepath" "reflect" - "runtime" "sync" "time" @@ -101,12 +100,6 @@ func (ks *KeyStore) init(keydir string) { ks.unlocked = make(map[common.Address]*unlocked) ks.cache, ks.changes = newAccountCache(keydir) - // TODO: In order for this finalizer to work, there must be no references - // to ks. addressCache doesn't keep a reference but unlocked keys do, - // so the finalizer will not trigger until all timed unlocks have expired. - runtime.SetFinalizer(ks, func(m *KeyStore) { - m.cache.close() - }) // Create the initial list of wallets from the cache accs := ks.cache.accounts() ks.wallets = make([]accounts.Wallet, len(accs)) @@ -487,6 +480,10 @@ func (ks *KeyStore) ImportPreSaleKey(keyJSON []byte, passphrase string) (account return a, nil } +func (ks *KeyStore) Close() { + ks.cache.close() +} + // zeroKey zeroes a private key in memory. func zeroKey(k *ecdsa.PrivateKey) { b := k.D.Bits() diff --git a/accounts/keystore/watch.go b/accounts/keystore/watch.go index 8dce029a6688a19d8d24547010285c81a1c11ab0..de2c514869095abf44a6e00ffefe07c2477cf82a 100644 --- a/accounts/keystore/watch.go +++ b/accounts/keystore/watch.go @@ -19,10 +19,12 @@ package keystore import ( + "sync/atomic" "time" - "github.com/ledgerwatch/turbo-geth/log" "github.com/rjeczalik/notify" + + "github.com/ledgerwatch/turbo-geth/log" ) type watcher struct { @@ -33,6 +35,8 @@ type watcher struct { quit chan struct{} } +var watcherCount = new(int32) + func newWatcher(ac *accountCache) *watcher { return &watcher{ ac: ac, @@ -57,6 +61,8 @@ func (w *watcher) close() { } func (w *watcher) loop() { + atomic.AddInt32(watcherCount, 1) + defer func() { w.ac.mu.Lock() w.running = false @@ -69,7 +75,14 @@ func (w *watcher) loop() { logger.Trace("Failed to watch keystore folder", "err", err) return } - defer notify.Stop(w.ev) + + defer func() { + notify.Stop(w.ev) + if count := atomic.AddInt32(watcherCount, -1); count == 0 { + notify.Close() + } + }() + logger.Trace("Started watching keystore folder") defer logger.Trace("Stopped watching keystore folder") diff --git a/accounts/manager.go b/accounts/manager.go index c0a77f1ae7bfc201aa9213108e87a9eed44ba857..c1311fd0c433f6b564a66157b5a3e4237e18d542 100644 --- a/accounts/manager.go +++ b/accounts/manager.go @@ -70,7 +70,7 @@ func NewManager(config *Config, backends ...Backend) *Manager { updaters: subs, updates: updates, wallets: wallets, - quit: make(chan chan error), + quit: make(chan chan error, 1), } for _, backend := range backends { kind := reflect.TypeOf(backend) @@ -85,6 +85,13 @@ func NewManager(config *Config, backends ...Backend) *Manager { func (am *Manager) Close() error { errc := make(chan error) am.quit <- errc + + for _, backs := range am.backends { + for _, b := range backs { + b.Close() + } + } + return <-errc } diff --git a/accounts/scwallet/hub.go b/accounts/scwallet/hub.go index bede88781af4aceb69193611813f494b54e37bcc..2abbc930245089e341345b808cc5f870f4a1c1b7 100644 --- a/accounts/scwallet/hub.go +++ b/accounts/scwallet/hub.go @@ -300,3 +300,7 @@ func (hub *Hub) updater() { hub.stateLock.Unlock() } } + +func (hub *Hub) Close() { + close(hub.quit) +} diff --git a/accounts/usbwallet/hub.go b/accounts/usbwallet/hub.go index 7d7354d30cbf326a00323976b9b1c6d45ddf48a0..372d76b136cfe7b4cd3fa71e9a4631580421fc49 100644 --- a/accounts/usbwallet/hub.go +++ b/accounts/usbwallet/hub.go @@ -278,3 +278,7 @@ func (hub *Hub) updater() { hub.stateLock.Unlock() } } + +func (hub *Hub) Close() { + close(hub.quit) +} diff --git a/cmd/state/deps.go b/cmd/state/deps.go index 1a46775711301f8d3c0686797914586b76f8773f..a53ae159c23da8c97246f2694cdb2524feaafb2a 100644 --- a/cmd/state/deps.go +++ b/cmd/state/deps.go @@ -52,9 +52,9 @@ func (dt *DepTracer) CaptureState(env *vm.EVM, pc uint64, op vm.OpCode, gas, cos if smap, ok := dt.storageWriteSetFrame[addr]; ok { smap[loc] = struct{}{} } else { - smap_dest := make(map[common.Hash]struct{}) - smap_dest[loc] = struct{}{} - dt.storageWriteSetFrame[addr] = smap_dest + smapDest := make(map[common.Hash]struct{}) + smapDest[loc] = struct{}{} + dt.storageWriteSetFrame[addr] = smapDest } } else if op == vm.SLOAD { addr := contract.Address() @@ -65,9 +65,9 @@ func (dt *DepTracer) CaptureState(env *vm.EVM, pc uint64, op vm.OpCode, gas, cos if smap, ok := dt.storageReadSet[addr]; ok { smap[loc] = struct{}{} } else { - smap_dest := make(map[common.Hash]struct{}) - smap_dest[loc] = struct{}{} - dt.storageReadSet[addr] = smap_dest + smapDest := make(map[common.Hash]struct{}) + smapDest[loc] = struct{}{} + dt.storageReadSet[addr] = smapDest } } return nil @@ -84,9 +84,9 @@ func (dt *DepTracer) CaptureEnd(depth int, output []byte, gasUsed uint64, t time dt.accountsWriteSet[addr] = struct{}{} } for addr, smap := range dt.storageWriteSetFrame { - if smap_dest, ok := dt.storageWriteSet[addr]; ok { + if smapDest, ok := dt.storageWriteSet[addr]; ok { for loc := range smap { - smap_dest[loc] = struct{}{} + smapDest[loc] = struct{}{} } } else { dt.storageWriteSet[addr] = smap diff --git a/core/blockchain.go b/core/blockchain.go index e0ffca15871934c74b3be187c56e3f7bc5d7e035..c2907735041b0927809e5a6e1ec008f98d57cf65 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -178,6 +178,7 @@ type BlockChain struct { // procInterrupt must be atomically called procInterrupt int32 // interrupt signaler for block processing wg sync.WaitGroup // chain processing wait group for shutting down + quitMu sync.RWMutex engine consensus.Engine validator Validator // Block and state validator interface @@ -876,8 +877,9 @@ func (bc *BlockChain) Stop() { // Unsubscribe all subscriptions registered from blockchain bc.scope.Close() close(bc.quit) - atomic.StoreInt32(&bc.procInterrupt, 1) - bc.wg.Wait() + + bc.waitJobs() + if bc.pruner != nil { bc.pruner.Stop() } @@ -989,8 +991,10 @@ type numberHash struct { func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain []types.Receipts, ancientLimit uint64) (int, error) { // We don't require the chainMu here since we want to maximize the // concurrency of header insertion and receipt insertion. - bc.wg.Add(1) - defer bc.wg.Done() + if err := bc.addJob(); err != nil { + return 0, nil + } + defer bc.doneJob() var ( ancientBlocks, liveBlocks types.Blocks @@ -1058,7 +1062,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ var deleted []*numberHash for i, block := range blockChain { // Short circuit insertion if shutting down or processing failed - if atomic.LoadInt32(&bc.procInterrupt) == 1 { + if bc.getProcInterrupt() { return 0, errInsertionInterrupted } // Short circuit insertion if it is required(used in testing only) @@ -1131,7 +1135,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ batch := bc.db.NewBatch() for i, block := range blockChain { // Short circuit insertion if shutting down or processing failed - if atomic.LoadInt32(&bc.procInterrupt) == 1 { + if bc.getProcInterrupt() { return 0, errInsertionInterrupted } // Short circuit if the owner header is unknown @@ -1198,14 +1202,14 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ return 0, nil } -var lastWrite uint64 - // writeBlockWithoutState writes only the block and its metadata to the database, // but does not write any state. This is used to construct competing side forks // up to the point where they exceed the canonical total difficulty. func (bc *BlockChain) writeBlockWithoutState(block *types.Block, td *big.Int) (err error) { - bc.wg.Add(1) - defer bc.wg.Done() + if err := bc.addJob(); err != nil { + return nil + } + defer bc.doneJob() if err := bc.hc.WriteTd(bc.db, block.Hash(), block.NumberU64(), td); err != nil { return err @@ -1218,8 +1222,10 @@ func (bc *BlockChain) writeBlockWithoutState(block *types.Block, td *big.Int) (e // writeKnownBlock updates the head block flag with a known block // and introduces chain reorg if necessary. func (bc *BlockChain) writeKnownBlock(block *types.Block) error { - bc.wg.Add(1) - defer bc.wg.Done() + if err := bc.addJob(); err != nil { + return nil + } + defer bc.doneJob() current := bc.CurrentBlock() if block.ParentHash() != current.Hash() { @@ -1246,8 +1252,10 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types. // writeBlockWithState writes the block and all associated state to the database, // but is expects the chain mutex to be held. func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.Receipt, stateDb *state.IntraBlockState, tds *state.TrieDbState) (status WriteStatus, err error) { - bc.wg.Add(1) - defer bc.wg.Done() + if err = bc.addJob(); err != nil { + return NonStatTy, nil + } + defer bc.doneJob() // Make sure no inconsistent state is leaked during insertion currentBlock := bc.CurrentBlock() @@ -1373,12 +1381,14 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) { } // Only insert if the difficulty of the inserted chain is bigger than existing chain // Pre-checks passed, start the full block imports - bc.wg.Add(1) + if err := bc.addJob(); err != nil { + return 0, nil + } + defer bc.doneJob() ctx := bc.WithContext(context.Background(), chain[0].Number()) bc.chainmu.Lock() n, events, logs, err := bc.insertChain(ctx, chain, true) bc.chainmu.Unlock() - bc.wg.Done() bc.PostChainEvents(events, logs) return n, err @@ -1395,7 +1405,7 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) { func (bc *BlockChain) insertChain(ctx context.Context, chain types.Blocks, verifySeals bool) (int, []interface{}, []*types.Log, error) { log.Info("Inserting chain", "start", chain[0].NumberU64(), "end", chain[len(chain)-1].NumberU64()) // If the chain is terminating, don't even bother starting u - if atomic.LoadInt32(&bc.procInterrupt) == 1 { + if bc.getProcInterrupt() { return 0, nil, nil, nil } // Start a parallel signature recovery (signer will fluke on fork transition, minimal perf loss) @@ -1497,7 +1507,7 @@ func (bc *BlockChain) insertChain(ctx context.Context, chain types.Blocks, verif k = i - offset } // If the chain is terminating, stop processing blocks - if atomic.LoadInt32(&bc.procInterrupt) == 1 { + if bc.getProcInterrupt() { log.Debug("Premature abort during blocks processing") break } @@ -2036,8 +2046,10 @@ func (bc *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (i bc.chainmu.Lock() defer bc.chainmu.Unlock() - bc.wg.Add(1) - defer bc.wg.Done() + if err := bc.addJob(); err != nil { + return 0, nil + } + defer bc.doneJob() whFunc := func(header *types.Header) error { _, err := bc.hc.WriteHeader(header) @@ -2198,3 +2210,25 @@ type Pruner interface { Start() error Stop() } + +func (bc *BlockChain) addJob() error { + bc.quitMu.RLock() + defer bc.quitMu.RUnlock() + if bc.getProcInterrupt() { + return errors.New("blockchain is stopped") + } + bc.wg.Add(1) + + return nil +} + +func (bc *BlockChain) doneJob() { + bc.wg.Done() +} + +func (bc *BlockChain) waitJobs() { + bc.quitMu.Lock() + atomic.StoreInt32(&bc.procInterrupt, 1) + bc.wg.Wait() + bc.quitMu.Unlock() +} diff --git a/dashboard/log.go b/dashboard/log.go index 4213eb54e6ecdbd8b96c2025349fc6c5ddf169f6..f5c57329ef7e09248f85f4de16473b5a118150ba 100644 --- a/dashboard/log.go +++ b/dashboard/log.go @@ -26,9 +26,10 @@ import ( "sort" "time" - "github.com/ledgerwatch/turbo-geth/log" "github.com/mohae/deepcopy" "github.com/rjeczalik/notify" + + "github.com/ledgerwatch/turbo-geth/log" ) var emptyChunk = json.RawMessage("[]") diff --git a/eth/backend.go b/eth/backend.go index 06d445d56c67288cfb5849d2730ed15ba82b91f9..aa1d40343cc9b96287df59a04d0a0105e06f4e65 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -576,7 +576,7 @@ func (s *Ethereum) Stop() error { s.lesServer.Stop() } s.txPool.Stop() - //s.miner.Stop() + //s.miner.Close() s.eventMux.Stop() s.chainDb.Close() diff --git a/eth/fetcher/fetcher.go b/eth/fetcher/fetcher.go index 2377813eb7b39b03d3c20974d39b986af47c4018..0975dc473fa6e603b6274d24fba599f520e9deb3 100644 --- a/eth/fetcher/fetcher.go +++ b/eth/fetcher/fetcher.go @@ -640,7 +640,12 @@ func (f *Fetcher) insert(peer string, block *types.Block) { // Run the import on a new thread log.Debug("Importing propagated block", "peer", peer, "number", block.Number(), "hash", hash) go func() { - defer func() { f.done <- hash }() + defer func() { + select { + case <-f.quit: + case f.done <- hash: + } + }() // If the parent's unknown, abort insertion parent := f.getBlock(block.ParentHash()) diff --git a/eth/filters/api.go b/eth/filters/api.go index e056fda15f38cb8e1895e264c52907f6571fcbfa..e62bcec64addb8f5662ccea9e557fee3c11ca930 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -66,6 +66,7 @@ func NewPublicFilterAPI(backend Backend, lightMode bool) *PublicFilterAPI { api := &PublicFilterAPI{ backend: backend, mux: backend.EventMux(), + quit: make(chan struct{}, 1), chainDb: backend.ChainDb(), events: NewEventSystem(backend.EventMux(), backend, lightMode), filters: make(map[rpc.ID]*filter), @@ -80,7 +81,13 @@ func NewPublicFilterAPI(backend Backend, lightMode bool) *PublicFilterAPI { func (api *PublicFilterAPI) timeoutLoop() { ticker := time.NewTicker(5 * time.Minute) for { - <-ticker.C + select { + case <-ticker.C: + //nothing to do + case <-api.quit: + return + } + api.filtersMu.Lock() for id, f := range api.filters { select { @@ -95,6 +102,10 @@ func (api *PublicFilterAPI) timeoutLoop() { } } +func (api *PublicFilterAPI) Close() { + close(api.quit) +} + // NewPendingTransactionFilter creates a filter that fetches pending transaction hashes // as transactions enter the pending state. // @@ -126,6 +137,8 @@ func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID { delete(api.filters, pendingTxSub.ID) api.filtersMu.Unlock() return + case <-api.quit: + return } } }() @@ -161,6 +174,8 @@ func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Su case <-notifier.Closed(): pendingTxSub.Unsubscribe() return + case <-api.quit: + return } } }() @@ -196,6 +211,8 @@ func (api *PublicFilterAPI) NewBlockFilter() rpc.ID { delete(api.filters, headerSub.ID) api.filtersMu.Unlock() return + case <-api.quit: + return } } }() @@ -226,6 +243,8 @@ func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, er case <-notifier.Closed(): headersSub.Unsubscribe() return + case <-api.quit: + return } } }() @@ -264,6 +283,8 @@ func (api *PublicFilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc case <-notifier.Closed(): // connection dropped logsSub.Unsubscribe() return + case <-api.quit: + return } } }() @@ -313,6 +334,8 @@ func (api *PublicFilterAPI) NewFilter(crit FilterCriteria) (rpc.ID, error) { delete(api.filters, logsSub.ID) api.filtersMu.Unlock() return + case <-api.quit: + return } } }() diff --git a/go.mod b/go.mod index 24b57e2583a3f092ff5fbd19dc56aca274f35189..2561f5797561096250b4885b0c0e73ac65278844 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/ledgerwatch/turbo-geth -go 1.12 +go 1.13 require ( github.com/Azure/azure-storage-blob-go v0.8.0 @@ -67,7 +67,7 @@ require ( github.com/wsddn/go-ecdh v0.0.0-20161211032359-48726bab9208 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 golang.org/x/net v0.0.0-20191014212845-da9a3fd4c582 - golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb + golang.org/x/sys v0.0.0-20191105231009-c1f44814a5cd golang.org/x/text v0.3.0 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce @@ -75,3 +75,5 @@ require ( gopkg.in/sourcemap.v1 v1.0.5 // indirect gotest.tools v2.2.0+incompatible // indirect ) + +replace github.com/rjeczalik/notify => github.com/JekaMas/notify v0.9.4 diff --git a/go.sum b/go.sum index f564552c40f83c1b31bb68211128388b29457919..ff37d18a2529a798f0ddfb792fa4a0d910001120 100644 --- a/go.sum +++ b/go.sum @@ -21,6 +21,8 @@ github.com/Azure/go-autorest/logger v0.1.0/go.mod h1:oExouG+K6PryycPJfVSxi/koC6L github.com/Azure/go-autorest/tracing v0.5.0 h1:TRn4WjSnkcSy5AEG3pnbtFSwNtwzjr4VYyQflFE619k= github.com/Azure/go-autorest/tracing v0.5.0/go.mod h1:r/s2XiOKccPW3HrqB+W0TQzfbtp2fGCgRFtBroKn4Dk= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/JekaMas/notify v0.9.4 h1:Ns+DRf9kho8T0yQNSKoZqAnbvO/Hg3KmJCUaoRhL7MM= +github.com/JekaMas/notify v0.9.4/go.mod h1:KYZd45vBSOYP2/9lY38EjZtvKRZMfgWaJk8bvBxhIYk= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d h1:G0m3OIz70MZUWq3EgK3CesDbo8upS2Vm9/P3FtgI+Jk= github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg= @@ -190,7 +192,6 @@ github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/tsdb v0.10.0 h1:If5rVCMTp6W2SiRAQFlbpJNgVlgMEd+U2GZckwK38ic= github.com/prometheus/tsdb v0.10.0/go.mod h1:oi49uRhEe9dPUTlS3JRZOwJuVi6tmh10QSgwXEyGCt4= -github.com/rjeczalik/notify v0.9.1 h1:CLCKso/QK1snAlnhNR/CNvNiFU2saUtjV0bx3EwNeCE= github.com/rjeczalik/notify v0.9.1/go.mod h1:rKwnCoCGeuQnwBtTSPL9Dad03Vh2n40ePRrjvIXnJho= github.com/robertkrimen/otto v0.0.0-20170205013659-6a77b7cbc37d h1:ouzpe+YhpIfnjR40gSkJHWsvXmB6TiPKqMtMpfyU9DE= github.com/robertkrimen/otto v0.0.0-20170205013659-6a77b7cbc37d/go.mod h1:xvqspoSXJTIpemEonrMDFq6XzwHYYgToXWj5eRX1OtY= @@ -259,6 +260,7 @@ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 h1:YUO/7uOKsKeq9UokNS62b8FYywz3ker1l1vDZRCRefw= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20180926160741-c2ed4eda69e7/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5 h1:mzjBh+S5frKOsOBobWIMAbXavqjmgO17k/2puhcFR94= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -267,6 +269,8 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb h1:fgwFCsaw9buMuxNd6+DQfAuSFqbNiQZpcgJQAgJsK6k= golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191105231009-c1f44814a5cd h1:3x5uuvBgE6oaXJjCOvpCC1IpgJogqQ+PqGGU3ZxAgII= +golang.org/x/sys v0.0.0-20191105231009-c1f44814a5cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/miner/miner.go b/miner/miner.go index 6af8284c641507bcd76f42b5335bd25871750293..eb6aa1d02174994e924ddee11b096500ed6772ed 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -20,6 +20,7 @@ package miner import ( "fmt" "math/big" + "sync" "sync/atomic" "time" @@ -55,12 +56,13 @@ type Config struct { // Miner creates blocks and searches for proof-of-work values. type Miner struct { - mux *event.TypeMux - worker *worker - coinbase common.Address - eth Backend - engine consensus.Engine - exitCh chan struct{} + mux *event.TypeMux + worker *worker + coinbase common.Address + coinbaseMu sync.RWMutex + eth Backend + engine consensus.Engine + exitCh chan struct{} canStart int32 // can start indicates whether we can start the mining operation shouldStart int32 // should start indicates whether we should start after sync @@ -108,7 +110,7 @@ func (mnr *Miner) update() { atomic.StoreInt32(&mnr.canStart, 1) atomic.StoreInt32(&mnr.shouldStart, 0) if shouldStart { - mnr.Start(mnr.coinbase) + mnr.Start(mnr.getCoinbase()) } // stop immediately and ignore all further pending events return @@ -136,6 +138,7 @@ func (mnr *Miner) Stop() { } func (mnr *Miner) Close() { + mnr.worker.stop() mnr.worker.close() close(mnr.exitCh) } @@ -179,6 +182,18 @@ func (mnr *Miner) PendingBlock() *types.Block { } func (mnr *Miner) SetEtherbase(addr common.Address) { - mnr.coinbase = addr + mnr.setCoinbase(addr) mnr.worker.setEtherbase(addr) } + +func (mnr *Miner) getCoinbase() common.Address { + mnr.coinbaseMu.RLock() + defer mnr.coinbaseMu.RUnlock() + return mnr.coinbase +} + +func (mnr *Miner) setCoinbase(addr common.Address) { + mnr.coinbaseMu.Lock() + mnr.coinbase = addr + mnr.coinbaseMu.Unlock() +} diff --git a/miner/worker.go b/miner/worker.go index 5527c408d7520a1fe5af48afe3a6a8b2607678b0..6f2024229d0e4ce82ecf8a6a641b0f3e8b94842d 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -196,8 +196,8 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus txsCh: make(chan core.NewTxsEvent, txChanSize), chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize), chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize), - newWorkCh: make(chan *newWorkReq), - taskCh: make(chan *task), + newWorkCh: make(chan *newWorkReq, 1), + taskCh: make(chan *task, 1), resultCh: make(chan *types.Block, resultQueueSize), exitCh: make(chan struct{}), startCh: make(chan struct{}, 1), diff --git a/node/node.go b/node/node.go index ea24cd3fba2af82cfd6e850d3801e86b8cace0d5..625e739d0ae0a770968d48703b32dbfa7f2afca0 100644 --- a/node/node.go +++ b/node/node.go @@ -438,6 +438,17 @@ func (n *Node) Stop() error { n.stopWS() n.stopHTTP() n.stopIPC() + + type closer interface { + Close() + } + + for _, api := range n.rpcAPIs { + if closeAPI, ok := api.Service.(closer); ok { + closeAPI.Close() + } + } + n.rpcAPIs = nil failure := &StopError{ Services: make(map[reflect.Type]error), diff --git a/p2p/dial.go b/p2p/dial.go index 87cd77d75d82746203f8b7cd444795edc8dc5ae2..b4327bd1d3aabe6c4d75eff4fe08616b876ccee1 100644 --- a/p2p/dial.go +++ b/p2p/dial.go @@ -356,7 +356,7 @@ func (t *discoverTask) Do(srv *Server) { // event loop spins too fast. next := srv.lastLookup.Add(lookupInterval) if now := time.Now(); now.Before(next) { - time.Sleep(next.Sub(now)) + sleep(next.Sub(now), srv.quit) } srv.lastLookup = time.Now() t.results = srv.ntab.LookupRandom() @@ -370,9 +370,21 @@ func (t *discoverTask) String() string { return s } -func (t waitExpireTask) Do(*Server) { - time.Sleep(t.Duration) +func (t waitExpireTask) Do(srv *Server) { + sleep(t.Duration, srv.quit) } func (t waitExpireTask) String() string { return fmt.Sprintf("wait for dial hist expire (%v)", t.Duration) } + +func sleep(d time.Duration, quit chan struct{}) { + timer := time.NewTimer(d) + defer timer.Stop() + + select { + case <-timer.C: + //nothing to do + case <-quit: + return + } +} diff --git a/p2p/server.go b/p2p/server.go index ebf79473cf903beb29feadfea8ad351bc84fa82b..8b7b29fba98b9d241ad11c5bd854bfac597607cf 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -381,11 +381,11 @@ func (srv *Server) Stop() { return } srv.running = false + close(srv.quit) if srv.listener != nil { // this unblocks listener Accept srv.listener.Close() } - close(srv.quit) srv.lock.Unlock() srv.loopWG.Wait() } @@ -650,7 +650,21 @@ func (srv *Server) run(dialstate dialer) { for ; len(runningTasks) < maxActiveDialTasks && i < len(ts); i++ { t := ts[i] srv.log.Trace("New dial task", "task", t) - go func() { t.Do(srv); taskdone <- t }() + + go func() { + select { + case <-srv.quit: + return + default: + t.Do(srv) + } + + select { + case <-srv.quit: + return + case taskdone <- t: + } + }() runningTasks = append(runningTasks, t) } return ts[i:]