Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support TLS for Raft RPC and GRPC connections #180

Merged
merged 3 commits into from
Oct 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions api/apihttp/apihttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,8 +554,8 @@ func InfoShardsHandler(api ClientApi) http.HandlerFunc {
// "EnableTLS": false,
// "EnableProfiling": false,
// "ProfilingAddr": "127.0.0.1:6060",
// "SSLCertificate": "/var/tmp/certs/my-cert",
// "SSLCertificateKey": "/var/tmp/certs",
// "TLSCertPath": "/var/tmp/certs/my-cert",
// "TLSKeyPath": "/var/tmp/certs",
// }
func InfoHandler(api ClientApi) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
Expand Down
7 changes: 3 additions & 4 deletions cmd/server_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,13 @@ func runServerStart(cmd *cobra.Command, args []string) error {
log.L().Fatalf("Wrong parameters: %v", err)
}

if conf.SSLCertificate != "" && conf.SSLCertificateKey != "" {
if _, err := os.Stat(conf.SSLCertificate); os.IsNotExist(err) {
if conf.EnableTLS && conf.TLSCertPath != "" && conf.TLSKeyPath != "" {
if _, err := os.Stat(conf.TLSCertPath); os.IsNotExist(err) {
log.L().Fatalf("Can't find certificate .crt file: %v", err)
} else if _, err := os.Stat(conf.SSLCertificateKey); os.IsNotExist(err) {
} else if _, err := os.Stat(conf.TLSKeyPath); os.IsNotExist(err) {
log.L().Fatalf("Can't find certificate .key file: %v", err)
} else {
log.L().Info("TLS enabled")
conf.EnableTLS = true
}
}

Expand Down
48 changes: 33 additions & 15 deletions consensus/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ import (
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/raft"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"

"github.com/bbva/qed/balloon"
"github.com/bbva/qed/crypto/hashing"
"github.com/bbva/qed/crypto/tlsutil"
"github.com/bbva/qed/log"
"github.com/bbva/qed/metrics"
"github.com/bbva/qed/protocol"
Expand Down Expand Up @@ -103,9 +105,10 @@ type RaftNode struct {
raftLog *raftLog // Underlying rocksdb-backed persistent log store
snapshots *raft.FileSnapshotStore // Persistent snapstop store

raft *raft.Raft // The consensus mechanism
transport *raft.NetworkTransport // Raft network transport
raftConfig *raft.Config // Config provides any necessary configuration for the Raft server.
raft *raft.Raft // The consensus mechanism
transport *raft.NetworkTransport // Raft network transport
raftConfig *raft.Config // Config provides any necessary configuration for the Raft server.
tlsConfigurator *tlsutil.TLSConfigurator

balloon *balloon.Balloon // Balloon's finite state machine
state *fsmState
Expand All @@ -122,11 +125,11 @@ type RaftNode struct {
done chan struct{}
}

func NewRaftNode(opts *ClusteringOptions, store storage.ManagedStore, snapshotsCh chan *protocol.Snapshot) (*RaftNode, error) {
return NewRaftNodeWithLogger(opts, store, snapshotsCh, log.L())
func NewRaftNode(opts *ClusteringOptions, store storage.ManagedStore, snapshotsCh chan *protocol.Snapshot, tlsConfigurator *tlsutil.TLSConfigurator) (*RaftNode, error) {
return NewRaftNodeWithLogger(opts, store, snapshotsCh, tlsConfigurator, log.L())
}

func NewRaftNodeWithLogger(opts *ClusteringOptions, store storage.ManagedStore, snapshotsCh chan *protocol.Snapshot, logger log.Logger) (*RaftNode, error) {
func NewRaftNodeWithLogger(opts *ClusteringOptions, store storage.ManagedStore, snapshotsCh chan *protocol.Snapshot, tlsConfigurator *tlsutil.TLSConfigurator, logger log.Logger) (*RaftNode, error) {

// We try to resolve the raft addr to avoid binding to hostnames
// because Raft library does not support FQDNs
Expand All @@ -144,12 +147,18 @@ func NewRaftNodeWithLogger(opts *ClusteringOptions, store storage.ManagedStore,
MgmtAddr: opts.MgmtAddr,
HttpAddr: opts.HttpAddr,
}

if tlsConfigurator == nil {
tlsConfigurator = tlsutil.NewTLSConfigurator(&tlsutil.Config{})
}

node := &RaftNode{
info: info,
snapshotsCh: snapshotsCh,
log: logger,
applyTimeout: opts.RaftApplyTimeout,
done: make(chan struct{}),
info: info,
snapshotsCh: snapshotsCh,
log: logger,
tlsConfigurator: tlsConfigurator,
applyTimeout: opts.RaftApplyTimeout,
done: make(chan struct{}),
}

// Create the log store
Expand Down Expand Up @@ -210,7 +219,9 @@ func NewRaftNodeWithLogger(opts *ClusteringOptions, store storage.ManagedStore,

node.raftConfig = conf

node.transport, err = NewCMuxTCPTransportWithLogger(node, 3, 10*time.Second, node.log) // TODO export params
node.transport, err = NewCMuxTCPTransportWithLogger(node.info.RaftAddr, 3, 10*time.Second, tlsConfigurator, func(srv *grpc.Server) {
srv.RegisterService(&_ClusterService_serviceDesc, node)
}, node.log.Named("transport")) // TODO export params
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -275,7 +286,7 @@ func NewRaftNodeWithLogger(opts *ClusteringOptions, store storage.ManagedStore,
func (n *RaftNode) Close(wait bool) error {
n.Lock()

n.log.Trace("RaftNode is cosing down")
n.log.Trace("RaftNode is closing down")

if n.closed {
n.Unlock()
Expand Down Expand Up @@ -418,10 +429,17 @@ func (n *RaftNode) JoinCluster(ctx context.Context, req *RaftJoinRequest) (*Raft
}

func (n *RaftNode) attemptToJoinCluster(addrs []string) error {
var err error
conf, err := n.tlsConfigurator.OutgoingTLSConfig()
if err != nil {
return err
}
for _, addr := range addrs {
var conn *grpc.ClientConn
conn, err = grpc.Dial(addr, grpc.WithInsecure())
if conf != nil {
conn, err = grpc.Dial(addr, grpc.WithTransportCredentials(credentials.NewTLS(conf)))
} else {
conn, err = grpc.Dial(addr, grpc.WithInsecure())
}
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion consensus/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ func newNode(opts *ClusteringOptions, rocksOpts *rocks.Options) (*RaftNode, clos
}
opts.RaftLogPath = raftPath

node, err := NewRaftNodeWithLogger(opts, db, snapshotsCh, log.L().Named(opts.NodeID))
node, err := NewRaftNodeWithLogger(opts, db, snapshotsCh, nil, log.L().Named(opts.NodeID))
// node, err := NewRaftNode(opts, db, snapshotsCh)
if err != nil {
return nil, cleanF, err
Expand Down
Loading