{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiWayIf #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilyDependencies #-}
{-# OPTIONS_GHC -Wno-unrecognised-pragmas #-}

{-# HLINT ignore "Redundant multi-way if" #-}

module Simplex.Messaging.Server.MsgStore.Types
  ( MsgStoreClass (..),
    MSType (..),
    QSType (..),
    SMSType (..),
    SQSType (..),
    MessageStats (..),
    LoadedQueueCounts (..),
    newMessageStats,
    addQueue,
    getQueue,
    getQueueRec,
    getQueues,
    getQueueRecs,
    readQueueRec,
    withPeekMsgQueue,
    expireQueueMsgs,
    deleteExpireMsgs_,
  ) where

import Control.Concurrent.STM
import Control.Monad
import Control.Monad.Trans.Except
import Data.Functor (($>))
import Data.Int (Int64)
import Data.Kind
import Data.Maybe (fromMaybe)
import Data.Text (Text)
import Data.Time.Clock.System (SystemTime (systemSeconds))
import Simplex.Messaging.Protocol
import Simplex.Messaging.Server.QueueStore
import Simplex.Messaging.Server.QueueStore.Types
import Simplex.Messaging.Util ((<$$>), ($>>=))

-- Interface implemented by message store backends.
--
-- Every associated type family is injective (see the `| x -> s` annotations), so the
-- store type `s` is determined by its monad, config, message queue, store queue or
-- queue store type.
--
-- The methods at the end of the class (tryPeekMsg, tryDelMsg, tryDelPeekMsg,
-- deleteExpiredMsgs, getQueueSize) have default implementations built from the
-- lower-level methods of this class and from withPeekMsgQueue / isolateQueue.
class (Monad (StoreMonad s), QueueStoreClass (StoreQueue s) (QueueStore s)) => MsgStoreClass s where
  type StoreMonad s = (m :: Type -> Type) | m -> s
  type MsgStoreConfig s = c | c -> s
  type MsgQueue s = q | q -> s
  type StoreQueue s = q | q -> s
  type QueueStore s = qs | qs -> s
  newMsgStore :: MsgStoreConfig s -> IO s
  closeMsgStore :: s -> IO ()
  withActiveMsgQueues :: Monoid a => s -> (StoreQueue s -> IO a) -> IO a
  -- This function can only be used in server CLI commands or before server is started.
  -- tty, store
  unsafeWithAllMsgQueues :: Monoid a => Bool -> s -> (StoreQueue s -> IO a) -> IO a
  -- tty, store, now, ttl
  expireOldMessages :: Bool -> s -> Int64 -> Int64 -> IO MessageStats
  logQueueStates :: s -> IO ()
  logQueueState :: StoreQueue s -> StoreMonad s ()
  queueStore :: s -> QueueStore s
  loadedQueueCounts :: s -> IO LoadedQueueCounts

  -- message store methods
  mkQueue :: s -> Bool -> RecipientId -> QueueRec -> IO (StoreQueue s)
  getMsgQueue :: s -> StoreQueue s -> Bool -> StoreMonad s (MsgQueue s)
  getPeekMsgQueue :: s -> StoreQueue s -> StoreMonad s (Maybe (MsgQueue s, Message))

  -- the journal queue will be closed after action if it was initially closed or idle longer than interval in config
  withIdleMsgQueue :: Int64 -> s -> StoreQueue s -> (MsgQueue s -> StoreMonad s a) -> StoreMonad s (Maybe a, Int)
  deleteQueue :: s -> StoreQueue s -> IO (Either ErrorType QueueRec)
  deleteQueueSize :: s -> StoreQueue s -> IO (Either ErrorType (QueueRec, Int))
  getQueueMessages_ :: Bool -> StoreQueue s -> MsgQueue s -> StoreMonad s [Message]
  writeMsg :: s -> StoreQueue s -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool))
  setOverQuota_ :: StoreQueue s -> IO () -- can ONLY be used while restoring messages, not while server running
  getQueueSize_ :: MsgQueue s -> StoreMonad s Int
  tryPeekMsg_ :: StoreQueue s -> MsgQueue s -> StoreMonad s (Maybe Message)
  tryDeleteMsg_ :: StoreQueue s -> MsgQueue s -> Bool -> StoreMonad s ()
  isolateQueue :: s -> StoreQueue s -> Text -> StoreMonad s a -> ExceptT ErrorType IO a
  unsafeRunStore :: StoreQueue s -> Text -> StoreMonad s a -> IO a

  -- default implementations are overridden for PostgreSQL storage of messages

  -- Returns the message that getPeekMsgQueue reports for the queue (if any),
  -- without deleting it.
  tryPeekMsg :: s -> StoreQueue s -> ExceptT ErrorType IO (Maybe Message)
  tryPeekMsg st q = snd <$$> withPeekMsgQueue st q "tryPeekMsg" pure
  {-# INLINE tryPeekMsg #-}

  -- Deletes the peeked message only when its id equals msgId' and returns it.
  -- Returns Nothing (and deletes nothing) when there is no peeked message or
  -- when its id differs.
  tryDelMsg :: s -> StoreQueue s -> MsgId -> ExceptT ErrorType IO (Maybe Message)
  tryDelMsg st q msgId' =
    withPeekMsgQueue st q "tryDelMsg" $
      maybe (pure Nothing) $ \(mq, msg) ->
        if
          | messageId msg == msgId' ->
              tryDeleteMsg_ q mq True $> Just msg
          | otherwise -> pure Nothing

  -- atomic delete (== read) last and peek next message if available
  --
  -- Result is (deleted message, message peeked afterwards):
  --   * no peeked message: (Nothing, Nothing);
  --   * id matches: the peeked message is deleted, then the queue is peeked again;
  --   * id differs: nothing is deleted and the current peeked message is returned
  --     in the second component.
  tryDelPeekMsg :: s -> StoreQueue s -> MsgId -> ExceptT ErrorType IO (Maybe Message, Maybe Message)
  tryDelPeekMsg st q msgId' =
    withPeekMsgQueue st q "tryDelPeekMsg" $
      maybe (pure (Nothing, Nothing)) $ \(mq, msg) ->
        if
          | messageId msg == msgId' -> (Just msg,) <$> (tryDeleteMsg_ q mq True >> tryPeekMsg_ q mq)
          | otherwise -> pure (Nothing, Just msg)

  -- Runs deleteExpireMsgs_ inside isolateQueue on the message queue returned by
  -- `getMsgQueue st q False`; `old` is the cut-off in seconds that is compared with
  -- message timestamps. Returns the number of deleted messages.
  deleteExpiredMsgs :: s -> StoreQueue s -> Int64 -> ExceptT ErrorType IO Int
  deleteExpiredMsgs st q old =
    isolateQueue st q "deleteExpiredMsgs" $
      getMsgQueue st q False >>= deleteExpireMsgs_ old q

  -- Returns 0 when withPeekMsgQueue passes Nothing (queue known to be empty),
  -- otherwise the result of getQueueSize_ for the message queue.
  getQueueSize :: s -> StoreQueue s -> ExceptT ErrorType IO Int
  getQueueSize st q = withPeekMsgQueue st q "getQueueSize" $ maybe (pure 0) (getQueueSize_ . fst)
  {-# INLINE getQueueSize #-}

-- Message store type tag. With DataKinds its constructors are promoted and
-- used as the type-level index of SMSType.
data MSType = MSMemory | MSJournal | MSPostgres

-- Queue store type tag. With DataKinds its constructors are promoted and
-- used as the type-level index of SQSType.
data QSType = QSMemory | QSPostgres

-- Singleton GADT for MSType: each constructor fixes the type index to the
-- promoted MSType constructor of the same name, so matching on the value
-- reveals the index to the type checker.
data SMSType :: MSType -> Type where
  SMSMemory :: SMSType 'MSMemory
  SMSJournal :: SMSType 'MSJournal
  SMSPostgres :: SMSType 'MSPostgres

-- Singleton GADT for QSType: each constructor fixes the type index to the
-- promoted QSType constructor of the same name.
data SQSType :: QSType -> Type where
  SQSMemory :: SQSType 'QSMemory
  SQSPostgres :: SQSType 'QSPostgres

-- Message counters that can be summed across queues (see the Semigroup and
-- Monoid instances in this module).
--
-- expireQueueMsgs fills them for a single queue: storedMsgsCount from the Int
-- returned by withIdleMsgQueue, expiredMsgsCount from the number of messages
-- deleted as expired, and storedQueues = 1.
data MessageStats = MessageStats
  { storedMsgsCount :: Int,
    expiredMsgsCount :: Int,
    storedQueues :: Int
  }
  deriving (Show)

-- The identity for (<>) on MessageStats: all three counters are zero.
instance Monoid MessageStats where
  mempty = MessageStats 0 0 0
  {-# INLINE mempty #-}

-- Combines two MessageStats by adding the counters field by field.
instance Semigroup MessageStats where
  MessageStats a b c <> MessageStats x y z = MessageStats (a + x) (b + y) (c + z)
  {-# INLINE (<>) #-}

-- Counters returned by the loadedQueueCounts method of MsgStoreClass.
-- NOTE(review): the exact meaning of each counter is defined by the store
-- implementations, not in this module - confirm there before relying on it.
data LoadedQueueCounts = LoadedQueueCounts
  { loadedQueueCount :: Int,
    loadedNotifierCount :: Int,
    openJournalCount :: Int,
    queueLockCount :: Int,
    notifierLockCount :: Int
  }

-- Empty statistics: all counters are zero.
-- Defined via the Monoid instance so that the zero value is written in one place only.
newMessageStats :: MessageStats
newMessageStats = mempty

-- Delegates to addQueue_ on the queue store of `st`, passing `mkQueue st True`
-- as the function that builds the store queue from the recipient id and record.
-- The result (store queue or ErrorType) is whatever addQueue_ returns.
-- NOTE(review): the meaning of the Bool passed to mkQueue is defined by the
-- store implementations - confirm there.
addQueue :: MsgStoreClass s => s -> RecipientId -> QueueRec -> IO (Either ErrorType (StoreQueue s))
addQueue st = addQueue_ (queueStore st) (mkQueue st True)
{-# INLINE addQueue #-}

-- Looks up a store queue by the id of the given party: delegates to getQueue_
-- on the queue store of `st`, passing `mkQueue st` as the queue constructor.
-- The result (store queue or ErrorType) is whatever getQueue_ returns.
getQueue :: (MsgStoreClass s, QueueParty p) => s -> SParty p -> QueueId -> IO (Either ErrorType (StoreQueue s))
getQueue st = getQueue_ (queueStore st) (mkQueue st)
{-# INLINE getQueue #-}

-- Same as getQueue, but also reads the current queue record with readQueueRec.
-- Returns the error from getQueue if the lookup fails, or the error from
-- readQueueRec (AUTH) if the queue holds no record.
getQueueRec :: (MsgStoreClass s, QueueParty p) => s -> SParty p -> QueueId -> IO (Either ErrorType (StoreQueue s, QueueRec))
getQueueRec st party qId = getQueue st party qId $>>= readQueueRec

-- Batch variant of getQueue: delegates to getQueues_ on the queue store of `st`,
-- passing `mkQueue st` as the queue constructor. Returns one result per element,
-- as produced by getQueues_.
getQueues :: (MsgStoreClass s, BatchParty p) => s -> SParty p -> [QueueId] -> IO [Either ErrorType (StoreQueue s)]
getQueues st = getQueues_ (queueStore st) (mkQueue st)
{-# INLINE getQueues #-}

-- Batch variant of getQueueRec: looks up the queues with getQueues, then reads
-- the record of every queue that was found.
-- Each element is handled independently: a Left from getQueues is kept as is,
-- and a Right is replaced by the result of readQueueRec (which can itself be Left).
getQueueRecs :: (MsgStoreClass s, BatchParty p) => s -> SParty p -> [QueueId] -> IO [Either ErrorType (StoreQueue s, QueueRec)]
getQueueRecs st party qIds = getQueues st party qIds >>= mapM (fmap join . mapM readQueueRec)

-- Reads the queue record TVar of the queue (outside of an STM transaction, via
-- readTVarIO). Returns the queue paired with its record, or Left AUTH when the
-- TVar holds Nothing.
readQueueRec :: StoreQueueClass q => q -> IO (Either ErrorType (q, QueueRec))
readQueueRec q = maybe (Left AUTH) (Right . (q,)) <$> readTVarIO (queueRec q)
{-# INLINE readQueueRec #-}

-- Runs the action inside isolateQueue (labelled with `op`), passing it the
-- result of getPeekMsgQueue for the queue.
-- The action is called with Nothing when it is known that the queue is empty
withPeekMsgQueue :: MsgStoreClass s => s -> StoreQueue s -> Text -> (Maybe (MsgQueue s, Message) -> StoreMonad s a) -> ExceptT ErrorType IO a
withPeekMsgQueue st q op a = isolateQueue st q op $ getPeekMsgQueue st q >>= a
{-# INLINE withPeekMsgQueue #-}

-- not used with PostgreSQL message store
--
-- Expires messages of one queue and reports the statistics for it.
-- `now` is passed to withIdleMsgQueue, `old` is the cut-off passed to
-- deleteExpireMsgs_. In the result:
--   * storedMsgsCount is the Int returned by withIdleMsgQueue;
--   * expiredMsgsCount is the number of deleted messages, or 0 when
--     withIdleMsgQueue returned Nothing for the action result;
--   * storedQueues is always 1.
expireQueueMsgs :: MsgStoreClass s => s -> Int64 -> Int64 -> StoreQueue s -> StoreMonad s MessageStats
expireQueueMsgs st now old q = do
  (expired_, stored) <- withIdleMsgQueue now st q $ deleteExpireMsgs_ old q
  pure MessageStats {storedMsgsCount = stored, expiredMsgsCount = fromMaybe 0 expired_, storedQueues = 1}

-- not used with PostgreSQL message store
--
-- Deletes messages from the front of the queue while the peeked message has
-- `systemSeconds msgTs < old`; stops at the first message that is not older
-- than `old`, or when there is no message to peek. The queue state is logged
-- once after the loop, and the number of deleted messages is returned.
deleteExpireMsgs_ :: MsgStoreClass s => Int64 -> StoreQueue s -> MsgQueue s -> StoreMonad s Int
deleteExpireMsgs_ old q mq = do
  n <- loop 0
  logQueueState q
  pure n
  where
    -- the counter is forced on every iteration, so that deleting many messages
    -- does not accumulate a chain of (+ 1) thunks
    loop !dc =
      tryPeekMsg_ q mq >>= \case
        Just Message {msgTs}
          | systemSeconds msgTs < old ->
              tryDeleteMsg_ q mq False >> loop (dc + 1)
        _ -> pure dc