good morning!!!!

Skip to content
Snippets Groups Projects
Unverified Commit 8ff98108 authored by rene's avatar rene Committed by GitHub
Browse files

cmd/devp2p: fix flakey tests in CI (#22757)

This PR fixes a couple of issues in the eth test suite that caused flakiness when run in the CI.
parent afc1abd8
No related branches found
No related tags found
No related merge requests found
...@@ -217,17 +217,7 @@ func (s *Suite) TestLargeAnnounce_66(t *utesting.T) { ...@@ -217,17 +217,7 @@ func (s *Suite) TestLargeAnnounce_66(t *utesting.T) {
sendConn.Close() sendConn.Close()
} }
// Test the last block as a valid block // Test the last block as a valid block
sendConn, receiveConn := s.setupConnection66(t), s.setupConnection66(t) s.sendNextBlock66(t)
defer sendConn.Close()
defer receiveConn.Close()
s.testAnnounce66(t, sendConn, receiveConn, blocks[3])
// update test suite chain
s.chain.blocks = append(s.chain.blocks, s.fullChain.blocks[nextBlock])
// wait for client to update its chain
if err := receiveConn.waitForBlock66(s.fullChain.blocks[nextBlock]); err != nil {
t.Fatal(err)
}
} }
func (s *Suite) TestOldAnnounce_66(t *utesting.T) { func (s *Suite) TestOldAnnounce_66(t *utesting.T) {
......
...@@ -229,8 +229,11 @@ func (s *Suite) waitAnnounce66(t *utesting.T, conn *Conn, blockAnnouncement *New ...@@ -229,8 +229,11 @@ func (s *Suite) waitAnnounce66(t *utesting.T, conn *Conn, blockAnnouncement *New
func (c *Conn) waitForBlock66(block *types.Block) error { func (c *Conn) waitForBlock66(block *types.Block) error {
defer c.SetReadDeadline(time.Time{}) defer c.SetReadDeadline(time.Time{})
timeout := time.Now().Add(20 * time.Second) c.SetReadDeadline(time.Now().Add(20 * time.Second))
c.SetReadDeadline(timeout) // note: if the node has not yet imported the block, it will respond
// to the GetBlockHeaders request with an empty BlockHeaders response,
// so the GetBlockHeaders request must be sent again until the BlockHeaders
// response contains the desired header.
for { for {
req := eth.GetBlockHeadersPacket66{ req := eth.GetBlockHeadersPacket66{
RequestId: 54, RequestId: 54,
...@@ -253,9 +256,11 @@ func (c *Conn) waitForBlock66(block *types.Block) error { ...@@ -253,9 +256,11 @@ func (c *Conn) waitForBlock66(block *types.Block) error {
if reqID != req.RequestId { if reqID != req.RequestId {
return fmt.Errorf("request ID mismatch: wanted %d, got %d", req.RequestId, reqID) return fmt.Errorf("request ID mismatch: wanted %d, got %d", req.RequestId, reqID)
} }
if len(msg) > 0 { for _, header := range msg {
if header.Number.Uint64() == block.NumberU64() {
return nil return nil
} }
}
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
case *NewPooledTransactionHashes: case *NewPooledTransactionHashes:
// ignore old announcements // ignore old announcements
...@@ -319,10 +324,10 @@ func (s *Suite) sendNextBlock66(t *utesting.T) { ...@@ -319,10 +324,10 @@ func (s *Suite) sendNextBlock66(t *utesting.T) {
} }
// send announcement and wait for node to request the header // send announcement and wait for node to request the header
s.testAnnounce66(t, sendConn, receiveConn, blockAnnouncement) s.testAnnounce66(t, sendConn, receiveConn, blockAnnouncement)
// update test suite chain
s.chain.blocks = append(s.chain.blocks, s.fullChain.blocks[nextBlock])
// wait for client to update its chain // wait for client to update its chain
if err := receiveConn.waitForBlock66(s.chain.Head()); err != nil { if err := receiveConn.waitForBlock66(s.fullChain.blocks[nextBlock]); err != nil {
t.Fatal(err) t.Fatal(err)
} }
// update test suite chain
s.chain.blocks = append(s.chain.blocks, s.fullChain.blocks[nextBlock])
} }
...@@ -260,22 +260,28 @@ func (s *Suite) TestGetBlockBodies(t *utesting.T) { ...@@ -260,22 +260,28 @@ func (s *Suite) TestGetBlockBodies(t *utesting.T) {
// TestBroadcast tests whether a block announcement is correctly // TestBroadcast tests whether a block announcement is correctly
// propagated to the given node's peer(s). // propagated to the given node's peer(s).
func (s *Suite) TestBroadcast(t *utesting.T) { func (s *Suite) TestBroadcast(t *utesting.T) {
s.sendNextBlock(t)
}
func (s *Suite) sendNextBlock(t *utesting.T) {
sendConn, receiveConn := s.setupConnection(t), s.setupConnection(t) sendConn, receiveConn := s.setupConnection(t), s.setupConnection(t)
defer sendConn.Close() defer sendConn.Close()
defer receiveConn.Close() defer receiveConn.Close()
// create new block announcement
nextBlock := len(s.chain.blocks) nextBlock := len(s.chain.blocks)
blockAnnouncement := &NewBlock{ blockAnnouncement := &NewBlock{
Block: s.fullChain.blocks[nextBlock], Block: s.fullChain.blocks[nextBlock],
TD: s.fullChain.TD(nextBlock + 1), TD: s.fullChain.TD(nextBlock + 1),
} }
// send announcement and wait for node to request the header
s.testAnnounce(t, sendConn, receiveConn, blockAnnouncement) s.testAnnounce(t, sendConn, receiveConn, blockAnnouncement)
// update test suite chain
s.chain.blocks = append(s.chain.blocks, s.fullChain.blocks[nextBlock])
// wait for client to update its chain // wait for client to update its chain
if err := receiveConn.waitForBlock(s.chain.Head()); err != nil { if err := receiveConn.waitForBlock(s.fullChain.blocks[nextBlock]); err != nil {
t.Fatal(err) t.Fatal(err)
} }
// update test suite chain
s.chain.blocks = append(s.chain.blocks, s.fullChain.blocks[nextBlock])
} }
// TestMaliciousHandshake tries to send malicious data during the handshake. // TestMaliciousHandshake tries to send malicious data during the handshake.
...@@ -394,18 +400,7 @@ func (s *Suite) TestLargeAnnounce(t *utesting.T) { ...@@ -394,18 +400,7 @@ func (s *Suite) TestLargeAnnounce(t *utesting.T) {
sendConn.Close() sendConn.Close()
} }
// Test the last block as a valid block // Test the last block as a valid block
sendConn := s.setupConnection(t) s.sendNextBlock(t)
receiveConn := s.setupConnection(t)
defer sendConn.Close()
defer receiveConn.Close()
s.testAnnounce(t, sendConn, receiveConn, blocks[3])
// update test suite chain
s.chain.blocks = append(s.chain.blocks, s.fullChain.blocks[nextBlock])
// wait for client to update its chain
if err := receiveConn.waitForBlock(s.fullChain.blocks[nextBlock]); err != nil {
t.Fatal(err)
}
} }
func (s *Suite) TestOldAnnounce(t *utesting.T) { func (s *Suite) TestOldAnnounce(t *utesting.T) {
......
...@@ -334,8 +334,11 @@ loop: ...@@ -334,8 +334,11 @@ loop:
func (c *Conn) waitForBlock(block *types.Block) error { func (c *Conn) waitForBlock(block *types.Block) error {
defer c.SetReadDeadline(time.Time{}) defer c.SetReadDeadline(time.Time{})
timeout := time.Now().Add(20 * time.Second) c.SetReadDeadline(time.Now().Add(20 * time.Second))
c.SetReadDeadline(timeout) // note: if the node has not yet imported the block, it will respond
// to the GetBlockHeaders request with an empty BlockHeaders response,
// so the GetBlockHeaders request must be sent again until the BlockHeaders
// response contains the desired header.
for { for {
req := &GetBlockHeaders{Origin: eth.HashOrNumber{Hash: block.Hash()}, Amount: 1} req := &GetBlockHeaders{Origin: eth.HashOrNumber{Hash: block.Hash()}, Amount: 1}
if err := c.Write(req); err != nil { if err := c.Write(req); err != nil {
...@@ -343,9 +346,11 @@ func (c *Conn) waitForBlock(block *types.Block) error { ...@@ -343,9 +346,11 @@ func (c *Conn) waitForBlock(block *types.Block) error {
} }
switch msg := c.Read().(type) { switch msg := c.Read().(type) {
case *BlockHeaders: case *BlockHeaders:
if len(*msg) > 0 { for _, header := range *msg {
if header.Number.Uint64() == block.NumberU64() {
return nil return nil
} }
}
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
default: default:
return fmt.Errorf("invalid message: %s", pretty.Sdump(msg)) return fmt.Errorf("invalid message: %s", pretty.Sdump(msg))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment