diff --git a/cmd/sentry/download/downloader.go b/cmd/sentry/download/downloader.go index 7478e3b84f7f4935b3a5e62cd19b2f2a34efd898..e187c740763dad2b2ef4daf940e75403fc89dd58 100644 --- a/cmd/sentry/download/downloader.go +++ b/cmd/sentry/download/downloader.go @@ -68,7 +68,11 @@ func RecvUploadMessageLoop(ctx context.Context, default: } - SentryHandshake(ctx, sentry, cs) + if err := SentryHandshake(ctx, sentry, cs); err != nil { + log.Error("[RecvUploadMessage] sentry not ready yet", "err", err) + time.Sleep(time.Second) + continue + } if err := RecvUploadMessage(ctx, sentry, cs.HandleInboundMessage, wg); err != nil { log.Error("[RecvUploadMessage]", "err", err) } @@ -143,7 +147,11 @@ func RecvMessageLoop(ctx context.Context, default: } - SentryHandshake(ctx, sentry, cs) + if err := SentryHandshake(ctx, sentry, cs); err != nil { + log.Error("[RecvMessage] sentry not ready yet", "err", err) + time.Sleep(time.Second) + continue + } if err := RecvMessage(ctx, sentry, cs.HandleInboundMessage, wg); err != nil { log.Error("[RecvMessage]", "err", err) @@ -216,14 +224,14 @@ func RecvMessage( } } -func SentryHandshake(ctx context.Context, sentry remote.SentryClient, controlServer *ControlServerImpl) { +func SentryHandshake(ctx context.Context, sentry remote.SentryClient, controlServer *ControlServerImpl) error { _, err := sentry.SetStatus(ctx, makeStatusData(controlServer), grpc.WaitForReady(true)) if err != nil { if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled { - return + return nil } - log.Error("sentry not ready yet", "err", err) } + return nil } type ControlServerImpl struct { diff --git a/cmd/sentry/download/sentry.go b/cmd/sentry/download/sentry.go index 9db1ff1ac7603af455d1eb48c3957f3d090496f1..634ff3c5f21dd155b5c749271cb67d3fc29a2108 100644 --- a/cmd/sentry/download/sentry.go +++ b/cmd/sentry/download/sentry.go @@ -816,8 +816,7 @@ func (ss *SentryServerImpl) SetStatus(_ context.Context, statusData *proto_sentr reply.Protocol = proto_sentry.Protocol_ETH65 } - init := ss.statusData == nil - if init { + if ss.P2pServer == nil { var err error if !ss.p2p.NoDiscovery { if len(ss.discoveryDNS) == 0 { @@ -840,7 +839,6 @@ func (ss *SentryServerImpl) SetStatus(_ context.Context, statusData *proto_sentr return reply, fmt.Errorf("could not start server: %w", err) } } - genesisHash = gointerfaces.ConvertH256ToHash(statusData.ForkData.Genesis) ss.P2pServer.LocalNode().Set(eth.CurrentENREntryFromForks(statusData.ForkData.Forks, genesisHash, statusData.MaxBlock)) ss.statusData = statusData return reply, nil diff --git a/turbo/txpool/p2p.go b/turbo/txpool/p2p.go index 4335336df497b178485f25ae15d4594dce3b067c..55e2b280f34514efddf550b5a5a51424625c7f51 100644 --- a/turbo/txpool/p2p.go +++ b/turbo/txpool/p2p.go @@ -7,14 +7,14 @@ import ( "fmt" "io" "math/rand" - "runtime/debug" - "strings" "sync" + "time" "github.com/ledgerwatch/erigon-lib/gointerfaces" proto_sentry "github.com/ledgerwatch/erigon-lib/gointerfaces/sentry" "github.com/ledgerwatch/erigon/cmd/sentry/download" "github.com/ledgerwatch/erigon/common" + "github.com/ledgerwatch/erigon/common/debug" "github.com/ledgerwatch/erigon/core" "github.com/ledgerwatch/erigon/eth/fetcher" "github.com/ledgerwatch/erigon/eth/protocols/eth" @@ -258,8 +258,14 @@ func RecvTxMessageLoop(ctx context.Context, default: } - download.SentryHandshake(ctx, sentry, cs) - RecvTxMessage(ctx, sentry, handleInboundMessage, wg) + if err := download.SentryHandshake(ctx, sentry, cs); err != nil { + log.Error("[RecvTxMessage] sentry not ready yet", "err", err) + time.Sleep(time.Second) + continue + } + if err := RecvTxMessage(ctx, sentry, handleInboundMessage, wg); err != nil { + log.Error("[RecvTxMessage]", "err", err) + } } } @@ -269,20 +275,8 @@ func RecvTxMessage(ctx context.Context, sentry remote.SentryClient, handleInboundMessage func(ctx context.Context, inreq *proto_sentry.InboundMessage, sentry remote.SentryClient) error, wg *sync.WaitGroup, -) { - // avoid crash because Erigon's core does many things - defer func() { - if r := recover(); r != nil { // just log is enough - panicReplacer := strings.NewReplacer("\n", " ", "\t", "", "\r", "") - stack := panicReplacer.Replace(string(debug.Stack())) - switch typed := r.(type) { - case error: - log.Error("[RecvTxMessage] fail", "err", fmt.Errorf("%w, trace: %s", typed, stack)) - default: - log.Error("[RecvTxMessage] fail", "err", fmt.Errorf("%w, trace: %s", typed, stack)) - } - } - }() +) (err error) { + defer func() { err = debug.ReportPanicAndRecover() }() // avoid crash because Erigon's core does many things streamCtx, cancel := context.WithCancel(ctx) defer cancel() @@ -309,11 +303,11 @@ func RecvTxMessage(ctx context.Context, if errors.Is(err, io.EOF) { return } - log.Error("ReceiveTx messages failed", "error", err) - return + return err } - for req, err := stream.Recv(); ; req, err = stream.Recv() { + var req *proto_sentry.InboundMessage + for req, err = stream.Recv(); ; req, err = stream.Recv() { if err != nil { select { case <-ctx.Done(): @@ -326,8 +320,7 @@ func RecvTxMessage(ctx context.Context, if errors.Is(err, io.EOF) { return } - log.Error("[RecvTxMessage] Sentry disconnected", "error", err) - return + return err } if req == nil { return