Skip to content

Commit

Permalink
feat: remove named topic
Browse files Browse the repository at this point in the history
  • Loading branch information
harsh-98 committed Oct 27, 2023
1 parent 0868f5d commit e92307f
Show file tree
Hide file tree
Showing 11 changed files with 100 additions and 176 deletions.
8 changes: 0 additions & 8 deletions library/c/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,14 +203,6 @@ func waku_content_topic(applicationName *C.char, applicationVersion C.uint, cont
return execOkCB(onOkCb, contentTopic.String())
}

// Create a pubsub topic string according to RFC 23
//
//export waku_pubsub_topic
func waku_pubsub_topic(name *C.char, encoding *C.char, onOkCb C.WakuCallBack) C.int {
topic := library.PubsubTopic(C.GoString(name), C.GoString(encoding))
return execOkCB(onOkCb, topic)
}

// Get the default pubsub topic used in waku2: /waku/2/default-waku/proto
//
//export waku_default_pubsub_topic
Expand Down
7 changes: 1 addition & 6 deletions library/mobile/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,9 @@ func ContentTopic(applicationName string, applicationVersion int, contentTopicNa
return contentTopic.String()
}

// PubsubTopic creates a pubsub topic string according to RFC 23
func PubsubTopic(name string, encoding string) string {
return protocol.NewNamedShardingPubsubTopic(name + "/" + encoding).String()
}

// DefaultPubsubTopic returns the default pubsub topic used in waku2: /waku/2/default-waku/proto
func DefaultPubsubTopic() string {
return protocol.DefaultPubsubTopic().String()
return protocol.DefaultPubsubTopic{}.String()
}

// Peers retrieves the list of peers known by the waku node
Expand Down
9 changes: 2 additions & 7 deletions library/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,19 +340,14 @@ func ContentTopic(applicationName string, applicationVersion int, contentTopicNa
return contentTopic.String()
}

// PubsubTopic creates a pubsub topic string according to RFC 23
func PubsubTopic(name string, encoding string) string {
return protocol.NewNamedShardingPubsubTopic(name + "/" + encoding).String()
}

// DefaultPubsubTopic returns the default pubsub topic used in waku2: /waku/2/default-waku/proto
func DefaultPubsubTopic() string {
return protocol.DefaultPubsubTopic().String()
return protocol.DefaultPubsubTopic{}.String()
}

func getTopic(topic string) string {
if topic == "" {
return protocol.DefaultPubsubTopic().String()
return protocol.DefaultPubsubTopic{}.String()
}
return topic
}
Expand Down
2 changes: 1 addition & 1 deletion library/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func RelayEnoughPeers(topic string) (bool, error) {
return false, errWakuNodeNotReady
}

topicToCheck := protocol.DefaultPubsubTopic().String()
topicToCheck := protocol.DefaultPubsubTopic{}.String()
if topic != "" {
topicToCheck = topic
}
Expand Down
1 change: 0 additions & 1 deletion waku/v2/protocol/content_topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"strings"
)

// DefaultContentTopic is the default content topic used in Waku network if no content topic is specified.
const DefaultContentTopic = "/waku/2/default-content/proto"

var ErrInvalidFormat = errors.New("invalid content topic format")
Expand Down
14 changes: 7 additions & 7 deletions waku/v2/protocol/enr/shards.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,22 +122,22 @@ func ContainsShard(record *enr.Record, cluster uint16, index uint16) bool {
return rs.Contains(cluster, index)
}

func ContainsShardWithNsTopic(record *enr.Record, topic protocol.NamespacedPubsubTopic) bool {
if topic.Kind() != protocol.StaticSharding {
func ContainsShardWithWakuTopic(record *enr.Record, topic protocol.WakuPubSubTopic) bool {
if shardTopic, err := protocol.ToShardPubsubTopic(topic); err != nil {
return false
} else {
return ContainsShard(record, shardTopic.Cluster(), shardTopic.Shard())
}
shardTopic := topic.(protocol.StaticShardingPubsubTopic)
return ContainsShard(record, shardTopic.Cluster(), shardTopic.Shard())
}

func ContainsRelayShard(record *enr.Record, topic protocol.StaticShardingPubsubTopic) bool {
return ContainsShardWithNsTopic(record, topic)
return ContainsShardWithWakuTopic(record, topic)
}

func ContainsShardTopic(record *enr.Record, topic string) bool {
shardTopic, err := protocol.ToShardedPubsubTopic(topic)
shardTopic, err := protocol.ToWakuPubsubTopic(topic)
if err != nil {
return false
}
return ContainsShardWithNsTopic(record, shardTopic)
return ContainsShardWithWakuTopic(record, shardTopic)
}
132 changes: 32 additions & 100 deletions waku/v2/protocol/pubsub_topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,104 +7,43 @@ import (
"strings"
)

// Waku2PubsubTopicPrefix is the expected prefix to be used for pubsub topics
const Waku2PubsubTopicPrefix = "/waku/2"

// StaticShardingPubsubTopicPrefix is the expected prefix to be used for static sharding pubsub topics
const StaticShardingPubsubTopicPrefix = Waku2PubsubTopicPrefix + "/rs"

// ErrInvalidStructure indicates that the pubsub topic is malformed
var ErrInvalidStructure = errors.New("invalid topic structure")

// ErrInvalidTopicPrefix indicates that the pubsub topic is missing the prefix /waku/2
var ErrInvalidTopicPrefix = errors.New("must start with " + Waku2PubsubTopicPrefix)
var ErrMissingTopicName = errors.New("missing topic-name")
var ErrInvalidShardedTopicPrefix = errors.New("must start with " + StaticShardingPubsubTopicPrefix)
var ErrMissingClusterIndex = errors.New("missing shard_cluster_index")
var ErrMissingShardNumber = errors.New("missing shard_number")

// ErrInvalidNumberFormat indicates that a number exceeds the allowed range
var ErrInvalidNumberFormat = errors.New("only 2^16 numbers are allowed")

// NamespacedPubsubTopicKind used to represent kind of NamespacedPubsubTopicKind
type NamespacedPubsubTopicKind int

const (
StaticSharding NamespacedPubsubTopicKind = iota
NamedSharding
)

// NamespacedPubsubTopic is an interface for namespace based pubSub topic
type NamespacedPubsubTopic interface {
type WakuPubSubTopic interface {
String() string
Kind() NamespacedPubsubTopicKind
Equal(NamespacedPubsubTopic) bool
}

// NamedShardingPubsubTopic is object for a NamedSharding type pubSub topic
type NamedShardingPubsubTopic struct {
NamespacedPubsubTopic
kind NamespacedPubsubTopicKind
name string
}

// NewNamedShardingPubsubTopic creates a new NamedShardingPubSubTopic
func NewNamedShardingPubsubTopic(name string) NamespacedPubsubTopic {
return NamedShardingPubsubTopic{
kind: NamedSharding,
name: name,
}
}

// Kind returns the type of PubsubTopic whether it is StaticShared or NamedSharded
func (n NamedShardingPubsubTopic) Kind() NamespacedPubsubTopicKind {
return n.kind
}

// Name is the name of the NamedSharded pubsub topic.
func (n NamedShardingPubsubTopic) Name() string {
return n.name
}
const defaultPubsubTopic = "/waku/2/default-waku/proto"

// Equal compares NamedShardingPubsubTopic
func (n NamedShardingPubsubTopic) Equal(t2 NamespacedPubsubTopic) bool {
return n.String() == t2.String()
}
type DefaultPubsubTopic struct{}

// String formats NamedShardingPubsubTopic to RFC 23 specific string format for pubsub topic.
func (n NamedShardingPubsubTopic) String() string {
return fmt.Sprintf("%s/%s", Waku2PubsubTopicPrefix, n.name)
func (DefaultPubsubTopic) String() string {
return defaultPubsubTopic
}

// Parse parses a topic string into a NamedShardingPubsubTopic
func (n *NamedShardingPubsubTopic) Parse(topic string) error {
if !strings.HasPrefix(topic, Waku2PubsubTopicPrefix) {
return ErrInvalidTopicPrefix
}
// StaticShardingPubsubTopicPrefix is the expected prefix to be used for static sharding pubsub topics
const StaticShardingPubsubTopicPrefix = "/waku/2/rs"

topicName := topic[8:]
if len(topicName) == 0 {
return ErrMissingTopicName
}
// waku pubsub topic errors
var ErrNotWakuPubsubTopic = errors.New("not a waku pubsub topic")

n.kind = NamedSharding
n.name = topicName
// shard pubsub topic errors
var ErrNotShardPubsubTopic = errors.New("not a shard pubsub topic")
var ErrInvalidStructure = errors.New("invalid topic structure")
var ErrInvalidShardedTopicPrefix = errors.New("must start with " + StaticShardingPubsubTopicPrefix)
var ErrMissingClusterIndex = errors.New("missing shard_cluster_index")
var ErrMissingShardNumber = errors.New("missing shard_number")

return nil
}
// ErrInvalidNumberFormat indicates that a number exceeds the allowed range
var ErrInvalidNumberFormat = errors.New("only 2^16 numbers are allowed")

// StaticShardingPubsubTopic describes a pubSub topic as per StaticSharding
type StaticShardingPubsubTopic struct {
NamespacedPubsubTopic
kind NamespacedPubsubTopicKind
cluster uint16
shard uint16
}

// NewStaticShardingPubsubTopic creates a new pubSub topic
func NewStaticShardingPubsubTopic(cluster uint16, shard uint16) StaticShardingPubsubTopic {
return StaticShardingPubsubTopic{
kind: StaticSharding,
cluster: cluster,
shard: shard,
}
Expand All @@ -120,13 +59,8 @@ func (s StaticShardingPubsubTopic) Shard() uint16 {
return s.shard
}

// Kind returns the type of PubsubTopic whether it is StaticShared or NamedSharded
func (s StaticShardingPubsubTopic) Kind() NamespacedPubsubTopicKind {
return s.kind
}

// Equal compares StaticShardingPubsubTopic
func (s StaticShardingPubsubTopic) Equal(t2 NamespacedPubsubTopic) bool {
func (s StaticShardingPubsubTopic) Equal(t2 StaticShardingPubsubTopic) bool {
return s.String() == t2.String()
}

Expand Down Expand Up @@ -168,31 +102,29 @@ func (s *StaticShardingPubsubTopic) Parse(topic string) error {

s.shard = uint16(shardInt)
s.cluster = uint16(clusterInt)
s.kind = StaticSharding

return nil
}

// ToShardedPubsubTopic takes a pubSub topic string and creates a NamespacedPubsubTopic object.
func ToShardedPubsubTopic(topic string) (NamespacedPubsubTopic, error) {
func ToShardPubsubTopic(topic WakuPubSubTopic) (StaticShardingPubsubTopic, error) {
if topic.String() != defaultPubsubTopic {
return topic.(StaticShardingPubsubTopic), nil
}
return StaticShardingPubsubTopic{}, ErrNotShardPubsubTopic
}

// ToWakuPubsubTopic takes a pubSub topic string and creates a WakuPubsubTopic object.
func ToWakuPubsubTopic(topic string) (WakuPubSubTopic, error) {
if topic == defaultPubsubTopic {
return DefaultPubsubTopic{}, nil
}
if strings.HasPrefix(topic, StaticShardingPubsubTopicPrefix) {
s := StaticShardingPubsubTopic{}
err := s.Parse(topic)
if err != nil {
return nil, err
return s, err
}
return s, nil
}

s := NamedShardingPubsubTopic{}
err := s.Parse(topic)
if err != nil {
return nil, err
}
return s, nil
}

// DefaultPubsubTopic is the default pubSub topic used in waku
func DefaultPubsubTopic() NamespacedPubsubTopic {
return NewNamedShardingPubsubTopic("default-waku/proto")
return nil, ErrNotWakuPubsubTopic
}
2 changes: 1 addition & 1 deletion waku/v2/protocol/relay/waku_relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
const WakuRelayID_v200 = protocol.ID("/vac/waku/relay/2.0.0")

// DefaultWakuTopic is the default pubsub topic used across all Waku protocols
var DefaultWakuTopic string = waku_proto.DefaultPubsubTopic().String()
var DefaultWakuTopic string = waku_proto.DefaultPubsubTopic{}.String()

// WakuRelay is the implementation of the Waku Relay protocol
type WakuRelay struct {
Expand Down
18 changes: 8 additions & 10 deletions waku/v2/protocol/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ func NewRelayShards(cluster uint16, indices ...uint16) (RelayShards, error) {
return RelayShards{Cluster: cluster, Indices: indices}, nil
}

func (rs RelayShards) Topics() []NamespacedPubsubTopic {
var result []NamespacedPubsubTopic
func (rs RelayShards) Topics() []WakuPubSubTopic {
var result []WakuPubSubTopic
for _, i := range rs.Indices {
result = append(result, NewStaticShardingPubsubTopic(rs.Cluster, i))
}
Expand All @@ -72,14 +72,12 @@ func (rs RelayShards) Contains(cluster uint16, index uint16) bool {
return found
}

func (rs RelayShards) ContainsNamespacedTopic(topic NamespacedPubsubTopic) bool {
if topic.Kind() != StaticSharding {
func (rs RelayShards) ContainsShardPubsubTopic(topic WakuPubSubTopic) bool {
if shardedTopic, err := ToShardPubsubTopic(topic); err != nil {
return false
} else {
return rs.Contains(shardedTopic.Cluster(), shardedTopic.Shard())
}

shardedTopic := topic.(StaticShardingPubsubTopic)

return rs.Contains(shardedTopic.Cluster(), shardedTopic.Shard())
}

func TopicsToRelayShards(topic ...string) ([]RelayShards, error) {
Expand Down Expand Up @@ -123,11 +121,11 @@ func TopicsToRelayShards(topic ...string) ([]RelayShards, error) {
}

func (rs RelayShards) ContainsTopic(topic string) bool {
nsTopic, err := ToShardedPubsubTopic(topic)
wTopic, err := ToWakuPubsubTopic(topic)
if err != nil {
return false
}
return rs.ContainsNamespacedTopic(nsTopic)
return rs.ContainsShardPubsubTopic(wTopic)
}

func (rs RelayShards) IndicesList() ([]byte, error) {
Expand Down
Loading

0 comments on commit e92307f

Please sign in to comment.