{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}

-- | Background tasks:
--
-- * Copying blocks from the VolatileDB to the ImmutableDB
-- * Performing and scheduling garbage collections on the VolatileDB
-- * Writing snapshots of the LedgerDB to disk and deleting old ones
-- * Executing scheduled chain selections
module Ouroboros.Consensus.Storage.ChainDB.Impl.Background (
    -- * Launch background tasks
    launchBgTasks
    -- * Copying blocks from the VolatileDB to the ImmutableDB
  , copyAndSnapshotRunner
  , copyToImmutableDB
    -- * Executing garbage collection
  , garbageCollect
    -- * Scheduling garbage collections
  , GcParams (..)
  , GcSchedule
  , computeTimeForGC
  , gcScheduleRunner
  , newGcSchedule
  , scheduleGC
    -- ** Testing
  , ScheduledGc (..)
  , dumpGcSchedule
    -- * Adding blocks to the ChainDB
  , addBlockRunner
  ) where

import           Cardano.Ledger.BaseTypes (unNonZero)
import           Control.Exception (assert)
import           Control.Monad (forM_, forever, void)
import           Control.Monad.Trans.Class (lift)
import           Control.ResourceRegistry
import           Control.Tracer
import           Data.Foldable (toList)
import qualified Data.Map.Strict as Map
import           Data.Sequence.Strict (StrictSeq (..))
import qualified Data.Sequence.Strict as Seq
import           Data.Time.Clock
import           Data.Void (Void)
import           Data.Word
import           GHC.Generics (Generic)
import           GHC.Stack (HasCallStack)
import           Ouroboros.Consensus.Block
import           Ouroboros.Consensus.Config
import           Ouroboros.Consensus.HardFork.Abstract
import           Ouroboros.Consensus.Ledger.Inspect
import           Ouroboros.Consensus.Ledger.SupportsProtocol
import           Ouroboros.Consensus.Protocol.Abstract
import           Ouroboros.Consensus.Storage.ChainDB.API (AddBlockResult (..),
                     BlockComponent (..))
import           Ouroboros.Consensus.Storage.ChainDB.Impl.ChainSel
                     (chainSelSync)
import           Ouroboros.Consensus.Storage.ChainDB.Impl.Types
import qualified Ouroboros.Consensus.Storage.ImmutableDB as ImmutableDB
import qualified Ouroboros.Consensus.Storage.LedgerDB as LedgerDB
import qualified Ouroboros.Consensus.Storage.VolatileDB as VolatileDB
import           Ouroboros.Consensus.Util
import           Ouroboros.Consensus.Util.Condense
import           Ouroboros.Consensus.Util.Enclose (Enclosing' (..))
import           Ouroboros.Consensus.Util.IOLike
import           Ouroboros.Network.AnchoredFragment (AnchoredSeq (..))
import qualified Ouroboros.Network.AnchoredFragment as AF

{-------------------------------------------------------------------------------
  Launch background tasks
-------------------------------------------------------------------------------}

launchBgTasks ::
     forall m blk.
     ( IOLike m
     , LedgerSupportsProtocol blk
     , BlockSupportsDiffusionPipelining blk
     , InspectLedger blk
     , HasHardForkHistory blk
     )
  => ChainDbEnv m blk
  -> Word64 -- ^ Number of immutable blocks replayed on ledger DB startup
  -> m ()
launchBgTasks :: forall (m :: * -> *) blk.
(IOLike m, LedgerSupportsProtocol blk,
 BlockSupportsDiffusionPipelining blk, InspectLedger blk,
 HasHardForkHistory blk) =>
ChainDbEnv m blk -> Word64 -> m ()
launchBgTasks cdb :: ChainDbEnv m blk
cdb@CDB{m (LoE (AnchoredFragment (Header blk)))
Tracer m (TraceEvent blk)
DiffTime
ResourceRegistry m
StrictTVar m (m ())
StrictTVar m (Map FollowerKey (FollowerHandle m blk))
StrictTVar m (Map IteratorKey (m ()))
StrictTVar m (StrictMaybe (Header blk))
StrictTVar m (AnchoredFragment (Header blk))
StrictTVar m ChainSelStarvation
StrictTVar m (TentativeHeaderState blk)
StrictTVar m (WithFingerprint (InvalidBlocks blk))
StrictTVar m FollowerKey
StrictTVar m IteratorKey
Fuse m
TopLevelConfig blk
VolatileDB m blk
ImmutableDB m blk
LedgerDB' m blk
ChainSelQueue m blk
cdbImmutableDB :: ImmutableDB m blk
cdbVolatileDB :: VolatileDB m blk
cdbLedgerDB :: LedgerDB' m blk
cdbChain :: StrictTVar m (AnchoredFragment (Header blk))
cdbTentativeState :: StrictTVar m (TentativeHeaderState blk)
cdbTentativeHeader :: StrictTVar m (StrictMaybe (Header blk))
cdbIterators :: StrictTVar m (Map IteratorKey (m ()))
cdbFollowers :: StrictTVar m (Map FollowerKey (FollowerHandle m blk))
cdbTopLevelConfig :: TopLevelConfig blk
cdbInvalid :: StrictTVar m (WithFingerprint (InvalidBlocks blk))
cdbNextIteratorKey :: StrictTVar m IteratorKey
cdbNextFollowerKey :: StrictTVar m FollowerKey
cdbCopyFuse :: Fuse m
cdbChainSelFuse :: Fuse m
cdbTracer :: Tracer m (TraceEvent blk)
cdbRegistry :: ResourceRegistry m
cdbGcDelay :: DiffTime
cdbGcInterval :: DiffTime
cdbKillBgThreads :: StrictTVar m (m ())
cdbChainSelQueue :: ChainSelQueue m blk
cdbLoE :: m (LoE (AnchoredFragment (Header blk)))
cdbChainSelStarvation :: StrictTVar m ChainSelStarvation
cdbChainSelStarvation :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m ChainSelStarvation
cdbLoE :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> m (LoE (AnchoredFragment (Header blk)))
cdbChainSelQueue :: forall (m :: * -> *) blk. ChainDbEnv m blk -> ChainSelQueue m blk
cdbKillBgThreads :: forall (m :: * -> *) blk. ChainDbEnv m blk -> StrictTVar m (m ())
cdbGcInterval :: forall (m :: * -> *) blk. ChainDbEnv m blk -> DiffTime
cdbGcDelay :: forall (m :: * -> *) blk. ChainDbEnv m blk -> DiffTime
cdbRegistry :: forall (m :: * -> *) blk. ChainDbEnv m blk -> ResourceRegistry m
cdbTracer :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> Tracer m (TraceEvent blk)
cdbChainSelFuse :: forall (m :: * -> *) blk. ChainDbEnv m blk -> Fuse m
cdbCopyFuse :: forall (m :: * -> *) blk. ChainDbEnv m blk -> Fuse m
cdbNextFollowerKey :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m FollowerKey
cdbNextIteratorKey :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m IteratorKey
cdbInvalid :: forall (m :: * -> *) blk.
ChainDbEnv m blk
-> StrictTVar m (WithFingerprint (InvalidBlocks blk))
cdbTopLevelConfig :: forall (m :: * -> *) blk. ChainDbEnv m blk -> TopLevelConfig blk
cdbFollowers :: forall (m :: * -> *) blk.
ChainDbEnv m blk
-> StrictTVar m (Map FollowerKey (FollowerHandle m blk))
cdbIterators :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (Map IteratorKey (m ()))
cdbTentativeHeader :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (StrictMaybe (Header blk))
cdbTentativeState :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (TentativeHeaderState blk)
cdbChain :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (AnchoredFragment (Header blk))
cdbLedgerDB :: forall (m :: * -> *) blk. ChainDbEnv m blk -> LedgerDB' m blk
cdbVolatileDB :: forall (m :: * -> *) blk. ChainDbEnv m blk -> VolatileDB m blk
cdbImmutableDB :: forall (m :: * -> *) blk. ChainDbEnv m blk -> ImmutableDB m blk
..} Word64
replayed = do
    !addBlockThread <- String -> m Void -> m (m ())
launch String
"ChainDB.addBlockRunner" (m Void -> m (m ())) -> m Void -> m (m ())
forall a b. (a -> b) -> a -> b
$
      Fuse m -> ChainDbEnv m blk -> m Void
forall (m :: * -> *) blk.
(IOLike m, LedgerSupportsProtocol blk,
 BlockSupportsDiffusionPipelining blk, InspectLedger blk,
 HasHardForkHistory blk, HasCallStack) =>
Fuse m -> ChainDbEnv m blk -> m Void
addBlockRunner Fuse m
cdbChainSelFuse ChainDbEnv m blk
cdb
    gcSchedule <- newGcSchedule
    !gcThread <- launch "ChainDB.gcScheduleRunner" $
      gcScheduleRunner gcSchedule $ garbageCollect cdb
    !copyAndSnapshotThread <- launch "ChainDB.copyAndSnapshotRunner" $
      copyAndSnapshotRunner cdb gcSchedule replayed cdbCopyFuse
    atomically $ writeTVar cdbKillBgThreads $
      sequence_ [addBlockThread, gcThread, copyAndSnapshotThread]
  where
    launch :: String -> m Void -> m (m ())
    launch :: String -> m Void -> m (m ())
launch = (Thread m Void -> m ()) -> m (Thread m Void) -> m (m ())
forall a b. (a -> b) -> m a -> m b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Thread m Void -> m ()
forall (m :: * -> *) a. MonadAsync m => Thread m a -> m ()
cancelThread (m (Thread m Void) -> m (m ()))
-> (String -> m Void -> m (Thread m Void))
-> String
-> m Void
-> m (m ())
forall y z x0 x1. (y -> z) -> (x0 -> x1 -> y) -> x0 -> x1 -> z
.: ResourceRegistry m -> String -> m Void -> m (Thread m Void)
forall (m :: * -> *) a.
(MonadAsync m, MonadFork m, MonadMask m, HasCallStack) =>
ResourceRegistry m -> String -> m a -> m (Thread m a)
forkLinkedThread ResourceRegistry m
cdbRegistry

{-------------------------------------------------------------------------------
  Copying blocks from the VolatileDB to the ImmutableDB
-------------------------------------------------------------------------------}

-- | Copy the blocks older than @k@ from the VolatileDB to the ImmutableDB.
--
-- These headers of these blocks can be retrieved by dropping the @k@ most
-- recent blocks from the fragment stored in 'cdbChain'.
--
-- The copied blocks are removed from the fragment stored in 'cdbChain'.
--
-- This function does not remove blocks from the VolatileDB.
--
-- The 'SlotNo' of the tip of the ImmutableDB after copying the blocks is
-- returned. This can be used for a garbage collection on the VolatileDB.
copyToImmutableDB ::
     forall m blk.
     ( IOLike m
     , ConsensusProtocol (BlockProtocol blk)
     , HasHeader blk
     , GetHeader blk
     , HasCallStack
     )
  => ChainDbEnv m blk
  -> Electric m (WithOrigin SlotNo)
copyToImmutableDB :: forall (m :: * -> *) blk.
(IOLike m, ConsensusProtocol (BlockProtocol blk), HasHeader blk,
 GetHeader blk, HasCallStack) =>
ChainDbEnv m blk -> Electric m (WithOrigin SlotNo)
copyToImmutableDB CDB{m (LoE (AnchoredFragment (Header blk)))
Tracer m (TraceEvent blk)
DiffTime
ResourceRegistry m
StrictTVar m (m ())
StrictTVar m (Map FollowerKey (FollowerHandle m blk))
StrictTVar m (Map IteratorKey (m ()))
StrictTVar m (StrictMaybe (Header blk))
StrictTVar m (AnchoredFragment (Header blk))
StrictTVar m ChainSelStarvation
StrictTVar m (TentativeHeaderState blk)
StrictTVar m (WithFingerprint (InvalidBlocks blk))
StrictTVar m FollowerKey
StrictTVar m IteratorKey
Fuse m
TopLevelConfig blk
VolatileDB m blk
ImmutableDB m blk
LedgerDB' m blk
ChainSelQueue m blk
cdbChainSelStarvation :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m ChainSelStarvation
cdbLoE :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> m (LoE (AnchoredFragment (Header blk)))
cdbChainSelQueue :: forall (m :: * -> *) blk. ChainDbEnv m blk -> ChainSelQueue m blk
cdbKillBgThreads :: forall (m :: * -> *) blk. ChainDbEnv m blk -> StrictTVar m (m ())
cdbGcInterval :: forall (m :: * -> *) blk. ChainDbEnv m blk -> DiffTime
cdbGcDelay :: forall (m :: * -> *) blk. ChainDbEnv m blk -> DiffTime
cdbRegistry :: forall (m :: * -> *) blk. ChainDbEnv m blk -> ResourceRegistry m
cdbTracer :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> Tracer m (TraceEvent blk)
cdbChainSelFuse :: forall (m :: * -> *) blk. ChainDbEnv m blk -> Fuse m
cdbCopyFuse :: forall (m :: * -> *) blk. ChainDbEnv m blk -> Fuse m
cdbNextFollowerKey :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m FollowerKey
cdbNextIteratorKey :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m IteratorKey
cdbInvalid :: forall (m :: * -> *) blk.
ChainDbEnv m blk
-> StrictTVar m (WithFingerprint (InvalidBlocks blk))
cdbTopLevelConfig :: forall (m :: * -> *) blk. ChainDbEnv m blk -> TopLevelConfig blk
cdbFollowers :: forall (m :: * -> *) blk.
ChainDbEnv m blk
-> StrictTVar m (Map FollowerKey (FollowerHandle m blk))
cdbIterators :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (Map IteratorKey (m ()))
cdbTentativeHeader :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (StrictMaybe (Header blk))
cdbTentativeState :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (TentativeHeaderState blk)
cdbChain :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (AnchoredFragment (Header blk))
cdbLedgerDB :: forall (m :: * -> *) blk. ChainDbEnv m blk -> LedgerDB' m blk
cdbVolatileDB :: forall (m :: * -> *) blk. ChainDbEnv m blk -> VolatileDB m blk
cdbImmutableDB :: forall (m :: * -> *) blk. ChainDbEnv m blk -> ImmutableDB m blk
cdbImmutableDB :: ImmutableDB m blk
cdbVolatileDB :: VolatileDB m blk
cdbLedgerDB :: LedgerDB' m blk
cdbChain :: StrictTVar m (AnchoredFragment (Header blk))
cdbTentativeState :: StrictTVar m (TentativeHeaderState blk)
cdbTentativeHeader :: StrictTVar m (StrictMaybe (Header blk))
cdbIterators :: StrictTVar m (Map IteratorKey (m ()))
cdbFollowers :: StrictTVar m (Map FollowerKey (FollowerHandle m blk))
cdbTopLevelConfig :: TopLevelConfig blk
cdbInvalid :: StrictTVar m (WithFingerprint (InvalidBlocks blk))
cdbNextIteratorKey :: StrictTVar m IteratorKey
cdbNextFollowerKey :: StrictTVar m FollowerKey
cdbCopyFuse :: Fuse m
cdbChainSelFuse :: Fuse m
cdbTracer :: Tracer m (TraceEvent blk)
cdbRegistry :: ResourceRegistry m
cdbGcDelay :: DiffTime
cdbGcInterval :: DiffTime
cdbKillBgThreads :: StrictTVar m (m ())
cdbChainSelQueue :: ChainSelQueue m blk
cdbLoE :: m (LoE (AnchoredFragment (Header blk)))
cdbChainSelStarvation :: StrictTVar m ChainSelStarvation
..} = m (WithOrigin SlotNo) -> Electric m (WithOrigin SlotNo)
forall {k} (m :: k -> *) (a :: k). m a -> Electric m a
electric (m (WithOrigin SlotNo) -> Electric m (WithOrigin SlotNo))
-> m (WithOrigin SlotNo) -> Electric m (WithOrigin SlotNo)
forall a b. (a -> b) -> a -> b
$ do
    toCopy <- STM m [Point blk] -> m [Point blk]
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m [Point blk] -> m [Point blk])
-> STM m [Point blk] -> m [Point blk]
forall a b. (a -> b) -> a -> b
$ do
      curChain <- StrictTVar m (AnchoredFragment (Header blk))
-> STM m (AnchoredFragment (Header blk))
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StrictTVar m (AnchoredFragment (Header blk))
cdbChain
      let nbToCopy = Int -> Int -> Int
forall a. Ord a => a -> a -> a
max Int
0 (AnchoredFragment (Header blk) -> Int
forall v a b. Anchorable v a b => AnchoredSeq v a b -> Int
AF.length AnchoredFragment (Header blk)
curChain Int -> Int -> Int
forall a. Num a => a -> a -> a
- Word64 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral (NonZero Word64 -> Word64
forall a. NonZero a -> a
unNonZero NonZero Word64
k))
          toCopy :: [Point blk]
          toCopy = (Header blk -> Point blk) -> [Header blk] -> [Point blk]
forall a b. (a -> b) -> [a] -> [b]
map Header blk -> Point blk
forall blk. HasHeader (Header blk) => Header blk -> Point blk
headerPoint
                 ([Header blk] -> [Point blk]) -> [Header blk] -> [Point blk]
forall a b. (a -> b) -> a -> b
$ AnchoredFragment (Header blk) -> [Header blk]
forall v a b. AnchoredSeq v a b -> [b]
AF.toOldestFirst
                 (AnchoredFragment (Header blk) -> [Header blk])
-> AnchoredFragment (Header blk) -> [Header blk]
forall a b. (a -> b) -> a -> b
$ Int
-> AnchoredFragment (Header blk) -> AnchoredFragment (Header blk)
forall v a b.
Anchorable v a b =>
Int -> AnchoredSeq v a b -> AnchoredSeq v a b
AF.takeOldest Int
nbToCopy AnchoredFragment (Header blk)
curChain
      return toCopy

    if null toCopy
      -- This can't happen in practice, as we're only called when the fragment
      -- is longer than @k@. However, in the tests, we will be calling this
      -- function manually, which means it might be called when there are no
      -- blocks to copy.
      then trace NoBlocksToCopyToImmutableDB
      else forM_ toCopy $ \Point blk
pt -> do
        let hash :: HeaderHash blk
hash = case Point blk -> ChainHash blk
forall {k} (block :: k). Point block -> ChainHash block
pointHash Point blk
pt of
              BlockHash HeaderHash blk
h -> HeaderHash blk
h
              -- There is no actual genesis block that can occur on a chain
              ChainHash blk
GenesisHash -> String -> HeaderHash blk
forall a. HasCallStack => String -> a
error String
"genesis block on current chain"
        slotNoAtImmutableDBTip <- STM m (WithOrigin SlotNo) -> m (WithOrigin SlotNo)
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m (WithOrigin SlotNo) -> m (WithOrigin SlotNo))
-> STM m (WithOrigin SlotNo) -> m (WithOrigin SlotNo)
forall a b. (a -> b) -> a -> b
$ ImmutableDB m blk -> STM m (WithOrigin SlotNo)
forall (m :: * -> *) blk.
(MonadSTM m, HasCallStack) =>
ImmutableDB m blk -> STM m (WithOrigin SlotNo)
ImmutableDB.getTipSlot ImmutableDB m blk
cdbImmutableDB
        assert (pointSlot pt >= slotNoAtImmutableDBTip) $ return ()
        -- When the block is corrupt, the function below will throw an
        -- exception. This exception will make sure that we shut down the node
        -- and that the next time we start, validation will be enabled.
        blk <- VolatileDB.getKnownBlockComponent cdbVolatileDB GetVerifiedBlock hash
        -- We're the only one modifying the ImmutableDB, so the tip cannot
        -- have changed since we last checked it.
        ImmutableDB.appendBlock cdbImmutableDB blk
        -- TODO the invariant of 'cdbChain' is shortly violated between
        -- these two lines: the tip was updated on the line above, but the
        -- anchor point is only updated on the line below.
        atomically $ removeFromChain pt
        trace $ CopiedBlockToImmutableDB pt

    -- Get the /possibly/ updated tip of the ImmutableDB
    atomically $ ImmutableDB.getTipSlot cdbImmutableDB
  where
    SecurityParam NonZero Word64
k = TopLevelConfig blk -> SecurityParam
forall blk.
ConsensusProtocol (BlockProtocol blk) =>
TopLevelConfig blk -> SecurityParam
configSecurityParam TopLevelConfig blk
cdbTopLevelConfig
    trace :: TraceCopyToImmutableDBEvent blk -> m ()
trace = Tracer m (TraceCopyToImmutableDBEvent blk)
-> TraceCopyToImmutableDBEvent blk -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith ((TraceCopyToImmutableDBEvent blk -> TraceEvent blk)
-> Tracer m (TraceEvent blk)
-> Tracer m (TraceCopyToImmutableDBEvent blk)
forall a' a. (a' -> a) -> Tracer m a -> Tracer m a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
contramap TraceCopyToImmutableDBEvent blk -> TraceEvent blk
forall blk. TraceCopyToImmutableDBEvent blk -> TraceEvent blk
TraceCopyToImmutableDBEvent Tracer m (TraceEvent blk)
cdbTracer)

    -- | Remove the header corresponding to the given point from the beginning
    -- of the current chain fragment.
    --
    -- PRECONDITION: the header must be the first one (oldest) in the chain
    removeFromChain :: Point blk -> STM m ()
    removeFromChain :: Point blk -> STM m ()
removeFromChain Point blk
pt = do
      -- The chain might have been extended in the meantime.
      curChain <- StrictTVar m (AnchoredFragment (Header blk))
-> STM m (AnchoredFragment (Header blk))
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StrictTVar m (AnchoredFragment (Header blk))
cdbChain
      case curChain of
        Header blk
hdr :< AnchoredFragment (Header blk)
curChain'
          | Header blk -> Point blk
forall blk. HasHeader (Header blk) => Header blk -> Point blk
headerPoint Header blk
hdr Point blk -> Point blk -> Bool
forall a. Eq a => a -> a -> Bool
== Point blk
pt
          -> StrictTVar m (AnchoredFragment (Header blk))
-> AnchoredFragment (Header blk) -> STM m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m (AnchoredFragment (Header blk))
cdbChain AnchoredFragment (Header blk)
curChain'
        -- We're the only one removing things from 'curChain', so this cannot
        -- happen if the precondition was satisfied.
        AnchoredFragment (Header blk)
_ -> String -> STM m ()
forall a. HasCallStack => String -> a
error String
"header to remove not on the current chain"

{-------------------------------------------------------------------------------
  Snapshotting
-------------------------------------------------------------------------------}

-- | Copy blocks from the VolatileDB to ImmutableDB and take snapshots of the
-- LedgerDB
--
-- We watch the chain for changes. Whenever the chain is longer than @k@, then
-- the headers older than @k@ are copied from the VolatileDB to the ImmutableDB
-- (using 'copyToImmutableDB'). Once that is complete,
--
-- * We periodically take a snapshot of the LedgerDB (depending on its config).
--   When enough blocks (depending on its config) have been replayed during
--   startup, a snapshot of the replayed LedgerDB will be written to disk at the
--   start of this function. NOTE: After this initial snapshot we do not take a
--   snapshot of the LedgerDB until the chain has changed again, irrespective of
--   the LedgerDB policy.
--
-- * Schedule GC of the VolatileDB ('scheduleGC') for the 'SlotNo' of the most
--   recent block that was copied.
--
-- It is important that we only take LedgerDB snapshots when are are /sure/ they
-- have been copied to the ImmutableDB, since the LedgerDB assumes that all
-- snapshots correspond to immutable blocks. (Of course, data corruption can
-- occur and we can handle it by reverting to an older LedgerDB snapshot, but we
-- should need this only in exceptional circumstances.)
--
-- We do not store any state of the VolatileDB GC. If the node shuts down before
-- GC can happen, when we restart the node and schedule the /next/ GC, it will
-- /imply/ any previously scheduled GC, since GC is driven by slot number
-- ("garbage collect anything older than @x@").
copyAndSnapshotRunner ::
     forall m blk.
     ( IOLike m
     , LedgerSupportsProtocol blk
     )
  => ChainDbEnv m blk
  -> GcSchedule m
  -> Word64 -- ^ Number of immutable blocks replayed on ledger DB startup
  -> Fuse m
  -> m Void
copyAndSnapshotRunner :: forall (m :: * -> *) blk.
(IOLike m, LedgerSupportsProtocol blk) =>
ChainDbEnv m blk -> GcSchedule m -> Word64 -> Fuse m -> m Void
copyAndSnapshotRunner cdb :: ChainDbEnv m blk
cdb@CDB{m (LoE (AnchoredFragment (Header blk)))
Tracer m (TraceEvent blk)
DiffTime
ResourceRegistry m
StrictTVar m (m ())
StrictTVar m (Map FollowerKey (FollowerHandle m blk))
StrictTVar m (Map IteratorKey (m ()))
StrictTVar m (StrictMaybe (Header blk))
StrictTVar m (AnchoredFragment (Header blk))
StrictTVar m ChainSelStarvation
StrictTVar m (TentativeHeaderState blk)
StrictTVar m (WithFingerprint (InvalidBlocks blk))
StrictTVar m FollowerKey
StrictTVar m IteratorKey
Fuse m
TopLevelConfig blk
VolatileDB m blk
ImmutableDB m blk
LedgerDB' m blk
ChainSelQueue m blk
cdbChainSelStarvation :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m ChainSelStarvation
cdbLoE :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> m (LoE (AnchoredFragment (Header blk)))
cdbChainSelQueue :: forall (m :: * -> *) blk. ChainDbEnv m blk -> ChainSelQueue m blk
cdbKillBgThreads :: forall (m :: * -> *) blk. ChainDbEnv m blk -> StrictTVar m (m ())
cdbGcInterval :: forall (m :: * -> *) blk. ChainDbEnv m blk -> DiffTime
cdbGcDelay :: forall (m :: * -> *) blk. ChainDbEnv m blk -> DiffTime
cdbRegistry :: forall (m :: * -> *) blk. ChainDbEnv m blk -> ResourceRegistry m
cdbTracer :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> Tracer m (TraceEvent blk)
cdbChainSelFuse :: forall (m :: * -> *) blk. ChainDbEnv m blk -> Fuse m
cdbCopyFuse :: forall (m :: * -> *) blk. ChainDbEnv m blk -> Fuse m
cdbNextFollowerKey :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m FollowerKey
cdbNextIteratorKey :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m IteratorKey
cdbInvalid :: forall (m :: * -> *) blk.
ChainDbEnv m blk
-> StrictTVar m (WithFingerprint (InvalidBlocks blk))
cdbTopLevelConfig :: forall (m :: * -> *) blk. ChainDbEnv m blk -> TopLevelConfig blk
cdbFollowers :: forall (m :: * -> *) blk.
ChainDbEnv m blk
-> StrictTVar m (Map FollowerKey (FollowerHandle m blk))
cdbIterators :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (Map IteratorKey (m ()))
cdbTentativeHeader :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (StrictMaybe (Header blk))
cdbTentativeState :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (TentativeHeaderState blk)
cdbChain :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (AnchoredFragment (Header blk))
cdbLedgerDB :: forall (m :: * -> *) blk. ChainDbEnv m blk -> LedgerDB' m blk
cdbVolatileDB :: forall (m :: * -> *) blk. ChainDbEnv m blk -> VolatileDB m blk
cdbImmutableDB :: forall (m :: * -> *) blk. ChainDbEnv m blk -> ImmutableDB m blk
cdbImmutableDB :: ImmutableDB m blk
cdbVolatileDB :: VolatileDB m blk
cdbLedgerDB :: LedgerDB' m blk
cdbChain :: StrictTVar m (AnchoredFragment (Header blk))
cdbTentativeState :: StrictTVar m (TentativeHeaderState blk)
cdbTentativeHeader :: StrictTVar m (StrictMaybe (Header blk))
cdbIterators :: StrictTVar m (Map IteratorKey (m ()))
cdbFollowers :: StrictTVar m (Map FollowerKey (FollowerHandle m blk))
cdbTopLevelConfig :: TopLevelConfig blk
cdbInvalid :: StrictTVar m (WithFingerprint (InvalidBlocks blk))
cdbNextIteratorKey :: StrictTVar m IteratorKey
cdbNextFollowerKey :: StrictTVar m FollowerKey
cdbCopyFuse :: Fuse m
cdbChainSelFuse :: Fuse m
cdbTracer :: Tracer m (TraceEvent blk)
cdbRegistry :: ResourceRegistry m
cdbGcDelay :: DiffTime
cdbGcInterval :: DiffTime
cdbKillBgThreads :: StrictTVar m (m ())
cdbChainSelQueue :: ChainSelQueue m blk
cdbLoE :: m (LoE (AnchoredFragment (Header blk)))
cdbChainSelStarvation :: StrictTVar m ChainSelStarvation
..} GcSchedule m
gcSchedule Word64
replayed Fuse m
fuse = do
    -- this first flush will persist the differences that come from the initial
    -- chain selection.
    LedgerDB' m blk -> m ()
forall (m :: * -> *) (l :: LedgerStateKind) blk.
LedgerDB m l blk -> m ()
LedgerDB.tryFlush LedgerDB' m blk
cdbLedgerDB
    SnapCounters -> m Void
loop (SnapCounters -> m Void) -> m SnapCounters -> m Void
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< LedgerDB' m blk
-> (ExtLedgerState blk ~ ExtLedgerState blk) =>
   Maybe (Time, Time) -> Word64 -> m SnapCounters
forall (m :: * -> *) (l :: LedgerStateKind) blk.
LedgerDB m l blk
-> (l ~ ExtLedgerState blk) =>
   Maybe (Time, Time) -> Word64 -> m SnapCounters
LedgerDB.tryTakeSnapshot LedgerDB' m blk
cdbLedgerDB Maybe (Time, Time)
forall a. Maybe a
Nothing Word64
replayed
  where
    SecurityParam NonZero Word64
k = TopLevelConfig blk -> SecurityParam
forall blk.
ConsensusProtocol (BlockProtocol blk) =>
TopLevelConfig blk -> SecurityParam
configSecurityParam TopLevelConfig blk
cdbTopLevelConfig

    loop :: LedgerDB.SnapCounters -> m Void
    loop :: SnapCounters -> m Void
loop SnapCounters
counters = do
      let LedgerDB.SnapCounters {
              Maybe Time
prevSnapshotTime :: Maybe Time
prevSnapshotTime :: SnapCounters -> Maybe Time
prevSnapshotTime
            , Word64
ntBlocksSinceLastSnap :: Word64
ntBlocksSinceLastSnap :: SnapCounters -> Word64
ntBlocksSinceLastSnap
            } = SnapCounters
counters

      -- Wait for the chain to grow larger than @k@
      numToWrite <- STM m Word64 -> m Word64
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m Word64 -> m Word64) -> STM m Word64 -> m Word64
forall a b. (a -> b) -> a -> b
$ do
        curChain <- StrictTVar m (AnchoredFragment (Header blk))
-> STM m (AnchoredFragment (Header blk))
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StrictTVar m (AnchoredFragment (Header blk))
cdbChain
        check $ fromIntegral (AF.length curChain) > unNonZero k
        return $ fromIntegral (AF.length curChain) - unNonZero k

      -- Copy blocks to ImmutableDB
      --
      -- This is a synchronous operation: when it returns, the blocks have been
      -- copied to disk (though not flushed, necessarily).
      withFuse fuse (copyToImmutableDB cdb) >>= scheduleGC'

      LedgerDB.tryFlush cdbLedgerDB

      now <- getMonotonicTime
      let ntBlocksSinceLastSnap' = Word64
ntBlocksSinceLastSnap Word64 -> Word64 -> Word64
forall a. Num a => a -> a -> a
+ Word64
numToWrite

      loop =<< LedgerDB.tryTakeSnapshot cdbLedgerDB ((,now) <$> prevSnapshotTime) ntBlocksSinceLastSnap'

    scheduleGC' :: WithOrigin SlotNo -> m ()
    scheduleGC' :: WithOrigin SlotNo -> m ()
scheduleGC' WithOrigin SlotNo
Origin             = () -> m ()
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
    scheduleGC' (NotOrigin SlotNo
slotNo) =
        Tracer m (TraceGCEvent blk)
-> SlotNo -> GcParams -> GcSchedule m -> m ()
forall (m :: * -> *) blk.
IOLike m =>
Tracer m (TraceGCEvent blk)
-> SlotNo -> GcParams -> GcSchedule m -> m ()
scheduleGC
          ((TraceGCEvent blk -> TraceEvent blk)
-> Tracer m (TraceEvent blk) -> Tracer m (TraceGCEvent blk)
forall a' a. (a' -> a) -> Tracer m a -> Tracer m a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
contramap TraceGCEvent blk -> TraceEvent blk
forall blk. TraceGCEvent blk -> TraceEvent blk
TraceGCEvent Tracer m (TraceEvent blk)
cdbTracer)
          SlotNo
slotNo
          GcParams {
              gcDelay :: DiffTime
gcDelay    = DiffTime
cdbGcDelay
            , gcInterval :: DiffTime
gcInterval = DiffTime
cdbGcInterval
            }
          GcSchedule m
gcSchedule

{-------------------------------------------------------------------------------
  Executing garbage collection
-------------------------------------------------------------------------------}

-- | Trigger a garbage collection for blocks older than the given 'SlotNo' on
-- the VolatileDB.
--
-- Also removes the corresponding cached "previously applied points" from the
-- LedgerDB.
--
-- This is thread-safe as the VolatileDB locks itself while performing a GC.
--
-- When calling this function it is __critical__ that the blocks that will be
-- garbage collected, which are determined by the @slotNo@ parameter, have
-- already been copied to the immutable DB (if they are part of the current
-- selection).
--
-- TODO will a long GC be a bottleneck? It will block any other calls to
-- @putBlock@ and @getBlock@.
garbageCollect :: forall m blk. IOLike m => ChainDbEnv m blk -> SlotNo -> m ()
garbageCollect :: forall (m :: * -> *) blk.
IOLike m =>
ChainDbEnv m blk -> SlotNo -> m ()
garbageCollect CDB{m (LoE (AnchoredFragment (Header blk)))
Tracer m (TraceEvent blk)
DiffTime
ResourceRegistry m
StrictTVar m (m ())
StrictTVar m (Map FollowerKey (FollowerHandle m blk))
StrictTVar m (Map IteratorKey (m ()))
StrictTVar m (StrictMaybe (Header blk))
StrictTVar m (AnchoredFragment (Header blk))
StrictTVar m ChainSelStarvation
StrictTVar m (TentativeHeaderState blk)
StrictTVar m (WithFingerprint (InvalidBlocks blk))
StrictTVar m FollowerKey
StrictTVar m IteratorKey
Fuse m
TopLevelConfig blk
VolatileDB m blk
ImmutableDB m blk
LedgerDB' m blk
ChainSelQueue m blk
cdbChainSelStarvation :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m ChainSelStarvation
cdbLoE :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> m (LoE (AnchoredFragment (Header blk)))
cdbChainSelQueue :: forall (m :: * -> *) blk. ChainDbEnv m blk -> ChainSelQueue m blk
cdbKillBgThreads :: forall (m :: * -> *) blk. ChainDbEnv m blk -> StrictTVar m (m ())
cdbGcInterval :: forall (m :: * -> *) blk. ChainDbEnv m blk -> DiffTime
cdbGcDelay :: forall (m :: * -> *) blk. ChainDbEnv m blk -> DiffTime
cdbRegistry :: forall (m :: * -> *) blk. ChainDbEnv m blk -> ResourceRegistry m
cdbTracer :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> Tracer m (TraceEvent blk)
cdbChainSelFuse :: forall (m :: * -> *) blk. ChainDbEnv m blk -> Fuse m
cdbCopyFuse :: forall (m :: * -> *) blk. ChainDbEnv m blk -> Fuse m
cdbNextFollowerKey :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m FollowerKey
cdbNextIteratorKey :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m IteratorKey
cdbInvalid :: forall (m :: * -> *) blk.
ChainDbEnv m blk
-> StrictTVar m (WithFingerprint (InvalidBlocks blk))
cdbTopLevelConfig :: forall (m :: * -> *) blk. ChainDbEnv m blk -> TopLevelConfig blk
cdbFollowers :: forall (m :: * -> *) blk.
ChainDbEnv m blk
-> StrictTVar m (Map FollowerKey (FollowerHandle m blk))
cdbIterators :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (Map IteratorKey (m ()))
cdbTentativeHeader :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (StrictMaybe (Header blk))
cdbTentativeState :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (TentativeHeaderState blk)
cdbChain :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (AnchoredFragment (Header blk))
cdbLedgerDB :: forall (m :: * -> *) blk. ChainDbEnv m blk -> LedgerDB' m blk
cdbVolatileDB :: forall (m :: * -> *) blk. ChainDbEnv m blk -> VolatileDB m blk
cdbImmutableDB :: forall (m :: * -> *) blk. ChainDbEnv m blk -> ImmutableDB m blk
cdbImmutableDB :: ImmutableDB m blk
cdbVolatileDB :: VolatileDB m blk
cdbLedgerDB :: LedgerDB' m blk
cdbChain :: StrictTVar m (AnchoredFragment (Header blk))
cdbTentativeState :: StrictTVar m (TentativeHeaderState blk)
cdbTentativeHeader :: StrictTVar m (StrictMaybe (Header blk))
cdbIterators :: StrictTVar m (Map IteratorKey (m ()))
cdbFollowers :: StrictTVar m (Map FollowerKey (FollowerHandle m blk))
cdbTopLevelConfig :: TopLevelConfig blk
cdbInvalid :: StrictTVar m (WithFingerprint (InvalidBlocks blk))
cdbNextIteratorKey :: StrictTVar m IteratorKey
cdbNextFollowerKey :: StrictTVar m FollowerKey
cdbCopyFuse :: Fuse m
cdbChainSelFuse :: Fuse m
cdbTracer :: Tracer m (TraceEvent blk)
cdbRegistry :: ResourceRegistry m
cdbGcDelay :: DiffTime
cdbGcInterval :: DiffTime
cdbKillBgThreads :: StrictTVar m (m ())
cdbChainSelQueue :: ChainSelQueue m blk
cdbLoE :: m (LoE (AnchoredFragment (Header blk)))
cdbChainSelStarvation :: StrictTVar m ChainSelStarvation
..} SlotNo
slotNo = do
    VolatileDB m blk -> HasCallStack => SlotNo -> m ()
forall (m :: * -> *) blk.
VolatileDB m blk -> HasCallStack => SlotNo -> m ()
VolatileDB.garbageCollect VolatileDB m blk
cdbVolatileDB SlotNo
slotNo
    STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
      LedgerDB' m blk -> SlotNo -> STM m ()
forall (m :: * -> *) (l :: LedgerStateKind) blk.
LedgerDB m l blk -> SlotNo -> STM m ()
LedgerDB.garbageCollect LedgerDB' m blk
cdbLedgerDB SlotNo
slotNo
      StrictTVar m (WithFingerprint (InvalidBlocks blk))
-> (WithFingerprint (InvalidBlocks blk)
    -> WithFingerprint (InvalidBlocks blk))
-> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTVar m a -> (a -> a) -> STM m ()
modifyTVar StrictTVar m (WithFingerprint (InvalidBlocks blk))
cdbInvalid ((WithFingerprint (InvalidBlocks blk)
  -> WithFingerprint (InvalidBlocks blk))
 -> STM m ())
-> (WithFingerprint (InvalidBlocks blk)
    -> WithFingerprint (InvalidBlocks blk))
-> STM m ()
forall a b. (a -> b) -> a -> b
$ (InvalidBlocks blk -> InvalidBlocks blk)
-> WithFingerprint (InvalidBlocks blk)
-> WithFingerprint (InvalidBlocks blk)
forall a b. (a -> b) -> WithFingerprint a -> WithFingerprint b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ((InvalidBlocks blk -> InvalidBlocks blk)
 -> WithFingerprint (InvalidBlocks blk)
 -> WithFingerprint (InvalidBlocks blk))
-> (InvalidBlocks blk -> InvalidBlocks blk)
-> WithFingerprint (InvalidBlocks blk)
-> WithFingerprint (InvalidBlocks blk)
forall a b. (a -> b) -> a -> b
$ (InvalidBlockInfo blk -> Bool)
-> InvalidBlocks blk -> InvalidBlocks blk
forall a k. (a -> Bool) -> Map k a -> Map k a
Map.filter ((SlotNo -> SlotNo -> Bool
forall a. Ord a => a -> a -> Bool
>= SlotNo
slotNo) (SlotNo -> Bool)
-> (InvalidBlockInfo blk -> SlotNo) -> InvalidBlockInfo blk -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. InvalidBlockInfo blk -> SlotNo
forall blk. InvalidBlockInfo blk -> SlotNo
invalidBlockSlotNo)
    Tracer m (TraceEvent blk) -> TraceEvent blk -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (TraceEvent blk)
cdbTracer (TraceEvent blk -> m ()) -> TraceEvent blk -> m ()
forall a b. (a -> b) -> a -> b
$ TraceGCEvent blk -> TraceEvent blk
forall blk. TraceGCEvent blk -> TraceEvent blk
TraceGCEvent (TraceGCEvent blk -> TraceEvent blk)
-> TraceGCEvent blk -> TraceEvent blk
forall a b. (a -> b) -> a -> b
$ SlotNo -> TraceGCEvent blk
forall blk. SlotNo -> TraceGCEvent blk
PerformedGC SlotNo
slotNo

{-------------------------------------------------------------------------------
  Scheduling garbage collections
-------------------------------------------------------------------------------}

-- | Scheduled garbage collections
--
-- When a block has been copied to the ImmutableDB, we schedule a VolatileDB
-- garbage collection for the slot corresponding to the block in the future.
-- How far in the future is determined by the 'gcDelay' parameter. The goal is
-- to allow some overlap so that the write to the ImmutableDB will have been
-- flushed to disk before the block is removed from the VolatileDB.
--
-- We store scheduled garbage collections in a LIFO queue. Since the queue
-- will be very short (see further down for why) and entries are more often
-- added (at the block sync speed by a single thread) than removed (once every
-- 'gcInterval'), we simply use a 'StrictSeq' stored in a 'TVar' to make
-- reasoning and testing easier. Entries are enqueued at the end (right) and
-- dequeued from the head (left).
--
-- The 'Time's in the queue will be monotonically increasing. A fictional
-- example (with hh:mm:ss):
--
-- > [(16:01:12, SlotNo 1012), (16:04:38, SlotNo 1045), ..]
--
-- Scheduling a garbage collection with 'scheduleGC' will add an entry to the
-- end of the queue for the given slot at the time equal to now
-- ('getMonotonicTime') + the @gcDelay@ rounded to @gcInterval@. Unless the
-- last entry in the queue was scheduled for the same rounded time, in that
-- case the new entry replaces the existing entry. The goal of this is to
-- batch garbage collections so that, when possible, at most one garbage
-- collection happens every @gcInterval@.
--
-- For example, starting with an empty queue and @gcDelay = 5min@ and
-- @gcInterval = 10s@:
--
-- At 8:43:22, we schedule a GC for slot 10:
--
-- > [(8:48:30, SlotNo 10)]
--
-- The scheduled time is rounded up to the next interval. Next, at 8:43:24, we
-- schedule a GC for slot 11:
--
-- > [(8:48:30, SlotNo 11)]
--
-- Note that the existing entry is replaced with the new one, as they map to
-- the same @gcInterval@. Instead of two GCs 2 seconds apart, we will only
-- schedule one GC.
--
-- Next, at 8:44:02, we schedule a GC for slot 12:
--
-- > [(8:48:30, SlotNo 11), (8:49:10, SlotNo 12)]
--
-- Now, a new entry was appended to the queue, as it doesn't map to the same
-- @gcInterval@ as the last one.
--
-- In other words, everything scheduled in the first 10s will be done after
-- 20s. The bounds are the open-closed interval:
--
-- > (now + gcDelay, now + gcDelay + gcInterval]
--
-- Whether we're syncing at high speed or downloading blocks as they are
-- produced, the length of the queue will be at most @⌈gcDelay / gcInterval⌉ +
-- 1@, e.g., 5min / 10s = 31 entries. The @+ 1@ is needed because we might be
-- somewhere in the middle of a @gcInterval@.
--
-- The background thread will look at head of the queue and wait until that
-- has 'Time' passed. After the wait, it will pop off the head of the queue
-- and perform a garbage collection for the 'SlotNo' in the head. Note that
-- the 'SlotNo' before the wait can be different from the one after the wait,
-- precisely because of batching.
newtype GcSchedule m = GcSchedule (StrictTVar m (StrictSeq ScheduledGc))

data ScheduledGc = ScheduledGc {
      ScheduledGc -> Time
scheduledGcTime :: !Time
      -- ^ Time at which to run the garbage collection
    , ScheduledGc -> SlotNo
scheduledGcSlot :: !SlotNo
      -- ^ For which slot to run the garbage collection
    }
  deriving (ScheduledGc -> ScheduledGc -> Bool
(ScheduledGc -> ScheduledGc -> Bool)
-> (ScheduledGc -> ScheduledGc -> Bool) -> Eq ScheduledGc
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: ScheduledGc -> ScheduledGc -> Bool
== :: ScheduledGc -> ScheduledGc -> Bool
$c/= :: ScheduledGc -> ScheduledGc -> Bool
/= :: ScheduledGc -> ScheduledGc -> Bool
Eq, Int -> ScheduledGc -> ShowS
[ScheduledGc] -> ShowS
ScheduledGc -> String
(Int -> ScheduledGc -> ShowS)
-> (ScheduledGc -> String)
-> ([ScheduledGc] -> ShowS)
-> Show ScheduledGc
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> ScheduledGc -> ShowS
showsPrec :: Int -> ScheduledGc -> ShowS
$cshow :: ScheduledGc -> String
show :: ScheduledGc -> String
$cshowList :: [ScheduledGc] -> ShowS
showList :: [ScheduledGc] -> ShowS
Show, (forall x. ScheduledGc -> Rep ScheduledGc x)
-> (forall x. Rep ScheduledGc x -> ScheduledGc)
-> Generic ScheduledGc
forall x. Rep ScheduledGc x -> ScheduledGc
forall x. ScheduledGc -> Rep ScheduledGc x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cfrom :: forall x. ScheduledGc -> Rep ScheduledGc x
from :: forall x. ScheduledGc -> Rep ScheduledGc x
$cto :: forall x. Rep ScheduledGc x -> ScheduledGc
to :: forall x. Rep ScheduledGc x -> ScheduledGc
Generic, Context -> ScheduledGc -> IO (Maybe ThunkInfo)
Proxy ScheduledGc -> String
(Context -> ScheduledGc -> IO (Maybe ThunkInfo))
-> (Context -> ScheduledGc -> IO (Maybe ThunkInfo))
-> (Proxy ScheduledGc -> String)
-> NoThunks ScheduledGc
forall a.
(Context -> a -> IO (Maybe ThunkInfo))
-> (Context -> a -> IO (Maybe ThunkInfo))
-> (Proxy a -> String)
-> NoThunks a
$cnoThunks :: Context -> ScheduledGc -> IO (Maybe ThunkInfo)
noThunks :: Context -> ScheduledGc -> IO (Maybe ThunkInfo)
$cwNoThunks :: Context -> ScheduledGc -> IO (Maybe ThunkInfo)
wNoThunks :: Context -> ScheduledGc -> IO (Maybe ThunkInfo)
$cshowTypeOf :: Proxy ScheduledGc -> String
showTypeOf :: Proxy ScheduledGc -> String
NoThunks)

instance Condense ScheduledGc where
  condense :: ScheduledGc -> String
condense (ScheduledGc Time
time SlotNo
slot) = (Time, SlotNo) -> String
forall a. Condense a => a -> String
condense (Time
time, SlotNo
slot)

data GcParams = GcParams {
      GcParams -> DiffTime
gcDelay    :: !DiffTime
      -- ^ How long to wait until performing the GC. See 'cdbsGcDelay'.
    , GcParams -> DiffTime
gcInterval :: !DiffTime
      -- ^ The GC interval: the minimum time between two GCs. See
      -- 'cdbsGcInterval'.
    }
  deriving (Int -> GcParams -> ShowS
[GcParams] -> ShowS
GcParams -> String
(Int -> GcParams -> ShowS)
-> (GcParams -> String) -> ([GcParams] -> ShowS) -> Show GcParams
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> GcParams -> ShowS
showsPrec :: Int -> GcParams -> ShowS
$cshow :: GcParams -> String
show :: GcParams -> String
$cshowList :: [GcParams] -> ShowS
showList :: [GcParams] -> ShowS
Show)

newGcSchedule :: IOLike m => m (GcSchedule m)
newGcSchedule :: forall (m :: * -> *). IOLike m => m (GcSchedule m)
newGcSchedule = StrictTVar m (StrictSeq ScheduledGc) -> GcSchedule m
forall (m :: * -> *).
StrictTVar m (StrictSeq ScheduledGc) -> GcSchedule m
GcSchedule (StrictTVar m (StrictSeq ScheduledGc) -> GcSchedule m)
-> m (StrictTVar m (StrictSeq ScheduledGc)) -> m (GcSchedule m)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> StrictSeq ScheduledGc -> m (StrictTVar m (StrictSeq ScheduledGc))
forall (m :: * -> *) a.
(HasCallStack, MonadSTM m, NoThunks a) =>
a -> m (StrictTVar m a)
newTVarIO StrictSeq ScheduledGc
forall a. StrictSeq a
Seq.empty

scheduleGC ::
     forall m blk. IOLike m
  => Tracer m (TraceGCEvent blk)
  -> SlotNo    -- ^ The slot to use for garbage collection
  -> GcParams
  -> GcSchedule m
  -> m ()
scheduleGC :: forall (m :: * -> *) blk.
IOLike m =>
Tracer m (TraceGCEvent blk)
-> SlotNo -> GcParams -> GcSchedule m -> m ()
scheduleGC Tracer m (TraceGCEvent blk)
tracer SlotNo
slotNo GcParams
gcParams (GcSchedule StrictTVar m (StrictSeq ScheduledGc)
varQueue) = do
    timeScheduledForGC <- GcParams -> Time -> Time
computeTimeForGC GcParams
gcParams (Time -> Time) -> m Time -> m Time
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> m Time
forall (m :: * -> *). MonadMonotonicTime m => m Time
getMonotonicTime
    atomically $ modifyTVar varQueue $ \case
      StrictSeq ScheduledGc
queue' :|> ScheduledGc { scheduledGcTime :: ScheduledGc -> Time
scheduledGcTime = Time
lastTimeScheduledForGC }
        | Time
timeScheduledForGC Time -> Time -> Bool
forall a. Eq a => a -> a -> Bool
== Time
lastTimeScheduledForGC
        -- Same interval, batch it
        -> StrictSeq ScheduledGc
queue' StrictSeq ScheduledGc -> ScheduledGc -> StrictSeq ScheduledGc
forall a. StrictSeq a -> a -> StrictSeq a
:|> Time -> SlotNo -> ScheduledGc
ScheduledGc Time
timeScheduledForGC SlotNo
slotNo
      StrictSeq ScheduledGc
queue
        -- Different interval or empty, so append it
        -> StrictSeq ScheduledGc
queue  StrictSeq ScheduledGc -> ScheduledGc -> StrictSeq ScheduledGc
forall a. StrictSeq a -> a -> StrictSeq a
:|> Time -> SlotNo -> ScheduledGc
ScheduledGc Time
timeScheduledForGC SlotNo
slotNo
    traceWith tracer $ ScheduledGC slotNo timeScheduledForGC

computeTimeForGC ::
     GcParams
  -> Time  -- ^ Now
  -> Time  -- ^ The time at which to perform the GC
computeTimeForGC :: GcParams -> Time -> Time
computeTimeForGC GcParams { DiffTime
gcDelay :: GcParams -> DiffTime
gcDelay :: DiffTime
gcDelay, DiffTime
gcInterval :: GcParams -> DiffTime
gcInterval :: DiffTime
gcInterval } (Time DiffTime
now) =
    DiffTime -> Time
Time (DiffTime -> Time) -> DiffTime -> Time
forall a b. (a -> b) -> a -> b
$ Integer -> DiffTime
picosecondsToDiffTime (Integer -> DiffTime) -> Integer -> DiffTime
forall a b. (a -> b) -> a -> b
$
      -- We're rounding up to the nearest interval, because rounding down
      -- would mean GC'ing too early.
      Integer -> Integer -> Integer
forall a b. (Integral a, Integral b) => b -> a -> a
roundUpToInterval
        (DiffTime -> Integer
diffTimeToPicoseconds DiffTime
gcInterval)
        (DiffTime -> Integer
diffTimeToPicoseconds (DiffTime
now DiffTime -> DiffTime -> DiffTime
forall a. Num a => a -> a -> a
+ DiffTime
gcDelay))

-- | Round to an interval
--
-- PRECONDITION: interval > 0
--
-- >    [roundUpToInterval 5 n | n <- [1..15]]
-- > == [5,5,5,5,5, 10,10,10,10,10, 15,15,15,15,15]
--
-- >    roundUpToInterval 5 0
-- > == 0
roundUpToInterval :: (Integral a, Integral b) => b -> a -> a
roundUpToInterval :: forall a b. (Integral a, Integral b) => b -> a -> a
roundUpToInterval b
interval a
x
    | a
m a -> a -> Bool
forall a. Eq a => a -> a -> Bool
== a
0
    = a
d a -> a -> a
forall a. Num a => a -> a -> a
* b -> a
forall a b. (Integral a, Num b) => a -> b
fromIntegral b
interval
    | Bool
otherwise
    = (a
d a -> a -> a
forall a. Num a => a -> a -> a
+ a
1) a -> a -> a
forall a. Num a => a -> a -> a
* b -> a
forall a b. (Integral a, Num b) => a -> b
fromIntegral b
interval
  where
    (a
d, a
m) = a
x a -> a -> (a, a)
forall a. Integral a => a -> a -> (a, a)
`divMod` b -> a
forall a b. (Integral a, Num b) => a -> b
fromIntegral b
interval

gcScheduleRunner ::
     forall m. IOLike m
  => GcSchedule m
  -> (SlotNo -> m ())  -- ^ GC function
  -> m Void
gcScheduleRunner :: forall (m :: * -> *).
IOLike m =>
GcSchedule m -> (SlotNo -> m ()) -> m Void
gcScheduleRunner (GcSchedule StrictTVar m (StrictSeq ScheduledGc)
varQueue) SlotNo -> m ()
runGc = m () -> m Void
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (m () -> m Void) -> m () -> m Void
forall a b. (a -> b) -> a -> b
$ do
    -- Peek to know how long to wait
    timeScheduledForGC <- STM m Time -> m Time
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m Time -> m Time) -> STM m Time -> m Time
forall a b. (a -> b) -> a -> b
$
      StrictTVar m (StrictSeq ScheduledGc)
-> STM m (StrictSeq ScheduledGc)
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StrictTVar m (StrictSeq ScheduledGc)
varQueue STM m (StrictSeq ScheduledGc)
-> (StrictSeq ScheduledGc -> STM m Time) -> STM m Time
forall a b. STM m a -> (a -> STM m b) -> STM m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
        StrictSeq ScheduledGc
Seq.Empty                             -> STM m Time
forall a. STM m a
forall (m :: * -> *) a. MonadSTM m => STM m a
retry
        ScheduledGc { Time
scheduledGcTime :: ScheduledGc -> Time
scheduledGcTime :: Time
scheduledGcTime } :<| StrictSeq ScheduledGc
_ -> Time -> STM m Time
forall a. a -> STM m a
forall (m :: * -> *) a. Monad m => a -> m a
return Time
scheduledGcTime

    currentTime <- getMonotonicTime
    let toWait = DiffTime -> DiffTime -> DiffTime
forall a. Ord a => a -> a -> a
max DiffTime
0 (Time
timeScheduledForGC Time -> Time -> DiffTime
`diffTime` Time
currentTime)
    threadDelay toWait

    -- After waiting, find the slot for which to GC and remove the entry from
    -- the queue.
    slotNo <- atomically $
      readTVar varQueue >>= \case
        ScheduledGc { SlotNo
scheduledGcSlot :: ScheduledGc -> SlotNo
scheduledGcSlot :: SlotNo
scheduledGcSlot } :<| StrictSeq ScheduledGc
queue' -> do
          StrictTVar m (StrictSeq ScheduledGc)
-> StrictSeq ScheduledGc -> STM m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m (StrictSeq ScheduledGc)
varQueue StrictSeq ScheduledGc
queue'
          SlotNo -> STM m SlotNo
forall a. a -> STM m a
forall (m :: * -> *) a. Monad m => a -> m a
return SlotNo
scheduledGcSlot

        -- Impossible, we peeked at the queue and it contained an entry. We
        -- are the only one removing entries, so it can't have been removed
        -- while we were waiting.
        StrictSeq ScheduledGc
Seq.Empty -> String -> STM m SlotNo
forall a. HasCallStack => String -> a
error String
"queue empty after waiting"

    -- Garbage collection is called synchronously
    runGc slotNo

-- | Return the current contents of the 'GcSchedule' queue without modifying
-- it.
--
-- For testing purposes.
dumpGcSchedule :: IOLike m => GcSchedule m -> STM m [ScheduledGc]
dumpGcSchedule :: forall (m :: * -> *).
IOLike m =>
GcSchedule m -> STM m [ScheduledGc]
dumpGcSchedule (GcSchedule StrictTVar m (StrictSeq ScheduledGc)
varQueue) = StrictSeq ScheduledGc -> [ScheduledGc]
forall a. StrictSeq a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList (StrictSeq ScheduledGc -> [ScheduledGc])
-> STM m (StrictSeq ScheduledGc) -> STM m [ScheduledGc]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> StrictTVar m (StrictSeq ScheduledGc)
-> STM m (StrictSeq ScheduledGc)
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StrictTVar m (StrictSeq ScheduledGc)
varQueue

{-------------------------------------------------------------------------------
  Adding blocks to the ChainDB
-------------------------------------------------------------------------------}

-- | Read blocks from 'cdbChainSelQueue' and add them synchronously to the
-- ChainDB.
addBlockRunner ::
     ( IOLike m
     , LedgerSupportsProtocol blk
     , BlockSupportsDiffusionPipelining blk
     , InspectLedger blk
     , HasHardForkHistory blk
     , HasCallStack
     )
  => Fuse m
  -> ChainDbEnv m blk
  -> m Void
addBlockRunner :: forall (m :: * -> *) blk.
(IOLike m, LedgerSupportsProtocol blk,
 BlockSupportsDiffusionPipelining blk, InspectLedger blk,
 HasHardForkHistory blk, HasCallStack) =>
Fuse m -> ChainDbEnv m blk -> m Void
addBlockRunner Fuse m
fuse cdb :: ChainDbEnv m blk
cdb@CDB{m (LoE (AnchoredFragment (Header blk)))
Tracer m (TraceEvent blk)
DiffTime
ResourceRegistry m
StrictTVar m (m ())
StrictTVar m (Map FollowerKey (FollowerHandle m blk))
StrictTVar m (Map IteratorKey (m ()))
StrictTVar m (StrictMaybe (Header blk))
StrictTVar m (AnchoredFragment (Header blk))
StrictTVar m ChainSelStarvation
StrictTVar m (TentativeHeaderState blk)
StrictTVar m (WithFingerprint (InvalidBlocks blk))
StrictTVar m FollowerKey
StrictTVar m IteratorKey
Fuse m
TopLevelConfig blk
VolatileDB m blk
ImmutableDB m blk
LedgerDB' m blk
ChainSelQueue m blk
cdbChainSelStarvation :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m ChainSelStarvation
cdbLoE :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> m (LoE (AnchoredFragment (Header blk)))
cdbChainSelQueue :: forall (m :: * -> *) blk. ChainDbEnv m blk -> ChainSelQueue m blk
cdbKillBgThreads :: forall (m :: * -> *) blk. ChainDbEnv m blk -> StrictTVar m (m ())
cdbGcInterval :: forall (m :: * -> *) blk. ChainDbEnv m blk -> DiffTime
cdbGcDelay :: forall (m :: * -> *) blk. ChainDbEnv m blk -> DiffTime
cdbRegistry :: forall (m :: * -> *) blk. ChainDbEnv m blk -> ResourceRegistry m
cdbTracer :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> Tracer m (TraceEvent blk)
cdbChainSelFuse :: forall (m :: * -> *) blk. ChainDbEnv m blk -> Fuse m
cdbCopyFuse :: forall (m :: * -> *) blk. ChainDbEnv m blk -> Fuse m
cdbNextFollowerKey :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m FollowerKey
cdbNextIteratorKey :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m IteratorKey
cdbInvalid :: forall (m :: * -> *) blk.
ChainDbEnv m blk
-> StrictTVar m (WithFingerprint (InvalidBlocks blk))
cdbTopLevelConfig :: forall (m :: * -> *) blk. ChainDbEnv m blk -> TopLevelConfig blk
cdbFollowers :: forall (m :: * -> *) blk.
ChainDbEnv m blk
-> StrictTVar m (Map FollowerKey (FollowerHandle m blk))
cdbIterators :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (Map IteratorKey (m ()))
cdbTentativeHeader :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (StrictMaybe (Header blk))
cdbTentativeState :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (TentativeHeaderState blk)
cdbChain :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (AnchoredFragment (Header blk))
cdbLedgerDB :: forall (m :: * -> *) blk. ChainDbEnv m blk -> LedgerDB' m blk
cdbVolatileDB :: forall (m :: * -> *) blk. ChainDbEnv m blk -> VolatileDB m blk
cdbImmutableDB :: forall (m :: * -> *) blk. ChainDbEnv m blk -> ImmutableDB m blk
cdbImmutableDB :: ImmutableDB m blk
cdbVolatileDB :: VolatileDB m blk
cdbLedgerDB :: LedgerDB' m blk
cdbChain :: StrictTVar m (AnchoredFragment (Header blk))
cdbTentativeState :: StrictTVar m (TentativeHeaderState blk)
cdbTentativeHeader :: StrictTVar m (StrictMaybe (Header blk))
cdbIterators :: StrictTVar m (Map IteratorKey (m ()))
cdbFollowers :: StrictTVar m (Map FollowerKey (FollowerHandle m blk))
cdbTopLevelConfig :: TopLevelConfig blk
cdbInvalid :: StrictTVar m (WithFingerprint (InvalidBlocks blk))
cdbNextIteratorKey :: StrictTVar m IteratorKey
cdbNextFollowerKey :: StrictTVar m FollowerKey
cdbCopyFuse :: Fuse m
cdbChainSelFuse :: Fuse m
cdbTracer :: Tracer m (TraceEvent blk)
cdbRegistry :: ResourceRegistry m
cdbGcDelay :: DiffTime
cdbGcInterval :: DiffTime
cdbKillBgThreads :: StrictTVar m (m ())
cdbChainSelQueue :: ChainSelQueue m blk
cdbLoE :: m (LoE (AnchoredFragment (Header blk)))
cdbChainSelStarvation :: StrictTVar m ChainSelStarvation
..} = m () -> m Void
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (m () -> m Void) -> m () -> m Void
forall a b. (a -> b) -> a -> b
$ do
    let trace :: TraceAddBlockEvent blk -> m ()
trace = Tracer m (TraceEvent blk) -> TraceEvent blk -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (TraceEvent blk)
cdbTracer (TraceEvent blk -> m ())
-> (TraceAddBlockEvent blk -> TraceEvent blk)
-> TraceAddBlockEvent blk
-> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TraceAddBlockEvent blk -> TraceEvent blk
forall blk. TraceAddBlockEvent blk -> TraceEvent blk
TraceAddBlockEvent
    TraceAddBlockEvent blk -> m ()
trace (TraceAddBlockEvent blk -> m ()) -> TraceAddBlockEvent blk -> m ()
forall a b. (a -> b) -> a -> b
$ Enclosing' (RealPoint blk) -> TraceAddBlockEvent blk
forall blk. Enclosing' (RealPoint blk) -> TraceAddBlockEvent blk
PoppedBlockFromQueue Enclosing' (RealPoint blk)
forall a. Enclosing' a
RisingEdge
    -- if the `chainSelSync` does not complete because it was killed by an async
    -- exception (or it errored), notify the blocked thread
    Fuse m -> Electric m () -> m ()
forall (m :: * -> *) a.
(MonadThrow m, MonadMVar m) =>
Fuse m -> Electric m a -> m a
withFuse Fuse m
fuse (Electric m () -> m ()) -> Electric m () -> m ()
forall a b. (a -> b) -> a -> b
$
      Electric m (ChainSelMessage m blk)
-> (ChainSelMessage m blk -> Electric m ())
-> (ChainSelMessage m blk -> Electric m ())
-> Electric m ()
forall a b c.
Electric m a
-> (a -> Electric m b) -> (a -> Electric m c) -> Electric m c
forall (m :: * -> *) a b c.
MonadCatch m =>
m a -> (a -> m b) -> (a -> m c) -> m c
bracketOnError
        (m (ChainSelMessage m blk) -> Electric m (ChainSelMessage m blk)
forall (m :: * -> *) a. Monad m => m a -> Electric m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (m (ChainSelMessage m blk) -> Electric m (ChainSelMessage m blk))
-> m (ChainSelMessage m blk) -> Electric m (ChainSelMessage m blk)
forall a b. (a -> b) -> a -> b
$ Tracer m (TraceChainSelStarvationEvent blk)
-> StrictTVar m ChainSelStarvation
-> ChainSelQueue m blk
-> m (ChainSelMessage m blk)
forall (m :: * -> *) blk.
(HasHeader blk, IOLike m) =>
Tracer m (TraceChainSelStarvationEvent blk)
-> StrictTVar m ChainSelStarvation
-> ChainSelQueue m blk
-> m (ChainSelMessage m blk)
getChainSelMessage Tracer m (TraceChainSelStarvationEvent blk)
starvationTracer StrictTVar m ChainSelStarvation
cdbChainSelStarvation ChainSelQueue m blk
cdbChainSelQueue)
        (\ChainSelMessage m blk
message -> m () -> Electric m ()
forall (m :: * -> *) a. Monad m => m a -> Electric m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (m () -> Electric m ()) -> m () -> Electric m ()
forall a b. (a -> b) -> a -> b
$ STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
          case ChainSelMessage m blk
message of
            ChainSelReprocessLoEBlocks StrictTMVar m ()
varProcessed ->
              STM m Bool -> STM m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (STM m Bool -> STM m ()) -> STM m Bool -> STM m ()
forall a b. (a -> b) -> a -> b
$ StrictTMVar m () -> () -> STM m Bool
forall (m :: * -> *) a.
MonadSTM m =>
StrictTMVar m a -> a -> STM m Bool
tryPutTMVar StrictTMVar m ()
varProcessed ()
            ChainSelAddBlock BlockToAdd{StrictTMVar m Bool
varBlockWrittenToDisk :: StrictTMVar m Bool
varBlockWrittenToDisk :: forall (m :: * -> *) blk. BlockToAdd m blk -> StrictTMVar m Bool
varBlockWrittenToDisk, StrictTMVar m (AddBlockResult blk)
varBlockProcessed :: StrictTMVar m (AddBlockResult blk)
varBlockProcessed :: forall (m :: * -> *) blk.
BlockToAdd m blk -> StrictTMVar m (AddBlockResult blk)
varBlockProcessed} -> do
              _ <- StrictTMVar m Bool -> Bool -> STM m Bool
forall (m :: * -> *) a.
MonadSTM m =>
StrictTMVar m a -> a -> STM m Bool
tryPutTMVar StrictTMVar m Bool
varBlockWrittenToDisk
                              Bool
False
              _ <- tryPutTMVar varBlockProcessed
                              (FailedToAddBlock "Failed to add block synchronously")
              pure ()
          ChainSelQueue m blk -> STM m ()
forall (m :: * -> *) blk.
IOLike m =>
ChainSelQueue m blk -> STM m ()
closeChainSelQueue ChainSelQueue m blk
cdbChainSelQueue)
        (\ChainSelMessage m blk
message -> do
          m () -> Electric m ()
forall (m :: * -> *) a. Monad m => m a -> Electric m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (m () -> Electric m ()) -> m () -> Electric m ()
forall a b. (a -> b) -> a -> b
$ case ChainSelMessage m blk
message of
            ChainSelReprocessLoEBlocks StrictTMVar m ()
_ ->
              TraceAddBlockEvent blk -> m ()
trace TraceAddBlockEvent blk
forall blk. TraceAddBlockEvent blk
PoppedReprocessLoEBlocksFromQueue
            ChainSelAddBlock BlockToAdd{blk
blockToAdd :: blk
blockToAdd :: forall (m :: * -> *) blk. BlockToAdd m blk -> blk
blockToAdd} ->
              TraceAddBlockEvent blk -> m ()
trace (TraceAddBlockEvent blk -> m ()) -> TraceAddBlockEvent blk -> m ()
forall a b. (a -> b) -> a -> b
$ Enclosing' (RealPoint blk) -> TraceAddBlockEvent blk
forall blk. Enclosing' (RealPoint blk) -> TraceAddBlockEvent blk
PoppedBlockFromQueue (Enclosing' (RealPoint blk) -> TraceAddBlockEvent blk)
-> Enclosing' (RealPoint blk) -> TraceAddBlockEvent blk
forall a b. (a -> b) -> a -> b
$ RealPoint blk -> Enclosing' (RealPoint blk)
forall a. a -> Enclosing' a
FallingEdgeWith (RealPoint blk -> Enclosing' (RealPoint blk))
-> RealPoint blk -> Enclosing' (RealPoint blk)
forall a b. (a -> b) -> a -> b
$
                      blk -> RealPoint blk
forall blk. HasHeader blk => blk -> RealPoint blk
blockRealPoint blk
blockToAdd
          ChainDbEnv m blk -> ChainSelMessage m blk -> Electric m ()
forall (m :: * -> *) blk.
(IOLike m, LedgerSupportsProtocol blk,
 BlockSupportsDiffusionPipelining blk, InspectLedger blk,
 HasHardForkHistory blk, HasCallStack) =>
ChainDbEnv m blk -> ChainSelMessage m blk -> Electric m ()
chainSelSync ChainDbEnv m blk
cdb ChainSelMessage m blk
message
          m () -> Electric m ()
forall (m :: * -> *) a. Monad m => m a -> Electric m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (m () -> Electric m ()) -> m () -> Electric m ()
forall a b. (a -> b) -> a -> b
$ STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ ChainSelQueue m blk -> ChainSelMessage m blk -> STM m ()
forall (m :: * -> *) blk.
(IOLike m, HasHeader blk) =>
ChainSelQueue m blk -> ChainSelMessage m blk -> STM m ()
processedChainSelMessage ChainSelQueue m blk
cdbChainSelQueue ChainSelMessage m blk
message)
  where
    starvationTracer :: Tracer m (TraceChainSelStarvationEvent blk)
starvationTracer = (TraceChainSelStarvationEvent blk -> m ())
-> Tracer m (TraceChainSelStarvationEvent blk)
forall (m :: * -> *) a. (a -> m ()) -> Tracer m a
Tracer ((TraceChainSelStarvationEvent blk -> m ())
 -> Tracer m (TraceChainSelStarvationEvent blk))
-> (TraceChainSelStarvationEvent blk -> m ())
-> Tracer m (TraceChainSelStarvationEvent blk)
forall a b. (a -> b) -> a -> b
$ Tracer m (TraceEvent blk) -> TraceEvent blk -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (TraceEvent blk)
cdbTracer (TraceEvent blk -> m ())
-> (TraceChainSelStarvationEvent blk -> TraceEvent blk)
-> TraceChainSelStarvationEvent blk
-> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TraceChainSelStarvationEvent blk -> TraceEvent blk
forall blk. TraceChainSelStarvationEvent blk -> TraceEvent blk
TraceChainSelStarvationEvent