Skip to content

Commit

Permalink
notifications | align with official content (also change persistent n…
Browse files Browse the repository at this point in the history
…amespace for easier parsing)

Signed-off-by: Amit Prinz Setter <[email protected]>
  • Loading branch information
alphaprinz committed Oct 4, 2024
1 parent ec5e911 commit d09a342
Show file tree
Hide file tree
Showing 13 changed files with 182 additions and 61 deletions.
2 changes: 2 additions & 0 deletions src/api/object_api.js
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ module.exports = {
content_type: { type: 'string' },
content_encoding: { type: 'string' },
size: { type: 'integer' },
seq: { type: 'integer' },
}
},
auth: { system: ['admin', 'user'] }
Expand Down Expand Up @@ -653,6 +654,7 @@ module.exports = {
deleted_delete_marker: { type: 'boolean' },
created_version_id: { type: 'string' },
created_delete_marker: { type: 'boolean' },
seq: { type: 'integer' },
}
},
auth: { system: ['admin', 'user'] }
Expand Down
11 changes: 7 additions & 4 deletions src/cmd/nsfs.js
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ function print_usage() {
let nsfs_config_root;

class NsfsObjectSDK extends ObjectSDK {
constructor(fs_root, fs_config, account, versioning, config_root) {
constructor(fs_root, fs_config, account, versioning, config_root, nsfs_system) {
// const rpc_client_hooks = new_rpc_client_hooks();
// rpc_client_hooks.account.read_account_by_access_key = async ({ access_key }) => {
// if (access_key) {
Expand Down Expand Up @@ -153,6 +153,7 @@ class NsfsObjectSDK extends ObjectSDK {
this.nsfs_account = account;
this.nsfs_versioning = versioning;
this.nsfs_namespaces = {};
this.nsfs_system = nsfs_system;
if (!config_root) {
this._get_bucket_namespace = bucket_name => this._simple_get_single_bucket_namespace(bucket_name);
this.load_requesting_account = auth_req => this._simple_load_requesting_account(auth_req);
Expand Down Expand Up @@ -247,7 +248,7 @@ async function init_nsfs_system(config_root) {
const data = await system_data.read();
const hostname = os.hostname();
// If the system data already exists, we should not create it again
if (data?.[hostname]?.current_version) return;
if (data?.[hostname]?.current_version) return data;

try {
await system_data.update({
Expand All @@ -261,6 +262,7 @@ async function init_nsfs_system(config_root) {
}
});
console.log('created NSFS system data with version: ', pkg.version);
return data;
} catch (err) {
const msg = 'failed to create NSFS system data due to - ' + err.message;
const error = new Error(msg);
Expand Down Expand Up @@ -348,7 +350,8 @@ async function main(argv = minimist(process.argv.slice(2))) {
nsfs_config_root,
});

if (!simple_mode) await init_nsfs_system(nsfs_config_root);
let nsfs_system;
if (!simple_mode) nsfs_system = await init_nsfs_system(nsfs_config_root);

const endpoint = require('../endpoint/endpoint');
await endpoint.main({
Expand All @@ -360,7 +363,7 @@ async function main(argv = minimist(process.argv.slice(2))) {
forks,
nsfs_config_root,
init_request_sdk: (req, res) => {
req.object_sdk = new NsfsObjectSDK(fs_root, fs_config, account, versioning, nsfs_config_root);
req.object_sdk = new NsfsObjectSDK(fs_root, fs_config, account, versioning, nsfs_config_root, nsfs_system);
req.account_sdk = new NsfsAccountSDK(fs_root, fs_config, account, nsfs_config_root);
}
});
Expand Down
2 changes: 1 addition & 1 deletion src/endpoint/endpoint.js
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ async function main(options = {}) {
});

notification_logger = config.NOTIFICATION_LOG_DIR &&
new PersistentLogger(config.NOTIFICATION_LOG_DIR, config.NOTIFICATION_LOG_NS + '_' + node_name, {
new PersistentLogger(config.NOTIFICATION_LOG_DIR, node_name + '_' + config.NOTIFICATION_LOG_NS, {
locking: 'SHARED',
poll_interval: config.NSFS_GLACIER_LOGS_POLL_INTERVAL,
});
Expand Down
1 change: 1 addition & 0 deletions src/endpoint/s3/ops/s3_delete_object.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ async function delete_object(req, res) {
res.setHeader('x-amz-delete-marker', 'true');
req.s3event_op = 'DeleteMarkerCreated';
}
res.seq = del_res.seq;
}

module.exports = {
Expand Down
5 changes: 3 additions & 2 deletions src/endpoint/s3/ops/s3_post_object_restore.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@ async function post_object_restore(req, res) {
};
req.s3event = 'ObjectRestore';

const accepted = await req.object_sdk.restore_object(params);
if (accepted) {
const restore_object_result = await req.object_sdk.restore_object(params);
if (restore_object_result.accepted) {
res.statusCode = 202;
//no need to set s3event_op, it is 'Post' by default because req.method == 'Post'
} else {
res.statusCode = 200;
req.s3event_op = 'Completed';
res.restore_object_result = restore_object_result;
}
}

Expand Down
9 changes: 4 additions & 5 deletions src/endpoint/s3/ops/s3_put_bucket_notification.js
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
/* Copyright (C) 2016 NooBaa */
'use strict';

const s3_utils = require('../s3_utils');
const S3Error = require('../s3_errors').S3Error;

/**
* http://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketPUTnotification.html
*/
async function put_bucket_notification(req) {

const topic_configuration = req.body.NotificationConfiguration?.TopicConfiguration;
if (!topic_configuration ||
if (!topic_configuration ||
typeof topic_configuration !== 'object') throw new S3Error(S3Error.MalformedXML);


//align request aws s3api sends
for(const notif of topic_configuration) {
for (const notif of topic_configuration) {
if (Array.isArray(notif.Id)) notif.Id = notif.Id[0];
notif.Connect = Array.isArray(notif.Topic) ? notif.Topic[0] : notif.Topic;
notif.Events = notif.Event;
Expand All @@ -27,7 +26,7 @@ async function put_bucket_notification(req) {
bucket_name: req.params.bucket,
notifications: topic_configuration
});

return reply;
}

Expand Down
5 changes: 5 additions & 0 deletions src/endpoint/s3/ops/s3_put_object.js
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ async function put_object(req, res) {
};
}
res.setHeader('ETag', `"${reply.etag}"`);

if (reply.seq) {
res.seq = reply.seq;
delete reply.seq;
}
}


Expand Down
42 changes: 19 additions & 23 deletions src/endpoint/s3/s3_bucket_logging.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const http_utils = require('../../util/http_utils');
const dgram = require('node:dgram');
const { Buffer } = require('node:buffer');
const config = require('../../../config');
const {compose_notification} = require('../../util/notifications_util');

function check_notif_relevant(notif, req) {
//if no events were specified, always notify
Expand All @@ -18,10 +19,10 @@ function check_notif_relevant(notif, req) {
const event_method = notif_event_elems[2];
const req_s3_event_op = req.s3_event_op || req.method;
if (!req.s3event) return false;
if (event_name.toLowerCase() !== req.s3event.toLowerCase()) return false; //TODO
if (event_name.toLowerCase() !== req.s3event.toLowerCase()) return false;
if (event_method === '*') return true;
if (!req_s3_event_op) return false;
if (event_method.toLowerCase() === req_s3_event_op.toLowerCase()) return true; //TODO
if (event_method.toLowerCase() === req_s3_event_op.toLowerCase()) return true;
}

//request does not match any of the requested events
Expand All @@ -31,8 +32,8 @@ function check_notif_relevant(notif, req) {
async function send_bucket_op_logs(req, res) {
if (req.params && req.params.bucket &&
!(req.op_name === 'put_bucket' ||
req.op_name === 'put_bucket_notification' ||
req.op_name === 'get_bucket_notification'
req.op_name === 'put_bucket_notification' ||
req.op_name === 'get_bucket_notification'
)) {
//potentially, there could be two writes to two different files.
//we want to await for all writes together, instead of serially
Expand All @@ -48,20 +49,19 @@ async function send_bucket_op_logs(req, res) {
}

if (req.notification_logger && bucket_info.notifications) {

for (const notif of bucket_info.notifications) {
if (check_notif_relevant(notif, req)) {
const s3_log = {
for (const notif_conf of bucket_info.notifications) {
if (check_notif_relevant(notif_conf, req)) {
const notif = {
meta: {
connect: notif.Connect,
name: notif.name
connect: notif_conf.Connect,
name: notif_conf.name
},
notif: get_bucket_log_record(req.op_name, bucket_info, req, res, "NOTIF")
notif: compose_notification(req, res, bucket_info, notif_conf)
};
dbg.log1("logging notif ", notif, ", s3_log = ", s3_log);
dbg.log1("logging notif ", notif_conf, ", notif = ", notif);
writes_aggregate.push({
file: req.notification_logger,
buffer: JSON.stringify(s3_log)
buffer: JSON.stringify(notif)
});
}
}
Expand Down Expand Up @@ -119,7 +119,7 @@ function endpoint_bucket_op_logs(op_name, req, res, source_bucket, writes_aggreg

// 1 - Get all the information to be logged in a log message.
// 2 - Format it and send it to log bucket/syslog.
const s3_log = get_bucket_log_record(op_name, source_bucket, req, res, "LOG");
const s3_log = get_bucket_log_record(op_name, source_bucket, req, res);
dbg.log1("Bucket operation logs = ", s3_log);

switch (config.BUCKET_LOG_TYPE) {
Expand Down Expand Up @@ -152,7 +152,7 @@ function send_op_logs_to_syslog(syslog, s3_log) {
}
}

function get_bucket_log_record(op_name, source_bucket, req, res, type) {
function get_bucket_log_record(op_name, source_bucket, req, res) {

const client_ip = http_utils.parse_client_ip(req);
let status_code = 102;
Expand All @@ -167,16 +167,12 @@ function get_bucket_log_record(op_name, source_bucket, req, res, type) {
remote_ip: client_ip,
request_uri: req.originalUrl,
http_status: status_code,
request_id: req.request_id
request_id: req.request_id,
noobaa_bucket_logging: true,
log_bucket: source_bucket.bucket_info.logging.log_bucket,
log_prefix: source_bucket.bucket_info.logging.log_prefix,
};

//add fields unique to bucket logging
if (type === 'LOG') {
log.noobaa_bucket_logging = true;
log.log_bucket = source_bucket.bucket_info.logging.log_bucket;
log.log_prefix = source_bucket.bucket_info.logging.log_prefix;
}

return log;
}

Expand Down
1 change: 0 additions & 1 deletion src/manage_nsfs/manage_nsfs_validations.js
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,6 @@ function validate_options_type_by_value(input_options_with_data) {
continue;
}
const details = `type of flag ${option} should be ${type_of_option}`;
console.log("option = ", option, " typepof option = ", typeof value, " should be ", type_of_option);
throw_cli_error(ManageCLIError.InvalidArgumentType, details);
}
}
Expand Down
8 changes: 5 additions & 3 deletions src/sdk/namespace_fs.js
Original file line number Diff line number Diff line change
Expand Up @@ -2075,7 +2075,7 @@ class NamespaceFS {
* - XATTR_RESTORE_EXPIRY
* @param {*} params
* @param {nb.ObjectSDK} object_sdk
* @returns {Promise<boolean>}
* @returns {Promise<Object>}
*/
async restore_object(params, object_sdk) {
dbg.log0('namespace_fs.restore_object:', params);
Expand Down Expand Up @@ -2112,7 +2112,7 @@ class NamespaceFS {
});

// Should result in HTTP: 202 Accepted
return true;
return {accepted: true};
}

if (restore_status.state === GlacierBackend.RESTORE_STATUS_ONGOING) {
Expand All @@ -2132,7 +2132,9 @@ class NamespaceFS {
});

// Should result in HTTP: 200 OK
return false;
return {accepted: false,
expires_on,
storage_class: s3_utils.STORAGE_CLASS_GLACIER};
}
} catch (error) {
dbg.error('namespace_fs.restore_object: failed with error: ', error, file_path);
Expand Down
2 changes: 2 additions & 0 deletions src/server/object_services/object_server.js
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,7 @@ async function complete_object_upload(req) {
encryption: obj.encryption,
size: set_updates.size,
content_type: obj.content_type,
seq: set_updates.version_seq,
};
}

Expand Down Expand Up @@ -881,6 +882,7 @@ async function delete_object(req) {
if (obj) {
dbg.log1(`${obj.key} was deleted by ${req.account && req.account.email.unwrap()}`);
}
reply.seq = await MDStore.instance().alloc_object_version_seq();
return reply;
}

Expand Down
1 change: 0 additions & 1 deletion src/server/system_services/bucket_server.js
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,6 @@ async function get_bucket_notification(req) {
return {
notifications: bucket.notifications ? bucket.notifications : [],
};
return res;
}

/**
Expand Down
Loading

0 comments on commit d09a342

Please sign in to comment.