{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE ViewPatterns #-}

module Ouroboros.Consensus.MiniProtocol.LocalTxMonitor.Server (localTxMonitorServer) where

import           Ouroboros.Consensus.Block
import           Ouroboros.Consensus.Ledger.SupportsMempool
import           Ouroboros.Consensus.Mempool
import           Ouroboros.Consensus.Util.IOLike
import           Ouroboros.Network.Protocol.LocalTxMonitor.Server
import           Ouroboros.Network.Protocol.LocalTxMonitor.Type

-- | Local transaction monitoring server, for inspecting the mempool.
--
-- The server is a small state machine built over the given 'Mempool':
--
-- * /Idle/: the client may terminate the protocol, or acquire a snapshot.
--   Acquiring reads the mempool capacity (as a byte size) and the current
--   'MempoolSnapshot' in a single STM transaction.
--
-- * /Acquired/: every query is answered from the snapshot held by the server,
--   not from the live mempool. The client can iterate over the snapshot's
--   transactions, ask whether a transaction id is present, ask for size and
--   capacity figures, wait for a snapshot that differs from the held one, or
--   release the snapshot and go back to /Idle/.
--
localTxMonitorServer ::
     forall blk m.
     ( MonadSTM m
     , LedgerSupportsMempool blk
     )
  => Mempool m blk
  -> LocalTxMonitorServer (GenTxId blk) (GenTx blk) SlotNo m ()
localTxMonitorServer mempool =
    LocalTxMonitorServer (pure serverStIdle)
  where
    -- Read the mempool capacity (converted to a byte size) together with the
    -- current snapshot. Callers run this inside 'atomically', so both values
    -- are read in the same transaction.
    acquireSnapshot
      :: STM m (ByteSize32, MempoolSnapshot blk)
    acquireSnapshot =
          (,)
      <$> (txMeasureByteSize <$> getCapacity mempool)
      <*> getSnapshot mempool

    -- Idle state: either finish with '()' or acquire a fresh snapshot.
    serverStIdle
      :: ServerStIdle (GenTxId blk) (GenTx blk) SlotNo m ()
    serverStIdle =
      ServerStIdle
      { recvMsgDone =
          pure ()
      , recvMsgAcquire =
          serverStAcquiring <$> atomically acquireSnapshot
      }

    -- Announce the slot number of the acquired snapshot and move to the
    -- acquired state, with the iteration cursor at the start of the
    -- snapshot's transaction list.
    serverStAcquiring
      :: (ByteSize32, MempoolSnapshot blk)
      -> ServerStAcquiring (GenTxId blk) (GenTx blk) SlotNo m ()
    serverStAcquiring s@(_, snapshot) =
      SendMsgAcquired
        (snapshotSlotNo snapshot)
        (serverStAcquired s (snapshotTxs snapshot))

    -- Acquired state. The first argument is the held (capacity, snapshot)
    -- pair; the second is the list of transactions not yet handed out by
    -- 'recvMsgNextTx'.
    serverStAcquired
      :: (ByteSize32, MempoolSnapshot blk)
      -> [(Validated (GenTx blk), idx, ByteSize32)]
      -> ServerStAcquired (GenTxId blk) (GenTx blk) SlotNo m ()
    serverStAcquired s@(capacity, snapshot) txs =
      ServerStAcquired
      { recvMsgNextTx =
          case txs of
            [] ->
              -- Nothing left to hand out: keep replying 'Nothing'.
              pure $ SendMsgReplyNextTx Nothing (serverStAcquired s [])
            (txForgetValidated -> tx, _tno, _byteSize) : rest ->
              pure $ SendMsgReplyNextTx (Just tx) (serverStAcquired s rest)
      , recvMsgHasTx = \txid ->
          -- Answered from the snapshot; the iteration cursor is unchanged.
          pure $
            SendMsgReplyHasTx
              (snapshotHasTx snapshot txid)
              (serverStAcquired s txs)
      , recvMsgGetSizes = do
          let MempoolSize{msNumTxs, msNumBytes} = snapshotMempoolSize snapshot
              sizes = MempoolSizeAndCapacity
                { capacityInBytes = unByteSize32 capacity
                , sizeInBytes     = unByteSize32 msNumBytes
                , numberOfTxs     = msNumTxs
                }
          pure $ SendMsgReplyGetSizes sizes (serverStAcquired s txs)
      , recvMsgAwaitAcquire = do
          -- 'check' retries the transaction for as long as the freshly read
          -- snapshot is the same as the one currently held, so this only
          -- returns once the two differ.
          s' <- atomically $ do
            s'@(_, snapshot') <- acquireSnapshot
            s' <$ check (not (snapshot `isSameSnapshot` snapshot'))
          pure $ serverStAcquiring s'
      , recvMsgRelease =
          pure serverStIdle
      }

    -- Are two snapshots equal? (from the perspective of this protocol)
    --
    -- Two snapshots are considered the same when they list the same ticket
    -- numbers in the same order and carry the same slot number.
    isSameSnapshot
      :: MempoolSnapshot blk
      -> MempoolSnapshot blk
      -> Bool
    isSameSnapshot a b =
         (tno <$> snapshotTxs a) == (tno <$> snapshotTxs b)
      && snapshotSlotNo a == snapshotSlotNo b

    -- Project the ticket number out of a snapshot entry.
    tno :: (a, TicketNo, c) -> TicketNo
    tno (_, ticketNo, _) = ticketNo