In an initial version of the project we used a JSON-config-based approach to transmit the workflow information to the deployment utilities. However, this approach decoupled the workflow from the source code and thus made it hard to reason about the workflow. Furthermore, it complicated the static code analysis, as we had to parse the JSON config file and match the functions to it. We therefore decided to use source code annotations instead. The source code annotations are used to annotate the functions that are part of the workflow.
The source code annotations allow us to easily reason about the workflow and to extract both the physical and the logical representation of the workflow. There is still an additional configuration file that is used to transmit information such as environment variables and the name of the workflow to the deployment utilities. However, this information is decoupled from the source code and thus does not affect the static code analysis.
Currently, we support the following annotations:
- At the beginning of the workflow the user has to register the workflow with the following annotation:
```python
workflow = CaribouWorkflow("workflow_name")
```
- At the beginning of each function the user has to register the function with the following annotation:
```python
@workflow.serverless_function(
    name="First_Function",
    entry_point=True,
    regions_and_providers={
        "allowed_regions": [
            {
                "provider": "aws",
                "region": "us-east-1",
            }
        ],
        "disallowed_regions": None,
        "providers": {
            "aws": {
                "config": {
                    "timeout": 60,
                    "memory": 128,
                },
            },
        },
    },
    environment_variables=[
        {
            "key": "example_key",
            "value": "example_value",
        }
    ],
)
```
The meaning of the different parameters is as follows:

- `name`: The name of the function. This is the name that is used directly in the physical representation of the workflow and is also used to identify the function in the logical representation of the workflow (see also the section on Logical Node Naming Scheme).
- `entry_point`: A boolean flag that indicates whether the function is the entry point of the workflow. There can only be one entry point in a workflow.
- `regions_and_providers`: A dictionary that contains the regions and providers that the function can be deployed to. It can be used to override the global settings in the `config.yml`. If `None` or an empty dictionary is provided, the global config takes precedence (a minimal sketch follows this list). The dictionary can contain the following keys:
  - `allowed_regions`: A list of regions that the function can be deployed to. If this list is empty, the function can be deployed to any region.
  - `disallowed_regions`: A list of regions that the function cannot be deployed to. If this list is empty, the function can be deployed to any region.
  - `providers`: A list of providers that the function can be deployed to. This can be used to override the global settings in the `config.yml`. If providers are specified at the function level, they take precedence over the global configuration; if `None` or an empty list is provided, the global config takes precedence. Each provider is a dictionary with two keys:
    - `name`: The name of the provider.
    - `config`: A dictionary that contains the configuration for the specific provider.
- `environment_variables`: A list of dictionaries, each setting one environment variable for the function. Each dictionary must contain two entries: `"key"`, the name of the environment variable, and `"value"`, the value assigned to that variable.
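For illustration, a minimal sketch of a non-entry-point function that relies entirely on the global `config.yml` settings might look as follows. The function name, signature, and body are hypothetical, not prescribed by the Caribou API:

```python
# Hypothetical sketch: with an empty regions_and_providers dictionary,
# the global settings from config.yml take precedence for this function.
@workflow.serverless_function(
    name="Second_Function",
    entry_point=False,
    regions_and_providers={},
    environment_variables=[
        {"key": "example_key", "value": "example_value"},
    ],
)
def second_function(payload):
    # The single-argument signature and body are illustrative; the decorator
    # is applied to the regular Python function that implements this step.
    ...
```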
- Within a function, a user can register a call to another function with the following annotation:

```python
workflow.invoke_serverless_function(second_function, payload)
```

The payload is a dictionary that contains the input data for the function. The payload is optional and can be omitted if the function does not require any input data.
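As a sketch (assuming the remaining decorator parameters are optional and using an illustrative payload), a caller might look like this:

```python
# Hypothetical sketch of a caller; the payload contents are illustrative.
@workflow.serverless_function(name="First_Function", entry_point=True)
def first_function(event):
    payload = {"value": 42}  # input data passed to the next function
    # Register and trigger the call to second_function with the payload.
    workflow.invoke_serverless_function(second_function, payload)
```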
- Additionally, there is the option of conditionally calling a function. This is done with the following annotation:

```python
workflow.invoke_serverless_function(second_function, payload, condition)
```

The condition is a boolean expression that is evaluated at runtime. If the condition evaluates to true, the function is called; otherwise, it is not called.
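For example, the condition can be computed from the function's input at runtime; the threshold and payload field below are hypothetical:

```python
# Hypothetical sketch: only invoke second_function when a value exceeds a threshold.
value = payload.get("value", 0)
should_continue = value > 10  # boolean condition evaluated at runtime
workflow.invoke_serverless_function(second_function, payload, should_continue)
```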
- Finally, there is the option of synchronizing multiple predecessor calls at a synchronization node. The responses from the predecessor calls are then passed to the synchronization node as a list of responses. This is done with the following annotation:

```python
responses: list[Any] = workflow.get_predecessor_data()
```
Using this annotation within a function has an important implication for when the function is executed: the entire function, including any code preceding the annotation, is only executed once all predecessor calls have been completed and the data has been synchronized. This is important to keep in mind when designing the workflow. See also the documentation on the Synchronization Node.
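A sketch of a synchronization function, assuming each predecessor returned a dictionary (the function name and aggregation logic are illustrative):

```python
from typing import Any

@workflow.serverless_function(name="Sync_Function")
def sync_function(event):
    # The entire function body runs only after all predecessor calls have
    # completed and their data has been synchronized.
    responses: list[Any] = workflow.get_predecessor_data()
    merged: dict[str, Any] = {}
    for response in responses:
        merged.update(response)  # illustrative aggregation of predecessor results
    return merged
```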
Additional Notes:
- Calls to `workflow.invoke_serverless_function` must not be placed inside an `if` statement or a `for` loop and must always be executed. For cases where conditional non-execution is required, set the `condition` parameter to `False` (see the sketch below).
- The Caribou Wrapper defined in `caribou_workflow.py` also uses and reserves the custom log level `25`. Please avoid using custom logs with log level `25` to prevent conflicts.
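For instance, rather than wrapping the call in an `if` statement, the branching logic can be folded into the `condition` parameter. The payload field below is hypothetical:

```python
# Avoid: the invocation would not always be executed.
# if payload["should_run"]:
#     workflow.invoke_serverless_function(second_function, payload)

# Instead: always execute the call and pass the condition explicitly.
should_run = bool(payload.get("should_run", False))
workflow.invoke_serverless_function(second_function, payload, should_run)
```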