Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[relay] Add health check attempt threshold #2609

Merged
merged 4 commits into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion relay/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ func (c *Client) handShake() error {

func (c *Client) readLoop(relayConn net.Conn) {
internallyStoppedFlag := newInternalStopFlag()
hc := healthcheck.NewReceiver()
hc := healthcheck.NewReceiver(c.log)
go c.listenForStopEvents(hc, relayConn, internallyStoppedFlag)

var (
Expand Down
34 changes: 23 additions & 11 deletions relay/healthcheck/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package healthcheck
import (
"context"
"time"

log "github.com/sirupsen/logrus"
)

var (
Expand All @@ -14,23 +16,26 @@ var (
// If the heartbeat is not received in a certain time, it will send a timeout signal and stop to work
// The heartbeat timeout is a bit longer than the sender's healthcheck interval
type Receiver struct {
OnTimeout chan struct{}

ctx context.Context
ctxCancel context.CancelFunc
heartbeat chan struct{}
alive bool
OnTimeout chan struct{}
log *log.Entry
ctx context.Context
ctxCancel context.CancelFunc
heartbeat chan struct{}
alive bool
attemptThreshold int
}

// NewReceiver creates a new healthcheck receiver and start the timer in the background
func NewReceiver() *Receiver {
func NewReceiver(log *log.Entry) *Receiver {
ctx, ctxCancel := context.WithCancel(context.Background())

r := &Receiver{
OnTimeout: make(chan struct{}, 1),
ctx: ctx,
ctxCancel: ctxCancel,
heartbeat: make(chan struct{}, 1),
OnTimeout: make(chan struct{}, 1),
log: log,
ctx: ctx,
ctxCancel: ctxCancel,
heartbeat: make(chan struct{}, 1),
attemptThreshold: getAttemptThresholdFromEnv(),
}

go r.waitForHealthcheck()
Expand All @@ -56,16 +61,23 @@ func (r *Receiver) waitForHealthcheck() {
defer r.ctxCancel()
defer close(r.OnTimeout)

failureCounter := 0
for {
select {
case <-r.heartbeat:
r.alive = true
failureCounter = 0
case <-ticker.C:
if r.alive {
r.alive = false
continue
}

failureCounter++
if failureCounter < r.attemptThreshold {
r.log.Warnf("healthcheck failed, attempt %d", failureCounter)
continue
}
r.notifyTimeout()
return
case <-r.ctx.Done():
Expand Down
61 changes: 58 additions & 3 deletions relay/healthcheck/receiver_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
package healthcheck

import (
"context"
"fmt"
"os"
"testing"
"time"

log "github.com/sirupsen/logrus"
)

func TestNewReceiver(t *testing.T) {
heartbeatTimeout = 5 * time.Second
r := NewReceiver()
r := NewReceiver(log.WithContext(context.Background()))

select {
case <-r.OnTimeout:
Expand All @@ -19,7 +24,7 @@ func TestNewReceiver(t *testing.T) {

func TestNewReceiverNotReceive(t *testing.T) {
heartbeatTimeout = 1 * time.Second
r := NewReceiver()
r := NewReceiver(log.WithContext(context.Background()))

select {
case <-r.OnTimeout:
Expand All @@ -30,7 +35,7 @@ func TestNewReceiverNotReceive(t *testing.T) {

func TestNewReceiverAck(t *testing.T) {
heartbeatTimeout = 2 * time.Second
r := NewReceiver()
r := NewReceiver(log.WithContext(context.Background()))

r.Heartbeat()

Expand All @@ -40,3 +45,53 @@ func TestNewReceiverAck(t *testing.T) {
case <-time.After(3 * time.Second):
}
}

func TestReceiverHealthCheckAttemptThreshold(t *testing.T) {
testsCases := []struct {
name string
threshold int
resetCounterOnce bool
}{
{"Default attempt threshold", defaultAttemptThreshold, false},
{"Custom attempt threshold", 3, false},
{"Should reset threshold once", 2, true},
}

for _, tc := range testsCases {
t.Run(tc.name, func(t *testing.T) {
originalInterval := healthCheckInterval
originalTimeout := heartbeatTimeout
healthCheckInterval = 1 * time.Second
heartbeatTimeout = healthCheckInterval + 500*time.Millisecond
defer func() {
healthCheckInterval = originalInterval
heartbeatTimeout = originalTimeout
}()
//nolint:tenv
os.Setenv(defaultAttemptThresholdEnv, fmt.Sprintf("%d", tc.threshold))
defer os.Unsetenv(defaultAttemptThresholdEnv)

receiver := NewReceiver(log.WithField("test_name", tc.name))

testTimeout := heartbeatTimeout*time.Duration(tc.threshold) + healthCheckInterval

if tc.resetCounterOnce {
receiver.Heartbeat()
t.Logf("reset counter once")
}

select {
case <-receiver.OnTimeout:
if tc.resetCounterOnce {
t.Fatalf("should not have timed out before %s", testTimeout)
}
case <-time.After(testTimeout):
if tc.resetCounterOnce {
return
}
t.Fatalf("should have timed out before %s", testTimeout)
}

})
}
}
60 changes: 51 additions & 9 deletions relay/healthcheck/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,16 @@ package healthcheck

import (
"context"
"os"
"strconv"
"time"

log "github.com/sirupsen/logrus"
)

const (
defaultAttemptThreshold = 1
defaultAttemptThresholdEnv = "NB_RELAY_HC_ATTEMPT_THRESHOLD"
)

var (
Expand All @@ -15,20 +24,25 @@ var (
// If the receiver does not receive the signal in a certain time, it will send a timeout signal and stop to work
// It will also stop if the context is canceled
type Sender struct {
log *log.Entry
// HealthCheck is a channel to send health check signal to the peer
HealthCheck chan struct{}
// Timeout is a channel to the health check signal is not received in a certain time
Timeout chan struct{}

ack chan struct{}
ack chan struct{}
alive bool
attemptThreshold int
}

// NewSender creates a new healthcheck sender
func NewSender() *Sender {
func NewSender(log *log.Entry) *Sender {
hc := &Sender{
HealthCheck: make(chan struct{}, 1),
Timeout: make(chan struct{}, 1),
ack: make(chan struct{}, 1),
log: log,
HealthCheck: make(chan struct{}, 1),
Timeout: make(chan struct{}, 1),
ack: make(chan struct{}, 1),
attemptThreshold: getAttemptThresholdFromEnv(),
}

return hc
Expand All @@ -46,23 +60,51 @@ func (hc *Sender) StartHealthCheck(ctx context.Context) {
ticker := time.NewTicker(healthCheckInterval)
defer ticker.Stop()

timeoutTimer := time.NewTimer(healthCheckInterval + healthCheckTimeout)
defer timeoutTimer.Stop()
timeoutTicker := time.NewTicker(hc.getTimeoutTime())
defer timeoutTicker.Stop()

defer close(hc.HealthCheck)
defer close(hc.Timeout)

failureCounter := 0
for {
select {
case <-ticker.C:
hc.HealthCheck <- struct{}{}
case <-timeoutTimer.C:
case <-timeoutTicker.C:
if hc.alive {
hc.alive = false
continue
}

failureCounter++
if failureCounter < hc.attemptThreshold {
hc.log.Warnf("Health check failed attempt %d.", failureCounter)
continue
}
hc.Timeout <- struct{}{}
return
case <-hc.ack:
timeoutTimer.Reset(healthCheckInterval + healthCheckTimeout)
failureCounter = 0
hc.alive = true
case <-ctx.Done():
return
}
}
}

func (hc *Sender) getTimeoutTime() time.Duration {
return healthCheckInterval + healthCheckTimeout
}

func getAttemptThresholdFromEnv() int {
if attemptThreshold := os.Getenv(defaultAttemptThresholdEnv); attemptThreshold != "" {
threshold, err := strconv.ParseInt(attemptThreshold, 10, 64)
if err != nil {
log.Errorf("Failed to parse attempt threshold from environment variable \"%s\" should be an integer. Using default value", attemptThreshold)
return defaultAttemptThreshold
}
return int(threshold)
}
return defaultAttemptThreshold
}
Loading
Loading