Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package inproc
import (
"bufio"
"context"
"encoding/json"
"io"
"gfx.cafe/open/jrpc/pkg/codec"
)
type Codec struct {
ctx context.Context
cn func()
rd io.Reader
wr *bufio.Writer
msgs chan json.RawMessage
}
func NewCodec(rd io.Reader, wr io.Writer) *Codec {
ctx, cn := context.WithCancel(context.TODO())
return &Codec{
ctx: ctx,
cn: cn,
rd: bufio.NewReader(rd),
wr: bufio.NewWriter(wr),
msgs: make(chan json.RawMessage, 8),
}
}
// gets the peer info
func (c *Codec) PeerInfo() codec.PeerInfo {
return codec.PeerInfo{
Transport: "ipc",
RemoteAddr: "",
HTTP: codec.HttpInfo{},
}
}
// json.RawMessage can be an array of requests. if it is, then it is a batch request
func (c *Codec) ReadBatch(ctx context.Context) (msgs json.RawMessage, err error) {
select {
case ans := <-c.msgs:
return ans, nil
case <-ctx.Done():
return nil, ctx.Err()
case <-c.ctx.Done():
return nil, c.ctx.Err()
}
}
// closes the connection
func (c *Codec) Close() error {
c.cn()
return nil
}
func (c *Codec) Write(p []byte) (n int, err error) {
return c.wr.Write(p)
}
func (c *Codec) Flush() (err error) {
return c.wr.Flush()
}
// Closed returns a channel which is closed when the connection is closed.
func (c *Codec) Closed() <-chan struct{} {
return c.ctx.Done()
}
// RemoteAddr returns the peer address of the connection.
func (c *Codec) RemoteAddr() string {
return ""
}
// DialInProc attaches an in-process connection to the given RPC server.
//func DialInProc(handler *Server) *Client {
// initctx := context.Background()
// c, _ := newClient(initctx, func(context.Context) (ServerCodec, error) {
// p1, p2 := net.Pipe()
// go handler.ServeCodec(NewCodec(p1))
// return NewCodec(p2), nil
// })
// return c
//}