{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Cardano.Tools.ImmDBServer.Diffusion (run) where
import Cardano.Tools.ImmDBServer.MiniProtocols (immDBServer)
import Control.ResourceRegistry
import Control.Tracer
import qualified Data.ByteString.Lazy as BL
import Data.Functor.Contravariant ((>$<))
import Data.Void (Void)
import qualified Network.Mux as Mux
import Network.Socket (SockAddr (..))
import Ouroboros.Consensus.Block
import Ouroboros.Consensus.Config
import Ouroboros.Consensus.Config.SupportsNode
import Ouroboros.Consensus.Node.InitStorage
( NodeInitStorage (nodeCheckIntegrity, nodeImmutableDbChunkInfo)
)
import Ouroboros.Consensus.Node.NetworkProtocolVersion
import Ouroboros.Consensus.Node.Run (SerialiseNodeToNodeConstraints)
import Ouroboros.Consensus.Storage.ImmutableDB (ImmutableDbArgs (..))
import qualified Ouroboros.Consensus.Storage.ImmutableDB as ImmutableDB
import Ouroboros.Consensus.Util
import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Network.IOManager (withIOManager)
import Ouroboros.Network.Mux
import qualified Ouroboros.Network.NodeToNode as N2N
import Ouroboros.Network.PeerSelection.PeerSharing.Codec
( decodeRemoteAddress
, encodeRemoteAddress
)
import Ouroboros.Network.Protocol.Handshake (HandshakeArguments (..))
import qualified Ouroboros.Network.Protocol.Handshake as Handshake
import qualified Ouroboros.Network.Server.Simple as Server
import qualified Ouroboros.Network.Snocket as Snocket
import Ouroboros.Network.Socket (SomeResponderApplication (..), configureSocket)
import System.FS.API (SomeHasFS (..))
import System.FS.API.Types (MountPoint (MountPoint))
import System.FS.IO (ioHasFS)
-- | Run a responder-only node-to-node server on the given socket address.
--
-- The server is started via 'Server.with' on a socket-based snocket obtained
-- from the IO manager, and every negotiated version of the supplied
-- application is wrapped in 'SomeResponderApplication'.
--
-- Handshake and bearer trace events are rendered with 'show' and sent to
-- 'stdoutTracer'.
--
-- The action blocks by waiting on the server thread; its result type is
-- 'Void', so it does not return a value.
serve ::
  SockAddr ->
  N2N.Versions
    N2N.NodeToNodeVersion
    N2N.NodeToNodeVersionData
    (OuroborosApplicationWithMinimalCtx 'Mux.ResponderMode SockAddr BL.ByteString IO Void ()) ->
  IO Void
serve sockAddr application = withIOManager \iocp ->
  Server.with
    (Snocket.socketSnocket iocp)
    Snocket.makeSocketBearer
    -- Configure each socket, passing along the address it is associated with.
    (\sock addr -> configureSocket sock (Just addr))
    sockAddr
    HandshakeArguments
      { haHandshakeTracer = show >$< stdoutTracer
      , haBearerTracer = show >$< stdoutTracer
      , haHandshakeCodec = Handshake.nodeToNodeHandshakeCodec
      , haVersionDataCodec = Handshake.cborTermVersionDataCodec N2N.nodeToNodeCodecCBORTerm
      , haAcceptVersion = Handshake.acceptableVersion
      , haQueryVersion = Handshake.queryVersion
      , haTimeLimits = Handshake.timeLimitsHandshake
      }
    (SomeResponderApplication <$> application)
    -- The address argument is ignored (presumably the address the server is
    -- bound to -- confirm against 'Server.with'); we only wait on the server.
    (\_ serverAsync -> wait serverAsync)
-- | Open the ImmutableDB stored in the given directory and serve it over the
-- node-to-node protocol on the given socket address.
--
-- The database is opened inside a fresh resource registry ('withRegistry')
-- and handed to 'immDBServer', whose resulting versioned application is then
-- run by 'serve'.
--
-- Arguments, in order:
--
-- * the directory used as the mount point of the ImmutableDB file system,
-- * the socket address passed on to 'serve',
-- * the top-level config, from which the codec config, the storage config and
--   the network magic are derived.
run ::
  forall blk.
  ( GetPrevHash blk
  , ShowProxy blk
  , SupportedNetworkProtocolVersion blk
  , SerialiseNodeToNodeConstraints blk
  , ImmutableDB.ImmutableDbSerialiseConstraints blk
  , NodeInitStorage blk
  , ConfigSupportsNode blk
  ) =>
  FilePath ->
  SockAddr ->
  TopLevelConfig blk ->
  IO Void
run immDBDir sockAddr cfg = withRegistry \registry ->
  ImmutableDB.withDB
    (ImmutableDB.openDB (immDBArgs registry) runWithTempRegistry)
    \immDB ->
      serve sockAddr $
        immDBServer
          codecCfg
          encodeRemoteAddress
          decodeRemoteAddress
          immDB
          networkMagic
  where
    -- ImmutableDB arguments: the defaults, completed with the integrity check
    -- and chunk info taken from the storage config, the codec config, the
    -- given registry, and an IO-backed file system mounted at 'immDBDir'.
    immDBArgs registry =
      ImmutableDB.defaultArgs
        { immCheckIntegrity = nodeCheckIntegrity storageCfg
        , immChunkInfo = nodeImmutableDbChunkInfo storageCfg
        , immCodecConfig = codecCfg
        , immRegistry = registry
        , immHasFS = SomeHasFS $ ioHasFS $ MountPoint immDBDir
        }

    -- Projections of the top-level config used above.
    codecCfg = configCodec cfg
    storageCfg = configStorage cfg
    networkMagic = getNetworkMagic . configBlock $ cfg