diff --git a/libp2p/muxers/mplex/coder.nim b/libp2p/muxers/mplex/coder.nim index 1edbe864e0..701879a2fe 100644 --- a/libp2p/muxers/mplex/coder.nim +++ b/libp2p/muxers/mplex/coder.nim @@ -1,5 +1,5 @@ # Nim-LibP2P -# Copyright (c) 2023 Status Research & Development GmbH +# Copyright (c) 2023-2024 Status Research & Development GmbH # Licensed under either of # * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) # * MIT license ([LICENSE-MIT](LICENSE-MIT)) @@ -42,7 +42,10 @@ const MaxMsgSize* = 1 shl 20 # 1mb proc newInvalidMplexMsgType*(): ref InvalidMplexMsgType = newException(InvalidMplexMsgType, "invalid message type") -proc readMsg*(conn: Connection): Future[Msg] {.async.} = +proc readMsg*( + conn: Connection +): Future[Msg] {.async: (raises: [ + CancelledError, LPStreamError, MuxerError]).} = let header = await conn.readVarint() trace "read header varint", varint = header, conn @@ -55,10 +58,13 @@ proc readMsg*(conn: Connection): Future[Msg] {.async.} = return (header shr 3, MessageType(msgType), data) -proc writeMsg*(conn: Connection, - id: uint64, - msgType: MessageType, - data: seq[byte] = @[]): Future[void] = +proc writeMsg*( + conn: Connection, + id: uint64, + msgType: MessageType, + data: seq[byte] = @[] +): Future[void] {.async: (raises: [ + CancelledError, LPStreamError], raw: true).} = var left = data.len offset = 0 @@ -84,8 +90,11 @@ proc writeMsg*(conn: Connection, # message gets written before some of the chunks conn.write(buf.buffer) -proc writeMsg*(conn: Connection, - id: uint64, - msgType: MessageType, - data: string): Future[void] = +proc writeMsg*( + conn: Connection, + id: uint64, + msgType: MessageType, + data: string +): Future[void] {.async: (raises: [ + CancelledError, LPStreamError], raw: true).} = conn.writeMsg(id, msgType, data.toBytes()) diff --git a/libp2p/muxers/mplex/lpchannel.nim b/libp2p/muxers/mplex/lpchannel.nim index d094b031ad..f43c092a2e 100644 --- a/libp2p/muxers/mplex/lpchannel.nim +++ b/libp2p/muxers/mplex/lpchannel.nim @@ -1,5 +1,5 @@ # Nim-LibP2P -# Copyright (c) 2023 Status Research & Development GmbH +# Copyright (c) 2023-2024 Status Research & Development GmbH # Licensed under either of # * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) # * MIT license ([LICENSE-MIT](LICENSE-MIT)) @@ -28,7 +28,8 @@ when defined(libp2p_mplex_metrics): declareHistogram libp2p_mplex_qtime, "message queuing time" when defined(libp2p_network_protocols_metrics): - declareCounter libp2p_protocols_bytes, "total sent or received bytes", ["protocol", "direction"] + declareCounter libp2p_protocols_bytes, + "total sent or received bytes", ["protocol", "direction"] ## Channel half-closed states ## @@ -64,16 +65,16 @@ type func shortLog*(s: LPChannel): auto = try: - if s.isNil: "LPChannel(nil)" + if s == nil: "LPChannel(nil)" elif s.name != $s.oid and s.name.len > 0: &"{shortLog(s.conn.peerId)}:{s.oid}:{s.name}" else: &"{shortLog(s.conn.peerId)}:{s.oid}" except ValueError as exc: - raise newException(Defect, exc.msg) + raiseAssert(exc.msg) chronicles.formatIt(LPChannel): shortLog(it) -proc open*(s: LPChannel) {.async.} = +proc open*(s: LPChannel) {.async: (raises: [CancelledError, LPStreamError]).} = trace "Opening channel", s, conn = s.conn if s.conn.isClosed: return @@ -82,20 +83,20 @@ proc open*(s: LPChannel) {.async.} = s.isOpen = true except CancelledError as exc: raise exc - except CatchableError as exc: + except LPStreamError as exc: await s.conn.close() raise exc method closed*(s: LPChannel): bool = s.closedLocal -proc closeUnderlying(s: LPChannel): Future[void] {.async.} = +proc closeUnderlying(s: LPChannel): Future[void] {.async: (raises: []).} = ## Channels may be closed for reading and writing in any order - we'll close ## the underlying bufferstream when both directions are closed if s.closedLocal and s.atEof(): await procCall BufferStream(s).close() -proc reset*(s: LPChannel) {.async.} = +proc reset*(s: LPChannel) {.async: (raises: []).} = if s.isClosed: trace "Already closed", s return @@ -108,22 +109,21 @@ proc reset*(s: LPChannel) {.async.} = if s.isOpen and not s.conn.isClosed: # If the connection is still active, notify the other end - proc resetMessage() {.async.} = + proc resetMessage() {.async: (raises: []).} = try: trace "sending reset message", s, conn = s.conn - await s.conn.writeMsg(s.id, s.resetCode) # write reset - except CatchableError as exc: - # No cancellations - await s.conn.close() + await noCancel s.conn.writeMsg(s.id, s.resetCode) # write reset + except LPStreamError as exc: trace "Can't send reset message", s, conn = s.conn, msg = exc.msg + await s.conn.close() asyncSpawn resetMessage() - await s.closeImpl() # noraises, nocancels + await s.closeImpl() trace "Channel reset", s -method close*(s: LPChannel) {.async.} = +method close*(s: LPChannel) {.async: (raises: []).} = ## Close channel for writing - a message will be sent to the other peer ## informing them that the channel is closed and that we're waiting for ## their acknowledgement. @@ -137,10 +137,9 @@ method close*(s: LPChannel) {.async.} = if s.isOpen and not s.conn.isClosed: try: await s.conn.writeMsg(s.id, s.closeCode) # write close - except CancelledError as exc: + except CancelledError: await s.conn.close() - raise exc - except CatchableError as exc: + except LPStreamError as exc: # It's harmless that close message cannot be sent - the connection is # likely down already await s.conn.close() @@ -154,16 +153,17 @@ method initStream*(s: LPChannel) = if s.objName.len == 0: s.objName = LPChannelTrackerName - s.timeoutHandler = proc(): Future[void] {.gcsafe.} = + s.timeoutHandler = proc(): Future[void] {.async: (raises: [], raw: true).} = trace "Idle timeout expired, resetting LPChannel", s s.reset() procCall BufferStream(s).initStream() -method readOnce*(s: LPChannel, - pbytes: pointer, - nbytes: int): - Future[int] {.async.} = +method readOnce*( + s: LPChannel, + pbytes: pointer, + nbytes: int +): Future[int] {.async: (raises: [CancelledError, LPStreamError]).} = ## Mplex relies on reading being done regularly from every channel, or all ## channels are blocked - in particular, this means that reading from one ## channel must not be done from within a callback / read handler of another @@ -186,15 +186,19 @@ method readOnce*(s: LPChannel, if bytes == 0: await s.closeUnderlying() return bytes - except CatchableError as exc: - # readOnce in BufferStream generally raises on EOF or cancellation - for - # the former, resetting is harmless, for the latter it's necessary because - # data has been lost in s.readBuf and there's no way to gracefully recover / - # use the channel any more + except CancelledError as exc: + await s.reset() + raise exc + except LPStreamError as exc: + # Resetting is necessary because data has been lost in s.readBuf and + # there's no way to gracefully recover / use the channel any more await s.reset() raise newLPStreamConnDownError(exc) -proc prepareWrite(s: LPChannel, msg: seq[byte]): Future[void] {.async.} = +proc prepareWrite( + s: LPChannel, + msg: seq[byte] +): Future[void] {.async: (raises: [CancelledError, LPStreamError]).} = # prepareWrite is the slow path of writing a message - see conditions in # write if s.remoteReset: @@ -222,7 +226,10 @@ proc prepareWrite(s: LPChannel, msg: seq[byte]): Future[void] {.async.} = await s.conn.writeMsg(s.id, s.msgCode, msg) proc completeWrite( - s: LPChannel, fut: Future[void], msgLen: int): Future[void] {.async.} = + s: LPChannel, + fut: Future[void].Raising([CancelledError, LPStreamError]), + msgLen: int +): Future[void] {.async: (raises: [CancelledError, LPStreamError]).} = try: s.writes += 1 @@ -250,7 +257,7 @@ proc completeWrite( raise exc except LPStreamEOFError as exc: raise exc - except CatchableError as exc: + except LPStreamError as exc: trace "exception in lpchannel write handler", s, msg = exc.msg await s.reset() await s.conn.close() @@ -258,7 +265,11 @@ proc completeWrite( finally: s.writes -= 1 -method write*(s: LPChannel, msg: seq[byte]): Future[void] = +method write*( + s: LPChannel, + msg: seq[byte] +): Future[void] {.async: (raises: [ + CancelledError, LPStreamError], raw: true).} = ## Write to mplex channel - there may be up to MaxWrite concurrent writes ## pending after which the peer is disconnected @@ -279,13 +290,12 @@ method write*(s: LPChannel, msg: seq[byte]): Future[void] = method getWrapped*(s: LPChannel): Connection = s.conn proc init*( - L: type LPChannel, - id: uint64, - conn: Connection, - initiator: bool, - name: string = "", - timeout: Duration = DefaultChanTimeout): LPChannel = - + L: type LPChannel, + id: uint64, + conn: Connection, + initiator: bool, + name: string = "", + timeout: Duration = DefaultChanTimeout): LPChannel = let chann = L( id: id, name: name, diff --git a/libp2p/muxers/mplex/mplex.nim b/libp2p/muxers/mplex/mplex.nim index d40b3817c4..d9970b3e66 100644 --- a/libp2p/muxers/mplex/mplex.nim +++ b/libp2p/muxers/mplex/mplex.nim @@ -1,5 +1,5 @@ # Nim-LibP2P -# Copyright (c) 2023 Status Research & Development GmbH +# Copyright (c) 2023-2024 Status Research & Development GmbH # Licensed under either of # * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) # * MIT license ([LICENSE-MIT](LICENSE-MIT)) @@ -56,7 +56,7 @@ proc newTooManyChannels(): ref TooManyChannels = proc newInvalidChannelIdError(): ref InvalidChannelIdError = newException(InvalidChannelIdError, "max allowed channel count exceeded") -proc cleanupChann(m: Mplex, chann: LPChannel) {.async, inline.} = +proc cleanupChann(m: Mplex, chann: LPChannel) {.async: (raises: []), inline.} = ## remove the local channel from the internal tables ## try: @@ -68,19 +68,19 @@ proc cleanupChann(m: Mplex, chann: LPChannel) {.async, inline.} = libp2p_mplex_channels.set( m.channels[chann.initiator].len.int64, labelValues = [$chann.initiator, $m.connection.peerId]) - except CatchableError as exc: + except CancelledError as exc: warn "Error cleaning up mplex channel", m, chann, msg = exc.msg -proc newStreamInternal*(m: Mplex, - initiator: bool = true, - chanId: uint64 = 0, - name: string = "", - timeout: Duration): LPChannel - {.gcsafe, raises: [InvalidChannelIdError].} = +proc newStreamInternal*( + m: Mplex, + initiator: bool = true, + chanId: uint64 = 0, + name: string = "", + timeout: Duration): LPChannel {.gcsafe, raises: [InvalidChannelIdError].} = ## create new channel/stream ## - let id = if initiator: - m.currentId.inc(); m.currentId + let id = + if initiator: m.currentId.inc(); m.currentId else: chanId if id in m.channels[initiator]: @@ -111,18 +111,14 @@ proc newStreamInternal*(m: Mplex, m.channels[initiator].len.int64, labelValues = [$initiator, $m.connection.peerId]) -proc handleStream(m: Mplex, chann: LPChannel) {.async.} = +proc handleStream(m: Mplex, chann: LPChannel) {.async: (raises: []).} = ## call the muxer stream handler for this channel ## - try: - await m.streamHandler(chann) - trace "finished handling stream", m, chann - doAssert(chann.closed, "connection not closed by handler!") - except CatchableError as exc: - trace "Exception in mplex stream handler", m, chann, msg = exc.msg - await chann.reset() + await m.streamHandler(chann) + trace "finished handling stream", m, chann + doAssert(chann.closed, "connection not closed by handler!") -method handle*(m: Mplex) {.async.} = +method handle*(m: Mplex) {.async: (raises: []).} = trace "Starting mplex handler", m try: while not m.connection.atEof: @@ -150,7 +146,7 @@ method handle*(m: Mplex) {.async.} = else: if m.channels[false].len > m.maxChannCount - 1: warn "too many channels created by remote peer", - allowedMax = MaxChannelCount, m + allowedMax = MaxChannelCount, m raise newTooManyChannels() let name = string.fromBytes(data) @@ -159,59 +155,65 @@ method handle*(m: Mplex) {.async.} = trace "Processing channel message", m, channel, data = data.shortLog case msgType: - of MessageType.New: - trace "created channel", m, channel - - if not isNil(m.streamHandler): - # Launch handler task - # All the errors are handled inside `handleStream()` procedure. - asyncSpawn m.handleStream(channel) - - of MessageType.MsgIn, MessageType.MsgOut: - if data.len > MaxMsgSize: - warn "attempting to send a packet larger than allowed", - allowed = MaxMsgSize, channel - raise newLPStreamLimitError() - - trace "pushing data to channel", m, channel, len = data.len - try: - await channel.pushData(data) - trace "pushed data to channel", m, channel, len = data.len - except LPStreamClosedError as exc: - # Channel is being closed, but `cleanupChann` was not yet triggered. - trace "pushing data to channel failed", m, channel, len = data.len, - msg = exc.msg - discard # Ignore message, same as if `cleanupChann` had completed. - - of MessageType.CloseIn, MessageType.CloseOut: - await channel.pushEof() - of MessageType.ResetIn, MessageType.ResetOut: - channel.remoteReset = true - await channel.reset() + of MessageType.New: + trace "created channel", m, channel + + if m.streamHandler != nil: + # Launch handler task + # All the errors are handled inside `handleStream()` procedure. + asyncSpawn m.handleStream(channel) + + of MessageType.MsgIn, MessageType.MsgOut: + if data.len > MaxMsgSize: + warn "attempting to send a packet larger than allowed", + allowed = MaxMsgSize, channel + raise newLPStreamLimitError() + + trace "pushing data to channel", m, channel, len = data.len + try: + await channel.pushData(data) + trace "pushed data to channel", m, channel, len = data.len + except LPStreamClosedError as exc: + # Channel is being closed, but `cleanupChann` was not yet triggered. + trace "pushing data to channel failed", m, channel, len = data.len, + msg = exc.msg + discard # Ignore message, same as if `cleanupChann` had completed. + + of MessageType.CloseIn, MessageType.CloseOut: + await channel.pushEof() + of MessageType.ResetIn, MessageType.ResetOut: + channel.remoteReset = true + await channel.reset() except CancelledError: debug "Unexpected cancellation in mplex handler", m except LPStreamEOFError as exc: trace "Stream EOF", m, msg = exc.msg - except CatchableError as exc: - debug "Unexpected exception in mplex read loop", m, msg = exc.msg + except LPStreamError as exc: + debug "Unexpected stream exception in mplex read loop", m, msg = exc.msg + except MuxerError as exc: + debug "Unexpected muxer exception in mplex read loop", m, msg = exc.msg finally: await m.close() trace "Stopped mplex handler", m -proc new*(M: type Mplex, - conn: Connection, - inTimeout: Duration = DefaultChanTimeout, - outTimeout: Duration = DefaultChanTimeout, - maxChannCount: int = MaxChannelCount): Mplex = +proc new*( + M: type Mplex, + conn: Connection, + inTimeout: Duration = DefaultChanTimeout, + outTimeout: Duration = DefaultChanTimeout, + maxChannCount: int = MaxChannelCount): Mplex = M(connection: conn, inChannTimeout: inTimeout, outChannTimeout: outTimeout, oid: genOid(), maxChannCount: maxChannCount) -method newStream*(m: Mplex, - name: string = "", - lazy: bool = false): Future[Connection] {.async.} = +method newStream*( + m: Mplex, + name: string = "", + lazy: bool = false +): Future[Connection] {.async: (raises: [ + CancelledError, LPStreamError, MuxerError]).} = let channel = m.newStreamInternal(timeout = m.inChannTimeout) if not lazy: @@ -219,7 +221,7 @@ method newStream*(m: Mplex, return Connection(channel) -method close*(m: Mplex) {.async.} = +method close*(m: Mplex) {.async: (raises: []).} = if m.isClosed: trace "Already closed", m return diff --git a/libp2p/muxers/muxer.nim b/libp2p/muxers/muxer.nim index bb573dbf22..fca06ee751 100644 --- a/libp2p/muxers/muxer.nim +++ b/libp2p/muxers/muxer.nim @@ -1,5 +1,5 @@ # Nim-LibP2P -# Copyright (c) 2023 Status Research & Development GmbH +# Copyright (c) 2023-2024 Status Research & Development GmbH # Licensed under either of # * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) # * MIT license ([LICENSE-MIT](LICENSE-MIT)) @@ -23,16 +23,17 @@ type MuxerError* = object of LPError TooManyChannels* = object of MuxerError - StreamHandler* = proc(conn: Connection): Future[void] {.gcsafe, raises: [].} - MuxerHandler* = proc(muxer: Muxer): Future[void] {.gcsafe, raises: [].} + StreamHandler* = proc(conn: Connection): Future[void] {.async: (raises: []).} + MuxerHandler* = proc(muxer: Muxer): Future[void] {.async: (raises: []).} Muxer* = ref object of RootObj streamHandler*: StreamHandler - handler*: Future[void] + handler*: Future[void].Raising([]) connection*: Connection # user provider proc that returns a constructed Muxer - MuxerConstructor* = proc(conn: Connection): Muxer {.gcsafe, closure, raises: [].} + MuxerConstructor* = + proc(conn: Connection): Muxer {.gcsafe, closure, raises: [].} # this wraps a creator proc that knows how to make muxers MuxerProvider* = object @@ -40,24 +41,32 @@ type codec*: string func shortLog*(m: Muxer): auto = - if isNil(m): "nil" + if m == nil: "nil" else: shortLog(m.connection) + chronicles.formatIt(Muxer): shortLog(it) # muxer interface -method newStream*(m: Muxer, name: string = "", lazy: bool = false): - Future[Connection] {.base, async.} = discard -method close*(m: Muxer) {.base, async.} = - if not isNil(m.connection): +method newStream*( + m: Muxer, + name: string = "", + lazy: bool = false +): Future[Connection] {.base, async: (raises: [ + CancelledError, LPStreamError, MuxerError], raw: true).} = + raiseAssert("Not implemented!") + +method close*(m: Muxer) {.base, async: (raises: []).} = + if m.connection != nil: await m.connection.close() -method handle*(m: Muxer): Future[void] {.base, async.} = discard -proc new*( - T: typedesc[MuxerProvider], - creator: MuxerConstructor, - codec: string): T {.gcsafe.} = +method handle*(m: Muxer): Future[void] {.base, async: (raises: []).} = discard +proc new*( + T: typedesc[MuxerProvider], + creator: MuxerConstructor, + codec: string): T {.gcsafe.} = let muxerProvider = T(newMuxer: creator, codec: codec) muxerProvider -method getStreams*(m: Muxer): seq[Connection] {.base.} = doAssert false, "not implemented" +method getStreams*(m: Muxer): seq[Connection] {.base.} = + raiseAssert("Not implemented!") diff --git a/libp2p/muxers/yamux/yamux.nim b/libp2p/muxers/yamux/yamux.nim index d83dbefc45..5f12a5ead1 100644 --- a/libp2p/muxers/yamux/yamux.nim +++ b/libp2p/muxers/yamux/yamux.nim @@ -1,5 +1,5 @@ # Nim-LibP2P -# Copyright (c) 2023 Status Research & Development GmbH +# Copyright (c) 2023-2024 Status Research & Development GmbH # Licensed under either of # * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) # * MIT license ([LICENSE-MIT](LICENSE-MIT)) @@ -27,11 +27,14 @@ const MaxChannelCount = 200 when defined(libp2p_yamux_metrics): - declareGauge(libp2p_yamux_channels, "yamux channels", labels = ["initiator", "peer"]) - declareHistogram libp2p_yamux_send_queue, "message send queue length (in byte)", - buckets = [0.0, 100.0, 250.0, 1000.0, 2000.0, 3200.0, 6400.0, 25600.0, 256000.0] - declareHistogram libp2p_yamux_recv_queue, "message recv queue length (in byte)", - buckets = [0.0, 100.0, 250.0, 1000.0, 2000.0, 3200.0, 6400.0, 25600.0, 256000.0] + declareGauge libp2p_yamux_channels, + "yamux channels", labels = ["initiator", "peer"] + declareHistogram libp2p_yamux_send_queue, + "message send queue length (in byte)", buckets = [ + 0.0, 100.0, 250.0, 1000.0, 2000.0, 3200.0, 6400.0, 25600.0, 256000.0] + declareHistogram libp2p_yamux_recv_queue, + "message recv queue length (in byte)", buckets = [ + 0.0, 100.0, 250.0, 1000.0, 2000.0, 3200.0, 6400.0, 25600.0, 256000.0] type YamuxError* = object of MuxerError @@ -60,7 +63,10 @@ type streamId: uint32 length: uint32 -proc readHeader(conn: LPStream): Future[YamuxHeader] {.async.} = +proc readHeader( + conn: LPStream +): Future[YamuxHeader] {.async: (raises: [ + CancelledError, LPStreamError, MuxerError]).} = var buffer: array[12, byte] await conn.readExactly(addr buffer[0], 12) @@ -74,10 +80,10 @@ proc readHeader(conn: LPStream): Future[YamuxHeader] {.async.} = return result proc `$`(header: YamuxHeader): string = - result = "{" & $header.msgType & ", " - result &= "{" & header.flags.foldl(if a != "": a & ", " & $b else: $b, "") & "}, " - result &= "streamId: " & $header.streamId & ", " - result &= "length: " & $header.length & "}" + "{" & $header.msgType & ", " & + "{" & header.flags.foldl(if a != "": a & ", " & $b else: $b, "") & "}, " & + "streamId: " & $header.streamId & ", " & + "length: " & $header.length & "}" proc encode(header: YamuxHeader): array[12, byte] = result[0] = header.version @@ -86,10 +92,14 @@ proc encode(header: YamuxHeader): array[12, byte] = result[4..7] = toBytesBE(header.streamId) result[8..11] = toBytesBE(header.length) -proc write(conn: LPStream, header: YamuxHeader): Future[void] {.gcsafe.} = +proc write( + conn: LPStream, + header: YamuxHeader +): Future[void] {.async: (raises: [ + CancelledError, LPStreamError], raw: true).} = trace "write directly on stream", h = $header var buffer = header.encode() - return conn.write(@buffer) + conn.write(@buffer) proc ping(T: type[YamuxHeader], flag: MsgFlags, pingData: uint32): T = T( @@ -107,11 +117,10 @@ proc goAway(T: type[YamuxHeader], status: GoAwayStatus): T = ) proc data( - T: type[YamuxHeader], - streamId: uint32, - length: uint32 = 0, - flags: set[MsgFlags] = {}, - ): T = + T: type[YamuxHeader], + streamId: uint32, + length: uint32 = 0, + flags: set[MsgFlags] = {}): T = T( version: YamuxVersion, msgType: MsgType.Data, @@ -121,11 +130,10 @@ proc data( ) proc windowUpdate( - T: type[YamuxHeader], - streamId: uint32, - delta: uint32, - flags: set[MsgFlags] = {}, - ): T = + T: type[YamuxHeader], + streamId: uint32, + delta: uint32, + flags: set[MsgFlags] = {}): T = T( version: YamuxVersion, msgType: MsgType.WindowUpdate, @@ -138,7 +146,7 @@ type ToSend = tuple data: seq[byte] sent: int - fut: Future[void] + fut: Future[void].Raising([CancelledError, LPStreamError]) YamuxChannel* = ref object of Connection id: uint32 recvWindow: int @@ -153,7 +161,7 @@ type recvQueue: seq[byte] isReset: bool remoteReset: bool - closedRemotely: Future[void] + closedRemotely: Future[void].Raising([]) closedLocally: bool receivedData: AsyncEvent returnedEof: bool @@ -162,7 +170,7 @@ proc `$`(channel: YamuxChannel): string = result = if channel.conn.dir == Out: "=> " else: "<= " result &= $channel.id var s: seq[string] = @[] - if channel.closedRemotely.done(): + if channel.closedRemotely.completed(): s.add("ClosedRemotely") if channel.closedLocally: s.add("ClosedLocally") @@ -184,17 +192,17 @@ proc lengthSendQueueWithLimit(channel: YamuxChannel): int = # 3 big messages if the peer is stalling. channel.sendQueue.foldl(a + min(b.data.len - b.sent, channel.maxSendQueueSize div 3), 0) -proc actuallyClose(channel: YamuxChannel) {.async.} = +proc actuallyClose(channel: YamuxChannel) {.async: (raises: []).} = if channel.closedLocally and channel.sendQueue.len == 0 and - channel.closedRemotely.done(): + channel.closedRemotely.completed(): await procCall Connection(channel).closeImpl() -proc remoteClosed(channel: YamuxChannel) {.async.} = - if not channel.closedRemotely.done(): +proc remoteClosed(channel: YamuxChannel) {.async: (raises: []).} = + if not channel.closedRemotely.completed(): channel.closedRemotely.complete() await channel.actuallyClose() -method closeImpl*(channel: YamuxChannel) {.async.} = +method closeImpl*(channel: YamuxChannel) {.async: (raises: []).} = if not channel.closedLocally: channel.closedLocally = true channel.isEof = true @@ -204,7 +212,8 @@ method closeImpl*(channel: YamuxChannel) {.async.} = except CancelledError, LPStreamError: discard await channel.actuallyClose() -proc reset(channel: YamuxChannel, isLocal: bool = false) {.async.} = +proc reset( + channel: YamuxChannel, isLocal: bool = false) {.async: (raises: []).} = # If we reset locally, we want to flush up to a maximum of recvWindow # bytes. It's because the peer we're connected to can send us data before # it receives the reset. @@ -221,17 +230,18 @@ proc reset(channel: YamuxChannel, isLocal: bool = false) {.async.} = if not channel.closedLocally: if isLocal and not channel.isSending: try: await channel.conn.write(YamuxHeader.data(channel.id, 0, {Rst})) - except LPStreamEOFError as exc: discard - except LPStreamClosedError as exc: discard + except CancelledError, LPStreamError: discard await channel.close() - if not channel.closedRemotely.done(): + if not channel.closedRemotely.completed(): await channel.remoteClosed() channel.receivedData.fire() if not isLocal: # If the reset is remote, there's no reason to flush anything. channel.recvWindow = 0 -proc updateRecvWindow(channel: YamuxChannel) {.async.} = +proc updateRecvWindow( + channel: YamuxChannel +) {.async: (raises: [CancelledError, LPStreamError]).} = ## Send to the peer a window update when the recvWindow is empty enough ## # In order to avoid spamming a window update everytime a byte is read, @@ -249,14 +259,15 @@ proc updateRecvWindow(channel: YamuxChannel) {.async.} = trace "increasing the recvWindow", delta method readOnce*( - channel: YamuxChannel, - pbytes: pointer, - nbytes: int): - Future[int] {.async.} = + channel: YamuxChannel, + pbytes: pointer, + nbytes: int +): Future[int] {.async: (raises: [CancelledError, LPStreamError]).} = ## Read from a yamux channel if channel.isReset: - raise if channel.remoteReset: + raise + if channel.remoteReset: newLPStreamResetError() elif channel.closedLocally: newLPStreamClosedError() @@ -269,7 +280,7 @@ method readOnce*( try: # https://github.com/status-im/nim-chronos/issues/516 discard await race(channel.closedRemotely, channel.receivedData.wait()) except ValueError: raiseAssert("Futures list is not empty") - if channel.closedRemotely.done() and channel.recvQueue.len == 0: + if channel.closedRemotely.completed() and channel.recvQueue.len == 0: channel.returnedEof = true channel.isEof = true return 0 @@ -277,7 +288,8 @@ method readOnce*( let toRead = min(channel.recvQueue.len, nbytes) var p = cast[ptr UncheckedArray[byte]](pbytes) - toOpenArray(p, 0, nbytes - 1)[0.. channel.maxSendQueueSize: - trace "channel send queue too big, resetting", maxSendQueueSize=channel.maxSendQueueSize, + trace "channel send queue too big, resetting", + maxSendQueueSize = channel.maxSendQueueSize, currentQueueSize = channel.lengthSendQueueWithLimit() - try: - await channel.reset(true) - except CatchableError as exc: - warn "failed to reset", msg=exc.msg + await channel.reset(isLocal = true) break let @@ -329,7 +343,7 @@ proc trySend(channel: YamuxChannel) {.async.} = sendBuffer[0..<12] = header.encode() - var futures: seq[Future[void]] + var futures: seq[Future[void].Raising([CancelledError, LPStreamError])] while inBuffer < toSend: # concatenate the different message we try to send into one buffer let (data, sent, fut) = channel.sendQueue[0] @@ -346,8 +360,15 @@ proc trySend(channel: YamuxChannel) {.async.} = trace "try to send the buffer", h = $header channel.sendWindow.dec(toSend) - try: await channel.conn.write(sendBuffer) - except CatchableError as exc: + try: + await channel.conn.write(sendBuffer) + except CancelledError: + trace "cancelled sending the buffer" + for fut in futures.items(): + fut.cancelSoon() + await channel.reset() + break + except LPStreamError as exc: trace "failed to send the buffer" let connDown = newLPStreamConnDownError(exc) for fut in futures.items(): @@ -358,7 +379,11 @@ proc trySend(channel: YamuxChannel) {.async.} = fut.complete() channel.activity = true -method write*(channel: YamuxChannel, msg: seq[byte]): Future[void] = +method write*( + channel: YamuxChannel, + msg: seq[byte] +): Future[void] {.async: (raises: [ + CancelledError, LPStreamError], raw: true).} = ## Write to yamux channel ## result = newFuture[void]("Yamux Send") @@ -376,7 +401,9 @@ method write*(channel: YamuxChannel, msg: seq[byte]): Future[void] = libp2p_yamux_send_queue.observe(channel.lengthSendQueue().int64) asyncSpawn channel.trySend() -proc open(channel: YamuxChannel) {.async.} = +proc open( + channel: YamuxChannel +) {.async: (raises: [CancelledError, LPStreamError]).} = ## Open a yamux channel by sending a window update with Syn or Ack flag ## if channel.opened: @@ -406,21 +433,28 @@ proc lenBySrc(m: Yamux, isSrc: bool): int = for v in m.channels.values(): if v.isSrc == isSrc: result += 1 -proc cleanupChannel(m: Yamux, channel: YamuxChannel) {.async.} = - await channel.join() +proc cleanupChannel(m: Yamux, channel: YamuxChannel) {.async: (raises: []).} = + try: + await channel.join() + except CancelledError: + discard m.channels.del(channel.id) when defined(libp2p_yamux_metrics): - libp2p_yamux_channels.set(m.lenBySrc(channel.isSrc).int64, [$channel.isSrc, $channel.peerId]) + libp2p_yamux_channels.set( + m.lenBySrc(channel.isSrc).int64, [$channel.isSrc, $channel.peerId]) if channel.isReset and channel.recvWindow > 0: m.flushed[channel.id] = channel.recvWindow -proc createStream(m: Yamux, id: uint32, isSrc: bool, - recvWindow: int, maxSendQueueSize: int): YamuxChannel = - # As you can see, during initialization, recvWindow can be larger than maxRecvWindow. +proc createStream( + m: Yamux, id: uint32, isSrc: bool, + recvWindow: int, maxSendQueueSize: int): YamuxChannel = + # During initialization, recvWindow can be larger than maxRecvWindow. # This is because the peer we're connected to will always assume # that the initial recvWindow is 256k. - # To solve this contradiction, no updateWindow will be sent until recvWindow is less - # than maxRecvWindow + # To solve this contradiction, no updateWindow will be sent until + # recvWindow is less than maxRecvWindow + proc newClosedRemotelyFut(): Future[void] {.async: (raises: [], raw: true).} = + newFuture[void]() var stream = YamuxChannel( id: id, maxRecvWindow: recvWindow, @@ -430,7 +464,7 @@ proc createStream(m: Yamux, id: uint32, isSrc: bool, isSrc: isSrc, conn: m.connection, receivedData: newAsyncEvent(), - closedRemotely: newFuture[void]() + closedRemotely: newClosedRemotelyFut() ) stream.objName = "YamuxStream" if isSrc: @@ -439,9 +473,10 @@ proc createStream(m: Yamux, id: uint32, isSrc: bool, else: stream.dir = Direction.In stream.timeout = m.inTimeout - stream.timeoutHandler = proc(): Future[void] {.gcsafe.} = - trace "Idle timeout expired, resetting YamuxChannel" - stream.reset(true) + stream.timeoutHandler = + proc(): Future[void] {.async: (raises: [], raw: true).} = + trace "Idle timeout expired, resetting YamuxChannel" + stream.reset(isLocal = true) stream.initStream() stream.peerId = m.connection.peerId stream.observedAddr = m.connection.observedAddr @@ -455,7 +490,7 @@ proc createStream(m: Yamux, id: uint32, isSrc: bool, libp2p_yamux_channels.set(m.lenBySrc(isSrc).int64, [$isSrc, $stream.peerId]) return stream -method close*(m: Yamux) {.async.} = +method close*(m: Yamux) {.async: (raises: []).} = if m.isClosed == true: trace "Already closed" return @@ -464,24 +499,21 @@ method close*(m: Yamux) {.async.} = trace "Closing yamux" let channels = toSeq(m.channels.values()) for channel in channels: - await channel.reset(true) + await channel.reset(isLocal = true) try: await m.connection.write(YamuxHeader.goAway(NormalTermination)) - except CatchableError as exc: trace "failed to send goAway", msg=exc.msg + except CancelledError as exc: trace "cancelled sending goAway", msg = exc.msg + except LPStreamError as exc: trace "failed to send goAway", msg = exc.msg await m.connection.close() trace "Closed yamux" -proc handleStream(m: Yamux, channel: YamuxChannel) {.async.} = +proc handleStream(m: Yamux, channel: YamuxChannel) {.async: (raises: []).} = ## Call the muxer stream handler for this channel ## - try: - await m.streamHandler(channel) - trace "finished handling stream" - doAssert(channel.isClosed, "connection not closed by handler!") - except CatchableError as exc: - trace "Exception in yamux stream handler", msg = exc.msg - await channel.reset() - -method handle*(m: Yamux) {.async.} = + await m.streamHandler(channel) + trace "finished handling stream" + doAssert(channel.isClosed, "connection not closed by handler!") + +method handle*(m: Yamux) {.async: (raises: []).} = trace "Starting yamux handler", pid=m.connection.peerId try: while not m.connection.atEof: @@ -559,11 +591,24 @@ method handle*(m: Yamux) {.async.} = if MsgFlags.Rst in header.flags: trace "remote reset channel" await channel.reset() + except CancelledError as exc: + debug "Unexpected cancellation in yamux handler", msg = exc.msg except LPStreamEOFError as exc: trace "Stream EOF", msg = exc.msg + except LPStreamError as exc: + debug "Unexpected stream exception in yamux read loop", msg = exc.msg except YamuxError as exc: trace "Closing yamux connection", error=exc.msg - await m.connection.write(YamuxHeader.goAway(ProtocolError)) + try: + await m.connection.write(YamuxHeader.goAway(ProtocolError)) + except CancelledError, LPStreamError: + discard + except MuxerError as exc: + debug "Unexpected muxer exception in yamux read loop", msg = exc.msg + try: + await m.connection.write(YamuxHeader.goAway(ProtocolError)) + except CancelledError, LPStreamError: + discard finally: await m.close() trace "Stopped yamux handler" @@ -572,10 +617,11 @@ method getStreams*(m: Yamux): seq[Connection] = for c in m.channels.values: result.add(c) method newStream*( - m: Yamux, - name: string = "", - lazy: bool = false): Future[Connection] {.async.} = - + m: Yamux, + name: string = "", + lazy: bool = false +): Future[Connection] {.async: (raises: [ + CancelledError, LPStreamError, MuxerError]).} = if m.channels.len > m.maxChannCount - 1: raise newException(TooManyChannels, "max allowed channel count exceeded") let stream = m.createStream(m.currentId, true, m.windowSize, m.maxSendQueueSize) @@ -584,12 +630,13 @@ method newStream*( await stream.open() return stream -proc new*(T: type[Yamux], conn: Connection, - maxChannCount: int = MaxChannelCount, - windowSize: int = YamuxDefaultWindowSize, - maxSendQueueSize: int = MaxSendQueueSize, - inTimeout: Duration = 5.minutes, - outTimeout: Duration = 5.minutes): T = +proc new*( + T: type[Yamux], conn: Connection, + maxChannCount: int = MaxChannelCount, + windowSize: int = YamuxDefaultWindowSize, + maxSendQueueSize: int = MaxSendQueueSize, + inTimeout: Duration = 5.minutes, + outTimeout: Duration = 5.minutes): T = T( connection: conn, currentId: if conn.dir == Out: 1 else: 2, diff --git a/libp2p/protocols/connectivity/relay/rconn.nim b/libp2p/protocols/connectivity/relay/rconn.nim index 4f2732aac2..a4699a0773 100644 --- a/libp2p/protocols/connectivity/relay/rconn.nim +++ b/libp2p/protocols/connectivity/relay/rconn.nim @@ -1,5 +1,5 @@ # Nim-LibP2P -# Copyright (c) 2023 Status Research & Development GmbH +# Copyright (c) 2023-2024 Status Research & Development GmbH # Licensed under either of # * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) # * MIT license ([LICENSE-MIT](LICENSE-MIT)) @@ -23,11 +23,15 @@ type method readOnce*( self: RelayConnection, pbytes: pointer, - nbytes: int): Future[int] {.async.} = + nbytes: int +): Future[int] {.async: (raises: [CancelledError, LPStreamError], raw: true).} = self.activity = true - return await self.conn.readOnce(pbytes, nbytes) + self.conn.readOnce(pbytes, nbytes) -method write*(self: RelayConnection, msg: seq[byte]): Future[void] {.async.} = +method write*( + self: RelayConnection, + msg: seq[byte] +): Future[void] {.async: (raises: [CancelledError, LPStreamError]).} = self.dataSent.inc(msg.len) if self.limitData != 0 and self.dataSent > self.limitData: await self.close() @@ -35,25 +39,25 @@ method write*(self: RelayConnection, msg: seq[byte]): Future[void] {.async.} = self.activity = true await self.conn.write(msg) -method closeImpl*(self: RelayConnection): Future[void] {.async.} = +method closeImpl*(self: RelayConnection): Future[void] {.async: (raises: []).} = await self.conn.close() await procCall Connection(self).closeImpl() method getWrapped*(self: RelayConnection): Connection = self.conn proc new*( - T: typedesc[RelayConnection], - conn: Connection, - limitDuration: uint32, - limitData: uint64): T = + T: typedesc[RelayConnection], + conn: Connection, + limitDuration: uint32, + limitData: uint64): T = let rc = T(conn: conn, limitDuration: limitDuration, limitData: limitData) rc.dir = conn.dir rc.initStream() if limitDuration > 0: - proc checkDurationConnection() {.async.} = - let sleep = sleepAsync(limitDuration.seconds()) - await sleep or conn.join() - if sleep.finished: await conn.close() - else: sleep.cancel() + proc checkDurationConnection() {.async: (raises: []).} = + try: + await noCancel conn.join().wait(limitDuration.seconds()) + except AsyncTimeoutError: + await conn.close() asyncSpawn checkDurationConnection() return rc diff --git a/libp2p/protocols/secure/noise.nim b/libp2p/protocols/secure/noise.nim index 1d8e801a27..a5a2b8c168 100644 --- a/libp2p/protocols/secure/noise.nim +++ b/libp2p/protocols/secure/noise.nim @@ -1,5 +1,5 @@ # Nim-LibP2P -# Copyright (c) 2023 Status Research & Development GmbH +# Copyright (c) 2023-2024 Status Research & Development GmbH # Licensed under either of # * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) # * MIT license ([LICENSE-MIT](LICENSE-MIT)) @@ -99,10 +99,10 @@ type func shortLog*(conn: NoiseConnection): auto = try: - if conn.isNil: "NoiseConnection(nil)" + if conn == nil: "NoiseConnection(nil)" else: &"{shortLog(conn.peerId)}:{conn.oid}" except ValueError as exc: - raise newException(Defect, exc.msg) + raiseAssert(exc.msg) chronicles.formatIt(NoiseConnection): shortLog(it) @@ -112,7 +112,7 @@ proc genKeyPair(rng: var HmacDrbgContext): KeyPair = proc hashProtocol(name: string): MDigest[256] = # If protocol_name is less than or equal to HASHLEN bytes in length, - # sets h equal to protocol_name with zero bytes appended to make HASHLEN bytes. + # sets h to protocol_name with zero bytes appended to make HASHLEN bytes. # Otherwise sets h = HASH(protocol_name). if name.len <= 32: @@ -301,7 +301,9 @@ template read_s: untyped = msg.consume(rsLen) -proc readFrame(sconn: Connection): Future[seq[byte]] {.async.} = +proc readFrame( + sconn: Connection +): Future[seq[byte]] {.async: (raises: [CancelledError, LPStreamError]).} = var besize {.noinit.}: array[2, byte] await sconn.readExactly(addr besize[0], besize.len) let size = uint16.fromBytesBE(besize).int @@ -426,7 +428,9 @@ proc handshakeXXInbound( finally: burnMem(hs) -method readMessage*(sconn: NoiseConnection): Future[seq[byte]] {.async.} = +method readMessage*( + sconn: NoiseConnection +): Future[seq[byte]] {.async: (raises: [CancelledError, LPStreamError]).} = while true: # Discard 0-length payloads let frame = await sconn.stream.readFrame() sconn.activity = true @@ -458,7 +462,11 @@ proc encryptFrame( cipherFrame[2 + src.len().. 0, "nbytes must be positive integer") if s.isEof: @@ -174,7 +179,7 @@ method readOnce*(s: SecureConn, raise err except CancelledError as exc: raise exc - except CatchableError as err: + except LPStreamError as err: debug "Error while reading message from secure connection, closing.", error = err.name, message = err.msg, diff --git a/libp2p/stream/bufferstream.nim b/libp2p/stream/bufferstream.nim index 558cf2df45..0561ee63ec 100644 --- a/libp2p/stream/bufferstream.nim +++ b/libp2p/stream/bufferstream.nim @@ -1,5 +1,5 @@ # Nim-LibP2P -# Copyright (c) 2023 Status Research & Development GmbH +# Copyright (c) 2023-2024 Status Research & Development GmbH # Licensed under either of # * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) # * MIT license ([LICENSE-MIT](LICENSE-MIT)) @@ -34,10 +34,10 @@ type func shortLog*(s: BufferStream): auto = try: - if s.isNil: "BufferStream(nil)" + if s == nil: "BufferStream(nil)" else: &"{shortLog(s.peerId)}:{s.oid}" except ValueError as exc: - raise newException(Defect, exc.msg) + raiseAssert(exc.msg) chronicles.formatIt(BufferStream): shortLog(it) @@ -55,14 +55,16 @@ method initStream*(s: BufferStream) = trace "BufferStream created", s proc new*( - T: typedesc[BufferStream], - timeout: Duration = DefaultConnectionTimeout): T = - + T: typedesc[BufferStream], + timeout: Duration = DefaultConnectionTimeout): T = let bufferStream = T(timeout: timeout) bufferStream.initStream() bufferStream -method pushData*(s: BufferStream, data: seq[byte]) {.base, async.} = +method pushData*( + s: BufferStream, + data: seq[byte] +) {.base, async: (raises: [CancelledError, LPStreamError]).} = ## Write bytes to internal read buffer, use this to fill up the ## buffer with data. ## @@ -70,7 +72,7 @@ method pushData*(s: BufferStream, data: seq[byte]) {.base, async.} = ## doAssert(not s.pushing, - &"Only one concurrent push allowed for stream {s.shortLog()}") + "Only one concurrent push allowed for stream " & s.shortLog()) if s.isClosed or s.pushedEof: raise newLPStreamClosedError() @@ -87,12 +89,14 @@ method pushData*(s: BufferStream, data: seq[byte]) {.base, async.} = finally: s.pushing = false -method pushEof*(s: BufferStream) {.base, async.} = +method pushEof*( + s: BufferStream +) {.base, async: (raises: [CancelledError, LPStreamError]).} = if s.pushedEof: return doAssert(not s.pushing, - &"Only one concurrent push allowed for stream {s.shortLog()}") + "Only one concurrent push allowed for stream " & s.shortLog()) s.pushedEof = true @@ -108,13 +112,14 @@ method pushEof*(s: BufferStream) {.base, async.} = method atEof*(s: BufferStream): bool = s.isEof and s.readBuf.len == 0 -method readOnce*(s: BufferStream, - pbytes: pointer, - nbytes: int): - Future[int] {.async.} = +method readOnce*( + s: BufferStream, + pbytes: pointer, + nbytes: int +): Future[int] {.async: (raises: [CancelledError, LPStreamError]).} = doAssert(nbytes > 0, "nbytes must be positive integer") doAssert(not s.reading, - &"Only one concurrent read allowed for stream {s.shortLog()}") + "Only one concurrent read allowed for stream " & s.shortLog()) if s.returnedEof: raise newLPStreamEOFError() @@ -135,13 +140,6 @@ method readOnce*(s: BufferStream, # Not very efficient, but shouldn't happen often s.readBuf.assign(@(p.toOpenArray(0, rbytes - 1)) & @(s.readBuf.data)) raise exc - except CatchableError as exc: - # When an exception happens here, the Bufferstream is effectively - # broken and no more reads will be valid - for now, return EOF if it's - # called again, though this is not completely true - EOF represents an - # "orderly" shutdown and that's not what happened here.. - s.returnedEof = true - raise exc finally: s.reading = false @@ -173,7 +171,8 @@ method readOnce*(s: BufferStream, return rbytes -method closeImpl*(s: BufferStream): Future[void] = +method closeImpl*( + s: BufferStream): Future[void] {.async: (raises: [], raw: true).} = ## close the stream and clear the buffer trace "Closing BufferStream", s, len = s.len @@ -209,8 +208,8 @@ method closeImpl*(s: BufferStream): Future[void] = if not s.readQueue.empty(): discard s.readQueue.popFirstNoWait() except AsyncQueueFullError, AsyncQueueEmptyError: - raise newException(Defect, getCurrentExceptionMsg()) + raiseAssert(getCurrentExceptionMsg()) trace "Closed BufferStream", s - procCall Connection(s).closeImpl() # noraises, nocancels + procCall Connection(s).closeImpl() diff --git a/libp2p/stream/chronosstream.nim b/libp2p/stream/chronosstream.nim index 17a1ef3c3b..2076a99e6d 100644 --- a/libp2p/stream/chronosstream.nim +++ b/libp2p/stream/chronosstream.nim @@ -1,5 +1,5 @@ # Nim-LibP2P -# Copyright (c) 2023 Status Research & Development GmbH +# Copyright (c) 2023-2024 Status Research & Development GmbH # Licensed under either of # * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) # * MIT license ([LICENSE-MIT](LICENSE-MIT)) @@ -31,18 +31,22 @@ type tracked: bool when defined(libp2p_agents_metrics): - declareGauge(libp2p_peers_identity, "peers identities", labels = ["agent"]) - declareCounter(libp2p_peers_traffic_read, "incoming traffic", labels = ["agent"]) - declareCounter(libp2p_peers_traffic_write, "outgoing traffic", labels = ["agent"]) + declareGauge libp2p_peers_identity, + "peers identities", labels = ["agent"] + declareCounter libp2p_peers_traffic_read, + "incoming traffic", labels = ["agent"] + declareCounter libp2p_peers_traffic_write, + "outgoing traffic", labels = ["agent"] -declareCounter(libp2p_network_bytes, "total traffic", labels = ["direction"]) +declareCounter libp2p_network_bytes, + "total traffic", labels = ["direction"] func shortLog*(conn: ChronosStream): auto = try: - if conn.isNil: "ChronosStream(nil)" + if conn == nil: "ChronosStream(nil)" else: &"{shortLog(conn.peerId)}:{conn.oid}" except ValueError as exc: - raise newException(Defect, exc.msg) + raiseAssert(exc.msg) chronicles.formatIt(ChronosStream): shortLog(it) @@ -50,17 +54,18 @@ method initStream*(s: ChronosStream) = if s.objName.len == 0: s.objName = ChronosStreamTrackerName - s.timeoutHandler = proc() {.async.} = + s.timeoutHandler = proc(): Future[void] {.async: (raises: [], raw: true).} = trace "Idle timeout expired, closing ChronosStream", s - await s.close() + s.close() procCall Connection(s).initStream() -proc init*(C: type ChronosStream, - client: StreamTransport, - dir: Direction, - timeout = DefaultChronosStreamTimeout, - observedAddr: Opt[MultiAddress]): ChronosStream = +proc init*( + C: type ChronosStream, + client: StreamTransport, + dir: Direction, + timeout = DefaultChronosStreamTimeout, + observedAddr: Opt[MultiAddress]): ChronosStream = result = C(client: client, timeout: timeout, dir: dir, @@ -94,7 +99,11 @@ when defined(libp2p_agents_metrics): libp2p_peers_identity.dec(labelValues = [s.shortAgent]) s.tracked = false -method readOnce*(s: ChronosStream, pbytes: pointer, nbytes: int): Future[int] {.async.} = +method readOnce*( + s: ChronosStream, + pbytes: pointer, + nbytes: int +): Future[int] {.async: (raises: [CancelledError, LPStreamError]).} = if s.atEof: raise newLPStreamEOFError() withExceptions: @@ -107,7 +116,10 @@ method readOnce*(s: ChronosStream, pbytes: pointer, nbytes: int): Future[int] {. libp2p_peers_traffic_read.inc(result.int64, labelValues = [s.shortAgent]) proc completeWrite( - s: ChronosStream, fut: Future[int], msgLen: int): Future[void] {.async.} = + s: ChronosStream, + fut: Future[int].Raising([TransportError, CancelledError]), + msgLen: int +): Future[void] {.async: (raises: [CancelledError, LPStreamError]).} = withExceptions: # StreamTransport will only return written < msg.len on fatal failures where # further writing is not possible - in such cases, we'll raise here, @@ -124,7 +136,11 @@ proc completeWrite( if s.tracked: libp2p_peers_traffic_write.inc(msgLen.int64, labelValues = [s.shortAgent]) -method write*(s: ChronosStream, msg: seq[byte]): Future[void] = +method write*( + s: ChronosStream, + msg: seq[byte] +): Future[void] {.async: (raises: [ + CancelledError, LPStreamError], raw: true).} = # Avoid a copy of msg being kept in the closure created by `{.async.}` as this # drives up memory usage if msg.len == 0: @@ -145,19 +161,14 @@ method closed*(s: ChronosStream): bool = method atEof*(s: ChronosStream): bool = s.client.atEof() -method closeImpl*(s: ChronosStream) {.async.} = - try: - trace "Shutting down chronos stream", address = $s.client.remoteAddress(), s - - if not s.client.closed(): - await s.client.closeWait() +method closeImpl*( + s: ChronosStream) {.async: (raises: []).} = + trace "Shutting down chronos stream", address = $s.client.remoteAddress(), s - trace "Shutdown chronos stream", address = $s.client.remoteAddress(), s + if not s.client.closed(): + await s.client.closeWait() - except CancelledError as exc: - raise exc - except CatchableError as exc: - trace "Error closing chronosstream", s, msg = exc.msg + trace "Shutdown chronos stream", address = $s.client.remoteAddress(), s when defined(libp2p_agents_metrics): # do this after closing! diff --git a/libp2p/stream/connection.nim b/libp2p/stream/connection.nim index d9cfe34526..d33d78a958 100644 --- a/libp2p/stream/connection.nim +++ b/libp2p/stream/connection.nim @@ -1,5 +1,5 @@ # Nim-LibP2P -# Copyright (c) 2023 Status Research & Development GmbH +# Copyright (c) 2023-2024 Status Research & Development GmbH # Licensed under either of # * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) # * MIT license ([LICENSE-MIT](LICENSE-MIT)) @@ -27,25 +27,25 @@ const DefaultConnectionTimeout* = 5.minutes type - TimeoutHandler* = proc(): Future[void] {.gcsafe, raises: [].} + TimeoutHandler* = proc(): Future[void] {.async: (raises: []).} Connection* = ref object of LPStream - activity*: bool # reset every time data is sent or received - timeout*: Duration # channel timeout if no activity - timerTaskFut: Future[void] # the current timer instance + activity*: bool # reset every time data is sent or received + timeout*: Duration # channel timeout if no activity + timerTaskFut: Future[void].Raising([]) # the current timer instance timeoutHandler*: TimeoutHandler # timeout handler peerId*: PeerId observedAddr*: Opt[MultiAddress] - protocol*: string # protocol used by the connection, used as tag for metrics - transportDir*: Direction # The bottom level transport (generally the socket) direction + protocol*: string # protocol used by the connection, used as metrics tag + transportDir*: Direction # underlying transport (usually socket) direction when defined(libp2p_agents_metrics): shortAgent*: string -proc timeoutMonitor(s: Connection) {.async.} +proc timeoutMonitor(s: Connection) {.async: (raises: []).} func shortLog*(conn: Connection): string = try: - if conn.isNil: "Connection(nil)" + if conn == nil: "Connection(nil)" else: &"{shortLog(conn.peerId)}:{conn.oid}" except ValueError as exc: raiseAssert(exc.msg) @@ -58,23 +58,28 @@ method initStream*(s: Connection) = procCall LPStream(s).initStream() - doAssert(isNil(s.timerTaskFut)) + doAssert(s.timerTaskFut == nil) if s.timeout > 0.millis: trace "Monitoring for timeout", s, timeout = s.timeout s.timerTaskFut = s.timeoutMonitor() - if isNil(s.timeoutHandler): - s.timeoutHandler = proc(): Future[void] = - trace "Idle timeout expired, closing connection", s - s.close() + if s.timeoutHandler == nil: + s.timeoutHandler = + proc(): Future[void] {.async: (raises: [], raw: true).} = + trace "Idle timeout expired, closing connection", s + s.close() -method closeImpl*(s: Connection): Future[void] = +method closeImpl*(s: Connection): Future[void] {.async: (raises: []).} = # Cleanup timeout timer trace "Closing connection", s - if not isNil(s.timerTaskFut) and not s.timerTaskFut.finished: - s.timerTaskFut.cancel() + if s.timerTaskFut != nil and not s.timerTaskFut.finished: + # Don't `cancelAndWait` here to avoid risking deadlock in this scenario: + # - `pollActivity` is waiting for `s.timeoutHandler` to complete. + # - `s.timeoutHandler` may have triggered `closeImpl` and we are now here. + # In this situation, we have to return for `s.timerTaskFut` to complete. + s.timerTaskFut.cancelSoon() s.timerTaskFut = nil trace "Closed connection", s @@ -84,7 +89,7 @@ method closeImpl*(s: Connection): Future[void] = func hash*(p: Connection): Hash = cast[pointer](p).hash -proc pollActivity(s: Connection): Future[bool] {.async.} = +proc pollActivity(s: Connection): Future[bool] {.async: (raises: []).} = if s.closed and s.atEof: return false # Done, no more monitoring @@ -95,22 +100,13 @@ proc pollActivity(s: Connection): Future[bool] {.async.} = # Inactivity timeout happened, call timeout monitor trace "Connection timed out", s - if not(isNil(s.timeoutHandler)): + if s.timeoutHandler != nil: trace "Calling timeout handler", s - - try: - await s.timeoutHandler() - except CancelledError: - # timeoutHandler is expected to be fast, but it's still possible that - # cancellation will happen here - no need to warn about it - we do want to - # stop the polling however - debug "Timeout handler cancelled", s - except CatchableError as exc: # Shouldn't happen - warn "exception in timeout handler", s, exc = exc.msg + await s.timeoutHandler() return false -proc timeoutMonitor(s: Connection) {.async.} = +proc timeoutMonitor(s: Connection) {.async: (raises: []).} = ## monitor the channel for inactivity ## ## if the timeout was hit, it means that @@ -129,21 +125,22 @@ proc timeoutMonitor(s: Connection) {.async.} = return method getWrapped*(s: Connection): Connection {.base.} = - doAssert(false, "not implemented!") + raiseAssert("Not implemented!") when defined(libp2p_agents_metrics): proc setShortAgent*(s: Connection, shortAgent: string) = var conn = s - while not isNil(conn): + while conn != nil: conn.shortAgent = shortAgent conn = conn.getWrapped() -proc new*(C: type Connection, - peerId: PeerId, - dir: Direction, - observedAddr: Opt[MultiAddress], - timeout: Duration = DefaultConnectionTimeout, - timeoutHandler: TimeoutHandler = nil): Connection = +proc new*( + C: type Connection, + peerId: PeerId, + dir: Direction, + observedAddr: Opt[MultiAddress], + timeout: Duration = DefaultConnectionTimeout, + timeoutHandler: TimeoutHandler = nil): Connection = result = C(peerId: peerId, dir: dir, timeout: timeout, diff --git a/libp2p/stream/lpstream.nim b/libp2p/stream/lpstream.nim index 67b89e7f86..60a9da4467 100644 --- a/libp2p/stream/lpstream.nim +++ b/libp2p/stream/lpstream.nim @@ -23,8 +23,8 @@ import ../varint, export errors -declareGauge(libp2p_open_streams, - "open stream instances", labels = ["type", "dir"]) +declareGauge libp2p_open_streams, + "open stream instances", labels = ["type", "dir"] export oids @@ -98,8 +98,9 @@ proc newLPStreamConnDownError*( parentException) func shortLog*(s: LPStream): auto = - if s.isNil: "LPStream(nil)" + if s == nil: "LPStream(nil)" else: $s.oid + chronicles.formatIt(LPStream): shortLog(it) method initStream*(s: LPStream) {.base.} = @@ -126,19 +127,21 @@ method atEof*(s: LPStream): bool {.base, public.} = s.isEof method readOnce*( - s: LPStream, - pbytes: pointer, - nbytes: int): - Future[int] {.base, async, public.} = + s: LPStream, + pbytes: pointer, + nbytes: int +): Future[int] {.base, async: (raises: [ + CancelledError, LPStreamError], raw: true), public.} = ## Reads whatever is available in the stream, ## up to `nbytes`. Will block if nothing is ## available - doAssert(false, "not implemented!") + raiseAssert("Not implemented!") -proc readExactly*(s: LPStream, - pbytes: pointer, - nbytes: int): - Future[void] {.async, public.} = +proc readExactly*( + s: LPStream, + pbytes: pointer, + nbytes: int +): Future[void] {.async: (raises: [CancelledError, LPStreamError]), public.} = ## Waits for `nbytes` to be available, then read ## them and return them if s.atEof: @@ -172,10 +175,11 @@ proc readExactly*(s: LPStream, trace "couldn't read all bytes, incomplete data", s, nbytes, read raise newLPStreamIncompleteError() -proc readLine*(s: LPStream, - limit = 0, - sep = "\r\n"): Future[string] - {.async, public.} = +proc readLine*( + s: LPStream, + limit = 0, + sep = "\r\n" +): Future[string] {.async: (raises: [CancelledError, LPStreamError]), public.} = ## Reads up to `limit` bytes are read, or a `sep` is found # TODO replace with something that exploits buffering better var lim = if limit <= 0: -1 else: limit @@ -201,7 +205,9 @@ proc readLine*(s: LPStream, if len(result) == lim: break -proc readVarint*(conn: LPStream): Future[uint64] {.async, public.} = +proc readVarint*( + conn: LPStream +): Future[uint64] {.async: (raises: [CancelledError, LPStreamError]), public.} = var buffer: array[10, byte] @@ -219,7 +225,11 @@ proc readVarint*(conn: LPStream): Future[uint64] {.async, public.} = if true: # can't end with a raise apparently raise (ref InvalidVarintError)(msg: "Cannot parse varint") -proc readLp*(s: LPStream, maxSize: int): Future[seq[byte]] {.async, public.} = +proc readLp*( + s: LPStream, + maxSize: int +): Future[seq[byte]] {.async: (raises: [ + CancelledError, LPStreamError]), public.} = ## read length prefixed msg, with the length encoded as a varint let length = await s.readVarint() @@ -233,13 +243,21 @@ proc readLp*(s: LPStream, maxSize: int): Future[seq[byte]] {.async, public.} = var res = newSeqUninitialized[byte](length) await s.readExactly(addr res[0], res.len) - return res + res -method write*(s: LPStream, msg: seq[byte]): Future[void] {.base, public.} = +method write*( + s: LPStream, + msg: seq[byte] +): Future[void] {.async: (raises: [ + CancelledError, LPStreamError], raw: true), base, public.} = # Write `msg` to stream, waiting for the write to be finished - doAssert(false, "not implemented!") + raiseAssert("Not implemented!") -proc writeLp*(s: LPStream, msg: openArray[byte]): Future[void] {.public.} = +proc writeLp*( + s: LPStream, + msg: openArray[byte] +): Future[void] {.async: (raises: [ + CancelledError, LPStreamError], raw: true), public.} = ## Write `msg` with a varint-encoded length prefix let vbytes = PB.toBytes(msg.len().uint64) var buf = newSeqUninitialized[byte](msg.len() + vbytes.len) @@ -247,35 +265,53 @@ proc writeLp*(s: LPStream, msg: openArray[byte]): Future[void] {.public.} = buf[vbytes.len.. 0: - numberOfRead.inc() - await conn.close() + try: + var buffer: array[25600, byte] + while (await conn.readOnce(addr buffer[0], 25600)) > 0: + numberOfRead.inc() + except CancelledError, LPStreamError: + return + finally: + await conn.close() handlerBlocker.complete() let streamA = await yamuxa.newStream() @@ -80,12 +91,16 @@ suite "Yamux": suite "Window exhaustion": asyncTest "Basic exhaustion blocking": mSetup() - let readerBlocker = newFuture[void]() - yamuxb.streamHandler = proc(conn: Connection) {.async.} = + let readerBlocker = newBlockerFut() + yamuxb.streamHandler = proc(conn: Connection) {.async: (raises: []).} = await readerBlocker - var buffer: array[160000, byte] - discard await conn.readOnce(addr buffer[0], 160000) - await conn.close() + try: + var buffer: array[160000, byte] + discard await conn.readOnce(addr buffer[0], 160000) + except CancelledError, LPStreamError: + return + finally: + await conn.close() let streamA = await yamuxa.newStream() check streamA == yamuxa.getStreams()[0] @@ -103,12 +118,16 @@ suite "Yamux": asyncTest "Exhaustion doesn't block other channels": mSetup() - let readerBlocker = newFuture[void]() - yamuxb.streamHandler = proc(conn: Connection) {.async.} = + let readerBlocker = newBlockerFut() + yamuxb.streamHandler = proc(conn: Connection) {.async: (raises: []).} = await readerBlocker - var buffer: array[160000, byte] - discard await conn.readOnce(addr buffer[0], 160000) - await conn.close() + try: + var buffer: array[160000, byte] + discard await conn.readOnce(addr buffer[0], 160000) + except CancelledError, LPStreamError: + return + finally: + await conn.close() let streamA = await yamuxa.newStream() check streamA == yamuxa.getStreams()[0] @@ -120,10 +139,14 @@ suite "Yamux": # Now that the secondWriter is stuck, create a second stream # and exchange some data - yamuxb.streamHandler = proc(conn: Connection) {.async.} = - check (await conn.readLp(100)) == fromHex("1234") - await conn.writeLp(fromHex("5678")) - await conn.close() + yamuxb.streamHandler = proc(conn: Connection) {.async: (raises: []).} = + try: + check (await conn.readLp(100)) == fromHex("1234") + await conn.writeLp(fromHex("5678")) + except CancelledError, LPStreamError: + return + finally: + await conn.close() let streamB = await yamuxa.newStream() await streamB.writeLp(fromHex("1234")) @@ -138,15 +161,19 @@ suite "Yamux": asyncTest "Can set custom window size": mSetup() - let writerBlocker = newFuture[void]() + let writerBlocker = newBlockerFut() var numberOfRead = 0 - yamuxb.streamHandler = proc(conn: Connection) {.async.} = + yamuxb.streamHandler = proc(conn: Connection) {.async: (raises: []).} = YamuxChannel(conn).setMaxRecvWindow(20) - var buffer: array[256000, byte] - while (await conn.readOnce(addr buffer[0], 256000)) > 0: - numberOfRead.inc() - writerBlocker.complete() - await conn.close() + try: + var buffer: array[256000, byte] + while (await conn.readOnce(addr buffer[0], 256000)) > 0: + numberOfRead.inc() + writerBlocker.complete() + except CancelledError, LPStreamError: + return + finally: + await conn.close() let streamA = await yamuxa.newStream() check streamA == yamuxa.getStreams()[0] @@ -163,12 +190,16 @@ suite "Yamux": asyncTest "Saturate until reset": mSetup() - let writerBlocker = newFuture[void]() - yamuxb.streamHandler = proc(conn: Connection) {.async.} = + let writerBlocker = newBlockerFut() + yamuxb.streamHandler = proc(conn: Connection) {.async: (raises: []).} = await writerBlocker - var buffer: array[256, byte] - check: (await conn.readOnce(addr buffer[0], 256)) == 0 - await conn.close() + try: + var buffer: array[256, byte] + check: (await conn.readOnce(addr buffer[0], 256)) == 0 + except CancelledError, LPStreamError: + return + finally: + await conn.close() let streamA = await yamuxa.newStream() check streamA == yamuxa.getStreams()[0] @@ -184,12 +215,16 @@ suite "Yamux": asyncTest "Increase window size": mSetup(512000) - let readerBlocker = newFuture[void]() - yamuxb.streamHandler = proc(conn: Connection) {.async.} = + let readerBlocker = newBlockerFut() + yamuxb.streamHandler = proc(conn: Connection) {.async: (raises: []).} = await readerBlocker - var buffer: array[260000, byte] - discard await conn.readOnce(addr buffer[0], 260000) - await conn.close() + try: + var buffer: array[260000, byte] + discard await conn.readOnce(addr buffer[0], 260000) + except CancelledError, LPStreamError: + return + finally: + await conn.close() let streamA = await yamuxa.newStream() check streamA == yamuxa.getStreams()[0] @@ -207,17 +242,20 @@ suite "Yamux": asyncTest "Reduce window size": mSetup(64000) - let readerBlocker1 = newFuture[void]() - let readerBlocker2 = newFuture[void]() - yamuxb.streamHandler = proc(conn: Connection) {.async.} = - await readerBlocker1 - var buffer: array[256000, byte] - # For the first roundtrip, the send window size is assumed to be 256k - discard await conn.readOnce(addr buffer[0], 256000) - await readerBlocker2 - discard await conn.readOnce(addr buffer[0], 40000) - - await conn.close() + let readerBlocker1 = newBlockerFut() + let readerBlocker2 = newBlockerFut() + yamuxb.streamHandler = proc(conn: Connection) {.async: (raises: []).} = + try: + await readerBlocker1 + var buffer: array[256000, byte] + # For the first roundtrip, the send window size is assumed to be 256k + discard await conn.readOnce(addr buffer[0], 256000) + await readerBlocker2 + discard await conn.readOnce(addr buffer[0], 40000) + except CancelledError, LPStreamError: + return + finally: + await conn.close() let streamA = await yamuxa.newStream() check streamA == yamuxa.getStreams()[0] @@ -242,15 +280,18 @@ suite "Yamux": suite "Timeout testing": asyncTest "Check if InTimeout close both streams correctly": mSetup(inTo = 1.seconds) - let blocker = newFuture[void]() - let connBlocker = newFuture[void]() - - yamuxb.streamHandler = proc(conn: Connection) {.async.} = - check (await conn.readLp(100)) == fromHex("1234") - await conn.writeLp(fromHex("5678")) - await blocker - check conn.isClosed - connBlocker.complete() + let blocker = newBlockerFut() + let connBlocker = newBlockerFut() + + yamuxb.streamHandler = proc(conn: Connection) {.async: (raises: []).} = + try: + check (await conn.readLp(100)) == fromHex("1234") + await conn.writeLp(fromHex("5678")) + await blocker + check conn.isClosed + connBlocker.complete() + except CancelledError, LPStreamError: + await conn.close() let streamA = await yamuxa.newStream() check streamA == yamuxa.getStreams()[0] @@ -265,15 +306,18 @@ suite "Yamux": asyncTest "Check if OutTimeout close both streams correctly": mSetup(outTo = 1.seconds) - let blocker = newFuture[void]() - let connBlocker = newFuture[void]() - - yamuxb.streamHandler = proc(conn: Connection) {.async.} = - check (await conn.readLp(100)) == fromHex("1234") - await conn.writeLp(fromHex("5678")) - await blocker - check conn.isClosed - connBlocker.complete() + let blocker = newBlockerFut() + let connBlocker = newBlockerFut() + + yamuxb.streamHandler = proc(conn: Connection) {.async: (raises: []).} = + try: + check (await conn.readLp(100)) == fromHex("1234") + await conn.writeLp(fromHex("5678")) + await blocker + check conn.isClosed + connBlocker.complete() + except CancelledError, LPStreamError: + await conn.close() let streamA = await yamuxa.newStream() check streamA == yamuxa.getStreams()[0] @@ -290,11 +334,18 @@ suite "Yamux": asyncTest "Local & Remote close": mSetup() - yamuxb.streamHandler = proc(conn: Connection) {.async.} = - check (await conn.readLp(100)) == fromHex("1234") - await conn.close() + yamuxb.streamHandler = proc(conn: Connection) {.async: (raises: []).} = + try: + check (await conn.readLp(100)) == fromHex("1234") + except CancelledError, LPStreamError: + return + finally: + await conn.close() expect LPStreamClosedError: await conn.writeLp(fromHex("102030")) - check (await conn.readLp(100)) == fromHex("5678") + try: + check (await conn.readLp(100)) == fromHex("5678") + except CancelledError, LPStreamError: + return let streamA = await yamuxa.newStream() check streamA == yamuxa.getStreams()[0] @@ -306,13 +357,17 @@ suite "Yamux": asyncTest "Local & Remote reset": mSetup() - let blocker = newFuture[void]() + let blocker = newBlockerFut() - yamuxb.streamHandler = proc(conn: Connection) {.async.} = + yamuxb.streamHandler = proc(conn: Connection) {.async: (raises: []).} = await blocker - expect LPStreamResetError: discard await conn.readLp(100) - expect LPStreamResetError: await conn.writeLp(fromHex("1234")) - await conn.close() + try: + expect LPStreamResetError: discard await conn.readLp(100) + expect LPStreamResetError: await conn.writeLp(fromHex("1234")) + except CancelledError, LPStreamError: + return + finally: + await conn.close() let streamA = await yamuxa.newStream() check streamA == yamuxa.getStreams()[0]