good morning!!!!

Skip to content
Snippets Groups Projects
Select Git revision
  • 81f36df910533de63dc5ac66f38b5481961cc0c8
  • master default protected
  • v0.2.16-candidate
  • shivam/rpcAddBorTx
  • default-cli-config
  • shivam/minerRecommitFix
  • vcastellm/pos-296-bump-go-version-in-bor-and-heimdall
  • shivam/ethstats-backend-fix
  • v0.2.16-beta1-candidate
  • v0.2.15-beta3-candidate
  • shivam/newCli-IPC
  • v0.3.0-dev
  • checkpoint-whitelist-master
  • shivam/codecov
  • jdkanani/fix-typo-log
  • shivam/hardcoded-spans-v0.2.14
  • shivam/hardcoded-spans
  • shivam/fast-state-sync
  • shivam/fast-state-sync-master
  • gethv1.10.15-merge
  • fix-txpool-2
  • v0.2.14-tmp-span-hotfix
  • v0.2.15-beta2
  • v0.2.15-beta1
  • v0.3.0-beta3
  • v0.3.0-beta2
  • v0.3.0-beta1
  • v0.2.14
  • v0.2.13
  • v0.2.13-beta2
  • v0.2.13-beta1
  • v0.2.12
  • v0.2.12-beta3
  • v0.2.12-beta1
  • v0.2.12-beta2
  • v0.2.11
  • v0.2.10
  • v0.2.10-beta2
  • v0.2.9
  • v0.2.9-beta1
  • v0.2.8
41 results

server.go

Blame
  • Forked from github / maticnetwork / bor
    8962 commits behind the upstream repository.
    user avatar
    Felix Lange authored
    Message encoding functions have been renamed to catch any uses.
    The switch to the new encoder can cause subtle incompatibilities.
    If there are any users outside of our tree, they will at least be
    alerted that there was a change.
    
    NewMsg no longer exists. The replacements for EncodeMsg are called
    Send and SendItems.
    5ba51594
    History
    Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    server.go 10.44 KiB
    package p2p
    
    import (
    	"bytes"
    	"crypto/ecdsa"
    	"errors"
    	"fmt"
    	"net"
    	"sync"
    	"time"
    
    	"github.com/ethereum/go-ethereum/logger"
    	"github.com/ethereum/go-ethereum/p2p/discover"
    	"github.com/ethereum/go-ethereum/p2p/nat"
    	"github.com/ethereum/go-ethereum/rlp"
    )
    
    const (
    	defaultDialTimeout   = 10 * time.Second
    	refreshPeersInterval = 30 * time.Second
    
    	// total timeout for encryption handshake and protocol
    	// handshake in both directions.
    	handshakeTimeout = 5 * time.Second
    	// maximum time allowed for reading a complete message.
    	// this is effectively the amount of time a connection can be idle.
    	frameReadTimeout = 1 * time.Minute
    	// maximum amount of time allowed for writing a complete message.
    	frameWriteTimeout = 5 * time.Second
    )
    
    var srvlog = logger.NewLogger("P2P Server")
    var srvjslog = logger.NewJsonLogger()
    
    // Server manages all peer connections.
    //
    // The fields of Server are used as configuration parameters.
    // You should set them before starting the Server. Fields may not be
    // modified while the server is running.
    type Server struct {
    	// This field must be set to a valid secp256k1 private key.
    	PrivateKey *ecdsa.PrivateKey
    
    	// MaxPeers is the maximum number of peers that can be
    	// connected. It must be greater than zero.
    	MaxPeers int
    
    	// Name sets the node name of this server.
    	// Use common.MakeName to create a name that follows existing conventions.
    	Name string
    
    	// Bootstrap nodes are used to establish connectivity
    	// with the rest of the network.
    	BootstrapNodes []*discover.Node
    
    	// Protocols should contain the protocols supported
    	// by the server. Matching protocols are launched for
    	// each peer.
    	Protocols []Protocol
    
    	// If ListenAddr is set to a non-nil address, the server
    	// will listen for incoming connections.
    	//
    	// If the port is zero, the operating system will pick a port. The
    	// ListenAddr field will be updated with the actual address when
    	// the server is started.
    	ListenAddr string
    
    	// If set to a non-nil value, the given NAT port mapper
    	// is used to make the listening port available to the
    	// Internet.
    	NAT nat.Interface
    
    	// If Dialer is set to a non-nil value, the given Dialer
    	// is used to dial outbound peer connections.
    	Dialer *net.Dialer
    
    	// If NoDial is true, the server will not dial any peers.
    	NoDial bool
    
    	// Hooks for testing. These are useful because we can inhibit
    	// the whole protocol stack.
    	setupFunc
    	newPeerHook
    
    	ourHandshake *protoHandshake
    
    	lock     sync.RWMutex
    	running  bool
    	listener net.Listener
    	peers    map[discover.NodeID]*Peer
    
    	ntab *discover.Table
    
    	quit        chan struct{}
    	loopWG      sync.WaitGroup // {dial,listen,nat}Loop
    	peerWG      sync.WaitGroup // active peer goroutines
    	peerConnect chan *discover.Node
    }
    
    type setupFunc func(net.Conn, *ecdsa.PrivateKey, *protoHandshake, *discover.Node) (*conn, error)
    type newPeerHook func(*Peer)
    
    // Peers returns all connected peers.
    func (srv *Server) Peers() (peers []*Peer) {
    	srv.lock.RLock()
    	defer srv.lock.RUnlock()
    	for _, peer := range srv.peers {
    		if peer != nil {
    			peers = append(peers, peer)
    		}
    	}
    	return
    }
    
    // PeerCount returns the number of connected peers.
    func (srv *Server) PeerCount() int {
    	srv.lock.RLock()
    	n := len(srv.peers)
    	srv.lock.RUnlock()
    	return n
    }
    
    // SuggestPeer creates a connection to the given Node if it
    // is not already connected.
    func (srv *Server) SuggestPeer(n *discover.Node) {
    	srv.peerConnect <- n
    }
    
    // Broadcast sends an RLP-encoded message to all connected peers.
    // This method is deprecated and will be removed later.
    func (srv *Server) Broadcast(protocol string, code uint64, data interface{}) error {
    	var payload []byte
    	if data != nil {
    		var err error
    		payload, err = rlp.EncodeToBytes(data)
    		if err != nil {
    			return err
    		}
    	}
    	srv.lock.RLock()
    	defer srv.lock.RUnlock()
    	for _, peer := range srv.peers {
    		if peer != nil {
    			var msg = Msg{Code: code}
    			if data != nil {
    				msg.Payload = bytes.NewReader(payload)
    				msg.Size = uint32(len(payload))
    			}
    			peer.writeProtoMsg(protocol, msg)
    		}
    	}
    	return nil
    }
    
    // Start starts running the server.
    // Servers can be re-used and started again after stopping.
    func (srv *Server) Start() (err error) {
    	srv.lock.Lock()
    	defer srv.lock.Unlock()
    	if srv.running {
    		return errors.New("server already running")
    	}
    	srvlog.Infoln("Starting Server")
    
    	// static fields
    	if srv.PrivateKey == nil {
    		return fmt.Errorf("Server.PrivateKey must be set to a non-nil key")
    	}
    	if srv.MaxPeers <= 0 {
    		return fmt.Errorf("Server.MaxPeers must be > 0")
    	}
    	srv.quit = make(chan struct{})
    	srv.peers = make(map[discover.NodeID]*Peer)
    	srv.peerConnect = make(chan *discover.Node)
    	if srv.setupFunc == nil {
    		srv.setupFunc = setupConn
    	}
    
    	// node table
    	ntab, err := discover.ListenUDP(srv.PrivateKey, srv.ListenAddr, srv.NAT)
    	if err != nil {
    		return err
    	}
    	srv.ntab = ntab
    
    	// handshake
    	srv.ourHandshake = &protoHandshake{Version: baseProtocolVersion, Name: srv.Name, ID: ntab.Self().ID}
    	for _, p := range srv.Protocols {
    		srv.ourHandshake.Caps = append(srv.ourHandshake.Caps, p.cap())
    	}
    
    	// listen/dial
    	if srv.ListenAddr != "" {
    		if err := srv.startListening(); err != nil {
    			return err
    		}
    	}
    	if srv.Dialer == nil {
    		srv.Dialer = &net.Dialer{Timeout: defaultDialTimeout}
    	}
    	if !srv.NoDial {
    		srv.loopWG.Add(1)
    		go srv.dialLoop()
    	}
    	if srv.NoDial && srv.ListenAddr == "" {
    		srvlog.Warnln("I will be kind-of useless, neither dialing nor listening.")
    	}
    
    	srv.running = true
    	return nil
    }
    
    func (srv *Server) startListening() error {
    	listener, err := net.Listen("tcp", srv.ListenAddr)
    	if err != nil {
    		return err
    	}
    	laddr := listener.Addr().(*net.TCPAddr)
    	srv.ListenAddr = laddr.String()
    	srv.listener = listener
    	srv.loopWG.Add(1)
    	go srv.listenLoop()
    	if !laddr.IP.IsLoopback() && srv.NAT != nil {
    		srv.loopWG.Add(1)
    		go func() {
    			nat.Map(srv.NAT, srv.quit, "tcp", laddr.Port, laddr.Port, "ethereum p2p")
    			srv.loopWG.Done()
    		}()
    	}
    	return nil
    }
    
    // Stop terminates the server and all active peer connections.
    // It blocks until all active connections have been closed.
    func (srv *Server) Stop() {
    	srv.lock.Lock()
    	if !srv.running {
    		srv.lock.Unlock()
    		return
    	}
    	srv.running = false
    	srv.lock.Unlock()
    
    	srvlog.Infoln("Stopping Server")
    	srv.ntab.Close()
    	if srv.listener != nil {
    		// this unblocks listener Accept
    		srv.listener.Close()
    	}
    	close(srv.quit)
    	srv.loopWG.Wait()
    
    	// No new peers can be added at this point because dialLoop and
    	// listenLoop are down. It is safe to call peerWG.Wait because
    	// peerWG.Add is not called outside of those loops.
    	for _, peer := range srv.peers {
    		peer.Disconnect(DiscQuitting)
    	}
    	srv.peerWG.Wait()
    }
    
    // main loop for adding connections via listening
    func (srv *Server) listenLoop() {
    	defer srv.loopWG.Done()
    	srvlog.Infoln("Listening on", srv.listener.Addr())
    	for {
    		conn, err := srv.listener.Accept()
    		if err != nil {
    			return
    		}
    		srvlog.Debugf("Accepted conn %v\n", conn.RemoteAddr())
    		srv.peerWG.Add(1)
    		go srv.startPeer(conn, nil)
    	}
    }
    
    func (srv *Server) dialLoop() {
    	defer srv.loopWG.Done()
    	refresh := time.NewTicker(refreshPeersInterval)
    	defer refresh.Stop()
    
    	srv.ntab.Bootstrap(srv.BootstrapNodes)
    	go srv.findPeers()
    
    	dialed := make(chan *discover.Node)
    	dialing := make(map[discover.NodeID]bool)
    
    	// TODO: limit number of active dials
    	// TODO: ensure only one findPeers goroutine is running
    	// TODO: pause findPeers when we're at capacity
    
    	for {
    		select {
    		case <-refresh.C:
    
    			go srv.findPeers()
    
    		case dest := <-srv.peerConnect:
    			// avoid dialing nodes that are already connected.
    			// there is another check for this in addPeer,
    			// which runs after the handshake.
    			srv.lock.Lock()
    			_, isconnected := srv.peers[dest.ID]
    			srv.lock.Unlock()
    			if isconnected || dialing[dest.ID] || dest.ID == srv.Self().ID {
    				continue
    			}
    
    			dialing[dest.ID] = true
    			srv.peerWG.Add(1)
    			go func() {
    				srv.dialNode(dest)
    				// at this point, the peer has been added
    				// or discarded. either way, we're not dialing it anymore.
    				dialed <- dest
    			}()
    
    		case dest := <-dialed:
    			delete(dialing, dest.ID)
    
    		case <-srv.quit:
    			// TODO: maybe wait for active dials
    			return
    		}
    	}
    }
    
    func (srv *Server) dialNode(dest *discover.Node) {
    	addr := &net.TCPAddr{IP: dest.IP, Port: dest.TCPPort}
    	srvlog.Debugf("Dialing %v\n", dest)
    	conn, err := srv.Dialer.Dial("tcp", addr.String())
    	if err != nil {
    		srvlog.DebugDetailf("dial error: %v", err)
    		return
    	}
    	srv.startPeer(conn, dest)
    }
    
    func (srv *Server) Self() *discover.Node {
    	return srv.ntab.Self()
    }
    
    func (srv *Server) findPeers() {
    	far := srv.Self().ID
    	for i := range far {
    		far[i] = ^far[i]
    	}
    	closeToSelf := srv.ntab.Lookup(srv.Self().ID)
    	farFromSelf := srv.ntab.Lookup(far)
    
    	for i := 0; i < len(closeToSelf) || i < len(farFromSelf); i++ {
    		if i < len(closeToSelf) {
    			srv.peerConnect <- closeToSelf[i]
    		}
    		if i < len(farFromSelf) {
    			srv.peerConnect <- farFromSelf[i]
    		}
    	}
    }
    
    func (srv *Server) startPeer(fd net.Conn, dest *discover.Node) {
    	// TODO: handle/store session token
    	fd.SetDeadline(time.Now().Add(handshakeTimeout))
    	conn, err := srv.setupFunc(fd, srv.PrivateKey, srv.ourHandshake, dest)
    	if err != nil {
    		fd.Close()
    		srvlog.Debugf("Handshake with %v failed: %v", fd.RemoteAddr(), err)
    		return
    	}
    
    	conn.MsgReadWriter = &netWrapper{
    		wrapped: conn.MsgReadWriter,
    		conn:    fd, rtimeout: frameReadTimeout, wtimeout: frameWriteTimeout,
    	}
    	p := newPeer(fd, conn, srv.Protocols)
    	if ok, reason := srv.addPeer(conn.ID, p); !ok {
    		srvlog.DebugDetailf("Not adding %v (%v)\n", p, reason)
    		p.politeDisconnect(reason)
    		return
    	}
    
    	srvlog.Debugf("Added %v\n", p)
    	srvjslog.LogJson(&logger.P2PConnected{
    		RemoteId:            fmt.Sprintf("%x", conn.ID[:]),
    		RemoteAddress:       fd.RemoteAddr().String(),
    		RemoteVersionString: conn.Name,
    		NumConnections:      srv.PeerCount(),
    	})
    
    	if srv.newPeerHook != nil {
    		srv.newPeerHook(p)
    	}
    	discreason := p.run()
    	srv.removePeer(p)
    
    	srvlog.Debugf("Removed %v (%v)\n", p, discreason)
    	srvjslog.LogJson(&logger.P2PDisconnected{
    		RemoteId:       fmt.Sprintf("%x", conn.ID[:]),
    		NumConnections: srv.PeerCount(),
    	})
    }
    
    func (srv *Server) addPeer(id discover.NodeID, p *Peer) (bool, DiscReason) {
    	srv.lock.Lock()
    	defer srv.lock.Unlock()
    	switch {
    	case !srv.running:
    		return false, DiscQuitting
    	case len(srv.peers) >= srv.MaxPeers:
    		return false, DiscTooManyPeers
    	case srv.peers[id] != nil:
    		return false, DiscAlreadyConnected
    	case id == srv.Self().ID:
    		return false, DiscSelf
    	}
    	srv.peers[id] = p
    	return true, 0
    }
    
    func (srv *Server) removePeer(p *Peer) {
    	srv.lock.Lock()
    	delete(srv.peers, p.ID())
    	srv.lock.Unlock()
    	srv.peerWG.Done()
    }