Skip to content

Commit 998124d

Browse files
committed
Cross-thread RPC via async channel
RpcChannel allows using a pair of [async channels](https://github.com/status-im/nim-async-channels) to communicate between threads as if they were connected over a socket or other transport. This setup avoids having to copy the data onto the OS buffer and instead uses a ThreadSignalPtr to wake the target thread. While it is not the most performant way to make cross-thread calls, it is quite convenient for services that already expose a JSON-RPC server - the choice of an internal channel then becomes a simple configuration option where the only thing that changes is the setup.
1 parent e78d257 commit 998124d

File tree

4 files changed

+216
-1
lines changed

4 files changed

+216
-1
lines changed

json_rpc.nimble

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ requires "nim >= 1.6.0",
2727
"websock >= 0.2.1 & < 0.3.0",
2828
"serialization >= 0.4.4",
2929
"json_serialization >= 0.4.2",
30+
"https://github.com/status-im/nim-async-channels",
3031
"unittest2"
3132

3233
let nimc = getEnv("NIMC", "nim") # Which nim compiler to use

json_rpc/rpcchannels.nim

Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
## This module provides a lightweight, thread‑safe JSON‑RPC channel that can be
2+
## used to connect a client and a server running in different threads, reusing
3+
## existing JSON-RPC infrastructure already present in the application.
4+
5+
{.push raises: [], gcsafe.}
6+
7+
import ./[client, errors, router, server], asyncchannels, ./private/jrpc_sys
8+
export client, errors, server
9+
10+
# --------------------------------------------------------------------------- #
11+
# Types
12+
# --------------------------------------------------------------------------- #
13+
14+
type
15+
RpcChannel* = object
16+
## An RPC channel represents a thread‑safe, bidirectional communications
17+
## channel from which a single "server" and a single "client" can be formed.
18+
##
19+
## The channel can be allocated in any thread while the server and client
20+
## instances should be created in the thread where they will be used,
21+
## passing to them the `RpcChannelPtrs` instance returned from `open`.
22+
recv, send: AsyncChannel[seq[byte]]
23+
24+
RpcChannelPtrs* = object ## Raw pointer pair that can be moved to another thread.
25+
recv, send: ptr AsyncChannel[seq[byte]]
26+
# The `recv` pointer is the channel that receives data, the `send` pointer
27+
# is the channel that sends data. The two pointers are swapped when
28+
# the channel is handed to the opposite side.
29+
30+
RpcChannelClient* = ref object of RpcConnection
31+
channel: RpcChannelPtrs
32+
loop: Future[void]
33+
34+
RpcChannelServer* = ref object of RpcServer
35+
client: RpcChannelClient
36+
37+
# --------------------------------------------------------------------------- #
38+
# Public procedures
39+
# --------------------------------------------------------------------------- #
40+
41+
proc open*(c: var RpcChannel): Result[RpcChannelPtrs, string] =
42+
## Open the channel, returning a channel pair that can be passed to the
43+
## server and client threads respectively.
44+
##
45+
## Only one server and client instance each may use the returned channel
46+
## pairs. The returned `RpcChannelPtrs` are raw pointers that must be
47+
## moved to the thread that will own the client or server.
48+
?c.recv.open()
49+
50+
c.send.open().isOkOr:
51+
c.recv.close()
52+
return err(error)
53+
54+
ok (RpcChannelPtrs(recv: addr c.recv, send: addr c.send))
55+
56+
proc close*(c: var RpcChannel) =
57+
c.recv.close()
58+
c.recv.reset()
59+
c.send.close()
60+
c.send.reset()
61+
62+
proc new*(
63+
T: type RpcChannelClient, channel: RpcChannelPtrs, router = default(ref RpcRouter)
64+
): T =
65+
## Create a new `RpcChannelClient` that will use the supplied `channel`.
66+
## If a `router` is supplied, it will be used to route incoming requests.
67+
## The returned client is ready to be connected with `connect`.
68+
let router =
69+
if router != nil:
70+
proc(
71+
request: RequestBatchRx
72+
): Future[seq[byte]] {.async: (raises: [], raw: true).} =
73+
router[].route(request)
74+
else:
75+
nil
76+
77+
T(channel: channel, router: router, remote: "client")
78+
79+
proc newRpcChannelClient*(
80+
channel: RpcChannelPtrs, router = default(ref RpcRouter)
81+
): RpcChannelClient =
82+
## Convenience wrapper that creates a new `RpcChannelClient` from a
83+
## `RpcChannelPtrs` pair. The client can be used immediately or after
84+
## calling `connect`.
85+
RpcChannelClient.new(channel, router)
86+
87+
method send*(
88+
client: RpcChannelClient, reqData: seq[byte]
89+
) {.async: (raises: [CancelledError, JsonRpcError]).} =
90+
## Send a raw JSON‑RPC request to the remote side.
91+
## The data is written synchronously to the underlying channel.
92+
client.channel.send[].sendSync(reqData)
93+
94+
method request*(
95+
client: RpcChannelClient, reqData: seq[byte]
96+
): Future[seq[byte]] {.async: (raises: [CancelledError, JsonRpcError]).} =
97+
## Send a request and wait for the corresponding response.
98+
## The request is sent synchronously and the future returned by
99+
## `client.processMessage` is awaited.
100+
client.withPendingFut(fut):
101+
client.channel.send[].sendSync(reqData)
102+
103+
await fut
104+
105+
proc processData(client: RpcChannelClient) {.async: (raises: []).} =
106+
## Internal loop that receives data from the channel, processes it
107+
## with `client.processMessage`, and sends back any response.
108+
## The loop terminates when the channel is closed or a
109+
## `CancelledError` is raised.
110+
var lastError: ref JsonRpcError
111+
try:
112+
while true:
113+
let
114+
data = await client.channel.recv.recv()
115+
resp = await client.processMessage(data)
116+
117+
if resp.len > 0:
118+
client.channel.send[].sendSync(resp)
119+
except CancelledError:
120+
discard # shutting down
121+
122+
if lastError == nil:
123+
lastError = (ref RpcTransportError)(msg: "Connection closed")
124+
125+
client.clearPending(lastError)
126+
127+
if not client.onDisconnect.isNil:
128+
client.onDisconnect()
129+
130+
proc connect*(
131+
client: RpcChannelClient
132+
) {.async: (raises: [CancelledError, JsonRpcError]).} =
133+
## Start the client's background processing loop.
134+
## After calling this, the client is ready to send requests.
135+
doAssert client.loop == nil, "Must not already be connected"
136+
client.loop = client.processData()
137+
138+
method close*(client: RpcChannelClient) {.async: (raises: []).} =
139+
## Gracefully shut down the client.
140+
## Cancels the background loop and waits for it to finish.
141+
if client.loop != nil:
142+
let loop = move(client.loop)
143+
await loop.cancelAndWait()
144+
145+
proc new*(T: type RpcChannelServer, channel: RpcChannelPtrs): T =
146+
## Create a new `RpcChannelServer` that will listen on the supplied
147+
## `channel`. The server owns a fresh `RpcRouter` instance.
148+
let
149+
res = T(router: RpcRouter.init())
150+
# Compared to the client, swap the channels in the server
151+
channel = RpcChannelPtrs(recv: channel.send, send: channel.recv)
152+
router = proc(
153+
request: RequestBatchRx
154+
): Future[seq[byte]] {.async: (raises: [], raw: true).} =
155+
res[].router.route(request)
156+
157+
client = RpcChannelClient(channel: channel, router: router, remote: "server")
158+
159+
res.client = client
160+
res
161+
162+
proc start*(server: RpcChannelServer) =
163+
## Start the RPC server.
164+
## The server's background loop is started and the client is ready to
165+
## receive requests.
166+
167+
# `connect` for a thread channel is actually synchronous and cannot fail so
168+
# we can ignore the future being returned
169+
discard server.client.connect()
170+
server.connections.incl server.client
171+
172+
proc stop*(server: RpcChannelServer) =
173+
discard
174+
175+
proc closeWait*(server: RpcChannelServer) {.async: (raises: []).} =
176+
## Gracefully shut down the server.
177+
server.connections.excl server.client
178+
await server.client.close()
179+

tests/all.nim

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,5 @@ import
2121
test_router_rpc,
2222
test_callsigs,
2323
test_client_hook,
24-
test_batch_call
24+
test_batch_call,
25+
test_rpcchannels

tests/testrpcchannels.nim

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
# json-rpc
2+
# Copyright (c) 2019-2023 Status Research & Development GmbH
3+
# Licensed under either of
4+
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
5+
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
6+
# at your option.
7+
# This file may not be copied, modified, or distributed except according to
8+
# those terms.
9+
10+
import chronos/unittest2/asynctests, ../json_rpc/rpcchannels, ./private/helpers
11+
12+
# Create RPC on server
13+
proc setupServer*(srv: RpcServer) =
14+
srv.rpc("myProc") do(input: string, data: array[0 .. 3, int]):
15+
return %("Hello " & input & " data: " & $data)
16+
17+
proc serverThread(chan: RpcChannelPtrs) {.thread.} =
18+
var srv = RpcChannelServer.new(chan)
19+
setupServer(srv)
20+
srv.start()
21+
waitFor sleepAsync(1.seconds)
22+
waitFor srv.closeWait()
23+
24+
suite "Thread channel RPC":
25+
asyncTest "Successful RPC call":
26+
var chan: RpcChannel
27+
var ptrs = chan.open().expect("")
28+
var server: Thread[RpcChannelPtrs]
29+
var client = newRpcChannelClient(ptrs)
30+
31+
createThread(server, serverThread, ptrs)
32+
waitFor client.connect()
33+
let r = waitFor client.call("myProc", %[%"abc", %[1, 2, 3, 4]])
34+
check r.string == "\"Hello abc data: [1, 2, 3, 4]\""

0 commit comments

Comments
 (0)