{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Ouroboros.Consensus.Storage.ImmutableDB.Stream
  ( NextItem (..)
  , StreamAPI (..)
  , streamAPI
  , streamAPI'
  , streamAll
  ) where

import Control.Monad.Except
import Control.ResourceRegistry
import GHC.Stack
import Ouroboros.Consensus.Block
import Ouroboros.Consensus.Storage.Common
import Ouroboros.Consensus.Storage.ImmutableDB hiding (streamAll)
import qualified Ouroboros.Consensus.Storage.ImmutableDB.API as ImmutableDB
import Ouroboros.Consensus.Util.IOLike

{-------------------------------------------------------------------------------
  Abstraction over the streaming API provided by the Chain DB
-------------------------------------------------------------------------------}

-- | Outcome of asking a stream for its next element.
data NextItem blk
  = -- | Signals the end of the stream.
    NoMoreItems
  | -- | The next element produced by the stream.
    NextItem blk

-- | Stream items from the immutable DB
--
-- When we initialize the ledger DB, we try to find a snapshot close to the
-- tip of the immutable DB, and then stream blocks from the immutable DB to its
-- tip to bring the ledger up to date with the tip of the immutable DB.
--
-- In CPS form to enable the use of 'withXYZ' style iterator init functions:
-- the caller supplies a continuation that receives either an error point or
-- an action yielding successive items, and the whole streaming session runs
-- inside that continuation.
newtype StreamAPI m blk a = StreamAPI
  { streamAfter ::
      forall b.
      HasCallStack =>
      Point blk ->
      -- Reference to the block corresponding to the snapshot we found
      -- (or 'GenesisPoint' if we didn't find any)

      (Either (RealPoint blk) (m (NextItem a)) -> m b) ->
      -- Continuation that consumes the stream.
      --
      -- On success it receives @Right getNext@, where each run of @getNext@
      -- yields the next item (or 'NoMoreItems').
      --
      -- It receives @Left pt@ if the snapshot we found is more recent than
      -- the tip of the immutable DB. Since we only store snapshots to disk
      -- for blocks in the immutable DB, this can only happen if the
      -- immutable DB got truncated due to disk corruption. The returned @pt@
      -- is a 'RealPoint', not a 'Point', since it must always be possible to
      -- stream after genesis.
      m b
  -- ^ Start streaming after the specified block
  }

-- | Stream all items
--
-- Starts a stream after the given point and folds the update function over
-- every item it yields, threading an accumulator through, until the stream
-- reports 'NoMoreItems'. If the stream cannot be started because the point is
-- not found, no items are consumed and the computation fails in 'ExceptT'
-- with the error built from the offending 'RealPoint'.
streamAll ::
  forall m blk e b a.
  (Monad m, HasCallStack) =>
  StreamAPI m blk b ->
  -- | Starting point for streaming
  Point blk ->
  -- | Error when tip not found
  (RealPoint blk -> e) ->
  -- | Starting point when tip /is/ found (the initial accumulator)
  a ->
  -- | Update function for each item
  (b -> a -> m a) ->
  ExceptT e m a
streamAll StreamAPI{..} tip notFound acc0 f =
  ExceptT $
    streamAfter tip $ \case
      -- The requested point is missing: report it without streaming.
      Left tip' -> pure $ Left (notFound tip')
      Right getNext -> do
        -- Pull items one at a time, applying 'f' to each before fetching
        -- the next, until the stream is exhausted.
        let go :: a -> m a
            go acc = do
              mNext <- getNext
              case mNext of
                NoMoreItems -> pure acc
                NextItem b -> go =<< f b acc
        Right <$> go acc0

-- | 'StreamAPI' over an 'ImmutableDB' that yields whole blocks.
--
-- Specialises 'streamAPI'' to the 'GetBlock' component with a stop condition
-- that never stops early: every block read is passed on as a 'NextItem'.
streamAPI ::
  (IOLike m, HasHeader blk) =>
  ImmutableDB m blk -> StreamAPI m blk blk
streamAPI = streamAPI' (return . NextItem) GetBlock

-- | 'StreamAPI' over an 'ImmutableDB' for an arbitrary block component.
--
-- Each streaming session opens an iterator after the requested point inside a
-- fresh resource registry; the continuation runs within the scope of that
-- registry (presumably the iterator is released when the registry is closed
-- on exit -- confirm against 'withRegistry').
--
-- Every value read from the iterator is passed through the stop condition,
-- whose result is what the consumer sees: it can forward the value as a
-- 'NextItem' or end the stream early by returning 'NoMoreItems'.
streamAPI' ::
  forall m blk a.
  (IOLike m, HasHeader blk) =>
  -- | Stop condition
  (a -> m (NextItem a)) ->
  BlockComponent blk a ->
  ImmutableDB m blk ->
  StreamAPI m blk a
streamAPI' shouldStop blockComponent immutableDB = StreamAPI streamAfter
 where
  -- Open an iterator after @tip@ and hand the outcome to the continuation.
  streamAfter ::
    Point blk ->
    (Either (RealPoint blk) (m (NextItem a)) -> m b) ->
    m b
  streamAfter tip k = withRegistry $ \registry -> do
    eItr <-
      ImmutableDB.streamAfterPoint
        immutableDB
        registry
        blockComponent
        tip
    case eItr of
      -- Snapshot is too recent
      Left err -> k $ Left $ ImmutableDB.missingBlockPoint err
      Right itr -> k $ Right $ streamUsing itr

  -- Fetch one result from the iterator and translate it to a 'NextItem',
  -- consulting the stop condition for every value actually read.
  streamUsing ::
    ImmutableDB.Iterator m blk a ->
    m (NextItem a)
  streamUsing itr = do
    itrResult <- ImmutableDB.iteratorNext itr
    case itrResult of
      ImmutableDB.IteratorExhausted -> return NoMoreItems
      ImmutableDB.IteratorResult b -> shouldStop b