good morning!!!!

Skip to content
Snippets Groups Projects
responsewriter.go 1.57 KiB
Newer Older
a's avatar
a committed
package server

import (
a's avatar
ok  
a committed
	"context"
	"sync"
a's avatar
a committed

a's avatar
a committed
	"gfx.cafe/open/jrpc/pkg/jsonrpc"
a's avatar
a committed
)

a's avatar
a committed
// maxBatchSizeBytes is, going by its name, the byte limit applied to a batch.
// It should be more than enough for any batch: you shouldn't be batching more
// than this. Really, you shouldn't be using batching at all.
//
// NOTE(review): this constant was originally described as "16mb", but the
// value is 16 << 30 bytes, i.e. 16 GiB. The value is deliberately left
// unchanged here; confirm which of the two was intended.
//
// TODO: make this configurable
const maxBatchSizeBytes = 16 << 30

a's avatar
a committed
// Compile-time assertion that *streamingRespWriter satisfies jsonrpc.ResponseWriter.
var _ jsonrpc.ResponseWriter = (*streamingRespWriter)(nil)
a's avatar
a committed

a's avatar
a committed
// streamingRespWriter is NOT thread safe
type streamingRespWriter struct {
a's avatar
ok  
a committed
	cr  *callResponder
a's avatar
a committed
	msg *jsonrpc.Message
a's avatar
ok  
a committed
	ctx context.Context
a's avatar
a committed

a's avatar
a committed
	err error
a's avatar
a committed

a's avatar
ok  
a committed
	sendCalled bool

	mu sync.Mutex
a's avatar
a committed
}

a's avatar
a committed
func (c *streamingRespWriter) Send(v any, e error) (err error) {
a's avatar
ok  
a committed
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.msg.ID == nil {
a's avatar
a committed
		return jsonrpc.ErrCantSendNotification
a's avatar
ok  
a committed
	}
	if c.sendCalled {
a's avatar
a committed
		return jsonrpc.ErrSendAlreadyCalled
a's avatar
ok  
a committed
	}
	c.sendCalled = true
a's avatar
a committed
	ce := &callEnv{
a's avatar
a committed
		err: c.err,
		id:  c.msg.ID,
a's avatar
a committed
	}
	// only override error if not already set
	if ce.err == nil {
		ce.err = e
a's avatar
ok  
a committed
	}
a's avatar
a committed
	// only set value if value is not nil
	if v != nil {
		ce.v = v
a's avatar
ok  
a committed
	}
	err = c.cr.mu.Acquire(c.ctx, 1)
	if err != nil {
		return err
	}
	defer c.cr.mu.Release(1)
a's avatar
a committed
	if c.err != nil {
		e = c.err
	}
a's avatar
a committed
	if err = c.cr.send(c.ctx, ce); err != nil {
a's avatar
a committed
		return err
	}
a's avatar
a committed
	if err = c.cr.remote.Flush(); err != nil {
a's avatar
ok  
a committed
		return err
a's avatar
a committed
	}
	return nil
}

a's avatar
a committed
func (c *streamingRespWriter) Notify(method string, v any) error {
a's avatar
ok  
a committed
	err := c.cr.mu.Acquire(c.ctx, 1)
	if err != nil {
		return err
	}
	defer c.cr.mu.Release(1)
	err = c.cr.notify(c.ctx, &notifyEnv{
a's avatar
a committed
		method: method,
		dat:    v,
a's avatar
a  
a committed
	})
a's avatar
ok  
a committed
	if err != nil {
		return err
	}
	err = c.cr.remote.Flush()
	if err != nil {
		return err
	}
	return nil
a's avatar
a committed
}