diff --git a/README.md b/README.md index 5dddf84ad8fa0d88a82e2e1aa5219bf0ece013ed..e967cd8af6f956b84e2cf07deddd6fd1e084b542 100644 --- a/README.md +++ b/README.md @@ -34,6 +34,8 @@ go get nhooyr.io/websocket For a production quality example that demonstrates the complete API, see the [echo example](https://godoc.org/nhooyr.io/websocket#example-package--Echo). +For a full stack example, see [./chat-example](./chat-example). + ### Server ```go diff --git a/chat-example/README.md b/chat-example/README.md new file mode 100644 index 0000000000000000000000000000000000000000..ef06275db3bd1bb6ff2c8c85033e144a2b892965 --- /dev/null +++ b/chat-example/README.md @@ -0,0 +1,27 @@ +# Chat Example + +This directory contains a full stack example of a simple chat webapp using nhooyr.io/websocket. + +```bash +$ cd chat-example +$ go run . localhost:0 +listening on http://127.0.0.1:51055 +``` + +Visit the printed URL to submit and view broadcasted messages in a browser. + + + +## Structure + +The frontend is contained in `index.html`, `index.js` and `index.css`. It sets up the +DOM with a scrollable div at the top that is populated with new messages as they are broadcast. +At the bottom it adds a form to submit messages. +The messages are received via the WebSocket `/subscribe` endpoint and published via +the HTTP POST `/publish` endpoint. + +The server portion is `main.go` and `chat.go` and implements serving the static frontend +assets, the `/subscribe` WebSocket endpoint and the HTTP POST `/publish` endpoint. + +The code is well commented. I would recommend starting in `main.go` and then `chat.go` followed by +`index.html` and then `index.js`. diff --git a/chat-example/chat.go b/chat-example/chat.go new file mode 100644 index 0000000000000000000000000000000000000000..e6e355d04688bd37f7c2495a5483c794d4453a55 --- /dev/null +++ b/chat-example/chat.go @@ -0,0 +1,128 @@ +package main + +import ( + "context" + "errors" + "io" + "io/ioutil" + "log" + "net/http" + "sync" + "time" + + "nhooyr.io/websocket" +) + +// chatServer enables broadcasting to a set of subscribers. +type chatServer struct { + subscribersMu sync.RWMutex + subscribers map[chan<- []byte]struct{} +} + +// subscribeHandler accepts the WebSocket connection and then subscribes +// it to all future messages. +func (cs *chatServer) subscribeHandler(w http.ResponseWriter, r *http.Request) { + c, err := websocket.Accept(w, r, nil) + if err != nil { + log.Print(err) + return + } + defer c.Close(websocket.StatusInternalError, "") + + err = cs.subscribe(r.Context(), c) + if errors.Is(err, context.Canceled) { + return + } + if websocket.CloseStatus(err) == websocket.StatusNormalClosure || + websocket.CloseStatus(err) == websocket.StatusGoingAway { + return + } + if err != nil { + log.Print(err) + } +} + +// publishHandler reads the request body with a limit of 8192 bytes and then publishes +// the received message. +func (cs *chatServer) publishHandler(w http.ResponseWriter, r *http.Request) { + if r.Method != "POST" { + http.Error(w, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed) + return + } + body := io.LimitReader(r.Body, 8192) + msg, err := ioutil.ReadAll(body) + if err != nil { + http.Error(w, http.StatusText(http.StatusRequestEntityTooLarge), http.StatusRequestEntityTooLarge) + return + } + + cs.publish(msg) +} + +// subscribe subscribes the given WebSocket to all broadcast messages. +// It creates a msgs chan with a buffer of 16 to give some room to slower +// connections and then registers it. It then listens for all messages +// and writes them to the WebSocket. If the context is cancelled or +// an error occurs, it returns and deletes the subscription. +// +// It uses CloseRead to keep reading from the connection to process control +// messages and cancel the context if the connection drops. +func (cs *chatServer) subscribe(ctx context.Context, c *websocket.Conn) error { + ctx = c.CloseRead(ctx) + + msgs := make(chan []byte, 16) + cs.addSubscriber(msgs) + defer cs.deleteSubscriber(msgs) + + for { + select { + case msg := <-msgs: + err := writeTimeout(ctx, time.Second*5, c, msg) + if err != nil { + return err + } + case <-ctx.Done(): + return ctx.Err() + } + } +} + +// publish publishes the msg to all subscribers. +// It never blocks and so messages to slow subscribers +// are dropped. +func (cs *chatServer) publish(msg []byte) { + cs.subscribersMu.RLock() + defer cs.subscribersMu.RUnlock() + + for c := range cs.subscribers { + select { + case c <- msg: + default: + } + } +} + +// addSubscriber registers a subscriber with a channel +// on which to send messages. +func (cs *chatServer) addSubscriber(msgs chan<- []byte) { + cs.subscribersMu.Lock() + if cs.subscribers == nil { + cs.subscribers = make(map[chan<- []byte]struct{}) + } + cs.subscribers[msgs] = struct{}{} + cs.subscribersMu.Unlock() +} + +// deleteSubscriber deletes the subscriber with the given msgs channel. +func (cs *chatServer) deleteSubscriber(msgs chan []byte) { + cs.subscribersMu.Lock() + delete(cs.subscribers, msgs) + cs.subscribersMu.Unlock() +} + +func writeTimeout(ctx context.Context, timeout time.Duration, c *websocket.Conn, msg []byte) error { + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + return c.Write(ctx, websocket.MessageText, msg) +} diff --git a/chat-example/go.mod b/chat-example/go.mod new file mode 100644 index 0000000000000000000000000000000000000000..34fa5a69cef29a336862718fdc82ad48057f8704 --- /dev/null +++ b/chat-example/go.mod @@ -0,0 +1,5 @@ +module nhooyr.io/websocket/example-chat + +go 1.13 + +require nhooyr.io/websocket v1.8.2 diff --git a/chat-example/go.sum b/chat-example/go.sum new file mode 100644 index 0000000000000000000000000000000000000000..0755fca5eb20b3bfae3ab488ab2a6a723352f490 --- /dev/null +++ b/chat-example/go.sum @@ -0,0 +1,12 @@ +github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo= +github.com/gobwas/pool v0.2.0/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw= +github.com/gobwas/ws v1.0.2/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM= +github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/klauspost/compress v1.10.0 h1:92XGj1AcYzA6UrVdd4qIIBrT8OroryvRvdmg/IfmC7Y= +github.com/klauspost/compress v1.10.0/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= +golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +nhooyr.io/websocket v1.8.2 h1:LwdzfyyOZKtVFoXay6A39Acu03KmidSZ3YUUvPa13PA= +nhooyr.io/websocket v1.8.2/go.mod h1:LiqdCg1Cu7TPWxEvPjPa0TGYxCsy4pHNTN9gGluwBpQ= diff --git a/chat-example/index.css b/chat-example/index.css new file mode 100644 index 0000000000000000000000000000000000000000..2980466285849e233c95bfa0a2c9c2339a185e1c --- /dev/null +++ b/chat-example/index.css @@ -0,0 +1,81 @@ +body { + width: 100vw; + min-width: 320px; +} + +#root { + padding: 40px 20px; + max-width: 480px; + margin: auto; + height: 100vh; + + display: flex; + flex-direction: column; + align-items: center; + justify-content: center; +} + +#root > * + * { + margin: 20px 0 0 0; +} + +/* 100vh on safari does not include the bottom bar. */ +@supports (-webkit-overflow-scrolling: touch) { + #root { + height: 85vh; + } +} + +#message-log { + width: 100%; + flex-grow: 1; + overflow: auto; +} + +#message-log p:first-child { + margin: 0; +} + +#message-log > * + * { + margin: 10px 0 0 0; +} + +#publish-form-container { + width: 100%; +} + +#publish-form { + width: 100%; + display: flex; + height: 40px; +} + +#publish-form > * + * { + margin: 0 0 0 10px; +} + +#publish-form input[type="text"] { + flex-grow: 1; + + -moz-appearance: none; + -webkit-appearance: none; + word-break: normal; + border-radius: 5px; + border: 1px solid #ccc; +} + +#publish-form input[type="submit"] { + color: white; + background-color: black; + border-radius: 5px; + padding: 5px 10px; + border: none; +} + +#publish-form input[type="submit"]:hover { + background-color: red; +} + +#publish-form input[type="submit"]:active { + background-color: red; +} diff --git a/chat-example/index.html b/chat-example/index.html new file mode 100644 index 0000000000000000000000000000000000000000..76ae8370149c2415bfd4925bacc9b886b3b89710 --- /dev/null +++ b/chat-example/index.html @@ -0,0 +1,25 @@ +<!DOCTYPE html> +<html lang="en-CA"> + <head> + <meta charset="UTF-8" /> + <title>nhooyr.io/websocket - Chat Example</title> + <meta name="viewport" content="width=device-width" /> + + <link href="https://unpkg.com/sanitize.css" rel="stylesheet" /> + <link href="https://unpkg.com/sanitize.css/typography.css" rel="stylesheet" /> + <link href="https://unpkg.com/sanitize.css/forms.css" rel="stylesheet" /> + <link href="/index.css" rel="stylesheet" /> + </head> + <body> + <div id="root"> + <div id="message-log"></div> + <div id="publish-form-container"> + <form id="publish-form"> + <input name="message" id="message-input" type="text" /> + <input value="Submit" type="submit" /> + </form> + </div> + </div> + <script type="text/javascript" src="/index.js"></script> + </body> +</html> diff --git a/chat-example/index.js b/chat-example/index.js new file mode 100644 index 0000000000000000000000000000000000000000..8fb3dfb8a6521c9fa4e64cd663556e17ea250ce1 --- /dev/null +++ b/chat-example/index.js @@ -0,0 +1,62 @@ +;(() => { + // expectingMessage is set to true + // if the user has just submitted a message + // and so we should scroll the next message into view when received. + let expectingMessage = false + function dial() { + const conn = new WebSocket(`ws://${location.host}/subscribe`) + + conn.addEventListener("close", ev => { + console.info("websocket disconnected, reconnecting in 1000ms", ev) + setTimeout(dial, 1000) + }) + conn.addEventListener("open", ev => { + console.info("websocket connected") + }) + + // This is where we handle messages received. + conn.addEventListener("message", ev => { + if (typeof ev.data !== "string") { + console.error("unexpected message type", typeof ev.data) + return + } + const p = appendLog(ev.data) + if (expectingMessage) { + p.scrollIntoView() + expectingMessage = false + } + }) + } + dial() + + const messageLog = document.getElementById("message-log") + const publishForm = document.getElementById("publish-form") + const messageInput = document.getElementById("message-input") + + // appendLog appends the passed text to messageLog. + function appendLog(text) { + const p = document.createElement("p") + // Adding a timestamp to each message makes the log easier to read. + p.innerText = `${new Date().toLocaleTimeString()}: ${text}` + messageLog.append(p) + return p + } + appendLog("Submit a message to get started!") + + // onsubmit publishes the message from the user when the form is submitted. + publishForm.onsubmit = ev => { + ev.preventDefault() + + const msg = messageInput.value + if (msg === "") { + return + } + messageInput.value = "" + + expectingMessage = true + fetch("/publish", { + method: "POST", + body: msg, + }) + } +})() diff --git a/chat-example/main.go b/chat-example/main.go new file mode 100644 index 0000000000000000000000000000000000000000..2a5209244445ee7443866418115635fdede8bc48 --- /dev/null +++ b/chat-example/main.go @@ -0,0 +1,47 @@ +package main + +import ( + "errors" + "log" + "net" + "net/http" + "os" + "time" +) + +func main() { + log.SetFlags(0) + + err := run() + if err != nil { + log.Fatal(err) + } +} + +// run initializes the chatServer and routes and then +// starts a http.Server for the passed in address. +func run() error { + if len(os.Args) < 2 { + return errors.New("please provide an address to listen on as the first argument") + } + + l, err := net.Listen("tcp", os.Args[1]) + if err != nil { + return err + } + log.Printf("listening on http://%v", l.Addr()) + + var ws chatServer + + m := http.NewServeMux() + m.Handle("/", http.FileServer(http.Dir("."))) + m.HandleFunc("/subscribe", ws.subscribeHandler) + m.HandleFunc("/publish", ws.publishHandler) + + s := http.Server{ + Handler: m, + ReadTimeout: time.Second * 10, + WriteTimeout: time.Second * 10, + } + return s.Serve(l) +} diff --git a/ci/fmt.mk b/ci/fmt.mk index f313562c52a618cb9906c8c50f879cd58e4a887b..3512d02fda4caaccf5c78ae2fbe1e49ba8232a90 100644 --- a/ci/fmt.mk +++ b/ci/fmt.mk @@ -13,7 +13,7 @@ goimports: gen goimports -w "-local=$$(go list -m)" . prettier: - prettier --write --print-width=120 --no-semi --trailing-comma=all --loglevel=warn $$(git ls-files "*.yml" "*.md") + prettier --write --print-width=120 --no-semi --trailing-comma=all --loglevel=warn $$(git ls-files "*.yml" "*.md" "*.js" "*.css" "*.html") gen: stringer -type=opcode,MessageType,StatusCode -output=stringer.go diff --git a/conn_test.go b/conn_test.go index a7bfba0a00136c2f0e2a7402d8761d65894edbfb..64e6736fd334174196c29df92e718d3f1d9ae1b7 100644 --- a/conn_test.go +++ b/conn_test.go @@ -329,7 +329,7 @@ func newConnTest(t testing.TB, dialOpts *websocket.DialOptions, acceptOpts *webs } t.Helper() - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) tt = &connTest{t: t, ctx: ctx} tt.appendDone(cancel)