Skip to content
This repository has been archived by the owner on Oct 30, 2018. It is now read-only.

Implement SIP9 #521

Merged
merged 100 commits into from
May 3, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
100 commits
Select commit Hold shift + click to select a range
4b42718
Sketch out implementation
Oct 19, 2017
0d5fb32
Add auth middleware for exchange reports
Oct 20, 2017
372c11e
Sketch implementation for reports handling
Oct 20, 2017
b0a8d9f
Update reputation
Oct 20, 2017
9f015ee
Determine report type
Oct 20, 2017
79b832b
Validate report
Oct 20, 2017
7932d56
Create storage events on download
Oct 23, 2017
35536f5
Remove creation of storage event when creating bucket entry
Oct 23, 2017
499562d
Record storage events for uploads
Oct 23, 2017
8c73815
Switch to use frames
Oct 23, 2017
424ee69
Create storage event on mirroring
Oct 23, 2017
b0d8d99
Record storage event as ended when offline
Oct 23, 2017
34ebeeb
Create storage event when replicating when farmer goes offline
Oct 23, 2017
9ef8299
Update storeEnd when bucket is deleted
Oct 24, 2017
ef1c30a
Update storage events with storeEnd when file deleted
Oct 24, 2017
c166c14
Mirrors do not have a user
Oct 24, 2017
f2e8725
Mirrors don't have a user
Oct 24, 2017
3b5d5c2
Update changes to model
Oct 24, 2017
73f434c
Update monitor storage event creation
Oct 24, 2017
d0e7301
Update model
Oct 24, 2017
3dc1e5b
Update report from userReport to clientReport
Oct 24, 2017
9e66b06
Include size in mirror storage event
Oct 25, 2017
041ec5a
Fix syntax errors
Oct 27, 2017
2d7dcf4
Testing for monitor
Oct 27, 2017
d3b3a1d
Fix tests for monitor
Oct 27, 2017
665f727
Test storage event creation in monitor
Oct 27, 2017
8d1b27d
Bucket tests
Oct 27, 2017
a93f4a6
More bucket tests
Oct 27, 2017
0fd205d
Remove outdated test
Oct 27, 2017
e95370e
Remove tests that are not needed
Oct 27, 2017
f315fde
Fix monitor test
Oct 27, 2017
9001588
Test download storage events
Oct 28, 2017
73c6651
Check storage events created on download
Oct 28, 2017
b45c0e5
Test bucket removeFile
Oct 28, 2017
03b0043
Fix side-effect in buckets test
Oct 28, 2017
0b18faf
Test upload storage event
Oct 28, 2017
e4c54d6
Test exchange report validation
Oct 28, 2017
409e15b
Test reports auth middleware
Oct 28, 2017
cd4629a
Test contact reputation update
Oct 28, 2017
5c5c7ca
Test exchange reports
Oct 28, 2017
85cabd4
Update trigger mirror establish tests
Oct 28, 2017
d4e6f3e
Linting fixes
Oct 28, 2017
74a4666
Fix frames storage event test
Oct 28, 2017
ea65d7a
Check empty object, and update test
Oct 28, 2017
8454af5
Fix linting
Oct 28, 2017
85ac641
Add valid report message
Nov 9, 2017
49059cd
Fix storage event for initial mirroring
Nov 9, 2017
62d4a13
Fix client id
Nov 9, 2017
519160b
Fix shard hashes query for bucket entry
Nov 9, 2017
01aab7b
Add multi
Nov 9, 2017
1f52d38
Add multi
Nov 9, 2017
17f97e2
Fix shard hashes query by bucket
Nov 10, 2017
a249c65
Fix types
Nov 10, 2017
93579e8
Fix log
Nov 10, 2017
b9dc0a6
Fix creation of storage events in monitor
Nov 10, 2017
5846ffc
Fix monitor tests
Nov 10, 2017
cbaa26b
Fix tests
Nov 10, 2017
962c1b1
Add cron for updating storage events
Nov 13, 2017
0431dd5
Start implementation of cron for storage events
Nov 13, 2017
27feeea
Add cron lock
Nov 13, 2017
8ae6972
Resolve storage events in cron
Nov 15, 2017
98d2d65
Keep track of work completed
Nov 15, 2017
1be467c
Add finality time period
Nov 15, 2017
d01a69f
Add unknown reports threshold
Nov 16, 2017
288bb3a
Fix redefinition of last timestamp
Nov 16, 2017
73272d1
Fix reference
Nov 16, 2017
9a8539c
Add note about reputation points in cron
Nov 16, 2017
312ba59
Linting
Nov 16, 2017
3203e54
Create spec of functionality
Nov 16, 2017
24c602b
Test resolving codes
Nov 16, 2017
a4b0f4e
Finish unit tests on storage events finality cron
Nov 17, 2017
dad1349
Add backwards compat ability for exchange reports
Nov 20, 2017
c174ce0
Add to bin
Nov 20, 2017
ece9f74
Filter out null user, and use Date
Nov 20, 2017
3426c28
Add missing argument
Nov 20, 2017
0500885
Add small overlap
Dec 5, 2017
e0606c7
Include bytes
Dec 6, 2017
57a9dfa
Update tests
Dec 6, 2017
275a8d5
Adjust default offline threshold
Dec 8, 2017
c2124ce
Fix tests
Dec 8, 2017
54d0fb4
Fix reference
Dec 14, 2017
889037c
Fix timeout in test
Apr 19, 2018
0425a20
Fix cron scheduling pattern
kaloyan-raev Apr 24, 2018
ba0f180
Merge pull request #2 from kaloyan-raev/patch-1
braydonf Apr 24, 2018
d08818b
Fix for deleting empty bucket
kaloyan-raev Apr 25, 2018
6860925
Merge pull request #3 from kaloyan-raev/empty-bucket
braydonf Apr 25, 2018
a3d8f42
fix renter reports comparison
dylanlott Apr 26, 2018
96814f5
fix renter reports comparison
dylanlott Apr 26, 2018
2b6049d
Merge branch 'sip9' of github.com:dylanlott/bridge into sip9
dylanlott Apr 26, 2018
807703b
compare against `_id` instead of `id` to match middleware
dylanlott May 1, 2018
01e8040
Merge pull request #4 from dylanlott/sip9
braydonf May 1, 2018
65f8e8b
Fix tests
May 1, 2018
68e1472
Make sure that storage events are only processed once to avoid
May 2, 2018
5bc99a1
Update reputation in cron service for processing storage events
May 2, 2018
1fb9f18
Fix issue with initial mirror generation
May 2, 2018
df786a2
Also resolve client success
May 2, 2018
9e2a686
Check that event is not already processed
May 3, 2018
da42831
Update to released version of dependencies for SIP9
May 3, 2018
a7e33ab
Fix tests, added missing stubs
May 3, 2018
f7c9f41
Fix state issues with tests
May 3, 2018
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions bin/storj-cron.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#!/usr/bin/env node

'use strict';

const async = require('async');
const program = require('commander');
const Config = require('../lib/config');
const StorageEventsCron = require('../lib/cron/storage-events');

program.version(require('../package').version);
program.option('-c, --config <path_to_config_file>', 'path to the config file');
program.option('-d, --datadir <path_to_datadir>', 'path to the data directory');
program.parse(process.argv);

var config = new Config(process.env.NODE_ENV || 'develop', program.config, program.datadir);

var jobs = [
new StorageEventsCron(config)
];

async.eachSeries(jobs, function(job, next) {
job.start(next);
}, function(err) {
if (err) {
throw err;
}
});
1 change: 1 addition & 0 deletions lib/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ const DEFAULTS = {
from: '[email protected]'
},
application: {
unknownReportThreshold: 0.3, // one third unknown reports
delayedActivation: true, // send delayed user activation email
activateSIP6: false,
powOpts: {
Expand Down
248 changes: 248 additions & 0 deletions lib/cron/storage-events.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,248 @@
'use strict';

const assert = require('assert');
const CronJob = require('cron').CronJob;
const Config = require('../config');
const Storage = require('storj-service-storage-models');
const points = Storage.constants.POINTS;
const log = require('../logger');

function StorageEventsCron(config) {
if (!(this instanceof StorageEventsCron)) {
return new StorageEventsCron(config);
}

assert(config instanceof Config, 'Invalid config supplied');

this._config = config;
}

StorageEventsCron.CRON_TIME = '00 */10 * * * *'; // every ten minutes
StorageEventsCron.MAX_RUN_TIME = 600000; // 10 minutes
StorageEventsCron.FINALITY_TIME = 10800000; // 3 hours

StorageEventsCron.prototype.start = function(callback) {
log.info('starting the storage events cron');

this.storage = new Storage(
this._config.storage.mongoUrl,
this._config.storage.mongoOpts,
{ logger: log }
);

this.job = new CronJob({
cronTime: StorageEventsCron.CRON_TIME,
onTick: this.run.bind(this),
start: false,
timeZone: 'UTC'
});

this.job.start();

callback();
};

StorageEventsCron.prototype._resolveCodes = function(event, user) {
const threshold = this._config.application.unknownReportThreshold;

let success = event.success;
let unknown = success ? false : true;
if (unknown) {
resolveCodes();
}

function resolveCodes() {
const failureCode = 1100;
const successCode = 1000;

const clientCode = event.clientReport ?
event.clientReport.exchangeResultCode : undefined;
const farmerCode = event.farmerReport ?
event.farmerReport.exchangeResultCode : undefined;

/* jshint eqeqeq: false */
if (clientCode == successCode) {
success = true;
unknown = false;
} else if (farmerCode == failureCode && !clientCode) {
success = false;
unknown = false;
} else if (farmerCode == failureCode && clientCode == failureCode) {
success = false;
unknown = false;
} else if (!farmerCode && clientCode == failureCode) {
success = false;
unknown = false;
} else if (user.exceedsUnknownReportsThreshold(threshold)) {
success = true;
unknown = true;
}
}

return {
success: success,
unknown: unknown
};

};

StorageEventsCron.prototype._updateReputation = function(nodeID, points) {
this.storage.models.Contact.findOne({_id: nodeID}, (err, contact) => {
if (err || !contact) {
log.warn('updateReputation: Error trying to find contact ' +
' %s, reason: %s', nodeID, err ? err.message : 'unknown');
return;
}
contact.recordPoints(points).save((err) => {
if (err) {
log.warn('updateReputation: Error saving contact ' +
' %s, reason: %s', nodeID, err.message);
}
});
});
};

StorageEventsCron.prototype._resolveEvent = function(event, callback) {

this.storage.models.User.findOne({_id: event.user}, (err, user) => {
if (err) {
return callback(err);
}

const {success, unknown} = this._resolveCodes(event, user);

// Mark the event as processed as to avoid counting this storage
// event multiple times in reporting rates
event.processed = true;

// Set the event success status based on how it's resolved
event.success = success;

// Award points to the farmer for the success or failure
if (success) {
this._updateReputation(event.farmer, points.TRANSFER_SUCCESS);
} else {
this._updateReputation(event.farmer, points.TRANSFER_FAILURE);
}

event.save((err) => {
if (err) {
return callback(err);
}
finalize();
});


function finalize() {
const bytes = event.storage || event.downloadBandwidth;
user.updateUnknownReports(unknown, event.timestamp, bytes, (err) => {
if (err) {
return callback(err);
}

callback(null, event.timestamp);
});
}
});
};

StorageEventsCron.prototype._run = function(lastTimestamp, callback) {

const StorageEvent = this.storage.models.StorageEvent;
const finalityTime = new Date(Date.now() - StorageEventsCron.FINALITY_TIME);

// There is a slight overlap between work to make sure that the edge
// case where events have the same timestamp are not left behind in the
// processing. It's important that all updates are safetly repeatable.
// We also make sure to only query storage events that have not already
// been processed to make sure that they are not processed multiple
// times that would give incorrect reporting rates.
const cursor = StorageEvent.find({
timestamp: {
$lt: finalityTime,
$gte: lastTimestamp
},
processed: { $eq: false },
user: {
$exists: true,
$ne: null
}
}).sort({timestamp: 1}).cursor();

const timeout = setTimeout(() => {
finish(new Error('Job exceeded max duration'));
}, StorageEventsCron.MAX_RUN_TIME);

let callbackCalled = false;

function finish(err) {
clearTimeout(timeout);
cursor.close();
if (!callbackCalled) {
callbackCalled = true;
callback(err, lastTimestamp);
}
}

cursor.on('error', finish);

cursor.on('data', (event) => {
cursor.pause();
this._resolveEvent(event, (err, _lastTimestamp) => {
if (err) {
return finish(err);
}
lastTimestamp = _lastTimestamp;
cursor.resume();
});
});

cursor.on('end', finish);
};

StorageEventsCron.prototype.run = function() {
const name = 'StorageEventsFinalityCron';
const Cron = this.storage.models.CronJob;
Cron.lock(name, StorageEventsCron.MAX_RUN_TIME, (err, locked, res) => {
if (err) {
return log.error('%s lock failed, reason: %s', name, err.message);
}
if (!locked) {
return log.warn('%s already running', name);
}

log.info('Starting %s cron job', name);

let lastTimestamp = new Date(0);
if (res &&
res.value &&
res.value.rawData &&
res.value.rawData.lastTimestamp) {
lastTimestamp = new Date(res.value.rawData.lastTimestamp);
} else {
log.warn('%s cron has unknown lastTimestamp', name);
}

this._run(lastTimestamp, (err, _lastTimestamp) => {
if (err) {
let message = err.message ? err.message : 'unknown';
log.error('Error running %s, reason: %s', name, message);
}

log.info('Stopping %s cron', name);
const rawData = {};
if (_lastTimestamp) {
rawData.lastTimestamp = _lastTimestamp.getTime();
}

Cron.unlock(name, rawData, (err) => {
if (err) {
return log.error('%s unlock failed, reason: %s', name, err.message);
}
});

});
});
};

module.exports = StorageEventsCron;
2 changes: 1 addition & 1 deletion lib/monitor/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ const DEFAULTS = {
minInterval: '5m',
queryNumber: 100,
pingConcurrency: 10,
timeoutRateThreshold: 0.04 // ~1 hour of downtime in 24 hours
timeoutRateThreshold: 0.1585 // ~11.4 hours of downtime in 72 hours
}
};

Expand Down
Loading