{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Ouroboros.Consensus.Storage.ImmutableDB.Stream (
    NextItem (..)
  , StreamAPI (..)
  , streamAPI
  , streamAPI'
  , streamAll
  ) where

import           Control.Monad.Except
import           GHC.Stack
import           Ouroboros.Consensus.Block
import           Ouroboros.Consensus.Storage.Common
import           Ouroboros.Consensus.Storage.ImmutableDB hiding (streamAll)
import qualified Ouroboros.Consensus.Storage.ImmutableDB.API as ImmutableDB
import           Ouroboros.Consensus.Util.IOLike
import           Ouroboros.Consensus.Util.ResourceRegistry

{-------------------------------------------------------------------------------
  Abstraction over the streaming API provided by the immutable DB
-------------------------------------------------------------------------------}

-- | Result of asking a stream for its next element
data NextItem blk
  = NoMoreItems
    -- ^ The stream has nothing further to yield
  | NextItem blk
    -- ^ The next element of the stream

-- | Stream items from the immutable DB
--
-- When we initialize the ledger DB, we try to find a snapshot close to the
-- tip of the immutable DB, and then stream blocks from the immutable DB to its
-- tip to bring the ledger up to date with the tip of the immutable DB.
--
-- In CPS form to enable the use of 'withXYZ' style iterator init functions.
newtype StreamAPI m blk a = StreamAPI {
      -- | Start streaming after the specified block
      --
      -- The continuation is handed either the point that could not be found
      -- (@Left@) or an action that yields the next item each time it is run
      -- (@Right@).
      streamAfter :: forall b. HasCallStack
        => Point blk
        -- Reference to the block corresponding to the snapshot we found
        -- (or 'GenesisPoint' if we didn't find any)

        -> (Either (RealPoint blk) (m (NextItem a)) -> m b)
        -- Get the next item
        --
        -- Should be @Left pt@ if the snapshot we found is more recent than the
        -- tip of the immutable DB. Since we only store snapshots to disk for
        -- blocks in the immutable DB, this can only happen if the immutable DB
        -- got truncated due to disk corruption. The returned @pt@ is a
        -- 'RealPoint', not a 'Point', since it must always be possible to
        -- stream after genesis.
        -> m b
    }

-- | Stream all items, folding them into an accumulator
--
-- Starts streaming after the given point and keeps fetching the next item,
-- threading the accumulator through the update function, until the stream
-- reports 'NoMoreItems'.
--
-- If the stream cannot be started at the given point (the continuation
-- receives @Left pt@), no item is processed and the computation fails in
-- 'ExceptT' with the error built from @pt@.
streamAll ::
     forall m blk e b a. (Monad m, HasCallStack)
  => StreamAPI m blk b
  -> Point blk             -- ^ Starting point for streaming
  -> (RealPoint blk -> e)  -- ^ Error when tip not found
  -> a                     -- ^ Starting point when tip /is/ found
  -> (b -> a -> m a)       -- ^ Update function for each item
  -> ExceptT e m a
streamAll StreamAPI{..} tip notFound initial f = ExceptT $
    streamAfter tip $ \case
      -- The requested starting point is unknown to the stream
      Left tip' -> return $ Left (notFound tip')

      Right getNext -> do
        -- Pull items one at a time until the stream is exhausted, feeding
        -- each item and the current accumulator to the update function.
        let go :: a -> m a
            go acc = do
              mNext <- getNext
              case mNext of
                NoMoreItems -> return acc
                NextItem b  -> go =<< f b acc
        Right <$> go initial


-- | 'StreamAPI' over the given immutable DB that yields whole blocks
--
-- Specialises 'streamAPI'' to the 'GetBlock' component. Every block produced
-- by the iterator is wrapped in 'NextItem', so this wrapper itself never cuts
-- the stream short.
streamAPI ::
     (IOLike m, HasHeader blk)
  => ImmutableDB m blk -> StreamAPI m blk blk
streamAPI = streamAPI' (return . NextItem) GetBlock

-- | 'StreamAPI' over the given immutable DB for an arbitrary block component
--
-- The first argument is applied to every value the iterator produces, and
-- its result is what the stream hands out as the next item. Returning
-- 'NoMoreItems' from it therefore lets consumers that stop on 'NoMoreItems'
-- (such as 'streamAll') end before the iterator is exhausted.
streamAPI' ::
     forall m blk a.
     (IOLike m, HasHeader blk)
  => (a -> m (NextItem a)) -- ^ Stop condition
  -> BlockComponent   blk a
  -> ImmutableDB    m blk
  -> StreamAPI      m blk a
streamAPI' shouldStop blockComponent immutableDB = StreamAPI streamAfter
  where
    -- Open an iterator after @tip@ inside a fresh registry and pass the
    -- outcome to the continuation @k@. The continuation runs within
    -- 'withRegistry', so the iterator is used while that registry is in scope.
    streamAfter :: Point blk
                -> (Either (RealPoint blk) (m (NextItem a)) -> m b)
                -> m b
    streamAfter tip k = withRegistry $ \registry -> do
        eItr <-
          ImmutableDB.streamAfterPoint
            immutableDB
            registry
            blockComponent
            tip
        case eItr of
          -- Snapshot is too recent
          Left  err -> k $ Left  $ ImmutableDB.missingBlockPoint err
          Right itr -> k $ Right $ streamUsing itr

    -- Advance the iterator by one step: an exhausted iterator maps to
    -- 'NoMoreItems', anything else is passed through the stop condition.
    streamUsing :: ImmutableDB.Iterator m blk a
                -> m (NextItem a)
    streamUsing itr = do
        itrResult <- ImmutableDB.iteratorNext itr
        case itrResult of
          ImmutableDB.IteratorExhausted -> return NoMoreItems
          ImmutableDB.IteratorResult b  -> shouldStop b