good morning!!!!
Skip to content
GitLab
Explore
Sign in
Commits on Source (5)
pooling
· 06a5b704
a
authored
Aug 06, 2023
06a5b704
fix exports
· a74c21c5
a
authored
Aug 07, 2023
a74c21c5
Add LICENSE
· 3c5392bb
a
authored
Aug 10, 2023
3c5392bb
content type application jayson
· 54ef76de
a
authored
Aug 11, 2023
54ef76de
Update dialer.go
· 180c42ba
a
authored
Aug 15, 2023
180c42ba
Hide whitespace changes
Inline
Side-by-side
LICENSE
0 → 100644
View file @
180c42ba
This is free and unencumbered software released into the public domain.
Anyone is free to copy, modify, publish, use, compile, sell, or
distribute this software, either in source code form or as a compiled
binary, for any purpose, commercial or non-commercial, and by any
means.
In jurisdictions that recognize copyright laws, the author or authors
of this software dedicate any and all copyright interest in the
software to the public domain. We make this dedication for the benefit
of the public at large and to the detriment of our heirs and
successors. We intend this dedication to be an overt act of
relinquishment in perpetuity of all present and future rights to this
software under copyright law.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
OTHER DEALINGS IN THE SOFTWARE.
For more information, please refer to <https://unlicense.org>
contrib/client/pooling.go
0 → 100644
View file @
180c42ba
package
client
import
(
"context"
"net"
"sync"
"sync/atomic"
"gfx.cafe/open/jrpc"
"gfx.cafe/open/jrpc/pkg/codec"
)
var
_
codec
.
Conn
=
(
*
Pooling
)(
nil
)
type
Pooling
struct
{
dialer
func
(
ctx
context
.
Context
)
(
jrpc
.
Conn
,
error
)
conns
chan
codec
.
Conn
base
codec
.
Conn
closed
atomic
.
Bool
middleware
[]
codec
.
Middleware
mu
sync
.
Mutex
}
func
NewPooling
(
dialer
func
(
ctx
context
.
Context
)
(
jrpc
.
Conn
,
error
),
max
int
)
*
Pooling
{
r
:=
&
Pooling
{
dialer
:
dialer
,
conns
:
make
(
chan
codec
.
Conn
,
max
),
}
return
r
}
func
(
r
*
Pooling
)
Do
(
ctx
context
.
Context
,
result
any
,
method
string
,
params
any
)
error
{
if
r
.
closed
.
Load
()
{
return
net
.
ErrClosed
}
errChan
:=
make
(
chan
error
)
go
func
()
{
conn
,
err
:=
r
.
getClient
(
ctx
)
if
err
!=
nil
{
errChan
<-
err
return
}
defer
r
.
putClient
(
conn
)
errChan
<-
conn
.
Do
(
ctx
,
result
,
method
,
params
)
}()
return
<-
errChan
}
func
(
r
*
Pooling
)
BatchCall
(
ctx
context
.
Context
,
b
...*
codec
.
BatchElem
)
error
{
if
r
.
closed
.
Load
()
{
return
net
.
ErrClosed
}
errChan
:=
make
(
chan
error
)
go
func
()
{
conn
,
err
:=
r
.
getClient
(
ctx
)
if
err
!=
nil
{
errChan
<-
err
return
}
defer
r
.
putClient
(
conn
)
errChan
<-
conn
.
BatchCall
(
ctx
,
b
...
)
}()
return
<-
errChan
}
func
(
p
*
Pooling
)
Mount
(
m
codec
.
Middleware
)
{
p
.
middleware
=
append
(
p
.
middleware
,
m
)
}
func
(
p
*
Pooling
)
Close
()
error
{
if
p
.
closed
.
CompareAndSwap
(
false
,
true
)
{
for
k
:=
range
p
.
conns
{
k
.
Close
()
}
}
return
nil
}
func
(
p
*
Pooling
)
Closed
()
<-
chan
struct
{}
{
return
make
(
<-
chan
struct
{})
}
func
(
r
*
Pooling
)
getClient
(
ctx
context
.
Context
)
(
jrpc
.
Conn
,
error
)
{
select
{
case
<-
ctx
.
Done
()
:
return
nil
,
ctx
.
Err
()
case
<-
r
.
conns
:
default
:
}
conn
,
err
:=
r
.
dialer
(
ctx
)
if
err
!=
nil
{
return
nil
,
err
}
return
conn
,
nil
}
func
(
r
*
Pooling
)
putClient
(
conn
jrpc
.
Conn
)
{
if
r
.
closed
.
Load
()
{
return
}
select
{
case
<-
conn
.
Closed
()
:
default
:
}
select
{
case
r
.
conns
<-
conn
:
default
:
conn
.
Close
()
}
}
contrib/client/reconnecting.go
View file @
180c42ba
...
...
@@ -8,6 +8,8 @@ import (
"gfx.cafe/open/jrpc/pkg/codec"
)
var
_
jrpc
.
Conn
=
(
*
Reconnecting
)(
nil
)
type
Reconnecting
struct
{
dialer
func
(
ctx
context
.
Context
)
(
jrpc
.
Conn
,
error
)
base
codec
.
Conn
...
...
contrib/codecs/dialer.go
View file @
180c42ba
...
...
@@ -8,7 +8,6 @@ import (
"gfx.cafe/open/jrpc/contrib/codecs/http"
"gfx.cafe/open/jrpc/contrib/codecs/rdwr"
"gfx.cafe/open/jrpc/contrib/codecs/websocket"
"gfx.cafe/open/jrpc/exp/redis"
"gfx.cafe/open/jrpc/pkg/codec"
)
...
...
@@ -22,12 +21,12 @@ func DialContext(ctx context.Context, u string) (codec.Conn, error) {
return
http
.
Dial
(
ctx
,
nil
,
u
)
case
"ws"
,
"wss"
:
return
websocket
.
DialWebsocket
(
ctx
,
u
,
""
)
case
"redis"
:
domain
:=
pu
.
Query
()
.
Get
(
"domain"
)
if
domain
==
""
{
domain
=
"jrpc"
}
return
redis
.
Dial
(
pu
.
Host
,
domain
),
nil
//
case "redis":
//
domain := pu.Query().Get("domain")
//
if domain == "" {
//
domain = "jrpc"
//
}
//
return redis.Dial(pu.Host, domain), nil
case
"tcp"
:
tcpAddr
,
err
:=
net
.
ResolveTCPAddr
(
"tcp"
,
u
)
if
err
!=
nil
{
...
...
contrib/codecs/http/client.go
View file @
180c42ba
...
...
@@ -125,6 +125,7 @@ func (c *Client) post(req *codec.Request) (*http.Response, error) {
hreq
.
Header
.
Add
(
k
,
vv
)
}
}
c
.
headers
.
Add
(
"Content-Type"
,
"application/json"
)
return
c
.
c
.
Do
(
hreq
)
}
...
...
exports.go
View file @
180c42ba
...
...
@@ -20,7 +20,7 @@ type (
// ResponseWriter is used to write responses to the request
ResponseWriter
=
codec
.
ResponseWriter
// StreamingConn is a conn that supports streaming methods
StreamingConn
=
codec
.
Conn
StreamingConn
=
codec
.
Streaming
Conn
// Request is the request object
Request
=
codec
.
Request
// Server is a jrpc server
...
...