-
Notifications
You must be signed in to change notification settings - Fork 12
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Projection Catch-up #1221
Projection Catch-up #1221
Changes from 133 commits
db495e2
5c9e3cb
1a9d66e
f86a3d9
7db54c1
d1937ec
cbc68f9
9e554e8
6b87287
aa71c5e
8e2a5a1
cbd2b9d
c6872b2
701cedd
872a1cb
06ad4b1
bd578e3
716b7a7
5744159
513dcd7
13a28aa
3bff358
23a255c
3750501
f20ef8a
64c3611
88e923b
0767a2c
6ee3c1d
43b02ed
702ee9b
9b7f9e3
bb8056d
1fc7e49
f4bd902
7a89bda
80cae77
dfe24a5
7507056
1ef67ef
ae29aaa
f5fe3d8
f07c016
830535f
046d763
ffe7b11
cde9d23
63680aa
305ac72
16fdfaf
d9aa93b
309223a
89b66bb
e69c987
0ddba80
a7cab0a
57ca15f
d129803
9bbef99
e9159e4
87aa7db
ff5bdc9
97e15f2
419619a
2b65067
b9993ab
53eda0e
e199f4e
e5f08cf
87e2d78
4356cbc
302f3ca
e53141a
ced3d3b
0e11980
61598c0
aec9063
6379d7a
bac4c97
16c2f9a
075b8c3
04621c8
71a59ee
48080ca
8a51af9
22e7e92
a094722
3d86350
f1135a6
ed832d0
69d283e
d5a9ec3
686b333
d3d35e4
c96e52c
e064c2e
4d727c1
2f30a99
cb3798f
8a95e6e
1f1a120
c1c82ae
e387285
e9697ac
cba4400
1b5886c
95936cb
427a3c9
000f957
bbde7a5
9581d17
c761145
611c753
8423bf4
2efeb19
d458fdf
639fab6
7076689
688dd41
68961f3
6e976cd
4a2e282
7c7cf68
193ae5a
c81bce4
52f626d
73662ad
625a249
0d1e0e2
9291ba1
0c9c551
c2d8c22
6c9626d
17379ee
aa9645b
047204c
e2db653
ff37737
dbc6154
ae8ca67
5355eb2
8214169
722f06c
f888edc
d5c7f17
ea470a7
e0bddd6
026ca8a
f70dd32
b09ec01
60a9285
21be397
57e81c2
2ee8e21
56fbf4f
09345a5
ebce0fe
0245ba7
cfb08e1
98ca9e3
f8335cb
ffd59ef
0b05474
d37a2a0
14648bf
6b2da5a
59543f3
ad34026
6ed7b26
97eb723
f871c71
6f70356
44e2a02
b39842c
1d45917
39f5cdd
ee39fc9
432e01f
f0e76a1
0abcea8
b357b56
4ddedf7
008e4f9
d1b2921
2bf59b9
0f477ca
f8b708e
8318725
6062ee8
015642c
699357d
a2e2c34
c37356a
4e36865
78a0180
85c30d6
2c46776
8846cab
8dbb2b1
2e2c105
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,6 +27,7 @@ | |
import io.spine.server.entity.Repository; | ||
import io.spine.server.entity.model.EntityClass; | ||
import io.spine.type.TypeName; | ||
import io.spine.type.TypeUrl; | ||
|
||
import java.util.HashMap; | ||
import java.util.Map; | ||
|
@@ -107,6 +108,12 @@ Optional<Repository> repositoryFor(Class<? extends EntityState> stateClass) { | |
return repositoryAccess.get(); | ||
} | ||
|
||
Optional<Repository> repositoryFor(TypeUrl stateType) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please document. |
||
checkNotNull(stateType); | ||
RepositoryAccess repositoryAccess = findOrThrow(stateType); | ||
return repositoryAccess.get(); | ||
} | ||
|
||
private RepositoryAccess findOrThrow(Class<? extends EntityState> stateClass) { | ||
RepositoryAccess repository = repositories.get(stateClass); | ||
if (repository == null) { | ||
|
@@ -118,6 +125,22 @@ private RepositoryAccess findOrThrow(Class<? extends EntityState> stateClass) { | |
return repository; | ||
} | ||
|
||
private RepositoryAccess findOrThrow(TypeUrl stateType) { | ||
Optional<RepositoryAccess> result = | ||
repositories.values() | ||
.stream() | ||
.filter((r) -> r.repository.entityStateType() | ||
.equals(stateType)) | ||
.findFirst(); | ||
if (!result.isPresent()) { | ||
throw newIllegalStateException( | ||
"A repository for the state type `%s` is not registered.", | ||
stateType | ||
); | ||
} | ||
return result.get(); | ||
} | ||
|
||
/** | ||
* Obtains a set of entity type names by their visibility. | ||
*/ | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
/* | ||
* Copyright 2019, TeamDev. All rights reserved. | ||
* | ||
* Redistribution and use in source and/or binary forms, with or without | ||
* modification, must retain the above copyright notice and the following | ||
* disclaimer. | ||
* | ||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS | ||
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT | ||
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR | ||
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT | ||
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, | ||
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT | ||
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, | ||
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY | ||
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT | ||
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE | ||
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. | ||
*/ | ||
|
||
/** | ||
* A package defining the catch-up related commands and events. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I cannot find any generated code in this package. Is this really used? |
||
*/ | ||
@ParametersAreNonnullByDefault | ||
@CheckReturnValue | ||
package io.spine.server.catchup; | ||
|
||
import com.google.errorprone.annotations.CheckReturnValue; | ||
|
||
import javax.annotation.ParametersAreNonnullByDefault; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
/* | ||
* Copyright 2020, TeamDev. All rights reserved. | ||
* | ||
* Redistribution and use in source and/or binary forms, with or without | ||
* modification, must retain the above copyright notice and the following | ||
* disclaimer. | ||
* | ||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS | ||
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT | ||
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR | ||
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT | ||
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, | ||
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT | ||
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, | ||
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY | ||
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT | ||
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE | ||
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. | ||
*/ | ||
|
||
package io.spine.server.delivery; | ||
|
||
import com.google.common.base.Joiner; | ||
import com.google.common.collect.ImmutableSet; | ||
import io.spine.type.TypeUrl; | ||
import org.checkerframework.checker.nullness.qual.Nullable; | ||
|
||
import java.util.Set; | ||
|
||
/** | ||
* An exception telling that the projection catch-up cannot be started, since some of the requested | ||
* entities are already catching up. | ||
*/ | ||
public final class CatchUpAlreadyStartedException extends IllegalStateException { | ||
|
||
private static final long serialVersionUID = 0L; | ||
|
||
private final TypeUrl projectionStateType; | ||
private final @Nullable ImmutableSet<Object> requestedIds; | ||
|
||
CatchUpAlreadyStartedException(TypeUrl type, @Nullable Set<?> ids) { | ||
super(); | ||
projectionStateType = type; | ||
requestedIds = ids == null ? null : ImmutableSet.copyOf(ids); | ||
} | ||
|
||
@Override | ||
public String getMessage() { | ||
String message = String.format( | ||
"Cannot start the catch-up for the `%s` Projection, `%s`. " + | ||
"Another catch-up is already in progress.", | ||
projectionStateType, targetsAsString()); | ||
return message; | ||
} | ||
|
||
/** | ||
* Returns the type URL of the projection for which the catch-up was requested. | ||
*/ | ||
public TypeUrl projectionStateType() { | ||
return projectionStateType; | ||
} | ||
|
||
/** | ||
* Returns the IDs of the targets which were asked to catch up. | ||
* | ||
* <p>If all the projection entities were specified as a target, returns an empty {@code Set}. | ||
*/ | ||
public ImmutableSet<Object> requestedIds() { | ||
return requestedIds == null? ImmutableSet.of() : requestedIds; | ||
} | ||
|
||
private String targetsAsString() { | ||
if (requestedIds == null) { | ||
return "[all instances]"; | ||
} | ||
return Joiner.on(',') | ||
.join(requestedIds); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,128 @@ | ||
/* | ||
* Copyright 2020, TeamDev. All rights reserved. | ||
* | ||
* Redistribution and use in source and/or binary forms, with or without | ||
* modification, must retain the above copyright notice and the following | ||
* disclaimer. | ||
* | ||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS | ||
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT | ||
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR | ||
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT | ||
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, | ||
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT | ||
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, | ||
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY | ||
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT | ||
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE | ||
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. | ||
*/ | ||
|
||
package io.spine.server.delivery; | ||
|
||
import com.google.common.collect.ImmutableList; | ||
import com.google.protobuf.ProtocolStringList; | ||
import io.spine.server.delivery.event.CatchUpCompleted; | ||
import io.spine.server.delivery.event.CatchUpStarted; | ||
import io.spine.server.delivery.event.HistoryEventsRecalled; | ||
import io.spine.server.delivery.event.HistoryFullyRecalled; | ||
import io.spine.server.delivery.event.LiveEventsPickedUp; | ||
import io.spine.server.delivery.event.ShardProcessingRequested; | ||
import io.spine.server.event.EventFilter; | ||
import io.spine.server.event.EventStreamQuery; | ||
|
||
import static com.google.common.base.Preconditions.checkNotNull; | ||
import static com.google.common.collect.ImmutableList.toImmutableList; | ||
|
||
/** | ||
* A utility class producing the messages related to the {@code CatchUp} process. | ||
*/ | ||
final class CatchUpMessages { | ||
|
||
private CatchUpMessages() { | ||
} | ||
|
||
/** | ||
* Creates a {@code CatchUpStarted} event messages with the specified ID. | ||
*/ | ||
static CatchUpStarted started(CatchUpId id) { | ||
checkNotNull(id); | ||
return CatchUpStarted.newBuilder() | ||
.setId(id) | ||
.vBuild(); | ||
} | ||
|
||
/** | ||
* Creates a limit to be used in quering the {@link io.spine.server.event.EventStore | ||
* EventStore}. | ||
*/ | ||
static EventStreamQuery.Limit limitOf(int value) { | ||
return EventStreamQuery.Limit.newBuilder() | ||
.setValue(value) | ||
.vBuild(); | ||
} | ||
|
||
/** | ||
* Creates a {@code HistoryEventsRecalled} event messages with the specified ID. | ||
*/ | ||
static HistoryEventsRecalled recalled(CatchUpId id) { | ||
checkNotNull(id); | ||
return HistoryEventsRecalled.newBuilder() | ||
.setId(id) | ||
.vBuild(); | ||
} | ||
|
||
/** | ||
* Creates a {@code HistoryFullyRecalled} event messages with the specified ID. | ||
*/ | ||
static HistoryFullyRecalled fullyRecalled(CatchUpId id) { | ||
checkNotNull(id); | ||
return HistoryFullyRecalled.newBuilder() | ||
.setId(id) | ||
.vBuild(); | ||
} | ||
|
||
/** | ||
* Wraps the passed list of {@code String}s into a list of {@code EventFilter}s. | ||
*/ | ||
static ImmutableList<EventFilter> toFilters(ProtocolStringList rawEventTypes) { | ||
checkNotNull(rawEventTypes); | ||
return rawEventTypes.stream() | ||
.map(type -> EventFilter | ||
.newBuilder() | ||
.setEventType(type) | ||
.build()) | ||
.collect(toImmutableList()); | ||
} | ||
|
||
/** | ||
* Creates a {@code LiveEventsPickedUp} event messages with the specified ID. | ||
*/ | ||
static LiveEventsPickedUp liveEventsPickedUp(CatchUpId id) { | ||
checkNotNull(id); | ||
return LiveEventsPickedUp.newBuilder() | ||
.setId(id) | ||
.vBuild(); | ||
} | ||
|
||
/** | ||
* Creates a {@code CatchUpCompleted} event messages with the specified ID. | ||
*/ | ||
static CatchUpCompleted catchUpCompleted(CatchUpId id) { | ||
checkNotNull(id); | ||
return CatchUpCompleted.newBuilder() | ||
.setId(id) | ||
.vBuild(); | ||
} | ||
|
||
/** | ||
* Creates a {@code ShardProcessingRequested} event messages with the specified index. | ||
*/ | ||
static ShardProcessingRequested shardProcessingRequested(ShardIndex shardIndex) { | ||
checkNotNull(shardIndex); | ||
return ShardProcessingRequested | ||
.newBuilder() | ||
.setId(shardIndex) | ||
.vBuild(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
/* | ||
* Copyright 2020, TeamDev. All rights reserved. | ||
* | ||
* Redistribution and use in source and/or binary forms, with or without | ||
* modification, must retain the above copyright notice and the following | ||
* disclaimer. | ||
* | ||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS | ||
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT | ||
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR | ||
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT | ||
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, | ||
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT | ||
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, | ||
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY | ||
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT | ||
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE | ||
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. | ||
*/ | ||
|
||
package io.spine.server.delivery; | ||
|
||
/** | ||
* A mixin for the state of the {@linkplain CatchUp catch-up} {@linkplain CatchUpProcess processes}. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd go with only one link. Probably, to the message itself. |
||
*/ | ||
public interface CatchUpMixin { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we need this empty mixin? |
||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please document.