This repository has been archived by the owner on Jun 1, 2021. It is now read-only.

support retry on recovery #352

Merged
1 commit merged into RBMHTechnology:master on Nov 4, 2016

Conversation

kongo2002 (Contributor):

Hi,

Especially during recovery of huge event logs it is desirable to have some kind of retry mechanism for the event log's replay. Otherwise a small hiccup on the storage backend, such as not responding in a timely manner, leads to an immediate shutdown of the EventsourcedView and therefore discards all of the replay progress made so far.

Cheers,
Gregor

@@ -44,6 +44,9 @@ eventuate {
# resumed automatically after the replayed event batch has been handled
# (= replay backpressure).
replay-batch-size = 4096

# Maximum number of replay attempts before finally stopping the actor itself.
max-replay-attempts = 10
Contributor:

We have a naming convention for this kind of parameter (*-retry-max).
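
A minimal sketch of how the setting could look after following that convention. The key names are inferred from the replayRetryMax/replayRetryDelay settings referenced later in this review; the default values and the companion delay key are assumptions for illustration, not the merged values:

# Maximum number of replay attempts before finally stopping the actor itself.
replay-retry-max = 10

# Delay between two consecutive replay attempts.
replay-retry-delay = 10s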

@@ -133,6 +136,12 @@ trait EventsourcedView extends Actor with Stash {
settings.replayBatchSize

/**
* Maximum number of replay attempts before finally stopping the actor itself.
*/
def maxReplayAttempts: Int =
Contributor:

This method is only overridden in tests. Why do we need this method instead of a special test config?
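
A rough sketch of the suggested alternative: override the setting through a test-specific configuration instead of overriding the method. The config path eventuate.log.replay-retry-max is an assumption for illustration only:

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

// hypothetical test setup: lower the replay retry limit via configuration only,
// instead of overriding maxReplayAttempts in a test subclass
// (config path is an assumption for illustration)
val testConfig = ConfigFactory.parseString("eventuate.log.replay-retry-max = 2")
val system = ActorSystem("test", testConfig.withFallback(ConfigFactory.load()))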

@@ -84,7 +84,7 @@ object EventsourcingProtocol {
/**
* Failure reply after a [[Replay]].
*/
case class ReplayFailure(cause: Throwable, instanceId: Int)
case class ReplayFailure(cause: Throwable, fromSequenceNr: Long, instanceId: Int)
Contributor:

Please name this parameter like the corresponding one in ReplaySuccess. They are semantically equivalent.
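
A one-line sketch of the suggested rename, assuming the corresponding ReplaySuccess parameter is called replayProgress:

case class ReplayFailure(cause: Throwable, replayProgress: Long, instanceId: Int)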

// retry replay request while decreasing the remaining attempts
logger.warning("replay failed - {} attempts remaining [{}]", remainingAttempts, cause.getMessage)
context.become(initiating(remainingAttempts))
replay(from)
Contributor:

Whenever Eventuate retries something it also delays the retry by a configurable amount of time. This should be done here as well.
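
A rough sketch of such a delayed retry, using the ReplayRetry message and the replayRetryDelay setting that appear later in this review (the scheduler usage shown here is illustrative, not the merged code, and settings.replayRetryDelay is assumed to be a FiniteDuration):

// re-enter the initiating behavior with one attempt less and delay the next
// replay attempt instead of retrying immediately
import context.dispatcher
context.become(initiating(remainingAttempts))
context.system.scheduler.scheduleOnce(settings.replayRetryDelay, self, ReplayRetry(from))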

Contributor:

The Akka Streams adapter currently uses an adapter-specific read-retry-delay parameter for such a delay. When introducing a retry-delay for replay, the adapter should be changed to use that new config parameter instead.

Try(onRecovery(Failure(cause)))
context.stop(self)
case ReplayFailure(cause, from, iid) => if (iid == instanceId) {
val remainingAttempts = replayAttempts - 1
Contributor:

Is remainingAttempts really needed? Set the replayAttempts parameter accordingly and use it directly.
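
A sketch of the suggested simplification (logging omitted), letting the behavior parameter carry the remaining attempts directly; this is roughly the shape the code takes further down in this review:

case ReplayFailure(cause, from, iid) => if (iid == instanceId) {
  if (replayAttempts < 1) {
    // no attempts left -> hand the failure to the recovery handler and stop
    Try(onRecovery(Failure(cause)))
    context.stop(self)
  } else {
    // pass the decremented count directly into the next behavior
    context.become(initiating(replayAttempts - 1))
    replay(from)
  }
}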

val remainingAttempts = replayAttempts - 1
if (remainingAttempts < 1) {
// all replay attempts exceeded -> stop the actor
logger.error(cause, "replay failed ({} attempts exceeded), stopping self", maxReplayAttempts)
Contributor:

Is the number of attempts exceeded or reached?

val actor = unrecoveredEventsourcedWriter()
actor ! "cmd"
processRead(Success("rs"))
processLoad(actor)
processReplay(actor, 1)
appProbe.expectMsg("cmd")
}
"retry on failure" in {
Contributor:

retry what on which failure?

logProbe.expectMsg(Replay(1, 2, None, instanceId))
logProbe.sender() ! ReplaySuccess(Nil, 0L, instanceId)

appProbe.expectMsg("cmd")
Contributor:

Why do you test command stashing here again?

Contributor (author):

That way we can easily ensure that the recovery eventually succeeds, I think.

msgProbe.expectMsg(TestException)
}
"retry recovery on replay failure" in {
Contributor:

is recovery or replay retried?

krasserm self-assigned this on Nov 3, 2016
krasserm (Contributor) left a comment:

LGTM after addressing the new review comments.

context.stop(self)
case ReplayFailure(cause, progress, iid) => if (iid == instanceId) {
if (replayAttempts < 1) {
// all replay attempts exceeded -> stop the actor
Contributor:

Maximum number of replay attempts reached? Is that comment needed at all? It just repeats what the logger.error in the next line says.

case ReplayFailure(cause, progress, iid) => if (iid == instanceId) {
if (replayAttempts < 1) {
// all replay attempts exceeded -> stop the actor
logger.error(cause, "replay failed ({} retries reached), stopping self", settings.replayRetryMax)
Contributor:

replay failed (maximum number of {} replay attempts reached), stopping self (see also next comment)

} else {
// retry replay request while decreasing the remaining attempts
val attemptsRemaining = replayAttempts - 1
logger.warning("replay failed - {} attempts remaining [{}] - scheduling retry in {}ms",
Contributor:

logger.warning("replay failed: [{}] ({} replay attempts remaining), scheduling retry in {}ms", cause.getMessage, attemptsRemaining, settings.replayRetryDelay.toMillis) (more consistent with previous logger.error)

}
case ReplayRetry(from) =>
Contributor:

from -> progress, like in the matching ReplaySuccess and ReplayFailure cases?

@@ -536,3 +536,44 @@ class EventsourcedViewSpec extends TestKit(ActorSystem("test")) with WordSpecLik
}
}
}

object EventsourcedViewReplaySpec {
Contributor:

Should be called EventsourcedViewReplayRetrySpec because other replay tests run elsewhere.

import EventsourcedViewSpec._
import EventsourcingProtocol._

val instanceId: Int = EventsourcedView.instanceIdCounter.get
Contributor:

This will break as soon as you add a new test below. Add to beforeEach.


val instanceId: Int = EventsourcedView.instanceIdCounter.get
val logProbe: TestProbe = TestProbe()
val msgProbe: TestProbe = TestProbe()
Contributor:

Same for these two probes
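
A sketch of what moving these fixtures into beforeEach could look like, assuming the spec mixes in ScalaTest's BeforeAndAfterEach and keeps its existing TestKit/TestProbe imports (names and structure are illustrative):

// hypothetical fixture setup, re-created before each test
private var instanceId: Int = _
private var logProbe: TestProbe = _
private var msgProbe: TestProbe = _

override def beforeEach(): Unit = {
  instanceId = EventsourcedView.instanceIdCounter.get
  logProbe = TestProbe()
  msgProbe = TestProbe()
}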


msgProbe.expectMsg(TestException)
}
}
Contributor:

No success test case?

logProbe.sender() ! ReplaySuccess(Nil, 0L, instanceId)

appProbe.expectMsg("cmd")
}
Contributor:

No failure test case?

krasserm added this to the 0.8 milestone on Nov 4, 2016
krasserm (Contributor) left a comment:

LGTM. Please squash and push. Thanks for your contribution, Gregor!

krasserm merged commit 331d6cd into RBMHTechnology:master on Nov 4, 2016