{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE PolyKinds #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}

-- | Functions that call to the BlockFetch API to start clients and servers
module Test.Consensus.PeerSimulator.BlockFetch (
    blockFetchNoTimeouts
  , runBlockFetchClient
  , runBlockFetchServer
  , startBlockFetchLogic
  , startKeepAliveThread
  ) where

import           Control.Monad (void)
import           Control.Monad.Class.MonadTime
import           Control.Monad.Class.MonadTimer.SI (MonadTimer)
import           Control.ResourceRegistry
import           Control.Tracer (Tracer, nullTracer, traceWith)
import           Data.Functor.Contravariant ((>$<))
import           Network.TypedProtocol.Codec (ActiveState, AnyMessage,
                     StateToken, notActiveState)
import           Ouroboros.Consensus.Block (HasHeader)
import           Ouroboros.Consensus.Block.Abstract (Header, Point (..))
import           Ouroboros.Consensus.Config
import qualified Ouroboros.Consensus.MiniProtocol.BlockFetch.ClientInterface as BlockFetchClientInterface
import           Ouroboros.Consensus.MiniProtocol.ChainSync.Client
                     (ChainSyncClientHandleCollection)
import           Ouroboros.Consensus.Node.Genesis (GenesisConfig (..),
                     enableGenesisConfigDefault)
import           Ouroboros.Consensus.Node.ProtocolInfo
                     (NumCoreNodes (NumCoreNodes))
import           Ouroboros.Consensus.Storage.ChainDB.API
import           Ouroboros.Consensus.Util (ShowProxy)
import           Ouroboros.Consensus.Util.IOLike
import           Ouroboros.Network.BlockFetch (BlockFetchConfiguration (..),
                     FetchClientRegistry, GenesisBlockFetchConfiguration (..),
                     blockFetchLogic, bracketFetchClient,
                     bracketKeepAliveClient)
import           Ouroboros.Network.BlockFetch.Client (blockFetchClient)
import           Ouroboros.Network.BlockFetch.ConsensusInterface
                     (FetchMode (..))
import           Ouroboros.Network.Channel (Channel)
import           Ouroboros.Network.ControlMessage (ControlMessageSTM)
import           Ouroboros.Network.Driver (runPeer)
import           Ouroboros.Network.Driver.Limits
                     (ProtocolLimitFailure (ExceededSizeLimit, ExceededTimeLimit),
                     runPipelinedPeerWithLimits)
import           Ouroboros.Network.NodeToNode.Version (NodeToNodeVersion)
import           Ouroboros.Network.Protocol.BlockFetch.Codec
                     (byteLimitsBlockFetch, codecBlockFetchId)
import           Ouroboros.Network.Protocol.BlockFetch.Server
                     (BlockFetchServer (..), blockFetchServerPeer)
import           Ouroboros.Network.Protocol.BlockFetch.Type (BlockFetch (..),
                     SingBlockFetch (..))
import           Ouroboros.Network.Protocol.Limits (ProtocolSizeLimits (..),
                     ProtocolTimeLimits (..), waitForever)
import           Test.Consensus.PeerSimulator.StateView
                     (PeerSimulatorComponentResult (..),
                     PeerSimulatorResult (..),
                     StateViewTracers (StateViewTracers, svtPeerSimulatorResultsTracer))
import           Test.Consensus.PeerSimulator.Trace
                     (TraceBlockFetchClientTerminationEvent (..),
                     TraceEvent (..))
import           Test.Consensus.PointSchedule (BlockFetchTimeout (..))
import           Test.Consensus.PointSchedule.Peers (PeerId)
import           Test.Util.Orphans.IOLike ()
import           Test.Util.TestBlock (BlockConfig (TestBlockConfig), TestBlock)
import           Test.Util.Time (dawnOfTime)

-- | Fork the BlockFetch decision logic ('blockFetchLogic') with
-- 'forkLinkedThread' on the given registry, under the thread label
-- @"BlockFetchLogic"@.
--
-- Decision events are rendered with 'show', prefixed with
-- @"BlockFetchLogic | "@ and forwarded to the given tracer as 'TraceOther'.
-- Fetch client state events go to 'nullTracer'.
startBlockFetchLogic ::
     forall m.
     (IOLike m, MonadTimer m)
  => Bool -- ^ Whether to enable chain selection starvation
  -> ResourceRegistry m
  -> Tracer m (TraceEvent TestBlock)
  -> ChainDB m TestBlock
  -> FetchClientRegistry PeerId (Header TestBlock) TestBlock m
  -> ChainSyncClientHandleCollection PeerId m TestBlock
  -> m ()
startBlockFetchLogic enableChainSelStarvation registry tracer chainDb fetchClientRegistry csHandlesCol = do
    -- The oracle ignores its argument and always answers 'dawnOfTime'.
    let slotForgeTime :: BlockFetchClientInterface.SlotForgeTimeOracle m blk
        slotForgeTime _ = pure dawnOfTime

        blockFetchConsensusInterface =
          BlockFetchClientInterface.mkBlockFetchConsensusInterface
            nullTracer -- FIXME
            (TestBlockConfig $ NumCoreNodes 0) -- Only needed when minting blocks
            (BlockFetchClientInterface.defaultChainDbView chainDb)
            csHandlesCol
            -- The size of headers in bytes is irrelevant because our tests
            -- do not serialize the blocks.
            (\_hdr -> 1000)
            slotForgeTime
            -- This is a syncing test, so we use 'FetchModeGenesis'.
            (pure FetchModeGenesis)
            DiffusionPipeliningOn

        -- NOTE(review): this binding used to carry a second, nested test of
        -- 'enableChainSelStarvation' whose @else@ branch (a grace period of
        -- 1000000, i.e. more than 11 days) could never be taken. It has been
        -- removed without changing behaviour. When starvation is disabled we
        -- still fall back to the default Genesis configuration; confirm that
        -- its grace period is what is intended for that case.
        bfcGenesisBFConfig =
          if enableChainSelStarvation
            then GenesisBlockFetchConfiguration
              { gbfcGracePeriod = 10 -- default value for cardano-node at the time of writing
              }
            else gcBlockFetchConfig enableGenesisConfigDefault

        -- Values taken from
        -- ouroboros-consensus-diffusion/src/unstable-diffusion-testlib/Test/ThreadNet/Network.hs
        blockFetchCfg = BlockFetchConfiguration
          { bfcMaxConcurrencyBulkSync = 50
            -- Presumably unused: the fetch mode passed above is always
            -- 'FetchModeGenesis' (TODO confirm).
          , bfcMaxConcurrencyDeadline = 50
          , bfcMaxRequestsInflight = 10
          , bfcDecisionLoopIntervalPraos = 0
          , bfcDecisionLoopIntervalGenesis = 0
          , bfcSalt = 0
          , bfcGenesisBFConfig
          }

    void $ forkLinkedThread registry "BlockFetchLogic" $
      blockFetchLogic
        decisionTracer
        nullTracer
        blockFetchConsensusInterface
        fetchClientRegistry
        blockFetchCfg
  where
    -- Adapts the caller's tracer so that it accepts decision events.
    decisionTracer = TraceOther . ("BlockFetchLogic | " ++) . show >$< tracer

-- | Fork a thread (via 'forkLinkedThread' on the given registry, labelled
-- @"KeepAlive"@) that enters 'bracketKeepAliveClient' for the given peer and
-- then never leaves the bracket on its own.
startKeepAliveThread ::
     forall m peer blk.
     (Ord peer, IOLike m)
  => ResourceRegistry m
  -> FetchClientRegistry peer (Header blk) blk m
  -> peer
  -> m ()
startKeepAliveThread registry fetchClientRegistry peerId =
    void (forkLinkedThread registry "KeepAlive" keepAliveForever)
  where
    -- The bracket body ignores its argument and runs a transaction that
    -- only ever retries, so it blocks until the thread is terminated from
    -- the outside.
    keepAliveForever =
      bracketKeepAliveClient fetchClientRegistry peerId (\_ -> atomically retry)

-- | Run the BlockFetch client for one peer inside 'bracketFetchClient' and
-- report how it ended.
--
-- The outcome (the leftover bytes returned by the driver, or the exception
-- that was caught) is always sent to 'svtPeerSimulatorResultsTracer'. When
-- the exception is a 'ProtocolLimitFailure', a matching termination event is
-- additionally sent to the first tracer.
--
-- No size limits are enforced; time limits are derived from the given
-- 'BlockFetchTimeout'.
runBlockFetchClient ::
     (IOLike m, MonadTime m, MonadTimer m, HasHeader blk, HasHeader (Header blk), ShowProxy blk)
  => Tracer m (TraceEvent blk)
  -> PeerId
  -> BlockFetchTimeout
  -> StateViewTracers blk m
  -> FetchClientRegistry PeerId (Header blk) blk m
  -> ControlMessageSTM m
  -> Channel m (AnyMessage (BlockFetch blk (Point blk)))
     -- ^ Send and receive message via the given 'Channel'.
  -> m ()
runBlockFetchClient tracer peerId blockFetchTimeouts StateViewTracers {svtPeerSimulatorResultsTracer} fetchClientRegistry controlMsgSTM channel =
    bracketFetchClient fetchClientRegistry ntnVersion peerId $ \clientCtx -> do
      let clientPeer = blockFetchClient ntnVersion controlMsgSTM nullTracer clientCtx
      outcome <-
        try
          ( runPipelinedPeerWithLimits
              nullTracer
              codecBlockFetchId
              blockFetchNoSizeLimits
              (timeLimitsBlockFetch blockFetchTimeouts)
              channel
              clientPeer
          )
      case outcome of
        Right ((), leftover) ->
          reportResult (Right leftover)
        Left exn -> do
          reportResult (Left exn)
          -- Only protocol limit violations get a dedicated trace event.
          case fromException exn of
            Just (ExceededSizeLimit _) -> traceTermination TraceExceededSizeLimitBF
            Just (ExceededTimeLimit _) -> traceTermination TraceExceededTimeLimitBF
            Nothing                    -> pure ()
  where
    -- The highest protocol version known to this build.
    ntnVersion :: NodeToNodeVersion
    ntnVersion = maxBound

    -- Publish the client's outcome, tagged with the peer it belongs to.
    reportResult =
      traceWith svtPeerSimulatorResultsTracer
        . PeerSimulatorResult peerId
        . SomeBlockFetchClientResult

    -- Emit a termination event for this peer on the general tracer.
    traceTermination =
      traceWith tracer . TraceBlockFetchClientTerminationEvent peerId

-- | Size limits for BlockFetch built with 'byteLimitsBlockFetch' and a
-- measuring function that reports every input as having size 0.
blockFetchNoSizeLimits :: ProtocolSizeLimits (BlockFetch block point) bytes
blockFetchNoSizeLimits = byteLimitsBlockFetch measuredAsEmpty
  where
    measuredAsEmpty _ = 0

-- | Time limits for BlockFetch taken from a 'BlockFetchTimeout'; the
-- BlockFetch counterpart of 'timeLimitsChainSync'.
--
-- NOTE: 'Ouroboros.Network.Protocol.BlockFetch.Codec' also has a
-- @timeLimitsBlockFetch@, but unlike 'timeLimitsChainSync' it does not let
-- the caller customise the values.
-- REVIEW: Should this be upstreamed to `ouroboros-network-protocols`?
timeLimitsBlockFetch :: forall block point. BlockFetchTimeout -> ProtocolTimeLimits (BlockFetch block point)
timeLimitsBlockFetch BlockFetchTimeout{busyTimeout, streamingTimeout} =
  ProtocolTimeLimits limitFor
  where
    -- Idle never times out; the busy and streaming states use the
    -- corresponding field of the 'BlockFetchTimeout'.
    limitFor :: forall (st :: BlockFetch block point).
                ActiveState st => StateToken st -> Maybe DiffTime
    limitFor tok = case tok of
      SingBFIdle      -> waitForever
      SingBFBusy      -> busyTimeout
      SingBFStreaming -> streamingTimeout
      -- The done state is not active, so this case is ruled out.
      SingBFDone      -> notActiveState tok

-- | A 'BlockFetchTimeout' with both the busy and the streaming timeout
-- left unset.
blockFetchNoTimeouts :: BlockFetchTimeout
blockFetchNoTimeouts = BlockFetchTimeout
  { busyTimeout      = waitForever
  , streamingTimeout = waitForever
  }

-- | Run the given BlockFetch server over the channel with 'runPeer' and
-- report how it ended.
--
-- The outcome (the leftover bytes returned by the driver, or the exception
-- that was caught) is sent to 'svtPeerSimulatorResultsTracer', tagged with
-- the peer. The first tracer argument is currently unused.
runBlockFetchServer ::
     (IOLike m, ShowProxy blk)
  => Tracer m (TraceEvent blk)
  -> PeerId
  -> StateViewTracers blk m
  -> BlockFetchServer blk (Point blk) m ()
  -> Channel m (AnyMessage (BlockFetch blk (Point blk)))
     -- ^ Send and receive message via the given 'Channel'.
  -> m ()
runBlockFetchServer _tracer peerId StateViewTracers {svtPeerSimulatorResultsTracer} server channel = do
  res <- try $ runPeer nullTracer codecBlockFetchId channel (blockFetchServerPeer server)
  -- Success and failure are reported the same way; on success only the
  -- leftover bytes are kept, since the server's own result is @()@.
  traceWith svtPeerSimulatorResultsTracer $
    PeerSimulatorResult peerId $ SomeBlockFetchServerResult $ snd <$> res
  -- NOTE: here we are able to trace exceptions, as what is done in
  -- `runChainSyncClient`. Nothing is traced at the moment; a previous no-op
  -- @case fromException exn of _ -> pure ()@ placeholder was removed.