feat(processing_engine): Add cron plugins and triggers to the processing engine. #25852

Merged
pauldix merged 3 commits into main from cron_plugin on Jan 18, 2025

Conversation

jacksonrnewhouse
Contributor

Closes #25815.

This adds the CronSchedule plugin type to the processing engine, and an accompanying variant of TriggerSpecificationDefinition.

When a cron schedule runs, it uses the current time to construct an iterator of upcoming trigger times via the cron library. Calls to the plugin proceed sequentially, and there is currently no catch-up. The time passed down to the plugin is the time the cron schedule triggered for, rather than the current time.
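
To make the timing semantics concrete, here is a minimal std-only sketch of one reading of the behavior described above. `EveryNSeconds`, `upcoming`, and `simulate_sequential` are hypothetical names, and a fixed-period iterator stands in for the cron library's iterator; none of this is the code in the PR.

```rust
/// Hypothetical stand-in for a cron schedule: fires every `period` seconds.
/// (The PR builds its iterator with the cron library; this keeps the sketch std-only.)
struct EveryNSeconds {
    next: u64,
    period: u64,
}

impl Iterator for EveryNSeconds {
    type Item = u64;

    fn next(&mut self) -> Option<u64> {
        let t = self.next;
        self.next += self.period;
        Some(t)
    }
}

/// Build the iterator from the current time: the first trigger is the first
/// multiple of `period` strictly after `now`, so earlier triggers are never
/// replayed (no catch-up). Assumes `period > 0`.
fn upcoming(now: u64, period: u64) -> EveryNSeconds {
    EveryNSeconds { next: (now / period + 1) * period, period }
}

/// Simulate `count` sequential plugin calls that each take `run_secs`.
/// Returns (scheduled_time, actual_start_time) pairs. The plugin would be
/// handed `scheduled_time`, not the wall-clock start time.
fn simulate_sequential(now: u64, period: u64, run_secs: u64, count: usize) -> Vec<(u64, u64)> {
    let mut clock = now;
    let mut calls = Vec::new();
    for scheduled in upcoming(now, period).take(count) {
        // Wait for the trigger time unless the previous call already overran it.
        let start = clock.max(scheduled);
        calls.push((scheduled, start));
        clock = start + run_secs;
    }
    calls
}
```

With a 10-second period and a plugin that takes 15 seconds, the start times drift further from the scheduled times on every call, which is the "falling behind" case raised in the questions below.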

I've added an endpoint to test cron plugins, similar to the existing one for the wal_plugin.

Some semantics questions I'd appreciate feedback on:

  • Right now we will fall behind if the plugin consistently takes longer to run than the time between triggers. Do we want to have a different default or expose this as an option?
  • The cron library we're using includes both second and year fields. The year fields allow for a cron schedule that ends. Right now the code exits, which gives an error if you disable. Should it be marked as disabled? Something else?
  • Should I hook this into the TimeProvider?

I'm planning to write up tests tomorrow, but wanted to get the PR published.

Member

pauldix left a comment

Some comments and changes. On your questions:

> Right now we will fall behind if the plugin consistently takes longer to run than the time between triggers. Do we want to have a different default or expose this as an option?

I think it should be an option. But this means that every execution should spawn its own background task? That way if they've specified that the behavior is noskip, multiple executions of the trigger should be able to run at the same time.
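
One way to picture such an option is the deterministic simulation below. It is only a sketch: `OverrunPolicy`, its variant names, and `plan_runs` are hypothetical, and the `Skip` behavior shown (dropping triggers that fire while a run is still in progress) is an assumed interpretation, since the thread only names "noskip".

```rust
/// Hypothetical option names; the thread only uses the word "noskip".
#[derive(Clone, Copy, Debug, PartialEq)]
enum OverrunPolicy {
    /// Drop triggers that fire while the previous run is still in progress.
    Skip,
    /// Run every trigger at its scheduled time, each in its own task.
    NoSkip,
}

/// Given sorted trigger times and a fixed run duration, return the
/// (scheduled, start) pairs that would actually execute.
fn plan_runs(triggers: &[u64], run_secs: u64, policy: OverrunPolicy) -> Vec<(u64, u64)> {
    match policy {
        // Each execution gets its own task, so runs may overlap in time.
        OverrunPolicy::NoSkip => triggers.iter().map(|&t| (t, t)).collect(),
        OverrunPolicy::Skip => {
            let mut busy_until = 0u64;
            let mut runs = Vec::new();
            for &t in triggers {
                if t >= busy_until {
                    runs.push((t, t));
                    busy_until = t + run_secs;
                }
            }
            runs
        }
    }
}
```

In a real engine the `NoSkip` branch would correspond to spawning a background task per execution, as suggested above; the simulation only records when each run would start.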

> The cron library we're using includes both second and year fields. The year fields allow for a cron schedule that ends. Right now the code exits, which gives an error if you disable. Should it be marked as disabled? Something else?

Yeah, if they give one that has an end on the schedule, it makes sense that we'd mark it as disabled.
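
A rough sketch of that idea, assuming the schedule is exposed as an iterator of trigger times that eventually returns `None`. `TriggerStatus`, `ScheduleTrigger`, and `advance` are hypothetical names for illustration, not types from the processing engine.

```rust
#[derive(Debug, PartialEq)]
enum TriggerStatus {
    Enabled,
    Disabled,
}

/// Hypothetical wrapper around an iterator of upcoming trigger times.
struct ScheduleTrigger<I: Iterator<Item = u64>> {
    upcoming: I,
    status: TriggerStatus,
}

impl<I: Iterator<Item = u64>> ScheduleTrigger<I> {
    fn new(upcoming: I) -> Self {
        Self { upcoming, status: TriggerStatus::Enabled }
    }

    /// Returns the next trigger time. When the schedule is exhausted, the
    /// trigger is marked disabled instead of the run loop exiting with an error.
    fn advance(&mut self) -> Option<u64> {
        if self.status == TriggerStatus::Disabled {
            return None;
        }
        let next = self.upcoming.next();
        if next.is_none() {
            self.status = TriggerStatus::Disabled;
        }
        next
    }
}
```

Because the trigger ends up in an ordinary disabled state, a later request to disable it would find nothing unusual to report.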

> Should I hook this into the TimeProvider?

Yes, if we need to get the "now" time.
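
For illustration, the benefit of routing "now" through an injectable provider can be sketched as follows. The `Clock` trait here is a hypothetical, simplified stand-in and should not be read as the actual TimeProvider interface; `SystemClock`, `MockClock`, and `delay_until` are likewise invented for this example.

```rust
use std::cell::Cell;
use std::time::{SystemTime, UNIX_EPOCH};

/// Hypothetical minimal clock abstraction (not the real TimeProvider API).
trait Clock {
    fn now_secs(&self) -> u64;
}

/// Production clock backed by the system time.
struct SystemClock;

impl Clock for SystemClock {
    fn now_secs(&self) -> u64 {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_secs())
            .unwrap_or(0)
    }
}

/// Test clock whose time only moves when the test says so.
struct MockClock(Cell<u64>);

impl MockClock {
    fn new(start: u64) -> Self {
        MockClock(Cell::new(start))
    }

    fn set(&self, secs: u64) {
        self.0.set(secs);
    }
}

impl Clock for MockClock {
    fn now_secs(&self) -> u64 {
        self.0.get()
    }
}

/// Seconds to sleep until `next_trigger`, or zero if it is already due.
fn delay_until(clock: &dyn Clock, next_trigger: u64) -> u64 {
    next_trigger.saturating_sub(clock.now_secs())
}
```

With this shape, schedule tests can advance a mock clock instead of sleeping in real time.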

@@ -1,7 +1,7 @@
 use crate::commands::common::{InfluxDb3Config, SeparatedKeyValue, SeparatedList};
 use anyhow::Context;
 use hashbrown::HashMap;
-use influxdb3_client::plugin_development::WalPluginTestRequest;
+use influxdb3_client::plugin_development::{CronPluginTestRequest, WalPluginTestRequest};

Member

It should be called a Schedule plugin rather than cron. Even though the cron string is what we're using, we want a more user-friendly name.

influxdb3_config: InfluxDb3Config,
/// If given pass this map of string key/value pairs as input arguments
#[clap(long = "input-arguments")]
pub input_arguments: Option<SeparatedList<SeparatedKeyValue<String, String>>>,

Member

input_arguments should be on the trigger, not on the plugin itself.

Contributor Author

I'm mirroring the behavior of WalPluginConfig above, where this gets passed up to the server and then passed down into python via the final argument of the process_scheduled_call() method.

Member

Ah, this is for the test endpoint?

Contributor Author

Yeah, this is the CLI arguments, mirroring the test wal_plugin.

Contributor Author

The existing plugin and trigger creation APIs were sufficient for this, once I added a new TriggerSpecificationDefinition.

Arc::clone(&plugin.query_executor),
&plugin.trigger_definition.trigger_arguments,
)
.expect("should be able to run trigger");

Member

Any reason this might error? Can we catch this and log it?

Contributor Author

Yeah, any plugin error would trigger it. Switching to just have it exit via the ? operator.
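
The difference between panicking with `.expect(...)`, propagating with `?`, and catching and logging can be sketched like this. All names (`PluginError`, `run_trigger`, `run_once`, `run_all`) are hypothetical, and the "log" is just a vector of strings so the sketch stays std-only.

```rust
use std::fmt;

/// Hypothetical error type standing in for whatever a plugin run can return.
#[derive(Debug)]
struct PluginError(String);

impl fmt::Display for PluginError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.0)
    }
}

impl std::error::Error for PluginError {}

/// Stand-in for a single plugin invocation; `fail` simulates a plugin error.
fn run_trigger(fail: bool) -> Result<(), PluginError> {
    if fail {
        Err(PluginError("plugin raised an exception".to_string()))
    } else {
        Ok(())
    }
}

/// Propagate the error to the caller with `?` instead of panicking via `.expect(...)`.
fn run_once(fail: bool) -> Result<(), PluginError> {
    run_trigger(fail)?;
    Ok(())
}

/// A caller that catches each error and records a log line rather than stopping.
fn run_all(outcomes: &[bool]) -> Vec<String> {
    let mut log = Vec::new();
    for (i, &fail) in outcomes.iter().enumerate() {
        if let Err(e) = run_once(fail) {
            log.push(format!("trigger run {} failed: {}", i, e));
        }
    }
    log
}
```

Whether the loop should stop on the first error or log and continue is a design choice the thread leaves open; the sketch shows the second.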

db_schema: Arc<DatabaseSchema>,
) -> Result<(), Error> {
let Some(trigger_time) = self.next_trigger_time else {
return Err(anyhow!("running a cron trigger that is finished.").into());

Member

This language could be a little more clear. Not sure what finished means in this context.

let local_api = api.into_pyobject(py)?;

// turn args into an optional dict to pass into python
let args = args.as_ref().map(|args| {

Member

Since we also have this block in execute_wal_plugin, can we pull it into a function?

Contributor Author

factored it out.

jacksonrnewhouse force-pushed the cron_plugin branch 2 times, most recently from ac6ef95 to e59a2f1 on January 18, 2025 03:23

break;
}
Some(PluginEvent::WriteWalContents(_)) => {
debug!("ignoring wal contents in cron plugin.")

Member

This is unexpected so maybe should be a warn? Can fix in a followup.

Contributor Author

Actually this will be happening. Right now we send the Wal contents to all plugins, without doing any up-front work to figure out whether they actually need the message. For WalRows, this means we don't check if there is data on a relevant table, and for scheduled plugins all of it is irrelevant.
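
A simplified sketch of why the message is expected rather than exceptional: if every event is broadcast to every plugin, a schedule plugin's event loop has to treat WAL contents as a routine no-op. The `PluginEvent` below is a cut-down, hypothetical version (the real variants carry different payloads), and `Handled` and `handle_event_in_schedule_plugin` are invented for illustration.

```rust
/// Simplified, hypothetical event type; payloads are placeholders.
enum PluginEvent {
    WriteWalContents(Vec<String>),
    Shutdown,
}

#[derive(Debug, PartialEq)]
enum Handled {
    /// The event was irrelevant to this plugin and was dropped.
    Ignored,
    /// The plugin's event loop should stop.
    Stop,
}

/// How a schedule plugin might react to events it receives.
fn handle_event_in_schedule_plugin(event: Option<PluginEvent>) -> Handled {
    match event {
        // WAL contents go to every plugin without up-front filtering, so a
        // schedule plugin sees them routinely and can quietly skip them.
        Some(PluginEvent::WriteWalContents(_)) => Handled::Ignored,
        // A shutdown request or a closed channel ends the loop.
        Some(PluginEvent::Shutdown) | None => Handled::Stop,
    }
}
```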

Member

pauldix commented Jan 18, 2025

Going to merge this in so I can stack my new plugin type on it.

pauldix merged commit 1d8d3d6 into main Jan 18, 2025
13 checks passed
pauldix deleted the cron_plugin branch January 18, 2025 12:18
Successfully merging this pull request may close these issues.

Create schedule plugin type and trigger
2 participants