Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log possibly unobserved exceptions in MQTT #3319

Merged
merged 2 commits into from
May 2, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 62 additions & 56 deletions iothub/device/src/Transport/Mqtt/MqttTransportHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -188,19 +188,12 @@ internal MqttTransportHandler(
else
{
ClientOptions options = context.ClientOptions;
switch (settings.GetTransportType())
_channelFactory = settings.GetTransportType() switch
{
case TransportType.Mqtt_Tcp_Only:
_channelFactory = CreateChannelFactory(iotHubConnectionString, settings, context.ProductInfo, options);
break;

case TransportType.Mqtt_WebSocket_Only:
_channelFactory = CreateWebSocketChannelFactory(iotHubConnectionString, settings, context.ProductInfo, options);
break;

default:
throw new InvalidOperationException("Unsupported Transport Setting {0}".FormatInvariant(settings.GetTransportType()));
}
TransportType.Mqtt_Tcp_Only => CreateChannelFactory(iotHubConnectionString, settings, context.ProductInfo, options),
TransportType.Mqtt_WebSocket_Only => CreateWebSocketChannelFactory(iotHubConnectionString, settings, context.ProductInfo, options),
_ => throw new InvalidOperationException("Unsupported Transport Setting {0}".FormatInvariant(settings.GetTransportType())),
};
}

_closeRetryPolicy = new RetryPolicy(new TransientErrorIgnoreStrategy(), 5, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1));
Expand Down Expand Up @@ -277,37 +270,35 @@ public override async Task<Message> ReceiveAsync(CancellationToken cancellationT

return null;
}
else
{
try
{
if (Logging.IsEnabled)
Logging.Enter(
this,
cancellationToken, $"ReceiveAsync() called with cancellation requested state of: {cancellationToken.IsCancellationRequested}",
$"{nameof(ReceiveAsync)}");

cancellationToken.ThrowIfCancellationRequested();
try
{
if (Logging.IsEnabled)
Logging.Enter(
this,
cancellationToken, $"ReceiveAsync() called with cancellation requested state of: {cancellationToken.IsCancellationRequested}",
$"{nameof(ReceiveAsync)}");

EnsureValidState();
cancellationToken.ThrowIfCancellationRequested();

if (State != TransportState.Receiving)
{
await SubscribeCloudToDeviceMessagesAsync().ConfigureAwait(false);
}
EnsureValidState();

await WaitUntilC2dMessageArrivesAsync(cancellationToken).ConfigureAwait(false);
return ProcessC2dMessage();
}
finally
if (State != TransportState.Receiving)
{
if (Logging.IsEnabled)
Logging.Exit(
this,
cancellationToken,
$"Exiting ReceiveAsync() with cancellation requested state of: {cancellationToken.IsCancellationRequested}",
$"{nameof(ReceiveAsync)}");
await SubscribeCloudToDeviceMessagesAsync().ConfigureAwait(false);
}

await WaitUntilC2dMessageArrivesAsync(cancellationToken).ConfigureAwait(false);
return ProcessC2dMessage();
}
finally
{
if (Logging.IsEnabled)
Logging.Exit(
this,
cancellationToken,
$"Exiting ReceiveAsync() with cancellation requested state of: {cancellationToken.IsCancellationRequested}",
$"{nameof(ReceiveAsync)}");
}
}

Expand All @@ -320,6 +311,7 @@ public override async Task<Message> ReceiveAsync(TimeoutHelper timeoutHelper)

return null;
}

try
{
if (Logging.IsEnabled)
Expand Down Expand Up @@ -405,7 +397,7 @@ public override async Task CompleteAsync(string lockToken, CancellationToken can
isTransient: false);
}

if (_completionQueue.Count == 0)
if (_completionQueue.IsEmpty)
{
throw new IotHubException("Unknown lock token.", isTransient: false);
}
Expand Down Expand Up @@ -444,9 +436,10 @@ protected override void Dispose(bool disposing)
try
{
if (Logging.IsEnabled)
{
Logging.Enter(this, $"{nameof(DefaultDelegatingHandler)}.Disposed={_isDisposed}; disposing={disposing}", $"{nameof(MqttTransportHandler)}.{nameof(Dispose)}");
}
Logging.Enter(
this,
$"{nameof(DefaultDelegatingHandler)}.Disposed={_isDisposed}; disposing={disposing}",
$"{nameof(MqttTransportHandler)}.{nameof(Dispose)}");

if (!_isDisposed)
{
Expand All @@ -458,6 +451,18 @@ protected override void Dispose(bool disposing)
CleanUpAsync().GetAwaiter().GetResult();
}

// Log the task completion source tasks' exceptions and avoid unobserved exceptions.
if (_connectCompletion.Task.Exception != null)
{
if (Logging.IsEnabled)
Logging.Error(this, $"{_connectCompletion} has exception {_connectCompletion.Task.Exception}", nameof(Dispose));
}
if (_subscribeCompletionSource.Task.Exception != null)
{
if (Logging.IsEnabled)
Logging.Error(this, $"{_subscribeCompletionSource} has exception {_subscribeCompletionSource.Task.Exception}", nameof(Dispose));
}

_disconnectAwaitersCancellationSource?.Dispose();
_disconnectAwaitersCancellationSource = null;

Expand All @@ -480,9 +485,10 @@ protected override void Dispose(bool disposing)
finally
{
if (Logging.IsEnabled)
{
Logging.Exit(this, $"{nameof(DefaultDelegatingHandler)}.Disposed={_isDisposed}; disposing={disposing}", $"{nameof(MqttTransportHandler)}.{nameof(Dispose)}");
}
Logging.Exit(
this,
$"{nameof(DefaultDelegatingHandler)}.Disposed={_isDisposed}; disposing={disposing}",
$"{nameof(MqttTransportHandler)}.{nameof(Dispose)}");
}
}

Expand Down Expand Up @@ -726,8 +732,9 @@ public async void OnError(Exception exception)
break;

default:
Debug.Fail($"Unknown transport state: {previousState}");
throw new InvalidOperationException();
string error = $"Unknown transport state: {previousState}";
Debug.Fail(error);
throw new InvalidOperationException(error);
}

await _closeRetryPolicy.RunWithRetryAsync(CleanUpImplAsync).ConfigureAwait(true);
Expand All @@ -753,6 +760,7 @@ private TransportState MoveToStateIfPossible(TransportState destination, Transpo
{
return previousState;
}

TransportState prevState;
if ((prevState = (TransportState)Interlocked.CompareExchange(ref _state, (int)destination, (int)previousState)) == previousState)
{
Expand Down Expand Up @@ -944,11 +952,9 @@ public override async Task<Twin> SendTwinGetAsync(CancellationToken cancellation
Properties = JsonConvert.DeserializeObject<TwinProperties>(body),
};
}
catch (JsonReaderException ex)
catch (JsonReaderException ex) when (Logging.IsEnabled)
{
if (Logging.IsEnabled)
Logging.Error(this, $"Failed to parse Twin JSON: {ex}. Message body: '{body}'");

Logging.Error(this, $"Failed to parse Twin JSON: {ex}. Message body: '{body}'");
throw;
}
}
Expand Down Expand Up @@ -1117,7 +1123,7 @@ private async Task<Message> SendTwinRequestAsync(Message request, string rid, Ca
Message response = null; ;
ExceptionDispatchInfo responseException = null;

Action<Message> onTwinResponse = (Message possibleResponse) =>
void OnTwinResponse(Message possibleResponse)
{
try
{
Expand Down Expand Up @@ -1152,11 +1158,11 @@ private async Task<Message> SendTwinRequestAsync(Message request, string rid, Ca
responseException = ExceptionDispatchInfo.Capture(e);
responseReceived.Release();
}
};
}

try
{
_twinResponseEvent += onTwinResponse;
_twinResponseEvent += OnTwinResponse;

await SendEventAsync(request, cancellationToken).ConfigureAwait(false);

Expand All @@ -1175,7 +1181,7 @@ private async Task<Message> SendTwinRequestAsync(Message request, string rid, Ca
}
finally
{
_twinResponseEvent -= onTwinResponse;
_twinResponseEvent -= OnTwinResponse;
}
}

Expand All @@ -1185,7 +1191,7 @@ private Func<IPAddress[], int, Task<IChannel>> CreateChannelFactory(IotHubConnec
{
IChannel channel = null;

Func<Stream, SslStream> streamFactory = stream => new SslStream(stream, true, settings.RemoteCertificateValidationCallback);
SslStream StreamFactory(Stream stream) => new SslStream(stream, true, settings.RemoteCertificateValidationCallback);

List<X509Certificate> certs = settings.ClientCertificate == null
? new List<X509Certificate>(0)
Expand Down Expand Up @@ -1216,7 +1222,7 @@ private Func<IPAddress[], int, Task<IChannel>> CreateChannelFactory(IotHubConnec
.Option(ChannelOption.Allocator, UnpooledByteBufferAllocator.Default)
.Handler(new ActionChannelInitializer<ISocketChannel>(ch =>
{
var tlsHandler = new TlsHandler(streamFactory, clientTlsSettings);
var tlsHandler = new TlsHandler(StreamFactory, clientTlsSettings);
ch.Pipeline.AddLast(
tlsHandler,
MqttEncoder.Instance,
Expand Down