Skip to content

Commit

Permalink
Remove temp:// and some old MessageProxy variants (#9129)
Browse files Browse the repository at this point in the history
### Related

* Closes: #8761

### What

Removes the temporary `temp://` URI scheme and streamlines handling of
URIs.

---------

Co-authored-by: jprochazk <[email protected]>
Co-authored-by: Jan Procházka <[email protected]>
  • Loading branch information
3 people authored Feb 26, 2025
1 parent b181660 commit 811e48b
Show file tree
Hide file tree
Showing 47 changed files with 320 additions and 444 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -6550,6 +6550,7 @@ dependencies = [
"nohash-hasher",
"once_cell",
"parking_lot",
"percent-encoding",
"rand",
"re_build_info",
"re_build_tools",
Expand All @@ -6565,6 +6566,7 @@ dependencies = [
"re_memory",
"re_smart_channel",
"re_types_core",
"re_uri",
"re_web_viewer_server",
"similar-asserts",
"thiserror 1.0.65",
Expand Down Expand Up @@ -7536,6 +7538,7 @@ dependencies = [
"re_sorbet",
"re_tracing",
"re_types",
"re_uri",
"re_video",
"re_viewer",
"re_viewer_context",
Expand Down Expand Up @@ -7616,6 +7619,7 @@ dependencies = [
"re_protos",
"re_sdk",
"re_sorbet",
"re_uri",
"re_video",
"re_web_viewer_server",
"tokio",
Expand Down
91 changes: 28 additions & 63 deletions crates/store/re_data_source/src/data_source.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use re_log::debug;
use re_grpc_client::message_proxy;
use re_log_types::LogMsg;
use re_smart_channel::{Receiver, SmartChannelSource, SmartMessageSource};

Expand Down Expand Up @@ -30,14 +30,8 @@ pub enum DataSource {
#[cfg(not(target_arch = "wasm32"))]
Stdin,

/// A recording on a Rerun dataplatform server, over `rerun://` gRPC interface.
RedapRecordingEndpoint(re_uri::RecordingEndpoint),

/// A catalog on a Rerun dataplatform server, over `rerun://` gRPC interface.
RedapCatalogEndpoint(re_uri::CatalogEndpoint),

/// A stream of messages over gRPC, relayed from the SDK.
MessageProxy { url: String },
/// A `rerun://` URI pointing to a recording or catalog.
RerunGrpcStream(re_uri::RedapUri),
}

// TODO(#9058): Temporary hack, see issue for how to fix this.
Expand All @@ -52,7 +46,7 @@ impl DataSource {
/// Tries to figure out if it looks like a local path,
/// a web-socket address, or a http url.
#[cfg_attr(target_arch = "wasm32", expect(clippy::needless_pass_by_value))]
pub fn from_uri(_file_source: re_log_types::FileSource, mut uri: String) -> Self {
pub fn from_uri(_file_source: re_log_types::FileSource, uri: String) -> Self {
#[cfg(not(target_arch = "wasm32"))]
{
use itertools::Itertools as _;
Expand Down Expand Up @@ -108,47 +102,14 @@ impl DataSource {
}
}

match re_uri::RedapUri::try_from(uri.as_str()) {
Ok(re_uri::RedapUri::Recording(endpoint)) => {
debug!("Recognized recording endpoint: {:?}", endpoint);
return Self::RedapRecordingEndpoint(endpoint);
}
Ok(re_uri::RedapUri::Catalog(endpoint)) => {
debug!("Recognized catalog endpoint: {:?}", endpoint);
return Self::RedapCatalogEndpoint(endpoint);
}
Ok(re_uri::RedapUri::Proxy(proxy)) => {
return Self::MessageProxy {
url: proxy.as_url(),
}
}
Err(_) => {} // Not a Rerun URI,
};
if let Ok(endpoint) = re_uri::RedapUri::try_from(uri.as_str()) {
return Self::RerunGrpcStream(endpoint);
}

if (uri.starts_with("http://") || uri.starts_with("https://"))
&& (uri.ends_with(".rrd") || uri.ends_with(".rbl"))
{
Self::RrdHttpUrl {
url: uri,
follow: false,
}
} else if uri.starts_with("http://") || uri.starts_with("https://") {
Self::MessageProxy { url: uri }
} else if uri.ends_with(".rrd") || uri.ends_with(".rbl") {
Self::RrdHttpUrl {
url: uri,
follow: false,
}
} else {
// If this is something like `foo.com` we can't know what it is until we connect to it.
// We could/should connect and see what it is, but for now we just take a wild guess instead:
re_log::debug!("Assuming gRPC endpoint");
if !uri.contains("://") {
// TODO(jan): this should be `https` if it's not localhost, anything hosted over public network
// should be going through https, anyway.
uri = format!("http://{uri}");
}
Self::MessageProxy { url: uri }
// by default, we just assume an rrd over http
Self::RrdHttpUrl {
url: uri,
follow: false,
}
}

Expand All @@ -160,8 +121,7 @@ impl DataSource {
Self::FileContents(_, file_contents) => Some(file_contents.name.clone()),
#[cfg(not(target_arch = "wasm32"))]
Self::Stdin => None,
Self::RedapCatalogEndpoint { .. } | Self::RedapRecordingEndpoint { .. } => None, // TODO(jleibs): This needs to come from the server.
Self::MessageProxy { .. } => None,
Self::RerunGrpcStream { .. } => None,
}
}

Expand Down Expand Up @@ -269,7 +229,7 @@ impl DataSource {
Ok(StreamSource::LogMessages(rx))
}

Self::RedapRecordingEndpoint(endpoint) => {
Self::RerunGrpcStream(re_uri::RedapUri::Recording(endpoint)) => {
re_log::debug!(
"Loading recording `{}` from `{}`…",
endpoint.recording_id,
Expand All @@ -280,7 +240,7 @@ impl DataSource {

let (tx, rx) = re_smart_channel::smart_channel(
re_smart_channel::SmartMessageSource::RerunGrpcStream { url: url.clone() },
re_smart_channel::SmartChannelSource::RerunGrpcStream { url: url.clone() },
re_smart_channel::SmartChannelSource::RedapGrpcStream { url: url.clone() },
);

let on_cmd = Box::new(move |cmd: re_grpc_client::redap::Command| match cmd {
Expand Down Expand Up @@ -309,11 +269,13 @@ impl DataSource {
Ok(StreamSource::LogMessages(rx))
}

Self::RedapCatalogEndpoint(endpoint) => Ok(StreamSource::CatalogData { endpoint }),
Self::RerunGrpcStream(re_uri::RedapUri::Catalog(endpoint)) => {
Ok(StreamSource::CatalogData { endpoint })
}

Self::MessageProxy { url } => re_grpc_client::message_proxy::stream(&url, on_msg)
.map_err(|err| err.into())
.map(StreamSource::LogMessages),
Self::RerunGrpcStream(re_uri::RedapUri::Proxy(endpoint)) => Ok(
StreamSource::LogMessages(message_proxy::stream(endpoint, on_msg)),
),
}
}
}
Expand Down Expand Up @@ -368,10 +330,13 @@ fn test_data_source_from_uri() {
"www.foo.zip/blueprint.rbl",
];
let grpc = [
"http://foo.zip",
"https://foo.zip",
"http://127.0.0.1:9876",
"https://redap.rerun.io",
"rerun://foo.zip",
"rerun+http://foo.zip",
"rerun+https://foo.zip",
"rerun://127.0.0.1:9876",
"rerun+http://127.0.0.1:9876",
"rerun://redap.rerun.io",
"rerun+https://redap.rerun.io",
];

let file_source = FileSource::DragAndDrop {
Expand Down Expand Up @@ -403,7 +368,7 @@ fn test_data_source_from_uri() {
for uri in grpc {
if !matches!(
DataSource::from_uri(file_source.clone(), uri.to_owned()),
DataSource::MessageProxy { .. }
DataSource::RerunGrpcStream { .. }
) {
eprintln!("Expected {uri:?} to be categorized as MessageProxy");
failed = true;
Expand Down
1 change: 0 additions & 1 deletion crates/store/re_grpc_client/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
//! Communications with an Rerun Data Platform gRPC server.
pub mod message_proxy;
pub use message_proxy::MessageProxyUrl;

pub mod redap;

Expand Down
2 changes: 1 addition & 1 deletion crates/store/re_grpc_client/src/message_proxy/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
pub mod read;
pub use read::{stream, MessageProxyUrl};
pub use read::stream;

#[cfg(not(target_arch = "wasm32"))]
pub mod write;
Expand Down
117 changes: 9 additions & 108 deletions crates/store/re_grpc_client/src/message_proxy/read.rs
Original file line number Diff line number Diff line change
@@ -1,107 +1,41 @@
use std::fmt::Display;

use re_log_encoding::protobuf_conversions::log_msg_from_proto;
use re_log_types::LogMsg;
use re_protos::sdk_comms::v0::message_proxy_client::MessageProxyClient;
use re_protos::sdk_comms::v0::ReadMessagesRequest;
use tokio_stream::StreamExt;
use url::Url;

use crate::StreamError;
use crate::TonicStatusError;
use crate::MAX_DECODING_MESSAGE_SIZE;

pub fn stream(
url: &str,
endpoint: re_uri::ProxyEndpoint,
on_msg: Option<Box<dyn Fn() + Send + Sync>>,
) -> Result<re_smart_channel::Receiver<LogMsg>, InvalidMessageProxyUrl> {
re_log::debug!("Loading {url} via gRPC…");

let parsed_url = MessageProxyUrl::parse(url)?;
) -> re_smart_channel::Receiver<LogMsg> {
re_log::debug!("Loading {endpoint} via gRPC…");

let url = url.to_owned();
let url = format!("{endpoint}");
let (tx, rx) = re_smart_channel::smart_channel(
re_smart_channel::SmartMessageSource::MessageProxy { url: url.clone() },
re_smart_channel::SmartChannelSource::MessageProxy { url },
);

crate::spawn_future(async move {
if let Err(err) = stream_async(parsed_url, &tx, on_msg).await {
if let Err(err) = stream_async(endpoint, &tx, on_msg).await {
tx.quit(Some(Box::new(err))).ok();
}
});

Ok(rx)
}

/// Represents a URL to a gRPC server.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MessageProxyUrl(String);

impl MessageProxyUrl {
/// Parses as a regular URL, the protocol must be `temp://`, `http://`, or `https://`.
pub fn parse(url: &str) -> Result<Self, InvalidMessageProxyUrl> {
if url.starts_with("http") {
let _ = Url::parse(url).map_err(|err| InvalidMessageProxyUrl {
url: url.to_owned(),
msg: err.to_string(),
})?;

Ok(Self(url.to_owned()))
}
// TODO(#8761): URL prefix
else if let Some(url) = url.strip_prefix("temp") {
let url = format!("http{url}");

let _ = Url::parse(&url).map_err(|err| InvalidMessageProxyUrl {
url: url.clone(),
msg: err.to_string(),
})?;

Ok(Self(url))
} else {
let scheme = url.split_once("://").map(|(a, _)| a).ok_or("unknown");

Err(InvalidMessageProxyUrl {
url: url.to_owned(),
msg: format!("Invalid scheme {scheme:?}, expected {:?}", "temp"),
})
}
}

pub fn to_http(&self) -> String {
self.0.clone()
}
}

impl Display for MessageProxyUrl {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Display::fmt(&self.0, f)
}
}

impl std::str::FromStr for MessageProxyUrl {
type Err = InvalidMessageProxyUrl;

fn from_str(s: &str) -> Result<Self, Self::Err> {
Self::parse(s)
}
}

#[derive(Debug, thiserror::Error, PartialEq, Eq)]
#[error("invalid message proxy url {url:?}: {msg}")]
pub struct InvalidMessageProxyUrl {
pub url: String,
pub msg: String,
rx
}

async fn stream_async(
url: MessageProxyUrl,
endpoint: re_uri::ProxyEndpoint,
tx: &re_smart_channel::Sender<LogMsg>,
on_msg: Option<Box<dyn Fn() + Send + Sync>>,
) -> Result<(), StreamError> {
let mut client = {
let url = url.to_http();
let url = endpoint.origin.as_url();

#[cfg(target_arch = "wasm32")]
let tonic_client = {
Expand All @@ -118,7 +52,7 @@ async fn stream_async(
MessageProxyClient::new(tonic_client).max_decoding_message_size(MAX_DECODING_MESSAGE_SIZE)
};

re_log::debug!("Streaming messages from gRPC endpoint {url}");
re_log::debug!("Streaming messages from gRPC endpoint {endpoint}");

let mut stream = client
.read_messages(ReadMessagesRequest {})
Expand Down Expand Up @@ -154,36 +88,3 @@ async fn stream_async(

Ok(())
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_parse_url() {
struct Case {
input: &'static str,
expected: &'static str,
}
let cases = [
Case {
input: "temp://127.0.0.1:9876",
expected: "http://127.0.0.1:9876",
},
Case {
input: "http://127.0.0.1:9876",
expected: "http://127.0.0.1:9876",
},
];

let mut failed = false;
for Case { input, expected } in cases {
let actual = MessageProxyUrl::parse(input).map(|v| v.to_http());
if actual != Ok(expected.to_owned()) {
eprintln!("expected {input:?} to parse as {expected:?}, got {actual:?} instead");
failed = true;
}
}
assert!(!failed, "one or more test cases failed");
}
}
Loading

0 comments on commit 811e48b

Please sign in to comment.