{-# LANGUAGE CPP #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}

module Test.Consensus.PeerSimulator.Run
  ( SchedulerConfig (..)
  , debugScheduler
  , defaultSchedulerConfig
  , runPointSchedule
  ) where

import Control.Monad (foldM, forM, void, when)
import Control.Monad.Class.MonadTime (MonadTime)
import Control.Monad.Class.MonadTimer.SI (MonadTimer)
import Control.ResourceRegistry
import Control.Tracer (Tracer (..), nullTracer, traceWith)
import Data.Coerce (coerce)
import Data.Foldable (for_)
import Data.List (sort)
import qualified Data.List.NonEmpty as NonEmpty
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import Data.Typeable
import Ouroboros.Consensus.Block
import Ouroboros.Consensus.Config (TopLevelConfig (..))
import Ouroboros.Consensus.Config.SupportsNode (ConfigSupportsNode)
import Ouroboros.Consensus.Genesis.Governor (gddWatcher)
import Ouroboros.Consensus.HardFork.Abstract (HasHardForkHistory)
import Ouroboros.Consensus.HeaderValidation (HeaderWithTime)
import Ouroboros.Consensus.Ledger.Basics (LedgerState)
import Ouroboros.Consensus.Ledger.Inspect (InspectLedger)
import Ouroboros.Consensus.Ledger.SupportsProtocol
  ( LedgerSupportsProtocol
  )
import Ouroboros.Consensus.MiniProtocol.ChainSync.Client
  ( CSJConfig (..)
  , CSJEnabledConfig (..)
  , ChainDbView
  , ChainSyncClientHandle
  , ChainSyncClientHandleCollection (..)
  , ChainSyncLoPBucketConfig (..)
  , ChainSyncLoPBucketEnabledConfig (..)
  , viewChainSyncState
  )
import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client as CSClient
import qualified Ouroboros.Consensus.Node.GsmState as GSM
import Ouroboros.Consensus.Node.ProtocolInfo (ProtocolInfo (..))
import Ouroboros.Consensus.Storage.ChainDB.API
import qualified Ouroboros.Consensus.Storage.ChainDB.API as ChainDB
import qualified Ouroboros.Consensus.Storage.ChainDB.Impl as ChainDB
import Ouroboros.Consensus.Storage.LedgerDB.API
  ( CanUpgradeLedgerTables
  )
import Ouroboros.Consensus.Util.Condense (Condense (..))
import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Consensus.Util.STM (forkLinkedWatcher)
import Ouroboros.Network.AnchoredFragment (AnchoredFragment)
import qualified Ouroboros.Network.AnchoredFragment as AF
import Ouroboros.Network.BlockFetch
  ( FetchClientRegistry
  , bracketSyncWithFetchClient
  , newFetchClientRegistry
  )
import Ouroboros.Network.Channel (createConnectedChannels)
import Ouroboros.Network.ControlMessage
  ( ControlMessage (..)
  , ControlMessageSTM
  )
import Ouroboros.Network.Util.ShowProxy (ShowProxy)
import qualified Test.Consensus.PeerSimulator.BlockFetch as BlockFetch
import qualified Test.Consensus.PeerSimulator.CSJInvariants as CSJInvariants
import qualified Test.Consensus.PeerSimulator.ChainSync as ChainSync
import Test.Consensus.PeerSimulator.NodeLifecycle
import Test.Consensus.PeerSimulator.Resources
import Test.Consensus.PeerSimulator.StateDiagram
  ( peerSimStateDiagramSTMTracerDebug
  )
import Test.Consensus.PeerSimulator.StateView
import Test.Consensus.PeerSimulator.Trace
import Test.Consensus.PointSchedule
  ( BlockFetchTimeout
  , CSJParams (..)
  , ChainSyncTimeout
  , GenesisTest (..)
  , GenesisTestFull
  , HasPointScheduleTestParams (..)
  , LoPBucketParams (..)
  , PointSchedule (..)
  , peersStates
  , peersStatesRelative
  )
import Test.Consensus.PointSchedule.NodeState (NodeState)
import Test.Consensus.PointSchedule.Peers
  ( Peer (..)
  , PeerId
  , getPeerIds
  )
import Test.Util.ChainDB
import Test.Util.Header (dropTimeFromFragment)
import Test.Util.Orphans.IOLike ()

-- | Behavior config for the scheduler.
data SchedulerConfig
  = SchedulerConfig
  { SchedulerConfig -> Bool
scEnableChainSyncTimeouts :: Bool
  -- ^ Whether to enable timeouts for the ChainSync protocol. The value of
  -- timeouts themselves is defined in 'GenesisTest'.
  , SchedulerConfig -> Bool
scEnableBlockFetchTimeouts :: Bool
  -- ^ Whether to enable timeouts for the BlockFetch protocol. The value of
  -- timeouts themselves is defined in 'GenesisTest'.
  , SchedulerConfig -> Bool
scDebug :: Bool
  -- ^ If 'True', 'Test.Consensus.Genesis.Setup.runTest' will print traces
  -- to stderr.
  --
  -- Use 'debugScheduler' to toggle it conveniently.
  , SchedulerConfig -> Bool
scTrace :: Bool
  -- ^ Whether to trace when running the scheduler.
  , SchedulerConfig -> Bool
scTraceState :: Bool
  -- ^ Whether to trace only the current state of the candidates and selection,
  -- which provides a less verbose view of the test progress.
  , SchedulerConfig -> Bool
scEnableLoE :: Bool
  -- ^ Enable Limit on Eagerness (LoE) and the Genesis Density Disconnection
  -- governor (GDD).
  , SchedulerConfig -> Bool
scEnableLoP :: Bool
  -- ^ Whether to enable the LoP. The parameters of the LoP come from
  -- 'GenesisTest'.
  , SchedulerConfig -> Maybe DiffTime
scDowntime :: Maybe DiffTime
  -- ^ Enable node downtime if this is 'Just', using the value as minimum tick
  -- duration to trigger it.
  , SchedulerConfig -> Bool
scEnableChainSelStarvation :: Bool
  -- ^ Enable the use of ChainSel starvation information in the block fetch
  -- decision logic. It is never actually disabled, but rather the grace
  -- period is made virtually infinite.
  , SchedulerConfig -> Bool
scEnableCSJ :: Bool
  -- ^ Whether to enable ChainSync Jumping. The parameters come from
  -- 'GenesisTest'.
  }

-- | Default scheduler config
defaultSchedulerConfig :: SchedulerConfig
defaultSchedulerConfig :: SchedulerConfig
defaultSchedulerConfig =
  SchedulerConfig
    { scEnableChainSyncTimeouts :: Bool
scEnableChainSyncTimeouts = Bool
True
    , scEnableBlockFetchTimeouts :: Bool
scEnableBlockFetchTimeouts = Bool
True
    , scDebug :: Bool
scDebug = Bool
False
    , scTrace :: Bool
scTrace = Bool
True
    , scTraceState :: Bool
scTraceState = Bool
False
    , scEnableLoE :: Bool
scEnableLoE = Bool
False
    , scEnableLoP :: Bool
scEnableLoP = Bool
False
    , scDowntime :: Maybe DiffTime
scDowntime = Maybe DiffTime
forall a. Maybe a
Nothing
    , scEnableChainSelStarvation :: Bool
scEnableChainSelStarvation = Bool
True
    , scEnableCSJ :: Bool
scEnableCSJ = Bool
False
    }

-- | Enable debug tracing during a scheduler test.
debugScheduler :: SchedulerConfig -> SchedulerConfig
debugScheduler :: SchedulerConfig -> SchedulerConfig
debugScheduler SchedulerConfig
conf = SchedulerConfig
conf{scDebug = True}

-- | Run a ChainSync protocol for one peer, consisting of a server and client.
--
-- The connection uses timeouts based on the ASC.
--
-- The client is synchronized with BlockFetch using the supplied 'FetchClientRegistry'.
--
-- Execution is started asynchronously, returning an action that kills the thread,
-- to allow extraction of a potential exception.
startChainSyncConnectionThread ::
  (IOLike m, MonadTimer m, LedgerSupportsProtocol blk, ShowProxy blk, ShowProxy (Header blk)) =>
  ResourceRegistry m ->
  Tracer m (TraceEvent blk) ->
  TopLevelConfig blk ->
  ChainDbView m blk ->
  FetchClientRegistry PeerId (HeaderWithTime blk) blk m ->
  SharedResources m blk ->
  ChainSyncResources m blk ->
  ChainSyncTimeout ->
  ChainSyncLoPBucketConfig ->
  CSJConfig ->
  StateViewTracers blk m ->
  ChainSyncClientHandleCollection PeerId m blk ->
  m (Thread m (), Thread m ())
startChainSyncConnectionThread :: forall (m :: * -> *) blk.
(IOLike m, MonadTimer m, LedgerSupportsProtocol blk, ShowProxy blk,
 ShowProxy (Header blk)) =>
ResourceRegistry m
-> Tracer m (TraceEvent blk)
-> TopLevelConfig blk
-> ChainDbView m blk
-> FetchClientRegistry PeerId (HeaderWithTime blk) blk m
-> SharedResources m blk
-> ChainSyncResources m blk
-> ChainSyncTimeout
-> ChainSyncLoPBucketConfig
-> CSJConfig
-> StateViewTracers blk m
-> ChainSyncClientHandleCollection PeerId m blk
-> m (Thread m (), Thread m ())
startChainSyncConnectionThread
  ResourceRegistry m
registry
  Tracer m (TraceEvent blk)
tracer
  TopLevelConfig blk
cfg
  ChainDbView m blk
chainDbView
  FetchClientRegistry PeerId (HeaderWithTime blk) blk m
fetchClientRegistry
  SharedResources{PeerId
srPeerId :: PeerId
srPeerId :: forall (m :: * -> *) blk. SharedResources m blk -> PeerId
srPeerId}
  ChainSyncResources{ChainSyncServer (Header blk) (Point blk) (Tip blk) m ()
csrServer :: ChainSyncServer (Header blk) (Point blk) (Tip blk) m ()
csrServer :: forall (m :: * -> *) blk.
ChainSyncResources m blk
-> ChainSyncServer (Header blk) (Point blk) (Tip blk) m ()
csrServer}
  ChainSyncTimeout
chainSyncTimeouts_
  ChainSyncLoPBucketConfig
chainSyncLoPBucketConfig
  CSJConfig
csjConfig
  StateViewTracers blk m
tracers
  ChainSyncClientHandleCollection PeerId m blk
varHandles =
    do
      (clientChannel, serverChannel) <- m (Channel
     m (AnyMessage (ChainSync (Header blk) (Point blk) (Tip blk))),
   Channel
     m (AnyMessage (ChainSync (Header blk) (Point blk) (Tip blk))))
forall (m :: * -> *) a. MonadSTM m => m (Channel m a, Channel m a)
createConnectedChannels
      clientThread <-
        forkLinkedThread registry ("ChainSyncClient" <> condense srPeerId) $
          bracketSyncWithFetchClient fetchClientRegistry srPeerId $
            ChainSync.runChainSyncClient
              tracer
              cfg
              chainDbView
              srPeerId
              chainSyncTimeouts_
              chainSyncLoPBucketConfig
              csjConfig
              tracers
              varHandles
              clientChannel
      serverThread <-
        forkLinkedThread registry ("ChainSyncServer" <> condense srPeerId) $
          ChainSync.runChainSyncServer tracer srPeerId tracers csrServer serverChannel
      pure (clientThread, serverThread)

-- | Start the BlockFetch client, using the supplied 'FetchClientRegistry' to
-- register it for synchronization with the ChainSync client.
startBlockFetchConnectionThread ::
  (IOLike m, MonadTime m, MonadTimer m, HasHeader blk, HasHeader (Header blk), ShowProxy blk) =>
  ResourceRegistry m ->
  Tracer m (TraceEvent blk) ->
  StateViewTracers blk m ->
  FetchClientRegistry PeerId (HeaderWithTime blk) blk m ->
  ControlMessageSTM m ->
  SharedResources m blk ->
  BlockFetchResources m blk ->
  BlockFetchTimeout ->
  m (Thread m (), Thread m ())
startBlockFetchConnectionThread :: forall (m :: * -> *) blk.
(IOLike m, MonadTime m, MonadTimer m, HasHeader blk,
 HasHeader (Header blk), ShowProxy blk) =>
ResourceRegistry m
-> Tracer m (TraceEvent blk)
-> StateViewTracers blk m
-> FetchClientRegistry PeerId (HeaderWithTime blk) blk m
-> ControlMessageSTM m
-> SharedResources m blk
-> BlockFetchResources m blk
-> BlockFetchTimeout
-> m (Thread m (), Thread m ())
startBlockFetchConnectionThread
  ResourceRegistry m
registry
  Tracer m (TraceEvent blk)
tracer
  StateViewTracers blk m
tracers
  FetchClientRegistry PeerId (HeaderWithTime blk) blk m
fetchClientRegistry
  ControlMessageSTM m
controlMsgSTM
  SharedResources{PeerId
srPeerId :: forall (m :: * -> *) blk. SharedResources m blk -> PeerId
srPeerId :: PeerId
srPeerId}
  BlockFetchResources{BlockFetchServer blk (Point blk) m ()
bfrServer :: BlockFetchServer blk (Point blk) m ()
bfrServer :: forall (m :: * -> *) blk.
BlockFetchResources m blk -> BlockFetchServer blk (Point blk) m ()
bfrServer}
  BlockFetchTimeout
blockFetchTimeouts = do
    (clientChannel, serverChannel) <- m (Channel m (AnyMessage (BlockFetch blk (Point blk))),
   Channel m (AnyMessage (BlockFetch blk (Point blk))))
forall (m :: * -> *) a. MonadSTM m => m (Channel m a, Channel m a)
createConnectedChannels
    clientThread <-
      forkLinkedThread registry ("BlockFetchClient" <> condense srPeerId) $
        BlockFetch.runBlockFetchClient
          tracer
          srPeerId
          blockFetchTimeouts
          tracers
          fetchClientRegistry
          controlMsgSTM
          clientChannel
    serverThread <-
      forkLinkedThread registry ("BlockFetchServer" <> condense srPeerId) $
        BlockFetch.runBlockFetchServer tracer srPeerId tracers bfrServer serverChannel
    pure (clientThread, serverThread)

-- | Wait for the given duration, but if the duration is longer than the minimum
-- duration in the live cycle, shutdown the node and restart it after the delay.
smartDelay ::
  MonadDelay m =>
  NodeLifecycle blk m ->
  LiveNode blk m ->
  DiffTime ->
  m (LiveNode blk m)
smartDelay :: forall (m :: * -> *) blk.
MonadDelay m =>
NodeLifecycle blk m
-> LiveNode blk m -> DiffTime -> m (LiveNode blk m)
smartDelay lifecycle :: NodeLifecycle blk m
lifecycle@NodeLifecycle{LiveIntervalResult blk -> m (LiveNode blk m)
nlStart :: LiveIntervalResult blk -> m (LiveNode blk m)
nlStart :: forall blk (m :: * -> *).
NodeLifecycle blk m -> LiveIntervalResult blk -> m (LiveNode blk m)
nlStart, LiveNode blk m -> m (LiveIntervalResult blk)
nlShutdown :: LiveNode blk m -> m (LiveIntervalResult blk)
nlShutdown :: forall blk (m :: * -> *).
NodeLifecycle blk m -> LiveNode blk m -> m (LiveIntervalResult blk)
nlShutdown} LiveNode blk m
node DiffTime
duration
  | NodeLifecycle blk m -> DiffTime -> Bool
forall blk (m :: * -> *). NodeLifecycle blk m -> DiffTime -> Bool
itIsTimeToRestartTheNode NodeLifecycle blk m
lifecycle DiffTime
duration = do
      results <- LiveNode blk m -> m (LiveIntervalResult blk)
nlShutdown LiveNode blk m
node
      threadDelay duration
      nlStart results
smartDelay NodeLifecycle blk m
_ LiveNode blk m
node DiffTime
duration = do
  DiffTime -> m ()
forall (m :: * -> *). MonadDelay m => DiffTime -> m ()
threadDelay DiffTime
duration
  LiveNode blk m -> m (LiveNode blk m)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure LiveNode blk m
node

itIsTimeToRestartTheNode :: NodeLifecycle blk m -> DiffTime -> Bool
itIsTimeToRestartTheNode :: forall blk (m :: * -> *). NodeLifecycle blk m -> DiffTime -> Bool
itIsTimeToRestartTheNode NodeLifecycle{Maybe DiffTime
nlMinDuration :: Maybe DiffTime
nlMinDuration :: forall blk (m :: * -> *). NodeLifecycle blk m -> Maybe DiffTime
nlMinDuration} DiffTime
duration =
  case Maybe DiffTime
nlMinDuration of
    Just DiffTime
minInterval -> DiffTime
duration DiffTime -> DiffTime -> Bool
forall a. Ord a => a -> a -> Bool
> DiffTime
minInterval
    Maybe DiffTime
Nothing -> Bool
False

-- | The 'Tick' contains a state update for a specific peer.
-- If the peer has not terminated by protocol rules, this will update its TMVar
-- with the new state, thereby unblocking the handler that's currently waiting
-- for new instructions.
--
-- TODO doc is outdated
dispatchTick ::
  forall m blk.
  (IOLike m, HasHeader (Header blk)) =>
  Tracer m (TraceSchedulerEvent blk) ->
  STM m (Map PeerId (ChainSyncClientHandle m blk)) ->
  Map PeerId (PeerResources m blk) ->
  NodeLifecycle blk m ->
  LiveNode blk m ->
  (Int, (DiffTime, Peer (NodeState blk))) ->
  m (LiveNode blk m)
dispatchTick :: forall (m :: * -> *) blk.
(IOLike m, HasHeader (Header blk)) =>
Tracer m (TraceSchedulerEvent blk)
-> STM m (Map PeerId (ChainSyncClientHandle m blk))
-> Map PeerId (PeerResources m blk)
-> NodeLifecycle blk m
-> LiveNode blk m
-> (Int, (DiffTime, Peer (NodeState blk)))
-> m (LiveNode blk m)
dispatchTick Tracer m (TraceSchedulerEvent blk)
tracer STM m (Map PeerId (ChainSyncClientHandle m blk))
varHandles Map PeerId (PeerResources m blk)
peers NodeLifecycle blk m
lifecycle LiveNode blk m
node (Int
number, (DiffTime
duration, Peer PeerId
pid NodeState blk
state)) =
  case Map PeerId (PeerResources m blk)
peers Map PeerId (PeerResources m blk)
-> PeerId -> Maybe (PeerResources m blk)
forall k a. Ord k => Map k a -> k -> Maybe a
Map.!? PeerId
pid of
    Just PeerResources{NodeState blk -> STM m ()
prUpdateState :: NodeState blk -> STM m ()
prUpdateState :: forall (m :: * -> *) blk.
PeerResources m blk -> NodeState blk -> STM m ()
prUpdateState} -> do
      m ()
traceNewTick
      STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (NodeState blk -> STM m ()
prUpdateState NodeState blk
state)
      newNode <- NodeLifecycle blk m
-> LiveNode blk m -> DiffTime -> m (LiveNode blk m)
forall (m :: * -> *) blk.
MonadDelay m =>
NodeLifecycle blk m
-> LiveNode blk m -> DiffTime -> m (LiveNode blk m)
smartDelay NodeLifecycle blk m
lifecycle LiveNode blk m
node DiffTime
duration
      traceWith (lnStateTracer newNode) ()
      pure newNode
    Maybe (PeerResources m blk)
Nothing -> String -> m (LiveNode blk m)
forall a. HasCallStack => String -> a
error String
"“The impossible happened,” as GHC would say."
 where
  traceNewTick :: m ()
  traceNewTick :: m ()
traceNewTick = do
    currentChain <- STM m (AnchoredFragment (Header blk))
-> m (AnchoredFragment (Header blk))
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m (AnchoredFragment (Header blk))
 -> m (AnchoredFragment (Header blk)))
-> STM m (AnchoredFragment (Header blk))
-> m (AnchoredFragment (Header blk))
forall a b. (a -> b) -> a -> b
$ ChainDB m blk -> STM m (AnchoredFragment (Header blk))
forall (m :: * -> *) blk.
ChainDB m blk -> STM m (AnchoredFragment (Header blk))
ChainDB.getCurrentChain (LiveNode blk m -> ChainDB m blk
forall blk (m :: * -> *). LiveNode blk m -> ChainDB m blk
lnChainDb LiveNode blk m
node)
    (csState, jumpingStates) <- atomically $ do
      m <- varHandles
      csState <- traverse (readTVar . CSClient.cschState) (m Map.!? pid)
      jumpingStates <- forM (Map.toList m) $ \(PeerId
peer, ChainSyncClientHandle m blk
h) -> do
        st <- StrictTVar m (ChainSyncJumpingState m blk)
-> STM m (ChainSyncJumpingState m blk)
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar (ChainSyncClientHandle m blk
-> StrictTVar m (ChainSyncJumpingState m blk)
forall (m :: * -> *) blk.
ChainSyncClientHandle m blk
-> StrictTVar m (ChainSyncJumpingState m blk)
CSClient.cschJumping ChainSyncClientHandle m blk
h)
        pure (peer, st)
      pure (csState, jumpingStates)
    traceWith tracer $
      TraceNewTick
        number
        duration
        (Peer pid state)
        currentChain
        (dropTimeFromFragment . CSClient.csCandidate <$> csState)
        jumpingStates

-- | Iterate over a 'PointSchedule', sending each tick to the associated peer in turn,
-- giving each peer a chunk of computation time, sequentially, until it satisfies the
-- conditions given by the tick.
-- This usually means for the ChainSync server to have sent the target header to the
-- client.
runScheduler ::
  (IOLike m, HasHeader (Header blk)) =>
  Tracer m (TraceSchedulerEvent blk) ->
  STM m (Map PeerId (ChainSyncClientHandle m blk)) ->
  PointSchedule blk ->
  Map PeerId (PeerResources m blk) ->
  NodeLifecycle blk m ->
  m (ChainDB m blk, StateViewTracers blk m)
runScheduler :: forall (m :: * -> *) blk.
(IOLike m, HasHeader (Header blk)) =>
Tracer m (TraceSchedulerEvent blk)
-> STM m (Map PeerId (ChainSyncClientHandle m blk))
-> PointSchedule blk
-> Map PeerId (PeerResources m blk)
-> NodeLifecycle blk m
-> m (ChainDB m blk, StateViewTracers blk m)
runScheduler Tracer m (TraceSchedulerEvent blk)
tracer STM m (Map PeerId (ChainSyncClientHandle m blk))
varHandles ps :: PointSchedule blk
ps@PointSchedule{Time
psMinEndTime :: Time
psMinEndTime :: forall blk. PointSchedule blk -> Time
psMinEndTime} Map PeerId (PeerResources m blk)
peers lifecycle :: NodeLifecycle blk m
lifecycle@NodeLifecycle{LiveIntervalResult blk -> m (LiveNode blk m)
nlStart :: forall blk (m :: * -> *).
NodeLifecycle blk m -> LiveIntervalResult blk -> m (LiveNode blk m)
nlStart :: LiveIntervalResult blk -> m (LiveNode blk m)
nlStart} = do
  node0 <- LiveIntervalResult blk -> m (LiveNode blk m)
nlStart LiveIntervalResult{lirActive :: Set PeerId
lirActive = Map PeerId (PeerResources m blk) -> Set PeerId
forall k a. Map k a -> Set k
Map.keysSet Map PeerId (PeerResources m blk)
peers, lirPeerResults :: [PeerSimulatorResult blk]
lirPeerResults = []}
  traceWith tracer TraceBeginningOfTime
  nodeEnd <- foldM tick node0 (zip [0 ..] (peersStatesRelative ps))
  let extraDelay = case Int
-> [(Time, Peer (NodeState blk))] -> [(Time, Peer (NodeState blk))]
forall a. Int -> [a] -> [a]
take Int
1 ([(Time, Peer (NodeState blk))] -> [(Time, Peer (NodeState blk))])
-> [(Time, Peer (NodeState blk))] -> [(Time, Peer (NodeState blk))]
forall a b. (a -> b) -> a -> b
$ [(Time, Peer (NodeState blk))] -> [(Time, Peer (NodeState blk))]
forall a. [a] -> [a]
reverse ([(Time, Peer (NodeState blk))] -> [(Time, Peer (NodeState blk))])
-> [(Time, Peer (NodeState blk))] -> [(Time, Peer (NodeState blk))]
forall a b. (a -> b) -> a -> b
$ PointSchedule blk -> [(Time, Peer (NodeState blk))]
forall blk. PointSchedule blk -> [(Time, Peer (NodeState blk))]
peersStates PointSchedule blk
ps of
        [(Time
t, Peer (NodeState blk)
_)] ->
          if Time
t Time -> Time -> Bool
forall a. Ord a => a -> a -> Bool
< Time
psMinEndTime
            then DiffTime -> Maybe DiffTime
forall a. a -> Maybe a
Just (DiffTime -> Maybe DiffTime) -> DiffTime -> Maybe DiffTime
forall a b. (a -> b) -> a -> b
$ Time -> Time -> DiffTime
diffTime Time
psMinEndTime Time
t
            else Maybe DiffTime
forall a. Maybe a
Nothing
        [(Time, Peer (NodeState blk))]
_ -> DiffTime -> Maybe DiffTime
forall a. a -> Maybe a
Just (DiffTime -> Maybe DiffTime) -> DiffTime -> Maybe DiffTime
forall a b. (a -> b) -> a -> b
$ Time -> DiffTime
forall a b. Coercible a b => a -> b
coerce Time
psMinEndTime
  LiveNode{lnChainDb, lnStateViewTracers} <-
    case extraDelay of
      Just DiffTime
duration -> do
        nodeEnd' <- NodeLifecycle blk m
-> LiveNode blk m -> DiffTime -> m (LiveNode blk m)
forall (m :: * -> *) blk.
MonadDelay m =>
NodeLifecycle blk m
-> LiveNode blk m -> DiffTime -> m (LiveNode blk m)
smartDelay NodeLifecycle blk m
lifecycle LiveNode blk m
nodeEnd DiffTime
duration
        -- Give an opportunity to the node to finish whatever it was doing at
        -- shutdown
        when (itIsTimeToRestartTheNode lifecycle duration) $
          threadDelay $
            coerce psMinEndTime
        pure nodeEnd'
      Maybe DiffTime
Nothing ->
        LiveNode blk m -> m (LiveNode blk m)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure LiveNode blk m
nodeEnd
  traceWith tracer TraceEndOfTime
  pure (lnChainDb, lnStateViewTracers)
 where
  tick :: LiveNode blk m
-> (Int, (DiffTime, Peer (NodeState blk))) -> m (LiveNode blk m)
tick = Tracer m (TraceSchedulerEvent blk)
-> STM m (Map PeerId (ChainSyncClientHandle m blk))
-> Map PeerId (PeerResources m blk)
-> NodeLifecycle blk m
-> LiveNode blk m
-> (Int, (DiffTime, Peer (NodeState blk)))
-> m (LiveNode blk m)
forall (m :: * -> *) blk.
(IOLike m, HasHeader (Header blk)) =>
Tracer m (TraceSchedulerEvent blk)
-> STM m (Map PeerId (ChainSyncClientHandle m blk))
-> Map PeerId (PeerResources m blk)
-> NodeLifecycle blk m
-> LiveNode blk m
-> (Int, (DiffTime, Peer (NodeState blk)))
-> m (LiveNode blk m)
dispatchTick Tracer m (TraceSchedulerEvent blk)
tracer STM m (Map PeerId (ChainSyncClientHandle m blk))
varHandles Map PeerId (PeerResources m blk)
peers NodeLifecycle blk m
lifecycle

-- | Create the shared resource for the LoE if the feature is enabled in the config.
-- This is used by the ChainDB and the GDD governor.
mkLoEVar ::
  ( IOLike m
  , StandardHash blk
  , NoThunks (Header blk)
  , HasHeader (Header blk)
  , Typeable blk
  ) =>
  SchedulerConfig ->
  m (LoE (StrictTVar m (AnchoredFragment (HeaderWithTime blk))))
mkLoEVar :: forall (m :: * -> *) blk.
(IOLike m, StandardHash blk, NoThunks (Header blk),
 HasHeader (Header blk), Typeable blk) =>
SchedulerConfig
-> m (LoE (StrictTVar m (AnchoredFragment (HeaderWithTime blk))))
mkLoEVar SchedulerConfig{Bool
scEnableLoE :: SchedulerConfig -> Bool
scEnableLoE :: Bool
scEnableLoE}
  | Bool
scEnableLoE =
      StrictTVar m (AnchoredFragment (HeaderWithTime blk))
-> LoE (StrictTVar m (AnchoredFragment (HeaderWithTime blk)))
forall a. a -> LoE a
LoEEnabled (StrictTVar m (AnchoredFragment (HeaderWithTime blk))
 -> LoE (StrictTVar m (AnchoredFragment (HeaderWithTime blk))))
-> m (StrictTVar m (AnchoredFragment (HeaderWithTime blk)))
-> m (LoE (StrictTVar m (AnchoredFragment (HeaderWithTime blk))))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> AnchoredFragment (HeaderWithTime blk)
-> m (StrictTVar m (AnchoredFragment (HeaderWithTime blk)))
forall (m :: * -> *) a.
(HasCallStack, MonadSTM m, NoThunks a) =>
a -> m (StrictTVar m a)
newTVarIO (Anchor (HeaderWithTime blk)
-> AnchoredFragment (HeaderWithTime blk)
forall v a b. Anchorable v a b => a -> AnchoredSeq v a b
AF.Empty Anchor (HeaderWithTime blk)
forall block. Anchor block
AF.AnchorGenesis)
  | Bool
otherwise =
      LoE (StrictTVar m (AnchoredFragment (HeaderWithTime blk)))
-> m (LoE (StrictTVar m (AnchoredFragment (HeaderWithTime blk))))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure LoE (StrictTVar m (AnchoredFragment (HeaderWithTime blk)))
forall a. LoE a
LoEDisabled

mkStateTracer ::
  IOLike m =>
  (GetHeader blk, HasHeader blk, Eq (Header blk)) =>
  SchedulerConfig ->
  GenesisTest blk s ->
  PeerSimulatorResources m blk ->
  ChainDB m blk ->
  m (Tracer m ())
mkStateTracer :: forall (m :: * -> *) blk s.
(IOLike m, GetHeader blk, HasHeader blk, Eq (Header blk)) =>
SchedulerConfig
-> GenesisTest blk s
-> PeerSimulatorResources m blk
-> ChainDB m blk
-> m (Tracer m ())
mkStateTracer SchedulerConfig
schedulerConfig GenesisTest{BlockTree blk
gtBlockTree :: BlockTree blk
gtBlockTree :: forall blk schedule. GenesisTest blk schedule -> BlockTree blk
gtBlockTree} PeerSimulatorResources{ChainSyncClientHandleCollection PeerId m blk
psrHandles :: ChainSyncClientHandleCollection PeerId m blk
psrHandles :: forall (m :: * -> *) blk.
PeerSimulatorResources m blk
-> ChainSyncClientHandleCollection PeerId m blk
psrHandles, Map PeerId (PeerResources m blk)
psrPeers :: Map PeerId (PeerResources m blk)
psrPeers :: forall (m :: * -> *) blk.
PeerSimulatorResources m blk -> Map PeerId (PeerResources m blk)
psrPeers} ChainDB m blk
chainDb
  | SchedulerConfig -> Bool
scTraceState SchedulerConfig
schedulerConfig
  , let getCandidates :: STM m (Map PeerId (AnchoredFragment (HeaderWithTime blk)))
getCandidates = STM m (Map PeerId (ChainSyncClientHandle m blk))
-> (ChainSyncState blk -> AnchoredFragment (HeaderWithTime blk))
-> STM m (Map PeerId (AnchoredFragment (HeaderWithTime blk)))
forall (m :: * -> *) peer blk a.
IOLike m =>
STM m (Map peer (ChainSyncClientHandle m blk))
-> (ChainSyncState blk -> a) -> STM m (Map peer a)
viewChainSyncState (ChainSyncClientHandleCollection PeerId m blk
-> STM m (Map PeerId (ChainSyncClientHandle m blk))
forall peer (m :: * -> *) blk.
ChainSyncClientHandleCollection peer m blk
-> STM m (Map peer (ChainSyncClientHandle m blk))
cschcMap ChainSyncClientHandleCollection PeerId m blk
psrHandles) ChainSyncState blk -> AnchoredFragment (HeaderWithTime blk)
forall blk.
ChainSyncState blk -> AnchoredFragment (HeaderWithTime blk)
CSClient.csCandidate
        getCurrentChain :: STM m (AnchoredFragment (Header blk))
getCurrentChain = ChainDB m blk -> STM m (AnchoredFragment (Header blk))
forall (m :: * -> *) blk.
ChainDB m blk -> STM m (AnchoredFragment (Header blk))
ChainDB.getCurrentChain ChainDB m blk
chainDb
        getPoints :: STM m (Map PeerId (Maybe (NodeState blk)))
getPoints = (StrictTVar m (Maybe (NodeState blk))
 -> STM m (Maybe (NodeState blk)))
-> Map PeerId (StrictTVar m (Maybe (NodeState blk)))
-> STM m (Map PeerId (Maybe (NodeState blk)))
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> Map PeerId a -> f (Map PeerId b)
traverse StrictTVar m (Maybe (NodeState blk))
-> STM m (Maybe (NodeState blk))
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar (SharedResources m blk -> StrictTVar m (Maybe (NodeState blk))
forall (m :: * -> *) blk.
SharedResources m blk -> StrictTVar m (Maybe (NodeState blk))
srCurrentState (SharedResources m blk -> StrictTVar m (Maybe (NodeState blk)))
-> (PeerResources m blk -> SharedResources m blk)
-> PeerResources m blk
-> StrictTVar m (Maybe (NodeState blk))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. PeerResources m blk -> SharedResources m blk
forall (m :: * -> *) blk.
PeerResources m blk -> SharedResources m blk
prShared (PeerResources m blk -> StrictTVar m (Maybe (NodeState blk)))
-> Map PeerId (PeerResources m blk)
-> Map PeerId (StrictTVar m (Maybe (NodeState blk)))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Map PeerId (PeerResources m blk)
psrPeers) =
      BlockTree blk
-> STM m (AnchoredFragment (Header blk))
-> STM m (Map PeerId (AnchoredFragment (Header blk)))
-> STM m (Map PeerId (Maybe (NodeState blk)))
-> m (Tracer m ())
forall (m :: * -> *) blk.
(IOLike m, HasHeader blk, Eq (Header blk), GetHeader blk) =>
BlockTree blk
-> STM m (AnchoredFragment (Header blk))
-> STM m (Map PeerId (AnchoredFragment (Header blk)))
-> STM m (Map PeerId (Maybe (NodeState blk)))
-> m (Tracer m ())
peerSimStateDiagramSTMTracerDebug
        BlockTree blk
gtBlockTree
        STM m (AnchoredFragment (Header blk))
getCurrentChain
        ((Map PeerId (AnchoredFragment (HeaderWithTime blk))
 -> Map PeerId (AnchoredFragment (Header blk)))
-> STM m (Map PeerId (AnchoredFragment (HeaderWithTime blk)))
-> STM m (Map PeerId (AnchoredFragment (Header blk)))
forall a b. (a -> b) -> STM m a -> STM m b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ((AnchoredFragment (HeaderWithTime blk)
 -> AnchoredFragment (Header blk))
-> Map PeerId (AnchoredFragment (HeaderWithTime blk))
-> Map PeerId (AnchoredFragment (Header blk))
forall a b k. (a -> b) -> Map k a -> Map k b
Map.map AnchoredFragment (HeaderWithTime blk)
-> AnchoredFragment (Header blk)
forall blk.
HasHeader (Header blk) =>
AnchoredFragment (HeaderWithTime blk)
-> AnchoredFragment (Header blk)
dropTimeFromFragment) STM m (Map PeerId (AnchoredFragment (HeaderWithTime blk)))
getCandidates)
        STM m (Map PeerId (Maybe (NodeState blk)))
getPoints
  | Bool
otherwise =
      Tracer m () -> m (Tracer m ())
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Tracer m ()
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer

-- | Start all threads for ChainSync, BlockFetch and GDD, using the resources
-- for a single live interval.
-- Only start peers that haven't been disconnected in a previous interval,
-- provided by 'LiveIntervalResult'.
startNode ::
  forall m blk.
  ( IOLike m
  , MonadTime m
  , MonadTimer m
  , LedgerSupportsProtocol blk
  , ShowProxy blk
  , ShowProxy (Header blk)
  , BlockSupportsDiffusionPipelining blk
  , ConfigSupportsNode blk
  , HasHardForkHistory blk
  ) =>
  ProtocolInfo blk ->
  SchedulerConfig ->
  GenesisTestFull blk ->
  LiveInterval blk m ->
  m ()
startNode :: forall (m :: * -> *) blk.
(IOLike m, MonadTime m, MonadTimer m, LedgerSupportsProtocol blk,
 ShowProxy blk, ShowProxy (Header blk),
 BlockSupportsDiffusionPipelining blk, ConfigSupportsNode blk,
 HasHardForkHistory blk) =>
ProtocolInfo blk
-> SchedulerConfig
-> GenesisTestFull blk
-> LiveInterval blk m
-> m ()
startNode ProtocolInfo blk
protocolInfo SchedulerConfig
schedulerConfig GenesisTestFull blk
genesisTest LiveInterval blk m
interval = do
  let handles :: ChainSyncClientHandleCollection PeerId m blk
handles = PeerSimulatorResources m blk
-> ChainSyncClientHandleCollection PeerId m blk
forall (m :: * -> *) blk.
PeerSimulatorResources m blk
-> ChainSyncClientHandleCollection PeerId m blk
psrHandles PeerSimulatorResources m blk
lrPeerSim
  fetchClientRegistry <- m (FetchClientRegistry PeerId (HeaderWithTime blk) blk m)
forall (m :: * -> *) peer header block.
MonadSTM m =>
m (FetchClientRegistry peer header block m)
newFetchClientRegistry
  let chainDbView = ChainDB m blk -> ChainDbView m blk
forall (m :: * -> *) blk. ChainDB m blk -> ChainDbView m blk
CSClient.defaultChainDbView ChainDB m blk
lnChainDb
      activePeers = Map PeerId (PeerResources m blk) -> [(PeerId, PeerResources m blk)]
forall k a. Map k a -> [(k, a)]
Map.toList (Map PeerId (PeerResources m blk)
 -> [(PeerId, PeerResources m blk)])
-> Map PeerId (PeerResources m blk)
-> [(PeerId, PeerResources m blk)]
forall a b. (a -> b) -> a -> b
$ Map PeerId (PeerResources m blk)
-> Set PeerId -> Map PeerId (PeerResources m blk)
forall k a. Ord k => Map k a -> Set k -> Map k a
Map.restrictKeys (PeerSimulatorResources m blk -> Map PeerId (PeerResources m blk)
forall (m :: * -> *) blk.
PeerSimulatorResources m blk -> Map PeerId (PeerResources m blk)
psrPeers PeerSimulatorResources m blk
lrPeerSim) (LiveIntervalResult blk -> Set PeerId
forall blk. LiveIntervalResult blk -> Set PeerId
lirActive LiveIntervalResult blk
liveResult)
      peersStartOrder = [PeerId]
psStartOrder [PeerId] -> [PeerId] -> [PeerId]
forall a. [a] -> [a] -> [a]
++ [PeerId] -> [PeerId]
forall a. Ord a => [a] -> [a]
sort [PeerId
pid | (PeerId
pid, PeerResources m blk
_) <- [(PeerId, PeerResources m blk)]
activePeers, PeerId
pid PeerId -> [PeerId] -> Bool
forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
`notElem` [PeerId]
psStartOrder]
      activePeersOrdered =
        [ PeerResources m blk
peerResources
        | PeerId
pid <- [PeerId]
peersStartOrder
        , (PeerId
pid', PeerResources m blk
peerResources) <- [(PeerId, PeerResources m blk)]
activePeers
        , PeerId
pid PeerId -> PeerId -> Bool
forall a. Eq a => a -> a -> Bool
== PeerId
pid'
        ]
  for_ activePeersOrdered $ \PeerResources{SharedResources m blk
prShared :: forall (m :: * -> *) blk.
PeerResources m blk -> SharedResources m blk
prShared :: SharedResources m blk
prShared, ChainSyncResources m blk
prChainSync :: ChainSyncResources m blk
prChainSync :: forall (m :: * -> *) blk.
PeerResources m blk -> ChainSyncResources m blk
prChainSync, BlockFetchResources m blk
prBlockFetch :: BlockFetchResources m blk
prBlockFetch :: forall (m :: * -> *) blk.
PeerResources m blk -> BlockFetchResources m blk
prBlockFetch} -> do
    let pid :: PeerId
pid = SharedResources m blk -> PeerId
forall (m :: * -> *) blk. SharedResources m blk -> PeerId
srPeerId SharedResources m blk
prShared
    ResourceRegistry m -> String -> m () -> m (Thread m ())
forall (m :: * -> *) a.
(MonadAsync m, MonadFork m, MonadMask m, HasCallStack) =>
ResourceRegistry m -> String -> m a -> m (Thread m a)
forkLinkedThread ResourceRegistry m
lrRegistry (String
"Peer overview " String -> String -> String
forall a. [a] -> [a] -> [a]
++ PeerId -> String
forall a. Show a => a -> String
show PeerId
pid) (m () -> m (Thread m ())) -> m () -> m (Thread m ())
forall a b. (a -> b) -> a -> b
$
      -- The peerRegistry helps ensuring that if any thread fails, then
      -- the registry is closed and all threads related to the peer are
      -- killed.
      (ResourceRegistry m -> m ()) -> m ()
forall (m :: * -> *) a.
(MonadSTM m, MonadMask m, MonadThread m, HasCallStack) =>
(ResourceRegistry m -> m a) -> m a
withRegistry ((ResourceRegistry m -> m ()) -> m ())
-> (ResourceRegistry m -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \ResourceRegistry m
peerRegistry -> do
        (csClient, csServer) <-
          ResourceRegistry m
-> Tracer m (TraceEvent blk)
-> TopLevelConfig blk
-> ChainDbView m blk
-> FetchClientRegistry PeerId (HeaderWithTime blk) blk m
-> SharedResources m blk
-> ChainSyncResources m blk
-> ChainSyncTimeout
-> ChainSyncLoPBucketConfig
-> CSJConfig
-> StateViewTracers blk m
-> ChainSyncClientHandleCollection PeerId m blk
-> m (Thread m (), Thread m ())
forall (m :: * -> *) blk.
(IOLike m, MonadTimer m, LedgerSupportsProtocol blk, ShowProxy blk,
 ShowProxy (Header blk)) =>
ResourceRegistry m
-> Tracer m (TraceEvent blk)
-> TopLevelConfig blk
-> ChainDbView m blk
-> FetchClientRegistry PeerId (HeaderWithTime blk) blk m
-> SharedResources m blk
-> ChainSyncResources m blk
-> ChainSyncTimeout
-> ChainSyncLoPBucketConfig
-> CSJConfig
-> StateViewTracers blk m
-> ChainSyncClientHandleCollection PeerId m blk
-> m (Thread m (), Thread m ())
startChainSyncConnectionThread
            ResourceRegistry m
peerRegistry
            Tracer m (TraceEvent blk)
tracer
            TopLevelConfig blk
lrConfig
            ChainDbView m blk
chainDbView
            FetchClientRegistry PeerId (HeaderWithTime blk) blk m
fetchClientRegistry
            SharedResources m blk
prShared
            ChainSyncResources m blk
prChainSync
            ChainSyncTimeout
chainSyncTimeouts_
            ChainSyncLoPBucketConfig
chainSyncLoPBucketConfig
            CSJConfig
csjConfig
            StateViewTracers blk m
lnStateViewTracers
            ChainSyncClientHandleCollection PeerId m blk
handles
        BlockFetch.startKeepAliveThread peerRegistry fetchClientRegistry pid
        (bfClient, bfServer) <-
          startBlockFetchConnectionThread
            peerRegistry
            tracer
            lnStateViewTracers
            fetchClientRegistry
            (pure Continue)
            prShared
            prBlockFetch
            blockFetchTimeouts_
        waitAnyThread [csClient, csServer, bfClient, bfServer]
  -- The block fetch logic needs to be started after the block fetch clients
  -- otherwise, an internal assertion fails because getCandidates yields more
  -- peer fragments than registered clients.
  BlockFetch.startBlockFetchLogic
    (scEnableChainSelStarvation schedulerConfig)
    lrRegistry
    lrTracer
    protocolInfo
    lnChainDb
    fetchClientRegistry
    handles

  for_ lrLoEVar $ \StrictTVar m (AnchoredFragment (HeaderWithTime blk))
var -> do
    ResourceRegistry m
-> String
-> Watcher
     m
     (GDDTrigger (GDDStateView m blk PeerId))
     (GDDTrigger (Map PeerId (StrictMaybe (WithOrigin SlotNo), Bool)))
-> m (Thread m Void)
forall (m :: * -> *) a fp.
(IOLike m, Eq fp, HasCallStack) =>
ResourceRegistry m -> String -> Watcher m a fp -> m (Thread m Void)
forkLinkedWatcher ResourceRegistry m
lrRegistry String
"LoE updater background" (Watcher
   m
   (GDDTrigger (GDDStateView m blk PeerId))
   (GDDTrigger (Map PeerId (StrictMaybe (WithOrigin SlotNo), Bool)))
 -> m (Thread m Void))
-> Watcher
     m
     (GDDTrigger (GDDStateView m blk PeerId))
     (GDDTrigger (Map PeerId (StrictMaybe (WithOrigin SlotNo), Bool)))
-> m (Thread m Void)
forall a b. (a -> b) -> a -> b
$
      TopLevelConfig blk
-> Tracer m (TraceGDDEvent PeerId blk)
-> ChainDB m blk
-> DiffTime
-> STM m GsmState
-> STM m (Map PeerId (ChainSyncClientHandle m blk))
-> StrictTVar m (AnchoredFragment (HeaderWithTime blk))
-> Watcher
     m
     (GDDTrigger (GDDStateView m blk PeerId))
     (GDDTrigger (Map PeerId (StrictMaybe (WithOrigin SlotNo), Bool)))
forall (m :: * -> *) blk peer.
(IOLike m, Ord peer, LedgerSupportsProtocol blk,
 HasHardForkHistory blk) =>
TopLevelConfig blk
-> Tracer m (TraceGDDEvent peer blk)
-> ChainDB m blk
-> DiffTime
-> STM m GsmState
-> STM m (Map peer (ChainSyncClientHandle m blk))
-> StrictTVar m (AnchoredFragment (HeaderWithTime blk))
-> Watcher
     m
     (GDDTrigger (GDDStateView m blk peer))
     (GDDTrigger (Map peer (StrictMaybe (WithOrigin SlotNo), Bool)))
gddWatcher
        TopLevelConfig blk
lrConfig
        (Tracer m (TraceEvent blk) -> Tracer m (TraceGDDEvent PeerId blk)
forall (m :: * -> *) blk.
Tracer m (TraceEvent blk) -> Tracer m (TraceGDDEvent PeerId blk)
mkGDDTracerTestBlock Tracer m (TraceEvent blk)
lrTracer)
        ChainDB m blk
lnChainDb
        DiffTime
0.0 -- The rate limit makes simpler the calculations of how long tests
        -- should run and still should produce interesting interleavings.
        -- It is similar to the setting of bfcDecisionLoopInterval in
        -- Test.Consensus.PeerSimulator.BlockFetch
        (GsmState -> STM m GsmState
forall a. a -> STM m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure GsmState
GSM.Syncing) -- TODO actually run GSM
        (ChainSyncClientHandleCollection PeerId m blk
-> STM m (Map PeerId (ChainSyncClientHandle m blk))
forall peer (m :: * -> *) blk.
ChainSyncClientHandleCollection peer m blk
-> STM m (Map peer (ChainSyncClientHandle m blk))
cschcMap ChainSyncClientHandleCollection PeerId m blk
handles)
        StrictTVar m (AnchoredFragment (HeaderWithTime blk))
var

  void $
    forkLinkedWatcher lrRegistry "CSJ invariants watcher" $
      CSJInvariants.watcher (cschcMap handles)
 where
  LiveResources{ResourceRegistry m
lrRegistry :: ResourceRegistry m
lrRegistry :: forall blk (m :: * -> *). LiveResources blk m -> ResourceRegistry m
lrRegistry, Tracer m (TraceEvent blk)
lrTracer :: Tracer m (TraceEvent blk)
lrTracer :: forall blk (m :: * -> *).
LiveResources blk m -> Tracer m (TraceEvent blk)
lrTracer, TopLevelConfig blk
lrConfig :: TopLevelConfig blk
lrConfig :: forall blk (m :: * -> *). LiveResources blk m -> TopLevelConfig blk
lrConfig, PeerSimulatorResources m blk
lrPeerSim :: PeerSimulatorResources m blk
lrPeerSim :: forall blk (m :: * -> *).
LiveResources blk m -> PeerSimulatorResources m blk
lrPeerSim, LoE (StrictTVar m (AnchoredFragment (HeaderWithTime blk)))
lrLoEVar :: LoE (StrictTVar m (AnchoredFragment (HeaderWithTime blk)))
lrLoEVar :: forall blk (m :: * -> *).
LiveResources blk m
-> LoE (StrictTVar m (AnchoredFragment (HeaderWithTime blk)))
lrLoEVar} = LiveResources blk m
resources

  LiveInterval
    { liResources :: forall blk (m :: * -> *). LiveInterval blk m -> LiveResources blk m
liResources = LiveResources blk m
resources
    , liResult :: forall blk (m :: * -> *).
LiveInterval blk m -> LiveIntervalResult blk
liResult = LiveIntervalResult blk
liveResult
    , liNode :: forall blk (m :: * -> *). LiveInterval blk m -> LiveNode blk m
liNode = LiveNode{ChainDB m blk
lnChainDb :: forall blk (m :: * -> *). LiveNode blk m -> ChainDB m blk
lnChainDb :: ChainDB m blk
lnChainDb, StateViewTracers blk m
lnStateViewTracers :: forall blk (m :: * -> *). LiveNode blk m -> StateViewTracers blk m
lnStateViewTracers :: StateViewTracers blk m
lnStateViewTracers}
    } = LiveInterval blk m
interval

  GenesisTest
    { ChainSyncTimeout
gtChainSyncTimeouts :: ChainSyncTimeout
gtChainSyncTimeouts :: forall blk schedule. GenesisTest blk schedule -> ChainSyncTimeout
gtChainSyncTimeouts
    , BlockFetchTimeout
gtBlockFetchTimeouts :: BlockFetchTimeout
gtBlockFetchTimeouts :: forall blk schedule. GenesisTest blk schedule -> BlockFetchTimeout
gtBlockFetchTimeouts
    , gtLoPBucketParams :: forall blk schedule. GenesisTest blk schedule -> LoPBucketParams
gtLoPBucketParams = LoPBucketParams{Integer
lbpCapacity :: Integer
lbpCapacity :: LoPBucketParams -> Integer
lbpCapacity, Rational
lbpRate :: Rational
lbpRate :: LoPBucketParams -> Rational
lbpRate}
    , gtCSJParams :: forall blk schedule. GenesisTest blk schedule -> CSJParams
gtCSJParams = CSJParams{SlotNo
csjpJumpSize :: SlotNo
csjpJumpSize :: CSJParams -> SlotNo
csjpJumpSize}
    , gtSchedule :: forall blk schedule. GenesisTest blk schedule -> schedule
gtSchedule = PointSchedule{[PeerId]
psStartOrder :: [PeerId]
psStartOrder :: forall blk. PointSchedule blk -> [PeerId]
psStartOrder}
    } = GenesisTestFull blk
genesisTest

  StateViewTracers{Tracer m (TraceEvent blk)
svtTraceTracer :: Tracer m (TraceEvent blk)
svtTraceTracer :: forall blk (m :: * -> *).
StateViewTracers blk m -> Tracer m (TraceEvent blk)
svtTraceTracer} = StateViewTracers blk m
lnStateViewTracers

  -- FIXME: This type of configuration should move to `Trace.mkTracer`.
  tracer :: Tracer m (TraceEvent blk)
tracer =
    if SchedulerConfig -> Bool
scTrace SchedulerConfig
schedulerConfig
      then (TraceEvent blk -> m ()) -> Tracer m (TraceEvent blk)
forall (m :: * -> *) a. (a -> m ()) -> Tracer m a
Tracer (\TraceEvent blk
evt -> Tracer m (TraceEvent blk) -> TraceEvent blk -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (TraceEvent blk)
lrTracer TraceEvent blk
evt m () -> m () -> m ()
forall a b. m a -> m b -> m b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Tracer m (TraceEvent blk) -> TraceEvent blk -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (TraceEvent blk)
svtTraceTracer TraceEvent blk
evt)
      else Tracer m (TraceEvent blk)
svtTraceTracer

  chainSyncTimeouts_ :: ChainSyncTimeout
chainSyncTimeouts_ =
    if SchedulerConfig -> Bool
scEnableChainSyncTimeouts SchedulerConfig
schedulerConfig
      then ChainSyncTimeout
gtChainSyncTimeouts
      else ChainSyncTimeout
ChainSync.chainSyncNoTimeouts

  chainSyncLoPBucketConfig :: ChainSyncLoPBucketConfig
chainSyncLoPBucketConfig =
    if SchedulerConfig -> Bool
scEnableLoP SchedulerConfig
schedulerConfig
      then
        ChainSyncLoPBucketEnabledConfig -> ChainSyncLoPBucketConfig
ChainSyncLoPBucketEnabled
          ChainSyncLoPBucketEnabledConfig{csbcCapacity :: Integer
csbcCapacity = Integer
lbpCapacity, csbcRate :: Rational
csbcRate = Rational
lbpRate}
      else ChainSyncLoPBucketConfig
ChainSyncLoPBucketDisabled

  csjConfig :: CSJConfig
csjConfig =
    if SchedulerConfig -> Bool
scEnableCSJ SchedulerConfig
schedulerConfig
      then CSJEnabledConfig -> CSJConfig
CSJEnabled CSJEnabledConfig{csjcJumpSize :: SlotNo
csjcJumpSize = SlotNo
csjpJumpSize}
      else CSJConfig
CSJDisabled

  blockFetchTimeouts_ :: BlockFetchTimeout
blockFetchTimeouts_ =
    if SchedulerConfig -> Bool
scEnableBlockFetchTimeouts SchedulerConfig
schedulerConfig
      then BlockFetchTimeout
gtBlockFetchTimeouts
      else BlockFetchTimeout
BlockFetch.blockFetchNoTimeouts

-- | Set up all resources related to node start/shutdown.
nodeLifecycle ::
  ( IOLike m
  , MonadTime m
  , MonadTimer m
  , ShowProxy blk
  , ShowProxy (Header blk)
  , ConfigSupportsNode blk
  , LedgerSupportsProtocol blk
  , ChainDB.SerialiseDiskConstraints blk
  , BlockSupportsDiffusionPipelining blk
  , InspectLedger blk
  , HasHardForkHistory blk
  , ConvertRawHash blk
  , CanUpgradeLedgerTables (LedgerState blk)
  , HasPointScheduleTestParams blk
  , Eq (Header blk)
  ) =>
  ProtocolInfoArgs blk ->
  SchedulerConfig ->
  GenesisTestFull blk ->
  Tracer m (TraceEvent blk) ->
  ResourceRegistry m ->
  PeerSimulatorResources m blk ->
  m (NodeLifecycle blk m)
nodeLifecycle :: forall (m :: * -> *) blk.
(IOLike m, MonadTime m, MonadTimer m, ShowProxy blk,
 ShowProxy (Header blk), ConfigSupportsNode blk,
 LedgerSupportsProtocol blk, SerialiseDiskConstraints blk,
 BlockSupportsDiffusionPipelining blk, InspectLedger blk,
 HasHardForkHistory blk, ConvertRawHash blk,
 CanUpgradeLedgerTables (LedgerState blk),
 HasPointScheduleTestParams blk, Eq (Header blk)) =>
ProtocolInfoArgs blk
-> SchedulerConfig
-> GenesisTestFull blk
-> Tracer m (TraceEvent blk)
-> ResourceRegistry m
-> PeerSimulatorResources m blk
-> m (NodeLifecycle blk m)
nodeLifecycle ProtocolInfoArgs blk
protocolArgs SchedulerConfig
schedulerConfig GenesisTestFull blk
genesisTest Tracer m (TraceEvent blk)
lrTracer ResourceRegistry m
lrRegistry PeerSimulatorResources m blk
lrPeerSim = do
  lrCdb <- m (NodeDBs (StrictTMVar m MockFS))
forall (m :: * -> *).
MonadSTM m =>
m (NodeDBs (StrictTMVar m MockFS))
emptyNodeDBs
  lrLoEVar <- mkLoEVar schedulerConfig
  let
    protocolInfo = SecurityParam
-> ForecastRange
-> GenesisWindow
-> ProtocolInfoArgs blk
-> ProtocolInfo blk
forall blk.
HasPointScheduleTestParams blk =>
SecurityParam
-> ForecastRange
-> GenesisWindow
-> ProtocolInfoArgs blk
-> ProtocolInfo blk
mkProtocolInfo SecurityParam
k ForecastRange
gtForecastRange GenesisWindow
gtGenesisWindow ProtocolInfoArgs blk
protocolArgs
    topLevelConfig = ProtocolInfo blk -> TopLevelConfig blk
forall b. ProtocolInfo b -> TopLevelConfig b
pInfoConfig ProtocolInfo blk
protocolInfo
    resources =
      LiveResources
        { ResourceRegistry m
lrRegistry :: ResourceRegistry m
lrRegistry :: ResourceRegistry m
lrRegistry
        , Tracer m (TraceEvent blk)
lrTracer :: Tracer m (TraceEvent blk)
lrTracer :: Tracer m (TraceEvent blk)
lrTracer
        , lrSTracer :: ChainDB m blk -> m (Tracer m ())
lrSTracer = SchedulerConfig
-> GenesisTestFull blk
-> PeerSimulatorResources m blk
-> ChainDB m blk
-> m (Tracer m ())
forall (m :: * -> *) blk s.
(IOLike m, GetHeader blk, HasHeader blk, Eq (Header blk)) =>
SchedulerConfig
-> GenesisTest blk s
-> PeerSimulatorResources m blk
-> ChainDB m blk
-> m (Tracer m ())
mkStateTracer SchedulerConfig
schedulerConfig GenesisTestFull blk
genesisTest PeerSimulatorResources m blk
lrPeerSim
        , lrConfig :: TopLevelConfig blk
lrConfig = TopLevelConfig blk
topLevelConfig
        , lrInitLedger :: ExtLedgerState blk ValuesMK
lrInitLedger = ProtocolInfo blk -> ExtLedgerState blk ValuesMK
forall b. ProtocolInfo b -> ExtLedgerState b ValuesMK
pInfoInitLedger ProtocolInfo blk
protocolInfo
        , lrChunkInfo :: ChunkInfo
lrChunkInfo = TopLevelConfig blk -> ChunkInfo
forall blk.
HasPointScheduleTestParams blk =>
TopLevelConfig blk -> ChunkInfo
getChunkInfoFromTopLevelConfig TopLevelConfig blk
topLevelConfig
        , PeerSimulatorResources m blk
lrPeerSim :: PeerSimulatorResources m blk
lrPeerSim :: PeerSimulatorResources m blk
lrPeerSim
        , NodeDBs (StrictTMVar m MockFS)
lrCdb :: NodeDBs (StrictTMVar m MockFS)
lrCdb :: NodeDBs (StrictTMVar m MockFS)
lrCdb
        , LoE (StrictTVar m (AnchoredFragment (HeaderWithTime blk)))
lrLoEVar :: LoE (StrictTVar m (AnchoredFragment (HeaderWithTime blk)))
lrLoEVar :: LoE (StrictTVar m (AnchoredFragment (HeaderWithTime blk)))
lrLoEVar
        }
  pure
    NodeLifecycle
      { nlMinDuration = scDowntime schedulerConfig
      , nlStart = lifecycleStart (startNode protocolInfo schedulerConfig genesisTest) resources
      , nlShutdown = lifecycleStop resources
      }
 where
  GenesisTest
    { gtSecurityParam :: forall blk schedule. GenesisTest blk schedule -> SecurityParam
gtSecurityParam = SecurityParam
k
    , ForecastRange
gtForecastRange :: ForecastRange
gtForecastRange :: forall blk schedule. GenesisTest blk schedule -> ForecastRange
gtForecastRange
    , GenesisWindow
gtGenesisWindow :: GenesisWindow
gtGenesisWindow :: forall blk schedule. GenesisTest blk schedule -> GenesisWindow
gtGenesisWindow
    } = GenesisTestFull blk
genesisTest

-- | Construct STM resources, set up ChainSync and BlockFetch threads, and
-- send all ticks in a 'PointSchedule' to all given peers in turn.
runPointSchedule ::
  forall m blk.
  ( IOLike m
  , MonadTime m
  , MonadTimer m
  , ShowProxy blk
  , ShowProxy (Header blk)
  , ConfigSupportsNode blk
  , LedgerSupportsProtocol blk
  , ChainDB.SerialiseDiskConstraints blk
  , BlockSupportsDiffusionPipelining blk
  , InspectLedger blk
  , HasHardForkHistory blk
  , ConvertRawHash blk
  , CanUpgradeLedgerTables (LedgerState blk)
  , HasPointScheduleTestParams blk
  , Eq (Header blk)
  , Eq blk
  ) =>
  ProtocolInfoArgs blk ->
  SchedulerConfig ->
  GenesisTestFull blk ->
  Tracer m (TraceEvent blk) ->
  m (StateView blk)
runPointSchedule :: forall (m :: * -> *) blk.
(IOLike m, MonadTime m, MonadTimer m, ShowProxy blk,
 ShowProxy (Header blk), ConfigSupportsNode blk,
 LedgerSupportsProtocol blk, SerialiseDiskConstraints blk,
 BlockSupportsDiffusionPipelining blk, InspectLedger blk,
 HasHardForkHistory blk, ConvertRawHash blk,
 CanUpgradeLedgerTables (LedgerState blk),
 HasPointScheduleTestParams blk, Eq (Header blk), Eq blk) =>
ProtocolInfoArgs blk
-> SchedulerConfig
-> GenesisTestFull blk
-> Tracer m (TraceEvent blk)
-> m (StateView blk)
runPointSchedule ProtocolInfoArgs blk
protocolInfoArgs SchedulerConfig
schedulerConfig GenesisTestFull blk
genesisTest Tracer m (TraceEvent blk)
tracer0 =
  (ResourceRegistry m -> m (StateView blk)) -> m (StateView blk)
forall (m :: * -> *) a.
(MonadSTM m, MonadMask m, MonadThread m, HasCallStack) =>
(ResourceRegistry m -> m a) -> m a
withRegistry ((ResourceRegistry m -> m (StateView blk)) -> m (StateView blk))
-> (ResourceRegistry m -> m (StateView blk)) -> m (StateView blk)
forall a b. (a -> b) -> a -> b
$ \ResourceRegistry m
registry -> do
    peerSim <-
      Tracer m (TraceEvent blk)
-> BlockTree blk
-> NonEmpty PeerId
-> m (PeerSimulatorResources m blk)
forall (m :: * -> *) blk.
(IOLike m, LedgerSupportsProtocol blk, Eq blk) =>
Tracer m (TraceEvent blk)
-> BlockTree blk
-> NonEmpty PeerId
-> m (PeerSimulatorResources m blk)
makePeerSimulatorResources
        Tracer m (TraceEvent blk)
tracer
        BlockTree blk
gtBlockTree
        ([PeerId] -> NonEmpty PeerId
forall a. HasCallStack => [a] -> NonEmpty a
NonEmpty.fromList ([PeerId] -> NonEmpty PeerId) -> [PeerId] -> NonEmpty PeerId
forall a b. (a -> b) -> a -> b
$ Peers (PeerSchedule blk) -> [PeerId]
forall a. Peers a -> [PeerId]
getPeerIds (Peers (PeerSchedule blk) -> [PeerId])
-> Peers (PeerSchedule blk) -> [PeerId]
forall a b. (a -> b) -> a -> b
$ PointSchedule blk -> Peers (PeerSchedule blk)
forall blk. PointSchedule blk -> Peers (PeerSchedule blk)
psSchedule PointSchedule blk
gtSchedule)
    lifecycle <- nodeLifecycle protocolInfoArgs schedulerConfig genesisTest tracer registry peerSim
    (chainDb, stateViewTracers) <-
      runScheduler
        (Tracer $ traceWith tracer . TraceSchedulerEvent)
        (cschcMap (psrHandles peerSim))
        gtSchedule
        (psrPeers peerSim)
        lifecycle
    snapshotStateView stateViewTracers chainDb
 where
  GenesisTest
    { BlockTree blk
gtBlockTree :: forall blk schedule. GenesisTest blk schedule -> BlockTree blk
gtBlockTree :: BlockTree blk
gtBlockTree
    , PointSchedule blk
gtSchedule :: forall blk schedule. GenesisTest blk schedule -> schedule
gtSchedule :: PointSchedule blk
gtSchedule
    } = GenesisTestFull blk
genesisTest

  -- FIXME: This type of configuration should move to `Trace.mkTracer`.
  tracer :: Tracer m (TraceEvent blk)
tracer = if SchedulerConfig -> Bool
scTrace SchedulerConfig
schedulerConfig then Tracer m (TraceEvent blk)
tracer0 else Tracer m (TraceEvent blk)
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer