From 96b580d92b0b14e14d4c84d49ec92c5af426872a Mon Sep 17 00:00:00 2001 From: Jakub Bartkowiak Date: Tue, 19 Mar 2024 14:55:57 +0100 Subject: [PATCH] fix: make invokeId thread safe (svn@421) If you had one BACnetClient instance and you sent multiple requests in parallel, it was possible that the responses could get mixed up due to using same invokeId and/or not checking the sender address. --- BACnetClient.cs | 60 +++++++++++++++++++++----------------------- BacnetAsyncResult.cs | 19 +++++++------- 2 files changed, 38 insertions(+), 41 deletions(-) diff --git a/BACnetClient.cs b/BACnetClient.cs index 3ede8a3..6cb84f5 100644 --- a/BACnetClient.cs +++ b/BACnetClient.cs @@ -23,6 +23,8 @@ * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. * *********************************************************************/ +using System.Collections.Concurrent; + namespace System.IO.BACnet; public delegate void MessageRecievedHandler(IBacnetTransport sender, byte[] buffer, int offset, int msgLength, BacnetAddress remoteAddress); @@ -33,7 +35,7 @@ namespace System.IO.BACnet; public class BacnetClient : IDisposable { private int _retries; - private byte _invokeId; + private int _invokeId; private readonly LastSegmentAck _lastSegmentAck = new(); private uint _writepriority; @@ -43,7 +45,7 @@ public class BacnetClient : IDisposable /// TODO: invoke-id should be PER (remote) DEVICE! /// private Dictionary>> _segmentsPerInvokeId = new(); - private Dictionary _locksPerInvokeId = new(); + private ConcurrentDictionary _locksPerInvokeId = new(); private Dictionary _expectedSegmentsPerInvokeId = new(); public const int DEFAULT_UDP_PORT = 0xBAC0; @@ -676,13 +678,7 @@ protected void ProcessSegmentAck(BacnetAddress adr, BacnetPduTypes type, byte or private void ProcessSegment(BacnetAddress address, BacnetPduTypes type, BacnetConfirmedServices service, byte invokeId, BacnetMaxSegments maxSegments, BacnetMaxAdpu maxAdpu, bool server, byte sequenceNumber, byte proposedWindowNumber, byte[] buffer, int offset, int length) { - if (!_locksPerInvokeId.TryGetValue(invokeId, out var lockObj)) - { - lockObj = new object(); - _locksPerInvokeId[invokeId] = lockObj; - } - - lock (lockObj) + lock (_locksPerInvokeId.GetOrAdd(invokeId, () => new object())) { ProcessSegmentLocked(address, type, service, invokeId, maxSegments, maxAdpu, server, sequenceNumber, proposedWindowNumber, buffer, offset, length); @@ -1303,7 +1299,7 @@ public IAsyncResult BeginWriteFileRequest(BacnetAddress adr, BacnetObjectId obje { Log.Debug("Sending AtomicWriteFileRequest"); if (invokeId == 0) - invokeId = unchecked(_invokeId++); + invokeId = (byte)Interlocked.Increment(ref _invokeId); var buffer = GetEncodeBuffer(Transport.HeaderLength); NPDU.Encode(buffer, BacnetNpduControls.PriorityNormalMessage | BacnetNpduControls.ExpectingReply, adr.RoutedSource, adr.RoutedDestination); @@ -1342,7 +1338,7 @@ public IAsyncResult BeginReadFileRequest(BacnetAddress adr, BacnetObjectId objec { Log.Debug("Sending AtomicReadFileRequest"); if (invokeId == 0) - invokeId = unchecked(_invokeId++); + invokeId = (byte)Interlocked.Increment(ref _invokeId); //encode var buffer = GetEncodeBuffer(Transport.HeaderLength); @@ -1423,7 +1419,7 @@ private IAsyncResult BeginReadRangeRequestCore(BacnetAddress adr, BacnetObjectId { Log.Debug("Sending ReadRangeRequest"); if (invokeId == 0) - invokeId = unchecked(_invokeId++); + invokeId = (byte)Interlocked.Increment(ref _invokeId); //encode var buffer = GetEncodeBuffer(Transport.HeaderLength); @@ -1529,7 +1525,7 @@ public IAsyncResult BeginSubscribeCOVRequest(BacnetAddress adr, BacnetObjectId o { Log.Debug($"Sending SubscribeCOVRequest {objectId}"); if (invokeId == 0) - invokeId = unchecked(_invokeId++); + invokeId = (byte)Interlocked.Increment(ref _invokeId); var buffer = GetEncodeBuffer(Transport.HeaderLength); NPDU.Encode(buffer, BacnetNpduControls.PriorityNormalMessage | BacnetNpduControls.ExpectingReply, adr.RoutedSource, adr.RoutedDestination); @@ -1579,7 +1575,7 @@ public IAsyncResult BeginSendConfirmedEventNotificationRequest(BacnetAddress adr { Log.Debug($"Sending Confirmed Event Notification {eventData.eventType} {eventData.eventObjectIdentifier}"); if (invokeId == 0) - invokeId = unchecked(_invokeId++); + invokeId = (byte)Interlocked.Increment(ref _invokeId); var buffer = GetEncodeBuffer(Transport.HeaderLength); NPDU.Encode(buffer, BacnetNpduControls.PriorityNormalMessage | BacnetNpduControls.ExpectingReply, adr, source); @@ -1628,7 +1624,7 @@ public IAsyncResult BeginSubscribePropertyRequest(BacnetAddress adr, BacnetObjec { Log.Debug($"Sending SubscribePropertyRequest {objectId}.{monitoredProperty}"); if (invokeId == 0) - invokeId = unchecked(_invokeId++); + invokeId = (byte)Interlocked.Increment(ref _invokeId); var buffer = GetEncodeBuffer(Transport.HeaderLength); NPDU.Encode(buffer, BacnetNpduControls.PriorityNormalMessage | BacnetNpduControls.ExpectingReply, adr.RoutedSource, adr.RoutedDestination); @@ -1696,7 +1692,7 @@ public IAsyncResult BeginReadPropertyRequest(BacnetAddress address, BacnetObject { Log.Debug($"Sending ReadPropertyRequest {objectId} {propertyId}"); if (invokeId == 0) - invokeId = unchecked(_invokeId++); + invokeId = (byte)Interlocked.Increment(ref _invokeId); var buffer = GetEncodeBuffer(Transport.HeaderLength); NPDU.Encode(buffer, BacnetNpduControls.PriorityNormalMessage | BacnetNpduControls.ExpectingReply, address.RoutedSource, address.RoutedDestination); @@ -1775,7 +1771,7 @@ public IAsyncResult BeginWritePropertyRequest(BacnetAddress adr, BacnetObjectId { Log.Debug($"Sending WritePropertyRequest {objectId} {propertyId}"); if (invokeId == 0) - invokeId = unchecked(_invokeId++); + invokeId = (byte)Interlocked.Increment(ref _invokeId); var buffer = GetEncodeBuffer(Transport.HeaderLength); NPDU.Encode(buffer, BacnetNpduControls.PriorityNormalMessage | BacnetNpduControls.ExpectingReply, adr.RoutedSource, adr.RoutedDestination); @@ -1792,7 +1788,7 @@ public IAsyncResult BeginWritePropertyRequest(BacnetAddress adr, BacnetObjectId public IAsyncResult BeginWritePropertyMultipleRequest(BacnetAddress adr, BacnetObjectId objectId, ICollection valueList, bool waitForTransmit, byte invokeId = 0) { Log.Debug($"Sending WritePropertyMultipleRequest {objectId}"); - if (invokeId == 0) invokeId = unchecked(_invokeId++); + if (invokeId == 0) invokeId = (byte)Interlocked.Increment(ref _invokeId); var buffer = GetEncodeBuffer(Transport.HeaderLength); //BacnetNpduControls.PriorityNormalMessage @@ -1845,7 +1841,7 @@ public IAsyncResult BeginWritePropertyMultipleRequest(BacnetAddress adr, ICollec Log.Debug($"Sending WritePropertyMultipleRequest {objectIds}"); if (invokeId == 0) - invokeId = unchecked(_invokeId++); + invokeId = (byte)Interlocked.Increment(ref _invokeId); var buffer = GetEncodeBuffer(Transport.HeaderLength); //BacnetNpduControls.PriorityNormalMessage @@ -1909,7 +1905,7 @@ public IAsyncResult BeginReadPropertyMultipleRequest(BacnetAddress adr, BacnetOb var propertyIds = string.Join(", ", propertyIdAndArrayIndex.Select(v => (BacnetPropertyIds)v.propertyIdentifier)); Log.Debug($"Sending ReadPropertyMultipleRequest {objectId} {propertyIds}"); if (invokeId == 0) - invokeId = unchecked(_invokeId++); + invokeId = (byte)Interlocked.Increment(ref _invokeId); var buffer = GetEncodeBuffer(Transport.HeaderLength); NPDU.Encode(buffer, BacnetNpduControls.PriorityNormalMessage | BacnetNpduControls.ExpectingReply, adr.RoutedSource, adr.RoutedDestination); @@ -1950,7 +1946,7 @@ public IAsyncResult BeginReadPropertyMultipleRequest(BacnetAddress adr, IList v.objectIdentifier)); Log.Debug($"Sending ReadPropertyMultipleRequest {objectIds}"); if (invokeId == 0) - invokeId = unchecked(_invokeId++); + invokeId = (byte)Interlocked.Increment(ref _invokeId); var buffer = GetEncodeBuffer(Transport.HeaderLength); NPDU.Encode(buffer, BacnetNpduControls.PriorityNormalMessage | BacnetNpduControls.ExpectingReply, adr.RoutedSource, adr.RoutedDestination); @@ -2016,7 +2012,7 @@ public bool CreateObjectRequest(BacnetAddress adr, BacnetObjectId objectId, ICol public IAsyncResult BeginCreateObjectRequest(BacnetAddress adr, BacnetObjectId objectId, ICollection valueList, bool waitForTransmit, byte invokeId = 0) { Log.Debug("Sending CreateObjectRequest"); - if (invokeId == 0) invokeId = unchecked(_invokeId++); + if (invokeId == 0) invokeId = (byte)Interlocked.Increment(ref _invokeId); var buffer = GetEncodeBuffer(Transport.HeaderLength); @@ -2065,7 +2061,7 @@ public bool DeleteObjectRequest(BacnetAddress adr, BacnetObjectId objectId, byte public IAsyncResult BeginDeleteObjectRequest(BacnetAddress adr, BacnetObjectId objectId, bool waitForTransmit, byte invokeId = 0) { Log.Debug("Sending DeleteObjectRequest"); - if (invokeId == 0) invokeId = unchecked(_invokeId++); + if (invokeId == 0) invokeId = (byte)Interlocked.Increment(ref _invokeId); var buffer = GetEncodeBuffer(Transport.HeaderLength); @@ -2139,7 +2135,7 @@ public IAsyncResult BeginRemoveListElementRequest(BacnetAddress adr, BacnetObjec { Log.Debug("Sending RemoveListElementRequest"); if (invokeId == 0) - invokeId = unchecked(_invokeId++); + invokeId = (byte)Interlocked.Increment(ref _invokeId); var buffer = GetEncodeBuffer(Transport.HeaderLength); NPDU.Encode(buffer, BacnetNpduControls.PriorityNormalMessage | BacnetNpduControls.ExpectingReply, adr.RoutedSource, adr.RoutedDestination); @@ -2157,7 +2153,7 @@ public IAsyncResult BeginAddListElementRequest(BacnetAddress adr, BacnetObjectId { Log.Debug($"Sending AddListElementRequest {objectId} {(BacnetPropertyIds)reference.propertyIdentifier}"); if (invokeId == 0) - invokeId = unchecked(_invokeId++); + invokeId = (byte)Interlocked.Increment(ref _invokeId); var buffer = GetEncodeBuffer(Transport.HeaderLength); NPDU.Encode(buffer, BacnetNpduControls.PriorityNormalMessage | BacnetNpduControls.ExpectingReply, adr.RoutedSource, adr.RoutedDestination); @@ -2211,7 +2207,7 @@ public IAsyncResult BeginRawEncodedDecodedPropertyConfirmedRequest(BacnetAddress { Log.Debug("Sending RawEncodedRequest"); if (invokeId == 0) - invokeId = unchecked(_invokeId++); + invokeId = (byte)Interlocked.Increment(ref _invokeId); var buffer = GetEncodeBuffer(Transport.HeaderLength); NPDU.Encode(buffer, BacnetNpduControls.PriorityNormalMessage | BacnetNpduControls.ExpectingReply, adr.RoutedSource, adr.RoutedDestination); @@ -2293,7 +2289,7 @@ public IAsyncResult BeginDeviceCommunicationControlRequest(BacnetAddress adr, ui { Log.Debug("Sending DeviceCommunicationControlRequest"); if (invokeId == 0) - invokeId = unchecked(_invokeId++); + invokeId = (byte)Interlocked.Increment(ref _invokeId); var buffer = GetEncodeBuffer(Transport.HeaderLength); NPDU.Encode(buffer, BacnetNpduControls.PriorityNormalMessage | BacnetNpduControls.ExpectingReply, adr.RoutedSource, adr.RoutedDestination); @@ -2356,7 +2352,7 @@ public IAsyncResult BeginGetAlarmSummaryOrEventRequest(BacnetAddress adr, bool g { Log.Debug("Sending Alarm summary request"); if (invokeId == 0) - invokeId = unchecked(_invokeId++); + invokeId = (byte)Interlocked.Increment(ref _invokeId); var buffer = GetEncodeBuffer(Transport.HeaderLength); NPDU.Encode(buffer, BacnetNpduControls.PriorityNormalMessage | BacnetNpduControls.ExpectingReply, adr.RoutedSource, adr.RoutedDestination); @@ -2438,7 +2434,7 @@ public IAsyncResult BeginAlarmAcknowledgement(BacnetAddress adr, BacnetObjectId { Log.Debug("Sending AlarmAcknowledgement"); if (invokeId == 0) - invokeId = unchecked(_invokeId++); + invokeId = (byte)Interlocked.Increment(ref _invokeId); var buffer = GetEncodeBuffer(Transport.HeaderLength); NPDU.Encode(buffer, BacnetNpduControls.PriorityNormalMessage, adr.RoutedSource, adr.RoutedDestination); @@ -2482,7 +2478,7 @@ public IAsyncResult BeginReinitializeRequest(BacnetAddress adr, BacnetReinitiali { Log.Debug("Sending ReinitializeRequest"); if (invokeId == 0) - invokeId = unchecked(_invokeId++); + invokeId = (byte)Interlocked.Increment(ref _invokeId); var buffer = GetEncodeBuffer(Transport.HeaderLength); NPDU.Encode(buffer, BacnetNpduControls.PriorityNormalMessage | BacnetNpduControls.ExpectingReply, adr.RoutedSource, adr.RoutedDestination); @@ -2509,7 +2505,7 @@ public void EndReinitializeRequest(IAsyncResult result, out Exception ex) public IAsyncResult BeginConfirmedNotify(BacnetAddress adr, uint subscriberProcessIdentifier, uint initiatingDeviceIdentifier, BacnetObjectId monitoredObjectIdentifier, uint timeRemaining, IList values, bool waitForTransmit, byte invokeId = 0) { Log.Debug("Sending Notify (confirmed)"); - if (invokeId == 0) invokeId = unchecked(_invokeId++); + if (invokeId == 0) invokeId = (byte)Interlocked.Increment(ref _invokeId); var buffer = GetEncodeBuffer(Transport.HeaderLength); NPDU.Encode(buffer, BacnetNpduControls.PriorityNormalMessage | BacnetNpduControls.ExpectingReply, adr.RoutedSource, adr.RoutedDestination); @@ -2587,7 +2583,7 @@ public IAsyncResult BeginLifeSafetyOperationRequest(BacnetAddress address, Bacne { Log.Debug($"Sending {ToTitleCase(operation)} {objectId}"); if (invokeId == 0) - invokeId = unchecked(_invokeId++); + invokeId = (byte)Interlocked.Increment(ref _invokeId); var buffer = GetEncodeBuffer(Transport.HeaderLength); NPDU.Encode(buffer, BacnetNpduControls.PriorityNormalMessage | BacnetNpduControls.ExpectingReply, address.RoutedSource, address.RoutedDestination); diff --git a/BacnetAsyncResult.cs b/BacnetAsyncResult.cs index 00628a1..c68af53 100644 --- a/BacnetAsyncResult.cs +++ b/BacnetAsyncResult.cs @@ -10,6 +10,7 @@ public class BacnetAsyncResult : IAsyncResult, IDisposable private readonly bool _waitForTransmit; private readonly int _transmitTimeout; private ManualResetEvent _waitHandle; + private readonly BacnetAddress _address; public bool Segmented { get; private set; } public byte[] Result { get; private set; } @@ -17,7 +18,7 @@ public class BacnetAsyncResult : IAsyncResult, IDisposable public bool CompletedSynchronously { get; private set; } public WaitHandle AsyncWaitHandle => _waitHandle; public bool IsCompleted => _waitHandle.WaitOne(0); - public BacnetAddress Address { get; } + public BacnetAddress Address => _address; public Exception Error { @@ -33,7 +34,7 @@ public Exception Error public BacnetAsyncResult(BacnetClient comm, BacnetAddress adr, byte invokeId, byte[] transmitBuffer, int transmitLength, bool waitForTransmit, int transmitTimeout) { _transmitTimeout = transmitTimeout; - Address = adr; + _address = adr; _waitForTransmit = waitForTransmit; _transmitBuffer = transmitBuffer; _transmitLength = transmitLength; @@ -52,7 +53,7 @@ public void Resend() { try { - if (_comm.Transport.Send(_transmitBuffer, _comm.Transport.HeaderLength, _transmitLength, Address, _waitForTransmit, _transmitTimeout) < 0) + if (_comm.Transport.Send(_transmitBuffer, _comm.Transport.HeaderLength, _transmitLength, _address, _waitForTransmit, _transmitTimeout) < 0) { Error = new IOException("Write Timeout"); } @@ -65,7 +66,7 @@ public void Resend() private void OnSegment(BacnetClient sender, BacnetAddress adr, BacnetPduTypes type, BacnetConfirmedServices service, byte invokeId, BacnetMaxSegments maxSegments, BacnetMaxAdpu maxAdpu, byte sequenceNumber, byte[] buffer, int offset, int length) { - if (invokeId != _waitInvokeId) + if (invokeId != _waitInvokeId || !adr.Equals(_address)) return; Segmented = true; @@ -74,7 +75,7 @@ private void OnSegment(BacnetClient sender, BacnetAddress adr, BacnetPduTypes ty private void OnSimpleAck(BacnetClient sender, BacnetAddress adr, BacnetPduTypes type, BacnetConfirmedServices service, byte invokeId, byte[] data, int dataOffset, int dataLength) { - if (invokeId != _waitInvokeId) + if (invokeId != _waitInvokeId || !adr.Equals(_address)) return; _waitHandle.Set(); @@ -82,7 +83,7 @@ private void OnSimpleAck(BacnetClient sender, BacnetAddress adr, BacnetPduTypes private void OnAbort(BacnetClient sender, BacnetAddress adr, BacnetPduTypes type, byte invokeId, BacnetAbortReason reason, byte[] buffer, int offset, int length) { - if (invokeId != _waitInvokeId) + if (invokeId != _waitInvokeId || !adr.Equals(_address)) return; Error = new Exception($"Abort from device, reason: {reason}"); @@ -90,7 +91,7 @@ private void OnAbort(BacnetClient sender, BacnetAddress adr, BacnetPduTypes type private void OnReject(BacnetClient sender, BacnetAddress adr, BacnetPduTypes type, byte invokeId, BacnetRejectReason reason, byte[] buffer, int offset, int length) { - if (invokeId != _waitInvokeId) + if (invokeId != _waitInvokeId || !adr.Equals(_address)) return; Error = new Exception($"Reject from device, reason: {reason}"); @@ -98,7 +99,7 @@ private void OnReject(BacnetClient sender, BacnetAddress adr, BacnetPduTypes typ private void OnError(BacnetClient sender, BacnetAddress adr, BacnetPduTypes type, BacnetConfirmedServices service, byte invokeId, BacnetErrorClasses errorClass, BacnetErrorCodes errorCode, byte[] buffer, int offset, int length) { - if (invokeId != _waitInvokeId) + if (invokeId != _waitInvokeId || !adr.Equals(_address)) return; Error = new Exception($"Error from device: {errorClass} - {errorCode}"); @@ -106,7 +107,7 @@ private void OnError(BacnetClient sender, BacnetAddress adr, BacnetPduTypes type private void OnComplexAck(BacnetClient sender, BacnetAddress adr, BacnetPduTypes type, BacnetConfirmedServices service, byte invokeId, byte[] buffer, int offset, int length) { - if (invokeId != _waitInvokeId) + if (invokeId != _waitInvokeId || !adr.Equals(_address)) return; Segmented = false;