Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update messages to kafka 3.9 #105

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions protocol_codegen/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ fn main() -> Result<(), Error> {
};

// Checkout the release commit
// https://github.com/apache/kafka/releases/tag/3.8.0
// https://github.com/apache/kafka/releases/tag/3.9.0
// checking out a tag with git2 is annoying -- we pin to the tag's commit sha instead
let release_commit = "771b9576b00ecf5b64ab6e8bedf04156fbdb5cd6";
let release_commit = "84caaa6e9da06435411510a81fa321d4f99c351f";
println!("Checking out release {}", release_commit);
let oid = Oid::from_str(release_commit).unwrap();
let commit = repo
Expand Down
145 changes: 134 additions & 11 deletions src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,15 @@ pub use list_client_metrics_resources_request::ListClientMetricsResourcesRequest
pub mod describe_topic_partitions_request;
pub use describe_topic_partitions_request::DescribeTopicPartitionsRequest;

pub mod add_raft_voter_request;
pub use add_raft_voter_request::AddRaftVoterRequest;

pub mod remove_raft_voter_request;
pub use remove_raft_voter_request::RemoveRaftVoterRequest;

pub mod update_raft_voter_request;
pub use update_raft_voter_request::UpdateRaftVoterRequest;

pub mod produce_response;
pub use produce_response::ProduceResponse;

Expand Down Expand Up @@ -505,6 +514,15 @@ pub use list_client_metrics_resources_response::ListClientMetricsResourcesRespon
pub mod describe_topic_partitions_response;
pub use describe_topic_partitions_response::DescribeTopicPartitionsResponse;

pub mod add_raft_voter_response;
pub use add_raft_voter_response::AddRaftVoterResponse;

pub mod remove_raft_voter_response;
pub use remove_raft_voter_response::RemoveRaftVoterResponse;

pub mod update_raft_voter_response;
pub use update_raft_voter_response::UpdateRaftVoterResponse;

#[cfg(all(feature = "client", feature = "broker"))]
impl Request for ProduceRequest {
const KEY: i16 = 0;
Expand Down Expand Up @@ -961,6 +979,24 @@ impl Request for DescribeTopicPartitionsRequest {
type Response = DescribeTopicPartitionsResponse;
}

#[cfg(all(feature = "client", feature = "broker"))]
impl Request for AddRaftVoterRequest {
const KEY: i16 = 80;
type Response = AddRaftVoterResponse;
}

#[cfg(all(feature = "client", feature = "broker"))]
impl Request for RemoveRaftVoterRequest {
const KEY: i16 = 81;
type Response = RemoveRaftVoterResponse;
}

#[cfg(all(feature = "client", feature = "broker"))]
impl Request for UpdateRaftVoterRequest {
const KEY: i16 = 82;
type Response = UpdateRaftVoterResponse;
}

/// Valid API keys in the Kafka protocol.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ApiKey {
Expand Down Expand Up @@ -1116,6 +1152,12 @@ pub enum ApiKey {
ListClientMetricsResources = 74,
/// API key for request DescribeTopicPartitionsRequest
DescribeTopicPartitions = 75,
/// API key for request AddRaftVoterRequest
AddRaftVoter = 80,
/// API key for request RemoveRaftVoterRequest
RemoveRaftVoter = 81,
/// API key for request UpdateRaftVoterRequest
UpdateRaftVoter = 82,
}

impl ApiKey {
Expand Down Expand Up @@ -1220,6 +1262,9 @@ impl ApiKey {
ApiKey::DescribeTopicPartitions => {
DescribeTopicPartitionsRequest::header_version(version)
}
ApiKey::AddRaftVoter => AddRaftVoterRequest::header_version(version),
ApiKey::RemoveRaftVoter => RemoveRaftVoterRequest::header_version(version),
ApiKey::UpdateRaftVoter => UpdateRaftVoterRequest::header_version(version),
}
}
/// Get the version of response header that needs to be prepended to this message
Expand Down Expand Up @@ -1323,30 +1368,33 @@ impl ApiKey {
ApiKey::DescribeTopicPartitions => {
DescribeTopicPartitionsResponse::header_version(version)
}
ApiKey::AddRaftVoter => AddRaftVoterResponse::header_version(version),
ApiKey::RemoveRaftVoter => RemoveRaftVoterResponse::header_version(version),
ApiKey::UpdateRaftVoter => UpdateRaftVoterResponse::header_version(version),
}
}
/// Returns the valid versions that can be used with this ApiKey
pub fn valid_versions(&self) -> VersionRange {
match self {
ApiKey::Produce => VersionRange { min: 0, max: 11 },
ApiKey::Fetch => VersionRange { min: 0, max: 16 },
ApiKey::ListOffsets => VersionRange { min: 0, max: 8 },
ApiKey::Fetch => VersionRange { min: 0, max: 17 },
ApiKey::ListOffsets => VersionRange { min: 0, max: 9 },
ApiKey::Metadata => VersionRange { min: 0, max: 12 },
ApiKey::LeaderAndIsr => VersionRange { min: 0, max: 7 },
ApiKey::StopReplica => VersionRange { min: 0, max: 4 },
ApiKey::UpdateMetadata => VersionRange { min: 0, max: 8 },
ApiKey::ControlledShutdown => VersionRange { min: 0, max: 3 },
ApiKey::OffsetCommit => VersionRange { min: 0, max: 9 },
ApiKey::OffsetFetch => VersionRange { min: 0, max: 9 },
ApiKey::FindCoordinator => VersionRange { min: 0, max: 5 },
ApiKey::FindCoordinator => VersionRange { min: 0, max: 6 },
ApiKey::JoinGroup => VersionRange { min: 0, max: 9 },
ApiKey::Heartbeat => VersionRange { min: 0, max: 4 },
ApiKey::LeaveGroup => VersionRange { min: 0, max: 5 },
ApiKey::SyncGroup => VersionRange { min: 0, max: 5 },
ApiKey::DescribeGroups => VersionRange { min: 0, max: 5 },
ApiKey::ListGroups => VersionRange { min: 0, max: 5 },
ApiKey::SaslHandshake => VersionRange { min: 0, max: 1 },
ApiKey::ApiVersions => VersionRange { min: 0, max: 3 },
ApiKey::ApiVersions => VersionRange { min: 0, max: 4 },
ApiKey::CreateTopics => VersionRange { min: 0, max: 7 },
ApiKey::DeleteTopics => VersionRange { min: 0, max: 6 },
ApiKey::DeleteRecords => VersionRange { min: 0, max: 2 },
Expand Down Expand Up @@ -1380,17 +1428,17 @@ impl ApiKey {
ApiKey::AlterClientQuotas => VersionRange { min: 0, max: 1 },
ApiKey::DescribeUserScramCredentials => VersionRange { min: 0, max: 0 },
ApiKey::AlterUserScramCredentials => VersionRange { min: 0, max: 0 },
ApiKey::Vote => VersionRange { min: 0, max: 0 },
ApiKey::BeginQuorumEpoch => VersionRange { min: 0, max: 0 },
ApiKey::EndQuorumEpoch => VersionRange { min: 0, max: 0 },
ApiKey::DescribeQuorum => VersionRange { min: 0, max: 1 },
ApiKey::Vote => VersionRange { min: 0, max: 1 },
ApiKey::BeginQuorumEpoch => VersionRange { min: 0, max: 1 },
ApiKey::EndQuorumEpoch => VersionRange { min: 0, max: 1 },
ApiKey::DescribeQuorum => VersionRange { min: 0, max: 2 },
ApiKey::AlterPartition => VersionRange { min: 0, max: 3 },
ApiKey::UpdateFeatures => VersionRange { min: 0, max: 1 },
ApiKey::Envelope => VersionRange { min: 0, max: 0 },
ApiKey::FetchSnapshot => VersionRange { min: 0, max: 0 },
ApiKey::FetchSnapshot => VersionRange { min: 0, max: 1 },
ApiKey::DescribeCluster => VersionRange { min: 0, max: 1 },
ApiKey::DescribeProducers => VersionRange { min: 0, max: 0 },
ApiKey::BrokerRegistration => VersionRange { min: 0, max: 3 },
ApiKey::BrokerRegistration => VersionRange { min: 0, max: 4 },
ApiKey::BrokerHeartbeat => VersionRange { min: 0, max: 1 },
ApiKey::UnregisterBroker => VersionRange { min: 0, max: 0 },
ApiKey::DescribeTransactions => VersionRange { min: 0, max: 0 },
Expand All @@ -1404,6 +1452,9 @@ impl ApiKey {
ApiKey::AssignReplicasToDirs => VersionRange { min: 0, max: 0 },
ApiKey::ListClientMetricsResources => VersionRange { min: 0, max: 0 },
ApiKey::DescribeTopicPartitions => VersionRange { min: 0, max: 0 },
ApiKey::AddRaftVoter => VersionRange { min: 0, max: 0 },
ApiKey::RemoveRaftVoter => VersionRange { min: 0, max: 0 },
ApiKey::UpdateRaftVoter => VersionRange { min: 0, max: 0 },
}
}

Expand Down Expand Up @@ -1505,6 +1556,9 @@ impl TryFrom<i16> for ApiKey {
Ok(ApiKey::ListClientMetricsResources)
}
x if x == ApiKey::DescribeTopicPartitions as i16 => Ok(ApiKey::DescribeTopicPartitions),
x if x == ApiKey::AddRaftVoter as i16 => Ok(ApiKey::AddRaftVoter),
x if x == ApiKey::RemoveRaftVoter as i16 => Ok(ApiKey::RemoveRaftVoter),
x if x == ApiKey::UpdateRaftVoter as i16 => Ok(ApiKey::UpdateRaftVoter),
_ => Err(()),
}
}
Expand Down Expand Up @@ -1667,6 +1721,12 @@ pub enum RequestKind {
ListClientMetricsResources(ListClientMetricsResourcesRequest),
/// DescribeTopicPartitionsRequest,
DescribeTopicPartitions(DescribeTopicPartitionsRequest),
/// AddRaftVoterRequest,
AddRaftVoter(AddRaftVoterRequest),
/// RemoveRaftVoterRequest,
RemoveRaftVoter(RemoveRaftVoterRequest),
/// UpdateRaftVoterRequest,
UpdateRaftVoter(UpdateRaftVoterRequest),
}

#[cfg(feature = "messages_enums")]
Expand Down Expand Up @@ -1751,6 +1811,9 @@ impl RequestKind {
RequestKind::AssignReplicasToDirs(x) => encode(x, bytes, version),
RequestKind::ListClientMetricsResources(x) => encode(x, bytes, version),
RequestKind::DescribeTopicPartitions(x) => encode(x, bytes, version),
RequestKind::AddRaftVoter(x) => encode(x, bytes, version),
RequestKind::RemoveRaftVoter(x) => encode(x, bytes, version),
RequestKind::UpdateRaftVoter(x) => encode(x, bytes, version),
}
}
/// Decode the message from the provided buffer and version
Expand Down Expand Up @@ -1889,6 +1952,9 @@ impl RequestKind {
ApiKey::DescribeTopicPartitions => Ok(RequestKind::DescribeTopicPartitions(decode(
bytes, version,
)?)),
ApiKey::AddRaftVoter => Ok(RequestKind::AddRaftVoter(decode(bytes, version)?)),
ApiKey::RemoveRaftVoter => Ok(RequestKind::RemoveRaftVoter(decode(bytes, version)?)),
ApiKey::UpdateRaftVoter => Ok(RequestKind::UpdateRaftVoter(decode(bytes, version)?)),
}
}
}
Expand Down Expand Up @@ -2424,6 +2490,27 @@ impl From<DescribeTopicPartitionsRequest> for RequestKind {
}
}

#[cfg(feature = "messages_enums")]
impl From<AddRaftVoterRequest> for RequestKind {
fn from(value: AddRaftVoterRequest) -> RequestKind {
RequestKind::AddRaftVoter(value)
}
}

#[cfg(feature = "messages_enums")]
impl From<RemoveRaftVoterRequest> for RequestKind {
fn from(value: RemoveRaftVoterRequest) -> RequestKind {
RequestKind::RemoveRaftVoter(value)
}
}

#[cfg(feature = "messages_enums")]
impl From<UpdateRaftVoterRequest> for RequestKind {
fn from(value: UpdateRaftVoterRequest) -> RequestKind {
RequestKind::UpdateRaftVoter(value)
}
}

#[cfg(feature = "messages_enums")]
#[cfg(any(feature = "client", feature = "broker"))]
fn decode<T: Decodable>(bytes: &mut bytes::Bytes, version: i16) -> Result<T> {
Expand Down Expand Up @@ -2605,6 +2692,12 @@ pub enum ResponseKind {
ListClientMetricsResources(ListClientMetricsResourcesResponse),
/// DescribeTopicPartitionsResponse,
DescribeTopicPartitions(DescribeTopicPartitionsResponse),
/// AddRaftVoterResponse,
AddRaftVoter(AddRaftVoterResponse),
/// RemoveRaftVoterResponse,
RemoveRaftVoter(RemoveRaftVoterResponse),
/// UpdateRaftVoterResponse,
UpdateRaftVoter(UpdateRaftVoterResponse),
}

#[cfg(feature = "messages_enums")]
Expand Down Expand Up @@ -2689,6 +2782,9 @@ impl ResponseKind {
ResponseKind::AssignReplicasToDirs(x) => encode(x, bytes, version),
ResponseKind::ListClientMetricsResources(x) => encode(x, bytes, version),
ResponseKind::DescribeTopicPartitions(x) => encode(x, bytes, version),
ResponseKind::AddRaftVoter(x) => encode(x, bytes, version),
ResponseKind::RemoveRaftVoter(x) => encode(x, bytes, version),
ResponseKind::UpdateRaftVoter(x) => encode(x, bytes, version),
}
}
/// Decode the message from the provided buffer and version
Expand Down Expand Up @@ -2827,6 +2923,9 @@ impl ResponseKind {
ApiKey::DescribeTopicPartitions => Ok(ResponseKind::DescribeTopicPartitions(decode(
bytes, version,
)?)),
ApiKey::AddRaftVoter => Ok(ResponseKind::AddRaftVoter(decode(bytes, version)?)),
ApiKey::RemoveRaftVoter => Ok(ResponseKind::RemoveRaftVoter(decode(bytes, version)?)),
ApiKey::UpdateRaftVoter => Ok(ResponseKind::UpdateRaftVoter(decode(bytes, version)?)),
}
}
/// Get the version of request header that needs to be prepended to this message
Expand Down Expand Up @@ -2960,6 +3059,9 @@ impl ResponseKind {
ResponseKind::DescribeTopicPartitions(_) => {
DescribeTopicPartitionsResponse::header_version(version)
}
ResponseKind::AddRaftVoter(_) => AddRaftVoterResponse::header_version(version),
ResponseKind::RemoveRaftVoter(_) => RemoveRaftVoterResponse::header_version(version),
ResponseKind::UpdateRaftVoter(_) => UpdateRaftVoterResponse::header_version(version),
}
}
}
Expand Down Expand Up @@ -3496,7 +3598,28 @@ impl From<DescribeTopicPartitionsResponse> for ResponseKind {
}
}

/// The ID of the leader broker.
#[cfg(feature = "messages_enums")]
impl From<AddRaftVoterResponse> for ResponseKind {
fn from(value: AddRaftVoterResponse) -> ResponseKind {
ResponseKind::AddRaftVoter(value)
}
}

#[cfg(feature = "messages_enums")]
impl From<RemoveRaftVoterResponse> for ResponseKind {
fn from(value: RemoveRaftVoterResponse) -> ResponseKind {
ResponseKind::RemoveRaftVoter(value)
}
}

#[cfg(feature = "messages_enums")]
impl From<UpdateRaftVoterResponse> for ResponseKind {
fn from(value: UpdateRaftVoterResponse) -> ResponseKind {
ResponseKind::UpdateRaftVoter(value)
}
}

/// The replica id of the current leader or -1 if the leader is unknown
#[derive(Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Default, Copy)]
pub struct BrokerId(pub i32);

Expand Down
Loading