{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Ouroboros.Consensus.Storage.VolatileDB.Impl.Parser (
    ParseError (..)
  , ParsedBlockInfo (..)
  , parseBlockFile
    -- * Auxiliary
  , extractBlockInfo
  ) where

import           Data.Bifunctor (bimap)
import qualified Data.ByteString.Lazy as Lazy
import           Data.Word (Word64)
import           Ouroboros.Consensus.Block
import           Ouroboros.Consensus.Storage.Serialisation
import           Ouroboros.Consensus.Storage.VolatileDB.API (BlockInfo (..))
import           Ouroboros.Consensus.Storage.VolatileDB.Impl.Types
import           Ouroboros.Consensus.Util.CBOR (ReadIncrementalErr (..),
                     withStreamIncrementalOffsets)
import           Ouroboros.Consensus.Util.IOLike
import qualified Streaming.Prelude as S
import           Streaming.Prelude (Of (..), Stream)
import           System.FS.API (HasFS)
import           System.FS.API.Types (FsPath)

-- | Information returned by the parser about a single block.
--
-- For each block the parser returns its offset, its size, its 'BlockInfo'
-- and the nested context of its header.
--
-- The fields of this record are strict to make sure that by evaluating this
-- record to WHNF, we no longer hold on to the entire block. Otherwise, we might
-- accidentally keep all blocks in a single file in memory during parsing.
data ParsedBlockInfo blk = ParsedBlockInfo {
      pbiBlockOffset :: !BlockOffset
      -- ^ Offset at which the block was found by the parser.
    , pbiBlockSize   :: !BlockSize
      -- ^ Size of the block as reported by the parser.
    , pbiBlockInfo   :: !(BlockInfo blk)
      -- ^ Summary of the block, see 'extractBlockInfo'.
    , pbiNestedCtxt  :: !(SomeSecond (NestedCtxt Header) blk)
      -- ^ Nested context obtained by un-nesting the block's header.
    }

-- | Parse the given file containing blocks.
--
-- Return the 'ParsedBlockInfo' for all the valid blocks in the file, in the
-- order in which they were read. Stop when encountering an error and include
-- the offset to truncate to.
--
-- Two kinds of errors are reported:
--
-- * 'BlockReadErr': the incremental reader finished with an error; the
--   offset is the one reported by the reader.
-- * 'BlockCorruptedErr': a block was decoded but rejected by the corruption
--   check; the offset is the offset of that block.
--
-- When the validation policy is 'NoValidation', the corruption check is not
-- consulted at all and every decoded block is accepted.
parseBlockFile ::
     forall m blk h.
     ( IOLike m
     , GetPrevHash blk
     , HasBinaryBlockInfo blk
     , HasNestedContent Header blk
     , DecodeDisk blk (Lazy.ByteString -> blk)
     )
  => CodecConfig blk
     -- ^ Used to obtain the on-disk block decoder.
  -> HasFS m h
     -- ^ File system in which the file lives.
  -> (blk -> Bool)
     -- ^ Returns 'True' when the block is /not/ corrupt.
  -> BlockValidationPolicy
     -- ^ Whether the corruption check should be run.
  -> FsPath
     -- ^ Path of the file to parse.
  -> m ( [ParsedBlockInfo blk]
       , Maybe (ParseError blk, BlockOffset)
       )
parseBlockFile ccfg hasFS isNotCorrupt validationPolicy fsPath =
    withStreamIncrementalOffsets hasFS (decodeDisk ccfg) fsPath $
      checkEntries []
  where
    -- Computed once so that the policy is not re-compared for every block.
    noValidation :: Bool
    noValidation = validationPolicy == NoValidation

    -- Consume the stream of @(offset, (size, block))@ entries, accumulating
    -- the parsed blocks in reverse order. The accumulator is reversed exactly
    -- once, when the loop terminates.
    checkEntries ::
         [ParsedBlockInfo blk]
      -> Stream (Of (Word64, (Word64, blk)))
                m
                (Maybe (ReadIncrementalErr, Word64))
      -> m ( [ParsedBlockInfo blk]
           , Maybe (ParseError blk, BlockOffset)
           )
    checkEntries parsed stream = S.next stream >>= \case
      -- End of the stream: either a clean end ('Nothing') or a read error
      -- together with the offset reported by the reader.
      Left mbErr
        -> return (reverse parsed, bimap BlockReadErr BlockOffset <$> mbErr)
      Right ((offset, (size, blk)), stream')
        | noValidation || isNotCorrupt blk
        -> -- The bangs force everything we need from the block now, so the
           -- accumulator does not retain the block itself.
           let !blockInfo = extractBlockInfo blk
               !newParsed = ParsedBlockInfo {
                   pbiBlockOffset = BlockOffset offset
                   -- The reader reports the size as a 'Word64'; it is
                   -- narrowed here to the representation of 'BlockSize'.
                 , pbiBlockSize   = BlockSize $ fromIntegral size
                 , pbiBlockInfo   = blockInfo
                 , pbiNestedCtxt  = case unnest (getHeader blk) of
                                      DepPair nestedCtxt _ -> SomeSecond nestedCtxt
                 }
           in checkEntries (newParsed : parsed) stream'
        | otherwise  -- The block was invalid
        -> -- Stop at the first corrupt block; later blocks are not inspected.
           let !hash = blockHash blk
           in return ( reverse parsed
                     , Just (BlockCorruptedErr hash, BlockOffset offset)
                     )

{-------------------------------------------------------------------------------
  Auxiliary
-------------------------------------------------------------------------------}

-- | Build the 'BlockInfo' for a block.
--
-- The hash, slot number, block number, EBB flag and previous hash are read
-- from the block itself; the header offset and header size are taken from
-- the block's 'BinaryBlockInfo'.
extractBlockInfo ::
     (GetPrevHash blk, HasBinaryBlockInfo blk)
  => blk
  -> BlockInfo blk
extractBlockInfo blk = BlockInfo {
      biHash         = blockHash     blk
    , biSlotNo       = blockSlot     blk
    , biBlockNo      = blockNo       blk
    , biIsEBB        = blockToIsEBB  blk
    , biPrevHash     = blockPrevHash blk
    , biHeaderOffset = headerOffset
    , biHeaderSize   = headerSize
    }
  where
    -- Field puns: binds 'headerOffset' and 'headerSize' from the record.
    BinaryBlockInfo { headerOffset, headerSize } = getBinaryBlockInfo blk