feat(processing_engine): Add cron plugins and triggers to the processing engine. #25852
Some comments and changes. On your questions:
Right now we will fall behind if the plugin consistently takes longer to run than the time between triggers. Do we want to have a different default or expose this as an option?
I think it should be an option. But does this mean that every execution should spawn its own background task? That way, if they've specified that the behavior is noskip, multiple executions of the trigger should be able to run at the same time.
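A minimal sketch of the two behaviors discussed here, using only the standard library. The `OverlapPolicy` enum and `max_concurrency` function are hypothetical names for illustration and are not part of the PR; plain threads stand in for whatever background tasks the engine would actually spawn.

```rust
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

/// Hypothetical policy names; the thread only mentions "noskip" as a possible option.
#[derive(Clone, Copy, Debug, PartialEq)]
enum OverlapPolicy {
    /// Run executions one after another on the scheduler thread.
    Sequential,
    /// Give every execution its own background thread so runs may overlap.
    NoSkip,
}

/// Fires `ticks` executions and returns the largest number that were in flight at once.
fn max_concurrency(policy: OverlapPolicy, ticks: usize) -> usize {
    // (currently running, maximum seen so far)
    let state = Arc::new(Mutex::new((0usize, 0usize)));
    let mut handles = Vec::new();

    for _ in 0..ticks {
        let state = Arc::clone(&state);
        let run = move || {
            {
                let mut s = state.lock().unwrap();
                s.0 += 1;
                let running = s.0;
                if running > s.1 {
                    s.1 = running;
                }
            }
            // Stand-in for a plugin that takes a while to run.
            thread::sleep(Duration::from_millis(100));
            state.lock().unwrap().0 -= 1;
        };
        match policy {
            OverlapPolicy::Sequential => run(),
            OverlapPolicy::NoSkip => handles.push(thread::spawn(run)),
        }
    }

    for handle in handles {
        handle.join().unwrap();
    }
    let max = state.lock().unwrap().1;
    max
}
```

With `Sequential`, at most one execution is ever in flight, so a slow plugin delays later triggers. With `NoSkip`, executions can overlap, which is the behavior the comment above asks for.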
The cron library we're using includes both second and year fields. The year fields allow for a cron schedule that ends. Right now the code exits, which gives an error if you disable. Should it be marked as disabled? Something else?
Yeah, if they give one that has an end on the schedule, it makes sense that we'd mark it as disabled.
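One way to picture the "mark it as disabled" outcome is a small state transition driven by the schedule iterator. This is a sketch under assumptions: `TriggerState` and `advance` are hypothetical names, and trigger times are plain integers rather than the timestamps the `cron` library produces.

```rust
/// Hypothetical trigger state for this sketch; not the PR's actual types.
#[derive(Debug, PartialEq)]
enum TriggerState {
    /// The schedule still has an upcoming trigger time.
    Waiting { next_trigger_time: u64 },
    /// The schedule is exhausted (for example, its year field has passed).
    Disabled,
}

/// Advances a trigger by pulling the next time from its schedule.
/// An exhausted schedule yields `Disabled` instead of an error.
fn advance(schedule: &mut impl Iterator<Item = u64>) -> TriggerState {
    match schedule.next() {
        Some(next_trigger_time) => TriggerState::Waiting { next_trigger_time },
        None => TriggerState::Disabled,
    }
}
```

Treating exhaustion as a state rather than an error means a later disable request has nothing to fail on.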
Should I hook this into the TimeProvider?
Yes, if we need to get the "now" time.
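As a rough illustration of why routing "now" through a provider helps, the sketch below injects a clock so that tests can pin the current time. The `Clock` trait, `SystemClock`, `FixedClock`, and `time_until` are hypothetical stand-ins written for this example; they are not the project's TimeProvider API.

```rust
use std::time::{Duration, SystemTime};

/// Hypothetical clock abstraction, standing in for an injected time provider.
trait Clock {
    fn now(&self) -> SystemTime;
}

/// Reads the real wall-clock time.
struct SystemClock;

impl Clock for SystemClock {
    fn now(&self) -> SystemTime {
        SystemTime::now()
    }
}

/// Always reports the same instant, which makes schedule logic testable.
struct FixedClock(SystemTime);

impl Clock for FixedClock {
    fn now(&self) -> SystemTime {
        self.0
    }
}

/// Returns how long to wait until `trigger_time`, or zero if it has already passed.
fn time_until(clock: &dyn Clock, trigger_time: SystemTime) -> Duration {
    trigger_time
        .duration_since(clock.now())
        .unwrap_or(Duration::ZERO)
}
```

Production code would pass `&SystemClock`, while a test passes a `FixedClock` and asserts on exact durations.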
influxdb3/src/commands/test.rs
Outdated
@@ -1,7 +1,7 @@
 use crate::commands::common::{InfluxDb3Config, SeparatedKeyValue, SeparatedList};
 use anyhow::Context;
 use hashbrown::HashMap;
-use influxdb3_client::plugin_development::WalPluginTestRequest;
+use influxdb3_client::plugin_development::{CronPluginTestRequest, WalPluginTestRequest};
It should be called a Schedule plugin, rather than cron. Even though the cron string is what we're using, we want a more user-friendly name.
    influxdb3_config: InfluxDb3Config,
    /// If given pass this map of string key/value pairs as input arguments
    #[clap(long = "input-arguments")]
    pub input_arguments: Option<SeparatedList<SeparatedKeyValue<String, String>>>,
input_arguments should be on the trigger, not on the plugin itself.
I'm mirroring the behavior of WalPluginConfig above, where this gets passed up to the server and then passed down into python via the final argument of the process_scheduled_call() method.
Ah, this is for the test endpoint?
Yeah, these are the CLI arguments, mirroring the test wal_plugin command.
The existing plugin and trigger creation APIs were sufficient for this, once I added a new TriggerSpecificationDefinition.
    Arc::clone(&plugin.query_executor),
    &plugin.trigger_definition.trigger_arguments,
)
.expect("should be able to run trigger");
Is there any reason this might error? Can we catch it and log it?
Yeah, any plugin error would trigger it. Switching to just have it exit with ?.
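For readers less familiar with the change described here, this is roughly what swapping `.expect(...)` for `?` looks like. The sketch is self-contained and every name in it (`PluginError`, `run_plugin`, `run_trigger`) is hypothetical; the PR's real functions and error types differ.

```rust
use std::error::Error;
use std::fmt;

/// Hypothetical error type standing in for a failure raised by a plugin.
#[derive(Debug)]
struct PluginError(String);

impl fmt::Display for PluginError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "plugin failed: {}", self.0)
    }
}

impl Error for PluginError {}

/// Stand-in for invoking the plugin; fails on request.
fn run_plugin(should_fail: bool) -> Result<(), PluginError> {
    if should_fail {
        Err(PluginError("bad input".to_string()))
    } else {
        Ok(())
    }
}

/// With `?`, a plugin error is returned to the caller instead of panicking,
/// so the caller can decide whether to log it, retry, or stop the trigger.
fn run_trigger(should_fail: bool) -> Result<(), Box<dyn Error>> {
    run_plugin(should_fail)?;
    Ok(())
}
```

The caller of `run_trigger` then owns the decision about logging, which is what the review comment asks for.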
    db_schema: Arc<DatabaseSchema>,
) -> Result<(), Error> {
    let Some(trigger_time) = self.next_trigger_time else {
        return Err(anyhow!("running a cron trigger that is finished.").into());
This language could be a little more clear. Not sure what finished means in this context.
influxdb3_py_api/src/system_py.rs
Outdated
let local_api = api.into_pyobject(py)?;

// turn args into an optional dict to pass into python
let args = args.as_ref().map(|args| {
Since we also have this block in execute_wal_plugin, can we pull it into a function?
Factored it out.
ac6ef95 to e59a2f1
e59a2f1 to f3baede
        break;
    }
    Some(PluginEvent::WriteWalContents(_)) => {
        debug!("ignoring wal contents in cron plugin.")
This is unexpected, so maybe it should be a warn? Can fix in a followup.
Actually this will be happening. Right now we send the Wal contents to all plugins, without doing any up-front work to figure out whether they actually need the message. For WalRows, this means we don't check if there is data on a relevant table, and for scheduled plugins all of it is irrelevant.
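The broadcast behavior described above can be sketched as follows. This is a simplified model: the `PluginEvent` enum here carries only a list of table names, and `Outcome`, `schedule_plugin_handle`, and `wal_plugin_handle` are hypothetical names invented for the example.

```rust
/// Simplified event type for this sketch; the real enum carries WAL contents.
enum PluginEvent {
    /// Names of the tables touched by a write (a stand-in for the WAL contents).
    WriteWalContents(Vec<String>),
    Shutdown,
}

/// What a plugin did with an event.
#[derive(Debug, PartialEq)]
enum Outcome {
    Processed,
    Ignored,
    Stopped,
}

/// A scheduled plugin receives every WAL event but never needs one,
/// so ignoring it is routine rather than a warning condition.
fn schedule_plugin_handle(event: &PluginEvent) -> Outcome {
    match event {
        PluginEvent::WriteWalContents(_) => Outcome::Ignored,
        PluginEvent::Shutdown => Outcome::Stopped,
    }
}

/// A WAL plugin only does work when the write touches its table;
/// the check happens on the receiving side because events are broadcast unfiltered.
fn wal_plugin_handle(event: &PluginEvent, table: &str) -> Outcome {
    match event {
        PluginEvent::WriteWalContents(tables) if tables.iter().any(|t| t.as_str() == table) => {
            Outcome::Processed
        }
        PluginEvent::WriteWalContents(_) => Outcome::Ignored,
        PluginEvent::Shutdown => Outcome::Stopped,
    }
}
```

Filtering up front, before sending, would avoid the wasted messages, at the cost of the sender needing to know each plugin's interests.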
Going to merge this in so I can stack my new plugin type on it.
Closes #25815.
This adds the CronSchedule plugin type to the processing engine, and an accompanying variant of TriggerSpecificationDefinition.
When a cron schedule runs it looks at the current time to construct an iterator, using the cron library. Calls to the plugin proceed sequentially, and there is not currently any catchup. The time passed down to the plugin is the time the cron schedule triggered for, rather than the current time.
I've added an endpoint to test cron plugins, similar to the existing one for the wal_plugin.
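The sequential semantics described above can be simulated with plain integers. This is a sketch, not the PR's code: `simulate` is a hypothetical function, times are whole seconds, and the schedule is any iterator of trigger times rather than one built by the cron library.

```rust
/// Simulates sequential execution of a scheduled plugin.
/// Returns one `(scheduled_time, actual_start_time)` pair per call.
fn simulate(schedule: impl Iterator<Item = u64>, start: u64, run_secs: u64) -> Vec<(u64, u64)> {
    let mut now = start;
    let mut calls = Vec::new();
    for trigger_time in schedule {
        // Wait for the trigger time if it is still in the future.
        if trigger_time > now {
            now = trigger_time;
        }
        // The plugin is handed `trigger_time`, not `now`.
        calls.push((trigger_time, now));
        // The next trigger cannot start until this run has finished.
        now += run_secs;
    }
    calls
}
```

With triggers every 10 seconds and a plugin that takes 15 seconds, the second and third calls start late, yet each still receives the time it was scheduled for.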
Some semantics questions I'd appreciate feedback on:
I'm planning to write up tests tomorrow, but wanted to get the PR published.