Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log possibly unobserved exceptions in MQTT #3319

Merged
merged 2 commits into from
May 2, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 55 additions & 49 deletions iothub/device/src/Transport/Mqtt/MqttTransportHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -188,19 +188,12 @@ internal MqttTransportHandler(
else
{
ClientOptions options = context.ClientOptions;
switch (settings.GetTransportType())
_channelFactory = settings.GetTransportType() switch
{
case TransportType.Mqtt_Tcp_Only:
_channelFactory = CreateChannelFactory(iotHubConnectionString, settings, context.ProductInfo, options);
break;

case TransportType.Mqtt_WebSocket_Only:
_channelFactory = CreateWebSocketChannelFactory(iotHubConnectionString, settings, context.ProductInfo, options);
break;

default:
throw new InvalidOperationException("Unsupported Transport Setting {0}".FormatInvariant(settings.GetTransportType()));
}
TransportType.Mqtt_Tcp_Only => CreateChannelFactory(iotHubConnectionString, settings, context.ProductInfo, options),
TransportType.Mqtt_WebSocket_Only => CreateWebSocketChannelFactory(iotHubConnectionString, settings, context.ProductInfo, options),
_ => throw new InvalidOperationException("Unsupported Transport Setting {0}".FormatInvariant(settings.GetTransportType())),
};
}

_closeRetryPolicy = new RetryPolicy(new TransientErrorIgnoreStrategy(), 5, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1));
Expand Down Expand Up @@ -277,37 +270,35 @@ public override async Task<Message> ReceiveAsync(CancellationToken cancellationT

return null;
}
else
{
try
{
if (Logging.IsEnabled)
Logging.Enter(
this,
cancellationToken, $"ReceiveAsync() called with cancellation requested state of: {cancellationToken.IsCancellationRequested}",
$"{nameof(ReceiveAsync)}");

cancellationToken.ThrowIfCancellationRequested();
try
{
if (Logging.IsEnabled)
Logging.Enter(
this,
cancellationToken, $"ReceiveAsync() called with cancellation requested state of: {cancellationToken.IsCancellationRequested}",
$"{nameof(ReceiveAsync)}");

EnsureValidState();
cancellationToken.ThrowIfCancellationRequested();

if (State != TransportState.Receiving)
{
await SubscribeCloudToDeviceMessagesAsync().ConfigureAwait(false);
}
EnsureValidState();

await WaitUntilC2dMessageArrivesAsync(cancellationToken).ConfigureAwait(false);
return ProcessC2dMessage();
}
finally
if (State != TransportState.Receiving)
{
if (Logging.IsEnabled)
Logging.Exit(
this,
cancellationToken,
$"Exiting ReceiveAsync() with cancellation requested state of: {cancellationToken.IsCancellationRequested}",
$"{nameof(ReceiveAsync)}");
await SubscribeCloudToDeviceMessagesAsync().ConfigureAwait(false);
}

await WaitUntilC2dMessageArrivesAsync(cancellationToken).ConfigureAwait(false);
return ProcessC2dMessage();
}
finally
{
if (Logging.IsEnabled)
Logging.Exit(
this,
cancellationToken,
$"Exiting ReceiveAsync() with cancellation requested state of: {cancellationToken.IsCancellationRequested}",
$"{nameof(ReceiveAsync)}");
}
}

Expand All @@ -320,6 +311,7 @@ public override async Task<Message> ReceiveAsync(TimeoutHelper timeoutHelper)

return null;
}

try
{
if (Logging.IsEnabled)
Expand Down Expand Up @@ -444,9 +436,10 @@ protected override void Dispose(bool disposing)
try
{
if (Logging.IsEnabled)
{
Logging.Enter(this, $"{nameof(DefaultDelegatingHandler)}.Disposed={_isDisposed}; disposing={disposing}", $"{nameof(MqttTransportHandler)}.{nameof(Dispose)}");
}
Logging.Enter(
this,
$"{nameof(DefaultDelegatingHandler)}.Disposed={_isDisposed}; disposing={disposing}",
$"{nameof(MqttTransportHandler)}.{nameof(Dispose)}");

if (!_isDisposed)
{
Expand All @@ -458,6 +451,18 @@ protected override void Dispose(bool disposing)
CleanUpAsync().GetAwaiter().GetResult();
}

// Log the task completion source tasks' exceptions and avoid unobserved exceptions.
if (_connectCompletion.Task.Exception != null)
{
if (Logging.IsEnabled)
Logging.Error(this, $"{_connectCompletion} has exception {_connectCompletion.Task.Exception}", nameof(Dispose));
}
if (_subscribeCompletionSource.Task.Exception != null)
{
if (Logging.IsEnabled)
Logging.Error(this, $"{_subscribeCompletionSource} has exception {_subscribeCompletionSource.Task.Exception}", nameof(Dispose));
}

_disconnectAwaitersCancellationSource?.Dispose();
_disconnectAwaitersCancellationSource = null;

Expand All @@ -480,9 +485,10 @@ protected override void Dispose(bool disposing)
finally
{
if (Logging.IsEnabled)
{
Logging.Exit(this, $"{nameof(DefaultDelegatingHandler)}.Disposed={_isDisposed}; disposing={disposing}", $"{nameof(MqttTransportHandler)}.{nameof(Dispose)}");
}
Logging.Exit(
this,
$"{nameof(DefaultDelegatingHandler)}.Disposed={_isDisposed}; disposing={disposing}",
$"{nameof(MqttTransportHandler)}.{nameof(Dispose)}");
}
}

Expand Down Expand Up @@ -726,8 +732,9 @@ public async void OnError(Exception exception)
break;

default:
Debug.Fail($"Unknown transport state: {previousState}");
throw new InvalidOperationException();
string error = $"Unknown transport state: {previousState}";
Debug.Fail(error);
throw new InvalidOperationException(error);
}

await _closeRetryPolicy.RunWithRetryAsync(CleanUpImplAsync).ConfigureAwait(true);
Expand All @@ -753,6 +760,7 @@ private TransportState MoveToStateIfPossible(TransportState destination, Transpo
{
return previousState;
}

TransportState prevState;
if ((prevState = (TransportState)Interlocked.CompareExchange(ref _state, (int)destination, (int)previousState)) == previousState)
{
Expand Down Expand Up @@ -944,11 +952,9 @@ public override async Task<Twin> SendTwinGetAsync(CancellationToken cancellation
Properties = JsonConvert.DeserializeObject<TwinProperties>(body),
};
}
catch (JsonReaderException ex)
catch (JsonReaderException ex) when (Logging.IsEnabled)
{
if (Logging.IsEnabled)
Logging.Error(this, $"Failed to parse Twin JSON: {ex}. Message body: '{body}'");

Logging.Error(this, $"Failed to parse Twin JSON: {ex}. Message body: '{body}'");
throw;
}
}
Expand Down