Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Turn ContactImporter into ES6 class #542

Merged
merged 3 commits into from
Oct 28, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
"test:helpers": "babel-node ./node_modules/istanbul/lib/cli cover ./node_modules/mocha/bin/_mocha \"packages/helpers/**/*.spec.js\"",
"test:client": "babel-node ./node_modules/istanbul/lib/cli cover ./node_modules/mocha/bin/_mocha \"packages/client/**/*.spec.js\"",
"test:mail": "babel-node ./node_modules/istanbul/lib/cli cover ./node_modules/mocha/bin/_mocha \"packages/mail/**/*.spec.js\"",
"test:contact": "babel-node ./node_modules/istanbul/lib/cli cover ./node_modules/mocha/bin/_mocha \"packages/contact-importer/**/*.spec.js\"",
"test:typescript": "tsc",
"test": "npm run test:all -s",
"coverage": "open -a \"Google Chrome\" ./coverage/lcov-report/index.html"
Expand Down
3 changes: 2 additions & 1 deletion packages/contact-importer/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@
"async.queue": "^0.5.2",
"bottleneck": "^1.12.0",
"debug": "^2.2.0",
"lodash.chunk": "^4.2.0"
"lodash.chunk": "^4.2.0",
"sendgrid": "^5.2.3"
},
"tags": [
"sendgrid"
Expand Down
271 changes: 141 additions & 130 deletions packages/contact-importer/src/importer.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,145 +2,156 @@
'use strict';

const Bottleneck = require('bottleneck');
const EventEmitter = require('events').EventEmitter;
const { EventEmitter } = require('events');
const chunk = require('lodash.chunk');
const debug = require('debug')('sendgrid');
const util = require('util');
const queue = require('async.queue');
const ensureAsync = require('async.ensureasync');

const ContactImporter = module.exports = function(sg, options) {
options = options || {};
const self = this;
this.sg = sg;
this.pendingItems = [];

// Number of items to send per batch.
this.batchSize = options.batchSize || 1500;

// Max number of requests per rate limit period.
this.rateLimitLimit = options.rateLimitLimit || 3;

// Length of rate limit period (miliseconds).
this.rateLimitPeriod = options.rateLimitPeriod || 2000;

// Create a throttler that will process no more than `rateLimitLimit` requests every `rateLimitPeriod` ms.
this.throttle = new Bottleneck(0, 0);
this.throttle.changeReservoir(this.rateLimitLimit);

// Create a queue that wil be used to send batches to the throttler.
this.queue = queue(ensureAsync(this._worker));

// When the last batch is removed from the queue, add any incomplete batches.
this.queue.empty = function() {
if (self.pendingItems.length) {
debug('adding %s items from deferrd queue for processing', self.pendingItems.length);
const batch = self.pendingItems.splice(0);
self.queue.push({
data: batch,
owner: self,
}, function(error, result) {
if (error) {
return self._notify(error, JSON.parse(error.response.body), batch);
}
return self._notify(null, JSON.parse(result.body), batch);
class ContactImporter extends EventEmitter {
constructor(sg, options = {}) {
super();

this.sg = sg;
this.pendingItems = [];

// Number of items to send per batch.
this.batchSize = options.batchSize || 1500;

// Max number of requests per rate limit period.
this.rateLimitLimit = options.rateLimitLimit || 3;

// Length of rate limit period (miliseconds).
this.rateLimitPeriod = options.rateLimitPeriod || 2000;

// Create a throttler that will process no more than `rateLimitLimit` requests every `rateLimitPeriod` ms.
this.throttle = new Bottleneck(0, 0);
this.throttle.changeReservoir(this.rateLimitLimit);

this._setupQueue();
}

/**
* Add a new contact, or an array of contacts, to the end of the queue.
*
* @param {Array|Object} data A contact or array of contacts.
*/
push(data = []) {
data = Array.isArray(data) ? data : [data];

// Add the new items onto the pending items.
const itemsToProcess = this.pendingItems.concat(data);

// Chunk the pending items into batches and add onto the queue
const batches = chunk(itemsToProcess, this.batchSize);
debug('generated batches %s from %s items', batches.length, data.length);

batches.forEach((batch) => {
// If this batch is full or the queue is empty queue it for processing.
if (batch.length === this.batchSize || !this.queue.length()) {
this._pushToQueue(batch);
}
// Otherwise, it store it for later.
else {
debug('the last batch with only %s item is deferred (partial batch)', batch.length);
this.pendingItems = batch;
}
});

debug('batches in queue: %s', this.queue.length());
debug('items in deferred queue: %s', this.pendingItems.length);
}

/**
* Send a batch of contacts to Sendgrid.
*
* @param {Object} task Task to be processed (data in 'data' property)
* @param {Function} callback Callback function.
*/
_worker (task, callback) {
const context = task.owner;
debug('processing batch (%s items)', task.data.length);
context.throttle.submit(context._sendBatch, context, task.data, callback);
}

_sendBatch (context, data, callback) {
debug('sending batch (%s items)', data.length);

const request = context.sg.emptyRequest();
request.method = 'POST';
request.path = '/v3/contactdb/recipients';
request.body = data;

context.sg.API(request)
.then((response) => {
debug('got response: %o', response);
setTimeout(() => {
context.throttle.incrementReservoir(1);
}, context.rateLimitPeriod);
return callback(null, response);
})
.catch((error) => {
debug('got error, %o', error);
setTimeout(() => {
context.throttle.incrementReservoir(1);
}, context.rateLimitPeriod);
return callback(error);
});
}

/**
* Emit the result of processing a batch.
*
* @param {Object} error
* @param {Object} result
*/
_notify (error, result, batch) {
if (error) {
return this.emit('error', error, batch);
}
return this.emit('success', result, batch);
};

// Emit an event when the queue is drained.
this.queue.drain = function() {
self.emit('drain');
};
};
util.inherits(ContactImporter, EventEmitter);

/**
* Add a new contact, or an array of contact, to the end of the queue.
*
* @param {Array|Object} data A contact or array of contacts.
*/
ContactImporter.prototype.push = function(data) {
const self = this;
data = Array.isArray(data) ? data : [data];

// Add the new items onto the pending items.
const itemsToProcess = this.pendingItems.concat(data);

// Chunk the pending items into batches and add onto the queue
const batches = chunk(itemsToProcess, this.batchSize);
debug('generated batches %s from %s items', batches.length, data.length);

batches.forEach(function(batch) {
// If this batch is full or the queue is empty queue it for processing.
if (batch.length === self.batchSize || !self.queue.length()) {
self.queue.push({
data: batch,
owner: self,
}, function(error, result) {
if (error) {
return self._notify(error, JSON.parse(error.response.body), batch);
}
return self._notify(null, JSON.parse(result.body), batch);
});
}
// Otherwise, it store it for later.
else {
debug('the last batch with only %s item is deferred (partial batch)', batch.length);
self.pendingItems = batch;
}
});

debug('batches in queue: %s', this.queue.length());
debug('items in deferred queue: %s', this.pendingItems.length);
};

/**
* Send a batch of contacts to Sendgrid.
*
* @param {Object} task Task to be processed (data in 'data' property)
* @param {Function} callback Callback function.
*/
ContactImporter.prototype._worker = function(task, callback) {
const context = task.owner;
debug('processing batch (%s items)', task.data.length);
context.throttle.submit(context._sendBatch, context, task.data, callback);
};

ContactImporter.prototype._sendBatch = function(context, data, callback) {
debug('sending batch (%s items)', data.length);

const request = context.sg.emptyRequest();
request.method = 'POST';
request.path = '/v3/contactdb/recipients';
request.body = data;

context.sg.API(request)
.then(function(response) {
debug('got response: %o', response);
setTimeout(function() {
context.throttle.incrementReservoir(1);
}, context.rateLimitPeriod);
return callback(null, response);
})
.catch(function(error) {
debug('got error, %o', error);
setTimeout(function() {
context.throttle.incrementReservoir(1);
}, context.rateLimitPeriod);
return callback(error);
/**
* Sets up the queue object on this instance of ContactImporter
*/
_setupQueue () {
// Create a queue that wil be used to send batches to the throttler.
this.queue = queue(ensureAsync(this._worker));

// When the last batch is removed from the queue, add any incomplete batches.
this.queue.empty = () => {
if (!this.pendingItems.length) return;

debug('adding %s items from deferrd queue for processing', this.pendingItems.length);

const batch = this.pendingItems.splice(0);
this._pushToQueue(batch);
};

// Emit an event when the queue is drained.
this.queue.drain = () => {
this.emit('drain');
};
}

/**
* Takes a batch and pushes it to the queue, handling the result as well.
*
* @param {Array} batch A batch to send to the queue.
*/
_pushToQueue (batch) {
this.queue.push({
data: batch,
owner: this,
}, (error, result) => {
if (error) {
return this._notify(error, error.response.body, batch);
}
return this._notify(null, result.body, batch);
});
};

/**
* Emit the result of processing a batch.
*
* @param {Object} error
* @param {Object} result
*/
ContactImporter.prototype._notify = function(error, result, batch) {
if (error) {
return this.emit('error', error, batch);
}
return this.emit('success', result, batch);
};
}

module.exports = ContactImporter;
5 changes: 3 additions & 2 deletions packages/contact-importer/src/importer.spec.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
const sendgrid = require('sendgrid');
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test file uses sendgrid, but it wasn't in the file. This was required to get it to run and pass.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would also add this script shortcut in main package json:

{
  // ...
  scripts: {
    // ... 
    "test:contact": "babel-node ./node_modules/istanbul/lib/cli cover ./node_modules/mocha/bin/_mocha \"packages/contact-importer/**/*.spec.js\"",
  }
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call, I'll get that added shortly.

const ContactImporter = require('./importer');

describe.only('test_contact_importer', function() {
Expand Down Expand Up @@ -38,8 +39,8 @@ describe.only('test_contact_importer', function() {
console.log('SUCCESS batch', batch);
});
this.contactImporter.on('error', function(error, batch) {
console.log('SUCCESS error', error);
console.log('SUCCESS batch', batch);
console.log('ERROR error', error);
console.log('ERROR batch', batch);
});
this.contactImporter.on('drain', function() {
expect(self.contactImporter._sendBatch).to.have.callCount(3);
Expand Down