From d28f4500464c198a8871c99f52845901f4f3f3f5 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Fri, 18 Oct 2019 02:39:37 -0400 Subject: [PATCH 1/3] expose the peer that propagates a message to the recipient --- floodsub_test.go | 56 ++++++++++++++++++++++++++++++++++++++++++++++++ go.mod | 2 ++ pubsub.go | 42 ++++++++++++++++-------------------- validation.go | 10 ++------- 4 files changed, 79 insertions(+), 31 deletions(-) diff --git a/floodsub_test.go b/floodsub_test.go index 6e845e70..9927bf8d 100644 --- a/floodsub_test.go +++ b/floodsub_test.go @@ -1398,3 +1398,59 @@ func readAllQueuedEvents(ctx context.Context, t *testing.T, sub *Subscription) m } return peerState } + +func TestMessageSender(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + const topic = "foobar" + + hosts := getNetHosts(t, ctx, 3) + psubs := getPubsubs(ctx, hosts) + + var msgs []*Subscription + for _, ps := range psubs { + subch, err := ps.Subscribe(topic) + if err != nil { + t.Fatal(err) + } + + msgs = append(msgs, subch) + } + + connect(t, hosts[0], hosts[1]) + connect(t, hosts[1], hosts[2]) + + time.Sleep(time.Millisecond * 100) + + for i:=0; i < 3; i++ { + for j := 0; j < 100; j++ { + msg := []byte(fmt.Sprintf("%d sent %d", i, j)) + + psubs[i].Publish(topic, msg) + + for k, sub := range msgs { + got, err := sub.Next(ctx) + if err != nil { + t.Fatal(sub.err) + } + if !bytes.Equal(msg, got.Data) { + t.Fatal("got wrong message!") + } + + var expectedHost int + if i == k { + expectedHost = i + } else if k != 1 { + expectedHost = 1 + } else { + expectedHost = i + } + + if got.ReceivedFrom != hosts[expectedHost].ID() { + t.Fatal("got wrong message sender") + } + } + } + } +} \ No newline at end of file diff --git a/go.mod b/go.mod index c22fa9b7..d3e14043 100644 --- a/go.mod +++ b/go.mod @@ -11,3 +11,5 @@ require ( github.com/multiformats/go-multistream v0.1.0 github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee ) + +go 1.13 diff --git a/pubsub.go b/pubsub.go index 4635b388..2c8b72be 100644 --- a/pubsub.go +++ b/pubsub.go @@ -79,7 +79,7 @@ type PubSub struct { topics map[string]map[peer.ID]struct{} // sendMsg handles messages that have been validated - sendMsg chan *sendReq + sendMsg chan *Message // addVal handles validator registration requests addVal chan *addValReq @@ -135,6 +135,7 @@ type PubSubRouter interface { type Message struct { *pb.Message + ReceivedFrom peer.ID } func (m *Message) GetFrom() peer.ID { @@ -170,7 +171,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option getPeers: make(chan *listPeerReq), addSub: make(chan *addSubReq), getTopics: make(chan *topicReq), - sendMsg: make(chan *sendReq, 32), + sendMsg: make(chan *Message, 32), addVal: make(chan *addValReq), rmVal: make(chan *rmValReq), eval: make(chan func()), @@ -373,10 +374,10 @@ func (p *PubSub) processLoop(ctx context.Context) { p.handleIncomingRPC(rpc) case msg := <-p.publish: - p.pushMsg(p.host.ID(), msg) + p.pushMsg(msg) - case req := <-p.sendMsg: - p.publishMessage(req.from, req.msg.Message) + case msg := <-p.sendMsg: + p.publishMessage(msg) case req := <-p.addVal: p.val.AddValidator(req) @@ -522,12 +523,12 @@ func (p *PubSub) doAnnounceRetry(pid peer.ID, topic string, sub bool) { // notifySubs sends a given message to all corresponding subscribers. // Only called from processLoop. -func (p *PubSub) notifySubs(msg *pb.Message) { +func (p *PubSub) notifySubs(msg *Message) { for _, topic := range msg.GetTopicIDs() { subs := p.myTopics[topic] for f := range subs { select { - case f.ch <- &Message{msg}: + case f.ch <- msg: default: log.Infof("Can't deliver message to subscription for topic %s; subscriber too slow", topic) } @@ -616,8 +617,8 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) { continue } - msg := &Message{pmsg} - p.pushMsg(rpc.from, msg) + msg := &Message{pmsg, rpc.from} + p.pushMsg(msg) } p.rt.HandleRPC(rpc) @@ -629,7 +630,8 @@ func msgID(pmsg *pb.Message) string { } // pushMsg pushes a message performing validation as necessary -func (p *PubSub) pushMsg(src peer.ID, msg *Message) { +func (p *PubSub) pushMsg(msg *Message) { + src := msg.ReceivedFrom // reject messages from blacklisted peers if p.blacklist.Contains(src) { log.Warningf("dropping message from blacklisted peer %s", src) @@ -659,13 +661,13 @@ func (p *PubSub) pushMsg(src peer.ID, msg *Message) { } if p.markSeen(id) { - p.publishMessage(src, msg.Message) + p.publishMessage(msg) } } -func (p *PubSub) publishMessage(from peer.ID, pmsg *pb.Message) { - p.notifySubs(pmsg) - p.rt.Publish(from, pmsg) +func (p *PubSub) publishMessage(msg *Message) { + p.notifySubs(msg) + p.rt.Publish(msg.ReceivedFrom, msg.Message) } type addSubReq struct { @@ -734,10 +736,11 @@ func (p *PubSub) GetTopics() []string { // Publish publishes data to the given topic. func (p *PubSub) Publish(topic string, data []byte) error { seqno := p.nextSeqno() + hostID := p.host.ID() m := &pb.Message{ Data: data, TopicIDs: []string{topic}, - From: []byte(p.host.ID()), + From: []byte(hostID), Seqno: seqno, } if p.signKey != nil { @@ -747,7 +750,7 @@ func (p *PubSub) Publish(topic string, data []byte) error { return err } } - p.publish <- &Message{m} + p.publish <- &Message{m, hostID} return nil } @@ -763,13 +766,6 @@ type listPeerReq struct { topic string } -// sendReq is a request to call publishMessage. -// It is issued after message validation is done. -type sendReq struct { - from peer.ID - msg *Message -} - // ListPeers returns a list of peers we are connected to in the given topic. func (p *PubSub) ListPeers(topic string) []peer.ID { out := make(chan []peer.ID) diff --git a/validation.go b/validation.go index b7e473d9..9253f2ab 100644 --- a/validation.go +++ b/validation.go @@ -233,10 +233,7 @@ func (v *validation) validate(vals []*topicVal, src peer.ID, msg *Message) { } // no async validators, send the message - v.p.sendMsg <- &sendReq{ - from: src, - msg: msg, - } + v.p.sendMsg <- msg } func (v *validation) validateSignature(msg *Message) bool { @@ -255,10 +252,7 @@ func (v *validation) doValidateTopic(vals []*topicVal, src peer.ID, msg *Message return } - v.p.sendMsg <- &sendReq{ - from: src, - msg: msg, - } + v.p.sendMsg <- msg } func (v *validation) validateTopic(vals []*topicVal, src peer.ID, msg *Message) bool { From c0494a42f4d8d1deb3b646d2e73ec53fbfc24db6 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Fri, 18 Oct 2019 03:25:24 -0400 Subject: [PATCH 2/3] minor variable refactor --- pubsub.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pubsub.go b/pubsub.go index 2c8b72be..7e34dad9 100644 --- a/pubsub.go +++ b/pubsub.go @@ -736,11 +736,11 @@ func (p *PubSub) GetTopics() []string { // Publish publishes data to the given topic. func (p *PubSub) Publish(topic string, data []byte) error { seqno := p.nextSeqno() - hostID := p.host.ID() + id := p.host.ID() m := &pb.Message{ Data: data, TopicIDs: []string{topic}, - From: []byte(hostID), + From: []byte(id), Seqno: seqno, } if p.signKey != nil { @@ -750,7 +750,7 @@ func (p *PubSub) Publish(topic string, data []byte) error { return err } } - p.publish <- &Message{m, hostID} + p.publish <- &Message{m, id} return nil } From 6551b41f12f9f91b05f395c7b4d9f59f81c2f4f4 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Fri, 18 Oct 2019 04:25:52 -0400 Subject: [PATCH 3/3] go fmt --- floodsub_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/floodsub_test.go b/floodsub_test.go index 9927bf8d..465114b6 100644 --- a/floodsub_test.go +++ b/floodsub_test.go @@ -1423,7 +1423,7 @@ func TestMessageSender(t *testing.T) { time.Sleep(time.Millisecond * 100) - for i:=0; i < 3; i++ { + for i := 0; i < 3; i++ { for j := 0; j < 100; j++ { msg := []byte(fmt.Sprintf("%d sent %d", i, j)) @@ -1453,4 +1453,4 @@ func TestMessageSender(t *testing.T) { } } } -} \ No newline at end of file +}