Skip to content

Commit

Permalink
feature(processing_engine): add test for test scheduled plugin.
Browse files Browse the repository at this point in the history
  • Loading branch information
jacksonrnewhouse committed Jan 18, 2025
1 parent a71e981 commit e59a2f1
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 5 deletions.
7 changes: 6 additions & 1 deletion influxdb3/src/commands/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,16 @@ pub async fn command(config: Config) -> Result<(), Box<dyn Error>> {
);
}
SubCommand::SchedulePlugin(plugin_config) => {
let input_arguments = plugin_config.input_arguments.map(|a| {
a.into_iter()
.map(|SeparatedKeyValue((k, v))| (k, v))
.collect::<HashMap<String, String>>()
});
let cron_plugin_test_request = SchedulePluginTestRequest {
filename: plugin_config.filename,
database: plugin_config.influxdb3_config.database_name,
schedule: plugin_config.schedule,
input_arguments: None,
input_arguments,
};
let response = client
.schedule_plugin_test(cron_plugin_test_request)
Expand Down
76 changes: 75 additions & 1 deletion influxdb3/tests/server/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ use test_helpers::tempfile::NamedTempFile;
#[cfg(feature = "system-py")]
use test_helpers::tempfile::TempDir;

const WRITE_REPORTS_PLUGIN_CODE: &str = r#"
#[cfg(feature = "system-py")]
pub const WRITE_REPORTS_PLUGIN_CODE: &str = r#"
def process_writes(influxdb3_local, table_batches, args=None):
for table_batch in table_batches:
# Skip if table_name is write_reports
Expand Down Expand Up @@ -1163,6 +1164,79 @@ def process_writes(influxdb3_local, table_batches, args=None):
let expected_result = serde_json::from_str::<serde_json::Value>(expected_result).unwrap();
assert_eq!(res, expected_result);
}
#[cfg(feature = "system-py")]
#[test_log::test(tokio::test)]
async fn test_schedule_plugin_test() {
use crate::ConfigProvider;
use influxdb3_client::Precision;

// Create plugin file with a scheduled task
let plugin_file = create_plugin_file(
r#"
def process_scheduled_call(influxdb3_local, schedule_time, args=None):
influxdb3_local.info(f"args are {args}")
influxdb3_local.info("Successfully called")"#,
);

let plugin_dir = plugin_file.path().parent().unwrap().to_str().unwrap();
let plugin_name = plugin_file.path().file_name().unwrap().to_str().unwrap();

let server = TestServer::configure()
.with_plugin_dir(plugin_dir)
.spawn()
.await;
let server_addr = server.client_addr();

// Write some test data
server
.write_lp_to_db(
"foo",
"cpu,host=host1,region=us-east usage=0.75\n\
cpu,host=host2,region=us-west usage=0.82\n\
cpu,host=host3,region=us-east usage=0.91",
Precision::Nanosecond,
)
.await
.unwrap();

let db_name = "foo";

// Run the schedule plugin test
let result = run_with_confirmation(&[
"test",
"schedule_plugin",
"--database",
db_name,
"--host",
&server_addr,
"--schedule",
"*/5 * * * * *", // Run every 5 seconds
"--input-arguments",
"region=us-east",
plugin_name,
]);
debug!(result = ?result, "test schedule plugin");

let res = serde_json::from_str::<Value>(&result).unwrap();

// The trigger_time will be dynamic, so we'll just verify it exists and is in the right format
let trigger_time = res["trigger_time"].as_str().unwrap();
assert!(trigger_time.contains('T')); // Basic RFC3339 format check

// Check the rest of the response structure
let expected_result = serde_json::json!({
"log_lines": [
"INFO: args are {'region': 'us-east'}",
"INFO: Successfully called"
],
"database_writes": {
},
"errors": []
});
assert_eq!(res["log_lines"], expected_result["log_lines"]);
assert_eq!(res["database_writes"], expected_result["database_writes"]);
assert_eq!(res["errors"], expected_result["errors"]);
}

#[cfg(feature = "system-py")]
#[test_log::test(tokio::test)]
Expand Down
7 changes: 4 additions & 3 deletions influxdb3_processing_engine/src/plugins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,8 @@ mod python_plugin {
use iox_time::Time;
use observability_deps::tracing::{debug, info, warn};
use std::str::FromStr;
use std::time::{Duration, SystemTime};
use std::time::SystemTime;
use tokio::sync::mpsc::Receiver;
use tokio::time::Instant;

impl TriggerPlugin {
pub(crate) async fn run_wal_contents_plugin(
Expand Down Expand Up @@ -563,7 +562,9 @@ pub(crate) fn run_test_schedule_plugin(
let log_lines = plugin_return_state.log();

let mut database_writes = plugin_return_state.write_db_lines;
database_writes.insert(database, plugin_return_state.write_back_lines);
if !plugin_return_state.write_back_lines.is_empty() {
database_writes.insert(database, plugin_return_state.write_back_lines);
}

let test_write_handler = TestWriteHandler::new(Arc::clone(&catalog), now_time);
let errors = test_write_handler.validate_all_writes(&database_writes);
Expand Down

0 comments on commit e59a2f1

Please sign in to comment.