adds an optional encoding for serializing duplicate shreds #14362
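Status: closed; 2 commits.

This PR makes the wire encoding of `DuplicateSlotProof` gossip chunks self-describing: the proof is serialized with bincode, optionally compressed (deflate, gzip, zlib, or zstd), and wrapped in an `Encoded` enum tag so the receiver knows which decoder to apply. A minimal sketch of the resulting round trip (not part of the diff), assuming the module paths introduced here:

```rust
// Sketch only: exercises the encode/decode API added in core/src/encode.rs.
use solana_core::encode::{self, Options};
use solana_ledger::blockstore_meta::DuplicateSlotProof;

fn roundtrip(proof: &DuplicateSlotProof) -> Result<DuplicateSlotProof, encode::Error> {
    // Compress with zstd; encode() silently falls back to plain bincode
    // whenever compression does not make the payload smaller.
    let bytes = encode::encode(proof, Options::Zstd { level: 0 })?;
    // The limit caps how many bytes decode() will deserialize (zip-bomb guard).
    encode::decode(&bytes, 4096)
}
```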

1 change: 1 addition & 0 deletions Cargo.lock


1 change: 1 addition & 0 deletions core/Cargo.toml
@@ -84,6 +84,7 @@ tokio_io_01 = { version = "0.1", package = "tokio-io" }
 tokio_codec_01 = { version = "0.1", package = "tokio-codec" }
 solana-rayon-threadlimit = { path = "../rayon-threadlimit", version = "1.6.0" }
 trees = "0.2.1"
+zstd = "0.5.1"

 [dev-dependencies]
 matches = "0.1.6"
53 changes: 14 additions & 39 deletions core/src/duplicate_shred.rs
@@ -1,11 +1,12 @@
-use crate::crds_value::sanitize_wallclock;
+use crate::{crds_value::sanitize_wallclock, encode};
 use itertools::Itertools;
 use solana_ledger::{
     blockstore_meta::DuplicateSlotProof,
     shred::{Shred, ShredError, ShredType},
 };
 use solana_sdk::{
     clock::Slot,
+    packet::PACKET_DATA_SIZE,
     pubkey::Pubkey,
     sanitize::{Sanitize, SanitizeError},
 };
@@ -17,6 +18,8 @@ use std::{
 use thiserror::Error;

 const DUPLICATE_SHRED_HEADER_SIZE: usize = 63;
+// An upper bound on bytes needed to send a duplicate shred proof.
+const DUPLICATE_PROOF_DECODE_LIMIT: usize = 2 * PACKET_DATA_SIZE + 256;

 /// Function returning leader at a given slot.
 pub trait LeaderScheduleFn: FnOnce(Slot) -> Option<Pubkey> {}
@@ -49,10 +52,8 @@ pub struct DuplicateShredIndex {
 pub enum Error {
     #[error("data chunk mismatch")]
     DataChunkMismatch,
-    #[error("decoding error")]
-    DecodingError(std::io::Error),
     #[error("encoding error")]
-    EncodingError(std::io::Error),
+    EncodingError(#[from] encode::Error),
     #[error("invalid chunk index")]
     InvalidChunkIndex,
     #[error("invalid duplicate shreds")]
@@ -117,7 +118,7 @@ pub fn from_duplicate_slot_proof(
     leader: impl LeaderScheduleFn,
     wallclock: u64,
     max_size: usize, // Maximum serialized size of each DuplicateShred.
-    encoder: impl FnOnce(Vec<u8>) -> Result<Vec<u8>, std::io::Error>,
+    encode_options: encode::Options,
 ) -> Result<impl Iterator<Item = DuplicateShred>, Error> {
     if proof.shred1 == proof.shred2 {
         return Err(Error::InvalidDuplicateSlotProof);
@@ -130,8 +131,7 @@
         shred1.index(),
         shred1.common_header.shred_type,
     );
-    let data = bincode::serialize(proof)?;
-    let data = encoder(data).map_err(Error::EncodingError)?;
+    let data = encode::encode(&proof, encode_options)?;
     let chunk_size = if DUPLICATE_SHRED_HEADER_SIZE < max_size {
         max_size - DUPLICATE_SHRED_HEADER_SIZE
     } else {
@@ -184,7 +184,6 @@ fn check_chunk(
 pub fn into_shreds(
     chunks: impl IntoIterator<Item = DuplicateShred>,
     leader: impl LeaderScheduleFn,
-    decoder: impl FnOnce(Vec<u8>) -> Result<Vec<u8>, std::io::Error>,
 ) -> Result<(Shred, Shred), Error> {
     let mut chunks = chunks.into_iter();
     let DuplicateShred {
@@ -220,8 +219,7 @@ pub fn into_shreds(
         return Err(Error::MissingDataChunk);
     }
     let data = (0..num_chunks).map(|k| data.remove(&k).unwrap());
-    let data = decoder(data.concat()).map_err(Error::DecodingError)?;
-    let proof: DuplicateSlotProof = bincode::deserialize(&data)?;
+    let proof: DuplicateSlotProof = encode::decode(&data.concat(), DUPLICATE_PROOF_DECODE_LIMIT)?;
     if proof.shred1 == proof.shred2 {
         return Err(Error::InvalidDuplicateSlotProof);
     }
@@ -269,9 +267,10 @@ impl From<&DuplicateShred> for DuplicateShredIndex {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::encode::tests::new_rand_shred;
     use rand::Rng;
-    use solana_ledger::{entry::Entry, shred::Shredder};
-    use solana_sdk::{hash, signature::Keypair, signature::Signer, system_transaction};
+    use solana_ledger::shred::Shredder;
+    use solana_sdk::{signature::Keypair, signature::Signer};
     use std::sync::Arc;

     #[test]
@@ -296,30 +295,6 @@
         );
     }

-    fn new_rand_shred<R: Rng>(rng: &mut R, next_shred_index: u32, shredder: &Shredder) -> Shred {
-        let entries: Vec<_> = std::iter::repeat_with(|| {
-            let tx = system_transaction::transfer(
-                &Keypair::new(),       // from
-                &Pubkey::new_unique(), // to
-                rng.gen(),             // lamports
-                hash::new_rand(rng),   // recent blockhash
-            );
-            Entry::new(
-                &hash::new_rand(rng), // prev_hash
-                1,                    // num_hashes,
-                vec![tx],             // transactions
-            )
-        })
-        .take(5)
-        .collect();
-        let (mut data_shreds, _coding_shreds, _last_shred_index) = shredder.entries_to_shreds(
-            &entries,
-            true, // is_last_in_slot
-            next_shred_index,
-        );
-        data_shreds.swap_remove(0)
-    }
-
     #[test]
     fn test_duplicate_shred_round_trip() {
         let mut rng = rand::thread_rng();
@@ -355,12 +330,12 @@
             leader,
             rng.gen(), // wallclock
             512,       // max_size
-            Ok,        // encoder
+            encode::Options::Zstd { level: 0 },
         )
         .unwrap()
         .collect();
-        assert!(chunks.len() > 4);
-        let (shred3, shred4) = into_shreds(chunks, leader, Ok).unwrap();
+        assert!(chunks.len() > 3);
+        let (shred3, shred4) = into_shreds(chunks, leader).unwrap();
         assert_eq!(shred1, shred3);
         assert_eq!(shred2, shred4);
     }
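Rough sizing for the assertion above (an estimate, not from the PR): with `max_size = 512` and `DUPLICATE_SHRED_HEADER_SIZE = 63`, each chunk carries 449 bytes of proof data. An uncompressed proof of two shred payloads (up to `PACKET_DATA_SIZE` = 1232 bytes each) serializes to roughly 2.5 KB, i.e. about 6 chunks; the bound is relaxed from `> 4` to `> 3` because the zstd-encoded proof can now fit in fewer chunks.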
176 changes: 176 additions & 0 deletions core/src/encode.rs
@@ -0,0 +1,176 @@
use bincode::Options as _;
use flate2::{
    read::{DeflateDecoder, GzDecoder, ZlibDecoder},
    write::{DeflateEncoder, GzEncoder, ZlibEncoder},
    Compression,
};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::io::{Read, Write};
use thiserror::Error;

#[derive(Deserialize, Serialize)]
enum Encoded {
    Bincode(Vec<u8>),
    Deflate(Vec<u8>),
    Gzip(Vec<u8>),
    Zlib(Vec<u8>),
    Zstd(Vec<u8>),
}

#[derive(Clone, Copy)]
pub enum Options {
    Bincode,
    Deflate(Compression),
    Gzip(Compression),
    Zlib(Compression),
    Zstd { level: i32 },
}

#[derive(Debug, Error)]
pub enum Error {
    #[error("io error")]
    IoError(#[from] std::io::Error),
    #[error("(de)serialization error")]
    SerializationError(#[from] bincode::Error),
}

impl Encoded {
    fn len(&self) -> usize {
        match self {
            Encoded::Bincode(bytes) => bytes.len(),
            Encoded::Deflate(bytes) => bytes.len(),
            Encoded::Gzip(bytes) => bytes.len(),
            Encoded::Zlib(bytes) => bytes.len(),
            Encoded::Zstd(bytes) => bytes.len(),
        }
    }
}

fn encode_zstd(data: &[u8], level: i32) -> std::io::Result<Vec<u8>> {
    let mut buffer = Vec::with_capacity(data.len() * 2);
    let mut encoder = zstd::stream::write::Encoder::new(&mut buffer, level)?;
    encoder.write_all(data)?;
    encoder.finish()?;
    Ok(buffer)
}

macro_rules! encode {
    ($encoder:ident, $bytes:ident, $compression:ident) => {{
        fn encode(data: &[u8], compression: Compression) -> std::io::Result<Vec<u8>> {
            let mut buffer = Vec::with_capacity(data.len() * 2);
            let mut encoder = $encoder::new(&mut buffer, compression);
            encoder.write_all(data)?;
            encoder.finish()?;
            Ok(buffer)
        }
        encode(&$bytes[..], $compression).ok()
    }};
}

pub fn encode<T: Serialize>(obj: &T, options: Options) -> bincode::Result<Vec<u8>> {
    let bytes = bincode::options().serialize(obj)?;
    let encoded = match options {
        Options::Bincode => None,
        Options::Deflate(opts) => encode!(DeflateEncoder, bytes, opts).map(Encoded::Deflate),
        Options::Gzip(opts) => encode!(GzEncoder, bytes, opts).map(Encoded::Gzip),
        Options::Zlib(opts) => encode!(ZlibEncoder, bytes, opts).map(Encoded::Zlib),
        Options::Zstd { level } => encode_zstd(&bytes, level).map(Encoded::Zstd).ok(),
    };
    let encoded = match encoded {
        Some(encoded) if encoded.len() < bytes.len() => encoded,
        _ => Encoded::Bincode(bytes),
    };
    bincode::options().serialize(&encoded)
}
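Note the fallback in `encode` above: a compressed form is kept only when it is strictly smaller than the raw bincode bytes, and any compression failure is swallowed by `.ok()`, so the function degrades to `Encoded::Bincode` rather than erroring, and never inflates the payload.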

pub fn decode<T: DeserializeOwned>(
    bytes: &[u8],
    limit: usize, // Limit maximum number of bytes decoded.
) -> Result<T, Error> {
    let encoded = bincode::options().deserialize_from(bytes)?;
    let decoder: Box<dyn Read> = match &encoded {
        Encoded::Bincode(bytes) => Box::new(&bytes[..]),
        Encoded::Deflate(bytes) => Box::new(DeflateDecoder::new(&bytes[..])),
        Encoded::Gzip(bytes) => Box::new(GzDecoder::new(&bytes[..])),
        Encoded::Zlib(bytes) => Box::new(ZlibDecoder::new(&bytes[..])),
        Encoded::Zstd(bytes) => Box::new(zstd::stream::read::Decoder::new(&bytes[..])?),
[Review thread on the `Encoded::Zstd` decoder arm]

Contributor: Are these decoders hardened against zip-bomb-like behavior?

Contributor Author: I raised that question with the upstream package; meanwhile, I added a limit here on the number of bytes decoded.
    };
    Ok(bincode::options()
        .with_limit(limit as u64)
        .deserialize_from(decoder)?)
}
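To illustrate the limit discussed in the review thread above, here is a standalone sketch (not part of the PR) of how bincode's `with_limit` rejects a small compressed stream that would inflate past the cap, before the full allocation happens. In the PR itself, `into_shreds` passes `DUPLICATE_PROOF_DECODE_LIMIT` (`2 * PACKET_DATA_SIZE + 256`) as this cap.

```rust
// Standalone sketch: bincode's size limit as a decompression-bomb guard.
// Uses the same bincode/zstd crates as the diff; values are illustrative.
use bincode::Options as _;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // A 1 MiB Vec<u8> of zeros serializes to ~1 MiB of bincode bytes...
    let payload = vec![0u8; 1 << 20];
    let bytes = bincode::options().serialize(&payload)?;
    // ...but compresses down to a tiny zstd stream.
    let bomb = zstd::encode_all(&bytes[..], 0)?;
    assert!(bomb.len() < 1024);

    // Deserializing with a 4 KiB limit fails fast: the claimed length
    // already exceeds the allowed number of decoded bytes, so the full
    // decompressed buffer is never materialized.
    let decoder = zstd::stream::read::Decoder::new(&bomb[..])?;
    let result: Result<Vec<u8>, _> = bincode::options()
        .with_limit(4096)
        .deserialize_from(decoder);
    assert!(result.is_err());
    Ok(())
}
```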

#[cfg(test)]
pub(crate) mod tests {
    use super::*;
    use rand::Rng;
    use solana_ledger::{
        blockstore_meta::DuplicateSlotProof,
        entry::Entry,
        shred::{Shred, Shredder},
    };
    use solana_sdk::{hash, pubkey::Pubkey, signature::Keypair, system_transaction};
    use std::sync::Arc;

    pub fn new_rand_shred<R: Rng>(
        rng: &mut R,
        next_shred_index: u32,
        shredder: &Shredder,
    ) -> Shred {
        let entries: Vec<_> = std::iter::repeat_with(|| {
            let tx = system_transaction::transfer(
                &Keypair::new(),       // from
                &Pubkey::new_unique(), // to
                rng.gen(),             // lamports
                hash::new_rand(rng),   // recent blockhash
            );
            Entry::new(
                &hash::new_rand(rng), // prev_hash
                1,                    // num_hashes,
                vec![tx],             // transactions
            )
        })
        .take(5)
        .collect();
        let (mut data_shreds, _coding_shreds, _last_shred_index) = shredder.entries_to_shreds(
            &entries,
            true, // is_last_in_slot
            next_shred_index,
        );
        data_shreds.swap_remove(0)
    }

    #[test]
    fn test_encode_round_trip() {
        let mut rng = rand::thread_rng();
        let leader = Arc::new(Keypair::new());
        let (slot, parent_slot, fec_rate, reference_tick, version) =
            (53084024, 53084023, 0.0, 0, 0);
        let shredder =
            Shredder::new(slot, parent_slot, fec_rate, leader, reference_tick, version).unwrap();
        let next_shred_index = rng.gen();
        let shred1 = new_rand_shred(&mut rng, next_shred_index, &shredder);
        let shred2 = new_rand_shred(&mut rng, next_shred_index, &shredder);
        let proof = DuplicateSlotProof {
            shred1: shred1.payload,
            shred2: shred2.payload,
        };
        let options = vec![
            Options::Bincode,
            Options::Deflate(Compression::default()),
            Options::Deflate(Compression::best()),
            Options::Gzip(Compression::default()),
            Options::Gzip(Compression::best()),
            Options::Zlib(Compression::default()),
            Options::Zlib(Compression::best()),
            Options::Zstd { level: 0 },
            Options::Zstd { level: 9 },
        ];
        for opts in options {
            let bytes = encode(&proof, opts).unwrap();
            let other: DuplicateSlotProof = decode(&bytes[..], 4096).unwrap();
            assert_eq!(proof.shred1, other.shred1);
            assert_eq!(proof.shred2, other.shred2);
        }
    }
}
1 change: 1 addition & 0 deletions core/src/lib.rs
@@ -32,6 +32,7 @@ pub mod crds_shards;
 pub mod crds_value;
 pub mod data_budget;
 pub mod duplicate_shred;
+pub mod encode;
 pub mod epoch_slots;
 pub mod fetch_stage;
 pub mod fork_choice;