Feature Suggestion: Make high water marks immediately available in partition consumers #2056
Hello, I also encountered this issue and was surprised that it works this way, so I just worked around it. But now, after reading your great description, which basically contains a solution, I went ahead and created PR #2082.
Yes, that looks like what I had in mind, thanks. One reason I hadn't bumped this further is that I was investigating how this interacts with the subtle difference between the log end offset (LEO) and the HWM: the LEO is the last message produced on the current leader, while the HWM is the last message replicated to all brokers. In theory, if the broker dies, the LEO might go backwards, while the HWM should be monotonic. This introduces a case where broker failure during startup may cause the HWM reported by sarama to go backwards. That would actually be OK with me (the services we need this for also use […]
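The monotonicity concern above suggests one possible client-side guard: never let the locally observed HWM move backwards, even if a failed-over leader briefly reports a smaller value. This is a minimal self-contained sketch of that idea, not Sarama's actual code; `updateHWM` is a hypothetical helper:

```go
package main

import "fmt"

// updateHWM applies a broker-reported high water mark but never lets the
// locally observed value move backwards, matching the expectation that the
// HWM is monotonic even if a failed-over leader briefly reports less.
func updateHWM(current, reported int64) int64 {
	if reported > current {
		return reported
	}
	return current
}

func main() {
	hwm := int64(0)
	// 7 simulates a stale value reported after a leader failover.
	for _, reported := range []int64{5, 9, 7, 12} {
		hwm = updateHWM(hwm, reported)
	}
	fmt.Println(hwm) // 12
}
```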
Good point. To be honest, I'm not that deep into Kafka or Sarama, so... not sure. But when I look at this:

```go
func (client *client) getOffset(topic string, partitionID int32, time int64) (int64, error) {
	broker, err := client.Leader(topic, partitionID)
	if err != nil {
		return -1, err
	}

	request := &OffsetRequest{}
	if client.conf.Version.IsAtLeast(V0_10_1_0) {
		request.Version = 1
	}
	request.AddBlock(topic, partitionID, time, 1)

	response, err := broker.GetAvailableOffsets(request)
	...
```

and this:

```go
type OffsetRequest struct {
	Version        int16
	IsolationLevel IsolationLevel

	replicaID      int32
	isReplicaIDSet bool
	blocks         map[string]map[int32]*offsetRequestBlock
}

func (r *OffsetRequest) encode(pe packetEncoder) error {
	if r.isReplicaIDSet {
		pe.putInt32(r.replicaID)
	} else {
		// default replica ID is always -1 for clients
		pe.putInt32(-1)
	}

	if r.Version >= 2 {
		pe.putBool(r.IsolationLevel == ReadCommitted)
	}
```

then I would say that all we need to do is use v2 (or newer, if any) when applicable. WDYT?
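The snippet above gates the request version on the configured broker version (`IsAtLeast(V0_10_1_0)` selects v1). "Use v2 when applicable" would plausibly extend the same pattern with another gate; ListOffsets v2 was introduced alongside Kafka 0.11.0's isolation levels, so that is the assumed cutoff here. This is a self-contained sketch with stand-in types, not Sarama's actual code:

```go
package main

import "fmt"

// kafkaVersion is a stand-in for Sarama's KafkaVersion type; real code
// would use sarama.V0_10_1_0 etc. and KafkaVersion.IsAtLeast.
type kafkaVersion [3]int

func (v kafkaVersion) isAtLeast(other kafkaVersion) bool {
	for i := 0; i < 3; i++ {
		if v[i] != other[i] {
			return v[i] > other[i]
		}
	}
	return true
}

var (
	v0_10_1_0 = kafkaVersion{0, 10, 1}
	v0_11_0_0 = kafkaVersion{0, 11, 0} // assumed cutoff for ListOffsets v2
)

// offsetRequestVersion mirrors the gating in getOffset above: pick the
// highest OffsetRequest version the configured broker version supports.
func offsetRequestVersion(broker kafkaVersion) int16 {
	switch {
	case broker.isAtLeast(v0_11_0_0):
		return 2 // v2 adds the IsolationLevel field
	case broker.isAtLeast(v0_10_1_0):
		return 1
	default:
		return 0
	}
}

func main() {
	fmt.Println(offsetRequestVersion(kafkaVersion{0, 9, 0}))  // 0
	fmt.Println(offsetRequestVersion(kafkaVersion{0, 10, 2})) // 1
	fmt.Println(offsetRequestVersion(kafkaVersion{1, 0, 0}))  // 2
}
```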
Currently the high water mark of a `partitionConsumer` isn't filled in until it has processed at least one fetch response. This makes life hard for consumers, e.g. ones reading compacted topics from the start that want to delay until they have reasonably "caught up" to the topic end, in the case when the topic is empty. You need to either detect this from the `Client` (which may not be available if you used e.g. `NewConsumer`, or want your code to also work with mock consumers), or assume that no HWM within some time span means an empty topic, or put a dummy first message in all such topics.

I think during `chooseStartingOffset`, the HWM could be filled in from the `offsetNewest` lookup. This would make it available by the time `ConsumePartition` returns.
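The proposal amounts to: record the `offsetNewest` lookup done at startup as the initial HWM, so a "have I caught up?" check works immediately, including on an empty partition. A minimal self-contained sketch of that check (stand-in struct and `caughtUp` helper, not Sarama's API):

```go
package main

import "fmt"

// partitionConsumer is a stand-in: under the proposal, highWaterMark is
// populated from the offsetNewest lookup in chooseStartingOffset, so it
// is valid as soon as ConsumePartition returns.
type partitionConsumer struct {
	highWaterMark int64 // next offset the broker will assign
	consumed      int64 // number of offsets processed so far
}

// caughtUp reports whether the consumer has read everything that existed
// at startup. Without the proposal, highWaterMark stays zero until the
// first fetch response, so an empty topic is indistinguishable from a
// consumer that simply hasn't heard from the broker yet.
func (pc *partitionConsumer) caughtUp() bool {
	return pc.consumed >= pc.highWaterMark
}

func main() {
	empty := &partitionConsumer{highWaterMark: 0, consumed: 0}
	fmt.Println(empty.caughtUp()) // true: empty topic detected immediately

	behind := &partitionConsumer{highWaterMark: 42, consumed: 10}
	fmt.Println(behind.caughtUp()) // false: still replaying the backlog
}
```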