From 4a18aa327d5080d60cda18072d29e0808f08b939 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Fri, 29 Sep 2023 12:13:45 +0530 Subject: [PATCH 01/13] Update peer selection options for lightPush --- waku/v2/peermanager/peer_manager.go | 14 +++++++ waku/v2/protocol/lightpush/waku_lightpush.go | 42 +++++++++++++------ .../lightpush/waku_lightpush_option.go | 35 +++++----------- waku/v2/utils/peer.go | 20 +++++++++ 4 files changed, 75 insertions(+), 36 deletions(-) diff --git a/waku/v2/peermanager/peer_manager.go b/waku/v2/peermanager/peer_manager.go index 6ded3fa27..967dfb944 100644 --- a/waku/v2/peermanager/peer_manager.go +++ b/waku/v2/peermanager/peer_manager.go @@ -483,3 +483,17 @@ func (pm *PeerManager) selectServicePeer(proto protocol.ID, pubSubTopic string, } return } + +func (pm *PeerManager) HandlePeerSelection(selectionType utils.PeerSelection, proto protocol.ID, + pubSubTopic string, specificPeers ...peer.ID) (peer.ID, error) { + + switch selectionType { + case utils.Automatic: + return pm.SelectPeer(proto, pubSubTopic, specificPeers...) + case utils.LowestRTT: + //TODO: Move this to peer-manager + return utils.SelectPeerWithLowestRTT(context.Background(), pm.host, proto, specificPeers, pm.logger) + default: + return "", errors.New("unknown peer selection type specified") + } +} diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index 8570ce818..1eef2f92c 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -17,6 +17,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/lightpush/pb" wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/relay" + "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" ) @@ -150,11 +151,6 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, p return nil, errors.New("lightpush params are mandatory") } - if params.selectedPeer == "" { - wakuLP.metrics.RecordError(peerNotFoundFailure) - return nil, ErrNoPeersAvailable - } - if len(params.requestID) == 0 { return nil, ErrInvalidID } @@ -210,16 +206,12 @@ func (wakuLP *WakuLightPush) Stop() { wakuLP.h.RemoveStreamHandler(LightPushID_v20beta1) } -// Optional PublishToTopic is used to broadcast a WakuMessage to a pubsub topic via lightpush protocol -// If pubSubTopic is not provided, then contentTopic is use to derive the relevant pubSubTopic via autosharding. -func (wakuLP *WakuLightPush) PublishToTopic(ctx context.Context, message *wpb.WakuMessage, opts ...Option) ([]byte, error) { - if message == nil { - return nil, errors.New("message can't be null") - } +func (wakuLP *WakuLightPush) handleOpts(ctx context.Context, message *wpb.WakuMessage, opts ...Option) (*lightPushParameters, error) { params := new(lightPushParameters) params.host = wakuLP.h params.log = wakuLP.log params.pm = wakuLP.pm + var err error optList := append(DefaultOptions(wakuLP.h), opts...) for _, opt := range optList { @@ -227,12 +219,38 @@ func (wakuLP *WakuLightPush) PublishToTopic(ctx context.Context, message *wpb.Wa } if params.pubsubTopic == "" { - var err error params.pubsubTopic, err = protocol.GetPubSubTopicFromContentTopic(message.ContentTopic) if err != nil { return nil, err } } + + //This condition is hacky and will be fixed later, + // once RTT based monitoring is also available in peer-manager + if params.pm == nil || params.peerSelectionType == utils.LowestRTT { + params.selectedPeer, err = utils.HandlePeerSelection(params.peerSelectionType, params.host, LightPushID_v20beta1, params.preferredPeers, params.log) + } else { + params.selectedPeer, err = wakuLP.pm.HandlePeerSelection(params.peerSelectionType, LightPushID_v20beta1, params.pubsubTopic, params.preferredPeers...) + } + if err != nil { + params.log.Error("selecting peer", zap.Error(err)) + wakuLP.metrics.RecordError(peerNotFoundFailure) + return nil, ErrNoPeersAvailable + } + return params, nil +} + +// Optional PublishToTopic is used to broadcast a WakuMessage to a pubsub topic via lightpush protocol +// If pubSubTopic is not provided, then contentTopic is use to derive the relevant pubSubTopic via autosharding. +func (wakuLP *WakuLightPush) PublishToTopic(ctx context.Context, message *wpb.WakuMessage, opts ...Option) ([]byte, error) { + if message == nil { + return nil, errors.New("message can't be null") + } + + params, err := wakuLP.handleOpts(ctx, message, opts...) + if err != nil { + return nil, err + } req := new(pb.PushRequest) req.Message = message req.PubsubTopic = params.pubsubTopic diff --git a/waku/v2/protocol/lightpush/waku_lightpush_option.go b/waku/v2/protocol/lightpush/waku_lightpush_option.go index f0d496d62..8d4263892 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_option.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_option.go @@ -12,12 +12,14 @@ import ( ) type lightPushParameters struct { - host host.Host - selectedPeer peer.ID - requestID []byte - pm *peermanager.PeerManager - log *zap.Logger - pubsubTopic string + host host.Host + selectedPeer peer.ID + requestID []byte + pm *peermanager.PeerManager + log *zap.Logger + pubsubTopic string + peerSelectionType utils.PeerSelection + preferredPeers peer.IDSlice } // Option is the type of options accepted when performing LightPush protocol requests @@ -36,18 +38,8 @@ func WithPeer(p peer.ID) Option { // from the node peerstore func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) Option { return func(params *lightPushParameters) { - var p peer.ID - var err error - if params.pm == nil { - p, err = utils.SelectPeer(params.host, LightPushID_v20beta1, fromThesePeers, params.log) - } else { - p, err = params.pm.SelectPeer(LightPushID_v20beta1, "", fromThesePeers...) - } - if err == nil { - params.selectedPeer = p - } else { - params.log.Info("selecting peer", zap.Error(err)) - } + params.peerSelectionType = utils.Automatic + params.preferredPeers = fromThesePeers } } @@ -63,12 +55,7 @@ func WithPubSubTopic(pubsubTopic string) Option { // from the node peerstore func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) Option { return func(params *lightPushParameters) { - p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, LightPushID_v20beta1, fromThesePeers, params.log) - if err == nil { - params.selectedPeer = p - } else { - params.log.Info("selecting peer", zap.Error(err)) - } + params.peerSelectionType = utils.LowestRTT } } diff --git a/waku/v2/utils/peer.go b/waku/v2/utils/peer.go index e2173b307..5dae987e4 100644 --- a/waku/v2/utils/peer.go +++ b/waku/v2/utils/peer.go @@ -15,6 +15,14 @@ import ( "go.uber.org/zap" ) +type PeerSelection int + +const ( + Unknown PeerSelection = iota + Automatic + LowestRTT +) + // ErrNoPeersAvailable is emitted when no suitable peers are found for // some protocol var ErrNoPeersAvailable = errors.New("no suitable peers found") @@ -161,3 +169,15 @@ func SelectPeerWithLowestRTT(ctx context.Context, host host.Host, protocolID pro return "", ErrNoPeersAvailable } } + +func HandlePeerSelection(selectionType PeerSelection, host host.Host, proto protocol.ID, specificPeers []peer.ID, log *zap.Logger) (peer.ID, error) { + switch selectionType { + case Automatic: + return SelectPeer(host, proto, specificPeers, log) + case LowestRTT: + //TODO: Move this to peer-manager + return SelectPeerWithLowestRTT(context.Background(), host, proto, specificPeers, log) + default: + return "", errors.New("unknown peer selection type specified") + } +} From f1cfc481a0aa83a4d97c32e206cb4da590b130c2 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Fri, 29 Sep 2023 14:39:35 +0530 Subject: [PATCH 02/13] Update peer selection options for filter --- waku/v2/protocol/filter/client.go | 26 +++++++++++++---- waku/v2/protocol/filter/options.go | 29 +++++-------------- .../lightpush/waku_lightpush_option.go | 4 +-- 3 files changed, 31 insertions(+), 28 deletions(-) diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index b36f05671..60df00ac4 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -23,6 +23,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/protocol/subscription" "github.com/waku-org/go-waku/waku/v2/timesource" + "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" ) @@ -266,11 +267,6 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot opt(params) } - if params.selectedPeer == "" { - wf.metrics.RecordError(peerNotFoundFailure) - return nil, ErrNoPeersAvailable - } - pubSubTopicMap, err := contentFilterToPubSubTopicMap(contentFilter) if err != nil { return nil, err @@ -278,14 +274,34 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot failedContentTopics := []string{} subscriptions := make([]*subscription.SubscriptionDetails, 0) for pubSubTopic, cTopics := range pubSubTopicMap { + + //This condition is hacky and will be fixed later, + // once RTT based monitoring is also available in peer-manager + //TO Optimize: find a peer with all pubSubTopics in the list if possible, if not only then look for single pubSubTopic + if params.pm == nil || params.peerSelectionType == utils.LowestRTT { + params.selectedPeer, err = utils.HandlePeerSelection(params.peerSelectionType, params.host, FilterSubscribeID_v20beta1, params.preferredPeers, params.log) + } else { + params.selectedPeer, err = wf.pm.HandlePeerSelection(params.peerSelectionType, FilterSubscribeID_v20beta1, pubSubTopic, params.preferredPeers...) + } + + if params.selectedPeer == "" { + wf.metrics.RecordError(peerNotFoundFailure) + wf.log.Error("Failed to find peer for Filter subscribe", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics), + zap.Error(err)) + failedContentTopics = append(failedContentTopics, cTopics...) + continue + } + var cFilter protocol.ContentFilter cFilter.PubsubTopic = pubSubTopic cFilter.ContentTopics = protocol.NewContentTopicSet(cTopics...) + err := wf.request(ctx, params, pb.FilterSubscribeRequest_SUBSCRIBE, cFilter) if err != nil { wf.log.Error("Failed to subscribe", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics), zap.Error(err)) failedContentTopics = append(failedContentTopics, cTopics...) + continue } subscriptions = append(subscriptions, wf.subscriptions.NewSubscription(params.selectedPeer, cFilter)) } diff --git a/waku/v2/protocol/filter/options.go b/waku/v2/protocol/filter/options.go index 036165fbd..870b8133e 100644 --- a/waku/v2/protocol/filter/options.go +++ b/waku/v2/protocol/filter/options.go @@ -15,9 +15,11 @@ import ( type ( FilterSubscribeParameters struct { - selectedPeer peer.ID - requestID []byte - log *zap.Logger + selectedPeer peer.ID + peerSelectionType utils.PeerSelection + preferredPeers peer.IDSlice + requestID []byte + log *zap.Logger // Subscribe-specific host host.Host @@ -55,18 +57,8 @@ func WithPeer(p peer.ID) FilterSubscribeOption { // supports the chosen protocol, otherwise it will chose a peer from the node peerstore func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) FilterSubscribeOption { return func(params *FilterSubscribeParameters) { - var p peer.ID - var err error - if params.pm == nil { - p, err = utils.SelectPeer(params.host, FilterSubscribeID_v20beta1, fromThesePeers, params.log) - } else { - p, err = params.pm.SelectPeer(FilterSubscribeID_v20beta1, "", fromThesePeers...) - } - if err == nil { - params.selectedPeer = p - } else { - params.log.Info("selecting peer", zap.Error(err)) - } + params.peerSelectionType = utils.Automatic + params.preferredPeers = fromThesePeers } } @@ -76,12 +68,7 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) FilterSubscribeOption // peer from the node peerstore func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) FilterSubscribeOption { return func(params *FilterSubscribeParameters) { - p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, FilterSubscribeID_v20beta1, fromThesePeers, params.log) - if err == nil { - params.selectedPeer = p - } else { - params.log.Info("selecting peer", zap.Error(err)) - } + params.peerSelectionType = utils.LowestRTT } } diff --git a/waku/v2/protocol/lightpush/waku_lightpush_option.go b/waku/v2/protocol/lightpush/waku_lightpush_option.go index 8d4263892..0078e3978 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_option.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_option.go @@ -14,12 +14,12 @@ import ( type lightPushParameters struct { host host.Host selectedPeer peer.ID + peerSelectionType utils.PeerSelection + preferredPeers peer.IDSlice requestID []byte pm *peermanager.PeerManager log *zap.Logger pubsubTopic string - peerSelectionType utils.PeerSelection - preferredPeers peer.IDSlice } // Option is the type of options accepted when performing LightPush protocol requests From 6aa229b1ee34e52726ebf89be38d6ca493edbefa Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Tue, 3 Oct 2023 14:41:53 +0530 Subject: [PATCH 03/13] move to use peer selection functionality from peer manager --- waku/v2/node/wakunode2.go | 2 + waku/v2/node/wakunode2_test.go | 2 +- waku/v2/peermanager/peer_manager.go | 175 ++++++++++++++++-- waku/v2/peermanager/peer_manager_test.go | 35 ++-- waku/v2/peermanager/service_slot.go | 3 +- waku/v2/peermanager/service_slot_test.go | 7 +- waku/v2/protocol/filter/client.go | 10 +- waku/v2/protocol/filter/options.go | 7 +- waku/v2/protocol/legacy_filter/waku_filter.go | 7 +- .../legacy_filter/waku_filter_option.go | 32 ++-- waku/v2/protocol/lightpush/waku_lightpush.go | 22 +-- .../lightpush/waku_lightpush_option.go | 7 +- .../protocol/lightpush/waku_lightpush_test.go | 6 +- .../waku_peer_exchange_option.go | 35 ++-- .../peer_exchange/waku_peer_exchange_test.go | 2 +- waku/v2/protocol/store/waku_store_client.go | 44 ++--- .../store/waku_store_protocol_test.go | 12 +- waku/v2/utils/peer.go | 162 ---------------- waku/v2/utils/test/peer_test.go | 83 --------- 19 files changed, 274 insertions(+), 379 deletions(-) delete mode 100644 waku/v2/utils/test/peer_test.go diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 1035ed9fc..6e2fb1b48 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -283,6 +283,8 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { } } + w.opts.legacyFilterOpts = append(w.opts.legacyFilterOpts, legacy_filter.WithPeerManager(w.peermanager)) + w.legacyFilter = legacy_filter.NewWakuFilter(w.bcaster, w.opts.isLegacyFilterFullNode, w.timesource, w.opts.prometheusReg, w.log, w.opts.legacyFilterOpts...) w.filterFullNode = filter.NewWakuFilterFullNode(w.timesource, w.opts.prometheusReg, w.log, w.opts.filterOpts...) w.filterLightNode = filter.NewWakuFilterLightNode(w.bcaster, w.peermanager, w.timesource, w.opts.prometheusReg, w.log) diff --git a/waku/v2/node/wakunode2_test.go b/waku/v2/node/wakunode2_test.go index cd5c17f84..303aeffff 100644 --- a/waku/v2/node/wakunode2_test.go +++ b/waku/v2/node/wakunode2_test.go @@ -263,7 +263,7 @@ func TestDecoupledStoreFromRelay(t *testing.T) { _, filter, err := wakuNode2.LegacyFilter().Subscribe(ctx, legacy_filter.ContentFilter{ Topic: string(relay.DefaultWakuTopic), - }) + }, legacy_filter.WithPeer(wakuNode1.host.ID())) require.NoError(t, err) // Sleep to make sure the filter is subscribed diff --git a/waku/v2/peermanager/peer_manager.go b/waku/v2/peermanager/peer_manager.go index 967dfb944..0ce1176b0 100644 --- a/waku/v2/peermanager/peer_manager.go +++ b/waku/v2/peermanager/peer_manager.go @@ -3,6 +3,7 @@ package peermanager import ( "context" "errors" + "math/rand" "sync" "time" @@ -13,13 +14,13 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" "github.com/libp2p/go-libp2p/core/protocol" + "github.com/libp2p/go-libp2p/p2p/protocol/ping" ma "github.com/multiformats/go-multiaddr" "github.com/waku-org/go-waku/logging" wps "github.com/waku-org/go-waku/waku/v2/peerstore" waku_proto "github.com/waku-org/go-waku/waku/v2/protocol" wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr" "github.com/waku-org/go-waku/waku/v2/protocol/relay" - "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" ) @@ -45,6 +46,18 @@ type PeerManager struct { subRelayTopics map[string]*NodeTopicDetails } +type PeerSelection int + +const ( + Unknown PeerSelection = iota + Automatic + LowestRTT +) + +// ErrNoPeersAvailable is emitted when no suitable peers are found for +// some protocol +var ErrNoPeersAvailable = errors.New("no suitable peers found") + const peerConnectivityLoopSecs = 15 const maxConnsToPeerRatio = 5 @@ -161,10 +174,10 @@ func (pm *PeerManager) getRelayPeers(specificPeers ...peer.ID) (inRelayPeers pee //Need to filter peers to check if they support relay if inPeers.Len() != 0 { - inRelayPeers, _ = utils.FilterPeersByProto(pm.host, inPeers, relay.WakuRelayID_v200) + inRelayPeers, _ = pm.FilterPeersByProto(inPeers, relay.WakuRelayID_v200) } if outPeers.Len() != 0 { - outRelayPeers, _ = utils.FilterPeersByProto(pm.host, outPeers, relay.WakuRelayID_v200) + outRelayPeers, _ = pm.FilterPeersByProto(outPeers, relay.WakuRelayID_v200) } return } @@ -426,7 +439,7 @@ func (pm *PeerManager) SelectPeerByContentTopic(proto protocol.ID, contentTopic if err != nil { return "", err } - return pm.SelectPeer(proto, pubsubTopic, specificPeers...) + return pm.SelectPeer(PeerSelectionCriteria{PubsubTopic: pubsubTopic, Proto: proto, SpecificPeers: specificPeers}) } // SelectPeer is used to return a random peer that supports a given protocol. @@ -434,26 +447,26 @@ func (pm *PeerManager) SelectPeerByContentTopic(proto protocol.ID, contentTopic // it supports the chosen protocol, otherwise it will chose a peer from the service slot. // If a peer cannot be found in the service slot, a peer will be selected from node peerstore // if pubSubTopic is specified, peer is selected from list that support the pubSubTopic -func (pm *PeerManager) SelectPeer(proto protocol.ID, pubSubTopic string, specificPeers ...peer.ID) (peer.ID, error) { +func (pm *PeerManager) SelectPeer(criteria PeerSelectionCriteria) (peer.ID, error) { // @TODO We need to be more strategic about which peers we dial. Right now we just set one on the service. // Ideally depending on the query and our set of peers we take a subset of ideal peers. // This will require us to check for various factors such as: // - which topics they track // - latency? - if peerID := pm.selectServicePeer(proto, pubSubTopic, specificPeers...); peerID != nil { + if peerID := pm.selectServicePeer(criteria.Proto, criteria.PubsubTopic, criteria.SpecificPeers...); peerID != nil { return *peerID, nil } // if not found in serviceSlots or proto == WakuRelayIDv200 - filteredPeers, err := utils.FilterPeersByProto(pm.host, specificPeers, proto) + filteredPeers, err := pm.FilterPeersByProto(criteria.SpecificPeers, criteria.Proto) if err != nil { return "", err } - if pubSubTopic != "" { - filteredPeers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopic(pubSubTopic, filteredPeers...) + if criteria.PubsubTopic != "" { + filteredPeers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopic(criteria.PubsubTopic, filteredPeers...) } - return utils.SelectRandomPeer(filteredPeers, pm.logger) + return selectRandomPeer(filteredPeers, pm.logger) } func (pm *PeerManager) selectServicePeer(proto protocol.ID, pubSubTopic string, specificPeers ...peer.ID) (peerIDPtr *peer.ID) { @@ -473,7 +486,7 @@ func (pm *PeerManager) selectServicePeer(proto protocol.ID, pubSubTopic string, keys = append(keys, i) } selectedPeers := pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopic(pubSubTopic, keys...) - peerID, err := utils.SelectRandomPeer(selectedPeers, pm.logger) + peerID, err := selectRandomPeer(selectedPeers, pm.logger) if err == nil { peerIDPtr = &peerID } else { @@ -484,16 +497,140 @@ func (pm *PeerManager) selectServicePeer(proto protocol.ID, pubSubTopic string, return } -func (pm *PeerManager) HandlePeerSelection(selectionType utils.PeerSelection, proto protocol.ID, - pubSubTopic string, specificPeers ...peer.ID) (peer.ID, error) { +type PeerSelectionCriteria struct { + SelectionType PeerSelection + Proto protocol.ID + PubsubTopic string + SpecificPeers peer.IDSlice + Ctx context.Context +} - switch selectionType { - case utils.Automatic: - return pm.SelectPeer(proto, pubSubTopic, specificPeers...) - case utils.LowestRTT: - //TODO: Move this to peer-manager - return utils.SelectPeerWithLowestRTT(context.Background(), pm.host, proto, specificPeers, pm.logger) +// HandlePeerSelection selects a peer based on selectionType specified. +// Context is required only in case of selectionType set to LowestRTT +func (pm *PeerManager) HandlePeerSelection(criteria PeerSelectionCriteria) (peer.ID, error) { + + switch criteria.SelectionType { + case Automatic: + return pm.SelectPeer(criteria) + case LowestRTT: + if criteria.Ctx == nil { + criteria.Ctx = context.Background() + pm.logger.Warn("context is not passed for peerSelectionwithRTT, using background context") + } + return pm.SelectPeerWithLowestRTT(criteria) default: return "", errors.New("unknown peer selection type specified") } } + +type pingResult struct { + p peer.ID + rtt time.Duration +} + +// SelectPeerWithLowestRTT will select a peer that supports a specific protocol with the lowest reply time +// If a list of specific peers is passed, the peer will be chosen from that list assuming +// it supports the chosen protocol, otherwise it will chose a peer from the node peerstore +// TO OPTIMIZE: As of now the peer with lowest RTT is identified when select is called, this should be optimized +// to maintain the RTT as part of peer-scoring and just select based on that. +func (pm *PeerManager) SelectPeerWithLowestRTT(criteria PeerSelectionCriteria) (peer.ID, error) { + var peers peer.IDSlice + if criteria.Ctx == nil { + criteria.Ctx = context.Background() + } + peerSet := criteria.SpecificPeers + if len(peerSet) == 0 { + peerSet = pm.host.Peerstore().Peers() + } + + for _, peer := range peerSet { + protocols, err := pm.host.Peerstore().SupportsProtocols(peer, criteria.Proto) + if err != nil { + return "", err + } + + if len(protocols) > 0 { + peers = append(peers, peer) + } + } + + wg := sync.WaitGroup{} + waitCh := make(chan struct{}) + pingCh := make(chan pingResult, 1000) + + wg.Add(len(peers)) + + go func() { + for _, p := range peers { + go func(p peer.ID) { + defer wg.Done() + ctx, cancel := context.WithTimeout(criteria.Ctx, 3*time.Second) + defer cancel() + result := <-ping.Ping(ctx, pm.host, p) + if result.Error == nil { + pingCh <- pingResult{ + p: p, + rtt: result.RTT, + } + } + }(p) + } + wg.Wait() + close(waitCh) + close(pingCh) + }() + + select { + case <-waitCh: + var min *pingResult + for p := range pingCh { + if min == nil { + min = &p + } else { + if p.rtt < min.rtt { + min = &p + } + } + } + if min == nil { + return "", ErrNoPeersAvailable + } + + return min.p, nil + case <-criteria.Ctx.Done(): + return "", ErrNoPeersAvailable + } +} + +// selectRandomPeer selects randomly a peer from the list of peers passed. +func selectRandomPeer(peers peer.IDSlice, log *zap.Logger) (peer.ID, error) { + if len(peers) >= 1 { + peerID := peers[rand.Intn(len(peers))] + // TODO: proper heuristic here that compares peer scores and selects "best" one. For now a random peer for the given protocol is returned + return peerID, nil // nolint: gosec + } + + return "", ErrNoPeersAvailable +} + +// FilterPeersByProto filters list of peers that support specified protocols. +// If specificPeers is nil, all peers in the host's peerStore are considered for filtering. +func (pm *PeerManager) FilterPeersByProto(specificPeers peer.IDSlice, proto ...protocol.ID) (peer.IDSlice, error) { + peerSet := specificPeers + if len(peerSet) == 0 { + peerSet = pm.host.Peerstore().Peers() + } + + var peers peer.IDSlice + for _, peer := range peerSet { + protocols, err := pm.host.Peerstore().SupportsProtocols(peer, proto...) + if err != nil { + return nil, err + } + + if len(protocols) > 0 { + peers = append(peers, peer) + } + } + return peers, nil +} diff --git a/waku/v2/peermanager/peer_manager_test.go b/waku/v2/peermanager/peer_manager_test.go index ce42ab432..6977fae0a 100644 --- a/waku/v2/peermanager/peer_manager_test.go +++ b/waku/v2/peermanager/peer_manager_test.go @@ -66,7 +66,7 @@ func TestServiceSlots(t *testing.T) { /////////////// // select peer from pm, currently only h2 is set in pm - peerID, err := pm.SelectPeer(protocol, "") + peerID, err := pm.SelectPeer(PeerSelectionCriteria{Proto: protocol}) require.NoError(t, err) require.Equal(t, peerID, h2.ID()) @@ -75,7 +75,7 @@ func TestServiceSlots(t *testing.T) { require.NoError(t, err) // check that returned peer is h2 or h3 peer - peerID, err = pm.SelectPeer(protocol, "") + peerID, err = pm.SelectPeer(PeerSelectionCriteria{Proto: protocol}) require.NoError(t, err) if peerID == h2.ID() || peerID == h3.ID() { //Test success @@ -91,15 +91,15 @@ func TestServiceSlots(t *testing.T) { require.NoError(t, err) defer h4.Close() - _, err = pm.SelectPeer(protocol1, "") - require.Error(t, err, utils.ErrNoPeersAvailable) + _, err = pm.SelectPeer(PeerSelectionCriteria{Proto: protocol1}) + require.Error(t, err, ErrNoPeersAvailable) // add h4 peer for protocol1 _, err = pm.AddPeer(getAddr(h4), wps.Static, []string{""}, libp2pProtocol.ID(protocol1)) require.NoError(t, err) //Test peer selection for protocol1 - peerID, err = pm.SelectPeer(protocol1, "") + peerID, err = pm.SelectPeer(PeerSelectionCriteria{Proto: protocol1}) require.NoError(t, err) require.Equal(t, peerID, h4.ID()) @@ -127,19 +127,22 @@ func TestPeerSelection(t *testing.T) { _, err = pm.AddPeer(getAddr(h3), wps.Static, []string{"/waku/rs/2/1"}, libp2pProtocol.ID(protocol)) require.NoError(t, err) - _, err = pm.SelectPeer(protocol, "") + _, err = pm.SelectPeer(PeerSelectionCriteria{Proto: protocol}) require.NoError(t, err) - peerID, err := pm.SelectPeer(protocol, "/waku/rs/2/2") + peerID, err := pm.SelectPeer(PeerSelectionCriteria{Proto: protocol, PubsubTopic: "/waku/rs/2/2"}) require.NoError(t, err) require.Equal(t, h2.ID(), peerID) - _, err = pm.SelectPeer(protocol, "/waku/rs/2/3") - require.Error(t, utils.ErrNoPeersAvailable, err) + _, err = pm.SelectPeer(PeerSelectionCriteria{Proto: protocol, PubsubTopic: "/waku/rs/2/3"}) + require.Error(t, ErrNoPeersAvailable, err) - _, err = pm.SelectPeer(protocol, "/waku/rs/2/1") + _, err = pm.SelectPeer(PeerSelectionCriteria{Proto: protocol, PubsubTopic: "/waku/rs/2/1"}) require.NoError(t, err) + //Test for selectWithLowestRTT + pm.SelectPeerWithLowestRTT(PeerSelectionCriteria{Proto: protocol, PubsubTopic: "/waku/rs/2/1"}) + require.NoError(t, err) } func TestDefaultProtocol(t *testing.T) { @@ -149,8 +152,8 @@ func TestDefaultProtocol(t *testing.T) { // check peer for default protocol /////////////// //Test empty peer selection for relay protocol - _, err := pm.SelectPeer(relay.WakuRelayID_v200, "") - require.Error(t, err, utils.ErrNoPeersAvailable) + _, err := pm.SelectPeer(PeerSelectionCriteria{Proto: relay.WakuRelayID_v200}) + require.Error(t, err, ErrNoPeersAvailable) /////////////// // getting peer for default protocol @@ -164,7 +167,7 @@ func TestDefaultProtocol(t *testing.T) { require.NoError(t, err) // since we are not passing peerList, selectPeer fn using filterByProto checks in PeerStore for peers with same protocol. - peerID, err := pm.SelectPeer(relay.WakuRelayID_v200, "") + peerID, err := pm.SelectPeer(PeerSelectionCriteria{Proto: relay.WakuRelayID_v200}) require.NoError(t, err) require.Equal(t, peerID, h5.ID()) } @@ -184,13 +187,13 @@ func TestAdditionAndRemovalOfPeer(t *testing.T) { _, err = pm.AddPeer(getAddr(h6), wps.Static, []string{""}, protocol2) require.NoError(t, err) - peerID, err := pm.SelectPeer(protocol2, "") + peerID, err := pm.SelectPeer(PeerSelectionCriteria{Proto: protocol2}) require.NoError(t, err) require.Equal(t, peerID, h6.ID()) pm.RemovePeer(peerID) - _, err = pm.SelectPeer(protocol2, "") - require.Error(t, err, utils.ErrNoPeersAvailable) + _, err = pm.SelectPeer(PeerSelectionCriteria{Proto: protocol2}) + require.Error(t, err, ErrNoPeersAvailable) } func TestConnectToRelayPeers(t *testing.T) { diff --git a/waku/v2/peermanager/service_slot.go b/waku/v2/peermanager/service_slot.go index c88711e76..df63b7ac2 100644 --- a/waku/v2/peermanager/service_slot.go +++ b/waku/v2/peermanager/service_slot.go @@ -6,7 +6,6 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/relay" - "github.com/waku-org/go-waku/waku/v2/utils" ) type peerMap struct { @@ -26,7 +25,7 @@ func (pm *peerMap) getRandom() (peer.ID, error) { for pID := range pm.m { return pID, nil } - return "", utils.ErrNoPeersAvailable + return "", ErrNoPeersAvailable } diff --git a/waku/v2/peermanager/service_slot_test.go b/waku/v2/peermanager/service_slot_test.go index 381a14252..493685a71 100644 --- a/waku/v2/peermanager/service_slot_test.go +++ b/waku/v2/peermanager/service_slot_test.go @@ -6,7 +6,6 @@ import ( "github.com/libp2p/go-libp2p/core/peer" libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol" "github.com/stretchr/testify/require" - "github.com/waku-org/go-waku/waku/v2/utils" ) func TestServiceSlot(t *testing.T) { @@ -27,7 +26,7 @@ func TestServiceSlot(t *testing.T) { slots.getPeers(protocol).remove(peerID) // _, err = slots.getPeers(protocol).getRandom() - require.Equal(t, err, utils.ErrNoPeersAvailable) + require.Equal(t, err, ErrNoPeersAvailable) } func TestServiceSlotRemovePeerFromAll(t *testing.T) { @@ -50,7 +49,7 @@ func TestServiceSlotRemovePeerFromAll(t *testing.T) { slots.removePeer(peerID) // _, err = slots.getPeers(protocol).getRandom() - require.Equal(t, err, utils.ErrNoPeersAvailable) + require.Equal(t, err, ErrNoPeersAvailable) _, err = slots.getPeers(protocol1).getRandom() - require.Equal(t, err, utils.ErrNoPeersAvailable) + require.Equal(t, err, ErrNoPeersAvailable) } diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index 60df00ac4..f0e6d1354 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -23,7 +23,6 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/protocol/subscription" "github.com/waku-org/go-waku/waku/v2/timesource" - "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" ) @@ -275,13 +274,10 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot subscriptions := make([]*subscription.SubscriptionDetails, 0) for pubSubTopic, cTopics := range pubSubTopicMap { - //This condition is hacky and will be fixed later, - // once RTT based monitoring is also available in peer-manager //TO Optimize: find a peer with all pubSubTopics in the list if possible, if not only then look for single pubSubTopic - if params.pm == nil || params.peerSelectionType == utils.LowestRTT { - params.selectedPeer, err = utils.HandlePeerSelection(params.peerSelectionType, params.host, FilterSubscribeID_v20beta1, params.preferredPeers, params.log) - } else { - params.selectedPeer, err = wf.pm.HandlePeerSelection(params.peerSelectionType, FilterSubscribeID_v20beta1, pubSubTopic, params.preferredPeers...) + if params.pm != nil { + params.selectedPeer, err = wf.pm.HandlePeerSelection(peermanager.PeerSelectionCriteria{SelectionType: params.peerSelectionType, + Proto: FilterSubscribeID_v20beta1, PubsubTopic: pubSubTopic, SpecificPeers: params.preferredPeers, Ctx: ctx}) } if params.selectedPeer == "" { diff --git a/waku/v2/protocol/filter/options.go b/waku/v2/protocol/filter/options.go index 870b8133e..aaf6d245d 100644 --- a/waku/v2/protocol/filter/options.go +++ b/waku/v2/protocol/filter/options.go @@ -9,14 +9,13 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/waku-org/go-waku/waku/v2/peermanager" "github.com/waku-org/go-waku/waku/v2/protocol" - "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" ) type ( FilterSubscribeParameters struct { selectedPeer peer.ID - peerSelectionType utils.PeerSelection + peerSelectionType peermanager.PeerSelection preferredPeers peer.IDSlice requestID []byte log *zap.Logger @@ -57,7 +56,7 @@ func WithPeer(p peer.ID) FilterSubscribeOption { // supports the chosen protocol, otherwise it will chose a peer from the node peerstore func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) FilterSubscribeOption { return func(params *FilterSubscribeParameters) { - params.peerSelectionType = utils.Automatic + params.peerSelectionType = peermanager.Automatic params.preferredPeers = fromThesePeers } } @@ -68,7 +67,7 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) FilterSubscribeOption // peer from the node peerstore func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) FilterSubscribeOption { return func(params *FilterSubscribeParameters) { - params.peerSelectionType = utils.LowestRTT + params.peerSelectionType = peermanager.LowestRTT } } diff --git a/waku/v2/protocol/legacy_filter/waku_filter.go b/waku/v2/protocol/legacy_filter/waku_filter.go index e28a0729b..000d7cdf3 100644 --- a/waku/v2/protocol/legacy_filter/waku_filter.go +++ b/waku/v2/protocol/legacy_filter/waku_filter.go @@ -13,6 +13,7 @@ import ( "github.com/libp2p/go-msgio/pbio" "github.com/prometheus/client_golang/prometheus" "github.com/waku-org/go-waku/logging" + "github.com/waku-org/go-waku/waku/v2/peermanager" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter/pb" wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb" @@ -48,6 +49,7 @@ type ( WakuFilter struct { *protocol.CommonService h host.Host + pm *peermanager.PeerManager isFullNode bool msgSub relay.Subscription metrics Metrics @@ -237,7 +239,10 @@ func (wf *WakuFilter) requestSubscription(ctx context.Context, filter ContentFil for _, opt := range optList { opt(params) } - + if wf.pm != nil { + params.selectedPeer, _ = wf.pm.HandlePeerSelection(peermanager.PeerSelectionCriteria{SelectionType: params.peerSelectionType, + Proto: FilterID_v20beta1, PubsubTopic: filter.Topic, SpecificPeers: params.preferredPeers, Ctx: ctx}) + } if params.selectedPeer == "" { wf.metrics.RecordError(peerNotFoundFailure) return nil, ErrNoPeersAvailable diff --git a/waku/v2/protocol/legacy_filter/waku_filter_option.go b/waku/v2/protocol/legacy_filter/waku_filter_option.go index 8b548b64c..1fd67b628 100644 --- a/waku/v2/protocol/legacy_filter/waku_filter_option.go +++ b/waku/v2/protocol/legacy_filter/waku_filter_option.go @@ -6,21 +6,24 @@ import ( "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" - "github.com/waku-org/go-waku/waku/v2/utils" + "github.com/waku-org/go-waku/waku/v2/peermanager" "go.uber.org/zap" ) type ( FilterSubscribeParameters struct { - host host.Host - selectedPeer peer.ID - log *zap.Logger + host host.Host + selectedPeer peer.ID + peerSelectionType peermanager.PeerSelection + preferredPeers peer.IDSlice + log *zap.Logger } FilterSubscribeOption func(*FilterSubscribeParameters) FilterParameters struct { Timeout time.Duration + pm *peermanager.PeerManager } Option func(*FilterParameters) @@ -32,6 +35,12 @@ func WithTimeout(timeout time.Duration) Option { } } +func WithPeerManager(pm *peermanager.PeerManager) Option { + return func(params *FilterParameters) { + params.pm = pm + } +} + func WithPeer(p peer.ID) FilterSubscribeOption { return func(params *FilterSubscribeParameters) { params.selectedPeer = p @@ -43,12 +52,8 @@ func WithPeer(p peer.ID) FilterSubscribeOption { // supports the chosen protocol, otherwise it will chose a peer from the node peerstore func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) FilterSubscribeOption { return func(params *FilterSubscribeParameters) { - p, err := utils.SelectPeer(params.host, FilterID_v20beta1, fromThesePeers, params.log) - if err == nil { - params.selectedPeer = p - } else { - params.log.Info("selecting peer", zap.Error(err)) - } + params.peerSelectionType = peermanager.Automatic + params.preferredPeers = fromThesePeers } } @@ -58,12 +63,7 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) FilterSubscribeOption // peer from the node peerstore func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) FilterSubscribeOption { return func(params *FilterSubscribeParameters) { - p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, FilterID_v20beta1, fromThesePeers, params.log) - if err == nil { - params.selectedPeer = p - } else { - params.log.Info("selecting peer", zap.Error(err)) - } + params.peerSelectionType = peermanager.LowestRTT } } diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index 1eef2f92c..909ac999c 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -17,7 +17,6 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/lightpush/pb" wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/relay" - "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" ) @@ -225,17 +224,16 @@ func (wakuLP *WakuLightPush) handleOpts(ctx context.Context, message *wpb.WakuMe } } - //This condition is hacky and will be fixed later, - // once RTT based monitoring is also available in peer-manager - if params.pm == nil || params.peerSelectionType == utils.LowestRTT { - params.selectedPeer, err = utils.HandlePeerSelection(params.peerSelectionType, params.host, LightPushID_v20beta1, params.preferredPeers, params.log) - } else { - params.selectedPeer, err = wakuLP.pm.HandlePeerSelection(params.peerSelectionType, LightPushID_v20beta1, params.pubsubTopic, params.preferredPeers...) - } - if err != nil { - params.log.Error("selecting peer", zap.Error(err)) - wakuLP.metrics.RecordError(peerNotFoundFailure) - return nil, ErrNoPeersAvailable + if params.pm != nil { + params.selectedPeer, err = wakuLP.pm.HandlePeerSelection( + peermanager.PeerSelectionCriteria{SelectionType: params.peerSelectionType, + Proto: LightPushID_v20beta1, PubsubTopic: params.pubsubTopic, + SpecificPeers: params.preferredPeers, Ctx: ctx}) + if err != nil { + params.log.Error("selecting peer", zap.Error(err)) + wakuLP.metrics.RecordError(peerNotFoundFailure) + return nil, ErrNoPeersAvailable + } } return params, nil } diff --git a/waku/v2/protocol/lightpush/waku_lightpush_option.go b/waku/v2/protocol/lightpush/waku_lightpush_option.go index 0078e3978..f239af654 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_option.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_option.go @@ -7,14 +7,13 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/waku-org/go-waku/waku/v2/peermanager" "github.com/waku-org/go-waku/waku/v2/protocol" - "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" ) type lightPushParameters struct { host host.Host selectedPeer peer.ID - peerSelectionType utils.PeerSelection + peerSelectionType peermanager.PeerSelection preferredPeers peer.IDSlice requestID []byte pm *peermanager.PeerManager @@ -38,7 +37,7 @@ func WithPeer(p peer.ID) Option { // from the node peerstore func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) Option { return func(params *lightPushParameters) { - params.peerSelectionType = utils.Automatic + params.peerSelectionType = peermanager.Automatic params.preferredPeers = fromThesePeers } } @@ -55,7 +54,7 @@ func WithPubSubTopic(pubsubTopic string) Option { // from the node peerstore func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) Option { return func(params *lightPushParameters) { - params.peerSelectionType = utils.LowestRTT + params.peerSelectionType = peermanager.LowestRTT } } diff --git a/waku/v2/protocol/lightpush/waku_lightpush_test.go b/waku/v2/protocol/lightpush/waku_lightpush_test.go index 5a9191949..e0bda40f8 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_test.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_test.go @@ -106,6 +106,7 @@ func TestWakuLightPush(t *testing.T) { var lpOptions []Option lpOptions = append(lpOptions, WithPubSubTopic(testTopic)) + lpOptions = append(lpOptions, WithPeer(host2.ID())) // Checking that msg hash is correct hash, err := client.PublishToTopic(ctx, msg2, lpOptions...) @@ -215,9 +216,10 @@ func TestWakuLightPushAutoSharding(t *testing.T) { <-sub2.Ch }() - + var lpOptions []Option + lpOptions = append(lpOptions, WithPeer(host2.ID())) // Verifying successful request - hash1, err := client.Publish(ctx, msg1) + hash1, err := client.Publish(ctx, msg1, lpOptions...) require.NoError(t, err) require.Equal(t, protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), string(pubSubTopic)).Hash(), hash1) diff --git a/waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go b/waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go index 704e0d21b..844eb914f 100644 --- a/waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go +++ b/waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go @@ -6,7 +6,6 @@ import ( "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" "github.com/waku-org/go-waku/waku/v2/peermanager" - "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" ) @@ -26,23 +25,23 @@ func WithPeer(p peer.ID) PeerExchangeOption { } } -// WithAutomaticPeerSelection is an option used to randomly select a peer from the peer store +// WithAutomaticPeerSelection is an option used to randomly select a peer from the Waku peer store // to obtains peers from. If a list of specific peers is passed, the peer will be chosen // from that list assuming it supports the chosen protocol, otherwise it will chose a peer // from the node peerstore +// Note: this option can only be used if WakuNode is initialized which internally intializes the peerManager func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) PeerExchangeOption { return func(params *PeerExchangeParameters) { - var p peer.ID - var err error if params.pm == nil { - p, err = utils.SelectPeer(params.host, PeerExchangeID_v20alpha1, fromThesePeers, params.log) + params.log.Info("automatic selection is not avaiable since peerManager is not initialized") } else { - p, err = params.pm.SelectPeer(PeerExchangeID_v20alpha1, "", fromThesePeers...) - } - if err == nil { - params.selectedPeer = p - } else { - params.log.Info("selecting peer", zap.Error(err)) + p, err := params.pm.SelectPeer(peermanager.PeerSelectionCriteria{Proto: PeerExchangeID_v20alpha1, + SpecificPeers: fromThesePeers}) + if err == nil { + params.selectedPeer = p + } else { + params.log.Info("selecting peer", zap.Error(err)) + } } } } @@ -53,11 +52,17 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) PeerExchangeOption { // from the node peerstore func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) PeerExchangeOption { return func(params *PeerExchangeParameters) { - p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, PeerExchangeID_v20alpha1, fromThesePeers, params.log) - if err == nil { - params.selectedPeer = p + if params.pm == nil { + params.log.Info("automatic selection is not avaiable since peerManager is not initialized") } else { - params.log.Info("selecting peer", zap.Error(err)) + p, err := params.pm.SelectPeerWithLowestRTT( + peermanager.PeerSelectionCriteria{Proto: PeerExchangeID_v20alpha1, + SpecificPeers: fromThesePeers, Ctx: ctx}) + if err == nil { + params.selectedPeer = p + } else { + params.log.Info("selecting peer", zap.Error(err)) + } } } } diff --git a/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go b/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go index 93e4f0a1d..ca64b55f0 100644 --- a/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go +++ b/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go @@ -163,7 +163,7 @@ func TestRetrieveProvidePeerExchangePeers(t *testing.T) { err = host3.Peerstore().AddProtocols(host1.ID(), PeerExchangeID_v20alpha1) require.NoError(t, err) - err = px3.Request(context.Background(), 1) + err = px3.Request(context.Background(), 1, WithPeer(host1.ID())) require.NoError(t, err) time.Sleep(3 * time.Second) // Give the algorithm some time to work its magic diff --git a/waku/v2/protocol/store/waku_store_client.go b/waku/v2/protocol/store/waku_store_client.go index 3855ff6a9..6575974e2 100644 --- a/waku/v2/protocol/store/waku_store_client.go +++ b/waku/v2/protocol/store/waku_store_client.go @@ -11,10 +11,10 @@ import ( "go.uber.org/zap" "github.com/waku-org/go-waku/logging" + "github.com/waku-org/go-waku/waku/v2/peermanager" "github.com/waku-org/go-waku/waku/v2/protocol" wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" - "github.com/waku-org/go-waku/waku/v2/utils" ) type Query struct { @@ -81,12 +81,14 @@ func (r *Result) GetMessages() []*wpb.WakuMessage { type criteriaFN = func(msg *wpb.WakuMessage) (bool, error) type HistoryRequestParameters struct { - selectedPeer peer.ID - localQuery bool - requestID []byte - cursor *pb.Index - pageSize uint64 - asc bool + selectedPeer peer.ID + peerSelectionType peermanager.PeerSelection + preferredPeers peer.IDSlice + localQuery bool + requestID []byte + cursor *pb.Index + pageSize uint64 + asc bool s *WakuStore } @@ -104,20 +106,11 @@ func WithPeer(p peer.ID) HistoryRequestOption { // to request the message history. If a list of specific peers is passed, the peer will be chosen // from that list assuming it supports the chosen protocol, otherwise it will chose a peer // from the node peerstore +// Note: This option is avaiable only with peerManager func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) HistoryRequestOption { return func(params *HistoryRequestParameters) { - var p peer.ID - var err error - if params.s.pm == nil { - p, err = utils.SelectPeer(params.s.h, StoreID_v20beta4, fromThesePeers, params.s.log) - } else { - p, err = params.s.pm.SelectPeer(StoreID_v20beta4, "", fromThesePeers...) - } - if err == nil { - params.selectedPeer = p - } else { - params.s.log.Info("selecting peer", zap.Error(err)) - } + params.peerSelectionType = peermanager.Automatic + params.preferredPeers = fromThesePeers } } @@ -125,14 +118,10 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) HistoryRequestOption // with the lowest ping. If a list of specific peers is passed, the peer will be chosen // from that list assuming it supports the chosen protocol, otherwise it will chose a peer // from the node peerstore +// Note: This option is avaiable only with peerManager func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) HistoryRequestOption { return func(params *HistoryRequestParameters) { - p, err := utils.SelectPeerWithLowestRTT(ctx, params.s.h, StoreID_v20beta4, fromThesePeers, params.s.log) - if err == nil { - params.selectedPeer = p - } else { - params.s.log.Info("selecting peer", zap.Error(err)) - } + params.peerSelectionType = peermanager.LowestRTT } } @@ -276,6 +265,11 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR opt(params) } + if store.pm != nil { + params.selectedPeer, _ = store.pm.HandlePeerSelection(peermanager.PeerSelectionCriteria{SelectionType: params.peerSelectionType, + Proto: StoreID_v20beta4, PubsubTopic: query.Topic, SpecificPeers: params.preferredPeers, Ctx: ctx}) + } + if !params.localQuery && params.selectedPeer == "" { store.metrics.RecordError(peerNotFoundFailure) return nil, ErrNoPeersAvailable diff --git a/waku/v2/protocol/store/waku_store_protocol_test.go b/waku/v2/protocol/store/waku_store_protocol_test.go index f3490cfbe..a67ec7417 100644 --- a/waku/v2/protocol/store/waku_store_protocol_test.go +++ b/waku/v2/protocol/store/waku_store_protocol_test.go @@ -60,7 +60,9 @@ func TestWakuStoreProtocolQuery(t *testing.T) { ContentTopics: []string{topic1}, } - response, err := s2.Query(ctx, q, DefaultOptions()...) + var hrOptions []HistoryRequestOption + hrOptions = append(hrOptions, WithPeer(host1.ID())) + response, err := s2.Query(ctx, q, hrOptions...) require.NoError(t, err) require.Len(t, response.Messages, 1) @@ -155,7 +157,7 @@ func TestWakuStoreProtocolNext(t *testing.T) { ContentTopics: []string{topic1}, } - response, err := s2.Query(ctx, q, WithAutomaticPeerSelection(), WithAutomaticRequestID(), WithPaging(true, 2)) + response, err := s2.Query(ctx, q, WithPeer(host1.ID()), WithAutomaticRequestID(), WithPaging(true, 2)) require.NoError(t, err) require.Len(t, response.Messages, 2) require.Equal(t, response.Messages[0].Timestamp, msg1.Timestamp) @@ -230,7 +232,7 @@ func TestWakuStoreResult(t *testing.T) { ContentTopics: []string{topic1}, } - result, err := s2.Query(ctx, q, WithAutomaticPeerSelection(), WithAutomaticRequestID(), WithPaging(true, 2)) + result, err := s2.Query(ctx, q, WithPeer(host1.ID()), WithAutomaticRequestID(), WithPaging(true, 2)) require.NoError(t, err) require.False(t, result.started) require.Len(t, result.GetMessages(), 0) @@ -332,7 +334,7 @@ func TestWakuStoreProtocolFind(t *testing.T) { return msg.ContentTopic == "hello", nil } - foundMsg, err := s2.Find(ctx, q, fn, WithAutomaticPeerSelection(), WithAutomaticRequestID(), WithPaging(true, 2)) + foundMsg, err := s2.Find(ctx, q, fn, WithPeer(host1.ID()), WithAutomaticRequestID(), WithPaging(true, 2)) require.NoError(t, err) require.NotNil(t, foundMsg) require.Equal(t, "hello", foundMsg.ContentTopic) @@ -341,7 +343,7 @@ func TestWakuStoreProtocolFind(t *testing.T) { return msg.ContentTopic == "bye", nil } - foundMsg, err = s2.Find(ctx, q, fn2, WithAutomaticPeerSelection(), WithAutomaticRequestID(), WithPaging(true, 2)) + foundMsg, err = s2.Find(ctx, q, fn2, WithPeer(host1.ID()), WithAutomaticRequestID(), WithPaging(true, 2)) require.NoError(t, err) require.Nil(t, foundMsg) } diff --git a/waku/v2/utils/peer.go b/waku/v2/utils/peer.go index 5dae987e4..8321dc3e3 100644 --- a/waku/v2/utils/peer.go +++ b/waku/v2/utils/peer.go @@ -1,32 +1,10 @@ package utils import ( - "context" - "errors" - "math/rand" - "sync" - "time" - - "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" - "github.com/libp2p/go-libp2p/core/protocol" - "github.com/libp2p/go-libp2p/p2p/protocol/ping" "github.com/multiformats/go-multiaddr" - "go.uber.org/zap" ) -type PeerSelection int - -const ( - Unknown PeerSelection = iota - Automatic - LowestRTT -) - -// ErrNoPeersAvailable is emitted when no suitable peers are found for -// some protocol -var ErrNoPeersAvailable = errors.New("no suitable peers found") - // GetPeerID is used to extract the peerID from a multiaddress func GetPeerID(m multiaddr.Multiaddr) (peer.ID, error) { peerIDStr, err := m.ValueForProtocol(multiaddr.P_P2P) @@ -41,143 +19,3 @@ func GetPeerID(m multiaddr.Multiaddr) (peer.ID, error) { return peerID, nil } - -// FilterPeersByProto filters list of peers that support specified protocols. -// If specificPeers is nil, all peers in the host's peerStore are considered for filtering. -func FilterPeersByProto(host host.Host, specificPeers peer.IDSlice, proto ...protocol.ID) (peer.IDSlice, error) { - peerSet := specificPeers - if len(peerSet) == 0 { - peerSet = host.Peerstore().Peers() - } - - var peers peer.IDSlice - for _, peer := range peerSet { - protocols, err := host.Peerstore().SupportsProtocols(peer, proto...) - if err != nil { - return nil, err - } - - if len(protocols) > 0 { - peers = append(peers, peer) - } - } - return peers, nil -} - -// SelectRandomPeer selects randomly a peer from the list of peers passed. -func SelectRandomPeer(peers peer.IDSlice, log *zap.Logger) (peer.ID, error) { - if len(peers) >= 1 { - peerID := peers[rand.Intn(len(peers))] - // TODO: proper heuristic here that compares peer scores and selects "best" one. For now a random peer for the given protocol is returned - return peerID, nil // nolint: gosec - } - - return "", ErrNoPeersAvailable -} - -// SelectPeer is used to return a random peer that supports a given protocol. -// Note: Use this method only if WakuNode is not being initialized, otherwise use peermanager.SelectPeer. -// If a list of specific peers is passed, the peer will be chosen from that list assuming -// it supports the chosen protocol, otherwise it will chose a peer from the node peerstore -func SelectPeer(host host.Host, protocolID protocol.ID, specificPeers []peer.ID, log *zap.Logger) (peer.ID, error) { - // @TODO We need to be more strategic about which peers we dial. Right now we just set one on the service. - // Ideally depending on the query and our set of peers we take a subset of ideal peers. - // This will require us to check for various factors such as: - // - which topics they track - // - latency? - // - default store peer? - - peers, err := FilterPeersByProto(host, specificPeers, protocolID) - if err != nil { - return "", err - } - - return SelectRandomPeer(peers, log) -} - -type pingResult struct { - p peer.ID - rtt time.Duration -} - -// SelectPeerWithLowestRTT will select a peer that supports a specific protocol with the lowest reply time -// If a list of specific peers is passed, the peer will be chosen from that list assuming -// it supports the chosen protocol, otherwise it will chose a peer from the node peerstore -func SelectPeerWithLowestRTT(ctx context.Context, host host.Host, protocolID protocol.ID, specificPeers []peer.ID, _ *zap.Logger) (peer.ID, error) { - var peers peer.IDSlice - - peerSet := specificPeers - if len(peerSet) == 0 { - peerSet = host.Peerstore().Peers() - } - - for _, peer := range peerSet { - protocols, err := host.Peerstore().SupportsProtocols(peer, protocolID) - if err != nil { - return "", err - } - - if len(protocols) > 0 { - peers = append(peers, peer) - } - } - - wg := sync.WaitGroup{} - waitCh := make(chan struct{}) - pingCh := make(chan pingResult, 1000) - - wg.Add(len(peers)) - - go func() { - for _, p := range peers { - go func(p peer.ID) { - defer wg.Done() - ctx, cancel := context.WithTimeout(ctx, 3*time.Second) - defer cancel() - result := <-ping.Ping(ctx, host, p) - if result.Error == nil { - pingCh <- pingResult{ - p: p, - rtt: result.RTT, - } - } - }(p) - } - wg.Wait() - close(waitCh) - close(pingCh) - }() - - select { - case <-waitCh: - var min *pingResult - for p := range pingCh { - if min == nil { - min = &p - } else { - if p.rtt < min.rtt { - min = &p - } - } - } - if min == nil { - return "", ErrNoPeersAvailable - } - - return min.p, nil - case <-ctx.Done(): - return "", ErrNoPeersAvailable - } -} - -func HandlePeerSelection(selectionType PeerSelection, host host.Host, proto protocol.ID, specificPeers []peer.ID, log *zap.Logger) (peer.ID, error) { - switch selectionType { - case Automatic: - return SelectPeer(host, proto, specificPeers, log) - case LowestRTT: - //TODO: Move this to peer-manager - return SelectPeerWithLowestRTT(context.Background(), host, proto, specificPeers, log) - default: - return "", errors.New("unknown peer selection type specified") - } -} diff --git a/waku/v2/utils/test/peer_test.go b/waku/v2/utils/test/peer_test.go deleted file mode 100644 index 44463c7d4..000000000 --- a/waku/v2/utils/test/peer_test.go +++ /dev/null @@ -1,83 +0,0 @@ -package tests - -import ( - "context" - "crypto/rand" - "testing" - "time" - - "github.com/libp2p/go-libp2p/core/peerstore" - "github.com/libp2p/go-libp2p/core/protocol" - "github.com/stretchr/testify/require" - "github.com/waku-org/go-waku/tests" - "github.com/waku-org/go-waku/waku/v2/utils" -) - -func TestSelectPeer(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - h1, err := tests.MakeHost(ctx, 0, rand.Reader) - require.NoError(t, err) - defer h1.Close() - - h2, err := tests.MakeHost(ctx, 0, rand.Reader) - require.NoError(t, err) - defer h2.Close() - - h3, err := tests.MakeHost(ctx, 0, rand.Reader) - require.NoError(t, err) - defer h3.Close() - - proto := protocol.ID("test/protocol") - - h1.Peerstore().AddAddrs(h2.ID(), h2.Network().ListenAddresses(), peerstore.PermanentAddrTTL) - h1.Peerstore().AddAddrs(h3.ID(), h3.Network().ListenAddresses(), peerstore.PermanentAddrTTL) - - // No peers with selected protocol - _, err = utils.SelectPeer(h1, proto, nil, utils.Logger()) - require.Error(t, utils.ErrNoPeersAvailable, err) - - // Peers with selected protocol - _ = h1.Peerstore().AddProtocols(h2.ID(), proto) - _ = h1.Peerstore().AddProtocols(h3.ID(), proto) - - _, err = utils.SelectPeerWithLowestRTT(ctx, h1, proto, nil, utils.Logger()) - require.NoError(t, err) - -} - -func TestSelectPeerWithLowestRTT(t *testing.T) { - // help-wanted: how to slowdown the ping response to properly test the rtt - - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - h1, err := tests.MakeHost(ctx, 0, rand.Reader) - require.NoError(t, err) - defer h1.Close() - - h2, err := tests.MakeHost(ctx, 0, rand.Reader) - require.NoError(t, err) - defer h2.Close() - - h3, err := tests.MakeHost(ctx, 0, rand.Reader) - require.NoError(t, err) - defer h3.Close() - - proto := protocol.ID("test/protocol") - - h1.Peerstore().AddAddrs(h2.ID(), h2.Network().ListenAddresses(), peerstore.PermanentAddrTTL) - h1.Peerstore().AddAddrs(h3.ID(), h2.Network().ListenAddresses(), peerstore.PermanentAddrTTL) - - // No peers with selected protocol - _, err = utils.SelectPeerWithLowestRTT(ctx, h1, proto, nil, utils.Logger()) - require.Error(t, utils.ErrNoPeersAvailable, err) - - // Peers with selected protocol - _ = h1.Peerstore().AddProtocols(h2.ID(), proto) - _ = h1.Peerstore().AddProtocols(h3.ID(), proto) - - _, err = utils.SelectPeerWithLowestRTT(ctx, h1, proto, nil, utils.Logger()) - require.NoError(t, err) -} From 6bb36380d96bf0f6a4781a314c65194cb642742b Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Tue, 3 Oct 2023 15:23:02 +0530 Subject: [PATCH 04/13] add comments --- waku/v2/peermanager/peer_manager.go | 19 +++++-------------- waku/v2/protocol/lightpush/waku_lightpush.go | 2 ++ 2 files changed, 7 insertions(+), 14 deletions(-) diff --git a/waku/v2/peermanager/peer_manager.go b/waku/v2/peermanager/peer_manager.go index 0ce1176b0..0ae8719d4 100644 --- a/waku/v2/peermanager/peer_manager.go +++ b/waku/v2/peermanager/peer_manager.go @@ -46,6 +46,7 @@ type PeerManager struct { subRelayTopics map[string]*NodeTopicDetails } +// PeerSelection provides various options based on which Peer is selected from a list of peers. type PeerSelection int const ( @@ -497,6 +498,7 @@ func (pm *PeerManager) selectServicePeer(proto protocol.ID, pubSubTopic string, return } +// PeerSelectionCriteria is the selection Criteria that is used by PeerManager to select peers. type PeerSelectionCriteria struct { SelectionType PeerSelection Proto protocol.ID @@ -538,22 +540,11 @@ func (pm *PeerManager) SelectPeerWithLowestRTT(criteria PeerSelectionCriteria) ( if criteria.Ctx == nil { criteria.Ctx = context.Background() } - peerSet := criteria.SpecificPeers - if len(peerSet) == 0 { - peerSet = pm.host.Peerstore().Peers() - } - - for _, peer := range peerSet { - protocols, err := pm.host.Peerstore().SupportsProtocols(peer, criteria.Proto) - if err != nil { - return "", err - } - if len(protocols) > 0 { - peers = append(peers, peer) - } + peers, err := pm.FilterPeersByProto(criteria.SpecificPeers, criteria.Proto) + if err != nil { + return "", err } - wg := sync.WaitGroup{} waitCh := make(chan struct{}) pingCh := make(chan pingResult, 1000) diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index 909ac999c..f03cc8a7d 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -229,6 +229,8 @@ func (wakuLP *WakuLightPush) handleOpts(ctx context.Context, message *wpb.WakuMe peermanager.PeerSelectionCriteria{SelectionType: params.peerSelectionType, Proto: LightPushID_v20beta1, PubsubTopic: params.pubsubTopic, SpecificPeers: params.preferredPeers, Ctx: ctx}) + } + if params.selectedPeer == "" { if err != nil { params.log.Error("selecting peer", zap.Error(err)) wakuLP.metrics.RecordError(peerNotFoundFailure) From 1f80dded51bab93343f9992338e319f153aced8f Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Tue, 3 Oct 2023 15:25:55 +0530 Subject: [PATCH 05/13] fix test lint issue --- waku/v2/peermanager/peer_manager_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/waku/v2/peermanager/peer_manager_test.go b/waku/v2/peermanager/peer_manager_test.go index 6977fae0a..4a17a3b40 100644 --- a/waku/v2/peermanager/peer_manager_test.go +++ b/waku/v2/peermanager/peer_manager_test.go @@ -141,7 +141,7 @@ func TestPeerSelection(t *testing.T) { require.NoError(t, err) //Test for selectWithLowestRTT - pm.SelectPeerWithLowestRTT(PeerSelectionCriteria{Proto: protocol, PubsubTopic: "/waku/rs/2/1"}) + _, err = pm.SelectPeerWithLowestRTT(PeerSelectionCriteria{Proto: protocol, PubsubTopic: "/waku/rs/2/1"}) require.NoError(t, err) } From 330cf928c62655be88705a3027705cb3e0438331 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Wed, 4 Oct 2023 00:53:14 +0400 Subject: [PATCH 06/13] fix peer test failure --- waku/v2/peermanager/peer_manager.go | 12 +++++++++++- waku/v2/peermanager/peer_manager_test.go | 15 +++++++++++---- 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/waku/v2/peermanager/peer_manager.go b/waku/v2/peermanager/peer_manager.go index 0ae8719d4..40e4ffb57 100644 --- a/waku/v2/peermanager/peer_manager.go +++ b/waku/v2/peermanager/peer_manager.go @@ -3,6 +3,7 @@ package peermanager import ( "context" "errors" + "fmt" "math/rand" "sync" "time" @@ -537,11 +538,16 @@ type pingResult struct { // to maintain the RTT as part of peer-scoring and just select based on that. func (pm *PeerManager) SelectPeerWithLowestRTT(criteria PeerSelectionCriteria) (peer.ID, error) { var peers peer.IDSlice + var err error if criteria.Ctx == nil { criteria.Ctx = context.Background() } - peers, err := pm.FilterPeersByProto(criteria.SpecificPeers, criteria.Proto) + if criteria.PubsubTopic != "" { + peers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopic(criteria.PubsubTopic, criteria.SpecificPeers...) + } + + peers, err = pm.FilterPeersByProto(peers, criteria.Proto) if err != nil { return "", err } @@ -563,6 +569,8 @@ func (pm *PeerManager) SelectPeerWithLowestRTT(criteria PeerSelectionCriteria) ( p: p, rtt: result.RTT, } + } else { + fmt.Println("Error in Ping", result) } }(p) } @@ -575,6 +583,7 @@ func (pm *PeerManager) SelectPeerWithLowestRTT(criteria PeerSelectionCriteria) ( case <-waitCh: var min *pingResult for p := range pingCh { + fmt.Println("ping result", p) if min == nil { min = &p } else { @@ -584,6 +593,7 @@ func (pm *PeerManager) SelectPeerWithLowestRTT(criteria PeerSelectionCriteria) ( } } if min == nil { + pm.logger.Info("Could not find min") return "", ErrNoPeersAvailable } diff --git a/waku/v2/peermanager/peer_manager_test.go b/waku/v2/peermanager/peer_manager_test.go index 4a17a3b40..a23fdc623 100644 --- a/waku/v2/peermanager/peer_manager_test.go +++ b/waku/v2/peermanager/peer_manager_test.go @@ -4,6 +4,7 @@ import ( "context" "crypto/rand" "fmt" + "strings" "testing" "time" @@ -20,16 +21,22 @@ import ( func getAddr(h host.Host) multiaddr.Multiaddr { id, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", h.ID().Pretty())) - return h.Network().ListenAddresses()[0].Encapsulate(id) + var selectedAddr multiaddr.Multiaddr + //For now skipping circuit relay addresses as libp2p seems to be returning empty p2p-circuit addresses. + for _, addr := range h.Network().ListenAddresses() { + if strings.Contains(addr.String(), "p2p-circuit") { + continue + } + selectedAddr = addr + } + return selectedAddr.Encapsulate(id) } func initTest(t *testing.T) (context.Context, *PeerManager, func()) { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) // hosts h1, err := tests.MakeHost(ctx, 0, rand.Reader) require.NoError(t, err) - defer h1.Close() // host 1 is used by peer manager pm := NewPeerManager(10, 20, utils.Logger()) From edf5a85469c39a47c37a18a6f0ed439c466ee8ee Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Wed, 4 Oct 2023 01:05:15 +0400 Subject: [PATCH 07/13] rename peer manager funcs --- waku/v2/peermanager/peer_manager.go | 10 +++---- waku/v2/peermanager/peer_manager_test.go | 28 +++++++++---------- waku/v2/protocol/filter/client.go | 2 +- waku/v2/protocol/legacy_filter/waku_filter.go | 2 +- waku/v2/protocol/lightpush/waku_lightpush.go | 2 +- waku/v2/protocol/store/waku_store_client.go | 2 +- 6 files changed, 23 insertions(+), 23 deletions(-) diff --git a/waku/v2/peermanager/peer_manager.go b/waku/v2/peermanager/peer_manager.go index 40e4ffb57..ab3242749 100644 --- a/waku/v2/peermanager/peer_manager.go +++ b/waku/v2/peermanager/peer_manager.go @@ -444,12 +444,12 @@ func (pm *PeerManager) SelectPeerByContentTopic(proto protocol.ID, contentTopic return pm.SelectPeer(PeerSelectionCriteria{PubsubTopic: pubsubTopic, Proto: proto, SpecificPeers: specificPeers}) } -// SelectPeer is used to return a random peer that supports a given protocol. +// SelectRandomPeer is used to return a random peer that supports a given protocol. // If a list of specific peers is passed, the peer will be chosen from that list assuming // it supports the chosen protocol, otherwise it will chose a peer from the service slot. // If a peer cannot be found in the service slot, a peer will be selected from node peerstore // if pubSubTopic is specified, peer is selected from list that support the pubSubTopic -func (pm *PeerManager) SelectPeer(criteria PeerSelectionCriteria) (peer.ID, error) { +func (pm *PeerManager) SelectRandomPeer(criteria PeerSelectionCriteria) (peer.ID, error) { // @TODO We need to be more strategic about which peers we dial. Right now we just set one on the service. // Ideally depending on the query and our set of peers we take a subset of ideal peers. // This will require us to check for various factors such as: @@ -508,13 +508,13 @@ type PeerSelectionCriteria struct { Ctx context.Context } -// HandlePeerSelection selects a peer based on selectionType specified. +// SelectPeer selects a peer based on selectionType specified. // Context is required only in case of selectionType set to LowestRTT -func (pm *PeerManager) HandlePeerSelection(criteria PeerSelectionCriteria) (peer.ID, error) { +func (pm *PeerManager) SelectPeer(criteria PeerSelectionCriteria) (peer.ID, error) { switch criteria.SelectionType { case Automatic: - return pm.SelectPeer(criteria) + return pm.SelectRandomPeer(criteria) case LowestRTT: if criteria.Ctx == nil { criteria.Ctx = context.Background() diff --git a/waku/v2/peermanager/peer_manager_test.go b/waku/v2/peermanager/peer_manager_test.go index a23fdc623..385c6f0a7 100644 --- a/waku/v2/peermanager/peer_manager_test.go +++ b/waku/v2/peermanager/peer_manager_test.go @@ -33,7 +33,7 @@ func getAddr(h host.Host) multiaddr.Multiaddr { } func initTest(t *testing.T) (context.Context, *PeerManager, func()) { - ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // hosts h1, err := tests.MakeHost(ctx, 0, rand.Reader) require.NoError(t, err) @@ -73,7 +73,7 @@ func TestServiceSlots(t *testing.T) { /////////////// // select peer from pm, currently only h2 is set in pm - peerID, err := pm.SelectPeer(PeerSelectionCriteria{Proto: protocol}) + peerID, err := pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol}) require.NoError(t, err) require.Equal(t, peerID, h2.ID()) @@ -82,7 +82,7 @@ func TestServiceSlots(t *testing.T) { require.NoError(t, err) // check that returned peer is h2 or h3 peer - peerID, err = pm.SelectPeer(PeerSelectionCriteria{Proto: protocol}) + peerID, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol}) require.NoError(t, err) if peerID == h2.ID() || peerID == h3.ID() { //Test success @@ -98,7 +98,7 @@ func TestServiceSlots(t *testing.T) { require.NoError(t, err) defer h4.Close() - _, err = pm.SelectPeer(PeerSelectionCriteria{Proto: protocol1}) + _, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol1}) require.Error(t, err, ErrNoPeersAvailable) // add h4 peer for protocol1 @@ -106,7 +106,7 @@ func TestServiceSlots(t *testing.T) { require.NoError(t, err) //Test peer selection for protocol1 - peerID, err = pm.SelectPeer(PeerSelectionCriteria{Proto: protocol1}) + peerID, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol1}) require.NoError(t, err) require.Equal(t, peerID, h4.ID()) @@ -134,21 +134,21 @@ func TestPeerSelection(t *testing.T) { _, err = pm.AddPeer(getAddr(h3), wps.Static, []string{"/waku/rs/2/1"}, libp2pProtocol.ID(protocol)) require.NoError(t, err) - _, err = pm.SelectPeer(PeerSelectionCriteria{Proto: protocol}) + _, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol}) require.NoError(t, err) - peerID, err := pm.SelectPeer(PeerSelectionCriteria{Proto: protocol, PubsubTopic: "/waku/rs/2/2"}) + peerID, err := pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopic: "/waku/rs/2/2"}) require.NoError(t, err) require.Equal(t, h2.ID(), peerID) - _, err = pm.SelectPeer(PeerSelectionCriteria{Proto: protocol, PubsubTopic: "/waku/rs/2/3"}) + _, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopic: "/waku/rs/2/3"}) require.Error(t, ErrNoPeersAvailable, err) - _, err = pm.SelectPeer(PeerSelectionCriteria{Proto: protocol, PubsubTopic: "/waku/rs/2/1"}) + _, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopic: "/waku/rs/2/1"}) require.NoError(t, err) //Test for selectWithLowestRTT - _, err = pm.SelectPeerWithLowestRTT(PeerSelectionCriteria{Proto: protocol, PubsubTopic: "/waku/rs/2/1"}) + _, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: LowestRTT, Proto: protocol, PubsubTopic: "/waku/rs/2/1"}) require.NoError(t, err) } @@ -159,7 +159,7 @@ func TestDefaultProtocol(t *testing.T) { // check peer for default protocol /////////////// //Test empty peer selection for relay protocol - _, err := pm.SelectPeer(PeerSelectionCriteria{Proto: relay.WakuRelayID_v200}) + _, err := pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: relay.WakuRelayID_v200}) require.Error(t, err, ErrNoPeersAvailable) /////////////// @@ -174,7 +174,7 @@ func TestDefaultProtocol(t *testing.T) { require.NoError(t, err) // since we are not passing peerList, selectPeer fn using filterByProto checks in PeerStore for peers with same protocol. - peerID, err := pm.SelectPeer(PeerSelectionCriteria{Proto: relay.WakuRelayID_v200}) + peerID, err := pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: relay.WakuRelayID_v200}) require.NoError(t, err) require.Equal(t, peerID, h5.ID()) } @@ -194,12 +194,12 @@ func TestAdditionAndRemovalOfPeer(t *testing.T) { _, err = pm.AddPeer(getAddr(h6), wps.Static, []string{""}, protocol2) require.NoError(t, err) - peerID, err := pm.SelectPeer(PeerSelectionCriteria{Proto: protocol2}) + peerID, err := pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol2}) require.NoError(t, err) require.Equal(t, peerID, h6.ID()) pm.RemovePeer(peerID) - _, err = pm.SelectPeer(PeerSelectionCriteria{Proto: protocol2}) + _, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol2}) require.Error(t, err, ErrNoPeersAvailable) } diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index f0e6d1354..e395007f0 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -276,7 +276,7 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot //TO Optimize: find a peer with all pubSubTopics in the list if possible, if not only then look for single pubSubTopic if params.pm != nil { - params.selectedPeer, err = wf.pm.HandlePeerSelection(peermanager.PeerSelectionCriteria{SelectionType: params.peerSelectionType, + params.selectedPeer, err = wf.pm.SelectPeer(peermanager.PeerSelectionCriteria{SelectionType: params.peerSelectionType, Proto: FilterSubscribeID_v20beta1, PubsubTopic: pubSubTopic, SpecificPeers: params.preferredPeers, Ctx: ctx}) } diff --git a/waku/v2/protocol/legacy_filter/waku_filter.go b/waku/v2/protocol/legacy_filter/waku_filter.go index 000d7cdf3..2ce2939c3 100644 --- a/waku/v2/protocol/legacy_filter/waku_filter.go +++ b/waku/v2/protocol/legacy_filter/waku_filter.go @@ -240,7 +240,7 @@ func (wf *WakuFilter) requestSubscription(ctx context.Context, filter ContentFil opt(params) } if wf.pm != nil { - params.selectedPeer, _ = wf.pm.HandlePeerSelection(peermanager.PeerSelectionCriteria{SelectionType: params.peerSelectionType, + params.selectedPeer, _ = wf.pm.SelectPeer(peermanager.PeerSelectionCriteria{SelectionType: params.peerSelectionType, Proto: FilterID_v20beta1, PubsubTopic: filter.Topic, SpecificPeers: params.preferredPeers, Ctx: ctx}) } if params.selectedPeer == "" { diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index f03cc8a7d..b9ade44c4 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -225,7 +225,7 @@ func (wakuLP *WakuLightPush) handleOpts(ctx context.Context, message *wpb.WakuMe } if params.pm != nil { - params.selectedPeer, err = wakuLP.pm.HandlePeerSelection( + params.selectedPeer, err = wakuLP.pm.SelectPeer( peermanager.PeerSelectionCriteria{SelectionType: params.peerSelectionType, Proto: LightPushID_v20beta1, PubsubTopic: params.pubsubTopic, SpecificPeers: params.preferredPeers, Ctx: ctx}) diff --git a/waku/v2/protocol/store/waku_store_client.go b/waku/v2/protocol/store/waku_store_client.go index 6575974e2..0b359b6cc 100644 --- a/waku/v2/protocol/store/waku_store_client.go +++ b/waku/v2/protocol/store/waku_store_client.go @@ -266,7 +266,7 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR } if store.pm != nil { - params.selectedPeer, _ = store.pm.HandlePeerSelection(peermanager.PeerSelectionCriteria{SelectionType: params.peerSelectionType, + params.selectedPeer, _ = store.pm.SelectPeer(peermanager.PeerSelectionCriteria{SelectionType: params.peerSelectionType, Proto: StoreID_v20beta4, PubsubTopic: query.Topic, SpecificPeers: params.preferredPeers, Ctx: ctx}) } From 7e8b51ff808b12d05ffa8dd11da64c9d394dae37 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Thu, 12 Oct 2023 06:59:03 -0700 Subject: [PATCH 08/13] chore: fix new return for options --- waku/v2/protocol/filter/options.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/waku/v2/protocol/filter/options.go b/waku/v2/protocol/filter/options.go index 2c34eaa4b..dac050619 100644 --- a/waku/v2/protocol/filter/options.go +++ b/waku/v2/protocol/filter/options.go @@ -56,9 +56,10 @@ func WithPeer(p peer.ID) FilterSubscribeOption { // If a list of specific peers is passed, the peer will be chosen from that list assuming it // supports the chosen protocol, otherwise it will chose a peer from the node peerstore func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) FilterSubscribeOption { - return func(params *FilterSubscribeParameters) { + return func(params *FilterSubscribeParameters) error { params.peerSelectionType = peermanager.Automatic params.preferredPeers = fromThesePeers + return nil } } @@ -67,8 +68,9 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) FilterSubscribeOption // from that list assuming it supports the chosen protocol, otherwise it will chose a // peer from the node peerstore func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) FilterSubscribeOption { - return func(params *FilterSubscribeParameters) { + return func(params *FilterSubscribeParameters) error { params.peerSelectionType = peermanager.LowestRTT + return nil } } From 0b6d93e455ddbfecf155b4b829dbc111b4c7d93f Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Thu, 12 Oct 2023 07:03:07 -0700 Subject: [PATCH 09/13] chore: fix log --- waku/v2/protocol/filter/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index 268403506..21018932b 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -285,7 +285,7 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot if params.selectedPeer == "" { wf.metrics.RecordError(peerNotFoundFailure) - wf.log.Error("Failed to find peer for Filter subscribe", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics), + wf.log.Error("selecting peer", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics), zap.Error(err)) failedContentTopics = append(failedContentTopics, cTopics...) continue From 48909845fec74106534fe7bdd8393a79d3a16669 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Thu, 12 Oct 2023 15:49:42 -0700 Subject: [PATCH 10/13] chore: address review comments --- waku/v2/peermanager/peer_manager.go | 8 ++------ waku/v2/protocol/filter/client.go | 11 ++++++++-- waku/v2/protocol/legacy_filter/waku_filter.go | 11 ++++++++-- .../waku_peer_exchange_option.go | 20 +++++++++++-------- .../store/waku_store_protocol_test.go | 4 +--- 5 files changed, 33 insertions(+), 21 deletions(-) diff --git a/waku/v2/peermanager/peer_manager.go b/waku/v2/peermanager/peer_manager.go index ab3242749..a99b2cfc2 100644 --- a/waku/v2/peermanager/peer_manager.go +++ b/waku/v2/peermanager/peer_manager.go @@ -3,7 +3,6 @@ package peermanager import ( "context" "errors" - "fmt" "math/rand" "sync" "time" @@ -51,8 +50,7 @@ type PeerManager struct { type PeerSelection int const ( - Unknown PeerSelection = iota - Automatic + Automatic PeerSelection = iota LowestRTT ) @@ -570,7 +568,7 @@ func (pm *PeerManager) SelectPeerWithLowestRTT(criteria PeerSelectionCriteria) ( rtt: result.RTT, } } else { - fmt.Println("Error in Ping", result) + pm.logger.Debug("could not ping", logging.HostID("peer", p), zap.Error(result.Error)) } }(p) } @@ -583,7 +581,6 @@ func (pm *PeerManager) SelectPeerWithLowestRTT(criteria PeerSelectionCriteria) ( case <-waitCh: var min *pingResult for p := range pingCh { - fmt.Println("ping result", p) if min == nil { min = &p } else { @@ -593,7 +590,6 @@ func (pm *PeerManager) SelectPeerWithLowestRTT(criteria PeerSelectionCriteria) ( } } if min == nil { - pm.logger.Info("Could not find min") return "", ErrNoPeersAvailable } diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index 21018932b..e110880b5 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -279,8 +279,15 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot //TO Optimize: find a peer with all pubSubTopics in the list if possible, if not only then look for single pubSubTopic if params.pm != nil { - params.selectedPeer, err = wf.pm.SelectPeer(peermanager.PeerSelectionCriteria{SelectionType: params.peerSelectionType, - Proto: FilterSubscribeID_v20beta1, PubsubTopic: pubSubTopic, SpecificPeers: params.preferredPeers, Ctx: ctx}) + params.selectedPeer, err = wf.pm.SelectPeer( + peermanager.PeerSelectionCriteria{ + SelectionType: params.peerSelectionType, + Proto: FilterSubscribeID_v20beta1, + PubsubTopic: pubSubTopic, + SpecificPeers: params.preferredPeers, + Ctx: ctx, + }, + ) } if params.selectedPeer == "" { diff --git a/waku/v2/protocol/legacy_filter/waku_filter.go b/waku/v2/protocol/legacy_filter/waku_filter.go index 2ce2939c3..1944692fe 100644 --- a/waku/v2/protocol/legacy_filter/waku_filter.go +++ b/waku/v2/protocol/legacy_filter/waku_filter.go @@ -240,8 +240,15 @@ func (wf *WakuFilter) requestSubscription(ctx context.Context, filter ContentFil opt(params) } if wf.pm != nil { - params.selectedPeer, _ = wf.pm.SelectPeer(peermanager.PeerSelectionCriteria{SelectionType: params.peerSelectionType, - Proto: FilterID_v20beta1, PubsubTopic: filter.Topic, SpecificPeers: params.preferredPeers, Ctx: ctx}) + params.selectedPeer, _ = wf.pm.SelectPeer( + peermanager.PeerSelectionCriteria{ + SelectionType: params.peerSelectionType, + Proto: FilterID_v20beta1, + PubsubTopic: filter.Topic, + SpecificPeers: params.preferredPeers, + Ctx: ctx, + }, + ) } if params.selectedPeer == "" { wf.metrics.RecordError(peerNotFoundFailure) diff --git a/waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go b/waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go index 844eb914f..a6164f07c 100644 --- a/waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go +++ b/waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go @@ -2,6 +2,7 @@ package peer_exchange import ( "context" + "errors" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" @@ -16,12 +17,13 @@ type PeerExchangeParameters struct { log *zap.Logger } -type PeerExchangeOption func(*PeerExchangeParameters) +type PeerExchangeOption func(*PeerExchangeParameters) error // WithPeer is an option used to specify the peerID to push a waku message to func WithPeer(p peer.ID) PeerExchangeOption { - return func(params *PeerExchangeParameters) { + return func(params *PeerExchangeParameters) error { params.selectedPeer = p + return nil } } @@ -31,17 +33,18 @@ func WithPeer(p peer.ID) PeerExchangeOption { // from the node peerstore // Note: this option can only be used if WakuNode is initialized which internally intializes the peerManager func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) PeerExchangeOption { - return func(params *PeerExchangeParameters) { + return func(params *PeerExchangeParameters) error { if params.pm == nil { - params.log.Info("automatic selection is not avaiable since peerManager is not initialized") + return errors.New("automatic selection is not avaiable since peerManager is not initialized") } else { p, err := params.pm.SelectPeer(peermanager.PeerSelectionCriteria{Proto: PeerExchangeID_v20alpha1, SpecificPeers: fromThesePeers}) if err == nil { params.selectedPeer = p } else { - params.log.Info("selecting peer", zap.Error(err)) + return err } + return nil } } } @@ -51,9 +54,9 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) PeerExchangeOption { // from that list assuming it supports the chosen protocol, otherwise it will chose a peer // from the node peerstore func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) PeerExchangeOption { - return func(params *PeerExchangeParameters) { + return func(params *PeerExchangeParameters) error { if params.pm == nil { - params.log.Info("automatic selection is not avaiable since peerManager is not initialized") + return errors.New("automatic selection is not avaiable since peerManager is not initialized") } else { p, err := params.pm.SelectPeerWithLowestRTT( peermanager.PeerSelectionCriteria{Proto: PeerExchangeID_v20alpha1, @@ -61,8 +64,9 @@ func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) Pe if err == nil { params.selectedPeer = p } else { - params.log.Info("selecting peer", zap.Error(err)) + return err } + return nil } } } diff --git a/waku/v2/protocol/store/waku_store_protocol_test.go b/waku/v2/protocol/store/waku_store_protocol_test.go index a67ec7417..690861c9b 100644 --- a/waku/v2/protocol/store/waku_store_protocol_test.go +++ b/waku/v2/protocol/store/waku_store_protocol_test.go @@ -60,9 +60,7 @@ func TestWakuStoreProtocolQuery(t *testing.T) { ContentTopics: []string{topic1}, } - var hrOptions []HistoryRequestOption - hrOptions = append(hrOptions, WithPeer(host1.ID())) - response, err := s2.Query(ctx, q, hrOptions...) + response, err := s2.Query(ctx, q, WithPeer(host1.ID())) require.NoError(t, err) require.Len(t, response.Messages, 1) From 268c41e6d0ac7d1e5628922d52c8f873c74f5a19 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Thu, 12 Oct 2023 15:56:08 -0700 Subject: [PATCH 11/13] chore: fix lint error --- waku/v2/protocol/peer_exchange/client.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/waku/v2/protocol/peer_exchange/client.go b/waku/v2/protocol/peer_exchange/client.go index 1466b8452..f06035cfb 100644 --- a/waku/v2/protocol/peer_exchange/client.go +++ b/waku/v2/protocol/peer_exchange/client.go @@ -26,7 +26,10 @@ func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts optList := DefaultOptions(wakuPX.h) optList = append(optList, opts...) for _, opt := range optList { - opt(params) + err := opt(params) + if err != nil { + return err + } } if params.selectedPeer == "" { From 89da4151516cb14fecb1e032a5fed42b901a57ab Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Thu, 12 Oct 2023 18:01:24 -0700 Subject: [PATCH 12/13] fix: address few issues with test and not doing selection if preferredPeer is already specified --- waku/v2/node/wakunode2_test.go | 6 ++- waku/v2/protocol/filter/client.go | 12 +++-- waku/v2/protocol/filter/options.go | 3 +- waku/v2/protocol/filter/options_test.go | 2 +- waku/v2/protocol/legacy_filter/waku_filter.go | 2 +- .../legacy_filter/waku_filter_option.go | 3 +- .../legacy_filter/waku_filter_option_test.go | 2 +- waku/v2/protocol/lightpush/waku_lightpush.go | 13 +++-- .../lightpush/waku_lightpush_option.go | 4 +- .../lightpush/waku_lightpush_option_test.go | 2 +- waku/v2/protocol/peer_exchange/client.go | 14 ++++- .../waku_peer_exchange_option.go | 53 ++++++------------- waku/v2/protocol/store/waku_store_client.go | 20 +++++-- 13 files changed, 69 insertions(+), 67 deletions(-) diff --git a/waku/v2/node/wakunode2_test.go b/waku/v2/node/wakunode2_test.go index 303aeffff..378ad0bca 100644 --- a/waku/v2/node/wakunode2_test.go +++ b/waku/v2/node/wakunode2_test.go @@ -17,6 +17,7 @@ import ( "github.com/waku-org/go-waku/waku/persistence" "github.com/waku-org/go-waku/waku/persistence/sqlite" "github.com/waku-org/go-waku/waku/v2/dnsdisc" + "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter" "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/relay" @@ -303,9 +304,10 @@ func TestDecoupledStoreFromRelay(t *testing.T) { require.NoError(t, err) defer wakuNode3.Stop() - err = wakuNode3.DialPeerWithMultiAddress(ctx, wakuNode2.ListenAddresses()[0]) + //err = wakuNode3.DialPeerWithMultiAddress(ctx, wakuNode2.ListenAddresses()[0]) + _, err = wakuNode3.AddPeer(wakuNode2.ListenAddresses()[0], peerstore.Static, []string{relay.DefaultWakuTopic}, store.StoreID_v20beta4) require.NoError(t, err) - + time.Sleep(2 * time.Second) // NODE2 should have returned the message received via filter result, err := wakuNode3.Store().Query(ctx, store.Query{}) require.NoError(t, err) diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index e110880b5..3b1418361 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -276,10 +276,10 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot failedContentTopics := []string{} subscriptions := make([]*subscription.SubscriptionDetails, 0) for pubSubTopic, cTopics := range pubSubTopicMap { - + var selectedPeer peer.ID //TO Optimize: find a peer with all pubSubTopics in the list if possible, if not only then look for single pubSubTopic - if params.pm != nil { - params.selectedPeer, err = wf.pm.SelectPeer( + if params.pm != nil && params.selectedPeer == "" { + selectedPeer, err = wf.pm.SelectPeer( peermanager.PeerSelectionCriteria{ SelectionType: params.peerSelectionType, Proto: FilterSubscribeID_v20beta1, @@ -288,9 +288,11 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot Ctx: ctx, }, ) + } else { + selectedPeer = params.selectedPeer } - if params.selectedPeer == "" { + if selectedPeer == "" { wf.metrics.RecordError(peerNotFoundFailure) wf.log.Error("selecting peer", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics), zap.Error(err)) @@ -309,7 +311,7 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot failedContentTopics = append(failedContentTopics, cTopics...) continue } - subscriptions = append(subscriptions, wf.subscriptions.NewSubscription(params.selectedPeer, cFilter)) + subscriptions = append(subscriptions, wf.subscriptions.NewSubscription(selectedPeer, cFilter)) } if len(failedContentTopics) > 0 { diff --git a/waku/v2/protocol/filter/options.go b/waku/v2/protocol/filter/options.go index dac050619..d73bee91f 100644 --- a/waku/v2/protocol/filter/options.go +++ b/waku/v2/protocol/filter/options.go @@ -1,7 +1,6 @@ package filter import ( - "context" "sync" "time" @@ -67,7 +66,7 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) FilterSubscribeOption // with the lowest ping If a list of specific peers is passed, the peer will be chosen // from that list assuming it supports the chosen protocol, otherwise it will chose a // peer from the node peerstore -func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) FilterSubscribeOption { +func WithFastestPeerSelection(fromThesePeers ...peer.ID) FilterSubscribeOption { return func(params *FilterSubscribeParameters) error { params.peerSelectionType = peermanager.LowestRTT return nil diff --git a/waku/v2/protocol/filter/options_test.go b/waku/v2/protocol/filter/options_test.go index eeca00eed..d02bb2da2 100644 --- a/waku/v2/protocol/filter/options_test.go +++ b/waku/v2/protocol/filter/options_test.go @@ -21,7 +21,7 @@ func TestFilterOption(t *testing.T) { options := []FilterSubscribeOption{ WithPeer("QmWLxGxG65CZ7vRj5oNXCJvbY9WkF9d9FxuJg8cg8Y7q3"), WithAutomaticPeerSelection(), - WithFastestPeerSelection(context.Background()), + WithFastestPeerSelection(), } params := new(FilterSubscribeParameters) diff --git a/waku/v2/protocol/legacy_filter/waku_filter.go b/waku/v2/protocol/legacy_filter/waku_filter.go index 1944692fe..f2079d404 100644 --- a/waku/v2/protocol/legacy_filter/waku_filter.go +++ b/waku/v2/protocol/legacy_filter/waku_filter.go @@ -239,7 +239,7 @@ func (wf *WakuFilter) requestSubscription(ctx context.Context, filter ContentFil for _, opt := range optList { opt(params) } - if wf.pm != nil { + if wf.pm != nil && params.selectedPeer == "" { params.selectedPeer, _ = wf.pm.SelectPeer( peermanager.PeerSelectionCriteria{ SelectionType: params.peerSelectionType, diff --git a/waku/v2/protocol/legacy_filter/waku_filter_option.go b/waku/v2/protocol/legacy_filter/waku_filter_option.go index 1fd67b628..4b103a367 100644 --- a/waku/v2/protocol/legacy_filter/waku_filter_option.go +++ b/waku/v2/protocol/legacy_filter/waku_filter_option.go @@ -1,7 +1,6 @@ package legacy_filter import ( - "context" "time" "github.com/libp2p/go-libp2p/core/host" @@ -61,7 +60,7 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) FilterSubscribeOption // with the lowest ping If a list of specific peers is passed, the peer will be chosen // from that list assuming it supports the chosen protocol, otherwise it will chose a // peer from the node peerstore -func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) FilterSubscribeOption { +func WithFastestPeerSelection(fromThesePeers ...peer.ID) FilterSubscribeOption { return func(params *FilterSubscribeParameters) { params.peerSelectionType = peermanager.LowestRTT } diff --git a/waku/v2/protocol/legacy_filter/waku_filter_option_test.go b/waku/v2/protocol/legacy_filter/waku_filter_option_test.go index ab732bbbc..bf0e0ffc2 100644 --- a/waku/v2/protocol/legacy_filter/waku_filter_option_test.go +++ b/waku/v2/protocol/legacy_filter/waku_filter_option_test.go @@ -20,7 +20,7 @@ func TestFilterOption(t *testing.T) { options := []FilterSubscribeOption{ WithPeer("QmWLxGxG65CZ7vRj5oNXCJvbY9WkF9d9FxuJg8cg8Y7q3"), WithAutomaticPeerSelection(), - WithFastestPeerSelection(context.Background()), + WithFastestPeerSelection(), } params := new(FilterSubscribeParameters) diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index b9ade44c4..2de94bc24 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -224,11 +224,16 @@ func (wakuLP *WakuLightPush) handleOpts(ctx context.Context, message *wpb.WakuMe } } - if params.pm != nil { + if params.pm != nil && params.selectedPeer == "" { params.selectedPeer, err = wakuLP.pm.SelectPeer( - peermanager.PeerSelectionCriteria{SelectionType: params.peerSelectionType, - Proto: LightPushID_v20beta1, PubsubTopic: params.pubsubTopic, - SpecificPeers: params.preferredPeers, Ctx: ctx}) + peermanager.PeerSelectionCriteria{ + SelectionType: params.peerSelectionType, + Proto: LightPushID_v20beta1, + PubsubTopic: params.pubsubTopic, + SpecificPeers: params.preferredPeers, + Ctx: ctx, + }, + ) } if params.selectedPeer == "" { if err != nil { diff --git a/waku/v2/protocol/lightpush/waku_lightpush_option.go b/waku/v2/protocol/lightpush/waku_lightpush_option.go index f239af654..55dba60af 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_option.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_option.go @@ -1,8 +1,6 @@ package lightpush import ( - "context" - "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" "github.com/waku-org/go-waku/waku/v2/peermanager" @@ -52,7 +50,7 @@ func WithPubSubTopic(pubsubTopic string) Option { // with the lowest ping. If a list of specific peers is passed, the peer will be chosen // from that list assuming it supports the chosen protocol, otherwise it will chose a peer // from the node peerstore -func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) Option { +func WithFastestPeerSelection(fromThesePeers ...peer.ID) Option { return func(params *lightPushParameters) { params.peerSelectionType = peermanager.LowestRTT } diff --git a/waku/v2/protocol/lightpush/waku_lightpush_option_test.go b/waku/v2/protocol/lightpush/waku_lightpush_option_test.go index 6fa21677d..3bb72654c 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_option_test.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_option_test.go @@ -20,7 +20,7 @@ func TestLightPushOption(t *testing.T) { options := []Option{ WithPeer("QmWLxGxG65CZ7vRj5oNXCJvbY9WkF9d9FxuJg8cg8Y7q3"), WithAutomaticPeerSelection(), - WithFastestPeerSelection(context.Background()), + WithFastestPeerSelection(), WithRequestID([]byte("requestID")), WithAutomaticRequestID(), } diff --git a/waku/v2/protocol/peer_exchange/client.go b/waku/v2/protocol/peer_exchange/client.go index f06035cfb..65a7305f3 100644 --- a/waku/v2/protocol/peer_exchange/client.go +++ b/waku/v2/protocol/peer_exchange/client.go @@ -26,12 +26,22 @@ func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts optList := DefaultOptions(wakuPX.h) optList = append(optList, opts...) for _, opt := range optList { - err := opt(params) + opt(params) + } + if params.pm != nil && params.selectedPeer == "" { + var err error + params.selectedPeer, err = wakuPX.pm.SelectPeer( + peermanager.PeerSelectionCriteria{ + SelectionType: params.peerSelectionType, + Proto: PeerExchangeID_v20alpha1, + SpecificPeers: params.preferredPeers, + Ctx: ctx, + }, + ) if err != nil { return err } } - if params.selectedPeer == "" { wakuPX.metrics.RecordError(dialFailure) return ErrNoPeersAvailable diff --git a/waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go b/waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go index a6164f07c..57cf2455b 100644 --- a/waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go +++ b/waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go @@ -1,9 +1,6 @@ package peer_exchange import ( - "context" - "errors" - "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" "github.com/waku-org/go-waku/waku/v2/peermanager" @@ -11,19 +8,20 @@ import ( ) type PeerExchangeParameters struct { - host host.Host - selectedPeer peer.ID - pm *peermanager.PeerManager - log *zap.Logger + host host.Host + selectedPeer peer.ID + peerSelectionType peermanager.PeerSelection + preferredPeers peer.IDSlice + pm *peermanager.PeerManager + log *zap.Logger } -type PeerExchangeOption func(*PeerExchangeParameters) error +type PeerExchangeOption func(*PeerExchangeParameters) // WithPeer is an option used to specify the peerID to push a waku message to func WithPeer(p peer.ID) PeerExchangeOption { - return func(params *PeerExchangeParameters) error { + return func(params *PeerExchangeParameters) { params.selectedPeer = p - return nil } } @@ -33,19 +31,9 @@ func WithPeer(p peer.ID) PeerExchangeOption { // from the node peerstore // Note: this option can only be used if WakuNode is initialized which internally intializes the peerManager func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) PeerExchangeOption { - return func(params *PeerExchangeParameters) error { - if params.pm == nil { - return errors.New("automatic selection is not avaiable since peerManager is not initialized") - } else { - p, err := params.pm.SelectPeer(peermanager.PeerSelectionCriteria{Proto: PeerExchangeID_v20alpha1, - SpecificPeers: fromThesePeers}) - if err == nil { - params.selectedPeer = p - } else { - return err - } - return nil - } + return func(params *PeerExchangeParameters) { + params.peerSelectionType = peermanager.Automatic + params.preferredPeers = fromThesePeers } } @@ -53,21 +41,10 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) PeerExchangeOption { // with the lowest ping. If a list of specific peers is passed, the peer will be chosen // from that list assuming it supports the chosen protocol, otherwise it will chose a peer // from the node peerstore -func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) PeerExchangeOption { - return func(params *PeerExchangeParameters) error { - if params.pm == nil { - return errors.New("automatic selection is not avaiable since peerManager is not initialized") - } else { - p, err := params.pm.SelectPeerWithLowestRTT( - peermanager.PeerSelectionCriteria{Proto: PeerExchangeID_v20alpha1, - SpecificPeers: fromThesePeers, Ctx: ctx}) - if err == nil { - params.selectedPeer = p - } else { - return err - } - return nil - } +func WithFastestPeerSelection(fromThesePeers ...peer.ID) PeerExchangeOption { + return func(params *PeerExchangeParameters) { + params.peerSelectionType = peermanager.LowestRTT + params.preferredPeers = fromThesePeers } } diff --git a/waku/v2/protocol/store/waku_store_client.go b/waku/v2/protocol/store/waku_store_client.go index 0b359b6cc..68a0b3903 100644 --- a/waku/v2/protocol/store/waku_store_client.go +++ b/waku/v2/protocol/store/waku_store_client.go @@ -119,7 +119,7 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) HistoryRequestOption // from that list assuming it supports the chosen protocol, otherwise it will chose a peer // from the node peerstore // Note: This option is avaiable only with peerManager -func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) HistoryRequestOption { +func WithFastestPeerSelection(fromThesePeers ...peer.ID) HistoryRequestOption { return func(params *HistoryRequestParameters) { params.peerSelectionType = peermanager.LowestRTT } @@ -264,10 +264,20 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR for _, opt := range optList { opt(params) } - - if store.pm != nil { - params.selectedPeer, _ = store.pm.SelectPeer(peermanager.PeerSelectionCriteria{SelectionType: params.peerSelectionType, - Proto: StoreID_v20beta4, PubsubTopic: query.Topic, SpecificPeers: params.preferredPeers, Ctx: ctx}) + if store.pm != nil && params.selectedPeer == "" { + var err error + params.selectedPeer, err = store.pm.SelectPeer( + peermanager.PeerSelectionCriteria{ + SelectionType: params.peerSelectionType, + Proto: StoreID_v20beta4, + PubsubTopic: query.Topic, + SpecificPeers: params.preferredPeers, + Ctx: ctx, + }, + ) + if err != nil { + return nil, err + } } if !params.localQuery && params.selectedPeer == "" { From 07dfc540e794121b8b0ff9ab4ceaa3b79a974463 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Mon, 16 Oct 2023 22:06:51 +0530 Subject: [PATCH 13/13] Update waku/v2/node/wakunode2_test.go MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: richΛrd --- waku/v2/node/wakunode2_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/waku/v2/node/wakunode2_test.go b/waku/v2/node/wakunode2_test.go index 378ad0bca..71a3d101e 100644 --- a/waku/v2/node/wakunode2_test.go +++ b/waku/v2/node/wakunode2_test.go @@ -304,7 +304,6 @@ func TestDecoupledStoreFromRelay(t *testing.T) { require.NoError(t, err) defer wakuNode3.Stop() - //err = wakuNode3.DialPeerWithMultiAddress(ctx, wakuNode2.ListenAddresses()[0]) _, err = wakuNode3.AddPeer(wakuNode2.ListenAddresses()[0], peerstore.Static, []string{relay.DefaultWakuTopic}, store.StoreID_v20beta4) require.NoError(t, err) time.Sleep(2 * time.Second)