-
Notifications
You must be signed in to change notification settings - Fork 100
Conversation
@@ -44,6 +44,9 @@ eventuate { | |||
# resumed automatically after the replayed event batch has been handled | |||
# (= replay backpressure). | |||
replay-batch-size = 4096 | |||
|
|||
# Maximum number of replay attempts before finally stopping the actor itself. | |||
max-replay-attempts = 10 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have a naming convention for these kind of parameters (*-retry-max
)
@@ -133,6 +136,12 @@ trait EventsourcedView extends Actor with Stash { | |||
settings.replayBatchSize | |||
|
|||
/** | |||
* Maximum number of replay attempts before finally stopping the actor itself. | |||
*/ | |||
def maxReplayAttempts: Int = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method is only overridden in tests. Why do we need this method instead of a special test config?
@@ -84,7 +84,7 @@ object EventsourcingProtocol { | |||
/** | |||
* Failure reply after a [[Replay]]. | |||
*/ | |||
case class ReplayFailure(cause: Throwable, instanceId: Int) | |||
case class ReplayFailure(cause: Throwable, fromSequenceNr: Long, instanceId: Int) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please name this parameter like the corresponding one in ReplaySuccess
. They are semantically equivalent.
// retry replay request while decreasing the remaining attempts | ||
logger.warning("replay failed - {} attempts remaining [{}]", remainingAttempts, cause.getMessage) | ||
context.become(initiating(remainingAttempts)) | ||
replay(from) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Whenever Eventuate retries something it also delays the retry by a configurable amount of time. This should be done here as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Akka Streams adapter currently uses an adapter-specific read-retry-delay parameter for such a delay. When introducing a retry-delay for replay, the adapter should be changed to use that new config parameter instead.
Try(onRecovery(Failure(cause))) | ||
context.stop(self) | ||
case ReplayFailure(cause, from, iid) => if (iid == instanceId) { | ||
val remainingAttempts = replayAttempts - 1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is remainingAttemps
really needed? Set replayAttempts
parameter accordingly and use it directly.
val remainingAttempts = replayAttempts - 1 | ||
if (remainingAttempts < 1) { | ||
// all replay attempts exceeded -> stop the actor | ||
logger.error(cause, "replay failed ({} attempts exceeded), stopping self", maxReplayAttempts) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the number of attempts exceeded or reached?
val actor = unrecoveredEventsourcedWriter() | ||
actor ! "cmd" | ||
processRead(Success("rs")) | ||
processLoad(actor) | ||
processReplay(actor, 1) | ||
appProbe.expectMsg("cmd") | ||
} | ||
"retry on failure" in { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
retry what on which failure?
logProbe.expectMsg(Replay(1, 2, None, instanceId)) | ||
logProbe.sender() ! ReplaySuccess(Nil, 0L, instanceId) | ||
|
||
appProbe.expectMsg("cmd") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do you test command stashing here again?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that way we can easily ensure that the recovery succeeds after all I think
msgProbe.expectMsg(TestException) | ||
} | ||
"retry recovery on replay failure" in { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is recovery or replay retried?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM after including new review comments
context.stop(self) | ||
case ReplayFailure(cause, progress, iid) => if (iid == instanceId) { | ||
if (replayAttempts < 1) { | ||
// all replay attempts exceeded -> stop the actor |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maximum number of replay attempts reached? Is that comment needed at all? It just repeats what the logger.error
in the next line says
case ReplayFailure(cause, progress, iid) => if (iid == instanceId) { | ||
if (replayAttempts < 1) { | ||
// all replay attempts exceeded -> stop the actor | ||
logger.error(cause, "replay failed ({} retries reached), stopping self", settings.replayRetryMax) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
replay failed (maximum number of {} replay attempts reached), stopping self
(see also next comment)
} else { | ||
// retry replay request while decreasing the remaining attempts | ||
val attemptsRemaining = replayAttempts - 1 | ||
logger.warning("replay failed - {} attempts remaining [{}] - scheduling retry in {}ms", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
logger.warning("replay failed: [{}] ({} replay attempts remaining), scheduling retry in {}ms", cause.getMessage, attemptsRemaining, settings.replayRetryDelay.toMillis)
(more consistent with previous logger.error
)
} | ||
case ReplayRetry(from) => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
from
-> progress
like in matching ReplaySuccess
and ReplayFailure
?
@@ -536,3 +536,44 @@ class EventsourcedViewSpec extends TestKit(ActorSystem("test")) with WordSpecLik | |||
} | |||
} | |||
} | |||
|
|||
object EventsourcedViewReplaySpec { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should be called EventsourcedViewReplayRetrySpec
because other replay tests run elsewhere.
import EventsourcedViewSpec._ | ||
import EventsourcingProtocol._ | ||
|
||
val instanceId: Int = EventsourcedView.instanceIdCounter.get |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will break as soon as you add a new test below. Add to beforeEach
.
|
||
val instanceId: Int = EventsourcedView.instanceIdCounter.get | ||
val logProbe: TestProbe = TestProbe() | ||
val msgProbe: TestProbe = TestProbe() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same for these two probes
|
||
msgProbe.expectMsg(TestException) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No success test case?
logProbe.sender() ! ReplaySuccess(Nil, 0L, instanceId) | ||
|
||
appProbe.expectMsg("cmd") | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No failure test case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Please squash and push. Thanks for your contribution, Gregor!
8e8686b
to
f5b5358
Compare
Hi,
especially during recovery of huge event logs it is desirable to have some kind of retry-mechanism of the event log's replay. Otherwise a small hiccup on the storage backend that didn't respond in a timely manner will lead to an immediate shutdown of the
EventsourcedView
and therefore discard all of its progress so far.Cheers,
Gregor