Skip to content

Commit f1fcc75

Browse files
committed
Switch cl-el comms to RpcChannel
Instead of using a socket (and having to open a fake connection between EL and CL), this pr switches to status-im/nim-json-rpc#254 for internal communication. Eventually, one could make this more efficient by skipping the JSON step, but like this, we at least no longer have to open an Engine API port which makes this setup more secure and easy to deploy (fewer open ports). There's also fewer potential errors to contend with and payloads don't have to travel across the OS buffers and instead stay internal to the process.
1 parent a4e9ac3 commit f1fcc75

File tree

9 files changed

+55
-52
lines changed

9 files changed

+55
-52
lines changed

.gitmodules

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,3 +238,7 @@
238238
path = vendor/nim-quic
239239
url = https://github.com/vacp2p/nim-quic
240240
branch = main
241+
[submodule "vendor/nim-async-channels"]
242+
path = vendor/nim-async-channels
243+
url = https://github.com/status-im/nim-async-channels
244+
branch = master

execution_chain/conf.nim

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -447,6 +447,12 @@ type
447447
defaultValue: false
448448
name: "engine-api" .}: bool
449449

450+
engineApiChannelEnabled* {.
451+
hidden
452+
desc: "Enable the Engine API Channel"
453+
defaultValue: false
454+
name: "debug-engine-api-channel" .}: bool
455+
450456
engineApiPort* {.
451457
desc: "Listening port for the Engine API(http and ws)"
452458
defaultValue: defaultEngineApiPort
@@ -755,7 +761,7 @@ func getAllowedOrigins*(config: ExecutionClientConf): seq[Uri] =
755761
result.add parseUri(item)
756762

757763
func engineApiServerEnabled*(config: ExecutionClientConf): bool =
758-
config.engineApiEnabled or config.engineApiWsEnabled
764+
config.engineApiEnabled or config.engineApiWsEnabled or config.engineApiChannelEnabled
759765

760766
func shareServerWithEngineApi*(config: ExecutionClientConf): bool =
761767
config.engineApiServerEnabled and

execution_chain/el_sync.nim

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ import
1818
web3/[engine_api, primitives, conversions],
1919
beacon_chain/consensus_object_pools/blockchain_dag,
2020
beacon_chain/el/[el_manager, engine_api_conversions],
21-
beacon_chain/spec/[forks, presets, state_transition_block]
21+
beacon_chain/spec/[forks, presets, state_transition_block],
22+
json_rpc/client
2223

2324
logScope:
2425
topics = "elsync"
@@ -87,24 +88,15 @@ proc findSlot(
8788

8889
Opt.some importedSlot
8990

90-
proc syncToEngineApi*(dag: ChainDAGRef, url: EngineApiUrl) {.async.} =
91+
proc syncToEngineApi*(dag: ChainDAGRef, rpcClient: RpcClient) {.async.} =
9192
# Takes blocks from the CL and sends them to the EL - the attempt is made
9293
# optimistically until something unexpected happens (reorg etc) at which point
9394
# the process ends
9495

9596
let
9697
# Create the client for the engine api
97-
# And exchange the capabilities for a test communication
98-
web3 = await url.newWeb3()
99-
rpcClient = web3.provider
10098
(lastEra1Block, firstSlotAfterMerge) = dag.cfg.loadNetworkConfig()
10199

102-
defer:
103-
try:
104-
await web3.close()
105-
except:
106-
discard
107-
108100
# Load the EL state detials and create the beaconAPI client
109101
var elBlockNumber = uint64(await rpcClient.eth_blockNumber())
110102

execution_chain/nimbus.nim

Lines changed: 17 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,15 @@ proc workaround*(): int {.exportc.} =
1616
return int(Future[Quantity]().internalValue)
1717

1818
import
19-
std/[os, net, options, strformat, terminal, typetraits],
19+
std/[os, net, options, terminal, typetraits],
2020
stew/io2,
2121
chronos/threadsync,
2222
chronicles,
2323
metrics,
2424
metrics/chronos_httpserver,
25-
nimcrypto/sysrand,
2625
eth/enr/enr,
2726
eth/net/nat,
27+
json_rpc/rpcchannels,
2828
eth/p2p/discoveryv5/random2,
2929
beacon_chain/spec/[engine_authentication],
3030
beacon_chain/validators/keystore_management,
@@ -36,7 +36,6 @@ import
3636
nimbus_binary_common,
3737
process_state,
3838
],
39-
./rpc/jwt_auth,
4039
./[
4140
constants,
4241
conf as ecconf,
@@ -170,13 +169,13 @@ type
170169
tcpPort: Port
171170
udpPort: Port
172171
elSync: bool
172+
channel: RpcChannelPtrs
173173

174174
ExecutionThreadConfig = object
175175
tsp: ThreadSignalPtr
176176
tcpPort: Port
177177
udpPort: Option[Port]
178-
179-
var jwtKey: JwtSharedKey
178+
channel: RpcChannelPtrs
180179

181180
proc dataDir*(config: NimbusConf): string =
182181
string config.dataDirFlag.get(
@@ -190,14 +189,14 @@ proc justWait(tsp: ThreadSignalPtr) {.async: (raises: [CancelledError]).} =
190189
notice "Waiting failed", err = exc.msg
191190

192191
proc elSyncLoop(
193-
dag: ChainDAGRef, url: EngineApiUrl
192+
dag: ChainDAGRef, elManager: ELManager
194193
) {.async: (raises: [CancelledError]).} =
195194
while true:
196195
await sleepAsync(12.seconds)
197196

198197
# TODO trigger only when the EL needs syncing
199198
try:
200-
await syncToEngineApi(dag, url)
199+
await syncToEngineApi(dag, elManager.channel())
201200
except CatchableError as exc:
202201
# This can happen when the EL is busy doing some work, specially on
203202
# startup
@@ -208,17 +207,8 @@ proc runBeaconNode(p: BeaconThreadConfig) {.thread.} =
208207
stderr.writeLine error # Logging not yet set up
209208
quit QuitFailure
210209

211-
let engineUrl = EngineApiUrl.init(
212-
&"http://127.0.0.1:{defaultEngineApiPort}/", Opt.some(@(distinctBase(jwtKey)))
213-
)
214-
215210
config.metricsEnabled = false
216-
config.elUrls =
217-
@[
218-
EngineApiUrlConfigValue(
219-
url: engineUrl.url, jwtSecret: some toHex(distinctBase(jwtKey))
220-
)
221-
]
211+
config.elUrls = @[EngineApiUrlConfigValue(channel: Opt.some(p.channel))]
222212
config.statusBarEnabled = false # Multi-threading issues due to logging
223213
config.tcpPort = p.tcpPort
224214
config.udpPort = p.udpPort
@@ -259,11 +249,8 @@ proc runBeaconNode(p: BeaconThreadConfig) {.thread.} =
259249
proc runExecutionClient(p: ExecutionThreadConfig) {.thread.} =
260250
var config = makeConfig(ignoreUnknown = true)
261251
config.metricsEnabled = false
262-
config.engineApiEnabled = true
263-
config.engineApiPort = Port(defaultEngineApiPort)
264-
config.engineApiAddress = defaultAdminListenAddress
265-
config.jwtSecret.reset()
266-
config.jwtSecretValue = some toHex(distinctBase(jwtKey))
252+
config.engineApiEnabled = false
253+
config.engineApiChannelEnabled = true
267254
config.agentString = "nimbus"
268255
config.tcpPort = p.tcpPort
269256
config.udpPortFlag = p.udpPort
@@ -277,16 +264,14 @@ proc runExecutionClient(p: ExecutionThreadConfig) {.thread.} =
277264
com = setupCommonRef(config, taskpool)
278265

279266
dynamicLogScope(comp = "ec"):
280-
nimbus_execution_client.runExeClient(config, com, p.tsp.justWait())
267+
nimbus_execution_client.runExeClient(
268+
config, com, p.tsp.justWait(), channel = Opt.some p.channel
269+
)
281270

282271
# Stop the other thread as well, in case `runExeClient` stopped early
283272
waitFor p.tsp.fire()
284273

285274
proc runCombinedClient() =
286-
# Make it harder to connect to the (internal) engine - this will of course
287-
# go away
288-
discard randomBytes(distinctBase(jwtKey))
289-
290275
const banner = "Nimbus v0.0.1"
291276

292277
var config = NimbusConf.loadWithBanners(banner, copyright, [specBanner], true).valueOr:
@@ -325,6 +310,9 @@ proc runCombinedClient() =
325310
"Baked-in KZG setup is correct"
326311
)
327312

313+
var channel: RpcChannel
314+
let pairs = channel.open().expect("working channel")
315+
328316
var bnThread: Thread[BeaconThreadConfig]
329317
let bnStop = ThreadSignalPtr.new().expect("working ThreadSignalPtr")
330318
createThread(
@@ -335,6 +323,7 @@ proc runCombinedClient() =
335323
tcpPort: config.beaconTcpPort.get(config.tcpPort.get(Port defaultEth2TcpPort)),
336324
udpPort: config.beaconUdpPort.get(config.udpPort.get(Port defaultEth2TcpPort)),
337325
elSync: config.elSync,
326+
channel: pairs.client,
338327
),
339328
)
340329

@@ -357,6 +346,7 @@ proc runCombinedClient() =
357346
some(Port(uint16(config.udpPort.get()) + 1))
358347
else:
359348
none(Port),
349+
channel: pairs.server,
360350
),
361351
)
362352

execution_chain/nimbus_desc.nim

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ import
2121
./sync/beacon as beacon_sync,
2222
./sync/wire_protocol,
2323
./beacon/beacon_engine,
24-
./common
24+
./common,
25+
json_rpc/rpcchannels
2526

2627
when enabledLogLevel == TRACE:
2728
import std/sequtils
@@ -42,6 +43,7 @@ type
4243
NimbusNode* = ref object
4344
httpServer*: NimbusHttpServerRef
4445
engineApiServer*: NimbusHttpServerRef
46+
engineApiChannel*: RpcChannelServer
4547
ethNode*: EthereumNode
4648
fc*: ForkedChainRef
4749
txPool*: TxPoolRef

execution_chain/nimbus_execution_client.nim

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -185,24 +185,24 @@ proc setupP2P(nimbus: NimbusNode, config: ExecutionClientConf, com: CommonRef) =
185185
if not syncerShouldRun:
186186
nimbus.beaconSyncRef = BeaconSyncRef(nil)
187187

188-
proc init*(nimbus: NimbusNode, config: ExecutionClientConf, com: CommonRef) =
188+
proc init*(nimbus: NimbusNode, config: ExecutionClientConf, com: CommonRef, channel: Opt[RpcChannelPtrs]) =
189189
nimbus.accountsManager = new AccountsManager
190190
nimbus.rng = newRng()
191191

192192
basicServices(nimbus, config, com)
193193
manageAccounts(nimbus, config)
194194
setupP2P(nimbus, config, com)
195-
setupRpc(nimbus, config, com)
195+
setupRpc(nimbus, config, com, channel)
196196

197197
# Not starting syncer if there is definitely no way to run it. This
198198
# avoids polling (i.e. waiting for instructions) and some logging.
199199
if not nimbus.beaconSyncRef.isNil and
200200
not nimbus.beaconSyncRef.start():
201201
nimbus.beaconSyncRef = BeaconSyncRef(nil)
202202

203-
proc init*(T: type NimbusNode, config: ExecutionClientConf, com: CommonRef): T =
203+
proc init*(T: type NimbusNode, config: ExecutionClientConf, com: CommonRef, channel: Opt[RpcChannelPtrs]): T =
204204
let nimbus = T()
205-
nimbus.init(config, com)
205+
nimbus.init(config, com, channel)
206206
nimbus
207207

208208
proc preventLoadingDataDirForTheWrongNetwork(db: CoreDbRef; config: ExecutionClientConf) =
@@ -274,16 +274,17 @@ proc runExeClient*(
274274
com: CommonRef,
275275
stopper: StopFuture,
276276
nimbus = NimbusNode(nil),
277+
channel = Opt.none(RpcChannelPtrs),
277278
) =
278279
## Launches and runs the execution client for pre-configured `nimbus` and
279280
## `conf` argument descriptors.
280281
##
281282

282283
var nimbus = nimbus
283284
if nimbus.isNil:
284-
nimbus = NimbusNode.init(config, com)
285+
nimbus = NimbusNode.init(config, com, channel)
285286
else:
286-
nimbus.init(config, com)
287+
nimbus.init(config, com, channel)
287288

288289
defer:
289290
let

execution_chain/rpc.nim

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
import
1313
chronicles,
1414
websock/websock,
15-
json_rpc/rpcserver,
15+
json_rpc/[rpcserver, rpcchannels],
1616
./rpc/[common, cors, debug, engine_api, jwt_auth, rpc_server, server_api],
1717
./[conf, nimbus_desc]
1818

@@ -23,7 +23,8 @@ export
2323
jwt_auth,
2424
cors,
2525
rpc_server,
26-
server_api
26+
server_api,
27+
rpcchannels
2728

2829
const DefaultChunkSize = 1024*1024
2930

@@ -53,7 +54,6 @@ func installRPC(server: RpcServer,
5354
if RpcFlag.Debug in flags:
5455
setupDebugRpc(com, nimbus.txPool, server)
5556

56-
5757
proc newRpcWebsocketHandler(): RpcWebSocketHandler =
5858
let rng = HmacDrbgContext.new()
5959
RpcWebSocketHandler(
@@ -198,8 +198,8 @@ proc addServices(handlers: var seq[RpcHandlerProc],
198198
handlers.addHandler(server)
199199

200200
proc setupRpc*(nimbus: NimbusNode, config: ExecutionClientConf,
201-
com: CommonRef) =
202-
if not config.engineApiEnabled:
201+
com: CommonRef, channel: Opt[RpcChannelPtrs]) =
202+
if not config.engineApiEnabled and channel.isNone():
203203
warn "Engine API disabled, the node will not respond to consensus client updates (enable with `--engine-api`)"
204204

205205
if not config.serverEnabled:
@@ -257,3 +257,10 @@ proc setupRpc*(nimbus: NimbusNode, config: ExecutionClientConf,
257257
quit(QuitFailure)
258258
nimbus.engineApiServer = res.get
259259
nimbus.engineApiServer.start()
260+
261+
if channel.isSome():
262+
nimbus.engineApiChannel = RpcChannelServer.new(channel[])
263+
264+
setupEngineAPI(nimbus.beaconEngine, nimbus.engineApiChannel)
265+
installRPC(nimbus.engineApiChannel, nimbus, config, com, serverApi, {RpcFlag.Eth})
266+
nimbus.engineApiChannel.start()

vendor/nim-async-channels

Submodule nim-async-channels added at 98602ae

0 commit comments

Comments
 (0)