Skip to content

Commit e78d257

Browse files
authored
Use client code also to handle server messaging in bidirectional transports (#252)
1 parent 0d25b6d commit e78d257

File tree

6 files changed

+108
-124
lines changed

6 files changed

+108
-124
lines changed

json_rpc/client.nim

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ when (NimMajor, NimMinor, NimPatch) < (2, 2, 6):
1818
discard Future[string]().value()
1919

2020
import
21-
std/[deques, json, tables, macros],
21+
std/[deques, hashes, json, tables, macros],
2222
chronos,
2323
chronicles,
2424
stew/byteutils,
@@ -61,11 +61,16 @@ type
6161
# Client identifier, for logging
6262
maxMessageSize*: int
6363

64-
router*: ref RpcRouter
64+
RpcConnection* = ref object of RpcClient
65+
router*:
66+
proc(request: RequestBatchRx): Future[string] {.async: (raises: []).}
6567
## Router used for transports that support bidirectional communication
6668

6769
GetJsonRpcRequestHeaders* = proc(): seq[(string, string)] {.gcsafe, raises: [].}
6870

71+
func hash*(v: RpcClient): Hash =
72+
cast[Hash](addr v[])
73+
6974
func parseResponse*(payload: openArray[byte], T: type): T {.raises: [JsonRpcError].} =
7075
try:
7176
JrpcSys.decode(payload, T)
@@ -111,7 +116,7 @@ proc callOnProcessMessage*(
111116
ok(true)
112117

113118
proc processMessage*(
114-
client: RpcClient, line: sink seq[byte]
119+
client: RpcConnection, line: seq[byte]
115120
): Future[Result[string, string]] {.async: (raises: []).} =
116121
if not ?client.callOnProcessMessage(line):
117122
return ok("")
@@ -142,7 +147,7 @@ proc processMessage*(
142147
return ok(wrapError(router.INVALID_REQUEST, exc.msg))
143148

144149
if client.router != nil:
145-
ok(await client.router[].route(request))
150+
ok(await client.router(request))
146151
else:
147152
ok("")
148153

json_rpc/clients/socketclient.nim

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import
1818
export client, errors
1919

2020
type
21-
RpcSocketClient* = ref object of RpcClient
21+
RpcSocketClient* = ref object of RpcConnection
2222
transport*: StreamTransport
2323
address*: TransportAddress
2424
loop*: Future[void]
@@ -28,6 +28,13 @@ proc new*(
2828
maxMessageSize = defaultMaxMessageSize,
2929
router = default(ref RpcRouter),
3030
): T =
31+
let router =
32+
if router != nil:
33+
proc(request: RequestBatchRx): Future[string] {.async: (raises: [], raw: true).} =
34+
router[].route(request)
35+
else:
36+
nil
37+
3138
T(maxMessageSize: maxMessageSize, router: router)
3239

3340
proc newRpcSocketClient*(
@@ -71,28 +78,28 @@ method request(
7178

7279
await fut
7380

74-
proc processData(client: RpcSocketClient) {.async: (raises: []).} =
81+
proc processMessages*(client: RpcSocketClient) {.async: (raises: []).} =
82+
# Provide backwards compat with consumers that don't set a max message size
83+
# for example by constructing RpcWebSocketHandler without going through init
84+
let maxMessageSize =
85+
if client.maxMessageSize == 0: defaultMaxMessageSize else: client.maxMessageSize
86+
7587
var lastError: ref JsonRpcError
7688
while true:
77-
let data =
78-
try:
79-
await client.transport.readLine(client.maxMessageSize)
80-
except CatchableError as exc:
81-
lastError = (ref RpcTransportError)(msg: exc.msg, parent: exc)
89+
try:
90+
let data = await client.transport.readLine(maxMessageSize)
91+
if data == "":
8292
break
83-
if data == "":
84-
break
8593

86-
let resp = await(client.processMessage(data.toBytes())).valueOr:
87-
lastError = (ref RequestDecodeError)(msg: error, payload: data.toBytes())
88-
break
94+
let resp = await(client.processMessage(data.toBytes())).valueOr:
95+
lastError = (ref RequestDecodeError)(msg: error, payload: data.toBytes())
96+
break
8997

90-
if resp.len > 0:
91-
try:
98+
if resp.len > 0:
9299
discard await client.transport.write(resp & "\r\n")
93-
except CatchableError as exc:
94-
lastError = (ref RpcTransportError)(msg: exc.msg, parent: exc)
95-
break
100+
except CatchableError as exc:
101+
lastError = (ref RpcTransportError)(msg: exc.msg, parent: exc)
102+
break
96103

97104
if lastError == nil:
98105
lastError = (ref RpcTransportError)(msg: "Connection closed")
@@ -115,7 +122,7 @@ proc connect*(
115122

116123
client.address = address
117124
client.remote = $client.address
118-
client.loop = processData(client)
125+
client.loop = processMessages(client)
119126

120127
proc connect*(
121128
client: RpcSocketClient, address: string, port: Port

json_rpc/clients/websocketclient.nim

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import
2020
export client, errors
2121

2222
type
23-
RpcWebSocketClient* = ref object of RpcClient
23+
RpcWebSocketClient* = ref object of RpcConnection
2424
transport*: WSSession
2525
uri*: Uri
2626
loop*: Future[void]
@@ -32,6 +32,13 @@ proc new*(
3232
maxMessageSize = defaultMaxMessageSize,
3333
router = default(ref RpcRouter),
3434
): T =
35+
let router =
36+
if router != nil:
37+
proc(request: RequestBatchRx): Future[string] {.async: (raises: [], raw: true).} =
38+
router[].route(request)
39+
else:
40+
nil
41+
3542
T(getHeaders: getHeaders, maxMessageSize: maxMessageSize, router: router)
3643

3744
proc newRpcWebSocketClient*(
@@ -82,26 +89,26 @@ method request*(
8289

8390
await fut
8491

85-
proc processData(client: RpcWebSocketClient) {.async: (raises: []).} =
92+
proc processMessages*(client: RpcWebSocketClient) {.async: (raises: []).} =
93+
# Provide backwards compat with consumers that don't set a max message size
94+
# for example by constructing RpcWebSocketHandler without going through init
95+
let maxMessageSize =
96+
if client.maxMessageSize == 0: defaultMaxMessageSize else: client.maxMessageSize
97+
8698
var lastError: ref JsonRpcError
8799
while client.transport.readyState != ReadyState.Closed:
88-
var data =
89-
try:
90-
await client.transport.recvMsg(client.maxMessageSize)
91-
except CatchableError as exc:
92-
lastError = (ref RpcTransportError)(msg: exc.msg, parent: exc)
93-
break
100+
try:
101+
let data = await client.transport.recvMsg(maxMessageSize)
94102

95-
let resp = await(client.processMessage(data)).valueOr:
96-
lastError = (ref RequestDecodeError)(msg: error, payload: data)
97-
break
103+
let resp = await(client.processMessage(data)).valueOr:
104+
lastError = (ref RequestDecodeError)(msg: error, payload: data)
105+
break
98106

99-
if resp.len > 0:
100-
try:
107+
if resp.len > 0:
101108
await client.transport.send(resp)
102-
except CatchableError as exc:
103-
lastError = (ref RpcTransportError)(msg: exc.msg, parent: exc)
104-
break
109+
except CatchableError as exc:
110+
lastError = (ref RpcTransportError)(msg: exc.msg, parent: exc)
111+
break
105112

106113
if lastError == nil:
107114
lastError = (ref RpcTransportError)(msg: "Connection closed")
@@ -162,7 +169,7 @@ proc connect*(
162169
client.transport = ws
163170
client.uri = uri
164171
client.remote = uri.hostname & ":" & uri.port
165-
client.loop = processData(client)
172+
client.loop = processMessages(client)
166173

167174
method close*(client: RpcWebSocketClient) {.async: (raises: []).} =
168175
await client.loop.cancelAndWait()

json_rpc/server.nim

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,23 +10,20 @@
1010
{.push raises: [], gcsafe.}
1111

1212
import
13-
std/[json, sequtils],
13+
std/[json, sequtils, sets],
1414
chronos,
1515
./[client, errors, jsonmarshal, router],
1616
./private/jrpc_sys,
1717
./private/shared_wrapper
1818

19-
export
20-
chronos,
21-
client,
22-
jsonmarshal,
23-
router
19+
export chronos, client, jsonmarshal, router, sets
2420

2521
type
2622
RpcServer* = ref object of RootRef
2723
router*: RpcRouter
2824

29-
connections*: seq[RpcClient]
25+
# For servers that expose bidirectional connections, keep track of them
26+
connections*: HashSet[RpcConnection]
3027

3128
# ------------------------------------------------------------------------------
3229
# Constructors

json_rpc/servers/socketserver.nim

Lines changed: 20 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import
1616
json_serialization/std/net as jsnet,
1717
../private/utils,
1818
../[errors, server],
19+
../private/jrpc_sys,
1920
../clients/socketclient
2021

2122
export errors, server, jsnet
@@ -29,44 +30,30 @@ type
2930
processClientHook: StreamCallback2
3031
maxMessageSize: int
3132

32-
proc processClient(server: StreamServer, transport: StreamTransport) {.async: (raises: []).} =
33+
proc processClient(
34+
server: StreamServer, transport: StreamTransport
35+
) {.async: (raises: []).} =
3336
## Process transport data to the RPC server
3437

3538
let
3639
rpc = getUserData[RpcSocketServer](server)
3740
remote = transport.remoteAddress2().valueOr(default(TransportAddress))
38-
c = RpcSocketClient(transport: transport, address: remote, remote: $remote)
39-
40-
rpc.connections.add(c)
41-
42-
# Provide backwards compat with consumers that don't set a max message size
43-
# for example by constructing RpcWebSocketHandler without going through init
44-
let maxMessageSize =
45-
if rpc.maxMessageSize == 0: defaultMaxMessageSize else: rpc.maxMessageSize
46-
47-
try:
48-
while true:
49-
let req = await transport.readLine(maxMessageSize)
50-
if req == "":
51-
break
52-
53-
debug "Received JSON-RPC request",
54-
address = transport.remoteAddress(),
55-
len = req.len
56-
57-
let res = await rpc.route(req)
58-
if res.len > 0:
59-
discard await transport.write(res & "\r\n")
60-
except TransportError as ex:
61-
error "Transport closed during processing client",
62-
remote,
63-
msg=ex.msg
64-
except CancelledError:
65-
debug "JSON-RPC request processing cancelled", remote
66-
67-
rpc.connections.keepItIf(it != c)
68-
69-
await transport.closeWait()
41+
c = RpcSocketClient(
42+
transport: transport,
43+
address: remote,
44+
remote: $remote,
45+
maxMessageSize: rpc.maxMessageSize,
46+
router: proc(
47+
request: RequestBatchRx
48+
): Future[string] {.async: (raises: [], raw: true).} =
49+
rpc.router.route(request),
50+
)
51+
52+
rpc.connections.incl(c)
53+
54+
await c.processMessages()
55+
56+
rpc.connections.excl(c)
7057

7158
# Utility functions for setting up servers using stream transport addresses
7259

json_rpc/servers/websocketserver.nim

Lines changed: 26 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,13 @@
1010
{.push raises: [], gcsafe.}
1111

1212
import
13-
std/sequtils,
1413
chronicles,
1514
chronos,
1615
websock/[websock, types],
1716
websock/extensions/compression/deflate,
1817
json_serialization/std/net as jsnet,
1918
../[errors, server],
19+
../private/jrpc_sys,
2020
../clients/websocketclient
2121

2222
export errors, server, jsnet
@@ -46,58 +46,39 @@ type
4646

4747
proc serveHTTP*(rpc: RpcWebSocketHandler, request: HttpRequest)
4848
{.async: (raises: [CancelledError]).} =
49-
try:
49+
let ws = try:
5050
let server = rpc.wsserver
5151
let ws = await server.handleRequest(request)
5252
if ws.readyState != ReadyState.Open:
53-
error "Failed to open websocket connection",
54-
address = $request.uri
53+
error "Failed to open websocket connection", address = $request.uri
5554
return
56-
57-
trace "Websocket handshake completed"
58-
let c = RpcWebSocketClient(transport: ws, remote: $request.uri)
59-
rpc.connections.add(c)
60-
# Provide backwards compat with consumers that don't set a max message size
61-
# for example by constructing RpcWebSocketHandler without going through init
62-
let maxMessageSize =
63-
if rpc.maxMessageSize == 0: defaultMaxMessageSize else: rpc.maxMessageSize
64-
while ws.readyState != ReadyState.Closed:
65-
66-
let req = await ws.recvMsg(maxMessageSize)
67-
debug "Received JSON-RPC request",
68-
address = $request.uri,
69-
len = req.len
70-
71-
if ws.readyState == ReadyState.Closed:
72-
# if session already terminated by peer,
73-
# no need to send response
74-
break
75-
76-
if req.len == 0:
77-
await ws.close(
78-
reason = "cannot process zero length message"
79-
)
80-
break
81-
82-
let data = await rpc.route(req)
83-
84-
if data.len > 0:
85-
await ws.send(data)
86-
87-
rpc.connections.keepItIf(it != c)
88-
55+
ws
8956
except WebSocketError as exc:
90-
error "WebSocket error:",
91-
address = $request.uri, msg = exc.msg
92-
57+
error "WebSocket error:", address = $request.uri, msg = exc.msg
58+
return
9359
except CancelledError as exc:
9460
raise exc
95-
9661
except CatchableError as exc:
97-
debug "Internal error while processing JSON-RPC call", msg=exc.msg
98-
99-
proc handleRequest(rpc: RpcWebSocketServer, request: HttpRequest)
100-
{.async: (raises: [CancelledError]).} =
62+
debug "Internal error while processing JSON-RPC call", msg = exc.msg
63+
return
64+
65+
trace "Websocket handshake completed"
66+
let c = RpcWebSocketClient(
67+
transport: ws,
68+
remote: $request.uri,
69+
maxMessageSize: rpc.maxMessageSize,
70+
router: proc(request: RequestBatchRx): Future[string] {.async: (raises: [], raw: true).} =
71+
rpc.router.route(request),
72+
)
73+
rpc.connections.incl(c)
74+
75+
await c.processMessages()
76+
77+
rpc.connections.excl(c)
78+
79+
proc handleRequest(
80+
rpc: RpcWebSocketServer, request: HttpRequest
81+
) {.async: (raises: [CancelledError]).} =
10182
trace "Handling request:", uri = $request.uri
10283

10384
# if hook result is false,

0 commit comments

Comments
 (0)