diff --git a/p2p/discover/common.go b/p2p/discover/common.go index 6e234ed8be9f1fd620715f8cad9a6abc74678e78..6ee5c4c0bd19dd3f75406906a808f3e5f01335e8 100644 --- a/p2p/discover/common.go +++ b/p2p/discover/common.go @@ -56,9 +56,11 @@ type Config struct { PingBackDelay time.Duration PrivateKeyGenerator func() (*ecdsa.PrivateKey, error) + + TableRevalidateInterval time.Duration } -func (cfg Config) withDefaults() Config { +func (cfg Config) withDefaults(defaultReplyTimeout time.Duration) Config { if cfg.Log == nil { cfg.Log = log.Root() } @@ -69,7 +71,7 @@ func (cfg Config) withDefaults() Config { cfg.Clock = mclock.System{} } if cfg.ReplyTimeout == 0 { - cfg.ReplyTimeout = respTimeout + cfg.ReplyTimeout = defaultReplyTimeout } if cfg.PingBackDelay == 0 { cfg.PingBackDelay = respTimeout @@ -77,6 +79,9 @@ func (cfg Config) withDefaults() Config { if cfg.PrivateKeyGenerator == nil { cfg.PrivateKeyGenerator = crypto.GenerateKey } + if cfg.TableRevalidateInterval == 0 { + cfg.TableRevalidateInterval = revalidateInterval + } return cfg } diff --git a/p2p/discover/table.go b/p2p/discover/table.go index e70c47c3edfd11544929cf93f8fb5c3201f331f9..77e36a4c8cc392cdf15b8f121063473aefa3d91a 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -55,7 +55,7 @@ const ( tableIPLimit, tableSubnet = 10, 24 refreshInterval = 30 * time.Minute - revalidateInterval = 10 * time.Second + revalidateInterval = 5 * time.Second copyNodesInterval = 30 * time.Second seedMinTableTime = 5 * time.Minute seedCount = 30 @@ -72,6 +72,8 @@ type Table struct { rand *mrand.Rand // source of randomness, periodically reseeded ips netutil.DistinctNetSet + revalidateInterval time.Duration + log log.Logger db *enode.DB // database of known nodes net transport @@ -100,7 +102,13 @@ type bucket struct { ips netutil.DistinctNetSet } -func newTable(t transport, db *enode.DB, bootnodes []*enode.Node, logger log.Logger) (*Table, error) { +func newTable( + t transport, + db *enode.DB, + bootnodes []*enode.Node, + revalidateInterval time.Duration, + logger log.Logger, +) (*Table, error) { tab := &Table{ net: t, db: db, @@ -110,7 +118,10 @@ func newTable(t transport, db *enode.DB, bootnodes []*enode.Node, logger log.Log closed: make(chan struct{}), rand: mrand.New(mrand.NewSource(0)), ips: netutil.DistinctNetSet{Subnet: tableSubnet, Limit: tableIPLimit}, - log: logger, + + revalidateInterval: revalidateInterval, + + log: logger, } if err := tab.setFallbackNodes(bootnodes); err != nil { return nil, err @@ -218,7 +229,7 @@ func (tab *Table) refresh() <-chan struct{} { // loop schedules runs of doRefresh, doRevalidate and copyLiveNodes. func (tab *Table) loop() { var ( - revalidate = time.NewTimer(tab.nextRevalidateTime()) + revalidate = time.NewTimer(tab.revalidateInterval) refresh = time.NewTicker(refreshInterval) copyNodes = time.NewTicker(copyNodesInterval) refreshDone = make(chan struct{}) // where doRefresh reports completion @@ -257,7 +268,7 @@ loop: revalidateDone = make(chan struct{}) go tab.doRevalidate(revalidateDone) case <-revalidateDone: - revalidate.Reset(tab.nextRevalidateTime()) + revalidate.Reset(tab.revalidateInterval) revalidateDone = nil case <-copyNodes.C: go tab.copyLiveNodes() @@ -373,13 +384,6 @@ func (tab *Table) nodeToRevalidate() (n *node, bi int) { return nil, 0 } -func (tab *Table) nextRevalidateTime() time.Duration { - tab.mutex.Lock() - defer tab.mutex.Unlock() - - return time.Duration(tab.rand.Int63n(int64(revalidateInterval))) -} - // copyLiveNodes adds nodes from the table to the database if they have been in the table // longer than seedMinTableTime. func (tab *Table) copyLiveNodes() { diff --git a/p2p/discover/table_util_test.go b/p2p/discover/table_util_test.go index 3cd03cf384fea651bd338566f4e94d7eea2063c6..c618f04de851f0e61d3715629442593fc90f5587 100644 --- a/p2p/discover/table_util_test.go +++ b/p2p/discover/table_util_test.go @@ -26,6 +26,7 @@ import ( "net" "sort" "sync" + "time" "github.com/ledgerwatch/erigon/crypto" "github.com/ledgerwatch/erigon/p2p/enode" @@ -46,7 +47,7 @@ func newTestTable(t transport) (*Table, *enode.DB) { if err != nil { panic(err) } - tab, _ := newTable(t, db, nil, log.Root()) + tab, _ := newTable(t, db, nil, time.Hour, log.Root()) go tab.loop() return tab, db } @@ -60,9 +61,16 @@ func nodeAtDistance(base enode.ID, ld int, ip net.IP) *node { // nodesAtDistance creates n nodes for which enode.LogDist(base, node.ID()) == ld. func nodesAtDistance(base enode.ID, ld int, n int) []*enode.Node { - results := make([]*enode.Node, n) - for i := range results { - results[i] = unwrapNode(nodeAtDistance(base, ld, intIP(i))) + results := make([]*enode.Node, 0, n) + nodeSet := make(map[enode.ID]bool, n) + for len(results) < n { + node := unwrapNode(nodeAtDistance(base, ld, intIP(len(results)+1))) + // idAtDistance might return an ID that's already generated + // make sure that the node has a unique ID, otherwise regenerate + if !nodeSet[node.ID()] { + nodeSet[node.ID()] = true + results = append(results, node) + } } return results } diff --git a/p2p/discover/v4_udp.go b/p2p/discover/v4_udp.go index fec6b6b29d0934bc06d4c44d4fa8fe48cdf9941c..38687292d3fc1b5ccccdfaa969706893456e463e 100644 --- a/p2p/discover/v4_udp.go +++ b/p2p/discover/v4_udp.go @@ -131,7 +131,7 @@ type reply struct { } func ListenV4(ctx context.Context, c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) { - cfg = cfg.withDefaults() + cfg = cfg.withDefaults(respTimeout) closeCtx, cancel := context.WithCancel(ctx) t := &UDPv4{ conn: c, @@ -150,7 +150,7 @@ func ListenV4(ctx context.Context, c UDPConn, ln *enode.LocalNode, cfg Config) ( privateKeyGenerator: cfg.PrivateKeyGenerator, } - tab, err := newTable(t, ln.Database(), cfg.Bootnodes, t.log) + tab, err := newTable(t, ln.Database(), cfg.Bootnodes, cfg.TableRevalidateInterval, cfg.Log) if err != nil { return nil, err } diff --git a/p2p/discover/v4_udp_test.go b/p2p/discover/v4_udp_test.go index 79769d4b2e8081b1820430b46440f8b36ef16857..09cea62e6e9d6d3e809ad3bccadbfdb0170ddf3d 100644 --- a/p2p/discover/v4_udp_test.go +++ b/p2p/discover/v4_udp_test.go @@ -95,6 +95,8 @@ func newUDPTestContext(ctx context.Context, t *testing.T) *udpTest { PingBackDelay: time.Nanosecond, PrivateKeyGenerator: contextGetPrivateKeyGenerator(ctx), + + TableRevalidateInterval: time.Hour, }) if err != nil { panic(err) diff --git a/p2p/discover/v5_lookup_test.go b/p2p/discover/v5_lookup_test.go new file mode 100644 index 0000000000000000000000000000000000000000..b168ad94e01b521c066bd58fbb28f47e4357cfb8 --- /dev/null +++ b/p2p/discover/v5_lookup_test.go @@ -0,0 +1,120 @@ +//go:build integration +// +build integration + +package discover + +import ( + "math/rand" + "net" + "runtime" + "sort" + "testing" + + "github.com/ledgerwatch/erigon/p2p/discover/v5wire" + "github.com/ledgerwatch/erigon/p2p/enode" +) + +// This test checks that lookup works. +func TestUDPv5_lookup(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("fix me on win please") + } + t.Parallel() + test := newUDPV5Test(t) + t.Cleanup(test.close) + + // Lookup on empty table returns no nodes. + if results := test.udp.Lookup(lookupTestnet.target.ID()); len(results) > 0 { + t.Fatalf("lookup on empty table returned %d results: %#v", len(results), results) + } + + // Ensure the tester knows all nodes in lookupTestnet by IP. + for d, nn := range lookupTestnet.dists { + for i, key := range nn { + n := lookupTestnet.node(d, i) + test.getNode(key, &net.UDPAddr{IP: n.IP(), Port: n.UDP()}) + } + } + + // Seed table with initial node. + initialNode := lookupTestnet.node(256, 0) + fillTable(test.table, []*node{wrapNode(initialNode)}) + + // Start the lookup. + resultC := make(chan []*enode.Node, 1) + go func() { + resultC <- test.udp.Lookup(lookupTestnet.target.ID()) + test.close() + }() + + // Answer lookup packets. + asked := make(map[enode.ID]bool) + for done := false; !done; { + done = test.waitPacketOut(func(p v5wire.Packet, to *net.UDPAddr, _ v5wire.Nonce) { + recipient, key := lookupTestnet.nodeByAddr(to) + switch p := p.(type) { + case *v5wire.Ping: + test.packetInFrom(key, to, &v5wire.Pong{ReqID: p.ReqID}) + case *v5wire.Findnode: + if asked[recipient.ID()] { + t.Error("Asked node", recipient.ID(), "twice") + } + asked[recipient.ID()] = true + nodes := lookupTestnet.neighborsAtDistances(recipient, p.Distances, 16) + t.Logf("Got FINDNODE for %v, returning %d nodes", p.Distances, len(nodes)) + for _, resp := range packNodes(p.ReqID, nodes) { + test.packetInFrom(key, to, resp) + } + } + }) + } + + // Verify result nodes. + results := <-resultC + checkLookupResults(t, lookupTestnet, results) +} + +// Real sockets, real crypto: this test checks end-to-end connectivity for UDPv5. +func TestUDPv5_lookupE2E(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("fix me on win please") + } + t.Parallel() + + bootNode := startLocalhostV5(t, Config{}) + bootNodeRec := bootNode.Self() + + const N = 5 + nodes := []*UDPv5{bootNode} + for len(nodes) < N { + cfg := Config{ + Bootnodes: []*enode.Node{bootNodeRec}, + } + node := startLocalhostV5(t, cfg) + nodes = append(nodes, node) + } + + defer func() { + for _, node := range nodes { + node.Close() + } + }() + + last := nodes[N-1] + target := nodes[rand.Intn(N-2)].Self() + + // It is expected that all nodes can be found. + expectedResult := make([]*enode.Node, len(nodes)) + for i := range nodes { + expectedResult[i] = nodes[i].Self() + } + sort.Slice(expectedResult, func(i, j int) bool { + return enode.DistCmp(target.ID(), expectedResult[i].ID(), expectedResult[j].ID()) < 0 + }) + + // Do the lookup. + results := last.Lookup(target.ID()) + if err := checkNodesEqual(results, expectedResult); err != nil { + t.Fatalf("lookup returned wrong results: %v", err) + } +} diff --git a/p2p/discover/v5_udp.go b/p2p/discover/v5_udp.go index 7ccbb08ce35bf625c662bc72892a8f1460fa69c2..f27a075dc7d3a134253bc2c813442da7c34f019b 100644 --- a/p2p/discover/v5_udp.go +++ b/p2p/discover/v5_udp.go @@ -84,6 +84,7 @@ type UDPv5 struct { callCh chan *callV5 callDoneCh chan *callV5 respTimeoutCh chan *callTimeout + replyTimeout time.Duration // state of dispatch codec codecV5 @@ -139,7 +140,7 @@ func ListenV5(ctx context.Context, conn UDPConn, ln *enode.LocalNode, cfg Config // newUDPv5 creates a UDPv5 transport, but doesn't start any goroutines. func newUDPv5(ctx context.Context, conn UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv5, error) { closeCtx, cancelCloseCtx := context.WithCancel(ctx) - cfg = cfg.withDefaults() + cfg = cfg.withDefaults(respTimeoutV5) t := &UDPv5{ // static fields conn: conn, @@ -157,6 +158,7 @@ func newUDPv5(ctx context.Context, conn UDPConn, ln *enode.LocalNode, cfg Config callCh: make(chan *callV5), callDoneCh: make(chan *callV5), respTimeoutCh: make(chan *callTimeout), + replyTimeout: cfg.ReplyTimeout, // state of dispatch codec: v5wire.NewCodec(ln, cfg.PrivateKey, cfg.Clock), activeCallByNode: make(map[enode.ID]*callV5), @@ -166,7 +168,7 @@ func newUDPv5(ctx context.Context, conn UDPConn, ln *enode.LocalNode, cfg Config closeCtx: closeCtx, cancelCloseCtx: cancelCloseCtx, } - tab, err := newTable(t, t.db, cfg.Bootnodes, cfg.Log) + tab, err := newTable(t, t.db, cfg.Bootnodes, cfg.TableRevalidateInterval, cfg.Log) if err != nil { return nil, err } @@ -550,7 +552,7 @@ func (t *UDPv5) startResponseTimeout(c *callV5) { timer mclock.Timer done = make(chan struct{}) ) - timer = t.clock.AfterFunc(respTimeoutV5, func() { + timer = t.clock.AfterFunc(t.replyTimeout, func() { <-done select { case t.respTimeoutCh <- &callTimeout{c, timer}: @@ -828,7 +830,7 @@ func packNodes(reqid []byte, nodes []*enode.Node) []*v5wire.Nodes { return []*v5wire.Nodes{{ReqID: reqid, Total: 1}} } - total := uint8(math.Ceil(float64(len(nodes)) / 3)) + total := uint8(math.Ceil(float64(len(nodes)) / nodesResponseItemLimit)) var resp []*v5wire.Nodes for len(nodes) > 0 { p := &v5wire.Nodes{ReqID: reqid, Total: total} diff --git a/p2p/discover/v5_udp_test.go b/p2p/discover/v5_udp_test.go index 5c1282a52a4830258615c5a2b7688cbd3a94a606..6b4b2fbcc601f690b95b4dc44e9fef9248fb0dcf 100644 --- a/p2p/discover/v5_udp_test.go +++ b/p2p/discover/v5_udp_test.go @@ -23,11 +23,9 @@ import ( "encoding/binary" "errors" "fmt" - "math/rand" "net" "reflect" "runtime" - "sort" "testing" "time" @@ -40,54 +38,9 @@ import ( "github.com/ledgerwatch/erigon/rlp" ) -// Real sockets, real crypto: this test checks end-to-end connectivity for UDPv5. -func TestUDPv5_lookupE2E(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("fix me on win please") - } - t.Parallel() - - bootNode := startLocalhostV5(t, Config{}) - bootNodeRec := bootNode.Self() - - const N = 5 - nodes := []*UDPv5{bootNode} - for len(nodes) < N { - cfg := Config{ - Bootnodes: []*enode.Node{bootNodeRec}, - } - node := startLocalhostV5(t, cfg) - nodes = append(nodes, node) - } - - defer func() { - for _, node := range nodes { - node.Close() - } - }() - - last := nodes[N-1] - target := nodes[rand.Intn(N-2)].Self() - - // It is expected that all nodes can be found. - expectedResult := make([]*enode.Node, len(nodes)) - for i := range nodes { - expectedResult[i] = nodes[i].Self() - } - sort.Slice(expectedResult, func(i, j int) bool { - return enode.DistCmp(target.ID(), expectedResult[i].ID(), expectedResult[j].ID()) < 0 - }) - - // Do the lookup. - results := last.Lookup(target.ID()) - if err := checkNodesEqual(results, expectedResult); err != nil { - t.Fatalf("lookup returned wrong results: %v", err) - } -} - func startLocalhostV5(t *testing.T, cfg Config) *UDPv5 { cfg.PrivateKey = newkey() - db, err := enode.OpenDB(t.TempDir()) + db, err := enode.OpenDB("") if err != nil { panic(err) } @@ -442,14 +395,28 @@ func TestUDPv5_callTimeoutReset(t *testing.T) { t.Skip("fix me on win please") } t.Parallel() - test := newUDPV5Test(t) + + replyTimeout := 120 * time.Millisecond + // This must be significantly lower than replyTimeout to not get "RPC timeout" error. + singleReplyDelay := replyTimeout / (totalNodesResponseLimit - 1) + if singleReplyDelay*totalNodesResponseLimit < replyTimeout { + t.Fatalf("The total delay of all replies must exceed an individual reply timeout.") + } + if replyTimeout-singleReplyDelay < 50*time.Millisecond { + t.Errorf("50ms is sometimes not enough on a slow CI to process a reply.") + } + + ctx := context.Background() + ctx = contextWithReplyTimeout(ctx, replyTimeout) + + test := newUDPV5TestContext(ctx, t) t.Cleanup(test.close) // Launch the request: var ( distance = uint(230) remote = test.getNode(test.remotekey, test.remoteaddr).Node() - nodes = nodesAtDistance(remote.ID(), int(distance), 8) + nodes = nodesAtDistance(remote.ID(), int(distance), totalNodesResponseLimit) done = make(chan error, 1) ) go func() { @@ -459,19 +426,14 @@ func TestUDPv5_callTimeoutReset(t *testing.T) { // Serve two responses, slowly. test.waitPacketOut(func(p *v5wire.Findnode, addr *net.UDPAddr, _ v5wire.Nonce) { - time.Sleep(respTimeout - 50*time.Millisecond) - test.packetIn(&v5wire.Nodes{ - ReqID: p.ReqID, - Total: 2, - Nodes: nodesToRecords(nodes[:4]), - }) - - time.Sleep(respTimeout - 50*time.Millisecond) - test.packetIn(&v5wire.Nodes{ - ReqID: p.ReqID, - Total: 2, - Nodes: nodesToRecords(nodes[4:]), - }) + for i := 0; i < totalNodesResponseLimit; i++ { + time.Sleep(singleReplyDelay) + test.packetIn(&v5wire.Nodes{ + ReqID: p.ReqID, + Total: totalNodesResponseLimit, + Nodes: nodesToRecords(nodes[i : i+1]), + }) + } }) if err := <-done; err != nil { t.Fatalf("unexpected error: %q", err) @@ -575,66 +537,6 @@ func TestUDPv5_talkRequest(t *testing.T) { } } -// This test checks that lookup works. -func TestUDPv5_lookup(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("fix me on win please") - } - t.Parallel() - test := newUDPV5Test(t) - t.Cleanup(test.close) - - // Lookup on empty table returns no nodes. - if results := test.udp.Lookup(lookupTestnet.target.ID()); len(results) > 0 { - t.Fatalf("lookup on empty table returned %d results: %#v", len(results), results) - } - - // Ensure the tester knows all nodes in lookupTestnet by IP. - for d, nn := range lookupTestnet.dists { - for i, key := range nn { - n := lookupTestnet.node(d, i) - test.getNode(key, &net.UDPAddr{IP: n.IP(), Port: n.UDP()}) - } - } - - // Seed table with initial node. - initialNode := lookupTestnet.node(256, 0) - fillTable(test.table, []*node{wrapNode(initialNode)}) - - // Start the lookup. - resultC := make(chan []*enode.Node, 1) - go func() { - resultC <- test.udp.Lookup(lookupTestnet.target.ID()) - test.close() - }() - - // Answer lookup packets. - asked := make(map[enode.ID]bool) - for done := false; !done; { - done = test.waitPacketOut(func(p v5wire.Packet, to *net.UDPAddr, _ v5wire.Nonce) { - recipient, key := lookupTestnet.nodeByAddr(to) - switch p := p.(type) { - case *v5wire.Ping: - test.packetInFrom(key, to, &v5wire.Pong{ReqID: p.ReqID}) - case *v5wire.Findnode: - if asked[recipient.ID()] { - t.Error("Asked node", recipient.ID(), "twice") - } - asked[recipient.ID()] = true - nodes := lookupTestnet.neighborsAtDistances(recipient, p.Distances, 16) - t.Logf("Got FINDNODE for %v, returning %d nodes", p.Distances, len(nodes)) - for _, resp := range packNodes(p.ReqID, nodes) { - test.packetInFrom(key, to, resp) - } - } - }) - } - - // Verify result nodes. - results := <-resultC - checkLookupResults(t, lookupTestnet, results) -} - // This test checks the local node can be utilised to set key-values. func TestUDPv5_LocalNode(t *testing.T) { if runtime.GOOS == "windows" { @@ -729,6 +631,17 @@ func (c *testCodec) decodeFrame(input []byte) (frame testCodecFrame, p v5wire.Pa } func newUDPV5Test(t *testing.T) *udpV5Test { + return newUDPV5TestContext(context.Background(), t) +} + +func newUDPV5TestContext(ctx context.Context, t *testing.T) *udpV5Test { + ctx = disableLookupSlowdown(ctx) + + replyTimeout := contextGetReplyTimeout(ctx) + if replyTimeout == 0 { + replyTimeout = 50 * time.Millisecond + } + test := &udpV5Test{ t: t, pipe: newpipe(), @@ -740,7 +653,7 @@ func newUDPV5Test(t *testing.T) *udpV5Test { } t.Cleanup(test.close) var err error - test.db, err = enode.OpenDB(test.t.TempDir()) + test.db, err = enode.OpenDB("") if err != nil { panic(err) } @@ -748,12 +661,13 @@ func newUDPV5Test(t *testing.T) *udpV5Test { ln := enode.NewLocalNode(test.db, test.localkey) ln.SetStaticIP(net.IP{10, 0, 0, 1}) ln.Set(enr.UDP(30303)) - ctx := context.Background() - ctx = disableLookupSlowdown(ctx) test.udp, err = ListenV5(ctx, test.pipe, ln, Config{ PrivateKey: test.localkey, Log: testlog.Logger(t, log.LvlError), ValidSchemes: enode.ValidSchemesForTesting, + ReplyTimeout: replyTimeout, + + TableRevalidateInterval: time.Hour, }) if err != nil { panic(err) @@ -792,7 +706,7 @@ func (test *udpV5Test) getNode(key *ecdsa.PrivateKey, addr *net.UDPAddr) *enode. id := enode.PubkeyToIDV4(&key.PublicKey) ln := test.nodesByID[id] if ln == nil { - db, err := enode.OpenDB(test.t.TempDir()) + db, err := enode.OpenDB("") if err != nil { panic(err) } @@ -826,6 +740,12 @@ func (test *udpV5Test) waitPacketOut(validate interface{}) (closed bool) { } ln := test.nodesByIP[string(dgram.to.IP)] if ln == nil { + _, _, packet, err := test.udp.codec.Decode(dgram.data, test.pipe.LocalAddr().String()) + if err != nil { + test.t.Errorf("failed to decode a UDP packet: %v", err) + } else { + test.t.Errorf("attempt to send UDP packet: %v", packet.Name()) + } test.t.Fatalf("attempt to send to non-existing node %v", &dgram.to) return false } @@ -853,7 +773,23 @@ func (test *udpV5Test) close() { n.Database().Close() } } - if len(test.pipe.queue) != 0 { - test.t.Fatalf("%d unmatched UDP packets in queue", len(test.pipe.queue)) + + unmatchedCount := len(test.pipe.queue) + if (unmatchedCount > 0) && !test.t.Failed() { + test.t.Errorf("%d unmatched UDP packets in queue", unmatchedCount) + + for len(test.pipe.queue) > 0 { + dgram, err := test.pipe.receive() + if err != nil { + test.t.Errorf("Failed to receive remaining UDP packets: %v", err) + break + } + _, _, packet, err := test.udp.codec.Decode(dgram.data, test.pipe.LocalAddr().String()) + if err != nil { + test.t.Errorf("Failed to decode a remaining UDP packet: %v", err) + } else { + test.t.Errorf("Remaining UDP packet: %v", packet.Name()) + } + } } }