From fbb62c7a35ebf27b4517d558bb2fcefc98a837f8 Mon Sep 17 00:00:00 2001
From: leonardchinonso <36096513+leonardchinonso@users.noreply.github.com>
Date: Thu, 31 Mar 2022 13:40:09 +0100
Subject: [PATCH] Chore/test log subscription (#3786)

* saving

* Implemented and tested subscription of logs

* Fixed lint errors

* fixed compilation error

* Removed print statements across code

* made review changes

* Validated hex addresses
---
 cmd/devnettest/commands/account.go    |  20 +++--
 cmd/devnettest/commands/block.go      |   4 +
 cmd/devnettest/commands/event.go      |  29 ++++++
 cmd/devnettest/commands/parity.go     |   7 +-
 cmd/devnettest/requests/utils.go      |   9 ++
 cmd/devnettest/services/block.go      |  43 +--------
 cmd/devnettest/services/event.go      | 124 ++++++++++++++++++++++++++
 cmd/rpcdaemon/commands/eth_filters.go |   2 +-
 8 files changed, 187 insertions(+), 51 deletions(-)
 create mode 100644 cmd/devnettest/commands/event.go
 create mode 100644 cmd/devnettest/services/event.go

diff --git a/cmd/devnettest/commands/account.go b/cmd/devnettest/commands/account.go
index a6d32a01cb..4dc1ebb771 100644
--- a/cmd/devnettest/commands/account.go
+++ b/cmd/devnettest/commands/account.go
@@ -10,17 +10,17 @@ import (
 )
 
 var (
-	addr     string
-	blockNum string
+	accountAddr string
+	blockNum    string
 )
 
 func init() {
-	getBalanceCmd.Flags().StringVar(&addr, "addr", "", "String address to check")
+	getBalanceCmd.Flags().StringVar(&accountAddr, "addr", "", "String address to check")
 	getBalanceCmd.MarkFlagRequired("addr")
 	getBalanceCmd.Flags().StringVar(&blockNum, "block-num", "latest", "String denoting block number")
 	rootCmd.AddCommand(getBalanceCmd)
 
-	getTransactionCountCmd.Flags().StringVar(&addr, "addr", "", "String address to check")
+	getTransactionCountCmd.Flags().StringVar(&accountAddr, "addr", "", "String address to check")
 	getTransactionCountCmd.MarkFlagRequired("addr")
 	getTransactionCountCmd.Flags().StringVar(&blockNum, "block-num", "latest", "String denoting block number")
 	rootCmd.AddCommand(getTransactionCountCmd)
@@ -41,7 +41,11 @@ var getBalanceCmd = &cobra.Command{
 		if clearDev {
 			defer services.ClearDevDB()
 		}
-		address := common.HexToAddress(addr)
+		if !common.IsHexAddress(accountAddr) {
+			fmt.Printf("address: %v, is not a valid hex address\n", accountAddr)
+			return
+		}
+		address := common.HexToAddress(accountAddr)
 		if err := requests.GetBalance(reqId, address, blockNum); err != nil {
 			fmt.Printf("could not get balance: %v\n", err)
 		}
@@ -63,7 +67,11 @@ var getTransactionCountCmd = &cobra.Command{
 		if clearDev {
 			defer services.ClearDevDB()
 		}
-		address := common.HexToAddress(addr)
+		if !common.IsHexAddress(accountAddr) {
+			fmt.Printf("address: %v, is not a valid hex address\n", accountAddr)
+			return
+		}
+		address := common.HexToAddress(accountAddr)
 		if err := requests.GetTransactionCountCmd(reqId, address, blockNum); err != nil {
 			fmt.Printf("could not get transaction count: %v\n", err)
 		}
diff --git a/cmd/devnettest/commands/block.go b/cmd/devnettest/commands/block.go
index 34dbc3633d..8aab81ec14 100644
--- a/cmd/devnettest/commands/block.go
+++ b/cmd/devnettest/commands/block.go
@@ -3,6 +3,7 @@ package commands
 import (
 	"context"
 	"fmt"
+	"github.com/ledgerwatch/erigon/common"
 
 	"github.com/ledgerwatch/erigon/cmd/devnettest/requests"
 	"github.com/ledgerwatch/erigon/cmd/devnettest/services"
@@ -41,6 +42,9 @@ var sendTxCmd = &cobra.Command{
 			if sendAddr == "" {
 				return fmt.Errorf("string address to send to must be present")
 			}
+			if !common.IsHexAddress(sendAddr) {
+				return fmt.Errorf("address: %v, is not a valid hex address\n", sendAddr)
+			}
 		}
 		return nil
 	},
diff --git a/cmd/devnettest/commands/event.go b/cmd/devnettest/commands/event.go
new file mode 100644
index 0000000000..58a3e36c5c
--- /dev/null
+++ b/cmd/devnettest/commands/event.go
@@ -0,0 +1,29 @@
+package commands
+
+import (
+	"fmt"
+
+	"github.com/ledgerwatch/erigon/cmd/devnettest/services"
+	"github.com/spf13/cobra"
+)
+
+var (
+	eventAddr   []string
+	eventTopics []string
+)
+
+func init() {
+	LogsCmd.Flags().StringSliceVar(&eventAddr, "addr", []string{}, "an address or a list of addresses separated by commas")
+	LogsCmd.Flags().StringSliceVar(&eventTopics, "topics", []string{}, "a topic or a list of topics separated by commas")
+	rootCmd.AddCommand(LogsCmd)
+}
+
+var LogsCmd = &cobra.Command{
+	Use:   "logs",
+	Short: "Subscribes to log event sends a notification each time a new log appears",
+	Run: func(cmd *cobra.Command, args []string) {
+		if err := services.Logs(eventAddr, eventTopics); err != nil {
+			fmt.Printf("could not subscribe to log events: %v\n", err)
+		}
+	},
+}
diff --git a/cmd/devnettest/commands/parity.go b/cmd/devnettest/commands/parity.go
index 3d77dd5e07..2ef80b3938 100644
--- a/cmd/devnettest/commands/parity.go
+++ b/cmd/devnettest/commands/parity.go
@@ -16,7 +16,7 @@ var (
 )
 
 func init() {
-	listStorageKeysCmd.Flags().StringVar(&addr, "addr", "", "String address to list keys")
+	listStorageKeysCmd.Flags().StringVar(&accountAddr, "addr", "", "String address to list keys")
 	listStorageKeysCmd.MarkFlagRequired("addr")
 	listStorageKeysCmd.Flags().StringVar(&offsetAddr, "offset", "", "Offset storage key from which the batch should start")
 	listStorageKeysCmd.Flags().IntVar(&quantity, "quantity", 10, "Integer number of addresses to display in a batch")
@@ -32,7 +32,10 @@ var listStorageKeysCmd = &cobra.Command{
 		if clearDev {
 			defer services.ClearDevDB()
 		}
-		toAddress := common.HexToAddress(addr)
+		if !common.IsHexAddress(accountAddr) {
+			return fmt.Errorf("address: %v, is not a valid hex address\n", accountAddr)
+		}
+		toAddress := common.HexToAddress(accountAddr)
 		offset := common.Hex2Bytes(strings.TrimSuffix(offsetAddr, "0x"))
 		if err := requests.ParityList(reqId, toAddress, quantity, offset, blockNum); err != nil {
 			fmt.Printf("error getting parity list: %v\n", err)
diff --git a/cmd/devnettest/requests/utils.go b/cmd/devnettest/requests/utils.go
index ef49310a00..aa32cef3d8 100644
--- a/cmd/devnettest/requests/utils.go
+++ b/cmd/devnettest/requests/utils.go
@@ -25,3 +25,12 @@ func parseResponse(resp interface{}) (string, error) {
 
 	return string(result), nil
 }
+
+// NamespaceAndSubMethodFromMethod splits a parent method into namespace and the actual method
+func NamespaceAndSubMethodFromMethod(method string) (string, string, error) {
+	parts := strings.SplitN(method, "_", 2)
+	if len(parts) != 2 {
+		return "", "", fmt.Errorf("invalid string to split")
+	}
+	return parts[0], parts[1], nil
+}
diff --git a/cmd/devnettest/services/block.go b/cmd/devnettest/services/block.go
index 0ea4246244..4086b8c020 100644
--- a/cmd/devnettest/services/block.go
+++ b/cmd/devnettest/services/block.go
@@ -4,7 +4,6 @@ import (
 	"context"
 	"fmt"
 	"math/big"
-	"strings"
 	"time"
 
 	"github.com/holiman/uint256"
@@ -110,7 +109,7 @@ func SearchBlockForTx(txnHash common.Hash) (uint64, error) {
 	fmt.Println()
 	fmt.Println("Connected to web socket successfully")
 
-	blockN, err := subscribe(client, "eth_newHeads", txnHash)
+	blockN, err := subscribeToNewHeads(client, "eth_newHeads", txnHash)
 	if err != nil {
 		return 0, fmt.Errorf("failed to subscribe to ws: %v", err)
 	}
@@ -118,46 +117,6 @@ func SearchBlockForTx(txnHash common.Hash) (uint64, error) {
 	return blockN, nil
 }
 
-// subscribe makes a ws subscription using eth_newHeads
-func subscribe(client *rpc.Client, method string, hash common.Hash) (uint64, error) {
-	parts := strings.SplitN(method, "_", 2)
-	namespace := parts[0]
-	method = parts[1]
-	ch := make(chan interface{})
-	sub, err := client.Subscribe(context.Background(), namespace, ch, []interface{}{method}...)
-	if err != nil {
-		return uint64(0), fmt.Errorf("client failed to subscribe: %v", err)
-	}
-	defer sub.Unsubscribe()
-
-	var (
-		blockCount int
-		blockN     uint64
-	)
-ForLoop:
-	for {
-		select {
-		case v := <-ch:
-			blockCount++
-			blockNumber := v.(map[string]interface{})["number"]
-			fmt.Printf("Searching for the transaction in block with number: %+v, type: %[1]T\n", blockNumber.(string))
-			num, foundTx, err := blockHasHash(client, hash, blockNumber.(string))
-			if err != nil {
-				return uint64(0), fmt.Errorf("could not verify if current block contains the tx hash: %v", err)
-			}
-			if foundTx || blockCount == 128 {
-				blockN = num
-				break ForLoop
-			}
-		case err := <-sub.Err():
-			return uint64(0), fmt.Errorf("subscription error from client: %v", err)
-		}
-	}
-
-	return blockN, nil
-
-}
-
 // blockHasHash checks if the current block has the transaction hash in its list of transactions
 func blockHasHash(client *rpc.Client, hash common.Hash, blockNumber string) (uint64, bool, error) {
 	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
diff --git a/cmd/devnettest/services/event.go b/cmd/devnettest/services/event.go
new file mode 100644
index 0000000000..3aa0f906c0
--- /dev/null
+++ b/cmd/devnettest/services/event.go
@@ -0,0 +1,124 @@
+package services
+
+import (
+	"context"
+	"fmt"
+	"github.com/ledgerwatch/erigon/cmd/devnettest/requests"
+	"github.com/ledgerwatch/erigon/common"
+	"github.com/ledgerwatch/erigon/rpc"
+)
+
+const numberOfIterations = 128
+
+// subscribe connects to a websocket and returns the subscription handler and a channel buffer
+func subscribe(client *rpc.Client, method string, args ...interface{}) (*rpc.ClientSubscription, chan interface{}, error) {
+	var (
+		namespace string
+		subMethod string
+		splitErr  error
+	)
+
+	namespace, subMethod, splitErr = requests.NamespaceAndSubMethodFromMethod(method)
+	if splitErr != nil {
+		return nil, nil, fmt.Errorf("cannot get namespace and method from method: %v", splitErr)
+	}
+
+	ch := make(chan interface{})
+	var arr = []interface{}{subMethod}
+	arr = append(arr, args...)
+
+	sub, err := client.Subscribe(context.Background(), namespace, ch, arr...)
+	if err != nil {
+		return nil, nil, fmt.Errorf("client failed to subscribe: %v", err)
+	}
+
+	return sub, ch, nil
+}
+
+// subscribeToNewHeads makes a ws subscription for eth_newHeads
+func subscribeToNewHeads(client *rpc.Client, method string, hash common.Hash) (uint64, error) {
+	sub, ch, err := subscribe(client, method)
+	if err != nil {
+		return uint64(0), fmt.Errorf("error subscribing to newHeads: %v", err)
+	}
+	defer sub.Unsubscribe()
+
+	var (
+		blockCount int
+		blockN     uint64
+	)
+ForLoop:
+	for {
+		select {
+		case v := <-ch:
+			blockCount++
+			blockNumber := v.(map[string]interface{})["number"]
+			fmt.Printf("Searching for the transaction in block with number: %+v\n", blockNumber.(string))
+			num, foundTx, err := blockHasHash(client, hash, blockNumber.(string))
+			if err != nil {
+				return uint64(0), fmt.Errorf("could not verify if current block contains the tx hash: %v", err)
+			}
+			if foundTx || blockCount == numberOfIterations {
+				blockN = num
+				break ForLoop
+			}
+		case err := <-sub.Err():
+			return uint64(0), fmt.Errorf("subscription error from client: %v", err)
+		}
+	}
+
+	return blockN, nil
+}
+
+// Logs dials a websocket connection and listens for log events by calling subscribeToLogs
+func Logs(addresses, topics []string) error {
+	client, clientErr := rpc.DialWebsocket(context.Background(), "ws://127.0.0.1:8545", "")
+	if clientErr != nil {
+		return fmt.Errorf("failed to dial websocket: %v", clientErr)
+	}
+	fmt.Println()
+	fmt.Println("Connected to web socket successfully")
+
+	if err := subscribeToLogs(client, "eth_logs", addresses, topics); err != nil {
+		return fmt.Errorf("failed to subscribe to logs: %v", err)
+	}
+
+	return nil
+}
+
+// subscribeToLogs makes a ws subscription for eth_subscribeLogs
+func subscribeToLogs(client *rpc.Client, method string, addresses []string, topics []string) error {
+	params := map[string][]string{
+		"address": addresses,
+		"topics":  topics,
+	}
+
+	sub, ch, err := subscribe(client, method, params)
+	if err != nil {
+		return fmt.Errorf("error subscribing to logs: %v", err)
+	}
+	defer sub.Unsubscribe()
+
+	var count int
+
+ForLoop:
+	for {
+		select {
+		case v := <-ch:
+			count++
+			_map := v.(map[string]interface{})
+			for k, val := range _map {
+				fmt.Printf("%s: %+v, ", k, val)
+			}
+			fmt.Println()
+			fmt.Println()
+			if count == numberOfIterations {
+				break ForLoop
+			}
+		case err := <-sub.Err():
+			return fmt.Errorf("subscription error from client: %v", err)
+		}
+	}
+
+	return nil
+}
diff --git a/cmd/rpcdaemon/commands/eth_filters.go b/cmd/rpcdaemon/commands/eth_filters.go
index 85db9e123e..0d92d5b9ff 100644
--- a/cmd/rpcdaemon/commands/eth_filters.go
+++ b/cmd/rpcdaemon/commands/eth_filters.go
@@ -112,7 +112,7 @@ func (api *APIImpl) NewPendingTransactions(ctx context.Context) (*rpc.Subscripti
 	return rpcSub, nil
 }
 
-// SubscribeLogs send a notification each time a new log appears.
+// Logs send a notification each time a new log appears.
 func (api *APIImpl) Logs(ctx context.Context, crit filters.FilterCriteria) (*rpc.Subscription, error) {
 	if api.filters == nil {
 		return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
-- 
GitLab