{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
module Simplex.Messaging.Server.NtfStore
( NtfStore (..),
MsgNtf (..),
NtfLogRecord (..),
storeNtf,
deleteNtfs,
deleteExpiredNtfs,
) where
import Control.Concurrent.STM
import Control.Monad (foldM)
import Data.Int (Int64)
import qualified Data.Map.Strict as M
import Data.Time.Clock.System (SystemTime (..))
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Protocol (EncNMsgMeta, MsgId, NotifierId)
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
-- | In-memory notification store: a transactional map from a notifier id to
-- a 'TVar' holding the list of notifications kept for that notifier.
newtype NtfStore = NtfStore (TMap NotifierId (TVar [MsgNtf]))
-- | A single stored message notification.
data MsgNtf = MsgNtf
  { -- | Id of the message this notification refers to.
    ntfMsgId :: MsgId,
    -- | Timestamp of the notification; 'deleteExpiredNtfs' compares its
    -- 'systemSeconds' with the expiration cut-off.
    ntfTs :: SystemTime,
    -- | Nonce stored with the notification.
    -- NOTE(review): presumably the nonce used for 'ntfEncMeta' - confirm.
    ntfNonce :: C.CbNonce,
    -- | Encrypted notification metadata.
    ntfEncMeta :: EncNMsgMeta
  }
-- | Add a notification to the list kept for the given notifier.
--
-- The notification is prepended, so the head of each list is the item that
-- was stored most recently. If the notifier has no list yet, one is created
-- containing just this notification.
storeNtf :: NtfStore -> NotifierId -> MsgNtf -> IO ()
storeNtf (NtfStore ns) nId ntf =
  -- The first lookup runs in IO (outside of a transaction); only the
  -- fallback for a missing entry has to read the map inside STM.
  TM.lookupIO nId ns >>= atomically . maybe newNtfs prepend
  where
    -- Strictly cons the notification onto an existing list.
    prepend :: TVar [MsgNtf] -> STM ()
    prepend v = modifyTVar' v (ntf :)
    -- The entry was absent in the IO lookup, but it is looked up again
    -- inside the transaction before inserting, so an entry created in
    -- between is extended rather than overwritten.
    newNtfs :: STM ()
    newNtfs = TM.lookup nId ns >>= maybe (TM.insertM nId (newTVar [ntf]) ns) prepend
-- | Remove all notifications kept for the given notifier.
--
-- Returns the number of notifications in the removed list, or 0 when the
-- notifier had no entry in the store.
deleteNtfs :: NtfStore -> NotifierId -> IO Int
deleteNtfs (NtfStore ns) nId =
  -- The entry is removed from the map atomically; the removed list is then
  -- read outside of that transaction just to count its elements.
  atomically (TM.lookupDelete nId ns) >>= maybe (pure 0) (fmap length . readTVarIO)
-- | Drop notifications whose timestamp (in seconds) is earlier than @old@.
--
-- Walks over the notifier ids present in the store at the time of the call
-- and returns the total number of notifications removed.
deleteExpiredNtfs :: NtfStore -> Int64 -> IO Int
deleteExpiredNtfs (NtfStore ns) old =
  -- The key set is a snapshot; every queue is looked up again when visited.
  foldM (\expired -> fmap (expired +) . expireQueue) 0 . M.keys =<< readTVarIO ns
  where
    -- Expire one notifier's list; a notifier no longer in the map counts 0.
    expireQueue :: NotifierId -> IO Int
    expireQueue nId = TM.lookupIO nId ns >>= maybe (pure 0) expire
    -- Returns how many notifications were removed from this list.
    expire :: TVar [MsgNtf] -> IO Int
    expire v =
      -- Non-transactional read first, so that empty lists are skipped
      -- without starting a transaction.
      readTVarIO v >>= \case
        [] -> pure 0
        _ ->
          atomically $
            readTVar v >>= \case
              [] -> pure 0
              ntfs
                -- 'last' is safe here: the empty list is matched above.
                -- 'storeNtf' prepends, so the last element is the one that
                -- was stored first; the list is only filtered when that
                -- element is older than the cut-off.
                | systemSeconds (ntfTs $ last ntfs) < old -> do
                    let !ntfs' = filter (\MsgNtf {ntfTs = ts} -> systemSeconds ts >= old) ntfs
                    writeTVar v ntfs'
                    pure $! length ntfs - length ntfs'
              _ -> pure 0
-- | A notification log record: a notifier id together with one notification.
-- Its 'StrEncoding' instance tags the encoded record with @v1@.
data NtfLogRecord = NLRv1 NotifierId MsgNtf
-- | A notification is encoded as the 4-tuple of its fields, in declaration
-- order: message id, timestamp, nonce, encrypted metadata.
instance StrEncoding MsgNtf where
  strEncode MsgNtf {ntfMsgId, ntfTs, ntfNonce, ntfEncMeta} =
    strEncode (ntfMsgId, ntfTs, ntfNonce, ntfEncMeta)
  -- Parses the same 4-tuple and rebuilds the record with field puns.
  strP = do
    (ntfMsgId, ntfTs, ntfNonce, ntfEncMeta) <- strP
    pure MsgNtf {ntfMsgId, ntfTs, ntfNonce, ntfEncMeta}
-- | Log records are encoded as the tuple of the version tag @v1@, the
-- notifier id and the notification.
instance StrEncoding NtfLogRecord where
  strEncode (NLRv1 nId ntf) = strEncode (Str "v1", nId, ntf)
  -- The parser expects the literal prefix @"v1 "@ (tag plus one space),
  -- then the notifier id via 'strP_', then the notification.
  -- NOTE(review): this relies on the tuple encoding separating its
  -- components with a single space - confirm against the tuple instances.
  strP = "v1 " *> (NLRv1 <$> strP_ <*> strP)