Skip to content

Commit

Permalink
Allow responses to be sent on the same connection.
Browse files Browse the repository at this point in the history
  • Loading branch information
Roman S. Borschel committed Feb 16, 2020
1 parent 51b7102 commit ae6bfe9
Show file tree
Hide file tree
Showing 14 changed files with 183 additions and 109 deletions.
1 change: 0 additions & 1 deletion core/src/connection/listeners.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,6 @@ where
mod tests {
use super::*;
use crate::transport;
use futures::prelude::*;

#[test]
fn incoming_event() {
Expand Down
17 changes: 10 additions & 7 deletions misc/core-derive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
let protocols_handler = quote!{::libp2p::swarm::ProtocolsHandler};
let into_proto_select_ident = quote!{::libp2p::swarm::IntoProtocolsHandlerSelect};
let peer_id = quote!{::libp2p::core::PeerId};
let connection_id = quote!{::libp2p::core::connection::ConnectionId};
let connected_point = quote!{::libp2p::core::ConnectedPoint};
let listener_id = quote!{::libp2p::core::connection::ListenerId};

Expand Down Expand Up @@ -294,10 +295,10 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
})
};

// Build the list of variants to put in the body of `inject_node_event()`.
// Build the list of variants to put in the body of `inject_event()`.
//
// The event type is a construction of nested `#either_ident`s of the events of the children.
// We call `inject_node_event` on the corresponding child.
// We call `inject_event` on the corresponding child.
let inject_node_event_stmts = data_struct.fields.iter().enumerate().filter(|f| !is_ignored(&f.1)).enumerate().map(|(enum_n, (field_n, field))| {
let mut elem = if enum_n != 0 {
quote!{ #either_ident::Second(ev) }
Expand All @@ -310,8 +311,8 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
}

Some(match field.ident {
Some(ref i) => quote!{ #elem => self.#i.inject_node_event(peer_id, ev) },
None => quote!{ #elem => self.#field_n.inject_node_event(peer_id, ev) },
Some(ref i) => quote!{ #elem => self.#i.inject_event(peer_id, connection_id, ev) },
None => quote!{ #elem => self.#field_n.inject_event(peer_id, connection_id, ev) },
})
});

Expand Down Expand Up @@ -411,9 +412,10 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
std::task::Poll::Ready(#network_behaviour_action::DialPeer { peer_id }) => {
return std::task::Poll::Ready(#network_behaviour_action::DialPeer { peer_id });
}
std::task::Poll::Ready(#network_behaviour_action::SendEvent { peer_id, event }) => {
return std::task::Poll::Ready(#network_behaviour_action::SendEvent {
std::task::Poll::Ready(#network_behaviour_action::NotifyHandler { peer_id, connection, event }) => {
return std::task::Poll::Ready(#network_behaviour_action::NotifyHandler {
peer_id,
connection,
event: #wrapped_event,
});
}
Expand Down Expand Up @@ -485,9 +487,10 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
#(#inject_listener_closed_stmts);*
}

fn inject_node_event(
fn inject_event(
&mut self,
peer_id: #peer_id,
connection_id: #connection_id,
event: <<Self::ProtocolsHandler as #into_protocols_handler>::Handler as #protocols_handler>::OutEvent
) {
match event {
Expand Down
12 changes: 10 additions & 2 deletions misc/mdns/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,14 @@

use crate::service::{MdnsService, MdnsPacket, build_query_response, build_service_discovery_response};
use futures::prelude::*;
use libp2p_core::{address_translation, ConnectedPoint, Multiaddr, PeerId, multiaddr::Protocol};
use libp2p_core::{
ConnectedPoint,
Multiaddr,
PeerId,
address_translation,
connection::ConnectionId,
multiaddr::Protocol
};
use libp2p_swarm::{
NetworkBehaviour,
NetworkBehaviourAction,
Expand Down Expand Up @@ -191,9 +198,10 @@ impl NetworkBehaviour for Mdns {

fn inject_disconnected(&mut self, _: &PeerId, _: ConnectedPoint) {}

fn inject_node_event(
fn inject_event(
&mut self,
_: PeerId,
_: ConnectionId,
_ev: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent,
) {
void::unreachable(_ev)
Expand Down
23 changes: 15 additions & 8 deletions protocols/floodsub/src/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::protocol::{FloodsubConfig, FloodsubMessage, FloodsubRpc, FloodsubSubs
use crate::topic::Topic;
use cuckoofilter::CuckooFilter;
use fnv::FnvHashSet;
use libp2p_core::{ConnectedPoint, Multiaddr, PeerId};
use libp2p_core::{ConnectedPoint, Multiaddr, PeerId, connection::ConnectionId};
use libp2p_swarm::{
NetworkBehaviour,
NetworkBehaviourAction,
Expand Down Expand Up @@ -80,8 +80,9 @@ impl Floodsub {
// Send our topics to this node if we're already connected to it.
if self.connected_peers.contains_key(&peer_id) {
for topic in self.subscribed_topics.iter().cloned() {
self.events.push_back(NetworkBehaviourAction::SendEvent {
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: peer_id.clone(),
connection: None,
event: FloodsubRpc {
messages: Vec::new(),
subscriptions: vec![FloodsubSubscription {
Expand Down Expand Up @@ -113,8 +114,9 @@ impl Floodsub {
}

for peer in self.connected_peers.keys() {
self.events.push_back(NetworkBehaviourAction::SendEvent {
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: peer.clone(),
connection: None,
event: FloodsubRpc {
messages: Vec::new(),
subscriptions: vec![FloodsubSubscription {
Expand Down Expand Up @@ -143,8 +145,9 @@ impl Floodsub {
self.subscribed_topics.remove(pos);

for peer in self.connected_peers.keys() {
self.events.push_back(NetworkBehaviourAction::SendEvent {
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: peer.clone(),
connection: None,
event: FloodsubRpc {
messages: Vec::new(),
subscriptions: vec![FloodsubSubscription {
Expand Down Expand Up @@ -208,8 +211,9 @@ impl Floodsub {
continue;
}

self.events.push_back(NetworkBehaviourAction::SendEvent {
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: peer_id.clone(),
connection: None,
event: FloodsubRpc {
subscriptions: Vec::new(),
messages: vec![message.clone()],
Expand All @@ -235,8 +239,9 @@ impl NetworkBehaviour for Floodsub {
// We need to send our subscriptions to the newly-connected node.
if self.target_peers.contains(&id) {
for topic in self.subscribed_topics.iter().cloned() {
self.events.push_back(NetworkBehaviourAction::SendEvent {
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: id.clone(),
connection: None,
event: FloodsubRpc {
messages: Vec::new(),
subscriptions: vec![FloodsubSubscription {
Expand All @@ -262,9 +267,10 @@ impl NetworkBehaviour for Floodsub {
}
}

fn inject_node_event(
fn inject_event(
&mut self,
propagation_source: PeerId,
_connection: ConnectionId,
event: InnerMessage,
) {
// We ignore successful sends event.
Expand Down Expand Up @@ -338,8 +344,9 @@ impl NetworkBehaviour for Floodsub {
}

for (peer_id, rpc) in rpcs_to_dispatch {
self.events.push_back(NetworkBehaviourAction::SendEvent {
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id,
connection: None,
event: rpc,
});
}
Expand Down
46 changes: 31 additions & 15 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::protocol::{
};
use crate::topic::{Topic, TopicHash};
use futures::prelude::*;
use libp2p_core::{ConnectedPoint, Multiaddr, PeerId};
use libp2p_core::{ConnectedPoint, Multiaddr, PeerId, connection::ConnectionId};
use libp2p_swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters, ProtocolsHandler};
use log::{debug, error, info, trace, warn};
use lru::LruCache;
Expand Down Expand Up @@ -146,8 +146,9 @@ impl Gossipsub {

for peer in peer_list {
debug!("Sending SUBSCRIBE to peer: {:?}", peer);
self.events.push_back(NetworkBehaviourAction::SendEvent {
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: peer.clone(),
connection: None,
event: event.clone(),
});
}
Expand Down Expand Up @@ -191,8 +192,9 @@ impl Gossipsub {

for peer in peer_list {
debug!("Sending UNSUBSCRIBE to peer: {:?}", peer);
self.events.push_back(NetworkBehaviourAction::SendEvent {
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: peer.clone(),
connection: None,
event: event.clone(),
});
}
Expand Down Expand Up @@ -281,8 +283,9 @@ impl Gossipsub {
// Send to peers we know are subscribed to the topic.
for peer_id in recipient_peers.iter() {
debug!("Sending message to peer: {:?}", peer_id);
self.events.push_back(NetworkBehaviourAction::SendEvent {
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: peer_id.clone(),
connection: None,
event: event.clone(),
});
}
Expand Down Expand Up @@ -461,8 +464,9 @@ impl Gossipsub {
debug!("IWANT: Sending cached messages to peer: {:?}", peer_id);
// Send the messages to the peer
let message_list = cached_messages.into_iter().map(|entry| entry.1).collect();
self.events.push_back(NetworkBehaviourAction::SendEvent {
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: peer_id.clone(),
connection: None,
event: Arc::new(GossipsubRpc {
subscriptions: Vec::new(),
messages: message_list,
Expand Down Expand Up @@ -508,8 +512,9 @@ impl Gossipsub {
"GRAFT: Not subscribed to topics - Sending PRUNE to peer: {:?}",
peer_id
);
self.events.push_back(NetworkBehaviourAction::SendEvent {
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: peer_id.clone(),
connection: None,
event: Arc::new(GossipsubRpc {
subscriptions: Vec::new(),
messages: Vec::new(),
Expand Down Expand Up @@ -851,8 +856,9 @@ impl Gossipsub {
grafts.append(&mut prunes);

// send the control messages
self.events.push_back(NetworkBehaviourAction::SendEvent {
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: peer.clone(),
connection: None,
event: Arc::new(GossipsubRpc {
subscriptions: Vec::new(),
messages: Vec::new(),
Expand All @@ -869,8 +875,9 @@ impl Gossipsub {
topic_hash: topic_hash.clone(),
})
.collect();
self.events.push_back(NetworkBehaviourAction::SendEvent {
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: peer.clone(),
connection: None,
event: Arc::new(GossipsubRpc {
subscriptions: Vec::new(),
messages: Vec::new(),
Expand Down Expand Up @@ -908,8 +915,9 @@ impl Gossipsub {

for peer in recipient_peers.iter() {
debug!("Sending message: {:?} to peer {:?}", msg_id, peer);
self.events.push_back(NetworkBehaviourAction::SendEvent {
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: peer.clone(),
connection: None,
event: event.clone(),
});
}
Expand Down Expand Up @@ -970,8 +978,9 @@ impl Gossipsub {
/// Takes each control action mapping and turns it into a message
fn flush_control_pool(&mut self) {
for (peer, controls) in self.control_pool.drain() {
self.events.push_back(NetworkBehaviourAction::SendEvent {
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: peer,
connection: None,
event: Arc::new(GossipsubRpc {
subscriptions: Vec::new(),
messages: Vec::new(),
Expand Down Expand Up @@ -1010,8 +1019,9 @@ impl NetworkBehaviour for Gossipsub {

if !subscriptions.is_empty() {
// send our subscriptions to the peer
self.events.push_back(NetworkBehaviourAction::SendEvent {
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: id.clone(),
connection: None,
event: Arc::new(GossipsubRpc {
messages: Vec::new(),
subscriptions,
Expand Down Expand Up @@ -1074,7 +1084,7 @@ impl NetworkBehaviour for Gossipsub {
debug_assert!(was_in.is_some());
}

fn inject_node_event(&mut self, propagation_source: PeerId, event: GossipsubRpc) {
fn inject_event(&mut self, propagation_source: PeerId, _: ConnectionId, event: GossipsubRpc) {
// Handle subscriptions
// Update connected peers topics
self.handle_received_subscriptions(&event.subscriptions, &propagation_source);
Expand Down Expand Up @@ -1128,16 +1138,22 @@ impl NetworkBehaviour for Gossipsub {
if let Some(event) = self.events.pop_front() {
// clone send event reference if others references are present
match event {
NetworkBehaviourAction::SendEvent {
NetworkBehaviourAction::NotifyHandler {
peer_id,
connection,
event: send_event,
} => match Arc::try_unwrap(send_event) {
Ok(event) => {
return Poll::Ready(NetworkBehaviourAction::SendEvent { peer_id, event });
return Poll::Ready(NetworkBehaviourAction::NotifyHandler {
peer_id,
connection,
event
});
}
Err(event) => {
return Poll::Ready(NetworkBehaviourAction::SendEvent {
return Poll::Ready(NetworkBehaviourAction::NotifyHandler {
peer_id,
connection,
event: (*event).clone(),
});
}
Expand Down
Loading

0 comments on commit ae6bfe9

Please sign in to comment.