Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Namespace prefix support #1792

Open
wants to merge 27 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
532b891
namespaces support
DenisBiryukov91 Feb 12, 2025
be3dbad
do not use internal KE_AT
DenisBiryukov91 Feb 13, 2025
2894b9b
clippy fixes
DenisBiryukov91 Feb 13, 2025
dcb611b
typo fix
DenisBiryukov91 Feb 13, 2025
2bf73c4
code clean up
DenisBiryukov91 Feb 13, 2025
a059e62
namespaces support for advanced pub/sub
DenisBiryukov91 Feb 13, 2025
b3e388f
tests clean up
DenisBiryukov91 Feb 13, 2025
daed38b
use custom namespace stripping function instead of keyexpr::strip_prefix
DenisBiryukov91 Feb 13, 2025
52afd07
lint fixes
DenisBiryukov91 Feb 14, 2025
32d0c34
fix namespace prefix matching
DenisBiryukov91 Feb 14, 2025
2717a99
lint fix
DenisBiryukov91 Feb 14, 2025
35d65b3
fix namespace interaction with advanced pub-sub and add tests
DenisBiryukov91 Feb 14, 2025
0a0ed06
lint fixes
DenisBiryukov91 Feb 14, 2025
a4fa993
move nested part of advanced pub-sub key expressions to the beginning
DenisBiryukov91 Feb 19, 2025
57a23fc
pre declare prefixes for subscriber / queryable
DenisBiryukov91 Feb 20, 2025
016e232
rename SessionInner::optimize_key_expression -> optimize_nonwild_prefix
DenisBiryukov91 Feb 20, 2025
6a32fa9
Merge branch 'main' into namespaces
DenisBiryukov91 Feb 20, 2025
e36fb7b
specify session for QueryInner::empty
DenisBiryukov91 Feb 20, 2025
011babd
add dedicated type for non-wild keyexpr
DenisBiryukov91 Feb 20, 2025
1fd4024
Merge branch 'advanced-pub-sub-keyexpr-permute' into namespaces
DenisBiryukov91 Feb 20, 2025
0d39103
no longer ignore keyexpr starting from @
DenisBiryukov91 Feb 20, 2025
9f2afd2
implement namespaces through dyn trait
DenisBiryukov91 Feb 21, 2025
1295f12
undo unnecessary change
DenisBiryukov91 Feb 21, 2025
91e5a57
make only one downcast attempt
DenisBiryukov91 Feb 24, 2025
302dd5f
lint fix
DenisBiryukov91 Feb 24, 2025
e002c6a
update config documentation for namespace
DenisBiryukov91 Feb 26, 2025
24482ee
Merge branch 'main' into namespace-as-dyn
DenisBiryukov91 Feb 26, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion commons/zenoh-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@ pub use zenoh_protocol::core::{
whatami, EndPoint, Locator, WhatAmI, WhatAmIMatcher, WhatAmIMatcherVisitor,
};
use zenoh_protocol::{
core::{key_expr::OwnedKeyExpr, Bits},
core::{
key_expr::{OwnedKeyExpr, OwnedNonWildKeyExpr},
Bits,
},
transport::{BatchSize, TransportSn},
};
use zenoh_result::{bail, zerror, ZResult};
Expand Down Expand Up @@ -632,6 +635,13 @@ validated_struct::validator! {

},

/// Namespace prefix
/// If not None, all outgoing key expressions will be
/// automatically prefixed with specified string,
/// and all incoming key expressions will be stripped
/// of specified prefix
pub namespace: Option<OwnedNonWildKeyExpr>,

/// Configuration of the downsampling.
downsampling: Vec<DownsamplingItemConf>,

Expand Down
186 changes: 185 additions & 1 deletion commons/zenoh-keyexpr/src/key_expr/borrowed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use core::{

use zenoh_result::{bail, Error as ZError, ZResult};

use super::{canon::Canonize, OwnedKeyExpr, FORBIDDEN_CHARS};
use super::{canon::Canonize, OwnedKeyExpr, OwnedNonWildKeyExpr, FORBIDDEN_CHARS};

/// A [`str`] newtype that is statically known to be a valid key expression.
///
Expand Down Expand Up @@ -283,6 +283,87 @@ impl keyexpr {
result
}

/// Remove the specified namespace `prefix` from `self`.
///
/// This method works essentially like [`keyexpr::strip_prefix()`], but returns only the longest possible suffix.
/// Prefix can not contain '*' character.
#[cfg(feature = "internal")]
#[doc(hidden)]
pub fn strip_nonwild_prefix(&self, prefix: &nonwild_keyexpr) -> Option<&keyexpr> {
fn is_chunk_matching(target: &[u8], prefix: &[u8]) -> bool {
let mut target_idx: usize = 0;
let mut prefix_idx: usize = 0;
let mut target_prev: u8 = b'/';

while target_idx < target.len() && prefix_idx < prefix.len() {
if target[target_idx] == b'*' {
if target_prev == b'*' || target_idx + 1 == target.len() {
// either a ** wild chunk or a single * chunk at the end of the string - this matches anything
return true;
} else if target_prev == b'$' {
for i in prefix_idx..prefix.len() - 1 {
if is_chunk_matching(&target[target_idx + 1..], &prefix[i..]) {
return true;
}
}
}
} else if target[target_idx] == prefix[prefix_idx] {
prefix_idx += 1;
} else if target[target_idx] != b'$' {
// non-special character, which do not match the one in prefix
return false;
}
target_prev = target[target_idx];
target_idx += 1;
}
if prefix_idx != prefix.len() {
// prefix was not matched entirely
return false;
}
target_idx == target.len()
|| (target_idx + 2 == target.len() && target[target_idx] == b'$')
}

let target_bytes = self.0.as_bytes();
let prefix_bytes = prefix.0.as_bytes();

let mut target_idx = 0;
let mut prefix_idx = 0;

while target_idx < target_bytes.len() && prefix_idx < prefix_bytes.len() {
let target_end = target_idx
+ target_bytes[target_idx..]
.iter()
.position(|&i| i == b'/')
.unwrap_or(target_bytes.len() - target_idx);
let prefix_end = prefix_idx
+ prefix_bytes[prefix_idx..]
.iter()
.position(|&i| i == b'/')
.unwrap_or(prefix_bytes.len() - prefix_idx);
let target_chunk = &target_bytes[target_idx..target_end];
let prefix_chunk = &prefix_bytes[prefix_idx..prefix_end];
if target_chunk.len() == 2 && target_chunk[0] == b'*' {
// Safety: every chunk of keyexpr is also a valid keyexpr
return unsafe { Some(keyexpr::from_str_unchecked(&self.0[target_idx..])) };
}
if target_end == target_bytes.len() {
// target contains no more chunks than prefix and the last one is non double-wild - so it can not match
return None;
}
if !is_chunk_matching(target_chunk, prefix_chunk) {
return None;
}
if prefix_end == prefix_bytes.len() {
// Safety: every chunk of keyexpr is also a valid keyexpr
return unsafe { Some(keyexpr::from_str_unchecked(&self.0[(target_end + 1)..])) };
}
target_idx = target_end + 1;
prefix_idx = prefix_end + 1;
}
None
}

pub const fn as_str(&self) -> &str {
&self.0
}
Expand Down Expand Up @@ -759,6 +840,66 @@ impl ToOwned for keyexpr {
}
}

/// A keyexpr that is statically known not to contain any wild chunks.
#[allow(non_camel_case_types)]
#[repr(transparent)]
#[derive(PartialEq, Eq, Hash, PartialOrd, Ord, Debug)]
pub struct nonwild_keyexpr(keyexpr);

impl nonwild_keyexpr {
/// Attempts to construct a non-wild key expression from anything convertible to keyexpression.
///
/// Will return an Err if `t` isn't a valid key expression.
pub fn new<'a, T, E>(t: &'a T) -> Result<&'a Self, ZError>
where
&'a keyexpr: TryFrom<&'a T, Error = E>,
E: Into<ZError>,
T: ?Sized,
{
let ke: &'a keyexpr = t.try_into().map_err(|e: E| e.into())?;
ke.try_into()
}

/// # Safety
/// This constructs a [`nonwild_keyexpr`] without ensuring that it is a valid key-expression without wild chunks.
///
/// Much like [`core::str::from_utf8_unchecked`], this is memory-safe, but calling this without maintaining
/// [`nonwild_keyexpr`]'s invariants yourself may lead to unexpected behaviors, the Zenoh network dropping your messages.
pub const unsafe fn from_str_unchecked(s: &str) -> &Self {
core::mem::transmute(s)
}
}

impl Deref for nonwild_keyexpr {
type Target = keyexpr;
fn deref(&self) -> &Self::Target {
&self.0
}
}

impl<'a> TryFrom<&'a keyexpr> for &'a nonwild_keyexpr {
type Error = ZError;
fn try_from(value: &'a keyexpr) -> Result<Self, Self::Error> {
if value.is_wild_impl() {
bail!("nonwild_keyexpr can not contain any wild chunks")
}
Ok(unsafe { core::mem::transmute::<&keyexpr, &nonwild_keyexpr>(value) })
}
}

impl Borrow<nonwild_keyexpr> for OwnedNonWildKeyExpr {
fn borrow(&self) -> &nonwild_keyexpr {
self
}
}

impl ToOwned for nonwild_keyexpr {
type Owned = OwnedNonWildKeyExpr;
fn to_owned(&self) -> Self::Owned {
OwnedNonWildKeyExpr::from(self)
}
}

#[test]
fn test_keyexpr_strip_prefix() {
let expectations = [
Expand Down Expand Up @@ -799,3 +940,46 @@ fn test_keyexpr_strip_prefix() {
assert_eq!(ke.strip_prefix(prefix), expected)
}
}

#[test]
fn test_keyexpr_strip_nonwild_prefix() {
let expectations = [
(("demo/example/test/**", "demo/example/test"), Some("**")),
(("demo/example/**", "demo/example/test"), Some("**")),
(("**", "demo/example/test"), Some("**")),
(
("demo/example/test/**/x$*/**", "demo/example/test"),
Some("**/x$*/**"),
),
(("demo/**/xyz", "demo/example/test"), Some("**/xyz")),
(("demo/**/test/**", "demo/example/test"), Some("**/test/**")),
(
("demo/**/ex$*/*/xyz", "demo/example/test"),
Some("**/ex$*/*/xyz"),
),
(
("demo/**/ex$*/t$*/xyz", "demo/example/test"),
Some("**/ex$*/t$*/xyz"),
),
(
("demo/**/te$*/*/xyz", "demo/example/test"),
Some("**/te$*/*/xyz"),
),
(("demo/example/test", "demo/example/test"), None),
(("demo/example/test1/something", "demo/example/test"), None),
(
("demo/example/test$*/something", "demo/example/test"),
Some("something"),
),
]
.map(|((a, b), expected)| {
(
(keyexpr::new(a).unwrap(), nonwild_keyexpr::new(b).unwrap()),
expected.map(|t| keyexpr::new(t).unwrap()),
)
});
for ((ke, prefix), expected) in expectations {
dbg!(ke, prefix);
assert_eq!(ke.strip_nonwild_prefix(prefix), expected)
}
}
2 changes: 1 addition & 1 deletion commons/zenoh-keyexpr/src/key_expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub(crate) const STAR_DSL: &[u8] = b"$*";
pub(crate) const FORBIDDEN_CHARS: [u8; 3] = [b'#', b'?', b'$'];

pub(crate) mod owned;
pub use owned::OwnedKeyExpr;
pub use owned::{OwnedKeyExpr, OwnedNonWildKeyExpr};

pub(crate) mod borrowed;
pub use borrowed::*;
Expand Down
57 changes: 56 additions & 1 deletion commons/zenoh-keyexpr/src/key_expr/owned.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use core::{
str::FromStr,
};

use super::{canon::Canonize, keyexpr};
use super::{canon::Canonize, keyexpr, nonwild_keyexpr};

/// A [`Arc<str>`] newtype that is statically known to be a valid key expression.
///
Expand Down Expand Up @@ -165,3 +165,58 @@ impl From<OwnedKeyExpr> for String {
ke.as_str().to_owned()
}
}

/// A [`Arc<str>`] newtype that is statically known to be a valid nonwild key expression.
///
/// See [`nonwild_keyexpr`](super::borrowed::nonwild_keyexpr).
#[derive(Clone, Debug, PartialEq, Eq, Hash, serde::Deserialize)]
#[cfg_attr(feature = "std", derive(schemars::JsonSchema))]
#[serde(try_from = "String")]
pub struct OwnedNonWildKeyExpr(pub(crate) Arc<str>);
impl serde::Serialize for OwnedNonWildKeyExpr {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
self.0.serialize(serializer)
}
}

impl TryFrom<String> for OwnedNonWildKeyExpr {
type Error = zenoh_result::Error;
fn try_from(value: String) -> Result<Self, Self::Error> {
let ke = <&keyexpr as TryFrom<&str>>::try_from(value.as_str())?;
<&nonwild_keyexpr as TryFrom<&keyexpr>>::try_from(ke)?;
Ok(Self(value.into()))
}
}
impl<'a> From<&'a nonwild_keyexpr> for OwnedNonWildKeyExpr {
fn from(val: &'a nonwild_keyexpr) -> Self {
OwnedNonWildKeyExpr(Arc::from(val.as_str()))
}
}

impl Deref for OwnedNonWildKeyExpr {
type Target = nonwild_keyexpr;
fn deref(&self) -> &Self::Target {
unsafe { nonwild_keyexpr::from_str_unchecked(&self.0) }
}
}

#[allow(clippy::suspicious_arithmetic_impl)]
impl Div<&keyexpr> for &OwnedNonWildKeyExpr {
type Output = OwnedKeyExpr;
fn div(self, rhs: &keyexpr) -> Self::Output {
let s: String = [self.as_str(), "/", rhs.as_str()].concat();
OwnedKeyExpr::autocanonize(s).unwrap() // Joining 2 key expressions should always result in a canonizable string.
}
}

#[allow(clippy::suspicious_arithmetic_impl)]
impl Div<&nonwild_keyexpr> for &OwnedNonWildKeyExpr {
type Output = OwnedKeyExpr;
fn div(self, rhs: &nonwild_keyexpr) -> Self::Output {
let s: String = [self.as_str(), "/", rhs.as_str()].concat();
s.try_into().unwrap() // Joining 2 non wild key expressions should always result in a non wild string.
}
}
11 changes: 11 additions & 0 deletions commons/zenoh-macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,17 @@ pub fn ke(tokens: TokenStream) -> TokenStream {
}
}

/// Equivalent to [`nonwild_keyexpr::new`](zenoh_keyexpr::nonwild_keyexpr::new), but the check is run at compile-time and will throw a compile error in case of failure.
#[proc_macro]
pub fn nonwild_ke(tokens: TokenStream) -> TokenStream {
let value: LitStr = syn::parse(tokens).unwrap();
let ke = value.value();
match zenoh_keyexpr::nonwild_keyexpr::new(&ke) {
Ok(_) => quote!(unsafe { zenoh::key_expr::nonwild_keyexpr::from_str_unchecked(#ke)}).into(),
Err(e) => panic!("{}", e),
}
}

mod zenoh_runtime_derive;
use syn::DeriveInput;
use zenoh_runtime_derive::{derive_generic_runtime_param, derive_register_param};
Expand Down
22 changes: 11 additions & 11 deletions zenoh-ext/src/advanced_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ use zenoh::{
qos::{CongestionControl, Priority},
query::{Queryable, ZenohParameters},
sample::{Locality, Sample, SampleBuilder},
Resolvable, Result as ZResult, Session, Wait, KE_ADV_PREFIX, KE_AT, KE_STARSTAR,
Resolvable, Result as ZResult, Session, Wait, KE_ADV_PREFIX, KE_STARSTAR,
};

pub(crate) static KE_UHLC: &keyexpr = ke!("uhlc");
#[zenoh_macros::unstable]
kedefine!(
pub(crate) ke_liveliness: "@adv/${entity:*}/${zid:*}/${eid:*}/${meta:**}/@/${remaining:**}",
pub(crate) ke_liveliness: "${remaining:**}/@adv/${entity:*}/${zid:*}/${eid:*}/${meta:**}",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the current form a keyexpr is transformed from a/b/c to @foo/prefix/a/b/c.
Would it make sense to simply invert the order to a/b/c/@foo/prefix?
Would it make the whole namespace prefix logic simpler?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I understand the comment, In this PR it is indeed supposed to be transformed into a/b/c/@foo/prefix

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the clarification. I see the intended behaviour was introduced by #1780 and integrated in this PR. I was momentarily misled by this PR title.

);

#[zenoh_macros::unstable]
Expand Down Expand Up @@ -122,7 +122,7 @@ impl CacheConfig {
pub struct AdvancedCacheBuilder<'a, 'b, 'c> {
session: &'a Session,
pub_key_expr: ZResult<KeyExpr<'b>>,
queryable_prefix: Option<ZResult<KeyExpr<'c>>>,
queryable_suffix: Option<ZResult<KeyExpr<'c>>>,
queryable_origin: Locality,
history: CacheConfig,
liveliness: bool,
Expand All @@ -138,21 +138,21 @@ impl<'a, 'b, 'c> AdvancedCacheBuilder<'a, 'b, 'c> {
AdvancedCacheBuilder {
session,
pub_key_expr,
queryable_prefix: Some(Ok((KE_ADV_PREFIX / KE_STARSTAR / KE_AT).into())),
queryable_suffix: Some(Ok((KE_ADV_PREFIX / KE_STARSTAR).into())),
queryable_origin: Locality::default(),
history: CacheConfig::default(),
liveliness: false,
}
}

/// Change the prefix used for queryable.
/// Change the suffix used for queryable.
#[zenoh_macros::unstable]
pub fn queryable_prefix<TryIntoKeyExpr>(mut self, queryable_prefix: TryIntoKeyExpr) -> Self
pub fn queryable_suffix<TryIntoKeyExpr>(mut self, queryable_suffix: TryIntoKeyExpr) -> Self
where
TryIntoKeyExpr: TryInto<KeyExpr<'c>>,
<TryIntoKeyExpr as TryInto<KeyExpr<'c>>>::Error: Into<zenoh::Error>,
{
self.queryable_prefix = Some(queryable_prefix.try_into().map_err(Into::into));
self.queryable_suffix = Some(queryable_suffix.try_into().map_err(Into::into));
self
}

Expand Down Expand Up @@ -220,11 +220,11 @@ impl AdvancedCache {
#[zenoh_macros::unstable]
fn new(conf: AdvancedCacheBuilder<'_, '_, '_>) -> ZResult<AdvancedCache> {
let key_expr = conf.pub_key_expr?.into_owned();
// the queryable_prefix (optional), and the key_expr for AdvancedCache's queryable ("[<queryable_prefix>]/<pub_key_expr>")
let queryable_key_expr = match conf.queryable_prefix {
// the queryable_suffix (optional), and the key_expr for AdvancedCache's queryable ("<pub_key_expr>/[<queryable_suffix>]")
let queryable_key_expr = match conf.queryable_suffix {
None => key_expr.clone(),
Some(Ok(ke)) => (&ke) / &key_expr,
Some(Err(e)) => bail!("Invalid key expression for queryable_prefix: {}", e),
Some(Ok(ke)) => &key_expr / &ke,
Some(Err(e)) => bail!("Invalid key expression for queryable_suffix: {}", e),
};
tracing::debug!(
"Create AdvancedCache on {} with max_samples={:?}",
Expand Down
Loading
Loading