Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: namespaced pubsub topics #485

Merged
merged 1 commit into from
Mar 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion mobile/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ func ContentTopic(applicationName string, applicationVersion int, contentTopicNa
}

func PubsubTopic(name string, encoding string) string {
return protocol.NewPubsubTopic(name, encoding).String()
return protocol.NewNamedShardingPubsubTopic(name + "/" + encoding).String()
}

func DefaultPubsubTopic() string {
Expand Down
171 changes: 150 additions & 21 deletions waku/v2/protocol/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,22 @@ package protocol
import (
"errors"
"fmt"
"runtime/debug"
"strconv"
"strings"
)

const Waku2PubsubTopicPrefix = "/waku/2"
const StaticShardingPubsubTopicPrefix = Waku2PubsubTopicPrefix + "/rs"

var ErrInvalidFormat = errors.New("invalid format")
var ErrInvalidStructure = errors.New("invalid topic structure")
var ErrInvalidTopicPrefix = errors.New("must start with " + Waku2PubsubTopicPrefix)
var ErrMissingTopicName = errors.New("missing topic-name")
var ErrInvalidShardedTopicPrefix = errors.New("must start with " + StaticShardingPubsubTopicPrefix)
var ErrMissingClusterIndex = errors.New("missing shard_cluster_index")
var ErrMissingShardNumber = errors.New("missing shard_number")
var ErrInvalidNumberFormat = errors.New("only 2^16 numbers are allowed")

type ContentTopic struct {
ApplicationName string
Expand Down Expand Up @@ -54,38 +65,156 @@ func StringToContentTopic(s string) (ContentTopic, error) {
}, nil
}

type PubsubTopic struct {
Name string
Encoding string
type NamespacedPubsubTopicKind int

const (
StaticSharding NamespacedPubsubTopicKind = iota
NamedSharding
)

type ShardedPubsubTopic interface {
String() string
Kind() NamespacedPubsubTopicKind
Equal(ShardedPubsubTopic) bool
}

type NamedShardingPubsubTopic struct {
ShardedPubsubTopic
kind NamespacedPubsubTopicKind
name string
}

func NewNamedShardingPubsubTopic(name string) ShardedPubsubTopic {
return NamedShardingPubsubTopic{
kind: NamedSharding,
name: name,
}
}

func (n NamedShardingPubsubTopic) Kind() NamespacedPubsubTopicKind {
return n.kind
}

func (n NamedShardingPubsubTopic) Name() string {
return n.name
}

func (t PubsubTopic) String() string {
return fmt.Sprintf("/waku/2/%s/%s", t.Name, t.Encoding)
func (s NamedShardingPubsubTopic) Equal(t2 ShardedPubsubTopic) bool {
return s.String() == t2.String()
}

func DefaultPubsubTopic() PubsubTopic {
return NewPubsubTopic("default-waku", "proto")
func (n NamedShardingPubsubTopic) String() string {
return fmt.Sprintf("%s/%s", Waku2PubsubTopicPrefix, n.name)
}

func NewPubsubTopic(name string, encoding string) PubsubTopic {
return PubsubTopic{
Name: name,
Encoding: encoding,
func (s *NamedShardingPubsubTopic) Parse(topic string) error {
if !strings.HasPrefix(topic, Waku2PubsubTopicPrefix) {
return ErrInvalidTopicPrefix
}

topicName := topic[8:]
if len(topicName) == 0 {
return ErrMissingTopicName
}

s.kind = NamedSharding
s.name = topicName

return nil
}

func (t PubsubTopic) Equal(t2 PubsubTopic) bool {
return t.Name == t2.Name && t.Encoding == t2.Encoding
type StaticShardingPubsubTopic struct {
ShardedPubsubTopic
kind NamespacedPubsubTopicKind
cluster uint16
shard uint16
}

func StringToPubsubTopic(s string) (PubsubTopic, error) {
p := strings.Split(s, "/")
if len(p) != 5 || p[0] != "" || p[1] != "waku" || p[2] != "2" || p[3] == "" || p[4] == "" {
return PubsubTopic{}, ErrInvalidFormat
func NewStaticShardingPubsubTopic(cluster uint16, shard uint16) ShardedPubsubTopic {
return StaticShardingPubsubTopic{
kind: StaticSharding,
cluster: cluster,
shard: shard,
}
}

return PubsubTopic{
Name: p[3],
Encoding: p[4],
}, nil
func (n StaticShardingPubsubTopic) Cluster() uint16 {
return n.cluster
}

func (n StaticShardingPubsubTopic) Shard() uint16 {
return n.shard
}

func (n StaticShardingPubsubTopic) Kind() NamespacedPubsubTopicKind {
return n.kind
}

func (s StaticShardingPubsubTopic) Equal(t2 ShardedPubsubTopic) bool {
return s.String() == t2.String()
}

func (n StaticShardingPubsubTopic) String() string {
return fmt.Sprintf("%s/%d/%d", StaticShardingPubsubTopicPrefix, n.cluster, n.shard)
}

func (s *StaticShardingPubsubTopic) Parse(topic string) error {
if !strings.HasPrefix(topic, StaticShardingPubsubTopicPrefix) {
fmt.Println(topic, StaticShardingPubsubTopicPrefix)
return ErrInvalidShardedTopicPrefix
}

parts := strings.Split(topic[11:], "/")
if len(parts) != 2 {
return ErrInvalidStructure
}

clusterPart := parts[0]
if len(clusterPart) == 0 {
return ErrMissingClusterIndex
}

clusterInt, err := strconv.ParseUint(clusterPart, 10, 16)
if err != nil {
return ErrInvalidNumberFormat
}

shardPart := parts[1]
if len(shardPart) == 0 {
return ErrMissingShardNumber
}

shardInt, err := strconv.ParseUint(shardPart, 10, 16)
if err != nil {
return ErrInvalidNumberFormat
}

s.shard = uint16(shardInt)
s.cluster = uint16(clusterInt)
s.kind = StaticSharding

return nil
}

func ToShardedPubsubTopic(topic string) (ShardedPubsubTopic, error) {
if strings.HasPrefix(topic, StaticShardingPubsubTopicPrefix) {
s := StaticShardingPubsubTopic{}
err := s.Parse(topic)
if err != nil {
return nil, err
}
return s, nil
} else {
debug.PrintStack()
s := NamedShardingPubsubTopic{}
err := s.Parse(topic)
if err != nil {
return nil, err
}
return s, nil
}
}

func DefaultPubsubTopic() ShardedPubsubTopic {
return NewNamedShardingPubsubTopic("default-waku/proto")
}
50 changes: 32 additions & 18 deletions waku/v2/protocol/topic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,27 +31,41 @@ func TestContentTopic(t *testing.T) {
require.False(t, ct.Equal(ct3))
}

func TestTopic(t *testing.T) {
topic := NewPubsubTopic("test", "proto")
require.Equal(t, topic.String(), "/waku/2/test/proto")
func TestNsPubsubTopic(t *testing.T) {
ns1 := NewNamedShardingPubsubTopic("waku-dev")
require.Equal(t, "/waku/2/waku-dev", ns1.String())

_, err := StringToPubsubTopic("/waku/-1/a/b")
require.Error(t, ErrInvalidFormat, err)

_, err = StringToPubsubTopic("waku/2/a/b")
require.Error(t, ErrInvalidFormat, err)
ns2 := NewStaticShardingPubsubTopic(0, 2)
require.Equal(t, "/waku/2/rs/0/2", ns2.String())

_, err = StringToPubsubTopic("////")
require.Error(t, ErrInvalidFormat, err)
require.True(t, ns1.Equal(ns1))
require.False(t, ns1.Equal(ns2))

_, err = StringToPubsubTopic("/waku/2/a")
require.Error(t, ErrInvalidFormat, err)

topic2, err := StringToPubsubTopic("/waku/2/test/proto")
topic := "/waku/2/waku-dev"
ns, err := ToShardedPubsubTopic(topic)
require.NoError(t, err)
require.Equal(t, topic.String(), topic2.String())
require.True(t, topic.Equal(topic2))
require.Equal(t, NamedSharding, ns.Kind())
require.Equal(t, "waku-dev", ns.(NamedShardingPubsubTopic).Name())

topic3 := NewPubsubTopic("test2", "proto")
require.False(t, topic.Equal(topic3))
topic = "/waku/2/rs/16/42"
ns, err = ToShardedPubsubTopic(topic)
require.NoError(t, err)
require.Equal(t, StaticSharding, ns.Kind())
require.Equal(t, uint16(16), ns.(StaticShardingPubsubTopic).Cluster())
require.Equal(t, uint16(42), ns.(StaticShardingPubsubTopic).Shard())

topic = "/waku/1/rs/16/42"
_, err = ToShardedPubsubTopic(topic)
require.Error(t, err)
require.ErrorIs(t, err, ErrInvalidTopicPrefix)

topic = "/waku/2/rs//02"
_, err = ToShardedPubsubTopic(topic)
require.Error(t, err)
require.ErrorIs(t, err, ErrMissingClusterIndex)

topic = "/waku/2/rs/xx/77"
_, err = ToShardedPubsubTopic(topic)
require.Error(t, err)
require.ErrorIs(t, err, ErrInvalidNumberFormat)
}