{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE InstanceSigs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiWayIf #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE TupleSections #-}
module Simplex.Messaging.Server.MsgStore.Journal
( JournalMsgStore (random, expireBackupsBefore),
QStore (..),
QStoreCfg (..),
JournalQueue (msgQueue'),
JournalMsgQueue (queue, state),
JMQueue (queueDirectory, statePath),
JournalStoreConfig (..),
closeMsgQueue,
closeMsgQueueHandles,
MsgQueueState (..),
JournalState (..),
SJournalType (..),
msgQueueDirectory,
msgQueueStatePath,
readQueueState,
newMsgQueueState,
getJournalQueueMessages,
newJournalId,
appendState,
queueLogFileName,
journalFilePath,
logFileExt,
stmQueueStore,
#if defined(dbServerPostgres)
postgresQueueStore,
#endif
)
where
import Control.Concurrent.STM
import qualified Control.Exception as E
import Control.Logger.Simple
import Control.Monad
import Control.Monad.Trans.Except
import qualified Data.Attoparsec.ByteString.Char8 as A
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import Data.Either (fromRight, partitionEithers)
import Data.Functor (($>))
import Data.Int (Int64)
import Data.List (sort)
import qualified Data.Map.Strict as M
import Data.Maybe (fromMaybe, isJust, isNothing, mapMaybe)
import Data.Text (Text)
import qualified Data.Text as T
import Data.Text.Encoding (decodeLatin1)
import Data.Time.Clock (NominalDiffTime, UTCTime, addUTCTime, getCurrentTime)
import Data.Time.Clock.System (SystemTime (..), getSystemTime)
import Data.Time.Format.ISO8601 (iso8601Show, iso8601ParseM)
import GHC.IO (catchAny)
import Simplex.Messaging.Agent.Client (getMapLock)
import Simplex.Messaging.Agent.Lock
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Protocol
import Simplex.Messaging.Server.MsgStore.Journal.SharedLock
import Simplex.Messaging.Server.MsgStore.Types
import Simplex.Messaging.Server.QueueStore
#if defined(dbServerPostgres)
import Simplex.Messaging.Server.QueueStore.Postgres
#endif
import Simplex.Messaging.Server.QueueStore.STM
import Simplex.Messaging.Server.QueueStore.Types
import Simplex.Messaging.SystemTime
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Util (ifM, tshow, whenM, ($>>=), (<$$>))
import System.Directory
import System.FilePath (takeFileName, (</>))
import System.IO (BufferMode (..), Handle, IOMode (..), SeekMode (..))
import qualified System.IO as IO
import System.Random (StdGen, genByteString, newStdGen)
-- | Top-level handle of the journal message store, indexed by the queue
-- store backend @s@.
--
-- NOTE(review): how most fields are used is not visible in this part of the
-- file; the per-field notes below only restate what the types show.
data JournalMsgStore s = JournalMsgStore
  { -- | Static configuration this store was created with.
    config :: JournalStoreConfig s,
    -- | Shared random generator state (exported field).
    random :: TVar StdGen,
    -- | Locks keyed by recipient ID.
    queueLocks :: TMap RecipientId Lock,
    -- | Store-wide shared lock slot holding a recipient ID.
    sharedLock :: TMVar RecipientId,
    -- | Backend-specific queue store; see 'QStore'.
    queueStore_ :: QStore s,
    -- | Counter, presumably of currently opened message queues — confirm.
    openedQueueCount :: TVar Int,
    -- | Timestamp threshold for backup expiration (exported field).
    expireBackupsBefore :: UTCTime
  }
-- | Queue store wrapper indexed by the backend kind @s@.
--
-- Each constructor fixes @s@, so pattern matching on a 'QStore' reveals the
-- concrete store type given by 'QStoreType'.
data QStore (s :: QSType) where
  -- | In-memory (STM) queue store.
  MQStore :: QStoreType 'QSMemory -> QStore 'QSMemory
#if defined(dbServerPostgres)
  -- | PostgreSQL queue store; only compiled with @dbServerPostgres@.
  PQStore :: QStoreType 'QSPostgres -> QStore 'QSPostgres
#endif
-- | Maps the backend kind to the concrete queue store implementation that
-- holds 'JournalQueue' values for that backend.
type family QStoreType s where
  QStoreType 'QSMemory = STMQueueStore (JournalQueue 'QSMemory)
#if defined(dbServerPostgres)
  QStoreType 'QSPostgres = PostgresQueueStore (JournalQueue 'QSPostgres)
#endif
-- | Applies a backend-generic function to the concrete store wrapped in a
-- 'QStore'.
--
-- The function argument carries a @QueueStoreClass@ constraint for
-- @QStoreType s@. Matching on the GADT constructor fixes @s@ in each
-- alternative, which is where that constraint gets resolved.
withQS :: (QueueStoreClass (JournalQueue s) (QStoreType s) => QStoreType s -> r) -> QStore s -> r
withQS f = \case
  MQStore st -> f st
#if defined(dbServerPostgres)
  PQStore st -> f st
#endif
{-# INLINE withQS #-}
-- | Extracts the STM queue store from a memory-backed journal store.
--
-- The single alternative is sufficient: 'MQStore' is the only 'QStore'
-- constructor whose result type is indexed by @'QSMemory@.
stmQueueStore :: JournalMsgStore 'QSMemory -> STMQueueStore (JournalQueue 'QSMemory)
stmQueueStore st = case queueStore_ st of
  MQStore st' -> st'
#if defined(dbServerPostgres)
-- | Extracts the PostgreSQL queue store from a Postgres-backed journal store.
--
-- Only compiled with @dbServerPostgres@. 'PQStore' is the only 'QStore'
-- constructor whose result type is indexed by @'QSPostgres@.
postgresQueueStore :: JournalMsgStore 'QSPostgres -> PostgresQueueStore (JournalQueue 'QSPostgres)
postgresQueueStore st = case queueStore_ st of
  PQStore st' -> st'
#endif
-- | Configuration of the journal message store, indexed by the queue store
-- backend @s@ (which selects the shape of 'queueStoreCfg').
--
-- NOTE(review): the code that consumes these settings is not visible in this
-- part of the file; the units and exact meaning of the numeric limits should
-- be confirmed against their use sites before relying on the field names.
data JournalStoreConfig s = JournalStoreConfig
  { storePath :: FilePath,
    pathParts :: Int,
    -- | Backend-specific queue store configuration; see 'QStoreCfg'.
    queueStoreCfg :: QStoreCfg s,
    quota :: Int,
    maxMsgCount :: Int,
    maxStateLines :: Int,
    stateTailSize :: Int,
    idleInterval :: Int64,
    expireBackupsAfter :: NominalDiffTime,
    keepMinBackups :: Int
  }
-- | Backend-specific queue store configuration, indexed by the backend kind.
data QStoreCfg s where
  -- | The in-memory store takes no configuration.
  MQStoreCfg :: QStoreCfg 'QSMemory
#if defined(dbServerPostgres)
  -- | The PostgreSQL store is configured with a 'PostgresStoreCfg'.
  PQStoreCfg :: PostgresStoreCfg -> QStoreCfg 'QSPostgres
#endif
-- | A queue as held by the journal store: its identity, locks, queue record
-- and (optionally) its message queue.
data JournalQueue (s :: QSType) = JournalQueue
  { -- | Recipient ID of the queue; exposed as @recipientId@ by the
    -- @StoreQueueClass@ instance.
    recipientId' :: RecipientId,
    -- | Per-queue lock passed to @withLockWaitShared@ by @withQueueLock@.
    queueLock :: Lock,
    -- | Shared lock slot, also passed to @withLockWaitShared@.
    sharedLock :: TMVar RecipientId,
    -- | Queue record; exposed as @queueRec@ by the @StoreQueueClass@ instance.
    queueRec' :: TVar (Maybe QueueRec),
    -- | Message queue, if present (exported field).
    -- NOTE(review): presumably 'Nothing' until the queue is opened — confirm.
    msgQueue' :: TVar (Maybe (JournalMsgQueue s)),
    -- | NOTE(review): looks like a last-activity timestamp — confirm units.
    activeAt :: TVar Int64,
    -- | Optional 'QState' summary for the queue.
    queueState :: TVar (Maybe QState)
  }
-- | Compact summary of a message queue, as computed by @qState@.
data QState = QState
  { -- | Set by @qState@ when the queue size is greater than zero.
    hasPending :: Bool,
    -- | Set by @qState@ when 'hasPending' holds or either journal state has a
    -- non-zero message count.
    hasStored :: Bool
  }
-- | File-system location of a journal message queue.
data JMQueue = JMQueue
  { -- | Directory of the queue.
    queueDirectory :: FilePath,
    -- | Path of the queue state file.
    statePath :: FilePath
  }
-- | In-memory representation of one journal-backed message queue.
data JournalMsgQueue (s :: QSType) = JournalMsgQueue
  { -- | File-system location of the queue.
    queue :: JMQueue,
    -- | Current read/write journal state.
    state :: TVar MsgQueueState,
    -- | Cached message with an 'Int64' attached to it.
    -- NOTE(review): the outer 'Maybe' presumably means "not loaded yet" and
    -- the inner one "queue is empty"; the 'Int64' looks like a byte length —
    -- confirm both against the code that reads this field.
    tipMsg :: TVar (Maybe (Maybe (Message, Int64))),
    -- | Open file handles, if any.
    handles :: TVar (Maybe MsgQueueHandles)
  }
-- | State of a message queue: one journal state for reading, one for
-- writing, a write-permission flag and the queue size.
--
-- Its @StrEncoding@ instance serializes it as
-- @write=… read=… canWrite=… size=…@.
data MsgQueueState = MsgQueueState
  { readState :: JournalState 'JTRead,
    writeState :: JournalState 'JTWrite,
    canWrite :: Bool,
    -- | @qState@ treats a value greater than zero as "has pending messages".
    size :: Int
  }
  deriving (Show)
-- | File handles kept for a message queue: the state file, the journal being
-- read, and optionally a separate handle for writing.
--
-- NOTE(review): presumably 'writeHandle' is 'Nothing' when the same journal
-- is used for reading and writing — confirm against the code that opens it.
data MsgQueueHandles = MsgQueueHandles
  { stateHandle :: Handle,
    readHandle :: Handle,
    writeHandle :: Maybe Handle
  }
-- | Position and totals for one journal file, indexed by whether it is the
-- read or the write side.
--
-- The @StrEncoding@ instance serializes every field except 'journalType' as
-- a comma-separated list, in the order the fields are declared here.
data JournalState t = JournalState
  { -- | Singleton witness of the type index @t@.
    journalType :: SJournalType t,
    -- | Identifier of the journal; @journalFilePath@ embeds it in the file name.
    journalId :: ByteString,
    msgPos :: Int,
    msgCount :: Int,
    bytePos :: Int64,
    byteCount :: Int64
  }
  deriving (Show)
-- | Summarizes a 'MsgQueueState' as a 'QState'.
--
-- @hasPending@ is set when the queue size is positive. @hasStored@ is set
-- when @hasPending@ holds or when either the read or the write journal state
-- reports a non-zero message count.
qState :: MsgQueueState -> QState
qState MsgQueueState {size, readState = rs, writeState = ws} =
  let hasPending = size > 0
   in QState {hasPending, hasStored = hasPending || msgCount rs > 0 || msgCount ws > 0}
{-# INLINE qState #-}
-- | Distinguishes the read side from the write side of a journal. Used
-- promoted, as the type index of 'SJournalType' and 'JournalState'.
data JournalType
  = JTRead
  | JTWrite
-- | Singleton for 'JournalType': each constructor fixes the type index, so a
-- value of @SJournalType t@ determines @t@ at run time.
data SJournalType (t :: JournalType) where
  SJTRead :: SJournalType 'JTRead
  SJTWrite :: SJournalType 'JTWrite
-- | Provides the 'SJournalType' singleton for a type-level journal type, so
-- it can be obtained from the type index alone.
class JournalTypeI t where
  sJournalType :: SJournalType t
-- | The read side is witnessed by 'SJTRead'.
instance JournalTypeI 'JTRead where
  sJournalType = SJTRead
-- | The write side is witnessed by 'SJTWrite'.
instance JournalTypeI 'JTWrite where
  sJournalType = SJTWrite
-- Standalone deriving is required because 'SJournalType' is a GADT; the
-- stock strategy is spelled out explicitly.
deriving stock instance Show (SJournalType t)
-- | Initial state of a message queue for the given journal ID.
--
-- The read and the write state both refer to the same journal and start with
-- all counters at zero; the queue is writable and its size is zero.
newMsgQueueState :: ByteString -> MsgQueueState
newMsgQueueState journalId =
  MsgQueueState
    { writeState = newJournalState journalId,
      readState = newJournalState journalId,
      canWrite = True,
      size = 0
    }
-- | Journal state for the given journal ID with message position, message
-- count, byte position and byte count all set to zero. The journal type is
-- taken from the 'JournalTypeI' instance of the result type.
newJournalState :: JournalTypeI t => ByteString -> JournalState t
newJournalState journalId = JournalState sJournalType journalId 0 0 0 0
-- | Path of the journal file with the given ID inside a queue directory.
--
-- The file name is 'msgLogFileName', a dot, the journal ID and 'logFileExt'
-- concatenated.
journalFilePath :: FilePath -> ByteString -> FilePath
journalFilePath dir journalId = dir </> (msgLogFileName <> "." <> B.unpack journalId <> logFileExt)
-- | Text encoding of the queue state: four @key=value@ pairs separated by
-- single spaces, in the fixed order @write@, @read@, @canWrite@, @size@.
instance StrEncoding MsgQueueState where
  strEncode MsgQueueState {writeState, readState, canWrite, size} =
    B.unwords
      [ "write=" <> strEncode writeState,
        "read=" <> strEncode readState,
        "canWrite=" <> strEncode canWrite,
        "size=" <> strEncode size
      ]

  -- The parser expects exactly the layout produced by 'strEncode': the keys
  -- after the first one include the separating space.
  strP = do
    writeState <- "write=" *> strP
    readState <- " read=" *> strP
    canWrite <- " canWrite=" *> strP
    size <- " size=" *> strP
    pure MsgQueueState {writeState, readState, canWrite, size}
-- | Text encoding of a journal state:
-- @journalId,msgPos,msgCount,bytePos,byteCount@.
--
-- The journal type is not serialized; when parsing it is taken from the
-- 'JournalTypeI' instance of the result type.
instance JournalTypeI t => StrEncoding (JournalState t) where
  strEncode JournalState {journalId, msgPos, msgCount, bytePos, byteCount} =
    B.intercalate "," [journalId, e msgPos, e msgCount, e bytePos, e byteCount]
    where
      e :: StrEncoding a => a -> ByteString
      e = strEncode

  strP = do
    -- the journal ID is everything up to the first comma
    journalId <- A.takeTill (== ',')
    JournalState sJournalType journalId <$> i <*> i <*> i <*> i
    where
      -- one comma-prefixed decimal field
      i :: Integral a => A.Parser a
      i = A.char ',' *> A.decimal
-- | Base name (without extension) used for the queue state log file.
queueLogFileName :: String
queueLogFileName = "queue_state"
-- | Base name of message journal files; @journalFilePath@ appends the
-- journal ID and the log extension to it.
msgLogFileName :: String
msgLogFileName = "messages"
-- | File extension of log files, including the leading dot.
logFileExt :: String
logFileExt = ".log"
-- | 'IO' wrapper tagged with the queue store backend @s@.
--
-- @s@ is a phantom parameter: it does not occur in the wrapped type. The
-- 'Functor', 'Applicative' and 'Monad' instances are those of 'IO', reused
-- through newtype deriving.
newtype StoreIO (s :: QSType) a = StoreIO {unStoreIO :: IO a}
  deriving newtype (Functor, Applicative, Monad)
-- | Exposes the identity, the queue record and the locking of a
-- 'JournalQueue' through the generic store-queue interface.
instance StoreQueueClass (JournalQueue s) where
  recipientId = recipientId'
  {-# INLINE recipientId #-}
  queueRec = queueRec'
  {-# INLINE queueRec #-}

  -- Runs the action via 'withLockWaitShared', passing the queue's own lock
  -- and the shared lock slot stored in the queue.
  withQueueLock :: JournalQueue s -> Text -> IO a -> IO a
  withQueueLock JournalQueue {recipientId', queueLock, sharedLock} =
    withLockWaitShared recipientId' queueLock sharedLock
  {-# INLINE withQueueLock #-}
-- | Queue store interface for the 'QStore' wrapper.
--
-- Apart from 'newQueueStore', every method unwraps the 'QStore' with 'withQS'
-- and forwards the call to the same method of the wrapped store, so this
-- instance adds no logic of its own.
instance QueueStoreClass (JournalQueue s) (QStore s) where
  type QueueStoreCfg (QStore s) = QStoreCfg s

  -- Builds the wrapped store selected by the configuration constructor.
  newQueueStore :: QStoreCfg s -> IO (QStore s)
  newQueueStore = \case
    MQStoreCfg -> MQStore <$> newQueueStore @(JournalQueue s) ()
#if defined(dbServerPostgres)
    PQStoreCfg cfg -> PQStore <$> newQueueStore @(JournalQueue s) (cfg, True)
#endif

  -- The explicit @(JournalQueue s) applications below are used on methods
  -- whose type does not mention the queue type, so it cannot be inferred
  -- from the arguments.
  closeQueueStore = withQS (closeQueueStore @(JournalQueue s))
  {-# INLINE closeQueueStore #-}
  loadedQueues = withQS loadedQueues
  {-# INLINE loadedQueues #-}
  compactQueues = withQS (compactQueues @(JournalQueue s))
  {-# INLINE compactQueues #-}
  getEntityCounts = withQS (getEntityCounts @(JournalQueue s))
  {-# INLINE getEntityCounts #-}
  addQueue_ = withQS addQueue_
  {-# INLINE addQueue_ #-}
  getQueue_ = withQS getQueue_
  {-# INLINE getQueue_ #-}
  getQueues_ = withQS getQueues_
  {-# INLINE getQueues_ #-}
  addQueueLinkData = withQS addQueueLinkData
  {-# INLINE addQueueLinkData #-}
  getQueueLinkData = withQS getQueueLinkData
  {-# INLINE getQueueLinkData #-}
  deleteQueueLinkData = withQS deleteQueueLinkData
  {-# INLINE deleteQueueLinkData #-}
  secureQueue = withQS secureQueue
  {-# INLINE secureQueue #-}
  updateKeys = withQS updateKeys
  {-# INLINE updateKeys #-}
  addQueueNotifier = withQS addQueueNotifier
  {-# INLINE addQueueNotifier #-}
  deleteQueueNotifier = withQS deleteQueueNotifier
  {-# INLINE deleteQueueNotifier #-}
  suspendQueue = withQS suspendQueue
  {-# INLINE suspendQueue #-}
  blockQueue = withQS blockQueue
  {-# INLINE blockQueue #-}
  unblockQueue = withQS unblockQueue
  {-# INLINE unblockQueue #-}
  updateQueueTime = withQS updateQueueTime
  {-# INLINE updateQueueTime #-}
  deleteStoreQueue = withQS deleteStoreQueue
  {-# INLINE deleteStoreQueue #-}
  getCreateService = withQS (getCreateService @(JournalQueue s))
  {-# INLINE getCreateService #-}
  setQueueService = withQS setQueueService
  {-# INLINE setQueueService #-}
  getQueueNtfServices = withQS (getQueueNtfServices @(JournalQueue s))
  {-# INLINE getQueueNtfServices #-}
  getServiceQueueCount = withQS (getServiceQueueCount @(JournalQueue s))
  {-# INLINE getServiceQueueCount #-}
-- | Allocate a fresh in-memory 'JournalQueue' for the given recipient ID and
-- queue record.
--
-- The queue reuses the store's @sharedLock@ and the supplied per-queue lock.
-- Its mutable fields start as: the queue record set to @Just qr@, no message
-- queue, @activeAt@ of 0 and no queue state.
makeQueue_ :: JournalMsgStore s -> RecipientId -> QueueRec -> Lock -> IO (JournalQueue s)
makeQueue_ JournalMsgStore {sharedLock} rId qr queueLock = do
  queueRec' <- newTVarIO $ Just qr
  msgQueue' <- newTVarIO Nothing
  activeAt <- newTVarIO 0
  queueState <- newTVarIO Nothing
  pure $
    JournalQueue
      { recipientId' = rId,
        queueLock,
        sharedLock,
        queueRec',
        msgQueue',
        activeAt,
        queueState
      }
instance MsgStoreClass (JournalMsgStore s) where
type StoreMonad (JournalMsgStore s) = StoreIO s
type MsgQueue (JournalMsgStore s) = JournalMsgQueue s
type QueueStore (JournalMsgStore s) = QStore s
type StoreQueue (JournalMsgStore s) = JournalQueue s
type MsgStoreConfig (JournalMsgStore s) = JournalStoreConfig s
newMsgStore :: JournalStoreConfig s -> IO (JournalMsgStore s)
newMsgStore :: JournalStoreConfig s -> IO (JournalMsgStore s)
newMsgStore config :: JournalStoreConfig s
config@JournalStoreConfig {QStoreCfg s
$sel:queueStoreCfg:JournalStoreConfig :: forall (s :: QSType). JournalStoreConfig s -> QStoreCfg s
queueStoreCfg :: QStoreCfg s
queueStoreCfg} = do
TVar StdGen
random <- StdGen -> IO (TVar StdGen)
forall a. a -> IO (TVar a)
newTVarIO (StdGen -> IO (TVar StdGen)) -> IO StdGen -> IO (TVar StdGen)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< IO StdGen
forall (m :: * -> *). MonadIO m => m StdGen
newStdGen
TMap RecipientId Lock
queueLocks <- IO (TMap RecipientId Lock)
forall k a. IO (TMap k a)
TM.emptyIO
TMVar RecipientId
sharedLock <- IO (TMVar RecipientId)
forall a. IO (TMVar a)
newEmptyTMVarIO
QStore s
queueStore_ <- forall q s. QueueStoreClass q s => QueueStoreCfg s -> IO s
newQueueStore @(JournalQueue s) QueueStoreCfg (QStore s)
QStoreCfg s
queueStoreCfg
TVar Int
openedQueueCount <- Int -> IO (TVar Int)
forall a. a -> IO (TVar a)
newTVarIO Int
0
UTCTime
expireBackupsBefore <- NominalDiffTime -> UTCTime -> UTCTime
addUTCTime (- JournalStoreConfig s -> NominalDiffTime
forall (s :: QSType). JournalStoreConfig s -> NominalDiffTime
expireBackupsAfter JournalStoreConfig s
config) (UTCTime -> UTCTime) -> IO UTCTime -> IO UTCTime
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO UTCTime
getCurrentTime
JournalMsgStore s -> IO (JournalMsgStore s)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure JournalMsgStore {JournalStoreConfig s
$sel:config:JournalMsgStore :: JournalStoreConfig s
config :: JournalStoreConfig s
config, TVar StdGen
$sel:random:JournalMsgStore :: TVar StdGen
random :: TVar StdGen
random, TMap RecipientId Lock
$sel:queueLocks:JournalMsgStore :: TMap RecipientId Lock
queueLocks :: TMap RecipientId Lock
queueLocks, TMVar RecipientId
$sel:sharedLock:JournalMsgStore :: TMVar RecipientId
sharedLock :: TMVar RecipientId
sharedLock, QStore s
$sel:queueStore_:JournalMsgStore :: QStore s
queueStore_ :: QStore s
queueStore_, TVar Int
$sel:openedQueueCount:JournalMsgStore :: TVar Int
openedQueueCount :: TVar Int
openedQueueCount, UTCTime
$sel:expireBackupsBefore:JournalMsgStore :: UTCTime
expireBackupsBefore :: UTCTime
expireBackupsBefore}
closeMsgStore :: JournalMsgStore s -> IO ()
closeMsgStore :: JournalMsgStore s -> IO ()
closeMsgStore JournalMsgStore s
ms = do
let st :: QStore s
st = JournalMsgStore s -> QStore s
forall (s :: QSType). JournalMsgStore s -> QStore s
queueStore_ JournalMsgStore s
ms
TVar (Map RecipientId (JournalQueue s)) -> IO ()
closeQueues (TVar (Map RecipientId (JournalQueue s)) -> IO ())
-> TVar (Map RecipientId (JournalQueue s)) -> IO ()
forall a b. (a -> b) -> a -> b
$ forall q s. QueueStoreClass q s => s -> TMap RecipientId q
loadedQueues @(JournalQueue s) QStore s
st
forall q s. QueueStoreClass q s => s -> IO ()
closeQueueStore @(JournalQueue s) QStore s
st
where
closeQueues :: TVar (Map RecipientId (JournalQueue s)) -> IO ()
closeQueues TVar (Map RecipientId (JournalQueue s))
qs = TVar (Map RecipientId (JournalQueue s))
-> IO (Map RecipientId (JournalQueue s))
forall a. TVar a -> IO a
readTVarIO TVar (Map RecipientId (JournalQueue s))
qs IO (Map RecipientId (JournalQueue s))
-> (Map RecipientId (JournalQueue s) -> IO ()) -> IO ()
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (JournalQueue s -> IO ())
-> Map RecipientId (JournalQueue s) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (JournalMsgStore s -> JournalQueue s -> IO ()
forall (s :: QSType). JournalMsgStore s -> JournalQueue s -> IO ()
closeMsgQueue JournalMsgStore s
ms)
withActiveMsgQueues :: Monoid a => JournalMsgStore s -> (JournalQueue s -> IO a) -> IO a
withActiveMsgQueues :: forall a.
Monoid a =>
JournalMsgStore s -> (JournalQueue s -> IO a) -> IO a
withActiveMsgQueues = (QueueStoreClass (JournalQueue s) (QStoreType s) =>
QStoreType s -> (JournalQueue s -> IO a) -> IO a)
-> QStore s -> (JournalQueue s -> IO a) -> IO a
forall (s :: QSType) r.
(QueueStoreClass (JournalQueue s) (QStoreType s) =>
QStoreType s -> r)
-> QStore s -> r
withQS QueueStoreClass (JournalQueue s) (QStoreType s) =>
QStoreType s -> (JournalQueue s -> IO a) -> IO a
QStoreType s -> (JournalQueue s -> IO a) -> IO a
forall a q s.
(Monoid a, QueueStoreClass q s) =>
s -> (q -> IO a) -> IO a
withLoadedQueues (QStore s -> (JournalQueue s -> IO a) -> IO a)
-> (JournalMsgStore s -> QStore s)
-> JournalMsgStore s
-> (JournalQueue s -> IO a)
-> IO a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. JournalMsgStore s -> QStore s
forall (s :: QSType). JournalMsgStore s -> QStore s
queueStore_
unsafeWithAllMsgQueues :: Monoid a => Bool -> JournalMsgStore s -> (JournalQueue s -> IO a) -> IO a
unsafeWithAllMsgQueues :: forall a.
Monoid a =>
Bool -> JournalMsgStore s -> (JournalQueue s -> IO a) -> IO a
unsafeWithAllMsgQueues Bool
tty JournalMsgStore s
ms JournalQueue s -> IO a
action = case JournalMsgStore s -> QStore s
forall (s :: QSType). JournalMsgStore s -> QStore s
queueStore_ JournalMsgStore s
ms of
MQStore QStoreType 'QSMemory
st -> STMQueueStore (JournalQueue 'QSMemory)
-> (JournalQueue s -> IO a) -> IO a
forall a q s.
(Monoid a, QueueStoreClass q s) =>
s -> (q -> IO a) -> IO a
withLoadedQueues STMQueueStore (JournalQueue 'QSMemory)
QStoreType 'QSMemory
st JournalQueue s -> IO a
run
#if defined(dbServerPostgres)
PQStore st -> foldQueueRecs False tty st $ uncurry (mkQueue ms False) >=> run
#endif
where
run :: JournalQueue s -> IO a
run JournalQueue s
q = do
a
r <- JournalQueue s -> IO a
action JournalQueue s
q
JournalMsgStore s -> JournalQueue s -> IO ()
forall (s :: QSType). JournalMsgStore s -> JournalQueue s -> IO ()
closeMsgQueue JournalMsgStore s
ms JournalQueue s
q
a -> IO a
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure a
r
expireOldMessages :: Bool -> JournalMsgStore s -> Int64 -> Int64 -> IO MessageStats
expireOldMessages :: Bool -> JournalMsgStore s -> Int64 -> Int64 -> IO MessageStats
expireOldMessages Bool
tty JournalMsgStore s
ms Int64
now Int64
ttl = case JournalMsgStore s -> QStore s
forall (s :: QSType). JournalMsgStore s -> QStore s
queueStore_ JournalMsgStore s
ms of
MQStore QStoreType 'QSMemory
st ->
STMQueueStore (JournalQueue 'QSMemory)
-> (JournalQueue 'QSMemory -> IO MessageStats) -> IO MessageStats
forall a q s.
(Monoid a, QueueStoreClass q s) =>
s -> (q -> IO a) -> IO a
withLoadedQueues STMQueueStore (JournalQueue 'QSMemory)
QStoreType 'QSMemory
st ((JournalQueue 'QSMemory -> IO MessageStats) -> IO MessageStats)
-> (JournalQueue 'QSMemory -> IO MessageStats) -> IO MessageStats
forall a b. (a -> b) -> a -> b
$ \JournalQueue 'QSMemory
q -> ExceptT ErrorType IO MessageStats -> IO MessageStats
run (ExceptT ErrorType IO MessageStats -> IO MessageStats)
-> ExceptT ErrorType IO MessageStats -> IO MessageStats
forall a b. (a -> b) -> a -> b
$ JournalMsgStore s
-> StoreQueue (JournalMsgStore s)
-> Text
-> StoreMonad (JournalMsgStore s) MessageStats
-> ExceptT ErrorType IO MessageStats
forall s a.
MsgStoreClass s =>
s
-> StoreQueue s -> Text -> StoreMonad s a -> ExceptT ErrorType IO a
forall a.
JournalMsgStore s
-> StoreQueue (JournalMsgStore s)
-> Text
-> StoreMonad (JournalMsgStore s) a
-> ExceptT ErrorType IO a
isolateQueue JournalMsgStore s
ms StoreQueue (JournalMsgStore s)
JournalQueue 'QSMemory
q Text
"deleteExpiredMsgs" (StoreMonad (JournalMsgStore s) MessageStats
-> ExceptT ErrorType IO MessageStats)
-> StoreMonad (JournalMsgStore s) MessageStats
-> ExceptT ErrorType IO MessageStats
forall a b. (a -> b) -> a -> b
$ do
IO (Maybe QueueRec) -> StoreIO 'QSMemory (Maybe QueueRec)
forall (s :: QSType) a. IO a -> StoreIO s a
StoreIO (TVar (Maybe QueueRec) -> IO (Maybe QueueRec)
forall a. TVar a -> IO a
readTVarIO (TVar (Maybe QueueRec) -> IO (Maybe QueueRec))
-> TVar (Maybe QueueRec) -> IO (Maybe QueueRec)
forall a b. (a -> b) -> a -> b
$ JournalQueue 'QSMemory -> TVar (Maybe QueueRec)
forall q. StoreQueueClass q => q -> TVar (Maybe QueueRec)
queueRec JournalQueue 'QSMemory
q) StoreIO 'QSMemory (Maybe QueueRec)
-> (Maybe QueueRec -> StoreIO 'QSMemory MessageStats)
-> StoreIO 'QSMemory MessageStats
forall a b.
StoreIO 'QSMemory a
-> (a -> StoreIO 'QSMemory b) -> StoreIO 'QSMemory b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Just QueueRec {$sel:updatedAt:QueueRec :: QueueRec -> Maybe SystemDate
updatedAt = Just (RoundedSystemTime Int64
t)} | Int64
t Int64 -> Int64 -> Bool
forall a. Ord a => a -> a -> Bool
> Int64
veryOld ->
JournalMsgStore s
-> Int64
-> Int64
-> StoreQueue (JournalMsgStore s)
-> StoreMonad (JournalMsgStore s) MessageStats
forall s.
MsgStoreClass s =>
s -> Int64 -> Int64 -> StoreQueue s -> StoreMonad s MessageStats
expireQueueMsgs JournalMsgStore s
ms Int64
now Int64
old StoreQueue (JournalMsgStore s)
JournalQueue 'QSMemory
q
Maybe QueueRec
_ -> MessageStats -> StoreIO 'QSMemory MessageStats
forall a. a -> StoreIO 'QSMemory a
forall (f :: * -> *) a. Applicative f => a -> f a
pure MessageStats
newMessageStats
#if defined(dbServerPostgres)
PQStore st -> do
let JournalMsgStore {queueLocks, sharedLock} = ms
foldRecentQueueRecs veryOld tty st $ \(rId, qr) -> do
q <- mkQueue ms False rId qr
withSharedWaitLock rId queueLocks sharedLock $ run $ tryStore' "deleteExpiredMsgs" rId $
getLoadedQueue q >>= unStoreIO . expireQueueMsgs ms now old
#endif
where
old :: Int64
old = Int64
now Int64 -> Int64 -> Int64
forall a. Num a => a -> a -> a
- Int64
ttl
veryOld :: Int64
veryOld = Int64
now Int64 -> Int64 -> Int64
forall a. Num a => a -> a -> a
- Int64
2 Int64 -> Int64 -> Int64
forall a. Num a => a -> a -> a
* Int64
ttl Int64 -> Int64 -> Int64
forall a. Num a => a -> a -> a
- Int64
86400
run :: ExceptT ErrorType IO MessageStats -> IO MessageStats
run :: ExceptT ErrorType IO MessageStats -> IO MessageStats
run = (Either ErrorType MessageStats -> MessageStats)
-> IO (Either ErrorType MessageStats) -> IO MessageStats
forall a b. (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (MessageStats -> Either ErrorType MessageStats -> MessageStats
forall b a. b -> Either a b -> b
fromRight MessageStats
newMessageStats) (IO (Either ErrorType MessageStats) -> IO MessageStats)
-> (ExceptT ErrorType IO MessageStats
-> IO (Either ErrorType MessageStats))
-> ExceptT ErrorType IO MessageStats
-> IO MessageStats
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ExceptT ErrorType IO MessageStats
-> IO (Either ErrorType MessageStats)
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT
getLoadedQueue :: JournalQueue s -> IO (JournalQueue s)
getLoadedQueue :: JournalQueue s -> IO (JournalQueue s)
getLoadedQueue JournalQueue s
q = JournalQueue s -> Maybe (JournalQueue s) -> JournalQueue s
forall a. a -> Maybe a -> a
fromMaybe JournalQueue s
q (Maybe (JournalQueue s) -> JournalQueue s)
-> IO (Maybe (JournalQueue s)) -> IO (JournalQueue s)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> RecipientId
-> TVar (Map RecipientId (JournalQueue s))
-> IO (Maybe (JournalQueue s))
forall k a. Ord k => k -> TMap k a -> IO (Maybe a)
TM.lookupIO (JournalQueue s -> RecipientId
forall q. StoreQueueClass q => q -> RecipientId
recipientId JournalQueue s
q) (QStore s -> TVar (Map RecipientId (JournalQueue s))
forall q s. QueueStoreClass q s => s -> TMap RecipientId q
loadedQueues (QStore s -> TVar (Map RecipientId (JournalQueue s)))
-> QStore s -> TVar (Map RecipientId (JournalQueue s))
forall a b. (a -> b) -> a -> b
$ JournalMsgStore s -> QStore s
forall (s :: QSType). JournalMsgStore s -> QStore s
queueStore_ JournalMsgStore s
ms)
logQueueStates :: JournalMsgStore s -> IO ()
logQueueStates :: JournalMsgStore s -> IO ()
logQueueStates JournalMsgStore s
ms = JournalMsgStore s
-> (StoreQueue (JournalMsgStore s) -> IO ()) -> IO ()
forall a.
Monoid a =>
JournalMsgStore s
-> (StoreQueue (JournalMsgStore s) -> IO a) -> IO a
forall s a.
(MsgStoreClass s, Monoid a) =>
s -> (StoreQueue s -> IO a) -> IO a
withActiveMsgQueues JournalMsgStore s
ms ((StoreQueue (JournalMsgStore s) -> IO ()) -> IO ())
-> (StoreQueue (JournalMsgStore s) -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ StoreIO s () -> IO ()
forall (s :: QSType) a. StoreIO s a -> IO a
unStoreIO (StoreIO s () -> IO ())
-> (JournalQueue s -> StoreIO s ()) -> JournalQueue s -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StoreQueue (JournalMsgStore s) -> StoreMonad (JournalMsgStore s) ()
JournalQueue s -> StoreIO s ()
forall s. MsgStoreClass s => StoreQueue s -> StoreMonad s ()
logQueueState
-- | Appends the current 'MsgQueueState' of the queue to its state handle.
--
-- Nothing is written unless the message queue is present in 'msgQueue''
-- and its file handles are present in 'handles'.
logQueueState :: JournalQueue s -> StoreIO s ()
logQueueState q = StoreIO $ readTVarIO (msgQueue' q) >>= mapM_ logOpened
  where
    -- the queue is loaded: write the state only if its handles are open
    logOpened mq = readTVarIO (handles mq) >>= mapM_ (appendCurrent mq)
    appendCurrent mq hs = readTVarIO (state mq) >>= appendState (stateHandle hs)
-- | The queue store of a journal message store is its 'queueStore_' field.
queueStore = queueStore_
{-# INLINE queueStore #-}
-- | Takes a snapshot of the sizes of the in-memory maps and counters of the store:
-- loaded queues, loaded notifiers, opened journals, queue locks and notifier locks.
--
-- Each value is read with a separate 'readTVarIO', so the counts are not
-- taken in a single transaction.
loadedQueueCounts :: JournalMsgStore s -> IO LoadedQueueCounts
loadedQueueCounts ms = do
  loadedQueueCount <- mapSize loadedQs
  loadedNotifierCount <- mapSize loadedNtfs
  openJournalCount <- readTVarIO (openedQueueCount ms)
  queueLockCount <- mapSize (queueLocks ms)
  -- when the store provides no notifier locks map, the count is reported as 0
  notifierLockCount <- maybe (pure 0) mapSize ntfLocks_
  pure
    LoadedQueueCounts
      { loadedQueueCount,
        loadedNotifierCount,
        openJournalCount,
        queueLockCount,
        notifierLockCount
      }
  where
    mapSize :: TMap k a -> IO Int
    mapSize = fmap M.size . readTVarIO
    (loadedQs, loadedNtfs, ntfLocks_) = loaded
    -- maps of the underlying queue store; the STM store has no notifier locks
    loaded :: (TMap RecipientId (JournalQueue s), TMap NotifierId RecipientId, Maybe (TMap NotifierId Lock))
    loaded = case queueStore_ ms of
      MQStore STMQueueStore {queues, notifiers} -> (queues, notifiers, Nothing)
#if defined(dbServerPostgres)
      PQStore PostgresQueueStore {queues, notifiers, notifierLocks} -> (queues, notifiers, Just notifierLocks)
#endif
-- | Creates the in-memory queue for the given recipient ID and queue record.
--
-- When @keepLock@ is 'True' the lock is obtained from the store's 'queueLocks'
-- map via 'getMapLock'; otherwise a fresh lock is created with 'createLockIO'.
mkQueue :: JournalMsgStore s -> Bool -> RecipientId -> QueueRec -> IO (JournalQueue s)
mkQueue ms keepLock rId qr = queueLock >>= makeQueue_ ms rId qr
  where
    queueLock
      | keepLock = atomically $ getMapLock (queueLocks ms) rId
      | otherwise = createLockIO
-- | Returns the message queue stored in 'msgQueue'', loading it when it is not set.
--
-- Loading opens the queue with 'openMsgQueue' if its directory exists, and
-- otherwise makes a new in-memory queue with a fresh journal ID.
-- The loaded queue is then stored in 'msgQueue'' and 'queueState' is set
-- from its current 'MsgQueueState'.
getMsgQueue :: JournalMsgStore s -> JournalQueue s -> Bool -> StoreIO s (JournalMsgQueue s)
getMsgQueue ms@JournalMsgStore {random} q'@JournalQueue {recipientId' = rId, msgQueue'} forWrite =
  StoreIO $
    readTVarIO msgQueue' >>= \case
      Just mq -> pure mq
      Nothing -> loadQueue
  where
    dir = msgQueueDirectory ms rId
    jmq = JMQueue {queueDirectory = dir, statePath = msgQueueStatePath dir rId}
    loadQueue = do
      dirExists <- doesDirectoryExist dir
      mq <- if dirExists then openMsgQueue ms jmq forWrite else createQ
      atomically $ writeTVar msgQueue' (Just mq)
      st <- readTVarIO (state mq)
      -- QState is forced before it is written to the TVar
      atomically $ writeTVar (queueState q') $ Just $! qState st
      pure mq
    -- new queue without handles: no file IO is performed in this function itself
    createQ :: IO (JournalMsgQueue s)
    createQ = do
      journalId <- newJournalId random
      mkJournalQueue jmq (newMsgQueueState journalId) Nothing
-- | Returns the message queue together with its first message, if there is one.
--
-- When 'queueState' is already known, the queue is only loaded if it has
-- pending messages. When it is not known, the queue is loaded to peek the
-- message and is closed again if no message was found.
getPeekMsgQueue :: JournalMsgStore s -> JournalQueue s -> StoreIO s (Maybe (JournalMsgQueue s, Message))
getPeekMsgQueue ms q@JournalQueue {queueState} = do
  qState_ <- StoreIO $ readTVarIO queueState
  case qState_ of
    Just QState {hasPending}
      | hasPending -> peekFirst
      | otherwise -> pure Nothing
    Nothing -> do
      msg_ <- peekFirst
      -- the queue was loaded only to find out that there is nothing to peek
      case msg_ of
        Nothing -> StoreIO $ closeMsgQueue ms q
        Just _ -> pure ()
      pure msg_
  where
    peekFirst = do
      mq <- getMsgQueue ms q False
      fmap (mq,) <$> tryPeekMsg_ q mq
-- | Runs the action on the message queue if the queue is currently idle,
-- returning the result of the action (if it was run) and the queue size.
--
-- * When the message queue is not loaded, it is opened only if it has
--   (or may have) stored messages, the action is run, and the queue is
--   closed afterwards, also on exception. Without stored messages the
--   result is @(Nothing, 0)@.
-- * When the message queue is loaded, the action is run only if the queue
--   was not active for at least 'idleInterval', after which the queue is
--   closed; otherwise only the size is returned.
withIdleMsgQueue :: Int64 -> JournalMsgStore s -> JournalQueue s -> (JournalMsgQueue s -> StoreIO s a) -> StoreIO s (Maybe a, Int)
withIdleMsgQueue now ms@JournalMsgStore {config} q@JournalQueue {queueState} action =
  StoreIO $ readTVarIO (msgQueue' q) >>= maybe withClosedQueue withOpenedQueue
  where
    withClosedQueue =
      E.bracket getNonEmptyMsgQueue closeOpened (maybe (pure (Nothing, 0)) runAction)
    -- the queue is closed only if it was actually opened by getNonEmptyMsgQueue
    closeOpened = \case
      Just _ -> closeMsgQueue ms q
      Nothing -> pure ()
    runAction mq = unStoreIO $ do
      r <- action mq
      sz <- getQueueSize_ mq
      pure (Just r, sz)
    withOpenedQueue mq = do
      lastActive <- readTVarIO (activeAt q)
      r <-
        if now - lastActive >= idleInterval config
          then fmap Just (unStoreIO (action mq) `E.finally` closeMsgQueue ms q)
          else pure Nothing
      -- the size is read after the queue may have been closed above
      sz <- unStoreIO (getQueueSize_ mq)
      pure (r, sz)
    getNonEmptyMsgQueue :: IO (Maybe (JournalMsgQueue s))
    getNonEmptyMsgQueue =
      readTVarIO queueState >>= \case
        Just QState {hasStored}
          | hasStored -> Just <$> openForRead
          | otherwise -> pure Nothing
        Nothing -> do
          mq <- openForRead
          -- re-read the state after opening, as getMsgQueue may have set it
          readTVarIO queueState >>= \case
            Just QState {hasStored = False} -> closeMsgQueue ms q $> Nothing
            _ -> pure (Just mq)
      where
        openForRead = unStoreIO $ getMsgQueue ms q False
-- | Deletes the queue with 'deleteQueue_', returning only the queue record on success.
deleteQueue :: JournalMsgStore s -> JournalQueue s -> IO (Either ErrorType QueueRec)
deleteQueue ms q = fmap fst <$> deleteQueue_ ms q
-- | Deletes the queue with 'deleteQueue_', returning the queue record and the queue size.
--
-- The size is taken from the state of the message queue returned by
-- 'deleteQueue_'; when no message queue is returned, the size is @-1@.
deleteQueueSize :: JournalMsgStore s -> JournalQueue s -> IO (Either ErrorType (QueueRec, Int))
deleteQueueSize ms q =
  deleteQueue_ ms q >>= \case
    Left e -> pure (Left e)
    Right (qr, mq_) -> Right . (qr,) <$> queueSize mq_
  where
    queueSize = \case
      Nothing -> pure (-1)
      Just mq -> size <$> readTVarIO (state mq)
-- | Returns the messages of the queue.
--
-- With @drainMsgs@ set, messages are read one by one, and the read position
-- is updated with 'updateReadPos' after each message. Otherwise messages
-- are read in bulk with 'getJournalRange' from the read journal and, if the
-- write handle is present, from the write journal.
--
-- In both modes an empty list is returned when the queue has no open handles.
getQueueMessages_ :: Bool -> JournalQueue s -> JournalMsgQueue s -> StoreIO s [Message]
getQueueMessages_ drainMsgs q' q =
  StoreIO $
    if drainMsgs
      then drainAll
      else readTVarIO (state q) >>= readAll
  where
    -- reads messages one at a time until no journal is chosen for reading
    drainAll = readTVarIO (handles q) >>= maybe (pure []) drainNext
    drainNext hs = chooseReadJournal q' q drainMsgs hs >>= maybe (pure []) readOne
      where
        readOne (rSt, h) = do
          (msg, len) <- hGetMsgAt h (bytePos rSt)
          updateReadPos q' q drainMsgs len hs
          (msg :) <$> drainAll
    -- reads both journals in bulk, without changing the queue state in this function
    readAll MsgQueueState {writeState = wSt, readState = rSt, size}
      | size <= 0 = pure []
      | otherwise =
          readTVarIO (handles q) >>= \case
            Nothing -> pure []
            Just (MsgQueueHandles _ rh wh_) -> do
              readMsgs <- getJournalRange rh (bytePos rSt) (byteCount rSt)
              writtenMsgs <- maybe (pure []) (\wh -> getJournalRange wh 0 (bytePos wSt)) wh_
              pure (readMsgs ++ writtenMsgs)
writeMsg :: JournalMsgStore s -> JournalQueue s -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool))
writeMsg :: JournalMsgStore s
-> JournalQueue s
-> Bool
-> Message
-> ExceptT ErrorType IO (Maybe (Message, Bool))
writeMsg JournalMsgStore s
ms JournalQueue s
q' Bool
logState Message
msg = JournalMsgStore s
-> StoreQueue (JournalMsgStore s)
-> Text
-> StoreMonad (JournalMsgStore s) (Maybe (Message, Bool))
-> ExceptT ErrorType IO (Maybe (Message, Bool))
forall s a.
MsgStoreClass s =>
s
-> StoreQueue s -> Text -> StoreMonad s a -> ExceptT ErrorType IO a
forall a.
JournalMsgStore s
-> StoreQueue (JournalMsgStore s)
-> Text
-> StoreMonad (JournalMsgStore s) a
-> ExceptT ErrorType IO a
isolateQueue JournalMsgStore s
ms StoreQueue (JournalMsgStore s)
JournalQueue s
q' Text
"writeMsg" (StoreMonad (JournalMsgStore s) (Maybe (Message, Bool))
-> ExceptT ErrorType IO (Maybe (Message, Bool)))
-> StoreMonad (JournalMsgStore s) (Maybe (Message, Bool))
-> ExceptT ErrorType IO (Maybe (Message, Bool))
forall a b. (a -> b) -> a -> b
$ do
JournalMsgQueue s
q <- JournalMsgStore s
-> StoreQueue (JournalMsgStore s)
-> Bool
-> StoreMonad (JournalMsgStore s) (MsgQueue (JournalMsgStore s))
forall s.
MsgStoreClass s =>
s -> StoreQueue s -> Bool -> StoreMonad s (MsgQueue s)
getMsgQueue JournalMsgStore s
ms StoreQueue (JournalMsgStore s)
JournalQueue s
q' Bool
True
IO (Maybe (Message, Bool)) -> StoreIO s (Maybe (Message, Bool))
forall (s :: QSType) a. IO a -> StoreIO s a
StoreIO (IO (Maybe (Message, Bool)) -> StoreIO s (Maybe (Message, Bool)))
-> IO (Maybe (Message, Bool)) -> StoreIO s (Maybe (Message, Bool))
forall a b. (a -> b) -> a -> b
$ (IO (Maybe (Message, Bool)) -> IO () -> IO (Maybe (Message, Bool))
forall a b. IO a -> IO b -> IO a
`E.finally` JournalQueue s -> IO ()
forall (s :: QSType). JournalQueue s -> IO ()
updateActiveAt JournalQueue s
q') (IO (Maybe (Message, Bool)) -> IO (Maybe (Message, Bool)))
-> IO (Maybe (Message, Bool)) -> IO (Maybe (Message, Bool))
forall a b. (a -> b) -> a -> b
$ do
st :: MsgQueueState
st@MsgQueueState {Bool
$sel:canWrite:MsgQueueState :: MsgQueueState -> Bool
canWrite :: Bool
canWrite, Int
$sel:size:MsgQueueState :: MsgQueueState -> Int
size :: Int
size} <- TVar MsgQueueState -> IO MsgQueueState
forall a. TVar a -> IO a
readTVarIO (JournalMsgQueue s -> TVar MsgQueueState
forall (s :: QSType). JournalMsgQueue s -> TVar MsgQueueState
state JournalMsgQueue s
q)
let empty :: Bool
empty = Int
size Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
0
if Bool
canWrite Bool -> Bool -> Bool
|| Bool
empty
then do
let canWrt' :: Bool
canWrt' = Int
quota Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
size
if Bool
canWrt'
then JournalMsgQueue s -> MsgQueueState -> Bool -> Message -> IO ()
writeToJournal JournalMsgQueue s
q MsgQueueState
st Bool
canWrt' Message
msg IO () -> Maybe (Message, Bool) -> IO (Maybe (Message, Bool))
forall (f :: * -> *) a b. Functor f => f a -> b -> f b
$> (Message, Bool) -> Maybe (Message, Bool)
forall a. a -> Maybe a
Just (Message
msg, Bool
empty)
else JournalMsgQueue s -> MsgQueueState -> Bool -> Message -> IO ()
writeToJournal JournalMsgQueue s
q MsgQueueState
st Bool
canWrt' Message
msgQuota IO () -> Maybe (Message, Bool) -> IO (Maybe (Message, Bool))
forall (f :: * -> *) a b. Functor f => f a -> b -> f b
$> Maybe (Message, Bool)
forall a. Maybe a
Nothing
else Maybe (Message, Bool) -> IO (Maybe (Message, Bool))
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe (Message, Bool)
forall a. Maybe a
Nothing
where
JournalStoreConfig {Int
$sel:quota:JournalStoreConfig :: forall (s :: QSType). JournalStoreConfig s -> Int
quota :: Int
quota, Int
$sel:maxMsgCount:JournalStoreConfig :: forall (s :: QSType). JournalStoreConfig s -> Int
maxMsgCount :: Int
maxMsgCount} = JournalMsgStore s -> JournalStoreConfig s
forall (s :: QSType). JournalMsgStore s -> JournalStoreConfig s
config JournalMsgStore s
ms
msgQuota :: Message
msgQuota = MessageQuota {$sel:msgId:Message :: ByteString
msgId = Message -> ByteString
messageId Message
msg, $sel:msgTs:Message :: SystemTime
msgTs = Message -> SystemTime
messageTs Message
msg}
-- Appends one encoded message (terminated with '\n') to the queue journal and
-- stores the updated queue state via updateQueueState.
--
-- Arguments: the message queue, its current state, the new value for the
-- `canWrite` flag, and the message to write (forced before writing).
--
-- Free variables from the enclosing definition: ms, q', logState, maxMsgCount.
writeToJournal q st@MsgQueueState {writeState, readState = rs, size} canWrt' !msg' = do
  let msgStr = strEncode msg' `B.snoc` '\n'
      msgLen = fromIntegral $ B.length msgStr
  -- open the queue folder, state file and read journal if the queue has no handles yet
  hs <- maybe createQueueDir pure =<< readTVarIO handles
  (ws, wh) <- case writeHandle hs of
    -- there is no separate write journal and the current one reached maxMsgCount: start a new one
    Nothing | msgCount writeState >= maxMsgCount -> switchWriteJournal hs
    -- otherwise write to the write journal, or to the read journal when there is no write handle
    wh_ -> pure (writeState, fromMaybe (readHandle hs) wh_)
  let msgPos' = msgPos ws + 1
      bytePos' = bytePos ws + msgLen
      ws' = ws {msgPos = msgPos', msgCount = msgPos', bytePos = bytePos', byteCount = bytePos'}
      -- when reading and writing use the same journal, its read-side totals grow too
      rs' = if journalId ws == journalId rs then rs {msgCount = msgPos', byteCount = bytePos'} else rs
      !st' = st {writeState = ws', readState = rs', canWrite = canWrt', size = size + 1}
  hAppend wh (bytePos ws) msgStr
  updateQueueState q' q logState hs st' $
    -- The queue was empty, so the written entry becomes the cached tip message.
    -- Cache msg' (the entry actually written, whose encoded length is msgLen),
    -- not the message passed to the enclosing function: they differ when the
    -- quota marker is written instead of the message.
    when (size == 0) $ writeTVar (tipMsg q) $ Just (Just (msg', msgLen))
  where
    JournalMsgQueue {queue = JMQueue {queueDirectory, statePath}, handles} = q
    -- Creates the queue folder, opens the state file for appending, creates the
    -- read journal, stores the handles in the queue and increments the count of
    -- opened queues.
    createQueueDir = do
      createDirectoryIfMissing True queueDirectory
      sh <- openFile statePath AppendMode
      -- close the state file handle if the journal cannot be created,
      -- the same way openMsgQueue does when opening journals
      rh <- closeOnException sh $ createNewJournal queueDirectory $ journalId rs
      let hs = MsgQueueHandles {stateHandle = sh, readHandle = rh, writeHandle = Nothing}
      atomically $ writeTVar handles $ Just hs
      atomically $ modifyTVar' (openedQueueCount ms) (+ 1)
      pure hs
    -- Creates a new journal with a fresh ID, stores its handle as the write
    -- handle of the queue and returns the new journal state with the handle.
    switchWriteJournal hs = do
      jId <- newJournalId $ random ms
      wh <- createNewJournal queueDirectory jId
      atomically $ writeTVar handles $ Just $ hs {writeHandle = Just wh}
      pure (newJournalState jId, wh)
-- Clears the `canWrite` flag in the state of the message queue, if the
-- message queue of this store queue is currently set; otherwise does nothing.
setOverQuota_ :: JournalQueue s -> IO ()
setOverQuota_ q = do
  mq_ <- readTVarIO (msgQueue' q)
  forM_ mq_ $ \JournalMsgQueue {state} ->
    atomically $ modifyTVar' state disableWrites
  where
    disableWrites :: MsgQueueState -> MsgQueueState
    disableWrites st = st {canWrite = False}
-- Returns the `size` field of the current queue state.
getQueueSize_ :: JournalMsgQueue s -> StoreIO s Int
getQueueSize_ JournalMsgQueue {state} = StoreIO $ do
  st <- readTVarIO state
  pure $ size st
-- Returns the message at the current read position without removing it.
-- Returns Nothing when the queue has no handles or when chooseReadJournal
-- returns no journal. The message read from the journal is cached in `tipMsg`
-- together with its length, and the cached value is used while it is set.
tryPeekMsg_ :: JournalQueue s -> JournalMsgQueue s -> StoreIO s (Maybe Message)
tryPeekMsg_ q mq@JournalMsgQueue {tipMsg, handles} =
  StoreIO (readTVarIO handles $>>= chooseReadJournal q mq True $>>= peekMsg)
  where
    peekMsg (rs, h) =
      readTVarIO tipMsg >>= \case
        -- tip is already known (it may be a known absence of a message)
        Just cached -> pure $ fst <$> cached
        -- tip is not cached: read it at the current read position and cache it
        Nothing -> do
          ml@(msg, _) <- hGetMsgAt h $ bytePos rs
          atomically $ writeTVar tipMsg $ Just (Just ml)
          pure $ Just msg
-- Advances the read position past the cached tip message.
-- Does nothing unless `tipMsg` holds a message and the queue has handles.
-- When `logState` is True, updateActiveAt is called afterwards, whether or
-- not the deletion raised an exception.
tryDeleteMsg_ :: JournalQueue s -> JournalMsgQueue s -> Bool -> StoreIO s ()
tryDeleteMsg_ q mq@JournalMsgQueue {tipMsg, handles} logState =
  StoreIO $ deleteTip `E.finally` when logState (updateActiveAt q)
  where
    deleteTip =
      readTVarIO tipMsg >>= \case
        -- the length cached with the tip message is passed to updateReadPos
        Just (Just (_, len)) ->
          readTVarIO handles >>= mapM_ (\hs -> updateReadPos q mq logState len hs)
        _ -> pure ()
-- Runs the store action inside withQueueLock for this queue and passes the
-- result through tryStore', tagged with the operation name and the queue's
-- recipient ID. The store argument is not used.
isolateQueue :: JournalMsgStore s -> JournalQueue s -> Text -> StoreIO s a -> ExceptT ErrorType IO a
isolateQueue _ sq op action =
  tryStore' op (recipientId' sq) $ withQueueLock sq op $ unStoreIO action
-- Runs the store action directly in IO. Any exception is reported via
-- storeError (its result is discarded) and then re-thrown.
unsafeRunStore :: JournalQueue s -> Text -> StoreIO s a -> IO a
unsafeRunStore sq op a = E.handle logAndRethrow (unStoreIO a)
  where
    logAndRethrow e = storeError op (recipientId' sq) e >> E.throwIO e
-- Stores the seconds of the current system time in the queue's `activeAt`.
updateActiveAt :: JournalQueue s -> IO ()
updateActiveAt q = do
  now <- getSystemTime
  atomically $ writeTVar (activeAt q) (systemSeconds now)
-- Variant of tryStore for actions that do not return Either:
-- the result of the action is wrapped in Right.
tryStore' :: Text -> RecipientId -> IO a -> ExceptT ErrorType IO a
tryStore' op rId action = tryStore op rId (Right <$> action)
-- Runs the action with asynchronous exceptions masked (mask_), converting any
-- exception it throws to the Left result produced by storeError.
tryStore :: forall a. Text -> RecipientId -> IO (Either ErrorType a) -> ExceptT ErrorType IO a
tryStore op rId a =
  ExceptT . E.mask_ $ E.catch a (storeError op rId)
-- Logs the exception with the "STORE: " prefix and returns it as a STORE error.
-- The error text is the operation name, the encoded recipient ID and the shown
-- exception, separated with ", ".
storeError :: Text -> RecipientId -> E.SomeException -> IO (Either ErrorType a)
storeError op rId e = do
  logError $ "STORE: " <> e'
  pure . Left $ STORE e'
  where
    e' = T.intercalate ", " [op, decodeLatin1 (strEncode rId), tshow e]
-- Runs the action via withLockMapWaitShared, using the store's queue locks and
-- shared lock for the given recipient ID, and passes the result through
-- tryStore with the same operation name and recipient ID.
isolateQueueId :: Text -> JournalMsgStore s -> RecipientId -> IO (Either ErrorType a) -> ExceptT ErrorType IO a
isolateQueueId op JournalMsgStore {queueLocks, sharedLock} rId action =
  tryStore op rId $ withLockMapWaitShared rId queueLocks sharedLock op action
openMsgQueue :: JournalMsgStore s -> JMQueue -> Bool -> IO (JournalMsgQueue s)
openMsgQueue :: forall (s :: QSType).
JournalMsgStore s -> JMQueue -> Bool -> IO (JournalMsgQueue s)
openMsgQueue ms :: JournalMsgStore s
ms@JournalMsgStore {JournalStoreConfig s
$sel:config:JournalMsgStore :: forall (s :: QSType). JournalMsgStore s -> JournalStoreConfig s
config :: JournalStoreConfig s
config} q :: JMQueue
q@JMQueue {$sel:queueDirectory:JMQueue :: JMQueue -> String
queueDirectory = String
dir, String
$sel:statePath:JMQueue :: JMQueue -> String
statePath :: String
statePath} Bool
forWrite = do
(Maybe MsgQueueState
st_, Bool
shouldBackup) <- JournalMsgStore s -> String -> IO (Maybe MsgQueueState, Bool)
forall (s :: QSType).
JournalMsgStore s -> String -> IO (Maybe MsgQueueState, Bool)
readQueueState JournalMsgStore s
ms String
statePath
case Maybe MsgQueueState
st_ of
Maybe MsgQueueState
Nothing -> do
MsgQueueState
st <- ByteString -> MsgQueueState
newMsgQueueState (ByteString -> MsgQueueState) -> IO ByteString -> IO MsgQueueState
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TVar StdGen -> IO ByteString
newJournalId (JournalMsgStore s -> TVar StdGen
forall (s :: QSType). JournalMsgStore s -> TVar StdGen
random JournalMsgStore s
ms)
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
shouldBackup (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ String -> IO ()
backupQueueState String
statePath
JMQueue
-> MsgQueueState -> Maybe MsgQueueHandles -> IO (JournalMsgQueue s)
forall (s :: QSType).
JMQueue
-> MsgQueueState -> Maybe MsgQueueHandles -> IO (JournalMsgQueue s)
mkJournalQueue JMQueue
q MsgQueueState
st Maybe MsgQueueHandles
forall a. Maybe a
Nothing
Just MsgQueueState
st
| MsgQueueState -> Int
size MsgQueueState
st Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
0 -> do
(MsgQueueState
st', Maybe MsgQueueHandles
hs_) <- MsgQueueState -> Bool -> IO (MsgQueueState, Maybe MsgQueueHandles)
removeJournals MsgQueueState
st Bool
shouldBackup
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Maybe MsgQueueHandles -> Bool
forall a. Maybe a -> Bool
isJust Maybe MsgQueueHandles
hs_) IO ()
incOpenedCount
JMQueue
-> MsgQueueState -> Maybe MsgQueueHandles -> IO (JournalMsgQueue s)
forall (s :: QSType).
JMQueue
-> MsgQueueState -> Maybe MsgQueueHandles -> IO (JournalMsgQueue s)
mkJournalQueue JMQueue
q MsgQueueState
st' Maybe MsgQueueHandles
hs_
| Bool
otherwise -> do
Handle
sh <- MsgQueueState -> Bool -> IO Handle
openBackupQueueState MsgQueueState
st Bool
shouldBackup
(MsgQueueState
st', Handle
rh, Maybe Handle
wh_) <- Handle
-> IO (MsgQueueState, Handle, Maybe Handle)
-> IO (MsgQueueState, Handle, Maybe Handle)
forall a. Handle -> IO a -> IO a
closeOnException Handle
sh (IO (MsgQueueState, Handle, Maybe Handle)
-> IO (MsgQueueState, Handle, Maybe Handle))
-> IO (MsgQueueState, Handle, Maybe Handle)
-> IO (MsgQueueState, Handle, Maybe Handle)
forall a b. (a -> b) -> a -> b
$ JournalMsgStore s
-> String
-> MsgQueueState
-> Handle
-> IO (MsgQueueState, Handle, Maybe Handle)
forall (s :: QSType).
JournalMsgStore s
-> String
-> MsgQueueState
-> Handle
-> IO (MsgQueueState, Handle, Maybe Handle)
openJournals JournalMsgStore s
ms String
dir MsgQueueState
st Handle
sh
let hs :: MsgQueueHandles
hs = MsgQueueHandles {$sel:stateHandle:MsgQueueHandles :: Handle
stateHandle = Handle
sh, $sel:readHandle:MsgQueueHandles :: Handle
readHandle = Handle
rh, $sel:writeHandle:MsgQueueHandles :: Maybe Handle
writeHandle = Maybe Handle
wh_}
IO ()
incOpenedCount
JMQueue
-> MsgQueueState -> Maybe MsgQueueHandles -> IO (JournalMsgQueue s)
forall (s :: QSType).
JMQueue
-> MsgQueueState -> Maybe MsgQueueHandles -> IO (JournalMsgQueue s)
mkJournalQueue JMQueue
q MsgQueueState
st' (MsgQueueHandles -> Maybe MsgQueueHandles
forall a. a -> Maybe a
Just MsgQueueHandles
hs)
where
incOpenedCount :: IO ()
incOpenedCount = STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar Int -> (Int -> Int) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' (JournalMsgStore s -> TVar Int
forall (s :: QSType). JournalMsgStore s -> TVar Int
openedQueueCount JournalMsgStore s
ms) (Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1)
removeJournals :: MsgQueueState -> Bool -> IO (MsgQueueState, Maybe MsgQueueHandles)
removeJournals MsgQueueState {$sel:readState:MsgQueueState :: MsgQueueState -> JournalState 'JTRead
readState = JournalState 'JTRead
rs, $sel:writeState:MsgQueueState :: MsgQueueState -> JournalState 'JTWrite
writeState = JournalState 'JTWrite
ws} Bool
shouldBackup = IO (MsgQueueState, Maybe MsgQueueHandles)
-> IO (MsgQueueState, Maybe MsgQueueHandles)
forall a. IO a -> IO a
E.uninterruptibleMask_ (IO (MsgQueueState, Maybe MsgQueueHandles)
-> IO (MsgQueueState, Maybe MsgQueueHandles))
-> IO (MsgQueueState, Maybe MsgQueueHandles)
-> IO (MsgQueueState, Maybe MsgQueueHandles)
forall a b. (a -> b) -> a -> b
$ do
ByteString
rjId <- TVar StdGen -> IO ByteString
newJournalId (TVar StdGen -> IO ByteString) -> TVar StdGen -> IO ByteString
forall a b. (a -> b) -> a -> b
$ JournalMsgStore s -> TVar StdGen
forall (s :: QSType). JournalMsgStore s -> TVar StdGen
random JournalMsgStore s
ms
let st :: MsgQueueState
st = ByteString -> MsgQueueState
newMsgQueueState ByteString
rjId
Maybe MsgQueueHandles
hs_ <-
if Bool
forWrite
then MsgQueueHandles -> Maybe MsgQueueHandles
forall a. a -> Maybe a
Just (MsgQueueHandles -> Maybe MsgQueueHandles)
-> IO MsgQueueHandles -> IO (Maybe MsgQueueHandles)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> MsgQueueState -> ByteString -> IO MsgQueueHandles
newJournalHandles MsgQueueState
st ByteString
rjId
else Maybe MsgQueueHandles
forall a. Maybe a
Nothing Maybe MsgQueueHandles -> IO () -> IO (Maybe MsgQueueHandles)
forall a b. a -> IO b -> IO a
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ String -> IO ()
backupQueueState String
statePath
String -> JournalState 'JTRead -> IO ()
forall (t :: JournalType). String -> JournalState t -> IO ()
removeJournalIfExists String
dir JournalState 'JTRead
rs
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (JournalState 'JTWrite -> ByteString
forall (t :: JournalType). JournalState t -> ByteString
journalId JournalState 'JTWrite
ws ByteString -> ByteString -> Bool
forall a. Eq a => a -> a -> Bool
== JournalState 'JTRead -> ByteString
forall (t :: JournalType). JournalState t -> ByteString
journalId JournalState 'JTRead
rs) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ String -> JournalState 'JTWrite -> IO ()
forall (t :: JournalType). String -> JournalState t -> IO ()
removeJournalIfExists String
dir JournalState 'JTWrite
ws
(MsgQueueState, Maybe MsgQueueHandles)
-> IO (MsgQueueState, Maybe MsgQueueHandles)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (MsgQueueState
st, Maybe MsgQueueHandles
hs_)
where
newJournalHandles :: MsgQueueState -> ByteString -> IO MsgQueueHandles
newJournalHandles MsgQueueState
st ByteString
rjId = do
Handle
sh <- MsgQueueState -> Bool -> IO Handle
openBackupQueueState MsgQueueState
st Bool
shouldBackup
Handle -> MsgQueueState -> IO ()
appendState_ Handle
sh MsgQueueState
st
Handle
rh <- Handle -> IO Handle -> IO Handle
forall a. Handle -> IO a -> IO a
closeOnException Handle
sh (IO Handle -> IO Handle) -> IO Handle -> IO Handle
forall a b. (a -> b) -> a -> b
$ String -> ByteString -> IO Handle
createNewJournal String
dir ByteString
rjId
MsgQueueHandles -> IO MsgQueueHandles
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure MsgQueueHandles {$sel:stateHandle:MsgQueueHandles :: Handle
stateHandle = Handle
sh, $sel:readHandle:MsgQueueHandles :: Handle
readHandle = Handle
rh, $sel:writeHandle:MsgQueueHandles :: Maybe Handle
writeHandle = Maybe Handle
forall a. Maybe a
Nothing}
openBackupQueueState :: MsgQueueState -> Bool -> IO Handle
openBackupQueueState MsgQueueState
st Bool
shouldBackup
| Bool
shouldBackup = do
let tempBackup :: String
tempBackup = String
statePath String -> ShowS
forall a. Semigroup a => a -> a -> a
<> String
".bak"
String -> String -> IO ()
renameFile String
statePath String
tempBackup
Handle
sh <- String -> IOMode -> IO Handle
openFile String
statePath IOMode
AppendMode
Handle -> IO () -> IO ()
forall a. Handle -> IO a -> IO a
closeOnException Handle
sh (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Handle -> MsgQueueState -> IO ()
appendState Handle
sh MsgQueueState
st
String -> IO ()
backupQueueState String
tempBackup
Handle -> IO Handle
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Handle
sh
| Bool
otherwise = String -> IOMode -> IO Handle
openFile String
statePath IOMode
AppendMode
backupQueueState :: String -> IO ()
backupQueueState String
path = do
UTCTime
ts <- IO UTCTime
getCurrentTime
String -> String -> IO ()
renameFile String
path (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$ String -> UTCTime -> String
stateBackupPath String
statePath UTCTime
ts
[UTCTime]
times <- [UTCTime] -> [UTCTime]
forall a. Ord a => [a] -> [a]
sort ([UTCTime] -> [UTCTime])
-> ([String] -> [UTCTime]) -> [String] -> [UTCTime]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (String -> Maybe UTCTime) -> [String] -> [UTCTime]
forall a b. (a -> Maybe b) -> [a] -> [b]
mapMaybe String -> Maybe UTCTime
backupPathTime ([String] -> [UTCTime]) -> IO [String] -> IO [UTCTime]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> String -> IO [String]
listDirectory String
dir
let toDelete :: [UTCTime]
toDelete = (UTCTime -> Bool) -> [UTCTime] -> [UTCTime]
forall a. (a -> Bool) -> [a] -> [a]
filter (UTCTime -> UTCTime -> Bool
forall a. Ord a => a -> a -> Bool
< JournalMsgStore s -> UTCTime
forall (s :: QSType). JournalMsgStore s -> UTCTime
expireBackupsBefore JournalMsgStore s
ms) ([UTCTime] -> [UTCTime]) -> [UTCTime] -> [UTCTime]
forall a b. (a -> b) -> a -> b
$ Int -> [UTCTime] -> [UTCTime]
forall a. Int -> [a] -> [a]
take ([UTCTime] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [UTCTime]
times Int -> Int -> Int
forall a. Num a => a -> a -> a
- JournalStoreConfig s -> Int
forall (s :: QSType). JournalStoreConfig s -> Int
keepMinBackups JournalStoreConfig s
config) [UTCTime]
times
(UTCTime -> IO ()) -> [UTCTime] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (Text -> String -> IO ()
safeRemoveFile Text
"removeBackups" (String -> IO ()) -> (UTCTime -> String) -> UTCTime -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> UTCTime -> String
stateBackupPath String
statePath) [UTCTime]
toDelete
where
backupPathTime :: FilePath -> Maybe UTCTime
backupPathTime :: String -> Maybe UTCTime
backupPathTime = String -> Maybe UTCTime
forall (m :: * -> *) t. (MonadFail m, ISO8601 t) => String -> m t
iso8601ParseM (String -> Maybe UTCTime)
-> (Text -> String) -> Text -> Maybe UTCTime
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> String
T.unpack (Text -> Maybe UTCTime)
-> (String -> Maybe Text) -> String -> Maybe UTCTime
forall (m :: * -> *) b c a.
Monad m =>
(b -> m c) -> (a -> m b) -> a -> m c
<=< Text -> Text -> Maybe Text
T.stripSuffix Text
".bak" (Text -> Maybe Text)
-> (String -> Maybe Text) -> String -> Maybe Text
forall (m :: * -> *) b c a.
Monad m =>
(b -> m c) -> (a -> m b) -> a -> m c
<=< Text -> Text -> Maybe Text
T.stripPrefix Text
statePathPfx (Text -> Maybe Text) -> (String -> Text) -> String -> Maybe Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> Text
T.pack
statePathPfx :: Text
statePathPfx = String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ ShowS
takeFileName String
statePath String -> ShowS
forall a. Semigroup a => a -> a -> a
<> String
"."
-- | Build a 'JournalMsgQueue' from a queue descriptor, its state and
-- (optionally) already opened file handles.
--
-- Three fresh 'TVar's are allocated: one holding the given state, one for the
-- tip message (initially 'Nothing'), and one holding the given handles.
mkJournalQueue :: JMQueue -> MsgQueueState -> Maybe MsgQueueHandles -> IO (JournalMsgQueue s)
mkJournalQueue queue st hs_ = do
  stateVar <- newTVarIO st
  tipVar <- newTVarIO Nothing
  handlesVar <- newTVarIO hs_
  pure
    JournalMsgQueue
      { queue = queue,
        state = stateVar,
        tipMsg = tipVar,
        handles = handlesVar
      }
-- | Decide which journal (state and handle) the next message is read from.
--
-- * If a write handle is open, the read position has reached the read message
--   count, and the read and write journals have different IDs, reading
--   switches to the write journal: the stored handles are replaced (the write
--   handle becomes the read handle), the old read handle is closed,
--   'removeJournal' is called for the old read journal when @log'@ is set,
--   and the queue state is updated with a new read state that takes its
--   message and byte counts from the write state.
-- * If the read position has reached the read message count and both journals
--   have the same ID, 'Nothing' is returned.
-- * Otherwise the current read state and read handle are returned.
chooseReadJournal :: JournalQueue s -> JournalMsgQueue s -> Bool -> MsgQueueHandles -> IO (Maybe (JournalState 'JTRead, Handle))
chooseReadJournal q' q log' hs = do
  st@MsgQueueState {writeState = ws, readState = rs} <- readTVarIO (state q)
  let drained = msgPos rs >= msgCount rs
      sameJournal = journalId rs == journalId ws
  case writeHandle hs of
    Just wh | drained && not sameJournal -> do
      -- publish the new handles before closing the old read handle
      atomically $ writeTVar (handles q) $ Just hs {readHandle = wh, writeHandle = Nothing}
      hClose (readHandle hs)
      when log' $ removeJournal (queueDirectory (queue q)) rs
      let !rs' = (newJournalState (journalId ws)) {msgCount = msgCount ws, byteCount = byteCount ws}
          !st' = st {readState = rs'}
      updateQueueState q' q log' hs st' (pure ())
      pure (Just (rs', wh))
    _
      | drained && sameJournal -> pure Nothing
      | otherwise -> pure (Just (rs, readHandle hs))
-- | Store a new queue state.
--
-- Throws a 'userError' (as an 'IOError') when 'validQueueState' rejects the
-- state. Otherwise:
--
-- 1. when @log'@ is set, the state is appended via the state handle;
-- 2. the queue's @queueState@ variable is set to 'Just' the 'qState' of the
--    new state;
-- 3. in a separate transaction, the message queue's @state@ variable is
--    written and the supplied STM action @a@ is run.
updateQueueState :: JournalQueue s -> JournalMsgQueue s -> Bool -> MsgQueueHandles -> MsgQueueState -> STM () -> IO ()
updateQueueState q' q log' hs st a
  | validQueueState st = do
      when log' $ appendState (stateHandle hs) st
      atomically $ writeTVar (queueState q') $ Just $! qState st
      atomically $ do
        writeTVar (state q) st
        a
  | otherwise =
      E.throwIO $ userError ("updateQueueState invalid state: " <> show st)
-- | Append the encoded queue state to the handle (see 'appendState_'),
-- running the write inside 'E.uninterruptibleMask_'.
appendState :: Handle -> MsgQueueState -> IO ()
appendState h st = E.uninterruptibleMask_ $ appendState_ h st
{-# INLINE appendState #-}
-- | Write the 'strEncode'd queue state followed by a newline to the handle.
-- Unlike 'appendState', this does not mask exceptions itself.
appendState_ :: Handle -> MsgQueueState -> IO ()
appendState_ h st = B.hPutStr h (B.snoc (strEncode st) '\n')
-- | Advance the read position past one message of @len@ bytes.
--
-- The read state's message position is incremented by one, its byte position
-- by @len@, and the queue size is decremented by one. The new state is stored
-- with 'updateQueueState', which also resets the tip message variable to
-- 'Nothing' in the same transaction that writes the state.
updateReadPos :: JournalQueue s -> JournalMsgQueue s -> Bool -> Int64 -> MsgQueueHandles -> IO ()
updateReadPos q' q log' len hs = do
  st@MsgQueueState {readState = rs, size = remaining} <- readTVarIO (state q)
  let rs' = rs {msgPos = msgPos rs + 1, bytePos = bytePos rs + len}
      st' = st {readState = rs', size = remaining - 1}
  updateQueueState q' q log' hs st' $ writeTVar (tipMsg q) Nothing
-- | Directory of a queue inside the store.
--
-- The 'strEncode'd recipient ID is cut into 2-character segments that are
-- joined with @/@ and appended to the configured @storePath@. When the
-- remaining segment budget reaches 1, the rest of the ID is kept as a single
-- final segment, so a positive @pathParts@ bounds the number of segments.
msgQueueDirectory :: JournalMsgStore s -> RecipientId -> FilePath
msgQueueDirectory JournalMsgStore {config = JournalStoreConfig {storePath = root, pathParts = depth}} rId =
  root </> B.unpack relPath
  where
    relPath = B.intercalate "/" (splitSegments depth (strEncode rId))
    splitSegments :: Int -> ByteString -> [ByteString]
    splitSegments n bs
      | B.null bs = []
      | n == 1 = [bs]
      | otherwise =
          let (seg, rest) = B.splitAt 2 bs
           in seg : splitSegments (n - 1) rest
-- | Path of the queue state file in the given queue directory:
-- @queueLogFileName@, a dot, the 'strEncode'd recipient ID, then @logFileExt@.
msgQueueStatePath :: FilePath -> RecipientId -> FilePath
msgQueueStatePath dir rId = dir </> fileName
  where
    fileName = concat [queueLogFileName, ".", B.unpack (strEncode rId), logFileExt]
-- | Open the journal file for the given journal ID in the given directory in
-- 'ReadWriteMode', write an empty string to it, and return the handle.
createNewJournal :: FilePath -> ByteString -> IO Handle
createNewJournal dir jId = do
  h <- openFile (journalFilePath dir jId) ReadWriteMode
  h <$ B.hPutStr h ""
-- | Generate a new journal ID: 12 bytes drawn from the shared generator
-- (which is advanced atomically), passed through 'strEncode'.
newJournalId :: TVar StdGen -> IO ByteString
newJournalId g = fmap strEncode . atomically $ stateTVar g (genByteString 12)
-- | Open the journal files referenced by a queue state, handling missing files.
--
-- Returns the (possibly amended) state, the read handle and, when the write
-- journal is a separate file that exists, the write handle.
--
-- Cases, by whether the read journal file exists:
--
-- * missing, read and write journal IDs equal: logs an error and creates a
--   new journal with a fresh state (appended via the state handle @sh@);
-- * missing, IDs differ: tries the write journal as the read journal; if it
--   is missing too, creates a new journal as above, otherwise switches the
--   read state to the write journal;
-- * present, IDs equal: returns the state unchanged with no write handle;
-- * present, IDs differ: opens the write journal; if it is missing, the write
--   state is reset to point at the end of the read journal.
--
-- Opened files are passed to 'fixFileSize' with the byte position/count
-- recorded in the state; handles are closed via 'closeOnException' if that
-- (or a later step wrapped by it) throws.
openJournals :: JournalMsgStore s -> FilePath -> MsgQueueState -> Handle -> IO (MsgQueueState, Handle, Maybe Handle)
openJournals ms dir st@MsgQueueState {readState = rs, writeState = ws} sh =
  openJournal rs >>= either readFileMissing readFileOpened
  where
    rjId = journalId rs
    wjId = journalId ws
    singleJournal = rjId == wjId

    -- the read journal file does not exist; @path@ is the path that was tried
    readFileMissing :: FilePath -> IO (MsgQueueState, Handle, Maybe Handle)
    readFileMissing path
      | singleJournal = do
          logError $ "STORE: openJournals, no read/write file - creating new file, " <> T.pack path
          newReadJournal
      | otherwise = do
          let rs' = (newJournalState wjId) {msgCount = msgCount ws, byteCount = byteCount ws}
              st' = st {readState = rs', size = msgCount ws}
          openJournal rs' >>= \case
            Left path' -> do
              logError $ "STORE: openJournals, no read and write files - creating new file, read: " <> T.pack path <> ", write: " <> T.pack path'
              newReadJournal
            Right rh -> do
              logError $ "STORE: openJournals, no read file - switched to write file, " <> T.pack path
              truncateTo rh $ bytePos ws
              pure (st', rh, Nothing)

    -- the read journal file exists and was opened as @rh@
    readFileOpened :: Handle -> IO (MsgQueueState, Handle, Maybe Handle)
    readFileOpened rh
      | singleJournal = do
          truncateTo rh $ bytePos ws
          pure (st, rh, Nothing)
      | otherwise = closeOnException rh $ do
          fixFileSize rh $ byteCount rs
          openJournal ws >>= \case
            Left path -> do
              let msgs = msgCount rs
                  bytes = byteCount rs
                  ws' = (newJournalState rjId) {msgPos = msgs, msgCount = msgs, bytePos = bytes, byteCount = bytes}
                  st' = st {writeState = ws', size = msgs - msgPos rs}
              logError $ "STORE: openJournals, no write file, " <> T.pack path
              pure (st', rh, Nothing)
            Right wh -> do
              truncateTo wh $ bytePos ws
              pure (st, rh, Just wh)

    -- fix the file size, closing the handle if that throws
    truncateTo :: Handle -> Int64 -> IO ()
    truncateTo h = closeOnException h . fixFileSize h

    -- create a journal with a fresh ID and record the fresh state via @sh@
    newReadJournal :: IO (MsgQueueState, Handle, Maybe Handle)
    newReadJournal = do
      rjId' <- newJournalId (random ms)
      rh <- createNewJournal dir rjId'
      let st' = newMsgQueueState rjId'
      closeOnException rh $ appendState sh st'
      pure (st', rh, Nothing)

    -- 'Right' handle when the journal file exists, 'Left' its path otherwise
    openJournal :: JournalState t -> IO (Either FilePath Handle)
    openJournal JournalState {journalId = jId} =
      ifM
        (doesFileExist path)
        (Right <$> openFile path ReadWriteMode)
        (pure $ Left path)
      where
        path = journalFilePath dir jId
-- | Reconcile the size of the file behind handle @h@ with the expected byte
-- position @pos@.
--
-- * File is longer than @pos@: a warning is logged and the file is truncated
--   to @pos@ bytes.
-- * File is shorter than @pos@: a 'userError' is thrown, nothing is modified.
-- * Sizes are equal: no-op.
fixFileSize :: Handle -> Int64 -> IO ()
fixFileSize h pos = do
  actual <- IO.hFileSize h
  case compare actual expected of
    GT -> do
      name <- IO.hShow h
      logWarn $ "STORE: fixFileSize, size " <> tshow actual <> " > pos " <> tshow pos <> " - truncating, " <> T.pack name
      IO.hSetFileSize h expected
    LT -> do
      name <- IO.hShow h
      E.throwIO $ userError $ "fixFileSize size " <> show actual <> " < pos " <> show pos <> " - aborting: " <> name
    EQ -> pure ()
  where
    -- hFileSize reports an Integer, so the comparison is done in Integer
    expected :: Integer
    expected = fromIntegral pos
-- | Remove the journal file identified by the given journal state from
-- directory @dir@. Removal goes through 'safeRemoveFile', so a failure is
-- logged (with the "removeJournal" context) rather than thrown.
removeJournal :: FilePath -> JournalState t -> IO ()
removeJournal dir js = safeRemoveFile "removeJournal" journalPath
  where
    journalPath = journalFilePath dir (journalId js)
-- | Remove the journal file identified by the given journal state from
-- directory @dir@, but only when the file is present. Any exception raised by
-- the existence check or by the removal is logged by 'handleError'.
removeJournalIfExists :: FilePath -> JournalState t -> IO ()
removeJournalIfExists dir js =
  handleError "removeJournalIfExists" journalPath $ do
    exists <- doesFileExist journalPath
    when exists $ removeFile journalPath
  where
    journalPath = journalFilePath dir (journalId js)
-- | Remove the file at @path@; a failure is logged via 'handleError' using
-- @cxt@ as the context label instead of being propagated.
safeRemoveFile :: Text -> FilePath -> IO ()
safeRemoveFile cxt path = handleError cxt path (removeFile path)
-- | Run @action@; if it throws, log an error line made of the "STORE: "
-- prefix, the context label @cxt@, the @path@ and the shown exception, and
-- return normally.
handleError :: Text -> FilePath -> IO () -> IO ()
handleError cxt path action =
  action `catchAny` \err ->
    logError $ mconcat ["STORE: ", cxt, ", ", T.pack path, ", ", tshow err]
-- | Read the most recent queue state recorded in the state file @statePath@.
--
-- If @statePath <> ".bak"@ exists, it is first renamed over @statePath@ and
-- then read. If neither file exists the result is @(Nothing, False)@.
--
-- Only the last @stateTailSize config@ bytes of the file are read. The last
-- line is decoded; when it is invalid the previous line is tried once.
--
-- The returned flag (named @shouldBackup@ in the code) is 'True' when the
-- number of lines read exceeds @maxStateLines config@, or when the last line
-- could not be used.
--
-- Throws a 'userError' when the decoded state fails 'validQueueState', or
-- when the fallback line cannot be decoded either.
readQueueState :: JournalMsgStore s -> FilePath -> IO (Maybe MsgQueueState, Bool)
readQueueState JournalMsgStore {config} statePath = do
  hasTempBackup <- doesFileExist tempBackup
  if hasTempBackup
    then renameFile tempBackup statePath >> readState
    else do
      hasState <- doesFileExist statePath
      if hasState then readState else pure (Nothing, False)
  where
    tempBackup = statePath <> ".bak"

    readState = do
      ls <- B.lines <$> readFileTail
      -- work newest-first so that the last line and its predecessor can be
      -- reached by pattern matching
      case reverse ls of
        [] -> do
          logWarn $ "STORE: readWriteQueueState, empty queue state, " <> T.pack statePath
          pure (Nothing, False)
        newest : older -> do
          r@(st_, _) <- decodeNewest (length ls) True newest older
          forM_ st_ $ \st ->
            unless (validQueueState st) $
              E.throwIO $ userError $ "readWriteQueueState inconsistent state: " <> show st
          pure r

    -- Decode line @l@; @older@ holds the lines preceding it, newest first.
    -- @isLastLine@ is True only for the actual last line of the file tail.
    decodeNewest :: Int -> Bool -> ByteString -> [ByteString] -> IO (Maybe MsgQueueState, Bool)
    decodeNewest len isLastLine l older = case strDecode l of
      Right st -> pure (Just st, len > maxStateLines config || not isLastLine)
      Left e
        | not isLastLine ->
            E.throwIO $ userError $ "readWriteQueueState invalid state " <> statePath <> ": " <> show e
        | otherwise -> case older of
            [] -> do
              logWarn $ "STORE: readWriteQueueState, invalid 1-line queue state - initialized, " <> T.pack statePath
              pure (Nothing, True)
            previous : older' -> do
              logWarn $ "STORE: readWriteQueueState, invalid last line in queue state - using the previous line, " <> T.pack statePath
              decodeNewest len False previous older'

    -- Read at most @stateTailSize config@ bytes from the end of the file.
    readFileTail =
      IO.withFile statePath ReadMode $ \h -> do
        size <- IO.hFileSize h
        let tailSize = stateTailSize config
            offset = size - fromIntegral tailSize
        if offset > 0
          then do
            IO.hSeek h AbsoluteSeek offset
            B.hGet h tailSize
          else B.hGet h (fromIntegral size)
-- | Path of a timestamped backup of the state file:
-- @statePath@, a dot, the ISO 8601 rendering of @ts@, and the @.bak@ suffix.
stateBackupPath :: FilePath -> UTCTime -> FilePath
stateBackupPath statePath ts = concat [statePath, ".", iso8601Show ts, ".bak"]
-- | Check the internal consistency of a queue state.
--
-- Always required: the read position does not exceed the read counts, and the
-- write position equals the write counts (for both messages and bytes).
--
-- When the read and write journals are the same journal, the read positions
-- must not exceed the write positions, the counts must agree, and @size@ must
-- equal the number of unread messages in that journal. Otherwise @size@ must
-- equal the write journal message count plus the unread messages of the read
-- journal.
validQueueState :: MsgQueueState -> Bool
validQueueState MsgQueueState {readState = rs, writeState = ws, size} =
  and (commonChecks <> journalChecks)
  where
    unread = msgCount rs - msgPos rs
    commonChecks =
      [ msgPos rs <= msgCount rs,
        bytePos rs <= byteCount rs,
        msgPos ws == msgCount ws,
        bytePos ws == byteCount ws
      ]
    journalChecks
      | journalId rs == journalId ws =
          [ msgPos rs <= msgPos ws,
            msgCount rs == msgCount ws,
            bytePos rs <= bytePos ws,
            byteCount rs == byteCount ws,
            size == unread
          ]
      | otherwise = [size == msgCount ws + unread]
-- | Delete queue @q@ from the queue store and, when that succeeds, drop its
-- in-memory message queue, close its handles and remove its directory.
-- The whole operation runs inside 'isolateQueueId' for the queue's recipient
-- ID; afterwards the entry for that ID is deleted from @queueLocks@.
--
-- Returns the deleted queue record together with the message queue that was
-- held in memory (if any), or the error reported by the queue store.
deleteQueue_ :: JournalMsgStore s -> JournalQueue s -> IO (Either ErrorType (QueueRec, Maybe (JournalMsgQueue s)))
deleteQueue_ ms q =
  runExceptT . isolateQueueId "deleteQueue_" ms rId $ do
    deleted <- deleteStoreQueue (queueStore_ ms) q
    -- files are only touched when the store deletion succeeded
    r <- traverse remove deleted
    atomically $ TM.delete rId (queueLocks ms)
    pure r
  where
    rId = recipientId q
    remove qr = do
      mq_ <- atomically $ swapTVar (msgQueue' q) Nothing
      forM_ mq_ $ closeMsgQueueHandles ms
      removeQueueDirectory ms rId
      pure (qr, mq_)
-- | Detach the in-memory message queue from @q@ (its @msgQueue'@ variable is
-- set to 'Nothing') and close the handles of the queue that was there, if any.
closeMsgQueue :: JournalMsgStore s -> JournalQueue s -> IO ()
closeMsgQueue ms q = do
  mq_ <- atomically $ swapTVar (msgQueue' q) Nothing
  forM_ mq_ $ closeMsgQueueHandles ms
-- | Close the file handles currently stored in the message queue's @handles@
-- variable (the first two handles, plus the optional third one), then
-- decrement the store's @openedQueueCount@. Does nothing when the variable
-- holds no handles. The @handles@ variable itself is only read, not updated.
closeMsgQueueHandles :: JournalMsgStore s -> JournalMsgQueue s -> IO ()
closeMsgQueueHandles ms q = do
  hs_ <- readTVarIO (handles q)
  forM_ hs_ $ \(MsgQueueHandles sh rh wh_) -> do
    hClose sh
    hClose rh
    forM_ wh_ hClose
    atomically $ modifyTVar' (openedQueueCount ms) (subtract 1)
-- | Remove the message queue directory that 'msgQueueDirectory' computes for
-- the given store and recipient ID.
removeQueueDirectory :: JournalMsgStore s -> RecipientId -> IO ()
removeQueueDirectory st rId = removeQueueDirectory_ (msgQueueDirectory st rId)
-- | Forcibly remove the path @dir@; a failure is logged by 'handleError'
-- under the "removeQueueDirectory" context instead of being thrown.
removeQueueDirectory_ :: FilePath -> IO ()
removeQueueDirectory_ dir = handleError "removeQueueDirectory" dir removal
  where
    removal = removePathForcibly dir
-- | Append @s@ to the file behind @h@. The file size is first reconciled with
-- the expected position @pos@ by 'fixFileSize' (which may truncate the file or
-- throw), then the handle is moved to the end of the file and @s@ is written.
hAppend :: Handle -> Int64 -> ByteString -> IO ()
hAppend h pos s =
  fixFileSize h pos
    >> IO.hSeek h SeekFromEnd 0
    >> B.hPutStr h s
-- | Read one line starting at absolute byte offset @pos@ of @h@ and decode it
-- as a message.
--
-- Returns the message together with the number of bytes it occupies: the
-- length of the line plus one. Throws a 'userError' when the line cannot be
-- decoded.
hGetMsgAt :: Handle -> Int64 -> IO (Message, Int64)
hGetMsgAt h pos = do
  IO.hSeek h AbsoluteSeek (fromIntegral pos)
  line <- B.hGetLine h
  either invalid (withLength line) (strDecode line)
  where
    invalid e = E.throwIO . userError $ "hGetMsgAt invalid message: " <> e
    -- both the message and the length are forced before they are returned
    withLength line !msg =
      let !len = fromIntegral (B.length line) + 1
       in pure (msg, len)
-- | Open file @f@ in the given mode and switch the resulting handle to line
-- buffering before returning it.
openFile :: FilePath -> IOMode -> IO Handle
openFile f mode =
  IO.openFile f mode >>= \h ->
    h <$ IO.hSetBuffering h LineBuffering
-- | Close handle @h@. An exception raised while closing is not propagated:
-- it is logged together with the textual description of the handle.
hClose :: Handle -> IO ()
hClose h =
  IO.hClose h `catchAny` \err ->
    IO.hShow h >>= \name ->
      logError $ mconcat ["STORE: hClose, ", T.pack name, ", ", tshow err]
-- | Run action @a@; if it throws, close handle @h@ (using this module's
-- 'hClose') and let the exception continue. On success the handle is left
-- open and the result of @a@ is returned.
closeOnException :: Handle -> IO a -> IO a
closeOnException h a = E.onException a (hClose h)
-- | Read all messages currently stored for queue @q@ directly from its
-- journal files, using the state recorded in the queue's state file.
--
-- When the state file yields no state, or the recorded @size@ is not
-- positive, the result is an empty list. Otherwise messages are taken from
-- the read journal between its byte position and byte count; when the write
-- journal is a different file, its messages from offset 0 up to its byte
-- position are appended.
--
-- Journal files are opened read-only. This function only reads, and opening
-- them in read-write mode would create an empty file for a missing journal
-- and would require write access to the file. A missing journal file therefore
-- results in an 'IOError' from opening it.
getJournalQueueMessages :: JournalMsgStore s -> JournalQueue s -> IO [Message]
getJournalQueueMessages ms q =
  readQueueState ms (msgQueueStatePath dir rId) >>= \case
    (Just MsgQueueState {readState = rs, writeState = ws, size}, _) | size > 0 -> do
      msgs <- getMsgs (journalId rs) (bytePos rs) (byteCount rs)
      if journalId rs == journalId ws
        then pure msgs
        else (msgs ++) <$> getMsgs (journalId ws) 0 (bytePos ws)
    _ -> pure []
  where
    rId = recipientId' q
    dir = msgQueueDirectory ms rId
    -- read the messages stored in journal jId between byte offsets from and to
    getMsgs jId from to =
      IO.withFile (journalFilePath dir jId) ReadMode $ \h' ->
        getJournalRange h' from to
-- | Read the messages stored between two byte offsets of an open journal file.
--
-- When @to > from@ the handle is positioned at absolute offset @from@ and
-- @to - from@ bytes are requested from it; every line of the bytes read is
-- decoded with 'strDecode'. Lines that fail to decode are left out of the
-- result, and their count is printed to stdout together with the handle's
-- description. When @to <= from@ the handle is not touched and an empty list
-- is returned.
getJournalRange :: Handle -> Int64 -> Int64 -> IO [Message]
getJournalRange h from to
  | to <= from = pure []
  | otherwise = do
      IO.hSeek h AbsoluteSeek (fromIntegral from)
      chunk <- B.hGet h (fromIntegral (to - from))
      decodeLines chunk
  where
    -- Decode one message per line, reporting (but not failing on) bad lines.
    decodeLines :: ByteString -> IO [Message]
    decodeLines chunk = do
      let (failed, decoded) = partitionEithers (strDecode <$> B.lines chunk)
      unless (null failed) $ do
        hInfo <- IO.hShow h
        putStrLn $ "Error reading " <> show (length failed) <> " messages from " <> hInfo
      pure decoded