Skip to content
This repository has been archived by the owner on Jun 1, 2021. It is now read-only.

Commit

Permalink
Event storage plugin API
Browse files Browse the repository at this point in the history
- LevelDB implementation
- Cassandra implementation
- Batching layer refactorings

- closes #95
  • Loading branch information
krasserm committed Nov 30, 2015
1 parent c3d80e0 commit ed65959
Show file tree
Hide file tree
Showing 39 changed files with 1,495 additions and 1,392 deletions.
1 change: 1 addition & 0 deletions project/Formatting.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.rbmhtechnology.eventuate

import sbt._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import com.rbmhtechnology.eventuate.log.EventLogLifecycleLeveldb

import org.scalatest._

import scala.collection.immutable.Seq
import scala.util._

object EventsourcedProcessorIntegrationSpec {
Expand Down
4 changes: 2 additions & 2 deletions src/it/scala/com/rbmhtechnology/eventuate/RecoverySpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import akka.testkit.TestProbe
import akka.util.Timeout

import com.rbmhtechnology.eventuate.log._
import com.rbmhtechnology.eventuate.log.leveldb.LeveldbSettings
import com.rbmhtechnology.eventuate.log.leveldb.LeveldbEventLogSettings
import com.rbmhtechnology.eventuate.utilities._

import org.apache.commons.io.FileUtils
Expand Down Expand Up @@ -57,7 +57,7 @@ object RecoverySpec {
""".stripMargin

def rootDirectory(target: ReplicationTarget): File =
new File(new LeveldbSettings(target.endpoint.system).rootDir)
new File(new LeveldbEventLogSettings(target.endpoint.system.settings.config).rootDir)

def logDirectory(target: ReplicationTarget): File = {
implicit val timeout = Timeout(3.seconds)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ import akka.testkit.TestProbe

import com.rbmhtechnology.eventuate._
import com.rbmhtechnology.eventuate.crdt.CRDTService.ValueUpdated
import com.rbmhtechnology.eventuate.log.EventLog.Clock
import com.rbmhtechnology.eventuate.log._
import com.rbmhtechnology.eventuate.log.cassandra._
import com.rbmhtechnology.eventuate.log.leveldb._

import org.cassandraunit.utils.EmbeddedCassandraServerHelper
import org.iq80.leveldb.WriteBatch
import org.scalatest._

import scala.collection.immutable.Seq
Expand Down Expand Up @@ -128,25 +128,25 @@ class CRDTChaosSpecLeveldb extends CRDTChaosSpec with EventLogCleanupLeveldb {
import CRDTChaosSpec._

class TestEventLog(id: String) extends LeveldbEventLog(id, "log-test") {
private[eventuate] override def write(events: Seq[DurableEvent], tracker: TimeTracker, batch: WriteBatch): TimeTracker =
if (events.map(_.payload).contains(ValueUpdated(crdtId, AddOp(randomNr())))) throw boom else super.write(events, tracker, batch)
override def write(events: Seq[DurableEvent], partition: Long, clock: Clock): Unit =
if (events.map(_.payload).contains(ValueUpdated(crdtId, AddOp(randomNr())))) throw boom else super.write(events, partition, clock)
}

val logFactory: String => Props =
id => logProps(id)

def logProps(logId: String, batching: Boolean = true): Props = {
val logProps = Props(new TestEventLog(logId)).withDispatcher("eventuate.log.leveldb.write-dispatcher")
if (batching) Props(new BatchingEventLog(logProps)) else logProps
if (batching) Props(new BatchingLayer(logProps)) else logProps
}
}

class CRDTChaosSpecCassandra extends CRDTChaosSpec with EventLogCleanupCassandra {
import CRDTChaosSpec._

class TestEventLog(id: String) extends CassandraEventLog(id) {
private[eventuate] override def write(partition: Long, events: Seq[DurableEvent], tracker: TimeTracker): TimeTracker =
if (events.map(_.payload).contains(ValueUpdated(crdtId, AddOp(randomNr())))) throw boom else super.write(partition, events, tracker)
override def write(events: Seq[DurableEvent], partition: Long, clock: Clock): Unit =
if (events.map(_.payload).contains(ValueUpdated(crdtId, AddOp(randomNr())))) throw boom else super.write(events, partition, clock)
}

override val logFactory: String => Props =
Expand All @@ -165,6 +165,6 @@ class CRDTChaosSpecCassandra extends CRDTChaosSpec with EventLogCleanupCassandr

def logProps(logId: String, batching: Boolean = true): Props = {
val logProps = Props(new TestEventLog(logId)).withDispatcher("eventuate.log.cassandra.write-dispatcher")
if (batching) Props(new BatchingEventLog(logProps)) else logProps
if (batching) Props(new BatchingLayer(logProps)) else logProps
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,21 @@ package com.rbmhtechnology.eventuate.log
import java.io.{Closeable, File}

import akka.actor._
import akka.pattern.ask
import akka.testkit.{TestProbe, TestKit}
import akka.util.Timeout

import com.rbmhtechnology.eventuate._
import com.rbmhtechnology.eventuate.ReplicationProtocol._
import com.rbmhtechnology.eventuate.log.EventLog._
import com.rbmhtechnology.eventuate.log.cassandra._
import com.rbmhtechnology.eventuate.log.cassandra.CassandraIndex._
import com.rbmhtechnology.eventuate.log.leveldb.LeveldbEventLog
import com.rbmhtechnology.eventuate.log.leveldb.LeveldbEventLog.ReadResult
import com.rbmhtechnology.eventuate.log.leveldb._
import com.typesafe.config.Config

import org.apache.commons.io.FileUtils
import org.cassandraunit.utils.EmbeddedCassandraServerHelper
import org.iq80.leveldb.WriteBatch
import org.scalatest._

import scala.collection.immutable.Seq
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util._

trait EventLogCleanupLeveldb extends Suite with BeforeAndAfterAll {
def config: Config
Expand Down Expand Up @@ -96,25 +90,22 @@ trait EventLogLifecycleLeveldb extends EventLogCleanupLeveldb with BeforeAndAfte
object EventLogLifecycleLeveldb {
object TestEventLog {
def props(logId: String, batching: Boolean): Props = {
val logProps = Props(new EventLogLifecycleLeveldb.TestEventLog(logId)).withDispatcher("eventuate.log.leveldb.write-dispatcher")
if (batching) Props(new BatchingEventLog(logProps)) else logProps
val logProps = Props(new TestEventLog(logId)).withDispatcher("eventuate.log.leveldb.write-dispatcher")
if (batching) Props(new BatchingLayer(logProps)) else logProps
}
}

class TestEventLog(id: String) extends LeveldbEventLog(id, "log-test") {
override def currentSystemTime: Long =
0L
override def currentSystemTime: Long = 0L

private[eventuate] override def replayer(requestor: ActorRef, iterator: => Iterator[DurableEvent] with Closeable, from: Long): ActorRef =
if (from == -1L) super.replayer(requestor, EventLogLifecycle.failingEventIterator, from) else super.replayer(requestor, iterator, from)
override def eventIterator(parameters: LeveldbEventIteratorParameters): Iterator[DurableEvent] with Closeable =
if (parameters.fromSequenceNr == -1L) EventLogLifecycle.failingEventIterator else super.eventIterator(parameters)

private[eventuate] override def read(from: Long, max: Int, filter: ReplicationFilter, lower: VectorTime): ReadResult =
if (from == -1L) throw boom else super.read(from, max, filter, lower)
override def read(fromSequenceNr: Long, toSequenceNr: Long, max: Int, filter: (DurableEvent) => Boolean): Future[EventLog.ReadResult] =
if (fromSequenceNr == -1L) Future.failed(boom) else super.read(fromSequenceNr, toSequenceNr, max, filter)

private[eventuate] override def write(events: Seq[DurableEvent], tracker: TimeTracker, batch: WriteBatch): TimeTracker = events match {
case es if es.map(_.payload).contains("boom") => throw boom
case _ => super.write(events, tracker, batch)
}
override def write(events: Seq[DurableEvent], partition: Long, clock: Clock): Unit =
if (events.map(_.payload).contains("boom")) throw boom else super.write(events, partition, clock)

override def unhandled(message: Any): Unit = message match {
case "boom" =>
Expand Down Expand Up @@ -210,94 +201,64 @@ object EventLogLifecycleCassandra {

def props(logId: String, failureSpec: TestFailureSpec, indexProbe: Option[ActorRef], batching: Boolean): Props = {
val logProps = Props(new TestEventLog(logId, failureSpec, indexProbe)).withDispatcher("eventuate.log.cassandra.write-dispatcher")
if (batching) Props(new BatchingEventLog(logProps)) else logProps
if (batching) Props(new BatchingLayer(logProps)) else logProps
}
}

class TestEventLog(id: String, failureSpec: TestFailureSpec, indexProbe: Option[ActorRef]) extends CassandraEventLog(id) {
import context.dispatcher

private var index: ActorRef = _

override def currentSystemTime: Long =
0L
override def currentSystemTime: Long = 0L

override def unhandled(message: Any): Unit = message match {
case GetReplicationProgresses =>
val sdr = sender()
getReplicationProgressMap(List(EventLogSpec.remoteLogId, "x", "y")) onComplete {
case Success(r) => sdr ! GetReplicationProgressesSuccess(r)
case Failure(e) => sdr ! GetReplicationProgressesFailure(e)
}
case "boom" =>
throw boom
case _ =>
super.unhandled(message)
}

private[eventuate] override def replayer(requestor: ActorRef, iterator: => Iterator[DurableEvent] with Closeable, from: Long): ActorRef =
if (from == -1L) super.replayer(requestor, EventLogLifecycle.failingEventIterator, from) else super.replayer(requestor, iterator, from)

private[eventuate] override def write(partition: Long, events: Seq[DurableEvent], tracker: TimeTracker): TimeTracker = events match {
case es if es.map(_.payload).contains("boom") => throw boom
case _ => super.write(partition, events, tracker)
}

private[eventuate] override def createEventReader(cassandra: Cassandra, logId: String) =
new TestEventReader(cassandra, logId)

private[eventuate] override def createIndex(cassandra: Cassandra, eventReader: CassandraEventReader, logId: String): ActorRef = {
index = context.actorOf(Props(new TestIndex(cassandra, eventReader, logId, failureSpec, indexProbe)))
index
}

private def getReplicationProgressMap(sourceLogIds: Seq[String]): Future[Map[String, Long]] = {
implicit val timeout = Timeout(10.seconds)
override def eventIterator(parameters: CassandraEventIteratorParameters): Iterator[DurableEvent] with Closeable =
if (parameters.fromSequenceNr == -1L) EventLogLifecycle.failingEventIterator else super.eventIterator(parameters)

Future.sequence(sourceLogIds.map(sourceLogId => index.ask(GetReplicationProgress(sourceLogId)).mapTo[GetReplicationProgressSuccess])).map { results =>
results.foldLeft[Map[String, Long]](Map.empty) {
case (acc, GetReplicationProgressSuccess(logId, snr, _)) => if (snr == 0L) acc else acc + (logId -> snr)
}
}
}
}
override def write(events: Seq[DurableEvent], partition: Long, clock: Clock): Unit =
if (events.map(_.payload).contains("boom")) throw boom else super.write(events, partition, clock)

class TestEventReader(cassandra: Cassandra, logId: String) extends CassandraEventReader(cassandra, logId) {
override def read(from: Long, to: Long, max: Int, filter: ReplicationFilter, lower: VectorTime, targetLogId: String): CassandraEventReader.ReadResult =
if (from == -1L) throw boom else super.read(from, to: Long, max, filter, lower, targetLogId)
}

class TestIndex(cassandra: Cassandra, eventReader: CassandraEventReader, logId: String, failureSpec: TestFailureSpec, indexProbe: Option[ActorRef]) extends CassandraIndex(cassandra, eventReader, logId) {
val stream = context.system.eventStream
private[eventuate] override def createEventLogStore(cassandra: Cassandra, logId: String) =
new TestEventLogStore(cassandra, logId)

private[eventuate] override def createIndexStore(cassandra: Cassandra, logId: String) =
new TestIndexStore(cassandra, logId, failureSpec)

override def onIndexEvent(event: Any): Unit =
private[eventuate] override def onIndexEvent(event: Any): Unit =
indexProbe.foreach(_ ! event)
}

class TestEventLogStore(cassandra: Cassandra, logId: String) extends CassandraEventLogStore(cassandra, logId) {
override def read(from: Long, to: Long, max: Int, filter: DurableEvent => Boolean): ReadResult =
if (from == -1L) throw boom else super.read(from, to: Long, max, filter)
}

class TestIndexStore(cassandra: Cassandra, logId: String, failureSpec: TestFailureSpec) extends CassandraIndexStore(cassandra, logId) {
private var writeIndexIncrementFailed = false
private var readSequenceNumberFailed = false

private[eventuate] override def writeAsync(aggregateEvents: AggregateEvents, timeTracker: TimeTracker)(implicit executor: ExecutionContext): Future[TimeTracker] =
private[eventuate] override def writeAsync(aggregateEvents: AggregateEvents, clock: Clock)(implicit executor: ExecutionContext): Future[Clock] =
if (failureSpec.failBeforeIndexIncrementWrite && !writeIndexIncrementFailed) {
writeIndexIncrementFailed = true
Future.failed(boom)
} else if (failureSpec.failAfterIndexIncrementWrite && !writeIndexIncrementFailed) {
writeIndexIncrementFailed = true
for {
_ <- super.writeAsync(aggregateEvents, timeTracker)
_ <- super.writeAsync(aggregateEvents, clock)
r <- Future.failed(boom)
} yield r
} else super.writeAsync(aggregateEvents, timeTracker)
} else super.writeAsync(aggregateEvents, clock)

private[eventuate] override def readTimeTrackerAsync: Future[TimeTracker] =
private[eventuate] override def readClockAsync: Future[Clock] =
if (failureSpec.failOnSequenceNrRead && !readSequenceNumberFailed) {
readSequenceNumberFailed = true
Future.failed(boom)
} else super.readTimeTrackerAsync
} else super.readClockAsync
}
}

Expand Down
Loading

0 comments on commit ed65959

Please sign in to comment.