Skip to content
This repository has been archived by the owner on Jan 15, 2025. It is now read-only.

wip: attempt to drop async_compression #621

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ rust-version = "1.74.0"
[dependencies]
anyhow = "1.0"
containers-image-proxy = "0.5.5"
async-compression = { version = "0.4", features = ["gzip", "tokio", "zstd"] }
camino = "1.0.4"
chrono = "0.4.19"
olpc-cjson = "0.1.1"
Expand Down Expand Up @@ -43,6 +42,7 @@ tokio = { features = ["io-std", "time", "process", "rt", "net"], version = ">= 1
tokio-util = { features = ["io-util"], version = "0.7" }
tokio-stream = { features = ["sync"], version = "0.1.8" }
tracing = "0.1"
zstd = "0.13.1"

indoc = { version = "2", optional = true }
xshell = { version = "0.2", optional = true }
Expand Down
10 changes: 3 additions & 7 deletions lib/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -850,13 +850,9 @@ async fn testing(opts: &TestingOpts) -> Result<()> {
TestingOpts::RunIMA => crate::integrationtest::test_ima(),
TestingOpts::FilterTar => {
let tmpdir = cap_std_ext::cap_tempfile::TempDir::new(cap_std::ambient_authority())?;
crate::tar::filter_tar(
std::io::stdin(),
std::io::stdout(),
&Default::default(),
&tmpdir,
)
.map(|_| {})
let stdin = std::io::stdin();
crate::tar::filter_tar(stdin, std::io::stdout(), &Default::default(), &tmpdir)
.map(|_| {})
}
}
}
Expand Down
30 changes: 27 additions & 3 deletions lib/src/container/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ const IMAGE_PREFIX: &str = "ostree/container/image";
/// ref with the project name, so the final ref may be of the form e.g. `ostree/container/baseimage/bootc/foo`.
pub const BASE_IMAGE_PREFIX: &str = "ostree/container/baseimage";

/// The legacy MIME type returned by the skopeo/(containers/storage) code
/// when we have a local uncompressed docker-formatted image.
/// TODO: change the skopeo code to shield us from this correctly
const DOCKER_TYPE_LAYER_TAR: &str = "application/vnd.docker.image.rootfs.diff.tar";

/// The key injected into the merge commit for the manifest digest.
pub(crate) const META_MANIFEST_DIGEST: &str = "ostree.manifest-digest";
/// The key injected into the merge commit with the manifest serialized as JSON.
Expand Down Expand Up @@ -437,6 +442,20 @@ fn timestamp_of_manifest_or_config(
.log_err_default()
}

fn decompressor_for_media_type(
media_type: oci_spec::image::MediaType,
src: impl std::io::Read + Send + 'static,
) -> Result<Box<dyn std::io::Read + Send>> {
let r: Box<dyn std::io::Read + Send> = match media_type {
oci_image::MediaType::ImageLayerGzip => Box::new(flate2::read::GzDecoder::new(src)),
oci_image::MediaType::ImageLayerZstd => Box::new(zstd::stream::Decoder::new(src)?),
oci_image::MediaType::ImageLayer => Box::new(src),
oci_image::MediaType::Other(t) if t.as_str() == DOCKER_TYPE_LAYER_TAR => Box::new(src),
o => anyhow::bail!("Unhandled layer type: {}", o),
};
Ok(r)
}

impl ImageImporter {
/// The metadata key used in ostree commit metadata to serialize
const CACHED_KEY_MANIFEST_DIGEST: &'static str = "ostree-ext.cached.manifest-digest";
Expand Down Expand Up @@ -700,7 +719,7 @@ impl ImageImporter {
p.send(ImportProgress::OstreeChunkStarted(layer.layer.clone()))
.await?;
}
let (blob, driver) = fetch_layer_decompress(
let (blob, driver, media_type) = fetch_layer(
&mut self.proxy,
&self.proxy_img,
&import.manifest,
Expand All @@ -710,13 +729,15 @@ impl ImageImporter {
self.imgref.imgref.transport,
)
.await?;

let repo = self.repo.clone();
let target_ref = layer.ostree_ref.clone();
let import_task =
crate::tokio_util::spawn_blocking_cancellable_flatten(move |cancellable| {
let txn = repo.auto_transaction(Some(cancellable))?;
let mut importer = crate::tar::Importer::new_for_object_set(&repo);
let blob = tokio_util::io::SyncIoBridge::new(blob);
let blob = decompressor_for_media_type(media_type, blob)?;
let mut archive = tar::Archive::new(blob);
importer.import_objects(&mut archive, Some(cancellable))?;
let commit = if write_refs {
Expand Down Expand Up @@ -745,7 +766,7 @@ impl ImageImporter {
))
.await?;
}
let (blob, driver) = fetch_layer_decompress(
let (blob, driver, media_type) = fetch_layer(
&mut self.proxy,
&self.proxy_img,
&import.manifest,
Expand All @@ -762,6 +783,7 @@ impl ImageImporter {
let txn = repo.auto_transaction(Some(cancellable))?;
let mut importer = crate::tar::Importer::new_for_commit(&repo, remote);
let blob = tokio_util::io::SyncIoBridge::new(blob);
let blob = decompressor_for_media_type(media_type, blob)?;
let mut archive = tar::Archive::new(blob);
importer.import_commit(&mut archive, Some(cancellable))?;
let commit = importer.finish_import_commit();
Expand Down Expand Up @@ -856,7 +878,7 @@ impl ImageImporter {
p.send(ImportProgress::DerivedLayerStarted(layer.layer.clone()))
.await?;
}
let (blob, driver) = super::unencapsulate::fetch_layer_decompress(
let (blob, driver, media_type) = super::unencapsulate::fetch_layer(
&mut proxy,
&proxy_img,
&import.manifest,
Expand All @@ -874,6 +896,8 @@ impl ImageImporter {
allow_nonusr: root_is_transient,
retain_var: self.ostree_v2024_3,
};
let blob = tokio_util::io::SyncIoBridge::new(blob);
let blob = decompressor_for_media_type(media_type, blob)?;
let r =
crate::tar::write_tar(&self.repo, blob, layer.ostree_ref.as_str(), Some(opts));
let r = super::unencapsulate::join_fetch(r, driver)
Expand Down
41 changes: 8 additions & 33 deletions lib/src/container/unencapsulate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,6 @@ use tokio::{
};
use tracing::instrument;

/// The legacy MIME type returned by the skopeo/(containers/storage) code
/// when we have local uncompressed docker-formatted image.
/// TODO: change the skopeo code to shield us from this correctly
const DOCKER_TYPE_LAYER_TAR: &str = "application/vnd.docker.image.rootfs.diff.tar";

type Progress = tokio::sync::watch::Sender<u64>;

/// A read wrapper that updates the download progress.
Expand Down Expand Up @@ -189,26 +184,8 @@ pub async fn unencapsulate(repo: &ostree::Repo, imgref: &OstreeImageReference) -
importer.unencapsulate().await
}

/// Create an async decompressor for this MIME type, wrapping the given
/// input stream.
///
/// Gzip and zstd layers are decoded via `async_compression`; uncompressed
/// layer types (including the legacy docker tar MIME type) are returned
/// unchanged. Any other media type is an error.
fn new_async_decompressor<'a>(
    media_type: &oci_image::MediaType,
    src: impl AsyncBufRead + Send + Unpin + 'a,
) -> Result<Box<dyn AsyncBufRead + Send + Unpin + 'a>> {
    let reader: Box<dyn AsyncBufRead + Send + Unpin + 'a> = match media_type {
        oci_image::MediaType::ImageLayerGzip => Box::new(tokio::io::BufReader::new(
            async_compression::tokio::bufread::GzipDecoder::new(src),
        )),
        oci_image::MediaType::ImageLayerZstd => Box::new(tokio::io::BufReader::new(
            async_compression::tokio::bufread::ZstdDecoder::new(src),
        )),
        // Already uncompressed: pass the stream through untouched.
        oci_image::MediaType::ImageLayer => Box::new(src),
        oci_image::MediaType::Other(t) if t.as_str() == DOCKER_TYPE_LAYER_TAR => Box::new(src),
        o => anyhow::bail!("Unhandled layer type: {}", o),
    };
    Ok(reader)
}

/// A wrapper for [`get_blob`] which fetches a layer and decompresses it.
pub(crate) async fn fetch_layer_decompress<'a>(
/// A wrapper for [`get_blob`] which fetches a layer and returns its media type.
pub(crate) async fn fetch_layer<'a>(
proxy: &'a mut ImageProxy,
img: &OpenedImage,
manifest: &oci_image::ImageManifest,
Expand All @@ -219,12 +196,12 @@ pub(crate) async fn fetch_layer_decompress<'a>(
) -> Result<(
Box<dyn AsyncBufRead + Send + Unpin>,
impl Future<Output = Result<()>> + 'a,
oci_image::MediaType,
)> {
use futures_util::future::Either;
tracing::debug!("fetching {}", layer.digest());
let layer_index = manifest.layers().iter().position(|x| x == layer).unwrap();
let (blob, driver, size);
let media_type: &oci_image::MediaType;
let (blob, driver, size, media_type);
match transport_src {
Transport::ContainerStorage => {
let layer_info = layer_info
Expand All @@ -234,17 +211,17 @@ pub(crate) async fn fetch_layer_decompress<'a>(
anyhow!("blobid position {layer_index} exceeds diffid count {n_layers}")
})?;
size = layer_blob.size;
media_type = &layer_blob.media_type;
(blob, driver) = proxy
.get_blob(img, layer_blob.digest.as_str(), size as u64)
.await?;
media_type = layer_blob.media_type.clone();
}
_ => {
size = layer.size();
media_type = layer.media_type();
(blob, driver) = proxy
.get_blob(img, layer.digest().as_str(), size as u64)
.await?;
media_type = layer.media_type().clone();
}
};

Expand All @@ -262,11 +239,9 @@ pub(crate) async fn fetch_layer_decompress<'a>(
progress.send_replace(Some(status));
}
};
let reader = new_async_decompressor(media_type, readprogress)?;
let driver = futures_util::future::join(readproxy, driver).map(|r| r.1);
Ok((reader, Either::Left(driver)))
Ok((Box::new(readprogress), Either::Left(driver), media_type))
} else {
let blob = new_async_decompressor(media_type, blob)?;
Ok((blob, Either::Right(driver)))
Ok((Box::new(blob), Either::Right(driver), media_type))
}
}
13 changes: 5 additions & 8 deletions lib/src/tar/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::io::{BufWriter, Seek, Write};
use std::path::Path;
use std::process::Stdio;
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite};
use tokio::io::{AsyncReadExt, AsyncWrite};
use tracing::instrument;

// Exclude things in https://www.freedesktop.org/wiki/Software/systemd/APIFileSystems/
Expand Down Expand Up @@ -290,17 +290,14 @@ pub(crate) fn filter_tar(
/// Asynchronous wrapper for filter_tar()
#[context("Filtering tar stream")]
async fn filter_tar_async(
src: impl AsyncRead + Send + 'static,
mut src: impl std::io::Read + Send + 'static,
mut dest: impl AsyncWrite + Send + Unpin,
config: &TarImportConfig,
repo_tmpdir: Dir,
) -> Result<BTreeMap<String, u32>> {
let (tx_buf, mut rx_buf) = tokio::io::duplex(8192);
// The source must be moved to the heap so we know it is stable for passing to the worker thread
let src = Box::pin(src);
let config = config.clone();
let tar_transformer = tokio::task::spawn_blocking(move || {
let mut src = tokio_util::io::SyncIoBridge::new(src);
let dest = tokio_util::io::SyncIoBridge::new(tx_buf);
let r = filter_tar(&mut src, dest, &config, &repo_tmpdir);
// Pass ownership of the input stream back to the caller - see below.
Expand All @@ -326,7 +323,7 @@ async fn filter_tar_async(
#[instrument(level = "debug", skip_all)]
pub async fn write_tar(
repo: &ostree::Repo,
src: impl tokio::io::AsyncRead + Send + Unpin + 'static,
src: impl std::io::Read + Send + Unpin + 'static,
refname: &str,
options: Option<WriteTarOptions>,
) -> Result<WriteTarResult> {
Expand Down Expand Up @@ -435,7 +432,7 @@ pub async fn write_tar(
#[cfg(test)]
mod tests {
use super::*;
use std::io::Cursor;
use std::io::{BufReader, Cursor};

#[test]
fn test_normalize_path() {
Expand Down Expand Up @@ -515,7 +512,7 @@ mod tests {
rootfs_tar.append_dir_all(".", rootfs)?;
let _ = rootfs_tar.into_inner()?;
let mut dest = Vec::new();
let src = tokio::io::BufReader::new(tokio::fs::File::open(rootfs_tar_path).await?);
let src = std::fs::File::open(rootfs_tar_path).map(BufReader::new)?;
let cap_tmpdir = Dir::open_ambient_dir(&tempd, cap_std::ambient_authority())?;
filter_tar_async(src, &mut dest, &Default::default(), cap_tmpdir).await?;
let dest = dest.as_slice();
Expand Down
7 changes: 2 additions & 5 deletions lib/tests/it/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,9 +380,8 @@ async fn test_tar_write() -> Result<()> {
tmproot.write("run/somefile", "somestate")?;
let tmptar = "testlayer.tar";
cmd!(sh, "tar cf {tmptar} -C tmproot .").run()?;
let src = fixture.dir.open(tmptar)?;
let src = fixture.dir.open(tmptar).map(BufReader::new)?;
fixture.dir.remove_file(tmptar)?;
let src = tokio::fs::File::from_std(src.into_std());
let r = ostree_ext::tar::write_tar(fixture.destrepo(), src, "layer", None).await?;
let layer_commit = r.commit.as_str();
cmd!(
Expand All @@ -401,9 +400,7 @@ async fn test_tar_write() -> Result<()> {
#[tokio::test]
async fn test_tar_write_tar_layer() -> Result<()> {
let fixture = Fixture::new_v1()?;
let uncompressed_tar = tokio::io::BufReader::new(
async_compression::tokio::bufread::GzipDecoder::new(EXAMPLE_TAR_LAYER),
);
let uncompressed_tar = flate2::read::GzDecoder::new(std::io::Cursor::new(EXAMPLE_TAR_LAYER));
ostree_ext::tar::write_tar(fixture.destrepo(), uncompressed_tar, "test", None).await?;
Ok(())
}
Expand Down
Loading