Question: sending single message with no delay #3202

Closed
davinnfld opened this issue Jan 6, 2021 · 7 comments

@davinnfld
Description

Hello,

I have a use case where I need to send a single message with minimal delay. I have tried setting the linger.ms configuration property to 0 and calling produce() followed immediately by flush(-1), but sending still takes about 800 ms. The message is only a few hundred bytes. I have not set any other configuration properties (besides bootstrap.servers).

librdkafka++ 1.5.0 on Red Hat Enterprise Linux 7

Log output with debug=protocol:

%7|1609942113.294|CONNECTED|rdkafka#producer-1| [thrd:xxxx:9092/bootstrap]: xxxx:9092/bootstrap: Connected (#1)
%7|1609942113.294|FEATURE|rdkafka#producer-1| [thrd:xxxx:9092/bootstrap]: xxxx:9092/bootstrap: Updated enabled protocol features +ApiVersion to ApiVersion
%7|1609942113.294|SEND|rdkafka#producer-1| [thrd:xxxx:9092/bootstrap]: xxxx:9092/bootstrap: Sent ApiVersionRequest (v3, 40 bytes @ 0, CorrId 1)
%7|1609942113.295|RECV|rdkafka#producer-1| [thrd:xxxx:9092/bootstrap]: xxxx:9092/bootstrap: Received ApiVersionResponse (v3, 344 bytes, CorrId 1, rtt 0.50ms)
%7|1609942113.295|SEND|rdkafka#producer-1| [thrd:xxxx:9092/bootstrap]: xxxx:9092/bootstrap: Sent MetadataRequest (v4, 26 bytes @ 0, CorrId 2)
%7|1609942113.295|RECV|rdkafka#producer-1| [thrd:xxxx:9092/bootstrap]: xxxx:9092/bootstrap: Received MetadataResponse (v4, 109 bytes, CorrId 2, rtt 0.23ms)
2021-01-06 08:08:33.453 calling produce()
2021-01-06 08:08:33.453 calling flush()
%7|1609942114.293|SEND|rdkafka#producer-1| [thrd:xxxx:9092/bootstrap]: xxxx:9092/55: Sent MetadataRequest (v4, 53 bytes @ 0, CorrId 3)
%7|1609942114.293|RECV|rdkafka#producer-1| [thrd:xxxx:9092/bootstrap]: xxxx:9092/55: Received MetadataResponse (v4, 169 bytes, CorrId 3, rtt 0.35ms)
%7|1609942114.294|SEND|rdkafka#producer-1| [thrd:xxxx:9092/bootstrap]: xxxx:9092/55: Sent ProduceRequest (v7, 12392 bytes @ 0, CorrId 4)
%7|1609942114.294|RECV|rdkafka#producer-1| [thrd:xxxx:9092/bootstrap]: xxxx:9092/55: Received ProduceResponse (v7, 69 bytes, CorrId 4, rtt 0.67ms)
%7|1609942116.300|DESTROY|rdkafka#producer-1| [thrd:app]: Terminating instance (destroy flags none (0x0))
%7|1609942116.300|DESTROY|rdkafka#producer-1| [thrd:main]: Destroy internal
%7|1609942116.300|DESTROY|rdkafka#producer-1| [thrd:main]: Removing all topics

Many Thanks
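
The roughly 800 ms figure can be checked against the protocol log above. The following is a minimal Python sketch, assuming the application timestamps and the librdkafka epoch timestamps come from the same clock, so that `08:08:33.453` corresponds to epoch `1609942113.453`. The helper name `parse_rdkafka_line` is hypothetical; the sample lines are copied from the log.

```python
from decimal import Decimal


def parse_rdkafka_line(line):
    """Split a '%7|<epoch>|<TAG>|<client>| <message>' debug line.

    Returns (epoch_seconds, tag, message). Decimal keeps the millisecond
    timestamps exact, which float subtraction would not.
    """
    _level, ts, tag, _client, message = line.split("|", 4)
    return Decimal(ts), tag, message.strip()


log = [
    "%7|1609942113.295|RECV|rdkafka#producer-1| [thrd:xxxx:9092/bootstrap]: xxxx:9092/bootstrap: Received MetadataResponse (v4, 109 bytes, CorrId 2, rtt 0.23ms)",
    "%7|1609942114.293|SEND|rdkafka#producer-1| [thrd:xxxx:9092/bootstrap]: xxxx:9092/55: Sent MetadataRequest (v4, 53 bytes @ 0, CorrId 3)",
    "%7|1609942114.294|SEND|rdkafka#producer-1| [thrd:xxxx:9092/bootstrap]: xxxx:9092/55: Sent ProduceRequest (v7, 12392 bytes @ 0, CorrId 4)",
]

# Assumption: "calling produce()" at 08:08:33.453 local time is this epoch value.
produce_called = Decimal("1609942113.453")

events = [parse_rdkafka_line(line) for line in log]
metadata_sent = next(ts for ts, _tag, msg in events if "Sent MetadataRequest" in msg)
produce_sent = next(ts for ts, _tag, msg in events if "Sent ProduceRequest" in msg)

# Time from the application's produce() call to each request going out.
wait_for_metadata_ms = (metadata_sent - produce_called) * 1000
delay_ms = (produce_sent - produce_called) * 1000

print(f"produce() -> topic MetadataRequest: {wait_for_metadata_ms} ms")
print(f"produce() -> ProduceRequest:        {delay_ms} ms")
```

Under that assumption, almost all of the delay elapses before the second MetadataRequest (CorrId 3) is sent; the ProduceRequest follows about 1 ms later.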

@gridaphobe
Contributor

Can you try reproducing against master or v1.6.0-RC1? There was a recent patch that might be related.

@davinnfld
Author

Hi,

I tried v1.6.0-RC1 but am still experiencing the same problem. However, if I send a small message before this message, then this message gets sent immediately. That is the workaround I have in place for now.

Attached is the log output from v1.6.0-RC1 with a small message sent first, and then the log output without a small message sent first.

Thanks

With a small message sent first:

%7|1610113305.210|INIT|rdkafka#producer-1| [thrd:app]: librdkafka v1.6.0 (0x10600ff) rdkafka#producer-1 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,sasl_oauthbearer, CC CXX PKGCONFIG INSTALL GNULD LIBDL PLUGINS ZLIB SSL SASL_CYRUS HDRHISTOGRAM SYSLOG SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER CRC32C_HW, debug 0x80)
2021-01-08 07:41:45.210270 wrote msg: bytes: 2
2021-01-08 07:41:45.210287 flushing
%7|1610113305.211|CONNECTED|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Connected (#1)
%7|1610113305.211|FEATURE|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Updated enabled protocol features +ApiVersion to ApiVersion
%7|1610113305.211|SEND|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Sent ApiVersionRequest (v3, 40 bytes @ 0, CorrId 1)
%7|1610113305.211|RECV|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Received ApiVersionResponse (v3, 344 bytes, CorrId 1, rtt 0.51ms)
%7|1610113305.211|SEND|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Sent MetadataRequest (v4, 53 bytes @ 0, CorrId 2)
%7|1610113305.212|RECV|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Received MetadataResponse (v4, 169 bytes, CorrId 2, rtt 0.29ms)
%7|1610113305.212|SEND|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/55: Sent ProduceRequest (v7, 148 bytes @ 0, CorrId 3)
%7|1610113305.212|RECV|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/55: Received ProduceResponse (v7, 69 bytes, CorrId 3, rtt 0.28ms)
2021-01-08 07:41:45.212642 % Message delivered to topic test.topic [0] at offset 401
Running with 20210101 20210108
2021-01-08 07:41:45.340987 wrote msg: bytes: 527
2021-01-08 07:41:45.341040 flushing
%7|1610113305.341|SEND|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/55: Sent ProduceRequest (v7, 700 bytes @ 0, CorrId 4)
%7|1610113305.341|RECV|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/55: Received ProduceResponse (v7, 69 bytes, CorrId 4, rtt 0.32ms)
2021-01-08 07:41:45.341794 % Message delivered to topic test.topic [0] at offset 402
Runner thread exiting
Flushing kafka queue...
%7|1610113307.342|DESTROY|rdkafka#producer-1| [thrd:app]: Terminating instance (destroy flags none (0x0))
%7|1610113307.342|DESTROY|rdkafka#producer-1| [thrd:main]: Destroy internal
%7|1610113307.342|DESTROY|rdkafka#producer-1| [thrd:main]: Removing all topics
%7|1610113307.342|BRKTERM|rdkafka#producer-1| [thrd:server2:9092/21]: server2:9092/21: terminating: broker still has 2 refcnt(s), 0 buffer(s), 0 partition(s)
%7|1610113307.342|BRKTERM|rdkafka#producer-1| [thrd:server3:9092/56]: server3:9092/56: terminating: broker still has 2 refcnt(s), 0 buffer(s), 0 partition(s)
%7|1610113307.342|BRKTERM|rdkafka#producer-1| [thrd::0/internal]: :0/internal: terminating: broker still has 2 refcnt(s), 0 buffer(s), 0 partition(s)
%7|1610113307.342|BRKTERM|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/55: terminating: broker still has 2 refcnt(s), 0 buffer(s), 0 partition(s)


Without a small message sent first:

%7|1610113064.442|INIT|rdkafka#producer-1| [thrd:app]: librdkafka v1.6.0 (0x10600ff) rdkafka#producer-1 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,sasl_oauthbearer, CC CXX PKGCONFIG INSTALL GNULD LIBDL PLUGINS ZLIB SSL SASL_CYRUS HDRHISTOGRAM SYSLOG SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER CRC32C_HW, debug 0x80)
Running with 20210101 20210108
%7|1610113064.444|CONNECTED|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Connected (#1)
%7|1610113064.444|FEATURE|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Updated enabled protocol features +ApiVersion to ApiVersion
%7|1610113064.444|SEND|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Sent ApiVersionRequest (v3, 40 bytes @ 0, CorrId 1)
%7|1610113064.444|RECV|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Received ApiVersionResponse (v3, 344 bytes, CorrId 1, rtt 0.47ms)
%7|1610113064.444|SEND|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Sent MetadataRequest (v4, 26 bytes @ 0, CorrId 2)
%7|1610113064.445|RECV|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Received MetadataResponse (v4, 109 bytes, CorrId 2, rtt 0.28ms)
2021-01-08 07:37:44.620276 wrote msg: bytes: 527
2021-01-08 07:37:44.620319 flushing
%7|1610113065.442|SEND|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/55: Sent MetadataRequest (v4, 53 bytes @ 0, CorrId 3)
%7|1610113065.442|RECV|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/55: Received MetadataResponse (v4, 169 bytes, CorrId 3, rtt 0.32ms)
%7|1610113065.443|SEND|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/55: Sent ProduceRequest (v7, 700 bytes @ 0, CorrId 4)
%7|1610113065.443|RECV|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/55: Received ProduceResponse (v7, 69 bytes, CorrId 4, rtt 0.27ms)
2021-01-08 07:37:45.443620 % Message delivered to topic test.topic [0] at offset 398
Runner thread exiting
Flushing kafka queue...
%7|1610113067.443|DESTROY|rdkafka#producer-1| [thrd:app]: Terminating instance (destroy flags none (0x0))
%7|1610113067.443|DESTROY|rdkafka#producer-1| [thrd:main]: Destroy internal
%7|1610113067.443|DESTROY|rdkafka#producer-1| [thrd:main]: Removing all topics
%7|1610113067.444|BRKTERM|rdkafka#producer-1| [thrd:server2:9092/56]: server2:9092/56: terminating: broker still has 2 refcnt(s), 0 buffer(s), 0 partition(s)
%7|1610113067.444|BRKTERM|rdkafka#producer-1| [thrd::0/internal]: :0/internal: terminating: broker still has 2 refcnt(s), 0 buffer(s), 0 partition(s)
%7|1610113067.444|BRKTERM|rdkafka#producer-1| [thrd:server3:9092/21]: server3:9092/21: terminating: broker still has 2 refcnt(s), 0 buffer(s), 0 partition(s)
%7|1610113067.444|BRKTERM|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/55: terminating: broker still has 2 refcnt(s), 0 buffer(s), 0 partition(s)
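
The difference between the two runs can be quantified from the application timestamps alone. The following is a minimal Python sketch, assuming the `flushing` and `Message delivered` lines bracket the `flush()` call for each message. The function name `flush_to_delivery_ms` is hypothetical; the timestamps are copied from the two logs above.

```python
from datetime import datetime

FMT = "%Y-%m-%d %H:%M:%S.%f"


def flush_to_delivery_ms(flush_ts, delivered_ts):
    """Milliseconds between the 'flushing' line and the delivery report line."""
    delta = datetime.strptime(delivered_ts, FMT) - datetime.strptime(flush_ts, FMT)
    return delta.total_seconds() * 1000


# The 2-byte message sent first in the workaround run.
warmup_itself = flush_to_delivery_ms(
    "2021-01-08 07:41:45.210287", "2021-01-08 07:41:45.212642"
)
# The 527-byte message, sent after the 2-byte message.
with_warmup = flush_to_delivery_ms(
    "2021-01-08 07:41:45.341040", "2021-01-08 07:41:45.341794"
)
# The 527-byte message, sent as the first message on the producer.
without_warmup = flush_to_delivery_ms(
    "2021-01-08 07:37:44.620319", "2021-01-08 07:37:45.443620"
)

print(f"2-byte message (first in run):    {warmup_itself:.3f} ms")
print(f"527-byte message, after warm-up:  {with_warmup:.3f} ms")
print(f"527-byte message, no warm-up:     {without_warmup:.3f} ms")
```

The 527-byte message takes under 1 ms when it follows the small message and about 823 ms when it is sent first. In the slow run, the topic MetadataRequest (CorrId 3) is logged at 1610113065.442, exactly 1.000 s after the INIT line at 1610113064.442. That is consistent with waiting on a periodic timer, though the logs alone do not establish the cause.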

@edenhill
Contributor

edenhill commented Jan 8, 2021

Can you repro this with debug=all please?

@davinnfld
Author

Sure,

%7|1610118981.203|WAKEUPFD|rdkafka#producer-1| [thrd:app]: server1:9092/bootstrap: Enabled low-latency ops queue wake-ups
%7|1610118981.203|BRKMAIN|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Enter main broker thread
%7|1610118981.203|BROKER|rdkafka#producer-1| [thrd:app]: server1:9092/bootstrap: Added new broker with NodeId -1
%7|1610118981.203|CONNECT|rdkafka#producer-1| [thrd:app]: server1:9092/bootstrap: Selected for cluster connection: bootstrap servers added (broker has 0 connection attempt(s))
%7|1610118981.203|BRKMAIN|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Enter main broker thread
%7|1610118981.203|INIT|rdkafka#producer-1| [thrd:app]: librdkafka v1.6.0 (0x10600ff) rdkafka#producer-1 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,sasl_oauthbearer, CC CXX PKGCONFIG INSTALL GNULD LIBDL PLUGINS ZLIB SSL SASL_CYRUS HDRHISTOGRAM SYSLOG SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER CRC32C_HW, debug 0xfffff)
%7|1610118981.203|CONNECT|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Received CONNECT op
%7|1610118981.203|STATE|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Broker changed state INIT -> TRY_CONNECT
%7|1610118981.203|BROADCAST|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: Broadcasting state change
%7|1610118981.203|CONNECT|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: broker in state TRY_CONNECT connecting
%7|1610118981.204|STATE|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Broker changed state TRY_CONNECT -> CONNECT
%7|1610118981.204|CONF|rdkafka#producer-1| [thrd:app]: Client configuration:
%7|1610118981.204|CONF|rdkafka#producer-1| [thrd:app]: client.software.version = 1.6.0
%7|1610118981.204|CONF|rdkafka#producer-1| [thrd:app]: metadata.broker.list = server1:9092
%7|1610118981.204|CONF|rdkafka#producer-1| [thrd:app]: debug = generic,broker,topic,metadata,feature,queue,msg,protocol,cgrp,security,fetch,interceptor,plugin,consumer,admin,eos,mock,assignor,conf,all
%7|1610118981.204|CONF|rdkafka#producer-1| [thrd:app]: opaque = 0x25fb898
%7|1610118981.204|CONF|rdkafka#producer-1| [thrd:app]: ssl_key = [redacted]
%7|1610118981.204|CONF|rdkafka#producer-1| [thrd:app]: queue.buffering.max.ms = 0
%7|1610118981.204|CONF|rdkafka#producer-1| [thrd:app]: dr_msg_cb = 0x445f80
%7|1610118981.204|BROADCAST|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: Broadcasting state change
%7|1610118981.205|CONNECT|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Connecting to ipv4#10.6.12.55:9092 (plaintext) with socket 7
%7|1610118981.205|CONNECT|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Connected to ipv4#10.6.12.55:9092
%7|1610118981.205|CONNECTED|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Connected (#1)
%7|1610118981.205|FEATURE|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Updated enabled protocol features +ApiVersion to ApiVersion
%7|1610118981.205|STATE|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Broker changed state CONNECT -> APIVERSION_QUERY
%7|1610118981.205|BROADCAST|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: Broadcasting state change
%7|1610118981.205|SEND|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Sent ApiVersionRequest (v3, 40 bytes @ 0, CorrId 1)
%7|1610118981.206|RECV|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Received ApiVersionResponse (v3, 344 bytes, CorrId 1, rtt 0.53ms)
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Broker API support:
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: ApiKey Produce (0) Versions 0..8
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: ApiKey Fetch (1) Versions 0..11
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: ApiKey Offset (2) Versions 0..5
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: ApiKey Metadata (3) Versions 0..9
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: ApiKey LeaderAndIsr (4) Versions 0..4
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: ApiKey StopReplica (5) Versions 0..2
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: ApiKey UpdateMetadata (6) Versions 0..6
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: ApiKey ControlledShutdown (7) Versions 0..3
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: ApiKey OffsetCommit (8) Versions 0..8
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: ApiKey OffsetFetch (9) Versions 0..6
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: ApiKey FindCoordinator (10) Versions 0..3
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: ApiKey JoinGroup (11) Versions 0..6
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: ApiKey Heartbeat (12) Versions 0..4
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: ApiKey LeaveGroup (13) Versions 0..4
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: ApiKey SyncGroup (14) Versions 0..4
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: ApiKey DescribeGroups (15) Versions 0..5
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: ApiKey ListGroups (16) Versions 0..3
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: ApiKey SaslHandshake (17) Versions 0..1
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: ApiKey ApiVersion (18) Versions 0..3
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: ApiKey CreateTopics (19) Versions 0..5
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: ApiKey DeleteTopics (20) Versions 0..4
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: ApiKey DeleteRecords (21) Versions 0..1
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: ApiKey InitProducerId (22) Versions 0..2
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: ApiKey OffsetForLeaderEpoch (23) Versions 0..3
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: ApiKey AddPartitionsToTxn (24) Versions 0..1
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: ApiKey AddOffsetsToTxn (25) Versions 0..1
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: ApiKey EndTxn (26) Versions 0..1
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: ApiKey WriteTxnMarkers (27) Versions 0..0
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: ApiKey TxnOffsetCommit (28) Versions 0..2
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: ApiKey DescribeAcls (29) Versions 0..1
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: ApiKey CreateAcls (30) Versions 0..1
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: ApiKey DeleteAcls (31) Versions 0..1
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: ApiKey DescribeConfigs (32) Versions 0..2
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: ApiKey AlterConfigs (33) Versions 0..1
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: ApiKey AlterReplicaLogDirs (34) Versions 0..1
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: ApiKey DescribeLogDirs (35) Versions 0..1
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: ApiKey SaslAuthenticate (36) Versions 0..1
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: ApiKey CreatePartitions (37) Versions 0..1
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: ApiKey CreateDelegationToken (38) Versions 0..2
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: ApiKey RenewDelegationToken (39) Versions 0..1
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: ApiKey ExpireDelegationToken (40) Versions 0..1
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: ApiKey DescribeDelegationToken (41) Versions 0..1
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: ApiKey DeleteGroups (42) Versions 0..2
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: ApiKey ElectLeadersRequest (43) Versions 0..2
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: ApiKey IncrementalAlterConfigsRequest (44) Versions 0..1
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: ApiKey AlterPartitionReassignmentsRequest (45) Versions 0..0
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: ApiKey ListPartitionReassignmentsRequest (46) Versions 0..0
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: ApiKey OffsetDeleteRequest (47) Versions 0..0
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Feature MsgVer1: Produce (2..2) supported by broker
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Feature MsgVer1: Fetch (2..2) supported by broker
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Enabling feature MsgVer1
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Feature MsgVer2: Produce (3..3) supported by broker
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Feature MsgVer2: Fetch (4..4) supported by broker
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Enabling feature MsgVer2
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Feature ApiVersion: ApiVersion (0..0) supported by broker
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Enabling feature ApiVersion
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Feature BrokerGroupCoordinator: FindCoordinator (0..0) supported by broker
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Enabling feature BrokerGroupCoordinator
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Feature BrokerBalancedConsumer: FindCoordinator (0..0) supported by broker
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Feature BrokerBalancedConsumer: OffsetCommit (1..2) supported by broker
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Feature BrokerBalancedConsumer: OffsetFetch (1..1) supported by broker
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Feature BrokerBalancedConsumer: JoinGroup (0..0) supported by broker
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Feature BrokerBalancedConsumer: SyncGroup (0..0) supported by broker
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Feature BrokerBalancedConsumer: Heartbeat (0..0) supported by broker
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Feature BrokerBalancedConsumer: LeaveGroup (0..0) supported by broker
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Enabling feature BrokerBalancedConsumer
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Feature ThrottleTime: Produce (1..2) supported by broker
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Feature ThrottleTime: Fetch (1..2) supported by broker
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Enabling feature ThrottleTime
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Feature Sasl: JoinGroup (0..0) supported by broker
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Enabling feature Sasl
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Feature SaslHandshake: SaslHandshake (0..0) supported by broker
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Enabling feature SaslHandshake
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Feature LZ4: FindCoordinator (0..0) supported by broker
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Enabling feature LZ4
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Feature OffsetTime: Offset (1..1) supported by broker
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Enabling feature OffsetTime
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Feature IdempotentProducer: InitProducerId (0..0) supported by broker
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Enabling feature IdempotentProducer
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Feature ZSTD: Produce (7..7) supported by broker
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Feature ZSTD: Fetch (10..10) supported by broker
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Enabling feature ZSTD
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Feature SaslAuthReq: SaslHandshake (1..1) supported by broker
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Feature SaslAuthReq: SaslAuthenticate (0..0) supported by broker
%7|1610118981.206|APIVERSION|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Enabling feature SaslAuthReq
%7|1610118981.206|FEATURE|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Updated enabled protocol features to MsgVer1,ApiVersion,BrokerBalancedConsumer,ThrottleTime,Sasl,SaslHandshake,BrokerGroupCoordinator,LZ4,OffsetTime,MsgVer2,IdempotentProducer,ZSTD,SaslAuthReq
%7|1610118981.206|STATE|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Broker changed state APIVERSION_QUERY -> UP
%7|1610118981.206|BROADCAST|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: Broadcasting state change
%7|1610118981.206|METADATA|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Request metadata for brokers only: connected
%7|1610118981.206|SEND|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Sent MetadataRequest (v4, 26 bytes @ 0, CorrId 2)
%7|1610118981.207|RECV|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: Received MetadataResponse (v4, 109 bytes, CorrId 2, rtt 0.24ms)
%7|1610118981.207|METADATA|rdkafka#producer-1| [thrd:main]: server1:9092/bootstrap: ===== Received metadata: connected =====
%7|1610118981.207|METADATA|rdkafka#producer-1| [thrd:main]: server1:9092/bootstrap: ClusterId: cE4M4HfwRJOsZqbLqBOaOA, ControllerId: 55
%7|1610118981.207|METADATA|rdkafka#producer-1| [thrd:main]: server1:9092/bootstrap: 3 brokers, 0 topics
%7|1610118981.207|METADATA|rdkafka#producer-1| [thrd:main]: server1:9092/bootstrap: Broker #0/3: server2:9092 NodeId 56
%7|1610118981.207|WAKEUPFD|rdkafka#producer-1| [thrd:main]: server2:9092/56: Enabled low-latency ops queue wake-ups
%7|1610118981.207|BROKER|rdkafka#producer-1| [thrd:main]: server2:9092/56: Added new broker with NodeId 56
%7|1610118981.207|METADATA|rdkafka#producer-1| [thrd:main]: server1:9092/bootstrap: Broker #1/3: server3:9092 NodeId 21
%7|1610118981.207|WAKEUPFD|rdkafka#producer-1| [thrd:main]: server3:9092/21: Enabled low-latency ops queue wake-ups
%7|1610118981.207|BRKMAIN|rdkafka#producer-1| [thrd:server2:9092/56]: server2:9092/56: Enter main broker thread
%7|1610118981.207|BROKER|rdkafka#producer-1| [thrd:main]: server3:9092/21: Added new broker with NodeId 21
%7|1610118981.207|METADATA|rdkafka#producer-1| [thrd:main]: server1:9092/bootstrap: Broker #2/3: server1:9092 NodeId 55
%7|1610118981.207|BRKMAIN|rdkafka#producer-1| [thrd:server3:9092/21]: server3:9092/21: Enter main broker thread
%7|1610118981.207|CLUSTERID|rdkafka#producer-1| [thrd:main]: server1:9092/bootstrap: ClusterId update "" -> "cE4M4HfwRJOsZqbLqBOaOA"
%7|1610118981.207|CONTROLLERID|rdkafka#producer-1| [thrd:main]: server1:9092/bootstrap: ControllerId update -1 -> 55
%7|1610118981.207|BROADCAST|rdkafka#producer-1| [thrd:main]: Broadcasting state change
%7|1610118981.207|UPDATE|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/bootstrap: NodeId changed from -1 to 55
%7|1610118981.207|UPDATE|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/55: Name changed from server1:9092/bootstrap to server1:9092/55
%7|1610118981.207|LEADER|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/55: Mapped 0 partition(s) to broker
%7|1610118981.207|STATE|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/55: Broker changed state UP -> UPDATE
%7|1610118981.207|BROADCAST|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: Broadcasting state change
%7|1610118981.207|BROADCAST|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: Broadcasting state change
%7|1610118981.207|STATE|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/55: Broker changed state UPDATE -> UP
%7|1610118981.207|BROADCAST|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: Broadcasting state change
%7|1610118981.395|TOPIC|rdkafka#producer-1| [thrd:app]: New local topic: test.topic
%7|1610118981.395|TOPPARNEW|rdkafka#producer-1| [thrd:app]: NEW test.topic [-1] 0x268b470 refcnt 0x268b500 (at rd_kafka_topic_new0:465)
2021-01-08 09:16:21.396087 wrote msg: bytes: 527
2021-01-08 09:16:21.396117 flushing(-1)
%7|1610118982.203|NOINFO|rdkafka#producer-1| [thrd:main]: Topic test.topic metadata information unknown
%7|1610118982.203|NOINFO|rdkafka#producer-1| [thrd:main]: Topic test.topic partition count is zero: should refresh metadata
%7|1610118982.203|METADATA|rdkafka#producer-1| [thrd:main]: Requesting metadata for 1/1 topics: refresh unavailable topics
%7|1610118982.203|METADATA|rdkafka#producer-1| [thrd:main]: server1:9092/55: Request metadata for 1 topic(s): refresh unavailable topics
%7|1610118982.204|SEND|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/55: Sent MetadataRequest (v4, 53 bytes @ 0, CorrId 3)
%7|1610118982.204|RECV|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/55: Received MetadataResponse (v4, 169 bytes, CorrId 3, rtt 0.44ms)
%7|1610118982.204|METADATA|rdkafka#producer-1| [thrd:main]: server1:9092/55: ===== Received metadata (for 1 requested topics): refresh unavailable topics =====
%7|1610118982.204|METADATA|rdkafka#producer-1| [thrd:main]: server1:9092/55: ClusterId: cE4M4HfwRJOsZqbLqBOaOA, ControllerId: 55
%7|1610118982.204|METADATA|rdkafka#producer-1| [thrd:main]: server1:9092/55: 3 brokers, 1 topics
%7|1610118982.204|METADATA|rdkafka#producer-1| [thrd:main]: server1:9092/55: Broker #0/3: server2:9092 NodeId 56
%7|1610118982.204|METADATA|rdkafka#producer-1| [thrd:main]: server1:9092/55: Broker #1/3: server3:9092 NodeId 21
%7|1610118982.204|METADATA|rdkafka#producer-1| [thrd:main]: server1:9092/55: Broker #2/3: server1:9092 NodeId 55
%7|1610118982.204|METADATA|rdkafka#producer-1| [thrd:main]: server1:9092/55: Topic #0/1: test.topic with 1 partitions
%7|1610118982.204|STATE|rdkafka#producer-1| [thrd:main]: Topic test.topic changed state unknown -> exists
%7|1610118982.204|PARTCNT|rdkafka#producer-1| [thrd:main]: Topic test.topic partition count changed from 0 to 1
%7|1610118982.204|TOPPARNEW|rdkafka#producer-1| [thrd:main]: NEW test.topic [0] 0x2705d90 refcnt 0x2705e20 (at rd_kafka_topic_partition_cnt_update:798)
%7|1610118982.204|METADATA|rdkafka#producer-1| [thrd:main]: Topic test.topic partition 0 Leader 55
%7|1610118982.204|BRKDELGT|rdkafka#producer-1| [thrd:main]: test.topic [0]: delegate to broker server1:9092/55 (rktp 0x2705d90, term 0, ref 2)
%7|1610118982.204|BRKDELGT|rdkafka#producer-1| [thrd:main]: test.topic [0]: delegating to broker server1:9092/55 for partition with 0 messages (0 bytes) queued
%7|1610118982.204|BRKMIGR|rdkafka#producer-1| [thrd:main]: Migrating topic test.topic [0] 0x2705d90 from (none) to server1:9092/55 (sending PARTITION_JOIN to server1:9092/55)
%7|1610118982.204|PARTCNT|rdkafka#producer-1| [thrd:main]: Partitioning 1 unassigned messages in topic test.topic to 1 partitions
%7|1610118982.204|UAS|rdkafka#producer-1| [thrd:main]: 1/1 messages were partitioned in topic test.topic
%7|1610118982.204|METADATA|rdkafka#producer-1| [thrd:main]: server1:9092/55: 1/1 requested topic(s) seen in metadata
%7|1610118982.204|TOPBRK|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/55: Topic test.topic [0]: joining broker (rktp 0x2705d90, 1 message(s) queued)
%7|1610118982.204|FETCHADD|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/55: Added test.topic [0] to active list (1 entries, opv 0, 1 messages queued): joining
%7|1610118982.204|BROADCAST|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: Broadcasting state change
%7|1610118982.204|TOPPAR|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/55: test.topic [0] 1 message(s) in xmit queue (1 added from partition queue)
%7|1610118982.204|PRODUCE|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/55: test.topic [0]: Produce MessageSet with 1 message(s) (628 bytes, ApiVersion 7, MsgVersion 2, MsgId 0, BaseSeq -1, PID{Invalid}, uncompressed)
%7|1610118982.204|SEND|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/55: Sent ProduceRequest (v7, 700 bytes @ 0, CorrId 4)
%7|1610118982.205|RECV|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/55: Received ProduceResponse (v7, 69 bytes, CorrId 4, rtt 0.49ms)
%7|1610118982.205|MSGSET|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/55: test.topic [0]: MessageSet with 1 message(s) (MsgId 0, BaseSeq -1) delivered
2021-01-08 09:16:22.205191 % Message delivered to topic test.topic [0] at offset 415
Runner thread exiting
Flushing kafka queue...
%7|1610118984.205|DESTROY|rdkafka#producer-1| [thrd:app]: Terminating instance (destroy flags none (0x0))
%7|1610118984.205|TERMINATE|rdkafka#producer-1| [thrd:app]: Interrupting timers
%7|1610118984.205|TERMINATE|rdkafka#producer-1| [thrd:app]: Sending TERMINATE to internal main thread
%7|1610118984.205|TERMINATE|rdkafka#producer-1| [thrd:app]: Joining internal main thread
%7|1610118984.205|TERMINATE|rdkafka#producer-1| [thrd:main]: Internal main thread terminating
%7|1610118984.205|DESTROY|rdkafka#producer-1| [thrd:main]: Destroy internal
%7|1610118984.205|BROADCAST|rdkafka#producer-1| [thrd:main]: Broadcasting state change
%7|1610118984.205|DESTROY|rdkafka#producer-1| [thrd:main]: Removing all topics
%7|1610118984.205|PARTCNT|rdkafka#producer-1| [thrd:main]: Topic test.topic partition count changed from 1 to 0
%7|1610118984.205|REMOVE|rdkafka#producer-1| [thrd:main]: test.topic [0] no longer reported in metadata
%7|1610118984.205|BRKMIGR|rdkafka#producer-1| [thrd:main]: test.topic [0] 0x2705d90 sending final LEAVE for removal by server1:9092/55
%7|1610118984.205|TOPPARREMOVE|rdkafka#producer-1| [thrd:main]: Removing toppar test.topic [-1] 0x268b470
%7|1610118984.205|DESTROY|rdkafka#producer-1| [thrd:main]: test.topic [-1]: 0x268b470 DESTROY_FINAL
%7|1610118984.205|DESTROY|rdkafka#producer-1| [thrd:main]: Sending TERMINATE to server3:9092/21
%7|1610118984.205|DESTROY|rdkafka#producer-1| [thrd:main]: Sending TERMINATE to server2:9092/56
%7|1610118984.205|DESTROY|rdkafka#producer-1| [thrd:main]: Sending TERMINATE to server1:9092/55
%7|1610118984.205|TERMINATE|rdkafka#producer-1| [thrd:main]: Purging reply queue
%7|1610118984.205|TERMINATE|rdkafka#producer-1| [thrd:main]: Decommissioning internal broker
%7|1610118984.205|TOPBRK|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/55: Topic test.topic [0]: leaving broker (0 messages in xmitq, next broker (none), rktp 0x2705d90)
%7|1610118984.205|FETCHADD|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/55: Removed test.topic [0] from active list (0 entries, opv 0): leaving
%7|1610118984.205|TOPBRK|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/55: Topic test.topic [0]: no next broker, failing 0 message(s) in partition queue
%7|1610118984.205|BROADCAST|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: Broadcasting state change
%7|1610118984.205|TERM|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Received TERMINATE op in state INIT: 2 refcnts, 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs
%7|1610118984.205|TERMINATE|rdkafka#producer-1| [thrd:main]: Join 4 broker thread(s)
%7|1610118984.205|TERM|rdkafka#producer-1| [thrd:server3:9092/21]: server3:9092/21: Received TERMINATE op in state INIT: 2 refcnts, 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs
%7|1610118984.205|TERM|rdkafka#producer-1| [thrd:server2:9092/56]: server2:9092/56: Received TERMINATE op in state INIT: 2 refcnts, 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs
%7|1610118984.205|TOPPARREMOVE|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: Removing toppar test.topic [0] 0x2705d90
%7|1610118984.205|FAIL|rdkafka#producer-1| [thrd:server3:9092/21]: server3:9092/21: Client is terminating (after 2998ms in state INIT) (_DESTROY)
%7|1610118984.205|FAIL|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Client is terminating (after 3001ms in state INIT) (_DESTROY)
%7|1610118984.205|FAIL|rdkafka#producer-1| [thrd:server2:9092/56]: server2:9092/56: Client is terminating (after 2998ms in state INIT) (_DESTROY)
%7|1610118984.205|STATE|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Broker changed state INIT -> DOWN
%7|1610118984.205|STATE|rdkafka#producer-1| [thrd:server2:9092/56]: server2:9092/56: Broker changed state INIT -> DOWN
%7|1610118984.205|DESTROY|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: test.topic [0]: 0x2705d90 DESTROY_FINAL
%7|1610118984.205|STATE|rdkafka#producer-1| [thrd:server3:9092/21]: server3:9092/21: Broker changed state INIT -> DOWN
%7|1610118984.205|BROADCAST|rdkafka#producer-1| [thrd::0/internal]: Broadcasting state change
%7|1610118984.205|BROADCAST|rdkafka#producer-1| [thrd:server2:9092/56]: Broadcasting state change
%7|1610118984.205|BROADCAST|rdkafka#producer-1| [thrd:server3:9092/21]: Broadcasting state change
%7|1610118984.205|BUFQ|rdkafka#producer-1| [thrd:server3:9092/21]: server3:9092/21: Purging bufq with 0 buffers
%7|1610118984.205|BUFQ|rdkafka#producer-1| [thrd:server3:9092/21]: server3:9092/21: Purging bufq with 0 buffers
%7|1610118984.205|BUFQ|rdkafka#producer-1| [thrd:server3:9092/21]: server3:9092/21: Updating 0 buffers on connection reset
%7|1610118984.205|BRKTERM|rdkafka#producer-1| [thrd:server3:9092/21]: server3:9092/21: terminating: broker still has 2 refcnt(s), 0 buffer(s), 0 partition(s)
%7|1610118984.205|BUFQ|rdkafka#producer-1| [thrd:server2:9092/56]: server2:9092/56: Purging bufq with 0 buffers
%7|1610118984.205|TERMINATE|rdkafka#producer-1| [thrd:server3:9092/21]: server3:9092/21: Handle is terminating in state DOWN: 1 refcnts (0x260e7c0), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
%7|1610118984.205|BUFQ|rdkafka#producer-1| [thrd:server2:9092/56]: server2:9092/56: Purging bufq with 0 buffers
%7|1610118984.205|TERM|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/55: Received TERMINATE op in state UP: 2 refcnts, 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs
%7|1610118984.205|BUFQ|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Purging bufq with 0 buffers
%7|1610118984.205|BUFQ|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Purging bufq with 0 buffers
%7|1610118984.205|BUFQ|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Updating 0 buffers on connection reset
%7|1610118984.205|BRKTERM|rdkafka#producer-1| [thrd::0/internal]: :0/internal: terminating: broker still has 2 refcnt(s), 0 buffer(s), 0 partition(s)
%7|1610118984.205|FAIL|rdkafka#producer-1| [thrd:server3:9092/21]: server3:9092/21: Broker handle is terminating (after 0ms in state DOWN) (_DESTROY)
%7|1610118984.205|BUFQ|rdkafka#producer-1| [thrd:server3:9092/21]: server3:9092/21: Purging bufq with 0 buffers
%7|1610118984.205|BUFQ|rdkafka#producer-1| [thrd:server3:9092/21]: server3:9092/21: Purging bufq with 0 buffers
%7|1610118984.205|FAIL|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/55: Client is terminating (after 2998ms in state UP) (_DESTROY)
%7|1610118984.205|BUFQ|rdkafka#producer-1| [thrd:server3:9092/21]: server3:9092/21: Updating 0 buffers on connection reset
%7|1610118984.205|TERMINATE|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Handle is terminating in state DOWN: 1 refcnts (0x2604720), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
%7|1610118984.205|BUFQ|rdkafka#producer-1| [thrd:server2:9092/56]: server2:9092/56: Updating 0 buffers on connection reset
%7|1610118984.205|BRKTERM|rdkafka#producer-1| [thrd:server2:9092/56]: server2:9092/56: terminating: broker still has 2 refcnt(s), 0 buffer(s), 0 partition(s)
%7|1610118984.205|FAIL|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Broker handle is terminating (after 0ms in state DOWN) (_DESTROY)
%7|1610118984.205|BUFQ|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Purging bufq with 0 buffers
%7|1610118984.205|BUFQ|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Purging bufq with 0 buffers
%7|1610118984.205|BUFQ|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Updating 0 buffers on connection reset
%7|1610118984.205|TERMINATE|rdkafka#producer-1| [thrd:server2:9092/56]: server2:9092/56: Handle is terminating in state DOWN: 1 refcnts (0x260dba0), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
%7|1610118984.205|STATE|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/55: Broker changed state UP -> DOWN
%7|1610118984.205|FAIL|rdkafka#producer-1| [thrd:server2:9092/56]: server2:9092/56: Broker handle is terminating (after 0ms in state DOWN) (_DESTROY)
%7|1610118984.205|BUFQ|rdkafka#producer-1| [thrd:server2:9092/56]: server2:9092/56: Purging bufq with 0 buffers
%7|1610118984.205|BUFQ|rdkafka#producer-1| [thrd:server2:9092/56]: server2:9092/56: Purging bufq with 0 buffers
%7|1610118984.205|BUFQ|rdkafka#producer-1| [thrd:server2:9092/56]: server2:9092/56: Updating 0 buffers on connection reset
%7|1610118984.205|BROADCAST|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: Broadcasting state change
%7|1610118984.205|BUFQ|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/55: Purging bufq with 0 buffers
%7|1610118984.205|BUFQ|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/55: Purging bufq with 0 buffers
%7|1610118984.205|BUFQ|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/55: Updating 0 buffers on connection reset
%7|1610118984.205|BRKTERM|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/55: terminating: broker still has 2 refcnt(s), 0 buffer(s), 0 partition(s)
%7|1610118984.205|TERMINATE|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/55: Handle is terminating in state DOWN: 1 refcnts (0x2605690), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
%7|1610118984.205|FAIL|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/55: Broker handle is terminating (after 0ms in state DOWN) (_DESTROY)
%7|1610118984.205|BUFQ|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/55: Purging bufq with 0 buffers
%7|1610118984.206|BUFQ|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/55: Purging bufq with 0 buffers
%7|1610118984.206|BUFQ|rdkafka#producer-1| [thrd:server1:9092/bootstrap]: server1:9092/55: Updating 0 buffers on connection reset
%7|1610118984.206|TERMINATE|rdkafka#producer-1| [thrd:main]: Internal main thread termination done
%7|1610118984.206|TERMINATE|rdkafka#producer-1| [thrd:app]: Destroying op queues
%7|1610118984.206|TERMINATE|rdkafka#producer-1| [thrd:app]: Termination done: freeing resources

@edenhill
Contributor

edenhill commented Jan 8, 2021

Thank you.

The issue is that in certain situations unknown topics are looked up once per second, and this is why you're seeing the delay here.

We'll fix this after the v1.6.0 release.

As a workaround you can query the topic metadata once on startup, like so:

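/* Create the topic handle once at startup; a NULL conf uses the default topic configuration. */
rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, "mytopic", NULL);
const struct rd_kafka_metadata *md;
/* all_topics = 0: request metadata for rkt only, waiting up to 10 seconds. */
if (rd_kafka_metadata(rk, 0, rkt, &md, 10000) == RD_KAFKA_RESP_ERR_NO_ERROR)
    rd_kafka_metadata_destroy(md);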
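
The once-per-second lookup can be read directly off the log above. A small sketch of the arithmetic, using the epoch timestamps from the TOPIC, NOINFO and ProduceResponse lines (converted to milliseconds); the constant and function names are illustrative only and are not part of librdkafka:

```c
#include <stdio.h>

/* Epoch timestamps in milliseconds, copied from the debug log above. */
static const long long TOPIC_CREATED_MS  = 1610118981395LL; /* TOPIC: "New local topic" */
static const long long LOOKUP_STARTED_MS = 1610118982203LL; /* NOINFO: "metadata information unknown" */
static const long long PRODUCE_ACKED_MS  = 1610118982205LL; /* RECV: "Received ProduceResponse" */

/* Elapsed time between two log timestamps, in milliseconds. */
static long long elapsed_ms(long long start_ms, long long end_ms)
{
    return end_ms - start_ms;
}

/* Print how the total produce latency splits into idle time and actual work. */
static void print_delay_breakdown(void)
{
    printf("idle until topic metadata lookup: %lld ms\n",
           elapsed_ms(TOPIC_CREATED_MS, LOOKUP_STARTED_MS));
    printf("lookup, produce and ack:          %lld ms\n",
           elapsed_ms(LOOKUP_STARTED_MS, PRODUCE_ACKED_MS));
    printf("total:                            %lld ms\n",
           elapsed_ms(TOPIC_CREATED_MS, PRODUCE_ACKED_MS));
}
```

Calling print_delay_breakdown() from a main() reports 808 ms idle, 2 ms for the metadata lookup plus produce round trip, and 810 ms in total. Almost all of the latency is spent waiting for the lookup to be scheduled, which is why fetching the topic metadata once at startup removes it.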

@davinnfld
Author

Great, Thanks!

@laurglia

laurglia commented May 29, 2024

Hi @edenhill, just want to confirm if this was actually fixed since I'm seeing similar behavior still on librdkafka 2.3.

I can see that the commit you pushed 4503dcf was never merged and looking at the main branch, I cannot see these changes there. Also, there's no mention of this in the v1.7.0 changelog which leads me to believe that the issue was not actually fixed.

Edit: I tried to apply 4503dcf to librdkafka 2.3.0 (the version I was using) and now the issue is gone.

laurglia added a commit to salemove/karafka-rdkafka that referenced this issue May 29, 2024
There is a bug in librdkafka where producing the very first message into
a topic always takes around 1 second.

This happens due to a bug that was first raised in this GitHub issue:
confluentinc/librdkafka#3202

The maintainer proposed a fix for this problem and said it was going to
be fixed starting librdkafka 1.7.0, but it seems the fix was actually
never merged since the problem still existed. I think that the
maintainer mixed something up there because they actually closed the
issue, perhaps they forgot to merge the fix or something similar.

This commit makes rdkafka use our fork of librdkafka 2.3.0. This fork is
almost identical to the regular 2.3.0 but it has the same fix applied
that the maintainer proposed.

You can find the commit here:
salemove/librdkafka@71eea91

XT-59
JoonasHalapuu pushed a commit to salemove/karafka-rdkafka that referenced this issue Feb 21, 2025