diff --git a/partialmessages/partialmsgs.go b/partialmessages/partialmsgs.go index ebe3f742..2aa34baa 100644 --- a/partialmessages/partialmsgs.go +++ b/partialmessages/partialmsgs.go @@ -140,8 +140,14 @@ type PublishOptions struct { // PublishToPeers limits the publishing to only the specified peers. // If nil, will use the topic's mesh peers. PublishToPeers []peer.ID - // EagerPush is data that will be eagerly pushed to peers in a PartialMessage - EagerPush []byte + // EagerPushWithPartsMetadata is used to eagerly push data to peers if we have + // not yet received their parts metadata. The encoded partial message sent + // is generated by calling `PartialMessageBytes` with the provided + // PartsMetadata. + // + // If nil, no partial message is sent to a peer unless we have received + // their partsMetadata first. + EagerPushWithPartsMetadata PartsMetadata } type Router interface { @@ -225,6 +231,15 @@ func (e *PartialMessagesExtension) PublishPartial(topic string, partial Message, } else { peers = e.peersToPublishTo(topic, state) } + + var eagerData []byte + if opts.EagerPushWithPartsMetadata != nil { + eagerData, err = partial.PartialMessageBytes(opts.EagerPushWithPartsMetadata) + if err != nil { + return err + } + } + for p := range peers { log := e.Logger.With("peer", p) requestedPartial := e.router.PeerRequestsPartial(p, topic) @@ -262,10 +277,13 @@ func (e *PartialMessagesExtension) PublishPartial(topic string, partial Message, // Only send the eager push to the peer if: // - we didn't reply to an explicit request // - we have something to eager push - if requestedPartial && !inResponseToIWant && len(opts.EagerPush) > 0 { + if requestedPartial && !inResponseToIWant && len(eagerData) > 0 { log.Debug("Eager pushing") sendRPC = true - rpc.PartialMessage = opts.EagerPush + rpc.PartialMessage = eagerData + // Merge the peer's empty partsMetadata with the parts we eagerly pushed. + // This tracks what has been sent to the peer and avoids sending duplicates. + pState.partsMetadata = e.MergePartsMetadata(topic, pState.partsMetadata, opts.EagerPushWithPartsMetadata) } // Only send parts metadata if it was different then before @@ -369,6 +387,17 @@ func (e *PartialMessagesExtension) HandleRPC(from peer.ID, rpc *pb.PartialMessag pState.partsMetadata = e.MergePartsMetadata(rpc.GetTopicID(), pState.partsMetadata, rpc.PartsMetadata) } + if pState, ok := state.peerState[from]; ok && len(pState.sentPartsMetadata) > 0 && len(rpc.PartialMessage) > 0 { + // We have previously sent this peer our parts metadata and they have + // sent us a partial message. We can update the peer's view of our parts + // by merging our parts and their parts. + // + // This works if they are responding to our request or + // if they send data eagerly. In the latter case, they will update our + // view when they receive our parts metadata. + pState.sentPartsMetadata = e.MergePartsMetadata(rpc.GetTopicID(), pState.sentPartsMetadata, pState.partsMetadata) + } + return e.OnIncomingRPC(from, rpc) } diff --git a/partialmessages/partialmsgs_test.go b/partialmessages/partialmsgs_test.go index 9a552e88..7b3f7815 100644 --- a/partialmessages/partialmsgs_test.go +++ b/partialmessages/partialmsgs_test.go @@ -128,7 +128,7 @@ type testPartialMessage struct { Parts [testPartialMessageLeaves][]byte Proofs [testPartialMessageLeaves][]merkle.ProofStep - republish func(*testPartialMessage, []byte) + republish func(partial *testPartialMessage, newParts PartsMetadata) onErr func(error) } @@ -263,7 +263,7 @@ func (t *testPartialMessageChecker) MergePartsMetadata(left, right PartsMetadata // EmptyMessage implements InvariantChecker. func (t *testPartialMessageChecker) EmptyMessage() *testPartialMessage { - return &testPartialMessage{Commitment: t.fullMessage.Commitment, republish: func(pm *testPartialMessage, _ []byte) {}} + return &testPartialMessage{Commitment: t.fullMessage.Commitment, republish: func(pm *testPartialMessage, _ PartsMetadata) {}} } // Equal implements InvariantChecker. @@ -337,7 +337,7 @@ func TestExamplePartialMessageImpl(t *testing.T) { if err != nil { t.Fatal(err) } - full.republish = func(pm *testPartialMessage, _ []byte) {} + full.republish = func(pm *testPartialMessage, _ PartsMetadata) {} invariant := &testPartialMessageChecker{ fullMessage: full, @@ -433,20 +433,21 @@ func createPeers(t *testing.T, topic string, n int, nonMesh bool) *testPeers { if pm == nil { pm = &testPartialMessage{ Commitment: groupID, - republish: func(pm *testPartialMessage, _ []byte) { + republish: func(pm *testPartialMessage, _ PartsMetadata) { handlers[i].PublishPartial(topic, pm, PublishOptions{}) }, } testPeers.partialMessages[currentPeer][topic][groupKey] = pm } + beforeParts := pm.PartsMetadata() // Extend the partial message with the incoming data recvdNewData := pm.extendFromEncodedPartialMessage(from, rpc.PartialMessage) if recvdNewData { - // Publish to all peers our new data. - // We'll request and fulfill any - handler.PublishPartial(topic, pm, PublishOptions{}) + newParts := bitmap.Bitmap(pm.PartsMetadata()) + newParts.Xor(bitmap.Bitmap(beforeParts)) + pm.republish(pm, PartsMetadata(newParts)) return nil } @@ -505,7 +506,7 @@ func (tp *testPeers) getOrCreatePartialMessage(peerIndex int, topic string, grou handler := tp.handlers[peerIndex] pm = &testPartialMessage{ Commitment: groupID, - republish: func(pm *testPartialMessage, _ []byte) { + republish: func(pm *testPartialMessage, _ PartsMetadata) { handler.PublishPartial(topic, pm, PublishOptions{}) }, } @@ -558,7 +559,7 @@ func newFullTestMessage(r io.Reader, ext *PartialMessagesExtension, topic string for i := range out.Parts { out.Proofs[i] = merkle.MerkleProof(out.Parts[:], i) } - out.republish = func(pm *testPartialMessage, _ []byte) { + out.republish = func(pm *testPartialMessage, _ PartsMetadata) { ext.PublishPartial(topic, pm, PublishOptions{}) } return out, nil @@ -567,7 +568,7 @@ func newFullTestMessage(r io.Reader, ext *PartialMessagesExtension, topic string func newEmptyTestMessage(commitment []byte, ext *PartialMessagesExtension, topic string) *testPartialMessage { return &testPartialMessage{ Commitment: commitment, - republish: func(pm *testPartialMessage, _ []byte) { + republish: func(pm *testPartialMessage, _ PartsMetadata) { ext.PublishPartial(topic, pm, PublishOptions{}) }, } @@ -619,16 +620,14 @@ func TestPartialMessages(t *testing.T) { t.Fatal(err) } - msgBytes, err := h1Msg.PartialMessageBytes(nil) + // h1 knows the full message and eager pushes + err = peers.handlers[0].PublishPartial(topic, h1Msg, PublishOptions{ + EagerPushWithPartsMetadata: []byte{0x00}, // All parts + }) if err != nil { t.Fatal(err) } - // h1 knows the full message and eager pushes - peers.handlers[0].PublishPartial(topic, h1Msg, PublishOptions{ - EagerPush: msgBytes, - }) - // h2 will receive partial message data through OnIncomingRPC // We can access it through our tracking system lastPartialMessageh2 := peers.getOrCreatePartialMessage(1, topic, h1Msg.Commitment) @@ -646,14 +645,10 @@ func TestPartialMessages(t *testing.T) { if err != nil { t.Fatal(err) } - msgBytes, err = h2Msg.PartialMessageBytes(nil) - if err != nil { - t.Fatal(err) - } // h2 knows the full message and eager pushes peers.handlers[1].PublishPartial(topic, h2Msg, PublishOptions{ - EagerPush: msgBytes, + EagerPushWithPartsMetadata: []byte{0x00}, // All parts h2 has }) // h1 will receive partial message data through OnIncomingRPC @@ -701,7 +696,6 @@ func TestPartialMessages(t *testing.T) { peers := createPeers(t, topic, 2, true) peers.network.addPeers() defer peers.network.removePeers() - slog.Debug("starting") h1Msg, err := newFullTestMessage(rand, peers.handlers[0], topic) if err != nil { @@ -767,8 +761,8 @@ func TestPartialMessages(t *testing.T) { count++ } } - if count != 2 { - t.Fatal("h2 should only have sent two parts updates") + if count != 1 { + t.Fatal("h2 should only have sent one part updates") } // Assert h2 has the full message @@ -827,8 +821,8 @@ func TestPartialMessages(t *testing.T) { count++ } } - if count != 2 { - t.Fatal("h2 should only have sent two parts updates") + if count != 1 { + t.Fatal("h2 should only have sent one part update") } // Assert h2 has the full message @@ -968,26 +962,26 @@ func TestPartialMessages(t *testing.T) { h1Msg.Parts[i] = fullMsg.Parts[i] h1Msg.Proofs[i] = fullMsg.Proofs[i] } - h1MsgEncoded, err := h1Msg.PartialMessageBytes(nil) - if err != nil { - t.Fatal(err) - } // Peer 2 has no parts h2Msg := newEmptyTestMessage(fullMsg.Commitment, peers.handlers[1], topic) // Eagerly push new data to peers - h2Msg.republish = func(pm *testPartialMessage, newData []byte) { - peers.handlers[1].PublishPartial(topic, pm, PublishOptions{EagerPush: newData}) + h2Msg.republish = func(pm *testPartialMessage, newParts PartsMetadata) { + // Flip because we want to select these parts for the eager push + bitmap.Bitmap(newParts).Flip() + peers.handlers[1].PublishPartial(topic, pm, PublishOptions{EagerPushWithPartsMetadata: newParts}) } // Peer 3 has no parts h3Msg := newEmptyTestMessage(fullMsg.Commitment, peers.handlers[2], topic) // Eagerly push new data to peers - h3Msg.republish = func(pm *testPartialMessage, newData []byte) { - peers.handlers[2].PublishPartial(topic, pm, PublishOptions{EagerPush: newData}) + h3Msg.republish = func(pm *testPartialMessage, newParts PartsMetadata) { + // Flip because we want to select these parts for the eager push + bitmap.Bitmap(newParts).Flip() + peers.handlers[2].PublishPartial(topic, pm, PublishOptions{EagerPushWithPartsMetadata: newParts}) } // All peers publish their partial messages - peers.handlers[0].PublishPartial(topic, h1Msg, PublishOptions{EagerPush: h1MsgEncoded}) + peers.handlers[0].PublishPartial(topic, h1Msg, PublishOptions{EagerPushWithPartsMetadata: []byte{0x00}}) peers.registerMessage(0, topic, h1Msg) peers.handlers[1].PublishPartial(topic, h2Msg, PublishOptions{}) peers.registerMessage(1, topic, h2Msg)