From bd09f3cfa07d4c57990c652a2e6cd09cfefe4905 Mon Sep 17 00:00:00 2001 From: Martin Krasser Date: Mon, 16 Nov 2015 05:58:26 +0100 Subject: [PATCH] Event-sourced writer and processor - event-sourced writer for implementing persistent views - event-sourced writer example application - event-sourced processor as specialization of writer - event-sourced processor (stateful and stateless) - event-sourced processor can also emit to source log - unified event emission rules of event-sourced actor and processor Futher enhancements - batching layer for replication writes (from processors and replicators) - system timestamp only set by event log (and not event-sourced actor) Other - example applications moved to subpackes ordermgnt and querydb - closes #100 - closes #136 --- .gitignore | 2 +- build.sbt | 2 +- example-location | 4 +- ...EventsourcedProcessorIntegrationSpec.scala | 250 +++++++++++++++++ .../eventuate/log/EventLogLifecycle.scala | 3 + .../eventuate/log/EventLogSpec.scala | 52 +++- src/main/resources/reference.conf | 8 + .../eventuate/ConfirmedDelivery.scala | 8 +- .../eventuate/DurableEvent.scala | 30 +- .../eventuate/EventsourcedActor.scala | 21 -- .../eventuate/EventsourcedProcessor.scala | 172 ++++++++++++ .../eventuate/EventsourcedView.scala | 101 +++++-- .../eventuate/EventsourcedWriter.scala | 219 +++++++++++++++ .../eventuate/EventsourcingProtocol.scala | 8 +- .../rbmhtechnology/eventuate/Recovery.scala | 2 +- .../eventuate/ReplicationProtocol.scala | 12 +- .../eventuate/crdt/CRDTService.scala | 4 +- .../eventuate/log/BatchingEventLog.scala | 73 ++++- .../eventuate/log/NotificationChannel.scala | 2 +- .../eventuate/log/SubscriberRegistry.scala | 5 +- .../eventuate/log/TimeTracker.scala | 34 ++- .../log/cassandra/CassandraEventLog.scala | 134 ++++----- .../log/cassandra/CassandraEventReader.scala | 2 +- .../log/leveldb/LeveldbEventLog.scala | 93 ++++--- src/sphinx/architecture.rst | 79 +++++- src/sphinx/code/EventSourcingDoc.scala | 30 ++ src/sphinx/example-application.rst | 6 +- src/sphinx/introduction.rst | 7 +- src/sphinx/reference/event-log.rst | 2 +- src/sphinx/reference/event-sourcing.rst | 119 +++++++- src/sphinx/user-guide.rst | 7 + .../example/{ => ordermgnt}/japi/Order.java | 2 +- .../{ => ordermgnt}/japi/OrderActor.java | 3 +- .../{ => ordermgnt}/japi/OrderExample.java | 8 +- .../example/{ => ordermgnt}/japi/OrderId.java | 2 +- .../{ => ordermgnt}/japi/OrderManager.java | 4 +- .../{ => ordermgnt}/japi/OrderSerializer.java | 2 +- .../{ => ordermgnt}/japi/OrderView.java | 4 +- src/test/resources/location-A.conf | 4 +- src/test/resources/location-B.conf | 4 +- src/test/resources/location-C.conf | 4 +- src/test/resources/location-D.conf | 4 +- src/test/resources/location-E.conf | 4 +- src/test/resources/location-F.conf | 4 +- .../eventuate/EventsourcedProcessorSpec.scala | 241 ++++++++++++++++ .../eventuate/EventsourcedWriterSpec.scala | 261 ++++++++++++++++++ .../example/{ => ordermgnt}/Order.scala | 2 +- .../example/{ => ordermgnt}/OrderActor.scala | 2 +- .../{ => ordermgnt}/OrderExample.scala | 2 +- .../{ => ordermgnt}/OrderManager.scala | 2 +- .../example/{ => ordermgnt}/OrderView.scala | 2 +- .../example/{ => ordermgnt}/package.scala | 4 +- .../example/querydb/Emitter.scala | 70 +++++ .../example/querydb/Writer.scala | 110 ++++++++ .../example/querydb/WriterApp.scala | 93 +++++++ 55 files changed, 2041 insertions(+), 288 deletions(-) create mode 100644 src/it/scala/com/rbmhtechnology/eventuate/EventsourcedProcessorIntegrationSpec.scala create mode 100644 
src/main/scala/com/rbmhtechnology/eventuate/EventsourcedProcessor.scala create mode 100644 src/main/scala/com/rbmhtechnology/eventuate/EventsourcedWriter.scala rename src/test/java/com/rbmhtechnology/example/{ => ordermgnt}/japi/Order.java (97%) rename src/test/java/com/rbmhtechnology/example/{ => ordermgnt}/japi/OrderActor.java (99%) rename src/test/java/com/rbmhtechnology/example/{ => ordermgnt}/japi/OrderExample.java (96%) rename src/test/java/com/rbmhtechnology/example/{ => ordermgnt}/japi/OrderId.java (94%) rename src/test/java/com/rbmhtechnology/example/{ => ordermgnt}/japi/OrderManager.java (97%) rename src/test/java/com/rbmhtechnology/example/{ => ordermgnt}/japi/OrderSerializer.java (97%) rename src/test/java/com/rbmhtechnology/example/{ => ordermgnt}/japi/OrderView.java (95%) create mode 100644 src/test/scala/com/rbmhtechnology/eventuate/EventsourcedProcessorSpec.scala create mode 100644 src/test/scala/com/rbmhtechnology/eventuate/EventsourcedWriterSpec.scala rename src/test/scala/com/rbmhtechnology/example/{ => ordermgnt}/Order.scala (96%) rename src/test/scala/com/rbmhtechnology/example/{ => ordermgnt}/OrderActor.scala (99%) rename src/test/scala/com/rbmhtechnology/example/{ => ordermgnt}/OrderExample.scala (98%) rename src/test/scala/com/rbmhtechnology/example/{ => ordermgnt}/OrderManager.scala (98%) rename src/test/scala/com/rbmhtechnology/example/{ => ordermgnt}/OrderView.scala (97%) rename src/test/scala/com/rbmhtechnology/example/{ => ordermgnt}/package.scala (94%) create mode 100644 src/test/scala/com/rbmhtechnology/example/querydb/Emitter.scala create mode 100644 src/test/scala/com/rbmhtechnology/example/querydb/Writer.scala create mode 100644 src/test/scala/com/rbmhtechnology/example/querydb/WriterApp.scala diff --git a/.gitignore b/.gitignore index be431cb1..799d0eba 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,4 @@ *.iml *.env target/ -/.example-classpath +.example-classpath diff --git a/build.sbt b/build.sbt index 3066473d..b746c439 100644 --- a/build.sbt +++ b/build.sbt @@ -142,7 +142,7 @@ exampleClasspath := { val fileName = ".example-classpath" val file = Paths.get(fileName) - Files.write(file, output.getBytes, StandardOpenOption.TRUNCATE_EXISTING) + Files.write(file, output.getBytes, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING) file.toFile.setExecutable(true) } diff --git a/example-location b/example-location index 703af0fb..e6d5221b 100755 --- a/example-location +++ b/example-location @@ -1,7 +1,7 @@ #!/bin/sh EXAMPLE_MODE=normal -EXAMPLE_MAIN=com.rbmhtechnology.example.OrderExample +EXAMPLE_MAIN=com.rbmhtechnology.example.ordermgnt.OrderExample while [[ $# > 1 ]]; do key="$1" @@ -10,7 +10,7 @@ while [[ $# > 1 ]]; do EXAMPLE_MODE=recover ;; -j|--java) - EXAMPLE_MAIN=com.rbmhtechnology.example.japi.OrderExample + EXAMPLE_MAIN=com.rbmhtechnology.example.ordermgnt.japi.OrderExample ;; *) # unknown option diff --git a/src/it/scala/com/rbmhtechnology/eventuate/EventsourcedProcessorIntegrationSpec.scala b/src/it/scala/com/rbmhtechnology/eventuate/EventsourcedProcessorIntegrationSpec.scala new file mode 100644 index 00000000..84536da9 --- /dev/null +++ b/src/it/scala/com/rbmhtechnology/eventuate/EventsourcedProcessorIntegrationSpec.scala @@ -0,0 +1,250 @@ +/* + * Copyright (C) 2015 Red Bull Media House GmbH - all rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.rbmhtechnology.eventuate + +import akka.actor._ +import akka.testkit._ + +import com.rbmhtechnology.eventuate.log.EventLogLifecycleCassandra +import com.rbmhtechnology.eventuate.log.EventLogLifecycleLeveldb + +import org.scalatest._ + +import scala.collection.immutable.Seq +import scala.util._ + +object EventsourcedProcessorIntegrationSpec { + class SampleActor(val id: String, val eventLog: ActorRef, probe: ActorRef) extends EventsourcedActor { + override val onCommand: Receive = { + case s: String => persist(s) { + case Success(_) => onEvent(s) + case Failure(_) => + } + } + + override val onEvent: Receive = { + case s: String => probe ! ((s, lastVectorTimestamp)) + } + } + + class SampleProcessor(val id: String, val eventLog: ActorRef, val targetEventLog: ActorRef, sharedVectorClockEntry: Boolean, probe: ActorRef) extends EventsourcedProcessor { + override def sharedClockEntry = sharedVectorClockEntry + + override val onCommand: Receive = { + case "boom" => throw boom + case "snap" => save("") { + case Success(_) => probe ! "snapped" + case Failure(_) => + } + } + + override val processEvent: Process = { + case s: String if !s.contains("processed") => + probe ! s + List(s"${s}-processed-1", s"${s}-processed-2") + } + + override val onSnapshot: Receive = { + case _ => + } + } +} + +abstract class EventsourcedProcessorIntegrationSpec extends TestKit(ActorSystem("test")) with WordSpecLike with Matchers with BeforeAndAfterEach { + import EventsourcedProcessorIntegrationSpec._ + + def log: ActorRef + def logId: String + def logProps(logId: String): Props + + def sourceLog = log + def sourceLogId = logId + + var targetLog: ActorRef = _ + var targetLogId: String = _ + + var sourceProbe: TestProbe = _ + var targetProbe: TestProbe = _ + var processorProbe: TestProbe = _ + + var a1: ActorRef = _ + var a2: ActorRef = _ + + def init(): Unit = { + targetLogId = s"${logId}_target" + targetLog = system.actorOf(logProps(targetLogId)) + + sourceProbe = TestProbe() + targetProbe = TestProbe() + processorProbe = TestProbe() + + a1 = system.actorOf(Props(new SampleActor("a1", sourceLog, sourceProbe.ref))) + a2 = system.actorOf(Props(new SampleActor("a2", targetLog, targetProbe.ref))) + } + + "A stateful EventsourcedProcessor" must { + "write processed events to a target log and recover from scratch" in { + val p = system.actorOf(Props(new SampleProcessor("p", sourceLog, targetLog, sharedVectorClockEntry = true, processorProbe.ref))) + + a1 ! "a" + a1 ! "b" + a1 ! 
"c" + + processorProbe.expectMsg("a") + processorProbe.expectMsg("b") + processorProbe.expectMsg("c") + + targetProbe.expectMsg(("a-processed-1", VectorTime(sourceLogId -> 1L, targetLogId -> 1L))) + targetProbe.expectMsg(("a-processed-2", VectorTime(sourceLogId -> 1L, targetLogId -> 2L))) + targetProbe.expectMsg(("b-processed-1", VectorTime(sourceLogId -> 2L, targetLogId -> 3L))) + targetProbe.expectMsg(("b-processed-2", VectorTime(sourceLogId -> 2L, targetLogId -> 4L))) + targetProbe.expectMsg(("c-processed-1", VectorTime(sourceLogId -> 3L, targetLogId -> 5L))) + targetProbe.expectMsg(("c-processed-2", VectorTime(sourceLogId -> 3L, targetLogId -> 6L))) + + p ! "boom" + a1 ! "d" + + processorProbe.expectMsg("d") + + targetProbe.expectMsg(("d-processed-1", VectorTime(sourceLogId -> 4L, targetLogId -> 7L))) + targetProbe.expectMsg(("d-processed-2", VectorTime(sourceLogId -> 4L, targetLogId -> 8L))) + } + "write processed events to a target log and recover from snapshot" in { + val p = system.actorOf(Props(new SampleProcessor("p", sourceLog, targetLog, sharedVectorClockEntry = true, processorProbe.ref))) + + a1 ! "a" + a1 ! "b" + + processorProbe.expectMsg("a") + processorProbe.expectMsg("b") + + p ! "snap" + + processorProbe.expectMsg("snapped") + + a1 ! "c" + + processorProbe.expectMsg("c") + + targetProbe.expectMsg(("a-processed-1", VectorTime(sourceLogId -> 1L, targetLogId -> 1L))) + targetProbe.expectMsg(("a-processed-2", VectorTime(sourceLogId -> 1L, targetLogId -> 2L))) + targetProbe.expectMsg(("b-processed-1", VectorTime(sourceLogId -> 2L, targetLogId -> 3L))) + targetProbe.expectMsg(("b-processed-2", VectorTime(sourceLogId -> 2L, targetLogId -> 4L))) + targetProbe.expectMsg(("c-processed-1", VectorTime(sourceLogId -> 3L, targetLogId -> 5L))) + targetProbe.expectMsg(("c-processed-2", VectorTime(sourceLogId -> 3L, targetLogId -> 6L))) + + p ! "boom" + a1 ! "d" + + processorProbe.expectMsg("d") + + targetProbe.expectMsg(("d-processed-1", VectorTime(sourceLogId -> 4L, targetLogId -> 7L))) + targetProbe.expectMsg(("d-processed-2", VectorTime(sourceLogId -> 4L, targetLogId -> 8L))) + } + "update event vector timestamps when having set sharedClockEntry to false" in { + val p = system.actorOf(Props(new SampleProcessor("p", sourceLog, targetLog, sharedVectorClockEntry = false, processorProbe.ref))) + + a1 ! "a" + a1 ! "b" + a1 ! "c" + + processorProbe.expectMsg("a") + processorProbe.expectMsg("b") + + targetProbe.expectMsg(("a-processed-1", VectorTime(sourceLogId -> 1L, "p" -> 2L))) + targetProbe.expectMsg(("a-processed-2", VectorTime(sourceLogId -> 1L, "p" -> 3L))) + targetProbe.expectMsg(("b-processed-1", VectorTime(sourceLogId -> 2L, "p" -> 5L))) + targetProbe.expectMsg(("b-processed-2", VectorTime(sourceLogId -> 2L, "p" -> 6L))) + } + "be able to write to the source event log" in { + + } + } + + "A stateful EventsourcedProcessor" when { + "writing to the source event log" must { + "have its own vector clock entry" in { + a1 ! "a" + a1 ! "b" + a1 ! 
"c" + + sourceProbe.expectMsg(("a", VectorTime(sourceLogId -> 1L))) + sourceProbe.expectMsg(("b", VectorTime(sourceLogId -> 2L))) + sourceProbe.expectMsg(("c", VectorTime(sourceLogId -> 3L))) + + val p = system.actorOf(Props(new SampleProcessor("p", sourceLog, sourceLog, sharedVectorClockEntry = false, processorProbe.ref))) + + sourceProbe.expectMsg(("a-processed-1", VectorTime(sourceLogId -> 1L, "p" -> 2L))) + sourceProbe.expectMsg(("a-processed-2", VectorTime(sourceLogId -> 1L, "p" -> 3L))) + sourceProbe.expectMsg(("b-processed-1", VectorTime(sourceLogId -> 2L, "p" -> 5L))) + sourceProbe.expectMsg(("b-processed-2", VectorTime(sourceLogId -> 2L, "p" -> 6L))) + sourceProbe.expectMsg(("c-processed-1", VectorTime(sourceLogId -> 3L, "p" -> 8L))) + sourceProbe.expectMsg(("c-processed-2", VectorTime(sourceLogId -> 3L, "p" -> 9L))) + + p ! "boom" + a1 ! "d" + + sourceProbe.expectMsg(("d", VectorTime(sourceLogId -> 10L, "p" -> 9))) + sourceProbe.expectMsg(("d-processed-1", VectorTime(sourceLogId -> 10L, "p" -> 11L))) + sourceProbe.expectMsg(("d-processed-2", VectorTime(sourceLogId -> 10L, "p" -> 12L))) + } + } + } + + "A stateless EventsourcedProcessor" must { + "write processed events to a target log and resume from stored position" in { + val p = system.actorOf(Props(new SampleProcessor("p", sourceLog, targetLog, sharedVectorClockEntry = true, processorProbe.ref) with StatelessProcessor)) + + a1 ! "a" + a1 ! "b" + a1 ! "c" + + processorProbe.expectMsg("a") + processorProbe.expectMsg("b") + processorProbe.expectMsg("c") + + targetProbe.expectMsg(("a-processed-1", VectorTime(sourceLogId -> 1L, targetLogId -> 1L))) + targetProbe.expectMsg(("a-processed-2", VectorTime(sourceLogId -> 1L, targetLogId -> 2L))) + targetProbe.expectMsg(("b-processed-1", VectorTime(sourceLogId -> 2L, targetLogId -> 3L))) + targetProbe.expectMsg(("b-processed-2", VectorTime(sourceLogId -> 2L, targetLogId -> 4L))) + targetProbe.expectMsg(("c-processed-1", VectorTime(sourceLogId -> 3L, targetLogId -> 5L))) + targetProbe.expectMsg(("c-processed-2", VectorTime(sourceLogId -> 3L, targetLogId -> 6L))) + + p ! "boom" + a1 ! 
"d" + + processorProbe.expectMsg("d") + + targetProbe.expectMsg(("d-processed-1", VectorTime(sourceLogId -> 4L, targetLogId -> 7L))) + targetProbe.expectMsg(("d-processed-2", VectorTime(sourceLogId -> 4L, targetLogId -> 8L))) + } + } +} + +class EventsourcedProcessorIntegrationSpecLeveldb extends EventsourcedProcessorIntegrationSpec with EventLogLifecycleLeveldb { + override def beforeEach(): Unit = { + super.beforeEach() + init() + } +} + +class EventsourcedProcessorIntegrationSpecCassandra extends EventsourcedProcessorIntegrationSpec with EventLogLifecycleCassandra { + override def beforeEach(): Unit = { + super.beforeEach() + init() + } +} \ No newline at end of file diff --git a/src/it/scala/com/rbmhtechnology/eventuate/log/EventLogLifecycle.scala b/src/it/scala/com/rbmhtechnology/eventuate/log/EventLogLifecycle.scala index 448a4149..d57611b2 100644 --- a/src/it/scala/com/rbmhtechnology/eventuate/log/EventLogLifecycle.scala +++ b/src/it/scala/com/rbmhtechnology/eventuate/log/EventLogLifecycle.scala @@ -188,6 +188,9 @@ trait EventLogLifecycleCassandra extends EventLogCleanupCassandra with BeforeAnd def logId: String = _logCtr.toString + def logProps(logId: String): Props = + logProps(logId, TestFailureSpec(), system.deadLetters) + def logProps(logId: String, failureSpec: TestFailureSpec, indexProbe: ActorRef): Props = TestEventLog.props(logId, failureSpec, indexProbe, batching) } diff --git a/src/it/scala/com/rbmhtechnology/eventuate/log/EventLogSpec.scala b/src/it/scala/com/rbmhtechnology/eventuate/log/EventLogSpec.scala index 213d10cf..c9d14183 100644 --- a/src/it/scala/com/rbmhtechnology/eventuate/log/EventLogSpec.scala +++ b/src/it/scala/com/rbmhtechnology/eventuate/log/EventLogSpec.scala @@ -531,13 +531,57 @@ abstract class EventLogSpec extends TestKit(ActorSystem("test", EventLogSpec.con log.tell(GetReplicationProgresses, requestorProbe.ref) requestorProbe.expectMsg(GetReplicationProgressesSuccess(Map(EventLogSpec.remoteLogId -> 19L))) } - "set an event's system timestamp if that event's processId is not defined" in { + "update an event's system timestamp" in { log ! Write(List(event("a").copy(systemTimestamp = 3L)), system.deadLetters, requestorProbe.ref, 0) requestorProbe.expectMsgType[WriteSuccess].event.systemTimestamp should be(0L) } - "use an event's system timestamp if that event's processId is defined" in { - log ! Write(List(event("a").copy(systemTimestamp = 3L, processId = "foo")), system.deadLetters, requestorProbe.ref, 0) - requestorProbe.expectMsgType[WriteSuccess].event.systemTimestamp should be(3L) + "update an emitted event's process id and vector timestamp during if the process id is not defined" in { + val evt = DurableEvent("a", emitterIdA, processId = UndefinedLogId) + val exp = DurableEvent("a", emitterIdA, processId = logId, vectorTimestamp = VectorTime(logId -> 1L), localLogId = logId, localSequenceNr = 1) + log ! Write(List(evt), system.deadLetters, requestorProbe.ref, 0) + requestorProbe.expectMsgType[WriteSuccess].event should be(exp) + } + "not update an emitted event's process id and vector timestamp during if the process id is defined" in { + val evt = DurableEvent("a", emitterIdA, processId = emitterIdA, vectorTimestamp = VectorTime(emitterIdA -> 1L)) + val exp = DurableEvent("a", emitterIdA, processId = emitterIdA, vectorTimestamp = VectorTime(emitterIdA -> 1L), localLogId = logId, localSequenceNr = 1) + log ! 
Write(List(evt), system.deadLetters, requestorProbe.ref, 0) + requestorProbe.expectMsgType[WriteSuccess].event should be(exp) + } + "update a replicated event's process id and vector timestamp during if the process id is not defined" in { + val evt = DurableEvent("a", emitterIdA, processId = UndefinedLogId, vectorTimestamp = VectorTime(remoteLogId -> 1L)) + val exp = DurableEvent("a", emitterIdA, processId = logId, vectorTimestamp = VectorTime(remoteLogId -> 1L, logId -> 1L), localLogId = logId, localSequenceNr = 1) + registerCollaborator(aggregateId = None, collaborator = requestorProbe) + log ! ReplicationWrite(List(evt), remoteLogId, 5, VectorTime()) + requestorProbe.expectMsgType[Written].event should be(exp) + } + "not update a replicated event's process id and vector timestamp during if the process id is defined" in { + val evt = DurableEvent("a", emitterIdA, processId = emitterIdA, vectorTimestamp = VectorTime(emitterIdA -> 1L)) + val exp = DurableEvent("a", emitterIdA, processId = emitterIdA, vectorTimestamp = VectorTime(emitterIdA -> 1L), localLogId = logId, localSequenceNr = 1) + registerCollaborator(aggregateId = None, collaborator = requestorProbe) + log ! ReplicationWrite(List(evt), remoteLogId, 5, VectorTime()) + requestorProbe.expectMsgType[Written].event should be(exp) + } + "not write events to the target log that are in causal past of the target log" in { + val evt1 = DurableEvent("i", emitterIdB, vectorTimestamp = timestamp(0, 7), processId = remoteLogId) + val evt2 = DurableEvent("j", emitterIdB, vectorTimestamp = timestamp(0, 8), processId = remoteLogId) + val evt3 = DurableEvent("k", emitterIdB, vectorTimestamp = timestamp(0, 9), processId = remoteLogId) + registerCollaborator(aggregateId = None, collaborator = requestorProbe) + log ! ReplicationWrite(List(evt1, evt2), remoteLogId, 5, VectorTime()) + log ! ReplicationWrite(List(evt2, evt3), remoteLogId, 6, VectorTime()) + requestorProbe.expectMsgType[Written].event.payload should be("i") + requestorProbe.expectMsgType[Written].event.payload should be("j") + requestorProbe.expectMsgType[Written].event.payload should be("k") + } + "not read events from the source log that are in causal past of the target log (using the target time from the request)" in { + generateEmittedEvents() + log.tell(ReplicationRead(1, Int.MaxValue, NoFilter, remoteLogId, dl, timestamp(1)), requestorProbe.ref) + requestorProbe.expectMsgType[ReplicationReadSuccess].events.map(_.payload) should be(Seq("b", "c")) + } + "not read events from the source log that are in causal past of the target log (using the target time from the cache)" in { + generateEmittedEvents() + log ! ReplicationWrite(Nil, remoteLogId, 5, timestamp(2)) // update time cache + log.tell(ReplicationRead(1, Int.MaxValue, NoFilter, remoteLogId, dl, timestamp(1)), requestorProbe.ref) + requestorProbe.expectMsgType[ReplicationReadSuccess].events.map(_.payload) should be(Seq("c")) } } } diff --git a/src/main/resources/reference.conf b/src/main/resources/reference.conf index 8a7df76d..9d39d1fc 100644 --- a/src/main/resources/reference.conf +++ b/src/main/resources/reference.conf @@ -27,6 +27,14 @@ eventuate { # event handler (= backpressure). replay.chunk-size-max = 65536 + processor { + # Timeout for read operation from target event log. + read-timeout = 10s + + # Timeout for write operations to target event log. + write-timeout = 10s + } + log.batching { # This limit is used by the batching layer to limit the number of events # to be written atomically to the event log. 
If the number of events from diff --git a/src/main/scala/com/rbmhtechnology/eventuate/ConfirmedDelivery.scala b/src/main/scala/com/rbmhtechnology/eventuate/ConfirmedDelivery.scala index f27c2946..116015f9 100644 --- a/src/main/scala/com/rbmhtechnology/eventuate/ConfirmedDelivery.scala +++ b/src/main/scala/com/rbmhtechnology/eventuate/ConfirmedDelivery.scala @@ -73,8 +73,8 @@ trait ConfirmedDelivery extends EventsourcedActor { /** * Internal API. */ - override private[eventuate] def capturedSnapshot(snapshot: Snapshot): Snapshot = { - _unconfirmed.values.foldLeft(super.capturedSnapshot(snapshot)) { + override private[eventuate] def snapshotCaptured(snapshot: Snapshot): Snapshot = { + _unconfirmed.values.foldLeft(super.snapshotCaptured(snapshot)) { case (s, da) => s.add(da) } } @@ -82,8 +82,8 @@ trait ConfirmedDelivery extends EventsourcedActor { /** * Internal API. */ - override private[eventuate] def loadedSnapshot(snapshot: Snapshot): Unit = { - super.loadedSnapshot(snapshot) + override private[eventuate] def snapshotLoaded(snapshot: Snapshot): Unit = { + super.snapshotLoaded(snapshot) snapshot.deliveryAttempts.foreach { da => _unconfirmed = _unconfirmed + (da.deliveryId -> da) } diff --git a/src/main/scala/com/rbmhtechnology/eventuate/DurableEvent.scala b/src/main/scala/com/rbmhtechnology/eventuate/DurableEvent.scala index ed98bf31..6d37f50a 100644 --- a/src/main/scala/com/rbmhtechnology/eventuate/DurableEvent.scala +++ b/src/main/scala/com/rbmhtechnology/eventuate/DurableEvent.scala @@ -58,19 +58,17 @@ case class DurableEvent( vectorTimestamp /** - * Returns `true` if this event is a valid replication candidate. A valid candidate - * has a `vectorTimestamp` that is not `<=` the given `vectorTime` and passes the - * given replication `filter`. + * Returns `true` if this event did not happen before or at the given `vectorTime` + * and passes the given replication `filter`. */ - def replicate(vectorTime: VectorTime, filter: ReplicationFilter): Boolean = - replicate(vectorTime) && filter(this) + def replicable(vectorTime: VectorTime, filter: ReplicationFilter): Boolean = + !before(vectorTime) && filter(this) /** - * Returns `true` if the event is a valid replication candidate. A valid candidate - * has a `vectorTimestamp` that is not `<=` the given `vectorTime`. + * Returns `true` if this event happened before or at the given `vectorTime`. */ - def replicate(vectorTime: VectorTime): Boolean = - !(vectorTimestamp <= vectorTime) + def before(vectorTime: VectorTime): Boolean = + vectorTimestamp <= vectorTime /** * The default routing destination of this event is its `emitterAggregateId`. If defined, the event is @@ -87,20 +85,12 @@ case class DurableEvent( if (defaultDestinationAggregateId.isDefined) customDestinationAggregateIds + defaultDestinationAggregateId.get else customDestinationAggregateIds /** - * Prepares the event for the initial write to a local event log. + * Prepares the event for writing to an event log. 
*/ - private[eventuate] def prepareWrite(logId: String, sequenceNr: Long, timestamp: Long): DurableEvent = { - val st = if (processId == UndefinedLogId) timestamp else systemTimestamp + private[eventuate] def prepare(logId: String, sequenceNr: Long, timestamp: Long): DurableEvent = { val vt = if (processId == UndefinedLogId) vectorTimestamp.setLocalTime(logId, sequenceNr) else vectorTimestamp val id = if (processId == UndefinedLogId) logId else processId - copy(systemTimestamp = st, vectorTimestamp = vt, processId = id, localLogId = logId, localSequenceNr = sequenceNr) - } - - /** - * Prepares the event for a replication write to a target event log. - */ - private[eventuate] def prepareReplicate(logId: String, sequenceNr: Long): DurableEvent = { - copy(localLogId = logId, localSequenceNr = sequenceNr) + copy(systemTimestamp = timestamp, vectorTimestamp = vt, processId = id, localLogId = logId, localSequenceNr = sequenceNr) } } diff --git a/src/main/scala/com/rbmhtechnology/eventuate/EventsourcedActor.scala b/src/main/scala/com/rbmhtechnology/eventuate/EventsourcedActor.scala index 38c321c0..cabcde84 100644 --- a/src/main/scala/com/rbmhtechnology/eventuate/EventsourcedActor.scala +++ b/src/main/scala/com/rbmhtechnology/eventuate/EventsourcedActor.scala @@ -122,27 +122,6 @@ trait EventsourcedActor extends EventsourcedView { } } - private def durableEvent(payload: Any, customDestinationAggregateIds: Set[String]): DurableEvent = { - if (sharedClockEntry) { - DurableEvent( - payload = payload, - emitterId = id, - emitterAggregateId = aggregateId, - customDestinationAggregateIds = customDestinationAggregateIds, - vectorTimestamp = currentTime, - processId = UndefinedLogId) - } else { - DurableEvent( - payload = payload, - emitterId = id, - emitterAggregateId = aggregateId, - customDestinationAggregateIds = customDestinationAggregateIds, - systemTimestamp = System.currentTimeMillis(), - vectorTimestamp = incrementLocalTime, - processId = id) - } - } - private def writePending: Boolean = writeRequests.nonEmpty diff --git a/src/main/scala/com/rbmhtechnology/eventuate/EventsourcedProcessor.scala b/src/main/scala/com/rbmhtechnology/eventuate/EventsourcedProcessor.scala new file mode 100644 index 00000000..d509a36b --- /dev/null +++ b/src/main/scala/com/rbmhtechnology/eventuate/EventsourcedProcessor.scala @@ -0,0 +1,172 @@ +/* + * Copyright (C) 2015 Red Bull Media House GmbH - all rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.rbmhtechnology.eventuate + +import java.util.concurrent.TimeUnit + +import akka.actor._ +import akka.pattern.ask +import akka.util.Timeout + +import com.typesafe.config.Config + +import scala.collection.immutable.Seq +import scala.concurrent._ +import scala.concurrent.duration._ + +private class EventsourcedProcessorSettings(config: Config) { + val readTimeout = + config.getDuration("eventuate.processor.read-timeout", TimeUnit.MILLISECONDS).millis + + val writeTimeout = + config.getDuration("eventuate.processor.write-timeout", TimeUnit.MILLISECONDS).millis +} + +object EventsourcedProcessor { + /** + * Type of an [[EventsourcedProcessor]]'s event handler. + */ + //#process + type Process = PartialFunction[Any, Seq[Any]] + //# +} + +/** + * An [[EventsourcedWriter]] that writes processed events to a `targetEventLog`. `EventsourcedProcessor` + * is an idempotent writer that guarantees that no duplicates are ever written to the target event log, + * also under failure conditions. Hence, applications don't need to take extra care about idempotency. + * Processed events are returned by `processEvent`, an application-defined event handler that is called + * with events stored in the source `eventLog`. + * + * The `processEvent` event handler may also update actor state. When a processor (re)starts, its state is + * automatically recovered by replaying events, optionally starting from a snapshot. Snapshots of processor + * state can be saved with the `save` method in the processor's `onCommand` handler. An event-sourced processor + * that doesn't need to maintain internal state (= stateless processor) should implement [[StatelessProcessor]] + * instead. + * + * Usually, a processor's source event log is different from its target event log. If a processor needs to + * write processed events back to its source event log, it must reserve its own entry in the vector clock by + * setting `sharedClockEntry` to `false`. + * + * During initialization, a processor reads the processing progress from the target event log. The timeout + * for this read operation can be configured with the `eventuate.processor.read-timeout` parameter for all + * event-sourced processors or defined on a per class or instance basis by overriding `readTimeout`. The timeout + * for write operations to the target log can be configured with the `eventuate.processor.write-timeout` parameter + * for all event-sourced processors or defined on a per class or instance basis by overriding `writeTimeout`. + */ +trait EventsourcedProcessor extends EventsourcedWriter[Long, Long] { + import ReplicationProtocol._ + import context.dispatcher + + /** + * Type of this processor's event handler. + */ + type Process = EventsourcedProcessor.Process + + private val settings = new EventsourcedProcessorSettings(context.system.settings.config) + + private var processedEvents: Vector[DurableEvent] = Vector.empty + private var storedSequenceNr: Long = 0L + + /** + * This processor's target event log. + */ + def targetEventLog: ActorRef + + /** + * This processor's event handler. It may generate zero or more processed events per source event. 
+ */ + def processEvent: Process + + override final val onEvent: Receive = { + case payload if processEvent.isDefinedAt(payload) => + if (lastSequenceNr > storedSequenceNr) + processedEvents = processEvent(payload).map(durableEvent(_, Set.empty)).foldLeft(processedEvents)(_ :+ _) + } + + override final def write(): Future[Long] = + if (lastSequenceNr > storedSequenceNr) { + val result = targetEventLog.ask(ReplicationWrite(processedEvents, id, lastSequenceNr, VectorTime()))(Timeout(writeTimeout)).flatMap { + case ReplicationWriteSuccess(_, progress, _) => Future.successful(progress) + case ReplicationWriteFailure(cause) => Future.failed(cause) + } + processedEvents = Vector.empty + result + } else Future.successful(storedSequenceNr) + + override final def read(): Future[Long] = { + targetEventLog.ask(GetReplicationProgress(id))(Timeout(readTimeout)).flatMap { + case GetReplicationProgressSuccess(_, progress, _) => Future.successful(progress) + case GetReplicationProgressFailure(cause) => Future.failed(cause) + } + } + + override def writeSuccess(progress: Long): Unit = { + storedSequenceNr = progress + super.writeSuccess(progress) + } + + override def readSuccess(progress: Long): Option[Long] = { + storedSequenceNr = progress + super.readSuccess(progress) + } + + /** + * The default write timeout configured with the `eventuate.processor.write-timeout` parameter. + * Can be overridden. + */ + def writeTimeout: FiniteDuration = + settings.writeTimeout + + /** + * The default read timeout configured with the `eventuate.processor.read-timeout` parameter. + * Can be overridden. + */ + def readTimeout: FiniteDuration = + settings.readTimeout + + override def preStart(): Unit = { + if (eventLog == targetEventLog) require(!sharedClockEntry, "A processor writing to the source log must set sharedClockEntry=false") + super.preStart() + } +} + +/** + * An [[EventsourcedProcessor]] whose `processEvent` handler does '''not''' update actor state. A + * [[StatelessProcessor]] is optimized for fast startup and failure recovery times, at the cost of + * not being able to have its own entry in the vector clock (see overridden method `sharedClockEntry`). + * Consequently, a stateless processor can not write processed events back to its source event log i.e. + * the source and the target event log of a stateless processor must be different. + * + * @see [[EventsourcedProcessor]]. + */ +trait StatelessProcessor extends EventsourcedProcessor { + /** + * Always returns `true`. Therefore, a stateless processor cannot have its own entry in the vector clock + * because its clock can not be recovered during processor (re)start. + */ + override final def sharedClockEntry: Boolean = + true + + /** + * Returns the last successfully processed source log sequence number. 
+ */ + override final def readSuccess(progress: Long): Option[Long] = { + super.readSuccess(progress) + Some(progress + 1L) + } +} \ No newline at end of file diff --git a/src/main/scala/com/rbmhtechnology/eventuate/EventsourcedView.scala b/src/main/scala/com/rbmhtechnology/eventuate/EventsourcedView.scala index 52e1d882..5bc348e7 100644 --- a/src/main/scala/com/rbmhtechnology/eventuate/EventsourcedView.scala +++ b/src/main/scala/com/rbmhtechnology/eventuate/EventsourcedView.scala @@ -27,7 +27,7 @@ import com.typesafe.config.Config import scala.util._ -private class ReplaySettings(config: Config) { +private class EventsourcedViewSettings(config: Config) { val chunkSizeMax = config.getInt("eventuate.replay.chunk-size-max") } @@ -53,8 +53,15 @@ private object EventsourcedView { * An `EventsourcedView` can only consume events from its `eventLog` but cannot produce new events. * Commands sent to an `EventsourcedView` during recovery are delayed until recovery completes. * - * @see [[EventsourcedActor]] + * Event replay is subject to backpressure. After a configurable number of events + * (see `eventuate.replay.chunk-size-max` configuration parameter), replay is suspended until these + * events have been handled by `onEvent` and then resumed again. There's no backpressure mechanism + * for live event processing yet (but will come in future releases). + * * @see [[DurableEvent]] + * @see [[EventsourcedActor]] + * @see [[EventsourcedWriter]] + * @see [[EventsourcedProcessor]] */ trait EventsourcedView extends Actor with ConditionalCommands with Stash with ActorLogging { import EventsourcedView._ @@ -67,7 +74,7 @@ trait EventsourcedView extends Actor with ConditionalCommands with Stash with Ac private var _lastHandledEvent: DurableEvent = _ private var _clock: VectorClock = _ - private val settings = new ReplaySettings(context.system.settings.config) + private val settings = new EventsourcedViewSettings(context.system.settings.config) private var saveRequests: Map[SnapshotMetadata, Handler[SnapshotMetadata]] = Map.empty /** @@ -150,6 +157,20 @@ trait EventsourcedView extends Actor with ConditionalCommands with Stash with Ac onRecovered() } + /** + * Internal API. + */ + private[eventuate] def receiveEvent(event: DurableEvent): Unit = { + if (onEvent.isDefinedAt(event.payload)) { + onEventInternal(event) + onEvent(event.payload) + } + + if (!recovering) { + conditionChanged(event.vectorTimestamp) + } + } + /** * Internal API. */ @@ -160,7 +181,7 @@ trait EventsourcedView extends Actor with ConditionalCommands with Stash with Ac // set local clock to local time (= sequence number) of event log _clock = _clock.set(event.localLogId, event.localSequenceNr) if (event.emitterId != id) - // merge clock with non-self-emitted event timestamp + // merge clock with non-self-emitted event timestamp _clock = _clock.merge(event.vectorTimestamp) } else { if (event.emitterId != id) @@ -194,7 +215,7 @@ trait EventsourcedView extends Actor with ConditionalCommands with Stash with Ac /** * Internal API. */ - private[eventuate] def incrementLocalTime: VectorTime = { + private[eventuate] def incrementLocalTime(): VectorTime = { _clock = _clock.tick() _clock.currentTime } @@ -234,7 +255,7 @@ trait EventsourcedView extends Actor with ConditionalCommands with Stash with Ac * snapshot metadata. The `handler` can obtain a reference to the initial message * sender with `sender()`. 
*/ - def save(snapshot: Any)(handler: Handler[SnapshotMetadata]): Unit = { + final def save(snapshot: Any)(handler: Handler[SnapshotMetadata]): Unit = { val payload = snapshot match { case tree: ConcurrentVersionsTree[_, _] => tree.copy() case other => other @@ -247,7 +268,7 @@ trait EventsourcedView extends Actor with ConditionalCommands with Stash with Ac handler(Failure(new IllegalStateException(s"snapshot with metadata ${metadata} is currently being saved"))) } else { saveRequests += (metadata -> handler) - val snapshot = capturedSnapshot(prototype) + val snapshot = snapshotCaptured(prototype) eventLog ! SaveSnapshot(snapshot, sender(), self, instanceId) } } @@ -255,13 +276,13 @@ trait EventsourcedView extends Actor with ConditionalCommands with Stash with Ac /** * Internal API. */ - private[eventuate] def capturedSnapshot(snapshot: Snapshot): Snapshot = + private[eventuate] def snapshotCaptured(snapshot: Snapshot): Snapshot = snapshot /** * Internal API. */ - private[eventuate] def loadedSnapshot(snapshot: Snapshot): Unit = { + private[eventuate] def snapshotLoaded(snapshot: Snapshot): Unit = { _lastHandledEvent = snapshot.lastEvent _clock = _clock.copy(currentTime = snapshot.currentTime) } @@ -273,23 +294,55 @@ trait EventsourcedView extends Actor with ConditionalCommands with Stash with Ac onCommand(msg) /** - * Sends a [[EventsourcingProtocol#LoadSnapshot LoadSnapshot]] command to the event log. + * Internal API. + */ + private[eventuate] def init(): Unit = + load() + + /** + * Internal API. */ - private def load(): Unit = + private[eventuate] def load(): Unit = eventLog ! LoadSnapshot(id, self, instanceId) /** - * Sends a [[EventsourcingProtocol#Replay Replay]] command to the event log. + * Internal API. */ //#replay - private def replay(fromSequenceNr: Long = 1L): Unit = + private[eventuate] def replay(fromSequenceNr: Long = 1L): Unit = eventLog ! Replay(fromSequenceNr, replayChunkSizeMax, self, aggregateId, instanceId) //# - private def initiating: Receive = { + /** + * Internal API. + */ + private[eventuate] def durableEvent(payload: Any, customDestinationAggregateIds: Set[String]): DurableEvent = { + if (sharedClockEntry) { + DurableEvent( + payload = payload, + emitterId = id, + emitterAggregateId = aggregateId, + customDestinationAggregateIds = customDestinationAggregateIds, + vectorTimestamp = currentTime, + processId = DurableEvent.UndefinedLogId) + } else { + DurableEvent( + payload = payload, + emitterId = id, + emitterAggregateId = aggregateId, + customDestinationAggregateIds = customDestinationAggregateIds, + vectorTimestamp = incrementLocalTime(), + processId = id) + } + } + + /** + * Internal API. + */ + private[eventuate] def initiating: Receive = { case LoadSnapshotSuccess(Some(snapshot), iid) => if (iid == instanceId) { if (onSnapshot.isDefinedAt(snapshot.payload)) { - loadedSnapshot(snapshot) + snapshotLoaded(snapshot) onSnapshot(snapshot.payload) replay(snapshot.metadata.sequenceNr + 1L) } else { @@ -323,7 +376,10 @@ trait EventsourcedView extends Actor with ConditionalCommands with Stash with Ac messageStash.stash() } - private def initiated: Receive = { + /** + * Internal API. 
+ */ + private[eventuate] def initiated: Receive = { case Written(event) => if (event.localSequenceNr > lastSequenceNr) { receiveEvent(event) } @@ -341,17 +397,6 @@ trait EventsourcedView extends Actor with ConditionalCommands with Stash with Ac unhandledMessage(msg) } - private def receiveEvent(event: DurableEvent): Unit = { - if (onEvent.isDefinedAt(event.payload)) { - onEventInternal(event) - onEvent(event.payload) - } - - if (!recovering) { - conditionChanged(event.vectorTimestamp) - } - } - /** * Initialization behavior. */ @@ -363,7 +408,7 @@ trait EventsourcedView extends Actor with ConditionalCommands with Stash with Ac override def preStart(): Unit = { _lastHandledEvent = DurableEvent(id) _clock = VectorClock(id) - load() + init() } /** diff --git a/src/main/scala/com/rbmhtechnology/eventuate/EventsourcedWriter.scala b/src/main/scala/com/rbmhtechnology/eventuate/EventsourcedWriter.scala new file mode 100644 index 00000000..d557793e --- /dev/null +++ b/src/main/scala/com/rbmhtechnology/eventuate/EventsourcedWriter.scala @@ -0,0 +1,219 @@ +/* + * Copyright (C) 2015 Red Bull Media House GmbH - all rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.rbmhtechnology.eventuate + +import akka.actor._ + +import scala.concurrent.Future +import scala.util._ + +object EventsourcedWriter { + /** + * Thrown by [[EventsourcedWriter]] to indicate that a read operation from an external database failed. + * + * @see [[EventsourcedWriter#readFailure]] + */ + class ReadException(message: String, cause: Throwable) extends RuntimeException(message, cause) + + /** + * Thrown by [[EventsourcedWriter]] to indicate that a write operation to the external database failed. + * + * @see [[EventsourcedWriter#writeFailure]] + */ + class WriteException(message: String, cause: Throwable) extends RuntimeException(message, cause) +} + +/** + * An [[EventsourcedView]] designed to update external databases from events stored in its event log. It + * supports event processing patterns optimized for batch-updating external databases to create persistent + * views or read models: + * + * - During initialization, a concrete `EventsourcedWriter` asynchronously `read`s data from the external + * database to obtain information about the actual event processing progress. For example, if the last + * processed event sequence number is written with every batch update to the database, it can be read + * during initialization and used by the writer to detect duplicates during further event processing so + * that event processing can be made idempotent. + * + * - During event processing in its `onEvent` handler, a concrete writer usually builds a database-specific + * write-batch (representing an incremental update). After a configurable number of events, `EventsourcedWriter` + * calls `write` to asynchronously write the prepared batch to the database. + * + * An `EventsourcedWriter` may also implement an `onCommand` handler to process commands and save snapshots of + * internal state. 
Internal state is recovered by replaying events from the event log, optionally starting from + * a saved snapshot (see [[EventsourcedView]] for details). If a writer doesn't require full internal state + * recovery, it may define a custom starting position in the event log by returning a sequence number from + * `readSuccess`. If full internal state recovery is required instead, `readSuccess` should return `None` + * (which is the default). + * + * Implementation notes: + * + * - After having started an asynchronous write but before returning from `write`, a writer should clear the + * prepared write batch so that further events can be processed while the asynchronous write operation is + * in progress. + * - Event processing during replay (either starting from a default or application-defined position) is subject + * to backpressure. After a configurable number of events (see `eventuate.replay.chunk-size-max` configuration + * parameter), replay is suspended until these events have been written to the target database and then resumed + * again. There's no backpressure mechanism for live event processing yet (but will come in future releases). + * + * @see [[EventsourcedProcessor]] + * @see [[StatelessProcessor]] + * + * @tparam R Result type of the asynchronous read operation. + * @tparam W Result type of the asynchronous write operations. + */ +trait EventsourcedWriter[R, W] extends EventsourcedView { + import EventsourcedWriter._ + import EventsourcingProtocol._ + import context.dispatcher + + private case class ReadSuccess(result: R, instanceId: Int) + private case class ReadFailure(cause: Throwable, instanceId: Int) + + private case class WriteSuccess(result: W, instanceId: Int) + private case class WriteFailure(cause: Throwable, instanceId: Int) + + private var numPending: Int = 0 + + /** + * Asynchronously reads an initial value from the target database, usually to obtain information about + * event processing progress. This method is called during initialization. + */ + def read(): Future[R] + + /** + * Asynchronously writes an incremental update to the target database. Incremental updates are prepared + * during event processing by a concrete `onEvent` handler. + * + * During event replay, this method is called latest after having replayed `eventuate.replay.chunk-size-max` + * events and immediately after replay completes. During live processing, `write` is called immediately if + * no write operation is in progress and an event has been handled by `onEvent`. If a write operation is in + * progress, further event handling may run concurrently to that operation. If events are handled while a + * write operation is in progress, another write will follow immediately after the previous write operation + * completes. + */ + def write(): Future[W] + + /** + * Called with a read result after a `read` operation successfully completes. This method may update + * internal actor state. If `None` is returned, the writer continues with state recovery by replaying + * events, optionally starting from a snapshot. If the return value is defined, replay starts from the + * returned sequence number without ever loading a snapshot. Does nothing and returns `None` by default + * and can be overridden. + */ + def readSuccess(result: R): Option[Long] = + None + + /** + * Called with a write result after a `write` operation successfully completes. This method may update + * internal actor state. Does nothing by default and can be overridden. 
+ */ + def writeSuccess(result: W): Unit = + () + + /** + * Called with failure details after a `read` operation failed. Throws [[EventsourcedWriter#ReadException]] + * by default (causing the writer to restart) and can be overridden. + */ + def readFailure(cause: Throwable): Unit = + throw new ReadException("read failed", cause) + + /** + * Called with failure details after a `write` operation failed. Throws [[EventsourcedWriter#WriteException]] + * by default (causing the writer to restart) and can be overridden. + */ + def writeFailure(cause: Throwable): Unit = + throw new WriteException("write failed", cause) + + override private[eventuate] def onEventInternal(event: DurableEvent): Unit = { + super.onEventInternal(event) + numPending += 1 + } + + override private[eventuate] def init(): Unit = { + read onComplete { + case Success(r) => self ! ReadSuccess(r, instanceId) + case Failure(e) => self ! ReadFailure(e, instanceId) + } + } + + override private[eventuate] def initiating: Receive = { + case ReadSuccess(r, iid) => if (iid == instanceId) { + readSuccess(r) match { + case Some(snr) => replay(snr) + case None => load() + } + } + case ReadFailure(e, iid) => if (iid == instanceId) { + readFailure(e) + } + case ReplaySuspended(iid) => if (iid == instanceId) { + context.become(initiatingWrite(sender()) orElse initiating) + write(instanceId) + } + case ReplaySuccess(iid) => if (iid == instanceId) { + context.become(initiatedWrite orElse initiated) + conditionChanged(lastVectorTimestamp) + messageStash.unstashAll() + recovered() + write(instanceId) + } + case other => + super.initiating(other) + } + + override private[eventuate] def initiated: Receive = { + case Written(event) => if (event.localSequenceNr > lastSequenceNr) { + context.become(initiatedWrite orElse initiated) + receiveEvent(event) + write(instanceId) + } + case other => + super.initiated(other) + } + + private def initiatingWrite(replayer: ActorRef): Receive = { + case WriteSuccess(r, iid) => if (iid == instanceId) { + writeSuccess(r) + context.become(initiating) + replayer ! ReplayNext(replayChunkSizeMax, iid) + } + case WriteFailure(cause, iid) => if (iid == instanceId) { + writeFailure(cause) + } + } + + private def initiatedWrite: Receive = { + case Written(event) => if (event.localSequenceNr > lastSequenceNr) { + receiveEvent(event) + } + case WriteSuccess(r, iid) => if (iid == instanceId) { + writeSuccess(r) + if (numPending > 0) write(instanceId) else context.become(initiated) + } + case WriteFailure(cause, iid) => if (iid == instanceId) { + writeFailure(cause) + } + } + + private def write(instanceId: Int): Unit = { + write().onComplete { + case Success(r) => self ! WriteSuccess(r, instanceId) + case Failure(e) => self ! WriteFailure(e, instanceId) + } + numPending = 0 + } +} diff --git a/src/main/scala/com/rbmhtechnology/eventuate/EventsourcingProtocol.scala b/src/main/scala/com/rbmhtechnology/eventuate/EventsourcingProtocol.scala index 8e660d92..beb3c3bd 100644 --- a/src/main/scala/com/rbmhtechnology/eventuate/EventsourcingProtocol.scala +++ b/src/main/scala/com/rbmhtechnology/eventuate/EventsourcingProtocol.scala @@ -24,13 +24,7 @@ object EventsourcingProtocol { /** * Instructs an event log to batch-execute the given `writes`. */ - case class WriteN(writes: Seq[Write]) { - - /** - * Number of events in this batch write request. - */ - def size: Int = writes.map(_.events.size).sum - } + case class WriteN(writes: Seq[Write]) /** * Completion reply after a [[WriteN]]. 
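As an illustration of the EventsourcedWriter API introduced above, the sketch below shows a writer that batch-updates a query database. It is a minimal, hypothetical sketch only: CustomerRegistered, CustomerStore and CustomerWriter are made-up names, and the querydb example added by this patch (Emitter.scala, Writer.scala, WriterApp.scala) may differ in detail.

import akka.actor.ActorRef
import com.rbmhtechnology.eventuate.EventsourcedWriter
import scala.concurrent.Future

// Hypothetical domain event and asynchronous query-database interface.
case class CustomerRegistered(cid: Long, name: String)

trait CustomerStore {
  def readSequenceNr(): Future[Long]                                      // last stored progress
  def writeCustomers(batch: Seq[(Long, String)], snr: Long): Future[Long] // write batch + progress
}

class CustomerWriter(val id: String, val eventLog: ActorRef, store: CustomerStore)
  extends EventsourcedWriter[Long, Long] {

  // Incremental write batch, filled by onEvent and cleared again by write().
  private var batch: Vector[(Long, String)] = Vector.empty

  override val onCommand: Receive = {
    case _ =>
  }

  override val onEvent: Receive = {
    case CustomerRegistered(cid, name) => batch = batch :+ (cid -> name)
  }

  // Called during initialization: read the stored event processing progress.
  override def read(): Future[Long] =
    store.readSequenceNr()

  // Called after replay chunks and during live processing: write the prepared batch
  // together with the current progress, then clear it so that further events can be
  // batched while the asynchronous write is in progress.
  override def write(): Future[Long] = {
    val result = store.writeCustomers(batch, lastSequenceNr)
    batch = Vector.empty
    result
  }

  // Resume event processing from the event following the stored progress.
  override def readSuccess(result: Long): Option[Long] =
    Some(result + 1L)
}

Such a writer would be created like any other event-sourced actor, e.g. system.actorOf(Props(new CustomerWriter("writer", eventLog, store))).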
diff --git a/src/main/scala/com/rbmhtechnology/eventuate/Recovery.scala b/src/main/scala/com/rbmhtechnology/eventuate/Recovery.scala index 4a131b14..8e9d9dab 100644 --- a/src/main/scala/com/rbmhtechnology/eventuate/Recovery.scala +++ b/src/main/scala/com/rbmhtechnology/eventuate/Recovery.scala @@ -139,7 +139,7 @@ private class Acceptor(endpoint: ReplicationEndpoint) extends Actor { case Process => context.become(processing) case Recover(links, promise) => - println(s"[recovery of ${endpoint.id}] Chacking replication progress with remote endpoints ...") + println(s"[recovery of ${endpoint.id}] Checking replication progress with remote endpoints ...") context.become(recovering(context.actorOf(Props(new RecoveryManager(endpoint.id, links))), promise)) } diff --git a/src/main/scala/com/rbmhtechnology/eventuate/ReplicationProtocol.scala b/src/main/scala/com/rbmhtechnology/eventuate/ReplicationProtocol.scala index 2dc2d7a2..7fede9e7 100644 --- a/src/main/scala/com/rbmhtechnology/eventuate/ReplicationProtocol.scala +++ b/src/main/scala/com/rbmhtechnology/eventuate/ReplicationProtocol.scala @@ -148,11 +148,21 @@ object ReplicationProtocol { */ case class ReplicationReadFailure(cause: String, targetLogId: String) extends Format + /** + * Instructs an event log to batch-execute the given `writes`. + */ + case class ReplicationWriteN(writes: Seq[ReplicationWrite]) + + /** + * Completion reply after a [[ReplicationWriteN]]. + */ + case object ReplicationWriteNComplete + /** * Instructs a target log to write replicated `events` from the source log identified by * `sourceLogId` along with the last read position in the source log (`replicationProgress`). */ - case class ReplicationWrite(events: Seq[DurableEvent], sourceLogId: String, replicationProgress: Long, currentSourceVectorTime: VectorTime) + case class ReplicationWrite(events: Seq[DurableEvent], sourceLogId: String, replicationProgress: Long, currentSourceVectorTime: VectorTime, initiator: ActorRef = null) /** * Success reply after a [[ReplicationWrite]]. diff --git a/src/main/scala/com/rbmhtechnology/eventuate/crdt/CRDTService.scala b/src/main/scala/com/rbmhtechnology/eventuate/crdt/CRDTService.scala index 91f97224..6a1caf84 100644 --- a/src/main/scala/com/rbmhtechnology/eventuate/crdt/CRDTService.scala +++ b/src/main/scala/com/rbmhtechnology/eventuate/crdt/CRDTService.scala @@ -214,8 +214,8 @@ trait CRDTService[A, B] { } override val onSnapshot: Receive = { - case snapshot: A => - crdt = snapshot + case snapshot => + crdt = snapshot.asInstanceOf[A] context.parent ! OnChange(crdt, null) } } diff --git a/src/main/scala/com/rbmhtechnology/eventuate/log/BatchingEventLog.scala b/src/main/scala/com/rbmhtechnology/eventuate/log/BatchingEventLog.scala index 70225db4..b8896249 100644 --- a/src/main/scala/com/rbmhtechnology/eventuate/log/BatchingEventLog.scala +++ b/src/main/scala/com/rbmhtechnology/eventuate/log/BatchingEventLog.scala @@ -20,6 +20,8 @@ import akka.actor._ import com.rbmhtechnology.eventuate.EventsourcingProtocol import com.rbmhtechnology.eventuate.EventsourcingProtocol._ +import com.rbmhtechnology.eventuate.ReplicationProtocol +import com.rbmhtechnology.eventuate.ReplicationProtocol._ import com.typesafe.config.Config @@ -28,20 +30,38 @@ private[eventuate] class BatchingSettings(config: Config) { } /** - * An event log wrapper that batches [[EventsourcingProtocol.Write]] commands. Batched write commands are - * sent as [[EventsourcingProtocol.WriteN]] batch to the wrapped event log. + * An event log wrapper that batches write commands. 
Batched [[EventsourcingProtocol.Write]] commands are sent as + * [[EventsourcingProtocol.WriteN]] batch to the wrapped event log. Batched [[ReplicationProtocol.ReplicationWrite]] + * commands are sent as [[ReplicationProtocol.ReplicationWriteN]] batch to the wrapped event log. * - * Batch sizes dynamically increase to a configurable limit under increasing load. The batch size limit can - * be configured with `eventuate.log.batch-size-limit`. If there is no current write operation in progress, - * a new `Write` command is served immediately (as `WriteN` batch of size 1), keeping latency at a minimum. + * Batch sizes dynamically increase to a configurable limit under increasing load. The batch size limit can be + * configured with `eventuate.log.batch-size-limit`. If there is no current write operation in progress, a new + * `Write` or `ReplicationWrite` command is served immediately (as `WriteN` or `ReplicationWriteN` batch of size + * 1, respectively), keeping latency at a minimum. * * @param eventLogProps configuration object of the wrapped event log actor. The wrapped event log actor is * created as child actor of this wrapper. */ class BatchingEventLog(eventLogProps: Props) extends Actor { - val settings = new BatchingSettings(context.system.settings.config) - val eventLog = context.actorOf(eventLogProps) + val eventLog: ActorRef = + context.actorOf(eventLogProps) + + val emissionBatcher: ActorRef = + context.actorOf(Props(new EmissionBatcher(eventLog))) + + val replicationBatcher: ActorRef = + context.actorOf(Props(new ReplicationBatcher(eventLog))) + + def receive = { + case r: ReplicationWrite => + replicationBatcher forward r.copy(initiator = sender()) + case cmd => + emissionBatcher forward cmd + } +} +private class EmissionBatcher(eventLog: ActorRef) extends Actor { + val settings = new BatchingSettings(context.system.settings.config) var batch: Vector[Write] = Vector.empty val idle: Receive = { @@ -54,9 +74,6 @@ class BatchingEventLog(eventLogProps: Props) extends Actor { val writing: Receive = { - // - // TODO: consider using a receive timeout here - // case w: Write => batch = batch :+ w case WriteNComplete if batch.isEmpty => @@ -77,7 +94,7 @@ class BatchingEventLog(eventLogProps: Props) extends Actor { private def writeAll(): Unit = if (writeBatch()) writeAll() - private def writeBatch(): Boolean = if (batch.size > 0) { + private def writeBatch(): Boolean = if (batch.nonEmpty) { var num = 0 val (w, r) = batch.span { w => num += w.events.size @@ -87,4 +104,38 @@ class BatchingEventLog(eventLogProps: Props) extends Actor { batch = r batch.nonEmpty } else false +} + +private class ReplicationBatcher(eventLog: ActorRef) extends Actor { + val settings = new BatchingSettings(context.system.settings.config) + var batch: Vector[ReplicationWrite] = Vector.empty + + val idle: Receive = { + case w: ReplicationWrite => + batch = batch :+ w.copy(initiator = sender()) + writeBatch() + context.become(writing) + } + + val writing: Receive = { + case w: ReplicationWrite => + batch = batch :+ w.copy(initiator = sender()) + case ReplicationWriteNComplete if batch.isEmpty => + context.become(idle) + case ReplicationWriteNComplete => + writeBatch() + } + + def receive = idle + + private def writeBatch(): Boolean = if (batch.nonEmpty) { + var num = 0 + val (w, r) = batch.span { w => + num += w.events.size + num <= settings.batchSizeLimit || num == w.events.size + } + eventLog ! 
ReplicationWriteN(w) + batch = r + batch.nonEmpty + } else false } diff --git a/src/main/scala/com/rbmhtechnology/eventuate/log/NotificationChannel.scala b/src/main/scala/com/rbmhtechnology/eventuate/log/NotificationChannel.scala index ff18e512..0a236b3e 100644 --- a/src/main/scala/com/rbmhtechnology/eventuate/log/NotificationChannel.scala +++ b/src/main/scala/com/rbmhtechnology/eventuate/log/NotificationChannel.scala @@ -43,7 +43,7 @@ private class NotificationChannel(logId: String) extends Actor { case Updated(events) => registry.foreach { case (targetLogId, reg) => - if (!reading.contains(targetLogId) && events.exists(_.replicate(reg.currentTargetVectorTime, reg.filter))) { + if (!reading.contains(targetLogId) && events.exists(_.replicable(reg.currentTargetVectorTime, reg.filter))) { reg.replicator ! ReplicationDue } } diff --git a/src/main/scala/com/rbmhtechnology/eventuate/log/SubscriberRegistry.scala b/src/main/scala/com/rbmhtechnology/eventuate/log/SubscriberRegistry.scala index 22f8455e..22bfaed6 100644 --- a/src/main/scala/com/rbmhtechnology/eventuate/log/SubscriberRegistry.scala +++ b/src/main/scala/com/rbmhtechnology/eventuate/log/SubscriberRegistry.scala @@ -18,7 +18,7 @@ package com.rbmhtechnology.eventuate.log import akka.actor.ActorRef import com.rbmhtechnology.eventuate.DurableEvent -import com.rbmhtechnology.eventuate.EventsourcingProtocol.{WriteFailure, WriteSuccess, Written} +import com.rbmhtechnology.eventuate.EventsourcingProtocol._ import scala.collection.immutable.Seq @@ -51,7 +51,7 @@ private case class SubscriberRegistry( } } - def pushWriteSuccess(events: Seq[DurableEvent], initiator: ActorRef, requestor: ActorRef, instanceId: Int): Unit = + def pushWriteSuccess(events: Seq[DurableEvent], initiator: ActorRef, requestor: ActorRef, instanceId: Int): Unit = { events.foreach { event => requestor.tell(WriteSuccess(event, instanceId), initiator) val written = Written(event) @@ -63,6 +63,7 @@ private case class SubscriberRegistry( aggregate <- aggregateRegistry(aggregateId) if aggregate != requestor } aggregate ! 
written } + } def pushWriteFailure(events: Seq[DurableEvent], initiator: ActorRef, requestor: ActorRef, instanceId: Int, cause: Throwable): Unit = events.foreach { event => diff --git a/src/main/scala/com/rbmhtechnology/eventuate/log/TimeTracker.scala b/src/main/scala/com/rbmhtechnology/eventuate/log/TimeTracker.scala index adcbc2c1..ea5b42b4 100644 --- a/src/main/scala/com/rbmhtechnology/eventuate/log/TimeTracker.scala +++ b/src/main/scala/com/rbmhtechnology/eventuate/log/TimeTracker.scala @@ -20,6 +20,7 @@ import akka.event.LoggingAdapter import com.rbmhtechnology.eventuate._ import com.rbmhtechnology.eventuate.EventsourcingProtocol.Write +import com.rbmhtechnology.eventuate.ReplicationProtocol.ReplicationWrite import scala.collection.immutable.Seq @@ -54,6 +55,14 @@ private[eventuate] case class TimeTracker(updateCount: Long = 0L, sequenceNr: Lo } } + def prepareReplicates(logId: String, writes: Seq[ReplicationWrite], logger: LoggingAdapter): (Seq[ReplicationWrite], TimeTracker) = { + writes.foldLeft((Vector.empty[ReplicationWrite], this)) { + case ((writes2, tracker2), write) => tracker2.prepareReplicate(logId, write.events, write.replicationProgress, logger) match { + case (updated, tracker3) => (writes2 :+ write.copy(events = updated), tracker3) + } + } + } + def prepareWrite(logId: String, events: Seq[DurableEvent], systemTimestamp: Long): (Seq[DurableEvent], TimeTracker) = { var snr = sequenceNr var upd = updateCount @@ -63,7 +72,7 @@ private[eventuate] case class TimeTracker(updateCount: Long = 0L, sequenceNr: Lo snr += 1L upd += 1L - val e2 = e.prepareWrite(logId, snr, systemTimestamp) + val e2 = e.prepare(logId, snr, systemTimestamp) lvt = lvt.merge(e2.vectorTimestamp) e2 } @@ -71,39 +80,40 @@ private[eventuate] case class TimeTracker(updateCount: Long = 0L, sequenceNr: Lo (updated, copy(updateCount = upd, sequenceNr = snr, vectorTime = vectorTime.merge(lvt))) } - def prepareReplicate(logId: String, events: Seq[DurableEvent], replicationProgress: Long): (Seq[DurableEvent], TimeTracker) = { + def prepareReplicate(logId: String, events: Seq[DurableEvent], replicationProgress: Long, logger: LoggingAdapter): (Seq[DurableEvent], TimeTracker) = { var snr = sequenceNr var upd = updateCount var lvt = vectorTime val updated = events.foldLeft(Vector.empty[DurableEvent]) { - case (acc, e) if e.replicate(lvt) => - snr += 1L - upd += 1L - - val e2 = e.prepareReplicate(logId, snr) - lvt = lvt.merge(e2.vectorTimestamp) - acc :+ e2 - case (acc, e) => + case (acc, e) if e.before(vectorTime) => // Exclude events from writing that are in the causal past of this event log. // Excluding them at the target is needed for correctness. Events are also // filtered at sources (to reduce network bandwidth usage) but this is only // an optimization which cannot achieve 100% filtering coverage for certain // replication network topologies. 
acc + case (acc, e) => + snr += 1L + upd += 1L + + val e2 = e.prepare(logId, snr, e.systemTimestamp) + lvt = lvt.merge(e2.vectorTimestamp) + acc :+ e2 } + TimeTracker.logFilterStatistics(logId, "target", events, updated, logger) (updated, copy(updateCount = upd, sequenceNr = snr, vectorTime = vectorTime.merge(lvt))) } } private[eventuate] object TimeTracker { - def logFilterStatistics(log: LoggingAdapter, logId: String, location: String, before: Seq[DurableEvent], after: Seq[DurableEvent]): Unit = { + def logFilterStatistics(logId: String, location: String, before: Seq[DurableEvent], after: Seq[DurableEvent], logger: LoggingAdapter): Unit = { val bl = before.length val al = after.length if (al < bl) { val diff = bl - al val perc = diff * 100.0 / bl - log.info(f"[$logId] excluded $diff events ($perc%3.1f%% at $location)") + logger.info(f"[$logId] excluded $diff events ($perc%3.1f%% at $location)") } } } \ No newline at end of file diff --git a/src/main/scala/com/rbmhtechnology/eventuate/log/cassandra/CassandraEventLog.scala b/src/main/scala/com/rbmhtechnology/eventuate/log/cassandra/CassandraEventLog.scala index 901d7c67..774b6955 100644 --- a/src/main/scala/com/rbmhtechnology/eventuate/log/cassandra/CassandraEventLog.scala +++ b/src/main/scala/com/rbmhtechnology/eventuate/log/cassandra/CassandraEventLog.scala @@ -163,75 +163,21 @@ class CassandraEventLog(val id: String) extends Actor with Stash with ActorLoggi // still filtered at target based on the current local vector time at the target (for // correctness). val currentTargetVectorTime = timeCache(targetLogId) - val updated = events.filter(_.replicate(currentTargetVectorTime)) + val updated = events.filterNot(_.before(currentTargetVectorTime)) val reply = r.copy(updated, currentSourceVectorTime = timeTracker.vectorTime) sender() ! reply notificationChannel ! reply - logFilterStatistics(log, id, "source", events, updated) - case Write(events, initiator, requestor, iid) => - val result = for { - (partition, tracker) <- Try(adjustSequenceNr(events.size, cassandra.settings.partitionSizeMax, timeTracker)) - (updated, tracker2) = tracker.prepareWrite(id, events, currentSystemTime) - tracker3 <- Try(write(partition, updated, tracker2)) - } yield (updated, tracker3) - - result match { - case Success((updated, tracker)) => - timeTracker = tracker - registry.pushWriteSuccess(updated, initiator, requestor, iid) - notificationChannel ! Updated(updated) - case Failure(e) => - registry.pushWriteFailure(events, initiator, requestor, iid, e) - } - case r @ WriteN(writes) => - val result = for { - (partition, tracker) <- Try(adjustSequenceNr(r.size, cassandra.settings.partitionSizeMax, timeTracker)) - (updatedWrites, tracker2) = tracker.prepareWrites(id, writes, currentSystemTime) - updatedEvents = updatedWrites.flatMap(_.events) - tracker3 <- Try(write(partition, updatedEvents, tracker2)) - } yield (updatedWrites, updatedEvents, tracker3) - - result match { - case Success((updatedWrites, updatedEvents, tracker)) => - timeTracker = tracker - updatedWrites.foreach(w => registry.pushWriteSuccess(w.events, w.initiator, w.requestor, w.instanceId)) - notificationChannel ! Updated(updatedEvents) - case Failure(e) => - writes.foreach(w => registry.pushWriteFailure(w.events, w.initiator, w.requestor, w.instanceId, e)) - } - sender() ! 
WriteNComplete // notify batch layer that write completed - case ReplicationWrite(events, sourceLogId, replicationProgress, currentSourceVectorTime) => - timeCache = timeCache.updated(sourceLogId, currentSourceVectorTime) - val result = for { - (partition, tracker) <- Try(adjustSequenceNr(events.size, cassandra.settings.partitionSizeMax, timeTracker)) - (updated, tracker2) = tracker.prepareReplicate(id, events, replicationProgress) - tracker3 <- Try(write(partition, updated, tracker2)) - } yield (updated, tracker3) - - result match { - case Success((updated, tracker)) => - val rws = ReplicationWriteSuccess(events.size, replicationProgress, tracker.vectorTime) - val sdr = sender() - timeTracker = tracker - registry.pushReplicateSuccess(updated) - notificationChannel ! rws - notificationChannel ! Updated(updated) - logFilterStatistics(log, id, "target", events, updated) - implicit val dispatcher = context.system.dispatchers.defaultGlobalDispatcher - progressStore.writeReplicationProgressAsync(sourceLogId, replicationProgress) onComplete { - case Success(_) => - sdr ! rws - case Failure(e) => - // Write failure of replication progress can be ignored. Using a stale - // progress to resume replication will redundantly read events from a - // source log but these events will be successfully identified as - // duplicates, either at source or latest at target. - log.warning(s"Writing of replication progress failed: ${e.getMessage}") - sdr ! ReplicationWriteFailure(e) - } - case Failure(e) => - sender() ! ReplicationWriteFailure(e) - } + logFilterStatistics(id, "source", events, updated, log) + case w: Write => + writeN(Seq(w)) + case WriteN(writes) => + writeN(writes) + sender() ! WriteNComplete + case w: ReplicationWrite => + replicateN(Seq(w.copy(initiator = sender()))) + case ReplicationWriteN(writes) => + replicateN(writes) + sender() ! ReplicationWriteNComplete case LoadSnapshot(emitterId, requestor, iid) => import cassandra.readDispatcher snapshotStore.loadAsync(emitterId) onComplete { @@ -273,6 +219,62 @@ class CassandraEventLog(val id: String) extends Actor with Stash with ActorLoggi private[eventuate] def replayer(requestor: ActorRef, iterator: => Iterator[DurableEvent] with Closeable, fromSequenceNr: Long): ActorRef = context.actorOf(Props(new ChunkedEventReplay(requestor, iterator)).withDispatcher("eventuate.log.cassandra.read-dispatcher")) + private def replicateN(writes: Seq[ReplicationWrite]): Unit = { + writes.foreach(w => timeCache = timeCache.updated(w.sourceLogId, w.currentSourceVectorTime)) + val result = for { + (partition, tracker) <- Try(adjustSequenceNr(writes.map(_.events.size).sum, cassandra.settings.partitionSizeMax, timeTracker)) + (updatedWrites, tracker2) = tracker.prepareReplicates(id, writes, log) + updatedEvents = updatedWrites.flatMap(_.events) + tracker3 <- Try(write(partition, updatedEvents, tracker2)) + } yield (updatedWrites, updatedEvents, tracker3) + + result match { + case Success((updatedWrites, updatedEvents, tracker3)) => + timeTracker = tracker3 + updatedWrites.foreach { w => + val rws = ReplicationWriteSuccess(w.events.size, w.replicationProgress, tracker3.vectorTime) + val sdr = w.initiator + registry.pushReplicateSuccess(w.events) + notificationChannel ! rws + implicit val dispatcher = context.system.dispatchers.defaultGlobalDispatcher + progressStore.writeReplicationProgressAsync(w.sourceLogId, w.replicationProgress) onComplete { + case Success(_) => + sdr ! rws + case Failure(e) => + // Write failure of replication progress can be ignored. 
Using a stale + // progress to resume replication will redundantly read events from a + // source log but these events will be successfully identified as + // duplicates, either at source or latest at target. + log.warning(s"Writing of replication progress failed: ${e.getMessage}") + sdr ! ReplicationWriteFailure(e) + } + } + notificationChannel ! Updated(updatedEvents) + case Failure(e) => + writes.foreach { w => + w.initiator ! ReplicationWriteFailure(e) + } + } + } + + private def writeN(writes: Seq[Write]): Unit = { + val result = for { + (partition, tracker) <- Try(adjustSequenceNr(writes.map(_.events.size).sum, cassandra.settings.partitionSizeMax, timeTracker)) + (updatedWrites, tracker2) = tracker.prepareWrites(id, writes, currentSystemTime) + updatedEvents = updatedWrites.flatMap(_.events) + tracker3 <- Try(write(partition, updatedEvents, tracker2)) + } yield (updatedWrites, updatedEvents, tracker3) + + result match { + case Success((updatedWrites, updatedEvents, tracker3)) => + timeTracker = tracker3 + updatedWrites.foreach(w => registry.pushWriteSuccess(w.events, w.initiator, w.requestor, w.instanceId)) + notificationChannel ! Updated(updatedEvents) + case Failure(e) => + writes.foreach(w => registry.pushWriteFailure(w.events, w.initiator, w.requestor, w.instanceId, e)) + } + } + private[eventuate] def write(partition: Long, events: Seq[DurableEvent], tracker: TimeTracker): TimeTracker = { cassandra.executeBatch { batch => events.foreach { event => diff --git a/src/main/scala/com/rbmhtechnology/eventuate/log/cassandra/CassandraEventReader.scala b/src/main/scala/com/rbmhtechnology/eventuate/log/cassandra/CassandraEventReader.scala index 999e025f..1d87725f 100644 --- a/src/main/scala/com/rbmhtechnology/eventuate/log/cassandra/CassandraEventReader.scala +++ b/src/main/scala/com/rbmhtechnology/eventuate/log/cassandra/CassandraEventReader.scala @@ -45,7 +45,7 @@ private[eventuate] class CassandraEventReader(cassandra: Cassandra, logId: Strin var lastSequenceNr = fromSequenceNr - 1L val events = eventIterator(fromSequenceNr, toSequenceNr).filter { evt => lastSequenceNr = evt.localSequenceNr - evt.replicate(lower, filter) + evt.replicable(lower, filter) }.take(max).toVector ReadResult(events, lastSequenceNr) } diff --git a/src/main/scala/com/rbmhtechnology/eventuate/log/leveldb/LeveldbEventLog.scala b/src/main/scala/com/rbmhtechnology/eventuate/log/leveldb/LeveldbEventLog.scala index ded0812e..0e857bbb 100644 --- a/src/main/scala/com/rbmhtechnology/eventuate/log/leveldb/LeveldbEventLog.scala +++ b/src/main/scala/com/rbmhtechnology/eventuate/log/leveldb/LeveldbEventLog.scala @@ -130,52 +130,21 @@ class LeveldbEventLog(val id: String, prefix: String) extends Actor with ActorLo // still filtered at target based on the current local vector time at the target (for // correctness). val currentTargetVectorTime = timeCache(targetLogId) - val updated = events.filter(_.replicate(currentTargetVectorTime)) + val updated = events.filterNot(_.before(currentTargetVectorTime)) val reply = r.copy(updated, currentSourceVectorTime = timeTracker.vectorTime) sender() ! reply notificationChannel ! reply - logFilterStatistics(log, id, "source", events, updated) - case Write(events, initiator, requestor, iid) => - val (updated, tracker) = timeTracker.prepareWrite(id, events, currentSystemTime) - Try(write(updated, tracker)) match { - case Success(tracker2) => - timeTracker = tracker2 - registry.pushWriteSuccess(updated, initiator, requestor, iid) - notificationChannel ! 
Updated(updated) - case Failure(e) => - registry.pushWriteFailure(events, initiator, requestor, iid, e) - } + logFilterStatistics(id, "source", events, updated, log) + case w: Write => + writeN(Seq(w)) case WriteN(writes) => - val (updatedWrites, tracker) = timeTracker.prepareWrites(id, writes, currentSystemTime) - val updatedEvents = updatedWrites.flatMap(_.events) - Try(withBatch(batch => write(updatedEvents, tracker, batch))) match { - case Success(tracker2) => - timeTracker = tracker2 - updatedWrites.foreach(w => registry.pushWriteSuccess(w.events, w.initiator, w.requestor, w.instanceId)) - notificationChannel ! Updated(updatedEvents) - case Failure(e) => - writes.foreach(w => registry.pushWriteFailure(w.events, w.initiator, w.requestor, w.instanceId, e)) - } - sender() ! WriteNComplete // notify batch layer that write completed - case w @ ReplicationWrite(events, sourceLogId, replicationProgress, currentSourceVectorTime) => - timeCache = timeCache.updated(sourceLogId, currentSourceVectorTime) - val (updated, tracker) = timeTracker.prepareReplicate(id, events, replicationProgress) - Try { - withBatch { batch => - replicationProgressMap.writeReplicationProgress(sourceLogId, replicationProgress, batch) - write(updated, tracker, batch) - } - } match { - case Success(tracker2) => - sender() ! ReplicationWriteSuccess(events.size, replicationProgress, tracker2.vectorTime) - timeTracker = tracker2 - registry.pushReplicateSuccess(updated) - notificationChannel ! w - notificationChannel ! Updated(updated) - logFilterStatistics(log, id, "target", events, updated) - case Failure(e) => - sender() ! ReplicationWriteFailure(e) - } + writeN(writes) + sender() ! WriteNComplete + case w: ReplicationWrite => + replicateN(Seq(w.copy(initiator = sender()))) + case ReplicationWriteN(writes) => + replicateN(writes) + sender() ! ReplicationWriteNComplete case LoadSnapshot(emitterId, requestor, iid) => import leveldbSettings.readDispatcher snapshotStore.loadAsync(emitterId) onComplete { @@ -205,6 +174,44 @@ class LeveldbEventLog(val id: String, prefix: String) extends Actor with ActorLo def currentSystemTime: Long = System.currentTimeMillis + private def replicateN(writes: Seq[ReplicationWrite]): Unit = { + writes.foreach(w => timeCache = timeCache.updated(w.sourceLogId, w.currentSourceVectorTime)) + val (updatedWrites, tracker) = timeTracker.prepareReplicates(id, writes, log) + val updatedEvents = updatedWrites.flatMap(_.events) + Try { + withBatch { batch => + updatedWrites.foreach(w => replicationProgressMap.writeReplicationProgress(w.sourceLogId, w.replicationProgress, batch)) + write(updatedEvents, tracker, batch) + } + } match { + case Success(tracker2) => + timeTracker = tracker2 + updatedWrites.foreach { w => + registry.pushReplicateSuccess(w.events) + w.initiator ! ReplicationWriteSuccess(w.events.size, w.replicationProgress, tracker2.vectorTime) + notificationChannel ! w + } + notificationChannel ! Updated(updatedEvents) + case Failure(e) => + updatedWrites.foreach { w => + w.initiator ! ReplicationWriteFailure(e) + } + } + } + + private def writeN(writes: Seq[Write]): Unit = { + val (updatedWrites, tracker) = timeTracker.prepareWrites(id, writes, currentSystemTime) + val updatedEvents = updatedWrites.flatMap(_.events) + Try(withBatch(batch => write(updatedEvents, tracker, batch))) match { + case Success(tracker2) => + timeTracker = tracker2 + updatedWrites.foreach(w => registry.pushWriteSuccess(w.events, w.initiator, w.requestor, w.instanceId)) + notificationChannel ! 
Updated(updatedEvents) + case Failure(e) => + writes.foreach(w => registry.pushWriteFailure(w.events, w.initiator, w.requestor, w.instanceId, e)) + } + } + private[eventuate] def write(events: Seq[DurableEvent], tracker: TimeTracker): TimeTracker = withBatch(write(events, tracker, _)) @@ -231,7 +238,7 @@ class LeveldbEventLog(val id: String, prefix: String) extends Actor with ActorLo var last = first - 1L val evts = iter.filter { evt => last = evt.localSequenceNr - evt.replicate(lower, filter) + evt.replicable(lower, filter) }.take(max).toVector ReadResult(evts, last) } diff --git a/src/sphinx/architecture.rst b/src/sphinx/architecture.rst index a20922af..83bb7bc6 100644 --- a/src/sphinx/architecture.rst +++ b/src/sphinx/architecture.rst @@ -50,10 +50,57 @@ Event replication across locations is reliable. Should there be a network partit Replication connections can also be configured with replication filters, so that only events matching one or more filter criteria are replicated. This is especially useful for smaller locations (for example, mobile devices) that only need to exchange a subset of events with other locations. +Event sourcing +-------------- + +Eventuate provides several abstractions for building event-sourced application components. They all derive application state from events stored in :ref:`event-logs` but follow different strategies for managing derived state. A summary is given in :ref:`arch-tab1`, more information in the following subsections. For further details, follow the links in the *Details* column. + +.. _arch-tab1: +.. list-table:: Table 1: Event sourcing abstractions + :widths: 20 70 10 + :header-rows: 1 + + * - Abstraction + - Description + - Details + * - :ref:`Event-sourced actor ` + - | Consumes events from its event log and + | emits new events to the same event log + | during command processing. Derived state + | is an in-memory write model, representing + | the command-side (C) of CQRS. + - - :ref:`User guide ` + - :ref:`Reference ` + - `API docs `_ + * - :ref:`Event-sourced view` + - | Consumes events from its event log but + | cannot emit new events. Derived state + | is an in-memory read model, representing + | the query-side (Q) of CQRS. + - - :ref:`User guide ` + - :ref:`Reference ` + - `API docs `_ + * - :ref:`Event-sourced writer` + - | Consumes events from its event log and + | batch-updates an external query database + | using event data. Derived state is a persistent + | read model, representing the query-side (Q) of + | CQRS. + - - :ref:`Reference ` + - `API docs `_ + * - :ref:`Event-sourced processor` + - | Consumes events from its event log and + | emits new events to a target event log + | during event processing. Processors can + | connect event logs to event processing + | pipelines or graphs. + - - :ref:`Reference ` + - `API docs `_ + .. _event-sourced-actors: Event-sourced actors --------------------- +~~~~~~~~~~~~~~~~~~~~ Event-sourced actors produce events to and consume events from an event log. During *command processing* they usually validate external commands against internal state and, if validation succeeds, write one or more events to their event log. During *event processing* they consume events they have written and update internal state by handling these events. This is the basic idea behind `event sourcing`_. When used in context of a `CQRS`_ architecture, event-sourced actors usually implement the command-side (C). 
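For illustration, a minimal command-side actor following this split could look as sketched below. ``ExampleActor``, ``ExampleCommand`` and ``ExampleEvent`` are hypothetical names used only here; the ``persist`` call and its ``Try``-based handler are assumed to follow the ``EventsourcedActor`` API as used in the user guide::

    import akka.actor.ActorRef
    import com.rbmhtechnology.eventuate.EventsourcedActor

    import scala.util.{Failure, Success}

    case class ExampleCommand(data: String)
    case class ExampleEvent(data: String)

    class ExampleActor(val id: String, val eventLog: ActorRef) extends EventsourcedActor {
      // in-memory write model, derived from handled events
      private var state: Vector[String] = Vector.empty

      // command processing: validate the command and emit an event
      override val onCommand: Receive = {
        case ExampleCommand(data) =>
          persist(ExampleEvent(data)) {
            case Success(evt)   => sender() ! "accepted"
            case Failure(cause) => sender() ! s"rejected: ${cause.getMessage}"
          }
      }

      // event processing: update internal state from events written to the event log
      override val onEvent: Receive = {
        case ExampleEvent(data) => state = state :+ data
      }
    }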
@@ -88,17 +135,31 @@ Event-sourced actors may also interact with external services by sending command External service integration. +.. _event-sourced-views: + Event-sourced views -------------------- +~~~~~~~~~~~~~~~~~~~ + +.. role:: strike + :class: wy-text-strike + +Event-sourced views are a functional subset of event-sourced actors. They can only consume events from an event log but cannot produce new events. When used in context of a `CQRS`_ architecture, views implement the query-side (Q). + +Applications use event-sourced views to create in-memory read models from consumed events. Applications that want to create persistent read models should use :ref:`event-sourced-writers` instead. -Event-sourced views are a functional subset of event-sourced actors. They can only consume events from an event log but cannot produce new events. Views do not only maintain state in-memory but often persist it to a database. By additionally storing the sequence number of the last processed event in the database, writing can be made idempotent. When used in context of a `CQRS`_ architecture, views implement the query-side (Q). +.. _event-sourced-writers: -.. _processors: +Event-sourced writers +~~~~~~~~~~~~~~~~~~~~~ + +Event-sourced writers are a specialization of event-sourced views. They also consume events from an event log but persist the created read model to an external, application-specific query database (which can be a relational database, a graph database or a simple key value store, for example). Event-sourced writers update the query database in incremental batches. For query processing, applications use the external query database directly. + +.. _event-sourced-processors: Event-sourced processors ------------------------- +~~~~~~~~~~~~~~~~~~~~~~~~ -An event-sourced processor consumes events from one or more event logs, processes them (stateless or stateful) and produces the processed events to another event log. Event-sourced processors are gateways between otherwise partitioned event logs. They are not implemented yet. +An event-sourced processor consumes events from one event log, processes them (stateless or stateful) and produces the processed events to another event log. Event-sourced processors are idempotent producers and a specialization of event-sourced writers. Applications use processors to connect event logs to event stream processing pipelines and graphs. Connectivity to other stream processing solutions is given by :ref:`adapters`. .. _operation-based-crdts: @@ -133,6 +194,8 @@ Eventuate internally uses batching to optimize read and write throughput. It is - replicating events: Events are replicated in batches of configurable size. They are batch-read from a source log, batch-transferred over a replication connection and batch-written to a target log. +- writing to external databases: :ref:`event-sourced-writers` update persistent read models in incremental batches. When a write to an external query database is in progress, new event processing results are batched in-memory and written with the next scheduled write. + .. _adapters: Adapters @@ -168,6 +231,4 @@ We haven’t started yet working on this. Should you have any preferences or pro .. _ticket 103: https://github.com/RBMHTechnology/eventuate/issues/103 .. _let us know: https://groups.google.com/forum/#!forum/eventuate -.. [#] :ref:`processors` can be used to connect otherwise partitioned event logs. - - +.. [#] :ref:`event-sourced-processors` can be used to connect otherwise partitioned event logs. 
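The following sketch outlines the general shape of such an event-sourced writer. It is not the ``querydb`` example from this change: ``ExampleWriter`` and the ``QueryDb`` interface are hypothetical stand-ins for an application-specific query database, a ``lastSequenceNr`` accessor is assumed for tracking update progress, and the ``read``/``write``/``readSuccess`` signatures are assumptions based on the behavior described in the event-sourcing reference::

    import akka.actor.ActorRef
    import com.rbmhtechnology.eventuate.EventsourcedWriter

    import scala.concurrent.Future

    // hypothetical asynchronous query database interface (not part of Eventuate)
    trait QueryDb {
      def readProgress(): Future[Long]
      def writeBatch(updates: Vector[String], progress: Long): Future[Unit]
    }

    class ExampleWriter(val id: String, val eventLog: ActorRef, db: QueryDb)
      extends EventsourcedWriter[Long, Unit] {

      private var batch: Vector[String] = Vector.empty // in-memory update batch
      private var progress: Long = 0L                  // sequence number of the last processed event

      override val onCommand: Receive = {
        case _ => // queries go directly to the query database, not through this actor
      }

      // event processing: translate events to query database updates and batch them in-memory
      override val onEvent: Receive = {
        case evt: String =>
          progress = lastSequenceNr
          batch = batch :+ evt
      }

      // called after (re)start: asynchronously read the stored update progress
      override def read(): Future[Long] =
        db.readProgress()

      // resume event processing after the stored progress (stateless writer)
      override def readSuccess(result: Long): Option[Long] =
        Some(result + 1L)

      // called by Eventuate to write the accumulated batch together with the new progress
      override def write(): Future[Unit] = {
        val (updates, snr) = (batch, progress)
        batch = Vector.empty // subsequent events are batched for the next scheduled write
        db.writeBatch(updates, snr)
      }
    }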
diff --git a/src/sphinx/code/EventSourcingDoc.scala b/src/sphinx/code/EventSourcingDoc.scala index e36bc85a..9e330629 100644 --- a/src/sphinx/code/EventSourcingDoc.scala +++ b/src/sphinx/code/EventSourcingDoc.scala @@ -221,5 +221,35 @@ object ChunkedReplay { //#chunk-size-max } //# +} + +object Processor { + import akka.actor._ + import com.rbmhtechnology.eventuate.StatelessProcessor + import scala.collection.immutable.Seq + + //#processor + class Processor( + val id: String, + val eventLog: ActorRef, + val targetEventLog: ActorRef) + extends StatelessProcessor { + // ... + + //# + override val onCommand: Receive = { + case cmd => // ... + } + //#processor + override val processEvent: Process = { + // exclude event + case "my-event-1" => Seq() + // transform event + case "my-event-2" => Seq("my-event-2a") + // transform and split event + case "my-event-3" => Seq("my-event-3a", "my-event-3b") + } + } + //# } \ No newline at end of file diff --git a/src/sphinx/example-application.rst b/src/sphinx/example-application.rst index 7eec8a20..cc29758d 100644 --- a/src/sphinx/example-application.rst +++ b/src/sphinx/example-application.rst @@ -14,7 +14,7 @@ Domain The ``Order`` domain object is defined as follows: -.. includecode:: ../test/scala/com/rbmhtechnology/example/Order.scala +.. includecode:: ../test/scala/com/rbmhtechnology/example/ordermgnt/Order.scala :snippet: order-definition Order creation and updates are tracked as events in a replicated event log. At each location, there is one event-sourced ``OrderActor`` instance per created ``Order`` instance and one event-sourced ``OrderView`` instance that counts the updates made to all orders. @@ -119,7 +119,7 @@ Disaster recovery can also start from a previous, older backup of the LevelDB di .. _sbt: http://www.scala-sbt.org/ -.. _Scala version: https://github.com/RBMHTechnology/eventuate/tree/master/src/test/scala/com/rbmhtechnology/example -.. _Java 8 version: https://github.com/RBMHTechnology/eventuate/tree/master/src/test/java/com/rbmhtechnology/example/japi +.. _Scala version: https://github.com/RBMHTechnology/eventuate/tree/master/src/test/scala/com/rbmhtechnology/example/ordermgnt +.. _Java 8 version: https://github.com/RBMHTechnology/eventuate/tree/master/src/test/java/com/rbmhtechnology/example/ordermgnt/japi .. _activator: https://www.typesafe.com/community/core-tools/activator-and-sbt .. _akka-eventuate-scala: https://www.typesafe.com/activator/template/akka-eventuate-scala diff --git a/src/sphinx/introduction.rst b/src/sphinx/introduction.rst index 5631a820..e73bd1fc 100644 --- a/src/sphinx/introduction.rst +++ b/src/sphinx/introduction.rst @@ -10,9 +10,11 @@ Eventuate is a toolkit for building distributed, highly-available and partition- Eventuate supports replication of application state through asynchronous event replication across *locations*. These can be geographically distinct locations\ [#]_, nodes within a data center or even processes on the same node, for example. Locations consume replicated events to re-construct application state locally. Eventuate allows multiple locations to concurrently update replicated application state (multi-master) and supports interactive and automated conflict resolution strategies in case of conflicting updates (incl. :ref:`operation-based-crdts`). -Replicated application state is managed by :ref:`event-sourced-actors` that exchange events via a replicated :ref:`Event log ` by producing events to and consuming events from that log. 
Here, event-sourced actors of the same type exchange events which is a special case of `event collaboration`_. Event-sourced actors of different type may also exchange events to implement a distributed event-driven application. For example, event-sourced actors can be microservices that reliably exchange events via one or more replicated event logs. Replication filters and custom event routing rules can be used to make event-based communication more directed. :ref:`adapters` provide connectivity to other stream processing solutions. +Replicated application state is managed by :ref:`event-sourced-actors` that exchange events via a replicated :ref:`Event log ` by producing events to and consuming events from that log. For state replication, events are exchanged between event-sourced actors of the same type, which is a special case of `event collaboration`_. In more general cases, event-sourced actors of different type exchange events to implement a distributed and event-driven business process, for example. These actors can be microservices that reliably exchange events via one or more replicated event logs. Replication filters and custom event routing rules can be used to make event-based communication more directed, if needed. -Events captured at a location are stored in a local event log and asynchronously replicated to other locations based on a replication protocol that preserves the *happened-before* relationship (= potential causality) of events. Causality is tracked with :ref:`vector-clocks`. For any two events, applications can determine if they have a potential causal relationship or if they are concurrent by comparing their vector timestamps. This is important to achieve `causal consistency`_ which is the strongest possible consistency for *always-on* applications i.e. applications that should remain available for writes during network partitions\ [#]_. +Eventuate also separates the command side of an application from its query side which is known as CQRS_. The command side is implemented with :ref:`event-sourced-actors`, the query side with :ref:`event-sourced-views` and :ref:`event-sourced-writers`. Different event logs can be connected with :ref:`event-sourced-processors` for building event processing pipelines or graphs. :ref:`adapters` provide connectivity to other stream processing solutions. + +Events captured at one location are stored in a local event log and asynchronously replicated to other locations based on a replication protocol that preserves the *happened-before* relationship (= potential causality) of events. Causality is tracked with :ref:`vector-clocks`. For any two events, applications can determine if they have a potential causal relationship or if they are concurrent by comparing their vector timestamps. This is important to achieve `causal consistency`_ which is the strongest possible consistency for *always-on* applications i.e. applications that should remain available for writes during network partitions\ [#]_. Individual locations remain available for writes during inter-location network partitions. Events that have been captured locally during a network partition are replicated later when the partition heals. Storing events locally and replicating them later can also be useful for applications distributed across temporarily connected devices, for example. @@ -29,6 +31,7 @@ Storage backends at individual locations are pluggable (see also :ref:`current-l .. _event collaboration: http://martinfowler.com/eaaDev/EventCollaboration.html .. 
_CAP: http://en.wikipedia.org/wiki/CAP_theorem .. _CRDT: http://en.wikipedia.org/wiki/Conflict-free_replicated_data_type +.. _CQRS: http://martinfowler.com/bliki/CQRS.html .. _causal consistency: http://en.wikipedia.org/wiki/Causal_consistency .. _Event sourcing at global scale: http://krasserm.github.io/2015/01/13/event-sourcing-at-global-scale/ diff --git a/src/sphinx/reference/event-log.rst b/src/sphinx/reference/event-log.rst index 51001f30..2276c644 100644 --- a/src/sphinx/reference/event-log.rst +++ b/src/sphinx/reference/event-log.rst @@ -96,7 +96,7 @@ The distribution of ``L``, ``M`` and ``N`` across locations may also differ:: N2 --- N3 .. note:: - Event replication is reliable and can recover from network failures. It can also recover from crashes of source and target locations i.e. event replication automatically resumes when a crashed location recovers. Replicated events are also guaranteed to be written *exactly-once* to a target log. This is possible because replication progress metadata are stored along with replicated events in the target log. This allows a replication target to reliably detect and ignore duplicates. Event-sourced actors and views can therefore rely on receiving a de-duplicated event stream. + Event replication is reliable and can recover from network failures. It can also recover from crashes of source and target locations i.e. event replication automatically resumes when a crashed location recovers. Replicated events are also guaranteed to be written *exactly-once* to a target log. This is possible because replication progress metadata are stored along with replicated events in the target log. This allows a replication target to reliably detect and ignore duplicates. Event-sourced actors, views, writers and processors can therefore rely on receiving a de-duplicated event stream. Replication endpoints ^^^^^^^^^^^^^^^^^^^^^ diff --git a/src/sphinx/reference/event-sourcing.rst b/src/sphinx/reference/event-sourcing.rst index 5002fddb..b077b803 100644 --- a/src/sphinx/reference/event-sourcing.rst +++ b/src/sphinx/reference/event-sourcing.rst @@ -1,7 +1,9 @@ +.. _ref-event-sourced-actors: + Event-sourced actors -------------------- -An introduction to event-sourced actors is already given in section :ref:`architecture` and the :ref:`user-guide`. Applications use event-sourced actors for writing events to an event log and for maintaining state on the command side (C) of a CQRS_ architecture. Event-sourced actors distinguish command processing from event processing. They must extend the EventsourcedActor_ trait and implement a :ref:`command-handler` and an :ref:`event-handler`. +An introduction to event-sourced actors is already given in section :ref:`architecture` and the :ref:`user-guide`. Applications use event-sourced actors for writing events to an event log and for maintaining in-memory write models on the command side (C) of a CQRS_ application. Event-sourced actors distinguish command processing from event processing. They must extend the EventsourcedActor_ trait and implement a :ref:`command-handler` and an :ref:`event-handler`. .. _command-handler: @@ -84,17 +86,104 @@ The value of ``sharedClockEntry`` may also be instance-specific, if required. .. includecode:: ../code/EventSourcingDoc.scala :snippet: clock-entry-instance +.. _ref-event-sourced-views: + Event-sourced views ------------------- -An introduction to event-sourced views is already given in section :ref:`architecture` and the :ref:`user-guide`. 
Applications use event-sourced views for consuming events from an event log and for maintaining state on the query side (Q) of a CQRS_ architecture. +An introduction to event-sourced views is already given in section :ref:`architecture` and the :ref:`user-guide`. Applications use event-sourced views for maintaining in-memory read models on the query side (Q) of a CQRS_ application. Like event-sourced actors, event-sourced views distinguish command processing from event processing. They must implement the EventsourcedView_ trait. ``EventsourcedView`` is a functional subset of ``EventsourcedActor`` that cannot ``persist`` events. +.. _ref-event-sourced-writers: + +Event-sourced writers +--------------------- + +An introduction to event-sourced writers is already given in section :ref:`architecture`. Applications use event-sourced writers for maintaining persistent read models on the query side (Q) of a CQRS_ application. + +Like event-sourced views, event-sourced writers can only consume events from an event log but can make incremental batch updates to external, application-defined query databases. A query database can be a relational database, a graph database or whatever is needed by an application. Concrete writers must implement the EventsourcedWriter_ trait. + +This section outlines how to update a persistent read model in Cassandra_ from events consumed by an event-sourced writer. The relevant events are: + +.. includecode:: ../../test/scala/com/rbmhtechnology/example/querydb/Emitter.scala + :snippet: events + +The persistent read model is a ``CUSTOMER`` table with the following structure:: + + id | first | last | address + ----+--------+---------+------------- + 1 | Martin | Krasser | Somewhere 1 + 2 | Volker | Stampa | Somewhere 3 + 3 | ... | ... | ... + +The read model update progress is written to a separate ``PROGRESS`` table with a single ``sequence_nr`` column:: + + id | sequence_nr + ----+------------- + 0 | 3 + +The stored sequence number is that of the last successfully processed event. An event is considered as successfully processed if its data have been written to the ``CUSTOMER`` table. Only a single row is needed in the ``PROGRESS`` table to track the update progress for the whole ``CUSTOMER`` table. + +The event-sourced ``Writer`` in the following example implements ``EventsourcedWriter[Long, Unit]`` (where ``Long`` is the type of the initial read result and ``Unit`` the type of write results). It is initialized with an ``eventLog`` from which it consumes events and a Cassandra ``Session`` for writing event processing results. + +.. includecode:: ../../test/scala/com/rbmhtechnology/example/querydb/Writer.scala + :snippet: writer + +.. hint:: + The full example source code is available `here `_. + +On a high level, the example ``Writer`` implements the following behavior: + +- During initialization (after start or restart) it asynchronously ``read``\ s the stored update progress from the ``PROGRESS`` table. The read result is passed as argument to ``readSuccess`` and incremented by ``1`` before returning it to the caller. This causes the ``Writer`` to resume event processing from that position in the event log. +- Events are processed in ``onEvent`` by translating them to Cassandra update statements which are added to an in-memory ``batch`` of type ``Vector[BoundStatement]``. The batch is written to Cassandra when Eventuate calls the ``write`` method.
+- The ``write`` method asynchronously updates the ``CUSTOMER`` table with the statements contained in ``batch`` and then updates the ``PROGRESS`` table with the sequence number of the last processed event. After having submitted the statements to Cassandra, the batch is cleared for further event processing. Event processing can run concurrently with write operations. +- A ``batch`` that has been updated while a write operation is in progress is written directly after the current write operation successfully completes. If no write operation is in progress, a change to ``batch`` is written immediately. This keeps read model update delays at a minimum and increases batch sizes under increasing load. Batch sizes can be limited with ``replayChunkSizeMax``. + +If a ``write`` (or ``read``) operation fails, the writer is restarted, by default, and resumes event processing from the last stored sequence number + ``1``. This behavior can be changed by overriding ``writeFailure`` (or ``readFailure``) from ``EventsourcedWriter``. + +.. note:: + The example does not use Cassandra ``BatchStatement``\ s for reasons explained in `this article `_. Atomic writes are not needed because database updates in this example are idempotent and can be re-tried in failure cases. Failure cases where idempotency is relevant are partial updates to the ``CUSTOMER`` table or a failed write to the ``PROGRESS`` table. ``BatchStatement``\ s should only be used when database updates are not idempotent and atomicity is required on database level. + +.. _stateful-writers: + +Stateful writers +~~~~~~~~~~~~~~~~ + +The above ``Writer`` implements a stateless writer. Although it accumulates batches while a write operation is in progress, it cannot recover permanent in-memory state from the event log, because event processing only starts from the last stored sequence number. If a writer needs to be stateful, it must return ``None`` from ``readSuccess``. In this case, event replay either starts from scratch or from a previously stored snapshot. A stateful writer should still write the update progress to the ``PROGRESS`` table but exclude events with a sequence number less than or equal to the stored sequence number from contributing to the update ``batch``. + +.. _ref-event-sourced-processors: + +Event-sourced processors +------------------------ + +An introduction to event-sourced processors is already given in section :ref:`architecture`. Applications use event-sourced processors to consume events from a source event log, process these events and write the processed events to a target event log. With processors, event logs can be connected to event stream processing pipelines and graphs. + +Event-sourced processors are a specialization of :ref:`event-sourced-writers` where the *external database* is a target event log. Concrete stateless processors must implement the StatelessProcessor_ trait, stateful processors the EventsourcedProcessor_ trait (see also :ref:`stateful-writers`). + +The following example ``Processor`` is an implementation of ``StatelessProcessor``. In addition to providing a source ``eventLog``, concrete processors must also provide a ``targetEventLog``: + +.. includecode:: ../code/EventSourcingDoc.scala + :snippet: processor + +The event handler implemented by a processor is ``processEvent``. The type of the handler is defined as: + +..
includecode:: ../../main/scala/com/rbmhtechnology/eventuate/EventsourcedProcessor.scala + :snippet: process + +Processed events to be written to the target event log are returned by the handler as ``Seq[Any]``. With this handler signature, events from the source log can be + +- excluded from being written to the target log by returning an empty ``Seq`` +- transformed one-to-one by returning a ``Seq`` of size 1 or even +- transformed and split by returning a ``Seq`` of size greater than ``1`` + +.. note:: + ``StatelessProcessor`` and ``EventsourcedProcessor`` internally ensure that writing to the target event log is idempotent. Applications don’t need to take extra care about idempotency. + State recovery -------------- -When an event-sourced actor or view is started or re-started, events are replayed to its ``onEvent`` handler so that internal state can be recovered\ [#]_. Event replay is initiated internally by sending a ``Replay`` message to the ``eventLog`` actor: +When an event-sourced actor or view is started or re-started, events are replayed to its ``onEvent`` handler so that internal state can be recovered\ [#]_. This is also the case for stateful event-sourced writers and processors. Event replay is initiated internally by sending a ``Replay`` message to the ``eventLog`` actor: .. includecode:: ../../main/scala/com/rbmhtechnology/eventuate/EventsourcedView.scala :snippet: replay @@ -103,21 +192,21 @@ The ``replay`` method is defined by EventsourcedView_ and automatically called w Sending a ``Replay`` message automatically registers the sending actor at its event log, so that newly written events can be immediately routed to that actor. If the actor is stopped it is automatically de-registered. -While an event-sourced actor or view is recovering i.e. replaying messages, its ``recovering`` method returns ``true``. If recovery successfully completes, its empty ``onRecovered()`` method is called which can be overridden by applications. +While an event-sourced actor, view, writer or processor is recovering i.e. replaying messages, its ``recovering`` method returns ``true``. If recovery successfully completes, its empty ``onRecovered()`` method is called which can be overridden by applications. During recovery, new commands are stashed_ and dispatched to ``onCommand`` after recovery successfully completed. This ensures that new commands never see partially recovered state. Backpressure ~~~~~~~~~~~~ -If event handling is slower than event replay, events are buffered in the mailboxes of event-sourced actors and views. In order to avoid out-of-memory errors, Eventuate has a built-in backpressure mechanism for event replay. +If event handling is slower than event replay, events are buffered in the mailboxes of event-sourced actors, views, writers and processors. In order to avoid out-of-memory errors, Eventuate has a built-in backpressure mechanism for event replay. After a configurable number of events, replay is suspended for giving event handlers time to catch up. When they are done, replay is automatically resumed. The default number of events to be replayed before replay is suspended can be configured with: .. includecode:: ../conf/common.conf :snippet: chunk-size-max -Concrete event-sourced actors and views can override the configured default value by overriding ``replayChunkSizeMax``: +Concrete event-sourced actors, views, writers and processors can override the configured default value by overriding ``replayChunkSizeMax``: .. 
includecode:: ../code/EventSourcingDoc.scala :snippet: chunk-size-max @@ -127,7 +216,7 @@ Concrete event-sourced actors and views can override the configured default valu Snapshots --------- -Recovery times increase with the number of events that are replayed to event-sourced actors or views. They can be decreased by starting event replay from a previously saved snapshot of internal state rather than replaying events from scratch. Event-sourced actors and views can save snapshots by calling ``save`` within their command handler: +Recovery times increase with the number of events that are replayed to event-sourced actors, views, stateful writers or stateful processors. They can be decreased by starting event replay from a previously saved snapshot of internal state rather than replaying events from scratch. Event-sourced actors, views, stateful writers and stateful processors can save snapshots by calling ``save`` within their command handler: .. includecode:: ../code/EventSourcingDoc.scala :snippet: snapshot-save @@ -136,7 +225,7 @@ Snapshots are saved asynchronously. On completion, a user-defined handler of typ An event-sourced actor that is :ref:`tracking-conflicting-versions` of application state can also save ``ConcurrentVersions[A, B]`` instances directly. One can even configure custom serializers for type parameter ``A`` as explained in section :ref:`snapshot-serialization`. -During recovery, the latest snapshot saved by an event-sourced actor or view is loaded and can be handled with the ``onSnapshot`` handler. This handler should initialize internal actor state from the loaded snapshot: +During recovery, the latest snapshot saved by an event-sourced actor, view, stateful writer or stateful processor is loaded and can be handled with the ``onSnapshot`` handler. This handler should initialize internal actor state from the loaded snapshot: .. includecode:: ../code/EventSourcingDoc.scala :snippet: snapshot-load @@ -156,7 +245,7 @@ Snapshots are currently stored in a directory that can be configured with .. includecode:: ../conf/snapshot.conf :snippet: snapshot-dir -in ``application.conf``. The maximum number of stored snapshots per event-sourced actor or view can be configured with +in ``application.conf``. The maximum number of stored snapshots per event-sourced actor, view, writer or processor can be configured with .. includecode:: ../conf/snapshot.conf :snippet: snapshot-num @@ -169,10 +258,10 @@ If this number is exceeded, older snapshots are automatically deleted. Event routing ------------- -An event that is emitted by an event-sourced actor can be routed to other event-sourced actors and views if they share an :ref:`event-log`\ [#]_ . The default event routing rules are: +An event that is emitted by an event-sourced actor or processor can be routed to other event-sourced actors, views, writers and processors if they share an :ref:`event-log`\ [#]_ . The default event routing rules are: -- If an event-sourced actor or view has an undefined ``aggregateId``, all events are routed to it. It may choose to handle only a subset of them though. -- If an event-sourced actor or view has a defined ``aggregateId``, only events emitted by event-sourced actors with the same ``aggregateId`` are routed to it. +- If an event-sourced actor, view, writer or processor has an undefined ``aggregateId``, all events are routed to it. It may choose to handle only a subset of them though. 
+- If an event-sourced actor, view, writer or processor has a defined ``aggregateId``, only events emitted by event-sourced actors or processors with the same ``aggregateId`` are routed to it. Routing destinations are defined during emission of an event and are persisted together with the event\ [#]_. This makes routing decisions repeatable during event replay and allows for routing rule changes without affecting past routing decisions. Applications can define additional routing destinations with the ``customDestinationAggregateIds`` parameter of ``persist``: @@ -243,7 +332,7 @@ Custom serializers can also be configured for the type parameter ``A`` of ``MVRe .. [#] The ``customDestinationAggregateIds`` parameter is described in section :ref:`event-routing`. .. [#] Writes from different event-sourced actors that have ``stateSync`` set to ``true`` are still batched, but not the writes from a single event-sourced actor. .. [#] Event replay can optionally start from :ref:`snapshots` of actor state. -.. [#] :ref:`processors` can additionally route events between event logs. +.. [#] Event-sourced processors can additionally route events between event logs. .. [#] The routing destinations of a DurableEvent_ can be obtained with its ``destinationAggregateIds`` method. .. _CQRS: http://martinfowler.com/bliki/CQRS.html @@ -252,10 +341,14 @@ Custom serializers can also be configured for the type parameter ``A`` of ``MVRe .. _Serializer: http://doc.akka.io/api/akka/2.3.9/#akka.serialization.Serializer .. _Protocol Buffers: https://developers.google.com/protocol-buffers/ .. _plausible clocks: https://github.com/RBMHTechnology/eventuate/issues/68 +.. _Cassandra: http://cassandra.apache.org/ .. _ConfirmedDelivery: ../latest/api/index.html#com.rbmhtechnology.eventuate.ConfirmedDelivery .. _DurableEvent: ../latest/api/index.html#com.rbmhtechnology.eventuate.DurableEvent .. _DurableEventSerializer: ../latest/api/index.html#com.rbmhtechnology.eventuate.serializer.DurableEventSerializer .. _EventsourcedActor: ../latest/api/index.html#com.rbmhtechnology.eventuate.EventsourcedActor .. _EventsourcedView: ../latest/api/index.html#com.rbmhtechnology.eventuate.EventsourcedView +.. _EventsourcedWriter: ../latest/api/index.html#com.rbmhtechnology.eventuate.EventsourcedWriter +.. _EventsourcedProcessor: ../latest/api/index.html#com.rbmhtechnology.eventuate.EventsourcedProcessor +.. _StatelessProcessor: ../latest/api/index.html#com.rbmhtechnology.eventuate.StatelessProcessor .. _ReplicationFilter: ../latest/api/index.html#com.rbmhtechnology.eventuate.ReplicationFilter diff --git a/src/sphinx/user-guide.rst b/src/sphinx/user-guide.rst index 131f7f51..75c11e22 100644 --- a/src/sphinx/user-guide.rst +++ b/src/sphinx/user-guide.rst @@ -17,6 +17,8 @@ This is a brief user guide to Eventuate. It is recommended to read sections :ref The user guide only scratches the surface of Eventuate. You can find further details in the :ref:`reference`. +.. _guide-event-sourced-actors: + Event-sourced actors -------------------- @@ -215,6 +217,8 @@ New operation-based CRDTs and their corresponding services can be developed with .. _this article: https://krasserm.github.io/2015/02/17/Implementing-operation-based-CRDTs/ +.. _guide-event-sourced-views: + Event-sourced views ------------------- @@ -227,6 +231,9 @@ Event-sourced views handle events in the same way as event-sourced actors by imp ``ExampleView`` implements the mandatory global unique ``id`` but doesn’t define an ``aggregateId``. 
A view that doesn’t define an ``aggregateId`` can consume events from all event-sourced actors on the same event log. If it defines an ``aggregateId`` it can only consume events from event-sourced actors with the same ``aggregateId`` (assuming the default :ref:`event-routing` rules). +.. hint:: + While event-sourced views maintain view state in-memory, :ref:`ref-event-sourced-writers` can be used to persist view state to external databases. + .. _conditional-commands: Conditional commands diff --git a/src/test/java/com/rbmhtechnology/example/japi/Order.java b/src/test/java/com/rbmhtechnology/example/ordermgnt/japi/Order.java similarity index 97% rename from src/test/java/com/rbmhtechnology/example/japi/Order.java rename to src/test/java/com/rbmhtechnology/example/ordermgnt/japi/Order.java index 726c66cc..03e385a0 100644 --- a/src/test/java/com/rbmhtechnology/example/japi/Order.java +++ b/src/test/java/com/rbmhtechnology/example/ordermgnt/japi/Order.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package com.rbmhtechnology.example.japi; +package com.rbmhtechnology.example.ordermgnt.japi; import fj.data.List; diff --git a/src/test/java/com/rbmhtechnology/example/japi/OrderActor.java b/src/test/java/com/rbmhtechnology/example/ordermgnt/japi/OrderActor.java similarity index 99% rename from src/test/java/com/rbmhtechnology/example/japi/OrderActor.java rename to src/test/java/com/rbmhtechnology/example/ordermgnt/japi/OrderActor.java index af5602e1..ed01e6f3 100644 --- a/src/test/java/com/rbmhtechnology/example/japi/OrderActor.java +++ b/src/test/java/com/rbmhtechnology/example/ordermgnt/japi/OrderActor.java @@ -14,9 +14,8 @@ * limitations under the License. */ -package com.rbmhtechnology.example.japi; +package com.rbmhtechnology.example.ordermgnt.japi; -import java.io.Serializable; import java.util.HashMap; import java.util.List; import java.util.Map; diff --git a/src/test/java/com/rbmhtechnology/example/japi/OrderExample.java b/src/test/java/com/rbmhtechnology/example/ordermgnt/japi/OrderExample.java similarity index 96% rename from src/test/java/com/rbmhtechnology/example/japi/OrderExample.java rename to src/test/java/com/rbmhtechnology/example/ordermgnt/japi/OrderExample.java index c446ae21..2e24fc90 100644 --- a/src/test/java/com/rbmhtechnology/example/japi/OrderExample.java +++ b/src/test/java/com/rbmhtechnology/example/ordermgnt/japi/OrderExample.java @@ -14,7 +14,7 @@ * limitations under the License. 
*/ -package com.rbmhtechnology.example.japi; +package com.rbmhtechnology.example.ordermgnt.japi; import java.io.BufferedReader; import java.io.IOException; @@ -33,8 +33,8 @@ import com.rbmhtechnology.eventuate.log.leveldb.LeveldbEventLog; import com.typesafe.config.ConfigFactory; -import static com.rbmhtechnology.example.japi.OrderActor.*; -import static com.rbmhtechnology.example.japi.OrderView.*; +import static com.rbmhtechnology.example.ordermgnt.japi.OrderActor.*; +import static com.rbmhtechnology.example.ordermgnt.japi.OrderView.*; public class OrderExample extends AbstractActor { private static Pattern pExit = Pattern.compile("^exit\\s*"); @@ -82,7 +82,7 @@ public OrderExample(ActorRef manager, ActorRef view) { .match(CommandFailure.class, r -> r.getCause() instanceof ConflictDetectedException, r -> { ConflictDetectedException cause = (ConflictDetectedException) r.getCause(); System.out.println(cause.getMessage() + ", select one of the following versions to resolve conflict"); - OrderActor.printOrder(cause.getVersions()); + printOrder(cause.getVersions()); prompt(); }) .match(CommandFailure.class, r -> { diff --git a/src/test/java/com/rbmhtechnology/example/japi/OrderId.java b/src/test/java/com/rbmhtechnology/example/ordermgnt/japi/OrderId.java similarity index 94% rename from src/test/java/com/rbmhtechnology/example/japi/OrderId.java rename to src/test/java/com/rbmhtechnology/example/ordermgnt/japi/OrderId.java index f12ca77a..cf16ddc7 100644 --- a/src/test/java/com/rbmhtechnology/example/japi/OrderId.java +++ b/src/test/java/com/rbmhtechnology/example/ordermgnt/japi/OrderId.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package com.rbmhtechnology.example.japi; +package com.rbmhtechnology.example.ordermgnt.japi; import java.io.Serializable; diff --git a/src/test/java/com/rbmhtechnology/example/japi/OrderManager.java b/src/test/java/com/rbmhtechnology/example/ordermgnt/japi/OrderManager.java similarity index 97% rename from src/test/java/com/rbmhtechnology/example/japi/OrderManager.java rename to src/test/java/com/rbmhtechnology/example/ordermgnt/japi/OrderManager.java index d3db26ad..e2a28ea3 100644 --- a/src/test/java/com/rbmhtechnology/example/japi/OrderManager.java +++ b/src/test/java/com/rbmhtechnology/example/ordermgnt/japi/OrderManager.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package com.rbmhtechnology.example.japi; +package com.rbmhtechnology.example.ordermgnt.japi; import java.util.HashMap; import java.util.Map; @@ -36,7 +36,7 @@ import static java.util.stream.Collectors.reducing; import static com.rbmhtechnology.eventuate.VersionedAggregate.*; -import static com.rbmhtechnology.example.japi.OrderActor.*; +import static com.rbmhtechnology.example.ordermgnt.japi.OrderActor.*; public class OrderManager extends AbstractEventsourcedView { diff --git a/src/test/java/com/rbmhtechnology/example/japi/OrderSerializer.java b/src/test/java/com/rbmhtechnology/example/ordermgnt/japi/OrderSerializer.java similarity index 97% rename from src/test/java/com/rbmhtechnology/example/japi/OrderSerializer.java rename to src/test/java/com/rbmhtechnology/example/ordermgnt/japi/OrderSerializer.java index a22fe51c..88930944 100644 --- a/src/test/java/com/rbmhtechnology/example/japi/OrderSerializer.java +++ b/src/test/java/com/rbmhtechnology/example/ordermgnt/japi/OrderSerializer.java @@ -14,7 +14,7 @@ * limitations under the License. 
*/ -package com.rbmhtechnology.example.japi; +package com.rbmhtechnology.example.ordermgnt.japi; import akka.actor.ExtendedActorSystem; import akka.serialization.*; diff --git a/src/test/java/com/rbmhtechnology/example/japi/OrderView.java b/src/test/java/com/rbmhtechnology/example/ordermgnt/japi/OrderView.java similarity index 95% rename from src/test/java/com/rbmhtechnology/example/japi/OrderView.java rename to src/test/java/com/rbmhtechnology/example/ordermgnt/japi/OrderView.java index 857265f0..c07b282e 100644 --- a/src/test/java/com/rbmhtechnology/example/japi/OrderView.java +++ b/src/test/java/com/rbmhtechnology/example/ordermgnt/japi/OrderView.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package com.rbmhtechnology.example.japi; +package com.rbmhtechnology.example.ordermgnt.japi; import java.util.HashMap; import java.util.Map; @@ -23,7 +23,7 @@ import akka.japi.pf.ReceiveBuilder; import com.rbmhtechnology.eventuate.AbstractEventsourcedView; -import com.rbmhtechnology.example.japi.OrderActor.*; +import com.rbmhtechnology.example.ordermgnt.japi.OrderActor.*; public class OrderView extends AbstractEventsourcedView { private Map updateCounts; diff --git a/src/test/resources/location-A.conf b/src/test/resources/location-A.conf index 3c425f02..b94c07a8 100644 --- a/src/test/resources/location-A.conf +++ b/src/test/resources/location-A.conf @@ -1,10 +1,10 @@ akka { actor { serializers { - example-order = "com.rbmhtechnology.example.japi.OrderSerializer" + example-order = "com.rbmhtechnology.example.ordermgnt.japi.OrderSerializer" } serialization-bindings { - "com.rbmhtechnology.example.japi.Order" = example-order + "com.rbmhtechnology.example.ordermgnt.japi.Order" = example-order } provider = "akka.remote.RemoteActorRefProvider" } diff --git a/src/test/resources/location-B.conf b/src/test/resources/location-B.conf index 6c282dbe..5ec433d5 100644 --- a/src/test/resources/location-B.conf +++ b/src/test/resources/location-B.conf @@ -1,10 +1,10 @@ akka { actor { serializers { - example-order = "com.rbmhtechnology.example.japi.OrderSerializer" + example-order = "com.rbmhtechnology.example.ordermgnt.japi.OrderSerializer" } serialization-bindings { - "com.rbmhtechnology.example.japi.Order" = example-order + "com.rbmhtechnology.example.ordermgnt.japi.Order" = example-order } provider = "akka.remote.RemoteActorRefProvider" } diff --git a/src/test/resources/location-C.conf b/src/test/resources/location-C.conf index 953a3ee2..1e01d6c9 100644 --- a/src/test/resources/location-C.conf +++ b/src/test/resources/location-C.conf @@ -1,10 +1,10 @@ akka { actor { serializers { - example-order = "com.rbmhtechnology.example.japi.OrderSerializer" + example-order = "com.rbmhtechnology.example.ordermgnt.japi.OrderSerializer" } serialization-bindings { - "com.rbmhtechnology.example.japi.Order" = example-order + "com.rbmhtechnology.example.ordermgnt.japi.Order" = example-order } provider = "akka.remote.RemoteActorRefProvider" } diff --git a/src/test/resources/location-D.conf b/src/test/resources/location-D.conf index 66454afc..73bbe9c3 100644 --- a/src/test/resources/location-D.conf +++ b/src/test/resources/location-D.conf @@ -1,10 +1,10 @@ akka { actor { serializers { - example-order = "com.rbmhtechnology.example.japi.OrderSerializer" + example-order = "com.rbmhtechnology.example.ordermgnt.japi.OrderSerializer" } serialization-bindings { - "com.rbmhtechnology.example.japi.Order" = example-order + "com.rbmhtechnology.example.ordermgnt.japi.Order" = example-order } provider = 
"akka.remote.RemoteActorRefProvider" } diff --git a/src/test/resources/location-E.conf b/src/test/resources/location-E.conf index e4931a50..46260e3a 100644 --- a/src/test/resources/location-E.conf +++ b/src/test/resources/location-E.conf @@ -1,10 +1,10 @@ akka { actor { serializers { - example-order = "com.rbmhtechnology.example.japi.OrderSerializer" + example-order = "com.rbmhtechnology.example.ordermgnt.japi.OrderSerializer" } serialization-bindings { - "com.rbmhtechnology.example.japi.Order" = example-order + "com.rbmhtechnology.example.ordermgnt.japi.Order" = example-order } provider = "akka.remote.RemoteActorRefProvider" } diff --git a/src/test/resources/location-F.conf b/src/test/resources/location-F.conf index 1b7683e0..5856be70 100644 --- a/src/test/resources/location-F.conf +++ b/src/test/resources/location-F.conf @@ -1,10 +1,10 @@ akka { actor { serializers { - example-order = "com.rbmhtechnology.example.japi.OrderSerializer" + example-order = "com.rbmhtechnology.example.ordermgnt.japi.OrderSerializer" } serialization-bindings { - "com.rbmhtechnology.example.japi.Order" = example-order + "com.rbmhtechnology.example.ordermgnt.japi.Order" = example-order } provider = "akka.remote.RemoteActorRefProvider" } diff --git a/src/test/scala/com/rbmhtechnology/eventuate/EventsourcedProcessorSpec.scala b/src/test/scala/com/rbmhtechnology/eventuate/EventsourcedProcessorSpec.scala new file mode 100644 index 00000000..2ec66336 --- /dev/null +++ b/src/test/scala/com/rbmhtechnology/eventuate/EventsourcedProcessorSpec.scala @@ -0,0 +1,241 @@ +/* + * Copyright (C) 2015 Red Bull Media House GmbH - all rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.rbmhtechnology.eventuate + +import akka.actor._ +import akka.testkit._ + +import com.rbmhtechnology.eventuate.EventsourcedViewSpec._ + +import org.scalatest._ + +import scala.collection.immutable.Seq + +object EventsourcedProcessorSpec { + import DurableEvent._ + + val eventA = event("a", 1) + val eventB = event("b", 2) + val eventC = event("c", 3) + + val eventA1 = update(eventA.copy("a-1")) + val eventA2 = update(eventA.copy("a-2")) + val eventB1 = update(eventB.copy("b-1")) + val eventB2 = update(eventB.copy("b-2")) + val eventC1 = update(eventC.copy("c-1")) + val eventC2 = update(eventC.copy("c-2")) + + class TestProcessor(srcProbe: ActorRef, trgProbe: ActorRef, appProbe: ActorRef, sce: Boolean) extends EventsourcedProcessor { + override val id = emitterIdB + override val eventLog = srcProbe + override val targetEventLog = trgProbe + override val replayChunkSizeMax = 2 + override def sharedClockEntry = sce + + override val onCommand: Receive = { + case cmd => appProbe ! cmd + } + + override val processEvent: Process = { + case evt: String => Seq(s"${evt}-1", s"${evt}-2") + } + + override def writeSuccess(result: Long): Unit = { + appProbe ! result + super.writeSuccess(result) + } + + override def writeFailure(cause: Throwable): Unit = { + appProbe ! 
cause + super.writeFailure(cause) + } + } + + def update(event: DurableEvent): DurableEvent = + event.copy(emitterId = emitterIdB, processId = UndefinedLogId, localLogId = UndefinedLogId, localSequenceNr = UndefinedSequenceNr) +} + +class EventsourcedProcessorSpec extends TestKit(ActorSystem("test")) with WordSpecLike with Matchers with BeforeAndAfterAll with BeforeAndAfterEach { + import EventsourcedProcessorSpec._ + import EventsourcingProtocol._ + import ReplicationProtocol._ + + var instanceId: Int = _ + var srcProbe: TestProbe = _ + var trgProbe: TestProbe = _ + var appProbe: TestProbe = _ + + override def beforeEach(): Unit = { + instanceId = EventsourcedView.instanceIdCounter.get + srcProbe = TestProbe() + trgProbe = TestProbe() + appProbe = TestProbe() + } + + override def afterAll(): Unit = + TestKit.shutdownActorSystem(system) + + def unrecoveredProcessor(stateful: Boolean = true, sharedClockEntry: Boolean = true): ActorRef = + if (stateful) system.actorOf(Props(new TestProcessor(srcProbe.ref, trgProbe.ref, appProbe.ref, sharedClockEntry))) + else system.actorOf(Props(new TestProcessor(srcProbe.ref, trgProbe.ref, appProbe.ref, sharedClockEntry) with StatelessProcessor)) + + def recoveredProcessor(stateful: Boolean = true, sharedClockEntry: Boolean = true): ActorRef = + processRecover(unrecoveredProcessor(stateful, sharedClockEntry), stateful) + + def processRecover(actor: ActorRef, stateful: Boolean = true): ActorRef = { + processRead(0) + if (stateful) { + processLoad(actor) + processReplay(actor, 1) + } else { + processReplay(actor, 1) + } + actor + } + + def processLoad(actor: ActorRef, instanceId: Int = instanceId): Unit = { + srcProbe.expectMsg(LoadSnapshot(emitterIdB, actor, instanceId)) + actor ! LoadSnapshotSuccess(None, instanceId) + } + + def processReplay(actor: ActorRef, fromSequenceNr: Long, instanceId: Int = instanceId): Unit = + processReplay(actor, fromSequenceNr, fromSequenceNr - 1L, instanceId) + + def processReplay(actor: ActorRef, fromSequenceNr: Long, storedSequenceNr: Long, instanceId: Int): Unit = { + srcProbe.expectMsg(Replay(fromSequenceNr, 2, actor, instanceId)) + actor ! ReplaySuccess(instanceId) + appProbe.expectMsg(storedSequenceNr) + } + + def processRead(progress: Long, success: Boolean = true): Unit = { + trgProbe.expectMsg(GetReplicationProgress(emitterIdB)) + if (success) processResult(GetReplicationProgressSuccess(emitterIdB, progress, VectorTime())) + else processResult(GetReplicationProgressFailure(boom)) + } + + def processWrite(progress: Long, events: Seq[DurableEvent], success: Boolean = true): Unit = { + trgProbe.expectMsg(ReplicationWrite(events, emitterIdB, progress, VectorTime())) + if (success) { + processResult(ReplicationWriteSuccess(events.size, progress, VectorTime())) + appProbe.expectMsg(progress) + } else { + processResult(ReplicationWriteFailure(boom)) + appProbe.expectMsg(boom) + } + } + + def processResult(result: Any): Unit = + trgProbe.sender() ! Status.Success(result) + + "A stateful EventsourcedProcessor" must { + "recover" in { + recoveredProcessor() + } + "restart on failed read by default" in { + val actor = unrecoveredProcessor() + processRead(0, success = false) + processRead(0) + processLoad(actor, instanceId + 1) + processReplay(actor, 1, instanceId + 1) + } + "recover on failed write by default" in { + val actor = unrecoveredProcessor() + processRead(0) + processLoad(actor) + processReplay(actor, 1) + actor ! 
Written(eventA) + processWrite(1, Seq(eventA1, eventA2), success = false) + processRead(0) + processLoad(actor, instanceId + 1) + processReplay(actor, 1, instanceId + 1) + actor ! Written(eventA) + processWrite(1, Seq(eventA1, eventA2)) + } + "write to target log during and after recovery" in { + val actor = unrecoveredProcessor() + processRead(0) + processLoad(actor) + srcProbe.expectMsg(Replay(1, 2, actor, instanceId)) + actor ! Replaying(eventA, instanceId) + actor ! Replaying(eventB, instanceId) + actor.tell(ReplaySuspended(instanceId), srcProbe.ref) + processWrite(2, Seq(eventA1, eventA2, eventB1, eventB2)) + srcProbe.expectMsg(ReplayNext(2, instanceId)) + actor ! Replaying(eventC, instanceId) + actor ! ReplaySuccess(instanceId) + processWrite(3, Seq(eventC1, eventC2)) + } + "write to target log and process concurrently" in { + val actor = recoveredProcessor() + actor ! Written(eventA) + actor ! Written(eventB) + actor ! Written(eventC) + processWrite(1, Seq(eventA1, eventA2)) + processWrite(3, Seq(eventB1, eventB2, eventC1, eventC2)) + } + "exclude events from write with sequenceNr <= storedSequenceNr" in { + val actor = unrecoveredProcessor() + processRead(3) + processLoad(actor) + processReplay(actor, 1, 3, instanceId) + actor ! Written(eventA) + appProbe.expectMsg(3) + } + "include events to write with sequenceNr > storedSequenceNr" in { + val actor = unrecoveredProcessor() + processRead(2) + processLoad(actor) + processReplay(actor, 1, 2, instanceId) + actor ! Written(eventA) + appProbe.expectMsg(2) + actor ! Written(eventB) + appProbe.expectMsg(2) + actor ! Written(eventC) + processWrite(3, Seq(eventC1, eventC2)) + } + } + + "A stateful EventsourcedProcessor" when { + "using its own vector clock entry" must { + "update the process id and vector time of emitted events" in { + val actor = recoveredProcessor(sharedClockEntry = false) + actor ! Written(eventA) + processWrite(1, Seq( + eventA1.copy(processId = emitterIdB, vectorTimestamp = eventA.vectorTimestamp.merge(VectorTime(emitterIdB -> 2L))), + eventA2.copy(processId = emitterIdB, vectorTimestamp = eventA.vectorTimestamp.merge(VectorTime(emitterIdB -> 3L))))) + } + } + } + + "A stateless EventsourcedProcessor" must { + "resume" in { + recoveredProcessor(stateful = false) + } + "resume on failed read by default" in { + val actor = unrecoveredProcessor(stateful = false) + processRead(3) + processReplay(actor, 4) + } + "resume on failed write by default" in { + val actor = recoveredProcessor(stateful = false) + actor ! Written(eventA) + processWrite(1, Seq(eventA1, eventA2), success = false) + processRead(3) + processReplay(actor, 4, instanceId + 1) + } + } +} \ No newline at end of file diff --git a/src/test/scala/com/rbmhtechnology/eventuate/EventsourcedWriterSpec.scala b/src/test/scala/com/rbmhtechnology/eventuate/EventsourcedWriterSpec.scala new file mode 100644 index 00000000..f4ecbf75 --- /dev/null +++ b/src/test/scala/com/rbmhtechnology/eventuate/EventsourcedWriterSpec.scala @@ -0,0 +1,261 @@ +/* + * Copyright (C) 2015 Red Bull Media House GmbH - all rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.rbmhtechnology.eventuate + +import akka.actor._ +import akka.pattern.ask +import akka.testkit._ +import akka.util.Timeout + +import com.rbmhtechnology.eventuate.EventsourcedViewSpec._ + +import org.scalatest._ + +import scala.concurrent._ +import scala.concurrent.duration._ +import scala.util._ + +object EventsourcedWriterSpec { + class TestEventsourcedWriter(logProbe: ActorRef, appProbe: ActorRef, rwProbe: ActorRef, readSuccessResult: Option[Long]) extends EventsourcedWriter[String, String] { + implicit val timeout = Timeout(3.seconds) + + override val id = emitterIdB + override val eventLog = logProbe + override val replayChunkSizeMax: Int = 2 + + override def onCommand: Receive = { + case cmd => appProbe ! cmd + } + + override val onEvent: Receive = { + case evt: String => + appProbe ! ((evt, lastSequenceNr)) + } + + override def read(): Future[String] = + rwProbe.ask("r").mapTo[String] + + override def write(): Future[String] = { + rwProbe.ask("w").mapTo[String] + } + + override def readSuccess(result: String): Option[Long] = { + appProbe ! result + readSuccessResult + } + + override def writeSuccess(result: String): Unit = + appProbe ! result + + override def readFailure(cause: Throwable): Unit = { + appProbe ! cause + super.readFailure(cause) + } + + override def writeFailure(cause: Throwable): Unit = { + appProbe ! cause + super.writeFailure(cause) + } + } +} + +class EventsourcedWriterSpec extends TestKit(ActorSystem("test")) with WordSpecLike with Matchers with BeforeAndAfterAll with BeforeAndAfterEach { + import EventsourcedWriterSpec._ + import EventsourcingProtocol._ + + var instanceId: Int = _ + var logProbe: TestProbe = _ + var appProbe: TestProbe = _ + var rwProbe: TestProbe = _ + + override def beforeEach(): Unit = { + instanceId = EventsourcedView.instanceIdCounter.get + logProbe = TestProbe() + appProbe = TestProbe() + rwProbe = TestProbe() + } + + override def afterAll(): Unit = + TestKit.shutdownActorSystem(system) + + def unrecoveredEventsourcedWriter(readSuccessResult: Option[Long] = None): ActorRef = + system.actorOf(Props(new TestEventsourcedWriter(logProbe.ref, appProbe.ref, rwProbe.ref, readSuccessResult))) + + def recoveredEventsourcedWriter(readSuccessResult: Option[Long] = None): ActorRef = + processRecover(unrecoveredEventsourcedWriter(readSuccessResult), readSuccessResult) + + def processRecover(actor: ActorRef, readSuccessResult: Option[Long] = None): ActorRef = { + processRead(Success("rs")) + readSuccessResult match { + case Some(snr) => + processReplay(actor, snr) + case None => + processLoad(actor) + processReplay(actor, 1) + } + processWrite(Success("ws")) + actor + } + + def processLoad(actor: ActorRef, instanceId: Int = instanceId): Unit = { + logProbe.expectMsg(LoadSnapshot(emitterIdB, actor, instanceId)) + actor ! LoadSnapshotSuccess(None, instanceId) + } + + def processReplay(actor: ActorRef, fromSequenceNr: Long): Unit = { + logProbe.expectMsg(Replay(fromSequenceNr, 2, actor, instanceId)) + actor ! ReplaySuccess(instanceId) + } + + def processRead(result: Try[String]): Unit = { + rwProbe.expectMsg("r") + processResult(result) + } + + def processWrite(result: Try[String]): Unit = { + rwProbe.expectMsg("w") + processResult(result) + } + + def processResult(result: Try[String]): Unit = { + result match { + case Success(s) => + rwProbe.sender() ! 
Status.Success(s) + appProbe.expectMsg(s) + case Failure(e) => + rwProbe.sender() ! Status.Failure(e) + appProbe.expectMsg(e) + } + } + + "An EventsourcedWriter" when { + "recovering" must { + "recover after an initial read with an undefined return value" in { + recoveredEventsourcedWriter(None) + } + "restart on failed read by default" in { + val actor = unrecoveredEventsourcedWriter() + processRead(Failure(boom)) + processRead(Success("rs")) + processLoad(actor, instanceId + 1) + logProbe.expectMsg(Replay(1, 2, actor, instanceId + 1)) + actor ! ReplaySuccess(instanceId + 1) + processWrite(Success("ws")) + } + "restart on failed write by default" in { + val actor = unrecoveredEventsourcedWriter() + processRead(Success("rs")) + processLoad(actor) + logProbe.expectMsg(Replay(1, 2, actor, instanceId)) + actor ! Replaying(event("a", 1), instanceId) + actor ! ReplaySuccess(instanceId) + appProbe.expectMsg(("a", 1)) + processWrite(Failure(boom)) + processRead(Success("rs")) + processLoad(actor, instanceId + 1) + logProbe.expectMsg(Replay(1, 2, actor, instanceId + 1)) + actor ! Replaying(event("a", 1), instanceId + 1) + actor ! ReplaySuccess(instanceId + 1) + appProbe.expectMsg(("a", 1)) + processWrite(Success("ws")) + } + "trigger writes when recovery is suspended and completed" in { + val actor = unrecoveredEventsourcedWriter() + processRead(Success("rs")) + processLoad(actor) + logProbe.expectMsg(Replay(1, 2, actor, instanceId)) + actor ! Replaying(event("a", 1), instanceId) + actor ! Replaying(event("b", 2), instanceId) + actor.tell(ReplaySuspended(instanceId), logProbe.ref) + appProbe.expectMsg(("a", 1)) + appProbe.expectMsg(("b", 2)) + processWrite(Success("ws")) + logProbe.expectMsg(ReplayNext(2, instanceId)) + actor ! Replaying(event("c", 3), instanceId) + actor ! ReplaySuccess(instanceId) + appProbe.expectMsg(("c", 3)) + processWrite(Success("ws")) + } + "stash commands while read is in progress" in { + val promise = Promise[String]() + val actor = unrecoveredEventsourcedWriter() + actor ! "cmd" + processRead(Success("rs")) + processLoad(actor) + processReplay(actor, 1) + appProbe.expectMsg("cmd") + processWrite(Success("ws")) + } + "stash commands while write is in progress after suspended replay" in { + val actor = unrecoveredEventsourcedWriter() + processRead(Success("rs")) + processLoad(actor) + logProbe.expectMsg(Replay(1, 2, actor, instanceId)) + actor ! Replaying(event("a", 1), instanceId) + actor ! Replaying(event("b", 2), instanceId) + actor.tell(ReplaySuspended(instanceId), logProbe.ref) + actor ! "cmd" + appProbe.expectMsg(("a", 1)) + appProbe.expectMsg(("b", 2)) + processWrite(Success("ws")) + logProbe.expectMsg(ReplayNext(2, instanceId)) + actor ! Replaying(event("c", 3), instanceId) + actor ! ReplaySuccess(instanceId) + appProbe.expectMsg(("c", 3)) + appProbe.expectMsg("cmd") + processWrite(Success("ws")) + } + "handle commands while write is in progress after completed replay" in { + val promise = Promise[String]() + val actor = unrecoveredEventsourcedWriter() + processRead(Success("rs")) + processLoad(actor) + processReplay(actor, 1) + actor ! "cmd" + appProbe.expectMsg("cmd") + processWrite(Success("ws")) + } + } + + "resuming" must { + "replay after an initial read using the defined return value as starting position" in { + recoveredEventsourcedWriter(Some(3)) + } + } + + "recovered" must { + "handle commands while write is in progress" in { + val promise = Promise[String]() + val actor = processRecover(unrecoveredEventsourcedWriter()) + actor ! 
Written(event("a", 1)) // trigger write + actor ! "cmd" + appProbe.expectMsg(("a", 1)) + appProbe.expectMsg("cmd") + processWrite(Success("ws")) + } + "handle events while write is in progress" in { + val actor = processRecover(unrecoveredEventsourcedWriter()) + actor ! Written(event("a", 1)) // trigger write 1 + actor ! Written(event("b", 2)) // trigger write 2 (after write 1 completed) + appProbe.expectMsg(("a", 1)) + appProbe.expectMsg(("b", 2)) + processWrite(Success("ws")) + processWrite(Success("ws")) + } + } + } +} \ No newline at end of file diff --git a/src/test/scala/com/rbmhtechnology/example/Order.scala b/src/test/scala/com/rbmhtechnology/example/ordermgnt/Order.scala similarity index 96% rename from src/test/scala/com/rbmhtechnology/example/Order.scala rename to src/test/scala/com/rbmhtechnology/example/ordermgnt/Order.scala index 6caa91c3..fcf594e9 100644 --- a/src/test/scala/com/rbmhtechnology/example/Order.scala +++ b/src/test/scala/com/rbmhtechnology/example/ordermgnt/Order.scala @@ -14,7 +14,7 @@ * limitations under the License. */ -package com.rbmhtechnology.example +package com.rbmhtechnology.example.ordermgnt //#order-definition case class Order(id: String, items: List[String] = Nil, cancelled: Boolean = false) { diff --git a/src/test/scala/com/rbmhtechnology/example/OrderActor.scala b/src/test/scala/com/rbmhtechnology/example/ordermgnt/OrderActor.scala similarity index 99% rename from src/test/scala/com/rbmhtechnology/example/OrderActor.scala rename to src/test/scala/com/rbmhtechnology/example/ordermgnt/OrderActor.scala index be066ee9..e228a73b 100644 --- a/src/test/scala/com/rbmhtechnology/example/OrderActor.scala +++ b/src/test/scala/com/rbmhtechnology/example/ordermgnt/OrderActor.scala @@ -14,7 +14,7 @@ * limitations under the License. */ -package com.rbmhtechnology.example +package com.rbmhtechnology.example.ordermgnt import akka.actor._ diff --git a/src/test/scala/com/rbmhtechnology/example/OrderExample.scala b/src/test/scala/com/rbmhtechnology/example/ordermgnt/OrderExample.scala similarity index 98% rename from src/test/scala/com/rbmhtechnology/example/OrderExample.scala rename to src/test/scala/com/rbmhtechnology/example/ordermgnt/OrderExample.scala index 14d1c589..789682dc 100644 --- a/src/test/scala/com/rbmhtechnology/example/OrderExample.scala +++ b/src/test/scala/com/rbmhtechnology/example/ordermgnt/OrderExample.scala @@ -14,7 +14,7 @@ * limitations under the License. */ -package com.rbmhtechnology.example +package com.rbmhtechnology.example.ordermgnt import akka.actor._ diff --git a/src/test/scala/com/rbmhtechnology/example/OrderManager.scala b/src/test/scala/com/rbmhtechnology/example/ordermgnt/OrderManager.scala similarity index 98% rename from src/test/scala/com/rbmhtechnology/example/OrderManager.scala rename to src/test/scala/com/rbmhtechnology/example/ordermgnt/OrderManager.scala index 88bfa69c..c6d1ea6d 100644 --- a/src/test/scala/com/rbmhtechnology/example/OrderManager.scala +++ b/src/test/scala/com/rbmhtechnology/example/ordermgnt/OrderManager.scala @@ -14,7 +14,7 @@ * limitations under the License. 
*/ -package com.rbmhtechnology.example +package com.rbmhtechnology.example.ordermgnt import akka.actor._ import akka.pattern.ask diff --git a/src/test/scala/com/rbmhtechnology/example/OrderView.scala b/src/test/scala/com/rbmhtechnology/example/ordermgnt/OrderView.scala similarity index 97% rename from src/test/scala/com/rbmhtechnology/example/OrderView.scala rename to src/test/scala/com/rbmhtechnology/example/ordermgnt/OrderView.scala index 7af4959a..f49f37de 100644 --- a/src/test/scala/com/rbmhtechnology/example/OrderView.scala +++ b/src/test/scala/com/rbmhtechnology/example/ordermgnt/OrderView.scala @@ -14,7 +14,7 @@ * limitations under the License. */ -package com.rbmhtechnology.example +package com.rbmhtechnology.example.ordermgnt import akka.actor.ActorRef diff --git a/src/test/scala/com/rbmhtechnology/example/package.scala b/src/test/scala/com/rbmhtechnology/example/ordermgnt/package.scala similarity index 94% rename from src/test/scala/com/rbmhtechnology/example/package.scala rename to src/test/scala/com/rbmhtechnology/example/ordermgnt/package.scala index a162ebd0..f11a0da8 100644 --- a/src/test/scala/com/rbmhtechnology/example/package.scala +++ b/src/test/scala/com/rbmhtechnology/example/ordermgnt/package.scala @@ -14,11 +14,11 @@ * limitations under the License. */ -package com.rbmhtechnology +package com.rbmhtechnology.example import com.rbmhtechnology.eventuate.Versioned -package object example { +package object ordermgnt { def printOrder(versions: Seq[Versioned[Order]]): Unit = { if (versions.size > 1) { println("Conflict:") diff --git a/src/test/scala/com/rbmhtechnology/example/querydb/Emitter.scala b/src/test/scala/com/rbmhtechnology/example/querydb/Emitter.scala new file mode 100644 index 00000000..e41bca5e --- /dev/null +++ b/src/test/scala/com/rbmhtechnology/example/querydb/Emitter.scala @@ -0,0 +1,70 @@ +/* + * Copyright (C) 2015 Red Bull Media House GmbH - all rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.rbmhtechnology.example.querydb + +import akka.actor._ + +import com.rbmhtechnology.eventuate._ + +import scala.util._ + +// ----------------- +// Domain commands +// ----------------- + +case class CreateCustomer(first: String, last: String, address: String) +case class UpdateAddress(cid: Long, address: String) + +// --------------- +// Domain events +// --------------- + +//#events +case class CustomerCreated(cid: Long, first: String, last: String, address: String) +case class AddressUpdated(cid: Long, address: String) +//# + +// --------------- +// Event emitter +// --------------- + +class Emitter(val id: String, val eventLog: ActorRef) extends EventsourcedActor { + private var highestCustomerId = 0L + + override val onCommand: Receive = { + case CreateCustomer(first, last, address) => + persist(CustomerCreated(highestCustomerId + 1L, first, last, address)) { + case Success(c) => onEvent(c); sender() ! 
c + case Failure(e) => throw e + } + case UpdateAddress(cid, address) if cid <= highestCustomerId => + persist(AddressUpdated(cid, address)) { + case Success(c) => onEvent(c); sender() ! c + case Failure(e) => throw e + } + case UpdateAddress(cid, _) => + sender() ! new Exception(s"Customer with $cid does not exist") + } + + override val onEvent: Receive = { + case CustomerCreated(cid, first, last, address) => + highestCustomerId = cid + case AddressUpdated(_, _) => + // ... + } +} + diff --git a/src/test/scala/com/rbmhtechnology/example/querydb/Writer.scala b/src/test/scala/com/rbmhtechnology/example/querydb/Writer.scala new file mode 100644 index 00000000..566176c4 --- /dev/null +++ b/src/test/scala/com/rbmhtechnology/example/querydb/Writer.scala @@ -0,0 +1,110 @@ +/* + * Copyright (C) 2015 Red Bull Media House GmbH - all rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.rbmhtechnology.example.querydb + +//#writer +import java.lang.{Long => JLong} + +import akka.actor.ActorRef + +import com.datastax.driver.core._ +import com.rbmhtechnology.eventuate.EventsourcedWriter + +import scala.concurrent.Future + +/** + * Processes `CustomerCreated` and `AddressUpdated` events and updates + * a `CUSTOMER` table in Cassandra with incremental batches. + */ +class Writer(val id: String, val eventLog: ActorRef, session: Session) + extends EventsourcedWriter[Long, Unit] { + + import com.rbmhtechnology.eventuate.log.cassandra.{listenableFutureToFuture => ftr} + import context.dispatcher + + val insertCustomerStmt = session.prepare( + "INSERT INTO CUSTOMER (id, first, last, address) VALUES (?, ?, ?, ?)") + + val updateCustomerStmt = session.prepare( + "UPDATE CUSTOMER SET address = ? WHERE id = ?") + + val updateProgressStmt = session.prepare( + "UPDATE PROGRESS SET sequence_nr = ? WHERE id = 0") + + /** + * Batch of Cassandra update statements collected during event processing. + */ + var batch: Vector[BoundStatement] = Vector.empty + + /** + * Suspends replay after 16 events, triggers a `write` and then continues + * with the next 16 events. This implements event replay backpressure, + * needed if writing to the database is slower than replaying from the + * `eventLog` (which is usually the case). + */ + override def replayChunkSizeMax: Int = + 16 + + override val onCommand: Receive = { + case _ => + } + + /** + * Prepares an update `batch` from handled events that is written to the + * database when `write` is called. An event handler never writes to the + * database directly. + */ + override val onEvent: Receive = { + case c @ CustomerCreated(cid, first, last, address) => + batch = batch :+ insertCustomerStmt.bind(cid: JLong, first, last, address) + case u @ AddressUpdated(cid, address) => + batch = batch :+ updateCustomerStmt.bind(address, cid: JLong) + } + + /** + * Asynchronously writes the prepared update `batch` to the database + * together with the sequence number of the last processed event.
After + * having submitted the batch, it is cleared so that further events can + * be processed while the write is in progress. + */ + override def write(): Future[Unit] = { + val snr = lastSequenceNr + val res = for { + _ <- Future.sequence(batch.map(stmt => ftr(session.executeAsync(stmt)))) + _ <- session.executeAsync(updateProgressStmt.bind(snr: JLong)) + } yield () + batch = Vector.empty // clear batch + res + } + + /** + * Reads the sequence number of the last update. This method is called only + * once during writer initialization (after start or restart). + */ + override def read(): Future[Long] = { + session.executeAsync("SELECT sequence_nr FROM PROGRESS WHERE id = 0") + .map(rs => if (rs.isExhausted) 0L else rs.one().getLong(0)) + } + + /** + * Handles the `read` result by returning the read value + 1, indicating the + * start position for further reads from the event log. + */ + override def readSuccess(result: Long): Option[Long] = + Some(result + 1L) +} +//# \ No newline at end of file diff --git a/src/test/scala/com/rbmhtechnology/example/querydb/WriterApp.scala b/src/test/scala/com/rbmhtechnology/example/querydb/WriterApp.scala new file mode 100644 index 00000000..1bf47ff9 --- /dev/null +++ b/src/test/scala/com/rbmhtechnology/example/querydb/WriterApp.scala @@ -0,0 +1,93 @@ +/* + * Copyright (C) 2015 Red Bull Media House GmbH - all rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.rbmhtechnology.example.querydb + +import akka.actor._ +import akka.pattern.ask +import akka.util.Timeout + +import com.datastax.driver.core._ +import com.rbmhtechnology.eventuate.log.leveldb.LeveldbEventLog +import com.typesafe.config.ConfigFactory + +import scala.concurrent.duration._ +import scala.util._ + +object WriterApp extends App { + val config = + s""" + |eventuate.log.leveldb.dir = target/example-querydb + |eventuate.snapshot.filesystem.dir = target/example-querydb + | + |akka.log-dead-letters = 0 + """.stripMargin + + // --------------------------------------------------------------- + // Assumption: Cassandra 2.1 or higher running on localhost:9042 + // --------------------------------------------------------------- + + withQueryDB(drop = false) { session => + val system = ActorSystem("example-querydb", ConfigFactory.parseString(config)) + val log = system.actorOf(LeveldbEventLog.props("example")) + + val emitter = system.actorOf(Props(new Emitter("emitter", log))) + val writer = system.actorOf(Props(new Writer("writer", log, session))) + + import system.dispatcher + + implicit val timeout = Timeout(5.seconds) + + emitter ! CreateCustomer("Martin", "Krasser", "Somewhere 1") + emitter ? CreateCustomer("Volker", "Stampa", "Somewhere 2") onComplete { + case Success(CustomerCreated(cid, _, _, _)) => emitter ! 
UpdateAddress(cid, s"Somewhere ${Random.nextInt(10)}") + case Failure(e) => e.printStackTrace() + } + + Thread.sleep(3000) + system.terminate() + } + + + def createQueryDB(drop: Boolean): Session = { + val cluster = Cluster.builder().addContactPoint("localhost").build() + val session = cluster.connect() + + if (drop) { + session.execute("DROP KEYSPACE IF EXISTS QUERYDB") + } + + session.execute("CREATE KEYSPACE IF NOT EXISTS QUERYDB WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }") + session.execute("USE QUERYDB") + + session.execute("CREATE TABLE IF NOT EXISTS CUSTOMER (id bigint, first text, last text, address text, PRIMARY KEY (id))") + session.execute("CREATE TABLE IF NOT EXISTS PROGRESS (id bigint, sequence_nr bigint, PRIMARY KEY (id))") + session.execute("INSERT INTO PROGRESS (id, sequence_nr) VALUES(0, 0) IF NOT EXISTS") + + session + } + + def dropQueryDB(session: Session): Unit = { + session.close() + session.getCluster.close() + } + + def withQueryDB[A](drop: Boolean = true)(f: Session => A): A = { + val session = createQueryDB(drop) + try f(session) finally dropQueryDB(session) + } +} +
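To verify what the Writer has written, the query database can be inspected with plain CQL once the writer has caught up with the event log. The following is a minimal sketch, not part of the patch above: it assumes the Session returned by withQueryDB and the CUSTOMER and PROGRESS tables created in createQueryDB; the helper name printQueryDB is illustrative only.

    import scala.collection.JavaConverters._
    import com.datastax.driver.core.Session

    // Illustrative helper for manual verification: prints the replication progress
    // stored by Writer and all rows of the CUSTOMER table.
    def printQueryDB(session: Session): Unit = {
      val progress = session.execute("SELECT sequence_nr FROM PROGRESS WHERE id = 0").one().getLong(0)
      println(s"processed up to sequence nr $progress")

      session.execute("SELECT id, first, last, address FROM CUSTOMER").all().asScala.foreach { row =>
        println(s"customer ${row.getLong("id")}: ${row.getString("first")} ${row.getString("last")}, ${row.getString("address")}")
      }
    }

Called after the Thread.sleep(3000) in WriterApp (before system.terminate()), this should show both created customers and the updated address once the asynchronous batch writes have completed.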