From ae899711b4c5cd689a67a311ae12c3e0aec73f29 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Fri, 8 Sep 2023 15:05:00 +0530 Subject: [PATCH 01/16] fix: updated filterv2 protocol as per rfc, make pubsub topic optional --- waku/v2/protocol/filter/client.go | 19 +++-- .../protocol/filter/pb/waku_filter_v2.pb.go | 74 ++++++++++--------- .../protocol/filter/pb/waku_filter_v2.proto | 4 +- waku/v2/protocol/filter/server.go | 12 +-- 4 files changed, 59 insertions(+), 50 deletions(-) diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index d9448965e..f8c839d2c 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -160,15 +160,17 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(s network.Str return } - if !wf.subscriptions.Has(s.Conn().RemotePeer(), messagePush.PubsubTopic, messagePush.WakuMessage.ContentTopic) { - logger.Warn("received messagepush with invalid subscription parameters", logging.HostID("peerID", s.Conn().RemotePeer()), zap.String("topic", messagePush.PubsubTopic), zap.String("contentTopic", messagePush.WakuMessage.ContentTopic)) + if !wf.subscriptions.Has(s.Conn().RemotePeer(), *messagePush.PubsubTopic, messagePush.WakuMessage.ContentTopic) { + logger.Warn("received messagepush with invalid subscription parameters", + logging.HostID("peerID", s.Conn().RemotePeer()), zap.String("topic", *messagePush.PubsubTopic), + zap.String("contentTopic", messagePush.WakuMessage.ContentTopic)) wf.metrics.RecordError(invalidSubscriptionMessage) return } wf.metrics.RecordMessage() - wf.notify(s.Conn().RemotePeer(), messagePush.PubsubTopic, messagePush.WakuMessage) + wf.notify(s.Conn().RemotePeer(), *messagePush.PubsubTopic, messagePush.WakuMessage) logger.Info("received message push") } @@ -184,7 +186,8 @@ func (wf *WakuFilterLightNode) notify(remotePeerID peer.ID, pubsubTopic string, wf.subscriptions.Notify(remotePeerID, envelope) } -func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscribeParameters, reqType pb.FilterSubscribeRequest_FilterSubscribeType, contentFilter ContentFilter) error { +func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscribeParameters, + reqType pb.FilterSubscribeRequest_FilterSubscribeType, contentFilter ContentFilter) error { conn, err := wf.h.NewStream(ctx, params.selectedPeer, FilterSubscribeID_v20beta1) if err != nil { wf.metrics.RecordError(dialFailure) @@ -198,7 +201,7 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscr request := &pb.FilterSubscribeRequest{ RequestId: hex.EncodeToString(params.requestID), FilterSubscribeType: reqType, - PubsubTopic: contentFilter.Topic, + PubsubTopic: &contentFilter.Topic, ContentTopics: contentFilter.ContentTopics, } @@ -388,7 +391,8 @@ func (wf *WakuFilterLightNode) cleanupSubscriptions(peerID peer.ID, contentFilte } // Unsubscribe is used to stop receiving messages from a peer that match a content filter -func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) { +func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter ContentFilter, + opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) { wf.RLock() defer wf.RUnlock() @@ -474,7 +478,8 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter Co } // Unsubscribe is used to stop receiving messages from a peer that match a content filter -func (wf *WakuFilterLightNode) UnsubscribeWithSubscription(ctx context.Context, sub *SubscriptionDetails, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) { +func (wf *WakuFilterLightNode) UnsubscribeWithSubscription(ctx context.Context, sub *SubscriptionDetails, + opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) { wf.RLock() defer wf.RUnlock() diff --git a/waku/v2/protocol/filter/pb/waku_filter_v2.pb.go b/waku/v2/protocol/filter/pb/waku_filter_v2.pb.go index 0bea8dbc1..7bc044051 100644 --- a/waku/v2/protocol/filter/pb/waku_filter_v2.pb.go +++ b/waku/v2/protocol/filter/pb/waku_filter_v2.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.26.0 -// protoc v3.21.12 +// protoc-gen-go v1.31.0 +// protoc v4.23.4 // source: waku_filter_v2.proto // 12/WAKU2-FILTER rfc: https://rfc.vac.dev/spec/12/ @@ -84,7 +84,7 @@ type FilterSubscribeRequest struct { RequestId string `protobuf:"bytes,1,opt,name=request_id,json=requestId,proto3" json:"request_id,omitempty"` FilterSubscribeType FilterSubscribeRequest_FilterSubscribeType `protobuf:"varint,2,opt,name=filter_subscribe_type,json=filterSubscribeType,proto3,enum=pb.FilterSubscribeRequest_FilterSubscribeType" json:"filter_subscribe_type,omitempty"` // Filter criteria - PubsubTopic string `protobuf:"bytes,10,opt,name=pubsub_topic,json=pubsubTopic,proto3" json:"pubsub_topic,omitempty"` + PubsubTopic *string `protobuf:"bytes,10,opt,name=pubsub_topic,json=pubsubTopic,proto3,oneof" json:"pubsub_topic,omitempty"` ContentTopics []string `protobuf:"bytes,11,rep,name=content_topics,json=contentTopics,proto3" json:"content_topics,omitempty"` } @@ -135,8 +135,8 @@ func (x *FilterSubscribeRequest) GetFilterSubscribeType() FilterSubscribeRequest } func (x *FilterSubscribeRequest) GetPubsubTopic() string { - if x != nil { - return x.PubsubTopic + if x != nil && x.PubsubTopic != nil { + return *x.PubsubTopic } return "" } @@ -218,7 +218,7 @@ type MessagePushV2 struct { unknownFields protoimpl.UnknownFields WakuMessage *pb.WakuMessage `protobuf:"bytes,1,opt,name=waku_message,json=wakuMessage,proto3" json:"waku_message,omitempty"` - PubsubTopic string `protobuf:"bytes,2,opt,name=pubsub_topic,json=pubsubTopic,proto3" json:"pubsub_topic,omitempty"` + PubsubTopic *string `protobuf:"bytes,2,opt,name=pubsub_topic,json=pubsubTopic,proto3,oneof" json:"pubsub_topic,omitempty"` } func (x *MessagePushV2) Reset() { @@ -261,8 +261,8 @@ func (x *MessagePushV2) GetWakuMessage() *pb.WakuMessage { } func (x *MessagePushV2) GetPubsubTopic() string { - if x != nil { - return x.PubsubTopic + if x != nil && x.PubsubTopic != nil { + return *x.PubsubTopic } return "" } @@ -272,7 +272,7 @@ var File_waku_filter_v2_proto protoreflect.FileDescriptor var file_waku_filter_v2_proto_rawDesc = []byte{ 0x0a, 0x14, 0x77, 0x61, 0x6b, 0x75, 0x5f, 0x66, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x5f, 0x76, 0x32, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x02, 0x70, 0x62, 0x1a, 0x12, 0x77, 0x61, 0x6b, 0x75, - 0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xc6, + 0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xdc, 0x02, 0x0a, 0x16, 0x46, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, @@ -282,33 +282,35 @@ var file_waku_filter_v2_proto_rawDesc = []byte{ 0x74, 0x65, 0x72, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x46, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x54, 0x79, 0x70, 0x65, 0x52, 0x13, 0x66, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x53, - 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x54, 0x79, 0x70, 0x65, 0x12, 0x21, 0x0a, 0x0c, + 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x54, 0x79, 0x70, 0x65, 0x12, 0x26, 0x0a, 0x0c, 0x70, 0x75, 0x62, 0x73, 0x75, 0x62, 0x5f, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x0a, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x0b, 0x70, 0x75, 0x62, 0x73, 0x75, 0x62, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x12, - 0x25, 0x0a, 0x0e, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x5f, 0x74, 0x6f, 0x70, 0x69, 0x63, - 0x73, 0x18, 0x0b, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0d, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, - 0x54, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x22, 0x5f, 0x0a, 0x13, 0x46, 0x69, 0x6c, 0x74, 0x65, 0x72, - 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x54, 0x79, 0x70, 0x65, 0x12, 0x13, 0x0a, - 0x0f, 0x53, 0x55, 0x42, 0x53, 0x43, 0x52, 0x49, 0x42, 0x45, 0x52, 0x5f, 0x50, 0x49, 0x4e, 0x47, - 0x10, 0x00, 0x12, 0x0d, 0x0a, 0x09, 0x53, 0x55, 0x42, 0x53, 0x43, 0x52, 0x49, 0x42, 0x45, 0x10, - 0x01, 0x12, 0x0f, 0x0a, 0x0b, 0x55, 0x4e, 0x53, 0x55, 0x42, 0x53, 0x43, 0x52, 0x49, 0x42, 0x45, - 0x10, 0x02, 0x12, 0x13, 0x0a, 0x0f, 0x55, 0x4e, 0x53, 0x55, 0x42, 0x53, 0x43, 0x52, 0x49, 0x42, - 0x45, 0x5f, 0x41, 0x4c, 0x4c, 0x10, 0x03, 0x22, 0x7a, 0x0a, 0x17, 0x46, 0x69, 0x6c, 0x74, 0x65, - 0x72, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x5f, 0x69, 0x64, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x49, - 0x64, 0x12, 0x1f, 0x0a, 0x0b, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x5f, 0x63, 0x6f, 0x64, 0x65, - 0x18, 0x0a, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0a, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x43, 0x6f, - 0x64, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x5f, 0x64, 0x65, 0x73, - 0x63, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x44, - 0x65, 0x73, 0x63, 0x22, 0x66, 0x0a, 0x0d, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x50, 0x75, - 0x73, 0x68, 0x56, 0x32, 0x12, 0x32, 0x0a, 0x0c, 0x77, 0x61, 0x6b, 0x75, 0x5f, 0x6d, 0x65, 0x73, - 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x70, 0x62, 0x2e, - 0x57, 0x61, 0x6b, 0x75, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x0b, 0x77, 0x61, 0x6b, - 0x75, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x70, 0x75, 0x62, 0x73, - 0x75, 0x62, 0x5f, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, - 0x70, 0x75, 0x62, 0x73, 0x75, 0x62, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x33, + 0x28, 0x09, 0x48, 0x00, 0x52, 0x0b, 0x70, 0x75, 0x62, 0x73, 0x75, 0x62, 0x54, 0x6f, 0x70, 0x69, + 0x63, 0x88, 0x01, 0x01, 0x12, 0x25, 0x0a, 0x0e, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x5f, + 0x74, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x18, 0x0b, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0d, 0x63, 0x6f, + 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x22, 0x5f, 0x0a, 0x13, 0x46, + 0x69, 0x6c, 0x74, 0x65, 0x72, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x54, 0x79, + 0x70, 0x65, 0x12, 0x13, 0x0a, 0x0f, 0x53, 0x55, 0x42, 0x53, 0x43, 0x52, 0x49, 0x42, 0x45, 0x52, + 0x5f, 0x50, 0x49, 0x4e, 0x47, 0x10, 0x00, 0x12, 0x0d, 0x0a, 0x09, 0x53, 0x55, 0x42, 0x53, 0x43, + 0x52, 0x49, 0x42, 0x45, 0x10, 0x01, 0x12, 0x0f, 0x0a, 0x0b, 0x55, 0x4e, 0x53, 0x55, 0x42, 0x53, + 0x43, 0x52, 0x49, 0x42, 0x45, 0x10, 0x02, 0x12, 0x13, 0x0a, 0x0f, 0x55, 0x4e, 0x53, 0x55, 0x42, + 0x53, 0x43, 0x52, 0x49, 0x42, 0x45, 0x5f, 0x41, 0x4c, 0x4c, 0x10, 0x03, 0x42, 0x0f, 0x0a, 0x0d, + 0x5f, 0x70, 0x75, 0x62, 0x73, 0x75, 0x62, 0x5f, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x22, 0x7a, 0x0a, + 0x17, 0x46, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x49, 0x64, 0x12, 0x1f, 0x0a, 0x0b, 0x73, 0x74, 0x61, 0x74, 0x75, + 0x73, 0x5f, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0a, 0x73, 0x74, + 0x61, 0x74, 0x75, 0x73, 0x43, 0x6f, 0x64, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x73, 0x74, 0x61, 0x74, + 0x75, 0x73, 0x5f, 0x64, 0x65, 0x73, 0x63, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x73, + 0x74, 0x61, 0x74, 0x75, 0x73, 0x44, 0x65, 0x73, 0x63, 0x22, 0x7c, 0x0a, 0x0d, 0x4d, 0x65, 0x73, + 0x73, 0x61, 0x67, 0x65, 0x50, 0x75, 0x73, 0x68, 0x56, 0x32, 0x12, 0x32, 0x0a, 0x0c, 0x77, 0x61, + 0x6b, 0x75, 0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, + 0x32, 0x0f, 0x2e, 0x70, 0x62, 0x2e, 0x57, 0x61, 0x6b, 0x75, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, + 0x65, 0x52, 0x0b, 0x77, 0x61, 0x6b, 0x75, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x26, + 0x0a, 0x0c, 0x70, 0x75, 0x62, 0x73, 0x75, 0x62, 0x5f, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x0b, 0x70, 0x75, 0x62, 0x73, 0x75, 0x62, 0x54, 0x6f, + 0x70, 0x69, 0x63, 0x88, 0x01, 0x01, 0x42, 0x0f, 0x0a, 0x0d, 0x5f, 0x70, 0x75, 0x62, 0x73, 0x75, + 0x62, 0x5f, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -385,6 +387,8 @@ func file_waku_filter_v2_proto_init() { } } } + file_waku_filter_v2_proto_msgTypes[0].OneofWrappers = []interface{}{} + file_waku_filter_v2_proto_msgTypes[2].OneofWrappers = []interface{}{} type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ diff --git a/waku/v2/protocol/filter/pb/waku_filter_v2.proto b/waku/v2/protocol/filter/pb/waku_filter_v2.proto index f27d65afe..50ad632cc 100644 --- a/waku/v2/protocol/filter/pb/waku_filter_v2.proto +++ b/waku/v2/protocol/filter/pb/waku_filter_v2.proto @@ -19,7 +19,7 @@ message FilterSubscribeRequest { FilterSubscribeType filter_subscribe_type = 2; // Filter criteria - string pubsub_topic = 10; + optional string pubsub_topic = 10; repeated string content_topics = 11; } @@ -32,5 +32,5 @@ message FilterSubscribeResponse { // Protocol identifier: /vac/waku/filter-push/2.0.0-beta1 message MessagePushV2 { WakuMessage waku_message = 1; - string pubsub_topic = 2; + optional string pubsub_topic = 2; } \ No newline at end of file diff --git a/waku/v2/protocol/filter/server.go b/waku/v2/protocol/filter/server.go index b47c8db35..1baa2c963 100644 --- a/waku/v2/protocol/filter/server.go +++ b/waku/v2/protocol/filter/server.go @@ -153,7 +153,7 @@ func (wf *WakuFilterFullNode) ping(ctx context.Context, s network.Stream, logger } func (wf *WakuFilterFullNode) subscribe(ctx context.Context, s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest) { - if request.PubsubTopic == "" { + if request.PubsubTopic == nil { wf.reply(ctx, s, request, http.StatusBadRequest, "pubsubtopic can't be empty") return } @@ -186,14 +186,14 @@ func (wf *WakuFilterFullNode) subscribe(ctx context.Context, s network.Stream, l } } - wf.subscriptions.Set(peerID, request.PubsubTopic, request.ContentTopics) + wf.subscriptions.Set(peerID, *request.PubsubTopic, request.ContentTopics) wf.metrics.RecordSubscriptions(wf.subscriptions.Count()) wf.reply(ctx, s, request, http.StatusOK) } func (wf *WakuFilterFullNode) unsubscribe(ctx context.Context, s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest) { - if request.PubsubTopic == "" { + if request.PubsubTopic == nil { wf.reply(ctx, s, request, http.StatusBadRequest, "pubsubtopic can't be empty") return } @@ -207,7 +207,7 @@ func (wf *WakuFilterFullNode) unsubscribe(ctx context.Context, s network.Stream, wf.reply(ctx, s, request, http.StatusBadRequest, fmt.Sprintf("exceeds maximum content topics: %d", MaxContentTopicsPerRequest)) } - err := wf.subscriptions.Delete(s.Conn().RemotePeer(), request.PubsubTopic, request.ContentTopics) + err := wf.subscriptions.Delete(s.Conn().RemotePeer(), *request.PubsubTopic, request.ContentTopics) if err != nil { wf.reply(ctx, s, request, http.StatusNotFound, peerHasNoSubscription) } else { @@ -273,9 +273,9 @@ func (wf *WakuFilterFullNode) pushMessage(ctx context.Context, peerID peer.ID, e zap.String("pubsubTopic", env.PubsubTopic()), zap.String("contentTopic", env.Message().ContentTopic), ) - + pubSubTopic := env.PubsubTopic() messagePush := &pb.MessagePushV2{ - PubsubTopic: env.PubsubTopic(), + PubsubTopic: &pubSubTopic, WakuMessage: env.Message(), } From 717df702c245bc5915aa6789cc5d028a349a02ec Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Sat, 9 Sep 2023 12:19:09 +0530 Subject: [PATCH 02/16] chore: make broadcaster optional in filter client --- waku/v2/protocol/filter/client.go | 10 ++++++---- waku/v2/protocol/filter/filter_test.go | 15 +++++++++------ 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index f8c839d2c..f7277cb7d 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -40,7 +40,7 @@ type WakuFilterLightNode struct { cancel context.CancelFunc ctx context.Context h host.Host - broadcaster relay.Broadcaster + broadcaster relay.Broadcaster //TODO: Move the broadcast functionality outside of relay client to a higher SDK layer.s timesource timesource.Timesource metrics Metrics wg *sync.WaitGroup @@ -63,6 +63,7 @@ var errNotStarted = errors.New("not started") var errAlreadyStarted = errors.New("already started") // NewWakuFilterLightnode returns a new instance of Waku Filter struct setup according to the chosen parameter and options +// Note that broadcaster is optional. // Takes an optional peermanager if WakuFilterLightnode is being created along with WakuNode. // If using libp2p host, then pass peermanager as nil func NewWakuFilterLightNode(broadcaster relay.Broadcaster, pm *peermanager.PeerManager, @@ -179,9 +180,10 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(s network.Str func (wf *WakuFilterLightNode) notify(remotePeerID peer.ID, pubsubTopic string, msg *wpb.WakuMessage) { envelope := protocol.NewEnvelope(msg, wf.timesource.Now().UnixNano(), pubsubTopic) - // Broadcasting message so it's stored - wf.broadcaster.Submit(envelope) - + if wf.broadcaster != nil { + // Broadcasting message so it's stored + wf.broadcaster.Submit(envelope) + } // Notify filter subscribers wf.subscriptions.Notify(remotePeerID, envelope) } diff --git a/waku/v2/protocol/filter/filter_test.go b/waku/v2/protocol/filter/filter_test.go index 2d6853710..9e0467803 100644 --- a/waku/v2/protocol/filter/filter_test.go +++ b/waku/v2/protocol/filter/filter_test.go @@ -68,15 +68,17 @@ func (s *FilterTestSuite) makeWakuRelay(topic string) (*relay.WakuRelay, *relay. return relay, sub, host, broadcaster } -func (s *FilterTestSuite) makeWakuFilterLightNode(start bool) *WakuFilterLightNode { +func (s *FilterTestSuite) makeWakuFilterLightNode(start bool, withBroadcaster bool) *WakuFilterLightNode { port, err := tests.FindFreePort(s.T(), "", 5) s.Require().NoError(err) host, err := tests.MakeHost(context.Background(), port, rand.Reader) s.Require().NoError(err) - - b := relay.NewBroadcaster(10) - s.Require().NoError(b.Start(context.Background())) + var b relay.Broadcaster + if withBroadcaster { + b = relay.NewBroadcaster(10) + s.Require().NoError(b.Start(context.Background())) + } filterPush := NewWakuFilterLightNode(b, nil, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, s.log) filterPush.SetHost(host) s.lightNodeHost = host @@ -181,7 +183,8 @@ func (s *FilterTestSuite) SetupTest() { s.testTopic = "/waku/2/go/filter/test" s.testContentTopic = "TopicA" - s.lightNode = s.makeWakuFilterLightNode(true) + s.lightNode = s.makeWakuFilterLightNode(true, false) + //TODO: Add tests to verify broadcaster. s.relayNode, s.fullNode = s.makeWakuFilterFullNode(s.testTopic) @@ -383,7 +386,7 @@ func (s *FilterTestSuite) TestFireAndForgetAndCustomWg() { func (s *FilterTestSuite) TestStartStop() { var wg sync.WaitGroup wg.Add(2) - s.lightNode = s.makeWakuFilterLightNode(false) + s.lightNode = s.makeWakuFilterLightNode(false, false) stopNode := func() { for i := 0; i < 100000; i++ { From 763c8f64cf3eb0965507f01956ebac23abe322fb Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Sat, 9 Sep 2023 17:59:22 +0530 Subject: [PATCH 03/16] feat: update filter client to support autosharding --- library/filter.go | 16 +-- waku/v2/protocol/filter/client.go | 153 ++++++++++++++++--------- waku/v2/protocol/filter/filter_test.go | 43 ++++--- 3 files changed, 132 insertions(+), 80 deletions(-) diff --git a/library/filter.go b/library/filter.go index e78d6a3d1..7de2f9187 100644 --- a/library/filter.go +++ b/library/filter.go @@ -60,18 +60,20 @@ func FilterSubscribe(filterJSON string, peerID string, ms int) (string, error) { fOptions = append(fOptions, filter.WithAutomaticPeerSelection()) } - subscriptionDetails, err := wakuState.node.FilterLightnode().Subscribe(ctx, cf, fOptions...) + subscriptions, err := wakuState.node.FilterLightnode().Subscribe(ctx, cf, fOptions...) if err != nil { return "", err } - go func(subscriptionDetails *filter.SubscriptionDetails) { - for envelope := range subscriptionDetails.C { - send("message", toSubscriptionMessage(envelope)) - } - }(subscriptionDetails) + for _, subscriptionDetails := range subscriptions { + go func(subscriptionDetails *filter.SubscriptionDetails) { + for envelope := range subscriptionDetails.C { + send("message", toSubscriptionMessage(envelope)) + } + }(subscriptionDetails) + } - return marshalJSON(subscriptionDetails) + return marshalJSON(subscriptions) } // FilterPing is used to determine if a peer has an active subscription diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index f7277cb7d..93233c14c 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -49,6 +49,10 @@ type WakuFilterLightNode struct { pm *peermanager.PeerManager } +// ContentFilter is used to specify the filter to be applied for a FilterNode. +// Topic means pubSubTopic (which is optional in case of using contentTopics that following Static/Auto sharding) +// ContentTopics - Specify list of content topics to be filtered under a pubSubTopic or a list of contentTopics (which follow static/Auto sharding) +// If pubSub topic is not specified, then content-topics are used to derive the shard and corresponding pubSubTopic type ContentFilter struct { Topic string ContentTopics []string @@ -144,7 +148,6 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(s network.Str return func(s network.Stream) { defer s.Close() logger := wf.log.With(logging.HostID("peer", s.Conn().RemotePeer())) - if !wf.subscriptions.IsSubscribedTo(s.Conn().RemotePeer()) { logger.Warn("received message push from unknown peer", logging.HostID("peerID", s.Conn().RemotePeer())) wf.metrics.RecordError(unknownPeerMessagePush) @@ -222,7 +225,6 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscr wf.metrics.RecordError(decodeRPCFailure) return err } - if filterSubscribeResponse.RequestId != request.RequestId { wf.log.Error("requestID mismatch", zap.String("expected", request.RequestId), zap.String("received", filterSubscribeResponse.RequestId)) wf.metrics.RecordError(requestIDMismatch) @@ -239,8 +241,35 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscr return nil } +// This function converts a contentFilter into a map of pubSubTopics and corresponding contentTopics +func contentFilterToPubSubTopicMap(contentFilter ContentFilter) (map[string][]string, error) { + pubSubTopicMap := make(map[string][]string) + + if contentFilter.Topic != "" { + pubSubTopicMap[contentFilter.Topic] = make([]string, 0) + pubSubTopicMap[contentFilter.Topic] = contentFilter.ContentTopics + } else { + //Parse the content-Topics to figure out shards. + for _, cTopicString := range contentFilter.ContentTopics { + cTopic, err := protocol.StringToContentTopic(cTopicString) + if err != nil { + return nil, errors.New(err.Error() + " : " + cTopicString) + } + pTopic := protocol.GetShardFromContentTopic(cTopic, protocol.GenerationZeroShardsCount) + _, ok := pubSubTopicMap[pTopic.String()] + if !ok { + pubSubTopicMap[pTopic.String()] = make([]string, 1) + } + pubSubTopicMap[pTopic.String()] = append(pubSubTopicMap[pTopic.String()], cTopicString) + } + } + return pubSubTopicMap, nil +} + // Subscribe setups a subscription to receive messages that match a specific content filter -func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterSubscribeOption) (*SubscriptionDetails, error) { +// If contentTopics passed result in different pubSub topics (due to Auto/Static sharding), then multiple subscription requests are sent to the peer. +// This may change if Filterv2 protocol is updated to handle such a scenario in a single request. +func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterSubscribeOption) ([]*SubscriptionDetails, error) { wf.RLock() defer wf.RUnlock() @@ -248,10 +277,6 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter Cont return nil, errNotStarted } - if contentFilter.Topic == "" { - return nil, errors.New("topic is required") - } - if len(contentFilter.ContentTopics) == 0 { return nil, errors.New("at least one content topic is required") } @@ -276,12 +301,23 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter Cont return nil, ErrNoPeersAvailable } - err := wf.request(ctx, params, pb.FilterSubscribeRequest_SUBSCRIBE, contentFilter) + pubSubTopicMap, err := contentFilterToPubSubTopicMap(contentFilter) if err != nil { return nil, err } - - return wf.subscriptions.NewSubscription(params.selectedPeer, contentFilter.Topic, contentFilter.ContentTopics), nil + subscriptions := make([]*SubscriptionDetails, 0) + for pubSubTopic, cTopics := range pubSubTopicMap { + var cFilter ContentFilter + cFilter.Topic = pubSubTopic + cFilter.ContentTopics = cTopics + //TO OPTIMIZE: Should we parallelize these, if so till how many batches? + err := wf.request(ctx, params, pb.FilterSubscribeRequest_SUBSCRIBE, cFilter) + if err != nil { + return nil, err + } + subscriptions = append(subscriptions, wf.subscriptions.NewSubscription(params.selectedPeer, cFilter.Topic, cFilter.ContentTopics)) + } + return subscriptions, nil } // FilterSubscription is used to obtain an object from which you could receive messages received via filter protocol @@ -402,10 +438,6 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter Co return nil, errNotStarted } - if contentFilter.Topic == "" { - return nil, errors.New("topic is required") - } - if len(contentFilter.ContentTopics) == 0 { return nil, errors.New("at least one content topic is required") } @@ -419,57 +451,66 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter Co return nil, err } - resultChan := make(chan WakuFilterPushResult, len(wf.subscriptions.items)) - for peerID := range wf.subscriptions.items { - if params.selectedPeer != "" && peerID != params.selectedPeer { - continue - } + pubSubTopicMap, err := contentFilterToPubSubTopicMap(contentFilter) + if err != nil { + return nil, err + } - subscriptions, ok := wf.subscriptions.items[peerID] - if !ok || subscriptions == nil { - continue - } + resultChan := make(chan WakuFilterPushResult, len(wf.subscriptions.items)) + for pTopic, cTopics := range pubSubTopicMap { + var cFilter ContentFilter + cFilter.Topic = pTopic + cFilter.ContentTopics = cTopics + for peerID := range wf.subscriptions.items { + if params.selectedPeer != "" && peerID != params.selectedPeer { + continue + } - wf.cleanupSubscriptions(peerID, contentFilter) - if len(subscriptions.subscriptionsPerTopic) == 0 { - delete(wf.subscriptions.items, peerID) - } + subscriptions, ok := wf.subscriptions.items[peerID] + if !ok || subscriptions == nil { + continue + } - if params.wg != nil { - params.wg.Add(1) - } + wf.cleanupSubscriptions(peerID, cFilter) + if len(subscriptions.subscriptionsPerTopic) == 0 { + delete(wf.subscriptions.items, peerID) + } - go func(peerID peer.ID) { - defer func() { - if params.wg != nil { - params.wg.Done() - } - }() + if params.wg != nil { + params.wg.Add(1) + } - err := wf.request( - ctx, - &FilterSubscribeParameters{selectedPeer: peerID, requestID: params.requestID}, - pb.FilterSubscribeRequest_UNSUBSCRIBE, - contentFilter) - if err != nil { - ferr, ok := err.(*FilterError) - if ok && ferr.Code == http.StatusNotFound { - wf.log.Warn("peer does not have a subscription", logging.HostID("peerID", peerID), zap.Error(err)) - } else { - wf.log.Error("could not unsubscribe from peer", logging.HostID("peerID", peerID), zap.Error(err)) - return + go func(peerID peer.ID) { + defer func() { + if params.wg != nil { + params.wg.Done() + } + }() + + err := wf.request( + ctx, + &FilterSubscribeParameters{selectedPeer: peerID, requestID: params.requestID}, + pb.FilterSubscribeRequest_UNSUBSCRIBE, + cFilter) + if err != nil { + ferr, ok := err.(*FilterError) + if ok && ferr.Code == http.StatusNotFound { + wf.log.Warn("peer does not have a subscription", logging.HostID("peerID", peerID), zap.Error(err)) + } else { + wf.log.Error("could not unsubscribe from peer", logging.HostID("peerID", peerID), zap.Error(err)) + return + } } - } - if params.wg != nil { - resultChan <- WakuFilterPushResult{ - Err: err, - PeerID: peerID, + if params.wg != nil { + resultChan <- WakuFilterPushResult{ + Err: err, + PeerID: peerID, + } } - } - }(peerID) + }(peerID) + } } - if params.wg != nil { params.wg.Wait() } diff --git a/waku/v2/protocol/filter/filter_test.go b/waku/v2/protocol/filter/filter_test.go index 9e0467803..b880b0886 100644 --- a/waku/v2/protocol/filter/filter_test.go +++ b/waku/v2/protocol/filter/filter_test.go @@ -41,7 +41,7 @@ type FilterTestSuite struct { fullNodeHost host.Host wg *sync.WaitGroup contentFilter ContentFilter - subDetails *SubscriptionDetails + subDetails []*SubscriptionDetails log *zap.Logger } @@ -141,7 +141,7 @@ func (s *FilterTestSuite) waitForTimeout(fn func(), ch chan *protocol.Envelope) s.wg.Wait() } -func (s *FilterTestSuite) subscribe(topic string, contentTopic string, peer peer.ID) *SubscriptionDetails { +func (s *FilterTestSuite) subscribe(topic string, contentTopic string, peer peer.ID) []*SubscriptionDetails { s.contentFilter = ContentFilter{ Topic: string(topic), ContentTopics: []string{contentTopic}, @@ -183,7 +183,7 @@ func (s *FilterTestSuite) SetupTest() { s.testTopic = "/waku/2/go/filter/test" s.testContentTopic = "TopicA" - s.lightNode = s.makeWakuFilterLightNode(true, false) + s.lightNode = s.makeWakuFilterLightNode(true, true) //TODO: Add tests to verify broadcaster. s.relayNode, s.fullNode = s.makeWakuFilterFullNode(s.testTopic) @@ -204,19 +204,19 @@ func (s *FilterTestSuite) TearDownTest() { } func (s *FilterTestSuite) TestWakuFilter() { - + s.log.Info("Running TestWakuFilter") // Initial subscribe s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) // Should be received s.waitForMsg(func() { s.publishMsg(s.testTopic, s.testContentTopic, "first") - }, s.subDetails.C) + }, s.subDetails[0].C) // Wrong content topic s.waitForTimeout(func() { s.publishMsg(s.testTopic, "TopicB", "second") - }, s.subDetails.C) + }, s.subDetails[0].C) _, err := s.lightNode.Unsubscribe(s.ctx, s.contentFilter, Peer(s.fullNodeHost.ID())) s.Require().NoError(err) @@ -226,11 +226,11 @@ func (s *FilterTestSuite) TestWakuFilter() { // Should not receive after unsubscribe s.waitForTimeout(func() { s.publishMsg(s.testTopic, s.testContentTopic, "third") - }, s.subDetails.C) + }, s.subDetails[0].C) } func (s *FilterTestSuite) TestSubscriptionPing() { - + s.log.Info("Running TestSubscriptionPing") err := s.lightNode.Ping(context.Background(), s.fullNodeHost.ID()) s.Require().Error(err) filterErr, ok := err.(*FilterError) @@ -245,7 +245,7 @@ func (s *FilterTestSuite) TestSubscriptionPing() { } func (s *FilterTestSuite) TestPeerFailure() { - + s.log.Info("Running TestPeerFailure") broadcaster2 := relay.NewBroadcaster(10) s.Require().NoError(broadcaster2.Start(context.Background())) @@ -262,7 +262,7 @@ func (s *FilterTestSuite) TestPeerFailure() { s.waitForMsg(func() { s.publishMsg(s.testTopic, s.testContentTopic) - }, s.subDetails.C) + }, s.subDetails[0].C) // Failure is removed s.Require().False(s.fullNode.subscriptions.IsFailedPeer(s.lightNodeHost.ID())) @@ -289,18 +289,18 @@ func (s *FilterTestSuite) TestPeerFailure() { } func (s *FilterTestSuite) TestCreateSubscription() { - + s.log.Info("Running TestCreateSubscription") // Initial subscribe s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) - s.waitForMsg(func() { _, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch()), s.testTopic) s.Require().NoError(err) - }, s.subDetails.C) + }, s.subDetails[0].C) } func (s *FilterTestSuite) TestModifySubscription() { + s.log.Info("Running TestModifySubscription") // Initial subscribe s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) @@ -309,7 +309,7 @@ func (s *FilterTestSuite) TestModifySubscription() { _, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch()), s.testTopic) s.Require().NoError(err) - }, s.subDetails.C) + }, s.subDetails[0].C) // Subscribe to another content_topic newContentTopic := "Topic_modified" @@ -319,10 +319,11 @@ func (s *FilterTestSuite) TestModifySubscription() { _, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(newContentTopic, utils.GetUnixEpoch()), s.testTopic) s.Require().NoError(err) - }, s.subDetails.C) + }, s.subDetails[0].C) } func (s *FilterTestSuite) TestMultipleMessages() { + s.log.Info("Running TestMultipleMessages") // Initial subscribe s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) @@ -331,16 +332,18 @@ func (s *FilterTestSuite) TestMultipleMessages() { _, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch(), "first"), s.testTopic) s.Require().NoError(err) - }, s.subDetails.C) + }, s.subDetails[0].C) s.waitForMsg(func() { _, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch(), "second"), s.testTopic) s.Require().NoError(err) - }, s.subDetails.C) + }, s.subDetails[0].C) } func (s *FilterTestSuite) TestRunningGuard() { + s.log.Info("Running TestRunningGuard") + s.lightNode.Stop() contentFilter := ContentFilter{ @@ -361,6 +364,8 @@ func (s *FilterTestSuite) TestRunningGuard() { } func (s *FilterTestSuite) TestFireAndForgetAndCustomWg() { + s.log.Info("Running TestFireAndForgetAndCustomWg") + contentFilter := ContentFilter{ Topic: "test", ContentTopics: []string{"test"}, @@ -384,6 +389,8 @@ func (s *FilterTestSuite) TestFireAndForgetAndCustomWg() { } func (s *FilterTestSuite) TestStartStop() { + s.log.Info("Running TestStartStop") + var wg sync.WaitGroup wg.Add(2) s.lightNode = s.makeWakuFilterLightNode(false, false) @@ -411,3 +418,5 @@ func (s *FilterTestSuite) TestStartStop() { wg.Wait() } + +//TODO: Add tests for autosharding logic with filter. From 5bb8a39cfc37bfd5d0d6b88fa21835920c834738 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Sun, 10 Sep 2023 08:08:53 +0530 Subject: [PATCH 04/16] reverting optional pubSub topic in Filter.MessagePush --- waku/v2/protocol/filter/client.go | 6 +++--- waku/v2/protocol/filter/pb/waku_filter_v2.pb.go | 16 +++++++--------- waku/v2/protocol/filter/pb/waku_filter_v2.proto | 2 +- waku/v2/protocol/filter/server.go | 2 +- 4 files changed, 12 insertions(+), 14 deletions(-) diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index f7277cb7d..f75658dd0 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -161,9 +161,9 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(s network.Str return } - if !wf.subscriptions.Has(s.Conn().RemotePeer(), *messagePush.PubsubTopic, messagePush.WakuMessage.ContentTopic) { + if !wf.subscriptions.Has(s.Conn().RemotePeer(), messagePush.PubsubTopic, messagePush.WakuMessage.ContentTopic) { logger.Warn("received messagepush with invalid subscription parameters", - logging.HostID("peerID", s.Conn().RemotePeer()), zap.String("topic", *messagePush.PubsubTopic), + logging.HostID("peerID", s.Conn().RemotePeer()), zap.String("topic", messagePush.PubsubTopic), zap.String("contentTopic", messagePush.WakuMessage.ContentTopic)) wf.metrics.RecordError(invalidSubscriptionMessage) return @@ -171,7 +171,7 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(s network.Str wf.metrics.RecordMessage() - wf.notify(s.Conn().RemotePeer(), *messagePush.PubsubTopic, messagePush.WakuMessage) + wf.notify(s.Conn().RemotePeer(), messagePush.PubsubTopic, messagePush.WakuMessage) logger.Info("received message push") } diff --git a/waku/v2/protocol/filter/pb/waku_filter_v2.pb.go b/waku/v2/protocol/filter/pb/waku_filter_v2.pb.go index 7bc044051..b25edbe27 100644 --- a/waku/v2/protocol/filter/pb/waku_filter_v2.pb.go +++ b/waku/v2/protocol/filter/pb/waku_filter_v2.pb.go @@ -218,7 +218,7 @@ type MessagePushV2 struct { unknownFields protoimpl.UnknownFields WakuMessage *pb.WakuMessage `protobuf:"bytes,1,opt,name=waku_message,json=wakuMessage,proto3" json:"waku_message,omitempty"` - PubsubTopic *string `protobuf:"bytes,2,opt,name=pubsub_topic,json=pubsubTopic,proto3,oneof" json:"pubsub_topic,omitempty"` + PubsubTopic string `protobuf:"bytes,2,opt,name=pubsub_topic,json=pubsubTopic,proto3" json:"pubsub_topic,omitempty"` } func (x *MessagePushV2) Reset() { @@ -261,8 +261,8 @@ func (x *MessagePushV2) GetWakuMessage() *pb.WakuMessage { } func (x *MessagePushV2) GetPubsubTopic() string { - if x != nil && x.PubsubTopic != nil { - return *x.PubsubTopic + if x != nil { + return x.PubsubTopic } return "" } @@ -302,15 +302,14 @@ var file_waku_filter_v2_proto_rawDesc = []byte{ 0x73, 0x5f, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0a, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x43, 0x6f, 0x64, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x5f, 0x64, 0x65, 0x73, 0x63, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x73, - 0x74, 0x61, 0x74, 0x75, 0x73, 0x44, 0x65, 0x73, 0x63, 0x22, 0x7c, 0x0a, 0x0d, 0x4d, 0x65, 0x73, + 0x74, 0x61, 0x74, 0x75, 0x73, 0x44, 0x65, 0x73, 0x63, 0x22, 0x66, 0x0a, 0x0d, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x50, 0x75, 0x73, 0x68, 0x56, 0x32, 0x12, 0x32, 0x0a, 0x0c, 0x77, 0x61, 0x6b, 0x75, 0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x70, 0x62, 0x2e, 0x57, 0x61, 0x6b, 0x75, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, - 0x65, 0x52, 0x0b, 0x77, 0x61, 0x6b, 0x75, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x26, + 0x65, 0x52, 0x0b, 0x77, 0x61, 0x6b, 0x75, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x70, 0x75, 0x62, 0x73, 0x75, 0x62, 0x5f, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x02, - 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x0b, 0x70, 0x75, 0x62, 0x73, 0x75, 0x62, 0x54, 0x6f, - 0x70, 0x69, 0x63, 0x88, 0x01, 0x01, 0x42, 0x0f, 0x0a, 0x0d, 0x5f, 0x70, 0x75, 0x62, 0x73, 0x75, - 0x62, 0x5f, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x70, 0x75, 0x62, 0x73, 0x75, 0x62, 0x54, 0x6f, 0x70, 0x69, + 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -388,7 +387,6 @@ func file_waku_filter_v2_proto_init() { } } file_waku_filter_v2_proto_msgTypes[0].OneofWrappers = []interface{}{} - file_waku_filter_v2_proto_msgTypes[2].OneofWrappers = []interface{}{} type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ diff --git a/waku/v2/protocol/filter/pb/waku_filter_v2.proto b/waku/v2/protocol/filter/pb/waku_filter_v2.proto index 50ad632cc..5e9fcc5cb 100644 --- a/waku/v2/protocol/filter/pb/waku_filter_v2.proto +++ b/waku/v2/protocol/filter/pb/waku_filter_v2.proto @@ -32,5 +32,5 @@ message FilterSubscribeResponse { // Protocol identifier: /vac/waku/filter-push/2.0.0-beta1 message MessagePushV2 { WakuMessage waku_message = 1; - optional string pubsub_topic = 2; + string pubsub_topic = 2; } \ No newline at end of file diff --git a/waku/v2/protocol/filter/server.go b/waku/v2/protocol/filter/server.go index 1baa2c963..73c0599be 100644 --- a/waku/v2/protocol/filter/server.go +++ b/waku/v2/protocol/filter/server.go @@ -275,7 +275,7 @@ func (wf *WakuFilterFullNode) pushMessage(ctx context.Context, peerID peer.ID, e ) pubSubTopic := env.PubsubTopic() messagePush := &pb.MessagePushV2{ - PubsubTopic: &pubSubTopic, + PubsubTopic: pubSubTopic, WakuMessage: env.Message(), } From e2146521f4319d1ed540961567cb8b488ff6f1f5 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Sun, 10 Sep 2023 08:17:52 +0530 Subject: [PATCH 05/16] chore: add filter tests for autoshard --- waku/v2/protocol/filter/filter_test.go | 90 +++++++++++++++++++++++--- 1 file changed, 80 insertions(+), 10 deletions(-) diff --git a/waku/v2/protocol/filter/filter_test.go b/waku/v2/protocol/filter/filter_test.go index b880b0886..d24e69e73 100644 --- a/waku/v2/protocol/filter/filter_test.go +++ b/waku/v2/protocol/filter/filter_test.go @@ -204,7 +204,6 @@ func (s *FilterTestSuite) TearDownTest() { } func (s *FilterTestSuite) TestWakuFilter() { - s.log.Info("Running TestWakuFilter") // Initial subscribe s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) @@ -230,7 +229,6 @@ func (s *FilterTestSuite) TestWakuFilter() { } func (s *FilterTestSuite) TestSubscriptionPing() { - s.log.Info("Running TestSubscriptionPing") err := s.lightNode.Ping(context.Background(), s.fullNodeHost.ID()) s.Require().Error(err) filterErr, ok := err.(*FilterError) @@ -245,7 +243,6 @@ func (s *FilterTestSuite) TestSubscriptionPing() { } func (s *FilterTestSuite) TestPeerFailure() { - s.log.Info("Running TestPeerFailure") broadcaster2 := relay.NewBroadcaster(10) s.Require().NoError(broadcaster2.Start(context.Background())) @@ -289,7 +286,6 @@ func (s *FilterTestSuite) TestPeerFailure() { } func (s *FilterTestSuite) TestCreateSubscription() { - s.log.Info("Running TestCreateSubscription") // Initial subscribe s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) s.waitForMsg(func() { @@ -300,7 +296,6 @@ func (s *FilterTestSuite) TestCreateSubscription() { } func (s *FilterTestSuite) TestModifySubscription() { - s.log.Info("Running TestModifySubscription") // Initial subscribe s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) @@ -323,7 +318,6 @@ func (s *FilterTestSuite) TestModifySubscription() { } func (s *FilterTestSuite) TestMultipleMessages() { - s.log.Info("Running TestMultipleMessages") // Initial subscribe s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) @@ -342,7 +336,6 @@ func (s *FilterTestSuite) TestMultipleMessages() { } func (s *FilterTestSuite) TestRunningGuard() { - s.log.Info("Running TestRunningGuard") s.lightNode.Stop() @@ -364,7 +357,6 @@ func (s *FilterTestSuite) TestRunningGuard() { } func (s *FilterTestSuite) TestFireAndForgetAndCustomWg() { - s.log.Info("Running TestFireAndForgetAndCustomWg") contentFilter := ContentFilter{ Topic: "test", @@ -389,7 +381,6 @@ func (s *FilterTestSuite) TestFireAndForgetAndCustomWg() { } func (s *FilterTestSuite) TestStartStop() { - s.log.Info("Running TestStartStop") var wg sync.WaitGroup wg.Add(2) @@ -419,4 +410,83 @@ func (s *FilterTestSuite) TestStartStop() { wg.Wait() } -//TODO: Add tests for autosharding logic with filter. +func (s *FilterTestSuite) TestAutoShard() { + + //Workaround as could not find a way to reuse setup test with params + // Stop what is run in setup + s.fullNode.Stop() + s.lightNode.Stop() + + cTopic1Str := "0/test/1/testTopic/proto" + cTopic1, err := protocol.StringToContentTopic(cTopic1Str) + s.Require().NoError(err) + //Computing pubSubTopic only for filterFullNode. + pubSubTopic := protocol.GetShardFromContentTopic(cTopic1, protocol.GenerationZeroShardsCount) + s.testContentTopic = cTopic1Str + s.testTopic = pubSubTopic.String() + + s.lightNode = s.makeWakuFilterLightNode(true, false) + s.relayNode, s.fullNode = s.makeWakuFilterFullNode(pubSubTopic.String()) + + s.lightNodeHost.Peerstore().AddAddr(s.fullNodeHost.ID(), tests.GetHostAddress(s.fullNodeHost), peerstore.PermanentAddrTTL) + err = s.lightNodeHost.Peerstore().AddProtocols(s.fullNodeHost.ID(), FilterSubscribeID_v20beta1) + s.Require().NoError(err) + + s.log.Info("Testing Autoshard:CreateSubscription") + s.subDetails = s.subscribe("", s.testContentTopic, s.fullNodeHost.ID()) + s.waitForMsg(func() { + _, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch()), s.testTopic) + s.Require().NoError(err) + + }, s.subDetails[0].C) + + // Wrong content topic + s.waitForTimeout(func() { + s.publishMsg(s.testTopic, "TopicB", "second") + }, s.subDetails[0].C) + + _, err = s.lightNode.Unsubscribe(s.ctx, s.contentFilter, Peer(s.fullNodeHost.ID())) + s.Require().NoError(err) + + time.Sleep(1 * time.Second) + + // Should not receive after unsubscribe + s.waitForTimeout(func() { + s.publishMsg(s.testTopic, s.testContentTopic, "third") + }, s.subDetails[0].C) + + s.subDetails = s.subscribe("", s.testContentTopic, s.fullNodeHost.ID()) + + s.log.Info("Testing Autoshard:SubscriptionPing") + err = s.lightNode.Ping(context.Background(), s.fullNodeHost.ID()) + s.Require().NoError(err) + + // Test ModifySubscription Subscribe to another content_topic + s.log.Info("Testing Autoshard:ModifySubscription") + + newContentTopic := "0/test/1/testTopic1/proto" + s.subDetails = s.subscribe("", newContentTopic, s.fullNodeHost.ID()) + + s.waitForMsg(func() { + _, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(newContentTopic, utils.GetUnixEpoch()), s.testTopic) + s.Require().NoError(err) + + }, s.subDetails[0].C) + _, err = s.lightNode.Unsubscribe(s.ctx, ContentFilter{ + Topic: s.testTopic, + ContentTopics: []string{newContentTopic}, + }) + s.Require().NoError(err) + + _, err = s.lightNode.UnsubscribeAll(s.ctx) + s.Require().NoError(err) + +} + +func (s *FilterTestSuite) BeforeTest(suiteName, testName string) { + s.log.Info("Executing ", zap.String("testName", testName)) +} + +func (s *FilterTestSuite) AfterTest(suiteName, testName string) { + s.log.Info("Finished executing ", zap.String("testName", testName)) +} From b94ab32655a780c894afe8adf28f0582867c9928 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Mon, 11 Sep 2023 10:06:24 +0530 Subject: [PATCH 06/16] chore:update examples --- examples/chat2/chat.go | 4 ++-- examples/filter2/main.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/chat2/chat.go b/examples/chat2/chat.go index 4e72cfe26..c525144f0 100644 --- a/examples/chat2/chat.go +++ b/examples/chat2/chat.go @@ -75,11 +75,11 @@ func NewChat(ctx context.Context, node *node.WakuNode, connNotifier <-chan node. filterOpt = filter.WithPeer(peerID) chat.ui.InfoMessage(fmt.Sprintf("Subscribing to filter node %s", peerID)) } - theFilter, err := node.FilterLightnode().Subscribe(ctx, cf, filterOpt) + theFilters, err := node.FilterLightnode().Subscribe(ctx, cf, filterOpt) if err != nil { chat.ui.ErrorMessage(err) } else { - chat.C = theFilter.C + chat.C = theFilters[0].C //Picking first subscription since there is only 1 contentTopic specified. } } else { diff --git a/examples/filter2/main.go b/examples/filter2/main.go index 8e1ae9577..1fc7c330f 100644 --- a/examples/filter2/main.go +++ b/examples/filter2/main.go @@ -107,7 +107,7 @@ func main() { } go func() { - for env := range theFilter.C { + for env := range theFilter[0].C { //Safely picking first subscriptions since only 1 contentTopic is subscribed log.Info("Light node received msg, ", string(env.Message().Payload)) } log.Info("Message channel closed!") From 99801035c0d1c00c715e3ca38ae312962d12c3c9 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Mon, 11 Sep 2023 10:38:24 +0530 Subject: [PATCH 07/16] chore:update filter example for autosharding --- examples/filter2/main.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/examples/filter2/main.go b/examples/filter2/main.go index 1fc7c330f..b3a3313bf 100644 --- a/examples/filter2/main.go +++ b/examples/filter2/main.go @@ -26,7 +26,7 @@ var log = logging.Logger("filter2") var pubSubTopic = protocol.DefaultPubsubTopic() -const contentTopic = "test" +const contentTopic = "/filter2test/1/testTopic/proto" func main() { hostAddr1, _ := net.ResolveTCPAddr("tcp", "0.0.0.0:60000") @@ -97,7 +97,6 @@ func main() { // Send FilterRequest from light node to full node cf := filter.ContentFilter{ - Topic: pubSubTopic.String(), ContentTopics: []string{contentTopic}, } From 30f71917b04e4f4066f3fc2340247ee008acb2c9 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Mon, 11 Sep 2023 11:01:48 +0530 Subject: [PATCH 08/16] chore: update example README --- examples/filter2/README.md | 42 +++++++++++++++----------------------- 1 file changed, 17 insertions(+), 25 deletions(-) diff --git a/examples/filter2/README.md b/examples/filter2/README.md index c78c67785..46a3626b7 100644 --- a/examples/filter2/README.md +++ b/examples/filter2/README.md @@ -21,28 +21,20 @@ The app will run 2 nodes ("full" node and "light" node), with light node subscri ## Flow description -1. Light node submits a FilterRequest through WakuNode.SubscribeFilter. This request is submitted to a particular peer. -Filter is stored in WakuNode.filters map. That's it. -DONE -2. Full node: we read incoming messages in WakuFilter.onRequest(). It is set as a stream handler on wakunode.Host for WakuFilterProtocolId. -3. In WakuFilter.onRequest(): - 3.1. We check whether it's a MessagePush or FilterRequest. - 3.2. If it's a MessagePush, then we're on a light node. Invoke pushHandler coming from WakuNode.mountFilter() - 3.3. If it's a FilterRequest, add a subscriber. -4. WakuNode.Subscribe has a message loop extracting WakuMessages from a wakurelay.Subscription object. -It denotes a pubsub topic subscription. -All envelopes are then submitted to node.broadcaster. - - -## Nim code flow -1. Light node: WakuFilter.subscribe(). Find a peer, wrileLP(FilterRequest). Store requestId in WakuNode.filters along with a ContentFilterHandler proc. -2. Full node: WakuFilter inherits LPProtocol. LPProtocol.handler invokes readLP() to read FilterRPC messages -3. this handler function has a signature (conn: Connection, proto: string). - 3.1. it checks whether a MessagePush or FilterRequest is received. - 3.2. (light node) if it's a MessagePush, then we're on a light node. Invoke pushHandler of MessagePushHandler type. This pushHandler comes from WakuNode.mountFilter(). It iterates through all registered WakuNode.filters (stored in step 1) and invokes their ContentFilterHandler proc. - 3.3. (full node) if it's a FilterRequest, create a Subscriber and add to WakuFilter.subscribers seq -4. (full node) Each time a message is received through GossipSub in wakunode.subscribe.defaultHandler(), we iterate through subscriptions. -5. (full node) One of these subscriptions is a filter subscription added by WakuNode.mountFilter(), which in turn is returned from WakuFilter.subscription() -6. (full node) This subscription iterates through subscribers added by WakuFilter.handler() fn (subscribers being light nodes) -7. (full node) Once subscriber peer is found, a message is pushed directly to the peer (go to step 3.2) - +### Light Node +1. A light node is created with option WithWakuFilterLightNode. +2. Starting this node sets stream handler on wakunode.Host for WakuFilterProtocolId. +3. Light node submits a FilterSubscribeRequest through WakuFilterLightNode.Subscribe. This request is submitted to a particular peer. +Filter is stored in WakuFilterLightNode.subscriptions map. That's it. +4. Now we wait on WakuFilterLightNode.onRequest to process any further messages. +5. On receiving a message check and notify all subscribers on relevant channel (which is part of subscription obbject). +6. If a broadcaster is specified, + WakuNode.Subscribe has a message loop extracting WakuMessages from a wakurelay.Subscription object.It denotes a pubsub topic subscription.All envelopes are then submitted to node.broadcaster. +### Full Node +1. Full node is created with option WithWakuFilterFullNode. +2. We read incoming messages in WithWakuFilterFullNode.onRequest(). It is set as a stream handler on wakunode.Host for WakuFilterProtocolId. +3. In WakuFilter.onRequest + * We check the type of FilterRequest and handle accordingly. + * If it's a FilterRequest for subscribe, add a subscriber. + * If it is a SubscriberPing request, check if subscriptions exists or not and respond accordingly. + * If it is an unsubscribe/unsubscribeAll request, check and remove relevant subscriptions. From ced8c0c6ec50f54fa594855f79770af3721c76a8 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Mon, 11 Sep 2023 11:07:07 +0530 Subject: [PATCH 09/16] chore:increase test-time for autoshard tests --- waku/v2/protocol/filter/filter_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/waku/v2/protocol/filter/filter_test.go b/waku/v2/protocol/filter/filter_test.go index d24e69e73..e0291488c 100644 --- a/waku/v2/protocol/filter/filter_test.go +++ b/waku/v2/protocol/filter/filter_test.go @@ -416,6 +416,9 @@ func (s *FilterTestSuite) TestAutoShard() { // Stop what is run in setup s.fullNode.Stop() s.lightNode.Stop() + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) // Test can't exceed 10 seconds + s.ctx = ctx + s.ctxCancel = cancel cTopic1Str := "0/test/1/testTopic/proto" cTopic1, err := protocol.StringToContentTopic(cTopic1Str) From c868f31ea2b34218c7dfe160d0faebdc9051da95 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Mon, 11 Sep 2023 11:13:01 +0530 Subject: [PATCH 10/16] chore:update filter API docs for autosharding --- library/c/README.md | 2 +- library/c/api_filter.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/library/c/README.md b/library/c/README.md index 6b837afdd..f3c1866e2 100644 --- a/library/c/README.md +++ b/library/c/README.md @@ -88,7 +88,7 @@ The criteria to create subscription to a filter full node in JSON Format: Fields: - `contentTopics`: Array of content topics. -- `topic`: pubsub topic. +- `topic`: Optional pubsub topic. ### `LegacyFilterSubscription` type diff --git a/library/c/api_filter.go b/library/c/api_filter.go index f4279c382..5c88b4973 100644 --- a/library/c/api_filter.go +++ b/library/c/api_filter.go @@ -10,7 +10,7 @@ import "github.com/waku-org/go-waku/library" // filterJSON must contain a JSON with this format: // // { -// "pubsubTopic": "the pubsub topic" // mandatory +// "pubsubTopic": "the pubsub topic" // optional // "contentTopics": ["the content topic"] // mandatory, at least one required, with a max of 10 // } // @@ -42,7 +42,7 @@ func waku_filter_ping(peerID *C.char, ms C.int, onErrCb C.WakuCallBack) C.int { // criteria // // { -// "pubsubTopic": "the pubsub topic" // mandatory +// "pubsubTopic": "the pubsub topic" // optional // "contentTopics": ["the content topic"] // mandatory, at least one required, with a max of 10 // } // From b6d36c755844fa8aa56ee07e17c8fb7015e598a7 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Tue, 12 Sep 2023 10:44:56 +0530 Subject: [PATCH 11/16] fix: revert pubSubTopic as optional and add higher level validation --- waku/v2/protocol/filter/client.go | 13 +++++++++---- waku/v2/protocol/filter/pb/waku_filter_v2.pb.go | 16 +++++++++------- waku/v2/protocol/filter/pb/waku_filter_v2.proto | 2 +- waku/v2/protocol/filter/server.go | 2 +- waku/v2/protocol/filter/subscriptions_map.go | 2 +- 5 files changed, 21 insertions(+), 14 deletions(-) diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index f75658dd0..6c9a213bb 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -160,10 +160,15 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(s network.Str wf.metrics.RecordError(decodeRPCFailure) return } - - if !wf.subscriptions.Has(s.Conn().RemotePeer(), messagePush.PubsubTopic, messagePush.WakuMessage.ContentTopic) { + //For now returning failure, this will get addressed with autosharding changes for filter. + if messagePush.PubsubTopic == nil { + logger.Error("empty pubSub Topic", zap.Error(err)) + wf.metrics.RecordError(decodeRPCFailure) + return + } + if !wf.subscriptions.Has(s.Conn().RemotePeer(), *messagePush.PubsubTopic, messagePush.WakuMessage.ContentTopic) { logger.Warn("received messagepush with invalid subscription parameters", - logging.HostID("peerID", s.Conn().RemotePeer()), zap.String("topic", messagePush.PubsubTopic), + logging.HostID("peerID", s.Conn().RemotePeer()), zap.String("topic", *messagePush.PubsubTopic), zap.String("contentTopic", messagePush.WakuMessage.ContentTopic)) wf.metrics.RecordError(invalidSubscriptionMessage) return @@ -171,7 +176,7 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(s network.Str wf.metrics.RecordMessage() - wf.notify(s.Conn().RemotePeer(), messagePush.PubsubTopic, messagePush.WakuMessage) + wf.notify(s.Conn().RemotePeer(), *messagePush.PubsubTopic, messagePush.WakuMessage) logger.Info("received message push") } diff --git a/waku/v2/protocol/filter/pb/waku_filter_v2.pb.go b/waku/v2/protocol/filter/pb/waku_filter_v2.pb.go index b25edbe27..7bc044051 100644 --- a/waku/v2/protocol/filter/pb/waku_filter_v2.pb.go +++ b/waku/v2/protocol/filter/pb/waku_filter_v2.pb.go @@ -218,7 +218,7 @@ type MessagePushV2 struct { unknownFields protoimpl.UnknownFields WakuMessage *pb.WakuMessage `protobuf:"bytes,1,opt,name=waku_message,json=wakuMessage,proto3" json:"waku_message,omitempty"` - PubsubTopic string `protobuf:"bytes,2,opt,name=pubsub_topic,json=pubsubTopic,proto3" json:"pubsub_topic,omitempty"` + PubsubTopic *string `protobuf:"bytes,2,opt,name=pubsub_topic,json=pubsubTopic,proto3,oneof" json:"pubsub_topic,omitempty"` } func (x *MessagePushV2) Reset() { @@ -261,8 +261,8 @@ func (x *MessagePushV2) GetWakuMessage() *pb.WakuMessage { } func (x *MessagePushV2) GetPubsubTopic() string { - if x != nil { - return x.PubsubTopic + if x != nil && x.PubsubTopic != nil { + return *x.PubsubTopic } return "" } @@ -302,14 +302,15 @@ var file_waku_filter_v2_proto_rawDesc = []byte{ 0x73, 0x5f, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0a, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x43, 0x6f, 0x64, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x5f, 0x64, 0x65, 0x73, 0x63, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x73, - 0x74, 0x61, 0x74, 0x75, 0x73, 0x44, 0x65, 0x73, 0x63, 0x22, 0x66, 0x0a, 0x0d, 0x4d, 0x65, 0x73, + 0x74, 0x61, 0x74, 0x75, 0x73, 0x44, 0x65, 0x73, 0x63, 0x22, 0x7c, 0x0a, 0x0d, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x50, 0x75, 0x73, 0x68, 0x56, 0x32, 0x12, 0x32, 0x0a, 0x0c, 0x77, 0x61, 0x6b, 0x75, 0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x70, 0x62, 0x2e, 0x57, 0x61, 0x6b, 0x75, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, - 0x65, 0x52, 0x0b, 0x77, 0x61, 0x6b, 0x75, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x21, + 0x65, 0x52, 0x0b, 0x77, 0x61, 0x6b, 0x75, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x26, 0x0a, 0x0c, 0x70, 0x75, 0x62, 0x73, 0x75, 0x62, 0x5f, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x02, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x70, 0x75, 0x62, 0x73, 0x75, 0x62, 0x54, 0x6f, 0x70, 0x69, - 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x0b, 0x70, 0x75, 0x62, 0x73, 0x75, 0x62, 0x54, 0x6f, + 0x70, 0x69, 0x63, 0x88, 0x01, 0x01, 0x42, 0x0f, 0x0a, 0x0d, 0x5f, 0x70, 0x75, 0x62, 0x73, 0x75, + 0x62, 0x5f, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -387,6 +388,7 @@ func file_waku_filter_v2_proto_init() { } } file_waku_filter_v2_proto_msgTypes[0].OneofWrappers = []interface{}{} + file_waku_filter_v2_proto_msgTypes[2].OneofWrappers = []interface{}{} type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ diff --git a/waku/v2/protocol/filter/pb/waku_filter_v2.proto b/waku/v2/protocol/filter/pb/waku_filter_v2.proto index 5e9fcc5cb..50ad632cc 100644 --- a/waku/v2/protocol/filter/pb/waku_filter_v2.proto +++ b/waku/v2/protocol/filter/pb/waku_filter_v2.proto @@ -32,5 +32,5 @@ message FilterSubscribeResponse { // Protocol identifier: /vac/waku/filter-push/2.0.0-beta1 message MessagePushV2 { WakuMessage waku_message = 1; - string pubsub_topic = 2; + optional string pubsub_topic = 2; } \ No newline at end of file diff --git a/waku/v2/protocol/filter/server.go b/waku/v2/protocol/filter/server.go index 73c0599be..1baa2c963 100644 --- a/waku/v2/protocol/filter/server.go +++ b/waku/v2/protocol/filter/server.go @@ -275,7 +275,7 @@ func (wf *WakuFilterFullNode) pushMessage(ctx context.Context, peerID peer.ID, e ) pubSubTopic := env.PubsubTopic() messagePush := &pb.MessagePushV2{ - PubsubTopic: pubSubTopic, + PubsubTopic: &pubSubTopic, WakuMessage: env.Message(), } diff --git a/waku/v2/protocol/filter/subscriptions_map.go b/waku/v2/protocol/filter/subscriptions_map.go index 5272fa9b9..167a2bccc 100644 --- a/waku/v2/protocol/filter/subscriptions_map.go +++ b/waku/v2/protocol/filter/subscriptions_map.go @@ -97,7 +97,7 @@ func (sub *SubscriptionsMap) Has(peerID peer.ID, topic string, contentTopics ... if !ok { return false } - + //TODO: Handle pubsubTopic as null // Check if pubsub topic exists subscriptions, ok := peerSubscription.subscriptionsPerTopic[topic] if !ok { From 8b1fcdd56d006a382efd420674a4e0a68e7e91f5 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Mon, 18 Sep 2023 19:01:31 +0530 Subject: [PATCH 12/16] chore: derive pubSubTopic in filter messagePush --- waku/v2/protocol/filter/client.go | 37 +++++++++++++++++++++---------- 1 file changed, 25 insertions(+), 12 deletions(-) diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index 153c7613f..0f0fe01b5 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -131,15 +131,19 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(s network.Str wf.metrics.RecordError(decodeRPCFailure) return } + pubSubTopic := messagePush.PubsubTopic //For now returning failure, this will get addressed with autosharding changes for filter. if messagePush.PubsubTopic == nil { - logger.Error("empty pubsub topic") - wf.metrics.RecordError(decodeRPCFailure) - return + *pubSubTopic, err = getPubSubTopicFromContentTopic(messagePush.WakuMessage.ContentTopic) + if err != nil { + logger.Error("could not derive pubSubTopic from contentTopic", zap.Error(err)) + wf.metrics.RecordError(decodeRPCFailure) + return + } } - if !wf.subscriptions.Has(s.Conn().RemotePeer(), *messagePush.PubsubTopic, messagePush.WakuMessage.ContentTopic) { + if !wf.subscriptions.Has(s.Conn().RemotePeer(), *pubSubTopic, messagePush.WakuMessage.ContentTopic) { logger.Warn("received messagepush with invalid subscription parameters", - logging.HostID("peerID", s.Conn().RemotePeer()), zap.String("topic", *messagePush.PubsubTopic), + logging.HostID("peerID", s.Conn().RemotePeer()), zap.String("topic", *pubSubTopic), zap.String("contentTopic", messagePush.WakuMessage.ContentTopic)) wf.metrics.RecordError(invalidSubscriptionMessage) return @@ -147,7 +151,7 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(s network.Str wf.metrics.RecordMessage() - wf.notify(s.Conn().RemotePeer(), *messagePush.PubsubTopic, messagePush.WakuMessage) + wf.notify(s.Conn().RemotePeer(), *pubSubTopic, messagePush.WakuMessage) logger.Info("received message push") } @@ -214,6 +218,16 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscr return nil } +func getPubSubTopicFromContentTopic(cTopicString string) (string, error) { + cTopic, err := protocol.StringToContentTopic(cTopicString) + if err != nil { + return "", errors.New(err.Error() + " : " + cTopicString) + } + pTopic := protocol.GetShardFromContentTopic(cTopic, protocol.GenerationZeroShardsCount) + + return pTopic.String(), nil +} + // This function converts a contentFilter into a map of pubSubTopics and corresponding contentTopics func contentFilterToPubSubTopicMap(contentFilter ContentFilter) (map[string][]string, error) { pubSubTopicMap := make(map[string][]string) @@ -224,16 +238,15 @@ func contentFilterToPubSubTopicMap(contentFilter ContentFilter) (map[string][]st } else { //Parse the content-Topics to figure out shards. for _, cTopicString := range contentFilter.ContentTopics { - cTopic, err := protocol.StringToContentTopic(cTopicString) + pTopicStr, err := getPubSubTopicFromContentTopic(cTopicString) if err != nil { - return nil, errors.New(err.Error() + " : " + cTopicString) + return nil, err } - pTopic := protocol.GetShardFromContentTopic(cTopic, protocol.GenerationZeroShardsCount) - _, ok := pubSubTopicMap[pTopic.String()] + _, ok := pubSubTopicMap[pTopicStr] if !ok { - pubSubTopicMap[pTopic.String()] = make([]string, 1) + pubSubTopicMap[pTopicStr] = make([]string, 1) } - pubSubTopicMap[pTopic.String()] = append(pubSubTopicMap[pTopic.String()], cTopicString) + pubSubTopicMap[pTopicStr] = append(pubSubTopicMap[pTopicStr], cTopicString) } } return pubSubTopicMap, nil From ba856a040a2207c09b41a0bfb992bf60e0fa6d2a Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Tue, 19 Sep 2023 11:51:29 +0530 Subject: [PATCH 13/16] chore: address review comments --- waku/v2/protocol/filter/client.go | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index 0f0fe01b5..30a25ff5c 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -131,19 +131,21 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(s network.Str wf.metrics.RecordError(decodeRPCFailure) return } - pubSubTopic := messagePush.PubsubTopic + pubSubTopic := "" //For now returning failure, this will get addressed with autosharding changes for filter. if messagePush.PubsubTopic == nil { - *pubSubTopic, err = getPubSubTopicFromContentTopic(messagePush.WakuMessage.ContentTopic) + pubSubTopic, err = getPubSubTopicFromContentTopic(messagePush.WakuMessage.ContentTopic) if err != nil { logger.Error("could not derive pubSubTopic from contentTopic", zap.Error(err)) wf.metrics.RecordError(decodeRPCFailure) return } + } else { + pubSubTopic = *messagePush.PubsubTopic } - if !wf.subscriptions.Has(s.Conn().RemotePeer(), *pubSubTopic, messagePush.WakuMessage.ContentTopic) { + if !wf.subscriptions.Has(s.Conn().RemotePeer(), pubSubTopic, messagePush.WakuMessage.ContentTopic) { logger.Warn("received messagepush with invalid subscription parameters", - logging.HostID("peerID", s.Conn().RemotePeer()), zap.String("topic", *pubSubTopic), + logging.HostID("peerID", s.Conn().RemotePeer()), zap.String("topic", pubSubTopic), zap.String("contentTopic", messagePush.WakuMessage.ContentTopic)) wf.metrics.RecordError(invalidSubscriptionMessage) return @@ -151,7 +153,7 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(s network.Str wf.metrics.RecordMessage() - wf.notify(s.Conn().RemotePeer(), *pubSubTopic, messagePush.WakuMessage) + wf.notify(s.Conn().RemotePeer(), pubSubTopic, messagePush.WakuMessage) logger.Info("received message push") } @@ -221,7 +223,7 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscr func getPubSubTopicFromContentTopic(cTopicString string) (string, error) { cTopic, err := protocol.StringToContentTopic(cTopicString) if err != nil { - return "", errors.New(err.Error() + " : " + cTopicString) + return "", fmt.Errorf("%s : %s", err.Error(), cTopicString) } pTopic := protocol.GetShardFromContentTopic(cTopic, protocol.GenerationZeroShardsCount) @@ -233,7 +235,6 @@ func contentFilterToPubSubTopicMap(contentFilter ContentFilter) (map[string][]st pubSubTopicMap := make(map[string][]string) if contentFilter.Topic != "" { - pubSubTopicMap[contentFilter.Topic] = make([]string, 0) pubSubTopicMap[contentFilter.Topic] = contentFilter.ContentTopics } else { //Parse the content-Topics to figure out shards. @@ -244,7 +245,7 @@ func contentFilterToPubSubTopicMap(contentFilter ContentFilter) (map[string][]st } _, ok := pubSubTopicMap[pTopicStr] if !ok { - pubSubTopicMap[pTopicStr] = make([]string, 1) + pubSubTopicMap[pTopicStr] = []string{} } pubSubTopicMap[pTopicStr] = append(pubSubTopicMap[pTopicStr], cTopicString) } From 1d7dceb796000e9bf68f2d6a2f997ef09a2cce8f Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Tue, 19 Sep 2023 12:05:00 +0530 Subject: [PATCH 14/16] chore: docs changes to indicate sharding impact on pubSubTopic --- library/c/README.md | 2 +- library/c/api_filter.go | 4 ++-- waku/v2/protocol/filter/client.go | 6 +++--- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/library/c/README.md b/library/c/README.md index f3c1866e2..b5e0fbb78 100644 --- a/library/c/README.md +++ b/library/c/README.md @@ -88,7 +88,7 @@ The criteria to create subscription to a filter full node in JSON Format: Fields: - `contentTopics`: Array of content topics. -- `topic`: Optional pubsub topic. +- `topic`: Optional pubsub topic when using contentTopics as per Autosharding. In case of named or static-sharding, pubSub topic is mandatory. ### `LegacyFilterSubscription` type diff --git a/library/c/api_filter.go b/library/c/api_filter.go index 5c88b4973..62a262881 100644 --- a/library/c/api_filter.go +++ b/library/c/api_filter.go @@ -10,7 +10,7 @@ import "github.com/waku-org/go-waku/library" // filterJSON must contain a JSON with this format: // // { -// "pubsubTopic": "the pubsub topic" // optional +// "pubsubTopic": "the pubsub topic" // optional if using autosharding, mandatory if using static or named sharding. // "contentTopics": ["the content topic"] // mandatory, at least one required, with a max of 10 // } // @@ -42,7 +42,7 @@ func waku_filter_ping(peerID *C.char, ms C.int, onErrCb C.WakuCallBack) C.int { // criteria // // { -// "pubsubTopic": "the pubsub topic" // optional +// "pubsubTopic": "the pubsub topic" // optional if using autosharding, mandatory if using static or named sharding. // "contentTopics": ["the content topic"] // mandatory, at least one required, with a max of 10 // } // diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index 30a25ff5c..c6c70f097 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -44,9 +44,9 @@ type WakuFilterLightNode struct { } // ContentFilter is used to specify the filter to be applied for a FilterNode. -// Topic means pubSubTopic (which is optional in case of using contentTopics that following Static/Auto sharding) -// ContentTopics - Specify list of content topics to be filtered under a pubSubTopic or a list of contentTopics (which follow static/Auto sharding) -// If pubSub topic is not specified, then content-topics are used to derive the shard and corresponding pubSubTopic +// Topic means pubSubTopic - optional in case of using contentTopics that following Auto sharding, mandatory in case of named or static sharding. +// ContentTopics - Specify list of content topics to be filtered under a pubSubTopic (for named and static sharding), or a list of contentTopics (in case ofAuto sharding) +// If pubSub topic is not specified, then content-topics are used to derive the shard and corresponding pubSubTopic using autosharding algorithm type ContentFilter struct { Topic string ContentTopics []string From 6457022304b2471f2a48614a5b0f07c6df1cf69b Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Tue, 19 Sep 2023 14:50:42 +0530 Subject: [PATCH 15/16] fix: handle partial errors durin subscribe and return failed content-topic details --- library/c/README.md | 14 ++++++++++---- library/c/api_filter.go | 2 +- library/filter.go | 13 ++++++++++--- waku/v2/protocol/filter/client.go | 15 ++++++++++++--- 4 files changed, 33 insertions(+), 11 deletions(-) diff --git a/library/c/README.md b/library/c/README.md index b5e0fbb78..227bb54e3 100644 --- a/library/c/README.md +++ b/library/c/README.md @@ -884,15 +884,21 @@ Creates a subscription to a filter full node matching a content filter.. A status code. Refer to the [`Status codes`](#status-codes) section for possible values. -If the function is executed succesfully, `onOkCb` will receive the subscription details. +If the function is executed succesfully, `onOkCb` will receive the following subscription details along with any partial errors. For example: ```json { - "peerID": "....", - "pubsubTopic": "...", - "contentTopics": [...] + "subscriptions" : [ + { + "ID": "", + "peerID": "....", + "pubsubTopic": "...", + "contentTopics": [...] + } + ], + "error" : "subscriptions failed for contentTopics:,.." // Empty if all subscriptions are succesful } ``` diff --git a/library/c/api_filter.go b/library/c/api_filter.go index 62a262881..4a7496990 100644 --- a/library/c/api_filter.go +++ b/library/c/api_filter.go @@ -17,7 +17,7 @@ import "github.com/waku-org/go-waku/library" // peerID should contain the ID of a peer supporting the filter protocol. Use NULL to automatically select a node // If ms is greater than 0, the subscription must happen before the timeout // (in milliseconds) is reached, or an error will be returned -// It returns a json object containing the peerID to which we are subscribed to and the details of the subscription +// It returns a json object containing the details of the subscriptions along with any errors in case of partial failures // //export waku_filter_subscribe func waku_filter_subscribe(filterJSON *C.char, peerID *C.char, ms C.int, onOkCb C.WakuCallBack, onErrCb C.WakuCallBack) C.int { diff --git a/library/filter.go b/library/filter.go index 7de2f9187..8dd743d7c 100644 --- a/library/filter.go +++ b/library/filter.go @@ -28,6 +28,11 @@ func toContentFilter(filterJSON string) (filter.ContentFilter, error) { }, nil } +type subscribeResult struct { + Subscriptions []*filter.SubscriptionDetails `json:"subscriptions"` + Error string `json:"error,omitEmpty"` +} + // FilterSubscribe is used to create a subscription to a filter node to receive messages func FilterSubscribe(filterJSON string, peerID string, ms int) (string, error) { cf, err := toContentFilter(filterJSON) @@ -61,7 +66,7 @@ func FilterSubscribe(filterJSON string, peerID string, ms int) (string, error) { } subscriptions, err := wakuState.node.FilterLightnode().Subscribe(ctx, cf, fOptions...) - if err != nil { + if err != nil && subscriptions == nil { return "", err } @@ -72,8 +77,10 @@ func FilterSubscribe(filterJSON string, peerID string, ms int) (string, error) { } }(subscriptionDetails) } - - return marshalJSON(subscriptions) + var subResult subscribeResult + subResult.Subscriptions = subscriptions + subResult.Error = err.Error() + return marshalJSON(subResult) } // FilterPing is used to determine if a peer has an active subscription diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index c6c70f097..d89e4c0a8 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -7,6 +7,7 @@ import ( "fmt" "math" "net/http" + "strings" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" @@ -256,6 +257,7 @@ func contentFilterToPubSubTopicMap(contentFilter ContentFilter) (map[string][]st // Subscribe setups a subscription to receive messages that match a specific content filter // If contentTopics passed result in different pubSub topics (due to Auto/Static sharding), then multiple subscription requests are sent to the peer. // This may change if Filterv2 protocol is updated to handle such a scenario in a single request. +// Note: In case of partial failure, results are returned for successful subscriptions along with error indicating failed contentTopics. func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterSubscribeOption) ([]*SubscriptionDetails, error) { wf.RLock() defer wf.RUnlock() @@ -291,6 +293,7 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter Cont if err != nil { return nil, err } + failedContentTopics := []string{} subscriptions := make([]*SubscriptionDetails, 0) for pubSubTopic, cTopics := range pubSubTopicMap { var cFilter ContentFilter @@ -299,12 +302,18 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter Cont //TO OPTIMIZE: Should we parallelize these, if so till how many batches? err := wf.request(ctx, params, pb.FilterSubscribeRequest_SUBSCRIBE, cFilter) if err != nil { - return nil, err + wf.log.Error("Failed to subscribe for conentTopics ", + zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics), + zap.Error(err)) + failedContentTopics = append(failedContentTopics, cTopics...) } subscriptions = append(subscriptions, wf.subscriptions.NewSubscription(params.selectedPeer, cFilter.Topic, cFilter.ContentTopics)) } - return subscriptions, nil - + if len(failedContentTopics) > 0 { + return subscriptions, fmt.Errorf("subscriptions failed for contentTopics: %s", strings.Join(failedContentTopics, ",")) + } else { + return subscriptions, nil + } } // FilterSubscription is used to obtain an object from which you could receive messages received via filter protocol From 36ca48d74530772f0c8cc78a47884974c045abdf Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Tue, 19 Sep 2023 18:25:35 +0530 Subject: [PATCH 16/16] chore: fix json lint error --- library/filter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/library/filter.go b/library/filter.go index 8dd743d7c..f450917ca 100644 --- a/library/filter.go +++ b/library/filter.go @@ -30,7 +30,7 @@ func toContentFilter(filterJSON string) (filter.ContentFilter, error) { type subscribeResult struct { Subscriptions []*filter.SubscriptionDetails `json:"subscriptions"` - Error string `json:"error,omitEmpty"` + Error string `json:"error,omitempty"` } // FilterSubscribe is used to create a subscription to a filter node to receive messages