diff --git a/CHANGELOG.md b/CHANGELOG.md
index 71971240a..c17a9c46e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -21,6 +21,9 @@ The following emojis are used to highlight certain changes:
 * The gateway now sets a `Cache-Control` header for requests under the `/ipns/` namespace if the TTL for the corresponding IPNS Records or DNSLink entities is known.
 * `boxo/bitswap/client`:
   * A new `WithoutDuplicatedBlockStats()` option can be used with `bitswap.New` and `bsclient.New`. This disable accounting for duplicated blocks, which requires a `blockstore.Has()` lookup for every received block and thus, can impact performance.
+* ✨ Migrated repositories into Boxo
+  * [`github.com/ipfs/kubo/peering`](https://pkg.go.dev/github.com/ipfs/kubo/peering) => [`./peering`](./peering)
+    A service that establishes, monitors, and maintains long-lived connections with configured peers.
 
 ### Changed
 
diff --git a/go.mod b/go.mod
index 2f4d53569..cb0b34af4 100644
--- a/go.mod
+++ b/go.mod
@@ -27,6 +27,7 @@ require (
 	github.com/ipfs/go-ipld-cbor v0.0.6
 	github.com/ipfs/go-ipld-format v0.5.0
 	github.com/ipfs/go-ipld-legacy v0.2.1
+	github.com/ipfs/go-log v1.0.5
 	github.com/ipfs/go-log/v2 v2.5.1
 	github.com/ipfs/go-metrics-interface v0.0.1
 	github.com/ipfs/go-peertaskqueue v0.8.1
@@ -107,7 +108,6 @@ require (
 	github.com/huin/goupnp v1.2.0 // indirect
 	github.com/ipfs/go-ipfs-pq v0.0.3 // indirect
 	github.com/ipfs/go-ipfs-util v0.0.2 // indirect
-	github.com/ipfs/go-log v1.0.5 // indirect
 	github.com/ipfs/go-unixfs v0.4.5 // indirect
 	github.com/jackpal/go-nat-pmp v1.0.2 // indirect
 	github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
diff --git a/peering/peering.go b/peering/peering.go
new file mode 100644
index 000000000..d1c54ead3
--- /dev/null
+++ b/peering/peering.go
@@ -0,0 +1,321 @@
+package peering
+
+import (
+	"context"
+	"errors"
+	"math/rand"
+	"strconv"
+	"sync"
+	"time"
+
+	"github.com/ipfs/go-log"
+	"github.com/libp2p/go-libp2p/core/host"
+	"github.com/libp2p/go-libp2p/core/network"
+	"github.com/libp2p/go-libp2p/core/peer"
+	"github.com/multiformats/go-multiaddr"
+)
+
+const (
+	// maxBackoff is the maximum time between reconnect attempts.
+	maxBackoff = 10 * time.Minute
+	// The backoff will be cut off when we get within 10% of the actual max.
+	// If we go over the max, we'll adjust the delay down to a random value
+	// between 90-100% of the max backoff.
+	maxBackoffJitter = 10 // %
+	connmgrTag       = "ipfs-peering"
+	// This needs to be sufficient to prevent two sides from simultaneously
+	// dialing.
+	initialDelay = 5 * time.Second
+)
+
+var logger = log.Logger("peering")
+
+type State uint
+
+func (s State) String() string {
+	switch s {
+	case StateInit:
+		return "init"
+	case StateRunning:
+		return "running"
+	case StateStopped:
+		return "stopped"
+	default:
+		return "unknown peering state: " + strconv.FormatUint(uint64(s), 10)
+	}
+}
+
+const (
+	StateInit State = iota
+	StateRunning
+	StateStopped
+)
+
+// peerHandler keeps track of all state related to a specific "peering" peer.
+type peerHandler struct {
+	peer   peer.ID
+	host   host.Host
+	ctx    context.Context
+	cancel context.CancelFunc
+
+	mu             sync.Mutex
+	addrs          []multiaddr.Multiaddr
+	reconnectTimer *time.Timer
+
+	nextDelay time.Duration
+}
+
+// setAddrs sets the addresses for this peer.
+func (ph *peerHandler) setAddrs(addrs []multiaddr.Multiaddr) {
+	// Not strictly necessary, but it helps to not trust the calling code.
+ addrCopy := make([]multiaddr.Multiaddr, len(addrs)) + copy(addrCopy, addrs) + + ph.mu.Lock() + defer ph.mu.Unlock() + ph.addrs = addrCopy +} + +// getAddrs returns a shared slice of addresses for this peer. Do not modify. +func (ph *peerHandler) getAddrs() []multiaddr.Multiaddr { + ph.mu.Lock() + defer ph.mu.Unlock() + return ph.addrs +} + +// stop permanently stops the peer handler. +func (ph *peerHandler) stop() { + ph.cancel() + + ph.mu.Lock() + defer ph.mu.Unlock() + if ph.reconnectTimer != nil { + ph.reconnectTimer.Stop() + ph.reconnectTimer = nil + } +} + +func (ph *peerHandler) nextBackoff() time.Duration { + if ph.nextDelay < maxBackoff { + ph.nextDelay += ph.nextDelay/2 + time.Duration(rand.Int63n(int64(ph.nextDelay))) + } + + // If we've gone over the max backoff, reduce it under the max. + if ph.nextDelay > maxBackoff { + ph.nextDelay = maxBackoff + // randomize the backoff a bit (10%). + ph.nextDelay -= time.Duration(rand.Int63n(int64(maxBackoff) * maxBackoffJitter / 100)) + } + + return ph.nextDelay +} + +func (ph *peerHandler) reconnect() { + // Try connecting + addrs := ph.getAddrs() + logger.Debugw("reconnecting", "peer", ph.peer, "addrs", addrs) + + err := ph.host.Connect(ph.ctx, peer.AddrInfo{ID: ph.peer, Addrs: addrs}) + if err != nil { + logger.Debugw("failed to reconnect", "peer", ph.peer, "error", err) + // Ok, we failed. Extend the timeout. + ph.mu.Lock() + if ph.reconnectTimer != nil { + // Only counts if the reconnectTimer still exists. If not, a + // connection _was_ somehow established. + ph.reconnectTimer.Reset(ph.nextBackoff()) + } + // Otherwise, someone else has stopped us so we can assume that + // we're either connected or someone else will start us. + ph.mu.Unlock() + } + + // Always call this. We could have connected since we processed the + // error. + ph.stopIfConnected() +} + +func (ph *peerHandler) stopIfConnected() { + ph.mu.Lock() + defer ph.mu.Unlock() + + if ph.reconnectTimer != nil && ph.host.Network().Connectedness(ph.peer) == network.Connected { + logger.Debugw("successfully reconnected", "peer", ph.peer) + ph.reconnectTimer.Stop() + ph.reconnectTimer = nil + ph.nextDelay = initialDelay + } +} + +// startIfDisconnected is the inverse of stopIfConnected. +func (ph *peerHandler) startIfDisconnected() { + ph.mu.Lock() + defer ph.mu.Unlock() + + if ph.reconnectTimer == nil && ph.host.Network().Connectedness(ph.peer) != network.Connected { + logger.Debugw("disconnected from peer", "peer", ph.peer) + // Always start with a short timeout so we can stagger things a bit. + ph.reconnectTimer = time.AfterFunc(ph.nextBackoff(), ph.reconnect) + } +} + +// PeeringService maintains connections to specified peers, reconnecting on +// disconnect with a back-off. +type PeeringService struct { + host host.Host + + mu sync.RWMutex + peers map[peer.ID]*peerHandler + state State +} + +// NewPeeringService constructs a new peering service. Peers can be added and +// removed immediately, but connections won't be formed until `Start` is called. +func NewPeeringService(host host.Host) *PeeringService { + return &PeeringService{host: host, peers: make(map[peer.ID]*peerHandler)} +} + +// Start starts the peering service, connecting and maintaining connections to +// all registered peers. It returns an error if the service has already been +// stopped. 
+func (ps *PeeringService) Start() error {
+	ps.mu.Lock()
+	defer ps.mu.Unlock()
+
+	switch ps.state {
+	case StateInit:
+		logger.Infow("starting")
+	case StateRunning:
+		return nil
+	case StateStopped:
+		return errors.New("already stopped")
+	}
+	ps.host.Network().Notify((*netNotifee)(ps))
+	ps.state = StateRunning
+	for _, handler := range ps.peers {
+		go handler.startIfDisconnected()
+	}
+	return nil
+}
+
+// GetState returns the State of the PeeringService.
+func (ps *PeeringService) GetState() State {
+	ps.mu.RLock()
+	defer ps.mu.RUnlock()
+	return ps.state
+}
+
+// Stop stops the peering service.
+func (ps *PeeringService) Stop() {
+	ps.host.Network().StopNotify((*netNotifee)(ps))
+	ps.mu.Lock()
+	defer ps.mu.Unlock()
+
+	switch ps.state {
+	case StateInit, StateRunning:
+		logger.Infow("stopping")
+		for _, handler := range ps.peers {
+			handler.stop()
+		}
+		ps.state = StateStopped
+	}
+}
+
+// AddPeer adds a peer to the peering service. This function may be safely
+// called at any time: before the service is started, while running, or after it
+// stops.
+//
+// AddPeer may also be called multiple times for the same peer; the new
+// addresses will replace the old ones.
+func (ps *PeeringService) AddPeer(info peer.AddrInfo) {
+	ps.mu.Lock()
+	defer ps.mu.Unlock()
+
+	if handler, ok := ps.peers[info.ID]; ok {
+		logger.Infow("updating addresses", "peer", info.ID, "addrs", info.Addrs)
+		handler.setAddrs(info.Addrs)
+	} else {
+		logger.Infow("peer added", "peer", info.ID, "addrs", info.Addrs)
+		ps.host.ConnManager().Protect(info.ID, connmgrTag)
+
+		handler = &peerHandler{
+			host:      ps.host,
+			peer:      info.ID,
+			addrs:     info.Addrs,
+			nextDelay: initialDelay,
+		}
+		handler.ctx, handler.cancel = context.WithCancel(context.Background())
+		ps.peers[info.ID] = handler
+		switch ps.state {
+		case StateRunning:
+			go handler.startIfDisconnected()
+		case StateStopped:
+			// We still construct everything in this state because
+			// it's easier to reason about. But we should still free
+			// resources.
+			handler.cancel()
+		}
+	}
+}
+
+// ListPeers lists peers in the peering service.
+func (ps *PeeringService) ListPeers() []peer.AddrInfo {
+	ps.mu.RLock()
+	defer ps.mu.RUnlock()
+
+	out := make([]peer.AddrInfo, 0, len(ps.peers))
+	for id, addrs := range ps.peers {
+		ai := peer.AddrInfo{ID: id}
+		ai.Addrs = append(ai.Addrs, addrs.addrs...)
+		out = append(out, ai)
+	}
+	return out
+}
+
+// RemovePeer removes a peer from the peering service. This function may be
+// safely called at any time: before the service is started, while running, or
+// after it stops.
+func (ps *PeeringService) RemovePeer(id peer.ID) {
+	ps.mu.Lock()
+	defer ps.mu.Unlock()
+
+	if handler, ok := ps.peers[id]; ok {
+		logger.Infow("peer removed", "peer", id)
+		ps.host.ConnManager().Unprotect(id, connmgrTag)
+
+		handler.stop()
+		delete(ps.peers, id)
+	}
+}
+
+type netNotifee PeeringService
+
+func (nn *netNotifee) Connected(_ network.Network, c network.Conn) {
+	ps := (*PeeringService)(nn)
+
+	p := c.RemotePeer()
+	ps.mu.RLock()
+	defer ps.mu.RUnlock()
+
+	if handler, ok := ps.peers[p]; ok {
+		// use a goroutine to avoid blocking events.
+		go handler.stopIfConnected()
+	}
+}
+
+func (nn *netNotifee) Disconnected(_ network.Network, c network.Conn) {
+	ps := (*PeeringService)(nn)
+
+	p := c.RemotePeer()
+	ps.mu.RLock()
+	defer ps.mu.RUnlock()
+
+	if handler, ok := ps.peers[p]; ok {
+		// use a goroutine to avoid blocking events.
+		go handler.startIfDisconnected()
+	}
+}
+func (nn *netNotifee) OpenedStream(network.Network, network.Stream)     {}
+func (nn *netNotifee) ClosedStream(network.Network, network.Stream)     {}
+func (nn *netNotifee) Listen(network.Network, multiaddr.Multiaddr)      {}
+func (nn *netNotifee) ListenClose(network.Network, multiaddr.Multiaddr) {}
diff --git a/peering/peering_test.go b/peering/peering_test.go
new file mode 100644
index 000000000..3d146e3e3
--- /dev/null
+++ b/peering/peering_test.go
@@ -0,0 +1,172 @@
+package peering
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/libp2p/go-libp2p"
+	"github.com/libp2p/go-libp2p/core/host"
+	"github.com/libp2p/go-libp2p/core/network"
+	"github.com/libp2p/go-libp2p/core/peer"
+	"github.com/libp2p/go-libp2p/p2p/net/connmgr"
+
+	"github.com/stretchr/testify/require"
+)
+
+func newNode(t *testing.T) host.Host {
+	cm, err := connmgr.NewConnManager(1, 100, connmgr.WithGracePeriod(0))
+	require.NoError(t, err)
+	h, err := libp2p.New(
+		libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0"),
+		// We'd like to set the connection manager low water to 0, but
+		// that would disable the connection manager.
+		libp2p.ConnectionManager(cm),
+	)
+	require.NoError(t, err)
+	return h
+}
+
+func TestPeeringService(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	h1 := newNode(t)
+	ps1 := NewPeeringService(h1)
+
+	h2 := newNode(t)
+	h3 := newNode(t)
+	h4 := newNode(t)
+
+	// peer 1 -> 2
+	ps1.AddPeer(peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()})
+	require.Contains(t, ps1.ListPeers(), peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()})
+
+	// We haven't started so we shouldn't have any peers.
+	require.Never(t, func() bool {
+		return len(h1.Network().Peers()) > 0
+	}, 1*time.Second, 100*time.Millisecond, "expected host 1 to have no peers")
+
+	// Use h4 to take up the one slot we have in the connection manager.
+	for _, h := range []host.Host{h1, h2} {
+		require.NoError(t, h.Connect(ctx, peer.AddrInfo{ID: h4.ID(), Addrs: h4.Addrs()}))
+		h.ConnManager().TagPeer(h4.ID(), "sticky-peer", 1000)
+	}
+
+	// Now start.
+	require.NoError(t, ps1.Start())
+	// starting twice is fine.
+	require.NoError(t, ps1.Start())
+
+	// We should eventually connect.
+	t.Logf("waiting for h1 to connect to h2")
+	require.Eventually(t, func() bool {
+		return h1.Network().Connectedness(h2.ID()) == network.Connected
+	}, 30*time.Second, 10*time.Millisecond)
+
+	// Now explicitly connect to h3.
+	t.Logf("waiting for h1's connection to h3 to work")
+	require.NoError(t, h1.Connect(ctx, peer.AddrInfo{ID: h3.ID(), Addrs: h3.Addrs()}))
+	require.Eventually(t, func() bool {
+		return h1.Network().Connectedness(h3.ID()) == network.Connected
+	}, 30*time.Second, 100*time.Millisecond)
+
+	require.Len(t, h1.Network().Peers(), 3)
+
+	// force a disconnect
+	h1.ConnManager().TrimOpenConns(ctx)
+
+	// Should disconnect from h3.
+	t.Logf("waiting for h1's connection to h3 to disconnect")
+	require.Eventually(t, func() bool {
+		return h1.Network().Connectedness(h3.ID()) != network.Connected
+	}, 5*time.Second, 10*time.Millisecond)
+
+	// Should remain connected to h2.
+	require.Never(t, func() bool {
+		return h1.Network().Connectedness(h2.ID()) != network.Connected
+	}, 5*time.Second, 1*time.Second)
+
+	// Now force h2 to disconnect (we have an asymmetric peering).
+	conns := h2.Network().ConnsToPeer(h1.ID())
+	require.NotEmpty(t, conns)
+	h2.ConnManager().TrimOpenConns(ctx)
+
+	// All conns to peer should eventually close.
+	t.Logf("waiting for all connections to close")
+	for _, c := range conns {
+		require.Eventually(t, func() bool {
+			s, err := c.NewStream(context.Background())
+			if s != nil {
+				_ = s.Reset()
+			}
+			return err != nil
+		}, 5*time.Second, 10*time.Millisecond)
+	}
+
+	// Should eventually re-connect.
+	require.Eventually(t, func() bool {
+		return h1.Network().Connectedness(h2.ID()) == network.Connected
+	}, 30*time.Second, 1*time.Second)
+
+	// Unprotect 2 from 1.
+	ps1.RemovePeer(h2.ID())
+	require.NotContains(t, ps1.ListPeers(), peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()})
+
+	// Trim connections.
+	h1.ConnManager().TrimOpenConns(ctx)
+
+	// Should disconnect
+	t.Logf("waiting for h1 to disconnect from h2")
+	require.Eventually(t, func() bool {
+		return h1.Network().Connectedness(h2.ID()) != network.Connected
+	}, 5*time.Second, 10*time.Millisecond)
+
+	// Should never reconnect.
+	t.Logf("ensuring h1 is not connected to h2 again")
+	require.Never(t, func() bool {
+		return h1.Network().Connectedness(h2.ID()) == network.Connected
+	}, 20*time.Second, 1*time.Second)
+
+	// Until added back
+	ps1.AddPeer(peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()})
+	require.Contains(t, ps1.ListPeers(), peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()})
+	ps1.AddPeer(peer.AddrInfo{ID: h3.ID(), Addrs: h3.Addrs()})
+	require.Contains(t, ps1.ListPeers(), peer.AddrInfo{ID: h3.ID(), Addrs: h3.Addrs()})
+	t.Logf("wait for h1 to connect to h2 and h3 again")
+	require.Eventually(t, func() bool {
+		return h1.Network().Connectedness(h2.ID()) == network.Connected
+	}, 30*time.Second, 1*time.Second)
+	require.Eventually(t, func() bool {
+		return h1.Network().Connectedness(h3.ID()) == network.Connected
+	}, 30*time.Second, 1*time.Second)
+
+	// Should be able to repeatedly stop.
+	ps1.Stop()
+	ps1.Stop()
+
+	// Adding and removing should work after stopping.
+	ps1.AddPeer(peer.AddrInfo{ID: h4.ID(), Addrs: h4.Addrs()})
+	require.Contains(t, ps1.ListPeers(), peer.AddrInfo{ID: h4.ID(), Addrs: h4.Addrs()})
+	ps1.RemovePeer(h2.ID())
+	require.NotContains(t, ps1.ListPeers(), peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()})
+}
+
+func TestNextBackoff(t *testing.T) {
+	minMaxBackoff := maxBackoff * (100 - maxBackoffJitter) / 100 // multiply before dividing so the constant math does not truncate to zero
+	for x := 0; x < 1000; x++ {
+		ph := peerHandler{nextDelay: time.Second}
+		for min, max := time.Second*3/2, time.Second*5/2; min < minMaxBackoff; min, max = min*3/2, max*5/2 {
+			b := ph.nextBackoff()
+			if b > max || b < min {
+				t.Errorf("expected backoff %s to be between %s and %s", b, min, max)
+			}
+		}
+		for i := 0; i < 100; i++ {
+			b := ph.nextBackoff()
+			if b < minMaxBackoff || b > maxBackoff {
+				t.Fatal("failed to stay within max bounds")
+			}
+		}
+	}
+}
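For reviewers, a minimal usage sketch of the migrated package (not part of this diff). It assumes the package is imported as `github.com/ipfs/boxo/peering`, matching the `./peering` path referenced in the CHANGELOG entry above; the loopback hosts and the polling loop are purely illustrative. It wires two throwaway libp2p hosts together: `h2` is registered with a `PeeringService` on `h1`, the service is started, and it then dials (and keeps re-dialing) `h2` using the backoff implemented in `peering.go`.

```go
package main

import (
	"fmt"
	"time"

	"github.com/ipfs/boxo/peering"
	"github.com/libp2p/go-libp2p"
	"github.com/libp2p/go-libp2p/core/network"
	"github.com/libp2p/go-libp2p/core/peer"
)

func main() {
	// Two throwaway loopback hosts; a real node would pass its own options
	// (identity, connection manager, and so on).
	h1, err := libp2p.New(libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0"))
	if err != nil {
		panic(err)
	}
	defer h1.Close()

	h2, err := libp2p.New(libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0"))
	if err != nil {
		panic(err)
	}
	defer h2.Close()

	// Register h2 as a peering target of h1. AddPeer is safe before Start,
	// and calling it again for the same peer replaces the stored addresses.
	svc := peering.NewPeeringService(h1)
	svc.AddPeer(peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()})

	if err := svc.Start(); err != nil {
		panic(err)
	}
	defer svc.Stop()

	// The first dial fires only after the first backoff interval derived
	// from initialDelay (5s), so poll instead of checking immediately.
	for i := 0; i < 30 && h1.Network().Connectedness(h2.ID()) != network.Connected; i++ {
		time.Sleep(time.Second)
	}
	fmt.Println("connectedness:", h1.Network().Connectedness(h2.ID()))
	fmt.Println("peering with:", svc.ListPeers())
}
```

As exercised by `TestPeeringService` above, `Stop` is idempotent and `AddPeer`/`RemovePeer` remain safe to call before `Start` and after `Stop`.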