Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions beacon_chain/nimbus_beacon_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -1771,9 +1771,9 @@ proc reconstructDataColumns(node: BeaconNode, slot: Slot) =
node.dag.cfg.NUMBER_OF_CUSTODY_GROUPS div 2:
return

# Currently, this logic is broken
if true:
return
# # Currently, this logic is broken
# if true:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can just remove this if entirely, rather than commenting it out.

# return

logScope:
slot = slot
Expand Down
159 changes: 137 additions & 22 deletions beacon_chain/spec/peerdas_helpers.nim
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import
from std/algorithm import sort
from std/sequtils import toSeq
from stew/staticfor import staticFor
from system/ansi_c import c_malloc, c_free

type
CellBytes = array[fulu.CELLS_PER_EXT_BLOB, Cell]
Expand Down Expand Up @@ -153,58 +154,172 @@ proc recover_cells_and_proofs_parallel*(
tp: Taskpool,
dataColumns: seq[ref fulu.DataColumnSidecar]):
Result[seq[CellsAndProofs], cstring] =
## This helper recovers blobs from the data column sidecars parallelly
## Recover blobs from data column sidecars in parallel.
## - Uses unmanaged C buffers for worker inputs so no Nim GC objects
## - Bounds in-flight tasks to limit peak memory/alloc pressure.
## - Ensures all spawned tasks are awaited (drained) on any early return.

if dataColumns.len == 0:
return err("DataColumnSidecar: Length should not be 0")
if dataColumns.len > NUMBER_OF_COLUMNS:
return err("DataColumnSidecar: Length exceeds NUMBER_OF_COLUMNS")

let
columnCount = dataColumns.len
blobCount = dataColumns[0].column.len

for column in dataColumns:
if not (blobCount == column.column.len):
if blobCount != column.column.len:
return err("DataColumns do not have the same length")

# Worker that runs on a taskpool thread. It only sees raw pointers and
# constructs its own worker-local seqs (on the worker's heap) before calling
# the KZG recovery routine. Keeps GC objects thread-local.
proc workerRecover(idxPtr: ptr CellIndex, cellsPtr: ptr Cell,
columnCount: int): Result[CellsAndProofs, void] =
## Worker runs on a taskpool thread. It receives raw C buffers (ptr) and
## converts them into worker-local seqs before calling the KZG recovery
## routine, so no Nim GC objects cross thread-local heaps.
var
localIndices = newSeq[CellIndex](columnCount)
localCells = newSeq[Cell](columnCount)
let
idxArr = cast[ptr UncheckedArray[CellIndex]](idxPtr)
cellsArr = cast[ptr UncheckedArray[Cell]](cellsPtr)
for j in 0 ..< columnCount:
localIndices[j] = idxArr[j]
localCells[j] = cellsArr[j]
# use the task wrapper which maps string errors to void
recoverCellsAndKzgProofsTask(localIndices, localCells)

var
pendingFuts: seq[Flowvar[Result[CellsAndProofs, void]]]
pendingFuts: seq[Flowvar[Result[CellsAndProofs, void]]] = @[]
pendingIdxPtrs: seq[ptr CellIndex] = @[]
pendingCellsPtrs: seq[ptr Cell] = @[]
res = newSeq[CellsAndProofs](blobCount)

# pre-size sequences so we can index-assign without reallocs
pendingFuts.setLen(blobCount)
pendingIdxPtrs.setLen(blobCount)
pendingCellsPtrs.setLen(blobCount)

# track how many we've actually spawned
var spawned = 0

# Choose a sane limit for concurrent tasks to reduce peak memory/alloc pressure.
let maxInFlight = min(blobCount, 9)

let startTime = Moment.now()
const reconstructionTimeout = 2.seconds

# ---- Spawn phase with time limit ----
proc freePendingPtrPair(idxPtr: ptr CellIndex, cellsPtr: ptr Cell) =
if not idxPtr.isNil:
c_free(idxPtr)
if not cellsPtr.isNil:
c_free(cellsPtr)

proc freeAllAllocated() =
## Free all C memory allocated so far, without syncing futures.
## Only call this on error paths where we're aborting.
for j in 0 ..< spawned:
freePendingPtrPair(pendingIdxPtrs[j], pendingCellsPtrs[j])
pendingIdxPtrs[j] = nil
pendingCellsPtrs[j] = nil

var completed = 0

# ---- Spawn + bounded-await loop ----
for blobIdx in 0 ..< blobCount:
let now = Moment.now()
if (now - startTime) > reconstructionTimeout:
debug "PeerDAS reconstruction timed out while preparing columns",
spawned = pendingFuts.len, total = blobCount
break # Stop spawning new tasks
trace "PeerDAS reconstruction timed out while preparing columns",
spawned = spawned, total = blobCount
freeAllAllocated()
return err("Data column reconstruction timed out")

var
cellIndices = newSeq[CellIndex](columnCount)
cells = newSeq[Cell](columnCount)
# Allocate unmanaged C buffers and copy data into them
let
idxBytes = csize_t(columnCount) * csize_t(sizeof(CellIndex))
cellsBytes = csize_t(columnCount) * csize_t(sizeof(Cell))
idxPtr = cast[ptr CellIndex](c_malloc(idxBytes))
if idxPtr == nil:
freeAllAllocated()
return err("Failed to allocate memory for cell indices during reconstruction")
let cellsPtr = cast[ptr Cell](c_malloc(cellsBytes))
if cellsPtr == nil:
c_free(idxPtr)
freeAllAllocated()
return err("Failed to allocate memory for cell data during reconstruction")

# populate C buffers via UncheckedArray casts
let
idxArr = cast[ptr UncheckedArray[CellIndex]](idxPtr)
cellsArr = cast[ptr UncheckedArray[Cell]](cellsPtr)
for i in 0 ..< dataColumns.len:
cellIndices[i] = dataColumns[i][].index
cells[i] = dataColumns[i][].column[blobIdx]
pendingFuts.add(tp.spawn recoverCellsAndKzgProofsTask(cellIndices, cells))

# ---- Sync phase ----
for i in 0 ..< pendingFuts.len:
idxArr[i] = dataColumns[i][].index
cellsArr[i] = dataColumns[i][].column[blobIdx]

# store into pre-sized arrays by index and spawn worker
pendingIdxPtrs[spawned] = idxPtr
pendingCellsPtrs[spawned] = cellsPtr
pendingFuts[spawned] = tp.spawn workerRecover(idxPtr, cellsPtr, columnCount)
inc spawned

# If too many in-flight tasks, await the oldest one
while spawned - completed >= maxInFlight:
let now2 = Moment.now()
if (now2 - startTime) > reconstructionTimeout:
trace "PeerDAS reconstruction timed out while awaiting tasks",
completed = completed, totalSpawned = spawned
# Free memory for tasks we haven't synced yet
for j in completed ..< spawned:
freePendingPtrPair(pendingIdxPtrs[j], pendingCellsPtrs[j])
pendingIdxPtrs[j] = nil
pendingCellsPtrs[j] = nil
return err("Data column reconstruction timed out")

let futRes = sync pendingFuts[completed]
freePendingPtrPair(pendingIdxPtrs[completed], pendingCellsPtrs[completed])
pendingIdxPtrs[completed] = nil
pendingCellsPtrs[completed] = nil

if futRes.isErr:
# Free memory for remaining unsynced tasks
for j in completed + 1 ..< spawned:
freePendingPtrPair(pendingIdxPtrs[j], pendingCellsPtrs[j])
pendingIdxPtrs[j] = nil
pendingCellsPtrs[j] = nil
return err("KZG cells and proofs recovery failed")
res[completed] = futRes.get
inc completed

# ---- Wait for remaining spawned tasks ----
for i in completed ..< spawned:
let now = Moment.now()
if (now - startTime) > reconstructionTimeout:
debug "PeerDAS reconstruction timed out",
completed = i, totalSpawned = pendingFuts.len
trace "PeerDAS reconstruction timed out during final sync",
completed = i, totalSpawned = spawned
# Free memory for tasks we haven't synced yet
for j in i ..< spawned:
freePendingPtrPair(pendingIdxPtrs[j], pendingCellsPtrs[j])
pendingIdxPtrs[j] = nil
pendingCellsPtrs[j] = nil
return err("Data column reconstruction timed out")

let futRes = sync pendingFuts[i]
freePendingPtrPair(pendingIdxPtrs[i], pendingCellsPtrs[i])
pendingIdxPtrs[i] = nil
pendingCellsPtrs[i] = nil

if futRes.isErr:
# Free memory for remaining unsynced tasks
for j in i + 1 ..< spawned:
freePendingPtrPair(pendingIdxPtrs[j], pendingCellsPtrs[j])
pendingIdxPtrs[j] = nil
pendingCellsPtrs[j] = nil
return err("KZG cells and proofs recovery failed")

res[i] = futRes.get

if pendingFuts.len < blobCount:
return err("Data column reconstruction timed out")

ok(res)

proc assemble_data_column_sidecars*(
Expand Down
Loading