From 08e5a2c291c885634eec14dfd0a40f6441dd2161 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 10 Apr 2023 14:33:32 +0100 Subject: [PATCH 1/2] Don't use outboundopeninfo --- protocols/kad/src/handler.rs | 74 +++++++++++++++--------------------- 1 file changed, 31 insertions(+), 43 deletions(-) diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index 22c1253dd1e..752a04b2879 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -67,8 +67,7 @@ pub struct KademliaHandler { /// List of outbound substreams that are waiting to become active next. /// Contains the request we want to send, and the user data if we expect an answer. - requested_streams: - VecDeque)>>, + requested_streams: VecDeque<(KadRequestMsg, Option)>, /// List of active inbound substreams with the state they are in. inbound_substreams: SelectAll>, @@ -507,19 +506,22 @@ where fn on_fully_negotiated_outbound( &mut self, - FullyNegotiatedOutbound { - protocol, - info: (msg, user_data), - }: FullyNegotiatedOutbound< + FullyNegotiatedOutbound { protocol, info: () }: FullyNegotiatedOutbound< ::OutboundProtocol, ::OutboundOpenInfo, >, ) { - self.outbound_substreams - .push(OutboundSubstreamState::PendingSend( - protocol, msg, user_data, - )); + if let Some((msg, user_data)) = self.requested_streams.pop_front() { + self.outbound_substreams + .push(OutboundSubstreamState::PendingSend( + protocol, msg, user_data, + )); + } else { + debug_assert!(false, "Requested outbound stream without message") + } + self.num_requested_outbound_streams -= 1; + if let ProtocolStatus::Unconfirmed = self.protocol_status { // Upon the first successfully negotiated substream, we know that the // remote is configured with the same protocol name and we want @@ -587,9 +589,7 @@ where fn on_dial_upgrade_error( &mut self, DialUpgradeError { - info: (_, user_data), - error, - .. + info: (), error, .. }: DialUpgradeError< ::OutboundOpenInfo, ::OutboundProtocol, @@ -597,10 +597,12 @@ where ) { // TODO: cache the fact that the remote doesn't support kademlia at all, so that we don't // continue trying - if let Some(user_data) = user_data { + + if let Some((_, Some(user_data))) = self.requested_streams.pop_front() { self.outbound_substreams .push(OutboundSubstreamState::ReportError(error.into(), user_data)); } + self.num_requested_outbound_streams -= 1; } } @@ -614,8 +616,7 @@ where type Error = io::Error; // TODO: better error type? type InboundProtocol = Either; type OutboundProtocol = KademliaProtocolConfig; - // Message of the request to send to the remote, and user data if we expect an answer. - type OutboundOpenInfo = (KadRequestMsg, Option); + type OutboundOpenInfo = (); type InboundOpenInfo = (); fn listen_protocol(&self) -> SubstreamProtocol { @@ -645,10 +646,7 @@ where } KademliaHandlerIn::FindNodeReq { key, user_data } => { let msg = KadRequestMsg::FindNode { key }; - self.requested_streams.push_back(SubstreamProtocol::new( - self.config.protocol_config.clone(), - (msg, Some(user_data)), - )); + self.requested_streams.push_back((msg, Some(user_data))); } KademliaHandlerIn::FindNodeRes { closer_peers, @@ -656,10 +654,7 @@ where } => self.answer_pending_request(request_id, KadResponseMsg::FindNode { closer_peers }), KademliaHandlerIn::GetProvidersReq { key, user_data } => { let msg = KadRequestMsg::GetProviders { key }; - self.requested_streams.push_back(SubstreamProtocol::new( - self.config.protocol_config.clone(), - (msg, Some(user_data)), - )); + self.requested_streams.push_back((msg, Some(user_data))); } KademliaHandlerIn::GetProvidersRes { closer_peers, @@ -674,24 +669,15 @@ where ), KademliaHandlerIn::AddProvider { key, provider } => { let msg = KadRequestMsg::AddProvider { key, provider }; - self.requested_streams.push_back(SubstreamProtocol::new( - self.config.protocol_config.clone(), - (msg, None), - )); + self.requested_streams.push_back((msg, None)); } KademliaHandlerIn::GetRecord { key, user_data } => { let msg = KadRequestMsg::GetValue { key }; - self.requested_streams.push_back(SubstreamProtocol::new( - self.config.protocol_config.clone(), - (msg, Some(user_data)), - )); + self.requested_streams.push_back((msg, Some(user_data))); } KademliaHandlerIn::PutRecord { record, user_data } => { let msg = KadRequestMsg::PutValue { record }; - self.requested_streams.push_back(SubstreamProtocol::new( - self.config.protocol_config.clone(), - (msg, Some(user_data)), - )); + self.requested_streams.push_back((msg, Some(user_data))); } KademliaHandlerIn::GetRecordRes { record, @@ -750,11 +736,13 @@ where let num_in_progress_outbound_substreams = self.outbound_substreams.len() + self.num_requested_outbound_streams; - if num_in_progress_outbound_substreams < MAX_NUM_SUBSTREAMS { - if let Some(protocol) = self.requested_streams.pop_front() { - self.num_requested_outbound_streams += 1; - return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol }); - } + if num_in_progress_outbound_substreams < MAX_NUM_SUBSTREAMS + && !self.requested_streams.is_empty() + { + self.num_requested_outbound_streams += 1; + return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { + protocol: SubstreamProtocol::new(self.config.protocol_config.clone(), ()), + }); } if self.outbound_substreams.is_empty() && self.inbound_substreams.is_empty() { @@ -825,7 +813,7 @@ where { type Item = ConnectionHandlerEvent< KademliaProtocolConfig, - (KadRequestMsg, Option), + (), KademliaHandlerEvent, io::Error, >; @@ -961,7 +949,7 @@ where { type Item = ConnectionHandlerEvent< KademliaProtocolConfig, - (KadRequestMsg, Option), + (), KademliaHandlerEvent, io::Error, >; From 2a24f1c8c3e4f79de6b0f241e7affb65a477d2ff Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 11 Apr 2023 23:56:48 +0200 Subject: [PATCH 2/2] Use correct condition for requesting new streams --- protocols/kad/src/handler_priv.rs | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/protocols/kad/src/handler_priv.rs b/protocols/kad/src/handler_priv.rs index 7a1e544b8eb..5774212b88a 100644 --- a/protocols/kad/src/handler_priv.rs +++ b/protocols/kad/src/handler_priv.rs @@ -67,7 +67,7 @@ pub struct KademliaHandler { /// List of outbound substreams that are waiting to become active next. /// Contains the request we want to send, and the user data if we expect an answer. - requested_streams: VecDeque<(KadRequestMsg, Option)>, + pending_messages: VecDeque<(KadRequestMsg, Option)>, /// List of active inbound substreams with the state they are in. inbound_substreams: SelectAll>, @@ -498,7 +498,7 @@ where inbound_substreams: Default::default(), outbound_substreams: Default::default(), num_requested_outbound_streams: 0, - requested_streams: Default::default(), + pending_messages: Default::default(), keep_alive, protocol_status: ProtocolStatus::Unconfirmed, } @@ -511,7 +511,7 @@ where ::OutboundOpenInfo, >, ) { - if let Some((msg, user_data)) = self.requested_streams.pop_front() { + if let Some((msg, user_data)) = self.pending_messages.pop_front() { self.outbound_substreams .push(OutboundSubstreamState::PendingSend( protocol, msg, user_data, @@ -598,7 +598,7 @@ where // TODO: cache the fact that the remote doesn't support kademlia at all, so that we don't // continue trying - if let Some((_, Some(user_data))) = self.requested_streams.pop_front() { + if let Some((_, Some(user_data))) = self.pending_messages.pop_front() { self.outbound_substreams .push(OutboundSubstreamState::ReportError(error.into(), user_data)); } @@ -646,7 +646,7 @@ where } KademliaHandlerIn::FindNodeReq { key, user_data } => { let msg = KadRequestMsg::FindNode { key }; - self.requested_streams.push_back((msg, Some(user_data))); + self.pending_messages.push_back((msg, Some(user_data))); } KademliaHandlerIn::FindNodeRes { closer_peers, @@ -654,7 +654,7 @@ where } => self.answer_pending_request(request_id, KadResponseMsg::FindNode { closer_peers }), KademliaHandlerIn::GetProvidersReq { key, user_data } => { let msg = KadRequestMsg::GetProviders { key }; - self.requested_streams.push_back((msg, Some(user_data))); + self.pending_messages.push_back((msg, Some(user_data))); } KademliaHandlerIn::GetProvidersRes { closer_peers, @@ -669,15 +669,15 @@ where ), KademliaHandlerIn::AddProvider { key, provider } => { let msg = KadRequestMsg::AddProvider { key, provider }; - self.requested_streams.push_back((msg, None)); + self.pending_messages.push_back((msg, None)); } KademliaHandlerIn::GetRecord { key, user_data } => { let msg = KadRequestMsg::GetValue { key }; - self.requested_streams.push_back((msg, Some(user_data))); + self.pending_messages.push_back((msg, Some(user_data))); } KademliaHandlerIn::PutRecord { record, user_data } => { let msg = KadRequestMsg::PutValue { record }; - self.requested_streams.push_back((msg, Some(user_data))); + self.pending_messages.push_back((msg, Some(user_data))); } KademliaHandlerIn::GetRecordRes { record, @@ -737,7 +737,7 @@ where let num_in_progress_outbound_substreams = self.outbound_substreams.len() + self.num_requested_outbound_streams; if num_in_progress_outbound_substreams < MAX_NUM_SUBSTREAMS - && !self.requested_streams.is_empty() + && self.num_requested_outbound_streams < self.pending_messages.len() { self.num_requested_outbound_streams += 1; return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest {