good morning!!!!

Skip to content
Snippets Groups Projects
codec.go 2.41 KiB
Newer Older
a's avatar
a committed
package rdwr
a's avatar
ipc
a committed

import (
	"bufio"
a's avatar
a committed
	"bytes"
a's avatar
ipc
a committed
	"context"
	"io"
Garet Halliday's avatar
Garet Halliday committed
	"sync"
a's avatar
ipc
a committed

a's avatar
a committed
	"github.com/goccy/go-json"
Garet Halliday's avatar
Garet Halliday committed

	"gfx.cafe/open/jrpc/pkg/codec"
a's avatar
a committed
	"gfx.cafe/open/jrpc/pkg/serverutil"
a's avatar
ipc
a committed
)

type Codec struct {
	ctx context.Context
	cn  func()

Garet Halliday's avatar
Garet Halliday committed
	rd     io.Reader
	wrLock sync.Mutex
a's avatar
a committed
	wr     *bytes.Buffer
	w      io.Writer
a's avatar
a committed
	msgs   chan *serverutil.Bundle
a's avatar
ipc
a committed
}

a's avatar
a committed
func NewCodec(rd io.Reader, wr io.Writer, onError func(error)) *Codec {
a's avatar
ipc
a committed
	ctx, cn := context.WithCancel(context.TODO())
a's avatar
a committed
	c := &Codec{
a's avatar
ipc
a committed
		ctx:  ctx,
		cn:   cn,
		rd:   bufio.NewReader(rd),
a's avatar
a committed
		wr:   new(bytes.Buffer),
		w:    wr,
a's avatar
a committed
		msgs: make(chan *serverutil.Bundle, 8),
a's avatar
ipc
a committed
	}
a's avatar
a committed
	go func() {
		err := c.listen()
		if err != nil && onError != nil {
			onError(err)
		}
	}()
a's avatar
a committed
	return c
}

a's avatar
a committed
func (c *Codec) listen() error {
a's avatar
a committed
	var msg json.RawMessage
a's avatar
ok  
a committed
	dec := json.NewDecoder(c.rd)
a's avatar
a committed
	for {
a's avatar
a committed
		// reading a message
a's avatar
ok  
a committed
		err := dec.Decode(&msg)
a's avatar
a committed
		if err != nil {
			c.cn()
a's avatar
a committed
			return err
a's avatar
a committed
		}
a's avatar
a committed
		c.msgs <- serverutil.ParseBundle(msg)
		msg = msg[:0]
a's avatar
a committed
	}
a's avatar
ipc
a committed
}

// PeerInfo describes the remote side of this codec. The transport is reported
// as "ipc"; the remote address and the HTTP details are left empty.
func (c *Codec) PeerInfo() codec.PeerInfo {
	var info codec.PeerInfo
	info.Transport = "ipc"
	info.RemoteAddr = ""
	info.HTTP = codec.HttpInfo{}
	return info
}

a's avatar
a committed
func (c *Codec) ReadBatch(ctx context.Context) ([]*codec.Message, bool, error) {
a's avatar
ipc
a committed
	select {
	case ans := <-c.msgs:
a's avatar
a committed
		return ans.Messages, ans.Batch, nil
a's avatar
ipc
a committed
	case <-ctx.Done():
a's avatar
a committed
		return nil, false, ctx.Err()
a's avatar
ipc
a committed
	case <-c.ctx.Done():
a's avatar
a committed
		return nil, false, c.ctx.Err()
a's avatar
ipc
a committed
	}
}

// Close shuts the codec down by invoking its cancel function, which marks the
// codec's context as done. It always returns nil.
func (c *Codec) Close() error {
	cancel := c.cn
	cancel()
	return nil
}

func (c *Codec) Write(p []byte) (n int, err error) {
Garet Halliday's avatar
Garet Halliday committed
	c.wrLock.Lock()
	defer c.wrLock.Unlock()
a's avatar
ipc
a committed
	return c.wr.Write(p)
}

func (c *Codec) Flush() (err error) {
Garet Halliday's avatar
Garet Halliday committed
	c.wrLock.Lock()
	defer c.wrLock.Unlock()
a's avatar
a committed
	if c.wr.Len() == 0 {
		return nil
	}
	defer c.wr.Reset()
	err = c.wr.WriteByte('\n')
	if err != nil {
		return err
	}
	_, err = c.wr.WriteTo(c.w)
	if err != nil {
		return err
a's avatar
a committed
	}
	return nil
a's avatar
ipc
a committed
}

// Closed exposes the done channel of the codec's context. The channel is
// closed once that context has been cancelled.
func (c *Codec) Closed() <-chan struct{} {
	done := c.ctx.Done()
	return done
}

// RemoteAddr reports the peer address of the connection. This codec has none
// to report, so the result is always the empty string.
func (c *Codec) RemoteAddr() string {
	const noAddr = ""
	return noAddr
}

a's avatar
a committed
// Dialrdwr attaches an in-process connection to the given RPC server.
// func Dialrdwr(handler *Server) *Client {
//	initctx := context.Background()
//	c, _ := newClient(initctx, func(context.Context) (ServerCodec, error) {
//		p1, p2 := net.Pipe()
//		go handler.ServeCodec(NewCodec(p1))
//		return NewCodec(p2), nil
//	})
//	return c
// }