Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] File transport under stress hangs logger #1318

Merged
merged 11 commits into from
May 22, 2018
58 changes: 21 additions & 37 deletions lib/winston/transports/file.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,6 @@ const debug = require('diagnostics')('winston:file');
const os = require('os');
const tailFile = require('../tail-file');

/**
* Simple no-op function.
* @returns {undefined}
*/
function noop() {}

/**
* Transport for outputting to a local log file.
* @type {File}
Expand Down Expand Up @@ -55,7 +49,6 @@ module.exports = class File extends TransportStream {
this._stream = new PassThrough();

// Bind this context for listener methods.
this._onDrain = this._onDrain.bind(this);
this._onError = this._onError.bind(this);

if (options.filename || options.dirname) {
Expand Down Expand Up @@ -89,8 +82,7 @@ module.exports = class File extends TransportStream {
this._size = 0;
this._pendingSize = 0;
this._created = 0;
this._drains = 0;
this._next = noop;
this._drain = false;
this._opening = false;

this.open();
Expand All @@ -102,7 +94,7 @@ module.exports = class File extends TransportStream {
* @param {Function} callback - TODO: add param description.
* @returns {undefined}
*/
log(info, callback = noop) {
log(info, callback = () => {}) {
// Remark: (jcrugzz) What is necessary about this callback(null, true) now
// when thinking about 3.x? Should silent be handled in the base
// TransportStream _write method?
Expand All @@ -111,6 +103,15 @@ module.exports = class File extends TransportStream {
return true;
}

// Output stream buffer is full and has asked us to wait for the drain event
if (this._drain) {
this._stream.once('drain', () => {
this._drain = false;
this.log(info, callback);
});
return;
}

// Grab the raw string and append the expected EOL.
const output = `${info[MESSAGE]}${this.eol}`;
const bytes = Buffer.byteLength(output);
Expand Down Expand Up @@ -143,11 +144,6 @@ module.exports = class File extends TransportStream {
this._endStream(() => this._rotateFile());
}

const written = this._stream.write(output, logged.bind(this));
if (written === false) {
++this._drains;
}

// Keep track of the pending bytes being written while files are opening
// in order to properly rotate the PassThrough this._stream when the file
// eventually does open.
Expand All @@ -158,14 +154,19 @@ module.exports = class File extends TransportStream {
this.rotatedWhileOpening = true;
}

debug('written', written, this._drains);
if (!this._drains) {
callback(); // eslint-disable-line callback-return
const written = this._stream.write(output, logged.bind(this));
if (!written) {
this._drain = true;
this._stream.once('drain', () => {
this._drain = false;
callback();
});
} else {
this._next = callback;
this._dest.emit('drain');
callback(); // eslint-disable-line callback-return
}

debug('written', written, this._drain);

return written;
}

Expand Down Expand Up @@ -411,21 +412,6 @@ module.exports = class File extends TransportStream {
return this.maxsize && size >= this.maxsize;
}

/**
* TODO: add method description.
* @returns {undefined}
*/
_onDrain() {
if (--this._drains) {
return;
}

const next = this._next;
this._next = noop;

next();
}

/**
* TODO: add method description.
* @param {Error} err - TODO: add param description.
Expand All @@ -442,7 +428,6 @@ module.exports = class File extends TransportStream {
*/
_setupStream(stream) {
stream.on('error', this._onError);
stream.on('drain', this._onDrain);

return stream;
}
Expand All @@ -454,7 +439,6 @@ module.exports = class File extends TransportStream {
*/
_cleanupStream(stream) {
stream.removeListener('error', this._onError);
stream.removeListener('drain', this._onDrain);

return stream;
}
Expand Down
58 changes: 50 additions & 8 deletions test/transports/00-file-stress.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@
*
*/

var fs = require('fs'),
os = require('os'),
path = require('path'),
assume = require('assume'),
helpers = require('../helpers'),
split = require('split2'),
winston = require('../../lib/winston');
const fs = require('fs');
const os = require('os');
const path = require('path');
const assume = require('assume');
const helpers = require('../helpers');
const split = require('split2');
const winston = require('../../lib/winston');

describe('File (stress)', function () {
this.timeout(30 * 1000);
Expand All @@ -35,7 +35,7 @@ describe('File (stress)', function () {
})]
});

let counters = {
const counters = {
write: 0,
read: 0
};
Expand Down Expand Up @@ -64,4 +64,46 @@ describe('File (stress)', function () {
});
}, 10000);
});

it('should handle a high volume of large writes', function (done) {
const logger = winston.createLogger({
transports: [new winston.transports.File({
filename: logPath
})]
});

const counters = {
write: 0,
read: 0
};

const interval = setInterval(function () {
const msg = {
counter: ++counters.write,
message: 'a'.repeat(16384 - os.EOL.length - 1)
};
logger.info(msg);
}, 0);

setTimeout(function () {
clearInterval(interval);

helpers.tryRead(logPath)
.on('error', function (err) {
assume(err).false();
done();
})
.pipe(split())
.on('data', function (d) {
const json = JSON.parse(d);
assume(json.level).equal('info');
assume(json.message).equal('a'.repeat(16384 - os.EOL.length - 1));
assume(json.counter).equal(++counters.read);
})
.on('end', function () {
assume(counters.write).equal(counters.read);
done();
});
}, 10000);
});
});