{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE ImportQualifiedPost #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}

module Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound
  ( objectDiffusionInbound
  , TraceObjectDiffusionInbound (..)
  , ObjectDiffusionInboundError (..)
  , NumObjectsProcessed (..)
  ) where

import Cardano.Network.NodeToNode.Version (NodeToNodeVersion)
import Cardano.Prelude (catMaybes, (&))
import Control.Concurrent.Class.MonadSTM.Strict.TVar.Checked
import Control.Exception (assert)
import Control.Monad (when)
import Control.Monad.Class.MonadSTM
import Control.Monad.Class.MonadThrow
import Control.Tracer (Tracer, traceWith)
import Data.Data (Typeable)
import Data.Foldable as Foldable (foldl', toList)
import Data.List qualified as List
import Data.List.NonEmpty qualified as NonEmpty
import Data.Map.Strict (Map)
import Data.Map.Strict qualified as Map
import Data.Sequence.Strict (StrictSeq)
import Data.Sequence.Strict qualified as Seq
import Data.Set (Set)
import Data.Set qualified as Set
import Data.Word (Word64)
import GHC.Generics (Generic)
import Network.TypedProtocol.Core (N (Z), Nat (..), natToInt)
import NoThunks.Class (NoThunks (..))
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.API
import Ouroboros.Consensus.Util.NormalForm.Invariant (noThunksInvariant)
import Ouroboros.Network.ControlMessage
import Ouroboros.Network.Protocol.ObjectDiffusion.Inbound
import Ouroboros.Network.Protocol.ObjectDiffusion.Type

-- Note: This module is inspired by the inbound side of TxSubmission.

-- | Counter of processed objects, carried by the
-- 'TraceObjectDiffusionInboundAddedObjects' trace event.
newtype NumObjectsProcessed
  = NumObjectsProcessed
  { getNumObjectsProcessed :: Word64
  }
  deriving (Eq, Show)

-- | Events traced by the inbound side of the object diffusion protocol.
data TraceObjectDiffusionInbound objectId object
  = -- | Number of objects just about to be inserted.
    TraceObjectDiffusionInboundCollectedObjects Int
  | -- | Just processed object pass/fail breakdown.
    TraceObjectDiffusionInboundAddedObjects NumObjectsProcessed
  | -- | Received a 'ControlMessage' from the outbound peer governor, and about
    -- to act on it.
    TraceObjectDiffusionInboundRecvControlMessage ControlMessage
  | -- | There are object IDs available to request. The 'Int' is the current
    -- pipelining depth, i.e. the number of pipelined requests whose replies
    -- are still in flight.
    TraceObjectDiffusionInboundCanRequestMoreObjects Int
  | -- | There are no object IDs available to request. The 'Int' is the
    -- current pipelining depth, i.e. the number of pipelined requests whose
    -- replies are still in flight.
    TraceObjectDiffusionInboundCannotRequestMoreObjects Int
  deriving (Eq, Show)

-- | Protocol violations by the outbound peer that the inbound side can
-- detect. See the 'Exception' instance for the rendered messages.
data ObjectDiffusionInboundError objectId object
  = -- | The peer replied with different objects than those we asked for.
    -- The first set holds the IDs requested but not received, the second the
    -- IDs received but not requested.
    ProtocolErrorObjectsDifferentThanRequested (Set objectId) (Set objectId)
  | -- | The peer replied with more object IDs than we asked for. The first
    -- 'Int' is the expected number, the second the number actually received.
    ProtocolErrorObjectIdsNotRequested Int Int
  | -- | The peer replied with some object IDs that it had already sent us
    -- previously; the set holds the offending IDs.
    ProtocolErrorObjectIdsAlreadyKnown (Set objectId)
  | -- | The peer replied with a batch of object IDs containing duplicates.
    -- NOTE(review): the 'Int' is presumably the number of occurrences of each
    -- offending ID -- confirm against the code that throws this error.
    ProtocolErrorObjectIdsDuplicate (Map objectId Int)
  deriving Show

-- | Allows 'ObjectDiffusionInboundError' to be thrown as an exception.
-- 'displayException' renders a human-readable description of each protocol
-- violation, including the offending IDs or counts.
instance
  (Show objectId, Typeable object, Typeable objectId) =>
  Exception (ObjectDiffusionInboundError objectId object)
  where
  displayException (ProtocolErrorObjectsDifferentThanRequested reqButNotRecvd recvButNotReq) =
    "The peer replied with different objects than those we asked for: "
      ++ "requested but didn't receive "
      ++ show reqButNotRecvd
      ++ "; received but didn't requested "
      ++ show recvButNotReq
  displayException (ProtocolErrorObjectIdsNotRequested expected actual) =
    "The peer replied with more objectIds than we asked for: expected "
      ++ show expected
      ++ " , received "
      ++ show actual
  displayException (ProtocolErrorObjectIdsAlreadyKnown offenders) =
    "The peer replied with some objectIds that it has already sent us previously: "
      ++ show offenders
  displayException (ProtocolErrorObjectIdsDuplicate offenders) =
    "The peer replied with a batch of objectIds containing duplicates: "
      ++ show offenders

-- | Information maintained internally in the 'objectDiffusionInbound'
-- implementation.
data InboundSt objectId object = InboundSt
  { numIdsInFlight :: !NumObjectIdsReq
  -- ^ The number of object identifiers that we have requested but
  -- which have not yet been replied to. We need to track this to keep
  -- our requests within the limit on the 'outstandingFifo' size.
  , outstandingFifo :: !(StrictSeq objectId)
  -- ^ This mirrors the queue of objects that the outbound peer has available
  -- for us. Objects are kept in the order in which the outbound peer
  -- advertised them to us. This is the same order in which we submit them to
  -- the objectPool. It is also the order we acknowledge them.
  , canRequestNext :: !(Set objectId)
  -- ^ The objectIds that we can request. These are a subset of the
  -- 'outstandingFifo' that we have not yet requested and that are not in the
  -- pool already. This is not ordered to illustrate the fact that we can
  -- request objects out of order.
  , pendingObjects :: !(Map objectId (Maybe object))
  -- ^ Objects we have successfully downloaded (or decided intentionally to
  -- skip download) but have not yet added to the objectPool or acknowledged.
  --
  -- Object IDs in this 'Map' are mapped to 'Nothing' if we notice that
  -- they are already in the objectPool. That way we can skip requesting them
  -- from the outbound peer, but still acknowledge them when the time comes.
  , numToAckOnNextReq :: !NumObjectIdsAck
  -- ^ The number of objects we can acknowledge on our next request
  -- for more object IDs. Their corresponding IDs have already been removed
  -- from 'outstandingFifo'.
  }
  deriving stock (Show, Generic)
  deriving anyclass NoThunks

-- | The state the inbound side starts from: no IDs in flight, an empty
-- outstanding FIFO, nothing to request, nothing pending, and nothing to
-- acknowledge.
initialInboundSt :: InboundSt objectId object
initialInboundSt = InboundSt 0 Seq.empty Set.empty Map.empty 0

-- | Inbound side of the object diffusion mini-protocol, as a pipelined peer.
--
-- The protocol loop starts from 'initialInboundSt' with no pipelined requests
-- in flight. The 'ControlMessageSTM' action is consulted at the start of each
-- iteration of the loop; on 'Terminate' the peer stops after draining the
-- replies that are still in flight.
objectDiffusionInbound ::
  forall objectId object m.
  ( Ord objectId
  , Show objectId
  , Typeable objectId
  , Typeable object
  , NoThunks objectId
  , NoThunks object
  , MonadSTM m
  , MonadThrow m
  ) =>
  Tracer m (TraceObjectDiffusionInbound objectId object) ->
  -- | Maximum values for outstanding FIFO length, number of IDs to request,
  -- and number of objects to request
  (NumObjectsUnacknowledged, NumObjectIdsReq, NumObjectsReq) ->
  ObjectPoolWriter objectId object m ->
  NodeToNodeVersion ->
  ControlMessageSTM m ->
  ObjectDiffusionInboundPipelined objectId object m ()
objectDiffusionInbound
  tracer
  (maxFifoLength, maxNumIdsToReq, maxNumObjectsToReq)
  ObjectPoolWriter{..}
  _version
  controlMessageSTM =
    ObjectDiffusionInboundPipelined $!
      checkState initialInboundSt & go Zero
   where
    -- Whether there is at least one object ID in 'canRequestNext', i.e.
    -- whether there are objects we could ask the outbound peer for.
    canRequestMoreObjects :: InboundSt k object -> Bool
    canRequestMoreObjects !st =
      not (Set.null (canRequestNext st))

    -- Computes how many new IDs we can request so that receiving all of them
    -- won't make 'outstandingFifo' exceed 'maxFifoLength'.
    --
    -- The result is additionally capped by 'maxNumIdsToReq'.
    --
    -- NOTE(review): the subtraction assumes that the FIFO length plus the
    -- number of IDs in flight never exceeds 'maxFifoLength'. If
    -- 'NumObjectIdsReq' wraps an unsigned type, a violation of that invariant
    -- would wrap around instead of going negative -- confirm.
    numIdsToReq :: InboundSt objectId object -> NumObjectIdsReq
    numIdsToReq !st =
      maxNumIdsToReq
        `min` ( fromIntegral maxFifoLength
                  - (fromIntegral $ Seq.length $ outstandingFifo st)
                  - numIdsInFlight st
              )

    -- Updates 'InboundSt' with new object IDs and returns the updated
    -- 'InboundSt'.
    --
    -- Collected object IDs that are already in the objectPool are pre-emptively
    -- acknowledged so that we don't need to bother requesting them from the
    -- outbound peer.
    --
    -- The second argument is the membership predicate of the objectPool; it is
    -- ignored when no IDs were collected, in which case the state is returned
    -- unchanged.
    preAcknowledge ::
      InboundSt objectId object ->
      (objectId -> Bool) ->
      [objectId] ->
      InboundSt objectId object
    preAcknowledge !st _ collectedIds | null collectedIds = st
    preAcknowledge !st poolHasObject collectedIds =
      let
        -- Divide the collected IDs in two parts: those that are already in the
        -- objectPool and those that are not.
        (alreadyObtained, notYetObtained) =
          List.partition
            poolHasObject
            collectedIds

        -- The objects that we intentionally don't request, because they are
        -- already in the objectPool, will need to be acknowledged.
        -- So we extend 'pendingObjects' with those objects (so of course they
        -- have no corresponding reply).
        pendingObjects' =
          foldl'
            (\accMap objectId -> Map.insert objectId Nothing accMap)
            (pendingObjects st)
            alreadyObtained

        -- We initially extend 'outstandingFifo' with all the collected IDs
        -- (to properly mirror the server state).
        outstandingFifo' = outstandingFifo st <> Seq.fromList collectedIds

        -- Now check if the update of 'pendingObjects' let us acknowledge a prefix
        -- of the 'outstandingFifo', as we do in 'goCollect' -> 'CollectObjects'.
        (objectIdsToAck, outstandingFifo'') =
          Seq.spanl (`Map.member` pendingObjects') outstandingFifo'

        -- If so we can remove them from the 'pendingObjects' structure.
        --
        -- Note that unlike in TX-Submission, we made sure the outstanding FIFO
        -- couldn't have duplicate IDs, so we don't have to worry about re-adding
        -- the duplicate IDs to 'pendingObjects' for future acknowledgment.
        pendingObjects'' =
          Foldable.foldl'
            (flip Map.delete)
            pendingObjects'
            objectIdsToAck

        !st' =
          st
            { canRequestNext = canRequestNext st <> Set.fromList notYetObtained
            , pendingObjects = pendingObjects''
            , outstandingFifo = outstandingFifo''
            , numToAckOnNextReq =
                numToAckOnNextReq st
                  + fromIntegral (Seq.length objectIdsToAck)
            }
       in
        st'

    go ::
      forall (n :: N).
      Nat n ->
      InboundSt objectId object ->
      InboundStIdle n objectId object m ()
    go :: forall (n :: N).
Nat n
-> InboundSt objectId object
-> InboundStIdle n objectId object m ()
go Nat n
n !InboundSt objectId object
st = m (InboundStIdle n objectId object m ())
-> InboundStIdle n objectId object m ()
forall (m :: * -> *) (n :: N) objectId object a.
m (InboundStIdle n objectId object m a)
-> InboundStIdle n objectId object m a
WithEffect (m (InboundStIdle n objectId object m ())
 -> InboundStIdle n objectId object m ())
-> m (InboundStIdle n objectId object m ())
-> InboundStIdle n objectId object m ()
forall a b. (a -> b) -> a -> b
$ do
      -- Check whether we should continue engaging in the protocol.
      ctrlMsg <- ControlMessageSTM m -> m ControlMessage
forall a. (?callStack::CallStack) => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, ?callStack::CallStack) =>
STM m a -> m a
atomically ControlMessageSTM m
controlMessageSTM
      traceWith tracer $
        TraceObjectDiffusionInboundRecvControlMessage ctrlMsg
      case ctrlMsg of
        -- The peer selection governor is asking us to terminate the connection.
        ControlMessage
Terminate ->
          InboundStIdle n objectId object m ()
-> m (InboundStIdle n objectId object m ())
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (InboundStIdle n objectId object m ()
 -> m (InboundStIdle n objectId object m ()))
-> InboundStIdle n objectId object m ()
-> m (InboundStIdle n objectId object m ())
forall a b. (a -> b) -> a -> b
$! Nat n -> InboundStIdle n objectId object m ()
forall (n :: N). Nat n -> InboundStIdle n objectId object m ()
terminateAfterDrain Nat n
n
        -- Otherwise, we can continue the protocol normally.
        ControlMessage
_continue -> case Nat n
n of
          -- We didn't pipeline any requests, so there are no replies in flight
          -- (nothing to collect)
          Nat n
Zero -> do
            if InboundSt objectId object -> Bool
forall k. InboundSt k object -> Bool
canRequestMoreObjects InboundSt objectId object
st
              then do
                -- There are no replies in flight, but we do know some more objects
                -- we can ask for, so lets ask for them and more objectIds in a
                -- pipelined way.
                Tracer m (TraceObjectDiffusionInbound objectId object)
-> TraceObjectDiffusionInbound objectId object -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (TraceObjectDiffusionInbound objectId object)
tracer (TraceObjectDiffusionInbound objectId object -> m ())
-> TraceObjectDiffusionInbound objectId object -> m ()
forall a b. (a -> b) -> a -> b
$
                  Int -> TraceObjectDiffusionInbound objectId object
forall objectId object.
Int -> TraceObjectDiffusionInbound objectId object
TraceObjectDiffusionInboundCanRequestMoreObjects (Nat n -> Int
forall (n :: N). Nat n -> Int
natToInt Nat n
n)
                InboundStIdle n objectId object m ()
-> m (InboundStIdle n objectId object m ())
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (InboundStIdle n objectId object m ()
 -> m (InboundStIdle n objectId object m ()))
-> InboundStIdle n objectId object m ()
-> m (InboundStIdle n objectId object m ())
forall a b. (a -> b) -> a -> b
$! InboundSt objectId object -> InboundSt objectId object
forall s. NoThunks s => s -> s
checkState InboundSt objectId object
st InboundSt objectId object
-> (InboundSt objectId object
    -> InboundStIdle n objectId object m ())
-> InboundStIdle n objectId object m ()
forall a b. a -> (a -> b) -> b
& Nat n
-> InboundSt objectId object
-> InboundStIdle n objectId object m ()
forall (n :: N).
Nat n
-> InboundSt objectId object
-> InboundStIdle n objectId object m ()
goReqObjectsAndObjectIdsPipelined Nat n
forall (n :: N). ('Z ~ n) => Nat n
Zero
              else do
                -- There's no replies in flight, and we have no more objects we can
                -- ask for so the only remaining thing to do is to ask for more
                -- objectIds.
                -- Since this is the only thing to do now, and as per the protocol
                -- requirements, we make this a blocking call.
                Tracer m (TraceObjectDiffusionInbound objectId object)
-> TraceObjectDiffusionInbound objectId object -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (TraceObjectDiffusionInbound objectId object)
tracer (TraceObjectDiffusionInbound objectId object -> m ())
-> TraceObjectDiffusionInbound objectId object -> m ()
forall a b. (a -> b) -> a -> b
$
                  Int -> TraceObjectDiffusionInbound objectId object
forall objectId object.
Int -> TraceObjectDiffusionInbound objectId object
TraceObjectDiffusionInboundCannotRequestMoreObjects (Nat n -> Int
forall (n :: N). Nat n -> Int
natToInt Nat n
n)
                InboundStIdle n objectId object m ()
-> m (InboundStIdle n objectId object m ())
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (InboundStIdle n objectId object m ()
 -> m (InboundStIdle n objectId object m ()))
-> InboundStIdle n objectId object m ()
-> m (InboundStIdle n objectId object m ())
forall a b. (a -> b) -> a -> b
$! InboundSt objectId object -> InboundSt objectId object
forall s. NoThunks s => s -> s
checkState InboundSt objectId object
st InboundSt objectId object
-> (InboundSt objectId object
    -> InboundStIdle n objectId object m ())
-> InboundStIdle n objectId object m ()
forall a b. a -> (a -> b) -> b
& InboundSt objectId object -> InboundStIdle n objectId object m ()
InboundSt objectId object -> InboundStIdle 'Z objectId object m ()
goReqObjectIdsBlocking

          -- We have pipelined some requests, so there are some replies in flight.
          Succ Nat n
n' ->
            if InboundSt objectId object -> Bool
forall k. InboundSt k object -> Bool
canRequestMoreObjects InboundSt objectId object
st
              then do
                -- We have replies in flight and we should eagerly collect them if
                -- available, but there are objects to request too so we
                -- should *not* block waiting for replies.
                -- So we ask for new objects and objectIds in a pipelined way.
                Tracer m (TraceObjectDiffusionInbound objectId object)
-> TraceObjectDiffusionInbound objectId object -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (TraceObjectDiffusionInbound objectId object)
tracer (TraceObjectDiffusionInbound objectId object -> m ())
-> TraceObjectDiffusionInbound objectId object -> m ()
forall a b. (a -> b) -> a -> b
$
                  Int -> TraceObjectDiffusionInbound objectId object
forall objectId object.
Int -> TraceObjectDiffusionInbound objectId object
TraceObjectDiffusionInboundCanRequestMoreObjects (Nat n -> Int
forall (n :: N). Nat n -> Int
natToInt Nat n
n)
                InboundStIdle n objectId object m ()
-> m (InboundStIdle n objectId object m ())
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (InboundStIdle n objectId object m ()
 -> m (InboundStIdle n objectId object m ()))
-> InboundStIdle n objectId object m ()
-> m (InboundStIdle n objectId object m ())
forall a b. (a -> b) -> a -> b
$!
                  Maybe (InboundStIdle ('S n) objectId object m ())
-> (Collect objectId object
    -> InboundStIdle n objectId object m ())
-> InboundStIdle ('S n) objectId object m ()
forall (n1 :: N) objectId object (m :: * -> *) a.
Maybe (InboundStIdle ('S n1) objectId object m a)
-> (Collect objectId object
    -> InboundStIdle n1 objectId object m a)
-> InboundStIdle ('S n1) objectId object m a
CollectPipelined
                    (InboundStIdle ('S n) objectId object m ()
-> Maybe (InboundStIdle ('S n) objectId object m ())
forall a. a -> Maybe a
Just (InboundSt objectId object -> InboundSt objectId object
forall s. NoThunks s => s -> s
checkState InboundSt objectId object
st InboundSt objectId object
-> (InboundSt objectId object
    -> InboundStIdle ('S n) objectId object m ())
-> InboundStIdle ('S n) objectId object m ()
forall a b. a -> (a -> b) -> b
& Nat ('S n)
-> InboundSt objectId object
-> InboundStIdle ('S n) objectId object m ()
forall (n :: N).
Nat n
-> InboundSt objectId object
-> InboundStIdle n objectId object m ()
goReqObjectsAndObjectIdsPipelined (Nat n -> Nat ('S n)
forall (m :: N) (n :: N). (m ~ 'S n) => Nat n -> Nat m
Succ Nat n
n')))
                    (\Collect objectId object
collected -> InboundSt objectId object -> InboundSt objectId object
forall s. NoThunks s => s -> s
checkState InboundSt objectId object
st InboundSt objectId object
-> (InboundSt objectId object
    -> InboundStIdle n objectId object m ())
-> InboundStIdle n objectId object m ()
forall a b. a -> (a -> b) -> b
& Nat n
-> Collect objectId object
-> InboundSt objectId object
-> InboundStIdle n objectId object m ()
forall (n :: N).
Nat n
-> Collect objectId object
-> InboundSt objectId object
-> InboundStIdle n objectId object m ()
goCollect Nat n
n' Collect objectId object
collected)
              else do
                Tracer m (TraceObjectDiffusionInbound objectId object)
-> TraceObjectDiffusionInbound objectId object -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (TraceObjectDiffusionInbound objectId object)
tracer (TraceObjectDiffusionInbound objectId object -> m ())
-> TraceObjectDiffusionInbound objectId object -> m ()
forall a b. (a -> b) -> a -> b
$
                  Int -> TraceObjectDiffusionInbound objectId object
forall objectId object.
Int -> TraceObjectDiffusionInbound objectId object
TraceObjectDiffusionInboundCannotRequestMoreObjects (Nat n -> Int
forall (n :: N). Nat n -> Int
natToInt Nat n
n)
                -- In this case we can theoretically only collect replies or request
                -- new object IDs.
                --
                -- But it's important not to pipeline more requests for objectIds now
                -- because if we did, then immediately after sending the request (but
                -- having not yet received a response to either this or the other
                -- pipelined requests), we would directly re-enter this code path,
                -- resulting in us filling the pipeline with an unbounded number of
                -- requests.
                --
                -- So we instead block until we collect a reply.
                InboundStIdle n objectId object m ()
-> m (InboundStIdle n objectId object m ())
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (InboundStIdle n objectId object m ()
 -> m (InboundStIdle n objectId object m ()))
-> InboundStIdle n objectId object m ()
-> m (InboundStIdle n objectId object m ())
forall a b. (a -> b) -> a -> b
$!
                  Maybe (InboundStIdle ('S n) objectId object m ())
-> (Collect objectId object
    -> InboundStIdle n objectId object m ())
-> InboundStIdle ('S n) objectId object m ()
forall (n1 :: N) objectId object (m :: * -> *) a.
Maybe (InboundStIdle ('S n1) objectId object m a)
-> (Collect objectId object
    -> InboundStIdle n1 objectId object m a)
-> InboundStIdle ('S n1) objectId object m a
CollectPipelined
                    Maybe (InboundStIdle ('S n) objectId object m ())
forall a. Maybe a
Nothing
                    (\Collect objectId object
collected -> InboundSt objectId object -> InboundSt objectId object
forall s. NoThunks s => s -> s
checkState InboundSt objectId object
st InboundSt objectId object
-> (InboundSt objectId object
    -> InboundStIdle n objectId object m ())
-> InboundStIdle n objectId object m ()
forall a b. a -> (a -> b) -> b
& Nat n
-> Collect objectId object
-> InboundSt objectId object
-> InboundStIdle n objectId object m ()
forall (n :: N).
Nat n
-> Collect objectId object
-> InboundSt objectId object
-> InboundStIdle n objectId object m ()
goCollect Nat n
n' Collect objectId object
collected)

    -- Handle one reply collected from the peer, then resume the main loop
    -- ('go') with the updated state.
    --
    -- * 'CollectObjectIds': checks that the peer did not send more IDs than
    --   requested, that the reply contains no duplicate IDs, and that none of
    --   the IDs is already in the outstanding FIFO; then decrements
    --   'numIdsInFlight' and hands the IDs to 'preAcknowledge'.
    -- * 'CollectObjects': checks that the set of received object IDs equals
    --   the set of requested IDs, records the objects in 'pendingObjects',
    --   and passes to 'opwAddObjects' the objects belonging to the longest
    --   prefix of the outstanding FIFO that is fully present in
    --   'pendingObjects'.
    --
    -- Every failed check throws the matching 'ObjectDiffusionInboundError'
    -- via 'throwIO'.
    goCollect ::
      forall (n :: N).
      Nat n ->
      Collect objectId object ->
      InboundSt objectId object ->
      InboundStIdle n objectId object m ()
    goCollect n collect !st = case collect of
      CollectObjectIds numIdsRequested collectedIds -> WithEffect $ do
        let numCollectedIds = length collectedIds
            -- Occurrence count per ID, used both for duplicate detection and
            -- as a membership index for the FIFO check below.
            collectedIdsMap = Map.fromListWith (+) [(x, 1 :: Int) | x <- collectedIds]

        -- Check they didn't send more than we asked for. We don't need to
        -- check for a minimum: the blocking case checks for non-zero
        -- elsewhere, and for the non-blocking case it is quite normal for
        -- them to send us none.
        when (numCollectedIds > fromIntegral numIdsRequested) $
          throwIO
            ( ProtocolErrorObjectIdsNotRequested @objectId @object
                (fromIntegral numIdsRequested)
                numCollectedIds
            )

        -- Check that the server didn't send duplicate IDs in its response
        when (Map.size collectedIdsMap /= numCollectedIds) $
          throwIO
            ( ProtocolErrorObjectIdsDuplicate @objectId @object $
                Map.filter (> 1) collectedIdsMap
            )

        -- Check that the server didn't send IDs that were already in the
        -- outstanding FIFO
        let alreadyKnownIds = Seq.filter (`Map.member` collectedIdsMap) (outstandingFifo st)
        when (not (null alreadyKnownIds)) $
          throwIO
            ( ProtocolErrorObjectIdsAlreadyKnown @objectId @object $
                Set.fromList (toList alreadyKnownIds)
            )

        -- We extend our outstanding FIFO with the newly received objectIds by
        -- calling 'preAcknowledge' which will also pre-emptively acknowledge the
        -- objectIds that we already have in the pool and thus don't need to
        -- request.
        let !st' = st{numIdsInFlight = numIdsInFlight st - numIdsRequested}
        poolHasObject <- atomically opwHasObject
        let !st'' = preAcknowledge st' poolHasObject collectedIds
        pure $! checkState st'' & go n
      CollectObjects requestedIds collectedObjects -> WithEffect $ do
        let requestedIdsSet = Set.fromList requestedIds
            obtainedIdsSet = Set.fromList (opwObjectId <$> collectedObjects)

        -- To start with we have to verify that the objects they have sent us are
        -- exactly the objects we asked for, not more, not less.
        when (requestedIdsSet /= obtainedIdsSet) $
          let reqButNotRecvd = requestedIdsSet `Set.difference` obtainedIdsSet
              recvButNotReq = obtainedIdsSet `Set.difference` requestedIdsSet
           in throwIO
                ( ProtocolErrorObjectsDifferentThanRequested @objectId @object
                    reqButNotRecvd
                    recvButNotReq
                )

        traceWith tracer $
          TraceObjectDiffusionInboundCollectedObjects (length collectedObjects)

        -- We update 'pendingObjects' with the newly obtained objects
        let pendingObjects' =
              foldl'
                (\accMap object -> Map.insert (opwObjectId object) (Just object) accMap)
                (pendingObjects st)
                collectedObjects

            -- We then find the longest prefix of 'outstandingFifo' for which we have
            -- all the corresponding IDs in 'pendingObjects'.
            -- We remove this prefix from 'outstandingFifo'.
            (objectIdsToAck, outstandingFifo') =
              Seq.spanl (`Map.member` pendingObjects') (outstandingFifo st)

            -- And also remove these entries from 'pendingObjects'.
            --
            -- Note that unlike in TX-Submission, we made sure the outstanding FIFO
            -- couldn't have duplicate IDs, so we don't have to worry about re-adding
            -- the duplicate IDs to 'pendingObjects' for future acknowledgment.
            pendingObjects'' =
              Foldable.foldl'
                (flip Map.delete)
                pendingObjects'
                objectIdsToAck

            -- These are the objects we need to submit to the object pool
            --
            -- Note that objects are submitted in the same order as the order of
            -- announcement, for consistency with TX-Submission, and for
            -- simplicity of implementation/acknowledgment. However this is not
            -- a strict requirement of the protocol, so we could consider
            -- changing this in the future if we wanted to (e.g. for
            -- optimizations reasons).
            --
            -- Every ID in 'objectIdsToAck' is a key of 'pendingObjects''
            -- (guaranteed by the 'Seq.spanl' predicate above), so the total
            -- lookup below never actually falls back to its default; entries
            -- mapped to 'Nothing' are dropped by 'catMaybes'.
            objectsToAck =
              catMaybes
                [ Map.findWithDefault Nothing objectId pendingObjects'
                | objectId <- toList objectIdsToAck
                ]

        opwAddObjects objectsToAck
        traceWith tracer $
          TraceObjectDiffusionInboundAddedObjects
            (NumObjectsProcessed (fromIntegral $ length objectsToAck))

        let !st' =
              st
                { pendingObjects = pendingObjects''
                , outstandingFifo = outstandingFifo'
                , numToAckOnNextReq =
                    numToAckOnNextReq st
                      + fromIntegral (Seq.length objectIdsToAck)
                }
        pure $! checkState st' & go n

    -- Send a blocking request for new object IDs (only possible with no
    -- pipelined replies outstanding, hence the ''Z' index).
    --
    -- The request acknowledges 'numToAckOnNextReq' IDs and asks for
    -- 'numIdsToReq' new ones. The (non-empty) reply is forwarded to
    -- 'goCollect' as a 'CollectObjectIds', together with a state in which the
    -- ack counter has been reset and the requested IDs are counted as in
    -- flight.
    goReqObjectIdsBlocking ::
      InboundSt objectId object ->
      InboundStIdle 'Z objectId object m ()
    goReqObjectIdsBlocking !st =
      let numIdsToRequest = numIdsToReq st
          -- We should only request new object IDs in a blocking way if we have
          -- absolutely nothing else we can do.
          !st' =
            st
              { numToAckOnNextReq = 0
              , numIdsInFlight = numIdsToRequest
              }
       in assert
            ( numIdsInFlight st == 0
                && Seq.null (outstandingFifo st)
                && Set.null (canRequestNext st)
                && Map.null (pendingObjects st)
            )
            $ SendMsgRequestObjectIdsBlocking
              (numToAckOnNextReq st)
              numIdsToRequest
              ( \neCollectedIds ->
                  checkState st'
                    & goCollect
                      Zero
                      (CollectObjectIds numIdsToRequest (NonEmpty.toList neCollectedIds))
              )

    -- Pipeline a request for objects, then continue with
    -- 'goReqObjectIdsPipelined' (with one more reply outstanding).
    --
    -- Up to 'maxNumObjectsToReq' IDs are taken from 'canRequestNext' and
    -- requested; the remainder stays in 'canRequestNext'.
    goReqObjectsAndObjectIdsPipelined ::
      forall (n :: N).
      Nat n ->
      InboundSt objectId object ->
      InboundStIdle n objectId object m ()
    goReqObjectsAndObjectIdsPipelined n !st =
      -- TODO: This implementation is deliberately naive, we pick in an
      -- arbitrary order. We may want to revisit this later.
      let (toRequest, canRequestNext') =
            Set.splitAt (fromIntegral maxNumObjectsToReq) (canRequestNext st)
          !st' = st{canRequestNext = canRequestNext'}
       in SendMsgRequestObjectsPipelined
            (toList toRequest)
            (checkState st' & goReqObjectIdsPipelined (Succ n))

    -- Pipeline a request for new object IDs if 'numIdsToReq' says there is
    -- anything to request; otherwise return to 'go' without sending.
    --
    -- When a request is sent it acknowledges 'numToAckOnNextReq' IDs; the
    -- continuation state has that counter reset to 0 and 'numIdsInFlight'
    -- increased by the number of IDs requested.
    goReqObjectIdsPipelined ::
      forall (n :: N).
      Nat n ->
      InboundSt objectId object ->
      InboundStIdle n objectId object m ()
    goReqObjectIdsPipelined n !st =
      let numIdsToRequest = numIdsToReq st
       in if numIdsToRequest <= 0
            then checkState st & go n
            else
              let !st' =
                    st
                      { numIdsInFlight =
                          numIdsInFlight st
                            + numIdsToRequest
                      , numToAckOnNextReq = 0
                      }
               in SendMsgRequestObjectIdsPipelined
                    (numToAckOnNextReq st)
                    numIdsToRequest
                    (checkState st' & go (Succ n))

    -- Ignore all outstanding replies to messages we pipelined ("drain"), and then
    -- terminate.
    --
    -- Each outstanding reply is collected (blocking, since no alternative
    -- action is offered to 'CollectPipelined') and discarded; once none
    -- remain, 'SendMsgDone' is sent.
    terminateAfterDrain ::
      Nat n -> InboundStIdle n objectId object m ()
    terminateAfterDrain = \case
      Zero -> SendMsgDone ()
      Succ n -> CollectPipelined Nothing $ \_ignoredMsg -> terminateAfterDrain n

-- | Helper to ensure that the `InboundSt` is free of unexpected thunks and
-- stays strict during the whole process.
--
-- The argument is forced to WHNF by the bang pattern and then returned
-- through 'checkInvariant', with 'noThunksInvariant' supplying the invariant
-- result for that same value.
checkState :: NoThunks s => s -> s
checkState !st = checkInvariant (noThunksInvariant st) st