1use std::{
13 io::{BufReader, Read, Write},
14 sync::Arc,
15};
16
17use anyhow::{bail, Result};
18use sha2::{Digest, Sha256};
19use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout};
20use zstd::stream::{read::Decoder, write::Encoder};
21
22use crate::{
23 fsverity::FsVerityHashValue,
24 repository::Repository,
25 util::{read_exactish, Sha256Digest},
26};
27
/// One record of a [`DigestMap`]: a SHA-256 digest paired with an object ID.
///
/// The zerocopy derives together with `#[repr(C)]` allow a slice of entries to be written to a
/// split stream as raw bytes (`SplitStreamWriter::new`) and read back one entry at a time
/// (`SplitStreamReader::new`).
#[derive(Debug, FromBytes, Immutable, IntoBytes, KnownLayout)]
#[repr(C)]
pub struct DigestMapEntry<ObjectID: FsVerityHashValue> {
    /// SHA-256 digest used as the lookup key.
    /// NOTE(review): presumably the content digest of a referenced stream — confirm with callers.
    pub body: Sha256Digest,
    /// Object ID associated with `body`.
    pub verity: ObjectID,
}
37
/// A table mapping SHA-256 digests to object IDs.
///
/// `lookup` and `insert` binary-search on [`DigestMapEntry::body`], so the entries must stay
/// sorted by that field.
#[derive(Debug)]
pub struct DigestMap<ObjectID: FsVerityHashValue> {
    /// The entries, ordered by `body`. `insert` maintains that order; code that pushes to the
    /// vector directly has to preserve it for lookups to work.
    pub map: Vec<DigestMapEntry<ObjectID>>,
}
44
45impl<ObjectID: FsVerityHashValue> Default for DigestMap<ObjectID> {
46 fn default() -> Self {
47 Self::new()
48 }
49}
50
51impl<ObjectID: FsVerityHashValue> DigestMap<ObjectID> {
52 pub fn new() -> Self {
54 DigestMap { map: vec![] }
55 }
56
57 pub fn lookup(&self, body: &Sha256Digest) -> Option<&ObjectID> {
61 match self.map.binary_search_by_key(body, |e| e.body) {
62 Ok(idx) => Some(&self.map[idx].verity),
63 Err(..) => None,
64 }
65 }
66
67 pub fn insert(&mut self, body: &Sha256Digest, verity: &ObjectID) {
71 match self.map.binary_search_by_key(body, |e| e.body) {
72 Ok(idx) => assert_eq!(self.map[idx].verity, *verity), Err(idx) => self.map.insert(
74 idx,
75 DigestMapEntry {
76 body: *body,
77 verity: verity.clone(),
78 },
79 ),
80 }
81 }
82}
83
/// Builds a split stream: a zstd-compressed sequence of inline data fragments and references to
/// objects stored in a repository.
pub struct SplitStreamWriter<ObjectID: FsVerityHashValue> {
    /// Repository that receives externally stored data and, in `done`, the finished stream.
    repo: Arc<Repository<ObjectID>>,
    /// Inline bytes collected so far that have not yet been written out as a fragment.
    inline_content: Vec<u8>,
    /// zstd encoder accumulating the compressed stream in memory.
    writer: Encoder<'static, Vec<u8>>,
    /// When set, a running SHA-256 hasher over everything written plus the digest it is expected
    /// to produce; `done` compares the two.
    pub sha256: Option<(Sha256, Sha256Digest)>,
}
92
93impl<ObjectID: FsVerityHashValue> std::fmt::Debug for SplitStreamWriter<ObjectID> {
94 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
95 f.debug_struct("SplitStreamWriter")
97 .field("repo", &self.repo)
98 .field("inline_content", &self.inline_content)
99 .field("sha256", &self.sha256)
100 .finish()
101 }
102}
103
104impl<ObjectID: FsVerityHashValue> SplitStreamWriter<ObjectID> {
105 pub fn new(
110 repo: &Arc<Repository<ObjectID>>,
111 refs: Option<DigestMap<ObjectID>>,
112 sha256: Option<Sha256Digest>,
113 ) -> Self {
114 let mut writer = Encoder::new(vec![], 0).unwrap();
116
117 match refs {
118 Some(DigestMap { map }) => {
119 writer.write_all(&(map.len() as u64).to_le_bytes()).unwrap();
120 writer.write_all(map.as_bytes()).unwrap();
121 }
122 None => {
123 writer.write_all(&0u64.to_le_bytes()).unwrap();
124 }
125 }
126
127 Self {
128 repo: Arc::clone(repo),
129 inline_content: vec![],
130 writer,
131 sha256: sha256.map(|x| (Sha256::new(), x)),
132 }
133 }
134
135 fn write_fragment(writer: &mut impl Write, size: usize, data: &[u8]) -> Result<()> {
136 writer.write_all(&(size as u64).to_le_bytes())?;
137 Ok(writer.write_all(data)?)
138 }
139
140 fn flush_inline(&mut self, new_value: Vec<u8>) -> Result<()> {
142 if !self.inline_content.is_empty() {
143 Self::write_fragment(
144 &mut self.writer,
145 self.inline_content.len(),
146 &self.inline_content,
147 )?;
148 self.inline_content = new_value;
149 }
150 Ok(())
151 }
152
153 pub fn write_inline(&mut self, data: &[u8]) {
156 if let Some((ref mut sha256, ..)) = self.sha256 {
157 sha256.update(data);
158 }
159 self.inline_content.extend(data);
160 }
161
162 fn write_reference(&mut self, reference: &ObjectID, padding: Vec<u8>) -> Result<()> {
166 self.flush_inline(padding)?;
169
170 Self::write_fragment(&mut self.writer, 0, reference.as_bytes())
171 }
172
173 pub fn write_external(&mut self, data: &[u8], padding: Vec<u8>) -> Result<()> {
178 if let Some((ref mut sha256, ..)) = self.sha256 {
179 sha256.update(data);
180 sha256.update(&padding);
181 }
182 let id = self.repo.ensure_object(data)?;
183 self.write_reference(&id, padding)
184 }
185
186 pub async fn write_external_async(&mut self, data: Vec<u8>, padding: Vec<u8>) -> Result<()> {
191 if let Some((ref mut sha256, ..)) = self.sha256 {
192 sha256.update(&data);
193 sha256.update(&padding);
194 }
195 let id = self.repo.ensure_object_async(data).await?;
196 self.write_reference(&id, padding)
197 }
198
199 pub fn done(mut self) -> Result<ObjectID> {
204 self.flush_inline(vec![])?;
205
206 if let Some((context, expected)) = self.sha256 {
207 if Into::<Sha256Digest>::into(context.finalize()) != expected {
208 bail!("Content doesn't have expected SHA256 hash value!");
209 }
210 }
211
212 self.repo.ensure_object(&self.writer.finish()?)
213 }
214}
215
/// A piece of data obtained from a split stream by `SplitStreamReader::read_exact`.
#[derive(Debug)]
pub enum SplitStreamData<ObjectID: FsVerityHashValue> {
    /// The bytes were stored inline in the stream and are returned directly.
    Inline(Box<[u8]>),
    /// The data is held outside the stream; this is the ID of the object that was referenced.
    External(ObjectID),
}
224
/// Reads back a split stream: the reference table from its header, then its inline fragments and
/// external references in order.
pub struct SplitStreamReader<R: Read, ObjectID: FsVerityHashValue> {
    /// zstd decoder over the underlying reader.
    decoder: Decoder<'static, BufReader<R>>,
    /// The reference table read from the stream header.
    pub refs: DigestMap<ObjectID>,
    /// Number of bytes of the current inline fragment that have not been consumed yet; 0 means
    /// the next thing in the stream is a fragment header (or the end of the stream).
    inline_bytes: usize,
}
232
233impl<R: Read, ObjectID: FsVerityHashValue> std::fmt::Debug for SplitStreamReader<R, ObjectID> {
234 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
235 f.debug_struct("SplitStreamReader")
237 .field("refs", &self.refs)
238 .field("inline_bytes", &self.inline_bytes)
239 .finish()
240 }
241}
242
243fn read_u64_le<R: Read>(reader: &mut R) -> Result<Option<usize>> {
244 let mut buf = [0u8; 8];
245 if read_exactish(reader, &mut buf)? {
246 Ok(Some(u64::from_le_bytes(buf) as usize))
247 } else {
248 Ok(None)
249 }
250}
251
252fn read_into_vec(reader: &mut impl Read, vec: &mut Vec<u8>, size: usize) -> Result<()> {
257 vec.resize(size, 0u8);
258 reader.read_exact(vec.as_mut_slice())?;
259 Ok(())
260}
261
/// What `SplitStreamReader::ensure_chunk` found at the current position of the stream.
enum ChunkType<ObjectID: FsVerityHashValue> {
    /// The stream ended where a fragment header was expected.
    Eof,
    /// Inline data is available; the reader's `inline_bytes` says how much of it is left.
    Inline,
    /// An external reference was read; carries the referenced object ID.
    External(ObjectID),
}
267
268impl<R: Read, ObjectID: FsVerityHashValue> SplitStreamReader<R, ObjectID> {
269 pub fn new(reader: R) -> Result<Self> {
273 let mut decoder = Decoder::new(reader)?;
274
275 let n_map_entries = {
276 let mut buf = [0u8; 8];
277 decoder.read_exact(&mut buf)?;
278 u64::from_le_bytes(buf)
279 } as usize;
280
281 let mut refs = DigestMap::<ObjectID> {
282 map: Vec::with_capacity(n_map_entries),
283 };
284 for _ in 0..n_map_entries {
285 refs.map.push(DigestMapEntry::read_from_io(&mut decoder)?);
286 }
287
288 Ok(Self {
289 decoder,
290 refs,
291 inline_bytes: 0,
292 })
293 }
294
295 fn ensure_chunk(
296 &mut self,
297 eof_ok: bool,
298 ext_ok: bool,
299 expected_bytes: usize,
300 ) -> Result<ChunkType<ObjectID>> {
301 if self.inline_bytes == 0 {
302 match read_u64_le(&mut self.decoder)? {
303 None => {
304 if !eof_ok {
305 bail!("Unexpected EOF when parsing splitstream");
306 }
307 return Ok(ChunkType::Eof);
308 }
309 Some(0) => {
310 if !ext_ok {
311 bail!("Unexpected external reference when parsing splitstream");
312 }
313 let id = ObjectID::read_from_io(&mut self.decoder)?;
314 return Ok(ChunkType::External(id));
315 }
316 Some(size) => {
317 self.inline_bytes = size;
318 }
319 }
320 }
321
322 if self.inline_bytes < expected_bytes {
323 bail!("Unexpectedly small inline content when parsing splitstream");
324 }
325
326 Ok(ChunkType::Inline)
327 }
328
329 pub fn read_inline_exact(&mut self, buffer: &mut [u8]) -> Result<bool> {
332 if let ChunkType::Inline = self.ensure_chunk(true, false, buffer.len())? {
333 self.decoder.read_exact(buffer)?;
334 self.inline_bytes -= buffer.len();
335 Ok(true)
336 } else {
337 Ok(false)
338 }
339 }
340
341 fn discard_padding(&mut self, size: usize) -> Result<()> {
342 let mut buf = [0u8; 512];
343 assert!(size <= 512);
344 self.ensure_chunk(false, false, size)?;
345 self.decoder.read_exact(&mut buf[0..size])?;
346 self.inline_bytes -= size;
347 Ok(())
348 }
349
350 pub fn read_exact(
356 &mut self,
357 actual_size: usize,
358 stored_size: usize,
359 ) -> Result<SplitStreamData<ObjectID>> {
360 if let ChunkType::External(id) = self.ensure_chunk(false, true, stored_size)? {
361 if actual_size < stored_size {
363 self.discard_padding(stored_size - actual_size)?;
364 }
365 Ok(SplitStreamData::External(id))
366 } else {
367 let mut content = vec![];
368 read_into_vec(&mut self.decoder, &mut content, stored_size)?;
369 content.truncate(actual_size);
370 self.inline_bytes -= stored_size;
371 Ok(SplitStreamData::Inline(content.into()))
372 }
373 }
374
375 pub fn cat(
380 &mut self,
381 output: &mut impl Write,
382 mut load_data: impl FnMut(&ObjectID) -> Result<Vec<u8>>,
383 ) -> Result<()> {
384 let mut buffer = vec![];
385
386 loop {
387 match self.ensure_chunk(true, true, 0)? {
388 ChunkType::Eof => break Ok(()),
389 ChunkType::Inline => {
390 read_into_vec(&mut self.decoder, &mut buffer, self.inline_bytes)?;
391 self.inline_bytes = 0;
392 output.write_all(&buffer)?;
393 }
394 ChunkType::External(ref id) => {
395 output.write_all(&load_data(id)?)?;
396 }
397 }
398 }
399 }
400
401 pub fn get_object_refs(&mut self, mut callback: impl FnMut(&ObjectID)) -> Result<()> {
405 let mut buffer = vec![];
406
407 for entry in &self.refs.map {
408 callback(&entry.verity);
409 }
410
411 loop {
412 match self.ensure_chunk(true, true, 0)? {
413 ChunkType::Eof => break Ok(()),
414 ChunkType::Inline => {
415 read_into_vec(&mut self.decoder, &mut buffer, self.inline_bytes)?;
416 self.inline_bytes = 0;
417 }
418 ChunkType::External(ref id) => {
419 callback(id);
420 }
421 }
422 }
423 }
424
425 pub fn get_stream_refs(&mut self, mut callback: impl FnMut(&Sha256Digest)) {
427 for entry in &self.refs.map {
428 callback(&entry.body);
429 }
430 }
431
432 pub fn lookup(&self, body: &Sha256Digest) -> Result<&ObjectID> {
436 match self.refs.lookup(body) {
437 Some(id) => Ok(id),
438 None => bail!("Reference is not found in splitstream"),
439 }
440 }
441}
442
443impl<F: Read, ObjectID: FsVerityHashValue> Read for SplitStreamReader<F, ObjectID> {
444 fn read(&mut self, data: &mut [u8]) -> std::io::Result<usize> {
445 match self.ensure_chunk(true, false, 1) {
446 Ok(ChunkType::Eof) => Ok(0),
447 Ok(ChunkType::Inline) => {
448 let n_bytes = std::cmp::min(data.len(), self.inline_bytes);
449 self.decoder.read_exact(&mut data[0..n_bytes])?;
450 self.inline_bytes -= n_bytes;
451 Ok(n_bytes)
452 }
453 Ok(ChunkType::External(..)) => unreachable!(),
454 Err(e) => Err(std::io::Error::other(e)),
455 }
456 }
457}
458
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Runs `read_into_vec` with `source` as the input, asking for `size` bytes to be placed in
    /// `target`, and hands the vector back on success.
    fn fill(source: &[u8], mut target: Vec<u8>, size: usize) -> Result<Vec<u8>> {
        let mut reader = Cursor::new(source);
        read_into_vec(&mut reader, &mut target, size)?;
        Ok(target)
    }

    #[test]
    fn test_read_into_vec() -> Result<()> {
        // Nothing requested from an empty source.
        let filled = fill(&[], Vec::new(), 0)?;
        assert!(filled.is_empty());

        // Fewer bytes requested than the source holds.
        let filled = fill(&[1, 2, 3, 4, 5], Vec::new(), 3)?;
        assert_eq!(filled.len(), 3);
        assert_eq!(filled, [1, 2, 3]);

        // More bytes requested than the source holds.
        assert!(fill(&[1, 2, 3], Vec::new(), 5).is_err());

        // Exactly as many bytes requested as the source holds.
        let filled = fill(&[1, 2, 3], Vec::new(), 3)?;
        assert_eq!(filled.len(), 3);
        assert_eq!(filled, [1, 2, 3]);

        // A vector with spare capacity keeps its allocation.
        let filled = fill(&[1, 2, 3, 4, 5], Vec::with_capacity(10), 4)?;
        assert_eq!(filled.len(), 4);
        assert_eq!(filled, [1, 2, 3, 4]);
        assert_eq!(filled.capacity(), 10);

        // Existing contents are replaced and the vector shrinks to the requested size.
        let filled = fill(&[1, 2, 3], vec![9, 9, 9], 2)?;
        assert_eq!(filled.len(), 2);
        assert_eq!(filled, [1, 2]);

        Ok(())
    }
}