{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE EmptyDataDeriving #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE UndecidableInstances #-}
module Ouroboros.Consensus.MiniProtocol.BlockFetch.Server
( blockFetchServer
, TraceBlockFetchServerEvent (..)
, BlockFetchServerException
, blockFetchServer'
) where
import Control.ResourceRegistry (ResourceRegistry)
import Control.Tracer (Tracer, traceWith)
import Data.Typeable (Typeable)
import Ouroboros.Consensus.Block
import Ouroboros.Consensus.Storage.ChainDB
( ChainDB
, Iterator
, IteratorResult (..)
, UnknownRange
, WithPoint (..)
, getSerialisedBlockWithPoint
)
import qualified Ouroboros.Consensus.Storage.ChainDB as ChainDB
import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Network.Block (Serialised (..))
import Ouroboros.Network.NodeToNode.Version (NodeToNodeVersion)
import Ouroboros.Network.Protocol.BlockFetch.Server
( BlockFetchBlockSender (..)
, BlockFetchSendBlocks (..)
, BlockFetchServer (..)
)
import Ouroboros.Network.Protocol.BlockFetch.Type (ChainRange (..))
-- | Exceptions thrown by the block-fetch server while serving a request.
data BlockFetchServerException
  = -- | The ChainDB iterator reported 'IteratorBlockGCed' for this point
    -- while a batch was being streamed, so the block cannot be sent.
    --
    -- The block type is existentially quantified, which keeps the exception
    -- type itself free of type parameters.
    forall blk.
    (Typeable blk, StandardHash blk) =>
    BlockGCed (RealPoint blk)
  | -- | At least one end of the requested range was not a 'BlockPoint'.
    --
    -- NOTE(review): presumably this is the genesis point, for which there
    -- is no block to read and return -- confirm against 'Point'.
    NoGenesisBlock

deriving instance Show BlockFetchServerException

instance Exception BlockFetchServerException
-- | Block-fetch server that serves serialised blocks from a 'ChainDB'.
--
-- This is a thin wrapper around 'blockFetchServer'': requested ranges are
-- streamed with 'ChainDB.stream', using the supplied registry and the
-- 'getSerialisedBlockWithPoint' block component.
blockFetchServer ::
  forall m blk.
  ( IOLike m
  , StandardHash blk
  , Typeable blk
  ) =>
  -- | Tracer that receives one event per block sent.
  Tracer m (TraceBlockFetchServerEvent blk) ->
  -- | Database the blocks are streamed from.
  ChainDB m blk ->
  -- | Negotiated protocol version; accepted but not inspected here.
  NodeToNodeVersion ->
  -- | Registry handed to 'ChainDB.stream' when an iterator is opened.
  ResourceRegistry m ->
  BlockFetchServer (Serialised blk) (Point blk) m ()
blockFetchServer tracer chainDB _version registry =
  blockFetchServer' tracer $
    ChainDB.stream chainDB registry getSerialisedBlockWithPoint
-- | Block-fetch server parameterised over how a range of blocks is streamed.
--
-- The server alternates between two states:
--
-- * idle ('senderSide'): wait for the next range request;
-- * streaming ('sendBlocks'): send the blocks of the accepted range one by
--   one, then return to idle.
--
-- Throws 'NoGenesisBlock' when either end of a requested range is not a
-- 'BlockPoint', and 'BlockGCed' when the iterator reports
-- 'IteratorBlockGCed' in the middle of a batch.
blockFetchServer' ::
  forall m blk a.
  ( IOLike m
  , StandardHash blk
  , Typeable blk
  ) =>
  -- | Tracer that receives one event per block sent.
  Tracer m (TraceBlockFetchServerEvent blk) ->
  -- | Opens an iterator over the given bounds, or reports that the range
  -- cannot be served.
  ( ChainDB.StreamFrom blk ->
    ChainDB.StreamTo blk ->
    m (Either (UnknownRange blk) (Iterator m blk (WithPoint blk a)))
  ) ->
  BlockFetchServer a (Point blk) m ()
blockFetchServer' tracer stream = senderSide
 where
  -- Idle state: handle the next range request with 'receiveReq''.
  senderSide :: BlockFetchServer a (Point blk) m ()
  senderSide = BlockFetchServer receiveReq' ()

  -- Turn the requested 'Point's into 'RealPoint's. Any endpoint that is not
  -- a 'BlockPoint' is rejected with 'NoGenesisBlock'.
  receiveReq' ::
    ChainRange (Point blk) ->
    m (BlockFetchBlockSender a (Point blk) m ())
  receiveReq' (ChainRange start end) =
    case (start, end) of
      (BlockPoint s h, BlockPoint s' h') ->
        receiveReq (RealPoint s h) (RealPoint s' h')
      _otherwise ->
        throwIO NoGenesisBlock

  -- Open an iterator over the range, both bounds inclusive, and choose the
  -- protocol reply based on whether that succeeded.
  receiveReq ::
    RealPoint blk ->
    RealPoint blk ->
    m (BlockFetchBlockSender a (Point blk) m ())
  receiveReq start end = do
    errIt <-
      stream
        (ChainDB.StreamFromInclusive start)
        (ChainDB.StreamToInclusive end)
    return $ case errIt of
      -- The range cannot be served; the reason is not inspected. Decline
      -- and go back to idle.
      Left _ -> SendMsgNoBlocks $ return senderSide
      -- An iterator was obtained: start the batch and stream from it.
      Right it -> SendMsgStartBatch $ sendBlocks it

  -- Streaming state: pull the next result from the iterator and translate
  -- it into the next protocol message.
  sendBlocks ::
    ChainDB.Iterator m blk (WithPoint blk a) ->
    m (BlockFetchSendBlocks a (Point blk) m ())
  sendBlocks it = do
    next <- ChainDB.iteratorNext it
    case next of
      IteratorResult blk -> do
        -- Trace the point of the block before handing it to the protocol.
        traceWith tracer $ TraceBlockFetchServerSendBlock $ point blk
        return $ SendMsgBlock (withoutPoint blk) (sendBlocks it)
      IteratorExhausted -> do
        -- Batch complete: close the iterator and return to idle.
        ChainDB.iteratorClose it
        return $ SendMsgBatchDone $ return senderSide
      IteratorBlockGCed pt -> do
        -- Close the iterator before throwing.
        ChainDB.iteratorClose it
        throwIO $ BlockGCed @blk pt
-- | Events traced by the block-fetch server.
data TraceBlockFetchServerEvent blk
  = -- | The server is about to send the block at this point. Traced once
    -- for every block the iterator yields, before the block is handed to
    -- the protocol.
    TraceBlockFetchServerSendBlock !(Point blk)
  deriving (Eq, Show)