Skip to content

Commit

Permalink
Merge branch 'main' into recordIterator
Browse files Browse the repository at this point in the history
  • Loading branch information
aovestdipaperino authored Feb 4, 2025
2 parents 0695ef3 + e2c0731 commit a613ffc
Show file tree
Hide file tree
Showing 181 changed files with 2,856 additions and 2,594 deletions.
9 changes: 9 additions & 0 deletions .github/workflows/build-and-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: Swatinem/rust-cache@v2
with:
# this line means that only the main branch writes to the cache
# benefits: save about 7s per workflow by skipping the actual cache write
# downsides: PRs that update rust version or changes deps will be slower to iterate on due to changes not being cached.
save-if: ${{ github.ref == 'refs/heads/main' }}
- uses: dtolnay/rust-toolchain@stable
- run: cargo build --workspace --all-features
- run: cargo test --workspace --all-features
Expand All @@ -29,6 +35,9 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: Swatinem/rust-cache@v2
with:
save-if: ${{ github.ref == 'refs/heads/main' }}
- uses: dtolnay/rust-toolchain@stable
with:
components: clippy
Expand Down
24 changes: 21 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,28 @@

## Unreleased

- `Records::encode`/`Records::decode` no longer has unneeded trait bounds.

## v0.14.0

- `Records::encode`/`Records::decode` is now reverted back to their original API, instead `encode_with_custom_compression` and `decode_with_custom_compression` is provided for custom compression.
- `ApiKey` variants are renamed to remove the `Key` suffix.
- Add `iterate_all` and `valid_versions` methods to `ApiKey`
- Implement `PartialEq` and `Display` for `VersionRange`

## v0.13.0

- All "map" types in the protocol that were previously of type `IndexMap<K, V>` are now of type `Vec<V>`, the value of `K` is stored as a field within `V`.
- This was done to resolve <https://github.com/tychedelia/kafka-protocol-rs/issues/84> and improve decoding speed.
- If you were previously calling `.get()` on the map, the best way to migrate is to refactor your code to avoid the need to lookup by iterating over the items in the response instead.
- Alternatively, you could replace `responses.get(name)` with something like `responses.iter().find(|x| x.name == name)` to achieve the same result. But note that this access is now O(N) instead of O(1).
- Alternatively, you could use an intermediate hashmap before converting to a Vec to retain the O(1) lookup.
- Update protocol to kafka 3.8.0
- The Debug impl for new type wrappers now passes directly to the inner type.
The full list of new type wrappers is BrokerId, GroupId, ProducerId, TopicName and TransactionalId.
For example GroupId was previously `GroupId("some group")` but is now `"some group"`.
- ApiKey is now non_exhaustive.
- Added `gzip`, `zstd`, `snappy` and `lz4` features to enable the different compression algorithms for records (All enabled by default)

## v0.12.0

Expand All @@ -22,11 +40,11 @@
- Use `IntoIterator` instead of `Iterator` for `RecordBatchEncoder::encode`
- Use CRC-32 ISO/HDLC instead of CRC-32 CKSUM.
- Add `Display` and more `From<T>` implementations for `StrBytes`.
- Avoid redunand variant names in RequestKind/ResponseKind.
- Avoid redundant variant names in RequestKind/ResponseKind.

## v0.10.2

- Implement From<T> for RequestKind and ResponseKind.
- Implement `From<T>` for RequestKind and ResponseKind.

## v0.10.1

Expand Down Expand Up @@ -60,7 +78,7 @@ and other misc improvements.

## v0.7.0

- Switch to [crc32c](https://crates.io/crates/crc32c) crate, providing hardware accelration for crc operations
- Switch to [crc32c](https://crates.io/crates/crc32c) crate, providing hardware acceleration for crc operations
on supported platforms.
- Formatting fixes.
- Miscellaneous dependency updates.
Expand Down
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 17 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ members = ["protocol_codegen"]

[package]
name = "kafka-protocol"
version = "0.12.0"
version = "0.14.0"
authors = [
"Diggory Blake <[email protected]>",
"Charlotte McElwain <[email protected]>",
"belltoy <[email protected]>",
]
edition = "2018"
edition = "2021"
license = "MIT/Apache-2.0"
description = "Implementation of Kafka wire protocol."
homepage = "https://github.com/tychedelia/kafka-protocol-rs"
Expand All @@ -28,17 +28,27 @@ client = []
# Enable this feature if you are implementing a kafka protocol compatible broker.
# It will enable encoding of responses and decoding of requests.
broker = []
default = ["client", "broker"]

# Enable compression of records using gzip
gzip = ["dep:flate2"]
# Enable compression of records using zstd
zstd = ["dep:zstd"]
# Enable compression of records using snap
snappy = ["dep:snap"]
# Enable compression of records using lz4
lz4 = ["dep:lz4"]

default = ["client", "broker", "gzip", "zstd", "snappy", "lz4"]

[dependencies]
bytes = "1.0.1"
uuid = "1.3.0"
indexmap = "2.0.0"
crc = "3.0.0"
snap = "1.0.5"
flate2 = "1.0.20"
zstd = "0.13"
lz4 = "1.24"
snap = { version = "1.0.5", optional = true }
flate2 = { version = "1.0.20", optional = true }
zstd = { version = "0.13", optional = true }
lz4 = { version = "1.24", optional = true }
paste = "1.0.7"
crc32c = "0.6.4"
anyhow = "1.0.80"
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use kafka_protocol::protocol::StrBytes;

let mut header = RequestHeader::default();
header.client_id = Some(StrBytes::from_static_str("my-client"));
header.request_api_key = ApiKey::MetadataKey as i16;
header.request_api_key = ApiKey::Metadata as i16;
header.request_api_version = 12;

let mut request = MetadataRequest::default();
Expand All @@ -38,7 +38,7 @@ use kafka_protocol::protocol::StrBytes;

let header = RequestHeader::default()
.with_client_id(Some(StrBytes::from_static_str("my-client")))
.with_request_api_key(ApiKey::MetadataKey as i16)
.with_request_api_key(ApiKey::Metadata as i16)
.with_request_api_version(12);

let request = MetadataRequest::default()
Expand Down
2 changes: 1 addition & 1 deletion protocol_codegen/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "protocol_codegen"
version = "0.1.0"
authors = ["Diggory Blake <[email protected]>"]
edition = "2018"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

Expand Down
61 changes: 50 additions & 11 deletions protocol_codegen/src/generate_messages.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::{BTreeMap, BTreeSet};
use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::fs::File;
use std::io::Write;
use std::path::PathBuf;
Expand All @@ -11,13 +11,10 @@ mod generate;
mod parse;
mod spec;

use spec::SpecType;
use spec::{SpecType, VersionSpec};

pub fn run(messages_module_dir: &str, mut input_file_paths: Vec<PathBuf>) -> Result<(), Error> {
input_file_paths.sort();
let mut entity_types = BTreeSet::new();
let mut request_types = BTreeMap::new();
let mut response_types = BTreeMap::new();

let module_path = format!("{}.rs", messages_module_dir);

Expand All @@ -38,6 +35,7 @@ pub fn run(messages_module_dir: &str, mut input_file_paths: Vec<PathBuf>) -> Res
)?;
writeln!(m, "#[cfg(all(feature = \"client\", feature = \"broker\"))]")?;
writeln!(m, "use crate::protocol::Request;")?;
writeln!(m, "use crate::protocol::VersionRange;")?;
writeln!(m, "use std::convert::TryFrom;")?;
writeln!(m, "#[cfg(feature = \"messages_enums\")]")?;
writeln!(m, "#[cfg(any(feature = \"client\", feature = \"broker\"))]")?;
Expand All @@ -51,15 +49,22 @@ pub fn run(messages_module_dir: &str, mut input_file_paths: Vec<PathBuf>) -> Res
writeln!(m, "use anyhow::Context;")?;
writeln!(m)?;

let mut entity_types = BTreeSet::new();
let mut request_types = BTreeMap::new();
let mut response_types = BTreeMap::new();
let mut api_key_to_valid_version: HashMap<i16, VersionSpec> = HashMap::new();

for input_file_path in &input_file_paths {
let spec = parse::parse(input_file_path)?;
let spec_meta = (spec.type_, spec.api_key);
let valid_versions = spec.valid_versions;

let outcome = generate::generate(messages_module_dir, spec)?;
if let Some(output) = outcome {
match spec_meta {
(SpecType::Request, Some(k)) => {
request_types.insert(k, output);
api_key_to_valid_version.insert(k, valid_versions);
}
(SpecType::Response, Some(k)) => {
response_types.insert(k, output);
Expand Down Expand Up @@ -132,7 +137,7 @@ pub fn run(messages_module_dir: &str, mut input_file_paths: Vec<PathBuf>) -> Res
writeln!(
m,
" {} = {},",
request_type.replace("Request", "Key"),
request_type.replace("Request", ""),
api_key
)?;
}
Expand All @@ -153,7 +158,7 @@ pub fn run(messages_module_dir: &str, mut input_file_paths: Vec<PathBuf>) -> Res
writeln!(
m,
" ApiKey::{} => {}::header_version(version),",
request_type.replace("Request", "Key"),
request_type.replace("Request", ""),
request_type
)?;
}
Expand All @@ -173,12 +178,46 @@ pub fn run(messages_module_dir: &str, mut input_file_paths: Vec<PathBuf>) -> Res
writeln!(
m,
" ApiKey::{} => {}::header_version(version),",
response_type.replace("Response", "Key"),
response_type.replace("Response", ""),
response_type
)?;
}
writeln!(m, " }}")?;
writeln!(m, " }}")?;

writeln!(
m,
" /// Returns the valid versions that can be used with this ApiKey"
)?;
writeln!(m, " pub fn valid_versions(&self) -> VersionRange {{")?;
writeln!(m, " match self {{")?;
for (api_key, request_type) in request_types.iter() {
let valid_versions = api_key_to_valid_version
.get(api_key)
.unwrap()
.range()
.unwrap();
writeln!(
m,
"ApiKey::{} => VersionRange {{ min: {}, max: {} }},",
request_type.replace("Request", ""),
valid_versions.start(),
valid_versions.end(),
)?;
}
writeln!(m, " }}")?;
writeln!(m, " }}")?;

writeln!(
m,
r#"
/// Iterate through every ApiKey variant in the order of the internal code.
pub fn iter() -> impl Iterator<Item = ApiKey> {{
(0..i16::MAX).map_while(|i| ApiKey::try_from(i).ok())
}}
"#
)?;

writeln!(m, "}}")?;

writeln!(m, "impl TryFrom<i16> for ApiKey {{")?;
Expand All @@ -187,7 +226,7 @@ pub fn run(messages_module_dir: &str, mut input_file_paths: Vec<PathBuf>) -> Res
writeln!(m, " fn try_from(v: i16) -> Result<Self, Self::Error> {{")?;
writeln!(m, " match v {{")?;
for (_, request_type) in request_types.iter() {
let key = request_type.replace("Request", "Key");
let key = request_type.replace("Request", "");
writeln!(
m,
" x if x == ApiKey::{} as i16 => Ok(ApiKey::{}),",
Expand Down Expand Up @@ -250,7 +289,7 @@ pub fn run(messages_module_dir: &str, mut input_file_paths: Vec<PathBuf>) -> Res
let variant = request_type.trim_end_matches("Request");
writeln!(
m,
"ApiKey::{variant}Key => Ok(RequestKind::{variant}(decode(bytes, version)?)),"
"ApiKey::{variant} => Ok(RequestKind::{variant}(decode(bytes, version)?)),"
)?;
}
writeln!(m, "}}")?;
Expand Down Expand Up @@ -351,7 +390,7 @@ fn encode<T: Encodable>(encodable: &T, bytes: &mut bytes::BytesMut, version: i16
let variant = response_type.trim_end_matches("Response");
writeln!(
m,
"ApiKey::{variant}Key => Ok(ResponseKind::{variant}(decode(bytes, version)?)),"
"ApiKey::{variant} => Ok(ResponseKind::{variant}(decode(bytes, version)?)),"
)?;
}
writeln!(m, "}}")?;
Expand Down
Loading

0 comments on commit a613ffc

Please sign in to comment.