good morning!!!!

Skip to content
Snippets Groups Projects
Verified Commit 4394be40 authored by a's avatar a
Browse files

noflushwriter

parent f8714cc4
Branches
Tags
No related merge requests found
Pipeline #32913 passed
...@@ -32,6 +32,7 @@ func flushIfFlusher(w io.Writer) error { ...@@ -32,6 +32,7 @@ func flushIfFlusher(w io.Writer) error {
return nil return nil
} }
// sends a flush in order to send an empty payload
func (m *MessageStream) Flush(ctx context.Context) error { func (m *MessageStream) Flush(ctx context.Context) error {
err := m.mu.Acquire(ctx, 1) err := m.mu.Acquire(ctx, 1)
if err != nil { if err != nil {
...@@ -110,6 +111,14 @@ type BatchWriter struct { ...@@ -110,6 +111,14 @@ type BatchWriter struct {
isNotFirst bool isNotFirst bool
} }
type writer struct {
w io.Writer
}
func (w *writer) Write(p []byte) (n int, err error) {
return w.w.Write(p)
}
// Start writing a batch to the stream. this function acquires the lock // Start writing a batch to the stream. this function acquires the lock
// caller MUST call Close() on the BatchWriter iff err == nil // caller MUST call Close() on the BatchWriter iff err == nil
func (m *MessageStream) NewBatch(ctx context.Context) (*BatchWriter, error) { func (m *MessageStream) NewBatch(ctx context.Context) (*BatchWriter, error) {
...@@ -129,7 +138,7 @@ func (m *MessageStream) NewBatch(ctx context.Context) (*BatchWriter, error) { ...@@ -129,7 +138,7 @@ func (m *MessageStream) NewBatch(ctx context.Context) (*BatchWriter, error) {
return &BatchWriter{ return &BatchWriter{
w: m.w, w: m.w,
ms: &MessageStream{ ms: &MessageStream{
w: io.MultiWriter(m.w), w: &writer{m.w},
}, },
mu: m.mu, mu: m.mu,
}, nil }, nil
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment