diff --git a/Cargo.lock b/Cargo.lock index a111a92bca..c9a7bb3e50 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5785,10 +5785,8 @@ name = "zenoh-transport" version = "1.2.1" dependencies = [ "async-trait", - "crossbeam-utils", "flume", "futures", - "futures-util", "lazy_static", "lz4_flex", "paste", diff --git a/Cargo.toml b/Cargo.toml index 2b93d472d6..3b81950a16 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -92,7 +92,6 @@ console-subscriber = "0.4.0" const_format = "0.2.33" crc = "3.2.1" criterion = "0.5" -crossbeam-utils = "0.8.20" crossbeam-queue = "0.3.12" derive_more = { version = "1.0.0", features = ["as_ref"] } derive-new = "0.7.0" diff --git a/DEFAULT_CONFIG.json5 b/DEFAULT_CONFIG.json5 index fa95f16d6e..2c58dd3557 100644 --- a/DEFAULT_CONFIG.json5 +++ b/DEFAULT_CONFIG.json5 @@ -471,14 +471,14 @@ /// then amount of memory being allocated for each queue is SIZE_XXX * LINK_MTU. /// If qos is false, then only the DATA priority will be allocated. size: { - control: 2, - real_time: 2, - interactive_high: 2, - interactive_low: 2, + control: 1, + real_time: 1, + interactive_high: 1, + interactive_low: 1, data_high: 2, - data: 2, - data_low: 2, - background: 2, + data: 4, + data_low: 4, + background: 4, }, /// Congestion occurs when the queue is empty (no available batch). congestion_control: { @@ -506,12 +506,6 @@ /// The maximum time limit (in ms) a message should be retained for batching when back-pressure happens. time_limit: 1, }, - allocation: { - /// Mode for memory allocation of batches in the priority queues. - /// - "init": batches are allocated at queue initialization time. - /// - "lazy": batches are allocated when needed up to the maximum number of batches configured in the size configuration parameter. - mode: "lazy", - }, }, }, /// Configure the zenoh RX parameters of a link diff --git a/commons/zenoh-config/src/defaults.rs b/commons/zenoh-config/src/defaults.rs index fccee57a24..9b06afd0ae 100644 --- a/commons/zenoh-config/src/defaults.rs +++ b/commons/zenoh-config/src/defaults.rs @@ -274,14 +274,14 @@ impl QueueSizeConf { impl Default for QueueSizeConf { fn default() -> Self { Self { - control: 2, - real_time: 2, - interactive_low: 2, - interactive_high: 2, + control: 1, + real_time: 1, + interactive_low: 1, + interactive_high: 1, data_high: 2, - data: 2, + data: 4, data_low: 2, - background: 2, + background: 1, } } } diff --git a/commons/zenoh-config/src/lib.rs b/commons/zenoh-config/src/lib.rs index aeeebc065d..2594f54085 100644 --- a/commons/zenoh-config/src/lib.rs +++ b/commons/zenoh-config/src/lib.rs @@ -508,13 +508,6 @@ validated_struct::validator! { /// The maximum time limit (in ms) a message should be retained for batching when back-pressure happens. time_limit: u64, }, - /// Perform lazy memory allocation of batches in the prioritiey queues. If set to false all batches are initialized at - /// initialization time. If set to true the batches will be allocated when needed up to the maximum number of batches - /// configured in the size configuration parameter. - pub allocation: #[derive(Default, Copy, PartialEq, Eq)] - QueueAllocConf { - pub mode: QueueAllocMode, - }, }, // Number of threads used for TX threads: usize, @@ -659,14 +652,6 @@ validated_struct::validator! 
{ } } -#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)] -#[serde(rename_all = "snake_case")] -pub enum QueueAllocMode { - Init, - #[default] - Lazy, -} - #[derive(Default, Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)] #[serde(rename_all = "snake_case")] pub enum ShmInitMode { diff --git a/io/zenoh-transport/Cargo.toml b/io/zenoh-transport/Cargo.toml index 23dc24b433..a1b505de51 100644 --- a/io/zenoh-transport/Cargo.toml +++ b/io/zenoh-transport/Cargo.toml @@ -44,7 +44,7 @@ transport_ws = ["zenoh-link/transport_ws"] transport_serial = ["zenoh-link/transport_serial"] transport_compression = [] transport_unixpipe = ["zenoh-link/transport_unixpipe"] -transport_vsock= ["zenoh-link/transport_vsock"] +transport_vsock = ["zenoh-link/transport_vsock"] stats = ["zenoh-protocol/stats"] test = [] unstable = [] @@ -52,20 +52,20 @@ default = ["test", "transport_multilink"] [dependencies] async-trait = { workspace = true } -crossbeam-utils = { workspace = true } tokio = { workspace = true, features = [ - "sync", - "fs", - "time", - "macros", - "rt-multi-thread", - "io-util", - "net", + "sync", + "fs", + "time", + "macros", + "rt-multi-thread", + "io-util", + "net", ] } lazy_static = { workspace = true } -tokio-util = { workspace = true, features = ["rt"]} +tokio-util = { workspace = true, features = ["rt"] } flume = { workspace = true } -tracing = {workspace = true} +futures = { workspace = true } +tracing = { workspace = true } lz4_flex = { workspace = true } paste = { workspace = true } rand = { workspace = true, features = ["default"] } @@ -90,8 +90,6 @@ zenoh-task = { workspace = true } [dev-dependencies] -futures-util = { workspace = true } -zenoh-util = {workspace = true } +zenoh-util = { workspace = true } zenoh-protocol = { workspace = true, features = ["test"] } -futures = { workspace = true } zenoh-link-commons = { workspace = true } diff --git a/io/zenoh-transport/src/common/batch.rs b/io/zenoh-transport/src/common/batch.rs index 0a0a41cf91..682d4c2682 100644 --- a/io/zenoh-transport/src/common/batch.rs +++ b/io/zenoh-transport/src/common/batch.rs @@ -202,7 +202,7 @@ pub struct WBatch { #[cfg(feature = "stats")] pub stats: WBatchStats, // an ephemeral batch will not be recycled in the pipeline - // it can be used to push a stop fragment when no batch are available + // it can be used to push a drop fragment when no batch are available pub ephemeral: bool, } @@ -212,9 +212,9 @@ impl WBatch { buffer: BBuf::with_capacity(config.mtu as usize), codec: Zenoh080Batch::new(), config, - ephemeral: false, #[cfg(feature = "stats")] stats: WBatchStats::default(), + ephemeral: false, }; // Bring the batch in a clear state diff --git a/io/zenoh-transport/src/common/pipeline.rs b/io/zenoh-transport/src/common/pipeline.rs index 8448406fda..700c127237 100644 --- a/io/zenoh-transport/src/common/pipeline.rs +++ b/io/zenoh-transport/src/common/pipeline.rs @@ -12,16 +12,18 @@ // ZettaScale Zenoh Team, // use std::{ - fmt, - ops::Add, + cell::UnsafeCell, + cmp::min, + fmt, mem, + mem::MaybeUninit, sync::{ - atomic::{AtomicBool, AtomicU32, AtomicU8, Ordering}, + atomic::{AtomicU8, Ordering}, Arc, Mutex, MutexGuard, }, time::{Duration, Instant}, }; -use crossbeam_utils::CachePadded; +use futures::future::OptionFuture; use ringbuffer_spsc::{RingBuffer, RingBufferReader, RingBufferWriter}; use zenoh_buffers::{ reader::{HasReader, Reader}, @@ -29,7 +31,7 @@ use zenoh_buffers::{ ZBuf, }; use zenoh_codec::{transport::batch::BatchError, WCodec, 
Zenoh080}; -use zenoh_config::{QueueAllocConf, QueueAllocMode, QueueSizeConf}; +use zenoh_config::QueueSizeConf; use zenoh_core::zlock; use zenoh_protocol::{ core::Priority, @@ -38,27 +40,19 @@ use zenoh_protocol::{ fragment, fragment::FragmentHeader, frame::{self, FrameHeader}, - AtomicBatchSize, BatchSize, TransportMessage, + BatchSize, TransportMessage, }, }; -use zenoh_sync::{event, Notifier, WaitDeadlineError, Waiter}; +use zenoh_sync::{event, Notifier, WaitDeadlineError, WaitError, Waiter}; use super::{ batch::{Encode, WBatch}, - priority::{TransportChannelTx, TransportPriorityTx}, + priority::TransportPriorityTx, }; use crate::common::batch::BatchConfig; const RBLEN: usize = QueueSizeConf::MAX; -// Inner structure to reuse serialization batches -struct StageInRefill { - n_ref_r: Waiter, - s_ref_r: RingBufferReader, - batch_config: (usize, BatchConfig), - batch_allocs: usize, -} - #[derive(Debug)] pub(crate) struct TransportClosed; impl fmt::Display for TransportClosed { @@ -68,269 +62,405 @@ impl fmt::Display for TransportClosed { } impl std::error::Error for TransportClosed {} -impl StageInRefill { - fn pull(&mut self) -> Option { - match self.s_ref_r.pull() { - Some(b) => Some(b), - None if self.batch_allocs < self.batch_config.0 => { - self.batch_allocs += 1; - Some(WBatch::new(self.batch_config.1)) - } - None => None, - } - } +/// Batch pool from which you can acquire batches and refill them after use. +/// +/// It is initialized with a maximum batch count; the count is decreased when +/// batches are acquired, and increased when they are refilled. +/// Moreover, the pull contains a single pre-allocated batch, that is taken +/// when a batch is acquired, and refilled later. +/// +/// The pool carries two flags that are set depending on the situation: +/// - congestion: if there is no available batch, and a deadline has been reached, +/// the flag is set; it will be unset whenever a batch is refilled. +/// - batching: if there is more than one in-flight batch, the flag is set as it +/// means the network is not sending batches quickly enough, and the pipeline +/// should batch messages more; the flag is unset whenever the pool is completely +/// refilled, or manually after reaching a batching duration limit. +/// +struct BatchPool { + count: u8, + state: AtomicU8, + refill: UnsafeCell>, +} - fn wait(&self) -> bool { - self.n_ref_r.wait().is_ok() +// SAFETY: `refill` cell is properly synchronized using the atomic state. +// See `BatchPool::try_acquire`/`BatchPool::refill`. +unsafe impl Send for BatchPool {} +// SAFETY: `refill` cell is properly synchronized using the atomic state. +// See `BatchPool::try_acquire`/`BatchPool::refill`. +unsafe impl Sync for BatchPool {} + +impl BatchPool { + const COUNT_MASK: u8 = (1 << 5) - 1; + const REFILLED_FLAG: u8 = 1 << 5; + const BATCHING_FLAG: u8 = 1 << 6; + const CONGESTED_FLAG: u8 = 1 << 7; + + /// Initializes the batch pool with a given maximum count of batches. + fn new(max_count: usize) -> Self { + Self { + count: max_count as u8, + state: AtomicU8::new(max_count as u8), + refill: UnsafeCell::new(MaybeUninit::uninit()), + } } - fn wait_deadline(&self, instant: Instant) -> Result { - match self.n_ref_r.wait_deadline(instant) { - Ok(()) => Ok(true), - Err(WaitDeadlineError::Deadline) => Ok(false), - Err(WaitDeadlineError::WaitError) => Err(TransportClosed), + /// Tries to acquire a batch, returning it if there is one available. 
+ /// + /// It will try to reuse the refilled batch if there is one, and if it + /// has the requested capacity, otherwise, a new one is allocated. + /// + /// The pool will enter in batching mode if there is more than one + /// in-flight batch. + /// If no batch is available and `set_congested` is `true`, then + /// the congested flag will be set. + /// + /// # Safety + /// + /// This method must not be called concurrently, as a single thread can acquire + /// the refilled batch at a time. + unsafe fn try_acquire(&self, batch_config: BatchConfig, set_congested: bool) -> Option { + let mut state = self.state.load(Ordering::Acquire); + let mut batch = None; + loop { + // If there is no batch available (the count is 0), if `set_congested` is true, + // try setting the congested flag and return `None` after; otherwise, return + // `None` directly. + // It may be tempting to use a `fetch_and_or` to set the flag, but it could + // happen concurrently with a batch refill, so a CAS is more correct. + while state & Self::COUNT_MASK == 0 { + if !set_congested { + return None; + } + match self.state.compare_exchange_weak( + state, + state | Self::CONGESTED_FLAG, + Ordering::Relaxed, + Ordering::Acquire, + ) { + Ok(_) => return None, + Err(s) => state = s, + } + } + let mut next_state = state - 1; + if state & Self::COUNT_MASK < self.count { + next_state |= Self::BATCHING_FLAG; + } + if state & Self::REFILLED_FLAG != 0 { + if batch.is_none() { + // SAFETY: State has "refilled" flag set, and has been loaded with + // acquire ordering. As the flag is stored with `released` ordering + // only after writing to the cell, there is happens-before relation + // between this read and the previous write, so the cell is safe to + // access. Also, the function contract guarantees there is no + // concurrent read. + // It is also not possible for the flag to change if the following + // CAS fails, so there is no need to discard the read batch in this + // case. + // The flag must be unset with release ordering, to prevent the read + // of the cell to be reordered after the CAS, ensuring the cell can + // be safely modified as soon as the state is read with the flag + // unset. + batch = Some(unsafe { (*self.refill.get()).assume_init_read() }); + } + next_state &= !Self::REFILLED_FLAG; + } + match self.state.compare_exchange_weak( + state, + next_state, + Ordering::Release, + Ordering::Acquire, + ) { + Ok(_) => break, + Err(s) => state = s, + } } + Some(match batch { + Some(b) if b.buffer.capacity() >= batch_config.mtu as usize => b, + _ => WBatch::new(batch_config), + }) } -} - -lazy_static::lazy_static! { - static ref LOCAL_EPOCH: Instant = Instant::now(); -} - -type AtomicMicroSeconds = AtomicU32; -type MicroSeconds = u32; -struct AtomicBackoff { - active: CachePadded, - bytes: CachePadded, - first_write: CachePadded, -} - -// Inner structure to link the initial stage with the final stage of the pipeline -struct StageInOut { - n_out_w: Notifier, - s_out_w: RingBufferWriter, - atomic_backoff: Arc, -} - -impl StageInOut { - #[inline] - fn notify(&self, bytes: BatchSize) { - self.atomic_backoff.bytes.store(bytes, Ordering::Relaxed); - if !self.atomic_backoff.active.load(Ordering::Relaxed) { - let _ = self.n_out_w.notify(); + /// Refill the batch, making it available on the other side. + /// + /// The first batch to be refilled will be put in the pre-allocated slot + /// for later reused. If the slot is already full, then the batch is + /// discarded. 
+ /// + /// The batching mode is stopped when the pool is completely refilled. + /// + /// # Safety + /// + /// This method must not be called concurrently, as a single thread can refill a batch + /// at a time. + unsafe fn refill(&self, batch: WBatch) { + let mut batch = Some(batch); + let mut state = self.state.load(Ordering::Relaxed); + loop { + // Increment the count by one and unset the congestion flag. + let mut next_state = (state + 1) & !Self::CONGESTED_FLAG; + // If the batch count is back to the maximum, unset the batching flag. + if next_state & Self::COUNT_MASK == self.count { + next_state &= !Self::BATCHING_FLAG; + } + // If there is no batch in the refill cell, put the refilled batch in. + if state & Self::REFILLED_FLAG == 0 { + if let Some(mut batch) = batch.take() { + // Clear the batch before reusing it. + batch.clear(); + // SAFETY: State has "refilled" flag unset. As the flag is only + // unset after reading the cell, there should be no concurrent + // read, and the cell is safe to write. Also, the function + // contract guarantees there is no concurrent write. + // It is also not possible for the flag to change if the + // following CAS fails, so once the batch has been written, it + // will stay in the cell until the CAS succeeds. + unsafe { (*self.refill.get()).write(batch) }; + } + next_state |= Self::REFILLED_FLAG; + } + match self.state.compare_exchange_weak( + state, + next_state, + Ordering::Release, + Ordering::Relaxed, + ) { + Ok(_) => break, + Err(s) => state = s, + } } } - #[inline] - fn move_batch(&mut self, batch: WBatch) { - let _ = self.s_out_w.push(batch); - self.atomic_backoff.bytes.store(0, Ordering::Relaxed); - let _ = self.n_out_w.notify(); + /// Returns if the congested flag has been set, meaning no batch is available. + /// The congested flag should usually be set after waiting a bit. + fn is_congested(&self) -> bool { + self.state.load(Ordering::Relaxed) & Self::CONGESTED_FLAG != 0 } -} -// Inner structure containing mutexes for current serialization batch and SNs -struct StageInMutex { - current: Arc>>, - priority: TransportPriorityTx, -} + /// Returns if the pool is in batching mode, meaning the network is not sending batches + /// quickly enough, and the pipeline should batch messages more. + fn is_batching(&self) -> bool { + self.state.load(Ordering::Relaxed) & Self::BATCHING_FLAG != 0 + } -impl StageInMutex { - #[inline] - fn current(&self) -> MutexGuard<'_, Option> { - zlock!(self.current) + /// Force stopping the batching mode, usually because a batching limit has been reached. + fn stop_batching(&self) { + self.state + .fetch_and(!Self::BATCHING_FLAG, Ordering::Relaxed); } +} - #[inline] - fn channel(&self, is_reliable: bool) -> MutexGuard<'_, TransportChannelTx> { - if is_reliable { - zlock!(self.priority.reliable) - } else { - zlock!(self.priority.best_effort) +impl Drop for BatchPool { + fn drop(&mut self) { + if *self.state.get_mut() & Self::REFILLED_FLAG != 0 { + // SAFETY: The flag has been set, so the batch has been set. 
+ unsafe { self.refill.get_mut().assume_init_drop() } } } } #[derive(Debug)] -struct WaitTime { +struct Deadline { wait_time: Duration, max_wait_time: Option, + expiration: Option, } -impl WaitTime { +impl Deadline { fn new(wait_time: Duration, max_wait_time: Option) -> Self { Self { wait_time, max_wait_time, + expiration: None, } } - fn advance(&mut self, instant: &mut Instant) { - match &mut self.max_wait_time { - Some(max_wait_time) => { - if let Some(new_max_wait_time) = max_wait_time.checked_sub(self.wait_time) { - *instant += self.wait_time; - *max_wait_time = new_max_wait_time; - self.wait_time *= 2; - } - } - None => { - *instant += self.wait_time; - } + fn get_expiration(&mut self) -> Option { + if self.expiration.is_none() && self.wait_time > Duration::ZERO { + self.expiration = Some(Instant::now() + self.wait_time); } + self.expiration } - fn wait_time(&self) -> Duration { - self.wait_time + fn renew(&mut self) { + if let Some(expiration) = self.get_expiration() { + self.expiration = Some(expiration + self.wait_time); + if let Some(max) = self.max_wait_time { + self.max_wait_time = max.checked_sub(self.wait_time); + self.wait_time *= 2; + } + } } } -#[derive(Clone)] -enum DeadlineSetting { - Immediate, - Finite(Instant), +/// Shared structures of the stage-in, where we put all the mutexes. +/// It allows not messing with the borrow checker when we use a mutable +/// reference on the `StageIn` struct. +struct StageInShared { + /// Mutex protected stage-in, only one message can be written at a time. + queue: Mutex, + /// Current batch on which the pipeline is writing. + /// It will be taken by the tx task if no batch has been pushed. + current: Arc>>, + /// Sequence number generator. + chan_tx: TransportPriorityTx, + /// Batch pool, used to check congestion. + batch_pool: Arc, } -struct LazyDeadline { - deadline: Option, - wait_time: WaitTime, +// This is the initial stage of the pipeline where messages are serialized on +struct StageIn { + /// Batch queue. + batch_tx: RingBufferWriter, + /// Notifier used when batched are pushed, or when current batch has been written. + batch_notifier: Notifier, + /// Batch pool, to acquire available batches. + batch_pool: Arc, + /// Waiter for batch refilling. + refill_waiter: Waiter, + /// Indicates if small batches can be used. Everytime a batch is pushed with a smaller + /// length than the configured small capacity, the next batch will be allocated with + /// the small capacity. It reduces the memory overhead compared to always using maximum + /// size batches. + use_small_batch: bool, + /// Fragmentation buffer. + fragbuf: ZBuf, + /// If batching is enabled. + batching: bool, + /// Batch config, used to allocate new batches. + batch_config: BatchConfig, } -impl LazyDeadline { - fn new(wait_time: WaitTime) -> Self { - Self { - deadline: None, - wait_time, +impl StageIn { + /// Small size used when full batch size is not needed. Just a constant for now. + const SMALL_BATCH_SIZE: BatchSize = 1 << 11; + + /// Generate a config to allocate a new batch, using the configured small size + /// when previous batch didn't need more. 
+ fn new_batch_config(&self) -> BatchConfig { + BatchConfig { + mtu: if self.use_small_batch { + min(self.batch_config.mtu, Self::SMALL_BATCH_SIZE) + } else { + self.batch_config.mtu + }, + ..self.batch_config } } - fn advance(&mut self) { - match self.deadline().to_owned() { - DeadlineSetting::Immediate => {} - DeadlineSetting::Finite(mut instant) => { - self.wait_time.advance(&mut instant); - self.deadline = Some(DeadlineSetting::Finite(instant)); + /// Retrieve the current batch, or allocate a new one if there is a slot available. + fn get_batch( + &mut self, + current: &mut MutexGuard<'_, Option>, + mut deadline: Option<&mut Deadline>, + ) -> Result, TransportClosed> { + // retrieve current batch if any + if let Some(batch) = current.take() { + return Ok(Some(batch)); + } + loop { + // try to acquire an available batch + if let Some(batch) = + // SAFETY: there is one batch pool per stage-in/stage-out pair, and batch + // is acquired behind an exclusive reference to stage-in, so they cannot + // be concurrent calls. + unsafe { self.batch_pool.try_acquire(self.new_batch_config(), false) } + { + return Ok(Some(batch)); + } + // otherwise, wait until one is available + if self.batch_pool.is_congested() { + return Ok(None); } + match deadline.as_mut().map(|d| d.get_expiration()) { + Some(Some(deadline)) => match self.refill_waiter.wait_deadline(deadline) { + Ok(..) => continue, + Err(WaitDeadlineError::Deadline) => break, + Err(WaitDeadlineError::WaitError) => return Err(TransportClosed), + }, + Some(None) => break, + None => match self.refill_waiter.wait() { + Ok(..) => continue, + Err(WaitError) => return Err(TransportClosed), + }, + } + } + // the deadline has been exceeded, try a last time, setting the congested flag + // if there is still no batch + if let Some(batch) = unsafe { self.batch_pool.try_acquire(self.new_batch_config(), true) } { + return Ok(Some(batch)); } + Ok(None) } - #[inline] - fn deadline(&mut self) -> &mut DeadlineSetting { - self.deadline - .get_or_insert_with(|| match self.wait_time.wait_time() { - Duration::ZERO => DeadlineSetting::Immediate, - nonzero_wait_time => DeadlineSetting::Finite(Instant::now().add(nonzero_wait_time)), - }) + /// Pushes a batches to the tx task, notifying it. + fn push_batch(&mut self, batch: WBatch) { + self.batch_tx.push(batch); + let _ = self.batch_notifier.notify(); } -} -struct Deadline { - lazy_deadline: LazyDeadline, -} - -impl Deadline { - fn new(wait_time: Duration, max_wait_time: Option) -> Self { - Self { - lazy_deadline: LazyDeadline::new(WaitTime::new(wait_time, max_wait_time)), + /// Pushes a batch to the TX task if batching is disabled or ignored + /// (e.g. express messages), or reuse the batch for the next message. + /// + /// If the batch is small, the next batches will be allocated with a small capacity too. + fn push_or_reuse_batch( + &mut self, + mut current: MutexGuard<'_, Option>, + batch: WBatch, + force: bool, + ) { + if batch.len() <= Self::SMALL_BATCH_SIZE { + self.use_small_batch = true; } - } - - #[inline] - fn wait(&mut self, s_ref: &StageInRefill) -> Result { - match self.lazy_deadline.deadline() { - DeadlineSetting::Immediate => Ok(false), - DeadlineSetting::Finite(instant) => s_ref.wait_deadline(*instant), + if !self.batching || force { + drop(current); + self.push_batch(batch); + } else { + *current = Some(batch); + drop(current); + // Notify the tx task only if not in batching mode, to not disturb the backoff. 
+ if !self.batch_pool.is_batching() { + let _ = self.batch_notifier.notify(); + } } } - fn on_next_fragment(&mut self) { - self.lazy_deadline.advance(); - } -} - -// This is the initial stage of the pipeline where messages are serliazed on -struct StageIn { - s_ref: StageInRefill, - s_out: StageInOut, - mutex: StageInMutex, - fragbuf: ZBuf, - batching: bool, - // used for stop fragment - batch_config: BatchConfig, -} - -impl StageIn { + /// Pushes a message in the pipeline. + /// + /// It will get a write batch, serialize the message inside, and then push + /// or reuse the batch for the following messages. fn push_network_message( &mut self, + shared: &StageInShared, msg: &NetworkMessage, priority: Priority, deadline: &mut Deadline, ) -> Result { // Lock the current serialization batch. - let mut c_guard = self.mutex.current(); + let mut current = zlock!(shared.current); - macro_rules! zgetbatch_rets { - ($($restore_sn:stmt)?) => { - loop { - match c_guard.take() { - Some(batch) => break batch, - None => match self.s_ref.pull() { - Some(mut batch) => { - batch.clear(); - self.s_out.atomic_backoff.first_write.store( - LOCAL_EPOCH.elapsed().as_micros() as MicroSeconds, - Ordering::Relaxed, - ); - break batch; - } - None => { - // Wait for an available batch until deadline - if !deadline.wait(&self.s_ref)? { - // Still no available batch. - // Restore the sequence number and drop the message - $($restore_sn)? - tracing::trace!( - "Zenoh message dropped because it's over the deadline {:?}: {:?}", - deadline.lazy_deadline.wait_time, msg - ); - return Ok(false); - } - } - }, - } - } - }; - } - - macro_rules! zretok { - ($batch:expr, $msg:expr) => {{ - if !self.batching || $msg.is_express() { - // Move out existing batch - self.s_out.move_batch($batch); - return Ok(true); - } else { - let bytes = $batch.len(); - *c_guard = Some($batch); - drop(c_guard); - self.s_out.notify(bytes); - return Ok(true); - } - }}; - } - - // Get the current serialization batch. - let mut batch = zgetbatch_rets!(); - // Attempt the serialization on the current batch - let e = match batch.encode(msg) { - Ok(_) => zretok!(batch, msg), - Err(e) => e, + // Attempt the serialization on the current batch. + let Some(mut batch) = self.get_batch(&mut current, Some(deadline))? else { + return Ok(false); + }; + let need_new_frame = match batch.encode(msg) { + Ok(_) => { + self.push_or_reuse_batch(current, batch, msg.is_express()); + return Ok(true); + } + Err(BatchError::NewFrame) => true, + Err(_) => false, }; // Lock the channel. We are the only one that will be writing on it. - let mut tch = self.mutex.channel(msg.is_reliable()); + let tch = if msg.is_reliable() { + &shared.chan_tx.reliable + } else { + &shared.chan_tx.best_effort + }; + let mut tch = zlock!(tch); - // Retrieve the next SN + // Retrieve the next SN. let sn = tch.sn.get(); // The Frame @@ -340,33 +470,49 @@ impl StageIn { ext_qos: frame::ext::QoSType::new(priority), }; - if let BatchError::NewFrame = e { - // Attempt a serialization with a new frame - if batch.encode((msg, &frame)).is_ok() { - zretok!(batch, msg); - } + // Attempt a serialization with a new frame. 
+ if need_new_frame && batch.encode((msg, &frame)).is_ok() { + self.push_or_reuse_batch(current, batch, msg.is_express()); + return Ok(true); } + // Attempt a second serialization on fully empty batch (do not use small batch) + self.use_small_batch = false; + let mut try_serialize_on_empty_batch = true; if !batch.is_empty() { - // Move out existing batch - self.s_out.move_batch(batch); - batch = zgetbatch_rets!(tch.sn.set(sn).unwrap()); + self.push_batch(batch); + match self.get_batch(&mut current, Some(deadline))? { + Some(b) => batch = b, + None => { + tch.sn.set(sn).unwrap(); + return Ok(false); + } + } + // If the batch was already empty, and it was a small batch, then we + // replace it with a full-capacity batch. + // We may be tempted to simply push the batch and get a new one, to reuse + // the same workflow as above, but empty batches mess up unix pipes. + } else if batch.buffer.capacity() < self.batch_config.mtu as usize { + batch = WBatch::new(self.new_batch_config()); + // Otherwise, it means that the message doesn't fit into a full-capacity + // batch and must be fragmented directly. + } else { + try_serialize_on_empty_batch = false; } - - // Attempt a second serialization on fully empty batch - if batch.encode((msg, &frame)).is_ok() { - zretok!(batch, msg); + if try_serialize_on_empty_batch && batch.encode((msg, &frame)).is_ok() { + self.push_or_reuse_batch(current, batch, msg.is_express()); + return Ok(true); } - // The second serialization attempt has failed. This means that the message is - // too large for the current batch size: we need to fragment. + // Attempt to serialize on empty batch has failed. This means that the message + // is too large for the current batch size: we need to fragment. // Reinsert the current batch for fragmentation. - *c_guard = Some(batch); + *current = Some(batch); // Take the expandable buffer and serialize the totality of the message - self.fragbuf.clear(); + let mut fragbuf = mem::take(&mut self.fragbuf); - let mut writer = self.fragbuf.writer(); + let mut writer = fragbuf.writer(); let codec = Zenoh080::new(); codec.write(&mut writer, msg).unwrap(); @@ -379,37 +525,49 @@ impl StageIn { ext_first: Some(fragment::ext::First::new()), ext_drop: None, }; - let mut reader = self.fragbuf.reader(); + let mut reader = fragbuf.reader(); while reader.can_read() { - // Get the current serialization batch - batch = zgetbatch_rets!({ - // If no fragment has been sent, the sequence number is just reset - if fragment.ext_first.is_some() { - tch.sn.set(sn).unwrap() - // Otherwise, an ephemeral batch is created to send the stop fragment - } else { - let mut batch = WBatch::new_ephemeral(self.batch_config); - self.fragbuf.clear(); - fragment.ext_drop = Some(fragment::ext::Drop::new()); - let _ = batch.encode((&mut self.fragbuf.reader(), &mut fragment)); - self.s_out.move_batch(batch); + match self.get_batch(&mut current, Some(deadline))? { + Some(b) => batch = b, + None => { + // If no fragment has been sent, the sequence number is just reset + if fragment.ext_first.is_some() { + tch.sn.set(sn).unwrap() + // Otherwise, an ephemeral batch is created to send the stop fragment + } else { + fragment.ext_drop = Some(fragment::ext::Drop::new()); + // Arbitrary constant that should be enough; in reality, only 10B + // are needed. If the protocol change, and the size is no longer + // correct, fragmentation tests should fail with the `unwrap` below. 
+ const DROP_FRAGMENT_SIZE: BatchSize = 64; + let mut batch = WBatch::new_ephemeral(BatchConfig { + mtu: DROP_FRAGMENT_SIZE, + ..self.batch_config + }); + // Serialization fails if there is no payload, so add a dummy one. + let dummy_payload = ZBuf::from(vec![0u8]); + batch + .encode((&mut dummy_payload.reader(), &mut fragment)) + .unwrap(); + debug_assert!(!fragment.more); + self.push_batch(batch); + } + return Ok(false); } - }); - + } // Serialize the message fragment match batch.encode((&mut reader, &mut fragment)) { Ok(_) => { // Update the SN fragment.sn = tch.sn.get(); fragment.ext_first = None; - // Move the serialization batch into the OUT pipeline - self.s_out.move_batch(batch); + self.push_batch(batch); } Err(_) => { // Restore the sequence number tch.sn.set(sn).unwrap(); // Reinsert the batch - *c_guard = Some(batch); + *current = Some(batch); tracing::warn!( "Zenoh message dropped because it can not be fragmented: {:?}", msg @@ -418,220 +576,145 @@ impl StageIn { } } - // adopt deadline for the next fragment - deadline.on_next_fragment(); + // renew deadline for the next fragment + deadline.renew(); } // Clean the fragbuf + self.fragbuf = fragbuf; self.fragbuf.clear(); Ok(true) } #[inline] - fn push_transport_message(&mut self, msg: TransportMessage) -> bool { + fn push_transport_message( + &mut self, + shared: &StageInShared, + msg: TransportMessage, + ) -> Result { // Lock the current serialization batch. - let mut c_guard = self.mutex.current(); + let mut current = zlock!(shared.current); - macro_rules! zgetbatch_rets { - () => { - loop { - match c_guard.take() { - Some(batch) => break batch, - None => match self.s_ref.pull() { - Some(mut batch) => { - batch.clear(); - self.s_out.atomic_backoff.first_write.store( - LOCAL_EPOCH.elapsed().as_micros() as MicroSeconds, - Ordering::Relaxed, - ); - break batch; - } - None => { - if !self.s_ref.wait() { - return false; - } - } - }, - } - } - }; - } + // Attempt the serialization on the current batch. + let mut batch = self.get_batch(&mut current, None)?.unwrap(); - macro_rules! zretok { - ($batch:expr) => {{ - if !self.batching { - // Move out existing batch - self.s_out.move_batch($batch); - return true; - } else { - let bytes = $batch.len(); - *c_guard = Some($batch); - drop(c_guard); - self.s_out.notify(bytes); - return true; - } - }}; - } - - // Get the current serialization batch. - let mut batch = zgetbatch_rets!(); // Attempt the serialization on the current batch - match batch.encode(&msg) { - Ok(_) => zretok!(batch), - Err(_) => { - if !batch.is_empty() { - self.s_out.move_batch(batch); - batch = zgetbatch_rets!(); - } - } - }; - + if batch.encode(&msg).is_ok() { + drop(current); + self.push_batch(batch); + return Ok(true); + } // The first serialization attempt has failed. This means that the current // batch is full. Therefore, we move the current batch to stage out. 
- batch.encode(&msg).is_ok() - } -} - -// The result of the pull operation -enum Pull { - Some(WBatch), - None, - Backoff(MicroSeconds), -} - -// Inner structure to keep track and signal backoff operations -#[derive(Clone)] -struct Backoff { - threshold: MicroSeconds, - last_bytes: BatchSize, - atomic: Arc, - // active: bool, -} - -impl Backoff { - fn new(threshold: Duration, atomic: Arc) -> Self { - Self { - threshold: threshold.as_micros() as MicroSeconds, - last_bytes: 0, - atomic, - // active: false, + self.push_batch(batch); + self.use_small_batch = false; + batch = self.get_batch(&mut current, None)?.unwrap(); + if batch.encode(&msg).is_ok() { + drop(current); + self.push_batch(batch); + return Ok(true); } + *current = Some(batch); + Ok(false) } } -// Inner structure to link the final stage with the initial stage of the pipeline -struct StageOutIn { - s_out_r: RingBufferReader, +struct StageOut { + /// Batch queue. + batch_rx: RingBufferReader, + /// Current batch on which the pipeline is writing. + /// It will be taken if no batch has been pushed. current: Arc>>, - backoff: Backoff, + /// Batch pool, to refill pulled batches, and check the pipeline batching mode. + batch_pool: Arc, + /// Notifier for batch refilling. + refill_notifier: Notifier, + /// Current backoff deadline if there is one. + backoff: Option, + /// Latest successful pull instant, used to compute the next backoff deadline. + latest_pull: Instant, + /// Backoff duration limit. + batching_time_limit: Duration, } -impl StageOutIn { - #[inline] - fn try_pull(&mut self) -> Pull { - if let Some(batch) = self.s_out_r.pull() { - self.backoff.atomic.active.store(false, Ordering::Relaxed); - return Pull::Some(batch); - } - - self.try_pull_deep() - } - - fn try_pull_deep(&mut self) -> Pull { - // Verify first backoff is not active - let mut pull = !self.backoff.atomic.active.load(Ordering::Relaxed); - - // If backoff is active, verify the current number of bytes is equal to the old number - // of bytes seen in the previous backoff iteration - if !pull { - let new_bytes = self.backoff.atomic.bytes.load(Ordering::Relaxed); - let old_bytes = self.backoff.last_bytes; - self.backoff.last_bytes = new_bytes; - - pull = new_bytes == old_bytes; +impl StageOut { + /// Pull a batch from the given priority queue, or back off if the pipeline + /// is batching. + /// + /// The backoff deadline is computed from the latest successful pull instant. + /// Indeed, starting the backoff at pull could introduce unnecessary latency, + /// as a lot of messages could have been written between the latest pull and + /// this one. We could add a mechanism to start backoff at the time the first + /// message of the current batch is written, but I don't think it's worth the + /// complexity, and don't even know why it would be better. The less we wait, + /// the better is the latency, and starting from the latest pull still cover + /// the high throughput case. + fn pull(&mut self) -> Result, Instant> { + // First, try to pull a pushed batch. 
+ if let Some(batch) = self.batch_rx.pull() { + self.backoff = None; + self.latest_pull = Instant::now(); + return Ok(Some(batch)); } - - // Verify that we have not been doing backoff for too long - let mut backoff = 0; - if !pull { - let diff = (LOCAL_EPOCH.elapsed().as_micros() as MicroSeconds) - .saturating_sub(self.backoff.atomic.first_write.load(Ordering::Relaxed)); - - if diff >= self.backoff.threshold { - pull = true; - } else { - backoff = self.backoff.threshold - diff; + match self.backoff { + // If the backoff delay is reached, force stop the batching. + Some(backoff) if Instant::now() > backoff => { + self.batch_pool.stop_batching(); } - } - - if pull { - // It seems no new bytes have been written on the batch, try to pull - if let Ok(mut g) = self.current.try_lock() { - self.backoff.atomic.active.store(false, Ordering::Relaxed); - - // First try to pull from stage OUT to make sure we are not in the case - // where new_bytes == old_bytes are because of two identical serializations - if let Some(batch) = self.s_out_r.pull() { - return Pull::Some(batch); - } - - // An incomplete (non-empty) batch may be available in the state IN pipeline. - match g.take() { - Some(batch) => { - return Pull::Some(batch); - } - None => { - return Pull::None; - } - } + // If the backoff delay is not reached, continue waiting. + Some(backoff) => return Err(backoff), + // If the pipeline is batching, back off with the configuration delay. + None if self.batch_pool.is_batching() => { + // Starts backoff delay from the latest pull. + self.backoff = Some(self.latest_pull + self.batching_time_limit); + return Err(self.backoff.unwrap()); } + None => {} } - - // Activate backoff - self.backoff.atomic.active.store(true, Ordering::Relaxed); - - // Do backoff - Pull::Backoff(backoff) - } -} - -struct StageOutRefill { - n_ref_w: Notifier, - s_ref_w: RingBufferWriter, -} - -impl StageOutRefill { - fn refill(&mut self, batch: WBatch) { - assert!(self.s_ref_w.push(batch).is_none()); - let _ = self.n_ref_w.notify(); - } -} - -struct StageOut { - s_in: StageOutIn, - s_ref: StageOutRefill, -} - -impl StageOut { - #[inline] - fn try_pull(&mut self) -> Pull { - self.s_in.try_pull() + // Try to retrieve current batch. + let Ok(mut current) = self.current.try_lock() else { + // If the pipeline is currently writing, there are two possibilities: + // - either the pipeline is already batching, then we are here because + // backoff delay was reached + // - we are simply pulling at the right moment, it happens + // In both case, we can return with a minimal backoff delay. Batching + // has been disabled if it was active, so the task should be notified + // as soon as possible. + // Batching may be reenabled by the pipeline, but only after a batch + // has been pushed, so we don't care. + self.backoff = Some(Instant::now() + Duration::from_millis(1)); + return Err(self.backoff.unwrap()); + }; + self.backoff = None; + self.latest_pull = Instant::now(); + // Try to pull with the lock held to not miss a batch pushed in between. + if let Some(batch) = self.batch_rx.pull() { + return Ok(Some(batch)); + } + // Otherwise take the current batch if there was one. + Ok(current.take()) } - #[inline] + /// Refills a batch for the pipeline. fn refill(&mut self, batch: WBatch) { - self.s_ref.refill(batch); + if !batch.is_ephemeral() { + // SAFETY: there is one batch pool per stage-in/stage-out pair, and it + // refilled behind an exclusive reference to stage-out, so they cannot + // be concurrent calls. 
+ unsafe { self.batch_pool.refill(batch) }; + let _ = self.refill_notifier.notify(); + } } - fn drain(&mut self, guard: &mut MutexGuard<'_, Option>) -> Vec { + fn drain(&mut self) -> Vec { let mut batches = vec![]; // Empty the ring buffer - while let Some(batch) = self.s_in.s_out_r.pull() { + while let Some(batch) = self.batch_rx.pull() { batches.push(batch); } // Take the current batch - if let Some(batch) = guard.take() { + if let Some(batch) = zlock!(self.current).take() { batches.push(batch); } batches @@ -646,7 +729,6 @@ pub(crate) struct TransmissionPipelineConf { pub(crate) wait_before_close: Duration, pub(crate) batching_enabled: bool, pub(crate) batching_time_limit: Duration, - pub(crate) queue_alloc: QueueAllocConf, } // A 2-stage transmission pipeline @@ -667,129 +749,67 @@ impl TransmissionPipeline { config.queue_size.iter() }; - // Create the channel for notifying that new batches are in the out ring buffer - // This is a MPSC channel - let (n_out_w, n_out_r) = event::new(); - - for (prio, num) in size_iter.enumerate() { - assert!(*num != 0 && *num <= RBLEN); - - // Create the refill ring buffer - // This is a SPSC ring buffer - let (mut s_ref_w, s_ref_r) = RingBuffer::::init(); - let mut batch_allocs = 0; - if *config.queue_alloc.mode() == QueueAllocMode::Init { - // Fill the refill ring buffer with batches - for _ in 0..*num { - let batch = WBatch::new(config.batch); - batch_allocs += 1; - assert!(s_ref_w.push(batch).is_none()); - } - } - // Create the channel for notifying that new batches are in the refill ring buffer - // This is a SPSC channel - let (n_ref_w, n_ref_r) = event::new(); + let (disable_notifier, disable_waiter) = event::new(); + let (batch_notifier, batch_waiter) = event::new(); + + for (prio, &num) in size_iter.enumerate() { + assert!(num != 0 && num <= RBLEN); + let batch_pool = Arc::new(BatchPool::new(num)); + let (refill_notifier, refill_waiter) = event::new(); + + let (batch_tx, batch_rx) = RingBuffer::::init(); - // Create the refill ring buffer - // This is a SPSC ring buffer - let (s_out_w, s_out_r) = RingBuffer::::init(); let current = Arc::new(Mutex::new(None)); - let bytes = Arc::new(AtomicBackoff { - active: CachePadded::new(AtomicBool::new(false)), - bytes: CachePadded::new(AtomicBatchSize::new(0)), - first_write: CachePadded::new(AtomicMicroSeconds::new( - LOCAL_EPOCH.elapsed().as_micros() as MicroSeconds, - )), - }); - stage_in.push(Mutex::new(StageIn { - s_ref: StageInRefill { - n_ref_r, - s_ref_r, - batch_config: (*num, config.batch), - batch_allocs, - }, - s_out: StageInOut { - n_out_w: n_out_w.clone(), - s_out_w, - atomic_backoff: bytes.clone(), - }, - mutex: StageInMutex { - current: current.clone(), - priority: priority[prio].clone(), - }, - fragbuf: ZBuf::empty(), - batching: config.batching_enabled, - batch_config: config.batch, - })); + stage_in.push(StageInShared { + queue: Mutex::new(StageIn { + batch_tx, + batch_notifier: batch_notifier.clone(), + batch_pool: batch_pool.clone(), + refill_waiter, + use_small_batch: true, + fragbuf: ZBuf::empty(), + batching: config.batching_enabled, + batch_config: config.batch, + }), + current: current.clone(), + chan_tx: priority[prio].clone(), + batch_pool: batch_pool.clone(), + }); // The stage out for this priority stage_out.push(StageOut { - s_in: StageOutIn { - s_out_r, - current, - backoff: Backoff::new(config.batching_time_limit, bytes), - }, - s_ref: StageOutRefill { n_ref_w, s_ref_w }, + batch_rx, + current, + batch_pool, + refill_notifier, + backoff: None, + latest_pull: Instant::now(), 
+ batching_time_limit: config.batching_time_limit, }); } - let active = Arc::new(TransmissionPipelineStatus { - disabled: AtomicBool::new(false), - congested: AtomicU8::new(0), - }); let producer = TransmissionPipelineProducer { stage_in: stage_in.into_boxed_slice().into(), - status: active.clone(), + disable_notifier, wait_before_drop: config.wait_before_drop, wait_before_close: config.wait_before_close, }; let consumer = TransmissionPipelineConsumer { stage_out: stage_out.into_boxed_slice(), - n_out_r, - status: active, + batch_waiter, + disable_waiter, }; (producer, consumer) } } -struct TransmissionPipelineStatus { - // The whole pipeline is enabled or disabled - disabled: AtomicBool, - // Bitflags to indicate the given priority queue is congested - congested: AtomicU8, -} - -impl TransmissionPipelineStatus { - fn set_disabled(&self, status: bool) { - self.disabled.store(status, Ordering::Relaxed); - } - - fn is_disabled(&self) -> bool { - self.disabled.load(Ordering::Relaxed) - } - - fn set_congested(&self, priority: Priority, status: bool) { - let prioflag = 1 << priority as u8; - if status { - self.congested.fetch_or(prioflag, Ordering::Relaxed); - } else { - self.congested.fetch_and(!prioflag, Ordering::Relaxed); - } - } - - fn is_congested(&self, priority: Priority) -> bool { - let prioflag = 1 << priority as u8; - self.congested.load(Ordering::Relaxed) & prioflag != 0 - } -} - #[derive(Clone)] pub(crate) struct TransmissionPipelineProducer { // Each priority queue has its own Mutex - stage_in: Arc<[Mutex]>, - status: Arc, + stage_in: Arc<[StageInShared]>, + disable_notifier: Notifier, wait_before_drop: (Duration, Duration), wait_before_close: Duration, } @@ -808,10 +828,12 @@ impl TransmissionPipelineProducer { (0, Priority::DEFAULT) }; + let stage_in = &self.stage_in[idx]; + // If message is droppable, compute a deadline after which the sample could be dropped let (wait_time, max_wait_time) = if msg.is_droppable() { // Checked if we are blocked on the priority queue and we drop directly the message - if self.status.is_congested(priority) { + if stage_in.batch_pool.is_congested() { return Ok(false); } (self.wait_before_drop.0, Some(self.wait_before_drop.1)) @@ -820,148 +842,75 @@ impl TransmissionPipelineProducer { }; let mut deadline = Deadline::new(wait_time, max_wait_time); // Lock the channel. We are the only one that will be writing on it. - let mut queue = zlock!(self.stage_in[idx]); - // Check again for congestion in case it happens when blocking on the mutex. - if self.status.is_congested(priority) { - return Ok(false); - } - let mut sent = queue.push_network_message(&msg, priority, &mut deadline)?; - // If the message cannot be sent, mark the pipeline as congested. - if !sent { - self.status.set_congested(priority, true); - // During the time between deadline wakeup and setting the congested flag, - // all batches could have been refilled (especially if there is a single one), - // so try again with the same already expired deadline. - sent = queue.push_network_message(&msg, priority, &mut deadline)?; - // If the message is sent in the end, reset the status. - // Setting the status to `true` is only done with the stage_in mutex acquired, - // so it is not possible that further messages see the congestion flag set - // after this point. 
- if sent { - self.status.set_congested(priority, false); - } - // There is one edge case that is fortunately supported: if the message that - // has been pushed again is fragmented, we might have some batches actually - // refilled, but still end with dropping the message, not resetting the - // congested flag in that case. However, if some batches were available, - // that means that they would have still been pushed, so we can expect them to - // be refilled, and they will eventually unset the congested flag. - } - Ok(sent) + let mut queue = zlock!(stage_in.queue); + queue.push_network_message(stage_in, &msg, priority, &mut deadline) } #[inline] - pub(crate) fn push_transport_message(&self, msg: TransportMessage, priority: Priority) -> bool { + pub(crate) fn push_transport_message( + &self, + msg: TransportMessage, + priority: Priority, + ) -> Result { // If the queue is not QoS, it means that we only have one priority with index 0. let priority = if self.stage_in.len() > 1 { priority as usize } else { 0 }; + let stage_in = &self.stage_in[priority]; // Lock the channel. We are the only one that will be writing on it. - let mut queue = zlock!(self.stage_in[priority]); - queue.push_transport_message(msg) + let mut queue = zlock!(stage_in.queue); + queue.push_transport_message(stage_in, msg) } pub(crate) fn disable(&self) { - self.status.set_disabled(true); - - // Acquire all the locks, in_guard first, out_guard later - // Use the same locking order as in drain to avoid deadlocks - let mut in_guards: Vec> = - self.stage_in.iter().map(|x| zlock!(x)).collect(); - - // Unblock waiting pullers - for ig in in_guards.iter_mut() { - ig.s_out.notify(BatchSize::MAX); - } + let _ = self.disable_notifier.notify(); } } pub(crate) struct TransmissionPipelineConsumer { // A single Mutex for all the priority queues stage_out: Box<[StageOut]>, - n_out_r: Waiter, - status: Arc, + batch_waiter: Waiter, + disable_waiter: Waiter, } impl TransmissionPipelineConsumer { pub(crate) async fn pull(&mut self) -> Option<(WBatch, Priority)> { - while !self.status.is_disabled() { - let mut backoff = MicroSeconds::MAX; - // Calculate the backoff maximum - for (prio, queue) in self.stage_out.iter_mut().enumerate() { - match queue.try_pull() { - Pull::Some(batch) => { - let prio = Priority::try_from(prio as u8).unwrap(); - return Some((batch, prio)); - } - Pull::Backoff(deadline) => { - backoff = deadline; + loop { + let mut sleep = OptionFuture::default(); + for (i, stage_out) in self.stage_out.iter_mut().enumerate() { + let prio = Priority::try_from(i as u8).unwrap(); + match stage_out.pull() { + Ok(Some(batch)) => return Some((batch, prio)), + Ok(None) => continue, + Err(backoff) => { + sleep = Some(tokio::time::sleep_until(backoff.into())).into(); break; } - Pull::None => {} } } - - // In case of writing many small messages, `recv_async()` will most likely return immedietaly. - // While trying to pull from the queue, the stage_in `lock()` will most likely taken, leading to - // a spinning behaviour while attempting to take the lock. Yield the current task to avoid - // spinning the current task indefinitely. - tokio::task::yield_now().await; - - // Wait for the backoff to expire or for a new message - let res = tokio::time::timeout( - Duration::from_micros(backoff as u64), - self.n_out_r.wait_async(), - ) - .await; - match res { - Ok(Ok(())) => { - // We have received a notification from the channel that some bytes are available, retry to pull. 
- } - Ok(Err(_channel_error)) => { - // The channel is closed, we can't be notified anymore. Break the loop and return None. - break; - } - Err(_timeout) => { - // The backoff timeout expired. Be aware that tokio timeout may not sleep for short duration since - // it has time resolution of 1ms: https://docs.rs/tokio/latest/tokio/time/fn.sleep.html - } + tokio::select! { + biased; + _ = self.disable_waiter.wait_async() => return None, + _ = self.batch_waiter.wait_async() => {}, + Some(_) = sleep => {} } } - None } pub(crate) fn refill(&mut self, batch: WBatch, priority: Priority) { - if !batch.is_ephemeral() { - self.stage_out[priority as usize].refill(batch); - self.status.set_congested(priority, false); - } + self.stage_out[priority as usize].refill(batch); } pub(crate) fn drain(&mut self) -> Vec<(WBatch, usize)> { - // Drain the remaining batches - let mut batches = vec![]; - - // Acquire all the locks, in_guard first, out_guard later - // Use the same locking order as in disable to avoid deadlocks - let locks = self - .stage_out - .iter() - .map(|x| x.s_in.current.clone()) - .collect::>(); - let mut currents: Vec>> = - locks.iter().map(|x| zlock!(x)).collect::>(); - - for (prio, s_out) in self.stage_out.iter_mut().enumerate() { - let mut bs = s_out.drain(&mut currents[prio]); - for b in bs.drain(..) { - batches.push((b, prio)); - } - } - - batches + self.stage_out + .iter_mut() + .map(StageOut::drain) + .enumerate() + .flat_map(|(prio, batches)| batches.into_iter().map(move |batch| (batch, prio))) + .collect() } } @@ -976,13 +925,13 @@ mod tests { time::{Duration, Instant}, }; - use tokio::{task, time::timeout}; + use futures::FutureExt; + use tokio::{task, task::JoinHandle, time::timeout}; use zenoh_buffers::{ reader::{DidntRead, HasReader}, ZBuf, }; use zenoh_codec::{RCodec, Zenoh080}; - use zenoh_config::{QueueAllocConf, QueueAllocMode}; use zenoh_protocol::{ core::{Bits, CongestionControl, Encoding, Priority}, network::{ext, Push}, @@ -1008,9 +957,6 @@ mod tests { wait_before_drop: (Duration::from_millis(1), Duration::from_millis(1024)), wait_before_close: Duration::from_secs(5), batching_time_limit: Duration::from_micros(1), - queue_alloc: QueueAllocConf { - mode: QueueAllocMode::Init, - }, }; const CONFIG_NOT_STREAMED: TransmissionPipelineConf = TransmissionPipelineConf { @@ -1025,9 +971,6 @@ mod tests { wait_before_drop: (Duration::from_millis(1), Duration::from_millis(1024)), wait_before_close: Duration::from_secs(5), batching_time_limit: Duration::from_micros(1), - queue_alloc: QueueAllocConf { - mode: QueueAllocMode::Init, - }, }; #[tokio::test(flavor = "multi_thread", worker_threads = 4)] @@ -1366,4 +1309,140 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn fragmentation_first_drop() { + zenoh_util::init_log_from_env_or("error"); + let config = TransmissionPipelineConf { + batch: BatchConfig { + mtu: 100, + is_streamed: true, + #[cfg(feature = "transport_compression")] + is_compression: false, + }, + queue_size: [2; Priority::NUM], + batching_enabled: true, + wait_before_drop: (Duration::from_millis(1), Duration::from_millis(10)), + wait_before_close: Duration::from_millis(1), + batching_time_limit: Duration::from_millis(1), + }; + let priorities = vec![TransportPriorityTx::make(Bits::from(u64::MAX)).unwrap()]; + // set max sequence number to use maximal serialization size + { + let mut tch = priorities[0].reliable.lock().unwrap(); + tch.sn.set(TransportSn::MAX - 10).unwrap(); + } + let (producer, mut consumer) = TransmissionPipeline::make(config, &priorities); + let 
message: NetworkMessage = Push { + wire_expr: "test".into(), + ext_qos: ext::QoSType::new(Priority::Control, CongestionControl::Drop, false), + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + payload: PushBody::Put(Put { + timestamp: None, + encoding: Encoding::empty(), + ext_sinfo: None, + #[cfg(feature = "shared-memory")] + ext_shm: None, + ext_attachment: None, + ext_unknown: vec![], + payload: vec![0u8; 200].into(), + }), + } + .into(); + let producer = task::spawn_blocking(move || { + assert!(!producer.push_network_message(message.clone()).unwrap()); + producer + }) + .await + .unwrap(); + + async fn pull_fragment(consumer: &mut TransmissionPipelineConsumer) -> Fragment { + let (batch, _) = consumer.pull().await.unwrap(); + let codec = Zenoh080::new(); + let msg: TransportMessage = codec.read(&mut &batch.as_slice()[2..]).unwrap(); + match msg.body { + TransportBody::Fragment(fragment) => fragment, + _ => unreachable!(), + } + } + // first fragment + let fragment = pull_fragment(&mut consumer).await; + assert!(fragment.ext_first.is_some() && fragment.ext_drop.is_none()); + // second fragment + let fragment = pull_fragment(&mut consumer).await; + assert!(fragment.ext_first.is_none() && fragment.ext_drop.is_none()); + // last fragment (should be the drop one) + let fragment = pull_fragment(&mut consumer).await; + assert!(fragment.ext_first.is_none() && fragment.ext_drop.is_some()); + // no more fragment + assert!(pull_fragment(&mut consumer).now_or_never().is_none()); + // drop producer at the end to not disable the pipeline + drop(producer); + } + + #[tokio::test] + async fn deterministic_batching_on_load() { + zenoh_util::init_log_from_env_or("error"); + let config = TransmissionPipelineConf { + batch: BatchConfig { + mtu: BatchSize::MAX, + is_streamed: true, + #[cfg(feature = "transport_compression")] + is_compression: false, + }, + queue_size: [4; Priority::NUM], + batching_enabled: true, + wait_before_drop: (Duration::from_millis(1000), Duration::from_millis(50000)), + wait_before_close: Duration::from_millis(5000000), + batching_time_limit: Duration::from_millis(10000), + }; + let priorities = vec![TransportPriorityTx::make(Bits::from(TransportSn::MAX)).unwrap()]; + let (producer, mut consumer) = TransmissionPipeline::make(config, &priorities); + let message: NetworkMessage = Push { + wire_expr: "test".into(), + ext_qos: ext::QoSType::new(Priority::Control, CongestionControl::Block, false), + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + payload: PushBody::Put(Put { + timestamp: None, + encoding: Encoding::empty(), + ext_sinfo: None, + #[cfg(feature = "shared-memory")] + ext_shm: None, + ext_attachment: None, + ext_unknown: vec![], + payload: vec![0u8; 8].into(), + }), + } + .into(); + let producer_task: JoinHandle> = task::spawn_blocking(move || loop { + producer.push_network_message(message.clone())?; + }); + + // wait until the producer task has started and pull the first batch; + // it can have arbirary size, should be quite small if we wake up in + // time + loop { + if let Some((batch, prio)) = consumer.pull().await { + consumer.refill(batch, prio); + break; + } + } + // following batch may still use small buffer size + let (batch, _) = consumer.pull().await.unwrap(); + // keep a batch not refilled to simulate network delay + let _batch_being_sent = batch; + // next batches should all have the maximum size + // don't pull more than 100 batches, because after, the + // sequence number size increase and the max size change + for _ in 0..100 { + let 
(batch, prio) = consumer.pull().await.unwrap(); + assert_eq!(batch.len(), 65533); + consumer.refill(batch, prio); + } + // drop the consumer, so it will make the producer fails and stop + drop(consumer); + assert!(matches!(producer_task.await.unwrap(), Err(TransportClosed))); + } } diff --git a/io/zenoh-transport/src/manager.rs b/io/zenoh-transport/src/manager.rs index 6c4579568b..7f3adef4a6 100644 --- a/io/zenoh-transport/src/manager.rs +++ b/io/zenoh-transport/src/manager.rs @@ -15,7 +15,7 @@ use std::{collections::HashMap, sync::Arc, time::Duration}; use rand::{RngCore, SeedableRng}; use tokio::sync::Mutex as AsyncMutex; -use zenoh_config::{Config, LinkRxConf, QueueAllocConf, QueueConf, QueueSizeConf}; +use zenoh_config::{Config, LinkRxConf, QueueConf, QueueSizeConf}; use zenoh_crypto::{BlockCipher, PseudoRng}; use zenoh_link::NewLinkChannelSender; use zenoh_protocol::{ @@ -112,7 +112,6 @@ pub struct TransportManagerConfig { pub wait_before_close: Duration, pub queue_size: [usize; Priority::NUM], pub queue_backoff: Duration, - pub queue_alloc: QueueAllocConf, pub defrag_buff_size: usize, pub link_rx_buffer_size: usize, pub unicast: TransportManagerConfigUnicast, @@ -144,7 +143,6 @@ pub struct TransportManagerBuilder { wait_before_drop: (Duration, Duration), wait_before_close: Duration, queue_size: QueueSizeConf, - queue_alloc: QueueAllocConf, defrag_buff_size: usize, link_rx_buffer_size: usize, unicast: TransportManagerBuilderUnicast, @@ -193,11 +191,6 @@ impl TransportManagerBuilder { self } - pub fn queue_alloc(mut self, queue_alloc: QueueAllocConf) -> Self { - self.queue_alloc = queue_alloc; - self - } - pub fn wait_before_drop(mut self, wait_before_drop: (Duration, Duration)) -> Self { self.wait_before_drop = wait_before_drop; self @@ -273,7 +266,6 @@ impl TransportManagerBuilder { )); self = self.wait_before_close(duration_from_i64us(*cc_block.wait_before_close())); self = self.queue_size(link.tx().queue().size().clone()); - self = self.queue_alloc(*link.tx().queue().allocation()); self = self.tx_threads(*link.tx().threads()); self = self.protocols(link.protocols().clone()); @@ -334,7 +326,6 @@ impl TransportManagerBuilder { wait_before_close: self.wait_before_close, queue_size, queue_backoff: self.batching_time_limit, - queue_alloc: self.queue_alloc, defrag_buff_size: self.defrag_buff_size, link_rx_buffer_size: self.link_rx_buffer_size, unicast: unicast.config, @@ -386,7 +377,6 @@ impl Default for TransportManagerBuilder { ), wait_before_close: duration_from_i64us(*cc_block.wait_before_close()), queue_size: queue.size, - queue_alloc: queue.allocation, batching_time_limit: Duration::from_millis(backoff), defrag_buff_size: *link_rx.max_message_size(), link_rx_buffer_size: *link_rx.buffer_size(), diff --git a/io/zenoh-transport/src/multicast/link.rs b/io/zenoh-transport/src/multicast/link.rs index ec26d1246d..a4badd2e41 100644 --- a/io/zenoh-transport/src/multicast/link.rs +++ b/io/zenoh-transport/src/multicast/link.rs @@ -314,7 +314,6 @@ impl TransportLinkMulticastUniversal { wait_before_close: self.transport.manager.config.wait_before_close, batching_enabled: self.transport.manager.config.batching, batching_time_limit: self.transport.manager.config.queue_backoff, - queue_alloc: self.transport.manager.config.queue_alloc, }; // The pipeline let (producer, consumer) = TransmissionPipeline::make(tpc, &priority_tx); @@ -544,9 +543,8 @@ async fn rx_task( // The pool of buffers let mtu = link.inner.config.batch.mtu as usize; let mut n = rx_buffer_size / mtu; - if n == 0 { - 
tracing::debug!("RX configured buffer of {rx_buffer_size} bytes is too small for {link} that has an MTU of {mtu} bytes. Defaulting to {mtu} bytes for RX buffer."); - n = 1; + if rx_buffer_size % mtu != 0 { + n += 1; } let pool = RecyclingObjectPool::new(n, || vec![0_u8; mtu].into_boxed_slice()); diff --git a/io/zenoh-transport/src/multicast/transport.rs b/io/zenoh-transport/src/multicast/transport.rs index b14bed85ee..b9db28b45e 100644 --- a/io/zenoh-transport/src/multicast/transport.rs +++ b/io/zenoh-transport/src/multicast/transport.rs @@ -220,7 +220,7 @@ impl TransportMulticastInner { session: false, } .into(); - pipeline.push_transport_message(msg, Priority::Background); + let _ = pipeline.push_transport_message(msg, Priority::Background); } } } diff --git a/io/zenoh-transport/src/unicast/lowlatency/link.rs b/io/zenoh-transport/src/unicast/lowlatency/link.rs index 2fe80b37f8..950d11f3c2 100644 --- a/io/zenoh-transport/src/unicast/lowlatency/link.rs +++ b/io/zenoh-transport/src/unicast/lowlatency/link.rs @@ -154,9 +154,8 @@ impl TransportUnicastLowlatency { let pool = { let mtu = link_rx.config.batch.mtu as usize; let mut n = rx_buffer_size / mtu; - if n == 0 { - tracing::debug!("RX configured buffer of {rx_buffer_size} bytes is too small for {link_rx} that has an MTU of {mtu} bytes. Defaulting to {mtu} bytes for RX buffer."); - n = 1; + if rx_buffer_size % mtu != 0 { + n += 1; } zenoh_sync::RecyclingObjectPool::new(n, move || vec![0_u8; mtu].into_boxed_slice()) }; diff --git a/io/zenoh-transport/src/unicast/universal/link.rs b/io/zenoh-transport/src/unicast/universal/link.rs index 1e8dc594de..18011249be 100644 --- a/io/zenoh-transport/src/unicast/universal/link.rs +++ b/io/zenoh-transport/src/unicast/universal/link.rs @@ -66,7 +66,6 @@ impl TransportLinkUnicastUniversal { wait_before_close: transport.manager.config.wait_before_close, batching_enabled: transport.manager.config.batching, batching_time_limit: transport.manager.config.queue_backoff, - queue_alloc: transport.manager.config.queue_alloc, }; // The pipeline @@ -262,9 +261,8 @@ async fn rx_task( // The pool of buffers let mtu = link.config.batch.mtu as usize; let mut n = rx_buffer_size / mtu; - if n == 0 { - tracing::debug!("RX configured buffer of {rx_buffer_size} bytes is too small for {link} that has an MTU of {mtu} bytes. Defaulting to {mtu} bytes for RX buffer."); - n = 1; + if rx_buffer_size % mtu != 0 { + n += 1; } let pool = RecyclingObjectPool::new(n, || vec![0_u8; mtu].into_boxed_slice()); diff --git a/io/zenoh-transport/src/unicast/universal/transport.rs b/io/zenoh-transport/src/unicast/universal/transport.rs index e3c357ffb6..6eb31b7e14 100644 --- a/io/zenoh-transport/src/unicast/universal/transport.rs +++ b/io/zenoh-transport/src/unicast/universal/transport.rs @@ -372,7 +372,7 @@ impl TransportUnicastTrait for TransportUnicastUniversal { } .into(); - p.push_transport_message(msg, Priority::Background); + let _ = p.push_transport_message(msg, Priority::Background); } // Terminate and clean up the transport self.delete().await
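The refactor replaces the refill ring buffer and the `AtomicBackoff` bookkeeping with a single `BatchPool` whose whole state fits in one `AtomicU8`: the low five bits count the available batches, and the top three bits carry the REFILLED/BATCHING/CONGESTED flags. Below is a minimal standalone sketch of that bit layout, reusing the constants from the diff; the `PoolState`/`decode` helpers and the `main` are illustrative only and not part of the patch.

use std::sync::atomic::{AtomicU8, Ordering};

const COUNT_MASK: u8 = (1 << 5) - 1;
const REFILLED_FLAG: u8 = 1 << 5;
const BATCHING_FLAG: u8 = 1 << 6;
const CONGESTED_FLAG: u8 = 1 << 7;

// Decoded view of the packed state, for illustration only.
#[derive(Debug, PartialEq)]
struct PoolState {
    available: u8,
    refilled: bool,
    batching: bool,
    congested: bool,
}

fn decode(state: u8) -> PoolState {
    PoolState {
        available: state & COUNT_MASK,
        refilled: state & REFILLED_FLAG != 0,
        batching: state & BATCHING_FLAG != 0,
        congested: state & CONGESTED_FLAG != 0,
    }
}

fn main() {
    // A pool created with `BatchPool::new(4)` starts as `state == 4`:
    // four available batches, no flags set.
    let state = AtomicU8::new(4);
    // Acquiring a batch decrements the count (the real code folds this into a
    // `compare_exchange_weak` loop so count and flags are updated atomically).
    state.fetch_sub(1, Ordering::AcqRel);
    assert_eq!(
        decode(state.load(Ordering::Acquire)),
        PoolState { available: 3, refilled: false, batching: false, congested: false }
    );
    // Refilling sets the REFILLED flag and bumps the count back up.
    state.fetch_or(REFILLED_FLAG, Ordering::Release);
    state.fetch_add(1, Ordering::AcqRel);
    assert!(decode(state.load(Ordering::Acquire)).refilled);
}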
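When a message must be fragmented, each fragment gets a renewed deadline through `Deadline::renew`, which extends the expiration by the current wait time and then doubles that wait time while `max_wait_time` allows it. A small sketch of how the budget grows, using the `wait_before_drop` pair (1 ms, 1024 ms) that appears in the test configs; the struct below only mirrors the renewal arithmetic and is not the real type.

use std::time::Duration;

// Mirrors only the renewal arithmetic of the diff's `Deadline`.
struct DeadlineSketch {
    wait_time: Duration,
    max_wait_time: Option<Duration>,
    // Total time budget granted so far, relative to the start instant.
    budget: Duration,
}

impl DeadlineSketch {
    fn new(wait_time: Duration, max_wait_time: Option<Duration>) -> Self {
        // `get_expiration` lazily sets the first expiration to `now + wait_time`.
        Self { wait_time, max_wait_time, budget: wait_time }
    }

    fn renew(&mut self) {
        // Push the expiration further out by the current wait time...
        self.budget += self.wait_time;
        // ...then double the wait time while the configured maximum allows it.
        if let Some(max) = self.max_wait_time {
            self.max_wait_time = max.checked_sub(self.wait_time);
            self.wait_time *= 2;
        }
    }
}

fn main() {
    let mut d = DeadlineSketch::new(
        Duration::from_millis(1),
        Some(Duration::from_millis(1024)),
    );
    // The total budget grows exponentially: 1 ms initially, then 2, 4, 8, 16 ms.
    for expected_ms in [2u64, 4, 8, 16] {
        d.renew();
        assert_eq!(d.budget, Duration::from_millis(expected_ms));
    }
}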
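`StageIn` now starts from small 2 KiB batches and only falls back to full-MTU batches once a message needed more room (`use_small_batch`). A sketch of the capacity choice made in `new_batch_config`, reduced to plain integers; the helper name is made up for the example.

// SMALL_BATCH_SIZE = 1 << 11 in the diff, i.e. 2 KiB.
const SMALL_BATCH_SIZE: u16 = 1 << 11;

fn next_batch_mtu(configured_mtu: u16, use_small_batch: bool) -> u16 {
    if use_small_batch {
        configured_mtu.min(SMALL_BATCH_SIZE)
    } else {
        configured_mtu
    }
}

fn main() {
    // With a 64 KiB link MTU, small batches are capped at 2 KiB...
    assert_eq!(next_batch_mtu(u16::MAX, true), 2048);
    // ...and a full-size batch is allocated again once a message needed more room.
    assert_eq!(next_batch_mtu(u16::MAX, false), u16::MAX);
    // Links with an MTU below 2 KiB are unaffected.
    assert_eq!(next_batch_mtu(1500, true), 1500);
}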
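On the consumer side, the old timeout-based polling is replaced by racing an optional backoff sleep against the disable and batch notifications with `tokio::select!`. A reduced sketch of that pattern, assuming the `tokio` (with `macros`, `rt`, `time`) and `futures` crates; the notifier branches of the real loop are omitted here.

use std::time::{Duration, Instant};

use futures::future::OptionFuture;

#[tokio::main]
async fn main() {
    // Pretend one priority queue returned `Err(backoff)` from `StageOut::pull`.
    let backoff: Option<Instant> = Some(Instant::now() + Duration::from_millis(1));

    // Arm the sleep only if some queue is backing off, as in the new `pull` loop.
    let mut sleep = OptionFuture::default();
    if let Some(deadline) = backoff {
        sleep = Some(tokio::time::sleep_until(deadline.into())).into();
    }

    // `biased` keeps the disable check first in the real code; only the sleep
    // branch is shown in this sketch.
    tokio::select! {
        biased;
        Some(_) = sleep => println!("backoff expired, retry pulling"),
        else => println!("nothing to wait for"),
    }
}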
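Finally, the RX buffer pool sizing changes from flooring (with a debug log and a clamp to one buffer when the result was zero) to a plain ceiling division of the configured buffer size by the link MTU. A tiny sketch of the new rounding, with made-up numbers.

fn pooled_buffers(rx_buffer_size: usize, mtu: usize) -> usize {
    // Same rounding as the updated rx_task code: ceil(rx_buffer_size / mtu).
    let mut n = rx_buffer_size / mtu;
    if rx_buffer_size % mtu != 0 {
        n += 1;
    }
    n
}

fn main() {
    assert_eq!(pooled_buffers(65_535, 65_535), 1);
    // Buffers smaller than one MTU now round up to one pooled buffer instead of
    // logging and clamping.
    assert_eq!(pooled_buffers(1_000, 65_535), 1);
    // 2.5 MTUs of configured buffer yields 3 pooled buffers.
    assert_eq!(pooled_buffers(160_000, 64_000), 3);
}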