
Add support for "Dynamic Consistency Boundary" (DCB) #166

Open
johanhaleby opened this issue Oct 25, 2024 · 5 comments

Comments

@johanhaleby
Owner

johanhaleby commented Oct 25, 2024

The goal of this issue is to support the approach discussed in various talks such as this.

First of all, WriteCondition should be renamed to ConsistencyCondition, and a "condition" method should be added:

ConsistencyCondition {
	fun streamVersionEq(...)
	fun condition(filter: Filter) // It's in the filter that we specify types etc., like Filter.type(in("A", "B"))
}
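To make the shape of this more concrete, here is a hypothetical Java sketch (all names are illustrative, not the current Occurrent API): a Filter is a composable predicate over events, and ConsistencyCondition either pins a stream version (today's behavior) or wraps a Filter.

```java
import java.util.Set;
import java.util.function.Predicate;

// Illustrative event shape for this sketch
record Event(String streamId, String type) {}

// A Filter is just a composable predicate over events
final class Filter {
    final Predicate<Event> predicate;
    private Filter(Predicate<Event> predicate) { this.predicate = predicate; }

    static Filter type(Set<String> types) { // e.g. Filter.type(Set.of("A", "B"))
        return new Filter(e -> types.contains(e.type()));
    }
    static Filter streamId(String id) { return new Filter(e -> e.streamId().equals(id)); }
    Filter and(Filter other) { return new Filter(predicate.and(other.predicate)); }
    Filter or(Filter other) { return new Filter(predicate.or(other.predicate)); }
    boolean matches(Event e) { return predicate.test(e); }
}

// Either today's "stream version equals" check, or an arbitrary filter
sealed interface ConsistencyCondition {
    record StreamVersionEq(long version) implements ConsistencyCondition {}
    record Condition(Filter filter) implements ConsistencyCondition {}
}
```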

Actual Example

A "User" has a name and an address. At the moment, there are no invariants between name and address, so we decide to create two deciders/aggregates/functions for the User: one that deals with "name changes" (i.e. one that is interested in "name" related events), and another that deals with "address changes" (i.e. one that is interested in "address" related events).

Now let's say we have a stream for this User at stream1 that contains the following events:

stream1: NameDefined, AddressDefined, AddressChanged, NameChanged, AddressRemoved

If the user wants to change his/her name again, this is how it can be done without taking the address events into account:

val eventStream = eventStore.read("stream1", condition(Filter.type(in("NameDefined", "NameChanged", "NameRemoved"))))
val newEvents = NameAggregate.changeName(eventStream.events(), "John")
eventStore.write("stream1", eventStream.consistencyCondition(), newEvents);

The EventStream returned by read looks something like this today:

public interface EventStream<T> {
    String id();
    long version();
    Stream<T> events();
}

This should be changed into something like this:

public interface EventStream<T> {
    String id();
    long version();
    String consistencyTag();
    ConsistencyCondition consistencyCondition();
    Stream<T> events();
}

Now, in the ES impl we can no longer just check that the streamVersion is equal to the stream version in the event store when writing new events. Instead, before returning the EventStream we need to calculate the consistencyTag (mongodb example):

db.events.aggregate([
    {
        $match: { /* "streamId + consistencyCondition" query */ }
    },
    {
        $project: { id: 1 }
    },
    {
        $sort: { id: 1 } // Ensure deterministic order
    },
    {
        $group: {
            _id: null,
            consistencyTag: { $md5: { $toString: "$id" } }
        }
    }
]);

This will generate a unique consistencyTag per write. Note that we don't store the consistency tag in the database; I don't think that is required, since it's an in-memory thing calculated on the fly. So, instead of reading the "stream version" and comparing it to the supplied stream version of the WriteCondition, we can just check that the calculated "consistencyTag" matches. This also means that we're backward compatible, because the default streamVersionEq condition will simply yield a "streamId+streamVersion" consistency condition.
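The same calculation can be sketched in plain Java (class and method names are illustrative, not part of the current API): the consistencyTag is an MD5 checksum over the ids of the events matched by the consistency condition, in a deterministic order.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HexFormat;
import java.util.List;

final class ConsistencyTags {

    static String calculate(List<String> matchingEventIds) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            // Sort first so the tag is stable regardless of read order
            matchingEventIds.stream().sorted().forEach(id -> md5.update(id.getBytes()));
            return HexFormat.of().formatHex(md5.digest());
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // MD5 is always available
        }
    }
}
```

Because the input is sorted, two reads that match the same set of events always produce the same tag, which is what makes the compare-on-write check safe.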

Now, if the business rule changes, and you cannot change the name to "John Doe" if the address is "Address street", we can change the "consistency condition" to include more events (or the entire stream like before).

Note 1

While the above (hopefully) solves the case where you have smaller "logical aggregates" than the number of different event types you have in the stream, it doesn't solve the reverse problem. For example, if you represent the "User" in two different streams, one for name and one for address:

user1:name: NameDefined, NameChanged
user1:address: AddressDefined, AddressChanged, AddressRemoved

To solve this case, if the new business rule is applied, the consistencyCondition must span multiple streams so the EventStream model above needs to be changed. Maybe to this:

public interface EventStream<T> {
    List<StreamInfo> streams();
    String consistencyTag();
    ConsistencyCondition consistencyCondition();
    Stream<T> events();
}

where StreamInfo contains "streamId" and "version" for each involved stream. Then we can perform the use case like this:

val eventStream = eventStore.read(streamId(in("user1:name", "user1:address")).and(type(in("NameDefined", "NameChanged", "NameRemoved", "AddressDefined", "AddressChanged", "AddressRemoved"))))
val newEvents = NameAggregate.changeName(eventStream.events(), "John")
eventStore.write("user1:name", eventStream.consistencyCondition(), newEvents);

Note the subtle difference on the "read" method here. We don't pass a "streamId" explicitly, but only a ConsistencyCondition. This means that read(<streamId>, <consistencyCondition>) will just do:

read(streamId(<streamId>).and(<consistencyCondition>))

Another easier and arguably better way (if only these two streams exist for a User), would be to change the consistency condition to make use of a "subject" filter:

val eventStream = eventStore.read(subject("user1"))
val newEvents = NameAggregate.changeName(eventStream.events(), "John")
eventStore.write("user1:name", eventStream.consistencyCondition(), newEvents);

Note 2

A cool thing if we implement what's implied by "Note 1" is that we can have consistency conditions that selectively pick explicit events from different streams like this:

Filter nameChangesForUser1 = stream("user1:name").and(type("NameChanged"))
Filter addressChangesForUser1 = stream("user1:address").and(type("AddressChanged"))
val eventStream = eventStore.read(nameChangesForUser1.or(addressChangesForUser1))

Note 3

We should NOT replace "version" with the consistency tag. This won't be good, because then projections have no way to compare consistency tags "in time" (today you can check if one version is greater than another version).

Note 4

Consistency conditions will only be performant if the correct indexes are added.

Note 5

Maybe it would be a good idea to include some sort of "global time" when a consistency tag is generated:

db.events.aggregate([
    {
        $match: { /* "consistencyCondition" query */ }
    },
    {
        $project: { id: 1 }
    },
    {
        $group: {
            _id: null,
            consistencyTag: { $md5: { $toString: "$id" } }, 
            timestamp: { $first: "$$NOW" }
        }
    }
]);

Then the EventStream could contain a ConsistencyTag object:

ConsistencyTag(String tag, ZonedDateTime globalTimestamp)

Idea: Could this replace the "stream version" altogether? For example, instead of doing:

val eventStream = eventStore.read("streamId")
..
eventStore.write("streamId", eventStream.version(), newEvents);

you could do:

val eventStream = eventStore.read("streamId")
..
eventStore.write("streamId", eventStream.consistencyTag(), newEvents);

The "consistencyTag" here is constructed from the mongo aggregation above, with an (implicitly created) "consistency condition" of "streamId".

Maybe it would be a good idea to store the consistencyTag, including the timestamp, instead of the version. This way you would have a global position as well!? (It'll fail as soon as the time is changed on the server though, which is bad.)

Note 6

This is a variant of "Note 5", but we instead combine the md5 checksum + timestamp into the "consistency tag":

db.events.aggregate([
    {
        $match: { /* "consistencyCondition" query */ }
    },
    {
        $project: { id: 1 }
    },
    {
        $group: {
            _id: null,
            checksum: { $md5: { $toString: "$id" } }, 
            timestamp: { $first: "$$NOW" }
        }
    },
    {
        $project: {
            rawConsistencyTag: {
                $concat: [
                    "$checksum",
                    ":",
                    { $dateToString: { format: "%Y-%m-%dT%H:%M:%S.%LZ", date: "$timestamp" } }
                ]
            }
        }
    }
]);

Then in the code we could Base64 encode the rawConsistencyTag so that it's possible to easily just send it in HTTP query params etc (this cannot be done in MongoDB). The ConsistencyTag object in EventStream may look like this:

// consistencyTag is Base64 encoded
record ConsistencyTag(String consistencyTag) {

    private String decoded() {
        return new String(Base64.getDecoder().decode(consistencyTag), StandardCharsets.UTF_8);
    }

    String checksum() {
        return decoded().substring(0, decoded().indexOf(':'));
    }

    ZonedDateTime timestamp() {
        var timestampString = decoded().substring(decoded().indexOf(':') + 1);
        return ZonedDateTime.parse(timestampString, someDateTimeFormatter);
    }

    boolean isAfter(ConsistencyTag ct) { ... }
    boolean isBefore(ConsistencyTag ct) { ... }
}

The cool thing with this is that projections (or whoever else) that need to check if one ConsistencyTag is before or after another can easily do so (and it works globally).
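A runnable sketch of the full round-trip (names are illustrative, and Instant is used instead of ZonedDateTime for brevity): encode "checksum:timestamp" as URL-safe Base64 so the tag can travel in HTTP query params, then parse it back and compare tags in time.

```java
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Base64;

record EncodedTag(String value) {

    static EncodedTag of(String checksum, Instant timestamp) {
        String raw = checksum + ":" + timestamp; // Instant.toString() is ISO 8601
        return new EncodedTag(Base64.getUrlEncoder().withoutPadding()
                .encodeToString(raw.getBytes(StandardCharsets.UTF_8)));
    }

    private String raw() {
        return new String(Base64.getUrlDecoder().decode(value), StandardCharsets.UTF_8);
    }

    // The checksum never contains ':', so everything before the first ':' is the checksum
    String checksum() { return raw().substring(0, raw().indexOf(':')); }

    // Everything after the first ':' is the ISO 8601 timestamp (which itself contains ':')
    Instant timestamp() { return Instant.parse(raw().substring(raw().indexOf(':') + 1)); }

    boolean isAfter(EncodedTag other) { return timestamp().isAfter(other.timestamp()); }

    boolean isBefore(EncodedTag other) { return timestamp().isBefore(other.timestamp()); }
}
```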

@johanhaleby johanhaleby changed the title Add support for "killing the aggregate"-style "aggreates" Add support for "Dynamic Consistency Boundary" (DCB) Oct 25, 2024
@johanhaleby
Owner Author

Also note that if we're to generate a consistencyTag in the db, it should be based on "id+source".

Also, how well does the consistencyTag calculation perform for large streams?

@johanhaleby
Owner Author

We must also consider backward compatibility with old versions that use the "stream version".

@johanhaleby
Owner Author

johanhaleby commented Nov 15, 2024

After talking to chatgpt (🙇‍♂️), we came up with this approach:

db.events.aggregate([
    // Step 1: Match - Filter events based on the "consistencyCondition"
    // Purpose: This ensures only the relevant events for the current stream or boundary are included.
    {
        $match: {
            /* "consistencyCondition" query */
        }
    },

    // Step 2: Sort - Ensure a deterministic order of events
    // Purpose: Sorting by ID ensures the MD5 computation remains consistent regardless of the underlying order of documents in MongoDB.
    {
        $sort: { id: 1 }
    },

    // Step 3: Project - Extract only the necessary fields
    // Purpose: This minimizes the data processed in later stages, improving performance.
    {
        $project: { _id: 0, id: 1, eventTimestamp: 1 }
    },

    // Step 4: Group - Collect the IDs and the latest event timestamp
    // Purpose:
    // - The IDs (already in deterministic order) are gathered into an array so the
    //   checksum can be computed over all of them.
    // - Using $max ensures the timestamp represents the most recent event in the stream,
    //   aligning with subscription replay logic.
    {
        $group: {
            _id: null,
            ids: { $push: "$id" }, // Collect all IDs in order
            timestamp: { $max: "$eventTimestamp" } // Capture the latest event timestamp
        }
    },

    // Step 4b: Compute the MD5 checksum that serves as the core of the optimistic lock
    // (Note: $md5 is hypothetical; MongoDB has no built-in MD5 aggregation operator,
    // so in practice the checksum would likely be computed client-side.)
    {
        $addFields: {
            checksum: {
                $md5: { $reduce: {
                    input: "$ids", // Combine all IDs into a single string
                    initialValue: "",
                    in: { $concat: ["$$value", "$$this"] } // Concatenate IDs in order
                }}
            }
        }
    },

    // Step 5: Project - Format the consistency tag
    // Purpose: Combine the MD5 checksum and the timestamp 
    {
        $project: {
            rawConsistencyTag: {
                $concat: [
                    "$checksum", // The MD5 checksum of the stream's state
                    ":md5:", // Separator indicating which algorithm was used to calculate the checksum
                    { 
                        $dateToString: { 
                            format: "%Y-%m-%dT%H:%M:%S.%LZ", // ISO 8601 format
                            date: "$timestamp" // Include the latest event timestamp
                        }
                    }
                ]
            }
        }
    }
]);

A nice thing with { $max: "$eventTimestamp" } is that we'll use the logical time of the events instead of relying on the wallclock time of the server (and it aligns with catchup subscription logic which uses eventTime).
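A plain-Java mirror of the pipeline above (all names hypothetical): MD5 over the sorted, concatenated event ids, with the max eventTimestamp as the logical time, formatted as "<checksum>:md5:<timestamp>".

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.time.Instant;
import java.util.Comparator;
import java.util.HexFormat;
import java.util.List;

record StoredEvent(String id, Instant eventTimestamp) {}

final class RawConsistencyTag {

    static String of(List<StoredEvent> events) {
        String concatenatedIds = events.stream()
                .map(StoredEvent::id)
                .sorted() // Deterministic order, like the $sort stage
                .reduce("", String::concat);
        Instant latest = events.stream()
                .map(StoredEvent::eventTimestamp)
                .max(Comparator.naturalOrder()) // Like { $max: "$eventTimestamp" }
                .orElseThrow();
        return md5Hex(concatenatedIds) + ":md5:" + latest;
    }

    private static String md5Hex(String s) {
        try {
            return HexFormat.of().formatHex(MessageDigest.getInstance("MD5").digest(s.getBytes()));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Because the timestamp is the logical time of the latest matched event, two reads of the same stream state produce the same tag even on servers with different wall clocks.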

@johanhaleby
Owner Author

Solves the unique email constraint: eventType = 'AccountSignedUp' AND tags CONTAIN 'username:$username'
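A hypothetical sketch of that check (names illustrative): the append condition matches existing AccountSignedUp events carrying the same username tag, and the signup is only allowed if none exist.

```java
import java.util.List;
import java.util.Set;

record TaggedEvent(String eventType, Set<String> tags) {}

final class UniqueUsernameCheck {

    // The write is allowed only if no AccountSignedUp event with this
    // username tag already exists within the consistency boundary
    static boolean canSignUp(List<TaggedEvent> events, String username) {
        return events.stream().noneMatch(e ->
                e.eventType().equals("AccountSignedUp")
                        && e.tags().contains("username:" + username));
    }
}
```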

@johanhaleby
Owner Author

Idea: If we have a global position, we could return the global position of the last written event as the "consistency marker" (see this ES impl). We can then use a query like this:

{ $and : [ {"globalposition" : {$gt : <consistencyMarker> }}, <criteria> ] }

to determine, within a transaction, whether we can write the events: if this query matches no events, the write is allowed.
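An in-memory sketch of the consistency-marker check (names illustrative): the write is allowed only if no event matching the criteria was appended after the consistency marker's global position.

```java
import java.util.List;
import java.util.function.Predicate;

record GlobalEvent(long globalPosition, String eventType) {}

final class ConsistencyMarkerCheck {

    // True if no event after the marker matches the criteria,
    // i.e. nothing relevant happened since the read
    static boolean canWrite(List<GlobalEvent> events, long consistencyMarker,
                            Predicate<GlobalEvent> criteria) {
        return events.stream()
                .filter(e -> e.globalPosition() > consistencyMarker)
                .noneMatch(criteria);
    }
}
```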
