Skip to content
This repository has been archived by the owner on Jun 1, 2021. It is now read-only.

Commit

Permalink
Merge pull request #210 from RBMHTechnology/wip-209-poe-execution-rej…
Browse files Browse the repository at this point in the history
…ected

Fix rejected execution of PersistOnEventRequests loaded from a snapshot
  • Loading branch information
krasserm committed Feb 11, 2016
2 parents 464ab8a + 0a8fab3 commit 5930c24
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,10 @@ trait EventsourcedActor extends EventsourcedView with EventsourcedClock {
messageStash.unstash()
}
}
case PersistOnEventRequest(persistOnEventSequenceNr: Long, parameters, iid) => if (iid == instanceId) {
case PersistOnEventRequest(persistOnEventSequenceNr: Long, invocations, iid) => if (iid == instanceId) {
writeOrDelay {
writeHandlers = Vector.fill(parameters.length)(PersistOnEvent.DefaultHandler)
writeRequests = parameters.map {
writeHandlers = Vector.fill(invocations.length)(PersistOnEvent.DefaultHandler)
writeRequests = invocations.map {
case PersistOnEventInvocation(event, customDestinationAggregateIds) =>
durableEvent(event, customDestinationAggregateIds, Some(persistOnEventSequenceNr))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ trait PersistOnEvent extends EventsourcedActor {
override private[eventuate] def snapshotLoaded(snapshot: Snapshot): Unit = {
super.snapshotLoaded(snapshot)
snapshot.persistOnEventRequests.foreach { pr =>
requests = requests + (pr.persistOnEventSequenceNr -> pr)
requests = requests + (pr.persistOnEventSequenceNr -> pr.copy(instanceId = instanceId))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,51 @@ class PersistOnEventSpec extends TestKit(ActorSystem("test")) with WordSpecLike

save.snapshot.persistOnEventRequests(0) should be(expected)
}
"recover from a snapshot with persistOnEvent requests whose execution failed" in {
val actor = recoveredTestActor(stateSync = false)

actor ! Written(event("x", 1L))

val write1 = logProbe.expectMsgClass(classOf[Write])

actor ! "snap"

val save = logProbe.expectMsgClass(classOf[SaveSnapshot])

actor ! "boom"

logProbe.expectMsg(LoadSnapshot(emitterIdA, instanceId + 1))
logProbe.sender() ! LoadSnapshotSuccess(Some(save.snapshot), instanceId + 1)
logProbe.expectMsg(Replay(2L, Some(actor), instanceId + 1))
logProbe.sender() ! ReplaySuccess(Nil, 1L, instanceId + 1)

val write2 = logProbe.expectMsgClass(classOf[Write])
write1.events should be(write2.events)
}
"recover from a snapshot with persistOnEvent requests whose execution succeeded" in {
val actor = recoveredTestActor(stateSync = false)

actor ! Written(event("x", 1L))

val write1 = logProbe.expectMsgClass(classOf[Write])
val written = List(
event(write1.events(0).payload, 2L, Some(1L)),
event(write1.events(1).payload, 3L, Some(1L)))

actor ! "snap"

val save = logProbe.expectMsgClass(classOf[SaveSnapshot])

actor ! "boom"

logProbe.expectMsg(LoadSnapshot(emitterIdA, instanceId + 1))
logProbe.sender() ! LoadSnapshotSuccess(Some(save.snapshot), instanceId + 1)
logProbe.expectMsg(Replay(2L, Some(actor), instanceId + 1))
logProbe.sender() ! ReplaySuccess(written, 3L, instanceId + 1)
logProbe.expectMsg(Replay(4L, None, instanceId + 1))
logProbe.sender() ! ReplaySuccess(Nil, 3L, instanceId + 1)
logProbe.expectNoMsg(timeout)
}
"be tolerant to changing actor paths across incarnations" in {
val actor = unrecoveredTestActor(stateSync = false)
val path = ActorPath.fromString("akka://test/user/invalid")
Expand Down

0 comments on commit 5930c24

Please sign in to comment.