
experimental prefect-aws bundle steps #17201

Merged 10 commits on Feb 20, 2025
Conversation

@zzstoatzz (Collaborator) commented Feb 19, 2025

This PR adds the code for transporting bundles to and from S3, allowing a Runner to execute a bundle directly from S3:

  • upload a bundle to S3
  • download and execute a bundle

Example:
import asyncio
import subprocess
from pathlib import Path
from typing import Any

from pydantic_core import to_json

from prefect import flow, get_client
from prefect._experimental.bundles import create_bundle_for_flow_run

BUCKET_NAME = "zestoatbucket"


async def _write_sample_bundle(parameters: dict[str, Any]):
    @flow(log_prints=True)
    def add_one(x: int) -> int:
        _sum = x + 1
        print(f"x: {x}, _sum: {_sum}")
        return _sum

    async with get_client() as client:
        flow_run = await client.create_flow_run(
            flow=add_one,
            parameters=parameters,
        )

    bundle = create_bundle_for_flow_run(add_one, flow_run)

    Path("foo").write_bytes(to_json(bundle))


async def main():
    ### WORKER SUBMIT STUFF

    await _write_sample_bundle(parameters={"x": 42})

    subprocess.run(
        [
            "uv",
            "run",
            "-m",
            "prefect_aws.experimental.bundles.upload",
            "foo",
            "--bucket",
            BUCKET_NAME,
            "--key",
            "foo",
        ]
    )

    ### ENTRYPOINT FOR EVENTUAL RUNTIME

    subprocess.run(
        [
            "uv",
            "run",
            "-m",
            "prefect_aws.experimental.bundles.execute",
            "--bucket",
            BUCKET_NAME,
            "--key",
            "foo",
        ]
    )


if __name__ == "__main__":
    asyncio.run(main())

results in

14:12:32.972 | INFO    | Flow run 'outgoing-bobcat' - Beginning flow run 'outgoing-bobcat' for flow 'add-one'
14:12:32.974 | INFO    | Flow run 'outgoing-bobcat' - View at http://localhost:4200/runs/flow-run/461d15c2-4e9c-4af6-a313-07c5b7c28d8c
14:12:32.975 | INFO    | Flow run 'outgoing-bobcat' - x: 42, _sum: 43
14:12:32.987 | INFO    | Flow run 'outgoing-bobcat' - Finished in state Completed()
14:12:33.076 | INFO    | prefect.flow_runs.runner - Process for flow run 'outgoing-bobcat' exited cleanly.

Related to #17194

codspeed-hq bot commented Feb 19, 2025

CodSpeed Performance Report

Merging #17201 will not alter performance

Comparing bundle-steps (516919a) with main (8e802bb)

Summary

✅ 2 untouched benchmarks

@zzstoatzz zzstoatzz changed the title [wip] prefect-aws bundle steps experimental prefect-aws bundle steps Feb 19, 2025
@zzstoatzz zzstoatzz self-assigned this Feb 19, 2025
@zzstoatzz zzstoatzz added the integrations Related to integrations with other services label Feb 19, 2025
@zzstoatzz zzstoatzz marked this pull request as ready for review February 19, 2025 22:34
@desertaxle (Member) left a comment

Implementation looks good! Could you add some tests for the CLI interface also? We'll want to ensure that behavior is maintained.

test CLI interface
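A CLI test along these lines could monkeypatch sys.argv before invoking the module entrypoint. This is a minimal, self-contained sketch, assuming a hypothetical cli_main that parses the same positional filepath and --bucket/--key options as the real typer app in prefect_aws.experimental.bundles.upload:

```python
import argparse
import sys

def cli_main() -> dict[str, str]:
    # Hypothetical stand-in for the module's typer entrypoint:
    # parses the same positional filepath and --bucket/--key options.
    parser = argparse.ArgumentParser(prog="upload.py")
    parser.add_argument("filepath")
    parser.add_argument("--bucket", required=True)
    parser.add_argument("--key", required=True)
    args = parser.parse_args(sys.argv[1:])
    return {"filepath": args.filepath, "bucket": args.bucket, "key": args.key}

# Simulate the CLI invocation by swapping sys.argv, restoring it afterwards.
old_argv = sys.argv
try:
    sys.argv = ["upload.py", "bundle-file", "--bucket", "test-bucket", "--key", "test-key"]
    parsed = cli_main()
finally:
    sys.argv = old_argv

assert parsed == {"filepath": "bundle-file", "bucket": "test-bucket", "key": "test-key"}
```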
@cicdw (Member) left a comment

cool use of typer - left some minor bookkeeping comments, otherwise this makes sense to me

bucket: S3 bucket name
key: S3 object key
output_dir: Local directory to save the bundle (if None, uses a temp directory)
aws_credentials_block_name: Name of the AWS credentials block to use

two notes:

  • worth calling out that if the block is None, credentials are inferred from the environment
  • probably worth documenting the return value (what is generally in the dict?)

aws_credentials_block_name=aws_credentials_block_name,
)

bundle_data = from_json(Path(download_result["local_path"]).read_bytes())

given the reliance on the local_path key, could be worth elevating the return type of download_bundle to a TypedDict or dataclass
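A TypedDict version might look like the following sketch (names assumed; the stub stands in for the real boto3 download), letting static type checkers catch typos in keys like "local_path":

```python
from typing import TypedDict

class DownloadResult(TypedDict):
    """Hypothetical structured return type for download_bundle_from_s3."""
    local_path: str

def download_bundle_from_s3(bucket: str, key: str) -> DownloadResult:
    # Stub: the real implementation downloads the object with boto3
    # and returns the path it was written to.
    return DownloadResult(local_path=f"/tmp/{key}")

result = download_bundle_from_s3("my-bucket", "foo")
# type checkers now know result["local_path"] is a str, and flag unknown keys
```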


try:
    s3.upload_file(str(filepath), bucket, key)
    return {"bucket": bucket, "key": key, "url": f"s3://{bucket}/{key}"}

same super minor point: could be worth a structured return object
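For the upload step, such a structured object could be a NamedTuple mirroring the dict currently returned (a sketch; the field names come from the quoted return value above, and the body is stubbed):

```python
from typing import NamedTuple

class UploadResult(NamedTuple):
    """Hypothetical structured return for the upload step."""
    bucket: str
    key: str
    url: str

def upload_bundle_to_s3(filepath: str, bucket: str, key: str) -> UploadResult:
    # Stub: the real implementation calls s3.upload_file(str(filepath), bucket, key).
    return UploadResult(bucket=bucket, key=key, url=f"s3://{bucket}/{key}")

result = upload_bundle_to_s3("foo", "my-bucket", "foo")
```

A NamedTuple keeps backwards-compatible tuple unpacking while giving callers named, type-checked fields.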

@desertaxle (Member) left a comment

One small question on test assertions, but otherwise LGTM!

Comment on lines 167 to 174
sys.argv = [
    "upload.py",
    str(mock_bundle_file),
    "--bucket",
    "test-bucket",
    "--key",
    "test-key",
]

Could we verify these values get passed into the function as expected?
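One way to verify that, sketched here with a hand-rolled recording wrapper instead of the real module (all names are assumptions):

```python
calls = []

def upload_bundle_to_s3(filepath: str, bucket: str, key: str) -> None:
    # Hypothetical stand-in for the real upload step; records its
    # arguments so the test can assert on them.
    calls.append((filepath, bucket, key))

def run_cli(argv: list[str]) -> None:
    # Stand-in for the module entrypoint: forwards parsed argv to the step.
    filepath = argv[1]
    opts = dict(zip(argv[2::2], argv[3::2]))
    upload_bundle_to_s3(filepath, opts["--bucket"], opts["--key"])

run_cli(["upload.py", "bundle-file", "--bucket", "test-bucket", "--key", "test-key"])

# the parsed CLI values reached the upload function unchanged
assert calls == [("bundle-file", "test-bucket", "test-key")]
```

In a real test suite, unittest.mock.patch on the upload function plus assert_called_once_with would do the same job without the hand-rolled recorder.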

@zzstoatzz zzstoatzz merged commit ff4134e into main Feb 20, 2025
14 checks passed
@zzstoatzz zzstoatzz deleted the bundle-steps branch February 20, 2025 20:43