Skip to content

Commit

Permalink
add client option "PublishAsyncMaxPending" to control the maximum num…
Browse files Browse the repository at this point in the history
…ber of pending messages in the async publish queue
  • Loading branch information
cfsghost committed Dec 9, 2024
1 parent 48f74c4 commit f83a2c0
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 18 deletions.
7 changes: 4 additions & 3 deletions core/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ func (client *Client) Connect(host string, options *Options) error {
client.options = options

opts := EventBusOptions{
PingInterval: time.Duration(options.PingInterval),
MaxPingsOutstanding: options.MaxPingsOutstanding,
MaxReconnects: options.MaxReconnects,
PingInterval: time.Duration(options.PingInterval),
MaxPingsOutstanding: options.MaxPingsOutstanding,
MaxReconnects: options.MaxReconnects,
PublishAsyncMaxPending: options.PublishAsyncMaxPending,
}

// Create a new instance connector
Expand Down
9 changes: 5 additions & 4 deletions core/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ import (
)

type EventBusOptions struct {
PingInterval time.Duration
MaxPingsOutstanding int
MaxReconnects int
PingInterval time.Duration
MaxPingsOutstanding int
MaxReconnects int
PublishAsyncMaxPending int
}

type EventBusHandler struct {
Expand Down Expand Up @@ -67,7 +68,7 @@ func (eb *EventBus) GetConnection() *nats.Conn {
func (eb *EventBus) GetJetStream() (nats.JetStreamContext, error) {

if eb.js == nil {
js, err := eb.connection.JetStream(nats.PublishAsyncMaxPending(1024000))
js, err := eb.connection.JetStream(nats.PublishAsyncMaxPending(eb.options.PublishAsyncMaxPending))
if err != nil {
return nil, err
}
Expand Down
24 changes: 13 additions & 11 deletions core/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,22 @@ package core
import "time"

type Options struct {
Token string
PingInterval time.Duration
MaxPingsOutstanding int
MaxReconnects int
ReconnectHandler func()
DisconnectHandler func()
Token string
PingInterval time.Duration
MaxPingsOutstanding int
MaxReconnects int
ReconnectHandler func()
DisconnectHandler func()
PublishAsyncMaxPending int
}

func NewOptions() *Options {
return &Options{
PingInterval: 10 * time.Second,
MaxPingsOutstanding: 3,
MaxReconnects: -1,
ReconnectHandler: func() {},
DisconnectHandler: func() {},
PingInterval: 10 * time.Second,
MaxPingsOutstanding: 3,
MaxReconnects: -1,
ReconnectHandler: func() {},
DisconnectHandler: func() {},
PublishAsyncMaxPending: 10240,
}
}

0 comments on commit f83a2c0

Please sign in to comment.