notifications | nsfs
Signed-off-by: Amit Prinz Setter <[email protected]>
alphaprinz committed Sep 9, 2024
1 parent bc77785 commit e3888e6
Showing 16 changed files with 247 additions and 16 deletions.
6 changes: 6 additions & 0 deletions config.js
@@ -671,6 +671,12 @@ config.PERSISTENT_BUCKET_LOG_DIR = process.env.GUARANTEED_LOGS_PATH;
config.PERSISTENT_BUCKET_LOG_NS = 'bucket_logging';
config.BUCKET_LOG_CONCURRENCY = 10;

////////////////////////////////
// NOTIFICATIONS //
////////////////////////////////
config.NOTIFICATION_LOG_NS = 'notification_logging';
config.NOTIFICATION_LOG_DIR = process.env.NOTIFICATION_LOG_DIR;

///////////////////////////
// KEY ROTATOR //
///////////////////////////
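The two new keys mirror the bucket-logging settings above: a namespace for the persistent log and a directory taken from the environment (notification logging stays disabled while `NOTIFICATION_LOG_DIR` is unset). A minimal sketch of how a deployment might enable it, with a hypothetical path:

```js
// Hypothetical: set in the service environment before config.js is loaded.
process.env.NOTIFICATION_LOG_DIR = '/var/log/noobaa/notifications';
const config = require('./config'); // from the repo root
console.log(config.NOTIFICATION_LOG_NS);  // 'notification_logging'
console.log(config.NOTIFICATION_LOG_DIR); // '/var/log/noobaa/notifications'
```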
1 change: 1 addition & 0 deletions package.json
@@ -108,6 +108,7 @@
"nan": "2.20.0",
"ncp": "2.0.0",
"node-addon-api": "8.1.0",
"node-rdkafka": "3.0.1",
"performance-now": "2.1.0",
"pg": "8.12.0",
"ping": "0.4.4",
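The new dependency, node-rdkafka, is the Kafka client the notification machinery presumably publishes through. A minimal, self-contained producer sketch using the library's standard API (broker address and topic are assumptions, not values from this commit):

```js
const Kafka = require('node-rdkafka');

const producer = new Kafka.Producer({
    'metadata.broker.list': 'localhost:9092', // assumed broker
    'dr_cb': true,                            // request delivery reports
});
producer.setPollInterval(100); // poll so delivery-report events are emitted
producer.connect();
producer.on('ready', () => {
    // produce(topic, partition, message, key, timestamp)
    producer.produce('bucket-events', null, Buffer.from('{"test":true}'));
});
producer.on('delivery-report', (err, report) => console.log(err || report));
```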
47 changes: 47 additions & 0 deletions src/api/common_api.js
@@ -1367,6 +1367,53 @@ module.exports = {
type: 'string',
},
}
},
bucket_notification: {
type: 'object',
required: ['name', 'target_type', 'topic', 'connect'],
properties: {
name: {
type: 'string'
},
target_type: {
type: 'string',
enum: ['kafka']
},
topic: {
type: 'string'
},
connect: {
type: 'string'
},
events: {
type: 'array',
items: {
type: 'string',
enum: [
's3:TestEvent',
's3:ObjectCreated:*',
's3:ObjectCreated:Put',
's3:ObjectCreated:Post',
's3:ObjectCreated:Copy',
's3:ObjectCreated:CompleteMultipartUpload',
's3:ObjectRemoved:*',
's3:ObjectRemoved:Delete',
's3:ObjectRemoved:DeleteMarkerCreated',
's3:ObjectRestore:*',
's3:ObjectRestore:Post',
's3:ObjectRestore:Completed',
's3:ObjectRestore:Delete',
's3:ObjectTagging:*',
's3:ObjectTagging:Put',
's3:ObjectTagging:Delete',
/*We plan to support LifecycleExpiration
's3:LifecycleExpiration:*',
's3:LifecycleExpiration:Delete',
's3:LifecycleExpiration:DeleteMarkerCreated',*/
],
}
}
}
}
}
};
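For reference, a notification entry that validates against this schema might look like the following sketch (all values illustrative):

```js
// Hypothetical bucket notification configuration.
const notification = {
    name: 'notif1',                // required, identifies the notification
    target_type: 'kafka',          // required; 'kafka' is the only allowed target
    topic: 'bucket-events',        // required Kafka topic
    connect: 'kafka_connection1',  // required, names the connection to use
    // optional; omitting 'events' means every supported event is notified
    events: ['s3:ObjectCreated:*', 's3:ObjectRemoved:Delete'],
};
```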
10 changes: 9 additions & 1 deletion src/cmd/manage_nsfs.js
@@ -25,6 +25,7 @@ const { throw_cli_error, get_bucket_owner_account, write_stdout_response, get_bo
is_name_update, is_access_key_update } = require('../manage_nsfs/manage_nsfs_cli_utils');
const manage_nsfs_validations = require('../manage_nsfs/manage_nsfs_validations');
const nc_mkm = require('../manage_nsfs/nc_master_key_manager').get_instance();
const { KafkaNotificator } = require('../util/notifications_util');

let config_fs;

@@ -68,6 +69,8 @@ async function main(argv = minimist(process.argv.slice(2))) {
await noobaa_cli_diagnose.manage_diagnose_operations(action, user_input, config_fs);
} else if (type === TYPES.UPGRADE) {
await noobaa_cli_upgrade.manage_upgrade_operations(action, config_fs);
} else if (type === TYPES.NOTIFICATION) {
await notification_management();
} else {
throw_cli_error(ManageCLIError.InvalidType);
}
@@ -98,7 +101,8 @@ async function fetch_bucket_data(action, user_input) {
should_create_underlying_storage: action === ACTIONS.ADD ? false : undefined,
new_name: user_input.new_name === undefined ? undefined : String(user_input.new_name),
fs_backend: user_input.fs_backend === undefined ? config.NSFS_NC_STORAGE_BACKEND : String(user_input.fs_backend),
-        force_md5_etag: user_input.force_md5_etag === undefined || user_input.force_md5_etag === '' ? user_input.force_md5_etag : get_boolean_or_string_value(user_input.force_md5_etag)
+        force_md5_etag: user_input.force_md5_etag === undefined || user_input.force_md5_etag === '' ? user_input.force_md5_etag : get_boolean_or_string_value(user_input.force_md5_etag),
+        notifications: user_input.notifications
};

if (user_input.bucket_policy !== undefined) {
@@ -708,5 +712,9 @@ async function logging_management() {
await manage_nsfs_logging.export_bucket_logging(config_fs);
}

async function notification_management() {
await new KafkaNotificator(config_fs.fs_context).process_notification_files();
}

exports.main = main;
if (require.main === module) main();
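With this wiring, the CLI accepts a new `notification` type whose handler simply drains the pending notification log files. A rough stand-alone equivalent, assuming `process_notification_files()` returns a promise:

```js
// Hypothetical stand-alone equivalent of notification_management().
const { KafkaNotificator } = require('../util/notifications_util');

async function drain_notifications(config_fs) {
    // config_fs.fs_context is the NSFS filesystem context built earlier in main().
    await new KafkaNotificator(config_fs.fs_context).process_notification_files();
}
```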
14 changes: 12 additions & 2 deletions src/endpoint/endpoint.js
@@ -65,6 +65,7 @@ dbg.log0('endpoint: replacing old umask: ', old_umask.toString(8), 'with new uma
* sts_sdk?: StsSDK;
* virtual_hosts?: readonly string[];
* bucket_logger?: PersistentLogger;
* notification_logger?: PersistentLogger;
* }} EndpointRequest
*/

@@ -100,6 +101,7 @@ async function create_https_server(ssl_cert_info, honorCipherOrder, endpoint_han
/* eslint-disable max-statements */
async function main(options = {}) {
let bucket_logger;
let notification_logger;
try {
// setting the process title is needed so that GPFS can identify the noobaa endpoint processes, see issue #8039.
if (config.ENDPOINT_PROCESS_TITLE) {
@@ -137,6 +139,12 @@
poll_interval: config.NSFS_GLACIER_LOGS_POLL_INTERVAL,
});

notification_logger = config.NOTIFICATION_LOG_DIR &&
new PersistentLogger(config.NOTIFICATION_LOG_DIR, config.NOTIFICATION_LOG_NS + '_' + node_name, {
locking: 'SHARED',
poll_interval: config.NSFS_GLACIER_LOGS_POLL_INTERVAL,
});

process.on('warning', e => dbg.warn(e.stack));

let internal_rpc_client;
@@ -174,7 +182,8 @@
init_request_sdk = create_init_request_sdk(rpc, internal_rpc_client, object_io);
}

-    const endpoint_request_handler = create_endpoint_handler(init_request_sdk, virtual_hosts, /*is_sts?*/ false, bucket_logger);
+    const endpoint_request_handler = create_endpoint_handler(init_request_sdk, virtual_hosts, /*is_sts?*/ false,
+        bucket_logger, notification_logger);
const endpoint_request_handler_sts = create_endpoint_handler(init_request_sdk, virtual_hosts, /*is_sts?*/ true);

const ssl_cert_info = await ssl_utils.get_ssl_cert_info('S3', options.nsfs_config_root);
@@ -263,7 +272,7 @@
* @param {readonly string[]} virtual_hosts
* @returns {EndpointHandler}
*/
-function create_endpoint_handler(init_request_sdk, virtual_hosts, sts, logger) {
+function create_endpoint_handler(init_request_sdk, virtual_hosts, sts, logger, notification_logger) {
const blob_rest_handler = process.env.ENDPOINT_BLOB_ENABLED === 'true' ? blob_rest : unavailable_handler;
const lambda_rest_handler = config.DB_TYPE === 'mongodb' ? lambda_rest : unavailable_handler;

@@ -273,6 +282,7 @@ function create_endpoint_handler(init_request_sdk, virtual_hosts, sts, logger) {
endpoint_utils.prepare_rest_request(req);
req.virtual_hosts = virtual_hosts;
if (logger) req.bucket_logger = logger;
if (notification_logger) req.notification_logger = notification_logger;
init_request_sdk(req, res);
if (req.url.startsWith('/2015-03-31/functions')) {
return lambda_rest_handler(req, res);
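The notification logger reuses the same `PersistentLogger` machinery as bucket logging, down to sharing `NSFS_GLACIER_LOGS_POLL_INTERVAL`. A stand-alone sketch under that assumption (the module path, directory, and node name are hypothetical):

```js
// Hypothetical direct use of the logger the endpoint creates in main().
const { PersistentLogger } = require('../util/persistent_logger'); // assumed path

const logger = new PersistentLogger(
    '/var/log/noobaa/notifications', // config.NOTIFICATION_LOG_DIR
    'notification_logging_node1',    // config.NOTIFICATION_LOG_NS + '_' + node_name
    { locking: 'SHARED', poll_interval: 10000 });
await logger.append(JSON.stringify({ meta: { topic: 'bucket-events' } }));
```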
2 changes: 2 additions & 0 deletions src/endpoint/s3/ops/s3_delete_object.js
@@ -10,6 +10,7 @@ const config = require('../../../../config');
*/
async function delete_object(req, res) {
const version_id = s3_utils.parse_version_id(req.query.versionId);
req.s3event = 'ObjectRemoved';
const del_res = await req.object_sdk.delete_object({
bucket: req.params.bucket,
key: req.params.key,
Expand All @@ -26,6 +27,7 @@ async function delete_object(req, res) {
} else if (del_res.created_delete_marker) {
res.setHeader('x-amz-version-id', del_res.created_version_id);
res.setHeader('x-amz-delete-marker', 'true');
req.s3event_op = 'DeleteMarkerCreated';
}
}

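Each op now tags the request so the logging layer can map it to an S3 event string; for the delete op above, the mapping works out roughly like this (a sketch, not code from the commit):

```js
// Hypothetical event strings reconstructed from the tags set above
// (matching in check_notif_relevant() is case-insensitive):
const plain_delete = { s3event: 'ObjectRemoved', method: 'DELETE' };
// -> matches 's3:ObjectRemoved:Delete' via the req.method fallback
const marker_delete = { s3event: 'ObjectRemoved', s3event_op: 'DeleteMarkerCreated' };
// -> matches 's3:ObjectRemoved:DeleteMarkerCreated'
```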
1 change: 1 addition & 0 deletions src/endpoint/s3/ops/s3_delete_object_tagging.js
@@ -7,6 +7,7 @@ const s3_utils = require('../s3_utils');
* https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectDELETEtagging.html
*/
async function delete_object_tagging(req, res) {
req.s3event = 'ObjectTagging';
const reply = await req.object_sdk.delete_object_tagging({
bucket: req.params.bucket,
key: req.params.key,
3 changes: 3 additions & 0 deletions src/endpoint/s3/ops/s3_post_object_restore.js
@@ -17,12 +17,15 @@ async function post_object_restore(req, res) {
days,
encryption,
};
req.s3event = 'ObjectRestore';

const accepted = await req.object_sdk.restore_object(params);
if (accepted) {
res.statusCode = 202;
//no need to set s3event_op; the default (req.method, 'POST') matches 's3:ObjectRestore:Post' case-insensitively
} else {
res.statusCode = 200;
req.s3event_op = 'Completed';
}
}

2 changes: 2 additions & 0 deletions src/endpoint/s3/ops/s3_post_object_uploadId.js
@@ -25,6 +25,8 @@ async function post_object_uploadId(req, res) {
}

http_utils.set_keep_alive_whitespace_interval(res);
req.s3event = "ObjectCreated";
req.s3event_op = "CompleteMultipartUpload";

const reply = await req.object_sdk.complete_object_upload({
obj_id: req.query.uploadId,
3 changes: 3 additions & 0 deletions src/endpoint/s3/ops/s3_put_object.js
@@ -34,6 +34,9 @@ async function put_object(req, res) {

dbg.log0('PUT OBJECT', req.params.bucket, req.params.key,
req.headers['x-amz-copy-source'] || '', encryption || '');
req.s3event = "ObjectCreated";
//for copy, set the correct s3event_op; otherwise leave it undefined so the default (req.method) is used
req.s3event_op = copy_source ? 'Copy' : undefined;

const source_stream = req.chunked_content ? s3_utils.decode_chunked_upload(req) : req;
const reply = await req.object_sdk.upload_object({
1 change: 1 addition & 0 deletions src/endpoint/s3/ops/s3_put_object_tagging.js
@@ -9,6 +9,7 @@ const s3_utils = require('../s3_utils');
async function put_object_tagging(req, res) {
const tag_set = s3_utils.parse_body_tagging_xml(req);
const version_id = s3_utils.parse_version_id(req.query.versionId);
req.s3event = 'ObjectTagging';
const reply = await req.object_sdk.put_object_tagging({
bucket: req.params.bucket,
key: req.params.key,
84 changes: 74 additions & 10 deletions src/endpoint/s3/s3_bucket_logging.js
@@ -7,21 +7,77 @@ const dgram = require('node:dgram');
const { Buffer } = require('node:buffer');
const config = require('../../../config');

function check_notif_relevant(notif, req) {
    //if no events were specified, always notify
    if (!notif.events) return true;
    //a request that did not tag an s3event can never match
    if (!req.s3event) return false;

    //check whether the request's event is in the notification's events list
    const req_s3_event_op = req.s3event_op || req.method;
    for (const notif_event of notif.events) {
        const notif_event_elems = notif_event.split(':');
        const event_name = notif_event_elems[1];
        const event_method = notif_event_elems[2];
        //event name mismatch - try the next configured event
        if (event_name.toLowerCase() !== req.s3event.toLowerCase()) continue;
        if (event_method === '*') return true;
        if (req_s3_event_op && event_method.toLowerCase() === req_s3_event_op.toLowerCase()) return true;
    }

    //request does not match any of the requested events
    return false;
}
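To make the matching concrete, here is a hypothetical trace for a versioned delete that created a delete marker:

```js
const notif = { events: ['s3:ObjectCreated:Put', 's3:ObjectRemoved:*'] };
const req = { method: 'DELETE', s3event: 'ObjectRemoved', s3event_op: 'DeleteMarkerCreated' };
// 's3:ObjectCreated:Put' -> event_name 'ObjectCreated' does not match -> next event
// 's3:ObjectRemoved:*'   -> event_name matches and event_method is '*' -> relevant
check_notif_relevant(notif, req); // true
```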

async function send_bucket_op_logs(req, res) {
if (req.params && req.params.bucket && req.op_name !== 'put_bucket') {
        //potentially there could be two writes to two different files.
        //we want to await all of the writes together rather than serially,
        //so we aggregate them and issue the writes only at the end
const writes_aggregate = [];

const bucket_info = await req.object_sdk.read_bucket_sdk_config_info(req.params.bucket);
dbg.log2("read_bucket_sdk_config_info = ", bucket_info);

if (is_bucket_logging_enabled(bucket_info)) {
dbg.log2("Bucket logging is enabled for Bucket : ", req.params.bucket);
-            await endpoint_bucket_op_logs(req.op_name, req, res, bucket_info);
+            endpoint_bucket_op_logs(req.op_name, req, res, bucket_info, writes_aggregate);
}

if (req.notification_logger && bucket_info.notifications) {
for (const notif of bucket_info.notifications) {
if (check_notif_relevant(notif, req)) {
const s3_log = {
meta: {
connect: notif.connect,
topic: notif.topic
},
notif: get_bucket_log_record(req.op_name, bucket_info, req, res, "NOTIF")
};
dbg.log2("logging notif ", notif, ", s3_log = ", s3_log);
writes_aggregate.push({
file: req.notification_logger,
buffer: JSON.stringify(s3_log)
});
}
}
}

//by now we have all possible writes,
//issue them concurrently and then await them
if (writes_aggregate.length > 0) {
const promises = [];
for (const write of writes_aggregate) {
                promises.push(new Promise((resolve, reject) => {
                    //pass resolve/reject as callbacks so the promise settles when append() finishes
                    write.file.append(write.buffer).then(resolve, reject);
                }));
}
await Promise.all(promises);
}
}
}
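Since `append()` already returns a promise, the aggregation loop above can be written more compactly; an equivalent sketch:

```js
// Equivalent to the loop above: issue all appends concurrently, then await them.
await Promise.all(writes_aggregate.map(write => write.file.append(write.buffer)));
```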


function is_bucket_logging_enabled(source_bucket) {

if (!source_bucket || !source_bucket.bucket_info.logging) {
return false;
}
@@ -54,23 +110,27 @@ const create_syslog_udp_socket = (() => {
})();


-async function endpoint_bucket_op_logs(op_name, req, res, source_bucket) {
+function endpoint_bucket_op_logs(op_name, req, res, source_bucket, writes_aggregate) {

// 1 - Get all the information to be logged in a log message.
// 2 - Format it and send it to log bucket/syslog.
-    const s3_log = get_bucket_log_record(op_name, source_bucket, req, res);
+    const s3_log = get_bucket_log_record(op_name, source_bucket, req, res, "LOG");
dbg.log1("Bucket operation logs = ", s3_log);

switch (config.BUCKET_LOG_TYPE) {
case 'PERSISTENT': {
-            await req.bucket_logger.append(JSON.stringify(s3_log));
+            //remember this write in writes_aggregate,
+            //it'll be issued later (with other potential writes)
+            writes_aggregate.push({
+                file: req.bucket_logger,
+                buffer: JSON.stringify(s3_log)
+            });
break;
}
default: {
send_op_logs_to_syslog(req.object_sdk.rpc_client.rpc.router.syslog, s3_log);
}
}

}

function send_op_logs_to_syslog(syslog, s3_log) {
@@ -87,27 +147,31 @@ function send_op_logs_to_syslog(syslog, s3_log) {
}
}

-function get_bucket_log_record(op_name, source_bucket, req, res) {
+function get_bucket_log_record(op_name, source_bucket, req, res, type) {

const client_ip = http_utils.parse_client_ip(req);
let status_code = 102;
if (res && res.statusCode) {
status_code = res.statusCode;
}
const log = {
-        noobaa_bucket_logging: "true",
op: req.method,
bucket_owner: source_bucket.bucket_owner,
source_bucket: req.params.bucket,
object_key: req.originalUrl,
-        log_bucket: source_bucket.bucket_info.logging.log_bucket,
-        log_prefix: source_bucket.bucket_info.logging.log_prefix,
remote_ip: client_ip,
request_uri: req.originalUrl,
http_status: status_code,
request_id: req.request_id
};

//add fields unique to bucket logging
if (type === 'LOG') {
log.noobaa_bucket_logging = true;
log.log_bucket = source_bucket.bucket_info.logging.log_bucket;
log.log_prefix = source_bucket.bucket_info.logging.log_prefix;
}

return log;
}

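With the new `type` parameter, the same record builder serves both paths: 'NOTIF' records keep only the common fields (the caller attaches the Kafka `connect`/`topic` metadata, as in `send_bucket_op_logs` above), while 'LOG' records add the bucket-logging fields. A hypothetical 'LOG' record for a successful PUT:

```js
// Illustrative output of get_bucket_log_record(..., 'LOG'); all values are made up.
const log_record = {
    op: 'PUT',
    bucket_owner: 'admin',
    source_bucket: 'photos',
    object_key: '/photos/cat.jpg',
    remote_ip: '10.0.0.7',
    request_uri: '/photos/cat.jpg',
    http_status: 200,
    request_id: 'lx2a9q7b',
    //'LOG'-only fields:
    noobaa_bucket_logging: true,
    log_bucket: 'photos-log-bucket',
    log_prefix: 'photos/',
};
```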
2 changes: 1 addition & 1 deletion src/manage_nsfs/manage_nsfs_cli_errors.js
@@ -89,7 +89,7 @@ ManageCLIError.InvalidArgumentType = Object.freeze({

ManageCLIError.InvalidType = Object.freeze({
code: 'InvalidType',
-    message: 'Invalid type, available types are account, bucket, logging, whitelist or upgrade',
+    message: 'Invalid type, available types are account, bucket, logging, whitelist, upgrade or notification',
http_code: 400,
});

Diffs for the remaining 3 changed files are not shown.
