diff --git a/examples/subscriber_multithreaded.rs b/examples/subscriber_multithreaded.rs index ebe4722..95c0254 100644 --- a/examples/subscriber_multithreaded.rs +++ b/examples/subscriber_multithreaded.rs @@ -2,7 +2,7 @@ // SPDX-FileCopyrightText: © Contributors to the iceoryx-rs project // SPDX-FileContributor: Mathias Kraus -use iceoryx_rs::sb::{SampleReceiverWaitState, SubscribeState, TopicBuilder}; +use iceoryx_rs::sb::{SampleReceiverWaitState, SubscriberBuilder}; use iceoryx_rs::Runtime; use std::error::Error; @@ -10,31 +10,17 @@ use std::thread; use std::time::Duration; #[repr(C)] -struct CounterTopic { +struct Counter { counter: u32, } fn main() -> Result<(), Box> { Runtime::init("subscriber_multithreaded"); - let topic = TopicBuilder::::new("Radar", "FrontLeft", "Counter") - .queue_capacity(5) - .build(); - - let (subscriber, sample_receive_token) = topic.subscribe_mt(); - - let mut has_printed_waiting_for_subscription = false; - while subscriber.subscription_state() != SubscribeState::Subscribed { - if !has_printed_waiting_for_subscription { - println!("waiting for subscription ..."); - has_printed_waiting_for_subscription = true; - } - thread::sleep(Duration::from_millis(10)); - } - - if has_printed_waiting_for_subscription { - println!(" -> subscribed"); - } + let (subscriber, sample_receive_token) = + SubscriberBuilder::::new("Radar", "FrontLeft", "Counter") + .queue_capacity(5) + .create_mt()?; let sample_receiver = subscriber.get_sample_receiver(sample_receive_token); diff --git a/examples/subscriber_simple.rs b/examples/subscriber_simple.rs index db80801..7808ffe 100644 --- a/examples/subscriber_simple.rs +++ b/examples/subscriber_simple.rs @@ -2,38 +2,25 @@ // SPDX-FileCopyrightText: © Contributors to the iceoryx-rs project // SPDX-FileContributor: Mathias Kraus -use iceoryx_rs::sb::{SubscribeState, TopicBuilder}; +use iceoryx_rs::sb::SubscriberBuilder; use iceoryx_rs::Runtime; +use std::error::Error; use std::thread; use std::time::Duration; #[repr(C)] -struct CounterTopic { +struct Counter { counter: u32, } -fn main() { +fn main() -> Result<(), Box> { Runtime::init("subscriber_simple"); - let topic = TopicBuilder::::new("Radar", "FrontLeft", "Counter") - .queue_capacity(5) - .build(); - - let (subscriber, sample_receive_token) = topic.subscribe(); - - let mut has_printed_waiting_for_subscription = false; - while subscriber.subscription_state() != SubscribeState::Subscribed { - if !has_printed_waiting_for_subscription { - println!("waiting for subscription ..."); - has_printed_waiting_for_subscription = true; - } - thread::sleep(Duration::from_millis(10)); - } - - if has_printed_waiting_for_subscription { - println!(" -> subscribed"); - } + let (subscriber, sample_receive_token) = + SubscriberBuilder::::new("Radar", "FrontLeft", "Counter") + .queue_capacity(5) + .create()?; let sample_receiver = subscriber.get_sample_receiver(sample_receive_token); diff --git a/src/error.rs b/src/error.rs index 0371154..0004b1d 100644 --- a/src/error.rs +++ b/src/error.rs @@ -10,6 +10,8 @@ pub enum IceOryxError { ChunkAllocationFailed, #[error("could not create a publisher")] PublisherCreationFailed, + #[error("could not create a subscriber")] + SubscriberCreationFailed, #[error("number of allowed chunks to hold is exhausted")] TooManyChunksHoldInParallel, } diff --git a/src/introspection/memory/ffi.rs b/src/introspection/memory/ffi.rs index 2a408a2..59e7ca2 100644 --- a/src/introspection/memory/ffi.rs +++ b/src/introspection/memory/ffi.rs @@ -2,7 +2,7 @@ // SPDX-FileCopyrightText: © Contributors to the iceoryx-rs project // SPDX-FileContributor: Mathias Kraus -use crate::sb::{Topic, TopicBuilder}; +use crate::sb::{InactiveSubscriber, SubscriberBuilder}; use std::ffi::CStr; use std::marker::PhantomData; @@ -123,11 +123,12 @@ pub struct MemPoolIntrospectionTopic { } impl MemPoolIntrospectionTopic { - pub fn new() -> Topic { - TopicBuilder::::new("Introspection", "RouDi_ID", "MemPool") + pub fn new() -> InactiveSubscriber { + SubscriberBuilder::::new("Introspection", "RouDi_ID", "MemPool") .queue_capacity(1) .history_request(1) - .build() + .create_without_subscribe() + .expect("Create subscriber") } pub fn memory_segments(&self) -> MemorySegmentContainer { diff --git a/src/introspection/port/ffi.rs b/src/introspection/port/ffi.rs index 4bda462..6ef49ca 100644 --- a/src/introspection/port/ffi.rs +++ b/src/introspection/port/ffi.rs @@ -2,7 +2,7 @@ // SPDX-FileCopyrightText: © Contributors to the iceoryx-rs project // SPDX-FileContributor: Mathias Kraus -use crate::sb::{Topic, TopicBuilder}; +use crate::sb::{InactiveSubscriber, SubscriberBuilder}; use std::ffi::CStr; use std::os::raw::c_char; @@ -164,11 +164,12 @@ pub struct PortIntrospectionTopic { } impl PortIntrospectionTopic { - pub fn new() -> Topic { - TopicBuilder::::new("Introspection", "RouDi_ID", "Port") + pub fn new() -> InactiveSubscriber { + SubscriberBuilder::::new("Introspection", "RouDi_ID", "Port") .queue_capacity(1) .history_request(1) - .build() + .create_without_subscribe() + .expect("Create subscriber") } pub fn subscriber_ports(&self) -> SubscriberPortIntrospectionContainer { diff --git a/src/introspection/process/ffi.rs b/src/introspection/process/ffi.rs index 198c0e6..104ecc0 100644 --- a/src/introspection/process/ffi.rs +++ b/src/introspection/process/ffi.rs @@ -2,7 +2,7 @@ // SPDX-FileCopyrightText: © Contributors to the iceoryx-rs project // SPDX-FileContributor: Mathias Kraus -use crate::sb::{Topic, TopicBuilder}; +use crate::sb::{InactiveSubscriber, SubscriberBuilder}; use std::ffi::CStr; use std::os::raw::c_char; @@ -60,11 +60,12 @@ pub struct ProcessIntrospectionTopic { } impl ProcessIntrospectionTopic { - pub fn new() -> Topic { - TopicBuilder::::new("Introspection", "RouDi_ID", "Process") + pub fn new() -> InactiveSubscriber { + SubscriberBuilder::::new("Introspection", "RouDi_ID", "Process") .queue_capacity(1) .history_request(1) - .build() + .create_without_subscribe() + .expect("Create subscriber") } pub fn processes(&self) -> ProcessIntrospectionContainer { diff --git a/src/sb/ffi.rs b/src/sb/ffi.rs index 84af15a..551eb36 100644 --- a/src/sb/ffi.rs +++ b/src/sb/ffi.rs @@ -121,7 +121,7 @@ impl Subscriber { instance: &str, event: &str, options: &SubscriberOptions, - ) -> Box { + ) -> Option> { let service = CString::new(service).expect("CString::new failed"); let service = service.as_ptr(); let instance = CString::new(instance).expect("CString::new failed"); @@ -159,7 +159,11 @@ impl Subscriber { return new SubscriberPortUser(portData); }); - Box::from_raw(raw) + if raw.is_null() { + None + } else { + Some(Box::from_raw(raw)) + } } } diff --git a/src/sb/mod.rs b/src/sb/mod.rs index 2530826..b3eedbb 100644 --- a/src/sb/mod.rs +++ b/src/sb/mod.rs @@ -6,12 +6,11 @@ mod ffi; mod sample; mod subscriber; mod subscriber_options; -mod topic; pub use ffi::SubscribeState; pub use sample::Sample; pub use sample::SampleReceiverWaitState; -pub use topic::{Topic, TopicBuilder}; +pub use subscriber::{InactiveSubscriber, SubscriberBuilder}; use subscriber_options::SubscriberOptions; diff --git a/src/sb/subscriber.rs b/src/sb/subscriber.rs index 4b087e4..356e894 100644 --- a/src/sb/subscriber.rs +++ b/src/sb/subscriber.rs @@ -3,21 +3,130 @@ // SPDX-FileContributor: Mathias Kraus use super::{ - ffi::SubscriberStrongRef, sample::SampleReceiver, topic::SampleReceiverToken, SubscribeState, - Topic, + ffi, ffi::SubscriberStrongRef, sample::SampleReceiver, SubscribeState, SubscriberOptions, }; +use super::{mt, st}; +use crate::IceOryxError; use std::marker::PhantomData; +pub struct SubscriberBuilder<'a, T> { + service: &'a str, + instance: &'a str, + event: &'a str, + options: SubscriberOptions, + phantom: PhantomData, +} + +impl<'a, T> SubscriberBuilder<'a, T> { + pub fn new(service: &'a str, instance: &'a str, event: &'a str) -> Self { + Self { + service, + instance, + event, + options: SubscriberOptions::default(), + phantom: PhantomData, + } + } + + pub fn queue_capacity(mut self, queue_capacity: u64) -> Self { + self.options.queue_capacity = queue_capacity; + self + } + + pub fn history_request(mut self, history_request: u64) -> Self { + self.options.history_request = history_request; + self + } + + pub fn node_name(mut self, node_name: String) -> Self { + self.options.node_name = node_name; + self + } + + pub fn create(mut self) -> Result<(st::Subscriber, SampleReceiverToken), IceOryxError> { + self.options.subscribe_on_create = true; + let ffi_sub = ffi::Subscriber::new(self.service, self.instance, self.event, &self.options) + .ok_or(IceOryxError::SubscriberCreationFailed)?; + + let subscriber = st::Subscriber { + ffi_sub: ffi::SubscriberRc::new(ffi_sub), + phantom: PhantomData, + }; + + Ok((subscriber, SampleReceiverToken {})) + } + + pub fn create_mt(mut self) -> Result<(mt::Subscriber, SampleReceiverToken), IceOryxError> { + self.options.subscribe_on_create = true; + let ffi_sub = ffi::Subscriber::new(self.service, self.instance, self.event, &self.options) + .ok_or(IceOryxError::SubscriberCreationFailed)?; + + let subscriber = mt::Subscriber { + ffi_sub: ffi::SubscriberArc::new(ffi_sub), + phantom: PhantomData, + }; + + Ok((subscriber, SampleReceiverToken {})) + } + + pub fn create_without_subscribe(mut self) -> Result, IceOryxError> { + self.options.subscribe_on_create = false; + let ffi_sub = ffi::Subscriber::new(self.service, self.instance, self.event, &self.options) + .ok_or(IceOryxError::SubscriberCreationFailed)?; + + Ok(InactiveSubscriber { + ffi_sub, + phantom: PhantomData, + }) + } +} + +pub struct SampleReceiverToken {} + +pub struct InactiveSubscriber { + ffi_sub: Box, + phantom: PhantomData, +} + +impl InactiveSubscriber { + fn from_ffi(ffi_sub: Box) -> Self { + Self { + ffi_sub, + phantom: PhantomData, + } + } + + pub fn subscribe(self) -> (st::Subscriber, SampleReceiverToken) { + self.ffi_sub.subscribe(); + ( + st::Subscriber::new_from_ffi(self.ffi_sub), + SampleReceiverToken {}, + ) + } + + pub fn subscribe_mt(self) -> (mt::Subscriber, SampleReceiverToken) { + self.ffi_sub.subscribe(); + ( + mt::Subscriber::new_from_ffi(self.ffi_sub), + SampleReceiverToken {}, + ) + } + + pub fn subscription_state(&self) -> SubscribeState { + self.ffi_sub.subscription_state() + } +} + pub struct Subscriber { ffi_sub: S, phantom: PhantomData, } impl Subscriber { - pub(super) fn new(subscriber: Topic) -> Self { + fn new_from_ffi(ffi_sub: Box) -> Self { Subscriber { - ffi_sub: S::new(subscriber.ffi_sub), + ffi_sub: S::new(ffi_sub), phantom: PhantomData, } } @@ -34,11 +143,11 @@ impl Subscriber { self.ffi_sub.as_ref().unset_condition_variable(); } - pub fn unsubscribe(self, sample_receiver: SampleReceiver) -> Topic { + pub fn unsubscribe(self, sample_receiver: SampleReceiver) -> InactiveSubscriber { self.ffi_sub.as_ref().unsubscribe(); drop(sample_receiver); - Topic::from_ffi(self.ffi_sub.take()) + InactiveSubscriber::from_ffi(self.ffi_sub.take()) } } diff --git a/src/sb/topic.rs b/src/sb/topic.rs deleted file mode 100644 index 3ddbc1a..0000000 --- a/src/sb/topic.rs +++ /dev/null @@ -1,85 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: © Contributors to the iceoryx-rs project -// SPDX-FileContributor: Mathias Kraus - -use super::{ffi, ffi::SubscribeState, SubscriberOptions}; -use super::{mt, st}; - -use std::marker::PhantomData; - -pub struct TopicBuilder<'a, T> { - service: &'a str, - instance: &'a str, - event: &'a str, - options: SubscriberOptions, - phantom: PhantomData, -} - -impl<'a, T> TopicBuilder<'a, T> { - pub fn new(service: &'a str, instance: &'a str, event: &'a str) -> Self { - Self { - service, - instance, - event, - options: SubscriberOptions::default(), - phantom: PhantomData, - } - } - - pub fn queue_capacity(mut self, queue_capacity: u64) -> Self { - self.options.queue_capacity = queue_capacity; - self - } - - pub fn history_request(mut self, history_request: u64) -> Self { - self.options.history_request = history_request; - self - } - - pub fn node_name(mut self, node_name: String) -> Self { - self.options.node_name = node_name; - self - } - - pub fn subscribe_on_create(mut self, subscribe_on_create: bool) -> Self { - self.options.subscribe_on_create = subscribe_on_create; - self - } - - pub fn build(self) -> Topic { - Topic { - ffi_sub: ffi::Subscriber::new(self.service, self.instance, self.event, &self.options), - phantom: PhantomData, - } - } -} - -pub struct SampleReceiverToken {} - -pub struct Topic { - pub(super) ffi_sub: Box, - phantom: PhantomData, -} - -impl Topic { - pub(super) fn from_ffi(ffi_sub: Box) -> Self { - Topic { - ffi_sub, - phantom: PhantomData, - } - } - - pub fn subscribe(self) -> (st::Subscriber, SampleReceiverToken) { - self.ffi_sub.subscribe(); - (st::Subscriber::new(self), SampleReceiverToken {}) - } - - pub fn subscribe_mt(self) -> (mt::Subscriber, SampleReceiverToken) { - self.ffi_sub.subscribe(); - (mt::Subscriber::new(self), SampleReceiverToken {}) - } - - pub fn subscription_state(&self) -> SubscribeState { - self.ffi_sub.subscription_state() - } -} diff --git a/src/tests/basic_pub_sub.rs b/src/tests/basic_pub_sub.rs index d90eec1..6a11a66 100644 --- a/src/tests/basic_pub_sub.rs +++ b/src/tests/basic_pub_sub.rs @@ -22,13 +22,13 @@ fn basic_pub_sub() -> Result<()> { Runtime::init("basic_pub_sub"); - let topic = sb::TopicBuilder::::new("Test", "BasicPubSub", "Counter") - .queue_capacity(5) - .build(); + let (subscriber, sample_receive_token) = + sb::SubscriberBuilder::::new("Test", "BasicPubSub", "Counter") + .queue_capacity(5) + .create()?; - let (subscriber, sample_receive_token) = topic.subscribe(); - - let publisher = pb::PublisherBuilder::::new("Test", "BasicPubSub", "Counter").create()?; + let publisher = + pb::PublisherBuilder::::new("Test", "BasicPubSub", "Counter").create()?; let mut sample = publisher.allocate_sample()?;