diff --git a/cmd/integration/commands/stages.go b/cmd/integration/commands/stages.go index 9a852646a76811ea70ef0c98deb64b983943b84a..aebc42c0a234582e6046459a46d905f76bf5e875 100644 --- a/cmd/integration/commands/stages.go +++ b/cmd/integration/commands/stages.go @@ -31,6 +31,7 @@ import ( "github.com/ledgerwatch/erigon/ethdb/prune" "github.com/ledgerwatch/erigon/migrations" "github.com/ledgerwatch/erigon/p2p" + "github.com/ledgerwatch/erigon/p2p/enode" "github.com/ledgerwatch/erigon/params" stages2 "github.com/ledgerwatch/erigon/turbo/stages" "github.com/ledgerwatch/erigon/turbo/txpool" @@ -1068,7 +1069,7 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig) if err != nil { panic(err) } - fetchTx := func(peerID string, hashes []common.Hash) error { + fetchTx := func(peerID enode.ID, hashes []common.Hash) error { txPoolP2PServer.SendTxsRequest(context.TODO(), peerID, hashes) return nil } diff --git a/cmd/sentry/download/broadcast.go b/cmd/sentry/download/broadcast.go index 57d9471fc308cbb5fb2b6d5af3a731595560a907..fce68ee7b9a705dafe2b3dbbf928e7e20253cf0c 100644 --- a/cmd/sentry/download/broadcast.go +++ b/cmd/sentry/download/broadcast.go @@ -214,7 +214,7 @@ func (cs *ControlServerImpl) BroadcastRemotePooledTxs(ctx context.Context, txs [ } } -func (cs *ControlServerImpl) PropagatePooledTxsToPeersList(ctx context.Context, peers []*types2.H512, txs []common.Hash) { +func (cs *ControlServerImpl) PropagatePooledTxsToPeersList(ctx context.Context, peers []*types2.H256, txs []common.Hash) { if len(txs) == 0 { return } diff --git a/cmd/sentry/download/downloader.go b/cmd/sentry/download/downloader.go index 94f7441c8e165df4a4bf1f3d7bfeb6478a4a41b9..10fc8a167761ec0c63ec33b6401f7c7ed43fd7e4 100644 --- a/cmd/sentry/download/downloader.go +++ b/cmd/sentry/download/downloader.go @@ -384,7 +384,7 @@ func (cs *ControlServerImpl) newBlockHashes66(ctx context.Context, req *proto_se if !cs.Hd.RequestChaining() && !cs.Hd.Fetching() { return nil } - //log.Info(fmt.Sprintf("NewBlockHashes from [%s]", gointerfaces.ConvertH512ToBytes(req.PeerId))) + //log.Info(fmt.Sprintf("NewBlockHashes from [%s]", ConvertH256ToPeerID(req.PeerId))) var request eth.NewBlockHashesPacket if err := rlp.DecodeBytes(req.Data, &request); err != nil { return fmt.Errorf("decode NewBlockHashes66: %w", err) @@ -429,7 +429,7 @@ func (cs *ControlServerImpl) newBlockHashes65(ctx context.Context, req *proto_se if !cs.Hd.RequestChaining() && !cs.Hd.Fetching() { return nil } - //log.Info(fmt.Sprintf("NewBlockHashes from [%s]", gointerfaces.ConvertH512ToBytes(req.PeerId))) + //log.Info(fmt.Sprintf("NewBlockHashes from [%s]", ConvertH256ToPeerID(req.PeerId))) var request eth.NewBlockHashesPacket if err := rlp.DecodeBytes(req.Data, &request); err != nil { return fmt.Errorf("decode newBlockHashes65: %w", err) @@ -508,7 +508,7 @@ func (cs *ControlServerImpl) blockHeaders66(ctx context.Context, in *proto_sentr if penalty == headerdownload.NoPenalty { var canRequestMore bool for _, segment := range segments { - requestMore, penalties := cs.Hd.ProcessSegment(segment, false /* newBlock */, string(gointerfaces.ConvertH512ToBytes(in.PeerId))) + requestMore, penalties := cs.Hd.ProcessSegment(segment, false /* newBlock */, ConvertH256ToPeerID(in.PeerId)) canRequestMore = canRequestMore || requestMore if len(penalties) > 0 { cs.Penalize(ctx, penalties) @@ -519,7 +519,7 @@ func (cs *ControlServerImpl) blockHeaders66(ctx context.Context, in *proto_sentr currentTime := uint64(time.Now().Unix()) req, penalties := cs.Hd.RequestMoreHeaders(currentTime) if req != nil { - if peer := cs.SendHeaderRequest(ctx, req); peer != nil { + if _, ok := cs.SendHeaderRequest(ctx, req); ok { cs.Hd.SentRequest(req, currentTime, 5 /* timeout */) log.Trace("Sent request", "height", req.Number) } @@ -583,7 +583,7 @@ func (cs *ControlServerImpl) blockHeaders65(ctx context.Context, in *proto_sentr if penalty == headerdownload.NoPenalty { var canRequestMore bool for _, segment := range segments { - requestMore, penalties := cs.Hd.ProcessSegment(segment, false /* newBlock */, string(gointerfaces.ConvertH512ToBytes(in.PeerId))) + requestMore, penalties := cs.Hd.ProcessSegment(segment, false /* newBlock */, ConvertH256ToPeerID(in.PeerId)) canRequestMore = canRequestMore || requestMore if len(penalties) > 0 { cs.Penalize(ctx, penalties) @@ -594,7 +594,7 @@ func (cs *ControlServerImpl) blockHeaders65(ctx context.Context, in *proto_sentr currentTime := uint64(time.Now().Unix()) req, penalties := cs.Hd.RequestMoreHeaders(currentTime) if req != nil { - if peer := cs.SendHeaderRequest(ctx, req); peer != nil { + if _, ok := cs.SendHeaderRequest(ctx, req); ok { cs.Hd.SentRequest(req, currentTime, 5 /* timeout */) log.Trace("Sent request", "height", req.Number) } @@ -649,7 +649,7 @@ func (cs *ControlServerImpl) newBlock65(ctx context.Context, inreq *proto_sentry } if segments, penalty, err := cs.Hd.SingleHeaderAsSegment(headerRaw, request.Block.Header()); err == nil { if penalty == headerdownload.NoPenalty { - cs.Hd.ProcessSegment(segments[0], true /* newBlock */, string(gointerfaces.ConvertH512ToBytes(inreq.PeerId))) // There is only one segment in this case + cs.Hd.ProcessSegment(segments[0], true /* newBlock */, ConvertH256ToPeerID(inreq.PeerId)) // There is only one segment in this case } else { outreq := proto_sentry.PenalizePeerRequest{ PeerId: inreq.PeerId, @@ -675,7 +675,7 @@ func (cs *ControlServerImpl) newBlock65(ctx context.Context, inreq *proto_sentry if _, err1 := sentry.PeerMinBlock(ctx, &outreq, &grpc.EmptyCallOption{}); err1 != nil { log.Error("Could not send min block for peer", "err", err1) } - log.Trace(fmt.Sprintf("NewBlockMsg{blockNumber: %d} from [%s]", request.Block.NumberU64(), gointerfaces.ConvertH512ToBytes(inreq.PeerId))) + log.Trace(fmt.Sprintf("NewBlockMsg{blockNumber: %d} from [%s]", request.Block.NumberU64(), ConvertH256ToPeerID(inreq.PeerId))) return nil } @@ -685,7 +685,7 @@ func (cs *ControlServerImpl) blockBodies66(inreq *proto_sentry.InboundMessage, s return fmt.Errorf("decode BlockBodiesPacket66: %w", err) } txs, uncles := request.BlockRawBodiesPacket.Unpack() - cs.Bd.DeliverBodies(txs, uncles, uint64(len(inreq.Data)), string(gointerfaces.ConvertH512ToBytes(inreq.PeerId))) + cs.Bd.DeliverBodies(txs, uncles, uint64(len(inreq.Data)), ConvertH256ToPeerID(inreq.PeerId)) return nil } @@ -695,7 +695,7 @@ func (cs *ControlServerImpl) blockBodies65(inreq *proto_sentry.InboundMessage, s return fmt.Errorf("decode blockBodies65: %w", err) } txs, uncles := request.Unpack() - cs.Bd.DeliverBodies(txs, uncles, uint64(len(inreq.Data)), string(gointerfaces.ConvertH512ToBytes(inreq.PeerId))) + cs.Bd.DeliverBodies(txs, uncles, uint64(len(inreq.Data)), ConvertH256ToPeerID(inreq.PeerId)) return nil } @@ -743,7 +743,7 @@ func (cs *ControlServerImpl) getBlockHeaders66(ctx context.Context, inreq *proto } return fmt.Errorf("send header response 66: %w", err) } - //log.Info(fmt.Sprintf("[%s] GetBlockHeaderMsg{hash=%x, number=%d, amount=%d, skip=%d, reverse=%t, responseLen=%d}", string(gointerfaces.ConvertH512ToBytes(inreq.PeerId)), query.Origin.Hash, query.Origin.Number, query.Amount, query.Skip, query.Reverse, len(b))) + //log.Info(fmt.Sprintf("[%s] GetBlockHeaderMsg{hash=%x, number=%d, amount=%d, skip=%d, reverse=%t, responseLen=%d}", ConvertH256ToPeerID(inreq.PeerId), query.Origin.Hash, query.Origin.Number, query.Amount, query.Skip, query.Reverse, len(b))) return nil } @@ -780,7 +780,7 @@ func (cs *ControlServerImpl) getBlockHeaders65(ctx context.Context, inreq *proto return fmt.Errorf("send header response 65: %w", err) } } - //log.Info(fmt.Sprintf("[%s] GetBlockHeaderMsg{hash=%x, number=%d, amount=%d, skip=%d, reverse=%t, responseLen=%d}", string(gointerfaces.ConvertH512ToBytes(inreq.PeerId)), query.Origin.Hash, query.Origin.Number, query.Amount, query.Skip, query.Reverse, len(b))) + //log.Info(fmt.Sprintf("[%s] GetBlockHeaderMsg{hash=%x, number=%d, amount=%d, skip=%d, reverse=%t, responseLen=%d}", ConvertH256ToPeerID(inreq.PeerId), query.Origin.Hash, query.Origin.Number, query.Amount, query.Skip, query.Reverse, len(b))) return nil } @@ -817,7 +817,7 @@ func (cs *ControlServerImpl) getBlockBodies66(ctx context.Context, inreq *proto_ } return fmt.Errorf("send bodies response: %w", err) } - //log.Info(fmt.Sprintf("[%s] GetBlockBodiesMsg responseLen %d", string(gointerfaces.ConvertH512ToBytes(inreq.PeerId)), len(b))) + //log.Info(fmt.Sprintf("[%s] GetBlockBodiesMsg responseLen %d", ConvertH256ToPeerID(inreq.PeerId), len(b))) return nil } @@ -851,7 +851,7 @@ func (cs *ControlServerImpl) getBlockBodies65(ctx context.Context, inreq *proto_ } return fmt.Errorf("send bodies response: %w", err) } - //log.Info(fmt.Sprintf("[%s] GetBlockBodiesMsg responseLen %d", string(gointerfaces.ConvertH512ToBytes(inreq.PeerId)), len(b))) + //log.Info(fmt.Sprintf("[%s] GetBlockBodiesMsg responseLen %d", ConvertH256ToPeerID(inreq.PeerId), len(b))) return nil } @@ -891,7 +891,7 @@ func (cs *ControlServerImpl) getReceipts66(ctx context.Context, inreq *proto_sen } return fmt.Errorf("send bodies response: %w", err) } - //log.Info(fmt.Sprintf("[%s] GetReceipts responseLen %d", string(gointerfaces.ConvertH512ToBytes(inreq.PeerId)), len(b))) + //log.Info(fmt.Sprintf("[%s] GetReceipts responseLen %d", ConvertH256ToPeerID(inreq.PeerId), len(b))) return nil } @@ -928,7 +928,7 @@ func (cs *ControlServerImpl) getReceipts65(ctx context.Context, inreq *proto_sen } return fmt.Errorf("send bodies response: %w", err) } - //log.Info(fmt.Sprintf("[%s] GetReceipts responseLen %d", string(gointerfaces.ConvertH512ToBytes(inreq.PeerId)), len(b))) + //log.Info(fmt.Sprintf("[%s] GetReceipts responseLen %d", ConvertH256ToPeerID(inreq.PeerId), len(b))) return nil } diff --git a/cmd/sentry/download/sentry.go b/cmd/sentry/download/sentry.go index 3e6b9b161248dd9342b2171d175fe78f91801406..b8f4ceb385858497a8c4e7b538a39dec4c932ac6 100644 --- a/cmd/sentry/download/sentry.go +++ b/cmd/sentry/download/sentry.go @@ -55,6 +55,10 @@ type PeerInfo struct { removed bool } +func (pi *PeerInfo) ID() enode.ID { + return pi.peer.ID() +} + // AddDeadline adds given deadline to the list of deadlines // Deadlines must be added in the chronological order for the function // ClearDeadlines to work correctly (it uses binary search) @@ -102,6 +106,12 @@ func (pi *PeerInfo) Removed() bool { return pi.removed } +// ConvertH256ToPeerID() ensures the return type is enode.ID rather than [32]byte +// so that short variable declarations will still be formatted as hex in logs +func ConvertH256ToPeerID(h256 *proto_types.H256) enode.ID { + return gointerfaces.ConvertH256ToHash(h256) +} + func makeP2PServer( p2pConfig p2p.Config, genesisHash common.Hash, @@ -133,7 +143,7 @@ func makeP2PServer( func handShake( ctx context.Context, status *proto_sentry.StatusData, - peerID string, + peerID enode.ID, rw p2p.MsgReadWriter, version uint, minVersion uint, @@ -235,11 +245,11 @@ func handShake( func runPeer( ctx context.Context, - peerID string, + peerID enode.ID, protocol uint, rw p2p.MsgReadWriter, peerInfo *PeerInfo, - send func(msgId proto_sentry.MessageId, peerID string, b []byte), + send func(msgId proto_sentry.MessageId, peerID enode.ID, b []byte), hasSubscribers func(msgId proto_sentry.MessageId) bool, ) error { printTime := time.Now().Add(time.Minute) @@ -451,7 +461,7 @@ func NewSentryServer(ctx context.Context, dialCandidates enode.Iterator, readNod Length: 17, DialCandidates: dialCandidates, Run: func(peer *p2p.Peer, rw p2p.MsgReadWriter) error { - peerID := peer.ID().String() + peerID := peer.ID() if ss.getPeer(peerID) != nil { log.Trace(fmt.Sprintf("[%s] Peer already has connection", peerID)) return nil @@ -466,7 +476,7 @@ func NewSentryServer(ctx context.Context, dialCandidates enode.Iterator, readNod defer ss.GoodPeers.Delete(peerID) err := handShake(ctx, ss.GetStatus(), peerID, rw, protocol, protocol, func(bestHash common.Hash) error { ss.GoodPeers.Store(peerID, peerInfo) - ss.sendNewPeerToClients(gointerfaces.ConvertBytesToH512([]byte(peerID))) + ss.sendNewPeerToClients(gointerfaces.ConvertHashToH256(peerID)) return ss.startSync(ctx, bestHash, peerID) }) if err != nil { @@ -490,8 +500,7 @@ func NewSentryServer(ctx context.Context, dialCandidates enode.Iterator, readNod NodeInfo: func() interface{} { return readNodeInfo() }, - PeerInfo: func(id enode.ID) interface{} { - peerID := id.String() + PeerInfo: func(peerID enode.ID) interface{} { if peerInfo := ss.getPeer(peerID); peerInfo != nil { return peerInfo.peer.Info() } @@ -540,18 +549,17 @@ type SentryServerImpl struct { p2p *p2p.Config } -func (ss *SentryServerImpl) rangePeers(f func(peerID string, peerInfo *PeerInfo) bool) { +func (ss *SentryServerImpl) rangePeers(f func(peerInfo *PeerInfo) bool) { ss.GoodPeers.Range(func(key, value interface{}) bool { peerInfo, _ := value.(*PeerInfo) if peerInfo == nil { return true } - peerID := key.(string) - return f(peerID, peerInfo) + return f(peerInfo) }) } -func (ss *SentryServerImpl) getPeer(peerID string) (peerInfo *PeerInfo) { +func (ss *SentryServerImpl) getPeer(peerID enode.ID) (peerInfo *PeerInfo) { if value, ok := ss.GoodPeers.Load(peerID); ok { peerInfo := value.(*PeerInfo) if peerInfo != nil { @@ -562,7 +570,7 @@ func (ss *SentryServerImpl) getPeer(peerID string) (peerInfo *PeerInfo) { return nil } -func (ss *SentryServerImpl) removePeer(peerID string) { +func (ss *SentryServerImpl) removePeer(peerID enode.ID) { if value, ok := ss.GoodPeers.LoadAndDelete(peerID); ok { peerInfo := value.(*PeerInfo) if peerInfo != nil { @@ -571,16 +579,16 @@ func (ss *SentryServerImpl) removePeer(peerID string) { } } -func (ss *SentryServerImpl) writePeer(peerID string, peerInfo *PeerInfo, msgcode uint64, data []byte) error { +func (ss *SentryServerImpl) writePeer(peerInfo *PeerInfo, msgcode uint64, data []byte) error { err := peerInfo.rw.WriteMsg(p2p.Msg{Code: msgcode, Size: uint32(len(data)), Payload: bytes.NewReader(data)}) if err != nil { peerInfo.Remove() - ss.GoodPeers.Delete(peerID) + ss.GoodPeers.Delete(peerInfo.ID()) } return err } -func (ss *SentryServerImpl) startSync(ctx context.Context, bestHash common.Hash, peerID string) error { +func (ss *SentryServerImpl) startSync(ctx context.Context, bestHash common.Hash, peerID enode.ID) error { switch ss.Protocol.Version { case eth.ETH66: b, err := rlp.EncodeToBytes(ð.GetBlockHeadersPacket66{ @@ -596,7 +604,7 @@ func (ss *SentryServerImpl) startSync(ctx context.Context, bestHash common.Hash, return fmt.Errorf("startSync encode packet failed: %w", err) } if _, err := ss.SendMessageById(ctx, &proto_sentry.SendMessageByIdRequest{ - PeerId: gointerfaces.ConvertBytesToH512([]byte(peerID)), + PeerId: gointerfaces.ConvertHashToH256(peerID), Data: &proto_sentry.OutboundMessageData{ Id: proto_sentry.MessageId_GET_BLOCK_HEADERS_66, Data: b, @@ -610,13 +618,13 @@ func (ss *SentryServerImpl) startSync(ctx context.Context, bestHash common.Hash, func (ss *SentryServerImpl) PenalizePeer(_ context.Context, req *proto_sentry.PenalizePeerRequest) (*emptypb.Empty, error) { //log.Warn("Received penalty", "kind", req.GetPenalty().Descriptor().FullName, "from", fmt.Sprintf("%s", req.GetPeerId())) - peerID := string(gointerfaces.ConvertH512ToBytes(req.PeerId)) + peerID := ConvertH256ToPeerID(req.PeerId) ss.removePeer(peerID) return &emptypb.Empty{}, nil } func (ss *SentryServerImpl) PeerMinBlock(_ context.Context, req *proto_sentry.PeerMinBlockRequest) (*emptypb.Empty, error) { - peerID := string(gointerfaces.ConvertH512ToBytes(req.PeerId)) + peerID := ConvertH256ToPeerID(req.PeerId) if peerInfo := ss.getPeer(peerID); peerInfo != nil { if req.MinBlock > peerInfo.Height() { peerInfo.SetHeight(req.MinBlock) @@ -625,13 +633,12 @@ func (ss *SentryServerImpl) PeerMinBlock(_ context.Context, req *proto_sentry.Pe return &emptypb.Empty{}, nil } -func (ss *SentryServerImpl) findPeer(minBlock uint64) (string, *PeerInfo, bool) { +func (ss *SentryServerImpl) findPeer(minBlock uint64) (*PeerInfo, bool) { // Choose a peer that we can send this request to, with maximum number of permits - var foundPeerID string var foundPeerInfo *PeerInfo var maxPermits int now := time.Now() - ss.rangePeers(func(peerID string, peerInfo *PeerInfo) bool { + ss.rangePeers(func(peerInfo *PeerInfo) bool { if peerInfo.Height() >= minBlock { deadlines := peerInfo.ClearDeadlines(now, false /* givePermit */) //fmt.Printf("%d deadlines for peer %s\n", deadlines, peerID) @@ -639,18 +646,17 @@ func (ss *SentryServerImpl) findPeer(minBlock uint64) (string, *PeerInfo, bool) permits := maxPermitsPerPeer - deadlines if permits > maxPermits { maxPermits = permits - foundPeerID = peerID foundPeerInfo = peerInfo } } } return true }) - return foundPeerID, foundPeerInfo, maxPermits > 0 + return foundPeerInfo, maxPermits > 0 } func (ss *SentryServerImpl) SendMessageByMinBlock(_ context.Context, inreq *proto_sentry.SendMessageByMinBlockRequest) (*proto_sentry.SentPeers, error) { - peerID, peerInfo, found := ss.findPeer(inreq.MinBlock) + peerInfo, found := ss.findPeer(inreq.MinBlock) if !found { return &proto_sentry.SentPeers{}, nil } @@ -660,15 +666,15 @@ func (ss *SentryServerImpl) SendMessageByMinBlock(_ context.Context, inreq *prot msgcode != eth.GetPooledTransactionsMsg { return &proto_sentry.SentPeers{}, fmt.Errorf("sendMessageByMinBlock not implemented for message Id: %s", inreq.Data.Id) } - if err := ss.writePeer(peerID, peerInfo, msgcode, inreq.Data.Data); err != nil { - return &proto_sentry.SentPeers{}, fmt.Errorf("sendMessageByMinBlock to peer %s: %w", peerID, err) + if err := ss.writePeer(peerInfo, msgcode, inreq.Data.Data); err != nil { + return &proto_sentry.SentPeers{}, fmt.Errorf("sendMessageByMinBlock to peer %s: %w", peerInfo.ID(), err) } peerInfo.AddDeadline(time.Now().Add(30 * time.Second)) - return &proto_sentry.SentPeers{Peers: []*proto_types.H512{gointerfaces.ConvertBytesToH512([]byte(peerID))}}, nil + return &proto_sentry.SentPeers{Peers: []*proto_types.H256{gointerfaces.ConvertHashToH256(peerInfo.ID())}}, nil } func (ss *SentryServerImpl) SendMessageById(_ context.Context, inreq *proto_sentry.SendMessageByIdRequest) (*proto_sentry.SentPeers, error) { - peerID := string(gointerfaces.ConvertH512ToBytes(inreq.PeerId)) + peerID := ConvertH256ToPeerID(inreq.PeerId) peerInfo := ss.getPeer(peerID) if peerInfo == nil { //TODO: enable after support peer to sentry mapping @@ -687,10 +693,10 @@ func (ss *SentryServerImpl) SendMessageById(_ context.Context, inreq *proto_sent return &proto_sentry.SentPeers{}, fmt.Errorf("sendMessageById not implemented for message Id: %s", inreq.Data.Id) } - if err := ss.writePeer(peerID, peerInfo, msgcode, inreq.Data.Data); err != nil { + if err := ss.writePeer(peerInfo, msgcode, inreq.Data.Data); err != nil { return &proto_sentry.SentPeers{}, fmt.Errorf("sendMessageById to peer %s: %w", peerID, err) } - return &proto_sentry.SentPeers{Peers: []*proto_types.H512{inreq.PeerId}}, nil + return &proto_sentry.SentPeers{Peers: []*proto_types.H256{inreq.PeerId}}, nil } func (ss *SentryServerImpl) SendMessageToRandomPeers(ctx context.Context, req *proto_sentry.SendMessageToRandomPeersRequest) (*proto_sentry.SentPeers, error) { @@ -702,7 +708,7 @@ func (ss *SentryServerImpl) SendMessageToRandomPeers(ctx context.Context, req *p } amount := uint64(0) - ss.rangePeers(func(peerID string, peerInfo *PeerInfo) bool { + ss.rangePeers(func(peerInfo *PeerInfo) bool { amount++ return true }) @@ -714,13 +720,13 @@ func (ss *SentryServerImpl) SendMessageToRandomPeers(ctx context.Context, req *p sendToAmount := int(math.Sqrt(float64(amount))) i := 0 var innerErr error - reply := &proto_sentry.SentPeers{Peers: []*proto_types.H512{}} - ss.rangePeers(func(peerID string, peerInfo *PeerInfo) bool { - if err := ss.writePeer(peerID, peerInfo, msgcode, req.Data.Data); err != nil { + reply := &proto_sentry.SentPeers{Peers: []*proto_types.H256{}} + ss.rangePeers(func(peerInfo *PeerInfo) bool { + if err := ss.writePeer(peerInfo, msgcode, req.Data.Data); err != nil { innerErr = err return true } - reply.Peers = append(reply.Peers, gointerfaces.ConvertBytesToH512([]byte(peerID))) + reply.Peers = append(reply.Peers, gointerfaces.ConvertHashToH256(peerInfo.ID())) i++ return i < sendToAmount }) @@ -739,13 +745,13 @@ func (ss *SentryServerImpl) SendMessageToAll(ctx context.Context, req *proto_sen } var innerErr error - reply := &proto_sentry.SentPeers{Peers: []*proto_types.H512{}} - ss.rangePeers(func(peerID string, peerInfo *PeerInfo) bool { - if err := ss.writePeer(peerID, peerInfo, msgcode, req.Data); err != nil { + reply := &proto_sentry.SentPeers{Peers: []*proto_types.H256{}} + ss.rangePeers(func(peerInfo *PeerInfo) bool { + if err := ss.writePeer(peerInfo, msgcode, req.Data); err != nil { innerErr = err return true } - reply.Peers = append(reply.Peers, gointerfaces.ConvertBytesToH512([]byte(peerID))) + reply.Peers = append(reply.Peers, gointerfaces.ConvertHashToH256(peerInfo.ID())) return true }) if innerErr != nil { @@ -806,7 +812,7 @@ func (ss *SentryServerImpl) SetStatus(_ context.Context, statusData *proto_sentr } func (ss *SentryServerImpl) SimplePeerCount() (pc int) { - ss.rangePeers(func(peerID string, peerInfo *PeerInfo) bool { + ss.rangePeers(func(peerInfo *PeerInfo) bool { pc++ return true }) @@ -833,11 +839,11 @@ func (ss *SentryServerImpl) GetStatus() *proto_sentry.StatusData { return ss.statusData } -func (ss *SentryServerImpl) send(msgID proto_sentry.MessageId, peerID string, b []byte) { +func (ss *SentryServerImpl) send(msgID proto_sentry.MessageId, peerID enode.ID, b []byte) { ss.messageStreamsLock.RLock() defer ss.messageStreamsLock.RUnlock() req := &proto_sentry.InboundMessage{ - PeerId: gointerfaces.ConvertBytesToH512([]byte(peerID)), + PeerId: gointerfaces.ConvertHashToH256(peerID), Id: msgID, Data: b, } @@ -921,7 +927,7 @@ func (ss *SentryServerImpl) Close() { } } -func (ss *SentryServerImpl) sendNewPeerToClients(peerID *proto_types.H512) { +func (ss *SentryServerImpl) sendNewPeerToClients(peerID *proto_types.H256) { if err := ss.peersStreams.Broadcast(&proto_sentry.PeersReply{PeerId: peerID, Event: proto_sentry.PeersReply_Connect}); err != nil { log.Warn("Sending new peer notice to core P2P failed", "error", err) } diff --git a/cmd/sentry/download/sentry_api.go b/cmd/sentry/download/sentry_api.go index 05f7663d7dd22c023be2e3289202d4f9d1e7148a..6cefab0d18a8f424b99d5eced65bd2a4b0c6e1f7 100644 --- a/cmd/sentry/download/sentry_api.go +++ b/cmd/sentry/download/sentry_api.go @@ -9,6 +9,7 @@ import ( proto_sentry "github.com/ledgerwatch/erigon-lib/gointerfaces/sentry" "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/eth/protocols/eth" + "github.com/ledgerwatch/erigon/p2p/enode" "github.com/ledgerwatch/erigon/rlp" "github.com/ledgerwatch/erigon/turbo/stages/bodydownload" "github.com/ledgerwatch/erigon/turbo/stages/headerdownload" @@ -36,7 +37,7 @@ func (cs *ControlServerImpl) UpdateHead(ctx context.Context, height uint64, hash } } -func (cs *ControlServerImpl) SendBodyRequest(ctx context.Context, req *bodydownload.BodyRequest) []byte { +func (cs *ControlServerImpl) SendBodyRequest(ctx context.Context, req *bodydownload.BodyRequest) (peerID enode.ID, ok bool) { // if sentry not found peers to send such message, try next one. stop if found. for i, ok, next := cs.randSentryIndex(); ok; i, ok = next() { if !cs.sentries[i].Ready() { @@ -54,7 +55,7 @@ func (cs *ControlServerImpl) SendBodyRequest(ctx context.Context, req *bodydownl }) if err != nil { log.Error("Could not encode block bodies request", "err", err) - return nil + return enode.ID{}, false } outreq := proto_sentry.SendMessageByMinBlockRequest{ MinBlock: req.BlockNums[len(req.BlockNums)-1], @@ -67,18 +68,18 @@ func (cs *ControlServerImpl) SendBodyRequest(ctx context.Context, req *bodydownl sentPeers, err1 := cs.sentries[i].SendMessageByMinBlock(ctx, &outreq, &grpc.EmptyCallOption{}) if err1 != nil { log.Error("Could not send block bodies request", "err", err1) - return nil + return enode.ID{}, false } if sentPeers == nil || len(sentPeers.Peers) == 0 { continue } - return gointerfaces.ConvertH512ToBytes(sentPeers.Peers[0]) + return ConvertH256ToPeerID(sentPeers.Peers[0]), true } } - return nil + return enode.ID{}, false } -func (cs *ControlServerImpl) SendHeaderRequest(ctx context.Context, req *headerdownload.HeaderRequest) []byte { +func (cs *ControlServerImpl) SendHeaderRequest(ctx context.Context, req *headerdownload.HeaderRequest) (peerID enode.ID, ok bool) { // if sentry not found peers to send such message, try next one. stop if found. for i, ok, next := cs.randSentryIndex(); ok; i, ok = next() { if !cs.sentries[i].Ready() { @@ -102,7 +103,7 @@ func (cs *ControlServerImpl) SendHeaderRequest(ctx context.Context, req *headerd bytes, err := rlp.EncodeToBytes(reqData) if err != nil { log.Error("Could not encode header request", "err", err) - return nil + return enode.ID{}, false } minBlock := req.Number if !req.Reverse { @@ -119,15 +120,15 @@ func (cs *ControlServerImpl) SendHeaderRequest(ctx context.Context, req *headerd sentPeers, err1 := cs.sentries[i].SendMessageByMinBlock(ctx, &outreq, &grpc.EmptyCallOption{}) if err1 != nil { log.Error("Could not send header request", "err", err1) - return nil + return enode.ID{}, false } if sentPeers == nil || len(sentPeers.Peers) == 0 { continue } - return gointerfaces.ConvertH512ToBytes(sentPeers.Peers[0]) + return ConvertH256ToPeerID(sentPeers.Peers[0]), true } } - return nil + return enode.ID{}, false } func (cs *ControlServerImpl) randSentryIndex() (int, bool, func() (int, bool)) { @@ -146,7 +147,7 @@ func (cs *ControlServerImpl) randSentryIndex() (int, bool, func() (int, bool)) { func (cs *ControlServerImpl) Penalize(ctx context.Context, penalties []headerdownload.PenaltyItem) { for i := range penalties { outreq := proto_sentry.PenalizePeerRequest{ - PeerId: gointerfaces.ConvertBytesToH512([]byte(penalties[i].PeerID)), + PeerId: gointerfaces.ConvertHashToH256(penalties[i].PeerID), Penalty: proto_sentry.PenaltyKind_Kick, // TODO: Extend penalty kinds } for i, ok, next := cs.randSentryIndex(); ok; i, ok = next() { diff --git a/cmd/sentry/download/sentry_test.go b/cmd/sentry/download/sentry_test.go index 1df63b8bcfade6d3ad2ca042c8666f76e4083d9f..95a989e54cc9e9f4198bad2dfb4b3134ef08a377 100644 --- a/cmd/sentry/download/sentry_test.go +++ b/cmd/sentry/download/sentry_test.go @@ -17,6 +17,7 @@ import ( "github.com/ledgerwatch/erigon/core/rawdb" "github.com/ledgerwatch/erigon/eth/protocols/eth" "github.com/ledgerwatch/erigon/p2p" + "github.com/ledgerwatch/erigon/p2p/enode" "github.com/ledgerwatch/erigon/params" "github.com/stretchr/testify/require" ) @@ -93,8 +94,8 @@ func testForkIDSplit(t *testing.T, protocol uint) { defer p2pProFork.Close() errc := make(chan error, 2) - go func() { errc <- handShake(ctx, s1.GetStatus(), "1", p2pNoFork, protocol, protocol, nil) }() - go func() { errc <- handShake(ctx, s2.GetStatus(), "2", p2pProFork, protocol, protocol, nil) }() + go func() { errc <- handShake(ctx, s1.GetStatus(), enode.ID{1}, p2pNoFork, protocol, protocol, nil) }() + go func() { errc <- handShake(ctx, s2.GetStatus(), enode.ID{2}, p2pProFork, protocol, protocol, nil) }() for i := 0; i < 2; i++ { select { @@ -111,8 +112,8 @@ func testForkIDSplit(t *testing.T, protocol uint) { s1.statusData.MaxBlock = 1 s2.statusData.MaxBlock = 1 - go func() { errc <- handShake(ctx, s1.GetStatus(), "1", p2pNoFork, protocol, protocol, nil) }() - go func() { errc <- handShake(ctx, s2.GetStatus(), "2", p2pProFork, protocol, protocol, nil) }() + go func() { errc <- handShake(ctx, s1.GetStatus(), enode.ID{1}, p2pNoFork, protocol, protocol, nil) }() + go func() { errc <- handShake(ctx, s2.GetStatus(), enode.ID{2}, p2pProFork, protocol, protocol, nil) }() for i := 0; i < 2; i++ { select { @@ -130,8 +131,8 @@ func testForkIDSplit(t *testing.T, protocol uint) { s2.statusData.MaxBlock = 2 // Both nodes should allow the other to connect (same genesis, next fork is the same) - go func() { errc <- handShake(ctx, s1.GetStatus(), "1", p2pNoFork, protocol, protocol, nil) }() - go func() { errc <- handShake(ctx, s2.GetStatus(), "2", p2pProFork, protocol, protocol, nil) }() + go func() { errc <- handShake(ctx, s1.GetStatus(), enode.ID{1}, p2pNoFork, protocol, protocol, nil) }() + go func() { errc <- handShake(ctx, s2.GetStatus(), enode.ID{2}, p2pProFork, protocol, protocol, nil) }() var successes int for i := 0; i < 2; i++ { diff --git a/eth/backend.go b/eth/backend.go index 1e00ec66eb89c367e6f296772b13017fa2f91bc3..cb81bf192d16d4b9702b556ff7a3650cba5d70b0 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -62,6 +62,7 @@ import ( "github.com/ledgerwatch/erigon/ethdb/prune" "github.com/ledgerwatch/erigon/node" "github.com/ledgerwatch/erigon/p2p" + "github.com/ledgerwatch/erigon/p2p/enode" "github.com/ledgerwatch/erigon/params" "github.com/ledgerwatch/erigon/rpc" "github.com/ledgerwatch/erigon/turbo/shards" @@ -342,7 +343,7 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere return nil, err } - fetchTx := func(peerID string, hashes []common.Hash) error { + fetchTx := func(peerID enode.ID, hashes []common.Hash) error { backend.txPoolP2PServer.SendTxsRequest(context.TODO(), peerID, hashes) return nil } diff --git a/eth/fetcher/tx_fetcher.go b/eth/fetcher/tx_fetcher.go index a75a099d5938171ec3a1bdc6962d695fbf072f21..59313a62fb1406742ecfb3b744de4cae391d6aa7 100644 --- a/eth/fetcher/tx_fetcher.go +++ b/eth/fetcher/tx_fetcher.go @@ -20,7 +20,6 @@ import ( "bytes" "fmt" mrand "math/rand" - "sort" "time" mapset "github.com/deckarep/golang-set" @@ -30,6 +29,7 @@ import ( "github.com/ledgerwatch/erigon/common/mclock" "github.com/ledgerwatch/erigon/core" "github.com/ledgerwatch/erigon/core/types" + "github.com/ledgerwatch/erigon/p2p/enode" "github.com/ledgerwatch/log/v3" //"github.com/ledgerwatch/erigon/metrics" ) @@ -101,7 +101,7 @@ var ( // txAnnounce is the notification of the availability of a batch // of new transactions in the network. type txAnnounce struct { - origin string // Identifier of the peer originating the notification + origin enode.ID // Identifier of the peer originating the notification hashes []common.Hash // Batch of transaction hashes being announced } @@ -116,14 +116,14 @@ type txRequest struct { // txDelivery is the notification that a batch of transactions have been added // to the pool and should be untracked. type txDelivery struct { - origin string // Identifier of the peer originating the notification + origin enode.ID // Identifier of the peer originating the notification hashes []common.Hash // Batch of transaction hashes having been delivered direct bool // Whether this is a direct reply or a broadcast } // txDrop is the notiication that a peer has disconnected. type txDrop struct { - peer string + peer enode.ID } // TxFetcher is responsible for retrieving new transaction based on announcements. @@ -153,26 +153,26 @@ type TxFetcher struct { // ID 1: Waiting lists for newly discovered transactions that might be // broadcast without needing explicit request/reply round trips. - waitlist map[common.Hash]map[string]struct{} // Transactions waiting for an potential broadcast - waittime map[common.Hash]mclock.AbsTime // Timestamps when transactions were added to the waitlist - waitslots map[string]map[common.Hash]struct{} // Waiting announcement sgroupped by peer (DoS protection) + waitlist map[common.Hash]map[enode.ID]struct{} // Transactions waiting for an potential broadcast + waittime map[common.Hash]mclock.AbsTime // Timestamps when transactions were added to the waitlist + waitslots map[enode.ID]map[common.Hash]struct{} // Waiting announcement sgroupped by peer (DoS protection) // ID 2: Queue of transactions that waiting to be allocated to some peer // to be retrieved directly. - announces map[string]map[common.Hash]struct{} // Set of announced transactions, grouped by origin peer - announced map[common.Hash]map[string]struct{} // Set of download locations, grouped by transaction hash + announces map[enode.ID]map[common.Hash]struct{} // Set of announced transactions, grouped by origin peer + announced map[common.Hash]map[enode.ID]struct{} // Set of download locations, grouped by transaction hash // ID 3: Set of transactions currently being retrieved, some which may be // fulfilled and some rescheduled. Note, this step shares 'announces' from the // previous stage to avoid having to duplicate (need it for DoS checks). - fetching map[common.Hash]string // Transaction set currently being retrieved - requests map[string]*txRequest // In-flight transaction retrievals - alternates map[common.Hash]map[string]struct{} // In-flight transaction alternate origins if retrieval fails + fetching map[common.Hash]enode.ID // Transaction set currently being retrieved + requests map[enode.ID]*txRequest // In-flight transaction retrievals + alternates map[common.Hash]map[enode.ID]struct{} // In-flight transaction alternate origins if retrieval fails // Callbacks - hasTx func(common.Hash) bool // Retrieves a tx from the local txpool - addTxs func([]types.Transaction) []error // Insert a batch of transactions into local txpool - fetchTxs func(string, []common.Hash) error // Retrieves a set of txs from a remote peer + hasTx func(common.Hash) bool // Retrieves a tx from the local txpool + addTxs func([]types.Transaction) []error // Insert a batch of transactions into local txpool + fetchTxs func(enode.ID, []common.Hash) error // Retrieves a set of txs from a remote peer step chan struct{} // Notification channel when the fetcher loop iterates clock mclock.Clock // Time wrapper to simulate in tests @@ -181,28 +181,28 @@ type TxFetcher struct { // NewTxFetcher creates a transaction fetcher to retrieve transaction // based on hash announcements. -func NewTxFetcher(hasTx func(common.Hash) bool, addTxs func([]types.Transaction) []error, fetchTxs func(string, []common.Hash) error) *TxFetcher { +func NewTxFetcher(hasTx func(common.Hash) bool, addTxs func([]types.Transaction) []error, fetchTxs func(enode.ID, []common.Hash) error) *TxFetcher { return NewTxFetcherForTests(hasTx, addTxs, fetchTxs, mclock.System{}, nil) } // NewTxFetcherForTests is a testing method to mock out the realtime clock with // a simulated version and the internal randomness with a deterministic one. func NewTxFetcherForTests( - hasTx func(common.Hash) bool, addTxs func([]types.Transaction) []error, fetchTxs func(string, []common.Hash) error, + hasTx func(common.Hash) bool, addTxs func([]types.Transaction) []error, fetchTxs func(enode.ID, []common.Hash) error, clock mclock.Clock, rand *mrand.Rand) *TxFetcher { return &TxFetcher{ notify: make(chan *txAnnounce), cleanup: make(chan *txDelivery), drop: make(chan *txDrop), quit: make(chan struct{}), - waitlist: make(map[common.Hash]map[string]struct{}), + waitlist: make(map[common.Hash]map[enode.ID]struct{}), waittime: make(map[common.Hash]mclock.AbsTime), - waitslots: make(map[string]map[common.Hash]struct{}), - announces: make(map[string]map[common.Hash]struct{}), - announced: make(map[common.Hash]map[string]struct{}), - fetching: make(map[common.Hash]string), - requests: make(map[string]*txRequest), - alternates: make(map[common.Hash]map[string]struct{}), + waitslots: make(map[enode.ID]map[common.Hash]struct{}), + announces: make(map[enode.ID]map[common.Hash]struct{}), + announced: make(map[common.Hash]map[enode.ID]struct{}), + fetching: make(map[common.Hash]enode.ID), + requests: make(map[enode.ID]*txRequest), + alternates: make(map[common.Hash]map[enode.ID]struct{}), underpriced: mapset.NewSet(), hasTx: hasTx, addTxs: addTxs, @@ -214,7 +214,7 @@ func NewTxFetcherForTests( // Notify announces the fetcher of the potential availability of a new batch of // transactions in the network. -func (f *TxFetcher) Notify(peer string, hashes []common.Hash) error { +func (f *TxFetcher) Notify(peer enode.ID, hashes []common.Hash) error { // Keep track of all the announced transactions //txAnnounceInMeter.Mark(int64(len(hashes))) @@ -262,7 +262,7 @@ func (f *TxFetcher) Notify(peer string, hashes []common.Hash) error { // and the fetcher. This method may be called by both transaction broadcasts and // direct request replies. The differentiation is important so the fetcher can // re-shedule missing transactions as soon as possible. -func (f *TxFetcher) Enqueue(peer string, txs []types.Transaction, direct bool) error { +func (f *TxFetcher) Enqueue(peer enode.ID, txs []types.Transaction, direct bool) error { // Keep track of all the propagated transactions //if direct { // txReplyInMeter.Mark(int64(len(txs))) @@ -324,7 +324,7 @@ func (f *TxFetcher) Enqueue(peer string, txs []types.Transaction, direct bool) e // Drop should be called when a peer disconnects. It cleans up all the internal // data structures of the given node. -func (f *TxFetcher) Drop(peer string) error { +func (f *TxFetcher) Drop(peer enode.ID) error { select { case f.drop <- &txDrop{peer: peer}: return nil @@ -421,7 +421,7 @@ func (f *TxFetcher) loop() { continue } // Transaction unknown to the fetcher, insert it into the waiting list - f.waitlist[hash] = map[string]struct{}{ann.origin: {}} + f.waitlist[hash] = map[enode.ID]struct{}{ann.origin: {}} f.waittime[hash] = f.clock.Now() if waitslots := f.waitslots[ann.origin]; waitslots != nil { @@ -437,13 +437,13 @@ func (f *TxFetcher) loop() { // If this peer is new and announced something already queued, maybe // request transactions from them if !oldPeer && len(f.announces[ann.origin]) > 0 { - f.scheduleFetches(timeoutTimer, timeoutTrigger, map[string]struct{}{ann.origin: {}}) + f.scheduleFetches(timeoutTimer, timeoutTrigger, map[enode.ID]struct{}{ann.origin: {}}) } case <-waitTrigger: // At least one transaction's waiting time ran out, push all expired // ones into the retrieval queues - actives := make(map[string]struct{}) + actives := make(map[enode.ID]struct{}) for hash, instance := range f.waittime { if time.Duration(f.clock.Now()-instance)+txGatherSlack > txArriveTimeout { // Transaction expired without propagation, schedule for retrieval @@ -751,11 +751,11 @@ func (f *TxFetcher) rescheduleTimeout(timer *mclock.Timer, trigger chan struct{} } // scheduleFetches starts a batch of retrievals for all available idle peers. -func (f *TxFetcher) scheduleFetches(timer *mclock.Timer, timeout chan struct{}, whitelist map[string]struct{}) { +func (f *TxFetcher) scheduleFetches(timer *mclock.Timer, timeout chan struct{}, whitelist map[enode.ID]struct{}) { // Gather the set of peers we want to retrieve from (default to all) actives := whitelist if actives == nil { - actives = make(map[string]struct{}) + actives = make(map[enode.ID]struct{}) for peer := range f.announces { actives[peer] = struct{}{} } @@ -766,7 +766,7 @@ func (f *TxFetcher) scheduleFetches(timer *mclock.Timer, timeout chan struct{}, // For each active peer, try to schedule some transaction fetches idle := len(f.requests) == 0 - f.forEachPeer(actives, func(peer string) { + f.forEachPeer(actives, func(peer enode.ID) { if f.requests[peer] != nil { return // continue in the for-each } @@ -798,7 +798,7 @@ func (f *TxFetcher) scheduleFetches(timer *mclock.Timer, timeout chan struct{}, f.requests[peer] = &txRequest{hashes: hashes, time: f.clock.Now()} //txRequestOutMeter.Mark(int64(len(hashes))) - go func(peer string, hashes []common.Hash) { + go func(peer enode.ID, hashes []common.Hash) { // Try to fetch the transactions, but in case of a request // failure (e.g. peer disconnected), reschedule the hashes. if err := f.fetchTxs(peer, hashes); err != nil { @@ -816,7 +816,7 @@ func (f *TxFetcher) scheduleFetches(timer *mclock.Timer, timeout chan struct{}, // forEachPeer does a range loop over a map of peers in production, but during // testing it does a deterministic sorted random to allow reproducing issues. -func (f *TxFetcher) forEachPeer(peers map[string]struct{}, do func(peer string)) { +func (f *TxFetcher) forEachPeer(peers map[enode.ID]struct{}, do func(peer enode.ID)) { // If we're running production, use whatever Go's map gives us if f.rand == nil { for peer := range peers { @@ -825,12 +825,12 @@ func (f *TxFetcher) forEachPeer(peers map[string]struct{}, do func(peer string)) return } // We're running the test suite, make iteration deterministic - list := make([]string, 0, len(peers)) + list := make([]enode.ID, 0, len(peers)) for peer := range peers { list = append(list, peer) } - sort.Strings(list) - rotateStrings(list, f.rand.Intn(len(list))) + sortPeerIDs(list) + rotatePeerIDs(list, f.rand.Intn(len(list))) for _, peer := range list { do(peer) } @@ -862,10 +862,22 @@ func (f *TxFetcher) forEachHash(hashes map[common.Hash]struct{}, do func(hash co } } -// rotateStrings rotates the contents of a slice by n steps. This method is only +// sortPeerIDs sorts a slice of hashes. This method is only used in tests in order +// to simulate random map iteration but keep it deterministic. +func sortPeerIDs(slice []enode.ID) { + for i := 0; i < len(slice); i++ { + for j := i + 1; j < len(slice); j++ { + if bytes.Compare(slice[i][:], slice[j][:]) > 0 { + slice[i], slice[j] = slice[j], slice[i] + } + } + } +} + +// rotatePeerIDs rotates the contents of a slice by n steps. This method is only // used in tests to simulate random map iteration but keep it deterministic. -func rotateStrings(slice []string, n int) { - orig := make([]string, len(slice)) +func rotatePeerIDs(slice []enode.ID, n int) { + orig := make([]enode.ID, len(slice)) copy(orig, slice) for i := 0; i < len(orig); i++ { diff --git a/eth/fetcher/tx_fetcher_test.go b/eth/fetcher/tx_fetcher_test.go index 70ff19c245e07f4a1388467dc5ec00aaec35fd55..859dbab8ba3346366912ef60a97823e78b60053d 100644 --- a/eth/fetcher/tx_fetcher_test.go +++ b/eth/fetcher/tx_fetcher_test.go @@ -28,6 +28,7 @@ import ( "github.com/ledgerwatch/erigon/common/mclock" "github.com/ledgerwatch/erigon/core" "github.com/ledgerwatch/erigon/core/types" + "github.com/ledgerwatch/erigon/p2p/enode" ) var ( @@ -43,11 +44,11 @@ var ( ) type doTxNotify struct { - peer string + peer enode.ID hashes []common.Hash } type doTxEnqueue struct { - peer string + peer enode.ID txs []types.Transaction direct bool } @@ -55,14 +56,14 @@ type doWait struct { time time.Duration step bool } -type doDrop string +type doDrop enode.ID type doFunc func() -type isWaiting map[string][]common.Hash +type isWaiting map[enode.ID][]common.Hash type isScheduled struct { - tracking map[string][]common.Hash - fetching map[string][]common.Hash - dangling map[string][]common.Hash + tracking map[enode.ID][]common.Hash + fetching map[enode.ID][]common.Hash + dangling map[enode.ID][]common.Hash } type isUnderpriced int @@ -81,34 +82,34 @@ func TestTransactionFetcherWaiting(t *testing.T) { return NewTxFetcher( func(common.Hash) bool { return false }, nil, - func(string, []common.Hash) error { return nil }, + func(enode.ID, []common.Hash) error { return nil }, ) }, steps: []interface{}{ // Initial announcement to get something into the waitlist - doTxNotify{peer: "A", hashes: []common.Hash{{0x01}, {0x02}}}, - isWaiting(map[string][]common.Hash{ - "A": {{0x01}, {0x02}}, + doTxNotify{peer: enode.ID{'A'}, hashes: []common.Hash{{0x01}, {0x02}}}, + isWaiting(map[enode.ID][]common.Hash{ + {'A'}: {{0x01}, {0x02}}, }), // Announce from a new peer to check that no overwrite happens - doTxNotify{peer: "B", hashes: []common.Hash{{0x03}, {0x04}}}, - isWaiting(map[string][]common.Hash{ - "A": {{0x01}, {0x02}}, - "B": {{0x03}, {0x04}}, + doTxNotify{peer: enode.ID{'B'}, hashes: []common.Hash{{0x03}, {0x04}}}, + isWaiting(map[enode.ID][]common.Hash{ + {'A'}: {{0x01}, {0x02}}, + {'B'}: {{0x03}, {0x04}}, }), // Announce clashing hashes but unique new peer - doTxNotify{peer: "C", hashes: []common.Hash{{0x01}, {0x04}}}, - isWaiting(map[string][]common.Hash{ - "A": {{0x01}, {0x02}}, - "B": {{0x03}, {0x04}}, - "C": {{0x01}, {0x04}}, + doTxNotify{peer: enode.ID{'C'}, hashes: []common.Hash{{0x01}, {0x04}}}, + isWaiting(map[enode.ID][]common.Hash{ + {'A'}: {{0x01}, {0x02}}, + {'B'}: {{0x03}, {0x04}}, + {'C'}: {{0x01}, {0x04}}, }), // Announce existing and clashing hashes from existing peer - doTxNotify{peer: "A", hashes: []common.Hash{{0x01}, {0x03}, {0x05}}}, - isWaiting(map[string][]common.Hash{ - "A": {{0x01}, {0x02}, {0x03}, {0x05}}, - "B": {{0x03}, {0x04}}, - "C": {{0x01}, {0x04}}, + doTxNotify{peer: enode.ID{'A'}, hashes: []common.Hash{{0x01}, {0x03}, {0x05}}}, + isWaiting(map[enode.ID][]common.Hash{ + {'A'}: {{0x01}, {0x02}, {0x03}, {0x05}}, + {'B'}: {{0x03}, {0x04}}, + {'C'}: {{0x01}, {0x04}}, }), isScheduled{tracking: nil, fetching: nil}, @@ -117,46 +118,46 @@ func TestTransactionFetcherWaiting(t *testing.T) { doWait{time: txArriveTimeout, step: true}, isWaiting(nil), isScheduled{ - tracking: map[string][]common.Hash{ - "A": {{0x01}, {0x02}, {0x03}, {0x05}}, - "B": {{0x03}, {0x04}}, - "C": {{0x01}, {0x04}}, + tracking: map[enode.ID][]common.Hash{ + {'A'}: {{0x01}, {0x02}, {0x03}, {0x05}}, + {'B'}: {{0x03}, {0x04}}, + {'C'}: {{0x01}, {0x04}}, }, - fetching: map[string][]common.Hash{ // Depends on deterministic test randomizer - "A": {{0x02}, {0x03}, {0x05}}, - "C": {{0x01}, {0x04}}, + fetching: map[enode.ID][]common.Hash{ // Depends on deterministic test randomizer + {'A'}: {{0x02}, {0x03}, {0x05}}, + {'C'}: {{0x01}, {0x04}}, }, }, // Queue up a non-fetchable transaction and then trigger it with a new // peer (weird case to test 1 line in the fetcher) - doTxNotify{peer: "C", hashes: []common.Hash{{0x06}, {0x07}}}, - isWaiting(map[string][]common.Hash{ - "C": {{0x06}, {0x07}}, + doTxNotify{peer: enode.ID{'C'}, hashes: []common.Hash{{0x06}, {0x07}}}, + isWaiting(map[enode.ID][]common.Hash{ + {'C'}: {{0x06}, {0x07}}, }), doWait{time: txArriveTimeout, step: true}, isScheduled{ - tracking: map[string][]common.Hash{ - "A": {{0x01}, {0x02}, {0x03}, {0x05}}, - "B": {{0x03}, {0x04}}, - "C": {{0x01}, {0x04}, {0x06}, {0x07}}, + tracking: map[enode.ID][]common.Hash{ + {'A'}: {{0x01}, {0x02}, {0x03}, {0x05}}, + {'B'}: {{0x03}, {0x04}}, + {'C'}: {{0x01}, {0x04}, {0x06}, {0x07}}, }, - fetching: map[string][]common.Hash{ - "A": {{0x02}, {0x03}, {0x05}}, - "C": {{0x01}, {0x04}}, + fetching: map[enode.ID][]common.Hash{ + {'A'}: {{0x02}, {0x03}, {0x05}}, + {'C'}: {{0x01}, {0x04}}, }, }, - doTxNotify{peer: "D", hashes: []common.Hash{{0x06}, {0x07}}}, + doTxNotify{peer: enode.ID{'D'}, hashes: []common.Hash{{0x06}, {0x07}}}, isScheduled{ - tracking: map[string][]common.Hash{ - "A": {{0x01}, {0x02}, {0x03}, {0x05}}, - "B": {{0x03}, {0x04}}, - "C": {{0x01}, {0x04}, {0x06}, {0x07}}, - "D": {{0x06}, {0x07}}, + tracking: map[enode.ID][]common.Hash{ + {'A'}: {{0x01}, {0x02}, {0x03}, {0x05}}, + {'B'}: {{0x03}, {0x04}}, + {'C'}: {{0x01}, {0x04}, {0x06}, {0x07}}, + {'D'}: {{0x06}, {0x07}}, }, - fetching: map[string][]common.Hash{ - "A": {{0x02}, {0x03}, {0x05}}, - "C": {{0x01}, {0x04}}, - "D": {{0x06}, {0x07}}, + fetching: map[enode.ID][]common.Hash{ + {'A'}: {{0x02}, {0x03}, {0x05}}, + {'C'}: {{0x01}, {0x04}}, + {'D'}: {{0x06}, {0x07}}, }, }, }, @@ -171,55 +172,55 @@ func TestTransactionFetcherSkipWaiting(t *testing.T) { return NewTxFetcher( func(common.Hash) bool { return false }, nil, - func(string, []common.Hash) error { return nil }, + func(enode.ID, []common.Hash) error { return nil }, ) }, steps: []interface{}{ // Push an initial announcement through to the scheduled stage - doTxNotify{peer: "A", hashes: []common.Hash{{0x01}, {0x02}}}, - isWaiting(map[string][]common.Hash{ - "A": {{0x01}, {0x02}}, + doTxNotify{peer: enode.ID{'A'}, hashes: []common.Hash{{0x01}, {0x02}}}, + isWaiting(map[enode.ID][]common.Hash{ + {'A'}: {{0x01}, {0x02}}, }), isScheduled{tracking: nil, fetching: nil}, doWait{time: txArriveTimeout, step: true}, isWaiting(nil), isScheduled{ - tracking: map[string][]common.Hash{ - "A": {{0x01}, {0x02}}, + tracking: map[enode.ID][]common.Hash{ + {'A'}: {{0x01}, {0x02}}, }, - fetching: map[string][]common.Hash{ - "A": {{0x01}, {0x02}}, + fetching: map[enode.ID][]common.Hash{ + {'A'}: {{0x01}, {0x02}}, }, }, // Announce overlaps from the same peer, ensure the new ones end up // in stage one, and clashing ones don't get double tracked - doTxNotify{peer: "A", hashes: []common.Hash{{0x02}, {0x03}}}, - isWaiting(map[string][]common.Hash{ - "A": {{0x03}}, + doTxNotify{peer: enode.ID{'A'}, hashes: []common.Hash{{0x02}, {0x03}}}, + isWaiting(map[enode.ID][]common.Hash{ + {'A'}: {{0x03}}, }), isScheduled{ - tracking: map[string][]common.Hash{ - "A": {{0x01}, {0x02}}, + tracking: map[enode.ID][]common.Hash{ + {'A'}: {{0x01}, {0x02}}, }, - fetching: map[string][]common.Hash{ - "A": {{0x01}, {0x02}}, + fetching: map[enode.ID][]common.Hash{ + {'A'}: {{0x01}, {0x02}}, }, }, // Announce overlaps from a new peer, ensure new transactions end up // in stage one and clashing ones get tracked for the new peer - doTxNotify{peer: "B", hashes: []common.Hash{{0x02}, {0x03}, {0x04}}}, - isWaiting(map[string][]common.Hash{ - "A": {{0x03}}, - "B": {{0x03}, {0x04}}, + doTxNotify{peer: enode.ID{'B'}, hashes: []common.Hash{{0x02}, {0x03}, {0x04}}}, + isWaiting(map[enode.ID][]common.Hash{ + {'A'}: {{0x03}}, + {'B'}: {{0x03}, {0x04}}, }), isScheduled{ - tracking: map[string][]common.Hash{ - "A": {{0x01}, {0x02}}, - "B": {{0x02}}, + tracking: map[enode.ID][]common.Hash{ + {'A'}: {{0x01}, {0x02}}, + {'B'}: {{0x02}}, }, - fetching: map[string][]common.Hash{ - "A": {{0x01}, {0x02}}, + fetching: map[enode.ID][]common.Hash{ + {'A'}: {{0x01}, {0x02}}, }, }, }, @@ -234,65 +235,65 @@ func TestTransactionFetcherSingletonRequesting(t *testing.T) { return NewTxFetcher( func(common.Hash) bool { return false }, nil, - func(string, []common.Hash) error { return nil }, + func(enode.ID, []common.Hash) error { return nil }, ) }, steps: []interface{}{ // Push an initial announcement through to the scheduled stage - doTxNotify{peer: "A", hashes: []common.Hash{{0x01}, {0x02}}}, - isWaiting(map[string][]common.Hash{ - "A": {{0x01}, {0x02}}, + doTxNotify{peer: enode.ID{'A'}, hashes: []common.Hash{{0x01}, {0x02}}}, + isWaiting(map[enode.ID][]common.Hash{ + {'A'}: {{0x01}, {0x02}}, }), isScheduled{tracking: nil, fetching: nil}, doWait{time: txArriveTimeout, step: true}, isWaiting(nil), isScheduled{ - tracking: map[string][]common.Hash{ - "A": {{0x01}, {0x02}}, + tracking: map[enode.ID][]common.Hash{ + {'A'}: {{0x01}, {0x02}}, }, - fetching: map[string][]common.Hash{ - "A": {{0x01}, {0x02}}, + fetching: map[enode.ID][]common.Hash{ + {'A'}: {{0x01}, {0x02}}, }, }, // Announce a new set of transactions from the same peer and ensure // they do not start fetching since the peer is already busy - doTxNotify{peer: "A", hashes: []common.Hash{{0x03}, {0x04}}}, - isWaiting(map[string][]common.Hash{ - "A": {{0x03}, {0x04}}, + doTxNotify{peer: enode.ID{'A'}, hashes: []common.Hash{{0x03}, {0x04}}}, + isWaiting(map[enode.ID][]common.Hash{ + {'A'}: {{0x03}, {0x04}}, }), isScheduled{ - tracking: map[string][]common.Hash{ - "A": {{0x01}, {0x02}}, + tracking: map[enode.ID][]common.Hash{ + {'A'}: {{0x01}, {0x02}}, }, - fetching: map[string][]common.Hash{ - "A": {{0x01}, {0x02}}, + fetching: map[enode.ID][]common.Hash{ + {'A'}: {{0x01}, {0x02}}, }, }, doWait{time: txArriveTimeout, step: true}, isWaiting(nil), isScheduled{ - tracking: map[string][]common.Hash{ - "A": {{0x01}, {0x02}, {0x03}, {0x04}}, + tracking: map[enode.ID][]common.Hash{ + {'A'}: {{0x01}, {0x02}, {0x03}, {0x04}}, }, - fetching: map[string][]common.Hash{ - "A": {{0x01}, {0x02}}, + fetching: map[enode.ID][]common.Hash{ + {'A'}: {{0x01}, {0x02}}, }, }, // Announce a duplicate set of transactions from a new peer and ensure // uniquely new ones start downloading, even if clashing. - doTxNotify{peer: "B", hashes: []common.Hash{{0x02}, {0x03}, {0x05}, {0x06}}}, - isWaiting(map[string][]common.Hash{ - "B": {{0x05}, {0x06}}, + doTxNotify{peer: enode.ID{'B'}, hashes: []common.Hash{{0x02}, {0x03}, {0x05}, {0x06}}}, + isWaiting(map[enode.ID][]common.Hash{ + {'B'}: {{0x05}, {0x06}}, }), isScheduled{ - tracking: map[string][]common.Hash{ - "A": {{0x01}, {0x02}, {0x03}, {0x04}}, - "B": {{0x02}, {0x03}}, + tracking: map[enode.ID][]common.Hash{ + {'A'}: {{0x01}, {0x02}, {0x03}, {0x04}}, + {'B'}: {{0x02}, {0x03}}, }, - fetching: map[string][]common.Hash{ - "A": {{0x01}, {0x02}}, - "B": {{0x03}}, + fetching: map[enode.ID][]common.Hash{ + {'A'}: {{0x01}, {0x02}}, + {'B'}: {{0x03}}, }, }, }, @@ -311,7 +312,7 @@ func TestTransactionFetcherFailedRescheduling(t *testing.T) { return NewTxFetcher( func(common.Hash) bool { return false }, nil, - func(origin string, hashes []common.Hash) error { + func(origin enode.ID, hashes []common.Hash) error { <-proceed return errors.New("peer disconnected") }, @@ -319,33 +320,33 @@ func TestTransactionFetcherFailedRescheduling(t *testing.T) { }, steps: []interface{}{ // Push an initial announcement through to the scheduled stage - doTxNotify{peer: "A", hashes: []common.Hash{{0x01}, {0x02}}}, - isWaiting(map[string][]common.Hash{ - "A": {{0x01}, {0x02}}, + doTxNotify{peer: enode.ID{'A'}, hashes: []common.Hash{{0x01}, {0x02}}}, + isWaiting(map[enode.ID][]common.Hash{ + {'A'}: {{0x01}, {0x02}}, }), isScheduled{tracking: nil, fetching: nil}, doWait{time: txArriveTimeout, step: true}, isWaiting(nil), isScheduled{ - tracking: map[string][]common.Hash{ - "A": {{0x01}, {0x02}}, + tracking: map[enode.ID][]common.Hash{ + {'A'}: {{0x01}, {0x02}}, }, - fetching: map[string][]common.Hash{ - "A": {{0x01}, {0x02}}, + fetching: map[enode.ID][]common.Hash{ + {'A'}: {{0x01}, {0x02}}, }, }, // While the original peer is stuck in the request, push in an second // data source. - doTxNotify{peer: "B", hashes: []common.Hash{{0x02}}}, + doTxNotify{peer: enode.ID{'B'}, hashes: []common.Hash{{0x02}}}, isWaiting(nil), isScheduled{ - tracking: map[string][]common.Hash{ - "A": {{0x01}, {0x02}}, - "B": {{0x02}}, + tracking: map[enode.ID][]common.Hash{ + {'A'}: {{0x01}, {0x02}}, + {'B'}: {{0x02}}, }, - fetching: map[string][]common.Hash{ - "A": {{0x01}, {0x02}}, + fetching: map[enode.ID][]common.Hash{ + {'A'}: {{0x01}, {0x02}}, }, }, // Wait until the original request fails and check that transactions @@ -356,11 +357,11 @@ func TestTransactionFetcherFailedRescheduling(t *testing.T) { doWait{time: 0, step: true}, isWaiting(nil), isScheduled{ - tracking: map[string][]common.Hash{ - "B": {{0x02}}, + tracking: map[enode.ID][]common.Hash{ + {'B'}: {{0x02}}, }, - fetching: map[string][]common.Hash{ - "B": {{0x02}}, + fetching: map[enode.ID][]common.Hash{ + {'B'}: {{0x02}}, }, }, doFunc(func() { @@ -383,29 +384,29 @@ func TestTransactionFetcherCleanup(t *testing.T) { func(txs []types.Transaction) []error { return make([]error, len(txs)) }, - func(string, []common.Hash) error { return nil }, + func(enode.ID, []common.Hash) error { return nil }, ) }, steps: []interface{}{ // Push an initial announcement through to the scheduled stage - doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}}, - isWaiting(map[string][]common.Hash{ - "A": {testTxsHashes[0]}, + doTxNotify{peer: enode.ID{'A'}, hashes: []common.Hash{testTxsHashes[0]}}, + isWaiting(map[enode.ID][]common.Hash{ + {'A'}: {testTxsHashes[0]}, }), isScheduled{tracking: nil, fetching: nil}, doWait{time: txArriveTimeout, step: true}, isWaiting(nil), isScheduled{ - tracking: map[string][]common.Hash{ - "A": {testTxsHashes[0]}, + tracking: map[enode.ID][]common.Hash{ + {'A'}: {testTxsHashes[0]}, }, - fetching: map[string][]common.Hash{ - "A": {testTxsHashes[0]}, + fetching: map[enode.ID][]common.Hash{ + {'A'}: {testTxsHashes[0]}, }, }, // Request should be delivered - doTxEnqueue{peer: "A", txs: []types.Transaction{testTxs[0]}, direct: true}, + doTxEnqueue{peer: enode.ID{'A'}, txs: []types.Transaction{testTxs[0]}, direct: true}, isScheduled{nil, nil, nil}, }, }) @@ -422,29 +423,29 @@ func TestTransactionFetcherCleanupEmpty(t *testing.T) { func(txs []types.Transaction) []error { return make([]error, len(txs)) }, - func(string, []common.Hash) error { return nil }, + func(enode.ID, []common.Hash) error { return nil }, ) }, steps: []interface{}{ // Push an initial announcement through to the scheduled stage - doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}}, - isWaiting(map[string][]common.Hash{ - "A": {testTxsHashes[0]}, + doTxNotify{peer: enode.ID{'A'}, hashes: []common.Hash{testTxsHashes[0]}}, + isWaiting(map[enode.ID][]common.Hash{ + {'A'}: {testTxsHashes[0]}, }), isScheduled{tracking: nil, fetching: nil}, doWait{time: txArriveTimeout, step: true}, isWaiting(nil), isScheduled{ - tracking: map[string][]common.Hash{ - "A": {testTxsHashes[0]}, + tracking: map[enode.ID][]common.Hash{ + {'A'}: {testTxsHashes[0]}, }, - fetching: map[string][]common.Hash{ - "A": {testTxsHashes[0]}, + fetching: map[enode.ID][]common.Hash{ + {'A'}: {testTxsHashes[0]}, }, }, // Deliver an empty response and ensure the transaction is cleared, not rescheduled - doTxEnqueue{peer: "A", txs: []types.Transaction{}, direct: true}, + doTxEnqueue{peer: enode.ID{'A'}, txs: []types.Transaction{}, direct: true}, isScheduled{nil, nil, nil}, }, }) @@ -460,36 +461,36 @@ func TestTransactionFetcherMissingRescheduling(t *testing.T) { func(txs []types.Transaction) []error { return make([]error, len(txs)) }, - func(string, []common.Hash) error { return nil }, + func(enode.ID, []common.Hash) error { return nil }, ) }, steps: []interface{}{ // Push an initial announcement through to the scheduled stage - doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0], testTxsHashes[1], testTxsHashes[2]}}, - isWaiting(map[string][]common.Hash{ - "A": {testTxsHashes[0], testTxsHashes[1], testTxsHashes[2]}, + doTxNotify{peer: enode.ID{'A'}, hashes: []common.Hash{testTxsHashes[0], testTxsHashes[1], testTxsHashes[2]}}, + isWaiting(map[enode.ID][]common.Hash{ + {'A'}: {testTxsHashes[0], testTxsHashes[1], testTxsHashes[2]}, }), isScheduled{tracking: nil, fetching: nil}, doWait{time: txArriveTimeout, step: true}, isWaiting(nil), isScheduled{ - tracking: map[string][]common.Hash{ - "A": {testTxsHashes[0], testTxsHashes[1], testTxsHashes[2]}, + tracking: map[enode.ID][]common.Hash{ + {'A'}: {testTxsHashes[0], testTxsHashes[1], testTxsHashes[2]}, }, - fetching: map[string][]common.Hash{ - "A": {testTxsHashes[0], testTxsHashes[1], testTxsHashes[2]}, + fetching: map[enode.ID][]common.Hash{ + {'A'}: {testTxsHashes[0], testTxsHashes[1], testTxsHashes[2]}, }, }, // Deliver the middle transaction requested, the one before which // should be dropped and the one after re-requested. - doTxEnqueue{peer: "A", txs: []types.Transaction{testTxs[0]}, direct: true}, // This depends on the deterministic random + doTxEnqueue{peer: enode.ID{'A'}, txs: []types.Transaction{testTxs[0]}, direct: true}, // This depends on the deterministic random isScheduled{ - tracking: map[string][]common.Hash{ - "A": {testTxsHashes[2]}, + tracking: map[enode.ID][]common.Hash{ + {'A'}: {testTxsHashes[2]}, }, - fetching: map[string][]common.Hash{ - "A": {testTxsHashes[2]}, + fetching: map[enode.ID][]common.Hash{ + {'A'}: {testTxsHashes[2]}, }, }, }, @@ -506,30 +507,30 @@ func TestTransactionFetcherMissingCleanup(t *testing.T) { func(txs []types.Transaction) []error { return make([]error, len(txs)) }, - func(string, []common.Hash) error { return nil }, + func(enode.ID, []common.Hash) error { return nil }, ) }, steps: []interface{}{ // Push an initial announcement through to the scheduled stage - doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0], testTxsHashes[1]}}, - isWaiting(map[string][]common.Hash{ - "A": {testTxsHashes[0], testTxsHashes[1]}, + doTxNotify{peer: enode.ID{'A'}, hashes: []common.Hash{testTxsHashes[0], testTxsHashes[1]}}, + isWaiting(map[enode.ID][]common.Hash{ + {'A'}: {testTxsHashes[0], testTxsHashes[1]}, }), isScheduled{tracking: nil, fetching: nil}, doWait{time: txArriveTimeout, step: true}, isWaiting(nil), isScheduled{ - tracking: map[string][]common.Hash{ - "A": {testTxsHashes[0], testTxsHashes[1]}, + tracking: map[enode.ID][]common.Hash{ + {'A'}: {testTxsHashes[0], testTxsHashes[1]}, }, - fetching: map[string][]common.Hash{ - "A": {testTxsHashes[0], testTxsHashes[1]}, + fetching: map[enode.ID][]common.Hash{ + {'A'}: {testTxsHashes[0], testTxsHashes[1]}, }, }, // Deliver the middle transaction requested, the one before which // should be dropped and the one after re-requested. - doTxEnqueue{peer: "A", txs: []types.Transaction{testTxs[1]}, direct: true}, // This depends on the deterministic random + doTxEnqueue{peer: enode.ID{'A'}, txs: []types.Transaction{testTxs[1]}, direct: true}, // This depends on the deterministic random isScheduled{nil, nil, nil}, }, }) @@ -544,42 +545,42 @@ func TestTransactionFetcherBroadcasts(t *testing.T) { func(txs []types.Transaction) []error { return make([]error, len(txs)) }, - func(string, []common.Hash) error { return nil }, + func(enode.ID, []common.Hash) error { return nil }, ) }, steps: []interface{}{ // Set up three transactions to be in different stats, waiting, queued and fetching - doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}}, + doTxNotify{peer: enode.ID{'A'}, hashes: []common.Hash{testTxsHashes[0]}}, doWait{time: txArriveTimeout, step: true}, - doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[1]}}, + doTxNotify{peer: enode.ID{'A'}, hashes: []common.Hash{testTxsHashes[1]}}, doWait{time: txArriveTimeout, step: true}, - doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[2]}}, + doTxNotify{peer: enode.ID{'A'}, hashes: []common.Hash{testTxsHashes[2]}}, - isWaiting(map[string][]common.Hash{ - "A": {testTxsHashes[2]}, + isWaiting(map[enode.ID][]common.Hash{ + {'A'}: {testTxsHashes[2]}, }), isScheduled{ - tracking: map[string][]common.Hash{ - "A": {testTxsHashes[0], testTxsHashes[1]}, + tracking: map[enode.ID][]common.Hash{ + {'A'}: {testTxsHashes[0], testTxsHashes[1]}, }, - fetching: map[string][]common.Hash{ - "A": {testTxsHashes[0]}, + fetching: map[enode.ID][]common.Hash{ + {'A'}: {testTxsHashes[0]}, }, }, // Broadcast all the transactions and ensure everything gets cleaned // up, but the dangling request is left alone to avoid doing multiple // concurrent requests. - doTxEnqueue{peer: "A", txs: []types.Transaction{testTxs[0], testTxs[1], testTxs[2]}, direct: false}, + doTxEnqueue{peer: enode.ID{'A'}, txs: []types.Transaction{testTxs[0], testTxs[1], testTxs[2]}, direct: false}, isWaiting(nil), isScheduled{ tracking: nil, fetching: nil, - dangling: map[string][]common.Hash{ - "A": {testTxsHashes[0]}, + dangling: map[enode.ID][]common.Hash{ + {'A'}: {testTxsHashes[0]}, }, }, // Deliver the requested hashes - doTxEnqueue{peer: "A", txs: []types.Transaction{testTxs[0], testTxs[1], testTxs[2]}, direct: true}, + doTxEnqueue{peer: enode.ID{'A'}, txs: []types.Transaction{testTxs[0], testTxs[1], testTxs[2]}, direct: true}, isScheduled{nil, nil, nil}, }, }) @@ -592,47 +593,47 @@ func TestTransactionFetcherWaitTimerResets(t *testing.T) { return NewTxFetcher( func(common.Hash) bool { return false }, nil, - func(string, []common.Hash) error { return nil }, + func(enode.ID, []common.Hash) error { return nil }, ) }, steps: []interface{}{ - doTxNotify{peer: "A", hashes: []common.Hash{{0x01}}}, - isWaiting(map[string][]common.Hash{ - "A": {{0x01}}, + doTxNotify{peer: enode.ID{'A'}, hashes: []common.Hash{{0x01}}}, + isWaiting(map[enode.ID][]common.Hash{ + {'A'}: {{0x01}}, }), isScheduled{nil, nil, nil}, doWait{time: txArriveTimeout / 2, step: false}, - isWaiting(map[string][]common.Hash{ - "A": {{0x01}}, + isWaiting(map[enode.ID][]common.Hash{ + {'A'}: {{0x01}}, }), isScheduled{nil, nil, nil}, - doTxNotify{peer: "A", hashes: []common.Hash{{0x02}}}, - isWaiting(map[string][]common.Hash{ - "A": {{0x01}, {0x02}}, + doTxNotify{peer: enode.ID{'A'}, hashes: []common.Hash{{0x02}}}, + isWaiting(map[enode.ID][]common.Hash{ + {'A'}: {{0x01}, {0x02}}, }), isScheduled{nil, nil, nil}, doWait{time: txArriveTimeout / 2, step: true}, - isWaiting(map[string][]common.Hash{ - "A": {{0x02}}, + isWaiting(map[enode.ID][]common.Hash{ + {'A'}: {{0x02}}, }), isScheduled{ - tracking: map[string][]common.Hash{ - "A": {{0x01}}, + tracking: map[enode.ID][]common.Hash{ + {'A'}: {{0x01}}, }, - fetching: map[string][]common.Hash{ - "A": {{0x01}}, + fetching: map[enode.ID][]common.Hash{ + {'A'}: {{0x01}}, }, }, doWait{time: txArriveTimeout / 2, step: true}, isWaiting(nil), isScheduled{ - tracking: map[string][]common.Hash{ - "A": {{0x01}, {0x02}}, + tracking: map[enode.ID][]common.Hash{ + {'A'}: {{0x01}, {0x02}}, }, - fetching: map[string][]common.Hash{ - "A": {{0x01}}, + fetching: map[enode.ID][]common.Hash{ + {'A'}: {{0x01}}, }, }, }, @@ -649,25 +650,25 @@ func TestTransactionFetcherTimeoutRescheduling(t *testing.T) { func(txs []types.Transaction) []error { return make([]error, len(txs)) }, - func(string, []common.Hash) error { return nil }, + func(enode.ID, []common.Hash) error { return nil }, ) }, steps: []interface{}{ // Push an initial announcement through to the scheduled stage - doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}}, - isWaiting(map[string][]common.Hash{ - "A": {testTxsHashes[0]}, + doTxNotify{peer: enode.ID{'A'}, hashes: []common.Hash{testTxsHashes[0]}}, + isWaiting(map[enode.ID][]common.Hash{ + {'A'}: {testTxsHashes[0]}, }), isScheduled{tracking: nil, fetching: nil}, doWait{time: txArriveTimeout, step: true}, isWaiting(nil), isScheduled{ - tracking: map[string][]common.Hash{ - "A": {testTxsHashes[0]}, + tracking: map[enode.ID][]common.Hash{ + {'A'}: {testTxsHashes[0]}, }, - fetching: map[string][]common.Hash{ - "A": {testTxsHashes[0]}, + fetching: map[enode.ID][]common.Hash{ + {'A'}: {testTxsHashes[0]}, }, }, // Wait until the delivery times out, everything should be cleaned up @@ -676,31 +677,31 @@ func TestTransactionFetcherTimeoutRescheduling(t *testing.T) { isScheduled{ tracking: nil, fetching: nil, - dangling: map[string][]common.Hash{ - "A": {}, + dangling: map[enode.ID][]common.Hash{ + {'A'}: {}, }, }, // Ensure that followup announcements don't get scheduled - doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[1]}}, + doTxNotify{peer: enode.ID{'A'}, hashes: []common.Hash{testTxsHashes[1]}}, doWait{time: txArriveTimeout, step: true}, isScheduled{ - tracking: map[string][]common.Hash{ - "A": {testTxsHashes[1]}, + tracking: map[enode.ID][]common.Hash{ + {'A'}: {testTxsHashes[1]}, }, fetching: nil, - dangling: map[string][]common.Hash{ - "A": {}, + dangling: map[enode.ID][]common.Hash{ + {'A'}: {}, }, }, // If the dangling request arrives a bit later, do not choke - doTxEnqueue{peer: "A", txs: []types.Transaction{testTxs[0]}, direct: true}, + doTxEnqueue{peer: enode.ID{'A'}, txs: []types.Transaction{testTxs[0]}, direct: true}, isWaiting(nil), isScheduled{ - tracking: map[string][]common.Hash{ - "A": {testTxsHashes[1]}, + tracking: map[enode.ID][]common.Hash{ + {'A'}: {testTxsHashes[1]}, }, - fetching: map[string][]common.Hash{ - "A": {testTxsHashes[1]}, + fetching: map[enode.ID][]common.Hash{ + {'A'}: {testTxsHashes[1]}, }, }, }, @@ -714,45 +715,45 @@ func TestTransactionFetcherTimeoutTimerResets(t *testing.T) { return NewTxFetcher( func(common.Hash) bool { return false }, nil, - func(string, []common.Hash) error { return nil }, + func(enode.ID, []common.Hash) error { return nil }, ) }, steps: []interface{}{ - doTxNotify{peer: "A", hashes: []common.Hash{{0x01}}}, + doTxNotify{peer: enode.ID{'A'}, hashes: []common.Hash{{0x01}}}, doWait{time: txArriveTimeout, step: true}, - doTxNotify{peer: "B", hashes: []common.Hash{{0x02}}}, + doTxNotify{peer: enode.ID{'B'}, hashes: []common.Hash{{0x02}}}, doWait{time: txArriveTimeout, step: true}, isWaiting(nil), isScheduled{ - tracking: map[string][]common.Hash{ - "A": {{0x01}}, - "B": {{0x02}}, + tracking: map[enode.ID][]common.Hash{ + {'A'}: {{0x01}}, + {'B'}: {{0x02}}, }, - fetching: map[string][]common.Hash{ - "A": {{0x01}}, - "B": {{0x02}}, + fetching: map[enode.ID][]common.Hash{ + {'A'}: {{0x01}}, + {'B'}: {{0x02}}, }, }, doWait{time: txFetchTimeout - txArriveTimeout, step: true}, isScheduled{ - tracking: map[string][]common.Hash{ - "B": {{0x02}}, + tracking: map[enode.ID][]common.Hash{ + {'B'}: {{0x02}}, }, - fetching: map[string][]common.Hash{ - "B": {{0x02}}, + fetching: map[enode.ID][]common.Hash{ + {'B'}: {{0x02}}, }, - dangling: map[string][]common.Hash{ - "A": {}, + dangling: map[enode.ID][]common.Hash{ + {'A'}: {}, }, }, doWait{time: txArriveTimeout, step: true}, isScheduled{ tracking: nil, fetching: nil, - dangling: map[string][]common.Hash{ - "A": {}, - "B": {}, + dangling: map[enode.ID][]common.Hash{ + {'A'}: {}, + {'B'}: {}, }, }, }, @@ -773,21 +774,21 @@ func TestTransactionFetcherRateLimiting(t *testing.T) { return NewTxFetcher( func(common.Hash) bool { return false }, nil, - func(string, []common.Hash) error { return nil }, + func(enode.ID, []common.Hash) error { return nil }, ) }, steps: []interface{}{ // Announce all the transactions, wait a bit and ensure only a small // percentage gets requested - doTxNotify{peer: "A", hashes: hashes}, + doTxNotify{peer: enode.ID{'A'}, hashes: hashes}, doWait{time: txArriveTimeout, step: true}, isWaiting(nil), isScheduled{ - tracking: map[string][]common.Hash{ - "A": hashes, + tracking: map[enode.ID][]common.Hash{ + {'A'}: hashes, }, - fetching: map[string][]common.Hash{ - "A": hashes[1643 : 1643+maxTxRetrievals], + fetching: map[enode.ID][]common.Hash{ + {'A'}: hashes[1643 : 1643+maxTxRetrievals], }, }, }, @@ -811,50 +812,50 @@ func TestTransactionFetcherDoSProtection(t *testing.T) { return NewTxFetcher( func(common.Hash) bool { return false }, nil, - func(string, []common.Hash) error { return nil }, + func(enode.ID, []common.Hash) error { return nil }, ) }, steps: []interface{}{ // Announce half of the transaction and wait for them to be scheduled - doTxNotify{peer: "A", hashes: hashesA[:maxTxAnnounces/2]}, - doTxNotify{peer: "B", hashes: hashesB[:maxTxAnnounces/2-1]}, + doTxNotify{peer: enode.ID{'A'}, hashes: hashesA[:maxTxAnnounces/2]}, + doTxNotify{peer: enode.ID{'B'}, hashes: hashesB[:maxTxAnnounces/2-1]}, doWait{time: txArriveTimeout, step: true}, // Announce the second half and keep them in the wait list - doTxNotify{peer: "A", hashes: hashesA[maxTxAnnounces/2 : maxTxAnnounces]}, - doTxNotify{peer: "B", hashes: hashesB[maxTxAnnounces/2-1 : maxTxAnnounces-1]}, + doTxNotify{peer: enode.ID{'A'}, hashes: hashesA[maxTxAnnounces/2 : maxTxAnnounces]}, + doTxNotify{peer: enode.ID{'B'}, hashes: hashesB[maxTxAnnounces/2-1 : maxTxAnnounces-1]}, // Ensure the hashes are split half and half - isWaiting(map[string][]common.Hash{ - "A": hashesA[maxTxAnnounces/2 : maxTxAnnounces], - "B": hashesB[maxTxAnnounces/2-1 : maxTxAnnounces-1], + isWaiting(map[enode.ID][]common.Hash{ + {'A'}: hashesA[maxTxAnnounces/2 : maxTxAnnounces], + {'B'}: hashesB[maxTxAnnounces/2-1 : maxTxAnnounces-1], }), isScheduled{ - tracking: map[string][]common.Hash{ - "A": hashesA[:maxTxAnnounces/2], - "B": hashesB[:maxTxAnnounces/2-1], + tracking: map[enode.ID][]common.Hash{ + {'A'}: hashesA[:maxTxAnnounces/2], + {'B'}: hashesB[:maxTxAnnounces/2-1], }, - fetching: map[string][]common.Hash{ - "A": hashesA[1643 : 1643+maxTxRetrievals], - "B": append(append([]common.Hash{}, hashesB[maxTxAnnounces/2-3:maxTxAnnounces/2-1]...), hashesB[:maxTxRetrievals-2]...), + fetching: map[enode.ID][]common.Hash{ + {'A'}: hashesA[1643 : 1643+maxTxRetrievals], + {'B'}: append(append([]common.Hash{}, hashesB[maxTxAnnounces/2-3:maxTxAnnounces/2-1]...), hashesB[:maxTxRetrievals-2]...), }, }, // Ensure that adding even one more hash results in dropping the hash - doTxNotify{peer: "A", hashes: []common.Hash{hashesA[maxTxAnnounces]}}, - doTxNotify{peer: "B", hashes: hashesB[maxTxAnnounces-1 : maxTxAnnounces+1]}, + doTxNotify{peer: enode.ID{'A'}, hashes: []common.Hash{hashesA[maxTxAnnounces]}}, + doTxNotify{peer: enode.ID{'B'}, hashes: hashesB[maxTxAnnounces-1 : maxTxAnnounces+1]}, - isWaiting(map[string][]common.Hash{ - "A": hashesA[maxTxAnnounces/2 : maxTxAnnounces], - "B": hashesB[maxTxAnnounces/2-1 : maxTxAnnounces], + isWaiting(map[enode.ID][]common.Hash{ + {'A'}: hashesA[maxTxAnnounces/2 : maxTxAnnounces], + {'B'}: hashesB[maxTxAnnounces/2-1 : maxTxAnnounces], }), isScheduled{ - tracking: map[string][]common.Hash{ - "A": hashesA[:maxTxAnnounces/2], - "B": hashesB[:maxTxAnnounces/2-1], + tracking: map[enode.ID][]common.Hash{ + {'A'}: hashesA[:maxTxAnnounces/2], + {'B'}: hashesB[:maxTxAnnounces/2-1], }, - fetching: map[string][]common.Hash{ - "A": hashesA[1643 : 1643+maxTxRetrievals], - "B": append(append([]common.Hash{}, hashesB[maxTxAnnounces/2-3:maxTxAnnounces/2-1]...), hashesB[:maxTxRetrievals-2]...), + fetching: map[enode.ID][]common.Hash{ + {'A'}: hashesA[1643 : 1643+maxTxRetrievals], + {'B'}: append(append([]common.Hash{}, hashesB[maxTxAnnounces/2-3:maxTxAnnounces/2-1]...), hashesB[:maxTxRetrievals-2]...), }, }, }, @@ -878,20 +879,20 @@ func TestTransactionFetcherUnderpricedDedup(t *testing.T) { } return errs }, - func(string, []common.Hash) error { return nil }, + func(enode.ID, []common.Hash) error { return nil }, ) }, steps: []interface{}{ // Deliver a transaction through the fetcher, but reject as underpriced - doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0], testTxsHashes[1]}}, + doTxNotify{peer: enode.ID{'A'}, hashes: []common.Hash{testTxsHashes[0], testTxsHashes[1]}}, doWait{time: txArriveTimeout, step: true}, - doTxEnqueue{peer: "A", txs: []types.Transaction{testTxs[0], testTxs[1]}, direct: true}, + doTxEnqueue{peer: enode.ID{'A'}, txs: []types.Transaction{testTxs[0], testTxs[1]}, direct: true}, isScheduled{nil, nil, nil}, // Try to announce the transaction again, ensure it's not scheduled back - doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0], testTxsHashes[1], testTxsHashes[2]}}, // [2] is needed to force a step in the fetcher - isWaiting(map[string][]common.Hash{ - "A": {testTxsHashes[2]}, + doTxNotify{peer: enode.ID{'A'}, hashes: []common.Hash{testTxsHashes[0], testTxsHashes[1], testTxsHashes[2]}}, // [2] is needed to force a step in the fetcher + isWaiting(map[enode.ID][]common.Hash{ + {'A'}: {testTxsHashes[2]}, }), isScheduled{nil, nil, nil}, }, @@ -918,20 +919,20 @@ func TestTransactionFetcherUnderpricedDoSProtection(t *testing.T) { // Generate a set of steps to announce and deliver the entire set of transactions var steps []interface{} for i := 0; i < maxTxUnderpricedSetSize/maxTxRetrievals; i++ { - steps = append(steps, doTxNotify{peer: "A", hashes: hashes[i*maxTxRetrievals : (i+1)*maxTxRetrievals]}) - steps = append(steps, isWaiting(map[string][]common.Hash{ - "A": hashes[i*maxTxRetrievals : (i+1)*maxTxRetrievals], + steps = append(steps, doTxNotify{peer: enode.ID{'A'}, hashes: hashes[i*maxTxRetrievals : (i+1)*maxTxRetrievals]}) + steps = append(steps, isWaiting(map[enode.ID][]common.Hash{ + {'A'}: hashes[i*maxTxRetrievals : (i+1)*maxTxRetrievals], })) steps = append(steps, doWait{time: txArriveTimeout, step: true}) steps = append(steps, isScheduled{ - tracking: map[string][]common.Hash{ - "A": hashes[i*maxTxRetrievals : (i+1)*maxTxRetrievals], + tracking: map[enode.ID][]common.Hash{ + {'A'}: hashes[i*maxTxRetrievals : (i+1)*maxTxRetrievals], }, - fetching: map[string][]common.Hash{ - "A": hashes[i*maxTxRetrievals : (i+1)*maxTxRetrievals], + fetching: map[enode.ID][]common.Hash{ + {'A'}: hashes[i*maxTxRetrievals : (i+1)*maxTxRetrievals], }, }) - steps = append(steps, doTxEnqueue{peer: "A", txs: txs[i*maxTxRetrievals : (i+1)*maxTxRetrievals], direct: true}) + steps = append(steps, doTxEnqueue{peer: enode.ID{'A'}, txs: txs[i*maxTxRetrievals : (i+1)*maxTxRetrievals], direct: true}) steps = append(steps, isWaiting(nil)) steps = append(steps, isScheduled{nil, nil, nil}) steps = append(steps, isUnderpriced((i+1)*maxTxRetrievals)) @@ -947,14 +948,14 @@ func TestTransactionFetcherUnderpricedDoSProtection(t *testing.T) { } return errs }, - func(string, []common.Hash) error { return nil }, + func(enode.ID, []common.Hash) error { return nil }, ) }, steps: append(steps, []interface{}{ // The preparation of the test has already been done in `steps`, add the last check - doTxNotify{peer: "A", hashes: []common.Hash{hashes[maxTxUnderpricedSetSize]}}, + doTxNotify{peer: enode.ID{'A'}, hashes: []common.Hash{hashes[maxTxUnderpricedSetSize]}}, doWait{time: txArriveTimeout, step: true}, - doTxEnqueue{peer: "A", txs: []types.Transaction{txs[maxTxUnderpricedSetSize]}, direct: true}, + doTxEnqueue{peer: enode.ID{'A'}, txs: []types.Transaction{txs[maxTxUnderpricedSetSize]}, direct: true}, isUnderpriced(maxTxUnderpricedSetSize), }...), }) @@ -969,43 +970,43 @@ func TestTransactionFetcherOutOfBoundDeliveries(t *testing.T) { func(txs []types.Transaction) []error { return make([]error, len(txs)) }, - func(string, []common.Hash) error { return nil }, + func(enode.ID, []common.Hash) error { return nil }, ) }, steps: []interface{}{ // Deliver something out of the blue isWaiting(nil), isScheduled{nil, nil, nil}, - doTxEnqueue{peer: "A", txs: []types.Transaction{testTxs[0]}, direct: false}, + doTxEnqueue{peer: enode.ID{'A'}, txs: []types.Transaction{testTxs[0]}, direct: false}, isWaiting(nil), isScheduled{nil, nil, nil}, // Set up a few hashes into various stages - doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}}, + doTxNotify{peer: enode.ID{'A'}, hashes: []common.Hash{testTxsHashes[0]}}, doWait{time: txArriveTimeout, step: true}, - doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[1]}}, + doTxNotify{peer: enode.ID{'A'}, hashes: []common.Hash{testTxsHashes[1]}}, doWait{time: txArriveTimeout, step: true}, - doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[2]}}, + doTxNotify{peer: enode.ID{'A'}, hashes: []common.Hash{testTxsHashes[2]}}, - isWaiting(map[string][]common.Hash{ - "A": {testTxsHashes[2]}, + isWaiting(map[enode.ID][]common.Hash{ + {'A'}: {testTxsHashes[2]}, }), isScheduled{ - tracking: map[string][]common.Hash{ - "A": {testTxsHashes[0], testTxsHashes[1]}, + tracking: map[enode.ID][]common.Hash{ + {'A'}: {testTxsHashes[0], testTxsHashes[1]}, }, - fetching: map[string][]common.Hash{ - "A": {testTxsHashes[0]}, + fetching: map[enode.ID][]common.Hash{ + {'A'}: {testTxsHashes[0]}, }, }, // Deliver everything and more out of the blue - doTxEnqueue{peer: "B", txs: []types.Transaction{testTxs[0], testTxs[1], testTxs[2], testTxs[3]}, direct: true}, + doTxEnqueue{peer: enode.ID{'B'}, txs: []types.Transaction{testTxs[0], testTxs[1], testTxs[2], testTxs[3]}, direct: true}, isWaiting(nil), isScheduled{ tracking: nil, fetching: nil, - dangling: map[string][]common.Hash{ - "A": {testTxsHashes[0]}, + dangling: map[enode.ID][]common.Hash{ + {'A'}: {testTxsHashes[0]}, }, }, }, @@ -1022,43 +1023,43 @@ func TestTransactionFetcherDrop(t *testing.T) { func(txs []types.Transaction) []error { return make([]error, len(txs)) }, - func(string, []common.Hash) error { return nil }, + func(enode.ID, []common.Hash) error { return nil }, ) }, steps: []interface{}{ // Set up a few hashes into various stages - doTxNotify{peer: "A", hashes: []common.Hash{{0x01}}}, + doTxNotify{peer: enode.ID{'A'}, hashes: []common.Hash{{0x01}}}, doWait{time: txArriveTimeout, step: true}, - doTxNotify{peer: "A", hashes: []common.Hash{{0x02}}}, + doTxNotify{peer: enode.ID{'A'}, hashes: []common.Hash{{0x02}}}, doWait{time: txArriveTimeout, step: true}, - doTxNotify{peer: "A", hashes: []common.Hash{{0x03}}}, + doTxNotify{peer: enode.ID{'A'}, hashes: []common.Hash{{0x03}}}, - isWaiting(map[string][]common.Hash{ - "A": {{0x03}}, + isWaiting(map[enode.ID][]common.Hash{ + {'A'}: {{0x03}}, }), isScheduled{ - tracking: map[string][]common.Hash{ - "A": {{0x01}, {0x02}}, + tracking: map[enode.ID][]common.Hash{ + {'A'}: {{0x01}, {0x02}}, }, - fetching: map[string][]common.Hash{ - "A": {{0x01}}, + fetching: map[enode.ID][]common.Hash{ + {'A'}: {{0x01}}, }, }, // Drop the peer and ensure everything's cleaned out - doDrop("A"), + doDrop(enode.ID{'A'}), isWaiting(nil), isScheduled{nil, nil, nil}, // Push the node into a dangling (timeout) state - doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}}, + doTxNotify{peer: enode.ID{'A'}, hashes: []common.Hash{testTxsHashes[0]}}, doWait{time: txArriveTimeout, step: true}, isWaiting(nil), isScheduled{ - tracking: map[string][]common.Hash{ - "A": {testTxsHashes[0]}, + tracking: map[enode.ID][]common.Hash{ + {'A'}: {testTxsHashes[0]}, }, - fetching: map[string][]common.Hash{ - "A": {testTxsHashes[0]}, + fetching: map[enode.ID][]common.Hash{ + {'A'}: {testTxsHashes[0]}, }, }, doWait{time: txFetchTimeout, step: true}, @@ -1066,12 +1067,12 @@ func TestTransactionFetcherDrop(t *testing.T) { isScheduled{ tracking: nil, fetching: nil, - dangling: map[string][]common.Hash{ - "A": {}, + dangling: map[enode.ID][]common.Hash{ + {'A'}: {}, }, }, // Drop the peer and ensure everything's cleaned out - doDrop("A"), + doDrop(enode.ID{'A'}), isWaiting(nil), isScheduled{nil, nil, nil}, }, @@ -1088,34 +1089,34 @@ func TestTransactionFetcherDropRescheduling(t *testing.T) { func(txs []types.Transaction) []error { return make([]error, len(txs)) }, - func(string, []common.Hash) error { return nil }, + func(enode.ID, []common.Hash) error { return nil }, ) }, steps: []interface{}{ // Set up a few hashes into various stages - doTxNotify{peer: "A", hashes: []common.Hash{{0x01}}}, + doTxNotify{peer: enode.ID{'A'}, hashes: []common.Hash{{0x01}}}, doWait{time: txArriveTimeout, step: true}, - doTxNotify{peer: "B", hashes: []common.Hash{{0x01}}}, + doTxNotify{peer: enode.ID{'B'}, hashes: []common.Hash{{0x01}}}, isWaiting(nil), isScheduled{ - tracking: map[string][]common.Hash{ - "A": {{0x01}}, - "B": {{0x01}}, + tracking: map[enode.ID][]common.Hash{ + {'A'}: {{0x01}}, + {'B'}: {{0x01}}, }, - fetching: map[string][]common.Hash{ - "A": {{0x01}}, + fetching: map[enode.ID][]common.Hash{ + {'A'}: {{0x01}}, }, }, // Drop the peer and ensure everything's cleaned out - doDrop("A"), + doDrop(enode.ID{'A'}), isWaiting(nil), isScheduled{ - tracking: map[string][]common.Hash{ - "B": {{0x01}}, + tracking: map[enode.ID][]common.Hash{ + {'B'}: {{0x01}}, }, - fetching: map[string][]common.Hash{ - "B": {{0x01}}, + fetching: map[enode.ID][]common.Hash{ + {'B'}: {{0x01}}, }, }, }, @@ -1133,17 +1134,17 @@ func TestTransactionFetcherFuzzCrash01(t *testing.T) { func(txs []types.Transaction) []error { return make([]error, len(txs)) }, - func(string, []common.Hash) error { return nil }, + func(enode.ID, []common.Hash) error { return nil }, ) }, steps: []interface{}{ // Get a transaction into fetching mode and make it dangling with a broadcast - doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}}, + doTxNotify{peer: enode.ID{'A'}, hashes: []common.Hash{testTxsHashes[0]}}, doWait{time: txArriveTimeout, step: true}, - doTxEnqueue{peer: "A", txs: []types.Transaction{testTxs[0]}}, + doTxEnqueue{peer: enode.ID{'A'}, txs: []types.Transaction{testTxs[0]}}, // Notify the dangling transaction once more and crash via a timeout - doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}}, + doTxNotify{peer: enode.ID{'A'}, hashes: []common.Hash{testTxsHashes[0]}}, doWait{time: txFetchTimeout, step: true}, }, }) @@ -1160,19 +1161,19 @@ func TestTransactionFetcherFuzzCrash02(t *testing.T) { func(txs []types.Transaction) []error { return make([]error, len(txs)) }, - func(string, []common.Hash) error { return nil }, + func(enode.ID, []common.Hash) error { return nil }, ) }, steps: []interface{}{ // Get a transaction into fetching mode and make it dangling with a broadcast - doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}}, + doTxNotify{peer: enode.ID{'A'}, hashes: []common.Hash{testTxsHashes[0]}}, doWait{time: txArriveTimeout, step: true}, - doTxEnqueue{peer: "A", txs: []types.Transaction{testTxs[0]}}, + doTxEnqueue{peer: enode.ID{'A'}, txs: []types.Transaction{testTxs[0]}}, // Notify the dangling transaction once more, re-fetch, and crash via a drop and timeout - doTxNotify{peer: "B", hashes: []common.Hash{testTxsHashes[0]}}, + doTxNotify{peer: enode.ID{'B'}, hashes: []common.Hash{testTxsHashes[0]}}, doWait{time: txArriveTimeout, step: true}, - doDrop("A"), + doDrop(enode.ID{'A'}), doWait{time: txFetchTimeout, step: true}, }, }) @@ -1189,20 +1190,20 @@ func TestTransactionFetcherFuzzCrash03(t *testing.T) { func(txs []types.Transaction) []error { return make([]error, len(txs)) }, - func(string, []common.Hash) error { return nil }, + func(enode.ID, []common.Hash) error { return nil }, ) }, steps: []interface{}{ // Get a transaction into fetching mode and make it dangling with a broadcast - doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0], testTxsHashes[1]}}, + doTxNotify{peer: enode.ID{'A'}, hashes: []common.Hash{testTxsHashes[0], testTxsHashes[1]}}, doWait{time: txFetchTimeout, step: true}, - doTxEnqueue{peer: "A", txs: []types.Transaction{testTxs[0], testTxs[1]}}, + doTxEnqueue{peer: enode.ID{'A'}, txs: []types.Transaction{testTxs[0], testTxs[1]}}, // Notify the dangling transaction once more, partially deliver, clash&crash with a timeout - doTxNotify{peer: "B", hashes: []common.Hash{testTxsHashes[0]}}, + doTxNotify{peer: enode.ID{'B'}, hashes: []common.Hash{testTxsHashes[0]}}, doWait{time: txArriveTimeout, step: true}, - doTxEnqueue{peer: "A", txs: []types.Transaction{testTxs[1]}, direct: true}, + doTxEnqueue{peer: enode.ID{'A'}, txs: []types.Transaction{testTxs[1]}, direct: true}, doWait{time: txFetchTimeout, step: true}, }, }) @@ -1222,7 +1223,7 @@ func TestTransactionFetcherFuzzCrash04(t *testing.T) { func(txs []types.Transaction) []error { return make([]error, len(txs)) }, - func(string, []common.Hash) error { + func(enode.ID, []common.Hash) error { <-proceed return errors.New("peer disconnected") }, @@ -1230,12 +1231,12 @@ func TestTransactionFetcherFuzzCrash04(t *testing.T) { }, steps: []interface{}{ // Get a transaction into fetching mode and make it dangling with a broadcast - doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}}, + doTxNotify{peer: enode.ID{'A'}, hashes: []common.Hash{testTxsHashes[0]}}, doWait{time: txArriveTimeout, step: true}, - doTxEnqueue{peer: "A", txs: []types.Transaction{testTxs[0]}}, + doTxEnqueue{peer: enode.ID{'A'}, txs: []types.Transaction{testTxs[0]}}, // Notify the dangling transaction once more, re-fetch, and crash via an in-flight disconnect - doTxNotify{peer: "B", hashes: []common.Hash{testTxsHashes[0]}}, + doTxNotify{peer: enode.ID{'B'}, hashes: []common.Hash{testTxsHashes[0]}}, doWait{time: txArriveTimeout, step: true}, doFunc(func() { proceed <- struct{}{} // Allow peer A to return the failure @@ -1291,7 +1292,7 @@ func testTransactionFetcher(t *testing.T, tt txFetcherTest) { } case doDrop: - if err := fetcher.Drop(string(step)); err != nil { + if err := fetcher.Drop(enode.ID(step)); err != nil { t.Errorf("step %d: %v", i, err) } <-wait // Fetcher needs to process this, wait until it's done diff --git a/eth/protocols/eth/handler.go b/eth/protocols/eth/handler.go index 2ea885e4131c4652815b9c52ec196afd2948cf4f..f52067abba126fcb0bf97f117bd78c909d7a95e1 100644 --- a/eth/protocols/eth/handler.go +++ b/eth/protocols/eth/handler.go @@ -112,8 +112,8 @@ func MakeProtocols(backend Backend, readNodeInfo func() *NodeInfo, dnsdisc enode NodeInfo: func() interface{} { return readNodeInfo() }, - PeerInfo: func(id enode.ID) interface{} { - return backend.PeerInfo(id) + PeerInfo: func(peerID enode.ID) interface{} { + return backend.PeerInfo(peerID) }, Attributes: []enr.Entry{CurrentENREntry(chainConfig, genesisHash, headHeight)}, DialCandidates: dnsdisc, diff --git a/eth/stagedsync/stage_bodies.go b/eth/stagedsync/stage_bodies.go index 9ec9cc920c3f11c7e8ddc9ea51257e41a464088d..c067dce27a9aa83f9f3992d813198139cf640a33 100644 --- a/eth/stagedsync/stage_bodies.go +++ b/eth/stagedsync/stage_bodies.go @@ -12,6 +12,7 @@ import ( "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/core/rawdb" "github.com/ledgerwatch/erigon/eth/stagedsync/stages" + "github.com/ledgerwatch/erigon/p2p/enode" "github.com/ledgerwatch/erigon/params" "github.com/ledgerwatch/erigon/turbo/adapter" "github.com/ledgerwatch/erigon/turbo/stages/bodydownload" @@ -22,7 +23,7 @@ import ( type BodiesCfg struct { db kv.RwDB bd *bodydownload.BodyDownload - bodyReqSend func(context.Context, *bodydownload.BodyRequest) []byte + bodyReqSend func(context.Context, *bodydownload.BodyRequest) (enode.ID, bool) penalise func(context.Context, []headerdownload.PenaltyItem) blockPropagator adapter.BlockPropagator timeout int @@ -33,7 +34,7 @@ type BodiesCfg struct { func StageBodiesCfg( db kv.RwDB, bd *bodydownload.BodyDownload, - bodyReqSend func(context.Context, *bodydownload.BodyRequest) []byte, + bodyReqSend func(context.Context, *bodydownload.BodyRequest) (enode.ID, bool), penalise func(context.Context, []headerdownload.PenaltyItem), blockPropagator adapter.BlockPropagator, timeout int, @@ -102,7 +103,8 @@ func BodiesForward( timer := time.NewTimer(1 * time.Second) // Check periodically even in the abseence of incoming messages var blockNum uint64 var req *bodydownload.BodyRequest - var peer []byte + var peer enode.ID + var sentToPeer bool stopped := false prevProgress := bodyProgress noProgressCount := 0 // How many time the progress was printed without actual progress @@ -118,19 +120,20 @@ Loop: } d1 += time.Since(start) } - peer = nil + peer = enode.ID{} + sentToPeer = false if req != nil { start := time.Now() - peer = cfg.bodyReqSend(ctx, req) + peer, sentToPeer = cfg.bodyReqSend(ctx, req) d2 += time.Since(start) } - if req != nil && peer != nil { + if req != nil && sentToPeer { start := time.Now() currentTime := uint64(time.Now().Unix()) cfg.bd.RequestSent(req, currentTime+uint64(timeout), peer) d3 += time.Since(start) } - for req != nil && peer != nil { + for req != nil && sentToPeer { start := time.Now() currentTime := uint64(time.Now().Unix()) req, blockNum, err = cfg.bd.RequestMoreBodies(tx, blockNum, currentTime, cfg.blockPropagator) @@ -138,13 +141,14 @@ Loop: return fmt.Errorf("request more bodies: %w", err) } d1 += time.Since(start) - peer = nil + peer = enode.ID{} + sentToPeer = false if req != nil { start = time.Now() - peer = cfg.bodyReqSend(ctx, req) + peer, sentToPeer = cfg.bodyReqSend(ctx, req) d2 += time.Since(start) } - if req != nil && peer != nil { + if req != nil && sentToPeer { start = time.Now() cfg.bd.RequestSent(req, currentTime+uint64(timeout), peer) d3 += time.Since(start) diff --git a/eth/stagedsync/stage_headers.go b/eth/stagedsync/stage_headers.go index 862c4d7e27ae1e038e7866942a4fef57a4035c2f..7cd935336c339b2eed06fc5226d8106ea2e9021b 100644 --- a/eth/stagedsync/stage_headers.go +++ b/eth/stagedsync/stage_headers.go @@ -16,6 +16,7 @@ import ( "github.com/ledgerwatch/erigon/core/rawdb" "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/eth/stagedsync/stages" + "github.com/ledgerwatch/erigon/p2p/enode" "github.com/ledgerwatch/erigon/params" "github.com/ledgerwatch/erigon/rlp" "github.com/ledgerwatch/erigon/turbo/stages/headerdownload" @@ -27,7 +28,7 @@ type HeadersCfg struct { db kv.RwDB hd *headerdownload.HeaderDownload chainConfig params.ChainConfig - headerReqSend func(context.Context, *headerdownload.HeaderRequest) []byte + headerReqSend func(context.Context, *headerdownload.HeaderRequest) (enode.ID, bool) announceNewHashes func(context.Context, []headerdownload.Announce) penalize func(context.Context, []headerdownload.PenaltyItem) batchSize datasize.ByteSize @@ -38,7 +39,7 @@ func StageHeadersCfg( db kv.RwDB, headerDownload *headerdownload.HeaderDownload, chainConfig params.ChainConfig, - headerReqSend func(context.Context, *headerdownload.HeaderRequest) []byte, + headerReqSend func(context.Context, *headerdownload.HeaderRequest) (enode.ID, bool), announceNewHashes func(context.Context, []headerdownload.Announce), penalize func(context.Context, []headerdownload.PenaltyItem), batchSize datasize.ByteSize, @@ -117,7 +118,7 @@ func HeadersForward( headerInserter := headerdownload.NewHeaderInserter(logPrefix, localTd, headerProgress) cfg.hd.SetHeaderReader(&chainReader{config: &cfg.chainConfig, tx: tx}) - var peer []byte + var sentToPeer bool stopped := false prevProgress := headerProgress Loop: @@ -134,19 +135,19 @@ Loop: currentTime := uint64(time.Now().Unix()) req, penalties := cfg.hd.RequestMoreHeaders(currentTime) if req != nil { - peer = cfg.headerReqSend(ctx, req) - if peer != nil { + _, sentToPeer = cfg.headerReqSend(ctx, req) + if sentToPeer { cfg.hd.SentRequest(req, currentTime, 5 /* timeout */) log.Trace("Sent request", "height", req.Number) } } cfg.penalize(ctx, penalties) maxRequests := 64 // Limit number of requests sent per round to let some headers to be inserted into the database - for req != nil && peer != nil && maxRequests > 0 { + for req != nil && sentToPeer && maxRequests > 0 { req, penalties = cfg.hd.RequestMoreHeaders(currentTime) if req != nil { - peer = cfg.headerReqSend(ctx, req) - if peer != nil { + _, sentToPeer = cfg.headerReqSend(ctx, req) + if sentToPeer { cfg.hd.SentRequest(req, currentTime, 5 /*timeout */) log.Trace("Sent request", "height", req.Number) } @@ -158,8 +159,8 @@ Loop: // Send skeleton request if required req = cfg.hd.RequestSkeleton() if req != nil { - peer = cfg.headerReqSend(ctx, req) - if peer != nil { + _, sentToPeer = cfg.headerReqSend(ctx, req) + if sentToPeer { log.Trace("Sent skeleton request", "height", req.Number) } } diff --git a/go.mod b/go.mod index 648452151502350e506e1bb11896aa9b1f993cd3..41159e3cf90a61a8c45dfc6b622ab49472ae0a26 100644 --- a/go.mod +++ b/go.mod @@ -35,7 +35,7 @@ require ( github.com/json-iterator/go v1.1.12 github.com/julienschmidt/httprouter v1.3.0 github.com/kevinburke/go-bindata v3.21.0+incompatible - github.com/ledgerwatch/erigon-lib v0.0.0-20211121101848-2959036be0cd + github.com/ledgerwatch/erigon-lib v0.0.0-20211121193559-d8870b19c307 github.com/ledgerwatch/log/v3 v3.4.0 github.com/ledgerwatch/secp256k1 v1.0.0 github.com/logrusorgru/aurora/v3 v3.0.0 diff --git a/go.sum b/go.sum index e16b6dcf02068b7304e68102ff5b0af729cd699a..40484fb7e96496c93a8f184a5b70bcf3872333f8 100644 --- a/go.sum +++ b/go.sum @@ -598,6 +598,8 @@ github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7 github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8= github.com/ledgerwatch/erigon-lib v0.0.0-20211121101848-2959036be0cd h1:mpt4BOqt2jBYfHM6XmJVEp71X6PyXnvJOaeeMqg+ujY= github.com/ledgerwatch/erigon-lib v0.0.0-20211121101848-2959036be0cd/go.mod h1:CuEZROm43MykZT5CjCj02jw0FOwaDl8Nh+PZkTEGopg= +github.com/ledgerwatch/erigon-lib v0.0.0-20211121193559-d8870b19c307 h1:qR/7ZpY455sHnU73BXOUWx5zMEbNrdiqDA/i53Hya+8= +github.com/ledgerwatch/erigon-lib v0.0.0-20211121193559-d8870b19c307/go.mod h1:CuEZROm43MykZT5CjCj02jw0FOwaDl8Nh+PZkTEGopg= github.com/ledgerwatch/log/v3 v3.4.0 h1:SEIOcv5a2zkG3PmoT5jeTU9m/0nEUv0BJS5bzsjwKCI= github.com/ledgerwatch/log/v3 v3.4.0/go.mod h1:VXcz6Ssn6XEeU92dCMc39/g1F0OYAjw1Mt+dGP5DjXY= github.com/ledgerwatch/secp256k1 v1.0.0 h1:Usvz87YoTG0uePIV8woOof5cQnLXGYa162rFf3YnwaQ= @@ -621,6 +623,7 @@ github.com/marten-seemann/qtls v0.2.3/go.mod h1:xzjG7avBwGGbdZ8dTGxlBnLArsVKLvwm github.com/marten-seemann/qtls v0.10.0/go.mod h1:UvMd1oaYDACI99/oZUYLzMCkBXQVT0aGm99sJhbT8hs= github.com/marten-seemann/qtls-go1-15 v0.1.0/go.mod h1:GyFwywLKkRt+6mfU99csTEY1joMZz5vmB1WNZH3P81I= github.com/marten-seemann/qtls-go1-15 v0.1.1/go.mod h1:GyFwywLKkRt+6mfU99csTEY1joMZz5vmB1WNZH3P81I= +github.com/matryer/moq v0.2.3 h1:Q06vEqnBYjjfx5KKgHfYRKE/lvlRu+Nj+xodG4YdHnU= github.com/matryer/moq v0.2.3/go.mod h1:9RtPYjTnH1bSBIkpvtHkFN7nbWAnO7oRpdJkEIn6UtE= github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= diff --git a/p2p/peer.go b/p2p/peer.go index 5eae9296956b12e6b9198ca30c217681d3669eda..501f4fa6dec067a6754c57d2018bc96745404722 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -64,7 +64,7 @@ type protoHandshake struct { Name string Caps []Cap ListenPort uint64 - ID []byte // secp256k1 public key + Pubkey []byte // secp256k1 public key // Ignore additional fields (for forward compatibility). Rest []rlp.RawValue `rlp:"tail"` @@ -130,7 +130,7 @@ func NewPeer(id enode.ID, name string, caps []Cap) *Peer { return peer } -// ID returns the node's public key. +// ID returns the node's unique identifier. func (p *Peer) ID() enode.ID { return p.rw.node.ID() } diff --git a/p2p/protocol.go b/p2p/protocol.go index 44c8c7555e1829697406a7bbed3902ac85f3ad20..8dfb20c4e0693b40f8d4e9a5e87e787c2d0b9488 100644 --- a/p2p/protocol.go +++ b/p2p/protocol.go @@ -52,7 +52,7 @@ type Protocol struct { // PeerInfo is an optional helper method to retrieve protocol specific metadata // about a certain peer in the network. If an info retrieval function is set, // but returns nil, it is assumed that the protocol handshake is still running. - PeerInfo func(id enode.ID) interface{} + PeerInfo func(peerID enode.ID) interface{} // DialCandidates, if non-nil, is a way to tell Server about protocol-specific nodes // that should be dialed. The server continuously reads nodes from the iterator and diff --git a/p2p/server.go b/p2p/server.go index 3ab28b2c67b02b340839ba2e887f9328d1d65b82..2277ed4a3e4ce79e3f6976d256eb2bba9382c9d5 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -510,7 +510,7 @@ func (srv *Server) Start() error { func (srv *Server) setupLocalNode() error { // Create the devp2p handshake. pubkey := crypto.FromECDSAPub(&srv.PrivateKey.PublicKey) - srv.ourHandshake = &protoHandshake{Version: baseProtocolVersion, Name: srv.Name, ID: pubkey[1:]} + srv.ourHandshake = &protoHandshake{Version: baseProtocolVersion, Name: srv.Name, Pubkey: pubkey[1:]} for _, p := range srv.Protocols { srv.ourHandshake.Caps = append(srv.ourHandshake.Caps, p.cap()) } @@ -1012,8 +1012,8 @@ func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *enode.Node) erro clog.Trace("Failed p2p handshake", "err", err) return err } - if id := c.node.ID(); !bytes.Equal(crypto.Keccak256(phs.ID), id[:]) { - clog.Trace("Wrong devp2p handshake identity", "phsid", hex.EncodeToString(phs.ID)) + if id := c.node.ID(); !bytes.Equal(crypto.Keccak256(phs.Pubkey), id[:]) { + clog.Trace("Wrong devp2p handshake identity", "phsid", hex.EncodeToString(phs.Pubkey)) return DiscUnexpectedIdentity } c.caps, c.name = phs.Caps, phs.Name @@ -1094,7 +1094,7 @@ func (srv *Server) runPeer(p *Peer) { // NodeInfo represents a short summary of the information known about the host. type NodeInfo struct { - ID string `json:"id"` // Unique node identifier (also the encryption key) + ID string `json:"id"` // Unique node identifier Name string `json:"name"` // Name of the node, including client type, version, OS, custom data Enode string `json:"enode"` // Enode URL for adding this peer from remote peers ENR string `json:"enr"` // Ethereum Node Record diff --git a/p2p/server_test.go b/p2p/server_test.go index 15c519486daa6537939b7139270e8a195e627ea1..79fc55d3ee8cc28a4a93bb8c5818888121fb90e2 100644 --- a/p2p/server_test.go +++ b/p2p/server_test.go @@ -58,7 +58,7 @@ func (c *testTransport) doEncHandshake(prv *ecdsa.PrivateKey) (*ecdsa.PublicKey, func (c *testTransport) doProtoHandshake(our *protoHandshake) (*protoHandshake, error) { pubkey := crypto.FromECDSAPub(c.rpub)[1:] - return &protoHandshake{ID: pubkey, Name: "test"}, nil + return &protoHandshake{Pubkey: pubkey, Name: "test"}, nil } func (c *testTransport) close(err error) { @@ -312,7 +312,7 @@ func TestServerPeerLimits(t *testing.T) { var tp = &setupTransport{ pubkey: &clientkey.PublicKey, phs: protoHandshake{ - ID: crypto.FromECDSAPub(&clientkey.PublicKey)[1:], + Pubkey: crypto.FromECDSAPub(&clientkey.PublicKey)[1:], // Force "DiscUselessPeer" due to unmatching caps // Caps: []Cap{discard.cap()}, }, @@ -397,7 +397,7 @@ func TestServerSetupConn(t *testing.T) { wantCloseErr: errors.New("read error"), }, { - tt: &setupTransport{pubkey: clientpub, phs: protoHandshake{ID: randomID().Bytes()}}, + tt: &setupTransport{pubkey: clientpub, phs: protoHandshake{Pubkey: randomID().Bytes()}}, dialDest: enode.NewV4(clientpub, nil, 0, 0), flags: dynDialedConn, wantCalls: "doEncHandshake,doProtoHandshake,close,", @@ -411,13 +411,13 @@ func TestServerSetupConn(t *testing.T) { wantCloseErr: errors.New("foo"), }, { - tt: &setupTransport{pubkey: srvpub, phs: protoHandshake{ID: crypto.FromECDSAPub(srvpub)[1:]}}, + tt: &setupTransport{pubkey: srvpub, phs: protoHandshake{Pubkey: crypto.FromECDSAPub(srvpub)[1:]}}, flags: inboundConn, wantCalls: "doEncHandshake,close,", wantCloseErr: DiscSelf, }, { - tt: &setupTransport{pubkey: clientpub, phs: protoHandshake{ID: crypto.FromECDSAPub(clientpub)[1:]}}, + tt: &setupTransport{pubkey: clientpub, phs: protoHandshake{Pubkey: crypto.FromECDSAPub(clientpub)[1:]}}, flags: inboundConn, wantCalls: "doEncHandshake,doProtoHandshake,close,", wantCloseErr: DiscUselessPeer, diff --git a/p2p/simulations/test.go b/p2p/simulations/test.go index e2160a3cf32ba8c610889e25b749989714a549a5..396352084c2153553c595983cca41a48e6b89c68 100644 --- a/p2p/simulations/test.go +++ b/p2p/simulations/test.go @@ -54,7 +54,7 @@ func (t *NoopService) Protocols() []p2p.Protocol { NodeInfo: func() interface{} { return struct{}{} }, - PeerInfo: func(id enode.ID) interface{} { + PeerInfo: func(peerID enode.ID) interface{} { return struct{}{} }, Attributes: []enr.Entry{}, diff --git a/p2p/transport.go b/p2p/transport.go index 9b28c1d686eee5f52ed532770eb3c7b524b6337c..0190b688925198eee6c1343028e804944ac68987 100644 --- a/p2p/transport.go +++ b/p2p/transport.go @@ -184,7 +184,7 @@ func readProtocolHandshake(rw MsgReader) (*protoHandshake, error) { if err := msg.Decode(&hs); err != nil { return nil, err } - if len(hs.ID) != 64 || !bitutil.TestBytes(hs.ID) { + if len(hs.Pubkey) != 64 || !bitutil.TestBytes(hs.Pubkey) { return nil, DiscInvalidIdentity } return &hs, nil diff --git a/p2p/transport_test.go b/p2p/transport_test.go index 136a74ba2b6b073e4cb92a79e2b5f41bd6a7aa3f..a11d03aac8db25ea3b5f4654396eeb915b959991 100644 --- a/p2p/transport_test.go +++ b/p2p/transport_test.go @@ -31,11 +31,11 @@ func TestProtocolHandshake(t *testing.T) { var ( prv0, _ = crypto.GenerateKey() pub0 = crypto.FromECDSAPub(&prv0.PublicKey)[1:] - hs0 = &protoHandshake{Version: 3, ID: pub0, Caps: []Cap{{"a", 0}, {"b", 2}}} + hs0 = &protoHandshake{Version: 3, Pubkey: pub0, Caps: []Cap{{"a", 0}, {"b", 2}}} prv1, _ = crypto.GenerateKey() pub1 = crypto.FromECDSAPub(&prv1.PublicKey)[1:] - hs1 = &protoHandshake{Version: 3, ID: pub1, Caps: []Cap{{"c", 1}, {"d", 3}}} + hs1 = &protoHandshake{Version: 3, Pubkey: pub1, Caps: []Cap{{"c", 1}, {"d", 3}}} wg sync.WaitGroup ) diff --git a/tests/fuzzers/txfetcher/txfetcher_fuzzer.go b/tests/fuzzers/txfetcher/txfetcher_fuzzer.go index e4ab68ed75b3bcd75aa6c08d233817dcd07e9742..a5e1e642f1e3630b0a06edfdfced4d6a7f877c22 100644 --- a/tests/fuzzers/txfetcher/txfetcher_fuzzer.go +++ b/tests/fuzzers/txfetcher/txfetcher_fuzzer.go @@ -28,10 +28,11 @@ import ( "github.com/ledgerwatch/erigon/common/mclock" "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/eth/fetcher" + "github.com/ledgerwatch/erigon/p2p/enode" ) var ( - peers []string + peers []enode.ID txs []types.Transaction ) @@ -39,9 +40,9 @@ func init() { // Random is nice, but we need it deterministic rand := rand.New(rand.NewSource(0x3a29)) - peers = make([]string, 10) + peers = make([]enode.ID, 10) for i := 0; i < len(peers); i++ { - peers[i] = fmt.Sprintf("Peer #%d", i) + peers[i] = enode.ID{byte(i)} } txs = make([]types.Transaction, 65536) // We need to bump enough to hit all the limits for i := 0; i < len(txs); i++ { @@ -83,7 +84,7 @@ func Fuzz(input []byte) int { func(txs []types.Transaction) []error { return make([]error, len(txs)) }, - func(string, []common.Hash) error { return nil }, + func(enode.ID, []common.Hash) error { return nil }, clock, rand, ) f.Start() diff --git a/turbo/stages/bodydownload/body_algos.go b/turbo/stages/bodydownload/body_algos.go index 9eebc2d5f8bbc2a2e59e7400baf4ace5c2300755..44082725a36ef799a4c920b73ac6f4af8f363120 100644 --- a/turbo/stages/bodydownload/body_algos.go +++ b/turbo/stages/bodydownload/body_algos.go @@ -14,6 +14,7 @@ import ( "github.com/ledgerwatch/erigon/core/rawdb" "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/eth/stagedsync/stages" + "github.com/ledgerwatch/erigon/p2p/enode" "github.com/ledgerwatch/erigon/turbo/adapter" "github.com/ledgerwatch/erigon/turbo/stages/headerdownload" "github.com/ledgerwatch/log/v3" @@ -26,7 +27,7 @@ const BlockBufferSize = 128 // validated at this point. // It returns 2 errors - first is Validation error (reason to penalize peer and continue processing other // bodies), second is internal runtime error (like network error or db error) -type VerifyUnclesFunc func(peerID string, header *types.Header, uncles []*types.Header) error +type VerifyUnclesFunc func(peerID enode.ID, header *types.Header, uncles []*types.Header) error // UpdateFromDb reads the state of the database and refreshes the state of the body download func (bd *BodyDownload) UpdateFromDb(db kv.Tx) (headHeight uint64, headHash common.Hash, headTd256 *uint256.Int, err error) { @@ -53,7 +54,7 @@ func (bd *BodyDownload) UpdateFromDb(db kv.Tx) (headHeight uint64, headHash comm bd.deliveriesB[i] = nil bd.requests[i] = nil } - bd.peerMap = make(map[string]int) + bd.peerMap = make(map[enode.ID]int) headHeight = bodyProgress headHash, err = rawdb.ReadCanonicalHash(db, headHeight) if err != nil { @@ -98,7 +99,7 @@ func (bd *BodyDownload) RequestMoreBodies(db kv.Tx, blockNum uint64, currentTime if currentTime < req.waitUntil { continue } - bd.peerMap[string(req.peerID)]++ + bd.peerMap[req.peerID]++ bd.requests[blockNum-bd.requestedLow] = nil } var hash common.Hash @@ -175,7 +176,7 @@ func (bd *BodyDownload) RequestMoreBodies(db kv.Tx, blockNum uint64, currentTime return bodyReq, blockNum, nil } -func (bd *BodyDownload) RequestSent(bodyReq *BodyRequest, timeWithTimeout uint64, peer []byte) { +func (bd *BodyDownload) RequestSent(bodyReq *BodyRequest, timeWithTimeout uint64, peer enode.ID) { for _, blockNum := range bodyReq.BlockNums { if blockNum < bd.requestedLow { continue @@ -189,7 +190,7 @@ func (bd *BodyDownload) RequestSent(bodyReq *BodyRequest, timeWithTimeout uint64 } // DeliverBodies takes the block body received from a peer and adds it to the various data structures -func (bd *BodyDownload) DeliverBodies(txs [][][]byte, uncles [][]*types.Header, lenOfP2PMsg uint64, peerID string) { +func (bd *BodyDownload) DeliverBodies(txs [][][]byte, uncles [][]*types.Header, lenOfP2PMsg uint64, peerID enode.ID) { bd.deliveryCh <- Delivery{txs: txs, uncles: uncles, lenOfP2PMessage: lenOfP2PMsg, peerID: peerID} select { @@ -341,11 +342,11 @@ func (bd *BodyDownload) DeliveryCounts() (float64, float64) { return bd.deliveredCount, bd.wastedCount } -func (bd *BodyDownload) GetPenaltyPeers() [][]byte { - peers := make([][]byte, len(bd.peerMap)) +func (bd *BodyDownload) GetPenaltyPeers() []enode.ID { + peers := make([]enode.ID, len(bd.peerMap)) i := 0 for p := range bd.peerMap { - peers[i] = []byte(p) + peers[i] = p i++ } return peers @@ -357,7 +358,7 @@ func (bd *BodyDownload) PrintPeerMap() { fmt.Printf("%s = %d\n", p, n) } fmt.Printf("---------------------------\n") - bd.peerMap = make(map[string]int) + bd.peerMap = make(map[enode.ID]int) } func (bd *BodyDownload) AddToPrefetch(block *types.Block) { diff --git a/turbo/stages/bodydownload/body_data_struct.go b/turbo/stages/bodydownload/body_data_struct.go index c206e8445cdd7773a4b9378b75a1ef572da2daa5..fb622d07e24896eda9523e22b843fa56e5b569a1 100644 --- a/turbo/stages/bodydownload/body_data_struct.go +++ b/turbo/stages/bodydownload/body_data_struct.go @@ -5,6 +5,7 @@ import ( "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/consensus" "github.com/ledgerwatch/erigon/core/types" + "github.com/ledgerwatch/erigon/p2p/enode" ) // DoubleHash is type to be used for the mapping between TxHash and UncleHash to the block header @@ -13,7 +14,7 @@ type DoubleHash [2 * common.HashLength]byte const MaxBodiesInRequest = 1024 type Delivery struct { - peerID string + peerID enode.ID txs [][][]byte uncles [][]*types.Header lenOfP2PMessage uint64 @@ -21,7 +22,7 @@ type Delivery struct { // BodyDownload represents the state of body downloading process type BodyDownload struct { - peerMap map[string]int + peerMap map[enode.ID]int requestedMap map[DoubleHash]uint64 DeliveryNotify chan struct{} deliveryCh chan Delivery @@ -44,7 +45,7 @@ type BodyDownload struct { type BodyRequest struct { BlockNums []uint64 Hashes []common.Hash - peerID []byte + peerID enode.ID waitUntil uint64 } @@ -57,7 +58,7 @@ func NewBodyDownload(outstandingLimit int, engine consensus.Engine) *BodyDownloa deliveriesH: make([]*types.Header, outstandingLimit+MaxBodiesInRequest), deliveriesB: make([]*types.RawBody, outstandingLimit+MaxBodiesInRequest), requests: make([]*BodyRequest, outstandingLimit+MaxBodiesInRequest), - peerMap: make(map[string]int), + peerMap: make(map[enode.ID]int), prefetchedBlocks: NewPrefetchedBlocks(), // DeliveryNotify has capacity 1, and it is also used so that senders never block // This makes this channel a mailbox with no more than one letter in it, meaning diff --git a/turbo/stages/headerdownload/header_algos.go b/turbo/stages/headerdownload/header_algos.go index 3dcbdb8276e5272a33bf8ca4111afc8337af9d78..0b5ce43ba3ccf78db97356ce512586da86c78952 100644 --- a/turbo/stages/headerdownload/header_algos.go +++ b/turbo/stages/headerdownload/header_algos.go @@ -21,6 +21,7 @@ import ( "github.com/ledgerwatch/erigon/core/rawdb" "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/eth/stagedsync/stages" + "github.com/ledgerwatch/erigon/p2p/enode" "github.com/ledgerwatch/erigon/params" "github.com/ledgerwatch/erigon/rlp" "github.com/ledgerwatch/log/v3" @@ -322,7 +323,7 @@ func (hd *HeaderDownload) removeAnchor(segment *ChainSegment, start int) error { } // if anchor will be abandoned - given peerID will get Penalty -func (hd *HeaderDownload) newAnchor(segment *ChainSegment, start, end int, peerID string) (bool, error) { +func (hd *HeaderDownload) newAnchor(segment *ChainSegment, start, end int, peerID enode.ID) (bool, error) { anchorHeader := segment.Headers[end-1] var anchor *Anchor @@ -859,7 +860,7 @@ func (hi *HeaderInserter) BestHeaderChanged() bool { // it allows higher-level algo immediately request more headers without waiting all stages precessing, // speeds up visibility of new blocks // It remember peerID - then later - if anchors created from segments will abandoned - this peerID gonna get Penalty -func (hd *HeaderDownload) ProcessSegment(segment *ChainSegment, newBlock bool, peerID string) (requestMore bool, penalties []PenaltyItem) { +func (hd *HeaderDownload) ProcessSegment(segment *ChainSegment, newBlock bool, peerID enode.ID) (requestMore bool, penalties []PenaltyItem) { log.Trace("processSegment", "from", segment.Headers[0].Number.Uint64(), "to", segment.Headers[len(segment.Headers)-1].Number.Uint64()) hd.lock.Lock() defer hd.lock.Unlock() @@ -1009,8 +1010,10 @@ func (hd *HeaderDownload) AddMinedBlock(block *types.Block) error { return err } + peerID := enode.ID{'m', 'i', 'n', 'e', 'r'} // "miner" + for _, segment := range segments { - _, _ = hd.ProcessSegment(segment, false /* newBlock */, "miner") + _, _ = hd.ProcessSegment(segment, false /* newBlock */, peerID) } return nil } diff --git a/turbo/stages/headerdownload/header_data_struct.go b/turbo/stages/headerdownload/header_data_struct.go index bd22fc679666d295c832a9a8c9ebc6c5786ba10a..149d9c7635ce6b9abcb50740ec706061c03248f6 100644 --- a/turbo/stages/headerdownload/header_data_struct.go +++ b/turbo/stages/headerdownload/header_data_struct.go @@ -10,6 +10,7 @@ import ( "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/consensus" "github.com/ledgerwatch/erigon/core/types" + "github.com/ledgerwatch/erigon/p2p/enode" ) // Link is a chain link that can be connect to other chain links @@ -74,7 +75,7 @@ func (lq *LinkQueue) Pop() interface{} { } type Anchor struct { - peerID string + peerID enode.ID links []*Link // Links attached immediately to this anchor parentHash common.Hash // Hash of the header this anchor can be connected to (to disappear) blockHeight uint64 @@ -157,7 +158,7 @@ type HeaderRequest struct { } type PenaltyItem struct { - PeerID string + PeerID enode.ID Penalty Penalty } type Announce struct { diff --git a/turbo/stages/mock_sentry.go b/turbo/stages/mock_sentry.go index 36b5d6847e910362b79c21d15355744b51f36380..04432db94613eef0ae0636727917326bd1d4229c 100644 --- a/turbo/stages/mock_sentry.go +++ b/turbo/stages/mock_sentry.go @@ -35,6 +35,7 @@ import ( "github.com/ledgerwatch/erigon/eth/stagedsync/stages" "github.com/ledgerwatch/erigon/ethdb/privateapi" "github.com/ledgerwatch/erigon/ethdb/prune" + "github.com/ledgerwatch/erigon/p2p/enode" "github.com/ledgerwatch/erigon/params" "github.com/ledgerwatch/erigon/rlp" "github.com/ledgerwatch/erigon/turbo/shards" @@ -62,7 +63,7 @@ type MockSentry struct { Key *ecdsa.PrivateKey Genesis *types.Block SentryClient direct.SentryClient - PeerId *ptypes.H512 + PeerId *ptypes.H256 UpdateHead func(Ctx context.Context, head uint64, hash common.Hash, td *uint256.Int) streams map[proto_sentry.MessageId][]proto_sentry.Sentry_MessagesServer sentMessages []*proto_sentry.OutboundMessageData @@ -190,7 +191,7 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey }, UpdateHead: func(Ctx context.Context, head uint64, hash common.Hash, td *uint256.Int) { }, - PeerId: gointerfaces.ConvertBytesToH512([]byte("12345")), + PeerId: gointerfaces.ConvertHashToH256([32]byte{0x12, 0x34, 0x50}), // "12345" } if t != nil { t.Cleanup(mock.Close) @@ -199,7 +200,7 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey mock.Address = crypto.PubkeyToAddress(mock.Key.PublicKey) var err error - sendHeaderRequest := func(_ context.Context, r *headerdownload.HeaderRequest) []byte { return nil } + sendHeaderRequest := func(_ context.Context, r *headerdownload.HeaderRequest) (enode.ID, bool) { return enode.ID{}, false } propagateNewBlockHashes := func(context.Context, []headerdownload.Announce) {} penalize := func(context.Context, []headerdownload.PenaltyItem) {} cfg := ethconfig.Defaults @@ -213,7 +214,7 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey mock.SentryClient = direct.NewSentryClientDirect(eth.ETH66, mock) sentries := []direct.SentryClient{mock.SentryClient} - sendBodyRequest := func(context.Context, *bodydownload.BodyRequest) []byte { return nil } + sendBodyRequest := func(context.Context, *bodydownload.BodyRequest) (enode.ID, bool) { return enode.ID{}, false } blockPropagator := func(Ctx context.Context, block *types.Block, td *big.Int) {} if !cfg.TxPool.Disable { diff --git a/turbo/stages/txpropagate/deprecated.go b/turbo/stages/txpropagate/deprecated.go index 54c8df56ed09e7f8bd3db5334d853801fecf1c7d..99ed73ce1528cfa2c7060c2b92304a7172bddf98 100644 --- a/turbo/stages/txpropagate/deprecated.go +++ b/turbo/stages/txpropagate/deprecated.go @@ -70,7 +70,7 @@ func BroadcastPendingTxsToNetwork(ctx context.Context, txPool *core.TxPool, rece type RecentlyConnectedPeers struct { lock sync.RWMutex - peers []*types.H512 + peers []*types.H256 } func (l *RecentlyConnectedPeers) Len() int { @@ -78,12 +78,12 @@ func (l *RecentlyConnectedPeers) Len() int { defer l.lock.RUnlock() return len(l.peers) } -func (l *RecentlyConnectedPeers) AddPeer(p *types.H512) { +func (l *RecentlyConnectedPeers) AddPeer(p *types.H256) { l.lock.Lock() defer l.lock.Unlock() l.peers = append(l.peers, p) } -func (l *RecentlyConnectedPeers) GetAndClean() []*types.H512 { +func (l *RecentlyConnectedPeers) GetAndClean() []*types.H256 { l.lock.Lock() defer l.lock.Unlock() peers := l.peers diff --git a/turbo/txpool/p2p.go b/turbo/txpool/p2p.go index fa173335b3c1c4fd8c3d24d1fa0d83d84078f7dc..179adde755e6ff9fa2fc297ed0b5c03b8f176cb0 100644 --- a/turbo/txpool/p2p.go +++ b/turbo/txpool/p2p.go @@ -15,10 +15,12 @@ import ( "github.com/ledgerwatch/erigon-lib/direct" "github.com/ledgerwatch/erigon-lib/gointerfaces" proto_sentry "github.com/ledgerwatch/erigon-lib/gointerfaces/sentry" + "github.com/ledgerwatch/erigon/cmd/sentry/download" "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/core" "github.com/ledgerwatch/erigon/eth/fetcher" "github.com/ledgerwatch/erigon/eth/protocols/eth" + "github.com/ledgerwatch/erigon/p2p/enode" "github.com/ledgerwatch/erigon/rlp" "github.com/ledgerwatch/erigon/turbo/stages/txpropagate" "github.com/ledgerwatch/log/v3" @@ -53,7 +55,7 @@ func (tp *P2PServer) newPooledTransactionHashes66(ctx context.Context, inreq *pr if err := rlp.DecodeBytes(inreq.Data, &query); err != nil { return fmt.Errorf("decoding newPooledTransactionHashes66: %w, data: %x", err, inreq.Data) } - return tp.TxFetcher.Notify(string(gointerfaces.ConvertH512ToBytes(inreq.PeerId)), query) + return tp.TxFetcher.Notify(download.ConvertH256ToPeerID(inreq.PeerId), query) } func (tp *P2PServer) newPooledTransactionHashes65(ctx context.Context, inreq *proto_sentry.InboundMessage, sentry direct.SentryClient) error { @@ -61,7 +63,7 @@ func (tp *P2PServer) newPooledTransactionHashes65(ctx context.Context, inreq *pr if err := rlp.DecodeBytes(inreq.Data, &query); err != nil { return fmt.Errorf("decoding newPooledTransactionHashes65: %w, data: %x", err, inreq.Data) } - return tp.TxFetcher.Notify(string(gointerfaces.ConvertH512ToBytes(inreq.PeerId)), query) + return tp.TxFetcher.Notify(download.ConvertH256ToPeerID(inreq.PeerId), query) } func (tp *P2PServer) pooledTransactions66(ctx context.Context, inreq *proto_sentry.InboundMessage, sentry direct.SentryClient) error { @@ -70,7 +72,7 @@ func (tp *P2PServer) pooledTransactions66(ctx context.Context, inreq *proto_sent return fmt.Errorf("decoding pooledTransactions66: %w, data: %x", err, inreq.Data) } - return tp.TxFetcher.Enqueue(string(gointerfaces.ConvertH512ToBytes(inreq.PeerId)), txs.PooledTransactionsPacket, true) + return tp.TxFetcher.Enqueue(download.ConvertH256ToPeerID(inreq.PeerId), txs.PooledTransactionsPacket, true) } func (tp *P2PServer) pooledTransactions65(ctx context.Context, inreq *proto_sentry.InboundMessage, sentry direct.SentryClient) error { @@ -79,7 +81,7 @@ func (tp *P2PServer) pooledTransactions65(ctx context.Context, inreq *proto_sent return fmt.Errorf("decoding pooledTransactions65: %w, data: %x", err, inreq.Data) } - return tp.TxFetcher.Enqueue(string(gointerfaces.ConvertH512ToBytes(inreq.PeerId)), *txs, true) + return tp.TxFetcher.Enqueue(download.ConvertH256ToPeerID(inreq.PeerId), *txs, true) } func (tp *P2PServer) transactions66(ctx context.Context, inreq *proto_sentry.InboundMessage, sentry direct.SentryClient) error { @@ -94,7 +96,7 @@ func (tp *P2PServer) transactions65(ctx context.Context, inreq *proto_sentry.Inb if err := rlp.DecodeBytes(inreq.Data, &query); err != nil { return fmt.Errorf("decoding TransactionsPacket: %w, data: %x", err, inreq.Data) } - return tp.TxFetcher.Enqueue(string(gointerfaces.ConvertH512ToBytes(inreq.PeerId)), query, false) + return tp.TxFetcher.Enqueue(download.ConvertH256ToPeerID(inreq.PeerId), query, false) } func (tp *P2PServer) getPooledTransactions66(ctx context.Context, inreq *proto_sentry.InboundMessage, sentry direct.SentryClient) error { @@ -150,7 +152,7 @@ func (tp *P2PServer) getPooledTransactions65(ctx context.Context, inreq *proto_s return nil } -func (tp *P2PServer) SendTxsRequest(ctx context.Context, peerID string, hashes []common.Hash) []byte { +func (tp *P2PServer) SendTxsRequest(ctx context.Context, peerID enode.ID, hashes []common.Hash) (_ enode.ID, ok bool) { var outreq66 *proto_sentry.SendMessageByIdRequest // if sentry not found peers to send such message, try next one. stop if found. @@ -168,11 +170,11 @@ func (tp *P2PServer) SendTxsRequest(ctx context.Context, peerID string, hashes [ }) if err != nil { log.Error("Could not encode transactions request", "err", err) - return nil + return enode.ID{}, false } outreq66 = &proto_sentry.SendMessageByIdRequest{ - PeerId: gointerfaces.ConvertBytesToH512([]byte(peerID)), + PeerId: gointerfaces.ConvertHashToH256(peerID), Data: &proto_sentry.OutboundMessageData{Id: proto_sentry.MessageId_GET_POOLED_TRANSACTIONS_66, Data: data66}, } } @@ -184,11 +186,11 @@ func (tp *P2PServer) SendTxsRequest(ctx context.Context, peerID string, hashes [ log.Error("[SendTxsRequest]", "err", err1) } else if sentPeers != nil && len(sentPeers.Peers) != 0 { - return gointerfaces.ConvertH512ToBytes(sentPeers.Peers[0]) + return download.ConvertH256ToPeerID(sentPeers.Peers[0]), true } } } - return nil + return enode.ID{}, false } func (tp *P2PServer) randSentryIndex() (int, bool, func() (int, bool)) {