{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE ImportQualifiedPost #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Outbound
  ( objectDiffusionOutbound
  , TraceObjectDiffusionOutbound (..)
  , ObjectDiffusionOutboundError (..)
  ) where

import Cardano.Network.NodeToNode.Version (NodeToNodeVersion)
import Control.Monad (join, unless, when)
import Control.Monad.Class.MonadSTM
import Control.Monad.Class.MonadThrow
import Control.Tracer (Tracer, traceWith)
import Data.List.NonEmpty qualified as NonEmpty
import Data.Map qualified as Map
import Data.Maybe (fromMaybe)
import Data.Sequence.Strict (StrictSeq)
import Data.Sequence.Strict qualified as Seq
import Data.Set qualified as Set
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.API
import Ouroboros.Network.Protocol.ObjectDiffusion.Outbound
import Ouroboros.Network.Protocol.ObjectDiffusion.Type

-- Note: This module is inspired from TxSubmission outbound side.

data TraceObjectDiffusionOutbound objectId object
  = TraceObjectDiffusionOutboundRecvMsgRequestObjectIds NumObjectIdsReq
  | -- | The IDs to be sent in the response
    TraceObjectDiffusionOutboundSendMsgReplyObjectIds [objectId]
  | -- | The IDs of the objects requested.
    TraceObjectDiffusionOutboundRecvMsgRequestObjects
      [objectId]
  | -- | The objects to be sent in the response.
    TraceObjectDiffusionOutboundSendMsgReplyObjects
      [object]
  | -- | Received 'MsgDone'
    TraceObjectDiffusionOutboundTerminated
  deriving Int -> TraceObjectDiffusionOutbound objectId object -> ShowS
[TraceObjectDiffusionOutbound objectId object] -> ShowS
TraceObjectDiffusionOutbound objectId object -> String
(Int -> TraceObjectDiffusionOutbound objectId object -> ShowS)
-> (TraceObjectDiffusionOutbound objectId object -> String)
-> ([TraceObjectDiffusionOutbound objectId object] -> ShowS)
-> Show (TraceObjectDiffusionOutbound objectId object)
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
forall objectId object.
(Show objectId, Show object) =>
Int -> TraceObjectDiffusionOutbound objectId object -> ShowS
forall objectId object.
(Show objectId, Show object) =>
[TraceObjectDiffusionOutbound objectId object] -> ShowS
forall objectId object.
(Show objectId, Show object) =>
TraceObjectDiffusionOutbound objectId object -> String
$cshowsPrec :: forall objectId object.
(Show objectId, Show object) =>
Int -> TraceObjectDiffusionOutbound objectId object -> ShowS
showsPrec :: Int -> TraceObjectDiffusionOutbound objectId object -> ShowS
$cshow :: forall objectId object.
(Show objectId, Show object) =>
TraceObjectDiffusionOutbound objectId object -> String
show :: TraceObjectDiffusionOutbound objectId object -> String
$cshowList :: forall objectId object.
(Show objectId, Show object) =>
[TraceObjectDiffusionOutbound objectId object] -> ShowS
showList :: [TraceObjectDiffusionOutbound objectId object] -> ShowS
Show

data ObjectDiffusionOutboundError
  = ProtocolErrorAckedTooManyObjectIds
  | ProtocolErrorRequestedNothing
  | ProtocolErrorRequestedTooManyObjectIds NumObjectIdsReq NumObjectsUnacknowledged
  | ProtocolErrorRequestBlocking
  | ProtocolErrorRequestNonBlocking
  | ProtocolErrorRequestedUnavailableObject
  | ProtocolErrorRequestedDuplicateObject
  deriving Int -> ObjectDiffusionOutboundError -> ShowS
[ObjectDiffusionOutboundError] -> ShowS
ObjectDiffusionOutboundError -> String
(Int -> ObjectDiffusionOutboundError -> ShowS)
-> (ObjectDiffusionOutboundError -> String)
-> ([ObjectDiffusionOutboundError] -> ShowS)
-> Show ObjectDiffusionOutboundError
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> ObjectDiffusionOutboundError -> ShowS
showsPrec :: Int -> ObjectDiffusionOutboundError -> ShowS
$cshow :: ObjectDiffusionOutboundError -> String
show :: ObjectDiffusionOutboundError -> String
$cshowList :: [ObjectDiffusionOutboundError] -> ShowS
showList :: [ObjectDiffusionOutboundError] -> ShowS
Show

instance Exception ObjectDiffusionOutboundError where
  displayException :: ObjectDiffusionOutboundError -> String
displayException ObjectDiffusionOutboundError
ProtocolErrorAckedTooManyObjectIds =
    String
"The peer tried to acknowledged more objectIds than are available to do so."
  displayException (ProtocolErrorRequestedTooManyObjectIds NumObjectIdsReq
reqNo NumObjectsUnacknowledged
maxUnacked) =
    String
"The peer requested "
      String -> ShowS
forall a. [a] -> [a] -> [a]
++ NumObjectIdsReq -> String
forall a. Show a => a -> String
show NumObjectIdsReq
reqNo
      String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
" objectIds which would put the "
      String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
"total in flight over the limit of "
      String -> ShowS
forall a. [a] -> [a] -> [a]
++ NumObjectsUnacknowledged -> String
forall a. Show a => a -> String
show NumObjectsUnacknowledged
maxUnacked
  displayException ObjectDiffusionOutboundError
ProtocolErrorRequestedNothing =
    String
"The peer requested zero objectIds."
  displayException ObjectDiffusionOutboundError
ProtocolErrorRequestBlocking =
    String
"The peer made a blocking request for more objectIds when there are still "
      String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
"unacknowledged objectIds. It should have used a non-blocking request."
  displayException ObjectDiffusionOutboundError
ProtocolErrorRequestNonBlocking =
    String
"The peer made a non-blocking request for more objectIds when there are "
      String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
"no unacknowledged objectIds. It should have used a blocking request."
  displayException ObjectDiffusionOutboundError
ProtocolErrorRequestedUnavailableObject =
    String
"The peer requested an object which is not available, either "
      String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
"because it was never available or because it was previously requested."
  displayException ObjectDiffusionOutboundError
ProtocolErrorRequestedDuplicateObject =
    String
"The peer requested the same object twice."

data OutboundSt objectId object ticketNo = OutboundSt
  { forall objectId object ticketNo.
OutboundSt objectId object ticketNo -> StrictSeq object
outstandingFifo :: !(StrictSeq object)
  , forall objectId object ticketNo.
OutboundSt objectId object ticketNo -> ticketNo
lastTicketNo :: !ticketNo
  }

objectDiffusionOutbound ::
  forall objectId object ticketNo m.
  (Ord objectId, MonadSTM m, MonadThrow m) =>
  Tracer m (TraceObjectDiffusionOutbound objectId object) ->
  -- | Maximum number of unacknowledged objectIds allowed
  NumObjectsUnacknowledged ->
  ObjectPoolReader objectId object ticketNo m ->
  NodeToNodeVersion ->
  ObjectDiffusionOutbound objectId object m ()
objectDiffusionOutbound :: forall objectId object ticketNo (m :: * -> *).
(Ord objectId, MonadSTM m, MonadThrow m) =>
Tracer m (TraceObjectDiffusionOutbound objectId object)
-> NumObjectsUnacknowledged
-> ObjectPoolReader objectId object ticketNo m
-> NodeToNodeVersion
-> ObjectDiffusionOutbound objectId object m ()
objectDiffusionOutbound Tracer m (TraceObjectDiffusionOutbound objectId object)
tracer NumObjectsUnacknowledged
maxFifoLength ObjectPoolReader{ticketNo
object -> objectId
ticketNo -> Word64 -> STM m (Maybe (m (Map ticketNo object)))
oprObjectId :: object -> objectId
oprZeroTicketNo :: ticketNo
oprObjectsAfter :: ticketNo -> Word64 -> STM m (Maybe (m (Map ticketNo object)))
oprObjectId :: forall objectId object ticketNo (m :: * -> *).
ObjectPoolReader objectId object ticketNo m -> object -> objectId
oprObjectsAfter :: forall objectId object ticketNo (m :: * -> *).
ObjectPoolReader objectId object ticketNo m
-> ticketNo -> Word64 -> STM m (Maybe (m (Map ticketNo object)))
oprZeroTicketNo :: forall objectId object ticketNo (m :: * -> *).
ObjectPoolReader objectId object ticketNo m -> ticketNo
..} NodeToNodeVersion
_version =
  m (OutboundStIdle objectId object m ())
-> ObjectDiffusionOutbound objectId object m ()
forall objectId object (m :: * -> *) a.
m (OutboundStIdle objectId object m a)
-> ObjectDiffusionOutbound objectId object m a
ObjectDiffusionOutbound (OutboundStIdle objectId object m ()
-> m (OutboundStIdle objectId object m ())
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (OutboundSt objectId object ticketNo
-> OutboundStIdle objectId object m ()
makeBundle (OutboundSt objectId object ticketNo
 -> OutboundStIdle objectId object m ())
-> OutboundSt objectId object ticketNo
-> OutboundStIdle objectId object m ()
forall a b. (a -> b) -> a -> b
$ StrictSeq object -> ticketNo -> OutboundSt objectId object ticketNo
forall objectId object ticketNo.
StrictSeq object -> ticketNo -> OutboundSt objectId object ticketNo
OutboundSt StrictSeq object
forall a. StrictSeq a
Seq.empty ticketNo
oprZeroTicketNo))
 where
  makeBundle :: OutboundSt objectId object ticketNo -> OutboundStIdle objectId object m ()
  makeBundle :: OutboundSt objectId object ticketNo
-> OutboundStIdle objectId object m ()
makeBundle !OutboundSt objectId object ticketNo
st =
    OutboundStIdle
      { recvMsgRequestObjectIds :: forall (blocking :: StBlockingStyle).
SingBlockingStyle blocking
-> NumObjectIdsAck
-> NumObjectIdsReq
-> m (OutboundStObjectIds blocking objectId object m ())
recvMsgRequestObjectIds = OutboundSt objectId object ticketNo
-> SingBlockingStyle blocking
-> NumObjectIdsAck
-> NumObjectIdsReq
-> m (OutboundStObjectIds blocking objectId object m ())
forall (blocking :: StBlockingStyle).
OutboundSt objectId object ticketNo
-> SingBlockingStyle blocking
-> NumObjectIdsAck
-> NumObjectIdsReq
-> m (OutboundStObjectIds blocking objectId object m ())
recvMsgRequestObjectIds OutboundSt objectId object ticketNo
st
      , recvMsgRequestObjects :: [objectId] -> m (OutboundStObjects objectId object m ())
recvMsgRequestObjects = OutboundSt objectId object ticketNo
-> [objectId] -> m (OutboundStObjects objectId object m ())
recvMsgRequestObjects OutboundSt objectId object ticketNo
st
      , recvMsgDone :: m ()
recvMsgDone = Tracer m (TraceObjectDiffusionOutbound objectId object)
-> TraceObjectDiffusionOutbound objectId object -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (TraceObjectDiffusionOutbound objectId object)
tracer TraceObjectDiffusionOutbound objectId object
forall objectId object.
TraceObjectDiffusionOutbound objectId object
TraceObjectDiffusionOutboundTerminated
      }

  updateStNewObjects ::
    OutboundSt objectId object ticketNo ->
    [(ticketNo, object)] ->
    OutboundSt objectId object ticketNo
  updateStNewObjects :: OutboundSt objectId object ticketNo
-> [(ticketNo, object)] -> OutboundSt objectId object ticketNo
updateStNewObjects !OutboundSt{ticketNo
StrictSeq object
outstandingFifo :: forall objectId object ticketNo.
OutboundSt objectId object ticketNo -> StrictSeq object
lastTicketNo :: forall objectId object ticketNo.
OutboundSt objectId object ticketNo -> ticketNo
outstandingFifo :: StrictSeq object
lastTicketNo :: ticketNo
..} [(ticketNo, object)]
sortedNewContent =
    let !outstandingFifo' :: StrictSeq object
outstandingFifo' =
          StrictSeq object
outstandingFifo
            StrictSeq object -> StrictSeq object -> StrictSeq object
forall a. Semigroup a => a -> a -> a
<> ([object] -> StrictSeq object
forall a. [a] -> StrictSeq a
Seq.fromList ([object] -> StrictSeq object) -> [object] -> StrictSeq object
forall a b. (a -> b) -> a -> b
$ (ticketNo, object) -> object
forall a b. (a, b) -> b
snd ((ticketNo, object) -> object) -> [(ticketNo, object)] -> [object]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [(ticketNo, object)]
sortedNewContent)
        !lastTicketNo' :: ticketNo
lastTicketNo'
          | [(ticketNo, object)] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [(ticketNo, object)]
sortedNewContent = ticketNo
lastTicketNo
          | Bool
otherwise = (ticketNo, object) -> ticketNo
forall a b. (a, b) -> a
fst ((ticketNo, object) -> ticketNo) -> (ticketNo, object) -> ticketNo
forall a b. (a -> b) -> a -> b
$ [(ticketNo, object)] -> (ticketNo, object)
forall a. HasCallStack => [a] -> a
last [(ticketNo, object)]
sortedNewContent
     in OutboundSt
          { outstandingFifo :: StrictSeq object
outstandingFifo = StrictSeq object
outstandingFifo'
          , lastTicketNo :: ticketNo
lastTicketNo = ticketNo
lastTicketNo'
          }

  recvMsgRequestObjectIds ::
    forall blocking.
    OutboundSt objectId object ticketNo ->
    SingBlockingStyle blocking ->
    NumObjectIdsAck ->
    NumObjectIdsReq ->
    m (OutboundStObjectIds blocking objectId object m ())
  recvMsgRequestObjectIds :: forall (blocking :: StBlockingStyle).
OutboundSt objectId object ticketNo
-> SingBlockingStyle blocking
-> NumObjectIdsAck
-> NumObjectIdsReq
-> m (OutboundStObjectIds blocking objectId object m ())
recvMsgRequestObjectIds !st :: OutboundSt objectId object ticketNo
st@OutboundSt{ticketNo
StrictSeq object
outstandingFifo :: forall objectId object ticketNo.
OutboundSt objectId object ticketNo -> StrictSeq object
lastTicketNo :: forall objectId object ticketNo.
OutboundSt objectId object ticketNo -> ticketNo
outstandingFifo :: StrictSeq object
lastTicketNo :: ticketNo
..} SingBlockingStyle blocking
blocking NumObjectIdsAck
numIdsToAck NumObjectIdsReq
numIdsToReq = do
    Tracer m (TraceObjectDiffusionOutbound objectId object)
-> TraceObjectDiffusionOutbound objectId object -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (TraceObjectDiffusionOutbound objectId object)
tracer (NumObjectIdsReq -> TraceObjectDiffusionOutbound objectId object
forall objectId object.
NumObjectIdsReq -> TraceObjectDiffusionOutbound objectId object
TraceObjectDiffusionOutboundRecvMsgRequestObjectIds NumObjectIdsReq
numIdsToReq)

    Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (NumObjectIdsAck
numIdsToAck NumObjectIdsAck -> NumObjectIdsAck -> Bool
forall a. Ord a => a -> a -> Bool
> Int -> NumObjectIdsAck
forall a b. (Integral a, Num b) => a -> b
fromIntegral (StrictSeq object -> Int
forall a. StrictSeq a -> Int
Seq.length StrictSeq object
outstandingFifo)) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
      ObjectDiffusionOutboundError -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO ObjectDiffusionOutboundError
ProtocolErrorAckedTooManyObjectIds

    Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when
      ( StrictSeq object -> Int
forall a. StrictSeq a -> Int
Seq.length StrictSeq object
outstandingFifo
          Int -> Int -> Int
forall a. Num a => a -> a -> a
- NumObjectIdsAck -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral NumObjectIdsAck
numIdsToAck
          Int -> Int -> Int
forall a. Num a => a -> a -> a
+ NumObjectIdsReq -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral NumObjectIdsReq
numIdsToReq
          Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> NumObjectsUnacknowledged -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral NumObjectsUnacknowledged
maxFifoLength
      )
      (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ ObjectDiffusionOutboundError -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (NumObjectIdsReq
-> NumObjectsUnacknowledged -> ObjectDiffusionOutboundError
ProtocolErrorRequestedTooManyObjectIds NumObjectIdsReq
numIdsToReq NumObjectsUnacknowledged
maxFifoLength)

    -- First we update our FIFO to remove the number of objectIds that the
    -- inbound peer has acknowledged.
    let !outstandingFifo' :: StrictSeq object
outstandingFifo' = Int -> StrictSeq object -> StrictSeq object
forall a. Int -> StrictSeq a -> StrictSeq a
Seq.drop (NumObjectIdsAck -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral NumObjectIdsAck
numIdsToAck) StrictSeq object
outstandingFifo
        -- must specify the type here otherwise GHC complains about mismatch objectId types
        st' :: OutboundSt objectId object ticketNo
        !st' :: OutboundSt objectId object ticketNo
st' = OutboundSt objectId object ticketNo
st{outstandingFifo = outstandingFifo'}

    -- Grab info about any new objects after the last object ticketNo we've
    -- seen, up to the number that the peer has requested.
    case SingBlockingStyle blocking
blocking of
      -----------------------------------------------------------------------
      SingBlockingStyle blocking
SingBlocking -> do
        Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (NumObjectIdsReq
numIdsToReq NumObjectIdsReq -> NumObjectIdsReq -> Bool
forall a. Eq a => a -> a -> Bool
== NumObjectIdsReq
0) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
          ObjectDiffusionOutboundError -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO ObjectDiffusionOutboundError
ProtocolErrorRequestedNothing
        Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (StrictSeq object -> Bool
forall a. StrictSeq a -> Bool
Seq.null StrictSeq object
outstandingFifo') (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
          ObjectDiffusionOutboundError -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO ObjectDiffusionOutboundError
ProtocolErrorRequestBlocking

        -- oprObjectsAfter returns STM (Maybe (m (Map ...))).
        -- The STM layer retries efficiently until new content is signalled (Just).
        -- However, in rare cases the IO action may still yield an empty
        -- map (e.g. objects GC'd between the STM check and IO read),
        -- so we loop in IO as well.
        let getNewContent :: m (Map ticketNo object)
getNewContent = do
              content <- m (m (Map ticketNo object)) -> m (Map ticketNo object)
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (m (m (Map ticketNo object)) -> m (Map ticketNo object))
-> (STM m (m (Map ticketNo object)) -> m (m (Map ticketNo object)))
-> STM m (m (Map ticketNo object))
-> m (Map ticketNo object)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM m (m (Map ticketNo object)) -> m (m (Map ticketNo object))
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m (m (Map ticketNo object)) -> m (Map ticketNo object))
-> STM m (m (Map ticketNo object)) -> m (Map ticketNo object)
forall a b. (a -> b) -> a -> b
$ do
                maybeNewObjectsAction <-
                  ticketNo -> Word64 -> STM m (Maybe (m (Map ticketNo object)))
oprObjectsAfter
                    ticketNo
lastTicketNo
                    (NumObjectIdsReq -> Word64
forall a b. (Integral a, Num b) => a -> b
fromIntegral NumObjectIdsReq
numIdsToReq)
                case maybeNewObjectsAction of
                  Maybe (m (Map ticketNo object))
Nothing -> STM m (m (Map ticketNo object))
forall a. STM m a
forall (m :: * -> *) a. MonadSTM m => STM m a
retry
                  Just m (Map ticketNo object)
newObjectsAction -> m (Map ticketNo object) -> STM m (m (Map ticketNo object))
forall a. a -> STM m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure m (Map ticketNo object)
newObjectsAction
              if null content then getNewContent else pure content
        sortedNewContent <- Map ticketNo object -> [(ticketNo, object)]
forall k a. Map k a -> [(k, a)]
Map.toAscList (Map ticketNo object -> [(ticketNo, object)])
-> m (Map ticketNo object) -> m [(ticketNo, object)]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> m (Map ticketNo object)
getNewContent

        let !newIds = object -> objectId
oprObjectId (object -> objectId)
-> ((ticketNo, object) -> object) -> (ticketNo, object) -> objectId
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (ticketNo, object) -> object
forall a b. (a, b) -> b
snd ((ticketNo, object) -> objectId)
-> [(ticketNo, object)] -> [objectId]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [(ticketNo, object)]
sortedNewContent
            st'' = OutboundSt objectId object ticketNo
-> [(ticketNo, object)] -> OutboundSt objectId object ticketNo
updateStNewObjects OutboundSt objectId object ticketNo
st' [(ticketNo, object)]
sortedNewContent

        traceWith tracer (TraceObjectDiffusionOutboundSendMsgReplyObjectIds newIds)

        pure $
          SendMsgReplyObjectIds
            (BlockingReply (NonEmpty.fromList $ newIds))
            (makeBundle st'')

      -----------------------------------------------------------------------
      SingBlockingStyle blocking
SingNonBlocking -> do
        Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (NumObjectIdsReq
numIdsToReq NumObjectIdsReq -> NumObjectIdsReq -> Bool
forall a. Eq a => a -> a -> Bool
== NumObjectIdsReq
0 Bool -> Bool -> Bool
&& NumObjectIdsAck
numIdsToAck NumObjectIdsAck -> NumObjectIdsAck -> Bool
forall a. Eq a => a -> a -> Bool
== NumObjectIdsAck
0) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
          ObjectDiffusionOutboundError -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO ObjectDiffusionOutboundError
ProtocolErrorRequestedNothing
        Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (StrictSeq object -> Bool
forall a. StrictSeq a -> Bool
Seq.null StrictSeq object
outstandingFifo') (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
          ObjectDiffusionOutboundError -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO ObjectDiffusionOutboundError
ProtocolErrorRequestNonBlocking

        let getNewContent :: m (Map ticketNo object)
getNewContent = m (m (Map ticketNo object)) -> m (Map ticketNo object)
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (m (m (Map ticketNo object)) -> m (Map ticketNo object))
-> (STM m (m (Map ticketNo object)) -> m (m (Map ticketNo object)))
-> STM m (m (Map ticketNo object))
-> m (Map ticketNo object)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM m (m (Map ticketNo object)) -> m (m (Map ticketNo object))
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m (m (Map ticketNo object)) -> m (Map ticketNo object))
-> STM m (m (Map ticketNo object)) -> m (Map ticketNo object)
forall a b. (a -> b) -> a -> b
$ do
              maybeNewObjectsAction <-
                ticketNo -> Word64 -> STM m (Maybe (m (Map ticketNo object)))
oprObjectsAfter
                  ticketNo
lastTicketNo
                  (NumObjectIdsReq -> Word64
forall a b. (Integral a, Num b) => a -> b
fromIntegral NumObjectIdsReq
numIdsToReq)
              pure $ fromMaybe (pure Map.empty) maybeNewObjectsAction

        sortedNewContent <- Map ticketNo object -> [(ticketNo, object)]
forall k a. Map k a -> [(k, a)]
Map.toAscList (Map ticketNo object -> [(ticketNo, object)])
-> m (Map ticketNo object) -> m [(ticketNo, object)]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> m (Map ticketNo object)
getNewContent

        let !newIds = object -> objectId
oprObjectId (object -> objectId)
-> ((ticketNo, object) -> object) -> (ticketNo, object) -> objectId
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (ticketNo, object) -> object
forall a b. (a, b) -> b
snd ((ticketNo, object) -> objectId)
-> [(ticketNo, object)] -> [objectId]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [(ticketNo, object)]
sortedNewContent
            st'' = OutboundSt objectId object ticketNo
-> [(ticketNo, object)] -> OutboundSt objectId object ticketNo
updateStNewObjects OutboundSt objectId object ticketNo
st' [(ticketNo, object)]
sortedNewContent

        traceWith tracer (TraceObjectDiffusionOutboundSendMsgReplyObjectIds newIds)

        pure (SendMsgReplyObjectIds (NonBlockingReply newIds) (makeBundle st''))

  recvMsgRequestObjects ::
    OutboundSt objectId object ticketNo ->
    [objectId] ->
    m (OutboundStObjects objectId object m ())
  recvMsgRequestObjects :: OutboundSt objectId object ticketNo
-> [objectId] -> m (OutboundStObjects objectId object m ())
recvMsgRequestObjects !st :: OutboundSt objectId object ticketNo
st@OutboundSt{ticketNo
StrictSeq object
outstandingFifo :: forall objectId object ticketNo.
OutboundSt objectId object ticketNo -> StrictSeq object
lastTicketNo :: forall objectId object ticketNo.
OutboundSt objectId object ticketNo -> ticketNo
outstandingFifo :: StrictSeq object
lastTicketNo :: ticketNo
..} [objectId]
requestedIds = do
    Tracer m (TraceObjectDiffusionOutbound objectId object)
-> TraceObjectDiffusionOutbound objectId object -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (TraceObjectDiffusionOutbound objectId object)
tracer ([objectId] -> TraceObjectDiffusionOutbound objectId object
forall objectId object.
[objectId] -> TraceObjectDiffusionOutbound objectId object
TraceObjectDiffusionOutboundRecvMsgRequestObjects [objectId]
requestedIds)

    -- All the objects correspond to advertised objectIds are already in the
    -- outstandingFifo. So we don't need to read from the object pool here.

    -- I've optimized the search to do only one traversal of 'outstandingFifo'.
    -- When the 'requestedIds' is exactly the whole 'outstandingFifo', then this
    -- should take O(n * log n) time.
    --
    -- TODO: We might need to revisit the underlying 'outstandingFifo' data
    -- structure and the search if performance isn't sufficient when we'll use
    -- ObjectDiffusion for votes diffusion (and not just cert diffusion).

    let requestedIdsSet :: Set objectId
requestedIdsSet = [objectId] -> Set objectId
forall a. Ord a => [a] -> Set a
Set.fromList [objectId]
requestedIds

    Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Set objectId -> Int
forall a. Set a -> Int
Set.size Set objectId
requestedIdsSet Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
/= [objectId] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [objectId]
requestedIds) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
      ObjectDiffusionOutboundError -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO ObjectDiffusionOutboundError
ProtocolErrorRequestedDuplicateObject

    let requestedObjects :: [object]
requestedObjects =
          (object -> [object] -> [object])
-> [object] -> StrictSeq object -> [object]
forall a b. (a -> b -> b) -> b -> StrictSeq a -> b
forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr
            ( \object
obj [object]
acc ->
                if objectId -> Set objectId -> Bool
forall a. Ord a => a -> Set a -> Bool
Set.member (object -> objectId
oprObjectId object
obj) Set objectId
requestedIdsSet
                  then object
obj object -> [object] -> [object]
forall a. a -> [a] -> [a]
: [object]
acc
                  else [object]
acc
            )
            []
            StrictSeq object
outstandingFifo

    Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Set objectId -> Int
forall a. Set a -> Int
Set.size Set objectId
requestedIdsSet Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
/= [object] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [object]
requestedObjects) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
      ObjectDiffusionOutboundError -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO ObjectDiffusionOutboundError
ProtocolErrorRequestedUnavailableObject

    Tracer m (TraceObjectDiffusionOutbound objectId object)
-> TraceObjectDiffusionOutbound objectId object -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (TraceObjectDiffusionOutbound objectId object)
tracer ([object] -> TraceObjectDiffusionOutbound objectId object
forall objectId object.
[object] -> TraceObjectDiffusionOutbound objectId object
TraceObjectDiffusionOutboundSendMsgReplyObjects [object]
requestedObjects)

    OutboundStObjects objectId object m ()
-> m (OutboundStObjects objectId object m ())
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ([object]
-> OutboundStIdle objectId object m ()
-> OutboundStObjects objectId object m ()
forall object objectId (m :: * -> *) a.
[object]
-> OutboundStIdle objectId object m a
-> OutboundStObjects objectId object m a
SendMsgReplyObjects [object]
requestedObjects (OutboundSt objectId object ticketNo
-> OutboundStIdle objectId object m ()
makeBundle OutboundSt objectId object ticketNo
st))