diff --git a/Cargo.toml b/Cargo.toml index 06c3e0443..820571c15 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,3 +35,5 @@ ntex-util = { path = "ntex-util" } ntex-glommio = { path = "ntex-glommio" } ntex-tokio = { path = "ntex-tokio" } ntex-async-std = { path = "ntex-async-std" } + +ntex-h2 = { git = "https://github.com/ntex-rs/ntex-h2.git", branch = "async-fn-in-trait" } diff --git a/ntex-bytes/Cargo.toml b/ntex-bytes/Cargo.toml index 24bf69090..f2628572c 100644 --- a/ntex-bytes/Cargo.toml +++ b/ntex-bytes/Cargo.toml @@ -27,4 +27,4 @@ simdutf8 = { version = "0.1.4", optional = true } [dev-dependencies] serde_test = "1.0" serde_json = "1.0" -ntex = { version = "0.7.0", features = ["tokio"] } +ntex = { version = "1.0.0", features = ["tokio"] } diff --git a/ntex-connect/Cargo.toml b/ntex-connect/Cargo.toml index ea815bce6..63dd77d7b 100644 --- a/ntex-connect/Cargo.toml +++ b/ntex-connect/Cargo.toml @@ -59,4 +59,4 @@ webpki-roots = { version = "0.25", optional = true } [dev-dependencies] rand = "0.8" env_logger = "0.10" -ntex = { version = "0.7.0", features = ["tokio"] } +ntex = { version = "1.0", features = ["tokio"] } diff --git a/ntex-io/Cargo.toml b/ntex-io/Cargo.toml index 631815f21..33fac379c 100644 --- a/ntex-io/Cargo.toml +++ b/ntex-io/Cargo.toml @@ -29,4 +29,4 @@ pin-project-lite = "0.2" rand = "0.8" env_logger = "0.10" -ntex = { version = "0.7", features = ["tokio"] } +ntex = { version = "1.0.0", features = ["tokio"] } diff --git a/ntex-io/src/dispatcher.rs b/ntex-io/src/dispatcher.rs index f8a786b20..5ef867c18 100644 --- a/ntex-io/src/dispatcher.rs +++ b/ntex-io/src/dispatcher.rs @@ -839,10 +839,10 @@ mod tests { Poll::Ready(Err(())) } - async fn call<'a>( - &'a self, + async fn call( + &self, _: DispatchItem, - _: ServiceCtx<'a, Self>, + _: ServiceCtx<'_, Self>, ) -> Result { Ok(None) } diff --git a/ntex-macros/Cargo.toml b/ntex-macros/Cargo.toml index 5811cd087..3bf023f78 100644 --- a/ntex-macros/Cargo.toml +++ b/ntex-macros/Cargo.toml @@ -16,6 +16,6 @@ syn = { version = "^1", features = ["full", "parsing"] } proc-macro2 = "^1" [dev-dependencies] -ntex = { version = "0.7.0", features = ["tokio"] } +ntex = { version = "1.0.0", features = ["tokio"] } futures = "0.3" env_logger = "0.10" \ No newline at end of file diff --git a/ntex-service/Cargo.toml b/ntex-service/Cargo.toml index dddda5876..49a6dbdd8 100644 --- a/ntex-service/Cargo.toml +++ b/ntex-service/Cargo.toml @@ -20,5 +20,5 @@ pin-project-lite = "0.2.6" slab = "0.4" [dev-dependencies] -ntex = { version = "0.7.0", features = ["tokio"] } -ntex-util = "0.3.0" +ntex = { version = "1.0", features = ["tokio"] } +ntex-util = "1.0.0" diff --git a/ntex-service/src/and_then.rs b/ntex-service/src/and_then.rs index 291cc445b..3ac8d72a3 100644 --- a/ntex-service/src/and_then.rs +++ b/ntex-service/src/and_then.rs @@ -110,10 +110,10 @@ mod tests { Poll::Ready(Ok(())) } - async fn call<'a>( - &'a self, + async fn call( + &self, req: &'static str, - _: ServiceCtx<'a, Self>, + _: ServiceCtx<'_, Self>, ) -> Result { Ok(req) } @@ -131,10 +131,10 @@ mod tests { Poll::Ready(Ok(())) } - async fn call<'a>( - &'a self, + async fn call( + &self, req: &'static str, - _: ServiceCtx<'a, Self>, + _: ServiceCtx<'_, Self>, ) -> Result { Ok((req, "srv2")) } diff --git a/ntex-service/src/apply.rs b/ntex-service/src/apply.rs index f4fc4fb68..61a776793 100644 --- a/ntex-service/src/apply.rs +++ b/ntex-service/src/apply.rs @@ -212,7 +212,7 @@ mod tests { type Response = (); type Error = (); - async fn call<'a>(&'a self, _: (), _: ServiceCtx<'a, Self>) -> Result<(), ()> { + async fn call(&self, _: (), _: ServiceCtx<'_, Self>) -> Result<(), ()> { Ok(()) } } diff --git a/ntex-service/src/ctx.rs b/ntex-service/src/ctx.rs index 97e3fd144..fd16b9c23 100644 --- a/ntex-service/src/ctx.rs +++ b/ntex-service/src/ctx.rs @@ -212,10 +212,10 @@ mod tests { self.1.poll_ready(cx).map(|_| Ok(())) } - async fn call<'a>( - &'a self, + async fn call( + &self, req: &'static str, - ctx: ServiceCtx<'a, Self>, + ctx: ServiceCtx<'_, Self>, ) -> Result { let _ = ctx.clone(); Ok(req) diff --git a/ntex-service/src/lib.rs b/ntex-service/src/lib.rs index 9354a65da..edf5245f2 100644 --- a/ntex-service/src/lib.rs +++ b/ntex-service/src/lib.rs @@ -69,7 +69,7 @@ pub use self::pipeline::{Pipeline, PipelineCall}; /// type Response = u64; /// type Error = Infallible; /// -/// async fn call<'a>(&'a self, req: u8, _: ServiceCtx<'a, Self>) -> Result { +/// async fn call(&self, req: u8, _: ServiceCtx<'_, Self>) -> Result { /// Ok(req as u64) /// } /// } diff --git a/ntex-service/src/map.rs b/ntex-service/src/map.rs index 40c5dee13..f09b888f4 100644 --- a/ntex-service/src/map.rs +++ b/ntex-service/src/map.rs @@ -162,7 +162,7 @@ mod tests { Poll::Ready(Ok(())) } - async fn call<'a>(&'a self, _: (), _: ServiceCtx<'a, Self>) -> Result<(), ()> { + async fn call(&self, _: (), _: ServiceCtx<'_, Self>) -> Result<(), ()> { Ok(()) } } diff --git a/ntex-service/src/map_err.rs b/ntex-service/src/map_err.rs index f82f44412..0e4b8aebc 100644 --- a/ntex-service/src/map_err.rs +++ b/ntex-service/src/map_err.rs @@ -178,7 +178,7 @@ mod tests { } } - async fn call<'a>(&'a self, _: (), _: ServiceCtx<'a, Self>) -> Result<(), ()> { + async fn call(&self, _: (), _: ServiceCtx<'_, Self>) -> Result<(), ()> { Err(()) } } diff --git a/ntex-service/src/middleware.rs b/ntex-service/src/middleware.rs index b87d6d5e7..b36565230 100644 --- a/ntex-service/src/middleware.rs +++ b/ntex-service/src/middleware.rs @@ -215,10 +215,10 @@ mod tests { self.0.poll_ready(cx) } - async fn call<'a>( - &'a self, + async fn call( + &self, req: R, - ctx: ServiceCtx<'a, Self>, + ctx: ServiceCtx<'_, Self>, ) -> Result { ctx.call(&self.0, req).await } diff --git a/ntex-service/src/then.rs b/ntex-service/src/then.rs index 107b2b774..f39a8889a 100644 --- a/ntex-service/src/then.rs +++ b/ntex-service/src/then.rs @@ -114,10 +114,10 @@ mod tests { Poll::Ready(Ok(())) } - async fn call<'a>( - &'a self, + async fn call( + &self, req: Result<&'static str, &'static str>, - _: ServiceCtx<'a, Self>, + _: ServiceCtx<'_, Self>, ) -> Result<&'static str, ()> { match req { Ok(msg) => Ok(msg), @@ -138,10 +138,10 @@ mod tests { Poll::Ready(Ok(())) } - async fn call<'a>( - &'a self, + async fn call( + &self, req: Result<&'static str, ()>, - _: ServiceCtx<'a, Self>, + _: ServiceCtx<'_, Self>, ) -> Result { match req { Ok(msg) => Ok((msg, "ok")), diff --git a/ntex-tls/Cargo.toml b/ntex-tls/Cargo.toml index 7516f9ba7..c87ceb4e0 100644 --- a/ntex-tls/Cargo.toml +++ b/ntex-tls/Cargo.toml @@ -39,7 +39,7 @@ tls_openssl = { version = "0.10", package = "openssl", optional = true } tls_rust = { version = "0.21", package = "rustls", optional = true } [dev-dependencies] -ntex = { version = "0.7", features = ["openssl", "rustls", "tokio"] } +ntex = { version = "1.0", features = ["openssl", "rustls", "tokio"] } env_logger = "0.10" rustls-pemfile = "1.0" webpki-roots = "0.25" diff --git a/ntex-util/Cargo.toml b/ntex-util/Cargo.toml index c49e645dc..b36c1c4b4 100644 --- a/ntex-util/Cargo.toml +++ b/ntex-util/Cargo.toml @@ -28,7 +28,7 @@ futures-sink = { version = "0.3", default-features = false, features = ["alloc"] pin-project-lite = "0.2.9" [dev-dependencies] -ntex = { version = "0.7", features = ["tokio"] } +ntex = { version = "1.0.0", features = ["tokio"] } ntex-bytes = "0.1.21" ntex-macros = "0.1.3" futures-util = { version = "0.3", default-features = false, features = ["alloc"] } diff --git a/ntex-util/src/services/buffer.rs b/ntex-util/src/services/buffer.rs index 6c91c9563..91bb20cee 100644 --- a/ntex-util/src/services/buffer.rs +++ b/ntex-util/src/services/buffer.rs @@ -316,7 +316,7 @@ mod tests { } } - async fn call<'a>(&'a self, _: (), _: ServiceCtx<'a, Self>) -> Result<(), ()> { + async fn call(&self, _: (), _: ServiceCtx<'_, Self>) -> Result<(), ()> { self.0.ready.set(false); self.0.count.set(self.0.count.get() + 1); Ok(()) diff --git a/ntex-util/src/services/inflight.rs b/ntex-util/src/services/inflight.rs index 08fe20879..6d781f93e 100644 --- a/ntex-util/src/services/inflight.rs +++ b/ntex-util/src/services/inflight.rs @@ -102,7 +102,7 @@ mod tests { type Response = (); type Error = (); - async fn call<'a>(&'a self, _: (), _: ServiceCtx<'a, Self>) -> Result<(), ()> { + async fn call(&self, _: (), _: ServiceCtx<'_, Self>) -> Result<(), ()> { let _ = self.0.recv().await; Ok(()) } diff --git a/ntex-util/src/services/onerequest.rs b/ntex-util/src/services/onerequest.rs index 11b1faf70..7f2537f65 100644 --- a/ntex-util/src/services/onerequest.rs +++ b/ntex-util/src/services/onerequest.rs @@ -93,7 +93,7 @@ mod tests { type Response = (); type Error = (); - async fn call<'a>(&'a self, _: (), _: ServiceCtx<'a, Self>) -> Result<(), ()> { + async fn call(&self, _: (), _: ServiceCtx<'_, Self>) -> Result<(), ()> { let _ = self.0.recv().await; Ok::<_, ()>(()) } diff --git a/ntex-util/src/services/timeout.rs b/ntex-util/src/services/timeout.rs index 63768a8a4..1855b8ce5 100644 --- a/ntex-util/src/services/timeout.rs +++ b/ntex-util/src/services/timeout.rs @@ -170,11 +170,7 @@ mod tests { type Response = (); type Error = SrvError; - async fn call<'a>( - &'a self, - _: (), - _: ServiceCtx<'a, Self>, - ) -> Result<(), SrvError> { + async fn call(&self, _: (), _: ServiceCtx<'_, Self>) -> Result<(), SrvError> { crate::time::sleep(self.0).await; Ok::<_, SrvError>(()) } diff --git a/ntex-util/src/services/variant.rs b/ntex-util/src/services/variant.rs index 0642e30e2..02235c10f 100644 --- a/ntex-util/src/services/variant.rs +++ b/ntex-util/src/services/variant.rs @@ -253,7 +253,7 @@ mod tests { Poll::Ready(()) } - async fn call<'a>(&'a self, _: (), _: ServiceCtx<'a, Self>) -> Result { + async fn call(&self, _: (), _: ServiceCtx<'_, Self>) -> Result { Ok(1) } } @@ -273,7 +273,7 @@ mod tests { Poll::Ready(()) } - async fn call<'a>(&'a self, _: (), _: ServiceCtx<'a, Self>) -> Result { + async fn call(&self, _: (), _: ServiceCtx<'_, Self>) -> Result { Ok(2) } } diff --git a/ntex/CHANGES.md b/ntex/CHANGES.md index 64168731c..c1145e486 100644 --- a/ntex/CHANGES.md +++ b/ntex/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [1.0.0] - 2024-01-0x + +* Use "async fn" in trait for Service definition + ## [0.7.17] - 2024-01-05 * Allow to set default response payload limit and timeout diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml index 2547f3bc0..9924ed25a 100644 --- a/ntex/Cargo.toml +++ b/ntex/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex" -version = "0.7.17" +version = "1.0.0" authors = ["ntex contributors "] description = "Framework for composable network services" readme = "README.md" @@ -49,20 +49,20 @@ async-std = ["ntex-rt/async-std", "ntex-async-std", "ntex-connect/async-std"] [dependencies] ntex-codec = "0.6.2" -ntex-connect = "0.3.4" +ntex-connect = "1.0.0" ntex-http = "0.1.11" ntex-router = "0.5.2" -ntex-service = "1.2.7" +ntex-service = "2.0.0" ntex-macros = "0.1.3" -ntex-util = "0.3.4" +ntex-util = "1.0.0" ntex-bytes = "0.1.21" -ntex-h2 = "0.4.4" +ntex-h2 = "0.5.0" ntex-rt = "0.4.11" -ntex-io = "0.3.17" -ntex-tls = "0.3.3" -ntex-tokio = { version = "0.3.1", optional = true } -ntex-glommio = { version = "0.3.1", optional = true } -ntex-async-std = { version = "0.3.2", optional = true } +ntex-io = "1.0.0" +ntex-tls = "1.0.0" +ntex-tokio = { version = "0.4.0", optional = true } +ntex-glommio = { version = "0.4.0", optional = true } +ntex-async-std = { version = "0.4.0", optional = true } async-channel = "2.1" base64 = "0.21" diff --git a/ntex/src/http/body.rs b/ntex/src/http/body.rs index 4f7cd7e9d..02d472344 100644 --- a/ntex/src/http/body.rs +++ b/ntex/src/http/body.rs @@ -543,10 +543,10 @@ where #[cfg(test)] mod tests { use futures_util::stream; - use std::io; + use std::{future::poll_fn, io}; use super::*; - use crate::util::{poll_fn, Ready}; + use crate::util::Ready; impl Body { pub(crate) fn get_ref(&self) -> &[u8] { diff --git a/ntex/src/http/client/connector.rs b/ntex/src/http/client/connector.rs index 6485d04ab..037d254c7 100644 --- a/ntex/src/http/client/connector.rs +++ b/ntex/src/http/client/connector.rs @@ -3,9 +3,9 @@ use std::{fmt, task::Context, task::Poll, time::Duration}; use ntex_h2::{self as h2}; use crate::connect::{Connect as TcpConnect, Connector as TcpConnector}; -use crate::service::{apply_fn, boxed, Service, ServiceCall, ServiceCtx}; +use crate::service::{apply_fn, boxed, Service, ServiceCtx}; use crate::time::{Millis, Seconds}; -use crate::util::{timeout::TimeoutError, timeout::TimeoutService, Either, Ready}; +use crate::util::{timeout::TimeoutError, timeout::TimeoutService}; use crate::{http::Uri, io::IoBoxed}; use super::{connection::Connection, error::ConnectError, pool::ConnectionPool, Connect}; @@ -54,7 +54,6 @@ impl Connector { let conn = Connector { connector: boxed::service( TcpConnector::new() - .chain() .map(IoBoxed::from) .map_err(ConnectError::from), ), @@ -192,12 +191,8 @@ impl Connector { T: Service, Error = crate::connect::ConnectError> + 'static, IoBoxed: From, { - self.connector = boxed::service( - connector - .chain() - .map(IoBoxed::from) - .map_err(ConnectError::from), - ); + self.connector = + boxed::service(connector.map(IoBoxed::from).map_err(ConnectError::from)); self } @@ -208,10 +203,7 @@ impl Connector { IoBoxed: From, { self.ssl_connector = Some(boxed::service( - connector - .chain() - .map(IoBoxed::from) - .map_err(ConnectError::from), + connector.map(IoBoxed::from).map_err(ConnectError::from), )); self } @@ -265,7 +257,6 @@ fn connector( async move { srv.call(TcpConnect::new(msg.uri).set_addr(msg.addr)).await }, ) }) - .chain() .map(move |io: IoBoxed| { io.set_disconnect_timeout(disconnect_timeout); io @@ -290,12 +281,7 @@ where { type Response = as Service>::Response; type Error = ConnectError; - type Future<'f> = Either< - ServiceCall<'f, ConnectionPool, Connect>, - Ready, - >; - #[inline] fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { let ready = self.tcp_pool.poll_ready(cx)?.is_ready(); let ready = if let Some(ref ssl_pool) = self.ssl_pool { @@ -310,7 +296,6 @@ where } } - #[inline] fn poll_shutdown(&self, cx: &mut Context<'_>) -> Poll<()> { let tcp_ready = self.tcp_pool.poll_shutdown(cx).is_ready(); let ssl_ready = self @@ -325,16 +310,20 @@ where } } - fn call<'a>(&'a self, req: Connect, ctx: ServiceCtx<'a, Self>) -> Self::Future<'_> { + async fn call( + &self, + req: Connect, + ctx: ServiceCtx<'_, Self>, + ) -> Result { match req.uri.scheme_str() { Some("https") | Some("wss") => { if let Some(ref conn) = self.ssl_pool { - Either::Left(ctx.call(conn, req)) + ctx.call(conn, req).await } else { - Either::Right(Ready::Err(ConnectError::SslIsNotSupported)) + Err(ConnectError::SslIsNotSupported) } } - _ => Either::Left(ctx.call(&self.tcp_pool, req)), + _ => ctx.call(&self.tcp_pool, req).await, } } } diff --git a/ntex/src/http/client/h1proto.rs b/ntex/src/http/client/h1proto.rs index 7c3749bbe..06572418d 100644 --- a/ntex/src/http/client/h1proto.rs +++ b/ntex/src/http/client/h1proto.rs @@ -1,4 +1,6 @@ -use std::{io, io::Write, pin::Pin, task::Context, task::Poll, time::Instant}; +use std::{ + future::poll_fn, io, io::Write, pin::Pin, task::Context, task::Poll, time::Instant, +}; use crate::http::body::{BodySize, MessageBody}; use crate::http::error::PayloadError; @@ -8,7 +10,7 @@ use crate::http::message::{RequestHeadType, ResponseHead}; use crate::http::payload::{Payload, PayloadStream}; use crate::io::{IoBoxed, RecvError}; use crate::time::{timeout_checked, Millis}; -use crate::util::{poll_fn, ready, BufMut, Bytes, BytesMut, Stream}; +use crate::util::{ready, BufMut, Bytes, BytesMut, Stream}; use super::connection::{Connection, ConnectionType}; use super::error::{ConnectError, SendRequestError}; diff --git a/ntex/src/http/client/h2proto.rs b/ntex/src/http/client/h2proto.rs index ee00e613b..25e29e9f4 100644 --- a/ntex/src/http/client/h2proto.rs +++ b/ntex/src/http/client/h2proto.rs @@ -1,4 +1,4 @@ -use std::io; +use std::{future::poll_fn, io}; use ntex_h2::client::{RecvStream, SimpleClient}; use ntex_h2::{self as h2, frame}; @@ -8,7 +8,7 @@ use crate::http::header::{self, HeaderMap, HeaderValue}; use crate::http::message::{RequestHeadType, ResponseHead}; use crate::http::{h2::payload, payload::Payload, Method, Version}; use crate::time::{timeout_checked, Millis}; -use crate::util::{poll_fn, ByteString, Bytes}; +use crate::util::{ByteString, Bytes}; use super::error::{ConnectError, SendRequestError}; diff --git a/ntex/src/http/client/pool.rs b/ntex/src/http/client/pool.rs index 820bd3e89..43789b1bc 100644 --- a/ntex/src/http/client/pool.rs +++ b/ntex/src/http/client/pool.rs @@ -8,7 +8,7 @@ use crate::http::uri::{Authority, Scheme, Uri}; use crate::io::{types::HttpProtocol, IoBoxed}; use crate::service::{Pipeline, PipelineCall, Service, ServiceCtx}; use crate::time::{now, Seconds}; -use crate::util::{ready, BoxFuture, ByteString, HashMap, HashSet}; +use crate::util::{ready, ByteString, HashMap, HashSet}; use crate::{channel::pool, rt::spawn, task::LocalWaker}; use super::connection::{Connection, ConnectionType}; @@ -116,61 +116,62 @@ where { type Response = Connection; type Error = ConnectError; - type Future<'f> = BoxFuture<'f, Result>; crate::forward_poll_ready!(connector); crate::forward_poll_shutdown!(connector); - fn call<'a>(&'a self, req: Connect, _: ServiceCtx<'a, Self>) -> Self::Future<'_> { + async fn call( + &self, + req: Connect, + _: ServiceCtx<'_, Self>, + ) -> Result { trace!("Get connection for {:?}", req.uri); let inner = self.inner.clone(); let waiters = self.waiters.clone(); - Box::pin(async move { - let key = if let Some(authority) = req.uri.authority() { - authority.clone().into() - } else { - return Err(ConnectError::Unresolved); - }; - - // acquire connection - let result = inner.borrow_mut().acquire(&key); - match result { - // use existing connection - Acquire::Acquired(io, created) => { - trace!("Use existing {:?} connection for {:?}", io, req.uri); - Ok(Connection::new( - io, - created, - Some(Acquired::new(key, inner)), - )) - } - // open new tcp connection - Acquire::Available => { - trace!("Connecting to {:?}", req.uri); - let uri = req.uri.clone(); - let (tx, rx) = waiters.borrow_mut().pool.channel(); - OpenConnection::spawn(key, tx, uri, inner, &self.connector, req); - - match rx.await { - Err(_) => Err(ConnectError::Disconnected(None)), - Ok(res) => res, - } + let key = if let Some(authority) = req.uri.authority() { + authority.clone().into() + } else { + return Err(ConnectError::Unresolved); + }; + + // acquire connection + let result = inner.borrow_mut().acquire(&key); + match result { + // use existing connection + Acquire::Acquired(io, created) => { + trace!("Use existing {:?} connection for {:?}", io, req.uri); + Ok(Connection::new( + io, + created, + Some(Acquired::new(key, inner)), + )) + } + // open new tcp connection + Acquire::Available => { + trace!("Connecting to {:?}", req.uri); + let uri = req.uri.clone(); + let (tx, rx) = waiters.borrow_mut().pool.channel(); + OpenConnection::spawn(key, tx, uri, inner, &self.connector, req); + + match rx.await { + Err(_) => Err(ConnectError::Disconnected(None)), + Ok(res) => res, } - // pool is full, wait - Acquire::NotAvailable => { - trace!( - "Pool is full, waiting for available connections for {:?}", - req.uri - ); - let rx = waiters.borrow_mut().wait_for(req); - match rx.await { - Err(_) => Err(ConnectError::Disconnected(None)), - Ok(res) => res, - } + } + // pool is full, wait + Acquire::NotAvailable => { + trace!( + "Pool is full, waiting for available connections for {:?}", + req.uri + ); + let rx = waiters.borrow_mut().wait_for(req); + match rx.await { + Err(_) => Err(ConnectError::Disconnected(None)), + Ok(res) => res, } } - }) + } } } @@ -659,8 +660,8 @@ mod tests { assert!(pool.get_ref().inner.borrow().connecting.is_empty()); // pool is full, waiting - let mut fut = pool.call(req.clone()); - assert!(lazy(|cx| Pin::new(&mut fut).poll(cx)).await.is_pending()); + let mut fut = std::pin::pin!(pool.call(req.clone())); + assert!(lazy(|cx| fut.as_mut().poll(cx)).await.is_pending()); assert_eq!(pool.get_ref().waiters.borrow().waiters.len(), 1); // release connection and push it to next waiter @@ -676,8 +677,8 @@ mod tests { assert_eq!(store.borrow().len(), 2); assert_eq!(pool.get_ref().inner.borrow().acquired, 1); assert!(pool.get_ref().inner.borrow().connecting.is_empty()); - let mut fut = pool.call(req.clone()); - assert!(lazy(|cx| Pin::new(&mut fut).poll(cx)).await.is_pending()); + let mut fut = std::pin::pin!(pool.call(req.clone())); + assert!(lazy(|cx| fut.as_mut().poll(cx)).await.is_pending()); assert_eq!(pool.get_ref().waiters.borrow().waiters.len(), 1); // release and close @@ -692,7 +693,7 @@ mod tests { assert_eq!(pool.get_ref().inner.borrow().acquired, 1); // drop waiter, no interest in connection - let mut fut = pool.call(req.clone()); + let mut fut = Box::pin(pool.call(req.clone())); assert!(lazy(|cx| Pin::new(&mut fut).poll(cx)).await.is_pending()); drop(fut); sleep(Millis(50)).await; @@ -704,8 +705,8 @@ mod tests { uri: Uri::try_from("http://localhost2/test").unwrap(), addr: None, }; - let mut fut = pool.call(req.clone()); - assert!(lazy(|cx| Pin::new(&mut fut).poll(cx)).await.is_pending()); + let mut fut = std::pin::pin!(pool.call(req.clone())); + assert!(lazy(|cx| fut.as_mut().poll(cx)).await.is_pending()); assert_eq!(pool.get_ref().waiters.borrow().waiters.len(), 1); conn.release(false); assert_eq!(pool.get_ref().inner.borrow().acquired, 0); diff --git a/ntex/src/http/h1/dispatcher.rs b/ntex/src/http/h1/dispatcher.rs index d6c114b16..772b6c5a1 100644 --- a/ntex/src/http/h1/dispatcher.rs +++ b/ntex/src/http/h1/dispatcher.rs @@ -117,7 +117,7 @@ where // slow-request timer let (flags, max_timeout) = if let Some(cfg) = config.headers_read_rate() { - io.start_timer_secs(cfg.timeout); + io.start_timer(cfg.timeout); (Flags::READ_HDRS_TIMEOUT, cfg.max_timeout) } else { (Flags::empty(), Seconds::ZERO) @@ -888,7 +888,7 @@ where self.io.tag(), total ); - self.io.start_timer_secs(cfg.timeout); + self.io.start_timer(cfg.timeout); return Ok(()); } } @@ -935,7 +935,7 @@ where ); self.flags.insert(Flags::READ_KA_TIMEOUT); if self.config.keep_alive_enabled() { - self.io.start_timer_secs(self.config.keep_alive); + self.io.start_timer(self.config.keep_alive); } } } else { @@ -957,7 +957,7 @@ where self.read_consumed = 0; self.read_remains = decoded.remains as u32; self.read_max_timeout = cfg.max_timeout; - self.io.start_timer_secs(cfg.timeout); + self.io.start_timer(cfg.timeout); } None } @@ -973,7 +973,7 @@ where self.read_remains = decoded.remains as u32; self.read_consumed = decoded.consumed as u32; self.read_max_timeout = cfg.max_timeout; - self.io.start_timer_secs(cfg.timeout); + self.io.start_timer(cfg.timeout); } } } @@ -981,7 +981,7 @@ where #[cfg(test)] mod tests { use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; - use std::{cell::Cell, io, sync::Arc}; + use std::{cell::Cell, future::poll_fn, io, sync::Arc}; use ntex_h2::Config; use rand::Rng; @@ -992,7 +992,7 @@ mod tests { use crate::http::{body, Request, ResponseHead, StatusCode}; use crate::io::{self as nio, Base}; use crate::service::{boxed, fn_service, IntoService}; - use crate::util::{lazy, poll_fn, stream_recv, Bytes, BytesMut}; + use crate::util::{lazy, stream_recv, Bytes, BytesMut}; use crate::{codec::Decoder, testing::Io, time::sleep, time::Millis, time::Seconds}; const BUFFER_SIZE: usize = 32_768; @@ -1274,9 +1274,7 @@ mod tests { assert!(lazy(|cx| Pin::new(&mut h1).poll(cx)).await.is_pending()); sleep(Millis(50)).await; - crate::util::poll_fn(|cx| Pin::new(&mut h1).poll(cx)) - .await - .unwrap(); + poll_fn(|cx| Pin::new(&mut h1).poll(cx)).await.unwrap(); assert!(h1.inner.io.is_closed()); let mut buf = BytesMut::from(&client.read().await.unwrap()[..]); diff --git a/ntex/src/http/h1/expect.rs b/ntex/src/http/h1/expect.rs index 6f8abdbd0..b994bc02e 100644 --- a/ntex/src/http/h1/expect.rs +++ b/ntex/src/http/h1/expect.rs @@ -1,7 +1,7 @@ use std::io; +use crate::http::request::Request; use crate::service::{Service, ServiceCtx, ServiceFactory}; -use crate::{http::request::Request, util::Ready}; #[derive(Copy, Clone, Debug)] pub struct ExpectHandler; @@ -11,21 +11,21 @@ impl ServiceFactory for ExpectHandler { type Error = io::Error; type Service = ExpectHandler; type InitError = io::Error; - type Future<'f> = Ready; - #[inline] - fn create(&self, _: ()) -> Self::Future<'_> { - Ready::Ok(ExpectHandler) + async fn create(&self, _: ()) -> Result { + Ok(ExpectHandler) } } impl Service for ExpectHandler { type Response = Request; type Error = io::Error; - type Future<'f> = Ready; - #[inline] - fn call<'a>(&'a self, req: Request, _: ServiceCtx<'a, Self>) -> Self::Future<'_> { - Ready::Ok(req) + async fn call( + &self, + req: Request, + _: ServiceCtx<'_, Self>, + ) -> Result { + Ok(req) } } diff --git a/ntex/src/http/h1/payload.rs b/ntex/src/http/h1/payload.rs index 9784b1687..1fe5e5a54 100644 --- a/ntex/src/http/h1/payload.rs +++ b/ntex/src/http/h1/payload.rs @@ -205,8 +205,9 @@ impl Inner { #[cfg(test)] mod tests { + use std::future::poll_fn; + use super::*; - use crate::util::poll_fn; #[crate::rt_test] async fn test_unread_data() { diff --git a/ntex/src/http/h1/service.rs b/ntex/src/http/h1/service.rs index e1de217b5..c85f4c500 100644 --- a/ntex/src/http/h1/service.rs +++ b/ntex/src/http/h1/service.rs @@ -6,7 +6,6 @@ use crate::http::error::{DispatchError, ResponseError}; use crate::http::{request::Request, response::Response}; use crate::io::{types, Filter, Io}; use crate::service::{IntoServiceFactory, Service, ServiceCtx, ServiceFactory}; -use crate::util::BoxFuture; use super::codec::Codec; use super::dispatcher::Dispatcher; @@ -82,10 +81,9 @@ mod openssl { > { Acceptor::new(acceptor) .timeout(self.cfg.ssl_handshake_timeout) - .chain() .map_err(SslError::Ssl) .map_init_err(|_| panic!()) - .and_then(self.chain().map_err(SslError::Service)) + .and_then(self.map_err(SslError::Service)) } } } @@ -128,10 +126,9 @@ mod rustls { > { Acceptor::from(config) .timeout(self.cfg.ssl_handshake_timeout) - .chain() .map_err(|e| SslError::Ssl(Box::new(e))) .map_init_err(|_| panic!()) - .and_then(self.chain().map_err(SslError::Service)) + .and_then(self.map_err(SslError::Service)) } } } @@ -205,39 +202,36 @@ where type Error = DispatchError; type InitError = (); type Service = H1ServiceHandler; - type Future<'f> = BoxFuture<'f, Result>; - fn create(&self, _: ()) -> Self::Future<'_> { + async fn create(&self, _: ()) -> Result { let fut = self.srv.create(()); let fut_ex = self.expect.create(()); let fut_upg = self.upgrade.as_ref().map(|f| f.create(())); let on_request = self.on_request.borrow_mut().take(); let cfg = self.cfg.clone(); - Box::pin(async move { - let service = fut - .await - .map_err(|e| log::error!("Init http service error: {:?}", e))?; - let expect = fut_ex - .await - .map_err(|e| log::error!("Init http service error: {:?}", e))?; - let upgrade = if let Some(fut) = fut_upg { - Some( - fut.await - .map_err(|e| log::error!("Init http service error: {:?}", e))?, - ) - } else { - None - }; + let service = fut + .await + .map_err(|e| log::error!("Init http service error: {:?}", e))?; + let expect = fut_ex + .await + .map_err(|e| log::error!("Init http service error: {:?}", e))?; + let upgrade = if let Some(fut) = fut_upg { + Some( + fut.await + .map_err(|e| log::error!("Init http service error: {:?}", e))?, + ) + } else { + None + }; - let config = Rc::new(DispatcherConfig::new( - cfg, service, expect, upgrade, on_request, - )); + let config = Rc::new(DispatcherConfig::new( + cfg, service, expect, upgrade, on_request, + )); - Ok(H1ServiceHandler { - config, - _t: marker::PhantomData, - }) + Ok(H1ServiceHandler { + config, + _t: marker::PhantomData, }) } } @@ -262,7 +256,6 @@ where { type Response = (); type Error = DispatchError; - type Future<'f> = Dispatcher; fn poll_ready( &self, @@ -324,12 +317,12 @@ where } } - fn call<'a>(&'a self, io: Io, _: ServiceCtx<'a, Self>) -> Self::Future<'_> { + async fn call(&self, io: Io, _: ServiceCtx<'_, Self>) -> Result<(), DispatchError> { log::trace!( "New http1 connection, peer address {:?}", io.query::().get() ); - Dispatcher::new(io, self.config.clone()) + Dispatcher::new(io, self.config.clone()).await } } diff --git a/ntex/src/http/h1/upgrade.rs b/ntex/src/http/h1/upgrade.rs index 72208d56c..a61c72ebf 100644 --- a/ntex/src/http/h1/upgrade.rs +++ b/ntex/src/http/h1/upgrade.rs @@ -1,8 +1,8 @@ use std::{io, marker::PhantomData}; use crate::http::{h1::Codec, request::Request}; +use crate::io::Io; use crate::service::{Service, ServiceCtx, ServiceFactory}; -use crate::{io::Io, util::Ready}; pub struct UpgradeHandler(PhantomData); @@ -12,10 +12,8 @@ impl ServiceFactory<(Request, Io, Codec)> for UpgradeHandler { type Service = UpgradeHandler; type InitError = io::Error; - type Future<'f> = Ready where Self: 'f; - #[inline] - fn create(&self, _: ()) -> Self::Future<'_> { + async fn create(&self, _: ()) -> Result { unimplemented!() } } @@ -23,14 +21,12 @@ impl ServiceFactory<(Request, Io, Codec)> for UpgradeHandler { impl Service<(Request, Io, Codec)> for UpgradeHandler { type Response = (); type Error = io::Error; - type Future<'f> = Ready where F: 'f; - #[inline] - fn call<'a>( - &'a self, + async fn call( + &self, _: (Request, Io, Codec), - _: ServiceCtx<'a, Self>, - ) -> Self::Future<'a> { + _: ServiceCtx<'_, Self>, + ) -> Result { unimplemented!() } } diff --git a/ntex/src/http/h2/payload.rs b/ntex/src/http/h2/payload.rs index a8b753ab7..be2a17344 100644 --- a/ntex/src/http/h2/payload.rs +++ b/ntex/src/http/h2/payload.rs @@ -1,10 +1,11 @@ //! Payload stream +use std::collections::VecDeque; use std::task::{Context, Poll}; -use std::{cell::RefCell, collections::VecDeque, pin::Pin, rc::Rc, rc::Weak}; +use std::{cell::RefCell, future::poll_fn, pin::Pin, rc::Rc, rc::Weak}; use ntex_h2::{self as h2}; -use crate::util::{poll_fn, Bytes, Stream}; +use crate::util::{Bytes, Stream}; use crate::{http::error::PayloadError, task::LocalWaker}; /// Buffered stream of byte chunks diff --git a/ntex/src/http/h2/service.rs b/ntex/src/http/h2/service.rs index fd6f185eb..7e62c57ab 100644 --- a/ntex/src/http/h2/service.rs +++ b/ntex/src/http/h2/service.rs @@ -1,5 +1,5 @@ use std::{cell::RefCell, io, task::Context, task::Poll}; -use std::{marker::PhantomData, mem, rc::Rc}; +use std::{future::poll_fn, marker::PhantomData, mem, rc::Rc}; use ntex_h2::{self as h2, frame::StreamId, server}; @@ -11,7 +11,7 @@ use crate::http::message::{CurrentIo, ResponseHead}; use crate::http::{DateService, Method, Request, Response, StatusCode, Uri, Version}; use crate::io::{types, Filter, Io, IoBoxed, IoRef}; use crate::service::{IntoServiceFactory, Service, ServiceCtx, ServiceFactory}; -use crate::util::{poll_fn, BoxFuture, Bytes, BytesMut, Either, HashMap, Ready}; +use crate::util::{Bytes, BytesMut, HashMap}; use super::payload::{Payload, PayloadSender}; @@ -71,10 +71,9 @@ mod openssl { > { Acceptor::new(acceptor) .timeout(self.cfg.ssl_handshake_timeout) - .chain() .map_err(SslError::Ssl) .map_init_err(|_| panic!()) - .and_then(self.chain().map_err(SslError::Service)) + .and_then(self.map_err(SslError::Service)) } } } @@ -110,10 +109,9 @@ mod rustls { Acceptor::from(config) .timeout(self.cfg.ssl_handshake_timeout) - .chain() .map_err(|e| SslError::Ssl(Box::new(e))) .map_init_err(|_| panic!()) - .and_then(self.chain().map_err(SslError::Service)) + .and_then(self.map_err(SslError::Service)) } } } @@ -130,20 +128,20 @@ where type Error = DispatchError; type InitError = S::InitError; type Service = H2ServiceHandler; - type Future<'f> = BoxFuture<'f, Result>; - fn create(&self, _: ()) -> Self::Future<'_> { - let fut = self.srv.create(()); - let cfg = self.cfg.clone(); - - Box::pin(async move { - let service = fut.await?; - let config = Rc::new(DispatcherConfig::new(cfg, service, (), None, None)); - - Ok(H2ServiceHandler { - config, - _t: PhantomData, - }) + async fn create(&self, _: ()) -> Result { + let service = self.srv.create(()).await?; + let config = Rc::new(DispatcherConfig::new( + self.cfg.clone(), + service, + (), + None, + None, + )); + + Ok(H2ServiceHandler { + config, + _t: PhantomData, }) } } @@ -164,9 +162,7 @@ where { type Response = (); type Error = DispatchError; - type Future<'f> = BoxFuture<'f, Result>; - #[inline] fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { self.config.service.poll_ready(cx).map_err(|e| { log::error!("Service readiness error: {:?}", e); @@ -174,18 +170,21 @@ where }) } - #[inline] fn poll_shutdown(&self, cx: &mut Context<'_>) -> Poll<()> { self.config.service.poll_shutdown(cx) } - fn call<'a>(&'a self, io: Io, _: ServiceCtx<'a, Self>) -> Self::Future<'_> { + async fn call( + &self, + io: Io, + _: ServiceCtx<'_, Self>, + ) -> Result { log::trace!( "New http2 connection, peer address {:?}", io.query::().get() ); - Box::pin(handle(io.into(), self.config.clone())) + handle(io.into(), self.config.clone()).await } } @@ -226,15 +225,14 @@ impl ControlService { impl Service> for ControlService { type Response = h2::ControlResult; type Error = (); - type Future<'f> = Ready; - fn call<'a>( - &'a self, + async fn call( + &self, msg: h2::ControlMessage, - _: ServiceCtx<'a, Self>, - ) -> Self::Future<'a> { + _: ServiceCtx<'_, Self>, + ) -> Result { log::trace!("Control message: {:?}", msg); - Ready::Ok::<_, ()>(msg.ack()) + Ok::<_, ()>(msg.ack()) } } @@ -273,12 +271,12 @@ where { type Response = (); type Error = H2Error; - type Future<'f> = Either< - BoxFuture<'f, Result>, - Ready, - >; - fn call<'a>(&'a self, msg: h2::Message, _: ServiceCtx<'a, Self>) -> Self::Future<'a> { + async fn call( + &self, + msg: h2::Message, + _: ServiceCtx<'_, Self>, + ) -> Result { let h2::Message { stream, kind } = msg; let (io, pseudo, headers, eof, payload) = match kind { h2::MessageKind::Headers { @@ -303,7 +301,7 @@ where } else { log::error!("Payload stream does not exists for {:?}", stream.id()); }; - return Either::Right(Ready::Ok(())); + return Ok(()); } h2::MessageKind::Eof(item) => { log::debug!("Got payload eof for {:?}: {:?}", stream.id(), item); @@ -318,95 +316,93 @@ where h2::StreamEof::Error(err) => sender.set_error(err.into()), } } - return Either::Right(Ready::Ok(())); + return Ok(()); } h2::MessageKind::Disconnect(err) => { log::debug!("Connection is disconnected {:?}", err); if let Some(mut sender) = self.streams.borrow_mut().remove(&stream.id()) { sender.set_error(io::Error::new(io::ErrorKind::Other, err).into()); } - return Either::Right(Ready::Ok(())); + return Ok(()); } }; let cfg = self.config.clone(); - Either::Left(Box::pin(async move { - log::trace!( - "{:?} got request (eof: {}): {:#?}\nheaders: {:#?}", - stream.id(), - eof, - pseudo, - headers - ); - let mut req = if let Some(pl) = payload { - Request::with_payload(crate::http::Payload::H2(pl)) - } else { - Request::new() - }; - - let path = pseudo.path.ok_or(H2Error::MissingPseudo("Path"))?; - let method = pseudo.method.ok_or(H2Error::MissingPseudo("Method"))?; - - let head = req.head_mut(); - head.uri = if let Some(ref authority) = pseudo.authority { - let scheme = pseudo.scheme.ok_or(H2Error::MissingPseudo("Scheme"))?; - Uri::try_from(format!("{}://{}{}", scheme, authority, path))? - } else { - Uri::try_from(path.as_str())? - }; - let is_head_req = method == Method::HEAD; - head.version = Version::HTTP_2; - head.method = method; - head.headers = headers; - head.io = CurrentIo::Ref(io); - - let (mut res, mut body) = match cfg.service.call(req).await { - Ok(res) => res.into().into_parts(), - Err(err) => { - let (res, body) = Response::from(&err).into_parts(); - (res, body.into_body()) - } - }; - - let head = res.head_mut(); - let mut size = body.size(); - prepare_response(&cfg.timer, head, &mut size); - - log::debug!("Received service response: {:?} payload: {:?}", head, size); - - let hdrs = mem::replace(&mut head.headers, HeaderMap::new()); - if size.is_eof() || is_head_req { - stream.send_response(head.status, hdrs, true)?; - } else { - stream.send_response(head.status, hdrs, false)?; - - loop { - match poll_fn(|cx| body.poll_next_chunk(cx)).await { - None => { - log::debug!("{:?} closing payload stream", stream.id()); - stream.send_payload(Bytes::new(), true).await?; - break; - } - Some(Ok(chunk)) => { - log::debug!( - "{:?} sending data chunk {:?} bytes", - stream.id(), - chunk.len() - ); - if !chunk.is_empty() { - stream.send_payload(chunk, false).await?; - } - } - Some(Err(e)) => { - error!("Response payload stream error: {:?}", e); - return Err(e.into()); + log::trace!( + "{:?} got request (eof: {}): {:#?}\nheaders: {:#?}", + stream.id(), + eof, + pseudo, + headers + ); + let mut req = if let Some(pl) = payload { + Request::with_payload(crate::http::Payload::H2(pl)) + } else { + Request::new() + }; + + let path = pseudo.path.ok_or(H2Error::MissingPseudo("Path"))?; + let method = pseudo.method.ok_or(H2Error::MissingPseudo("Method"))?; + + let head = req.head_mut(); + head.uri = if let Some(ref authority) = pseudo.authority { + let scheme = pseudo.scheme.ok_or(H2Error::MissingPseudo("Scheme"))?; + Uri::try_from(format!("{}://{}{}", scheme, authority, path))? + } else { + Uri::try_from(path.as_str())? + }; + let is_head_req = method == Method::HEAD; + head.version = Version::HTTP_2; + head.method = method; + head.headers = headers; + head.io = CurrentIo::Ref(io); + + let (mut res, mut body) = match cfg.service.call(req).await { + Ok(res) => res.into().into_parts(), + Err(err) => { + let (res, body) = Response::from(&err).into_parts(); + (res, body.into_body()) + } + }; + + let head = res.head_mut(); + let mut size = body.size(); + prepare_response(&cfg.timer, head, &mut size); + + log::debug!("Received service response: {:?} payload: {:?}", head, size); + + let hdrs = mem::replace(&mut head.headers, HeaderMap::new()); + if size.is_eof() || is_head_req { + stream.send_response(head.status, hdrs, true)?; + } else { + stream.send_response(head.status, hdrs, false)?; + + loop { + match poll_fn(|cx| body.poll_next_chunk(cx)).await { + None => { + log::debug!("{:?} closing payload stream", stream.id()); + stream.send_payload(Bytes::new(), true).await?; + break; + } + Some(Ok(chunk)) => { + log::debug!( + "{:?} sending data chunk {:?} bytes", + stream.id(), + chunk.len() + ); + if !chunk.is_empty() { + stream.send_payload(chunk, false).await?; } } + Some(Err(e)) => { + error!("Response payload stream error: {:?}", e); + return Err(e.into()); + } } } - Ok(()) - })) + } + Ok(()) } } diff --git a/ntex/src/http/payload.rs b/ntex/src/http/payload.rs index 2fd38adb4..74b082d8b 100644 --- a/ntex/src/http/payload.rs +++ b/ntex/src/http/payload.rs @@ -1,7 +1,7 @@ -use std::{fmt, mem, pin::Pin, task::Context, task::Poll}; +use std::{fmt, future::poll_fn, mem, pin::Pin, task::Context, task::Poll}; use super::{error::PayloadError, h1, h2}; -use crate::util::{poll_fn, Bytes, Stream}; +use crate::util::{Bytes, Stream}; /// Type represent boxed payload pub type PayloadStream = Pin>>>; diff --git a/ntex/src/http/service.rs b/ntex/src/http/service.rs index 44539092e..7b12f858d 100644 --- a/ntex/src/http/service.rs +++ b/ntex/src/http/service.rs @@ -1,9 +1,7 @@ -use std::task::{Context, Poll}; -use std::{cell, error, fmt, future, marker, pin::Pin, rc::Rc}; +use std::{cell, error, fmt, marker, rc::Rc, task::Context, task::Poll}; use crate::io::{types, Filter, Io}; use crate::service::{IntoServiceFactory, Service, ServiceCtx, ServiceFactory}; -use crate::util::BoxFuture; use super::body::MessageBody; use super::builder::HttpServiceBuilder; @@ -175,10 +173,9 @@ mod openssl { > { Acceptor::new(acceptor) .timeout(self.cfg.ssl_handshake_timeout) - .chain() .map_err(SslError::Ssl) .map_init_err(|_| panic!()) - .and_then(self.chain().map_err(SslError::Service)) + .and_then(self.map_err(SslError::Service)) } } } @@ -222,10 +219,9 @@ mod rustls { Acceptor::from(config) .timeout(self.cfg.ssl_handshake_timeout) - .chain() .map_err(|e| SslError::Ssl(Box::new(e))) .map_init_err(|_| panic!()) - .and_then(self.chain().map_err(SslError::Service)) + .and_then(self.map_err(SslError::Service)) } } } @@ -249,39 +245,36 @@ where type Error = DispatchError; type InitError = (); type Service = HttpServiceHandler; - type Future<'f> = BoxFuture<'f, Result>; - fn create(&self, _: ()) -> Self::Future<'_> { + async fn create(&self, _: ()) -> Result { let fut = self.srv.create(()); let fut_ex = self.expect.create(()); let fut_upg = self.upgrade.as_ref().map(|f| f.create(())); let on_request = self.on_request.borrow_mut().take(); let cfg = self.cfg.clone(); - Box::pin(async move { - let service = fut - .await - .map_err(|e| log::error!("Init http service error: {:?}", e))?; - - let expect = fut_ex - .await - .map_err(|e| log::error!("Init http service error: {:?}", e))?; - - let upgrade = if let Some(fut) = fut_upg { - Some( - fut.await - .map_err(|e| log::error!("Init http service error: {:?}", e))?, - ) - } else { - None - }; - - let config = DispatcherConfig::new(cfg, service, expect, upgrade, on_request); - - Ok(HttpServiceHandler { - config: Rc::new(config), - _t: marker::PhantomData, - }) + let service = fut + .await + .map_err(|e| log::error!("Init http service error: {:?}", e))?; + + let expect = fut_ex + .await + .map_err(|e| log::error!("Init http service error: {:?}", e))?; + + let upgrade = if let Some(fut) = fut_upg { + Some( + fut.await + .map_err(|e| log::error!("Init http service error: {:?}", e))?, + ) + } else { + None + }; + + let config = DispatcherConfig::new(cfg, service, expect, upgrade, on_request); + + Ok(HttpServiceHandler { + config: Rc::new(config), + _t: marker::PhantomData, }) } } @@ -306,7 +299,6 @@ where { type Response = (); type Error = DispatchError; - type Future<'f> = HttpServiceHandlerResponse; fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { let cfg = self.config.as_ref(); @@ -365,96 +357,20 @@ where } } - fn call<'a>(&'a self, io: Io, _: ServiceCtx<'a, Self>) -> Self::Future<'a> { + async fn call( + &self, + io: Io, + _: ServiceCtx<'_, Self>, + ) -> Result { log::trace!( "New http connection, peer address {:?}", io.query::().get() ); if io.query::().get() == Some(types::HttpProtocol::Http2) { - HttpServiceHandlerResponse { - state: ResponseState::H2 { - fut: Box::pin(h2::handle(io.into(), self.config.clone())), - }, - } + h2::handle(io.into(), self.config.clone()).await } else { - HttpServiceHandlerResponse { - state: ResponseState::H1 { - fut: h1::Dispatcher::new(io, self.config.clone()), - }, - } - } - } -} - -pin_project_lite::pin_project! { - pub struct HttpServiceHandlerResponse - where - F: Filter, - S: Service, - S: 'static, - S::Error: ResponseError, - S::Response: Into>, - B: MessageBody, - X: Service, - X: 'static, - X::Error: ResponseError, - X::Error: 'static, - U: Service<(Request, Io, h1::Codec), Response = ()>, - U: 'static, - U::Error: fmt::Display, - U::Error: error::Error, - U: 'static, - { - #[pin] - state: ResponseState, - } -} - -pin_project_lite::pin_project! { - #[project = StateProject] - enum ResponseState - where - F: Filter, - S: Service, - S: 'static, - S::Error: ResponseError, - B: MessageBody, - X: Service, - X: 'static, - X::Error: ResponseError, - X::Error: 'static, - U: Service<(Request, Io, h1::Codec), Response = ()>, - U: 'static, - U::Error: fmt::Display, - U::Error: error::Error, - U: 'static, - { - H1 { #[pin] fut: h1::Dispatcher }, - H2 { fut: BoxFuture<'static, Result<(), DispatchError>> }, - } -} - -impl future::Future for HttpServiceHandlerResponse -where - F: Filter, - S: Service + 'static, - S::Error: ResponseError, - S::Response: Into>, - B: MessageBody, - X: Service + 'static, - X::Error: ResponseError, - U: Service<(Request, Io, h1::Codec), Response = ()> + 'static, - U::Error: fmt::Display + error::Error, -{ - type Output = Result<(), DispatchError>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.as_mut().project(); - - match this.state.project() { - StateProject::H1 { fut } => fut.poll(cx), - StateProject::H2 { ref mut fut } => Pin::new(fut).poll(cx), + h1::Dispatcher::new(io, self.config.clone()).await } } } diff --git a/ntex/src/lib.rs b/ntex/src/lib.rs index a6aa6ced5..ab77abde4 100644 --- a/ntex/src/lib.rs +++ b/ntex/src/lib.rs @@ -39,7 +39,7 @@ pub mod ws; pub use self::service::{ chain, chain_factory, fn_service, into_service, IntoService, IntoServiceFactory, - Middleware, Pipeline, Service, ServiceCall, ServiceCtx, ServiceFactory, + Middleware, Pipeline, Service, ServiceCtx, ServiceFactory, }; pub use ntex_util::{channel, task}; diff --git a/ntex/src/server/builder.rs b/ntex/src/server/builder.rs index cb23ee117..f93e310a0 100644 --- a/ntex/src/server/builder.rs +++ b/ntex/src/server/builder.rs @@ -323,7 +323,7 @@ impl ServerBuilder { pub fn set_tag>(mut self, name: N, tag: &'static str) -> Self { let mut token = None; for sock in &self.sockets { - if &sock.1 == name.as_ref() { + if sock.1 == name.as_ref() { token = Some(sock.0); break; } diff --git a/ntex/src/server/config.rs b/ntex/src/server/config.rs index fb3dead48..c68f4fb60 100644 --- a/ntex/src/server/config.rs +++ b/ntex/src/server/config.rs @@ -397,20 +397,17 @@ where type Error = (); type InitError = (); type Service = BoxedServerService; - type Future<'f> = BoxFuture<'f, Result> where Self: 'f; - fn create(&self, _: ()) -> Self::Future<'_> { + async fn create(&self, _: ()) -> Result { let tag = self.tag; let pool = self.pool; - let fut = self.inner.create(()); - Box::pin(async move { - match fut.await { - Ok(s) => Ok(boxed::service(StreamService::new(s, tag, pool))), - Err(e) => { - error!("Cannot construct service: {:?}", e); - Err(()) - } + + match self.inner.create(()).await { + Ok(s) => Ok(boxed::service(StreamService::new(s, tag, pool))), + Err(e) => { + error!("Cannot construct service: {:?}", e); + Err(()) } - }) + } } } diff --git a/ntex/src/server/service.rs b/ntex/src/server/service.rs index 248dffdab..882b49932 100644 --- a/ntex/src/server/service.rs +++ b/ntex/src/server/service.rs @@ -62,7 +62,6 @@ where { type Response = (); type Error = (); - type Future<'f> = BoxFuture<'f, Result<(), ()>> where T: 'f; crate::forward_poll_shutdown!(service); @@ -77,32 +76,30 @@ where } } - fn call<'a>( - &'a self, + async fn call( + &self, (guard, req): (Option, ServerMessage), - ctx: ServiceCtx<'a, Self>, - ) -> Self::Future<'a> { - Box::pin(async move { - match req { - ServerMessage::Connect(stream) => { - let stream = stream.try_into().map_err(|e| { - error!("Cannot convert to an async io stream: {}", e); - }); - - if let Ok(stream) = stream { - let stream: Io<_> = stream; - stream.set_tag(self.tag); - stream.set_memory_pool(self.pool_ref); - let _ = ctx.call(self.service.as_ref(), stream).await; - drop(guard); - Ok(()) - } else { - Err(()) - } + ctx: ServiceCtx<'_, Self>, + ) -> Result<(), ()> { + match req { + ServerMessage::Connect(stream) => { + let stream = stream.try_into().map_err(|e| { + error!("Cannot convert to an async io stream: {}", e); + }); + + if let Ok(stream) = stream { + let stream: Io<_> = stream; + stream.set_tag(self.tag); + stream.set_memory_pool(self.pool_ref); + let _ = ctx.call(self.service.as_ref(), stream).await; + drop(guard); + Ok(()) + } else { + Err(()) } - _ => Ok(()), } - }) + _ => Ok(()), + } } } diff --git a/ntex/src/server/worker.rs b/ntex/src/server/worker.rs index 8a56407ec..64476f08c 100644 --- a/ntex/src/server/worker.rs +++ b/ntex/src/server/worker.rs @@ -515,7 +515,7 @@ mod tests { use crate::io::Io; use crate::server::service::Factory; use crate::service::{Service, ServiceCtx, ServiceFactory}; - use crate::util::{lazy, Ready}; + use crate::util::lazy; #[derive(Clone, Copy, Debug)] enum St { @@ -535,12 +535,11 @@ mod tests { type Error = (); type Service = Srv; type InitError = (); - type Future<'f> = Ready; - fn create(&self, _: ()) -> Self::Future<'_> { + async fn create(&self, _: ()) -> Result { let mut cnt = self.counter.lock().unwrap(); *cnt += 1; - Ready::Ok(Srv { + Ok(Srv { st: self.st.clone(), }) } @@ -553,7 +552,6 @@ mod tests { impl Service for Srv { type Response = (); type Error = (); - type Future<'f> = Ready<(), ()>; fn poll_ready(&self, _: &mut Context<'_>) -> Poll> { let st: St = { *self.st.lock().unwrap() }; @@ -574,8 +572,8 @@ mod tests { } } - fn call<'a>(&'a self, _: Io, _: ServiceCtx<'a, Self>) -> Self::Future<'a> { - Ready::Ok(()) + async fn call(&self, _: Io, _: ServiceCtx<'_, Self>) -> Result<(), ()> { + Ok(()) } } diff --git a/ntex/src/web/app.rs b/ntex/src/web/app.rs index 580a7e102..49f11d6e2 100644 --- a/ntex/src/web/app.rs +++ b/ntex/src/web/app.rs @@ -7,7 +7,7 @@ use crate::service::{ chain_factory, dev::ServiceChainFactory, map_config, IntoServiceFactory, }; use crate::service::{Identity, Middleware, Service, ServiceCtx, ServiceFactory, Stack}; -use crate::util::{BoxFuture, Extensions, Ready}; +use crate::util::{BoxFuture, Extensions}; use super::app_service::{AppFactory, AppService}; use super::config::{AppConfig, ServiceConfig}; @@ -269,9 +269,9 @@ where U::InitError: fmt::Debug, { // create and configure default resource - self.default = Some(Rc::new(boxed::factory(f.chain().map_init_err(|e| { - log::error!("Cannot construct default service: {:?}", e) - })))); + self.default = Some(Rc::new(boxed::factory(chain_factory(f).map_init_err( + |e| log::error!("Cannot construct default service: {:?}", e), + )))); self } @@ -569,26 +569,22 @@ impl ServiceFactory> for Filter { type Error = Err::Container; type InitError = (); type Service = Filter; - type Future<'f> = Ready, ()>; - #[inline] - fn create(&self, _: ()) -> Self::Future<'_> { - Ready::Ok(Filter(PhantomData)) + async fn create(&self, _: ()) -> Result { + Ok(Filter(PhantomData)) } } impl Service> for Filter { type Response = WebRequest; type Error = Err::Container; - type Future<'f> = Ready, Err::Container>; - #[inline] - fn call<'a>( - &'a self, + async fn call( + &self, req: WebRequest, - _: ServiceCtx<'a, Self>, - ) -> Self::Future<'a> { - Ready::Ok(req) + _: ServiceCtx<'_, Self>, + ) -> Result, Err::Container> { + Ok(req) } } diff --git a/ntex/src/web/app_service.rs b/ntex/src/web/app_service.rs index 2b65b637c..c94a010a8 100644 --- a/ntex/src/web/app_service.rs +++ b/ntex/src/web/app_service.rs @@ -1,14 +1,11 @@ -use std::task::{Context, Poll}; -use std::{cell::RefCell, future::Future, marker::PhantomData, pin::Pin, rc::Rc}; +use std::{cell::RefCell, marker::PhantomData, rc::Rc, task::Context, task::Poll}; use crate::http::{Request, Response}; use crate::router::{Path, ResourceDef, Router}; use crate::service::boxed::{self, BoxService, BoxServiceFactory}; use crate::service::dev::ServiceChainFactory; -use crate::service::{ - fn_service, Middleware, Service, ServiceCall, ServiceCtx, ServiceFactory, -}; -use crate::util::{BoxFuture, Either, Extensions}; +use crate::service::{fn_service, Middleware, Service, ServiceCtx, ServiceFactory}; +use crate::util::{BoxFuture, Extensions}; use super::config::AppConfig; use super::error::ErrorRenderer; @@ -24,8 +21,6 @@ type HttpService = BoxService, WebResponse, Err::Container>; type HttpNewService = BoxServiceFactory<(), WebRequest, WebResponse, Err::Container, ()>; -type BoxResponse<'a, Err: ErrorRenderer> = - ServiceCall<'a, HttpService, WebRequest>; type FnStateFactory = Box BoxFuture<'static, Result>>; /// Service factory to convert `Request` to a `WebRequest`. @@ -66,10 +61,9 @@ where type Error = Err::Container; type InitError = (); type Service = AppFactoryService; - type Future<'f> = BoxFuture<'f, Result> where Self: 'f; - fn create(&self, _: ()) -> Self::Future<'_> { - ServiceFactory::create(self, AppConfig::default()) + async fn create(&self, _: ()) -> Result { + ServiceFactory::create(self, AppConfig::default()).await } } @@ -89,9 +83,8 @@ where type Error = Err::Container; type InitError = (); type Service = AppFactoryService; - type Future<'f> = BoxFuture<'f, Result> where Self: 'f; - fn create(&self, config: AppConfig) -> Self::Future<'_> { + async fn create(&self, config: AppConfig) -> Result { let services = std::mem::take(&mut *self.services.borrow_mut()); // update resource default service @@ -114,65 +107,63 @@ where router.case_insensitive(); } - Box::pin(async move { - // app state factories - for fut in state_factories.iter() { - extensions = fut(extensions).await?; - } - let state = AppState::new(extensions, None, config.clone()); - - // App config - let mut config = WebServiceConfig::new(state.clone(), default.clone()); + // app state factories + for fut in state_factories.iter() { + extensions = fut(extensions).await?; + } + let state = AppState::new(extensions, None, config.clone()); - // register services - services - .into_iter() - .for_each(|mut srv| srv.register(&mut config)); - let services = config.into_services(); + // App config + let mut config = WebServiceConfig::new(state.clone(), default.clone()); - // resource map - let mut rmap = ResourceMap::new(ResourceDef::new("")); - for mut rdef in external { - rmap.add(&mut rdef, None); - } + // register services + services + .into_iter() + .for_each(|mut srv| srv.register(&mut config)); + let services = config.into_services(); - // complete pipeline creation - let services: Vec<_> = services - .into_iter() - .map(|(mut rdef, srv, guards, nested)| { - rmap.add(&mut rdef, nested); - (rdef, srv, RefCell::new(guards)) - }) - .collect(); - - // complete ResourceMap tree creation - let rmap = Rc::new(rmap); - rmap.finish(rmap.clone()); - - // create http services - for (path, factory, guards) in &mut services.iter() { - let service = factory.create(()).await?; - router.rdef(path.clone(), service).2 = guards.borrow_mut().take(); - } + // resource map + let mut rmap = ResourceMap::new(ResourceDef::new("")); + for mut rdef in external { + rmap.add(&mut rdef, None); + } - let routing = AppRouting { - router: router.finish(), - default: Some(default.create(()).await?), - }; - - // main service - let service = AppService { - routing, - filter: filter_fut.await?, - }; - - Ok(AppFactoryService { - rmap, - state, - service: middleware.create(service), - pool: HttpRequestPool::create(), - _t: PhantomData, + // complete pipeline creation + let services: Vec<_> = services + .into_iter() + .map(|(mut rdef, srv, guards, nested)| { + rmap.add(&mut rdef, nested); + (rdef, srv, RefCell::new(guards)) }) + .collect(); + + // complete ResourceMap tree creation + let rmap = Rc::new(rmap); + rmap.finish(rmap.clone()); + + // create http services + for (path, factory, guards) in &mut services.iter() { + let service = factory.create(()).await?; + router.rdef(path.clone(), service).2 = guards.borrow_mut().take(); + } + + let routing = AppRouting { + router: router.finish(), + default: Some(default.create(()).await?), + }; + + // main service + let service = AppService { + routing, + filter: filter_fut.await?, + }; + + Ok(AppFactoryService { + rmap, + state, + service: middleware.create(service), + pool: HttpRequestPool::create(), + _t: PhantomData, }) } } @@ -197,12 +188,15 @@ where { type Response = WebResponse; type Error = T::Error; - type Future<'f> = ServiceCall<'f, T, WebRequest> where T: 'f; crate::forward_poll_ready!(service); crate::forward_poll_shutdown!(service); - fn call<'a>(&'a self, req: Request, ctx: ServiceCtx<'a, Self>) -> Self::Future<'a> { + async fn call( + &self, + req: Request, + ctx: ServiceCtx<'_, Self>, + ) -> Result { let (head, payload) = req.into_parts(); let req = if let Some(mut req) = self.pool.get_request() { @@ -222,7 +216,7 @@ where self.pool, ) }; - ctx.call(&self.service, WebRequest::new(req)) + ctx.call(&self.service, WebRequest::new(req)).await } } @@ -244,14 +238,12 @@ struct AppRouting { impl Service> for AppRouting { type Response = WebResponse; type Error = Err::Container; - type Future<'f> = - Either, BoxFuture<'f, Result>>; - fn call<'a>( - &'a self, + async fn call( + &self, mut req: WebRequest, - ctx: ServiceCtx<'a, Self>, - ) -> Self::Future<'a> { + ctx: ServiceCtx<'_, Self>, + ) -> Result { let res = self.router.recognize_checked(&mut req, |req, guards| { if let Some(guards) = guards { for f in guards { @@ -264,14 +256,12 @@ impl Service> for AppRouting { }); if let Some((srv, _info)) = res { - Either::Left(ctx.call(srv, req)) + ctx.call(srv, req).await } else if let Some(ref default) = self.default { - Either::Left(ctx.call(default, req)) + ctx.call(default, req).await } else { let req = req.into_parts().0; - Either::Right(Box::pin(async { - Ok(WebResponse::new(Response::NotFound().finish(), req)) - })) + Ok(WebResponse::new(Response::NotFound().finish(), req)) } } } @@ -289,7 +279,6 @@ where { type Response = WebResponse; type Error = Err::Container; - type Future<'f> = AppServiceResponse<'f, F, Err> where F: 'f; #[inline] fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { @@ -302,58 +291,13 @@ where } } - fn call<'a>( - &'a self, + async fn call( + &self, req: WebRequest, - ctx: ServiceCtx<'a, Self>, - ) -> Self::Future<'a> { - AppServiceResponse { - filter: ctx.call(&self.filter, req), - routing: &self.routing, - endpoint: None, - ctx, - } - } -} - -type BoxAppServiceResponse<'a, Err: ErrorRenderer> = - ServiceCall<'a, AppRouting, WebRequest>; - -pin_project_lite::pin_project! { - pub struct AppServiceResponse<'f, F: Service>, Err: ErrorRenderer> - where F: 'f - { - #[pin] - filter: ServiceCall<'f, F, WebRequest>, - routing: &'f AppRouting, - endpoint: Option>, - ctx: ServiceCtx<'f, AppService>, - } -} - -impl<'f, F, Err> Future for AppServiceResponse<'f, F, Err> -where - F: Service, Response = WebRequest, Error = Err::Container>, - Err: ErrorRenderer, -{ - type Output = Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut this = self.as_mut().project(); - - loop { - if let Some(fut) = this.endpoint.as_mut() { - return Pin::new(fut).poll(cx); - } else { - let res = if let Poll::Ready(res) = this.filter.poll(cx) { - res? - } else { - return Poll::Pending; - }; - *this.endpoint = Some(this.ctx.call(this.routing, res)); - this = self.as_mut().project(); - } - } + ctx: ServiceCtx<'_, Self>, + ) -> Result { + let req = ctx.call(&self.filter, req).await?; + ctx.call(&self.routing, req).await } } diff --git a/ntex/src/web/error.rs b/ntex/src/web/error.rs index d1a6dd523..1080a44d6 100644 --- a/ntex/src/web/error.rs +++ b/ntex/src/web/error.rs @@ -84,10 +84,6 @@ pub enum StateExtractorError { NotConfigured, } -#[deprecated] -#[doc(hidden)] -pub type DataExtractorError = StateExtractorError; - /// Errors which can occur when attempting to generate resource uri. #[derive(Error, Debug, Copy, Clone, PartialEq, Eq)] pub enum UrlGenerationError { diff --git a/ntex/src/web/middleware/compress.rs b/ntex/src/web/middleware/compress.rs index d8dc137cf..0072021f5 100644 --- a/ntex/src/web/middleware/compress.rs +++ b/ntex/src/web/middleware/compress.rs @@ -1,10 +1,9 @@ //! `Middleware` for compressing response body. -use std::task::{Context, Poll}; -use std::{cmp, future::Future, marker, pin::Pin, str::FromStr}; +use std::{cmp, str::FromStr}; use crate::http::encoding::Encoder; use crate::http::header::{ContentEncoding, ACCEPT_ENCODING}; -use crate::service::{Middleware, Service, ServiceCall, ServiceCtx}; +use crate::service::{Middleware, Service, ServiceCtx}; use crate::web::{BodyEncoding, ErrorRenderer, WebRequest, WebResponse}; #[derive(Debug, Clone)] @@ -67,16 +66,15 @@ where { type Response = WebResponse; type Error = S::Error; - type Future<'f> = CompressResponse<'f, S, E> where S: 'f; crate::forward_poll_ready!(service); crate::forward_poll_shutdown!(service); - fn call<'a>( - &'a self, + async fn call( + &self, req: WebRequest, - ctx: ServiceCtx<'a, Self>, - ) -> Self::Future<'a> { + ctx: ServiceCtx<'_, Self>, + ) -> Result { // negotiate content-encoding let encoding = if let Some(val) = req.headers().get(&ACCEPT_ENCODING) { if let Ok(enc) = val.to_str() { @@ -88,50 +86,15 @@ where ContentEncoding::Identity }; - CompressResponse { - encoding, - fut: ctx.call(&self.service, req), - _t: marker::PhantomData, - } - } -} + let resp = ctx.call(&self.service, req).await?; -pin_project_lite::pin_project! { - #[doc(hidden)] - pub struct CompressResponse<'f, S: Service>, E> - where S: 'f, E: 'f - { - #[pin] - fut: ServiceCall<'f, S, WebRequest>, - encoding: ContentEncoding, - _t: marker::PhantomData, - } -} + let enc = if let Some(enc) = resp.response().get_encoding() { + enc + } else { + encoding + }; -impl<'f, S, E> Future for CompressResponse<'f, S, E> -where - S: Service, Response = WebResponse>, - E: ErrorRenderer, -{ - type Output = Result; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - - match this.fut.poll(cx)? { - Poll::Ready(resp) => { - let enc = if let Some(enc) = resp.response().get_encoding() { - enc - } else { - *this.encoding - }; - - Poll::Ready(Ok( - resp.map_body(move |head, body| Encoder::response(enc, head, body)) - )) - } - Poll::Pending => Poll::Pending, - } + Ok(resp.map_body(move |head, body| Encoder::response(enc, head, body))) } } diff --git a/ntex/src/web/middleware/defaultheaders.rs b/ntex/src/web/middleware/defaultheaders.rs index f8d054546..eebe0ec77 100644 --- a/ntex/src/web/middleware/defaultheaders.rs +++ b/ntex/src/web/middleware/defaultheaders.rs @@ -4,7 +4,6 @@ use std::rc::Rc; use crate::http::error::HttpError; use crate::http::header::{HeaderMap, HeaderName, HeaderValue, CONTENT_TYPE}; use crate::service::{Middleware, Service, ServiceCtx}; -use crate::util::BoxFuture; use crate::web::{WebRequest, WebResponse}; /// `Middleware` for setting default response headers. @@ -111,35 +110,31 @@ where { type Response = WebResponse; type Error = S::Error; - type Future<'f> = - BoxFuture<'f, Result> where S: 'f, E: 'f; crate::forward_poll_ready!(service); crate::forward_poll_shutdown!(service); - fn call<'a>( - &'a self, + async fn call( + &self, req: WebRequest, - ctx: ServiceCtx<'a, Self>, - ) -> Self::Future<'a> { - Box::pin(async move { - let mut res = ctx.call(&self.service, req).await?; - - // set response headers - for (key, value) in self.inner.headers.iter() { - if !res.headers().contains_key(key) { - res.headers_mut().insert(key.clone(), value.clone()); - } - } - // default content-type - if self.inner.ct && !res.headers().contains_key(&CONTENT_TYPE) { - res.headers_mut().insert( - CONTENT_TYPE, - HeaderValue::from_static("application/octet-stream"), - ); + ctx: ServiceCtx<'_, Self>, + ) -> Result { + let mut res = ctx.call(&self.service, req).await?; + + // set response headers + for (key, value) in self.inner.headers.iter() { + if !res.headers().contains_key(key) { + res.headers_mut().insert(key.clone(), value.clone()); } - Ok(res) - }) + } + // default content-type + if self.inner.ct && !res.headers().contains_key(&CONTENT_TYPE) { + res.headers_mut().insert( + CONTENT_TYPE, + HeaderValue::from_static("application/octet-stream"), + ); + } + Ok(res) } } diff --git a/ntex/src/web/middleware/logger.rs b/ntex/src/web/middleware/logger.rs index 51a45a62b..3880f6e0c 100644 --- a/ntex/src/web/middleware/logger.rs +++ b/ntex/src/web/middleware/logger.rs @@ -1,14 +1,13 @@ //! Request logging middleware -use std::task::{ready, Context, Poll}; -use std::{env, error::Error, future::Future}; -use std::{fmt, fmt::Display, marker::PhantomData, pin::Pin, rc::Rc, time}; +use std::task::{Context, Poll}; +use std::{env, error::Error, fmt, fmt::Display, rc::Rc, time}; use regex::Regex; use crate::http::body::{Body, BodySize, MessageBody, ResponseBody}; use crate::http::header::HeaderName; -use crate::service::{Middleware, Service, ServiceCall, ServiceCtx}; -use crate::util::{Bytes, Either, HashSet}; +use crate::service::{Middleware, Service, ServiceCtx}; +use crate::util::{Bytes, HashSet}; use crate::web::{HttpResponse, WebRequest, WebResponse}; /// `Middleware` for logging request and response info to the terminal. @@ -139,19 +138,17 @@ where { type Response = WebResponse; type Error = S::Error; - type Future<'f> = Either, ServiceCall<'f, S, WebRequest>> where S: 'f, E: 'f; crate::forward_poll_ready!(service); crate::forward_poll_shutdown!(service); - #[inline] - fn call<'a>( - &'a self, + async fn call( + &self, req: WebRequest, - ctx: ServiceCtx<'a, Self>, - ) -> Self::Future<'a> { + ctx: ServiceCtx<'_, Self>, + ) -> Result { if self.inner.exclude.contains(req.path()) { - Either::Right(ctx.call(&self.service, req)) + ctx.call(&self.service, req).await } else { let time = time::SystemTime::now(); let mut format = self.inner.format.clone(); @@ -159,56 +156,21 @@ where for unit in &mut format.0 { unit.render_request(time, &req); } - Either::Left(LoggerResponse { - time, - format: Some(format), - fut: ctx.call(&self.service, req), - _t: PhantomData, - }) - } - } -} -pin_project_lite::pin_project! { - #[doc(hidden)] - pub struct LoggerResponse<'f, S: Service>, E> - where S: 'f, E: 'f - { - #[pin] - fut: ServiceCall<'f, S, WebRequest>, - time: time::SystemTime, - format: Option, - _t: PhantomData - } -} - -impl<'f, S, E> Future for LoggerResponse<'f, S, E> -where - S: Service, Response = WebResponse>, -{ - type Output = Result; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - - let res = ready!(this.fut.poll(cx)?); - if let Some(ref mut format) = this.format { + let res = ctx.call(&self.service, req).await?; for unit in &mut format.0 { unit.render_response(res.response()); } - } - - let time = *this.time; - let format = this.format.take(); - Poll::Ready(Ok(res.map_body(move |_, body| { - ResponseBody::Other(Body::from_message(StreamLog { - body, - time, - format, - size: 0, + Ok(res.map_body(move |_, body| { + ResponseBody::Other(Body::from_message(StreamLog { + body, + time, + format: Some(format), + size: 0, + })) })) - }))) + } } } diff --git a/ntex/src/web/resource.rs b/ntex/src/web/resource.rs index d2131b7ab..51b295fa9 100644 --- a/ntex/src/web/resource.rs +++ b/ntex/src/web/resource.rs @@ -4,11 +4,11 @@ use crate::http::Response; use crate::router::{IntoPattern, ResourceDef}; use crate::service::boxed::{self, BoxService, BoxServiceFactory}; use crate::service::dev::{AndThen, ServiceChain, ServiceChainFactory}; -use crate::service::{chain_factory, ServiceCtx}; +use crate::service::{chain, chain_factory, ServiceCtx}; use crate::service::{ - Identity, IntoServiceFactory, Middleware, Service, ServiceCall, ServiceFactory, Stack, + Identity, IntoServiceFactory, Middleware, Service, ServiceFactory, Stack, }; -use crate::util::{BoxFuture, Either, Extensions, Ready}; +use crate::util::Extensions; use super::dev::{insert_slash, WebServiceConfig, WebServiceFactory}; use super::extract::FromRequest; @@ -24,8 +24,6 @@ type HttpNewService = BoxServiceFactory<(), WebRequest, WebResponse, Err::Container, ()>; type ResourcePipeline = ServiceChain>, WebRequest>; -type BoxResponse<'a, Err: ErrorRenderer> = - ServiceCall<'a, HttpService, WebRequest>; /// *Resource* is an entry in resources table which corresponds to requested URL. /// @@ -302,7 +300,7 @@ where { // create and configure default resource self.default = Rc::new(RefCell::new(Some(Rc::new(boxed::factory( - f.chain() + chain_factory(f.into_factory()) .map_init_err(|e| log::error!("Cannot construct default service: {:?}", e)), ))))); @@ -420,14 +418,11 @@ where type Error = Err::Container; type Service = M::Service; type InitError = (); - type Future<'f> = BoxFuture<'f, Result>; - fn create(&self, _: ()) -> Self::Future<'_> { - Box::pin(async move { - let filter = self.filter.create(()).await?; - let routing = self.routing.create(()).await?; - Ok(self.middleware.create(filter.chain().and_then(routing))) - }) + async fn create(&self, _: ()) -> Result { + let filter = self.filter.create(()).await?; + let routing = self.routing.create(()).await?; + Ok(self.middleware.create(chain(filter).and_then(routing))) } } @@ -442,27 +437,21 @@ impl ServiceFactory> for ResourceRouterFacto type Error = Err::Container; type InitError = (); type Service = ResourceRouter; - type Future<'f> = BoxFuture<'f, Result>; - - fn create(&self, _: ()) -> Self::Future<'_> { - Box::pin(async move { - let default = if let Some(ref default) = self.default { - Some(default.create(()).await?) - } else { - None - }; - Ok(ResourceRouter { - default, - state: self.state.clone(), - routes: self.routes.iter().map(|route| route.service()).collect(), - }) + + async fn create(&self, _: ()) -> Result { + let default = if let Some(ref default) = self.default { + Some(default.create(()).await?) + } else { + None + }; + Ok(ResourceRouter { + default, + state: self.state.clone(), + routes: self.routes.iter().map(|route| route.service()).collect(), }) } } -type BoxResourceRouterResponse<'a, Err: ErrorRenderer> = - ServiceCall<'a, RouteService, WebRequest>; - pub struct ResourceRouter { state: Option, routes: Vec>, @@ -472,31 +461,27 @@ pub struct ResourceRouter { impl Service> for ResourceRouter { type Response = WebResponse; type Error = Err::Container; - type Future<'f> = Either< - BoxResourceRouterResponse<'f, Err>, - Either, BoxResponse<'f, Err>>, - >; - fn call<'a>( - &'a self, + async fn call( + &self, mut req: WebRequest, - ctx: ServiceCtx<'a, Self>, - ) -> Self::Future<'a> { + ctx: ServiceCtx<'_, Self>, + ) -> Result { for route in self.routes.iter() { if route.check(&mut req) { if let Some(ref state) = self.state { req.set_state_container(state.clone()); } - return Either::Left(ctx.call(route, req)); + return ctx.call(route, req).await; } } if let Some(ref default) = self.default { - Either::Right(Either::Right(ctx.call(default, req))) + ctx.call(default, req).await } else { - Either::Right(Either::Left(Ready::Ok(WebResponse::new( + Ok(WebResponse::new( Response::MethodNotAllowed().finish(), req.into_parts().0, - )))) + )) } } } diff --git a/ntex/src/web/route.rs b/ntex/src/web/route.rs index a9248b175..b44aba7fa 100644 --- a/ntex/src/web/route.rs +++ b/ntex/src/web/route.rs @@ -1,6 +1,5 @@ use std::{fmt, mem, rc::Rc}; -use crate::util::{BoxFuture, Ready}; use crate::{http::Method, service::Service, service::ServiceCtx, service::ServiceFactory}; use super::error::ErrorRenderer; @@ -66,10 +65,9 @@ impl ServiceFactory> for Route { type Error = Err::Container; type InitError = (); type Service = RouteService; - type Future<'f> = Ready, ()>; - fn create(&self, _: ()) -> Self::Future<'_> { - Ok(self.service()).into() + async fn create(&self, _: ()) -> Result, ()> { + Ok(self.service()) } } @@ -102,15 +100,13 @@ impl fmt::Debug for RouteService { impl Service> for RouteService { type Response = WebResponse; type Error = Err::Container; - type Future<'f> = BoxFuture<'f, Result>; - #[inline] - fn call<'a>( - &'a self, + async fn call( + &self, req: WebRequest, - _: ServiceCtx<'a, Self>, - ) -> Self::Future<'a> { - self.handler.call(req) + _: ServiceCtx<'_, Self>, + ) -> Result { + self.handler.call(req).await } } diff --git a/ntex/src/web/scope.rs b/ntex/src/web/scope.rs index c16b729e8..6ae1a7e02 100644 --- a/ntex/src/web/scope.rs +++ b/ntex/src/web/scope.rs @@ -1,15 +1,11 @@ -use std::{ - cell::RefCell, fmt, future::Future, pin::Pin, rc::Rc, task::Context, task::Poll, -}; +use std::{cell::RefCell, fmt, rc::Rc, task::Context, task::Poll}; use crate::http::Response; use crate::router::{IntoPattern, ResourceDef, Router}; use crate::service::boxed::{self, BoxService, BoxServiceFactory}; use crate::service::{chain_factory, dev::ServiceChainFactory, IntoServiceFactory}; -use crate::service::{ - Identity, Middleware, Service, ServiceCall, ServiceCtx, ServiceFactory, Stack, -}; -use crate::util::{BoxFuture, Either, Extensions, Ready}; +use crate::service::{Identity, Middleware, Service, ServiceCtx, ServiceFactory, Stack}; +use crate::util::Extensions; use super::app::Filter; use super::config::ServiceConfig; @@ -28,8 +24,6 @@ type HttpService = BoxService, WebResponse, Err::Container>; type HttpNewService = BoxServiceFactory<(), WebRequest, WebResponse, Err::Container, ()>; -type BoxResponse<'a, Err: ErrorRenderer> = - ServiceCall<'a, HttpService, WebRequest>; /// Resources scope. /// @@ -288,7 +282,7 @@ where { // create and configure default resource self.default = Rc::new(RefCell::new(Some(Rc::new(boxed::factory( - f.chain() + chain_factory(f.into_factory()) .map_init_err(|e| log::error!("Cannot construct default service: {:?}", e)), ))))); @@ -468,15 +462,12 @@ where type Error = Err::Container; type Service = M::Service; type InitError = (); - type Future<'f> = BoxFuture<'f, Result>; - - fn create(&self, _: ()) -> Self::Future<'_> { - Box::pin(async move { - Ok(self.middleware.create(ScopeService { - filter: self.filter.create(()).await?, - routing: self.routing.create(()).await?, - })) - }) + + async fn create(&self, _: ()) -> Result { + Ok(self.middleware.create(ScopeService { + filter: self.filter.create(()).await?, + routing: self.routing.create(()).await?, + })) } } @@ -492,7 +483,6 @@ where { type Response = WebResponse; type Error = Err::Container; - type Future<'f> = ScopeServiceResponse<'f, F, Err> where F: 'f; #[inline] fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { @@ -505,55 +495,13 @@ where } } - fn call<'a>( - &'a self, + async fn call( + &self, req: WebRequest, - ctx: ServiceCtx<'a, Self>, - ) -> Self::Future<'a> { - ScopeServiceResponse { - filter: ctx.call(&self.filter, req), - routing: &self.routing, - endpoint: None, - ctx, - } - } -} - -pin_project_lite::pin_project! { - pub struct ScopeServiceResponse<'f, F: Service>, Err: ErrorRenderer> - where F: 'f - { - #[pin] - filter: ServiceCall<'f, F, WebRequest>, - routing: &'f ScopeRouter, - ctx: ServiceCtx<'f, ScopeService>, - endpoint: Option, WebRequest>>, - } -} - -impl<'f, F, Err> Future for ScopeServiceResponse<'f, F, Err> -where - F: Service, Response = WebRequest, Error = Err::Container>, - Err: ErrorRenderer, -{ - type Output = Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut this = self.as_mut().project(); - - loop { - if let Some(fut) = this.endpoint.as_mut() { - return Pin::new(fut).poll(cx); - } - - let res = if let Poll::Ready(res) = this.filter.poll(cx) { - res? - } else { - return Poll::Pending; - }; - *this.endpoint = Some(this.ctx.call(this.routing, res)); - this = self.as_mut().project(); - } + ctx: ServiceCtx<'_, Self>, + ) -> Result { + let req = ctx.call(&self.filter, req).await?; + ctx.call(&self.routing, req).await } } @@ -569,31 +517,28 @@ impl ServiceFactory> for ScopeRouterFactory< type Error = Err::Container; type InitError = (); type Service = ScopeRouter; - type Future<'f> = BoxFuture<'f, Result>; - - fn create(&self, _: ()) -> Self::Future<'_> { - Box::pin(async move { - // create http services - let mut router = Router::build(); - if self.case_insensitive { - router.case_insensitive(); - } - for (path, factory, guards) in &mut self.services.iter() { - let service = factory.create(()).await?; - router.rdef(path.clone(), service).2 = guards.borrow_mut().take(); - } - let default = if let Some(ref default) = self.default { - Some(default.create(()).await?) - } else { - None - }; - - Ok(ScopeRouter { - default, - router: router.finish(), - state: self.state.clone(), - }) + async fn create(&self, _: ()) -> Result { + // create http services + let mut router = Router::build(); + if self.case_insensitive { + router.case_insensitive(); + } + for (path, factory, guards) in &mut self.services.iter() { + let service = factory.create(()).await?; + router.rdef(path.clone(), service).2 = guards.borrow_mut().take(); + } + + let default = if let Some(ref default) = self.default { + Some(default.create(()).await?) + } else { + None + }; + + Ok(ScopeRouter { + default, + router: router.finish(), + state: self.state.clone(), }) } } @@ -607,13 +552,12 @@ struct ScopeRouter { impl Service> for ScopeRouter { type Response = WebResponse; type Error = Err::Container; - type Future<'f> = Either, Ready>; - fn call<'a>( - &'a self, + async fn call( + &self, mut req: WebRequest, - ctx: ServiceCtx<'a, Self>, - ) -> Self::Future<'a> { + ctx: ServiceCtx<'_, Self>, + ) -> Result { let res = self.router.recognize_checked(&mut req, |req, guards| { if let Some(guards) = guards { for f in guards { @@ -629,15 +573,12 @@ impl Service> for ScopeRouter { if let Some(ref state) = self.state { req.set_state_container(state.clone()); } - Either::Left(ctx.call(srv, req)) + ctx.call(srv, req).await } else if let Some(ref default) = self.default { - Either::Left(ctx.call(default, req)) + ctx.call(default, req).await } else { let req = req.into_parts().0; - Either::Right(Ready::Ok(WebResponse::new( - Response::NotFound().finish(), - req, - ))) + Ok(WebResponse::new(Response::NotFound().finish(), req)) } } } diff --git a/ntex/src/web/types/mod.rs b/ntex/src/web/types/mod.rs index 1f27d163d..874227563 100644 --- a/ntex/src/web/types/mod.rs +++ b/ntex/src/web/types/mod.rs @@ -13,7 +13,3 @@ pub use self::path::Path; pub use self::payload::{Payload, PayloadConfig}; pub use self::query::Query; pub use self::state::State; - -#[deprecated] -#[doc(hidden)] -pub type Data = State; diff --git a/ntex/src/web/ws.rs b/ntex/src/web/ws.rs index 27b32406d..d3a3bbfff 100644 --- a/ntex/src/web/ws.rs +++ b/ntex/src/web/ws.rs @@ -5,7 +5,7 @@ pub use crate::ws::{CloseCode, CloseReason, Frame, Message, WsSink}; use crate::http::{body::BodySize, h1, StatusCode}; use crate::service::{ - apply_fn, fn_factory_with_config, IntoServiceFactory, ServiceFactory, + apply_fn, chain_factory, fn_factory_with_config, IntoServiceFactory, ServiceFactory, }; use crate::web::{HttpRequest, HttpResponse}; use crate::ws::{self, error::HandshakeError, error::WsError, handshake}; @@ -19,7 +19,7 @@ where F: IntoServiceFactory, Err: From + From, { - let inner_factory = Rc::new(factory.chain().map_err(WsError::Service)); + let inner_factory = Rc::new(chain_factory(factory).map_err(WsError::Service)); let factory = fn_factory_with_config(move |sink: WsSink| { let factory = inner_factory.clone(); @@ -105,7 +105,7 @@ where // start websockets service dispatcher rt::spawn(async move { - let res = crate::io::Dispatcher::with_config(io, codec, srv, &cfg).await; + let res = crate::io::Dispatcher::new(io, codec, srv, &cfg).await; log::trace!("Ws handler is terminated: {:?}", res); }); diff --git a/ntex/src/ws/client.rs b/ntex/src/ws/client.rs index e36f185f5..b15497545 100644 --- a/ntex/src/ws/client.rs +++ b/ntex/src/ws/client.rs @@ -757,7 +757,7 @@ impl WsConnection { U: IntoService, { let service = apply_fn( - service.into_chain().map_err(WsError::Service), + service.into_service().map_err(WsError::Service), |req, svc| async move { match req { DispatchItem::::Item(item) => svc.call(item).await, @@ -773,7 +773,7 @@ impl WsConnection { }, ); - Dispatcher::with_config(self.io, self.codec, service, &self.config).await + Dispatcher::new(self.io, self.codec, service, &self.config).await } } diff --git a/ntex/tests/http_ws.rs b/ntex/tests/http_ws.rs index 30da57809..26c4eeecf 100644 --- a/ntex/tests/http_ws.rs +++ b/ntex/tests/http_ws.rs @@ -5,8 +5,8 @@ use ntex::http::test::server as test_server; use ntex::http::{body, h1, test, HttpService, Request, Response, StatusCode}; use ntex::io::{DispatchItem, Dispatcher, Io}; use ntex::service::{fn_factory, Service, ServiceCtx}; -use ntex::time::{sleep, Millis, Seconds}; -use ntex::util::{BoxFuture, ByteString, Bytes, Ready}; +use ntex::time::Seconds; +use ntex::util::{ByteString, Bytes, Ready}; use ntex::ws::{self, handshake, handshake_response}; struct WsService(Arc>>); @@ -34,39 +34,28 @@ impl Clone for WsService { impl Service<(Request, Io, h1::Codec)> for WsService { type Response = (); type Error = io::Error; - type Future<'f> = BoxFuture<'f, Result<(), io::Error>>; fn poll_ready(&self, _ctx: &mut Context<'_>) -> Poll> { self.set_polled(); Poll::Ready(Ok(())) } - fn call<'a>( - &'a self, + async fn call( + &self, (req, io, codec): (Request, Io, h1::Codec), - _: ServiceCtx<'a, Self>, - ) -> Self::Future<'a> { - let fut = async move { - let res = handshake(req.head()).unwrap().message_body(()); - - io.encode((res, body::BodySize::None).into(), &codec) - .unwrap(); - - let cfg = ntex_io::DispatcherConfig::default(); - cfg.set_keepalive_timeout(Seconds(0)); - - Dispatcher::with_config( - io.seal(), - ws::Codec::new(), - service, - //&Default::default(), - &cfg, - ) + _: ServiceCtx<'_, Self>, + ) -> Result<(), io::Error> { + let res = handshake(req.head()).unwrap().message_body(()); + + io.encode((res, body::BodySize::None).into(), &codec) + .unwrap(); + + let cfg = ntex_io::DispatcherConfig::default(); + cfg.set_keepalive_timeout(Seconds(0)); + + Dispatcher::new(io.seal(), ws::Codec::new(), service, &cfg) .await .map_err(|_| panic!()) - }; - - Box::pin(fut) } } diff --git a/ntex/tests/http_ws_client.rs b/ntex/tests/http_ws_client.rs index 60dd83a7f..e3719902e 100644 --- a/ntex/tests/http_ws_client.rs +++ b/ntex/tests/http_ws_client.rs @@ -40,7 +40,7 @@ async fn test_simple() { .unwrap(); // start websocket service - Dispatcher::with_config( + Dispatcher::new( io.seal(), ws::Codec::default(), ws_service, @@ -96,7 +96,7 @@ async fn test_transport() { .unwrap(); // start websocket service - Dispatcher::with_config( + Dispatcher::new( io.seal(), ws::Codec::default(), ws_service, @@ -133,13 +133,7 @@ async fn test_keepalive_timeout() { // start websocket service let cfg = DispatcherConfig::default(); cfg.set_keepalive_timeout(Seconds::ZERO); - Dispatcher::with_config( - io.seal(), - ws::Codec::default(), - ws_service, - &cfg, - ) - .await + Dispatcher::new(io.seal(), ws::Codec::default(), ws_service, &cfg).await } }) .finish(|_| Ready::Ok::<_, io::Error>(Response::NotFound()))