Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose missing protocol parts #1049

Merged
merged 2 commits into from
Feb 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,14 +583,14 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu

switch records.recordsType {
case legacyRecords:
messageSetMessages, err := child.parseMessages(records.msgSet)
messageSetMessages, err := child.parseMessages(records.MsgSet)
if err != nil {
return nil, err
}

messages = append(messages, messageSetMessages...)
case defaultRecords:
recordBatchMessages, err := child.parseRecords(records.recordBatch)
recordBatchMessages, err := child.parseRecords(records.RecordBatch)
if err != nil {
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions fetch_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Enc
records := newLegacyRecords(&MessageSet{})
frb.RecordsSet = []*Records{&records}
}
set := frb.RecordsSet[0].msgSet
set := frb.RecordsSet[0].MsgSet
set.Messages = append(set.Messages, msgBlock)
}

Expand All @@ -365,7 +365,7 @@ func (r *FetchResponse) AddRecord(topic string, partition int32, key, value Enco
records := newDefaultRecords(&RecordBatch{Version: 2})
frb.RecordsSet = []*Records{&records}
}
batch := frb.RecordsSet[0].recordBatch
batch := frb.RecordsSet[0].RecordBatch
batch.addRecord(rec)
}

Expand All @@ -375,7 +375,7 @@ func (r *FetchResponse) SetLastOffsetDelta(topic string, partition int32, offset
records := newDefaultRecords(&RecordBatch{Version: 2})
frb.RecordsSet = []*Records{&records}
}
batch := frb.RecordsSet[0].recordBatch
batch := frb.RecordsSet[0].RecordBatch
batch.LastOffsetDelta = offset
}

Expand Down
6 changes: 3 additions & 3 deletions fetch_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func TestOneMessageFetchResponse(t *testing.T) {
if n != 1 {
t.Fatal("Decoding produced incorrect number of messages.")
}
msgBlock := block.RecordsSet[0].msgSet.Messages[0]
msgBlock := block.RecordsSet[0].MsgSet.Messages[0]
if msgBlock.Offset != 0x550000 {
t.Error("Decoding produced incorrect message offset.")
}
Expand Down Expand Up @@ -185,7 +185,7 @@ func TestOneRecordFetchResponse(t *testing.T) {
if n != 1 {
t.Fatal("Decoding produced incorrect number of records.")
}
rec := block.RecordsSet[0].recordBatch.Records[0]
rec := block.RecordsSet[0].RecordBatch.Records[0]
if !bytes.Equal(rec.Key, []byte{0x01, 0x02, 0x03, 0x04}) {
t.Error("Decoding produced incorrect record key.")
}
Expand Down Expand Up @@ -231,7 +231,7 @@ func TestOneMessageFetchResponseV4(t *testing.T) {
if n != 1 {
t.Fatal("Decoding produced incorrect number of records.")
}
msgBlock := block.RecordsSet[0].msgSet.Messages[0]
msgBlock := block.RecordsSet[0].MsgSet.Messages[0]
if msgBlock.Offset != 0x550000 {
t.Error("Decoding produced incorrect message offset.")
}
Expand Down
14 changes: 14 additions & 0 deletions offset_commit_request.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package sarama

import "errors"

// ReceiveTime is a special value for the timestamp field of Offset Commit Requests which
// tells the broker to set the timestamp to the time at which the request was received.
// The timestamp is only used if message version 1 is used, which requires kafka 0.8.2.
Expand Down Expand Up @@ -188,3 +190,15 @@ func (r *OffsetCommitRequest) AddBlock(topic string, partitionID int32, offset i

r.blocks[topic][partitionID] = &offsetCommitRequestBlock{offset, timestamp, metadata}
}

func (r *OffsetCommitRequest) Offset(topic string, partitionID int32) (int64, string, error) {
partitions := r.blocks[topic]
if partitions == nil {
return 0, "", errors.New("No such offset")
}
block := partitions[partitionID]
if block == nil {
return 0, "", errors.New("No such offset")
}
return block.offset, block.metadata, nil
}
6 changes: 3 additions & 3 deletions produce_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,9 @@ func (r *ProduceRequest) encode(pe packetEncoder) error {
}
if metricRegistry != nil {
if r.Version >= 3 {
topicRecordCount += updateBatchMetrics(records.recordBatch, compressionRatioMetric, topicCompressionRatioMetric)
topicRecordCount += updateBatchMetrics(records.RecordBatch, compressionRatioMetric, topicCompressionRatioMetric)
} else {
topicRecordCount += updateMsgSetMetrics(records.msgSet, compressionRatioMetric, topicCompressionRatioMetric)
topicRecordCount += updateMsgSetMetrics(records.MsgSet, compressionRatioMetric, topicCompressionRatioMetric)
}
batchSize := int64(pe.offset() - startOffset)
batchSizeMetric.Update(batchSize)
Expand Down Expand Up @@ -231,7 +231,7 @@ func (r *ProduceRequest) ensureRecords(topic string, partition int32) {

func (r *ProduceRequest) AddMessage(topic string, partition int32, msg *Message) {
r.ensureRecords(topic, partition)
set := r.records[topic][partition].msgSet
set := r.records[topic][partition].MsgSet

if set == nil {
set = new(MessageSet)
Expand Down
18 changes: 9 additions & 9 deletions produce_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (ps *produceSet) add(msg *ProducerMessage) error {
rec := &Record{
Key: key,
Value: val,
TimestampDelta: timestamp.Sub(set.recordsToSend.recordBatch.FirstTimestamp),
TimestampDelta: timestamp.Sub(set.recordsToSend.RecordBatch.FirstTimestamp),
}
size += len(key) + len(val)
if len(msg.Headers) > 0 {
Expand All @@ -90,14 +90,14 @@ func (ps *produceSet) add(msg *ProducerMessage) error {
size += len(rec.Headers[i].Key) + len(rec.Headers[i].Value) + 2*binary.MaxVarintLen32
}
}
set.recordsToSend.recordBatch.addRecord(rec)
set.recordsToSend.RecordBatch.addRecord(rec)
} else {
msgToSend := &Message{Codec: CompressionNone, Key: key, Value: val}
if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) {
msgToSend.Timestamp = timestamp
msgToSend.Version = 1
}
set.recordsToSend.msgSet.addMessage(msgToSend)
set.recordsToSend.MsgSet.addMessage(msgToSend)
size = producerMessageOverhead + len(key) + len(val)
}

Expand All @@ -123,7 +123,7 @@ func (ps *produceSet) buildRequest() *ProduceRequest {
for topic, partitionSet := range ps.msgs {
for partition, set := range partitionSet {
if req.Version >= 3 {
rb := set.recordsToSend.recordBatch
rb := set.recordsToSend.RecordBatch
if len(rb.Records) > 0 {
rb.LastOffsetDelta = int32(len(rb.Records) - 1)
for i, record := range rb.Records {
Expand All @@ -135,7 +135,7 @@ func (ps *produceSet) buildRequest() *ProduceRequest {
continue
}
if ps.parent.conf.Producer.Compression == CompressionNone {
req.AddSet(topic, partition, set.recordsToSend.msgSet)
req.AddSet(topic, partition, set.recordsToSend.MsgSet)
} else {
// When compression is enabled, the entire set for each partition is compressed
// and sent as the payload of a single fake "message" with the appropriate codec
Expand All @@ -148,11 +148,11 @@ func (ps *produceSet) buildRequest() *ProduceRequest {
// recompressing the message set.
// (See https://cwiki.apache.org/confluence/display/KAFKA/KIP-31+-+Move+to+relative+offsets+in+compressed+message+sets
// for details on relative offsets.)
for i, msg := range set.recordsToSend.msgSet.Messages {
for i, msg := range set.recordsToSend.MsgSet.Messages {
msg.Offset = int64(i)
}
}
payload, err := encode(set.recordsToSend.msgSet, ps.parent.conf.MetricRegistry)
payload, err := encode(set.recordsToSend.MsgSet, ps.parent.conf.MetricRegistry)
if err != nil {
Logger.Println(err) // if this happens, it's basically our fault.
panic(err)
Expand All @@ -162,11 +162,11 @@ func (ps *produceSet) buildRequest() *ProduceRequest {
CompressionLevel: ps.parent.conf.Producer.CompressionLevel,
Key: nil,
Value: payload,
Set: set.recordsToSend.msgSet, // Provide the underlying message set for accurate metrics
Set: set.recordsToSend.MsgSet, // Provide the underlying message set for accurate metrics
}
if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) {
compMsg.Version = 1
compMsg.Timestamp = set.recordsToSend.msgSet.Messages[0].Msg.Timestamp
compMsg.Timestamp = set.recordsToSend.MsgSet.Messages[0].Msg.Timestamp
}
req.AddMessage(topic, partition, compMsg)
}
Expand Down
4 changes: 2 additions & 2 deletions produce_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func TestProduceSetCompressedRequestBuilding(t *testing.T) {
t.Error("Wrong request version")
}

for _, msgBlock := range req.records["t1"][0].msgSet.Messages {
for _, msgBlock := range req.records["t1"][0].MsgSet.Messages {
msg := msgBlock.Msg
err := msg.decodeSet()
if err != nil {
Expand Down Expand Up @@ -227,7 +227,7 @@ func TestProduceSetV3RequestBuilding(t *testing.T) {
t.Error("Wrong request version")
}

batch := req.records["t1"][0].recordBatch
batch := req.records["t1"][0].RecordBatch
if batch.FirstTimestamp != now {
t.Errorf("Wrong first timestamp: %v", batch.FirstTimestamp)
}
Expand Down
54 changes: 27 additions & 27 deletions records.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,30 +14,30 @@ const (
// Records implements a union type containing either a RecordBatch or a legacy MessageSet.
type Records struct {
recordsType int
msgSet *MessageSet
recordBatch *RecordBatch
MsgSet *MessageSet
RecordBatch *RecordBatch
}

func newLegacyRecords(msgSet *MessageSet) Records {
return Records{recordsType: legacyRecords, msgSet: msgSet}
return Records{recordsType: legacyRecords, MsgSet: msgSet}
}

func newDefaultRecords(batch *RecordBatch) Records {
return Records{recordsType: defaultRecords, recordBatch: batch}
return Records{recordsType: defaultRecords, RecordBatch: batch}
}

// setTypeFromFields sets type of Records depending on which of msgSet or recordBatch is not nil.
// setTypeFromFields sets type of Records depending on which of MsgSet or RecordBatch is not nil.
// The first return value indicates whether both fields are nil (and the type is not set).
// If both fields are not nil, it returns an error.
func (r *Records) setTypeFromFields() (bool, error) {
if r.msgSet == nil && r.recordBatch == nil {
if r.MsgSet == nil && r.RecordBatch == nil {
return true, nil
}
if r.msgSet != nil && r.recordBatch != nil {
return false, fmt.Errorf("both msgSet and recordBatch are set, but record type is unknown")
if r.MsgSet != nil && r.RecordBatch != nil {
return false, fmt.Errorf("both MsgSet and RecordBatch are set, but record type is unknown")
}
r.recordsType = defaultRecords
if r.msgSet != nil {
if r.MsgSet != nil {
r.recordsType = legacyRecords
}
return false, nil
Expand All @@ -52,15 +52,15 @@ func (r *Records) encode(pe packetEncoder) error {

switch r.recordsType {
case legacyRecords:
if r.msgSet == nil {
if r.MsgSet == nil {
return nil
}
return r.msgSet.encode(pe)
return r.MsgSet.encode(pe)
case defaultRecords:
if r.recordBatch == nil {
if r.RecordBatch == nil {
return nil
}
return r.recordBatch.encode(pe)
return r.RecordBatch.encode(pe)
}

return fmt.Errorf("unknown records type: %v", r.recordsType)
Expand Down Expand Up @@ -89,11 +89,11 @@ func (r *Records) decode(pd packetDecoder) error {

switch r.recordsType {
case legacyRecords:
r.msgSet = &MessageSet{}
return r.msgSet.decode(pd)
r.MsgSet = &MessageSet{}
return r.MsgSet.decode(pd)
case defaultRecords:
r.recordBatch = &RecordBatch{}
return r.recordBatch.decode(pd)
r.RecordBatch = &RecordBatch{}
return r.RecordBatch.decode(pd)
}
return fmt.Errorf("unknown records type: %v", r.recordsType)
}
Expand All @@ -107,15 +107,15 @@ func (r *Records) numRecords() (int, error) {

switch r.recordsType {
case legacyRecords:
if r.msgSet == nil {
if r.MsgSet == nil {
return 0, nil
}
return len(r.msgSet.Messages), nil
return len(r.MsgSet.Messages), nil
case defaultRecords:
if r.recordBatch == nil {
if r.RecordBatch == nil {
return 0, nil
}
return len(r.recordBatch.Records), nil
return len(r.RecordBatch.Records), nil
}
return 0, fmt.Errorf("unknown records type: %v", r.recordsType)
}
Expand All @@ -131,15 +131,15 @@ func (r *Records) isPartial() (bool, error) {
case unknownRecords:
return false, nil
case legacyRecords:
if r.msgSet == nil {
if r.MsgSet == nil {
return false, nil
}
return r.msgSet.PartialTrailingMessage, nil
return r.MsgSet.PartialTrailingMessage, nil
case defaultRecords:
if r.recordBatch == nil {
if r.RecordBatch == nil {
return false, nil
}
return r.recordBatch.PartialTrailingRecord, nil
return r.RecordBatch.PartialTrailingRecord, nil
}
return false, fmt.Errorf("unknown records type: %v", r.recordsType)
}
Expand All @@ -155,10 +155,10 @@ func (r *Records) isControl() (bool, error) {
case legacyRecords:
return false, nil
case defaultRecords:
if r.recordBatch == nil {
if r.RecordBatch == nil {
return false, nil
}
return r.recordBatch.Control, nil
return r.RecordBatch.Control, nil
}
return false, fmt.Errorf("unknown records type: %v", r.recordsType)
}
Expand Down
8 changes: 4 additions & 4 deletions records_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ func TestLegacyRecords(t *testing.T) {
if r.recordsType != legacyRecords {
t.Fatalf("Wrong records type %v, expected %v", r.recordsType, legacyRecords)
}
if !reflect.DeepEqual(set, r.msgSet) {
t.Errorf("Wrong decoding for legacy records, wanted %#+v, got %#+v", set, r.msgSet)
if !reflect.DeepEqual(set, r.MsgSet) {
t.Errorf("Wrong decoding for legacy records, wanted %#+v, got %#+v", set, r.MsgSet)
}

n, err := r.numRecords()
Expand Down Expand Up @@ -113,8 +113,8 @@ func TestDefaultRecords(t *testing.T) {
if r.recordsType != defaultRecords {
t.Fatalf("Wrong records type %v, expected %v", r.recordsType, defaultRecords)
}
if !reflect.DeepEqual(batch, r.recordBatch) {
t.Errorf("Wrong decoding for default records, wanted %#+v, got %#+v", batch, r.recordBatch)
if !reflect.DeepEqual(batch, r.RecordBatch) {
t.Errorf("Wrong decoding for default records, wanted %#+v, got %#+v", batch, r.RecordBatch)
}

n, err := r.numRecords()
Expand Down