From 91b769042857f542b2792b23ec407e1c9bd4fe8d Mon Sep 17 00:00:00 2001
From: Felix Lange <fjl@twurst.com>
Date: Tue, 12 Jul 2016 17:47:15 +0200
Subject: [PATCH] rpc: add new client, use it everywhere

The new client implementation supports concurrent requests,
subscriptions and replaces the various ad hoc RPC clients
throughout go-ethereum.
---
 accounts/abi/bind/backends/remote.go | 160 +-----
 cmd/geth/consolecmd.go               |  19 +-
 cmd/geth/monitorcmd.go               |  39 +-
 cmd/utils/client.go                  |  55 --
 console/bridge.go                    | 174 +++----
 console/console.go                   |   4 +-
 internal/jsre/pretty.go              |   2 +-
 node/node.go                         |   6 +-
 node/node_test.go                    |  34 +-
 rpc/client.go                        | 740 +++++++++++++++++++++++++++
 rpc/client_context_go1.4.go          |  60 +++
 rpc/client_context_go1.5.go          |  61 +++
 rpc/client_context_go1.6.go          |  55 ++
 rpc/client_context_go1.7.go          |  51 ++
 rpc/client_example_test.go           |  83 +++
 rpc/client_test.go                   | 489 ++++++++++++++++++
 rpc/errors.go                        |  63 +--
 rpc/http.go                          | 163 +++---
 rpc/inproc.go                        |  49 +-
 rpc/ipc.go                           |  79 +--
 rpc/ipc_unix.go                      |   6 +-
 rpc/ipc_windows.go                   |  15 +-
 rpc/json.go                          |  61 ++-
 rpc/notification.go                  |   2 +-
 rpc/notification_test.go             |  45 +-
 rpc/server.go                        |   2 +-
 rpc/server_test.go                   |  14 +-
 rpc/types.go                         |  33 +-
 rpc/utils.go                         |  29 --
 rpc/websocket.go                     | 160 +++---
 30 files changed, 2002 insertions(+), 751 deletions(-)
 delete mode 100644 cmd/utils/client.go
 create mode 100644 rpc/client.go
 create mode 100644 rpc/client_context_go1.4.go
 create mode 100644 rpc/client_context_go1.5.go
 create mode 100644 rpc/client_context_go1.6.go
 create mode 100644 rpc/client_context_go1.7.go
 create mode 100644 rpc/client_example_test.go
 create mode 100644 rpc/client_test.go

diff --git a/accounts/abi/bind/backends/remote.go b/accounts/abi/bind/backends/remote.go
index 4793143e4..58edd791a 100644
--- a/accounts/abi/bind/backends/remote.go
+++ b/accounts/abi/bind/backends/remote.go
@@ -17,11 +17,7 @@
 package backends
 
 import (
-	"encoding/json"
-	"fmt"
 	"math/big"
-	"sync"
-	"sync/atomic"
 
 	"github.com/ethereum/go-ethereum/accounts/abi/bind"
 	"github.com/ethereum/go-ethereum/common"
@@ -37,119 +33,34 @@ var _ bind.ContractBackend = (*rpcBackend)(nil)
 // rpcBackend implements bind.ContractBackend, and acts as the data provider to
 // Ethereum contracts bound to Go structs. It uses an RPC connection to delegate
 // all its functionality.
-//
-// Note: The current implementation is a blocking one. This should be replaced
-// by a proper async version when a real RPC client is created.
 type rpcBackend struct {
-	client rpc.Client // RPC client connection to interact with an API server
-	autoid uint32     // ID number to use for the next API request
-	lock   sync.Mutex // Singleton access until we get to request multiplexing
+	client *rpc.Client // RPC client connection to interact with an API server
 }
 
 // NewRPCBackend creates a new binding backend to an RPC provider that can be
 // used to interact with remote contracts.
-func NewRPCBackend(client rpc.Client) bind.ContractBackend {
-	return &rpcBackend{
-		client: client,
-	}
-}
-
-// request is a JSON RPC request package assembled internally from the client
-// method calls.
-type request struct {
-	JSONRPC string        `json:"jsonrpc"` // Version of the JSON RPC protocol, always set to 2.0
-	ID      int           `json:"id"`      // Auto incrementing ID number for this request
-	Method  string        `json:"method"`  // Remote procedure name to invoke on the server
-	Params  []interface{} `json:"params"`  // List of parameters to pass through (keep types simple)
-}
-
-// response is a JSON RPC response package sent back from the API server.
-type response struct {
-	JSONRPC string          `json:"jsonrpc"` // Version of the JSON RPC protocol, always set to 2.0
-	ID      int             `json:"id"`      // Auto incrementing ID number for this request
-	Error   *failure        `json:"error"`   // Any error returned by the remote side
-	Result  json.RawMessage `json:"result"`  // Whatever the remote side sends us in reply
-}
-
-// failure is a JSON RPC response error field sent back from the API server.
-type failure struct {
-	Code    int    `json:"code"`    // JSON RPC error code associated with the failure
-	Message string `json:"message"` // Specific error message of the failure
-}
-
-// request forwards an API request to the RPC server, and parses the response.
-//
-// This is currently painfully non-concurrent, but it will have to do until we
-// find the time for niceties like this :P
-func (b *rpcBackend) request(ctx context.Context, method string, params []interface{}) (json.RawMessage, error) {
-	b.lock.Lock()
-	defer b.lock.Unlock()
-
-	if ctx == nil {
-		ctx = context.Background()
-	}
-
-	// Ugly hack to serialize an empty list properly
-	if params == nil {
-		params = []interface{}{}
-	}
-	// Assemble the request object
-	reqID := int(atomic.AddUint32(&b.autoid, 1))
-	req := &request{
-		JSONRPC: "2.0",
-		ID:      reqID,
-		Method:  method,
-		Params:  params,
-	}
-	if err := b.client.Send(req); err != nil {
-		return nil, err
-	}
-	res := new(response)
-	errc := make(chan error, 1)
-	go func() {
-		errc <- b.client.Recv(res)
-	}()
-	select {
-	case err := <-errc:
-		if err != nil {
-			return nil, err
-		}
-	case <-ctx.Done():
-		return nil, ctx.Err()
-	}
-	if res.Error != nil {
-		if res.Error.Message == bind.ErrNoCode.Error() {
-			return nil, bind.ErrNoCode
-		}
-		return nil, fmt.Errorf("remote error: %s", res.Error.Message)
-	}
-	return res.Result, nil
+func NewRPCBackend(client *rpc.Client) bind.ContractBackend {
+	return &rpcBackend{client: client}
 }
 
 // HasCode implements ContractVerifier.HasCode by retrieving any code associated
 // with the contract from the remote node, and checking its size.
 func (b *rpcBackend) HasCode(ctx context.Context, contract common.Address, pending bool) (bool, error) {
-	// Execute the RPC code retrieval
 	block := "latest"
 	if pending {
 		block = "pending"
 	}
-	res, err := b.request(ctx, "eth_getCode", []interface{}{contract.Hex(), block})
-	if err != nil {
-		return false, err
-	}
 	var hex string
-	if err := json.Unmarshal(res, &hex); err != nil {
+	err := b.client.CallContext(ctx, &hex, "eth_getCode", contract, block)
+	if err != nil {
 		return false, err
 	}
-	// Convert the response back to a Go byte slice and return
 	return len(common.FromHex(hex)) > 0, nil
 }
 
 // ContractCall implements ContractCaller.ContractCall, delegating the execution of
 // a contract call to the remote node, returning the reply to for local processing.
 func (b *rpcBackend) ContractCall(ctx context.Context, contract common.Address, data []byte, pending bool) ([]byte, error) {
-	// Pack up the request into an RPC argument
 	args := struct {
 		To   common.Address `json:"to"`
 		Data string         `json:"data"`
@@ -157,63 +68,43 @@ func (b *rpcBackend) ContractCall(ctx context.Context, contract common.Address,
 		To:   contract,
 		Data: common.ToHex(data),
 	}
-	// Execute the RPC call and retrieve the response
 	block := "latest"
 	if pending {
 		block = "pending"
 	}
-	res, err := b.request(ctx, "eth_call", []interface{}{args, block})
-	if err != nil {
-		return nil, err
-	}
 	var hex string
-	if err := json.Unmarshal(res, &hex); err != nil {
+	err := b.client.CallContext(ctx, &hex, "eth_call", args, block)
+	if err != nil {
 		return nil, err
 	}
-	// Convert the response back to a Go byte slice and return
 	return common.FromHex(hex), nil
+
 }
 
 // PendingAccountNonce implements ContractTransactor.PendingAccountNonce, delegating
 // the current account nonce retrieval to the remote node.
 func (b *rpcBackend) PendingAccountNonce(ctx context.Context, account common.Address) (uint64, error) {
-	res, err := b.request(ctx, "eth_getTransactionCount", []interface{}{account.Hex(), "pending"})
+	var hex rpc.HexNumber
+	err := b.client.CallContext(ctx, &hex, "eth_getTransactionCount", account.Hex(), "pending")
 	if err != nil {
 		return 0, err
 	}
-	var hex string
-	if err := json.Unmarshal(res, &hex); err != nil {
-		return 0, err
-	}
-	nonce, ok := new(big.Int).SetString(hex, 0)
-	if !ok {
-		return 0, fmt.Errorf("invalid nonce hex: %s", hex)
-	}
-	return nonce.Uint64(), nil
+	return hex.Uint64(), nil
 }
 
 // SuggestGasPrice implements ContractTransactor.SuggestGasPrice, delegating the
 // gas price oracle request to the remote node.
 func (b *rpcBackend) SuggestGasPrice(ctx context.Context) (*big.Int, error) {
-	res, err := b.request(ctx, "eth_gasPrice", nil)
-	if err != nil {
-		return nil, err
-	}
-	var hex string
-	if err := json.Unmarshal(res, &hex); err != nil {
+	var hex rpc.HexNumber
+	if err := b.client.CallContext(ctx, &hex, "eth_gasPrice"); err != nil {
 		return nil, err
 	}
-	price, ok := new(big.Int).SetString(hex, 0)
-	if !ok {
-		return nil, fmt.Errorf("invalid price hex: %s", hex)
-	}
-	return price, nil
+	return (*big.Int)(&hex), nil
 }
 
 // EstimateGasLimit implements ContractTransactor.EstimateGasLimit, delegating
 // the gas estimation to the remote node.
 func (b *rpcBackend) EstimateGasLimit(ctx context.Context, sender common.Address, contract *common.Address, value *big.Int, data []byte) (*big.Int, error) {
-	// Pack up the request into an RPC argument
 	args := struct {
 		From  common.Address  `json:"from"`
 		To    *common.Address `json:"to"`
@@ -226,19 +117,12 @@ func (b *rpcBackend) EstimateGasLimit(ctx context.Context, sender common.Address
 		Value: rpc.NewHexNumber(value),
 	}
 	// Execute the RPC call and retrieve the response
-	res, err := b.request(ctx, "eth_estimateGas", []interface{}{args})
+	var hex rpc.HexNumber
+	err := b.client.CallContext(ctx, &hex, "eth_estimateGas", args)
 	if err != nil {
 		return nil, err
 	}
-	var hex string
-	if err := json.Unmarshal(res, &hex); err != nil {
-		return nil, err
-	}
-	estimate, ok := new(big.Int).SetString(hex, 0)
-	if !ok {
-		return nil, fmt.Errorf("invalid estimate hex: %s", hex)
-	}
-	return estimate, nil
+	return (*big.Int)(&hex), nil
 }
 
 // SendTransaction implements ContractTransactor.SendTransaction, delegating the
@@ -248,13 +132,5 @@ func (b *rpcBackend) SendTransaction(ctx context.Context, tx *types.Transaction)
 	if err != nil {
 		return err
 	}
-	res, err := b.request(ctx, "eth_sendRawTransaction", []interface{}{common.ToHex(data)})
-	if err != nil {
-		return err
-	}
-	var hex string
-	if err := json.Unmarshal(res, &hex); err != nil {
-		return err
-	}
-	return nil
+	return b.client.CallContext(ctx, nil, "eth_sendRawTransaction", common.ToHex(data))
 }
diff --git a/cmd/geth/consolecmd.go b/cmd/geth/consolecmd.go
index 257050a62..8d53809ce 100644
--- a/cmd/geth/consolecmd.go
+++ b/cmd/geth/consolecmd.go
@@ -19,9 +19,12 @@ package main
 import (
 	"os"
 	"os/signal"
+	"strings"
 
 	"github.com/ethereum/go-ethereum/cmd/utils"
 	"github.com/ethereum/go-ethereum/console"
+	"github.com/ethereum/go-ethereum/node"
+	"github.com/ethereum/go-ethereum/rpc"
 	"gopkg.in/urfave/cli.v1"
 )
 
@@ -99,7 +102,7 @@ func localConsole(ctx *cli.Context) error {
 // console to it.
 func remoteConsole(ctx *cli.Context) error {
 	// Attach to a remotely running geth instance and start the JavaScript console
-	client, err := utils.NewRemoteRPCClient(ctx)
+	client, err := dialRPC(ctx.Args().First())
 	if err != nil {
 		utils.Fatalf("Unable to attach to remote geth: %v", err)
 	}
@@ -127,6 +130,20 @@ func remoteConsole(ctx *cli.Context) error {
 	return nil
 }
 
+// dialRPC returns a RPC client which connects to the given endpoint.
+// The check for empty endpoint implements the defaulting logic
+// for "geth attach" and "geth monitor" with no argument.
+func dialRPC(endpoint string) (*rpc.Client, error) {
+	if endpoint == "" {
+		endpoint = node.DefaultIPCEndpoint()
+	} else if strings.HasPrefix(endpoint, "rpc:") || strings.HasPrefix(endpoint, "ipc:") {
+		// Backwards compatibility with geth < 1.5 which required
+		// these prefixes.
+		endpoint = endpoint[4:]
+	}
+	return rpc.Dial(endpoint)
+}
+
 // ephemeralConsole starts a new geth node, attaches an ephemeral JavaScript
 // console to it, and each of the files specified as arguments and tears the
 // everything down.
diff --git a/cmd/geth/monitorcmd.go b/cmd/geth/monitorcmd.go
index 11fdca89c..d1490dce2 100644
--- a/cmd/geth/monitorcmd.go
+++ b/cmd/geth/monitorcmd.go
@@ -21,11 +21,10 @@ import (
 	"math"
 	"reflect"
 	"runtime"
+	"sort"
 	"strings"
 	"time"
 
-	"sort"
-
 	"github.com/ethereum/go-ethereum/cmd/utils"
 	"github.com/ethereum/go-ethereum/node"
 	"github.com/ethereum/go-ethereum/rpc"
@@ -36,7 +35,7 @@ import (
 var (
 	monitorCommandAttachFlag = cli.StringFlag{
 		Name:  "attach",
-		Value: "ipc:" + node.DefaultIPCEndpoint(),
+		Value: node.DefaultIPCEndpoint(),
 		Usage: "API endpoint to attach to",
 	}
 	monitorCommandRowsFlag = cli.IntFlag{
@@ -69,12 +68,12 @@ to display multiple metrics simultaneously.
 // monitor starts a terminal UI based monitoring tool for the requested metrics.
 func monitor(ctx *cli.Context) error {
 	var (
-		client rpc.Client
+		client *rpc.Client
 		err    error
 	)
 	// Attach to an Ethereum node over IPC or RPC
 	endpoint := ctx.String(monitorCommandAttachFlag.Name)
-	if client, err = utils.NewRemoteRPCClientFromString(endpoint); err != nil {
+	if client, err = dialRPC(endpoint); err != nil {
 		utils.Fatalf("Unable to attach to geth node: %v", err)
 	}
 	defer client.Close()
@@ -159,30 +158,10 @@ func monitor(ctx *cli.Context) error {
 
 // retrieveMetrics contacts the attached geth node and retrieves the entire set
 // of collected system metrics.
-func retrieveMetrics(client rpc.Client) (map[string]interface{}, error) {
-	req := map[string]interface{}{
-		"id":      new(int64),
-		"method":  "debug_metrics",
-		"jsonrpc": "2.0",
-		"params":  []interface{}{true},
-	}
-
-	if err := client.Send(req); err != nil {
-		return nil, err
-	}
-
-	var res rpc.JSONSuccessResponse
-	if err := client.Recv(&res); err != nil {
-		return nil, err
-	}
-
-	if res.Result != nil {
-		if mets, ok := res.Result.(map[string]interface{}); ok {
-			return mets, nil
-		}
-	}
-
-	return nil, fmt.Errorf("unable to retrieve metrics")
+func retrieveMetrics(client *rpc.Client) (map[string]interface{}, error) {
+	var metrics map[string]interface{}
+	err := client.Call(&metrics, "debug_metrics", true)
+	return metrics, err
 }
 
 // resolveMetrics takes a list of input metric patterns, and resolves each to one
@@ -270,7 +249,7 @@ func fetchMetric(metrics map[string]interface{}, metric string) float64 {
 
 // refreshCharts retrieves a next batch of metrics, and inserts all the new
 // values into the active datasets and charts
-func refreshCharts(client rpc.Client, metrics []string, data [][]float64, units []int, charts []*termui.LineChart, ctx *cli.Context, footer *termui.Par) (realign bool) {
+func refreshCharts(client *rpc.Client, metrics []string, data [][]float64, units []int, charts []*termui.LineChart, ctx *cli.Context, footer *termui.Par) (realign bool) {
 	values, err := retrieveMetrics(client)
 	for i, metric := range metrics {
 		if len(data) < 512 {
diff --git a/cmd/utils/client.go b/cmd/utils/client.go
deleted file mode 100644
index cc9647580..000000000
--- a/cmd/utils/client.go
+++ /dev/null
@@ -1,55 +0,0 @@
-// Copyright 2015 The go-ethereum Authors
-// This file is part of go-ethereum.
-//
-// go-ethereum is free software: you can redistribute it and/or modify
-// it under the terms of the GNU General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// go-ethereum is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU General Public License for more details.
-//
-// You should have received a copy of the GNU General Public License
-// along with go-ethereum. If not, see <http://www.gnu.org/licenses/>.
-
-package utils
-
-import (
-	"fmt"
-	"strings"
-
-	"github.com/ethereum/go-ethereum/node"
-	"github.com/ethereum/go-ethereum/rpc"
-	"gopkg.in/urfave/cli.v1"
-)
-
-// NewRemoteRPCClient returns a RPC client which connects to a running geth instance.
-// Depending on the given context this can either be a IPC or a HTTP client.
-func NewRemoteRPCClient(ctx *cli.Context) (rpc.Client, error) {
-	if ctx.Args().Present() {
-		endpoint := ctx.Args().First()
-		return NewRemoteRPCClientFromString(endpoint)
-	}
-	// use IPC by default
-	return rpc.NewIPCClient(node.DefaultIPCEndpoint())
-}
-
-// NewRemoteRPCClientFromString returns a RPC client which connects to the given
-// endpoint. It must start with either `ipc:` or `rpc:` (HTTP).
-func NewRemoteRPCClientFromString(endpoint string) (rpc.Client, error) {
-	if strings.HasPrefix(endpoint, "ipc:") {
-		return rpc.NewIPCClient(endpoint[4:])
-	}
-	if strings.HasPrefix(endpoint, "rpc:") {
-		return rpc.NewHTTPClient(endpoint[4:])
-	}
-	if strings.HasPrefix(endpoint, "http://") {
-		return rpc.NewHTTPClient(endpoint)
-	}
-	if strings.HasPrefix(endpoint, "ws:") {
-		return rpc.NewWSClient(endpoint)
-	}
-	return nil, fmt.Errorf("invalid endpoint")
-}
diff --git a/console/bridge.go b/console/bridge.go
index b23e06837..06cb41d80 100644
--- a/console/bridge.go
+++ b/console/bridge.go
@@ -31,13 +31,13 @@ import (
 // bridge is a collection of JavaScript utility methods to bride the .js runtime
 // environment and the Go RPC connection backing the remote method calls.
 type bridge struct {
-	client   rpc.Client   // RPC client to execute Ethereum requests through
+	client   *rpc.Client  // RPC client to execute Ethereum requests through
 	prompter UserPrompter // Input prompter to allow interactive user feedback
 	printer  io.Writer    // Output writer to serialize any display strings to
 }
 
 // newBridge creates a new JavaScript wrapper around an RPC client.
-func newBridge(client rpc.Client, prompter UserPrompter, printer io.Writer) *bridge {
+func newBridge(client *rpc.Client, prompter UserPrompter, printer io.Writer) *bridge {
 	return &bridge{
 		client:   client,
 		prompter: prompter,
@@ -188,88 +188,86 @@ func (b *bridge) SleepBlocks(call otto.FunctionCall) (response otto.Value) {
 	return otto.FalseValue()
 }
 
-// Send will serialize the first argument, send it to the node and returns the response.
+type jsonrpcCall struct {
+	Id     int64
+	Method string
+	Params []interface{}
+}
+
+// Send implements the web3 provider "send" method.
 func (b *bridge) Send(call otto.FunctionCall) (response otto.Value) {
-	// Ensure that we've got a batch request (array) or a single request (object)
-	arg := call.Argument(0).Object()
-	if arg == nil || (arg.Class() != "Array" && arg.Class() != "Object") {
-		throwJSException("request must be an object or array")
-	}
-	// Convert the otto VM arguments to Go values
-	data, err := call.Otto.Call("JSON.stringify", nil, arg)
+	// Remarshal the request into a Go value.
+	JSON, _ := call.Otto.Object("JSON")
+	reqVal, err := JSON.Call("stringify", call.Argument(0))
 	if err != nil {
 		throwJSException(err.Error())
 	}
-	reqjson, err := data.ToString()
-	if err != nil {
-		throwJSException(err.Error())
-	}
-
 	var (
-		reqs  []rpc.JSONRequest
-		batch = true
+		rawReq = []byte(reqVal.String())
+		reqs   []jsonrpcCall
+		batch  bool
 	)
-	if err = json.Unmarshal([]byte(reqjson), &reqs); err != nil {
-		// single request?
-		reqs = make([]rpc.JSONRequest, 1)
-		if err = json.Unmarshal([]byte(reqjson), &reqs[0]); err != nil {
-			throwJSException("invalid request")
-		}
+	if rawReq[0] == '[' {
+		batch = true
+		json.Unmarshal(rawReq, &reqs)
+	} else {
 		batch = false
+		reqs = make([]jsonrpcCall, 1)
+		json.Unmarshal(rawReq, &reqs[0])
 	}
-	// Iteratively execute the requests
-	call.Otto.Set("response_len", len(reqs))
-	call.Otto.Run("var ret_response = new Array(response_len);")
 
-	for i, req := range reqs {
-		// Execute the RPC request and parse the reply
-		if err = b.client.Send(&req); err != nil {
-			return newErrorResponse(call, -32603, err.Error(), req.Id)
-		}
-		result := make(map[string]interface{})
-		if err = b.client.Recv(&result); err != nil {
-			return newErrorResponse(call, -32603, err.Error(), req.Id)
+	// Execute the requests.
+	resps, _ := call.Otto.Object("new Array()")
+	for _, req := range reqs {
+		resp, _ := call.Otto.Object(`({"jsonrpc":"2.0"})`)
+		resp.Set("id", req.Id)
+		var result json.RawMessage
+		err = b.client.Call(&result, req.Method, req.Params...)
+		switch err := err.(type) {
+		case nil:
+			if result == nil {
+				// Special case null because it is decoded as an empty
+				// raw message for some reason.
+				resp.Set("result", otto.NullValue())
+			} else {
+				resultVal, err := JSON.Call("parse", string(result))
+				if err != nil {
+					resp = newErrorResponse(call, -32603, err.Error(), &req.Id).Object()
+				} else {
+					resp.Set("result", resultVal)
+				}
+			}
+		case rpc.Error:
+			resp.Set("error", map[string]interface{}{
+				"code":    err.ErrorCode(),
+				"message": err.Error(),
+			})
+		default:
+			resp = newErrorResponse(call, -32603, err.Error(), &req.Id).Object()
 		}
-		// Feed the reply back into the JavaScript runtime environment
-		id, _ := result["id"]
-		jsonver, _ := result["jsonrpc"]
-
-		call.Otto.Set("ret_id", id)
-		call.Otto.Set("ret_jsonrpc", jsonver)
-		call.Otto.Set("response_idx", i)
+		resps.Call("push", resp)
+	}
 
-		if res, ok := result["result"]; ok {
-			payload, _ := json.Marshal(res)
-			call.Otto.Set("ret_result", string(payload))
-			response, err = call.Otto.Run(`
-				ret_response[response_idx] = { jsonrpc: ret_jsonrpc, id: ret_id, result: JSON.parse(ret_result) };
-			`)
-			continue
-		}
-		if res, ok := result["error"]; ok {
-			payload, _ := json.Marshal(res)
-			call.Otto.Set("ret_result", string(payload))
-			response, err = call.Otto.Run(`
-				ret_response[response_idx] = { jsonrpc: ret_jsonrpc, id: ret_id, error: JSON.parse(ret_result) };
-			`)
-			continue
-		}
-		return newErrorResponse(call, -32603, fmt.Sprintf("Invalid response"), new(int64))
+	// Return the responses either to the callback (if supplied)
+	// or directly as the return value.
+	if batch {
+		response = resps.Value()
+	} else {
+		response, _ = resps.Get("0")
 	}
-	// Convert single requests back from batch ones
-	if !batch {
-		call.Otto.Run("ret_response = ret_response[0];")
+	if fn := call.Argument(1).Object(); fn != nil && fn.Class() == "function" {
+		fn.Call("apply", response)
+		return otto.UndefinedValue()
 	}
-	// Execute any registered callbacks
-	if call.Argument(1).IsObject() {
-		call.Otto.Set("callback", call.Argument(1))
-		call.Otto.Run(`
-		if (Object.prototype.toString.call(callback) == '[object Function]') {
-			callback(null, ret_response);
-		}
-		`)
-	}
-	return
+	return response
+}
+
+func newErrorResponse(call otto.FunctionCall, code int, msg string, id interface{}) otto.Value {
+	// Bundle the error into a JSON RPC call response
+	m := map[string]interface{}{"version": "2.0", "id": id, "error": map[string]interface{}{"code": code, msg: msg}}
+	res, _ := json.Marshal(m)
+	val, _ := call.Otto.Run("(" + string(res) + ")")
+	return val
 }
 
 // throwJSException panics on an otto.Value. The Otto VM will recover from the
@@ -281,37 +279,3 @@ func throwJSException(msg interface{}) otto.Value {
 	}
 	panic(val)
 }
-
-// newErrorResponse creates a JSON RPC error response for a specific request id,
-// containing the specified error code and error message. Beside returning the
-// error to the caller, it also sets the ret_error and ret_response JavaScript
-// variables.
-func newErrorResponse(call otto.FunctionCall, code int, msg string, id interface{}) (response otto.Value) {
-	// Bundle the error into a JSON RPC call response
-	res := rpc.JSONErrResponse{
-		Version: rpc.JSONRPCVersion,
-		Id:      id,
-		Error: rpc.JSONError{
-			Code:    code,
-			Message: msg,
-		},
-	}
-	// Serialize the error response into JavaScript variables
-	errObj, err := json.Marshal(res.Error)
-	if err != nil {
-		glog.V(logger.Error).Infof("Failed to serialize JSON RPC error: %v", err)
-	}
-	resObj, err := json.Marshal(res)
-	if err != nil {
-		glog.V(logger.Error).Infof("Failed to serialize JSON RPC error response: %v", err)
-	}
-
-	if _, err = call.Otto.Run("ret_error = " + string(errObj)); err != nil {
-		glog.V(logger.Error).Infof("Failed to set `ret_error` to the occurred error: %v", err)
-	}
-	resVal, err := call.Otto.Run("ret_response = " + string(resObj))
-	if err != nil {
-		glog.V(logger.Error).Infof("Failed to set `ret_response` to the JSON RPC response: %v", err)
-	}
-	return resVal
-}
diff --git a/console/console.go b/console/console.go
index 00d1fea1d..f224f0c2e 100644
--- a/console/console.go
+++ b/console/console.go
@@ -52,7 +52,7 @@ const DefaultPrompt = "> "
 type Config struct {
 	DataDir  string       // Data directory to store the console history at
 	DocRoot  string       // Filesystem path from where to load JavaScript files from
-	Client   rpc.Client   // RPC client to execute Ethereum requests through
+	Client   *rpc.Client  // RPC client to execute Ethereum requests through
 	Prompt   string       // Input prompt prefix string (defaults to DefaultPrompt)
 	Prompter UserPrompter // Input prompter to allow interactive user feedback (defaults to TerminalPrompter)
 	Printer  io.Writer    // Output writer to serialize any display strings to (defaults to os.Stdout)
@@ -63,7 +63,7 @@ type Config struct {
 // JavaScript console attached to a running node via an external or in-process RPC
 // client.
 type Console struct {
-	client   rpc.Client   // RPC client to execute Ethereum requests through
+	client   *rpc.Client  // RPC client to execute Ethereum requests through
 	jsre     *jsre.JSRE   // JavaScript runtime environment running the interpreter
 	prompt   string       // Input prompt prefix string
 	prompter UserPrompter // Input prompter to allow interactive user feedback
diff --git a/internal/jsre/pretty.go b/internal/jsre/pretty.go
index 30d8660ff..f32e16243 100644
--- a/internal/jsre/pretty.go
+++ b/internal/jsre/pretty.go
@@ -116,7 +116,7 @@ func (ctx ppctx) printValue(v otto.Value, level int, inArray bool) {
 
 func (ctx ppctx) printObject(obj *otto.Object, level int, inArray bool) {
 	switch obj.Class() {
-	case "Array":
+	case "Array", "GoArray":
 		lv, _ := obj.Get("length")
 		len, _ := lv.ToInteger()
 		if len == 0 {
diff --git a/node/node.go b/node/node.go
index 1f517a027..ac8a7e8f0 100644
--- a/node/node.go
+++ b/node/node.go
@@ -505,16 +505,14 @@ func (n *Node) Restart() error {
 }
 
 // Attach creates an RPC client attached to an in-process API handler.
-func (n *Node) Attach() (rpc.Client, error) {
+func (n *Node) Attach() (*rpc.Client, error) {
 	n.lock.RLock()
 	defer n.lock.RUnlock()
 
-	// Short circuit if the node's not running
 	if n.server == nil {
 		return nil, ErrNodeStopped
 	}
-	// Otherwise attach to the API and return
-	return rpc.NewInProcRPCClient(n.inprocHandler), nil
+	return rpc.DialInProc(n.inprocHandler), nil
 }
 
 // Server retrieves the currently running P2P network layer. This method is meant
diff --git a/node/node_test.go b/node/node_test.go
index 372fc6b10..d9b26453b 100644
--- a/node/node_test.go
+++ b/node/node_test.go
@@ -507,21 +507,27 @@ func TestAPIGather(t *testing.T) {
 	}
 	// Register a batch of services with some configured APIs
 	calls := make(chan string, 1)
-
+	makeAPI := func(result string) *OneMethodApi {
+		return &OneMethodApi{fun: func() { calls <- result }}
+	}
 	services := map[string]struct {
 		APIs  []rpc.API
 		Maker InstrumentingWrapper
 	}{
-		"Zero APIs": {[]rpc.API{}, InstrumentedServiceMakerA},
-		"Single API": {[]rpc.API{
-			{"single", "1", &OneMethodApi{fun: func() { calls <- "single.v1" }}, true},
-		}, InstrumentedServiceMakerB},
-		"Many APIs": {[]rpc.API{
-			{"multi", "1", &OneMethodApi{fun: func() { calls <- "multi.v1" }}, true},
-			{"multi.v2", "2", &OneMethodApi{fun: func() { calls <- "multi.v2" }}, true},
-			{"multi.v2.nested", "2", &OneMethodApi{fun: func() { calls <- "multi.v2.nested" }}, true},
-		}, InstrumentedServiceMakerC},
+		"Zero APIs": {
+			[]rpc.API{}, InstrumentedServiceMakerA},
+		"Single API": {
+			[]rpc.API{
+				{Namespace: "single", Version: "1", Service: makeAPI("single.v1"), Public: true},
+			}, InstrumentedServiceMakerB},
+		"Many APIs": {
+			[]rpc.API{
+				{Namespace: "multi", Version: "1", Service: makeAPI("multi.v1"), Public: true},
+				{Namespace: "multi.v2", Version: "2", Service: makeAPI("multi.v2"), Public: true},
+				{Namespace: "multi.v2.nested", Version: "2", Service: makeAPI("multi.v2.nested"), Public: true},
+			}, InstrumentedServiceMakerC},
 	}
+
 	for id, config := range services {
 		config := config
 		constructor := func(*ServiceContext) (Service, error) {
@@ -554,12 +560,8 @@ func TestAPIGather(t *testing.T) {
 		{"multi.v2.nested_theOneMethod", "multi.v2.nested"},
 	}
 	for i, test := range tests {
-		if err := client.Send(rpc.JSONRequest{Id: []byte("1"), Version: "2.0", Method: test.Method}); err != nil {
-			t.Fatalf("test %d: failed to send API request: %v", i, err)
-		}
-		reply := new(rpc.JSONSuccessResponse)
-		if err := client.Recv(reply); err != nil {
-			t.Fatalf("test %d: failed to read API reply: %v", i, err)
+		if err := client.Call(nil, test.Method); err != nil {
+			t.Errorf("test %d: API request failed: %v", i, err)
 		}
 		select {
 		case result := <-calls:
diff --git a/rpc/client.go b/rpc/client.go
new file mode 100644
index 000000000..4ff9a8cb9
--- /dev/null
+++ b/rpc/client.go
@@ -0,0 +1,740 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package rpc
+
+import (
+	"bytes"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"net"
+	"net/url"
+	"reflect"
+	"strconv"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/ethereum/go-ethereum/logger"
+	"github.com/ethereum/go-ethereum/logger/glog"
+	"golang.org/x/net/context"
+)
+
+var (
+	ErrClientQuit = errors.New("client is closed")
+	ErrNoResult   = errors.New("no result in JSON-RPC response")
+)
+
+const (
+	clientSubscriptionBuffer = 100 // if exceeded, the client stops reading
+	tcpKeepAliveInterval     = 30 * time.Second
+	defaultDialTimeout       = 10 * time.Second // used when dialing if the context has no deadline
+	defaultWriteTimeout      = 10 * time.Second // used for calls if the context has no deadline
+	subscribeTimeout         = 5 * time.Second  // overall timeout eth_subscribe, rpc_modules calls
+)
+
+// BatchElem is an element in a batch request.
+type BatchElem struct {
+	Method string
+	Args   []interface{}
+	// The result is unmarshaled into this field. Result must be set to a
+	// non-nil pointer value of the desired type, otherwise the response will be
+	// discarded.
+	Result interface{}
+	// Error is set if the server returns an error for this request, or if
+	// unmarshaling into Result fails. It is not set for I/O errors.
+	Error error
+}
+
+// A value of this type can a JSON-RPC request, notification, successful response or
+// error response. Which one it is depends on the fields.
+type jsonrpcMessage struct {
+	Version string          `json:"jsonrpc"`
+	ID      json.RawMessage `json:"id,omitempty"`
+	Method  string          `json:"method,omitempty"`
+	Params  json.RawMessage `json:"params,omitempty"`
+	Error   *jsonError      `json:"error,omitempty"`
+	Result  json.RawMessage `json:"result,omitempty"`
+}
+
+func (msg *jsonrpcMessage) isNotification() bool {
+	return msg.ID == nil && msg.Method != ""
+}
+
+func (msg *jsonrpcMessage) isResponse() bool {
+	return msg.hasValidID() && msg.Method == "" && len(msg.Params) == 0
+}
+
+func (msg *jsonrpcMessage) hasValidID() bool {
+	return len(msg.ID) > 0 && msg.ID[0] != '{' && msg.ID[0] != '['
+}
+
+func (msg *jsonrpcMessage) String() string {
+	b, _ := json.Marshal(msg)
+	return string(b)
+}
+
+// Client represents a connection to an RPC server.
+type Client struct {
+	idCounter   uint32
+	connectFunc func(ctx context.Context) (net.Conn, error)
+	isHTTP      bool
+
+	// writeConn is only safe to access outside dispatch, with the
+	// write lock held. The write lock is taken by sending on
+	// requestOp and released by sending on sendDone.
+	writeConn net.Conn
+
+	// for dispatch
+	close       chan struct{}
+	didQuit     chan struct{}                  // closed when client quits
+	reconnected chan net.Conn                  // where write/reconnect sends the new connection
+	readErr     chan error                     // errors from read
+	readResp    chan []*jsonrpcMessage         // valid messages from read
+	requestOp   chan *requestOp                // for registering response IDs
+	sendDone    chan error                     // signals write completion, releases write lock
+	respWait    map[string]*requestOp          // active requests
+	subs        map[string]*ClientSubscription // active subscriptions
+}
+
+type requestOp struct {
+	ids  []json.RawMessage
+	err  error
+	resp chan *jsonrpcMessage // receives up to len(ids) responses
+	sub  *ClientSubscription  // only set for EthSubscribe requests
+}
+
+func (op *requestOp) wait(ctx context.Context) (*jsonrpcMessage, error) {
+	select {
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	case resp := <-op.resp:
+		return resp, op.err
+	}
+}
+
+// Dial creates a new client for the given URL.
+//
+// The currently supported URL schemes are "http", "https", "ws" and "wss". If rawurl is a
+// file name with no URL scheme, a local socket connection is established using UNIX
+// domain sockets on supported platforms and named pipes on Windows. If you want to
+// configure transport options, use DialHTTP, DialWebsocket or DialIPC instead.
+//
+// For websocket connections, the origin is set to the local host name.
+//
+// The client reconnects automatically if the connection is lost.
+func Dial(rawurl string) (*Client, error) {
+	return DialContext(context.Background(), rawurl)
+}
+
+// DialContext creates a new RPC client, just like Dial.
+//
+// The context is used to cancel or time out the initial connection establishment. It does
+// not affect subsequent interactions with the client.
+func DialContext(ctx context.Context, rawurl string) (*Client, error) {
+	u, err := url.Parse(rawurl)
+	if err != nil {
+		return nil, err
+	}
+	switch u.Scheme {
+	case "http", "https":
+		return DialHTTP(rawurl)
+	case "ws", "wss":
+		return DialWebsocket(ctx, rawurl, "")
+	case "":
+		return DialIPC(ctx, rawurl)
+	default:
+		return nil, fmt.Errorf("no known transport for URL scheme %q", u.Scheme)
+	}
+}
+
+func newClient(initctx context.Context, connectFunc func(context.Context) (net.Conn, error)) (*Client, error) {
+	conn, err := connectFunc(initctx)
+	if err != nil {
+		return nil, err
+	}
+	_, isHTTP := conn.(*httpConn)
+
+	c := &Client{
+		writeConn:   conn,
+		isHTTP:      isHTTP,
+		connectFunc: connectFunc,
+		close:       make(chan struct{}),
+		didQuit:     make(chan struct{}),
+		reconnected: make(chan net.Conn),
+		readErr:     make(chan error),
+		readResp:    make(chan []*jsonrpcMessage),
+		requestOp:   make(chan *requestOp),
+		sendDone:    make(chan error, 1),
+		respWait:    make(map[string]*requestOp),
+		subs:        make(map[string]*ClientSubscription),
+	}
+	if !isHTTP {
+		go c.dispatch(conn)
+	}
+	return c, nil
+}
+
+func (c *Client) nextID() json.RawMessage {
+	id := atomic.AddUint32(&c.idCounter, 1)
+	return []byte(strconv.FormatUint(uint64(id), 10))
+}
+
+// SupportedModules calls the rpc_modules method, retrieving the list of
+// APIs that are available on the server.
+func (c *Client) SupportedModules() (map[string]string, error) {
+	var result map[string]string
+	ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout)
+	defer cancel()
+	err := c.CallContext(ctx, &result, "rpc_modules")
+	return result, err
+}
+
+// Close closes the client, aborting any in-flight requests.
+func (c *Client) Close() {
+	if c.isHTTP {
+		return
+	}
+	select {
+	case c.close <- struct{}{}:
+		<-c.didQuit
+	case <-c.didQuit:
+	}
+}
+
+// Call performs a JSON-RPC call with the given arguments and unmarshals into
+// result if no error occurred.
+//
+// The result must be a pointer so that package json can unmarshal into it. You
+// can also pass nil, in which case the result is ignored.
+func (c *Client) Call(result interface{}, method string, args ...interface{}) error {
+	ctx := context.Background()
+	return c.CallContext(ctx, result, method, args...)
+}
+
+// CallContext performs a JSON-RPC call with the given arguments. If the context is
+// canceled before the call has successfully returned, CallContext returns immediately.
+//
+// The result must be a pointer so that package json can unmarshal into it. You
+// can also pass nil, in which case the result is ignored.
+func (c *Client) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error {
+	msg, err := c.newMessage(method, args...)
+	if err != nil {
+		return err
+	}
+	op := &requestOp{ids: []json.RawMessage{msg.ID}, resp: make(chan *jsonrpcMessage, 1)}
+
+	if c.isHTTP {
+		err = c.sendHTTP(ctx, op, msg)
+	} else {
+		err = c.send(ctx, op, msg)
+	}
+	if err != nil {
+		return err
+	}
+
+	// dispatch has accepted the request and will close the channel it when it quits.
+	switch resp, err := op.wait(ctx); {
+	case err != nil:
+		return err
+	case resp.Error != nil:
+		return resp.Error
+	case len(resp.Result) == 0:
+		return ErrNoResult
+	default:
+		return json.Unmarshal(resp.Result, &result)
+	}
+}
+
+// BatchCall sends all given requests as a single batch and waits for the server
+// to return a response for all of them.
+//
+// In contrast to Call, BatchCall only returns I/O errors. Any error specific to
+// a request is reported through the Error field of the corresponding BatchElem.
+//
+// Note that batch calls may not be executed atomically on the server side.
+func (c *Client) BatchCall(b []BatchElem) error {
+	ctx := context.Background()
+	return c.BatchCallContext(ctx, b)
+}
+
+// BatchCall sends all given requests as a single batch and waits for the server
+// to return a response for all of them. The wait duration is bounded by the
+// context's deadline.
+//
+// In contrast to CallContext, BatchCallContext only returns I/O errors. Any
+// error specific to a request is reported through the Error field of the
+// corresponding BatchElem.
+//
+// Note that batch calls may not be executed atomically on the server side.
+func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error {
+	msgs := make([]*jsonrpcMessage, len(b))
+	op := &requestOp{
+		ids:  make([]json.RawMessage, len(b)),
+		resp: make(chan *jsonrpcMessage, len(b)),
+	}
+	for i, elem := range b {
+		msg, err := c.newMessage(elem.Method, elem.Args...)
+		if err != nil {
+			return err
+		}
+		msgs[i] = msg
+		op.ids[i] = msg.ID
+	}
+
+	var err error
+	if c.isHTTP {
+		err = c.sendBatchHTTP(ctx, op, msgs)
+	} else {
+		err = c.send(ctx, op, msgs)
+	}
+
+	// Wait for all responses to come back.
+	for n := 0; n < len(b) && err == nil; n++ {
+		var resp *jsonrpcMessage
+		resp, err = op.wait(ctx)
+		if err != nil {
+			break
+		}
+		// Find the element corresponding to this response.
+		// The element is guaranteed to be present because dispatch
+		// only sends valid IDs to our channel.
+		var elem *BatchElem
+		for i := range msgs {
+			if bytes.Equal(msgs[i].ID, resp.ID) {
+				elem = &b[i]
+				break
+			}
+		}
+		if resp.Error != nil {
+			elem.Error = resp.Error
+			continue
+		}
+		if len(resp.Result) == 0 {
+			elem.Error = ErrNoResult
+			continue
+		}
+		elem.Error = json.Unmarshal(resp.Result, elem.Result)
+	}
+	return err
+}
+
+// EthSubscribe calls the "eth_subscribe" method with the given arguments,
+// registering a subscription. Server notifications for the subscription are
+// sent to the given channel. The element type of the channel must match the
+// expected type of content returned by the subscription.
+//
+// Callers should not use the same channel for multiple calls to EthSubscribe.
+// The channel is closed when the notification is unsubscribed or an error
+// occurs. The error can be retrieved via the Err method of the subscription.
+//
+// Slow subscribers will block the clients ingress path eventually.
+func (c *Client) EthSubscribe(channel interface{}, args ...interface{}) (*ClientSubscription, error) {
+	// Check type of channel first.
+	chanVal := reflect.ValueOf(channel)
+	if chanVal.Kind() != reflect.Chan || chanVal.Type().ChanDir()&reflect.SendDir == 0 {
+		panic("first argument to EthSubscribe must be a writable channel")
+	}
+	if chanVal.IsNil() {
+		panic("channel given to EthSubscribe must not be nil")
+	}
+	if c.isHTTP {
+		return nil, ErrNotificationsUnsupported
+	}
+
+	msg, err := c.newMessage(subscribeMethod, args...)
+	if err != nil {
+		return nil, err
+	}
+	op := &requestOp{
+		ids:  []json.RawMessage{msg.ID},
+		resp: make(chan *jsonrpcMessage),
+		sub:  newClientSubscription(c, chanVal),
+	}
+	ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout)
+	defer cancel()
+
+	// Send the subscription request.
+	// The arrival and validity of the response is signaled on sub.quit.
+	if err := c.send(ctx, op, msg); err != nil {
+		return nil, err
+	}
+	if _, err := op.wait(ctx); err != nil {
+		return nil, err
+	}
+	return op.sub, nil
+}
+
+func (c *Client) newMessage(method string, paramsIn ...interface{}) (*jsonrpcMessage, error) {
+	params, err := json.Marshal(paramsIn)
+	if err != nil {
+		return nil, err
+	}
+	return &jsonrpcMessage{Version: "2.0", ID: c.nextID(), Method: method, Params: params}, nil
+}
+
+// send registers op with the dispatch loop, then sends msg on the connection.
+// if sending fails, op is deregistered.
+func (c *Client) send(ctx context.Context, op *requestOp, msg interface{}) error {
+	select {
+	case c.requestOp <- op:
+		if glog.V(logger.Detail) {
+			glog.Info("sending ", msg)
+		}
+		err := c.write(ctx, msg)
+		c.sendDone <- err
+		return err
+	case <-c.didQuit:
+		return ErrClientQuit
+	}
+}
+
+func (c *Client) write(ctx context.Context, msg interface{}) error {
+	deadline, ok := ctx.Deadline()
+	if !ok {
+		deadline = time.Now().Add(defaultWriteTimeout)
+	}
+	// The previous write failed. Try to establish a new connection.
+	if c.writeConn == nil {
+		if err := c.reconnect(ctx); err != nil {
+			return err
+		}
+	}
+	c.writeConn.SetWriteDeadline(deadline)
+	err := json.NewEncoder(c.writeConn).Encode(msg)
+	if err != nil {
+		c.writeConn = nil
+	}
+	return err
+}
+
+func (c *Client) reconnect(ctx context.Context) error {
+	newconn, err := c.connectFunc(ctx)
+	if err != nil {
+		glog.V(logger.Detail).Infof("reconnect failed: %v", err)
+		return err
+	}
+	select {
+	case c.reconnected <- newconn:
+		c.writeConn = newconn
+		return nil
+	case <-c.didQuit:
+		newconn.Close()
+		return ErrClientQuit
+	}
+}
+
+// dispatch is the main loop of the client.
+// It sends read messages to waiting calls to Call and BatchCall
+// and subscription notifications to registered subscriptions.
+func (c *Client) dispatch(conn net.Conn) {
+	// Spawn the initial read loop.
+	go c.read(conn)
+
+	var (
+		lastOp        *requestOp    // tracks last send operation
+		requestOpLock = c.requestOp // nil while the send lock is held
+		reading       = true        // if true, a read loop is running
+	)
+	defer close(c.didQuit)
+	defer func() {
+		c.closeRequestOps(ErrClientQuit)
+		conn.Close()
+		if reading {
+			// Empty read channels until read is dead.
+			for {
+				select {
+				case <-c.readResp:
+				case <-c.readErr:
+					return
+				}
+			}
+		}
+	}()
+
+	for {
+		select {
+		case <-c.close:
+			return
+
+		// Read path.
+		case batch := <-c.readResp:
+			for _, msg := range batch {
+				switch {
+				case msg.isNotification():
+					if glog.V(logger.Detail) {
+						glog.Info("<-readResp: notification ", msg)
+					}
+					c.handleNotification(msg)
+				case msg.isResponse():
+					if glog.V(logger.Detail) {
+						glog.Info("<-readResp: response ", msg)
+					}
+					c.handleResponse(msg)
+				default:
+					if glog.V(logger.Debug) {
+						glog.Error("<-readResp: dropping weird message", msg)
+					}
+					// TODO: maybe close
+				}
+			}
+
+		case err := <-c.readErr:
+			glog.V(logger.Debug).Infof("<-readErr: %v", err)
+			c.closeRequestOps(err)
+			conn.Close()
+			reading = false
+
+		case newconn := <-c.reconnected:
+			glog.V(logger.Debug).Infof("<-reconnected: (reading=%t) %v", reading, conn.RemoteAddr())
+			if reading {
+				// Wait for the previous read loop to exit. This is a rare case.
+				conn.Close()
+				<-c.readErr
+			}
+			go c.read(newconn)
+			reading = true
+			conn = newconn
+
+		// Send path.
+		case op := <-requestOpLock:
+			// Stop listening for further send ops until the current one is done.
+			requestOpLock = nil
+			lastOp = op
+			for _, id := range op.ids {
+				c.respWait[string(id)] = op
+			}
+
+		case err := <-c.sendDone:
+			if err != nil {
+				// Remove response handlers for the last send. We remove those here
+				// because the error is already handled in Call or BatchCall. When the
+				// read loop goes down, it will signal all other current operations.
+				for _, id := range lastOp.ids {
+					delete(c.respWait, string(id))
+				}
+			}
+			// Listen for send ops again.
+			requestOpLock = c.requestOp
+			lastOp = nil
+		}
+	}
+}
+
+// closeRequestOps unblocks pending send ops and active subscriptions.
+func (c *Client) closeRequestOps(err error) {
+	didClose := make(map[*requestOp]bool)
+
+	for id, op := range c.respWait {
+		// Remove the op so that later calls will not close op.resp again.
+		delete(c.respWait, id)
+
+		if !didClose[op] {
+			op.err = err
+			close(op.resp)
+			didClose[op] = true
+		}
+	}
+	for id, sub := range c.subs {
+		delete(c.subs, id)
+		sub.quitWithError(err, false)
+	}
+}
+
+func (c *Client) handleNotification(msg *jsonrpcMessage) {
+	if msg.Method != notificationMethod {
+		glog.V(logger.Debug).Info("dropping non-subscription message: ", msg)
+		return
+	}
+	var subResult struct {
+		ID     string          `json:"subscription"`
+		Result json.RawMessage `json:"result"`
+	}
+	if err := json.Unmarshal(msg.Params, &subResult); err != nil {
+		glog.V(logger.Debug).Info("dropping invalid subscription message: ", msg)
+		return
+	}
+	if c.subs[subResult.ID] != nil {
+		c.subs[subResult.ID].deliver(subResult.Result)
+	}
+}
+
+func (c *Client) handleResponse(msg *jsonrpcMessage) {
+	op := c.respWait[string(msg.ID)]
+	if op == nil {
+		glog.V(logger.Debug).Infof("unsolicited response %v", msg)
+		return
+	}
+	delete(c.respWait, string(msg.ID))
+	// For normal responses, just forward the reply to Call/BatchCall.
+	if op.sub == nil {
+		op.resp <- msg
+		return
+	}
+	// For subscription responses, start the subscription if the server
+	// indicates success. EthSubscribe gets unblocked in either case through
+	// the op.resp channel.
+	defer close(op.resp)
+	if msg.Error != nil {
+		op.err = msg.Error
+		return
+	}
+	if op.err = json.Unmarshal(msg.Result, &op.sub.subid); op.err == nil {
+		go op.sub.start()
+		c.subs[op.sub.subid] = op.sub
+	}
+}
+
+// Reading happens on a dedicated goroutine.
+
+func (c *Client) read(conn net.Conn) error {
+	var (
+		buf json.RawMessage
+		dec = json.NewDecoder(conn)
+	)
+	readMessage := func() (rs []*jsonrpcMessage, err error) {
+		buf = buf[:0]
+		if err = dec.Decode(&buf); err != nil {
+			return nil, err
+		}
+		if isBatch(buf) {
+			err = json.Unmarshal(buf, &rs)
+		} else {
+			rs = make([]*jsonrpcMessage, 1)
+			err = json.Unmarshal(buf, &rs[0])
+		}
+		return rs, err
+	}
+
+	for {
+		resp, err := readMessage()
+		if err != nil {
+			c.readErr <- err
+			return err
+		}
+		c.readResp <- resp
+	}
+}
+
+// Subscriptions.
+
+// A ClientSubscription represents a subscription established through EthSubscribe.
+type ClientSubscription struct {
+	client  *Client
+	etype   reflect.Type
+	channel reflect.Value
+	subid   string
+	in      chan json.RawMessage
+
+	quitOnce sync.Once     // ensures quit is closed once
+	quit     chan struct{} // quit is closed when the subscription exits
+	errOnce  sync.Once     // ensures err is closed once
+	err      chan error
+}
+
+func newClientSubscription(c *Client, channel reflect.Value) *ClientSubscription {
+	sub := &ClientSubscription{
+		client:  c,
+		etype:   channel.Type().Elem(),
+		channel: channel,
+		quit:    make(chan struct{}),
+		err:     make(chan error, 1),
+		// in is buffered so dispatch can continue even if the subscriber is slow.
+		in: make(chan json.RawMessage, clientSubscriptionBuffer),
+	}
+	return sub
+}
+
+// Err returns the subscription error channel. The intended use of Err is to schedule
+// resubscription when the client connection is closed unexpectedly.
+//
+// The error channel receives a value when the subscription has ended due
+// to an error. The received error is ErrClientQuit if Close has been called
+// on the underlying client and no other error has occurred.
+//
+// The error channel is closed when Unsubscribe is called on the subscription.
+func (sub *ClientSubscription) Err() <-chan error {
+	return sub.err
+}
+
+// Unsubscribe unsubscribes the notification and closes the error channel.
+// It can safely be called more than once.
+func (sub *ClientSubscription) Unsubscribe() {
+	sub.quitWithError(nil, true)
+	sub.errOnce.Do(func() { close(sub.err) })
+}
+
+func (sub *ClientSubscription) quitWithError(err error, unsubscribeServer bool) {
+	sub.quitOnce.Do(func() {
+		if unsubscribeServer {
+			sub.requestUnsubscribe()
+		}
+		if err != nil {
+			sub.err <- err
+		}
+		close(sub.quit)
+	})
+}
+
+func (sub *ClientSubscription) deliver(result json.RawMessage) (ok bool) {
+	select {
+	case sub.in <- result:
+		return true
+	case <-sub.quit:
+		return false
+	}
+}
+
+func (sub *ClientSubscription) start() {
+	sub.quitWithError(sub.forward())
+}
+
+func (sub *ClientSubscription) forward() (err error, unsubscribeServer bool) {
+	cases := []reflect.SelectCase{
+		{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.quit)},
+		{Dir: reflect.SelectSend, Chan: sub.channel},
+	}
+	for {
+		select {
+		case result := <-sub.in:
+			val, err := sub.unmarshal(result)
+			if err != nil {
+				return err, true
+			}
+			cases[1].Send = val
+			switch chosen, _, _ := reflect.Select(cases); chosen {
+			case 0: // <-sub.quit
+				return nil, false
+			case 1: // sub.channel<-
+				continue
+			}
+		case <-sub.quit:
+			return nil, false
+		}
+	}
+}
+
+func (sub *ClientSubscription) unmarshal(result json.RawMessage) (reflect.Value, error) {
+	val := reflect.New(sub.etype)
+	err := json.Unmarshal(result, val.Interface())
+	return val.Elem(), err
+}
+
+func (sub *ClientSubscription) requestUnsubscribe() error {
+	var result interface{}
+	return sub.client.Call(&result, unsubscribeMethod, sub.subid)
+}
diff --git a/rpc/client_context_go1.4.go b/rpc/client_context_go1.4.go
new file mode 100644
index 000000000..ac956a17d
--- /dev/null
+++ b/rpc/client_context_go1.4.go
@@ -0,0 +1,60 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// +build !go1.5
+
+package rpc
+
+import (
+	"net"
+	"net/http"
+	"time"
+
+	"golang.org/x/net/context"
+)
+
+// In older versions of Go (below 1.5), dials cannot be canceled
+// via a channel or context. The context deadline can still applied.
+
+// contextDialer returns a dialer that applies the deadline value from the given context.
+func contextDialer(ctx context.Context) *net.Dialer {
+	dialer := &net.Dialer{KeepAlive: tcpKeepAliveInterval}
+	if deadline, ok := ctx.Deadline(); ok {
+		dialer.Deadline = deadline
+	} else {
+		dialer.Deadline = time.Now().Add(defaultDialTimeout)
+	}
+	return dialer
+}
+
+// dialContext connects to the given address, aborting the dial if ctx is canceled.
+func dialContext(ctx context.Context, network, addr string) (net.Conn, error) {
+	return contextDialer(ctx).Dial(network, addr)
+}
+
+// requestWithContext copies req, adding the cancelation channel and deadline from ctx.
+func requestWithContext(c *http.Client, req *http.Request, ctx context.Context) (*http.Client, *http.Request) {
+	// Set Timeout on the client if the context has a deadline.
+	// Note that there is no default timeout (unlike in contextDialer) because
+	// the timeout applies to the entire request, including reads from body.
+	if deadline, ok := ctx.Deadline(); ok {
+		c2 := *c
+		c2.Timeout = deadline.Sub(time.Now())
+		c = &c2
+	}
+	req2 := *req
+	return c, &req2
+}
diff --git a/rpc/client_context_go1.5.go b/rpc/client_context_go1.5.go
new file mode 100644
index 000000000..4a007d9f8
--- /dev/null
+++ b/rpc/client_context_go1.5.go
@@ -0,0 +1,61 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// +build go1.5,!go1.6
+
+package rpc
+
+import (
+	"net"
+	"net/http"
+	"time"
+
+	"golang.org/x/net/context"
+)
+
+// In Go 1.5, dials cannot be canceled via a channel or context. The context deadline can
+// still be applied. Go 1.5 adds the ability to cancel HTTP requests via a channel.
+
+// contextDialer returns a dialer that applies the deadline value from the given context.
+func contextDialer(ctx context.Context) *net.Dialer {
+	dialer := &net.Dialer{KeepAlive: tcpKeepAliveInterval}
+	if deadline, ok := ctx.Deadline(); ok {
+		dialer.Deadline = deadline
+	} else {
+		dialer.Deadline = time.Now().Add(defaultDialTimeout)
+	}
+	return dialer
+}
+
+// dialContext connects to the given address, aborting the dial if ctx is canceled.
+func dialContext(ctx context.Context, network, addr string) (net.Conn, error) {
+	return contextDialer(ctx).Dial(network, addr)
+}
+
+// requestWithContext copies req, adding the cancelation channel and deadline from ctx.
+func requestWithContext(c *http.Client, req *http.Request, ctx context.Context) (*http.Client, *http.Request) {
+	// Set Timeout on the client if the context has a deadline.
+	// Note that there is no default timeout (unlike in contextDialer) because
+	// the timeout applies to the entire request, including reads from body.
+	if deadline, ok := ctx.Deadline(); ok {
+		c2 := *c
+		c2.Timeout = deadline.Sub(time.Now())
+		c = &c2
+	}
+	req2 := *req
+	req2.Cancel = ctx.Done()
+	return c, &req2
+}
diff --git a/rpc/client_context_go1.6.go b/rpc/client_context_go1.6.go
new file mode 100644
index 000000000..67777ddc6
--- /dev/null
+++ b/rpc/client_context_go1.6.go
@@ -0,0 +1,55 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// +build go1.6,!go1.7
+
+package rpc
+
+import (
+	"net"
+	"net/http"
+	"time"
+
+	"golang.org/x/net/context"
+)
+
+// In Go 1.6, net.Dialer gained the ability to cancel via a channel.
+
+// contextDialer returns a dialer that applies the deadline value from the given context.
+func contextDialer(ctx context.Context) *net.Dialer {
+	dialer := &net.Dialer{Cancel: ctx.Done(), KeepAlive: tcpKeepAliveInterval}
+	if deadline, ok := ctx.Deadline(); ok {
+		dialer.Deadline = deadline
+	} else {
+		dialer.Deadline = time.Now().Add(defaultDialTimeout)
+	}
+	return dialer
+}
+
+// dialContext connects to the given address, aborting the dial if ctx is canceled.
+func dialContext(ctx context.Context, network, addr string) (net.Conn, error) {
+	return contextDialer(ctx).Dial(network, addr)
+}
+
+// requestWithContext copies req, adding the cancelation channel and deadline from ctx.
+func requestWithContext(c *http.Client, req *http.Request, ctx context.Context) (*http.Client, *http.Request) {
+	// We set Timeout on the client for Go <= 1.5. There
+	// is no need to do that here because the dial will be canceled
+	// by package http.
+	req2 := *req
+	req2.Cancel = ctx.Done()
+	return c, &req2
+}
diff --git a/rpc/client_context_go1.7.go b/rpc/client_context_go1.7.go
new file mode 100644
index 000000000..56ce12ab8
--- /dev/null
+++ b/rpc/client_context_go1.7.go
@@ -0,0 +1,51 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// +build go1.7
+
+package rpc
+
+import (
+	"context"
+	"net"
+	"net/http"
+	"time"
+)
+
+// In Go 1.7, context moved into the standard library and support
+// for cancelation via context was added to net.Dialer and http.Request.
+
+// contextDialer returns a dialer that applies the deadline value from the given context.
+func contextDialer(ctx context.Context) *net.Dialer {
+	dialer := &net.Dialer{Cancel: ctx.Done(), KeepAlive: tcpKeepAliveInterval}
+	if deadline, ok := ctx.Deadline(); ok {
+		dialer.Deadline = deadline
+	} else {
+		dialer.Deadline = time.Now().Add(defaultDialTimeout)
+	}
+	return dialer
+}
+
+// dialContext connects to the given address, aborting the dial if ctx is canceled.
+func dialContext(ctx context.Context, network, addr string) (net.Conn, error) {
+	d := &net.Dialer{KeepAlive: tcpKeepAliveInterval}
+	return d.DialContext(ctx, network, addr)
+}
+
+// requestWithContext copies req, adding the cancelation channel and deadline from ctx.
+func requestWithContext(c *http.Client, req *http.Request, ctx context.Context) (*http.Client, *http.Request) {
+	return c, req.WithContext(ctx)
+}
diff --git a/rpc/client_example_test.go b/rpc/client_example_test.go
new file mode 100644
index 000000000..84b4b67bb
--- /dev/null
+++ b/rpc/client_example_test.go
@@ -0,0 +1,83 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package rpc_test
+
+import (
+	"fmt"
+	"math/big"
+	"time"
+
+	"github.com/ethereum/go-ethereum/rpc"
+)
+
+// In this example, our client whishes to track the latest 'block number'
+// known to the server. The server supports two methods:
+//
+// eth_getBlockByNumber("latest", {})
+//    returns the latest block object.
+//
+// eth_subscribe("newBlocks")
+//    creates a subscription which fires block objects when new blocks arrive.
+
+type Block struct {
+	Number *big.Int
+}
+
+func ExampleClientSubscription() {
+	// Connect the client.
+	client, _ := rpc.Dial("ws://127.0.0.1:8485")
+	subch := make(chan Block)
+	go subscribeBlocks(client, subch)
+
+	// Print events from the subscription as they arrive.
+	for block := range subch {
+		fmt.Println("latest block:", block.Number)
+	}
+}
+
+// subscribeBlocks runs in its own goroutine and maintains
+// a subscription for new blocks.
+func subscribeBlocks(client *rpc.Client, subch chan Block) {
+	for i := 0; ; i++ {
+		if i > 0 {
+			time.Sleep(2 * time.Second)
+		}
+
+		// Subscribe to new blocks.
+		sub, err := client.EthSubscribe(subch, "newBlocks")
+		if err == rpc.ErrClientQuit {
+			return // Stop reconnecting if the client was closed.
+		} else if err != nil {
+			fmt.Println("subscribe error:", err)
+			continue
+		}
+
+		// The connection is established now.
+		// Update the channel with the current block.
+		var lastBlock Block
+		if err := client.Call(&lastBlock, "eth_getBlockByNumber", "latest"); err != nil {
+			fmt.Println("can't get latest block:", err)
+			continue
+		}
+		subch <- lastBlock
+
+		// The subscription will deliver events to the channel. Wait for the
+		// subscription to end for any reason, then loop around to re-establish
+		// the connection.
+		fmt.Println("connection lost: ", <-sub.Err())
+	}
+}
diff --git a/rpc/client_test.go b/rpc/client_test.go
new file mode 100644
index 000000000..58dceada0
--- /dev/null
+++ b/rpc/client_test.go
@@ -0,0 +1,489 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package rpc
+
+import (
+	"fmt"
+	"math/rand"
+	"net"
+	"net/http"
+	"net/http/httptest"
+	"os"
+	"reflect"
+	"runtime"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/davecgh/go-spew/spew"
+	"github.com/ethereum/go-ethereum/logger"
+	"github.com/ethereum/go-ethereum/logger/glog"
+	"golang.org/x/net/context"
+)
+
+func TestClientRequest(t *testing.T) {
+	server := newTestServer("service", new(Service))
+	defer server.Stop()
+	client := DialInProc(server)
+	defer client.Close()
+
+	var resp Result
+	if err := client.Call(&resp, "service_echo", "hello", 10, &Args{"world"}); err != nil {
+		t.Fatal(err)
+	}
+	if !reflect.DeepEqual(resp, Result{"hello", 10, &Args{"world"}}) {
+		t.Errorf("incorrect result %#v", resp)
+	}
+}
+
+func TestClientBatchRequest(t *testing.T) {
+	server := newTestServer("service", new(Service))
+	defer server.Stop()
+	client := DialInProc(server)
+	defer client.Close()
+
+	batch := []BatchElem{
+		{
+			Method: "service_echo",
+			Args:   []interface{}{"hello", 10, &Args{"world"}},
+			Result: new(Result),
+		},
+		{
+			Method: "service_echo",
+			Args:   []interface{}{"hello2", 11, &Args{"world"}},
+			Result: new(Result),
+		},
+		{
+			Method: "no_such_method",
+			Args:   []interface{}{1, 2, 3},
+			Result: new(int),
+		},
+	}
+	if err := client.BatchCall(batch); err != nil {
+		t.Fatal(err)
+	}
+	wantResult := []BatchElem{
+		{
+			Method: "service_echo",
+			Args:   []interface{}{"hello", 10, &Args{"world"}},
+			Result: &Result{"hello", 10, &Args{"world"}},
+		},
+		{
+			Method: "service_echo",
+			Args:   []interface{}{"hello2", 11, &Args{"world"}},
+			Result: &Result{"hello2", 11, &Args{"world"}},
+		},
+		{
+			Method: "no_such_method",
+			Args:   []interface{}{1, 2, 3},
+			Result: new(int),
+			Error:  &jsonError{Code: -32601, Message: "The method no_such_method_ does not exist/is not available"},
+		},
+	}
+	if !reflect.DeepEqual(batch, wantResult) {
+		t.Errorf("batch results mismatch:\ngot %swant %s", spew.Sdump(batch), spew.Sdump(wantResult))
+	}
+}
+
+// func TestClientCancelInproc(t *testing.T) { testClientCancel("inproc", t) }
+func TestClientCancelWebsocket(t *testing.T) { testClientCancel("ws", t) }
+func TestClientCancelHTTP(t *testing.T)      { testClientCancel("http", t) }
+func TestClientCancelIPC(t *testing.T)       { testClientCancel("ipc", t) }
+
+// This test checks that requests made through CallContext can be canceled by canceling
+// the context.
+func testClientCancel(transport string, t *testing.T) {
+	server := newTestServer("service", new(Service))
+	defer server.Stop()
+
+	// What we want to achieve is that the context gets canceled
+	// at various stages of request processing. The interesting cases
+	// are:
+	//  - cancel during dial
+	//  - cancel while performing a HTTP request
+	//  - cancel while waiting for a response
+	//
+	// To trigger those, the times are chosen such that connections
+	// are killed within the deadline for every other call (maxKillTimeout
+	// is 2x maxCancelTimeout).
+	//
+	// Once a connection is dead, there is a fair chance it won't connect
+	// successfully because the accept is delayed by 1s.
+	maxContextCancelTimeout := 300 * time.Millisecond
+	fl := &flakeyListener{
+		maxAcceptDelay: 1 * time.Second,
+		maxKillTimeout: 600 * time.Millisecond,
+	}
+
+	var client *Client
+	switch transport {
+	case "ws", "http":
+		c, hs := httpTestClient(server, transport, fl)
+		defer hs.Close()
+		client = c
+	case "ipc":
+		c, l := ipcTestClient(server, fl)
+		defer l.Close()
+		client = c
+	default:
+		panic("unknown transport: " + transport)
+	}
+
+	// These tests take a lot of time, run them all at once.
+	// You probably want to run with -parallel 1 or comment out
+	// the call to t.Parallel if you enable the logging.
+	t.Parallel()
+	// glog.SetV(6)
+	// glog.SetToStderr(true)
+	// defer glog.SetToStderr(false)
+	// glog.Infoln("testing ", transport)
+
+	// The actual test starts here.
+	var (
+		wg       sync.WaitGroup
+		nreqs    = 10
+		ncallers = 6
+	)
+	caller := func(index int) {
+		defer wg.Done()
+		for i := 0; i < nreqs; i++ {
+			var (
+				ctx     context.Context
+				cancel  func()
+				timeout = time.Duration(rand.Int63n(int64(maxContextCancelTimeout)))
+			)
+			if index < ncallers/2 {
+				// For half of the callers, create a context without deadline
+				// and cancel it later.
+				ctx, cancel = context.WithCancel(context.Background())
+				time.AfterFunc(timeout, cancel)
+			} else {
+				// For the other half, create a context with a deadline instead. This is
+				// different because the context deadline is used to set the socket write
+				// deadline.
+				ctx, cancel = context.WithTimeout(context.Background(), timeout)
+			}
+			// Now perform a call with the context.
+			// The key thing here is that no call will ever complete successfully.
+			err := client.CallContext(ctx, nil, "service_sleep", 2*maxContextCancelTimeout)
+			if err != nil {
+				glog.V(logger.Debug).Infoln("got expected error:", err)
+			} else {
+				t.Errorf("no error for call with %v wait time", timeout)
+			}
+			cancel()
+		}
+	}
+	wg.Add(ncallers)
+	for i := 0; i < ncallers; i++ {
+		go caller(i)
+	}
+	wg.Wait()
+}
+
+func TestClientSubscribeInvalidArg(t *testing.T) {
+	server := newTestServer("service", new(Service))
+	defer server.Stop()
+	client := DialInProc(server)
+	defer client.Close()
+
+	check := func(shouldPanic bool, arg interface{}) {
+		defer func() {
+			err := recover()
+			if shouldPanic && err == nil {
+				t.Errorf("EthSubscribe should've panicked for %#v", arg)
+			}
+			if !shouldPanic && err != nil {
+				t.Errorf("EthSubscribe shouldn't have panicked for %#v", arg)
+				buf := make([]byte, 1024*1024)
+				buf = buf[:runtime.Stack(buf, false)]
+				t.Error(err)
+				t.Error(string(buf))
+			}
+		}()
+		client.EthSubscribe(arg, "foo_bar")
+	}
+	check(true, nil)
+	check(true, 1)
+	check(true, (chan int)(nil))
+	check(true, make(<-chan int))
+	check(false, make(chan int))
+	check(false, make(chan<- int))
+}
+
+func TestClientSubscribe(t *testing.T) {
+	server := newTestServer("eth", new(NotificationTestService))
+	defer server.Stop()
+	client := DialInProc(server)
+	defer client.Close()
+
+	nc := make(chan int)
+	count := 10
+	sub, err := client.EthSubscribe(nc, "someSubscription", count, 0)
+	if err != nil {
+		t.Fatal("can't subscribe:", err)
+	}
+	for i := 0; i < count; i++ {
+		if val := <-nc; val != i {
+			t.Fatalf("value mismatch: got %d, want %d", val, i)
+		}
+	}
+
+	sub.Unsubscribe()
+	select {
+	case v := <-nc:
+		t.Fatal("received value after unsubscribe:", v)
+	case err := <-sub.Err():
+		if err != nil {
+			t.Fatalf("Err returned a non-nil error after explicit unsubscribe: %q", err)
+		}
+	case <-time.After(1 * time.Second):
+		t.Fatalf("subscription not closed within 1s after unsubscribe")
+	}
+}
+
+// In this test, the connection drops while EthSubscribe is
+// waiting for a response.
+func TestClientSubscribeClose(t *testing.T) {
+	service := &NotificationTestService{
+		gotHangSubscriptionReq:  make(chan struct{}),
+		unblockHangSubscription: make(chan struct{}),
+	}
+	server := newTestServer("eth", service)
+	defer server.Stop()
+	client := DialInProc(server)
+	defer client.Close()
+
+	var (
+		nc   = make(chan int)
+		errc = make(chan error)
+		sub  *ClientSubscription
+		err  error
+	)
+	go func() {
+		sub, err = client.EthSubscribe(nc, "hangSubscription", 999)
+		errc <- err
+	}()
+
+	<-service.gotHangSubscriptionReq
+	client.Close()
+	service.unblockHangSubscription <- struct{}{}
+
+	select {
+	case err := <-errc:
+		if err == nil {
+			t.Errorf("EthSubscribe returned nil error after Close")
+		}
+		if sub != nil {
+			t.Error("EthSubscribe returned non-nil subscription after Close")
+		}
+	case <-time.After(1 * time.Second):
+		t.Fatalf("EthSubscribe did not return within 1s after Close")
+	}
+}
+
+func TestClientHTTP(t *testing.T) {
+	server := newTestServer("service", new(Service))
+	defer server.Stop()
+
+	client, hs := httpTestClient(server, "http", nil)
+	defer hs.Close()
+	defer client.Close()
+
+	// Launch concurrent requests.
+	var (
+		results    = make([]Result, 100)
+		errc       = make(chan error)
+		wantResult = Result{"a", 1, new(Args)}
+	)
+	defer client.Close()
+	for i := range results {
+		i := i
+		go func() {
+			errc <- client.Call(&results[i], "service_echo",
+				wantResult.String, wantResult.Int, wantResult.Args)
+		}()
+	}
+
+	// Wait for all of them to complete.
+	timeout := time.NewTimer(5 * time.Second)
+	defer timeout.Stop()
+	for i := range results {
+		select {
+		case err := <-errc:
+			if err != nil {
+				t.Fatal(err)
+			}
+		case <-timeout.C:
+			t.Fatalf("timeout (got %d/%d) results)", i+1, len(results))
+		}
+	}
+
+	// Check results.
+	for i := range results {
+		if !reflect.DeepEqual(results[i], wantResult) {
+			t.Errorf("result %d mismatch: got %#v, want %#v", i, results[i], wantResult)
+		}
+	}
+}
+
+func TestClientReconnect(t *testing.T) {
+	startServer := func(addr string) (*Server, net.Listener) {
+		srv := newTestServer("service", new(Service))
+		l, err := net.Listen("tcp", addr)
+		if err != nil {
+			t.Fatal(err)
+		}
+		go http.Serve(l, srv.WebsocketHandler("*"))
+		return srv, l
+	}
+
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+
+	// Start a server and corresponding client.
+	s1, l1 := startServer("127.0.0.1:0")
+	client, err := DialContext(ctx, "ws://"+l1.Addr().String())
+	if err != nil {
+		t.Fatal("can't dial", err)
+	}
+
+	// Perform a call. This should work because the server is up.
+	var resp Result
+	if err := client.CallContext(ctx, &resp, "service_echo", "", 1, nil); err != nil {
+		t.Fatal(err)
+	}
+
+	// Shut down the server and try calling again. It shouldn't work.
+	l1.Close()
+	s1.Stop()
+	if err := client.CallContext(ctx, &resp, "service_echo", "", 2, nil); err == nil {
+		t.Error("successful call while the server is down")
+		t.Logf("resp: %#v", resp)
+	}
+
+	// Allow for some cool down time so we can listen on the same address again.
+	time.Sleep(2 * time.Second)
+
+	// Start it up again and call again. The connection should be reestablished.
+	// We spawn multiple calls here to check whether this hangs somehow.
+	s2, l2 := startServer(l1.Addr().String())
+	defer l2.Close()
+	defer s2.Stop()
+
+	start := make(chan struct{})
+	errors := make(chan error, 20)
+	for i := 0; i < cap(errors); i++ {
+		go func() {
+			<-start
+			var resp Result
+			errors <- client.CallContext(ctx, &resp, "service_echo", "", 3, nil)
+		}()
+	}
+	close(start)
+	errcount := 0
+	for i := 0; i < cap(errors); i++ {
+		if err = <-errors; err != nil {
+			errcount++
+		}
+	}
+	t.Log("err:", err)
+	if errcount > 1 {
+		t.Errorf("expected one error after disconnect, got %d", errcount)
+	}
+}
+
+func newTestServer(serviceName string, service interface{}) *Server {
+	server := NewServer()
+	if err := server.RegisterName(serviceName, service); err != nil {
+		panic(err)
+	}
+	return server
+}
+
+func httpTestClient(srv *Server, transport string, fl *flakeyListener) (*Client, *httptest.Server) {
+	// Create the HTTP server.
+	var hs *httptest.Server
+	switch transport {
+	case "ws":
+		hs = httptest.NewUnstartedServer(srv.WebsocketHandler("*"))
+	case "http":
+		hs = httptest.NewUnstartedServer(srv)
+	default:
+		panic("unknown HTTP transport: " + transport)
+	}
+	// Wrap the listener if required.
+	if fl != nil {
+		fl.Listener = hs.Listener
+		hs.Listener = fl
+	}
+	// Connect the client.
+	hs.Start()
+	client, err := Dial(transport + "://" + hs.Listener.Addr().String())
+	if err != nil {
+		panic(err)
+	}
+	return client, hs
+}
+
+func ipcTestClient(srv *Server, fl *flakeyListener) (*Client, net.Listener) {
+	// Listen on a random endpoint.
+	endpoint := fmt.Sprintf("go-ethereum-test-ipc-%d-%d", os.Getpid(), rand.Int63())
+	if runtime.GOOS == "windows" {
+		endpoint = `\\.\pipe\` + endpoint
+	} else {
+		endpoint = os.TempDir() + "/" + endpoint
+	}
+	l, err := ipcListen(endpoint)
+	if err != nil {
+		panic(err)
+	}
+	// Connect the listener to the server.
+	if fl != nil {
+		fl.Listener = l
+		l = fl
+	}
+	go srv.ServeListener(l)
+	// Connect the client.
+	client, err := Dial(endpoint)
+	if err != nil {
+		panic(err)
+	}
+	return client, l
+}
+
+// flakeyListener kills accepted connections after a random timeout.
+type flakeyListener struct {
+	net.Listener
+	maxKillTimeout time.Duration
+	maxAcceptDelay time.Duration
+}
+
+func (l *flakeyListener) Accept() (net.Conn, error) {
+	delay := time.Duration(rand.Int63n(int64(l.maxAcceptDelay)))
+	time.Sleep(delay)
+
+	c, err := l.Listener.Accept()
+	if err == nil {
+		timeout := time.Duration(rand.Int63n(int64(l.maxKillTimeout)))
+		time.AfterFunc(timeout, func() {
+			glog.V(logger.Debug).Infof("killing conn %v after %v", c.LocalAddr(), timeout)
+			c.Close()
+		})
+	}
+	return c, err
+}
diff --git a/rpc/errors.go b/rpc/errors.go
index bc352fc45..9cf9dc60c 100644
--- a/rpc/errors.go
+++ b/rpc/errors.go
@@ -24,74 +24,43 @@ type methodNotFoundError struct {
 	method  string
 }
 
-func (e *methodNotFoundError) Code() int {
-	return -32601
-}
+func (e *methodNotFoundError) ErrorCode() int { return -32601 }
 
 func (e *methodNotFoundError) Error() string {
 	return fmt.Sprintf("The method %s%s%s does not exist/is not available", e.service, serviceMethodSeparator, e.method)
 }
 
 // received message isn't a valid request
-type invalidRequestError struct {
-	message string
-}
+type invalidRequestError struct{ message string }
 
-func (e *invalidRequestError) Code() int {
-	return -32600
-}
+func (e *invalidRequestError) ErrorCode() int { return -32600 }
 
-func (e *invalidRequestError) Error() string {
-	return e.message
-}
+func (e *invalidRequestError) Error() string { return e.message }
 
 // received message is invalid
-type invalidMessageError struct {
-	message string
-}
+type invalidMessageError struct{ message string }
 
-func (e *invalidMessageError) Code() int {
-	return -32700
-}
+func (e *invalidMessageError) ErrorCode() int { return -32700 }
 
-func (e *invalidMessageError) Error() string {
-	return e.message
-}
+func (e *invalidMessageError) Error() string { return e.message }
 
 // unable to decode supplied params, or an invalid number of parameters
-type invalidParamsError struct {
-	message string
-}
+type invalidParamsError struct{ message string }
 
-func (e *invalidParamsError) Code() int {
-	return -32602
-}
+func (e *invalidParamsError) ErrorCode() int { return -32602 }
 
-func (e *invalidParamsError) Error() string {
-	return e.message
-}
+func (e *invalidParamsError) Error() string { return e.message }
 
 // logic error, callback returned an error
-type callbackError struct {
-	message string
-}
+type callbackError struct{ message string }
 
-func (e *callbackError) Code() int {
-	return -32000
-}
+func (e *callbackError) ErrorCode() int { return -32000 }
 
-func (e *callbackError) Error() string {
-	return e.message
-}
+func (e *callbackError) Error() string { return e.message }
 
 // issued when a request is received after the server is issued to stop.
-type shutdownError struct {
-}
+type shutdownError struct{}
 
-func (e *shutdownError) Code() int {
-	return -32000
-}
+func (e *shutdownError) ErrorCode() int { return -32000 }
 
-func (e *shutdownError) Error() string {
-	return "server is shutting down"
-}
+func (e *shutdownError) Error() string { return "server is shutting down" }
diff --git a/rpc/http.go b/rpc/http.go
index 9283ce0ec..afcdd4bd6 100644
--- a/rpc/http.go
+++ b/rpc/http.go
@@ -22,71 +22,108 @@ import (
 	"fmt"
 	"io"
 	"io/ioutil"
+	"net"
 	"net/http"
-	"net/url"
 	"strings"
+	"sync"
+	"time"
 
 	"github.com/rs/cors"
+	"golang.org/x/net/context"
 )
 
 const (
 	maxHTTPRequestContentLength = 1024 * 128
 )
 
-// httpClient connects to a geth RPC server over HTTP.
-type httpClient struct {
-	endpoint   *url.URL    // HTTP-RPC server endpoint
-	httpClient http.Client // reuse connection
-	lastRes    []byte      // HTTP requests are synchronous, store last response
+var nullAddr, _ = net.ResolveTCPAddr("tcp", "127.0.0.1:0")
+
+type httpConn struct {
+	client    *http.Client
+	req       *http.Request
+	closeOnce sync.Once
+	closed    chan struct{}
+}
+
+// httpConn is treated specially by Client.
+func (hc *httpConn) LocalAddr() net.Addr              { return nullAddr }
+func (hc *httpConn) RemoteAddr() net.Addr             { return nullAddr }
+func (hc *httpConn) SetReadDeadline(time.Time) error  { return nil }
+func (hc *httpConn) SetWriteDeadline(time.Time) error { return nil }
+func (hc *httpConn) SetDeadline(time.Time) error      { return nil }
+func (hc *httpConn) Write([]byte) (int, error)        { panic("Write called") }
+
+func (hc *httpConn) Read(b []byte) (int, error) {
+	<-hc.closed
+	return 0, io.EOF
+}
+
+func (hc *httpConn) Close() error {
+	hc.closeOnce.Do(func() { close(hc.closed) })
+	return nil
 }
 
-// NewHTTPClient create a new RPC clients that connection to a geth RPC server
-// over HTTP.
-func NewHTTPClient(endpoint string) (Client, error) {
-	url, err := url.Parse(endpoint)
+// DialHTTP creates a new RPC clients that connection to an RPC server over HTTP.
+func DialHTTP(endpoint string) (*Client, error) {
+	req, err := http.NewRequest("POST", endpoint, nil)
 	if err != nil {
 		return nil, err
 	}
-	return &httpClient{endpoint: url}, nil
-}
+	req.Header.Set("Content-Type", "application/json")
+	req.Header.Set("Accept", "application/json")
 
-// Send will serialize the given msg to JSON and sends it to the RPC server.
-// Since HTTP is synchronous the response is stored until Recv is called.
-func (client *httpClient) Send(msg interface{}) error {
-	var body []byte
-	var err error
+	initctx := context.Background()
+	return newClient(initctx, func(context.Context) (net.Conn, error) {
+		return &httpConn{client: new(http.Client), req: req, closed: make(chan struct{})}, nil
+	})
+}
 
-	client.lastRes = nil
-	if body, err = json.Marshal(msg); err != nil {
+func (c *Client) sendHTTP(ctx context.Context, op *requestOp, msg interface{}) error {
+	hc := c.writeConn.(*httpConn)
+	respBody, err := hc.doRequest(ctx, msg)
+	if err != nil {
 		return err
 	}
+	defer respBody.Close()
+	var respmsg jsonrpcMessage
+	if err := json.NewDecoder(respBody).Decode(&respmsg); err != nil {
+		return err
+	}
+	op.resp <- &respmsg
+	return nil
+}
 
-	resp, err := client.httpClient.Post(client.endpoint.String(), "application/json", bytes.NewReader(body))
+func (c *Client) sendBatchHTTP(ctx context.Context, op *requestOp, msgs []*jsonrpcMessage) error {
+	hc := c.writeConn.(*httpConn)
+	respBody, err := hc.doRequest(ctx, msgs)
 	if err != nil {
 		return err
 	}
-
-	defer resp.Body.Close()
-	if resp.StatusCode == http.StatusOK {
-		client.lastRes, err = ioutil.ReadAll(resp.Body)
+	defer respBody.Close()
+	var respmsgs []jsonrpcMessage
+	if err := json.NewDecoder(respBody).Decode(&respmsgs); err != nil {
 		return err
 	}
-
-	return fmt.Errorf("request failed: %s", resp.Status)
-}
-
-// Recv will try to deserialize the last received response into the given msg.
-func (client *httpClient) Recv(msg interface{}) error {
-	return json.Unmarshal(client.lastRes, &msg)
+	for _, respmsg := range respmsgs {
+		op.resp <- &respmsg
+	}
+	return nil
 }
 
-// Close is not necessary for httpClient
-func (client *httpClient) Close() {
-}
+func (hc *httpConn) doRequest(ctx context.Context, msg interface{}) (io.ReadCloser, error) {
+	body, err := json.Marshal(msg)
+	if err != nil {
+		return nil, err
+	}
+	client, req := requestWithContext(hc.client, hc.req, ctx)
+	req.Body = ioutil.NopCloser(bytes.NewReader(body))
+	req.ContentLength = int64(len(body))
 
-// SupportedModules will return the collection of offered RPC modules.
-func (client *httpClient) SupportedModules() (map[string]string, error) {
-	return SupportedModules(client)
+	resp, err := client.Do(req)
+	if err != nil {
+		return nil, err
+	}
+	return resp.Body, nil
 }
 
 // httpReadWriteNopCloser wraps a io.Reader and io.Writer with a NOP Close method.
@@ -100,43 +137,39 @@ func (t *httpReadWriteNopCloser) Close() error {
 	return nil
 }
 
-// newJSONHTTPHandler creates a HTTP handler that will parse incoming JSON requests,
-// send the request to the given API provider and sends the response back to the caller.
-func newJSONHTTPHandler(srv *Server) http.HandlerFunc {
-	return func(w http.ResponseWriter, r *http.Request) {
-		if r.ContentLength > maxHTTPRequestContentLength {
-			http.Error(w,
-				fmt.Sprintf("content length too large (%d>%d)", r.ContentLength, maxHTTPRequestContentLength),
-				http.StatusRequestEntityTooLarge)
-			return
-		}
-
-		w.Header().Set("content-type", "application/json")
-
-		// create a codec that reads direct from the request body until
-		// EOF and writes the response to w and order the server to process
-		// a single request.
-		codec := NewJSONCodec(&httpReadWriteNopCloser{r.Body, w})
-		defer codec.Close()
-		srv.ServeSingleRequest(codec, OptionMethodInvocation)
+// NewHTTPServer creates a new HTTP RPC server around an API provider.
+//
+// Deprecated: Server implements http.Handler
+func NewHTTPServer(corsString string, srv *Server) *http.Server {
+	return &http.Server{Handler: newCorsHandler(srv, corsString)}
+}
+
+// ServeHTTP serves JSON-RPC requests over HTTP.
+func (srv *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	if r.ContentLength > maxHTTPRequestContentLength {
+		http.Error(w,
+			fmt.Sprintf("content length too large (%d>%d)", r.ContentLength, maxHTTPRequestContentLength),
+			http.StatusRequestEntityTooLarge)
+		return
 	}
+	w.Header().Set("content-type", "application/json")
+
+	// create a codec that reads direct from the request body until
+	// EOF and writes the response to w and order the server to process
+	// a single request.
+	codec := NewJSONCodec(&httpReadWriteNopCloser{r.Body, w})
+	defer codec.Close()
+	srv.ServeSingleRequest(codec, OptionMethodInvocation)
 }
 
-// NewHTTPServer creates a new HTTP RPC server around an API provider.
-func NewHTTPServer(corsString string, srv *Server) *http.Server {
+func newCorsHandler(srv *Server, corsString string) http.Handler {
 	var allowedOrigins []string
 	for _, domain := range strings.Split(corsString, ",") {
 		allowedOrigins = append(allowedOrigins, strings.TrimSpace(domain))
 	}
-
 	c := cors.New(cors.Options{
 		AllowedOrigins: allowedOrigins,
 		AllowedMethods: []string{"POST", "GET"},
 	})
-
-	handler := c.Handler(newJSONHTTPHandler(srv))
-
-	return &http.Server{
-		Handler: handler,
-	}
+	return c.Handler(srv)
 }
diff --git a/rpc/inproc.go b/rpc/inproc.go
index 250f5c787..f72b97497 100644
--- a/rpc/inproc.go
+++ b/rpc/inproc.go
@@ -17,45 +17,18 @@
 package rpc
 
 import (
-	"encoding/json"
-	"io"
 	"net"
-)
-
-// inProcClient is an in-process buffer stream attached to an RPC server.
-type inProcClient struct {
-	server *Server
-	cl     io.Closer
-	enc    *json.Encoder
-	dec    *json.Decoder
-}
 
-// Close tears down the request channel of the in-proc client.
-func (c *inProcClient) Close() {
-	c.cl.Close()
-}
-
-// NewInProcRPCClient creates an in-process buffer stream attachment to a given
-// RPC server.
-func NewInProcRPCClient(handler *Server) Client {
-	p1, p2 := net.Pipe()
-	go handler.ServeCodec(NewJSONCodec(p1), OptionMethodInvocation|OptionSubscriptions)
-	return &inProcClient{handler, p2, json.NewEncoder(p2), json.NewDecoder(p2)}
-}
-
-// Send marshals a message into a json format and injects in into the client
-// request channel.
-func (c *inProcClient) Send(msg interface{}) error {
-	return c.enc.Encode(msg)
-}
-
-// Recv reads a message from the response channel and tries to parse it into the
-// given msg interface.
-func (c *inProcClient) Recv(msg interface{}) error {
-	return c.dec.Decode(msg)
-}
+	"golang.org/x/net/context"
+)
 
-// Returns the collection of modules the RPC server offers.
-func (c *inProcClient) SupportedModules() (map[string]string, error) {
-	return SupportedModules(c)
+// NewInProcClient attaches an in-process connection to the given RPC server.
+func DialInProc(handler *Server) *Client {
+	initctx := context.Background()
+	c, _ := newClient(initctx, func(context.Context) (net.Conn, error) {
+		p1, p2 := net.Pipe()
+		go handler.ServeCodec(NewJSONCodec(p1), OptionMethodInvocation|OptionSubscriptions)
+		return p2, nil
+	})
+	return c
 }
diff --git a/rpc/ipc.go b/rpc/ipc.go
index 05d8909ca..c2b9e3871 100644
--- a/rpc/ipc.go
+++ b/rpc/ipc.go
@@ -17,68 +17,39 @@
 package rpc
 
 import (
-	"encoding/json"
 	"net"
+
+	"github.com/ethereum/go-ethereum/logger"
+	"github.com/ethereum/go-ethereum/logger/glog"
+	"golang.org/x/net/context"
 )
 
-// CreateIPCListener creates an listener, on Unix platforms this is a unix socket, on Windows this is a named pipe
+// CreateIPCListener creates an listener, on Unix platforms this is a unix socket, on
+// Windows this is a named pipe
 func CreateIPCListener(endpoint string) (net.Listener, error) {
 	return ipcListen(endpoint)
 }
 
-// ipcClient represent an IPC RPC client. It will connect to a given endpoint and tries to communicate with a node using
-// JSON serialization.
-type ipcClient struct {
-	endpoint string
-	conn     net.Conn
-	out      *json.Encoder
-	in       *json.Decoder
-}
-
-// NewIPCClient create a new IPC client that will connect on the given endpoint. Messages are JSON encoded and encoded.
-// On Unix it assumes the endpoint is the full path to a unix socket, and Windows the endpoint is an identifier for a
-// named pipe.
-func NewIPCClient(endpoint string) (Client, error) {
-	conn, err := newIPCConnection(endpoint)
-	if err != nil {
-		return nil, err
-	}
-	return &ipcClient{endpoint: endpoint, conn: conn, in: json.NewDecoder(conn), out: json.NewEncoder(conn)}, nil
-}
-
-// Send will serialize the given message and send it to the server.
-// When sending the message fails it will try to reconnect once and send the message again.
-func (client *ipcClient) Send(msg interface{}) error {
-	if err := client.out.Encode(msg); err == nil {
-		return nil
-	}
-
-	// retry once
-	client.conn.Close()
-
-	conn, err := newIPCConnection(client.endpoint)
-	if err != nil {
-		return err
+// ServeListener accepts connections on l, serving JSON-RPC on them.
+func (srv *Server) ServeListener(l net.Listener) error {
+	for {
+		conn, err := l.Accept()
+		if err != nil {
+			return err
+		}
+		glog.V(logger.Detail).Infoln("accepted conn", conn.RemoteAddr())
+		go srv.ServeCodec(NewJSONCodec(conn), OptionMethodInvocation|OptionSubscriptions)
 	}
-
-	client.conn = conn
-	client.in = json.NewDecoder(conn)
-	client.out = json.NewEncoder(conn)
-
-	return client.out.Encode(msg)
-}
-
-// Recv will read a message from the connection and tries to parse it. It assumes the received message is JSON encoded.
-func (client *ipcClient) Recv(msg interface{}) error {
-	return client.in.Decode(&msg)
-}
-
-// Close will close the underlying IPC connection
-func (client *ipcClient) Close() {
-	client.conn.Close()
 }
 
-// SupportedModules will return the collection of offered RPC modules.
-func (client *ipcClient) SupportedModules() (map[string]string, error) {
-	return SupportedModules(client)
+// DialIPC create a new IPC client that connects to the given endpoint. On Unix it assumes
+// the endpoint is the full path to a unix socket, and Windows the endpoint is an
+// identifier for a named pipe.
+//
+// The context is used for the initial connection establishment. It does not
+// affect subsequent interactions with the client.
+func DialIPC(ctx context.Context, endpoint string) (*Client, error) {
+	return newClient(ctx, func(ctx context.Context) (net.Conn, error) {
+		return newIPCConnection(ctx, endpoint)
+	})
 }
diff --git a/rpc/ipc_unix.go b/rpc/ipc_unix.go
index 9ece01240..a25b21627 100644
--- a/rpc/ipc_unix.go
+++ b/rpc/ipc_unix.go
@@ -22,6 +22,8 @@ import (
 	"net"
 	"os"
 	"path/filepath"
+
+	"golang.org/x/net/context"
 )
 
 // ipcListen will create a Unix socket on the given endpoint.
@@ -40,6 +42,6 @@ func ipcListen(endpoint string) (net.Listener, error) {
 }
 
 // newIPCConnection will connect to a Unix socket on the given endpoint.
-func newIPCConnection(endpoint string) (net.Conn, error) {
-	return net.DialUnix("unix", nil, &net.UnixAddr{Name: endpoint, Net: "unix"})
+func newIPCConnection(ctx context.Context, endpoint string) (net.Conn, error) {
+	return dialContext(ctx, "unix", endpoint)
 }
diff --git a/rpc/ipc_windows.go b/rpc/ipc_windows.go
index 8342d04d5..68234d215 100644
--- a/rpc/ipc_windows.go
+++ b/rpc/ipc_windows.go
@@ -22,16 +22,27 @@ import (
 	"net"
 	"time"
 
+	"golang.org/x/net/context"
 	"gopkg.in/natefinch/npipe.v2"
 )
 
+// This is used if the dialing context has no deadline. It is much smaller than the
+// defaultDialTimeout because named pipes are local and there is no need to wait so long.
+const defaultPipeDialTimeout = 2 * time.Second
+
 // ipcListen will create a named pipe on the given endpoint.
 func ipcListen(endpoint string) (net.Listener, error) {
 	return npipe.Listen(endpoint)
 }
 
 // newIPCConnection will connect to a named pipe with the given endpoint as name.
-func newIPCConnection(endpoint string) (net.Conn, error) {
-	timeout := 5 * time.Second
+func newIPCConnection(ctx context.Context, endpoint string) (net.Conn, error) {
+	timeout := defaultPipeDialTimeout
+	if deadline, ok := ctx.Deadline(); ok {
+		timeout = deadline.Sub(time.Now())
+		if timeout < 0 {
+			timeout = 0
+		}
+	}
 	return npipe.DialTimeout(endpoint, timeout)
 }
diff --git a/rpc/json.go b/rpc/json.go
index ee931bc87..a7053e3f5 100644
--- a/rpc/json.go
+++ b/rpc/json.go
@@ -30,49 +30,43 @@ import (
 )
 
 const (
-	JSONRPCVersion         = "2.0"
+	jsonrpcVersion         = "2.0"
 	serviceMethodSeparator = "_"
 	subscribeMethod        = "eth_subscribe"
 	unsubscribeMethod      = "eth_unsubscribe"
 	notificationMethod     = "eth_subscription"
 )
 
-// JSON-RPC request
-type JSONRequest struct {
+type jsonRequest struct {
 	Method  string          `json:"method"`
 	Version string          `json:"jsonrpc"`
 	Id      json.RawMessage `json:"id,omitempty"`
 	Payload json.RawMessage `json:"params,omitempty"`
 }
 
-// JSON-RPC response
-type JSONSuccessResponse struct {
+type jsonSuccessResponse struct {
 	Version string      `json:"jsonrpc"`
 	Id      interface{} `json:"id,omitempty"`
 	Result  interface{} `json:"result"`
 }
 
-// JSON-RPC error object
-type JSONError struct {
+type jsonError struct {
 	Code    int         `json:"code"`
 	Message string      `json:"message"`
 	Data    interface{} `json:"data,omitempty"`
 }
 
-// JSON-RPC error response
-type JSONErrResponse struct {
+type jsonErrResponse struct {
 	Version string      `json:"jsonrpc"`
 	Id      interface{} `json:"id,omitempty"`
-	Error   JSONError   `json:"error"`
+	Error   jsonError   `json:"error"`
 }
 
-// JSON-RPC notification payload
 type jsonSubscription struct {
 	Subscription string      `json:"subscription"`
 	Result       interface{} `json:"result,omitempty"`
 }
 
-// JSON-RPC notification
 type jsonNotification struct {
 	Version string           `json:"jsonrpc"`
 	Method  string           `json:"method"`
@@ -91,6 +85,17 @@ type jsonCodec struct {
 	rw     io.ReadWriteCloser // connection
 }
 
+func (err *jsonError) Error() string {
+	if err.Message == "" {
+		return fmt.Sprintf("json-rpc error %d", err.Code)
+	}
+	return err.Message
+}
+
+func (err *jsonError) ErrorCode() int {
+	return err.Code
+}
+
 // NewJSONCodec creates a new RPC server codec with support for JSON-RPC 2.0
 func NewJSONCodec(rwc io.ReadWriteCloser) ServerCodec {
 	d := json.NewDecoder(rwc)
@@ -113,7 +118,7 @@ func isBatch(msg json.RawMessage) bool {
 // ReadRequestHeaders will read new requests without parsing the arguments. It will
 // return a collection of requests, an indication if these requests are in batch
 // form or an error when the incoming message could not be read/parsed.
-func (c *jsonCodec) ReadRequestHeaders() ([]rpcRequest, bool, RPCError) {
+func (c *jsonCodec) ReadRequestHeaders() ([]rpcRequest, bool, Error) {
 	c.decMu.Lock()
 	defer c.decMu.Unlock()
 
@@ -148,8 +153,8 @@ func checkReqId(reqId json.RawMessage) error {
 // parseRequest will parse a single request from the given RawMessage. It will return
 // the parsed request, an indication if the request was a batch or an error when
 // the request could not be parsed.
-func parseRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, RPCError) {
-	var in JSONRequest
+func parseRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, Error) {
+	var in jsonRequest
 	if err := json.Unmarshal(incomingMsg, &in); err != nil {
 		return nil, false, &invalidMessageError{err.Error()}
 	}
@@ -197,8 +202,8 @@ func parseRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, RPCError) {
 
 // parseBatchRequest will parse a batch request into a collection of requests from the given RawMessage, an indication
 // if the request was a batch or an error when the request could not be read.
-func parseBatchRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, RPCError) {
-	var in []JSONRequest
+func parseBatchRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, Error) {
+	var in []jsonRequest
 	if err := json.Unmarshal(incomingMsg, &in); err != nil {
 		return nil, false, &invalidMessageError{err.Error()}
 	}
@@ -253,7 +258,7 @@ func parseBatchRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, RPCErro
 
 // ParseRequestArguments tries to parse the given params (json.RawMessage) with the given types. It returns the parsed
 // values or an error when the parsing failed.
-func (c *jsonCodec) ParseRequestArguments(argTypes []reflect.Type, params interface{}) ([]reflect.Value, RPCError) {
+func (c *jsonCodec) ParseRequestArguments(argTypes []reflect.Type, params interface{}) ([]reflect.Value, Error) {
 	if args, ok := params.(json.RawMessage); !ok {
 		return nil, &invalidParamsError{"Invalid params supplied"}
 	} else {
@@ -264,7 +269,7 @@ func (c *jsonCodec) ParseRequestArguments(argTypes []reflect.Type, params interf
 // parsePositionalArguments tries to parse the given args to an array of values with the given types.
 // It returns the parsed values or an error when the args could not be parsed. Missing optional arguments
 // are returned as reflect.Zero values.
-func parsePositionalArguments(args json.RawMessage, callbackArgs []reflect.Type) ([]reflect.Value, RPCError) {
+func parsePositionalArguments(args json.RawMessage, callbackArgs []reflect.Type) ([]reflect.Value, Error) {
 	params := make([]interface{}, 0, len(callbackArgs))
 	for _, t := range callbackArgs {
 		params = append(params, reflect.New(t).Interface())
@@ -302,31 +307,31 @@ func parsePositionalArguments(args json.RawMessage, callbackArgs []reflect.Type)
 // CreateResponse will create a JSON-RPC success response with the given id and reply as result.
 func (c *jsonCodec) CreateResponse(id interface{}, reply interface{}) interface{} {
 	if isHexNum(reflect.TypeOf(reply)) {
-		return &JSONSuccessResponse{Version: JSONRPCVersion, Id: id, Result: fmt.Sprintf(`%#x`, reply)}
+		return &jsonSuccessResponse{Version: jsonrpcVersion, Id: id, Result: fmt.Sprintf(`%#x`, reply)}
 	}
-	return &JSONSuccessResponse{Version: JSONRPCVersion, Id: id, Result: reply}
+	return &jsonSuccessResponse{Version: jsonrpcVersion, Id: id, Result: reply}
 }
 
 // CreateErrorResponse will create a JSON-RPC error response with the given id and error.
-func (c *jsonCodec) CreateErrorResponse(id interface{}, err RPCError) interface{} {
-	return &JSONErrResponse{Version: JSONRPCVersion, Id: id, Error: JSONError{Code: err.Code(), Message: err.Error()}}
+func (c *jsonCodec) CreateErrorResponse(id interface{}, err Error) interface{} {
+	return &jsonErrResponse{Version: jsonrpcVersion, Id: id, Error: jsonError{Code: err.ErrorCode(), Message: err.Error()}}
 }
 
 // CreateErrorResponseWithInfo will create a JSON-RPC error response with the given id and error.
 // info is optional and contains additional information about the error. When an empty string is passed it is ignored.
-func (c *jsonCodec) CreateErrorResponseWithInfo(id interface{}, err RPCError, info interface{}) interface{} {
-	return &JSONErrResponse{Version: JSONRPCVersion, Id: id,
-		Error: JSONError{Code: err.Code(), Message: err.Error(), Data: info}}
+func (c *jsonCodec) CreateErrorResponseWithInfo(id interface{}, err Error, info interface{}) interface{} {
+	return &jsonErrResponse{Version: jsonrpcVersion, Id: id,
+		Error: jsonError{Code: err.ErrorCode(), Message: err.Error(), Data: info}}
 }
 
 // CreateNotification will create a JSON-RPC notification with the given subscription id and event as params.
 func (c *jsonCodec) CreateNotification(subid string, event interface{}) interface{} {
 	if isHexNum(reflect.TypeOf(event)) {
-		return &jsonNotification{Version: JSONRPCVersion, Method: notificationMethod,
+		return &jsonNotification{Version: jsonrpcVersion, Method: notificationMethod,
 			Params: jsonSubscription{Subscription: subid, Result: fmt.Sprintf(`%#x`, event)}}
 	}
 
-	return &jsonNotification{Version: JSONRPCVersion, Method: notificationMethod,
+	return &jsonNotification{Version: jsonrpcVersion, Method: notificationMethod,
 		Params: jsonSubscription{Subscription: subid, Result: event}}
 }
 
diff --git a/rpc/notification.go b/rpc/notification.go
index e84e26a58..875433071 100644
--- a/rpc/notification.go
+++ b/rpc/notification.go
@@ -28,7 +28,7 @@ import (
 
 var (
 	// ErrNotificationsUnsupported is returned when the connection doesn't support notifications
-	ErrNotificationsUnsupported = errors.New("notifications not supported")
+	ErrNotificationsUnsupported = errors.New("subscription notifications not supported by the current transport")
 
 	// ErrNotificationNotFound is returned when the notification for the given id is not found
 	ErrNotificationNotFound = errors.New("notification not found")
diff --git a/rpc/notification_test.go b/rpc/notification_test.go
index 1bcede177..280503222 100644
--- a/rpc/notification_test.go
+++ b/rpc/notification_test.go
@@ -19,20 +19,31 @@ package rpc
 import (
 	"encoding/json"
 	"net"
+	"sync"
 	"testing"
 	"time"
 
 	"golang.org/x/net/context"
 )
 
-type NotificationTestService struct{}
+type NotificationTestService struct {
+	mu           sync.Mutex
+	unsubscribed bool
 
-var (
-	unsubCallbackCalled = false
-)
+	gotHangSubscriptionReq  chan struct{}
+	unblockHangSubscription chan struct{}
+}
+
+func (s *NotificationTestService) wasUnsubCallbackCalled() bool {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	return s.unsubscribed
+}
 
 func (s *NotificationTestService) Unsubscribe(subid string) {
-	unsubCallbackCalled = true
+	s.mu.Lock()
+	s.unsubscribed = true
+	s.mu.Unlock()
 }
 
 func (s *NotificationTestService) SomeSubscription(ctx context.Context, n, val int) (Subscription, error) {
@@ -60,6 +71,26 @@ func (s *NotificationTestService) SomeSubscription(ctx context.Context, n, val i
 	return subscription, nil
 }
 
+// HangSubscription blocks on s.unblockHangSubscription before
+// sending anything.
+func (s *NotificationTestService) HangSubscription(ctx context.Context, val int) (Subscription, error) {
+	notifier, supported := NotifierFromContext(ctx)
+	if !supported {
+		return nil, ErrNotificationsUnsupported
+	}
+
+	s.gotHangSubscriptionReq <- struct{}{}
+	<-s.unblockHangSubscription
+	subscription, err := notifier.NewSubscription(s.Unsubscribe)
+	if err != nil {
+		return nil, err
+	}
+	go func() {
+		subscription.Notify(val)
+	}()
+	return subscription, nil
+}
+
 func TestNotifications(t *testing.T) {
 	server := NewServer()
 	service := &NotificationTestService{}
@@ -90,7 +121,7 @@ func TestNotifications(t *testing.T) {
 	}
 
 	var subid string
-	response := JSONSuccessResponse{Result: subid}
+	response := jsonSuccessResponse{Result: subid}
 	if err := in.Decode(&response); err != nil {
 		t.Fatal(err)
 	}
@@ -114,7 +145,7 @@ func TestNotifications(t *testing.T) {
 	clientConn.Close() // causes notification unsubscribe callback to be called
 	time.Sleep(1 * time.Second)
 
-	if !unsubCallbackCalled {
+	if !service.wasUnsubCallbackCalled() {
 		t.Error("unsubscribe callback not called after closing connection")
 	}
 }
diff --git a/rpc/server.go b/rpc/server.go
index a9bdef285..040805a5c 100644
--- a/rpc/server.go
+++ b/rpc/server.go
@@ -381,7 +381,7 @@ func (s *Server) execBatch(ctx context.Context, codec ServerCodec, requests []*s
 // readRequest requests the next (batch) request from the codec. It will return the collection
 // of requests, an indication if the request was a batch, the invalid request identifier and an
 // error when the request could not be read/parsed.
-func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, RPCError) {
+func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, Error) {
 	reqs, batch, err := codec.ReadRequestHeaders()
 	if err != nil {
 		return nil, batch, err
diff --git a/rpc/server_test.go b/rpc/server_test.go
index de47e1afd..e6840bde4 100644
--- a/rpc/server_test.go
+++ b/rpc/server_test.go
@@ -21,6 +21,7 @@ import (
 	"net"
 	"reflect"
 	"testing"
+	"time"
 
 	"golang.org/x/net/context"
 )
@@ -48,6 +49,13 @@ func (s *Service) EchoWithCtx(ctx context.Context, str string, i int, args *Args
 	return Result{str, i, args}
 }
 
+func (s *Service) Sleep(ctx context.Context, duration time.Duration) {
+	select {
+	case <-time.After(duration):
+	case <-ctx.Done():
+	}
+}
+
 func (s *Service) Rets() (string, error) {
 	return "", nil
 }
@@ -85,8 +93,8 @@ func TestServerRegisterName(t *testing.T) {
 		t.Fatalf("Expected service calc to be registered")
 	}
 
-	if len(svc.callbacks) != 4 {
-		t.Errorf("Expected 4 callbacks for service 'calc', got %d", len(svc.callbacks))
+	if len(svc.callbacks) != 5 {
+		t.Errorf("Expected 5 callbacks for service 'calc', got %d", len(svc.callbacks))
 	}
 
 	if len(svc.subscriptions) != 1 {
@@ -126,7 +134,7 @@ func testServerMethodExecution(t *testing.T, method string) {
 		t.Fatal(err)
 	}
 
-	response := JSONSuccessResponse{Result: &Result{}}
+	response := jsonSuccessResponse{Result: &Result{}}
 	if err := in.Decode(&response); err != nil {
 		t.Fatal(err)
 	}
diff --git a/rpc/types.go b/rpc/types.go
index 460581715..2a7268ad8 100644
--- a/rpc/types.go
+++ b/rpc/types.go
@@ -62,7 +62,7 @@ type serverRequest struct {
 	callb         *callback
 	args          []reflect.Value
 	isUnsubscribe bool
-	err           RPCError
+	err           Error
 }
 
 type serviceRegistry map[string]*service       // collection of services
@@ -88,15 +88,13 @@ type rpcRequest struct {
 	id       interface{}
 	isPubSub bool
 	params   interface{}
-	err      RPCError // invalid batch element
+	err      Error // invalid batch element
 }
 
-// RPCError implements RPC error, is add support for error codec over regular go errors
-type RPCError interface {
-	// RPC error code
-	Code() int
-	// Error message
-	Error() string
+// Error wraps RPC errors, which contain an error code in addition to the message.
+type Error interface {
+	Error() string  // returns the message
+	ErrorCode() int // returns the code
 }
 
 // ServerCodec implements reading, parsing and writing RPC messages for the server side of
@@ -104,15 +102,15 @@ type RPCError interface {
 // multiple go-routines concurrently.
 type ServerCodec interface {
 	// Read next request
-	ReadRequestHeaders() ([]rpcRequest, bool, RPCError)
+	ReadRequestHeaders() ([]rpcRequest, bool, Error)
 	// Parse request argument to the given types
-	ParseRequestArguments([]reflect.Type, interface{}) ([]reflect.Value, RPCError)
+	ParseRequestArguments([]reflect.Type, interface{}) ([]reflect.Value, Error)
 	// Assemble success response, expects response id and payload
 	CreateResponse(interface{}, interface{}) interface{}
 	// Assemble error response, expects response id and error
-	CreateErrorResponse(interface{}, RPCError) interface{}
+	CreateErrorResponse(interface{}, Error) interface{}
 	// Assemble error response with extra information about the error through info
-	CreateErrorResponseWithInfo(id interface{}, err RPCError, info interface{}) interface{}
+	CreateErrorResponseWithInfo(id interface{}, err Error, info interface{}) interface{}
 	// Create notification response
 	CreateNotification(string, interface{}) interface{}
 	// Write msg to client.
@@ -274,14 +272,3 @@ func (bn *BlockNumber) UnmarshalJSON(data []byte) error {
 func (bn *BlockNumber) Int64() int64 {
 	return (int64)(*bn)
 }
-
-// Client defines the interface for go client that wants to connect to a geth RPC endpoint
-type Client interface {
-	// SupportedModules returns the collection of API's the server offers
-	SupportedModules() (map[string]string, error)
-
-	Send(req interface{}) error
-	Recv(msg interface{}) error
-
-	Close()
-}
diff --git a/rpc/utils.go b/rpc/utils.go
index fe482e19d..1ac6698f5 100644
--- a/rpc/utils.go
+++ b/rpc/utils.go
@@ -20,7 +20,6 @@ import (
 	"crypto/rand"
 	"encoding/hex"
 	"errors"
-	"fmt"
 	"math/big"
 	"reflect"
 	"unicode"
@@ -227,31 +226,3 @@ func newSubscriptionID() (string, error) {
 	}
 	return "0x" + hex.EncodeToString(subid[:]), nil
 }
-
-// SupportedModules returns the collection of API's that the RPC server offers
-// on which the given client connects.
-func SupportedModules(client Client) (map[string]string, error) {
-	req := JSONRequest{
-		Id:      []byte("1"),
-		Version: "2.0",
-		Method:  MetadataApi + "_modules",
-	}
-	if err := client.Send(req); err != nil {
-		return nil, err
-	}
-
-	var response JSONSuccessResponse
-	if err := client.Recv(&response); err != nil {
-		return nil, err
-	}
-	if response.Result != nil {
-		mods := make(map[string]string)
-		if modules, ok := response.Result.(map[string]interface{}); ok {
-			for m, v := range modules {
-				mods[m] = fmt.Sprintf("%s", v)
-			}
-			return mods, nil
-		}
-	}
-	return nil, fmt.Errorf("unable to retrieve modules")
-}
diff --git a/rpc/websocket.go b/rpc/websocket.go
index fe9354d94..fc3cd0709 100644
--- a/rpc/websocket.go
+++ b/rpc/websocket.go
@@ -17,36 +17,39 @@
 package rpc
 
 import (
+	"crypto/tls"
 	"fmt"
+	"net"
 	"net/http"
+	"net/url"
 	"os"
 	"strings"
-	"sync"
 
 	"github.com/ethereum/go-ethereum/logger"
 	"github.com/ethereum/go-ethereum/logger/glog"
+	"golang.org/x/net/context"
 	"golang.org/x/net/websocket"
 	"gopkg.in/fatih/set.v0"
 )
 
-// wsReaderWriterCloser reads and write payloads from and to a websocket  connection.
-type wsReaderWriterCloser struct {
-	c *websocket.Conn
-}
-
-// Read will read incoming payload data into p.
-func (rw *wsReaderWriterCloser) Read(p []byte) (int, error) {
-	return rw.c.Read(p)
-}
-
-// Write writes p to the websocket.
-func (rw *wsReaderWriterCloser) Write(p []byte) (int, error) {
-	return rw.c.Write(p)
+// WebsocketHandler returns a handler that serves JSON-RPC to WebSocket connections.
+//
+// allowedOrigins should be a comma-separated list of allowed origin URLs.
+// To allow connections with any origin, pass "*".
+func (srv *Server) WebsocketHandler(allowedOrigins string) http.Handler {
+	return websocket.Server{
+		Handshake: wsHandshakeValidator(strings.Split(allowedOrigins, ",")),
+		Handler: func(conn *websocket.Conn) {
+			srv.ServeCodec(NewJSONCodec(conn), OptionMethodInvocation|OptionSubscriptions)
+		},
+	}
 }
 
-// Close closes the websocket connection.
-func (rw *wsReaderWriterCloser) Close() error {
-	return rw.c.Close()
+// NewWSServer creates a new websocket RPC server around an API provider.
+//
+// Deprecated: use Server.WebsocketHandler
+func NewWSServer(allowedOrigins string, srv *Server) *http.Server {
+	return &http.Server{Handler: srv.WebsocketHandler(allowedOrigins)}
 }
 
 // wsHandshakeValidator returns a handler that verifies the origin during the
@@ -87,96 +90,63 @@ func wsHandshakeValidator(allowedOrigins []string) func(*websocket.Config, *http
 	return f
 }
 
-// NewWSServer creates a new websocket RPC server around an API provider.
-func NewWSServer(allowedOrigins string, handler *Server) *http.Server {
-	return &http.Server{
-		Handler: websocket.Server{
-			Handshake: wsHandshakeValidator(strings.Split(allowedOrigins, ",")),
-			Handler: func(conn *websocket.Conn) {
-				handler.ServeCodec(NewJSONCodec(&wsReaderWriterCloser{conn}),
-					OptionMethodInvocation|OptionSubscriptions)
-			},
-		},
+// DialWebsocket creates a new RPC client that communicates with a JSON-RPC server
+// that is listening on the given endpoint.
+//
+// The context is used for the initial connection establishment. It does not
+// affect subsequent interactions with the client.
+func DialWebsocket(ctx context.Context, endpoint, origin string) (*Client, error) {
+	if origin == "" {
+		var err error
+		if origin, err = os.Hostname(); err != nil {
+			return nil, err
+		}
+		if strings.HasPrefix(endpoint, "wss") {
+			origin = "https://" + strings.ToLower(origin)
+		} else {
+			origin = "http://" + strings.ToLower(origin)
+		}
+	}
+	config, err := websocket.NewConfig(endpoint, origin)
+	if err != nil {
+		return nil, err
 	}
-}
-
-// wsClient represents a RPC client that communicates over websockets with a
-// RPC server.
-type wsClient struct {
-	endpoint string
-	connMu   sync.Mutex
-	conn     *websocket.Conn
-}
 
-// NewWSClientj creates a new RPC client that communicates with a RPC server
-// that is listening on the given endpoint using JSON encoding.
-func NewWSClient(endpoint string) (Client, error) {
-	return &wsClient{endpoint: endpoint}, nil
+	return newClient(ctx, func(ctx context.Context) (net.Conn, error) {
+		return wsDialContext(ctx, config)
+	})
 }
 
-// connection will return a websocket connection to the RPC server. It will
-// (re)connect when necessary.
-func (client *wsClient) connection() (*websocket.Conn, error) {
-	if client.conn != nil {
-		return client.conn, nil
+func wsDialContext(ctx context.Context, config *websocket.Config) (*websocket.Conn, error) {
+	var conn net.Conn
+	var err error
+	switch config.Location.Scheme {
+	case "ws":
+		conn, err = dialContext(ctx, "tcp", wsDialAddress(config.Location))
+	case "wss":
+		dialer := contextDialer(ctx)
+		conn, err = tls.DialWithDialer(dialer, "tcp", wsDialAddress(config.Location), config.TlsConfig)
+	default:
+		err = websocket.ErrBadScheme
 	}
-
-	origin, err := os.Hostname()
 	if err != nil {
 		return nil, err
 	}
-
-	origin = "http://" + origin
-	client.conn, err = websocket.Dial(client.endpoint, "", origin)
-
-	return client.conn, err
-}
-
-// SupportedModules is the collection of modules the RPC server offers.
-func (client *wsClient) SupportedModules() (map[string]string, error) {
-	return SupportedModules(client)
-}
-
-// Send writes the JSON serialized msg to the websocket. It will create a new
-// websocket connection to the server if the client is currently not connected.
-func (client *wsClient) Send(msg interface{}) (err error) {
-	client.connMu.Lock()
-	defer client.connMu.Unlock()
-
-	var conn *websocket.Conn
-	if conn, err = client.connection(); err == nil {
-		if err = websocket.JSON.Send(conn, msg); err != nil {
-			client.conn.Close()
-			client.conn = nil
-		}
+	ws, err := websocket.NewClient(config, conn)
+	if err != nil {
+		conn.Close()
+		return nil, err
 	}
-
-	return err
+	return ws, err
 }
 
-// Recv reads a JSON message from the websocket and unmarshals it into msg.
-func (client *wsClient) Recv(msg interface{}) (err error) {
-	client.connMu.Lock()
-	defer client.connMu.Unlock()
+var wsPortMap = map[string]string{"ws": "80", "wss": "443"}
 
-	var conn *websocket.Conn
-	if conn, err = client.connection(); err == nil {
-		if err = websocket.JSON.Receive(conn, msg); err != nil {
-			client.conn.Close()
-			client.conn = nil
+func wsDialAddress(location *url.URL) string {
+	if _, ok := wsPortMap[location.Scheme]; ok {
+		if _, _, err := net.SplitHostPort(location.Host); err != nil {
+			return net.JoinHostPort(location.Host, wsPortMap[location.Scheme])
 		}
 	}
-	return
-}
-
-// Close closes the underlaying websocket connection.
-func (client *wsClient) Close() {
-	client.connMu.Lock()
-	defer client.connMu.Unlock()
-
-	if client.conn != nil {
-		client.conn.Close()
-		client.conn = nil
-	}
-
+	return location.Host
 }
-- 
GitLab