Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

transports/webrtc: Implement stream message framing #9

Closed
wants to merge 14 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .git-blame-ignore-revs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# This file contains revisions that are to be ignored by git when running `git blame`.
#
# This does NOT work automatically, you first need to tell Git about this file.
# To do so, run `git config --global blame.ignoreRevsFile .git-blame-ignore-revs`.
# You may want to run this without `--global` if you have a different naming convention for this file in other repositories.
#
# Format with rustfmt
f701b24ec0f99be49444a6e7de950c66b01b2f3f
9 changes: 9 additions & 0 deletions misc/prost-codec/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,12 @@ pub enum Error {
std::io::Error,
),
}

impl From<Error> for std::io::Error {
fn from(e: Error) -> Self {
match e {
Error::Decode(e) => e.into(),
Error::Io(e) => e,
}
}
}
4 changes: 4 additions & 0 deletions protocols/gossipsub/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

- Update to `libp2p-swarm` `v0.39.0`.

- Allow publishing with any `impl Into<TopicHash>` as a topic. See [PR 2862].

[PR 2862]: https://github.com/libp2p/rust-libp2p/pull/2862

# 0.40.0

- Update prost requirement from 0.10 to 0.11 which no longer installs the protoc Protobuf compiler.
Expand Down
9 changes: 5 additions & 4 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -587,19 +587,20 @@ where
}

/// Publishes a message with multiple topics to the network.
pub fn publish<H: Hasher>(
pub fn publish(
&mut self,
topic: Topic<H>,
topic: impl Into<TopicHash>,
data: impl Into<Vec<u8>>,
) -> Result<MessageId, PublishError> {
let data = data.into();
let topic = topic.into();

// Transform the data before building a raw_message.
let transformed_data = self
.data_transform
.outbound_transform(&topic.hash(), data.clone())?;
.outbound_transform(&topic, data.clone())?;

let raw_message = self.build_raw_message(topic.into(), transformed_data)?;
let raw_message = self.build_raw_message(topic, transformed_data)?;

// calculate the message id from the un-transformed data
let msg_id = self.config.message_id(&GossipsubMessage {
Expand Down
6 changes: 6 additions & 0 deletions protocols/kad/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
# 0.40.0 [unreleased]

- Add support for multiple protocol names. Update `Kademlia`, `KademliaConfig`,
and `KademliaProtocolConfig` accordingly. See [Issue 2837]. See [PR 2846].

- Update to `libp2p-swarm` `v0.39.0`.

[Issue 2837]: https://github.com/libp2p/rust-libp2p/issues/2837
[PR 2846]: https://github.com/libp2p/rust-libp2p/pull/2846

# 0.39.0

- Update prost requirement from 0.10 to 0.11 which no longer installs the protoc Protobuf compiler.
Expand Down
22 changes: 18 additions & 4 deletions protocols/kad/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,14 +214,28 @@ impl Default for KademliaConfig {
}

impl KademliaConfig {
/// Sets custom protocol names.
///
/// Kademlia nodes only communicate with other nodes using the same protocol
/// name. Using custom name(s) therefore allows to segregate the DHT from
/// others, if that is desired.
///
/// More than one protocol name can be supplied. In this case the node will
/// be able to talk to other nodes supporting any of the provided names.
/// Multiple names must be used with caution to avoid network partitioning.
pub fn set_protocol_names(&mut self, names: Vec<Cow<'static, [u8]>>) -> &mut Self {
self.protocol_config.set_protocol_names(names);
self
}

/// Sets a custom protocol name.
///
/// Kademlia nodes only communicate with other nodes using the same protocol
/// name. Using a custom name therefore allows to segregate the DHT from
/// others, if that is desired.
#[deprecated(since = "0.40.0", note = "use `set_protocol_names()` instead")]
pub fn set_protocol_name(&mut self, name: impl Into<Cow<'static, [u8]>>) -> &mut Self {
self.protocol_config.set_protocol_name(name);
self
self.set_protocol_names(std::iter::once(name.into()).collect())
}

/// Sets the timeout for a single query.
Expand Down Expand Up @@ -403,8 +417,8 @@ where
}

/// Get the protocol name of this kademlia instance.
pub fn protocol_name(&self) -> &[u8] {
self.protocol_config.protocol_name()
pub fn protocol_names(&self) -> &[Cow<'static, [u8]>] {
self.protocol_config.protocol_names()
}

/// Creates a new `Kademlia` network behaviour with the given configuration.
Expand Down
23 changes: 15 additions & 8 deletions protocols/kad/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,21 +142,28 @@ impl From<KadPeer> for proto::message::Peer {
// `OutboundUpgrade` to be just a single message
#[derive(Debug, Clone)]
pub struct KademliaProtocolConfig {
protocol_name: Cow<'static, [u8]>,
protocol_names: Vec<Cow<'static, [u8]>>,
/// Maximum allowed size of a packet.
max_packet_size: usize,
}

impl KademliaProtocolConfig {
/// Returns the configured protocol name.
pub fn protocol_name(&self) -> &[u8] {
&self.protocol_name
pub fn protocol_names(&self) -> &[Cow<'static, [u8]>] {
&self.protocol_names
}

/// Modifies the protocol name used on the wire. Can be used to create incompatibilities
/// Modifies the protocol names used on the wire. Can be used to create incompatibilities
/// between networks on purpose.
pub fn set_protocol_names(&mut self, names: Vec<Cow<'static, [u8]>>) {
self.protocol_names = names;
}

/// Sets single protocol name used on the wire. Can be used to create incompatibilities
/// between networks on purpose.
#[deprecated(since = "0.40.0", note = "use `set_protocol_names()` instead")]
pub fn set_protocol_name(&mut self, name: impl Into<Cow<'static, [u8]>>) {
self.protocol_name = name.into();
self.set_protocol_names(std::iter::once(name.into()).collect());
}

/// Modifies the maximum allowed size of a single Kademlia packet.
Expand All @@ -168,18 +175,18 @@ impl KademliaProtocolConfig {
impl Default for KademliaProtocolConfig {
fn default() -> Self {
KademliaProtocolConfig {
protocol_name: Cow::Borrowed(DEFAULT_PROTO_NAME),
protocol_names: iter::once(Cow::Borrowed(DEFAULT_PROTO_NAME)).collect(),
max_packet_size: DEFAULT_MAX_PACKET_SIZE,
}
}
}

impl UpgradeInfo for KademliaProtocolConfig {
type Info = Cow<'static, [u8]>;
type InfoIter = iter::Once<Self::Info>;
type InfoIter = std::vec::IntoIter<Self::Info>;

fn protocol_info(&self) -> Self::InfoIter {
iter::once(self.protocol_name.clone())
self.protocol_names.clone().into_iter()
}
}

Expand Down
9 changes: 9 additions & 0 deletions transports/webrtc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ keywords = ["peer-to-peer", "libp2p", "networking"]
categories = ["network-programming", "asynchronous"]

[dependencies]
asynchronous-codec = "0.6"
async-trait = "0.1"
bytes = "1"
fnv = "1.0"
Expand All @@ -22,18 +23,26 @@ libp2p-core = { version = "0.35.0", path = "../../core", default-features = fals
libp2p-noise = { version = "0.38.0", path = "../../transports/noise" }
log = "0.4"
multihash = { version = "0.16", default-features = false, features = ["sha2"] }
prost = "0.11"
prost-codec = { version = "0.2", path = "../../misc/prost-codec" }
rand = "0.8"
serde = { version = "1.0", features = ["derive"] }
stun = "0.4"
thiserror = "1"
tinytemplate = "1.2"
tokio-crate = { package = "tokio", version = "1.18", features = ["net"]}
tokio-util = { version = "0.7", features = ["compat"] }
webrtc = "0.4.0"

[build-dependencies]
prost-build = "0.11"

[dev-dependencies]
anyhow = "1.0"
env_logger = "0.9"
libp2p = { path = "../..", features = ["request-response"], default-features = false }
rand_core = "0.5"
rcgen = "0.9"
quickcheck = "1"
unsigned-varint = { version = "0.7", features = ["asynchronous_codec"] }
asynchronous-codec = { version = "0.6" }
23 changes: 23 additions & 0 deletions transports/webrtc/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright 2022 Protocol Labs.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

fn main() {
prost_build::compile_protos(&["src/message.proto"], &["src"]).unwrap();
}
12 changes: 6 additions & 6 deletions transports/webrtc/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

mod poll_data_channel;
mod substream;

use futures::{
channel::{
Expand All @@ -42,7 +42,7 @@ use std::{
};

use crate::error::Error;
pub(crate) use poll_data_channel::PollDataChannel;
pub(crate) use substream::Substream;

const MAX_DATA_CHANNELS_IN_FLIGHT: usize = 10;

Expand All @@ -57,7 +57,7 @@ pub struct Connection {
incoming_data_channels_rx: mpsc::Receiver<Arc<DetachedDataChannel>>,

/// Temporary read buffer's capacity (equal for all data channels).
/// See [`PollDataChannel`] `read_buf_cap`.
/// See [`Substream`] `read_buf_cap`.
read_buf_cap: Option<usize>,

/// Future, which, once polled, will result in an outbound substream.
Expand Down Expand Up @@ -149,7 +149,7 @@ impl Connection {
}

impl<'a> StreamMuxer for Connection {
type Substream = PollDataChannel;
type Substream = Substream;
type Error = Error;

fn poll_inbound(
Expand All @@ -160,7 +160,7 @@ impl<'a> StreamMuxer for Connection {
Some(detached) => {
trace!("Incoming substream {}", detached.stream_identifier());

let mut ch = PollDataChannel::new(detached);
let mut ch = Substream::new(detached);
if let Some(cap) = self.read_buf_cap {
ch.set_read_buf_capacity(cap);
}
Expand Down Expand Up @@ -213,7 +213,7 @@ impl<'a> StreamMuxer for Connection {

match ready!(fut.as_mut().poll(cx)) {
Ok(detached) => {
let mut ch = PollDataChannel::new(detached);
let mut ch = Substream::new(detached);
if let Some(cap) = self.read_buf_cap {
ch.set_read_buf_capacity(cap);
}
Expand Down
Loading