From cb8a7d979d1a4b2a32317ee0a77815ec2ea38b9f Mon Sep 17 00:00:00 2001
From: obscuren <geffobscura@gmail.com>
Date: Sun, 2 Feb 2014 16:15:39 +0100
Subject: [PATCH] upnp test

---
 ethereum.go | 107 ++++++++++++++++++++++++++++++++--------------------
 peer.go     |  35 ++++++++++++-----
 2 files changed, 91 insertions(+), 51 deletions(-)

diff --git a/ethereum.go b/ethereum.go
index a7a2f6b8c..eab40e93d 100644
--- a/ethereum.go
+++ b/ethereum.go
@@ -9,6 +9,7 @@ import (
 	"log"
 	"net"
 	"strconv"
+	"sync"
 	"sync/atomic"
 	"time"
 )
@@ -45,9 +46,14 @@ type Ethereum struct {
 	Addr net.Addr
 
 	nat NAT
+
+	peerMut sync.Mutex
+
+	// Capabilities for outgoing peers
+	serverCaps Caps
 }
 
-func New() (*Ethereum, error) {
+func New(caps Caps) (*Ethereum, error) {
 	//db, err := ethdb.NewLDBDatabase()
 	db, err := ethdb.NewMemDatabase()
 	if err != nil {
@@ -56,12 +62,11 @@ func New() (*Ethereum, error) {
 
 	ethutil.Config.Db = db
 
-	/*
-		nat, err := Discover()
-		if err != nil {
-			log.Printf("Can'them discover upnp: %v", err)
-		}
-	*/
+	nat, err := Discover()
+	if err != nil {
+		log.Printf("Can't discover upnp: %v", err)
+	}
+	log.Println(nat)
 
 	nonce, _ := ethutil.RandomUint64()
 	ethereum := &Ethereum{
@@ -69,7 +74,8 @@ func New() (*Ethereum, error) {
 		db:           db,
 		peers:        list.New(),
 		Nonce:        nonce,
-		//nat:          nat,
+		serverCaps:   caps,
+		nat:          nat,
 	}
 	ethereum.TxPool = ethchain.NewTxPool()
 	ethereum.TxPool.Speaker = ethereum
@@ -85,13 +91,8 @@ func (s *Ethereum) AddPeer(conn net.Conn) {
 	peer := NewPeer(conn, s, true)
 
 	if peer != nil {
-		if s.peers.Len() > 25 {
-			log.Println("SEED")
-			peer.Start(true)
-		} else {
-			s.peers.PushBack(peer)
-			peer.Start(false)
-		}
+		s.peers.PushBack(peer)
+		peer.Start(false)
 	}
 }
 
@@ -122,7 +123,7 @@ func (s *Ethereum) ConnectToPeer(addr string) error {
 		return nil
 	}
 
-	peer := NewOutboundPeer(addr, s)
+	peer := NewOutboundPeer(addr, s, s.serverCaps)
 
 	s.peers.PushBack(peer)
 
@@ -158,12 +159,18 @@ func (s *Ethereum) InboundPeers() []*Peer {
 }
 
 func (s *Ethereum) InOutPeers() []*Peer {
+	// Reap the dead peers first
+	s.reapPeers()
+
 	// Create a new peer slice with at least the length of the total peers
 	inboundPeers := make([]*Peer, s.peers.Len())
 	length := 0
 	eachPeer(s.peers, func(p *Peer, e *list.Element) {
-		inboundPeers[length] = p
-		length++
+		// Only return peers with an actual ip
+		if len(p.host) > 0 {
+			inboundPeers[length] = p
+			length++
+		}
 	})
 
 	return inboundPeers[:length]
@@ -171,6 +178,10 @@ func (s *Ethereum) InOutPeers() []*Peer {
 
 func (s *Ethereum) Broadcast(msgType ethwire.MsgType, data []interface{}) {
 	msg := ethwire.NewMessage(msgType, data)
+	s.BroadcastMsg(msg)
+}
+
+func (s *Ethereum) BroadcastMsg(msg *ethwire.Msg) {
 	eachPeer(s.peers, func(p *Peer, e *list.Element) {
 		p.QueueMessage(msg)
 	})
@@ -180,15 +191,25 @@ func (s *Ethereum) Peers() *list.List {
 	return s.peers
 }
 
-func (s *Ethereum) ReapDeadPeers() {
-	for {
-		eachPeer(s.peers, func(p *Peer, e *list.Element) {
-			if atomic.LoadInt32(&p.disconnect) == 1 || (p.inbound && (time.Now().Unix()-p.lastPong) > int64(5*time.Minute)) {
-				s.peers.Remove(e)
-			}
-		})
+func (s *Ethereum) reapPeers() {
+	s.peerMut.Lock()
+	defer s.peerMut.Unlock()
+
+	eachPeer(s.peers, func(p *Peer, e *list.Element) {
+		if atomic.LoadInt32(&p.disconnect) == 1 || (p.inbound && (time.Now().Unix()-p.lastPong) > int64(5*time.Minute)) {
+			s.peers.Remove(e)
+		}
+	})
+}
 
-		time.Sleep(processReapingTimeout * time.Second)
+func (s *Ethereum) ReapDeadPeerHandler() {
+	reapTimer := time.NewTicker(processReapingTimeout * time.Second)
+
+	for {
+		select {
+		case <-reapTimer.C:
+			s.reapPeers()
+		}
 	}
 }
 
@@ -241,29 +262,33 @@ func (s *Ethereum) Start() {
 	} else {
 		s.Addr = ln.Addr()
 		// Starting accepting connections
-		go func() {
-			log.Println("Ready and accepting connections")
-
-			for {
-				conn, err := ln.Accept()
-				if err != nil {
-					log.Println(err)
-
-					continue
-				}
-
-				go s.AddPeer(conn)
-			}
-		}()
+		log.Println("Ready and accepting connections")
+		// Start the peer handler
+		go s.peerHandler(ln)
 	}
 
+	go s.upnpUpdateThread()
+
 	// Start the reaping processes
-	go s.ReapDeadPeers()
+	go s.ReapDeadPeerHandler()
 
 	// Start the tx pool
 	s.TxPool.Start()
 }
 
+func (s *Ethereum) peerHandler(listener net.Listener) {
+	for {
+		conn, err := listener.Accept()
+		if err != nil {
+			log.Println(err)
+
+			continue
+		}
+
+		go s.AddPeer(conn)
+	}
+}
+
 func (s *Ethereum) Stop() {
 	// Close the database
 	defer s.db.Close()
diff --git a/peer.go b/peer.go
index 5d22b545c..a715e205d 100644
--- a/peer.go
+++ b/peer.go
@@ -24,6 +24,8 @@ const (
 	CapDiscoveryTy = 0x01
 	CapTxTy        = 0x02
 	CapChainTy     = 0x04
+
+	CapDefault = CapChainTy | CapTxTy | CapDiscoveryTy
 )
 
 var capsToString = map[Caps]string{
@@ -95,7 +97,7 @@ func NewPeer(conn net.Conn, ethereum *Ethereum, inbound bool) *Peer {
 	}
 }
 
-func NewOutboundPeer(addr string, ethereum *Ethereum) *Peer {
+func NewOutboundPeer(addr string, ethereum *Ethereum, caps Caps) *Peer {
 	p := &Peer{
 		outputQueue: make(chan *ethwire.Msg, outputBufferSize),
 		quit:        make(chan bool),
@@ -103,6 +105,7 @@ func NewOutboundPeer(addr string, ethereum *Ethereum) *Peer {
 		inbound:     false,
 		connected:   0,
 		disconnect:  0,
+		caps:        caps,
 	}
 
 	// Set up the connection in another goroutine so we don't block the main thread
@@ -165,7 +168,8 @@ func (p *Peer) writeMessage(msg *ethwire.Msg) {
 // Outbound message handler. Outbound messages are handled here
 func (p *Peer) HandleOutbound() {
 	// The ping timer. Makes sure that every 2 minutes a ping is send to the peer
-	tickleTimer := time.NewTicker(2 * time.Minute)
+	pingTimer := time.NewTicker(2 * time.Minute)
+	serviceTimer := time.NewTicker(5 * time.Second)
 out:
 	for {
 		select {
@@ -175,11 +179,20 @@ out:
 
 			p.lastSend = time.Now()
 
-		case <-tickleTimer.C:
+		// Ping timer sends a ping to the peer each 2 minutes
+		case <-pingTimer.C:
 			p.writeMessage(ethwire.NewMessage(ethwire.MsgPingTy, ""))
 
-			// Break out of the for loop if a quit message is posted
+		// Service timer takes care of peer broadcasting, transaction
+		// posting or block posting
+		case <-serviceTimer.C:
+			if p.caps&CapDiscoveryTy > 0 {
+				msg := p.peersMessage()
+				p.ethereum.BroadcastMsg(msg)
+			}
+
 		case <-p.quit:
+			// Break out of the for loop if a quit message is posted
 			break out
 		}
 	}
@@ -387,7 +400,7 @@ func (p *Peer) Stop() {
 
 func (p *Peer) pushHandshake() error {
 	msg := ethwire.NewMessage(ethwire.MsgHandshakeTy, []interface{}{
-		uint32(0), uint32(0), "/Ethereum(G) v0.0.1/", CapChainTy | CapTxTy | CapDiscoveryTy, p.port,
+		uint32(0), uint32(0), "/Ethereum(G) v0.0.1/", p.caps, p.port,
 	})
 
 	p.QueueMessage(msg)
@@ -395,18 +408,20 @@ func (p *Peer) pushHandshake() error {
 	return nil
 }
 
-// Pushes the list of outbound peers to the client when requested
-func (p *Peer) pushPeers() {
+func (p *Peer) peersMessage() *ethwire.Msg {
 	outPeers := make([]interface{}, len(p.ethereum.InOutPeers()))
 	// Serialise each peer
 	for i, peer := range p.ethereum.InOutPeers() {
 		outPeers[i] = peer.RlpData()
 	}
 
-	// Send message to the peer with the known list of connected clients
-	msg := ethwire.NewMessage(ethwire.MsgPeersTy, outPeers)
+	// Return the message to the peer with the known list of connected clients
+	return ethwire.NewMessage(ethwire.MsgPeersTy, outPeers)
+}
 
-	p.QueueMessage(msg)
+// Pushes the list of outbound peers to the client when requested
+func (p *Peer) pushPeers() {
+	p.QueueMessage(p.peersMessage())
 }
 
 func (p *Peer) handleHandshake(msg *ethwire.Msg) {
-- 
GitLab