composefs/
splitstream.rs

1//! Split Stream file format implementation.
2//!
3//! This module implements the Split Stream format for efficiently storing
4//! and transferring data with inline content and external object references,
5//! supporting compression and content deduplication.
6
7/* Implementation of the Split Stream file format
8 *
9 * See doc/splitstream.md
10 */
11
12use std::{
13    io::{BufReader, Read, Write},
14    sync::Arc,
15};
16
17use anyhow::{bail, Result};
18use sha2::{Digest, Sha256};
19use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout};
20use zstd::stream::{read::Decoder, write::Encoder};
21
22use crate::{
23    fsverity::FsVerityHashValue,
24    repository::Repository,
25    util::{read_exactish, Sha256Digest},
26};
27
/// A single entry in the digest map, mapping content SHA256 hash to fs-verity object ID.
///
/// `#[repr(C)]` together with the zerocopy derives lets entries be written into and read back
/// from the split stream header as raw bytes, so the field order (`body`, then `verity`) is
/// part of the stream format and must not be changed.
#[derive(Debug, FromBytes, Immutable, IntoBytes, KnownLayout)]
#[repr(C)]
pub struct DigestMapEntry<ObjectID: FsVerityHashValue> {
    /// SHA256 hash of the content body; this is the key the digest map is searched by
    pub body: Sha256Digest,
    /// fs-verity object identifier
    pub verity: ObjectID,
}
37
38/// A map of content digests to object IDs, maintained in sorted order for binary search.
39#[derive(Debug)]
40pub struct DigestMap<ObjectID: FsVerityHashValue> {
41    /// Vector of digest map entries, kept sorted by body hash
42    pub map: Vec<DigestMapEntry<ObjectID>>,
43}
44
45impl<ObjectID: FsVerityHashValue> Default for DigestMap<ObjectID> {
46    fn default() -> Self {
47        Self::new()
48    }
49}
50
51impl<ObjectID: FsVerityHashValue> DigestMap<ObjectID> {
52    /// Creates a new empty digest map.
53    pub fn new() -> Self {
54        DigestMap { map: vec![] }
55    }
56
57    /// Looks up an object ID by its content SHA256 hash.
58    ///
59    /// Returns the object ID if found, or None if not present in the map.
60    pub fn lookup(&self, body: &Sha256Digest) -> Option<&ObjectID> {
61        match self.map.binary_search_by_key(body, |e| e.body) {
62            Ok(idx) => Some(&self.map[idx].verity),
63            Err(..) => None,
64        }
65    }
66
67    /// Inserts a new digest mapping, maintaining sorted order.
68    ///
69    /// If the body hash already exists, asserts that the verity ID matches.
70    pub fn insert(&mut self, body: &Sha256Digest, verity: &ObjectID) {
71        match self.map.binary_search_by_key(body, |e| e.body) {
72            Ok(idx) => assert_eq!(self.map[idx].verity, *verity), // or else, bad things...
73            Err(idx) => self.map.insert(
74                idx,
75                DigestMapEntry {
76                    body: *body,
77                    verity: verity.clone(),
78                },
79            ),
80        }
81    }
82}
83
84/// Writer for creating split stream format files with inline content and external object references.
85pub struct SplitStreamWriter<ObjectID: FsVerityHashValue> {
86    repo: Arc<Repository<ObjectID>>,
87    inline_content: Vec<u8>,
88    writer: Encoder<'static, Vec<u8>>,
89    /// Optional SHA256 hasher and expected digest for validation
90    pub sha256: Option<(Sha256, Sha256Digest)>,
91}
92
93impl<ObjectID: FsVerityHashValue> std::fmt::Debug for SplitStreamWriter<ObjectID> {
94    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
95        // writer doesn't impl Debug
96        f.debug_struct("SplitStreamWriter")
97            .field("repo", &self.repo)
98            .field("inline_content", &self.inline_content)
99            .field("sha256", &self.sha256)
100            .finish()
101    }
102}
103
104impl<ObjectID: FsVerityHashValue> SplitStreamWriter<ObjectID> {
105    /// Creates a new split stream writer.
106    ///
107    /// The writer is initialized with optional digest map references and an optional
108    /// expected SHA256 hash for validation when the stream is finalized.
109    pub fn new(
110        repo: &Arc<Repository<ObjectID>>,
111        refs: Option<DigestMap<ObjectID>>,
112        sha256: Option<Sha256Digest>,
113    ) -> Self {
114        // SAFETY: we surely can't get an error writing the header to a Vec<u8>
115        let mut writer = Encoder::new(vec![], 0).unwrap();
116
117        match refs {
118            Some(DigestMap { map }) => {
119                writer.write_all(&(map.len() as u64).to_le_bytes()).unwrap();
120                writer.write_all(map.as_bytes()).unwrap();
121            }
122            None => {
123                writer.write_all(&0u64.to_le_bytes()).unwrap();
124            }
125        }
126
127        Self {
128            repo: Arc::clone(repo),
129            inline_content: vec![],
130            writer,
131            sha256: sha256.map(|x| (Sha256::new(), x)),
132        }
133    }
134
135    fn write_fragment(writer: &mut impl Write, size: usize, data: &[u8]) -> Result<()> {
136        writer.write_all(&(size as u64).to_le_bytes())?;
137        Ok(writer.write_all(data)?)
138    }
139
140    /// flush any buffered inline data, taking new_value as the new value of the buffer
141    fn flush_inline(&mut self, new_value: Vec<u8>) -> Result<()> {
142        if !self.inline_content.is_empty() {
143            Self::write_fragment(
144                &mut self.writer,
145                self.inline_content.len(),
146                &self.inline_content,
147            )?;
148            self.inline_content = new_value;
149        }
150        Ok(())
151    }
152
153    /// really, "add inline content to the buffer"
154    /// you need to call .flush_inline() later
155    pub fn write_inline(&mut self, data: &[u8]) {
156        if let Some((ref mut sha256, ..)) = self.sha256 {
157            sha256.update(data);
158        }
159        self.inline_content.extend(data);
160    }
161
162    /// write a reference to external data to the stream.  If the external data had padding in the
163    /// stream which is not stored in the object then pass it here as well and it will be stored
164    /// inline after the reference.
165    fn write_reference(&mut self, reference: &ObjectID, padding: Vec<u8>) -> Result<()> {
166        // Flush the inline data before we store the external reference.  Any padding from the
167        // external data becomes the start of a new inline block.
168        self.flush_inline(padding)?;
169
170        Self::write_fragment(&mut self.writer, 0, reference.as_bytes())
171    }
172
173    /// Writes data as an external object reference with optional padding.
174    ///
175    /// The data is stored in the repository and a reference is written to the stream.
176    /// Any padding bytes are stored inline after the reference.
177    pub fn write_external(&mut self, data: &[u8], padding: Vec<u8>) -> Result<()> {
178        if let Some((ref mut sha256, ..)) = self.sha256 {
179            sha256.update(data);
180            sha256.update(&padding);
181        }
182        let id = self.repo.ensure_object(data)?;
183        self.write_reference(&id, padding)
184    }
185
186    /// Asynchronously writes data as an external object reference with optional padding.
187    ///
188    /// The data is stored in the repository asynchronously and a reference is written to the stream.
189    /// Any padding bytes are stored inline after the reference.
190    pub async fn write_external_async(&mut self, data: Vec<u8>, padding: Vec<u8>) -> Result<()> {
191        if let Some((ref mut sha256, ..)) = self.sha256 {
192            sha256.update(&data);
193            sha256.update(&padding);
194        }
195        let id = self.repo.ensure_object_async(data).await?;
196        self.write_reference(&id, padding)
197    }
198
199    /// Finalizes the split stream and returns its object ID.
200    ///
201    /// Flushes any remaining inline content, validates the SHA256 hash if provided,
202    /// and stores the compressed stream in the repository.
203    pub fn done(mut self) -> Result<ObjectID> {
204        self.flush_inline(vec![])?;
205
206        if let Some((context, expected)) = self.sha256 {
207            if Into::<Sha256Digest>::into(context.finalize()) != expected {
208                bail!("Content doesn't have expected SHA256 hash value!");
209            }
210        }
211
212        self.repo.ensure_object(&self.writer.finish()?)
213    }
214}
215
216/// Data fragment from a split stream, either inline content or an external object reference.
217#[derive(Debug)]
218pub enum SplitStreamData<ObjectID: FsVerityHashValue> {
219    /// Inline content stored directly in the stream
220    Inline(Box<[u8]>),
221    /// Reference to an external object
222    External(ObjectID),
223}
224
225/// Reader for parsing split stream format files with inline content and external object references.
226pub struct SplitStreamReader<R: Read, ObjectID: FsVerityHashValue> {
227    decoder: Decoder<'static, BufReader<R>>,
228    /// Digest map containing content hash to object ID mappings
229    pub refs: DigestMap<ObjectID>,
230    inline_bytes: usize,
231}
232
233impl<R: Read, ObjectID: FsVerityHashValue> std::fmt::Debug for SplitStreamReader<R, ObjectID> {
234    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
235        // decoder doesn't impl Debug
236        f.debug_struct("SplitStreamReader")
237            .field("refs", &self.refs)
238            .field("inline_bytes", &self.inline_bytes)
239            .finish()
240    }
241}
242
243fn read_u64_le<R: Read>(reader: &mut R) -> Result<Option<usize>> {
244    let mut buf = [0u8; 8];
245    if read_exactish(reader, &mut buf)? {
246        Ok(Some(u64::from_le_bytes(buf) as usize))
247    } else {
248        Ok(None)
249    }
250}
251
/// Reads exactly `size` bytes from `reader` into `vec`, replacing whatever it held before.
///
/// `vec` is first resized to `size`, so its existing allocation is reused when large enough.
/// This makes the function cheap to call repeatedly with the same buffer in a loop.
///
/// # Errors
///
/// Fails if `reader` errors or ends before `size` bytes were read. `vec` then has length
/// `size` but its contents are unspecified.
fn read_into_vec(reader: &mut impl Read, vec: &mut Vec<u8>, size: usize) -> Result<()> {
    vec.resize(size, 0);
    reader.read_exact(&mut vec[..])?;
    Ok(())
}
261
262enum ChunkType<ObjectID: FsVerityHashValue> {
263    Eof,
264    Inline,
265    External(ObjectID),
266}
267
268impl<R: Read, ObjectID: FsVerityHashValue> SplitStreamReader<R, ObjectID> {
269    /// Creates a new split stream reader from the provided reader.
270    ///
271    /// Reads the digest map header from the stream during initialization.
272    pub fn new(reader: R) -> Result<Self> {
273        let mut decoder = Decoder::new(reader)?;
274
275        let n_map_entries = {
276            let mut buf = [0u8; 8];
277            decoder.read_exact(&mut buf)?;
278            u64::from_le_bytes(buf)
279        } as usize;
280
281        let mut refs = DigestMap::<ObjectID> {
282            map: Vec::with_capacity(n_map_entries),
283        };
284        for _ in 0..n_map_entries {
285            refs.map.push(DigestMapEntry::read_from_io(&mut decoder)?);
286        }
287
288        Ok(Self {
289            decoder,
290            refs,
291            inline_bytes: 0,
292        })
293    }
294
295    fn ensure_chunk(
296        &mut self,
297        eof_ok: bool,
298        ext_ok: bool,
299        expected_bytes: usize,
300    ) -> Result<ChunkType<ObjectID>> {
301        if self.inline_bytes == 0 {
302            match read_u64_le(&mut self.decoder)? {
303                None => {
304                    if !eof_ok {
305                        bail!("Unexpected EOF when parsing splitstream");
306                    }
307                    return Ok(ChunkType::Eof);
308                }
309                Some(0) => {
310                    if !ext_ok {
311                        bail!("Unexpected external reference when parsing splitstream");
312                    }
313                    let id = ObjectID::read_from_io(&mut self.decoder)?;
314                    return Ok(ChunkType::External(id));
315                }
316                Some(size) => {
317                    self.inline_bytes = size;
318                }
319            }
320        }
321
322        if self.inline_bytes < expected_bytes {
323            bail!("Unexpectedly small inline content when parsing splitstream");
324        }
325
326        Ok(ChunkType::Inline)
327    }
328
329    /// Reads the exact number of inline bytes
330    /// Assumes that the data cannot be split across chunks
331    pub fn read_inline_exact(&mut self, buffer: &mut [u8]) -> Result<bool> {
332        if let ChunkType::Inline = self.ensure_chunk(true, false, buffer.len())? {
333            self.decoder.read_exact(buffer)?;
334            self.inline_bytes -= buffer.len();
335            Ok(true)
336        } else {
337            Ok(false)
338        }
339    }
340
341    fn discard_padding(&mut self, size: usize) -> Result<()> {
342        let mut buf = [0u8; 512];
343        assert!(size <= 512);
344        self.ensure_chunk(false, false, size)?;
345        self.decoder.read_exact(&mut buf[0..size])?;
346        self.inline_bytes -= size;
347        Ok(())
348    }
349
350    /// Reads an exact amount of data, which may be inline or external.
351    ///
352    /// The stored_size is the size as recorded in the stream (including any padding),
353    /// while actual_size is the actual content size without padding.
354    /// Returns either inline content or an external object reference.
355    pub fn read_exact(
356        &mut self,
357        actual_size: usize,
358        stored_size: usize,
359    ) -> Result<SplitStreamData<ObjectID>> {
360        if let ChunkType::External(id) = self.ensure_chunk(false, true, stored_size)? {
361            // ...and the padding
362            if actual_size < stored_size {
363                self.discard_padding(stored_size - actual_size)?;
364            }
365            Ok(SplitStreamData::External(id))
366        } else {
367            let mut content = vec![];
368            read_into_vec(&mut self.decoder, &mut content, stored_size)?;
369            content.truncate(actual_size);
370            self.inline_bytes -= stored_size;
371            Ok(SplitStreamData::Inline(content.into()))
372        }
373    }
374
375    /// Concatenates the entire split stream content to the output writer.
376    ///
377    /// Inline content is written directly, while external references are resolved
378    /// using the provided load_data callback function.
379    pub fn cat(
380        &mut self,
381        output: &mut impl Write,
382        mut load_data: impl FnMut(&ObjectID) -> Result<Vec<u8>>,
383    ) -> Result<()> {
384        let mut buffer = vec![];
385
386        loop {
387            match self.ensure_chunk(true, true, 0)? {
388                ChunkType::Eof => break Ok(()),
389                ChunkType::Inline => {
390                    read_into_vec(&mut self.decoder, &mut buffer, self.inline_bytes)?;
391                    self.inline_bytes = 0;
392                    output.write_all(&buffer)?;
393                }
394                ChunkType::External(ref id) => {
395                    output.write_all(&load_data(id)?)?;
396                }
397            }
398        }
399    }
400
401    /// Traverses the split stream and calls the callback for each object reference.
402    ///
403    /// This includes both references from the digest map and external references in the stream.
404    pub fn get_object_refs(&mut self, mut callback: impl FnMut(&ObjectID)) -> Result<()> {
405        let mut buffer = vec![];
406
407        for entry in &self.refs.map {
408            callback(&entry.verity);
409        }
410
411        loop {
412            match self.ensure_chunk(true, true, 0)? {
413                ChunkType::Eof => break Ok(()),
414                ChunkType::Inline => {
415                    read_into_vec(&mut self.decoder, &mut buffer, self.inline_bytes)?;
416                    self.inline_bytes = 0;
417                }
418                ChunkType::External(ref id) => {
419                    callback(id);
420                }
421            }
422        }
423    }
424
425    /// Calls the callback for each content hash in the digest map.
426    pub fn get_stream_refs(&mut self, mut callback: impl FnMut(&Sha256Digest)) {
427        for entry in &self.refs.map {
428            callback(&entry.body);
429        }
430    }
431
432    /// Looks up an object ID by content hash in the digest map.
433    ///
434    /// Returns an error if the reference is not found.
435    pub fn lookup(&self, body: &Sha256Digest) -> Result<&ObjectID> {
436        match self.refs.lookup(body) {
437            Some(id) => Ok(id),
438            None => bail!("Reference is not found in splitstream"),
439        }
440    }
441}
442
443impl<F: Read, ObjectID: FsVerityHashValue> Read for SplitStreamReader<F, ObjectID> {
444    fn read(&mut self, data: &mut [u8]) -> std::io::Result<usize> {
445        match self.ensure_chunk(true, false, 1) {
446            Ok(ChunkType::Eof) => Ok(0),
447            Ok(ChunkType::Inline) => {
448                let n_bytes = std::cmp::min(data.len(), self.inline_bytes);
449                self.decoder.read_exact(&mut data[0..n_bytes])?;
450                self.inline_bytes -= n_bytes;
451                Ok(n_bytes)
452            }
453            Ok(ChunkType::External(..)) => unreachable!(),
454            Err(e) => Err(std::io::Error::other(e)),
455        }
456    }
457}
458
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Runs `read_into_vec` on a fresh cursor over `source`, starting from buffer `initial`.
    /// Returns the call's outcome together with the resulting buffer.
    fn run(source: Vec<u8>, initial: Vec<u8>, size: usize) -> (Result<()>, Vec<u8>) {
        let mut reader = Cursor::new(source);
        let mut buf = initial;
        let outcome = read_into_vec(&mut reader, &mut buf, size);
        (outcome, buf)
    }

    #[test]
    fn test_read_into_vec() -> Result<()> {
        // Zero-length read from an empty reader.
        let (outcome, buf) = run(vec![], Vec::new(), 0);
        assert!(outcome.is_ok());
        assert_eq!(buf.len(), 0);

        // Reading a prefix of a longer source.
        let (outcome, buf) = run(vec![1, 2, 3, 4, 5], Vec::new(), 3);
        assert!(outcome.is_ok());
        assert_eq!(buf.len(), 3);
        assert_eq!(buf, vec![1, 2, 3]);

        // Asking for more bytes than the reader holds fails.
        let (outcome, _) = run(vec![1, 2, 3], Vec::new(), 5);
        assert!(outcome.is_err());

        // Reading exactly what is available.
        let (outcome, buf) = run(vec![1, 2, 3], Vec::new(), 3);
        assert!(outcome.is_ok());
        assert_eq!(buf.len(), 3);
        assert_eq!(buf, vec![1, 2, 3]);

        // Pre-existing capacity is reused rather than reallocated.
        let (outcome, buf) = run(vec![1, 2, 3, 4, 5], Vec::with_capacity(10), 4);
        assert!(outcome.is_ok());
        assert_eq!(buf.len(), 4);
        assert_eq!(buf, vec![1, 2, 3, 4]);
        assert_eq!(buf.capacity(), 10);

        // Pre-existing contents are replaced.
        let (outcome, buf) = run(vec![1, 2, 3], vec![9, 9, 9], 2);
        assert!(outcome.is_ok());
        assert_eq!(buf.len(), 2);
        assert_eq!(buf, vec![1, 2]);

        Ok(())
    }
}