Skip to content

Commit

Permalink
Merge branch 'main' into replace-builder
Browse files Browse the repository at this point in the history
  • Loading branch information
Hackzzila authored Jul 23, 2024
2 parents 446f5eb + 2388b1e commit 76d3ea9
Show file tree
Hide file tree
Showing 167 changed files with 1,807 additions and 2,148 deletions.
16 changes: 8 additions & 8 deletions protocol_codegen/src/generate_messages/generate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1195,9 +1195,9 @@ impl PreparedStruct {
w.block(|w| {
if let Some(key) = &self.map_key {
writeln!(w, "type Key = {};", key.rust_name())?;
write!(w, "fn encode<B: ByteBufMut>(&self, key: &Self::Key, buf: &mut B, version: i16) -> Result<(), EncodeError> ")?;
write!(w, "fn encode<B: ByteBufMut>(&self, key: &Self::Key, buf: &mut B, version: i16) -> Result<()> ")?;
} else {
write!(w, "fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<(), EncodeError> ")?;
write!(w, "fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> ")?;
}
w.block(|w| {
for prepared_field in &self.prepared_fields {
Expand All @@ -1209,9 +1209,9 @@ impl PreparedStruct {
})?;
writeln!(w)?;
if self.map_key.is_some() {
write!(w, "fn compute_size(&self, key: &Self::Key, version: i16) -> Result<usize, EncodeError> ")?;
write!(w, "fn compute_size(&self, key: &Self::Key, version: i16) -> Result<usize> ")?;
} else {
write!(w, "fn compute_size(&self, version: i16) -> Result<usize, EncodeError> ")?;
write!(w, "fn compute_size(&self, version: i16) -> Result<usize> ")?;
}
w.block(|w| {
writeln!(w, "let mut total_size = 0;")?;
Expand All @@ -1235,9 +1235,9 @@ impl PreparedStruct {
w.block(|w| {
if let Some(key) = &self.map_key {
writeln!(w, "type Key = {};", key.rust_name())?;
write!(w, "fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<(Self::Key, Self), DecodeError> ")?;
write!(w, "fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<(Self::Key, Self)> ")?;
} else {
write!(w, "fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self, DecodeError> ")?;
write!(w, "fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> ")?;
}
w.block(|w| {
for prepared_field in &self.prepared_fields {
Expand Down Expand Up @@ -1352,10 +1352,10 @@ fn write_file_header<W: Write>(w: &mut CodeWriter<W>, name: &str) -> Result<(),
writeln!(w)?;
writeln!(w, "use bytes::Bytes;")?;
writeln!(w, "use uuid::Uuid;")?;
writeln!(w, "use anyhow::bail;")?;
writeln!(w, "use anyhow::{{bail, Result}};")?;
writeln!(w)?;
writeln!(w, "use crate::protocol::{{")?;
writeln!(w, " Encodable, Decodable, MapEncodable, MapDecodable, Encoder, Decoder, EncodeError, DecodeError, Message, HeaderVersion, VersionRange,")?;
writeln!(w, " Encodable, Decodable, MapEncodable, MapDecodable, Encoder, Decoder, Message, HeaderVersion, VersionRange,")?;
writeln!(w, " types, write_unknown_tagged_fields, compute_unknown_tagged_fields_size, StrBytes, buf::{{ByteBuf, ByteBufMut}}")?;
writeln!(w, "}};")?;
writeln!(w)?;
Expand Down
10 changes: 5 additions & 5 deletions src/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//! allows encoding and decoding records into a [`Record`](crate::records::Record).
use crate::protocol::buf::{ByteBuf, ByteBufMut};
use crate::protocol::{DecodeError, EncodeError};
use anyhow::Result;

mod gzip;
mod lz4;
Expand All @@ -23,17 +23,17 @@ pub trait Compressor<B: ByteBufMut> {
/// Target buffer type for compression.
type BufMut: ByteBufMut;
/// Compresses into provided [`ByteBufMut`], with records encoded by `F` into `R`.
fn compress<R, F>(buf: &mut B, f: F) -> Result<R, EncodeError>
fn compress<R, F>(buf: &mut B, f: F) -> Result<R>
where
F: FnOnce(&mut Self::BufMut) -> Result<R, EncodeError>;
F: FnOnce(&mut Self::BufMut) -> Result<R>;
}

/// A trait for record decompression algorithms.
pub trait Decompressor<B: ByteBuf> {
/// Target buffer type for decompression.
type Buf: ByteBuf;
/// Decompress records from `B` mapped using `F` into `R`.
fn decompress<R, F>(buf: &mut B, f: F) -> Result<R, DecodeError>
fn decompress<R, F>(buf: &mut B, f: F) -> Result<R>
where
F: FnOnce(&mut Self::Buf) -> Result<R, DecodeError>;
F: FnOnce(&mut Self::Buf) -> Result<R>;
}
11 changes: 5 additions & 6 deletions src/compression/gzip.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
use std::io::Write;

use anyhow::Context;
use anyhow::{Context, Result};
use bytes::buf::BufMut;
use bytes::{Bytes, BytesMut};
use flate2::write::{GzDecoder, GzEncoder};
use flate2::Compression;

use crate::protocol::buf::{ByteBuf, ByteBufMut};
use crate::protocol::{DecodeError, EncodeError};

use super::{Compressor, Decompressor};

Expand All @@ -17,9 +16,9 @@ pub struct Gzip;

impl<B: ByteBufMut> Compressor<B> for Gzip {
type BufMut = BytesMut;
fn compress<R, F>(buf: &mut B, f: F) -> Result<R, EncodeError>
fn compress<R, F>(buf: &mut B, f: F) -> Result<R>
where
F: FnOnce(&mut Self::BufMut) -> Result<R, EncodeError>,
F: FnOnce(&mut Self::BufMut) -> Result<R>,
{
// Write uncompressed bytes into a temporary buffer
let mut tmp = BytesMut::new();
Expand All @@ -36,9 +35,9 @@ impl<B: ByteBufMut> Compressor<B> for Gzip {

impl<B: ByteBuf> Decompressor<B> for Gzip {
type Buf = Bytes;
fn decompress<R, F>(buf: &mut B, f: F) -> Result<R, DecodeError>
fn decompress<R, F>(buf: &mut B, f: F) -> Result<R>
where
F: FnOnce(&mut Self::Buf) -> Result<R, DecodeError>,
F: FnOnce(&mut Self::Buf) -> Result<R>,
{
let mut tmp = BytesMut::new();

Expand Down
17 changes: 8 additions & 9 deletions src/compression/lz4.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::protocol::buf::{ByteBuf, ByteBufMut};
use crate::protocol::{DecodeError, EncodeError};
use anyhow::Context;
use anyhow::{Context, Result};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use lz4::BlockMode;
use lz4::{Decoder, EncoderBuilder};
Expand All @@ -16,9 +15,9 @@ const COMPRESSION_LEVEL: u32 = 4;

impl<B: ByteBufMut> Compressor<B> for Lz4 {
type BufMut = BytesMut;
fn compress<R, F>(buf: &mut B, f: F) -> Result<R, EncodeError>
fn compress<R, F>(buf: &mut B, f: F) -> Result<R>
where
F: FnOnce(&mut Self::BufMut) -> Result<R, EncodeError>,
F: FnOnce(&mut Self::BufMut) -> Result<R>,
{
// Write uncompressed bytes into a temporary buffer
let mut tmp = BytesMut::new();
Expand All @@ -39,9 +38,9 @@ impl<B: ByteBufMut> Compressor<B> for Lz4 {

impl<B: ByteBuf> Decompressor<B> for Lz4 {
type Buf = Bytes;
fn decompress<R, F>(buf: &mut B, f: F) -> Result<R, DecodeError>
fn decompress<R, F>(buf: &mut B, f: F) -> Result<R>
where
F: FnOnce(&mut Self::Buf) -> Result<R, DecodeError>,
F: FnOnce(&mut Self::Buf) -> Result<R>,
{
let mut tmp = BytesMut::new().writer();

Expand All @@ -59,7 +58,7 @@ impl<B: ByteBuf> Decompressor<B> for Lz4 {
mod test {
use crate::compression::Lz4;
use crate::compression::{Compressor, Decompressor};
use crate::protocol::{DecodeError, EncodeError};
use anyhow::Result;
use bytes::BytesMut;
use std::fmt::Write;
use std::str;
Expand All @@ -68,13 +67,13 @@ mod test {
#[test]
fn test_lz4() {
let mut compressed = BytesMut::new();
Lz4::compress(&mut compressed, |buf| -> Result<(), EncodeError> {
Lz4::compress(&mut compressed, |buf| -> Result<()> {
buf.write_str("hello lz4").unwrap();
Ok(())
})
.unwrap();

Lz4::decompress(&mut compressed, |buf| -> Result<(), DecodeError> {
Lz4::decompress(&mut compressed, |buf| -> Result<()> {
let decompressed_str = str::from_utf8(buf.as_slice()).unwrap();
assert_eq!(decompressed_str, "hello lz4");
Ok(())
Expand Down
10 changes: 5 additions & 5 deletions src/compression/none.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::protocol::buf::{ByteBuf, ByteBufMut};
use crate::protocol::{DecodeError, EncodeError};
use anyhow::Result;

use super::{Compressor, Decompressor};

Expand All @@ -8,19 +8,19 @@ pub struct None;

impl<B: ByteBufMut> Compressor<B> for None {
type BufMut = B;
fn compress<R, F>(buf: &mut B, f: F) -> Result<R, EncodeError>
fn compress<R, F>(buf: &mut B, f: F) -> Result<R>
where
F: FnOnce(&mut Self::BufMut) -> Result<R, EncodeError>,
F: FnOnce(&mut Self::BufMut) -> Result<R>,
{
f(buf)
}
}

impl<B: ByteBuf> Decompressor<B> for None {
type Buf = B;
fn decompress<R, F>(buf: &mut B, f: F) -> Result<R, DecodeError>
fn decompress<R, F>(buf: &mut B, f: F) -> Result<R>
where
F: FnOnce(&mut Self::Buf) -> Result<R, DecodeError>,
F: FnOnce(&mut Self::Buf) -> Result<R>,
{
f(buf)
}
Expand Down
11 changes: 5 additions & 6 deletions src/compression/snappy.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use anyhow::Context;
use anyhow::{Context, Result};
use bytes::{Bytes, BytesMut};
use snap::raw::*;

use crate::protocol::buf::{ByteBuf, ByteBufMut};
use crate::protocol::{DecodeError, EncodeError};

use super::{Compressor, Decompressor};

Expand All @@ -13,9 +12,9 @@ pub struct Snappy;

impl<B: ByteBufMut> Compressor<B> for Snappy {
type BufMut = BytesMut;
fn compress<R, F>(buf: &mut B, f: F) -> Result<R, EncodeError>
fn compress<R, F>(buf: &mut B, f: F) -> Result<R>
where
F: FnOnce(&mut Self::BufMut) -> Result<R, EncodeError>,
F: FnOnce(&mut Self::BufMut) -> Result<R>,
{
// Write uncompressed bytes into a temporary buffer
let mut tmp = BytesMut::new();
Expand All @@ -35,9 +34,9 @@ impl<B: ByteBufMut> Compressor<B> for Snappy {

impl<B: ByteBuf> Decompressor<B> for Snappy {
type Buf = Bytes;
fn decompress<R, F>(buf: &mut B, f: F) -> Result<R, DecodeError>
fn decompress<R, F>(buf: &mut B, f: F) -> Result<R>
where
F: FnOnce(&mut Self::Buf) -> Result<R, DecodeError>,
F: FnOnce(&mut Self::Buf) -> Result<R>,
{
// Allocate a temporary buffer to hold the uncompressed bytes
let buf = buf.copy_to_bytes(buf.remaining());
Expand Down
17 changes: 8 additions & 9 deletions src/compression/zstd.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::protocol::buf::{ByteBuf, ByteBufMut};
use crate::protocol::{DecodeError, EncodeError};
use anyhow::Context;
use anyhow::{Context, Result};
use bytes::{Buf, BufMut, Bytes, BytesMut};

use super::{Compressor, Decompressor};
Expand All @@ -13,9 +12,9 @@ const COMPRESSION_LEVEL: i32 = 3;

impl<B: ByteBufMut> Compressor<B> for Zstd {
type BufMut = BytesMut;
fn compress<R, F>(buf: &mut B, f: F) -> Result<R, EncodeError>
fn compress<R, F>(buf: &mut B, f: F) -> Result<R>
where
F: FnOnce(&mut Self::BufMut) -> Result<R, EncodeError>,
F: FnOnce(&mut Self::BufMut) -> Result<R>,
{
// Write uncompressed bytes into a temporary buffer
let mut tmp = BytesMut::new();
Expand All @@ -31,9 +30,9 @@ impl<B: ByteBufMut> Compressor<B> for Zstd {
impl<B: ByteBuf> Decompressor<B> for Zstd {
type Buf = Bytes;

fn decompress<R, F>(buf: &mut B, f: F) -> Result<R, DecodeError>
fn decompress<R, F>(buf: &mut B, f: F) -> Result<R>
where
F: FnOnce(&mut Self::Buf) -> Result<R, DecodeError>,
F: FnOnce(&mut Self::Buf) -> Result<R>,
{
let mut tmp = BytesMut::new().writer();
// Allocate a temporary buffer to hold the uncompressed bytes
Expand All @@ -48,7 +47,7 @@ impl<B: ByteBuf> Decompressor<B> for Zstd {
mod test {
use crate::compression::Zstd;
use crate::compression::{Compressor, Decompressor};
use crate::protocol::{DecodeError, EncodeError};
use anyhow::Result;
use bytes::BytesMut;
use std::fmt::Write;
use std::str;
Expand All @@ -57,13 +56,13 @@ mod test {
#[test]
fn test_zstd() {
let mut compressed = BytesMut::new();
Zstd::compress(&mut compressed, |buf| -> Result<(), EncodeError> {
Zstd::compress(&mut compressed, |buf| -> Result<()> {
buf.write_str("hello zstd")?;
Ok(())
})
.unwrap();

Zstd::decompress(&mut compressed, |buf| -> Result<(), DecodeError> {
Zstd::decompress(&mut compressed, |buf| -> Result<()> {
let decompressed_str = str::from_utf8(buf.as_slice())?;
assert_eq!(decompressed_str, "hello zstd");
Ok(())
Expand Down
12 changes: 6 additions & 6 deletions src/messages/add_offsets_to_txn_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@
use std::borrow::Borrow;
use std::collections::BTreeMap;

use anyhow::bail;
use anyhow::{bail, Result};
use bytes::Bytes;
use uuid::Uuid;

use crate::protocol::{
buf::{ByteBuf, ByteBufMut},
compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Builder, Decodable,
DecodeError, Decoder, Encodable, EncodeError, Encoder, HeaderVersion, MapDecodable,
MapEncodable, Message, StrBytes, VersionRange,
Decoder, Encodable, Encoder, HeaderVersion, MapDecodable, MapEncodable, Message, StrBytes,
VersionRange,
};

/// Valid versions: 0-3
Expand Down Expand Up @@ -56,7 +56,7 @@ impl Builder for AddOffsetsToTxnRequest {
}

impl Encodable for AddOffsetsToTxnRequest {
fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<(), EncodeError> {
fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
if version >= 3 {
types::CompactString.encode(buf, &self.transactional_id)?;
} else {
Expand All @@ -83,7 +83,7 @@ impl Encodable for AddOffsetsToTxnRequest {
}
Ok(())
}
fn compute_size(&self, version: i16) -> Result<usize, EncodeError> {
fn compute_size(&self, version: i16) -> Result<usize> {
let mut total_size = 0;
if version >= 3 {
total_size += types::CompactString.compute_size(&self.transactional_id)?;
Expand Down Expand Up @@ -114,7 +114,7 @@ impl Encodable for AddOffsetsToTxnRequest {
}

impl Decodable for AddOffsetsToTxnRequest {
fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self, DecodeError> {
fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
let transactional_id = if version >= 3 {
types::CompactString.decode(buf)?
} else {
Expand Down
12 changes: 6 additions & 6 deletions src/messages/add_offsets_to_txn_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@
use std::borrow::Borrow;
use std::collections::BTreeMap;

use anyhow::bail;
use anyhow::{bail, Result};
use bytes::Bytes;
use uuid::Uuid;

use crate::protocol::{
buf::{ByteBuf, ByteBufMut},
compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Builder, Decodable,
DecodeError, Decoder, Encodable, EncodeError, Encoder, HeaderVersion, MapDecodable,
MapEncodable, Message, StrBytes, VersionRange,
Decoder, Encodable, Encoder, HeaderVersion, MapDecodable, MapEncodable, Message, StrBytes,
VersionRange,
};

/// Valid versions: 0-3
Expand Down Expand Up @@ -46,7 +46,7 @@ impl Builder for AddOffsetsToTxnResponse {
}

impl Encodable for AddOffsetsToTxnResponse {
fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<(), EncodeError> {
fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
types::Int32.encode(buf, &self.throttle_time_ms)?;
types::Int16.encode(buf, &self.error_code)?;
if version >= 3 {
Expand All @@ -63,7 +63,7 @@ impl Encodable for AddOffsetsToTxnResponse {
}
Ok(())
}
fn compute_size(&self, version: i16) -> Result<usize, EncodeError> {
fn compute_size(&self, version: i16) -> Result<usize> {
let mut total_size = 0;
total_size += types::Int32.compute_size(&self.throttle_time_ms)?;
total_size += types::Int16.compute_size(&self.error_code)?;
Expand All @@ -84,7 +84,7 @@ impl Encodable for AddOffsetsToTxnResponse {
}

impl Decodable for AddOffsetsToTxnResponse {
fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self, DecodeError> {
fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
let throttle_time_ms = types::Int32.decode(buf)?;
let error_code = types::Int16.decode(buf)?;
let mut unknown_tagged_fields = BTreeMap::new();
Expand Down
Loading

0 comments on commit 76d3ea9

Please sign in to comment.