diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim
index 40b16075b1..68984f4c2e 100644
--- a/beacon_chain/nimbus_beacon_node.nim
+++ b/beacon_chain/nimbus_beacon_node.nim
@@ -1771,9 +1771,9 @@ proc reconstructDataColumns(node: BeaconNode, slot: Slot) =
       node.dag.cfg.NUMBER_OF_CUSTODY_GROUPS div 2:
     return
 
-  # Currently, this logic is broken
-  if true:
-    return
+  # # Currently, this logic is broken
+  # if true:
+  #   return
 
   logScope:
     slot = slot
diff --git a/beacon_chain/spec/peerdas_helpers.nim b/beacon_chain/spec/peerdas_helpers.nim
index eaf3fe1ac2..1ac36dcf76 100644
--- a/beacon_chain/spec/peerdas_helpers.nim
+++ b/beacon_chain/spec/peerdas_helpers.nim
@@ -23,6 +23,7 @@ import
 from std/algorithm import sort
 from std/sequtils import toSeq
 from stew/staticfor import staticFor
+from system/ansi_c import c_malloc, c_free
 
 type
   CellBytes = array[fulu.CELLS_PER_EXT_BLOB, Cell]
@@ -153,58 +154,172 @@ proc recover_cells_and_proofs_parallel*(
     tp: Taskpool,
     dataColumns: seq[ref fulu.DataColumnSidecar]):
     Result[seq[CellsAndProofs], cstring] =
-  ## This helper recovers blobs from the data column sidecars parallelly
+  ## Recover blobs from data column sidecars in parallel.
+  ## - Uses unmanaged C buffers for worker inputs so no Nim GC objects
+  ## - Bounds in-flight tasks to limit peak memory/alloc pressure.
+  ## - Ensures all spawned tasks are awaited (drained) on any early return.
+
   if dataColumns.len == 0:
     return err("DataColumnSidecar: Length should not be 0")
+  if dataColumns.len > NUMBER_OF_COLUMNS:
+    return err("DataColumnSidecar: Length exceeds NUMBER_OF_COLUMNS")
 
   let
     columnCount = dataColumns.len
     blobCount = dataColumns[0].column.len
 
   for column in dataColumns:
-    if not (blobCount == column.column.len):
+    if blobCount != column.column.len:
       return err("DataColumns do not have the same length")
 
+  # Worker that runs on a taskpool thread. It only sees raw pointers and
+  # constructs its own worker-local seqs (on the worker's heap) before calling
+  # the KZG recovery routine. Keeps GC objects thread-local.
+  proc workerRecover(idxPtr: ptr CellIndex, cellsPtr: ptr Cell,
+                     columnCount: int): Result[CellsAndProofs, void] =
+    ## Worker runs on a taskpool thread. It receives raw C buffers (ptr) and
+    ## converts them into worker-local seqs before calling the KZG recovery
+    ## routine, so no Nim GC objects cross thread-local heaps.
+    var
+      localIndices = newSeq[CellIndex](columnCount)
+      localCells = newSeq[Cell](columnCount)
+    let
+      idxArr = cast[ptr UncheckedArray[CellIndex]](idxPtr)
+      cellsArr = cast[ptr UncheckedArray[Cell]](cellsPtr)
+    for j in 0 ..< columnCount:
+      localIndices[j] = idxArr[j]
+      localCells[j] = cellsArr[j]
+    # use the task wrapper which maps string errors to void
+    recoverCellsAndKzgProofsTask(localIndices, localCells)
+
   var
-    pendingFuts: seq[Flowvar[Result[CellsAndProofs, void]]]
+    pendingFuts: seq[Flowvar[Result[CellsAndProofs, void]]] = @[]
+    pendingIdxPtrs: seq[ptr CellIndex] = @[]
+    pendingCellsPtrs: seq[ptr Cell] = @[]
     res = newSeq[CellsAndProofs](blobCount)
 
+  # pre-size sequences so we can index-assign without reallocs
+  pendingFuts.setLen(blobCount)
+  pendingIdxPtrs.setLen(blobCount)
+  pendingCellsPtrs.setLen(blobCount)
+
+  # track how many we've actually spawned
+  var spawned = 0
+
+  # Choose a sane limit for concurrent tasks to reduce peak memory/alloc pressure.
+  let maxInFlight = min(blobCount, 9)
+
   let startTime = Moment.now()
   const reconstructionTimeout = 2.seconds
 
-  # ---- Spawn phase with time limit ----
+  proc freePendingPtrPair(idxPtr: ptr CellIndex, cellsPtr: ptr Cell) =
+    if not idxPtr.isNil:
+      c_free(idxPtr)
+    if not cellsPtr.isNil:
+      c_free(cellsPtr)
+
+  proc freeAllAllocated() =
+    ## Free all C memory allocated so far, without syncing futures.
+    ## Only call this on error paths where we're aborting.
+    for j in 0 ..< spawned:
+      freePendingPtrPair(pendingIdxPtrs[j], pendingCellsPtrs[j])
+      pendingIdxPtrs[j] = nil
+      pendingCellsPtrs[j] = nil
+
+  var completed = 0
+
+  # ---- Spawn + bounded-await loop ----
   for blobIdx in 0 ..< blobCount:
     let now = Moment.now()
     if (now - startTime) > reconstructionTimeout:
-      debug "PeerDAS reconstruction timed out while preparing columns",
-        spawned = pendingFuts.len, total = blobCount
-      break # Stop spawning new tasks
+      trace "PeerDAS reconstruction timed out while preparing columns",
+        spawned = spawned, total = blobCount
+      freeAllAllocated()
+      return err("Data column reconstruction timed out")
 
-    var
-      cellIndices = newSeq[CellIndex](columnCount)
-      cells = newSeq[Cell](columnCount)
+    # Allocate unmanaged C buffers and copy data into them
+    let
+      idxBytes = csize_t(columnCount) * csize_t(sizeof(CellIndex))
+      cellsBytes = csize_t(columnCount) * csize_t(sizeof(Cell))
+      idxPtr = cast[ptr CellIndex](c_malloc(idxBytes))
+    if idxPtr == nil:
+      freeAllAllocated()
+      return err("Failed to allocate memory for cell indices during reconstruction")
+    let cellsPtr = cast[ptr Cell](c_malloc(cellsBytes))
+    if cellsPtr == nil:
+      c_free(idxPtr)
+      freeAllAllocated()
+      return err("Failed to allocate memory for cell data during reconstruction")
+
+    # populate C buffers via UncheckedArray casts
+    let
+      idxArr = cast[ptr UncheckedArray[CellIndex]](idxPtr)
+      cellsArr = cast[ptr UncheckedArray[Cell]](cellsPtr)
     for i in 0 ..< dataColumns.len:
-      cellIndices[i] = dataColumns[i][].index
-      cells[i] = dataColumns[i][].column[blobIdx]
-    pendingFuts.add(tp.spawn recoverCellsAndKzgProofsTask(cellIndices, cells))
-
-  # ---- Sync phase ----
-  for i in 0 ..< pendingFuts.len:
+      idxArr[i] = dataColumns[i][].index
+      cellsArr[i] = dataColumns[i][].column[blobIdx]
+
+    # store into pre-sized arrays by index and spawn worker
+    pendingIdxPtrs[spawned] = idxPtr
+    pendingCellsPtrs[spawned] = cellsPtr
+    pendingFuts[spawned] = tp.spawn workerRecover(idxPtr, cellsPtr, columnCount)
+    inc spawned
+
+    # If too many in-flight tasks, await the oldest one
+    while spawned - completed >= maxInFlight:
+      let now2 = Moment.now()
+      if (now2 - startTime) > reconstructionTimeout:
+        trace "PeerDAS reconstruction timed out while awaiting tasks",
+          completed = completed, totalSpawned = spawned
+        # Free memory for tasks we haven't synced yet
+        for j in completed ..< spawned:
+          freePendingPtrPair(pendingIdxPtrs[j], pendingCellsPtrs[j])
+          pendingIdxPtrs[j] = nil
+          pendingCellsPtrs[j] = nil
+        return err("Data column reconstruction timed out")
+
+      let futRes = sync pendingFuts[completed]
+      freePendingPtrPair(pendingIdxPtrs[completed], pendingCellsPtrs[completed])
+      pendingIdxPtrs[completed] = nil
+      pendingCellsPtrs[completed] = nil
+
+      if futRes.isErr:
+        # Free memory for remaining unsynced tasks
+        for j in completed + 1 ..< spawned:
+          freePendingPtrPair(pendingIdxPtrs[j], pendingCellsPtrs[j])
+          pendingIdxPtrs[j] = nil
+          pendingCellsPtrs[j] = nil
+        return err("KZG cells and proofs recovery failed")
+      res[completed] = futRes.get
+      inc completed
+
+  # ---- Wait for remaining spawned tasks ----
+  for i in completed ..< spawned:
     let now = Moment.now()
     if (now - startTime) > reconstructionTimeout:
-      debug "PeerDAS reconstruction timed out",
-        completed = i, totalSpawned = pendingFuts.len
+      trace "PeerDAS reconstruction timed out during final sync",
+        completed = i, totalSpawned = spawned
+      # Free memory for tasks we haven't synced yet
+      for j in i ..< spawned:
+        freePendingPtrPair(pendingIdxPtrs[j], pendingCellsPtrs[j])
+        pendingIdxPtrs[j] = nil
+        pendingCellsPtrs[j] = nil
       return err("Data column reconstruction timed out")
 
     let futRes = sync pendingFuts[i]
+    freePendingPtrPair(pendingIdxPtrs[i], pendingCellsPtrs[i])
+    pendingIdxPtrs[i] = nil
+    pendingCellsPtrs[i] = nil
+
     if futRes.isErr:
+      # Free memory for remaining unsynced tasks
+      for j in i + 1 ..< spawned:
+        freePendingPtrPair(pendingIdxPtrs[j], pendingCellsPtrs[j])
+        pendingIdxPtrs[j] = nil
+        pendingCellsPtrs[j] = nil
       return err("KZG cells and proofs recovery failed")
-    res[i] = futRes.get
 
-  if pendingFuts.len < blobCount:
-    return err("Data column reconstruction timed out")
-
   ok(res)
 
 proc assemble_data_column_sidecars*(