From 9b3e7493f89fec932858a9bc619e10729e3eec19 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Mon, 25 May 2020 09:26:21 -0700 Subject: [PATCH 01/21] feat: implement peering service MVP for #6097 This feature will repeatedly reconnect (with a randomized exponential backoff) to peers in a set of "peered" peers. In the future, this should be extended to: 1. Include a CLI for modifying this list at runtime. 2. Include additional options for peers we want to _protect_ but not connect to. 3. Allow configuring timeouts, backoff, etc. 4. Allow groups? Possibly through textile threads. 5. Allow for runtime-only peering rules. 6. Different reconnect policies. But this MVP should be a significant step forward. This commit was moved from ipfs/kubo@978091a626a0e1f00a797fc4e2de99f4bfee943b --- peering/peering.go | 259 ++++++++++++++++++++++++++++++++++++++++ peering/peering_test.go | 6 + 2 files changed, 265 insertions(+) create mode 100644 peering/peering.go create mode 100644 peering/peering_test.go diff --git a/peering/peering.go b/peering/peering.go new file mode 100644 index 000000000..c543712c5 --- /dev/null +++ b/peering/peering.go @@ -0,0 +1,259 @@ +package peering + +import ( + "context" + "errors" + "math/rand" + "sync" + "time" + + "github.com/ipfs/go-log" + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/multiformats/go-multiaddr" +) + +// maxBackoff is the maximum time between reconnect attempts. +const ( + maxBackoff = 10 * time.Minute + connmgrTag = "ipfs-peering" + // This needs to be sufficient to prevent two sides from simultaneously + // dialing. + initialDelay = 5 * time.Second +) + +var logger = log.Logger("peering") + +type state int + +const ( + stateInit state = iota + stateRunning + stateStopped +) + +// peerHandler keeps track of all state related to a specific "peering" peer. +type peerHandler struct { + peer peer.ID + host host.Host + ctx context.Context + cancel context.CancelFunc + + mu sync.Mutex + addrs []multiaddr.Multiaddr + timer *time.Timer + + nextDelay time.Duration +} + +func (ph *peerHandler) stop() { + ph.mu.Lock() + defer ph.mu.Unlock() + + if ph.timer != nil { + ph.timer.Stop() + ph.timer = nil + } +} + +func (ph *peerHandler) nextBackoff() time.Duration { + // calculate the timeout + if ph.nextDelay < maxBackoff { + ph.nextDelay += ph.nextDelay/2 + time.Duration(rand.Int63n(int64(ph.nextDelay))) + } + return ph.nextDelay +} + +func (ph *peerHandler) reconnect() { + // Try connecting + + ph.mu.Lock() + addrs := append(([]multiaddr.Multiaddr)(nil), ph.addrs...) + ph.mu.Unlock() + + logger.Debugw("reconnecting", "peer", ph.peer, "addrs", addrs) + + err := ph.host.Connect(ph.ctx, peer.AddrInfo{ID: ph.peer, Addrs: addrs}) + if err != nil { + logger.Debugw("failed to reconnect", "peer", ph.peer, "error", err) + // Ok, we failed. Extend the timeout. + ph.mu.Lock() + if ph.timer != nil { + // Only counts if the timer still exists. If not, a + // connection _was_ somehow established. + ph.timer.Reset(ph.nextBackoff()) + } + // Otherwise, someone else has stopped us so we can assume that + // we're either connected or someone else will start us. + ph.mu.Unlock() + } + + // Always call this. We could have connected since we processed the + // error. + ph.stopIfConnected() +} + +func (ph *peerHandler) stopIfConnected() { + ph.mu.Lock() + defer ph.mu.Unlock() + + if ph.timer != nil && ph.host.Network().Connectedness(ph.peer) == network.Connected { + logger.Debugw("successfully reconnected", "peer", ph.peer) + ph.timer.Stop() + ph.timer = nil + ph.nextDelay = initialDelay + } +} + +// startIfDisconnected is the inverse of stopIfConnected. +func (ph *peerHandler) startIfDisconnected() { + ph.mu.Lock() + defer ph.mu.Unlock() + + if ph.timer == nil && ph.host.Network().Connectedness(ph.peer) != network.Connected { + logger.Debugw("disconnected from peer", "peer", ph.peer) + // Always start with a short timeout so we can stagger things a bit. + ph.timer = time.AfterFunc(ph.nextBackoff(), ph.reconnect) + } +} + +// PeeringService maintains connections to specified peers, reconnecting on +// disconnect with a back-off. +type PeeringService struct { + host host.Host + + mu sync.RWMutex + peers map[peer.ID]*peerHandler + + ctx context.Context + cancel context.CancelFunc + state state +} + +// NewPeeringService constructs a new peering service. Peers can be added and +// removed immediately, but connections won't be formed until `Start` is called. +func NewPeeringService(host host.Host) *PeeringService { + ps := &PeeringService{host: host, peers: make(map[peer.ID]*peerHandler)} + ps.ctx, ps.cancel = context.WithCancel(context.Background()) + return ps +} + +// Start starts the peering service, connecting and maintaining connections to +// all registered peers. It returns an error if the service has already been +// stopped. +func (ps *PeeringService) Start() error { + ps.mu.Lock() + defer ps.mu.Unlock() + + switch ps.state { + case stateInit: + logger.Infow("starting") + case stateRunning: + return nil + case stateStopped: + return errors.New("already stopped") + } + ps.host.Network().Notify((*netNotifee)(ps)) + ps.state = stateRunning + for _, handler := range ps.peers { + go handler.startIfDisconnected() + } + return nil +} + +// Stop stops the peering service. +func (ps *PeeringService) Stop() error { + ps.cancel() + ps.host.Network().StopNotify((*netNotifee)(ps)) + + ps.mu.Lock() + defer ps.mu.Unlock() + + if ps.state == stateRunning { + logger.Infow("stopping") + for _, handler := range ps.peers { + handler.stop() + } + } + return nil +} + +// AddPeer adds a peer to the peering service. This function may be safely +// called at any time: before the service is started, while running, or after it +// stops. +// +// Add peer may also be called multiple times for the same peer. The new +// addresses will replace the old. +func (ps *PeeringService) AddPeer(info peer.AddrInfo) { + ps.mu.Lock() + defer ps.mu.Unlock() + + if handler, ok := ps.peers[info.ID]; ok { + logger.Infow("updating addresses", "peer", info.ID, "addrs", info.Addrs) + handler.addrs = info.Addrs + } else { + logger.Infow("peer added", "peer", info.ID, "addrs", info.Addrs) + ps.host.ConnManager().Protect(info.ID, connmgrTag) + + handler = &peerHandler{ + host: ps.host, + peer: info.ID, + addrs: info.Addrs, + nextDelay: initialDelay, + } + handler.ctx, handler.cancel = context.WithCancel(ps.ctx) + ps.peers[info.ID] = handler + if ps.state == stateRunning { + go handler.startIfDisconnected() + } + } +} + +// RemovePeer removes a peer from the peering service. This function may be +// safely called at any time: before the service is started, while running, or +// after it stops. +func (ps *PeeringService) RemovePeer(id peer.ID) { + ps.mu.Lock() + defer ps.mu.Unlock() + + if handler, ok := ps.peers[id]; ok { + logger.Infow("peer removed", "peer", id) + ps.host.ConnManager().Unprotect(id, connmgrTag) + + handler.stop() + handler.cancel() + delete(ps.peers, id) + } +} + +type netNotifee PeeringService + +func (nn *netNotifee) Connected(_ network.Network, c network.Conn) { + ps := (*PeeringService)(nn) + + p := c.RemotePeer() + ps.mu.RLock() + defer ps.mu.RUnlock() + + if handler, ok := ps.peers[p]; ok { + // use a goroutine to avoid blocking events. + go handler.stopIfConnected() + } +} +func (nn *netNotifee) Disconnected(_ network.Network, c network.Conn) { + ps := (*PeeringService)(nn) + + p := c.RemotePeer() + ps.mu.RLock() + defer ps.mu.RUnlock() + + if handler, ok := ps.peers[p]; ok { + // use a goroutine to avoid blocking events. + go handler.startIfDisconnected() + } +} +func (nn *netNotifee) OpenedStream(network.Network, network.Stream) {} +func (nn *netNotifee) ClosedStream(network.Network, network.Stream) {} +func (nn *netNotifee) Listen(network.Network, multiaddr.Multiaddr) {} +func (nn *netNotifee) ListenClose(network.Network, multiaddr.Multiaddr) {} diff --git a/peering/peering_test.go b/peering/peering_test.go new file mode 100644 index 000000000..0be08dcdc --- /dev/null +++ b/peering/peering_test.go @@ -0,0 +1,6 @@ +package peering + +import "testing" + +func TestPeeringService(t *testing.T) { +} From 05cbfba7c7ceb73b2625fa1a9075db171f1801bd Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Mon, 25 May 2020 18:53:55 -0700 Subject: [PATCH 02/21] fix: doc comment location Co-authored-by: Will This commit was moved from ipfs/kubo@8e52c7fb2d04482b108489e7cafd95ec6ebb0375 --- peering/peering.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/peering/peering.go b/peering/peering.go index c543712c5..ef0aa4d2e 100644 --- a/peering/peering.go +++ b/peering/peering.go @@ -14,8 +14,8 @@ import ( "github.com/multiformats/go-multiaddr" ) -// maxBackoff is the maximum time between reconnect attempts. const ( + // maxBackoff is the maximum time between reconnect attempts. maxBackoff = 10 * time.Minute connmgrTag = "ipfs-peering" // This needs to be sufficient to prevent two sides from simultaneously From a7e3b3fb029bb795cda8e6ded37dc43870ecee36 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Mon, 25 May 2020 19:11:13 -0700 Subject: [PATCH 03/21] fix: address peering service code feedback * better name for timer * cancel context from within stop This commit was moved from ipfs/kubo@0551c4dca843fe3a9665fdc49cd2dcecaeb85046 --- peering/peering.go | 33 +++++++++++++++++---------------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/peering/peering.go b/peering/peering.go index ef0aa4d2e..9785b6555 100644 --- a/peering/peering.go +++ b/peering/peering.go @@ -40,20 +40,22 @@ type peerHandler struct { ctx context.Context cancel context.CancelFunc - mu sync.Mutex - addrs []multiaddr.Multiaddr - timer *time.Timer + mu sync.Mutex + addrs []multiaddr.Multiaddr + reconnectTimer *time.Timer nextDelay time.Duration } +// stop permanently stops the peer handler. func (ph *peerHandler) stop() { + ph.cancel() + ph.mu.Lock() defer ph.mu.Unlock() - - if ph.timer != nil { - ph.timer.Stop() - ph.timer = nil + if ph.reconnectTimer != nil { + ph.reconnectTimer.Stop() + ph.reconnectTimer = nil } } @@ -79,10 +81,10 @@ func (ph *peerHandler) reconnect() { logger.Debugw("failed to reconnect", "peer", ph.peer, "error", err) // Ok, we failed. Extend the timeout. ph.mu.Lock() - if ph.timer != nil { - // Only counts if the timer still exists. If not, a + if ph.reconnectTimer != nil { + // Only counts if the reconnectTimer still exists. If not, a // connection _was_ somehow established. - ph.timer.Reset(ph.nextBackoff()) + ph.reconnectTimer.Reset(ph.nextBackoff()) } // Otherwise, someone else has stopped us so we can assume that // we're either connected or someone else will start us. @@ -98,10 +100,10 @@ func (ph *peerHandler) stopIfConnected() { ph.mu.Lock() defer ph.mu.Unlock() - if ph.timer != nil && ph.host.Network().Connectedness(ph.peer) == network.Connected { + if ph.reconnectTimer != nil && ph.host.Network().Connectedness(ph.peer) == network.Connected { logger.Debugw("successfully reconnected", "peer", ph.peer) - ph.timer.Stop() - ph.timer = nil + ph.reconnectTimer.Stop() + ph.reconnectTimer = nil ph.nextDelay = initialDelay } } @@ -111,10 +113,10 @@ func (ph *peerHandler) startIfDisconnected() { ph.mu.Lock() defer ph.mu.Unlock() - if ph.timer == nil && ph.host.Network().Connectedness(ph.peer) != network.Connected { + if ph.reconnectTimer == nil && ph.host.Network().Connectedness(ph.peer) != network.Connected { logger.Debugw("disconnected from peer", "peer", ph.peer) // Always start with a short timeout so we can stagger things a bit. - ph.timer = time.AfterFunc(ph.nextBackoff(), ph.reconnect) + ph.reconnectTimer = time.AfterFunc(ph.nextBackoff(), ph.reconnect) } } @@ -222,7 +224,6 @@ func (ps *PeeringService) RemovePeer(id peer.ID) { ps.host.ConnManager().Unprotect(id, connmgrTag) handler.stop() - handler.cancel() delete(ps.peers, id) } } From 2e1f0125922b51ae979a0c14d12a72461e51a5b4 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Mon, 25 May 2020 20:02:23 -0700 Subject: [PATCH 04/21] test: add unit test for peering service This commit was moved from ipfs/kubo@fe2b289d3002ed11a1dbcc0521bc42d33f0d46ed --- peering/peering_test.go | 135 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 134 insertions(+), 1 deletion(-) diff --git a/peering/peering_test.go b/peering/peering_test.go index 0be08dcdc..0d03aaf8e 100644 --- a/peering/peering_test.go +++ b/peering/peering_test.go @@ -1,6 +1,139 @@ package peering -import "testing" +import ( + "context" + "testing" + "time" + + "github.com/libp2p/go-libp2p" + connmgr "github.com/libp2p/go-libp2p-connmgr" + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + + "github.com/stretchr/testify/require" +) + +func newNode(ctx context.Context, t *testing.T) host.Host { + h, err := libp2p.New( + ctx, + libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0"), + // We'd like to set the connection manager low water to 0, but + // that would disable the connection manager. + libp2p.ConnectionManager(connmgr.NewConnManager(1, 100, 0)), + ) + require.NoError(t, err) + return h +} func TestPeeringService(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + h1 := newNode(ctx, t) + ps1 := NewPeeringService(h1) + + h2 := newNode(ctx, t) + h3 := newNode(ctx, t) + h4 := newNode(ctx, t) + + // peer 1 -> 2 + ps1.AddPeer(peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}) + + // We haven't started so we shouldn't have any peers. + require.Never(t, func() bool { + return len(h1.Network().Peers()) > 0 + }, 100*time.Millisecond, 1*time.Second, "expected host 1 to have no peers") + + // Use p4 to take up the one slot we have in the connection manager. + for _, h := range []host.Host{h1, h2} { + require.NoError(t, h.Connect(ctx, peer.AddrInfo{ID: h4.ID(), Addrs: h4.Addrs()})) + h.ConnManager().TagPeer(h4.ID(), "sticky-peer", 1000) + } + + // Now start. + require.NoError(t, ps1.Start()) + // starting twice is fine. + require.NoError(t, ps1.Start()) + + // We should eventually connect. + require.Eventually(t, func() bool { + return h1.Network().Connectedness(h2.ID()) == network.Connected + }, 30*time.Second, 10*time.Millisecond) + + // Now explicitly connect to p3. + require.NoError(t, h1.Connect(ctx, peer.AddrInfo{ID: h3.ID(), Addrs: h3.Addrs()})) + require.Eventually(t, func() bool { + return h1.Network().Connectedness(h2.ID()) == network.Connected + }, 30*time.Second, 100*time.Millisecond) + + require.Len(t, h1.Network().Peers(), 3) + + // force a disconnect + h1.ConnManager().TrimOpenConns(ctx) + + // Should disconnect from p3. + require.Eventually(t, func() bool { + return h1.Network().Connectedness(h3.ID()) != network.Connected + }, 5*time.Second, 10*time.Millisecond) + + // Should remain connected to p2 + require.Never(t, func() bool { + return h1.Network().Connectedness(h2.ID()) != network.Connected + }, 5*time.Second, 1*time.Second) + + // Now force h2 to disconnect (we have an asymmetric peering). + conns := h2.Network().ConnsToPeer(h1.ID()) + require.NotEmpty(t, conns) + h2.ConnManager().TrimOpenConns(ctx) + + // All conns to peer should eventually close. + for _, c := range conns { + require.Eventually(t, func() bool { + s, err := c.NewStream() + if s != nil { + _ = s.Reset() + } + return err != nil + }, 5*time.Second, 10*time.Millisecond) + } + + // Should eventually re-connect. + require.Eventually(t, func() bool { + return h1.Network().Connectedness(h2.ID()) == network.Connected + }, 30*time.Second, 1*time.Second) + + // Unprotect 2 from 1. + ps1.RemovePeer(h2.ID()) + + // Trim connections. + h1.ConnManager().TrimOpenConns(ctx) + + // Should disconnect + require.Eventually(t, func() bool { + return h1.Network().Connectedness(h2.ID()) != network.Connected + }, 5*time.Second, 10*time.Millisecond) + + // Should never reconnect. + require.Never(t, func() bool { + return h1.Network().Connectedness(h2.ID()) == network.Connected + }, 20*time.Second, 1*time.Second) + + // Until added back + ps1.AddPeer(peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}) + ps1.AddPeer(peer.AddrInfo{ID: h3.ID(), Addrs: h3.Addrs()}) + require.Eventually(t, func() bool { + return h1.Network().Connectedness(h2.ID()) == network.Connected + }, 30*time.Second, 1*time.Second) + require.Eventually(t, func() bool { + return h1.Network().Connectedness(h3.ID()) == network.Connected + }, 30*time.Second, 1*time.Second) + + // Should be able to repeatedly stop. + require.NoError(t, ps1.Stop()) + require.NoError(t, ps1.Stop()) + + // Adding and removing should work after stopping. + ps1.AddPeer(peer.AddrInfo{ID: h4.ID(), Addrs: h4.Addrs()}) + ps1.RemovePeer(h2.ID()) } From c1fb740c08b103cd602c751a776eb6a0db4acc34 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Mon, 25 May 2020 20:09:50 -0700 Subject: [PATCH 05/21] fix(peering): fix a race condition This commit was moved from ipfs/kubo@87a293f6801b176d9a947c3bf49c511e3dd98dce --- peering/peering.go | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/peering/peering.go b/peering/peering.go index 9785b6555..5f78a44f6 100644 --- a/peering/peering.go +++ b/peering/peering.go @@ -47,6 +47,24 @@ type peerHandler struct { nextDelay time.Duration } +// setAddrs sets the addresses for this peer. +func (ph *peerHandler) setAddrs(addrs []multiaddr.Multiaddr) { + // Not strictly necessary, but it helps to not trust the calling code. + addrCopy := make([]multiaddr.Multiaddr, len(addrs)) + copy(addrCopy, addrs) + + ph.mu.Lock() + defer ph.mu.Unlock() + ph.addrs = addrCopy +} + +// getAddrs returns a shared slice of addresses for this peer. Do not modify. +func (ph *peerHandler) getAddrs() []multiaddr.Multiaddr { + ph.mu.Lock() + defer ph.mu.Unlock() + return ph.addrs +} + // stop permanently stops the peer handler. func (ph *peerHandler) stop() { ph.cancel() @@ -69,11 +87,7 @@ func (ph *peerHandler) nextBackoff() time.Duration { func (ph *peerHandler) reconnect() { // Try connecting - - ph.mu.Lock() - addrs := append(([]multiaddr.Multiaddr)(nil), ph.addrs...) - ph.mu.Unlock() - + addrs := ph.getAddrs() logger.Debugw("reconnecting", "peer", ph.peer, "addrs", addrs) err := ph.host.Connect(ph.ctx, peer.AddrInfo{ID: ph.peer, Addrs: addrs}) @@ -193,7 +207,7 @@ func (ps *PeeringService) AddPeer(info peer.AddrInfo) { if handler, ok := ps.peers[info.ID]; ok { logger.Infow("updating addresses", "peer", info.ID, "addrs", info.Addrs) - handler.addrs = info.Addrs + handler.setAddrs(info.Addrs) } else { logger.Infow("peer added", "peer", info.ID, "addrs", info.Addrs) ps.host.ConnManager().Protect(info.ID, connmgrTag) From 2ef70926050bf3dad357e33b94c4a5515b29c9da Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Mon, 25 May 2020 20:20:13 -0700 Subject: [PATCH 06/21] fix: remove unecessary context This commit was moved from ipfs/kubo@17b3b02549ef96534b4d170674d4ccf174efb1d1 --- peering/peering.go | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/peering/peering.go b/peering/peering.go index 5f78a44f6..663f41017 100644 --- a/peering/peering.go +++ b/peering/peering.go @@ -141,18 +141,13 @@ type PeeringService struct { mu sync.RWMutex peers map[peer.ID]*peerHandler - - ctx context.Context - cancel context.CancelFunc - state state + state state } // NewPeeringService constructs a new peering service. Peers can be added and // removed immediately, but connections won't be formed until `Start` is called. func NewPeeringService(host host.Host) *PeeringService { - ps := &PeeringService{host: host, peers: make(map[peer.ID]*peerHandler)} - ps.ctx, ps.cancel = context.WithCancel(context.Background()) - return ps + return &PeeringService{host: host, peers: make(map[peer.ID]*peerHandler)} } // Start starts the peering service, connecting and maintaining connections to @@ -180,17 +175,18 @@ func (ps *PeeringService) Start() error { // Stop stops the peering service. func (ps *PeeringService) Stop() error { - ps.cancel() ps.host.Network().StopNotify((*netNotifee)(ps)) ps.mu.Lock() defer ps.mu.Unlock() - if ps.state == stateRunning { + switch ps.state { + case stateInit, stateRunning: logger.Infow("stopping") for _, handler := range ps.peers { handler.stop() } + ps.state = stateStopped } return nil } @@ -218,10 +214,16 @@ func (ps *PeeringService) AddPeer(info peer.AddrInfo) { addrs: info.Addrs, nextDelay: initialDelay, } - handler.ctx, handler.cancel = context.WithCancel(ps.ctx) + handler.ctx, handler.cancel = context.WithCancel(context.Background()) ps.peers[info.ID] = handler - if ps.state == stateRunning { + switch ps.state { + case stateRunning: go handler.startIfDisconnected() + case stateStopped: + // We still construct everything in this state because + // it's easier to reason about. But we should still free + // resources. + handler.cancel() } } } From 971983d0f081887933eeccbf057c5061d860d00d Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Mon, 25 May 2020 21:18:45 -0700 Subject: [PATCH 07/21] fix: really cap the max backoff at 10 minutes While preserving some randomness. And add a test. This commit was moved from ipfs/kubo@e10289a93d50e3cc80a5a3692b98391cc1aab62b --- peering/peering.go | 18 ++++++++++++++++-- peering/peering_test.go | 19 +++++++++++++++++++ 2 files changed, 35 insertions(+), 2 deletions(-) diff --git a/peering/peering.go b/peering/peering.go index 663f41017..ed0b43226 100644 --- a/peering/peering.go +++ b/peering/peering.go @@ -14,10 +14,17 @@ import ( "github.com/multiformats/go-multiaddr" ) +// Seed the random number generator. +// +// We don't need good randomness, but we do need randomness. const ( // maxBackoff is the maximum time between reconnect attempts. maxBackoff = 10 * time.Minute - connmgrTag = "ipfs-peering" + // The backoff will be cut off when we get within 10% of the actual max. + // If we go over the max, we'll adjust the delay down to a random value + // between 90-100% of the max backoff. + maxBackoffJitter = 10 // % + connmgrTag = "ipfs-peering" // This needs to be sufficient to prevent two sides from simultaneously // dialing. initialDelay = 5 * time.Second @@ -78,10 +85,17 @@ func (ph *peerHandler) stop() { } func (ph *peerHandler) nextBackoff() time.Duration { - // calculate the timeout if ph.nextDelay < maxBackoff { ph.nextDelay += ph.nextDelay/2 + time.Duration(rand.Int63n(int64(ph.nextDelay))) } + + // If we've gone over the max backoff, reduce it under the max. + if ph.nextDelay > maxBackoff { + ph.nextDelay = maxBackoff + // randomize the backoff a bit (10%). + ph.nextDelay -= time.Duration(rand.Int63n(int64(maxBackoff) * maxBackoffJitter / 100)) + } + return ph.nextDelay } diff --git a/peering/peering_test.go b/peering/peering_test.go index 0d03aaf8e..1f21b7816 100644 --- a/peering/peering_test.go +++ b/peering/peering_test.go @@ -137,3 +137,22 @@ func TestPeeringService(t *testing.T) { ps1.AddPeer(peer.AddrInfo{ID: h4.ID(), Addrs: h4.Addrs()}) ps1.RemovePeer(h2.ID()) } + +func TestNextBackoff(t *testing.T) { + minMaxBackoff := (100 - maxBackoffJitter) / 100 * maxBackoff + for x := 0; x < 1000; x++ { + ph := peerHandler{nextDelay: time.Second} + for min, max := time.Second*3/2, time.Second*5/2; min < minMaxBackoff; min, max = min*3/2, max*5/2 { + b := ph.nextBackoff() + if b > max || b < min { + t.Errorf("expected backoff %s to be between %s and %s", b, min, max) + } + } + for i := 0; i < 100; i++ { + b := ph.nextBackoff() + if b < minMaxBackoff || b > maxBackoff { + t.Fatal("failed to stay within max bounds") + } + } + } +} From d6255b5f2e8ba7423dc1fa73afdac58b16b48dbb Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Sat, 19 Dec 2020 16:16:00 +0700 Subject: [PATCH 08/21] update go-libp2p to v0.13.0 This commit was moved from ipfs/kubo@fcdf77c9b0fe33268ad6048d973143b98ddeb83a --- peering/peering_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/peering/peering_test.go b/peering/peering_test.go index 1f21b7816..3bfdd9d94 100644 --- a/peering/peering_test.go +++ b/peering/peering_test.go @@ -90,7 +90,7 @@ func TestPeeringService(t *testing.T) { // All conns to peer should eventually close. for _, c := range conns { require.Eventually(t, func() bool { - s, err := c.NewStream() + s, err := c.NewStream(context.Background()) if s != nil { _ = s.Reset() } From f778bf106ac91bb8782c68b4b22d59673325518c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Mart=C3=AD?= Date: Sat, 6 Feb 2021 17:58:18 +0000 Subject: [PATCH 09/21] peering: add logs before many-second waits This test takes a full minute to run, and I was honestly thinking my run of "go test -v" had simply hung, as I saw no output and no apparent resource usage. The least we can do is print a few log messages before the potentially long waits, to hint that we're still making progress. Each of these "Eventually" and "Never" calls ends up blocking the test for a few seconds at a time. This commit was moved from ipfs/kubo@5e0c8bbf28e830c8a81bc81eee3633c0a176c30d --- peering/peering_test.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/peering/peering_test.go b/peering/peering_test.go index 3bfdd9d94..a6ce1332b 100644 --- a/peering/peering_test.go +++ b/peering/peering_test.go @@ -57,11 +57,13 @@ func TestPeeringService(t *testing.T) { require.NoError(t, ps1.Start()) // We should eventually connect. + t.Logf("waiting for h1 to connect to h2") require.Eventually(t, func() bool { return h1.Network().Connectedness(h2.ID()) == network.Connected }, 30*time.Second, 10*time.Millisecond) - // Now explicitly connect to p3. + // Now explicitly connect to h3. + t.Logf("waiting for h1's connection to h3 to work") require.NoError(t, h1.Connect(ctx, peer.AddrInfo{ID: h3.ID(), Addrs: h3.Addrs()})) require.Eventually(t, func() bool { return h1.Network().Connectedness(h2.ID()) == network.Connected @@ -72,7 +74,8 @@ func TestPeeringService(t *testing.T) { // force a disconnect h1.ConnManager().TrimOpenConns(ctx) - // Should disconnect from p3. + // Should disconnect from h3. + t.Logf("waiting for h1's connection to h3 to disconnect") require.Eventually(t, func() bool { return h1.Network().Connectedness(h3.ID()) != network.Connected }, 5*time.Second, 10*time.Millisecond) @@ -88,6 +91,7 @@ func TestPeeringService(t *testing.T) { h2.ConnManager().TrimOpenConns(ctx) // All conns to peer should eventually close. + t.Logf("waiting for all connections to close") for _, c := range conns { require.Eventually(t, func() bool { s, err := c.NewStream(context.Background()) @@ -110,11 +114,13 @@ func TestPeeringService(t *testing.T) { h1.ConnManager().TrimOpenConns(ctx) // Should disconnect + t.Logf("waiting for h1 to disconnect from h2") require.Eventually(t, func() bool { return h1.Network().Connectedness(h2.ID()) != network.Connected }, 5*time.Second, 10*time.Millisecond) // Should never reconnect. + t.Logf("ensuring h1 is not connected to h2 again") require.Never(t, func() bool { return h1.Network().Connectedness(h2.ID()) == network.Connected }, 20*time.Second, 1*time.Second) @@ -122,6 +128,7 @@ func TestPeeringService(t *testing.T) { // Until added back ps1.AddPeer(peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}) ps1.AddPeer(peer.AddrInfo{ID: h3.ID(), Addrs: h3.Addrs()}) + t.Logf("wait for h1 to connect to h2 and h3 again") require.Eventually(t, func() bool { return h1.Network().Connectedness(h2.ID()) == network.Connected }, 30*time.Second, 1*time.Second) From 67c4cf0fd1d40dd5f1895acead670b67ee258417 Mon Sep 17 00:00:00 2001 From: Takashi Matsuda Date: Sat, 15 May 2021 11:34:54 +0900 Subject: [PATCH 10/21] fix: the test of peering.PeeringService This commit fixes the issue that the test of peering.PeeringService must check the connection of h3 but did not. This line seemed to be unintended. This commit was moved from ipfs/kubo@b3a6de8c6d3fa1649794d7ec9aa2e96adda875bd --- peering/peering_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/peering/peering_test.go b/peering/peering_test.go index a6ce1332b..7ec42efb6 100644 --- a/peering/peering_test.go +++ b/peering/peering_test.go @@ -66,7 +66,7 @@ func TestPeeringService(t *testing.T) { t.Logf("waiting for h1's connection to h3 to work") require.NoError(t, h1.Connect(ctx, peer.AddrInfo{ID: h3.ID(), Addrs: h3.Addrs()})) require.Eventually(t, func() bool { - return h1.Network().Connectedness(h2.ID()) == network.Connected + return h1.Network().Connectedness(h3.ID()) == network.Connected }, 30*time.Second, 100*time.Millisecond) require.Len(t, h1.Network().Peers(), 3) From 1f381a2b0bbee86af8d0d3c45379467bc59684ca Mon Sep 17 00:00:00 2001 From: Takashi Matsuda Date: Wed, 15 Sep 2021 23:46:16 +0900 Subject: [PATCH 11/21] feature: 'ipfs swarm peering' command (#8147) * feat: added swarm peering command supporting add, ls and rm Co-authored-by: Steven Allen This commit was moved from ipfs/kubo@a651045c502435b5dfea0d928dc243a7c02b6b04 --- peering/peering.go | 11 +++++++++++ peering/peering_test.go | 6 ++++++ 2 files changed, 17 insertions(+) diff --git a/peering/peering.go b/peering/peering.go index ed0b43226..3146769a0 100644 --- a/peering/peering.go +++ b/peering/peering.go @@ -242,6 +242,17 @@ func (ps *PeeringService) AddPeer(info peer.AddrInfo) { } } +// ListPeers lists peers in the peering service. +func (ps *PeeringService) ListPeers() []peer.AddrInfo { + out := make([]peer.AddrInfo, 0, len(ps.peers)) + for id, addrs := range ps.peers { + ai := peer.AddrInfo{ID: id} + ai.Addrs = append(ai.Addrs, addrs.addrs...) + out = append(out, ai) + } + return out +} + // RemovePeer removes a peer from the peering service. This function may be // safely called at any time: before the service is started, while running, or // after it stops. diff --git a/peering/peering_test.go b/peering/peering_test.go index 7ec42efb6..cf91e637c 100644 --- a/peering/peering_test.go +++ b/peering/peering_test.go @@ -39,6 +39,7 @@ func TestPeeringService(t *testing.T) { // peer 1 -> 2 ps1.AddPeer(peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}) + require.Contains(t, ps1.ListPeers(), peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}) // We haven't started so we shouldn't have any peers. require.Never(t, func() bool { @@ -109,6 +110,7 @@ func TestPeeringService(t *testing.T) { // Unprotect 2 from 1. ps1.RemovePeer(h2.ID()) + require.NotContains(t, ps1.ListPeers(), peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}) // Trim connections. h1.ConnManager().TrimOpenConns(ctx) @@ -127,7 +129,9 @@ func TestPeeringService(t *testing.T) { // Until added back ps1.AddPeer(peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}) + require.Contains(t, ps1.ListPeers(), peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}) ps1.AddPeer(peer.AddrInfo{ID: h3.ID(), Addrs: h3.Addrs()}) + require.Contains(t, ps1.ListPeers(), peer.AddrInfo{ID: h3.ID(), Addrs: h3.Addrs()}) t.Logf("wait for h1 to connect to h2 and h3 again") require.Eventually(t, func() bool { return h1.Network().Connectedness(h2.ID()) == network.Connected @@ -142,7 +146,9 @@ func TestPeeringService(t *testing.T) { // Adding and removing should work after stopping. ps1.AddPeer(peer.AddrInfo{ID: h4.ID(), Addrs: h4.Addrs()}) + require.Contains(t, ps1.ListPeers(), peer.AddrInfo{ID: h4.ID(), Addrs: h4.Addrs()}) ps1.RemovePeer(h2.ID()) + require.NotContains(t, ps1.ListPeers(), peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}) } func TestNextBackoff(t *testing.T) { From b36845795409beca11f134d79e9b938112ecd0b2 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Wed, 15 Sep 2021 18:59:08 +0200 Subject: [PATCH 12/21] fix: take the lock while listing peers This commit was moved from ipfs/kubo@92854db7aed4424fad117ceb4e13f64a80ff348b --- peering/peering.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/peering/peering.go b/peering/peering.go index 3146769a0..dbf955cde 100644 --- a/peering/peering.go +++ b/peering/peering.go @@ -244,6 +244,9 @@ func (ps *PeeringService) AddPeer(info peer.AddrInfo) { // ListPeers lists peers in the peering service. func (ps *PeeringService) ListPeers() []peer.AddrInfo { + ps.mu.RLock() + defer ps.mu.RUnlock() + out := make([]peer.AddrInfo, 0, len(ps.peers)) for id, addrs := range ps.peers { ai := peer.AddrInfo{ID: id} From 51b5ca2c9f8902d76072dc853cd788f63e98090a Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Mon, 29 Nov 2021 13:58:05 -0500 Subject: [PATCH 13/21] feat: go-libp2p 0.16, UnixFS autosharding and go-datastore with contexts (#8563) * plumb through go-datastore context changes * update go-libp2p to v0.16.0 * use LIBP2P_TCP_REUSEPORT instead of IPFS_REUSEPORT * use relay config * making deprecation notice match the go-ipfs-config key * docs(config): circuit relay v2 * docs(config): fix links and headers * feat(config): Internal.Libp2pForceReachability This switches to config that supports setting and reading Internal.Libp2pForceReachability OptionalString flag * use configuration option for static relays * chore: go-ipfs-config v0.18.0 https://github.com/ipfs/go-ipfs-config/releases/tag/v0.18.0 * feat: circuit v1 migration prompt when Swarm.EnableRelayHop is set (#8559) * exit when Swarm.EnableRelayHop is set * docs: Experimental.ShardingEnabled migration This ensures existing users of global sharding experiment get notified that the flag no longer works + that autosharding happens automatically. For people who NEED to keep the old behavior (eg. have no time to migrate today) there is a note about restoring it with `UnixFSShardingSizeThreshold`. * chore: add dag-jose code to the cid command output * add support for setting automatic unixfs sharding threshold from the config * test: have tests use low cutoff for sharding to mimic old behavior * test: change error message to match the current error * test: Add automatic sharding/unsharding tests (#8547) * test: refactored naming in the sharding sharness tests to make more sense * ci: set interop test executor to convenience image for Go1.16 + Node * ci: use interop master Co-authored-by: Marcin Rataj Co-authored-by: Marten Seemann Co-authored-by: Marcin Rataj Co-authored-by: Gus Eggert Co-authored-by: Lucas Molas This commit was moved from ipfs/kubo@52c177ced94a1dca6f2a440ba9f25a184ff75ddb --- peering/peering_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/peering/peering_test.go b/peering/peering_test.go index cf91e637c..27c9b7175 100644 --- a/peering/peering_test.go +++ b/peering/peering_test.go @@ -16,7 +16,6 @@ import ( func newNode(ctx context.Context, t *testing.T) host.Host { h, err := libp2p.New( - ctx, libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0"), // We'd like to set the connection manager low water to 0, but // that would disable the connection manager. From f264de00b51eff162d506fa16a600f6cbd24a13a Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Fri, 8 Apr 2022 02:06:35 +0100 Subject: [PATCH 14/21] feat: opt-in Swarm.ResourceMgr (go-libp2p v0.18) (#8680) * update go-libp2p to v0.18.0 * initialize the resource manager * add resource manager stats/limit commands * load limit file when building resource manager * log absent limit file * write rcmgr to file when IPFS_DEBUG_RCMGR is set * fix: mark swarm limit|stats as experimental * feat(cfg): opt-in Swarm.ResourceMgr This ensures we can safely test the resource manager without impacting default behavior. - Resource manager is disabled by default - Default for Swarm.ResourceMgr.Enabled is false for now - Swarm.ResourceMgr.Limits allows user to tweak limits per specific scope in a way that is persisted across restarts - 'ipfs swarm limit system' outputs human-readable json - 'ipfs swarm limit system new-limits.json' sets new runtime limits (but does not change Swarm.ResourceMgr.Limits in the config) Conventions to make libp2p devs life easier: - 'IPFS_RCMGR=1 ipfs daemon' overrides the config and enables resource manager - 'limit.json' overrides implicit defaults from libp2p (if present) * docs(config): small tweaks * fix: skip libp2p.ResourceManager if disabled This ensures 'ipfs swarm limit|stats' work only when enabled. * fix: use NullResourceManager when disabled This reverts commit b19f7c9eca4cee4187f8cba3389dc2c930258512. after clarification feedback from https://github.com/ipfs/go-ipfs/pull/8680#discussion_r841680182 * style: rename IPFS_RCMGR to LIBP2P_RCMGR preexisting libp2p toggles use LIBP2P_ prefix * test: Swarm.ResourceMgr * fix: location of opt-in limit.json and rcmgr.json.gz Places these files inside of IPFS_PATH * Update docs/config.md * feat: expose rcmgr metrics when enabled (#8785) * add metrics for the resource manager * export protocol and service name in Prometheus metrics * fix: expose rcmgr metrics only when enabled Co-authored-by: Marcin Rataj * refactor: rcmgr_metrics.go * refactor: rcmgr_defaults.go This file defines implicit limit defaults used when Swarm.ResourceMgr.Enabled We keep vendored copy to ensure go-ipfs is not impacted when go-libp2p decides to change defaults in any of the future releases. * refactor: adjustedDefaultLimits Cleans up the way we initialize defaults and adds a fix for case when connection manager runs with high limits. It also hides `Swarm.ResourceMgr.Limits` until we have a better understanding what syntax makes sense. * chore: cleanup after a review * fix: restore go-ipld-prime v0.14.2 * fix: restore go-ds-flatfs v0.5.1 Co-authored-by: Lucas Molas Co-authored-by: Marcin Rataj This commit was moved from ipfs/kubo@514411bedbd7d22d277d31f13d8ad312224c3ad4 --- peering/peering_test.go | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/peering/peering_test.go b/peering/peering_test.go index 27c9b7175..09a54f2ce 100644 --- a/peering/peering_test.go +++ b/peering/peering_test.go @@ -6,20 +6,22 @@ import ( "time" "github.com/libp2p/go-libp2p" - connmgr "github.com/libp2p/go-libp2p-connmgr" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p/p2p/net/connmgr" "github.com/stretchr/testify/require" ) -func newNode(ctx context.Context, t *testing.T) host.Host { +func newNode(t *testing.T) host.Host { + cm, err := connmgr.NewConnManager(1, 100, connmgr.WithGracePeriod(0)) + require.NoError(t, err) h, err := libp2p.New( libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0"), // We'd like to set the connection manager low water to 0, but // that would disable the connection manager. - libp2p.ConnectionManager(connmgr.NewConnManager(1, 100, 0)), + libp2p.ConnectionManager(cm), ) require.NoError(t, err) return h @@ -29,12 +31,12 @@ func TestPeeringService(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - h1 := newNode(ctx, t) + h1 := newNode(t) ps1 := NewPeeringService(h1) - h2 := newNode(ctx, t) - h3 := newNode(ctx, t) - h4 := newNode(ctx, t) + h2 := newNode(t) + h3 := newNode(t) + h4 := newNode(t) // peer 1 -> 2 ps1.AddPeer(peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}) From 9f54e443cc310aa19fa03561c3d5b760c25be4a2 Mon Sep 17 00:00:00 2001 From: Alvin Reyes Date: Sat, 11 Jun 2022 03:34:10 -0400 Subject: [PATCH 15/21] feat: add a public function on peering to get the state PR #9030 This commit was moved from ipfs/kubo@04651e395d6e7923f592bb2c04db872e00844ad6 --- peering/peering.go | 48 ++++++++++++++++++++++++++++++++-------------- 1 file changed, 34 insertions(+), 14 deletions(-) diff --git a/peering/peering.go b/peering/peering.go index dbf955cde..53b79bf75 100644 --- a/peering/peering.go +++ b/peering/peering.go @@ -4,6 +4,7 @@ import ( "context" "errors" "math/rand" + "strconv" "sync" "time" @@ -32,12 +33,25 @@ const ( var logger = log.Logger("peering") -type state int +type State uint + +func (s State) String() string { + switch s { + case StateInit: + return "init" + case StateRunning: + return "running" + case StateStopped: + return "stopped" + default: + return "unkown peering state: " + strconv.FormatUint(uint64(s), 10) + } +} const ( - stateInit state = iota - stateRunning - stateStopped + StateInit State = iota + StateRunning + StateStopped ) // peerHandler keeps track of all state related to a specific "peering" peer. @@ -155,7 +169,7 @@ type PeeringService struct { mu sync.RWMutex peers map[peer.ID]*peerHandler - state state + state State } // NewPeeringService constructs a new peering service. Peers can be added and @@ -172,35 +186,41 @@ func (ps *PeeringService) Start() error { defer ps.mu.Unlock() switch ps.state { - case stateInit: + case StateInit: logger.Infow("starting") - case stateRunning: + case StateRunning: return nil - case stateStopped: + case StateStopped: return errors.New("already stopped") } ps.host.Network().Notify((*netNotifee)(ps)) - ps.state = stateRunning + ps.state = StateRunning for _, handler := range ps.peers { go handler.startIfDisconnected() } return nil } +// GetState get the State of the PeeringService +func (ps *PeeringService) GetState() State { + ps.mu.RLock() + defer ps.mu.RUnlock() + return ps.state +} + // Stop stops the peering service. func (ps *PeeringService) Stop() error { ps.host.Network().StopNotify((*netNotifee)(ps)) - ps.mu.Lock() defer ps.mu.Unlock() switch ps.state { - case stateInit, stateRunning: + case StateInit, StateRunning: logger.Infow("stopping") for _, handler := range ps.peers { handler.stop() } - ps.state = stateStopped + ps.state = StateStopped } return nil } @@ -231,9 +251,9 @@ func (ps *PeeringService) AddPeer(info peer.AddrInfo) { handler.ctx, handler.cancel = context.WithCancel(context.Background()) ps.peers[info.ID] = handler switch ps.state { - case stateRunning: + case StateRunning: go handler.startIfDisconnected() - case stateStopped: + case StateStopped: // We still construct everything in this state because // it's easier to reason about. But we should still free // resources. From d7e9091fe3ab90abe6a5009bd21e6181fb62964f Mon Sep 17 00:00:00 2001 From: Jorropo Date: Mon, 29 Aug 2022 13:55:00 +0200 Subject: [PATCH 16/21] chore: bump go-libp2p v0.22.0 & go1.18&go1.19 Fixes: #9225 This commit was moved from ipfs/kubo@196887cbe5fbcd41243c1dfb0db681a1cc2914ff --- peering/peering.go | 6 +++--- peering/peering_test.go | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/peering/peering.go b/peering/peering.go index 53b79bf75..00801626f 100644 --- a/peering/peering.go +++ b/peering/peering.go @@ -9,9 +9,9 @@ import ( "time" "github.com/ipfs/go-log" - "github.com/libp2p/go-libp2p-core/host" - "github.com/libp2p/go-libp2p-core/network" - "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multiaddr" ) diff --git a/peering/peering_test.go b/peering/peering_test.go index 09a54f2ce..de07789c2 100644 --- a/peering/peering_test.go +++ b/peering/peering_test.go @@ -6,9 +6,9 @@ import ( "time" "github.com/libp2p/go-libp2p" - "github.com/libp2p/go-libp2p-core/host" - "github.com/libp2p/go-libp2p-core/network" - "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/p2p/net/connmgr" "github.com/stretchr/testify/require" From 396c1667f203b3baa459be7f6ed733dd525f513b Mon Sep 17 00:00:00 2001 From: Kay Date: Thu, 17 Aug 2023 15:32:08 +0330 Subject: [PATCH 17/21] style: gofumpt and godot [skip changelog] (#10081) This commit was moved from ipfs/kubo@f12b372af9cc32975ff48397708fac3ec1f9966f --- peering/peering.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/peering/peering.go b/peering/peering.go index 00801626f..291d9491c 100644 --- a/peering/peering.go +++ b/peering/peering.go @@ -201,7 +201,7 @@ func (ps *PeeringService) Start() error { return nil } -// GetState get the State of the PeeringService +// GetState get the State of the PeeringService. func (ps *PeeringService) GetState() State { ps.mu.RLock() defer ps.mu.RUnlock() @@ -306,6 +306,7 @@ func (nn *netNotifee) Connected(_ network.Network, c network.Conn) { go handler.stopIfConnected() } } + func (nn *netNotifee) Disconnected(_ network.Network, c network.Conn) { ps := (*PeeringService)(nn) From dae313625705349710373dbc66bdc8290c90792c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alejandro=20Criado-P=C3=A9rez?= Date: Fri, 22 Sep 2023 13:08:26 +0200 Subject: [PATCH 18/21] docs: fix typos This commit was moved from ipfs/kubo@cc3c224c6278c4c62b8d732fefc54cf7246308d2 --- peering/peering.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/peering/peering.go b/peering/peering.go index 291d9491c..34647d63c 100644 --- a/peering/peering.go +++ b/peering/peering.go @@ -44,7 +44,7 @@ func (s State) String() string { case StateStopped: return "stopped" default: - return "unkown peering state: " + strconv.FormatUint(uint64(s), 10) + return "unknown peering state: " + strconv.FormatUint(uint64(s), 10) } } From 41637c6a01e5f5118e57a07832871276d50cb9ff Mon Sep 17 00:00:00 2001 From: gammazero Date: Thu, 28 Sep 2023 13:46:40 -0700 Subject: [PATCH 19/21] peering: migrate from Kubo --- CHANGELOG.md | 2 ++ go.mod | 2 +- peering/peering.go | 3 +-- peering/peering_test.go | 4 ++-- 4 files changed, 6 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 71971240a..8494bdfde 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,8 @@ The following emojis are used to highlight certain changes: * The gateway now sets a `Cache-Control` header for requests under the `/ipns/` namespace if the TTL for the corresponding IPNS Records or DNSLink entities is known. * `boxo/bitswap/client`: * A new `WithoutDuplicatedBlockStats()` option can be used with `bitswap.New` and `bsclient.New`. This disable accounting for duplicated blocks, which requires a `blockstore.Has()` lookup for every received block and thus, can impact performance. +* ✨ Migrated repositories into Boxo + * [`github.com/ipfs/kubo/peering`](https://pkg.go.dev/github.com/ipfs/kubo/peering) => [`./peering`](./peering) ### Changed diff --git a/go.mod b/go.mod index 2f4d53569..cb0b34af4 100644 --- a/go.mod +++ b/go.mod @@ -27,6 +27,7 @@ require ( github.com/ipfs/go-ipld-cbor v0.0.6 github.com/ipfs/go-ipld-format v0.5.0 github.com/ipfs/go-ipld-legacy v0.2.1 + github.com/ipfs/go-log v1.0.5 github.com/ipfs/go-log/v2 v2.5.1 github.com/ipfs/go-metrics-interface v0.0.1 github.com/ipfs/go-peertaskqueue v0.8.1 @@ -107,7 +108,6 @@ require ( github.com/huin/goupnp v1.2.0 // indirect github.com/ipfs/go-ipfs-pq v0.0.3 // indirect github.com/ipfs/go-ipfs-util v0.0.2 // indirect - github.com/ipfs/go-log v1.0.5 // indirect github.com/ipfs/go-unixfs v0.4.5 // indirect github.com/jackpal/go-nat-pmp v1.0.2 // indirect github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect diff --git a/peering/peering.go b/peering/peering.go index 34647d63c..cc549c369 100644 --- a/peering/peering.go +++ b/peering/peering.go @@ -209,7 +209,7 @@ func (ps *PeeringService) GetState() State { } // Stop stops the peering service. -func (ps *PeeringService) Stop() error { +func (ps *PeeringService) Stop() { ps.host.Network().StopNotify((*netNotifee)(ps)) ps.mu.Lock() defer ps.mu.Unlock() @@ -222,7 +222,6 @@ func (ps *PeeringService) Stop() error { } ps.state = StateStopped } - return nil } // AddPeer adds a peer to the peering service. This function may be safely diff --git a/peering/peering_test.go b/peering/peering_test.go index de07789c2..3d146e3e3 100644 --- a/peering/peering_test.go +++ b/peering/peering_test.go @@ -142,8 +142,8 @@ func TestPeeringService(t *testing.T) { }, 30*time.Second, 1*time.Second) // Should be able to repeatedly stop. - require.NoError(t, ps1.Stop()) - require.NoError(t, ps1.Stop()) + ps1.Stop() + ps1.Stop() // Adding and removing should work after stopping. ps1.AddPeer(peer.AddrInfo{ID: h4.ID(), Addrs: h4.Addrs()}) From 7926f7d95c540130e23aca6d0bf275357899a82a Mon Sep 17 00:00:00 2001 From: Jorropo Date: Tue, 24 Oct 2023 19:22:11 +0200 Subject: [PATCH 20/21] docs: improve changelog for peering --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8494bdfde..c17a9c46e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ The following emojis are used to highlight certain changes: * A new `WithoutDuplicatedBlockStats()` option can be used with `bitswap.New` and `bsclient.New`. This disable accounting for duplicated blocks, which requires a `blockstore.Has()` lookup for every received block and thus, can impact performance. * ✨ Migrated repositories into Boxo * [`github.com/ipfs/kubo/peering`](https://pkg.go.dev/github.com/ipfs/kubo/peering) => [`./peering`](./peering) + A service which establish, overwatch and maintain long lived connections. ### Changed From b5cf3fedec15c0b52eaa83f08997c1ec90594011 Mon Sep 17 00:00:00 2001 From: Jorropo Date: Tue, 24 Oct 2023 19:24:23 +0200 Subject: [PATCH 21/21] chore: remove outdated comment in peering --- peering/peering.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/peering/peering.go b/peering/peering.go index cc549c369..d1c54ead3 100644 --- a/peering/peering.go +++ b/peering/peering.go @@ -15,9 +15,6 @@ import ( "github.com/multiformats/go-multiaddr" ) -// Seed the random number generator. -// -// We don't need good randomness, but we do need randomness. const ( // maxBackoff is the maximum time between reconnect attempts. maxBackoff = 10 * time.Minute