{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Ouroboros.Consensus.Storage.ChainDB.Impl.Background (
launchBgTasks
, copyAndSnapshotRunner
, copyToImmutableDB
, updateLedgerSnapshots
, garbageCollect
, GcParams (..)
, GcSchedule
, computeTimeForGC
, gcScheduleRunner
, newGcSchedule
, scheduleGC
, ScheduledGc (..)
, dumpGcSchedule
, addBlockRunner
) where
import Control.Exception (assert)
import Control.Monad (forM_, forever, void)
import Control.Monad.Trans.Class (lift)
import Control.ResourceRegistry
import Control.Tracer
import Data.Foldable (toList)
import qualified Data.Map.Strict as Map
import Data.Sequence.Strict (StrictSeq (..))
import qualified Data.Sequence.Strict as Seq
import Data.Time.Clock
import Data.Void (Void)
import Data.Word
import GHC.Generics (Generic)
import GHC.Stack (HasCallStack)
import Ouroboros.Consensus.Block
import Ouroboros.Consensus.Config
import Ouroboros.Consensus.HardFork.Abstract
import Ouroboros.Consensus.Ledger.Abstract
import Ouroboros.Consensus.Ledger.Inspect
import Ouroboros.Consensus.Ledger.SupportsProtocol
import Ouroboros.Consensus.Protocol.Abstract
import Ouroboros.Consensus.Storage.ChainDB.API (AddBlockResult (..),
BlockComponent (..))
import Ouroboros.Consensus.Storage.ChainDB.Impl.ChainSel
(chainSelSync)
import Ouroboros.Consensus.Storage.ChainDB.Impl.LgrDB
(LgrDbSerialiseConstraints)
import qualified Ouroboros.Consensus.Storage.ChainDB.Impl.LgrDB as LgrDB
import Ouroboros.Consensus.Storage.ChainDB.Impl.Types
import qualified Ouroboros.Consensus.Storage.ImmutableDB as ImmutableDB
import Ouroboros.Consensus.Storage.LedgerDB (TimeSinceLast (..))
import qualified Ouroboros.Consensus.Storage.VolatileDB as VolatileDB
import Ouroboros.Consensus.Util
import Ouroboros.Consensus.Util.Condense
import Ouroboros.Consensus.Util.Enclose (Enclosing' (..))
import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Network.AnchoredFragment (AnchoredSeq (..))
import qualified Ouroboros.Network.AnchoredFragment as AF
-- | Fork the ChainDB background threads and record how to stop them.
--
-- Three threads are forked with 'forkLinkedThread' in 'cdbRegistry':
--
-- * @ChainDB.addBlockRunner@, running 'addBlockRunner' with
--   'cdbChainSelFuse';
-- * @ChainDB.gcScheduleRunner@, running 'gcScheduleRunner' on a fresh
--   'GcSchedule' with 'garbageCollect' as the collection action;
-- * @ChainDB.copyAndSnapshotRunner@, running 'copyAndSnapshotRunner' with
--   the same 'GcSchedule' and 'cdbCopyFuse'.
--
-- An action that cancels all three threads (in the order listed above) is
-- then written to 'cdbKillBgThreads'.
--
-- The 'Word64' argument is handed to 'copyAndSnapshotRunner' unchanged.
-- NOTE(review): presumably the number of blocks replayed when the ledger DB
-- was opened -- confirm against the caller.
launchBgTasks ::
     forall m blk.
     ( IOLike m
     , LedgerSupportsProtocol blk
     , BlockSupportsDiffusionPipelining blk
     , InspectLedger blk
     , HasHardForkHistory blk
     , LgrDbSerialiseConstraints blk
     )
  => ChainDbEnv m blk
  -> Word64
  -> m ()
launchBgTasks cdb@CDB{..} replayed = do
    -- The bang patterns force each cancel action to WHNF as soon as the
    -- corresponding thread has been forked.
    !addBlockThread <- launch "ChainDB.addBlockRunner" $
      addBlockRunner cdbChainSelFuse cdb

    gcSchedule <- newGcSchedule
    !gcThread <- launch "ChainDB.gcScheduleRunner" $
      gcScheduleRunner gcSchedule $ garbageCollect cdb

    !copyAndSnapshotThread <- launch "ChainDB.copyAndSnapshotRunner" $
      copyAndSnapshotRunner cdb gcSchedule replayed cdbCopyFuse

    atomically $ writeTVar cdbKillBgThreads $
      sequence_ [addBlockThread, gcThread, copyAndSnapshotThread]
  where
    -- | Fork a named, linked thread in 'cdbRegistry' and return the action
    -- that cancels it.
    launch :: String -> m Void -> m (m ())
    launch = fmap cancelThread .: forkLinkedThread cdbRegistry
-- | Copy the blocks of the current chain that are more than @k@ headers away
-- from its tip from the VolatileDB to the ImmutableDB.
--
-- The headers to copy are chosen in a single STM transaction: the
-- @max 0 (length - k)@ oldest headers of the fragment in 'cdbChain', where
-- @k@ comes from 'configSecurityParam'. When there are none,
-- 'NoBlocksToCopyToImmutableDB' is traced. Otherwise, for each selected
-- point, oldest first:
--
-- 1. the block is read from the VolatileDB with 'GetVerifiedBlock';
-- 2. it is appended to the ImmutableDB;
-- 3. its header is dropped from the front of 'cdbChain';
-- 4. 'CopiedBlockToImmutableDB' is traced.
--
-- The result is the ImmutableDB tip slot read after all copying is done.
--
-- Calls 'error' when a selected point carries 'GenesisHash', or when the
-- header to drop is not the first header of 'cdbChain' at that moment.
--
-- The action is wrapped in 'electric'; within this module it is only run
-- through 'withFuse'.
copyToImmutableDB ::
     forall m blk.
     ( IOLike m
     , ConsensusProtocol (BlockProtocol blk)
     , HasHeader blk
     , GetHeader blk
     , HasCallStack
     )
  => ChainDbEnv m blk
  -> Electric m (WithOrigin SlotNo)
copyToImmutableDB CDB{..} = electric $ do
    toCopy <- atomically $ do
      curChain <- readTVar cdbChain
      -- 'max 0' guards against a chain shorter than @k@.
      let nbToCopy :: Int
          nbToCopy = max 0 (AF.length curChain - fromIntegral k)

          toCopy :: [Point blk]
          toCopy = map headerPoint
                 $ AF.toOldestFirst
                 $ AF.takeOldest nbToCopy curChain
      return toCopy

    if null toCopy
      then trace NoBlocksToCopyToImmutableDB
      else forM_ toCopy $ \pt -> do
        let hash = case pointHash pt of
              BlockHash h -> h
              GenesisHash -> error "genesis block on current chain"
        slotNoAtImmutableDBTip <- atomically $ ImmutableDB.getTipSlot cdbImmutableDB
        -- Sanity check: the block to append must not be in a slot before
        -- the current ImmutableDB tip.
        assert (pointSlot pt >= slotNoAtImmutableDBTip) $ return ()
        blk <- VolatileDB.getKnownBlockComponent cdbVolatileDB GetVerifiedBlock hash
        -- Append first and only then drop the header from the in-memory
        -- chain fragment.
        ImmutableDB.appendBlock cdbImmutableDB blk
        atomically $ removeFromChain pt
        trace $ CopiedBlockToImmutableDB pt

    -- Report the ImmutableDB tip slot as it is after copying.
    atomically $ ImmutableDB.getTipSlot cdbImmutableDB
  where
    SecurityParam k = configSecurityParam cdbTopLevelConfig

    trace :: TraceCopyToImmutableDBEvent blk -> m ()
    trace = traceWith (contramap TraceCopyToImmutableDBEvent cdbTracer)

    -- | Drop the first header of 'cdbChain', which must be the one at the
    -- given point; any other shape of the fragment is treated as a bug.
    removeFromChain :: Point blk -> STM m ()
    removeFromChain pt = do
      curChain <- readTVar cdbChain
      case curChain of
        hdr :< curChain'
          | headerPoint hdr == pt
          -> writeTVar cdbChain curChain'
        _ -> error "header to remove not on the current chain"
-- | Background loop that copies blocks to the ImmutableDB, schedules garbage
-- collection and takes ledger snapshots. It never returns.
--
-- On start-up the disk policy ('onDiskShouldTakeSnapshot') is asked whether
-- a snapshot is due, given 'NoSnapshotTakenYet' and the 'Word64' argument.
-- If so, 'updateLedgerSnapshots' runs once and the loop starts with the
-- current time and a distance of @0@; otherwise the loop starts with
-- 'NoSnapshotTakenYet' and the given 'Word64' as the initial distance.
--
-- Each iteration of the loop:
--
-- 1. blocks in STM until the fragment in 'cdbChain' is longer than @k@
--    (from 'configSecurityParam') and records the excess @length - k@;
-- 2. runs 'copyToImmutableDB' under the given 'Fuse';
-- 3. schedules a GC for the slot it returned (nothing is scheduled for
--    'Origin'), using 'cdbGcDelay' and 'cdbGcInterval';
-- 4. adds the excess to the running distance and asks the disk policy
--    whether to snapshot, given the time elapsed since the previous
--    snapshot; on a snapshot the distance is reset to @0@.
copyAndSnapshotRunner ::
     forall m blk.
     ( IOLike m
     , ConsensusProtocol (BlockProtocol blk)
     , HasHeader blk
     , GetHeader blk
     , IsLedger (LedgerState blk)
     , LgrDbSerialiseConstraints blk
     )
  => ChainDbEnv m blk
  -> GcSchedule m
  -> Word64
  -> Fuse m
  -> m Void
copyAndSnapshotRunner cdb@CDB{..} gcSchedule replayed fuse =
    if onDiskShouldTakeSnapshot NoSnapshotTakenYet replayed then do
      updateLedgerSnapshots cdb
      now <- getMonotonicTime
      loop (TimeSinceLast now) 0
    else
      loop NoSnapshotTakenYet replayed
  where
    SecurityParam k      = configSecurityParam cdbTopLevelConfig
    LgrDB.DiskPolicy{..} = LgrDB.getDiskPolicy cdbLgrDB

    -- | One iteration per batch of copied blocks. The first argument is the
    -- time of the previous snapshot (if any); the second is the distance
    -- accumulated since then.
    loop :: TimeSinceLast Time -> Word64 -> m Void
    loop mPrevSnapshot distance = do
      -- Wait for the chain to grow beyond @k@ headers; 'check' retries the
      -- transaction until it does.
      numToWrite <- atomically $ do
        curChain <- readTVar cdbChain
        let chainLen :: Word64
            chainLen = fromIntegral (AF.length curChain)
        check $ chainLen > k
        return $ chainLen - k

      -- Copy blocks to the ImmutableDB and schedule a GC for the resulting
      -- ImmutableDB tip slot.
      withFuse fuse (copyToImmutableDB cdb) >>= scheduleGC'

      now <- getMonotonicTime
      let distance' = distance + numToWrite
          elapsed   = (\prev -> now `diffTime` prev) <$> mPrevSnapshot

      if onDiskShouldTakeSnapshot elapsed distance' then do
        updateLedgerSnapshots cdb
        loop (TimeSinceLast now) 0
      else
        loop mPrevSnapshot distance'

    -- | Schedule a GC for the given slot; a no-op for 'Origin'.
    scheduleGC' :: WithOrigin SlotNo -> m ()
    scheduleGC' Origin             = return ()
    scheduleGC' (NotOrigin slotNo) =
        scheduleGC
          (contramap TraceGCEvent cdbTracer)
          slotNo
          GcParams {
              gcDelay    = cdbGcDelay
            , gcInterval = cdbGcInterval
            }
          gcSchedule
-- | Take a ledger snapshot and then trim the stored snapshots.
--
-- Runs 'LgrDB.takeSnapshot' followed by 'LgrDB.trimSnapshots' on 'cdbLgrDB'.
-- The results of both calls (the snapshot written, if any, and the list
-- returned by trimming) are discarded.
updateLedgerSnapshots ::
     ( IOLike m
     , LgrDbSerialiseConstraints blk
     , HasHeader blk
     , IsLedger (LedgerState blk)
     )
  => ChainDbEnv m blk -> m ()
updateLedgerSnapshots CDB{..} = do
    void $ LgrDB.takeSnapshot  cdbLgrDB
    void $ LgrDB.trimSnapshots cdbLgrDB
-- | Run garbage collection for the given slot on the parts of the ChainDB
-- environment handled here.
--
-- In order:
--
-- 1. Call 'VolatileDB.garbageCollect' on the VolatileDB with @slotNo@.
--
-- 2. In a single STM transaction, call 'LgrDB.garbageCollectPrevApplied'
--    with @slotNo@ and prune 'cdbInvalid': only invalid blocks whose
--    'invalidBlockSlotNo' is @>= slotNo@ are kept.
--
-- 3. Trace a 'PerformedGC' event for @slotNo@.
garbageCollect :: forall m blk. IOLike m => ChainDbEnv m blk -> SlotNo -> m ()
garbageCollect CDB{..} slotNo = do
    VolatileDB.garbageCollect cdbVolatileDB slotNo
    atomically $ do
      LgrDB.garbageCollectPrevApplied cdbLgrDB slotNo
      -- Keep only the invalid blocks at or after the GC slot.
      modifyTVar cdbInvalid $ fmap $ Map.filter ((>= slotNo) . invalidBlockSlotNo)
    traceWith cdbTracer $ TraceGCEvent $ PerformedGC slotNo
-- | Queue of scheduled garbage collections, held in a 'StrictTVar'.
--
-- 'scheduleGC' adds entries at the right end of the sequence and
-- 'gcScheduleRunner' removes them from the left end.
newtype GcSchedule m = GcSchedule (StrictTVar m (StrictSeq ScheduledGc))
-- | One entry in a 'GcSchedule': a garbage collection for a slot, to be run
-- at a given time.
data ScheduledGc = ScheduledGc {
      scheduledGcTime :: !Time
      -- ^ Time at which the garbage collection should run
    , scheduledGcSlot :: !SlotNo
      -- ^ Slot passed to the garbage collection action
    }
  deriving (Eq, Show, Generic, NoThunks)
-- | Condensed as the pair of the scheduled time and the slot.
instance Condense ScheduledGc where
  condense (ScheduledGc time slot) = condense (time, slot)
-- | Parameters used by 'computeTimeForGC' to decide when a garbage collection
-- should run.
data GcParams = GcParams {
      gcDelay    :: !DiffTime
      -- ^ Added to the current time before rounding
    , gcInterval :: !DiffTime
      -- ^ The scheduled time is rounded up to a multiple of this interval
    }
  deriving (Show)
-- | Create a new, empty 'GcSchedule'.
newGcSchedule :: IOLike m => m (GcSchedule m)
newGcSchedule = GcSchedule <$> newTVarIO Seq.empty
-- | Schedule a garbage collection for the given slot.
--
-- The time at which to run it is computed with 'computeTimeForGC' from the
-- current monotonic time. If the last entry in the queue is scheduled for
-- exactly that same time, it is replaced by the new entry (so a single GC
-- runs for that time, with the newly given slot); otherwise the new entry is
-- appended. Finally a 'ScheduledGC' event is traced.
scheduleGC ::
     forall m blk. IOLike m
  => Tracer m (TraceGCEvent blk)
  -> SlotNo
     -- ^ The slot to use for garbage collection
  -> GcParams
  -> GcSchedule m
  -> m ()
scheduleGC tracer slotNo gcParams (GcSchedule varQueue) = do
    timeScheduledForGC <- computeTimeForGC gcParams <$> getMonotonicTime
    atomically $ modifyTVar varQueue $ \case
      queue' :|> ScheduledGc { scheduledGcTime = lastTimeScheduledForGC }
        | timeScheduledForGC == lastTimeScheduledForGC
        -- Same scheduled time as the last entry: replace that entry
        -> queue' :|> ScheduledGc timeScheduledForGC slotNo
      queue
        -- Different scheduled time, or empty queue: append
        -> queue :|> ScheduledGc timeScheduledForGC slotNo
    traceWith tracer $ ScheduledGC slotNo timeScheduledForGC
-- | Compute the time at which a garbage collection should be performed.
--
-- Adds 'gcDelay' to the given time and rounds the result up (at picosecond
-- resolution) to a multiple of 'gcInterval' using 'roundUpToInterval'.
computeTimeForGC ::
     GcParams
  -> Time  -- ^ Current time
  -> Time  -- ^ Time at which to perform the GC
computeTimeForGC GcParams { gcDelay, gcInterval } (Time now) =
    Time $ picosecondsToDiffTime $
      roundUpToInterval
        (diffTimeToPicoseconds gcInterval)
        (diffTimeToPicoseconds (now + gcDelay))
-- | Round @x@ up to the nearest multiple of @interval@.
--
-- If @x@ is already a multiple of @interval@, it is returned unchanged.
--
-- A zero interval (after conversion to the type of @x@) defines no grid to
-- round to; in that case @x@ is returned as is instead of failing with a
-- divide-by-zero exception.
--
-- > roundUpToInterval 5 12 == 15
-- > roundUpToInterval 5 10 == 10
-- > roundUpToInterval 0 7  == 7
roundUpToInterval :: (Integral a, Integral b) => b -> a -> a
roundUpToInterval interval x
    | step == 0
    = x
    | m == 0
    = d * step
    | otherwise
    = (d + 1) * step
  where
    step   = fromIntegral interval
    -- Only forced when @step@ is non-zero, so the division is safe.
    (d, m) = x `divMod` step
-- | Run forever, executing the garbage collections in the 'GcSchedule'.
--
-- Each iteration:
--
-- 1. Blocks (via STM 'retry') until the queue is non-empty and reads the
--    scheduled time of the first entry without removing it.
--
-- 2. Sleeps until that time (not at all if it is already in the past).
--
-- 3. Removes the first entry from the queue and passes its slot to the
--    given garbage collection action.
gcScheduleRunner ::
     forall m. IOLike m
  => GcSchedule m
  -> (SlotNo -> m ())  -- ^ GC function
  -> m Void
gcScheduleRunner (GcSchedule varQueue) runGc = forever $ do
    -- Peek to know how long to wait
    timeScheduledForGC <- atomically $
      readTVar varQueue >>= \case
        Seq.Empty                             -> retry
        ScheduledGc { scheduledGcTime } :<| _ -> return scheduledGcTime

    currentTime <- getMonotonicTime
    -- Clamp at 0 so an entry that is already due does not cause a negative
    -- delay.
    let toWait = max 0 (timeScheduledForGC `diffTime` currentTime)
    threadDelay toWait

    -- After waiting, pop the first entry and use its slot for the GC.
    slotNo <- atomically $
      readTVar varQueue >>= \case
        ScheduledGc { scheduledGcSlot } :<| queue' -> do
          writeTVar varQueue queue'
          return scheduledGcSlot

        -- Treated as impossible: the queue was non-empty when we peeked.
        -- This assumes this runner is the only one removing entries.
        Seq.Empty -> error "queue empty after waiting"

    runGc slotNo
-- | Return the entries currently in the 'GcSchedule' as a list, in queue
-- order (the entry that will be run first comes first).
dumpGcSchedule :: IOLike m => GcSchedule m -> STM m [ScheduledGc]
dumpGcSchedule (GcSchedule varQueue) = toList <$> readTVar varQueue
-- | Run forever, taking messages from the 'cdbChainSelQueue' and processing
-- each of them with 'chainSelSync'.
--
-- Every iteration runs under the given 'Fuse' (via 'withFuse') and:
--
-- 1. Traces 'PoppedBlockFromQueue' with 'RisingEdge' before waiting for a
--    message.
--
-- 2. Obtains the next message with 'getChainSelMessage'.
--
-- 3. Traces which message was popped and hands it to 'chainSelSync'.
--
-- The message is acquired with 'bracketOnError': if processing does not
-- complete normally, the handler fills the message's result variables (so
-- that threads waiting on them are not left blocked) and closes the queue
-- with 'closeChainSelQueue', all in one STM transaction.
addBlockRunner ::
     ( IOLike m
     , LedgerSupportsProtocol blk
     , BlockSupportsDiffusionPipelining blk
     , InspectLedger blk
     , HasHardForkHistory blk
     , HasCallStack
     )
  => Fuse m
  -> ChainDbEnv m blk
  -> m Void
addBlockRunner fuse cdb@CDB{..} = forever $ do
    let trace = traceWith cdbTracer . TraceAddBlockEvent
    trace $ PoppedBlockFromQueue RisingEdge
    withFuse fuse $
      bracketOnError
        (lift $ getChainSelMessage cdbChainSelQueue)
        -- Failure path: notify whoever is waiting on this message, then
        -- close the queue. 'tryPutTMVar' is used so that an already filled
        -- variable is left untouched.
        (\message -> lift $ atomically $ do
            case message of
              ChainSelReprocessLoEBlocks varProcessed ->
                void $ tryPutTMVar varProcessed ()
              ChainSelAddBlock BlockToAdd{varBlockWrittenToDisk, varBlockProcessed} -> do
                _ <- tryPutTMVar varBlockWrittenToDisk
                       False
                _ <- tryPutTMVar varBlockProcessed
                       (FailedToAddBlock "Failed to add block synchronously")
                pure ()
            closeChainSelQueue cdbChainSelQueue)
        -- Normal path: trace the popped message and process it.
        (\message -> do
            lift $ case message of
              ChainSelReprocessLoEBlocks _ ->
                trace PoppedReprocessLoEBlocksFromQueue
              ChainSelAddBlock BlockToAdd{blockToAdd} ->
                trace $ PoppedBlockFromQueue $ FallingEdgeWith $
                          blockRealPoint blockToAdd
            chainSelSync cdb message)