Commit 95fc35b

Rewrite logic to process more than 127 bytes at a time (#14)
Increase the 'carry' buffer size, decrease the number of queues, and add the ability to push the same piece of memory through the layers. While the code could be made even smarter, landing this version is a good starting point. This change gives a 4x speedup.
1 parent bb18506 commit 95fc35b
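
To illustrate the accumulator this commit speeds up, here is a minimal usage sketch. It assumes this is the commp package of go-fil-commp-hashhash (the import path and the input file name below are assumptions, not part of this commit). Since Calc implements hash.Hash, io.Copy can feed Write() in large chunks, which the rewritten code now buffers and digests many quads at a time:

package main

import (
    "fmt"
    "io"
    "os"

    // assumed import path; adjust to the module you are actually building against
    commp "github.com/filecoin-project/go-fil-commp-hashhash"
)

func main() {
    cp := new(commp.Calc)

    f, err := os.Open("piece.dat") // hypothetical input payload
    if err != nil {
        panic(err)
    }
    defer f.Close()

    // Calc is a hash.Hash, so any io.Writer-based plumbing works;
    // large copies hit the multi-quad fast path added by this commit.
    if _, err := io.Copy(cp, f); err != nil {
        panic(err)
    }

    rawCommP, paddedPieceSize, err := cp.Digest()
    if err != nil {
        panic(err)
    }
    fmt.Printf("commP:             %x\n", rawCommP)
    fmt.Printf("padded piece size: %d\n", paddedPieceSize)
}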

File tree

4 files changed: +188 -131 lines changed


commp.go

Lines changed: 138 additions & 120 deletions
@@ -26,10 +26,10 @@ type Calc struct {
     mu sync.Mutex
 }
 type state struct {
-    bytesConsumed uint64
+    quadsEnqueued uint64
     layerQueues [MaxLayers + 2]chan []byte // one extra layer for the initial leaves, one more for the dummy never-to-use channel
     resultCommP chan []byte
-    carry []byte
+    buffer []byte
 }
 
 var _ hash.Hash = &Calc{} // make sure we are hash.Hash compliant
@@ -50,42 +50,49 @@ const MaxPiecePayload = MaxPieceSize / 128 * 127
 // at least this amount of bytes.
 const MinPiecePayload = uint64(65)
 
+const (
+    commpDigestSize = sha256simd.Size
+    quadPayload = 127
+    bufferSize = 256 * quadPayload // FIXME: tune better, chosen by rough experiment
+)
+
 var (
-    layerQueueDepth = 256 // SANCHECK: too much? too little? can't think this through right now...
+    layerQueueDepth = 32 // FIXME: tune better, chosen by rough experiment
     shaPool = sync.Pool{New: func() interface{} { return sha256simd.New() }}
     stackedNulPadding [MaxLayers][]byte
 )
 
 // initialize the nul padding stack (cheap to do upfront, just MaxLayers loops)
 func init() {
     h := shaPool.Get().(hash.Hash)
-    defer shaPool.Put(h)
 
-    stackedNulPadding[0] = make([]byte, 32)
+    stackedNulPadding[0] = make([]byte, commpDigestSize)
     for i := uint(1); i < MaxLayers; i++ {
         h.Reset()
-        h.Write(stackedNulPadding[i-1]) // yes, got to
-        h.Write(stackedNulPadding[i-1]) // do it twice
-        stackedNulPadding[i] = h.Sum(make([]byte, 0, 32))
+        h.Write(stackedNulPadding[i-1]) // yes, got to...
+        h.Write(stackedNulPadding[i-1]) // ...do it twice
+        stackedNulPadding[i] = h.Sum(make([]byte, 0, commpDigestSize))
         stackedNulPadding[i][31] &= 0x3F
     }
+
+    shaPool.Put(h)
 }
 
 // BlockSize is the amount of bytes consumed by the commP algorithm in one go.
 // Write()ing data in multiples of BlockSize would obviate the need to maintain
 // an internal carry buffer. The BlockSize of this module is 127 bytes.
-func (cp *Calc) BlockSize() int { return 127 }
+func (cp *Calc) BlockSize() int { return quadPayload }
 
 // Size is the amount of bytes returned on Sum()/Digest(), which is 32 bytes
 // for this module.
-func (cp *Calc) Size() int { return 32 }
+func (cp *Calc) Size() int { return commpDigestSize }
 
 // Reset re-initializes the accumulator object, clearing its state and
 // terminating all background goroutines. It is safe to Reset() an accumulator
 // in any state.
 func (cp *Calc) Reset() {
     cp.mu.Lock()
-    if cp.bytesConsumed != 0 {
+    if cp.buffer != nil {
         // we are resetting without digesting: close everything out to terminate
         // the layer workers
         close(cp.layerQueues[0])
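
For context on the init() change above, here is a standalone sketch of how the nul-padding stack is built, using the standard library's crypto/sha256 in place of sha256simd and a hypothetical maxLayers constant (the real package defines MaxLayers elsewhere): each layer is the 254-bit-truncated hash of the previous layer paired with itself.

package main

import (
    "crypto/sha256"
    "fmt"
)

func main() {
    const maxLayers = 31 // assumption for this sketch only

    stack := make([][]byte, maxLayers)
    stack[0] = make([]byte, 32) // layer 0: one all-zero 32-byte leaf

    for i := 1; i < maxLayers; i++ {
        h := sha256.New()
        h.Write(stack[i-1]) // hash the previous level...
        h.Write(stack[i-1]) // ...concatenated with itself
        d := h.Sum(nil)
        d[31] &= 0x3F // clear the top two bits: commP tree nodes are 254-bit values
        stack[i] = d
    }

    fmt.Printf("layer 1 nul-padding node: %x\n", stack[1])
}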
@@ -123,28 +130,33 @@ func (cp *Calc) Digest() (commP []byte, paddedPieceSize uint64, err error) {
         cp.mu.Unlock()
     }()
 
-    if cp.bytesConsumed < MinPiecePayload {
+    if processed := cp.quadsEnqueued*quadPayload + uint64(len(cp.buffer)); processed < MinPiecePayload {
         err = xerrors.Errorf(
             "insufficient state accumulated: commP is not defined for inputs shorter than %d bytes, but only %d processed so far",
-            MinPiecePayload, cp.bytesConsumed,
+            MinPiecePayload, processed,
         )
         return
     }
 
     // If any, flush remaining bytes padded up with zeroes
-    if len(cp.carry) > 0 {
-        if len(cp.carry) < 127 {
-            cp.carry = append(cp.carry, make([]byte, 127-len(cp.carry))...)
+    if len(cp.buffer) > 0 {
+        if mod := len(cp.buffer) % quadPayload; mod != 0 {
+            cp.buffer = append(cp.buffer, make([]byte, quadPayload-mod)...)
+        }
+        for len(cp.buffer) > 0 {
+            // FIXME: there is a smarter way to do this instead of 127-at-a-time,
+            // but that's for another PR
+            cp.digestQuads(cp.buffer[:127])
+            cp.buffer = cp.buffer[127:]
         }
-        cp.digestLeading127Bytes(cp.carry)
     }
 
     // This is how we signal to the bottom of the stack that we are done
     // which in turn collapses the rest all the way to resultCommP
     close(cp.layerQueues[0])
 
+    paddedPieceSize = cp.quadsEnqueued * 128
     // hacky round-up-to-next-pow2
-    paddedPieceSize = ((cp.bytesConsumed + 126) / 127 * 128) // why is 6 afraid of 7...?
     if bits.OnesCount64(paddedPieceSize) != 1 {
         paddedPieceSize = 1 << uint(64-bits.LeadingZeros64(paddedPieceSize))
     }
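
A small worked example of the padded-piece-size arithmetic Digest() now performs: every 127-byte quad of payload expands to 128 padded bytes, and the total is rounded up to the next power of two when it is not one already. The helper below is only a restatement of the two lines in the hunk above, using math/bits the same way; the function name is made up for the sketch.

package main

import (
    "fmt"
    "math/bits"
)

func paddedPieceSize(quadsEnqueued uint64) uint64 {
    size := quadsEnqueued * 128
    if bits.OnesCount64(size) != 1 {
        // not a power of two: round up to the next one
        size = 1 << uint(64-bits.LeadingZeros64(size))
    }
    return size
}

func main() {
    fmt.Println(paddedPieceSize(1)) // 128
    fmt.Println(paddedPieceSize(2)) // 256
    fmt.Println(paddedPieceSize(3)) // 384 is not a power of two, so 512
}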
@@ -161,115 +173,110 @@ func (cp *Calc) Digest() (commP []byte, paddedPieceSize uint64, err error) {
 // amount of bytes is about to go over the maximum currently supported by
 // Filecoin.
 func (cp *Calc) Write(input []byte) (int, error) {
-    inputSize := len(input)
-    if inputSize == 0 {
+    if len(input) == 0 {
         return 0, nil
     }
 
     cp.mu.Lock()
     defer cp.mu.Unlock()
 
-    if cp.bytesConsumed+uint64(inputSize) > MaxPiecePayload {
+    if MaxPiecePayload <
+        (cp.quadsEnqueued*quadPayload)+
+            uint64(len(input)) {
         return 0, xerrors.Errorf(
-            "writing %d bytes to the accumulator would overflow the maximum supported unpadded piece size %d",
+            "writing additional %d bytes to the accumulator would overflow the maximum supported unpadded piece size %d",
             len(input), MaxPiecePayload,
         )
     }
 
     // just starting: initialize internal state, start first background layer-goroutine
-    if cp.bytesConsumed == 0 {
-        cp.carry = make([]byte, 0, 127)
+    if cp.buffer == nil {
+        cp.buffer = make([]byte, 0, bufferSize)
         cp.resultCommP = make(chan []byte, 1)
         cp.layerQueues[0] = make(chan []byte, layerQueueDepth)
         cp.addLayer(0)
     }
 
-    cp.bytesConsumed += uint64(inputSize)
-
-    carrySize := len(cp.carry)
-    if carrySize > 0 {
+    // short Write() - just buffer it
+    if len(cp.buffer)+len(input) < bufferSize {
+        cp.buffer = append(cp.buffer, input...)
+        return len(input), nil
+    }
 
-        // super short Write - just carry it
-        if carrySize+inputSize < 127 {
-            cp.carry = append(cp.carry, input...)
-            return inputSize, nil
-        }
+    totalInputBytes := len(input)
 
-        cp.carry = append(cp.carry, input[:127-carrySize]...)
-        input = input[127-carrySize:]
+    if toSplice := bufferSize - len(cp.buffer); toSplice < bufferSize {
+        cp.buffer = append(cp.buffer, input[:toSplice]...)
+        input = input[toSplice:]
 
-        cp.digestLeading127Bytes(cp.carry)
-        cp.carry = cp.carry[:0]
+        cp.digestQuads(cp.buffer)
+        cp.buffer = cp.buffer[:0]
     }
 
-    for len(input) >= 127 {
-        cp.digestLeading127Bytes(input)
-        input = input[127:]
+    for len(input) >= bufferSize {
+        cp.digestQuads(input[:bufferSize])
+        input = input[bufferSize:]
     }
 
     if len(input) > 0 {
-        cp.carry = cp.carry[:len(input)]
-        copy(cp.carry, input)
+        cp.buffer = append(cp.buffer, input...)
     }
 
-    return inputSize, nil
+    return totalInputBytes, nil
 }
 
-func (cp *Calc) digestLeading127Bytes(input []byte) {
-
-    // Holds this round's shifts of the original 127 bytes plus the 6 bit overflow
-    // at the end of the expansion cycle. We *do not* reuse this array: it is
-    // being fed piece-wise to hash254Into which in turn reuses it for the result
-    var expander [128]byte
-
-    // Cycle over four(4) 31-byte groups, leaving 1 byte in between:
-    // 31 + 1 + 31 + 1 + 31 + 1 + 31 = 127
-
-    // First 31 bytes + 6 bits are taken as-is (trimmed later)
-    // Note that copying them into the expansion buffer is mandatory:
-    // we will be feeding it to the workers which reuse the bottom half
-    // of the chunk for the result
-    copy(expander[:], input[:32])
-
-    // first 2-bit "shim" forced into the otherwise identical bitstream
-    expander[31] &= 0x3F
-
-    // simplify pointer math
-    inputPlus1, expanderPlus1 := input[1:], expander[1:]
-
-    // In: {{ C[7] C[6] }} X[7] X[6] X[5] X[4] X[3] X[2] X[1] X[0] Y[7] Y[6] Y[5] Y[4] Y[3] Y[2] Y[1] Y[0] Z[7] Z[6] Z[5]...
-    // Out: X[5] X[4] X[3] X[2] X[1] X[0] C[7] C[6] Y[5] Y[4] Y[3] Y[2] Y[1] Y[0] X[7] X[6] Z[5] Z[4] Z[3]...
-    for i := 31; i < 63; i++ {
-        expanderPlus1[i] = inputPlus1[i]<<2 | input[i]>>6
-    }
+// always called with power-of-2 amount of quads
+func (cp *Calc) digestQuads(inSlab []byte) {
+
+    quadsCount := len(inSlab) / 127
+    cp.quadsEnqueued += uint64(quadsCount)
+    outSlab := make([]byte, quadsCount*128)
+
+    for j := 0; j < quadsCount; j++ {
+        // Cycle over four(4) 31-byte groups, leaving 1 byte in between:
+        // 31 + 1 + 31 + 1 + 31 + 1 + 31 = 127
+        input := inSlab[j*127 : (j+1)*127]
+        expander := outSlab[j*128 : (j+1)*128]
+        inputPlus1, expanderPlus1 := input[1:], expander[1:]
+
+        // First 31 bytes + 6 bits are taken as-is (trimmed later)
+        // Note that copying them into the expansion buffer is mandatory:
+        // we will be feeding it to the workers which reuse the bottom half
+        // of the chunk for the result
+        copy(expander[:], input[:32])
+
+        // first 2-bit "shim" forced into the otherwise identical bitstream
+        expander[31] &= 0x3F
+
+        // In: {{ C[7] C[6] }} X[7] X[6] X[5] X[4] X[3] X[2] X[1] X[0] Y[7] Y[6] Y[5] Y[4] Y[3] Y[2] Y[1] Y[0] Z[7] Z[6] Z[5]...
+        // Out: X[5] X[4] X[3] X[2] X[1] X[0] C[7] C[6] Y[5] Y[4] Y[3] Y[2] Y[1] Y[0] X[7] X[6] Z[5] Z[4] Z[3]...
+        for i := 31; i < 63; i++ {
+            expanderPlus1[i] = inputPlus1[i]<<2 | input[i]>>6
+        }
 
-    // next 2-bit shim
-    expander[63] &= 0x3F
+        // next 2-bit shim
+        expander[63] &= 0x3F
 
-    // ready to dispatch first half
-    cp.layerQueues[0] <- expander[0:32]
-    cp.layerQueues[0] <- expander[32:64]
+        // In: {{ C[7] C[6] C[5] C[4] }} X[7] X[6] X[5] X[4] X[3] X[2] X[1] X[0] Y[7] Y[6] Y[5] Y[4] Y[3] Y[2] Y[1] Y[0] Z[7] Z[6] Z[5]...
+        // Out: X[3] X[2] X[1] X[0] C[7] C[6] C[5] C[4] Y[3] Y[2] Y[1] Y[0] X[7] X[6] X[5] X[4] Z[3] Z[2] Z[1]...
+        for i := 63; i < 95; i++ {
+            expanderPlus1[i] = inputPlus1[i]<<4 | input[i]>>4
+        }
 
-    // In: {{ C[7] C[6] C[5] C[4] }} X[7] X[6] X[5] X[4] X[3] X[2] X[1] X[0] Y[7] Y[6] Y[5] Y[4] Y[3] Y[2] Y[1] Y[0] Z[7] Z[6] Z[5]...
-    // Out: X[3] X[2] X[1] X[0] C[7] C[6] C[5] C[4] Y[3] Y[2] Y[1] Y[0] X[7] X[6] X[5] X[4] Z[3] Z[2] Z[1]...
-    for i := 63; i < 95; i++ {
-        expanderPlus1[i] = inputPlus1[i]<<4 | input[i]>>4
-    }
+        // next 2-bit shim
+        expander[95] &= 0x3F
 
-    // next 2-bit shim
-    expander[95] &= 0x3F
+        // In: {{ C[7] C[6] C[5] C[4] C[3] C[2] }} X[7] X[6] X[5] X[4] X[3] X[2] X[1] X[0] Y[7] Y[6] Y[5] Y[4] Y[3] Y[2] Y[1] Y[0] Z[7] Z[6] Z[5]...
+        // Out: X[1] X[0] C[7] C[6] C[5] C[4] C[3] C[2] Y[1] Y[0] X[7] X[6] X[5] X[4] X[3] X[2] Z[1] Z[0] Y[7]...
+        for i := 95; i < 126; i++ {
+            expanderPlus1[i] = inputPlus1[i]<<6 | input[i]>>2
+        }
 
-    // In: {{ C[7] C[6] C[5] C[4] C[3] C[2] }} X[7] X[6] X[5] X[4] X[3] X[2] X[1] X[0] Y[7] Y[6] Y[5] Y[4] Y[3] Y[2] Y[1] Y[0] Z[7] Z[6] Z[5]...
-    // Out: X[1] X[0] C[7] C[6] C[5] C[4] C[3] C[2] Y[1] Y[0] X[7] X[6] X[5] X[4] X[3] X[2] Z[1] Z[0] Y[7]...
-    for i := 95; i < 126; i++ {
-        expanderPlus1[i] = inputPlus1[i]<<6 | input[i]>>2
+        // the final 6 bit remainder is exactly the value of the last expanded byte
+        expander[127] = input[126] >> 2
     }
-    // the final 6 bit remainder is exactly the value of the last expanded byte
-    expander[127] = input[126] >> 2
 
-    // and dispatch remainder
-    cp.layerQueues[0] <- expander[64:96]
-    cp.layerQueues[0] <- expander[96:128]
+    cp.layerQueues[0] <- outSlab
 }
 
 func (cp *Calc) addLayer(myIdx uint) {
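
The core of digestQuads() above is the Fr32 bit-expansion: each 127-byte (1016-bit) quad is split into four 254-bit fields, each padded with two zero bits, yielding 128 bytes, i.e. four 32-byte tree leaves. Below is a standalone sketch of that per-quad expansion, lifted from the loop body and wrapped in a hypothetical expandQuad helper for illustration:

package main

import "fmt"

// expandQuad turns one 127-byte quad into 128 padded bytes (four Fr32 leaves).
func expandQuad(input []byte) []byte { // len(input) must be exactly 127
    expander := make([]byte, 128)
    inputPlus1, expanderPlus1 := input[1:], expander[1:]

    // bytes 0..31 are taken as-is, then the first 2-bit shim is forced in
    copy(expander, input[:32])
    expander[31] &= 0x3F

    for i := 31; i < 63; i++ { // second field: shift the bitstream by 2 bits
        expanderPlus1[i] = inputPlus1[i]<<2 | input[i]>>6
    }
    expander[63] &= 0x3F

    for i := 63; i < 95; i++ { // third field: shift by 4 bits
        expanderPlus1[i] = inputPlus1[i]<<4 | input[i]>>4
    }
    expander[95] &= 0x3F

    for i := 95; i < 126; i++ { // fourth field: shift by 6 bits
        expanderPlus1[i] = inputPlus1[i]<<6 | input[i]>>2
    }
    expander[127] = input[126] >> 2 // the final 6-bit remainder

    return expander
}

func main() {
    in := make([]byte, 127)
    for i := range in {
        in[i] = 0xFF
    }
    out := expandQuad(in)
    // every 32nd byte has its top two bits cleared, so all four print as 3f
    fmt.Printf("%x %x %x %x\n", out[31], out[63], out[95], out[127])
}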
@@ -280,60 +287,71 @@ func (cp *Calc) addLayer(myIdx uint) {
     cp.layerQueues[myIdx+1] = make(chan []byte, layerQueueDepth)
 
     go func() {
-        var chunkHold []byte
+        var twinHold []byte
 
         for {
-
-            chunk, queueIsOpen := <-cp.layerQueues[myIdx]
+            slab, queueIsOpen := <-cp.layerQueues[myIdx]
 
             // the dream is collapsing
             if !queueIsOpen {
+                defer func() { twinHold = nil }()
 
                 // I am last
                 if myIdx == MaxLayers || cp.layerQueues[myIdx+2] == nil {
-                    cp.resultCommP <- chunkHold
+                    cp.resultCommP <- append(make([]byte, 0, 32), twinHold[0:32]...)
                     return
                 }
 
-                if chunkHold != nil {
-                    cp.hash254Into(
-                        cp.layerQueues[myIdx+1],
-                        chunkHold,
-                        stackedNulPadding[myIdx],
-                    )
+                if twinHold != nil {
+                    copy(twinHold[32:64], stackedNulPadding[myIdx])
+                    cp.hashSlab254(0, twinHold[0:64])
+                    cp.layerQueues[myIdx+1] <- twinHold[0:64:64]
                 }
 
                 // signal the next in line that they are done too
                 close(cp.layerQueues[myIdx+1])
                 return
             }
 
-            if chunkHold == nil {
-                chunkHold = chunk
-            } else {
-
-                // We are last right now
-                // n.b. we will not blow out of the preallocated layerQueues array,
-                // as we disallow Write()s above a certain threshold
-                if cp.layerQueues[myIdx+2] == nil {
-                    cp.addLayer(myIdx + 1)
-                }
+            var pushedWork bool
+
+            switch {
+            case len(slab) > 1<<(5+myIdx):
+                cp.hashSlab254(myIdx, slab)
+                cp.layerQueues[myIdx+1] <- slab
+                pushedWork = true
+            case twinHold != nil:
+                copy(twinHold[32:64], slab[0:32])
+                cp.hashSlab254(0, twinHold[0:64])
+                cp.layerQueues[myIdx+1] <- twinHold[0:32:64]
+                pushedWork = true
+                twinHold = nil
+            default:
+                twinHold = slab[0:32:64]
+            }
 
-                cp.hash254Into(cp.layerQueues[myIdx+1], chunkHold, chunk)
-                chunkHold = nil
+            // Check whether we need another worker
+            //
+            // n.b. we will not blow out of the preallocated layerQueues array,
+            // as we disallow Write()s above a certain threshold
+            if pushedWork && cp.layerQueues[myIdx+2] == nil {
+                cp.addLayer(myIdx + 1)
             }
         }
     }()
 }
 
-func (cp *Calc) hash254Into(out chan<- []byte, half1ToOverwrite, half2 []byte) {
+func (cp *Calc) hashSlab254(layerIdx uint, slab []byte) {
     h := shaPool.Get().(hash.Hash)
-    h.Reset()
-    h.Write(half1ToOverwrite)
-    h.Write(half2)
-    d := h.Sum(half1ToOverwrite[:0]) // callers expect we will reuse-reduce-recycle
-    d[31] &= 0x3F
-    out <- d
+
+    stride := 1 << (5 + layerIdx)
+    for i := 0; len(slab) > i+stride; i += 2 * stride {
+        h.Reset()
+        h.Write(slab[i : i+32])
+        h.Write(slab[i+stride : 32+i+stride])
+        h.Sum(slab[i:i])[31] &= 0x3F // callers expect we will reuse-reduce-recycle
+    }
+
    shaPool.Put(h)
 }
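To show what the new hashSlab254() does with the slab it receives, here is a sketch using crypto/sha256 in place of sha256simd (the driver in main is made up for the sketch): at layer N the 32-byte nodes of interest sit 1<<(5+N) bytes apart, and each pair is hashed into a 254-bit digest written back over the first node of the pair, which is why the same slab memory can then be handed to the next layer's queue.

package main

import (
    "crypto/sha256"
    "fmt"
)

func hashSlab254(layerIdx uint, slab []byte) {
    h := sha256.New()
    stride := 1 << (5 + layerIdx)
    for i := 0; len(slab) > i+stride; i += 2 * stride {
        h.Reset()
        h.Write(slab[i : i+32])
        h.Write(slab[i+stride : 32+i+stride])
        // Sum appends into the zero-length slice slab[i:i], so the digest
        // lands in place over slab[i:i+32]; then clear the top two bits.
        h.Sum(slab[i:i])[31] &= 0x3F
    }
}

func main() {
    // 128 bytes = four 32-byte leaves; layer 0 reduces them pairwise.
    slab := make([]byte, 128)
    hashSlab254(0, slab)          // nodes now live at offsets 0 and 64
    hashSlab254(1, slab)          // stride 64: reduces those two into one node at offset 0
    fmt.Printf("%x\n", slab[:32]) // the 254-bit root of this 4-leaf subtree
}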