diff --git a/relay/client/client.go b/relay/client/client.go index 7ff17944f8f..52bc95346d3 100644 --- a/relay/client/client.go +++ b/relay/client/client.go @@ -306,7 +306,7 @@ func (c *Client) handShake() error { func (c *Client) readLoop(relayConn net.Conn) { internallyStoppedFlag := newInternalStopFlag() - hc := healthcheck.NewReceiver() + hc := healthcheck.NewReceiver(c.log) go c.listenForStopEvents(hc, relayConn, internallyStoppedFlag) var ( diff --git a/relay/healthcheck/receiver.go b/relay/healthcheck/receiver.go index 59f780ed8a6..b3503d5db7f 100644 --- a/relay/healthcheck/receiver.go +++ b/relay/healthcheck/receiver.go @@ -3,6 +3,8 @@ package healthcheck import ( "context" "time" + + log "github.com/sirupsen/logrus" ) var ( @@ -14,23 +16,26 @@ var ( // If the heartbeat is not received in a certain time, it will send a timeout signal and stop to work // The heartbeat timeout is a bit longer than the sender's healthcheck interval type Receiver struct { - OnTimeout chan struct{} - - ctx context.Context - ctxCancel context.CancelFunc - heartbeat chan struct{} - alive bool + OnTimeout chan struct{} + log *log.Entry + ctx context.Context + ctxCancel context.CancelFunc + heartbeat chan struct{} + alive bool + attemptThreshold int } // NewReceiver creates a new healthcheck receiver and start the timer in the background -func NewReceiver() *Receiver { +func NewReceiver(log *log.Entry) *Receiver { ctx, ctxCancel := context.WithCancel(context.Background()) r := &Receiver{ - OnTimeout: make(chan struct{}, 1), - ctx: ctx, - ctxCancel: ctxCancel, - heartbeat: make(chan struct{}, 1), + OnTimeout: make(chan struct{}, 1), + log: log, + ctx: ctx, + ctxCancel: ctxCancel, + heartbeat: make(chan struct{}, 1), + attemptThreshold: getAttemptThresholdFromEnv(), } go r.waitForHealthcheck() @@ -56,16 +61,23 @@ func (r *Receiver) waitForHealthcheck() { defer r.ctxCancel() defer close(r.OnTimeout) + failureCounter := 0 for { select { case <-r.heartbeat: r.alive = true + failureCounter = 0 case <-ticker.C: if r.alive { r.alive = false continue } + failureCounter++ + if failureCounter < r.attemptThreshold { + r.log.Warnf("healthcheck failed, attempt %d", failureCounter) + continue + } r.notifyTimeout() return case <-r.ctx.Done(): diff --git a/relay/healthcheck/receiver_test.go b/relay/healthcheck/receiver_test.go index 4b41234165c..3b3e32fe676 100644 --- a/relay/healthcheck/receiver_test.go +++ b/relay/healthcheck/receiver_test.go @@ -1,13 +1,18 @@ package healthcheck import ( + "context" + "fmt" + "os" "testing" "time" + + log "github.com/sirupsen/logrus" ) func TestNewReceiver(t *testing.T) { heartbeatTimeout = 5 * time.Second - r := NewReceiver() + r := NewReceiver(log.WithContext(context.Background())) select { case <-r.OnTimeout: @@ -19,7 +24,7 @@ func TestNewReceiver(t *testing.T) { func TestNewReceiverNotReceive(t *testing.T) { heartbeatTimeout = 1 * time.Second - r := NewReceiver() + r := NewReceiver(log.WithContext(context.Background())) select { case <-r.OnTimeout: @@ -30,7 +35,7 @@ func TestNewReceiverNotReceive(t *testing.T) { func TestNewReceiverAck(t *testing.T) { heartbeatTimeout = 2 * time.Second - r := NewReceiver() + r := NewReceiver(log.WithContext(context.Background())) r.Heartbeat() @@ -40,3 +45,53 @@ func TestNewReceiverAck(t *testing.T) { case <-time.After(3 * time.Second): } } + +func TestReceiverHealthCheckAttemptThreshold(t *testing.T) { + testsCases := []struct { + name string + threshold int + resetCounterOnce bool + }{ + {"Default attempt threshold", defaultAttemptThreshold, false}, + {"Custom attempt threshold", 3, false}, + {"Should reset threshold once", 2, true}, + } + + for _, tc := range testsCases { + t.Run(tc.name, func(t *testing.T) { + originalInterval := healthCheckInterval + originalTimeout := heartbeatTimeout + healthCheckInterval = 1 * time.Second + heartbeatTimeout = healthCheckInterval + 500*time.Millisecond + defer func() { + healthCheckInterval = originalInterval + heartbeatTimeout = originalTimeout + }() + //nolint:tenv + os.Setenv(defaultAttemptThresholdEnv, fmt.Sprintf("%d", tc.threshold)) + defer os.Unsetenv(defaultAttemptThresholdEnv) + + receiver := NewReceiver(log.WithField("test_name", tc.name)) + + testTimeout := heartbeatTimeout*time.Duration(tc.threshold) + healthCheckInterval + + if tc.resetCounterOnce { + receiver.Heartbeat() + t.Logf("reset counter once") + } + + select { + case <-receiver.OnTimeout: + if tc.resetCounterOnce { + t.Fatalf("should not have timed out before %s", testTimeout) + } + case <-time.After(testTimeout): + if tc.resetCounterOnce { + return + } + t.Fatalf("should have timed out before %s", testTimeout) + } + + }) + } +} diff --git a/relay/healthcheck/sender.go b/relay/healthcheck/sender.go index 8d1716b2cbe..57b3015ec8b 100644 --- a/relay/healthcheck/sender.go +++ b/relay/healthcheck/sender.go @@ -2,7 +2,16 @@ package healthcheck import ( "context" + "os" + "strconv" "time" + + log "github.com/sirupsen/logrus" +) + +const ( + defaultAttemptThreshold = 1 + defaultAttemptThresholdEnv = "NB_RELAY_HC_ATTEMPT_THRESHOLD" ) var ( @@ -15,20 +24,25 @@ var ( // If the receiver does not receive the signal in a certain time, it will send a timeout signal and stop to work // It will also stop if the context is canceled type Sender struct { + log *log.Entry // HealthCheck is a channel to send health check signal to the peer HealthCheck chan struct{} // Timeout is a channel to the health check signal is not received in a certain time Timeout chan struct{} - ack chan struct{} + ack chan struct{} + alive bool + attemptThreshold int } // NewSender creates a new healthcheck sender -func NewSender() *Sender { +func NewSender(log *log.Entry) *Sender { hc := &Sender{ - HealthCheck: make(chan struct{}, 1), - Timeout: make(chan struct{}, 1), - ack: make(chan struct{}, 1), + log: log, + HealthCheck: make(chan struct{}, 1), + Timeout: make(chan struct{}, 1), + ack: make(chan struct{}, 1), + attemptThreshold: getAttemptThresholdFromEnv(), } return hc @@ -46,23 +60,51 @@ func (hc *Sender) StartHealthCheck(ctx context.Context) { ticker := time.NewTicker(healthCheckInterval) defer ticker.Stop() - timeoutTimer := time.NewTimer(healthCheckInterval + healthCheckTimeout) - defer timeoutTimer.Stop() + timeoutTicker := time.NewTicker(hc.getTimeoutTime()) + defer timeoutTicker.Stop() defer close(hc.HealthCheck) defer close(hc.Timeout) + failureCounter := 0 for { select { case <-ticker.C: hc.HealthCheck <- struct{}{} - case <-timeoutTimer.C: + case <-timeoutTicker.C: + if hc.alive { + hc.alive = false + continue + } + + failureCounter++ + if failureCounter < hc.attemptThreshold { + hc.log.Warnf("Health check failed attempt %d.", failureCounter) + continue + } hc.Timeout <- struct{}{} return case <-hc.ack: - timeoutTimer.Reset(healthCheckInterval + healthCheckTimeout) + failureCounter = 0 + hc.alive = true case <-ctx.Done(): return } } } + +func (hc *Sender) getTimeoutTime() time.Duration { + return healthCheckInterval + healthCheckTimeout +} + +func getAttemptThresholdFromEnv() int { + if attemptThreshold := os.Getenv(defaultAttemptThresholdEnv); attemptThreshold != "" { + threshold, err := strconv.ParseInt(attemptThreshold, 10, 64) + if err != nil { + log.Errorf("Failed to parse attempt threshold from environment variable \"%s\" should be an integer. Using default value", attemptThreshold) + return defaultAttemptThreshold + } + return int(threshold) + } + return defaultAttemptThreshold +} diff --git a/relay/healthcheck/sender_test.go b/relay/healthcheck/sender_test.go index 7a105c308fd..f2116702523 100644 --- a/relay/healthcheck/sender_test.go +++ b/relay/healthcheck/sender_test.go @@ -2,9 +2,12 @@ package healthcheck import ( "context" + "fmt" "os" "testing" "time" + + log "github.com/sirupsen/logrus" ) func TestMain(m *testing.M) { @@ -18,7 +21,7 @@ func TestMain(m *testing.M) { func TestNewHealthPeriod(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hc := NewSender() + hc := NewSender(log.WithContext(ctx)) go hc.StartHealthCheck(ctx) iterations := 0 @@ -38,7 +41,7 @@ func TestNewHealthPeriod(t *testing.T) { func TestNewHealthFailed(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hc := NewSender() + hc := NewSender(log.WithContext(ctx)) go hc.StartHealthCheck(ctx) select { @@ -50,7 +53,7 @@ func TestNewHealthFailed(t *testing.T) { func TestNewHealthcheckStop(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) - hc := NewSender() + hc := NewSender(log.WithContext(ctx)) go hc.StartHealthCheck(ctx) time.Sleep(100 * time.Millisecond) @@ -75,7 +78,7 @@ func TestNewHealthcheckStop(t *testing.T) { func TestTimeoutReset(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hc := NewSender() + hc := NewSender(log.WithContext(ctx)) go hc.StartHealthCheck(ctx) iterations := 0 @@ -101,3 +104,102 @@ func TestTimeoutReset(t *testing.T) { t.Fatalf("is not exited") } } + +func TestSenderHealthCheckAttemptThreshold(t *testing.T) { + testsCases := []struct { + name string + threshold int + resetCounterOnce bool + }{ + {"Default attempt threshold", defaultAttemptThreshold, false}, + {"Custom attempt threshold", 3, false}, + {"Should reset threshold once", 2, true}, + } + + for _, tc := range testsCases { + t.Run(tc.name, func(t *testing.T) { + originalInterval := healthCheckInterval + originalTimeout := healthCheckTimeout + healthCheckInterval = 1 * time.Second + healthCheckTimeout = 500 * time.Millisecond + defer func() { + healthCheckInterval = originalInterval + healthCheckTimeout = originalTimeout + }() + + //nolint:tenv + os.Setenv(defaultAttemptThresholdEnv, fmt.Sprintf("%d", tc.threshold)) + defer os.Unsetenv(defaultAttemptThresholdEnv) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + sender := NewSender(log.WithField("test_name", tc.name)) + go sender.StartHealthCheck(ctx) + + go func() { + responded := false + for { + select { + case <-ctx.Done(): + return + case _, ok := <-sender.HealthCheck: + if !ok { + return + } + if tc.resetCounterOnce && !responded { + responded = true + sender.OnHCResponse() + } + } + } + }() + + testTimeout := sender.getTimeoutTime()*time.Duration(tc.threshold) + healthCheckInterval + + select { + case <-sender.Timeout: + if tc.resetCounterOnce { + t.Fatalf("should not have timed out before %s", testTimeout) + } + case <-time.After(testTimeout): + if tc.resetCounterOnce { + return + } + t.Fatalf("should have timed out before %s", testTimeout) + } + + }) + } + +} + +//nolint:tenv +func TestGetAttemptThresholdFromEnv(t *testing.T) { + tests := []struct { + name string + envValue string + expected int + }{ + {"Default attempt threshold when env is not set", "", defaultAttemptThreshold}, + {"Custom attempt threshold when env is set to a valid integer", "3", 3}, + {"Default attempt threshold when env is set to an invalid value", "invalid", defaultAttemptThreshold}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if tt.envValue == "" { + os.Unsetenv(defaultAttemptThresholdEnv) + } else { + os.Setenv(defaultAttemptThresholdEnv, tt.envValue) + } + + result := getAttemptThresholdFromEnv() + if result != tt.expected { + t.Fatalf("Expected %d, got %d", tt.expected, result) + } + + os.Unsetenv(defaultAttemptThresholdEnv) + }) + } +} diff --git a/relay/server/peer.go b/relay/server/peer.go index 0de60199685..520a5bb5328 100644 --- a/relay/server/peer.go +++ b/relay/server/peer.go @@ -49,7 +49,7 @@ func (p *Peer) Work() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hc := healthcheck.NewSender() + hc := healthcheck.NewSender(p.log) go hc.StartHealthCheck(ctx) go p.handleHealthcheckEvents(ctx, hc)