Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add {.async: (raises).} to libp2p/stream modules #1050

Merged
merged 9 commits into from
Mar 5, 2024
29 changes: 19 additions & 10 deletions libp2p/muxers/mplex/coder.nim
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
Expand Down Expand Up @@ -42,7 +42,10 @@ const MaxMsgSize* = 1 shl 20 # 1mb
proc newInvalidMplexMsgType*(): ref InvalidMplexMsgType =
newException(InvalidMplexMsgType, "invalid message type")

proc readMsg*(conn: Connection): Future[Msg] {.async.} =
proc readMsg*(
conn: Connection
): Future[Msg] {.async: (raises: [
CancelledError, LPStreamError, MuxerError]).} =
let header = await conn.readVarint()
trace "read header varint", varint = header, conn

Expand All @@ -55,10 +58,13 @@ proc readMsg*(conn: Connection): Future[Msg] {.async.} =

return (header shr 3, MessageType(msgType), data)

proc writeMsg*(conn: Connection,
id: uint64,
msgType: MessageType,
data: seq[byte] = @[]): Future[void] =
proc writeMsg*(
conn: Connection,
id: uint64,
msgType: MessageType,
data: seq[byte] = @[]
): Future[void] {.async: (raises: [
CancelledError, LPStreamError], raw: true).} =
var
left = data.len
offset = 0
Expand All @@ -84,8 +90,11 @@ proc writeMsg*(conn: Connection,
# message gets written before some of the chunks
conn.write(buf.buffer)

proc writeMsg*(conn: Connection,
id: uint64,
msgType: MessageType,
data: string): Future[void] =
proc writeMsg*(
conn: Connection,
id: uint64,
msgType: MessageType,
data: string
): Future[void] {.async: (raises: [
CancelledError, LPStreamError], raw: true).} =
conn.writeMsg(id, msgType, data.toBytes())
88 changes: 49 additions & 39 deletions libp2p/muxers/mplex/lpchannel.nim
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
Expand Down Expand Up @@ -28,7 +28,8 @@
declareHistogram libp2p_mplex_qtime, "message queuing time"

when defined(libp2p_network_protocols_metrics):
declareCounter libp2p_protocols_bytes, "total sent or received bytes", ["protocol", "direction"]
declareCounter libp2p_protocols_bytes,
"total sent or received bytes", ["protocol", "direction"]

## Channel half-closed states
##
Expand Down Expand Up @@ -64,16 +65,16 @@

func shortLog*(s: LPChannel): auto =
try:
if s.isNil: "LPChannel(nil)"
if s == nil: "LPChannel(nil)"

Check warning on line 68 in libp2p/muxers/mplex/lpchannel.nim

View check run for this annotation

Codecov / codecov/patch

libp2p/muxers/mplex/lpchannel.nim#L68

Added line #L68 was not covered by tests
elif s.name != $s.oid and s.name.len > 0:
&"{shortLog(s.conn.peerId)}:{s.oid}:{s.name}"
else: &"{shortLog(s.conn.peerId)}:{s.oid}"
except ValueError as exc:
raise newException(Defect, exc.msg)
raiseAssert(exc.msg)

Check warning on line 73 in libp2p/muxers/mplex/lpchannel.nim

View check run for this annotation

Codecov / codecov/patch

libp2p/muxers/mplex/lpchannel.nim#L73

Added line #L73 was not covered by tests
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@etan-status should we be raising a Defect here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

raiseAssert raises a Defect :)


chronicles.formatIt(LPChannel): shortLog(it)

proc open*(s: LPChannel) {.async.} =
proc open*(s: LPChannel) {.async: (raises: [CancelledError, LPStreamError]).} =
trace "Opening channel", s, conn = s.conn
if s.conn.isClosed:
return
Expand All @@ -82,20 +83,20 @@
s.isOpen = true
except CancelledError as exc:
raise exc
except CatchableError as exc:
except LPStreamError as exc:
await s.conn.close()
raise exc

method closed*(s: LPChannel): bool =
s.closedLocal

proc closeUnderlying(s: LPChannel): Future[void] {.async.} =
proc closeUnderlying(s: LPChannel): Future[void] {.async: (raises: []).} =
## Channels may be closed for reading and writing in any order - we'll close
## the underlying bufferstream when both directions are closed
if s.closedLocal and s.atEof():
await procCall BufferStream(s).close()

proc reset*(s: LPChannel) {.async.} =
proc reset*(s: LPChannel) {.async: (raises: []).} =
if s.isClosed:
trace "Already closed", s
return
Expand All @@ -108,22 +109,21 @@

if s.isOpen and not s.conn.isClosed:
# If the connection is still active, notify the other end
proc resetMessage() {.async.} =
proc resetMessage() {.async: (raises: []).} =
try:
trace "sending reset message", s, conn = s.conn
await s.conn.writeMsg(s.id, s.resetCode) # write reset
except CatchableError as exc:
# No cancellations
await s.conn.close()
await noCancel s.conn.writeMsg(s.id, s.resetCode) # write reset
except LPStreamError as exc:
trace "Can't send reset message", s, conn = s.conn, msg = exc.msg
await s.conn.close()

asyncSpawn resetMessage()

await s.closeImpl() # noraises, nocancels
await s.closeImpl()

trace "Channel reset", s

method close*(s: LPChannel) {.async.} =
method close*(s: LPChannel) {.async: (raises: []).} =
## Close channel for writing - a message will be sent to the other peer
## informing them that the channel is closed and that we're waiting for
## their acknowledgement.
Expand All @@ -137,10 +137,9 @@
if s.isOpen and not s.conn.isClosed:
try:
await s.conn.writeMsg(s.id, s.closeCode) # write close
except CancelledError as exc:
except CancelledError:
await s.conn.close()
raise exc
except CatchableError as exc:
except LPStreamError as exc:
# It's harmless that close message cannot be sent - the connection is
# likely down already
await s.conn.close()
Expand All @@ -154,16 +153,17 @@
if s.objName.len == 0:
s.objName = LPChannelTrackerName

s.timeoutHandler = proc(): Future[void] {.gcsafe.} =
s.timeoutHandler = proc(): Future[void] {.async: (raises: [], raw: true).} =
trace "Idle timeout expired, resetting LPChannel", s
s.reset()

procCall BufferStream(s).initStream()

method readOnce*(s: LPChannel,
pbytes: pointer,
nbytes: int):
Future[int] {.async.} =
method readOnce*(
s: LPChannel,
pbytes: pointer,
nbytes: int
): Future[int] {.async: (raises: [CancelledError, LPStreamError]).} =
## Mplex relies on reading being done regularly from every channel, or all
## channels are blocked - in particular, this means that reading from one
## channel must not be done from within a callback / read handler of another
Expand All @@ -186,15 +186,19 @@
if bytes == 0:
await s.closeUnderlying()
return bytes
except CatchableError as exc:
# readOnce in BufferStream generally raises on EOF or cancellation - for
# the former, resetting is harmless, for the latter it's necessary because
# data has been lost in s.readBuf and there's no way to gracefully recover /
# use the channel any more
except CancelledError as exc:
await s.reset()
raise exc
except LPStreamError as exc:
# Resetting is necessary because data has been lost in s.readBuf and
# there's no way to gracefully recover / use the channel any more
await s.reset()
raise newLPStreamConnDownError(exc)

proc prepareWrite(s: LPChannel, msg: seq[byte]): Future[void] {.async.} =
proc prepareWrite(
s: LPChannel,
msg: seq[byte]
): Future[void] {.async: (raises: [CancelledError, LPStreamError]).} =
# prepareWrite is the slow path of writing a message - see conditions in
# write
if s.remoteReset:
Expand Down Expand Up @@ -222,7 +226,10 @@
await s.conn.writeMsg(s.id, s.msgCode, msg)

proc completeWrite(
s: LPChannel, fut: Future[void], msgLen: int): Future[void] {.async.} =
s: LPChannel,
fut: Future[void].Raising([CancelledError, LPStreamError]),
msgLen: int
): Future[void] {.async: (raises: [CancelledError, LPStreamError]).} =
try:
s.writes += 1

Expand Down Expand Up @@ -250,15 +257,19 @@
raise exc
except LPStreamEOFError as exc:
raise exc
except CatchableError as exc:
except LPStreamError as exc:
trace "exception in lpchannel write handler", s, msg = exc.msg
await s.reset()
await s.conn.close()
raise newLPStreamConnDownError(exc)
finally:
s.writes -= 1

method write*(s: LPChannel, msg: seq[byte]): Future[void] =
method write*(
s: LPChannel,
msg: seq[byte]
): Future[void] {.async: (raises: [
CancelledError, LPStreamError], raw: true).} =
## Write to mplex channel - there may be up to MaxWrite concurrent writes
## pending after which the peer is disconnected

Expand All @@ -279,13 +290,12 @@
method getWrapped*(s: LPChannel): Connection = s.conn

proc init*(
L: type LPChannel,
id: uint64,
conn: Connection,
initiator: bool,
name: string = "",
timeout: Duration = DefaultChanTimeout): LPChannel =

L: type LPChannel,
id: uint64,
conn: Connection,
initiator: bool,
name: string = "",
timeout: Duration = DefaultChanTimeout): LPChannel =
let chann = L(
id: id,
name: name,
Expand Down
Loading
Loading