From e97e357630f6bd7ea565f182330315d01484b1cc Mon Sep 17 00:00:00 2001 From: Tyson Andre Date: Wed, 5 May 2021 14:56:16 -0400 Subject: [PATCH] fix(node-http-handler): handle NodeHttp2Handler session failure (#2289) Detect errors on the NodeHttp2Handler, immediately destroy connections on unexpected error mode, and reconnect. Prior to this PR, if the server sent the client a GOAWAY frame, the session would not be removed from the connection pool and requests would fail indefinitely. This tries to avoid keeping streams (requests) on the Http2Session (tcp connection) from being stuck in an open state waiting for a gentle close even if there were unexpected protocol or connection errors during the request, assuming http2 errors are rare. (if a server or load balancer or network is misbehaving, close() might get stuck waiting for requests to finish, especially if requests and sessions don't have timeouts?) Co-authored-by: Trivikram Kamat <16024985+trivikr@users.noreply.github.com> --- .../src/node-http2-handler.spec.ts | 109 ++++++++++++++++++ .../src/node-http2-handler.ts | 60 ++++++++-- 2 files changed, 162 insertions(+), 7 deletions(-) diff --git a/packages/node-http-handler/src/node-http2-handler.spec.ts b/packages/node-http-handler/src/node-http2-handler.spec.ts index f55a0a1ed69f2..fb7d6e64047ea 100644 --- a/packages/node-http-handler/src/node-http2-handler.spec.ts +++ b/packages/node-http-handler/src/node-http2-handler.spec.ts @@ -1,4 +1,6 @@ import { HttpRequest } from "@aws-sdk/protocol-http"; +import { rejects } from "assert"; +import { constants, Http2Stream } from "http2"; import { NodeHttp2Handler } from "./node-http2-handler"; import { createMockHttp2Server, createResponseFunction } from "./server.mock"; @@ -100,6 +102,113 @@ describe("NodeHttp2Handler", () => { mockH2Server2.close(); }); + const UNEXPECTEDLY_CLOSED_REGEX = /closed|destroy|cancel|did not get a response/i; + it("handles goaway frames", async () => { + const port3 = port + 2; + const mockH2Server3 = createMockHttp2Server().listen(port3); + let establishedConnections = 0; + let numRequests = 0; + let shouldSendGoAway = true; + + mockH2Server3.on("stream", (request: Http2Stream) => { + // transmit goaway frame without shutting down the connection + // to simulate an unlikely error mode. + numRequests += 1; + if (shouldSendGoAway) { + request.session.goaway(constants.NGHTTP2_PROTOCOL_ERROR); + } + }); + mockH2Server3.on("connection", () => { + establishedConnections += 1; + }); + const req = new HttpRequest({ ...getMockReqOptions(), port: port3 }); + expect(establishedConnections).toBe(0); + expect(numRequests).toBe(0); + await rejects( + nodeH2Handler.handle(req, {}), + UNEXPECTEDLY_CLOSED_REGEX, + "should be rejected promptly due to goaway frame" + ); + expect(establishedConnections).toBe(1); + expect(numRequests).toBe(1); + await rejects( + nodeH2Handler.handle(req, {}), + UNEXPECTEDLY_CLOSED_REGEX, + "should be rejected promptly due to goaway frame" + ); + expect(establishedConnections).toBe(2); + expect(numRequests).toBe(2); + await rejects( + nodeH2Handler.handle(req, {}), + UNEXPECTEDLY_CLOSED_REGEX, + "should be rejected promptly due to goaway frame" + ); + expect(establishedConnections).toBe(3); + expect(numRequests).toBe(3); + + // should be able to recover from goaway after reconnecting to a server + // that doesn't send goaway, and reuse the TCP connection (Http2Session) + shouldSendGoAway = false; + mockH2Server3.on("request", createResponseFunction(mockResponse)); + await nodeH2Handler.handle(req, {}); + const result = await nodeH2Handler.handle(req, {}); + const resultReader = result.response.body; + + // ...and validate that the mocked response is received + const responseBody = await new Promise((resolve) => { + const buffers = []; + resultReader.on("data", (chunk) => buffers.push(chunk)); + resultReader.on("end", () => { + resolve(Buffer.concat(buffers).toString("utf8")); + }); + }); + expect(responseBody).toBe("test"); + expect(establishedConnections).toBe(4); + expect(numRequests).toBe(5); + mockH2Server3.close(); + }); + + it("handles connections destroyed by servers", async () => { + const port3 = port + 2; + const mockH2Server3 = createMockHttp2Server().listen(port3); + let establishedConnections = 0; + let numRequests = 0; + + mockH2Server3.on("stream", (request: Http2Stream) => { + // transmit goaway frame and then shut down the connection. + numRequests += 1; + request.session.destroy(); + }); + mockH2Server3.on("connection", () => { + establishedConnections += 1; + }); + const req = new HttpRequest({ ...getMockReqOptions(), port: port3 }); + expect(establishedConnections).toBe(0); + expect(numRequests).toBe(0); + await rejects( + nodeH2Handler.handle(req, {}), + UNEXPECTEDLY_CLOSED_REGEX, + "should be rejected promptly due to goaway frame or destroyed connection" + ); + expect(establishedConnections).toBe(1); + expect(numRequests).toBe(1); + await rejects( + nodeH2Handler.handle(req, {}), + UNEXPECTEDLY_CLOSED_REGEX, + "should be rejected promptly due to goaway frame or destroyed connection" + ); + expect(establishedConnections).toBe(2); + expect(numRequests).toBe(2); + await rejects( + nodeH2Handler.handle(req, {}), + UNEXPECTEDLY_CLOSED_REGEX, + "should be rejected promptly due to goaway frame or destroyed connection" + ); + expect(establishedConnections).toBe(3); + expect(numRequests).toBe(3); + mockH2Server3.close(); + }); + it("closes and removes session on sessionTimeout", async (done) => { const sessionTimeout = 500; nodeH2Handler = new NodeHttp2Handler({ sessionTimeout }); diff --git a/packages/node-http-handler/src/node-http2-handler.ts b/packages/node-http-handler/src/node-http2-handler.ts index 0aa9ae62b11a2..e0aa8bb4d1ef8 100644 --- a/packages/node-http-handler/src/node-http2-handler.ts +++ b/packages/node-http-handler/src/node-http2-handler.ts @@ -45,7 +45,14 @@ export class NodeHttp2Handler implements HttpHandler { } handle(request: HttpRequest, { abortSignal }: HttpHandlerOptions = {}): Promise<{ response: HttpResponse }> { - return new Promise((resolve, reject) => { + return new Promise((resolve, rejectOriginal) => { + // It's redundant to track fulfilled because promises use the first resolution/rejection + // but avoids generating unnecessary stack traces in the "close" event handler. + let fulfilled = false; + const reject = (err: Error) => { + fulfilled = true; + rejectOriginal(err); + }; // if the request was already aborted, prevent doing extra work if (abortSignal?.aborted) { const abortError = new Error("Request aborted"); @@ -70,13 +77,10 @@ export class NodeHttp2Handler implements HttpHandler { headers: getTransformedHeaders(headers), body: req, }); + fulfilled = true; resolve({ response: httpResponse }); }); - req.on("error", reject); - req.on("frameError", reject); - req.on("aborted", reject); - const requestTimeout = this.requestTimeout; if (requestTimeout) { req.setTimeout(requestTimeout, () => { @@ -96,6 +100,20 @@ export class NodeHttp2Handler implements HttpHandler { }; } + // Set up handlers for errors + req.on("frameError", reject); + req.on("error", reject); + req.on("goaway", reject); + req.on("aborted", reject); + + // The HTTP/2 error code used when closing the stream can be retrieved using the + // http2stream.rstCode property. If the code is any value other than NGHTTP2_NO_ERROR (0), + // an 'error' event will have also been emitted. + req.on("close", () => { + if (!fulfilled) { + reject(new Error("Unexpected error: http2 request did not get a response")); + } + }); writeRequestBody(req, request); }); } @@ -107,14 +125,42 @@ export class NodeHttp2Handler implements HttpHandler { const newSession = connect(authority); connectionPool.set(authority, newSession); + const destroySessionCb = () => { + this.destroySession(authority, newSession); + }; + newSession.on("goaway", destroySessionCb); + newSession.on("error", destroySessionCb); + newSession.on("frameError", destroySessionCb); const sessionTimeout = this.sessionTimeout; if (sessionTimeout) { newSession.setTimeout(sessionTimeout, () => { - newSession.close(); - connectionPool.delete(authority); + if (connectionPool.get(authority) === newSession) { + newSession.close(); + connectionPool.delete(authority); + } }); } return newSession; } + + /** + * Destroy a session immediately and remove it from the http2 pool. + * + * This check ensures that the session is only closed once + * and that an event on one session does not close a different session. + */ + private destroySession(authority: string, session: ClientHttp2Session): void { + if (this.connectionPool.get(authority) !== session) { + // Already closed? + return; + } + this.connectionPool.delete(authority); + session.removeAllListeners("goaway"); + session.removeAllListeners("error"); + session.removeAllListeners("frameError"); + if (!session.destroyed) { + session.destroy(); + } + } }