From f145cbfaccd9f3b251b2f80690ad7c68b26d924b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Thu, 15 Jun 2023 15:34:21 +0200 Subject: [PATCH] refactor(ext/fetch): simplify fetch ops (#19494) Addresses feedback from https://github.com/denoland/deno/pull/19412#discussion_r1227912676 --- cli/js/40_testing.js | 4 +- .../run/fetch_response_finalization.js.out | 7 +- ext/fetch/26_fetch.js | 2 +- ext/fetch/lib.rs | 114 +++++++++--------- ext/node/polyfills/http.ts | 13 +- 5 files changed, 61 insertions(+), 79 deletions(-) diff --git a/cli/js/40_testing.js b/cli/js/40_testing.js index ec83ce370fd99c..3058fcee380f88 100644 --- a/cli/js/40_testing.js +++ b/cli/js/40_testing.js @@ -228,7 +228,7 @@ function prettyResourceNames(name) { return ["A fetch request", "started", "finished"]; case "fetchRequestBody": return ["A fetch request body", "created", "closed"]; - case "fetchResponseBody": + case "fetchResponse": return ["A fetch response body", "created", "consumed"]; case "httpClient": return ["An HTTP client", "created", "closed"]; @@ -295,7 +295,7 @@ function resourceCloseHint(name) { return "Await the promise returned from `fetch()` or abort the fetch with an abort signal."; case "fetchRequestBody": return "Terminate the request body `ReadableStream` by closing or erroring it."; - case "fetchResponseBody": + case "fetchResponse": return "Consume or close the response body `ReadableStream`, e.g `await resp.text()` or `await resp.body.cancel()`."; case "httpClient": return "Close the HTTP client by calling `httpClient.close()`."; diff --git a/cli/tests/testdata/run/fetch_response_finalization.js.out b/cli/tests/testdata/run/fetch_response_finalization.js.out index 1a8d7563df8c88..645842a5b34475 100644 --- a/cli/tests/testdata/run/fetch_response_finalization.js.out +++ b/cli/tests/testdata/run/fetch_response_finalization.js.out @@ -1,7 +1,2 @@ -{ - "0": "stdin", - "1": "stdout", - "2": "stderr", - "5": "fetchResponseBody" -} +{ "0": "stdin", "1": "stdout", "2": "stderr", "5": "fetchResponse" } { "0": "stdin", "1": "stdout", "2": "stderr" } diff --git a/ext/fetch/26_fetch.js b/ext/fetch/26_fetch.js index 0dc06db021905c..5084fab3433df6 100644 --- a/ext/fetch/26_fetch.js +++ b/ext/fetch/26_fetch.js @@ -86,7 +86,7 @@ function opFetch(method, url, headers, clientRid, hasBody, bodyLength, body) { * @returns {Promise<{ status: number, statusText: string, headers: [string, string][], url: string, responseRid: number }>} */ function opFetchSend(rid) { - return core.opAsync("op_fetch_send", rid, true); + return core.opAsync("op_fetch_send", rid); } /** diff --git a/ext/fetch/lib.rs b/ext/fetch/lib.rs index ded69b2c42aebd..538b741a6ca218 100644 --- a/ext/fetch/lib.rs +++ b/ext/fetch/lib.rs @@ -112,7 +112,6 @@ deno_core::extension!(deno_fetch, ops = [ op_fetch, op_fetch_send, - op_fetch_response_into_byte_stream, op_fetch_response_upgrade, op_fetch_custom_client, ], @@ -427,7 +426,6 @@ pub struct FetchResponse { pub async fn op_fetch_send( state: Rc>, rid: ResourceId, - into_byte_stream: bool, ) -> Result { let request = state .borrow_mut() @@ -459,27 +457,10 @@ pub async fn op_fetch_send( (None, None) }; - let response_rid = if !into_byte_stream { - state - .borrow_mut() - .resource_table - .add(FetchResponseResource { - response: res, - size: content_length, - }) - } else { - let stream: BytesStream = Box::pin(res.bytes_stream().map(|r| { - r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)) - })); - state - .borrow_mut() - .resource_table - .add(FetchResponseBodyResource { - reader: AsyncRefCell::new(stream.peekable()), - cancel: CancelHandle::default(), - size: content_length, - }) - }; + let response_rid = state + .borrow_mut() + .resource_table + .add(FetchResponseResource::new(res, content_length)); Ok(FetchResponse { status: status.as_u16(), @@ -493,28 +474,6 @@ pub async fn op_fetch_send( }) } -#[op] -pub fn op_fetch_response_into_byte_stream( - state: &mut OpState, - rid: ResourceId, -) -> Result { - let raw_response = state.resource_table.take::(rid)?; - let raw_response = Rc::try_unwrap(raw_response) - .expect("Someone is holding onto FetchResponseResource"); - let stream: BytesStream = - Box::pin(raw_response.response.bytes_stream().map(|r| { - r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)) - })); - - let rid = state.resource_table.add(FetchResponseBodyResource { - reader: AsyncRefCell::new(stream.peekable()), - cancel: CancelHandle::default(), - size: raw_response.size, - }); - - Ok(rid) -} - #[op] pub async fn op_fetch_response_upgrade( state: Rc>, @@ -530,7 +489,7 @@ pub async fn op_fetch_response_upgrade( let (read, write) = tokio::io::duplex(1024); let (read_rx, write_tx) = tokio::io::split(read); let (mut write_rx, mut read_tx) = tokio::io::split(write); - let upgraded = raw_response.response.upgrade().await?; + let upgraded = raw_response.upgrade().await?; { // Stage 3: Pump the data let (mut upgraded_rx, mut upgraded_tx) = tokio::io::split(upgraded); @@ -698,35 +657,72 @@ impl Resource for FetchRequestBodyResource { type BytesStream = Pin> + Unpin>>; +pub enum FetchResponseReader { + Start(Response), + BodyReader(Peekable), +} + +impl Default for FetchResponseReader { + fn default() -> Self { + let stream: BytesStream = Box::pin(deno_core::futures::stream::empty()); + Self::BodyReader(stream.peekable()) + } +} #[derive(Debug)] pub struct FetchResponseResource { - pub response: Response, + pub response_reader: AsyncRefCell, + pub cancel: CancelHandle, pub size: Option, } -impl Resource for FetchResponseResource { - fn name(&self) -> Cow { - "fetchResponse".into() +impl FetchResponseResource { + pub fn new(response: Response, size: Option) -> Self { + Self { + response_reader: AsyncRefCell::new(FetchResponseReader::Start(response)), + cancel: CancelHandle::default(), + size, + } } -} -pub struct FetchResponseBodyResource { - pub reader: AsyncRefCell>, - pub cancel: CancelHandle, - pub size: Option, + pub async fn upgrade(self) -> Result { + let reader = self.response_reader.into_inner(); + match reader { + FetchResponseReader::Start(resp) => Ok(resp.upgrade().await?), + _ => unreachable!(), + } + } } -impl Resource for FetchResponseBodyResource { +impl Resource for FetchResponseResource { fn name(&self) -> Cow { - "fetchResponseBody".into() + "fetchResponse".into() } fn read(self: Rc, limit: usize) -> AsyncResult { Box::pin(async move { - let reader = RcRef::map(&self, |r| &r.reader).borrow_mut().await; + let mut reader = + RcRef::map(&self, |r| &r.response_reader).borrow_mut().await; + let body = loop { + match &mut *reader { + FetchResponseReader::BodyReader(reader) => break reader, + FetchResponseReader::Start(_) => {} + } + + match std::mem::take(&mut *reader) { + FetchResponseReader::Start(resp) => { + let stream: BytesStream = Box::pin(resp.bytes_stream().map(|r| { + r.map_err(|err| { + std::io::Error::new(std::io::ErrorKind::Other, err) + }) + })); + *reader = FetchResponseReader::BodyReader(stream.peekable()); + } + FetchResponseReader::BodyReader(_) => unreachable!(), + } + }; let fut = async move { - let mut reader = Pin::new(reader); + let mut reader = Pin::new(body); loop { match reader.as_mut().peek_mut().await { Some(Ok(chunk)) if !chunk.is_empty() => { diff --git a/ext/node/polyfills/http.ts b/ext/node/polyfills/http.ts index 0529ccca5962e2..a07a2c91d991ac 100644 --- a/ext/node/polyfills/http.ts +++ b/ext/node/polyfills/http.ts @@ -595,13 +595,7 @@ class ClientRequest extends OutgoingMessage { (async () => { try { const [res, _] = await Promise.all([ - core.opAsync( - "op_fetch_send", - this._req.requestRid, - /* false because we want to have access to actual Response, - not the bytes stream of response (because we need to handle upgrades) */ - false, - ), + core.opAsync("op_fetch_send", this._req.requestRid), (async () => { if (this._bodyWriteRid) { try { @@ -700,10 +694,7 @@ class ClientRequest extends OutgoingMessage { this.emit("close"); } else { { - const responseRid = core.ops.op_fetch_response_into_byte_stream( - res.responseRid, - ); - incoming._bodyRid = responseRid; + incoming._bodyRid = res.responseRid; } this.emit("response", incoming); }