Skip to content
This repository has been archived by the owner on Jun 1, 2021. It is now read-only.

Commit

Permalink
support retry on recovery
Browse files Browse the repository at this point in the history
  • Loading branch information
kongo2002 committed Nov 4, 2016
1 parent 45cdbf0 commit f5b5358
Show file tree
Hide file tree
Showing 16 changed files with 195 additions and 42 deletions.
3 changes: 0 additions & 3 deletions eventuate-adapter-stream/src/main/resources/reference.conf

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ private class DurableEventSourceSettings(config: Config) {
val readTimeout =
config.getDuration("eventuate.log.read-timeout", TimeUnit.MILLISECONDS).millis

val readRetryDelay =
config.getDuration("eventuate.adapter.stream.read-retry-delay", TimeUnit.MILLISECONDS).millis
val replayRetryDelay =
config.getDuration("eventuate.log.replay-retry-delay", TimeUnit.MILLISECONDS).millis
}

/**
Expand All @@ -56,7 +56,7 @@ object DurableEventSource {
* - `eventuate.log.replay-batch-size`. Maximum number of events to read from the event log when the internal
* buffer (of same size) is empty.
* - `eventuate.log.read-timeout`. Timeout for reading events from the event log.
* - `eventuate.adapter.stream.read-retry-delay`. Delay between a failed read and the next read retry.
* - `eventuate.log.replay-retry-delay`. Delay between a failed read and the next read retry.
*
* @param eventLog source event log.
* @param fromSequenceNr sequence number from where the [[DurableEvent]] stream should start.
Expand Down Expand Up @@ -97,7 +97,7 @@ private class DurableEventSourceActor(eventLog: ActorRef, fromSequenceNr: Long,
} else {
context.become(waiting)
}
case ReplayFailure(cause, _) =>
case ReplayFailure(cause, _, _) =>
schedule = Some(schedulePaused())
context.become(pausing)
log.warning(s"reading from log failed: $cause")
Expand Down Expand Up @@ -154,9 +154,10 @@ private class DurableEventSourceActor(eventLog: ActorRef, fromSequenceNr: Long,
private def read(subscribe: Boolean = false): Unit = {
implicit val timeout = Timeout(settings.readTimeout)
val subscriber = if (subscribe) Some(self) else None
val fromSequenceNr = progress + 1L

eventLog ? Replay(progress + 1L, settings.replayBatchSize, subscriber, aggregateId, 1) recover {
case t => ReplayFailure(t, 1)
eventLog ? Replay(fromSequenceNr, settings.replayBatchSize, subscriber, aggregateId, 1) recover {
case t => ReplayFailure(t, fromSequenceNr, 1)
} pipeTo self
}

Expand All @@ -175,5 +176,5 @@ private class DurableEventSourceActor(eventLog: ActorRef, fromSequenceNr: Long,
}

private def schedulePaused(): Cancellable =
context.system.scheduler.scheduleOnce(settings.readRetryDelay, self, Paused)
context.system.scheduler.scheduleOnce(settings.replayRetryDelay, self, Paused)
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ object DurableEventSourceActorSpec {
val ProcessId = "process"
val LogId = "log"

val config = ConfigFactory.parseString("eventuate.adapter.stream.read-retry-delay = 500ms")
val config = ConfigFactory.parseString("eventuate.log.replay-retry-delay = 500ms")

val e1 = durableEvent("a", 1)
val e2 = durableEvent("b", 2)
Expand Down Expand Up @@ -145,7 +145,7 @@ class DurableEventSourceActorSpec extends TestKit(ActorSystem("test", DurableEve
}

def replayFailure(cause: Throwable): Unit = {
val msg = ReplayFailure(cause, 1)
val msg = ReplayFailure(cause, 1L, 1)
log.sender() ! msg
prb.expectMsg(msg)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ trait EventLogSpec extends TestKitBase with EventLogSpecSupport {
}
"reply with a failure message if replay fails" in {
log.tell(Replay(ErrorSequenceNr, None, 0), replyToProbe.ref)
replyToProbe.expectMsg(ReplayFailure(IntegrationTestException, 0))
replyToProbe.expectMsg(ReplayFailure(IntegrationTestException, ErrorSequenceNr, 0))
}
"replication-read local events" in {
generateEmittedEvents()
Expand Down
6 changes: 6 additions & 0 deletions eventuate-core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ eventuate {
# resumed automatically after the replayed event batch has been handled
# (= replay backpressure).
replay-batch-size = 4096

# Maximum number of replay attempts before finally stopping the actor itself
replay-retry-max = 10

# Delay between consecutive replay attempts
replay-retry-delay = 10s
}

log.circuit-breaker {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ private class EventsourcedViewSettings(config: Config) {
val replayBatchSize =
config.getInt("eventuate.log.replay-batch-size")

val replayRetryMax =
config.getInt("eventuate.log.replay-retry-max")

val replayRetryDelay =
config.getDuration("eventuate.log.replay-retry-delay", TimeUnit.MILLISECONDS).millis

val readTimeout =
config.getDuration("eventuate.log.read-timeout", TimeUnit.MILLISECONDS).millis

Expand Down Expand Up @@ -101,6 +107,7 @@ trait EventsourcedView extends Actor with Stash {
private var _lastReceivedSequenceNr = 0L

private val settings = new EventsourcedViewSettings(context.system.settings.config)

private var saveRequests: Map[SnapshotMetadata, Handler[SnapshotMetadata]] = Map.empty

private lazy val _commandContext: BehaviorContext = new DefaultBehaviorContext(onCommand)
Expand Down Expand Up @@ -382,14 +389,14 @@ trait EventsourcedView extends Actor with Stash {
val iid = instanceId

eventLog ? Replay(fromSequenceNr, replayBatchSize, sub, aggregateId, instanceId) recover {
case t => ReplayFailure(t, iid)
case t => ReplayFailure(t, fromSequenceNr, iid)
} pipeTo self
}

/**
* Internal API.
*/
private[eventuate] def initiating: Receive = {
private[eventuate] def initiating(replayAttempts: Int): Receive = {
case LoadSnapshotSuccess(Some(snapshot), iid) => if (iid == instanceId) {
val behavior = _snapshotContext.current
if (behavior.isDefinedAt(snapshot.payload)) {
Expand All @@ -415,13 +422,26 @@ trait EventsourcedView extends Actor with Stash {
}
case ReplaySuccess(events, progress, iid) => if (iid == instanceId) {
events.foreach(receiveEvent)
// reset retry attempts
context.become(initiating(settings.replayRetryMax))
replay(progress + 1L)
}
case ReplayFailure(cause, iid) => if (iid == instanceId) {
logger.error(cause, s"replay failed, stopping self")
Try(onRecovery(Failure(cause)))
context.stop(self)
case ReplayFailure(cause, progress, iid) => if (iid == instanceId) {
if (replayAttempts < 1) {
logger.error(cause, "replay failed (maximum number of {} replay attempts reached), stopping self", settings.replayRetryMax)
Try(onRecovery(Failure(cause)))
context.stop(self)
} else {
// retry replay request while decreasing the remaining attempts
val attemptsRemaining = replayAttempts - 1
logger.warning("replay failed [{}] ({} replay attempts remaining), scheduling retry in {}ms",
cause.getMessage, attemptsRemaining, settings.replayRetryDelay.toMillis)
context.become(initiating(attemptsRemaining))
context.system.scheduler.scheduleOnce(settings.replayRetryDelay, self, ReplayRetry(progress))
}
}
case ReplayRetry(progress) =>
replay(progress)
case Terminated(ref) if ref == eventLog =>
context.stop(self)
case other =>
Expand Down Expand Up @@ -454,7 +474,7 @@ trait EventsourcedView extends Actor with Stash {
/**
* Initialization behavior.
*/
final def receive = initiating
final def receive = initiating(settings.replayRetryMax)

/**
* Adds the current command to the user's command stash. Must not be used in the event handler.
Expand Down Expand Up @@ -494,3 +514,4 @@ trait EventsourcedView extends Actor with Stash {
super.postStop()
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,16 @@
package com.rbmhtechnology.eventuate

import akka.actor._
import com.typesafe.config.Config

import scala.concurrent.Future
import scala.util._

private class EventsourcedWriterSettings(config: Config) {
val replayRetryMax =
config.getInt("eventuate.log.replay-retry-max")
}

object EventsourcedWriter {
/**
* Thrown by [[EventsourcedWriter]] to indicate that a read operation from an external database failed.
Expand Down Expand Up @@ -86,6 +92,8 @@ trait EventsourcedWriter[R, W] extends EventsourcedView {
private case class WriteSuccess(result: W, instanceId: Int)
private case class WriteFailure(cause: Throwable, instanceId: Int)

private val settings = new EventsourcedWriterSettings(context.system.settings.config)

private var numPending: Int = 0

/**
Expand Down Expand Up @@ -165,7 +173,7 @@ trait EventsourcedWriter[R, W] extends EventsourcedView {
/**
* Internal API.
*/
override private[eventuate] def initiating: Receive = {
override private[eventuate] def initiating(replayAttempts: Int): Receive = {
case ReadSuccess(r, iid) => if (iid == instanceId) {
readSuccess(r) match {
case Some(snr) => replay(snr, subscribe = true)
Expand All @@ -184,12 +192,14 @@ trait EventsourcedWriter[R, W] extends EventsourcedView {
case ReplaySuccess(events, progress, iid) => if (iid == instanceId) {
events.foreach(receiveEvent)
if (numPending > 0) {
context.become(initiatingWrite(progress) orElse initiating)
context.become(initiatingWrite(progress) orElse initiating(settings.replayRetryMax))
write(instanceId)
} else replay(progress + 1L)
} else {
replay(progress + 1L)
}
}
case other =>
super.initiating(other)
super.initiating(replayAttempts)(other)
}

/**
Expand All @@ -210,7 +220,7 @@ trait EventsourcedWriter[R, W] extends EventsourcedView {
private def initiatingWrite(progress: Long): Receive = {
case WriteSuccess(r, iid) => if (iid == instanceId) {
writeSuccess(r)
context.become(initiating)
context.become(initiating(settings.replayRetryMax))
replay(progress + 1L)
}
case WriteFailure(cause, iid) => if (iid == instanceId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,12 @@ object EventsourcingProtocol {
/**
* Failure reply after a [[Replay]].
*/
case class ReplayFailure(cause: Throwable, instanceId: Int)
case class ReplayFailure(cause: Throwable, replayProgress: Long, instanceId: Int)

/**
* Internal message to trigger a new [[Replay]] attempt
*/
private[eventuate] case class ReplayRetry(replayProgress: Long)

/**
* Instructs an event log to delete events with a sequence nr less or equal a given one.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ abstract class EventLog[A <: EventLogState](id: String) extends Actor with Event
}
read(adjustFromSequenceNr(from), clock.sequenceNr, max, emitterAggregateId) onComplete {
case Success(r) => sdr ! ReplaySuccess(r.events, r.to, iid)
case Failure(e) => sdr ! ReplayFailure(e, iid)
case Failure(e) => sdr ! ReplayFailure(e, from, iid)
}
case Replay(from, max, subscriber, None, iid) =>
import services.readDispatcher
Expand All @@ -397,7 +397,7 @@ abstract class EventLog[A <: EventLogState](id: String) extends Actor with Event
}
read(adjustFromSequenceNr(from), clock.sequenceNr, max) onComplete {
case Success(r) => sdr ! ReplaySuccess(r.events, r.to, iid)
case Failure(e) => sdr ! ReplayFailure(e, iid)
case Failure(e) => sdr ! ReplayFailure(e, from, iid)
}
case r @ ReplicationRead(from, max, scanLimit, filter, targetLogId, _, currentTargetVersionVector) =>
import services.readDispatcher
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ public void shouldCallFailedRecoverCompletionHandler() {
logProbe.sender().tell(new LoadSnapshotSuccess(Option.empty(), instanceId), logProbe.ref());
logProbe.expectMsg(new Replay(1L, MAX_REPLAY_SIZE, Option.apply(actor), Option.empty(), instanceId));

actor.tell(new ReplayFailure(FAILURE, instanceId), getRef());
actor.tell(new ReplayFailure(FAILURE, 1L, instanceId), getRef());
msgProbe.expectMsg(FAILURE);
}

Expand Down
5 changes: 5 additions & 0 deletions eventuate-core/src/test/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -1,2 +1,7 @@
akka.loglevel = "ERROR"
akka.test.single-expect-default = 20s

eventuate.log {
replay-retry-delay = 5ms
replay-retry-max = 0
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ package com.rbmhtechnology.eventuate

import akka.actor._
import akka.testkit._
import com.typesafe.config.ConfigFactory
import org.scalatest._

import scala.concurrent.Future
import scala.util._

object EventsourcedViewSpec {
Expand Down Expand Up @@ -475,7 +475,7 @@ class EventsourcedViewSpec extends TestKit(ActorSystem("test")) with WordSpecLik
logProbe.expectMsg(LoadSnapshot(emitterIdA, instanceId))
logProbe.sender() ! LoadSnapshotSuccess(None, instanceId)
logProbe.expectMsg(Replay(1L, Some(actor), instanceId))
actor ! ReplayFailure(TestException, instanceId)
actor ! ReplayFailure(TestException, 1L, instanceId)
msgProbe.expectMsg(TestException)
}
"support command handler behavior changes" in {
Expand Down Expand Up @@ -536,3 +536,67 @@ class EventsourcedViewSpec extends TestKit(ActorSystem("test")) with WordSpecLik
}
}
}

object EventsourcedViewReplaySpec {
val MaxRetries = 5
val config = ConfigFactory.parseString(
s"""
|eventuate.log.replay-retry-max = $MaxRetries
|eventuate.log.replay-retry-delay = 5ms
""".stripMargin)
}

class EventsourcedViewReplayRetrySpec extends TestKit(ActorSystem("test", EventsourcedViewReplaySpec.config)) with WordSpecLike with Matchers with BeforeAndAfterAll with BeforeAndAfterEach {
import EventsourcedViewReplaySpec._
import EventsourcedViewSpec._
import EventsourcingProtocol._

var instanceId: Int = _
var logProbe: TestProbe = _
var msgProbe: TestProbe = _

override def beforeEach(): Unit = {
instanceId = EventsourcedView.instanceIdCounter.get
logProbe = TestProbe()
msgProbe = TestProbe()
}

override def afterAll(): Unit =
TestKit.shutdownActorSystem(system)

def unrecoveredCompletionView(): ActorRef =
system.actorOf(Props(new TestCompletionView(logProbe.ref, msgProbe.ref)))

"An EventsourcedView" must {
"retry replay on failure" in {
val actor = unrecoveredCompletionView()

logProbe.expectMsg(LoadSnapshot(emitterIdA, instanceId))
logProbe.sender() ! LoadSnapshotSuccess(None, instanceId)

logProbe.expectMsg(Replay(1L, Some(actor), instanceId))
actor ! ReplayFailure(TestException, 1L, instanceId)

1 to MaxRetries foreach { _ =>
logProbe.expectMsg(Replay(1L, None, instanceId))
actor ! ReplayFailure(TestException, 1L, instanceId)
}

msgProbe.expectMsg(TestException)
}
"successfully finish recovery after replay retry" in {
val actor = unrecoveredCompletionView()

logProbe.expectMsg(LoadSnapshot(emitterIdA, instanceId))
logProbe.sender() ! LoadSnapshotSuccess(None, instanceId)

logProbe.expectMsg(Replay(1L, Some(actor), instanceId))
actor ! ReplayFailure(TestException, 1L, instanceId)

logProbe.expectMsg(Replay(1L, None, instanceId))
actor ! ReplaySuccess(Nil, 0L, instanceId)

msgProbe.expectMsg("success")
}
}
}
Loading

0 comments on commit f5b5358

Please sign in to comment.