Skip to content

Commit

Permalink
feat: replace DefaultPubsubTopic by Shard 32
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-ramos committed Nov 1, 2023
1 parent eb437e9 commit 96b6588
Show file tree
Hide file tree
Showing 76 changed files with 2,597 additions and 1,408 deletions.
7 changes: 4 additions & 3 deletions cmd/ping-community/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/status-im/status-go/protocol/common"
"github.com/status-im/status-go/protocol/identity/alias"
"github.com/status-im/status-go/protocol/protobuf"
"github.com/status-im/status-go/protocol/transport"
wakuextn "github.com/status-im/status-go/services/wakuext"
)

Expand All @@ -47,8 +48,8 @@ var (
seedPhrase = flag.String("seed-phrase", "", "Seed phrase")
version = flag.Bool("version", false, "Print version and dump configuration")
communityID = flag.String("community-id", "", "The id of the community")
shardCluster = flag.Int("shard-cluster", common.UndefinedShardValue, "The shard cluster in which the of the community is published")
shardIndex = flag.Int("shard-index", common.UndefinedShardValue, "The shard index in which the community is published")
shardCluster = flag.Int("shard-cluster", transport.UndefinedShardValue, "The shard cluster in which the of the community is published")
shardIndex = flag.Int("shard-index", transport.UndefinedShardValue, "The shard index in which the community is published")
chatID = flag.String("chat-id", "", "The id of the chat")

dataDir = flag.String("dir", getDefaultDataDir(), "Directory used by node to store data")
Expand Down Expand Up @@ -148,7 +149,7 @@ func main() {
messenger := wakuextservice.Messenger()

var shard *common.Shard = nil
if shardCluster != nil && shardIndex != nil && *shardCluster != common.UndefinedShardValue && *shardIndex != common.UndefinedShardValue {
if shardCluster != nil && shardIndex != nil && *shardCluster != transport.UndefinedShardValue && *shardIndex != transport.UndefinedShardValue {
shard = &common.Shard{
Cluster: uint16(*shardCluster),
Index: uint16(*shardIndex),
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ require (
github.com/mutecomm/go-sqlcipher/v4 v4.4.2
github.com/schollz/peerdiscovery v1.7.0
github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7
github.com/waku-org/go-waku v0.8.1-0.20230930175749-dcc828749f67
github.com/waku-org/go-waku v0.8.1-0.20231101155333-3bdcd18ab2e2
github.com/wk8/go-ordered-map/v2 v2.1.7
github.com/yeqown/go-qrcode/v2 v2.2.1
github.com/yeqown/go-qrcode/writer/standard v1.2.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2097,8 +2097,8 @@ github.com/waku-org/go-discover v0.0.0-20221209174356-61c833f34d98 h1:xwY0kW5XZF
github.com/waku-org/go-discover v0.0.0-20221209174356-61c833f34d98/go.mod h1:eBHgM6T4EG0RZzxpxKy+rGz/6Dw2Nd8DWxS0lm9ESDw=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20230628220917-7b4e5ae4c0e7 h1:0e1h+p84yBp0IN7AqgbZlV7lgFBjm214lgSOE7CeJmE=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20230628220917-7b4e5ae4c0e7/go.mod h1:pFvOZ9YTFsW0o5zJW7a0B5tr1owAijRWJctXJ2toL04=
github.com/waku-org/go-waku v0.8.1-0.20230930175749-dcc828749f67 h1:EL0KljfCIFPXbY1IfT0JjVIjJekuF951ys1WL2WnWyM=
github.com/waku-org/go-waku v0.8.1-0.20230930175749-dcc828749f67/go.mod h1:MnMLFtym7XUt+GNN4zTkjm5NJCsm7TERLWVPOV/Ct6w=
github.com/waku-org/go-waku v0.8.1-0.20231101155333-3bdcd18ab2e2 h1:d+4K0HKAbiHpbnKbFz9W8LiVYmzNtvRoUUPM4lVTLnU=
github.com/waku-org/go-waku v0.8.1-0.20231101155333-3bdcd18ab2e2/go.mod h1:hem2hnXK5BdabxwJULszM0Rh1Yj+gD9IxjwLCGPPaxs=
github.com/waku-org/go-zerokit-rln v0.1.14-0.20230916173259-d284a3d8f2fd h1:cu7CsUo7BK6ac/v193RIaqAzUcmpa6MNY4xYW9AenQI=
github.com/waku-org/go-zerokit-rln v0.1.14-0.20230916173259-d284a3d8f2fd/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E=
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE=
Expand Down
40 changes: 21 additions & 19 deletions node/status_node_services.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"reflect"
"time"

"github.com/status-im/status-go/protocol/transport"
"github.com/status-im/status-go/server"
"github.com/status-im/status-go/signal"
"github.com/status-im/status-go/transactions"
Expand Down Expand Up @@ -308,25 +309,26 @@ func (b *StatusNode) wakuService(wakuCfg *params.WakuConfig, clusterCfg *params.
func (b *StatusNode) wakuV2Service(nodeConfig *params.NodeConfig, telemetryServerURL string) (*wakuv2.Waku, error) {
if b.wakuV2Srvc == nil {
cfg := &wakuv2.Config{
MaxMessageSize: wakucommon.DefaultMaxMessageSize,
Host: nodeConfig.WakuV2Config.Host,
Port: nodeConfig.WakuV2Config.Port,
LightClient: nodeConfig.WakuV2Config.LightClient,
KeepAliveInterval: nodeConfig.WakuV2Config.KeepAliveInterval,
Rendezvous: nodeConfig.Rendezvous,
WakuNodes: nodeConfig.ClusterConfig.WakuNodes,
PeerExchange: nodeConfig.WakuV2Config.PeerExchange,
EnableFilterFullNode: nodeConfig.WakuV2Config.EnableFilterFullNode,
EnableStore: nodeConfig.WakuV2Config.EnableStore,
StoreCapacity: nodeConfig.WakuV2Config.StoreCapacity,
StoreSeconds: nodeConfig.WakuV2Config.StoreSeconds,
DiscoveryLimit: nodeConfig.WakuV2Config.DiscoveryLimit,
DiscV5BootstrapNodes: nodeConfig.ClusterConfig.DiscV5BootstrapNodes,
Nameserver: nodeConfig.WakuV2Config.Nameserver,
EnableDiscV5: nodeConfig.WakuV2Config.EnableDiscV5,
UDPPort: nodeConfig.WakuV2Config.UDPPort,
AutoUpdate: nodeConfig.WakuV2Config.AutoUpdate,
TelemetryServerURL: telemetryServerURL,
MaxMessageSize: wakucommon.DefaultMaxMessageSize,
Host: nodeConfig.WakuV2Config.Host,
Port: nodeConfig.WakuV2Config.Port,
LightClient: nodeConfig.WakuV2Config.LightClient,
KeepAliveInterval: nodeConfig.WakuV2Config.KeepAliveInterval,
Rendezvous: nodeConfig.Rendezvous,
WakuNodes: nodeConfig.ClusterConfig.WakuNodes,
PeerExchange: nodeConfig.WakuV2Config.PeerExchange,
EnableFilterFullNode: nodeConfig.WakuV2Config.EnableFilterFullNode,
EnableStore: nodeConfig.WakuV2Config.EnableStore,
StoreCapacity: nodeConfig.WakuV2Config.StoreCapacity,
StoreSeconds: nodeConfig.WakuV2Config.StoreSeconds,
DiscoveryLimit: nodeConfig.WakuV2Config.DiscoveryLimit,
DiscV5BootstrapNodes: nodeConfig.ClusterConfig.DiscV5BootstrapNodes,
Nameserver: nodeConfig.WakuV2Config.Nameserver,
EnableDiscV5: nodeConfig.WakuV2Config.EnableDiscV5,
UDPPort: nodeConfig.WakuV2Config.UDPPort,
AutoUpdate: nodeConfig.WakuV2Config.AutoUpdate,
DefaultShardPubsubTopic: transport.DefaultShardPubsubTopic(),
TelemetryServerURL: telemetryServerURL,
}

if nodeConfig.WakuV2Config.MaxMessageSize > 0 {
Expand Down
4 changes: 1 addition & 3 deletions protocol/common/message_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ import (
datasyncproto "github.com/vacp2p/mvds/protobuf"
"go.uber.org/zap"

"github.com/waku-org/go-waku/waku/v2/protocol/relay"

"github.com/status-im/status-go/eth-node/crypto"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/protocol/datasync"
Expand Down Expand Up @@ -457,7 +455,7 @@ func (s *MessageSender) sendPrivate(

messageID := v1protocol.MessageID(&rawMessage.Sender.PublicKey, wrappedMessage)
rawMessage.ID = types.EncodeHex(messageID)
rawMessage.PubsubTopic = relay.DefaultWakuTopic // TODO: determine which pubsub topic should be used for 1:1 messages
rawMessage.PubsubTopic = transport.DefaultShardPubsubTopic() // TODO: determine which pubsub topic should be used for 1:1 messages

if rawMessage.BeforeDispatch != nil {
if err := rawMessage.BeforeDispatch(rawMessage); err != nil {
Expand Down
19 changes: 0 additions & 19 deletions protocol/common/shard.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,10 @@
package common

import (
"github.com/waku-org/go-waku/waku/v2/protocol/relay"

"github.com/status-im/status-go/protocol/protobuf"
"github.com/status-im/status-go/protocol/transport"
)

const MainStatusShardCluster = 16
const NonProtectedShardIndex = 64
const UndefinedShardValue = 0

type Shard struct {
Cluster uint16 `json:"cluster"`
Index uint16 `json:"index"`
Expand Down Expand Up @@ -48,16 +42,3 @@ func (s *Shard) Protobuffer() *protobuf.Shard {
Index: int32(s.Index),
}
}

func DefaultNonProtectedPubsubTopic(shard *Shard) string {
// TODO: remove the condition once DefaultWakuTopic usage
// is removed
if shard != nil {
return transport.GetPubsubTopic(&transport.Shard{
Cluster: MainStatusShardCluster,
Index: NonProtectedShardIndex,
})
}

return relay.DefaultWakuTopic
}
2 changes: 1 addition & 1 deletion protocol/communities/community.go
Original file line number Diff line number Diff line change
Expand Up @@ -1319,7 +1319,7 @@ func (o *Community) DefaultFilters() []transport.FiltersToInitialize {

return []transport.FiltersToInitialize{
{ChatID: cID, PubsubTopic: communityPubsubTopic},
{ChatID: uncompressedPubKey, PubsubTopic: common.DefaultNonProtectedPubsubTopic(o.Shard())},
{ChatID: uncompressedPubKey, PubsubTopic: transport.DefaultNonProtectedPubsubTopic(o.Shard().TransportShard())},
{ChatID: uncompressedPubKey, PubsubTopic: communityPubsubTopic},
{ChatID: updatesChannelID, PubsubTopic: communityPubsubTopic},
{ChatID: mlChannelID, PubsubTopic: communityPubsubTopic},
Expand Down
4 changes: 1 addition & 3 deletions protocol/communities/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ import (
"github.com/anacrolix/torrent/metainfo"
"github.com/golang/protobuf/proto"

"github.com/waku-org/go-waku/waku/v2/protocol/relay"

"github.com/google/uuid"
"github.com/pkg/errors"
"go.uber.org/zap"
Expand Down Expand Up @@ -3336,7 +3334,7 @@ func (m *Manager) GetPubsubTopic(communityID string) (string, error) {
}

if community == nil {
return relay.DefaultWakuTopic, nil
return transport.DefaultShardPubsubTopic(), nil
}

return transport.GetPubsubTopic(community.Shard().TransportShard()), nil
Expand Down
4 changes: 1 addition & 3 deletions protocol/messenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ import (
"go.uber.org/zap"
"golang.org/x/time/rate"

"github.com/waku-org/go-waku/waku/v2/protocol/relay"

gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/event"
Expand Down Expand Up @@ -1749,7 +1747,7 @@ func (m *Messenger) Init() error {

switch chat.ChatType {
case ChatTypePublic, ChatTypeProfile:
filtersToInit = append(filtersToInit, transport.FiltersToInitialize{ChatID: chat.ID, PubsubTopic: relay.DefaultWakuTopic})
filtersToInit = append(filtersToInit, transport.FiltersToInitialize{ChatID: chat.ID, PubsubTopic: transport.DefaultShardPubsubTopic()})
case ChatTypeCommunityChat:
communityID, err := hexutil.Decode(chat.CommunityID)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions protocol/messenger_communities.go
Original file line number Diff line number Diff line change
Expand Up @@ -1133,7 +1133,7 @@ func (m *Messenger) RequestToJoinCommunity(request *requests.RequestToJoinCommun
CommunityID: community.ID(),
SkipProtocolLayer: true,
MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_REQUEST_TO_JOIN,
PubsubTopic: common.DefaultNonProtectedPubsubTopic(community.Shard()),
PubsubTopic: transport.DefaultNonProtectedPubsubTopic(community.Shard().TransportShard()),
}

_, err = m.sender.SendCommunityMessage(context.Background(), rawMessage)
Expand Down Expand Up @@ -1465,7 +1465,7 @@ func (m *Messenger) CancelRequestToJoinCommunity(ctx context.Context, request *r
CommunityID: community.ID(),
SkipProtocolLayer: true,
MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_CANCEL_REQUEST_TO_JOIN,
PubsubTopic: common.DefaultNonProtectedPubsubTopic(community.Shard()),
PubsubTopic: transport.DefaultNonProtectedPubsubTopic(community.Shard().TransportShard()),
}
_, err = m.sender.SendCommunityMessage(context.Background(), rawMessage)

Expand Down Expand Up @@ -1573,7 +1573,7 @@ func (m *Messenger) acceptRequestToJoinCommunity(requestToJoin *communities.Requ
Sender: community.PrivateKey(),
SkipProtocolLayer: true,
MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_REQUEST_TO_JOIN_RESPONSE,
PubsubTopic: common.DefaultNonProtectedPubsubTopic(community.Shard()),
PubsubTopic: transport.DefaultNonProtectedPubsubTopic(community.Shard().TransportShard()),
}

_, err = m.sender.SendPrivate(context.Background(), pk, rawMessage)
Expand Down
4 changes: 1 addition & 3 deletions protocol/messenger_contacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import (
"github.com/golang/protobuf/proto"
"go.uber.org/zap"

"github.com/waku-org/go-waku/waku/v2/protocol/relay"

"github.com/status-im/status-go/deprecation"
"github.com/status-im/status-go/eth-node/crypto"
"github.com/status-im/status-go/eth-node/types"
Expand Down Expand Up @@ -470,7 +468,7 @@ func (m *Messenger) addContact(ctx context.Context, pubKey, ensName, nickname, d
if !deprecation.ChatProfileDeprecated {
response.AddChat(profileChat)

_, err = m.transport.InitFilters([]transport.FiltersToInitialize{{ChatID: profileChat.ID, PubsubTopic: relay.DefaultWakuTopic}}, []*ecdsa.PublicKey{publicKey})
_, err = m.transport.InitFilters([]transport.FiltersToInitialize{{ChatID: profileChat.ID, PubsubTopic: transport.DefaultShardPubsubTopic()}}, []*ecdsa.PublicKey{publicKey})
if err != nil {
return nil, err
}
Expand Down
4 changes: 1 addition & 3 deletions protocol/messenger_mailserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ import (
"github.com/pkg/errors"
"go.uber.org/zap"

"github.com/waku-org/go-waku/waku/v2/protocol/relay"

"github.com/status-im/status-go/connection"
"github.com/status-im/status-go/eth-node/crypto"
"github.com/status-im/status-go/eth-node/types"
Expand Down Expand Up @@ -257,7 +255,7 @@ func (m *Messenger) syncBackup() error {

to := m.calculateMailserverTo()
from := uint32(m.getTimesource().GetCurrentTime()/1000) - oneMonthInSeconds
batch := MailserverBatch{From: from, To: to, PubsubTopic: relay.DefaultWakuTopic, Topics: []types.TopicType{filter.ContentTopic}}
batch := MailserverBatch{From: from, To: to, PubsubTopic: transport.DefaultShardPubsubTopic(), Topics: []types.TopicType{filter.ContentTopic}}
err := m.processMailserverBatch(batch)
if err != nil {
return err
Expand Down
3 changes: 2 additions & 1 deletion protocol/requests/set_community_shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/protocol/common"
"github.com/status-im/status-go/protocol/transport"
)

type SetCommunityShard struct {
Expand All @@ -19,7 +20,7 @@ func (s *SetCommunityShard) Validate() error {
}
if s.Shard != nil {
// TODO: for now only MainStatusShard(16) is accepted
if s.Shard.Cluster != common.MainStatusShardCluster {
if s.Shard.Cluster != transport.MainStatusShardCluster {
return errors.New("invalid shard cluster")
}
if s.Shard.Index > 1023 {
Expand Down
16 changes: 7 additions & 9 deletions protocol/transport/filters_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import (
"github.com/pkg/errors"
"go.uber.org/zap"

"github.com/waku-org/go-waku/waku/v2/protocol/relay"

"github.com/status-im/status-go/eth-node/types"
)

Expand Down Expand Up @@ -163,8 +161,8 @@ func (f *FiltersManager) InitCommunityFilters(communityFiltersToInitialize []Com

communityPubsubTopic := GetPubsubTopic(cf.Shard)
topics := []string{communityPubsubTopic}
if communityPubsubTopic != relay.DefaultWakuTopic {
topics = append(topics, relay.DefaultWakuTopic)
if communityPubsubTopic != DefaultShardPubsubTopic() {
topics = append(topics, DefaultShardPubsubTopic())
}

// TODO: requests to join / cancels are currently being sent into the default waku topic.
Expand Down Expand Up @@ -387,7 +385,7 @@ func (f *FiltersManager) LoadPersonal(publicKey *ecdsa.PublicKey, identity *ecds
return f.filters[chatID], nil
}

pubsubTopic := relay.DefaultWakuTopic
pubsubTopic := DefaultShardPubsubTopic()

// We set up a filter so we can publish,
// but we discard envelopes if listen is false.
Expand Down Expand Up @@ -428,7 +426,7 @@ func (f *FiltersManager) loadPartitioned(publicKey *ecdsa.PublicKey, identity *e
return f.filters[chatID], nil
}

pubsubTopic := relay.DefaultWakuTopic
pubsubTopic := DefaultShardPubsubTopic()

// We set up a filter so we can publish,
// but we discard envelopes if listen is false.
Expand Down Expand Up @@ -467,7 +465,7 @@ func (f *FiltersManager) LoadNegotiated(secret types.NegotiatedSecret) (*Filter,
return f.filters[chatID], nil
}

pubsubTopic := relay.DefaultWakuTopic
pubsubTopic := DefaultShardPubsubTopic()
keyString := hex.EncodeToString(secret.Key)
filter, err := f.addSymmetric(keyString, pubsubTopic)
if err != nil {
Expand Down Expand Up @@ -521,7 +519,7 @@ func (f *FiltersManager) LoadDiscovery() ([]*Filter, error) {
personalDiscoveryChat := &Filter{
ChatID: personalDiscoveryTopic,
Identity: identityStr,
PubsubTopic: relay.DefaultWakuTopic,
PubsubTopic: DefaultShardPubsubTopic(),
Discovery: true,
Listen: true,
OneToOne: true,
Expand Down Expand Up @@ -592,7 +590,7 @@ func (f *FiltersManager) LoadContactCode(pubKey *ecdsa.PublicKey) (*Filter, erro
return f.filters[chatID], nil
}

pubsubTopic := relay.DefaultWakuTopic
pubsubTopic := DefaultShardPubsubTopic()

contactCodeFilter, err := f.addSymmetric(chatID, pubsubTopic)
if err != nil {
Expand Down
25 changes: 22 additions & 3 deletions protocol/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"go.uber.org/zap"

"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"

"github.com/ethereum/go-ethereum/common"
"github.com/status-im/status-go/connection"
Expand Down Expand Up @@ -192,7 +191,7 @@ func (t *Transport) ProcessNegotiatedSecret(secret types.NegotiatedSecret) (*Fil
}

func (t *Transport) JoinPublic(chatID string) (*Filter, error) {
return t.filters.LoadPublic(chatID, relay.DefaultWakuTopic)
return t.filters.LoadPublic(chatID, DefaultShardPubsubTopic())
}

func (t *Transport) LeavePublic(chatID string) error {
Expand Down Expand Up @@ -683,5 +682,25 @@ func GetPubsubTopic(shard *Shard) string {
return protocol.NewStaticShardingPubsubTopic(shard.Cluster, shard.Index).String()
}

return relay.DefaultWakuTopic
return DefaultShardPubsubTopic()
}

func DefaultNonProtectedPubsubTopic(shard *Shard) string {
if shard != nil {
return GetPubsubTopic(&Shard{
Cluster: MainStatusShardCluster,
Index: NonProtectedShardIndex,
})
}

return DefaultShardPubsubTopic()
}

const MainStatusShardCluster = 16
const DefaultShardIndex = 32
const NonProtectedShardIndex = 64
const UndefinedShardValue = 0

func DefaultShardPubsubTopic() string {
return protocol.NewStaticShardingPubsubTopic(MainStatusShardCluster, DefaultShardIndex).String()
}
Loading

0 comments on commit 96b6588

Please sign in to comment.