{-# LANGUAGE NamedFieldPuns #-}
module Test.Consensus.PeerSimulator.Resources
( BlockFetchResources (..)
, ChainSyncResources (..)
, PeerResources (..)
, PeerSimulatorResources (..)
, SharedResources (..)
, makeChainSyncResources
, makePeerResources
, makePeerSimulatorResources
) where
import Control.Concurrent.Class.MonadSTM.Strict
( atomically
, dupTChan
, newBroadcastTChan
, readTChan
, writeTChan
)
import Control.Tracer (Tracer)
import Data.Foldable (toList)
import Data.List.NonEmpty (NonEmpty)
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import Data.Traversable (for)
import Ouroboros.Consensus.Block.Abstract
( GetHeader
, Header
, Point (..)
, WithOrigin (Origin)
)
import Ouroboros.Consensus.Ledger.SupportsProtocol
( LedgerSupportsProtocol
)
import Ouroboros.Consensus.MiniProtocol.ChainSync.Client
( ChainSyncClientHandleCollection
, newChainSyncClientHandleCollection
)
import Ouroboros.Consensus.Util.IOLike
( IOLike
, MonadSTM (STM)
, StrictTVar
, readTVar
, uncheckedNewTVarM
, writeTVar
)
import qualified Ouroboros.Network.AnchoredFragment as AF
import Ouroboros.Network.Block (Tip (..))
import Ouroboros.Network.Protocol.BlockFetch.Server (BlockFetchServer)
import Ouroboros.Network.Protocol.ChainSync.Server
( ChainSyncServer (..)
)
import Test.Consensus.BlockTree (BlockTree)
import Test.Consensus.PeerSimulator.Handlers
import Test.Consensus.PeerSimulator.ScheduledBlockFetchServer
( BlockFetchServerHandlers (..)
, runScheduledBlockFetchServer
)
import Test.Consensus.PeerSimulator.ScheduledChainSyncServer
import Test.Consensus.PeerSimulator.Trace (TraceEvent)
import Test.Consensus.PointSchedule.NodeState
import Test.Consensus.PointSchedule.Peers (PeerId)
import Test.Util.Orphans.IOLike ()
-- | Per-peer resources that are handed to both the ChainSync and the
-- BlockFetch resource constructors in this module.
data SharedResources m blk
  = SharedResources
  { srPeerId :: PeerId
  -- ^ Identifier of the peer; passed on to the scheduled protocol servers.
  , srBlockTree :: BlockTree blk
  -- ^ The block tree that the ChainSync and BlockFetch handlers are
  -- constructed with.
  , srCurrentState :: StrictTVar m (Maybe (NodeState blk))
  -- ^ The peer's current 'NodeState'. 'makePeerResources' initialises it to
  -- 'Nothing'; running 'prUpdateState' overwrites it with @Just newState@.
  , srTracer :: Tracer m (TraceEvent blk)
  -- ^ Tracer passed on to the scheduled protocol servers.
  }
-- | Resources specific to the ChainSync side of a single peer, as built by
-- 'makeChainSyncResources'.
data ChainSyncResources m blk
  = ChainSyncResources
  { csrCurrentIntersection :: StrictTVar m (Point blk)
  -- ^ Mutable point shared with the ChainSync handlers.
  -- 'makeChainSyncResources' creates it holding the origin point.
  -- NOTE(review): presumably tracks the intersection with the client's
  -- chain; confirm against "Test.Consensus.PeerSimulator.Handlers".
  , csrServer :: ChainSyncServer (Header blk) (Point blk) (Tip blk) m ()
  -- ^ The ChainSync server produced by 'runScheduledChainSyncServer'.
  , csrTickStarted :: STM m ()
  -- ^ STM action given to the scheduled server. When created through
  -- 'makePeerResources' it is a 'readTChan' on a broadcast-channel
  -- duplicate, so it blocks until 'prUpdateState' has been run.
  }
-- | Resources specific to the BlockFetch side of a single peer, as built by
-- @makeBlockFetchResources@.
data BlockFetchResources m blk
  = BlockFetchResources
  { bfrServer :: BlockFetchServer blk (Point blk) m ()
  -- ^ The BlockFetch server produced by 'runScheduledBlockFetchServer'.
  , bfrTickStarted :: STM m ()
  -- ^ STM action given to the scheduled server. When created through
  -- 'makePeerResources' it is a 'readTChan' on a broadcast-channel
  -- duplicate, so it blocks until 'prUpdateState' has been run.
  }
-- | Everything allocated for a single peer: the shared resources, the
-- protocol-specific resources, and the action used to publish a new state.
data PeerResources m blk
  = PeerResources
  { prShared :: SharedResources m blk
  -- ^ Resources handed to both the ChainSync and BlockFetch constructors.
  , prChainSync :: ChainSyncResources m blk
  -- ^ Resources for ChainSync.
  , prBlockFetch :: BlockFetchResources m blk
  -- ^ Resources for BlockFetch.
  , prUpdateState :: NodeState blk -> STM m ()
  -- ^ Publishes a new 'NodeState' for the peer: stores it in
  -- 'srCurrentState' and signals the tick-started actions of both protocol
  -- servers (see @updateState@).
  }
-- | Resources for the whole peer simulator, as built by
-- 'makePeerSimulatorResources'.
data PeerSimulatorResources m blk
  = PeerSimulatorResources
  { psrPeers :: Map PeerId (PeerResources m blk)
  -- ^ The resources of every simulated peer, keyed by peer identifier.
  , psrHandles :: ChainSyncClientHandleCollection PeerId m blk
  -- ^ Collection of ChainSync client handles, created empty-handed via
  -- 'newChainSyncClientHandleCollection' in 'makePeerSimulatorResources'.
  }
-- | Build the 'ChainSyncServerHandlers' record for the 'NodeState'-based
-- handler implementation.
--
-- Both handlers are partially applied to the same intersection variable and
-- block tree, so they observe each other's updates to that variable.
makeChainSyncServerHandlers ::
  ( IOLike m
  , GetHeader blk
  , AF.HasHeader blk
  , Eq blk
  ) =>
  -- | Variable holding the current intersection point.
  StrictTVar m (Point blk) ->
  -- | Block tree the handlers operate on.
  BlockTree blk ->
  ChainSyncServerHandlers m (NodeState blk) blk
makeChainSyncServerHandlers currentIntersection blockTree =
  ChainSyncServerHandlers
    { csshFindIntersection = handlerFindIntersection currentIntersection blockTree
    , csshRequestNext = handlerRequestNext currentIntersection blockTree
    }
-- | Allocate the ChainSync-specific resources for one peer and construct the
-- ChainSync server that drives the handlers.
--
-- A fresh intersection variable is created, initialised to the origin point,
-- and shared between the handlers and the returned 'ChainSyncResources'.
makeChainSyncResources ::
  ( IOLike m
  , GetHeader blk
  , AF.HasHeader blk
  , Eq blk
  ) =>
  -- | Tick-started action handed to the scheduled server and stored in the
  -- result as 'csrTickStarted'.
  STM m () ->
  -- | Resources shared with BlockFetch for the same peer.
  SharedResources m blk ->
  m (ChainSyncResources m blk)
makeChainSyncResources csrTickStarted SharedResources{srPeerId, srTracer, srBlockTree, srCurrentState} = do
  csrCurrentIntersection <- uncheckedNewTVarM $ AF.Point Origin
  let
    handlers = makeChainSyncServerHandlers csrCurrentIntersection srBlockTree
    -- The server reads the peer's current state from the shared variable.
    csrServer =
      runScheduledChainSyncServer
        srPeerId
        csrTickStarted
        (readTVar srCurrentState)
        srTracer
        handlers
  pure ChainSyncResources{csrTickStarted, csrServer, csrCurrentIntersection}
-- | Construct the BlockFetch-specific resources for one peer, including the
-- BlockFetch server that drives the handlers.
--
-- Unlike 'makeChainSyncResources' this is pure: no new mutable variables are
-- allocated, everything is derived from the arguments.
makeBlockFetchResources ::
  ( IOLike m
  , AF.HasHeader blk
  , Eq blk
  ) =>
  -- | Tick-started action handed to the scheduled server and stored in the
  -- result as 'bfrTickStarted'.
  STM m () ->
  -- | Resources shared with ChainSync for the same peer.
  SharedResources m blk ->
  BlockFetchResources m blk
makeBlockFetchResources bfrTickStarted SharedResources{srPeerId, srTracer, srBlockTree, srCurrentState} =
  BlockFetchResources
    { bfrTickStarted
    , bfrServer
    }
 where
  -- Both handlers work on the peer's block tree.
  handlers =
    BlockFetchServerHandlers
      { bfshBlockFetch = handlerBlockFetch srBlockTree
      , bfshSendBlocks = handlerSendBlocks srBlockTree
      }
  -- The server reads the peer's current state from the shared variable.
  bfrServer =
    runScheduledBlockFetchServer
      srPeerId
      bfrTickStarted
      (readTVar srCurrentState)
      srTracer
      handlers
-- | Create the STM actions used to announce a new 'NodeState' for a peer.
--
-- A broadcast channel is created and duplicated twice, giving one read end
-- per protocol server. The result is a triple of:
--
-- 1. an action that, given a new state, writes @()@ to the broadcast channel
--    and stores @Just state@ in the given variable (both within the same STM
--    transaction of the caller);
-- 2. a 'readTChan' on the first duplicate;
-- 3. a 'readTChan' on the second duplicate.
--
-- Each read action blocks until the first action has been run, and consumes
-- one notification per call.
updateState ::
  IOLike m =>
  -- | Variable holding the peer's current state.
  StrictTVar m (Maybe (NodeState blk)) ->
  m (NodeState blk -> STM m (), STM m (), STM m ())
updateState srCurrentState =
  atomically $ do
    publisher <- newBroadcastTChan
    consumer1 <- dupTChan publisher
    consumer2 <- dupTChan publisher
    let
      newState points = do
        -- Notify both consumers first, then publish the state itself.
        writeTChan publisher ()
        writeTVar srCurrentState (Just points)
    pure (newState, readTChan consumer1, readTChan consumer2)
-- | Allocate all resources for a single peer.
--
-- The peer's state variable starts out as 'Nothing'. The returned
-- 'prUpdateState' stores a new state in it and wakes up the tick-started
-- actions given to the ChainSync and BlockFetch servers.
makePeerResources ::
  ( IOLike m
  , AF.HasHeader blk
  , GetHeader blk
  , Eq blk
  ) =>
  -- | Tracer handed to the protocol servers.
  Tracer m (TraceEvent blk) ->
  -- | Block tree the peer's handlers operate on.
  BlockTree blk ->
  -- | Identifier of the peer.
  PeerId ->
  m (PeerResources m blk)
makePeerResources srTracer srBlockTree srPeerId = do
  srCurrentState <- uncheckedNewTVarM Nothing
  (prUpdateState, csrTickStarted, bfrTickStarted) <- updateState srCurrentState
  let
    prShared = SharedResources{srTracer, srBlockTree, srPeerId, srCurrentState}
    prBlockFetch = makeBlockFetchResources bfrTickStarted prShared
  prChainSync <- makeChainSyncResources csrTickStarted prShared
  pure PeerResources{prShared, prChainSync, prBlockFetch, prUpdateState}
-- | Allocate the resources for every given peer, all sharing the same tracer
-- and block tree, together with a fresh ChainSync client handle collection.
makePeerSimulatorResources ::
  ( IOLike m
  , LedgerSupportsProtocol blk
  , Eq blk
  ) =>
  -- | Tracer handed to every peer's protocol servers.
  Tracer m (TraceEvent blk) ->
  -- | Block tree shared by all peers.
  BlockTree blk ->
  -- | The peers to allocate resources for.
  NonEmpty PeerId ->
  m (PeerSimulatorResources m blk)
makePeerSimulatorResources tracer blockTree peers = do
  resources <- for peers $ \peerId -> do
    peerResources <- makePeerResources tracer blockTree peerId
    pure (peerId, peerResources)
  psrHandles <- atomically newChainSyncClientHandleCollection
  -- 'Map.fromList' keeps the last entry should a peer id occur more than once.
  pure PeerSimulatorResources{psrPeers = Map.fromList $ toList resources, psrHandles}