{-# LANGUAGE DisambiguateRecordFields #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Test.Consensus.PeerSimulator.NodeLifecycle
( LiveInterval (..)
, LiveIntervalResult (..)
, LiveNode (..)
, LiveResources (..)
, NodeLifecycle (..)
, lifecycleStart
, lifecycleStop
, restoreNode
) where
import Control.ResourceRegistry
import Control.Tracer (Tracer (..), traceWith)
import Data.Functor (void)
import Data.Set (Set)
import qualified Data.Set as Set
import Data.Typeable (Typeable)
import Ouroboros.Consensus.Block
import Ouroboros.Consensus.Config (TopLevelConfig (..))
import Ouroboros.Consensus.HardFork.Abstract (HasHardForkHistory)
import Ouroboros.Consensus.HeaderValidation (HeaderWithTime (..))
import Ouroboros.Consensus.Ledger.Basics (LedgerState)
import Ouroboros.Consensus.Ledger.Extended (ExtLedgerState)
import Ouroboros.Consensus.Ledger.Inspect (InspectLedger)
import Ouroboros.Consensus.Ledger.SupportsProtocol
( LedgerSupportsProtocol
)
import Ouroboros.Consensus.Ledger.Tables.MapKind (ValuesMK)
import Ouroboros.Consensus.MiniProtocol.ChainSync.Client
( ChainSyncClientHandleCollection (..)
)
import Ouroboros.Consensus.Storage.ChainDB.API
import qualified Ouroboros.Consensus.Storage.ChainDB.API as ChainDB
import qualified Ouroboros.Consensus.Storage.ChainDB.Impl as ChainDB
import Ouroboros.Consensus.Storage.ChainDB.Impl.Args
( cdbsLoE
, updateTracer
)
import Ouroboros.Consensus.Storage.ImmutableDB.Chunks.Internal
( ChunkInfo
)
import Ouroboros.Consensus.Storage.LedgerDB.API
( CanUpgradeLedgerTables
)
import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Network.AnchoredFragment (AnchoredFragment)
import qualified Ouroboros.Network.AnchoredFragment as AF
import System.FS.Sim.MockFS (MockFS)
import qualified System.FS.Sim.MockFS as MockFS
import Test.Consensus.PeerSimulator.Resources
import Test.Consensus.PeerSimulator.StateView
import Test.Consensus.PeerSimulator.Trace
import Test.Consensus.PointSchedule.Peers (PeerId)
import Test.Util.ChainDB
import Test.Util.Orphans.IOLike ()
-- | Handles belonging to one running incarnation of the simulated node.
--
-- A value of this type is built by 'restoreNode' when the node is started and
-- is consumed by 'lifecycleStop' when the node is shut down.
data LiveNode blk m = LiveNode
  { lnChainDb :: ChainDB m blk
  -- ^ The ChainDB opened for this incarnation.
  , lnStateViewTracers :: StateViewTracers blk m
  -- ^ State view tracers, created by 'restoreNode' from the peer results of
  -- the previous interval and queried for the accumulated results at
  -- shutdown.
  , lnStateTracer :: Tracer m ()
  -- ^ The tracer obtained by applying 'lrSTracer' to 'lnChainDb'.
  , lnCopyToImmDb :: m (WithOrigin SlotNo)
  -- ^ The ChainDB's internal copy-to-ImmutableDB action.
  -- 'lifecycleStop' runs it before anything else and reports the returned
  -- slot in its shutdown trace event.
  , lnPeers :: Set PeerId
  -- ^ The peers regarded as active for this interval.
  -- At shutdown, every peer that has a simulator result is removed from this
  -- set to form the active set of the next interval.
  }
-- | What is carried over from one live interval of the node to the next.
--
-- Produced by 'lifecycleStop' and consumed by 'restoreNode'.
data LiveIntervalResult blk = LiveIntervalResult
  { lirPeerResults :: [PeerSimulatorResult blk]
  -- ^ The peer simulator results collected up to the end of the interval.
  -- They seed the state view tracers of the next interval.
  , lirActive :: Set PeerId
  -- ^ The peers of the finished interval that have no entry in
  -- 'lirPeerResults'; they become 'lnPeers' of the next interval.
  }
-- | Everything needed to start and stop the node that is not specific to a
-- single live interval.
data LiveResources blk m = LiveResources
  { lrRegistry :: ResourceRegistry m
  -- ^ Registry in which the ChainDB is allocated and its add-block runner
  -- thread is forked. 'lifecycleStop' releases everything in it.
  , lrPeerSim :: PeerSimulatorResources m blk
  -- ^ Peer simulator resources; 'lifecycleStop' clears the ChainSync client
  -- handles stored in its 'psrHandles'.
  , lrTracer :: Tracer m (TraceEvent blk)
  -- ^ Tracer for scheduler events and for ChainDB events (the latter wrapped
  -- in 'TraceChainDBEvent').
  , lrSTracer :: ChainDB m blk -> m (Tracer m ())
  -- ^ Builds the state tracer ('lnStateTracer') for a freshly opened ChainDB.
  , lrConfig :: TopLevelConfig blk
  -- ^ Passed to the ChainDB as 'mcdbTopLevelConfig'.
  , lrChunkInfo :: ChunkInfo
  -- ^ Passed to the ChainDB as 'mcdbChunkInfo'.
  , lrInitLedger :: ExtLedgerState blk ValuesMK
  -- ^ Passed to the ChainDB as 'mcdbInitLedger'.
  , lrCdb :: NodeDBs (StrictTMVar m MockFS)
  -- ^ The mock file systems backing the ChainDB. The 'nodeDBsGsm' and
  -- 'nodeDBsLgr' entries are reset to 'MockFS.empty' on every start; the
  -- remaining entries are reused as they are.
  , lrLoEVar :: LoE (StrictTVar m (AnchoredFragment (HeaderWithTime blk)))
  -- ^ When 'LoEEnabled', the variable from which the ChainDB reads its LoE
  -- fragment. 'lifecycleStop' resets it to the empty fragment anchored at
  -- genesis.
  }
-- | The complete context of one live interval, as handed to the start
-- callback of 'lifecycleStart'.
data LiveInterval blk m = LiveInterval
  { liResources :: LiveResources blk m
  -- ^ The interval-independent resources the node was started with.
  , liResult :: LiveIntervalResult blk
  -- ^ The result of the previous interval that the node was restored from.
  , liNode :: LiveNode blk m
  -- ^ The node that was just restored.
  }
-- | A start action and a shutdown action for the node, bundled together.
--
-- The field types line up with 'lifecycleStart' and 'lifecycleStop' once
-- those have been applied to their leading arguments.
data NodeLifecycle blk m = NodeLifecycle
  { nlMinDuration :: Maybe DiffTime
  -- ^ An optional duration.
  -- NOTE(review): presumably the minimum tick duration that triggers a node
  -- restart; the consumer is not in this module, so confirm there.
  , nlStart :: LiveIntervalResult blk -> m (LiveNode blk m)
  -- ^ Start the node from the result of the previous live interval.
  , nlShutdown :: LiveNode blk m -> m (LiveIntervalResult blk)
  -- ^ Shut the node down, yielding the result for the next live interval.
  }
-- | Open a ChainDB on top of the mock file systems in 'lrCdb'.
--
-- Before opening, the 'nodeDBsGsm' and 'nodeDBsLgr' file systems are replaced
-- by 'MockFS.empty'; the other file systems in 'lrCdb' are left as they are,
-- so whatever they contain is visible to the new ChainDB.
--
-- The ChainDB is allocated in 'lrRegistry' with 'ChainDB.closeDB' as its
-- releaser, and its add-block runner is forked as a linked thread in the same
-- registry.
--
-- Returns the ChainDB together with its internal copy-to-ImmutableDB action.
mkChainDb ::
  IOLike m =>
  ( LedgerSupportsProtocol blk
  , ChainDB.SerialiseDiskConstraints blk
  , BlockSupportsDiffusionPipelining blk
  , InspectLedger blk
  , HasHardForkHistory blk
  , ConvertRawHash blk
  , CanUpgradeLedgerTables (LedgerState blk)
  ) =>
  LiveResources blk m ->
  m (ChainDB m blk, m (WithOrigin SlotNo))
mkChainDb resources = do
  -- Reset the two file systems that must not survive a restart, in a single
  -- transaction.
  atomically $ do
    void $ swapTMVar (nodeDBsGsm lrCdb) MockFS.empty
    void $ swapTMVar (nodeDBsLgr lrCdb) MockFS.empty
  let
    -- Default arguments, with ChainDB events routed into the simulator's
    -- tracer as 'TraceChainDBEvent's.
    baseArgs =
      updateTracer
        (Tracer (traceWith lrTracer . TraceChainDBEvent))
        ( fromMinimalChainDbArgs
            MinimalChainDbArgs
              { mcdbTopLevelConfig = lrConfig
              , mcdbChunkInfo = lrChunkInfo
              , mcdbInitLedger = lrInitLedger
              , mcdbRegistry = lrRegistry
              , mcdbNodeDBs = lrCdb
              }
        )
    -- The LoE fragment is read from 'lrLoEVar' whenever this action is run.
    chainDbArgs =
      baseArgs
        { ChainDB.cdbsArgs =
            (ChainDB.cdbsArgs baseArgs)
              { cdbsLoE = traverse readTVarIO lrLoEVar
              }
        }
  -- NOTE(review): the 'False' flag presumably stops the ChainDB from
  -- launching its own background tasks, which would explain why the
  -- add-block runner is forked by hand below; confirm against
  -- 'ChainDB.openDBInternal'.
  (_, (chainDB, internal)) <-
    allocate
      lrRegistry
      (\_ -> ChainDB.openDBInternal chainDbArgs False)
      (ChainDB.closeDB . fst)
  let ChainDB.Internal{intCopyToImmutableDB, intAddBlockRunner} = internal
  void $ forkLinkedThread lrRegistry "AddBlockRunner" (void intAddBlockRunner)
  pure (chainDB, intCopyToImmutableDB)
 where
  LiveResources
    { lrRegistry
    , lrTracer
    , lrConfig
    , lrCdb
    , lrLoEVar
    , lrChunkInfo
    , lrInitLedger
    } = resources
-- | Build a 'LiveNode' from the result of the previous live interval.
--
-- The state view tracers are initialised with the previous interval's peer
-- results, a ChainDB is opened with 'mkChainDb', and the state tracer is
-- created from that ChainDB via 'lrSTracer'. The previous interval's active
-- peers become the new node's 'lnPeers'.
restoreNode ::
  ( IOLike m
  , LedgerSupportsProtocol blk
  , ChainDB.SerialiseDiskConstraints blk
  , BlockSupportsDiffusionPipelining blk
  , InspectLedger blk
  , HasHardForkHistory blk
  , ConvertRawHash blk
  , CanUpgradeLedgerTables (LedgerState blk)
  ) =>
  LiveResources blk m ->
  LiveIntervalResult blk ->
  m (LiveNode blk m)
restoreNode resources LiveIntervalResult{lirPeerResults, lirActive} = do
  lnStateViewTracers <- stateViewTracersWithInitial lirPeerResults
  (lnChainDb, lnCopyToImmDb) <- mkChainDb resources
  lnStateTracer <- lrSTracer resources lnChainDb
  pure
    LiveNode
      { lnChainDb
      , lnStateViewTracers
      , lnStateTracer
      , lnCopyToImmDb
      , lnPeers = lirActive
      }
-- | Start the node for a new live interval.
--
-- Steps, in order:
--
-- 1. Trace 'TraceNodeStartupStart'.
-- 2. Restore the node with 'restoreNode'.
-- 3. Run the supplied callback on the resulting 'LiveInterval'.
-- 4. Read the ChainDB's current chain and trace it in
--    'TraceNodeStartupComplete'.
--
-- The first argument is the callback of step 3; the second and third are
-- passed on to 'restoreNode'. Returns the restored node.
lifecycleStart ::
  forall m blk.
  ( IOLike m
  , LedgerSupportsProtocol blk
  , ChainDB.SerialiseDiskConstraints blk
  , BlockSupportsDiffusionPipelining blk
  , InspectLedger blk
  , HasHardForkHistory blk
  , ConvertRawHash blk
  , CanUpgradeLedgerTables (LedgerState blk)
  ) =>
  (LiveInterval blk m -> m ()) ->
  LiveResources blk m ->
  LiveIntervalResult blk ->
  m (LiveNode blk m)
lifecycleStart start liResources liResult = do
  trace (TraceSchedulerEvent TraceNodeStartupStart)
  liNode <- restoreNode liResources liResult
  start LiveInterval{liResources, liResult, liNode}
  chain <- atomically (ChainDB.getCurrentChain (lnChainDb liNode))
  trace (TraceSchedulerEvent (TraceNodeStartupComplete chain))
  pure liNode
 where
  trace = traceWith (lrTracer liResources)
-- | Shut the node down at the end of a live interval.
--
-- Steps, in order:
--
-- 1. Run 'lnCopyToImmDb' and trace the returned slot in
--    'TraceNodeShutdownStart'.
-- 2. Fetch the peer simulator results from the state view tracers.
-- 3. Release every resource in 'lrRegistry'.
-- 4. In one transaction, remove all ChainSync client handles and, if the LoE
--    is enabled, reset its fragment to the empty fragment anchored at
--    genesis.
-- 5. Trace 'TraceNodeShutdownComplete'.
--
-- The returned 'LiveIntervalResult' holds the results from step 2, plus
-- those members of 'lnPeers' that have no result among them.
lifecycleStop ::
  (IOLike m, GetHeader blk, Typeable blk) =>
  LiveResources blk m ->
  LiveNode blk m ->
  m (LiveIntervalResult blk)
lifecycleStop resources LiveNode{lnStateViewTracers, lnCopyToImmDb, lnPeers} = do
  immutableTip <- lnCopyToImmDb
  trace (TraceSchedulerEvent (TraceNodeShutdownStart immutableTip))
  lirPeerResults <- svtGetPeerSimulatorResults lnStateViewTracers
  let
    -- Any peer with a simulator result is treated as disconnected.
    disconnectedPeers = Set.fromList (psePeerId <$> lirPeerResults)
    lirActive = lnPeers Set.\\ disconnectedPeers
  releaseAll lrRegistry
  atomically $ do
    cschcRemoveAllHandles psrHandles
    case lrLoEVar of
      LoEEnabled var -> writeTVar var (AF.Empty AF.AnchorGenesis)
      LoEDisabled -> pure ()
  trace (TraceSchedulerEvent TraceNodeShutdownComplete)
  pure LiveIntervalResult{lirActive, lirPeerResults}
 where
  trace = traceWith lrTracer
  LiveResources
    { lrRegistry
    , lrTracer
    , lrPeerSim = PeerSimulatorResources{psrHandles}
    , lrLoEVar
    } = resources