diff --git a/Cargo.toml b/Cargo.toml index 7fa67c6828..f722042e84 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,6 +41,8 @@ want = { version = "0.3", optional = true } [dev-dependencies] form_urlencoded = "1" +futures-channel = { version = "0.3", features = ["sink"] } +futures-util = { version = "0.3", default-features = false, features = ["sink"] } http-body-util = "0.1" pretty_env_logger = "0.5" spmc = "0.3" diff --git a/benches/end_to_end.rs b/benches/end_to_end.rs index 3558e5c611..08d5c90548 100644 --- a/benches/end_to_end.rs +++ b/benches/end_to_end.rs @@ -140,7 +140,6 @@ fn http2_parallel_x10_req_10mb(b: &mut test::Bencher) { } #[bench] -#[ignore] fn http2_parallel_x10_req_10kb_100_chunks(b: &mut test::Bencher) { let body = &[b'x'; 1024 * 10]; opts() @@ -152,7 +151,6 @@ fn http2_parallel_x10_req_10kb_100_chunks(b: &mut test::Bencher) { } #[bench] -#[ignore] fn http2_parallel_x10_req_10kb_100_chunks_adaptive_window(b: &mut test::Bencher) { let body = &[b'x'; 1024 * 10]; opts() @@ -165,7 +163,6 @@ fn http2_parallel_x10_req_10kb_100_chunks_adaptive_window(b: &mut test::Bencher) } #[bench] -#[ignore] fn http2_parallel_x10_req_10kb_100_chunks_max_window(b: &mut test::Bencher) { let body = &[b'x'; 1024 * 10]; opts() @@ -294,7 +291,7 @@ impl Opts { .build() .expect("rt build"), ); - //let exec = rt.clone(); + let exec = rt.clone(); let req_len = self.request_body.map(|b| b.len()).unwrap_or(0) as u64; let req_len = if self.request_chunks > 0 { @@ -344,19 +341,21 @@ impl Opts { let make_request = || { let chunk_cnt = self.request_chunks; let body = if chunk_cnt > 0 { - /* - let (mut tx, body) = Body::channel(); + let (mut tx, rx) = futures_channel::mpsc::channel(0); + let chunk = self .request_body .expect("request_chunks means request_body"); exec.spawn(async move { + use futures_util::SinkExt; + use hyper::body::Frame; for _ in 0..chunk_cnt { - tx.send_data(chunk.into()).await.expect("send_data"); + tx.send(Ok(Frame::data(bytes::Bytes::from(chunk)))) + .await + .expect("send_data"); } }); - body - */ - todo!("request_chunks"); + http_body_util::StreamBody::new(rx).boxed() } else if let Some(chunk) = self.request_body { http_body_util::Full::new(bytes::Bytes::from(chunk)).boxed() } else { diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs index 88b3107877..60504e68b3 100644 --- a/src/client/dispatch.rs +++ b/src/client/dispatch.rs @@ -342,7 +342,7 @@ mod tests { use super::{channel, Callback, Receiver}; #[derive(Debug)] - struct Custom(i32); + struct Custom(#[allow(dead_code)] i32); impl Future for Receiver { type Output = Option<(T, Callback)>;