Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support to determine calling workflow engine #643

Merged
merged 14 commits into from
Mar 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Tes.ApiClients.Tests/TerraWsmApiClientTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public void SetUp()
cache.Setup(c => c.CreateEntry(It.IsAny<object>())).Returns(new Mock<Microsoft.Extensions.Caching.Memory.ICacheEntry>().Object);
cacheAndRetryBuilder.SetupGet(c => c.AppCache).Returns(cache.Object);
cacheAndRetryHandler = new(TestServices.RetryHandlersHelpers.GetCachingAsyncRetryPolicyMock(cacheAndRetryBuilder, c => c.DefaultRetryHttpResponseMessagePolicyBuilder()));
azureEnvironmentConfig = ExpensiveObjectTestUtility.AzureCloudConfig.AzureEnvironmentConfig;
azureEnvironmentConfig = ExpensiveObjectTestUtility.AzureCloudConfig.AzureEnvironmentConfig!;

terraWsmApiClient = new TerraWsmApiClient(TerraApiStubData.WsmApiHost, tokenCredential.Object,
cacheAndRetryBuilder.Object, azureEnvironmentConfig, NullLogger<TerraWsmApiClient>.Instance);
Expand Down
59 changes: 53 additions & 6 deletions src/Tes/Extensions/TesTaskExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System;
using System.Collections.Generic;
using System.Linq;
using Tes.Models;
using Tes.TaskSubmitters;

namespace Tes.Extensions
{
Expand All @@ -12,6 +14,51 @@ namespace Tes.Extensions
/// </summary>
public static class TesTaskExtensions
{
/// <summary>
/// Reports if task was submitted by Cromwell.
/// </summary>
/// <param name="tesTask"><see cref="TesTask"/>.</param>
/// <returns><see cref="true"/> if task is from Cromwell, false otherwise.</returns>
public static bool IsCromwell(this TesTask tesTask)
{
return tesTask.TaskSubmitter is CromwellTaskSubmitter;
}

/// <summary>
/// Gets Cromwell task metadata
/// </summary>
/// <param name="tesTask"><see cref="TesTask"/>.</param>
/// <returns><see cref="CromwellTaskSubmitter"/>.</returns>
public static CromwellTaskSubmitter GetCromwellMetadata(this TesTask tesTask)
{
return tesTask.TaskSubmitter as CromwellTaskSubmitter;
}

/// <summary>
/// Visits each value in an enumeration with an action.
/// </summary>
/// <typeparam name="T">Type of enumerated items.</typeparam>
/// <param name="values">Enumeration on which to visit each item.</param>
/// <param name="action">Action to invoke with each item.</param>
public static void ForEach<T>(this IEnumerable<T> values, Action<T> action)
{
foreach (var value in values)
{
action(value);
}
}

/// <summary>
/// Adds a range of items to an <see cref="IList{T}"/>.
/// </summary>
/// <typeparam name="T">Type of enumerated items.</typeparam>
/// <param name="list">List to add <paramref name="items"/> to.</param>
/// <param name="items">Items to add to <paramref name="list"/>.</param>
public static void AddRange<T>(this IList<T> list, IEnumerable<T> items)
{
items.ForEach(list.Add);
}

/// <summary>
/// Writes to <see cref="TesTask"/> system log.
/// </summary>
Expand All @@ -22,7 +69,7 @@ public static void AddToSystemLog(this TesTask tesTask, IEnumerable<string> logE
if (logEntries is not null && logEntries.Any(e => !string.IsNullOrEmpty(e)))
{
var tesTaskLog = tesTask.GetOrAddTesTaskLog();
tesTaskLog.SystemLogs ??= new();
tesTaskLog.SystemLogs ??= [];
tesTaskLog.SystemLogs.AddRange(logEntries);
}
}
Expand All @@ -36,7 +83,7 @@ public static void AddToSystemLog(this TesTask tesTask, IEnumerable<string> logE
public static void SetFailureReason(this TesTask tesTask, string failureReason, params string[] additionalSystemLogItems)
{
tesTask.GetOrAddTesTaskLog().FailureReason = failureReason;
tesTask.AddToSystemLog(new[] { failureReason });
tesTask.AddToSystemLog([failureReason]);
tesTask.AddToSystemLog(additionalSystemLogItems.Where(i => !string.IsNullOrEmpty(i)));
}

Expand All @@ -57,7 +104,7 @@ public static void SetFailureReason(this TesTask tesTask, TesException tesExcept
public static void SetWarning(this TesTask tesTask, string warning, params string[] additionalSystemLogItems)
{
tesTask.GetOrAddTesTaskLog().Warning = warning;
tesTask.AddToSystemLog(new[] { warning });
tesTask.AddToSystemLog([warning]);
tesTask.AddToSystemLog(additionalSystemLogItems.Where(i => !string.IsNullOrEmpty(i)));
}

Expand All @@ -70,7 +117,7 @@ public static TesTaskLog GetOrAddTesTaskLog(this TesTask tesTask)
{
if (tesTask.Logs is null || !tesTask.Logs.Any())
{
tesTask.Logs = new() { new() };
tesTask.Logs = [new()];
}

return tesTask.Logs.Last();
Expand All @@ -83,7 +130,7 @@ public static TesTaskLog GetOrAddTesTaskLog(this TesTask tesTask)
/// <returns>Last <see cref="TesTaskLog"/></returns>
public static TesTaskLog AddTesTaskLog(this TesTask tesTask)
{
tesTask.Logs ??= new();
tesTask.Logs ??= [];
tesTask.Logs.Add(new());

return tesTask.Logs.Last();
Expand Down Expand Up @@ -114,7 +161,7 @@ public static TesExecutorLog GetOrAddExecutorLog(this TesTaskLog tesTaskLog)
{
if (tesTaskLog.Logs is null || !tesTaskLog.Logs.Any())
{
tesTaskLog.Logs = new() { new() };
tesTaskLog.Logs = [new()];
}

return tesTaskLog.Logs.Last();
Expand Down
4 changes: 2 additions & 2 deletions src/Tes/Models/TesTask.cs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public TesTask()
/// <returns>Valid TES task ID</returns>
public string CreateId()
{
var tesTaskIdPrefix = WorkflowId is not null && Guid.TryParse(WorkflowId, out _) ? $"{WorkflowId.Substring(0, 8)}_" : string.Empty;
var tesTaskIdPrefix = WorkflowId is not null && Guid.TryParse(WorkflowId, out _) ? $"{WorkflowId[..8]}_" : string.Empty;
return $"{tesTaskIdPrefix}{Guid.NewGuid():N}";
}

Expand All @@ -125,7 +125,7 @@ public string CreateId()
/// </summary>
/// <param name="id">TesTask ID</param>
/// <returns>True if valid, false if not</returns>
/// <remarks>Letter, digit, _, -, length 32, 36, 41. Supports GUID for backwards compatibility.</remarks>
/// <remarks>Letter, digit, _, -, length 32, 36, 41. Supports dashed GUID for backwards compatibility.</remarks>
public static bool IsValidId(string id)
{
return (id.Length == 32 || id.Length == 36 || id.Length == 41) && !id.Any(c => !(char.IsLetterOrDigit(c) || c == '_' || c == '-'));
Expand Down
27 changes: 16 additions & 11 deletions src/Tes/Models/TesTaskExtended.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,20 @@
using System.Collections.Generic;
using System.Linq;
using System.Runtime.Serialization;
using System.Text.RegularExpressions;
using Tes.Repository;
using Tes.TaskSubmitters;

namespace Tes.Models
{
public partial class TesTask : RepositoryItem<TesTask>
{
private static readonly Regex CromwellTaskInstanceNameRegex = new("(.*):[^:]*:[^:]*");
private static readonly Regex CromwellShardRegex = new(".*:([^:]*):[^:]*");
private static readonly Regex CromwellAttemptRegex = new(".*:([^:]*)");
public static readonly List<TesState> ActiveStates = new List<TesState> {
public static readonly List<TesState> ActiveStates =
[
TesState.QUEUEDEnum,
TesState.RUNNINGEnum,
TesState.PAUSEDEnum,
TesState.INITIALIZINGEnum};
TesState.INITIALIZINGEnum
];

/// <summary>
/// Number of retries attempted
Expand All @@ -43,8 +42,14 @@ public partial class TesTask : RepositoryItem<TesTask>
/// <summary>
/// Top-most parent workflow ID from the workflow engine
/// </summary>
[DataMember(Name = "workflow_id")]
public string WorkflowId { get; set; }
[IgnoreDataMember]
public string WorkflowId => TaskSubmitter?.WorkflowId;

/// <summary>
/// Workflow engine task metadata
/// </summary>
[DataMember(Name = "task_submitter")]
public TaskSubmitter TaskSubmitter { get; set; }

/// <summary>
/// Assigned Azure Batch PoolId
Expand Down Expand Up @@ -74,19 +79,19 @@ public partial class TesTask : RepositoryItem<TesTask>
/// Cromwell task description without shard and attempt numbers
/// </summary>
[IgnoreDataMember]
public string CromwellTaskInstanceName => this.Description == null ? null : CromwellTaskInstanceNameRegex.Match(this.Description).Groups[1].Value;
public string CromwellTaskInstanceName => (TaskSubmitter as CromwellTaskSubmitter)?.CromwellTaskInstanceName;

/// <summary>
/// Cromwell shard number
/// </summary>
[IgnoreDataMember]
public int? CromwellShard => this.Description == null ? null : (int.TryParse(CromwellShardRegex.Match(this.Description).Groups[1].Value, out var result) ? result : null);
public int? CromwellShard => (TaskSubmitter as CromwellTaskSubmitter)?.CromwellShard;

/// <summary>
/// Cromwell attempt number
/// </summary>
[IgnoreDataMember]
public int? CromwellAttempt => this.Description == null ? null : (int.TryParse(CromwellAttemptRegex.Match(this.Description).Groups[1].Value, out var result) ? result : null);
public int? CromwellAttempt => (TaskSubmitter as CromwellTaskSubmitter)?.CromwellAttempt;

public bool IsActiveState()
{
Expand Down
6 changes: 3 additions & 3 deletions src/Tes/Models/TesTaskLog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public TesTaskLog()
/// </summary>
/// <value>Logs for each executor</value>
[DataMember(Name = "logs")]
public List<TesExecutorLog> Logs { get; set; } = new List<TesExecutorLog>();
public List<TesExecutorLog> Logs { get; set; } = [];

/// <summary>
/// Arbitrary logging metadata included by the implementation.
Expand Down Expand Up @@ -63,14 +63,14 @@ public TesTaskLog()
/// </summary>
/// <value>Information about all output files. Directory outputs are flattened into separate items.</value>
[DataMember(Name = "outputs")]
public List<TesOutputFileLog> Outputs { get; set; } = new List<TesOutputFileLog>();
public List<TesOutputFileLog> Outputs { get; set; } = [];

/// <summary>
/// System logs are any logs the system decides are relevant, which are not tied directly to an Executor process. Content is implementation specific: format, size, etc. System logs may be collected here to provide convenient access. For example, the system may include the name of the host where the task is executing, an error message that caused a SYSTEM_ERROR state (e.g. disk is full), etc. System logs are only included in the FULL task view.
/// </summary>
/// <value>System logs are any logs the system decides are relevant, which are not tied directly to an Executor process. Content is implementation specific: format, size, etc. System logs may be collected here to provide convenient access. For example, the system may include the name of the host where the task is executing, an error message that caused a SYSTEM_ERROR state (e.g. disk is full), etc. System logs are only included in the FULL task view.</value>
[DataMember(Name = "system_logs")]
public List<string> SystemLogs { get; set; } = new List<string>();
public List<string> SystemLogs { get; set; } = [];

/// <summary>
/// Returns the string presentation of the object
Expand Down
36 changes: 21 additions & 15 deletions src/Tes/Repository/PostgreSqlCachingRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ public abstract class PostgreSqlCachingRepository<T> : IDisposable where T : cla
.Handle<Npgsql.NpgsqlException>(e => e.IsTransient)
.WaitAndRetryAsync(10, i => TimeSpan.FromSeconds(Math.Pow(2, i)));

private readonly Channel<(T, WriteAction, TaskCompletionSource<T>)> itemsToWrite = Channel.CreateUnbounded<(T, WriteAction, TaskCompletionSource<T>)>();
private record struct WriteItem(T DbItem, WriteAction Action, TaskCompletionSource<T> TaskSource);
private readonly Channel<WriteItem> itemsToWrite = Channel.CreateUnbounded<WriteItem>();
private readonly ConcurrentDictionary<T, object> updatingItems = new(); // Collection of all pending updates to be written, to faciliate detection of simultaneous parallel updates.
private readonly CancellationTokenSource writerWorkerCancellationTokenSource = new();
private readonly Task writerWorkerTask;
Expand All @@ -45,8 +46,8 @@ protected enum WriteAction { Add, Update, Delete }
/// Constructor
/// </summary>
/// <param name="hostApplicationLifetime">Used for requesting termination of the current application if the writer task unexpectedly exits.</param>
/// <param name="logger"></param>
/// <param name="cache"></param>
/// <param name="logger">Logging interface.</param>
/// <param name="cache">Memory cache for fast access to active items.</param>
/// <exception cref="System.Diagnostics.UnreachableException"></exception>
protected PostgreSqlCachingRepository(Microsoft.Extensions.Hosting.IHostApplicationLifetime hostApplicationLifetime, ILogger logger = default, ICache<T> cache = default)
{
Expand All @@ -61,17 +62,16 @@ protected PostgreSqlCachingRepository(Microsoft.Extensions.Hosting.IHostApplicat

if (task.Status == TaskStatus.Faulted)
{
Console.WriteLine($"Repository WriterWorkerAsync failed unexpectedly with: {task.Exception.Message}.");
Logger.LogCritical(task.Exception, "Repository WriterWorkerAsync failed unexpectedly with: {ErrorMessage}.", task.Exception.Message);
Console.WriteLine($"Repository WriterWorkerAsync failed unexpectedly with: {task.Exception.Message}.");
}

const string errMessage = "Repository WriterWorkerAsync unexpectedly completed. The TES application will now be stopped.";
const string errMessage = "Repository WriterWorkerAsync unexpectedly completed. The service will now be stopped.";
Logger.LogCritical(errMessage);
Console.WriteLine(errMessage);

await Task.Delay(TimeSpan.FromSeconds(40)); // Give the logger time to flush; default flush is 30s
hostApplicationLifetime?.StopApplication();
return;
}, TaskContinuationOptions.NotOnCanceled)
.ContinueWith(task => Logger.LogInformation("The repository WriterWorkerAsync ended normally"), TaskContinuationOptions.OnlyOnCanceled);
}
Expand Down Expand Up @@ -148,7 +148,7 @@ protected Task<T> AddUpdateOrRemoveItemInDbAsync(T item, WriteAction action, Can
}
}

if (!itemsToWrite.Writer.TryWrite((item, action, source)))
if (!itemsToWrite.Writer.TryWrite(new(item, action, source)))
{
throw new InvalidOperationException("Failed to TryWrite to _itemsToWrite channel.");
}
Expand All @@ -172,7 +172,7 @@ Task<T> RemoveUpdatingItem(Task<T> task)
/// </summary>
private async Task WriterWorkerAsync(CancellationToken cancellationToken)
{
var list = new List<(T, WriteAction, TaskCompletionSource<T>)>();
var list = new List<WriteItem>();

await foreach (var itemToWrite in itemsToWrite.Reader.ReadAllAsync(cancellationToken))
{
Expand All @@ -191,7 +191,7 @@ private async Task WriterWorkerAsync(CancellationToken cancellationToken)
// If cancellation is requested, do not write any more items
}

private async ValueTask WriteItemsAsync(IList<(T DbItem, WriteAction Action, TaskCompletionSource<T> TaskSource)> dbItems, CancellationToken cancellationToken)
private async ValueTask WriteItemsAsync(IList<WriteItem> dbItems, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();

Expand All @@ -208,19 +208,25 @@ private async ValueTask WriteItemsAsync(IList<(T DbItem, WriteAction Action, Tas
dbContext.UpdateRange(dbItems.Where(e => WriteAction.Update.Equals(e.Action)).Select(e => e.DbItem));
dbContext.RemoveRange(dbItems.Where(e => WriteAction.Delete.Equals(e.Action)).Select(e => e.DbItem));
await asyncPolicy.ExecuteAsync(dbContext.SaveChangesAsync, cancellationToken);
var action = ActionOnSuccess();
OperateOnAll(dbItems, action);
}
catch (Exception ex)
{
// It doesn't matter which item the failure was for, we will fail all items in this round.
// TODO: are there exceptions Postgre will send us that will tell us which item(s) failed or alternately succeeded?
FailAll(dbItems.Select(e => e.TaskSource), ex);
return;
var action = ActionOnFailure(ex);
OperateOnAll(dbItems, action);
}

_ = Parallel.ForEach(dbItems, e => e.TaskSource.TrySetResult(e.DbItem));
static void OperateOnAll(IEnumerable<WriteItem> sources, Action<WriteItem> action)
=> _ = Parallel.ForEach(sources, e => action(e));

static Action<WriteItem> ActionOnFailure(Exception ex) =>
e => _ = e.TaskSource.TrySetException(new AggregateException(Enumerable.Empty<Exception>().Append(ex)));

static void FailAll(IEnumerable<TaskCompletionSource<T>> sources, Exception ex)
=> _ = Parallel.ForEach(sources, s => s.TrySetException(new AggregateException(Enumerable.Empty<Exception>().Append(ex))));
static Action<WriteItem> ActionOnSuccess() =>
e => _ = e.TaskSource.TrySetResult(e.DbItem);
}

protected virtual void Dispose(bool disposing)
Expand All @@ -233,7 +239,7 @@ protected virtual void Dispose(bool disposing)

try
{
writerWorkerTask.Wait();
writerWorkerTask.GetAwaiter().GetResult();
}
catch (AggregateException aex) when (aex?.InnerException is TaskCanceledException ex && writerWorkerCancellationTokenSource.Token == ex.CancellationToken)
{ } // Expected return from Wait().
Expand Down
2 changes: 1 addition & 1 deletion src/Tes/Repository/TesDbContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public TesDbContext()

public TesDbContext(NpgsqlDataSource dataSource, Action<NpgsqlDbContextOptionsBuilder> contextOptionsBuilder = default)
{
ArgumentNullException.ThrowIfNull(dataSource, nameof(dataSource));
ArgumentNullException.ThrowIfNull(dataSource);
DataSource = dataSource;
ContextOptionsBuilder = contextOptionsBuilder;
}
Expand Down
Loading
Loading