Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(inputs.opcua): Allow to retry reads on invalid sessions #16026

Merged
merged 2 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 34 additions & 31 deletions plugins/inputs/opcua/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,61 +30,64 @@ to use them.
[[inputs.opcua]]
## Metric name
# name = "opcua"
#

## OPC UA Endpoint URL
# endpoint = "opc.tcp://localhost:4840"
#

## Maximum time allowed to establish a connect to the endpoint.
# connect_timeout = "10s"
#

## Maximum time allowed for a request over the established connection.
# request_timeout = "5s"

# Maximum time that a session shall remain open without activity.
## Maximum time that a session shall remain open without activity.
# session_timeout = "20m"
#

## Retry options for failing reads e.g. due to invalid sessions
## If the retry count is zero, the read will fail after the initial attempt.
# read_retry_timeout = "100ms"
# read_retry_count = 0

## Security policy, one of "None", "Basic128Rsa15", "Basic256",
## "Basic256Sha256", or "auto"
# security_policy = "auto"
#

## Security mode, one of "None", "Sign", "SignAndEncrypt", or "auto"
# security_mode = "auto"
#

## Path to cert.pem. Required when security mode or policy isn't "None".
## If cert path is not supplied, self-signed cert and key will be generated.
# certificate = "/etc/telegraf/cert.pem"
#

## Path to private key.pem. Required when security mode or policy isn't "None".
## If key path is not supplied, self-signed cert and key will be generated.
# private_key = "/etc/telegraf/key.pem"
#

## Authentication Method, one of "Certificate", "UserName", or "Anonymous". To
## authenticate using a specific ID, select 'Certificate' or 'UserName'
# auth_method = "Anonymous"
#
## Username. Required for auth_method = "UserName"

## Username and password required for auth_method = "UserName"
# username = ""
#
## Password. Required for auth_method = "UserName"
# password = ""
#

## Option to select the metric timestamp to use. Valid options are:
## "gather" -- uses the time of receiving the data in telegraf
## "server" -- uses the timestamp provided by the server
## "source" -- uses the timestamp provided by the source
# timestamp = "gather"
#

## Client trace messages
## When set to true, and debug mode enabled in the agent settings, the OPCUA
## client's messages are included in telegraf logs. These messages are very
## noisey, but essential for debugging issues.
# client_trace = false
#

## Include additional Fields in each metric
## Available options are:
## DataType -- OPC-UA Data Type (string)
# optional_fields = []
#

## Node ID configuration
## name - field name to use in the output
## namespace - OPC UA namespace of the node (integer value 0 thru 3)
Expand All @@ -93,12 +96,12 @@ to use them.
## default_tags - extra tags to be added to the output metric (optional)
##
## Use either the inline notation or the bracketed notation, not both.
#

## Inline notation (default_tags not supported yet)
# nodes = [
# {name="", namespace="", identifier_type="", identifier=""},
# ]
#

## Bracketed notation
# [[inputs.opcua.nodes]]
# name = "node1"
Expand All @@ -112,7 +115,7 @@ to use them.
# namespace = ""
# identifier_type = ""
# identifier = ""
#

## Node Group
## Sets defaults so they aren't required in every node.
## Default values can be set for:
Expand All @@ -126,29 +129,29 @@ to use them.
## Group Metric name. Overrides the top level name. If unset, the
## top level name is used.
# name =
#

## Group default namespace. If a node in the group doesn't set its
## namespace, this is used.
# namespace =
#

## Group default identifier type. If a node in the group doesn't set its
## namespace, this is used.
# identifier_type =
#

## Default tags that are applied to every node in this group. Can be
## overwritten in a node by setting a different value for the tag name.
## example: default_tags = { tag1 = "value1" }
# default_tags = {}
#
## Node ID Configuration. Array of nodes with the same settings as above.

## Node ID Configuration. Array of nodes with the same settings as above.
## Use either the inline notation or the bracketed notation, not both.
#

## Inline notation (default_tags not supported yet)
# nodes = [
# {name="node1", namespace="", identifier_type="", identifier=""},
# {name="node2", namespace="", identifier_type="", identifier=""},
#]
#

## Bracketed notation
# [[inputs.opcua.group.nodes]]
# name = "node1"
Expand All @@ -165,12 +168,12 @@ to use them.

## Enable workarounds required by some devices to work correctly
# [inputs.opcua.workarounds]
## Set additional valid status codes, StatusOK (0x0) is always considered valid
# additional_valid_status_codes = ["0xC0"]
# ## Set additional valid status codes, StatusOK (0x0) is always considered valid
# # additional_valid_status_codes = ["0xC0"]

# [inputs.opcua.request_workarounds]
## Use unregistered reads instead of registered reads
# use_unregistered_reads = false
# ## Use unregistered reads instead of registered reads
# # use_unregistered_reads = false
```

## Node Configuration
Expand Down
56 changes: 45 additions & 11 deletions plugins/inputs/opcua/read_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ package opcua

import (
"context"
"errors"
"fmt"
"time"

"github.com/gopcua/opcua/ua"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/common/opcua"
"github.com/influxdata/telegraf/plugins/common/opcua/input"
"github.com/influxdata/telegraf/selfstat"
Expand All @@ -17,6 +20,8 @@ type ReadClientWorkarounds struct {
}

type ReadClientConfig struct {
ReadRetryTimeout config.Duration `toml:"read_retry_timeout"`
ReadRetries uint64 `toml:"read_retry_count"`
ReadClientWorkarounds ReadClientWorkarounds `toml:"request_workarounds"`
input.InputClientConfig
}
Expand All @@ -25,9 +30,11 @@ type ReadClientConfig struct {
type ReadClient struct {
*input.OpcUAInputClient

ReadSuccess selfstat.Stat
ReadError selfstat.Stat
Workarounds ReadClientWorkarounds
ReadRetryTimeout time.Duration
ReadRetries uint64
ReadSuccess selfstat.Stat
ReadError selfstat.Stat
Workarounds ReadClientWorkarounds

// internal values
reqIDs []*ua.ReadValueID
Expand All @@ -44,8 +51,14 @@ func (rc *ReadClientConfig) CreateReadClient(log telegraf.Logger) (*ReadClient,
"endpoint": inputClient.Config.OpcUAClientConfig.Endpoint,
}

if rc.ReadRetryTimeout == 0 {
rc.ReadRetryTimeout = config.Duration(100 * time.Millisecond)
}

return &ReadClient{
OpcUAInputClient: inputClient,
ReadRetryTimeout: time.Duration(rc.ReadRetryTimeout),
ReadRetries: rc.ReadRetries,
ReadSuccess: selfstat.Register("opcua", "read_success", tags),
ReadError: selfstat.Register("opcua", "read_error", tags),
Workarounds: rc.ReadClientWorkarounds,
Expand Down Expand Up @@ -136,14 +149,35 @@ func (o *ReadClient) read() error {
NodesToRead: o.reqIDs,
}

resp, err := o.Client.Read(o.ctx, req)
if err != nil {
var count uint64
for {
count++

// Try to update the values for all registered nodes
resp, err := o.Client.Read(o.ctx, req)
if err == nil {
// Success, update the node values and exit
o.ReadSuccess.Incr(1)
for i, d := range resp.Results {
o.UpdateNodeValue(i, d)
}
return nil
}
o.ReadError.Incr(1)
return fmt.Errorf("reading registered nodes failed: %w", err)
}
o.ReadSuccess.Incr(1)
for i, d := range resp.Results {
o.UpdateNodeValue(i, d)

switch {
case count > o.ReadRetries:
// We exceeded the number of retries and should exit
return fmt.Errorf("reading registered nodes failed after %d attempts: %w", count, err)
case errors.Is(err, ua.StatusBadSessionIDInvalid),
errors.Is(err, ua.StatusBadSessionNotActivated),
errors.Is(err, ua.StatusBadSecureChannelIDInvalid):
// Retry after the defined period as session and channels should be refreshed
o.Log.Debugf("reading failed with %v, retry %d / %d...", err, count, o.ReadRetries)
time.Sleep(o.ReadRetryTimeout)
default:
// Non-retryable error, there is nothing we can do
return fmt.Errorf("reading registered nodes failed: %w", err)
}
}
return nil
}
Loading
Loading