Skip to content

Commit

Permalink
[fix] File transport under stress hangs logger (winstonjs#1318)
Browse files Browse the repository at this point in the history
* [fix] Add drain handler back into file transport

This looks like it was removed (by accident?) in
40c1ce4

* [fix] Remove extra drain event

* Prevent this._drains from going negative

* Refactor drain mechanism to fix file transport logging under stress

* Removed early return when write returns false

* Greatly simplify drain mechanism based on PR feedback and testing

* Revert "Greatly simplify drain mechanism based on PR feedback and testing"

This reverts commit 7ff756b.

* Changes and cleanup based on PR feebback

* Add test case that should cover the original reason for this change

This test should fail on current winston 'master' and succeed on this PR

* Test the content of the message as well just in case

* Add synchronous version of the file transport large message stress test
  • Loading branch information
mempf authored and indexzero committed May 22, 2018
1 parent 53a7f8b commit 565595c
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 45 deletions.
58 changes: 21 additions & 37 deletions lib/winston/transports/file.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,6 @@ const debug = require('diagnostics')('winston:file');
const os = require('os');
const tailFile = require('../tail-file');

/**
* Simple no-op function.
* @returns {undefined}
*/
function noop() {}

/**
* Transport for outputting to a local log file.
* @type {File}
Expand Down Expand Up @@ -55,7 +49,6 @@ module.exports = class File extends TransportStream {
this._stream = new PassThrough();

// Bind this context for listener methods.
this._onDrain = this._onDrain.bind(this);
this._onError = this._onError.bind(this);

if (options.filename || options.dirname) {
Expand Down Expand Up @@ -89,8 +82,7 @@ module.exports = class File extends TransportStream {
this._size = 0;
this._pendingSize = 0;
this._created = 0;
this._drains = 0;
this._next = noop;
this._drain = false;
this._opening = false;

this.open();
Expand All @@ -102,7 +94,7 @@ module.exports = class File extends TransportStream {
* @param {Function} callback - TODO: add param description.
* @returns {undefined}
*/
log(info, callback = noop) {
log(info, callback = () => {}) {
// Remark: (jcrugzz) What is necessary about this callback(null, true) now
// when thinking about 3.x? Should silent be handled in the base
// TransportStream _write method?
Expand All @@ -111,6 +103,15 @@ module.exports = class File extends TransportStream {
return true;
}

// Output stream buffer is full and has asked us to wait for the drain event
if (this._drain) {
this._stream.once('drain', () => {
this._drain = false;
this.log(info, callback);
});
return;
}

// Grab the raw string and append the expected EOL.
const output = `${info[MESSAGE]}${this.eol}`;
const bytes = Buffer.byteLength(output);
Expand Down Expand Up @@ -143,11 +144,6 @@ module.exports = class File extends TransportStream {
this._endStream(() => this._rotateFile());
}

const written = this._stream.write(output, logged.bind(this));
if (written === false) {
++this._drains;
}

// Keep track of the pending bytes being written while files are opening
// in order to properly rotate the PassThrough this._stream when the file
// eventually does open.
Expand All @@ -158,14 +154,19 @@ module.exports = class File extends TransportStream {
this.rotatedWhileOpening = true;
}

debug('written', written, this._drains);
if (!this._drains) {
callback(); // eslint-disable-line callback-return
const written = this._stream.write(output, logged.bind(this));
if (!written) {
this._drain = true;
this._stream.once('drain', () => {
this._drain = false;
callback();
});
} else {
this._next = callback;
this._dest.emit('drain');
callback(); // eslint-disable-line callback-return
}

debug('written', written, this._drain);

return written;
}

Expand Down Expand Up @@ -411,21 +412,6 @@ module.exports = class File extends TransportStream {
return this.maxsize && size >= this.maxsize;
}

/**
* TODO: add method description.
* @returns {undefined}
*/
_onDrain() {
if (--this._drains) {
return;
}

const next = this._next;
this._next = noop;

next();
}

/**
* TODO: add method description.
* @param {Error} err - TODO: add param description.
Expand All @@ -442,7 +428,6 @@ module.exports = class File extends TransportStream {
*/
_setupStream(stream) {
stream.on('error', this._onError);
stream.on('drain', this._onDrain);

return stream;
}
Expand All @@ -454,7 +439,6 @@ module.exports = class File extends TransportStream {
*/
_cleanupStream(stream) {
stream.removeListener('error', this._onError);
stream.removeListener('drain', this._onDrain);

return stream;
}
Expand Down
96 changes: 88 additions & 8 deletions test/transports/00-file-stress.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@
*
*/

var fs = require('fs'),
os = require('os'),
path = require('path'),
assume = require('assume'),
helpers = require('../helpers'),
split = require('split2'),
winston = require('../../lib/winston');
const fs = require('fs');
const os = require('os');
const path = require('path');
const assume = require('assume');
const helpers = require('../helpers');
const split = require('split2');
const winston = require('../../lib/winston');

describe('File (stress)', function () {
this.timeout(30 * 1000);
Expand All @@ -35,7 +35,7 @@ describe('File (stress)', function () {
})]
});

let counters = {
const counters = {
write: 0,
read: 0
};
Expand Down Expand Up @@ -64,4 +64,84 @@ describe('File (stress)', function () {
});
}, 10000);
});

it('should handle a high volume of large writes', function (done) {
const logger = winston.createLogger({
transports: [new winston.transports.File({
filename: logPath
})]
});

const counters = {
write: 0,
read: 0
};

const interval = setInterval(function () {
const msg = {
counter: ++counters.write,
message: 'a'.repeat(16384 - os.EOL.length - 1)
};
logger.info(msg);
}, 0);

setTimeout(function () {
clearInterval(interval);

helpers.tryRead(logPath)
.on('error', function (err) {
assume(err).false();
done();
})
.pipe(split())
.on('data', function (d) {
const json = JSON.parse(d);
assume(json.level).equal('info');
assume(json.message).equal('a'.repeat(16384 - os.EOL.length - 1));
assume(json.counter).equal(++counters.read);
})
.on('end', function () {
assume(counters.write).equal(counters.read);
done();
});
}, 10000);
});

it('should handle a high volume of large writes synchronous', function (done) {
const logger = winston.createLogger({
transports: [new winston.transports.File({
filename: logPath
})]
});

const counters = {
write: 0,
read: 0
};

const msgs = new Array(10).fill().map(() => ({
counter: ++counters.write,
message: 'a'.repeat(16384 - os.EOL.length - 1)
}));
msgs.forEach(msg => logger.info(msg));

setTimeout(function () {
helpers.tryRead(logPath)
.on('error', function (err) {
assume(err).false();
done();
})
.pipe(split())
.on('data', function (d) {
const json = JSON.parse(d);
assume(json.level).equal('info');
assume(json.message).equal('a'.repeat(16384 - os.EOL.length - 1));
assume(json.counter).equal(++counters.read);
})
.on('end', function () {
assume(counters.write).equal(counters.read);
done();
});
}, 10000);
});
});

0 comments on commit 565595c

Please sign in to comment.