diff --git a/miner/worker.go b/miner/worker.go
index 9fb248efa2c18e2b8561149e0285dd9d3fd8adea..daabd3db559126b6e1f3ed0788c2a724b44f6f4a 100644
--- a/miner/worker.go
+++ b/miner/worker.go
@@ -201,7 +201,7 @@ func (self *worker) wait() {
 				}
 				self.mux.Post(core.NewMinedBlockEvent{block})
 
-				glog.V(logger.Info).Infof("🔨 Mined block #%v", block.Number())
+				glog.V(logger.Info).Infof("🔨  Mined block #%v", block.Number())
 
 				jsonlogger.LogJson(&logger.EthMinerNewBlock{
 					BlockHash:     block.Hash().Hex(),
diff --git a/p2p/server.go b/p2p/server.go
index eaffc9d13873895470351f359196c02e8ff92429..b5c4a1f59df1f98b3ab399d7e15bf0a1f7eac9b8 100644
--- a/p2p/server.go
+++ b/p2p/server.go
@@ -359,9 +359,11 @@ func (srv *Server) dialLoop() {
 					rand.Read(target[:])
 					findresults <- srv.ntab.Lookup(target)
 				}()
-				refresh.Stop()
+			} else {
+				// Make sure we check again if the peer count falls
+				// below MaxPeers.
+				refresh.Reset(refreshPeersInterval)
 			}
-
 		case dest := <-srv.peerConnect:
 			dial(dest)
 		case dests := <-findresults:
@@ -371,7 +373,10 @@ func (srv *Server) dialLoop() {
 			refresh.Reset(refreshPeersInterval)
 		case dest := <-dialed:
 			delete(dialing, dest.ID)
-
+			if len(dialing) == 0 {
+				// Check again immediately after dialing all current candidates.
+				refresh.Reset(0)
+			}
 		case <-srv.quit:
 			// TODO: maybe wait for active dials
 			return
diff --git a/rpc/api.go b/rpc/api.go
index 4b61fa3a58e184b5e2c6bb7fc762b6c512d6e9c7..bf5066f9a44c840808093f96d342ac87f26224a9 100644
--- a/rpc/api.go
+++ b/rpc/api.go
@@ -182,8 +182,8 @@ func (api *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) err
 		if err != nil {
 			return err
 		}
-
-		*reply = v
+		// TODO unwrap the parent method's ToHex call
+		*reply = newHexData(common.FromHex(v))
 	case "eth_flush":
 		return NewNotImplementedError(req.Method)
 	case "eth_getBlockByHash":
diff --git a/ui/qt/qwhisper/whisper.go b/ui/qt/qwhisper/whisper.go
index 3c2d0a4b91686fae40db3006a27d50ef20224975..50b0626f56682378ceb852ea710de94446b7e2e8 100644
--- a/ui/qt/qwhisper/whisper.go
+++ b/ui/qt/qwhisper/whisper.go
@@ -41,7 +41,7 @@ func (self *Whisper) Post(payload []string, to, from string, topics []string, pr
 			TTL:    time.Duration(ttl) * time.Second,
 			To:     crypto.ToECDSAPub(common.FromHex(to)),
 			From:   key,
-			Topics: whisper.TopicsFromString(topics...),
+			Topics: whisper.NewTopicsFromStrings(topics...),
 		})
 
 		if err != nil {
@@ -106,7 +106,7 @@ func filterFromMap(opts map[string]interface{}) (f whisper.Filter) {
 	if topicList, ok := opts["topics"].(*qml.List); ok {
 		var topics []string
 		topicList.Convert(&topics)
-		f.Topics = whisper.TopicsFromString(topics...)
+		f.Topics = whisper.NewTopicsFromStrings(topics...)
 	}
 
 	return
diff --git a/whisper/envelope.go b/whisper/envelope.go
index f35a40a42e0df9ab09446ed2abb0f142b300b9f7..0a817e26efbabec2ab8930e75859467d3ba93c68 100644
--- a/whisper/envelope.go
+++ b/whisper/envelope.go
@@ -20,16 +20,16 @@ import (
 type Envelope struct {
 	Expiry uint32 // Whisper protocol specifies int32, really should be int64
 	TTL    uint32 // ^^^^^^
-	Topics [][]byte
+	Topics []Topic
 	Data   []byte
 	Nonce  uint32
 
-	hash common.Hash
+	hash common.Hash // Cached hash of the envelope to avoid rehashing every time
 }
 
 // NewEnvelope wraps a Whisper message with expiration and destination data
 // included into an envelope for network forwarding.
-func NewEnvelope(ttl time.Duration, topics [][]byte, msg *Message) *Envelope {
+func NewEnvelope(ttl time.Duration, topics []Topic, msg *Message) *Envelope {
 	return &Envelope{
 		Expiry: uint32(time.Now().Add(ttl).Unix()),
 		TTL:    uint32(ttl.Seconds()),
@@ -59,16 +59,6 @@ func (self *Envelope) Seal(pow time.Duration) {
 	}
 }
 
-// valid checks whether the claimed proof of work was indeed executed.
-// TODO: Is this really useful? Isn't this always true?
-func (self *Envelope) valid() bool {
-	d := make([]byte, 64)
-	copy(d[:32], self.rlpWithoutNonce())
-	binary.BigEndian.PutUint32(d[60:], self.Nonce)
-
-	return common.FirstBitSet(common.BigD(crypto.Sha3(d))) > 0
-}
-
 // rlpWithoutNonce returns the RLP encoded envelope contents, except the nonce.
 func (self *Envelope) rlpWithoutNonce() []byte {
 	enc, _ := rlp.EncodeToBytes([]interface{}{self.Expiry, self.TTL, self.Topics, self.Data})
@@ -85,20 +75,19 @@ func (self *Envelope) Open(key *ecdsa.PrivateKey) (msg *Message, err error) {
 	}
 	data = data[1:]
 
-	if message.Flags&128 == 128 {
-		if len(data) < 65 {
-			return nil, fmt.Errorf("unable to open envelope. First bit set but len(data) < 65")
+	if message.Flags&signatureFlag == signatureFlag {
+		if len(data) < signatureLength {
+			return nil, fmt.Errorf("unable to open envelope. First bit set but len(data) < len(signature)")
 		}
-		message.Signature, data = data[:65], data[65:]
+		message.Signature, data = data[:signatureLength], data[signatureLength:]
 	}
 	message.Payload = data
 
-	// Short circuit if the encryption was requested
+	// Decrypt the message, if requested
 	if key == nil {
 		return message, nil
 	}
-	// Otherwise try to decrypt the message
-	message.Payload, err = crypto.Decrypt(key, message.Payload)
+	err = message.decrypt(key)
 	switch err {
 	case nil:
 		return message, nil
diff --git a/whisper/filter.go b/whisper/filter.go
index b33f2c1a259ca475e3cec773fee7b3a8979c6920..8fcc45afd2b5ca2a33f8e7bbed0136c5bdc4c7d2 100644
--- a/whisper/filter.go
+++ b/whisper/filter.go
@@ -1,10 +1,13 @@
+// Contains the message filter for fine grained subscriptions.
+
 package whisper
 
 import "crypto/ecdsa"
 
+// Filter is used to subscribe to specific types of whisper messages.
 type Filter struct {
-	To     *ecdsa.PublicKey
-	From   *ecdsa.PublicKey
-	Topics [][]byte
-	Fn     func(*Message)
+	To     *ecdsa.PublicKey // Recipient of the message
+	From   *ecdsa.PublicKey // Sender of the message
+	Topics []Topic          // Topics to watch messages on
+	Fn     func(*Message)   // Handler in case of a match
 }
diff --git a/whisper/main.go b/whisper/main.go
index 422f0fa3bc86919d35384b535605b0adf373708c..3c8c3801f7fb03a5ec0c5c93206aa747144b2eb4 100644
--- a/whisper/main.go
+++ b/whisper/main.go
@@ -69,10 +69,10 @@ func selfSend(shh *whisper.Whisper, payload []byte) error {
 	})
 	// Wrap the payload and encrypt it
 	msg := whisper.NewMessage(payload)
-	envelope, err := msg.Wrap(whisper.DefaultProofOfWork, whisper.Options{
+	envelope, err := msg.Wrap(whisper.DefaultPoW, whisper.Options{
 		From: id,
 		To:   &id.PublicKey,
-		TTL:  whisper.DefaultTimeToLive,
+		TTL:  whisper.DefaultTTL,
 	})
 	if err != nil {
 		return fmt.Errorf("failed to seal message: %v", err)
diff --git a/whisper/message.go b/whisper/message.go
index 2666ee6e009b4ba1d203bb64768d011fee613115..07c67356773295a656ecd6adbfc9b280557d2b2d 100644
--- a/whisper/message.go
+++ b/whisper/message.go
@@ -30,13 +30,14 @@ type Options struct {
 	From   *ecdsa.PrivateKey
 	To     *ecdsa.PublicKey
 	TTL    time.Duration
-	Topics [][]byte
+	Topics []Topic
 }
 
 // NewMessage creates and initializes a non-signed, non-encrypted Whisper message.
 func NewMessage(payload []byte) *Message {
-	// Construct an initial flag set: bit #1 = 0 (no signature), rest random
-	flags := byte(rand.Intn(128))
+	// Construct an initial flag set: no signature, rest random
+	flags := byte(rand.Intn(256))
+	flags &= ^signatureFlag
 
 	// Assemble and return the message
 	return &Message{
@@ -61,7 +62,7 @@ func NewMessage(payload []byte) *Message {
 func (self *Message) Wrap(pow time.Duration, options Options) (*Envelope, error) {
 	// Use the default TTL if non was specified
 	if options.TTL == 0 {
-		options.TTL = DefaultTimeToLive
+		options.TTL = DefaultTTL
 	}
 	// Sign and encrypt the message if requested
 	if options.From != nil {
@@ -84,7 +85,7 @@ func (self *Message) Wrap(pow time.Duration, options Options) (*Envelope, error)
 // sign calculates and sets the cryptographic signature for the message , also
 // setting the sign flag.
 func (self *Message) sign(key *ecdsa.PrivateKey) (err error) {
-	self.Flags |= 1 << 7
+	self.Flags |= signatureFlag
 	self.Signature, err = crypto.Sign(self.hash(), key)
 	return
 }
@@ -93,6 +94,11 @@ func (self *Message) sign(key *ecdsa.PrivateKey) (err error) {
 func (self *Message) Recover() *ecdsa.PublicKey {
 	defer func() { recover() }() // in case of invalid signature
 
+	// Short circuit if no signature is present
+	if self.Signature == nil {
+		return nil
+	}
+	// Otherwise try and recover the signature
 	pub, err := crypto.SigToPub(self.hash(), self.Signature)
 	if err != nil {
 		glog.V(logger.Error).Infof("Could not get public key from signature: %v", err)
@@ -102,8 +108,14 @@ func (self *Message) Recover() *ecdsa.PublicKey {
 }
 
 // encrypt encrypts a message payload with a public key.
-func (self *Message) encrypt(to *ecdsa.PublicKey) (err error) {
-	self.Payload, err = crypto.Encrypt(to, self.Payload)
+func (self *Message) encrypt(key *ecdsa.PublicKey) (err error) {
+	self.Payload, err = crypto.Encrypt(key, self.Payload)
+	return
+}
+
+// decrypt decrypts an encrypted payload with a private key.
+func (self *Message) decrypt(key *ecdsa.PrivateKey) (err error) {
+	self.Payload, err = crypto.Decrypt(key, self.Payload)
 	return
 }
 
diff --git a/whisper/message_test.go b/whisper/message_test.go
index 8d4c5e9907ba7b26e3a1b3207271169a382336e6..18a254e5c936316a704755f6ea489bea8dcfbb0d 100644
--- a/whisper/message_test.go
+++ b/whisper/message_test.go
@@ -13,11 +13,11 @@ func TestMessageSimpleWrap(t *testing.T) {
 	payload := []byte("hello world")
 
 	msg := NewMessage(payload)
-	if _, err := msg.Wrap(DefaultProofOfWork, Options{}); err != nil {
+	if _, err := msg.Wrap(DefaultPoW, Options{}); err != nil {
 		t.Fatalf("failed to wrap message: %v", err)
 	}
-	if msg.Flags&128 != 0 {
-		t.Fatalf("signature flag mismatch: have %d, want %d", (msg.Flags&128)>>7, 0)
+	if msg.Flags&signatureFlag != 0 {
+		t.Fatalf("signature flag mismatch: have %d, want %d", msg.Flags&signatureFlag, 0)
 	}
 	if len(msg.Signature) != 0 {
 		t.Fatalf("signature found for simple wrapping: 0x%x", msg.Signature)
@@ -36,13 +36,13 @@ func TestMessageCleartextSignRecover(t *testing.T) {
 	payload := []byte("hello world")
 
 	msg := NewMessage(payload)
-	if _, err := msg.Wrap(DefaultProofOfWork, Options{
+	if _, err := msg.Wrap(DefaultPoW, Options{
 		From: key,
 	}); err != nil {
 		t.Fatalf("failed to sign message: %v", err)
 	}
-	if msg.Flags&128 != 128 {
-		t.Fatalf("signature flag mismatch: have %d, want %d", (msg.Flags&128)>>7, 1)
+	if msg.Flags&signatureFlag != signatureFlag {
+		t.Fatalf("signature flag mismatch: have %d, want %d", msg.Flags&signatureFlag, signatureFlag)
 	}
 	if bytes.Compare(msg.Payload, payload) != 0 {
 		t.Fatalf("payload mismatch after signing: have 0x%x, want 0x%x", msg.Payload, payload)
@@ -69,14 +69,14 @@ func TestMessageAnonymousEncryptDecrypt(t *testing.T) {
 	payload := []byte("hello world")
 
 	msg := NewMessage(payload)
-	envelope, err := msg.Wrap(DefaultProofOfWork, Options{
+	envelope, err := msg.Wrap(DefaultPoW, Options{
 		To: &key.PublicKey,
 	})
 	if err != nil {
 		t.Fatalf("failed to encrypt message: %v", err)
 	}
-	if msg.Flags&128 != 0 {
-		t.Fatalf("signature flag mismatch: have %d, want %d", (msg.Flags&128)>>7, 0)
+	if msg.Flags&signatureFlag != 0 {
+		t.Fatalf("signature flag mismatch: have %d, want %d", msg.Flags&signatureFlag, 0)
 	}
 	if len(msg.Signature) != 0 {
 		t.Fatalf("signature found for anonymous message: 0x%x", msg.Signature)
@@ -104,15 +104,15 @@ func TestMessageFullCrypto(t *testing.T) {
 
 	payload := []byte("hello world")
 	msg := NewMessage(payload)
-	envelope, err := msg.Wrap(DefaultProofOfWork, Options{
+	envelope, err := msg.Wrap(DefaultPoW, Options{
 		From: fromKey,
 		To:   &toKey.PublicKey,
 	})
 	if err != nil {
 		t.Fatalf("failed to encrypt message: %v", err)
 	}
-	if msg.Flags&128 != 128 {
-		t.Fatalf("signature flag mismatch: have %d, want %d", (msg.Flags&128)>>7, 1)
+	if msg.Flags&signatureFlag != signatureFlag {
+		t.Fatalf("signature flag mismatch: have %d, want %d", msg.Flags&signatureFlag, signatureFlag)
 	}
 	if len(msg.Signature) == 0 {
 		t.Fatalf("no signature found for signed message")
diff --git a/whisper/peer.go b/whisper/peer.go
index 338166c25f3fe9e5d5d4b944e3ce68c1d347c2d0..e4301f37c302d439053f4f918db85637773d77ac 100644
--- a/whisper/peer.go
+++ b/whisper/peer.go
@@ -4,110 +4,160 @@ import (
 	"fmt"
 	"time"
 
+	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/p2p"
 	"github.com/ethereum/go-ethereum/rlp"
 	"gopkg.in/fatih/set.v0"
 )
 
-const (
-	protocolVersion uint64 = 0x02
-)
-
+// peer represents a whisper protocol peer connection.
 type peer struct {
 	host *Whisper
 	peer *p2p.Peer
 	ws   p2p.MsgReadWriter
 
-	// XXX Eventually this is going to reach exceptional large space. We need an expiry here
-	known *set.Set
+	known *set.Set // Messages already known by the peer to avoid wasting bandwidth
 
 	quit chan struct{}
 }
 
-func NewPeer(host *Whisper, p *p2p.Peer, ws p2p.MsgReadWriter) *peer {
-	return &peer{host, p, ws, set.New(), make(chan struct{})}
-}
-
-func (self *peer) init() error {
-	if err := self.handleStatus(); err != nil {
-		return err
+// newPeer creates and initializes a new whisper peer connection, returning either
+// the newly constructed link or a failure reason.
+func newPeer(host *Whisper, remote *p2p.Peer, rw p2p.MsgReadWriter) (*peer, error) {
+	p := &peer{
+		host:  host,
+		peer:  remote,
+		ws:    rw,
+		known: set.New(),
+		quit:  make(chan struct{}),
 	}
-
-	return nil
+	if err := p.handshake(); err != nil {
+		return nil, err
+	}
+	return p, nil
 }
 
+// start initiates the peer updater, periodically broadcasting the whisper packets
+// into the network.
 func (self *peer) start() {
 	go self.update()
 	self.peer.Debugln("whisper started")
 }
 
+// stop terminates the peer updater, stopping message forwarding to it.
 func (self *peer) stop() {
+	close(self.quit)
 	self.peer.Debugln("whisper stopped")
+}
 
-	close(self.quit)
+// handshake sends the protocol initiation status message to the remote peer and
+// verifies the remote status too.
+func (self *peer) handshake() error {
+	// Send the handshake status message asynchronously
+	errc := make(chan error, 1)
+	go func() {
+		errc <- p2p.SendItems(self.ws, statusCode, protocolVersion)
+	}()
+	// Fetch the remote status packet and verify protocol match
+	packet, err := self.ws.ReadMsg()
+	if err != nil {
+		return err
+	}
+	if packet.Code != statusCode {
+		return fmt.Errorf("peer sent %x before status packet", packet.Code)
+	}
+	s := rlp.NewStream(packet.Payload)
+	if _, err := s.List(); err != nil {
+		return fmt.Errorf("bad status message: %v", err)
+	}
+	peerVersion, err := s.Uint()
+	if err != nil {
+		return fmt.Errorf("bad status message: %v", err)
+	}
+	if peerVersion != protocolVersion {
+		return fmt.Errorf("protocol version mismatch %d != %d", peerVersion, protocolVersion)
+	}
+	// Wait until out own status is consumed too
+	if err := <-errc; err != nil {
+		return fmt.Errorf("failed to send status packet: %v", err)
+	}
+	return nil
 }
 
+// update executes periodic operations on the peer, including message transmission
+// and expiration.
 func (self *peer) update() {
-	relay := time.NewTicker(300 * time.Millisecond)
-out:
+	// Start the tickers for the updates
+	expire := time.NewTicker(expirationCycle)
+	transmit := time.NewTicker(transmissionCycle)
+
+	// Loop and transmit until termination is requested
 	for {
 		select {
-		case <-relay.C:
-			err := self.broadcast(self.host.envelopes())
-			if err != nil {
-				self.peer.Infoln("broadcast err:", err)
-				break out
+		case <-expire.C:
+			self.expire()
+
+		case <-transmit.C:
+			if err := self.broadcast(); err != nil {
+				self.peer.Infoln("broadcast failed:", err)
+				return
 			}
 
 		case <-self.quit:
-			break out
+			return
 		}
 	}
 }
 
-func (self *peer) broadcast(envelopes []*Envelope) error {
-	envs := make([]*Envelope, 0, len(envelopes))
-	for _, env := range envelopes {
-		if !self.known.Has(env.Hash()) {
-			envs = append(envs, env)
-			self.known.Add(env.Hash())
-		}
-	}
-	if len(envs) > 0 {
-		if err := p2p.Send(self.ws, envelopesMsg, envs); err != nil {
-			return err
-		}
-		self.peer.DebugDetailln("broadcasted", len(envs), "message(s)")
-	}
-	return nil
+// mark marks an envelope known to the peer so that it won't be sent back.
+func (self *peer) mark(envelope *Envelope) {
+	self.known.Add(envelope.Hash())
 }
 
-func (self *peer) addKnown(envelope *Envelope) {
-	self.known.Add(envelope.Hash())
+// marked checks if an envelope is already known to the remote peer.
+func (self *peer) marked(envelope *Envelope) bool {
+	return self.known.Has(envelope.Hash())
 }
 
-func (self *peer) handleStatus() error {
-	ws := self.ws
-	if err := p2p.SendItems(ws, statusMsg, protocolVersion); err != nil {
-		return err
-	}
-	msg, err := ws.ReadMsg()
-	if err != nil {
-		return err
+// expire iterates over all the known envelopes in the host and removes all
+// expired (unknown) ones from the known list.
+func (self *peer) expire() {
+	// Assemble the list of available envelopes
+	available := set.NewNonTS()
+	for _, envelope := range self.host.envelopes() {
+		available.Add(envelope.Hash())
 	}
-	if msg.Code != statusMsg {
-		return fmt.Errorf("peer send %x before status msg", msg.Code)
-	}
-	s := rlp.NewStream(msg.Payload)
-	if _, err := s.List(); err != nil {
-		return fmt.Errorf("bad status message: %v", err)
+	// Cross reference availability with known status
+	unmark := make(map[common.Hash]struct{})
+	self.known.Each(func(v interface{}) bool {
+		if !available.Has(v.(common.Hash)) {
+			unmark[v.(common.Hash)] = struct{}{}
+		}
+		return true
+	})
+	// Dump all known but unavailable
+	for hash, _ := range unmark {
+		self.known.Remove(hash)
 	}
-	pv, err := s.Uint()
-	if err != nil {
-		return fmt.Errorf("bad status message: %v", err)
+}
+
+// broadcast iterates over the collection of envelopes and transmits yet unknown
+// ones over the network.
+func (self *peer) broadcast() error {
+	// Fetch the envelopes and collect the unknown ones
+	envelopes := self.host.envelopes()
+	transmit := make([]*Envelope, 0, len(envelopes))
+	for _, envelope := range envelopes {
+		if !self.marked(envelope) {
+			transmit = append(transmit, envelope)
+			self.mark(envelope)
+		}
 	}
-	if pv != protocolVersion {
-		return fmt.Errorf("protocol version mismatch %d != %d", pv, protocolVersion)
+	// Transmit the unknown batch (potentially empty)
+	if err := p2p.Send(self.ws, messagesCode, transmit); err != nil {
+		return err
 	}
-	return msg.Discard() // ignore anything after protocol version
+	self.peer.DebugDetailln("broadcasted", len(transmit), "message(s)")
+
+	return nil
 }
diff --git a/whisper/peer_test.go b/whisper/peer_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..9008cdc593405f6780f32f847ff7df86632214e8
--- /dev/null
+++ b/whisper/peer_test.go
@@ -0,0 +1,242 @@
+package whisper
+
+import (
+	"testing"
+	"time"
+
+	"github.com/ethereum/go-ethereum/p2p"
+	"github.com/ethereum/go-ethereum/p2p/discover"
+)
+
+type testPeer struct {
+	client *Whisper
+	stream *p2p.MsgPipeRW
+	termed chan struct{}
+}
+
+func startTestPeer() *testPeer {
+	// Create a simulated P2P remote peer and data streams to it
+	remote := p2p.NewPeer(discover.NodeID{}, "", nil)
+	tester, tested := p2p.MsgPipe()
+
+	// Create a whisper client and connect with it to the tester peer
+	client := New()
+	client.Start()
+
+	termed := make(chan struct{})
+	go func() {
+		defer client.Stop()
+		defer close(termed)
+		defer tested.Close()
+
+		client.handlePeer(remote, tested)
+	}()
+
+	return &testPeer{
+		client: client,
+		stream: tester,
+		termed: termed,
+	}
+}
+
+func startTestPeerInited() (*testPeer, error) {
+	peer := startTestPeer()
+
+	if err := p2p.ExpectMsg(peer.stream, statusCode, []uint64{protocolVersion}); err != nil {
+		peer.stream.Close()
+		return nil, err
+	}
+	if err := p2p.SendItems(peer.stream, statusCode, protocolVersion); err != nil {
+		peer.stream.Close()
+		return nil, err
+	}
+	return peer, nil
+}
+
+func TestPeerStatusMessage(t *testing.T) {
+	tester := startTestPeer()
+
+	// Wait for the handshake status message and check it
+	if err := p2p.ExpectMsg(tester.stream, statusCode, []uint64{protocolVersion}); err != nil {
+		t.Fatalf("status message mismatch: %v", err)
+	}
+	// Terminate the node
+	tester.stream.Close()
+
+	select {
+	case <-tester.termed:
+	case <-time.After(time.Second):
+		t.Fatalf("local close timed out")
+	}
+}
+
+func TestPeerHandshakeFail(t *testing.T) {
+	tester := startTestPeer()
+
+	// Wait for and check the handshake
+	if err := p2p.ExpectMsg(tester.stream, statusCode, []uint64{protocolVersion}); err != nil {
+		t.Fatalf("status message mismatch: %v", err)
+	}
+	// Send an invalid handshake status and verify disconnect
+	if err := p2p.SendItems(tester.stream, messagesCode); err != nil {
+		t.Fatalf("failed to send malformed status: %v", err)
+	}
+	select {
+	case <-tester.termed:
+	case <-time.After(time.Second):
+		t.Fatalf("remote close timed out")
+	}
+}
+
+func TestPeerHandshakeSuccess(t *testing.T) {
+	tester := startTestPeer()
+
+	// Wait for and check the handshake
+	if err := p2p.ExpectMsg(tester.stream, statusCode, []uint64{protocolVersion}); err != nil {
+		t.Fatalf("status message mismatch: %v", err)
+	}
+	// Send a valid handshake status and make sure connection stays live
+	if err := p2p.SendItems(tester.stream, statusCode, protocolVersion); err != nil {
+		t.Fatalf("failed to send status: %v", err)
+	}
+	select {
+	case <-tester.termed:
+		t.Fatalf("valid handshake disconnected")
+
+	case <-time.After(100 * time.Millisecond):
+	}
+	// Clean up the test
+	tester.stream.Close()
+
+	select {
+	case <-tester.termed:
+	case <-time.After(time.Second):
+		t.Fatalf("local close timed out")
+	}
+}
+
+func TestPeerSend(t *testing.T) {
+	// Start a tester and execute the handshake
+	tester, err := startTestPeerInited()
+	if err != nil {
+		t.Fatalf("failed to start initialized peer: %v", err)
+	}
+	defer tester.stream.Close()
+
+	// Construct a message and inject into the tester
+	message := NewMessage([]byte("peer broadcast test message"))
+	envelope, err := message.Wrap(DefaultPoW, Options{
+		TTL: DefaultTTL,
+	})
+	if err != nil {
+		t.Fatalf("failed to wrap message: %v", err)
+	}
+	if err := tester.client.Send(envelope); err != nil {
+		t.Fatalf("failed to send message: %v", err)
+	}
+	// Check that the message is eventually forwarded
+	payload := []interface{}{envelope}
+	if err := p2p.ExpectMsg(tester.stream, messagesCode, payload); err != nil {
+		t.Fatalf("message mismatch: %v", err)
+	}
+	// Make sure that even with a re-insert, an empty batch is received
+	if err := tester.client.Send(envelope); err != nil {
+		t.Fatalf("failed to send message: %v", err)
+	}
+	if err := p2p.ExpectMsg(tester.stream, messagesCode, []interface{}{}); err != nil {
+		t.Fatalf("message mismatch: %v", err)
+	}
+}
+
+func TestPeerDeliver(t *testing.T) {
+	// Start a tester and execute the handshake
+	tester, err := startTestPeerInited()
+	if err != nil {
+		t.Fatalf("failed to start initialized peer: %v", err)
+	}
+	defer tester.stream.Close()
+
+	// Watch for all inbound messages
+	arrived := make(chan struct{}, 1)
+	tester.client.Watch(Filter{
+		Fn: func(message *Message) {
+			arrived <- struct{}{}
+		},
+	})
+	// Construct a message and deliver it to the tester peer
+	message := NewMessage([]byte("peer broadcast test message"))
+	envelope, err := message.Wrap(DefaultPoW, Options{
+		TTL: DefaultTTL,
+	})
+	if err != nil {
+		t.Fatalf("failed to wrap message: %v", err)
+	}
+	if err := p2p.Send(tester.stream, messagesCode, []*Envelope{envelope}); err != nil {
+		t.Fatalf("failed to transfer message: %v", err)
+	}
+	// Check that the message is delivered upstream
+	select {
+	case <-arrived:
+	case <-time.After(time.Second):
+		t.Fatalf("message delivery timeout")
+	}
+	// Check that a resend is not delivered
+	if err := p2p.Send(tester.stream, messagesCode, []*Envelope{envelope}); err != nil {
+		t.Fatalf("failed to transfer message: %v", err)
+	}
+	select {
+	case <-time.After(2 * transmissionCycle):
+	case <-arrived:
+		t.Fatalf("repeating message arrived")
+	}
+}
+
+func TestPeerMessageExpiration(t *testing.T) {
+	// Start a tester and execute the handshake
+	tester, err := startTestPeerInited()
+	if err != nil {
+		t.Fatalf("failed to start initialized peer: %v", err)
+	}
+	defer tester.stream.Close()
+
+	// Fetch the peer instance for later inspection
+	tester.client.peerMu.RLock()
+	if peers := len(tester.client.peers); peers != 1 {
+		t.Fatalf("peer pool size mismatch: have %v, want %v", peers, 1)
+	}
+	var peer *peer
+	for peer, _ = range tester.client.peers {
+		break
+	}
+	tester.client.peerMu.RUnlock()
+
+	// Construct a message and pass it through the tester
+	message := NewMessage([]byte("peer test message"))
+	envelope, err := message.Wrap(DefaultPoW, Options{
+		TTL: time.Second,
+	})
+	if err != nil {
+		t.Fatalf("failed to wrap message: %v", err)
+	}
+	if err := tester.client.Send(envelope); err != nil {
+		t.Fatalf("failed to send message: %v", err)
+	}
+	payload := []interface{}{envelope}
+	if err := p2p.ExpectMsg(tester.stream, messagesCode, payload); err != nil {
+		t.Fatalf("message mismatch: %v", err)
+	}
+	// Check that the message is inside the cache
+	if !peer.known.Has(envelope.Hash()) {
+		t.Fatalf("message not found in cache")
+	}
+	// Discard messages until expiration and check cache again
+	exp := time.Now().Add(time.Second + expirationCycle)
+	for time.Now().Before(exp) {
+		if err := p2p.ExpectMsg(tester.stream, messagesCode, []interface{}{}); err != nil {
+			t.Fatalf("message mismatch: %v", err)
+		}
+	}
+	if peer.known.Has(envelope.Hash()) {
+		t.Fatalf("message not expired from cache")
+	}
+}
diff --git a/whisper/sort.go b/whisper/sort.go
deleted file mode 100644
index 313ba5ac0a63650b3d1ac863c93de808742a04f8..0000000000000000000000000000000000000000
--- a/whisper/sort.go
+++ /dev/null
@@ -1,29 +0,0 @@
-package whisper
-
-import (
-	"sort"
-
-	"github.com/ethereum/go-ethereum/common"
-)
-
-type sortedKeys struct {
-	k []int32
-}
-
-func (self *sortedKeys) Len() int           { return len(self.k) }
-func (self *sortedKeys) Less(i, j int) bool { return self.k[i] < self.k[j] }
-func (self *sortedKeys) Swap(i, j int)      { self.k[i], self.k[j] = self.k[j], self.k[i] }
-
-func sortKeys(m map[int32]common.Hash) []int32 {
-	sorted := new(sortedKeys)
-	sorted.k = make([]int32, len(m))
-	i := 0
-	for key, _ := range m {
-		sorted.k[i] = key
-		i++
-	}
-
-	sort.Sort(sorted)
-
-	return sorted.k
-}
diff --git a/whisper/sort_test.go b/whisper/sort_test.go
deleted file mode 100644
index a61fde4c2d41656dae2ae9c4f25aadaece40fd01..0000000000000000000000000000000000000000
--- a/whisper/sort_test.go
+++ /dev/null
@@ -1,23 +0,0 @@
-package whisper
-
-import (
-	"testing"
-
-	"github.com/ethereum/go-ethereum/common"
-)
-
-func TestSorting(t *testing.T) {
-	m := map[int32]common.Hash{
-		1: {1},
-		3: {3},
-		2: {2},
-		5: {5},
-	}
-	exp := []int32{1, 2, 3, 5}
-	res := sortKeys(m)
-	for i, k := range res {
-		if k != exp[i] {
-			t.Error(k, "failed. Expected", exp[i])
-		}
-	}
-}
diff --git a/whisper/topic.go b/whisper/topic.go
new file mode 100644
index 0000000000000000000000000000000000000000..a965c7cc2aecd9bf8325b0c04661c07995ce78cb
--- /dev/null
+++ b/whisper/topic.go
@@ -0,0 +1,61 @@
+// Contains the Whisper protocol Topic element. For formal details please see
+// the specs at https://github.com/ethereum/wiki/wiki/Whisper-PoC-1-Protocol-Spec#topics.
+
+package whisper
+
+import "github.com/ethereum/go-ethereum/crypto"
+
+// Topic represents a cryptographically secure, probabilistic partial
+// classifications of a message, determined as the first (left) 4 bytes of the
+// SHA3 hash of some arbitrary data given by the original author of the message.
+type Topic [4]byte
+
+// NewTopic creates a topic from the 4 byte prefix of the SHA3 hash of the data.
+func NewTopic(data []byte) Topic {
+	prefix := [4]byte{}
+	copy(prefix[:], crypto.Sha3(data)[:4])
+	return Topic(prefix)
+}
+
+// NewTopics creates a list of topics from a list of binary data elements, by
+// iteratively calling NewTopic on each of them.
+func NewTopics(data ...[]byte) []Topic {
+	topics := make([]Topic, len(data))
+	for i, element := range data {
+		topics[i] = NewTopic(element)
+	}
+	return topics
+}
+
+// NewTopicFromString creates a topic using the binary data contents of the
+// specified string.
+func NewTopicFromString(data string) Topic {
+	return NewTopic([]byte(data))
+}
+
+// NewTopicsFromStrings creates a list of topics from a list of textual data
+// elements, by iteratively calling NewTopicFromString on each of them.
+func NewTopicsFromStrings(data ...string) []Topic {
+	topics := make([]Topic, len(data))
+	for i, element := range data {
+		topics[i] = NewTopicFromString(element)
+	}
+	return topics
+}
+
+// String converts a topic byte array to a string representation.
+func (self *Topic) String() string {
+	return string(self[:])
+}
+
+// TopicSet represents a hash set to check if a topic exists or not.
+type topicSet map[string]struct{}
+
+// NewTopicSet creates a topic hash set from a slice of topics.
+func newTopicSet(topics []Topic) topicSet {
+	set := make(map[string]struct{})
+	for _, topic := range topics {
+		set[topic.String()] = struct{}{}
+	}
+	return topicSet(set)
+}
diff --git a/whisper/topic_test.go b/whisper/topic_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..4015079dcf32521a54d041074509c4bd2ca13d7d
--- /dev/null
+++ b/whisper/topic_test.go
@@ -0,0 +1,67 @@
+package whisper
+
+import (
+	"bytes"
+	"testing"
+)
+
+var topicCreationTests = []struct {
+	data []byte
+	hash [4]byte
+}{
+	{hash: [4]byte{0xc5, 0xd2, 0x46, 0x01}, data: nil},
+	{hash: [4]byte{0xc5, 0xd2, 0x46, 0x01}, data: []byte{}},
+	{hash: [4]byte{0x8f, 0x9a, 0x2b, 0x7d}, data: []byte("test name")},
+}
+
+func TestTopicCreation(t *testing.T) {
+	// Create the topics individually
+	for i, tt := range topicCreationTests {
+		topic := NewTopic(tt.data)
+		if bytes.Compare(topic[:], tt.hash[:]) != 0 {
+			t.Errorf("binary test %d: hash mismatch: have %v, want %v.", i, topic, tt.hash)
+		}
+	}
+	for i, tt := range topicCreationTests {
+		topic := NewTopicFromString(string(tt.data))
+		if bytes.Compare(topic[:], tt.hash[:]) != 0 {
+			t.Errorf("textual test %d: hash mismatch: have %v, want %v.", i, topic, tt.hash)
+		}
+	}
+	// Create the topics in batches
+	binaryData := make([][]byte, len(topicCreationTests))
+	for i, tt := range topicCreationTests {
+		binaryData[i] = tt.data
+	}
+	textualData := make([]string, len(topicCreationTests))
+	for i, tt := range topicCreationTests {
+		textualData[i] = string(tt.data)
+	}
+
+	topics := NewTopics(binaryData...)
+	for i, tt := range topicCreationTests {
+		if bytes.Compare(topics[i][:], tt.hash[:]) != 0 {
+			t.Errorf("binary batch test %d: hash mismatch: have %v, want %v.", i, topics[i], tt.hash)
+		}
+	}
+	topics = NewTopicsFromStrings(textualData...)
+	for i, tt := range topicCreationTests {
+		if bytes.Compare(topics[i][:], tt.hash[:]) != 0 {
+			t.Errorf("textual batch test %d: hash mismatch: have %v, want %v.", i, topics[i], tt.hash)
+		}
+	}
+}
+
+func TestTopicSetCreation(t *testing.T) {
+	topics := make([]Topic, len(topicCreationTests))
+	for i, tt := range topicCreationTests {
+		topics[i] = NewTopic(tt.data)
+	}
+	set := newTopicSet(topics)
+	for i, tt := range topicCreationTests {
+		topic := NewTopic(tt.data)
+		if _, ok := set[topic.String()]; !ok {
+			t.Errorf("topic %d: not found in set", i)
+		}
+	}
+}
diff --git a/whisper/util.go b/whisper/util.go
deleted file mode 100644
index 7a222395fe9ee1bafa4eaa777f9e06c8799912b6..0000000000000000000000000000000000000000
--- a/whisper/util.go
+++ /dev/null
@@ -1,36 +0,0 @@
-package whisper
-
-import "github.com/ethereum/go-ethereum/crypto"
-
-func hashTopic(topic []byte) []byte {
-	return crypto.Sha3(topic)[:4]
-}
-
-// NOTE this isn't DRY, but I don't want to iterate twice.
-
-// Returns a formatted topics byte slice.
-// data: unformatted data (e.g., no hashes needed)
-func Topics(data [][]byte) [][]byte {
-	d := make([][]byte, len(data))
-	for i, byts := range data {
-		d[i] = hashTopic(byts)
-	}
-	return d
-}
-
-func TopicsFromString(data ...string) [][]byte {
-	d := make([][]byte, len(data))
-	for i, str := range data {
-		d[i] = hashTopic([]byte(str))
-	}
-	return d
-}
-
-func bytesToMap(s [][]byte) map[string]struct{} {
-	m := make(map[string]struct{})
-	for _, topic := range s {
-		m[string(topic)] = struct{}{}
-	}
-
-	return m
-}
diff --git a/whisper/whisper.go b/whisper/whisper.go
index d803e27d46fb5bd078890c989d36dd877123fc1b..9317fad50e9e43c6276653497ef91968d30aff26 100644
--- a/whisper/whisper.go
+++ b/whisper/whisper.go
@@ -2,7 +2,6 @@ package whisper
 
 import (
 	"crypto/ecdsa"
-	"errors"
 	"sync"
 	"time"
 
@@ -17,9 +16,22 @@ import (
 )
 
 const (
-	statusMsg      = 0x0
-	envelopesMsg   = 0x01
-	whisperVersion = 0x02
+	statusCode   = 0x00
+	messagesCode = 0x01
+
+	protocolVersion uint64 = 0x02
+	protocolName           = "shh"
+
+	signatureFlag   = byte(1 << 7)
+	signatureLength = 65
+
+	expirationCycle   = 800 * time.Millisecond
+	transmissionCycle = 300 * time.Millisecond
+)
+
+const (
+	DefaultTTL = 50 * time.Second
+	DefaultPoW = 50 * time.Millisecond
 )
 
 type MessageEvent struct {
@@ -28,250 +40,298 @@ type MessageEvent struct {
 	Message *Message
 }
 
-const (
-	DefaultTimeToLive  = 50 * time.Second
-	DefaultProofOfWork = 50 * time.Millisecond
-)
-
+// Whisper represents a dark communication interface through the Ethereum
+// network, using its very own P2P communication layer.
 type Whisper struct {
 	protocol p2p.Protocol
 	filters  *filter.Filters
 
-	mmu      sync.RWMutex
-	messages map[common.Hash]*Envelope
-	expiry   map[uint32]*set.SetNonTS
+	keys map[string]*ecdsa.PrivateKey
 
-	quit chan struct{}
+	messages    map[common.Hash]*Envelope // Pool of messages currently tracked by this node
+	expirations map[uint32]*set.SetNonTS  // Message expiration pool (TODO: something lighter)
+	poolMu      sync.RWMutex              // Mutex to sync the message and expiration pools
 
-	keys map[string]*ecdsa.PrivateKey
+	peers  map[*peer]struct{} // Set of currently active peers
+	peerMu sync.RWMutex       // Mutex to sync the active peer set
+
+	quit chan struct{}
 }
 
 func New() *Whisper {
 	whisper := &Whisper{
-		messages: make(map[common.Hash]*Envelope),
-		filters:  filter.New(),
-		expiry:   make(map[uint32]*set.SetNonTS),
-		quit:     make(chan struct{}),
-		keys:     make(map[string]*ecdsa.PrivateKey),
+		filters:     filter.New(),
+		keys:        make(map[string]*ecdsa.PrivateKey),
+		messages:    make(map[common.Hash]*Envelope),
+		expirations: make(map[uint32]*set.SetNonTS),
+		peers:       make(map[*peer]struct{}),
+		quit:        make(chan struct{}),
 	}
 	whisper.filters.Start()
 
 	// p2p whisper sub protocol handler
 	whisper.protocol = p2p.Protocol{
-		Name:    "shh",
-		Version: uint(whisperVersion),
+		Name:    protocolName,
+		Version: uint(protocolVersion),
 		Length:  2,
-		Run:     whisper.msgHandler,
+		Run:     whisper.handlePeer,
 	}
 
 	return whisper
 }
 
-func (self *Whisper) Version() uint {
-	return self.protocol.Version
-}
-
-func (self *Whisper) Start() {
-	glog.V(logger.Info).Infoln("Whisper started")
-	go self.update()
-}
-
-func (self *Whisper) Stop() {
-	close(self.quit)
+// Protocol returns the whisper sub-protocol handler for this particular client.
+func (self *Whisper) Protocol() p2p.Protocol {
+	return self.protocol
 }
 
-func (self *Whisper) Send(envelope *Envelope) error {
-	return self.add(envelope)
+// Version returns the whisper sub-protocols version number.
+func (self *Whisper) Version() uint {
+	return self.protocol.Version
 }
 
+// NewIdentity generates a new cryptographic identity for the client, and injects
+// it into the known identities for message decryption.
 func (self *Whisper) NewIdentity() *ecdsa.PrivateKey {
 	key, err := crypto.GenerateKey()
 	if err != nil {
 		panic(err)
 	}
-
 	self.keys[string(crypto.FromECDSAPub(&key.PublicKey))] = key
 
 	return key
 }
 
+// HasIdentity checks if the the whisper node is configured with the private key
+// of the specified public pair.
 func (self *Whisper) HasIdentity(key *ecdsa.PublicKey) bool {
 	return self.keys[string(crypto.FromECDSAPub(key))] != nil
 }
 
+// GetIdentity retrieves the private key of the specified public identity.
 func (self *Whisper) GetIdentity(key *ecdsa.PublicKey) *ecdsa.PrivateKey {
 	return self.keys[string(crypto.FromECDSAPub(key))]
 }
 
-// func (self *Whisper) RemoveIdentity(key *ecdsa.PublicKey) bool {
-// 	k := string(crypto.FromECDSAPub(key))
-// 	if _, ok := self.keys[k]; ok {
-// 		delete(self.keys, k)
-// 		return true
-// 	}
-// 	return false
-// }
-
-func (self *Whisper) Watch(opts Filter) int {
-	return self.filters.Install(filter.Generic{
-		Str1: string(crypto.FromECDSAPub(opts.To)),
-		Str2: string(crypto.FromECDSAPub(opts.From)),
-		Data: bytesToMap(opts.Topics),
+// Watch installs a new message handler to run in case a matching packet arrives
+// from the whisper network.
+func (self *Whisper) Watch(options Filter) int {
+	filter := filter.Generic{
+		Str1: string(crypto.FromECDSAPub(options.To)),
+		Str2: string(crypto.FromECDSAPub(options.From)),
+		Data: newTopicSet(options.Topics),
 		Fn: func(data interface{}) {
-			opts.Fn(data.(*Message))
+			options.Fn(data.(*Message))
 		},
-	})
+	}
+	return self.filters.Install(filter)
 }
 
+// Unwatch removes an installed message handler.
 func (self *Whisper) Unwatch(id int) {
 	self.filters.Uninstall(id)
 }
 
-func (self *Whisper) Messages(id int) (messages []*Message) {
-	filter := self.filters.Get(id)
-	if filter != nil {
-		for _, e := range self.messages {
-			if msg, key := self.open(e); msg != nil {
-				f := createFilter(msg, e.Topics, key)
-				if self.filters.Match(filter, f) {
-					messages = append(messages, msg)
+// Send injects a message into the whisper send queue, to be distributed in the
+// network in the coming cycles.
+func (self *Whisper) Send(envelope *Envelope) error {
+	return self.add(envelope)
+}
+
+func (self *Whisper) Start() {
+	glog.V(logger.Info).Infoln("Whisper started")
+	go self.update()
+}
+
+func (self *Whisper) Stop() {
+	close(self.quit)
+	glog.V(logger.Info).Infoln("Whisper stopped")
+}
+
+// Messages retrieves the currently pooled messages matching a filter id.
+func (self *Whisper) Messages(id int) []*Message {
+	messages := make([]*Message, 0)
+	if filter := self.filters.Get(id); filter != nil {
+		for _, envelope := range self.messages {
+			if message := self.open(envelope); message != nil {
+				if self.filters.Match(filter, createFilter(message, envelope.Topics)) {
+					messages = append(messages, message)
 				}
 			}
 		}
 	}
-
-	return
+	return messages
 }
 
-// Main handler for passing whisper messages to whisper peer objects
-func (self *Whisper) msgHandler(peer *p2p.Peer, ws p2p.MsgReadWriter) error {
-	wpeer := NewPeer(self, peer, ws)
-	// initialise whisper peer (handshake/status)
-	if err := wpeer.init(); err != nil {
+// func (self *Whisper) RemoveIdentity(key *ecdsa.PublicKey) bool {
+// 	k := string(crypto.FromECDSAPub(key))
+// 	if _, ok := self.keys[k]; ok {
+// 		delete(self.keys, k)
+// 		return true
+// 	}
+// 	return false
+// }
+
+// handlePeer is called by the underlying P2P layer when the whisper sub-protocol
+// connection is negotiated.
+func (self *Whisper) handlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
+	// Create, initialize and start the whisper peer
+	whisperPeer, err := newPeer(self, peer, rw)
+	if err != nil {
 		return err
 	}
-	// kick of the main handler for broadcasting/managing envelopes
-	go wpeer.start()
-	defer wpeer.stop()
-
-	// Main *read* loop. Writing is done by the peer it self.
+	whisperPeer.start()
+	defer whisperPeer.stop()
+
+	// Start tracking the active peer
+	self.peerMu.Lock()
+	self.peers[whisperPeer] = struct{}{}
+	self.peerMu.Unlock()
+
+	defer func() {
+		self.peerMu.Lock()
+		delete(self.peers, whisperPeer)
+		self.peerMu.Unlock()
+	}()
+	// Read and process inbound messages directly to merge into client-global state
 	for {
-		msg, err := ws.ReadMsg()
+		// Fetch the next packet and decode the contained envelopes
+		packet, err := rw.ReadMsg()
 		if err != nil {
 			return err
 		}
-
 		var envelopes []*Envelope
-		if err := msg.Decode(&envelopes); err != nil {
-			peer.Infoln(err)
+		if err := packet.Decode(&envelopes); err != nil {
+			peer.Infof("failed to decode enveloped: %v", err)
 			continue
 		}
-
+		// Inject all envelopes into the internal pool
 		for _, envelope := range envelopes {
 			if err := self.add(envelope); err != nil {
 				// TODO Punish peer here. Invalid envelope.
-				peer.Debugln(err)
+				peer.Debugf("failed to pool envelope: %f", err)
 			}
-			wpeer.addKnown(envelope)
+			whisperPeer.mark(envelope)
 		}
 	}
 }
 
-// takes care of adding envelopes to the messages pool. At this moment no sanity checks are being performed.
+// add inserts a new envelope into the message pool to be distributed within the
+// whisper network. It also inserts the envelope into the expiration pool at the
+// appropriate time-stamp.
 func (self *Whisper) add(envelope *Envelope) error {
-	if !envelope.valid() {
-		return errors.New("invalid pow provided for envelope")
-	}
-
-	self.mmu.Lock()
-	defer self.mmu.Unlock()
+	self.poolMu.Lock()
+	defer self.poolMu.Unlock()
 
+	// Insert the message into the tracked pool
 	hash := envelope.Hash()
+	if _, ok := self.messages[hash]; ok {
+		glog.V(logger.Detail).Infof("whisper envelope already cached: %x\n", envelope)
+		return nil
+	}
 	self.messages[hash] = envelope
-	if self.expiry[envelope.Expiry] == nil {
-		self.expiry[envelope.Expiry] = set.NewNonTS()
+
+	// Insert the message into the expiration pool for later removal
+	if self.expirations[envelope.Expiry] == nil {
+		self.expirations[envelope.Expiry] = set.NewNonTS()
 	}
+	if !self.expirations[envelope.Expiry].Has(hash) {
+		self.expirations[envelope.Expiry].Add(hash)
 
-	if !self.expiry[envelope.Expiry].Has(hash) {
-		self.expiry[envelope.Expiry].Add(hash)
+		// Notify the local node of a message arrival
 		go self.postEvent(envelope)
 	}
+	glog.V(logger.Detail).Infof("cached whisper envelope %x\n", envelope)
 
-	glog.V(logger.Detail).Infof("added whisper envelope %x\n", envelope)
+	return nil
+}
+
+// postEvent opens an envelope with the configured identities and delivers the
+// message upstream from application processing.
+func (self *Whisper) postEvent(envelope *Envelope) {
+	if message := self.open(envelope); message != nil {
+		self.filters.Notify(createFilter(message, envelope.Topics), message)
+	}
+}
 
+// open tries to decrypt a whisper envelope with all the configured identities,
+// returning the decrypted message and the key used to achieve it. If not keys
+// are configured, open will return the payload as if non encrypted.
+func (self *Whisper) open(envelope *Envelope) *Message {
+	// Short circuit if no identity is set, and assume clear-text
+	if len(self.keys) == 0 {
+		if message, err := envelope.Open(nil); err == nil {
+			return message
+		}
+	}
+	// Iterate over the keys and try to decrypt the message
+	for _, key := range self.keys {
+		message, err := envelope.Open(key)
+		if err == nil || err == ecies.ErrInvalidPublicKey {
+			message.To = &key.PublicKey
+			return message
+		}
+	}
+	// Failed to decrypt, don't return anything
 	return nil
 }
 
+// createFilter creates a message filter to check against installed handlers.
+func createFilter(message *Message, topics []Topic) filter.Filter {
+	return filter.Generic{
+		Str1: string(crypto.FromECDSAPub(message.To)),
+		Str2: string(crypto.FromECDSAPub(message.Recover())),
+		Data: newTopicSet(topics),
+	}
+}
+
+// update loops until the lifetime of the whisper node, updating its internal
+// state by expiring stale messages from the pool.
 func (self *Whisper) update() {
-	expire := time.NewTicker(800 * time.Millisecond)
-out:
+	// Start a ticker to check for expirations
+	expire := time.NewTicker(expirationCycle)
+
+	// Repeat updates until termination is requested
 	for {
 		select {
 		case <-expire.C:
 			self.expire()
+
 		case <-self.quit:
-			break out
+			return
 		}
 	}
 }
 
+// expire iterates over all the expiration timestamps, removing all stale
+// messages from the pools.
 func (self *Whisper) expire() {
-	self.mmu.Lock()
-	defer self.mmu.Unlock()
+	self.poolMu.Lock()
+	defer self.poolMu.Unlock()
 
 	now := uint32(time.Now().Unix())
-	for then, hashSet := range self.expiry {
+	for then, hashSet := range self.expirations {
+		// Short circuit if a future time
 		if then > now {
 			continue
 		}
-
+		// Dump all expired messages and remove timestamp
 		hashSet.Each(func(v interface{}) bool {
 			delete(self.messages, v.(common.Hash))
 			return true
 		})
-		self.expiry[then].Clear()
+		self.expirations[then].Clear()
 	}
 }
 
-func (self *Whisper) envelopes() (envelopes []*Envelope) {
-	self.mmu.RLock()
-	defer self.mmu.RUnlock()
+// envelopes retrieves all the messages currently pooled by the node.
+func (self *Whisper) envelopes() []*Envelope {
+	self.poolMu.RLock()
+	defer self.poolMu.RUnlock()
 
-	envelopes = make([]*Envelope, len(self.messages))
-	i := 0
+	envelopes := make([]*Envelope, 0, len(self.messages))
 	for _, envelope := range self.messages {
-		envelopes[i] = envelope
-		i++
-	}
-
-	return
-}
-
-func (self *Whisper) postEvent(envelope *Envelope) {
-	if message, key := self.open(envelope); message != nil {
-		self.filters.Notify(createFilter(message, envelope.Topics, key), message)
-	}
-}
-
-func (self *Whisper) open(envelope *Envelope) (*Message, *ecdsa.PrivateKey) {
-	for _, key := range self.keys {
-		if message, err := envelope.Open(key); err == nil || (err != nil && err == ecies.ErrInvalidPublicKey) {
-			message.To = &key.PublicKey
-
-			return message, key
-		}
-	}
-
-	return nil, nil
-}
-
-func (self *Whisper) Protocol() p2p.Protocol {
-	return self.protocol
-}
-
-func createFilter(message *Message, topics [][]byte, key *ecdsa.PrivateKey) filter.Filter {
-	return filter.Generic{
-		Str1: string(crypto.FromECDSAPub(&key.PublicKey)), Str2: string(crypto.FromECDSAPub(message.Recover())),
-		Data: bytesToMap(topics),
+		envelopes = append(envelopes, envelope)
 	}
+	return envelopes
 }
diff --git a/whisper/whisper_test.go b/whisper/whisper_test.go
index b29e34a5e32846e2b928af50f16af181e32de057..def8e68d8bc0081b1c3791bd4af46c95f500cf06 100644
--- a/whisper/whisper_test.go
+++ b/whisper/whisper_test.go
@@ -1,38 +1,185 @@
 package whisper
 
 import (
-	"fmt"
 	"testing"
 	"time"
+
+	"github.com/ethereum/go-ethereum/p2p"
+	"github.com/ethereum/go-ethereum/p2p/discover"
 )
 
-func TestEvent(t *testing.T) {
-	res := make(chan *Message, 1)
-	whisper := New()
-	id := whisper.NewIdentity()
-	whisper.Watch(Filter{
-		To: &id.PublicKey,
+func startTestCluster(n int) []*Whisper {
+	// Create the batch of simulated peers
+	nodes := make([]*p2p.Peer, n)
+	for i := 0; i < n; i++ {
+		nodes[i] = p2p.NewPeer(discover.NodeID{}, "", nil)
+	}
+	whispers := make([]*Whisper, n)
+	for i := 0; i < n; i++ {
+		whispers[i] = New()
+		whispers[i].Start()
+	}
+	// Wire all the peers to the root one
+	for i := 1; i < n; i++ {
+		src, dst := p2p.MsgPipe()
+
+		go whispers[0].handlePeer(nodes[i], src)
+		go whispers[i].handlePeer(nodes[0], dst)
+	}
+	return whispers
+}
+
+func TestSelfMessage(t *testing.T) {
+	// Start the single node cluster
+	client := startTestCluster(1)[0]
+
+	// Start watching for self messages, signal any arrivals
+	self := client.NewIdentity()
+	done := make(chan struct{})
+
+	client.Watch(Filter{
+		To: &self.PublicKey,
 		Fn: func(msg *Message) {
-			res <- msg
+			close(done)
 		},
 	})
-
-	msg := NewMessage([]byte(fmt.Sprintf("Hello world. This is whisper-go. Incase you're wondering; the time is %v", time.Now())))
-	envelope, err := msg.Wrap(DefaultProofOfWork, Options{
-		TTL:  DefaultTimeToLive,
-		From: id,
-		To:   &id.PublicKey,
+	// Send a dummy message to oneself
+	msg := NewMessage([]byte("self whisper"))
+	envelope, err := msg.Wrap(DefaultPoW, Options{
+		From: self,
+		To:   &self.PublicKey,
+		TTL:  DefaultTTL,
 	})
 	if err != nil {
-		fmt.Println(err)
-		t.FailNow()
+		t.Fatalf("failed to wrap message: %v", err)
+	}
+	// Dump the message into the system and wait for it to pop back out
+	if err := client.Send(envelope); err != nil {
+		t.Fatalf("failed to send self-message: %v", err)
+	}
+	select {
+	case <-done:
+	case <-time.After(time.Second):
+		t.Fatalf("self-message receive timeout")
 	}
+}
+
+func TestDirectMessage(t *testing.T) {
+	// Start the sender-recipient cluster
+	cluster := startTestCluster(2)
 
-	tick := time.NewTicker(time.Second)
-	whisper.postEvent(envelope)
+	sender := cluster[0]
+	senderId := sender.NewIdentity()
+
+	recipient := cluster[1]
+	recipientId := recipient.NewIdentity()
+
+	// Watch for arriving messages on the recipient
+	done := make(chan struct{})
+	recipient.Watch(Filter{
+		To: &recipientId.PublicKey,
+		Fn: func(msg *Message) {
+			close(done)
+		},
+	})
+	// Send a dummy message from the sender
+	msg := NewMessage([]byte("direct whisper"))
+	envelope, err := msg.Wrap(DefaultPoW, Options{
+		From: senderId,
+		To:   &recipientId.PublicKey,
+		TTL:  DefaultTTL,
+	})
+	if err != nil {
+		t.Fatalf("failed to wrap message: %v", err)
+	}
+	if err := sender.Send(envelope); err != nil {
+		t.Fatalf("failed to send direct message: %v", err)
+	}
+	// Wait for an arrival or a timeout
 	select {
-	case <-res:
-	case <-tick.C:
-		t.Error("did not receive message")
+	case <-done:
+	case <-time.After(time.Second):
+		t.Fatalf("direct message receive timeout")
+	}
+}
+
+func TestAnonymousBroadcast(t *testing.T) {
+	testBroadcast(true, t)
+}
+
+func TestIdentifiedBroadcast(t *testing.T) {
+	testBroadcast(false, t)
+}
+
+func testBroadcast(anonymous bool, t *testing.T) {
+	// Start the single sender multi recipient cluster
+	cluster := startTestCluster(3)
+
+	sender := cluster[1]
+	targets := cluster[1:]
+	for _, target := range targets {
+		if !anonymous {
+			target.NewIdentity()
+		}
+	}
+	// Watch for arriving messages on the recipients
+	dones := make([]chan struct{}, len(targets))
+	for i := 0; i < len(targets); i++ {
+		done := make(chan struct{}) // need for the closure
+		dones[i] = done
+
+		targets[i].Watch(Filter{
+			Topics: NewTopicsFromStrings("broadcast topic"),
+			Fn: func(msg *Message) {
+				close(done)
+			},
+		})
+	}
+	// Send a dummy message from the sender
+	msg := NewMessage([]byte("broadcast whisper"))
+	envelope, err := msg.Wrap(DefaultPoW, Options{
+		Topics: NewTopicsFromStrings("broadcast topic"),
+		TTL:    DefaultTTL,
+	})
+	if err != nil {
+		t.Fatalf("failed to wrap message: %v", err)
+	}
+	if err := sender.Send(envelope); err != nil {
+		t.Fatalf("failed to send broadcast message: %v", err)
+	}
+	// Wait for an arrival on each recipient, or timeouts
+	timeout := time.After(time.Second)
+	for _, done := range dones {
+		select {
+		case <-done:
+		case <-timeout:
+			t.Fatalf("broadcast message receive timeout")
+		}
+	}
+}
+
+func TestMessageExpiration(t *testing.T) {
+	// Start the single node cluster and inject a dummy message
+	node := startTestCluster(1)[0]
+
+	message := NewMessage([]byte("expiring message"))
+	envelope, err := message.Wrap(DefaultPoW, Options{
+		TTL: time.Second,
+	})
+	if err != nil {
+		t.Fatalf("failed to wrap message: %v", err)
+	}
+	if err := node.Send(envelope); err != nil {
+		t.Fatalf("failed to inject message: %v", err)
+	}
+	// Check that the message is inside the cache
+	if _, ok := node.messages[envelope.Hash()]; !ok {
+		t.Fatalf("message not found in cache")
+	}
+	// Wait for expiration and check cache again
+	time.Sleep(time.Second)     // wait for expiration
+	time.Sleep(expirationCycle) // wait for cleanup cycle
+	if _, ok := node.messages[envelope.Hash()]; ok {
+		t.Fatalf("message not expired from cache")
 	}
 }
diff --git a/xeth/whisper.go b/xeth/whisper.go
index 51caec8d62ae25bfe585cf312ff0b15934a10e57..342910b5c310bea6559b9313a1f57820fde94df2 100644
--- a/xeth/whisper.go
+++ b/xeth/whisper.go
@@ -36,7 +36,7 @@ func (self *Whisper) Post(payload string, to, from string, topics []string, prio
 			TTL:    time.Duration(ttl) * time.Second,
 			To:     crypto.ToECDSAPub(common.FromHex(to)),
 			From:   key,
-			Topics: whisper.TopicsFromString(topics...),
+			Topics: whisper.NewTopicsFromStrings(topics...),
 		})
 
 		if err != nil {
@@ -71,7 +71,7 @@ func (self *Whisper) Watch(opts *Options) int {
 	filter := whisper.Filter{
 		To:     crypto.ToECDSAPub(common.FromHex(opts.To)),
 		From:   crypto.ToECDSAPub(common.FromHex(opts.From)),
-		Topics: whisper.TopicsFromString(opts.Topics...),
+		Topics: whisper.NewTopicsFromStrings(opts.Topics...),
 	}
 
 	var i int