diff --git a/Makefile b/Makefile index 8eb512b048a628dcf06a41c6dd39648997679528..0bfc8a6e2a9da30f2137b8657c2ad48d6f885116 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ GO = go GOBIN = $(CURDIR)/build/bin -GOTEST = GODEBUG=cgocheck=0 $(GO) test -tags nosqlite -trimpath ./... -p 2 +GOTEST = GODEBUG=cgocheck=0 $(GO) test -tags nosqlite,noboltdb -trimpath ./... -p 2 GIT_COMMIT ?= $(shell git rev-list -1 HEAD) GIT_BRANCH ?= $(shell git rev-parse --abbrev-ref HEAD) @@ -11,8 +11,8 @@ CGO_CFLAGS += -DMDBX_FORCE_ASSERTIONS=1 # Enable MDBX's asserts by default in 'd CGO_CFLAGS := CGO_CFLAGS="$(CGO_CFLAGS)" DBG_CGO_CFLAGS += -DMDBX_DEBUG=1 -GOBUILD = $(CGO_CFLAGS) $(GO) build -tags nosqlite -trimpath -ldflags "-X github.com/ledgerwatch/erigon/params.GitCommit=${GIT_COMMIT} -X github.com/ledgerwatch/erigon/params.GitBranch=${GIT_BRANCH} -X github.com/ledgerwatch/erigon/params.GitTag=${GIT_TAG}" -GO_DBG_BUILD = $(DBG_CGO_CFLAGS) $(GO) build -tags nosqlite -trimpath -tags=debug -ldflags "-X github.com/ledgerwatch/erigon/params.GitCommit=${GIT_COMMIT} -X github.com/ledgerwatch/erigon/params.GitBranch=${GIT_BRANCH} -X github.com/ledgerwatch/erigon/params.GitTag=${GIT_TAG}" -gcflags=all="-N -l" # see delve docs +GOBUILD = $(CGO_CFLAGS) $(GO) build -tags nosqlite,noboltdb -trimpath -ldflags "-X github.com/ledgerwatch/erigon/params.GitCommit=${GIT_COMMIT} -X github.com/ledgerwatch/erigon/params.GitBranch=${GIT_BRANCH} -X github.com/ledgerwatch/erigon/params.GitTag=${GIT_TAG}" +GO_DBG_BUILD = $(DBG_CGO_CFLAGS) $(GO) build -tags nosqlite,noboltdb -trimpath -tags=debug -ldflags "-X github.com/ledgerwatch/erigon/params.GitCommit=${GIT_COMMIT} -X github.com/ledgerwatch/erigon/params.GitBranch=${GIT_BRANCH} -X github.com/ledgerwatch/erigon/params.GitTag=${GIT_TAG}" -gcflags=all="-N -l" # see delve docs GO_MAJOR_VERSION = $(shell $(GO) version | cut -c 14- | cut -d' ' -f1 | cut -d'.' -f1) GO_MINOR_VERSION = $(shell $(GO) version | cut -c 14- | cut -d' ' -f1 | cut -d'.' -f2) diff --git a/cmd/downloader/downloader/downloader.go b/cmd/downloader/downloader/downloader.go index 72adb3c018c7b5d1d249086d46409092132ae530..7451a68a2f40e06fa7f3a40ba5df8f04e4a9e69e 100644 --- a/cmd/downloader/downloader/downloader.go +++ b/cmd/downloader/downloader/downloader.go @@ -3,7 +3,6 @@ package downloader import ( "context" "fmt" - "path/filepath" "runtime" "time" @@ -12,9 +11,8 @@ import ( common2 "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/common/dir" "github.com/ledgerwatch/erigon-lib/kv" - "github.com/ledgerwatch/erigon-lib/kv/mdbx" + "github.com/ledgerwatch/erigon/cmd/downloader/downloader/torrentcfg" "github.com/ledgerwatch/log/v3" - mdbx2 "github.com/torquem-ch/mdbx-go/mdbx" ) const ASSERT = false @@ -22,36 +20,29 @@ const ASSERT = false type Protocols struct { TorrentClient *torrent.Client DB kv.RwDB + cfg *torrentcfg.Cfg } -func New(cfg *torrent.ClientConfig, snapshotDir *dir.Rw) (*Protocols, error) { - db, err := mdbx.NewMDBX(log.New()). - Flags(func(f uint) uint { return f | mdbx2.WriteMap | mdbx2.SafeNoSync }). - SyncPeriod(15 * time.Second). - Path(filepath.Join(snapshotDir.Path, "db")). - Open() - if err != nil { - return nil, err - } - - peerID, err := readPeerID(db) +func New(cfg *torrentcfg.Cfg, snapshotDir *dir.Rw) (*Protocols, error) { + peerID, err := readPeerID(cfg.DB) if err != nil { return nil, fmt.Errorf("get peer id: %w", err) } cfg.PeerID = string(peerID) - torrentClient, err := torrent.NewClient(cfg) + torrentClient, err := torrent.NewClient(cfg.ClientConfig) if err != nil { return nil, fmt.Errorf("fail to start torrent client: %w", err) } if len(peerID) == 0 { - if err = savePeerID(db, torrentClient.PeerID()); err != nil { + if err = savePeerID(cfg.DB, torrentClient.PeerID()); err != nil { return nil, fmt.Errorf("save peer id: %w", err) } } return &Protocols{ + cfg: cfg, TorrentClient: torrentClient, - DB: db, + DB: cfg.DB, }, nil } @@ -81,6 +72,9 @@ func (cli *Protocols) Close() { } cli.TorrentClient.Close() cli.DB.Close() + if cli.cfg.CompletionCloser != nil { + cli.cfg.CompletionCloser.Close() //nolint + } } func (cli *Protocols) PeerID() []byte { diff --git a/cmd/downloader/downloader/torrentcfg/mdbx_piece_completion.go b/cmd/downloader/downloader/torrentcfg/mdbx_piece_completion.go new file mode 100644 index 0000000000000000000000000000000000000000..8af9f11c913322582dc470c40d2249c884da7178 --- /dev/null +++ b/cmd/downloader/downloader/torrentcfg/mdbx_piece_completion.go @@ -0,0 +1,73 @@ +package torrentcfg + +import ( + "context" + "encoding/binary" + + "github.com/anacrolix/torrent/metainfo" + "github.com/anacrolix/torrent/storage" + "github.com/ledgerwatch/erigon-lib/kv" +) + +const ( + complete = "c" + incomplete = "i" +) + +type mdbxPieceCompletion struct { + db kv.RwDB +} + +var _ storage.PieceCompletion = (*mdbxPieceCompletion)(nil) + +func NewMdbxPieceCompletion(db kv.RwDB) (ret storage.PieceCompletion, err error) { + ret = &mdbxPieceCompletion{db} + return +} + +func (me mdbxPieceCompletion) Get(pk metainfo.PieceKey) (cn storage.Completion, err error) { + err = me.db.View(context.Background(), func(tx kv.Tx) error { + var key [4]byte + binary.BigEndian.PutUint32(key[:], uint32(pk.Index)) + cn.Ok = true + v, err := tx.GetOne(kv.BittorrentCompletion, append(pk.InfoHash[:], key[:]...)) + if err != nil { + return err + } + switch string(v) { + case complete: + cn.Complete = true + case incomplete: + cn.Complete = false + default: + cn.Ok = false + } + return nil + }) + return +} + +func (me mdbxPieceCompletion) Set(pk metainfo.PieceKey, b bool) error { + if c, err := me.Get(pk); err == nil && c.Ok && c.Complete == b { + return nil + } + return me.db.Update(context.Background(), func(tx kv.RwTx) error { + var key [4]byte + binary.BigEndian.PutUint32(key[:], uint32(pk.Index)) + + v := []byte(incomplete) + if b { + v = []byte(complete) + } + err := tx.Put(kv.BittorrentCompletion, append(pk.InfoHash[:], key[:]...), v) + if err != nil { + return err + } + return nil + }) +} + +func (me *mdbxPieceCompletion) Close() error { + me.db.Close() + return nil +} diff --git a/cmd/downloader/downloader/torrentcfg/mdbx_piece_completion_test.go b/cmd/downloader/downloader/torrentcfg/mdbx_piece_completion_test.go new file mode 100644 index 0000000000000000000000000000000000000000..d66bc4d2951f0cac6dcbb477f7399c0c0b680335 --- /dev/null +++ b/cmd/downloader/downloader/torrentcfg/mdbx_piece_completion_test.go @@ -0,0 +1,37 @@ +package torrentcfg + +import ( + "testing" + + "github.com/ledgerwatch/erigon-lib/kv/memdb" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/anacrolix/torrent/metainfo" + "github.com/anacrolix/torrent/storage" +) + +func TestMdbxPieceCompletion(t *testing.T) { + db := memdb.NewTestDownloaderDB(t) + pc, err := NewMdbxPieceCompletion(db) + require.NoError(t, err) + defer pc.Close() + + pk := metainfo.PieceKey{} + + b, err := pc.Get(pk) + require.NoError(t, err) + assert.False(t, b.Ok) + + require.NoError(t, pc.Set(pk, false)) + + b, err = pc.Get(pk) + require.NoError(t, err) + assert.Equal(t, storage.Completion{Complete: false, Ok: true}, b) + + require.NoError(t, pc.Set(pk, true)) + + b, err = pc.Get(pk) + require.NoError(t, err) + assert.Equal(t, storage.Completion{Complete: true, Ok: true}, b) +} diff --git a/cmd/downloader/downloader/torrentcfg/torrentcfg.go b/cmd/downloader/downloader/torrentcfg/torrentcfg.go index 676eb1d468d9c78d6fac9d2703cba4477b169a40..fa493df3dc4bfa431d8c65ec30cc0201553057f7 100644 --- a/cmd/downloader/downloader/torrentcfg/torrentcfg.go +++ b/cmd/downloader/downloader/torrentcfg/torrentcfg.go @@ -10,6 +10,7 @@ import ( "github.com/anacrolix/torrent/storage" "github.com/c2h5oh/datasize" "github.com/ledgerwatch/erigon-lib/common/dir" + "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon/p2p/nat" "github.com/ledgerwatch/log/v3" "golang.org/x/time/rate" @@ -20,6 +21,12 @@ import ( // see https://wiki.theory.org/BitTorrentSpecification#Metainfo_File_Structure const DefaultPieceSize = 2 * 1024 * 1024 +type Cfg struct { + *torrent.ClientConfig + DB kv.RwDB + CompletionCloser io.Closer +} + func Default() *torrent.ClientConfig { torrentConfig := torrent.NewDefaultClientConfig() @@ -37,14 +44,14 @@ func Default() *torrent.ClientConfig { return torrentConfig } -func New(snapshotsDir *dir.Rw, verbosity lg.Level, natif nat.Interface, downloadRate, uploadRate datasize.ByteSize, port, maxPeers, connsPerFile int) (*torrent.ClientConfig, io.Closer, error) { +func New(snapshotsDir *dir.Rw, verbosity lg.Level, natif nat.Interface, downloadRate, uploadRate datasize.ByteSize, port, maxPeers, connsPerFile int, db kv.RwDB) (*Cfg, error) { torrentConfig := Default() // We would-like to reduce amount of goroutines in Erigon, so reducing next params torrentConfig.EstablishedConnsPerTorrent = connsPerFile // default: 50 torrentConfig.TorrentPeersHighWater = maxPeers // default: 500 torrentConfig.TorrentPeersLowWater = 5 // default: 50 torrentConfig.HalfOpenConnsPerTorrent = 5 // default: 25 - torrentConfig.TotalHalfOpenConns = 10 // default: 100 + torrentConfig.TotalHalfOpenConns = 100 // default: 100 torrentConfig.ListenPort = port torrentConfig.Seed = true @@ -71,8 +78,8 @@ func New(snapshotsDir *dir.Rw, verbosity lg.Level, natif nat.Interface, download } } // rates are divided by 2 - I don't know why it works, maybe bug inside torrent lib accounting - torrentConfig.UploadRateLimiter = rate.NewLimiter(rate.Limit(uploadRate.Bytes()/2), 2*DefaultPieceSize) // default: unlimited - torrentConfig.DownloadRateLimiter = rate.NewLimiter(rate.Limit(downloadRate.Bytes()/2), 2*DefaultPieceSize) // default: unlimited + torrentConfig.UploadRateLimiter = rate.NewLimiter(rate.Limit(uploadRate.Bytes()), 2*DefaultPieceSize) // default: unlimited + torrentConfig.DownloadRateLimiter = rate.NewLimiter(rate.Limit(downloadRate.Bytes()), 2*DefaultPieceSize) // default: unlimited // debug if lg.Debug == verbosity { @@ -81,11 +88,11 @@ func New(snapshotsDir *dir.Rw, verbosity lg.Level, natif nat.Interface, download torrentConfig.Logger = lg.Default.FilterLevel(verbosity) torrentConfig.Logger.Handlers = []lg.Handler{adapterHandler{}} - c, err := storage.NewBoltPieceCompletion(snapshotsDir.Path) + c, err := NewMdbxPieceCompletion(db) if err != nil { - return nil, nil, fmt.Errorf("NewBoltPieceCompletion: %w", err) + return nil, fmt.Errorf("NewBoltPieceCompletion: %w", err) } m := storage.NewMMapWithCompletion(snapshotsDir.Path, c) torrentConfig.DefaultStorage = m - return torrentConfig, m, nil + return &Cfg{ClientConfig: torrentConfig, DB: db, CompletionCloser: m}, nil } diff --git a/cmd/downloader/main.go b/cmd/downloader/main.go index c3d4f12afc54f7bacc3254bb4264622dd17c6fca..0668ac8b3e85a93cee4c295748d1cc0a134503d6 100644 --- a/cmd/downloader/main.go +++ b/cmd/downloader/main.go @@ -17,6 +17,8 @@ import ( "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/common/dir" proto_downloader "github.com/ledgerwatch/erigon-lib/gointerfaces/downloader" + "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/erigon-lib/kv/mdbx" "github.com/ledgerwatch/erigon/cmd/downloader/downloader" "github.com/ledgerwatch/erigon/cmd/downloader/downloader/torrentcfg" "github.com/ledgerwatch/erigon/cmd/utils" @@ -26,6 +28,7 @@ import ( "github.com/ledgerwatch/log/v3" "github.com/pelletier/go-toml/v2" "github.com/spf13/cobra" + mdbx2 "github.com/torquem-ch/mdbx-go/mdbx" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/health" @@ -58,7 +61,7 @@ func init() { rootCmd.Flags().StringVar(&downloaderApiAddr, "downloader.api.addr", "127.0.0.1:9093", "external downloader api network address, for example: 127.0.0.1:9093 serves remote downloader interface") rootCmd.Flags().StringVar(&torrentVerbosity, "torrent.verbosity", lg.Warning.LogString(), "DEBUG | INFO | WARN | ERROR") rootCmd.Flags().StringVar(&downloadRateStr, "torrent.download.rate", "8mb", "bytes per second, example: 32mb") - rootCmd.Flags().StringVar(&uploadRateStr, "torrent.upload.rate", "8mb", "bytes per second, example: 32mb") + rootCmd.Flags().StringVar(&uploadRateStr, "torrent.upload.rate", "4mb", "bytes per second, example: 32mb") rootCmd.Flags().IntVar(&torrentPort, "torrent.port", 42069, "port to listen and serve BitTorrent protocol") rootCmd.Flags().IntVar(&torrentMaxPeers, "torrent.maxpeers", 10, "") rootCmd.Flags().IntVar(&torrentConnsPerFile, "torrent.conns.perfile", 5, "connections per file") @@ -134,11 +137,24 @@ func Downloader(ctx context.Context) error { return fmt.Errorf("invalid nat option %s: %w", natSetting, err) } - cfg, pieceCompletion, err := torrentcfg.New(snapshotDir, torrentLogLevel, natif, downloadRate, uploadRate, torrentPort, torrentMaxPeers, torrentConnsPerFile) + db, err := mdbx.NewMDBX(log.New()). + Flags(func(f uint) uint { return f | mdbx2.SafeNoSync }). + Label(kv.DownloaderDB). + WithTablessCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg { + return kv.DownloaderTablesCfg + }). + SyncPeriod(15 * time.Second). + Path(filepath.Join(snapshotDir.Path, "db")). + Open() if err != nil { return err } - defer pieceCompletion.Close() + + cfg, err := torrentcfg.New(snapshotDir, torrentLogLevel, natif, downloadRate, uploadRate, torrentPort, torrentMaxPeers, torrentConnsPerFile, db) + if err != nil { + return err + } + defer cfg.CompletionCloser.Close() protocols, err := downloader.New(cfg, snapshotDir) if err != nil { diff --git a/cmd/downloader/tracker/main.go b/cmd/downloader/tracker/main.go deleted file mode 100644 index 5ff9cb5847e15ced64e9bce1e2f613745933c0c3..0000000000000000000000000000000000000000 --- a/cmd/downloader/tracker/main.go +++ /dev/null @@ -1,379 +0,0 @@ -package main - -import ( - "context" - "encoding/json" - "errors" - "fmt" - "net" - "net/http" - "os" - "strconv" - "strings" - "time" - - "github.com/anacrolix/torrent/bencode" - "github.com/anacrolix/torrent/tracker" - common2 "github.com/ledgerwatch/erigon-lib/common" - "github.com/ledgerwatch/erigon-lib/kv" - "github.com/ledgerwatch/erigon-lib/kv/mdbx" - "github.com/ledgerwatch/erigon/cmd/utils" - "github.com/ledgerwatch/erigon/common" - "github.com/ledgerwatch/erigon/ethdb" - "github.com/ledgerwatch/erigon/internal/debug" - "github.com/ledgerwatch/log/v3" - "github.com/spf13/cobra" -) - -const DefaultInterval = 60 //in seconds -const SoftLimit = 5 //in seconds -const DisconnectInterval = time.Minute //in seconds -var trackerID = "erigon snapshot tracker" - -func init() { - utils.CobraFlags(rootCmd, append(debug.Flags, utils.MetricFlags...)) -} - -func main() { - ctx, cancel := common2.RootContext() - defer cancel() - - if err := rootCmd.ExecuteContext(ctx); err != nil { - fmt.Println(err) - os.Exit(1) - } -} - -var rootCmd = &cobra.Command{ - Use: "start", - Short: "start tracker", - PersistentPreRun: func(cmd *cobra.Command, args []string) { - if err := debug.SetupCobra(cmd); err != nil { - panic(err) - } - }, - PersistentPostRun: func(cmd *cobra.Command, args []string) { - debug.Exit() - }, - Args: cobra.ExactArgs(1), - ArgAliases: []string{"snapshots dir"}, - RunE: func(cmd *cobra.Command, args []string) error { - db := mdbx.MustOpen(args[0]) - m := http.NewServeMux() - m.Handle("/announce", &Tracker{db: db}) - m.HandleFunc("/scrape", func(writer http.ResponseWriter, request *http.Request) { - log.Warn("scrape", "url", request.RequestURI) - ih := request.URL.Query().Get("info_hash") - if len(ih) != 20 { - log.Error("wronng infohash", "ih", ih, "l", len(ih)) - WriteResp(writer, ErrResponse{FailureReason: "incorrect infohash"}, false) - return - } - resp := ScrapeResponse{Files: map[string]*ScrapeData{ - ih: {}, - }} - - err := db.View(context.Background(), func(tx kv.Tx) error { - c, err := tx.Cursor(kv.SnapshotInfo) - if err != nil { - return err - } - defer c.Close() - return ethdb.Walk(c, append([]byte(ih), make([]byte, 20)...), 20*8, func(k, v []byte) (bool, error) { - a := AnnounceReqWithTime{} - err := json.Unmarshal(v, &a) - if err != nil { - log.Error("Fail to unmarshall", "k", common.Bytes2Hex(k), "err", err) - //skip failed - return true, nil - } - if time.Since(a.UpdatedAt) > 24*time.Hour { - log.Trace("Skipped", "k", common.Bytes2Hex(k), "last updated", a.UpdatedAt) - return true, nil - } - if a.Left == 0 { - resp.Files[ih].Downloaded++ - resp.Files[ih].Complete++ - } else { - resp.Files[ih].Incomplete++ - } - return true, nil - }) - }) - if err != nil { - log.Error("Walk", "err", err) - WriteResp(writer, ErrResponse{FailureReason: err.Error()}, false) - return - } - jsonResp, err := json.Marshal(resp) - if err == nil { - log.Info("scrape resp", "v", string(jsonResp)) - } else { - log.Info("marshall scrape resp", "err", err) - } - - WriteResp(writer, resp, false) - }) - m.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) { - log.Warn("404", "url", request.RequestURI) - }) - - log.Info("Listen1") - go func() { - err := http.ListenAndServe(":80", m) - log.Error("error", "err", err) - }() - <-cmd.Context().Done() - return nil - }, -} - -type Tracker struct { - db kv.RwDB -} - -/* -/announce?compact=1 -&downloaded=0 -&event="started" -&info_hash=D%22%5C%80%F7%FD%12Z%EA%9B%F0%A5z%DA%AF%1F%A4%E1je -&left=0 -&peer_id=-GT0002-9%EA%FB+%BF%B3%AD%DE%8Ae%D0%B7 -&port=53631 -&supportcrypto=1 -&uploaded=0" -*/ -type AnnounceReqWithTime struct { - AnnounceReq - UpdatedAt time.Time -} -type AnnounceReq struct { - InfoHash []byte - PeerID []byte - RemoteAddr net.IP - Port int - Event string - Uploaded int64 - Downloaded int64 - SupportCrypto bool - Left int64 - Compact bool -} - -//type Peer struct { -// IP string -// Port int -// PeerID []byte -//} - -func (t *Tracker) ServeHTTP(w http.ResponseWriter, r *http.Request) { - log.Info("call", "url", r.RequestURI) - - req, err := ParseRequest(r) - if err != nil { - log.Error("Parse request", "err", err) - WriteResp(w, ErrResponse{FailureReason: err.Error()}, req.Compact) - return - } - if err = ValidateReq(req); err != nil { - log.Error("Validate failed", "err", err) - WriteResp(w, ErrResponse{FailureReason: err.Error()}, req.Compact) - return - } - - toSave := AnnounceReqWithTime{ - req, - time.Now(), - } - peerBytes, err := json.Marshal(toSave) - if err != nil { - log.Error("Json marshal", "err", err) - WriteResp(w, ErrResponse{FailureReason: err.Error()}, req.Compact) - return - } - - key := append(req.InfoHash, req.PeerID...) - if req.Event == tracker.Stopped.String() { - err = t.db.Update(context.Background(), func(tx kv.RwTx) error { - return tx.Delete(kv.SnapshotInfo, key, nil) - }) - if err != nil { - log.Error("Json marshal", "err", err) - WriteResp(w, ErrResponse{FailureReason: err.Error()}, req.Compact) - return - } - } else { - var prevBytes []byte - err = t.db.View(context.Background(), func(tx kv.Tx) error { - prevBytes, err = tx.GetOne(kv.SnapshotInfo, key) - return err - }) - if err != nil { - log.Error("get from db is return error", "err", err) - WriteResp(w, ErrResponse{FailureReason: err.Error()}, req.Compact) - return - } - if prevBytes == nil { - return - } - - prev := new(AnnounceReqWithTime) - err = json.Unmarshal(prevBytes, prev) - if err != nil { - log.Error("Unable to unmarshall", "err", err) - } - if time.Since(prev.UpdatedAt) < time.Second*SoftLimit { - //too early to update - WriteResp(w, ErrResponse{FailureReason: "too early to update"}, req.Compact) - return - - } - if err = t.db.Update(context.Background(), func(tx kv.RwTx) error { - return tx.Put(kv.SnapshotInfo, key, peerBytes) - }); err != nil { - log.Error("db.Put", "err", err) - WriteResp(w, ErrResponse{FailureReason: err.Error()}, req.Compact) - return - } - } - - resp := HttpResponse{ - Interval: DefaultInterval, - TrackerId: trackerID, - } - - if err := t.db.View(context.Background(), func(tx kv.Tx) error { - return tx.ForPrefix(kv.SnapshotInfo, append(req.InfoHash, make([]byte, 20)...), func(k, v []byte) error { - a := AnnounceReqWithTime{} - err = json.Unmarshal(v, &a) - if err != nil { - log.Error("Fail to unmarshall", "k", common.Bytes2Hex(k), "err", err) - //skip failed - return nil - } - if time.Since(a.UpdatedAt) > 5*DisconnectInterval { - log.Trace("Skipped requset", "peer", common.Bytes2Hex(a.PeerID), "last updated", a.UpdatedAt, "now", time.Now()) - return nil - } - if a.Left == 0 { - resp.Complete++ - } else { - resp.Incomplete++ - } - resp.Peers = append(resp.Peers, map[string]interface{}{ - "ip": a.RemoteAddr.String(), - "peer id": a.PeerID, - "port": a.Port, - }) - return nil - }) - }); err != nil { - log.Error("Walk", "err", err) - WriteResp(w, ErrResponse{FailureReason: err.Error()}, req.Compact) - return - } - jsonResp, err := json.Marshal(resp) - if err == nil { - log.Info("announce resp", "v", string(jsonResp)) - } else { - log.Info("marshall announce resp", "err", err) - } - - WriteResp(w, resp, req.Compact) -} - -func WriteResp(w http.ResponseWriter, res interface{}, compact bool) { - if _, ok := res.(ErrResponse); ok { - log.Error("Err", "err", res) - } - if compact { - err := bencode.NewEncoder(w).Encode(res) - if err != nil { - log.Error("Bencode encode", "err", err) - } - } else { - err := json.NewEncoder(w).Encode(res) - if err != nil { - log.Error("Json marshal", "err", err) - return - } - } -} - -func ParseRequest(r *http.Request) (AnnounceReq, error) { - q := r.URL.Query() - - var remoteAddr net.IP - if strings.Contains(r.RemoteAddr, ":") { - remoteAddr = net.ParseIP(strings.Split(r.RemoteAddr, ":")[0]) - } else { - remoteAddr = net.ParseIP(r.RemoteAddr) - } - - downloaded, err := strconv.ParseInt(q.Get("downloaded"), 10, 64) - if err != nil { - log.Warn("downloaded", "err", err) - return AnnounceReq{}, fmt.Errorf("downloaded %v - %w", q.Get("downloaded"), err) - } - uploaded, err := strconv.ParseInt(q.Get("uploaded"), 10, 64) - if err != nil { - log.Warn("uploaded", "err", err) - return AnnounceReq{}, fmt.Errorf("uploaded %v - %w", q.Get("uploaded"), err) - } - left, err := strconv.ParseInt(q.Get("left"), 10, 64) - if err != nil { - log.Warn("left", "err", err) - return AnnounceReq{}, fmt.Errorf("left: %v - %w", q.Get("left"), err) - } - port, err := strconv.Atoi(q.Get("port")) - if err != nil { - return AnnounceReq{}, fmt.Errorf("port: %v - %w", q.Get("port"), err) - } - - res := AnnounceReq{ - InfoHash: []byte(q.Get("info_hash")), - PeerID: []byte(q.Get("peer_id")), - RemoteAddr: remoteAddr, - Event: q.Get("event"), - Compact: q.Get("compact") == "1", - SupportCrypto: q.Get("supportcrypto") == "1", - Downloaded: downloaded, - Uploaded: uploaded, - Left: left, - Port: port, - } - return res, nil -} - -func ValidateReq(req AnnounceReq) error { - if len(req.InfoHash) != 20 { - return errors.New("invalid infohash") - } - if len(req.PeerID) != 20 { - return errors.New("invalid peer id") - } - if req.Port == 0 { - return errors.New("invalid port") - } - return nil -} - -type HttpResponse struct { - Interval int32 `bencode:"interval" json:"interval"` - TrackerId string `bencode:"tracker id" json:"tracker_id"` - Complete int32 `bencode:"complete" json:"complete"` - Incomplete int32 `bencode:"incomplete" json:"incomplete"` - Peers []map[string]interface{} `bencode:"peers" json:"peers"` -} -type ErrResponse struct { - FailureReason string `bencode:"failure reason" json:"failure_reason"` -} -type ScrapeResponse struct { - Files map[string]*ScrapeData `json:"files" bencode:"files"` -} - -type ScrapeData struct { - Complete int32 `bencode:"complete" json:"complete"` - Downloaded int32 `json:"downloaded" bencode:"downloaded"` - Incomplete int32 `json:"incomplete" bencode:"incomplete"` -} diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 08a130fcb8bc6009e8f3e08caecaf11d023d8ef6..7f7c346b8d756ac91001a0cb122e6229d16ad4b7 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -28,12 +28,14 @@ import ( "strings" "text/tabwriter" "text/template" + "time" lg "github.com/anacrolix/log" "github.com/c2h5oh/datasize" "github.com/ledgerwatch/erigon-lib/common/dir" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/kv/kvcache" + "github.com/ledgerwatch/erigon-lib/kv/mdbx" "github.com/ledgerwatch/erigon-lib/txpool" "github.com/ledgerwatch/erigon/cmd/downloader/downloader/torrentcfg" "github.com/ledgerwatch/log/v3" @@ -58,6 +60,7 @@ import ( "github.com/ledgerwatch/erigon/p2p/nat" "github.com/ledgerwatch/erigon/p2p/netutil" "github.com/ledgerwatch/erigon/params" + mdbx2 "github.com/torquem-ch/mdbx-go/mdbx" ) func init() { @@ -636,7 +639,7 @@ var ( } TorrentDownloadRateFlag = cli.StringFlag{ Name: "torrent.download.rate", - Value: "4mb", + Value: "8mb", Usage: "bytes per second, example: 32mb", } TorrentUploadRateFlag = cli.StringFlag{ @@ -1366,19 +1369,26 @@ func SetEthConfig(ctx *cli.Context, nodeConfig *node.Config, cfg *ethconfig.Conf panic(err) } - torrentCfg, dirCloser, err := torrentcfg.New(cfg.SnapshotDir, + db := mdbx.NewMDBX(log.New()). + Flags(func(f uint) uint { return f | mdbx2.SafeNoSync }). + Label(kv.DownloaderDB). + WithTablessCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg { return kv.DownloaderTablesCfg }). + SyncPeriod(15 * time.Second). + Path(filepath.Join(cfg.SnapshotDir.Path, "db")). + MustOpen() + var err error + cfg.Torrent, err = torrentcfg.New(cfg.SnapshotDir, torrentcfg.String2LogLevel[ctx.GlobalString(TorrentVerbosityFlag.Name)], nodeConfig.P2P.NAT, downloadRate, uploadRate, ctx.GlobalInt(TorrentPortFlag.Name), ctx.GlobalInt(TorrentMaxPeersFlag.Name), ctx.GlobalInt(TorrentConnsPerFileFlag.Name), + db, ) if err != nil { panic(err) } - cfg.Torrent = torrentCfg - cfg.TorrentDirCloser = dirCloser } nodeConfig.Http.Snapshot = cfg.Snapshot diff --git a/eth/backend.go b/eth/backend.go index 047dc8a1628ee07a0fb513893313c93aacaadbb7..34f5598d5ac828098ad4b7ab5f829f35d5f5b462 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -798,9 +798,6 @@ func (s *Ethereum) Stop() error { if s.downloadProtocols != nil { s.downloadProtocols.Close() } - if s.config.TorrentDirCloser != nil { - s.config.TorrentDirCloser.Close() //nolint - } if s.privateAPI != nil { shutdownDone := make(chan bool) go func() { diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go index c7d1048c81f2d63d87265b63cf3e9ec322f63db9..68eee2e4aefd6d494cbde04301efd2e23df65a1f 100644 --- a/eth/ethconfig/config.go +++ b/eth/ethconfig/config.go @@ -18,7 +18,6 @@ package ethconfig import ( - "io" "math/big" "os" "os/user" @@ -27,17 +26,15 @@ import ( "strings" "time" - "github.com/anacrolix/torrent" "github.com/c2h5oh/datasize" "github.com/davecgh/go-spew/spew" "github.com/ledgerwatch/erigon-lib/common/dir" - "github.com/ledgerwatch/erigon/consensus/bor" - "github.com/ledgerwatch/erigon/params/networkname" - + "github.com/ledgerwatch/erigon/cmd/downloader/downloader/torrentcfg" "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/consensus" "github.com/ledgerwatch/erigon/consensus/aura" "github.com/ledgerwatch/erigon/consensus/aura/consensusconfig" + "github.com/ledgerwatch/erigon/consensus/bor" "github.com/ledgerwatch/erigon/consensus/clique" "github.com/ledgerwatch/erigon/consensus/db" "github.com/ledgerwatch/erigon/consensus/ethash" @@ -47,6 +44,7 @@ import ( "github.com/ledgerwatch/erigon/eth/gasprice" "github.com/ledgerwatch/erigon/ethdb/prune" "github.com/ledgerwatch/erigon/params" + "github.com/ledgerwatch/erigon/params/networkname" "github.com/ledgerwatch/log/v3" ) @@ -175,10 +173,9 @@ type Config struct { BadBlockHash common.Hash // hash of the block marked as bad Snapshot Snapshot - Torrent *torrent.ClientConfig + Torrent *torrentcfg.Cfg - TorrentDirCloser io.Closer - SnapshotDir *dir.Rw + SnapshotDir *dir.Rw BlockDownloaderWindow int diff --git a/go.mod b/go.mod index fab7e6061aaff44153c43130a142021bf2fcf2bc..160d8eca3c57eca1b1158352b46fd157bbc2873e 100644 --- a/go.mod +++ b/go.mod @@ -16,15 +16,11 @@ require ( github.com/consensys/gnark-crypto v0.4.0 github.com/davecgh/go-spew v1.1.1 github.com/deckarep/golang-set v0.0.0-20180603214616-504e848d77ea - github.com/dlclark/regexp2 v1.4.0 // indirect - github.com/dop251/goja v0.0.0-20200721192441-a695b0cdd498 github.com/edsrzf/mmap-go v1.0.0 github.com/emicklei/dot v0.16.0 github.com/emirpasic/gods v1.12.0 - github.com/fatih/color v1.13.0 github.com/fjl/gencodec v0.0.0-20191126094850-e283372f291f github.com/fortytw2/leaktest v1.3.0 // indirect - github.com/go-sourcemap/sourcemap v2.1.3+incompatible // indirect github.com/goccy/go-json v0.9.6 github.com/gofrs/flock v0.8.1 github.com/golang-jwt/jwt/v4 v4.3.0 @@ -41,7 +37,7 @@ require ( github.com/json-iterator/go v1.1.12 github.com/julienschmidt/httprouter v1.3.0 github.com/kevinburke/go-bindata v3.21.0+incompatible - github.com/ledgerwatch/erigon-lib v0.0.0-20220418023534-87a7a2124418 + github.com/ledgerwatch/erigon-lib v0.0.0-20220421013732-03014c56505e github.com/ledgerwatch/log/v3 v3.4.1 github.com/ledgerwatch/secp256k1 v1.0.0 github.com/magiconair/properties v1.8.6 // indirect diff --git a/go.sum b/go.sum index cb155f8256bbde89699c026cd64352867981a683..75757cecf8796cd05d79b72bc417c63e261edd82 100644 --- a/go.sum +++ b/go.sum @@ -311,11 +311,7 @@ github.com/deckarep/golang-set v0.0.0-20180603214616-504e848d77ea h1:j4317fAZh7X github.com/deckarep/golang-set v0.0.0-20180603214616-504e848d77ea/go.mod h1:93vsz/8Wt4joVM7c2AVqh+YRMiUSc14yDtF28KmMOgQ= github.com/decred/dcrd/lru v1.0.0/go.mod h1:mxKOwFd7lFjN2GZYsiz/ecgqR6kkYAl+0pz0tEMk218= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= -github.com/dlclark/regexp2 v1.4.0 h1:F1rxgk7p4uKjwIQxBs9oAXe5CqrXlCduYEJvrF4u93E= -github.com/dlclark/regexp2 v1.4.0/go.mod h1:2pZnwuY/m+8K6iRw6wQdMtk+rH5tNGR1i55kozfMjCc= github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE= -github.com/dop251/goja v0.0.0-20200721192441-a695b0cdd498 h1:Y9vTBSsV4hSwPSj4bacAU/eSnV3dAxVpepaghAdhGoQ= -github.com/dop251/goja v0.0.0-20200721192441-a695b0cdd498/go.mod h1:Mw6PkjjMXWbTj+nnj4s3QPXq1jaT0s5pC0iFD4+BOAA= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v0.0.0-20180421182945-02af3965c54e/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= @@ -343,8 +339,6 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.m github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= -github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w= -github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= github.com/fjl/gencodec v0.0.0-20191126094850-e283372f291f h1:Y/gg/utVetS+WS6htAKCTDralkm/8hLIIUAtLFdbdQ8= github.com/fjl/gencodec v0.0.0-20191126094850-e283372f291f/go.mod h1:q+7Z5oyy8cvKF3TakcuihvQvBHFTnXjB+7UP1e2Q+1o= github.com/flanglet/kanzi-go v1.9.1-0.20211212184056-72dda96261ee h1:CaVlPeoz5kJQ+cAOV+ZDdlr3J2FmKyNkGu9LY+x7cDM= @@ -394,8 +388,6 @@ github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= -github.com/go-sourcemap/sourcemap v2.1.3+incompatible h1:W1iEw64niKVGogNgBN3ePyLFfuisuzeidWPMPWmECqU= -github.com/go-sourcemap/sourcemap v2.1.3+incompatible/go.mod h1:F8jJfvm2KbVjc5NqelyYJmf/v5J0dwNLS2mL4sNA1Jg= github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/go-stack/stack v1.8.1 h1:ntEHSVwIt7PNXNpgPmVfMrNhLtgjlmnZha2kOpuRiDw= @@ -612,8 +604,8 @@ github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758 h1:0D5M2HQSGD3P github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758/go.mod h1:B69LEHPfb2qLo0BaaOLcbitczOKLWTsrBG9LczfCD4k= github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c= github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8= -github.com/ledgerwatch/erigon-lib v0.0.0-20220418023534-87a7a2124418 h1:qjyseSyJJqWIlHaoPeiVFPIwjvvozTrkkEQ9YohcxNs= -github.com/ledgerwatch/erigon-lib v0.0.0-20220418023534-87a7a2124418/go.mod h1:N0SNhcFu4P+uHJNOP3Di1RzLqKeql5RyjozseoqI69E= +github.com/ledgerwatch/erigon-lib v0.0.0-20220421013732-03014c56505e h1:/FPeXygFsEsBWnMSRQ8+r7t/6Ue+gY6yb9GykcriaJI= +github.com/ledgerwatch/erigon-lib v0.0.0-20220421013732-03014c56505e/go.mod h1:N0SNhcFu4P+uHJNOP3Di1RzLqKeql5RyjozseoqI69E= github.com/ledgerwatch/log/v3 v3.4.1 h1:/xGwlVulXnsO9Uq+tzaExc8OWmXXHU0dnLalpbnY5Bc= github.com/ledgerwatch/log/v3 v3.4.1/go.mod h1:VXcz6Ssn6XEeU92dCMc39/g1F0OYAjw1Mt+dGP5DjXY= github.com/ledgerwatch/secp256k1 v1.0.0 h1:Usvz87YoTG0uePIV8woOof5cQnLXGYa162rFf3YnwaQ= @@ -639,7 +631,6 @@ github.com/marten-seemann/qtls-go1-15 v0.1.0/go.mod h1:GyFwywLKkRt+6mfU99csTEY1j github.com/marten-seemann/qtls-go1-15 v0.1.1/go.mod h1:GyFwywLKkRt+6mfU99csTEY1joMZz5vmB1WNZH3P81I= github.com/matryer/moq v0.2.7/go.mod h1:kITsx543GOENm48TUAQyJ9+SAvFSr7iGQXPoth/VUBk= github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= -github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= github.com/mattn/go-colorable v0.1.11 h1:nQ+aFkoE2TMGc0b68U2OKSexC+eq46+XwZzWXHRmPYs= github.com/mattn/go-colorable v0.1.11/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4= github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=