Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 43 additions & 1 deletion gossipsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ var (
GossipSubIWantFollowupTime = 3 * time.Second
GossipSubIDontWantMessageThreshold = 1024 // 1KB
GossipSubIDontWantMessageTTL = 3 // 3 heartbeats

GossipSubMaxIWantsPerMessageIDPerHeartbeat = 10
)

type checksum struct {
Expand Down Expand Up @@ -239,6 +241,11 @@ type GossipSubParams struct {

// IDONTWANT is cleared when it's older than the TTL.
IDontWantMessageTTL int

// MaxIWantsPerMessageIDPerHeartbeat is the maximum number of pending IWANT
// requests allowed per message ID per heartbeat. This helps limit the
// number of duplicates we'll receive from peers.
MaxIWantsPerMessageIDPerHeartbeat int
}

// NewGossipSub returns a new PubSub object using the default GossipSubRouter as the router.
Expand Down Expand Up @@ -276,6 +283,10 @@ func DefaultGossipSubRouter(h host.Host) *GossipSubRouter {
feature: GossipSubDefaultFeatures,
tagTracer: newTagTracer(h.ConnManager()),
params: params,
// number of allowed IWANTs per message ID. If the message ID is
// missing, it must be initialized to
// `MaxIWantsPerMessageIDPerHeartbeat`
allowedIWantCount: make(map[string]int),
}
}

Expand Down Expand Up @@ -315,6 +326,8 @@ func DefaultGossipSubParams() GossipSubParams {
IDontWantMessageThreshold: GossipSubIDontWantMessageThreshold,
IDontWantMessageTTL: GossipSubIDontWantMessageTTL,
SlowHeartbeatWarning: 0.1,

MaxIWantsPerMessageIDPerHeartbeat: GossipSubMaxIWantsPerMessageIDPerHeartbeat,
}
}

Expand Down Expand Up @@ -479,6 +492,10 @@ type GossipSubRouter struct {
connect chan connectInfo // px connection requests
cab peerstore.AddrBook

// allowed number of IWANTs per message ID. Must be initialized to
// `MaxIWantsPerMessageIDPerHeartbeat` if message id is missing.
allowedIWantCount map[string]int

protos []protocol.ID
feature GossipSubFeatureTest

Expand Down Expand Up @@ -713,11 +730,13 @@ func (gs *GossipSubRouter) AcceptFrom(p peer.ID) AcceptStatus {
func (gs *GossipSubRouter) Preprocess(from peer.ID, msgs []*Message) {
tmids := make(map[string][]string)
for _, msg := range msgs {
mid := gs.p.idGen.ID(msg)
delete(gs.allowedIWantCount, mid)
if len(msg.GetData()) < gs.params.IDontWantMessageThreshold {
continue
}
topic := msg.GetTopic()
tmids[topic] = append(tmids[topic], gs.p.idGen.ID(msg))
tmids[topic] = append(tmids[topic], mid)
}
for topic, mids := range tmids {
if len(mids) == 0 {
Expand Down Expand Up @@ -803,7 +822,20 @@ func (gs *GossipSubRouter) handleIHave(p peer.ID, ctl *pb.ControlMessage) []*pb.
if gs.p.seenMessage(mid) {
continue
}

allowedIWants, ok := gs.allowedIWantCount[mid]
if !ok {
allowedIWants = gs.params.MaxIWantsPerMessageIDPerHeartbeat
}

// Check if we've exceeded the maximum number of pending IWANTs for this message
if allowedIWants <= 0 {
log.Debugf("IHAVE: ignoring IHAVE for message %s from peer %s; too many inflight IWANTs", mid, p)
continue
}

iwant[mid] = struct{}{}
gs.allowedIWantCount[mid] = allowedIWants - 1
}
}

Expand Down Expand Up @@ -1453,6 +1485,9 @@ func (gs *GossipSubRouter) heartbeat() {
toprune := make(map[peer.ID][]string)
noPX := make(map[peer.ID]bool)

// reset number of allowed IWANT requests
gs.resetAllowedIWants()

// clean up expired backoffs
gs.clearBackoff()

Expand Down Expand Up @@ -1702,6 +1737,13 @@ func (gs *GossipSubRouter) heartbeat() {
gs.mcache.Shift()
}

func (gs *GossipSubRouter) resetAllowedIWants() {
if len(gs.allowedIWantCount) > 0 {
// throw away the old map and make a new one
gs.allowedIWantCount = make(map[string]int)
}
}

func (gs *GossipSubRouter) clearIHaveCounters() {
if len(gs.peerhave) > 0 {
// throw away the old map and make a new one
Expand Down
91 changes: 91 additions & 0 deletions gossipsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3860,3 +3860,94 @@ func BenchmarkSplitRPCLargeMessages(b *testing.B) {
}
})
}

func TestGossipsubLimitIWANT(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getDefaultHosts(t, 3)
denseConnect(t, hosts)

psubs := make([]*PubSub, 3)

defaultParams := DefaultGossipSubParams()
defaultParams.MaxIWantsPerMessageIDPerHeartbeat = 1
// Delay the heartbeat so we don't reset our count of allowed IWANTS for the
// first part of the test.
defaultParams.HeartbeatInitialDelay = 4 * time.Second
defaultParams.HeartbeatInterval = time.Second

psubs[0] = getGossipsub(ctx, hosts[0], WithGossipSubParams(defaultParams))

var iwantsRecvd atomic.Int32

copy(psubs[1:], getGossipsubs(ctx, hosts[1:], WithRawTracer(&mockRawTracer{
onRecvRPC: func(rpc *RPC) {
if len(rpc.GetControl().GetIwant()) > 0 {
iwantsRecvd.Add(1)
}
},
})))

topicString := "foobar"
for _, ps := range psubs {
topic, err := ps.Join(topicString)
if err != nil {
t.Fatal(err)
}

_, err = topic.Subscribe()
if err != nil {
t.Fatal(err)
}
}
time.Sleep(2 * time.Second)

publishIWant := func() {
psubs[1].eval <- func() {
psubs[1].rt.(*GossipSubRouter).sendRPC(hosts[0].ID(), &RPC{
RPC: pb.RPC{
Control: &pb.ControlMessage{
Ihave: []*pb.ControlIHave{
{
TopicID: &topicString,
MessageIDs: []string{"1"},
},
},
},
},
}, false)
}

psubs[2].eval <- func() {
psubs[2].rt.(*GossipSubRouter).sendRPC(hosts[0].ID(), &RPC{
RPC: pb.RPC{
Control: &pb.ControlMessage{
Ihave: []*pb.ControlIHave{
{
TopicID: &topicString,
MessageIDs: []string{"1"},
},
},
},
},
}, false)
}
}

publishIWant()
time.Sleep(time.Second)

if iwantsRecvd.Swap(0) != 1 {
t.Fatal("Expected exactly 1 IWANT due to limits")
}

// Wait for heartbeat to reset limit
time.Sleep(2 * time.Second)

publishIWant()
time.Sleep(time.Second)

if iwantsRecvd.Swap(0) != 1 {
t.Fatal("Expected exactly 1 IWANT due to limits")
}
}