From 5cad2cb7056eb2651436d3fe778edeed199e82db Mon Sep 17 00:00:00 2001 From: Edsko de Vries Date: Tue, 28 Jan 2020 16:24:00 +0100 Subject: [PATCH] Better ledger DB policy Closes #1456 Closes #1264 --- .../src/Ouroboros/Storage/ChainDB/Impl.hs | 4 +- .../Ouroboros/Storage/ChainDB/Impl/Args.hs | 2 +- .../Storage/ChainDB/Impl/Background.hs | 129 +++++++++++------- .../Ouroboros/Storage/ChainDB/Impl/LgrDB.hs | 40 ++++-- .../Ouroboros/Storage/ChainDB/Impl/Reopen.hs | 7 +- .../Ouroboros/Storage/ChainDB/Impl/Types.hs | 3 +- .../Ouroboros/Storage/LedgerDB/DiskPolicy.hs | 81 +++++++---- .../src/Ouroboros/Storage/LedgerDB/OnDisk.hs | 32 +++-- .../Test/Ouroboros/Storage/LedgerDB/OnDisk.hs | 19 +-- 9 files changed, 191 insertions(+), 126 deletions(-) diff --git a/ouroboros-consensus/src/Ouroboros/Storage/ChainDB/Impl.hs b/ouroboros-consensus/src/Ouroboros/Storage/ChainDB/Impl.hs index b0de9215d99..9c22547c7fe 100644 --- a/ouroboros-consensus/src/Ouroboros/Storage/ChainDB/Impl.hs +++ b/ouroboros-consensus/src/Ouroboros/Storage/ChainDB/Impl.hs @@ -108,7 +108,7 @@ openDBInternal args launchBgTasks = do (Args.cdbEpochInfo args) immDbTipPoint (contramap TraceLedgerReplayEvent tracer) - lgrDB <- LgrDB.openDB argsLgrDb + (lgrDB, replayed) <- LgrDB.openDB argsLgrDb lgrReplayTracer immDB (Query.getAnyKnownBlock immDB volDB) @@ -198,7 +198,7 @@ openDBInternal args launchBgTasks = do , _chainTip = castPoint $ AF.headPoint chain } - when launchBgTasks $ Background.launchBgTasks env + when launchBgTasks $ Background.launchBgTasks env replayed return (chainDB, testing) where diff --git a/ouroboros-consensus/src/Ouroboros/Storage/ChainDB/Impl/Args.hs b/ouroboros-consensus/src/Ouroboros/Storage/ChainDB/Impl/Args.hs index f726f1ef82f..6e5e0217ee8 100644 --- a/ouroboros-consensus/src/Ouroboros/Storage/ChainDB/Impl/Args.hs +++ b/ouroboros-consensus/src/Ouroboros/Storage/ChainDB/Impl/Args.hs @@ -83,7 +83,7 @@ data ChainDbArgs m blk = forall h1 h2 h3. 
ChainDbArgs { , cdbValidation :: ImmDB.ValidationPolicy , cdbBlocksPerFile :: Int , cdbParamsLgrDB :: LgrDB.LedgerDbParams - , cdbDiskPolicy :: LgrDB.DiskPolicy m + , cdbDiskPolicy :: LgrDB.DiskPolicy -- Integration , cdbNodeConfig :: NodeConfig (BlockProtocol blk) diff --git a/ouroboros-consensus/src/Ouroboros/Storage/ChainDB/Impl/Background.hs b/ouroboros-consensus/src/Ouroboros/Storage/ChainDB/Impl/Background.hs index 83f937eb8a9..20a685686a0 100644 --- a/ouroboros-consensus/src/Ouroboros/Storage/ChainDB/Impl/Background.hs +++ b/ouroboros-consensus/src/Ouroboros/Storage/ChainDB/Impl/Background.hs @@ -16,7 +16,8 @@ module Ouroboros.Storage.ChainDB.Impl.Background launchBgTasks -- * Copying blocks from the VolatileDB to the ImmutableDB , copyToImmDB - , copyToImmDBRunner + , copyAndSnapshotRunner + , updateLedgerSnapshots -- * Executing garbage collection , garbageCollect -- * Scheduling garbage collections @@ -25,8 +26,6 @@ module Ouroboros.Storage.ChainDB.Impl.Background , scheduleGC , gcScheduleRunner -- * Taking and trimming ledger snapshots - , updateLedgerSnapshots - , updateLedgerSnapshotsRunner -- * Executing scheduled chain selections , scheduledChainSelection , scheduledChainSelectionRunner @@ -38,9 +37,11 @@ import qualified Data.List.NonEmpty as NE import qualified Data.Map.Strict as Map import Data.Maybe (fromMaybe) import Data.Void (Void) +import Data.Word import GHC.Stack (HasCallStack) import Control.Monad.Class.MonadThrow +import Control.Monad.Class.MonadTime import Control.Tracer import Ouroboros.Network.AnchoredFragment (AnchoredFragment (..)) @@ -71,18 +72,17 @@ import qualified Ouroboros.Storage.ChainDB.Impl.VolDB as VolDB launchBgTasks :: forall m blk. (IOLike m, ProtocolLedgerView blk) => ChainDbEnv m blk + -> Word64 -- ^ Number of immutable blocks replayed on ledger DB startup -> m () -launchBgTasks cdb@CDB{..} = do +launchBgTasks cdb@CDB{..} replayed = do gcSchedule <- newGcSchedule - !gcThread <- launch $ + !gcThread <- launch $ gcScheduleRunner gcSchedule $ garbageCollect cdb - !copyThread <- launch $ - copyToImmDBRunner cdb gcSchedule - !lgrSnapshotThread <- launch $ - updateLedgerSnapshotsRunner cdb + !copyAndSnapshotThread <- launch $ + copyAndSnapshotRunner cdb gcSchedule replayed !chainSyncThread <- scheduledChainSelectionRunner cdb atomically $ writeTVar cdbKillBgThreads $ - sequence_ [gcThread, copyThread, lgrSnapshotThread, chainSyncThread] + sequence_ [gcThread, copyAndSnapshotThread, chainSyncThread] where launch :: m Void -> m (m ()) launch = fmap cancelThread . forkLinkedThread cdbRegistry @@ -186,12 +186,29 @@ copyToImmDB CDB{..} = withCopyLock $ do mustBeUnlocked = fromMaybe $ error "copyToImmDB running concurrently with itself" --- | Watches the current chain for changes. Whenever the chain is longer than --- @k@, then the headers older than @k@ are copied from the VolatileDB to the --- ImmutableDB (with 'copyToImmDB'). Afterwards, a garbage collection of the --- VolatileDB is scheduled using ('scheduleGC') for the 'SlotNo' of the most --- recent block that was copied. -copyToImmDBRunner +-- | Copy blocks from the VolDB to ImmDB and take snapshots of the LgrDB +-- +-- We watch the chain for changes. Whenever the chain is longer than @k@, then +-- the headers older than @k@ are copied from the VolDB to the ImmDB (using +-- 'copyToImmDB'). Once that is complete, +-- +-- * We periodically take a snapshot of the LgrDB (depending on its config). 
+--     NOTE: This implies we do not take a snapshot of the LgrDB if the chain
+--     hasn't changed, irrespective of the LgrDB policy.
+-- * Schedule GC of the VolDB ('scheduleGC') for the 'SlotNo' of the most
+--   recent block that was copied.
+--
+-- It is important that we only take LgrDB snapshots when we are /sure/ they
+-- have been copied to the ImmDB, since the LgrDB assumes that all snapshots
+-- correspond to immutable blocks. (Of course, data corruption can occur and we
+-- can handle it by reverting to an older LgrDB snapshot, but we should need
+-- this only in exceptional circumstances.)
+--
+-- We do not store any state of the VolDB GC. If the node shuts down before GC
+-- can happen, when we restart the node and schedule the /next/ GC, it will
+-- /imply/ any previously scheduled GC, since GC is driven by slot number
+-- ("garbage collect anything older than @x@").
+copyAndSnapshotRunner
   :: forall m blk.
      ( IOLike m
      , OuroborosTag (BlockProtocol blk)
@@ -201,18 +218,54 @@ copyToImmDBRunner
      )
   => ChainDbEnv m blk
   -> GcSchedule m
+  -> Word64 -- ^ Number of immutable blocks replayed on ledger DB startup
   -> m Void
-copyToImmDBRunner cdb@CDB{..} gcSchedule = forever $ do
-    atomically $ do
-      curChain <- readTVar cdbChain
-      check $ fromIntegral (AF.length curChain) > k
-
-    mSlotNo <- copyToImmDB cdb
-    case mSlotNo of
-      Origin    -> pure ()
-      At slotNo -> scheduleGC (contramap TraceGCEvent cdbTracer) slotNo cdbGcDelay gcSchedule
+copyAndSnapshotRunner cdb@CDB{..} gcSchedule replayed =
+    loop Nothing replayed
   where
-    SecurityParam k = protocolSecurityParam cdbNodeConfig
+    SecurityParam k = protocolSecurityParam cdbNodeConfig
+    LgrDB.DiskPolicy{..} = LgrDB.getDiskPolicy cdbLgrDB
+
+    loop :: Maybe Time -> Word64 -> m Void
+    loop mPrevSnapshot distance = do
+      -- Wait for the chain to grow larger than @k@
+      numToWrite <- atomically $ do
+        curChain <- readTVar cdbChain
+        check $ fromIntegral (AF.length curChain) > k
+        return $ fromIntegral (AF.length curChain) - k
+
+      -- Copy blocks to imm DB
+      --
+      -- This is a synchronous operation: when it returns, the blocks have been
+      -- copied to disk (though not flushed, necessarily).
+      copyToImmDB cdb >>= scheduleGC'
+
+      now <- getMonotonicTime
+      let distance' = distance + numToWrite
+          elapsed   = (\prev -> now `diffTime` prev) <$> mPrevSnapshot
+
+      if onDiskShouldTakeSnapshot elapsed distance' then do
+        updateLedgerSnapshots cdb
+        loop (Just now) 0
+      else
+        loop mPrevSnapshot distance'
+
+    scheduleGC' :: WithOrigin SlotNo -> m ()
+    scheduleGC' Origin      = return ()
+    scheduleGC' (At slotNo) =
+        scheduleGC
+          (contramap TraceGCEvent cdbTracer)
+          slotNo
+          cdbGcDelay
+          gcSchedule
+
+-- | Write a snapshot of the LedgerDB to disk and remove old snapshots
+-- (typically one) so that only 'onDiskNumSnapshots' snapshots are on disk.
+updateLedgerSnapshots :: IOLike m => ChainDbEnv m blk -> m ()
+updateLedgerSnapshots CDB{..} = do
+    -- TODO avoid taking multiple snapshots corresponding to the same tip.
+ void $ LgrDB.takeSnapshot cdbLgrDB + void $ LgrDB.trimSnapshots cdbLgrDB {------------------------------------------------------------------------------- Executing garbage collection @@ -313,30 +366,6 @@ gcScheduleRunner (GcSchedule queue) runGc = forever $ do -- Garbage collection is called synchronously runGc slotNo -{------------------------------------------------------------------------------- - Taking and trimming ledger snapshots --------------------------------------------------------------------------------} - --- | Write a snapshot of the LedgerDB to disk and remove old snapshots --- (typically one) so that only 'onDiskNumSnapshots' snapshots are on disk. -updateLedgerSnapshots :: IOLike m => ChainDbEnv m blk -> m () -updateLedgerSnapshots CDB{..} = do - -- TODO avoid taking multiple snapshots corresponding to the same tip. - void $ LgrDB.takeSnapshot cdbLgrDB - void $ LgrDB.trimSnapshots cdbLgrDB - --- | Execute 'updateLedgerSnapshots', wait 'onDiskWriteInterval', and repeat. -updateLedgerSnapshotsRunner :: IOLike m => ChainDbEnv m blk -> m Void -updateLedgerSnapshotsRunner cdb@CDB{..} = loop - where - LgrDB.DiskPolicy{..} = LgrDB.getDiskPolicy cdbLgrDB - - loop = updateLedgerSnapshots cdb >> waitInterval >> loop - - waitInterval = do - interval <- atomically onDiskWriteInterval - threadDelay interval - {------------------------------------------------------------------------------- Executing scheduled chain selections -------------------------------------------------------------------------------} diff --git a/ouroboros-consensus/src/Ouroboros/Storage/ChainDB/Impl/LgrDB.hs b/ouroboros-consensus/src/Ouroboros/Storage/ChainDB/Impl/LgrDB.hs index b66cab9a3ec..027499049ad 100644 --- a/ouroboros-consensus/src/Ouroboros/Storage/ChainDB/Impl/LgrDB.hs +++ b/ouroboros-consensus/src/Ouroboros/Storage/ChainDB/Impl/LgrDB.hs @@ -151,7 +151,7 @@ data LgrDbArgs m blk = forall h. LgrDbArgs { , lgrEncodeChainState :: ChainState (BlockProtocol blk) -> Encoding , lgrEncodeHash :: HeaderHash blk -> Encoding , lgrParams :: LedgerDbParams - , lgrDiskPolicy :: DiskPolicy m + , lgrDiskPolicy :: DiskPolicy , lgrGenesis :: m (ExtLedgerState blk) , lgrTracer :: Tracer m (TraceEvent (Point blk)) , lgrTraceLedger :: Tracer m (LedgerDB blk) @@ -190,6 +190,9 @@ defaultArgs fp = LgrDbArgs { } -- | Open the ledger DB +-- +-- In addition to the ledger DB also returns the number of immutable blocks +-- that were replayed. openDB :: forall m blk. (IOLike m, ProtocolLedgerView blk) => LgrDbArgs m blk -- ^ Stateless initializaton arguments @@ -208,18 +211,21 @@ openDB :: forall m blk. (IOLike m, ProtocolLedgerView blk) -- -- The block may be in the immutable DB or in the volatile DB; the ledger -- DB does not know where the boundary is at any given point. 
- -> m (LgrDB m blk) + -> m (LgrDB m blk, Word64) openDB args@LgrDbArgs{..} replayTracer immDB getBlock = do createDirectoryIfMissing lgrHasFS True (mkFsPath []) - db <- initFromDisk args replayTracer lgrDbConf immDB + (db, replayed) <- initFromDisk args replayTracer lgrDbConf immDB (varDB, varPrevApplied) <- (,) <$> newTVarM db <*> newTVarM Set.empty - return LgrDB { - conf = lgrDbConf - , varDB = varDB - , varPrevApplied = varPrevApplied - , args = args - } + return ( + LgrDB { + conf = lgrDbConf + , varDB = varDB + , varPrevApplied = varPrevApplied + , args = args + } + , replayed + ) where apply :: blk -> ExtLedgerState blk @@ -240,23 +246,27 @@ openDB args@LgrDbArgs{..} replayTracer immDB getBlock = do , ldbConfResolve = getBlock } +-- | Reopen the ledger DB +-- +-- Returns the number of immutable blocks replayed. reopen :: (IOLike m, ProtocolLedgerView blk, HasCallStack) => LgrDB m blk -> ImmDB m blk -> Tracer m (TraceReplayEvent (Point blk) () (Point blk)) - -> m () + -> m Word64 reopen LgrDB{..} immDB replayTracer = do - db <- initFromDisk args replayTracer conf immDB + (db, replayed) <- initFromDisk args replayTracer conf immDB atomically $ writeTVar varDB db + return replayed initFromDisk :: (IOLike m, HasHeader blk, HasCallStack) => LgrDbArgs m blk -> Tracer m (TraceReplayEvent (Point blk) () (Point blk)) -> Conf m blk -> ImmDB m blk - -> m (LedgerDB blk) + -> m (LedgerDB blk, Word64) initFromDisk args@LgrDbArgs{..} replayTracer lgrDbConf immDB = wrapFailure args $ do - (_initLog, db) <- + (_initLog, db, replayed) <- LedgerDB.initLedgerDB replayTracer lgrTracer @@ -266,7 +276,7 @@ initFromDisk args@LgrDbArgs{..} replayTracer lgrDbConf immDB = wrapFailure args lgrParams lgrDbConf (streamAPI immDB) - return db + return (db, replayed) {------------------------------------------------------------------------------- TraceReplayEvent decorator @@ -347,7 +357,7 @@ trimSnapshots :: MonadThrow m => LgrDB m blk -> m [DiskSnapshot] trimSnapshots LgrDB{ args = args@LgrDbArgs{..} } = wrapFailure args $ LedgerDB.trimSnapshots lgrTracer lgrHasFS lgrDiskPolicy -getDiskPolicy :: LgrDB m blk -> DiskPolicy m +getDiskPolicy :: LgrDB m blk -> DiskPolicy getDiskPolicy LgrDB{ args = LgrDbArgs{..} } = lgrDiskPolicy {------------------------------------------------------------------------------- diff --git a/ouroboros-consensus/src/Ouroboros/Storage/ChainDB/Impl/Reopen.hs b/ouroboros-consensus/src/Ouroboros/Storage/ChainDB/Impl/Reopen.hs index 4ac46aad353..2e949f8a95f 100644 --- a/ouroboros-consensus/src/Ouroboros/Storage/ChainDB/Impl/Reopen.hs +++ b/ouroboros-consensus/src/Ouroboros/Storage/ChainDB/Impl/Reopen.hs @@ -75,9 +75,6 @@ closeDB (CDBHandle varState) = do killBgThreads <- atomically $ readTVar cdbKillBgThreads killBgThreads - -- Write a 'LedgerDB' snapshot so that we don't have to replay too many - -- blocks when restarting - Background.updateLedgerSnapshots cdb ImmDB.closeDB cdbImmDB VolDB.closeDB cdbVolDB @@ -144,7 +141,7 @@ reopen (CDBHandle varState) launchBgTasks = do cdbEpochInfo immDbTipPoint (contramap TraceLedgerReplayEvent cdbTracer) - LgrDB.reopen cdbLgrDB cdbImmDB lgrReplayTracer + replayed <- LgrDB.reopen cdbLgrDB cdbImmDB lgrReplayTracer traceWith cdbTracer $ TraceOpenEvent OpenedLgrDB curSlot <- atomically $ getCurrentSlot cdbBlockchainTime @@ -173,4 +170,4 @@ reopen (CDBHandle varState) launchBgTasks = do , _chainTip = castPoint $ AF.headPoint chain } - when launchBgTasks $ Background.launchBgTasks env + when launchBgTasks $ Background.launchBgTasks env replayed diff --git 
a/ouroboros-consensus/src/Ouroboros/Storage/ChainDB/Impl/Types.hs b/ouroboros-consensus/src/Ouroboros/Storage/ChainDB/Impl/Types.hs
index 7b5278ec5fc..b808c5b9afe 100644
--- a/ouroboros-consensus/src/Ouroboros/Storage/ChainDB/Impl/Types.hs
+++ b/ouroboros-consensus/src/Ouroboros/Storage/ChainDB/Impl/Types.hs
@@ -149,7 +149,8 @@ data ChainDbEnv m blk = CDB
       -- ^ Contains the current chain fragment.
       --
       -- INVARIANT: the anchor point of this fragment is the tip of the
-      -- ImmutableDB.
+      -- ImmutableDB. This implies that this fragment never contains any blocks
+      -- that are stored in the immutable DB.
       --
       -- Note that this fragment might be shorter than @k@ headers when the
       -- whole chain is shorter than @k@ or in case of corruption of the
diff --git a/ouroboros-consensus/src/Ouroboros/Storage/LedgerDB/DiskPolicy.hs b/ouroboros-consensus/src/Ouroboros/Storage/LedgerDB/DiskPolicy.hs
index 635c12b8220..9c0c1e959ab 100644
--- a/ouroboros-consensus/src/Ouroboros/Storage/LedgerDB/DiskPolicy.hs
+++ b/ouroboros-consensus/src/Ouroboros/Storage/LedgerDB/DiskPolicy.hs
@@ -1,19 +1,24 @@
+{-# LANGUAGE LambdaCase #-}
+{-# LANGUAGE NumericUnderscores #-}
+{-# LANGUAGE RecordWildCards #-}
+
 module Ouroboros.Storage.LedgerDB.DiskPolicy (
     DiskPolicy(..)
   , defaultDiskPolicy
   ) where

-import Data.Time.Clock (DiffTime)
+import Data.Word
+
+import Control.Monad.Class.MonadTime

 import Ouroboros.Consensus.Protocol.Abstract (SecurityParam (..))
-import Ouroboros.Consensus.Util.IOLike

 -- | On-disk policy
 --
 -- We only write ledger states that are older than @k@ blocks to disk (that is,
 -- snapshots that are guaranteed valid). The on-disk policy determines how often
 -- we write to disk and how many checkpoints we keep.
-data DiskPolicy m = DiskPolicy {
+data DiskPolicy = DiskPolicy {
       -- | How many snapshots do we want to keep on disk?
       --
       -- A higher number of on-disk snapshots is primarily a safe-guard against
@@ -32,39 +37,57 @@ data DiskPolicy m = DiskPolicy {
       -- the next snapshot, we delete the oldest one, leaving the middle
       -- one available in case of truncation of the write. This is
       -- probably a sane value in most circumstances.
-      onDiskNumSnapshots :: Word
+      onDiskNumSnapshots :: Word

-      -- | How frequently do we want to write to disk?
+      -- | Should we write a snapshot of the ledger state to disk?
       --
-      -- Specified as an STM transaction that gives the delay (in microsec)
-      -- between writes, allowing the delay to be varied on the fly if needed.
+      -- This function is passed two bits of information:
       --
-      -- Writing snapshots more often means we have less work to do when
-      -- opening the database, but at the cost of doing more IO while the node
-      -- is running.
+      -- * The time since the last snapshot, or 'Nothing' if none was taken yet.
+      --   Note that 'Nothing' merely means no snapshot had been taken yet
+      --   since the node was started; it does not necessarily mean that none
+      --   exist on disk.
       --
-      -- A sensible value might be the time interval corresponding to @k@
-      -- blocks (12 hours), resulting in a gap between the most recent on disk
-      -- snapshot and the oldest in-memory snapshot between @k@ and @2k@ blocks.
+      -- * The distance in terms of blocks applied to the /oldest/ ledger
+      --   snapshot in memory. During normal operation, this is the number of
+      --   blocks written to the imm DB since the last snapshot. On startup, it
+      --   is computed by counting how many immutable blocks we had to reapply
+      --   to get to the chain tip. This is useful, as it allows the policy to
+      --   decide to take a snapshot /on node startup/ if a lot of blocks had to
+      --   be replayed.
       --
-      -- NOTE: Specifying this as a time interval rather than in terms of number
-      -- of blocks means that during chain synchronization (where a node is
-      -- catching up with its neighbours) the frequency of writes /in terms of
-      -- blocks/ automatically goes down.
-    , onDiskWriteInterval :: STM m DiffTime
+      -- See also 'defaultDiskPolicy'
+    , onDiskShouldTakeSnapshot :: Maybe DiffTime -> Word64 -> Bool
     }

 -- | Default on-disk policy
 --
--- Right now we set this to 12 hrs, which for both Byron and for Shelley amounts
--- to k blocks (albeit for different reasons).
+-- The goal of the default policy is to take a snapshot roughly every @k@
+-- blocks during normal operation, and every 500k blocks during syncing
+-- (in early 2020, the chain consists of roughly 3.6M blocks, so we'd take
+-- roughly 9 snapshots during a full sync).
 --
--- TODO: We might want to revise this
--- .
-defaultDiskPolicy :: IOLike m
-                  => SecurityParam -- ^ Maximum rollback
-                  -> DiskPolicy m
-defaultDiskPolicy _k = DiskPolicy {
-      onDiskNumSnapshots = 2
-    , onDiskWriteInterval = return (12 * 60 * 60)
-    }
+-- @k@ blocks during normal operation means one snapshot every 12 hours.
+-- We therefore take a snapshot every 12 hours, or every 500k blocks, whichever
+-- comes first.
+--
+-- If users never leave their wallet running for long, however, this would mean
+-- that we /never/ take snapshots after syncing (until we get to 500k blocks).
+-- So, on startup, we take a snapshot as soon as there are @k@ blocks replayed.
+-- This means that even if users frequently shut down their wallet, we still
+-- take a snapshot roughly every @k@ blocks. It does mean the possibility of
+-- an extra unnecessary snapshot during syncing (if the node is restarted), but
+-- that is not a big deal.
+defaultDiskPolicy :: SecurityParam -- ^ Maximum rollback
+                  -> DiskPolicy
+defaultDiskPolicy (SecurityParam k) = DiskPolicy {..}
+  where
+    onDiskNumSnapshots :: Word
+    onDiskNumSnapshots = 2
+
+    onDiskShouldTakeSnapshot :: Maybe DiffTime -> Word64 -> Bool
+    onDiskShouldTakeSnapshot (Just timeSinceLast) blocksSinceLast =
+         timeSinceLast >= fromIntegral (k * 20)
+      || blocksSinceLast >= 500_000
+    onDiskShouldTakeSnapshot Nothing blocksSinceLast =
+      blocksSinceLast >= k
diff --git a/ouroboros-consensus/src/Ouroboros/Storage/LedgerDB/OnDisk.hs b/ouroboros-consensus/src/Ouroboros/Storage/LedgerDB/OnDisk.hs
index 147c6808093..c9b229400fd 100644
--- a/ouroboros-consensus/src/Ouroboros/Storage/LedgerDB/OnDisk.hs
+++ b/ouroboros-consensus/src/Ouroboros/Storage/LedgerDB/OnDisk.hs
@@ -1,3 +1,4 @@
+{-# LANGUAGE BangPatterns #-}
 {-# LANGUAGE DeriveGeneric #-}
 {-# LANGUAGE DeriveTraversable #-}
 {-# LANGUAGE KindSignatures #-}
@@ -7,6 +8,7 @@
 {-# LANGUAGE RecordWildCards #-}
 {-# LANGUAGE ScopedTypeVariables #-}
 {-# LANGUAGE TemplateHaskell #-}
+{-# LANGUAGE TupleSections #-}

 module Ouroboros.Storage.LedgerDB.OnDisk (
     -- * Opening the database
@@ -37,6 +39,7 @@ import qualified Data.List as List
 import Data.Maybe (mapMaybe)
 import Data.Set (Set)
 import qualified Data.Set as Set
+import Data.Word
 import GHC.Generics (Generic)
 import GHC.Stack
 import Text.Read (readMaybe)
@@ -164,14 +167,14 @@ initLedgerDB :: forall m h l r b e.
(IOLike m, HasCallStack) -> LedgerDbParams -> LedgerDbConf m l r b e -> StreamAPI m r b - -> m (InitLog r, LedgerDB l r) + -> m (InitLog r, LedgerDB l r, Word64) initLedgerDB replayTracer tracer hasFS decLedger decRef params conf streamAPI = do snapshots <- listSnapshots hasFS tryNewestFirst id snapshots where tryNewestFirst :: (InitLog r -> InitLog r) -> [DiskSnapshot] - -> m (InitLog r, LedgerDB l r) + -> m (InitLog r, LedgerDB l r, Word64) tryNewestFirst acc [] = do -- We're out of snapshots. Start at genesis traceWith replayTracer $ ReplayFromGenesis () @@ -179,7 +182,7 @@ initLedgerDB replayTracer tracer hasFS decLedger decRef params conf streamAPI = ml <- runExceptT $ initStartingWith replayTracer conf streamAPI initDb case ml of Left _ -> error "invariant violation: invalid current chain" - Right l -> return (acc InitFromGenesis, l) + Right (l, replayed) -> return (acc InitFromGenesis, l, replayed) tryNewestFirst acc (s:ss) = do -- If we fail to use this snapshot, delete it and try an older one ml <- runExceptT $ initFromSnapshot @@ -196,8 +199,8 @@ initLedgerDB replayTracer tracer hasFS decLedger decRef params conf streamAPI = deleteSnapshot hasFS s traceWith tracer $ InvalidSnapshot s err tryNewestFirst (acc . InitFailure s err) ss - Right (r, l) -> - return (acc (InitFromSnapshot s r), l) + Right (r, l, replayed) -> + return (acc (InitFromSnapshot s r), l, replayed) {------------------------------------------------------------------------------- Internal: initialize using the given snapshot @@ -227,13 +230,13 @@ initFromSnapshot :: forall m h l r b e. (IOLike m, HasCallStack) -> LedgerDbConf m l r b e -> StreamAPI m r b -> DiskSnapshot - -> ExceptT (InitFailure r) m (Tip r, LedgerDB l r) + -> ExceptT (InitFailure r) m (Tip r, LedgerDB l r, Word64) initFromSnapshot tracer hasFS decLedger decRef params conf streamAPI ss = do initSS <- withExceptT InitFailureRead $ readSnapshot hasFS decLedger decRef ss lift $ traceWith tracer $ ReplayFromSnapshot ss (csTip initSS) () - initDB <- initStartingWith tracer conf streamAPI (ledgerDbWithAnchor params initSS) - return (csTip initSS, initDB) + (initDB, replayed) <- initStartingWith tracer conf streamAPI (ledgerDbWithAnchor params initSS) + return (csTip initSS, initDB, replayed) -- | Attempt to initialize the ledger DB starting from the given ledger DB initStartingWith :: forall m l r b e. (Monad m, HasCallStack) @@ -241,16 +244,17 @@ initStartingWith :: forall m l r b e. 
(Monad m, HasCallStack) -> LedgerDbConf m l r b e -> StreamAPI m r b -> LedgerDB l r - -> ExceptT (InitFailure r) m (LedgerDB l r) + -> ExceptT (InitFailure r) m (LedgerDB l r, Word64) initStartingWith tracer conf@LedgerDbConf{..} streamAPI initDb = do streamAll streamAPI (ledgerDbTip initDb) InitFailureTooRecent - initDb + (initDb, 0) push where - push :: (r, b) -> LedgerDB l r -> m (LedgerDB l r) - push (r, b) db = - ledgerDbReapply conf (Val r b) db <* traceWith tracer (ReplayedBlock r r ()) + push :: (r, b) -> (LedgerDB l r, Word64) -> m (LedgerDB l r, Word64) + push (r, b) !(!db, !replayed) = do + traceWith tracer (ReplayedBlock r r ()) + (, replayed + 1) <$> ledgerDbReapply conf (Val r b) db {------------------------------------------------------------------------------- Write to disk @@ -290,7 +294,7 @@ takeSnapshot tracer hasFS encLedger encRef db = do trimSnapshots :: Monad m => Tracer m (TraceEvent r) -> HasFS m h - -> DiskPolicy m + -> DiskPolicy -> m [DiskSnapshot] trimSnapshots tracer hasFS DiskPolicy{..} = do snapshots <- listSnapshots hasFS diff --git a/ouroboros-consensus/test-storage/Test/Ouroboros/Storage/LedgerDB/OnDisk.hs b/ouroboros-consensus/test-storage/Test/Ouroboros/Storage/LedgerDB/OnDisk.hs index 95d2b07812f..61465bd2837 100644 --- a/ouroboros-consensus/test-storage/Test/Ouroboros/Storage/LedgerDB/OnDisk.hs +++ b/ouroboros-consensus/test-storage/Test/Ouroboros/Storage/LedgerDB/OnDisk.hs @@ -652,15 +652,16 @@ runDB standalone@DB{..} cmd = (_, db) <- atomically $ readTVar dbState Snapped <$> takeSnapshot nullTracer hasFS S.encode S.encode db go hasFS Restore = do - (initLog, db) <- initLedgerDB - nullTracer - nullTracer - hasFS - S.decode - S.decode - (dbLgrParams dbEnv) - conf - streamAPI + (initLog, db, _replayed) <- + initLedgerDB + nullTracer + nullTracer + hasFS + S.decode + S.decode + (dbLgrParams dbEnv) + conf + streamAPI atomically $ modifyTVar dbState (\(rs, _) -> (rs, db)) return $ Restored (fromInitLog initLog, ledgerDbCurrent db) go hasFS (Corrupt c ss) =
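
The snapshot decision rule that 'defaultDiskPolicy' introduces above can be exercised on its own. Below is a minimal, self-contained sketch (not part of the patch; 'SecurityParam' and 'shouldTakeSnapshot' are local stand-ins for the real consensus types) that mirrors the rule and checks it against a few representative situations. With Byron parameters, k = 2160 and a 20-second slot, the time threshold k * 20 = 43200 seconds, i.e. the 12 hours mentioned in the Haddock.

{-# LANGUAGE NumericUnderscores #-}
module Main (main) where

import Data.Time.Clock (DiffTime)
import Data.Word (Word64)

-- Local stand-in for the consensus 'SecurityParam': maximum rollback @k@.
newtype SecurityParam = SecurityParam Word64

-- Mirrors the shape of 'onDiskShouldTakeSnapshot' in 'defaultDiskPolicy':
-- given the time since the last snapshot (if any) and the number of blocks
-- written to the immutable DB (or replayed on startup) since then, decide
-- whether to take a snapshot now.
shouldTakeSnapshot :: SecurityParam -> Maybe DiffTime -> Word64 -> Bool
shouldTakeSnapshot (SecurityParam k) (Just timeSinceLast) blocksSinceLast =
       timeSinceLast >= fromIntegral (k * 20)  -- ~12 hours when k = 2160
    || blocksSinceLast >= 500_000              -- bulk sync: every 500k blocks
shouldTakeSnapshot (SecurityParam k) Nothing blocksSinceLast =
    -- No snapshot since startup: take one once @k@ blocks have been replayed.
    blocksSinceLast >= k

main :: IO ()
main = do
    let k = SecurityParam 2_160
    print $ shouldTakeSnapshot k (Just 3_600)  180      -- False: one hour, few blocks
    print $ shouldTakeSnapshot k (Just 43_200) 2_160    -- True: 12 hours elapsed
    print $ shouldTakeSnapshot k (Just 600)    500_000  -- True: 500k blocks while syncing
    print $ shouldTakeSnapshot k Nothing       10_000   -- True: long replay at startup

During a bulk sync the 12-hour rule barely fires relative to the block count, which is why the 500k-block disjunct exists; the 'Nothing' case is what lets a node that replayed a long stretch of immutable blocks at startup snapshot immediately instead of waiting another 12 hours.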
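
The Background.hs comment about not persisting any VolatileDB GC state relies on garbage collection being keyed by slot number, so a later collection subsumes any earlier one that was lost in a shutdown. A small standalone illustration of that property ('gcOlderThan' is a hypothetical stand-in, not the real GcSchedule machinery):

module Main (main) where

import Data.Word (Word64)

type SlotNo = Word64

-- Garbage collection is "remove everything with a slot older than the given
-- slot", so running a later GC removes at least everything that an earlier,
-- lost GC would have removed.
gcOlderThan :: SlotNo -> [(SlotNo, String)] -> [(SlotNo, String)]
gcOlderThan slot = filter (\(s, _) -> s >= slot)

main :: IO ()
main = do
    let volatile = [(88, "blk88"), (95, "blk95"), (101, "blk101")]
    -- Suppose a GC scheduled for slot 90 was lost in a shutdown; the next GC,
    -- scheduled for slot 100, still covers everything it would have removed.
    print $ gcOlderThan 100 volatile                    -- [(101,"blk101")]
    print $ gcOlderThan 100 (gcOlderThan 90 volatile)   -- same result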
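
The Word64 that 'initLedgerDB', 'initStartingWith' and 'openDB' now return is just a strict counter threaded through the replay fold. A toy version of that counting idea, over a plain list rather than the patch's StreamAPI (names here are illustrative, not the library's):

{-# LANGUAGE BangPatterns #-}
module Main (main) where

import Data.List (foldl')
import Data.Word (Word64)

-- Reapply blocks to a ledger state while counting them strictly; the same
-- idea 'initStartingWith' uses to report how many immutable blocks were
-- replayed, here over a plain list instead of the StreamAPI.
replayWithCount :: (ledger -> block -> ledger) -> ledger -> [block] -> (ledger, Word64)
replayWithCount reapply initial blocks =
    foldl' step (initial, 0) blocks
  where
    step (!l, !n) b = (reapply l b, n + 1)

main :: IO ()
main =
    -- Toy "ledger": a running total of block sizes.
    print $ replayWithCount (+) 0 ([10, 20, 30] :: [Word64])  -- prints (60,3)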