{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE PolyKinds #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}

{-# OPTIONS_GHC -Wno-missing-export-lists #-}

-- | Functions that call to the BlockFetch API to start clients and servers
module Test.Consensus.PeerSimulator.BlockFetch (
    blockFetchNoTimeouts
  , runBlockFetchClient
  , runBlockFetchServer
  , startBlockFetchLogic
  , startKeepAliveThread
  ) where

import           Control.Exception (SomeException)
import           Control.Monad (void)
import           Control.Monad.Class.MonadTime
import           Control.Monad.Class.MonadTimer.SI (MonadTimer)
import           Control.ResourceRegistry
import           Control.Tracer (Tracer, nullTracer, traceWith)
import           Data.Functor.Contravariant ((>$<))
import           Data.Map.Strict (Map)
import           Network.TypedProtocol.Codec (ActiveState, AnyMessage,
                     StateToken, notActiveState)
import           Ouroboros.Consensus.Block (HasHeader)
import           Ouroboros.Consensus.Block.Abstract (Header, Point (..))
import           Ouroboros.Consensus.Config
import qualified Ouroboros.Consensus.MiniProtocol.BlockFetch.ClientInterface as BlockFetchClientInterface
import           Ouroboros.Consensus.Node.ProtocolInfo
                     (NumCoreNodes (NumCoreNodes))
import           Ouroboros.Consensus.Storage.ChainDB.API
import           Ouroboros.Consensus.Util (ShowProxy)
import           Ouroboros.Consensus.Util.IOLike (DiffTime,
                     Exception (fromException), IOLike, STM, atomically, retry,
                     try)
import           Ouroboros.Network.AnchoredFragment (AnchoredFragment)
import           Ouroboros.Network.BlockFetch (BlockFetchConfiguration (..),
                     FetchClientRegistry, FetchMode (..), blockFetchLogic,
                     bracketFetchClient, bracketKeepAliveClient)
import           Ouroboros.Network.BlockFetch.Client (blockFetchClient)
import           Ouroboros.Network.Channel (Channel)
import           Ouroboros.Network.ControlMessage (ControlMessageSTM)
import           Ouroboros.Network.Driver (runPeer)
import           Ouroboros.Network.Driver.Limits
                     (ProtocolLimitFailure (ExceededSizeLimit, ExceededTimeLimit),
                     runPipelinedPeerWithLimits)
import           Ouroboros.Network.NodeToNode.Version (NodeToNodeVersion)
import           Ouroboros.Network.Protocol.BlockFetch.Codec
                     (byteLimitsBlockFetch, codecBlockFetchId)
import           Ouroboros.Network.Protocol.BlockFetch.Server
                     (BlockFetchServer (..), blockFetchServerPeer)
import           Ouroboros.Network.Protocol.BlockFetch.Type (BlockFetch (..),
                     SingBlockFetch (..))
import           Ouroboros.Network.Protocol.Limits (ProtocolSizeLimits (..),
                     ProtocolTimeLimits (..), waitForever)
import           Test.Consensus.PeerSimulator.StateView
                     (PeerSimulatorComponentResult (..),
                     PeerSimulatorResult (..),
                     StateViewTracers (StateViewTracers, svtPeerSimulatorResultsTracer))
import           Test.Consensus.PeerSimulator.Trace
                     (TraceBlockFetchClientTerminationEvent (..),
                     TraceEvent (..))
import           Test.Consensus.PointSchedule (BlockFetchTimeout (..))
import           Test.Consensus.PointSchedule.Peers (PeerId)
import           Test.Util.Orphans.IOLike ()
import           Test.Util.TestBlock (BlockConfig (TestBlockConfig), TestBlock)
import           Test.Util.Time (dawnOfTime)

-- | Fork the BlockFetch decision logic in a thread linked to the given
-- registry.
--
-- The thread is named @"BlockFetchLogic"@ and runs 'blockFetchLogic' with a
-- consensus interface built from the given 'ChainDB' and candidate
-- fragments. Fetch decisions are forwarded to the given tracer as
-- 'TraceOther' events prefixed with @"BlockFetchLogic | "@; the fetch client
-- state tracer is 'nullTracer'.
startBlockFetchLogic ::
     forall m.
     (IOLike m)
  => ResourceRegistry m
  -> Tracer m (TraceEvent TestBlock)
  -> ChainDB m TestBlock
  -> FetchClientRegistry PeerId (Header TestBlock) TestBlock m
  -> STM m (Map PeerId (AnchoredFragment (Header TestBlock)))
  -> m ()
startBlockFetchLogic registry tracer chainDb fetchClientRegistry getCandidates = do
    let -- Ignores the point it is given and always answers 'dawnOfTime'.
        slotForgeTime :: BlockFetchClientInterface.SlotForgeTimeOracle m blk
        slotForgeTime _ = pure dawnOfTime

        blockFetchConsensusInterface =
          BlockFetchClientInterface.mkBlockFetchConsensusInterface
            (TestBlockConfig $ NumCoreNodes 0) -- Only needed when minting blocks
            (BlockFetchClientInterface.defaultChainDbView chainDb)
            getCandidates
            -- The size of headers in bytes is irrelevant because our tests
            -- do not serialize the blocks.
            (\_hdr -> 1000)
            slotForgeTime
            -- Initially, we tried FetchModeBulkSync, but adversaries had the
            -- opportunity to delay syncing by not responding to block requests.
            -- The BlockFetch logic would then wait for the timeout to expire
            -- before trying to download the block from another peer.
            (pure FetchModeDeadline)
            DiffusionPipeliningOn

        -- Values taken from
        -- ouroboros-consensus-diffusion/src/unstable-diffusion-testlib/Test/ThreadNet/Network.hs
        blockFetchCfg = BlockFetchConfiguration
          { -- We set a higher value here to allow downloading blocks from all
            -- peers.
            --
            -- If the value is too low, block downloads from a peer may prevent
            -- blocks from being downloaded from other peers. This can be
            -- problematic, since the batch download of a simulated BlockFetch
            -- server can last several ticks if the block pointer is not
            -- advanced to allow completion of the batch.
            --
            bfcMaxConcurrencyBulkSync = 50
          , bfcMaxConcurrencyDeadline = 50
          , bfcMaxRequestsInflight = 10
          , bfcDecisionLoopInterval = 0
          , bfcSalt = 0
          }

    void $ forkLinkedThread registry "BlockFetchLogic" $
      blockFetchLogic
        decisionTracer
        nullTracer
        blockFetchConsensusInterface
        fetchClientRegistry
        blockFetchCfg
  where
    -- Renders each batch of fetch decisions with 'show' and wraps it in a
    -- 'TraceOther' event for the peer simulator tracer.
    decisionTracer = TraceOther . ("BlockFetchLogic | " ++) . show >$< tracer

-- | Fork a thread named @"KeepAlive"@, linked to the given registry, that
-- registers the peer with the 'FetchClientRegistry' through
-- 'bracketKeepAliveClient'.
--
-- The bracketed action ignores its argument and blocks forever
-- (@atomically retry@), so the registration lasts until the thread is
-- terminated from outside.
startKeepAliveThread ::
     forall m peer blk.
     (Ord peer, IOLike m)
  => ResourceRegistry m
  -> FetchClientRegistry peer (Header blk) blk m
  -> peer
  -> m ()
startKeepAliveThread registry fetchClientRegistry peerId =
    void $ forkLinkedThread registry "KeepAlive" $
      bracketKeepAliveClient fetchClientRegistry peerId $ \_ ->
        atomically retry

-- | Run a BlockFetch client for the given peer over the given channel.
--
-- The client is registered through 'bracketFetchClient' and driven by
-- 'runPipelinedPeerWithLimits' with the identity codec, no size limits
-- ('blockFetchNoSizeLimits') and time limits derived from the given
-- 'BlockFetchTimeout'.
--
-- Any exception thrown by the peer is caught. Whether the peer terminated
-- normally or with an exception, the outcome is reported on
-- 'svtPeerSimulatorResultsTracer' as a 'SomeBlockFetchClientResult'. If the
-- exception is a 'ProtocolLimitFailure', a matching
-- 'TraceBlockFetchClientTerminationEvent' is additionally emitted on the
-- general tracer.
runBlockFetchClient ::
     (IOLike m, MonadTime m, MonadTimer m, HasHeader blk, HasHeader (Header blk), ShowProxy blk)
  => Tracer m (TraceEvent blk)
  -> PeerId
  -> BlockFetchTimeout
  -> StateViewTracers blk m
  -> FetchClientRegistry PeerId (Header blk) blk m
  -> ControlMessageSTM m
  -> Channel m (AnyMessage (BlockFetch blk (Point blk)))
     -- ^ Send and receive message via the given 'Channel'.
  -> m ()
runBlockFetchClient tracer peerId blockFetchTimeouts StateViewTracers {svtPeerSimulatorResultsTracer} fetchClientRegistry controlMsgSTM channel = do
    bracketFetchClient fetchClientRegistry ntnVersion peerId $ \clientCtx -> do
      res <-
        try $
          runPipelinedPeerWithLimits
            nullTracer
            codecBlockFetchId
            blockFetchNoSizeLimits
            (timeLimitsBlockFetch blockFetchTimeouts)
            channel
            (blockFetchClient ntnVersion controlMsgSTM nullTracer clientCtx)
      case res of
        Right ((), msgRes) -> traceWith svtPeerSimulatorResultsTracer $
          PeerSimulatorResult peerId $ SomeBlockFetchClientResult $ Right msgRes
        Left exn -> do
          traceWith svtPeerSimulatorResultsTracer $
            PeerSimulatorResult peerId $ SomeBlockFetchClientResult $ Left exn
          -- Only protocol limit failures get a dedicated termination event;
          -- every other exception is reported through the result tracer only.
          case fromException exn of
            Just (ExceededSizeLimit _) ->
              traceWith tracer $ TraceBlockFetchClientTerminationEvent peerId TraceExceededSizeLimitBF
            Just (ExceededTimeLimit _) ->
              traceWith tracer $ TraceBlockFetchClientTerminationEvent peerId TraceExceededTimeLimitBF
            Nothing -> pure ()
  where
    -- The client always uses the highest node-to-node version.
    ntnVersion :: NodeToNodeVersion
    ntnVersion = maxBound

-- | Size limits for BlockFetch built with 'byteLimitsBlockFetch' from a
-- measuring function that reports every message as having size @0@.
blockFetchNoSizeLimits :: ProtocolSizeLimits (BlockFetch block point) bytes
blockFetchNoSizeLimits = byteLimitsBlockFetch (const 0)

-- | Same as 'timeLimitsChainSync' for BlockFetch. NOTE: There exists a
-- @timeLimitsBlockFetch@ in 'Ouroboros.Network.Protocol.BlockFetch.Codec' but
-- it does not allow customising the values as 'timeLimitsChainSync' does.
-- REVIEW: Should this be upstreamed to `ouroboros-network-protocols`?
--
-- The idle state never times out ('waitForever'); the busy and streaming
-- states use the corresponding fields of the given 'BlockFetchTimeout'.
timeLimitsBlockFetch :: forall block point. BlockFetchTimeout -> ProtocolTimeLimits (BlockFetch block point)
timeLimitsBlockFetch BlockFetchTimeout{busyTimeout, streamingTimeout} =
  ProtocolTimeLimits stateToLimit
  where
    stateToLimit :: forall (st :: BlockFetch block point).
                    ActiveState st => StateToken st -> Maybe DiffTime
    stateToLimit SingBFIdle      = waitForever
    stateToLimit SingBFBusy      = busyTimeout
    stateToLimit SingBFStreaming = streamingTimeout
    -- The done state is handed to 'notActiveState' instead of being given
    -- a time limit.
    stateToLimit a@SingBFDone    = notActiveState a

-- | A 'BlockFetchTimeout' with both the busy and the streaming timeout set
-- to 'Nothing'.
blockFetchNoTimeouts :: BlockFetchTimeout
blockFetchNoTimeouts =
  BlockFetchTimeout
    { busyTimeout = Nothing,
      streamingTimeout = Nothing
    }

-- | Run the given BlockFetch server over the given channel with 'runPeer'
-- and the identity codec.
--
-- Any exception thrown by the peer is caught. Whether the peer terminated
-- normally or with an exception, the outcome is reported on
-- 'svtPeerSimulatorResultsTracer' as a 'SomeBlockFetchServerResult'. The
-- general tracer argument is currently unused.
runBlockFetchServer ::
  (IOLike m, ShowProxy blk) =>
  Tracer m (TraceEvent blk) ->
  PeerId ->
  StateViewTracers blk m ->
  BlockFetchServer blk (Point blk) m () ->
  Channel m (AnyMessage (BlockFetch blk (Point blk))) ->
  -- ^ Send and receive message via the given 'Channel'.
  m ()
runBlockFetchServer _tracer peerId StateViewTracers {svtPeerSimulatorResultsTracer} server channel = do
  res <- try $ runPeer nullTracer codecBlockFetchId channel (blockFetchServerPeer server)
  case res of
    Right ((), msgRes) -> traceWith svtPeerSimulatorResultsTracer $
      PeerSimulatorResult peerId $ SomeBlockFetchServerResult $ Right msgRes
    Left exn -> do
      traceWith svtPeerSimulatorResultsTracer $
        PeerSimulatorResult peerId $ SomeBlockFetchServerResult $ Left exn
      -- NOTE: here we are able to trace exceptions, as what is done in `runChainSyncClient`
      -- No specific exception is singled out yet: the single alternative
      -- below matches every result and does nothing.
      case fromException exn of
        (_ :: Maybe SomeException) -> pure ()