{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}

-- | Background tasks:
--
-- * Copying blocks from the VolatileDB to the ImmutableDB
-- * Performing and scheduling garbage collections on the VolatileDB
-- * Writing snapshots of the LedgerDB to disk and deleting old ones
-- * Executing scheduled chain selections
module Ouroboros.Consensus.Storage.ChainDB.Impl.Background (
    -- * Launch background tasks
    launchBgTasks
    -- * Copying blocks from the VolatileDB to the ImmutableDB
  , copyAndSnapshotRunner
  , copyToImmutableDB
  , updateLedgerSnapshots
    -- * Executing garbage collection
  , garbageCollect
    -- * Scheduling garbage collections
  , GcParams (..)
  , GcSchedule
  , computeTimeForGC
  , gcScheduleRunner
  , newGcSchedule
  , scheduleGC
    -- ** Testing
  , ScheduledGc (..)
  , dumpGcSchedule
    -- * Adding blocks to the ChainDB
  , addBlockRunner
  ) where

import           Control.Exception (assert)
import           Control.Monad (forM_, forever, void)
import           Control.Monad.Trans.Class (lift)
import           Control.ResourceRegistry
import           Control.Tracer
import           Data.Foldable (toList)
import qualified Data.Map.Strict as Map
import           Data.Sequence.Strict (StrictSeq (..))
import qualified Data.Sequence.Strict as Seq
import           Data.Time.Clock
import           Data.Void (Void)
import           Data.Word
import           GHC.Generics (Generic)
import           GHC.Stack (HasCallStack)
import           Ouroboros.Consensus.Block
import           Ouroboros.Consensus.Config
import           Ouroboros.Consensus.HardFork.Abstract
import           Ouroboros.Consensus.Ledger.Abstract
import           Ouroboros.Consensus.Ledger.Inspect
import           Ouroboros.Consensus.Ledger.SupportsProtocol
import           Ouroboros.Consensus.Protocol.Abstract
import           Ouroboros.Consensus.Storage.ChainDB.API (AddBlockResult (..),
                     BlockComponent (..))
import           Ouroboros.Consensus.Storage.ChainDB.Impl.ChainSel
                     (chainSelSync)
import           Ouroboros.Consensus.Storage.ChainDB.Impl.LgrDB
                     (LgrDbSerialiseConstraints)
import qualified Ouroboros.Consensus.Storage.ChainDB.Impl.LgrDB as LgrDB
import           Ouroboros.Consensus.Storage.ChainDB.Impl.Types
import qualified Ouroboros.Consensus.Storage.ImmutableDB as ImmutableDB
import           Ouroboros.Consensus.Storage.LedgerDB (TimeSinceLast (..))
import qualified Ouroboros.Consensus.Storage.VolatileDB as VolatileDB
import           Ouroboros.Consensus.Util
import           Ouroboros.Consensus.Util.Condense
import           Ouroboros.Consensus.Util.Enclose (Enclosing' (..))
import           Ouroboros.Consensus.Util.IOLike
import           Ouroboros.Network.AnchoredFragment (AnchoredSeq (..))
import qualified Ouroboros.Network.AnchoredFragment as AF

{-------------------------------------------------------------------------------
  Launch background tasks
-------------------------------------------------------------------------------}

launchBgTasks ::
     forall m blk.
     ( IOLike m
     , LedgerSupportsProtocol blk
     , BlockSupportsDiffusionPipelining blk
     , InspectLedger blk
     , HasHardForkHistory blk
     , LgrDbSerialiseConstraints blk
     )
  => ChainDbEnv m blk
  -> Word64 -- ^ Number of immutable blocks replayed on ledger DB startup
  -> m ()
launchBgTasks :: forall (m :: * -> *) blk.
(IOLike m, LedgerSupportsProtocol blk,
 BlockSupportsDiffusionPipelining blk, InspectLedger blk,
 HasHardForkHistory blk, LgrDbSerialiseConstraints blk) =>
ChainDbEnv m blk -> Word64 -> m ()
launchBgTasks cdb :: ChainDbEnv m blk
cdb@CDB{m (LoE (AnchoredFragment (Header blk)))
Tracer m (TraceEvent blk)
DiffTime
ResourceRegistry m
StrictTVar m (m ())
StrictTVar m (Map FollowerKey (FollowerHandle m blk))
StrictTVar m (Map IteratorKey (m ()))
StrictTVar m (StrictMaybe (Header blk))
StrictTVar m (AnchoredFragment (Header blk))
StrictTVar m (TentativeHeaderState blk)
StrictTVar m (WithFingerprint (InvalidBlocks blk))
StrictTVar m FollowerKey
StrictTVar m IteratorKey
Fuse m
TopLevelConfig blk
VolatileDB m blk
ImmutableDB m blk
LgrDB m blk
ChainSelQueue m blk
cdbImmutableDB :: ImmutableDB m blk
cdbVolatileDB :: VolatileDB m blk
cdbLgrDB :: LgrDB m blk
cdbChain :: StrictTVar m (AnchoredFragment (Header blk))
cdbTentativeState :: StrictTVar m (TentativeHeaderState blk)
cdbTentativeHeader :: StrictTVar m (StrictMaybe (Header blk))
cdbIterators :: StrictTVar m (Map IteratorKey (m ()))
cdbFollowers :: StrictTVar m (Map FollowerKey (FollowerHandle m blk))
cdbTopLevelConfig :: TopLevelConfig blk
cdbInvalid :: StrictTVar m (WithFingerprint (InvalidBlocks blk))
cdbNextIteratorKey :: StrictTVar m IteratorKey
cdbNextFollowerKey :: StrictTVar m FollowerKey
cdbCopyFuse :: Fuse m
cdbChainSelFuse :: Fuse m
cdbTracer :: Tracer m (TraceEvent blk)
cdbRegistry :: ResourceRegistry m
cdbGcDelay :: DiffTime
cdbGcInterval :: DiffTime
cdbKillBgThreads :: StrictTVar m (m ())
cdbChainSelQueue :: ChainSelQueue m blk
cdbLoE :: m (LoE (AnchoredFragment (Header blk)))
cdbImmutableDB :: forall (m :: * -> *) blk. ChainDbEnv m blk -> ImmutableDB m blk
cdbVolatileDB :: forall (m :: * -> *) blk. ChainDbEnv m blk -> VolatileDB m blk
cdbLgrDB :: forall (m :: * -> *) blk. ChainDbEnv m blk -> LgrDB m blk
cdbChain :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (AnchoredFragment (Header blk))
cdbTentativeState :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (TentativeHeaderState blk)
cdbTentativeHeader :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (StrictMaybe (Header blk))
cdbIterators :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (Map IteratorKey (m ()))
cdbFollowers :: forall (m :: * -> *) blk.
ChainDbEnv m blk
-> StrictTVar m (Map FollowerKey (FollowerHandle m blk))
cdbTopLevelConfig :: forall (m :: * -> *) blk. ChainDbEnv m blk -> TopLevelConfig blk
cdbInvalid :: forall (m :: * -> *) blk.
ChainDbEnv m blk
-> StrictTVar m (WithFingerprint (InvalidBlocks blk))
cdbNextIteratorKey :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m IteratorKey
cdbNextFollowerKey :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m FollowerKey
cdbCopyFuse :: forall (m :: * -> *) blk. ChainDbEnv m blk -> Fuse m
cdbChainSelFuse :: forall (m :: * -> *) blk. ChainDbEnv m blk -> Fuse m
cdbTracer :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> Tracer m (TraceEvent blk)
cdbRegistry :: forall (m :: * -> *) blk. ChainDbEnv m blk -> ResourceRegistry m
cdbGcDelay :: forall (m :: * -> *) blk. ChainDbEnv m blk -> DiffTime
cdbGcInterval :: forall (m :: * -> *) blk. ChainDbEnv m blk -> DiffTime
cdbKillBgThreads :: forall (m :: * -> *) blk. ChainDbEnv m blk -> StrictTVar m (m ())
cdbChainSelQueue :: forall (m :: * -> *) blk. ChainDbEnv m blk -> ChainSelQueue m blk
cdbLoE :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> m (LoE (AnchoredFragment (Header blk)))
..} Word64
replayed = do
    !m ()
addBlockThread <- String -> m Void -> m (m ())
launch String
"ChainDB.addBlockRunner" (m Void -> m (m ())) -> m Void -> m (m ())
forall a b. (a -> b) -> a -> b
$
      Fuse m -> ChainDbEnv m blk -> m Void
forall (m :: * -> *) blk.
(IOLike m, LedgerSupportsProtocol blk,
 BlockSupportsDiffusionPipelining blk, InspectLedger blk,
 HasHardForkHistory blk, HasCallStack) =>
Fuse m -> ChainDbEnv m blk -> m Void
addBlockRunner Fuse m
cdbChainSelFuse ChainDbEnv m blk
cdb
    GcSchedule m
gcSchedule <- m (GcSchedule m)
forall (m :: * -> *). IOLike m => m (GcSchedule m)
newGcSchedule
    !m ()
gcThread <- String -> m Void -> m (m ())
launch String
"ChainDB.gcScheduleRunner" (m Void -> m (m ())) -> m Void -> m (m ())
forall a b. (a -> b) -> a -> b
$
      GcSchedule m -> (SlotNo -> m ()) -> m Void
forall (m :: * -> *).
IOLike m =>
GcSchedule m -> (SlotNo -> m ()) -> m Void
gcScheduleRunner GcSchedule m
gcSchedule ((SlotNo -> m ()) -> m Void) -> (SlotNo -> m ()) -> m Void
forall a b. (a -> b) -> a -> b
$ ChainDbEnv m blk -> SlotNo -> m ()
forall (m :: * -> *) blk.
IOLike m =>
ChainDbEnv m blk -> SlotNo -> m ()
garbageCollect ChainDbEnv m blk
cdb
    !m ()
copyAndSnapshotThread <- String -> m Void -> m (m ())
launch String
"ChainDB.copyAndSnapshotRunner" (m Void -> m (m ())) -> m Void -> m (m ())
forall a b. (a -> b) -> a -> b
$
      ChainDbEnv m blk -> GcSchedule m -> Word64 -> Fuse m -> m Void
forall (m :: * -> *) blk.
(IOLike m, ConsensusProtocol (BlockProtocol blk), HasHeader blk,
 GetHeader blk, IsLedger (LedgerState blk),
 LgrDbSerialiseConstraints blk) =>
ChainDbEnv m blk -> GcSchedule m -> Word64 -> Fuse m -> m Void
copyAndSnapshotRunner ChainDbEnv m blk
cdb GcSchedule m
gcSchedule Word64
replayed Fuse m
cdbCopyFuse
    STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ StrictTVar m (m ()) -> m () -> STM m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m (m ())
cdbKillBgThreads (m () -> STM m ()) -> m () -> STM m ()
forall a b. (a -> b) -> a -> b
$
      [m ()] -> m ()
forall (t :: * -> *) (m :: * -> *) a.
(Foldable t, Monad m) =>
t (m a) -> m ()
sequence_ [m ()
addBlockThread, m ()
gcThread, m ()
copyAndSnapshotThread]
  where
    launch :: String -> m Void -> m (m ())
    launch :: String -> m Void -> m (m ())
launch = (Thread m Void -> m ()) -> m (Thread m Void) -> m (m ())
forall a b. (a -> b) -> m a -> m b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Thread m Void -> m ()
forall (m :: * -> *) a. MonadAsync m => Thread m a -> m ()
cancelThread (m (Thread m Void) -> m (m ()))
-> (String -> m Void -> m (Thread m Void))
-> String
-> m Void
-> m (m ())
forall y z x0 x1. (y -> z) -> (x0 -> x1 -> y) -> x0 -> x1 -> z
.: ResourceRegistry m -> String -> m Void -> m (Thread m Void)
forall (m :: * -> *) a.
(MonadAsync m, MonadFork m, MonadMask m, HasCallStack) =>
ResourceRegistry m -> String -> m a -> m (Thread m a)
forkLinkedThread ResourceRegistry m
cdbRegistry

{-------------------------------------------------------------------------------
  Copying blocks from the VolatileDB to the ImmutableDB
-------------------------------------------------------------------------------}

-- | Copy the blocks older than @k@ from the VolatileDB to the ImmutableDB.
--
-- These headers of these blocks can be retrieved by dropping the @k@ most
-- recent blocks from the fragment stored in 'cdbChain'.
--
-- The copied blocks are removed from the fragment stored in 'cdbChain'.
--
-- This function does not remove blocks from the VolatileDB.
--
-- The 'SlotNo' of the tip of the ImmutableDB after copying the blocks is
-- returned. This can be used for a garbage collection on the VolatileDB.
copyToImmutableDB ::
     forall m blk.
     ( IOLike m
     , ConsensusProtocol (BlockProtocol blk)
     , HasHeader blk
     , GetHeader blk
     , HasCallStack
     )
  => ChainDbEnv m blk
  -> Electric m (WithOrigin SlotNo)
copyToImmutableDB :: forall (m :: * -> *) blk.
(IOLike m, ConsensusProtocol (BlockProtocol blk), HasHeader blk,
 GetHeader blk, HasCallStack) =>
ChainDbEnv m blk -> Electric m (WithOrigin SlotNo)
copyToImmutableDB CDB{m (LoE (AnchoredFragment (Header blk)))
Tracer m (TraceEvent blk)
DiffTime
ResourceRegistry m
StrictTVar m (m ())
StrictTVar m (Map FollowerKey (FollowerHandle m blk))
StrictTVar m (Map IteratorKey (m ()))
StrictTVar m (StrictMaybe (Header blk))
StrictTVar m (AnchoredFragment (Header blk))
StrictTVar m (TentativeHeaderState blk)
StrictTVar m (WithFingerprint (InvalidBlocks blk))
StrictTVar m FollowerKey
StrictTVar m IteratorKey
Fuse m
TopLevelConfig blk
VolatileDB m blk
ImmutableDB m blk
LgrDB m blk
ChainSelQueue m blk
cdbImmutableDB :: forall (m :: * -> *) blk. ChainDbEnv m blk -> ImmutableDB m blk
cdbVolatileDB :: forall (m :: * -> *) blk. ChainDbEnv m blk -> VolatileDB m blk
cdbLgrDB :: forall (m :: * -> *) blk. ChainDbEnv m blk -> LgrDB m blk
cdbChain :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (AnchoredFragment (Header blk))
cdbTentativeState :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (TentativeHeaderState blk)
cdbTentativeHeader :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (StrictMaybe (Header blk))
cdbIterators :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (Map IteratorKey (m ()))
cdbFollowers :: forall (m :: * -> *) blk.
ChainDbEnv m blk
-> StrictTVar m (Map FollowerKey (FollowerHandle m blk))
cdbTopLevelConfig :: forall (m :: * -> *) blk. ChainDbEnv m blk -> TopLevelConfig blk
cdbInvalid :: forall (m :: * -> *) blk.
ChainDbEnv m blk
-> StrictTVar m (WithFingerprint (InvalidBlocks blk))
cdbNextIteratorKey :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m IteratorKey
cdbNextFollowerKey :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m FollowerKey
cdbCopyFuse :: forall (m :: * -> *) blk. ChainDbEnv m blk -> Fuse m
cdbChainSelFuse :: forall (m :: * -> *) blk. ChainDbEnv m blk -> Fuse m
cdbTracer :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> Tracer m (TraceEvent blk)
cdbRegistry :: forall (m :: * -> *) blk. ChainDbEnv m blk -> ResourceRegistry m
cdbGcDelay :: forall (m :: * -> *) blk. ChainDbEnv m blk -> DiffTime
cdbGcInterval :: forall (m :: * -> *) blk. ChainDbEnv m blk -> DiffTime
cdbKillBgThreads :: forall (m :: * -> *) blk. ChainDbEnv m blk -> StrictTVar m (m ())
cdbChainSelQueue :: forall (m :: * -> *) blk. ChainDbEnv m blk -> ChainSelQueue m blk
cdbLoE :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> m (LoE (AnchoredFragment (Header blk)))
cdbImmutableDB :: ImmutableDB m blk
cdbVolatileDB :: VolatileDB m blk
cdbLgrDB :: LgrDB m blk
cdbChain :: StrictTVar m (AnchoredFragment (Header blk))
cdbTentativeState :: StrictTVar m (TentativeHeaderState blk)
cdbTentativeHeader :: StrictTVar m (StrictMaybe (Header blk))
cdbIterators :: StrictTVar m (Map IteratorKey (m ()))
cdbFollowers :: StrictTVar m (Map FollowerKey (FollowerHandle m blk))
cdbTopLevelConfig :: TopLevelConfig blk
cdbInvalid :: StrictTVar m (WithFingerprint (InvalidBlocks blk))
cdbNextIteratorKey :: StrictTVar m IteratorKey
cdbNextFollowerKey :: StrictTVar m FollowerKey
cdbCopyFuse :: Fuse m
cdbChainSelFuse :: Fuse m
cdbTracer :: Tracer m (TraceEvent blk)
cdbRegistry :: ResourceRegistry m
cdbGcDelay :: DiffTime
cdbGcInterval :: DiffTime
cdbKillBgThreads :: StrictTVar m (m ())
cdbChainSelQueue :: ChainSelQueue m blk
cdbLoE :: m (LoE (AnchoredFragment (Header blk)))
..} = m (WithOrigin SlotNo) -> Electric m (WithOrigin SlotNo)
forall {k} (m :: k -> *) (a :: k). m a -> Electric m a
electric (m (WithOrigin SlotNo) -> Electric m (WithOrigin SlotNo))
-> m (WithOrigin SlotNo) -> Electric m (WithOrigin SlotNo)
forall a b. (a -> b) -> a -> b
$ do
    [Point blk]
toCopy <- STM m [Point blk] -> m [Point blk]
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m [Point blk] -> m [Point blk])
-> STM m [Point blk] -> m [Point blk]
forall a b. (a -> b) -> a -> b
$ do
      AnchoredFragment (Header blk)
curChain <- StrictTVar m (AnchoredFragment (Header blk))
-> STM m (AnchoredFragment (Header blk))
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StrictTVar m (AnchoredFragment (Header blk))
cdbChain
      let nbToCopy :: Int
nbToCopy = Int -> Int -> Int
forall a. Ord a => a -> a -> a
max Int
0 (AnchoredFragment (Header blk) -> Int
forall v a b. Anchorable v a b => AnchoredSeq v a b -> Int
AF.length AnchoredFragment (Header blk)
curChain Int -> Int -> Int
forall a. Num a => a -> a -> a
- Word64 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Word64
k)
          toCopy :: [Point blk]
          toCopy :: [Point blk]
toCopy = (Header blk -> Point blk) -> [Header blk] -> [Point blk]
forall a b. (a -> b) -> [a] -> [b]
map Header blk -> Point blk
forall blk. HasHeader (Header blk) => Header blk -> Point blk
headerPoint
                 ([Header blk] -> [Point blk]) -> [Header blk] -> [Point blk]
forall a b. (a -> b) -> a -> b
$ AnchoredFragment (Header blk) -> [Header blk]
forall v a b. AnchoredSeq v a b -> [b]
AF.toOldestFirst
                 (AnchoredFragment (Header blk) -> [Header blk])
-> AnchoredFragment (Header blk) -> [Header blk]
forall a b. (a -> b) -> a -> b
$ Int
-> AnchoredFragment (Header blk) -> AnchoredFragment (Header blk)
forall v a b.
Anchorable v a b =>
Int -> AnchoredSeq v a b -> AnchoredSeq v a b
AF.takeOldest Int
nbToCopy AnchoredFragment (Header blk)
curChain
      [Point blk] -> STM m [Point blk]
forall a. a -> STM m a
forall (m :: * -> *) a. Monad m => a -> m a
return [Point blk]
toCopy

    if [Point blk] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [Point blk]
toCopy
      -- This can't happen in practice, as we're only called when the fragment
      -- is longer than @k@. However, in the tests, we will be calling this
      -- function manually, which means it might be called when there are no
      -- blocks to copy.
      then TraceCopyToImmutableDBEvent blk -> m ()
trace TraceCopyToImmutableDBEvent blk
forall blk. TraceCopyToImmutableDBEvent blk
NoBlocksToCopyToImmutableDB
      else [Point blk] -> (Point blk -> m ()) -> m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [Point blk]
toCopy ((Point blk -> m ()) -> m ()) -> (Point blk -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \Point blk
pt -> do
        let hash :: HeaderHash blk
hash = case Point blk -> ChainHash blk
forall {k} (block :: k). Point block -> ChainHash block
pointHash Point blk
pt of
              BlockHash HeaderHash blk
h -> HeaderHash blk
h
              -- There is no actual genesis block that can occur on a chain
              ChainHash blk
GenesisHash -> String -> HeaderHash blk
forall a. HasCallStack => String -> a
error String
"genesis block on current chain"
        WithOrigin SlotNo
slotNoAtImmutableDBTip <- STM m (WithOrigin SlotNo) -> m (WithOrigin SlotNo)
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m (WithOrigin SlotNo) -> m (WithOrigin SlotNo))
-> STM m (WithOrigin SlotNo) -> m (WithOrigin SlotNo)
forall a b. (a -> b) -> a -> b
$ ImmutableDB m blk -> STM m (WithOrigin SlotNo)
forall (m :: * -> *) blk.
(MonadSTM m, HasCallStack) =>
ImmutableDB m blk -> STM m (WithOrigin SlotNo)
ImmutableDB.getTipSlot ImmutableDB m blk
cdbImmutableDB
        Bool -> m () -> m ()
forall a. HasCallStack => Bool -> a -> a
assert (Point blk -> WithOrigin SlotNo
forall {k} (block :: k). Point block -> WithOrigin SlotNo
pointSlot Point blk
pt WithOrigin SlotNo -> WithOrigin SlotNo -> Bool
forall a. Ord a => a -> a -> Bool
>= WithOrigin SlotNo
slotNoAtImmutableDBTip) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ () -> m ()
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
        -- When the block is corrupt, the function below will throw an
        -- exception. This exception will make sure that we shut down the node
        -- and that the next time we start, validation will be enabled.
        blk
blk <- VolatileDB m blk
-> BlockComponent blk blk -> HeaderHash blk -> m blk
forall (m :: * -> *) blk b.
(MonadThrow m, HasHeader blk) =>
VolatileDB m blk -> BlockComponent blk b -> HeaderHash blk -> m b
VolatileDB.getKnownBlockComponent VolatileDB m blk
cdbVolatileDB BlockComponent blk blk
forall blk. BlockComponent blk blk
GetVerifiedBlock HeaderHash blk
hash
        -- We're the only one modifying the ImmutableDB, so the tip cannot
        -- have changed since we last checked it.
        ImmutableDB m blk -> blk -> m ()
forall (m :: * -> *) blk.
HasCallStack =>
ImmutableDB m blk -> blk -> m ()
ImmutableDB.appendBlock ImmutableDB m blk
cdbImmutableDB blk
blk
        -- TODO the invariant of 'cdbChain' is shortly violated between
        -- these two lines: the tip was updated on the line above, but the
        -- anchor point is only updated on the line below.
        STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ Point blk -> STM m ()
removeFromChain Point blk
pt
        TraceCopyToImmutableDBEvent blk -> m ()
trace (TraceCopyToImmutableDBEvent blk -> m ())
-> TraceCopyToImmutableDBEvent blk -> m ()
forall a b. (a -> b) -> a -> b
$ Point blk -> TraceCopyToImmutableDBEvent blk
forall blk. Point blk -> TraceCopyToImmutableDBEvent blk
CopiedBlockToImmutableDB Point blk
pt

    -- Get the /possibly/ updated tip of the ImmutableDB
    STM m (WithOrigin SlotNo) -> m (WithOrigin SlotNo)
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m (WithOrigin SlotNo) -> m (WithOrigin SlotNo))
-> STM m (WithOrigin SlotNo) -> m (WithOrigin SlotNo)
forall a b. (a -> b) -> a -> b
$ ImmutableDB m blk -> STM m (WithOrigin SlotNo)
forall (m :: * -> *) blk.
(MonadSTM m, HasCallStack) =>
ImmutableDB m blk -> STM m (WithOrigin SlotNo)
ImmutableDB.getTipSlot ImmutableDB m blk
cdbImmutableDB
  where
    SecurityParam Word64
k = TopLevelConfig blk -> SecurityParam
forall blk.
ConsensusProtocol (BlockProtocol blk) =>
TopLevelConfig blk -> SecurityParam
configSecurityParam TopLevelConfig blk
cdbTopLevelConfig
    trace :: TraceCopyToImmutableDBEvent blk -> m ()
trace = Tracer m (TraceCopyToImmutableDBEvent blk)
-> TraceCopyToImmutableDBEvent blk -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith ((TraceCopyToImmutableDBEvent blk -> TraceEvent blk)
-> Tracer m (TraceEvent blk)
-> Tracer m (TraceCopyToImmutableDBEvent blk)
forall a' a. (a' -> a) -> Tracer m a -> Tracer m a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
contramap TraceCopyToImmutableDBEvent blk -> TraceEvent blk
forall blk. TraceCopyToImmutableDBEvent blk -> TraceEvent blk
TraceCopyToImmutableDBEvent Tracer m (TraceEvent blk)
cdbTracer)

    -- | Remove the header corresponding to the given point from the beginning
    -- of the current chain fragment.
    --
    -- PRECONDITION: the header must be the first one (oldest) in the chain
    removeFromChain :: Point blk -> STM m ()
    removeFromChain :: Point blk -> STM m ()
removeFromChain Point blk
pt = do
      -- The chain might have been extended in the meantime.
      AnchoredFragment (Header blk)
curChain <- StrictTVar m (AnchoredFragment (Header blk))
-> STM m (AnchoredFragment (Header blk))
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StrictTVar m (AnchoredFragment (Header blk))
cdbChain
      case AnchoredFragment (Header blk)
curChain of
        Header blk
hdr :< AnchoredFragment (Header blk)
curChain'
          | Header blk -> Point blk
forall blk. HasHeader (Header blk) => Header blk -> Point blk
headerPoint Header blk
hdr Point blk -> Point blk -> Bool
forall a. Eq a => a -> a -> Bool
== Point blk
pt
          -> StrictTVar m (AnchoredFragment (Header blk))
-> AnchoredFragment (Header blk) -> STM m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m (AnchoredFragment (Header blk))
cdbChain AnchoredFragment (Header blk)
curChain'
        -- We're the only one removing things from 'curChain', so this cannot
        -- happen if the precondition was satisfied.
        AnchoredFragment (Header blk)
_ -> String -> STM m ()
forall a. HasCallStack => String -> a
error String
"header to remove not on the current chain"

-- | Copy blocks from the VolatileDB to ImmutableDB and take snapshots of the
-- LgrDB
--
-- We watch the chain for changes. Whenever the chain is longer than @k@, then
-- the headers older than @k@ are copied from the VolatileDB to the ImmutableDB
-- (using 'copyToImmutableDB'). Once that is complete,
--
-- * We periodically take a snapshot of the LgrDB (depending on its config).
--   When enough blocks (depending on its config) have been replayed during
--   startup, a snapshot of the replayed LgrDB will be written to disk at the
--   start of this function.
--   NOTE: After this initial snapshot we do not take a snapshot of the LgrDB
--   until the chain has changed again, irrespective of the LgrDB policy.
-- * Schedule GC of the VolatileDB ('scheduleGC') for the 'SlotNo' of the most
--   recent block that was copied.
--
-- It is important that we only take LgrDB snapshots when are are /sure/ they
-- have been copied to the ImmutableDB, since the LgrDB assumes that all
-- snapshots correspond to immutable blocks. (Of course, data corruption can
-- occur and we can handle it by reverting to an older LgrDB snapshot, but we
-- should need this only in exceptional circumstances.)
--
-- We do not store any state of the VolatileDB GC. If the node shuts down before
-- GC can happen, when we restart the node and schedule the /next/ GC, it will
-- /imply/ any previously scheduled GC, since GC is driven by slot number
-- ("garbage collect anything older than @x@").
copyAndSnapshotRunner ::
     forall m blk.
     ( IOLike m
     , ConsensusProtocol (BlockProtocol blk)
     , HasHeader blk
     , GetHeader blk
     , IsLedger (LedgerState blk)
     , LgrDbSerialiseConstraints blk
     )
  => ChainDbEnv m blk
  -> GcSchedule m
  -> Word64 -- ^ Number of immutable blocks replayed on ledger DB startup
  -> Fuse m
  -> m Void
copyAndSnapshotRunner :: forall (m :: * -> *) blk.
(IOLike m, ConsensusProtocol (BlockProtocol blk), HasHeader blk,
 GetHeader blk, IsLedger (LedgerState blk),
 LgrDbSerialiseConstraints blk) =>
ChainDbEnv m blk -> GcSchedule m -> Word64 -> Fuse m -> m Void
copyAndSnapshotRunner cdb :: ChainDbEnv m blk
cdb@CDB{m (LoE (AnchoredFragment (Header blk)))
Tracer m (TraceEvent blk)
DiffTime
ResourceRegistry m
StrictTVar m (m ())
StrictTVar m (Map FollowerKey (FollowerHandle m blk))
StrictTVar m (Map IteratorKey (m ()))
StrictTVar m (StrictMaybe (Header blk))
StrictTVar m (AnchoredFragment (Header blk))
StrictTVar m (TentativeHeaderState blk)
StrictTVar m (WithFingerprint (InvalidBlocks blk))
StrictTVar m FollowerKey
StrictTVar m IteratorKey
Fuse m
TopLevelConfig blk
VolatileDB m blk
ImmutableDB m blk
LgrDB m blk
ChainSelQueue m blk
cdbImmutableDB :: forall (m :: * -> *) blk. ChainDbEnv m blk -> ImmutableDB m blk
cdbVolatileDB :: forall (m :: * -> *) blk. ChainDbEnv m blk -> VolatileDB m blk
cdbLgrDB :: forall (m :: * -> *) blk. ChainDbEnv m blk -> LgrDB m blk
cdbChain :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (AnchoredFragment (Header blk))
cdbTentativeState :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (TentativeHeaderState blk)
cdbTentativeHeader :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (StrictMaybe (Header blk))
cdbIterators :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (Map IteratorKey (m ()))
cdbFollowers :: forall (m :: * -> *) blk.
ChainDbEnv m blk
-> StrictTVar m (Map FollowerKey (FollowerHandle m blk))
cdbTopLevelConfig :: forall (m :: * -> *) blk. ChainDbEnv m blk -> TopLevelConfig blk
cdbInvalid :: forall (m :: * -> *) blk.
ChainDbEnv m blk
-> StrictTVar m (WithFingerprint (InvalidBlocks blk))
cdbNextIteratorKey :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m IteratorKey
cdbNextFollowerKey :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m FollowerKey
cdbCopyFuse :: forall (m :: * -> *) blk. ChainDbEnv m blk -> Fuse m
cdbChainSelFuse :: forall (m :: * -> *) blk. ChainDbEnv m blk -> Fuse m
cdbTracer :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> Tracer m (TraceEvent blk)
cdbRegistry :: forall (m :: * -> *) blk. ChainDbEnv m blk -> ResourceRegistry m
cdbGcDelay :: forall (m :: * -> *) blk. ChainDbEnv m blk -> DiffTime
cdbGcInterval :: forall (m :: * -> *) blk. ChainDbEnv m blk -> DiffTime
cdbKillBgThreads :: forall (m :: * -> *) blk. ChainDbEnv m blk -> StrictTVar m (m ())
cdbChainSelQueue :: forall (m :: * -> *) blk. ChainDbEnv m blk -> ChainSelQueue m blk
cdbLoE :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> m (LoE (AnchoredFragment (Header blk)))
cdbImmutableDB :: ImmutableDB m blk
cdbVolatileDB :: VolatileDB m blk
cdbLgrDB :: LgrDB m blk
cdbChain :: StrictTVar m (AnchoredFragment (Header blk))
cdbTentativeState :: StrictTVar m (TentativeHeaderState blk)
cdbTentativeHeader :: StrictTVar m (StrictMaybe (Header blk))
cdbIterators :: StrictTVar m (Map IteratorKey (m ()))
cdbFollowers :: StrictTVar m (Map FollowerKey (FollowerHandle m blk))
cdbTopLevelConfig :: TopLevelConfig blk
cdbInvalid :: StrictTVar m (WithFingerprint (InvalidBlocks blk))
cdbNextIteratorKey :: StrictTVar m IteratorKey
cdbNextFollowerKey :: StrictTVar m FollowerKey
cdbCopyFuse :: Fuse m
cdbChainSelFuse :: Fuse m
cdbTracer :: Tracer m (TraceEvent blk)
cdbRegistry :: ResourceRegistry m
cdbGcDelay :: DiffTime
cdbGcInterval :: DiffTime
cdbKillBgThreads :: StrictTVar m (m ())
cdbChainSelQueue :: ChainSelQueue m blk
cdbLoE :: m (LoE (AnchoredFragment (Header blk)))
..} GcSchedule m
gcSchedule Word64
replayed Fuse m
fuse =
    if TimeSinceLast DiffTime -> Word64 -> Bool
onDiskShouldTakeSnapshot TimeSinceLast DiffTime
forall time. TimeSinceLast time
NoSnapshotTakenYet Word64
replayed then do
      ChainDbEnv m blk -> m ()
forall (m :: * -> *) blk.
(IOLike m, LgrDbSerialiseConstraints blk, HasHeader blk,
 IsLedger (LedgerState blk)) =>
ChainDbEnv m blk -> m ()
updateLedgerSnapshots ChainDbEnv m blk
cdb
      Time
now <- m Time
forall (m :: * -> *). MonadMonotonicTime m => m Time
getMonotonicTime
      TimeSinceLast Time -> Word64 -> m Void
loop (Time -> TimeSinceLast Time
forall time. time -> TimeSinceLast time
TimeSinceLast Time
now) Word64
0
    else
      TimeSinceLast Time -> Word64 -> m Void
loop TimeSinceLast Time
forall time. TimeSinceLast time
NoSnapshotTakenYet Word64
replayed
  where
    SecurityParam Word64
k      = TopLevelConfig blk -> SecurityParam
forall blk.
ConsensusProtocol (BlockProtocol blk) =>
TopLevelConfig blk -> SecurityParam
configSecurityParam TopLevelConfig blk
cdbTopLevelConfig
    LgrDB.DiskPolicy{Word
Flag "DoDiskSnapshotChecksum"
TimeSinceLast DiffTime -> Word64 -> Bool
onDiskShouldTakeSnapshot :: TimeSinceLast DiffTime -> Word64 -> Bool
onDiskNumSnapshots :: Word
onDiskShouldChecksumSnapshots :: Flag "DoDiskSnapshotChecksum"
onDiskNumSnapshots :: DiskPolicy -> Word
onDiskShouldTakeSnapshot :: DiskPolicy -> TimeSinceLast DiffTime -> Word64 -> Bool
onDiskShouldChecksumSnapshots :: DiskPolicy -> Flag "DoDiskSnapshotChecksum"
..} = LgrDB m blk -> DiskPolicy
forall (m :: * -> *) blk. LgrDB m blk -> DiskPolicy
LgrDB.getDiskPolicy LgrDB m blk
cdbLgrDB

    loop :: TimeSinceLast Time -> Word64 -> m Void
    loop :: TimeSinceLast Time -> Word64 -> m Void
loop TimeSinceLast Time
mPrevSnapshot Word64
distance = do
      -- Wait for the chain to grow larger than @k@
      Word64
numToWrite <- STM m Word64 -> m Word64
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m Word64 -> m Word64) -> STM m Word64 -> m Word64
forall a b. (a -> b) -> a -> b
$ do
        AnchoredFragment (Header blk)
curChain <- StrictTVar m (AnchoredFragment (Header blk))
-> STM m (AnchoredFragment (Header blk))
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StrictTVar m (AnchoredFragment (Header blk))
cdbChain
        Bool -> STM m ()
forall (m :: * -> *). MonadSTM m => Bool -> STM m ()
check (Bool -> STM m ()) -> Bool -> STM m ()
forall a b. (a -> b) -> a -> b
$ Int -> Word64
forall a b. (Integral a, Num b) => a -> b
fromIntegral (AnchoredFragment (Header blk) -> Int
forall v a b. Anchorable v a b => AnchoredSeq v a b -> Int
AF.length AnchoredFragment (Header blk)
curChain) Word64 -> Word64 -> Bool
forall a. Ord a => a -> a -> Bool
> Word64
k
        Word64 -> STM m Word64
forall a. a -> STM m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Word64 -> STM m Word64) -> Word64 -> STM m Word64
forall a b. (a -> b) -> a -> b
$ Int -> Word64
forall a b. (Integral a, Num b) => a -> b
fromIntegral (AnchoredFragment (Header blk) -> Int
forall v a b. Anchorable v a b => AnchoredSeq v a b -> Int
AF.length AnchoredFragment (Header blk)
curChain) Word64 -> Word64 -> Word64
forall a. Num a => a -> a -> a
- Word64
k

      -- Copy blocks to ImmutableDB
      --
      -- This is a synchronous operation: when it returns, the blocks have been
      -- copied to disk (though not flushed, necessarily).
      Fuse m -> Electric m (WithOrigin SlotNo) -> m (WithOrigin SlotNo)
forall (m :: * -> *) a.
(MonadThrow m, MonadMVar m) =>
Fuse m -> Electric m a -> m a
withFuse Fuse m
fuse (ChainDbEnv m blk -> Electric m (WithOrigin SlotNo)
forall (m :: * -> *) blk.
(IOLike m, ConsensusProtocol (BlockProtocol blk), HasHeader blk,
 GetHeader blk, HasCallStack) =>
ChainDbEnv m blk -> Electric m (WithOrigin SlotNo)
copyToImmutableDB ChainDbEnv m blk
cdb) m (WithOrigin SlotNo) -> (WithOrigin SlotNo -> m ()) -> m ()
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= WithOrigin SlotNo -> m ()
scheduleGC'

      Time
now <- m Time
forall (m :: * -> *). MonadMonotonicTime m => m Time
getMonotonicTime
      let distance' :: Word64
distance' = Word64
distance Word64 -> Word64 -> Word64
forall a. Num a => a -> a -> a
+ Word64
numToWrite
          elapsed :: TimeSinceLast DiffTime
elapsed   = (\Time
prev -> Time
now Time -> Time -> DiffTime
`diffTime` Time
prev) (Time -> DiffTime) -> TimeSinceLast Time -> TimeSinceLast DiffTime
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TimeSinceLast Time
mPrevSnapshot

      if TimeSinceLast DiffTime -> Word64 -> Bool
onDiskShouldTakeSnapshot TimeSinceLast DiffTime
elapsed Word64
distance' then do
        ChainDbEnv m blk -> m ()
forall (m :: * -> *) blk.
(IOLike m, LgrDbSerialiseConstraints blk, HasHeader blk,
 IsLedger (LedgerState blk)) =>
ChainDbEnv m blk -> m ()
updateLedgerSnapshots ChainDbEnv m blk
cdb
        TimeSinceLast Time -> Word64 -> m Void
loop (Time -> TimeSinceLast Time
forall time. time -> TimeSinceLast time
TimeSinceLast Time
now) Word64
0
      else
        TimeSinceLast Time -> Word64 -> m Void
loop TimeSinceLast Time
mPrevSnapshot Word64
distance'

    scheduleGC' :: WithOrigin SlotNo -> m ()
    scheduleGC' :: WithOrigin SlotNo -> m ()
scheduleGC' WithOrigin SlotNo
Origin             = () -> m ()
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
    scheduleGC' (NotOrigin SlotNo
slotNo) =
        Tracer m (TraceGCEvent blk)
-> SlotNo -> GcParams -> GcSchedule m -> m ()
forall (m :: * -> *) blk.
IOLike m =>
Tracer m (TraceGCEvent blk)
-> SlotNo -> GcParams -> GcSchedule m -> m ()
scheduleGC
          ((TraceGCEvent blk -> TraceEvent blk)
-> Tracer m (TraceEvent blk) -> Tracer m (TraceGCEvent blk)
forall a' a. (a' -> a) -> Tracer m a -> Tracer m a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
contramap TraceGCEvent blk -> TraceEvent blk
forall blk. TraceGCEvent blk -> TraceEvent blk
TraceGCEvent Tracer m (TraceEvent blk)
cdbTracer)
          SlotNo
slotNo
          GcParams {
              gcDelay :: DiffTime
gcDelay    = DiffTime
cdbGcDelay
            , gcInterval :: DiffTime
gcInterval = DiffTime
cdbGcInterval
            }
          GcSchedule m
gcSchedule

-- | Write a snapshot of the LedgerDB to disk and remove old snapshots
-- (typically one) so that only 'onDiskNumSnapshots' snapshots are on disk.
updateLedgerSnapshots ::
    ( IOLike m
     , LgrDbSerialiseConstraints blk
     , HasHeader blk
     , IsLedger (LedgerState blk)
     )
  => ChainDbEnv m blk -> m ()
updateLedgerSnapshots :: forall (m :: * -> *) blk.
(IOLike m, LgrDbSerialiseConstraints blk, HasHeader blk,
 IsLedger (LedgerState blk)) =>
ChainDbEnv m blk -> m ()
updateLedgerSnapshots CDB{m (LoE (AnchoredFragment (Header blk)))
Tracer m (TraceEvent blk)
DiffTime
ResourceRegistry m
StrictTVar m (m ())
StrictTVar m (Map FollowerKey (FollowerHandle m blk))
StrictTVar m (Map IteratorKey (m ()))
StrictTVar m (StrictMaybe (Header blk))
StrictTVar m (AnchoredFragment (Header blk))
StrictTVar m (TentativeHeaderState blk)
StrictTVar m (WithFingerprint (InvalidBlocks blk))
StrictTVar m FollowerKey
StrictTVar m IteratorKey
Fuse m
TopLevelConfig blk
VolatileDB m blk
ImmutableDB m blk
LgrDB m blk
ChainSelQueue m blk
cdbImmutableDB :: forall (m :: * -> *) blk. ChainDbEnv m blk -> ImmutableDB m blk
cdbVolatileDB :: forall (m :: * -> *) blk. ChainDbEnv m blk -> VolatileDB m blk
cdbLgrDB :: forall (m :: * -> *) blk. ChainDbEnv m blk -> LgrDB m blk
cdbChain :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (AnchoredFragment (Header blk))
cdbTentativeState :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (TentativeHeaderState blk)
cdbTentativeHeader :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (StrictMaybe (Header blk))
cdbIterators :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (Map IteratorKey (m ()))
cdbFollowers :: forall (m :: * -> *) blk.
ChainDbEnv m blk
-> StrictTVar m (Map FollowerKey (FollowerHandle m blk))
cdbTopLevelConfig :: forall (m :: * -> *) blk. ChainDbEnv m blk -> TopLevelConfig blk
cdbInvalid :: forall (m :: * -> *) blk.
ChainDbEnv m blk
-> StrictTVar m (WithFingerprint (InvalidBlocks blk))
cdbNextIteratorKey :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m IteratorKey
cdbNextFollowerKey :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m FollowerKey
cdbCopyFuse :: forall (m :: * -> *) blk. ChainDbEnv m blk -> Fuse m
cdbChainSelFuse :: forall (m :: * -> *) blk. ChainDbEnv m blk -> Fuse m
cdbTracer :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> Tracer m (TraceEvent blk)
cdbRegistry :: forall (m :: * -> *) blk. ChainDbEnv m blk -> ResourceRegistry m
cdbGcDelay :: forall (m :: * -> *) blk. ChainDbEnv m blk -> DiffTime
cdbGcInterval :: forall (m :: * -> *) blk. ChainDbEnv m blk -> DiffTime
cdbKillBgThreads :: forall (m :: * -> *) blk. ChainDbEnv m blk -> StrictTVar m (m ())
cdbChainSelQueue :: forall (m :: * -> *) blk. ChainDbEnv m blk -> ChainSelQueue m blk
cdbLoE :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> m (LoE (AnchoredFragment (Header blk)))
cdbImmutableDB :: ImmutableDB m blk
cdbVolatileDB :: VolatileDB m blk
cdbLgrDB :: LgrDB m blk
cdbChain :: StrictTVar m (AnchoredFragment (Header blk))
cdbTentativeState :: StrictTVar m (TentativeHeaderState blk)
cdbTentativeHeader :: StrictTVar m (StrictMaybe (Header blk))
cdbIterators :: StrictTVar m (Map IteratorKey (m ()))
cdbFollowers :: StrictTVar m (Map FollowerKey (FollowerHandle m blk))
cdbTopLevelConfig :: TopLevelConfig blk
cdbInvalid :: StrictTVar m (WithFingerprint (InvalidBlocks blk))
cdbNextIteratorKey :: StrictTVar m IteratorKey
cdbNextFollowerKey :: StrictTVar m FollowerKey
cdbCopyFuse :: Fuse m
cdbChainSelFuse :: Fuse m
cdbTracer :: Tracer m (TraceEvent blk)
cdbRegistry :: ResourceRegistry m
cdbGcDelay :: DiffTime
cdbGcInterval :: DiffTime
cdbKillBgThreads :: StrictTVar m (m ())
cdbChainSelQueue :: ChainSelQueue m blk
cdbLoE :: m (LoE (AnchoredFragment (Header blk)))
..} = do
    m (Maybe (DiskSnapshot, RealPoint blk)) -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m (Maybe (DiskSnapshot, RealPoint blk)) -> m ())
-> m (Maybe (DiskSnapshot, RealPoint blk)) -> m ()
forall a b. (a -> b) -> a -> b
$ LgrDB m blk -> m (Maybe (DiskSnapshot, RealPoint blk))
forall (m :: * -> *) blk.
(IOLike m, LgrDbSerialiseConstraints blk, HasHeader blk,
 IsLedger (LedgerState blk)) =>
LgrDB m blk -> m (Maybe (DiskSnapshot, RealPoint blk))
LgrDB.takeSnapshot  LgrDB m blk
cdbLgrDB
    m [DiskSnapshot] -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m [DiskSnapshot] -> m ()) -> m [DiskSnapshot] -> m ()
forall a b. (a -> b) -> a -> b
$ LgrDB m blk -> m [DiskSnapshot]
forall (m :: * -> *) blk.
(MonadCatch m, HasHeader blk) =>
LgrDB m blk -> m [DiskSnapshot]
LgrDB.trimSnapshots LgrDB m blk
cdbLgrDB

{-------------------------------------------------------------------------------
  Executing garbage collection
-------------------------------------------------------------------------------}

-- | Trigger a garbage collection for blocks older than the given 'SlotNo' on
-- the VolatileDB.
--
-- Also removes the corresponding cached "previously applied points" from the
-- LedgerDB.
--
-- This is thread-safe as the VolatileDB locks itself while performing a GC.
--
-- When calling this function it is __critical__ that the blocks that will be
-- garbage collected, which are determined by the @slotNo@ parameter, have
-- already been copied to the immutable DB (if they are part of the current
-- selection).
--
-- TODO will a long GC be a bottleneck? It will block any other calls to
-- @putBlock@ and @getBlock@.
garbageCollect :: forall m blk. IOLike m => ChainDbEnv m blk -> SlotNo -> m ()
garbageCollect :: forall (m :: * -> *) blk.
IOLike m =>
ChainDbEnv m blk -> SlotNo -> m ()
garbageCollect CDB{m (LoE (AnchoredFragment (Header blk)))
Tracer m (TraceEvent blk)
DiffTime
ResourceRegistry m
StrictTVar m (m ())
StrictTVar m (Map FollowerKey (FollowerHandle m blk))
StrictTVar m (Map IteratorKey (m ()))
StrictTVar m (StrictMaybe (Header blk))
StrictTVar m (AnchoredFragment (Header blk))
StrictTVar m (TentativeHeaderState blk)
StrictTVar m (WithFingerprint (InvalidBlocks blk))
StrictTVar m FollowerKey
StrictTVar m IteratorKey
Fuse m
TopLevelConfig blk
VolatileDB m blk
ImmutableDB m blk
LgrDB m blk
ChainSelQueue m blk
cdbImmutableDB :: forall (m :: * -> *) blk. ChainDbEnv m blk -> ImmutableDB m blk
cdbVolatileDB :: forall (m :: * -> *) blk. ChainDbEnv m blk -> VolatileDB m blk
cdbLgrDB :: forall (m :: * -> *) blk. ChainDbEnv m blk -> LgrDB m blk
cdbChain :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (AnchoredFragment (Header blk))
cdbTentativeState :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (TentativeHeaderState blk)
cdbTentativeHeader :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (StrictMaybe (Header blk))
cdbIterators :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (Map IteratorKey (m ()))
cdbFollowers :: forall (m :: * -> *) blk.
ChainDbEnv m blk
-> StrictTVar m (Map FollowerKey (FollowerHandle m blk))
cdbTopLevelConfig :: forall (m :: * -> *) blk. ChainDbEnv m blk -> TopLevelConfig blk
cdbInvalid :: forall (m :: * -> *) blk.
ChainDbEnv m blk
-> StrictTVar m (WithFingerprint (InvalidBlocks blk))
cdbNextIteratorKey :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m IteratorKey
cdbNextFollowerKey :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m FollowerKey
cdbCopyFuse :: forall (m :: * -> *) blk. ChainDbEnv m blk -> Fuse m
cdbChainSelFuse :: forall (m :: * -> *) blk. ChainDbEnv m blk -> Fuse m
cdbTracer :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> Tracer m (TraceEvent blk)
cdbRegistry :: forall (m :: * -> *) blk. ChainDbEnv m blk -> ResourceRegistry m
cdbGcDelay :: forall (m :: * -> *) blk. ChainDbEnv m blk -> DiffTime
cdbGcInterval :: forall (m :: * -> *) blk. ChainDbEnv m blk -> DiffTime
cdbKillBgThreads :: forall (m :: * -> *) blk. ChainDbEnv m blk -> StrictTVar m (m ())
cdbChainSelQueue :: forall (m :: * -> *) blk. ChainDbEnv m blk -> ChainSelQueue m blk
cdbLoE :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> m (LoE (AnchoredFragment (Header blk)))
cdbImmutableDB :: ImmutableDB m blk
cdbVolatileDB :: VolatileDB m blk
cdbLgrDB :: LgrDB m blk
cdbChain :: StrictTVar m (AnchoredFragment (Header blk))
cdbTentativeState :: StrictTVar m (TentativeHeaderState blk)
cdbTentativeHeader :: StrictTVar m (StrictMaybe (Header blk))
cdbIterators :: StrictTVar m (Map IteratorKey (m ()))
cdbFollowers :: StrictTVar m (Map FollowerKey (FollowerHandle m blk))
cdbTopLevelConfig :: TopLevelConfig blk
cdbInvalid :: StrictTVar m (WithFingerprint (InvalidBlocks blk))
cdbNextIteratorKey :: StrictTVar m IteratorKey
cdbNextFollowerKey :: StrictTVar m FollowerKey
cdbCopyFuse :: Fuse m
cdbChainSelFuse :: Fuse m
cdbTracer :: Tracer m (TraceEvent blk)
cdbRegistry :: ResourceRegistry m
cdbGcDelay :: DiffTime
cdbGcInterval :: DiffTime
cdbKillBgThreads :: StrictTVar m (m ())
cdbChainSelQueue :: ChainSelQueue m blk
cdbLoE :: m (LoE (AnchoredFragment (Header blk)))
..} SlotNo
slotNo = do
    VolatileDB m blk -> HasCallStack => SlotNo -> m ()
forall (m :: * -> *) blk.
VolatileDB m blk -> HasCallStack => SlotNo -> m ()
VolatileDB.garbageCollect VolatileDB m blk
cdbVolatileDB SlotNo
slotNo
    STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
      LgrDB m blk -> SlotNo -> STM m ()
forall (m :: * -> *) blk.
IOLike m =>
LgrDB m blk -> SlotNo -> STM m ()
LgrDB.garbageCollectPrevApplied LgrDB m blk
cdbLgrDB SlotNo
slotNo
      StrictTVar m (WithFingerprint (InvalidBlocks blk))
-> (WithFingerprint (InvalidBlocks blk)
    -> WithFingerprint (InvalidBlocks blk))
-> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTVar m a -> (a -> a) -> STM m ()
modifyTVar StrictTVar m (WithFingerprint (InvalidBlocks blk))
cdbInvalid ((WithFingerprint (InvalidBlocks blk)
  -> WithFingerprint (InvalidBlocks blk))
 -> STM m ())
-> (WithFingerprint (InvalidBlocks blk)
    -> WithFingerprint (InvalidBlocks blk))
-> STM m ()
forall a b. (a -> b) -> a -> b
$ (InvalidBlocks blk -> InvalidBlocks blk)
-> WithFingerprint (InvalidBlocks blk)
-> WithFingerprint (InvalidBlocks blk)
forall a b. (a -> b) -> WithFingerprint a -> WithFingerprint b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ((InvalidBlocks blk -> InvalidBlocks blk)
 -> WithFingerprint (InvalidBlocks blk)
 -> WithFingerprint (InvalidBlocks blk))
-> (InvalidBlocks blk -> InvalidBlocks blk)
-> WithFingerprint (InvalidBlocks blk)
-> WithFingerprint (InvalidBlocks blk)
forall a b. (a -> b) -> a -> b
$ (InvalidBlockInfo blk -> Bool)
-> InvalidBlocks blk -> InvalidBlocks blk
forall a k. (a -> Bool) -> Map k a -> Map k a
Map.filter ((SlotNo -> SlotNo -> Bool
forall a. Ord a => a -> a -> Bool
>= SlotNo
slotNo) (SlotNo -> Bool)
-> (InvalidBlockInfo blk -> SlotNo) -> InvalidBlockInfo blk -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. InvalidBlockInfo blk -> SlotNo
forall blk. InvalidBlockInfo blk -> SlotNo
invalidBlockSlotNo)
    Tracer m (TraceEvent blk) -> TraceEvent blk -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (TraceEvent blk)
cdbTracer (TraceEvent blk -> m ()) -> TraceEvent blk -> m ()
forall a b. (a -> b) -> a -> b
$ TraceGCEvent blk -> TraceEvent blk
forall blk. TraceGCEvent blk -> TraceEvent blk
TraceGCEvent (TraceGCEvent blk -> TraceEvent blk)
-> TraceGCEvent blk -> TraceEvent blk
forall a b. (a -> b) -> a -> b
$ SlotNo -> TraceGCEvent blk
forall blk. SlotNo -> TraceGCEvent blk
PerformedGC SlotNo
slotNo

{-------------------------------------------------------------------------------
  Scheduling garbage collections
-------------------------------------------------------------------------------}

-- | Scheduled garbage collections
--
-- When a block has been copied to the ImmutableDB, we schedule a VolatileDB
-- garbage collection for the slot corresponding to the block in the future.
-- How far in the future is determined by the 'gcDelay' parameter. The goal is
-- to allow some overlap so that the write to the ImmutableDB will have been
-- flushed to disk before the block is removed from the VolatileDB.
--
-- We store scheduled garbage collections in a LIFO queue. Since the queue
-- will be very short (see further down for why) and entries are more often
-- added (at the block sync speed by a single thread) than removed (once every
-- 'gcInterval'), we simply use a 'StrictSeq' stored in a 'TVar' to make
-- reasoning and testing easier. Entries are enqueued at the end (right) and
-- dequeued from the head (left).
--
-- The 'Time's in the queue will be monotonically increasing. A fictional
-- example (with hh:mm:ss):
--
-- > [(16:01:12, SlotNo 1012), (16:04:38, SlotNo 1045), ..]
--
-- Scheduling a garbage collection with 'scheduleGC' will add an entry to the
-- end of the queue for the given slot at the time equal to now
-- ('getMonotonicTime') + the @gcDelay@ rounded to @gcInterval@. Unless the
-- last entry in the queue was scheduled for the same rounded time, in that
-- case the new entry replaces the existing entry. The goal of this is to
-- batch garbage collections so that, when possible, at most one garbage
-- collection happens every @gcInterval@.
--
-- For example, starting with an empty queue and @gcDelay = 5min@ and
-- @gcInterval = 10s@:
--
-- At 8:43:22, we schedule a GC for slot 10:
--
-- > [(8:48:30, SlotNo 10)]
--
-- The scheduled time is rounded up to the next interval. Next, at 8:43:24, we
-- schedule a GC for slot 11:
--
-- > [(8:48:30, SlotNo 11)]
--
-- Note that the existing entry is replaced with the new one, as they map to
-- the same @gcInterval@. Instead of two GCs 2 seconds apart, we will only
-- schedule one GC.
--
-- Next, at 8:44:02, we schedule a GC for slot 12:
--
-- > [(8:48:30, SlotNo 11), (8:49:10, SlotNo 12)]
--
-- Now, a new entry was appended to the queue, as it doesn't map to the same
-- @gcInterval@ as the last one.
--
-- In other words, everything scheduled in the first 10s will be done after
-- 20s. The bounds are the open-closed interval:
--
-- > (now + gcDelay, now + gcDelay + gcInterval]
--
-- Whether we're syncing at high speed or downloading blocks as they are
-- produced, the length of the queue will be at most @⌈gcDelay / gcInterval⌉ +
-- 1@, e.g., 5min / 10s = 31 entries. The @+ 1@ is needed because we might be
-- somewhere in the middle of a @gcInterval@.
--
-- The background thread will look at head of the queue and wait until that
-- has 'Time' passed. After the wait, it will pop off the head of the queue
-- and perform a garbage collection for the 'SlotNo' in the head. Note that
-- the 'SlotNo' before the wait can be different from the one after the wait,
-- precisely because of batching.
newtype GcSchedule m = GcSchedule (StrictTVar m (StrictSeq ScheduledGc))

data ScheduledGc = ScheduledGc {
      ScheduledGc -> Time
scheduledGcTime :: !Time
      -- ^ Time at which to run the garbage collection
    , ScheduledGc -> SlotNo
scheduledGcSlot :: !SlotNo
      -- ^ For which slot to run the garbage collection
    }
  deriving (ScheduledGc -> ScheduledGc -> Bool
(ScheduledGc -> ScheduledGc -> Bool)
-> (ScheduledGc -> ScheduledGc -> Bool) -> Eq ScheduledGc
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: ScheduledGc -> ScheduledGc -> Bool
== :: ScheduledGc -> ScheduledGc -> Bool
$c/= :: ScheduledGc -> ScheduledGc -> Bool
/= :: ScheduledGc -> ScheduledGc -> Bool
Eq, Int -> ScheduledGc -> ShowS
[ScheduledGc] -> ShowS
ScheduledGc -> String
(Int -> ScheduledGc -> ShowS)
-> (ScheduledGc -> String)
-> ([ScheduledGc] -> ShowS)
-> Show ScheduledGc
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> ScheduledGc -> ShowS
showsPrec :: Int -> ScheduledGc -> ShowS
$cshow :: ScheduledGc -> String
show :: ScheduledGc -> String
$cshowList :: [ScheduledGc] -> ShowS
showList :: [ScheduledGc] -> ShowS
Show, (forall x. ScheduledGc -> Rep ScheduledGc x)
-> (forall x. Rep ScheduledGc x -> ScheduledGc)
-> Generic ScheduledGc
forall x. Rep ScheduledGc x -> ScheduledGc
forall x. ScheduledGc -> Rep ScheduledGc x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cfrom :: forall x. ScheduledGc -> Rep ScheduledGc x
from :: forall x. ScheduledGc -> Rep ScheduledGc x
$cto :: forall x. Rep ScheduledGc x -> ScheduledGc
to :: forall x. Rep ScheduledGc x -> ScheduledGc
Generic, Context -> ScheduledGc -> IO (Maybe ThunkInfo)
Proxy ScheduledGc -> String
(Context -> ScheduledGc -> IO (Maybe ThunkInfo))
-> (Context -> ScheduledGc -> IO (Maybe ThunkInfo))
-> (Proxy ScheduledGc -> String)
-> NoThunks ScheduledGc
forall a.
(Context -> a -> IO (Maybe ThunkInfo))
-> (Context -> a -> IO (Maybe ThunkInfo))
-> (Proxy a -> String)
-> NoThunks a
$cnoThunks :: Context -> ScheduledGc -> IO (Maybe ThunkInfo)
noThunks :: Context -> ScheduledGc -> IO (Maybe ThunkInfo)
$cwNoThunks :: Context -> ScheduledGc -> IO (Maybe ThunkInfo)
wNoThunks :: Context -> ScheduledGc -> IO (Maybe ThunkInfo)
$cshowTypeOf :: Proxy ScheduledGc -> String
showTypeOf :: Proxy ScheduledGc -> String
NoThunks)

instance Condense ScheduledGc where
  condense :: ScheduledGc -> String
condense (ScheduledGc Time
time SlotNo
slot) = (Time, SlotNo) -> String
forall a. Condense a => a -> String
condense (Time
time, SlotNo
slot)

data GcParams = GcParams {
      GcParams -> DiffTime
gcDelay    :: !DiffTime
      -- ^ How long to wait until performing the GC. See 'cdbsGcDelay'.
    , GcParams -> DiffTime
gcInterval :: !DiffTime
      -- ^ The GC interval: the minimum time between two GCs. See
      -- 'cdbsGcInterval'.
    }
  deriving (Int -> GcParams -> ShowS
[GcParams] -> ShowS
GcParams -> String
(Int -> GcParams -> ShowS)
-> (GcParams -> String) -> ([GcParams] -> ShowS) -> Show GcParams
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> GcParams -> ShowS
showsPrec :: Int -> GcParams -> ShowS
$cshow :: GcParams -> String
show :: GcParams -> String
$cshowList :: [GcParams] -> ShowS
showList :: [GcParams] -> ShowS
Show)

newGcSchedule :: IOLike m => m (GcSchedule m)
newGcSchedule :: forall (m :: * -> *). IOLike m => m (GcSchedule m)
newGcSchedule = StrictTVar m (StrictSeq ScheduledGc) -> GcSchedule m
forall (m :: * -> *).
StrictTVar m (StrictSeq ScheduledGc) -> GcSchedule m
GcSchedule (StrictTVar m (StrictSeq ScheduledGc) -> GcSchedule m)
-> m (StrictTVar m (StrictSeq ScheduledGc)) -> m (GcSchedule m)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> StrictSeq ScheduledGc -> m (StrictTVar m (StrictSeq ScheduledGc))
forall (m :: * -> *) a.
(HasCallStack, MonadSTM m, NoThunks a) =>
a -> m (StrictTVar m a)
newTVarIO StrictSeq ScheduledGc
forall a. StrictSeq a
Seq.empty

scheduleGC ::
     forall m blk. IOLike m
  => Tracer m (TraceGCEvent blk)
  -> SlotNo    -- ^ The slot to use for garbage collection
  -> GcParams
  -> GcSchedule m
  -> m ()
scheduleGC :: forall (m :: * -> *) blk.
IOLike m =>
Tracer m (TraceGCEvent blk)
-> SlotNo -> GcParams -> GcSchedule m -> m ()
scheduleGC Tracer m (TraceGCEvent blk)
tracer SlotNo
slotNo GcParams
gcParams (GcSchedule StrictTVar m (StrictSeq ScheduledGc)
varQueue) = do
    Time
timeScheduledForGC <- GcParams -> Time -> Time
computeTimeForGC GcParams
gcParams (Time -> Time) -> m Time -> m Time
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> m Time
forall (m :: * -> *). MonadMonotonicTime m => m Time
getMonotonicTime
    STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ StrictTVar m (StrictSeq ScheduledGc)
-> (StrictSeq ScheduledGc -> StrictSeq ScheduledGc) -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTVar m a -> (a -> a) -> STM m ()
modifyTVar StrictTVar m (StrictSeq ScheduledGc)
varQueue ((StrictSeq ScheduledGc -> StrictSeq ScheduledGc) -> STM m ())
-> (StrictSeq ScheduledGc -> StrictSeq ScheduledGc) -> STM m ()
forall a b. (a -> b) -> a -> b
$ \case
      StrictSeq ScheduledGc
queue' :|> ScheduledGc { scheduledGcTime :: ScheduledGc -> Time
scheduledGcTime = Time
lastTimeScheduledForGC }
        | Time
timeScheduledForGC Time -> Time -> Bool
forall a. Eq a => a -> a -> Bool
== Time
lastTimeScheduledForGC
        -- Same interval, batch it
        -> StrictSeq ScheduledGc
queue' StrictSeq ScheduledGc -> ScheduledGc -> StrictSeq ScheduledGc
forall a. StrictSeq a -> a -> StrictSeq a
:|> Time -> SlotNo -> ScheduledGc
ScheduledGc Time
timeScheduledForGC SlotNo
slotNo
      StrictSeq ScheduledGc
queue
        -- Different interval or empty, so append it
        -> StrictSeq ScheduledGc
queue  StrictSeq ScheduledGc -> ScheduledGc -> StrictSeq ScheduledGc
forall a. StrictSeq a -> a -> StrictSeq a
:|> Time -> SlotNo -> ScheduledGc
ScheduledGc Time
timeScheduledForGC SlotNo
slotNo
    Tracer m (TraceGCEvent blk) -> TraceGCEvent blk -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (TraceGCEvent blk)
tracer (TraceGCEvent blk -> m ()) -> TraceGCEvent blk -> m ()
forall a b. (a -> b) -> a -> b
$ SlotNo -> Time -> TraceGCEvent blk
forall blk. SlotNo -> Time -> TraceGCEvent blk
ScheduledGC SlotNo
slotNo Time
timeScheduledForGC

computeTimeForGC ::
     GcParams
  -> Time  -- ^ Now
  -> Time  -- ^ The time at which to perform the GC
computeTimeForGC :: GcParams -> Time -> Time
computeTimeForGC GcParams { DiffTime
gcDelay :: GcParams -> DiffTime
gcDelay :: DiffTime
gcDelay, DiffTime
gcInterval :: GcParams -> DiffTime
gcInterval :: DiffTime
gcInterval } (Time DiffTime
now) =
    DiffTime -> Time
Time (DiffTime -> Time) -> DiffTime -> Time
forall a b. (a -> b) -> a -> b
$ Integer -> DiffTime
picosecondsToDiffTime (Integer -> DiffTime) -> Integer -> DiffTime
forall a b. (a -> b) -> a -> b
$
      -- We're rounding up to the nearest interval, because rounding down
      -- would mean GC'ing too early.
      Integer -> Integer -> Integer
forall a b. (Integral a, Integral b) => b -> a -> a
roundUpToInterval
        (DiffTime -> Integer
diffTimeToPicoseconds DiffTime
gcInterval)
        (DiffTime -> Integer
diffTimeToPicoseconds (DiffTime
now DiffTime -> DiffTime -> DiffTime
forall a. Num a => a -> a -> a
+ DiffTime
gcDelay))

-- | Round to an interval
--
-- PRECONDITION: interval > 0
--
-- >    [roundUpToInterval 5 n | n <- [1..15]]
-- > == [5,5,5,5,5, 10,10,10,10,10, 15,15,15,15,15]
--
-- >    roundUpToInterval 5 0
-- > == 0
roundUpToInterval :: (Integral a, Integral b) => b -> a -> a
roundUpToInterval :: forall a b. (Integral a, Integral b) => b -> a -> a
roundUpToInterval b
interval a
x
    | a
m a -> a -> Bool
forall a. Eq a => a -> a -> Bool
== a
0
    = a
d a -> a -> a
forall a. Num a => a -> a -> a
* b -> a
forall a b. (Integral a, Num b) => a -> b
fromIntegral b
interval
    | Bool
otherwise
    = (a
d a -> a -> a
forall a. Num a => a -> a -> a
+ a
1) a -> a -> a
forall a. Num a => a -> a -> a
* b -> a
forall a b. (Integral a, Num b) => a -> b
fromIntegral b
interval
  where
    (a
d, a
m) = a
x a -> a -> (a, a)
forall a. Integral a => a -> a -> (a, a)
`divMod` b -> a
forall a b. (Integral a, Num b) => a -> b
fromIntegral b
interval

gcScheduleRunner ::
     forall m. IOLike m
  => GcSchedule m
  -> (SlotNo -> m ())  -- ^ GC function
  -> m Void
gcScheduleRunner :: forall (m :: * -> *).
IOLike m =>
GcSchedule m -> (SlotNo -> m ()) -> m Void
gcScheduleRunner (GcSchedule StrictTVar m (StrictSeq ScheduledGc)
varQueue) SlotNo -> m ()
runGc = m () -> m Void
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (m () -> m Void) -> m () -> m Void
forall a b. (a -> b) -> a -> b
$ do
    -- Peek to know how long to wait
    Time
timeScheduledForGC <- STM m Time -> m Time
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m Time -> m Time) -> STM m Time -> m Time
forall a b. (a -> b) -> a -> b
$
      StrictTVar m (StrictSeq ScheduledGc)
-> STM m (StrictSeq ScheduledGc)
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StrictTVar m (StrictSeq ScheduledGc)
varQueue STM m (StrictSeq ScheduledGc)
-> (StrictSeq ScheduledGc -> STM m Time) -> STM m Time
forall a b. STM m a -> (a -> STM m b) -> STM m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
        StrictSeq ScheduledGc
Seq.Empty                             -> STM m Time
forall a. STM m a
forall (m :: * -> *) a. MonadSTM m => STM m a
retry
        ScheduledGc { Time
scheduledGcTime :: ScheduledGc -> Time
scheduledGcTime :: Time
scheduledGcTime } :<| StrictSeq ScheduledGc
_ -> Time -> STM m Time
forall a. a -> STM m a
forall (m :: * -> *) a. Monad m => a -> m a
return Time
scheduledGcTime

    Time
currentTime <- m Time
forall (m :: * -> *). MonadMonotonicTime m => m Time
getMonotonicTime
    let toWait :: DiffTime
toWait = DiffTime -> DiffTime -> DiffTime
forall a. Ord a => a -> a -> a
max DiffTime
0 (Time
timeScheduledForGC Time -> Time -> DiffTime
`diffTime` Time
currentTime)
    DiffTime -> m ()
forall (m :: * -> *). MonadDelay m => DiffTime -> m ()
threadDelay DiffTime
toWait

    -- After waiting, find the slot for which to GC and remove the entry from
    -- the queue.
    SlotNo
slotNo <- STM m SlotNo -> m SlotNo
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m SlotNo -> m SlotNo) -> STM m SlotNo -> m SlotNo
forall a b. (a -> b) -> a -> b
$
      StrictTVar m (StrictSeq ScheduledGc)
-> STM m (StrictSeq ScheduledGc)
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StrictTVar m (StrictSeq ScheduledGc)
varQueue STM m (StrictSeq ScheduledGc)
-> (StrictSeq ScheduledGc -> STM m SlotNo) -> STM m SlotNo
forall a b. STM m a -> (a -> STM m b) -> STM m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
        ScheduledGc { SlotNo
scheduledGcSlot :: ScheduledGc -> SlotNo
scheduledGcSlot :: SlotNo
scheduledGcSlot } :<| StrictSeq ScheduledGc
queue' -> do
          StrictTVar m (StrictSeq ScheduledGc)
-> StrictSeq ScheduledGc -> STM m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m (StrictSeq ScheduledGc)
varQueue StrictSeq ScheduledGc
queue'
          SlotNo -> STM m SlotNo
forall a. a -> STM m a
forall (m :: * -> *) a. Monad m => a -> m a
return SlotNo
scheduledGcSlot

        -- Impossible, we peeked at the queue and it contained an entry. We
        -- are the only one removing entries, so it can't have been removed
        -- while we were waiting.
        StrictSeq ScheduledGc
Seq.Empty -> String -> STM m SlotNo
forall a. HasCallStack => String -> a
error String
"queue empty after waiting"

    -- Garbage collection is called synchronously
    SlotNo -> m ()
runGc SlotNo
slotNo

-- | Return the current contents of the 'GcSchedule' queue without modifying
-- it.
--
-- For testing purposes.
dumpGcSchedule :: IOLike m => GcSchedule m -> STM m [ScheduledGc]
dumpGcSchedule :: forall (m :: * -> *).
IOLike m =>
GcSchedule m -> STM m [ScheduledGc]
dumpGcSchedule (GcSchedule StrictTVar m (StrictSeq ScheduledGc)
varQueue) = StrictSeq ScheduledGc -> [ScheduledGc]
forall a. StrictSeq a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList (StrictSeq ScheduledGc -> [ScheduledGc])
-> STM m (StrictSeq ScheduledGc) -> STM m [ScheduledGc]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> StrictTVar m (StrictSeq ScheduledGc)
-> STM m (StrictSeq ScheduledGc)
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StrictTVar m (StrictSeq ScheduledGc)
varQueue

{-------------------------------------------------------------------------------
  Adding blocks to the ChainDB
-------------------------------------------------------------------------------}

-- | Read blocks from 'cdbChainSelQueue' and add them synchronously to the
-- ChainDB.
addBlockRunner ::
     ( IOLike m
     , LedgerSupportsProtocol blk
     , BlockSupportsDiffusionPipelining blk
     , InspectLedger blk
     , HasHardForkHistory blk
     , HasCallStack
     )
  => Fuse m
  -> ChainDbEnv m blk
  -> m Void
addBlockRunner :: forall (m :: * -> *) blk.
(IOLike m, LedgerSupportsProtocol blk,
 BlockSupportsDiffusionPipelining blk, InspectLedger blk,
 HasHardForkHistory blk, HasCallStack) =>
Fuse m -> ChainDbEnv m blk -> m Void
addBlockRunner Fuse m
fuse cdb :: ChainDbEnv m blk
cdb@CDB{m (LoE (AnchoredFragment (Header blk)))
Tracer m (TraceEvent blk)
DiffTime
ResourceRegistry m
StrictTVar m (m ())
StrictTVar m (Map FollowerKey (FollowerHandle m blk))
StrictTVar m (Map IteratorKey (m ()))
StrictTVar m (StrictMaybe (Header blk))
StrictTVar m (AnchoredFragment (Header blk))
StrictTVar m (TentativeHeaderState blk)
StrictTVar m (WithFingerprint (InvalidBlocks blk))
StrictTVar m FollowerKey
StrictTVar m IteratorKey
Fuse m
TopLevelConfig blk
VolatileDB m blk
ImmutableDB m blk
LgrDB m blk
ChainSelQueue m blk
cdbImmutableDB :: forall (m :: * -> *) blk. ChainDbEnv m blk -> ImmutableDB m blk
cdbVolatileDB :: forall (m :: * -> *) blk. ChainDbEnv m blk -> VolatileDB m blk
cdbLgrDB :: forall (m :: * -> *) blk. ChainDbEnv m blk -> LgrDB m blk
cdbChain :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (AnchoredFragment (Header blk))
cdbTentativeState :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (TentativeHeaderState blk)
cdbTentativeHeader :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (StrictMaybe (Header blk))
cdbIterators :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (Map IteratorKey (m ()))
cdbFollowers :: forall (m :: * -> *) blk.
ChainDbEnv m blk
-> StrictTVar m (Map FollowerKey (FollowerHandle m blk))
cdbTopLevelConfig :: forall (m :: * -> *) blk. ChainDbEnv m blk -> TopLevelConfig blk
cdbInvalid :: forall (m :: * -> *) blk.
ChainDbEnv m blk
-> StrictTVar m (WithFingerprint (InvalidBlocks blk))
cdbNextIteratorKey :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m IteratorKey
cdbNextFollowerKey :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m FollowerKey
cdbCopyFuse :: forall (m :: * -> *) blk. ChainDbEnv m blk -> Fuse m
cdbChainSelFuse :: forall (m :: * -> *) blk. ChainDbEnv m blk -> Fuse m
cdbTracer :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> Tracer m (TraceEvent blk)
cdbRegistry :: forall (m :: * -> *) blk. ChainDbEnv m blk -> ResourceRegistry m
cdbGcDelay :: forall (m :: * -> *) blk. ChainDbEnv m blk -> DiffTime
cdbGcInterval :: forall (m :: * -> *) blk. ChainDbEnv m blk -> DiffTime
cdbKillBgThreads :: forall (m :: * -> *) blk. ChainDbEnv m blk -> StrictTVar m (m ())
cdbChainSelQueue :: forall (m :: * -> *) blk. ChainDbEnv m blk -> ChainSelQueue m blk
cdbLoE :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> m (LoE (AnchoredFragment (Header blk)))
cdbImmutableDB :: ImmutableDB m blk
cdbVolatileDB :: VolatileDB m blk
cdbLgrDB :: LgrDB m blk
cdbChain :: StrictTVar m (AnchoredFragment (Header blk))
cdbTentativeState :: StrictTVar m (TentativeHeaderState blk)
cdbTentativeHeader :: StrictTVar m (StrictMaybe (Header blk))
cdbIterators :: StrictTVar m (Map IteratorKey (m ()))
cdbFollowers :: StrictTVar m (Map FollowerKey (FollowerHandle m blk))
cdbTopLevelConfig :: TopLevelConfig blk
cdbInvalid :: StrictTVar m (WithFingerprint (InvalidBlocks blk))
cdbNextIteratorKey :: StrictTVar m IteratorKey
cdbNextFollowerKey :: StrictTVar m FollowerKey
cdbCopyFuse :: Fuse m
cdbChainSelFuse :: Fuse m
cdbTracer :: Tracer m (TraceEvent blk)
cdbRegistry :: ResourceRegistry m
cdbGcDelay :: DiffTime
cdbGcInterval :: DiffTime
cdbKillBgThreads :: StrictTVar m (m ())
cdbChainSelQueue :: ChainSelQueue m blk
cdbLoE :: m (LoE (AnchoredFragment (Header blk)))
..} = m () -> m Void
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (m () -> m Void) -> m () -> m Void
forall a b. (a -> b) -> a -> b
$ do
    let trace :: TraceAddBlockEvent blk -> m ()
trace = Tracer m (TraceEvent blk) -> TraceEvent blk -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (TraceEvent blk)
cdbTracer (TraceEvent blk -> m ())
-> (TraceAddBlockEvent blk -> TraceEvent blk)
-> TraceAddBlockEvent blk
-> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TraceAddBlockEvent blk -> TraceEvent blk
forall blk. TraceAddBlockEvent blk -> TraceEvent blk
TraceAddBlockEvent
    TraceAddBlockEvent blk -> m ()
trace (TraceAddBlockEvent blk -> m ()) -> TraceAddBlockEvent blk -> m ()
forall a b. (a -> b) -> a -> b
$ Enclosing' (RealPoint blk) -> TraceAddBlockEvent blk
forall blk. Enclosing' (RealPoint blk) -> TraceAddBlockEvent blk
PoppedBlockFromQueue Enclosing' (RealPoint blk)
forall a. Enclosing' a
RisingEdge
    -- if the `chainSelSync` does not complete because it was killed by an async
    -- exception (or it errored), notify the blocked thread
    Fuse m -> Electric m () -> m ()
forall (m :: * -> *) a.
(MonadThrow m, MonadMVar m) =>
Fuse m -> Electric m a -> m a
withFuse Fuse m
fuse (Electric m () -> m ()) -> Electric m () -> m ()
forall a b. (a -> b) -> a -> b
$
      Electric m (ChainSelMessage m blk)
-> (ChainSelMessage m blk -> Electric m ())
-> (ChainSelMessage m blk -> Electric m ())
-> Electric m ()
forall a b c.
Electric m a
-> (a -> Electric m b) -> (a -> Electric m c) -> Electric m c
forall (m :: * -> *) a b c.
MonadCatch m =>
m a -> (a -> m b) -> (a -> m c) -> m c
bracketOnError
        (m (ChainSelMessage m blk) -> Electric m (ChainSelMessage m blk)
forall (m :: * -> *) a. Monad m => m a -> Electric m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (m (ChainSelMessage m blk) -> Electric m (ChainSelMessage m blk))
-> m (ChainSelMessage m blk) -> Electric m (ChainSelMessage m blk)
forall a b. (a -> b) -> a -> b
$ ChainSelQueue m blk -> m (ChainSelMessage m blk)
forall (m :: * -> *) blk.
IOLike m =>
ChainSelQueue m blk -> m (ChainSelMessage m blk)
getChainSelMessage ChainSelQueue m blk
cdbChainSelQueue)
        (\ChainSelMessage m blk
message -> m () -> Electric m ()
forall (m :: * -> *) a. Monad m => m a -> Electric m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (m () -> Electric m ()) -> m () -> Electric m ()
forall a b. (a -> b) -> a -> b
$ STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
          case ChainSelMessage m blk
message of
            ChainSelReprocessLoEBlocks StrictTMVar m ()
varProcessed ->
              STM m Bool -> STM m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (STM m Bool -> STM m ()) -> STM m Bool -> STM m ()
forall a b. (a -> b) -> a -> b
$ StrictTMVar m () -> () -> STM m Bool
forall (m :: * -> *) a.
MonadSTM m =>
StrictTMVar m a -> a -> STM m Bool
tryPutTMVar StrictTMVar m ()
varProcessed ()
            ChainSelAddBlock BlockToAdd{StrictTMVar m Bool
varBlockWrittenToDisk :: StrictTMVar m Bool
varBlockWrittenToDisk :: forall (m :: * -> *) blk. BlockToAdd m blk -> StrictTMVar m Bool
varBlockWrittenToDisk, StrictTMVar m (AddBlockResult blk)
varBlockProcessed :: StrictTMVar m (AddBlockResult blk)
varBlockProcessed :: forall (m :: * -> *) blk.
BlockToAdd m blk -> StrictTMVar m (AddBlockResult blk)
varBlockProcessed} -> do
              Bool
_ <- StrictTMVar m Bool -> Bool -> STM m Bool
forall (m :: * -> *) a.
MonadSTM m =>
StrictTMVar m a -> a -> STM m Bool
tryPutTMVar StrictTMVar m Bool
varBlockWrittenToDisk
                              Bool
False
              Bool
_ <- StrictTMVar m (AddBlockResult blk)
-> AddBlockResult blk -> STM m Bool
forall (m :: * -> *) a.
MonadSTM m =>
StrictTMVar m a -> a -> STM m Bool
tryPutTMVar StrictTMVar m (AddBlockResult blk)
varBlockProcessed
                              (String -> AddBlockResult blk
forall blk. String -> AddBlockResult blk
FailedToAddBlock String
"Failed to add block synchronously")
              () -> STM m ()
forall a. a -> STM m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
          ChainSelQueue m blk -> STM m ()
forall (m :: * -> *) blk.
IOLike m =>
ChainSelQueue m blk -> STM m ()
closeChainSelQueue ChainSelQueue m blk
cdbChainSelQueue)
        (\ChainSelMessage m blk
message -> do
          m () -> Electric m ()
forall (m :: * -> *) a. Monad m => m a -> Electric m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (m () -> Electric m ()) -> m () -> Electric m ()
forall a b. (a -> b) -> a -> b
$ case ChainSelMessage m blk
message of
            ChainSelReprocessLoEBlocks StrictTMVar m ()
_ ->
              TraceAddBlockEvent blk -> m ()
trace TraceAddBlockEvent blk
forall blk. TraceAddBlockEvent blk
PoppedReprocessLoEBlocksFromQueue
            ChainSelAddBlock BlockToAdd{blk
blockToAdd :: blk
blockToAdd :: forall (m :: * -> *) blk. BlockToAdd m blk -> blk
blockToAdd} ->
              TraceAddBlockEvent blk -> m ()
trace (TraceAddBlockEvent blk -> m ()) -> TraceAddBlockEvent blk -> m ()
forall a b. (a -> b) -> a -> b
$ Enclosing' (RealPoint blk) -> TraceAddBlockEvent blk
forall blk. Enclosing' (RealPoint blk) -> TraceAddBlockEvent blk
PoppedBlockFromQueue (Enclosing' (RealPoint blk) -> TraceAddBlockEvent blk)
-> Enclosing' (RealPoint blk) -> TraceAddBlockEvent blk
forall a b. (a -> b) -> a -> b
$ RealPoint blk -> Enclosing' (RealPoint blk)
forall a. a -> Enclosing' a
FallingEdgeWith (RealPoint blk -> Enclosing' (RealPoint blk))
-> RealPoint blk -> Enclosing' (RealPoint blk)
forall a b. (a -> b) -> a -> b
$
                      blk -> RealPoint blk
forall blk. HasHeader blk => blk -> RealPoint blk
blockRealPoint blk
blockToAdd
          ChainDbEnv m blk -> ChainSelMessage m blk -> Electric m ()
forall (m :: * -> *) blk.
(IOLike m, LedgerSupportsProtocol blk,
 BlockSupportsDiffusionPipelining blk, InspectLedger blk,
 HasHardForkHistory blk, HasCallStack) =>
ChainDbEnv m blk -> ChainSelMessage m blk -> Electric m ()
chainSelSync ChainDbEnv m blk
cdb ChainSelMessage m blk
message)