Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Commit

Permalink
improve batch_send error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
jbiseda committed Oct 31, 2023
1 parent 136ab21 commit afcdf7b
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 100 deletions.
21 changes: 11 additions & 10 deletions core/src/repair/repair_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,16 +457,17 @@ impl RepairService {

let mut batch_send_repairs_elapsed = Measure::start("batch_send_repairs_elapsed");
if !batch.is_empty() {
if let Err(SendPktsError::IoError(err, num_failed)) =
batch_send(repair_socket, &batch)
{
error!(
"{} batch_send failed to send {}/{} packets first error {:?}",
id,
num_failed,
batch.len(),
err
);
match batch_send(repair_socket, &batch) {
Ok(()) => (),
Err(SendPktsError::IoError(err, num_failed)) => {
error!(
"{} batch_send failed to send {}/{} packets first error {:?}",
id,
num_failed,
batch.len(),
err
);
}
}
}
batch_send_repairs_elapsed.stop();
Expand Down
19 changes: 10 additions & 9 deletions core/src/repair/serve_repair.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1221,15 +1221,16 @@ impl ServeRepair {
}
}
if !pending_pongs.is_empty() {
if let Err(SendPktsError::IoError(err, num_failed)) =
batch_send(repair_socket, &pending_pongs)
{
warn!(
"batch_send failed to send {}/{} packets. First error: {:?}",
num_failed,
pending_pongs.len(),
err
);
match batch_send(repair_socket, &pending_pongs) {
Ok(()) => (),
Err(SendPktsError::IoError(err, num_failed)) => {
warn!(
"batch_send failed to send {}/{} packets. First error: {:?}",
num_failed,
pending_pongs.len(),
err
);
}
}
}
}
Expand Down
18 changes: 10 additions & 8 deletions gossip/tests/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,14 +139,16 @@ fn retransmit_to(
.filter(|addr| socket_addr_space.check(addr))
.collect()
};
if let Err(SendPktsError::IoError(ioerr, num_failed)) = multi_target_send(socket, data, &dests)
{
error!(
"retransmit_to multi_target_send error: {:?}, {}/{} packets failed",
ioerr,
num_failed,
dests.len(),
);
match multi_target_send(socket, data, &dests) {
Ok(()) => (),
Err(SendPktsError::IoError(ioerr, num_failed)) => {
error!(
"retransmit_to multi_target_send error: {:?}, {}/{} packets failed",
ioerr,
num_failed,
dests.len(),
);
}
}
}

Expand Down
69 changes: 34 additions & 35 deletions streamer/src/nonblocking/sendmmsg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,16 +178,10 @@ mod tests {
let dest_refs: Vec<_> = vec![&ip4, &ip6, &ip4];

let sender = UdpSocket::bind("0.0.0.0:0").await.expect("bind");
if let Err(SendPktsError::IoError(_, num_failed)) =
batch_send(&sender, &packet_refs[..]).await
{
assert_eq!(num_failed, 1);
}
if let Err(SendPktsError::IoError(_, num_failed)) =
multi_target_send(&sender, &packets[0], &dest_refs).await
{
assert_eq!(num_failed, 1);
}
let res = batch_send(&sender, &packet_refs[..]).await;
assert_matches!(res, Err(SendPktsError::IoError(_, /*num_failed*/ 1)));
let res = multi_target_send(&sender, &packets[0], &dest_refs).await;
assert_matches!(res, Err(SendPktsError::IoError(_, /*num_failed*/ 1)));
}

#[tokio::test]
Expand All @@ -205,11 +199,12 @@ mod tests {
(&packets[3][..], &ipv4broadcast),
(&packets[4][..], &ipv4local),
];
if let Err(SendPktsError::IoError(ioerror, num_failed)) =
batch_send(&sender, &packet_refs[..]).await
{
assert_matches!(ioerror.kind(), ErrorKind::PermissionDenied);
assert_eq!(num_failed, 2);
match batch_send(&sender, &packet_refs[..]).await {
Ok(()) => panic!(),
Err(SendPktsError::IoError(ioerror, num_failed)) => {
assert_matches!(ioerror.kind(), ErrorKind::PermissionDenied);
assert_eq!(num_failed, 2);
}
}

// test leading and trailing failures for batch_send
Expand All @@ -220,11 +215,12 @@ mod tests {
(&packets[3][..], &ipv4local),
(&packets[4][..], &ipv4broadcast),
];
if let Err(SendPktsError::IoError(ioerror, num_failed)) =
batch_send(&sender, &packet_refs[..]).await
{
assert_matches!(ioerror.kind(), ErrorKind::PermissionDenied);
assert_eq!(num_failed, 3);
match batch_send(&sender, &packet_refs[..]).await {
Ok(()) => panic!(),
Err(SendPktsError::IoError(ioerror, num_failed)) => {
assert_matches!(ioerror.kind(), ErrorKind::PermissionDenied);
assert_eq!(num_failed, 3);
}
}

// test consecutive intermediate failures for batch_send
Expand All @@ -235,11 +231,12 @@ mod tests {
(&packets[3][..], &ipv4broadcast),
(&packets[4][..], &ipv4local),
];
if let Err(SendPktsError::IoError(ioerror, num_failed)) =
batch_send(&sender, &packet_refs[..]).await
{
assert_matches!(ioerror.kind(), ErrorKind::PermissionDenied);
assert_eq!(num_failed, 2);
match batch_send(&sender, &packet_refs[..]).await {
Ok(()) => panic!(),
Err(SendPktsError::IoError(ioerror, num_failed)) => {
assert_matches!(ioerror.kind(), ErrorKind::PermissionDenied);
assert_eq!(num_failed, 2);
}
}

// test intermediate failures for multi_target_send
Expand All @@ -250,11 +247,12 @@ mod tests {
&ipv4broadcast,
&ipv4local,
];
if let Err(SendPktsError::IoError(ioerror, num_failed)) =
multi_target_send(&sender, &packets[0], &dest_refs).await
{
assert_matches!(ioerror.kind(), ErrorKind::PermissionDenied);
assert_eq!(num_failed, 2);
match multi_target_send(&sender, &packets[0], &dest_refs).await {
Ok(()) => panic!(),
Err(SendPktsError::IoError(ioerror, num_failed)) => {
assert_matches!(ioerror.kind(), ErrorKind::PermissionDenied);
assert_eq!(num_failed, 2);
}
}

// test leading and trailing failures for multi_target_send
Expand All @@ -265,11 +263,12 @@ mod tests {
&ipv4local,
&ipv4broadcast,
];
if let Err(SendPktsError::IoError(ioerror, num_failed)) =
multi_target_send(&sender, &packets[0], &dest_refs).await
{
assert_matches!(ioerror.kind(), ErrorKind::PermissionDenied);
assert_eq!(num_failed, 3);
match multi_target_send(&sender, &packets[0], &dest_refs).await {
Ok(()) => panic!(),
Err(SendPktsError::IoError(ioerror, num_failed)) => {
assert_matches!(ioerror.kind(), ErrorKind::PermissionDenied);
assert_eq!(num_failed, 3);
}
}
}
}
67 changes: 34 additions & 33 deletions streamer/src/sendmmsg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,14 +282,10 @@ mod tests {
let dest_refs: Vec<_> = vec![&ip4, &ip6, &ip4];

let sender = UdpSocket::bind("0.0.0.0:0").expect("bind");
if let Err(SendPktsError::IoError(_, num_failed)) = batch_send(&sender, &packet_refs[..]) {
assert_eq!(num_failed, 1);
}
if let Err(SendPktsError::IoError(_, num_failed)) =
multi_target_send(&sender, &packets[0], &dest_refs)
{
assert_eq!(num_failed, 1);
}
let res = batch_send(&sender, &packet_refs[..]);
assert_matches!(res, Err(SendPktsError::IoError(_, /*num_failed*/ 1)));
let res = multi_target_send(&sender, &packets[0], &dest_refs);
assert_matches!(res, Err(SendPktsError::IoError(_, /*num_failed*/ 1)));
}

#[test]
Expand All @@ -307,11 +303,12 @@ mod tests {
(&packets[3][..], &ipv4broadcast),
(&packets[4][..], &ipv4local),
];
if let Err(SendPktsError::IoError(ioerror, num_failed)) =
batch_send(&sender, &packet_refs[..])
{
assert_matches!(ioerror.kind(), ErrorKind::PermissionDenied);
assert_eq!(num_failed, 2);
match batch_send(&sender, &packet_refs[..]) {
Ok(()) => panic!(),
Err(SendPktsError::IoError(ioerror, num_failed)) => {
assert_matches!(ioerror.kind(), ErrorKind::PermissionDenied);
assert_eq!(num_failed, 2);
}
}

// test leading and trailing failures for batch_send
Expand All @@ -322,11 +319,12 @@ mod tests {
(&packets[3][..], &ipv4local),
(&packets[4][..], &ipv4broadcast),
];
if let Err(SendPktsError::IoError(ioerror, num_failed)) =
batch_send(&sender, &packet_refs[..])
{
assert_matches!(ioerror.kind(), ErrorKind::PermissionDenied);
assert_eq!(num_failed, 3);
match batch_send(&sender, &packet_refs[..]) {
Ok(()) => panic!(),
Err(SendPktsError::IoError(ioerror, num_failed)) => {
assert_matches!(ioerror.kind(), ErrorKind::PermissionDenied);
assert_eq!(num_failed, 3);
}
}

// test consecutive intermediate failures for batch_send
Expand All @@ -337,11 +335,12 @@ mod tests {
(&packets[3][..], &ipv4broadcast),
(&packets[4][..], &ipv4local),
];
if let Err(SendPktsError::IoError(ioerror, num_failed)) =
batch_send(&sender, &packet_refs[..])
{
assert_matches!(ioerror.kind(), ErrorKind::PermissionDenied);
assert_eq!(num_failed, 2);
match batch_send(&sender, &packet_refs[..]) {
Ok(()) => panic!(),
Err(SendPktsError::IoError(ioerror, num_failed)) => {
assert_matches!(ioerror.kind(), ErrorKind::PermissionDenied);
assert_eq!(num_failed, 2);
}
}

// test intermediate failures for multi_target_send
Expand All @@ -352,11 +351,12 @@ mod tests {
&ipv4broadcast,
&ipv4local,
];
if let Err(SendPktsError::IoError(ioerror, num_failed)) =
multi_target_send(&sender, &packets[0], &dest_refs)
{
assert_matches!(ioerror.kind(), ErrorKind::PermissionDenied);
assert_eq!(num_failed, 2);
match multi_target_send(&sender, &packets[0], &dest_refs) {
Ok(()) => panic!(),
Err(SendPktsError::IoError(ioerror, num_failed)) => {
assert_matches!(ioerror.kind(), ErrorKind::PermissionDenied);
assert_eq!(num_failed, 2);
}
}

// test leading and trailing failures for multi_target_send
Expand All @@ -367,11 +367,12 @@ mod tests {
&ipv4local,
&ipv4broadcast,
];
if let Err(SendPktsError::IoError(ioerror, num_failed)) =
multi_target_send(&sender, &packets[0], &dest_refs)
{
assert_matches!(ioerror.kind(), ErrorKind::PermissionDenied);
assert_eq!(num_failed, 3);
match multi_target_send(&sender, &packets[0], &dest_refs) {
Ok(()) => panic!(),
Err(SendPktsError::IoError(ioerror, num_failed)) => {
assert_matches!(ioerror.kind(), ErrorKind::PermissionDenied);
assert_eq!(num_failed, 3);
}
}
}
}
9 changes: 6 additions & 3 deletions turbine/src/broadcast_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -466,9 +466,12 @@ pub fn broadcast_shreds(
transmit_stats.shred_select += shred_select.as_us();

let mut send_mmsg_time = Measure::start("send_mmsg");
if let Err(SendPktsError::IoError(ioerr, num_failed)) = batch_send(s, &packets[..]) {
transmit_stats.dropped_packets_udp += num_failed;
result = Err(Error::Io(ioerr));
match batch_send(s, &packets[..]) {
Ok(()) => (),
Err(SendPktsError::IoError(ioerr, num_failed)) => {
transmit_stats.dropped_packets_udp += num_failed;
result = Err(Error::Io(ioerr));
}
}
send_mmsg_time.stop();
transmit_stats.send_mmsg_elapsed += send_mmsg_time.as_us();
Expand Down
7 changes: 5 additions & 2 deletions turbine/src/broadcast_stage/broadcast_duplicates_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,8 +376,11 @@ impl BroadcastRun for BroadcastDuplicatesRun {
.flatten()
.collect();

if let Err(SendPktsError::IoError(ioerr, _)) = batch_send(sock, &packets) {
return Err(Error::Io(ioerr));
match batch_send(sock, &packets) {
Ok(()) => (),
Err(SendPktsError::IoError(ioerr, _)) => {
return Err(Error::Io(ioerr));
}
}
Ok(())
}
Expand Down

0 comments on commit afcdf7b

Please sign in to comment.