Skip to content
This repository has been archived by the owner on Jun 1, 2021. It is now read-only.

Commit

Permalink
Merge pull request #212 from RBMHTechnology/wip-207-support-string-ma…
Browse files Browse the repository at this point in the history
…nifest

Support custom serializers with string-manifest
  • Loading branch information
krasserm committed Feb 11, 2016
2 parents 5930c24 + 893e150 commit 56cf325
Show file tree
Hide file tree
Showing 16 changed files with 275 additions and 189 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@
package com.rbmhtechnology.eventuate.serializer

import akka.serialization.SerializationExtension
import akka.testkit.TestProbe

import com.rbmhtechnology.eventuate._
import com.rbmhtechnology.eventuate.crdt._
import com.rbmhtechnology.eventuate.serializer.DurableEventSerializerSpec.ExamplePayload
import com.rbmhtechnology.eventuate.serializer.DurableEventSerializerSpec.serializerConfig
import com.rbmhtechnology.eventuate.serializer.DurableEventSerializerSpec.serializerWithStringManifestConfig

import org.scalatest._

Expand All @@ -40,19 +41,10 @@ class CRDTSerializerSpec extends WordSpec with Matchers with BeforeAndAfterAll {
import DurableEventSerializerSpec.ExamplePayload
import CRDTSerializerSpec._

val config =
"""
|akka.actor.serializers {
| eventuate-test = "com.rbmhtechnology.eventuate.serializer.DurableEventSerializerSpec$ExamplePayloadSerializer"
|}
|akka.actor.serialization-bindings {
| "com.rbmhtechnology.eventuate.serializer.DurableEventSerializerSpec$ExamplePayload" = eventuate-test
|}
""".stripMargin

val support = new SerializerSpecSupport(
ReplicationConfig.create(2552),
ReplicationConfig.create(2553, config))
ReplicationConfig.create(2553, serializerConfig),
ReplicationConfig.create(2554, serializerWithStringManifestConfig))

override def afterAll(): Unit =
support.shutdown()
Expand All @@ -61,54 +53,38 @@ class CRDTSerializerSpec extends WordSpec with Matchers with BeforeAndAfterAll {

"A CRDTSerializer" must {
"support ORSet serialization with default payload serialization" in {
val probe = new TestProbe(system1)
val serialization = SerializationExtension(system1)
val serialization = SerializationExtension(systems(0))

val initial = orSet(ExamplePayload("foo", "bar"))
val expected = initial

serialization.deserialize(serialization.serialize(initial).get, classOf[ORSet[_]]).get should be(expected)
}
"support ORSet serialization with custom payload serialization" in {
val probe = new TestProbe(system2)
val serialization = SerializationExtension(system2)

"support ORSet serialization with custom payload serialization" in serializations.tail.foreach { serialization =>
val initial = orSet(ExamplePayload("foo", "bar"))
val expected = orSet(ExamplePayload("bar", "foo"))

serialization.deserialize(serialization.serialize(initial).get, classOf[ORSet[_]]).get should be(expected)
}
"support MVRegister serialization with default payload serialization" in {
val probe = new TestProbe(system1)
val serialization = SerializationExtension(system1)

val initial = mvRegister(ExamplePayload("foo", "bar"))
val expected = initial

serialization.deserialize(serialization.serialize(initial).get, classOf[MVRegister[_]]).get should be(expected)
serializations(0).deserialize(serializations(0).serialize(initial).get, classOf[MVRegister[_]]).get should be(expected)
}
"support MVRegister serialization with custom payload serialization" in {
val probe = new TestProbe(system2)
val serialization = SerializationExtension(system2)

"support MVRegister serialization with custom payload serialization" in serializations.tail.foreach { serialization =>
val initial = mvRegister(ExamplePayload("foo", "bar"))
val expected = mvRegister(ExamplePayload("bar", "foo"))

serialization.deserialize(serialization.serialize(initial).get, classOf[MVRegister[_]]).get should be(expected)
}
"support LWWRegister serialization with default payload serialization" in {
val probe = new TestProbe(system1)
val serialization = SerializationExtension(system1)

val initial = lwwRegister(ExamplePayload("foo", "bar"))
val expected = initial

serialization.deserialize(serialization.serialize(initial).get, classOf[LWWRegister[_]]).get should be(expected)
serializations(0).deserialize(serializations(0).serialize(initial).get, classOf[LWWRegister[_]]).get should be(expected)
}
"support LWWRegister serialization with custom payload serialization" in {
val probe = new TestProbe(system2)
val serialization = SerializationExtension(system2)

"support LWWRegister serialization with custom payload serialization" in serializations.tail.foreach { serialization =>
val initial = lwwRegister(ExamplePayload("foo", "bar"))
val expected = lwwRegister(ExamplePayload("bar", "foo"))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
package com.rbmhtechnology.eventuate.serializer

import akka.actor._
import akka.serialization.{SerializationExtension, Serializer}
import akka.serialization.SerializerWithStringManifest
import akka.serialization.Serializer

import com.rbmhtechnology.eventuate._

Expand All @@ -26,23 +27,59 @@ import org.scalatest._
object DurableEventSerializerSpec {
case class ExamplePayload(foo: String, bar: String)

val serializerConfig =
"""
|akka.actor.serializers {
| eventuate-test = "com.rbmhtechnology.eventuate.serializer.DurableEventSerializerSpec$ExamplePayloadSerializer"
|}
|akka.actor.serialization-bindings {
| "com.rbmhtechnology.eventuate.serializer.DurableEventSerializerSpec$ExamplePayload" = eventuate-test
|}
""".stripMargin

val serializerWithStringManifestConfig =
"""
|akka.actor.serializers {
| eventuate-test = "com.rbmhtechnology.eventuate.serializer.DurableEventSerializerSpec$ExamplePayloadSerializerWithStringManifest"
|}
|akka.actor.serialization-bindings {
| "com.rbmhtechnology.eventuate.serializer.DurableEventSerializerSpec$ExamplePayload" = eventuate-test
|}
""".stripMargin
/**
* Swaps `foo` and `bar` of `ExamplePayload`.
*/
class ExamplePayloadSerializer(system: ExtendedActorSystem) extends Serializer {
trait SwappingExamplePayloadSerializer {
def toBinary(o: AnyRef): Array[Byte] = o match {
case ExamplePayload(foo, bar) => s"${foo}-${bar}".getBytes("UTF-8")
}

def _fromBinary(bytes: Array[Byte]): AnyRef = {
val s = new String(bytes, "UTF-8").split("-")
ExamplePayload(s(1), s(0))
}
}

class ExamplePayloadSerializer(system: ExtendedActorSystem) extends Serializer with SwappingExamplePayloadSerializer {
val ExamplePayloadClass = classOf[ExamplePayload]

override def identifier: Int = 44085
override def includeManifest: Boolean = true

override def toBinary(o: AnyRef): Array[Byte] = o match {
case ExamplePayload(foo, bar) => s"${foo}-${bar}".getBytes("UTF-8")
override def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = manifest.get match {
case ExamplePayloadClass => _fromBinary(bytes)
}
}

override def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = manifest.get match {
case ExamplePayloadClass =>
val s = new String(bytes, "UTF-8").split("-")
ExamplePayload(s(1), s(0))
class ExamplePayloadSerializerWithStringManifest(system: ExtendedActorSystem) extends SerializerWithStringManifest with SwappingExamplePayloadSerializer {
val StringManifest = "manifest"

override def identifier: Int = 44084

override def manifest(o: AnyRef) = StringManifest

override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match {
case StringManifest => _fromBinary(bytes)
}
}

Expand All @@ -62,19 +99,10 @@ object DurableEventSerializerSpec {
class DurableEventSerializerSpec extends WordSpec with Matchers with BeforeAndAfterAll {
import DurableEventSerializerSpec._

val config =
"""
|akka.actor.serializers {
| eventuate-test = "com.rbmhtechnology.eventuate.serializer.DurableEventSerializerSpec$ExamplePayloadSerializer"
|}
|akka.actor.serialization-bindings {
| "com.rbmhtechnology.eventuate.serializer.DurableEventSerializerSpec$ExamplePayload" = eventuate-test
|}
""".stripMargin

val support = new SerializerSpecSupport(
ReplicationConfig.create(2552),
ReplicationConfig.create(2553, config))
ReplicationConfig.create(2553, serializerConfig),
ReplicationConfig.create(2554, serializerWithStringManifestConfig))

override def afterAll(): Unit =
support.shutdown()
Expand All @@ -83,13 +111,11 @@ class DurableEventSerializerSpec extends WordSpec with Matchers with BeforeAndAf

"A DurableEventSerializer" must {
"support default payload serialization" in {
val serialization = SerializationExtension(system1)
val expected = event

serialization.deserialize(serialization.serialize(event).get, classOf[DurableEvent]).get should be(expected)
serializations(0).deserialize(serializations(0).serialize(event).get, classOf[DurableEvent]).get should be(expected)
}
"support custom payload serialization" in {
val serialization = SerializationExtension(system2)
"support custom payload serialization" in serializations.tail.foreach { serialization =>
val expected = event.copy(ExamplePayload("bar", "foo"))

serialization.deserialize(serialization.serialize(event).get, classOf[DurableEvent]).get should be(expected)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,19 @@

package com.rbmhtechnology.eventuate.serializer

import akka.actor.Props
import akka.actor._
import akka.serialization.Serializer
import akka.serialization.SerializerWithStringManifest
import akka.testkit.TestProbe

import com.rbmhtechnology.eventuate.ReplicationFilter.AndFilter
import com.rbmhtechnology.eventuate.ReplicationFilter.NoFilter
import com.rbmhtechnology.eventuate.ReplicationFilter.OrFilter

import com.rbmhtechnology.eventuate._
import com.rbmhtechnology.eventuate.serializer.SerializerSpecSupport.ReceiverActor
import com.rbmhtechnology.eventuate.serializer.SerializerSpecSupport.SenderActor

import org.scalatest._

Expand All @@ -36,21 +41,59 @@ object ReplicationFilterSerializerSpec {
def apply(event: DurableEvent): Boolean = num == 1
}

val serializerConfig =
"""
|akka.actor.serializers {
| eventuate-test = "com.rbmhtechnology.eventuate.serializer.ReplicationFilterSerializerSpec$ExampleFilterSerializer"
|}
|akka.actor.serialization-bindings {
| "com.rbmhtechnology.eventuate.serializer.ReplicationFilterSerializerSpec$ExampleFilter" = eventuate-test
|}
""".stripMargin

val serializerWithStringManifestConfig =
"""
|akka.actor.serializers {
| eventuate-test = "com.rbmhtechnology.eventuate.serializer.ReplicationFilterSerializerSpec$ExampleFilterSerializerWithStringManifest"
|}
|akka.actor.serialization-bindings {
| "com.rbmhtechnology.eventuate.serializer.ReplicationFilterSerializerSpec$ExampleFilter" = eventuate-test
|}
""".stripMargin

/**
* Increments `ExampleFilter.num` by 1 during deserialization.
*/
class ExampleFilterSerializer(system: ExtendedActorSystem) extends Serializer {
trait IncrementingExampleFilterSerializer {
def toBinary(o: AnyRef): Array[Byte] = o match {
case ExampleFilter(num) => num.toString.getBytes("UTF-8")
}

def _fromBinary(bytes: Array[Byte]): AnyRef =
ExampleFilter(new String(bytes, "UTF-8").toInt + 1)
}

class ExampleFilterSerializer(system: ExtendedActorSystem) extends Serializer with IncrementingExampleFilterSerializer {
val ExampleFilterClass = classOf[ExampleFilter]

override def identifier: Int = 44086
override def includeManifest: Boolean = true

override def toBinary(o: AnyRef): Array[Byte] = o match {
case ExampleFilter(num) => num.toString.getBytes("UTF-8")
override def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = manifest.get match {
case ExampleFilterClass => _fromBinary(bytes)
}
}

override def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = manifest.get match {
case ExampleFilterClass => ExampleFilter(new String(bytes, "UTF-8").toInt + 1)
val StringManifest = "manifest"

class ExampleFilterSerializerWithStringManifest(system: ExtendedActorSystem) extends SerializerWithStringManifest with IncrementingExampleFilterSerializer {

override def identifier: Int = 44087

override def manifest(o: AnyRef) = StringManifest

override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match {
case StringManifest => _fromBinary(bytes)
}
}

Expand Down Expand Up @@ -80,44 +123,39 @@ object ReplicationFilterSerializerSpec {
class ReplicationFilterSerializerSpec extends WordSpec with Matchers with BeforeAndAfterAll {
import ReplicationFilterSerializerSpec._

val config =
"""
|akka.actor.serializers {
| eventuate-test = "com.rbmhtechnology.eventuate.serializer.ReplicationFilterSerializerSpec$ExampleFilterSerializer"
|}
|akka.actor.serialization-bindings {
| "com.rbmhtechnology.eventuate.serializer.ReplicationFilterSerializerSpec$ExampleFilter" = eventuate-test
|}
""".stripMargin

val support = new SerializerSpecSupport(
ReplicationConfig.create(2552),
ReplicationConfig.create(2553, config))
ReplicationConfig.create(2553, serializerConfig),
ReplicationConfig.create(2554, serializerWithStringManifestConfig))

override def afterAll(): Unit =
support.shutdown()

import support._

val receiverProbe = new TestProbe(systems(1))
val receiverActor = systems(1).actorOf(Props(new ReceiverActor(receiverProbe.ref)), "receiver")
val senderActor = systems(0).actorOf(Props(new SenderActor(systems(0).actorSelection("akka.tcp://[email protected]:2553/user/receiver"))))

"A ReplicationFilterSerializer" must {
"serialize replication filter trees with an and-filter root" in {
serialization1.deserialize(serialization1.serialize(filter1()).get, classOf[AndFilter]).get should be(filter1())
serializations(0).deserialize(serializations(0).serialize(filter1()).get, classOf[AndFilter]).get should be(filter1())
}
"serialize replication filter trees with an or-filter root" in {
serialization1.deserialize(serialization1.serialize(filter2()).get, classOf[OrFilter]).get should be(filter2())
serializations(0).deserialize(serializations(0).serialize(filter2()).get, classOf[OrFilter]).get should be(filter2())
}
"serialize NoFilter" in {
serialization1.deserialize(serialization1.serialize(NoFilter).get, NoFilter.getClass).get should be(NoFilter)
serializations(0).deserialize(serializations(0).serialize(NoFilter).get, NoFilter.getClass).get should be(NoFilter)
}
"serialize custom filters" in {
serialization1.deserialize(serialization1.serialize(filter3).get, classOf[ProcessIdFilter]).get should be(filter3)
serialization2.deserialize(serialization2.serialize(ExampleFilter(1)).get, classOf[ExampleFilter]).get should be(ExampleFilter(2))
serializations(0).deserialize(serializations(0).serialize(filter3).get, classOf[ProcessIdFilter]).get should be(filter3)
serializations(1).deserialize(serializations(1).serialize(ExampleFilter(1)).get, classOf[ExampleFilter]).get should be(ExampleFilter(2))
}
"serialize replication filter trees with an and-filter root and a custom leaf" in {
serialization2.deserialize(serialization2.serialize(filter1(ExampleFilter(1))).get, classOf[AndFilter]).get should be(filter1(ExampleFilter(2)))
"serialize replication filter trees with an and-filter root and a custom filter serialization" in serializations.tail.foreach { serialization =>
serialization.deserialize(serialization.serialize(filter1(ExampleFilter(1))).get, classOf[AndFilter]).get should be(filter1(ExampleFilter(2)))
}
"serialize replication filter trees with an or-filter root and a custom leaf" in {
serialization2.deserialize(serialization2.serialize(filter2(ExampleFilter(1))).get, classOf[OrFilter]).get should be(filter2(ExampleFilter(2)))
"serialize replication filter trees with an or-filter root and a custom filter serialization" in serializations.tail.foreach { serialization =>
serialization.deserialize(serialization.serialize(filter2(ExampleFilter(1))).get, classOf[OrFilter]).get should be(filter2(ExampleFilter(2)))
}
"support remoting of replication filter trees with an and-filter root" in {
senderActor ! filter1()
Expand Down
Loading

0 comments on commit 56cf325

Please sign in to comment.