Skip to content

Commit

Permalink
notifications | pr notes, merge and small fixes
Browse files Browse the repository at this point in the history
1. Introduce OP_TO_EVENT, remove req.s3event

Signed-off-by: Amit Prinz Setter <[email protected]>
  • Loading branch information
alphaprinz committed Oct 16, 2024
1 parent 3e9f52a commit af9d0f6
Show file tree
Hide file tree
Showing 10 changed files with 58 additions and 94 deletions.
2 changes: 1 addition & 1 deletion src/cmd/manage_nsfs.js
Original file line number Diff line number Diff line change
Expand Up @@ -719,7 +719,7 @@ async function logging_management() {
}

async function notification_management() {
new notifications_util.Notificator(config_fs.fs_context).process_notification_files();
new notifications_util.Notificator({fs_context: config_fs.fs_context}).process_notification_files();
}

exports.main = main;
Expand Down
58 changes: 1 addition & 57 deletions src/cmd/nsfs.js
Original file line number Diff line number Diff line change
Expand Up @@ -240,63 +240,7 @@ class NsfsAccountSDK extends AccountSDK {
}
}

async function init_nc_system(config_root) {
const config_fs = new ConfigFS(config_root);
const system_data = await config_fs.get_system_config_file({silent_if_missing: true});
const hostname = os.hostname();

// If the system data already exists, we should not create it again
const updated_system_json = system_data || {};
if (updated_system_json[hostname]?.current_version && updated_system_json.config_directory) return;
if (!updated_system_json[hostname]?.current_version) {
updated_system_json[hostname] = {
current_version: pkg.version,
upgrade_history: { successful_upgrades: [], last_failure: undefined }
};
}
// If it's the first time a config_directory data is added to system.json
if (!updated_system_json.config_directory) {
updated_system_json.config_directory = {
config_dir_version: config_fs.config_dir_version,
upgrade_package_version: pkg.version,
phase: 'CONFIG_DIR_UNLOCKED',
upgrade_history: { successful_upgrades: [], last_failure: undefined }
};
}
try {
if (system_data) {
await config_fs.update_system_config_file(JSON.stringify(updated_system_json));
console.log('updated NC system data with version: ', pkg.version);
} else {
await config_fs.create_system_config_file(JSON.stringify(updated_system_json));
console.log('created NC system data with version: ', pkg.version);
}
=======
if (data?.[hostname]?.current_version) return data;

try {
await system_data.update({
...data,
[hostname]: {
current_version: pkg.version,
upgrade_history: {
successful_upgrades: [],
last_failure: undefined
}
}
});
console.log('created NSFS system data with version: ', pkg.version);
return data;
>>>>>>> 06c911d8c (notifications | align with official content (also change persistent namespace for easier parsing))
} catch (err) {
const msg = 'failed to create/update NC system data due to - ' + err.message;
const error = new Error(msg);
console.error(msg, err);
throw error;
}
}

>>>>>>> 78d41d050 (notifications | align with official content (also change persistent namespace for easier parsing))
/* eslint-disable max-statements */
async function main(argv = minimist(process.argv.slice(2))) {
try {
config.DB_TYPE = 'none';
Expand Down
3 changes: 1 addition & 2 deletions src/endpoint/s3/ops/s3_delete_object.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ const config = require('../../../../config');
*/
async function delete_object(req, res) {
const version_id = s3_utils.parse_version_id(req.query.versionId);
req.s3event = 'ObjectRemoved';
const del_res = await req.object_sdk.delete_object({
bucket: req.params.bucket,
key: req.params.key,
Expand All @@ -27,7 +26,7 @@ async function delete_object(req, res) {
} else if (del_res.created_delete_marker) {
res.setHeader('x-amz-version-id', del_res.created_version_id);
res.setHeader('x-amz-delete-marker', 'true');
req.s3event_op = 'DeleteMarkerCreated';
req.s3_event_method = 'DeleteMarkerCreated';
}
res.seq = del_res.seq;
}
Expand Down
1 change: 0 additions & 1 deletion src/endpoint/s3/ops/s3_delete_object_tagging.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ const s3_utils = require('../s3_utils');
* https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectDELETEtagging.html
*/
async function delete_object_tagging(req, res) {
req.s3event = 'ObjectTagging';
const reply = await req.object_sdk.delete_object_tagging({
bucket: req.params.bucket,
key: req.params.key,
Expand Down
5 changes: 2 additions & 3 deletions src/endpoint/s3/ops/s3_post_object_restore.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,14 @@ async function post_object_restore(req, res) {
days,
encryption,
};
req.s3event = 'ObjectRestore';

const restore_object_result = await req.object_sdk.restore_object(params);
if (restore_object_result.accepted) {
res.statusCode = 202;
//no need to set s3event_op, it is 'Post' by default because req.method == 'Post'
//no need to set s3_event_method, it is 'Post' by default because req.method == 'Post'
} else {
res.statusCode = 200;
req.s3event_op = 'Completed';
req.s3_event_method = 'Completed';
res.restore_object_result = restore_object_result;
}
}
Expand Down
2 changes: 0 additions & 2 deletions src/endpoint/s3/ops/s3_post_object_uploadId.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ async function post_object_uploadId(req, res) {
}

http_utils.set_keep_alive_whitespace_interval(res);
req.s3event = "ObjectCreated";
req.s3event_op = "CompleteMultipartUpload";

const reply = await req.object_sdk.complete_object_upload({
obj_id: req.query.uploadId,
Expand Down
5 changes: 2 additions & 3 deletions src/endpoint/s3/ops/s3_put_object.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,8 @@ async function put_object(req, res) {

dbg.log0('PUT OBJECT', req.params.bucket, req.params.key,
req.headers['x-amz-copy-source'] || '', encryption || '');
req.s3event = "ObjectCreated";
//for copy, use correct s3event_op. otherwise, just use default (req.method)
req.s3event_op = copy_source ? 'Copy' : undefined;
//for copy, use correct s3_event_method. otherwise, just use default (req.method)
req.s3_event_method = copy_source ? 'Copy' : undefined;

const source_stream = req.chunked_content ? s3_utils.decode_chunked_upload(req) : req;
const reply = await req.object_sdk.upload_object({
Expand Down
1 change: 0 additions & 1 deletion src/endpoint/s3/ops/s3_put_object_tagging.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ const s3_utils = require('../s3_utils');
async function put_object_tagging(req, res) {
const tag_set = s3_utils.parse_body_tagging_xml(req);
const version_id = s3_utils.parse_version_id(req.query.versionId);
req.s3event = 'ObjectTagging';
const reply = await req.object_sdk.put_object_tagging({
bucket: req.params.bucket,
key: req.params.key,
Expand Down
25 changes: 2 additions & 23 deletions src/endpoint/s3/s3_bucket_logging.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,7 @@ const http_utils = require('../../util/http_utils');
const dgram = require('node:dgram');
const { Buffer } = require('node:buffer');
const config = require('../../../config');
const {compose_notification} = require('../../util/notifications_util');

function check_notif_relevant(notif, req) {
//if no events were specified, always notify
if (!notif.events) return true;

//check request's event is in notification's events list
for (const notif_event of notif.events) {
const notif_event_elems = notif_event.split(':');
const event_name = notif_event_elems[1];
const event_method = notif_event_elems[2];
const req_s3_event_op = req.s3_event_op || req.method;
if (!req.s3event) return false;
if (event_name.toLowerCase() !== req.s3event.toLowerCase()) return false;
if (event_method === '*') return true;
if (!req_s3_event_op) return false;
if (event_method.toLowerCase() === req_s3_event_op.toLowerCase()) return true;
}

//request does not match any of the requested events
return false;
}
const {compose_notification, check_notif_relevant} = require('../../util/notifications_util');

async function send_bucket_op_logs(req, res) {
if (req.params && req.params.bucket &&
Expand Down Expand Up @@ -73,7 +52,7 @@ async function send_bucket_op_logs(req, res) {
const promises = [];
for (const write of writes_aggregate) {
promises.push(new Promise((resolve, reject) => {
write.file.append(write.buffer).then(resolve());
write.file.append(write.buffer).then(resolve);
}));
}
await Promise.all(promises);
Expand Down
50 changes: 49 additions & 1 deletion src/util/notifications_util.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,17 @@ const { get_process_fs_context } = require('./native_fs_utils');
const nb_native = require('../util/nb_native');
const http_utils = require('../util/http_utils');

const OP_TO_EVENT = Object.freeze({
put_object: { name: 'ObjectCreated' },
post_object: { name: 'ObjectCreated' },
post_object_uploadId: { name: 'ObjectCreated', method: 'CompleteMultipartUpload' },
delete_object: { name: 'ObjectRemoved' },
post_object_restore: { name: 'ObjectRestore' },
put_object_acl: { name: 'ObjectAcl' },
put_object_tagging: { name: 'ObjectTagging' },
delete_object_tagging: { name: 'ObjectTagging' },
});

class Notificator {

/**
Expand Down Expand Up @@ -305,11 +316,14 @@ function compose_notification(req, res, bucket, notif_conf) {
eTag = eTag.substring(2, eTag.length - 2);
}

const event = OP_TO_EVENT[req.op_name];
const http_verb_capitalized = req.method.charAt(0).toUpperCase() + req.method.slice(1).toLowerCase();

const notif = {
eventVersion: '2.3',
eventSource: _get_system_name(req) + ':s3',
eventTime: new Date().toISOString,
eventName: req.s3event + ':' + (req.s3_event_op || req.method),
eventName: event.name + ':' + (event.method || req.s3_event_method || http_verb_capitalized),
userIdentity: {
principalId: req.object_sdk.requesting_account.name,
},
Expand Down Expand Up @@ -374,6 +388,40 @@ function _get_system_name(req) {
}
}

function check_notif_relevant(notif, req) {
const op_event = OP_TO_EVENT[req.op_name];
if (!op_event) {
//s3 op is not relevant for notifications
return false;
}

//if no events were specified, always notify
if (!notif.Events) return true;

//check request's event is in notification's events list
for (const notif_event of notif.Events) {
const notif_event_elems = notif_event.split(':');
const notif_event_name = notif_event_elems[1];
const notif_event_method = notif_event_elems[2];
if (notif_event_name.toLowerCase() !== op_event.name.toLowerCase()) return false;
//is there filter by method?
if (notif_event_method === '*') {
//no filtering on method. we've passed the filter and need to send a notification
return true;
}
//take request method by this order
//1 op_event.method - in case method can be inferred from req.op_name, eg s3_post_object_uploadId
//2 op explicitly set req.s3_event_method, eg DeleteMarkerCreated
//3 default to req.method (aka "http verb") eg get/post/delete
const op_method = op_event.method || req.s3_event_method || req.method;
if (notif_event_method.toLowerCase() === op_method.toLowerCase()) return true;
}

//request does not match any of the requested events
return false;
}

exports.Notificator = Notificator;
exports.test_notifications = test_notifications;
exports.compose_notification = compose_notification;
exports.check_notif_relevant = check_notif_relevant;

0 comments on commit af9d0f6

Please sign in to comment.