Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework the pipeline: batch allocation + deterministic backoff #1769

Draft
wants to merge 21 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ console-subscriber = "0.4.0"
const_format = "0.2.33"
crc = "3.2.1"
criterion = "0.5"
crossbeam-utils = "0.8.20"
crossbeam-queue = "0.3.12"
derive_more = { version = "1.0.0", features = ["as_ref"] }
derive-new = "0.7.0"
Expand Down
20 changes: 7 additions & 13 deletions DEFAULT_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -471,14 +471,14 @@
/// then the amount of memory being allocated for each queue is SIZE_XXX * LINK_MTU.
/// If qos is false, then only the DATA priority will be allocated.
size: {
control: 2,
real_time: 2,
interactive_high: 2,
interactive_low: 2,
control: 1,
real_time: 1,
interactive_high: 1,
interactive_low: 1,
data_high: 2,
data: 2,
data_low: 2,
background: 2,
data: 4,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've reverted the previous commit, so the numbers are back to the original ones. Throughput of big messages was indeed very impacted by a smaller batch number, 130kmsg/s -> 50kmsg/s for 70kB messages.
@Mallets Do you think we should modify the numbers? For example, increasing the high priority to 2? In fact this PR does lazy allocation and only keeps one buffer in memory, so we could put any numbers and it would not change memory consumption at all; it's just a matter of throughput/congestion.

data_low: 4,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The numbers are not coherent with commons/zenoh-config/src/defaults.rs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, good catch !

background: 4,
},
/// Congestion occurs when the queue is empty (no available batch).
congestion_control: {
Expand Down Expand Up @@ -506,12 +506,6 @@
/// The maximum time limit (in ms) a message should be retained for batching when back-pressure happens.
time_limit: 1,
},
allocation: {
/// Mode for memory allocation of batches in the priority queues.
/// - "init": batches are allocated at queue initialization time.
/// - "lazy": batches are allocated when needed up to the maximum number of batches configured in the size configuration parameter.
mode: "lazy",
},
},
},
/// Configure the zenoh RX parameters of a link
Expand Down
12 changes: 6 additions & 6 deletions commons/zenoh-config/src/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,14 +274,14 @@ impl QueueSizeConf {
impl Default for QueueSizeConf {
fn default() -> Self {
Self {
control: 2,
real_time: 2,
interactive_low: 2,
interactive_high: 2,
control: 1,
real_time: 1,
interactive_low: 1,
interactive_high: 1,
data_high: 2,
data: 2,
data: 4,
data_low: 2,
background: 2,
background: 1,
}
}
}
Expand Down
15 changes: 0 additions & 15 deletions commons/zenoh-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -508,13 +508,6 @@ validated_struct::validator! {
/// The maximum time limit (in ms) a message should be retained for batching when back-pressure happens.
time_limit: u64,
},
/// Perform lazy memory allocation of batches in the priority queues. If set to false all batches are initialized at
/// initialization time. If set to true the batches will be allocated when needed up to the maximum number of batches
/// configured in the size configuration parameter.
pub allocation: #[derive(Default, Copy, PartialEq, Eq)]
QueueAllocConf {
pub mode: QueueAllocMode,
},
},
// Number of threads used for TX
threads: usize,
Expand Down Expand Up @@ -659,14 +652,6 @@ validated_struct::validator! {
}
}

#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum QueueAllocMode {
Init,
#[default]
Lazy,
}

#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ShmInitMode {
Expand Down
26 changes: 12 additions & 14 deletions io/zenoh-transport/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,28 +44,28 @@ transport_ws = ["zenoh-link/transport_ws"]
transport_serial = ["zenoh-link/transport_serial"]
transport_compression = []
transport_unixpipe = ["zenoh-link/transport_unixpipe"]
transport_vsock= ["zenoh-link/transport_vsock"]
transport_vsock = ["zenoh-link/transport_vsock"]
stats = ["zenoh-protocol/stats"]
test = []
unstable = []
default = ["test", "transport_multilink"]

[dependencies]
async-trait = { workspace = true }
crossbeam-utils = { workspace = true }
tokio = { workspace = true, features = [
"sync",
"fs",
"time",
"macros",
"rt-multi-thread",
"io-util",
"net",
"sync",
"fs",
"time",
"macros",
"rt-multi-thread",
"io-util",
"net",
] }
lazy_static = { workspace = true }
tokio-util = { workspace = true, features = ["rt"]}
tokio-util = { workspace = true, features = ["rt"] }
flume = { workspace = true }
tracing = {workspace = true}
futures = { workspace = true }
tracing = { workspace = true }
lz4_flex = { workspace = true }
paste = { workspace = true }
rand = { workspace = true, features = ["default"] }
Expand All @@ -90,8 +90,6 @@ zenoh-task = { workspace = true }


[dev-dependencies]
futures-util = { workspace = true }
zenoh-util = {workspace = true }
zenoh-util = { workspace = true }
zenoh-protocol = { workspace = true, features = ["test"] }
futures = { workspace = true }
zenoh-link-commons = { workspace = true }
4 changes: 2 additions & 2 deletions io/zenoh-transport/src/common/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ pub struct WBatch {
#[cfg(feature = "stats")]
pub stats: WBatchStats,
// an ephemeral batch will not be recycled in the pipeline
// it can be used to push a stop fragment when no batch are available
// it can be used to push a drop fragment when no batches are available
pub ephemeral: bool,
}

Expand All @@ -212,9 +212,9 @@ impl WBatch {
buffer: BBuf::with_capacity(config.mtu as usize),
codec: Zenoh080Batch::new(),
config,
ephemeral: false,
#[cfg(feature = "stats")]
stats: WBatchStats::default(),
ephemeral: false,
};

// Bring the batch in a clear state
Expand Down
Loading
Loading