{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE InstanceSigs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiWayIf #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE TupleSections #-}

module Simplex.Messaging.Server.MsgStore.Journal
  ( JournalMsgStore (random, expireBackupsBefore),
    QStore (..),
    QStoreCfg (..),
    JournalQueue (msgQueue'), -- msgQueue' is used in tests
    JournalMsgQueue (queue, state),
    JMQueue (queueDirectory, statePath),
    JournalStoreConfig (..),
    closeMsgQueue,
    closeMsgQueueHandles,
    -- below are exported for tests
    MsgQueueState (..),
    JournalState (..),
    SJournalType (..),
    msgQueueDirectory,
    msgQueueStatePath,
    readQueueState,
    newMsgQueueState,
    getJournalQueueMessages,
    newJournalId,
    appendState,
    queueLogFileName,
    journalFilePath,
    logFileExt,
    stmQueueStore,
#if defined(dbServerPostgres)
    postgresQueueStore,
#endif
  )
where

import Control.Concurrent.STM
import qualified Control.Exception as E
import Control.Logger.Simple
import Control.Monad
import Control.Monad.Trans.Except
import qualified Data.Attoparsec.ByteString.Char8 as A
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import Data.Either (fromRight, partitionEithers)
import Data.Functor (($>))
import Data.Int (Int64)
import Data.List (sort)
import qualified Data.Map.Strict as M
import Data.Maybe (fromMaybe, isJust, isNothing, mapMaybe)
import Data.Text (Text)
import qualified Data.Text as T
import Data.Text.Encoding (decodeLatin1)
import Data.Time.Clock (NominalDiffTime, UTCTime, addUTCTime, getCurrentTime)
import Data.Time.Clock.System (SystemTime (..), getSystemTime)
import Data.Time.Format.ISO8601 (iso8601Show, iso8601ParseM)
import GHC.IO (catchAny)
import Simplex.Messaging.Agent.Client (getMapLock)
import Simplex.Messaging.Agent.Lock
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Protocol
import Simplex.Messaging.Server.MsgStore.Journal.SharedLock
import Simplex.Messaging.Server.MsgStore.Types
import Simplex.Messaging.Server.QueueStore
#if defined(dbServerPostgres)
import Simplex.Messaging.Server.QueueStore.Postgres
#endif
import Simplex.Messaging.Server.QueueStore.STM
import Simplex.Messaging.Server.QueueStore.Types
import Simplex.Messaging.SystemTime
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Util (ifM, tshow, whenM, ($>>=), (<$$>))
import System.Directory
import System.FilePath (takeFileName, (</>))
import System.IO (BufferMode (..), Handle, IOMode (..), SeekMode (..))
import qualified System.IO as IO
import System.Random (StdGen, genByteString, newStdGen)

-- | State of the journal message store, indexed by the queue store type @s@.
data JournalMsgStore s = JournalMsgStore
  { -- store configuration
    config :: JournalStoreConfig s,
    -- random generator state shared via TVar
    random :: TVar StdGen,
    -- locks keyed by recipient ID
    queueLocks :: TMap RecipientId Lock,
    -- shared lock variable holding a recipient ID (also stored in each JournalQueue)
    sharedLock :: TMVar RecipientId,
    -- the queue store matching the store type
    queueStore_ :: QStore s,
    -- NOTE(review): presumably the number of currently opened message queues - confirm against its writers
    openedQueueCount :: TVar Int,
    -- NOTE(review): presumably the cut-off time for removing state backups - confirm against its users
    expireBackupsBefore :: UTCTime
  }

-- | Queue store wrapper indexed by the store type.
-- Each constructor fixes the index, so pattern matching on it reveals
-- which concrete store (see QStoreType) is inside.
-- The Postgres constructor exists only when dbServerPostgres is defined.
data QStore (s :: QSType) where
  MQStore :: QStoreType 'QSMemory -> QStore 'QSMemory
#if defined(dbServerPostgres)
  PQStore :: QStoreType 'QSPostgres -> QStore 'QSPostgres
#endif

-- | Maps the store type index to the concrete queue store implementation
-- holding JournalQueue values: the STM store for memory and,
-- when dbServerPostgres is defined, the Postgres store.
type family QStoreType s where
  QStoreType 'QSMemory = STMQueueStore (JournalQueue 'QSMemory)
#if defined(dbServerPostgres)
  QStoreType 'QSPostgres = PostgresQueueStore (JournalQueue 'QSPostgres)
#endif

-- | Applies a function to the concrete queue store wrapped in 'QStore'.
--
-- The function receives the 'QueueStoreClass' instance for the matched store
-- type as a given constraint, so it can use class methods without knowing
-- which constructor was present.
withQS :: (QueueStoreClass (JournalQueue s) (QStoreType s) => QStoreType s -> r) -> QStore s -> r
withQS f = \case
  MQStore st -> f st
#if defined(dbServerPostgres)
  PQStore st -> f st
#endif
{-# INLINE withQS #-}

-- | Extracts the STM queue store from a memory-backed message store.
-- The type index guarantees that 'MQStore' is the only possible constructor.
stmQueueStore :: JournalMsgStore 'QSMemory -> STMQueueStore (JournalQueue 'QSMemory)
stmQueueStore st = case queueStore_ st of
  MQStore st' -> st'

#if defined(dbServerPostgres)
-- | Extracts the Postgres queue store from a Postgres-backed message store.
-- The type index guarantees that PQStore is the only possible constructor.
postgresQueueStore :: JournalMsgStore 'QSPostgres -> PostgresQueueStore (JournalQueue 'QSPostgres)
postgresQueueStore JournalMsgStore {queueStore_ = PQStore pgStore} = pgStore
#endif

-- | Configuration of the journal message store, indexed by the queue store type @s@.
data JournalStoreConfig s = JournalStoreConfig
  { storePath :: FilePath,
    pathParts :: Int,
    -- configuration of the queue store for this store type
    queueStoreCfg :: QStoreCfg s,
    quota :: Int,
    -- Max number of messages per journal file - ignored in STM store.
    -- When this limit is reached, the file will be changed.
    -- This number should be set bigger than queue quota.
    maxMsgCount :: Int,
    maxStateLines :: Int,
    stateTailSize :: Int,
    -- time in seconds after which the queue will be closed after message expiration
    idleInterval :: Int64,
    -- expire state backup files
    expireBackupsAfter :: NominalDiffTime,
    keepMinBackups :: Int
  }

-- | Queue store configuration indexed by the store type.
-- The memory store needs no parameters; the Postgres store
-- (only when dbServerPostgres is defined) carries a PostgresStoreCfg.
data QStoreCfg s where
  MQStoreCfg :: QStoreCfg 'QSMemory
#if defined(dbServerPostgres)
  PQStoreCfg :: PostgresStoreCfg -> QStoreCfg 'QSPostgres
#endif

-- | In-memory representation of a queue in the journal store.
data JournalQueue (s :: QSType) = JournalQueue
  { recipientId' :: RecipientId,
    -- lock of this queue, used by withQueueLock
    queueLock :: Lock,
    -- shared lock variable, used by withQueueLock together with queueLock
    sharedLock :: TMVar RecipientId,
    -- To avoid race conditions and errors when restoring queues,
    -- Nothing is written to TVar when queue is deleted.
    queueRec' :: TVar (Maybe QueueRec),
    -- message queue, if present (exported for tests)
    msgQueue' :: TVar (Maybe (JournalMsgQueue s)),
    -- system time in seconds since epoch
    activeAt :: TVar Int64,
    queueState :: TVar (Maybe QState) -- Nothing - unknown
  }

-- | Summary flags of a message queue, computed from 'MsgQueueState' by 'qState'.
data QState = QState
  { -- True when the queue size is positive
    hasPending :: Bool,
    -- True when hasPending is True or either journal has a positive message count
    hasStored :: Bool
  }

-- | File system paths of a journal message queue.
data JMQueue = JMQueue
  { -- NOTE(review): presumably the directory holding the queue journal files - confirm
    queueDirectory :: FilePath,
    -- NOTE(review): presumably the path of the queue state log file - confirm
    statePath :: FilePath
  }

-- | Message queue of the journal store: its paths, its mutable state,
-- the cached tip message and the file handles (when present).
data JournalMsgQueue (s :: QSType) = JournalMsgQueue
  { queue :: JMQueue,
    state :: TVar MsgQueueState,
    -- tipMsg contains last message and length incl. newline
    -- Nothing - unknown, Just Nothing - empty queue.
    -- It prevents reading each message twice,
    -- and reading it after it was just written.
    tipMsg :: TVar (Maybe (Maybe (Message, Int64))),
    handles :: TVar (Maybe MsgQueueHandles)
  }

-- | State of a message queue: read and write journal states, a write flag and the size.
-- Its 'StrEncoding' instance serializes it as one line:
-- @write=... read=... canWrite=... size=...@
data MsgQueueState = MsgQueueState
  { readState :: JournalState 'JTRead,
    writeState :: JournalState 'JTWrite,
    canWrite :: Bool,
    -- positive size means the queue has pending messages (see qState)
    size :: Int
  }
  deriving (Show)

-- | Open file handles of a message queue.
data MsgQueueHandles = MsgQueueHandles
  { stateHandle :: Handle, -- handle to queue state log file, rotates and removes old backups when server is restarted
    readHandle :: Handle,
    writeHandle :: Maybe Handle -- optional, used when write file is different from read file
  }

-- | State of one journal file, indexed by whether it is used for reading or writing.
-- Its 'StrEncoding' instance serializes it as comma-separated
-- @journalId,msgPos,msgCount,bytePos,byteCount@; the journal type is not
-- part of the encoding, it is recovered from the type index.
data JournalState t = JournalState
  { journalType :: SJournalType t,
    journalId :: ByteString,
    msgPos :: Int,
    msgCount :: Int,
    bytePos :: Int64,
    byteCount :: Int64
  }
  deriving (Show)

-- | Computes the summary flags of a queue from its state:
-- @hasPending@ is set when the size is positive, and @hasStored@ is set when
-- the queue has pending messages or either journal has a positive message count.
qState :: MsgQueueState -> QState
qState MsgQueueState {size, readState = rs, writeState = ws} =
  let hasPending = size > 0
   in QState {hasPending, hasStored = hasPending || msgCount rs > 0 || msgCount ws > 0}
{-# INLINE qState #-}

-- | Journal role, used promoted to the type level (via DataKinds) to index JournalState.
data JournalType = JTRead | JTWrite

-- | Singleton for JournalType: each constructor fixes the type index,
-- so a value of this type determines the journal role at the type level.
data SJournalType (t :: JournalType) where
  SJTRead :: SJournalType 'JTRead
  SJTWrite :: SJournalType 'JTWrite

-- | Provides the singleton value for a journal type known at the type level.
class JournalTypeI t where sJournalType :: SJournalType t

instance JournalTypeI 'JTRead where sJournalType = SJTRead

instance JournalTypeI 'JTWrite where sJournalType = SJTWrite

-- Standalone deriving is used here because SJournalType is declared with GADT syntax.
deriving instance Show (SJournalType t)

-- | Initial state of a message queue for the given journal ID:
-- both read and write states refer to the same new journal,
-- writing is allowed and the size is zero.
newMsgQueueState :: ByteString -> MsgQueueState
newMsgQueueState journalId =
  MsgQueueState
    { writeState = newJournalState journalId,
      readState = newJournalState journalId,
      canWrite = True,
      size = 0
    }

-- | State of a new journal with the given ID: all positions and counts are zero,
-- and the journal type is taken from the 'JournalTypeI' instance of the result type.
newJournalState :: JournalTypeI t => ByteString -> JournalState t
newJournalState journalId = JournalState sJournalType journalId 0 0 0 0

-- | Path of the journal file with the given ID inside the directory:
-- @dir/messages.<journalId>.log@ (file name prefix and extension come from
-- 'msgLogFileName' and 'logFileExt').
journalFilePath :: FilePath -> ByteString -> FilePath
journalFilePath dir journalId = dir </> (msgLogFileName <> "." <> B.unpack journalId <> logFileExt)

-- | Single-line encoding of the queue state:
-- @write=<journal state> read=<journal state> canWrite=<bool> size=<int>@.
-- Fields are separated by single spaces, and the parser expects them
-- in exactly this order.
instance StrEncoding MsgQueueState where
  strEncode MsgQueueState {writeState, readState, canWrite, size} =
    B.unwords
      [ "write=" <> strEncode writeState,
        "read=" <> strEncode readState,
        "canWrite=" <> strEncode canWrite,
        "size=" <> strEncode size
      ]
  strP = do
    writeState <- "write=" *> strP
    -- the leading space in the following literals is the field separator
    readState <- " read=" *> strP
    canWrite <- " canWrite=" *> strP
    size <- " size=" *> strP
    pure MsgQueueState {writeState, readState, canWrite, size}

-- | Comma-separated encoding of the journal state:
-- @journalId,msgPos,msgCount,bytePos,byteCount@.
-- The journal type is not encoded; when parsing it is supplied
-- by the 'JournalTypeI' instance of the result type.
instance JournalTypeI t => StrEncoding (JournalState t) where
  strEncode JournalState {journalId, msgPos, msgCount, bytePos, byteCount} =
    B.intercalate "," [journalId, e msgPos, e msgCount, e bytePos, e byteCount]
    where
      e :: StrEncoding a => a -> ByteString
      e = strEncode
  strP = do
    -- journal ID is everything up to the first comma
    journalId <- A.takeTill (== ',')
    JournalState sJournalType journalId <$> i <*> i <*> i <*> i
    where
      -- each numeric field is preceded by a comma
      i :: Integral a => A.Parser a
      i = A.char ',' *> A.decimal

-- | Base name of the queue state log file.
queueLogFileName :: String
queueLogFileName = "queue_state"

-- | Base name of message journal files (see 'journalFilePath').
msgLogFileName :: String
msgLogFileName = "messages"

-- | Extension of log files, including the leading dot.
logFileExt :: String
logFileExt = ".log"

-- | IO action tagged with the queue store type @s@.
-- Functor, Applicative and Monad instances are reused from 'IO'
-- via newtype deriving.
newtype StoreIO (s :: QSType) a = StoreIO {unStoreIO :: IO a}
  deriving newtype (Functor, Applicative, Monad)

-- | Queue accessors are the record fields of 'JournalQueue'; locking delegates
-- to 'withLockWaitShared' with the recipient ID, queue lock and shared lock of the queue.
instance StoreQueueClass (JournalQueue s) where
  recipientId = recipientId'
  {-# INLINE recipientId #-}
  queueRec = queueRec'
  {-# INLINE queueRec #-}
  withQueueLock :: JournalQueue s -> Text -> IO a -> IO a
  withQueueLock JournalQueue {recipientId', queueLock, sharedLock} =
    -- the remaining arguments (Text and the IO action) are passed through point-free
    withLockWaitShared recipientId' queueLock sharedLock
  {-# INLINE withQueueLock #-}

-- Queue store instance for QStore, the wrapper over the concrete queue stores.
-- Apart from newQueueStore, every method delegates through withQS to the same
-- method of the wrapped store. The explicit type application @(JournalQueue s)
-- is used where the queue type does not occur in the method's type and so
-- cannot be inferred from the arguments.
instance QueueStoreClass (JournalQueue s) (QStore s) where
  type QueueStoreCfg (QStore s) = QStoreCfg s

  -- Creates the wrapped store that matches the configuration constructor.
  newQueueStore :: QStoreCfg s -> IO (QStore s)
  newQueueStore = \case
    -- the STM store takes unit as its configuration
    MQStoreCfg -> MQStore <$> newQueueStore @(JournalQueue s) ()
#if defined(dbServerPostgres)
    -- NOTE(review): the meaning of the True flag is defined by the Postgres store configuration
    PQStoreCfg cfg -> PQStore <$> newQueueStore @(JournalQueue s) (cfg, True)
#endif

  closeQueueStore = withQS (closeQueueStore @(JournalQueue s))
  {-# INLINE closeQueueStore #-}
  loadedQueues = withQS loadedQueues
  {-# INLINE loadedQueues #-}
  compactQueues = withQS (compactQueues @(JournalQueue s))
  {-# INLINE compactQueues #-}
  getEntityCounts = withQS (getEntityCounts @(JournalQueue s))
  {-# INLINE getEntityCounts #-}
  addQueue_ = withQS addQueue_
  {-# INLINE addQueue_ #-}
  getQueue_ = withQS getQueue_
  {-# INLINE getQueue_ #-}
  getQueues_ = withQS getQueues_
  {-# INLINE getQueues_ #-}
  addQueueLinkData = withQS addQueueLinkData
  {-# INLINE addQueueLinkData #-}
  getQueueLinkData = withQS getQueueLinkData
  {-# INLINE getQueueLinkData #-}
  deleteQueueLinkData = withQS deleteQueueLinkData
  {-# INLINE deleteQueueLinkData #-}
  secureQueue = withQS secureQueue
  {-# INLINE secureQueue #-}
  updateKeys = withQS updateKeys
  {-# INLINE updateKeys #-}
  addQueueNotifier = withQS addQueueNotifier
  {-# INLINE addQueueNotifier #-}
  deleteQueueNotifier = withQS deleteQueueNotifier
  {-# INLINE deleteQueueNotifier #-}
  suspendQueue = withQS suspendQueue
  {-# INLINE suspendQueue #-}
  blockQueue = withQS blockQueue
  {-# INLINE blockQueue #-}
  unblockQueue = withQS unblockQueue
  {-# INLINE unblockQueue #-}
  updateQueueTime = withQS updateQueueTime
  {-# INLINE updateQueueTime #-}
  deleteStoreQueue = withQS deleteStoreQueue
  {-# INLINE deleteStoreQueue #-}
  getCreateService = withQS (getCreateService @(JournalQueue s))
  {-# INLINE getCreateService #-}
  setQueueService = withQS setQueueService
  {-# INLINE setQueueService #-}
  getQueueNtfServices = withQS (getQueueNtfServices @(JournalQueue s))
  {-# INLINE getQueueNtfServices #-}
  getServiceQueueCount = withQS (getServiceQueueCount @(JournalQueue s))
  {-# INLINE getServiceQueueCount #-}

-- Builds a new in-memory JournalQueue value for the given recipient ID and queue record.
-- The queue shares the store's sharedLock and uses the lock passed by the caller as its queueLock.
-- The queue record is stored as Just qr, no message queue is attached (Nothing),
-- activeAt starts at 0 and queueState starts empty (Nothing).
makeQueue_ :: JournalMsgStore s -> RecipientId -> QueueRec -> Lock -> IO (JournalQueue s)
makeQueue_ JournalMsgStore {sharedLock} rId qr queueLock = do
  queueRec' <- newTVarIO $ Just qr
  msgQueue' <- newTVarIO Nothing
  activeAt <- newTVarIO 0
  queueState <- newTVarIO Nothing
  pure $
    JournalQueue
      { recipientId' = rId,
        queueLock,
        sharedLock,
        queueRec',
        msgQueue',
        activeAt,
        queueState
      }

-- Message store instance for the journal store.
-- The associated types below fix the concrete monad, message queue, queue store,
-- store queue and configuration types used with JournalMsgStore s.
instance MsgStoreClass (JournalMsgStore s) where
  type StoreMonad (JournalMsgStore s) = StoreIO s
  type MsgQueue (JournalMsgStore s) = JournalMsgQueue s
  type QueueStore (JournalMsgStore s) = QStore s
  type StoreQueue (JournalMsgStore s) = JournalQueue s
  type MsgStoreConfig (JournalMsgStore s) = JournalStoreConfig s

  -- Creates the journal message store from its configuration:
  -- a fresh random generator, an empty map of queue locks, an empty shared lock,
  -- the queue store built from queueStoreCfg, and a zero count of opened queues.
  -- expireBackupsBefore is the current time minus the configured expireBackupsAfter interval.
  newMsgStore :: JournalStoreConfig s -> IO (JournalMsgStore s)
  newMsgStore config@JournalStoreConfig {queueStoreCfg} = do
    random <- newTVarIO =<< newStdGen
    queueLocks <- TM.emptyIO
    sharedLock <- newEmptyTMVarIO
    queueStore_ <- newQueueStore @(JournalQueue s) queueStoreCfg
    openedQueueCount <- newTVarIO 0
    expireBackupsBefore <- addUTCTime (- expireBackupsAfter config) <$> getCurrentTime
    pure JournalMsgStore {config, random, queueLocks, sharedLock, queueStore_, openedQueueCount, expireBackupsBefore}

  -- Closes the message queue of every currently loaded queue, then closes the queue store.
  closeMsgStore :: JournalMsgStore s -> IO ()
  closeMsgStore ms = do
    let st = queueStore_ ms
    closeQueues $ loadedQueues @(JournalQueue s) st
    closeQueueStore @(JournalQueue s) st
    where
      -- reads the map of loaded queues once and closes each queue in it
      closeQueues qs = readTVarIO qs >>= mapM_ (closeMsgQueue ms)

  -- Runs the action on the loaded queues of the underlying queue store (via withLoadedQueues)
  -- and combines the results monoidally.
  withActiveMsgQueues :: Monoid a => JournalMsgStore s -> (JournalQueue s -> IO a) -> IO a
  withActiveMsgQueues = withQS withLoadedQueues . queueStore_

  -- This function can only be used in server CLI commands or before server is started.
  -- It does not cache queues and is NOT concurrency safe.
  -- For the in-memory queue store it visits the loaded queues; with the Postgres store
  -- it folds over queue records, creating a queue value for each record.
  -- In both cases the message queue is closed after the action has run for that queue.
  unsafeWithAllMsgQueues :: Monoid a => Bool -> JournalMsgStore s -> (JournalQueue s -> IO a) -> IO a
  unsafeWithAllMsgQueues tty ms action = case queueStore_ ms of
    MQStore st -> withLoadedQueues st run
#if defined(dbServerPostgres)
    PQStore st -> foldQueueRecs False tty st $ uncurry (mkQueue ms False) >=> run
#endif
    where
      -- runs the action, then closes the queue's message queue before returning the result
      run q = do
        r <- action q
        closeMsgQueue ms q
        pure r

  -- This function is concurrency safe
  expireOldMessages :: Bool -> JournalMsgStore s -> Int64 -> Int64 -> IO MessageStats
  expireOldMessages :: Bool -> JournalMsgStore s -> Int64 -> Int64 -> IO MessageStats
expireOldMessages Bool
tty JournalMsgStore s
ms Int64
now Int64
ttl = case JournalMsgStore s -> QStore s
forall (s :: QSType). JournalMsgStore s -> QStore s
queueStore_ JournalMsgStore s
ms of
    MQStore QStoreType 'QSMemory
st ->
      STMQueueStore (JournalQueue 'QSMemory)
-> (JournalQueue 'QSMemory -> IO MessageStats) -> IO MessageStats
forall a q s.
(Monoid a, QueueStoreClass q s) =>
s -> (q -> IO a) -> IO a
withLoadedQueues STMQueueStore (JournalQueue 'QSMemory)
QStoreType 'QSMemory
st ((JournalQueue 'QSMemory -> IO MessageStats) -> IO MessageStats)
-> (JournalQueue 'QSMemory -> IO MessageStats) -> IO MessageStats
forall a b. (a -> b) -> a -> b
$ \JournalQueue 'QSMemory
q -> ExceptT ErrorType IO MessageStats -> IO MessageStats
run (ExceptT ErrorType IO MessageStats -> IO MessageStats)
-> ExceptT ErrorType IO MessageStats -> IO MessageStats
forall a b. (a -> b) -> a -> b
$ JournalMsgStore s
-> StoreQueue (JournalMsgStore s)
-> Text
-> StoreMonad (JournalMsgStore s) MessageStats
-> ExceptT ErrorType IO MessageStats
forall s a.
MsgStoreClass s =>
s
-> StoreQueue s -> Text -> StoreMonad s a -> ExceptT ErrorType IO a
forall a.
JournalMsgStore s
-> StoreQueue (JournalMsgStore s)
-> Text
-> StoreMonad (JournalMsgStore s) a
-> ExceptT ErrorType IO a
isolateQueue JournalMsgStore s
ms StoreQueue (JournalMsgStore s)
JournalQueue 'QSMemory
q Text
"deleteExpiredMsgs" (StoreMonad (JournalMsgStore s) MessageStats
 -> ExceptT ErrorType IO MessageStats)
-> StoreMonad (JournalMsgStore s) MessageStats
-> ExceptT ErrorType IO MessageStats
forall a b. (a -> b) -> a -> b
$ do
        IO (Maybe QueueRec) -> StoreIO 'QSMemory (Maybe QueueRec)
forall (s :: QSType) a. IO a -> StoreIO s a
StoreIO (TVar (Maybe QueueRec) -> IO (Maybe QueueRec)
forall a. TVar a -> IO a
readTVarIO (TVar (Maybe QueueRec) -> IO (Maybe QueueRec))
-> TVar (Maybe QueueRec) -> IO (Maybe QueueRec)
forall a b. (a -> b) -> a -> b
$ JournalQueue 'QSMemory -> TVar (Maybe QueueRec)
forall q. StoreQueueClass q => q -> TVar (Maybe QueueRec)
queueRec JournalQueue 'QSMemory
q) StoreIO 'QSMemory (Maybe QueueRec)
-> (Maybe QueueRec -> StoreIO 'QSMemory MessageStats)
-> StoreIO 'QSMemory MessageStats
forall a b.
StoreIO 'QSMemory a
-> (a -> StoreIO 'QSMemory b) -> StoreIO 'QSMemory b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
          Just QueueRec {$sel:updatedAt:QueueRec :: QueueRec -> Maybe SystemDate
updatedAt = Just (RoundedSystemTime Int64
t)} | Int64
t Int64 -> Int64 -> Bool
forall a. Ord a => a -> a -> Bool
> Int64
veryOld ->
            JournalMsgStore s
-> Int64
-> Int64
-> StoreQueue (JournalMsgStore s)
-> StoreMonad (JournalMsgStore s) MessageStats
forall s.
MsgStoreClass s =>
s -> Int64 -> Int64 -> StoreQueue s -> StoreMonad s MessageStats
expireQueueMsgs JournalMsgStore s
ms Int64
now Int64
old StoreQueue (JournalMsgStore s)
JournalQueue 'QSMemory
q
          Maybe QueueRec
_ -> MessageStats -> StoreIO 'QSMemory MessageStats
forall a. a -> StoreIO 'QSMemory a
forall (f :: * -> *) a. Applicative f => a -> f a
pure MessageStats
newMessageStats
#if defined(dbServerPostgres)
    PQStore st -> do
      let JournalMsgStore {queueLocks, sharedLock} = ms
      foldRecentQueueRecs veryOld tty st $ \(rId, qr) -> do
        q <- mkQueue ms False rId qr
        withSharedWaitLock rId queueLocks sharedLock $ run $ tryStore' "deleteExpiredMsgs" rId $
          getLoadedQueue q >>= unStoreIO . expireQueueMsgs ms now old
#endif
    where
      old :: Int64
old = Int64
now Int64 -> Int64 -> Int64
forall a. Num a => a -> a -> a
- Int64
ttl
      veryOld :: Int64
veryOld = Int64
now Int64 -> Int64 -> Int64
forall a. Num a => a -> a -> a
- Int64
2 Int64 -> Int64 -> Int64
forall a. Num a => a -> a -> a
* Int64
ttl Int64 -> Int64 -> Int64
forall a. Num a => a -> a -> a
- Int64
86400
      run :: ExceptT ErrorType IO MessageStats -> IO MessageStats
      run :: ExceptT ErrorType IO MessageStats -> IO MessageStats
run = (Either ErrorType MessageStats -> MessageStats)
-> IO (Either ErrorType MessageStats) -> IO MessageStats
forall a b. (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (MessageStats -> Either ErrorType MessageStats -> MessageStats
forall b a. b -> Either a b -> b
fromRight MessageStats
newMessageStats) (IO (Either ErrorType MessageStats) -> IO MessageStats)
-> (ExceptT ErrorType IO MessageStats
    -> IO (Either ErrorType MessageStats))
-> ExceptT ErrorType IO MessageStats
-> IO MessageStats
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ExceptT ErrorType IO MessageStats
-> IO (Either ErrorType MessageStats)
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT
      -- Use cached queue if available.
      -- Also see the comment in loadQueue in PostgresQueueStore
      getLoadedQueue :: JournalQueue s -> IO (JournalQueue s)
      getLoadedQueue :: JournalQueue s -> IO (JournalQueue s)
getLoadedQueue JournalQueue s
q = JournalQueue s -> Maybe (JournalQueue s) -> JournalQueue s
forall a. a -> Maybe a -> a
fromMaybe JournalQueue s
q (Maybe (JournalQueue s) -> JournalQueue s)
-> IO (Maybe (JournalQueue s)) -> IO (JournalQueue s)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> RecipientId
-> TVar (Map RecipientId (JournalQueue s))
-> IO (Maybe (JournalQueue s))
forall k a. Ord k => k -> TMap k a -> IO (Maybe a)
TM.lookupIO (JournalQueue s -> RecipientId
forall q. StoreQueueClass q => q -> RecipientId
recipientId JournalQueue s
q) (QStore s -> TVar (Map RecipientId (JournalQueue s))
forall q s. QueueStoreClass q s => s -> TMap RecipientId q
loadedQueues (QStore s -> TVar (Map RecipientId (JournalQueue s)))
-> QStore s -> TVar (Map RecipientId (JournalQueue s))
forall a b. (a -> b) -> a -> b
$ JournalMsgStore s -> QStore s
forall (s :: QSType). JournalMsgStore s -> QStore s
queueStore_ JournalMsgStore s
ms)

  -- | Run 'logQueueState' for every queue visited by 'withActiveMsgQueues'.
  logQueueStates :: JournalMsgStore s -> IO ()
  logQueueStates ms = withActiveMsgQueues ms $ unStoreIO . logQueueState

  -- | Append the current in-memory 'MsgQueueState' of the queue to its state file.
  --
  -- Nothing is written when the queue has no message queue in 'msgQueue''
  -- or when that message queue has no open 'handles'.
  logQueueState :: JournalQueue s -> StoreIO s ()
  logQueueState q =
    StoreIO . void $
      readTVarIO (msgQueue' q)
        $>>= \mq -> readTVarIO (handles mq)
        -- the state is only written via the already open state handle
        $>>= (\hs -> (readTVarIO (state mq) >>= appendState (stateHandle hs)) $> Just ())

  -- | The queue store of this message store (the 'queueStore_' field).
  queueStore = queueStore_
  {-# INLINE queueStore #-}

  -- | Snapshot of the sizes of the in-memory maps and counters of the store.
  --
  -- Each value is read with a separate 'readTVarIO', so the counts are not
  -- taken in a single transaction.
  loadedQueueCounts :: JournalMsgStore s -> IO LoadedQueueCounts
  loadedQueueCounts ms = do
    let (qs, ns, nLocks_) = loaded
    loadedQueueCount <- M.size <$> readTVarIO qs
    loadedNotifierCount <- M.size <$> readTVarIO ns
    openJournalCount <- readTVarIO (openedQueueCount ms)
    queueLockCount <- M.size <$> readTVarIO (queueLocks ms)
    -- the in-memory queue store provides no notifier locks map, so 0 is reported for it
    notifierLockCount <- maybe (pure 0) (fmap M.size . readTVarIO) nLocks_
    pure LoadedQueueCounts {loadedQueueCount, loadedNotifierCount, openJournalCount, queueLockCount, notifierLockCount}
    where
      -- queues map, notifiers map and (when the store has one) notifier locks map
      loaded :: (TMap RecipientId (JournalQueue s), TMap NotifierId RecipientId, Maybe (TMap NotifierId Lock))
      loaded = case queueStore_ ms of
        MQStore STMQueueStore {queues, notifiers} -> (queues, notifiers, Nothing)
#if defined(dbServerPostgres)
        PQStore PostgresQueueStore {queues, notifiers, notifierLocks} -> (queues, notifiers, Just notifierLocks)
#endif

  -- | Create a 'JournalQueue' value for the given recipient ID and queue record.
  --
  -- When @keepLock@ is 'True' the lock is obtained from the store's 'queueLocks'
  -- map with 'getMapLock'; otherwise a new lock is made with 'createLockIO'.
  mkQueue :: JournalMsgStore s -> Bool -> RecipientId -> QueueRec -> IO (JournalQueue s)
  mkQueue ms keepLock rId qr = do
    lock <- if keepLock then atomically $ getMapLock (queueLocks ms) rId else createLockIO
    makeQueue_ ms rId qr lock

  -- | Get the message queue of the queue, opening or creating it if needed.
  --
  -- Returns the value already stored in 'msgQueue'' when there is one.
  -- Otherwise the queue is opened with 'openMsgQueue' if its directory exists,
  -- or created in memory; the result is stored in 'msgQueue'' and 'queueState'
  -- is set from the queue's 'MsgQueueState'.
  getMsgQueue :: JournalMsgStore s -> JournalQueue s -> Bool -> StoreIO s (JournalMsgQueue s)
  getMsgQueue ms@JournalMsgStore {random} q'@JournalQueue {recipientId' = rId, msgQueue'} forWrite =
    StoreIO $ readTVarIO msgQueue' >>= maybe newQ pure
    where
      newQ = do
        let dir = msgQueueDirectory ms rId
            statePath = msgQueueStatePath dir rId
            queue = JMQueue {queueDirectory = dir, statePath}
        q <- ifM (doesDirectoryExist dir) (openMsgQueue ms queue forWrite) (createQ queue)
        atomically $ writeTVar msgQueue' $ Just q
        st <- readTVarIO $ state q
        -- QState is forced before it is stored, so the TVar does not hold a thunk
        atomically $ writeTVar (queueState q') $ Just $! qState st
        pure q
        where
          createQ :: JMQueue -> IO (JournalMsgQueue s)
          createQ queue = do
            -- folder and files are not created here,
            -- to avoid file IO for queues without messages during subscription
            journalId <- newJournalId random
            mkJournalQueue queue (newMsgQueueState journalId) Nothing

  -- | Get the message queue together with its first pending message, if any.
  --
  -- When 'queueState' is known and has no pending messages, 'Nothing' is
  -- returned without calling 'getMsgQueue'.
  getPeekMsgQueue :: JournalMsgStore s -> JournalQueue s -> StoreIO s (Maybe (JournalMsgQueue s, Message))
  getPeekMsgQueue ms q@JournalQueue {queueState} =
    StoreIO (readTVarIO queueState) >>= \case
      Just QState {hasPending} -> if hasPending then peek else pure Nothing
      Nothing -> do
        -- We only close the queue if we just learnt it's empty.
        -- This is needed to reduce file descriptors and memory usage
        -- after the server just started and many clients subscribe.
        -- In case the queue became non-empty on write and then again empty on read
        -- we won't be closing it, to avoid frequent open/close on active queues.
        r <- peek
        when (isNothing r) $ StoreIO $ closeMsgQueue ms q
        pure r
    where
      -- opens the queue for reading (forWrite = False) and peeks the next message
      peek = do
        mq <- getMsgQueue ms q False
        (mq,) <$$> tryPeekMsg_ q mq

  -- | Run the action on the message queue if the queue is idle, and return the queue size.
  --
  -- only runs action if queue is not empty
  --
  -- * Queue not opened ('msgQueue'' is 'Nothing'): the queue is obtained via
  --   @getNonEmptyMsgQueue@; if it has stored messages the action is run and
  --   the queue is closed afterwards, otherwise the result is @(Nothing, 0)@.
  -- * Queue already opened: the action is run (and the queue then closed) only
  --   when @now - activeAt >= idleInterval config@; the size is returned either way.
  withIdleMsgQueue :: Int64 -> JournalMsgStore s -> JournalQueue s -> (JournalMsgQueue s -> StoreIO s a) -> StoreIO s (Maybe a, Int)
  withIdleMsgQueue now ms@JournalMsgStore {config} q@JournalQueue {queueState} action =
    StoreIO $ readTVarIO (msgQueue' q) >>= \case
      Nothing ->
        -- bracket guarantees the queue opened here is closed even if the action throws
        E.bracket
          getNonEmptyMsgQueue
          (mapM_ $ \_ -> closeMsgQueue ms q)
          (maybe (pure (Nothing, 0)) (unStoreIO . run))
        where
          run mq = do
            r <- action mq
            sz <- getQueueSize_ mq
            pure (Just r, sz)
      Just mq -> do
        ts <- readTVarIO $ activeAt q
        r <- if now - ts >= idleInterval config
          then Just <$> unStoreIO (action mq) `E.finally` closeMsgQueue ms q
          else pure Nothing
        sz <- unStoreIO $ getQueueSize_ mq
        pure (r, sz)
    where
      -- Returns the message queue only if it has stored messages.
      -- When the state is not known yet, the queue is opened to learn it
      -- and closed again straight away if it turns out to have no stored messages.
      getNonEmptyMsgQueue :: IO (Maybe (JournalMsgQueue s))
      getNonEmptyMsgQueue =
        readTVarIO queueState >>= \case
          Just QState {hasStored}
            | hasStored -> Just <$> unStoreIO (getMsgQueue ms q False)
            | otherwise -> pure Nothing
          Nothing -> do
            mq <- unStoreIO $ getMsgQueue ms q False
            -- queueState was updated in getMsgQueue
            readTVarIO queueState >>= \case
              Just QState {hasStored} | not hasStored -> closeMsgQueue ms q $> Nothing
              _ -> pure $ Just mq

  -- | Delete the queue via 'deleteQueue_' and return its queue record,
  -- discarding the message queue that 'deleteQueue_' also returns.
  deleteQueue :: JournalMsgStore s -> JournalQueue s -> IO (Either ErrorType QueueRec)
  deleteQueue ms q = fst <$$> deleteQueue_ ms q

  -- | Delete the queue via 'deleteQueue_' and return its queue record with the queue size.
  --
  -- The size is read from the 'MsgQueueState' of the message queue returned by
  -- 'deleteQueue_'; it is @-1@ when no message queue was returned.
  deleteQueueSize :: JournalMsgStore s -> JournalQueue s -> IO (Either ErrorType (QueueRec, Int))
  deleteQueueSize ms q =
    deleteQueue_ ms q >>= mapM (traverse getSize)
    -- traverse operates on the second tuple element
    where
      getSize = maybe (pure (-1)) (fmap size . readTVarIO . state)

   -- drainMsgs is never True with Journal storage
  -- Returns the messages currently stored in the queue.
  --
  -- With drainMsgs set, messages are read one at a time from the journal chosen by
  -- chooseReadJournal, and the read position is advanced after every message.
  -- Otherwise the in-memory queue state is read once and messages are loaded in bulk
  -- from the read journal and, when a separate write handle exists, from the write journal.
  -- In both modes the result is empty when the queue has no open handles.
  getQueueMessages_ :: Bool -> JournalQueue s -> JournalMsgQueue s -> StoreIO s [Message]
  getQueueMessages_ drainMsgs q' q
    | drainMsgs = StoreIO $ run []
    | otherwise = StoreIO $ readTVarIO (state q) >>= runFast
    where
      -- one step of the draining loop; msgs is the result used once the journals are exhausted
      run msgs =
        readTVarIO (handles q) >>= \case
          Nothing -> pure []
          Just hs -> getMsg msgs hs
      -- reads the message at the current read position, advances the position, then continues
      getMsg msgs hs =
        chooseReadJournal q' q drainMsgs hs >>= \case
          Nothing -> pure msgs
          Just (rs, h) -> do
            (msg, len) <- hGetMsgAt h (bytePos rs)
            updateReadPos q' q drainMsgs len hs
            (msg :) <$> run msgs
      -- bulk read based on a snapshot of the queue state; does not move the read position
      -- NOTE(review): getJournalRange presumably takes start and end byte offsets - confirm
      runFast MsgQueueState {writeState = ws, readState = rs, size}
        | size <= 0 = pure []
        | otherwise =
            readTVarIO (handles q) >>= \case
              Nothing -> pure []
              Just (MsgQueueHandles _ rh wh_) -> do
                fromRead <- getJournalRange rh (bytePos rs) (byteCount rs)
                fromWrite <- maybe (pure []) (\wh -> getJournalRange wh 0 (bytePos ws)) wh_
                pure (fromRead ++ fromWrite)

  -- Appends a message to the queue journal; runs inside isolateQueue for this queue.
  --
  -- Result:
  --   Just (msg, empty) - the message was written; empty is True if the queue had no messages before.
  --   Nothing           - either the quota is reached (a MessageQuota marker with the id and timestamp
  --                       of msg is written instead), or the queue is already not writable and not empty
  --                       (nothing is written).
  --
  -- updateActiveAt is called for the queue whether or not the write succeeds (E.finally).
  writeMsg :: JournalMsgStore s -> JournalQueue s -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool))
  writeMsg ms q' logState msg = isolateQueue ms q' "writeMsg" $ do
    q <- getMsgQueue ms q' True
    StoreIO $ (`E.finally` updateActiveAt q') $ do
      st@MsgQueueState {canWrite, size} <- readTVarIO (state q)
      let empty = size == 0
      if canWrite || empty
        then do
          -- the queue stays writable only while it holds fewer messages than the quota
          let canWrt' = quota > size
          if canWrt'
            then writeToJournal q st canWrt' msg $> Just (msg, empty)
            else writeToJournal q st canWrt' msgQuota $> Nothing
        else pure Nothing
    where
      JournalStoreConfig {quota, maxMsgCount} = config ms
      -- marker stored in place of the message when the quota is reached
      msgQuota = MessageQuota {msgId = messageId msg, msgTs = messageTs msg}
      -- Encodes msg' as one line, appends it to the current write journal and stores the new queue state.
      writeToJournal q st@MsgQueueState {writeState, readState = rs, size} canWrt' !msg' = do
        let msgStr = strEncode msg' `B.snoc` '\n'
            msgLen = fromIntegral $ B.length msgStr
        -- the queue folder and files are created lazily, on the first write
        hs <- maybe createQueueDir pure =<< readTVarIO handles
        (ws, wh) <- case writeHandle hs of
          -- reading and writing share one journal and it is full: start a separate write journal
          Nothing | msgCount writeState >= maxMsgCount -> switchWriteJournal hs
          -- otherwise keep writing to the write journal, or to the read journal if there is no separate one
          wh_ -> pure (writeState, fromMaybe (readHandle hs) wh_)
        let msgPos' = msgPos ws + 1
            bytePos' = bytePos ws + msgLen
            ws' = ws {msgPos = msgPos', msgCount = msgPos', bytePos = bytePos', byteCount = bytePos'}
            -- when both states refer to the same journal, the read state has to see the new totals
            rs' = if journalId ws == journalId rs then rs {msgCount = msgPos', byteCount = bytePos'} else rs
            !st' = st {writeState = ws', readState = rs', canWrite = canWrt', size = size + 1}
        hAppend wh (bytePos ws) msgStr
        updateQueueState q' q logState hs st' $
          -- The first message of an empty queue becomes the cached tip. Cache the message that was
          -- actually appended (msg'), so that the cached tip and msgLen describe the same journal record
          -- even when the quota marker is written in place of the original message.
          when (size == 0) $ writeTVar (tipMsg q) $ Just (Just (msg', msgLen))
        where
          JournalMsgQueue {queue = JMQueue {queueDirectory, statePath}, handles} = q
          -- creates the queue folder, opens the state file and the first journal, and registers the handles
          createQueueDir = do
            createDirectoryIfMissing True queueDirectory
            sh <- openFile statePath AppendMode
            rh <- createNewJournal queueDirectory $ journalId rs
            let hs = MsgQueueHandles {stateHandle = sh, readHandle = rh, writeHandle = Nothing}
            atomically $ writeTVar handles $ Just hs
            atomically $ modifyTVar' (openedQueueCount ms) (+ 1)
            pure hs
          -- creates a new journal with a fresh id and makes it the write journal of this queue
          switchWriteJournal hs = do
            journalId <- newJournalId $ random ms
            wh <- createNewJournal queueDirectory journalId
            atomically $ writeTVar handles $ Just $ hs {writeHandle = Just wh}
            pure (newJournalState journalId, wh)

  -- Marks the message queue as not writable, if it is currently loaded; does nothing otherwise.
  -- Can ONLY be used while restoring messages, not while the server is running.
  setOverQuota_ :: JournalQueue s -> IO ()
  setOverQuota_ q = readTVarIO (msgQueue' q) >>= mapM_ blockWrites
    where
      blockWrites mq = atomically $ modifyTVar' (state mq) $ \st -> st {canWrite = False}

  -- Number of messages in the queue, as recorded in the in-memory queue state.
  getQueueSize_ :: JournalMsgQueue s -> StoreIO s Int
  getQueueSize_ mq = StoreIO . fmap size $ readTVarIO (state mq)

  -- Returns the message at the head of the queue without removing it.
  --
  -- The result is Nothing when the queue has no open handles or when chooseReadJournal
  -- finds no journal to read from. The head message is cached in tipMsg together with
  -- its length; the journal file is only read when nothing is cached yet.
  tryPeekMsg_ :: JournalQueue s -> JournalMsgQueue s -> StoreIO s (Maybe Message)
  tryPeekMsg_ q mq@JournalMsgQueue {tipMsg, handles} =
    StoreIO $ readTVarIO handles $>>= chooseReadJournal q mq True $>>= peekMsg
    where
      peekMsg (rs, h) =
        readTVarIO tipMsg >>= \case
          -- already cached (possibly as "no message")
          Just cached -> pure $ fst <$> cached
          -- not cached: read the message at the current read position and remember it
          Nothing -> do
            tip@(msg, _) <- hGetMsgAt h (bytePos rs)
            atomically $ writeTVar tipMsg (Just (Just tip))
            pure (Just msg)

  -- Advance the read position past the cached tip message.
  --
  -- Nothing is done when there is no cached tip message, when the cached tip
  -- holds no message, or when the queue handles are not open. Otherwise
  -- 'updateReadPos' is called with the cached message length.
  -- When @logState@ is True, 'updateActiveAt' runs afterwards, including when
  -- the update throws (it is attached with 'E.finally').
  tryDeleteMsg_ :: JournalQueue s -> JournalMsgQueue s -> Bool -> StoreIO s ()
  tryDeleteMsg_ q mq@JournalMsgQueue {tipMsg, handles} logState = StoreIO $ (`E.finally` when logState (updateActiveAt q)) $
    void $
      readTVarIO tipMsg -- if there is no cached tipMsg, do nothing
        $>>= (pure . fmap snd)
        $>>= \len -> readTVarIO handles
        $>>= \hs -> updateReadPos q mq logState len hs $> Just ()

  -- Run a store action under the queue lock ('withQueueLock'), converting any
  -- exception into a logged 'ErrorType' via 'tryStore''.
  -- The store argument is not used; @op@ is the operation name passed to both
  -- the lock and the error report.
  isolateQueue :: JournalMsgStore s -> JournalQueue s -> Text -> StoreIO s a -> ExceptT ErrorType IO a
  isolateQueue _ sq op = tryStore' op (recipientId' sq) . withQueueLock sq op . unStoreIO

  -- Run a store action directly, without taking the queue lock.
  -- Any exception is reported through 'storeError' (which logs it) and then
  -- re-thrown unchanged to the caller.
  unsafeRunStore :: JournalQueue s -> Text -> StoreIO s a -> IO a
  unsafeRunStore sq op a =
    unStoreIO a `E.catch` \e -> storeError op (recipientId' sq) e >> E.throwIO e

-- | Store the current system time, in whole seconds, in the queue's
-- @activeAt@ variable.
updateActiveAt :: JournalQueue s -> IO ()
updateActiveAt q = atomically . writeTVar (activeAt q) . systemSeconds =<< getSystemTime

-- | Variant of 'tryStore' for actions that do not return 'Either':
-- the result is wrapped in 'Right' before being passed on.
tryStore' :: Text -> RecipientId -> IO a -> ExceptT ErrorType IO a
tryStore' op rId = tryStore op rId . fmap Right

-- | Run a store action with asynchronous exceptions masked ('E.mask_'),
-- turning any exception it throws into a 'Left' produced by 'storeError'.
--
-- @op@ and @rId@ identify the operation and queue in the error report.
tryStore :: forall a. Text -> RecipientId -> IO (Either ErrorType a) -> ExceptT ErrorType IO a
tryStore op rId a = ExceptT $ E.mask_ $ a `E.catch` storeError op rId

-- | Log a store exception and return it as a @STORE@ error.
--
-- The message is the operation name, the string-encoded recipient ID and the
-- shown exception, joined with @", "@. It is logged with the @"STORE: "@
-- prefix and returned (without the prefix) inside 'Left'.
storeError :: Text -> RecipientId -> E.SomeException -> IO (Either ErrorType a)
storeError op rId e =
  let e' = T.intercalate ", " [op, decodeLatin1 $ strEncode rId, tshow e]
   in logError ("STORE: " <> e') $> Left (STORE e')

-- | Run an action for a queue identified only by its recipient ID.
--
-- The action runs inside 'withLockMapWaitShared', using the store's
-- @queueLocks@ map and @sharedLock@, and exceptions are converted to
-- 'ErrorType' by 'tryStore'.
isolateQueueId :: Text -> JournalMsgStore s -> RecipientId -> IO (Either ErrorType a) -> ExceptT ErrorType IO a
isolateQueueId op JournalMsgStore {queueLocks, sharedLock} rId =
  tryStore op rId . withLockMapWaitShared rId queueLocks sharedLock op

-- | Open the message queue stored in the given queue directory.
--
-- The queue state is read from @statePath@ with 'readQueueState', which also
-- reports whether the state file should be backed up:
--
-- * no usable state: a new state with a fresh journal ID is created, the old
--   state file is renamed to a timed backup when requested, and the queue is
--   returned without open handles;
-- * state with @size == 0@: the existing journals are removed and a new state
--   is created; handles are opened only when @forWrite@ is True;
-- * otherwise: the state file and journals are opened and the queue is
--   returned with handles.
--
-- The store's @openedQueueCount@ is incremented whenever handles are opened.
openMsgQueue :: JournalMsgStore s -> JMQueue -> Bool -> IO (JournalMsgQueue s)
openMsgQueue ms@JournalMsgStore {config} q@JMQueue {queueDirectory = dir, statePath} forWrite = do
  (st_, shouldBackup) <- readQueueState ms statePath
  case st_ of
    Nothing -> do
      st <- newMsgQueueState <$> newJournalId (random ms)
      when shouldBackup $ backupQueueState statePath -- rename invalid state file
      mkJournalQueue q st Nothing
    Just st
      | size st == 0 -> do
          (st', hs_) <- removeJournals st shouldBackup
          when (isJust hs_) incOpenedCount
          mkJournalQueue q st' hs_
      | otherwise -> do
          sh <- openBackupQueueState st shouldBackup
          -- the state handle is closed if opening the journals fails
          (st', rh, wh_) <- closeOnException sh $ openJournals ms dir st sh
          let hs = MsgQueueHandles {stateHandle = sh, readHandle = rh, writeHandle = wh_}
          incOpenedCount
          mkJournalQueue q st' (Just hs)
  where
    incOpenedCount = atomically $ modifyTVar' (openedQueueCount ms) (+ 1)
    -- If the queue is empty, journals are deleted.
    -- New journal is created if queue is written to.
    -- canWrite is set to True.
    removeJournals MsgQueueState {readState = rs, writeState = ws} shouldBackup = E.uninterruptibleMask_ $ do
      rjId <- newJournalId $ random ms
      let st = newMsgQueueState rjId
      hs_ <-
        if forWrite
          then Just <$> newJournalHandles st rjId
          else Nothing <$ backupQueueState statePath
      removeJournalIfExists dir rs
      -- read and write states may refer to the same journal file
      unless (journalId ws == journalId rs) $ removeJournalIfExists dir ws
      pure (st, hs_)
      where
        -- opens the state file, records the new state and creates the new journal
        newJournalHandles st rjId = do
          sh <- openBackupQueueState st shouldBackup
          appendState_ sh st
          rh <- closeOnException sh $ createNewJournal dir rjId
          pure MsgQueueHandles {stateHandle = sh, readHandle = rh, writeHandle = Nothing}
    -- Opens the state file for appending, first replacing it with a file
    -- containing only the current state when a backup was requested.
    openBackupQueueState st shouldBackup
      | shouldBackup = do
          -- State backup is made in two steps to mitigate the crash during the backup.
          -- Temporary backup file will be used when it is present.
          let tempBackup = statePath <> ".bak"
          renameFile statePath tempBackup -- 1) temp backup
          sh <- openFile statePath AppendMode
          closeOnException sh $ appendState sh st -- 2) save state to new file
          backupQueueState tempBackup -- 3) timed backup
          pure sh
      | otherwise = openFile statePath AppendMode
    -- Renames the file at the given path to a timestamped backup of the state
    -- file, then removes backups older than expireBackupsBefore, always
    -- leaving at least keepMinBackups of the most recent ones.
    backupQueueState path = do
      ts <- getCurrentTime
      renameFile path $ stateBackupPath statePath ts
      -- remove old backups
      times <- sort . mapMaybe backupPathTime <$> listDirectory dir
      let toDelete = filter (< expireBackupsBefore ms) $ take (length times - keepMinBackups config) times
      mapM_ (safeRemoveFile "removeBackups" . stateBackupPath statePath) toDelete
      where
        -- parses the ISO 8601 timestamp between "<state file name>." and ".bak"
        backupPathTime :: FilePath -> Maybe UTCTime
        backupPathTime = iso8601ParseM . T.unpack <=< T.stripSuffix ".bak" <=< T.stripPrefix statePathPfx . T.pack
        statePathPfx = T.pack $ takeFileName statePath <> "."

-- | Build the in-memory message queue record.
--
-- Creates new 'TVar's holding the given state, an empty tip-message cache
-- ('Nothing') and the optional open handles.
mkJournalQueue :: JMQueue -> MsgQueueState -> Maybe MsgQueueHandles -> IO (JournalMsgQueue s)
mkJournalQueue queue st hs_ = do
  state <- newTVarIO st
  tipMsg <- newTVarIO Nothing
  handles <- newTVarIO hs_
  -- using the same queue lock which is currently locked,
  -- to avoid map lookup on queue operations
  -- NOTE(review): no lock is stored in the record built here; the comment above may be outdated - confirm
  pure JournalMsgQueue {queue, state, tipMsg, handles}

-- | Choose the journal state and handle to read the next message from.
--
-- * When the read journal is exhausted (@msgPos >= msgCount@), differs from
--   the write journal, and a write handle is open: reading switches to the
--   write journal. The old read handle is closed, the old journal file is
--   removed when @log'@ is True, and the new read state is saved with
--   'updateQueueState'.
-- * When the read journal is exhausted and is also the write journal:
--   returns 'Nothing'.
-- * Otherwise: returns the current read state and read handle.
chooseReadJournal :: JournalQueue s -> JournalMsgQueue s -> Bool -> MsgQueueHandles -> IO (Maybe (JournalState 'JTRead, Handle))
chooseReadJournal q' q log' hs = do
  st@MsgQueueState {writeState = ws, readState = rs} <- readTVarIO (state q)
  case writeHandle hs of
    Just wh | msgPos rs >= msgCount rs && journalId rs /= journalId ws -> do
      -- switching to write journal
      atomically $ writeTVar (handles q) $ Just hs {readHandle = wh, writeHandle = Nothing}
      hClose $ readHandle hs
      when log' $ removeJournal (queueDirectory $ queue q) rs
      -- the new read state starts from a fresh journal state for the write
      -- journal ID, keeping only the write journal's message and byte counts
      let !rs' = (newJournalState $ journalId ws) {msgCount = msgCount ws, byteCount = byteCount ws}
          !st' = st {readState = rs'}
      updateQueueState q' q log' hs st' $ pure ()
      pure $ Just (rs', wh)
    _ | msgPos rs >= msgCount rs && journalId rs == journalId ws -> pure Nothing
    _ -> pure $ Just (rs, readHandle hs)

-- | Validate and save a new queue state.
--
-- Throws a 'userError' 'IOError' when 'validQueueState' rejects the state.
-- When @log'@ is True the state is first appended to the state file.
-- The queue's @queueState@ is then set to the (strictly evaluated) 'qState'
-- of the new state, and finally, in a separate transaction, the message
-- queue's @state@ is replaced and the supplied STM action @a@ is run.
updateQueueState :: JournalQueue s -> JournalMsgQueue s -> Bool -> MsgQueueHandles -> MsgQueueState -> STM () -> IO ()
updateQueueState q' q log' hs st a = do
  unless (validQueueState st) $ E.throwIO $ userError $ "updateQueueState invalid state: " <> show st
  when log' $ appendState (stateHandle hs) st
  atomically $ writeTVar (queueState q') $ Just $! qState st
  atomically $ writeTVar (state q) st >> a

-- | Append the encoded state to the handle (see 'appendState_') inside
-- 'E.uninterruptibleMask_', so the write cannot be interrupted by an
-- asynchronous exception.
appendState :: Handle -> MsgQueueState -> IO ()
appendState h = E.uninterruptibleMask_ . appendState_ h
{-# INLINE appendState #-}

-- | Write the string-encoded state followed by a newline to the handle.
-- No exception masking is applied here; see 'appendState'.
appendState_ :: Handle -> MsgQueueState -> IO ()
appendState_ h st = B.hPutStr h $ strEncode st `B.snoc` '\n'

-- | Advance the read position of the queue past one message of @len@ bytes.
--
-- Reads the current state from the queue's @state@ TVar, increments the read
-- journal's @msgPos@ by one and its @bytePos@ by @len@, decrements @size@,
-- and hands the new state to 'updateQueueState' together with an STM action
-- that resets @tipMsg@ to 'Nothing'.
updateReadPos :: JournalQueue s -> JournalMsgQueue s -> Bool -> Int64 -> MsgQueueHandles -> IO ()
updateReadPos q' q log' len hs = do
  st@MsgQueueState {readState = rs, size} <- readTVarIO (state q)
  let rs' = rs {msgPos = msgPos rs + 1, bytePos = bytePos rs + len}
      st' = st {readState = rs', size = size - 1}
      -- the cached tip message is no longer valid once the read position moved
      clearTip = writeTVar (tipMsg q) Nothing
  updateQueueState q' q log' hs st' clearTip

-- | Directory of a queue inside the store.
--
-- The 'strEncode'd recipient ID is cut into path segments that are joined
-- with @/@ and appended to @storePath@: segments are 2 characters long until
-- the segment counter (starting at @pathParts@) reaches 1, at which point the
-- whole remainder becomes the final segment. Splitting also stops as soon as
-- the encoded ID is exhausted.
msgQueueDirectory :: JournalMsgStore s -> RecipientId -> FilePath
msgQueueDirectory JournalMsgStore {config = JournalStoreConfig {storePath, pathParts}} rId =
  storePath </> B.unpack relativePath
  where
    relativePath = B.intercalate "/" (splitSegments pathParts (strEncode rId))
    splitSegments :: Int -> ByteString -> [ByteString]
    splitSegments n s
      | B.null s = []
      | n == 1 = [s]
      | otherwise =
          let (seg, rest) = B.splitAt 2 s
           in seg : splitSegments (n - 1) rest

-- | Path of the queue state file inside the queue directory:
-- @dir/<queueLogFileName>.<encoded recipient ID><logFileExt>@.
msgQueueStatePath :: FilePath -> RecipientId -> FilePath
msgQueueStatePath dir rId = dir </> stateFileName
  where
    stateFileName = concat [queueLogFileName, ".", B.unpack (strEncode rId), logFileExt]

-- | Open (creating it if necessary) the journal file for the given journal ID
-- in the queue directory and return its read/write handle.
--
-- The caller owns the returned handle and is responsible for closing it.
createNewJournal :: FilePath -> ByteString -> IO Handle
createNewJournal dir journalId = do
  -- TODO retry if file exists
  h <- openFile (journalFilePath dir journalId) ReadWriteMode
  -- an empty write, kept from the original implementation
  h <$ B.hPutStr h ""

-- | Generate a new journal ID: 12 random bytes drawn from the shared
-- generator (which is advanced atomically), passed through 'strEncode'.
newJournalId :: TVar StdGen -> IO ByteString
newJournalId g = do
  randomBytes <- atomically $ stateTVar g (genByteString 12)
  pure $ strEncode randomBytes

-- | Open the journal file(s) referenced by the queue state, repairing the
-- state when files are missing.
--
-- Returns the (possibly amended) state, the read journal handle and, only
-- when the write journal is a different file that exists, its handle.
--
-- Cases handled:
--
-- * read file missing, same journal used for writing: a new empty journal is
--   created and a fresh state is appended to the state file handle @sh@;
-- * read file missing, separate write journal: the write journal becomes the
--   read journal (or a new journal is created if it is missing too);
-- * read file present, same journal used for writing: the file is fixed to
--   the write byte position;
-- * read file present, separate write journal: the read file is fixed to its
--   byte count, then the write journal is opened; if it is missing, writing
--   continues at the end of the read journal.
--
-- Handles opened here are closed if a subsequent step throws.
openJournals :: JournalMsgStore s -> FilePath -> MsgQueueState -> Handle -> IO (MsgQueueState, Handle, Maybe Handle)
openJournals ms dir st@MsgQueueState {readState = rs, writeState = ws} sh = do
  readJournal <- openJournal rs
  case readJournal of
    Left path
      | sameJournal -> do
          logError $ "STORE: openJournals, no read/write file - creating new file, " <> T.pack path
          newReadJournal
      | otherwise -> switchToWriteJournal path
    Right rh
      | sameJournal -> do
          closeOnException rh $ fixFileSize rh $ bytePos ws
          pure (st, rh, Nothing)
      | otherwise ->
          closeOnException rh $ do
            fixFileSize rh $ byteCount rs
            openWriteJournal rh
  where
    rjId = journalId rs
    wjId = journalId ws
    sameJournal = rjId == wjId
    -- The read journal file is missing: try to read from the write journal instead.
    switchToWriteJournal :: FilePath -> IO (MsgQueueState, Handle, Maybe Handle)
    switchToWriteJournal path = do
      let rs' = (newJournalState wjId) {msgCount = msgCount ws, byteCount = byteCount ws}
          st' = st {readState = rs', size = msgCount ws}
      writeAsRead <- openJournal rs'
      case writeAsRead of
        Left path' -> do
          logError $ "STORE: openJournals, no read and write files - creating new file, read: " <> T.pack path <> ", write: " <> T.pack path'
          newReadJournal
        Right rh -> do
          logError $ "STORE: openJournals, no read file - switched to write file, " <> T.pack path
          closeOnException rh $ fixFileSize rh $ bytePos ws
          pure (st', rh, Nothing)
    -- The read journal is open (handle rh): open the separate write journal.
    openWriteJournal :: Handle -> IO (MsgQueueState, Handle, Maybe Handle)
    openWriteJournal rh = do
      writeJournal <- openJournal ws
      case writeJournal of
        Left path -> do
          let msgs = msgCount rs
              bytes = byteCount rs
              ws' = (newJournalState rjId) {msgPos = msgs, msgCount = msgs, bytePos = bytes, byteCount = bytes}
              st' = st {writeState = ws', size = msgs - msgPos rs} -- we don't amend canWrite to trigger QCONT
          logError $ "STORE: openJournals, no write file, " <> T.pack path
          pure (st', rh, Nothing)
        Right wh -> do
          closeOnException wh $ fixFileSize wh $ bytePos ws
          pure (st, rh, Just wh)
    -- Create a new empty journal and record the fresh queue state in the state file.
    newReadJournal :: IO (MsgQueueState, Handle, Maybe Handle)
    newReadJournal = do
      rjId' <- newJournalId $ random ms
      rh <- createNewJournal dir rjId'
      let st' = newMsgQueueState rjId'
      closeOnException rh $ appendState sh st'
      pure (st', rh, Nothing)
    -- Open the journal file if it exists, otherwise return its path in Left.
    openJournal :: JournalState t -> IO (Either FilePath Handle)
    openJournal JournalState {journalId} = do
      let path = journalFilePath dir journalId
      exists <- doesFileExist path
      if exists
        then Right <$> openFile path ReadWriteMode
        else pure $ Left path
    -- do that for all append operations

-- | Make the size of the file behind the handle equal to @pos@.
--
-- * If the file is larger than @pos@, a warning is logged and the file is
--   truncated to @pos@.
-- * If the file is smaller than @pos@, an 'IOError' ('userError') is thrown.
-- * If the sizes match, nothing is done.
fixFileSize :: Handle -> Int64 -> IO ()
fixFileSize h pos = do
  size <- IO.hFileSize h
  case compare size pos' of
    GT -> do
      name <- IO.hShow h
      logWarn $ "STORE: fixFileSize, size " <> tshow size <> " > pos " <> tshow pos <> " - truncating, " <> T.pack name
      IO.hSetFileSize h pos'
    LT -> do
      -- From code logic this can't happen.
      name <- IO.hShow h
      E.throwIO $ userError $ "fixFileSize size " <> show size <> " < pos " <> show pos <> " - aborting: " <> name
    EQ -> pure ()
  where
    pos' :: Integer
    pos' = fromIntegral pos

-- | Remove the journal file of the given journal state from the queue
-- directory; failures are handled by 'safeRemoveFile'.
removeJournal :: FilePath -> JournalState t -> IO ()
removeJournal dir JournalState {journalId} = safeRemoveFile "removeJournal" journalPath
  where
    journalPath = journalFilePath dir journalId

-- | Remove the journal file of the given journal state if it exists.
-- The existence check and the removal both run under 'handleError', so any
-- exception is logged rather than propagated.
removeJournalIfExists :: FilePath -> JournalState t -> IO ()
removeJournalIfExists dir JournalState {journalId} =
  handleError "removeJournalIfExists" path $ do
    exists <- doesFileExist path
    when exists $ removeFile path
  where
    path = journalFilePath dir journalId

-- | Remove a file, logging (instead of propagating) any exception.
-- @cxt@ is the context label included in the log message.
safeRemoveFile :: Text -> FilePath -> IO ()
safeRemoveFile cxt path = handleError cxt path (removeFile path)

-- | Run an action, catching any exception via 'catchAny' and logging it as
-- @STORE: <context>, <path>, <exception>@ instead of rethrowing it.
handleError :: Text -> FilePath -> IO () -> IO ()
handleError cxt path a =
  catchAny a (\e -> logError $ "STORE: " <> cxt <> ", " <> T.pack path <> ", " <> tshow e)

-- | Read the latest queue state from the state file.
--
-- This function is supposed to be resilient to crashes while updating state files,
-- and also resilient to crashes during its execution.
--
-- If a temporary backup (@statePath <> ".bak"@) exists, it is first renamed
-- back to @statePath@. If neither file exists the result is @(Nothing, False)@.
--
-- Only the tail of the file (at most @stateTailSize config@ bytes) is read.
-- The last line is decoded; if it is invalid, the line before it is used.
--
-- Returns the decoded state (if any) and a flag that is 'True' when the tail
-- has more than @maxStateLines config@ lines or when the last line was
-- invalid (named @shouldBackup@ in the original code).
--
-- Throws 'IOError' ('userError') when both the last and the previous lines
-- fail to decode, or when the decoded state fails 'validQueueState'.
readQueueState :: JournalMsgStore s -> FilePath -> IO (Maybe MsgQueueState, Bool)
readQueueState JournalMsgStore {config} statePath =
  ifM
    (doesFileExist tempBackup)
    (renameFile tempBackup statePath >> readState)
    (ifM (doesFileExist statePath) readState $ pure (Nothing, False))
  where
    tempBackup = statePath <> ".bak"
    readState :: IO (Maybe MsgQueueState, Bool)
    readState = do
      ls <- B.lines <$> readFileTail
      -- matching on the reversed list gives the last line and the lines
      -- preceding it (nearest first) without using partial functions
      case reverse ls of
        [] -> do
          logWarn $ "STORE: readWriteQueueState, empty queue state, " <> T.pack statePath
          pure (Nothing, False)
        lastLine : prevLines -> do
          r <- useLastLine (length ls) lastLine prevLines
          forM_ (fst r) $ \st ->
            unless (validQueueState st) $ E.throwIO $ userError $ "readWriteQueueState inconsistent state: " <> show st
          pure r
    useLastLine :: Int -> ByteString -> [ByteString] -> IO (Maybe MsgQueueState, Bool)
    useLastLine len lastLine prevLines = case strDecode lastLine of
      -- when state file has fewer than maxStateLines, we don't compact it
      Right st -> pure (Just st, len > maxStateLines config)
      -- if the last line failed to parse
      Left _ -> case prevLines of
        [] -> do
          logWarn $ "STORE: readWriteQueueState, invalid 1-line queue state - initialized, " <> T.pack statePath
          pure (Nothing, True) -- backup state file, because last line was invalid
        prevLine : _ -> do
          -- or use the previous line
          logWarn $ "STORE: readWriteQueueState, invalid last line in queue state - using the previous line, " <> T.pack statePath
          case strDecode prevLine of
            Right st -> pure (Just st, True) -- backup state file, because last line was invalid
            Left e -> E.throwIO $ userError $ "readWriteQueueState invalid state " <> statePath <> ": " <> show e
    -- Read at most stateTailSize bytes from the end of the state file.
    readFileTail :: IO ByteString
    readFileTail =
      IO.withFile statePath ReadMode $ \h -> do
        size <- IO.hFileSize h
        let sz = stateTailSize config
            sz' = fromIntegral sz
        if size > sz'
          then IO.hSeek h AbsoluteSeek (size - sz') >> B.hGet h sz
          else B.hGet h (fromIntegral size)

-- | Path of a timestamped backup of the state file:
-- @<statePath>.<ISO 8601 timestamp>.bak@.
stateBackupPath :: FilePath -> UTCTime -> FilePath
stateBackupPath statePath ts = concat [statePath, ".", iso8601Show ts, ".bak"]

-- | Checks that the read and write journal states of a queue are consistent
-- with each other and with the recorded queue size.
--
-- Some invariants must hold in every case; further ones depend on whether
-- the read and write states refer to the same journal (equal 'journalId').
validQueueState :: MsgQueueState -> Bool
validQueueState MsgQueueState {readState = r, writeState = w, size} =
  and $ commonChecks <> journalChecks
  where
    -- messages counted in the read journal beyond the current read position
    pendingInRead = msgCount r - msgPos r
    -- invariants required regardless of how many journals are in use:
    -- read positions do not exceed the read counts, and write positions
    -- are exactly equal to the write counts
    commonChecks =
      [ msgPos r <= msgCount r,
        bytePos r <= byteCount r,
        msgPos w == msgCount w,
        bytePos w == byteCount w
      ]
    journalChecks
      -- same journal: read state must not be ahead of write state, the counts
      -- must agree, and the size is what remains after the read position
      | journalId r == journalId w =
          [ msgPos r <= msgPos w,
            msgCount r == msgCount w,
            bytePos r <= bytePos w,
            byteCount r == byteCount w,
            size == pendingInRead
          ]
      -- different journals: the size also includes everything counted in the
      -- write journal
      | otherwise =
          [size == msgCount w + pendingInRead]

-- | Deletes a queue from the queue store together with its message queue data.
--
-- The whole operation is wrapped in 'isolateQueueId' for the queue's recipient
-- ID. When 'deleteStoreQueue' succeeds, the in-memory message queue is detached
-- (the @msgQueue'@ variable is set to 'Nothing'), its handles are closed and
-- the queue directory is removed. The entry for this recipient ID is then
-- deleted from 'queueLocks' whether or not the store deletion succeeded.
--
-- Returns the deleted queue record paired with the message queue that was
-- attached at the time of deletion (if any), or the error from the queue store.
deleteQueue_ :: JournalMsgStore s -> JournalQueue s -> IO (Either ErrorType (QueueRec, Maybe (JournalMsgQueue s)))
deleteQueue_ ms q =
  runExceptT . isolateQueueId "deleteQueue_" ms rId $
    (deleteStoreQueue (queueStore_ ms) q >>= traverse cleanup)
      -- runs after the deletion attempt and keeps its result
      <* atomically (TM.delete rId $ queueLocks ms)
  where
    rId = recipientId q
    -- only runs for a successfully deleted queue record
    cleanup qr = do
      detached <- atomically $ swapTVar (msgQueue' q) Nothing
      forM_ detached $ closeMsgQueueHandles ms
      removeQueueDirectory ms rId
      pure (qr, detached)

-- | Detaches the message queue from a journal queue by atomically setting its
-- @msgQueue'@ variable to 'Nothing', then closes the handles of the message
-- queue that was attached, if there was one.
closeMsgQueue :: JournalMsgStore s -> JournalQueue s -> IO ()
closeMsgQueue ms JournalQueue {msgQueue'} = do
  detached <- atomically $ swapTVar msgQueue' Nothing
  forM_ detached $ closeMsgQueueHandles ms

-- | Closes the file handles currently stored in the message queue, if any,
-- and decrements the store's 'openedQueueCount' by one.
--
-- Nothing happens when the queue has no handles. The @handles@ variable itself
-- is only read here, it is not reset.
closeMsgQueueHandles :: JournalMsgStore s -> JournalMsgQueue s -> IO ()
closeMsgQueueHandles ms q = do
  opened <- readTVarIO $ handles q
  forM_ opened $ \(MsgQueueHandles stateH readH writeH_) -> do
    hClose stateH
    hClose readH
    -- the third handle is optional
    forM_ writeH_ hClose
    atomically $ modifyTVar' (openedQueueCount ms) (\n -> n - 1)

-- | Removes the message queue directory that 'msgQueueDirectory' gives for
-- this store and recipient ID.
removeQueueDirectory :: JournalMsgStore s -> RecipientId -> IO ()
removeQueueDirectory st rId = removeQueueDirectory_ $ msgQueueDirectory st rId

-- | Forcibly removes the given path, running the removal under 'handleError'
-- with the "removeQueueDirectory" label and the path as context.
removeQueueDirectory_ :: FilePath -> IO ()
removeQueueDirectory_ dir =
  handleError "removeQueueDirectory" dir (removePathForcibly dir)

-- | Writes the string at the end of the file behind the handle.
--
-- 'fixFileSize' is called with the expected position first (presumably so
-- that the end of the file matches that position - confirm against its
-- definition), then the handle is moved to the end of the file and the
-- string is written there.
hAppend :: Handle -> Int64 -> ByteString -> IO ()
hAppend h pos s =
  fixFileSize h pos
    >> IO.hSeek h SeekFromEnd 0
    >> B.hPutStr h s

-- | Reads one line starting at the given absolute byte position and decodes
-- it as a message.
--
-- Returns the message together with the length of the line plus one (the
-- line returned by 'B.hGetLine' does not include its terminator). Both parts
-- of the result are forced before they are returned.
--
-- Throws a user 'IOError' when the line cannot be decoded.
hGetMsgAt :: Handle -> Int64 -> IO (Message, Int64)
hGetMsgAt h pos = do
  IO.hSeek h AbsoluteSeek (fromIntegral pos)
  line <- B.hGetLine h
  either invalid (accept line) (strDecode line)
  where
    invalid err = E.throwIO . userError $ "hGetMsgAt invalid message: " <> err
    accept line !msg =
      let !len = fromIntegral (B.length line) + 1
       in pure (msg, len)

-- | Opens a file in the given mode and switches the handle to line buffering.
--
-- If setting the buffering mode fails, or an asynchronous exception arrives
-- before the handle is handed to the caller, the handle is closed with this
-- module's 'hClose' so that the file descriptor is not leaked; the original
-- exception is then re-thrown.
openFile :: FilePath -> IOMode -> IO Handle
openFile f mode =
  E.bracketOnError (IO.openFile f mode) hClose $ \h ->
    IO.hSetBuffering h LineBuffering $> h

-- | Closes the handle without propagating failures: any exception raised by
-- 'IO.hClose' is caught and logged as an error together with the textual
-- description of the handle.
hClose :: Handle -> IO ()
hClose h =
  catchAny
    (IO.hClose h)
    ( \e ->
        IO.hShow h >>= \name ->
          logError ("STORE: hClose, " <> T.pack name <> ", " <> tshow e)
    )

-- | Runs the action and, only if it throws, closes the handle with this
-- module's 'hClose' before the exception propagates. The handle is left open
-- when the action completes normally.
closeOnException :: Handle -> IO a -> IO a
closeOnException h action = E.onException action (hClose h)

-- | Reads all messages of a queue directly from its journal files, based on
-- the queue state read with 'readQueueState'.
--
-- When the state is present and its size is positive, messages are read from
-- the read journal between the read state's byte position and byte count.
-- If the write state refers to a different journal, the messages from the
-- start of that journal up to the write state's byte position are appended.
-- In all other cases the result is an empty list.
--
-- Journal files are opened read-only: this function only seeks and reads, so
-- it should neither take the exclusive lock that GHC applies to handles
-- opened for writing nor create a journal file that does not exist. As a
-- consequence, a journal file that is missing while the state reports
-- messages results in an IO exception rather than an empty list.
getJournalQueueMessages :: JournalMsgStore s -> JournalQueue s -> IO [Message]
getJournalQueueMessages ms q =
  readQueueState ms (msgQueueStatePath dir rId) >>= \case
    (Just MsgQueueState {readState = rs, writeState = ws, size}, _) | size > 0 -> do
      msgs <- getMsgs (journalId rs) (bytePos rs) (byteCount rs)
      if journalId rs == journalId ws
        then pure msgs
        else (msgs ++) <$> getMsgs (journalId ws) 0 (bytePos ws)
    _ -> pure []
  where
    rId = recipientId' q
    dir = msgQueueDirectory ms rId
    -- reads the messages stored in journal jId in the byte range [from, to)
    getMsgs jId from to =
      IO.withFile (journalFilePath dir jId) IO.ReadMode $ \h' ->
        getJournalRange h' from to

-- | Reads the bytes between two absolute positions of a journal file and
-- decodes every line of that range as a message.
--
-- Returns an empty list without touching the handle when the end position is
-- not greater than the start position. Lines that fail to decode are dropped
-- from the result; when there are any, their number and the description of
-- the handle are printed to stdout.
getJournalRange :: Handle -> Int64 -> Int64 -> IO [Message]
getJournalRange h from to
  | to <= from = pure []
  | otherwise = do
      IO.hSeek h AbsoluteSeek (fromIntegral from)
      chunk <- B.hGet h . fromIntegral $ to - from
      let (failures, decoded) = partitionEithers [strDecode l | l <- B.lines chunk]
      unless (null failures) $
        IO.hShow h >>= \f ->
          putStrLn $ "Error reading " <> show (length failures) <> " messages from " <> f
      pure decoded