good morning!!!!

Skip to content
Snippets Groups Projects
Commit 04e236d2 authored by a's avatar a
Browse files

updates mayve:

parent 1be5f024
Branches
Tags v0.0.7
No related merge requests found
...@@ -74,6 +74,10 @@ func (s *Stream) Sub(sb Subscriber) string { ...@@ -74,6 +74,10 @@ func (s *Stream) Sub(sb Subscriber) string {
return o.id return o.id
} }
func (s *Stream) Unsubscribe(id string) {
s.subs.Delete(id)
}
func (s *Stream) run() { func (s *Stream) run() {
go s.run_update() go s.run_update()
go s.run_send() go s.run_send()
......
...@@ -54,7 +54,7 @@ type cmd struct { ...@@ -54,7 +54,7 @@ type cmd struct {
Topic string `json:"topic"` Topic string `json:"topic"`
Patch gjson.RawMessage `json:"patch"` Patch gjson.RawMessage `json:"patch"`
Seq int `json:"seq"` Seq int `json:"seq"`
} } `json:"params"`
} }
func (ws *WebsocketClient) connect() (err error) { func (ws *WebsocketClient) connect() (err error) {
...@@ -134,8 +134,13 @@ func (w *Receiver) Seq() int { ...@@ -134,8 +134,13 @@ func (w *Receiver) Seq() int {
return w.seq return w.seq
} }
type sub_cmd struct {
Method string `json:"method"`
Params []string `json:"params"`
}
func (w *Receiver) subscribe() error { func (w *Receiver) subscribe() error {
return w.c.c.WriteJSON(map[string]any{"method": "topic_subscribe", "params": []string{w.topic}}) return w.c.c.WriteJSON(sub_cmd{Method: "topic_subscribe", Params: []string{w.topic}})
} }
func (w *Receiver) OnPatch(seq int, diffs []byte) error { func (w *Receiver) OnPatch(seq int, diffs []byte) error {
...@@ -159,7 +164,7 @@ func (w *Receiver) OnPatchRaw(seq int, diffs gjson.RawMessage) error { ...@@ -159,7 +164,7 @@ func (w *Receiver) OnPatchRaw(seq int, diffs gjson.RawMessage) error {
} }
func (w *Receiver) Close() { func (w *Receiver) Close() {
w.c.c.WriteJSON(map[string]any{"topic": "topic_unsubscribe", "params": []string{w.topic}}) w.c.c.WriteJSON(sub_cmd{Method: "topic_unsubscribe", Params: []string{w.topic}})
w.c.Lock() w.c.Lock()
defer w.c.Unlock() defer w.c.Unlock()
delete(w.c.Streams, w.topic) delete(w.c.Streams, w.topic)
......
...@@ -4,8 +4,8 @@ import ( ...@@ -4,8 +4,8 @@ import (
"context" "context"
"log" "log"
"net/http" "net/http"
"sync"
"gfx.cafe/open/goutil/generic"
"gfx.cafe/open/jsonpatch" "gfx.cafe/open/jsonpatch"
"gfx.cafe/open/jsonrpc2" "gfx.cafe/open/jsonrpc2"
"gfx.cafe/open/jsonrpc2/external/websocket" "gfx.cafe/open/jsonrpc2/external/websocket"
...@@ -14,10 +14,8 @@ import ( ...@@ -14,10 +14,8 @@ import (
var DefaultUpgrader = websocket.Upgrader{} // use default options var DefaultUpgrader = websocket.Upgrader{} // use default options
type WebsocketServer struct { type WebsocketServer struct {
Streams map[string]*Stream Streams generic.MapOf[string, *Stream]
u websocket.Upgrader u websocket.Upgrader
mu sync.RWMutex
} }
func NewWssServer(upgrader ...websocket.Upgrader) *WebsocketServer { func NewWssServer(upgrader ...websocket.Upgrader) *WebsocketServer {
...@@ -26,7 +24,6 @@ func NewWssServer(upgrader ...websocket.Upgrader) *WebsocketServer { ...@@ -26,7 +24,6 @@ func NewWssServer(upgrader ...websocket.Upgrader) *WebsocketServer {
u = upgrader[0] u = upgrader[0]
} }
return &WebsocketServer{ return &WebsocketServer{
Streams: make(map[string]*Stream),
u: u, u: u,
} }
} }
...@@ -43,6 +40,11 @@ func (ws *WebsocketServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { ...@@ -43,6 +40,11 @@ func (ws *WebsocketServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return return
} }
defer c.Close() defer c.Close()
ws.handleClient(c, w, r)
}
func (ws *WebsocketServer) handleClient(c *websocket.Conn, w http.ResponseWriter, r *http.Request) {
routes := map[string]string{}
for { for {
_, message, err := c.ReadMessage() _, message, err := c.ReadMessage()
if err != nil { if err != nil {
...@@ -62,27 +64,31 @@ func (ws *WebsocketServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { ...@@ -62,27 +64,31 @@ func (ws *WebsocketServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
continue continue
} }
switch cmd.Method { switch cmd.Method {
case "close":
break
case "topic_subscribe": case "topic_subscribe":
for _, v := range cmd.Params { for _, v := range cmd.Params {
ws.mu.RLock() if st, ok := ws.Streams.Load(v); ok {
if st, ok := ws.Streams[v]; ok {
ws.mu.RUnlock()
sub := NewWssSubscriber(r.Context(), v, c) sub := NewWssSubscriber(r.Context(), v, c)
st.Sub(sub) xid_str := st.Sub(sub)
} else { routes[v] = xid_str
ws.mu.RUnlock()
} }
} }
case "topic_unsubscribe": case "topic_unsubscribe":
for _, v := range cmd.Params {
if id, ok := routes[v]; ok {
if st, ok := ws.Streams.Load(v); ok {
st.Unsubscribe(id)
}
}
}
} }
} }
} }
func ServerRegister[T any](w *WebsocketServer, route string, v T) *TStream[T] { func ServerRegister[T any](w *WebsocketServer, route string, v T) *TStream[T] {
o := New() o := New()
w.mu.Lock() w.Streams.Store(route, o)
defer w.mu.Unlock()
w.Streams[route] = o
return &TStream[T]{ return &TStream[T]{
o, o,
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment