-
Notifications
You must be signed in to change notification settings - Fork 100
Conversation
When compiling I'm getting lots of
How shall we deal with it? |
Please add this adapter to the build config. |
@@ -22,6 +22,8 @@ object ProjectDependencyVersions { | |||
val Log4jVersion = "2.5" | |||
val ProtobufVersion = "2.5.0" | |||
val SparkVersion = "1.6.1" | |||
val VertxVersion = "3.0.0" | |||
val ExampleVertxVersion = "3.3.2" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why different Vert.x versions for the adapter and the example?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I chose different versions because the Adapter is compatible with Vert.x versions > 3.0.0, but version 3.0.0 has some logging issues and because of that the output in the examples was messed up.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Assuming that 3.3.2 is backwards-compatible to 3.0.0, why not depending on 3.3.2 in the adapter then?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is backwards compatible, but using 3.3.2 in the adapter also has the risk of using APIs not available in Vert.x 3.0.0, which would break the compatibility of the adapter, wouldn't it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't that only an issue when using a provided
configuration? If we force applications to use 3.3.2 this should be ok, unless there are breaking changes in Vert.x 3.x minor versions. Is this the case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The adapter should use Vert.x 3.0.0 as a provided dependency to not force the user into a specific Vert.x version because of breaking changes between minor versions (see https://github.com/vert-x3/wiki/wiki/3.1.0---Breaking-changes).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, I see ...
|
val writeAdapters: Seq[VertxWriteAdapterConfig] = | ||
configurations.collect({ case c: VertxWriteAdapterConfig => c }) | ||
|
||
def addAdapter(first: VertxAdapterConfig, rest: VertxAdapterConfig*): VertxAdapterSystemConfig = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't a method with name addAdapter
have only a single VertxAdapterConfig
parameter? All usage examples also don't use the rest
vararg parameter.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right - I will rename the varags-version to addAdapters
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not removing the rest
parameter instead? They're never used in the example
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you're right - the varargs version is not necessary, I will remove the parameter.
configurations.collect({ case c: VertxWriteAdapterConfig => c }) | ||
|
||
def addAdapter(first: VertxAdapterConfig, rest: VertxAdapterConfig*): VertxAdapterSystemConfig = | ||
addAdapters(first +: rest.toVector) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
copy(onfigurations = onfigurations :+ first)
or is ordering significant?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nope, ordering is not significant in this case, but does appending the element have any advantages?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, shoudn't make a difference
} | ||
|
||
class VertxAdapterSystemConfig(private[vertx] val configurations: Seq[VertxAdapterConfig], | ||
private[vertx] val codecClasses: Seq[Class[_]]) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why private[vertx]
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't want to expose the constructor as part of the public API.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
else | ||
invalid.map(c => s"Ambigious definition for adapter with id '${c._1}' given. An id may only be used once.") | ||
.toVector | ||
.left |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This generates n error messages that only differ in ${c._1}
. Why not just collecting keys with more than one config and create a single error message? What about using just Either
instead of Scalaz's \/
?
case class VertxSendAdapterConfig(id: String, log: ActorRef, endpointRouter: VertxEndpointRouter, deliveryMode: DeliveryMode) extends VertxReadAdapterConfig | ||
case class VertxWriteAdapterConfig(id: String, log: ActorRef, endpoints: Seq[String], filter: PartialFunction[Any, Boolean]) extends VertxAdapterConfig | ||
|
||
sealed trait ConfirmationType {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
{}
can be removed
case object Single extends ConfirmationType | ||
case class Batch(size: Int) extends ConfirmationType | ||
|
||
sealed trait DeliveryMode {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
{}
can be removed
import org.scalatest.{BeforeAndAfterEach, Suite} | ||
|
||
import scala.concurrent.{ExecutionContext, Future} | ||
import scala.concurrent.duration._ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some imports are not used here
|
||
class AkkaSerializationMessageCodec(override val name: String)(implicit system: ActorSystem) extends MessageCodec[AnyRef, AnyRef] { | ||
|
||
val serializer = PayloadSerializationExtension(system) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can avoid creating your own serialization extension by using Akka's SerializationExtension
and defining a payload wrapper (preferrably using protobuf) so that you know the class when deserializing with Serialization.deserialize[T](bytes: Array[Byte], clazz: Class[T])
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Was there a special reason not implementing like that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wanted to reuse the CommonSerializer as it already uses protobuf to serialize the the payload, but this implementation needen an ExtendedActorSystem
which was only available within an Extension
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can access an ExtendedActorSystem
from Serialization
.
override def decodeFromWire(pos: Int, buffer: Buffer): AnyRef = { | ||
val payloadLength = buffer.getInt(pos) | ||
val payload = buffer.getBytes(pos + Integer.BYTES, pos + Integer.BYTES + payloadLength) | ||
serializer.fromBinary(payload).asInstanceOf[AnyRef] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Low level position arithmetic can be avoided with a protobuf-defined payload wrapper.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
According to the official Vert.x examples the payload length must be stored in the buffer to recover the part of the buffer that contains the payload in decodeFromWire
.
|
||
class PayloadSerializationExtension(system: ExtendedActorSystem) extends Extension { | ||
|
||
val serializer = new CommonSerializer(system) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When implementing the previously mentioned approach, the serializer
can be moved to AkkaSerializationMessageCodec
vertx.eventBus().registerDefaultCodec(c.asInstanceOf[Class[AnyRef]], AkkaSerializationMessageCodec(c)) | ||
} catch { | ||
case e: IllegalStateException => | ||
system.log.warning(s"An adapter codec for class ${c.getName} was configured, even though a default codec was already registered for this class.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't we propagate this exception to the application?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As far as I remember we said that trying to override an existing message-codec-binding should not result in an error but in a warning.
val handler = (msg: Message[Any]) => { | ||
routes.foreach { route => | ||
if (route.filter.applyOrElse(msg.body(), (_: Any) => false)) { | ||
route.destinationLog ! PersistMessage(msg.body(), msg) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not having only a single Message
parameter for PersistMessage
and VertxWriter
extracts the event with msg.body
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, a single parameter would suffice - I will change that
if (route.filter.applyOrElse(msg.body(), (_: Any) => false)) { | ||
route.destinationLog ! PersistMessage(msg.body(), msg) | ||
} else { | ||
msg.reply(msg.body) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would an ACK be sufficient (here and in VertxWriter
)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how about returning a processing-state depending on whether the event was persisted in the log or filtered? It might be useful information for the caller and should not have a big serialization/deserialization overhead since it can be modelled as an enum.
|
||
private def installMessageConsumer(endpoint: String, routes: Seq[Route]): MessageConsumer[Any] = { | ||
val handler = (msg: Message[Any]) => { | ||
routes.foreach { route => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A message should have only a single reply. Allowing n routes here may erroneously lead to n replies. An endpoint should be restricted to publish to exactly one log.
|
||
object VertxWriteRouter { | ||
|
||
case class Route(sourceEndpoint: String, destinationLog: ActorRef, filter: PartialFunction[Any, Boolean]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rename destinationLog
to writer
?
protected def produce[A](address: String, msg: Any, deliveryOptions: DeliveryOptions, handler: Handler[AsyncResult[Message[A]]]): Unit | ||
|
||
def produce(address: String, msg: Any): Unit = { | ||
produce[Unit](address, msg, new DeliveryOptions(), ((ar: AsyncResult[Message[Unit]]) => {}).asVertxHandler) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Equivalent DeliveryOptions
and handler objects are created with every call. Define them in an outer scope? OTOH I'm not sure if the compiler is clever enough to detect that and optimize.
produce[Unit](address, msg, new DeliveryOptions(), ((ar: AsyncResult[Message[Unit]]) => {}).asVertxHandler) | ||
} | ||
|
||
def produce[A](address: String, msg: Any, timeout: FiniteDuration = 30.seconds)(implicit ec: ExecutionContext): Future[A] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make default timeout configurable?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've removed the default timeout altogether because it is not used anyway.
override protected def produce[A](address: String, msg: Any, deliveryOptions: DeliveryOptions, handler: Handler[AsyncResult[Message[A]]]): Unit = { | ||
eventBus().publish(address, msg, deliveryOptions) | ||
handler.handle(VertxFuture.succeededFuture(new MessageImpl[Any, A]())) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From a design perspective, shouldn't a MessagePublisher
rather offer a publish
method without a handler
parameter (and use a produce
method to delegate to)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, but the problem with this approach would be, that the MessagePublisher
and MessageSender
could not be used indistinguishable, as done in the various reader-actors.
This way a reader only has to depend on a MessageProducer
and the mixed-in implementation will decide if it is a Publisher
or a Sender
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this case you may want to abstract over message delivery somewhere else (e.g. on reader-level) rather than here. Completing an asynchronous handler where the return from a from a method call is sufficient is a code/design smell IMO.
trait MessageSender extends MessageProducer { | ||
override protected def produce[A](address: String, msg: Any, deliveryOptions: DeliveryOptions, handler: Handler[AsyncResult[Message[A]]]): Unit = { | ||
eventBus().send(address, msg, deliveryOptions, handler) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
send
method here? (see also https://github.com/RBMHTechnology/eventuate/pull/310/files#r79381579)
|
||
def vertx: Vertx | ||
|
||
protected def produce[A](address: String, msg: Any, deliveryOptions: DeliveryOptions, handler: Handler[AsyncResult[Message[A]]]): Unit |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're trying to abstract over point-to-point and pub-sub messaging here. Does that make sense?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this case I would say yes, because from a sender point-of-view a publish can be seen as a send with an immediate response.
This enables generic handling of message-sending in the VertxReader
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I consider a response semantically different from a send-acknowledgement. Responses are communicated via an asynchronous result handler, send/publish acknowledgements by returning from the send/publish method.
} | ||
} | ||
|
||
trait MessageDelivery extends MessageProducer { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For reasons of symmetry to the adapter configuration, MessageDelivery
sub-traits should only extend the behavior of MessageSender
. At-most-once semantics is implied for MessagePublisher
. This is also related to https://github.com/RBMHTechnology/eventuate/pull/310/files#r79381579
Some thoughts regarding naming: shouldn't we rather use
This would make declarations like trait VertxReader[R, W] extends EventsourcedWriter[R, W] with ... less confusing. WDYT? |
vertx.eventBus().publish(address, evt) | ||
} | ||
|
||
trait VertxEventSender extends VertxEventPublisher with EventSender { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
extends VertxEventProducer
?
} | ||
|
||
private[eventuate] class VertxSender(val id: String, val eventLog: ActorRef, val endpointRouter: EndpointRouter, val vertx: Vertx, val storageProvider: StorageProvider) | ||
extends VertxEventDispatcher[Long, Long] with AtMostOnceDelivery with VertxEventSender with SequenceNumberProgressStore { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AtMostOnceDelivery
causes the VertxEventDispatcher
to publish on the Vert.x bus. The VertxEventSender.send()
is never used here. Furthermore, when applying my previous change you anyway must use VertxEventPublisher
here for making the code compile.
The problem is that the *Delivery
traits are not independent of the VertxEvent*
traits. Maybe you design the *Delivery
traits in a way that they are only applicable to VertxEventSender
. At-least-once delivery for VertxEventPublisher
would/should be implied then.
if (invalid.isEmpty) | ||
Right(configs) | ||
else | ||
Left(s"Ambigious definition for adapter(s) [${invalid.keys.map(id => s"'$id'").mkString(", ")}] given. " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ambiguous
Props(new LogWriter(id, eventLog)) | ||
} | ||
|
||
private[vertx] class LogWriter(val id: String, val eventLog: ActorRef) extends EventsourcedActor { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This actor might take a long time to replay events. Either safe periodically empty snapshots to reduce recovery time or use this new feature to start replay from a custom sequence number. In the latter case, the sequence number must be managed elsewhere.
Awesome contribution @cstub 👍 LGTM after adding API and user documentation. |
b39d7a9
to
21064cb
Compare
import scala.concurrent.{ ExecutionContext, Future, Promise } | ||
|
||
/** | ||
* A storage provider is used to persist the replication progress of individual `event producers`. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
replication progress -> log read progress?
* Specifies that this `point-to-point event producer` should use `At-Least-Once` delivery semantics. | ||
* | ||
* @param confirmationType Confirmation type - can either be `Batch` or `Single`. | ||
* @param confirmationTimeout Timeout after which events should be redelivered if not confirmation was received. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
... if no confirmation ...
|
||
/** | ||
* Sets the destination event log all received events are persisted to. | ||
* An optional filter function can be supplied. Only events passing the function are persisted. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
passing the filter ...
- **Vert.x event producers** consume events from an event log and publish or send the events to a configurable event bus endpoint. | ||
- **Log event producers** consume events from a given event bus endpoint and persist the events in an event log. | ||
|
||
An event producer establishes a unidirectional connection between exactly one event log and one or multiple event bus endpoints. Event producers are instantiated by using the ``EventProducer`` factory methods. The configuration of a producer consists of: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
... are instantiated by using the EventProducer factory methods ...
... are configured by using the EventProducer API?
.. includecode:: ../../../eventuate-example-vertx/src/main/java/com/rbmhtechnology/docs/vertx/japi/Documentation.java | ||
:snippet: vertx-event-producer | ||
|
||
*Log producers* are created by using the ``EventProducer.fromEndpoints`` method. Multiple event bus endpoints can be defined, which are used to consume events from the event bus and persist the same events to the given event log. Events can be filtered by defining a filter function. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Either include an example of a filter function or mention that it is not shown in the example
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
... or include a hint that event producers are covered in more detail further below (linking to the detail section)
:snippet: event-processing-log-producer | ||
|
||
.. note:: | ||
Event processing in event bus handlers should be performed idempotent because a Vert.x producer may deliver the same event multiple times under certain conditions. Events may be redelivered after a restart of a producer if it was not able to successfully persist its replication progress on shutdown. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
... its replication progress on shutdown
its read progress on shutdown (or crash)
:snippet: adapter-example | ||
|
||
.. warning:: | ||
The ``start`` method should only be called once all handlers on the event bus have been registered. Failing to do so may lead to loss of events because an producer might try to deliver events to an event bus endpoint which has not yet an event handler assigned to it. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should only be called once all handlers
called after all handlers
an producer
a producer
Vert.x Publish Event Producer | ||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | ||
|
||
A *Publish Event Producer* publishes events from an event log to *multiple* subscribers on the event bus. Events are delivered to specific endpoints defined in the configuration of the producer. A producer can route events to different event bus endpoints based on the content of the event. Event routing is enabled by supplying a partial function which maps events to event bus endpoints. If no endpoint is returned by the function the event will not be processed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If no endpoint is returned by the function
if the partial function is not defined at the event ...
.. includecode:: ../../../eventuate-example-vertx/src/main/java/com/rbmhtechnology/docs/vertx/japi/Documentation.java | ||
:snippet: vertx-publish-producer | ||
|
||
Event delivery is always performed with *At-Most-Once* delivery semantics, so no guarantees about the successful delivery of events can be made. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Event publishing is performed with At-Most-Once delivery semantics.
|
||
Event delivery is always performed with *At-Most-Once* delivery semantics, so no guarantees about the successful delivery of events can be made. | ||
|
||
Applications consume events by registering an event bus handler at the configured endpoints. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are these called event bus handler and not just event handler?
.. includecode:: ../../../eventuate-example-vertx/src/main/java/com/rbmhtechnology/docs/vertx/japi/Documentation.java | ||
:snippet: event-processing-vertx-producer | ||
|
||
Replication progress from the source event log is tracked by persisting the ``localSequenceNr`` of the latest delivered event to the ``StorageProvider`` supplied to the ``VertxAdapter``. After delivery of one or multiple events the replication progress is persisted. The producer continues delivery of events from the latest known ``localSequenceNr`` once the it is started. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Replication progress
read progress
latest delivered event
The latest sent event
Delivery usually means delivery to the consumer but in this context you can only reason about successful sending/publishing to the event bus. So other occurrences of deliver... should be replaced as well here.
.. includecode:: ../../../eventuate-example-vertx/src/main/java/com/rbmhtechnology/docs/vertx/japi/Documentation.java | ||
:snippet: vertx-ptp-producer-at-least-once | ||
|
||
Events sent by a point-to-point event producer are received by registering an event handler on the event bus. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ha! Here's the event handler 😃
|
||
Events sent by a point-to-point event producer are received by registering an event handler on the event bus. | ||
|
||
Using *At-Least-Once* delivery semantics, every event must be confirmed by the receiver. Unconfirmed events are redelivered until a confirmation was received by the adapter. Event handlers signal event confirmation by replying to the event bus message with an arbitrary value. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Event handlers signal event confirmation
Event handlers confirm event delivery
with an arbitrary value
Why that? Why not ACK?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The adapter does not process the returned value, so the handler can send any value.
We could define a case object ACK
which can be sent? ok for you?
Using per-event confirmations, every confirmation received by the adapter is persisted to the source event log. Confirmation events are not delivered to any event bus handlers but will increase the size of the source event log. With this confirmation mode events will not be redelivered once an event confirmation has been received. | ||
|
||
- **Batch event confirmations**: | ||
Using batch confirmations, events are delivered in batches where the next batch is only delivered once all events of the previous batch have been confirmed. Batches containing events which have not been confirmed are redelivered as a whole, resulting in redelivery of all events of the same batch. This approach leads to modest storage requirements as no dedicated confirmation information has to be tracked. Using this confirmation mode, events may be redelivered multiple times even though a confirmation has already been received. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no dedicated confirmation information
no individual per-event confirmation (?)
Log Event Producer | ||
~~~~~~~~~~~~~~~~~~ | ||
|
||
A *Log Event Producer* consumes events from multiple event bus endpoints and persists the same events to a single event log. Every persisted event creates a write confirmation which is returned to the sender of the event, containing the result of the write operation. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
persists the same events
persists these events
@cstub very good documentation! After having fixed the build failures, please squash the commits. Ready for merge to master I think 😃. |
The Vert.x Adapter offers various facilities to connect Eventuate event-logs to the Vert.x event-bus. Events can be read from an event-log and published to the Vert.x event-bus. Furthermore events may be read from the Vert.x event-bus and persisted to an event-log. - implement an adapter-system which is provided with adapter-configurations and an instance of Vert.x - implement a publish-adapter which reads events from an event-log and publishes them to all consumers on the Vert.x event-bus - implement a send-adapter which reads events from an event-log and sends them to a single consumer on the Vert.x event-bus - provide at-least-once-delivery semantics for the send-adapter with single confirmations or batched confirmations - implement a write-adapter which reads events from the Vert.x event-bus and persists them to an event-log - closes #290
983af11
to
0bb2786
Compare
The Vert.x Adapter offers various facilities to connect Eventuate event-logs to the Vert.x event-bus. Events can be read from an event-log and published to the Vert.x event-bus. Furthermore events may be read from the Vert.x event-bus and persisted to an event-log.