-
Notifications
You must be signed in to change notification settings - Fork 1.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
experimental prefect-aws
bundle steps
#17201
Changes from 6 commits
f3a0113
ef127f1
367e363
9b46f88
b9e5f6d
3040570
9a5718a
2aa9fc2
8d10e6c
404dea5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,83 @@ | ||
from __future__ import annotations | ||
|
||
import os | ||
import tempfile | ||
from pathlib import Path | ||
|
||
import typer | ||
from botocore.exceptions import ClientError | ||
from pydantic_core import from_json | ||
|
||
from prefect.runner import Runner | ||
from prefect.utilities.asyncutils import run_coro_as_sync | ||
from prefect_aws.credentials import AwsCredentials | ||
|
||
from .types import AwsCredentialsBlockName, S3Bucket, S3Key | ||
|
||
|
||
def download_bundle_from_s3( | ||
bucket: S3Bucket, | ||
key: S3Key, | ||
output_dir: str | None = None, | ||
aws_credentials_block_name: AwsCredentialsBlockName | None = None, | ||
) -> dict[str, str]: | ||
""" | ||
Downloads a bundle from an S3 bucket. | ||
|
||
Args: | ||
bucket: S3 bucket name | ||
key: S3 object key | ||
output_dir: Local directory to save the bundle (if None, uses a temp directory) | ||
aws_credentials_block_name: Name of the AWS credentials block to use | ||
""" | ||
|
||
if aws_credentials_block_name: | ||
aws_credentials = AwsCredentials.load(aws_credentials_block_name) | ||
else: | ||
aws_credentials = AwsCredentials() | ||
|
||
s3 = aws_credentials.get_s3_client() | ||
|
||
output_dir = output_dir or tempfile.mkdtemp(prefix="prefect-bundle-") | ||
Path(output_dir).mkdir(parents=True, exist_ok=True) | ||
|
||
local_path = Path(output_dir) / os.path.basename(key) | ||
|
||
try: | ||
s3.download_file(bucket, key, local_path) | ||
return {"local_path": local_path} | ||
except ClientError as e: | ||
raise RuntimeError(f"Failed to download bundle from S3: {e}") | ||
|
||
|
||
def execute_bundle_from_s3( | ||
bucket: S3Bucket, | ||
key: S3Key, | ||
aws_credentials_block_name: AwsCredentialsBlockName | None = None, | ||
) -> None: | ||
""" | ||
Downloads a bundle from S3 and executes it. | ||
|
||
This step: | ||
1. Downloads the bundle from S3 | ||
2. Extracts and deserializes the bundle | ||
3. Executes the flow in a subprocess | ||
|
||
Args: | ||
bucket: S3 bucket name | ||
key: S3 object key | ||
aws_credentials_block_name: Name of the AWS credentials block to use | ||
""" | ||
download_result = download_bundle_from_s3( | ||
bucket=bucket, | ||
key=key, | ||
aws_credentials_block_name=aws_credentials_block_name, | ||
) | ||
|
||
bundle_data = from_json(Path(download_result["local_path"]).read_bytes()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. given the reliance on the |
||
|
||
run_coro_as_sync(Runner().execute_bundle(bundle_data)) | ||
|
||
|
||
if __name__ == "__main__": | ||
typer.run(execute_bundle_from_s3) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
from typing import Annotated | ||
|
||
import typer | ||
|
||
S3Bucket = Annotated[str, typer.Option("--bucket")] | ||
S3Key = Annotated[str, typer.Option("--key")] | ||
AwsCredentialsBlockName = Annotated[str, typer.Option("--aws-credentials-block-name")] | ||
LocalFilepath = Annotated[str, typer.Argument()] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
""" | ||
S3 bundle steps for Prefect. | ||
These steps allow uploading and downloading flow/task bundles to and from S3. | ||
""" | ||
|
||
from __future__ import annotations | ||
|
||
from pathlib import Path | ||
|
||
import typer | ||
from botocore.exceptions import ClientError | ||
|
||
from prefect_aws.credentials import AwsCredentials | ||
|
||
from .types import ( | ||
AwsCredentialsBlockName, | ||
LocalFilepath, | ||
S3Bucket, | ||
S3Key, | ||
) | ||
|
||
|
||
def upload_bundle_to_s3( | ||
local_filepath: LocalFilepath, | ||
bucket: S3Bucket, | ||
key: S3Key, | ||
aws_credentials_block_name: AwsCredentialsBlockName | None = None, | ||
) -> dict[str, str]: | ||
""" | ||
Uploads a bundle file to an S3 bucket. | ||
|
||
Args: | ||
local_filepath: Local path to the bundle file | ||
bucket: S3 bucket name | ||
key: S3 object key (if None, uses the bundle filename) | ||
aws_credentials_block_name: Name of the AWS credentials block to use | ||
|
||
Returns: | ||
Dictionary containing the bucket, key, and S3 URL of the uploaded bundle | ||
""" | ||
filepath = Path(local_filepath) | ||
if not filepath.exists(): | ||
raise ValueError(f"Bundle file not found: {filepath}") | ||
|
||
key = key or filepath.name | ||
|
||
# Set up S3 client with credentials if provided | ||
if aws_credentials_block_name: | ||
aws_credentials = AwsCredentials.load(aws_credentials_block_name) | ||
else: | ||
aws_credentials = AwsCredentials() | ||
|
||
s3 = aws_credentials.get_s3_client() | ||
|
||
try: | ||
s3.upload_file(str(filepath), bucket, key) | ||
return {"bucket": bucket, "key": key, "url": f"s3://{bucket}/{key}"} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same super minor point: could be worth a structured return object |
||
except ClientError as e: | ||
raise RuntimeError(f"Failed to upload bundle to S3: {e}") | ||
|
||
|
||
if __name__ == "__main__": | ||
typer.run(upload_bundle_to_s3) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
two notes:
None
, credentials inferred from environment