Skip to content

Commit

Permalink
#1034: simplified adapter's interface and moved most of the processin…
Browse files Browse the repository at this point in the history
…g code from AWSMessagingUtils to adapter's implementations.
  • Loading branch information
rypdal committed Mar 3, 2023
1 parent e20c509 commit ed1ef5a
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,12 @@
// </copyright>

using System.Collections.Generic;
using System.Linq;
using OpenTelemetry.Context.Propagation;

namespace OpenTelemetry.Contrib.Instrumentation.AWS.Implementation;

internal static class AWSMessagingUtils
{
// SQS/SNS message attributes collection size limit according to
// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-message-metadata.html and
// https://docs.aws.amazon.com/sns/latest/dg/sns-message-attributes.html
private const int MaxMessageAttributes = 10;

internal static void Inject(IRequestContextAdapter request, PropagationContext propagationContext)
{
if (!request.CanInject)
Expand All @@ -35,30 +29,7 @@ internal static void Inject(IRequestContextAdapter request, PropagationContext p
}

var carrier = new Dictionary<string, string>();
Propagators.DefaultTextMapPropagator.Inject(propagationContext, carrier, (c, k, v) => c[k] = v);
if (carrier.Keys.Any(k => request.ContainsAttribute(k)))
{
// If at least one attribute is already present in the request then we skip the injection.
return;
}

int attributesCount = request.AttributesCount;
if (carrier.Count + attributesCount > MaxMessageAttributes)
{
// TODO: add logging (event source).
return;
}

int nextAttributeIndex = attributesCount + 1;
foreach (var param in carrier)
{
if (request.ContainsAttribute(param.Key))
{
continue;
}

request.AddAttribute(param.Key, param.Value, nextAttributeIndex);
nextAttributeIndex++;
}
Propagators.DefaultTextMapPropagator.Inject(propagationContext, carrier, (c, k, v) => c[k] = v);
request.AddAttributes(carrier);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,12 @@
// limitations under the License.
// </copyright>

using System.Collections.Generic;

namespace OpenTelemetry.Contrib.Instrumentation.AWS.Implementation;
internal interface IRequestContextAdapter
{
bool CanInject { get; }

int AttributesCount { get; }

bool ContainsAttribute(string name);

void AddAttribute(string name, string value, int attributeIndex);
void AddAttributes(IReadOnlyDictionary<string, string> attributes);
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,44 +14,76 @@
// limitations under the License.
// </copyright>

using System.Collections.Generic;
using System.Linq;
using Amazon.Runtime;
using Amazon.Runtime.Internal;
using Amazon.SimpleNotificationService.Model;

namespace OpenTelemetry.Contrib.Instrumentation.AWS.Implementation;
internal class SnsRequestContextAdapter : IRequestContextAdapter
{
private readonly ParameterCollection parameters;
private readonly PublishRequest originalRequest;
// SQS/SNS message attributes collection size limit according to
// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-message-metadata.html and
// https://docs.aws.amazon.com/sns/latest/dg/sns-message-attributes.html
private const int MaxMessageAttributes = 10;

private readonly ParameterCollection? parameters;
private readonly PublishRequest? originalRequest;

public SnsRequestContextAdapter(IRequestContext context)
{
this.parameters = context.Request?.ParameterCollection;
this.originalRequest = context.OriginalRequest as PublishRequest;
}

public bool CanInject => this.originalRequest != null;
public bool CanInject => this.originalRequest?.MessageAttributes != null && this.parameters != null;

public int AttributesCount =>
this.originalRequest?.MessageAttributes.Count ?? 0;
public void AddAttributes(IReadOnlyDictionary<string, string> attributes)
{
if (!this.CanInject)
{
return;
}

if (attributes.Keys.Any(k => this.ContainsAttribute(k)))
{
// If at least one attribute is already present in the request then we skip the injection.
return;
}

int attributesCount = this.originalRequest?.MessageAttributes.Count ?? 0;
if (attributes.Count + attributesCount > MaxMessageAttributes)
{
// TODO: add logging (event source).
return;
}

int nextAttributeIndex = attributesCount + 1;
foreach (var param in attributes)
{
this.AddAttribute(param.Key, param.Value, nextAttributeIndex);
nextAttributeIndex++;
}
}

public void AddAttribute(string name, string value, int nextAttributeIndex)
private void AddAttribute(string name, string value, int nextAttributeIndex)
{
if (this.parameters == null)
if (!this.CanInject)
{
return;
}

var prefix = "MessageAttributes.entry." + nextAttributeIndex;
this.parameters.Add(prefix + ".Name", name);
this.parameters.Add(prefix + ".Value.DataType", "String");
this.parameters.Add(prefix + ".Value.StringValue", value);
this.parameters?.Add(prefix + ".Name", name);
this.parameters?.Add(prefix + ".Value.DataType", "String");
this.parameters?.Add(prefix + ".Value.StringValue", value);

// Add injected attributes to the original request as well.
// This dictionary must be in sync with parameters collection to pass through the MD5 hash matching check.
this.originalRequest?.MessageAttributes.Add(name, new MessageAttributeValue { DataType = "String", StringValue = value });
}

public bool ContainsAttribute(string name)
private bool ContainsAttribute(string name)
=> this.originalRequest?.MessageAttributes.ContainsKey(name) ?? false;
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,47 +14,77 @@
// limitations under the License.
// </copyright>

using System.Collections.Generic;
using System.Linq;
using Amazon.Runtime;
using Amazon.Runtime.Internal;
using Amazon.SQS.Model;

namespace OpenTelemetry.Contrib.Instrumentation.AWS.Implementation;
internal class SqsRequestContextAdapter : IRequestContextAdapter
{
private readonly ParameterCollection parameters;
private readonly SendMessageRequest originalRequest;
// SQS/SNS message attributes collection size limit according to
// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-message-metadata.html and
// https://docs.aws.amazon.com/sns/latest/dg/sns-message-attributes.html
private const int MaxMessageAttributes = 10;

private readonly ParameterCollection? parameters;
private readonly SendMessageRequest? originalRequest;

public SqsRequestContextAdapter(IRequestContext context)
{
this.parameters = context.Request?.ParameterCollection;
this.originalRequest = context.OriginalRequest as SendMessageRequest;
}

public bool CanInject => this.originalRequest != null;
public bool CanInject => this.originalRequest?.MessageAttributes != null && this.parameters != null;

public void AddAttributes(IReadOnlyDictionary<string, string> attributes)
{
if (!this.CanInject)
{
return;
}

if (attributes.Keys.Any(k => this.ContainsAttribute(k)))
{
// If at least one attribute is already present in the request then we skip the injection.
return;
}

int attributesCount = this.originalRequest?.MessageAttributes.Count ?? 0;
if (attributes.Count + attributesCount > MaxMessageAttributes)
{
// TODO: add logging (event source).
return;
}

int nextAttributeIndex = attributesCount + 1;
foreach (var param in attributes)
{
this.AddAttribute(param.Key, param.Value, nextAttributeIndex);
nextAttributeIndex++;
}
}

public int AttributesCount =>
this.originalRequest?.MessageAttributes.Count ?? 0;

public void AddAttribute(string name, string value, int attributeIndex)
private void AddAttribute(string name, string value, int attributeIndex)
{
if (this.parameters == null)
if (!this.CanInject)
{
return;
}

var prefix = "MessageAttribute." + attributeIndex;
this.parameters.Add(prefix + ".Name", name);
this.parameters.Add(prefix + ".Value.DataType", "String");
this.parameters.Add(prefix + ".Value.StringValue", value);
this.parameters?.Add(prefix + ".Name", name);
this.parameters?.Add(prefix + ".Value.DataType", "String");
this.parameters?.Add(prefix + ".Value.StringValue", value);

// Add injected attributes to the original request as well.
// This dictionary must be in sync with parameters collection to pass through the MD5 hash matching check.
this.originalRequest?.MessageAttributes.Add(name, new MessageAttributeValue { DataType = "String", StringValue = value });
}

public void AddAttributeToOriginalRequest(string name, string value) =>
this.originalRequest?.MessageAttributes.Add(name, new MessageAttributeValue { DataType = "String", StringValue = value });

public bool ContainsAttribute(string name) =>
private bool ContainsAttribute(string name) =>
this.originalRequest?.MessageAttributes.ContainsKey(name) ?? false;
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ internal static void AssertStringParameters(string serviceType, List<KeyValuePai
for (int i = 0; i < expectedParameters.Count; i++)
{
var prefix = $"{GetNamePrefix(serviceType)}.{i + 1}";
static string Value(ParameterValue p) => (p as StringParameterValue).Value;
static string? Value(ParameterValue p) => (p as StringParameterValue)?.Value;

Assert.True(parameters.ContainsKey($"{prefix}.Name"));
Assert.Equal(expectedParameters[i].Key, Value(parameters[$"{prefix}.Name"]));
Expand Down

0 comments on commit ed1ef5a

Please sign in to comment.