{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
module Ouroboros.Consensus.Storage.ChainDB.Impl.Background
(
launchBgTasks
, copyToImmutableDB
, garbageCollectBlocks
, GcParams (..)
, GcSchedule
, computeTimeForGC
, gcScheduleRunner
, newGcSchedule
, scheduleGC
, ScheduledGc (..)
, dumpGcSchedule
, addBlockRunner
) where
import Control.Exception (assert)
import Control.Monad (forM_, forever, void)
import Control.Monad.Trans.Class (lift)
import Control.ResourceRegistry
import Control.Tracer
import Data.Foldable (toList)
import qualified Data.Map.Strict as Map
import Data.Sequence.Strict (StrictSeq (..))
import qualified Data.Sequence.Strict as Seq
import Data.Time.Clock
import Data.Void (Void)
import Data.Word
import GHC.Generics (Generic)
import GHC.Stack (HasCallStack)
import Ouroboros.Consensus.Block
import Ouroboros.Consensus.HardFork.Abstract
import Ouroboros.Consensus.Ledger.Inspect
import Ouroboros.Consensus.Ledger.SupportsProtocol
import Ouroboros.Consensus.Protocol.Abstract
import Ouroboros.Consensus.Storage.ChainDB.API
( AddBlockResult (..)
, BlockComponent (..)
)
import Ouroboros.Consensus.Storage.ChainDB.Impl.ChainSel
( chainSelSync
)
import qualified Ouroboros.Consensus.Storage.ChainDB.Impl.Query as Query
import Ouroboros.Consensus.Storage.ChainDB.Impl.Types
import qualified Ouroboros.Consensus.Storage.ImmutableDB as ImmutableDB
import qualified Ouroboros.Consensus.Storage.LedgerDB as LedgerDB
import qualified Ouroboros.Consensus.Storage.VolatileDB as VolatileDB
import Ouroboros.Consensus.Util
import Ouroboros.Consensus.Util.Condense
import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Consensus.Util.STM (Watcher (..), forkLinkedWatcher)
import Ouroboros.Network.AnchoredFragment (AnchoredSeq (..))
import qualified Ouroboros.Network.AnchoredFragment as AF
-- | Start the ChainDB background threads and record how to stop them.
--
-- Four threads are forked against 'cdbRegistry', in this order: the add-block
-- runner, the ledger DB task watcher, the GC schedule runner and the
-- copy-to-ImmutableDB runner. Afterwards a single action that cancels all of
-- them (in the same order) is written to 'cdbKillBgThreads'.
launchBgTasks ::
  forall m blk.
  ( IOLike m
  , LedgerSupportsProtocol blk
  , BlockSupportsDiffusionPipelining blk
  , InspectLedger blk
  , HasHardForkHistory blk
  ) =>
  ChainDbEnv m blk ->
  -- | Passed to 'newLedgerDbTasksTrigger', where it seeds the
  -- blocks-since-last-snapshot counter.
  Word64 ->
  m ()
launchBgTasks cdb@CDB{cdbChainSelFuse, cdbCopyFuse, cdbKillBgThreads, cdbRegistry} replayed = do
  -- The bang patterns force each cancellation action / thread handle as soon
  -- as the corresponding thread has been forked.
  !stopAddBlock <-
    launch
      "ChainDB.addBlockRunner"
      (addBlockRunner cdbChainSelFuse cdb)

  trigger <- newLedgerDbTasksTrigger replayed
  !ledgerDbTaskThread <-
    forkLinkedWatcher
      cdbRegistry
      "ChainDB.ledgerDbTaskWatcher"
      (ledgerDbTaskWatcher cdb trigger)

  schedule <- newGcSchedule
  !stopGc <-
    launch
      "ChainDB.gcBlocksScheduleRunner"
      (gcScheduleRunner schedule (garbageCollectBlocks cdb))

  -- The copy runner shares both the trigger and the GC schedule with the
  -- threads started above.
  !stopCopy <-
    launch
      "ChainDB.copyToImmutableDBRunner"
      (copyToImmutableDBRunner cdb trigger schedule cdbCopyFuse)

  let killAll :: m ()
      killAll =
        sequence_
          [ stopAddBlock
          , cancelThread ledgerDbTaskThread
          , stopGc
          , stopCopy
          ]
  atomically (writeTVar cdbKillBgThreads killAll)
 where
  -- Fork a linked thread in the ChainDB registry and hand back the action
  -- that cancels it.
  launch :: String -> m Void -> m (m ())
  launch label action =
    cancelThread <$> forkLinkedThread cdbRegistry label action
-- | Copy blocks from the VolatileDB to the ImmutableDB.
--
-- The blocks copied are those whose headers are on the internal chain
-- fragment ('cdbChain') but are not part of the fragment returned by
-- 'Query.getCurrentChain', i.e. the oldest @length cdbChain - length
-- currentChain@ headers. They are processed oldest first; after each block
-- has been appended to the ImmutableDB its header is dropped from the front
-- of 'cdbChain'.
--
-- Returns the slot of the ImmutableDB tip, read after all copying is done.
copyToImmutableDB ::
  forall m blk.
  ( IOLike m
  , ConsensusProtocol (BlockProtocol blk)
  , HasHeader blk
  , GetHeader blk
  , HasCallStack
  ) =>
  ChainDbEnv m blk ->
  Electric m (WithOrigin SlotNo)
copyToImmutableDB cdb@CDB{cdbChain, cdbImmutableDB, cdbTracer, cdbVolatileDB} = electric $ do
  -- Decide, in a single transaction, which points have to be copied.
  pending <- atomically $ do
    fullChain <- icWithoutTime <$> readTVar cdbChain
    volSuffix <- Query.getCurrentChain cdb
    let surplus = AF.length fullChain - AF.length volSuffix
        oldest = AF.takeOldest (max 0 surplus) fullChain
        points :: [Point blk]
        points = map headerPoint (AF.toOldestFirst oldest)
    pure points

  case pending of
    [] -> trace NoBlocksToCopyToImmutableDB
    _ -> forM_ pending $ \pt -> do
      -- Kept lazy on purpose: the hash is only demanded by the VolatileDB
      -- lookup below.
      let hash = case pointHash pt of
            GenesisHash -> error "genesis block on current chain"
            BlockHash h -> h
      immTipSlot <- atomically (ImmutableDB.getTipSlot cdbImmutableDB)
      -- Sanity check: the point being copied is not older than the current
      -- ImmutableDB tip.
      assert (pointSlot pt >= immTipSlot) (pure ())
      block <- VolatileDB.getKnownBlockComponent cdbVolatileDB GetVerifiedBlock hash
      ImmutableDB.appendBlock cdbImmutableDB block
      -- Only shrink the in-memory chain once the block has been appended.
      atomically (removeFromChain pt)
      trace (CopiedBlockToImmutableDB pt)

  atomically (ImmutableDB.getTipSlot cdbImmutableDB)
 where
  -- Emit a copy-to-ImmutableDB event on the ChainDB tracer.
  trace :: TraceCopyToImmutableDBEvent blk -> m ()
  trace = traceWith copyTracer

  copyTracer :: Tracer m (TraceCopyToImmutableDBEvent blk)
  copyTracer = contramap TraceCopyToImmutableDBEvent cdbTracer

  -- Drop the oldest header from both fragments of the internal chain. The
  -- oldest header must be the given point; anything else is treated as an
  -- internal error.
  removeFromChain :: Point blk -> STM m ()
  removeFromChain pt = do
    chain <- readTVar cdbChain
    case chain of
      InternalChain (oldestHdr :< rest) (_oldestWithTime :< restWithTime)
        | headerPoint oldestHdr == pt ->
            writeTVar cdbChain (InternalChain rest restWithTime)
      _ ->
        error "header to remove not on the current chain"
-- | Body of the copy-to-ImmutableDB background thread.
--
-- After one initial 'LedgerDB.tryFlush', loops forever: wait until the
-- internal chain fragment is longer than the fragment returned by
-- 'Query.getCurrentChain', run 'copyToImmutableDB' under the given fuse,
-- report the result to the ledger DB task trigger and schedule a garbage
-- collection for the slot that 'copyToImmutableDB' returned.
copyToImmutableDBRunner ::
  forall m blk.
  ( IOLike m
  , LedgerSupportsProtocol blk
  ) =>
  ChainDbEnv m blk ->
  LedgerDbTasksTrigger m ->
  GcSchedule m ->
  Fuse m ->
  m Void
copyToImmutableDBRunner
  cdb@CDB{cdbChain, cdbGcDelay, cdbGcInterval, cdbLedgerDB, cdbTracer}
  ledgerDbTasksTrigger
  gcSchedule
  fuse =
    LedgerDB.tryFlush cdbLedgerDB >> forever copyAndTrigger
   where
    -- One iteration of the loop.
    copyAndTrigger :: m ()
    copyAndTrigger = do
      copied <- atomically awaitSurplus
      immTip <- withFuse fuse (copyToImmutableDB cdb)
      triggerLedgerDbTasks ledgerDbTasksTrigger immTip copied
      scheduleGC' immTip

    -- Block (via 'check') until at least one header is eligible for copying
    -- and return how many are.
    awaitSurplus :: STM m Word64
    awaitSurplus = do
      fullChain <- icWithoutTime <$> readTVar cdbChain
      volSuffix <- Query.getCurrentChain cdb
      let surplus = AF.length fullChain - AF.length volSuffix
      check (surplus > 0)
      pure (fromIntegral surplus)

    -- Nothing to schedule while the reported slot is still 'Origin'.
    scheduleGC' :: WithOrigin SlotNo -> m ()
    scheduleGC' = \case
      Origin -> pure ()
      NotOrigin slotNo -> scheduleGC gcTracer slotNo gcParams gcSchedule

    gcTracer :: Tracer m (TraceGCEvent blk)
    gcTracer = contramap TraceGCEvent cdbTracer

    gcParams :: GcParams
    gcParams =
      GcParams
        { gcDelay = cdbGcDelay
        , gcInterval = cdbGcInterval
        }
-- | Shared state used to hand work to the ledger DB task watcher: a mutable
-- 'LedgerDbTaskState', written by 'triggerLedgerDbTasks' and read by
-- 'ledgerDbTaskWatcher'.
newtype LedgerDbTasksTrigger m = LedgerDbTasksTrigger (StrictTVar m LedgerDbTaskState)
-- | State stored inside a 'LedgerDbTasksTrigger'.
data LedgerDbTaskState = LedgerDbTaskState
  { ldbtsImmTip :: !(WithOrigin SlotNo)
  -- ^ Slot most recently reported through 'triggerLedgerDbTasks'; starts out
  -- as 'Origin'.
  , ldbtsPrevSnapshotTime :: !(Maybe Time)
  -- ^ Previous-snapshot time as last reported by 'LedgerDB.tryTakeSnapshot';
  -- 'Nothing' initially.
  , ldbtsBlocksSinceLastSnapshot :: !Word64
  -- ^ Running block counter that is handed to 'LedgerDB.tryTakeSnapshot'.
  }
  deriving stock (Generic)
  deriving anyclass (NoThunks)
-- | Allocate a fresh 'LedgerDbTasksTrigger'.
--
-- The initial state has its tip at 'Origin', no previous snapshot time, and a
-- blocks-since-last-snapshot counter equal to the argument.
newLedgerDbTasksTrigger ::
  IOLike m =>
  -- | Initial value of the blocks-since-last-snapshot counter
  Word64 ->
  m (LedgerDbTasksTrigger m)
newLedgerDbTasksTrigger replayed =
  fmap LedgerDbTasksTrigger . newTVarIO $
    LedgerDbTaskState
      { ldbtsImmTip = Origin
      , ldbtsPrevSnapshotTime = Nothing
      , ldbtsBlocksSinceLastSnapshot = replayed
      }
-- | Record, in one transaction, a new tip slot and a number of newly written
-- blocks in the trigger's state.
--
-- The tip slot is overwritten; the block count is added to the running
-- blocks-since-last-snapshot counter. The previous snapshot time is left
-- untouched.
triggerLedgerDbTasks ::
  forall m.
  IOLike m =>
  LedgerDbTasksTrigger m ->
  -- | New value for 'ldbtsImmTip'
  WithOrigin SlotNo ->
  -- | Amount by which 'ldbtsBlocksSinceLastSnapshot' is increased
  Word64 ->
  m ()
triggerLedgerDbTasks (LedgerDbTasksTrigger varSt) immTip numWritten =
  atomically (modifyTVar varSt recordCopy)
 where
  recordCopy :: LedgerDbTaskState -> LedgerDbTaskState
  recordCopy st =
    st
      { ldbtsImmTip = immTip
      , ldbtsBlocksSinceLastSnapshot = ldbtsBlocksSinceLastSnapshot st + numWritten
      }
-- | 'Watcher' that runs the ledger DB maintenance tasks.
--
-- The watcher reads the trigger's state and uses 'ldbtsImmTip' as its
-- fingerprint, with no initial fingerprint. Its notification action does
-- nothing while the tip is 'Origin'; otherwise it flushes the ledger DB,
-- attempts a snapshot, writes the resulting counters back into the trigger's
-- state and garbage collects the ledger DB for the tip slot.
--
-- NOTE(review): presumably the notification fires whenever the fingerprint
-- changes — confirm against the 'Watcher' documentation.
ledgerDbTaskWatcher ::
  forall m blk.
  IOLike m =>
  ChainDbEnv m blk ->
  LedgerDbTasksTrigger m ->
  Watcher m LedgerDbTaskState (WithOrigin SlotNo)
ledgerDbTaskWatcher CDB{cdbLedgerDB} (LedgerDbTasksTrigger varSt) =
  Watcher
    { wFingerprint = ldbtsImmTip
    , wInitial = Nothing
    , wReader = readTVar varSt
    , wNotify = runLedgerDbTasks
    }
 where
  runLedgerDbTasks :: LedgerDbTaskState -> m ()
  runLedgerDbTasks observed =
    case ldbtsImmTip observed of
      Origin -> pure ()
      NotOrigin slotNo -> do
        let blocksSinceLast = ldbtsBlocksSinceLastSnapshot observed
            prevSnapTime = ldbtsPrevSnapshotTime observed
        LedgerDB.tryFlush cdbLedgerDB

        now <- getMonotonicTime
        LedgerDB.SnapCounters
          { prevSnapshotTime
          , ntBlocksSinceLastSnap
          } <-
          LedgerDB.tryTakeSnapshot
            cdbLedgerDB
            (fmap (\prev -> (prev, now)) prevSnapTime)
            blocksSinceLast

        -- The state may have been bumped by 'triggerLedgerDbTasks' since it
        -- was observed, so only the observed share of the counter is
        -- replaced by what the snapshot attempt reported.
        atomically . modifyTVar varSt $ \st ->
          st
            { ldbtsBlocksSinceLastSnapshot =
                ldbtsBlocksSinceLastSnapshot st - blocksSinceLast + ntBlocksSinceLastSnap
            , ldbtsPrevSnapshotTime = prevSnapshotTime
            }

        LedgerDB.garbageCollect cdbLedgerDB slotNo
-- | Run a garbage collection pass for the given slot.
--
-- Three steps are performed, in this order:
--
-- 1. 'VolatileDB.garbageCollect' is invoked on 'cdbVolatileDB' with @slotNo@.
-- 2. In a single STM transaction, the map stored in 'cdbInvalid' is filtered
--    so that only entries whose 'invalidBlockSlotNo' is @>= slotNo@ remain,
--    i.e. entries for strictly older slots are dropped.
-- 3. A 'PerformedGC' event is traced through 'cdbTracer'.
garbageCollectBlocks :: forall m blk. IOLike m => ChainDbEnv m blk -> SlotNo -> m ()
garbageCollectBlocks CDB{..} slotNo = do
  VolatileDB.garbageCollect cdbVolatileDB slotNo
  atomically $ do
    -- 'fmap' maps over the 'WithFingerprint' wrapper to reach the map inside.
    modifyTVar cdbInvalid $ fmap $ Map.filter ((>= slotNo) . invalidBlockSlotNo)
  traceWith cdbTracer $ TraceGCEvent $ PerformedGC slotNo
-- | A queue of scheduled garbage collections, stored in a 'StrictTVar'.
--
-- 'scheduleGC' adds entries at the back of the sequence and
-- 'gcScheduleRunner' removes them from the front.
newtype GcSchedule m = GcSchedule (StrictTVar m (StrictSeq ScheduledGc))
-- | A single entry in the 'GcSchedule': when to garbage collect, and for
-- which slot.
data ScheduledGc = ScheduledGc
  { scheduledGcTime :: !Time
  -- ^ Time at which the scheduled garbage collection should run
  , scheduledGcSlot :: !SlotNo
  -- ^ Slot that is passed to the garbage collection action
  }
  deriving stock (Eq, Show, Generic)
  deriving anyclass NoThunks
-- | Condensed as the pair @(time, slot)@.
instance Condense ScheduledGc where
  condense (ScheduledGc time slot) = condense (time, slot)
-- | Parameters used by 'computeTimeForGC' to decide when a garbage
-- collection should run.
data GcParams = GcParams
  { gcDelay :: !DiffTime
  -- ^ Added to the current time before rounding; see 'computeTimeForGC'.
  , gcInterval :: !DiffTime
  -- ^ The scheduled time is rounded up to a multiple of this interval; see
  -- 'computeTimeForGC'.
  }
  deriving Show
-- | Create a new 'GcSchedule' whose queue is initially empty.
newGcSchedule :: IOLike m => m (GcSchedule m)
newGcSchedule = GcSchedule <$> newTVarIO Seq.empty
-- | Schedule a garbage collection for the given slot.
--
-- The time of the garbage collection is obtained by applying
-- 'computeTimeForGC' to the current monotonic time. The new entry is then
-- added to the back of the queue:
--
-- * If the last queued entry has exactly the same scheduled time, that entry
--   is replaced, so only the most recently scheduled slot is kept for that
--   time.
-- * Otherwise (different time, or empty queue) the entry is appended.
--
-- Finally a 'ScheduledGC' event is traced.
scheduleGC ::
  forall m blk.
  IOLike m =>
  Tracer m (TraceGCEvent blk) ->
  -- | The slot to use for garbage collection
  SlotNo ->
  GcParams ->
  GcSchedule m ->
  m ()
scheduleGC tracer slotNo gcParams (GcSchedule varQueue) = do
  timeScheduledForGC <- computeTimeForGC gcParams <$> getMonotonicTime
  atomically $ modifyTVar varQueue $ \case
    queue' :|> ScheduledGc{scheduledGcTime = lastTimeScheduledForGC}
      | timeScheduledForGC == lastTimeScheduledForGC ->
          -- Same scheduled time: overwrite the last entry with the new slot
          queue' :|> ScheduledGc timeScheduledForGC slotNo
    queue ->
      -- Different scheduled time, or empty queue: append
      queue :|> ScheduledGc timeScheduledForGC slotNo
  traceWith tracer $ ScheduledGC slotNo timeScheduledForGC
-- | Compute the time at which a garbage collection should be performed.
--
-- The result is @now + gcDelay@, rounded up to the next multiple of
-- @gcInterval@ (a value that is already a multiple is left unchanged). The
-- arithmetic is carried out on picoseconds.
--
-- For example, with a @gcDelay@ of 1s and a @gcInterval@ of 10s, a @now@ of
-- 12s yields 20s, and a @now@ of 19s also yields 20s.
computeTimeForGC ::
  GcParams ->
  -- | Now
  Time ->
  -- | The time at which to perform the GC
  Time
computeTimeForGC GcParams{gcDelay, gcInterval} (Time now) =
  Time $
    picosecondsToDiffTime $
      roundUpToInterval
        (diffTimeToPicoseconds gcInterval)
        (diffTimeToPicoseconds (now + gcDelay))
-- | Round @x@ up to the nearest multiple of @interval@.
--
-- If @x@ is already a multiple of @interval@ it is returned unchanged.
-- Because 'divMod' rounds towards negative infinity, negative values are
-- also rounded /up/ (e.g. @-3@ with interval @10@ gives @0@).
--
-- NOTE: an @interval@ of @0@ makes 'divMod' divide by zero.
roundUpToInterval :: (Integral a, Integral b) => b -> a -> a
roundUpToInterval interval x
  | m == 0 =
      d * fromIntegral interval
  | otherwise =
      (d + 1) * fromIntegral interval
  where
    (d, m) = x `divMod` fromIntegral interval
-- | Run forever, executing the scheduled garbage collections one by one.
--
-- Each iteration:
--
-- 1. blocks (via 'retry') until the queue is non-empty and peeks at the
--    scheduled time of the first entry, without removing it;
-- 2. sleeps until that time has been reached (no sleep if it already has);
-- 3. removes the first entry from the queue and calls the given action with
--    its slot.
--
-- The garbage collection action is called synchronously, so the next entry
-- is only looked at after the action returns.
gcScheduleRunner ::
  forall m.
  IOLike m =>
  GcSchedule m ->
  -- | GC function
  (SlotNo -> m ()) ->
  m Void
gcScheduleRunner (GcSchedule varQueue) runGc = forever $ do
  -- Peek to know how long to wait
  timeScheduledForGC <-
    atomically $
      readTVar varQueue >>= \case
        Seq.Empty -> retry
        ScheduledGc{scheduledGcTime} :<| _ -> return scheduledGcTime

  currentTime <- getMonotonicTime
  -- Clamp at 0 so that an already-passed scheduled time means "don't wait"
  let toWait = max 0 (timeScheduledForGC `diffTime` currentTime)
  threadDelay toWait

  -- After waiting, find the slot for which to GC and remove the entry from
  -- the queue.
  slotNo <-
    atomically $
      readTVar varQueue >>= \case
        ScheduledGc{scheduledGcSlot} :<| queue' -> do
          writeTVar varQueue queue'
          return scheduledGcSlot
        -- Treated as impossible: the peek above saw an entry.
        -- NOTE(review): this presumes no other thread removes entries from
        -- the queue -- confirm against the other users of 'GcSchedule'.
        Seq.Empty -> error "queue empty after waiting"

  -- Garbage collection is called synchronously
  runGc slotNo
-- | Return the current contents of the 'GcSchedule' queue as a list, front
-- of the queue first. The queue itself is not modified.
dumpGcSchedule :: IOLike m => GcSchedule m -> STM m [ScheduledGc]
dumpGcSchedule (GcSchedule varQueue) = toList <$> readTVar varQueue
-- | Run forever, processing the messages in the chain selection queue one at
-- a time.
--
-- Each iteration traces 'PoppingFromQueue' and then, inside 'withFuse' on the
-- given fuse:
--
-- 1. obtains the next message with 'getChainSelMessage';
-- 2. traces which kind of message was popped, runs 'chainSelSync' on it and
--    marks it as processed with 'processedChainSelMessage'.
--
-- If step 2 is interrupted by an exception, the 'bracketOnError' handler
-- fills the message's result variables (with 'tryPutTMVar', so a value that
-- is already present is left alone) and closes the queue with
-- 'closeChainSelQueue'. This way whoever is waiting on those variables gets
-- an answer. The exception itself is still propagated.
addBlockRunner ::
  ( IOLike m
  , LedgerSupportsProtocol blk
  , BlockSupportsDiffusionPipelining blk
  , InspectLedger blk
  , HasHardForkHistory blk
  , HasCallStack
  ) =>
  Fuse m ->
  ChainDbEnv m blk ->
  m Void
addBlockRunner fuse cdb@CDB{..} = forever $ do
  let trace = traceWith cdbTracer . TraceAddBlockEvent
  trace PoppingFromQueue
  withFuse fuse $
    bracketOnError
      (lift $ getChainSelMessage starvationTracer cdbChainSelStarvation cdbChainSelQueue)
      -- Only runs when the body below fails: report failure for the message
      -- that was being processed and close the queue.
      ( \message -> lift $ atomically $ do
          case message of
            ChainSelReprocessLoEBlocks varProcessed ->
              void $ tryPutTMVar varProcessed ()
            ChainSelAddBlock BlockToAdd{varBlockWrittenToDisk, varBlockProcessed} -> do
              _ <-
                tryPutTMVar
                  varBlockWrittenToDisk
                  False
              _ <-
                tryPutTMVar
                  varBlockProcessed
                  (FailedToAddBlock "Failed to add block synchronously")
              pure ()
          closeChainSelQueue cdbChainSelQueue
      )
      ( \message -> do
          lift $ case message of
            ChainSelReprocessLoEBlocks _ ->
              trace PoppedReprocessLoEBlocksFromQueue
            ChainSelAddBlock BlockToAdd{blockToAdd} ->
              trace $ PoppedBlockFromQueue $ blockRealPoint blockToAdd
          chainSelSync cdb message
          lift $ atomically $ processedChainSelMessage cdbChainSelQueue message
      )
  where
    -- Forwards starvation events to 'cdbTracer', wrapped in
    -- 'TraceChainSelStarvationEvent'.
    starvationTracer = Tracer $ traceWith cdbTracer . TraceChainSelStarvationEvent