{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE ViewPatterns #-}
module Ouroboros.Consensus.MiniProtocol.LocalTxMonitor.Server (localTxMonitorServer) where
import Ouroboros.Consensus.Block
import Ouroboros.Consensus.Ledger.SupportsMempool
import Ouroboros.Consensus.Mempool
import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Network.Protocol.LocalTxMonitor.Server
import Ouroboros.Network.Protocol.LocalTxMonitor.Type
localTxMonitorServer ::
forall blk m.
( MonadSTM m
, LedgerSupportsMempool blk
)
=> Mempool m blk
-> LocalTxMonitorServer (GenTxId blk) (GenTx blk) SlotNo m ()
localTxMonitorServer :: forall blk (m :: * -> *).
(MonadSTM m, LedgerSupportsMempool blk) =>
Mempool m blk
-> LocalTxMonitorServer (GenTxId blk) (GenTx blk) SlotNo m ()
localTxMonitorServer Mempool m blk
mempool =
m (ServerStIdle (GenTxId blk) (GenTx blk) SlotNo m ())
-> LocalTxMonitorServer (GenTxId blk) (GenTx blk) SlotNo m ()
forall txid tx slot (m :: * -> *) a.
m (ServerStIdle txid tx slot m a)
-> LocalTxMonitorServer txid tx slot m a
LocalTxMonitorServer (ServerStIdle (GenTxId blk) (GenTx blk) SlotNo m ()
-> m (ServerStIdle (GenTxId blk) (GenTx blk) SlotNo m ())
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ServerStIdle (GenTxId blk) (GenTx blk) SlotNo m ()
serverStIdle)
where
serverStIdle
:: ServerStIdle (GenTxId blk) (GenTx blk) SlotNo m ()
serverStIdle :: ServerStIdle (GenTxId blk) (GenTx blk) SlotNo m ()
serverStIdle =
ServerStIdle
{ recvMsgDone :: m ()
recvMsgDone = do
() -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
, recvMsgAcquire :: m (ServerStAcquiring (GenTxId blk) (GenTx blk) SlotNo m ())
recvMsgAcquire = do
(ByteSize32, MempoolSnapshot blk)
s <- STM m (ByteSize32, MempoolSnapshot blk)
-> m (ByteSize32, MempoolSnapshot blk)
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m (ByteSize32, MempoolSnapshot blk)
-> m (ByteSize32, MempoolSnapshot blk))
-> STM m (ByteSize32, MempoolSnapshot blk)
-> m (ByteSize32, MempoolSnapshot blk)
forall a b. (a -> b) -> a -> b
$
(,)
(ByteSize32
-> MempoolSnapshot blk -> (ByteSize32, MempoolSnapshot blk))
-> STM m ByteSize32
-> STM m (MempoolSnapshot blk -> (ByteSize32, MempoolSnapshot blk))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (TxMeasure blk -> ByteSize32
forall a. HasByteSize a => a -> ByteSize32
txMeasureByteSize (TxMeasure blk -> ByteSize32)
-> STM m (TxMeasure blk) -> STM m ByteSize32
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Mempool m blk -> STM m (TxMeasure blk)
forall (m :: * -> *) blk. Mempool m blk -> STM m (TxMeasure blk)
getCapacity Mempool m blk
mempool)
STM m (MempoolSnapshot blk -> (ByteSize32, MempoolSnapshot blk))
-> STM m (MempoolSnapshot blk)
-> STM m (ByteSize32, MempoolSnapshot blk)
forall a b. STM m (a -> b) -> STM m a -> STM m b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Mempool m blk -> STM m (MempoolSnapshot blk)
forall (m :: * -> *) blk.
Mempool m blk -> STM m (MempoolSnapshot blk)
getSnapshot Mempool m blk
mempool
ServerStAcquiring (GenTxId blk) (GenTx blk) SlotNo m ()
-> m (ServerStAcquiring (GenTxId blk) (GenTx blk) SlotNo m ())
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ServerStAcquiring (GenTxId blk) (GenTx blk) SlotNo m ()
-> m (ServerStAcquiring (GenTxId blk) (GenTx blk) SlotNo m ()))
-> ServerStAcquiring (GenTxId blk) (GenTx blk) SlotNo m ()
-> m (ServerStAcquiring (GenTxId blk) (GenTx blk) SlotNo m ())
forall a b. (a -> b) -> a -> b
$ (ByteSize32, MempoolSnapshot blk)
-> ServerStAcquiring (GenTxId blk) (GenTx blk) SlotNo m ()
serverStAcquiring (ByteSize32, MempoolSnapshot blk)
s
}
serverStAcquiring
:: (ByteSize32, MempoolSnapshot blk)
-> ServerStAcquiring (GenTxId blk) (GenTx blk) SlotNo m ()
serverStAcquiring :: (ByteSize32, MempoolSnapshot blk)
-> ServerStAcquiring (GenTxId blk) (GenTx blk) SlotNo m ()
serverStAcquiring s :: (ByteSize32, MempoolSnapshot blk)
s@(ByteSize32
_, MempoolSnapshot blk
snapshot) =
SlotNo
-> ServerStAcquired (GenTxId blk) (GenTx blk) SlotNo m ()
-> ServerStAcquiring (GenTxId blk) (GenTx blk) SlotNo m ()
forall slot txid tx (m :: * -> *) a.
slot
-> ServerStAcquired txid tx slot m a
-> ServerStAcquiring txid tx slot m a
SendMsgAcquired (MempoolSnapshot blk -> SlotNo
forall blk. MempoolSnapshot blk -> SlotNo
snapshotSlotNo MempoolSnapshot blk
snapshot) ((ByteSize32, MempoolSnapshot blk)
-> [(Validated (GenTx blk), TicketNo, ByteSize32)]
-> ServerStAcquired (GenTxId blk) (GenTx blk) SlotNo m ()
forall idx.
(ByteSize32, MempoolSnapshot blk)
-> [(Validated (GenTx blk), idx, ByteSize32)]
-> ServerStAcquired (GenTxId blk) (GenTx blk) SlotNo m ()
serverStAcquired (ByteSize32, MempoolSnapshot blk)
s (MempoolSnapshot blk
-> [(Validated (GenTx blk), TicketNo, ByteSize32)]
forall blk.
MempoolSnapshot blk
-> [(Validated (GenTx blk), TicketNo, ByteSize32)]
snapshotTxs MempoolSnapshot blk
snapshot))
serverStAcquired
:: (ByteSize32, MempoolSnapshot blk)
-> [(Validated (GenTx blk), idx, ByteSize32)]
-> ServerStAcquired (GenTxId blk) (GenTx blk) SlotNo m ()
serverStAcquired :: forall idx.
(ByteSize32, MempoolSnapshot blk)
-> [(Validated (GenTx blk), idx, ByteSize32)]
-> ServerStAcquired (GenTxId blk) (GenTx blk) SlotNo m ()
serverStAcquired s :: (ByteSize32, MempoolSnapshot blk)
s@(ByteSize32
capacity, MempoolSnapshot blk
snapshot) [(Validated (GenTx blk), idx, ByteSize32)]
txs =
ServerStAcquired
{ recvMsgNextTx :: m (ServerStBusy 'NextTx (GenTxId blk) (GenTx blk) SlotNo m ())
recvMsgNextTx =
case [(Validated (GenTx blk), idx, ByteSize32)]
txs of
[] ->
ServerStBusy 'NextTx (GenTxId blk) (GenTx blk) SlotNo m ()
-> m (ServerStBusy 'NextTx (GenTxId blk) (GenTx blk) SlotNo m ())
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ServerStBusy 'NextTx (GenTxId blk) (GenTx blk) SlotNo m ()
-> m (ServerStBusy 'NextTx (GenTxId blk) (GenTx blk) SlotNo m ()))
-> ServerStBusy 'NextTx (GenTxId blk) (GenTx blk) SlotNo m ()
-> m (ServerStBusy 'NextTx (GenTxId blk) (GenTx blk) SlotNo m ())
forall a b. (a -> b) -> a -> b
$ Maybe (GenTx blk)
-> ServerStAcquired (GenTxId blk) (GenTx blk) SlotNo m ()
-> ServerStBusy 'NextTx (GenTxId blk) (GenTx blk) SlotNo m ()
forall tx txid slot (m :: * -> *) a.
Maybe tx
-> ServerStAcquired txid tx slot m a
-> ServerStBusy 'NextTx txid tx slot m a
SendMsgReplyNextTx Maybe (GenTx blk)
forall a. Maybe a
Nothing ((ByteSize32, MempoolSnapshot blk)
-> [(Validated (GenTx blk), Any, ByteSize32)]
-> ServerStAcquired (GenTxId blk) (GenTx blk) SlotNo m ()
forall idx.
(ByteSize32, MempoolSnapshot blk)
-> [(Validated (GenTx blk), idx, ByteSize32)]
-> ServerStAcquired (GenTxId blk) (GenTx blk) SlotNo m ()
serverStAcquired (ByteSize32, MempoolSnapshot blk)
s [])
(Validated (GenTx blk) -> GenTx blk
forall blk.
LedgerSupportsMempool blk =>
Validated (GenTx blk) -> GenTx blk
txForgetValidated -> GenTx blk
h, idx
_tno, ByteSize32
_byteSize):[(Validated (GenTx blk), idx, ByteSize32)]
q ->
ServerStBusy 'NextTx (GenTxId blk) (GenTx blk) SlotNo m ()
-> m (ServerStBusy 'NextTx (GenTxId blk) (GenTx blk) SlotNo m ())
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ServerStBusy 'NextTx (GenTxId blk) (GenTx blk) SlotNo m ()
-> m (ServerStBusy 'NextTx (GenTxId blk) (GenTx blk) SlotNo m ()))
-> ServerStBusy 'NextTx (GenTxId blk) (GenTx blk) SlotNo m ()
-> m (ServerStBusy 'NextTx (GenTxId blk) (GenTx blk) SlotNo m ())
forall a b. (a -> b) -> a -> b
$ Maybe (GenTx blk)
-> ServerStAcquired (GenTxId blk) (GenTx blk) SlotNo m ()
-> ServerStBusy 'NextTx (GenTxId blk) (GenTx blk) SlotNo m ()
forall tx txid slot (m :: * -> *) a.
Maybe tx
-> ServerStAcquired txid tx slot m a
-> ServerStBusy 'NextTx txid tx slot m a
SendMsgReplyNextTx (GenTx blk -> Maybe (GenTx blk)
forall a. a -> Maybe a
Just GenTx blk
h) ((ByteSize32, MempoolSnapshot blk)
-> [(Validated (GenTx blk), idx, ByteSize32)]
-> ServerStAcquired (GenTxId blk) (GenTx blk) SlotNo m ()
forall idx.
(ByteSize32, MempoolSnapshot blk)
-> [(Validated (GenTx blk), idx, ByteSize32)]
-> ServerStAcquired (GenTxId blk) (GenTx blk) SlotNo m ()
serverStAcquired (ByteSize32, MempoolSnapshot blk)
s [(Validated (GenTx blk), idx, ByteSize32)]
q)
, recvMsgHasTx :: GenTxId blk
-> m (ServerStBusy 'HasTx (GenTxId blk) (GenTx blk) SlotNo m ())
recvMsgHasTx = \GenTxId blk
txid ->
ServerStBusy 'HasTx (GenTxId blk) (GenTx blk) SlotNo m ()
-> m (ServerStBusy 'HasTx (GenTxId blk) (GenTx blk) SlotNo m ())
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ServerStBusy 'HasTx (GenTxId blk) (GenTx blk) SlotNo m ()
-> m (ServerStBusy 'HasTx (GenTxId blk) (GenTx blk) SlotNo m ()))
-> ServerStBusy 'HasTx (GenTxId blk) (GenTx blk) SlotNo m ()
-> m (ServerStBusy 'HasTx (GenTxId blk) (GenTx blk) SlotNo m ())
forall a b. (a -> b) -> a -> b
$ Bool
-> ServerStAcquired (GenTxId blk) (GenTx blk) SlotNo m ()
-> ServerStBusy 'HasTx (GenTxId blk) (GenTx blk) SlotNo m ()
forall txid tx slot (m :: * -> *) a.
Bool
-> ServerStAcquired txid tx slot m a
-> ServerStBusy 'HasTx txid tx slot m a
SendMsgReplyHasTx (MempoolSnapshot blk -> GenTxId blk -> Bool
forall blk. MempoolSnapshot blk -> GenTxId blk -> Bool
snapshotHasTx MempoolSnapshot blk
snapshot GenTxId blk
txid) ((ByteSize32, MempoolSnapshot blk)
-> [(Validated (GenTx blk), idx, ByteSize32)]
-> ServerStAcquired (GenTxId blk) (GenTx blk) SlotNo m ()
forall idx.
(ByteSize32, MempoolSnapshot blk)
-> [(Validated (GenTx blk), idx, ByteSize32)]
-> ServerStAcquired (GenTxId blk) (GenTx blk) SlotNo m ()
serverStAcquired (ByteSize32, MempoolSnapshot blk)
s [(Validated (GenTx blk), idx, ByteSize32)]
txs)
, recvMsgGetSizes :: m (ServerStBusy 'GetSizes (GenTxId blk) (GenTx blk) SlotNo m ())
recvMsgGetSizes = do
let MempoolSize{Word32
msNumTxs :: Word32
msNumTxs :: MempoolSize -> Word32
msNumTxs,ByteSize32
msNumBytes :: ByteSize32
msNumBytes :: MempoolSize -> ByteSize32
msNumBytes} = MempoolSnapshot blk -> MempoolSize
forall blk. MempoolSnapshot blk -> MempoolSize
snapshotMempoolSize MempoolSnapshot blk
snapshot
let sizes :: MempoolSizeAndCapacity
sizes = MempoolSizeAndCapacity
{ capacityInBytes :: Word32
capacityInBytes = ByteSize32 -> Word32
unByteSize32 ByteSize32
capacity
, sizeInBytes :: Word32
sizeInBytes = ByteSize32 -> Word32
unByteSize32 ByteSize32
msNumBytes
, numberOfTxs :: Word32
numberOfTxs = Word32
msNumTxs
}
ServerStBusy 'GetSizes (GenTxId blk) (GenTx blk) SlotNo m ()
-> m (ServerStBusy 'GetSizes (GenTxId blk) (GenTx blk) SlotNo m ())
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ServerStBusy 'GetSizes (GenTxId blk) (GenTx blk) SlotNo m ()
-> m (ServerStBusy
'GetSizes (GenTxId blk) (GenTx blk) SlotNo m ()))
-> ServerStBusy 'GetSizes (GenTxId blk) (GenTx blk) SlotNo m ()
-> m (ServerStBusy 'GetSizes (GenTxId blk) (GenTx blk) SlotNo m ())
forall a b. (a -> b) -> a -> b
$ MempoolSizeAndCapacity
-> ServerStAcquired (GenTxId blk) (GenTx blk) SlotNo m ()
-> ServerStBusy 'GetSizes (GenTxId blk) (GenTx blk) SlotNo m ()
forall txid tx slot (m :: * -> *) a.
MempoolSizeAndCapacity
-> ServerStAcquired txid tx slot m a
-> ServerStBusy 'GetSizes txid tx slot m a
SendMsgReplyGetSizes MempoolSizeAndCapacity
sizes ((ByteSize32, MempoolSnapshot blk)
-> [(Validated (GenTx blk), idx, ByteSize32)]
-> ServerStAcquired (GenTxId blk) (GenTx blk) SlotNo m ()
forall idx.
(ByteSize32, MempoolSnapshot blk)
-> [(Validated (GenTx blk), idx, ByteSize32)]
-> ServerStAcquired (GenTxId blk) (GenTx blk) SlotNo m ()
serverStAcquired (ByteSize32, MempoolSnapshot blk)
s [(Validated (GenTx blk), idx, ByteSize32)]
txs)
, recvMsgAwaitAcquire :: m (ServerStAcquiring (GenTxId blk) (GenTx blk) SlotNo m ())
recvMsgAwaitAcquire = do
(ByteSize32, MempoolSnapshot blk)
s' <- STM m (ByteSize32, MempoolSnapshot blk)
-> m (ByteSize32, MempoolSnapshot blk)
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m (ByteSize32, MempoolSnapshot blk)
-> m (ByteSize32, MempoolSnapshot blk))
-> STM m (ByteSize32, MempoolSnapshot blk)
-> m (ByteSize32, MempoolSnapshot blk)
forall a b. (a -> b) -> a -> b
$ do
s' :: (ByteSize32, MempoolSnapshot blk)
s'@(ByteSize32
_, MempoolSnapshot blk
snapshot') <-
(,)
(ByteSize32
-> MempoolSnapshot blk -> (ByteSize32, MempoolSnapshot blk))
-> STM m ByteSize32
-> STM m (MempoolSnapshot blk -> (ByteSize32, MempoolSnapshot blk))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (TxMeasure blk -> ByteSize32
forall a. HasByteSize a => a -> ByteSize32
txMeasureByteSize (TxMeasure blk -> ByteSize32)
-> STM m (TxMeasure blk) -> STM m ByteSize32
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Mempool m blk -> STM m (TxMeasure blk)
forall (m :: * -> *) blk. Mempool m blk -> STM m (TxMeasure blk)
getCapacity Mempool m blk
mempool)
STM m (MempoolSnapshot blk -> (ByteSize32, MempoolSnapshot blk))
-> STM m (MempoolSnapshot blk)
-> STM m (ByteSize32, MempoolSnapshot blk)
forall a b. STM m (a -> b) -> STM m a -> STM m b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Mempool m blk -> STM m (MempoolSnapshot blk)
forall (m :: * -> *) blk.
Mempool m blk -> STM m (MempoolSnapshot blk)
getSnapshot Mempool m blk
mempool
(ByteSize32, MempoolSnapshot blk)
s' (ByteSize32, MempoolSnapshot blk)
-> STM m () -> STM m (ByteSize32, MempoolSnapshot blk)
forall a b. a -> STM m b -> STM m a
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ Bool -> STM m ()
forall (m :: * -> *). MonadSTM m => Bool -> STM m ()
check (Bool -> Bool
not (MempoolSnapshot blk
snapshot MempoolSnapshot blk -> MempoolSnapshot blk -> Bool
`isSameSnapshot` MempoolSnapshot blk
snapshot'))
ServerStAcquiring (GenTxId blk) (GenTx blk) SlotNo m ()
-> m (ServerStAcquiring (GenTxId blk) (GenTx blk) SlotNo m ())
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ServerStAcquiring (GenTxId blk) (GenTx blk) SlotNo m ()
-> m (ServerStAcquiring (GenTxId blk) (GenTx blk) SlotNo m ()))
-> ServerStAcquiring (GenTxId blk) (GenTx blk) SlotNo m ()
-> m (ServerStAcquiring (GenTxId blk) (GenTx blk) SlotNo m ())
forall a b. (a -> b) -> a -> b
$ (ByteSize32, MempoolSnapshot blk)
-> ServerStAcquiring (GenTxId blk) (GenTx blk) SlotNo m ()
serverStAcquiring (ByteSize32, MempoolSnapshot blk)
s'
, recvMsgRelease :: m (ServerStIdle (GenTxId blk) (GenTx blk) SlotNo m ())
recvMsgRelease =
ServerStIdle (GenTxId blk) (GenTx blk) SlotNo m ()
-> m (ServerStIdle (GenTxId blk) (GenTx blk) SlotNo m ())
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ServerStIdle (GenTxId blk) (GenTx blk) SlotNo m ()
serverStIdle
}
isSameSnapshot
:: MempoolSnapshot blk
-> MempoolSnapshot blk
-> Bool
isSameSnapshot :: MempoolSnapshot blk -> MempoolSnapshot blk -> Bool
isSameSnapshot MempoolSnapshot blk
a MempoolSnapshot blk
b =
((Validated (GenTx blk), TicketNo, ByteSize32) -> TicketNo
forall {a} {c}. (a, TicketNo, c) -> TicketNo
tno ((Validated (GenTx blk), TicketNo, ByteSize32) -> TicketNo)
-> [(Validated (GenTx blk), TicketNo, ByteSize32)] -> [TicketNo]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> MempoolSnapshot blk
-> [(Validated (GenTx blk), TicketNo, ByteSize32)]
forall blk.
MempoolSnapshot blk
-> [(Validated (GenTx blk), TicketNo, ByteSize32)]
snapshotTxs MempoolSnapshot blk
a) [TicketNo] -> [TicketNo] -> Bool
forall a. Eq a => a -> a -> Bool
== ((Validated (GenTx blk), TicketNo, ByteSize32) -> TicketNo
forall {a} {c}. (a, TicketNo, c) -> TicketNo
tno ((Validated (GenTx blk), TicketNo, ByteSize32) -> TicketNo)
-> [(Validated (GenTx blk), TicketNo, ByteSize32)] -> [TicketNo]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> MempoolSnapshot blk
-> [(Validated (GenTx blk), TicketNo, ByteSize32)]
forall blk.
MempoolSnapshot blk
-> [(Validated (GenTx blk), TicketNo, ByteSize32)]
snapshotTxs MempoolSnapshot blk
b)
Bool -> Bool -> Bool
&&
MempoolSnapshot blk -> SlotNo
forall blk. MempoolSnapshot blk -> SlotNo
snapshotSlotNo MempoolSnapshot blk
a SlotNo -> SlotNo -> Bool
forall a. Eq a => a -> a -> Bool
== MempoolSnapshot blk -> SlotNo
forall blk. MempoolSnapshot blk -> SlotNo
snapshotSlotNo MempoolSnapshot blk
b
tno :: (a, TicketNo, c) -> TicketNo
tno (a
_a, TicketNo
b, c
_c) = TicketNo
b :: TicketNo