Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

engine: tag peers based on usefulness #191

Merged
merged 4 commits into from
Sep 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 133 additions & 8 deletions decision/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,35 @@ const (
outboxChanBuffer = 0
// maxMessageSize is the maximum size of the batched payload
maxMessageSize = 512 * 1024
// tagPrefix is the format of the tag given to peers associated with an engine
tagPrefix = "bs-engine-%s"
// tagFormat is the format of the tags given to peers associated with an engine
tagFormat = "bs-engine-%s-%s"

// tagWeight is the default weight for peers associated with an engine
tagWeight = 5
// queuedTagWeight is the default weight for peers that have work queued
// on their behalf.
queuedTagWeight = 10

// the alpha for the EWMA used to track short term usefulness
shortTermAlpha = 0.5

// the alpha for the EWMA used to track long term usefulness
longTermAlpha = 0.05

// longTermRatio defines what "long term" means as a multiple of the
// shortTerm duration. Peers that interact at least once every
// longTermRatio short-term periods are considered useful over the long term.
longTermRatio = 10

// long/short term scores for tagging peers
longTermScore = 10 // this is a high tag but it grows _very_ slowly.
shortTermScore = 10 // this is a high tag but it'll go away quickly if we aren't using the peer.
)

// shortTerm is how often the engine samples peer usefulness. A peer that
// interacts at least once per shortTerm period is considered "active".
//
// It is declared as a variable, not a constant, purely so tests can shorten it.
var shortTerm = 10 * time.Second

// Envelope contains a message for a Peer.
Expand Down Expand Up @@ -105,7 +129,8 @@ type Engine struct {

peerTagger PeerTagger

tag string
tagQueued, tagUseful string

lock sync.Mutex // protects the fields immediately below
// ledgerMap lists Ledgers by their Partner key.
ledgerMap map[peer.ID]*ledger
Expand All @@ -123,18 +148,118 @@ func NewEngine(ctx context.Context, bs bstore.Blockstore, peerTagger PeerTagger)
workSignal: make(chan struct{}, 1),
ticker: time.NewTicker(time.Millisecond * 100),
}
e.tag = fmt.Sprintf(tagPrefix, uuid.New().String())
e.tagQueued = fmt.Sprintf(tagFormat, "queued", uuid.New().String())
e.tagUseful = fmt.Sprintf(tagFormat, "useful", uuid.New().String())
e.peerRequestQueue = peertaskqueue.New(peertaskqueue.OnPeerAddedHook(e.onPeerAdded), peertaskqueue.OnPeerRemovedHook(e.onPeerRemoved))
go e.taskWorker(ctx)
go e.scoreWorker(ctx)
return e
}

// scoreWorker periodically re-evaluates how "useful" each known peer is and
// pushes any changed score to the peer tagger under the engine's "useful" tag.
//
// Two exponentially weighted moving averages are kept per ledger:
//
//   - a short-term average, sampled on every tick (every shortTerm) with the
//     high shortTermAlpha, so it reacts quickly to new activity and decays
//     quickly without it;
//   - a long-term average, sampled once every longTermRatio ticks with the
//     low longTermAlpha, so it follows slow trends.
//
// For each sample, a ledger that exchanged data since the previous sample of
// that kind contributes the full score (shortTermScore / longTermScore);
// otherwise it contributes 0.
//
// The published score is the sum of both averages scaled by a factor between
// 0.75 and 1.25 derived from the ledger's debt score, truncated to an int.
// Peers that have sent us proportionally more than we sent them score higher.
//
// The worker runs until ctx is cancelled.
func (e *Engine) scoreWorker(ctx context.Context) {
	sampler := time.NewTicker(shortTerm)
	defer sampler.Stop()

	// scoreChange is a tag update to apply once the engine lock is released.
	type scoreChange struct {
		who   peer.ID
		score int
	}

	// sample turns "did an exchange happen after `since`?" into the value
	// fed to an EWMA: the full score if it did, zero if it did not.
	sample := func(lastExchange, since time.Time, full float64) float64 {
		if lastExchange.After(since) {
			return full
		}
		return 0
	}

	var (
		prevShort, prevLong time.Time
		pending             []scoreChange
	)

	for round := 0; ; round = (round + 1) % longTermRatio {
		var tick time.Time
		select {
		case <-ctx.Done():
			return
		case tick = <-sampler.C:
		}

		// The long-term average is only sampled on the first of every
		// longTermRatio rounds.
		longRound := round == 0

		e.lock.Lock()
		for _, l := range e.ledgerMap {
			l.lk.Lock()

			l.shortScore = ewma(l.shortScore, sample(l.lastExchange, prevShort, shortTermScore), shortTermAlpha)
			if longRound {
				l.longScore = ewma(l.longScore, sample(l.lastExchange, prevLong, longTermScore), longTermAlpha)
			}

			// The accounting adjustment favours peers _we_ depend on over
			// peers that depend on us. It does not help against leeching.
			combined := int((l.shortScore + l.longScore) * (l.Accounting.Score()*.5 + .75))

			// Only queue an update when the score actually moved; touching
			// the connection manager can be expensive.
			if combined != l.score {
				pending = append(pending, scoreChange{who: l.Partner, score: combined})
				l.score = combined
			}

			l.lk.Unlock()
		}
		e.lock.Unlock()

		// Remember when each average was last sampled.
		prevShort = tick
		if longRound {
			prevLong = tick
		}

		// Apply the queued changes now that the engine lock is released.
		for _, change := range pending {
			if change.score != 0 {
				e.peerTagger.TagPeer(change.who, e.tagUseful, change.score)
				continue
			}
			e.peerTagger.UntagPeer(change.who, e.tagUseful)
		}

		// Reuse the backing array on the next round instead of reallocating.
		pending = pending[:0]
	}
}

// onPeerAdded is registered with the peer request queue as its peer-added
// hook (see NewEngine). It tags p through the engine's PeerTagger.
func (e *Engine) onPeerAdded(p peer.ID) {
e.peerTagger.TagPeer(p, e.tag, tagWeight)
e.peerTagger.TagPeer(p, e.tagQueued, queuedTagWeight)
}

// onPeerRemoved is registered with the peer request queue as its peer-removed
// hook (see NewEngine). It removes the tags that onPeerAdded applied to p.
func (e *Engine) onPeerRemoved(p peer.ID) {
e.peerTagger.UntagPeer(p, e.tag)
e.peerTagger.UntagPeer(p, e.tagQueued)
}

// WantlistForPeer returns the currently understood want list for a given peer
Expand Down
103 changes: 84 additions & 19 deletions decision/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,38 +19,63 @@ import (
testutil "github.com/libp2p/go-libp2p-core/test"
)

// peerTag records the peers that currently carry one particular tag in the
// fake peer tagger.
type peerTag struct {
// done is closed by UntagPeer when the last peer is removed from peers.
done chan struct{}
// peers maps each tagged peer to the value it was tagged with.
peers map[peer.ID]int
}

type fakePeerTagger struct {
lk sync.Mutex
wait sync.WaitGroup
taggedPeers []peer.ID
lk sync.Mutex
tags map[string]*peerTag
}

func (fpt *fakePeerTagger) TagPeer(p peer.ID, tag string, n int) {
fpt.wait.Add(1)

fpt.lk.Lock()
defer fpt.lk.Unlock()
fpt.taggedPeers = append(fpt.taggedPeers, p)
if fpt.tags == nil {
fpt.tags = make(map[string]*peerTag, 1)
}
pt, ok := fpt.tags[tag]
if !ok {
pt = &peerTag{peers: make(map[peer.ID]int, 1), done: make(chan struct{})}
fpt.tags[tag] = pt
}
pt.peers[p] = n
}

func (fpt *fakePeerTagger) UntagPeer(p peer.ID, tag string) {
defer fpt.wait.Done()

fpt.lk.Lock()
defer fpt.lk.Unlock()
for i := 0; i < len(fpt.taggedPeers); i++ {
if fpt.taggedPeers[i] == p {
fpt.taggedPeers[i] = fpt.taggedPeers[len(fpt.taggedPeers)-1]
fpt.taggedPeers = fpt.taggedPeers[:len(fpt.taggedPeers)-1]
return
}
pt := fpt.tags[tag]
if pt == nil {
return
}
delete(pt.peers, p)
if len(pt.peers) == 0 {
close(pt.done)
delete(fpt.tags, tag)
}
}

func (fpt *fakePeerTagger) count() int {
func (fpt *fakePeerTagger) count(tag string) int {
fpt.lk.Lock()
defer fpt.lk.Unlock()
return len(fpt.taggedPeers)
if pt, ok := fpt.tags[tag]; ok {
return len(pt.peers)
}
return 0
}

// wait blocks until no peer carries the given tag any more. If the tag is not
// tracked when wait is called, it returns immediately.
func (fpt *fakePeerTagger) wait(tag string) {
	var (
		done    chan struct{}
		tracked bool
	)

	// Grab the channel under the lock, but block on it only after the lock
	// is released so TagPeer/UntagPeer can make progress.
	fpt.lk.Lock()
	if entry := fpt.tags[tag]; entry != nil {
		done, tracked = entry.done, true
	}
	fpt.lk.Unlock()

	if tracked {
		<-done
	}
}

type engineSet struct {
Expand Down Expand Up @@ -241,16 +266,56 @@ func TestTaggingPeers(t *testing.T) {
next := <-sanfrancisco.Engine.Outbox()
envelope := <-next

if sanfrancisco.PeerTagger.count() != 1 {
if sanfrancisco.PeerTagger.count(sanfrancisco.Engine.tagQueued) != 1 {
t.Fatal("Incorrect number of peers tagged")
}
envelope.Sent()
<-sanfrancisco.Engine.Outbox()
sanfrancisco.PeerTagger.wait.Wait()
if sanfrancisco.PeerTagger.count() != 0 {
sanfrancisco.PeerTagger.wait(sanfrancisco.Engine.tagQueued)
if sanfrancisco.PeerTagger.count(sanfrancisco.Engine.tagQueued) != 0 {
t.Fatal("Peers should be untagged but weren't")
}
}

// TestTaggingUseful checks that the engine applies its "useful" tag to a peer
// shortly after a message is sent to it, drops the tag again between bursts,
// keeps it for a while after repeated bursts, and finally removes it once the
// exchanges stop.
//
// The test is timing based: it shrinks shortTerm to one millisecond and sleeps
// for multiples of it.
func TestTaggingUseful(t *testing.T) {
// Speed up sampling for the test and restore the original period afterwards.
oldShortTerm := shortTerm
shortTerm = 1 * time.Millisecond
defer func() { shortTerm = oldShortTerm }()

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
me := newEngine(ctx, "engine")
friend := peer.ID("friend")

block := blocks.NewBlock([]byte("foobar"))
msg := message.New(false)
msg.AddBlock(block)

// Three bursts: each starts untagged, sends one message, expects the tag to
// appear within two sampling periods, then idles for eight more periods.
for i := 0; i < 3; i++ {
if me.PeerTagger.count(me.Engine.tagUseful) != 0 {
t.Fatal("Peers should be untagged but weren't")
}
me.Engine.MessageSent(friend, msg)
time.Sleep(shortTerm * 2)
if me.PeerTagger.count(me.Engine.tagUseful) != 1 {
t.Fatal("Peers should be tagged but weren't")
}
time.Sleep(shortTerm * 8)
}

// After the third burst the peer is expected to stay tagged for a while
// without any further exchange.
if me.PeerTagger.count(me.Engine.tagUseful) == 0 {
t.Fatal("peers should still be tagged due to long-term usefulness")
}
time.Sleep(shortTerm * 2)
if me.PeerTagger.count(me.Engine.tagUseful) == 0 {
t.Fatal("peers should still be tagged due to long-term usefulness")
}
// With no exchanges for another ten periods the tag must be gone.
time.Sleep(shortTerm * 10)
if me.PeerTagger.count(me.Engine.tagUseful) != 0 {
t.Fatal("peers should finally be untagged")
}
}

func partnerWants(e *Engine, keys []string, partner peer.ID) {
add := message.New(false)
for i, letter := range keys {
Expand Down
5 changes: 5 additions & 0 deletions decision/ewma.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package decision

func ewma(old, new, alpha float64) float64 {
return new*alpha + (1-alpha)*old

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Stebalien I know I am late to this PR but if you are willing to shoulder the import, I highly recommend using gonum/floats for the multiplications as it is significantly faster than stdlib.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather keep this case simple as I don't think this is going to be a bottleneck.

}
25 changes: 18 additions & 7 deletions decision/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@ import (

func newLedger(p peer.ID) *ledger {
return &ledger{
wantList: wl.New(),
Partner: p,
sentToPeer: make(map[string]time.Time),
wantList: wl.New(),
Partner: p,
}
}

Expand All @@ -30,16 +29,19 @@ type ledger struct {
// lastExchange is the time of the last data exchange.
lastExchange time.Time

// These scores keep track of how useful we think this peer is. Short
// tracks short-term usefulness and long tracks long-term usefulness.
shortScore, longScore float64
// Score keeps track of the score used in the peer tagger. We track it
// here to avoid unnecessarily updating the tags in the connection manager.
score int

// exchangeCount is the number of exchanges with this peer
exchangeCount uint64

// wantList is a (bounded, small) set of keys that Partner desires.
wantList *wl.Wantlist

// sentToPeer is a set of keys to ensure we dont send duplicate blocks
// to a given peer
sentToPeer map[string]time.Time

// ref is the reference count for this ledger; it's used to ensure we
// don't drop the reference to this ledger in multi-connection scenarios
ref int
Expand All @@ -63,10 +65,19 @@ type debtRatio struct {
BytesRecv uint64
}

// Value reports the debt ratio as bytes sent divided by bytes received.
// One is added to the received count so the division is defined when
// nothing has been received yet.
func (dr *debtRatio) Value() float64 {
	sent := float64(dr.BytesSent)
	received := float64(dr.BytesRecv + 1)
	return sent / received
}

// Score reports the share of all exchanged bytes that were received from the
// partner, as a value between 0 and 1. It is 0 when nothing has been received
// and 1 when bytes have been received but none sent.
func (dr *debtRatio) Score() float64 {
	received := dr.BytesRecv
	if received == 0 {
		return 0
	}
	total := received + dr.BytesSent
	return float64(received) / float64(total)
}

func (l *ledger) SentBytes(n int) {
l.exchangeCount++
l.lastExchange = time.Now()
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ require (
github.com/gogo/protobuf v1.2.1
github.com/golang/protobuf v1.3.1 // indirect
github.com/google/uuid v1.1.1
github.com/hashicorp/golang-lru v0.5.1
github.com/ipfs/go-block-format v0.0.2
github.com/ipfs/go-cid v0.0.2
github.com/ipfs/go-datastore v0.0.5
Expand Down Expand Up @@ -38,3 +37,5 @@ require (
golang.org/x/text v0.3.2 // indirect
gopkg.in/yaml.v2 v2.2.2 // indirect
)

go 1.12