{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE InstanceSigs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE TupleSections #-}
module Simplex.Messaging.Server.MsgStore.STM
( STMMsgStore (..),
STMStoreConfig (..),
STMQueue,
)
where
import Control.Concurrent.STM
import Control.Monad.IO.Class
import Control.Monad.Trans.Except
import Data.Functor (($>))
import Data.Int (Int64)
import qualified Data.Map.Strict as M
import Data.Text (Text)
import Simplex.Messaging.Protocol
import Simplex.Messaging.Server.MsgStore.Types
import Simplex.Messaging.Server.QueueStore
import Simplex.Messaging.Server.QueueStore.STM
import Simplex.Messaging.Server.QueueStore.Types
import Simplex.Messaging.Util ((<$$>), ($>>=))
-- | In-memory (STM-based) message store: the store configuration together
-- with the STM queue store holding the queues.
data STMMsgStore = STMMsgStore
  { -- | Store configuration; 'writeMsg' reads the queue quota from it.
    storeConfig :: STMStoreConfig,
    -- | Underlying queue store; returned by 'queueStore' and used by the
    -- queue traversal, counting and deletion operations of this module.
    queueStore_ :: STMQueueStore STMQueue
  }
-- | A queue as kept by the STM store: its recipient ID plus mutable
-- references to the queue record and to the message queue.
data STMQueue = STMQueue
  { -- | Recipient ID of the queue (exposed via 'recipientId').
    recipientId' :: RecipientId,
    -- | Queue record (exposed via 'queueRec'); 'mkQueue' initialises it
    -- with @Just@ the record.
    -- NOTE(review): presumably reset to Nothing by the queue store when the
    -- queue is deleted - confirm in QueueStore.STM.
    queueRec' :: TVar (Maybe QueueRec),
    -- | Message queue; starts as Nothing, is created on demand by
    -- 'getMsgQueue' and swapped back to Nothing by 'deleteQueue_'.
    msgQueue' :: TVar (Maybe STMMsgQueue)
  }
-- | Message queue of a single 'STMQueue'.
data STMMsgQueue = STMMsgQueue
  { -- | Queued messages, oldest first.
    msgTQueue :: TQueue Message,
    -- | Whether further messages may be written; recomputed by 'writeMsg'
    -- on every accepted write and cleared by 'setOverQuota_'.
    canWrite :: TVar Bool,
    -- | Number of queued items, maintained by 'writeMsg' (incremented) and
    -- 'tryDeleteMsg_' (decremented).
    size :: TVar Int
  }
-- | Configuration of the STM message store.
data STMStoreConfig = STMStoreConfig
  { -- | Optional store file path.
    -- NOTE(review): not read anywhere in this module - confirm its use by callers.
    storePath :: Maybe FilePath,
    -- | Queue quota: 'writeMsg' accepts a message only while the current
    -- queue size is strictly below this value.
    quota :: Int
  }
-- Queue-level accessors for the STM store.
instance StoreQueueClass STMQueue where
  recipientId = recipientId'
  {-# INLINE recipientId #-}
  queueRec = queueRec'
  {-# INLINE queueRec #-}
  -- No lock is taken here: the action is returned unchanged.
  withQueueLock _ _ = id
  {-# INLINE withQueueLock #-}
-- Message store implementation where all queue operations run in STM.
instance MsgStoreClass STMMsgStore where
  type StoreMonad STMMsgStore = STM
  type MsgQueue STMMsgStore = STMMsgQueue
  type QueueStore STMMsgStore = STMQueueStore STMQueue
  type StoreQueue STMMsgStore = STMQueue
  type MsgStoreConfig STMMsgStore = STMStoreConfig

  -- Creates a new queue store and pairs it with the given configuration.
  newMsgStore :: STMStoreConfig -> IO STMMsgStore
  newMsgStore storeConfig = do
    queueStore_ <- newQueueStore @STMQueue ()
    pure STMMsgStore {storeConfig, queueStore_}

  -- Closing the message store closes its queue store.
  closeMsgStore = closeQueueStore @STMQueue . queueStore_
  {-# INLINE closeMsgStore #-}

  -- Runs the action over the loaded queues of the queue store.
  withActiveMsgQueues = withLoadedQueues . queueStore_
  {-# INLINE withActiveMsgQueues #-}

  -- Same traversal as withActiveMsgQueues; the Bool argument is ignored.
  unsafeWithAllMsgQueues _ = withLoadedQueues . queueStore_
  {-# INLINE unsafeWithAllMsgQueues #-}

  -- Runs expireQueueMsgs for every loaded queue, one STM transaction per
  -- queue, passing `now` and the cut-off `now - ttl`. The first argument is
  -- unused here.
  expireOldMessages :: Bool -> STMMsgStore -> Int64 -> Int64 -> IO MessageStats
  expireOldMessages _tty ms now ttl =
    withLoadedQueues (queueStore_ ms) $ atomically . expireQueueMsgs ms now (now - ttl)

  -- No-op for the STM store.
  logQueueStates _ = pure ()
  {-# INLINE logQueueStates #-}

  -- No-op for the STM store.
  logQueueState _ = pure ()
  {-# INLINE logQueueState #-}

  queueStore = queueStore_
  {-# INLINE queueStore #-}

  -- Counts the entries of the queue store's `queues` and `notifiers` maps;
  -- journal and lock counts are always reported as 0.
  loadedQueueCounts :: STMMsgStore -> IO LoadedQueueCounts
  loadedQueueCounts STMMsgStore {queueStore_ = st} = do
    loadedQueueCount <- M.size <$> readTVarIO (queues st)
    loadedNotifierCount <- M.size <$> readTVarIO (notifiers st)
    pure
      LoadedQueueCounts
        { loadedQueueCount,
          loadedNotifierCount,
          openJournalCount = 0,
          queueLockCount = 0,
          notifierLockCount = 0
        }

  -- Builds a queue holding the given record and no message queue yet.
  -- The store and the Bool argument are ignored.
  mkQueue _ _ rId qr = STMQueue rId <$> newTVarIO (Just qr) <*> newTVarIO Nothing
  {-# INLINE mkQueue #-}

  -- Returns the existing message queue, or creates an empty writable one
  -- and stores it in the queue. The store and the Bool argument are ignored.
  getMsgQueue :: STMMsgStore -> STMQueue -> Bool -> STM STMMsgQueue
  getMsgQueue _ STMQueue {msgQueue'} _ = readTVar msgQueue' >>= maybe newQ pure
    where
      newQ = do
        msgTQueue <- newTQueue
        canWrite <- newTVar True
        size <- newTVar 0
        let q = STMMsgQueue {msgTQueue, canWrite, size}
        writeTVar msgQueue' (Just q)
        pure q

  -- Returns the message queue paired with its first message, or Nothing
  -- when there is no message queue or it holds no message. Never creates
  -- the message queue.
  getPeekMsgQueue :: STMMsgStore -> STMQueue -> STM (Maybe (STMMsgQueue, Message))
  getPeekMsgQueue _ q@STMQueue {msgQueue'} =
    readTVar msgQueue' $>>= \mq -> (mq,) <$$> tryPeekMsg_ q mq

  -- Runs the action on the message queue if one exists, returning its
  -- result together with the queue size read after the action;
  -- returns (Nothing, 0) otherwise. The first two arguments are ignored.
  withIdleMsgQueue :: Int64 -> STMMsgStore -> STMQueue -> (STMMsgQueue -> STM a) -> STM (Maybe a, Int)
  withIdleMsgQueue _ _ STMQueue {msgQueue'} action =
    readTVar msgQueue' >>= \case
      Just q -> do
        r <- action q
        sz <- getQueueSize_ q
        pure (Just r, sz)
      Nothing -> pure (Nothing, 0)

  -- Deletes the queue and returns its record.
  deleteQueue :: STMMsgStore -> STMQueue -> IO (Either ErrorType QueueRec)
  deleteQueue ms q = fst <$$> deleteQueue_ ms q

  -- Deletes the queue and returns its record with the size of the removed
  -- message queue (0 when there was none).
  deleteQueueSize :: STMMsgStore -> STMQueue -> IO (Either ErrorType (QueueRec, Int))
  deleteQueueSize ms q = deleteQueue_ ms q >>= mapM (traverse getSize)
    where
      getSize = maybe (pure 0) (\STMMsgQueue {size} -> readTVarIO size)

  -- Returns all queued messages; when drainMsgs is True they are removed
  -- from the queue, otherwise they are written back in the same order.
  getQueueMessages_ :: Bool -> STMQueue -> STMMsgQueue -> STM [Message]
  getQueueMessages_ drainMsgs _ = (if drainMsgs then flushTQueue else snapshotTQueue) . msgTQueue
    where
      -- flush and re-enqueue, so the queue content is unchanged
      snapshotTQueue q = do
        msgs <- flushTQueue q
        mapM_ (writeTQueue q) msgs
        pure msgs

  -- Writes a message in a single STM transaction, creating the message
  -- queue if needed. A write is attempted when the queue is writable or
  -- empty:
  --   * if the current size is below the quota, the message is enqueued and
  --     Just (msg, wasEmpty) is returned;
  --   * otherwise a MessageQuota marker with the same ID and timestamp is
  --     enqueued instead, and Nothing is returned.
  -- In both cases canWrite is updated and size is incremented.
  -- When the queue is neither writable nor empty nothing is written and
  -- Nothing is returned. The Bool argument is unused.
  writeMsg :: STMMsgStore -> STMQueue -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool))
  writeMsg ms q' _logState msg = liftIO $ atomically $ do
    STMMsgQueue {msgTQueue = q, canWrite, size} <- getMsgQueue ms q' True
    canWrt <- readTVar canWrite
    empty <- isEmptyTQueue q
    if canWrt || empty
      then do
        canWrt' <- (quota >) <$> readTVar size
        writeTVar canWrite $! canWrt'
        modifyTVar' size (+ 1)
        if canWrt'
          then (writeTQueue q $! msg) $> Just (msg, empty)
          else (writeTQueue q $! msgQuota) $> Nothing
      else pure Nothing
    where
      STMMsgStore {storeConfig = STMStoreConfig {quota}} = ms
      msgQuota = MessageQuota {msgId = messageId msg, msgTs = messageTs msg}

  -- Marks the message queue, if it exists, as not writable.
  setOverQuota_ :: STMQueue -> IO ()
  setOverQuota_ q =
    readTVarIO (msgQueue' q)
      >>= mapM_ (\mq -> atomically $ writeTVar (canWrite mq) False)

  getQueueSize_ :: STMMsgQueue -> STM Int
  getQueueSize_ STMMsgQueue {size} = readTVar size

  -- Returns the first message without removing it.
  tryPeekMsg_ :: STMQueue -> STMMsgQueue -> STM (Maybe Message)
  tryPeekMsg_ _ = tryPeekTQueue . msgTQueue
  {-# INLINE tryPeekMsg_ #-}

  -- Removes the first message, if any, and decrements the size only when a
  -- message was actually removed. The Bool argument is unused.
  tryDeleteMsg_ :: STMQueue -> STMMsgQueue -> Bool -> STM ()
  tryDeleteMsg_ _ STMMsgQueue {msgTQueue = q, size} _logState =
    tryReadTQueue q >>= \case
      Just _ -> modifyTVar' size (subtract 1)
      _ -> pure ()

  -- Runs the STM action atomically; store, queue and text arguments are ignored.
  isolateQueue :: STMMsgStore -> STMQueue -> Text -> STM a -> ExceptT ErrorType IO a
  isolateQueue _ _ _ = liftIO . atomically
  {-# INLINE isolateQueue #-}

  -- Runs the STM action atomically; queue and text arguments are ignored.
  unsafeRunStore :: STMQueue -> Text -> STM a -> IO a
  unsafeRunStore _ _ = atomically
  {-# INLINE unsafeRunStore #-}
-- Deletes the queue from the queue store and, if that succeeds, detaches
-- its message queue (the queue's message-queue reference is set to Nothing).
-- Returns the queue record with the detached message queue, or the error
-- from the queue store; on error the message queue is left untouched.
deleteQueue_ :: STMMsgStore -> STMQueue -> IO (Either ErrorType (QueueRec, Maybe STMMsgQueue))
deleteQueue_ ms q = deleteStoreQueue (queueStore_ ms) q >>= mapM remove
  where
    -- swapTVar returns the previous message queue while clearing the reference
    remove qr = (qr,) <$> atomically (swapTVar (msgQueue' q) Nothing)