Skip to content

Commit

Permalink
Merge pull request #8 from fgrosse/confluent-kafka-go
Browse files Browse the repository at this point in the history
Use github.com/confluentinc/confluent-kafka-go/v2/schemaregistry
  • Loading branch information
fgrosse authored Apr 9, 2023
2 parents f3f12cd + ee10a64 commit a32882a
Show file tree
Hide file tree
Showing 9 changed files with 2,042 additions and 77 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
- Integration with the [Confluent Schema Registry]
- `kafkactl consume`: Support decoding Avro messages
- `kafkactl get message`: Support decoding Avro messages
- `kafkactl get topic`: Treat topics with single `_` prefix as internal (instead of double `_` prefix)
Expand All @@ -16,3 +17,4 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
[Unreleased]: https://github.com/fgrosse/kafkactl/compare/v1.0.0...HEAD
[v1.0.0]: https://github.com/fgrosse/kafkactl/releases/tag/v1.0.0

[schema-registry]: https://docs.confluent.io/platform/current/schema-registry/index.html
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ This incarnation of `kafkactl` was created at [Fraugster][fraugster] in Septembe
It was a useful tool for many years and we decided to keep it around even after Fraugster
ceased to exist, mainly because we are very used to it and maybe for sentimental reasons.

Other `kafkactl` implementations come with similar features (e.g. Protobuf support,
Other `kafkactl` implementations come with similar features (e.g. Protobuf & Avro support,
managing configuration with kubectl-like contexts). We list them here, so you can
pick the tool that serves your use case best:

Expand All @@ -150,6 +150,8 @@ pick the tool that serves your use case best:
## Built With

* [sarama](https://github.com/Shopify/sarama) - a Go library for Apache Kafka
* [confluent-kafka-go](github.com/confluentinc/confluent-kafka-go) - Confluent's Apache Kafka Golang client
* [goavro](github.com/linkedin/goavro) - a library that encodes and decodes Avro data
* [cobra](https://github.com/spf13/cobra) - a library to build powerful CLI applications
* [viper](https://github.com/spf13/viper) - configuration with fangs
* [protoreflect](https://github.com/jhump/protoreflect) - reflection for Go Protocol Buffers
Expand Down
18 changes: 9 additions & 9 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,16 @@ go 1.19

require (
github.com/Shopify/sarama v1.21.0
github.com/confluentinc/confluent-kafka-go/v2 v2.1.0
github.com/fatih/color v1.13.0
github.com/fgrosse/cli v0.0.0-20190104120053-ecd919dde986
github.com/golang/protobuf v1.5.2
github.com/golang/protobuf v1.5.3
github.com/jhump/protoreflect v1.14.1
github.com/landoop/schema-registry v0.0.0-20190327143759-50a5701c1891
github.com/linkedin/goavro/v2 v2.12.0
github.com/pkg/errors v0.9.1
github.com/spf13/cobra v1.6.1
github.com/spf13/viper v1.15.0
github.com/stretchr/testify v1.8.1
github.com/stretchr/testify v1.8.2
golang.org/x/exp v0.0.0-20230210204819-062eb4c674ab
gopkg.in/yaml.v3 v3.0.1
)
Expand All @@ -30,7 +30,7 @@ require (
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mattn/go-colorable v0.1.12 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/mattn/go-isatty v0.0.16 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/pelletier/go-toml/v2 v2.0.6 // indirect
github.com/pierrec/lz4 v2.0.5+incompatible // indirect
Expand All @@ -41,10 +41,10 @@ require (
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/subosito/gotenv v1.4.2 // indirect
golang.org/x/sys v0.4.0 // indirect
golang.org/x/text v0.6.0 // indirect
google.golang.org/genproto v0.0.0-20221227171554-f9683d7f8bef // indirect
google.golang.org/protobuf v1.28.1 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/text v0.8.0 // indirect
google.golang.org/genproto v0.0.0-20230331144136-dcfb400f0633 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v2 v2.2.8 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)
1,975 changes: 1,961 additions & 14 deletions go.sum

Large diffs are not rendered by default.

21 changes: 3 additions & 18 deletions pkg/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,16 @@ import (

// AvroDecoder is a Decoder implementation that supports the Apache Avro format.
type AvroDecoder struct {
registry SchemaRegistry
printAvroSchema bool
registry SchemaRegistry
}

// NewAvroDecoder creates a new AvroDecoder instance.
func NewAvroDecoder(r SchemaRegistry) *AvroDecoder {
return &AvroDecoder{
registry: r,
printAvroSchema: false,
registry: r,
}
}

// UseAvroJSON configures the AvroDecoder to serialize the decoded data using
// avro-json which is still valid JSON, but encodes fields in a more verbose way
// and has special handling for union types.
func (d *AvroDecoder) UseAvroJSON() {
d.printAvroSchema = true
}

// Decode a message from Kafka into our own Message type.
func (d *AvroDecoder) Decode(kafkaMsg *sarama.ConsumerMessage) (*Message, error) {
key, err := d.decode(kafkaMsg.Key)
Expand Down Expand Up @@ -67,13 +58,7 @@ func (d *AvroDecoder) decode(value []byte) (json.RawMessage, error) {
return nil, fmt.Errorf("registry: %w", err)
}

var codec *goavro.Codec
if d.printAvroSchema {
codec, err = goavro.NewCodec(schema)
} else {
codec, err = goavro.NewCodecForStandardJSONFull(schema)
}

codec, err := goavro.NewCodecForStandardJSONFull(schema)
if err != nil {
return nil, fmt.Errorf("codec: %w", err)
}
Expand Down
26 changes: 17 additions & 9 deletions pkg/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,14 @@ type Configuration struct {
type ContextConfiguration struct {
Name string `yaml:"name"`
Brokers []string `yaml:"brokers"`

SchemaRegistry SchemaRegistryConfiguration `yaml:"schema_registry,omitempty"`
}

type SchemaRegistryConfiguration struct {
URL string `yaml:"url"`
Username string `yaml:"username"`
Password string `yaml:"password"`
}

type TopicConfig struct {
Expand All @@ -31,8 +39,8 @@ type TopicConfig struct {
}

type TopicSchemaConfig struct {
Type string `yaml:"type"` // "avro" or "proto"
Proto TopicProtoConfig `yaml:"proto"`
Avro TopicAvroConfig `yaml:"avro"`
}

type TopicProtoConfig struct {
Expand Down Expand Up @@ -209,16 +217,16 @@ func (conf *Configuration) RenameContext(oldName, newName string) error {
}

func (conf *Configuration) Brokers(context string) []string {
var brokers []string
for _, c := range conf.Contexts {
if c.Name != context {
continue
}
ct, err := conf.GetContext(context)
if err != nil {
return nil
}

for _, addr := range c.Brokers {
brokers = append(brokers, ensurePort(addr))
}
brokers := ct.Brokers
for i, addr := range brokers {
brokers[i] = ensurePort(addr)
}

return brokers
}

Expand Down
9 changes: 9 additions & 0 deletions pkg/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ contexts:
- name: localhost
brokers:
- localhost:9092
schema_registry:
url: http://localhost:8081
username: test
password: secret
`

expected := &Configuration{
Expand All @@ -46,6 +50,11 @@ contexts:
{
Name: "localhost",
Brokers: []string{"localhost:9092"},
SchemaRegistry: SchemaRegistryConfiguration{
URL: "http://localhost:8081",
Username: "test",
Password: "secret",
},
},
},
}
Expand Down
18 changes: 10 additions & 8 deletions pkg/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,24 @@ func NewTopicDecoder(topic string, conf Configuration) (Decoder, error) {
return nil, fmt.Errorf("failed to load topic configuration: %w", err)
}

registry, _ := NewSchemaRegistry(conf)

switch {
case topicConf == nil:
return new(StringDecoder), nil
case topicConf.Schema.Avro.RegistryURL != "":
r, err := NewKafkaSchemaRegistry(topicConf.Schema.Avro.RegistryURL)
if err != nil {
return nil, err
}

dec := NewAvroDecoder(r)
if topicConf.Schema.Avro.PrintAvroSchema {
dec.UseAvroJSON()
case topicConf.Schema.Type == "avro":
if registry == nil {
return nil, fmt.Errorf(`topic schema type is "avro"" but schema registry config is missing in context configuration`)
}

dec := NewAvroDecoder(registry)

return dec, nil

case topicConf.Schema.Type == "proto":
return nil, fmt.Errorf("reading proto schema from the Schema Registy is not yet supported. Please configure the proto type directly in the topic schema configuration")

case topicConf.Schema.Proto.Type != "":
for i, s := range conf.Proto.Includes {
conf.Proto.Includes[i] = os.ExpandEnv(s)
Expand Down
46 changes: 28 additions & 18 deletions pkg/schema_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,41 +3,51 @@ package pkg
import (
"fmt"

"github.com/landoop/schema-registry"
"github.com/confluentinc/confluent-kafka-go/v2/schemaregistry"
)

type SchemaRegistry interface {
Schema(id int) (string, error)
}

type KafkaSchemaRegistry struct {
client *schemaregistry.Client
schemas map[int]string
type ConfluentSchemaRegistry struct {
client schemaregistry.Client
}

func NewKafkaSchemaRegistry(baseURL string) (*KafkaSchemaRegistry, error) {
client, err := schemaregistry.NewClient(baseURL)
func NewSchemaRegistry(conf Configuration) (SchemaRegistry, error) {
contextConfig, err := conf.GetContext(conf.CurrentContext)
if err != nil {
return nil, fmt.Errorf("failed to create schema registry client: %w", err)
return nil, fmt.Errorf("get context: %w", err)
}

if contextConfig.SchemaRegistry.URL == "" {
return nil, fmt.Errorf("missing schema registry base URL")
}

return &KafkaSchemaRegistry{
client: client,
schemas: map[int]string{},
}, nil
return NewConfluentSchemaRegistry(contextConfig.SchemaRegistry)
}

func (r *KafkaSchemaRegistry) Schema(id int) (string, error) {
schema, ok := r.schemas[id]
if ok {
return schema, nil
func NewConfluentSchemaRegistry(config SchemaRegistryConfiguration) (*ConfluentSchemaRegistry, error) {
var conf *schemaregistry.Config
if config.Username != "" {
conf = schemaregistry.NewConfigWithAuthentication(config.URL, config.Username, config.Password)
} else {
conf = schemaregistry.NewConfig(config.URL)
}

schema, err := r.client.GetSchemaByID(id)
client, err := schemaregistry.NewClient(conf)
if err != nil {
return nil, fmt.Errorf("failed to create schema registry client: %w", err)
}

return &ConfluentSchemaRegistry{client: client}, nil
}

func (r *ConfluentSchemaRegistry) Schema(id int) (string, error) {
schema, err := r.client.GetBySubjectAndID("", id)
if err != nil {
return "", err
}

r.schemas[id] = schema
return schema, nil
return schema.Schema, nil
}

0 comments on commit a32882a

Please sign in to comment.