diff --git a/gossipsub.go b/gossipsub.go
index 68bef4a5..68bc5e18 100644
--- a/gossipsub.go
+++ b/gossipsub.go
@@ -74,6 +74,8 @@ var (
 	GossipSubIWantFollowupTime         = 3 * time.Second
 	GossipSubIDontWantMessageThreshold = 1024 // 1KB
 	GossipSubIDontWantMessageTTL       = 3    // 3 heartbeats
+
+	GossipSubMaxIWantsPerMessageIDPerHeartbeat = 10
 )
 
 type checksum struct {
@@ -239,6 +241,11 @@ type GossipSubParams struct {
 	// IDONTWANT is cleared when it's older than the TTL.
 	IDontWantMessageTTL int
+
+	// MaxIWantsPerMessageIDPerHeartbeat is the maximum number of pending
+	// IWANT requests allowed per message ID per heartbeat. This limits the
+	// number of duplicate message copies we solicit from peers.
+	MaxIWantsPerMessageIDPerHeartbeat int
 }
 
 // NewGossipSub returns a new PubSub object using the default GossipSubRouter as the router.
@@ -276,6 +283,10 @@ func DefaultGossipSubRouter(h host.Host) *GossipSubRouter {
 		feature:   GossipSubDefaultFeatures,
 		tagTracer: newTagTracer(h.ConnManager()),
 		params:    params,
+		// Remaining IWANT budget per message ID. A message ID that is
+		// missing from the map still has its full budget of
+		// MaxIWantsPerMessageIDPerHeartbeat.
+		allowedIWantCount: make(map[string]int),
 	}
 }
@@ -315,6 +326,8 @@ func DefaultGossipSubParams() GossipSubParams {
 		IDontWantMessageThreshold: GossipSubIDontWantMessageThreshold,
 		IDontWantMessageTTL:       GossipSubIDontWantMessageTTL,
 		SlowHeartbeatWarning:      0.1,
+
+		MaxIWantsPerMessageIDPerHeartbeat: GossipSubMaxIWantsPerMessageIDPerHeartbeat,
 	}
 }
@@ -479,6 +492,10 @@ type GossipSubRouter struct {
 	connect chan connectInfo // px connection requests
 	cab     peerstore.AddrBook
 
+	// Remaining IWANT budget per message ID; a missing entry is treated as
+	// the full MaxIWantsPerMessageIDPerHeartbeat budget.
+	allowedIWantCount map[string]int
+
 	protos  []protocol.ID
 	feature GossipSubFeatureTest
@@ -713,11 +730,13 @@ func (gs *GossipSubRouter) AcceptFrom(p peer.ID) AcceptStatus {
 func (gs *GossipSubRouter) Preprocess(from peer.ID, msgs []*Message) {
 	tmids := make(map[string][]string)
 	for _, msg := range msgs {
+		mid := gs.p.idGen.ID(msg)
+		delete(gs.allowedIWantCount, mid)
 		if len(msg.GetData()) < gs.params.IDontWantMessageThreshold {
 			continue
 		}
 		topic := msg.GetTopic()
-		tmids[topic] = append(tmids[topic], gs.p.idGen.ID(msg))
+		tmids[topic] = append(tmids[topic], mid)
 	}
 	for topic, mids := range tmids {
 		if len(mids) == 0 {
@@ -803,7 +822,20 @@ func (gs *GossipSubRouter) handleIHave(p peer.ID, ctl *pb.ControlMessage) []*pb.ControlIWant {
 			if gs.p.seenMessage(mid) {
 				continue
 			}
+
+			allowedIWants, ok := gs.allowedIWantCount[mid]
+			if !ok {
+				allowedIWants = gs.params.MaxIWantsPerMessageIDPerHeartbeat
+			}
+
+			// Skip this message ID if its IWANT budget for the current heartbeat is spent
+			if allowedIWants <= 0 {
+				log.Debugf("IHAVE: ignoring IHAVE for message %s from peer %s; too many inflight IWANTs", mid, p)
+				continue
+			}
+
 			iwant[mid] = struct{}{}
+			gs.allowedIWantCount[mid] = allowedIWants - 1
 		}
 	}
@@ -1453,6 +1485,9 @@ func (gs *GossipSubRouter) heartbeat() {
 	toprune := make(map[peer.ID][]string)
 	noPX := make(map[peer.ID]bool)
 
+	// reset the per-message-ID IWANT budgets
+	gs.resetAllowedIWants()
+
 	// clean up expired backoffs
 	gs.clearBackoff()
 
@@ -1702,6 +1737,13 @@ func (gs *GossipSubRouter) heartbeat() {
 	gs.mcache.Shift()
 }
 
+func (gs *GossipSubRouter) resetAllowedIWants() {
+	if len(gs.allowedIWantCount) > 0 {
+		// throw away the old map and make a new one
+		gs.allowedIWantCount = make(map[string]int)
+	}
+}
+
 func (gs *GossipSubRouter) clearIHaveCounters() {
 	if len(gs.peerhave) > 0 {
 		// throw away the old map and make a new one
diff --git a/gossipsub_test.go b/gossipsub_test.go
index 9f450d87..ca87e454 100644
--- a/gossipsub_test.go
+++ b/gossipsub_test.go
@@ -3860,3 +3860,94 @@ func BenchmarkSplitRPCLargeMessages(b *testing.B) {
 		}
 	})
 }
+
+func TestGossipsubLimitIWANT(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	hosts := getDefaultHosts(t, 3)
+	denseConnect(t, hosts)
+
+	psubs := make([]*PubSub, 3)
+
+	defaultParams := DefaultGossipSubParams()
+	defaultParams.MaxIWantsPerMessageIDPerHeartbeat = 1
+	// Delay the heartbeat so the allowed-IWANT budget is not reset during
+	// the first part of the test.
+	defaultParams.HeartbeatInitialDelay = 4 * time.Second
+	defaultParams.HeartbeatInterval = time.Second
+
+	psubs[0] = getGossipsub(ctx, hosts[0], WithGossipSubParams(defaultParams))
+
+	var iwantsRecvd atomic.Int32
+
+	copy(psubs[1:], getGossipsubs(ctx, hosts[1:], WithRawTracer(&mockRawTracer{
+		onRecvRPC: func(rpc *RPC) {
+			if len(rpc.GetControl().GetIwant()) > 0 {
+				iwantsRecvd.Add(1)
+			}
+		},
+	})))
+
+	topicString := "foobar"
+	for _, ps := range psubs {
+		topic, err := ps.Join(topicString)
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		_, err = topic.Subscribe()
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
+	time.Sleep(2 * time.Second)
+
+	sendIHaves := func() {
+		psubs[1].eval <- func() {
+			psubs[1].rt.(*GossipSubRouter).sendRPC(hosts[0].ID(), &RPC{
+				RPC: pb.RPC{
+					Control: &pb.ControlMessage{
+						Ihave: []*pb.ControlIHave{
+							{
+								TopicID:    &topicString,
+								MessageIDs: []string{"1"},
+							},
+						},
+					},
+				},
+			}, false)
+		}
+
+		psubs[2].eval <- func() {
+			psubs[2].rt.(*GossipSubRouter).sendRPC(hosts[0].ID(), &RPC{
+				RPC: pb.RPC{
+					Control: &pb.ControlMessage{
+						Ihave: []*pb.ControlIHave{
+							{
+								TopicID:    &topicString,
+								MessageIDs: []string{"1"},
+							},
+						},
+					},
+				},
+			}, false)
+		}
+	}
+
+	sendIHaves()
+	time.Sleep(time.Second)
+
+	if iwantsRecvd.Swap(0) != 1 {
+		t.Fatal("Expected exactly 1 IWANT due to limits")
+	}
+
+	// Wait for a heartbeat to reset the limit
+	time.Sleep(2 * time.Second)
+
+	sendIHaves()
+	time.Sleep(time.Second)
+
+	if iwantsRecvd.Swap(0) != 1 {
+		t.Fatal("Expected exactly 1 IWANT due to limits")
+	}
+}
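
For reviewers who want to exercise the new knob from application code, a minimal consumer-side sketch follows. The helper name newLimitedPubSub and the value 5 are illustrative only; the one thing this patch actually introduces is the MaxIWantsPerMessageIDPerHeartbeat parameter (default 10), and NewGossipSub, WithGossipSubParams, and DefaultGossipSubParams are the existing go-libp2p-pubsub API:

package main

import (
	"context"

	"github.com/libp2p/go-libp2p"
	pubsub "github.com/libp2p/go-libp2p-pubsub"
)

// newLimitedPubSub is a hypothetical helper showing how an application
// could opt into a stricter IWANT budget than the default of 10.
func newLimitedPubSub(ctx context.Context) (*pubsub.PubSub, error) {
	h, err := libp2p.New() // a default libp2p host
	if err != nil {
		return nil, err
	}
	params := pubsub.DefaultGossipSubParams()
	// A lower budget means fewer duplicate copies of a message can be
	// solicited between heartbeats, at the cost of slower recovery when
	// an earlier IWANT goes unanswered.
	params.MaxIWantsPerMessageIDPerHeartbeat = 5 // illustrative value
	return pubsub.NewGossipSub(ctx, h, pubsub.WithGossipSubParams(params))
}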