Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 8 additions & 11 deletions execution_chain/common/common.nim
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ import
../db/[core_db, ledger, storage_types, fcu_db],
../utils/[utils],
".."/[constants, errors, version_info],
"."/[chain_config, evmforks, genesis, hardforks],
taskpools
"."/[chain_config, evmforks, genesis, hardforks]

export
chain_config,
Expand All @@ -27,9 +26,12 @@ export
evmforks,
genesis,
hardforks,
taskpools,
utils

when compileOption("threads"):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tersec Thoughts on using this switch? Is this the recommended way to check if threads are enabled at compile time?

import taskpools
export taskpools

type
HeaderChainUpdateCB* = proc(hdr: Header; fin: Hash32) {.gcsafe, raises: [].}
## Inform `CL` sub-module `header_chain_cache` about new head.
Expand Down Expand Up @@ -87,8 +89,9 @@ type
gasLimit: uint64
## Desired gas limit when building a block

taskpool*: Taskpool
## Shared task pool for offloading computation to other threads
when compileOption("threads"):
taskpool*: Taskpool
## Shared task pool for offloading computation to other threads

statelessProviderEnabled*: bool
## Enable the stateless provider. This turns on the features required
Expand Down Expand Up @@ -165,7 +168,6 @@ proc initializeDb(com: CommonRef) =

proc init(com : CommonRef,
db : CoreDbRef,
taskpool : Taskpool,
networkId : NetworkId,
config : ChainConfig,
genesis : Genesis,
Expand All @@ -181,7 +183,6 @@ proc init(com : CommonRef,
com.forkTransitionTable = config.toForkTransitionTable()
com.networkId = networkId
com.extraData = ShortClientId
com.taskpool = taskpool
com.gasLimit = DEFAULT_GAS_LIMIT

# com.forkIdCalculator and com.genesisHash are set
Expand Down Expand Up @@ -230,7 +231,6 @@ proc isBlockAfterTtd(com: CommonRef, header: Header, txFrame: CoreDbTxRef): bool
proc new*(
_: type CommonRef;
db: CoreDbRef;
taskpool: Taskpool;
networkId: NetworkId = MainNet;
params = networkParams(MainNet);
initializeDb = true;
Expand All @@ -243,7 +243,6 @@ proc new*(
new(result)
result.init(
db,
taskpool,
networkId,
params.config,
params.genesis,
Expand All @@ -254,7 +253,6 @@ proc new*(
proc new*(
_: type CommonRef;
db: CoreDbRef;
taskpool: Taskpool;
config: ChainConfig;
networkId: NetworkId = MainNet;
initializeDb = true;
Expand All @@ -267,7 +265,6 @@ proc new*(
new(result)
result.init(
db,
taskpool,
networkId,
config,
nil,
Expand Down
3 changes: 1 addition & 2 deletions execution_chain/core/chain/forked_chain/chain_private.nim
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,7 @@ proc processBlock*(c: ForkedChainRef,
skipValidation = false,
skipReceipts = false,
skipUncles = true,
skipStateRootCheck = finalized and not c.eagerStateRoot,
taskpool = c.com.taskpool,
skipStateRootCheck = finalized and not c.eagerStateRoot
)

if not vmState.com.statelessProviderEnabled:
Expand Down
1 change: 0 additions & 1 deletion execution_chain/core/chain/persist_blocks.nim
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,6 @@ proc persistBlock*(p: var Persister, blk: Block): Result[void, string] =
skipReceipts = skipValidation and PersistReceipts notin p.flags,
skipUncles = PersistUncles notin p.flags,
skipStateRootCheck = skipValidation,
taskpool = com.taskpool,
)

if not vmState.com.statelessProviderEnabled:
Expand Down
44 changes: 26 additions & 18 deletions execution_chain/core/executor/process_block.nim
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,12 @@ import
./process_transaction,
eth/common/[keys, transaction_utils],
chronicles,
results,
taskpools

template withSender(txs: openArray[Transaction], body: untyped) =
# Execute transactions offloading the signature checking to the task pool if
# it's available
if taskpool == nil:
for txIndex {.inject.}, tx {.inject.} in txs:
let sender {.inject.} = tx.recoverSender().valueOr(default(Address))
body
else:
results

when compileOption("threads"):
import taskpools

template withSenderParallel(txs: openArray[Transaction], body: untyped, taskpool: Taskpool) =
type Entry = (Signature, Hash32, Flowvar[Address])

proc recoverTask(e: ptr Entry): Address {.nimcall.} =
Expand Down Expand Up @@ -69,21 +64,36 @@ template withSender(txs: openArray[Transaction], body: untyped) =

body

template withSenderSerial(txs: openArray[Transaction], body: untyped) =
for txIndex {.inject.}, tx {.inject.} in txs:
let sender {.inject.} = tx.recoverSender().valueOr(default(Address))
body

template withSender(vmState: BaseVMState, txs: openArray[Transaction], body: untyped) =
when compileOption("threads"):
# Execute transactions offloading the signature checking to the task pool if
# it's available
if vmState.com.taskpool == nil:
withSenderSerial(txs, body)
else:
withSenderParallel(txs, body, vmState.com.taskpool)
else:
withSenderSerial(txs, body)

# Factored this out of procBlkPreamble so that it can be used directly for
# stateless execution of specific transactions.
proc processTransactions*(
vmState: BaseVMState,
header: Header,
transactions: seq[Transaction],
skipReceipts = false,
collectLogs = false,
taskpool: Taskpool = nil,
collectLogs = false
): Result[void, string] =
vmState.receipts.setLen(if skipReceipts: 0 else: transactions.len)
vmState.cumulativeGasUsed = 0
vmState.allLogs = @[]

withSender(transactions):
vmState.withSender(transactions):
if sender == default(Address):
return err("Could not get sender for tx with index " & $(txIndex))

Expand All @@ -105,7 +115,6 @@ proc procBlkPreamble(
vmState: BaseVMState,
blk: Block,
skipValidation, skipReceipts, skipUncles: bool,
taskpool: Taskpool,
): Result[void, string] =
template header(): Header =
blk.header
Expand Down Expand Up @@ -161,7 +170,7 @@ proc procBlkPreamble(

let collectLogs = header.requestsHash.isSome and not skipValidation
?processTransactions(
vmState, header, blk.transactions, skipReceipts, collectLogs, taskpool
vmState, header, blk.transactions, skipReceipts, collectLogs
)
elif blk.transactions.len > 0:
return err("Transactions in block with empty txRoot")
Expand Down Expand Up @@ -296,10 +305,9 @@ proc processBlock*(
skipReceipts: bool = false,
skipUncles: bool = false,
skipStateRootCheck: bool = false,
taskpool: Taskpool = nil,
): Result[void, string] =
## Generalised function to processes `blk` for any network.
?vmState.procBlkPreamble(blk, skipValidation, skipReceipts, skipUncles, taskpool)
?vmState.procBlkPreamble(blk, skipValidation, skipReceipts, skipUncles)

# EIP-3675: no reward for miner in POA/POS
if not vmState.com.proofOfStake(blk.header, vmState.ledger.txFrame):
Expand Down
1 change: 0 additions & 1 deletion execution_chain/evm/async_evm.nim
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ proc init*(
): T =
let com = CommonRef.new(
DefaultDbMemory.newCoreDbRef(),
taskpool = nil,
config = chainConfigForNetwork(networkId),
initializeDb = false,
statelessProviderEnabled = true, # Enables collection of witness keys
Expand Down
14 changes: 9 additions & 5 deletions execution_chain/nimbus.nim
Original file line number Diff line number Diff line change
Expand Up @@ -270,11 +270,15 @@ proc runExecutionClient(p: ExecutionThreadConfig) {.thread.} =

info "Launching execution client", version = FullVersionStr, config

let
# TODO https://github.com/status-im/nim-taskpools/issues/6
# share taskpool between bn and ec
taskpool = setupTaskpool(int config.numThreads)
com = setupCommonRef(config, taskpool)
when compileOption("threads"):
let
# TODO https://github.com/status-im/nim-taskpools/issues/6
# share taskpool between bn and ec
taskpool = setupTaskpool(int config.numThreads)
com = setupCommonRef(config)
com.taskpool = taskpool
else:
let com = setupCommonRef(config)

dynamicLogScope(comp = "ec"):
nimbus_execution_client.runExeClient(config, com, p.tsp.justWait())
Expand Down
15 changes: 10 additions & 5 deletions execution_chain/nimbus_execution_client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,8 @@ proc preventLoadingDataDirForTheWrongNetwork(db: CoreDbRef; config: ExecutionCli
expected=calculatedId
quit(QuitFailure)

proc setupCommonRef*(config: ExecutionClientConf, taskpool: Taskpool): CommonRef =

proc setupCommonRef*(config: ExecutionClientConf): CommonRef =
let coreDB = AristoDbRocks.newCoreDbRef(
config.dataDir,
config.dbOptions(noKeyCache = config.cmd == NimbusCmd.`import`))
Expand All @@ -240,7 +241,6 @@ proc setupCommonRef*(config: ExecutionClientConf, taskpool: Taskpool): CommonRef

let com = CommonRef.new(
db = coreDB,
taskpool = taskpool,
networkId = config.networkId,
params = config.networkParams,
statelessProviderEnabled = config.statelessProviderEnabled,
Expand Down Expand Up @@ -347,9 +347,14 @@ proc main*(config = makeConfig(), nimbus = NimbusNode(nil)) {.noinline.} =
if metricsServer.isSome():
waitFor metricsServer.stopMetricsServer()

let
taskpool = setupTaskpool(config.numThreads)
com = setupCommonRef(config, taskpool)
when compileOption("threads"):
let
taskpool = setupTaskpool(config.numThreads)
com = setupCommonRef(config)
com.taskpool = taskpool
else:
let com = setupCommonRef(config)

defer:
com.db.finish()

Expand Down
5 changes: 2 additions & 3 deletions execution_chain/stateless/stateless_execution.nim
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@ proc statelessProcessBlock*(
skipValidation = false,
skipReceipts = false,
skipUncles = true,
skipStateRootCheck = false,
taskpool = com.taskpool,
skipStateRootCheck = false
)
doAssert memoryVmState.ledger.getStateRoot() == blk.header.stateRoot

Expand All @@ -83,7 +82,7 @@ proc statelessProcessBlock*(
witness: ExecutionWitness, id: NetworkId, config: ChainConfig, blk: Block
): Result[void, string] =
let com = CommonRef.new(
db = nil, taskpool = nil, config = config, networkId = id, initializeDb = false
db = nil, config = config, networkId = id, initializeDb = false
)
statelessProcessBlock(witness, com, blk)

Expand Down
1 change: 0 additions & 1 deletion nimbus_verified_proxy/engine/rpc_frontend.nim
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,6 @@ proc registerDefaultFrontend*(engine: RpcVerificationEngine) =
.} =
let com = CommonRef.new(
DefaultDbMemory.newCoreDbRef(),
taskpool = nil,
config = chainConfigForNetwork(engine.chainId),
initializeDb = false,
statelessProviderEnabled = true, # Enables collection of witness keys
Expand Down
2 changes: 1 addition & 1 deletion tests/eest/eest_helpers.nim
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ proc prepareEnv*(
var testEnv = TestEnv()

let
com = CommonRef.new(memDB, nil, config,
com = CommonRef.new(memDB, config,
statelessProviderEnabled = statelessEnabled,
statelessWitnessValidation = statelessEnabled)
chain = ForkedChainRef.init(com, enableQueue = true, persistBatchSize = 1)
Expand Down
2 changes: 1 addition & 1 deletion tests/macro_assembler.nim
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ proc initVMEnv*(network: string): BaseVMState =
cdb = DefaultDbMemory.newCoreDbRef()
com = CommonRef.new(
cdb,
nil, conf,
conf,
conf.chainId.NetworkId)
parent = Header(stateRoot: EMPTY_ROOT_HASH)
parentHash = computeRlpHash(parent)
Expand Down
5 changes: 3 additions & 2 deletions tests/networking/p2p_test_helper.nim
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,13 @@ const
genesisFile = "tests/customgenesis/cancun123.json"

proc makeCom(config: ExecutionClientConf): CommonRef =
CommonRef.new(
let com = CommonRef.new(
newCoreDbRef DefaultDbMemory,
Taskpool.new(),
config.networkId,
config.networkParams
)
com.taskpool = Taskpool.new()
com

proc envConfig(): ExecutionClientConf =
makeConfig(@[
Expand Down
1 change: 0 additions & 1 deletion tests/test_coredb.nim
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,6 @@ proc initRunnerDB(

result = CommonRef.new(
db = coreDB,
taskpool = nil,
networkId = networkId,
params = params)

Expand Down
1 change: 0 additions & 1 deletion tests/test_engine_api.nim
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ proc setupConfig(genesisFile: string): ExecutionClientConf =
proc setupCom(config: ExecutionClientConf): CommonRef =
CommonRef.new(
newCoreDbRef DefaultDbMemory,
nil,
config.networkId,
config.networkParams
)
Expand Down
1 change: 0 additions & 1 deletion tests/test_evm_support.nim
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,6 @@ proc runTestOverflow() =

let com = CommonRef.new(
newCoreDbRef(DefaultDbMemory),
nil,
config = chainConfigForNetwork(MainNet)
)

Expand Down
4 changes: 1 addition & 3 deletions tests/test_forked_chain.nim
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,13 @@ proc setupEnv(): TestEnv =
proc newCom(env: TestEnv): CommonRef =
CommonRef.new(
newCoreDbRef DefaultDbMemory,
nil,
env.config.networkId,
env.config.networkParams
)

proc newCom(env: TestEnv, db: CoreDbRef): CommonRef =
CommonRef.new(
db,
nil,
env.config.networkId,
env.config.networkParams
)
Expand Down Expand Up @@ -791,7 +789,7 @@ procSuite "ForkedChain mainnet replay":
setup:
let
era0 = Era1DbRef.init(sourcePath / "replay", "mainnet", 15537394'u64).expect("Era files present")
com = CommonRef.new(AristoDbMemory.newCoreDbRef(), nil)
com = CommonRef.new(AristoDbMemory.newCoreDbRef())
fc = ForkedChainRef.init(com, enableQueue = true)

asyncTest "Replay mainnet era, single FCU":
Expand Down
4 changes: 2 additions & 2 deletions tests/test_forkid.nim
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ template runComputeForkIdTest(network: untyped, name: string) =
test name & " Compute ForkId test":
var
params = networkParams(network)
com = CommonRef.new(newCoreDbRef DefaultDbMemory, nil, network, params)
com = CommonRef.new(newCoreDbRef DefaultDbMemory, network, params)

for x in `network IDs`:
let computedId = com.forkId(x.number, x.time)
Expand Down Expand Up @@ -328,7 +328,7 @@ proc runCompatibleForkIdTest() =
for testcase in ValidationTests:
var
params = networkParams(testcase.config)
com = CommonRef.new(newCoreDbRef DefaultDbMemory, nil, testcase.config, params)
com = CommonRef.new(newCoreDbRef DefaultDbMemory, testcase.config, params)

let fid = ForkId(hash: testcase.id.hash.to(Bytes4), next: testcase.id.next)
let compatible = com.compatibleForkId(fid, BlockNumber(testcase.head), EthTime(testcase.time))
Expand Down
2 changes: 1 addition & 1 deletion tests/test_generalstate_json.nim
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ proc dumpDebugData(ctx: TestCtx, vmState: BaseVMState, gasUsed: GasInt, success:

proc testFixtureIndexes(ctx: var TestCtx, testStatusIMPL: var TestStatus) =
let
com = CommonRef.new(newCoreDbRef DefaultDbMemory, nil, ctx.chainConfig)
com = CommonRef.new(newCoreDbRef DefaultDbMemory, ctx.chainConfig)
parent = Header(stateRoot: emptyRoot)
tracer = if ctx.trace:
newLegacyTracer({})
Expand Down
Loading
Loading