Skip to content
This repository has been archived by the owner on Jun 1, 2021. It is now read-only.

Commit

Permalink
Merge pull request #310 from RBMHTechnology/wip-290-vertx-adapter
Browse files Browse the repository at this point in the history
Vert.x Adapter
  • Loading branch information
krasserm authored Oct 7, 2016
2 parents 8d35ec3 + 0bb2786 commit 97a8ab7
Show file tree
Hide file tree
Showing 54 changed files with 4,355 additions and 15 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ script:
- .travis/test-leveldb.sh ++$TRAVIS_SCALA_VERSION
- .travis/test-crdt.sh ++$TRAVIS_SCALA_VERSION
- .travis/test-spark.sh ++$TRAVIS_SCALA_VERSION
- .travis/test-vertx.sh ++$TRAVIS_SCALA_VERSION
- find $HOME/.sbt -name "*.lock" | xargs rm
- find $HOME/.ivy2 -name "ivydata-*.properties" | xargs rm
after_success:
Expand Down
3 changes: 3 additions & 0 deletions .travis/test-vertx.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#!/bin/sh

sbt $1 "adapterVertx/it:test"
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
import sbt._import sbt.Keys._import sbtunidoc.Plugin.UnidocKeys._import MultiJvmKeys._import ProjectSettings._import ProjectDependencies._version in ThisBuild := "0.8-SNAPSHOT"organization in ThisBuild := "com.rbmhtechnology"scalaVersion in ThisBuild := "2.11.7"lazy val root = (project in file(".")) .aggregate(core, crdt, logCassandra, logLeveldb, adapterSpark, examples, exampleSpark) .dependsOn(core, logCassandra, logLeveldb) .settings(name := "eventuate") .settings(commonSettings: _*) .settings(documentationSettings: _*) .settings(unidocProjectFilter in (ScalaUnidoc, unidoc) := inAnyProject -- inProjects(examples)) .settings(libraryDependencies ++= Seq(AkkaRemote)) .enablePlugins(HeaderPlugin, AutomateHeaderPlugin)lazy val core = (project in file("eventuate-core")) .settings(name := "eventuate-core") .settings(commonSettings: _*) .settings(protocSettings: _*) .settings(integrationTestSettings: _*) .settings(libraryDependencies ++= Seq(AkkaRemote, CommonsIo, Java8Compat, Scalaz)) .settings(libraryDependencies ++= Seq(AkkaTestkit % "test,it", AkkaTestkitMultiNode % "test", Javaslang % "test", JunitInterface % "test", Scalatest % "test,it")) .configs(IntegrationTest, MultiJvm) .enablePlugins(HeaderPlugin, AutomateHeaderPlugin)lazy val logCassandra = (project in file("eventuate-log-cassandra")) .dependsOn(core % "compile->compile;it->it;multi-jvm->multi-jvm") .settings(name := "eventuate-log-cassandra") .settings(commonSettings: _*) .settings(integrationTestSettings: _*) .settings(libraryDependencies ++= Seq(AkkaRemote, CassandraDriver)) .settings(libraryDependencies ++= Seq(AkkaTestkit % "test,it", AkkaTestkitMultiNode % "test", Log4jApi % "test,it", Log4jCore % "test,it", Log4jSlf4j % "test,it", Scalatest % "test,it", Sigar % "test,it")) .settings(libraryDependencies ++= Seq(CassandraUnit % "test,it" excludeAll ExclusionRule(organization = "ch.qos.logback"))) .settings(jvmOptions in MultiJvm += "-Dmultinode.server-port=4712") .configs(IntegrationTest, MultiJvm) .enablePlugins(HeaderPlugin, AutomateHeaderPlugin)lazy val logLeveldb = (project in file("eventuate-log-leveldb")) .dependsOn(core % "compile->compile;it->it;multi-jvm->multi-jvm") .settings(name := "eventuate-log-leveldb") .settings(commonSettings: _*) .settings(integrationTestSettings: _*) .settings(libraryDependencies ++= Seq(AkkaRemote, Leveldb)) .settings(libraryDependencies ++= Seq(AkkaTestkit % "test,it", AkkaTestkitMultiNode % "test", Scalatest % "test,it")) .settings(jvmOptions in MultiJvm += "-Dmultinode.server-port=4713") .configs(IntegrationTest, MultiJvm) .enablePlugins(HeaderPlugin, AutomateHeaderPlugin)lazy val adapterSpark = (project in file("eventuate-adapter-spark")) .dependsOn(logCassandra % "compile->compile;it->it;multi-jvm->multi-jvm") .dependsOn(logLeveldb % "compile->compile;it->it;multi-jvm->multi-jvm") .settings(name := "eventuate-adapter-spark") .settings(commonSettings: _*) .settings(integrationTestSettings: _*) .settings(libraryDependencies ++= Seq(AkkaRemote, CassandraClientUtil, CassandraConnector, SparkCore % "provided" exclude("org.slf4j", "slf4j-log4j12"), SparkSql % "provided" exclude("org.slf4j", "slf4j-log4j12"), SparkStreaming % "provided" exclude("org.slf4j", "slf4j-log4j12"))) .settings(libraryDependencies ++= Seq(AkkaTestkit % "test,it", AkkaTestkitMultiNode % "test", Scalatest % "test,it", Sigar % "test,it")) .settings(libraryDependencies ++= Seq(CassandraUnit % "test,it" excludeAll ExclusionRule(organization = "ch.qos.logback"))) .settings(jvmOptions in MultiJvm += "-Dmultinode.server-port=4714") .configs(IntegrationTest, MultiJvm) .enablePlugins(HeaderPlugin, AutomateHeaderPlugin)lazy val crdt = (project in file("eventuate-crdt")) .dependsOn(core % "compile->compile;it->it;multi-jvm->multi-jvm") .dependsOn(logLeveldb % "test;it->it;multi-jvm->multi-jvm") .settings(name := "eventuate-crdt") .settings(commonSettings: _*) .settings(protocSettings: _*) .settings(integrationTestSettings: _*) .settings(libraryDependencies ++= Seq(AkkaRemote)) .settings(libraryDependencies ++= Seq(AkkaTestkit % "test,it", AkkaTestkitMultiNode % "test", Scalatest % "test,it")) .settings(jvmOptions in MultiJvm += "-Dmultinode.server-port=4715") .configs(IntegrationTest, MultiJvm) .enablePlugins(HeaderPlugin, AutomateHeaderPlugin)lazy val examples = (project in file("eventuate-examples")) .dependsOn(core, logLeveldb) .settings(name := "eventuate-examples") .settings(commonSettings: _*) .settings(exampleSettings: _*) .settings(libraryDependencies ++= Seq(AkkaRemote, CassandraDriver, Javaslang, Log4jApi, Log4jCore, Log4jSlf4j)) .enablePlugins(HeaderPlugin, AutomateHeaderPlugin)lazy val exampleSpark = (project in file("eventuate-example-spark")) .dependsOn(core, logCassandra, adapterSpark) .settings(name := "eventuate-example-spark") .settings(commonSettings: _*) .settings(exampleSettings: _*) .settings(libraryDependencies ++= Seq(AkkaRemote, CassandraDriver, Log4jApi, Log4jCore, Log4jSlf4j, SparkCore exclude("org.slf4j", "slf4j-log4j12"), SparkSql exclude("org.slf4j", "slf4j-log4j12"), SparkStreaming exclude("org.slf4j", "slf4j-log4j12"))) .enablePlugins(HeaderPlugin, AutomateHeaderPlugin)
import sbt._import sbt.Keys._import sbtunidoc.Plugin.UnidocKeys._import MultiJvmKeys._import ProjectSettings._import ProjectDependencies._version in ThisBuild := "0.8-SNAPSHOT"organization in ThisBuild := "com.rbmhtechnology"scalaVersion in ThisBuild := "2.11.7"lazy val root = (project in file(".")) .aggregate(core, crdt, logCassandra, logLeveldb, adapterSpark, adapterVertx, examples, exampleSpark, exampleVertx) .dependsOn(core, logCassandra, logLeveldb) .settings(name := "eventuate") .settings(commonSettings: _*) .settings(documentationSettings: _*) .settings(unidocProjectFilter in (ScalaUnidoc, unidoc) := inAnyProject -- inProjects(examples)) .settings(libraryDependencies ++= Seq(AkkaRemote)) .enablePlugins(HeaderPlugin, AutomateHeaderPlugin)lazy val core = (project in file("eventuate-core")) .settings(name := "eventuate-core") .settings(commonSettings: _*) .settings(protocSettings: _*) .settings(integrationTestSettings: _*) .settings(libraryDependencies ++= Seq(AkkaRemote, CommonsIo, Java8Compat, Scalaz)) .settings(libraryDependencies ++= Seq(AkkaTestkit % "test,it", AkkaTestkitMultiNode % "test", Javaslang % "test", JunitInterface % "test", Scalatest % "test,it")) .configs(IntegrationTest, MultiJvm) .enablePlugins(HeaderPlugin, AutomateHeaderPlugin)lazy val logCassandra = (project in file("eventuate-log-cassandra")) .dependsOn(core % "compile->compile;it->it;multi-jvm->multi-jvm") .settings(name := "eventuate-log-cassandra") .settings(commonSettings: _*) .settings(integrationTestSettings: _*) .settings(libraryDependencies ++= Seq(AkkaRemote, CassandraDriver)) .settings(libraryDependencies ++= Seq(AkkaTestkit % "test,it", AkkaTestkitMultiNode % "test", Log4jApi % "test,it", Log4jCore % "test,it", Log4jSlf4j % "test,it", Scalatest % "test,it", Sigar % "test,it")) .settings(libraryDependencies ++= Seq(CassandraUnit % "test,it" excludeAll ExclusionRule(organization = "ch.qos.logback"))) .settings(jvmOptions in MultiJvm += "-Dmultinode.server-port=4712") .configs(IntegrationTest, MultiJvm) .enablePlugins(HeaderPlugin, AutomateHeaderPlugin)lazy val logLeveldb = (project in file("eventuate-log-leveldb")) .dependsOn(core % "compile->compile;it->it;multi-jvm->multi-jvm") .settings(name := "eventuate-log-leveldb") .settings(commonSettings: _*) .settings(integrationTestSettings: _*) .settings(libraryDependencies ++= Seq(AkkaRemote, Leveldb)) .settings(libraryDependencies ++= Seq(AkkaTestkit % "test,it", AkkaTestkitMultiNode % "test", Scalatest % "test,it")) .settings(jvmOptions in MultiJvm += "-Dmultinode.server-port=4713") .configs(IntegrationTest, MultiJvm) .enablePlugins(HeaderPlugin, AutomateHeaderPlugin)lazy val adapterSpark = (project in file("eventuate-adapter-spark")) .dependsOn(logCassandra % "compile->compile;it->it;multi-jvm->multi-jvm") .dependsOn(logLeveldb % "compile->compile;it->it;multi-jvm->multi-jvm") .settings(name := "eventuate-adapter-spark") .settings(commonSettings: _*) .settings(integrationTestSettings: _*) .settings(libraryDependencies ++= Seq(AkkaRemote, CassandraClientUtil, CassandraConnector, SparkCore % "provided" exclude("org.slf4j", "slf4j-log4j12"), SparkSql % "provided" exclude("org.slf4j", "slf4j-log4j12"), SparkStreaming % "provided" exclude("org.slf4j", "slf4j-log4j12"))) .settings(libraryDependencies ++= Seq(AkkaTestkit % "test,it", AkkaTestkitMultiNode % "test", Scalatest % "test,it", Sigar % "test,it")) .settings(libraryDependencies ++= Seq(CassandraUnit % "test,it" excludeAll ExclusionRule(organization = "ch.qos.logback"))) .settings(jvmOptions in MultiJvm += "-Dmultinode.server-port=4714") .configs(IntegrationTest, MultiJvm) .enablePlugins(HeaderPlugin, AutomateHeaderPlugin)lazy val adapterVertx = (project in file("eventuate-adapter-vertx")) .dependsOn(core % "compile->compile;it->it") .dependsOn(logLeveldb % "it->it") .settings(name := "eventuate-adapter-vertx") .settings(commonSettings: _*) .settings(integrationTestSettings: _*) .settings(libraryDependencies ++= Seq(AkkaRemote, VertxCore % "provided", VertxRxJava % "provided")) .settings(libraryDependencies ++= Seq(AkkaTestkit % "test,it", Scalatest % "test,it")) .configs(IntegrationTest) .enablePlugins(HeaderPlugin, AutomateHeaderPlugin)lazy val crdt = (project in file("eventuate-crdt")) .dependsOn(core % "compile->compile;it->it;multi-jvm->multi-jvm") .dependsOn(logLeveldb % "test;it->it;multi-jvm->multi-jvm") .settings(name := "eventuate-crdt") .settings(commonSettings: _*) .settings(protocSettings: _*) .settings(integrationTestSettings: _*) .settings(libraryDependencies ++= Seq(AkkaRemote)) .settings(libraryDependencies ++= Seq(AkkaTestkit % "test,it", AkkaTestkitMultiNode % "test", Scalatest % "test,it")) .settings(jvmOptions in MultiJvm += "-Dmultinode.server-port=4715") .configs(IntegrationTest, MultiJvm) .enablePlugins(HeaderPlugin, AutomateHeaderPlugin)lazy val examples = (project in file("eventuate-examples")) .dependsOn(core, logLeveldb) .settings(name := "eventuate-examples") .settings(commonSettings: _*) .settings(exampleSettings: _*) .settings(libraryDependencies ++= Seq(AkkaRemote, CassandraDriver, Javaslang, Log4jApi, Log4jCore, Log4jSlf4j)) .enablePlugins(HeaderPlugin, AutomateHeaderPlugin)lazy val exampleSpark = (project in file("eventuate-example-spark")) .dependsOn(core, logCassandra, adapterSpark) .settings(name := "eventuate-example-spark") .settings(commonSettings: _*) .settings(exampleSettings: _*) .settings(libraryDependencies ++= Seq(AkkaRemote, CassandraDriver, Log4jApi, Log4jCore, Log4jSlf4j, SparkCore exclude("org.slf4j", "slf4j-log4j12"), SparkSql exclude("org.slf4j", "slf4j-log4j12"), SparkStreaming exclude("org.slf4j", "slf4j-log4j12"))) .enablePlugins(HeaderPlugin, AutomateHeaderPlugin)lazy val exampleVertx = (project in file("eventuate-example-vertx")) .dependsOn(core, logLeveldb, adapterVertx) .settings(name := "eventuate-example-vertx") .settings(commonSettings: _*) .settings(exampleSettings: _*) .settings(libraryDependencies ++= Seq(AkkaRemote, Leveldb, Javaslang, Log4jApi, Log4jCore, Log4jSlf4j, ExampleVertxCore, ExampleVertxRxJava)) .enablePlugins(HeaderPlugin, AutomateHeaderPlugin)
Expand Down
5 changes: 5 additions & 0 deletions eventuate-adapter-vertx/src/it/resources/application.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
akka.loglevel = "ERROR"
akka.test.single-expect-default = 20s

eventuate.log.leveldb.dir = target/test-log
eventuate.snapshot.filesystem.dir = target/test-snapshot
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2015 - 2016 Red Bull Media House GmbH <http://www.redbullmediahouse.com> - all rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.rbmhtechnology.eventuate.adapter.vertx

import akka.pattern.ask
import akka.testkit.{TestKit, TestProbe}
import akka.util.Timeout
import com.rbmhtechnology.eventuate.adapter.vertx.api.StorageProvider
import org.scalatest.{BeforeAndAfterEach, Suite}

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._

trait ActorStorage extends BeforeAndAfterEach {
this: TestKit with Suite =>

var storageProbe: TestProbe = _

override def beforeEach(): Unit = {
super.beforeEach()

storageProbe = TestProbe()
}

def actorStorageProvider(): StorageProvider = new StorageProvider {
implicit val timeout = Timeout(20.seconds)

override def readProgress(logName: String)(implicit executionContext: ExecutionContext): Future[Long] =
storageProbe.ref.ask(read(logName)).mapTo[Long]

override def writeProgress(logName: String, sequenceNr: Long)(implicit executionContext: ExecutionContext): Future[Long] =
storageProbe.ref.ask(write(logName)(sequenceNr)).mapTo[Long]
}

def read(logName: String): String =
s"read[$logName]"

def write(logName: String)(progress: Long): String =
s"write[$logName]-$progress"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* Copyright 2015 - 2016 Red Bull Media House GmbH <http://www.redbullmediahouse.com> - all rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.rbmhtechnology.eventuate.adapter.vertx

import akka.actor.ActorSystem
import akka.serialization.{Serializer, SerializerWithStringManifest}
import akka.testkit.TestKit
import com.typesafe.config.ConfigFactory
import io.vertx.core.buffer.Buffer
import io.vertx.core.eventbus.MessageCodec
import org.scalatest.{BeforeAndAfterEach, MustMatchers, WordSpecLike}

object AkkaSerializationMessageCodecSpec {

implicit class MessageCodecExtension[A, B](c: MessageCodec[A, B]) {
def encodeAndDecode(o: A, b: Buffer = Buffer.buffer(), pos: Int = 0): B = {
c.encodeToWire(b, o)
c.decodeFromWire(pos, b)
}
}

case class TypeWithDefaultSerialization(payload: String)

class TypeWithCustomSerializer(val payload: String) {
def canEqual(other: Any): Boolean = other.isInstanceOf[TypeWithCustomSerializer]

override def equals(other: Any): Boolean = other match {
case that: TypeWithCustomSerializer =>
(that canEqual this) &&
payload == that.payload
case _ => false
}
}

class TypeWithCustomStringManifestSerializer(val payload: String) {
def canEqual(other: Any): Boolean = other.isInstanceOf[TypeWithCustomStringManifestSerializer]

override def equals(other: Any): Boolean = other match {
case that: TypeWithCustomStringManifestSerializer =>
(that canEqual this) &&
payload == that.payload
case _ => false
}
}

class CustomSerializer extends Serializer {
override def identifier: Int = 11223344

override def includeManifest: Boolean =
true

override def toBinary(o: AnyRef): Array[Byte] = o match {
case c: TypeWithCustomSerializer => c.payload.getBytes
case _ => Array.empty
}

override def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = manifest match {
case Some(c) if c == classOf[TypeWithCustomSerializer] => new TypeWithCustomSerializer(new String(bytes))
case _ => "invalid type "
}
}

class CustomStringManifestSerializer extends SerializerWithStringManifest {
override def identifier: Int = 22334455

override def manifest(o: AnyRef): String = o match {
case c: TypeWithCustomStringManifestSerializer => "custom-type"
case _ => "unknown"
}

override def toBinary(o: AnyRef): Array[Byte] = o match {
case c: TypeWithCustomStringManifestSerializer => c.payload.getBytes
case _ => Array.empty
}

override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match {
case "custom-type" => new TypeWithCustomStringManifestSerializer(new String(bytes))
case _ => "invalid type "
}
}

val Config = ConfigFactory.parseString(
"""
| akka.actor {
| serializers {
| custom-simple = "com.rbmhtechnology.eventuate.adapter.vertx.AkkaSerializationMessageCodecSpec$CustomSerializer"
| custom-stringManifest = "com.rbmhtechnology.eventuate.adapter.vertx.AkkaSerializationMessageCodecSpec$CustomStringManifestSerializer"
| }
| serialization-bindings {
| "com.rbmhtechnology.eventuate.adapter.vertx.AkkaSerializationMessageCodecSpec$TypeWithCustomSerializer" = custom-simple
| "com.rbmhtechnology.eventuate.adapter.vertx.AkkaSerializationMessageCodecSpec$TypeWithCustomStringManifestSerializer" = custom-stringManifest
| }
| }
""".stripMargin)
}

class AkkaSerializationMessageCodecSpec extends TestKit(ActorSystem("test", AkkaSerializationMessageCodecSpec.Config))
with WordSpecLike with MustMatchers with BeforeAndAfterEach with StopSystemAfterAll {

import AkkaSerializationMessageCodecSpec._

var codec: MessageCodec[AnyRef, AnyRef] = _

override def beforeEach(): Unit = {
codec = AkkaSerializationMessageCodec("test-codec")
}

"An AkkaSerializationMessageCodec" must {
"encode and decode a simple data type" in {
codec.encodeAndDecode("test-value") mustBe "test-value"
}
"encode and decode a data type supported by Akka default serialization" in {
val o = TypeWithDefaultSerialization("content")

codec.encodeAndDecode(o) mustBe o
}
"encode and decode a data type configured with a custom serializer" in {
val o = new TypeWithCustomSerializer("content")

codec.encodeAndDecode(o) mustBe o
}
"encode and decode a data type configured with a custom string-manifest serializer" in {
val o = new TypeWithCustomStringManifestSerializer("content")

codec.encodeAndDecode(o) mustBe o
}
"encode and decode from the correct position in the underlying buffer" in {
val initial = "initial".getBytes()
val b = Buffer.buffer(initial)

codec.encodeAndDecode("test-value", b, initial.length) mustBe "test-value"
}
}
}
Loading

0 comments on commit 97a8ab7

Please sign in to comment.