diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3d61b391ea..1d46b4b86f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -30,6 +30,20 @@ librdkafka v1.9.0 is a feature release:
keystore file was read. #3554.
+### Consumer fixes
+
+ * An `ERR_MSG_SIZE_TOO_LARGE` consumer error would previously be raised
+   if the consumer received a maximum-sized FetchResponse containing only
+   (transaction) aborted messages and no control messages. Fetching did
+   not stop, but some applications would terminate upon receiving this
+   error. No error is now raised in this case. (#2993)
+   Thanks to @jacobmikesell for providing an application to reproduce the
+   issue.
+ * The consumer no longer backs off the next fetch request (default 500ms,
+   `fetch.error.backoff.ms`) when the parsed fetch response is truncated,
+   which is a valid case. This should speed up the message fetch rate for
+   maximum-sized fetch responses.
+
# librdkafka v1.8.2
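
For context, the first fix matters mostly to applications whose poll loop treats any consumer error as fatal. Below is a minimal sketch of such a loop; it is not part of this diff, and `poll_loop`, `run` and the exit-on-error policy are illustrative assumptions:

```c
#include <stdio.h>
#include <librdkafka/rdkafka.h>

/* Sketch of an affected poll loop: before this fix, a fetch containing
 * only aborted transactional messages could surface
 * RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE here, and an application that
 * exits on any error would terminate even though fetching had not
 * actually stalled. */
static void poll_loop(rd_kafka_t *rk, volatile int *run) {
        while (*run) {
                rd_kafka_message_t *rkm = rd_kafka_consumer_poll(rk, 100);
                if (!rkm)
                        continue; /* No message within the timeout */

                if (rkm->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
                        /* Informational: reached end of partition. */
                } else if (rkm->err) {
                        fprintf(stderr, "Consume error: %s\n",
                                rd_kafka_message_errstr(rkm));
                        *run = 0; /* The fatal-on-error policy in question */
                } else {
                        /* Process rkm->payload (rkm->len bytes). */
                }
                rd_kafka_message_destroy(rkm);
        }
}
```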
diff --git a/src/rdkafka_msgset_reader.c b/src/rdkafka_msgset_reader.c
index fdbd114104..28a199744f 100644
--- a/src/rdkafka_msgset_reader.c
+++ b/src/rdkafka_msgset_reader.c
@@ -194,6 +194,9 @@ typedef struct rd_kafka_msgset_reader_s {
int msetr_ctrl_cnt; /**< Number of control messages
* or MessageSets received. */
+ int msetr_aborted_cnt; /**< Number of aborted MessageSets
+ * encountered. */
+
const char *msetr_srcname; /**< Optional message source string,
* used in debug logging to
* indicate messages were
@@ -984,6 +987,7 @@ rd_kafka_msgset_reader_msgs_v2(rd_kafka_msgset_reader_t *msetr) {
msetr->msetr_rkbuf,
rd_slice_remains(
&msetr->msetr_rkbuf->rkbuf_reader));
+ msetr->msetr_aborted_cnt++;
return RD_KAFKA_RESP_ERR_NO_ERROR;
}
}
@@ -1341,9 +1345,18 @@ rd_kafka_msgset_reader_run(rd_kafka_msgset_reader_t *msetr) {
* This means the size limit perhaps was too tight,
* increase it automatically.
* If there was at least one control message there
- * is probably not a size limit and nothing is done. */
+ * is probably not a size limit and nothing is done.
+ * If there were aborted MessageSets and no underflow then
+ * there is no error either (#2993).
+ *
+ * Also, avoid propagating underflow errors, which cause
+ * backoffs, since we'll want to continue fetching the
+ * remaining truncated messages as soon as possible.
+ */
if (msetr->msetr_ctrl_cnt > 0) {
/* Noop */
+ if (err == RD_KAFKA_RESP_ERR__UNDERFLOW)
+ err = RD_KAFKA_RESP_ERR_NO_ERROR;
} else if (rktp->rktp_fetch_msg_max_bytes < (1 << 30)) {
rktp->rktp_fetch_msg_max_bytes *= 2;
@@ -1354,17 +1367,25 @@ rd_kafka_msgset_reader_run(rd_kafka_msgset_reader_t *msetr) {
rktp->rktp_rkt->rkt_topic->str,
rktp->rktp_partition,
rktp->rktp_fetch_msg_max_bytes);
- } else if (!err) {
+
+ if (err == RD_KAFKA_RESP_ERR__UNDERFLOW)
+ err = RD_KAFKA_RESP_ERR_NO_ERROR;
+
+ } else if (!err && msetr->msetr_aborted_cnt == 0) {
rd_kafka_consumer_err(
&msetr->msetr_rkq, msetr->msetr_broker_id,
RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE,
msetr->msetr_tver->version, NULL, rktp,
rktp->rktp_offsets.fetch_offset,
"Message at offset %" PRId64
- " "
- "might be too large to fetch, try increasing "
+ " might be too large to fetch, try increasing "
"receive.message.max.bytes",
rktp->rktp_offsets.fetch_offset);
+
+ } else if (msetr->msetr_aborted_cnt > 0) {
+ /* Noop */
+ if (err == RD_KAFKA_RESP_ERR__UNDERFLOW)
+ err = RD_KAFKA_RESP_ERR_NO_ERROR;
}
} else {
@@ -1379,21 +1400,20 @@ rd_kafka_msgset_reader_run(rd_kafka_msgset_reader_t *msetr) {
err = RD_KAFKA_RESP_ERR_NO_ERROR;
}
- rd_rkb_dbg(
- msetr->msetr_rkb, MSG | RD_KAFKA_DBG_FETCH, "CONSUME",
- "Enqueue %i %smessage(s) (%" PRId64
- " bytes, %d ops) on "
- "%s [%" PRId32
- "] "
- "fetch queue (qlen %d, v%d, last_offset %" PRId64
- ", %d ctrl msgs, %s)",
- msetr->msetr_msgcnt, msetr->msetr_srcname, msetr->msetr_msg_bytes,
- rd_kafka_q_len(&msetr->msetr_rkq), rktp->rktp_rkt->rkt_topic->str,
- rktp->rktp_partition, rd_kafka_q_len(msetr->msetr_par_rkq),
- msetr->msetr_tver->version, last_offset, msetr->msetr_ctrl_cnt,
- msetr->msetr_compression
- ? rd_kafka_compression2str(msetr->msetr_compression)
- : "uncompressed");
+ rd_rkb_dbg(msetr->msetr_rkb, MSG | RD_KAFKA_DBG_FETCH, "CONSUME",
+ "Enqueue %i %smessage(s) (%" PRId64
+ " bytes, %d ops) on %s [%" PRId32
+ "] fetch queue (qlen %d, v%d, last_offset %" PRId64
+ ", %d ctrl msgs, %d aborted msgsets, %s)",
+ msetr->msetr_msgcnt, msetr->msetr_srcname,
+ msetr->msetr_msg_bytes, rd_kafka_q_len(&msetr->msetr_rkq),
+ rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
+ rd_kafka_q_len(msetr->msetr_par_rkq),
+ msetr->msetr_tver->version, last_offset,
+ msetr->msetr_ctrl_cnt, msetr->msetr_aborted_cnt,
+ msetr->msetr_compression
+ ? rd_kafka_compression2str(msetr->msetr_compression)
+ : "uncompressed");
/* Concat all messages&errors onto the parent's queue
* (the partition's fetch queue) */
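
The reader-side change can be observed through the extended "CONSUME: Enqueue" debug line above, which now reports the aborted-MessageSet count. A hedged sketch of enabling the relevant debug contexts on a consumer configuration (the helper name is illustrative):

```c
#include <stdio.h>
#include <librdkafka/rdkafka.h>

/* Enable the "msg" and "fetch" debug contexts so the extended
 * "CONSUME: Enqueue ... %d aborted msgsets ..." line is logged. */
static rd_kafka_conf_t *make_debug_conf(void) {
        char errstr[512];
        rd_kafka_conf_t *conf = rd_kafka_conf_new();

        if (rd_kafka_conf_set(conf, "debug", "msg,fetch", errstr,
                              sizeof(errstr)) != RD_KAFKA_CONF_OK) {
                fprintf(stderr, "debug: %s\n", errstr);
                rd_kafka_conf_destroy(conf);
                return NULL;
        }
        return conf;
}
```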
diff --git a/tests/0129-fetch_aborted_msgs.c b/tests/0129-fetch_aborted_msgs.c
new file mode 100644
index 0000000000..eef49d8879
--- /dev/null
+++ b/tests/0129-fetch_aborted_msgs.c
@@ -0,0 +1,79 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2012-2021, Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "test.h"
+
+
+/**
+ * @brief Verify that a FetchResponse containing only aborted messages does not
+ * raise a ERR_MSG_SIZE_TOO_LARGE error. #2993.
+ *
+ * 1. Create a topic with a small max.message.bytes to make sure that
+ *    there's at least one full fetch response containing only aborted
+ *    messages and no control messages.
+ * 2. Transactionally produce about 100x the max.message.bytes.
+ * 3. Abort the transaction.
+ * 4. Consume from start, verify that no error is received, wait for EOF.
+ *
+ */
+int main_0129_fetch_aborted_msgs(int argc, char **argv) {
+ rd_kafka_t *rk;
+ rd_kafka_conf_t *conf;
+ const char *topic = test_mk_topic_name("0129_fetch_aborted_msgs", 1);
+ const int msgcnt = 1000;
+ const size_t msgsize = 1000;
+
+ test_conf_init(&conf, NULL, 30);
+
+ test_conf_set(conf, "linger.ms", "10000");
+ test_conf_set(conf, "transactional.id", topic);
+ test_conf_set(conf, "message.max.bytes", "10000");
+ rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
+ rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
+
+ test_admin_create_topic(rk, topic, 1, 1,
+ (const char *[]){
+ "max.message.bytes", "10000",
+ "segment.bytes", "20000",
+ NULL });
+
+ TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));
+ TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
+
+ /* Produce the messages without waiting for delivery. */
+ test_produce_msgs2(rk, topic, 0, 0, 0, msgcnt, NULL, msgsize);
+
+ TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
+
+ rd_kafka_destroy(rk);
+
+ /* Consume from start: expect no messages (all aborted), no error, and an EOF. */
+ test_consume_msgs_easy(topic, topic, 0, 1, 0, NULL);
+
+ return 0;
+}
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 34422b9375..7d714156cd 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -119,6 +119,7 @@ set(
0125-immediate_flush.c
0126-oauthbearer_oidc.c
0128-sasl_callback_queue.cpp
+ 0129-fetch_aborted_msgs.c
8000-idle.cpp
test.c
testcpp.cpp
diff --git a/tests/test.c b/tests/test.c
index d2afb37a01..399a967372 100644
--- a/tests/test.c
+++ b/tests/test.c
@@ -235,6 +235,7 @@ _TEST_DECL(0124_openssl_invalid_engine);
_TEST_DECL(0125_immediate_flush);
_TEST_DECL(0126_oauthbearer_oidc);
_TEST_DECL(0128_sasl_callback_queue);
+_TEST_DECL(0129_fetch_aborted_msgs);
/* Manual tests */
_TEST_DECL(8000_idle);
@@ -471,6 +472,7 @@ struct test tests[] = {
_TEST(0125_immediate_flush, 0),
_TEST(0126_oauthbearer_oidc, 0, TEST_BRKVER(3, 0, 0, 0)),
_TEST(0128_sasl_callback_queue, TEST_F_LOCAL, TEST_BRKVER(2, 0, 0, 0)),
+ _TEST(0129_fetch_aborted_msgs, 0, TEST_BRKVER(0, 11, 0, 0)),
/* Manual tests */
_TEST(8000_idle, TEST_F_MANUAL),
@@ -4542,11 +4544,15 @@ void test_kafka_topics(const char *fmt, ...) {
/**
* @brief Create topic using Topic Admin API
+ *
+ * @param configs is an optional NULL-terminated key-value tuple array
+ *        of topic configs (or NULL).
*/
-static void test_admin_create_topic(rd_kafka_t *use_rk,
- const char *topicname,
- int partition_cnt,
- int replication_factor) {
+void test_admin_create_topic(rd_kafka_t *use_rk,
+ const char *topicname,
+ int partition_cnt,
+ int replication_factor,
+ const char **configs) {
rd_kafka_t *rk;
rd_kafka_NewTopic_t *newt[1];
const size_t newt_cnt = 1;
@@ -4571,6 +4577,14 @@ static void test_admin_create_topic(rd_kafka_t *use_rk,
errstr, sizeof(errstr));
TEST_ASSERT(newt[0] != NULL, "%s", errstr);
+ if (configs) {
+ int i;
+
+ for (i = 0; configs[i] && configs[i + 1]; i += 2)
+ TEST_CALL_ERR__(rd_kafka_NewTopic_set_config(
+ newt[0], configs[i], configs[i + 1]));
+ }
+
options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_CREATETOPICS);
err = rd_kafka_AdminOptions_set_operation_timeout(
options, timeout_ms, errstr, sizeof(errstr));
@@ -4651,7 +4665,7 @@ void test_create_topic(rd_kafka_t *use_rk,
replication_factor);
else
test_admin_create_topic(use_rk, topicname, partition_cnt,
- replication_factor);
+ replication_factor, NULL);
}
diff --git a/tests/test.h b/tests/test.h
index fe170d55af..bbfd7a49e5 100644
--- a/tests/test.h
+++ b/tests/test.h
@@ -682,6 +682,11 @@ int test_partition_list_cmp(rd_kafka_topic_partition_list_t *al,
rd_kafka_topic_partition_list_t *bl);
void test_kafka_topics(const char *fmt, ...);
+void test_admin_create_topic(rd_kafka_t *use_rk,
+ const char *topicname,
+ int partition_cnt,
+ int replication_factor,
+ const char **configs);
void test_create_topic(rd_kafka_t *use_rk,
const char *topicname,
int partition_cnt,
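
The `configs` argument follows the NULL-terminated key/value tuple convention parsed in `test_admin_create_topic()` above. A hedged usage sketch, with an illustrative topic name and config values, assuming `rk` is an existing client handle:

```c
/* Create a 3-partition topic (replication factor 1) with two
 * topic-level configs; the array alternates keys and values and
 * must end with NULL. */
test_admin_create_topic(rk, "mytopic", 3, 1,
                        (const char *[]){ "retention.ms", "3600000",
                                          "cleanup.policy", "delete",
                                          NULL });
```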
diff --git a/win32/tests/tests.vcxproj b/win32/tests/tests.vcxproj
index 6fe10900e4..0c27ec0690 100644
--- a/win32/tests/tests.vcxproj
+++ b/win32/tests/tests.vcxproj
@@ -209,6 +209,7 @@
+    <ClCompile Include="..\..\tests\0129-fetch_aborted_msgs.c" />