{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiWayIf #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilyDependencies #-}
{-# OPTIONS_GHC -Wno-unrecognised-pragmas #-}
{-# HLINT ignore "Redundant multi-way if" #-}
module Simplex.Messaging.Server.MsgStore.Types
( MsgStoreClass (..),
MSType (..),
QSType (..),
SMSType (..),
SQSType (..),
MessageStats (..),
LoadedQueueCounts (..),
newMessageStats,
addQueue,
getQueue,
getQueueRec,
getQueues,
getQueueRecs,
readQueueRec,
withPeekMsgQueue,
expireQueueMsgs,
deleteExpireMsgs_,
) where
import Control.Concurrent.STM
import Control.Monad
import Control.Monad.Trans.Except
import Data.Functor (($>))
import Data.Int (Int64)
import Data.Kind
import Data.Maybe (fromMaybe)
import Data.Text (Text)
import Data.Time.Clock.System (SystemTime (systemSeconds))
import Simplex.Messaging.Protocol
import Simplex.Messaging.Server.QueueStore
import Simplex.Messaging.Server.QueueStore.Types
import Simplex.Messaging.Util ((<$$>), ($>>=))
-- | Interface of a message store @s@.
--
-- A store fixes a monad in which queue operations run ('StoreMonad'), its
-- configuration type, the types of message queues and store queues, and the
-- underlying queue store.  Every associated type carries an injectivity
-- annotation, so each of them determines the store type @s@.
--
-- The methods with default implementations at the end of the class are built
-- from the primitive methods and from 'withPeekMsgQueue' / 'isolateQueue';
-- instances may override them.
class (Monad (StoreMonad s), QueueStoreClass (StoreQueue s) (QueueStore s)) => MsgStoreClass s where
  type StoreMonad s = (m :: Type -> Type) | m -> s
  type MsgStoreConfig s = c | c -> s
  type MsgQueue s = q | q -> s
  type StoreQueue s = q | q -> s
  type QueueStore s = qs | qs -> s

  -- Store lifecycle, traversal and diagnostics.
  newMsgStore :: MsgStoreConfig s -> IO s
  closeMsgStore :: s -> IO ()
  withActiveMsgQueues :: Monoid a => s -> (StoreQueue s -> IO a) -> IO a
  unsafeWithAllMsgQueues :: Monoid a => Bool -> s -> (StoreQueue s -> IO a) -> IO a
  expireOldMessages :: Bool -> s -> Int64 -> Int64 -> IO MessageStats
  logQueueStates :: s -> IO ()
  logQueueState :: StoreQueue s -> StoreMonad s ()
  queueStore :: s -> QueueStore s
  loadedQueueCounts :: s -> IO LoadedQueueCounts

  -- Queue-level primitives.
  mkQueue :: s -> Bool -> RecipientId -> QueueRec -> IO (StoreQueue s)
  getMsgQueue :: s -> StoreQueue s -> Bool -> StoreMonad s (MsgQueue s)
  getPeekMsgQueue :: s -> StoreQueue s -> StoreMonad s (Maybe (MsgQueue s, Message))
  -- The result pair is consumed by 'expireQueueMsgs' as
  -- (optional result of the action, number of stored messages).
  withIdleMsgQueue :: Int64 -> s -> StoreQueue s -> (MsgQueue s -> StoreMonad s a) -> StoreMonad s (Maybe a, Int)
  deleteQueue :: s -> StoreQueue s -> IO (Either ErrorType QueueRec)
  deleteQueueSize :: s -> StoreQueue s -> IO (Either ErrorType (QueueRec, Int))
  getQueueMessages_ :: Bool -> StoreQueue s -> MsgQueue s -> StoreMonad s [Message]
  writeMsg :: s -> StoreQueue s -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool))
  setOverQuota_ :: StoreQueue s -> IO ()
  getQueueSize_ :: MsgQueue s -> StoreMonad s Int
  tryPeekMsg_ :: StoreQueue s -> MsgQueue s -> StoreMonad s (Maybe Message)
  tryDeleteMsg_ :: StoreQueue s -> MsgQueue s -> Bool -> StoreMonad s ()
  isolateQueue :: s -> StoreQueue s -> Text -> StoreMonad s a -> ExceptT ErrorType IO a
  unsafeRunStore :: StoreQueue s -> Text -> StoreMonad s a -> IO a

  -- | Return the message produced by 'getPeekMsgQueue', if any, without
  -- deleting it.
  tryPeekMsg :: s -> StoreQueue s -> ExceptT ErrorType IO (Maybe Message)
  tryPeekMsg st q = snd <$$> withPeekMsgQueue st q "tryPeekMsg" pure
  {-# INLINE tryPeekMsg #-}

  -- | Delete the currently peeked message, but only when its 'messageId'
  -- equals the given id.  Returns the deleted message, or 'Nothing' when
  -- there is no peeked message or the id does not match.
  tryDelMsg :: s -> StoreQueue s -> MsgId -> ExceptT ErrorType IO (Maybe Message)
  tryDelMsg st q msgId' =
    withPeekMsgQueue st q "tryDelMsg" $
      maybe (pure Nothing) $ \(mq, msg) ->
        if
          | messageId msg == msgId' ->
              tryDeleteMsg_ q mq True $> Just msg
          | otherwise -> pure Nothing

  -- | Delete the currently peeked message when its id matches, then peek the
  -- next one, all within a single 'withPeekMsgQueue' action.
  --
  -- Result is @(deleted message, message now at the front)@:
  --
  -- * no peeked message: @(Nothing, Nothing)@;
  -- * id matches: @(Just deleted, result of 'tryPeekMsg_' after deletion)@;
  -- * id does not match: @(Nothing, Just peeked)@, nothing is deleted.
  tryDelPeekMsg :: s -> StoreQueue s -> MsgId -> ExceptT ErrorType IO (Maybe Message, Maybe Message)
  tryDelPeekMsg st q msgId' =
    withPeekMsgQueue st q "tryDelPeekMsg" $
      maybe (pure (Nothing, Nothing)) $ \(mq, msg) ->
        if
          | messageId msg == msgId' -> (Just msg,) <$> (tryDeleteMsg_ q mq True >> tryPeekMsg_ q mq)
          | otherwise -> pure (Nothing, Just msg)

  -- | Run 'deleteExpireMsgs_' with the given threshold on the queue obtained
  -- from 'getMsgQueue' (called with 'False'), inside 'isolateQueue'.
  -- Returns the number of deleted messages.
  deleteExpiredMsgs :: s -> StoreQueue s -> Int64 -> ExceptT ErrorType IO Int
  deleteExpiredMsgs st q old =
    isolateQueue st q "deleteExpiredMsgs" $
      getMsgQueue st q False >>= deleteExpireMsgs_ old q

  -- | Queue size: @0@ when 'getPeekMsgQueue' yields nothing, otherwise the
  -- result of 'getQueueSize_' on the returned message queue.
  getQueueSize :: s -> StoreQueue s -> ExceptT ErrorType IO Int
  getQueueSize st q = withPeekMsgQueue st q "getQueueSize" $ maybe (pure 0) (getQueueSize_ . fst)
  {-# INLINE getQueueSize #-}
-- | Message store variants.  The constructors are also used promoted to the
-- type level as indices of 'SMSType'.
data MSType
  = MSMemory
  | MSJournal
  | MSPostgres
-- | Queue store variants.  The constructors are also used promoted to the
-- type level as indices of 'SQSType'.
data QSType
  = QSMemory
  | QSPostgres
-- | Singleton for 'MSType': each constructor fixes the type-level index to
-- the corresponding promoted 'MSType' constructor.
data SMSType :: MSType -> Type where
  SMSMemory :: SMSType 'MSMemory
  SMSJournal :: SMSType 'MSJournal
  SMSPostgres :: SMSType 'MSPostgres
-- | Singleton for 'QSType': each constructor fixes the type-level index to
-- the corresponding promoted 'QSType' constructor.
data SQSType :: QSType -> Type where
  SQSMemory :: SQSType 'QSMemory
  SQSPostgres :: SQSType 'QSPostgres
-- | Message counters.  Values are combined field by field through the
-- 'Semigroup' instance.
data MessageStats = MessageStats
  { -- | number of stored messages
    storedMsgsCount :: Int,
    -- | number of expired messages
    expiredMsgsCount :: Int,
    -- | number of queues counted ('expireQueueMsgs' reports 1 per queue)
    storedQueues :: Int
  }
  deriving (Show)
-- | The identity element has every counter set to zero.
instance Monoid MessageStats where
  mempty = MessageStats 0 0 0
  {-# INLINE mempty #-}
-- | Statistics are combined by adding the corresponding counters.
instance Semigroup MessageStats where
  MessageStats a b c <> MessageStats x y z = MessageStats (a + x) (b + y) (c + z)
  {-# INLINE (<>) #-}
-- | Counters returned by 'loadedQueueCounts'.
data LoadedQueueCounts = LoadedQueueCounts
  { loadedQueueCount :: Int,
    loadedNotifierCount :: Int,
    openJournalCount :: Int,
    queueLockCount :: Int,
    notifierLockCount :: Int
  }
-- | Statistics with every counter set to zero.
newMessageStats :: MessageStats
newMessageStats = MessageStats 0 0 0
-- | Add a queue to the store's queue store.
--
-- Delegates to 'addQueue_' on @queueStore st@, passing @mkQueue st True@ as
-- the function that builds the store queue from the recipient id and record.
addQueue :: MsgStoreClass s => s -> RecipientId -> QueueRec -> IO (Either ErrorType (StoreQueue s))
addQueue st = addQueue_ (queueStore st) (mkQueue st True)
{-# INLINE addQueue #-}
-- | Look up a store queue by the id of the given party.
--
-- Delegates to 'getQueue_' on @queueStore st@, passing @mkQueue st@ as the
-- function that builds the store queue when one has to be created.
getQueue :: (MsgStoreClass s, QueueParty p) => s -> SParty p -> QueueId -> IO (Either ErrorType (StoreQueue s))
getQueue st = getQueue_ (queueStore st) (mkQueue st)
{-# INLINE getQueue #-}
-- | Look up a store queue with 'getQueue' and, on success, read its current
-- record with 'readQueueRec'.  An error from either step is returned as 'Left'.
getQueueRec :: (MsgStoreClass s, QueueParty p) => s -> SParty p -> QueueId -> IO (Either ErrorType (StoreQueue s, QueueRec))
getQueueRec st party qId = getQueue st party qId $>>= readQueueRec
-- | Batch variant of 'getQueue': look up store queues for a list of ids.
--
-- Delegates to 'getQueues_' on @queueStore st@, passing @mkQueue st@ as the
-- function that builds store queues.
getQueues :: (MsgStoreClass s, BatchParty p) => s -> SParty p -> [QueueId] -> IO [Either ErrorType (StoreQueue s)]
getQueues st = getQueues_ (queueStore st) (mkQueue st)
{-# INLINE getQueues #-}
-- | Batch variant of 'getQueueRec'.
--
-- Looks up all queues with 'getQueues', then reads the record of every queue
-- that was found.  A 'Left' from the lookup is kept as is; for a found queue
-- the result of 'readQueueRec' is used.
getQueueRecs :: (MsgStoreClass s, BatchParty p) => s -> SParty p -> [QueueId] -> IO [Either ErrorType (StoreQueue s, QueueRec)]
getQueueRecs st party qIds =
  -- the inner mapM traverses the Either, producing a nested Either that
  -- 'join' flattens
  getQueues st party qIds >>= mapM (fmap join . mapM readQueueRec)
-- | Read the current record of a store queue from its 'queueRec' TVar.
--
-- Returns the queue paired with its record, or @Left AUTH@ when the TVar
-- holds 'Nothing'.
readQueueRec :: StoreQueueClass q => q -> IO (Either ErrorType (q, QueueRec))
readQueueRec q = maybe (Left AUTH) (Right . (q,)) <$> readTVarIO (queueRec q)
{-# INLINE readQueueRec #-}
-- | Run an action on the result of 'getPeekMsgQueue' inside 'isolateQueue'.
--
-- @op@ is the text label passed through to 'isolateQueue'; the action
-- receives 'Nothing' or the message queue together with the peeked message.
withPeekMsgQueue :: MsgStoreClass s => s -> StoreQueue s -> Text -> (Maybe (MsgQueue s, Message) -> StoreMonad s a) -> ExceptT ErrorType IO a
withPeekMsgQueue st q op a = isolateQueue st q op $ getPeekMsgQueue st q >>= a
{-# INLINE withPeekMsgQueue #-}
-- | Expire messages of a single queue and report statistics for it.
--
-- Runs 'deleteExpireMsgs_' with threshold @old@ via 'withIdleMsgQueue'
-- (which is given @now@).  In the result, 'storedMsgsCount' is the second
-- component returned by 'withIdleMsgQueue', 'expiredMsgsCount' is the number
-- of deleted messages (0 when 'withIdleMsgQueue' returned 'Nothing' for the
-- action), and 'storedQueues' is always 1.
expireQueueMsgs :: MsgStoreClass s => s -> Int64 -> Int64 -> StoreQueue s -> StoreMonad s MessageStats
expireQueueMsgs st now old q = do
  (expired_, stored) <- withIdleMsgQueue now st q $ deleteExpireMsgs_ old q
  pure MessageStats {storedMsgsCount = stored, expiredMsgsCount = fromMaybe 0 expired_, storedQueues = 1}
-- | Delete messages from the front of the queue while they are older than
-- the threshold.
--
-- Repeatedly peeks with 'tryPeekMsg_'; while the peeked message's timestamp
-- (in seconds) is strictly less than @old@ it is removed with
-- 'tryDeleteMsg_' (called with 'False').  The loop stops at the first message
-- that is not older than @old@, or when no message is returned.  Afterwards
-- the queue state is logged with 'logQueueState'.
--
-- Returns the number of deleted messages.
deleteExpireMsgs_ :: MsgStoreClass s => Int64 -> StoreQueue s -> MsgQueue s -> StoreMonad s Int
deleteExpireMsgs_ old q mq = do
  n <- loop 0
  logQueueState q
  pure n
  where
    -- the counter is forced on every iteration so no chain of (+ 1) thunks
    -- builds up on long queues
    loop !dc =
      tryPeekMsg_ q mq >>= \case
        Just Message {msgTs}
          | systemSeconds msgTs < old ->
              tryDeleteMsg_ q mq False >> loop (dc + 1)
        _ -> pure dc