From d6d1632c3431213e17e66d2454c57d5dab0b9604 Mon Sep 17 00:00:00 2001 From: Mathias Kraus Date: Tue, 21 Jun 2022 23:06:26 +0200 Subject: [PATCH] iox-#23 Add support for all publisher and subscriber options from iceoryx v2 --- src/lib.rs | 4 +++- src/pb/ffi.rs | 6 +++++- src/pb/publisher.rs | 9 +++++++++ src/pb/publisher_options.rs | 4 ++++ src/queue_policy.rs | 17 +++++++++++++++++ src/sb/ffi.rs | 9 ++++++++- src/sb/subscriber.rs | 14 ++++++++++++++ src/sb/subscriber_options.rs | 6 ++++++ 8 files changed, 66 insertions(+), 3 deletions(-) create mode 100644 src/queue_policy.rs diff --git a/src/lib.rs b/src/lib.rs index 3cf1a94..e6a417f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,14 +8,16 @@ extern crate cpp; mod error; +mod queue_policy; mod runtime; pub mod introspection; pub mod pb; pub mod sb; -// re-export structs +// re-export types pub use error::IceoryxError; +pub use queue_policy::{ConsumerTooSlowPolicy, QueueFullPolicy}; pub use runtime::Runtime; #[cfg(test)] diff --git a/src/pb/ffi.rs b/src/pb/ffi.rs index ee7e330..ccb939c 100644 --- a/src/pb/ffi.rs +++ b/src/pb/ffi.rs @@ -13,6 +13,7 @@ cpp! {{ using iox::capro::IdString_t; using iox::cxx::TruncateToCapacity; + using iox::popo::ConsumerTooSlowPolicy; using iox::popo::PublisherOptions; using iox::popo::PublisherPortUser; using iox::runtime::PoshRuntime; @@ -37,19 +38,22 @@ impl Publisher { let node_name = CString::new(&options.node_name as &str).expect("CString::new failed"); let node_name = node_name.as_ptr(); let offer_on_create = options.offer_on_create; + let subscriber_too_slow_policy = options.subscriber_too_slow_policy as u8; unsafe { let raw = cpp!([service as "const char *", instance as "const char *", event as "const char *", history_capacity as "uint64_t", node_name as "const char *", - offer_on_create as "bool"] + offer_on_create as "bool", + subscriber_too_slow_policy as "uint8_t"] -> *mut Publisher as "PublisherPortUser*" { PublisherOptions options; options.historyCapacity = history_capacity; options.nodeName = IdString_t(TruncateToCapacity, node_name); options.offerOnCreate = offer_on_create; + options.subscriberTooSlowPolicy = static_cast(subscriber_too_slow_policy); auto portData = PoshRuntime::getInstance().getMiddlewarePublisher( { IdString_t(TruncateToCapacity, service), diff --git a/src/pb/publisher.rs b/src/pb/publisher.rs index 762b193..73a8239 100644 --- a/src/pb/publisher.rs +++ b/src/pb/publisher.rs @@ -3,6 +3,7 @@ // SPDX-FileContributor: Mathias Kraus use super::{ffi::Publisher as FfiPublisher, sample::SampleMut, PublisherOptions, POD}; +use crate::ConsumerTooSlowPolicy; use crate::IceoryxError; use std::marker::PhantomData; @@ -36,6 +37,14 @@ impl<'a, T: POD> PublisherBuilder<'a, T> { self } + pub fn subscriber_too_slow_policy( + mut self, + subscriber_too_slow_policy: ConsumerTooSlowPolicy, + ) -> Self { + self.options.subscriber_too_slow_policy = subscriber_too_slow_policy; + self + } + pub fn create(mut self) -> Result, IceoryxError> { self.options.offer_on_create = true; let ffi_pub = FfiPublisher::new(self.service, self.instance, self.event, &self.options) diff --git a/src/pb/publisher_options.rs b/src/pb/publisher_options.rs index 58c8407..7f38868 100644 --- a/src/pb/publisher_options.rs +++ b/src/pb/publisher_options.rs @@ -2,12 +2,15 @@ // SPDX-FileCopyrightText: © Contributors to the iceoryx-rs project // SPDX-FileContributor: Mathias Kraus +use crate::ConsumerTooSlowPolicy; + use std::marker::PhantomData; pub(super) struct PublisherOptions { pub history_capacity: u64, pub node_name: String, pub offer_on_create: bool, + pub subscriber_too_slow_policy: ConsumerTooSlowPolicy, _phantom: PhantomData<()>, } @@ -17,6 +20,7 @@ impl Default for PublisherOptions { history_capacity: 0, node_name: String::new(), offer_on_create: true, + subscriber_too_slow_policy: ConsumerTooSlowPolicy::DiscardOldestData, _phantom: PhantomData, } } diff --git a/src/queue_policy.rs b/src/queue_policy.rs new file mode 100644 index 0000000..26b3910 --- /dev/null +++ b/src/queue_policy.rs @@ -0,0 +1,17 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: © Contributors to the iceoryx-rs project +// SPDX-FileContributor: Mathias Kraus + +#[repr(C)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ConsumerTooSlowPolicy { + WaitForConsumer, + DiscardOldestData, +} + +#[repr(C)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum QueueFullPolicy { + BlockProducer, + DiscardOldestData, +} diff --git a/src/sb/ffi.rs b/src/sb/ffi.rs index 551eb36..b4940f1 100644 --- a/src/sb/ffi.rs +++ b/src/sb/ffi.rs @@ -78,6 +78,7 @@ cpp! {{ using iox::SubscribeState; using iox::capro::IdString_t; using iox::cxx::TruncateToCapacity; + using iox::popo::QueueFullPolicy; using iox::popo::SubscriberOptions; using iox::popo::SubscriberPortUser; using iox::runtime::PoshRuntime; @@ -133,6 +134,8 @@ impl Subscriber { let node_name = CString::new(&options.node_name as &str).expect("CString::new failed"); let node_name = node_name.as_ptr(); let subscribe_on_create = options.subscribe_on_create; + let queue_full_policy = options.queue_full_policy as u8; + let requires_publisher_history_support = options.requires_publisher_history_support; unsafe { let raw = cpp!([service as "const char *", instance as "const char *", @@ -140,7 +143,9 @@ impl Subscriber { queue_capacity as "uint64_t", history_request as "uint64_t", node_name as "const char *", - subscribe_on_create as "bool"] + subscribe_on_create as "bool", + queue_full_policy as "uint8_t", + requires_publisher_history_support as "bool"] -> *mut Subscriber as "SubscriberPortUser*" { SubscriberOptions options; @@ -148,6 +153,8 @@ impl Subscriber { options.historyRequest = history_request; options.nodeName = IdString_t(TruncateToCapacity, node_name); options.subscribeOnCreate = subscribe_on_create; + options.queueFullPolicy = static_cast(queue_full_policy); + options.requiresPublisherHistorySupport = requires_publisher_history_support; auto portData = PoshRuntime::getInstance().getMiddlewareSubscriber( { IdString_t(TruncateToCapacity, service), diff --git a/src/sb/subscriber.rs b/src/sb/subscriber.rs index e803b2f..03744ac 100644 --- a/src/sb/subscriber.rs +++ b/src/sb/subscriber.rs @@ -7,6 +7,7 @@ use super::{ }; use super::{mt, st}; use crate::IceoryxError; +use crate::QueueFullPolicy; use std::marker::PhantomData; @@ -44,6 +45,19 @@ impl<'a, T> SubscriberBuilder<'a, T> { self } + pub fn queue_full_policy(mut self, queue_full_policy: QueueFullPolicy) -> Self { + self.options.queue_full_policy = queue_full_policy; + self + } + + pub fn requires_publisher_history_support( + mut self, + requires_publisher_history_support: bool, + ) -> Self { + self.options.requires_publisher_history_support = requires_publisher_history_support; + self + } + pub fn create(mut self) -> Result<(st::Subscriber, SampleReceiverToken), IceoryxError> { self.options.subscribe_on_create = true; let ffi_sub = ffi::Subscriber::new(self.service, self.instance, self.event, &self.options) diff --git a/src/sb/subscriber_options.rs b/src/sb/subscriber_options.rs index ce914c4..e1e84fe 100644 --- a/src/sb/subscriber_options.rs +++ b/src/sb/subscriber_options.rs @@ -2,6 +2,8 @@ // SPDX-FileCopyrightText: © Contributors to the iceoryx-rs project // SPDX-FileContributor: Mathias Kraus +use crate::QueueFullPolicy; + use std::marker::PhantomData; pub(super) struct SubscriberOptions { @@ -9,6 +11,8 @@ pub(super) struct SubscriberOptions { pub history_request: u64, pub node_name: String, pub subscribe_on_create: bool, + pub queue_full_policy: QueueFullPolicy, + pub requires_publisher_history_support: bool, _phantom: PhantomData<()>, } @@ -19,6 +23,8 @@ impl Default for SubscriberOptions { history_request: 0, node_name: String::new(), subscribe_on_create: true, + queue_full_policy: QueueFullPolicy::DiscardOldestData, + requires_publisher_history_support: false, _phantom: PhantomData, } }