MsgSets with just aborted msgs raised a MSG_SIZE error, and fix backoff (#2993)

This also removes fetch backoffs on underflows (truncated responses).
edenhill committed Nov 30, 2021
1 parent a82595b commit eff237d
Showing 7 changed files with 158 additions and 24 deletions.
14 changes: 14 additions & 0 deletions CHANGELOG.md
@@ -30,6 +30,20 @@ librdkafka v1.9.0 is a feature release:
keystore file was read. #3554.


### Consumer fixes

* An `ERR_MSG_SIZE_TOO_LARGE` consumer error was previously raised
  if the consumer received a maximum-sized FetchResponse containing only
  (transaction-)aborted messages and no control messages. Fetching did
  not stop, but some applications would terminate upon receiving this
  error. No error is raised in this case anymore (see the sketch after
  this list). (#2993)
  Thanks to @jacobmikesell for providing an application to reproduce the
  issue.
* The consumer no longer backs off the next fetch request (default 500ms)
  when the parsed fetch response is truncated (which is a valid case).
  This should speed up the message fetch rate for maximum-sized fetch
  responses.


# librdkafka v1.8.2

58 changes: 39 additions & 19 deletions src/rdkafka_msgset_reader.c
@@ -194,6 +194,9 @@ typedef struct rd_kafka_msgset_reader_s {
int msetr_ctrl_cnt; /**< Number of control messages
* or MessageSets received. */

int msetr_aborted_cnt; /**< Number of aborted MessageSets
* encountered. */

const char *msetr_srcname; /**< Optional message source string,
* used in debug logging to
* indicate messages were
@@ -984,6 +987,7 @@ rd_kafka_msgset_reader_msgs_v2(rd_kafka_msgset_reader_t *msetr) {
msetr->msetr_rkbuf,
rd_slice_remains(
&msetr->msetr_rkbuf->rkbuf_reader));
msetr->msetr_aborted_cnt++;
return RD_KAFKA_RESP_ERR_NO_ERROR;
}
}
@@ -1341,9 +1345,18 @@ rd_kafka_msgset_reader_run(rd_kafka_msgset_reader_t *msetr) {
* This means the size limit perhaps was too tight,
* increase it automatically.
* If there was at least one control message there
* is probably not a size limit and nothing is done. */
* is probably not a size limit and nothing is done.
* If there were aborted MessageSets and no underflow then
* there is no error either (#2993).
*
* Also, avoid propagating underflow errors, which cause
* backoffs, since we'll want to continue fetching the
* remaining truncated messages as soon as possible.
*/
if (msetr->msetr_ctrl_cnt > 0) {
/* Noop */
if (err == RD_KAFKA_RESP_ERR__UNDERFLOW)
err = RD_KAFKA_RESP_ERR_NO_ERROR;

} else if (rktp->rktp_fetch_msg_max_bytes < (1 << 30)) {
rktp->rktp_fetch_msg_max_bytes *= 2;
@@ -1354,17 +1367,25 @@ rd_kafka_msgset_reader_run(rd_kafka_msgset_reader_t *msetr) {
rktp->rktp_rkt->rkt_topic->str,
rktp->rktp_partition,
rktp->rktp_fetch_msg_max_bytes);
} else if (!err) {

if (err == RD_KAFKA_RESP_ERR__UNDERFLOW)
err = RD_KAFKA_RESP_ERR_NO_ERROR;

} else if (!err && msetr->msetr_aborted_cnt == 0) {
rd_kafka_consumer_err(
&msetr->msetr_rkq, msetr->msetr_broker_id,
RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE,
msetr->msetr_tver->version, NULL, rktp,
rktp->rktp_offsets.fetch_offset,
"Message at offset %" PRId64
" "
"might be too large to fetch, try increasing "
" might be too large to fetch, try increasing "
"receive.message.max.bytes",
rktp->rktp_offsets.fetch_offset);

} else if (msetr->msetr_aborted_cnt > 0) {
/* Noop */
if (err == RD_KAFKA_RESP_ERR__UNDERFLOW)
err = RD_KAFKA_RESP_ERR_NO_ERROR;
}

} else {
Expand All @@ -1379,21 +1400,20 @@ rd_kafka_msgset_reader_run(rd_kafka_msgset_reader_t *msetr) {
err = RD_KAFKA_RESP_ERR_NO_ERROR;
}

rd_rkb_dbg(
msetr->msetr_rkb, MSG | RD_KAFKA_DBG_FETCH, "CONSUME",
"Enqueue %i %smessage(s) (%" PRId64
" bytes, %d ops) on "
"%s [%" PRId32
"] "
"fetch queue (qlen %d, v%d, last_offset %" PRId64
", %d ctrl msgs, %s)",
msetr->msetr_msgcnt, msetr->msetr_srcname, msetr->msetr_msg_bytes,
rd_kafka_q_len(&msetr->msetr_rkq), rktp->rktp_rkt->rkt_topic->str,
rktp->rktp_partition, rd_kafka_q_len(msetr->msetr_par_rkq),
msetr->msetr_tver->version, last_offset, msetr->msetr_ctrl_cnt,
msetr->msetr_compression
? rd_kafka_compression2str(msetr->msetr_compression)
: "uncompressed");
rd_rkb_dbg(msetr->msetr_rkb, MSG | RD_KAFKA_DBG_FETCH, "CONSUME",
"Enqueue %i %smessage(s) (%" PRId64
" bytes, %d ops) on %s [%" PRId32
"] fetch queue (qlen %d, v%d, last_offset %" PRId64
", %d ctrl msgs, %d aborted msgsets, %s)",
msetr->msetr_msgcnt, msetr->msetr_srcname,
msetr->msetr_msg_bytes, rd_kafka_q_len(&msetr->msetr_rkq),
rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
rd_kafka_q_len(msetr->msetr_par_rkq),
msetr->msetr_tver->version, last_offset,
msetr->msetr_ctrl_cnt, msetr->msetr_aborted_cnt,
msetr->msetr_compression
? rd_kafka_compression2str(msetr->msetr_compression)
: "uncompressed");

/* Concat all messages&errors onto the parent's queue
* (the partition's fetch queue) */
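Because the interleaved old/new lines above can be hard to follow, here is a
consolidated sketch of the new decision logic (simplified, standalone C; the
helper name and parameters are illustrative, not the library's internals):

```c
#include <librdkafka/rdkafka.h>

/* Illustrative helper: what to do when a fetch response yielded no
 * consumable messages. Mirrors the branch order of the new code. */
static rd_kafka_resp_err_t handle_empty_fetch(int ctrl_cnt,
                                              int aborted_cnt,
                                              int *fetch_msg_max_bytes,
                                              rd_kafka_resp_err_t err) {
        if (ctrl_cnt > 0) {
                /* Control messages explain the empty response; clear
                 * underflows so no fetch backoff is applied. */
                if (err == RD_KAFKA_RESP_ERR__UNDERFLOW)
                        err = RD_KAFKA_RESP_ERR_NO_ERROR;

        } else if (*fetch_msg_max_bytes < (1 << 30)) {
                /* The fetch size limit may have been too tight: double
                 * it (capped at 1 GiB) and clear underflows. */
                *fetch_msg_max_bytes *= 2;
                if (err == RD_KAFKA_RESP_ERR__UNDERFLOW)
                        err = RD_KAFKA_RESP_ERR_NO_ERROR;

        } else if (!err && aborted_cnt == 0) {
                /* Nothing explains the empty response: the message may
                 * really exceed receive.message.max.bytes. */
                err = RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE;

        } else if (aborted_cnt > 0) {
                /* Aborted MessageSets explain the empty response (#2993):
                 * no error, and no backoff on underflow. */
                if (err == RD_KAFKA_RESP_ERR__UNDERFLOW)
                        err = RD_KAFKA_RESP_ERR_NO_ERROR;
        }
        return err;
}
```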
79 changes: 79 additions & 0 deletions tests/0129-fetch_aborted_msgs.c
@@ -0,0 +1,79 @@
/*
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2012-2021, Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/

#include "test.h"


/**
* @brief Verify that a FetchResponse containing only aborted messages does not
* raise a ERR_MSG_SIZE_TOO_LARGE error. #2993.
*
* 1. Create topic with a small message.max.bytes to make sure that
* there's at least one full fetch response without any control messages,
* just aborted messages.
* 2. Transactionally produce 100x the message.max.bytes.
* 3. Abort the transaction.
* 4. Consume from start, verify that no error is received, wait for EOF.
*
*/
int main_0129_fetch_aborted_msgs(int argc, char **argv) {
rd_kafka_t *rk;
rd_kafka_conf_t *conf;
const char *topic = test_mk_topic_name("0129_fetch_aborted_msgs", 1);
const int msgcnt = 1000;
const size_t msgsize = 1000;

test_conf_init(&conf, NULL, 30);

test_conf_set(conf, "linger.ms", "10000");
test_conf_set(conf, "transactional.id", topic);
test_conf_set(conf, "message.max.bytes", "10000");
rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
rk = test_create_handle(RD_KAFKA_PRODUCER, conf);

test_admin_create_topic(rk, topic, 1, 1,
(const char *[]){
"max.message.bytes", "10000",
"segment.bytes", "20000",
NULL });

TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

/* Produce the messages without waiting for delivery. */
test_produce_msgs2(rk, topic, 0, 0, 0, msgcnt, NULL, msgsize);

TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));

rd_kafka_destroy(rk);

/* Consume from start: the transaction was aborted, so expect
 * no messages and no error, just the EOF. */
test_consume_msgs_easy(topic, topic, 0, 1, 0, NULL);

return 0;
}
1 change: 1 addition & 0 deletions tests/CMakeLists.txt
@@ -119,6 +119,7 @@ set(
0125-immediate_flush.c
0126-oauthbearer_oidc.c
0128-sasl_callback_queue.cpp
0129-fetch_aborted_msgs.c
8000-idle.cpp
test.c
testcpp.cpp
24 changes: 19 additions & 5 deletions tests/test.c
@@ -235,6 +235,7 @@ _TEST_DECL(0124_openssl_invalid_engine);
_TEST_DECL(0125_immediate_flush);
_TEST_DECL(0126_oauthbearer_oidc);
_TEST_DECL(0128_sasl_callback_queue);
_TEST_DECL(0129_fetch_aborted_msgs);

/* Manual tests */
_TEST_DECL(8000_idle);
@@ -471,6 +472,7 @@ struct test tests[] = {
_TEST(0125_immediate_flush, 0),
_TEST(0126_oauthbearer_oidc, 0, TEST_BRKVER(3, 0, 0, 0)),
_TEST(0128_sasl_callback_queue, TEST_F_LOCAL, TEST_BRKVER(2, 0, 0, 0)),
_TEST(0129_fetch_aborted_msgs, 0, TEST_BRKVER(0, 11, 0, 0)),

/* Manual tests */
_TEST(8000_idle, TEST_F_MANUAL),
@@ -4542,11 +4544,15 @@ void test_kafka_topics(const char *fmt, ...) {

/**
* @brief Create topic using Topic Admin API
*
* @param configs is an optional NULL-terminated array of
*        topic config key/value string pairs (or NULL).
*/
static void test_admin_create_topic(rd_kafka_t *use_rk,
const char *topicname,
int partition_cnt,
int replication_factor) {
void test_admin_create_topic(rd_kafka_t *use_rk,
const char *topicname,
int partition_cnt,
int replication_factor,
const char **configs) {
rd_kafka_t *rk;
rd_kafka_NewTopic_t *newt[1];
const size_t newt_cnt = 1;
@@ -4571,6 +4577,14 @@ static void test_admin_create_topic(rd_kafka_t *use_rk,
errstr, sizeof(errstr));
TEST_ASSERT(newt[0] != NULL, "%s", errstr);

if (configs) {
int i;

for (i = 0; configs[i] && configs[i + 1]; i += 2)
TEST_CALL_ERR__(rd_kafka_NewTopic_set_config(
newt[0], configs[i], configs[i + 1]));
}

options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_CREATETOPICS);
err = rd_kafka_AdminOptions_set_operation_timeout(
options, timeout_ms, errstr, sizeof(errstr));
@@ -4651,7 +4665,7 @@ void test_create_topic(rd_kafka_t *use_rk,
replication_factor);
else
test_admin_create_topic(use_rk, topicname, partition_cnt,
replication_factor);
replication_factor, NULL);
}


5 changes: 5 additions & 0 deletions tests/test.h
@@ -682,6 +682,11 @@ int test_partition_list_cmp(rd_kafka_topic_partition_list_t *al,
rd_kafka_topic_partition_list_t *bl);

void test_kafka_topics(const char *fmt, ...);
void test_admin_create_topic(rd_kafka_t *use_rk,
const char *topicname,
int partition_cnt,
int replication_factor,
const char **configs);
void test_create_topic(rd_kafka_t *use_rk,
const char *topicname,
int partition_cnt,
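As a usage note, the extended helper takes topic configs as a NULL-terminated
key/value string array; a minimal call, mirroring the one in
0129-fetch_aborted_msgs.c above, looks like this:

```c
/* Create a single-partition topic with extra topic-level configs.
 * The array alternates config name and value, ending with NULL. */
test_admin_create_topic(rk, topic, 1 /*partitions*/, 1 /*replicas*/,
                        (const char *[]){ "max.message.bytes", "10000",
                                          "segment.bytes", "20000",
                                          NULL });
```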
1 change: 1 addition & 0 deletions win32/tests/tests.vcxproj
@@ -209,6 +209,7 @@
<ClCompile Include="..\..\tests\0125-immediate_flush.c" />
<ClCompile Include="..\..\tests\0126-oauthbearer_oidc.c" />
<ClCompile Include="..\..\tests\0128-sasl_callback_queue.cpp" />
<ClCompile Include="..\..\tests\0129-fetch_aborted_msgs.c" />
<ClCompile Include="..\..\tests\8000-idle.cpp" />
<ClCompile Include="..\..\tests\test.c" />
<ClCompile Include="..\..\tests\testcpp.cpp" />