{-# LANGUAGE CPP #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}

module Test.Consensus.PeerSimulator.Run (
    SchedulerConfig (..)
  , debugScheduler
  , defaultSchedulerConfig
  , runPointSchedule
  ) where

import           Control.Monad (foldM, forM, void, when)
import           Control.Monad.Class.MonadTime (MonadTime)
import           Control.Monad.Class.MonadTimer.SI (MonadTimer)
import           Control.ResourceRegistry
import           Control.Tracer (Tracer (..), nullTracer, traceWith)
import           Data.Coerce (coerce)
import           Data.Foldable (for_)
import           Data.List (sort)
import qualified Data.List.NonEmpty as NonEmpty
import           Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import           Ouroboros.Consensus.Block
import           Ouroboros.Consensus.Config (TopLevelConfig (..))
import           Ouroboros.Consensus.Genesis.Governor (gddWatcher)
import           Ouroboros.Consensus.HeaderValidation (HeaderWithTime)
import           Ouroboros.Consensus.Ledger.SupportsProtocol
                     (LedgerSupportsProtocol)
import           Ouroboros.Consensus.MiniProtocol.ChainSync.Client
                     (CSJConfig (..), CSJEnabledConfig (..), ChainDbView,
                     ChainSyncClientHandle,
                     ChainSyncClientHandleCollection (..),
                     ChainSyncLoPBucketConfig (..),
                     ChainSyncLoPBucketEnabledConfig (..), viewChainSyncState)
import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client as CSClient
import qualified Ouroboros.Consensus.Node.GsmState as GSM
import           Ouroboros.Consensus.Storage.ChainDB.API
import qualified Ouroboros.Consensus.Storage.ChainDB.API as ChainDB
import           Ouroboros.Consensus.Util.Condense (Condense (..))
import           Ouroboros.Consensus.Util.IOLike
import           Ouroboros.Consensus.Util.STM (forkLinkedWatcher)
import           Ouroboros.Network.AnchoredFragment (AnchoredFragment)
import qualified Ouroboros.Network.AnchoredFragment as AF
import           Ouroboros.Network.BlockFetch (FetchClientRegistry,
                     bracketSyncWithFetchClient, newFetchClientRegistry)
import           Ouroboros.Network.Channel (createConnectedChannels)
import           Ouroboros.Network.ControlMessage (ControlMessage (..),
                     ControlMessageSTM)
import           Ouroboros.Network.Protocol.ChainSync.Codec
import           Ouroboros.Network.Util.ShowProxy (ShowProxy)
import qualified Test.Consensus.PeerSimulator.BlockFetch as BlockFetch
import qualified Test.Consensus.PeerSimulator.ChainSync as ChainSync
import           Test.Consensus.PeerSimulator.Config
import qualified Test.Consensus.PeerSimulator.CSJInvariants as CSJInvariants
import           Test.Consensus.PeerSimulator.NodeLifecycle
import           Test.Consensus.PeerSimulator.Resources
import           Test.Consensus.PeerSimulator.StateDiagram
                     (peerSimStateDiagramSTMTracerDebug)
import           Test.Consensus.PeerSimulator.StateView
import           Test.Consensus.PeerSimulator.Trace
import           Test.Consensus.PointSchedule (BlockFetchTimeout,
                     CSJParams (..), GenesisTest (..), GenesisTestFull,
                     LoPBucketParams (..), PointSchedule (..), peersStates,
                     peersStatesRelative)
import           Test.Consensus.PointSchedule.NodeState (NodeState)
import           Test.Consensus.PointSchedule.Peers (Peer (..), PeerId,
                     getPeerIds)
import           Test.Util.ChainDB
import           Test.Util.Header (dropTimeFromFragment)
import           Test.Util.Orphans.IOLike ()
import           Test.Util.TestBlock (TestBlock)

-- | Behavior config for the scheduler.
--
-- Every field is a switch (or, for 'scDowntime', an optional threshold) that
-- selects which features are active while a point schedule is run.
data SchedulerConfig =
  SchedulerConfig {
    -- | Whether to enable timeouts for the ChainSync protocol. The value of
    -- timeouts themselves is defined in 'GenesisTest'.
      scEnableChainSyncTimeouts  :: Bool

    -- | Whether to enable timeouts for the BlockFetch protocol. The value of
    -- timeouts themselves is defined in 'GenesisTest'.
    , scEnableBlockFetchTimeouts :: Bool

    -- | If 'True', 'Test.Consensus.Genesis.Setup.runTest' will print traces
    -- to stderr.
    --
    -- Use 'debugScheduler' to toggle it conveniently.
    , scDebug                    :: Bool

    -- | Whether to trace when running the scheduler.
    , scTrace                    :: Bool

    -- | Whether to trace only the current state of the candidates and selection,
    -- which provides a less verbose view of the test progress.
    , scTraceState               :: Bool

    -- | Enable Limit on Eagerness (LoE) and the Genesis Density Disconnection
    -- governor (GDD).
    , scEnableLoE                :: Bool

    -- | Whether to enable the LoP. The parameters of the LoP come from
    -- 'GenesisTest'.
    , scEnableLoP                :: Bool

    -- | Enable node downtime if this is 'Just', using the value as minimum tick
    -- duration to trigger it.
    , scDowntime                 :: Maybe DiffTime

    -- | Enable the use of ChainSel starvation information in the block fetch
    -- decision logic. It is never actually disabled, but rather the grace
    -- period is made virtually infinite.
    , scEnableChainSelStarvation :: Bool

    -- | Whether to enable ChainSync Jumping. The parameters come from
    -- 'GenesisTest'.
    , scEnableCSJ                :: Bool
  }

-- | Default scheduler config.
--
-- ChainSync and BlockFetch timeouts, scheduler tracing and ChainSel
-- starvation are switched on; debug output, state tracing, LoE, LoP, node
-- downtime and ChainSync Jumping are switched off.
defaultSchedulerConfig :: SchedulerConfig
defaultSchedulerConfig =
  SchedulerConfig {
    scEnableChainSyncTimeouts = True,
    scEnableBlockFetchTimeouts = True,
    scDebug = False,
    scTrace = True,
    scTraceState = False,
    scEnableLoE = False,
    scEnableLoP = False,
    scDowntime = Nothing,
    scEnableChainSelStarvation = True,
    scEnableCSJ = False
  }

-- | Enable debug tracing during a scheduler test.
--
-- Returns the given config with 'scDebug' set to 'True'; all other fields are
-- left untouched.
debugScheduler :: SchedulerConfig -> SchedulerConfig
debugScheduler conf = conf { scDebug = True }

-- | Run a ChainSync protocol for one peer, consisting of a server and client.
--
-- A fresh pair of connected channels is created; the client runs on one end
-- and the peer's 'csrServer' on the other. The timeouts, LoP bucket config
-- and CSJ config are passed through to the client unchanged.
--
-- The client is synchronized with BlockFetch using the supplied
-- 'FetchClientRegistry' (via 'bracketSyncWithFetchClient').
--
-- Execution is started asynchronously: both sides are forked as linked
-- threads in the given registry, and the result is the pair
-- @(clientThread, serverThread)@ of their handles.
startChainSyncConnectionThread ::
  (IOLike m, MonadTimer m, LedgerSupportsProtocol blk, ShowProxy blk, ShowProxy (Header blk)) =>
  ResourceRegistry m ->
  Tracer m (TraceEvent blk) ->
  TopLevelConfig blk ->
  ChainDbView m blk ->
  FetchClientRegistry PeerId (HeaderWithTime blk) blk m ->
  SharedResources m blk ->
  ChainSyncResources m blk ->
  ChainSyncTimeout ->
  ChainSyncLoPBucketConfig ->
  CSJConfig ->
  StateViewTracers blk m ->
  ChainSyncClientHandleCollection PeerId m blk ->
  m (Thread m (), Thread m ())
startChainSyncConnectionThread
  registry
  tracer
  cfg
  chainDbView
  fetchClientRegistry
  SharedResources {srPeerId}
  ChainSyncResources {csrServer}
  chainSyncTimeouts_
  chainSyncLoPBucketConfig
  csjConfig
  tracers
  varHandles
  = do
    (clientChannel, serverChannel) <- createConnectedChannels
    clientThread <-
      forkLinkedThread registry ("ChainSyncClient" <> condense srPeerId) $
        bracketSyncWithFetchClient fetchClientRegistry srPeerId $
          ChainSync.runChainSyncClient tracer cfg chainDbView srPeerId chainSyncTimeouts_ chainSyncLoPBucketConfig csjConfig tracers varHandles clientChannel
    serverThread <-
      forkLinkedThread registry ("ChainSyncServer" <> condense srPeerId) $
        ChainSync.runChainSyncServer tracer srPeerId tracers csrServer serverChannel
    pure (clientThread, serverThread)

-- | Start the BlockFetch client, using the supplied 'FetchClientRegistry' to
-- register it for synchronization with the ChainSync client.
--
-- A fresh pair of connected channels is created; the client runs on one end
-- and the peer's 'bfrServer' on the other. Both sides are forked as linked
-- threads in the given registry, and the result is the pair
-- @(clientThread, serverThread)@ of their handles.
startBlockFetchConnectionThread ::
  (IOLike m, MonadTime m, MonadTimer m, HasHeader blk, HasHeader (Header blk), ShowProxy blk) =>
  ResourceRegistry m ->
  Tracer m (TraceEvent blk) ->
  StateViewTracers blk m ->
  FetchClientRegistry PeerId (HeaderWithTime blk) blk m ->
  ControlMessageSTM m ->
  SharedResources m blk ->
  BlockFetchResources m blk ->
  BlockFetchTimeout ->
  m (Thread m (), Thread m ())
startBlockFetchConnectionThread
  registry
  tracer
  tracers
  fetchClientRegistry
  controlMsgSTM
  SharedResources {srPeerId}
  BlockFetchResources {bfrServer}
  blockFetchTimeouts = do
    (clientChannel, serverChannel) <- createConnectedChannels
    clientThread <-
      forkLinkedThread registry ("BlockFetchClient" <> condense srPeerId) $
        BlockFetch.runBlockFetchClient tracer srPeerId blockFetchTimeouts tracers fetchClientRegistry controlMsgSTM clientChannel
    serverThread <-
      forkLinkedThread registry ("BlockFetchServer" <> condense srPeerId) $
        BlockFetch.runBlockFetchServer tracer srPeerId tracers bfrServer serverChannel
    pure (clientThread, serverThread)

-- | Wait for the given duration, but if the duration is longer than the minimum
-- duration in the lifecycle, shut down the node and restart it after the delay.
--
-- Returns the node to continue with: the freshly started one if a restart
-- happened (see 'itIsTimeToRestartTheNode'), otherwise the node that was
-- passed in.
smartDelay ::
  (MonadDelay m) =>
  NodeLifecycle blk m ->
  LiveNode blk m ->
  DiffTime ->
  m (LiveNode blk m)
smartDelay lifecycle@NodeLifecycle {nlStart, nlShutdown} node duration
  | itIsTimeToRestartTheNode lifecycle duration = do
    -- The results of the interval that just ended are handed to 'nlStart'
    -- once the delay has elapsed.
    results <- nlShutdown node
    threadDelay duration
    nlStart results
smartDelay _ node duration = do
  threadDelay duration
  pure node

-- | Decide whether a delay of the given duration should trigger a node
-- restart.
--
-- This is the case only if the lifecycle has a minimum duration
-- ('nlMinDuration' is 'Just') and the given duration is strictly greater than
-- it.
itIsTimeToRestartTheNode :: NodeLifecycle blk m -> DiffTime -> Bool
itIsTimeToRestartTheNode NodeLifecycle {nlMinDuration} duration =
  case nlMinDuration of
    Just minInterval -> duration > minInterval
    Nothing          -> False

-- | Apply one tick of the point schedule. A tick carries a state update for a
-- specific peer together with the duration of the tick.
--
-- For the peer named in the tick, this:
--
-- 1. emits a 'TraceNewTick' event describing the tick, the current chain, the
--    peer's ChainSync candidate (if it has a client handle) and the jumping
--    state of every known handle;
-- 2. publishes the new state to the peer through its 'prUpdateState' action;
-- 3. waits for the tick's duration with 'smartDelay', which may shut the node
--    down and restart it;
-- 4. fires the state tracer of the resulting node.
--
-- The node returned is the one to use for the next tick. Calls 'error' if the
-- peer of the tick has no entry in the peer resources map.
dispatchTick :: forall m blk.
  (IOLike m, HasHeader (Header blk)) =>
  Tracer m (TraceSchedulerEvent blk) ->
  STM m (Map PeerId (ChainSyncClientHandle m blk)) ->
  Map PeerId (PeerResources m blk) ->
  NodeLifecycle blk m ->
  LiveNode blk m ->
  (Int, (DiffTime, Peer (NodeState blk))) ->
  m (LiveNode blk m)
dispatchTick tracer varHandles peers lifecycle node (number, (duration, Peer pid state)) =
  case peers Map.!? pid of
    Just PeerResources {prUpdateState} -> do
      traceNewTick
      atomically (prUpdateState state)
      newNode <- smartDelay lifecycle node duration
      traceWith (lnStateTracer newNode) ()
      pure newNode
    Nothing -> error "“The impossible happened,” as GHC would say."
  where
    -- Snapshot the current chain and the ChainSync client states, then trace
    -- them together with the tick itself.
    traceNewTick :: m ()
    traceNewTick = do
      currentChain <- atomically $ ChainDB.getCurrentChain (lnChainDb node)
      -- Both views are read in a single transaction so that they are
      -- consistent with each other.
      (csState, jumpingStates) <- atomically $ do
        handles <- varHandles
        csState <- traverse (readTVar . CSClient.cschState) (handles Map.!? pid)
        jumpingStates <- forM (Map.toList handles) $ \(peer, h) -> do
          st <- readTVar (CSClient.cschJumping h)
          pure (peer, st)
        pure (csState, jumpingStates)
      traceWith tracer $ TraceNewTick
        number
        duration
        (Peer pid state)
        currentChain
        (dropTimeFromFragment . CSClient.csCandidate <$> csState)
        jumpingStates

-- | Iterate over a 'PointSchedule', sending each tick to the associated peer in turn,
-- giving each peer a chunk of computation time, sequentially, until it satisfies the
-- conditions given by the tick.
-- This usually means for the ChainSync server to have sent the target header to the
-- client.
--
-- The node is started with all peers active, every tick is dispatched with
-- 'dispatchTick', and afterwards an extra delay is inserted if the last tick
-- of the schedule lies before 'psMinEndTime' (or if the schedule has no ticks
-- at all). The result is the ChainDB and the state view tracers of the node
-- that is live at the end.
runScheduler ::
  (IOLike m, HasHeader (Header blk)) =>
  Tracer m (TraceSchedulerEvent blk) ->
  STM m (Map PeerId (ChainSyncClientHandle m blk)) ->
  PointSchedule blk ->
  Map PeerId (PeerResources m blk) ->
  NodeLifecycle blk m ->
  m (ChainDB m blk, StateViewTracers blk m)
runScheduler tracer varHandles ps@PointSchedule{psMinEndTime} peers lifecycle@NodeLifecycle {nlStart} = do
  node0 <- nlStart LiveIntervalResult {lirActive = Map.keysSet peers, lirPeerResults = []}
  traceWith tracer TraceBeginningOfTime
  nodeEnd <- foldM tick node0 (zip [0..] (peersStatesRelative ps))
  -- Time still missing to reach 'psMinEndTime', measured from the time of the
  -- last tick. An empty schedule waits for the whole of 'psMinEndTime'.
  let extraDelay = case take 1 $ reverse $ peersStates ps of
        [(t, _)] -> if t < psMinEndTime
            then Just $ diffTime psMinEndTime t
            else Nothing
        _        -> Just $ coerce psMinEndTime
  LiveNode{lnChainDb, lnStateViewTracers} <-
    case extraDelay of
      Just duration -> do
        nodeEnd' <- smartDelay lifecycle nodeEnd duration
        -- Give an opportunity to the node to finish whatever it was doing at
        -- shutdown
        when (itIsTimeToRestartTheNode lifecycle duration) $
          threadDelay $ coerce psMinEndTime
        pure nodeEnd'
      Nothing ->
        pure nodeEnd
  traceWith tracer TraceEndOfTime
  pure (lnChainDb, lnStateViewTracers)
  where
    -- One fold step: dispatch a numbered tick against the current node.
    tick = dispatchTick tracer varHandles peers lifecycle

-- | Create the shared resource for the LoE if the feature is enabled in the config.
-- This is used by the ChainDB and the GDD governor.
--
-- When 'scEnableLoE' is set, the result is 'LoEEnabled' wrapping a new TVar
-- that initially holds the empty fragment anchored at genesis; otherwise it
-- is 'LoEDisabled' and no TVar is allocated.
mkLoEVar ::
  IOLike m =>
  SchedulerConfig ->
  m (LoE (StrictTVar m (AnchoredFragment (HeaderWithTime TestBlock))))
mkLoEVar SchedulerConfig {scEnableLoE}
  | scEnableLoE
  = LoEEnabled <$> newTVarIO (AF.Empty AF.AnchorGenesis)
  | otherwise
  = pure LoEDisabled

-- | Build the tracer that is fired after each tick to show the current state.
--
-- If 'scTraceState' is set, this is the state diagram tracer from
-- 'peerSimStateDiagramSTMTracerDebug', fed with the block tree of the test,
-- the current chain of the ChainDB, the ChainSync candidates of all peers
-- (with header times dropped) and the current schedule state of each peer.
-- Otherwise it is 'nullTracer'.
mkStateTracer ::
  IOLike m =>
  SchedulerConfig ->
  GenesisTest TestBlock s ->
  PeerSimulatorResources m TestBlock ->
  ChainDB m TestBlock ->
  m (Tracer m ())
mkStateTracer schedulerConfig GenesisTest {gtBlockTree} PeerSimulatorResources {psrHandles, psrPeers} chainDb
  | scTraceState schedulerConfig
  , let getCandidates = viewChainSyncState (cschcMap psrHandles) CSClient.csCandidate
        getCurrentChain = ChainDB.getCurrentChain chainDb
        getPoints = traverse readTVar (srCurrentState . prShared <$> psrPeers)
  = peerSimStateDiagramSTMTracerDebug
        gtBlockTree
        getCurrentChain
        (fmap (Map.map dropTimeFromFragment) getCandidates)
        getPoints
  | otherwise
  = pure nullTracer

-- | Start all threads for ChainSync, BlockFetch and GDD, using the resources
-- for a single live interval.
-- Only start peers that haven't been disconnected in a previous interval,
-- provided by 'LiveIntervalResult'.
--
-- Peers are started in the order given by 'psStartOrder'; active peers that
-- are not mentioned there are appended, sorted by their 'Ord' instance.
--
-- For every started peer, a linked \"Peer overview\" thread is forked in the
-- live registry. It opens a dedicated per-peer registry in which the
-- ChainSync connection, the keep-alive thread and the BlockFetch connection
-- are started. Afterwards the BlockFetch logic, the LoE updater (only when
-- 'lrLoEVar' holds a variable) and the CSJ invariants watcher are started in
-- the live registry.
startNode ::
  forall m.
  ( IOLike m
  , MonadTime m
  , MonadTimer m
  ) =>
  SchedulerConfig ->
  GenesisTestFull TestBlock ->
  LiveInterval TestBlock m ->
  m ()
startNode schedulerConfig genesisTest interval = do
  let handles = psrHandles lrPeerSim
  fetchClientRegistry <- newFetchClientRegistry
  let chainDbView = CSClient.defaultChainDbView lnChainDb
      -- Resources of the peers that are still active in this interval.
      activePeers = Map.restrictKeys (psrPeers lrPeerSim) (lirActive liveResult)
      -- Explicitly scheduled peers first, then all remaining active peers.
      peersStartOrder =
        psStartOrder
          ++ sort [pid | pid <- Map.keys activePeers, pid `notElem` psStartOrder]
      -- Entries of the start order that are not active are silently dropped
      -- by the failing pattern match on 'Nothing'.
      activePeersOrdered =
        [ peerResources
        | pid <- peersStartOrder
        , Just peerResources <- [Map.lookup pid activePeers]
        ]
  for_ activePeersOrdered $ \PeerResources {prShared, prChainSync, prBlockFetch} -> do
    let pid = srPeerId prShared
    forkLinkedThread lrRegistry ("Peer overview " ++ show pid) $
      -- The peerRegistry helps ensuring that if any thread fails, then
      -- the registry is closed and all threads related to the peer are
      -- killed.
      withRegistry $ \peerRegistry -> do
        (csClient, csServer) <-
          startChainSyncConnectionThread
            peerRegistry
            tracer
            lrConfig
            chainDbView
            fetchClientRegistry
            prShared
            prChainSync
            chainSyncTimeouts_
            chainSyncLoPBucketConfig
            csjConfig
            lnStateViewTracers
            handles
        BlockFetch.startKeepAliveThread peerRegistry fetchClientRegistry pid
        (bfClient, bfServer) <-
          startBlockFetchConnectionThread
            peerRegistry
            tracer
            lnStateViewTracers
            fetchClientRegistry
            (pure Continue)
            prShared
            prBlockFetch
            blockFetchTimeouts_
        waitAnyThread [csClient, csServer, bfClient, bfServer]
  -- The block fetch logic needs to be started after the block fetch clients
  -- otherwise, an internal assertion fails because getCandidates yields more
  -- peer fragments than registered clients.
  BlockFetch.startBlockFetchLogic
    (scEnableChainSelStarvation schedulerConfig)
    lrRegistry
    lrTracer
    lnChainDb
    fetchClientRegistry
    handles

  -- 'for_' over 'lrLoEVar' only runs the body when a LoE variable is present.
  for_ lrLoEVar $ \var ->
    forkLinkedWatcher lrRegistry "LoE updater background" $
      gddWatcher
        lrConfig
        (mkGDDTracerTestBlock lrTracer)
        lnChainDb
        0.0 -- The rate limit makes simpler the calculations of how long tests
            -- should run and still should produce interesting interleavings.
            -- It is similar to the setting of bfcDecisionLoopInterval in
            -- Test.Consensus.PeerSimulator.BlockFetch
        (pure GSM.Syncing) -- TODO actually run GSM
        (cschcMap handles)
        var

  void $ forkLinkedWatcher lrRegistry "CSJ invariants watcher" $
    CSJInvariants.watcher (cschcMap handles)
  where
    LiveResources {lrRegistry, lrTracer, lrConfig, lrPeerSim, lrLoEVar} = resources

    LiveInterval {
        liResources = resources
      , liResult = liveResult
      , liNode = LiveNode {lnChainDb, lnStateViewTracers}
      } = interval

    GenesisTest
      { gtChainSyncTimeouts
      , gtBlockFetchTimeouts
      , gtLoPBucketParams = LoPBucketParams { lbpCapacity, lbpRate }
      , gtCSJParams = CSJParams { csjpJumpSize }
      , gtSchedule = PointSchedule {psStartOrder}
      } = genesisTest

    StateViewTracers{svtTraceTracer} = lnStateViewTracers

    -- FIXME: This type of configuration should move to `Trace.mkTracer`.
    --
    -- Events always reach the state view tracer; they are additionally sent
    -- to the live tracer only when 'scTrace' is set.
    tracer :: Tracer m (TraceEvent TestBlock)
    tracer = if scTrace schedulerConfig
      then Tracer (\evt -> traceWith lrTracer evt >> traceWith svtTraceTracer evt)
      else svtTraceTracer

    -- ChainSync timeouts of the test, or none when disabled in the config.
    chainSyncTimeouts_ =
      if scEnableChainSyncTimeouts schedulerConfig
        then gtChainSyncTimeouts
        else ChainSync.chainSyncNoTimeouts

    -- LoP bucket configured from the test's 'LoPBucketParams' when enabled.
    chainSyncLoPBucketConfig :: ChainSyncLoPBucketConfig
    chainSyncLoPBucketConfig =
      if scEnableLoP schedulerConfig
        then ChainSyncLoPBucketEnabled ChainSyncLoPBucketEnabledConfig { csbcCapacity = lbpCapacity, csbcRate = lbpRate }
        else ChainSyncLoPBucketDisabled

    -- CSJ configured from the test's 'CSJParams' when enabled.
    csjConfig :: CSJConfig
    csjConfig =
      if scEnableCSJ schedulerConfig
        then CSJEnabled CSJEnabledConfig { csjcJumpSize = csjpJumpSize }
        else CSJDisabled

    -- BlockFetch timeouts of the test, or none when disabled in the config.
    blockFetchTimeouts_ =
      if scEnableBlockFetchTimeouts schedulerConfig
        then gtBlockFetchTimeouts
        else BlockFetch.blockFetchNoTimeouts

-- | Set up all resources related to node start/shutdown.
--
-- Creates the node databases ('emptyNodeDBs') and the LoE variable
-- ('mkLoEVar'), bundles them with the given tracer, registry and peer
-- simulator resources into a 'LiveResources' value, and returns a
-- 'NodeLifecycle' whose start action runs 'startNode' and whose shutdown
-- action is 'lifecycleStop', both over those resources.
nodeLifecycle ::
  (IOLike m, MonadTime m, MonadTimer m) =>
  SchedulerConfig ->
  GenesisTestFull TestBlock ->
  Tracer m (TraceEvent TestBlock) ->
  ResourceRegistry m ->
  PeerSimulatorResources m TestBlock ->
  m (NodeLifecycle TestBlock m)
nodeLifecycle schedulerConfig genesisTest lrTracer lrRegistry lrPeerSim = do
  lrCdb <- emptyNodeDBs
  lrLoEVar <- mkLoEVar schedulerConfig
  let
    -- The arguments are deliberately named after the record fields so that
    -- they can be punned into 'LiveResources'.
    resources =
      LiveResources {
          lrRegistry
        , lrTracer
        , lrSTracer = mkStateTracer schedulerConfig genesisTest lrPeerSim
        , lrConfig
        , lrPeerSim
        , lrCdb
        , lrLoEVar
        }
  pure NodeLifecycle {
      nlMinDuration = scDowntime schedulerConfig
    , nlStart = lifecycleStart (startNode schedulerConfig genesisTest) resources
    , nlShutdown = lifecycleStop resources
    }
  where
    -- Top-level configuration built from the parameters of the test.
    lrConfig :: TopLevelConfig TestBlock
    lrConfig = defaultCfg k gtForecastRange gtGenesisWindow

    GenesisTest {
        gtSecurityParam = k
      , gtForecastRange
      , gtGenesisWindow
      } = genesisTest

-- | Construct STM resources, set up ChainSync and BlockFetch threads, and
-- send all ticks in a 'PointSchedule' to all given peers in turn.
--
-- Everything runs inside a fresh resource registry ('withRegistry'). After
-- the scheduler has finished, the result is obtained by calling
-- 'snapshotStateView' on the state view tracers and the ChainDB returned by
-- the scheduler.
--
-- The given tracer is only used when 'scTrace' is set in the
-- 'SchedulerConfig'; otherwise 'nullTracer' is used in its place.
--
-- Calls 'error' if the schedule does not contain any peer.
runPointSchedule ::
  forall m.
  (IOLike m, MonadTime m, MonadTimer m) =>
  SchedulerConfig ->
  GenesisTestFull TestBlock ->
  Tracer m (TraceEvent TestBlock) ->
  m (StateView TestBlock)
runPointSchedule schedulerConfig genesisTest tracer0 =
  withRegistry $ \registry -> do
    peerSim <- makePeerSimulatorResources tracer gtBlockTree peerIds
    lifecycle <- nodeLifecycle schedulerConfig genesisTest tracer registry peerSim
    (chainDb, stateViewTracers) <- runScheduler
      (Tracer $ traceWith tracer . TraceSchedulerEvent)
      (cschcMap (psrHandles peerSim))
      gtSchedule
      (psrPeers peerSim)
      lifecycle
    snapshotStateView stateViewTracers chainDb
  where

    GenesisTest {
        gtBlockTree
      , gtSchedule
      } = genesisTest

    -- The peer simulator needs at least one peer. Fail with a descriptive
    -- message instead of the generic one from 'NonEmpty.fromList'.
    peerIds =
      case NonEmpty.nonEmpty (getPeerIds (psSchedule gtSchedule)) of
        Just ids -> ids
        Nothing  -> error "runPointSchedule: the point schedule contains no peers"

    -- FIXME: This type of configuration should move to `Trace.mkTracer`.
    tracer :: Tracer m (TraceEvent TestBlock)
    tracer = if scTrace schedulerConfig then tracer0 else nullTracer