Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core/swarm] Emit events for active connection close and fix disconnect(). #1619

Merged
merged 16 commits into from
Aug 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
- [`parity-multiaddr` CHANGELOG](misc/multiaddr/CHANGELOG.md)
- [`libp2p-core-derive` CHANGELOG](misc/core-derive/CHANGELOG.md)

# Version 0.24.0 [unreleased]

- Update `libp2p-core`, `libp2p-swarm` and dependent crates.

# Version 0.23.0 (2020-08-03)

**NOTE**: For a smooth upgrade path from `0.21` to `> 0.22`
Expand Down
42 changes: 21 additions & 21 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,23 +62,23 @@ atomic = "0.4.6"
bytes = "0.5"
futures = "0.3.1"
lazy_static = "1.2"
libp2p-core = { version = "0.20.0", path = "core" }
libp2p-core-derive = { version = "0.20.0", path = "misc/core-derive" }
libp2p-floodsub = { version = "0.20.0", path = "protocols/floodsub", optional = true }
libp2p-gossipsub = { version = "0.20.0", path = "./protocols/gossipsub", optional = true }
libp2p-identify = { version = "0.20.0", path = "protocols/identify", optional = true }
libp2p-kad = { version = "0.21.0", path = "protocols/kad", optional = true }
libp2p-mplex = { version = "0.20.0", path = "muxers/mplex", optional = true }
libp2p-noise = { version = "0.22.0", path = "protocols/noise", optional = true }
libp2p-ping = { version = "0.20.0", path = "protocols/ping", optional = true }
libp2p-plaintext = { version = "0.20.0", path = "protocols/plaintext", optional = true }
libp2p-core = { version = "0.21.0", path = "core" }
libp2p-core-derive = { version = "0.20.2", path = "misc/core-derive" }
libp2p-floodsub = { version = "0.21.0", path = "protocols/floodsub", optional = true }
libp2p-gossipsub = { version = "0.21.0", path = "./protocols/gossipsub", optional = true }
libp2p-identify = { version = "0.21.0", path = "protocols/identify", optional = true }
libp2p-kad = { version = "0.22.0", path = "protocols/kad", optional = true }
libp2p-mplex = { version = "0.21.0", path = "muxers/mplex", optional = true }
libp2p-noise = { version = "0.23.0", path = "protocols/noise", optional = true }
libp2p-ping = { version = "0.21.0", path = "protocols/ping", optional = true }
libp2p-plaintext = { version = "0.21.0", path = "protocols/plaintext", optional = true }
libp2p-pnet = { version = "0.19.1", path = "protocols/pnet", optional = true }
libp2p-request-response = { version = "0.1.0", path = "protocols/request-response", optional = true }
libp2p-secio = { version = "0.20.0", path = "protocols/secio", default-features = false, optional = true }
libp2p-swarm = { version = "0.20.0", path = "swarm" }
libp2p-uds = { version = "0.20.0", path = "transports/uds", optional = true }
libp2p-wasm-ext = { version = "0.20.0", path = "transports/wasm-ext", optional = true }
libp2p-yamux = { version = "0.20.0", path = "muxers/yamux", optional = true }
libp2p-request-response = { version = "0.2.0", path = "protocols/request-response", optional = true }
libp2p-secio = { version = "0.21.0", path = "protocols/secio", default-features = false, optional = true }
libp2p-swarm = { version = "0.21.0", path = "swarm" }
libp2p-uds = { version = "0.21.0", path = "transports/uds", optional = true }
libp2p-wasm-ext = { version = "0.21.0", path = "transports/wasm-ext", optional = true }
libp2p-yamux = { version = "0.21.0", path = "muxers/yamux", optional = true }
multiaddr = { package = "parity-multiaddr", version = "0.9.1", path = "misc/multiaddr" }
multihash = "0.11.0"
parking_lot = "0.10.0"
Expand All @@ -87,11 +87,11 @@ smallvec = "1.0"
wasm-timer = "0.2.4"

[target.'cfg(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")))'.dependencies]
libp2p-deflate = { version = "0.20.0", path = "protocols/deflate", optional = true }
libp2p-dns = { version = "0.20.0", path = "transports/dns", optional = true }
libp2p-mdns = { version = "0.20.0", path = "protocols/mdns", optional = true }
libp2p-tcp = { version = "0.20.0", path = "transports/tcp", optional = true }
libp2p-websocket = { version = "0.21.0", path = "transports/websocket", optional = true }
libp2p-deflate = { version = "0.21.0", path = "protocols/deflate", optional = true }
libp2p-dns = { version = "0.21.0", path = "transports/dns", optional = true }
libp2p-mdns = { version = "0.21.0", path = "protocols/mdns", optional = true }
libp2p-tcp = { version = "0.21.0", path = "transports/tcp", optional = true }
libp2p-websocket = { version = "0.22.0", path = "transports/websocket", optional = true }

[dev-dependencies]
async-std = "1.6.2"
Expand Down
13 changes: 13 additions & 0 deletions core/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
# 0.21.0 [unreleased]

- Refactoring of connection close and disconnect behaviour. In particular, the former
`NetworkEvent::ConnectionError` is now `NetworkEvent::ConnectionClosed` with the `error`
field being an `Option` and `None` indicating an active (but not necessarily orderly) close.
This guarantees that `ConnectionEstablished` events are always eventually paired
with `ConnectionClosed` events, regardless of how connections are closed.
Correspondingly, `EstablishedConnection::close` is now `EstablishedConnection::start_close`
to reflect that an orderly close completes asynchronously in the background, with the
outcome observed by continued polling of the `Network`. In contrast, `disconnect`ing
a peer takes effect immediately without an orderly connection shutdown.
See [PR 1619](https://github.com/libp2p/rust-libp2p/pull/1619) for further details.

# 0.20.1 [2020-17-17]

- Update ed25519-dalek dependency.
Expand Down
2 changes: 1 addition & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "libp2p-core"
edition = "2018"
description = "Core traits and structs of libp2p"
version = "0.20.1"
version = "0.21.0"
authors = ["Parity Technologies <[email protected]>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand Down
62 changes: 41 additions & 21 deletions core/src/connection/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,18 +196,19 @@ pub enum Event<'a, I, O, H, TE, HE, C> {
handler: H
},

/// An established connection has encountered an error.
ConnectionError {
/// An established connection has been closed.
ConnectionClosed {
/// The connection ID.
///
/// As a result of the error, the connection has been removed
/// from the `Manager` and is being closed. Hence this ID will
/// no longer resolve to a valid entry in the manager.
/// > **Note**: Closed connections are removed from the `Manager`.
/// > Hence this ID will no longer resolve to a valid entry in
/// > the manager.
id: ConnectionId,
/// Information about the connection that encountered the error.
/// Information about the closed connection.
connected: Connected<C>,
/// The error that occurred.
error: ConnectionError<HE>,
/// The error that occurred, if any. If `None`, the connection
/// has been actively closed.
error: Option<ConnectionError<HE>>,
},

/// A connection has been established.
Expand Down Expand Up @@ -348,11 +349,11 @@ impl<I, O, H, TE, HE, C> Manager<I, O, H, TE, HE, C> {
/// Polls the manager for events relating to the managed connections.
pub fn poll<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll<Event<'a, I, O, H, TE, HE, C>> {
// Advance the content of `local_spawns`.
while let Poll::Ready(Some(_)) = Stream::poll_next(Pin::new(&mut self.local_spawns), cx) {}
while let Poll::Ready(Some(_)) = self.local_spawns.poll_next_unpin(cx) {}

// Poll for the first event for which the manager still has a registered task, if any.
let event = loop {
match Stream::poll_next(Pin::new(&mut self.events_rx), cx) {
match self.events_rx.poll_next_unpin(cx) {
Poll::Ready(Some(event)) => {
if self.tasks.contains_key(event.id()) { // (1)
break event
Expand Down Expand Up @@ -397,19 +398,18 @@ impl<I, O, H, TE, HE, C> Manager<I, O, H, TE, HE, C> {
old_endpoint: old,
new_endpoint: new,
}
},
task::Event::Error { id, error } => {
}
task::Event::Closed { id, error } => {
let id = ConnectionId(id);
let task = task.remove();
match task.state {
TaskState::Established(connected) =>
Event::ConnectionError { id, connected, error },
Event::ConnectionClosed { id, connected, error },
TaskState::Pending => unreachable!(
"`Event::Error` implies (2) occurred on that task and thus (3)."
"`Event::Closed` implies (2) occurred on that task and thus (3)."
),
}
}

})
} else {
unreachable!("By (1)")
Expand Down Expand Up @@ -455,10 +455,11 @@ impl<'a, I, C> EstablishedEntry<'a, I, C> {
/// > task _may not be notified_ if sending the event fails due to
/// > the connection handler not being ready at this time.
pub fn notify_handler(&mut self, event: I) -> Result<(), I> {
let cmd = task::Command::NotifyHandler(event);
let cmd = task::Command::NotifyHandler(event); // (*)
self.task.get_mut().sender.try_send(cmd)
.map_err(|e| match e.into_inner() {
task::Command::NotifyHandler(event) => event
task::Command::NotifyHandler(event) => event,
_ => panic!("Unexpected command. Expected `NotifyHandler`") // see (*)
})
}

Expand All @@ -472,6 +473,22 @@ impl<'a, I, C> EstablishedEntry<'a, I, C> {
self.task.get_mut().sender.poll_ready(cx).map_err(|_| ())
}

/// Sends a close command to the associated background task,
/// thus initiating a graceful active close of the connection.
///
/// Has no effect if the connection is already closing.
///
/// When the connection is ultimately closed, [`Event::ConnectionClosed`]
/// is emitted by [`Manager::poll`].
pub fn start_close(mut self) {
// Clone the sender so that we are guaranteed to have
// capacity for the close command (every sender gets a slot).
match self.task.get_mut().sender.clone().try_send(task::Command::Close) {
Ok(()) => {},
Err(e) => assert!(e.is_disconnected(), "No capacity for close command.")
}
}

/// Obtains information about the established connection.
pub fn connected(&self) -> &Connected<C> {
match &self.task.get().state {
Expand All @@ -480,16 +497,18 @@ impl<'a, I, C> EstablishedEntry<'a, I, C> {
}
}

/// Closes the connection represented by this entry,
/// returning the connection information.
pub fn close(self) -> Connected<C> {
/// Instantly removes the entry from the manager, dropping
/// the command channel to the background task of the connection,
/// which will thus drop the connection asap without an orderly
/// close or emitting another event.
pub fn remove(self) -> Connected<C> {
match self.task.remove().state {
TaskState::Established(c) => c,
TaskState::Pending => unreachable!("By Entry::new()")
}
}

/// Returns the connection id.
/// Returns the connection ID.
pub fn id(&self) -> ConnectionId {
ConnectionId(*self.task.key())
}
Expand All @@ -513,3 +532,4 @@ impl<'a, I, C> PendingEntry<'a, I, C> {
self.task.remove();
}
}

Loading