✨Streaming utils for zipping and reading/writing to S3 #7186

Merged
Changes from 68 commits
74 commits
0054d6a
added stream-zip
Feb 7, 2025
ff96d46
added utils for stream zipping
Feb 7, 2025
fe6c34d
rename
Feb 7, 2025
bd1df7c
added minimal progress support
Feb 7, 2025
7b7aae0
rename
Feb 7, 2025
5f72a43
fixed types
Feb 7, 2025
aace76a
refactor
Feb 7, 2025
388b81a
refactor
Feb 7, 2025
357273a
added S3 streaming and integration test
Feb 7, 2025
973423e
refactor
Feb 7, 2025
b2a66f5
Merge remote-tracking branch 'upstream/master' into pr-osparc-stream-…
Feb 7, 2025
c129672
removed debug print
Feb 7, 2025
e32f265
refactor to use size instead of items count as progress
Feb 7, 2025
de490ad
using faster file hash checking
Feb 7, 2025
706934d
Merge remote-tracking branch 'upstream/master' into pr-osparc-stream-…
Feb 7, 2025
c672212
refactor progress on zip
Feb 7, 2025
cee5e9c
remove unused
Feb 7, 2025
6d3eb72
remove unused
Feb 7, 2025
3873976
remove outdated
Feb 7, 2025
706ee4b
reshuffled imports
Feb 7, 2025
a7e0867
fixed more broken imports
Feb 7, 2025
4775e55
reverted delted import
Feb 7, 2025
e0a5407
remove unused error
Feb 7, 2025
23515e3
revert number
Feb 7, 2025
b546f10
fixed broken import
Feb 7, 2025
d26289d
typing
Feb 10, 2025
f2c5923
fixeed tests
Feb 10, 2025
c2eea57
typing and imports
Feb 10, 2025
dc3b63a
fixed broken test
Feb 10, 2025
7eac64d
Merge remote-tracking branch 'upstream/master' into pr-osparc-stream-…
Feb 11, 2025
1fabe65
added missing
Feb 11, 2025
b245e63
rename module
Feb 11, 2025
19b5ef5
added FileLikeFileStreamReader
Feb 11, 2025
a5c9060
Merge remote-tracking branch 'upstream/master' into pr-osparc-stream-…
Feb 11, 2025
6fb389d
repalced with simpler implementation
Feb 11, 2025
c6cc5e0
rename
Feb 11, 2025
e7eee8b
refactor imports
Feb 11, 2025
2045c08
refactor
Feb 11, 2025
1298895
refactor
Feb 11, 2025
d17e450
added readme
Feb 11, 2025
d14d8fd
refacto fixture
Feb 11, 2025
0497f86
extended tests
Feb 11, 2025
6261c65
extended tests
Feb 12, 2025
cd9d443
renaming
Feb 12, 2025
0777442
Merge remote-tracking branch 'upstream/master' into pr-osparc-stream-…
Feb 12, 2025
e042a6c
fixed broken mocks
Feb 12, 2025
eea3fba
Merge remote-tracking branch 'upstream/master' into pr-osparc-stream-…
Feb 13, 2025
ed8280f
rename
Feb 13, 2025
f170791
rename
Feb 13, 2025
17c0ac5
rename module
Feb 13, 2025
c024827
refactor interface
Feb 13, 2025
a844a0c
refactor progress
Feb 13, 2025
a277e12
refactor placement of FileLikeFileStreamReader
Feb 13, 2025
5a1a8e7
rename
Feb 13, 2025
ec87ba0
update
Feb 13, 2025
b835ae0
rename and move around parts
Feb 13, 2025
70afc40
renamed modules
Feb 13, 2025
0addf04
renames
Feb 13, 2025
cb479f3
moved imports to more appropriate places
Feb 13, 2025
881d2a1
Merge remote-tracking branch 'upstream/master' into pr-osparc-stream-…
Feb 13, 2025
e37972c
refactor
Feb 13, 2025
0f11526
renamed
Feb 13, 2025
b23d1f1
renamed to bytes_iter
Feb 13, 2025
b29bff2
renaming paths
Feb 13, 2025
d6ca255
renamed
Feb 13, 2025
a169f73
refactor
Feb 13, 2025
2c2d2db
added missing type
Feb 14, 2025
526ed0a
Merge remote-tracking branch 'upstream/master' into pr-osparc-stream-…
Feb 14, 2025
0ec7e23
rename fixture
Feb 14, 2025
9d48624
Merge remote-tracking branch 'upstream/master' into pr-osparc-stream-…
Feb 14, 2025
3aa766b
Merge remote-tracking branch 'upstream/master' into pr-osparc-stream-…
Feb 17, 2025
9cf8f1b
rename
Feb 17, 2025
d2f928a
removed todo in test
Feb 17, 2025
25372c7
Merge branch 'master' into pr-osparc-stream-zipping-of-s3-content
GitHK Feb 17, 2025
4 changes: 4 additions & 0 deletions packages/aws-library/requirements/_base.txt
@@ -244,6 +244,8 @@ protobuf==5.29.3
    #   opentelemetry-proto
psutil==6.1.1
    # via -r requirements/../../../packages/service-library/requirements/_base.in
pycryptodome==3.21.0
    # via stream-zip
pydantic==2.10.6
    # via
    #   -c requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt
@@ -368,6 +370,8 @@ six==1.17.0
    # via python-dateutil
sniffio==1.3.1
    # via anyio
stream-zip==0.0.83
    # via -r requirements/../../../packages/service-library/requirements/_base.in
tenacity==9.0.0
    # via -r requirements/../../../packages/service-library/requirements/_base.in
toolz==1.0.0
54 changes: 54 additions & 0 deletions packages/aws-library/src/aws_library/s3/_client.py
@@ -15,8 +15,11 @@
from botocore.client import Config
from models_library.api_schemas_storage import ETag, S3BucketName, UploadedPart
from models_library.basic_types import SHA256Str
from models_library.bytes_iters import BytesIter, DataSize
from pydantic import AnyUrl, ByteSize, TypeAdapter
from servicelib.bytes_iters import DEFAULT_READ_CHUNK_SIZE, BytesStreamer
from servicelib.logging_utils import log_catch, log_context
from servicelib.s3_utils import FileLikeReader
from servicelib.utils import limited_gather
from settings_library.s3 import S3Settings
from types_aiobotocore_s3 import S3Client
@@ -470,6 +473,57 @@ async def copy_objects_recursively(
            limit=_MAX_CONCURRENT_COPY,
        )

    async def get_bytes_streamer_from_object(
        self,
        bucket_name: S3BucketName,
        object_key: S3ObjectKey,
        *,
        chunk_size: int = DEFAULT_READ_CHUNK_SIZE,
    ) -> BytesStreamer:
        """stream read an object from S3 chunk by chunk"""

        # NOTE `download_fileobj` cannot be used to implement this because
        # it will buffer the entire file in memory instead of reading it
        # chunk by chunk

        # below is a quick call
        head_response = await self._client.head_object(
            Bucket=bucket_name, Key=object_key
        )
        data_size = DataSize(head_response["ContentLength"])

        async def _() -> BytesIter:
            # Download the file in chunks
            position = 0
            while position < data_size:
                # Calculate the range for this chunk
                end = min(position + chunk_size - 1, data_size - 1)
                range_header = f"bytes={position}-{end}"

                # Download the chunk
                response = await self._client.get_object(
                    Bucket=bucket_name, Key=object_key, Range=range_header
                )

                chunk = await response["Body"].read()

                # Yield the chunk for processing
                yield chunk

                position += chunk_size

        return BytesStreamer(data_size, _)
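
Note on the range arithmetic above: HTTP `Range` bounds are inclusive on both ends, which is why both candidates for `end` carry a `- 1`. The sketch below isolates that calculation so it can be checked without an S3 client. The helper name `iter_range_headers` and the `chunk_size` guard are illustrative assumptions and are not part of this PR.

```python
from collections.abc import Iterator


def iter_range_headers(data_size: int, chunk_size: int) -> Iterator[str]:
    """Yield one HTTP Range header value per chunk, covering bytes [0, data_size)."""
    # Assumption: reject non-positive sizes, which would otherwise loop forever
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")

    position = 0
    while position < data_size:
        # Range ends are inclusive; the last chunk is clamped to the final byte
        end = min(position + chunk_size - 1, data_size - 1)
        yield f"bytes={position}-{end}"
        position += chunk_size


headers = list(iter_range_headers(data_size=10, chunk_size=4))
print(headers)  # ['bytes=0-3', 'bytes=4-7', 'bytes=8-9']
```

An empty object (`data_size == 0`) produces no ranges, so the streamer would yield nothing and issue no `get_object` calls.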

    @s3_exception_handler(_logger)
    async def upload_object_from_file_like(
        self,
        bucket_name: S3BucketName,
        object_key: S3ObjectKey,
        file_like_reader: FileLikeReader,
    ) -> None:
        """stream write an object to S3 from a file-like reader"""
        await self._client.upload_fileobj(file_like_reader, bucket_name, object_key)  # type: ignore[arg-type]

    @staticmethod
    def is_multipart(file_size: ByteSize) -> bool:
        return file_size >= MULTIPART_UPLOADS_MIN_TOTAL_SIZE
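
Note on `upload_object_from_file_like`: the implementation of `FileLikeReader` in `servicelib.s3_utils` is not shown in this diff. As a rough illustration of the idea only, the sketch below wraps an async iterator of bytes behind an awaitable `read(size)` method. The class name `AsyncBytesReader` and its buffering strategy are assumptions and may differ from what the PR actually does.

```python
import asyncio
from collections.abc import AsyncIterator


class AsyncBytesReader:
    """Hypothetical adapter exposing `await read(size)` over an async bytes iterator."""

    def __init__(self, chunks: AsyncIterator[bytes]) -> None:
        self._chunks = chunks
        self._buffer = bytearray()
        self._exhausted = False

    async def read(self, size: int) -> bytes:
        # Pull chunks until the buffer can satisfy the request or the source ends
        while len(self._buffer) < size and not self._exhausted:
            try:
                self._buffer.extend(await self._chunks.__anext__())
            except StopAsyncIteration:
                self._exhausted = True

        data = bytes(self._buffer[:size])
        del self._buffer[:size]
        return data  # an empty result signals end of stream


async def _demo() -> list[bytes]:
    async def source() -> AsyncIterator[bytes]:
        for chunk in (b"abc", b"defg", b"h"):
            yield chunk

    reader = AsyncBytesReader(source())
    pieces = []
    while piece := await reader.read(3):
        pieces.append(piece)
    return pieces


pieces = asyncio.run(_demo())
print(pieces)  # [b'abc', b'def', b'gh']
```

At most one source chunk plus the unread remainder is held in memory at a time, which is the property that makes streaming uploads of large objects feasible.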