Transactional producer fixes #3971

Merged: 7 commits, Sep 20, 2022
14 changes: 13 additions & 1 deletion CHANGELOG.md
@@ -2,7 +2,19 @@

librdkafka v1.9.3 is a maintenance release:

-* Self-contained static libraries can now be built on Linux arm64 too
+* Self-contained static libraries can now be built on Linux arm64.
+* Fixes to the transactional and idempotent producer (#3971).
+
+
+## Fixes
+
+### Transactional producer fixes
+
+* In certain circumstances, an InitProducerId request was issued to bump the producer epoch before the open transaction had been aborted, causing the producer to enter a failed state. The producer now waits until the transaction is aborted before requesting an epoch bump (#3971).
+* If a timeout occurred while a commit operation was queued, it could trigger an abort, moving the state from COMMITTING_TRANSACTION to ABORTABLE_ERROR and then to ABORTING_TRANSACTION. When the broker came back up, the error, which had previously been retriable, was treated as permanent or fatal because the state had changed from the initial one (#3971).
+* During a drain and epoch bump, txn_curr_coord is not null in state WAIT_TRANSPORT, but the operation is retried if txn_coord has to be requested or the broker is down. During this retry txn_curr_coord could be set to null after a COORDINATOR_NOT_AVAILABLE error, so on returning to the WAIT_TRANSPORT case the assert failed with a fatal error. The producer now queries for the transaction coordinator and retries (#3971).
+* When aborting a transaction, a local TIMED_OUT, TIMED_OUT_QUEUE or OUTDATED error was not treated as retriable (nor as fenced or abortable), so rd_kafka_txn_op_abort_transaction_ack was never called. The calling code, with no indication of how to proceed, would start a new transaction, and beginning it failed because the transactional state was not READY. These errors are now retriable (#3971).
+* When TxnOffsetCommit needed retries, they were issued at full speed, causing excessive logging and network traffic. A one-second delay has been added between retries (#3971).
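
The last changelog entry describes pacing retries with a fixed delay. A minimal, blocking sketch of that idea follows. It is an illustration only: the names `retry_with_delay`, `attempt_fn`, `sleep_fn`, `RETRY_DELAY_MS` and the `sim_*` helpers are hypothetical and are not part of librdkafka, which schedules its retries asynchronously rather than sleeping.

```c
#include <stdbool.h>
#include <stdint.h>

/* Assumed fixed delay between retries, matching the one second
 * mentioned in the changelog entry. */
#define RETRY_DELAY_MS 1000

/* Hypothetical callbacks: one attempt of the request, and a sleep. */
typedef bool (*attempt_fn)(void *opaque);
typedef void (*sleep_fn)(int64_t ms, void *opaque);

/* Calls attempt() up to max_attempts times, waiting RETRY_DELAY_MS
 * after each failed attempt except the last one.
 * Returns the 1-based number of the successful attempt, or -1 if
 * every attempt failed. */
int retry_with_delay(attempt_fn attempt, sleep_fn sleep_ms,
                     void *opaque, int max_attempts) {
        for (int i = 1; i <= max_attempts; i++) {
                if (attempt(opaque))
                        return i;
                if (i < max_attempts)
                        sleep_ms(RETRY_DELAY_MS, opaque);
        }
        return -1;
}

/* Simulated request that fails a fixed number of times. */
struct sim_request {
        int failures_left;
        int64_t slept_ms;
};

bool sim_attempt(void *opaque) {
        struct sim_request *r = opaque;
        if (r->failures_left > 0) {
                r->failures_left--;
                return false;
        }
        return true;
}

void sim_sleep(int64_t ms, void *opaque) {
        struct sim_request *r = opaque;
        r->slept_ms += ms; /* Record the delay instead of blocking. */
}
```

Passing the sleep function as a callback keeps the sketch testable without real waiting; a request that fails twice succeeds on the third attempt after two recorded delays.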



67 changes: 49 additions & 18 deletions src/rdkafka_idempotence.c
@@ -243,10 +243,22 @@ void rd_kafka_idemp_pid_fsm(rd_kafka_t *rk) {
case RD_KAFKA_IDEMP_STATE_WAIT_TRANSPORT:
/* Waiting for broker/coordinator to become available */
if (rd_kafka_is_transactional(rk)) {
-/* Assert that a coordinator has been assigned by
+/* A coordinator must have been assigned.
* inspecting txn_curr_coord (the real broker)
* rather than txn_coord (the logical broker). */
-rd_assert(rk->rk_eos.txn_curr_coord);
+if (!rk->rk_eos.txn_curr_coord) {
+/*
+* Can happen if the coordinator wasn't set or
+* wasn't up initially and has been set to NULL
+* after a COORDINATOR_NOT_AVAILABLE error in
+* FindCoordinatorResponse. When the coordinator
+* is known this FSM will be called again.
+*/
+if (rd_kafka_txn_coord_query(
+rk, "Waiting coordinator"))
+return; /* Fatal error */
+break;
+}
rkb = rk->rk_eos.txn_coord;
rd_kafka_broker_keep(rkb);
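
The hunk above replaces a hard assert with a check that re-queries the coordinator and waits for the state machine to be run again. A simplified stand-alone sketch of that pattern is shown here. All types and names (`sim_broker_t`, `sim_producer_t`, `sim_wait_transport_step`) are hypothetical stand-ins, not librdkafka definitions, and the fatal-error path of the real query function is left out.

```c
#include <stddef.h>

/* Hypothetical stand-in for a broker handle. */
typedef struct {
        const char *name;
} sim_broker_t;

/* Hypothetical stand-in for the producer's coordinator state. */
typedef struct {
        sim_broker_t *curr_coord; /* plays the role of txn_curr_coord */
        int coord_queries;        /* number of coordinator lookups issued */
} sim_producer_t;

typedef enum {
        SIM_STEP_WAIT_FOR_COORD, /* lookup issued, step must be re-run */
        SIM_STEP_PROCEED         /* coordinator known, continue */
} sim_step_result_t;

/* One pass through the "wait for transport" step: instead of asserting
 * that a coordinator exists, issue a lookup and let a later pass
 * continue once the coordinator is known. */
sim_step_result_t sim_wait_transport_step(sim_producer_t *p) {
        if (p->curr_coord == NULL) {
                p->coord_queries++;
                return SIM_STEP_WAIT_FOR_COORD;
        }
        return SIM_STEP_PROCEED;
}
```

Calling the step with no coordinator records one lookup and asks to be re-run; once `curr_coord` is set, the same call proceeds without issuing another lookup.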

@@ -490,6 +502,7 @@ void rd_kafka_idemp_pid_update(rd_kafka_broker_t *rkb,
/* The idempotence state change will trigger the transaction manager,
* see rd_kafka_txn_idemp_state_change(). */
rd_kafka_idemp_set_state(rk, RD_KAFKA_IDEMP_STATE_ASSIGNED);
+rd_kafka_txns_complete_waiting(rk);

rd_kafka_wrunlock(rk);

@@ -540,6 +553,7 @@ static void rd_kafka_idemp_drain_done(rd_kafka_t *rk) {
rd_kafka_pid2str(rk->rk_eos.pid));
rd_kafka_idemp_set_state(rk,
RD_KAFKA_IDEMP_STATE_ASSIGNED);
+rd_kafka_txns_complete_waiting(rk);
wakeup_brokers = rd_true;
}
}
@@ -582,7 +596,7 @@ void rd_kafka_idemp_drain_reset(rd_kafka_t *rk, const char *reason) {
rd_kafka_wrlock(rk);
rd_kafka_dbg(rk, EOS, "DRAIN",
"Beginning partition drain for %s reset "
-"for %d partition(s) with in-flight requests: %s",
+"for %" PRId32 " partition(s) with in-flight requests: %s",
rd_kafka_pid2str(rk->rk_eos.pid),
rd_atomic32_get(&rk->rk_eos.inflight_toppar_cnt), reason);
rd_kafka_idemp_set_state(rk, RD_KAFKA_IDEMP_STATE_DRAIN_RESET);
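
The format-string change above swaps `%d` for the `PRId32` macro from `<inttypes.h>`, which expands to the correct conversion specifier for an `int32_t` on every platform. A small sketch of the usage, assuming the in-flight counter is a 32-bit signed integer; the helper name `format_partition_count` is hypothetical:

```c
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

/* Formats a 32-bit counter portably. PRId32 is a string literal, so it
 * is concatenated with the neighbouring literals at compile time.
 * Returns the snprintf() result. */
int format_partition_count(char *buf, size_t size, int32_t cnt) {
        return snprintf(buf, size,
                        "%" PRId32 " partition(s) with in-flight requests",
                        cnt);
}
```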
@@ -599,6 +613,32 @@ void rd_kafka_idemp_drain_reset(rd_kafka_t *rk, const char *reason) {
*
* The PID is not bumped until the queues are fully drained.
*
+* @param reason is a human-readable reason for the bump.
+*
+* @locality any
+* @locks none
+*/
+void rd_kafka_idemp_drain_epoch_bump_start(rd_kafka_t *rk, const char *reason) {
+rd_kafka_wrlock(rk);
+rd_kafka_dbg(rk, EOS, "DRAIN",
+"Beginning partition drain for %s epoch bump "
+"for %" PRId32 " partition(s) with in-flight requests: %s",
+rd_kafka_pid2str(rk->rk_eos.pid),
+rd_atomic32_get(&rk->rk_eos.inflight_toppar_cnt), reason);
+rd_kafka_idemp_set_state(rk, RD_KAFKA_IDEMP_STATE_DRAIN_BUMP);
+rd_kafka_wrunlock(rk);
+/* Check right away if the drain could be done. */
+rd_kafka_idemp_check_drain_done(rk);
+}
+
+/**
+* @brief Schedule an epoch bump when the local ProduceRequest queues
+* have been fully drained. If the producer is transactional,
+* the current transaction is aborted first and
+* rd_kafka_idemp_drain_epoch_bump_start() is called after that.
+*
+* The PID is not bumped until the queues are fully drained.
+*
* @param fmt is a human-readable reason for the bump
*
*
@@ -616,22 +656,13 @@ void rd_kafka_idemp_drain_epoch_bump(rd_kafka_t *rk,
rd_vsnprintf(buf, sizeof(buf), fmt, ap);
va_end(ap);

-rd_kafka_wrlock(rk);
-rd_kafka_dbg(rk, EOS, "DRAIN",
-"Beginning partition drain for %s epoch bump "
-"for %d partition(s) with in-flight requests: %s",
-rd_kafka_pid2str(rk->rk_eos.pid),
-rd_atomic32_get(&rk->rk_eos.inflight_toppar_cnt), buf);
-rd_kafka_idemp_set_state(rk, RD_KAFKA_IDEMP_STATE_DRAIN_BUMP);
-rd_kafka_wrunlock(rk);
-
-/* Transactions: bumping the epoch requires the current transaction
-* to be aborted. */
-if (rd_kafka_is_transactional(rk))
+if (rd_kafka_is_transactional(rk)) {
+/* Transactions: requires aborting the current transaction
+* before bumping the epoch. */
rd_kafka_txn_set_abortable_error_with_bump(rk, err, "%s", buf);
-
-/* Check right away if the drain could be done. */
-rd_kafka_idemp_check_drain_done(rk);
+} else {
+rd_kafka_idemp_drain_epoch_bump_start(rk, buf);
+}
}

/**
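
The restructuring above defers the drain for transactional producers until the open transaction has been aborted, while non-transactional producers start draining immediately. A reduced model of that ordering is sketched below. Every name in it (`sim_producer_t`, `sim_drain_epoch_bump`, `sim_transaction_aborted`, and so on) is hypothetical; the real code tracks far more state and runs across several threads.

```c
#include <stdbool.h>

typedef enum {
        SIM_STATE_ASSIGNED,  /* PID assigned, normal operation */
        SIM_STATE_DRAIN_BUMP /* draining queues ahead of an epoch bump */
} sim_idemp_state_t;

typedef struct {
        bool transactional;
        bool bump_after_abort; /* a bump is pending on the abort */
        sim_idemp_state_t state;
} sim_producer_t;

/* Starts the drain that precedes the epoch bump. */
void sim_drain_epoch_bump_start(sim_producer_t *p) {
        p->state = SIM_STATE_DRAIN_BUMP;
}

/* Requests an epoch bump. A transactional producer only records the
 * request; the drain starts once the transaction has been aborted. */
void sim_drain_epoch_bump(sim_producer_t *p) {
        if (p->transactional)
                p->bump_after_abort = true;
        else
                sim_drain_epoch_bump_start(p);
}

/* Called when the abort of the current transaction has completed. */
void sim_transaction_aborted(sim_producer_t *p) {
        if (p->bump_after_abort) {
                p->bump_after_abort = false;
                sim_drain_epoch_bump_start(p);
        }
}
```

In this model a transactional producer stays in its previous state after `sim_drain_epoch_bump()` and only enters the drain state once `sim_transaction_aborted()` runs, which mirrors the ordering the first changelog entry describes.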
1 change: 1 addition & 0 deletions src/rdkafka_idempotence.h
@@ -73,6 +73,7 @@ void rd_kafka_idemp_pid_update(rd_kafka_broker_t *rkb,
const rd_kafka_pid_t pid);
void rd_kafka_idemp_pid_fsm(rd_kafka_t *rk);
void rd_kafka_idemp_drain_reset(rd_kafka_t *rk, const char *reason);
+void rd_kafka_idemp_drain_epoch_bump_start(rd_kafka_t *rk, const char *reason);
void rd_kafka_idemp_drain_epoch_bump(rd_kafka_t *rk,
rd_kafka_resp_err_t err,
const char *fmt,
4 changes: 4 additions & 0 deletions src/rdkafka_int.h
@@ -496,6 +496,10 @@ struct rd_kafka_s {

/**< Transaction coordinator query timer */
rd_kafka_timer_t txn_coord_tmr;
+
+/**< Queue of the completed transaction waiting for a response
+* after the epoch bump */
+rd_kafka_q_t *completed_txn_waiting_bump;
} rk_eos;

rd_atomic32_t rk_flushing; /**< Application is calling flush(). */