Skip to content

Commit

Permalink
Long haul test (#836)
Browse files Browse the repository at this point in the history
1. fix EventDataComparer in MessagesCache
2. refactor and add more logs
  • Loading branch information
philipktlin authored Feb 13, 2019
1 parent a7a246d commit 408f70d
Show file tree
Hide file tree
Showing 9 changed files with 110 additions and 96 deletions.
6 changes: 3 additions & 3 deletions edge-modules/MessagesAnalyzer/MessageDetails.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ namespace MessagesAnalyzer

class MessageDetails
{
public MessageDetails(long seqNumber, DateTime enquedDateTime)
public MessageDetails(long seqNumber, DateTime enqueuedDateTime)
{
this.SequenceNumber = seqNumber;
this.EnquedDateTime = enquedDateTime;
this.EnqueuedDateTime = enqueuedDateTime;
}

public long SequenceNumber { get; }

public DateTime EnquedDateTime { get; }
public DateTime EnqueuedDateTime { get; }
}
}
6 changes: 5 additions & 1 deletion edge-modules/MessagesAnalyzer/MessagesCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,14 @@ class EventDataComparer : IComparer<MessageDetails>
public int Compare(MessageDetails msg1, MessageDetails msg2)
{
if (msg1 == null)
{
return -1;
}

if (msg2 == null)
return -1;
{
return 1;
}

return msg1.SequenceNumber.CompareTo(msg2.SequenceNumber);
}
Expand Down
41 changes: 30 additions & 11 deletions edge-modules/MessagesAnalyzer/PartitionReceiverHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,23 +44,19 @@ public Task ProcessEventsAsync(IEnumerable<EventData> events)

if (sequence != null && batchId != null)
{
DateTime enqueuedtime = DateTime.MinValue.ToUniversalTime();
if (eventData.SystemProperties.TryGetValue(EnqueuedTimePropertyName, out object enqueued))
{
if (DateTime.TryParse(enqueued.ToString(), out enqueuedtime))
{
enqueuedtime = DateTime.SpecifyKind(enqueuedtime, DateTimeKind.Utc);
}
}

if (long.TryParse(sequence.ToString(), out long sequenceNumber))
{
DateTime enqueuedtime = GetEnqueuedTime(devId.ToString(), modId.ToString(), eventData);
MessagesCache.Instance.AddMessage(modId.ToString(), batchId.ToString(), new MessageDetails(sequenceNumber, enqueuedtime));
}
else
{
Log.LogError($"Message for module [{modId}] and device [{this.deviceId}] contains invalid sequence number [{sequence}].");
}
}
else
{
Log.LogDebug($"Message for moduleId: {modId} doesn't contain required properties");
Log.LogDebug($"Message for module [{modId}] and device [{this.deviceId}] doesn't contain batch id and sequence number.");
}
}
}
Expand All @@ -69,9 +65,32 @@ public Task ProcessEventsAsync(IEnumerable<EventData> events)
return Task.CompletedTask;
}

static DateTime GetEnqueuedTime(string deviceId, string moduleId, EventData eventData)
{
DateTime enqueuedtime = DateTime.MinValue.ToUniversalTime();

if (eventData.SystemProperties.TryGetValue(EnqueuedTimePropertyName, out object enqueued))
{
if (DateTime.TryParse(enqueued.ToString(), out enqueuedtime))
{
enqueuedtime = DateTime.SpecifyKind(enqueuedtime, DateTimeKind.Utc);
}
else
{
Log.LogError($"Message for module [{moduleId}] and device [{deviceId}] enqueued time [{enqueued}] cannot be parsed.");
}
}
else
{
Log.LogError($"Message for module [{moduleId}] and device [{deviceId}] doesn't contain {EnqueuedTimePropertyName} property.");
}

return enqueuedtime;
}

public Task ProcessErrorAsync(Exception error)
{
Log.LogError(error.StackTrace);
Log.LogError(error.ToString());
return Task.CompletedTask;
}
}
Expand Down
27 changes: 12 additions & 15 deletions edge-modules/MessagesAnalyzer/Reporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ static ModuleReport GetReceivedMessagesReport(string moduleId, double toleranceI

long missingCounter = 0;
long totalMessagesCounter = 0;
IList<MissedMessagesDetails> missingIntervals = new List<MissedMessagesDetails>();
IList<MissedMessagesDetails> missedMessages = new List<MissedMessagesDetails>();

if (batchesSnapshot.Count == 0)
{
Expand All @@ -43,44 +43,41 @@ static ModuleReport GetReceivedMessagesReport(string moduleId, double toleranceI
foreach (SortedSet<MessageDetails> messageDetails in batchesSnapshot)
{
long prevSequenceNumber = messageDetails.First().SequenceNumber;
DateTime prevEnquedDateTime = messageDetails.First().EnquedDateTime;
DateTime prevEnquedDateTime = messageDetails.First().EnqueuedDateTime;

foreach (MessageDetails msg in messageDetails.Skip(1))
{
// ignore messages enqued after endTime
if (DateTime.Compare(endDateTime, msg.EnquedDateTime) < 0)
if (DateTime.Compare(endDateTime, msg.EnqueuedDateTime) < 0)
{
Log.LogDebug($"Ignore message for {moduleId} enqued at {msg.EnquedDateTime} because is after {endDateTime}");
Log.LogDebug($"Ignore message for {moduleId} enqued at {msg.EnqueuedDateTime} because is after {endDateTime}");
break;
}

if (msg.SequenceNumber - 1 != prevSequenceNumber)
{
Log.LogInformation($"Missing messages for {moduleId} from {prevSequenceNumber} to {msg.SequenceNumber}");
Log.LogInformation($"Missing messages for {moduleId} from {prevSequenceNumber} to {msg.SequenceNumber} exclusive.");
long currentMissing = msg.SequenceNumber - prevSequenceNumber - 1;
missingCounter += currentMissing;
missingIntervals.Add(new MissedMessagesDetails(currentMissing, prevEnquedDateTime, msg.EnquedDateTime));
}
else
{
totalMessagesCounter++;
missedMessages.Add(new MissedMessagesDetails(currentMissing, prevEnquedDateTime, msg.EnqueuedDateTime));
}

totalMessagesCounter++;
prevSequenceNumber = msg.SequenceNumber;
prevEnquedDateTime = msg.EnquedDateTime;
lastMessageDateTime = lastMessageDateTime < msg.EnquedDateTime ? msg.EnquedDateTime : lastMessageDateTime;
prevEnquedDateTime = msg.EnqueuedDateTime;
lastMessageDateTime = lastMessageDateTime < msg.EnqueuedDateTime ? msg.EnqueuedDateTime : lastMessageDateTime;
}
}

// check if last message is older
if (DateTime.Compare(lastMessageDateTime.AddMilliseconds(toleranceInMilliseconds), endDateTime) < 0)
{
return new ModuleReport(moduleId, StatusCode.OldMessages, totalMessagesCounter, $"No messages received for the past {toleranceInMilliseconds} milliseconds", lastMessageDateTime, missingIntervals);
return new ModuleReport(moduleId, StatusCode.OldMessages, totalMessagesCounter, $"Missing messages: {missingCounter}. No messages received for the past {toleranceInMilliseconds} milliseconds.", lastMessageDateTime, missedMessages);
}

return missingCounter > 0
? new ModuleReport(moduleId, StatusCode.SkippedMessages, totalMessagesCounter, $"Missing messages: {missingCounter}", lastMessageDateTime, missingIntervals)
: new ModuleReport(moduleId, StatusCode.AllMessages, totalMessagesCounter, $"All messages received", lastMessageDateTime);
? new ModuleReport(moduleId, StatusCode.SkippedMessages, totalMessagesCounter, $"Missing messages: {missingCounter}.", lastMessageDateTime, missedMessages)
: new ModuleReport(moduleId, StatusCode.AllMessages, totalMessagesCounter, "All messages received.", lastMessageDateTime);
}
}
}
4 changes: 2 additions & 2 deletions edge-modules/MessagesAnalyzer/Settings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class Settings
const string DefaultWebhostPort = "5001";
const double DefaultToleranceInMilliseconds = 1000 * 60;

static readonly Lazy<Settings> setting = new Lazy<Settings>(
static readonly Lazy<Settings> Setting = new Lazy<Settings>(
() =>
{
IConfiguration configuration = new ConfigurationBuilder()
Expand Down Expand Up @@ -45,7 +45,7 @@ class Settings
this.ToleranceInMilliseconds = tolerance;
}

public static Settings Current => setting.Value;
public static Settings Current => Setting.Value;

public string EventHubConnectionString { get; }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public static async Task FilterMessageAndSendMessage(
byte[] messageBytes = messageReceived.GetBytes();
var messageString = Encoding.UTF8.GetString(messageBytes);

// Get message body, containing the Temperature data
// Get message body, containing the Temperature data
var messageBody = JsonConvert.DeserializeObject<MessageBody>(messageString);

if (messageBody != null && messageBody.Machine.Temperature > defaultTemperatureThreshold)
Expand Down
10 changes: 4 additions & 6 deletions edge-modules/load-gen/BufferPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,20 @@ public Buffer AllocBuffer(ulong size)
{
List<Buffer> buffers = this.buffers.GetOrAdd(
size,
(bufSize) =>
bufferSize =>
{
var list = new List<Buffer>
{
new Buffer(bufSize)
new Buffer(bufferSize)
};

Log.Information($"Allocated new list & buffer [{list[0].Id}] of size {size}");
Log.Information($"Allocated new list & buffer [{list[0].Id}] of size {bufferSize}");
return list;
});

lock (buffers)
{
Buffer buffer = buffers
.Where(buf => buf.InUse.Get() == false)
.FirstOrDefault();
Buffer buffer = buffers.FirstOrDefault(buf => buf.InUse.Get() == false);

if (buffer == null)
{
Expand Down
103 changes: 51 additions & 52 deletions edge-modules/load-gen/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,44 +35,30 @@ class Program
static async Task Main()
{
ILogger logger = InitLogger().CreateLogger("loadgen");
Log.Information($"Starting load run with the following settings:\r\n{Settings.Current.ToString()}");
Log.Information($"Starting load gen with the following settings:\r\n{Settings.Current}");

try
{
var retryPolicy = new RetryPolicy(TimeoutErrorDetectionStrategy, TransientRetryStrategy);
retryPolicy.Retrying += (_, args) =>
{
Log.Error($"Creating ModuleClient failed with exception {args.LastException}");
if (args.CurrentRetryCount < RetryCount)
{
Log.Information("Retrying...");
}
};
ModuleClient client = await retryPolicy.ExecuteAsync(() => InitModuleClient(Settings.Current.TransportType));
var client = await GetModuleClientWithRetryAsync();

using (var timers = new Timers())
{
var random = new Random();
Guid batchId = Guid.NewGuid();
var bufferPool = new BufferPool();

// setup the message timer
timers.Add(
Settings.Current.MessageFrequency,
Settings.Current.JitterFactor,
() => GenMessage(client, random, batchId, bufferPool));
() => GenerateMessageAsync(client, batchId));

// setup the twin update timer
timers.Add(
Settings.Current.TwinUpdateFrequency,
Settings.Current.JitterFactor,
() => GenTwinUpdate(client, batchId));
timers.Start();

(CancellationTokenSource cts,
ManualResetEventSlim completed,
Option<object> handler) = ShutdownHandler.Init(TimeSpan.FromSeconds(5), logger);
() => GenerateTwinUpdateAsync(client, batchId));

timers.Start();
(CancellationTokenSource cts, ManualResetEventSlim completed, Option<object> handler) = ShutdownHandler.Init(TimeSpan.FromSeconds(5), logger);
Log.Information("Load gen running.");

await cts.Token.WhenCanceled();
Expand All @@ -82,50 +68,58 @@ static async Task Main()
await client.CloseAsync();
completed.Set();
handler.ForEach(h => GC.KeepAlive(h));

Log.Information("Load run complete. Exiting.");
Log.Information("Load gen complete. Exiting.");
}
}
catch (Exception ex)
{
Log.Error($"Error occurred during load run. \r\n{ex.ToString()}");
Log.Error($"Error occurred during load gen.\r\n{ex}");
}
}

static async void GenMessage(
ModuleClient client,
Random random,
Guid batchId,
BufferPool bufferPool)
static ILoggerFactory InitLogger()
{
using (Buffer data = bufferPool.AllocBuffer(Settings.Current.MessageSizeInBytes))
{
// generate some bytes
random.NextBytes(data.Data);
Log.Logger = new LoggerConfiguration()
.MinimumLevel.Debug()
.WriteTo.Console(outputTemplate: "[{Timestamp:yyyy-MM-dd HH:mm:ss.fff} {Level:u3}] {Message:lj}{NewLine}{Exception}")
.CreateLogger();

// build message
var messageBody = new
{
data = data.Data,
};
return new LoggerFactory().AddSerilog();
}

long sequenceNumber = -1;
try
static async Task GenerateMessageAsync(ModuleClient client, Guid batchId)
{
var random = new Random();
var bufferPool = new BufferPool();
long sequenceNumber = -1;

try
{
using (Buffer data = bufferPool.AllocBuffer(Settings.Current.MessageSizeInBytes))
{
// generate some bytes
random.NextBytes(data.Data);

// build message
var messageBody = new
{
data = data.Data,
};

var message = new Message(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(messageBody)));
sequenceNumber = Interlocked.Increment(ref messageIdCounter);
message.Properties.Add("sequenceNumber", sequenceNumber.ToString());
message.Properties.Add("batchId", batchId.ToString());
await client.SendEventAsync(Settings.Current.OutputName, message).ConfigureAwait(false);
}
catch (Exception e)
{
Log.Error($"Sequence number {sequenceNumber}, BatchId: {batchId.ToString()} {e}");
}
}
catch (Exception e)
{
Log.Error($"[GenerateMessageAsync] Sequence number {sequenceNumber}, BatchId: {batchId.ToString()}; {e}");
}
}

static async void GenTwinUpdate(ModuleClient client, Guid batchId)
static async Task GenerateTwinUpdateAsync(ModuleClient client, Guid batchId)
{
var twin = new TwinCollection();
long sequenceNumber = messageIdCounter;
Expand All @@ -136,18 +130,23 @@ static async void GenTwinUpdate(ModuleClient client, Guid batchId)
}
catch (Exception e)
{
Log.Error($"Sequence number {sequenceNumber}, BatchId: {batchId.ToString()} {e}");
Log.Error($"[GenerateTwinUpdateAsync] Sequence number {sequenceNumber}, BatchId: {batchId.ToString()} {e}");
}
}

static ILoggerFactory InitLogger()
static async Task<ModuleClient> GetModuleClientWithRetryAsync()
{
Log.Logger = new LoggerConfiguration()
.MinimumLevel.Debug()
.WriteTo.Console(outputTemplate: "[{Timestamp:yyyy-MM-dd HH:mm:ss.fff} {Level:u3}] {Message:lj}{NewLine}{Exception}")
.CreateLogger();

return new LoggerFactory().AddSerilog();
var retryPolicy = new RetryPolicy(TimeoutErrorDetectionStrategy, TransientRetryStrategy);
retryPolicy.Retrying += (_, args) =>
{
Log.Error($"Creating ModuleClient failed with exception {args.LastException}");
if (args.CurrentRetryCount < RetryCount)
{
Log.Information("Retrying...");
}
};
ModuleClient client = await retryPolicy.ExecuteAsync(() => InitModuleClient(Settings.Current.TransportType));
return client;
}

static async Task<ModuleClient> InitModuleClient(TransportType transportType)
Expand Down
Loading

0 comments on commit 408f70d

Please sign in to comment.