From eff237d89275273f3fd8470506c0c1d6b2da8793 Mon Sep 17 00:00:00 2001
From: Magnus Edenhill
Date: Tue, 30 Nov 2021 17:33:21 +0100
Subject: [PATCH] MsgSets with just aborted msgs raised a MSG_SIZE error, and fix backoff (#2993)

This also removes fetch backoffs on underflows (truncated responses).
---
 CHANGELOG.md                    | 14 ++++++
 src/rdkafka_msgset_reader.c     | 58 ++++++++++++++++--------
 tests/0129-fetch_aborted_msgs.c | 79 +++++++++++++++++++++++++++++++++
 tests/CMakeLists.txt            |  1 +
 tests/test.c                    | 24 +++++++---
 tests/test.h                    |  5 +++
 win32/tests/tests.vcxproj       |  1 +
 7 files changed, 158 insertions(+), 24 deletions(-)
 create mode 100644 tests/0129-fetch_aborted_msgs.c

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3d61b391ea..1d46b4b86f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -30,6 +30,20 @@ librdkafka v1.9.0 is a feature release:
    keystore file was read. #3554.
 
 
+### Consumer fixes
+
+ * An `ERR_MSG_SIZE_TOO_LARGE` consumer error would previously be raised
+   if the consumer received a maximum-sized FetchResponse containing only
+   aborted (transactional) messages and no control messages. Fetching did
+   not stop, but some applications would terminate upon receiving this error.
+   No error is now raised in this case. (#2993)
+   Thanks to @jacobmikesell for providing an application to reproduce the
+   issue.
+ * The consumer no longer backs off the next fetch request (default 500ms) when
+   the parsed fetch response is truncated (which is a valid case).
+   This should speed up the message fetch rate in case of maximum-sized
+   fetch responses.
+
 
 # librdkafka v1.8.2
 
diff --git a/src/rdkafka_msgset_reader.c b/src/rdkafka_msgset_reader.c
index fdbd114104..28a199744f 100644
--- a/src/rdkafka_msgset_reader.c
+++ b/src/rdkafka_msgset_reader.c
@@ -194,6 +194,9 @@ typedef struct rd_kafka_msgset_reader_s {
         int msetr_ctrl_cnt; /**< Number of control messages
                              *   or MessageSets received. */
 
+        int msetr_aborted_cnt; /**< Number of aborted MessageSets
+                                *   encountered. */
+
         const char *msetr_srcname; /**< Optional message source string,
                                     *   used in debug logging to
                                     *   indicate messages were
@@ -984,6 +987,7 @@ rd_kafka_msgset_reader_msgs_v2(rd_kafka_msgset_reader_t *msetr) {
                             msetr->msetr_rkbuf,
                             rd_slice_remains(
                                 &msetr->msetr_rkbuf->rkbuf_reader));
+                        msetr->msetr_aborted_cnt++;
                         return RD_KAFKA_RESP_ERR_NO_ERROR;
                 }
         }
@@ -1341,9 +1345,18 @@ rd_kafka_msgset_reader_run(rd_kafka_msgset_reader_t *msetr) {
                  * This means the size limit perhaps was too tight,
                  * increase it automatically.
                  * If there was at least one control message there
-                 * is probably not a size limit and nothing is done. */
+                 * is probably not a size limit and nothing is done.
+                 * If there were aborted messagesets and no underflow then
+                 * there is no error either (#2993).
+                 *
+                 * Also; avoid propagating underflow errors, which cause
+                 * backoffs, since we'll want to continue fetching the
+                 * remaining truncated messages as soon as possible.
+                 */
                 if (msetr->msetr_ctrl_cnt > 0) {
                         /* Noop */
+                        if (err == RD_KAFKA_RESP_ERR__UNDERFLOW)
+                                err = RD_KAFKA_RESP_ERR_NO_ERROR;
 
                 } else if (rktp->rktp_fetch_msg_max_bytes < (1 << 30)) {
                         rktp->rktp_fetch_msg_max_bytes *= 2;
@@ -1354,17 +1367,25 @@ rd_kafka_msgset_reader_run(rd_kafka_msgset_reader_t *msetr) {
                                    rktp->rktp_rkt->rkt_topic->str,
                                    rktp->rktp_partition,
                                    rktp->rktp_fetch_msg_max_bytes);
-                } else if (!err) {
+
+                        if (err == RD_KAFKA_RESP_ERR__UNDERFLOW)
+                                err = RD_KAFKA_RESP_ERR_NO_ERROR;
+
+                } else if (!err && msetr->msetr_aborted_cnt == 0) {
                         rd_kafka_consumer_err(
                             &msetr->msetr_rkq, msetr->msetr_broker_id,
                             RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE,
                             msetr->msetr_tver->version, NULL, rktp,
                             rktp->rktp_offsets.fetch_offset,
                             "Message at offset %" PRId64
-                            " "
-                            "might be too large to fetch, try increasing "
+                            " might be too large to fetch, try increasing "
                             "receive.message.max.bytes",
                             rktp->rktp_offsets.fetch_offset);
+
+                } else if (msetr->msetr_aborted_cnt > 0) {
+                        /* Noop */
+                        if (err == RD_KAFKA_RESP_ERR__UNDERFLOW)
+                                err = RD_KAFKA_RESP_ERR_NO_ERROR;
                 }
 
         } else {
@@ -1379,21 +1400,20 @@ rd_kafka_msgset_reader_run(rd_kafka_msgset_reader_t *msetr) {
                 err = RD_KAFKA_RESP_ERR_NO_ERROR;
         }
 
-        rd_rkb_dbg(
-            msetr->msetr_rkb, MSG | RD_KAFKA_DBG_FETCH, "CONSUME",
-            "Enqueue %i %smessage(s) (%" PRId64
-            " bytes, %d ops) on "
-            "%s [%" PRId32
-            "] "
-            "fetch queue (qlen %d, v%d, last_offset %" PRId64
-            ", %d ctrl msgs, %s)",
-            msetr->msetr_msgcnt, msetr->msetr_srcname, msetr->msetr_msg_bytes,
-            rd_kafka_q_len(&msetr->msetr_rkq), rktp->rktp_rkt->rkt_topic->str,
-            rktp->rktp_partition, rd_kafka_q_len(msetr->msetr_par_rkq),
-            msetr->msetr_tver->version, last_offset, msetr->msetr_ctrl_cnt,
-            msetr->msetr_compression
-                ? rd_kafka_compression2str(msetr->msetr_compression)
-                : "uncompressed");
+        rd_rkb_dbg(msetr->msetr_rkb, MSG | RD_KAFKA_DBG_FETCH, "CONSUME",
+                   "Enqueue %i %smessage(s) (%" PRId64
+                   " bytes, %d ops) on %s [%" PRId32
+                   "] fetch queue (qlen %d, v%d, last_offset %" PRId64
+                   ", %d ctrl msgs, %d aborted msgsets, %s)",
+                   msetr->msetr_msgcnt, msetr->msetr_srcname,
+                   msetr->msetr_msg_bytes, rd_kafka_q_len(&msetr->msetr_rkq),
+                   rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
+                   rd_kafka_q_len(msetr->msetr_par_rkq),
+                   msetr->msetr_tver->version, last_offset,
+                   msetr->msetr_ctrl_cnt, msetr->msetr_aborted_cnt,
+                   msetr->msetr_compression
+                       ? rd_kafka_compression2str(msetr->msetr_compression)
+                       : "uncompressed");
 
         /* Concat all messages&errors onto the parent's queue
          * (the partition's fetch queue) */
diff --git a/tests/0129-fetch_aborted_msgs.c b/tests/0129-fetch_aborted_msgs.c
new file mode 100644
index 0000000000..eef49d8879
--- /dev/null
+++ b/tests/0129-fetch_aborted_msgs.c
@@ -0,0 +1,79 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2012-2021, Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ *    this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ *    this list of conditions and the following disclaimer in the documentation
+ *    and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "test.h"
+
+
+/**
+ * @brief Verify that a FetchResponse containing only aborted messages does not
+ *        raise an ERR_MSG_SIZE_TOO_LARGE error. #2993.
+ *
+ * 1. Create topic with a small message.max.bytes to make sure that
+ *    there's at least one full fetch response without any control messages,
+ *    just aborted messages.
+ * 2. Transactionally produce 10x the message.max.bytes.
+ * 3. Abort the transaction.
+ * 4. Consume from start, verify that no error is received, wait for EOF.
+ *
+ */
+int main_0129_fetch_aborted_msgs(int argc, char **argv) {
+        rd_kafka_t *rk;
+        rd_kafka_conf_t *conf;
+        const char *topic = test_mk_topic_name("0129_fetch_aborted_msgs", 1);
+        const int msgcnt = 1000;
+        const size_t msgsize = 1000;
+
+        test_conf_init(&conf, NULL, 30);
+
+        test_conf_set(conf, "linger.ms", "10000");
+        test_conf_set(conf, "transactional.id", topic);
+        test_conf_set(conf, "message.max.bytes", "10000");
+        rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
+        rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
+
+        test_admin_create_topic(rk, topic, 1, 1,
+                                (const char *[]){"max.message.bytes", "10000",
+                                                 "segment.bytes", "20000",
+                                                 NULL});
+
+        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));
+        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
+
+        /* Produce the messages without waiting for delivery. */
+        test_produce_msgs2(rk, topic, 0, 0, 0, msgcnt, NULL, msgsize);
+
+        TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
+
+        rd_kafka_destroy(rk);
+
+        /* Consume from start: all messages were aborted, so expect only EOF. */
+        test_consume_msgs_easy(topic, topic, 0, 1, 0, NULL);
+
+        return 0;
+}
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 34422b9375..7d714156cd 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -119,6 +119,7 @@ set(
   0125-immediate_flush.c
   0126-oauthbearer_oidc.c
   0128-sasl_callback_queue.cpp
+  0129-fetch_aborted_msgs.c
   8000-idle.cpp
   test.c
   testcpp.cpp
diff --git a/tests/test.c b/tests/test.c
index d2afb37a01..399a967372 100644
--- a/tests/test.c
+++ b/tests/test.c
@@ -235,6 +235,7 @@ _TEST_DECL(0124_openssl_invalid_engine);
 _TEST_DECL(0125_immediate_flush);
 _TEST_DECL(0126_oauthbearer_oidc);
 _TEST_DECL(0128_sasl_callback_queue);
+_TEST_DECL(0129_fetch_aborted_msgs);
 
 /* Manual tests */
 _TEST_DECL(8000_idle);
@@ -471,6 +472,7 @@ struct test tests[] = {
     _TEST(0125_immediate_flush, 0),
     _TEST(0126_oauthbearer_oidc, 0, TEST_BRKVER(3, 0, 0, 0)),
     _TEST(0128_sasl_callback_queue, TEST_F_LOCAL, TEST_BRKVER(2, 0, 0, 0)),
+    _TEST(0129_fetch_aborted_msgs, 0, TEST_BRKVER(0, 11, 0, 0)),
 
     /* Manual tests */
     _TEST(8000_idle, TEST_F_MANUAL),
@@ -4542,11 +4544,15 @@ void test_kafka_topics(const char *fmt, ...) {
 
 /**
  * @brief Create topic using Topic Admin API
+ *
+ * @param configs is an optional NULL-terminated key-value tuple array of
+ *        topic configs (or NULL).
  */
-static void test_admin_create_topic(rd_kafka_t *use_rk,
-                                    const char *topicname,
-                                    int partition_cnt,
-                                    int replication_factor) {
+void test_admin_create_topic(rd_kafka_t *use_rk,
+                             const char *topicname,
+                             int partition_cnt,
+                             int replication_factor,
+                             const char **configs) {
         rd_kafka_t *rk;
         rd_kafka_NewTopic_t *newt[1];
         const size_t newt_cnt = 1;
@@ -4571,6 +4577,14 @@ static void test_admin_create_topic(rd_kafka_t *use_rk,
                                         errstr, sizeof(errstr));
         TEST_ASSERT(newt[0] != NULL, "%s", errstr);
 
+        if (configs) {
+                int i;
+
+                for (i = 0; configs[i] && configs[i + 1]; i += 2)
+                        TEST_CALL_ERR__(rd_kafka_NewTopic_set_config(
+                            newt[0], configs[i], configs[i + 1]));
+        }
+
         options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_CREATETOPICS);
         err = rd_kafka_AdminOptions_set_operation_timeout(
             options, timeout_ms, errstr, sizeof(errstr));
@@ -4651,7 +4665,7 @@ void test_create_topic(rd_kafka_t *use_rk,
                                  replication_factor);
         else
                 test_admin_create_topic(use_rk, topicname, partition_cnt,
-                                        replication_factor);
+                                        replication_factor, NULL);
 }
 
 
diff --git a/tests/test.h b/tests/test.h
index fe170d55af..bbfd7a49e5 100644
--- a/tests/test.h
+++ b/tests/test.h
@@ -682,6 +682,11 @@ int test_partition_list_cmp(rd_kafka_topic_partition_list_t *al,
                             rd_kafka_topic_partition_list_t *bl);
 
 void test_kafka_topics(const char *fmt, ...);
+void test_admin_create_topic(rd_kafka_t *use_rk,
+                             const char *topicname,
+                             int partition_cnt,
+                             int replication_factor,
+                             const char **configs);
 void test_create_topic(rd_kafka_t *use_rk,
                        const char *topicname,
                        int partition_cnt,
diff --git a/win32/tests/tests.vcxproj b/win32/tests/tests.vcxproj
index 6fe10900e4..0c27ec0690 100644
--- a/win32/tests/tests.vcxproj
+++ b/win32/tests/tests.vcxproj
@@ -209,6 +209,7 @@
+
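
Not part of the patch itself, but as context for the CHANGELOG entry above: a minimal, hypothetical sketch of the kind of consumer poll loop the fix is about. It assumes a consumer handle `rk` that was created and subscribed elsewhere and uses only the public librdkafka consumer API. The point is that consumer errors surfaced by `rd_kafka_consumer_poll()` (such as the spurious `MSG_SIZE_TOO_LARGE` described above) are informational and fetching continues, so an application does not need to terminate on them.

```c
#include <stdio.h>
#include <librdkafka/rdkafka.h>

/* Hypothetical poll loop: `rk` is assumed to be an already created and
 * subscribed consumer; `run` is cleared by the application to stop. */
static void consume_loop(rd_kafka_t *rk, volatile int *run) {
        while (*run) {
                rd_kafka_message_t *rkm = rd_kafka_consumer_poll(rk, 500);

                if (!rkm)
                        continue; /* Poll timeout: no message or event. */

                if (rkm->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
                        /* Reached end of partition: informational only. */
                } else if (rkm->err) {
                        /* Consumer errors (e.g. MSG_SIZE_TOO_LARGE prior to
                         * this fix) are generally informational; fetching
                         * continues, so log instead of exiting. */
                        fprintf(stderr, "Consumer error: %s: %s\n",
                                rd_kafka_err2name(rkm->err),
                                rd_kafka_message_errstr(rkm));
                } else {
                        /* Proper message: process rkm->payload / rkm->len. */
                }

                rd_kafka_message_destroy(rkm);
        }
}
```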