Skip to content

Commit

Permalink
[FIXED] Invalid handling of heartbeats in Consume and Messages (#1428)
Browse files Browse the repository at this point in the history
Signed-off-by: Piotr Piotrowski <[email protected]>
  • Loading branch information
piotrpio authored Oct 12, 2023
1 parent e76c644 commit 26fa0a9
Show file tree
Hide file tree
Showing 4 changed files with 222 additions and 118 deletions.
22 changes: 12 additions & 10 deletions jetstream/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,18 +121,20 @@ func (max PullMaxMessages) configureMessages(opts *consumeOpts) error {
type PullExpiry time.Duration

func (exp PullExpiry) configureConsume(opts *consumeOpts) error {
if exp < 0 {
return fmt.Errorf("%w: expires value must be positive", ErrInvalidOption)
expiry := time.Duration(exp)
if expiry < 1*time.Second {
return fmt.Errorf("%w: expires value must be at least 1s", ErrInvalidOption)
}
opts.Expires = time.Duration(exp)
opts.Expires = expiry
return nil
}

func (exp PullExpiry) configureMessages(opts *consumeOpts) error {
if exp < 0 {
return fmt.Errorf("%w: expires value must be positive", ErrInvalidOption)
expiry := time.Duration(exp)
if expiry < 0 {
return fmt.Errorf("%w: expires value must be at least 1s", ErrInvalidOption)
}
opts.Expires = time.Duration(exp)
opts.Expires = expiry
return nil
}

Expand Down Expand Up @@ -191,17 +193,17 @@ type PullHeartbeat time.Duration

func (hb PullHeartbeat) configureConsume(opts *consumeOpts) error {
hbTime := time.Duration(hb)
if hbTime < 1*time.Second || hbTime > 30*time.Second {
return fmt.Errorf("%w: idle_heartbeat value must be within 1s-30s range", ErrInvalidOption)
if hbTime < 500*time.Millisecond || hbTime > 30*time.Second {
return fmt.Errorf("%w: idle_heartbeat value must be within 500ms-30s range", ErrInvalidOption)
}
opts.Heartbeat = hbTime
return nil
}

func (hb PullHeartbeat) configureMessages(opts *consumeOpts) error {
hbTime := time.Duration(hb)
if hbTime < 1*time.Second || hbTime > 30*time.Second {
return fmt.Errorf("%w: idle_heartbeat value must be within 1s-30s range", ErrInvalidOption)
if hbTime < 500*time.Millisecond || hbTime > 30*time.Second {
return fmt.Errorf("%w: idle_heartbeat value must be within 500ms-30s range", ErrInvalidOption)
}
opts.Heartbeat = hbTime
return nil
Expand Down
25 changes: 20 additions & 5 deletions jetstream/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ type (
const (
DefaultMaxMessages = 500
DefaultExpires = 30 * time.Second
DefaultHeartbeat = 5 * time.Second
unset = -1
)

Expand Down Expand Up @@ -192,12 +191,17 @@ func (p *pullConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt) (
}
userMsg, msgErr := checkMsg(msg)
if !userMsg && msgErr == nil {
if sub.hbMonitor != nil {
sub.hbMonitor.Reset(2 * consumeOpts.Heartbeat)
}
return
}
defer func() {
sub.Lock()
sub.checkPending()
sub.hbMonitor.Reset(2 * consumeOpts.Heartbeat)
if sub.hbMonitor != nil {
sub.hbMonitor.Reset(2 * consumeOpts.Heartbeat)
}
sub.Unlock()
}()
if !userMsg {
Expand Down Expand Up @@ -305,6 +309,9 @@ func (p *pullConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt) (
MaxBytes: sub.consumeOpts.MaxBytes,
Heartbeat: sub.consumeOpts.Heartbeat,
}
if sub.hbMonitor != nil {
sub.hbMonitor.Reset(2 * sub.consumeOpts.Heartbeat)
}
sub.resetPendingMsgs()
}
sub.Unlock()
Expand All @@ -325,6 +332,9 @@ func (p *pullConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt) (
MaxBytes: sub.consumeOpts.MaxBytes,
Heartbeat: sub.consumeOpts.Heartbeat,
}
if sub.hbMonitor != nil {
sub.hbMonitor.Reset(2 * sub.consumeOpts.Heartbeat)
}
sub.resetPendingMsgs()
}
sub.Unlock()
Expand Down Expand Up @@ -465,7 +475,7 @@ func (s *pullSubscription) Next() (Msg, error) {
if atomic.LoadUint32(&s.closed) == 1 {
return nil, ErrMsgIteratorClosed
}
hbMonitor := s.scheduleHeartbeatCheck(s.consumeOpts.Heartbeat)
hbMonitor := s.scheduleHeartbeatCheck(2 * s.consumeOpts.Heartbeat)
defer func() {
if hbMonitor != nil {
hbMonitor.Stop()
Expand Down Expand Up @@ -509,6 +519,9 @@ func (s *pullSubscription) Next() (Msg, error) {
if s.consumeOpts.ReportMissingHeartbeats {
return nil, err
}
if hbMonitor != nil {
hbMonitor.Reset(2 * s.consumeOpts.Heartbeat)
}
}
if errors.Is(err, errConnected) {
if !isConnected {
Expand All @@ -531,12 +544,14 @@ func (s *pullSubscription) Next() (Msg, error) {
}
s.pending.msgCount = 0
s.pending.byteCount = 0
hbMonitor = s.scheduleHeartbeatCheck(s.consumeOpts.Heartbeat)
if hbMonitor != nil {
hbMonitor.Reset(2 * s.consumeOpts.Heartbeat)
}
}
}
if errors.Is(err, errDisconnected) {
if hbMonitor != nil {
hbMonitor.Stop()
hbMonitor.Reset(2 * s.consumeOpts.Heartbeat)
}
isConnected = false
}
Expand Down
8 changes: 8 additions & 0 deletions jetstream/test/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ type jsServer struct {
restart sync.Mutex
}

// Restart can be used to start again a server
// using the same listen address as before.
func (srv *jsServer) Restart() {
srv.restart.Lock()
defer srv.restart.Unlock()
srv.Server = natsserver.RunServer(srv.myopts)
}

// Dumb wait program to sync on callbacks, etc... Will timeout
func Wait(ch chan bool) error {
return WaitTime(ch, 5*time.Second)
Expand Down
Loading

0 comments on commit 26fa0a9

Please sign in to comment.