DefaultPinger doesn't detect connection loss when consistently sending packets #288

Open
AndrewBase opened this issue Feb 19, 2025 · 5 comments


@AndrewBase

Describe the bug
When a TCP connection is lost (for instance, when switching between wifi networks) the DefaultPinger doesn't detect the issue or re-establish the connection.

This happens because DefaultPinger calculates the pingDue time based on lastPacketSent. When packets are consistently being sent (but no replies are ever received), the DefaultPinger keeps delaying its ping message indefinitely.

To reproduce
Establish an MQTT connection, send messages on a more frequent cadence than keepAlive, and then disrupt the TCP connection.

Debug output

time                           lastPingSent                   lastPacketSent                 lastPingResponse               pingDue
2025-02-19T07:26:14.930876194Z 0001-01-01T00:00:00Z           0001-01-01T00:00:00Z           0001-01-01T00:00:00Z           0001-01-01T00:00:30Z
2025-02-19T07:26:44.932315842Z 2025-02-19T07:26:14.932301934Z 2025-02-19T07:26:39.972580833Z 2025-02-19T07:26:14.980298158Z 2025-02-19T07:27:09.972580833Z
2025-02-19T07:27:09.973104232Z 2025-02-19T07:26:14.932301934Z 2025-02-19T07:27:04.974105149Z 2025-02-19T07:26:14.980298158Z 2025-02-19T07:27:34.974105149Z
2025-02-19T07:27:34.974607106Z 2025-02-19T07:26:14.932301934Z 2025-02-19T07:27:29.976906357Z 2025-02-19T07:26:14.980298158Z 2025-02-19T07:27:59.976906357Z
2025-02-19T07:27:59.977479867Z 2025-02-19T07:26:14.932301934Z 2025-02-19T07:27:59.977238334Z 2025-02-19T07:26:14.980298158Z 2025-02-19T07:28:29.977238334Z
2025-02-19T07:28:29.978458208Z 2025-02-19T07:26:14.932301934Z 2025-02-19T07:28:29.976478509Z 2025-02-19T07:26:14.980298158Z 2025-02-19T07:28:59.976478509Z
2025-02-19T07:28:59.977752918Z 2025-02-19T07:26:14.932301934Z 2025-02-19T07:28:59.97761318Z  2025-02-19T07:26:14.980298158Z 2025-02-19T07:29:29.97761318Z
2025-02-19T07:29:29.978714351Z 2025-02-19T07:26:14.932301934Z 2025-02-19T07:29:25.262394398Z 2025-02-19T07:26:14.980298158Z 2025-02-19T07:29:55.262394398Z
2025-02-19T07:29:55.26762407Z  2025-02-19T07:26:14.932301934Z 2025-02-19T07:29:45.35189184Z  2025-02-19T07:26:14.980298158Z 2025-02-19T07:30:15.35189184Z
2025-02-19T07:30:15.361808698Z 2025-02-19T07:26:14.932301934Z 2025-02-19T07:30:05.603714165Z 2025-02-19T07:26:14.980298158Z 2025-02-19T07:30:35.603714165Z
2025-02-19T07:30:35.604367247Z 2025-02-19T07:26:14.932301934Z 2025-02-19T07:30:25.865425995Z 2025-02-19T07:26:14.980298158Z 2025-02-19T07:30:55.865425995Z
2025-02-19T07:30:55.874694767Z 2025-02-19T07:26:14.932301934Z 2025-02-19T07:30:45.94657684Z  2025-02-19T07:26:14.980298158Z 2025-02-19T07:31:15.94657684Z
2025-02-19T07:31:15.955379912Z 2025-02-19T07:26:14.932301934Z 2025-02-19T07:31:06.031695079Z 2025-02-19T07:26:14.980298158Z 2025-02-19T07:31:36.031695079Z

Expected behaviour
DefaultPinger should detect the lost network connection and re-establish the MQTT connection.

Software used:
github.com/eclipse/paho.golang v0.21.0

Additional context

@MattBrittan
Contributor

Interesting. In this situation I'd expect the loss of connection to be picked up when an attempt is made to write to (or, generally, read from) the connection. I think there might be a small bug here:

At QOS 0 we:

c.debug.Println("sending QoS0 message")
if _, err := pb.WriteTo(c.config.Conn); err != nil {
	go c.error(err)
	return nil, err
}

but at QOS 1/2 we don't call go c.error(err) (and I think we probably should). Can you please confirm what QOS you are using when publishing?

Generally I'd also expect the connection loss to be picked up in incoming.

The ping process exists to detect half-open connections; sending a packet should be enough to detect this (it should result in a TCP timeout), so I don't believe that an additional ping is needed in this case. I do test this, and the connection loss is picked up in that situation, so something must be different in your case.

I'd appreciate it if you could share more information about how you are using the library (i.e. are you using autopaho?), debug logs would also be useful.

@AndrewBase
Author

I'm using QOS1 and autopaho. I'll try to get you code and logs tomorrow.

From what I understand the TCP timeout can take up to 15 minutes to detect a dead connection. gRPC fixed a similar issue by setting TCP_USER_TIMEOUT on the socket on Linux:
grpc/grpc-go#2307
https://www.evanjones.ca/tcp-connection-timeouts.html
https://blog.cloudflare.com/when-tcp-sockets-refuse-to-die/#busy-estab-socket-is-not-forever

@MattBrittan
Contributor

> I'll try to get you code and logs tomorrow.

Don't worry too much about this (as you are using autopaho rather than something custom).

If you have time, try chucking a go c.error(err) here. I'm not sure if pb.WriteTo will fail or not (I guess this is where having logs might be nice; I develop on a Windows box, so I would need to spin up a VM to try and test this). It would be interesting to know whether the connection is reestablished after a longer wait.

@AndrewBase
Author

Partially redacted log messages:
logs.txt

Code for how we create the paho client:

	tlsConfig := &tls.Config{
		Certificates: tlsCertificate,
		RootCAs:      certpool,
	}

	// Router can be used to route incoming messages (from subscription topics) to channels.
	router := paho.NewStandardRouter()
	router.DefaultHandler(func(p *paho.Publish) { fmt.Printf("defaulthandler received message with topic: %s\n", p.Topic) })

	brokerUrl, err := url.Parse(broker)
	if err != nil {
		return nil, fmt.Errorf("error parsing broker URL: %w", err)
	}
	cliCfg := autopaho.ClientConfig{
		ServerUrls:        []*url.URL{brokerUrl},
		TlsCfg:            tlsConfig,
		KeepAlive:         20,                     // Keepalive message recommended in paho.golang documentation to be sent every 20 seconds.
		ConnectRetryDelay: 500 * time.Millisecond, // Set to 1/2 second. NW-1808, was set to the default of 10 seconds but seemed to cause issues with publishing telemetry data.
		OnConnectionUp: func(cm *autopaho.ConnectionManager, connAck *paho.Connack) {
			subscriptions := make([]paho.SubscribeOptions, 0)
			for topic, QOS := range topicsQOSMap {
				subscription := paho.SubscribeOptions{
					Topic: topic,
					QoS:   QOS,
				}
				subscriptions = append(subscriptions, subscription)
			}
			if len(subscriptions) == 0 {
				return
			}
			if _, err := cm.Subscribe(context.Background(), &paho.Subscribe{Subscriptions: subscriptions}); err != nil {
				slog.Error(fmt.Sprintf("error subscribing to topics %v: %v", topicsQOSMap, err))
			}
		},
		ClientConfig: paho.ClientConfig{
			ClientID:      clientID,
			OnClientError: func(err error) { slog.Error(fmt.Sprintf("client error: %s\n", err)) },
			OnServerDisconnect: func(d *paho.Disconnect) {
				// ReasonCode is a byte; print it as a number rather than converting it to a one-character string.
				slog.Warn(fmt.Sprintf("server requested disconnect; reason code: %d\n", d.ReasonCode))
			},
			OnPublishReceived: []func(paho.PublishReceived) (bool, error){
				func(pr paho.PublishReceived) (bool, error) {
					router.Route(pr.Packet.Packet())
					return true, nil // We assume that the router handles all messages.
				},
			},
		},
	}
	cliCfg.Debug = newMqttLogger(*slog.Default(), slog.LevelDebug)
	cliCfg.PahoDebug = newMqttLogger(*slog.Default(), slog.LevelDebug)

	// Connect to the server - this will return immediately after initiating the connection process
	client, err := autopaho.NewConnection(ctx, cliCfg)
	if err != nil {
		return nil, fmt.Errorf("error creating MQTT connection: %w", err)
	}

@MattBrittan
Contributor

Interesting, it's not what I suspected (there is no indication in the log that the loss of connection has been detected). This seems to be an issue with the TCP stack (I would generally expect the loss of connection to be noted).

The MQTT Spec only requires the client to send a PING when the gap between outgoing packets exceeds the Keep Alive period. This is what this library currently does.

I've taken a look at other libraries. The Go paho.mqtt.golang v3.1.1 client sends a ping based on both sent and received packets, as do the Python and C clients. As such, I think it would be reasonable for this library to take the same approach to catch this kind of issue.

Happy to receive a PR for this, otherwise I'll try to have a look within a couple of weeks.
