good morning!!!!

Skip to content
Snippets Groups Projects
client.go 4.25 KiB
Newer Older
a's avatar
a committed
package http
a's avatar
rpc
a committed

import (
a's avatar
ok  
a committed
	"bytes"
a's avatar
rpc
a committed
	"context"
	"encoding/json"
	"errors"
a's avatar
a committed
	"fmt"
	"io"
a's avatar
ok  
a committed
	"net/http"
a's avatar
a committed
	"sync"
a's avatar
rpc
a committed
	"sync/atomic"

a's avatar
a committed
	"gfx.cafe/open/jrpc/pkg/codec"

a's avatar
a committed
	"gfx.cafe/util/go/bufpool"
Garet Halliday's avatar
Garet Halliday committed

	"gfx.cafe/open/jrpc/pkg/clientutil"
a's avatar
rpc
a committed
)

var (
	ErrClientQuit                = errors.New("client is closed")
	ErrNoResult                  = errors.New("no result in JSON-RPC response")
	ErrSubscriptionQueueOverflow = errors.New("subscription queue overflow")
	errClientReconnected         = errors.New("client reconnected")
	errDead                      = errors.New("connection lost")
)

a's avatar
a committed
var _ codec.Conn = (*Client)(nil)
a's avatar
a committed

a's avatar
rpc
a committed
// Client represents a connection to an RPC server.
type Client struct {
a's avatar
ok  
a committed
	remote string
	c      *http.Client
a's avatar
rpc
a committed

a's avatar
ok  
a committed
	id atomic.Int64
a's avatar
a committed

	headers http.Header
a's avatar
a committed

a's avatar
a committed
	m       codec.Middlewares
a's avatar
a committed
	handler codec.Handler
a's avatar
a committed
	mu      sync.RWMutex
a's avatar
a committed
}

a's avatar
a committed
func (c *Client) Mount(h codec.Middleware) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.m = append(c.m, h)
	c.handler = c.m.HandlerFunc(func(w codec.ResponseWriter, r *codec.Request) {
		// do nothing on no handler
	})
a's avatar
a committed
}

func DialHTTP(target string) (*Client, error) {
	return Dial(nil, http.DefaultClient, target)
a's avatar
rpc
a committed
}

a's avatar
ok  
a committed
func Dial(ctx context.Context, client *http.Client, target string) (*Client, error) {
a's avatar
a committed
	if client == nil {
		client = http.DefaultClient
	}
a's avatar
a committed
	return &Client{remote: target, c: client, headers: http.Header{}}, nil
}

func (c *Client) SetHeader(key string, value string) {
a's avatar
a committed
	c.mu.Lock()
	defer c.mu.Unlock()
a's avatar
a committed
	c.headers.Set(key, value)
a's avatar
rpc
a committed
}

a's avatar
ok  
a committed
func (c *Client) Do(ctx context.Context, result any, method string, params any) error {
a's avatar
a committed
	req, err := codec.NewRequest(ctx, codec.NewId(c.id.Add(1)), method, params)
	if err != nil {
		return err
	}
a's avatar
a committed
	resp, err := c.post(req)
a's avatar
a committed
	if err != nil {
		return err
	}
a's avatar
ok  
a committed
	defer resp.Body.Close()
a's avatar
a committed
	if resp.StatusCode != 200 {
		b, _ := io.ReadAll(resp.Body)
		return &codec.HTTPError{
			StatusCode: resp.StatusCode,
			Status:     resp.Status,
			Body:       b,
		}
	}
a's avatar
a committed
	msg := clientutil.GetMessage()
	defer clientutil.PutMessage(msg)
a's avatar
ok  
a committed
	err = json.NewDecoder(resp.Body).Decode(&msg)
	if err != nil {
a's avatar
a committed
		return fmt.Errorf("decode json: %w", err)
a's avatar
ok  
a committed
	}
	if msg.Error != nil {
a's avatar
a committed
		return msg.Error
a's avatar
ok  
a committed
	}
a's avatar
a committed
	if result != nil && len(msg.Result) > 0 {
a's avatar
a committed
		err = json.Unmarshal(msg.Result, result)
a's avatar
ok  
a committed
		if err != nil {
			return err
		}
a's avatar
a committed
	}
a's avatar
ok  
a committed
	return nil
a's avatar
a committed
}

a's avatar
a committed
func (c *Client) Notify(ctx context.Context, method string, params any) error {
a's avatar
a committed
	req, err := codec.NewRequest(ctx, nil, method, params)
a's avatar
a committed
	if err != nil {
		return err
	}
a's avatar
a committed
	resp, err := c.post(req)
a's avatar
rpc
a committed
	if err != nil {
		return err
	}
a's avatar
a committed
	resp.Body.Close()
a's avatar
ok  
a committed
	return err
a's avatar
a committed
}

a's avatar
a committed
func (c *Client) BatchCall(ctx context.Context, b ...*codec.BatchElem) error {
	reqs := make([]*codec.Request, len(b))
Garet Halliday's avatar
Garet Halliday committed
	ids := make(map[int]int, len(b))
	for idx, v := range b {
a's avatar
a committed
		var rid *codec.ID
a's avatar
ok  
a committed
		if v.IsNotification {
a's avatar
a committed
		} else {
a's avatar
ok  
a committed
			id := int(c.id.Add(1))
Garet Halliday's avatar
Garet Halliday committed
			ids[idx] = id
a's avatar
a committed
			rid = codec.NewNumberIDPtr(int64(id))
		}
		req, err := codec.NewRequest(ctx, rid, v.Method, v.Params)
		if err != nil {
			return err
a's avatar
a committed
		}
a's avatar
a committed
		reqs = append(reqs, req)
a's avatar
a committed
	}
a's avatar
a committed
	dat, err := json.Marshal(reqs)
a's avatar
ok  
a committed
	if err != nil {
a's avatar
rpc
a committed
		return err
	}
a's avatar
a committed
	resp, err := c.postBuf(ctx, bytes.NewBuffer(dat))
a's avatar
rpc
a committed
	if err != nil {
a's avatar
ok  
a committed
		return err
a's avatar
rpc
a committed
	}
a's avatar
ok  
a committed
	defer resp.Body.Close()
a's avatar
rpc
a committed

a's avatar
ok  
a committed
	msgs := []*codec.Message{}
a's avatar
a committed
	for i := 0; i < len(ids); i++ {
		msg := clientutil.GetMessage()
		defer clientutil.PutMessage(msg)
		msgs = append(msgs, msg)
	}
a's avatar
ok  
a committed
	err = json.NewDecoder(resp.Body).Decode(&msgs)
a's avatar
rpc
a committed
	if err != nil {
		return err
	}
a's avatar
a committed
	clientutil.FillBatch(ids, msgs, b)
a's avatar
ok  
a committed
	return nil
a's avatar
rpc
a committed
}

a's avatar
ok  
a committed
func (c *Client) Close() error {
	return nil
a's avatar
rpc
a committed
}
a's avatar
a committed

func (c *Client) Closed() <-chan struct{} {
	return make(chan struct{})
}

func (c *Client) post(req *codec.Request) (*http.Response, error) {
	// TODO: use buffer for this
	buf := bufpool.GetStd()
	defer bufpool.PutStd(buf)
	buf.Reset()
	err := json.NewEncoder(buf).Encode(req)
	if err != nil {
		return nil, err
	}
	return c.postBuf(req.Context(), buf)
}

func (c *Client) postBuf(ctx context.Context, rd io.Reader) (*http.Response, error) {
	if ctx == nil {
		ctx = context.Background()
	}
	hreq, err := http.NewRequestWithContext(ctx, http.MethodPost, c.remote, rd)
	if err != nil {
		return nil, err
	}
	func() {
		c.mu.RLock()
		defer c.mu.RUnlock()
		for k, v := range c.headers {
			for _, vv := range v {
				hreq.Header.Add(k, vv)
			}
		}
	}()
	hreq.Header.Add("Content-Type", "application/json")
	return c.c.Do(hreq)
}