
Projection Catch-up #1221

Merged: 200 commits merged into master on Feb 18, 2020

Conversation

@armiol (Contributor) commented Jan 9, 2020

This PR introduces the Projection catch-up and re-structures the Delivery routines.

Catch-up

A Projection catch-up is a replay of the historical events from the event log on the requested projection instances, starting from a certain point in the past.

During the catch-up, the previous state of the catching-up projection instance is killed. The new state is then continuously built up from the historical events, starting from the specified point in time.

Use-cases

Typical scenarios in which a catch-up may be helpful are listed below.

  • A new Projection type is registered and the instances of this type need to be populated with the historical data.
  • The business logic of an existing Projection has changed and its instances must be re-built.
  • A technical failure has occurred somewhere in the past, breaking the state of some of the Projection instances.

Starting a catch-up

The ProjectionRepository now exposes an API to start a catch-up of its instances:

OrderViewProjectionRepository repo = new OrderViewProjectionRepository();

// skipping the registration in the Bounded Context.

Timestamp catchUpSince = ...;
OrderId myOrderId = ...;
OrderId anotherOrderId = ...;

repo.catchUp(catchUpSince, ImmutableSet.of(myOrderId, anotherOrderId));

It is impossible to catch up the same instance more than once at a time; in such a case, a CatchUpAlreadyStartedException is thrown.
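
For illustration, a caller could guard against a concurrent request along these lines (a minimal sketch reusing the variables from the snippet above):

try {
    repo.catchUp(catchUpSince, ImmutableSet.of(myOrderId, anotherOrderId));
} catch (CatchUpAlreadyStartedException e) {
    // A catch-up for (some of) these instances is already running;
    // wait for its CatchUpCompleted event before requesting a new one.
}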

In case a new repository has been registered and needs to be filled with the projections according to the event history, another API endpoint is available:

QuarterlyReportRepository repo = new QuarterlyReportRepository();

// skipping the registration in the Bounded Context.

Timestamp catchUpSince = ...;

repo.catchUpAll(catchUpSince);

Once the catch-up is completed, a special CatchUpCompleted event is emitted by the framework.

During the catch-up, the live messages dispatched to the catching-up entities are not delivered to them. When the catch-up is completed, the dispatching of the live events continues in a normal fashion. The framework guarantees there will be no events lost during the catch-up.

Configuring the catch-up through DeliveryBuilder

From now on, in addition to delivering the messages sent in real time, Delivery dispatches the historical events sent to the catching-up projections. These events are dispatched through the same shards as the live messages. A special CatchUpStation is responsible for handling this use case. See more on that in the respective section.

To control how many historical events are read and put into shards, the end-users may configure the maximum number of messages read from the history at a time:

Delivery.newBuilder()
        .setCatchUpPageSize(42)
        // ... the rest of the configuration

This is helpful to balance the per-shard throughput so that the live messages are still dispatched through the same shards in a reasonable time.

The statuses of the ongoing catch-up processes are stored in a dedicated CatchUpStorage, which can also be customized either through the StorageFactory or via the DeliveryBuilder:

Delivery.newBuilder()
        .setCatchUpStorage(myCustomStorage)
        // ... the rest of the configuration

The latter is useful if the creation of a custom CatchUpStorage requires more application-specific context.

Changes in Delivery

As previously, the delivery process for each shard index is split into DeliveryStages. In the scope of each stage, a certain number of messages is read from the respective shard of the Inbox.

In the scope of each DeliveryStage, the page of the InboxMessages is placed onto the newly introduced Conveyor, which is responsible for tracking the status of each message. The conveyor instance is then run through the pipeline of stations, each modifying the state of the messages in the conveyor. At the end of the pipeline, the changes made to the messages are committed to the underlying InboxStorage in bulk. Such an approach minimizes the number of requests sent to the storage.

InboxMessages now have a new Timestamp keep_until field, which tells for how long the message should be kept in its storage after it is delivered. In this way, the already delivered messages are used as a source for deduplication. As previously, the framework users may configure this value
via the DeliveryBuilder:

Delivery.newBuilder()
        .setIdempotenceWindow(Durations.fromSeconds(15))
        // ... the rest of the configuration

Below is the list of the conveyor stations in the current Delivery pipeline.

  1. Catch-up station

This station is responsible for dispatching the historical events in
TO_CATCH_UP status to the respective targets. Also, while the target entity is under a catch-up,
all the live messages headed to it are removed by this station.

Once the catch-up is completed, this station handles the transition period, in which the last batch of the historical events and live messages are dispatched together. See the CatchUpStation API documentation for more details.

  2. Live delivery station

This station is responsible for dispatching the messages sent in real time by filtering the conveyor contents for the messages in the TO_DELIVER status. Another responsibility of this station is to set for how long the delivered messages should be kept, according to the idempotence window settings. See the LiveDeliveryStation API documentation for more details.

  3. Cleanup station

This station removes the messages which are already delivered and are no longer needed for the
deduplication. See the CleanupStation for the description.
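
To illustrate how these stations fit together, here is a simplified sketch of one delivery stage (the Station supertype, the pre-built station instances, and the result handling are assumptions made for the sake of the example; only the station names and the process(Conveyor) signature appear in this changeset):

// A simplified illustration, not the actual framework code.
ImmutableList<Station> stations = ImmutableList.of(
        catchUpStation,      // dispatches TO_CATCH_UP messages, holds back the live ones
        liveDeliveryStation, // dispatches TO_DELIVER messages, sets the keep-until time
        cleanupStation       // drops the delivered messages past the deduplication window
);
for (Station station : stations) {
    Result result = station.process(conveyor);
    // ... accumulate the number of delivered messages and the dispatching errors
}
// Once all the stations have run, the changes are committed to the InboxStorage in bulk.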

Deduplication

During the dispatching, the Conveyor keeps track of the delivered messages. The stations performing the actual message dispatching rely on this knowledge and deduplicate the messages prior to calling the target's endpoint.

Additionally, the Delivery now provides a DeliveredMessages, which is a cache of recently delivered messages. Each instance of the Conveyor has access to it and uses it in deduplication procedures.

Shard maintenance

In this changeset, a special ShardMaintenanceProcess has been introduced. Via it, the Delivery performs the maintenance of the shards. In the scope of this PR, a special ShardProcessingRequested event is handled by this process. It is guaranteed to be dispatched to the maintenance process via the selected shard, thus triggering the Delivery to handle the messages from it. This is required to ensure that all historical events are delivered to all their targets. See more on that in the description of the CatchUpProcess down below.

How CatchUpProcess works

The process is started by CatchUpStarter composing a request received from the ProjectionRepository's catchUp(..)/ catchUpAll(..) methods. A corresponding CatchUpRequested event is emitted.

In its lifecycle, this process moves through several statuses.

Not started

The process is created in this status upon receiving a CatchUpRequested event. The actions
include:

  • A CatchUpStarted event is emitted. The target projection repository listens to this event and kills the state of the matching entities.
  • The catch-up process is moved to the IN_PROGRESS status.

IN_PROGRESS

When the process is in this status, the event history is read and the matching events are sent to the Inboxes of the corresponding projections.

The catch-up maintains this status until the history has been read up to a point in time very close to Time.currentTime().

At this stage, the actions are as follows.

  • The historical event messages are read from the EventStore respecting the time range requested and the time of the last read operation performed by this process. The maximum number of the events read is determined by DeliveryBuilder.setCatchUpPageSize(int).

  • Depending on the targets requested for the catch-up, the events are posted to the corresponding entities.

  • If the timestamps of the events are not yet close to the current time, a HistoryEventsRecalled event is emitted, leaving the process in the IN_PROGRESS status and triggering the next round similar to this one.

  • If the timestamps of the events read at this step are closer to the current time than the CatchUpProcessBuilder.withTurbulencePeriod(Duration) value (the so-called "turbulence" period, 500 ms by default), the HistoryFullyRecalled event is emitted; see the sketch below.
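
In code, the check boils down to comparing the timestamp of the last read event against the start of the "turbulence" period, roughly as follows (a sketch; the local variables are illustrative, Timestamps and Durations are the protobuf-util helpers):

Duration turbulence = Durations.fromMillis(500); // the default value mentioned above
Timestamp turbulenceStart = Timestamps.subtract(Time.currentTime(), turbulence);
boolean withinTurbulence = Timestamps.compare(lastReadEventTime, turbulenceStart) >= 0;
// true  -> emit HistoryFullyRecalled and proceed to finalization;
// false -> emit HistoryEventsRecalled and stay in IN_PROGRESS.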

FINALIZING

The process moves to this status when the event history has been fully recalled and the corresponding HistoryFullyRecalled is received.

At this stage the Delivery pauses the propagation of the events to the catching-up targets, waiting for this process to populate the inboxes with the messages which arrive during the "turbulence" period.

Potentially, the inboxes will contain the duplicates produced by both the live users and this process. To deal with it, a deduplication is performed by the Delivery.
See CatchUpStation API documentation for more details on that.

The actions are as follows.

  • All the remaining messages of the matching event types are read from the EventStore. In this operation, the read limits set by the Delivery are NOT used, since the goal is to read the remainder of the events.

  • If there were no events read, the CatchUpCompleted is emitted and the process is moved to the COMPLETED status.

  • If there were some events read, the LiveEventsPickedUp event is emitted. This allows some time for the posted events to become visible to the Delivery. The reacting handler of the LiveEventsPickedUp completes the process, once this event is delivered to this process.

COMPLETED

Once the process moves to the COMPLETED status, the corresponding Delivery routines deduplicate, reorder and dispatch the "paused" events. After that, normal live delivery flow is resumed.

To ensure that all the shards in which the "paused" historical events reside are processed, this process additionally emits the "maintenance" ShardProcessingRequested events for each shard involved. By dispatching them, the system guarantees that the Delivery observes the COMPLETED status of this process and delivers the remainder of the messages.

Other changes

AbstractStatefulReactor

The CatchUp process manager implemented within this changeset needs access to the EventStore of the Domain Bounded Context, so it could not be registered in the System counterpart. However, it is impossible to have a process manager with a certain state registered in more than one Bounded Context.

Therefore, a special AbstractStatefulReactor was introduced. It is an AbstractEventReactor with state. Unlike conventional AbstractEventReactors, the instances of this one have their own Inboxes and are served with the messages through Delivery. The CatchUpProcess is one good example of such an event reactor.

Limit for EventStore queries

It is now possible to specify the maximum number of results when reading from an EventStore:

EventStreamQuery.Limit limit = EventStreamQuery.Limit
        .newBuilder()
        .setValue(value)
        .vBuild();
EventStreamQuery query = EventStreamQuery
        .newBuilder()
        .setAfter(longPast)
        .setBefore(future)
        .setLimit(limit)
        .vBuild();
eventStore.read(query, observer);

lastHandledEventTime

A per-ProjectionRepository property called lastHandledEventTime has been removed as obsolete.

Idempotence window

To better reflect its meaning, the idempotenceWindow configuration property has been renamed to deduplicationWindow:

Delivery.newBuilder()
        .setDeduplicationWindow(Durations.fromSeconds(10));

Version update

The framework version is set to 1.4.6.

Introduce the internal ordering for the `Event`s stored in the `EventStore`.
Check-in numerous debug `System.out`s in order to trace the issues in the concurrent-heavy environment.
Do not record the last order read during the catch-up.
Revert the modifications made to `EventContext` when reading the event history during the catch-up.
@armiol (Contributor, Author) commented Feb 14, 2020

@alexander-yevsyukov @dmdashenkov I will still need to merge with the today-merged PR, but please feel free to review again. Not much is expected to change upon the back-merge from master.

@alexander-yevsyukov (Contributor) left a comment

Please see my comments. There's more I need to review.


/*
* Handlers acting when process is {@code Not Started}
************************/
Contributor

This header delimiter should be either longer (to align with the end of the text above), or be just one *.


/*
* Handlers acting in the process finalization
************************/
Contributor

Same comment for this delimiter. I think one star would be a safer bet.


/*
* Handlers dealing with the process completion
************************/
Contributor

And here, please, too.


/*
* Utilities and re-usable actions
************************/
Contributor

And here.

Contributor Author

@alexander-yevsyukov from what I can see from your reaction, the sections are a bad idea after all.

Going to remove these here, and let's discuss our code style wiki page.

private static final TypeUrl TYPE = TypeUrl.from(CatchUp.getDescriptor());

/**
* The duration of the "turbulence" period, counting back from the current time.
Contributor

What does the “turbulence” mean in the context of the catch-up operation?

Contributor

There's also a static utility which does this calculation. Consider extracting them to a Turbulence class with documentation which explains the concept.

Contributor Author

@alexander-yevsyukov there is a decent description of the turbulence period in the class-level documentation. I have added it to address your previous comment a few days ago.

Either you have missed it (which seems more likely), or you really want this concept to be introduced at full speed. I think that's going to be an overdesign, but let's try.


if (job.matches(message)) {
DispatchingId dispatchingId = new DispatchingId(message);
if (jobStatus == IN_PROGRESS) {
Contributor

Consider using switch/case here. It would be easier to grasp the branches as alternatives.
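
Something along these lines, for instance (a sketch; the branch bodies are placeholders summarizing the PR description):

switch (jobStatus) {
    case IN_PROGRESS:
        // Dispatch the historical events; remove the matching live messages.
        break;
    case FINALIZING:
        // Hold the live messages until the catch-up finalization is over.
        break;
    case COMPLETED:
        // De-duplicate and dispatch the remaining messages.
        break;
    default:
        break;
}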

Contributor

Also, can we look at this code from the other side: operations that the station performs. To me it looks like there are several outcomes:

  • conveyor.remove(message);
  • dispatchToCatchUp.put(dispatchingId, message);
  • conveyor.keepForLonger(message, HOW_LONG_TO_KEEP);

If we formulate conditions to these operations, we probably can reduce the number of if/else branches. Otherwise, this piece is really hard to understand.

Contributor Author

@alexander-yevsyukov This piece is explained by the class-level doc. The same applies to the LiveDeliveryStation.

This whole class is itself a method object, as is the LiveDeliveryStation.

I will try to extract pieces one more time. It may make it look better designed, but the extra call chain will make it harder to follow.
And, to avoid creating extra objects, I will probably have to modify the variables by reference.

Let's see if it makes the code easier for non-authors :)


for (InboxMessage message : conveyor) {
for (CatchUp job : jobs) {
CatchUpStatus jobStatus = job.getStatus();
Contributor

We probably don't have to do this if the job does not match the message.

for (CatchUp job : jobs) {
CatchUpStatus jobStatus = job.getStatus();

if (job.matches(message)) {
Contributor

How about having this:

if (!job.matches(message)) {
     continue;
}

* The comparator which sorts the messages chronologically, but ensures that if there is
* a {@link CatchUpStarted} event in the sorted batch, it goes on top.
*/
private static final class CatchUpMessageComparator
Contributor

I think we'll need to revisit the delivery package soon, presumably when we get down to working on new Buses. The process() method in this class really needs refactoring, and we need a Method Object there. This Method Object would be closely linked with the CatchUpStation class, while this Comparator is a more separate thing. It can be more easily understood at the top level of the package. We have many CatchUp-classes there already; one more won't hurt if the price is a process() method which is easier to understand.

* dispatching
*/
@Override
public final Result process(Conveyor conveyor) {
Contributor

We have a case similar to CatchUpStation here. Only the author of the code can understand what's going on in this method and why. The Javadoc for this method does not help much because it says that it dispatches messages. It's understood already by the name of the method process() and its parameter Conveyor. But how does it do it? Why do we have logic which is that complex?

I think we need another Method Object here. Please consider.

@alexander-yevsyukov (Contributor) left a comment

I'm done with this round. Please see my comments and questions.

* <p>Has its own {@link Inbox}, so the messages arriving to it are dispatched
* by the {@link Delivery}.
*/
class ShardMaintenanceProcess extends AbstractEventSubscriber {
Contributor

Can we make this class final?

static Result emptyResult() {
DeliveryErrors noErrors = DeliveryErrors.newBuilder()
.build();
return new Result(0, noErrors);
Contributor

What if we have the whole thing as a constant?
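
E.g. something like this (a sketch reusing the names from the snippet; assumes Result is immutable):

private static final Result EMPTY_RESULT =
        new Result(0, DeliveryErrors.newBuilder().build());

static Result emptyResult() {
    return EMPTY_RESULT;
}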

* are compared lexicographically.
*/
@Internal
public final class EventComparator implements Comparator<Event>, Serializable {
Contributor

Now this thing captures the whole idea of comparing events and says that there's only one way of doing this. The static constant chronologically does not relieve that; it simply repeats, in a shorter form, what the Javadoc says. It's not a really big issue, but it's better to address it now: when we need other event comparators, we'd have to change this class significantly. I see two options.

Option 1. Have EventComparator as an abstract class which a) simply implements the Comparator interface with Event as a generic parameter and b) returns implementations of a specific kind. One of them would be EventComparator.chronologically().

Option 2. Have an interface EventComparator, and ChronologicalEventComparator as its implementation.

The first option looks better for me, assuming the class is @Internal.
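
A rough sketch of the shape Option 1 could take (the factory name follows the comment above; the timestamp access is an assumption, and Timestamps is the protobuf-util helper):

@Internal
public abstract class EventComparator implements Comparator<Event>, Serializable {

    /** Returns the comparator which orders the events by their time of occurrence. */
    public static EventComparator chronologically() {
        return new Chronological();
    }

    private static final class Chronological extends EventComparator {
        @Override
        public int compare(Event e1, Event e2) {
            return Timestamps.compare(e1.getContext().getTimestamp(),
                                      e2.getContext().getTimestamp());
        }
    }
}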

*/
public abstract class ProjectionRepository<I, P extends Projection<I, S, ?>, S extends EntityState>
extends EventDispatchingRepository<I, P, S> {

private @MonotonicNonNull Inbox<I> inbox;

private @MonotonicNonNull CatchUpProcess<I> catchUpProcess;

private @MonotonicNonNull RepositoryCache<I, P> cache;

/**
Contributor

Please review the Javadoc of this method. I think it's outdated. It probably should mention something about Catch-Up. I'm not sure.

The piece about state updates routing looks dated. Please look into it.

Contributor Author

Appended the docs.

Contributor Author

The piece on the state update routing makes sense, as discussed vocally.

Delivery delivery = ServerEnvironment.instance()
.delivery();
initInbox(delivery);
initCatchUp(context, delivery);
Contributor

Do we always have to call the catch-up initialization?

How do I run the catch-up for entities of my repositories? Can we have something about it in the class Javadoc? Or, maybe there should be a better place for this. Please advise.

Contributor Author

Appended the documentation.

checkNotNull(timestamp);
projectionStorage().writeLastHandledEventTime(timestamp);
/**
* Repeats the dispatching of the events from the event log to the requested entities
Contributor

Now I see how the catch-up is started, but can we link to this method from 1) Javadoc of the class 2) registration method Javadoc? We need to hint the reader that this ability exists.

Contributor Author

Done.

* progress
*/
public void catchUp(Timestamp since, @Nullable Set<I> ids) throws
CatchUpAlreadyStartedException {
Contributor

Can we have throws and the name of the exception class together on the next line?

* the value to be used as an argument
* @return the object to use as an argument in the message-formatting API
*/
private static Object lazyArg(Object value) {
Contributor

This looks like a reusable utility for the Stringifiers class or somewhere near. Please find a better place for it.

Contributor

Or, perhaps, we may want to have our custom precondition utilities that would accept LazyArg (or something similar). To see what I mean, please see how lazy logging is done in Flogger in com.google.common.flogger.LogContext.logImpl() method.

This isn't a small change, but it is a useful one. We may have some other checks that accept time-consuming operations in other places in our code. We can improve that altogether, and then change this piece too. If you concur, please file an issue referring to this method.
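
For reference, Flogger evaluates such arguments lazily via LazyArgs.lazy(..), roughly like this (the logger instance and the expensive call are placeholders):

// `logger` is a com.google.common.flogger.FluentLogger; `expensiveToString` is a placeholder.
// The lambda is evaluated only if FINE logging is actually enabled.
logger.atFine().log("Catch-up state: %s", LazyArgs.lazy(() -> expensiveToString(state)));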

Set<I> catchUpTargets;
if (envelope.message() instanceof CatchUpSignal) {
catchUpTargets = restrictToIds == null
? ImmutableSet.copyOf(index())
Contributor

This operation is potentially big and long (assuming index() goes to the storage). Can we run it once per repository under the catch-up operation?

Contributor Author

The idea here is to have the most recent index(), so we shouldn't cache it.

In the real use cases, it is going to be used only if a catch-up for all the repository entities is requested. If this repository is a new one and requires catching up, the index is going to be empty. And if the repository entities are all messed up and require the complete re-catch-up,
it will and should take long. So the expectations are managed.

/**
* A mixin for the state of the {@linkplain CatchUpProcess catch-up process job}.
*/
public interface CatchUpMixin extends CatchUpOrBuilder {
Contributor

This interface can be package-private, so that the framework users do not implement it. Also, please mark as a @GeneratedMixin.

* @return {@code true} if the message matches the job, {@code false} otherwise
*/
@VisibleForTesting
default boolean matches(InboxMessage message) {
Contributor

This is not visible for testing. This is visible because of a lack of language expressiveness. I'd mark it as @Internal.

Contributor Author

I have removed this annotation and marked the whole class as Internal.

It originally was public, as this interface was in the different package.

return timeComparison;
}
int versionComparison = Integer.compare(m1.getVersion(), m2.getVersion());
if(versionComparison != 0) {
Contributor

Please add a space.
I'd also consider using a Comparator.comparing(..) chain instead of implementing the interface directly.
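
E.g. something like this (a sketch; the timestamp accessor name is a guess, the version accessor follows the snippet above, and Timestamps.comparator() is the protobuf-util comparator):

Comparator<InboxMessage> chronological =
        Comparator.comparing(InboxMessage::getWhenReceived, Timestamps.comparator())
                  .thenComparingInt(InboxMessage::getVersion);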

.toSubscriber(message.getId());
}

private class EventEndpoint implements MessageEndpoint<ShardIndex, EventEnvelope> {
Contributor

Please make it final and document.

* <p>Depending on the status of the job, the matched messages are processed accordingly. See
* more on that below.
*
* <b>1. Catch-up {@code IN_PROGRESS}.</b>
Contributor

Please have these headers come in separate paragraphs. Now they appear as a part of the preceding paragraph.

@alexander-yevsyukov (Contributor) left a comment

Thanks for pushing it further per our comments. LGTM

@armiol merged commit 31cbea4 into master on Feb 18, 2020
@armiol deleted the catch-up branch on February 18, 2020 at 15:07
@armiol mentioned this pull request on Mar 8, 2020