good morning!!!!

Skip to content
Snippets Groups Projects
Commit 6d92fdc0 authored by Bas van Kervel's avatar Bas van Kervel
Browse files

added support for batch requests

parent 22c7ce01
No related branches found
No related tags found
No related merge requests found
......@@ -12,7 +12,7 @@ type Codec int
// (de)serialization support for rpc interface
type ApiCoder interface {
// Parse message to request from underlying stream
ReadRequest() (*shared.Request, error)
ReadRequest() ([]*shared.Request, bool, error)
// Parse response message from underlying stream
ReadResponse() (interface{}, error)
// Encode response to encoded form in underlying stream
......
......@@ -8,33 +8,53 @@ import (
)
const (
MAX_RESPONSE_SIZE = 64 * 1024
MAX_REQUEST_SIZE = 1024 * 1024
MAX_RESPONSE_SIZE = 1024 * 1024
)
// Json serialization support
type JsonCodec struct {
c net.Conn
d *json.Decoder
e *json.Encoder
buffer []byte
bytesInBuffer int
}
// Create new JSON coder instance
func NewJsonCoder(conn net.Conn) ApiCoder {
return &JsonCodec{
c: conn,
d: json.NewDecoder(conn),
e: json.NewEncoder(conn),
buffer: make([]byte, MAX_REQUEST_SIZE),
bytesInBuffer: 0,
}
}
// Serialize obj to JSON and write it to conn
func (self *JsonCodec) ReadRequest() (*shared.Request, error) {
req := shared.Request{}
err := self.d.Decode(&req)
func (self *JsonCodec) ReadRequest() (requests []*shared.Request, isBatch bool, err error) {
n, err := self.c.Read(self.buffer[self.bytesInBuffer:])
if err != nil {
self.bytesInBuffer = 0
return nil, false, err
}
self.bytesInBuffer += n
singleRequest := shared.Request{}
err = json.Unmarshal(self.buffer[:self.bytesInBuffer], &singleRequest)
if err == nil {
return &req, nil
self.bytesInBuffer = 0
requests := make([]*shared.Request, 1)
requests[0] = &singleRequest
return requests, false, nil
}
return nil, err
requests = make([]*shared.Request, 0)
err = json.Unmarshal(self.buffer[:self.bytesInBuffer], &requests)
if err == nil {
self.bytesInBuffer = 0
return requests, true, nil
}
return nil, false, err
}
func (self *JsonCodec) ReadResponse() (interface{}, error) {
......@@ -66,7 +86,24 @@ func (self *JsonCodec) Encode(msg interface{}) ([]byte, error) {
// Parse JSON data from conn to obj
func (self *JsonCodec) WriteResponse(res interface{}) error {
return self.e.Encode(&res)
data, err := json.Marshal(res)
if err != nil {
self.c.Close()
return err
}
bytesWritten := 0
for bytesWritten < len(data) {
n, err := self.c.Write(data[bytesWritten:])
if err != nil {
self.c.Close()
return err
}
bytesWritten += n
}
return nil
}
// Close decoder and encoder
......
......@@ -47,7 +47,7 @@ func handle(conn net.Conn, api shared.EthereumApi, c codec.Codec) {
codec := c.New(conn)
for {
req, err := codec.ReadRequest()
requests, isBatch, err := codec.ReadRequest()
if err == io.EOF {
codec.Close()
return
......@@ -57,15 +57,35 @@ func handle(conn net.Conn, api shared.EthereumApi, c codec.Codec) {
return
}
var rpcResponse interface{}
res, err := api.Execute(req)
if isBatch {
responses := make([]*interface{}, len(requests))
responseCount := 0
for _, req := range requests {
res, err := api.Execute(req)
if req.Id != nil {
rpcResponse := shared.NewRpcResponse(req.Id, req.Jsonrpc, res, err)
responses[responseCount] = rpcResponse
responseCount += 1
}
}
rpcResponse = shared.NewRpcResponse(req.Id, req.Jsonrpc, res, err)
err = codec.WriteResponse(rpcResponse)
if err != nil {
glog.V(logger.Error).Infof("comms send err - %v\n", err)
codec.Close()
return
err = codec.WriteResponse(responses[:responseCount])
if err != nil {
glog.V(logger.Error).Infof("comms send err - %v\n", err)
codec.Close()
return
}
} else {
var rpcResponse interface{}
res, err := api.Execute(requests[0])
rpcResponse = shared.NewRpcResponse(requests[0].Id, requests[0].Jsonrpc, res, err)
err = codec.WriteResponse(rpcResponse)
if err != nil {
glog.V(logger.Error).Infof("comms send err - %v\n", err)
codec.Close()
return
}
}
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment