Skip to content

Commit

Permalink
feat: allow overriding stream implementation
Browse files Browse the repository at this point in the history
Fixes: nodejs#17
  • Loading branch information
ronag committed May 3, 2020
1 parent 8ae7358 commit 19b4ee7
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 14 deletions.
24 changes: 13 additions & 11 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ const net = require('net')
const tls = require('tls')
const Q = require('fastq')
const { HTTPParser } = require('http-parser-js')
const { Readable } = require('readable-stream')
const eos = require('end-of-stream')
const retimer = require('retimer')
const { EventEmitter } = require('events')
const Request = require('./request')
Expand All @@ -24,6 +22,7 @@ const kTLSOpts = Symbol('TLS Options')
const kLastBody = Symbol('lastBody')
const kNeedHeaders = Symbol('needHeaders')
const kResetParser = Symbol('resetParser')
const kStream = Symbol('kStream')

function connect (client) {
var socket = null
Expand All @@ -50,7 +49,7 @@ function connect (client) {
client[kQueue].resume()
})

eos(socket, (err) => {
client[kStream].finished(socket, (err) => {
reconnect(client, err || new Error('other side closed'))
})
}
Expand Down Expand Up @@ -120,6 +119,8 @@ class Client extends EventEmitter {

this.url = url

this[kStream] = opts.stream || require('readable-stream')

// state machine, might need more states
this.closed = false
this.destroyed = false
Expand Down Expand Up @@ -190,7 +191,7 @@ class Client extends EventEmitter {
}
this.socket.write(body)
} else if (body && typeof body.pipe === 'function') {
const cleanup = eos(this.socket, err => {
const cleanup = this[kStream].finished(this.socket, (err) => {
if (err) {
body.destroy(err)
}
Expand Down Expand Up @@ -219,13 +220,13 @@ class Client extends EventEmitter {
this.socket.on('drain', onDrain)

this.socket.uncork()
eos(body, (err) => {
this[kStream].finished(body, (err) => {
cleanup()
if (err || !this.socket) {
// TODO we might want to wait before previous in-flight
// requests are finished before destroying
if (this.socket) {
destroySocket(this.socket, err, cb)
destroySocket(this, err, cb)
} else {
assert(this.closed)
cb(err, null)
Expand Down Expand Up @@ -266,7 +267,7 @@ class Client extends EventEmitter {
const skipBody = request.method === 'HEAD'

if (!skipBody) {
this[kLastBody] = new Readable({ read: this[kRead].bind(this) })
this[kLastBody] = new this[kStream].Readable({ read: this[kRead].bind(this) })
this[kLastBody].push = request.wrapSimple(this[kLastBody], this[kLastBody].push)
}
callback(null, {
Expand Down Expand Up @@ -378,8 +379,8 @@ class Client extends EventEmitter {
}

if (this.socket) {
eos(this.socket, (err) => {
if (err && err.message !== 'premature close') {
this[kStream].finished(this.socket, (err) => {
if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
cb(err)
} else {
cb(null)
Expand Down Expand Up @@ -430,11 +431,12 @@ function parseHeaders (headers) {

module.exports = Client

function destroySocket (socket, err, cb) {
function destroySocket (client, err, cb) {
// This code is basically the same as...
// stream.finished(socket, er => cb(err || er))
// socket.destroy(err)
// ... in Node 14+
const socket = client.socket
const wState = socket._writableState
const rState = socket._readableState
const closed = (wState && wState.closed) || (rState && rState.closed)
Expand All @@ -450,7 +452,7 @@ function destroySocket (socket, err, cb) {
socket.on('error', callback)
process.nextTick(callback)
} else if (closed === false || !socket.destroyed) {
eos(socket, callback)
client[kStream].finished(socket, callback)
} else {
socket.on('error', callback)
setImmediate(callback)
Expand Down
3 changes: 1 addition & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,9 @@
"tap": "^14.0.0"
},
"dependencies": {
"end-of-stream": "^1.4.1",
"fastq": "^1.6.0",
"readable-stream": "^3.0.0",
"http-parser-js": "^0.5.2",
"readable-stream": "^3.0.0",
"retimer": "^2.0.0"
}
}
2 changes: 1 addition & 1 deletion test/pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const { Pool } = require('..')
const { createServer } = require('http')
const { EventEmitter } = require('events')
const { promisify } = require('util')
const eos = require('end-of-stream')
const eos = require('readable-stream').finished

test('basic get', (t) => {
t.plan(6)
Expand Down

0 comments on commit 19b4ee7

Please sign in to comment.