diff --git a/lib/auth/md5/md5.go b/lib/auth/md5/md5.go index a721bd5cd1977c3c42b951f19041ddfd8ff7a2be..bd715eba7b525c929d8476f7fdca9ab7470bdbc7 100644 --- a/lib/auth/md5/md5.go +++ b/lib/auth/md5/md5.go @@ -4,6 +4,8 @@ import ( "crypto/md5" "encoding/hex" "strings" + + "pggat2/lib/util/slices" ) func Encode(username, password string, salt [4]byte) string { @@ -18,10 +20,7 @@ func Encode(username, password string, salt [4]byte) string { hash.Write(hexEncoded) hash.Write(salt[:]) sum2 := hash.Sum(nil) - hexEncoded = hexEncoded[:0] - for i := 0; i < hex.EncodedLen(len(sum2)); i++ { - hexEncoded = append(hexEncoded, 0) - } + hexEncoded = slices.Resize(hexEncoded, hex.EncodedLen(len(sum2))) hex.Encode(hexEncoded, sum2) var out strings.Builder diff --git a/lib/auth/sasl/client.go b/lib/auth/sasl/client.go index 211fb7f6eef5c8d179de53064642d0b9ef7c5bf1..3310cd60ecad2e27faf33191a853e07a9588021a 100644 --- a/lib/auth/sasl/client.go +++ b/lib/auth/sasl/client.go @@ -19,7 +19,7 @@ func NewClient(mechanisms []string, username, password string) (Client, error) { for _, mechanism := range mechanisms { switch mechanism { case scram.SHA256: - return scram.NewClient(mechanism, username, password) + return scram.MakeClient(mechanism, username, password) } } return nil, ErrMechanismsNotSupported diff --git a/lib/auth/sasl/scram/client.go b/lib/auth/sasl/scram/client.go index a73f5ecaa6e03d1f99ea9fac966bf644e5f66ebb..61d0dfff2f3a30efb04de72a02a906b0f564954b 100644 --- a/lib/auth/sasl/scram/client.go +++ b/lib/auth/sasl/scram/client.go @@ -9,36 +9,36 @@ type Client struct { conversation *scram.ClientConversation } -func NewClient(mechanism, username, password string) (*Client, error) { +func MakeClient(mechanism, username, password string) (Client, error) { var generator scram.HashGeneratorFcn switch mechanism { case SHA256: generator = scram.SHA256 default: - return nil, ErrUnsupportedMethod + return Client{}, ErrUnsupportedMethod } client, err := generator.NewClient(username, password, "") if err != nil { - return nil, err + return Client{}, err } - return &Client{ + return Client{ name: mechanism, conversation: client.NewConversation(), }, nil } -func (T *Client) Name() string { +func (T Client) Name() string { return T.name } -func (T *Client) InitialResponse() []byte { +func (T Client) InitialResponse() []byte { return nil } -func (T *Client) Continue(bytes []byte) ([]byte, error) { +func (T Client) Continue(bytes []byte) ([]byte, error) { msg, err := T.conversation.Step(string(bytes)) if err != nil { return nil, err @@ -46,7 +46,7 @@ func (T *Client) Continue(bytes []byte) ([]byte, error) { return []byte(msg), nil } -func (T *Client) Final(bytes []byte) error { +func (T Client) Final(bytes []byte) error { _, err := T.Continue(bytes) return err } diff --git a/lib/auth/sasl/scram/server.go b/lib/auth/sasl/scram/server.go index 135ae7cd28a5a6b29c1eb3e7d06c30dd79d4b563..93da1e912da7233b0e5bf7788708e8c202e195f9 100644 --- a/lib/auth/sasl/scram/server.go +++ b/lib/auth/sasl/scram/server.go @@ -10,19 +10,19 @@ type Server struct { conversation *scram.ServerConversation } -func NewServer(mechanism, username, password string) (*Server, error) { +func MakeServer(mechanism, username, password string) (Server, error) { var generator scram.HashGeneratorFcn switch mechanism { case SHA256: generator = scram.SHA256 default: - return nil, ErrUnsupportedMethod + return Server{}, ErrUnsupportedMethod } client, err := generator.NewClient(username, password, "") if err != nil { - return nil, err + return Server{}, err } kf := scram.KeyFactors{ @@ -36,19 +36,19 @@ func NewServer(mechanism, username, password string) (*Server, error) { }, ) if err != nil { - return nil, err + return Server{}, err } - return &Server{ + return Server{ conversation: server.NewConversation(), }, nil } -func (T *Server) InitialResponse(bytes []byte) ([]byte, bool, error) { +func (T Server) InitialResponse(bytes []byte) ([]byte, bool, error) { return T.Continue(bytes) } -func (T *Server) Continue(bytes []byte) ([]byte, bool, error) { +func (T Server) Continue(bytes []byte) ([]byte, bool, error) { msg, err := T.conversation.Step(string(bytes)) if err != nil { return nil, false, err diff --git a/lib/auth/sasl/server.go b/lib/auth/sasl/server.go index f46624f8c0f57631898cbfaa31d2ecd85043f9fb..c834f839a929cbfdbebcb7000c7576b02bb9808d 100644 --- a/lib/auth/sasl/server.go +++ b/lib/auth/sasl/server.go @@ -10,7 +10,7 @@ type Server interface { func NewServer(mechanism, username, password string) (Server, error) { switch mechanism { case scram.SHA256: - return scram.NewServer(mechanism, username, password) + return scram.MakeServer(mechanism, username, password) default: return nil, ErrMechanismsNotSupported } diff --git a/lib/pnet/packet/in.go b/lib/pnet/packet/in.go index bc98448c31b2a95a51f23bd7ecc601f3828efc84..7d4094b404e09b734b95dce49439095a9d6fe275 100644 --- a/lib/pnet/packet/in.go +++ b/lib/pnet/packet/in.go @@ -164,3 +164,12 @@ func (T In) Bytes(b []byte) bool { T.buf.pos += len(b) return true } + +// UnsafeBytes returns a byte slice without copying. Use this only if you plan to be done with the slice when the In is reset or the data will be replaced with garbage. +func (T In) UnsafeBytes(count int) ([]byte, bool) { + rem := T.Remaining() + if count > len(rem) { + return nil, false + } + return rem[:count], true +} diff --git a/lib/pnet/packet/packets/v3.0/saslinitialresponse.go b/lib/pnet/packet/packets/v3.0/saslinitialresponse.go index 68815d52de4aeb563c1ee733d267eb6ce5616b3e..75cb65040980d0084de544ed36c2a2c0416e9982 100644 --- a/lib/pnet/packet/packets/v3.0/saslinitialresponse.go +++ b/lib/pnet/packet/packets/v3.0/saslinitialresponse.go @@ -22,11 +22,7 @@ func ReadSASLInitialResponse(in packet.In) (mechanism string, initialResponse [] return } - initialResponse = make([]byte, int(initialResponseSize)) - ok = in.Bytes(initialResponse[:]) - if !ok { - return - } + initialResponse, ok = in.UnsafeBytes(int(initialResponseSize)) return } diff --git a/lib/rob/schedulers/v2/pool/pool.go b/lib/rob/schedulers/v2/pool/pool.go index d5cf5db032e6a50d0f39c707054602eb1399ce94..d4f24031578bfb18be1961b0198eeda99d0eb2ea 100644 --- a/lib/rob/schedulers/v2/pool/pool.go +++ b/lib/rob/schedulers/v2/pool/pool.go @@ -108,17 +108,9 @@ func (T *Pool) StealFor(id uuid.UUID) { if s == q { continue } - works, ok := s.queue.Steal(q.constraints) - if !ok { - continue - } - if len(works) > 0 { - source := works[0].Source + if source, ok := s.queue.Steal(q.constraints, q.queue); ok { T.affinity[source] = id + break } - for _, work := range works { - q.queue.Queue(work) - } - break } } diff --git a/lib/rob/schedulers/v2/queue/queue.go b/lib/rob/schedulers/v2/queue/queue.go index 803b0104e375518149b400880f3b6e962959e6b0..7147779265a2ab3f63c15617f92a128af5465990 100644 --- a/lib/rob/schedulers/v2/queue/queue.go +++ b/lib/rob/schedulers/v2/queue/queue.go @@ -53,6 +53,10 @@ func (T *Queue) Queue(work job.Job) { T.mu.Lock() defer T.mu.Unlock() + T.queue(work) +} + +func (T *Queue) queue(work job.Job) { // try to schedule right away if ok := T.scheduleWork(work); ok { return @@ -117,31 +121,40 @@ func (T *Queue) scheduleWork(work job.Job) bool { } // Steal work from this Sink that is satisfied by constraints -func (T *Queue) Steal(constraints rob.Constraints) ([]job.Job, bool) { +func (T *Queue) Steal(constraints rob.Constraints, dst *Queue) (uuid.UUID, bool) { + if T == dst { + // cannot steal from ourselves + return uuid.Nil, false + } + T.mu.Lock() defer T.mu.Unlock() for stride, work, ok := T.scheduled.Min(); ok; stride, work, ok = T.scheduled.Next(stride) { if constraints.Satisfies(work.Constraints) { + source := work.Source + + dst.mu.Lock() + defer dst.mu.Unlock() + // steal it T.scheduled.Delete(stride) + dst.queue(work) + // steal pending pending, _ := T.pending[work.Source] - jobs := make([]job.Job, 0, pending.Length()+1) - jobs = append(jobs, work) - for work, ok = pending.PopFront(); ok; work, ok = pending.PopFront() { - jobs = append(jobs, work) + dst.queue(work) } - return jobs, true + return source, true } } // no stealable work - return nil, false + return uuid.Nil, false } func (T *Queue) Ready() <-chan struct{} {