@author andrii dobroshynski
- Implementation of a version of the system proposed in the Calvin paper: a partitioned key-value store in Elixir, supporting asynchronous and Raft-based synchronous replication of transactional input, designed to run on arbitrary shared-nothing machines
- Deterministic storage system: transaction client requests are agreed upon by designated components of the system and guaranteed to be executed by all replicas and the corresponding partitions within each replica
- `mix test` runs all tests in `apps/calvin/test`
- Some tests for specific behavior:
  - `test/test_storage_async.exs` - tests consistency of `Storage` components on replicas when using `:async` replication of transactional input
  - `test/test_storage_raft.exs` - tests consistency of `Storage` components on replicas when using `:raft` replication of transactional input
  - `test/test_transactions_multipartition.exs` - tests multipartition transactions within a single replica, including multiple transactions ordered as 'earlier'/'later' than one another for testing serializability (a transaction ordered by a `Sequencer` component as occurring 'later' sees the state with the 'earlier' writes applied, and an 'earlier' transaction doing READs does not see the 'later' transaction's WRITEs); see the sketch after this list
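A minimal sketch of that ordering property, using a plain map in place of the repo's `Storage` component (the transaction shapes here are illustrative only):

```elixir
# Transactions applied in Sequencer order: the 'later' one sees exactly
# the writes of the 'earlier' one, never the reverse.
earlier = fn storage -> Map.put(storage, :a, 1) end

later = fn storage ->
  # the 'later' READ sees the 'earlier' WRITE already applied
  1 = Map.get(storage, :a)
  Map.put(storage, :b, 2)
end

# folding the ordered batch over the storage yields the serial result
%{a: 1, b: 2} = Enum.reduce([earlier, later], %{}, fn tx, s -> tx.(s) end)
```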
- Experiments can be run from `apps/calvin/experiments` with `mix test experiments/<file>`
The Elixir script in `balance_transfer.exs` executes simple 'balance transfer' multipartition transactions of the form

```
READ <- 'Alice', 'Zoe'
INVARIANT ('Alice' >= 10)
SET 'Alice' -= 10
SET 'Zoe' += 10
```

where the `Alice` record is stored on partition 1 and `Zoe` on partition 2, since the system automatically partitions keys based on how many partitions are specified to `Configuration.new/2` via a `PartitionScheme`. An equivalent balance-transfer transaction in our implementation, with a check to abort if the balance is insufficient, would be
```elixir
tx = Transaction.new(_operations = [
  Transaction.Op.invariant(
    Transaction.Expression.new(:Alice, :>=, 10)
  ),
  Transaction.Op.update(:Alice, Transaction.Expression.new(:Alice, :-, 10)),
  Transaction.Op.update(:Zoe, Transaction.Expression.new(:Zoe, :+, 10))
])
```
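For illustration, a minimal sketch of how keys might be assigned to partitions (the hash-based assignment and the `partition_for/2` helper are assumptions here; the repo's `PartitionScheme` may assign keys differently):

```elixir
# Hypothetical helper mapping a key to one of num_partitions partitions.
defmodule PartitionSketch do
  def partition_for(key, num_partitions) do
    # :erlang.phash2/2 hashes into 0..num_partitions-1
    :erlang.phash2(key, num_partitions) + 1
  end
end

# With 2 partitions, :Alice and :Zoe can land on different partitions,
# which is what makes the balance transfer a multipartition transaction.
IO.inspect(PartitionSketch.partition_for(:Alice, 2))
IO.inspect(PartitionSketch.partition_for(:Zoe, 2))
```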
The scripts `with_async.exs` and `with_raft.exs` run experiments measuring the execution latencies of transactions under either the `:async` or `:raft` replication mode.
- Multiple operations are applied as an atomic unit, or none of them are applied, so the storage system remains in a consistent state
- With multiple replicas that communicate via message passing, how do we ensure they stay in sync? What do we do if a replica accepts a transaction but fails just before execution? What about a failure in the middle of transaction execution?
- There is no fundamental reason why a transaction should abort amidst a nondeterministic event (a machine failure or equivalent); systems might do so out of practical considerations
- If the input to the system can be agreed upon prior to the beginning of execution, then on failure and subsequent recovery we can (see the sketch after this list)
  - re-play input logged to disk, or
  - recover from a fellow replica that is guaranteed to be executing the exact same transactional input
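A minimal sketch of the log re-play option (the log shape and the `replay/1` helper are assumptions, not the repo's API); because the input is agreed upon up front, replaying the same ordered log deterministically reproduces the same state on every replica:

```elixir
# Hypothetical recovery by re-playing an ordered, durable input log of
# {key, value} writes against a fresh storage map.
defmodule RecoverySketch do
  def replay(log) do
    Enum.reduce(log, %{}, fn {key, value}, storage ->
      Map.put(storage, key, value)
    end)
  end
end

# any replica replaying this log arrives at %{Alice: 90, Zoe: 60}
log = [{:Alice, 100}, {:Zoe, 50}, {:Alice, 90}, {:Zoe, 60}]
IO.inspect(RecoverySketch.replay(log))
```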
- The original Calvin paper proposes a layer to run against a non-transactional storage system to transform it into a “highly available” DB system supporting ACID transactions
- Time is split into epochs; transactions are batched by `Sequencer` components, which impose an order on the transactions received (see the sketch below)
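A minimal sketch of the epoch batching loop (the process shape and message names are illustrative, not the repo's `Sequencer` implementation):

```elixir
# Hypothetical Sequencer loop: collect transaction requests for a fixed
# epoch duration, then emit the batch stamped with its epoch number.
defmodule SequencerSketch do
  @epoch_ms 2_000

  def run(epoch \\ 1) do
    Process.send_after(self(), :epoch_end, @epoch_ms)
    batch = collect([])
    # here the batch would be replicated and forwarded to the Schedulers
    IO.inspect({epoch, batch})
    run(epoch + 1)
  end

  defp collect(acc) do
    receive do
      {:tx, tx} -> collect([tx | acc])
      :epoch_end -> Enum.reverse(acc)
    end
  end
end
```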
- Focuses on throughput, proposing a specific locking mechanism to ensure transactions execute in parallel (efficiently) but in an agreed-upon order against the storage layer; a sketch of the ordering idea follows
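A minimal sketch of the deterministic ordering idea behind that locking scheme (the per-key queues and batch shape are illustrative; the paper's lock manager is more involved). Since lock requests are issued strictly in the agreed-upon transaction order, every replica builds identical lock queues, and non-conflicting transactions are free to run in parallel:

```elixir
# Hypothetical helper: given a batch of {tx_id, keys} in the agreed order,
# build per-key lock queues; a transaction waits only on earlier
# transactions in the batch that touch the same keys.
defmodule LockOrderSketch do
  def lock_queues(batch) do
    Enum.reduce(batch, %{}, fn {tx_id, keys}, queues ->
      Enum.reduce(keys, queues, fn key, acc ->
        Map.update(acc, key, [tx_id], &(&1 ++ [tx_id]))
      end)
    end)
  end
end

# tx 2 waits for tx 1 on :Alice on every replica; tx 3 conflicts with
# neither and can execute in parallel.
IO.inspect(LockOrderSketch.lock_queues([{1, [:Alice]}, {2, [:Alice, :Zoe]}, {3, [:Bob]}]))
```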
- A working prototype of a replicated, partitioned key-value store based on the Calvin system
- Elixir components for `Sequencer`, `Scheduler`, and `Storage` that run as independent RSMs
- Every transaction computes its own write and read sets; data is automatically partitioned according to the keys in the operations that determine these sets (see the sketch below)
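A minimal sketch of deriving those sets from a transaction's operations (the tuple-based operation shapes stand in for the repo's `Transaction.Op` structs):

```elixir
# Hypothetical derivation of {read_set, write_set} from a list of ops:
# an update both reads and writes its key, an invariant only reads.
defmodule RwSetSketch do
  def sets(operations) do
    Enum.reduce(operations, {MapSet.new(), MapSet.new()}, fn
      {:invariant, key}, {r, w} -> {MapSet.put(r, key), w}
      {:update, key}, {r, w} -> {MapSet.put(r, key), MapSet.put(w, key)}
    end)
  end
end

# the balance transfer reads {:Alice, :Zoe} and writes {:Alice, :Zoe}
IO.inspect(RwSetSketch.sets([{:invariant, :Alice}, {:update, :Alice}, {:update, :Zoe}]))
```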
- Replication mode is specified as `:async` or `:raft`; every `Sequencer` maintains a Raft state that it uses to manage Raft-based replication
- Why do we need Raft? With async replication, failover when the primary/main replica fails is very difficult; we want a quorum for commit (see the sketch below)
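A minimal sketch of the difference between the two modes (the message names, quorum arithmetic, and `replicate/3` helper are assumptions for illustration, not the repo's Raft implementation): `:async` fires and forgets, while `:raft` waits for acknowledgments from a majority of the cluster before treating the batch as committed:

```elixir
# Hypothetical replication of a Sequencer's batch to its counterparts on
# the other replicas.
defmodule ReplicationSketch do
  def replicate(:async, batch, followers) do
    # fire-and-forget: proceed without waiting for acknowledgments
    Enum.each(followers, &send(&1, {:batch, batch}))
    :ok
  end

  def replicate(:raft, batch, followers) do
    Enum.each(followers, &send(&1, {:append_entries, batch, self()}))
    # the leader counts itself, so it needs enough follower acks to reach
    # a majority of the (length(followers) + 1)-node cluster
    await_acks(div(length(followers) + 1, 2))
  end

  defp await_acks(0), do: :committed

  defp await_acks(n) do
    receive do
      :ack -> await_acks(n - 1)
    end
  end
end
```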
- Transaction batches get forwarded to the correct `Scheduler` components at every epoch and interleaved
- Each storage node only contains the data that it needs based on the partitioning scheme; during transaction execution the system figures out
  - which transaction updates are “local” and which are not, and executes accordingly
  - which elements of the read set are local vs. remote, and gathers/reads both the local and remote sets from the participating partitions (see the sketch after this list)
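A minimal sketch of that local/remote split (reusing the hashed assignment assumed in the earlier partitioning sketch; `split/3` is a hypothetical helper):

```elixir
# Hypothetical split of a read set into keys served locally and keys that
# must be gathered from the other participating partitions.
defmodule ReadSetSplitSketch do
  def split(read_set, my_partition, num_partitions) do
    Enum.split_with(read_set, fn key ->
      :erlang.phash2(key, num_partitions) + 1 == my_partition
    end)
  end
end

# keys hashing to my_partition are read locally; the rest are requested
# from the partitions that own them
{local, remote} = ReadSetSplitSketch.split([:Alice, :Zoe], 1, 2)
IO.inspect({local, remote})
```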
- Supports launching an arbitrary number of replicas, each partitioned across an arbitrary number of nodes per replica
- Lots of Elixir tests to verify expected and consistent state, and more in the code!
- Empty batches need to be replicated by design, leading to halts under network delays when the delay >> epoch time; are there ways to optimize this?
- Load balancing of client requests?
- Elixir documentation, especially for `Enum` and `List`
- Calvin paper - Thomson et al., '12
- A different print of the Calvin paper - "Fast Distributed Transactions and Strongly Consistent Replication for OLTP Database Systems"
- Paper preceding Calvin by the same authors - Thomson & Abadi, VLDB '10
- Paper with a detailed analysis of the pros/cons of the Calvin-style deterministic DB system, by the same authors - Ren, Thomson & Abadi, VLDB '14
- Reading on transactions and recovery - "Concurrency Control and Recovery"
- Reading on atomicity and locking - "Principles of Computer System Design", Chapter 9