{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
module Ouroboros.Consensus.Storage.ChainDB.Impl.Background (
launchBgTasks
, copyAndSnapshotRunner
, copyToImmutableDB
, garbageCollect
, GcParams (..)
, GcSchedule
, computeTimeForGC
, gcScheduleRunner
, newGcSchedule
, scheduleGC
, ScheduledGc (..)
, dumpGcSchedule
, addBlockRunner
) where
import Cardano.Ledger.BaseTypes (unNonZero)
import Control.Exception (assert)
import Control.Monad (forM_, forever, void)
import Control.Monad.Trans.Class (lift)
import Control.ResourceRegistry
import Control.Tracer
import Data.Foldable (toList)
import qualified Data.Map.Strict as Map
import Data.Sequence.Strict (StrictSeq (..))
import qualified Data.Sequence.Strict as Seq
import Data.Time.Clock
import Data.Void (Void)
import Data.Word
import GHC.Generics (Generic)
import GHC.Stack (HasCallStack)
import Ouroboros.Consensus.Block
import Ouroboros.Consensus.Config
import Ouroboros.Consensus.HardFork.Abstract
import Ouroboros.Consensus.Ledger.Inspect
import Ouroboros.Consensus.Ledger.SupportsProtocol
import Ouroboros.Consensus.Protocol.Abstract
import Ouroboros.Consensus.Storage.ChainDB.API (AddBlockResult (..),
BlockComponent (..))
import Ouroboros.Consensus.Storage.ChainDB.Impl.ChainSel
(chainSelSync)
import Ouroboros.Consensus.Storage.ChainDB.Impl.Types
import qualified Ouroboros.Consensus.Storage.ImmutableDB as ImmutableDB
import qualified Ouroboros.Consensus.Storage.LedgerDB as LedgerDB
import qualified Ouroboros.Consensus.Storage.VolatileDB as VolatileDB
import Ouroboros.Consensus.Util
import Ouroboros.Consensus.Util.Condense
import Ouroboros.Consensus.Util.Enclose (Enclosing' (..))
import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Network.AnchoredFragment (AnchoredSeq (..))
import qualified Ouroboros.Network.AnchoredFragment as AF
launchBgTasks ::
forall m blk.
( IOLike m
, LedgerSupportsProtocol blk
, BlockSupportsDiffusionPipelining blk
, InspectLedger blk
, HasHardForkHistory blk
)
=> ChainDbEnv m blk
-> Word64
-> m ()
launchBgTasks :: forall (m :: * -> *) blk.
(IOLike m, LedgerSupportsProtocol blk,
BlockSupportsDiffusionPipelining blk, InspectLedger blk,
HasHardForkHistory blk) =>
ChainDbEnv m blk -> Word64 -> m ()
launchBgTasks cdb :: ChainDbEnv m blk
cdb@CDB{m (LoE (AnchoredFragment (Header blk)))
Tracer m (TraceEvent blk)
DiffTime
ResourceRegistry m
StrictTVar m (m ())
StrictTVar m (Map FollowerKey (FollowerHandle m blk))
StrictTVar m (Map IteratorKey (m ()))
StrictTVar m (StrictMaybe (Header blk))
StrictTVar m (AnchoredFragment (Header blk))
StrictTVar m ChainSelStarvation
StrictTVar m (TentativeHeaderState blk)
StrictTVar m (WithFingerprint (InvalidBlocks blk))
StrictTVar m FollowerKey
StrictTVar m IteratorKey
Fuse m
TopLevelConfig blk
VolatileDB m blk
ImmutableDB m blk
LedgerDB' m blk
ChainSelQueue m blk
cdbImmutableDB :: ImmutableDB m blk
cdbVolatileDB :: VolatileDB m blk
cdbLedgerDB :: LedgerDB' m blk
cdbChain :: StrictTVar m (AnchoredFragment (Header blk))
cdbTentativeState :: StrictTVar m (TentativeHeaderState blk)
cdbTentativeHeader :: StrictTVar m (StrictMaybe (Header blk))
cdbIterators :: StrictTVar m (Map IteratorKey (m ()))
cdbFollowers :: StrictTVar m (Map FollowerKey (FollowerHandle m blk))
cdbTopLevelConfig :: TopLevelConfig blk
cdbInvalid :: StrictTVar m (WithFingerprint (InvalidBlocks blk))
cdbNextIteratorKey :: StrictTVar m IteratorKey
cdbNextFollowerKey :: StrictTVar m FollowerKey
cdbCopyFuse :: Fuse m
cdbChainSelFuse :: Fuse m
cdbTracer :: Tracer m (TraceEvent blk)
cdbRegistry :: ResourceRegistry m
cdbGcDelay :: DiffTime
cdbGcInterval :: DiffTime
cdbKillBgThreads :: StrictTVar m (m ())
cdbChainSelQueue :: ChainSelQueue m blk
cdbLoE :: m (LoE (AnchoredFragment (Header blk)))
cdbChainSelStarvation :: StrictTVar m ChainSelStarvation
cdbChainSelStarvation :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m ChainSelStarvation
cdbLoE :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> m (LoE (AnchoredFragment (Header blk)))
cdbChainSelQueue :: forall (m :: * -> *) blk. ChainDbEnv m blk -> ChainSelQueue m blk
cdbKillBgThreads :: forall (m :: * -> *) blk. ChainDbEnv m blk -> StrictTVar m (m ())
cdbGcInterval :: forall (m :: * -> *) blk. ChainDbEnv m blk -> DiffTime
cdbGcDelay :: forall (m :: * -> *) blk. ChainDbEnv m blk -> DiffTime
cdbRegistry :: forall (m :: * -> *) blk. ChainDbEnv m blk -> ResourceRegistry m
cdbTracer :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> Tracer m (TraceEvent blk)
cdbChainSelFuse :: forall (m :: * -> *) blk. ChainDbEnv m blk -> Fuse m
cdbCopyFuse :: forall (m :: * -> *) blk. ChainDbEnv m blk -> Fuse m
cdbNextFollowerKey :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m FollowerKey
cdbNextIteratorKey :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m IteratorKey
cdbInvalid :: forall (m :: * -> *) blk.
ChainDbEnv m blk
-> StrictTVar m (WithFingerprint (InvalidBlocks blk))
cdbTopLevelConfig :: forall (m :: * -> *) blk. ChainDbEnv m blk -> TopLevelConfig blk
cdbFollowers :: forall (m :: * -> *) blk.
ChainDbEnv m blk
-> StrictTVar m (Map FollowerKey (FollowerHandle m blk))
cdbIterators :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (Map IteratorKey (m ()))
cdbTentativeHeader :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (StrictMaybe (Header blk))
cdbTentativeState :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (TentativeHeaderState blk)
cdbChain :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (AnchoredFragment (Header blk))
cdbLedgerDB :: forall (m :: * -> *) blk. ChainDbEnv m blk -> LedgerDB' m blk
cdbVolatileDB :: forall (m :: * -> *) blk. ChainDbEnv m blk -> VolatileDB m blk
cdbImmutableDB :: forall (m :: * -> *) blk. ChainDbEnv m blk -> ImmutableDB m blk
..} Word64
replayed = do
!addBlockThread <- String -> m Void -> m (m ())
launch String
"ChainDB.addBlockRunner" (m Void -> m (m ())) -> m Void -> m (m ())
forall a b. (a -> b) -> a -> b
$
Fuse m -> ChainDbEnv m blk -> m Void
forall (m :: * -> *) blk.
(IOLike m, LedgerSupportsProtocol blk,
BlockSupportsDiffusionPipelining blk, InspectLedger blk,
HasHardForkHistory blk, HasCallStack) =>
Fuse m -> ChainDbEnv m blk -> m Void
addBlockRunner Fuse m
cdbChainSelFuse ChainDbEnv m blk
cdb
gcSchedule <- newGcSchedule
!gcThread <- launch "ChainDB.gcScheduleRunner" $
gcScheduleRunner gcSchedule $ garbageCollect cdb
!copyAndSnapshotThread <- launch "ChainDB.copyAndSnapshotRunner" $
copyAndSnapshotRunner cdb gcSchedule replayed cdbCopyFuse
atomically $ writeTVar cdbKillBgThreads $
sequence_ [addBlockThread, gcThread, copyAndSnapshotThread]
where
launch :: String -> m Void -> m (m ())
launch :: String -> m Void -> m (m ())
launch = (Thread m Void -> m ()) -> m (Thread m Void) -> m (m ())
forall a b. (a -> b) -> m a -> m b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Thread m Void -> m ()
forall (m :: * -> *) a. MonadAsync m => Thread m a -> m ()
cancelThread (m (Thread m Void) -> m (m ()))
-> (String -> m Void -> m (Thread m Void))
-> String
-> m Void
-> m (m ())
forall y z x0 x1. (y -> z) -> (x0 -> x1 -> y) -> x0 -> x1 -> z
.: ResourceRegistry m -> String -> m Void -> m (Thread m Void)
forall (m :: * -> *) a.
(MonadAsync m, MonadFork m, MonadMask m, HasCallStack) =>
ResourceRegistry m -> String -> m a -> m (Thread m a)
forkLinkedThread ResourceRegistry m
cdbRegistry
copyToImmutableDB ::
forall m blk.
( IOLike m
, ConsensusProtocol (BlockProtocol blk)
, HasHeader blk
, GetHeader blk
, HasCallStack
)
=> ChainDbEnv m blk
-> Electric m (WithOrigin SlotNo)
copyToImmutableDB :: forall (m :: * -> *) blk.
(IOLike m, ConsensusProtocol (BlockProtocol blk), HasHeader blk,
GetHeader blk, HasCallStack) =>
ChainDbEnv m blk -> Electric m (WithOrigin SlotNo)
copyToImmutableDB CDB{m (LoE (AnchoredFragment (Header blk)))
Tracer m (TraceEvent blk)
DiffTime
ResourceRegistry m
StrictTVar m (m ())
StrictTVar m (Map FollowerKey (FollowerHandle m blk))
StrictTVar m (Map IteratorKey (m ()))
StrictTVar m (StrictMaybe (Header blk))
StrictTVar m (AnchoredFragment (Header blk))
StrictTVar m ChainSelStarvation
StrictTVar m (TentativeHeaderState blk)
StrictTVar m (WithFingerprint (InvalidBlocks blk))
StrictTVar m FollowerKey
StrictTVar m IteratorKey
Fuse m
TopLevelConfig blk
VolatileDB m blk
ImmutableDB m blk
LedgerDB' m blk
ChainSelQueue m blk
cdbChainSelStarvation :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m ChainSelStarvation
cdbLoE :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> m (LoE (AnchoredFragment (Header blk)))
cdbChainSelQueue :: forall (m :: * -> *) blk. ChainDbEnv m blk -> ChainSelQueue m blk
cdbKillBgThreads :: forall (m :: * -> *) blk. ChainDbEnv m blk -> StrictTVar m (m ())
cdbGcInterval :: forall (m :: * -> *) blk. ChainDbEnv m blk -> DiffTime
cdbGcDelay :: forall (m :: * -> *) blk. ChainDbEnv m blk -> DiffTime
cdbRegistry :: forall (m :: * -> *) blk. ChainDbEnv m blk -> ResourceRegistry m
cdbTracer :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> Tracer m (TraceEvent blk)
cdbChainSelFuse :: forall (m :: * -> *) blk. ChainDbEnv m blk -> Fuse m
cdbCopyFuse :: forall (m :: * -> *) blk. ChainDbEnv m blk -> Fuse m
cdbNextFollowerKey :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m FollowerKey
cdbNextIteratorKey :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m IteratorKey
cdbInvalid :: forall (m :: * -> *) blk.
ChainDbEnv m blk
-> StrictTVar m (WithFingerprint (InvalidBlocks blk))
cdbTopLevelConfig :: forall (m :: * -> *) blk. ChainDbEnv m blk -> TopLevelConfig blk
cdbFollowers :: forall (m :: * -> *) blk.
ChainDbEnv m blk
-> StrictTVar m (Map FollowerKey (FollowerHandle m blk))
cdbIterators :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (Map IteratorKey (m ()))
cdbTentativeHeader :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (StrictMaybe (Header blk))
cdbTentativeState :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (TentativeHeaderState blk)
cdbChain :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (AnchoredFragment (Header blk))
cdbLedgerDB :: forall (m :: * -> *) blk. ChainDbEnv m blk -> LedgerDB' m blk
cdbVolatileDB :: forall (m :: * -> *) blk. ChainDbEnv m blk -> VolatileDB m blk
cdbImmutableDB :: forall (m :: * -> *) blk. ChainDbEnv m blk -> ImmutableDB m blk
cdbImmutableDB :: ImmutableDB m blk
cdbVolatileDB :: VolatileDB m blk
cdbLedgerDB :: LedgerDB' m blk
cdbChain :: StrictTVar m (AnchoredFragment (Header blk))
cdbTentativeState :: StrictTVar m (TentativeHeaderState blk)
cdbTentativeHeader :: StrictTVar m (StrictMaybe (Header blk))
cdbIterators :: StrictTVar m (Map IteratorKey (m ()))
cdbFollowers :: StrictTVar m (Map FollowerKey (FollowerHandle m blk))
cdbTopLevelConfig :: TopLevelConfig blk
cdbInvalid :: StrictTVar m (WithFingerprint (InvalidBlocks blk))
cdbNextIteratorKey :: StrictTVar m IteratorKey
cdbNextFollowerKey :: StrictTVar m FollowerKey
cdbCopyFuse :: Fuse m
cdbChainSelFuse :: Fuse m
cdbTracer :: Tracer m (TraceEvent blk)
cdbRegistry :: ResourceRegistry m
cdbGcDelay :: DiffTime
cdbGcInterval :: DiffTime
cdbKillBgThreads :: StrictTVar m (m ())
cdbChainSelQueue :: ChainSelQueue m blk
cdbLoE :: m (LoE (AnchoredFragment (Header blk)))
cdbChainSelStarvation :: StrictTVar m ChainSelStarvation
..} = m (WithOrigin SlotNo) -> Electric m (WithOrigin SlotNo)
forall {k} (m :: k -> *) (a :: k). m a -> Electric m a
electric (m (WithOrigin SlotNo) -> Electric m (WithOrigin SlotNo))
-> m (WithOrigin SlotNo) -> Electric m (WithOrigin SlotNo)
forall a b. (a -> b) -> a -> b
$ do
toCopy <- STM m [Point blk] -> m [Point blk]
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m [Point blk] -> m [Point blk])
-> STM m [Point blk] -> m [Point blk]
forall a b. (a -> b) -> a -> b
$ do
curChain <- StrictTVar m (AnchoredFragment (Header blk))
-> STM m (AnchoredFragment (Header blk))
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StrictTVar m (AnchoredFragment (Header blk))
cdbChain
let nbToCopy = Int -> Int -> Int
forall a. Ord a => a -> a -> a
max Int
0 (AnchoredFragment (Header blk) -> Int
forall v a b. Anchorable v a b => AnchoredSeq v a b -> Int
AF.length AnchoredFragment (Header blk)
curChain Int -> Int -> Int
forall a. Num a => a -> a -> a
- Word64 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral (NonZero Word64 -> Word64
forall a. NonZero a -> a
unNonZero NonZero Word64
k))
toCopy :: [Point blk]
toCopy = (Header blk -> Point blk) -> [Header blk] -> [Point blk]
forall a b. (a -> b) -> [a] -> [b]
map Header blk -> Point blk
forall blk. HasHeader (Header blk) => Header blk -> Point blk
headerPoint
([Header blk] -> [Point blk]) -> [Header blk] -> [Point blk]
forall a b. (a -> b) -> a -> b
$ AnchoredFragment (Header blk) -> [Header blk]
forall v a b. AnchoredSeq v a b -> [b]
AF.toOldestFirst
(AnchoredFragment (Header blk) -> [Header blk])
-> AnchoredFragment (Header blk) -> [Header blk]
forall a b. (a -> b) -> a -> b
$ Int
-> AnchoredFragment (Header blk) -> AnchoredFragment (Header blk)
forall v a b.
Anchorable v a b =>
Int -> AnchoredSeq v a b -> AnchoredSeq v a b
AF.takeOldest Int
nbToCopy AnchoredFragment (Header blk)
curChain
return toCopy
if null toCopy
then trace NoBlocksToCopyToImmutableDB
else forM_ toCopy $ \Point blk
pt -> do
let hash :: HeaderHash blk
hash = case Point blk -> ChainHash blk
forall {k} (block :: k). Point block -> ChainHash block
pointHash Point blk
pt of
BlockHash HeaderHash blk
h -> HeaderHash blk
h
ChainHash blk
GenesisHash -> String -> HeaderHash blk
forall a. HasCallStack => String -> a
error String
"genesis block on current chain"
slotNoAtImmutableDBTip <- STM m (WithOrigin SlotNo) -> m (WithOrigin SlotNo)
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m (WithOrigin SlotNo) -> m (WithOrigin SlotNo))
-> STM m (WithOrigin SlotNo) -> m (WithOrigin SlotNo)
forall a b. (a -> b) -> a -> b
$ ImmutableDB m blk -> STM m (WithOrigin SlotNo)
forall (m :: * -> *) blk.
(MonadSTM m, HasCallStack) =>
ImmutableDB m blk -> STM m (WithOrigin SlotNo)
ImmutableDB.getTipSlot ImmutableDB m blk
cdbImmutableDB
assert (pointSlot pt >= slotNoAtImmutableDBTip) $ return ()
blk <- VolatileDB.getKnownBlockComponent cdbVolatileDB GetVerifiedBlock hash
ImmutableDB.appendBlock cdbImmutableDB blk
atomically $ removeFromChain pt
trace $ CopiedBlockToImmutableDB pt
atomically $ ImmutableDB.getTipSlot cdbImmutableDB
where
SecurityParam NonZero Word64
k = TopLevelConfig blk -> SecurityParam
forall blk.
ConsensusProtocol (BlockProtocol blk) =>
TopLevelConfig blk -> SecurityParam
configSecurityParam TopLevelConfig blk
cdbTopLevelConfig
trace :: TraceCopyToImmutableDBEvent blk -> m ()
trace = Tracer m (TraceCopyToImmutableDBEvent blk)
-> TraceCopyToImmutableDBEvent blk -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith ((TraceCopyToImmutableDBEvent blk -> TraceEvent blk)
-> Tracer m (TraceEvent blk)
-> Tracer m (TraceCopyToImmutableDBEvent blk)
forall a' a. (a' -> a) -> Tracer m a -> Tracer m a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
contramap TraceCopyToImmutableDBEvent blk -> TraceEvent blk
forall blk. TraceCopyToImmutableDBEvent blk -> TraceEvent blk
TraceCopyToImmutableDBEvent Tracer m (TraceEvent blk)
cdbTracer)
removeFromChain :: Point blk -> STM m ()
removeFromChain :: Point blk -> STM m ()
removeFromChain Point blk
pt = do
curChain <- StrictTVar m (AnchoredFragment (Header blk))
-> STM m (AnchoredFragment (Header blk))
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StrictTVar m (AnchoredFragment (Header blk))
cdbChain
case curChain of
Header blk
hdr :< AnchoredFragment (Header blk)
curChain'
| Header blk -> Point blk
forall blk. HasHeader (Header blk) => Header blk -> Point blk
headerPoint Header blk
hdr Point blk -> Point blk -> Bool
forall a. Eq a => a -> a -> Bool
== Point blk
pt
-> StrictTVar m (AnchoredFragment (Header blk))
-> AnchoredFragment (Header blk) -> STM m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m (AnchoredFragment (Header blk))
cdbChain AnchoredFragment (Header blk)
curChain'
AnchoredFragment (Header blk)
_ -> String -> STM m ()
forall a. HasCallStack => String -> a
error String
"header to remove not on the current chain"
copyAndSnapshotRunner ::
forall m blk.
( IOLike m
, LedgerSupportsProtocol blk
)
=> ChainDbEnv m blk
-> GcSchedule m
-> Word64
-> Fuse m
-> m Void
copyAndSnapshotRunner :: forall (m :: * -> *) blk.
(IOLike m, LedgerSupportsProtocol blk) =>
ChainDbEnv m blk -> GcSchedule m -> Word64 -> Fuse m -> m Void
copyAndSnapshotRunner cdb :: ChainDbEnv m blk
cdb@CDB{m (LoE (AnchoredFragment (Header blk)))
Tracer m (TraceEvent blk)
DiffTime
ResourceRegistry m
StrictTVar m (m ())
StrictTVar m (Map FollowerKey (FollowerHandle m blk))
StrictTVar m (Map IteratorKey (m ()))
StrictTVar m (StrictMaybe (Header blk))
StrictTVar m (AnchoredFragment (Header blk))
StrictTVar m ChainSelStarvation
StrictTVar m (TentativeHeaderState blk)
StrictTVar m (WithFingerprint (InvalidBlocks blk))
StrictTVar m FollowerKey
StrictTVar m IteratorKey
Fuse m
TopLevelConfig blk
VolatileDB m blk
ImmutableDB m blk
LedgerDB' m blk
ChainSelQueue m blk
cdbChainSelStarvation :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m ChainSelStarvation
cdbLoE :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> m (LoE (AnchoredFragment (Header blk)))
cdbChainSelQueue :: forall (m :: * -> *) blk. ChainDbEnv m blk -> ChainSelQueue m blk
cdbKillBgThreads :: forall (m :: * -> *) blk. ChainDbEnv m blk -> StrictTVar m (m ())
cdbGcInterval :: forall (m :: * -> *) blk. ChainDbEnv m blk -> DiffTime
cdbGcDelay :: forall (m :: * -> *) blk. ChainDbEnv m blk -> DiffTime
cdbRegistry :: forall (m :: * -> *) blk. ChainDbEnv m blk -> ResourceRegistry m
cdbTracer :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> Tracer m (TraceEvent blk)
cdbChainSelFuse :: forall (m :: * -> *) blk. ChainDbEnv m blk -> Fuse m
cdbCopyFuse :: forall (m :: * -> *) blk. ChainDbEnv m blk -> Fuse m
cdbNextFollowerKey :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m FollowerKey
cdbNextIteratorKey :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m IteratorKey
cdbInvalid :: forall (m :: * -> *) blk.
ChainDbEnv m blk
-> StrictTVar m (WithFingerprint (InvalidBlocks blk))
cdbTopLevelConfig :: forall (m :: * -> *) blk. ChainDbEnv m blk -> TopLevelConfig blk
cdbFollowers :: forall (m :: * -> *) blk.
ChainDbEnv m blk
-> StrictTVar m (Map FollowerKey (FollowerHandle m blk))
cdbIterators :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (Map IteratorKey (m ()))
cdbTentativeHeader :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (StrictMaybe (Header blk))
cdbTentativeState :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (TentativeHeaderState blk)
cdbChain :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (AnchoredFragment (Header blk))
cdbLedgerDB :: forall (m :: * -> *) blk. ChainDbEnv m blk -> LedgerDB' m blk
cdbVolatileDB :: forall (m :: * -> *) blk. ChainDbEnv m blk -> VolatileDB m blk
cdbImmutableDB :: forall (m :: * -> *) blk. ChainDbEnv m blk -> ImmutableDB m blk
cdbImmutableDB :: ImmutableDB m blk
cdbVolatileDB :: VolatileDB m blk
cdbLedgerDB :: LedgerDB' m blk
cdbChain :: StrictTVar m (AnchoredFragment (Header blk))
cdbTentativeState :: StrictTVar m (TentativeHeaderState blk)
cdbTentativeHeader :: StrictTVar m (StrictMaybe (Header blk))
cdbIterators :: StrictTVar m (Map IteratorKey (m ()))
cdbFollowers :: StrictTVar m (Map FollowerKey (FollowerHandle m blk))
cdbTopLevelConfig :: TopLevelConfig blk
cdbInvalid :: StrictTVar m (WithFingerprint (InvalidBlocks blk))
cdbNextIteratorKey :: StrictTVar m IteratorKey
cdbNextFollowerKey :: StrictTVar m FollowerKey
cdbCopyFuse :: Fuse m
cdbChainSelFuse :: Fuse m
cdbTracer :: Tracer m (TraceEvent blk)
cdbRegistry :: ResourceRegistry m
cdbGcDelay :: DiffTime
cdbGcInterval :: DiffTime
cdbKillBgThreads :: StrictTVar m (m ())
cdbChainSelQueue :: ChainSelQueue m blk
cdbLoE :: m (LoE (AnchoredFragment (Header blk)))
cdbChainSelStarvation :: StrictTVar m ChainSelStarvation
..} GcSchedule m
gcSchedule Word64
replayed Fuse m
fuse = do
LedgerDB' m blk -> m ()
forall (m :: * -> *) (l :: LedgerStateKind) blk.
LedgerDB m l blk -> m ()
LedgerDB.tryFlush LedgerDB' m blk
cdbLedgerDB
SnapCounters -> m Void
loop (SnapCounters -> m Void) -> m SnapCounters -> m Void
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< LedgerDB' m blk
-> (ExtLedgerState blk ~ ExtLedgerState blk) =>
Maybe (Time, Time) -> Word64 -> m SnapCounters
forall (m :: * -> *) (l :: LedgerStateKind) blk.
LedgerDB m l blk
-> (l ~ ExtLedgerState blk) =>
Maybe (Time, Time) -> Word64 -> m SnapCounters
LedgerDB.tryTakeSnapshot LedgerDB' m blk
cdbLedgerDB Maybe (Time, Time)
forall a. Maybe a
Nothing Word64
replayed
where
SecurityParam NonZero Word64
k = TopLevelConfig blk -> SecurityParam
forall blk.
ConsensusProtocol (BlockProtocol blk) =>
TopLevelConfig blk -> SecurityParam
configSecurityParam TopLevelConfig blk
cdbTopLevelConfig
loop :: LedgerDB.SnapCounters -> m Void
loop :: SnapCounters -> m Void
loop SnapCounters
counters = do
let LedgerDB.SnapCounters {
Maybe Time
prevSnapshotTime :: Maybe Time
prevSnapshotTime :: SnapCounters -> Maybe Time
prevSnapshotTime
, Word64
ntBlocksSinceLastSnap :: Word64
ntBlocksSinceLastSnap :: SnapCounters -> Word64
ntBlocksSinceLastSnap
} = SnapCounters
counters
numToWrite <- STM m Word64 -> m Word64
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m Word64 -> m Word64) -> STM m Word64 -> m Word64
forall a b. (a -> b) -> a -> b
$ do
curChain <- StrictTVar m (AnchoredFragment (Header blk))
-> STM m (AnchoredFragment (Header blk))
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StrictTVar m (AnchoredFragment (Header blk))
cdbChain
check $ fromIntegral (AF.length curChain) > unNonZero k
return $ fromIntegral (AF.length curChain) - unNonZero k
withFuse fuse (copyToImmutableDB cdb) >>= scheduleGC'
LedgerDB.tryFlush cdbLedgerDB
now <- getMonotonicTime
let ntBlocksSinceLastSnap' = Word64
ntBlocksSinceLastSnap Word64 -> Word64 -> Word64
forall a. Num a => a -> a -> a
+ Word64
numToWrite
loop =<< LedgerDB.tryTakeSnapshot cdbLedgerDB ((,now) <$> prevSnapshotTime) ntBlocksSinceLastSnap'
scheduleGC' :: WithOrigin SlotNo -> m ()
scheduleGC' :: WithOrigin SlotNo -> m ()
scheduleGC' WithOrigin SlotNo
Origin = () -> m ()
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
scheduleGC' (NotOrigin SlotNo
slotNo) =
Tracer m (TraceGCEvent blk)
-> SlotNo -> GcParams -> GcSchedule m -> m ()
forall (m :: * -> *) blk.
IOLike m =>
Tracer m (TraceGCEvent blk)
-> SlotNo -> GcParams -> GcSchedule m -> m ()
scheduleGC
((TraceGCEvent blk -> TraceEvent blk)
-> Tracer m (TraceEvent blk) -> Tracer m (TraceGCEvent blk)
forall a' a. (a' -> a) -> Tracer m a -> Tracer m a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
contramap TraceGCEvent blk -> TraceEvent blk
forall blk. TraceGCEvent blk -> TraceEvent blk
TraceGCEvent Tracer m (TraceEvent blk)
cdbTracer)
SlotNo
slotNo
GcParams {
gcDelay :: DiffTime
gcDelay = DiffTime
cdbGcDelay
, gcInterval :: DiffTime
gcInterval = DiffTime
cdbGcInterval
}
GcSchedule m
gcSchedule
garbageCollect :: forall m blk. IOLike m => ChainDbEnv m blk -> SlotNo -> m ()
garbageCollect :: forall (m :: * -> *) blk.
IOLike m =>
ChainDbEnv m blk -> SlotNo -> m ()
garbageCollect CDB{m (LoE (AnchoredFragment (Header blk)))
Tracer m (TraceEvent blk)
DiffTime
ResourceRegistry m
StrictTVar m (m ())
StrictTVar m (Map FollowerKey (FollowerHandle m blk))
StrictTVar m (Map IteratorKey (m ()))
StrictTVar m (StrictMaybe (Header blk))
StrictTVar m (AnchoredFragment (Header blk))
StrictTVar m ChainSelStarvation
StrictTVar m (TentativeHeaderState blk)
StrictTVar m (WithFingerprint (InvalidBlocks blk))
StrictTVar m FollowerKey
StrictTVar m IteratorKey
Fuse m
TopLevelConfig blk
VolatileDB m blk
ImmutableDB m blk
LedgerDB' m blk
ChainSelQueue m blk
cdbChainSelStarvation :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m ChainSelStarvation
cdbLoE :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> m (LoE (AnchoredFragment (Header blk)))
cdbChainSelQueue :: forall (m :: * -> *) blk. ChainDbEnv m blk -> ChainSelQueue m blk
cdbKillBgThreads :: forall (m :: * -> *) blk. ChainDbEnv m blk -> StrictTVar m (m ())
cdbGcInterval :: forall (m :: * -> *) blk. ChainDbEnv m blk -> DiffTime
cdbGcDelay :: forall (m :: * -> *) blk. ChainDbEnv m blk -> DiffTime
cdbRegistry :: forall (m :: * -> *) blk. ChainDbEnv m blk -> ResourceRegistry m
cdbTracer :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> Tracer m (TraceEvent blk)
cdbChainSelFuse :: forall (m :: * -> *) blk. ChainDbEnv m blk -> Fuse m
cdbCopyFuse :: forall (m :: * -> *) blk. ChainDbEnv m blk -> Fuse m
cdbNextFollowerKey :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m FollowerKey
cdbNextIteratorKey :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m IteratorKey
cdbInvalid :: forall (m :: * -> *) blk.
ChainDbEnv m blk
-> StrictTVar m (WithFingerprint (InvalidBlocks blk))
cdbTopLevelConfig :: forall (m :: * -> *) blk. ChainDbEnv m blk -> TopLevelConfig blk
cdbFollowers :: forall (m :: * -> *) blk.
ChainDbEnv m blk
-> StrictTVar m (Map FollowerKey (FollowerHandle m blk))
cdbIterators :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (Map IteratorKey (m ()))
cdbTentativeHeader :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (StrictMaybe (Header blk))
cdbTentativeState :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (TentativeHeaderState blk)
cdbChain :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (AnchoredFragment (Header blk))
cdbLedgerDB :: forall (m :: * -> *) blk. ChainDbEnv m blk -> LedgerDB' m blk
cdbVolatileDB :: forall (m :: * -> *) blk. ChainDbEnv m blk -> VolatileDB m blk
cdbImmutableDB :: forall (m :: * -> *) blk. ChainDbEnv m blk -> ImmutableDB m blk
cdbImmutableDB :: ImmutableDB m blk
cdbVolatileDB :: VolatileDB m blk
cdbLedgerDB :: LedgerDB' m blk
cdbChain :: StrictTVar m (AnchoredFragment (Header blk))
cdbTentativeState :: StrictTVar m (TentativeHeaderState blk)
cdbTentativeHeader :: StrictTVar m (StrictMaybe (Header blk))
cdbIterators :: StrictTVar m (Map IteratorKey (m ()))
cdbFollowers :: StrictTVar m (Map FollowerKey (FollowerHandle m blk))
cdbTopLevelConfig :: TopLevelConfig blk
cdbInvalid :: StrictTVar m (WithFingerprint (InvalidBlocks blk))
cdbNextIteratorKey :: StrictTVar m IteratorKey
cdbNextFollowerKey :: StrictTVar m FollowerKey
cdbCopyFuse :: Fuse m
cdbChainSelFuse :: Fuse m
cdbTracer :: Tracer m (TraceEvent blk)
cdbRegistry :: ResourceRegistry m
cdbGcDelay :: DiffTime
cdbGcInterval :: DiffTime
cdbKillBgThreads :: StrictTVar m (m ())
cdbChainSelQueue :: ChainSelQueue m blk
cdbLoE :: m (LoE (AnchoredFragment (Header blk)))
cdbChainSelStarvation :: StrictTVar m ChainSelStarvation
..} SlotNo
slotNo = do
VolatileDB m blk -> HasCallStack => SlotNo -> m ()
forall (m :: * -> *) blk.
VolatileDB m blk -> HasCallStack => SlotNo -> m ()
VolatileDB.garbageCollect VolatileDB m blk
cdbVolatileDB SlotNo
slotNo
STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
LedgerDB' m blk -> SlotNo -> STM m ()
forall (m :: * -> *) (l :: LedgerStateKind) blk.
LedgerDB m l blk -> SlotNo -> STM m ()
LedgerDB.garbageCollect LedgerDB' m blk
cdbLedgerDB SlotNo
slotNo
StrictTVar m (WithFingerprint (InvalidBlocks blk))
-> (WithFingerprint (InvalidBlocks blk)
-> WithFingerprint (InvalidBlocks blk))
-> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTVar m a -> (a -> a) -> STM m ()
modifyTVar StrictTVar m (WithFingerprint (InvalidBlocks blk))
cdbInvalid ((WithFingerprint (InvalidBlocks blk)
-> WithFingerprint (InvalidBlocks blk))
-> STM m ())
-> (WithFingerprint (InvalidBlocks blk)
-> WithFingerprint (InvalidBlocks blk))
-> STM m ()
forall a b. (a -> b) -> a -> b
$ (InvalidBlocks blk -> InvalidBlocks blk)
-> WithFingerprint (InvalidBlocks blk)
-> WithFingerprint (InvalidBlocks blk)
forall a b. (a -> b) -> WithFingerprint a -> WithFingerprint b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ((InvalidBlocks blk -> InvalidBlocks blk)
-> WithFingerprint (InvalidBlocks blk)
-> WithFingerprint (InvalidBlocks blk))
-> (InvalidBlocks blk -> InvalidBlocks blk)
-> WithFingerprint (InvalidBlocks blk)
-> WithFingerprint (InvalidBlocks blk)
forall a b. (a -> b) -> a -> b
$ (InvalidBlockInfo blk -> Bool)
-> InvalidBlocks blk -> InvalidBlocks blk
forall a k. (a -> Bool) -> Map k a -> Map k a
Map.filter ((SlotNo -> SlotNo -> Bool
forall a. Ord a => a -> a -> Bool
>= SlotNo
slotNo) (SlotNo -> Bool)
-> (InvalidBlockInfo blk -> SlotNo) -> InvalidBlockInfo blk -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. InvalidBlockInfo blk -> SlotNo
forall blk. InvalidBlockInfo blk -> SlotNo
invalidBlockSlotNo)
Tracer m (TraceEvent blk) -> TraceEvent blk -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (TraceEvent blk)
cdbTracer (TraceEvent blk -> m ()) -> TraceEvent blk -> m ()
forall a b. (a -> b) -> a -> b
$ TraceGCEvent blk -> TraceEvent blk
forall blk. TraceGCEvent blk -> TraceEvent blk
TraceGCEvent (TraceGCEvent blk -> TraceEvent blk)
-> TraceGCEvent blk -> TraceEvent blk
forall a b. (a -> b) -> a -> b
$ SlotNo -> TraceGCEvent blk
forall blk. SlotNo -> TraceGCEvent blk
PerformedGC SlotNo
slotNo
newtype GcSchedule m = GcSchedule (StrictTVar m (StrictSeq ScheduledGc))
data ScheduledGc = ScheduledGc {
ScheduledGc -> Time
scheduledGcTime :: !Time
, ScheduledGc -> SlotNo
scheduledGcSlot :: !SlotNo
}
deriving (ScheduledGc -> ScheduledGc -> Bool
(ScheduledGc -> ScheduledGc -> Bool)
-> (ScheduledGc -> ScheduledGc -> Bool) -> Eq ScheduledGc
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: ScheduledGc -> ScheduledGc -> Bool
== :: ScheduledGc -> ScheduledGc -> Bool
$c/= :: ScheduledGc -> ScheduledGc -> Bool
/= :: ScheduledGc -> ScheduledGc -> Bool
Eq, Int -> ScheduledGc -> ShowS
[ScheduledGc] -> ShowS
ScheduledGc -> String
(Int -> ScheduledGc -> ShowS)
-> (ScheduledGc -> String)
-> ([ScheduledGc] -> ShowS)
-> Show ScheduledGc
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> ScheduledGc -> ShowS
showsPrec :: Int -> ScheduledGc -> ShowS
$cshow :: ScheduledGc -> String
show :: ScheduledGc -> String
$cshowList :: [ScheduledGc] -> ShowS
showList :: [ScheduledGc] -> ShowS
Show, (forall x. ScheduledGc -> Rep ScheduledGc x)
-> (forall x. Rep ScheduledGc x -> ScheduledGc)
-> Generic ScheduledGc
forall x. Rep ScheduledGc x -> ScheduledGc
forall x. ScheduledGc -> Rep ScheduledGc x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cfrom :: forall x. ScheduledGc -> Rep ScheduledGc x
from :: forall x. ScheduledGc -> Rep ScheduledGc x
$cto :: forall x. Rep ScheduledGc x -> ScheduledGc
to :: forall x. Rep ScheduledGc x -> ScheduledGc
Generic, Context -> ScheduledGc -> IO (Maybe ThunkInfo)
Proxy ScheduledGc -> String
(Context -> ScheduledGc -> IO (Maybe ThunkInfo))
-> (Context -> ScheduledGc -> IO (Maybe ThunkInfo))
-> (Proxy ScheduledGc -> String)
-> NoThunks ScheduledGc
forall a.
(Context -> a -> IO (Maybe ThunkInfo))
-> (Context -> a -> IO (Maybe ThunkInfo))
-> (Proxy a -> String)
-> NoThunks a
$cnoThunks :: Context -> ScheduledGc -> IO (Maybe ThunkInfo)
noThunks :: Context -> ScheduledGc -> IO (Maybe ThunkInfo)
$cwNoThunks :: Context -> ScheduledGc -> IO (Maybe ThunkInfo)
wNoThunks :: Context -> ScheduledGc -> IO (Maybe ThunkInfo)
$cshowTypeOf :: Proxy ScheduledGc -> String
showTypeOf :: Proxy ScheduledGc -> String
NoThunks)
instance Condense ScheduledGc where
condense :: ScheduledGc -> String
condense (ScheduledGc Time
time SlotNo
slot) = (Time, SlotNo) -> String
forall a. Condense a => a -> String
condense (Time
time, SlotNo
slot)
data GcParams = GcParams {
GcParams -> DiffTime
gcDelay :: !DiffTime
, GcParams -> DiffTime
gcInterval :: !DiffTime
}
deriving (Int -> GcParams -> ShowS
[GcParams] -> ShowS
GcParams -> String
(Int -> GcParams -> ShowS)
-> (GcParams -> String) -> ([GcParams] -> ShowS) -> Show GcParams
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> GcParams -> ShowS
showsPrec :: Int -> GcParams -> ShowS
$cshow :: GcParams -> String
show :: GcParams -> String
$cshowList :: [GcParams] -> ShowS
showList :: [GcParams] -> ShowS
Show)
newGcSchedule :: IOLike m => m (GcSchedule m)
newGcSchedule :: forall (m :: * -> *). IOLike m => m (GcSchedule m)
newGcSchedule = StrictTVar m (StrictSeq ScheduledGc) -> GcSchedule m
forall (m :: * -> *).
StrictTVar m (StrictSeq ScheduledGc) -> GcSchedule m
GcSchedule (StrictTVar m (StrictSeq ScheduledGc) -> GcSchedule m)
-> m (StrictTVar m (StrictSeq ScheduledGc)) -> m (GcSchedule m)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> StrictSeq ScheduledGc -> m (StrictTVar m (StrictSeq ScheduledGc))
forall (m :: * -> *) a.
(HasCallStack, MonadSTM m, NoThunks a) =>
a -> m (StrictTVar m a)
newTVarIO StrictSeq ScheduledGc
forall a. StrictSeq a
Seq.empty
scheduleGC ::
forall m blk. IOLike m
=> Tracer m (TraceGCEvent blk)
-> SlotNo
-> GcParams
-> GcSchedule m
-> m ()
scheduleGC :: forall (m :: * -> *) blk.
IOLike m =>
Tracer m (TraceGCEvent blk)
-> SlotNo -> GcParams -> GcSchedule m -> m ()
scheduleGC Tracer m (TraceGCEvent blk)
tracer SlotNo
slotNo GcParams
gcParams (GcSchedule StrictTVar m (StrictSeq ScheduledGc)
varQueue) = do
timeScheduledForGC <- GcParams -> Time -> Time
computeTimeForGC GcParams
gcParams (Time -> Time) -> m Time -> m Time
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> m Time
forall (m :: * -> *). MonadMonotonicTime m => m Time
getMonotonicTime
atomically $ modifyTVar varQueue $ \case
StrictSeq ScheduledGc
queue' :|> ScheduledGc { scheduledGcTime :: ScheduledGc -> Time
scheduledGcTime = Time
lastTimeScheduledForGC }
| Time
timeScheduledForGC Time -> Time -> Bool
forall a. Eq a => a -> a -> Bool
== Time
lastTimeScheduledForGC
-> StrictSeq ScheduledGc
queue' StrictSeq ScheduledGc -> ScheduledGc -> StrictSeq ScheduledGc
forall a. StrictSeq a -> a -> StrictSeq a
:|> Time -> SlotNo -> ScheduledGc
ScheduledGc Time
timeScheduledForGC SlotNo
slotNo
StrictSeq ScheduledGc
queue
-> StrictSeq ScheduledGc
queue StrictSeq ScheduledGc -> ScheduledGc -> StrictSeq ScheduledGc
forall a. StrictSeq a -> a -> StrictSeq a
:|> Time -> SlotNo -> ScheduledGc
ScheduledGc Time
timeScheduledForGC SlotNo
slotNo
traceWith tracer $ ScheduledGC slotNo timeScheduledForGC
computeTimeForGC ::
GcParams
-> Time
-> Time
computeTimeForGC :: GcParams -> Time -> Time
computeTimeForGC GcParams { DiffTime
gcDelay :: GcParams -> DiffTime
gcDelay :: DiffTime
gcDelay, DiffTime
gcInterval :: GcParams -> DiffTime
gcInterval :: DiffTime
gcInterval } (Time DiffTime
now) =
DiffTime -> Time
Time (DiffTime -> Time) -> DiffTime -> Time
forall a b. (a -> b) -> a -> b
$ Integer -> DiffTime
picosecondsToDiffTime (Integer -> DiffTime) -> Integer -> DiffTime
forall a b. (a -> b) -> a -> b
$
Integer -> Integer -> Integer
forall a b. (Integral a, Integral b) => b -> a -> a
roundUpToInterval
(DiffTime -> Integer
diffTimeToPicoseconds DiffTime
gcInterval)
(DiffTime -> Integer
diffTimeToPicoseconds (DiffTime
now DiffTime -> DiffTime -> DiffTime
forall a. Num a => a -> a -> a
+ DiffTime
gcDelay))
roundUpToInterval :: (Integral a, Integral b) => b -> a -> a
roundUpToInterval :: forall a b. (Integral a, Integral b) => b -> a -> a
roundUpToInterval b
interval a
x
| a
m a -> a -> Bool
forall a. Eq a => a -> a -> Bool
== a
0
= a
d a -> a -> a
forall a. Num a => a -> a -> a
* b -> a
forall a b. (Integral a, Num b) => a -> b
fromIntegral b
interval
| Bool
otherwise
= (a
d a -> a -> a
forall a. Num a => a -> a -> a
+ a
1) a -> a -> a
forall a. Num a => a -> a -> a
* b -> a
forall a b. (Integral a, Num b) => a -> b
fromIntegral b
interval
where
(a
d, a
m) = a
x a -> a -> (a, a)
forall a. Integral a => a -> a -> (a, a)
`divMod` b -> a
forall a b. (Integral a, Num b) => a -> b
fromIntegral b
interval
gcScheduleRunner ::
forall m. IOLike m
=> GcSchedule m
-> (SlotNo -> m ())
-> m Void
gcScheduleRunner :: forall (m :: * -> *).
IOLike m =>
GcSchedule m -> (SlotNo -> m ()) -> m Void
gcScheduleRunner (GcSchedule StrictTVar m (StrictSeq ScheduledGc)
varQueue) SlotNo -> m ()
runGc = m () -> m Void
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (m () -> m Void) -> m () -> m Void
forall a b. (a -> b) -> a -> b
$ do
timeScheduledForGC <- STM m Time -> m Time
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m Time -> m Time) -> STM m Time -> m Time
forall a b. (a -> b) -> a -> b
$
StrictTVar m (StrictSeq ScheduledGc)
-> STM m (StrictSeq ScheduledGc)
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StrictTVar m (StrictSeq ScheduledGc)
varQueue STM m (StrictSeq ScheduledGc)
-> (StrictSeq ScheduledGc -> STM m Time) -> STM m Time
forall a b. STM m a -> (a -> STM m b) -> STM m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
StrictSeq ScheduledGc
Seq.Empty -> STM m Time
forall a. STM m a
forall (m :: * -> *) a. MonadSTM m => STM m a
retry
ScheduledGc { Time
scheduledGcTime :: ScheduledGc -> Time
scheduledGcTime :: Time
scheduledGcTime } :<| StrictSeq ScheduledGc
_ -> Time -> STM m Time
forall a. a -> STM m a
forall (m :: * -> *) a. Monad m => a -> m a
return Time
scheduledGcTime
currentTime <- getMonotonicTime
let toWait = DiffTime -> DiffTime -> DiffTime
forall a. Ord a => a -> a -> a
max DiffTime
0 (Time
timeScheduledForGC Time -> Time -> DiffTime
`diffTime` Time
currentTime)
threadDelay toWait
slotNo <- atomically $
readTVar varQueue >>= \case
ScheduledGc { SlotNo
scheduledGcSlot :: ScheduledGc -> SlotNo
scheduledGcSlot :: SlotNo
scheduledGcSlot } :<| StrictSeq ScheduledGc
queue' -> do
StrictTVar m (StrictSeq ScheduledGc)
-> StrictSeq ScheduledGc -> STM m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m (StrictSeq ScheduledGc)
varQueue StrictSeq ScheduledGc
queue'
SlotNo -> STM m SlotNo
forall a. a -> STM m a
forall (m :: * -> *) a. Monad m => a -> m a
return SlotNo
scheduledGcSlot
StrictSeq ScheduledGc
Seq.Empty -> String -> STM m SlotNo
forall a. HasCallStack => String -> a
error String
"queue empty after waiting"
runGc slotNo
dumpGcSchedule :: IOLike m => GcSchedule m -> STM m [ScheduledGc]
dumpGcSchedule :: forall (m :: * -> *).
IOLike m =>
GcSchedule m -> STM m [ScheduledGc]
dumpGcSchedule (GcSchedule StrictTVar m (StrictSeq ScheduledGc)
varQueue) = StrictSeq ScheduledGc -> [ScheduledGc]
forall a. StrictSeq a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList (StrictSeq ScheduledGc -> [ScheduledGc])
-> STM m (StrictSeq ScheduledGc) -> STM m [ScheduledGc]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> StrictTVar m (StrictSeq ScheduledGc)
-> STM m (StrictSeq ScheduledGc)
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StrictTVar m (StrictSeq ScheduledGc)
varQueue
addBlockRunner ::
( IOLike m
, LedgerSupportsProtocol blk
, BlockSupportsDiffusionPipelining blk
, InspectLedger blk
, HasHardForkHistory blk
, HasCallStack
)
=> Fuse m
-> ChainDbEnv m blk
-> m Void
addBlockRunner :: forall (m :: * -> *) blk.
(IOLike m, LedgerSupportsProtocol blk,
BlockSupportsDiffusionPipelining blk, InspectLedger blk,
HasHardForkHistory blk, HasCallStack) =>
Fuse m -> ChainDbEnv m blk -> m Void
addBlockRunner Fuse m
fuse cdb :: ChainDbEnv m blk
cdb@CDB{m (LoE (AnchoredFragment (Header blk)))
Tracer m (TraceEvent blk)
DiffTime
ResourceRegistry m
StrictTVar m (m ())
StrictTVar m (Map FollowerKey (FollowerHandle m blk))
StrictTVar m (Map IteratorKey (m ()))
StrictTVar m (StrictMaybe (Header blk))
StrictTVar m (AnchoredFragment (Header blk))
StrictTVar m ChainSelStarvation
StrictTVar m (TentativeHeaderState blk)
StrictTVar m (WithFingerprint (InvalidBlocks blk))
StrictTVar m FollowerKey
StrictTVar m IteratorKey
Fuse m
TopLevelConfig blk
VolatileDB m blk
ImmutableDB m blk
LedgerDB' m blk
ChainSelQueue m blk
cdbChainSelStarvation :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m ChainSelStarvation
cdbLoE :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> m (LoE (AnchoredFragment (Header blk)))
cdbChainSelQueue :: forall (m :: * -> *) blk. ChainDbEnv m blk -> ChainSelQueue m blk
cdbKillBgThreads :: forall (m :: * -> *) blk. ChainDbEnv m blk -> StrictTVar m (m ())
cdbGcInterval :: forall (m :: * -> *) blk. ChainDbEnv m blk -> DiffTime
cdbGcDelay :: forall (m :: * -> *) blk. ChainDbEnv m blk -> DiffTime
cdbRegistry :: forall (m :: * -> *) blk. ChainDbEnv m blk -> ResourceRegistry m
cdbTracer :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> Tracer m (TraceEvent blk)
cdbChainSelFuse :: forall (m :: * -> *) blk. ChainDbEnv m blk -> Fuse m
cdbCopyFuse :: forall (m :: * -> *) blk. ChainDbEnv m blk -> Fuse m
cdbNextFollowerKey :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m FollowerKey
cdbNextIteratorKey :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m IteratorKey
cdbInvalid :: forall (m :: * -> *) blk.
ChainDbEnv m blk
-> StrictTVar m (WithFingerprint (InvalidBlocks blk))
cdbTopLevelConfig :: forall (m :: * -> *) blk. ChainDbEnv m blk -> TopLevelConfig blk
cdbFollowers :: forall (m :: * -> *) blk.
ChainDbEnv m blk
-> StrictTVar m (Map FollowerKey (FollowerHandle m blk))
cdbIterators :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (Map IteratorKey (m ()))
cdbTentativeHeader :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (StrictMaybe (Header blk))
cdbTentativeState :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (TentativeHeaderState blk)
cdbChain :: forall (m :: * -> *) blk.
ChainDbEnv m blk -> StrictTVar m (AnchoredFragment (Header blk))
cdbLedgerDB :: forall (m :: * -> *) blk. ChainDbEnv m blk -> LedgerDB' m blk
cdbVolatileDB :: forall (m :: * -> *) blk. ChainDbEnv m blk -> VolatileDB m blk
cdbImmutableDB :: forall (m :: * -> *) blk. ChainDbEnv m blk -> ImmutableDB m blk
cdbImmutableDB :: ImmutableDB m blk
cdbVolatileDB :: VolatileDB m blk
cdbLedgerDB :: LedgerDB' m blk
cdbChain :: StrictTVar m (AnchoredFragment (Header blk))
cdbTentativeState :: StrictTVar m (TentativeHeaderState blk)
cdbTentativeHeader :: StrictTVar m (StrictMaybe (Header blk))
cdbIterators :: StrictTVar m (Map IteratorKey (m ()))
cdbFollowers :: StrictTVar m (Map FollowerKey (FollowerHandle m blk))
cdbTopLevelConfig :: TopLevelConfig blk
cdbInvalid :: StrictTVar m (WithFingerprint (InvalidBlocks blk))
cdbNextIteratorKey :: StrictTVar m IteratorKey
cdbNextFollowerKey :: StrictTVar m FollowerKey
cdbCopyFuse :: Fuse m
cdbChainSelFuse :: Fuse m
cdbTracer :: Tracer m (TraceEvent blk)
cdbRegistry :: ResourceRegistry m
cdbGcDelay :: DiffTime
cdbGcInterval :: DiffTime
cdbKillBgThreads :: StrictTVar m (m ())
cdbChainSelQueue :: ChainSelQueue m blk
cdbLoE :: m (LoE (AnchoredFragment (Header blk)))
cdbChainSelStarvation :: StrictTVar m ChainSelStarvation
..} = m () -> m Void
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (m () -> m Void) -> m () -> m Void
forall a b. (a -> b) -> a -> b
$ do
let trace :: TraceAddBlockEvent blk -> m ()
trace = Tracer m (TraceEvent blk) -> TraceEvent blk -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (TraceEvent blk)
cdbTracer (TraceEvent blk -> m ())
-> (TraceAddBlockEvent blk -> TraceEvent blk)
-> TraceAddBlockEvent blk
-> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TraceAddBlockEvent blk -> TraceEvent blk
forall blk. TraceAddBlockEvent blk -> TraceEvent blk
TraceAddBlockEvent
TraceAddBlockEvent blk -> m ()
trace (TraceAddBlockEvent blk -> m ()) -> TraceAddBlockEvent blk -> m ()
forall a b. (a -> b) -> a -> b
$ Enclosing' (RealPoint blk) -> TraceAddBlockEvent blk
forall blk. Enclosing' (RealPoint blk) -> TraceAddBlockEvent blk
PoppedBlockFromQueue Enclosing' (RealPoint blk)
forall a. Enclosing' a
RisingEdge
Fuse m -> Electric m () -> m ()
forall (m :: * -> *) a.
(MonadThrow m, MonadMVar m) =>
Fuse m -> Electric m a -> m a
withFuse Fuse m
fuse (Electric m () -> m ()) -> Electric m () -> m ()
forall a b. (a -> b) -> a -> b
$
Electric m (ChainSelMessage m blk)
-> (ChainSelMessage m blk -> Electric m ())
-> (ChainSelMessage m blk -> Electric m ())
-> Electric m ()
forall a b c.
Electric m a
-> (a -> Electric m b) -> (a -> Electric m c) -> Electric m c
forall (m :: * -> *) a b c.
MonadCatch m =>
m a -> (a -> m b) -> (a -> m c) -> m c
bracketOnError
(m (ChainSelMessage m blk) -> Electric m (ChainSelMessage m blk)
forall (m :: * -> *) a. Monad m => m a -> Electric m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (m (ChainSelMessage m blk) -> Electric m (ChainSelMessage m blk))
-> m (ChainSelMessage m blk) -> Electric m (ChainSelMessage m blk)
forall a b. (a -> b) -> a -> b
$ Tracer m (TraceChainSelStarvationEvent blk)
-> StrictTVar m ChainSelStarvation
-> ChainSelQueue m blk
-> m (ChainSelMessage m blk)
forall (m :: * -> *) blk.
(HasHeader blk, IOLike m) =>
Tracer m (TraceChainSelStarvationEvent blk)
-> StrictTVar m ChainSelStarvation
-> ChainSelQueue m blk
-> m (ChainSelMessage m blk)
getChainSelMessage Tracer m (TraceChainSelStarvationEvent blk)
starvationTracer StrictTVar m ChainSelStarvation
cdbChainSelStarvation ChainSelQueue m blk
cdbChainSelQueue)
(\ChainSelMessage m blk
message -> m () -> Electric m ()
forall (m :: * -> *) a. Monad m => m a -> Electric m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (m () -> Electric m ()) -> m () -> Electric m ()
forall a b. (a -> b) -> a -> b
$ STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
case ChainSelMessage m blk
message of
ChainSelReprocessLoEBlocks StrictTMVar m ()
varProcessed ->
STM m Bool -> STM m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (STM m Bool -> STM m ()) -> STM m Bool -> STM m ()
forall a b. (a -> b) -> a -> b
$ StrictTMVar m () -> () -> STM m Bool
forall (m :: * -> *) a.
MonadSTM m =>
StrictTMVar m a -> a -> STM m Bool
tryPutTMVar StrictTMVar m ()
varProcessed ()
ChainSelAddBlock BlockToAdd{StrictTMVar m Bool
varBlockWrittenToDisk :: StrictTMVar m Bool
varBlockWrittenToDisk :: forall (m :: * -> *) blk. BlockToAdd m blk -> StrictTMVar m Bool
varBlockWrittenToDisk, StrictTMVar m (AddBlockResult blk)
varBlockProcessed :: StrictTMVar m (AddBlockResult blk)
varBlockProcessed :: forall (m :: * -> *) blk.
BlockToAdd m blk -> StrictTMVar m (AddBlockResult blk)
varBlockProcessed} -> do
_ <- StrictTMVar m Bool -> Bool -> STM m Bool
forall (m :: * -> *) a.
MonadSTM m =>
StrictTMVar m a -> a -> STM m Bool
tryPutTMVar StrictTMVar m Bool
varBlockWrittenToDisk
Bool
False
_ <- tryPutTMVar varBlockProcessed
(FailedToAddBlock "Failed to add block synchronously")
pure ()
ChainSelQueue m blk -> STM m ()
forall (m :: * -> *) blk.
IOLike m =>
ChainSelQueue m blk -> STM m ()
closeChainSelQueue ChainSelQueue m blk
cdbChainSelQueue)
(\ChainSelMessage m blk
message -> do
m () -> Electric m ()
forall (m :: * -> *) a. Monad m => m a -> Electric m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (m () -> Electric m ()) -> m () -> Electric m ()
forall a b. (a -> b) -> a -> b
$ case ChainSelMessage m blk
message of
ChainSelReprocessLoEBlocks StrictTMVar m ()
_ ->
TraceAddBlockEvent blk -> m ()
trace TraceAddBlockEvent blk
forall blk. TraceAddBlockEvent blk
PoppedReprocessLoEBlocksFromQueue
ChainSelAddBlock BlockToAdd{blk
blockToAdd :: blk
blockToAdd :: forall (m :: * -> *) blk. BlockToAdd m blk -> blk
blockToAdd} ->
TraceAddBlockEvent blk -> m ()
trace (TraceAddBlockEvent blk -> m ()) -> TraceAddBlockEvent blk -> m ()
forall a b. (a -> b) -> a -> b
$ Enclosing' (RealPoint blk) -> TraceAddBlockEvent blk
forall blk. Enclosing' (RealPoint blk) -> TraceAddBlockEvent blk
PoppedBlockFromQueue (Enclosing' (RealPoint blk) -> TraceAddBlockEvent blk)
-> Enclosing' (RealPoint blk) -> TraceAddBlockEvent blk
forall a b. (a -> b) -> a -> b
$ RealPoint blk -> Enclosing' (RealPoint blk)
forall a. a -> Enclosing' a
FallingEdgeWith (RealPoint blk -> Enclosing' (RealPoint blk))
-> RealPoint blk -> Enclosing' (RealPoint blk)
forall a b. (a -> b) -> a -> b
$
blk -> RealPoint blk
forall blk. HasHeader blk => blk -> RealPoint blk
blockRealPoint blk
blockToAdd
ChainDbEnv m blk -> ChainSelMessage m blk -> Electric m ()
forall (m :: * -> *) blk.
(IOLike m, LedgerSupportsProtocol blk,
BlockSupportsDiffusionPipelining blk, InspectLedger blk,
HasHardForkHistory blk, HasCallStack) =>
ChainDbEnv m blk -> ChainSelMessage m blk -> Electric m ()
chainSelSync ChainDbEnv m blk
cdb ChainSelMessage m blk
message
m () -> Electric m ()
forall (m :: * -> *) a. Monad m => m a -> Electric m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (m () -> Electric m ()) -> m () -> Electric m ()
forall a b. (a -> b) -> a -> b
$ STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ ChainSelQueue m blk -> ChainSelMessage m blk -> STM m ()
forall (m :: * -> *) blk.
(IOLike m, HasHeader blk) =>
ChainSelQueue m blk -> ChainSelMessage m blk -> STM m ()
processedChainSelMessage ChainSelQueue m blk
cdbChainSelQueue ChainSelMessage m blk
message)
where
starvationTracer :: Tracer m (TraceChainSelStarvationEvent blk)
starvationTracer = (TraceChainSelStarvationEvent blk -> m ())
-> Tracer m (TraceChainSelStarvationEvent blk)
forall (m :: * -> *) a. (a -> m ()) -> Tracer m a
Tracer ((TraceChainSelStarvationEvent blk -> m ())
-> Tracer m (TraceChainSelStarvationEvent blk))
-> (TraceChainSelStarvationEvent blk -> m ())
-> Tracer m (TraceChainSelStarvationEvent blk)
forall a b. (a -> b) -> a -> b
$ Tracer m (TraceEvent blk) -> TraceEvent blk -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (TraceEvent blk)
cdbTracer (TraceEvent blk -> m ())
-> (TraceChainSelStarvationEvent blk -> TraceEvent blk)
-> TraceChainSelStarvationEvent blk
-> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TraceChainSelStarvationEvent blk -> TraceEvent blk
forall blk. TraceChainSelStarvationEvent blk -> TraceEvent blk
TraceChainSelStarvationEvent