Skip to content
This repository has been archived by the owner on May 11, 2022. It is now read-only.

Cleanup goroutine structure of autonat #41

Merged
merged 23 commits into from
Mar 11, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
1f8ed1d
Cleanup structure of autonat.
willscott Feb 14, 2020
580ba74
revert getAddrs removal
willscott Feb 18, 2020
52f9c2c
limit autonat probes to peers advertising the protocol
willscott Feb 19, 2020
3410cb6
add public-private transition test
willscott Feb 20, 2020
be5e2cc
watch for address change events
willscott Feb 26, 2020
4bfe538
More selectivityy about emitting nat status changes
willscott Mar 2, 2020
f40dd4c
fix probing logic
willscott Mar 3, 2020
d4d4785
stateful eventbus
aarshkshah1992 Mar 4, 2020
ab02c60
switch to single event type
willscott Mar 5, 2020
a5ecb39
fix logic bug in confidence
willscott Mar 5, 2020
44defc6
use a single reachability event instead of multiple routability events
aschmahmann Mar 5, 2020
f35bfc1
Merge branch 'master' of github.com:libp2p/go-libp2p-autonat into fea…
willscott Mar 6, 2020
bcb8241
bump go-libp2p-core v0.4.0
willscott Mar 6, 2020
fe1f5df
Merge branch 'feat/emit-stateful' of github.com:libp2p/go-libp2p-auto…
willscott Mar 6, 2020
c910ed5
review comments
willscott Mar 9, 2020
8ab19d2
additional review fixups
willscott Mar 9, 2020
7a6734c
stricter host checking
willscott Mar 10, 2020
ce79942
bump to new go-multiaddr-net version
willscott Mar 10, 2020
c81e328
shorten times + tests
willscott Mar 10, 2020
cf2e8a2
squash multiple address-updates that happen at once
willscott Mar 10, 2020
1134a3f
simplify peer selection logic
willscott Mar 10, 2020
d96a2e8
add comment/cleanup
willscott Mar 11, 2020
123114f
panic when no probe-able nodes
willscott Mar 11, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
266 changes: 136 additions & 130 deletions autonat.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,29 @@ import (
"context"
"errors"
"math/rand"
"sync"
"sync/atomic"
"time"

"github.com/libp2p/go-libp2p-core/event"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
peerstore "github.com/libp2p/go-libp2p-peerstore"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
)

// NATStatus is the state of NAT as detected by the ambient service.
type NATStatus int

const (
// NAT status is unknown; this means that the ambient service has not been
// NATStatusUnknown means that the ambient service has not been
// able to decide the presence of NAT in the most recent attempt to test
// dial through known autonat peers. initial state.
NATStatusUnknown NATStatus = iota
// NAT status is publicly dialable
// NATStatusPublic means this node believes it is externally dialable
NATStatusPublic
// NAT status is private network
// NATStatusPrivate means this node believes it is behind a NAT
NATStatusPrivate
)

Expand All @@ -50,62 +51,61 @@ type AmbientAutoNAT struct {
ctx context.Context
host host.Host

getAddrs GetAddrs

mx sync.Mutex
peers map[peer.ID][]ma.Multiaddr
status NATStatus
addr ma.Multiaddr
candidatePeers chan network.Conn
observations chan autoNATResult
status atomic.Value
// Reflects the confidence on of the NATStatus being private, as a single
// dialback may fail for reasons unrelated to NAT.
// If it is <3, then multiple autoNAT peers may be contacted for dialback
// If only a single autoNAT peer is known, then the confidence increases
// for each failure until it reaches 3.
confidence int
confidence int
lastInbound time.Time
lastProbe time.Time

emitUnknown event.Emitter
emitPublic event.Emitter
emitPrivate event.Emitter
}

// NewAutoNAT creates a new ambient NAT autodiscovery instance attached to a host
// If getAddrs is nil, h.Addrs will be used
func NewAutoNAT(ctx context.Context, h host.Host, getAddrs GetAddrs) AutoNAT {
if getAddrs == nil {
getAddrs = h.Addrs
}
type autoNATResult struct {
NATStatus
address ma.Multiaddr
}

// NewAutoNAT creates a new ambient NAT autodiscovery instance attached to a host
func NewAutoNAT(ctx context.Context, h host.Host) AutoNAT {
emitUnknown, _ := h.EventBus().Emitter(new(event.EvtLocalRoutabilityUnknown))
emitPublic, _ := h.EventBus().Emitter(new(event.EvtLocalRoutabilityPublic))
emitPrivate, _ := h.EventBus().Emitter(new(event.EvtLocalRoutabilityPrivate))

as := &AmbientAutoNAT{
ctx: ctx,
host: h,
getAddrs: getAddrs,
peers: make(map[peer.ID][]ma.Multiaddr),
status: NATStatusUnknown,
ctx: ctx,
host: h,
candidatePeers: make(chan network.Conn, 5),
observations: make(chan autoNATResult, 1),

emitUnknown: emitUnknown,
emitPublic: emitPublic,
emitPrivate: emitPrivate,
}
as.status.Store(autoNATResult{NATStatusUnknown, nil})

h.Network().Notify(as)
go as.background()

return as
}

// Status returns the AutoNAT observed reachability status.
func (as *AmbientAutoNAT) Status() NATStatus {
as.mx.Lock()
defer as.mx.Unlock()
return as.status
s := as.status.Load().(autoNATResult)
return s.NATStatus
}

func (as *AmbientAutoNAT) updateStatus(s NATStatus) {
as.status = s
switch s {
func (as *AmbientAutoNAT) emitStatus() {
status := as.status.Load().(autoNATResult)
switch status.NATStatus {
case NATStatusUnknown:
as.emitUnknown.Emit(event.EvtLocalRoutabilityUnknown{})
case NATStatusPublic:
Expand All @@ -115,158 +115,164 @@ func (as *AmbientAutoNAT) updateStatus(s NATStatus) {
}
}

// PublicAddr returns the publicly connectable Multiaddr of this node if one is known.
func (as *AmbientAutoNAT) PublicAddr() (ma.Multiaddr, error) {
as.mx.Lock()
defer as.mx.Unlock()

if as.status != NATStatusPublic {
s := as.status.Load().(autoNATResult)
if s.NATStatus != NATStatusPublic {
return nil, errors.New("NAT Status is not public")
}

return as.addr, nil
return s.address, nil
}

func (as *AmbientAutoNAT) background() {
// wait a bit for the node to come online and establish some connections
// before starting autodetection
select {
case <-time.After(AutoNATBootDelay):
case <-as.ctx.Done():
return
}

delay := AutoNATBootDelay
for {
as.autodetect()

delay := AutoNATRefreshInterval
if as.status == NATStatusUnknown {
delay = AutoNATRetryInterval
}

select {
// new connection occured.
case conn := <-as.candidatePeers:
if conn.Stat().Direction == network.DirInbound && manet.IsPublicAddr(conn.RemoteMultiaddr()) {
as.lastInbound = time.Now()
}
// TODO: network changed.

// probe finished.
case result := <-as.observations:
as.recordObservation(result)
case <-time.After(delay):
case <-as.ctx.Done():
return
}
}
}

func (as *AmbientAutoNAT) autodetect() {
peers := as.getPeers()

if len(peers) == 0 {
log.Debugf("skipping NAT auto detection; no autonat peers")
return
}

cli := NewAutoNATClient(as.host, as.getAddrs)
ctx, cancel := context.WithTimeout(as.ctx, AutoNATRequestTimeout)
defer cancel()

var result struct {
sync.Mutex
private int
public int
pubaddr ma.Multiaddr
delay = as.scheduleProbe()
}
}

probe := 3 - as.confidence
if probe == 0 {
probe = 1
}
if probe > len(peers) {
probe = len(peers)
// scheduleProbe calculates when the next probe should be scheduled for,
// and launches it if that time is now.
func (as *AmbientAutoNAT) scheduleProbe() time.Duration {
// Our baseline is a probe every 'AutoNATRefreshInterval'
// This is modulated by:
// * recent inbound connections make us willing to wait up to 2x longer between probes.
// * low confidence makes us speed up between probes.
fixedNow := time.Now()
currentStatus := as.status.Load().(autoNATResult)

nextProbe := fixedNow
if !as.lastProbe.IsZero() {
untilNext := AutoNATRefreshInterval
if currentStatus.NATStatus == NATStatusUnknown {
untilNext = AutoNATRetryInterval
} else if currentStatus.NATStatus == NATStatusPublic && as.lastInbound.After(as.lastProbe) {
untilNext *= 2
} else if as.confidence < 3 {
untilNext = AutoNATRetryInterval
}
nextProbe = as.lastProbe.Add(untilNext)
}

var wg sync.WaitGroup

for _, pi := range peers[:probe] {
wg.Add(1)
go func(pi peer.AddrInfo) {
defer wg.Done()

as.host.Peerstore().AddAddrs(pi.ID, pi.Addrs, peerstore.TempAddrTTL)
a, err := cli.DialBack(ctx, pi.ID)

switch {
case err == nil:
log.Debugf("Dialback through %s successful; public address is %s", pi.ID.Pretty(), a.String())
result.Lock()
result.public++
result.pubaddr = a
result.Unlock()

case IsDialError(err):
log.Debugf("Dialback through %s failed", pi.ID.Pretty())
result.Lock()
result.private++
result.Unlock()

default:
log.Debugf("Dialback error through %s: %s", pi.ID.Pretty(), err)
}
}(pi)
if fixedNow.After(nextProbe) || fixedNow == nextProbe {
as.lastProbe = fixedNow
go as.probeNextPeer()
return AutoNATRetryInterval
}
return nextProbe.Sub(fixedNow)
}

wg.Wait()

as.mx.Lock()
if result.public > 0 {
// Update the current status based on an observed result.
func (as *AmbientAutoNAT) recordObservation(observation autoNATResult) {
currentStatus := as.status.Load().(autoNATResult)
if observation.NATStatus == NATStatusPublic {
log.Debugf("NAT status is public")
if as.status == NATStatusPrivate {
if currentStatus.NATStatus == NATStatusPrivate {
// we are flipping our NATStatus, so confidence drops to 0
as.confidence = 0
} else if as.confidence < 3 {
as.confidence++
}
as.addr = result.pubaddr
as.updateStatus(NATStatusPublic)
} else if result.private > 0 {
if observation.address != nil {
if currentStatus.address != nil && !observation.address.Equal(currentStatus.address) {
as.confidence--
}
as.status.Store(observation)
}
if currentStatus.address != nil || observation.address != nil {
as.emitStatus()
}
} else if observation.NATStatus == NATStatusPrivate {
log.Debugf("NAT status is private")
if as.status == NATStatusPublic {
// we are flipping our NATStatus, so confidence drops to 0
as.confidence = 0
if currentStatus.NATStatus == NATStatusPublic {
if as.confidence < 1 {
as.confidence--
} else {
// we are flipping our NATStatus, so confidence drops to 0
as.confidence = 0
as.status.Store(observation)
as.emitStatus()
}
} else if as.confidence < 3 {
as.confidence++
as.status.Store(observation)
as.emitStatus()
}
as.addr = nil
as.updateStatus(NATStatusPrivate)
} else if as.confidence > 0 {
// don't just flip to unknown, reduce confidence first
as.confidence--
} else {
log.Debugf("NAT status is unknown")
as.addr = nil
as.updateStatus(NATStatusUnknown)
as.status.Store(autoNATResult{NATStatusUnknown, nil})
as.emitStatus()
}
as.mx.Unlock()
}

func (as *AmbientAutoNAT) getPeers() []peer.AddrInfo {
as.mx.Lock()
defer as.mx.Unlock()
func (as *AmbientAutoNAT) probe(pi *peer.AddrInfo) {
cli := NewAutoNATClient(as.host)
ctx, cancel := context.WithTimeout(as.ctx, AutoNATRequestTimeout)
defer cancel()

if len(as.peers) == 0 {
return nil
as.host.Peerstore().AddAddrs(pi.ID, pi.Addrs, peerstore.TempAddrTTL)
a, err := cli.DialBack(ctx, pi.ID)

switch {
case err == nil:
log.Debugf("Dialback through %s successful; public address is %s", pi.ID.Pretty(), a.String())
as.observations <- autoNATResult{NATStatusPublic, a}
case IsDialError(err):
log.Debugf("Dialback through %s failed", pi.ID.Pretty())
as.observations <- autoNATResult{NATStatusPrivate, nil}
default:
as.observations <- autoNATResult{NATStatusUnknown, nil}
}
}

var connected, others []peer.AddrInfo
func (as *AmbientAutoNAT) probeNextPeer() {
peers := as.host.Network().Peers()
if len(peers) == 0 {
return
}

for p, addrs := range as.peers {
connected := make([]peer.AddrInfo, 0, len(peers))
others := make([]peer.AddrInfo, 0, len(peers))

for _, p := range peers {
info := as.host.Peerstore().PeerInfo(p)
if as.host.Network().Connectedness(p) == network.Connected {
connected = append(connected, peer.AddrInfo{ID: p, Addrs: addrs})
connected = append(connected, info)
} else {
others = append(others, peer.AddrInfo{ID: p, Addrs: addrs})
others = append(others, info)
}
}
// TODO: track and exclude recently probed peers.

shufflePeers(connected)

if len(connected) < 3 {
if len(connected) > 0 {
as.probe(&connected[0])
return
} else if len(others) > 0 {
shufflePeers(others)
return append(connected, others...)
} else {
return connected
as.probe(&others[0])
}
}

Expand Down
6 changes: 2 additions & 4 deletions autonat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,8 @@ func newDialResponseError(status pb.Message_ResponseStatus, text string) *pb.Mes

func makeAutoNAT(ctx context.Context, t *testing.T, ash host.Host) (host.Host, AutoNAT) {
h := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
a := NewAutoNAT(ctx, h, nil)
a.(*AmbientAutoNAT).mx.Lock()
a.(*AmbientAutoNAT).peers[ash.ID()] = ash.Addrs()
a.(*AmbientAutoNAT).mx.Unlock()
h.Peerstore().AddAddrs(ash.ID(), ash.Addrs(), time.Minute)
a := NewAutoNAT(ctx, h)
return h, a
}

Expand Down
Loading