Skip to content
This repository has been archived by the owner on Jun 1, 2021. It is now read-only.

Commit

Permalink
Event-sourced writer and processor
Browse files Browse the repository at this point in the history
- event-sourced writer for implementing persistent views
- event-sourced writer example application
- event-sourced processor as specialization of writer
- event-sourced processor (stateful and stateless)
- event-sourced processor can also emit to source log
- unified event emission rules of event-sourced actor and processor

Futher enhancements

- batching layer for replication writes (from processors and replicators)
- system timestamp only set by event log (and not event-sourced actor)

Other

- example applications moved to subpackes ordermgnt and querydb

- closes #100
- closes #136
  • Loading branch information
krasserm committed Nov 21, 2015
1 parent 90de7eb commit bd09f3c
Show file tree
Hide file tree
Showing 55 changed files with 2,041 additions and 288 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@
*.iml
*.env
target/
/.example-classpath
.example-classpath
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ exampleClasspath := {
val fileName = ".example-classpath"
val file = Paths.get(fileName)

Files.write(file, output.getBytes, StandardOpenOption.TRUNCATE_EXISTING)
Files.write(file, output.getBytes, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING)
file.toFile.setExecutable(true)
}

Expand Down
4 changes: 2 additions & 2 deletions example-location
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/bin/sh

EXAMPLE_MODE=normal
EXAMPLE_MAIN=com.rbmhtechnology.example.OrderExample
EXAMPLE_MAIN=com.rbmhtechnology.example.ordermgnt.OrderExample

while [[ $# > 1 ]]; do
key="$1"
Expand All @@ -10,7 +10,7 @@ while [[ $# > 1 ]]; do
EXAMPLE_MODE=recover
;;
-j|--java)
EXAMPLE_MAIN=com.rbmhtechnology.example.japi.OrderExample
EXAMPLE_MAIN=com.rbmhtechnology.example.ordermgnt.japi.OrderExample
;;
*)
# unknown option
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
/*
* Copyright (C) 2015 Red Bull Media House GmbH <http://www.redbullmediahouse.com> - all rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.rbmhtechnology.eventuate

import akka.actor._
import akka.testkit._

import com.rbmhtechnology.eventuate.log.EventLogLifecycleCassandra
import com.rbmhtechnology.eventuate.log.EventLogLifecycleLeveldb

import org.scalatest._

import scala.collection.immutable.Seq
import scala.util._

object EventsourcedProcessorIntegrationSpec {
class SampleActor(val id: String, val eventLog: ActorRef, probe: ActorRef) extends EventsourcedActor {
override val onCommand: Receive = {
case s: String => persist(s) {
case Success(_) => onEvent(s)
case Failure(_) =>
}
}

override val onEvent: Receive = {
case s: String => probe ! ((s, lastVectorTimestamp))
}
}

class SampleProcessor(val id: String, val eventLog: ActorRef, val targetEventLog: ActorRef, sharedVectorClockEntry: Boolean, probe: ActorRef) extends EventsourcedProcessor {
override def sharedClockEntry = sharedVectorClockEntry

override val onCommand: Receive = {
case "boom" => throw boom
case "snap" => save("") {
case Success(_) => probe ! "snapped"
case Failure(_) =>
}
}

override val processEvent: Process = {
case s: String if !s.contains("processed") =>
probe ! s
List(s"${s}-processed-1", s"${s}-processed-2")
}

override val onSnapshot: Receive = {
case _ =>
}
}
}

abstract class EventsourcedProcessorIntegrationSpec extends TestKit(ActorSystem("test")) with WordSpecLike with Matchers with BeforeAndAfterEach {
import EventsourcedProcessorIntegrationSpec._

def log: ActorRef
def logId: String
def logProps(logId: String): Props

def sourceLog = log
def sourceLogId = logId

var targetLog: ActorRef = _
var targetLogId: String = _

var sourceProbe: TestProbe = _
var targetProbe: TestProbe = _
var processorProbe: TestProbe = _

var a1: ActorRef = _
var a2: ActorRef = _

def init(): Unit = {
targetLogId = s"${logId}_target"
targetLog = system.actorOf(logProps(targetLogId))

sourceProbe = TestProbe()
targetProbe = TestProbe()
processorProbe = TestProbe()

a1 = system.actorOf(Props(new SampleActor("a1", sourceLog, sourceProbe.ref)))
a2 = system.actorOf(Props(new SampleActor("a2", targetLog, targetProbe.ref)))
}

"A stateful EventsourcedProcessor" must {
"write processed events to a target log and recover from scratch" in {
val p = system.actorOf(Props(new SampleProcessor("p", sourceLog, targetLog, sharedVectorClockEntry = true, processorProbe.ref)))

a1 ! "a"
a1 ! "b"
a1 ! "c"

processorProbe.expectMsg("a")
processorProbe.expectMsg("b")
processorProbe.expectMsg("c")

targetProbe.expectMsg(("a-processed-1", VectorTime(sourceLogId -> 1L, targetLogId -> 1L)))
targetProbe.expectMsg(("a-processed-2", VectorTime(sourceLogId -> 1L, targetLogId -> 2L)))
targetProbe.expectMsg(("b-processed-1", VectorTime(sourceLogId -> 2L, targetLogId -> 3L)))
targetProbe.expectMsg(("b-processed-2", VectorTime(sourceLogId -> 2L, targetLogId -> 4L)))
targetProbe.expectMsg(("c-processed-1", VectorTime(sourceLogId -> 3L, targetLogId -> 5L)))
targetProbe.expectMsg(("c-processed-2", VectorTime(sourceLogId -> 3L, targetLogId -> 6L)))

p ! "boom"
a1 ! "d"

processorProbe.expectMsg("d")

targetProbe.expectMsg(("d-processed-1", VectorTime(sourceLogId -> 4L, targetLogId -> 7L)))
targetProbe.expectMsg(("d-processed-2", VectorTime(sourceLogId -> 4L, targetLogId -> 8L)))
}
"write processed events to a target log and recover from snapshot" in {
val p = system.actorOf(Props(new SampleProcessor("p", sourceLog, targetLog, sharedVectorClockEntry = true, processorProbe.ref)))

a1 ! "a"
a1 ! "b"

processorProbe.expectMsg("a")
processorProbe.expectMsg("b")

p ! "snap"

processorProbe.expectMsg("snapped")

a1 ! "c"

processorProbe.expectMsg("c")

targetProbe.expectMsg(("a-processed-1", VectorTime(sourceLogId -> 1L, targetLogId -> 1L)))
targetProbe.expectMsg(("a-processed-2", VectorTime(sourceLogId -> 1L, targetLogId -> 2L)))
targetProbe.expectMsg(("b-processed-1", VectorTime(sourceLogId -> 2L, targetLogId -> 3L)))
targetProbe.expectMsg(("b-processed-2", VectorTime(sourceLogId -> 2L, targetLogId -> 4L)))
targetProbe.expectMsg(("c-processed-1", VectorTime(sourceLogId -> 3L, targetLogId -> 5L)))
targetProbe.expectMsg(("c-processed-2", VectorTime(sourceLogId -> 3L, targetLogId -> 6L)))

p ! "boom"
a1 ! "d"

processorProbe.expectMsg("d")

targetProbe.expectMsg(("d-processed-1", VectorTime(sourceLogId -> 4L, targetLogId -> 7L)))
targetProbe.expectMsg(("d-processed-2", VectorTime(sourceLogId -> 4L, targetLogId -> 8L)))
}
"update event vector timestamps when having set sharedClockEntry to false" in {
val p = system.actorOf(Props(new SampleProcessor("p", sourceLog, targetLog, sharedVectorClockEntry = false, processorProbe.ref)))

a1 ! "a"
a1 ! "b"
a1 ! "c"

processorProbe.expectMsg("a")
processorProbe.expectMsg("b")

targetProbe.expectMsg(("a-processed-1", VectorTime(sourceLogId -> 1L, "p" -> 2L)))
targetProbe.expectMsg(("a-processed-2", VectorTime(sourceLogId -> 1L, "p" -> 3L)))
targetProbe.expectMsg(("b-processed-1", VectorTime(sourceLogId -> 2L, "p" -> 5L)))
targetProbe.expectMsg(("b-processed-2", VectorTime(sourceLogId -> 2L, "p" -> 6L)))
}
"be able to write to the source event log" in {

}
}

"A stateful EventsourcedProcessor" when {
"writing to the source event log" must {
"have its own vector clock entry" in {
a1 ! "a"
a1 ! "b"
a1 ! "c"

sourceProbe.expectMsg(("a", VectorTime(sourceLogId -> 1L)))
sourceProbe.expectMsg(("b", VectorTime(sourceLogId -> 2L)))
sourceProbe.expectMsg(("c", VectorTime(sourceLogId -> 3L)))

val p = system.actorOf(Props(new SampleProcessor("p", sourceLog, sourceLog, sharedVectorClockEntry = false, processorProbe.ref)))

sourceProbe.expectMsg(("a-processed-1", VectorTime(sourceLogId -> 1L, "p" -> 2L)))
sourceProbe.expectMsg(("a-processed-2", VectorTime(sourceLogId -> 1L, "p" -> 3L)))
sourceProbe.expectMsg(("b-processed-1", VectorTime(sourceLogId -> 2L, "p" -> 5L)))
sourceProbe.expectMsg(("b-processed-2", VectorTime(sourceLogId -> 2L, "p" -> 6L)))
sourceProbe.expectMsg(("c-processed-1", VectorTime(sourceLogId -> 3L, "p" -> 8L)))
sourceProbe.expectMsg(("c-processed-2", VectorTime(sourceLogId -> 3L, "p" -> 9L)))

p ! "boom"
a1 ! "d"

sourceProbe.expectMsg(("d", VectorTime(sourceLogId -> 10L, "p" -> 9)))
sourceProbe.expectMsg(("d-processed-1", VectorTime(sourceLogId -> 10L, "p" -> 11L)))
sourceProbe.expectMsg(("d-processed-2", VectorTime(sourceLogId -> 10L, "p" -> 12L)))
}
}
}

"A stateless EventsourcedProcessor" must {
"write processed events to a target log and resume from stored position" in {
val p = system.actorOf(Props(new SampleProcessor("p", sourceLog, targetLog, sharedVectorClockEntry = true, processorProbe.ref) with StatelessProcessor))

a1 ! "a"
a1 ! "b"
a1 ! "c"

processorProbe.expectMsg("a")
processorProbe.expectMsg("b")
processorProbe.expectMsg("c")

targetProbe.expectMsg(("a-processed-1", VectorTime(sourceLogId -> 1L, targetLogId -> 1L)))
targetProbe.expectMsg(("a-processed-2", VectorTime(sourceLogId -> 1L, targetLogId -> 2L)))
targetProbe.expectMsg(("b-processed-1", VectorTime(sourceLogId -> 2L, targetLogId -> 3L)))
targetProbe.expectMsg(("b-processed-2", VectorTime(sourceLogId -> 2L, targetLogId -> 4L)))
targetProbe.expectMsg(("c-processed-1", VectorTime(sourceLogId -> 3L, targetLogId -> 5L)))
targetProbe.expectMsg(("c-processed-2", VectorTime(sourceLogId -> 3L, targetLogId -> 6L)))

p ! "boom"
a1 ! "d"

processorProbe.expectMsg("d")

targetProbe.expectMsg(("d-processed-1", VectorTime(sourceLogId -> 4L, targetLogId -> 7L)))
targetProbe.expectMsg(("d-processed-2", VectorTime(sourceLogId -> 4L, targetLogId -> 8L)))
}
}
}

class EventsourcedProcessorIntegrationSpecLeveldb extends EventsourcedProcessorIntegrationSpec with EventLogLifecycleLeveldb {
override def beforeEach(): Unit = {
super.beforeEach()
init()
}
}

class EventsourcedProcessorIntegrationSpecCassandra extends EventsourcedProcessorIntegrationSpec with EventLogLifecycleCassandra {
override def beforeEach(): Unit = {
super.beforeEach()
init()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,9 @@ trait EventLogLifecycleCassandra extends EventLogCleanupCassandra with BeforeAnd
def logId: String =
_logCtr.toString

def logProps(logId: String): Props =
logProps(logId, TestFailureSpec(), system.deadLetters)

def logProps(logId: String, failureSpec: TestFailureSpec, indexProbe: ActorRef): Props =
TestEventLog.props(logId, failureSpec, indexProbe, batching)
}
Expand Down
52 changes: 48 additions & 4 deletions src/it/scala/com/rbmhtechnology/eventuate/log/EventLogSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -531,13 +531,57 @@ abstract class EventLogSpec extends TestKit(ActorSystem("test", EventLogSpec.con
log.tell(GetReplicationProgresses, requestorProbe.ref)
requestorProbe.expectMsg(GetReplicationProgressesSuccess(Map(EventLogSpec.remoteLogId -> 19L)))
}
"set an event's system timestamp if that event's processId is not defined" in {
"update an event's system timestamp" in {
log ! Write(List(event("a").copy(systemTimestamp = 3L)), system.deadLetters, requestorProbe.ref, 0)
requestorProbe.expectMsgType[WriteSuccess].event.systemTimestamp should be(0L)
}
"use an event's system timestamp if that event's processId is defined" in {
log ! Write(List(event("a").copy(systemTimestamp = 3L, processId = "foo")), system.deadLetters, requestorProbe.ref, 0)
requestorProbe.expectMsgType[WriteSuccess].event.systemTimestamp should be(3L)
"update an emitted event's process id and vector timestamp during if the process id is not defined" in {
val evt = DurableEvent("a", emitterIdA, processId = UndefinedLogId)
val exp = DurableEvent("a", emitterIdA, processId = logId, vectorTimestamp = VectorTime(logId -> 1L), localLogId = logId, localSequenceNr = 1)
log ! Write(List(evt), system.deadLetters, requestorProbe.ref, 0)
requestorProbe.expectMsgType[WriteSuccess].event should be(exp)
}
"not update an emitted event's process id and vector timestamp during if the process id is defined" in {
val evt = DurableEvent("a", emitterIdA, processId = emitterIdA, vectorTimestamp = VectorTime(emitterIdA -> 1L))
val exp = DurableEvent("a", emitterIdA, processId = emitterIdA, vectorTimestamp = VectorTime(emitterIdA -> 1L), localLogId = logId, localSequenceNr = 1)
log ! Write(List(evt), system.deadLetters, requestorProbe.ref, 0)
requestorProbe.expectMsgType[WriteSuccess].event should be(exp)
}
"update a replicated event's process id and vector timestamp during if the process id is not defined" in {
val evt = DurableEvent("a", emitterIdA, processId = UndefinedLogId, vectorTimestamp = VectorTime(remoteLogId -> 1L))
val exp = DurableEvent("a", emitterIdA, processId = logId, vectorTimestamp = VectorTime(remoteLogId -> 1L, logId -> 1L), localLogId = logId, localSequenceNr = 1)
registerCollaborator(aggregateId = None, collaborator = requestorProbe)
log ! ReplicationWrite(List(evt), remoteLogId, 5, VectorTime())
requestorProbe.expectMsgType[Written].event should be(exp)
}
"not update a replicated event's process id and vector timestamp during if the process id is defined" in {
val evt = DurableEvent("a", emitterIdA, processId = emitterIdA, vectorTimestamp = VectorTime(emitterIdA -> 1L))
val exp = DurableEvent("a", emitterIdA, processId = emitterIdA, vectorTimestamp = VectorTime(emitterIdA -> 1L), localLogId = logId, localSequenceNr = 1)
registerCollaborator(aggregateId = None, collaborator = requestorProbe)
log ! ReplicationWrite(List(evt), remoteLogId, 5, VectorTime())
requestorProbe.expectMsgType[Written].event should be(exp)
}
"not write events to the target log that are in causal past of the target log" in {
val evt1 = DurableEvent("i", emitterIdB, vectorTimestamp = timestamp(0, 7), processId = remoteLogId)
val evt2 = DurableEvent("j", emitterIdB, vectorTimestamp = timestamp(0, 8), processId = remoteLogId)
val evt3 = DurableEvent("k", emitterIdB, vectorTimestamp = timestamp(0, 9), processId = remoteLogId)
registerCollaborator(aggregateId = None, collaborator = requestorProbe)
log ! ReplicationWrite(List(evt1, evt2), remoteLogId, 5, VectorTime())
log ! ReplicationWrite(List(evt2, evt3), remoteLogId, 6, VectorTime())
requestorProbe.expectMsgType[Written].event.payload should be("i")
requestorProbe.expectMsgType[Written].event.payload should be("j")
requestorProbe.expectMsgType[Written].event.payload should be("k")
}
"not read events from the source log that are in causal past of the target log (using the target time from the request)" in {
generateEmittedEvents()
log.tell(ReplicationRead(1, Int.MaxValue, NoFilter, remoteLogId, dl, timestamp(1)), requestorProbe.ref)
requestorProbe.expectMsgType[ReplicationReadSuccess].events.map(_.payload) should be(Seq("b", "c"))
}
"not read events from the source log that are in causal past of the target log (using the target time from the cache)" in {
generateEmittedEvents()
log ! ReplicationWrite(Nil, remoteLogId, 5, timestamp(2)) // update time cache
log.tell(ReplicationRead(1, Int.MaxValue, NoFilter, remoteLogId, dl, timestamp(1)), requestorProbe.ref)
requestorProbe.expectMsgType[ReplicationReadSuccess].events.map(_.payload) should be(Seq("c"))
}
}
}
Expand Down
8 changes: 8 additions & 0 deletions src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ eventuate {
# event handler (= backpressure).
replay.chunk-size-max = 65536

processor {
# Timeout for read operation from target event log.
read-timeout = 10s

# Timeout for write operations to target event log.
write-timeout = 10s
}

log.batching {
# This limit is used by the batching layer to limit the number of events
# to be written atomically to the event log. If the number of events from
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,17 @@ trait ConfirmedDelivery extends EventsourcedActor {
/**
* Internal API.
*/
override private[eventuate] def capturedSnapshot(snapshot: Snapshot): Snapshot = {
_unconfirmed.values.foldLeft(super.capturedSnapshot(snapshot)) {
override private[eventuate] def snapshotCaptured(snapshot: Snapshot): Snapshot = {
_unconfirmed.values.foldLeft(super.snapshotCaptured(snapshot)) {
case (s, da) => s.add(da)
}
}

/**
* Internal API.
*/
override private[eventuate] def loadedSnapshot(snapshot: Snapshot): Unit = {
super.loadedSnapshot(snapshot)
override private[eventuate] def snapshotLoaded(snapshot: Snapshot): Unit = {
super.snapshotLoaded(snapshot)
snapshot.deliveryAttempts.foreach { da =>
_unconfirmed = _unconfirmed + (da.deliveryId -> da)
}
Expand Down
Loading

0 comments on commit bd09f3c

Please sign in to comment.