Skip to content

Commit

Permalink
iox-#23 Add support for all publisher and subscriber options from ice…
Browse files Browse the repository at this point in the history
…oryx v2
  • Loading branch information
elBoberido committed Jun 25, 2022
1 parent 8dc7a59 commit d6d1632
Show file tree
Hide file tree
Showing 8 changed files with 66 additions and 3 deletions.
4 changes: 3 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@
extern crate cpp;

mod error;
mod queue_policy;
mod runtime;

pub mod introspection;
pub mod pb;
pub mod sb;

// re-export structs
// re-export types
pub use error::IceoryxError;
pub use queue_policy::{ConsumerTooSlowPolicy, QueueFullPolicy};
pub use runtime::Runtime;

#[cfg(test)]
Expand Down
6 changes: 5 additions & 1 deletion src/pb/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ cpp! {{

using iox::capro::IdString_t;
using iox::cxx::TruncateToCapacity;
using iox::popo::ConsumerTooSlowPolicy;
using iox::popo::PublisherOptions;
using iox::popo::PublisherPortUser;
using iox::runtime::PoshRuntime;
Expand All @@ -37,19 +38,22 @@ impl Publisher {
let node_name = CString::new(&options.node_name as &str).expect("CString::new failed");
let node_name = node_name.as_ptr();
let offer_on_create = options.offer_on_create;
let subscriber_too_slow_policy = options.subscriber_too_slow_policy as u8;
unsafe {
let raw = cpp!([service as "const char *",
instance as "const char *",
event as "const char *",
history_capacity as "uint64_t",
node_name as "const char *",
offer_on_create as "bool"]
offer_on_create as "bool",
subscriber_too_slow_policy as "uint8_t"]
-> *mut Publisher as "PublisherPortUser*"
{
PublisherOptions options;
options.historyCapacity = history_capacity;
options.nodeName = IdString_t(TruncateToCapacity, node_name);
options.offerOnCreate = offer_on_create;
options.subscriberTooSlowPolicy = static_cast<ConsumerTooSlowPolicy>(subscriber_too_slow_policy);
auto portData = PoshRuntime::getInstance().getMiddlewarePublisher(
{
IdString_t(TruncateToCapacity, service),
Expand Down
9 changes: 9 additions & 0 deletions src/pb/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// SPDX-FileContributor: Mathias Kraus

use super::{ffi::Publisher as FfiPublisher, sample::SampleMut, PublisherOptions, POD};
use crate::ConsumerTooSlowPolicy;
use crate::IceoryxError;

use std::marker::PhantomData;
Expand Down Expand Up @@ -36,6 +37,14 @@ impl<'a, T: POD> PublisherBuilder<'a, T> {
self
}

pub fn subscriber_too_slow_policy(
mut self,
subscriber_too_slow_policy: ConsumerTooSlowPolicy,
) -> Self {
self.options.subscriber_too_slow_policy = subscriber_too_slow_policy;
self
}

pub fn create(mut self) -> Result<Publisher<T>, IceoryxError> {
self.options.offer_on_create = true;
let ffi_pub = FfiPublisher::new(self.service, self.instance, self.event, &self.options)
Expand Down
4 changes: 4 additions & 0 deletions src/pb/publisher_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@
// SPDX-FileCopyrightText: © Contributors to the iceoryx-rs project
// SPDX-FileContributor: Mathias Kraus

use crate::ConsumerTooSlowPolicy;

use std::marker::PhantomData;

pub(super) struct PublisherOptions {
pub history_capacity: u64,
pub node_name: String,
pub offer_on_create: bool,
pub subscriber_too_slow_policy: ConsumerTooSlowPolicy,
_phantom: PhantomData<()>,
}

Expand All @@ -17,6 +20,7 @@ impl Default for PublisherOptions {
history_capacity: 0,
node_name: String::new(),
offer_on_create: true,
subscriber_too_slow_policy: ConsumerTooSlowPolicy::DiscardOldestData,
_phantom: PhantomData,
}
}
Expand Down
17 changes: 17 additions & 0 deletions src/queue_policy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: © Contributors to the iceoryx-rs project
// SPDX-FileContributor: Mathias Kraus

#[repr(C)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConsumerTooSlowPolicy {
WaitForConsumer,
DiscardOldestData,
}

#[repr(C)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueueFullPolicy {
BlockProducer,
DiscardOldestData,
}
9 changes: 8 additions & 1 deletion src/sb/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ cpp! {{
using iox::SubscribeState;
using iox::capro::IdString_t;
using iox::cxx::TruncateToCapacity;
using iox::popo::QueueFullPolicy;
using iox::popo::SubscriberOptions;
using iox::popo::SubscriberPortUser;
using iox::runtime::PoshRuntime;
Expand Down Expand Up @@ -133,21 +134,27 @@ impl Subscriber {
let node_name = CString::new(&options.node_name as &str).expect("CString::new failed");
let node_name = node_name.as_ptr();
let subscribe_on_create = options.subscribe_on_create;
let queue_full_policy = options.queue_full_policy as u8;
let requires_publisher_history_support = options.requires_publisher_history_support;
unsafe {
let raw = cpp!([service as "const char *",
instance as "const char *",
event as "const char *",
queue_capacity as "uint64_t",
history_request as "uint64_t",
node_name as "const char *",
subscribe_on_create as "bool"]
subscribe_on_create as "bool",
queue_full_policy as "uint8_t",
requires_publisher_history_support as "bool"]
-> *mut Subscriber as "SubscriberPortUser*"
{
SubscriberOptions options;
options.queueCapacity = queue_capacity;
options.historyRequest = history_request;
options.nodeName = IdString_t(TruncateToCapacity, node_name);
options.subscribeOnCreate = subscribe_on_create;
options.queueFullPolicy = static_cast<QueueFullPolicy>(queue_full_policy);
options.requiresPublisherHistorySupport = requires_publisher_history_support;
auto portData = PoshRuntime::getInstance().getMiddlewareSubscriber(
{
IdString_t(TruncateToCapacity, service),
Expand Down
14 changes: 14 additions & 0 deletions src/sb/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use super::{
};
use super::{mt, st};
use crate::IceoryxError;
use crate::QueueFullPolicy;

use std::marker::PhantomData;

Expand Down Expand Up @@ -44,6 +45,19 @@ impl<'a, T> SubscriberBuilder<'a, T> {
self
}

pub fn queue_full_policy(mut self, queue_full_policy: QueueFullPolicy) -> Self {
self.options.queue_full_policy = queue_full_policy;
self
}

pub fn requires_publisher_history_support(
mut self,
requires_publisher_history_support: bool,
) -> Self {
self.options.requires_publisher_history_support = requires_publisher_history_support;
self
}

pub fn create(mut self) -> Result<(st::Subscriber<T>, SampleReceiverToken), IceoryxError> {
self.options.subscribe_on_create = true;
let ffi_sub = ffi::Subscriber::new(self.service, self.instance, self.event, &self.options)
Expand Down
6 changes: 6 additions & 0 deletions src/sb/subscriber_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,17 @@
// SPDX-FileCopyrightText: © Contributors to the iceoryx-rs project
// SPDX-FileContributor: Mathias Kraus

use crate::QueueFullPolicy;

use std::marker::PhantomData;

pub(super) struct SubscriberOptions {
pub queue_capacity: u64,
pub history_request: u64,
pub node_name: String,
pub subscribe_on_create: bool,
pub queue_full_policy: QueueFullPolicy,
pub requires_publisher_history_support: bool,
_phantom: PhantomData<()>,
}

Expand All @@ -19,6 +23,8 @@ impl Default for SubscriberOptions {
history_request: 0,
node_name: String::new(),
subscribe_on_create: true,
queue_full_policy: QueueFullPolicy::DiscardOldestData,
requires_publisher_history_support: false,
_phantom: PhantomData,
}
}
Expand Down

0 comments on commit d6d1632

Please sign in to comment.