good morning!!!!

Skip to content
Snippets Groups Projects
Verified Commit 76c77c44 authored by a's avatar a
Browse files

race

parent 4d1e6fd3
No related branches found
No related tags found
1 merge request!20mod
Pipeline #29762 failed
...@@ -3,7 +3,6 @@ package broker ...@@ -3,7 +3,6 @@ package broker
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"log"
"strings" "strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
...@@ -66,13 +65,10 @@ func (b *ChannelBroker) SetDroppedMessageHandler(fn func(string, []byte)) *Chann ...@@ -66,13 +65,10 @@ func (b *ChannelBroker) SetDroppedMessageHandler(fn func(string, []byte)) *Chann
} }
func (b *ChannelBroker) ReadRequest(ctx context.Context) (json.RawMessage, func(json.RawMessage) error, error) { func (b *ChannelBroker) ReadRequest(ctx context.Context) (json.RawMessage, func(json.RawMessage) error, error) {
log.Println("start recv")
select { select {
case <-ctx.Done(): case <-ctx.Done():
log.Println("doned")
return nil, nil, ctx.Err() return nil, nil, ctx.Err()
case f := <-b.msgs: case f := <-b.msgs:
log.Println("recv", string(f.data))
return f.data, func(resp json.RawMessage) error { return f.data, func(resp json.RawMessage) error {
return b.Publish(context.Background(), f.topic, resp) return b.Publish(context.Background(), f.topic, resp)
}, nil }, nil
...@@ -84,7 +80,6 @@ func (b *ChannelBroker) WriteRequest(ctx context.Context, topic string, msg json ...@@ -84,7 +80,6 @@ func (b *ChannelBroker) WriteRequest(ctx context.Context, topic string, msg json
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err()
case b.msgs <- &frame{data: msg, topic: topic}: case b.msgs <- &frame{data: msg, topic: topic}:
log.Println("wrote", string(msg))
} }
return nil return nil
} }
......
...@@ -25,6 +25,6 @@ func (s *Server) ServeSpoke(ctx context.Context, stream ServerSpoke) { ...@@ -25,6 +25,6 @@ func (s *Server) ServeSpoke(ctx context.Context, stream ServerSpoke) {
continue continue
} }
cd := NewCodec(req, fn) cd := NewCodec(req, fn)
s.Server.ServeCodec(ctx, cd) go s.Server.ServeCodec(ctx, cd)
} }
} }
...@@ -26,6 +26,8 @@ func (s *Set[T]) Remove(x T) { ...@@ -26,6 +26,8 @@ func (s *Set[T]) Remove(x T) {
} }
func (s *Set[T]) Each(fn func(x T) bool) { func (s *Set[T]) Each(fn func(x T) bool) {
s.mu.RLock()
defer s.mu.RUnlock()
for k := range s.m { for k := range s.m {
if !fn(k) { if !fn(k) {
return return
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment