Projection Catch-up #1221
Remove outdated documentation.
Introduce the internal ordering for the `Event`s stored in the `EventStore`. Check-in numerous debug `System.out`s in order to trace the issues in the concurrent-heavy environment.
Do not record the last order read during the catch-up. Revert the modifications made to `EventContext`.
# Conflicts: # version.gradle
when reading the event history during the catch-up.
@alexander-yevsyukov @dmdashenkov I will still need to merge with the today-merged PR, but please feel free to review again. Not much is expected to change upon the back-merge from |
Please see my comments. There's more I need to review.
/*
 * Handlers acting when process is {@code Not Started}
 ************************/
This header delimiter should be either longer (to align with the end of the text above), or be just one `*`.
/*
 * Handlers acting in the process finalization
 ************************/
Same comment for this delimiter. I think one star would be a safer bet.
/*
 * Handlers dealing with the process completion
 ************************/
And here, please, too.
/*
 * Utilities and re-usable actions
 ************************/
And here.
@alexander-yevsyukov judging by your reaction, the sections are a bad idea altogether.
I am going to remove them here; let's discuss this on our code style wiki page.
private static final TypeUrl TYPE = TypeUrl.from(CatchUp.getDescriptor());

/**
 * The duration of the "turbulence" period, counting back from the current time.
What does “turbulence” mean in the context of the catch-up operation?
There's also a static utility which does the calculation in this. Consider extracting them to a `Turbulence` class with the documentation which explains the concept.
@alexander-yevsyukov there is a decent description of the turbulence period in the class-level documentation. I have added it to address your previous comment a few days ago.
Either you have missed it (which seems more likely), or you really want this concept to be introduced at full speed. I think that's going to be an overdesign, but let's try.
if (job.matches(message)) {
    DispatchingId dispatchingId = new DispatchingId(message);
    if (jobStatus == IN_PROGRESS) {
Consider using switch/case here. It would be easier to grasp the branches as alternatives.
Also, can we look at this code from the other side, that is, the operations that the station performs? To me it looks like there are several outcomes:
conveyor.remove(message);
dispatchToCatchUp.put(dispatchingId, message);
conveyor.keepForLonger(message, HOW_LONG_TO_KEEP);
If we formulate the conditions for these operations, we can probably reduce the number of if/else branches. Otherwise, this piece is really hard to understand.
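One possible reading of this suggestion, combined with the earlier switch/case remark, is sketched below. It is only an illustration: the enums `JobStatus`, `MessageStatus` and `Outcome`, the class `CatchUpOutcomes`, and above all the conditions chosen for each branch are assumptions made for the example, not the actual rules of `CatchUpStation`.

```java
/** Hypothetical statuses of a catch-up job; the names follow the PR description. */
enum JobStatus { IN_PROGRESS, FINALIZING, COMPLETED }

/** Hypothetical statuses of a message that matches the job. */
enum MessageStatus { TO_CATCH_UP, TO_DELIVER }

/** The three outcomes listed in the review comment. */
enum Outcome { REMOVE, DISPATCH_TO_CATCH_UP, KEEP_FOR_LONGER }

final class CatchUpOutcomes {

    private CatchUpOutcomes() {
    }

    /**
     * Chooses a single outcome for a message that matches a catch-up job.
     *
     * The conditions below are illustrative assumptions only.
     */
    static Outcome decide(JobStatus job, MessageStatus message) {
        switch (job) {
            case IN_PROGRESS:
                // Assumption: history is replayed, live messages to the same target are dropped.
                return message == MessageStatus.TO_CATCH_UP
                       ? Outcome.DISPATCH_TO_CATCH_UP
                       : Outcome.REMOVE;
            case FINALIZING:
                // Assumption: dispatching is paused, so the message is kept for a later round.
                return Outcome.KEEP_FOR_LONGER;
            case COMPLETED:
                // Assumption: the paused messages are dispatched together.
                return Outcome.DISPATCH_TO_CATCH_UP;
            default:
                throw new IllegalArgumentException("Unknown job status: " + job);
        }
    }
}
```

With the decision isolated like this, the station loop would only need to map each `Outcome` to one of the three calls quoted above.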
@alexander-yevsyukov This piece is explained by the class-level doc. The same applies to the `LiveDeliveryStation`.
This whole class is itself a method object, as is the `LiveDeliveryStation`.
I will try to extract pieces one more time. It may make it look better designed. But the extra call chain will make it harder to follow.
And, to avoid creating extra objects, I will probably have to modify the variables by reference.
Let's see if it makes the code easier for non-authors :)
for (InboxMessage message : conveyor) {
    for (CatchUp job : jobs) {
        CatchUpStatus jobStatus = job.getStatus();
We probably don't have to do this if the job does not match the message.
for (CatchUp job : jobs) {
    CatchUpStatus jobStatus = job.getStatus();

    if (job.matches(message)) {
How about having this:
if (!job.matches(message)) {
    continue;
}
 * The comparator which sorts the messages chronologically, but ensures that if there is
 * a {@link CatchUpStarted} event in the sorted batch, it goes on top.
 */
private static final class CatchUpMessageComparator
I think we'll need to revisit the `delivery` package soon. Presumably, when we get down to working on new Buses. The `process()` method in this class really needs refactoring, and we need a Method Object there. This Method Object would be closely linked with the `CatchUpStation` class, while this Comparator is a more separate thing. It would be easier to understand at the top level of the package. We have many `CatchUp`-classes there. One more won't hurt if the price for it is a `process()` method which is easier to understand.
 * dispatching
 */
@Override
public final Result process(Conveyor conveyor) {
We have a case similar to `CatchUpStation` here. Only the author of the code can understand what's going on in this method and why. The Javadoc for this method does not help much because it says that it dispatches messages. It's understood already by the name of the method `process()` and its parameter `Conveyor`. But how does it do it? Why do we have logic which is that complex?
I think we need another Method Object here. Please consider.
I'm done with this round. Please see my comments and questions.
 * <p>Has its own {@link Inbox}, so the messages arriving to it are dispatched
 * by the {@link Delivery}.
 */
class ShardMaintenanceProcess extends AbstractEventSubscriber {
Can we make this class final?
static Result emptyResult() {
    DeliveryErrors noErrors = DeliveryErrors.newBuilder()
                                            .build();
    return new Result(0, noErrors);
What if we have the whole thing as a constant?
 * are compared lexicographically.
 */
@Internal
public final class EventComparator implements Comparator<Event>, Serializable {
Now this thing captures the whole idea of comparing events and says that there's only one way of doing this. The static constant `chronologically` does not relieve it. It simply repeats what the Javadoc says, only in a shorter form. It's not a really big issue, but it's better to address it now. When we need other event comparators, we'd have to change this class significantly. I see two options.
Option 1. Have `EventComparator` as an abstract class which a) simply implements the `Comparator` interface with `Event` as a generic parameter and b) returns implementations of a specific kind. One of them would be `EventComparator.chronologically()`.
Option 2. Have an interface `EventComparator`, and `ChronologicalEventComparator` as its implementation.
The first option looks better to me, assuming the class is `@Internal`.
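A minimal sketch of what Option 1 could look like is given below. `SampleEvent` and `SampleEventComparator` are stand-ins invented for this illustration, and the time, version and ID ordering is an assumption based on the quoted fragments rather than the actual `EventComparator` code.

```java
import java.util.Comparator;

/** A stand-in for the framework's Event, reduced to the fields used for ordering. */
final class SampleEvent {

    final long timestampMillis;
    final int version;
    final String id;

    SampleEvent(long timestampMillis, int version, String id) {
        this.timestampMillis = timestampMillis;
        this.version = version;
        this.id = id;
    }
}

/** Option 1: an abstract comparator type which hands out specific orderings. */
abstract class SampleEventComparator implements Comparator<SampleEvent> {

    private static final SampleEventComparator CHRONOLOGICAL = new Chronological();

    /** Returns the comparator ordering events by time, then by version, then by ID. */
    static SampleEventComparator chronologically() {
        return CHRONOLOGICAL;
    }

    /** The only ordering so far; other orderings would be added as further nested classes. */
    private static final class Chronological extends SampleEventComparator {

        @Override
        public int compare(SampleEvent e1, SampleEvent e2) {
            int byTime = Long.compare(e1.timestampMillis, e2.timestampMillis);
            if (byTime != 0) {
                return byTime;
            }
            int byVersion = Integer.compare(e1.version, e2.version);
            if (byVersion != 0) {
                return byVersion;
            }
            // The IDs are compared lexicographically as the final tie-breaker.
            return e1.id.compareTo(e2.id);
        }
    }
}
```

Callers would then write `events.sort(SampleEventComparator.chronologically())`, and a new ordering could be added without touching the existing call sites.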
 */
public abstract class ProjectionRepository<I, P extends Projection<I, S, ?>, S extends EntityState>
        extends EventDispatchingRepository<I, P, S> {

    private @MonotonicNonNull Inbox<I> inbox;

    private @MonotonicNonNull CatchUpProcess<I> catchUpProcess;

    private @MonotonicNonNull RepositoryCache<I, P> cache;

    /**
Please review the Javadoc of this method. I think it's outdated. It probably should mention something about Catch-Up. I'm not sure.
The piece about state updates routing looks dated. Please look into it.
Appended the docs.
The piece on the state update routing makes sense, as discussed verbally.
Delivery delivery = ServerEnvironment.instance()
                                     .delivery();
initInbox(delivery);
initCatchUp(context, delivery);
Do we always have to call the catch-up initialization?
How do I run the catch-up for entities of my repositories? Can we have something about it in the class Javadoc? Or, maybe there should be a better place for this. Please advise.
Appended the documentation.
checkNotNull(timestamp);
projectionStorage().writeLastHandledEventTime(timestamp);
/**
 * Repeats the dispatching of the events from the event log to the requested entities
Now I see how the catch-up is started, but can we link to this method from 1) the Javadoc of the class and 2) the Javadoc of the registration method? We need to hint to the reader that this ability exists.
Done.
 * progress
 */
public void catchUp(Timestamp since, @Nullable Set<I> ids) throws
        CatchUpAlreadyStartedException {
Can we have `throws` and the name of the exception class together on the next line?
 * the value to be used as an argument
 * @return the object to use as an argument in the message-formatting API
 */
private static Object lazyArg(Object value) {
This looks like a reusable utility for the `Stringifiers` class or somewhere near. Please find a better place for it.
Or, perhaps, we may want to have our custom precondition utilities that would accept `LazyArg` (or something similar). To see what I mean, please see how lazy logging is done in Flogger in the `com.google.common.flogger.LogContext.logImpl()` method.
This isn't a small change, but it is a useful one. We may have other checks that accept time-consuming operations in other places in our code. We can improve them all together, and then change this piece too. If you concur, please file an issue referring to this method.
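The idea of a lazily evaluated argument can be sketched with the standard library alone. In the example below, `LazyArgs`, its `Supplier`-based `lazyArg(..)` and the `checkState(..)` helper are hypothetical names made up for illustration. They mirror neither the `lazyArg(Object)` method under review nor the Flogger API.

```java
import java.util.function.Supplier;

final class LazyArgs {

    private LazyArgs() {
    }

    /**
     * Wraps a computation into an object which runs it only when formatted.
     *
     * The supplier is invoked on each toString() call, so it should be side-effect free.
     */
    static Object lazyArg(Supplier<?> value) {
        return new Object() {
            @Override
            public String toString() {
                return String.valueOf(value.get());
            }
        };
    }

    /** A precondition which formats its message, and thus evaluates the arguments, only on failure. */
    static void checkState(boolean expression, String template, Object... args) {
        if (!expression) {
            throw new IllegalStateException(String.format(template, args));
        }
    }
}
```

A call such as `checkState(ok, "Unexpected state: %s", lazyArg(() -> describe(entity)))`, where `describe(..)` stands for any costly operation, would pay for the description only when the check fails.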
Set<I> catchUpTargets;
if (envelope.message() instanceof CatchUpSignal) {
    catchUpTargets = restrictToIds == null
                     ? ImmutableSet.copyOf(index())
This operation is potentially big and long (assuming we go for the `index()` to the storage). Can we run it once per repository under the catch up operation?
The idea here is to have the most recent `index()`, so we shouldn't cache it.
In the real use cases, it is going to be used only if a catch-up for all the repository entities is requested. If this repository is a new one and requires catching up, the index is going to be empty. And if the repository entities are all messed up and require the complete re-catch-up, it will and should take long. So the expectations are managed.
/**
 * A mixin for the state of the {@linkplain CatchUpProcess catch-up process job}.
 */
public interface CatchUpMixin extends CatchUpOrBuilder {
This interface can be package-private, so that the framework users do not implement it. Also, please mark as a `@GeneratedMixin`.
 * @return {@code true} if the message matches the job, {@code false} otherwise
 */
@VisibleForTesting
default boolean matches(InboxMessage message) {
This is not visible for testing. This is visible because of a lack of language expressiveness. I'd mark it as `@Internal`.
I have removed this annotation and marked the whole class as `Internal`.
It originally was `public`, as this interface was in a different package.
    return timeComparison;
}
int versionComparison = Integer.compare(m1.getVersion(), m2.getVersion());
if(versionComparison != 0) {
Please add a space.
I'd also consider using a `Comparator.comparing(..)` chain instead of implementing the interface directly.
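For reference, such a chain could look roughly like the sketch below. `SampleMessage`, its accessors and `SampleMessageOrder` are hypothetical stand-ins, since the compared type and the remaining comparison steps are not fully visible in the quoted fragment.

```java
import java.time.Instant;
import java.util.Comparator;

/** A stand-in for the compared message; the accessor names are assumptions. */
final class SampleMessage {

    private final Instant whenReceived;
    private final int version;

    SampleMessage(Instant whenReceived, int version) {
        this.whenReceived = whenReceived;
        this.version = version;
    }

    Instant whenReceived() {
        return whenReceived;
    }

    int version() {
        return version;
    }
}

final class SampleMessageOrder {

    /** Compares by time first and by version on ties, with no hand-written branching. */
    static final Comparator<SampleMessage> BY_TIME_THEN_VERSION =
            Comparator.comparing(SampleMessage::whenReceived)
                      .thenComparingInt(SampleMessage::version);

    private SampleMessageOrder() {
    }
}
```

Each additional tie-breaker becomes one more `thenComparing(..)` call, which keeps the ordering rules readable as a single expression.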
            .toSubscriber(message.getId());
}

private class EventEndpoint implements MessageEndpoint<ShardIndex, EventEnvelope> {
Please make it `final` and document.
 * <p>Depending on the status of the job, the matched messages are processed accordingly. See
 * more on that below.
 *
 * <b>1. Catch-up {@code IN_PROGRESS}.</b>
Please have these headers coming in separate paragraphs. Now they come as a part of the preceding paragraph.
Mention the catch-up in the class-level docs.
Thanks for pushing it further per our comments. LGTM
This PR introduces the `Projection` catch-up and re-structures the `Delivery` routines.
Catch-up
A `Projection` catch-up is a replay of the historical events from the event log on the requested projection instances, starting from a certain point in the past.
During the catch-up, the previous state of the catching-up projection instance is killed. The new state is then built up continuously from the historical events, starting from the specified point in time.
Use-cases
Typical scenarios in which a catch-up may be helpful are listed below.
`Projection` type is registered and the instances of this type need to be populated with the historical data.
`Projection` has changed and its instances must be re-built.
`Projection` instances.
Starting a catch-up
The `ProjectionRepository` now exposes an API to start a catch-up of its instances:
It is impossible to catch up the same instance more than once at a time. An attempt to do so results in a `CatchUpAlreadyStartedException`.
In case a new repository has been registered and needs to be filled with the projections according to the event history, another API endpoint is available:
Once the catch-up is completed, a special `CatchUpCompleted` event is emitted by the framework.
During the catch-up, the live messages dispatched to the catching-up entities are not delivered to them. When the catch-up is completed, the dispatching of the live events continues in a normal fashion. The framework guarantees there will be no events lost during the catch-up.
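The "one catch-up per instance at a time" rule can be modelled in isolation as follows. This is a self-contained sketch, not the repository code: `CatchUpTracker` and `SampleCatchUpAlreadyStartedException` are hypothetical names, and making the exception unchecked is an assumption chosen to keep the example short.

```java
import java.util.HashSet;
import java.util.Set;

/** A stand-in for the framework exception; its real class hierarchy is not shown here. */
final class SampleCatchUpAlreadyStartedException extends IllegalStateException {

    private static final long serialVersionUID = 0L;

    SampleCatchUpAlreadyStartedException(Object id) {
        super("Catch-up is already in progress for `" + id + "`.");
    }
}

/** Tracks the instances under catch-up and rejects overlapping requests. */
final class CatchUpTracker<I> {

    private final Set<I> inProgress = new HashSet<>();

    /** Starts the catch-up for the given IDs, or fails if any of them is already catching up. */
    synchronized void start(Set<I> ids) {
        for (I id : ids) {
            if (inProgress.contains(id)) {
                throw new SampleCatchUpAlreadyStartedException(id);
            }
        }
        inProgress.addAll(ids);
    }

    /** Marks the catch-up of the given IDs as completed, so that a new one may start. */
    synchronized void complete(Set<I> ids) {
        inProgress.removeAll(ids);
    }
}
```

The request is validated as a whole before any ID is recorded, so a rejected request leaves the tracked set unchanged.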
Configuring the catch-up through `DeliveryBuilder`
From now on, in addition to delivering the messages sent in real time, `Delivery` dispatches the historical events sent to the catching-up projections. These events are dispatched through the same shards as the live messages. A special `CatchUpStation` is responsible for handling this use-case. See more on that in the respective section.
To control how many historical events are read and put into shards, the end-users may configure the maximum number of messages read from the history at a time.
This is helpful to balance the per-shard throughput so that the live messages are still dispatched through the same shards in a reasonable time.
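The effect of such a limit can be illustrated with a small, framework-free sketch. `PagedHistory`, `readPage(..)` and `replay(..)` are hypothetical helpers operating on a plain list. They only show how a page size bounds the amount of history handled per round and say nothing about how `Delivery` reads the `EventStore`.

```java
import java.util.ArrayList;
import java.util.List;

final class PagedHistory {

    private PagedHistory() {
    }

    /** Reads at most pageSize events of the history, starting at the given offset. */
    static <E> List<E> readPage(List<E> history, int offset, int pageSize) {
        if (offset < 0 || pageSize <= 0) {
            throw new IllegalArgumentException("Expected offset >= 0 and pageSize > 0.");
        }
        int from = Math.min(offset, history.size());
        int to = (int) Math.min((long) from + pageSize, history.size());
        return new ArrayList<>(history.subList(from, to));
    }

    /** Replays the whole history page by page and returns the number of read rounds. */
    static <E> int replay(List<E> history, int pageSize, List<E> sink) {
        int rounds = 0;
        int offset = 0;
        while (offset < history.size()) {
            List<E> page = readPage(history, offset, pageSize);
            // A smaller page means less historical traffic in the sink per round.
            sink.addAll(page);
            offset += page.size();
            rounds++;
        }
        return rounds;
    }
}
```

A smaller page size means more rounds with fewer historical events in each, which is the trade-off the setting exposes.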
The statuses of the ongoing catch-up processes are stored in a dedicated `CatchUpStorage`, which can also be customized either through the `StorageFactory` or via the `DeliveryBuilder`:
The latter is useful if the creation of a custom `CatchUpStorage` requires more application-specific context.
Changes in `Delivery`
As previously, the delivery process for each shard index is split into `DeliveryStage`s. In the scope of each stage, a certain number of messages is read from the respective shard of the `Inbox`.
In the scope of each `DeliveryStage`, the page of the `InboxMessage`s is placed onto the newly introduced `Conveyor`, which is responsible for tracking the status of each message. The conveyor instance is then run through the pipeline of stations, each modifying the state of the messages in the conveyor.
At the end of the pipeline, the changes made to the messages are committed to the underlying `InboxStorage` in bulk. Such an approach minimizes the number of requests sent to the storage.
`InboxMessage`s now have a new `Timestamp keep_until` field, which tells for how long the message should be kept in its storage after it is delivered. In this way, the already delivered messages are used as a source for deduplication. As previously, the framework users may configure this value via the `DeliveryBuilder`:
Below is the list of the conveyor stations in the current `Delivery` pipeline.
This station is responsible for dispatching the historical events in `TO_CATCH_UP` status to the respective targets. Also, while the target entity is under a catch-up, all the live messages headed to it are removed by this station.
Once the catch-up is completed, this station handles the transition period, in which the last batch of the historical events and live messages are dispatched together. See the `CatchUpStation` API documentation for more details.
This station is responsible for dispatching the messages sent in real time by filtering the conveyor contents for the messages in `TO_DELIVER` status. Another responsibility of this station is to set for how long the delivered messages should be kept according to the idempotence window settings. See the `LiveDeliveryStation` API documentation for more details.
This station removes the messages which are already delivered and are no longer needed for the deduplication. See the `CleanupStation` for the description.
Deduplication
During the dispatching, the `Conveyor` keeps track of the delivered messages. The stations performing the actual message dispatching rely on this knowledge and deduplicate the messages prior to calling the target's endpoint.
Additionally, the `Delivery` now provides a `DeliveredMessages`, which is a cache of recently delivered messages. Each instance of the `Conveyor` has access to it and uses it in deduplication procedures.
Shard maintenance
In this changeset, a special `ShardMaintenanceProcess` has been introduced. Via it, the `Delivery` performs the maintenance of the shards. In the scope of this PR, a special `ShardProcessingRequested` event is handled by this process. It is guaranteed to be dispatched to the maintenance process via the selected shard, thus triggering the `Delivery` to handle the messages from it. It is required to ensure that all historical events are delivered to all their targets. See more on that in the description of the `CatchUpProcess` down below.
How `CatchUpProcess` works
The process is started by `CatchUpStarter` composing a request received from the `ProjectionRepository`'s `catchUp(..)`/`catchUpAll(..)` methods. A corresponding `CatchUpRequested` event is emitted.
In its lifecycle, this process moves through several statuses.
Not started
The process is created in this status upon receiving a `CatchUpRequested` event. The actions include:
`CatchUpStarted` event is emitted. The target projection repository listens to this event and kills the state of the matching entities.
`IN_PROGRESS` status.
`IN_PROGRESS`
When the process is in this status, the event history is read and the matching events are sent to the `Inbox`es of the corresponding projections.
The catch-up maintains this status until the history has been read up to a point in time which is very close to `Time.currentTime()`.
At this stage, the actions are as follows.
The historical event messages are read from the `EventStore` respecting the time range requested and the time of the last read operation performed by this process. The maximum number of the events read is determined by `DeliveryBuilder.setCatchUpPageSize(int)`.
Depending on the targets requested for the catch-up, the events are posted to the corresponding entities.
Unless the timestamps of the events are getting close to the current time, a `HistoryEventsRecalled` is emitted, leaving the process in the `IN_PROGRESS` status and triggering the next round similar to this one.
If the timestamps of the events read on this step are as close to the current time as the `CatchUpProcessBuilder.withTurbulencePeriod(Duration)` value (the so-called "turbulence" period, which by default is `500` ms), the `HistoryFullyRecalled` is emitted.
`FINALIZING`
The process moves to this status when the event history has been fully recalled and the corresponding `HistoryFullyRecalled` is received.
At this stage the `Delivery` stops the propagation of the events to the catch-up messages, waiting for this process to populate the inboxes with the messages arriving to be dispatched during the "turbulence" period.
Potentially, the inboxes will contain the duplicates produced by both the live users and this process. To deal with it, a deduplication is performed by the `Delivery`.
See `CatchUpStation` API documentation for more details on that.
The actions are as follows.
All the remaining messages of the matching event types are read from the `EventStore`. In this operation, the read limits set by the `Delivery` are NOT used, since the goal is to read the remainder of the events.
If there were no events read, the `CatchUpCompleted` is emitted and the process is moved to the `COMPLETED` status.
If there were some events read, the `LiveEventsPickedUp` event is emitted. This allows some time for the posted events to become visible to the `Delivery`. The reacting handler of the `LiveEventsPickedUp` completes the process, once this event is delivered to this process.
`COMPLETED`
Once the process moves to the `COMPLETED` status, the corresponding `Delivery` routines deduplicate, reorder and dispatch the "paused" events. After that, normal live delivery flow is resumed.
To ensure that all the shards in which the "paused" historical events reside are processed, this process additionally emits the "maintenance" `ShardProcessingRequested` events for each shard involved. By dispatching them, the system guarantees that the `Delivery` observes the `COMPLETED` status of this process and delivers the remainder of the messages.
Other changes
`AbstractStatefulReactor`
The `CatchUp` process manager implemented within this changeset needs access to the `EventStore` of the Domain Bounded Context. Therefore it was not possible to have it registered in the System counterpart. However, it is impossible to have a process manager with a certain state in more than one Bounded Context.
Therefore, a special `AbstractStatefulReactor` was introduced. It is an `AbstractEventReactor` with state. Unlike conventional `AbstractEventReactor`s, the instances of this one have their own `Inbox`es and are served with the messages through `Delivery`. The `CatchUpProcess` is one good example of such an event reactor.
`Limit` for `EventStore` queries
It is now possible to specify the maximum number of results when reading from an `EventStore`:
`lastHandledEventTime`
A per-`ProjectionRepository` property called `lastHandledEventTime` has been removed as obsolete.
Idempotence window
In order to be closer in the meaning, the `idempotenceWindow` configuration property has been renamed to `deduplicationWindow`:
Version update
The framework version is set to `1.4.6`.