Skip to content

Commit

Permalink
feat: allow overriding stream implementation
Browse files Browse the repository at this point in the history
Fixes: nodejs#17
  • Loading branch information
ronag committed May 3, 2020
1 parent 24d15a0 commit 3fb9af0
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 11 deletions.
17 changes: 9 additions & 8 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ const net = require('net')
const tls = require('tls')
const Q = require('fastq')
const { HTTPParser } = require('http-parser-js')
const { Readable } = require('readable-stream')
const eos = require('end-of-stream')
const retimer = require('retimer')
const { EventEmitter } = require('events')
const Request = require('./request')
Expand All @@ -22,6 +20,7 @@ const kCallbacks = Symbol('callbacks')
const kRequests = Symbol('requests')
const kTimer = Symbol('kTimer')
const kTLSOpts = Symbol('TLS Options')
const kStream = Symbol('kStream')

function connect (client) {
var socket = null
Expand All @@ -45,7 +44,7 @@ function connect (client) {
client[kQueue].resume()
})

eos(socket, (err) => {
client[kStream].finished(socket, (err) => {
reconnect(client, err || new Error('other side closed'))
})
}
Expand Down Expand Up @@ -113,6 +112,8 @@ class Client extends EventEmitter {

this.url = url

this[kStream] = opts.stream || require('readable-stream')

// state machine, might need more states
this.closed = false
this.destroyed = false
Expand Down Expand Up @@ -186,7 +187,7 @@ class Client extends EventEmitter {
}
this.socket.write(body)
} else if (body && typeof body.pipe === 'function') {
const cleanup = eos(this.socket, err => {
const cleanup = this[kStream].finished(this.socket, err => {
if (err) {
body.destroy(err)
}
Expand Down Expand Up @@ -215,7 +216,7 @@ class Client extends EventEmitter {
this.socket.on('drain', onDrain)

this.socket.uncork()
eos(body, (err) => {
this[kStream].finished(body, (err) => {
cleanup()
if (err || !this.socket) {
// TODO we might want to wait before previous in-flight
Expand Down Expand Up @@ -263,7 +264,7 @@ class Client extends EventEmitter {
const skipBody = request.method === 'HEAD'

if (!skipBody) {
this._lastBody = new Readable({ read: this[kRead].bind(this) })
this._lastBody = new this[kStream].Readable({ read: this[kRead].bind(this) })
this._lastBody.push = request.wrapSimple(this._lastBody, this._lastBody.push)
}
cb(null, {
Expand Down Expand Up @@ -368,8 +369,8 @@ class Client extends EventEmitter {
}

if (cb) {
eos(this.socket, (err) => {
if (err && err.message !== 'premature close') {
this[kStream].finished(this.socket, (err) => {
if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
cb(err, null)
} else {
cb(null, null)
Expand Down
3 changes: 1 addition & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,9 @@
"tap": "^14.0.0"
},
"dependencies": {
"end-of-stream": "^1.4.1",
"fastq": "^1.6.0",
"readable-stream": "^3.0.0",
"http-parser-js": "^0.5.2",
"readable-stream": "^3.0.0",
"retimer": "^2.0.0"
}
}
2 changes: 1 addition & 1 deletion test/pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const { Pool } = require('..')
const { createServer } = require('http')
const { EventEmitter } = require('events')
const { promisify } = require('util')
const eos = require('end-of-stream')
const eos = require('readable-stream').finished

test('basic get', (t) => {
t.plan(6)
Expand Down

0 comments on commit 3fb9af0

Please sign in to comment.