From 031b0b177e2b69596f92a88e6bf34a02e556666a Mon Sep 17 00:00:00 2001 From: Alex Sharov <AskAlexSharov@gmail.com> Date: Tue, 20 Jul 2021 15:34:11 +0700 Subject: [PATCH] remove mutexes use (#2406) --- cmd/sentry/download/sentry.go | 44 ++++++++++++------------- consensus/aura/validators.go | 14 ++++---- core/state/journal.go | 18 +++++------ ethdb/remote/remotedbserver/mining.go | 46 +++++++++++++-------------- metrics/meter.go | 18 +++++------ turbo/remote/sentry_client.go | 18 +++++------ 6 files changed, 79 insertions(+), 79 deletions(-) diff --git a/cmd/sentry/download/sentry.go b/cmd/sentry/download/sentry.go index a6e981dbde..b7f2474d30 100644 --- a/cmd/sentry/download/sentry.go +++ b/cmd/sentry/download/sentry.go @@ -949,7 +949,7 @@ func (ss *SentryServerImpl) Messages(req *proto_sentry.MessagesRequest, server p // MessageStreams - it's safe to use this class as non-pointer type MessageStreams struct { - sync.RWMutex + mu sync.RWMutex id uint streams map[uint]proto_sentry.Sentry_MessagesServer } @@ -959,8 +959,8 @@ func NewStreamsList() *MessageStreams { } func (s *MessageStreams) Add(stream proto_sentry.Sentry_MessagesServer) (remove func()) { - s.Lock() - defer s.Unlock() + s.mu.Lock() + defer s.mu.Unlock() if s.streams == nil { s.streams = make(map[uint]proto_sentry.Sentry_MessagesServer) } @@ -971,8 +971,8 @@ func (s *MessageStreams) Add(stream proto_sentry.Sentry_MessagesServer) (remove } func (s *MessageStreams) doBroadcast(reply *proto_sentry.InboundMessage) (ids []uint, errs []error) { - s.RLock() - defer s.RUnlock() + s.mu.RLock() + defer s.mu.RUnlock() for id, stream := range s.streams { err := stream.Send(reply) if err != nil { @@ -991,8 +991,8 @@ func (s *MessageStreams) Broadcast(reply *proto_sentry.InboundMessage) (errs []e var ids []uint ids, errs = s.doBroadcast(reply) if len(ids) > 0 { - s.Lock() - defer s.Unlock() + s.mu.Lock() + defer s.mu.Unlock() } for _, id := range ids { delete(s.streams, id) @@ -1001,14 +1001,14 @@ func (s *MessageStreams) Broadcast(reply *proto_sentry.InboundMessage) (errs []e } func (s *MessageStreams) Len() int { - s.RLock() - defer s.RUnlock() + s.mu.RLock() + defer s.mu.RUnlock() return len(s.streams) } func (s *MessageStreams) remove(id uint) { - s.Lock() - defer s.Unlock() + s.mu.Lock() + defer s.mu.Unlock() _, ok := s.streams[id] if !ok { // double-unsubscribe support return @@ -1035,7 +1035,7 @@ func (ss *SentryServerImpl) Peers(req *proto_sentry.PeersRequest, server proto_s // PeersStreams - it's safe to use this class as non-pointer type PeersStreams struct { - sync.RWMutex + mu sync.RWMutex id uint streams map[uint]proto_sentry.Sentry_PeersServer } @@ -1045,8 +1045,8 @@ func NewPeersStreams() *PeersStreams { } func (s *PeersStreams) Add(stream proto_sentry.Sentry_PeersServer) (remove func()) { - s.Lock() - defer s.Unlock() + s.mu.Lock() + defer s.mu.Unlock() if s.streams == nil { s.streams = make(map[uint]proto_sentry.Sentry_PeersServer) } @@ -1057,8 +1057,8 @@ func (s *PeersStreams) Add(stream proto_sentry.Sentry_PeersServer) (remove func( } func (s *PeersStreams) doBroadcast(reply *proto_sentry.PeersReply) (ids []uint, errs []error) { - s.RLock() - defer s.RUnlock() + s.mu.RLock() + defer s.mu.RUnlock() for id, stream := range s.streams { err := stream.Send(reply) if err != nil { @@ -1077,8 +1077,8 @@ func (s *PeersStreams) Broadcast(reply *proto_sentry.PeersReply) (errs []error) var ids []uint ids, errs = s.doBroadcast(reply) if len(ids) > 0 { - s.Lock() - defer s.Unlock() + s.mu.Lock() + defer s.mu.Unlock() } for _, id := range ids { delete(s.streams, id) @@ -1087,14 +1087,14 @@ func (s *PeersStreams) Broadcast(reply *proto_sentry.PeersReply) (errs []error) } func (s *PeersStreams) Len() int { - s.RLock() - defer s.RUnlock() + s.mu.RLock() + defer s.mu.RUnlock() return len(s.streams) } func (s *PeersStreams) remove(id uint) { - s.Lock() - defer s.Unlock() + s.mu.Lock() + defer s.mu.Unlock() _, ok := s.streams[id] if !ok { // double-unsubscribe support return diff --git a/consensus/aura/validators.go b/consensus/aura/validators.go index a1b3f7cab4..c8e8c963ed 100644 --- a/consensus/aura/validators.go +++ b/consensus/aura/validators.go @@ -341,22 +341,22 @@ type ReportQueueItem struct { data []byte } type ReportQueue struct { - sync.RWMutex + mu sync.RWMutex list *list.List } //nolint func (q *ReportQueue) push(addr common.Address, blockNum uint64, data []byte) { - q.Lock() - defer q.Unlock() + q.mu.Lock() + defer q.mu.Unlock() q.list.PushBack(&ReportQueueItem{addr: addr, blockNum: blockNum, data: data}) } // Filters reports of validators that have already been reported or are banned. func (q *ReportQueue) filter(abi aurainterfaces.ValidatorSetABI, client client, ourAddr, contractAddr common.Address) error { - q.Lock() - defer q.Unlock() + q.mu.Lock() + defer q.mu.Unlock() for e := q.list.Front(); e != nil; e = e.Next() { el := e.Value.(*ReportQueueItem) // Check if the validator should be reported. @@ -387,8 +387,8 @@ func (q *ReportQueue) truncate() { // The maximum number of reports to keep queued. const MaxQueuedReports = 10 - q.RLock() - defer q.RUnlock() + q.mu.RLock() + defer q.mu.RUnlock() // Removes reports from the queue if it contains more than `MAX_QUEUED_REPORTS` entries. if q.list.Len() > MaxQueuedReports { log.Warn("Removing reports from report cache, even though it has not been finalized", "amount", q.list.Len()-MaxQueuedReports) diff --git a/core/state/journal.go b/core/state/journal.go index e660fdff22..a1674af064 100644 --- a/core/state/journal.go +++ b/core/state/journal.go @@ -40,7 +40,7 @@ type journalEntry interface { type journal struct { entries []journalEntry // Current changes tracked by the journal dirties map[common.Address]int // Dirty accounts and the number of changes - sync.RWMutex + mu sync.RWMutex } // newJournal create a new initialized journal. @@ -52,18 +52,18 @@ func newJournal() *journal { // append inserts a new modification entry to the end of the change journal. func (j *journal) append(entry journalEntry) { - j.Lock() + j.mu.Lock() j.entries = append(j.entries, entry) if addr := entry.dirtied(); addr != nil { j.dirties[*addr]++ } - j.Unlock() + j.mu.Unlock() } // revert undoes a batch of journalled modifications along with any reverted // dirty handling too. func (j *journal) revert(statedb *IntraBlockState, snapshot int) { - j.Lock() + j.mu.Lock() for i := len(j.entries) - 1; i >= snapshot; i-- { // Undo the changes made by the operation j.entries[i].revert(statedb) @@ -76,23 +76,23 @@ func (j *journal) revert(statedb *IntraBlockState, snapshot int) { } } j.entries = j.entries[:snapshot] - j.Unlock() + j.mu.Unlock() } // dirty explicitly sets an address to dirty, even if the change entries would // otherwise suggest it as clean. This method is an ugly hack to handle the RIPEMD // precompile consensus exception. func (j *journal) dirty(addr common.Address) { - j.Lock() + j.mu.Lock() j.dirties[addr]++ - j.Unlock() + j.mu.Unlock() } // length returns the current number of entries in the journal. func (j *journal) length() int { - j.RLock() + j.mu.RLock() n := len(j.entries) - j.RUnlock() + j.mu.RUnlock() return n } diff --git a/ethdb/remote/remotedbserver/mining.go b/ethdb/remote/remotedbserver/mining.go index 8154b29547..c35f169a17 100644 --- a/ethdb/remote/remotedbserver/mining.go +++ b/ethdb/remote/remotedbserver/mining.go @@ -145,12 +145,12 @@ func (s *MiningServer) BroadcastMinedBlock(block *types.Block) error { type MinedBlockStreams struct { chans map[uint]proto_txpool.Mining_OnMinedBlockServer id uint - sync.Mutex + mu sync.Mutex } func (s *MinedBlockStreams) Add(stream proto_txpool.Mining_OnMinedBlockServer) (remove func()) { - s.Lock() - defer s.Unlock() + s.mu.Lock() + defer s.mu.Unlock() if s.chans == nil { s.chans = make(map[uint]proto_txpool.Mining_OnMinedBlockServer) } @@ -161,8 +161,8 @@ func (s *MinedBlockStreams) Add(stream proto_txpool.Mining_OnMinedBlockServer) ( } func (s *MinedBlockStreams) Broadcast(reply *proto_txpool.OnMinedBlockReply) { - s.Lock() - defer s.Unlock() + s.mu.Lock() + defer s.mu.Unlock() for id, stream := range s.chans { err := stream.Send(reply) if err != nil { @@ -177,8 +177,8 @@ func (s *MinedBlockStreams) Broadcast(reply *proto_txpool.OnMinedBlockReply) { } func (s *MinedBlockStreams) remove(id uint) { - s.Lock() - defer s.Unlock() + s.mu.Lock() + defer s.mu.Unlock() _, ok := s.chans[id] if !ok { // double-unsubscribe support return @@ -189,13 +189,13 @@ func (s *MinedBlockStreams) remove(id uint) { // PendingBlockStreams - it's safe to use this class as non-pointer type PendingBlockStreams struct { chans map[uint]proto_txpool.Mining_OnPendingBlockServer - sync.Mutex - id uint + mu sync.Mutex + id uint } func (s *PendingBlockStreams) Add(stream proto_txpool.Mining_OnPendingBlockServer) (remove func()) { - s.Lock() - defer s.Unlock() + s.mu.Lock() + defer s.mu.Unlock() if s.chans == nil { s.chans = make(map[uint]proto_txpool.Mining_OnPendingBlockServer) } @@ -206,8 +206,8 @@ func (s *PendingBlockStreams) Add(stream proto_txpool.Mining_OnPendingBlockServe } func (s *PendingBlockStreams) Broadcast(reply *proto_txpool.OnPendingBlockReply) { - s.Lock() - defer s.Unlock() + s.mu.Lock() + defer s.mu.Unlock() for id, stream := range s.chans { err := stream.Send(reply) if err != nil { @@ -222,8 +222,8 @@ func (s *PendingBlockStreams) Broadcast(reply *proto_txpool.OnPendingBlockReply) } func (s *PendingBlockStreams) remove(id uint) { - s.Lock() - defer s.Unlock() + s.mu.Lock() + defer s.mu.Unlock() _, ok := s.chans[id] if !ok { // double-unsubscribe support return @@ -234,13 +234,13 @@ func (s *PendingBlockStreams) remove(id uint) { // PendingLogsStreams - it's safe to use this class as non-pointer type PendingLogsStreams struct { chans map[uint]proto_txpool.Mining_OnPendingLogsServer - sync.Mutex - id uint + mu sync.Mutex + id uint } func (s *PendingLogsStreams) Add(stream proto_txpool.Mining_OnPendingLogsServer) (remove func()) { - s.Lock() - defer s.Unlock() + s.mu.Lock() + defer s.mu.Unlock() if s.chans == nil { s.chans = make(map[uint]proto_txpool.Mining_OnPendingLogsServer) } @@ -251,8 +251,8 @@ func (s *PendingLogsStreams) Add(stream proto_txpool.Mining_OnPendingLogsServer) } func (s *PendingLogsStreams) Broadcast(reply *proto_txpool.OnPendingLogsReply) { - s.Lock() - defer s.Unlock() + s.mu.Lock() + defer s.mu.Unlock() for id, stream := range s.chans { err := stream.Send(reply) if err != nil { @@ -267,8 +267,8 @@ func (s *PendingLogsStreams) Broadcast(reply *proto_txpool.OnPendingLogsReply) { } func (s *PendingLogsStreams) remove(id uint) { - s.Lock() - defer s.Unlock() + s.mu.Lock() + defer s.mu.Unlock() _, ok := s.chans[id] if !ok { // double-unsubscribe support return diff --git a/metrics/meter.go b/metrics/meter.go index c606cfb88e..87a5a91dc2 100644 --- a/metrics/meter.go +++ b/metrics/meter.go @@ -50,8 +50,8 @@ func NewMeter() Meter { return NilMeter{} } m := newStandardMeter() - arbiter.Lock() - defer arbiter.Unlock() + arbiter.mu.Lock() + defer arbiter.mu.Unlock() arbiter.meters[m] = struct{}{} if !arbiter.started { arbiter.started = true @@ -65,8 +65,8 @@ func NewMeter() Meter { // Be sure to call Stop() once the meter is of no use to allow for garbage collection. func NewMeterForced() Meter { m := newStandardMeter() - arbiter.Lock() - defer arbiter.Unlock() + arbiter.mu.Lock() + defer arbiter.mu.Unlock() arbiter.meters[m] = struct{}{} if !arbiter.started { arbiter.started = true @@ -192,9 +192,9 @@ func newStandardMeter() *StandardMeter { func (m *StandardMeter) Stop() { stopped := atomic.SwapUint32(&m.stopped, 1) if stopped != 1 { - arbiter.Lock() + arbiter.mu.Lock() delete(arbiter.meters, m) - arbiter.Unlock() + arbiter.mu.Unlock() } } @@ -281,7 +281,7 @@ func (m *StandardMeter) tick() { // meterArbiter ticks meters every 5s from a single goroutine. // meters are references in a set for future stopping. type meterArbiter struct { - sync.RWMutex + mu sync.RWMutex started bool meters map[*StandardMeter]struct{} ticker *time.Ticker @@ -298,8 +298,8 @@ func (ma *meterArbiter) tick() { } func (ma *meterArbiter) tickMeters() { - ma.RLock() - defer ma.RUnlock() + ma.mu.RLock() + defer ma.mu.RUnlock() for meter := range ma.meters { meter.tick() } diff --git a/turbo/remote/sentry_client.go b/turbo/remote/sentry_client.go index 5cee2a56c2..a6ace8476f 100644 --- a/turbo/remote/sentry_client.go +++ b/turbo/remote/sentry_client.go @@ -22,7 +22,7 @@ type SentryClient interface { type SentryClientRemote struct { proto_sentry.SentryClient - sync.RWMutex + mu sync.RWMutex protocol uint ready bool } @@ -35,20 +35,20 @@ func NewSentryClientRemote(client proto_sentry.SentryClient) *SentryClientRemote } func (c *SentryClientRemote) Protocol() uint { - c.RLock() - defer c.RUnlock() + c.mu.RLock() + defer c.mu.RUnlock() return c.protocol } func (c *SentryClientRemote) Ready() bool { - c.RLock() - defer c.RUnlock() + c.mu.RLock() + defer c.mu.RUnlock() return c.ready } func (c *SentryClientRemote) MarkDisconnected() { - c.Lock() - defer c.Unlock() + c.mu.Lock() + defer c.mu.Unlock() c.ready = false } @@ -58,8 +58,8 @@ func (c *SentryClientRemote) SetStatus(ctx context.Context, in *proto_sentry.Sta return nil, err } - c.Lock() - defer c.Unlock() + c.mu.Lock() + defer c.mu.Unlock() switch reply.Protocol { case proto_sentry.Protocol_ETH65: c.protocol = eth.ETH65 -- GitLab