From f8eb8fe64c1fa4ba85fe3e6af1f8bcdf6c936b04 Mon Sep 17 00:00:00 2001
From: Anton Evangelatov <anton.evangelatov@gmail.com>
Date: Fri, 26 Apr 2019 16:41:12 +0200
Subject: [PATCH] cmd/swarm-smoke: check whether chunks are at the most proximate host

swarm/network: measure how many chunks a node delivers (#1358)
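
The "most proximate" host for a chunk is the node whose overlay address
shares the longest bit prefix with the chunk address, as measured by
swarm/chunk.Proximity. For context, here is a minimal, self-contained
sketch of that proximity-order computation (simplified: it ignores the
MaxPO cap the real implementation applies, and the two-byte addresses
are made-up test values; real swarm addresses are 32 bytes):

    package main

    import "fmt"

    // proximity returns the proximity order of a and b: the number of
    // leading bits the two addresses share, i.e. the number of leading
    // zero bits of their XOR distance.
    func proximity(a, b []byte) int {
        for i := range a {
            x := a[i] ^ b[i]
            if x == 0 {
                continue // all 8 bits of this byte match
            }
            for j := 0; j < 8; j++ {
                if x&(0x80>>uint(j)) != 0 {
                    return i*8 + j // index of the first differing bit
                }
            }
        }
        return len(a) * 8 // identical addresses
    }

    func main() {
        chunkAddr := []byte{0xAA, 0x00} // 10101010 00000000
        nodeA := []byte{0xA8, 0x00}     // shares 6 leading bits with chunkAddr
        nodeB := []byte{0x2A, 0x00}     // shares 0 leading bits with chunkAddr
        fmt.Println(proximity(chunkAddr, nodeA)) // 6
        fmt.Println(proximity(chunkAddr, nodeB)) // 0
    }

checkChunksVsMostProxHosts below computes this proximity for every
(chunk, host) pair and flags any chunk that is missing from its most
proximate host.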
---
 cmd/swarm/swarm-smoke/upload_and_sync.go | 103 ++++++++++++++++++++++-
 swarm/network/stream/peer.go             |   5 +-
 2 files changed, 101 insertions(+), 7 deletions(-)

diff --git a/cmd/swarm/swarm-smoke/upload_and_sync.go b/cmd/swarm/swarm-smoke/upload_and_sync.go
index 6a434a0b2..bbcf66b26 100644
--- a/cmd/swarm/swarm-smoke/upload_and_sync.go
+++ b/cmd/swarm/swarm-smoke/upload_and_sync.go
@@ -19,10 +19,12 @@ package main
 import (
 	"bytes"
 	"context"
+	"encoding/hex"
 	"fmt"
 	"io/ioutil"
 	"math/rand"
 	"os"
+	"strings"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -30,6 +32,7 @@ import (
 	"github.com/ethereum/go-ethereum/log"
 	"github.com/ethereum/go-ethereum/metrics"
 	"github.com/ethereum/go-ethereum/rpc"
+	"github.com/ethereum/go-ethereum/swarm/chunk"
 	"github.com/ethereum/go-ethereum/swarm/storage"
 	"github.com/ethereum/go-ethereum/swarm/testutil"
 
@@ -88,6 +91,10 @@ func trackChunks(testData []byte, submitMetrics bool) error {
 	var wg sync.WaitGroup
 	wg.Add(len(hosts))
 
+	var mu sync.Mutex                    // mutex protecting the allHostChunks and bzzAddrs maps
+	allHostChunks := map[string]string{} // host->bit vector of chunk presence
+	bzzAddrs := map[string]string{}      // host->bzzAddr
+
 	for _, host := range hosts {
 		host := host
 		go func() {
@@ -96,6 +103,7 @@ func trackChunks(testData []byte, submitMetrics bool) error {
 
 			ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
 			defer cancel()
+
 			rpcClient, err := rpc.DialContext(ctx, httpHost)
 			if rpcClient != nil {
 				defer rpcClient.Close()
@@ -106,14 +114,25 @@ func trackChunks(testData []byte, submitMetrics bool) error {
 				return
 			}
 
-			var hostChunks string
-			err = rpcClient.Call(&hostChunks, "bzz_has", addrs)
+			hostChunks, err := getChunksBitVectorFromHost(rpcClient, addrs)
+			if err != nil {
+				log.Error("error getting chunks bit vector from host", "err", err, "host", httpHost)
+				hasErr = true
+				return
+			}
+
+			bzzAddr, err := getBzzAddrFromHost(rpcClient)
 			if err != nil {
-				log.Error("error calling rpc client", "err", err, "host", httpHost)
+				log.Error("error getting bzz addrs from host", "err", err, "host", httpHost)
 				hasErr = true
 				return
 			}
 
+			mu.Lock()
+			allHostChunks[host] = hostChunks
+			bzzAddrs[host] = bzzAddr
+			mu.Unlock()
+
 			yes, no := 0, 0
 			for _, val := range hostChunks {
 				if val == '1' {
@@ -140,6 +159,8 @@ func trackChunks(testData []byte, submitMetrics bool) error {
 
 	wg.Wait()
 
+	checkChunksVsMostProxHosts(addrs, allHostChunks, bzzAddrs)
+
 	if !hasErr && submitMetrics {
 		// remove the chunks stored on the uploader node
 		globalYes -= len(addrs)
@@ -152,6 +173,82 @@ func trackChunks(testData []byte, submitMetrics bool) error {
 	return nil
 }
 
+// getChunksBitVectorFromHost returns a bit vector of presence for a given slice of chunks from a given host
+func getChunksBitVectorFromHost(client *rpc.Client, addrs []storage.Address) (string, error) {
+	var hostChunks string
+
+	err := client.Call(&hostChunks, "bzz_has", addrs)
+	if err != nil {
+		return "", err
+	}
+
+	return hostChunks, nil
+}
+
+// getBzzAddrFromHost returns the bzzAddr for a given host
+func getBzzAddrFromHost(client *rpc.Client) (string, error) {
+	var hive string
+
+	err := client.Call(&hive, "bzz_hive")
+	if err != nil {
+		return "", err
+	}
+
+	// we make an ugly assumption about the output format of the hive.String() method;
+	// ideally we should replace this with an API call that returns the bzz addr for a given host,
+	// but this also works for now (provided we don't change the hive.String() method, which we haven't in some time)
+	return strings.Split(strings.Split(hive, "\n")[3], " ")[10], nil
+}
+
+// checkChunksVsMostProxHosts checks:
+// 1. whether a chunk has been found at fewer than two hosts; given our nearest-neighbour set size, this should not happen
+// 2. whether a chunk is missing from its most proximate host; this should not happen either
+// Together with the --only-upload flag, this smoke test can verify that our syncing
+// functionality is correct, without even trying to retrieve the content.
+//
+// addrs - a slice with all uploaded chunk refs
+// allHostChunks - host->bit vector, showing what chunks are present on what hosts
+// bzzAddrs - host->bzz address, used when determining the most proximate host for a given chunk
+func checkChunksVsMostProxHosts(addrs []storage.Address, allHostChunks map[string]string, bzzAddrs map[string]string) {
+	for k, v := range bzzAddrs {
+		log.Trace("bzzAddr", "bzz", v, "host", k)
+	}
+
+	for i := range addrs {
+		var foundAt int
+		maxProx := -1
+		var maxProxHost string
+		for host := range allHostChunks {
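+			// the bzz_has bit vector is index-aligned with addrs: position i reports chunk addrs[i]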
+			if allHostChunks[host][i] == '1' {
+				foundAt++
+			}
+
+			ba, err := hex.DecodeString(bzzAddrs[host])
+			if err != nil {
+				panic(err)
+			}
+
+			// track the most proximate host for this chunk
+			prox := chunk.Proximity(addrs[i], ba)
+			if prox > maxProx {
+				maxProx = prox
+				maxProxHost = host
+			}
+		}
+
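+		// check 2: the chunk must be present at its most proximate host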
+		if allHostChunks[maxProxHost][i] == '0' {
+			log.Error("chunk not found at max prox host", "ref", addrs[i], "host", maxProxHost, "bzzAddr", bzzAddrs[maxProxHost])
+		} else {
+			log.Trace("chunk present at max prox host", "ref", addrs[i], "host", maxProxHost, "bzzAddr", bzzAddrs[maxProxHost])
+		}
+
+		// check 1: a chunk should never be found at fewer than two hosts
+		if foundAt < 2 {
+			log.Error("chunk found at fewer than two hosts", "foundAt", foundAt, "ref", addrs[i])
+		}
+	}
+}
+
 func getAllRefs(testData []byte) (storage.AddressCollection, error) {
 	datadir, err := ioutil.TempDir("", "chunk-debug")
 	if err != nil {
diff --git a/swarm/network/stream/peer.go b/swarm/network/stream/peer.go
index 98b237ce2..2514dcad4 100644
--- a/swarm/network/stream/peer.go
+++ b/swarm/network/stream/peer.go
@@ -134,7 +134,7 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer {
 func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8, syncing bool) error {
 	var msg interface{}
 
-	spanName := "send.chunk.delivery"
+	metrics.GetOrRegisterCounter("peer.deliver", nil).Inc(1)
 
 	//we send different types of messages if delivery is for syncing or retrievals,
 	//even if handling and content of the message are the same,
@@ -144,16 +144,13 @@ func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8,
 			Addr:  chunk.Address(),
 			SData: chunk.Data(),
 		}
-		spanName += ".syncing"
 	} else {
 		msg = &ChunkDeliveryMsgRetrieval{
 			Addr:  chunk.Address(),
 			SData: chunk.Data(),
 		}
-		spanName += ".retrieval"
 	}
 
-	ctx = context.WithValue(ctx, "stream_send_tag", nil)
 	return p.SendPriority(ctx, msg, priority)
 }
 
-- 
GitLab