good morning!!!!

Skip to content
Snippets Groups Projects
Unverified Commit 7194c847 authored by Felix Lange's avatar Felix Lange Committed by GitHub
Browse files

p2p/rlpx: reduce allocation and syscalls (#22899)

This change significantly improves the performance of RLPx message reads
and writes. In the previous implementation, reading and writing of
message frames performed multiple reads and writes on the underlying
network connection, and allocated a new []byte buffer for every read.

In the new implementation, reads and writes re-use buffers, and perform
much fewer system calls on the underlying connection. This doubles the
theoretically achievable throughput on a single connection, as shown by
the benchmark result:

    name             old speed      new speed       delta
    Throughput-8     70.3MB/s ± 0%  155.4MB/s ± 0%  +121.11%  (p=0.000 n=9+8)

The change also removes support for the legacy, pre-EIP-8 handshake encoding.
As of May 2021, no actively maintained client sends this format.
parent 2e7714f8
No related branches found
No related tags found
No related merge requests found
// Copyright 2021 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package rlpx
import (
"io"
)
// readBuffer implements buffering for network reads. This type is similar to bufio.Reader,
// with two crucial differences: the buffer slice is exposed, and the buffer keeps all
// read data available until reset.
//
// How to use this type:
//
// Keep a readBuffer b alongside the underlying network connection. When reading a packet
// from the connection, first call b.reset(). This empties b.data. Now perform reads
// through b.read() until the end of the packet is reached. The complete packet data is
// now available in b.data.
type readBuffer struct {
data []byte
end int
}
// reset removes all processed data which was read since the last call to reset.
// After reset, len(b.data) is zero.
func (b *readBuffer) reset() {
unprocessed := b.end - len(b.data)
copy(b.data[:unprocessed], b.data[len(b.data):b.end])
b.end = unprocessed
b.data = b.data[:0]
}
// read reads at least n bytes from r, returning the bytes.
// The returned slice is valid until the next call to reset.
func (b *readBuffer) read(r io.Reader, n int) ([]byte, error) {
offset := len(b.data)
have := b.end - len(b.data)
// If n bytes are available in the buffer, there is no need to read from r at all.
if have >= n {
b.data = b.data[:offset+n]
return b.data[offset : offset+n], nil
}
// Make buffer space available.
need := n - have
b.grow(need)
// Read.
rn, err := io.ReadAtLeast(r, b.data[b.end:cap(b.data)], need)
if err != nil {
return nil, err
}
b.end += rn
b.data = b.data[:offset+n]
return b.data[offset : offset+n], nil
}
// grow ensures the buffer has at least n bytes of unused space.
func (b *readBuffer) grow(n int) {
if cap(b.data)-b.end >= n {
return
}
need := n - (cap(b.data) - b.end)
offset := len(b.data)
b.data = append(b.data[:cap(b.data)], make([]byte, need)...)
b.data = b.data[:offset]
}
// writeBuffer implements buffering for network writes. This is essentially
// a convenience wrapper around a byte slice.
type writeBuffer struct {
data []byte
}
func (b *writeBuffer) reset() {
b.data = b.data[:0]
}
func (b *writeBuffer) appendZero(n int) []byte {
offset := len(b.data)
b.data = append(b.data, make([]byte, n)...)
return b.data[offset : offset+n]
}
func (b *writeBuffer) Write(data []byte) (int, error) {
b.data = append(b.data, data...)
return len(data), nil
}
const maxUint24 = int(^uint32(0) >> 8)
func readUint24(b []byte) uint32 {
return uint32(b[2]) | uint32(b[1])<<8 | uint32(b[0])<<16
}
func putUint24(v uint32, b []byte) {
b[0] = byte(v >> 16)
b[1] = byte(v >> 8)
b[2] = byte(v)
}
// growslice ensures b has the wanted length by either expanding it to its capacity
// or allocating a new slice if b has insufficient capacity.
func growslice(b []byte, wantLength int) []byte {
if len(b) >= wantLength {
return b
}
if cap(b) >= wantLength {
return b[:cap(b)]
}
return make([]byte, wantLength)
}
// Copyright 2021 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package rlpx
import (
"bytes"
"testing"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/stretchr/testify/assert"
)
func TestReadBufferReset(t *testing.T) {
reader := bytes.NewReader(hexutil.MustDecode("0x010202030303040505"))
var b readBuffer
s1, _ := b.read(reader, 1)
s2, _ := b.read(reader, 2)
s3, _ := b.read(reader, 3)
assert.Equal(t, []byte{1}, s1)
assert.Equal(t, []byte{2, 2}, s2)
assert.Equal(t, []byte{3, 3, 3}, s3)
b.reset()
s4, _ := b.read(reader, 1)
s5, _ := b.read(reader, 2)
assert.Equal(t, []byte{4}, s4)
assert.Equal(t, []byte{5, 5}, s5)
s6, err := b.read(reader, 2)
assert.EqualError(t, err, "EOF")
assert.Nil(t, s6)
}
This diff is collapsed.
......@@ -22,6 +22,7 @@ import (
"encoding/hex"
"fmt"
"io"
"math/rand"
"net"
"reflect"
"strings"
......@@ -30,6 +31,7 @@ import (
"github.com/davecgh/go-spew/spew"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/crypto/ecies"
"github.com/ethereum/go-ethereum/p2p/simulations/pipes"
"github.com/ethereum/go-ethereum/rlp"
"github.com/stretchr/testify/assert"
)
......@@ -124,7 +126,7 @@ func TestFrameReadWrite(t *testing.T) {
IngressMAC: hash,
EgressMAC: hash,
})
h := conn.handshake
h := conn.session
golden := unhex(`
00828ddae471818bb0bfa6b551d1cb42
......@@ -166,27 +168,11 @@ func (h fakeHash) Sum(b []byte) []byte { return append(b, h...) }
type handshakeAuthTest struct {
input string
isPlain bool
wantVersion uint
wantRest []rlp.RawValue
}
var eip8HandshakeAuthTests = []handshakeAuthTest{
// (Auth₁) RLPx v4 plain encoding
{
input: `
048ca79ad18e4b0659fab4853fe5bc58eb83992980f4c9cc147d2aa31532efd29a3d3dc6a3d89eaf
913150cfc777ce0ce4af2758bf4810235f6e6ceccfee1acc6b22c005e9e3a49d6448610a58e98744
ba3ac0399e82692d67c1f58849050b3024e21a52c9d3b01d871ff5f210817912773e610443a9ef14
2e91cdba0bd77b5fdf0769b05671fc35f83d83e4d3b0b000c6b2a1b1bba89e0fc51bf4e460df3105
c444f14be226458940d6061c296350937ffd5e3acaceeaaefd3c6f74be8e23e0f45163cc7ebd7622
0f0128410fd05250273156d548a414444ae2f7dea4dfca2d43c057adb701a715bf59f6fb66b2d1d2
0f2c703f851cbf5ac47396d9ca65b6260bd141ac4d53e2de585a73d1750780db4c9ee4cd4d225173
a4592ee77e2bd94d0be3691f3b406f9bba9b591fc63facc016bfa8
`,
isPlain: true,
wantVersion: 4,
},
// (Auth₂) EIP-8 encoding
{
input: `
......@@ -233,18 +219,6 @@ type handshakeAckTest struct {
}
var eip8HandshakeRespTests = []handshakeAckTest{
// (Ack₁) RLPx v4 plain encoding
{
input: `
049f8abcfa9c0dc65b982e98af921bc0ba6e4243169348a236abe9df5f93aa69d99cadddaa387662
b0ff2c08e9006d5a11a278b1b3331e5aaabf0a32f01281b6f4ede0e09a2d5f585b26513cb794d963
5a57563921c04a9090b4f14ee42be1a5461049af4ea7a7f49bf4c97a352d39c8d02ee4acc416388c
1c66cec761d2bc1c72da6ba143477f049c9d2dde846c252c111b904f630ac98e51609b3b1f58168d
dca6505b7196532e5f85b259a20c45e1979491683fee108e9660edbf38f3add489ae73e3dda2c71b
d1497113d5c755e942d1
`,
wantVersion: 4,
},
// (Ack₂) EIP-8 encoding
{
input: `
......@@ -287,10 +261,13 @@ var eip8HandshakeRespTests = []handshakeAckTest{
},
}
var (
keyA, _ = crypto.HexToECDSA("49a7b37aa6f6645917e7b807e9d1c00d4fa71f18343b0d4122a4d2df64dd6fee")
keyB, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
)
func TestHandshakeForwardCompatibility(t *testing.T) {
var (
keyA, _ = crypto.HexToECDSA("49a7b37aa6f6645917e7b807e9d1c00d4fa71f18343b0d4122a4d2df64dd6fee")
keyB, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
pubA = crypto.FromECDSAPub(&keyA.PublicKey)[1:]
pubB = crypto.FromECDSAPub(&keyB.PublicKey)[1:]
ephA, _ = crypto.HexToECDSA("869d6ecf5211f1cc60418a13b9d870b22959d0c16f02bec714c960dd2298a32d")
......@@ -304,7 +281,7 @@ func TestHandshakeForwardCompatibility(t *testing.T) {
_ = authSignature
)
makeAuth := func(test handshakeAuthTest) *authMsgV4 {
msg := &authMsgV4{Version: test.wantVersion, Rest: test.wantRest, gotPlain: test.isPlain}
msg := &authMsgV4{Version: test.wantVersion, Rest: test.wantRest}
copy(msg.Signature[:], authSignature)
copy(msg.InitiatorPubkey[:], pubA)
copy(msg.Nonce[:], nonceA)
......@@ -319,9 +296,10 @@ func TestHandshakeForwardCompatibility(t *testing.T) {
// check auth msg parsing
for _, test := range eip8HandshakeAuthTests {
var h handshakeState
r := bytes.NewReader(unhex(test.input))
msg := new(authMsgV4)
ciphertext, err := readHandshakeMsg(msg, encAuthMsgLen, keyB, r)
ciphertext, err := h.readMsg(msg, keyB, r)
if err != nil {
t.Errorf("error for input %x:\n %v", unhex(test.input), err)
continue
......@@ -337,10 +315,11 @@ func TestHandshakeForwardCompatibility(t *testing.T) {
// check auth resp parsing
for _, test := range eip8HandshakeRespTests {
var h handshakeState
input := unhex(test.input)
r := bytes.NewReader(input)
msg := new(authRespV4)
ciphertext, err := readHandshakeMsg(msg, encAuthRespLen, keyA, r)
ciphertext, err := h.readMsg(msg, keyA, r)
if err != nil {
t.Errorf("error for input %x:\n %v", input, err)
continue
......@@ -356,14 +335,14 @@ func TestHandshakeForwardCompatibility(t *testing.T) {
// check derivation for (Auth₂, Ack₂) on recipient side
var (
hs = &encHandshake{
hs = &handshakeState{
initiator: false,
respNonce: nonceB,
randomPrivKey: ecies.ImportECDSA(ephB),
}
authCiphertext = unhex(eip8HandshakeAuthTests[1].input)
authRespCiphertext = unhex(eip8HandshakeRespTests[1].input)
authMsg = makeAuth(eip8HandshakeAuthTests[1])
authCiphertext = unhex(eip8HandshakeAuthTests[0].input)
authRespCiphertext = unhex(eip8HandshakeRespTests[0].input)
authMsg = makeAuth(eip8HandshakeAuthTests[0])
wantAES = unhex("80e8632c05fed6fc2a13b0f8d31a3cf645366239170ea067065aba8e28bac487")
wantMAC = unhex("2ea74ec5dae199227dff1af715362700e989d889d7a493cb0639691efb8e5f98")
wantFooIngressHash = unhex("0c7ec6340062cc46f5e9f1e3cf86f8c8c403c5a0964f5df0ebd34a75ddc86db5")
......@@ -388,6 +367,74 @@ func TestHandshakeForwardCompatibility(t *testing.T) {
}
}
func BenchmarkHandshakeRead(b *testing.B) {
var input = unhex(eip8HandshakeAuthTests[0].input)
for i := 0; i < b.N; i++ {
var (
h handshakeState
r = bytes.NewReader(input)
msg = new(authMsgV4)
)
if _, err := h.readMsg(msg, keyB, r); err != nil {
b.Fatal(err)
}
}
}
func BenchmarkThroughput(b *testing.B) {
pipe1, pipe2, err := pipes.TCPPipe()
if err != nil {
b.Fatal(err)
}
var (
conn1, conn2 = NewConn(pipe1, nil), NewConn(pipe2, &keyA.PublicKey)
handshakeDone = make(chan error, 1)
msgdata = make([]byte, 1024)
rand = rand.New(rand.NewSource(1337))
)
rand.Read(msgdata)
// Server side.
go func() {
defer conn1.Close()
// Perform handshake.
_, err := conn1.Handshake(keyA)
handshakeDone <- err
if err != nil {
return
}
conn1.SetSnappy(true)
// Keep sending messages until connection closed.
for {
if _, err := conn1.Write(0, msgdata); err != nil {
return
}
}
}()
// Set up client side.
defer conn2.Close()
if _, err := conn2.Handshake(keyB); err != nil {
b.Fatal("client handshake error:", err)
}
conn2.SetSnappy(true)
if err := <-handshakeDone; err != nil {
b.Fatal("server hanshake error:", err)
}
// Read N messages.
b.SetBytes(int64(len(msgdata)))
b.ReportAllocs()
for i := 0; i < b.N; i++ {
_, _, _, err := conn2.Read()
if err != nil {
b.Fatal("read error:", err)
}
}
}
func unhex(str string) []byte {
r := strings.NewReplacer("\t", "", " ", "", "\n", "")
b, err := hex.DecodeString(r.Replace(str))
......
......@@ -25,6 +25,7 @@ import (
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/bitutil"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/p2p/rlpx"
......@@ -62,6 +63,10 @@ func (t *rlpxTransport) ReadMsg() (Msg, error) {
t.conn.SetReadDeadline(time.Now().Add(frameReadTimeout))
code, data, wireSize, err := t.conn.Read()
if err == nil {
// Protocol messages are dispatched to subprotocol handlers asynchronously,
// but package rlpx may reuse the returned 'data' buffer on the next call
// to Read. Copy the message data to avoid this being an issue.
data = common.CopyBytes(data)
msg = Msg{
ReceivedAt: time.Now(),
Code: code,
......
......@@ -34,6 +34,14 @@ func ListSize(contentSize uint64) uint64 {
return uint64(headsize(contentSize)) + contentSize
}
// IntSize returns the encoded size of the integer x.
func IntSize(x uint64) int {
if x < 0x80 {
return 1
}
return 1 + intsize(x)
}
// Split returns the content of first RLP value and any
// bytes after the value as subslices of b.
func Split(b []byte) (k Kind, content, rest []byte, err error) {
......
......@@ -263,6 +263,12 @@ func TestAppendUint64(t *testing.T) {
if !bytes.Equal(x, unhex(test.output)) {
t.Errorf("AppendUint64(%v, %d): got %x, want %s", test.slice, test.input, x, test.output)
}
// Check that IntSize returns the appended size.
length := len(x) - len(test.slice)
if s := IntSize(test.input); s != length {
t.Errorf("IntSize(%d): got %d, want %d", test.input, s, length)
}
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment