Skip to content

Commit

Permalink
Implement decoding Avro encoded message keys
Browse files Browse the repository at this point in the history
  • Loading branch information
fgrosse committed Apr 8, 2023
1 parent 6dbaac3 commit d967291
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 92 deletions.
2 changes: 1 addition & 1 deletion cmd/replay/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ func (cmd *command) sendMessages(
outPartition, outOffset, err := destination.SendMessage(out)
if err != nil {
numErrors++
cmd.logger.Printf("Error: failed to send message to destination topic: %w", err)
cmd.logger.Printf("Error: failed to send message to destination topic: %v", err)
continue
}

Expand Down
95 changes: 58 additions & 37 deletions pkg/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,41 +31,62 @@ func (d *AvroDecoder) UseAvroJSON() {
}

// Decode a message from Kafka into our own Message type.
func (d *AvroDecoder) Decode(msg *sarama.ConsumerMessage) (*Message, error) {
return newMessage(msg, func(value []byte) (any, error) {
if len(value) < 5 {
return nil, fmt.Errorf("need at least 5 bytes to decode Avro messages (got %d)", len(value))
}

schemaID := int(binary.BigEndian.Uint32(value[1:5]))
data := value[5:]

schema, err := d.registry.Schema(schemaID)
if err != nil {
return nil, fmt.Errorf("registry: %w", err)
}

var codec *goavro.Codec
if d.printAvroSchema {
codec, err = goavro.NewCodec(schema)
} else {
codec, err = goavro.NewCodecForStandardJSONFull(schema)
}

if err != nil {
return nil, fmt.Errorf("codec: %w", err)
}

decoded, _, err := codec.NativeFromBinary(data)
if err != nil {
return nil, fmt.Errorf("decode: %w", err)
}

avroJSON, err := codec.TextualFromNative(nil, decoded)
if err != nil {
return nil, fmt.Errorf("failed to convert value to avro JSON: %w", err)
}

return json.RawMessage(avroJSON), nil
})
func (d *AvroDecoder) Decode(kafkaMsg *sarama.ConsumerMessage) (*Message, error) {
key, err := d.decode(kafkaMsg.Key)
if err != nil {
return nil, fmt.Errorf("decoding key: %w", err)
}

value, err := d.decode(kafkaMsg.Value)
if err != nil {
return nil, fmt.Errorf("decoding value: %w", err)
}

msg := NewMessage(kafkaMsg)
msg.Key = key
msg.Value = value

return msg, nil
}

func (d *AvroDecoder) decode(value []byte) (json.RawMessage, error) {
if len(value) < 5 {
return nil, fmt.Errorf("need at least 5 bytes to decode Avro messages (got %d)", len(value))
}

const magicByte byte = 0x0
if value[0] != magicByte {
return nil, fmt.Errorf("unknown magic byte")
}

schemaID := int(binary.BigEndian.Uint32(value[1:5]))
data := value[5:]

schema, err := d.registry.Schema(schemaID)
if err != nil {
return nil, fmt.Errorf("registry: %w", err)
}

var codec *goavro.Codec
if d.printAvroSchema {
codec, err = goavro.NewCodec(schema)
} else {
codec, err = goavro.NewCodecForStandardJSONFull(schema)
}

if err != nil {
return nil, fmt.Errorf("codec: %w", err)
}

decoded, _, err := codec.NativeFromBinary(data)
if err != nil {
return nil, fmt.Errorf("decode: %w", err)
}

avroJSON, err := codec.TextualFromNative(nil, decoded)
if err != nil {
return nil, fmt.Errorf("failed to convert value to avro JSON: %w", err)
}

return avroJSON, nil
}
59 changes: 38 additions & 21 deletions pkg/avro_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package pkg
import (
"encoding/binary"
"encoding/json"
"fmt"
"testing"
"time"

Expand All @@ -13,24 +14,37 @@ import (
)

func TestAvroDecoder(t *testing.T) {
schema := `{
"type": "record",
"namespace": "test",
"name" : "Record",
"fields" : [
{ "name" : "Name" , "type" : "string" },
{ "name" : "Age" , "type" : "int" }
]
}`

encoded := encodeAvro(t, schema, 123, map[string]any{
schemaRegistry := TestingSchemaRegistry{
1: `{
"type": "record",
"namespace": "test",
"name" : "Key",
"fields" : [
{ "name" : "id" , "type" : "string" }
]
}`,
2: `{
"type": "record",
"namespace": "test",
"name" : "Record",
"fields" : [
{ "name" : "Name" , "type" : "string" },
{ "name" : "Age" , "type" : "int" }
]
}`,
}

encodedKey := encodeAvro(t, schemaRegistry[1], 1, map[string]any{
"id": "e2901280-d602-11ed-99dc-9c2dcd849371",
})
encodedValue := encodeAvro(t, schemaRegistry[2], 2, map[string]any{
"Name": "John Doe",
"Age": 42,
})

msg := &sarama.ConsumerMessage{
Key: []byte("123"),
Value: encoded,
Key: encodedKey,
Value: encodedValue,
Topic: "test-topic",
Partition: 1,
Offset: 42,
Expand All @@ -42,7 +56,7 @@ func TestAvroDecoder(t *testing.T) {
}

expected := &Message{
Key: "123",
Key: json.RawMessage(`{"id":"e2901280-d602-11ed-99dc-9c2dcd849371"}`),
Topic: "test-topic",
Partition: 1,
Offset: 42,
Expand All @@ -51,7 +65,7 @@ func TestAvroDecoder(t *testing.T) {
}
expectedValue := `{ "Name":"John Doe", "Age":42 }`

d := NewAvroDecoder(TestingSchemaRegistry{schema})
d := NewAvroDecoder(schemaRegistry)
actual, err := d.Decode(msg)
require.NoError(t, err)

Expand All @@ -71,15 +85,18 @@ func encodeAvro(t *testing.T, schema string, id uint32, value any) []byte {
require.NoError(t, err)

idBuf := make([]byte, 5)
binary.BigEndian.PutUint32(idBuf, id)
binary.BigEndian.PutUint32(idBuf[1:], id)

return append(idBuf, encoded...)
}

type TestingSchemaRegistry struct {
schema string
}
type TestingSchemaRegistry map[int]string

func (r TestingSchemaRegistry) Schema(id int) (string, error) {
schema, ok := r[id]
if !ok {
return "", fmt.Errorf("unknown schema %d", id)
}

func (r TestingSchemaRegistry) Schema(int) (string, error) {
return r.schema, nil
return schema, nil
}
26 changes: 12 additions & 14 deletions pkg/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type Message struct {
Offset int64
Headers map[string][]string
Timestamp time.Time
Key string
Key any
Value any
}

Expand Down Expand Up @@ -62,21 +62,19 @@ func NewTopicDecoder(topic string, conf Configuration) (Decoder, error) {
// The StringDecoder assumes that the values of all consumed messages are unicode strings.
type StringDecoder struct{}

func (d *StringDecoder) Decode(msg *sarama.ConsumerMessage) (*Message, error) {
return newMessage(msg, func(value []byte) (any, error) {
return string(value), nil
})
func (d *StringDecoder) Decode(kafkaMsg *sarama.ConsumerMessage) (*Message, error) {
msg := NewMessage(kafkaMsg)
msg.Key = string(kafkaMsg.Key)
msg.Value = string(kafkaMsg.Value)
return msg, nil
}

func newMessage(m *sarama.ConsumerMessage, decodeValue func([]byte) (any, error)) (*Message, error) {
decoded, err := decodeValue(m.Value)
if err != nil {
return nil, err
}

// NewMessage creates a new Message from a given Kafka message.
// The Key and Value are copied into the Message as is (i.e. without decoding).
func NewMessage(m *sarama.ConsumerMessage) *Message {
msg := &Message{
Key: string(m.Key),
Value: decoded,
Key: m.Key,
Value: m.Value,
Topic: m.Topic,
Partition: m.Partition,
Offset: m.Offset,
Expand All @@ -89,5 +87,5 @@ func newMessage(m *sarama.ConsumerMessage, decodeValue func([]byte) (any, error)
msg.Headers[key] = append(msg.Headers[key], string(h.Value))
}

return msg, nil
return msg
}
48 changes: 29 additions & 19 deletions pkg/proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,28 +51,38 @@ func NewProtoDecoder(conf ProtoConfig) (*ProtoDecoder, error) {
return dec, nil
}

func (d *ProtoDecoder) Decode(msg *sarama.ConsumerMessage) (*Message, error) {
return newMessage(msg, func(value []byte) (any, error) {
protoMsg := dynamic.NewMessage(d.typ)
err := protoMsg.Unmarshal(msg.Value)
if err != nil {
return nil, fmt.Errorf("failed to decode message as proto %s: %w", d.typ.GetName(), err)
}
func (d *ProtoDecoder) Decode(kafkaMsg *sarama.ConsumerMessage) (*Message, error) {
val, err := d.decode(kafkaMsg.Value)
if err != nil {
return nil, err
}

marshaler := &jsonpb.Marshaler{
OrigName: true,
EnumsAsInts: false,
EmitDefaults: true,
Indent: " ",
}
msg := NewMessage(kafkaMsg)
msg.Value = val

val, err := protoMsg.MarshalJSONPB(marshaler)
if err != nil {
return nil, fmt.Errorf("failed to re-encode message as JSON: %w", err)
}
return msg, nil
}

func (d *ProtoDecoder) decode(value []byte) (json.RawMessage, error) {
protoMsg := dynamic.NewMessage(d.typ)
err := protoMsg.Unmarshal(value)
if err != nil {
return nil, fmt.Errorf("failed to decode message as proto %s: %w", d.typ.GetName(), err)
}

marshaler := &jsonpb.Marshaler{
OrigName: true,
EnumsAsInts: false,
EmitDefaults: true,
Indent: " ",
}

val, err := protoMsg.MarshalJSONPB(marshaler)
if err != nil {
return nil, fmt.Errorf("failed to re-encode message as JSON: %w", err)
}

return json.RawMessage(val), nil
})
return val, nil
}

func NewProtoEncoder(conf ProtoConfig) (*ProtoEncoder, error) {
Expand Down

0 comments on commit d967291

Please sign in to comment.