{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE PolyKinds #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}

-- | Functions that call to the BlockFetch API to start clients and servers
module Test.Consensus.PeerSimulator.BlockFetch
  ( blockFetchNoTimeouts
  , runBlockFetchClient
  , runBlockFetchServer
  , startBlockFetchLogic
  , startKeepAliveThread
  ) where

import Control.Monad (void)
import Control.Monad.Class.MonadTime
import Control.Monad.Class.MonadTimer.SI (MonadTimer)
import Control.ResourceRegistry
import Control.Tracer (Tracer, nullTracer, traceWith)
import Data.Functor.Contravariant ((>$<))
import Network.TypedProtocol.Codec
  ( ActiveState
  , AnyMessage
  , StateToken
  , notActiveState
  )
import Ouroboros.Consensus.Block (HasHeader)
import Ouroboros.Consensus.Block.Abstract (Header, Point (..))
import Ouroboros.Consensus.Config
import Ouroboros.Consensus.HeaderValidation (HeaderWithTime (..))
import qualified Ouroboros.Consensus.MiniProtocol.BlockFetch.ClientInterface as BlockFetchClientInterface
import Ouroboros.Consensus.MiniProtocol.ChainSync.Client
  ( ChainSyncClientHandleCollection
  )
import Ouroboros.Consensus.Node.Genesis
  ( GenesisConfig (..)
  , enableGenesisConfigDefault
  )
import Ouroboros.Consensus.Node.ProtocolInfo
  ( NumCoreNodes (NumCoreNodes)
  )
import Ouroboros.Consensus.Storage.ChainDB.API
import Ouroboros.Consensus.Util (ShowProxy)
import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Network.BlockFetch
  ( BlockFetchConfiguration (..)
  , FetchClientRegistry
  , GenesisBlockFetchConfiguration (..)
  , blockFetchLogic
  , bracketFetchClient
  , bracketKeepAliveClient
  )
import Ouroboros.Network.BlockFetch.Client (blockFetchClient)
import Ouroboros.Network.BlockFetch.ConsensusInterface
  ( FetchMode (..)
  )
import Ouroboros.Network.Channel (Channel)
import Ouroboros.Network.ControlMessage (ControlMessageSTM)
import Ouroboros.Network.Driver (runPeer)
import Ouroboros.Network.Driver.Limits
  ( ProtocolLimitFailure (ExceededSizeLimit, ExceededTimeLimit)
  , runPipelinedPeerWithLimits
  )
import Ouroboros.Network.NodeToNode.Version (NodeToNodeVersion)
import Ouroboros.Network.Protocol.BlockFetch.Codec
  ( byteLimitsBlockFetch
  , codecBlockFetchId
  )
import Ouroboros.Network.Protocol.BlockFetch.Server
  ( BlockFetchServer (..)
  , blockFetchServerPeer
  )
import Ouroboros.Network.Protocol.BlockFetch.Type
  ( BlockFetch (..)
  , SingBlockFetch (..)
  )
import Ouroboros.Network.Protocol.Limits
  ( ProtocolSizeLimits (..)
  , ProtocolTimeLimits (..)
  , waitForever
  )
import Test.Consensus.PeerSimulator.StateView
  ( PeerSimulatorComponentResult (..)
  , PeerSimulatorResult (..)
  , StateViewTracers (StateViewTracers, svtPeerSimulatorResultsTracer)
  )
import Test.Consensus.PeerSimulator.Trace
  ( TraceBlockFetchClientTerminationEvent (..)
  , TraceEvent (..)
  )
import Test.Consensus.PointSchedule (BlockFetchTimeout (..))
import Test.Consensus.PointSchedule.Peers (PeerId)
import Test.Util.Orphans.IOLike ()
import Test.Util.TestBlock (BlockConfig (TestBlockConfig), TestBlock)

startBlockFetchLogic ::
  forall m.
  (IOLike m, MonadTimer m) =>
  -- | Whether to enable chain selection starvation
  Bool ->
  ResourceRegistry m ->
  Tracer m (TraceEvent TestBlock) ->
  ChainDB m TestBlock ->
  FetchClientRegistry PeerId (HeaderWithTime TestBlock) TestBlock m ->
  ChainSyncClientHandleCollection PeerId m TestBlock ->
  m ()
startBlockFetchLogic :: forall (m :: * -> *).
(IOLike m, MonadTimer m) =>
Bool
-> ResourceRegistry m
-> Tracer m (TraceEvent TestBlock)
-> ChainDB m TestBlock
-> FetchClientRegistry
     PeerId (HeaderWithTime TestBlock) TestBlock m
-> ChainSyncClientHandleCollection PeerId m TestBlock
-> m ()
startBlockFetchLogic Bool
enableChainSelStarvation ResourceRegistry m
registry Tracer m (TraceEvent TestBlock)
tracer ChainDB m TestBlock
chainDb FetchClientRegistry PeerId (HeaderWithTime TestBlock) TestBlock m
fetchClientRegistry ChainSyncClientHandleCollection PeerId m TestBlock
csHandlesCol = do
  let blockFetchConsensusInterface :: BlockFetchConsensusInterface
  PeerId (HeaderWithTime TestBlock) TestBlock m
blockFetchConsensusInterface =
        Tracer m (TraceEventDbf PeerId)
-> BlockConfig TestBlock
-> ChainDbView m TestBlock
-> ChainSyncClientHandleCollection PeerId m TestBlock
-> (Header TestBlock -> SizeInBytes)
-> STM m FetchMode
-> DiffusionPipeliningSupport
-> BlockFetchConsensusInterface
     PeerId (HeaderWithTime TestBlock) TestBlock m
forall (m :: * -> *) peer blk.
(IOLike m, BlockSupportsDiffusionPipelining blk, Ord peer,
 LedgerSupportsProtocol blk, ConfigSupportsNode blk) =>
Tracer m (TraceEventDbf peer)
-> BlockConfig blk
-> ChainDbView m blk
-> ChainSyncClientHandleCollection peer m blk
-> (Header blk -> SizeInBytes)
-> STM m FetchMode
-> DiffusionPipeliningSupport
-> BlockFetchConsensusInterface peer (HeaderWithTime blk) blk m
BlockFetchClientInterface.mkBlockFetchConsensusInterface
          Tracer m (TraceEventDbf PeerId)
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer -- FIXME
          (NumCoreNodes -> BlockConfig TestBlock
forall ptype. NumCoreNodes -> BlockConfig (TestBlockWith ptype)
TestBlockConfig (NumCoreNodes -> BlockConfig TestBlock)
-> NumCoreNodes -> BlockConfig TestBlock
forall a b. (a -> b) -> a -> b
$ Word64 -> NumCoreNodes
NumCoreNodes Word64
0) -- Only needed when minting blocks
          (ChainDB m TestBlock -> ChainDbView m TestBlock
forall (m :: * -> *) blk. ChainDB m blk -> ChainDbView m blk
BlockFetchClientInterface.defaultChainDbView ChainDB m TestBlock
chainDb)
          ChainSyncClientHandleCollection PeerId m TestBlock
csHandlesCol
          -- The size of headers in bytes is irrelevant because our tests
          -- do not serialize the blocks.
          (\Header TestBlock
_hdr -> SizeInBytes
1000)
          -- This is a syncing test, so we use 'FetchModeGenesis'.
          (FetchMode -> STM m FetchMode
forall a. a -> STM m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure FetchMode
FetchModeGenesis)
          DiffusionPipeliningSupport
DiffusionPipeliningOn

      bfcGenesisBFConfig :: GenesisBlockFetchConfiguration
bfcGenesisBFConfig =
        if Bool
enableChainSelStarvation
          then
            GenesisBlockFetchConfiguration
              { gbfcGracePeriod :: DiffTime
gbfcGracePeriod =
                  if Bool
enableChainSelStarvation
                    then
                      DiffTime
10 -- default value for cardano-node at the time of writing
                    else
                      DiffTime
1000000 -- (more than 11 days)
              }
          else GenesisConfig -> GenesisBlockFetchConfiguration
gcBlockFetchConfig GenesisConfig
enableGenesisConfigDefault

      -- Values taken from
      -- ouroboros-consensus-diffusion/src/unstable-diffusion-testlib/Test/ThreadNet/Network.hs
      blockFetchCfg :: BlockFetchConfiguration
blockFetchCfg =
        BlockFetchConfiguration
          { bfcMaxConcurrencyBulkSync :: Word
bfcMaxConcurrencyBulkSync = Word
50
          , bfcMaxConcurrencyDeadline :: Word
bfcMaxConcurrencyDeadline = Word
50 -- unused because of @pure FetchModeBulkSync@ above
          , bfcMaxRequestsInflight :: Word
bfcMaxRequestsInflight = Word
10
          , bfcDecisionLoopIntervalPraos :: DiffTime
bfcDecisionLoopIntervalPraos = DiffTime
0
          , bfcDecisionLoopIntervalGenesis :: DiffTime
bfcDecisionLoopIntervalGenesis = DiffTime
0
          , bfcSalt :: Int
bfcSalt = Int
0
          , GenesisBlockFetchConfiguration
bfcGenesisBFConfig :: GenesisBlockFetchConfiguration
bfcGenesisBFConfig :: GenesisBlockFetchConfiguration
bfcGenesisBFConfig
          }

  m (Thread m Void) -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m (Thread m Void) -> m ()) -> m (Thread m Void) -> m ()
forall a b. (a -> b) -> a -> b
$
    ResourceRegistry m -> String -> m Void -> m (Thread m Void)
forall (m :: * -> *) a.
(MonadAsync m, MonadFork m, MonadMask m, HasCallStack) =>
ResourceRegistry m -> String -> m a -> m (Thread m a)
forkLinkedThread ResourceRegistry m
registry String
"BlockFetchLogic" (m Void -> m (Thread m Void)) -> m Void -> m (Thread m Void)
forall a b. (a -> b) -> a -> b
$
      Tracer m (TraceDecisionEvent PeerId (HeaderWithTime TestBlock))
-> Tracer
     m
     (TraceLabelPeer
        PeerId (TraceFetchClientState (HeaderWithTime TestBlock)))
-> BlockFetchConsensusInterface
     PeerId (HeaderWithTime TestBlock) TestBlock m
-> FetchClientRegistry
     PeerId (HeaderWithTime TestBlock) TestBlock m
-> BlockFetchConfiguration
-> m Void
forall addr header block (m :: * -> *).
(HasHeader header, HasHeader block,
 HeaderHash header ~ HeaderHash block, MonadDelay m, MonadTimer m,
 Ord addr, Hashable addr) =>
Tracer m (TraceDecisionEvent addr header)
-> Tracer m (TraceLabelPeer addr (TraceFetchClientState header))
-> BlockFetchConsensusInterface addr header block m
-> FetchClientRegistry addr header block m
-> BlockFetchConfiguration
-> m Void
blockFetchLogic
        Tracer m (TraceDecisionEvent PeerId (HeaderWithTime TestBlock))
decisionTracer
        Tracer
  m
  (TraceLabelPeer
     PeerId (TraceFetchClientState (HeaderWithTime TestBlock)))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer
        BlockFetchConsensusInterface
  PeerId (HeaderWithTime TestBlock) TestBlock m
blockFetchConsensusInterface
        FetchClientRegistry PeerId (HeaderWithTime TestBlock) TestBlock m
fetchClientRegistry
        BlockFetchConfiguration
blockFetchCfg
 where
  decisionTracer :: Tracer m (TraceDecisionEvent PeerId (HeaderWithTime TestBlock))
decisionTracer = String -> TraceEvent TestBlock
forall blk. String -> TraceEvent blk
TraceOther (String -> TraceEvent TestBlock)
-> (TraceDecisionEvent PeerId (HeaderWithTime TestBlock) -> String)
-> TraceDecisionEvent PeerId (HeaderWithTime TestBlock)
-> TraceEvent TestBlock
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (String
"BlockFetchLogic | " String -> String -> String
forall a. [a] -> [a] -> [a]
++) (String -> String)
-> (TraceDecisionEvent PeerId (HeaderWithTime TestBlock) -> String)
-> TraceDecisionEvent PeerId (HeaderWithTime TestBlock)
-> String
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TraceDecisionEvent PeerId (HeaderWithTime TestBlock) -> String
forall a. Show a => a -> String
show (TraceDecisionEvent PeerId (HeaderWithTime TestBlock)
 -> TraceEvent TestBlock)
-> Tracer m (TraceEvent TestBlock)
-> Tracer m (TraceDecisionEvent PeerId (HeaderWithTime TestBlock))
forall (f :: * -> *) a b. Contravariant f => (a -> b) -> f b -> f a
>$< Tracer m (TraceEvent TestBlock)
tracer

startKeepAliveThread ::
  forall m peer blk hdr.
  (Ord peer, IOLike m) =>
  ResourceRegistry m ->
  FetchClientRegistry peer hdr blk m ->
  peer ->
  m ()
startKeepAliveThread :: forall (m :: * -> *) peer blk hdr.
(Ord peer, IOLike m) =>
ResourceRegistry m
-> FetchClientRegistry peer hdr blk m -> peer -> m ()
startKeepAliveThread ResourceRegistry m
registry FetchClientRegistry peer hdr blk m
fetchClientRegistry peer
peerId =
  m (Thread m Any) -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m (Thread m Any) -> m ()) -> m (Thread m Any) -> m ()
forall a b. (a -> b) -> a -> b
$
    ResourceRegistry m -> String -> m Any -> m (Thread m Any)
forall (m :: * -> *) a.
(MonadAsync m, MonadFork m, MonadMask m, HasCallStack) =>
ResourceRegistry m -> String -> m a -> m (Thread m a)
forkLinkedThread ResourceRegistry m
registry String
"KeepAlive" (m Any -> m (Thread m Any)) -> m Any -> m (Thread m Any)
forall a b. (a -> b) -> a -> b
$
      FetchClientRegistry peer hdr blk m
-> peer -> (StrictTVar m (Map peer PeerGSV) -> m Any) -> m Any
forall (m :: * -> *) a peer header block.
(MonadSTM m, MonadFork m, MonadMask m, Ord peer) =>
FetchClientRegistry peer header block m
-> peer -> (StrictTVar m (Map peer PeerGSV) -> m a) -> m a
bracketKeepAliveClient FetchClientRegistry peer hdr blk m
fetchClientRegistry peer
peerId ((StrictTVar m (Map peer PeerGSV) -> m Any) -> m Any)
-> (StrictTVar m (Map peer PeerGSV) -> m Any) -> m Any
forall a b. (a -> b) -> a -> b
$ \StrictTVar m (Map peer PeerGSV)
_ ->
        STM m Any -> m Any
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically STM m Any
forall a. STM m a
forall (m :: * -> *) a. MonadSTM m => STM m a
retry

runBlockFetchClient ::
  (IOLike m, MonadTime m, MonadTimer m, HasHeader blk, HasHeader (Header blk), ShowProxy blk) =>
  Tracer m (TraceEvent blk) ->
  PeerId ->
  BlockFetchTimeout ->
  StateViewTracers blk m ->
  FetchClientRegistry PeerId (HeaderWithTime blk) blk m ->
  ControlMessageSTM m ->
  -- | Send and receive message via the given 'Channel'.
  Channel m (AnyMessage (BlockFetch blk (Point blk))) ->
  m ()
runBlockFetchClient :: forall (m :: * -> *) blk.
(IOLike m, MonadTime m, MonadTimer m, HasHeader blk,
 HasHeader (Header blk), ShowProxy blk) =>
Tracer m (TraceEvent blk)
-> PeerId
-> BlockFetchTimeout
-> StateViewTracers blk m
-> FetchClientRegistry PeerId (HeaderWithTime blk) blk m
-> ControlMessageSTM m
-> Channel m (AnyMessage (BlockFetch blk (Point blk)))
-> m ()
runBlockFetchClient Tracer m (TraceEvent blk)
tracer PeerId
peerId BlockFetchTimeout
blockFetchTimeouts StateViewTracers{Tracer m (PeerSimulatorResult blk)
svtPeerSimulatorResultsTracer :: forall blk (m :: * -> *).
StateViewTracers blk m -> Tracer m (PeerSimulatorResult blk)
svtPeerSimulatorResultsTracer :: Tracer m (PeerSimulatorResult blk)
svtPeerSimulatorResultsTracer} FetchClientRegistry PeerId (HeaderWithTime blk) blk m
fetchClientRegistry ControlMessageSTM m
controlMsgSTM Channel m (AnyMessage (BlockFetch blk (Point blk)))
channel = do
  FetchClientRegistry PeerId (HeaderWithTime blk) blk m
-> NodeToNodeVersion
-> PeerId
-> (FetchClientContext (HeaderWithTime blk) blk m -> m ())
-> m ()
forall (m :: * -> *) a peer header block version.
(MonadFork m, MonadMask m, MonadTimer m, Ord peer) =>
FetchClientRegistry peer header block m
-> version
-> peer
-> (FetchClientContext header block m -> m a)
-> m a
bracketFetchClient FetchClientRegistry PeerId (HeaderWithTime blk) blk m
fetchClientRegistry NodeToNodeVersion
ntnVersion PeerId
peerId ((FetchClientContext (HeaderWithTime blk) blk m -> m ()) -> m ())
-> (FetchClientContext (HeaderWithTime blk) blk m -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \FetchClientContext (HeaderWithTime blk) blk m
clientCtx -> do
    res <-
      m ((), Maybe (AnyMessage (BlockFetch blk (Point blk))))
-> m (Either
        SomeException
        ((), Maybe (AnyMessage (BlockFetch blk (Point blk)))))
forall e a. Exception e => m a -> m (Either e a)
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> m (Either e a)
try (m ((), Maybe (AnyMessage (BlockFetch blk (Point blk))))
 -> m (Either
         SomeException
         ((), Maybe (AnyMessage (BlockFetch blk (Point blk))))))
-> m ((), Maybe (AnyMessage (BlockFetch blk (Point blk))))
-> m (Either
        SomeException
        ((), Maybe (AnyMessage (BlockFetch blk (Point blk)))))
forall a b. (a -> b) -> a -> b
$
        Tracer m (TraceSendRecv (BlockFetch blk (Point blk)))
-> Codec
     (BlockFetch blk (Point blk))
     CodecFailure
     m
     (AnyMessage (BlockFetch blk (Point blk)))
-> ProtocolSizeLimits
     (BlockFetch blk (Point blk))
     (AnyMessage (BlockFetch blk (Point blk)))
-> ProtocolTimeLimits (BlockFetch blk (Point blk))
-> Channel m (AnyMessage (BlockFetch blk (Point blk)))
-> PeerPipelined
     (BlockFetch blk (Point blk)) 'AsClient 'BFIdle m ()
-> m ((), Maybe (AnyMessage (BlockFetch blk (Point blk))))
forall ps (st :: ps) (pr :: PeerRole) failure bytes (m :: * -> *)
       a.
(MonadAsync m, MonadFork m, MonadMask m, MonadTimer m,
 MonadThrow (STM m), ShowProxy ps,
 forall (st' :: ps) stok. (stok ~ StateToken st') => Show stok,
 Show failure) =>
Tracer m (TraceSendRecv ps)
-> Codec ps failure m bytes
-> ProtocolSizeLimits ps bytes
-> ProtocolTimeLimits ps
-> Channel m bytes
-> PeerPipelined ps pr st m a
-> m (a, Maybe bytes)
runPipelinedPeerWithLimits
          Tracer m (TraceSendRecv (BlockFetch blk (Point blk)))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer
          Codec
  (BlockFetch blk (Point blk))
  CodecFailure
  m
  (AnyMessage (BlockFetch blk (Point blk)))
forall block point (m :: * -> *).
Monad m =>
Codec
  (BlockFetch block point)
  CodecFailure
  m
  (AnyMessage (BlockFetch block point))
codecBlockFetchId
          ProtocolSizeLimits
  (BlockFetch blk (Point blk))
  (AnyMessage (BlockFetch blk (Point blk)))
forall block point bytes.
ProtocolSizeLimits (BlockFetch block point) bytes
blockFetchNoSizeLimits
          (BlockFetchTimeout
-> ProtocolTimeLimits (BlockFetch blk (Point blk))
forall block point.
BlockFetchTimeout -> ProtocolTimeLimits (BlockFetch block point)
timeLimitsBlockFetch BlockFetchTimeout
blockFetchTimeouts)
          Channel m (AnyMessage (BlockFetch blk (Point blk)))
channel
          (NodeToNodeVersion
-> ControlMessageSTM m
-> FetchedMetricsTracer m
-> FetchClientContext (HeaderWithTime blk) blk m
-> PeerPipelined
     (BlockFetch blk (Point blk)) 'AsClient 'BFIdle m ()
forall header block versionNumber (m :: * -> *).
(MonadSTM m, MonadThrow m, MonadTime m, MonadMonotonicTime m,
 HasHeader header, HasHeader block,
 HeaderHash header ~ HeaderHash block) =>
versionNumber
-> ControlMessageSTM m
-> FetchedMetricsTracer m
-> FetchClientContext header block m
-> ClientPipelined (BlockFetch block (Point block)) 'BFIdle m ()
blockFetchClient NodeToNodeVersion
ntnVersion ControlMessageSTM m
controlMsgSTM FetchedMetricsTracer m
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer FetchClientContext (HeaderWithTime blk) blk m
clientCtx)
    case res of
      Right ((), Maybe (AnyMessage (BlockFetch blk (Point blk)))
msgRes) ->
        Tracer m (PeerSimulatorResult blk)
-> PeerSimulatorResult blk -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (PeerSimulatorResult blk)
svtPeerSimulatorResultsTracer (PeerSimulatorResult blk -> m ())
-> PeerSimulatorResult blk -> m ()
forall a b. (a -> b) -> a -> b
$
          PeerId
-> PeerSimulatorComponentResult blk -> PeerSimulatorResult blk
forall blk.
PeerId
-> PeerSimulatorComponentResult blk -> PeerSimulatorResult blk
PeerSimulatorResult PeerId
peerId (PeerSimulatorComponentResult blk -> PeerSimulatorResult blk)
-> PeerSimulatorComponentResult blk -> PeerSimulatorResult blk
forall a b. (a -> b) -> a -> b
$
            Either
  SomeException (Maybe (AnyMessage (BlockFetch blk (Point blk))))
-> PeerSimulatorComponentResult blk
forall blk.
Either SomeException (Maybe (BlockFetchResult blk))
-> PeerSimulatorComponentResult blk
SomeBlockFetchClientResult (Either
   SomeException (Maybe (AnyMessage (BlockFetch blk (Point blk))))
 -> PeerSimulatorComponentResult blk)
-> Either
     SomeException (Maybe (AnyMessage (BlockFetch blk (Point blk))))
-> PeerSimulatorComponentResult blk
forall a b. (a -> b) -> a -> b
$
              Maybe (AnyMessage (BlockFetch blk (Point blk)))
-> Either
     SomeException (Maybe (AnyMessage (BlockFetch blk (Point blk))))
forall a b. b -> Either a b
Right Maybe (AnyMessage (BlockFetch blk (Point blk)))
msgRes
      Left SomeException
exn -> do
        Tracer m (PeerSimulatorResult blk)
-> PeerSimulatorResult blk -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (PeerSimulatorResult blk)
svtPeerSimulatorResultsTracer (PeerSimulatorResult blk -> m ())
-> PeerSimulatorResult blk -> m ()
forall a b. (a -> b) -> a -> b
$
          PeerId
-> PeerSimulatorComponentResult blk -> PeerSimulatorResult blk
forall blk.
PeerId
-> PeerSimulatorComponentResult blk -> PeerSimulatorResult blk
PeerSimulatorResult PeerId
peerId (PeerSimulatorComponentResult blk -> PeerSimulatorResult blk)
-> PeerSimulatorComponentResult blk -> PeerSimulatorResult blk
forall a b. (a -> b) -> a -> b
$
            Either
  SomeException (Maybe (AnyMessage (BlockFetch blk (Point blk))))
-> PeerSimulatorComponentResult blk
forall blk.
Either SomeException (Maybe (BlockFetchResult blk))
-> PeerSimulatorComponentResult blk
SomeBlockFetchClientResult (Either
   SomeException (Maybe (AnyMessage (BlockFetch blk (Point blk))))
 -> PeerSimulatorComponentResult blk)
-> Either
     SomeException (Maybe (AnyMessage (BlockFetch blk (Point blk))))
-> PeerSimulatorComponentResult blk
forall a b. (a -> b) -> a -> b
$
              SomeException
-> Either
     SomeException (Maybe (AnyMessage (BlockFetch blk (Point blk))))
forall a b. a -> Either a b
Left SomeException
exn
        case SomeException -> Maybe ProtocolLimitFailure
forall e. Exception e => SomeException -> Maybe e
fromException SomeException
exn of
          Just (ExceededSizeLimit StateToken st
_) ->
            Tracer m (TraceEvent blk) -> TraceEvent blk -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (TraceEvent blk)
tracer (TraceEvent blk -> m ()) -> TraceEvent blk -> m ()
forall a b. (a -> b) -> a -> b
$ PeerId -> TraceBlockFetchClientTerminationEvent -> TraceEvent blk
forall blk.
PeerId -> TraceBlockFetchClientTerminationEvent -> TraceEvent blk
TraceBlockFetchClientTerminationEvent PeerId
peerId TraceBlockFetchClientTerminationEvent
TraceExceededSizeLimitBF
          Just (ExceededTimeLimit StateToken st
_) ->
            Tracer m (TraceEvent blk) -> TraceEvent blk -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (TraceEvent blk)
tracer (TraceEvent blk -> m ()) -> TraceEvent blk -> m ()
forall a b. (a -> b) -> a -> b
$ PeerId -> TraceBlockFetchClientTerminationEvent -> TraceEvent blk
forall blk.
PeerId -> TraceBlockFetchClientTerminationEvent -> TraceEvent blk
TraceBlockFetchClientTerminationEvent PeerId
peerId TraceBlockFetchClientTerminationEvent
TraceExceededTimeLimitBF
          Maybe ProtocolLimitFailure
Nothing -> () -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
 where
  ntnVersion :: NodeToNodeVersion
  ntnVersion :: NodeToNodeVersion
ntnVersion = NodeToNodeVersion
forall a. Bounded a => a
maxBound

blockFetchNoSizeLimits :: ProtocolSizeLimits (BlockFetch block point) bytes
blockFetchNoSizeLimits :: forall block point bytes.
ProtocolSizeLimits (BlockFetch block point) bytes
blockFetchNoSizeLimits = (bytes -> Word)
-> ProtocolSizeLimits (BlockFetch block point) bytes
forall bytes block point.
(bytes -> Word)
-> ProtocolSizeLimits (BlockFetch block point) bytes
byteLimitsBlockFetch (Word -> bytes -> Word
forall a b. a -> b -> a
const Word
0)

-- | Same as 'timeLimitsChainSync' for BlockFetch. NOTE: There exists a
-- @timeLimitsBlockFetch@ in 'Ouroboros.Network.Protocol.BlockFetch.Codec' but
-- it does not allow customising the values as 'timeLimitsChainSync' does.
-- REVIEW: Should this be upstreamed to `ouroboros-network-protocols`?
timeLimitsBlockFetch ::
  forall block point. BlockFetchTimeout -> ProtocolTimeLimits (BlockFetch block point)
timeLimitsBlockFetch :: forall block point.
BlockFetchTimeout -> ProtocolTimeLimits (BlockFetch block point)
timeLimitsBlockFetch BlockFetchTimeout{Maybe DiffTime
busyTimeout :: Maybe DiffTime
busyTimeout :: BlockFetchTimeout -> Maybe DiffTime
busyTimeout, Maybe DiffTime
streamingTimeout :: Maybe DiffTime
streamingTimeout :: BlockFetchTimeout -> Maybe DiffTime
streamingTimeout} =
  (forall (st :: BlockFetch block point).
 ActiveState st =>
 StateToken st -> Maybe DiffTime)
-> ProtocolTimeLimits (BlockFetch block point)
forall ps.
(forall (st :: ps).
 ActiveState st =>
 StateToken st -> Maybe DiffTime)
-> ProtocolTimeLimits ps
ProtocolTimeLimits StateToken st -> Maybe DiffTime
forall (st :: BlockFetch block point).
ActiveState st =>
StateToken st -> Maybe DiffTime
stateToLimit
 where
  stateToLimit ::
    forall (st :: BlockFetch block point).
    ActiveState st => StateToken st -> Maybe DiffTime
  stateToLimit :: forall (st :: BlockFetch block point).
ActiveState st =>
StateToken st -> Maybe DiffTime
stateToLimit SingBlockFetch st
StateToken st
SingBFIdle = Maybe DiffTime
waitForever
  stateToLimit SingBlockFetch st
StateToken st
SingBFBusy = Maybe DiffTime
busyTimeout
  stateToLimit SingBlockFetch st
StateToken st
SingBFStreaming = Maybe DiffTime
streamingTimeout
  stateToLimit a :: StateToken st
a@SingBlockFetch st
StateToken st
SingBFDone = StateToken 'BFDone -> forall a. a
forall ps (st :: ps).
(StateAgency st ~ 'NobodyAgency, ActiveState st) =>
StateToken st -> forall a. a
notActiveState StateToken st
StateToken 'BFDone
a

blockFetchNoTimeouts :: BlockFetchTimeout
blockFetchNoTimeouts :: BlockFetchTimeout
blockFetchNoTimeouts =
  BlockFetchTimeout
    { busyTimeout :: Maybe DiffTime
busyTimeout = Maybe DiffTime
forall a. Maybe a
Nothing
    , streamingTimeout :: Maybe DiffTime
streamingTimeout = Maybe DiffTime
forall a. Maybe a
Nothing
    }

runBlockFetchServer ::
  (IOLike m, ShowProxy blk) =>
  Tracer m (TraceEvent blk) ->
  PeerId ->
  StateViewTracers blk m ->
  BlockFetchServer blk (Point blk) m () ->
  -- | Send and receive message via the given 'Channel'.
  Channel m (AnyMessage (BlockFetch blk (Point blk))) ->
  m ()
runBlockFetchServer :: forall (m :: * -> *) blk.
(IOLike m, ShowProxy blk) =>
Tracer m (TraceEvent blk)
-> PeerId
-> StateViewTracers blk m
-> BlockFetchServer blk (Point blk) m ()
-> Channel m (AnyMessage (BlockFetch blk (Point blk)))
-> m ()
runBlockFetchServer Tracer m (TraceEvent blk)
_tracer PeerId
peerId StateViewTracers{Tracer m (PeerSimulatorResult blk)
svtPeerSimulatorResultsTracer :: forall blk (m :: * -> *).
StateViewTracers blk m -> Tracer m (PeerSimulatorResult blk)
svtPeerSimulatorResultsTracer :: Tracer m (PeerSimulatorResult blk)
svtPeerSimulatorResultsTracer} BlockFetchServer blk (Point blk) m ()
server Channel m (AnyMessage (BlockFetch blk (Point blk)))
channel = do
  res <- m ((), Maybe (AnyMessage (BlockFetch blk (Point blk))))
-> m (Either
        SomeException
        ((), Maybe (AnyMessage (BlockFetch blk (Point blk)))))
forall e a. Exception e => m a -> m (Either e a)
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> m (Either e a)
try (m ((), Maybe (AnyMessage (BlockFetch blk (Point blk))))
 -> m (Either
         SomeException
         ((), Maybe (AnyMessage (BlockFetch blk (Point blk))))))
-> m ((), Maybe (AnyMessage (BlockFetch blk (Point blk))))
-> m (Either
        SomeException
        ((), Maybe (AnyMessage (BlockFetch blk (Point blk)))))
forall a b. (a -> b) -> a -> b
$ Tracer m (TraceSendRecv (BlockFetch blk (Point blk)))
-> Codec
     (BlockFetch blk (Point blk))
     CodecFailure
     m
     (AnyMessage (BlockFetch blk (Point blk)))
-> Channel m (AnyMessage (BlockFetch blk (Point blk)))
-> Peer
     (BlockFetch blk (Point blk)) 'AsServer 'NonPipelined 'BFIdle m ()
-> m ((), Maybe (AnyMessage (BlockFetch blk (Point blk))))
forall ps (st :: ps) (pr :: PeerRole) failure bytes (m :: * -> *)
       a.
(MonadThrow m, ShowProxy ps,
 forall (st' :: ps) stok. (stok ~ StateToken st') => Show stok,
 Show failure) =>
Tracer m (TraceSendRecv ps)
-> Codec ps failure m bytes
-> Channel m bytes
-> Peer ps pr 'NonPipelined st m a
-> m (a, Maybe bytes)
runPeer Tracer m (TraceSendRecv (BlockFetch blk (Point blk)))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer Codec
  (BlockFetch blk (Point blk))
  CodecFailure
  m
  (AnyMessage (BlockFetch blk (Point blk)))
forall block point (m :: * -> *).
Monad m =>
Codec
  (BlockFetch block point)
  CodecFailure
  m
  (AnyMessage (BlockFetch block point))
codecBlockFetchId Channel m (AnyMessage (BlockFetch blk (Point blk)))
channel (BlockFetchServer blk (Point blk) m ()
-> Peer
     (BlockFetch blk (Point blk)) 'AsServer 'NonPipelined 'BFIdle m ()
forall block point (m :: * -> *) a.
Functor m =>
BlockFetchServer block point m a
-> Server (BlockFetch block point) 'NonPipelined 'BFIdle m a
blockFetchServerPeer BlockFetchServer blk (Point blk) m ()
server)
  case res of
    Right ((), Maybe (AnyMessage (BlockFetch blk (Point blk)))
msgRes) ->
      Tracer m (PeerSimulatorResult blk)
-> PeerSimulatorResult blk -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (PeerSimulatorResult blk)
svtPeerSimulatorResultsTracer (PeerSimulatorResult blk -> m ())
-> PeerSimulatorResult blk -> m ()
forall a b. (a -> b) -> a -> b
$
        PeerId
-> PeerSimulatorComponentResult blk -> PeerSimulatorResult blk
forall blk.
PeerId
-> PeerSimulatorComponentResult blk -> PeerSimulatorResult blk
PeerSimulatorResult PeerId
peerId (PeerSimulatorComponentResult blk -> PeerSimulatorResult blk)
-> PeerSimulatorComponentResult blk -> PeerSimulatorResult blk
forall a b. (a -> b) -> a -> b
$
          Either
  SomeException (Maybe (AnyMessage (BlockFetch blk (Point blk))))
-> PeerSimulatorComponentResult blk
forall blk.
Either SomeException (Maybe (BlockFetchResult blk))
-> PeerSimulatorComponentResult blk
SomeBlockFetchServerResult (Either
   SomeException (Maybe (AnyMessage (BlockFetch blk (Point blk))))
 -> PeerSimulatorComponentResult blk)
-> Either
     SomeException (Maybe (AnyMessage (BlockFetch blk (Point blk))))
-> PeerSimulatorComponentResult blk
forall a b. (a -> b) -> a -> b
$
            Maybe (AnyMessage (BlockFetch blk (Point blk)))
-> Either
     SomeException (Maybe (AnyMessage (BlockFetch blk (Point blk))))
forall a b. b -> Either a b
Right Maybe (AnyMessage (BlockFetch blk (Point blk)))
msgRes
    Left SomeException
exn -> do
      Tracer m (PeerSimulatorResult blk)
-> PeerSimulatorResult blk -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (PeerSimulatorResult blk)
svtPeerSimulatorResultsTracer (PeerSimulatorResult blk -> m ())
-> PeerSimulatorResult blk -> m ()
forall a b. (a -> b) -> a -> b
$
        PeerId
-> PeerSimulatorComponentResult blk -> PeerSimulatorResult blk
forall blk.
PeerId
-> PeerSimulatorComponentResult blk -> PeerSimulatorResult blk
PeerSimulatorResult PeerId
peerId (PeerSimulatorComponentResult blk -> PeerSimulatorResult blk)
-> PeerSimulatorComponentResult blk -> PeerSimulatorResult blk
forall a b. (a -> b) -> a -> b
$
          Either
  SomeException (Maybe (AnyMessage (BlockFetch blk (Point blk))))
-> PeerSimulatorComponentResult blk
forall blk.
Either SomeException (Maybe (BlockFetchResult blk))
-> PeerSimulatorComponentResult blk
SomeBlockFetchServerResult (Either
   SomeException (Maybe (AnyMessage (BlockFetch blk (Point blk))))
 -> PeerSimulatorComponentResult blk)
-> Either
     SomeException (Maybe (AnyMessage (BlockFetch blk (Point blk))))
-> PeerSimulatorComponentResult blk
forall a b. (a -> b) -> a -> b
$
            SomeException
-> Either
     SomeException (Maybe (AnyMessage (BlockFetch blk (Point blk))))
forall a b. a -> Either a b
Left SomeException
exn
      -- NOTE: here we are able to trace exceptions, as what is done in `runChainSyncClient`
      case SomeException -> Maybe SomeException
forall e. Exception e => SomeException -> Maybe e
fromException SomeException
exn of
        (Maybe SomeException
_ :: Maybe SomeException) -> () -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()