37 changes: 33 additions & 4 deletions partialmessages/partialmsgs.go
@@ -140,8 +140,14 @@ type PublishOptions struct {
// PublishToPeers limits the publishing to only the specified peers.
// If nil, will use the topic's mesh peers.
PublishToPeers []peer.ID
// EagerPush is data that will be eagerly pushed to peers in a PartialMessage
EagerPush []byte
// EagerPushWithPartsMetadata is used to eagerly push data to peers if we have
// not yet received their parts metadata. The encoded partial message sent
// is generated by calling `PartialMessageBytes` with the provided
// PartsMetadata.
//
// If nil, no partial message is sent to a peer unless we have received
// their partsMetadata first.
EagerPushWithPartsMetadata PartsMetadata
}
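
A minimal caller-side sketch of the new option, assuming the all-parts encoding []byte{0x00} used by the tests below; the wrapper function and its name are illustrative only, not part of this change:

// publishWithEagerPush publishes a partial message and, for any peer whose
// parts metadata has not been received yet, eagerly pushes the parts
// selected by the supplied metadata (here: every part, per the encoding
// the tests use).
func publishWithEagerPush(ext *PartialMessagesExtension, topic string, msg Message) error {
	return ext.PublishPartial(topic, msg, PublishOptions{
		EagerPushWithPartsMetadata: []byte{0x00},
	})
}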

type Router interface {
@@ -225,6 +231,15 @@ func (e *PartialMessagesExtension) PublishPartial(topic string, partial Message,
} else {
peers = e.peersToPublishTo(topic, state)
}

var eagerData []byte
if opts.EagerPushWithPartsMetadata != nil {
eagerData, err = partial.PartialMessageBytes(opts.EagerPushWithPartsMetadata)
if err != nil {
return err
}
}

for p := range peers {
log := e.Logger.With("peer", p)
requestedPartial := e.router.PeerRequestsPartial(p, topic)
@@ -262,10 +277,13 @@ func (e *PartialMessagesExtension) PublishPartial(topic string, partial Message,
// Only send the eager push to the peer if:
// - the peer has requested partial messages for this topic
// - we didn't reply to an explicit request
// - we have something to eager push
if requestedPartial && !inResponseToIWant && len(opts.EagerPush) > 0 {
if requestedPartial && !inResponseToIWant && len(eagerData) > 0 {
log.Debug("Eager pushing")
sendRPC = true
rpc.PartialMessage = opts.EagerPush
rpc.PartialMessage = eagerData
// Merge the peer's empty partsMetadata with the parts we eagerly pushed.
// This tracks what has been sent to the peer and avoids sending duplicates.
pState.partsMetadata = e.MergePartsMetadata(topic, pState.partsMetadata, opts.EagerPushWithPartsMetadata)
}

// Only send parts metadata if it was different than before
@@ -369,6 +387,17 @@ func (e *PartialMessagesExtension) HandleRPC(from peer.ID, rpc *pb.PartialMessag
pState.partsMetadata = e.MergePartsMetadata(rpc.GetTopicID(), pState.partsMetadata, rpc.PartsMetadata)
}

if pState, ok := state.peerState[from]; ok && len(pState.sentPartsMetadata) > 0 && len(rpc.PartialMessage) > 0 {
// We have previously sent this peer our parts metadata and they have
// sent us a partial message. We can update the peer's view of our parts
// by merging our parts and their parts.
//
// This holds whether they are responding to our request or sending data
// eagerly. In the latter case, they will update our view when they receive
// our parts metadata.
pState.sentPartsMetadata = e.MergePartsMetadata(rpc.GetTopicID(), pState.sentPartsMetadata, pState.partsMetadata)
}

return e.OnIncomingRPC(from, rpc)
}
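
The updated tests below pass the delta of newly received parts to their republish callbacks; the eager-push cases flip that delta and feed it to EagerPushWithPartsMetadata. A condensed sketch of that bookkeeping, assuming the same bitmap helper the tests import (its Xor and Flip mutate the receiver in place) and the tests' convention that a cleared bit selects a part:

// newlySelectedParts converts two snapshots of a message's PartsMetadata
// into the selection passed as EagerPushWithPartsMetadata. Xor keeps only
// the bits that changed since `before`; Flip turns those changed bits into
// cleared (selected) bits. The conversion aliases `after`, so the caller's
// slice is modified in place, mirroring the test code.
func newlySelectedParts(before, after PartsMetadata) PartsMetadata {
	delta := bitmap.Bitmap(after)
	delta.Xor(bitmap.Bitmap(before))
	delta.Flip()
	return PartsMetadata(delta)
}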

64 changes: 29 additions & 35 deletions partialmessages/partialmsgs_test.go
@@ -128,7 +128,7 @@ type testPartialMessage struct {
Parts [testPartialMessageLeaves][]byte
Proofs [testPartialMessageLeaves][]merkle.ProofStep

republish func(*testPartialMessage, []byte)
republish func(partial *testPartialMessage, newParts PartsMetadata)
onErr func(error)
}

@@ -263,7 +263,7 @@ func (t *testPartialMessageChecker) MergePartsMetadata(left, right PartsMetadata

// EmptyMessage implements InvariantChecker.
func (t *testPartialMessageChecker) EmptyMessage() *testPartialMessage {
return &testPartialMessage{Commitment: t.fullMessage.Commitment, republish: func(pm *testPartialMessage, _ []byte) {}}
return &testPartialMessage{Commitment: t.fullMessage.Commitment, republish: func(pm *testPartialMessage, _ PartsMetadata) {}}
}

// Equal implements InvariantChecker.
@@ -337,7 +337,7 @@ func TestExamplePartialMessageImpl(t *testing.T) {
if err != nil {
t.Fatal(err)
}
full.republish = func(pm *testPartialMessage, _ []byte) {}
full.republish = func(pm *testPartialMessage, _ PartsMetadata) {}

invariant := &testPartialMessageChecker{
fullMessage: full,
@@ -433,20 +433,21 @@ func createPeers(t *testing.T, topic string, n int, nonMesh bool) *testPeers {
if pm == nil {
pm = &testPartialMessage{
Commitment: groupID,
republish: func(pm *testPartialMessage, _ []byte) {
republish: func(pm *testPartialMessage, _ PartsMetadata) {
handlers[i].PublishPartial(topic, pm, PublishOptions{})
},
}
testPeers.partialMessages[currentPeer][topic][groupKey] = pm
}

beforeParts := pm.PartsMetadata()
// Extend the partial message with the incoming data
recvdNewData := pm.extendFromEncodedPartialMessage(from, rpc.PartialMessage)

if recvdNewData {
// Publish to all peers our new data.
// We'll request and fulfill any
handler.PublishPartial(topic, pm, PublishOptions{})
newParts := bitmap.Bitmap(pm.PartsMetadata())
newParts.Xor(bitmap.Bitmap(beforeParts))
pm.republish(pm, PartsMetadata(newParts))
return nil
}

@@ -505,7 +506,7 @@ func (tp *testPeers) getOrCreatePartialMessage(peerIndex int, topic string, grou
handler := tp.handlers[peerIndex]
pm = &testPartialMessage{
Commitment: groupID,
republish: func(pm *testPartialMessage, _ []byte) {
republish: func(pm *testPartialMessage, _ PartsMetadata) {
handler.PublishPartial(topic, pm, PublishOptions{})
},
}
@@ -558,7 +559,7 @@ func newFullTestMessage(r io.Reader, ext *PartialMessagesExtension, topic string
for i := range out.Parts {
out.Proofs[i] = merkle.MerkleProof(out.Parts[:], i)
}
out.republish = func(pm *testPartialMessage, _ []byte) {
out.republish = func(pm *testPartialMessage, _ PartsMetadata) {
ext.PublishPartial(topic, pm, PublishOptions{})
}
return out, nil
@@ -567,7 +568,7 @@
func newEmptyTestMessage(commitment []byte, ext *PartialMessagesExtension, topic string) *testPartialMessage {
return &testPartialMessage{
Commitment: commitment,
republish: func(pm *testPartialMessage, _ []byte) {
republish: func(pm *testPartialMessage, _ PartsMetadata) {
ext.PublishPartial(topic, pm, PublishOptions{})
},
}
@@ -619,16 +620,14 @@ func TestPartialMessages(t *testing.T) {
t.Fatal(err)
}

msgBytes, err := h1Msg.PartialMessageBytes(nil)
// h1 knows the full message and eager pushes
err = peers.handlers[0].PublishPartial(topic, h1Msg, PublishOptions{
EagerPushWithPartsMetadata: []byte{0x00}, // All parts
})
if err != nil {
t.Fatal(err)
}

// h1 knows the full message and eager pushes
peers.handlers[0].PublishPartial(topic, h1Msg, PublishOptions{
EagerPush: msgBytes,
})

// h2 will receive partial message data through OnIncomingRPC
// We can access it through our tracking system
lastPartialMessageh2 := peers.getOrCreatePartialMessage(1, topic, h1Msg.Commitment)
@@ -646,14 +645,10 @@
if err != nil {
t.Fatal(err)
}
msgBytes, err = h2Msg.PartialMessageBytes(nil)
if err != nil {
t.Fatal(err)
}

// h2 knows the full message and eager pushes
peers.handlers[1].PublishPartial(topic, h2Msg, PublishOptions{
EagerPush: msgBytes,
EagerPushWithPartsMetadata: []byte{0x00}, // All parts h2 has
})

// h1 will receive partial message data through OnIncomingRPC
@@ -701,7 +696,6 @@ func TestPartialMessages(t *testing.T) {
peers := createPeers(t, topic, 2, true)
peers.network.addPeers()
defer peers.network.removePeers()
slog.Debug("starting")

h1Msg, err := newFullTestMessage(rand, peers.handlers[0], topic)
if err != nil {
@@ -767,8 +761,8 @@ func TestPartialMessages(t *testing.T) {
count++
}
}
if count != 2 {
t.Fatal("h2 should only have sent two parts updates")
if count != 1 {
t.Fatal("h2 should only have sent one part updates")
}

// Assert h2 has the full message
@@ -827,8 +821,8 @@ func TestPartialMessages(t *testing.T) {
count++
}
}
if count != 2 {
t.Fatal("h2 should only have sent two parts updates")
if count != 1 {
t.Fatal("h2 should only have sent one part update")
}

// Assert h2 has the full message
@@ -968,26 +962,26 @@ func TestPartialMessages(t *testing.T) {
h1Msg.Parts[i] = fullMsg.Parts[i]
h1Msg.Proofs[i] = fullMsg.Proofs[i]
}
h1MsgEncoded, err := h1Msg.PartialMessageBytes(nil)
if err != nil {
t.Fatal(err)
}

// Peer 2 has no parts
h2Msg := newEmptyTestMessage(fullMsg.Commitment, peers.handlers[1], topic)

// Eagerly push new data to peers
h2Msg.republish = func(pm *testPartialMessage, newData []byte) {
peers.handlers[1].PublishPartial(topic, pm, PublishOptions{EagerPush: newData})
h2Msg.republish = func(pm *testPartialMessage, newParts PartsMetadata) {
// Flip because we want to select these parts for the eager push
bitmap.Bitmap(newParts).Flip()
peers.handlers[1].PublishPartial(topic, pm, PublishOptions{EagerPushWithPartsMetadata: newParts})
}
// Peer 3 has no parts
h3Msg := newEmptyTestMessage(fullMsg.Commitment, peers.handlers[2], topic)
// Eagerly push new data to peers
h3Msg.republish = func(pm *testPartialMessage, newData []byte) {
peers.handlers[2].PublishPartial(topic, pm, PublishOptions{EagerPush: newData})
h3Msg.republish = func(pm *testPartialMessage, newParts PartsMetadata) {
// Flip because we want to select these parts for the eager push
bitmap.Bitmap(newParts).Flip()
peers.handlers[2].PublishPartial(topic, pm, PublishOptions{EagerPushWithPartsMetadata: newParts})
}
// All peers publish their partial messages
peers.handlers[0].PublishPartial(topic, h1Msg, PublishOptions{EagerPush: h1MsgEncoded})
peers.handlers[0].PublishPartial(topic, h1Msg, PublishOptions{EagerPushWithPartsMetadata: []byte{0x00}})
peers.registerMessage(0, topic, h1Msg)
peers.handlers[1].PublishPartial(topic, h2Msg, PublishOptions{})
peers.registerMessage(1, topic, h2Msg)