Skip to content

Commit

Permalink
feat(event): add external event broker
Browse files Browse the repository at this point in the history
  • Loading branch information
ncarlier committed Apr 2, 2019
1 parent 25aaae2 commit 60dfbf2
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 1 deletion.
7 changes: 7 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"syscall"
"time"

eventbroker "github.com/ncarlier/reader/pkg/event-broker"
"github.com/ncarlier/reader/pkg/service"

"github.com/ncarlier/reader/pkg/db"
Expand Down Expand Up @@ -46,6 +47,12 @@ func main() {
log.Fatal().Err(err).Msg("Could not configure Database")
}

// Configure Event Broker
_, err = eventbroker.Configure(*conf.Broker)
if err != nil {
log.Fatal().Err(err).Msg("Could not configure Event Broker")
}

// Init service registry
service.InitRegistry(_db)

Expand Down
2 changes: 2 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
type Config struct {
ListenAddr *string
DB *string
Broker *string
Version *bool
Debug *bool
LogDir *string
Expand All @@ -19,6 +20,7 @@ type Config struct {
var config = &Config{
ListenAddr: flag.String("listen", getEnv("LISTEN_ADDR", ":8080"), "HTTP service address"),
DB: flag.String("db", getEnv("DB", "postgres://postgres:testpwd@localhost/reader_test?sslmode=disable"), "Database connection string"),
Broker: flag.String("broker", getEnv("BROKER", ""), "External event broker URI for outgoing events"),
Version: flag.Bool("version", false, "Print version"),
Debug: flag.Bool("debug", getBoolEnv("DEBUG", false), "Output debug logs"),
LogDir: flag.String("log-dir", getEnv("LOG_DIR", os.TempDir()), "Webhooks execution log directory"),
Expand Down
47 changes: 47 additions & 0 deletions pkg/event-broker/broker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package eventbroker

import (
"fmt"
"io"
"net/url"

"github.com/rs/zerolog/log"
)

var instance Broker

// Broker is the event broker interface
type Broker interface {
Send(payload io.Reader) error
}

// Configure the data event Broker regarding the configuration URI
func Configure(uri string) (Broker, error) {
if uri == "" {
return nil, nil
}
u, err := url.ParseRequestURI(uri)
if err != nil {
return nil, fmt.Errorf("invalid configuration URI: %s", uri)
}

switch u.Scheme {
case "http", "https":
instance, err = newHTTPBroker(u)
if err != nil {
return nil, err
}
log.Info().Str("component", "broker").Str("uri", u.String()).Msg("using HTTP event broker")
default:
return nil, fmt.Errorf("unsuported event broker: %s", u.Scheme)
}
return instance, nil
}

// Lookup returns the global event broker
func Lookup() Broker {
if instance != nil {
return instance
}
return nil
}
30 changes: 30 additions & 0 deletions pkg/event-broker/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package eventbroker

import (
"fmt"
"io"
"net/http"
"net/url"
)

// HTTPBroker structure
type HTTPBroker struct {
uri *url.URL
}

func newHTTPBroker(uri *url.URL) (Broker, error) {
return &HTTPBroker{
uri: uri,
}, nil
}

// Send the payload to the event broker
func (hb *HTTPBroker) Send(payload io.Reader) error {
resp, err := http.Post(hb.uri.String(), "application/json; charset=utf-8", payload)
if err != nil {
return err
} else if resp.StatusCode >= 300 {
return fmt.Errorf("bad status code: %d", resp.StatusCode)
}
return nil
}
30 changes: 30 additions & 0 deletions pkg/event/user-registration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package event

import (
"bytes"
"encoding/json"
"fmt"

eventbroker "github.com/ncarlier/reader/pkg/event-broker"
"github.com/ncarlier/reader/pkg/model"
"github.com/rs/zerolog/log"
)

const errorMsg = "Unable to send new user external event broker"

func init() {
bus.Subscribe(CreateUser, func(payload ...interface{}) {
if user, ok := payload[0].(model.User); ok {
broker := eventbroker.Lookup()
if broker == nil {
log.Debug().Err(fmt.Errorf("event broker not configured")).Uint("uid", *user.ID).Msg(errorMsg)
return
}
b := new(bytes.Buffer)
json.NewEncoder(b).Encode(user)
if err := broker.Send(b); err != nil {
log.Error().Err(err).Uint("uid", *user.ID).Msg(errorMsg)
}
}
})
}
2 changes: 1 addition & 1 deletion pkg/middleware/mock-auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
func MockAuth(inner http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
user, err := service.Lookup().GetOrRegisterUser(ctx, "call@me.neo")
user, err := service.Lookup().GetOrRegisterUser(ctx, "call@me.morpheus")
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
Expand Down

0 comments on commit 60dfbf2

Please sign in to comment.