
Feature Suggestion: Make high water marks immediately available in partition consumers #2056

Closed · joewreschnig opened this issue Nov 2, 2021 · 4 comments

@joewreschnig (Contributor) commented Nov 2, 2021

Currently the high water mark of a partitionConsumer isn't filled in until it has processed at least one fetch response. This makes life hard for consumers that, e.g., read a compacted topic from the start and want to delay until they have reasonably "caught up" to the end of the topic, in the case when the topic is empty. You need to either detect this via the Client (which may not be available if you used e.g. NewConsumer, or if you want your code to also work with mock consumers), assume that seeing no HWM within some time span means the topic is empty, or put a dummy first message in all such topics.
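For illustration, here is a minimal sketch of the "catch up, then serve" pattern this affects, using only the public API (the broker address, topic name, and apply helper are placeholders, not real code we run):

package main

import (
	"log"

	"github.com/Shopify/sarama"
)

// apply is a placeholder for rebuilding local state from a compacted topic.
func apply(msg *sarama.ConsumerMessage) {}

func main() {
	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, sarama.NewConfig())
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	pc, err := consumer.ConsumePartition("settings", 0, sarama.OffsetOldest)
	if err != nil {
		log.Fatal(err)
	}
	defer pc.Close()

	// Replay everything up to the high water mark as of startup, then declare
	// readiness. With the current behaviour HighWaterMarkOffset() returns 0
	// until the first fetch response has been parsed, so right after
	// ConsumePartition this check is meaningless: 0 could mean "empty topic"
	// or just "no fetch response yet". Seeding the HWM during
	// chooseStartingOffset would make this work as written.
	target := pc.HighWaterMarkOffset()
	caughtUp := target == 0 // an empty topic needs no replay
	for !caughtUp {
		msg, ok := <-pc.Messages()
		if !ok {
			break // consumer closed before we caught up
		}
		apply(msg)
		caughtUp = msg.Offset+1 >= target
	}
	log.Println("caught up, ready to serve")
}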

I think during chooseStartingOffset, the HWM could be filled in from the offsetNewest lookup. This would make it available by the time ConsumePartition returns.

@grongor (Contributor) commented Dec 5, 2021

Hello, I also ran into this, was surprised that it works this way, and just worked around it at the time. But now, after reading your great description, which basically contains the solution, I went ahead and created a PR: #2082

@joewreschnig (Contributor, Author) commented

Yes, that looks like what I had in mind, thanks.

One reason I hadn't bumped this further is that I was investigating how it interacts with the subtle difference between the log end offset (LEO) and the HWM: the LEO is the last message produced on the current leader, while the HWM is the last message replicated to all in-sync brokers. In theory, if the leader dies the LEO might go backwards, while the HWM should be monotonic.

This introduces a case where a broker failure during startup may cause the HWM reported by sarama to go backwards. That would actually be OK with me (the services we need this for also use acks=all, so they would re-produce any message dropped this way and the HWM would then match again), and it's better than having no information about the HWM at all. But I was also trying to figure out what would happen in chooseStartingOffset more generally in that situation, because I think it reveals another unlikely bug: from reading the protocol documentation, trying to consume from an LEO past the HWM would cause an error, leading to the whole ConsumePartition call failing. I didn't have time to verify this yet, or to think about a fix if it is a problem.
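(If a strictly monotonic HWM ever does matter to a caller, a small guard like this hypothetical helper, which is not part of sarama, would be enough to hide that edge case on the consumer side:)

package consumerutil // illustrative only; monotonicHWM is not part of sarama

import "github.com/Shopify/sarama"

// monotonicHWM clamps the reported high water mark so that it never appears
// to move backwards across leader failovers.
type monotonicHWM struct {
	max int64
}

func (m *monotonicHWM) Observe(pc sarama.PartitionConsumer) int64 {
	if hwm := pc.HighWaterMarkOffset(); hwm > m.max {
		m.max = hwm
	}
	return m.max
}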

@grongor (Contributor) commented Dec 5, 2021

Good point. To be honest, I'm not that deep into Kafka or Sarama, so I'm not sure. But when I look at this:

func (client *client) getOffset(topic string, partitionID int32, time int64) (int64, error) {
	broker, err := client.Leader(topic, partitionID)
	if err != nil {
		return -1, err
	}

	request := &OffsetRequest{}
	if client.conf.Version.IsAtLeast(V0_10_1_0) {
		request.Version = 1
	}
	request.AddBlock(topic, partitionID, time, 1)

	response, err := broker.GetAvailableOffsets(request)

... and this:

type OffsetRequest struct {
	Version        int16
	IsolationLevel IsolationLevel
	replicaID      int32
	isReplicaIDSet bool
	blocks         map[string]map[int32]*offsetRequestBlock
}

func (r *OffsetRequest) encode(pe packetEncoder) error {
	if r.isReplicaIDSet {
		pe.putInt32(r.replicaID)
	} else {
		// default replica ID is always -1 for clients
		pe.putInt32(-1)
	}

	if r.Version >= 2 {
		pe.putBool(r.IsolationLevel == ReadCommitted)
	}

then I would say that all we need to do is use version 2 of the request (or newer, if any) when applicable. WDYT?
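For concreteness, I'd expect the change to look roughly like this inside getOffset (the Kafka version gating v2 is a guess on my part from the protocol docs, not something I've checked against the code):

	request := &OffsetRequest{}
	if client.conf.Version.IsAtLeast(V0_11_0_0) {
		// ListOffsets v2 added the isolation_level field; whether 0.11.0 is
		// the right gate here is an assumption, not verified against sarama.
		request.Version = 2
	} else if client.conf.Version.IsAtLeast(V0_10_1_0) {
		request.Version = 1
	}
	request.AddBlock(topic, partitionID, time, 1)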

@joewreschnig (Contributor, Author) commented

The ReadCommitted isolation level is only relevant for transactional producers/consumers, I think, not for the general idea of data being "committed" by completing replication. But in the end I'm fine with what got merged; we don't depend on absolutely monotonic HWMs, especially in such an edge case. Thanks for doing the actual work! :)
