Skip to content

Commit

Permalink
feat(qp): introduce wrapper type for send_ops_flags and refactor qp_ex
Browse files Browse the repository at this point in the history
We introduce SendOperationFlags for building qp_ex, by default, we
would create an extended qp with write / send / read support, just
like what a basic qp supports.

At the same time, we put `NonNull<ibv_qp_ex>` in ExtendedQueuePair
instead of `NonNull<ibv_qp>`, this should be more accurate.

Signed-off-by: Luke Yue <[email protected]>
  • Loading branch information
dragonJACson committed Sep 17, 2024
1 parent 02761ca commit 1ad3335
Showing 1 changed file with 49 additions and 12 deletions.
61 changes: 49 additions & 12 deletions src/verbs/queue_pair.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use bitmask_enum::bitmask;
use lazy_static::lazy_static;
use rdma_mummy_sys::{
ibv_create_qp, ibv_create_qp_ex, ibv_destroy_qp, ibv_modify_qp, ibv_qp, ibv_qp_attr, ibv_qp_attr_mask, ibv_qp_cap,
ibv_qp_attr_mask, ibv_qp_cap, ibv_qp_ex, ibv_qp_init_attr, ibv_qp_init_attr_ex, ibv_qp_state, ibv_qp_type,
ibv_rx_hash_conf,
ibv_qp_create_send_ops_flags, ibv_qp_ex, ibv_qp_init_attr, ibv_qp_init_attr_ex, ibv_qp_init_attr_mask,
ibv_qp_state, ibv_qp_to_qp_ex, ibv_qp_type, ibv_rx_hash_conf,
};
use std::{
io,
Expand Down Expand Up @@ -57,6 +57,24 @@ impl From<u32> for QueuePairState {
}
}

#[bitmask(u64)]
#[bitmask_config(vec_debug)]
pub enum SendOperationFlags {
Write = ibv_qp_create_send_ops_flags::IBV_QP_EX_WITH_RDMA_WRITE.0 as _,
WriteWithImmediate = ibv_qp_create_send_ops_flags::IBV_QP_EX_WITH_RDMA_WRITE_WITH_IMM.0 as _,
Send = ibv_qp_create_send_ops_flags::IBV_QP_EX_WITH_SEND.0 as _,
SendWithImmediate = ibv_qp_create_send_ops_flags::IBV_QP_EX_WITH_SEND_WITH_IMM.0 as _,
Read = ibv_qp_create_send_ops_flags::IBV_QP_EX_WITH_RDMA_READ.0 as _,
AtomicCompareAndSwap = ibv_qp_create_send_ops_flags::IBV_QP_EX_WITH_ATOMIC_CMP_AND_SWP.0 as _,
AtomicFetchAndAdd = ibv_qp_create_send_ops_flags::IBV_QP_EX_WITH_ATOMIC_FETCH_AND_ADD.0 as _,
LocalInvalidate = ibv_qp_create_send_ops_flags::IBV_QP_EX_WITH_LOCAL_INV.0 as _,
BindMemoryWindow = ibv_qp_create_send_ops_flags::IBV_QP_EX_WITH_BIND_MW.0 as _,
SendWithInvalidate = ibv_qp_create_send_ops_flags::IBV_QP_EX_WITH_SEND_WITH_INV.0 as _,
TcpSegmentationOffload = ibv_qp_create_send_ops_flags::IBV_QP_EX_WITH_TSO.0 as _,
Flush = ibv_qp_create_send_ops_flags::IBV_QP_EX_WITH_FLUSH.0 as _,
AtomicWrite = ibv_qp_create_send_ops_flags::IBV_QP_EX_WITH_ATOMIC_WRITE.0 as _,
}

pub trait QueuePair {
//! return the basic handle of QP;
//! we mark this method unsafe because the lifetime of ibv_qp is not
Expand Down Expand Up @@ -279,24 +297,21 @@ impl QueuePair for BasicQueuePair<'_> {

#[derive(Debug)]
pub struct ExtendedQueuePair<'res> {
// TODO: ibv_create_qp_ex returns ibv_qp instead of ibv_qp_ex, to be fixed
pub(crate) qp_ex: NonNull<ibv_qp>,
pub(crate) qp_ex: NonNull<ibv_qp_ex>,
// phantom data for protection domain & completion queues
_phantom: PhantomData<&'res ()>,
}

impl Drop for ExtendedQueuePair<'_> {
fn drop(&mut self) {
// TODO: convert qp_ex to qp (port ibv_qp_ex_to_qp in rdma-mummy-sys)
let ret = unsafe { ibv_destroy_qp(self.qp_ex.as_ptr().cast()) };
let ret = unsafe { ibv_destroy_qp(self.qp().as_ptr()) };
assert_eq!(ret, 0)
}
}

impl QueuePair for ExtendedQueuePair<'_> {
unsafe fn qp(&self) -> NonNull<ibv_qp> {
// TODO: convert qp_ex to qp (port ibv_qp_ex_to_qp in rdma-mummy-sys)
self.qp_ex.cast()
NonNull::new_unchecked(&mut (*self.qp_ex.as_ptr()).qp_base as _)
}
}

Expand Down Expand Up @@ -326,7 +341,10 @@ impl<'res> QueuePairBuilder<'res> {
},
qp_type: QueuePairType::ReliableConnection as _,
sq_sig_all: 0,
comp_mask: 0,
// when building an extended qp instead of a basic qp, we need to pass in
// these essential attributes.
comp_mask: ibv_qp_init_attr_mask::IBV_QP_INIT_ATTR_PD.0
| ibv_qp_init_attr_mask::IBV_QP_INIT_ATTR_SEND_OPS_FLAGS.0,
pd: pd.pd.as_ptr(),
xrcd: null_mut(),
create_flags: 0,
Expand Down Expand Up @@ -386,6 +404,11 @@ impl<'res> QueuePairBuilder<'res> {
self
}

pub fn setup_send_ops_flags(&mut self, send_ops_flags: SendOperationFlags) -> &mut Self {
self.init_attr.send_ops_flags = send_ops_flags.bits;
self
}

// build basic qp
pub fn build(&self) -> Result<BasicQueuePair<'res>, String> {
let qp = unsafe {
Expand All @@ -410,11 +433,25 @@ impl<'res> QueuePairBuilder<'res> {
}

// build extended qp
pub fn build_ex(&mut self) -> Result<ExtendedQueuePair<'res>, String> {
let qp = unsafe { ibv_create_qp_ex((*self.init_attr.pd).context, &mut self.init_attr).unwrap_or(null_mut()) };
pub fn build_ex(&self) -> Result<ExtendedQueuePair<'res>, String> {
let mut attr = self.init_attr.clone();

// unless user specified, we assume every extended qp would support send,
// write and read, just as what basic qp supports.
if attr.send_ops_flags == 0 {
attr.send_ops_flags = (SendOperationFlags::Send
| SendOperationFlags::SendWithImmediate
| SendOperationFlags::Write
| SendOperationFlags::WriteWithImmediate
| SendOperationFlags::Read)
.into()
}

let qp = unsafe { ibv_create_qp_ex((*(attr.pd)).context, &mut attr).unwrap_or(null_mut()) };

Ok(ExtendedQueuePair {
qp_ex: NonNull::new(qp).ok_or(format!("ibv_create_qp failed, {}", io::Error::last_os_error()))?,
qp_ex: NonNull::new(unsafe { ibv_qp_to_qp_ex(qp) })
.ok_or(format!("ibv_create_qp_ex failed, {}", io::Error::last_os_error()))?,
_phantom: PhantomData,
})
}
Expand Down

0 comments on commit 1ad3335

Please sign in to comment.