composefs_oci/
skopeo.rs

1//! Container image pulling and registry interaction via skopeo/containers-image-proxy.
2//!
3//! This module provides functionality to pull container images from various registries and import them
4//! into composefs repositories. It uses the containers-image-proxy library to interface with skopeo
5//! for image operations, handling authentication, transport protocols, and image manifest processing.
6//!
7//! The main entry point is the `pull()` function which downloads an image, processes its layers
8//! asynchronously with parallelism control, and stores them in the composefs repository with proper
9//! fs-verity integration. It supports various image formats and compression types.
10
11use std::{cmp::Reverse, process::Command, thread::available_parallelism};
12
13use std::{iter::zip, sync::Arc};
14
15use anyhow::{bail, Context, Result};
16use async_compression::tokio::bufread::{GzipDecoder, ZstdDecoder};
17use containers_image_proxy::{ImageProxy, ImageProxyConfig, OpenedImage};
18use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
19use oci_spec::image::{Descriptor, ImageConfiguration, ImageManifest, MediaType};
20use rustix::process::geteuid;
21use tokio::{io::AsyncReadExt, sync::Semaphore};
22
23use composefs::{
24    fsverity::FsVerityHashValue, repository::Repository, splitstream::DigestMap, util::Sha256Digest,
25};
26
27use crate::{sha256_from_descriptor, sha256_from_digest, tar::split_async, ContentAndVerity};
28
29struct ImageOp<ObjectID: FsVerityHashValue> {
30    repo: Arc<Repository<ObjectID>>,
31    proxy: ImageProxy,
32    img: OpenedImage,
33    progress: MultiProgress,
34}
35
36impl<ObjectID: FsVerityHashValue> ImageOp<ObjectID> {
37    async fn new(
38        repo: &Arc<Repository<ObjectID>>,
39        imgref: &str,
40        img_proxy_config: Option<ImageProxyConfig>,
41    ) -> Result<Self> {
42        // See https://github.com/containers/skopeo/issues/2563
43        let skopeo_cmd = if imgref.starts_with("containers-storage:") && !geteuid().is_root() {
44            let mut cmd = Command::new("podman");
45            cmd.args(["unshare", "skopeo"]);
46            Some(cmd)
47        } else {
48            None
49        };
50
51        let config = match img_proxy_config {
52            Some(mut conf) => {
53                if conf.skopeo_cmd.is_none() {
54                    conf.skopeo_cmd = skopeo_cmd;
55                }
56
57                conf
58            }
59
60            None => {
61                ImageProxyConfig {
62                    skopeo_cmd,
63                    // auth_anonymous: true, debug: true, insecure_skip_tls_verification: None,
64                    ..ImageProxyConfig::default()
65                }
66            }
67        };
68
69        let proxy = containers_image_proxy::ImageProxy::new_with_config(config).await?;
70        let img = proxy.open_image(imgref).await.context("Opening image")?;
71        let progress = MultiProgress::new();
72        Ok(ImageOp {
73            repo: Arc::clone(repo),
74            proxy,
75            img,
76            progress,
77        })
78    }
79
80    pub async fn ensure_layer(
81        &self,
82        layer_sha256: Sha256Digest,
83        descriptor: &Descriptor,
84    ) -> Result<ObjectID> {
85        // We need to use the per_manifest descriptor to download the compressed layer but it gets
86        // stored in the repository via the per_config descriptor.  Our return value is the
87        // fsverity digest for the corresponding splitstream.
88
89        if let Some(layer_id) = self.repo.check_stream(&layer_sha256)? {
90            self.progress
91                .println(format!("Already have layer {}", hex::encode(layer_sha256)))?;
92            Ok(layer_id)
93        } else {
94            // Otherwise, we need to fetch it...
95            let (blob_reader, driver) = self.proxy.get_descriptor(&self.img, descriptor).await?;
96
97            // See https://github.com/containers/containers-image-proxy-rs/issues/71
98            let blob_reader = blob_reader.take(descriptor.size());
99
100            let bar = self.progress.add(ProgressBar::new(descriptor.size()));
101            bar.set_style(ProgressStyle::with_template("[eta {eta}] {bar:40.cyan/blue} {decimal_bytes:>7}/{decimal_total_bytes:7} {msg}")
102                .unwrap()
103                .progress_chars("##-"));
104            let progress = bar.wrap_async_read(blob_reader);
105            self.progress
106                .println(format!("Fetching layer {}", hex::encode(layer_sha256)))?;
107
108            let mut splitstream = self.repo.create_stream(Some(layer_sha256), None);
109            match descriptor.media_type() {
110                MediaType::ImageLayer => {
111                    split_async(progress, &mut splitstream).await?;
112                }
113                MediaType::ImageLayerGzip => {
114                    split_async(GzipDecoder::new(progress), &mut splitstream).await?;
115                }
116                MediaType::ImageLayerZstd => {
117                    split_async(ZstdDecoder::new(progress), &mut splitstream).await?;
118                }
119                other => bail!("Unsupported layer media type {:?}", other),
120            };
121            let layer_id = self.repo.write_stream(splitstream, None)?;
122
123            // We intentionally explicitly ignore this, even though we're supposed to check it.
124            // See https://github.com/containers/containers-image-proxy-rs/issues/80 for discussion
125            // about why.  Note: we only care about the uncompressed layer tar, and we checksum it
126            // ourselves.
127            drop(driver);
128
129            Ok(layer_id)
130        }
131    }
132
133    pub async fn ensure_config(
134        self: &Arc<Self>,
135        manifest_layers: &[Descriptor],
136        descriptor: &Descriptor,
137    ) -> Result<ContentAndVerity<ObjectID>> {
138        let config_sha256 = sha256_from_descriptor(descriptor)?;
139        if let Some(config_id) = self.repo.check_stream(&config_sha256)? {
140            // We already got this config?  Nice.
141            self.progress.println(format!(
142                "Already have container config {}",
143                hex::encode(config_sha256)
144            ))?;
145            Ok((config_sha256, config_id))
146        } else {
147            // We need to add the config to the repo.  We need to parse the config and make sure we
148            // have all of the layers first.
149            //
150            self.progress
151                .println(format!("Fetching config {}", hex::encode(config_sha256)))?;
152
153            let (mut config, driver) = self.proxy.get_descriptor(&self.img, descriptor).await?;
154            let config = async move {
155                let mut s = Vec::new();
156                config.read_to_end(&mut s).await?;
157                anyhow::Ok(s)
158            };
159            let (config, driver) = tokio::join!(config, driver);
160            let _: () = driver?;
161            let raw_config = config?;
162            let config = ImageConfiguration::from_reader(&raw_config[..])?;
163
164            // We want to sort the layers based on size so we can get started on the big layers
165            // first.  The last thing we want is to start on the biggest layer right at the end.
166            let mut layers: Vec<_> = zip(manifest_layers, config.rootfs().diff_ids()).collect();
167            layers.sort_by_key(|(mld, ..)| Reverse(mld.size()));
168
169            // Bound the number of tasks to the available parallelism.
170            let threads = available_parallelism()?;
171            let sem = Arc::new(Semaphore::new(threads.into()));
172            let mut entries = vec![];
173            for (mld, diff_id) in layers {
174                let self_ = Arc::clone(self);
175                let permit = Arc::clone(&sem).acquire_owned().await?;
176                let layer_sha256 = sha256_from_digest(diff_id)?;
177                let descriptor = mld.clone();
178                let future = tokio::spawn(async move {
179                    let _permit = permit;
180                    self_.ensure_layer(layer_sha256, &descriptor).await
181                });
182                entries.push((layer_sha256, future));
183            }
184
185            // Collect the results.
186            let mut config_maps = DigestMap::new();
187            for (layer_sha256, future) in entries {
188                config_maps.insert(&layer_sha256, &future.await??);
189            }
190
191            let mut splitstream = self
192                .repo
193                .create_stream(Some(config_sha256), Some(config_maps));
194            splitstream.write_inline(&raw_config);
195            let config_id = self.repo.write_stream(splitstream, None)?;
196
197            Ok((config_sha256, config_id))
198        }
199    }
200
201    pub async fn pull(self: &Arc<Self>) -> Result<ContentAndVerity<ObjectID>> {
202        let (_manifest_digest, raw_manifest) = self
203            .proxy
204            .fetch_manifest_raw_oci(&self.img)
205            .await
206            .context("Fetching manifest")?;
207
208        // We need to add the manifest to the repo.  We need to parse the manifest and make
209        // sure we have the config first (which will also pull in the layers).
210        let manifest = ImageManifest::from_reader(raw_manifest.as_slice())?;
211        let config_descriptor = manifest.config();
212        let layers = manifest.layers();
213        self.ensure_config(layers, config_descriptor)
214            .await
215            .with_context(|| format!("Failed to pull config {config_descriptor:?}"))
216    }
217}
218
219/// Pull the target image, and add the provided tag. If this is a mountable
220/// image (i.e. not an artifact), it is *not* unpacked by default.
221pub async fn pull<ObjectID: FsVerityHashValue>(
222    repo: &Arc<Repository<ObjectID>>,
223    imgref: &str,
224    reference: Option<&str>,
225    img_proxy_config: Option<ImageProxyConfig>,
226) -> Result<(Sha256Digest, ObjectID)> {
227    let op = Arc::new(ImageOp::new(repo, imgref, img_proxy_config).await?);
228    let (sha256, id) = op
229        .pull()
230        .await
231        .with_context(|| format!("Unable to pull container image {imgref}"))?;
232
233    if let Some(name) = reference {
234        repo.name_stream(sha256, name)?;
235    }
236    Ok((sha256, id))
237}