diff --git a/libp2p/connmanager.nim b/libp2p/connmanager.nim index 0bc78f97f1..88301285ef 100644 --- a/libp2p/connmanager.nim +++ b/libp2p/connmanager.nim @@ -43,7 +43,7 @@ type discard ConnEventHandler* = - proc(peerId: PeerId, event: ConnEvent): Future[void] {.gcsafe, raises: [].} + proc(peerId: PeerId, event: ConnEvent): Future[void] {.gcsafe, async: (raises: []).} PeerEventKind* {.pure.} = enum Left @@ -58,7 +58,7 @@ type discard PeerEventHandler* = - proc(peerId: PeerId, event: PeerEvent): Future[void] {.gcsafe, raises: [].} + proc(peerId: PeerId, event: PeerEvent): Future[void] {.gcsafe, async: (raises: []).} ConnManager* = ref object of RootObj maxConnsPerPeer: int diff --git a/libp2p/protocols/connectivity/autonat/client.nim b/libp2p/protocols/connectivity/autonat/client.nim index 0347c6c8ea..dcfd092d8b 100644 --- a/libp2p/protocols/connectivity/autonat/client.nim +++ b/libp2p/protocols/connectivity/autonat/client.nim @@ -19,7 +19,9 @@ logScope: type AutonatClient* = ref object of RootObj -proc sendDial(conn: Connection, pid: PeerId, addrs: seq[MultiAddress]) {.async.} = +proc sendDial( + conn: Connection, pid: PeerId, addrs: seq[MultiAddress] +) {.async: (raises: [CancelledError, LPStreamError]).} = let pb = AutonatDial( peerInfo: Opt.some(AutonatPeerInfo(id: Opt.some(pid), addrs: addrs)) ).encode() @@ -30,7 +32,9 @@ method dialMe*( switch: Switch, pid: PeerId, addrs: seq[MultiAddress] = newSeq[MultiAddress](), -): Future[MultiAddress] {.base, async.} = +): Future[MultiAddress] {. + base, async: (raises: [AutonatError, AutonatUnreachableError, CancelledError]) +.} = proc getResponseOrRaise( autonatMsg: Opt[AutonatMsg] ): AutonatDialResponse {.raises: [AutonatError].} = @@ -47,6 +51,8 @@ method dialMe*( await switch.dial(pid, @[AutonatCodec]) else: await switch.dial(pid, addrs, AutonatCodec) + except CancelledError as err: + raise err except DialFailedError as err: raise newException(AutonatError, "Unexpected error when dialling: " & err.msg, err) @@ -61,14 +67,37 @@ method dialMe*( incomingConnection.cancel() # Safer to always try to cancel cause we aren't sure if the peer dialled us or not if incomingConnection.completed(): - await (await incomingConnection).connection.close() - trace "sending Dial", addrs = switch.peerInfo.addrs - await conn.sendDial(switch.peerInfo.peerId, switch.peerInfo.addrs) - let response = getResponseOrRaise(AutonatMsg.decode(await conn.readLp(1024))) + try: + await (await incomingConnection).connection.close() + except AlreadyExpectingConnectionError as e: + # this err is already handled above and could not happen later + error "Unexpected error", description = e.msg + + try: + trace "sending Dial", addrs = switch.peerInfo.addrs + await conn.sendDial(switch.peerInfo.peerId, switch.peerInfo.addrs) + except CancelledError as e: + raise e + except CatchableError as e: + raise newException(AutonatError, "Sending dial failed", e) + + var respBytes: seq[byte] + try: + respBytes = await conn.readLp(1024) + except CancelledError as e: + raise e + except CatchableError as e: + raise newException(AutonatError, "read Dial response failed", e) + + let response = getResponseOrRaise(AutonatMsg.decode(respBytes)) + return case response.status of ResponseStatus.Ok: - response.ma.tryGet() + try: + response.ma.tryGet() + except: + raiseAssert("checked with if") of ResponseStatus.DialError: raise newException( AutonatUnreachableError, "Peer could not dial us back: " & response.text.get("") diff --git a/libp2p/protocols/connectivity/autonat/server.nim b/libp2p/protocols/connectivity/autonat/server.nim index acc8ce1288..4a0917ddca 100644 --- a/libp2p/protocols/connectivity/autonat/server.nim +++ b/libp2p/protocols/connectivity/autonat/server.nim @@ -32,7 +32,9 @@ type Autonat* = ref object of LPProtocol switch*: Switch dialTimeout: Duration -proc sendDial(conn: Connection, pid: PeerId, addrs: seq[MultiAddress]) {.async.} = +proc sendDial( + conn: Connection, pid: PeerId, addrs: seq[MultiAddress] +) {.async: (raises: [LPStreamError, CancelledError]).} = let pb = AutonatDial( peerInfo: Opt.some(AutonatPeerInfo(id: Opt.some(pid), addrs: addrs)) ).encode() diff --git a/libp2p/protocols/connectivity/autonat/service.nim b/libp2p/protocols/connectivity/autonat/service.nim index 95860c61c2..7d505f3f1e 100644 --- a/libp2p/protocols/connectivity/autonat/service.nim +++ b/libp2p/protocols/connectivity/autonat/service.nim @@ -177,7 +177,9 @@ proc askConnectedPeers( if (await askPeer(self, switch, peer)) != Unknown: answersFromPeers.inc() -proc schedule(service: AutonatService, switch: Switch, interval: Duration) {.async.} = +proc schedule( + service: AutonatService, switch: Switch, interval: Duration +) {.async: (raises: [CancelledError]).} = heartbeat "Scheduling AutonatService run", interval: await service.run(switch) @@ -214,15 +216,19 @@ method setup*( if self.askNewConnectedPeers: self.newConnectedPeerHandler = proc( peerId: PeerId, event: PeerEvent - ): Future[void] {.async.} = + ): Future[void] {.async: (raises: []).} = discard askPeer(self, switch, peerId) + switch.connManager.addPeerEventHandler( self.newConnectedPeerHandler, PeerEventKind.Joined ) + self.scheduleInterval.withValue(interval): self.scheduleHandle = schedule(self, switch, interval) + if self.enableAddressMapper: switch.peerInfo.addressMappers.add(self.addressMapper) + return hasBeenSetup method run*( diff --git a/libp2p/protocols/connectivity/dcutr/client.nim b/libp2p/protocols/connectivity/dcutr/client.nim index 19e0df4de6..03f3c94783 100644 --- a/libp2p/protocols/connectivity/dcutr/client.nim +++ b/libp2p/protocols/connectivity/dcutr/client.nim @@ -34,7 +34,7 @@ proc new*( proc startSync*( self: DcutrClient, switch: Switch, remotePeerId: PeerId, addrs: seq[MultiAddress] -) {.async.} = +) {.async: (raises: [DcutrError, CancelledError]).} = logScope: peerId = switch.peerInfo.peerId diff --git a/libp2p/protocols/connectivity/dcutr/core.nim b/libp2p/protocols/connectivity/dcutr/core.nim index 43c3b4dfc2..5c248da97f 100644 --- a/libp2p/protocols/connectivity/dcutr/core.nim +++ b/libp2p/protocols/connectivity/dcutr/core.nim @@ -50,7 +50,9 @@ proc decode*(_: typedesc[DcutrMsg], buf: seq[byte]): DcutrMsg {.raises: [DcutrEr raise newException(DcutrError, "Received malformed message") return dcutrMsg -proc send*(conn: Connection, msgType: MsgType, addrs: seq[MultiAddress]) {.async.} = +proc send*( + conn: Connection, msgType: MsgType, addrs: seq[MultiAddress] +) {.async: (raises: [CancelledError, LPStreamError]).} = let pb = DcutrMsg(msgType: msgType, addrs: addrs).encode() await conn.writeLp(pb.buffer) diff --git a/libp2p/protocols/connectivity/relay/relay.nim b/libp2p/protocols/connectivity/relay/relay.nim index 8e74d1123f..24cec3fa05 100644 --- a/libp2p/protocols/connectivity/relay/relay.nim +++ b/libp2p/protocols/connectivity/relay/relay.nim @@ -333,7 +333,7 @@ proc handleStreamV1(r: Relay, conn: Connection) {.async.} = proc setup*(r: Relay, switch: Switch) = r.switch = switch r.switch.addPeerEventHandler( - proc(peerId: PeerId, event: PeerEvent) {.async.} = + proc(peerId: PeerId, event: PeerEvent) {.async: (raises: []).} = r.rsvp.del(peerId), Left, ) diff --git a/libp2p/protocols/identify.nim b/libp2p/protocols/identify.nim index 2f7acf8754..c3b4e6bbf9 100644 --- a/libp2p/protocols/identify.nim +++ b/libp2p/protocols/identify.nim @@ -166,7 +166,12 @@ method init*(p: Identify) = proc identify*( self: Identify, conn: Connection, remotePeerId: PeerId -): Future[IdentifyInfo] {.async.} = +): Future[IdentifyInfo] {. + async: ( + raises: + [IdentityInvalidMsgError, IdentityNoMatchError, LPStreamError, CancelledError] + ) +.} = trace "initiating identify", conn var message = await conn.readLp(64 * 1024) if len(message) == 0: diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index 7cc99064ab..17803c4e5f 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -680,7 +680,7 @@ proc init*[PubParams: object | bool]( topicsHigh: int.high, ) - proc peerEventHandler(peerId: PeerId, event: PeerEvent) {.async.} = + proc peerEventHandler(peerId: PeerId, event: PeerEvent) {.async: (raises: []).} = if event.kind == PeerEventKind.Joined: pubsub.subscribePeer(peerId) else: diff --git a/libp2p/protocols/rendezvous.nim b/libp2p/protocols/rendezvous.nim index c06c4059ef..a4cb8c2438 100644 --- a/libp2p/protocols/rendezvous.nim +++ b/libp2p/protocols/rendezvous.nim @@ -676,7 +676,7 @@ proc unsubscribe*(rdv: RendezVous, ns: string) {.async.} = proc setup*(rdv: RendezVous, switch: Switch) = rdv.switch = switch - proc handlePeer(peerId: PeerId, event: PeerEvent) {.async.} = + proc handlePeer(peerId: PeerId, event: PeerEvent) {.async: (raises: []).} = if event.kind == PeerEventKind.Joined: rdv.peers.add(peerId) elif event.kind == PeerEventKind.Left: diff --git a/libp2p/services/autorelayservice.nim b/libp2p/services/autorelayservice.nim index ea6a834724..67301dc4f6 100644 --- a/libp2p/services/autorelayservice.nim +++ b/libp2p/services/autorelayservice.nim @@ -68,12 +68,14 @@ method setup*( let hasBeenSetUp = await procCall Service(self).setup(switch) if hasBeenSetUp: - proc handlePeerIdentified(peerId: PeerId, event: PeerEvent) {.async.} = + proc handlePeerIdentified( + peerId: PeerId, event: PeerEvent + ) {.async: (raises: []).} = trace "Peer Identified", peerId if self.relayPeers.len < self.maxNumRelays: self.peerAvailable.fire() - proc handlePeerLeft(peerId: PeerId, event: PeerEvent) {.async.} = + proc handlePeerLeft(peerId: PeerId, event: PeerEvent) {.async: (raises: []).} = trace "Peer Left", peerId self.relayPeers.withValue(peerId, future): future[].cancel() diff --git a/libp2p/services/hpservice.nim b/libp2p/services/hpservice.nim index e7ea276e2d..eef40fbef3 100644 --- a/libp2p/services/hpservice.nim +++ b/libp2p/services/hpservice.nim @@ -39,8 +39,10 @@ proc new*( proc tryStartingDirectConn( self: HPService, switch: Switch, peerId: PeerId -): Future[bool] {.async.} = - proc tryConnect(address: MultiAddress): Future[bool] {.async.} = +): Future[bool] {.async: (raises: [CancelledError]).} = + proc tryConnect( + address: MultiAddress + ): Future[bool] {.async: (raises: [DialFailedError, CancelledError]).} = debug "Trying to create direct connection", peerId, address await switch.connect(peerId, @[address], true, false) debug "Direct connection created." @@ -57,13 +59,13 @@ proc tryStartingDirectConn( continue return false -proc closeRelayConn(relayedConn: Connection) {.async.} = +proc closeRelayConn(relayedConn: Connection) {.async: (raises: [CancelledError]).} = await sleepAsync(2000.milliseconds) # grace period before closing relayed connection await relayedConn.close() proc newConnectedPeerHandler( self: HPService, switch: Switch, peerId: PeerId, event: PeerEvent -) {.async.} = +) {.async: (raises: []).} = try: # Get all connections to the peer. If there is at least one non-relayed connection, return. let connections = switch.connManager.getConnections()[peerId].mapIt(it.connection) @@ -102,8 +104,13 @@ method setup*( except LPError as err: error "Failed to mount Dcutr", err = err.msg - self.newConnectedPeerHandler = proc(peerId: PeerId, event: PeerEvent) {.async.} = - await newConnectedPeerHandler(self, switch, peerId, event) + self.newConnectedPeerHandler = proc( + peerId: PeerId, event: PeerEvent + ) {.async: (raises: []).} = + try: + await newConnectedPeerHandler(self, switch, peerId, event) + except CancelledError: + trace "hole punching cancelled" switch.connManager.addPeerEventHandler( self.newConnectedPeerHandler, PeerEventKind.Joined diff --git a/tests/stubs/autonatclientstub.nim b/tests/stubs/autonatclientstub.nim index 9923ecf324..2090198d6a 100644 --- a/tests/stubs/autonatclientstub.nim +++ b/tests/stubs/autonatclientstub.nim @@ -37,14 +37,16 @@ method dialMe*( switch: Switch, pid: PeerId, addrs: seq[MultiAddress] = newSeq[MultiAddress](), -): Future[MultiAddress] {.async.} = +): Future[MultiAddress] {. + async: (raises: [AutonatError, AutonatUnreachableError, CancelledError]) +.} = self.dials += 1 if self.dials == self.expectedDials: self.finished.complete() case self.answer of Reachable: - return MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet() + return MultiAddress.init("/ip4/0.0.0.0/tcp/0").get() of NotReachable: raise newException(AutonatUnreachableError, "") of Unknown: diff --git a/tests/testswitch.nim b/tests/testswitch.nim index 152f306673..2f21408707 100644 --- a/tests/testswitch.nim +++ b/tests/testswitch.nim @@ -308,7 +308,7 @@ suite "Switch": var step = 0 var kinds: set[ConnEventKind] - proc hook(peerId: PeerId, event: ConnEvent) {.async.} = + proc hook(peerId: PeerId, event: ConnEvent) {.async: (raises: []).} = kinds = kinds + {event.kind} case step of 0: @@ -356,7 +356,7 @@ suite "Switch": var step = 0 var kinds: set[ConnEventKind] - proc hook(peerId: PeerId, event: ConnEvent) {.async.} = + proc hook(peerId: PeerId, event: ConnEvent) {.async: (raises: []).} = kinds = kinds + {event.kind} case step of 0: @@ -404,7 +404,7 @@ suite "Switch": var step = 0 var kinds: set[PeerEventKind] - proc handler(peerId: PeerId, event: PeerEvent) {.async.} = + proc handler(peerId: PeerId, event: PeerEvent) {.async: (raises: []).} = kinds = kinds + {event.kind} case step of 0: @@ -451,7 +451,7 @@ suite "Switch": var step = 0 var kinds: set[PeerEventKind] - proc handler(peerId: PeerId, event: PeerEvent) {.async.} = + proc handler(peerId: PeerId, event: PeerEvent) {.async: (raises: []).} = kinds = kinds + {event.kind} case step of 0: @@ -504,7 +504,7 @@ suite "Switch": var step = 0 var kinds: set[PeerEventKind] - proc handler(peerId: PeerId, event: PeerEvent) {.async.} = + proc handler(peerId: PeerId, event: PeerEvent) {.async: (raises: []).} = kinds = kinds + {event.kind} case step of 0: @@ -562,14 +562,17 @@ suite "Switch": var switches: seq[Switch] var done = newFuture[void]() var onConnect: Future[void] - proc hook(peerId: PeerId, event: ConnEvent) {.async.} = - case event.kind - of ConnEventKind.Connected: - await onConnect - await switches[0].disconnect(peerInfo.peerId) # trigger disconnect - of ConnEventKind.Disconnected: - check not switches[0].isConnected(peerInfo.peerId) - done.complete() + proc hook(peerId: PeerId, event: ConnEvent) {.async: (raises: []).} = + try: + case event.kind + of ConnEventKind.Connected: + await onConnect + await switches[0].disconnect(peerInfo.peerId) # trigger disconnect + of ConnEventKind.Disconnected: + check not switches[0].isConnected(peerInfo.peerId) + done.complete() + except: + check false # should not get here switches.add(newStandardSwitch(rng = rng)) @@ -597,20 +600,23 @@ suite "Switch": var switches: seq[Switch] var done = newFuture[void]() var onConnect: Future[void] - proc hook(peerId2: PeerId, event: ConnEvent) {.async.} = - case event.kind - of ConnEventKind.Connected: - if conns == 5: - await onConnect - await switches[0].disconnect(peerInfo.peerId) # trigger disconnect - return - - conns.inc - of ConnEventKind.Disconnected: - if conns == 1: - check not switches[0].isConnected(peerInfo.peerId) - done.complete() - conns.dec + proc hook(peerId2: PeerId, event: ConnEvent) {.async: (raises: []).} = + try: + case event.kind + of ConnEventKind.Connected: + if conns == 5: + await onConnect + await switches[0].disconnect(peerInfo.peerId) # trigger disconnect + return + + conns.inc + of ConnEventKind.Disconnected: + if conns == 1: + check not switches[0].isConnected(peerInfo.peerId) + done.complete() + conns.dec + except: + check false # should not get here switches.add(newStandardSwitch(maxConnsPerPeer = 10, rng = rng))