Skip to content

Commit

Permalink
feat(bindings): add pulsar bindings (#74)
Browse files Browse the repository at this point in the history
Co-authored-by: Alex Wichmann <[email protected]>
  • Loading branch information
Alquila and VisualBean authored Dec 7, 2022
1 parent 7e2e075 commit 04e8af8
Show file tree
Hide file tree
Showing 11 changed files with 368 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright (c) The LEGO Group. All rights reserved.

namespace LEGO.AsyncAPI.Readers
{
using LEGO.AsyncAPI.Models.Bindings.Pulsar;
using LEGO.AsyncAPI.Readers.ParseNodes;
using LEGO.AsyncAPI.Writers;

internal static partial class AsyncApiV2Deserializer
{
private static FixedFieldMap<PulsarServerBinding> pulsarServerBindingFixedFields = new ()
{
{ "bindingVersion", (a, n) => { a.BindingVersion = n.GetScalarValue(); } },
{ "tenant", (a, n) => { a.Tenant = n.GetScalarValue(); } },
};

private static FixedFieldMap<PulsarChannelBinding> pulsarChannelBindingFixedFields = new ()
{
{ "bindingVersion", (a, n) => { a.BindingVersion = n.GetScalarValue(); } },
{ "namespace", (a, n) => { a.Namespace = n.GetScalarValue(); } },
{ "persistence", (a, n) => { a.Persistence = n.GetScalarValue().GetEnumFromDisplayName<Persistence>(); } },
{ "compaction", (a, n) => { a.Compaction = n.GetIntegerValue(); } },
{ "retention", (a, n) => { a.Retention = LoadRetention(n); } },
{ "geo-replication", (a, n) => { a.GeoReplication = n.CreateSimpleList(s => s.GetScalarValue()); } },
{ "ttl", (a, n) => { a.TTL = n.GetIntegerValue(); } },
{ "deduplication", (a, n) => { a.Deduplication = n.GetBooleanValue(); } },
};

private static FixedFieldMap<RetentionDefinition> pulsarServerBindingRetentionFixedFields = new ()
{
{ "time", (a, n) => { a.Time = n.GetIntegerValue(); } },
{ "size", (a, n) => { a.Size = n.GetIntegerValue(); } },
};

private static RetentionDefinition LoadRetention(ParseNode node)
{
var mapNode = node.CheckMapNode("retention");
var retention = new RetentionDefinition();
ParseMap(mapNode, retention, pulsarServerBindingRetentionFixedFields, null);
return retention;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ internal static IChannelBinding LoadChannelBinding(ParseNode node)
{
case BindingType.Kafka:
return LoadBinding("ChannelBinding", property.Value, kafkaChannelBindingFixedFields);
case BindingType.Pulsar:
return LoadBinding("ChannelBinding", property.Value, pulsarChannelBindingFixedFields);
case BindingType.Websockets:
return LoadBinding("ChannelBinding", property.Value, webSocketsChannelBindingFixedFields);
default:
Expand Down
9 changes: 5 additions & 4 deletions src/LEGO.AsyncAPI.Readers/V2/AsyncApiChannelDeserializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace LEGO.AsyncAPI.Readers

internal static partial class AsyncApiV2Deserializer
{
private static readonly FixedFieldMap<AsyncApiChannel> channelFixedFields = new()
private static readonly FixedFieldMap<AsyncApiChannel> ChannelFixedFields = new ()
{
{ "description", (a, n) => { a.Description = n.GetScalarValue(); } },
{ "servers", (a, n) => { a.Servers = n.CreateSimpleList(s => s.GetScalarValue()); } },
Expand All @@ -18,8 +18,8 @@ internal static partial class AsyncApiV2Deserializer
{ "bindings", (a, n) => { a.Bindings = LoadChannelBindings(n); } },
};

private static readonly PatternFieldMap<AsyncApiChannel> channelPatternFields =
new()
private static readonly PatternFieldMap<AsyncApiChannel> ChannelPatternFields =
new ()
{
{ s => s.StartsWith("x-"), (a, p, n) => a.AddExtension(p, LoadExtension(p, n)) },
};
Expand All @@ -32,9 +32,10 @@ public static AsyncApiChannel LoadChannel(ParseNode node)
{
return mapNode.GetReferencedObject<AsyncApiChannel>(ReferenceType.Channel, pointer);
}

var pathItem = new AsyncApiChannel();

ParseMap(mapNode, pathItem, channelFixedFields, channelPatternFields);
ParseMap(mapNode, pathItem, ChannelFixedFields, ChannelPatternFields);

return pathItem;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ internal static IServerBinding LoadServerBinding(ParseNode node)
{
case BindingType.Kafka:
return LoadBinding("ServerBinding", property.Value, kafkaServerBindingFixedFields);
case BindingType.Pulsar:
return LoadBinding("ServerBinding", property.Value, pulsarServerBindingFixedFields);
default:
throw new AsyncApiException($"ServerBinding {property.Name} is not supported");
}
Expand Down
2 changes: 2 additions & 0 deletions src/LEGO.AsyncAPI/Models/AsyncApiConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ public static class AsyncApiConstants
public const string Size = "size";
public const string Persistence = "persistence";
public const string Compaction = "compaction";
public const string TTL = "ttl";
public const string Tenant = "tenant";
public const string Namespace = "namespace";
public const string ServerVariables = "serverVariables";
Expand All @@ -136,5 +137,6 @@ public static class AsyncApiConstants
public const string DeleteRetentionMiliseconds = "delete.retention.ms";
public const string MaxMessageBytes = "max.message.bytes";
public const string TopicConfiguration = "topicConfiguration";
public const string GeoReplication = "geo-replication";
}
}
3 changes: 3 additions & 0 deletions src/LEGO.AsyncAPI/Models/Bindings/BindingType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,8 @@ public enum BindingType

[Display("websockets")]
Websockets,

[Display("pulsar")]
Pulsar,
}
}
15 changes: 15 additions & 0 deletions src/LEGO.AsyncAPI/Models/Bindings/Pulsar/Persistence.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright (c) The LEGO Group. All rights reserved.

namespace LEGO.AsyncAPI.Models.Bindings.Pulsar
{
using LEGO.AsyncAPI.Attributes;

public enum Persistence
{
[Display("persistent")]
Persistent,

[Display("non-persistent")]
NonPersistent,
}
}
101 changes: 101 additions & 0 deletions src/LEGO.AsyncAPI/Models/Bindings/Pulsar/PulsarChannelBinding.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright (c) The LEGO Group. All rights reserved.

namespace LEGO.AsyncAPI.Models.Bindings.Pulsar
{
using System;
using System.Collections.Generic;
using LEGO.AsyncAPI.Models.Interfaces;
using LEGO.AsyncAPI.Writers;

/// <summary>
/// Binding class for Pulsar server settings.
/// </summary>
public class PulsarChannelBinding : IChannelBinding
{
/// <summary>
/// The namespace associated with the topic.
/// </summary>
public string Namespace { get; set; }

/// <summary>
/// persistence of the topic in Pulsar persistent or non-persistent.
/// </summary>
public Persistence Persistence { get; set; }

/// <summary>
/// Topic compaction threshold given in bytes.
/// </summary>
public int Compaction { get; set; }

/// <summary>
/// A list of clusters the topic is replicated to.
/// </summary>
public IEnumerable<string> GeoReplication { get; set; }

/// <summary>
/// Topic retention policy.
/// </summary>
public RetentionDefinition Retention { get; set; }

/// <summary>
/// Message Time-to-live in seconds.
/// </summary>
public int TTL { get; set; }

/// <summary>
/// When Message deduplication is enabled, it ensures that each message produced on Pulsar topics is persisted to disk only once.
/// </summary>
public bool Deduplication { get; set; }

/// <summary>
/// The version of this binding.
public string BindingVersion { get; set; }

public BindingType Type => BindingType.Pulsar;

public bool UnresolvedReference { get; set; }

public AsyncApiReference Reference { get; set; }

public IDictionary<string, IAsyncApiExtension> Extensions { get; set; } = new Dictionary<string, IAsyncApiExtension>();

/// <summary>
/// Serialize to AsyncAPI V2 document without using reference.
/// </summary>
public void SerializeV2WithoutReference(IAsyncApiWriter writer)
{
if (writer is null)
{
throw new ArgumentNullException(nameof(writer));
}

writer.WriteStartObject();
writer.WriteRequiredProperty(AsyncApiConstants.Namespace, this.Namespace);
writer.WriteRequiredProperty(AsyncApiConstants.Persistence, this.Persistence.GetDisplayName());
writer.WriteOptionalProperty<int>(AsyncApiConstants.Compaction, this.Compaction);
writer.WriteOptionalCollection(AsyncApiConstants.GeoReplication, this.GeoReplication, (v, s) => v.WriteValue(s));
writer.WriteOptionalObject(AsyncApiConstants.Retention, this.Retention, (w, r) => r.Serialize(w));
writer.WriteOptionalProperty<int>(AsyncApiConstants.TTL, this.TTL);
writer.WriteOptionalProperty(AsyncApiConstants.Deduplication, this.Deduplication);
writer.WriteOptionalProperty(AsyncApiConstants.BindingVersion, this.BindingVersion);

writer.WriteEndObject();
}

public void SerializeV2(IAsyncApiWriter writer)
{
if (writer is null)
{
throw new ArgumentNullException(nameof(writer));
}

if (this.Reference != null && writer.GetSettings().ReferenceInline != ReferenceInlineSetting.InlineReferences)
{
this.Reference.SerializeV2(writer);
return;
}

this.SerializeV2WithoutReference(writer);
}
}
}
66 changes: 66 additions & 0 deletions src/LEGO.AsyncAPI/Models/Bindings/Pulsar/PulsarServerBinding.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright (c) The LEGO Group. All rights reserved.

namespace LEGO.AsyncAPI.Models.Bindings.Pulsar
{
using System;
using System.Collections.Generic;
using LEGO.AsyncAPI.Models.Interfaces;
using LEGO.AsyncAPI.Writers;

/// <summary>
/// Binding class for Pulsar server settings.
/// </summary>
public class PulsarServerBinding : IServerBinding
{
/// <summary>
/// The pulsar tenant. If omitted, "public" must be assumed.
/// </summary>
public string Tenant { get; set; }

/// <summary>
/// The version of this binding.
public string BindingVersion { get; set; }

public BindingType Type => BindingType.Pulsar;

public bool UnresolvedReference { get; set; }

public AsyncApiReference Reference { get; set; }

public IDictionary<string, IAsyncApiExtension> Extensions { get; set; } = new Dictionary<string, IAsyncApiExtension>();

/// <summary>
/// Serialize to AsyncAPI V2 document without using reference.
/// </summary>
public void SerializeV2WithoutReference(IAsyncApiWriter writer)
{
if (writer is null)
{
throw new ArgumentNullException(nameof(writer));
}

writer.WriteStartObject();

writer.WriteOptionalProperty(AsyncApiConstants.Tenant, this.Tenant);
writer.WriteOptionalProperty(AsyncApiConstants.BindingVersion, this.BindingVersion);

writer.WriteEndObject();
}

public void SerializeV2(IAsyncApiWriter writer)
{
if (writer is null)
{
throw new ArgumentNullException(nameof(writer));
}

if (this.Reference != null && writer.GetSettings().ReferenceInline != ReferenceInlineSetting.InlineReferences)
{
this.Reference.SerializeV2(writer);
return;
}

this.SerializeV2WithoutReference(writer);
}
}
}
28 changes: 28 additions & 0 deletions src/LEGO.AsyncAPI/Models/Bindings/Pulsar/RetentionDefinition.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright (c) The LEGO Group. All rights reserved.

namespace LEGO.AsyncAPI.Models.Bindings.Pulsar
{
using LEGO.AsyncAPI.Models.Interfaces;
using LEGO.AsyncAPI.Writers;

public class RetentionDefinition : IAsyncApiElement
{
/// <summary>
/// Time given in Minutes. 0 = Disable message retention (by default).
/// </summary>
public int Time { get; set; }

/// <summary>
/// Size given in MegaBytes. 0 = Disable message retention (by default).
/// </summary>
public int Size { get; set; }

public void Serialize(IAsyncApiWriter writer)
{
writer.WriteStartObject();
writer.WriteRequiredProperty(AsyncApiConstants.Time, this.Time);
writer.WriteRequiredProperty(AsyncApiConstants.Size, this.Size);
writer.WriteEndObject();
}
}
}
Loading

0 comments on commit 04e8af8

Please sign in to comment.