good morning!!!!
Skip to content
GitLab
Explore
Sign in
Register
Primary navigation
Search or go to…
Project
B
bor
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Iterations
Wiki
Requirements
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Locked files
Build
Pipelines
Jobs
Pipeline schedules
Test cases
Artifacts
Deploy
Releases
Package registry
Container registry
Harbor Registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Code review analytics
Issue analytics
Insights
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
GitLab community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
open
bor
Commits
d6625ac3
Commit
d6625ac3
authored
Aug 9, 2016
by
Péter Szilágyi
Committed by
GitHub
Aug 9, 2016
Browse files
Options
Downloads
Plain Diff
Merge pull request #2891 from fjl/rpc-client-fixes
rpc: client bug fixes
parents
b46b3672
e3292539
Branches
Branches containing commit
Tags
Tags containing commit
No related merge requests found
Changes
4
Show whitespace changes
Inline
Side-by-side
Showing
4 changed files
rpc/client.go
+69
-33
69 additions, 33 deletions
rpc/client.go
rpc/client_example_test.go
+31
-26
31 additions, 26 deletions
rpc/client_example_test.go
rpc/client_test.go
+54
-3
54 additions, 3 deletions
rpc/client_test.go
rpc/notification_test.go
+4
-0
4 additions, 0 deletions
rpc/notification_test.go
with
158 additions
and
62 deletions
rpc/client.go
+
69
−
33
View file @
d6625ac3
...
@@ -18,6 +18,7 @@ package rpc
...
@@ -18,6 +18,7 @@ package rpc
import
(
import
(
"bytes"
"bytes"
"container/list"
"encoding/json"
"encoding/json"
"errors"
"errors"
"fmt"
"fmt"
...
@@ -37,16 +38,31 @@ import (
...
@@ -37,16 +38,31 @@ import (
var
(
var
(
ErrClientQuit
=
errors
.
New
(
"client is closed"
)
ErrClientQuit
=
errors
.
New
(
"client is closed"
)
ErrNoResult
=
errors
.
New
(
"no result in JSON-RPC response"
)
ErrNoResult
=
errors
.
New
(
"no result in JSON-RPC response"
)
ErrSubscriptionQueueOverflow
=
errors
.
New
(
"subscription queue overflow"
)
)
)
const
(
const
(
clientSubscriptionBuffer
=
100
// if exceeded, the client stops reading
// Timeouts
tcpKeepAliveInterval
=
30
*
time
.
Second
tcpKeepAliveInterval
=
30
*
time
.
Second
defaultDialTimeout
=
10
*
time
.
Second
// used when dialing if the context has no deadline
defaultDialTimeout
=
10
*
time
.
Second
// used when dialing if the context has no deadline
defaultWriteTimeout
=
10
*
time
.
Second
// used for calls if the context has no deadline
defaultWriteTimeout
=
10
*
time
.
Second
// used for calls if the context has no deadline
subscribeTimeout
=
5
*
time
.
Second
// overall timeout eth_subscribe, rpc_modules calls
subscribeTimeout
=
5
*
time
.
Second
// overall timeout eth_subscribe, rpc_modules calls
)
)
const
(
// Subscriptions are removed when the subscriber cannot keep up.
//
// This can be worked around by supplying a channel with sufficiently sized buffer,
// but this can be inconvenient and hard to explain in the docs. Another issue with
// buffered channels is that the buffer is static even though it might not be needed
// most of the time.
//
// The approach taken here is to maintain a per-subscription linked list buffer
// shrinks on demand. If the buffer reaches the size below, the subscription is
// dropped.
maxClientSubscriptionBuffer
=
8000
)
// BatchElem is an element in a batch request.
// BatchElem is an element in a batch request.
type
BatchElem
struct
{
type
BatchElem
struct
{
Method
string
Method
string
...
@@ -276,9 +292,9 @@ func (c *Client) BatchCall(b []BatchElem) error {
...
@@ -276,9 +292,9 @@ func (c *Client) BatchCall(b []BatchElem) error {
// to return a response for all of them. The wait duration is bounded by the
// to return a response for all of them. The wait duration is bounded by the
// context's deadline.
// context's deadline.
//
//
// In contrast to CallContext, BatchCallContext only returns
I/O
errors
. Any
// In contrast to CallContext, BatchCallContext only returns errors
that have occurred
// error specific to a request is reported through the
Error field of the
//
while sending the request. Any
error specific to a request is reported through the
// corresponding BatchElem.
//
Error field of the
corresponding BatchElem.
//
//
// Note that batch calls may not be executed atomically on the server side.
// Note that batch calls may not be executed atomically on the server side.
func
(
c
*
Client
)
BatchCallContext
(
ctx
context
.
Context
,
b
[]
BatchElem
)
error
{
func
(
c
*
Client
)
BatchCallContext
(
ctx
context
.
Context
,
b
[]
BatchElem
)
error
{
...
@@ -338,12 +354,14 @@ func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error {
...
@@ -338,12 +354,14 @@ func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error {
// sent to the given channel. The element type of the channel must match the
// sent to the given channel. The element type of the channel must match the
// expected type of content returned by the subscription.
// expected type of content returned by the subscription.
//
//
// Callers should not use the same channel for multiple calls to EthSubscribe.
// The context argument cancels the RPC request that sets up the subscription but has no
// The channel is closed when the notification is unsubscribed or an error
// effect on the subscription after EthSubscribe has returned.
// occurs. The error can be retrieved via the Err method of the subscription.
//
//
// Slow subscribers will block the clients ingress path eventually.
// Slow subscribers will be dropped eventually. Client buffers up to 8000 notifications
func
(
c
*
Client
)
EthSubscribe
(
channel
interface
{},
args
...
interface
{})
(
*
ClientSubscription
,
error
)
{
// before considering the subscriber dead. The subscription Err channel will receive
// ErrSubscriptionQueueOverflow. Use a sufficiently large buffer on the channel or ensure
// that the channel usually has at least one reader to prevent this issue.
func
(
c
*
Client
)
EthSubscribe
(
ctx
context
.
Context
,
channel
interface
{},
args
...
interface
{})
(
*
ClientSubscription
,
error
)
{
// Check type of channel first.
// Check type of channel first.
chanVal
:=
reflect
.
ValueOf
(
channel
)
chanVal
:=
reflect
.
ValueOf
(
channel
)
if
chanVal
.
Kind
()
!=
reflect
.
Chan
||
chanVal
.
Type
()
.
ChanDir
()
&
reflect
.
SendDir
==
0
{
if
chanVal
.
Kind
()
!=
reflect
.
Chan
||
chanVal
.
Type
()
.
ChanDir
()
&
reflect
.
SendDir
==
0
{
...
@@ -365,8 +383,6 @@ func (c *Client) EthSubscribe(channel interface{}, args ...interface{}) (*Client
...
@@ -365,8 +383,6 @@ func (c *Client) EthSubscribe(channel interface{}, args ...interface{}) (*Client
resp
:
make
(
chan
*
jsonrpcMessage
),
resp
:
make
(
chan
*
jsonrpcMessage
),
sub
:
newClientSubscription
(
c
,
chanVal
),
sub
:
newClientSubscription
(
c
,
chanVal
),
}
}
ctx
,
cancel
:=
context
.
WithTimeout
(
context
.
Background
(),
subscribeTimeout
)
defer
cancel
()
// Send the subscription request.
// Send the subscription request.
// The arrival and validity of the response is signaled on sub.quit.
// The arrival and validity of the response is signaled on sub.quit.
...
@@ -398,6 +414,10 @@ func (c *Client) send(ctx context.Context, op *requestOp, msg interface{}) error
...
@@ -398,6 +414,10 @@ func (c *Client) send(ctx context.Context, op *requestOp, msg interface{}) error
err
:=
c
.
write
(
ctx
,
msg
)
err
:=
c
.
write
(
ctx
,
msg
)
c
.
sendDone
<-
err
c
.
sendDone
<-
err
return
err
return
err
case
<-
ctx
.
Done
()
:
// This can happen if the client is overloaded or unable to keep up with
// subscription notifications.
return
ctx
.
Err
()
case
<-
c
.
didQuit
:
case
<-
c
.
didQuit
:
return
ErrClientQuit
return
ErrClientQuit
}
}
...
@@ -653,8 +673,7 @@ func newClientSubscription(c *Client, channel reflect.Value) *ClientSubscription
...
@@ -653,8 +673,7 @@ func newClientSubscription(c *Client, channel reflect.Value) *ClientSubscription
channel
:
channel
,
channel
:
channel
,
quit
:
make
(
chan
struct
{}),
quit
:
make
(
chan
struct
{}),
err
:
make
(
chan
error
,
1
),
err
:
make
(
chan
error
,
1
),
// in is buffered so dispatch can continue even if the subscriber is slow.
in
:
make
(
chan
json
.
RawMessage
),
in
:
make
(
chan
json
.
RawMessage
,
clientSubscriptionBuffer
),
}
}
return
sub
return
sub
}
}
...
@@ -680,13 +699,16 @@ func (sub *ClientSubscription) Unsubscribe() {
...
@@ -680,13 +699,16 @@ func (sub *ClientSubscription) Unsubscribe() {
func
(
sub
*
ClientSubscription
)
quitWithError
(
err
error
,
unsubscribeServer
bool
)
{
func
(
sub
*
ClientSubscription
)
quitWithError
(
err
error
,
unsubscribeServer
bool
)
{
sub
.
quitOnce
.
Do
(
func
()
{
sub
.
quitOnce
.
Do
(
func
()
{
// The dispatch loop won't be able to execute the unsubscribe call
// if it is blocked on deliver. Close sub.quit first because it
// unblocks deliver.
close
(
sub
.
quit
)
if
unsubscribeServer
{
if
unsubscribeServer
{
sub
.
requestUnsubscribe
()
sub
.
requestUnsubscribe
()
}
}
if
err
!=
nil
{
if
err
!=
nil
{
sub
.
err
<-
err
sub
.
err
<-
err
}
}
close
(
sub
.
quit
)
})
})
}
}
...
@@ -706,32 +728,46 @@ func (sub *ClientSubscription) start() {
...
@@ -706,32 +728,46 @@ func (sub *ClientSubscription) start() {
func
(
sub
*
ClientSubscription
)
forward
()
(
err
error
,
unsubscribeServer
bool
)
{
func
(
sub
*
ClientSubscription
)
forward
()
(
err
error
,
unsubscribeServer
bool
)
{
cases
:=
[]
reflect
.
SelectCase
{
cases
:=
[]
reflect
.
SelectCase
{
{
Dir
:
reflect
.
SelectRecv
,
Chan
:
reflect
.
ValueOf
(
sub
.
quit
)},
{
Dir
:
reflect
.
SelectRecv
,
Chan
:
reflect
.
ValueOf
(
sub
.
quit
)},
{
Dir
:
reflect
.
SelectRecv
,
Chan
:
reflect
.
ValueOf
(
sub
.
in
)},
{
Dir
:
reflect
.
SelectSend
,
Chan
:
sub
.
channel
},
{
Dir
:
reflect
.
SelectSend
,
Chan
:
sub
.
channel
},
}
}
buffer
:=
list
.
New
()
defer
buffer
.
Init
()
for
{
for
{
select
{
var
chosen
int
case
result
:=
<-
sub
.
in
:
var
recv
reflect
.
Value
val
,
err
:=
sub
.
unmarshal
(
result
)
if
buffer
.
Len
()
==
0
{
if
err
!=
nil
{
// Idle, omit send case.
return
err
,
true
chosen
,
recv
,
_
=
reflect
.
Select
(
cases
[
:
2
])
}
else
{
// Non-empty buffer, send the first queued item.
cases
[
2
]
.
Send
=
reflect
.
ValueOf
(
buffer
.
Front
()
.
Value
)
chosen
,
recv
,
_
=
reflect
.
Select
(
cases
)
}
}
cases
[
1
]
.
Send
=
val
switch
chosen
,
_
,
_
:=
reflect
.
Select
(
cases
);
chosen
{
switch
chosen
{
case
0
:
// <-sub.quit
case
0
:
// <-sub.quit
return
nil
,
false
return
nil
,
false
case
1
:
// sub.channel<-
case
1
:
// <-sub.in
continue
val
,
err
:=
sub
.
unmarshal
(
recv
.
Interface
()
.
(
json
.
RawMessage
))
if
err
!=
nil
{
return
err
,
true
}
}
case
<-
sub
.
quit
:
if
buffer
.
Len
()
==
maxClientSubscriptionBuffer
{
return
nil
,
false
return
ErrSubscriptionQueueOverflow
,
true
}
buffer
.
PushBack
(
val
)
case
2
:
// sub.channel<-
cases
[
2
]
.
Send
=
reflect
.
Value
{}
// Don't hold onto the value.
buffer
.
Remove
(
buffer
.
Front
())
}
}
}
}
}
}
func
(
sub
*
ClientSubscription
)
unmarshal
(
result
json
.
RawMessage
)
(
reflect
.
Value
,
error
)
{
func
(
sub
*
ClientSubscription
)
unmarshal
(
result
json
.
RawMessage
)
(
interface
{}
,
error
)
{
val
:=
reflect
.
New
(
sub
.
etype
)
val
:=
reflect
.
New
(
sub
.
etype
)
err
:=
json
.
Unmarshal
(
result
,
val
.
Interface
())
err
:=
json
.
Unmarshal
(
result
,
val
.
Interface
())
return
val
.
Elem
(),
err
return
val
.
Elem
()
.
Interface
()
,
err
}
}
func
(
sub
*
ClientSubscription
)
requestUnsubscribe
()
error
{
func
(
sub
*
ClientSubscription
)
requestUnsubscribe
()
error
{
...
...
This diff is collapsed.
Click to expand it.
rpc/client_example_test.go
+
31
−
26
View file @
d6625ac3
...
@@ -22,6 +22,7 @@ import (
...
@@ -22,6 +22,7 @@ import (
"time"
"time"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/rpc"
"golang.org/x/net/context"
)
)
// In this example, our client whishes to track the latest 'block number'
// In this example, our client whishes to track the latest 'block number'
...
@@ -41,7 +42,16 @@ func ExampleClientSubscription() {
...
@@ -41,7 +42,16 @@ func ExampleClientSubscription() {
// Connect the client.
// Connect the client.
client
,
_
:=
rpc
.
Dial
(
"ws://127.0.0.1:8485"
)
client
,
_
:=
rpc
.
Dial
(
"ws://127.0.0.1:8485"
)
subch
:=
make
(
chan
Block
)
subch
:=
make
(
chan
Block
)
go
subscribeBlocks
(
client
,
subch
)
// Ensure that subch receives the latest block.
go
func
()
{
for
i
:=
0
;
;
i
++
{
if
i
>
0
{
time
.
Sleep
(
2
*
time
.
Second
)
}
subscribeBlocks
(
client
,
subch
)
}
}()
// Print events from the subscription as they arrive.
// Print events from the subscription as they arrive.
for
block
:=
range
subch
{
for
block
:=
range
subch
{
...
@@ -52,26 +62,22 @@ func ExampleClientSubscription() {
...
@@ -52,26 +62,22 @@ func ExampleClientSubscription() {
// subscribeBlocks runs in its own goroutine and maintains
// subscribeBlocks runs in its own goroutine and maintains
// a subscription for new blocks.
// a subscription for new blocks.
func
subscribeBlocks
(
client
*
rpc
.
Client
,
subch
chan
Block
)
{
func
subscribeBlocks
(
client
*
rpc
.
Client
,
subch
chan
Block
)
{
for
i
:=
0
;
;
i
++
{
ctx
,
cancel
:=
context
.
WithTimeout
(
context
.
Background
(),
10
*
time
.
Second
)
if
i
>
0
{
defer
cancel
()
time
.
Sleep
(
2
*
time
.
Second
)
}
// Subscribe to new blocks.
// Subscribe to new blocks.
sub
,
err
:=
client
.
EthSubscribe
(
subch
,
"newBlocks"
)
sub
,
err
:=
client
.
EthSubscribe
(
ctx
,
subch
,
"newBlocks"
)
if
err
==
rpc
.
ErrClientQuit
{
if
err
!=
nil
{
return
// Stop reconnecting if the client was closed.
}
else
if
err
!=
nil
{
fmt
.
Println
(
"subscribe error:"
,
err
)
fmt
.
Println
(
"subscribe error:"
,
err
)
continue
return
}
}
// The connection is established now.
// The connection is established now.
// Update the channel with the current block.
// Update the channel with the current block.
var
lastBlock
Block
var
lastBlock
Block
if
err
:=
client
.
Call
(
&
lastBlock
,
"eth_getBlockByNumber"
,
"latest"
);
err
!=
nil
{
if
err
:=
client
.
Call
Context
(
ctx
,
&
lastBlock
,
"eth_getBlockByNumber"
,
"latest"
);
err
!=
nil
{
fmt
.
Println
(
"can't get latest block:"
,
err
)
fmt
.
Println
(
"can't get latest block:"
,
err
)
continue
return
}
}
subch
<-
lastBlock
subch
<-
lastBlock
...
@@ -80,4 +86,3 @@ func subscribeBlocks(client *rpc.Client, subch chan Block) {
...
@@ -80,4 +86,3 @@ func subscribeBlocks(client *rpc.Client, subch chan Block) {
// the connection.
// the connection.
fmt
.
Println
(
"connection lost: "
,
<-
sub
.
Err
())
fmt
.
Println
(
"connection lost: "
,
<-
sub
.
Err
())
}
}
}
This diff is collapsed.
Click to expand it.
rpc/client_test.go
+
54
−
3
View file @
d6625ac3
...
@@ -215,7 +215,7 @@ func TestClientSubscribeInvalidArg(t *testing.T) {
...
@@ -215,7 +215,7 @@ func TestClientSubscribeInvalidArg(t *testing.T) {
t
.
Error
(
string
(
buf
))
t
.
Error
(
string
(
buf
))
}
}
}()
}()
client
.
EthSubscribe
(
arg
,
"foo_bar"
)
client
.
EthSubscribe
(
context
.
Background
(),
arg
,
"foo_bar"
)
}
}
check
(
true
,
nil
)
check
(
true
,
nil
)
check
(
true
,
1
)
check
(
true
,
1
)
...
@@ -233,7 +233,7 @@ func TestClientSubscribe(t *testing.T) {
...
@@ -233,7 +233,7 @@ func TestClientSubscribe(t *testing.T) {
nc
:=
make
(
chan
int
)
nc
:=
make
(
chan
int
)
count
:=
10
count
:=
10
sub
,
err
:=
client
.
EthSubscribe
(
nc
,
"someSubscription"
,
count
,
0
)
sub
,
err
:=
client
.
EthSubscribe
(
context
.
Background
(),
nc
,
"someSubscription"
,
count
,
0
)
if
err
!=
nil
{
if
err
!=
nil
{
t
.
Fatal
(
"can't subscribe:"
,
err
)
t
.
Fatal
(
"can't subscribe:"
,
err
)
}
}
...
@@ -275,7 +275,7 @@ func TestClientSubscribeClose(t *testing.T) {
...
@@ -275,7 +275,7 @@ func TestClientSubscribeClose(t *testing.T) {
err
error
err
error
)
)
go
func
()
{
go
func
()
{
sub
,
err
=
client
.
EthSubscribe
(
nc
,
"hangSubscription"
,
999
)
sub
,
err
=
client
.
EthSubscribe
(
context
.
Background
(),
nc
,
"hangSubscription"
,
999
)
errc
<-
err
errc
<-
err
}()
}()
...
@@ -296,6 +296,57 @@ func TestClientSubscribeClose(t *testing.T) {
...
@@ -296,6 +296,57 @@ func TestClientSubscribeClose(t *testing.T) {
}
}
}
}
// This test checks that Client doesn't lock up when a single subscriber
// doesn't read subscription events.
func
TestClientNotificationStorm
(
t
*
testing
.
T
)
{
server
:=
newTestServer
(
"eth"
,
new
(
NotificationTestService
))
defer
server
.
Stop
()
doTest
:=
func
(
count
int
,
wantError
bool
)
{
client
:=
DialInProc
(
server
)
defer
client
.
Close
()
ctx
,
cancel
:=
context
.
WithTimeout
(
context
.
Background
(),
10
*
time
.
Second
)
defer
cancel
()
// Subscribe on the server. It will start sending many notifications
// very quickly.
nc
:=
make
(
chan
int
)
sub
,
err
:=
client
.
EthSubscribe
(
ctx
,
nc
,
"someSubscription"
,
count
,
0
)
if
err
!=
nil
{
t
.
Fatal
(
"can't subscribe:"
,
err
)
}
defer
sub
.
Unsubscribe
()
// Process each notification, try to run a call in between each of them.
for
i
:=
0
;
i
<
count
;
i
++
{
select
{
case
val
:=
<-
nc
:
if
val
!=
i
{
t
.
Fatalf
(
"(%d/%d) unexpected value %d"
,
i
,
count
,
val
)
}
case
err
:=
<-
sub
.
Err
()
:
if
wantError
&&
err
!=
ErrSubscriptionQueueOverflow
{
t
.
Fatalf
(
"(%d/%d) got error %q, want %q"
,
i
,
count
,
err
,
ErrSubscriptionQueueOverflow
)
}
else
if
!
wantError
{
t
.
Fatalf
(
"(%d/%d) got unexpected error %q"
,
i
,
count
,
err
)
}
return
}
var
r
int
err
:=
client
.
CallContext
(
ctx
,
&
r
,
"eth_echo"
,
i
)
if
err
!=
nil
{
if
!
wantError
{
t
.
Fatalf
(
"(%d/%d) call error: %v"
,
i
,
count
,
err
)
}
return
}
}
}
doTest
(
8000
,
false
)
doTest
(
10000
,
true
)
}
func
TestClientHTTP
(
t
*
testing
.
T
)
{
func
TestClientHTTP
(
t
*
testing
.
T
)
{
server
:=
newTestServer
(
"service"
,
new
(
Service
))
server
:=
newTestServer
(
"service"
,
new
(
Service
))
defer
server
.
Stop
()
defer
server
.
Stop
()
...
...
This diff is collapsed.
Click to expand it.
rpc/notification_test.go
+
4
−
0
View file @
d6625ac3
...
@@ -34,6 +34,10 @@ type NotificationTestService struct {
...
@@ -34,6 +34,10 @@ type NotificationTestService struct {
unblockHangSubscription
chan
struct
{}
unblockHangSubscription
chan
struct
{}
}
}
func
(
s
*
NotificationTestService
)
Echo
(
i
int
)
int
{
return
i
}
func
(
s
*
NotificationTestService
)
wasUnsubCallbackCalled
()
bool
{
func
(
s
*
NotificationTestService
)
wasUnsubCallbackCalled
()
bool
{
s
.
mu
.
Lock
()
s
.
mu
.
Lock
()
defer
s
.
mu
.
Unlock
()
defer
s
.
mu
.
Unlock
()
...
...
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment