Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(iroh-net): Add a Watchable struct for use in the Endpoint API #2806

Merged
merged 32 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
70f61a8
feat(iroh-net): Ported & modified `Watcher` implementation
matheus23 Oct 1, 2024
f946f07
feat(iroh-net): Add a Watchable struct
flub Oct 15, 2024
9982640
Merge branch 'main' into flub/watchable
matheus23 Nov 20, 2024
daa3b67
Test with loom, fix race condition, switch to Mutex for wakers
matheus23 Nov 20, 2024
1141fc9
Correct spelling
matheus23 Nov 20, 2024
22498c2
Improved `Watchable` and `Watcher` impl & API
matheus23 Nov 21, 2024
be17d23
Whoops fix test
matheus23 Nov 21, 2024
0e6bdf7
Compiles but fails tests
matheus23 Nov 22, 2024
a49e2a6
Merge branch 'main' into flub/watchable
matheus23 Dec 6, 2024
fe813ca
Fix `Watchable::set` implementation
matheus23 Dec 6, 2024
4a24c89
Address code review, make `Watcher` and `Watchable` available in pub API
matheus23 Dec 10, 2024
d287847
Merge branch 'main' into flub/watchable
matheus23 Dec 11, 2024
2de930b
Make `Watchable` not uninitialized by default.
matheus23 Dec 11, 2024
675e04d
Fix a possible race condition
matheus23 Dec 11, 2024
a186583
Adjust documentation to match implementation
matheus23 Dec 11, 2024
4dea829
Merge remote-tracking branch 'origin/main' into flub/watchable
matheus23 Dec 11, 2024
f7d4c24
Convert `DirectAddrStream` into `Watcher<Option<BTreeSet<DirectAddr>>>`
matheus23 Dec 11, 2024
3f0f65b
Merge `Endpoint::{home_relay, watch_home_realy}` together via a `Watc…
matheus23 Dec 11, 2024
b76f11b
Move `iroh::util::watchable` to `iroh::watchable` and make `util` `pu…
matheus23 Dec 11, 2024
90e825d
Adjust `util::watchable` imports to use `watchable`
matheus23 Dec 12, 2024
c6cde17
Merge remote-tracking branch 'origin/main' into flub/watchable
matheus23 Dec 12, 2024
c824573
`cargo +nightly clippy -p iroh --fix`
matheus23 Dec 12, 2024
8e485bb
`cargo make format`
matheus23 Dec 12, 2024
1974629
Small fixes
matheus23 Dec 12, 2024
890cb87
Merge remote-tracking branch 'origin/main' into flub/watchable
matheus23 Dec 12, 2024
e26a7f9
Add documentation about cancel safety and a small test
matheus23 Dec 12, 2024
bd58548
Merge remote-tracking branch 'origin/main' into flub/watchable
matheus23 Dec 12, 2024
6ca0f71
Better wording in documentation
matheus23 Dec 12, 2024
86bc24b
Remove unused imports
matheus23 Dec 12, 2024
37dead3
Fix doc links
matheus23 Dec 12, 2024
ac9d9d2
Fix newly introduced clippy lints
matheus23 Dec 12, 2024
6f078e9
Use `Watchable::default()` when appropriate
matheus23 Dec 13, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 0 additions & 24 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ missing_debug_implementations = "warn"
# require a feature enabled when using `--cfg docsrs` which we can not
# do. To enable for a crate set `#![cfg_attr(iroh_docsrs,
# feature(doc_auto_cfg))]` in the crate.
unexpected_cfgs = { level = "warn", check-cfg = ["cfg(iroh_docsrs)"] }
# We also have our own `iroh_loom` cfg to enable tokio-rs/loom testing.
unexpected_cfgs = { level = "warn", check-cfg = ["cfg(iroh_docsrs)", "cfg(iroh_loom)"] }

[workspace.lints.clippy]
unused-async = "warn"
1 change: 0 additions & 1 deletion iroh/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ tokio-tungstenite-wasm = "0.3"
tokio-util = { version = "0.7", features = ["io-util", "io", "codec", "rt"] }
tracing = "0.1"
url = { version = "2.5", features = ["serde"] }
watchable = "1.1.2"
webpki = { package = "rustls-webpki", version = "0.102" }
webpki-roots = "0.26"
x509-parser = "0.16"
Expand Down
5 changes: 2 additions & 3 deletions iroh/bench/src/iroh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use std::{

use anyhow::{Context, Result};
use bytes::Bytes;
use futures_lite::StreamExt as _;
use iroh::{
endpoint::{Connection, ConnectionError, RecvStream, SendStream, TransportConfig},
Endpoint, NodeAddr, RelayMap, RelayMode, RelayUrl,
Expand Down Expand Up @@ -45,7 +44,7 @@ pub fn server_endpoint(
.unwrap();

if relay_url.is_some() {
ep.watch_home_relay().next().await;
ep.home_relay().initialized().await.unwrap();
}

let addr = ep.bound_sockets();
Expand Down Expand Up @@ -101,7 +100,7 @@ pub async fn connect_client(
.unwrap();

if relay_url.is_some() {
endpoint.watch_home_relay().next().await;
endpoint.home_relay().initialized().await?;
}

// TODO: We don't support passing client transport config currently
Expand Down
24 changes: 9 additions & 15 deletions iroh/examples/connect-unreliable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@
//! Run the `listen-unreliable` example first (`iroh/examples/listen-unreliable.rs`), which will give you instructions on how to run this example to watch two nodes connect and exchange bytes.
use std::net::SocketAddr;

use anyhow::Context;
use clap::Parser;
use futures_lite::StreamExt;
use iroh::{Endpoint, NodeAddr, RelayMode, RelayUrl, SecretKey};
use tracing::info;

Expand Down Expand Up @@ -52,21 +50,17 @@ async fn main() -> anyhow::Result<()> {
.bind()
.await?;

let me = endpoint.node_id();
let node_addr = endpoint.node_addr().await?;
let me = node_addr.node_id;
println!("node id: {me}");
println!("node listening addresses:");
for local_endpoint in endpoint
.direct_addresses()
.next()
.await
.context("no endpoints")?
{
println!("\t{}", local_endpoint.addr)
}

let relay_url = endpoint
.home_relay()
.expect("should be connected to a relay server, try calling `endpoint.local_endpoints()` or `endpoint.connect()` first, to ensure the endpoint has actually attempted a connection before checking for the connected relay server");
node_addr
.direct_addresses
.iter()
.for_each(|addr| println!("\t{addr}"));
let relay_url = node_addr
.relay_url
.expect("Should have a relay URL, assuming a default endpoint setup.");
println!("node relay server url: {relay_url}\n");
// Build a `NodeAddr` from the node_id, relay url, and UDP addresses.
let addr = NodeAddr::from_parts(args.node_id, Some(args.relay_url), args.addrs);
Expand Down
7 changes: 4 additions & 3 deletions iroh/examples/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use std::net::SocketAddr;

use anyhow::Context;
use clap::Parser;
use futures_lite::StreamExt;
use iroh::{Endpoint, NodeAddr, RelayMode, RelayUrl, SecretKey};
use tracing::info;

Expand Down Expand Up @@ -57,15 +56,17 @@ async fn main() -> anyhow::Result<()> {
println!("node listening addresses:");
for local_endpoint in endpoint
.direct_addresses()
.next()
.initialized()
.await
.context("no endpoints")?
.context("no direct addresses")?
{
println!("\t{}", local_endpoint.addr)
}

let relay_url = endpoint
.home_relay()
.get()
.unwrap()
.expect("should be connected to a relay server, try calling `endpoint.local_endpoints()` or `endpoint.connect()` first, to ensure the endpoint has actually attempted a connection before checking for the connected relay server");
println!("node relay server url: {relay_url}\n");
// Build a `NodeAddr` from the node_id, relay url, and UDP addresses.
Expand Down
21 changes: 8 additions & 13 deletions iroh/examples/listen-unreliable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
//! This example uses the default relay servers to attempt to holepunch, and will use that relay server to relay packets if the two devices cannot establish a direct UDP connection.
//! run this example from the project root:
//! $ cargo run --example listen-unreliable
use anyhow::Context;
use futures_lite::StreamExt;
use iroh::{Endpoint, RelayMode, SecretKey};
use tracing::{info, warn};

Expand Down Expand Up @@ -37,23 +35,20 @@ async fn main() -> anyhow::Result<()> {
println!("node id: {me}");
println!("node listening addresses:");

let local_addrs = endpoint
.direct_addresses()
.next()
.await
.context("no endpoints")?
let node_addr = endpoint.node_addr().await?;
let local_addrs = node_addr
.direct_addresses
.into_iter()
.map(|endpoint| {
let addr = endpoint.addr.to_string();
.map(|addr| {
let addr = addr.to_string();
println!("\t{addr}");
addr
})
.collect::<Vec<_>>()
.join(" ");

let relay_url = endpoint
.home_relay()
.expect("should be connected to a relay server, try calling `endpoint.local_endpoints()` or `endpoint.connect()` first, to ensure the endpoint has actually attempted a connection before checking for the connected relay server");
let relay_url = node_addr
.relay_url
.expect("Should have a relay URL, assuming a default endpoint setup.");
println!("node relay server url: {relay_url}");
println!("\nin a separate terminal run:");

Expand Down
21 changes: 8 additions & 13 deletions iroh/examples/listen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
//! $ cargo run --example listen
use std::time::Duration;

use anyhow::Context;
use futures_lite::StreamExt;
use iroh::{endpoint::ConnectionError, Endpoint, RelayMode, SecretKey};
use tracing::{debug, info, warn};

Expand Down Expand Up @@ -39,23 +37,20 @@ async fn main() -> anyhow::Result<()> {
println!("node id: {me}");
println!("node listening addresses:");

let local_addrs = endpoint
.direct_addresses()
.next()
.await
.context("no endpoints")?
let node_addr = endpoint.node_addr().await?;
let local_addrs = node_addr
.direct_addresses
.into_iter()
.map(|endpoint| {
let addr = endpoint.addr.to_string();
.map(|addr| {
let addr = addr.to_string();
println!("\t{addr}");
addr
})
.collect::<Vec<_>>()
.join(" ");

let relay_url = endpoint
.home_relay()
.expect("should be connected to a relay server, try calling `endpoint.local_endpoints()` or `endpoint.connect()` first, to ensure the endpoint has actually attempted a connection before checking for the connected relay server");
let relay_url = node_addr
.relay_url
.expect("Should have a relay URL, assuming a default endpoint setup.");
println!("node relay server url: {relay_url}");
println!("\nin a separate terminal run:");

Expand Down
22 changes: 6 additions & 16 deletions iroh/examples/transfer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use std::{
use anyhow::{Context, Result};
use bytes::Bytes;
use clap::{Parser, Subcommand};
use futures_lite::StreamExt;
use indicatif::HumanBytes;
use iroh::{
endpoint::ConnectionError, Endpoint, NodeAddr, RelayMap, RelayMode, RelayUrl, SecretKey,
Expand Down Expand Up @@ -71,23 +70,18 @@ async fn provide(size: u64, relay_url: Option<String>) -> anyhow::Result<()> {

let node_id = endpoint.node_id();

for local_endpoint in endpoint
.direct_addresses()
.next()
.await
.context("no endpoints")?
{
for local_endpoint in endpoint.direct_addresses().initialized().await? {
println!("\t{}", local_endpoint.addr)
}

let relay_url = endpoint
.home_relay()
.get()?
.expect("should be connected to a relay server");
let local_addrs = endpoint
.direct_addresses()
.next()
.await
.context("no endpoints")?
.initialized()
.await?
.into_iter()
.map(|endpoint| endpoint.addr)
.collect::<Vec<_>>();
Expand Down Expand Up @@ -171,17 +165,13 @@ async fn fetch(ticket: &str, relay_url: Option<String>) -> anyhow::Result<()> {
let me = endpoint.node_id();
println!("node id: {me}");
println!("node listening addresses:");
for local_endpoint in endpoint
.direct_addresses()
.next()
.await
.context("no endpoints")?
{
for local_endpoint in endpoint.direct_addresses().initialized().await? {
println!("\t{}", local_endpoint.addr)
}

let relay_url = endpoint
.home_relay()
.get()?
.expect("should be connected to a relay server, try calling `endpoint.local_endpoints()` or `endpoint.connect()` first, to ensure the endpoint has actually attempted a connection before checking for the connected relay server");
println!("node relay server url: {relay_url}\n");

Expand Down
24 changes: 2 additions & 22 deletions iroh/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,7 @@ mod tests {
}
}
}

#[derive(Debug)]
struct TestDiscovery {
node_id: NodeId,
Expand Down Expand Up @@ -578,7 +579,7 @@ mod tests {
new_endpoint(secret, disco).await
};
let ep1_addr = NodeAddr::new(ep1.node_id());
// wait for out address to be updated and thus published at least once
// wait for our address to be updated and thus published at least once
ep1.node_addr().await?;
let _conn = ep2.connect(ep1_addr, TEST_ALPN).await?;
Ok(())
Expand Down Expand Up @@ -833,27 +834,6 @@ mod test_dns_pkarr {
Ok(())
}

#[tokio::test]
async fn pkarr_publish_dns_discover_empty_node_addr() -> Result<()> {
let _logging_guard = iroh_test::logging::setup();

let dns_pkarr_server = DnsPkarrServer::run().await?;
let (relay_map, _relay_url, _relay_guard) = run_relay_server().await?;

let (ep1, _guard1) = ep_with_discovery(&relay_map, &dns_pkarr_server).await?;
let (ep2, _guard2) = ep_with_discovery(&relay_map, &dns_pkarr_server).await?;

// wait until our shared state received the update from pkarr publishing
dns_pkarr_server
.on_node(&ep1.node_id(), PUBLISH_TIMEOUT)
.await?;

// we connect only by node id!
let res = ep2.connect(ep1.node_id(), TEST_ALPN).await;
assert!(res.is_ok(), "connection established");
Ok(())
}

async fn ep_with_discovery(
relay_map: &RelayMap,
dns_pkarr_server: &DnsPkarrServer,
Expand Down
11 changes: 6 additions & 5 deletions iroh/src/discovery/local_swarm_discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ use tokio::{
};
use tokio_util::task::AbortOnDropHandle;
use tracing::{debug, error, info_span, trace, warn, Instrument};
use watchable::Watchable;

use crate::{
discovery::{Discovery, DiscoveryItem},
watchable::Watchable,
Endpoint,
};

Expand Down Expand Up @@ -152,8 +152,8 @@ impl LocalSwarmDiscovery {
)?;

let local_addrs: Watchable<Option<(Option<RelayUrl>, BTreeSet<SocketAddr>)>> =
Watchable::new(None);
let addrs_change = local_addrs.watch();
Watchable::default();
let mut addrs_change = local_addrs.watch();
let discovery_fut = async move {
let mut node_addrs: HashMap<PublicKey, Peer> = HashMap::default();
let mut subscribers = Subscribers::new();
Expand All @@ -169,7 +169,7 @@ impl LocalSwarmDiscovery {
msg = recv.recv() => {
msg
}
Ok(Some((_url, addrs)))= addrs_change.next_value_async() => {
Ok(Some((_url, addrs))) = addrs_change.updated() => {
tracing::trace!(?addrs, "LocalSwarmDiscovery address changed");
discovery.remove_all();
let addrs =
Expand Down Expand Up @@ -379,7 +379,8 @@ impl Discovery for LocalSwarmDiscovery {

fn publish(&self, url: Option<&RelayUrl>, addrs: &BTreeSet<SocketAddr>) {
self.local_addrs
.replace(Some((url.cloned(), addrs.clone())));
.set(Some((url.cloned(), addrs.clone())))
.ok();
}

fn subscribe(&self) -> Option<BoxStream<DiscoveryItem>> {
Expand Down
Loading
Loading