From f49daafca3b7aed68f66ac69e6762bdebaff7f15 Mon Sep 17 00:00:00 2001 From: Garet Halliday <me@garet.holiday> Date: Fri, 10 May 2024 17:59:59 -0500 Subject: [PATCH] test --- lib/fed/decoder.go | 210 ++++++++++++++++++++++++++++++++++----------- lib/fed/encoder.go | 161 ++++++++++++++++++++++------------ lib/fed/packet.go | 2 +- 3 files changed, 267 insertions(+), 106 deletions(-) diff --git a/lib/fed/decoder.go b/lib/fed/decoder.go index f2488768..3928cb02 100644 --- a/lib/fed/decoder.go +++ b/lib/fed/decoder.go @@ -1,11 +1,11 @@ package fed import ( - "bufio" "encoding/binary" "errors" "io" "math" + "strings" "gfx.cafe/gfx/pggat/lib/util/decorator" ) @@ -13,13 +13,15 @@ import ( type Decoder struct { noCopy decorator.NoCopy - reader bufio.Reader + buffer [defaultBufferSize]byte + bufferWrite int + bufferRead int + reader io.Reader - typ Type - len int - pos int - - buf [8]byte + packetType Type + packetLength int + packetPos int + decodeBuf [8]byte } func NewDecoder(r io.Reader) *Decoder { @@ -29,110 +31,182 @@ func NewDecoder(r io.Reader) *Decoder { } func (T *Decoder) Reset(r io.Reader) { - T.len = 0 - T.pos = 0 - T.reader.Reset(r) + T.packetLength = 0 + T.packetPos = 0 + T.bufferRead = 0 + T.bufferWrite = 0 + T.reader = r +} + +func (T *Decoder) refill() error { + n, err := T.reader.Read(T.buffer[T.bufferWrite:]) + T.bufferWrite += n + return err +} + +func (T *Decoder) discard(n int) error { + for n > 0 { + if T.bufferWrite != 0 { + count := max(n, T.bufferWrite-T.bufferRead) + T.bufferRead += count + n -= count + if T.bufferRead == T.bufferWrite { + T.bufferRead = 0 + T.bufferWrite = 0 + } else { + break + } + } + + if err := T.refill(); err != nil { + return err + } + } + + return nil +} + +func (T *Decoder) read(b []byte) (n int, err error) { + if T.bufferWrite != 0 { + n = copy(b, T.buffer[T.bufferRead:T.bufferWrite]) + T.bufferRead += n + if T.bufferRead == T.bufferWrite { + T.bufferRead = 0 + T.bufferWrite = 0 + } + return + } + + if len(b) > len(T.buffer) { + return T.reader.Read(b) + } + + // read into buffer first + err = T.refill() + n = copy(b, T.buffer[:T.bufferWrite]) + T.bufferRead = n + if T.bufferRead == T.bufferWrite { + T.bufferRead = 0 + T.bufferWrite = 0 + } + return +} + +func (T *Decoder) readFull(b []byte) (n int, err error) { + for n < len(b) { + var count int + count, err = T.read(b[n:]) + n += count + if err != nil { + if err == io.EOF && n != 0 { + err = io.ErrUnexpectedEOF + } + return + } + } + return } func (T *Decoder) Read(b []byte) (n int, err error) { - rem := T.len - T.pos + rem := T.packetLength - T.packetPos if rem == 0 { err = io.EOF return } if len(b) > rem { - n, err = T.reader.Read(b[:rem]) + n, err = T.read(b[:rem]) } else { - n, err = T.reader.Read(b) + n, err = T.read(b) } - T.pos += n + T.packetPos += n return } func (T *Decoder) Buffered() int { - return T.reader.Buffered() + return T.bufferWrite - T.bufferRead } var ErrOverranReadBuffer = errors.New("overran read buffer") func (T *Decoder) ReadByte() (byte, error) { - rem := T.len - T.pos + rem := T.packetLength - T.packetPos if rem < 0 { return 0, ErrOverranReadBuffer } else if rem > 0 { - _, err := T.reader.Discard(rem) - if err != nil { + if err := T.discard(rem); err != nil { return 0, err } } - T.typ = 0 - T.len = 0 - T.pos = 0 - return T.reader.ReadByte() + T.packetType = 0 + T.packetLength = 0 + T.packetPos = 0 + if _, err := T.readFull(T.decodeBuf[:1]); err != nil { + return 0, err + } + return T.decodeBuf[0], nil } func (T *Decoder) Next(typed bool) error { - rem := T.len - T.pos + rem := T.packetLength - T.packetPos if rem < 0 { return ErrOverranReadBuffer } else if rem > 0 { - _, err := T.reader.Discard(rem) - if err != nil { + if err := T.discard(rem); err != nil { return err } } var err error if typed { - _, err = io.ReadFull(&T.reader, T.buf[:5]) + _, err = T.read(T.decodeBuf[:5]) } else { - T.buf[0] = 0 - _, err = io.ReadFull(&T.reader, T.buf[1:5]) + T.decodeBuf[0] = 0 + _, err = T.read(T.decodeBuf[1:5]) } if err != nil { return err } - T.typ = Type(T.buf[0]) - T.len = int(binary.BigEndian.Uint32(T.buf[1:5])) - 4 - T.pos = 0 + T.packetType = Type(T.decodeBuf[0]) + T.packetLength = int(binary.BigEndian.Uint32(T.decodeBuf[1:5])) - 4 + T.packetPos = 0 return nil } func (T *Decoder) Type() Type { - return T.typ + return T.packetType } func (T *Decoder) Length() int { - return T.len + return T.packetLength } func (T *Decoder) Position() int { - return T.pos + return T.packetPos } func (T *Decoder) Uint8() (uint8, error) { - v, err := T.reader.ReadByte() - T.pos += 1 - return v, err + _, err := T.readFull(T.decodeBuf[:1]) + T.packetPos += 1 + return T.decodeBuf[0], err } func (T *Decoder) Uint16() (uint16, error) { - _, err := io.ReadFull(&T.reader, T.buf[:2]) - T.pos += 2 - return binary.BigEndian.Uint16(T.buf[:2]), err + _, err := T.readFull(T.decodeBuf[:2]) + T.packetPos += 2 + return binary.BigEndian.Uint16(T.decodeBuf[:2]), err } func (T *Decoder) Uint32() (uint32, error) { - _, err := io.ReadFull(&T.reader, T.buf[:4]) - T.pos += 4 - return binary.BigEndian.Uint32(T.buf[:4]), err + _, err := T.readFull(T.decodeBuf[:4]) + T.packetPos += 4 + return binary.BigEndian.Uint32(T.decodeBuf[:4]), err } func (T *Decoder) Uint64() (uint64, error) { - _, err := io.ReadFull(&T.reader, T.buf[:8]) - T.pos += 8 - return binary.BigEndian.Uint64(T.buf[:8]), err + _, err := T.readFull(T.decodeBuf[:8]) + T.packetPos += 8 + return binary.BigEndian.Uint64(T.decodeBuf[:8]), err } func (T *Decoder) Int8() (int8, error) { @@ -166,10 +240,44 @@ func (T *Decoder) Float64() (float64, error) { } func (T *Decoder) String() (string, error) { - s, err := T.reader.ReadString(0) - if err != nil { - return "", err + if T.bufferWrite == 0 { + if err := T.refill(); err != nil { + return "", err + } + } + + for i, v := range T.buffer[T.bufferRead:T.bufferWrite] { + if v == 0 { + res := string(T.buffer[T.bufferRead : T.bufferRead+i-1]) + T.bufferRead += i + if T.bufferRead == T.bufferWrite { + T.bufferRead = 0 + T.bufferWrite = 0 + } + T.packetPos += i + return res, nil + } + } + + var builder strings.Builder + builder.Write(T.buffer[T.bufferRead:T.bufferWrite]) + for { + if err := T.refill(); err != nil { + T.packetPos += builder.Len() + return builder.String(), err + } + + for i, v := range T.buffer[T.bufferRead:T.bufferWrite] { + if v == 0 { + builder.Write(T.buffer[T.bufferRead : T.bufferRead+i-1]) + T.bufferRead += i + if T.bufferRead == T.bufferWrite { + T.bufferRead = 0 + T.bufferWrite = 0 + } + T.packetPos += builder.Len() + 1 + return builder.String(), nil + } + } } - T.pos += len(s) - return s[:len(s)-1], nil } diff --git a/lib/fed/encoder.go b/lib/fed/encoder.go index 4c78b6bc..a4a22124 100644 --- a/lib/fed/encoder.go +++ b/lib/fed/encoder.go @@ -1,7 +1,6 @@ package fed import ( - "bufio" "encoding/binary" "errors" "io" @@ -10,26 +9,24 @@ import ( "gfx.cafe/gfx/pggat/lib/util/decorator" ) +const defaultBufferSize = 16 * 1024 + type Encoder struct { noCopy decorator.NoCopy - writer bufio.Writer - - typ Type - len int - pos int + buffer [defaultBufferSize]byte + bufferPos int + writer io.Writer - buf [8]byte + packetType Type + packetLength int + packetPos int } -const defaultBufferSize = 16 * 1024 - func NewEncoder(w io.Writer) *Encoder { - e := new(Encoder) - bw := bufio.NewWriterSize(w, defaultBufferSize) - e.writer = *bw - e.Reset(w) - return e + return &Encoder{ + writer: w, + } } var ( @@ -37,86 +34,138 @@ var ( ) func (T *Encoder) Reset(w io.Writer) { - T.writer.Reset(w) + T.bufferPos = 0 + T.writer = w } -func (T *Encoder) ReadFrom(r io.Reader) (int64, error) { - return T.writer.ReadFrom(r) +func (T *Encoder) ReadFrom(r *Decoder) (int, error) { + var n int + for { + if T.bufferPos >= len(T.buffer) { + if err := T.Flush(); err != nil { + return n, err + } + } + count, err := r.Read(T.buffer[T.bufferPos:]) + T.bufferPos += count + n += count + if err == io.EOF { + break + } + if err != nil { + return n, err + } + } + return n, nil } func (T *Encoder) Flush() error { - return T.writer.Flush() + pos := T.bufferPos + T.bufferPos = 0 + _, err := T.writer.Write(T.buffer[:pos]) + return err +} + +func (T *Encoder) writeByte(b byte) error { + if T.bufferPos+1 > len(T.buffer) { + if err := T.Flush(); err != nil { + return err + } + } + T.buffer[T.bufferPos] = b + T.bufferPos++ + return nil } func (T *Encoder) WriteByte(b byte) error { - if T.pos != T.len { + if T.packetPos != T.packetLength { return ErrWrongNumberOfBytes } - T.typ = 0 - T.len = 0 - T.pos = 0 - return T.writer.WriteByte(b) + T.packetType = 0 + T.packetLength = 0 + T.packetPos = 0 + return T.writeByte(b) } func (T *Encoder) Next(typ Type, length int) error { - if T.pos != T.len { + if T.packetPos != T.packetLength { return ErrWrongNumberOfBytes } if typ != 0 { - if err := T.writer.WriteByte(byte(typ)); err != nil { + if err := T.writeByte(byte(typ)); err != nil { return err } } - binary.BigEndian.PutUint32(T.buf[:4], uint32(length+4)) - _, err := T.writer.Write(T.buf[:4]) + if T.bufferPos+4 > len(T.buffer) { + if err := T.Flush(); err != nil { + return err + } + } + binary.BigEndian.PutUint32(T.buffer[T.bufferPos:T.bufferPos+4], uint32(length+4)) + T.bufferPos += 4 - T.typ = typ - T.len = length - T.pos = 0 + T.packetType = typ + T.packetLength = length + T.packetPos = 0 - return err + return nil } func (T *Encoder) Type() Type { - return T.typ + return T.packetType } func (T *Encoder) Length() int { - return T.len + return T.packetLength } func (T *Encoder) Position() int { - return T.pos + return T.packetPos } func (T *Encoder) Uint8(v uint8) error { - err := T.writer.WriteByte(v) - T.pos += 1 + err := T.writeByte(v) + T.packetPos += 1 return err } func (T *Encoder) Uint16(v uint16) error { - binary.BigEndian.PutUint16(T.buf[:2], v) - _, err := T.writer.Write(T.buf[:2]) - T.pos += 2 - return err + if T.bufferPos+2 > len(T.buffer) { + if err := T.Flush(); err != nil { + return err + } + } + binary.BigEndian.PutUint16(T.buffer[T.bufferPos:T.bufferPos+2], v) + T.bufferPos += 2 + T.packetPos += 2 + return nil } func (T *Encoder) Uint32(v uint32) error { - binary.BigEndian.PutUint32(T.buf[:4], v) - _, err := T.writer.Write(T.buf[:4]) - T.pos += 4 - return err + if T.bufferPos+4 > len(T.buffer) { + if err := T.Flush(); err != nil { + return err + } + } + binary.BigEndian.PutUint32(T.buffer[T.bufferPos:T.bufferPos+4], v) + T.bufferPos += 4 + T.packetPos += 4 + return nil } func (T *Encoder) Uint64(v uint64) error { - binary.BigEndian.PutUint64(T.buf[:8], v) - _, err := T.writer.Write(T.buf[:8]) - T.pos += 8 - return err + if T.bufferPos+8 > len(T.buffer) { + if err := T.Flush(); err != nil { + return err + } + } + binary.BigEndian.PutUint64(T.buffer[T.bufferPos:T.bufferPos+8], v) + T.bufferPos += 8 + T.packetPos += 8 + return nil } func (T *Encoder) Int8(v int8) error { @@ -144,11 +193,15 @@ func (T *Encoder) Float64(v float64) error { } func (T *Encoder) String(v string) error { - n, err := T.writer.WriteString(v) - if err != nil { - return err + for len(v) > 0 { + n := copy(T.buffer[T.bufferPos:], v) + T.bufferPos += n + if T.bufferPos >= len(T.buffer) { + if err := T.Flush(); err != nil { + return err + } + } + v = v[n:] } - err = T.writer.WriteByte(0) - T.pos += n + 1 - return err + return T.writeByte(0) } diff --git a/lib/fed/packet.go b/lib/fed/packet.go index 8f69e98a..47e23949 100644 --- a/lib/fed/packet.go +++ b/lib/fed/packet.go @@ -32,7 +32,7 @@ func (T PendingPacket) WriteTo(encoder *Encoder) error { return err } } - encoder.pos += count + encoder.packetPos += count return nil } -- GitLab