Skip to content
This repository has been archived by the owner on Jun 1, 2021. It is now read-only.

Vert.x Adapter #310

Merged
merged 1 commit into from
Oct 7, 2016
Merged

Vert.x Adapter #310

merged 1 commit into from
Oct 7, 2016

Conversation

cstub
Copy link
Contributor

@cstub cstub commented Sep 13, 2016

The Vert.x Adapter offers various facilities to connect Eventuate event-logs to the Vert.x event-bus. Events can be read from an event-log and published to the Vert.x event-bus. Furthermore events may be read from the Vert.x event-bus and persisted to an event-log.

  • implement an adapter-system which is provided with adapter-configurations and an instance of Vert.x
  • implement a publish-adapter which reads events from an event-log and publishes them to all consumers on the Vert.x event-bus
  • implement a send-adapter which reads events from an event-log and sends them to a single consumer on the Vert.x event-bus
  • provide at-least-once-delivery semantics for the send-adapter with single confirmations or batched confirmations
  • implement a write-adapter which reads events from the Vert.x event-bus and persists them to an event-log
  • closes Vert.x Adapter #290

@krasserm
Copy link
Contributor

When compiling I'm getting lots of

[warn] Class io.vertx.codegen.annotations.VertxGen not found - continuing with a stub.
[warn] Class io.vertx.codegen.annotations.GenIgnore not found - continuing with a stub.
[warn] Class io.vertx.codegen.annotations.GenIgnore not found - continuing with a stub.
[warn] Class io.vertx.codegen.annotations.GenIgnore not found - continuing with a stub.
[warn] Class io.vertx.codegen.annotations.GenIgnore not found - continuing with a stub.
[warn] Class io.vertx.codegen.annotations.Fluent not found - continuing with a stub.
[warn] Class io.vertx.codegen.annotations.Fluent not found - continuing with a stub.
[warn] Class io.vertx.codegen.annotations.GenIgnore not found - continuing with a stub.
[warn] Class io.vertx.codegen.annotations.Fluent not found - continuing with a stub.
[warn] Class io.vertx.codegen.annotations.GenIgnore not found - continuing with a stub.
[warn] Class io.vertx.codegen.annotations.Fluent not found - continuing with a stub.
[warn] Class io.vertx.codegen.annotations.Fluent not found - continuing with a stub.
...

How shall we deal with it?

@krasserm
Copy link
Contributor

Please add this adapter to the build config.

@@ -22,6 +22,8 @@ object ProjectDependencyVersions {
val Log4jVersion = "2.5"
val ProtobufVersion = "2.5.0"
val SparkVersion = "1.6.1"
val VertxVersion = "3.0.0"
val ExampleVertxVersion = "3.3.2"
Copy link
Contributor

@krasserm krasserm Sep 19, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why different Vert.x versions for the adapter and the example?

Copy link
Contributor Author

@cstub cstub Sep 19, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I chose different versions because the Adapter is compatible with Vert.x versions > 3.0.0, but version 3.0.0 has some logging issues and because of that the output in the examples was messed up.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assuming that 3.3.2 is backwards-compatible to 3.0.0, why not depending on 3.3.2 in the adapter then?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is backwards compatible, but using 3.3.2 in the adapter also has the risk of using APIs not available in Vert.x 3.0.0, which would break the compatibility of the adapter, wouldn't it?

Copy link
Contributor

@krasserm krasserm Sep 19, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't that only an issue when using a provided configuration? If we force applications to use 3.3.2 this should be ok, unless there are breaking changes in Vert.x 3.x minor versions. Is this the case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The adapter should use Vert.x 3.0.0 as a provided dependency to not force the user into a specific Vert.x version because of breaking changes between minor versions (see https://github.com/vert-x3/wiki/wiki/3.1.0---Breaking-changes).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I see ...

@krasserm
Copy link
Contributor

build.sbt seems to break the diff view on github, hence my comments here:

  • dependsOn(logLeveldb % "it->it") should be enough, the Cassandra dependencies can be removed
  • The Netty dependency of Akka Remote must not be excluded as it depends on Netty 3 which cannot be replaced by Netty 4. I wonder if it is a problem at all to have both Netty 3 and 4 on the classpath, as Netty 3 is in the org.jboss.netty package and Netty 4 in the io.netty package.
  • The multi-jvm configs/dependencies can be removed as well unless you plan to make multi-jvm tests (see also the Akka Streams adapter configuration for a more lightweight example)

val writeAdapters: Seq[VertxWriteAdapterConfig] =
configurations.collect({ case c: VertxWriteAdapterConfig => c })

def addAdapter(first: VertxAdapterConfig, rest: VertxAdapterConfig*): VertxAdapterSystemConfig =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't a method with name addAdapter have only a single VertxAdapterConfig parameter? All usage examples also don't use the rest vararg parameter.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right - I will rename the varags-version to addAdapters

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not removing the rest parameter instead? They're never used in the example

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you're right - the varargs version is not necessary, I will remove the parameter.

configurations.collect({ case c: VertxWriteAdapterConfig => c })

def addAdapter(first: VertxAdapterConfig, rest: VertxAdapterConfig*): VertxAdapterSystemConfig =
addAdapters(first +: rest.toVector)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

copy(onfigurations = onfigurations :+ first) or is ordering significant?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope, ordering is not significant in this case, but does appending the element have any advantages?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, shoudn't make a difference

}

class VertxAdapterSystemConfig(private[vertx] val configurations: Seq[VertxAdapterConfig],
private[vertx] val codecClasses: Seq[Class[_]]) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why private[vertx]?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't want to expose the constructor as part of the public API.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

else
invalid.map(c => s"Ambigious definition for adapter with id '${c._1}' given. An id may only be used once.")
.toVector
.left
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This generates n error messages that only differ in ${c._1}. Why not just collecting keys with more than one config and create a single error message? What about using just Either instead of Scalaz's \/?

case class VertxSendAdapterConfig(id: String, log: ActorRef, endpointRouter: VertxEndpointRouter, deliveryMode: DeliveryMode) extends VertxReadAdapterConfig
case class VertxWriteAdapterConfig(id: String, log: ActorRef, endpoints: Seq[String], filter: PartialFunction[Any, Boolean]) extends VertxAdapterConfig

sealed trait ConfirmationType {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

{} can be removed

case object Single extends ConfirmationType
case class Batch(size: Int) extends ConfirmationType

sealed trait DeliveryMode {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

{} can be removed

import org.scalatest.{BeforeAndAfterEach, Suite}

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some imports are not used here


class AkkaSerializationMessageCodec(override val name: String)(implicit system: ActorSystem) extends MessageCodec[AnyRef, AnyRef] {

val serializer = PayloadSerializationExtension(system)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can avoid creating your own serialization extension by using Akka's SerializationExtension and defining a payload wrapper (preferrably using protobuf) so that you know the class when deserializing with Serialization.deserialize[T](bytes: Array[Byte], clazz: Class[T]).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was there a special reason not implementing like that?

Copy link
Contributor Author

@cstub cstub Sep 19, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to reuse the CommonSerializer as it already uses protobuf to serialize the the payload, but this implementation needen an ExtendedActorSystem which was only available within an Extension

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can access an ExtendedActorSystem from Serialization.

override def decodeFromWire(pos: Int, buffer: Buffer): AnyRef = {
val payloadLength = buffer.getInt(pos)
val payload = buffer.getBytes(pos + Integer.BYTES, pos + Integer.BYTES + payloadLength)
serializer.fromBinary(payload).asInstanceOf[AnyRef]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Low level position arithmetic can be avoided with a protobuf-defined payload wrapper.

Copy link
Contributor Author

@cstub cstub Sep 19, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to the official Vert.x examples the payload length must be stored in the buffer to recover the part of the buffer that contains the payload in decodeFromWire.


class PayloadSerializationExtension(system: ExtendedActorSystem) extends Extension {

val serializer = new CommonSerializer(system)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When implementing the previously mentioned approach, the serializer can be moved to AkkaSerializationMessageCodec

vertx.eventBus().registerDefaultCodec(c.asInstanceOf[Class[AnyRef]], AkkaSerializationMessageCodec(c))
} catch {
case e: IllegalStateException =>
system.log.warning(s"An adapter codec for class ${c.getName} was configured, even though a default codec was already registered for this class.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we propagate this exception to the application?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I remember we said that trying to override an existing message-codec-binding should not result in an error but in a warning.

val handler = (msg: Message[Any]) => {
routes.foreach { route =>
if (route.filter.applyOrElse(msg.body(), (_: Any) => false)) {
route.destinationLog ! PersistMessage(msg.body(), msg)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not having only a single Message parameter for PersistMessage and VertxWriterextracts the event with msg.body?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, a single parameter would suffice - I will change that

if (route.filter.applyOrElse(msg.body(), (_: Any) => false)) {
route.destinationLog ! PersistMessage(msg.body(), msg)
} else {
msg.reply(msg.body)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would an ACK be sufficient (here and in VertxWriter)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about returning a processing-state depending on whether the event was persisted in the log or filtered? It might be useful information for the caller and should not have a big serialization/deserialization overhead since it can be modelled as an enum.


private def installMessageConsumer(endpoint: String, routes: Seq[Route]): MessageConsumer[Any] = {
val handler = (msg: Message[Any]) => {
routes.foreach { route =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A message should have only a single reply. Allowing n routes here may erroneously lead to n replies. An endpoint should be restricted to publish to exactly one log.


object VertxWriteRouter {

case class Route(sourceEndpoint: String, destinationLog: ActorRef, filter: PartialFunction[Any, Boolean])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename destinationLog to writer?

protected def produce[A](address: String, msg: Any, deliveryOptions: DeliveryOptions, handler: Handler[AsyncResult[Message[A]]]): Unit

def produce(address: String, msg: Any): Unit = {
produce[Unit](address, msg, new DeliveryOptions(), ((ar: AsyncResult[Message[Unit]]) => {}).asVertxHandler)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Equivalent DeliveryOptions and handler objects are created with every call. Define them in an outer scope? OTOH I'm not sure if the compiler is clever enough to detect that and optimize.

produce[Unit](address, msg, new DeliveryOptions(), ((ar: AsyncResult[Message[Unit]]) => {}).asVertxHandler)
}

def produce[A](address: String, msg: Any, timeout: FiniteDuration = 30.seconds)(implicit ec: ExecutionContext): Future[A] = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make default timeout configurable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've removed the default timeout altogether because it is not used anyway.

override protected def produce[A](address: String, msg: Any, deliveryOptions: DeliveryOptions, handler: Handler[AsyncResult[Message[A]]]): Unit = {
eventBus().publish(address, msg, deliveryOptions)
handler.handle(VertxFuture.succeededFuture(new MessageImpl[Any, A]()))
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From a design perspective, shouldn't a MessagePublisher rather offer a publish method without a handler parameter (and use a produce method to delegate to)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but the problem with this approach would be, that the MessagePublisher and MessageSender could not be used indistinguishable, as done in the various reader-actors.

This way a reader only has to depend on a MessageProducer and the mixed-in implementation will decide if it is a Publisher or a Sender

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case you may want to abstract over message delivery somewhere else (e.g. on reader-level) rather than here. Completing an asynchronous handler where the return from a from a method call is sufficient is a code/design smell IMO.

trait MessageSender extends MessageProducer {
override protected def produce[A](address: String, msg: Any, deliveryOptions: DeliveryOptions, handler: Handler[AsyncResult[Message[A]]]): Unit = {
eventBus().send(address, msg, deliveryOptions, handler)
}
Copy link
Contributor

@krasserm krasserm Sep 19, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


def vertx: Vertx

protected def produce[A](address: String, msg: Any, deliveryOptions: DeliveryOptions, handler: Handler[AsyncResult[Message[A]]]): Unit
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're trying to abstract over point-to-point and pub-sub messaging here. Does that make sense?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case I would say yes, because from a sender point-of-view a publish can be seen as a send with an immediate response.
This enables generic handling of message-sending in the VertxReader

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I consider a response semantically different from a send-acknowledgement. Responses are communicated via an asynchronous result handler, send/publish acknowledgements by returning from the send/publish method.

}
}

trait MessageDelivery extends MessageProducer {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For reasons of symmetry to the adapter configuration, MessageDelivery sub-traits should only extend the behavior of MessageSender. At-most-once semantics is implied for MessagePublisher. This is also related to https://github.com/RBMHTechnology/eventuate/pull/310/files#r79381579

@krasserm
Copy link
Contributor

Some thoughts regarding naming: shouldn't we rather use

  • bus writer instead of vert.x reader
  • log writer instead of vert.x writer

This would make declarations like

trait VertxReader[R, W] extends EventsourcedWriter[R, W] with ...

less confusing. WDYT?

vertx.eventBus().publish(address, evt)
}

trait VertxEventSender extends VertxEventPublisher with EventSender {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extends VertxEventProducer?

}

private[eventuate] class VertxSender(val id: String, val eventLog: ActorRef, val endpointRouter: EndpointRouter, val vertx: Vertx, val storageProvider: StorageProvider)
extends VertxEventDispatcher[Long, Long] with AtMostOnceDelivery with VertxEventSender with SequenceNumberProgressStore {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AtMostOnceDelivery causes the VertxEventDispatcher to publish on the Vert.x bus. The VertxEventSender.send() is never used here. Furthermore, when applying my previous change you anyway must use VertxEventPublisher here for making the code compile.

The problem is that the *Delivery traits are not independent of the VertxEvent* traits. Maybe you design the *Delivery traits in a way that they are only applicable to VertxEventSender. At-least-once delivery for VertxEventPublisher would/should be implied then.

if (invalid.isEmpty)
Right(configs)
else
Left(s"Ambigious definition for adapter(s) [${invalid.keys.map(id => s"'$id'").mkString(", ")}] given. " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ambiguous

Props(new LogWriter(id, eventLog))
}

private[vertx] class LogWriter(val id: String, val eventLog: ActorRef) extends EventsourcedActor {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This actor might take a long time to replay events. Either safe periodically empty snapshots to reduce recovery time or use this new feature to start replay from a custom sequence number. In the latter case, the sequence number must be managed elsewhere.

@krasserm
Copy link
Contributor

Awesome contribution @cstub 👍 LGTM after adding API and user documentation.

@cstub cstub force-pushed the wip-290-vertx-adapter branch 4 times, most recently from b39d7a9 to 21064cb Compare October 3, 2016 06:36
import scala.concurrent.{ ExecutionContext, Future, Promise }

/**
* A storage provider is used to persist the replication progress of individual `event producers`.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

replication progress -> log read progress?

* Specifies that this `point-to-point event producer` should use `At-Least-Once` delivery semantics.
*
* @param confirmationType Confirmation type - can either be `Batch` or `Single`.
* @param confirmationTimeout Timeout after which events should be redelivered if not confirmation was received.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

... if no confirmation ...


/**
* Sets the destination event log all received events are persisted to.
* An optional filter function can be supplied. Only events passing the function are persisted.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

passing the filter ...

- **Vert.x event producers** consume events from an event log and publish or send the events to a configurable event bus endpoint.
- **Log event producers** consume events from a given event bus endpoint and persist the events in an event log.

An event producer establishes a unidirectional connection between exactly one event log and one or multiple event bus endpoints. Event producers are instantiated by using the ``EventProducer`` factory methods. The configuration of a producer consists of:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

... are instantiated by using the EventProducer factory methods ...

... are configured by using the EventProducer API?

.. includecode:: ../../../eventuate-example-vertx/src/main/java/com/rbmhtechnology/docs/vertx/japi/Documentation.java
:snippet: vertx-event-producer

*Log producers* are created by using the ``EventProducer.fromEndpoints`` method. Multiple event bus endpoints can be defined, which are used to consume events from the event bus and persist the same events to the given event log. Events can be filtered by defining a filter function.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Either include an example of a filter function or mention that it is not shown in the example

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

... or include a hint that event producers are covered in more detail further below (linking to the detail section)

:snippet: event-processing-log-producer

.. note::
Event processing in event bus handlers should be performed idempotent because a Vert.x producer may deliver the same event multiple times under certain conditions. Events may be redelivered after a restart of a producer if it was not able to successfully persist its replication progress on shutdown.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

... its replication progress on shutdown

its read progress on shutdown (or crash)

:snippet: adapter-example

.. warning::
The ``start`` method should only be called once all handlers on the event bus have been registered. Failing to do so may lead to loss of events because an producer might try to deliver events to an event bus endpoint which has not yet an event handler assigned to it.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should only be called once all handlers

called after all handlers

an producer

a producer

Vert.x Publish Event Producer
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

A *Publish Event Producer* publishes events from an event log to *multiple* subscribers on the event bus. Events are delivered to specific endpoints defined in the configuration of the producer. A producer can route events to different event bus endpoints based on the content of the event. Event routing is enabled by supplying a partial function which maps events to event bus endpoints. If no endpoint is returned by the function the event will not be processed.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If no endpoint is returned by the function

if the partial function is not defined at the event ...

.. includecode:: ../../../eventuate-example-vertx/src/main/java/com/rbmhtechnology/docs/vertx/japi/Documentation.java
:snippet: vertx-publish-producer

Event delivery is always performed with *At-Most-Once* delivery semantics, so no guarantees about the successful delivery of events can be made.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Event publishing is performed with At-Most-Once delivery semantics.


Event delivery is always performed with *At-Most-Once* delivery semantics, so no guarantees about the successful delivery of events can be made.

Applications consume events by registering an event bus handler at the configured endpoints.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are these called event bus handler and not just event handler?

.. includecode:: ../../../eventuate-example-vertx/src/main/java/com/rbmhtechnology/docs/vertx/japi/Documentation.java
:snippet: event-processing-vertx-producer

Replication progress from the source event log is tracked by persisting the ``localSequenceNr`` of the latest delivered event to the ``StorageProvider`` supplied to the ``VertxAdapter``. After delivery of one or multiple events the replication progress is persisted. The producer continues delivery of events from the latest known ``localSequenceNr`` once the it is started.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replication progress

read progress

latest delivered event

The latest sent event

Delivery usually means delivery to the consumer but in this context you can only reason about successful sending/publishing to the event bus. So other occurrences of deliver... should be replaced as well here.

.. includecode:: ../../../eventuate-example-vertx/src/main/java/com/rbmhtechnology/docs/vertx/japi/Documentation.java
:snippet: vertx-ptp-producer-at-least-once

Events sent by a point-to-point event producer are received by registering an event handler on the event bus.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ha! Here's the event handler 😃


Events sent by a point-to-point event producer are received by registering an event handler on the event bus.

Using *At-Least-Once* delivery semantics, every event must be confirmed by the receiver. Unconfirmed events are redelivered until a confirmation was received by the adapter. Event handlers signal event confirmation by replying to the event bus message with an arbitrary value.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Event handlers signal event confirmation

Event handlers confirm event delivery

with an arbitrary value

Why that? Why not ACK?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The adapter does not process the returned value, so the handler can send any value.
We could define a case object ACK which can be sent? ok for you?

Using per-event confirmations, every confirmation received by the adapter is persisted to the source event log. Confirmation events are not delivered to any event bus handlers but will increase the size of the source event log. With this confirmation mode events will not be redelivered once an event confirmation has been received.

- **Batch event confirmations**:
Using batch confirmations, events are delivered in batches where the next batch is only delivered once all events of the previous batch have been confirmed. Batches containing events which have not been confirmed are redelivered as a whole, resulting in redelivery of all events of the same batch. This approach leads to modest storage requirements as no dedicated confirmation information has to be tracked. Using this confirmation mode, events may be redelivered multiple times even though a confirmation has already been received.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no dedicated confirmation information

no individual per-event confirmation (?)

Log Event Producer
~~~~~~~~~~~~~~~~~~

A *Log Event Producer* consumes events from multiple event bus endpoints and persists the same events to a single event log. Every persisted event creates a write confirmation which is returned to the sender of the event, containing the result of the write operation.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

persists the same events

persists these events

@krasserm
Copy link
Contributor

krasserm commented Oct 4, 2016

@cstub very good documentation! After having fixed the build failures, please squash the commits. Ready for merge to master I think 😃.

The Vert.x Adapter offers various facilities to connect Eventuate event-logs to the Vert.x event-bus. Events can be read from an event-log and published to the Vert.x event-bus. Furthermore events may be read from the Vert.x event-bus and persisted to an event-log.

- implement an adapter-system which is provided with adapter-configurations and an instance of Vert.x
- implement a publish-adapter which reads events from an event-log and publishes them to all consumers on the Vert.x event-bus
- implement a send-adapter which reads events from an event-log and sends them to a single consumer on the Vert.x event-bus
- provide at-least-once-delivery semantics for the send-adapter with single confirmations or batched confirmations
- implement a write-adapter which reads events from the Vert.x event-bus and persists them to an event-log

- closes #290
@cstub cstub force-pushed the wip-290-vertx-adapter branch from 983af11 to 0bb2786 Compare October 6, 2016 08:00
@krasserm krasserm merged commit 97a8ab7 into master Oct 7, 2016
@krasserm krasserm deleted the wip-290-vertx-adapter branch October 7, 2016 12:02
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Vert.x Adapter
2 participants