// Package http implements a jrpc codec that reads a request from, and writes
// its reply to, a single net/http request/response pair.
package http

import (
	"bufio"
	"context"
	"encoding/base64"
	"errors"
	"fmt"
	"io"
	"mime"
	"net/http"
	"net/url"
	"strings"

	"gfx.cafe/open/jrpc/pkg/codec"
	"gfx.cafe/open/jrpc/pkg/serverutil"
)

// Compile-time assertion that *Codec satisfies codec.ReaderWriter.
var _ codec.ReaderWriter = (*Codec)(nil)

// Reusable codec. use Reset()
a's avatar
a committed
type Codec struct {
	ctx context.Context
	cn  func()

a's avatar
a committed
	r     *http.Request
	w     http.ResponseWriter
	wr    *bufio.Writer
a's avatar
a committed
	msgs  chan *serverutil.Bundle
a's avatar
a committed
	errCh chan httpError

	i codec.PeerInfo
}

// httpError pairs an error with the HTTP status code that should be sent to
// the client when that error is reported.
type httpError struct {
	code int   // HTTP status code for the response
	err  error // underlying failure; its message becomes the response body
}

func NewCodec(w http.ResponseWriter, r *http.Request) *Codec {
a's avatar
ok  
a committed
	c := &Codec{}
	c.Reset(w, r)
	return c
}

func (c *Codec) Reset(w http.ResponseWriter, r *http.Request) {
a's avatar
a committed
	ir := io.Writer(w)
	if w == nil {
		ir = io.Discard
	}
a's avatar
ok  
a committed
	c.r = r
	c.w = w
	if c.wr == nil {
		c.wr = bufio.NewWriter(ir)
	} else {
		c.wr.Reset(ir)
a's avatar
a committed
	}
a's avatar
ok  
a committed
	c.msgs = make(chan *serverutil.Bundle, 1)
	c.errCh = make(chan httpError, 1)

	ctx := c.r.Context()
a's avatar
a committed
	c.ctx, c.cn = context.WithCancel(ctx)
	c.peerInfo()
	c.doRead()
a's avatar
a committed
}
func (c *Codec) peerInfo() {
	c.i.Transport = "http"
	c.i.RemoteAddr = c.r.RemoteAddr
	c.i.HTTP = codec.HttpInfo{
		Version:   c.r.Proto,
		UserAgent: c.r.UserAgent(),
		Host:      c.r.Host,
		Headers:   c.r.Header.Clone(),
a's avatar
a committed
	}
a's avatar
a committed
	c.i.HTTP.Origin = c.r.Header.Get("X-Real-Ip")
	if c.i.HTTP.Origin == "" {
		c.i.HTTP.Origin = c.r.Header.Get("X-Forwarded-For")
a's avatar
a committed
	}
a's avatar
a committed
	if c.i.HTTP.Origin == "" {
		c.i.HTTP.Origin = c.r.Header.Get("Origin")
a's avatar
a committed
	}
a's avatar
a committed
	if c.i.HTTP.Origin == "" {
		c.i.HTTP.Origin = c.r.RemoteAddr
a's avatar
a committed
	}
a's avatar
a committed
}

// gets the peer info
func (c *Codec) PeerInfo() codec.PeerInfo {
	return c.i
a's avatar
a committed
}

func (r *Codec) doReadGet() (msg *serverutil.Bundle, err error) {
a's avatar
a committed
	method_up := r.r.URL.Query().Get("method")
a's avatar
a committed
	if method_up == "" {
		method_up = r.r.URL.Path
	}
a's avatar
a committed
	params, _ := url.QueryUnescape(r.r.URL.Query().Get("params"))
	param := []byte(params)
	if pb, err := base64.URLEncoding.DecodeString(params); err == nil {
		param = pb
	}
	id := r.r.URL.Query().Get("id")
	if id == "" {
		id = "1"
	}
a's avatar
a committed
	return &serverutil.Bundle{
		Messages: []*codec.Message{{
			ID:     codec.NewId(id),
			Method: method_up,
			Params: param,
		}},
		Batch: false,
	}, nil
a's avatar
a committed
}

func (r *Codec) doReadRPC() (msg *serverutil.Bundle, err error) {
a's avatar
a committed
	method_up := r.r.URL.Query().Get("method")
	if method_up == "" {
		method_up = r.r.URL.Path
	}
	id := r.r.URL.Query().Get("id")
	if id == "" {
		id = "1"
	}
	data, err := io.ReadAll(r.r.Body)
	if err != nil {
		return nil, err
	}
a's avatar
a committed
	return &serverutil.Bundle{
		Messages: []*codec.Message{{
			ID:     codec.NewId(id),
			Method: method_up,
			Params: data,
		}},
		Batch: false,
	}, nil
}

func (r *Codec) doReadPost() (msg *serverutil.Bundle, err error) {
	data, err := io.ReadAll(r.r.Body)
	if err != nil {
		return nil, err
	}
	return serverutil.ParseBundle(data), nil
a's avatar
a committed
}

// validateRequest returns a non-zero response code and error message if the
// request is invalid.
func ValidateRequest(r *http.Request) (int, error) {
	if r.Method == http.MethodPut || r.Method == http.MethodDelete {
		return http.StatusMethodNotAllowed, errors.New("method not allowed")
a's avatar
a committed
	}
a's avatar
a committed
	if r.ContentLength > maxRequestContentLength {
		err := fmt.Errorf("content length too large (%d>%d)", r.ContentLength, maxRequestContentLength)
		return http.StatusRequestEntityTooLarge, err
a's avatar
a committed
	}
a's avatar
a committed
	// Allow OPTIONS (regardless of content-type)
	if r.Method == http.MethodOptions {
		return 0, nil
	}
	// Check content-type
	if mt, _, err := mime.ParseMediaType(r.Header.Get("content-type")); err == nil {
		for _, accepted := range acceptedContentTypes {
			if accepted == mt {
				return 0, nil
			}
		}
a's avatar
a committed
	}
a's avatar
a committed
	// Invalid content-type ignored for now
	return 0, nil
	//err := fmt.Errorf("invalid content type, only %s is supported", contentType)
	//return http.StatusUnsupportedMediaType, err
}

func (c *Codec) doRead() {
	code, err := ValidateRequest(c.r)
a's avatar
a committed
	if err != nil {
a's avatar
a committed
		c.errCh <- httpError{
			code: code,
			err:  err,
		}
a's avatar
a committed
		return
	}
a's avatar
a committed
	go func() {
a's avatar
a committed
		var data *serverutil.Bundle
a's avatar
a committed
		// TODO: implement eventsource
a's avatar
a committed
		switch strings.ToUpper(c.r.Method) {
a's avatar
a committed
		case http.MethodGet:
			data, err = c.doReadGet()
a's avatar
a committed
		case "RPC":
			data, err = c.doReadRPC()
a's avatar
a committed
		case http.MethodPost:
a's avatar
a committed
			data, err = c.doReadPost()
a's avatar
a committed
		}
		if err != nil {
			c.errCh <- httpError{
				code: http.StatusInternalServerError,
				err:  err,
			}
			return
		}
		c.msgs <- data
	}()
a's avatar
a committed
}

func (c *Codec) ReadBatch(ctx context.Context) ([]*codec.Message, bool, error) {
a's avatar
a committed
	select {
	case ans := <-c.msgs:
a's avatar
a committed
		return ans.Messages, ans.Batch, nil
a's avatar
a committed
	case err := <-c.errCh:
		http.Error(c.w, err.err.Error(), err.code)
a's avatar
a committed
		return nil, false, err.err
a's avatar
a committed
	case <-ctx.Done():
a's avatar
a committed
		return nil, false, ctx.Err()
a's avatar
a committed
	case <-c.ctx.Done():
a's avatar
a committed
		return nil, false, c.ctx.Err()
a's avatar
a committed
	}
}

// Close cancels the codec's context, which closes the channel returned by
// Closed. It always returns nil.
func (c *Codec) Close() error {
	cancel := c.cn
	cancel()
	return nil
}

func (c *Codec) Write(p []byte) (n int, err error) {
a's avatar
a committed
	return c.wr.Write(p)
}

func (c *Codec) Flush() (err error) {
	err = c.wr.Flush()
	if err != nil {
		return err
	}
	c.cn()
	return
a's avatar
a committed
}

// Closed exposes the done channel of the codec's context. That channel is
// closed once the context is cancelled (Close, or a successful Flush) or
// when the request context it was derived from ends.
func (c *Codec) Closed() <-chan struct{} {
	done := c.ctx.Done()
	return done
}

// RemoteAddr returns the peer address of the connection.
func (c *Codec) RemoteAddr() string {
a's avatar
a committed
	return c.r.RemoteAddr
a's avatar
a committed
}