1use std::{cmp::Reverse, process::Command, thread::available_parallelism};
12
13use std::{iter::zip, sync::Arc};
14
15use anyhow::{bail, Context, Result};
16use async_compression::tokio::bufread::{GzipDecoder, ZstdDecoder};
17use containers_image_proxy::{ImageProxy, ImageProxyConfig, OpenedImage};
18use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
19use oci_spec::image::{Descriptor, ImageConfiguration, ImageManifest, MediaType};
20use rustix::process::geteuid;
21use tokio::{io::AsyncReadExt, sync::Semaphore};
22
23use composefs::{
24 fsverity::FsVerityHashValue, repository::Repository, splitstream::DigestMap, util::Sha256Digest,
25};
26
27use crate::{sha256_from_descriptor, sha256_from_digest, tar::split_async, ContentAndVerity};
28
29struct ImageOp<ObjectID: FsVerityHashValue> {
30 repo: Arc<Repository<ObjectID>>,
31 proxy: ImageProxy,
32 img: OpenedImage,
33 progress: MultiProgress,
34}
35
36impl<ObjectID: FsVerityHashValue> ImageOp<ObjectID> {
37 async fn new(
38 repo: &Arc<Repository<ObjectID>>,
39 imgref: &str,
40 img_proxy_config: Option<ImageProxyConfig>,
41 ) -> Result<Self> {
42 let skopeo_cmd = if imgref.starts_with("containers-storage:") && !geteuid().is_root() {
44 let mut cmd = Command::new("podman");
45 cmd.args(["unshare", "skopeo"]);
46 Some(cmd)
47 } else {
48 None
49 };
50
51 let config = match img_proxy_config {
52 Some(mut conf) => {
53 if conf.skopeo_cmd.is_none() {
54 conf.skopeo_cmd = skopeo_cmd;
55 }
56
57 conf
58 }
59
60 None => {
61 ImageProxyConfig {
62 skopeo_cmd,
63 ..ImageProxyConfig::default()
65 }
66 }
67 };
68
69 let proxy = containers_image_proxy::ImageProxy::new_with_config(config).await?;
70 let img = proxy.open_image(imgref).await.context("Opening image")?;
71 let progress = MultiProgress::new();
72 Ok(ImageOp {
73 repo: Arc::clone(repo),
74 proxy,
75 img,
76 progress,
77 })
78 }
79
80 pub async fn ensure_layer(
81 &self,
82 layer_sha256: Sha256Digest,
83 descriptor: &Descriptor,
84 ) -> Result<ObjectID> {
85 if let Some(layer_id) = self.repo.check_stream(&layer_sha256)? {
90 self.progress
91 .println(format!("Already have layer {}", hex::encode(layer_sha256)))?;
92 Ok(layer_id)
93 } else {
94 let (blob_reader, driver) = self.proxy.get_descriptor(&self.img, descriptor).await?;
96
97 let blob_reader = blob_reader.take(descriptor.size());
99
100 let bar = self.progress.add(ProgressBar::new(descriptor.size()));
101 bar.set_style(ProgressStyle::with_template("[eta {eta}] {bar:40.cyan/blue} {decimal_bytes:>7}/{decimal_total_bytes:7} {msg}")
102 .unwrap()
103 .progress_chars("##-"));
104 let progress = bar.wrap_async_read(blob_reader);
105 self.progress
106 .println(format!("Fetching layer {}", hex::encode(layer_sha256)))?;
107
108 let mut splitstream = self.repo.create_stream(Some(layer_sha256), None);
109 match descriptor.media_type() {
110 MediaType::ImageLayer => {
111 split_async(progress, &mut splitstream).await?;
112 }
113 MediaType::ImageLayerGzip => {
114 split_async(GzipDecoder::new(progress), &mut splitstream).await?;
115 }
116 MediaType::ImageLayerZstd => {
117 split_async(ZstdDecoder::new(progress), &mut splitstream).await?;
118 }
119 other => bail!("Unsupported layer media type {:?}", other),
120 };
121 let layer_id = self.repo.write_stream(splitstream, None)?;
122
123 drop(driver);
128
129 Ok(layer_id)
130 }
131 }
132
133 pub async fn ensure_config(
134 self: &Arc<Self>,
135 manifest_layers: &[Descriptor],
136 descriptor: &Descriptor,
137 ) -> Result<ContentAndVerity<ObjectID>> {
138 let config_sha256 = sha256_from_descriptor(descriptor)?;
139 if let Some(config_id) = self.repo.check_stream(&config_sha256)? {
140 self.progress.println(format!(
142 "Already have container config {}",
143 hex::encode(config_sha256)
144 ))?;
145 Ok((config_sha256, config_id))
146 } else {
147 self.progress
151 .println(format!("Fetching config {}", hex::encode(config_sha256)))?;
152
153 let (mut config, driver) = self.proxy.get_descriptor(&self.img, descriptor).await?;
154 let config = async move {
155 let mut s = Vec::new();
156 config.read_to_end(&mut s).await?;
157 anyhow::Ok(s)
158 };
159 let (config, driver) = tokio::join!(config, driver);
160 let _: () = driver?;
161 let raw_config = config?;
162 let config = ImageConfiguration::from_reader(&raw_config[..])?;
163
164 let mut layers: Vec<_> = zip(manifest_layers, config.rootfs().diff_ids()).collect();
167 layers.sort_by_key(|(mld, ..)| Reverse(mld.size()));
168
169 let threads = available_parallelism()?;
171 let sem = Arc::new(Semaphore::new(threads.into()));
172 let mut entries = vec![];
173 for (mld, diff_id) in layers {
174 let self_ = Arc::clone(self);
175 let permit = Arc::clone(&sem).acquire_owned().await?;
176 let layer_sha256 = sha256_from_digest(diff_id)?;
177 let descriptor = mld.clone();
178 let future = tokio::spawn(async move {
179 let _permit = permit;
180 self_.ensure_layer(layer_sha256, &descriptor).await
181 });
182 entries.push((layer_sha256, future));
183 }
184
185 let mut config_maps = DigestMap::new();
187 for (layer_sha256, future) in entries {
188 config_maps.insert(&layer_sha256, &future.await??);
189 }
190
191 let mut splitstream = self
192 .repo
193 .create_stream(Some(config_sha256), Some(config_maps));
194 splitstream.write_inline(&raw_config);
195 let config_id = self.repo.write_stream(splitstream, None)?;
196
197 Ok((config_sha256, config_id))
198 }
199 }
200
201 pub async fn pull(self: &Arc<Self>) -> Result<ContentAndVerity<ObjectID>> {
202 let (_manifest_digest, raw_manifest) = self
203 .proxy
204 .fetch_manifest_raw_oci(&self.img)
205 .await
206 .context("Fetching manifest")?;
207
208 let manifest = ImageManifest::from_reader(raw_manifest.as_slice())?;
211 let config_descriptor = manifest.config();
212 let layers = manifest.layers();
213 self.ensure_config(layers, config_descriptor)
214 .await
215 .with_context(|| format!("Failed to pull config {config_descriptor:?}"))
216 }
217}
218
219pub async fn pull<ObjectID: FsVerityHashValue>(
222 repo: &Arc<Repository<ObjectID>>,
223 imgref: &str,
224 reference: Option<&str>,
225 img_proxy_config: Option<ImageProxyConfig>,
226) -> Result<(Sha256Digest, ObjectID)> {
227 let op = Arc::new(ImageOp::new(repo, imgref, img_proxy_config).await?);
228 let (sha256, id) = op
229 .pull()
230 .await
231 .with_context(|| format!("Unable to pull container image {imgref}"))?;
232
233 if let Some(name) = reference {
234 repo.name_stream(sha256, name)?;
235 }
236 Ok((sha256, id))
237}