From 4fb2a14756df7258a2fad6dcc3de4456d8392a60 Mon Sep 17 00:00:00 2001 From: "alex.sharov" <AskAlexSharov@gmail.com> Date: Mon, 4 Apr 2022 21:07:02 +0700 Subject: [PATCH] add readahead to segment uncompress pipe --- turbo/app/snapshots.go | 37 +++++++++++++++++++++---------------- 1 file changed, 21 insertions(+), 16 deletions(-) diff --git a/turbo/app/snapshots.go b/turbo/app/snapshots.go index 7f8f8056ba..8c42ae3736 100644 --- a/turbo/app/snapshots.go +++ b/turbo/app/snapshots.go @@ -165,23 +165,28 @@ func doUncompress(cliCtx *cli.Context) error { if err != nil { return err } - wr := bufio.NewWriterSize(os.Stdout, etl.BufIOSize) - g := decompressor.MakeGetter() - var buf []byte - var EOL = []byte("\n") - for g.HasNext() { - buf, _ := g.Next(buf) - if _, err := wr.Write(buf); err != nil { - return err - } - if _, err := wr.Write(EOL); err != nil { - return err - } - select { - case <-ctx.Done(): - return ctx.Err() - default: + if err := decompressor.WithReadAhead(func() error { + wr := bufio.NewWriterSize(os.Stdout, etl.BufIOSize) + g := decompressor.MakeGetter() + var buf []byte + var EOL = []byte("\n") + for g.HasNext() { + buf, _ := g.Next(buf) + if _, err := wr.Write(buf); err != nil { + return err + } + if _, err := wr.Write(EOL); err != nil { + return err + } + select { + case <-ctx.Done(): + return ctx.Err() + default: + } } + return nil + }); err != nil { + return err } _ = ctx return nil -- GitLab