Skip to content

Commit

Permalink
Use global customization function
Browse files Browse the repository at this point in the history
  • Loading branch information
SzymonPobiega committed Oct 25, 2024
1 parent 3bec87a commit b3d7983
Show file tree
Hide file tree
Showing 8 changed files with 32 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ namespace NServiceBus
public System.TimeSpan HeartbeatInterval { get; set; }
public System.Func<RabbitMQ.Client.Events.BasicDeliverEventArgs, string> MessageIdStrategy { get; set; }
public System.TimeSpan NetworkRecoveryInterval { get; set; }
public System.Action<NServiceBus.Transport.IOutgoingTransportOperation, RabbitMQ.Client.IBasicProperties> OutgoingNativeMessageCustomization { get; set; }
public NServiceBus.PrefetchCountCalculation PrefetchCountCalculation { get; set; }
public System.TimeSpan TimeToWaitBeforeTriggeringCircuitBreaker { get; set; }
public bool UseExternalAuthMechanism { get; set; }
Expand Down Expand Up @@ -73,8 +74,4 @@ namespace NServiceBus.Transport.RabbitMQ
System.Threading.Tasks.ValueTask SetupSubscription(RabbitMQ.Client.IChannel channel, NServiceBus.Unicast.Messages.MessageMetadata type, string subscriberName, System.Threading.CancellationToken cancellationToken = default);
System.Threading.Tasks.ValueTask TeardownSubscription(RabbitMQ.Client.IChannel channel, NServiceBus.Unicast.Messages.MessageMetadata type, string subscriberName, System.Threading.CancellationToken cancellationToken = default);
}
public static class NativeIntegrationDispatchPropertiesExtensions
{
public static void SetContentType(this NServiceBus.Transport.DispatchProperties dispatchProperties, string contentType) { }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,17 @@ public TransportDefinition CreateTransportDefinition()
{
var connectionString = Environment.GetEnvironmentVariable("RabbitMQTransport_ConnectionString") ?? "host=localhost";

var transport = new RabbitMQTransport(RoutingTopology.Conventional(QueueType.Classic), connectionString, false);
var transport = new RabbitMQTransport(RoutingTopology.Conventional(QueueType.Classic), connectionString, false)
{
//Used by When_customizing_outgoing_messages test
OutgoingNativeMessageCustomization = (operation, properties) =>
{
if (operation.Properties.TryGetValue("ContentType", out var overrideContentType))
{
properties.ContentType = overrideContentType;
}
}
};

return transport;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
namespace NServiceBus.Transport.RabbitMQ.TransportTests;

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using global::RabbitMQ.Client;
using global::RabbitMQ.Client.Events;
using NServiceBus.TransportTests;
using NUnit.Framework;

public class When_setting_content_type : NServiceBusTransportTest
public class When_customizing_outgoing_messages : NServiceBusTransportTest
{
[Test]
public async Task Should_use_provided_value()
public async Task Should_override_default_values()
{
var onMessageInvoked = CreateTaskCompletionSource<MessageContext>();
IReadOnlyBasicProperties basicProps = null;
Expand All @@ -27,8 +26,7 @@ await StartPump(
(_, __) => Task.FromResult(ErrorHandleResult.RetryRequired),
TransportTransactionMode.ReceiveOnly);

var dispatchProperties = new DispatchProperties();
dispatchProperties.SetContentType("my-content");
var dispatchProperties = new DispatchProperties { ["ContentType"] = "my-content" };

await SendMessage(InputQueueName, new Dictionary<string, string>
{
Expand Down
9 changes: 8 additions & 1 deletion src/NServiceBus.Transport.RabbitMQ/RabbitMQTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
using System.Security.Cryptography.X509Certificates;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using Transport;
using Transport.RabbitMQ;
using ConnectionFactory = Transport.RabbitMQ.ConnectionFactory;


/// <summary>
/// Transport definition for RabbitMQ.
/// </summary>
Expand Down Expand Up @@ -91,6 +93,11 @@ public TimeSpan TimeToWaitBeforeTriggeringCircuitBreaker
}
}

/// <summary>
/// The callback to use when customizing the message before dispatching it to the broker.
/// </summary>
public Action<IOutgoingTransportOperation, IBasicProperties> OutgoingNativeMessageCustomization { get; set; }

/// <summary>
/// The calculation method for the prefetch count. The default is 3 times the maximum concurrency value.
/// </summary>
Expand Down Expand Up @@ -192,7 +199,7 @@ public override async Task<TransportInfrastructure> Initialize(HostSettings host
var converter = new MessageConverter(MessageIdStrategy);

var infra = new RabbitMQTransportInfrastructure(hostSettings, receivers, connectionFactory,
RoutingTopology, channelProvider, converter, TimeToWaitBeforeTriggeringCircuitBreaker,
RoutingTopology, channelProvider, converter, OutgoingNativeMessageCustomization, TimeToWaitBeforeTriggeringCircuitBreaker,
PrefetchCountCalculation, NetworkRecoveryInterval, SupportsDelayedDelivery);

if (hostSettings.SetupInfrastructure)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using global::RabbitMQ.Client;

sealed class RabbitMQTransportInfrastructure : TransportInfrastructure
{
Expand All @@ -17,6 +18,7 @@ sealed class RabbitMQTransportInfrastructure : TransportInfrastructure
public RabbitMQTransportInfrastructure(HostSettings hostSettings, ReceiveSettings[] receiverSettings,
ConnectionFactory connectionFactory, IRoutingTopology routingTopology,
ChannelProvider channelProvider, MessageConverter messageConverter,
Action<IOutgoingTransportOperation, IBasicProperties> messageCustomization,
TimeSpan timeToWaitBeforeTriggeringCircuitBreaker, PrefetchCountCalculation prefetchCountCalculation,
TimeSpan networkRecoveryInterval, bool supportsDelayedDelivery)
{
Expand All @@ -26,7 +28,7 @@ public RabbitMQTransportInfrastructure(HostSettings hostSettings, ReceiveSetting
this.networkRecoveryInterval = networkRecoveryInterval;
this.supportsDelayedDelivery = supportsDelayedDelivery;

Dispatcher = new MessageDispatcher(channelProvider, supportsDelayedDelivery);
Dispatcher = new MessageDispatcher(channelProvider, messageCustomization, supportsDelayedDelivery);
Receivers = receiverSettings.Select(x => CreateMessagePump(hostSettings, x, messageConverter, timeToWaitBeforeTriggeringCircuitBreaker, prefetchCountCalculation))
.ToDictionary(x => x.Id, x => x);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,7 @@ public static void Fill(this IBasicProperties properties, OutgoingMessage messag
}
}

if (dispatchProperties.TryGetValue(NativeIntegrationDispatchPropertiesExtensions.ContentTypeAttribute, out var contentType) && contentType != null)
{
properties.ContentType = contentType;
}
else if (messageHeaders.TryGetValue(NServiceBus.Headers.ContentType, out contentType) && contentType != null)
if (messageHeaders.TryGetValue(NServiceBus.Headers.ContentType, out var contentType) && contentType != null)
{
properties.ContentType = contentType;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@
class MessageDispatcher : IMessageDispatcher
{
readonly ChannelProvider channelProvider;
readonly Action<IOutgoingTransportOperation, IBasicProperties> messageCustomization;
readonly bool supportsDelayedDelivery;

public MessageDispatcher(ChannelProvider channelProvider, bool supportsDelayedDelivery)
public MessageDispatcher(ChannelProvider channelProvider, Action<IOutgoingTransportOperation, IBasicProperties> messageCustomization, bool supportsDelayedDelivery)
{
this.channelProvider = channelProvider;
this.messageCustomization = messageCustomization ?? ((_, _) => { });
this.supportsDelayedDelivery = supportsDelayedDelivery;
}

Expand Down Expand Up @@ -55,6 +57,7 @@ Task SendMessage(UnicastTransportOperation transportOperation, ConfirmsAwareChan

var properties = new BasicProperties();
properties.Fill(message, transportOperation.Properties);
messageCustomization(transportOperation, properties);

return channel.SendMessage(transportOperation.Destination, message, properties, cancellationToken);
}
Expand All @@ -67,6 +70,7 @@ Task PublishMessage(MulticastTransportOperation transportOperation, ConfirmsAwar

var properties = new BasicProperties();
properties.Fill(message, transportOperation.Properties);
messageCustomization(transportOperation, properties);

return channel.PublishMessage(transportOperation.MessageType, message, properties, cancellationToken);
}
Expand Down

This file was deleted.

0 comments on commit b3d7983

Please sign in to comment.