Skip to content

Commit

Permalink
iox-#23 Refine subscriber API
Browse files Browse the repository at this point in the history
  • Loading branch information
elBoberido committed Jun 21, 2022
1 parent 54bc6a9 commit 7a17ef5
Show file tree
Hide file tree
Showing 11 changed files with 159 additions and 154 deletions.
26 changes: 6 additions & 20 deletions examples/subscriber_multithreaded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,39 +2,25 @@
// SPDX-FileCopyrightText: © Contributors to the iceoryx-rs project
// SPDX-FileContributor: Mathias Kraus

use iceoryx_rs::sb::{SampleReceiverWaitState, SubscribeState, TopicBuilder};
use iceoryx_rs::sb::{SampleReceiverWaitState, SubscriberBuilder};
use iceoryx_rs::Runtime;

use std::error::Error;
use std::thread;
use std::time::Duration;

#[repr(C)]
struct CounterTopic {
struct Counter {
counter: u32,
}

fn main() -> Result<(), Box<dyn Error>> {
Runtime::init("subscriber_multithreaded");

let topic = TopicBuilder::<CounterTopic>::new("Radar", "FrontLeft", "Counter")
.queue_capacity(5)
.build();

let (subscriber, sample_receive_token) = topic.subscribe_mt();

let mut has_printed_waiting_for_subscription = false;
while subscriber.subscription_state() != SubscribeState::Subscribed {
if !has_printed_waiting_for_subscription {
println!("waiting for subscription ...");
has_printed_waiting_for_subscription = true;
}
thread::sleep(Duration::from_millis(10));
}

if has_printed_waiting_for_subscription {
println!(" -> subscribed");
}
let (subscriber, sample_receive_token) =
SubscriberBuilder::<Counter>::new("Radar", "FrontLeft", "Counter")
.queue_capacity(5)
.create_mt()?;

let sample_receiver = subscriber.get_sample_receiver(sample_receive_token);

Expand Down
29 changes: 8 additions & 21 deletions examples/subscriber_simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,38 +2,25 @@
// SPDX-FileCopyrightText: © Contributors to the iceoryx-rs project
// SPDX-FileContributor: Mathias Kraus

use iceoryx_rs::sb::{SubscribeState, TopicBuilder};
use iceoryx_rs::sb::SubscriberBuilder;
use iceoryx_rs::Runtime;

use std::error::Error;
use std::thread;
use std::time::Duration;

#[repr(C)]
struct CounterTopic {
struct Counter {
counter: u32,
}

fn main() {
fn main() -> Result<(), Box<dyn Error>> {
Runtime::init("subscriber_simple");

let topic = TopicBuilder::<CounterTopic>::new("Radar", "FrontLeft", "Counter")
.queue_capacity(5)
.build();

let (subscriber, sample_receive_token) = topic.subscribe();

let mut has_printed_waiting_for_subscription = false;
while subscriber.subscription_state() != SubscribeState::Subscribed {
if !has_printed_waiting_for_subscription {
println!("waiting for subscription ...");
has_printed_waiting_for_subscription = true;
}
thread::sleep(Duration::from_millis(10));
}

if has_printed_waiting_for_subscription {
println!(" -> subscribed");
}
let (subscriber, sample_receive_token) =
SubscriberBuilder::<Counter>::new("Radar", "FrontLeft", "Counter")
.queue_capacity(5)
.create()?;

let sample_receiver = subscriber.get_sample_receiver(sample_receive_token);

Expand Down
2 changes: 2 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ pub enum IceOryxError {
ChunkAllocationFailed,
#[error("could not create a publisher")]
PublisherCreationFailed,
#[error("could not create a subscriber")]
SubscriberCreationFailed,
#[error("number of allowed chunks to hold is exhausted")]
TooManyChunksHoldInParallel,
}
9 changes: 5 additions & 4 deletions src/introspection/memory/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-FileCopyrightText: © Contributors to the iceoryx-rs project
// SPDX-FileContributor: Mathias Kraus

use crate::sb::{Topic, TopicBuilder};
use crate::sb::{InactiveSubscriber, SubscriberBuilder};

use std::ffi::CStr;
use std::marker::PhantomData;
Expand Down Expand Up @@ -123,11 +123,12 @@ pub struct MemPoolIntrospectionTopic {
}

impl MemPoolIntrospectionTopic {
pub fn new() -> Topic<Self> {
TopicBuilder::<Self>::new("Introspection", "RouDi_ID", "MemPool")
pub fn new() -> InactiveSubscriber<Self> {
SubscriberBuilder::<Self>::new("Introspection", "RouDi_ID", "MemPool")
.queue_capacity(1)
.history_request(1)
.build()
.create_without_subscribe()
.expect("Create subscriber")
}

pub fn memory_segments(&self) -> MemorySegmentContainer {
Expand Down
9 changes: 5 additions & 4 deletions src/introspection/port/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-FileCopyrightText: © Contributors to the iceoryx-rs project
// SPDX-FileContributor: Mathias Kraus

use crate::sb::{Topic, TopicBuilder};
use crate::sb::{InactiveSubscriber, SubscriberBuilder};

use std::ffi::CStr;
use std::os::raw::c_char;
Expand Down Expand Up @@ -164,11 +164,12 @@ pub struct PortIntrospectionTopic {
}

impl PortIntrospectionTopic {
pub fn new() -> Topic<Self> {
TopicBuilder::<Self>::new("Introspection", "RouDi_ID", "Port")
pub fn new() -> InactiveSubscriber<Self> {
SubscriberBuilder::<Self>::new("Introspection", "RouDi_ID", "Port")
.queue_capacity(1)
.history_request(1)
.build()
.create_without_subscribe()
.expect("Create subscriber")
}

pub fn subscriber_ports(&self) -> SubscriberPortIntrospectionContainer {
Expand Down
9 changes: 5 additions & 4 deletions src/introspection/process/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-FileCopyrightText: © Contributors to the iceoryx-rs project
// SPDX-FileContributor: Mathias Kraus

use crate::sb::{Topic, TopicBuilder};
use crate::sb::{InactiveSubscriber, SubscriberBuilder};

use std::ffi::CStr;
use std::os::raw::c_char;
Expand Down Expand Up @@ -60,11 +60,12 @@ pub struct ProcessIntrospectionTopic {
}

impl ProcessIntrospectionTopic {
pub fn new() -> Topic<Self> {
TopicBuilder::<Self>::new("Introspection", "RouDi_ID", "Process")
pub fn new() -> InactiveSubscriber<Self> {
SubscriberBuilder::<Self>::new("Introspection", "RouDi_ID", "Process")
.queue_capacity(1)
.history_request(1)
.build()
.create_without_subscribe()
.expect("Create subscriber")
}

pub fn processes(&self) -> ProcessIntrospectionContainer {
Expand Down
8 changes: 6 additions & 2 deletions src/sb/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ impl Subscriber {
instance: &str,
event: &str,
options: &SubscriberOptions,
) -> Box<Self> {
) -> Option<Box<Self>> {
let service = CString::new(service).expect("CString::new failed");
let service = service.as_ptr();
let instance = CString::new(instance).expect("CString::new failed");
Expand Down Expand Up @@ -159,7 +159,11 @@ impl Subscriber {
return new SubscriberPortUser(portData);
});

Box::from_raw(raw)
if raw.is_null() {
None
} else {
Some(Box::from_raw(raw))
}
}
}

Expand Down
3 changes: 1 addition & 2 deletions src/sb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,11 @@ mod ffi;
mod sample;
mod subscriber;
mod subscriber_options;
mod topic;

pub use ffi::SubscribeState;
pub use sample::Sample;
pub use sample::SampleReceiverWaitState;
pub use topic::{Topic, TopicBuilder};
pub use subscriber::{InactiveSubscriber, SubscriberBuilder};

use subscriber_options::SubscriberOptions;

Expand Down
121 changes: 115 additions & 6 deletions src/sb/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,130 @@
// SPDX-FileContributor: Mathias Kraus

use super::{
ffi::SubscriberStrongRef, sample::SampleReceiver, topic::SampleReceiverToken, SubscribeState,
Topic,
ffi, ffi::SubscriberStrongRef, sample::SampleReceiver, SubscribeState, SubscriberOptions,
};
use super::{mt, st};
use crate::IceOryxError;

use std::marker::PhantomData;

pub struct SubscriberBuilder<'a, T> {
service: &'a str,
instance: &'a str,
event: &'a str,
options: SubscriberOptions,
phantom: PhantomData<T>,
}

impl<'a, T> SubscriberBuilder<'a, T> {
pub fn new(service: &'a str, instance: &'a str, event: &'a str) -> Self {
Self {
service,
instance,
event,
options: SubscriberOptions::default(),
phantom: PhantomData,
}
}

pub fn queue_capacity(mut self, queue_capacity: u64) -> Self {
self.options.queue_capacity = queue_capacity;
self
}

pub fn history_request(mut self, history_request: u64) -> Self {
self.options.history_request = history_request;
self
}

pub fn node_name(mut self, node_name: String) -> Self {
self.options.node_name = node_name;
self
}

pub fn create(mut self) -> Result<(st::Subscriber<T>, SampleReceiverToken), IceOryxError> {
self.options.subscribe_on_create = true;
let ffi_sub = ffi::Subscriber::new(self.service, self.instance, self.event, &self.options)
.ok_or(IceOryxError::SubscriberCreationFailed)?;

let subscriber = st::Subscriber {
ffi_sub: ffi::SubscriberRc::new(ffi_sub),
phantom: PhantomData,
};

Ok((subscriber, SampleReceiverToken {}))
}

pub fn create_mt(mut self) -> Result<(mt::Subscriber<T>, SampleReceiverToken), IceOryxError> {
self.options.subscribe_on_create = true;
let ffi_sub = ffi::Subscriber::new(self.service, self.instance, self.event, &self.options)
.ok_or(IceOryxError::SubscriberCreationFailed)?;

let subscriber = mt::Subscriber {
ffi_sub: ffi::SubscriberArc::new(ffi_sub),
phantom: PhantomData,
};

Ok((subscriber, SampleReceiverToken {}))
}

pub fn create_without_subscribe(mut self) -> Result<InactiveSubscriber<T>, IceOryxError> {
self.options.subscribe_on_create = false;
let ffi_sub = ffi::Subscriber::new(self.service, self.instance, self.event, &self.options)
.ok_or(IceOryxError::SubscriberCreationFailed)?;

Ok(InactiveSubscriber {
ffi_sub,
phantom: PhantomData,
})
}
}

pub struct SampleReceiverToken {}

pub struct InactiveSubscriber<T> {
ffi_sub: Box<ffi::Subscriber>,
phantom: PhantomData<T>,
}

impl<T> InactiveSubscriber<T> {
fn from_ffi(ffi_sub: Box<ffi::Subscriber>) -> Self {
Self {
ffi_sub,
phantom: PhantomData,
}
}

pub fn subscribe(self) -> (st::Subscriber<T>, SampleReceiverToken) {
self.ffi_sub.subscribe();
(
st::Subscriber::new_from_ffi(self.ffi_sub),
SampleReceiverToken {},
)
}

pub fn subscribe_mt(self) -> (mt::Subscriber<T>, SampleReceiverToken) {
self.ffi_sub.subscribe();
(
mt::Subscriber::new_from_ffi(self.ffi_sub),
SampleReceiverToken {},
)
}

pub fn subscription_state(&self) -> SubscribeState {
self.ffi_sub.subscription_state()
}
}

pub struct Subscriber<T, S: SubscriberStrongRef> {
ffi_sub: S,
phantom: PhantomData<T>,
}

impl<T, S: SubscriberStrongRef> Subscriber<T, S> {
pub(super) fn new(subscriber: Topic<T>) -> Self {
fn new_from_ffi(ffi_sub: Box<ffi::Subscriber>) -> Self {
Subscriber {
ffi_sub: S::new(subscriber.ffi_sub),
ffi_sub: S::new(ffi_sub),
phantom: PhantomData,
}
}
Expand All @@ -34,11 +143,11 @@ impl<T, S: SubscriberStrongRef> Subscriber<T, S> {
self.ffi_sub.as_ref().unset_condition_variable();
}

pub fn unsubscribe(self, sample_receiver: SampleReceiver<T, S>) -> Topic<T> {
pub fn unsubscribe(self, sample_receiver: SampleReceiver<T, S>) -> InactiveSubscriber<T> {
self.ffi_sub.as_ref().unsubscribe();

drop(sample_receiver);

Topic::from_ffi(self.ffi_sub.take())
InactiveSubscriber::from_ffi(self.ffi_sub.take())
}
}
Loading

0 comments on commit 7a17ef5

Please sign in to comment.