{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedLists #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
{-# OPTIONS_GHC -fno-warn-ambiguous-fields #-}
module Simplex.Messaging.Agent.NtfSubSupervisor
( runNtfSupervisor,
nsUpdateToken,
nsRemoveNtfToken,
sendNtfSubCommand,
hasInstantNotifications,
instantNotifications,
deleteToken,
closeNtfSupervisor,
getNtfServer,
)
where
import Control.Logger.Simple (logError, logInfo)
import Control.Monad
import Control.Monad.Reader
import Control.Monad.Trans.Except
import Crypto.Random (ChaChaDRG)
import Data.Bifunctor (first)
import Data.Either (fromRight, partitionEithers)
import Data.Functor (($>))
import Data.List (foldl')
import Data.List.NonEmpty (NonEmpty (..))
import qualified Data.List.NonEmpty as L
import qualified Data.Map.Strict as M
import Data.Maybe (catMaybes)
import qualified Data.Set as S
import qualified Data.Text as T
import Data.Time (UTCTime, addUTCTime, getCurrentTime)
import Data.Time.Clock (diffUTCTime)
import Simplex.Messaging.Agent.Client
import Simplex.Messaging.Agent.Env.SQLite
import Simplex.Messaging.Agent.Protocol
import Simplex.Messaging.Agent.RetryInterval
import Simplex.Messaging.Agent.Stats
import Simplex.Messaging.Agent.Store
import Simplex.Messaging.Agent.Store.AgentStore
import qualified Simplex.Messaging.Agent.Store.DB as DB
import Simplex.Messaging.Client (NetworkRequestMode (..), nonBlockingWriteTBQueue)
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Notifications.Protocol
import Simplex.Messaging.Notifications.Types
import Simplex.Messaging.Protocol (NtfServer, sameSrvAddr)
import qualified Simplex.Messaging.Protocol as SMP
import Simplex.Messaging.Util (catchAllErrors, catchAllErrors', diffToMicroseconds, threadDelay', tryAllErrors, tshow, whenM)
import System.Random (randomR)
import UnliftIO
import UnliftIO.Concurrent (forkIO)
import qualified UnliftIO.Exception as E
-- | Main loop of the notification subscription supervisor.
--
-- On start it reads from the store the notification servers returned by
-- 'getPendingDelTknServers' and requests a token-deletion worker for each of
-- them; if that step fails with an agent error, the error is reported via
-- 'notifyInternalError'' and the supervisor carries on.
--
-- It then loops forever: it takes the next command from the supervisor's
-- 'ntfSubQ' queue and runs 'processNtfCmd' on it inside
-- 'agentOperationBracket' for the 'AONtfNetwork' operation. Errors from
-- processing are reported rather than propagated, so one failing command
-- does not terminate the loop.
runNtfSupervisor :: AgentClient -> AM' ()
runNtfSupervisor c = do
  ns <- asks ntfSupervisor
  runExceptT startTknDelete >>= \case
    Left e -> notifyErr e
    Right _ -> pure ()
  forever $ do
    -- Blocks until a command is available in the bounded queue.
    cmd <- atomically . readTBQueue $ ntfSubQ ns
    handleErr $ agentOperationBracket c AONtfNetwork waitUntilActive $
      processNtfCmd c cmd `catchAllErrors'` notifyErr
  where
    -- Requests a token-deletion worker for every server that the store
    -- reports as having pending token deletions.
    -- NOTE(review): the 'True' argument is presumably the "has work" flag of
    -- the worker getter - confirm against 'getNtfTknDelWorker'.
    startTknDelete :: AM ()
    startTknDelete = do
      pendingDelServers <- withStore' c getPendingDelTknServers
      lift . forM_ pendingDelServers $ getNtfTknDelWorker True c
    -- Outer safety net: any exception escaping the wrapped action (including
    -- one thrown by 'agentOperationBracket' itself) is reported, not rethrown.
    handleErr :: AM' () -> AM' ()
    handleErr = E.handle $ \(e :: E.SomeException) -> notifyErr e
    -- Reports the shown error to the client as an internal error.
    notifyErr :: (MonadIO m, Show e) => e -> m ()
    notifyErr e = notifyInternalError' c $ "runNtfSupervisor error " <> show e
-- | Split batch results into failures and successes, tagging every failure
-- with the connection ID of the input item that produced it.
--
-- @f@ extracts the connection ID from an input item, @xs@ are the input
-- items, and the third (point-free) argument is the list of per-item results
-- in the same order as @xs@. Items and results are paired positionally with
-- 'zipWith', so if the two lists differ in length the surplus elements of the
-- longer one are dropped.
--
-- The definition deliberately binds only two arguments on the left-hand
-- side: with the INLINE pragma GHC unfolds the function once it is applied
-- to the arguments named there, which is how it is used at the call sites
-- (@partitionErrs f xs \<$\> action@).
partitionErrs :: (a -> ConnId) -> [a] -> [Either AgentErrorType b] -> ([(ConnId, AgentErrorType)], [b])
partitionErrs f xs = partitionEithers . zipWith (\x -> first (f x,)) xs
{-# INLINE partitionErrs #-}
-- | The 'connId' field of a notification subscription.
--
-- Written as a pattern match with a field pun because the module enables
-- DuplicateRecordFields, where a bare @connId@ selector would be ambiguous
-- between record types.
ntfSubConnId :: NtfSubscription -> ConnId
ntfSubConnId NtfSubscription {connId} = connId
processNtfCmd :: AgentClient -> (NtfSupervisorCommand, NonEmpty ConnId) -> AM ()
processNtfCmd :: AgentClient
-> (NtfSupervisorCommand, NonEmpty ConnId)
-> ExceptT AgentErrorType (ReaderT Env IO) ()
processNtfCmd AgentClient
c (NtfSupervisorCommand
cmd, NonEmpty ConnId
connIds) = do
Text -> ExceptT AgentErrorType (ReaderT Env IO) ()
forall (m :: * -> *). (HasCallStack, MonadIO m) => Text -> m ()
logInfo (Text -> ExceptT AgentErrorType (ReaderT Env IO) ())
-> Text -> ExceptT AgentErrorType (ReaderT Env IO) ()
forall a b. (a -> b) -> a -> b
$ Text
"processNtfCmd - cmd = " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> NtfSupervisorCommand -> Text
forall a. Show a => a -> Text
tshow NtfSupervisorCommand
cmd
let connIds' :: [ConnId]
connIds' = NonEmpty ConnId -> [ConnId]
forall a. NonEmpty a -> [a]
L.toList NonEmpty ConnId
connIds
case NtfSupervisorCommand
cmd of
NtfSupervisorCommand
NSCCreate -> do
([(ConnId, AgentErrorType)]
cErrs, [(RcvQueue, Maybe NtfSupervisorSub)]
rqSubActions) <- ReaderT
Env
IO
([(ConnId, AgentErrorType)], [(RcvQueue, Maybe NtfSupervisorSub)])
-> ExceptT
AgentErrorType
(ReaderT Env IO)
([(ConnId, AgentErrorType)], [(RcvQueue, Maybe NtfSupervisorSub)])
forall (m :: * -> *) a.
Monad m =>
m a -> ExceptT AgentErrorType m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (ReaderT
Env
IO
([(ConnId, AgentErrorType)], [(RcvQueue, Maybe NtfSupervisorSub)])
-> ExceptT
AgentErrorType
(ReaderT Env IO)
([(ConnId, AgentErrorType)], [(RcvQueue, Maybe NtfSupervisorSub)]))
-> ReaderT
Env
IO
([(ConnId, AgentErrorType)], [(RcvQueue, Maybe NtfSupervisorSub)])
-> ExceptT
AgentErrorType
(ReaderT Env IO)
([(ConnId, AgentErrorType)], [(RcvQueue, Maybe NtfSupervisorSub)])
forall a b. (a -> b) -> a -> b
$ (ConnId -> ConnId)
-> [ConnId]
-> [Either AgentErrorType (RcvQueue, Maybe NtfSupervisorSub)]
-> ([(ConnId, AgentErrorType)],
[(RcvQueue, Maybe NtfSupervisorSub)])
forall a b.
(a -> ConnId)
-> [a]
-> [Either AgentErrorType b]
-> ([(ConnId, AgentErrorType)], [b])
partitionErrs ConnId -> ConnId
forall a. a -> a
id [ConnId]
connIds' ([Either AgentErrorType (RcvQueue, Maybe NtfSupervisorSub)]
-> ([(ConnId, AgentErrorType)],
[(RcvQueue, Maybe NtfSupervisorSub)]))
-> ReaderT
Env IO [Either AgentErrorType (RcvQueue, Maybe NtfSupervisorSub)]
-> ReaderT
Env
IO
([(ConnId, AgentErrorType)], [(RcvQueue, Maybe NtfSupervisorSub)])
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> AgentClient
-> (Connection
-> [IO (Either AgentErrorType (RcvQueue, Maybe NtfSupervisorSub))])
-> ReaderT
Env IO [Either AgentErrorType (RcvQueue, Maybe NtfSupervisorSub)]
forall (t :: * -> *) a.
Traversable t =>
AgentClient
-> (Connection -> t (IO (Either AgentErrorType a)))
-> AM' (t (Either AgentErrorType a))
withStoreBatch AgentClient
c (\Connection
db -> (ConnId
-> IO (Either AgentErrorType (RcvQueue, Maybe NtfSupervisorSub)))
-> [ConnId]
-> [IO (Either AgentErrorType (RcvQueue, Maybe NtfSupervisorSub))]
forall a b. (a -> b) -> [a] -> [b]
map (Connection
-> ConnId
-> IO (Either AgentErrorType (RcvQueue, Maybe NtfSupervisorSub))
getQueueSub Connection
db) [ConnId]
connIds')
AgentClient
-> [(ConnId, AgentErrorType)]
-> ExceptT AgentErrorType (ReaderT Env IO) ()
forall (m :: * -> *).
MonadIO m =>
AgentClient -> [(ConnId, AgentErrorType)] -> m ()
notifyErrs AgentClient
c [(ConnId, AgentErrorType)]
cErrs
Text -> ExceptT AgentErrorType (ReaderT Env IO) ()
forall (m :: * -> *). (HasCallStack, MonadIO m) => Text -> m ()
logInfo (Text -> ExceptT AgentErrorType (ReaderT Env IO) ())
-> Text -> ExceptT AgentErrorType (ReaderT Env IO) ()
forall a b. (a -> b) -> a -> b
$ Text
"processNtfCmd, NSCCreate - length rqSubs = " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Int -> Text
forall a. Show a => a -> Text
tshow ([(RcvQueue, Maybe NtfSupervisorSub)] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [(RcvQueue, Maybe NtfSupervisorSub)]
rqSubActions)
let ([RcvQueue]
ns, [(RcvQueue, NtfSubscription)]
rs, [SMPServer]
css, [NtfServer]
cns) = [(RcvQueue, Maybe NtfSupervisorSub)]
-> ([RcvQueue], [(RcvQueue, NtfSubscription)], [SMPServer],
[NtfServer])
partitionQueueSubActions [(RcvQueue, Maybe NtfSupervisorSub)]
rqSubActions
[RcvQueue] -> ExceptT AgentErrorType (ReaderT Env IO) ()
createNewSubs [RcvQueue]
ns
[(RcvQueue, NtfSubscription)]
-> ExceptT AgentErrorType (ReaderT Env IO) ()
resetSubs [(RcvQueue, NtfSubscription)]
rs
AM' () -> ExceptT AgentErrorType (ReaderT Env IO) ()
forall (m :: * -> *) a.
Monad m =>
m a -> ExceptT AgentErrorType m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (AM' () -> ExceptT AgentErrorType (ReaderT Env IO) ())
-> AM' () -> ExceptT AgentErrorType (ReaderT Env IO) ()
forall a b. (a -> b) -> a -> b
$ do
(SMPServer -> AM' Worker) -> Set SMPServer -> AM' ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (Bool -> AgentClient -> SMPServer -> AM' Worker
getNtfSMPWorker Bool
True AgentClient
c) ([SMPServer] -> Set SMPServer
forall a. Ord a => [a] -> Set a
S.fromList [SMPServer]
css)
(NtfServer -> AM' Worker) -> Set NtfServer -> AM' ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (Bool -> AgentClient -> NtfServer -> AM' Worker
getNtfNTFWorker Bool
True AgentClient
c) ([NtfServer] -> Set NtfServer
forall a. Ord a => [a] -> Set a
S.fromList [NtfServer]
cns)
where
getQueueSub ::
DB.Connection ->
ConnId ->
IO (Either AgentErrorType (RcvQueue, Maybe NtfSupervisorSub))
getQueueSub :: Connection
-> ConnId
-> IO (Either AgentErrorType (RcvQueue, Maybe NtfSupervisorSub))
getQueueSub Connection
db ConnId
connId = (Either StoreError (RcvQueue, Maybe NtfSupervisorSub)
-> Either AgentErrorType (RcvQueue, Maybe NtfSupervisorSub))
-> IO (Either StoreError (RcvQueue, Maybe NtfSupervisorSub))
-> IO (Either AgentErrorType (RcvQueue, Maybe NtfSupervisorSub))
forall a b. (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ((StoreError -> AgentErrorType)
-> Either StoreError (RcvQueue, Maybe NtfSupervisorSub)
-> Either AgentErrorType (RcvQueue, Maybe NtfSupervisorSub)
forall a b c. (a -> b) -> Either a c -> Either b c
forall (p :: * -> * -> *) a b c.
Bifunctor p =>
(a -> b) -> p a c -> p b c
first StoreError -> AgentErrorType
storeError) (IO (Either StoreError (RcvQueue, Maybe NtfSupervisorSub))
-> IO (Either AgentErrorType (RcvQueue, Maybe NtfSupervisorSub)))
-> IO (Either StoreError (RcvQueue, Maybe NtfSupervisorSub))
-> IO (Either AgentErrorType (RcvQueue, Maybe NtfSupervisorSub))
forall a b. (a -> b) -> a -> b
$ ExceptT StoreError IO (RcvQueue, Maybe NtfSupervisorSub)
-> IO (Either StoreError (RcvQueue, Maybe NtfSupervisorSub))
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (ExceptT StoreError IO (RcvQueue, Maybe NtfSupervisorSub)
-> IO (Either StoreError (RcvQueue, Maybe NtfSupervisorSub)))
-> ExceptT StoreError IO (RcvQueue, Maybe NtfSupervisorSub)
-> IO (Either StoreError (RcvQueue, Maybe NtfSupervisorSub))
forall a b. (a -> b) -> a -> b
$ do
RcvQueue
rq <- IO (Either StoreError RcvQueue) -> ExceptT StoreError IO RcvQueue
forall e (m :: * -> *) a. m (Either e a) -> ExceptT e m a
ExceptT (IO (Either StoreError RcvQueue) -> ExceptT StoreError IO RcvQueue)
-> IO (Either StoreError RcvQueue)
-> ExceptT StoreError IO RcvQueue
forall a b. (a -> b) -> a -> b
$ Connection -> ConnId -> IO (Either StoreError RcvQueue)
getPrimaryRcvQueue Connection
db ConnId
connId
Maybe NtfSupervisorSub
sub <- IO (Maybe NtfSupervisorSub)
-> ExceptT StoreError IO (Maybe NtfSupervisorSub)
forall a. IO a -> ExceptT StoreError IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe NtfSupervisorSub)
-> ExceptT StoreError IO (Maybe NtfSupervisorSub))
-> IO (Maybe NtfSupervisorSub)
-> ExceptT StoreError IO (Maybe NtfSupervisorSub)
forall a b. (a -> b) -> a -> b
$ Connection -> ConnId -> IO (Maybe NtfSupervisorSub)
getNtfSubscription Connection
db ConnId
connId
(RcvQueue, Maybe NtfSupervisorSub)
-> ExceptT StoreError IO (RcvQueue, Maybe NtfSupervisorSub)
forall a. a -> ExceptT StoreError IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (RcvQueue
rq, Maybe NtfSupervisorSub
sub)
createNewSubs :: [RcvQueue] -> AM ()
createNewSubs :: [RcvQueue] -> ExceptT AgentErrorType (ReaderT Env IO) ()
createNewSubs [RcvQueue]
rqs = do
(NtfServer -> ExceptT AgentErrorType (ReaderT Env IO) ())
-> ExceptT AgentErrorType (ReaderT Env IO) ()
withTokenServer ((NtfServer -> ExceptT AgentErrorType (ReaderT Env IO) ())
-> ExceptT AgentErrorType (ReaderT Env IO) ())
-> (NtfServer -> ExceptT AgentErrorType (ReaderT Env IO) ())
-> ExceptT AgentErrorType (ReaderT Env IO) ()
forall a b. (a -> b) -> a -> b
$ \NtfServer
ntfServer -> do
let newSubs :: [NtfSubscription]
newSubs = (RcvQueue -> NtfSubscription) -> [RcvQueue] -> [NtfSubscription]
forall a b. (a -> b) -> [a] -> [b]
map (NtfServer -> RcvQueue -> NtfSubscription
rqToNewSub NtfServer
ntfServer) [RcvQueue]
rqs
([(ConnId, AgentErrorType)]
cErrs, [()]
_) <- ReaderT Env IO ([(ConnId, AgentErrorType)], [()])
-> ExceptT
AgentErrorType (ReaderT Env IO) ([(ConnId, AgentErrorType)], [()])
forall (m :: * -> *) a.
Monad m =>
m a -> ExceptT AgentErrorType m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (ReaderT Env IO ([(ConnId, AgentErrorType)], [()])
-> ExceptT
AgentErrorType (ReaderT Env IO) ([(ConnId, AgentErrorType)], [()]))
-> ReaderT Env IO ([(ConnId, AgentErrorType)], [()])
-> ExceptT
AgentErrorType (ReaderT Env IO) ([(ConnId, AgentErrorType)], [()])
forall a b. (a -> b) -> a -> b
$ (NtfSubscription -> ConnId)
-> [NtfSubscription]
-> [Either AgentErrorType ()]
-> ([(ConnId, AgentErrorType)], [()])
forall a b.
(a -> ConnId)
-> [a]
-> [Either AgentErrorType b]
-> ([(ConnId, AgentErrorType)], [b])
partitionErrs NtfSubscription -> ConnId
ntfSubConnId [NtfSubscription]
newSubs ([Either AgentErrorType ()] -> ([(ConnId, AgentErrorType)], [()]))
-> ReaderT Env IO [Either AgentErrorType ()]
-> ReaderT Env IO ([(ConnId, AgentErrorType)], [()])
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> AgentClient
-> (Connection -> [IO (Either AgentErrorType ())])
-> ReaderT Env IO [Either AgentErrorType ()]
forall (t :: * -> *) a.
Traversable t =>
AgentClient
-> (Connection -> t (IO (Either AgentErrorType a)))
-> AM' (t (Either AgentErrorType a))
withStoreBatch AgentClient
c (\Connection
db -> (NtfSubscription -> IO (Either AgentErrorType ()))
-> [NtfSubscription] -> [IO (Either AgentErrorType ())]
forall a b. (a -> b) -> [a] -> [b]
map (Connection -> NtfSubscription -> IO (Either AgentErrorType ())
storeNewSub Connection
db) [NtfSubscription]
newSubs)
AgentClient
-> [(ConnId, AgentErrorType)]
-> ExceptT AgentErrorType (ReaderT Env IO) ()
forall (m :: * -> *).
MonadIO m =>
AgentClient -> [(ConnId, AgentErrorType)] -> m ()
notifyErrs AgentClient
c [(ConnId, AgentErrorType)]
cErrs
[RcvQueue] -> ExceptT AgentErrorType (ReaderT Env IO) ()
kickSMPWorkers [RcvQueue]
rqs
where
rqToNewSub :: NtfServer -> RcvQueue -> NtfSubscription
rqToNewSub :: NtfServer -> RcvQueue -> NtfSubscription
rqToNewSub NtfServer
ntfServer RcvQueue {Int64
userId :: Int64
$sel:userId:RcvQueue :: forall (q :: DBStored). StoredRcvQueue q -> Int64
userId, ConnId
connId :: ConnId
$sel:connId:RcvQueue :: forall (q :: DBStored). StoredRcvQueue q -> ConnId
connId, SMPServer
server :: SMPServer
$sel:server:RcvQueue :: forall (q :: DBStored). StoredRcvQueue q -> SMPServer
server} = Int64
-> ConnId
-> SMPServer
-> Maybe NtfSubscriptionId
-> NtfServer
-> NtfAgentSubStatus
-> NtfSubscription
newNtfSubscription Int64
userId ConnId
connId SMPServer
server Maybe NtfSubscriptionId
forall a. Maybe a
Nothing NtfServer
ntfServer NtfAgentSubStatus
NASNew
storeNewSub :: DB.Connection -> NtfSubscription -> IO (Either AgentErrorType ())
storeNewSub :: Connection -> NtfSubscription -> IO (Either AgentErrorType ())
storeNewSub Connection
db NtfSubscription
sub = (StoreError -> AgentErrorType)
-> Either StoreError () -> Either AgentErrorType ()
forall a b c. (a -> b) -> Either a c -> Either b c
forall (p :: * -> * -> *) a b c.
Bifunctor p =>
(a -> b) -> p a c -> p b c
first StoreError -> AgentErrorType
storeError (Either StoreError () -> Either AgentErrorType ())
-> IO (Either StoreError ()) -> IO (Either AgentErrorType ())
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Connection
-> NtfSubscription -> NtfSubAction -> IO (Either StoreError ())
createNtfSubscription Connection
db NtfSubscription
sub (NtfSubSMPAction -> NtfSubAction
NSASMP NtfSubSMPAction
NSASmpKey)
resetSubs :: [(RcvQueue, NtfSubscription)] -> AM ()
resetSubs :: [(RcvQueue, NtfSubscription)]
-> ExceptT AgentErrorType (ReaderT Env IO) ()
resetSubs [(RcvQueue, NtfSubscription)]
rqSubs = do
(NtfServer -> ExceptT AgentErrorType (ReaderT Env IO) ())
-> ExceptT AgentErrorType (ReaderT Env IO) ()
withTokenServer ((NtfServer -> ExceptT AgentErrorType (ReaderT Env IO) ())
-> ExceptT AgentErrorType (ReaderT Env IO) ())
-> (NtfServer -> ExceptT AgentErrorType (ReaderT Env IO) ())
-> ExceptT AgentErrorType (ReaderT Env IO) ()
forall a b. (a -> b) -> a -> b
$ \NtfServer
ntfServer -> do
let subsToReset :: [NtfSubscription]
subsToReset = ((RcvQueue, NtfSubscription) -> NtfSubscription)
-> [(RcvQueue, NtfSubscription)] -> [NtfSubscription]
forall a b. (a -> b) -> [a] -> [b]
map (NtfServer -> (RcvQueue, NtfSubscription) -> NtfSubscription
toResetSub NtfServer
ntfServer) [(RcvQueue, NtfSubscription)]
rqSubs
([(ConnId, AgentErrorType)]
cErrs, [()]
_) <- ReaderT Env IO ([(ConnId, AgentErrorType)], [()])
-> ExceptT
AgentErrorType (ReaderT Env IO) ([(ConnId, AgentErrorType)], [()])
forall (m :: * -> *) a.
Monad m =>
m a -> ExceptT AgentErrorType m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (ReaderT Env IO ([(ConnId, AgentErrorType)], [()])
-> ExceptT
AgentErrorType (ReaderT Env IO) ([(ConnId, AgentErrorType)], [()]))
-> ReaderT Env IO ([(ConnId, AgentErrorType)], [()])
-> ExceptT
AgentErrorType (ReaderT Env IO) ([(ConnId, AgentErrorType)], [()])
forall a b. (a -> b) -> a -> b
$ (NtfSubscription -> ConnId)
-> [NtfSubscription]
-> [Either AgentErrorType ()]
-> ([(ConnId, AgentErrorType)], [()])
forall a b.
(a -> ConnId)
-> [a]
-> [Either AgentErrorType b]
-> ([(ConnId, AgentErrorType)], [b])
partitionErrs NtfSubscription -> ConnId
ntfSubConnId [NtfSubscription]
subsToReset ([Either AgentErrorType ()] -> ([(ConnId, AgentErrorType)], [()]))
-> ReaderT Env IO [Either AgentErrorType ()]
-> ReaderT Env IO ([(ConnId, AgentErrorType)], [()])
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> AgentClient
-> (Connection -> [IO ()])
-> ReaderT Env IO [Either AgentErrorType ()]
forall (t :: * -> *) a.
Traversable t =>
AgentClient
-> (Connection -> t (IO a)) -> AM' (t (Either AgentErrorType a))
withStoreBatch' AgentClient
c (\Connection
db -> (NtfSubscription -> IO ()) -> [NtfSubscription] -> [IO ()]
forall a b. (a -> b) -> [a] -> [b]
map (Connection -> NtfSubscription -> IO ()
storeResetSub Connection
db) [NtfSubscription]
subsToReset)
AgentClient
-> [(ConnId, AgentErrorType)]
-> ExceptT AgentErrorType (ReaderT Env IO) ()
forall (m :: * -> *).
MonadIO m =>
AgentClient -> [(ConnId, AgentErrorType)] -> m ()
notifyErrs AgentClient
c [(ConnId, AgentErrorType)]
cErrs
let rqs :: [RcvQueue]
rqs = ((RcvQueue, NtfSubscription) -> RcvQueue)
-> [(RcvQueue, NtfSubscription)] -> [RcvQueue]
forall a b. (a -> b) -> [a] -> [b]
map (RcvQueue, NtfSubscription) -> RcvQueue
forall a b. (a, b) -> a
fst [(RcvQueue, NtfSubscription)]
rqSubs
[RcvQueue] -> ExceptT AgentErrorType (ReaderT Env IO) ()
kickSMPWorkers [RcvQueue]
rqs
where
toResetSub :: NtfServer -> (RcvQueue, NtfSubscription) -> NtfSubscription
toResetSub :: NtfServer -> (RcvQueue, NtfSubscription) -> NtfSubscription
toResetSub NtfServer
ntfServer (RcvQueue
rq, NtfSubscription
sub) =
let RcvQueue {$sel:server:RcvQueue :: forall (q :: DBStored). StoredRcvQueue q -> SMPServer
server = SMPServer
smpServer} = RcvQueue
rq
in NtfSubscription
sub {smpServer, ntfQueueId = Nothing, ntfServer, ntfSubId = Nothing, ntfSubStatus = NASNew}
storeResetSub :: DB.Connection -> NtfSubscription -> IO ()
storeResetSub :: Connection -> NtfSubscription -> IO ()
storeResetSub Connection
db NtfSubscription
sub = Connection -> NtfSubscription -> NtfSubAction -> IO ()
supervisorUpdateNtfSub Connection
db NtfSubscription
sub (NtfSubSMPAction -> NtfSubAction
NSASMP NtfSubSMPAction
NSASmpKey)
partitionQueueSubActions ::
[(RcvQueue, Maybe NtfSupervisorSub)] ->
( [RcvQueue],
[(RcvQueue, NtfSubscription)],
[SMPServer],
[NtfServer]
)
partitionQueueSubActions :: [(RcvQueue, Maybe NtfSupervisorSub)]
-> ([RcvQueue], [(RcvQueue, NtfSubscription)], [SMPServer],
[NtfServer])
partitionQueueSubActions = ((RcvQueue, Maybe NtfSupervisorSub)
-> ([RcvQueue], [(RcvQueue, NtfSubscription)], [SMPServer],
[NtfServer])
-> ([RcvQueue], [(RcvQueue, NtfSubscription)], [SMPServer],
[NtfServer]))
-> ([RcvQueue], [(RcvQueue, NtfSubscription)], [SMPServer],
[NtfServer])
-> [(RcvQueue, Maybe NtfSupervisorSub)]
-> ([RcvQueue], [(RcvQueue, NtfSubscription)], [SMPServer],
[NtfServer])
forall a b. (a -> b -> b) -> b -> [a] -> b
forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr (RcvQueue, Maybe NtfSupervisorSub)
-> ([RcvQueue], [(RcvQueue, NtfSubscription)], [SMPServer],
[NtfServer])
-> ([RcvQueue], [(RcvQueue, NtfSubscription)], [SMPServer],
[NtfServer])
forall {q :: DBStored} {b}.
SMPQueue (StoredRcvQueue q) =>
(StoredRcvQueue q,
Maybe (NtfSubscription, Maybe (NtfSubAction, b)))
-> ([StoredRcvQueue q], [(StoredRcvQueue q, NtfSubscription)],
[SMPServer], [NtfServer])
-> ([StoredRcvQueue q], [(StoredRcvQueue q, NtfSubscription)],
[SMPServer], [NtfServer])
decideSubWork ([], [], [], [])
where
decideSubWork :: (StoredRcvQueue q,
Maybe (NtfSubscription, Maybe (NtfSubAction, b)))
-> ([StoredRcvQueue q], [(StoredRcvQueue q, NtfSubscription)],
[SMPServer], [NtfServer])
-> ([StoredRcvQueue q], [(StoredRcvQueue q, NtfSubscription)],
[SMPServer], [NtfServer])
decideSubWork (StoredRcvQueue q
rq, Maybe (NtfSubscription, Maybe (NtfSubAction, b))
Nothing) ([StoredRcvQueue q]
ns, [(StoredRcvQueue q, NtfSubscription)]
rs, [SMPServer]
css, [NtfServer]
cns) = (StoredRcvQueue q
rq StoredRcvQueue q -> [StoredRcvQueue q] -> [StoredRcvQueue q]
forall a. a -> [a] -> [a]
: [StoredRcvQueue q]
ns, [(StoredRcvQueue q, NtfSubscription)]
rs, [SMPServer]
css, [NtfServer]
cns)
decideSubWork (StoredRcvQueue q
rq, Just (NtfSubscription
sub, Maybe (NtfSubAction, b)
subAction_)) ([StoredRcvQueue q]
ns, [(StoredRcvQueue q, NtfSubscription)]
rs, [SMPServer]
css, [NtfServer]
cns) =
case (StoredRcvQueue q -> Maybe ClientNtfCreds
forall (q :: DBStored). StoredRcvQueue q -> Maybe ClientNtfCreds
clientNtfCreds StoredRcvQueue q
rq, NtfSubscription -> Maybe NtfSubscriptionId
ntfQueueId NtfSubscription
sub) of
(Just ClientNtfCreds {NtfSubscriptionId
notifierId :: NtfSubscriptionId
$sel:notifierId:ClientNtfCreds :: ClientNtfCreds -> NtfSubscriptionId
notifierId}, Just NtfSubscriptionId
ntfQueueId')
| SMPServer -> SMPServer -> Bool
forall (p :: ProtocolType).
ProtocolServer p -> ProtocolServer p -> Bool
sameSrvAddr (StoredRcvQueue q -> SMPServer
forall q. SMPQueue q => q -> SMPServer
qServer StoredRcvQueue q
rq) SMPServer
subSMPServer Bool -> Bool -> Bool
&& NtfSubscriptionId
notifierId NtfSubscriptionId -> NtfSubscriptionId -> Bool
forall a. Eq a => a -> a -> Bool
== NtfSubscriptionId
ntfQueueId' -> ([StoredRcvQueue q], [(StoredRcvQueue q, NtfSubscription)],
[SMPServer], [NtfServer])
contOrReset
| Bool
otherwise -> ([StoredRcvQueue q], [(StoredRcvQueue q, NtfSubscription)],
[SMPServer], [NtfServer])
reset
(Maybe ClientNtfCreds
Nothing, Maybe NtfSubscriptionId
Nothing) -> ([StoredRcvQueue q], [(StoredRcvQueue q, NtfSubscription)],
[SMPServer], [NtfServer])
contOrReset
(Maybe ClientNtfCreds, Maybe NtfSubscriptionId)
_ -> ([StoredRcvQueue q], [(StoredRcvQueue q, NtfSubscription)],
[SMPServer], [NtfServer])
reset
where
NtfSubscription {$sel:ntfServer:NtfSubscription :: NtfSubscription -> NtfServer
ntfServer = NtfServer
subNtfServer, $sel:smpServer:NtfSubscription :: NtfSubscription -> SMPServer
smpServer = SMPServer
subSMPServer} = NtfSubscription
sub
contOrReset :: ([StoredRcvQueue q], [(StoredRcvQueue q, NtfSubscription)],
[SMPServer], [NtfServer])
contOrReset = case Maybe (NtfSubAction, b)
subAction_ of
Maybe (NtfSubAction, b)
Nothing -> ([StoredRcvQueue q], [(StoredRcvQueue q, NtfSubscription)],
[SMPServer], [NtfServer])
reset
Just (NtfSubAction
action, b
_)
| NtfSubAction -> Bool
isDeleteNtfSubAction NtfSubAction
action -> ([StoredRcvQueue q], [(StoredRcvQueue q, NtfSubscription)],
[SMPServer], [NtfServer])
reset
| Bool
otherwise -> case NtfSubAction
action of
NSASMP NtfSubSMPAction
_ -> ([StoredRcvQueue q]
ns, [(StoredRcvQueue q, NtfSubscription)]
rs, StoredRcvQueue q -> SMPServer
forall q. SMPQueue q => q -> SMPServer
qServer StoredRcvQueue q
rq SMPServer -> [SMPServer] -> [SMPServer]
forall a. a -> [a] -> [a]
: [SMPServer]
css, [NtfServer]
cns)
NSANtf NtfSubNTFAction
_ -> ([StoredRcvQueue q]
ns, [(StoredRcvQueue q, NtfSubscription)]
rs, [SMPServer]
css, NtfServer
subNtfServer NtfServer -> [NtfServer] -> [NtfServer]
forall a. a -> [a] -> [a]
: [NtfServer]
cns)
reset :: ([StoredRcvQueue q], [(StoredRcvQueue q, NtfSubscription)],
[SMPServer], [NtfServer])
reset = ([StoredRcvQueue q]
ns, (StoredRcvQueue q
rq, NtfSubscription
sub) (StoredRcvQueue q, NtfSubscription)
-> [(StoredRcvQueue q, NtfSubscription)]
-> [(StoredRcvQueue q, NtfSubscription)]
forall a. a -> [a] -> [a]
: [(StoredRcvQueue q, NtfSubscription)]
rs, [SMPServer]
css, [NtfServer]
cns)
NtfSupervisorCommand
NSCSmpDelete -> do
([(ConnId, AgentErrorType)]
cErrs, [RcvQueue]
rqs) <- ReaderT Env IO ([(ConnId, AgentErrorType)], [RcvQueue])
-> ExceptT
AgentErrorType
(ReaderT Env IO)
([(ConnId, AgentErrorType)], [RcvQueue])
forall (m :: * -> *) a.
Monad m =>
m a -> ExceptT AgentErrorType m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (ReaderT Env IO ([(ConnId, AgentErrorType)], [RcvQueue])
-> ExceptT
AgentErrorType
(ReaderT Env IO)
([(ConnId, AgentErrorType)], [RcvQueue]))
-> ReaderT Env IO ([(ConnId, AgentErrorType)], [RcvQueue])
-> ExceptT
AgentErrorType
(ReaderT Env IO)
([(ConnId, AgentErrorType)], [RcvQueue])
forall a b. (a -> b) -> a -> b
$ (ConnId -> ConnId)
-> [ConnId]
-> [Either AgentErrorType RcvQueue]
-> ([(ConnId, AgentErrorType)], [RcvQueue])
forall a b.
(a -> ConnId)
-> [a]
-> [Either AgentErrorType b]
-> ([(ConnId, AgentErrorType)], [b])
partitionErrs ConnId -> ConnId
forall a. a -> a
id [ConnId]
connIds' ([Either AgentErrorType RcvQueue]
-> ([(ConnId, AgentErrorType)], [RcvQueue]))
-> ReaderT Env IO [Either AgentErrorType RcvQueue]
-> ReaderT Env IO ([(ConnId, AgentErrorType)], [RcvQueue])
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> AgentClient
-> (Connection -> [IO (Either AgentErrorType RcvQueue)])
-> ReaderT Env IO [Either AgentErrorType RcvQueue]
forall (t :: * -> *) a.
Traversable t =>
AgentClient
-> (Connection -> t (IO (Either AgentErrorType a)))
-> AM' (t (Either AgentErrorType a))
withStoreBatch AgentClient
c (\Connection
db -> (ConnId -> IO (Either AgentErrorType RcvQueue))
-> [ConnId] -> [IO (Either AgentErrorType RcvQueue)]
forall a b. (a -> b) -> [a] -> [b]
map (Connection -> ConnId -> IO (Either AgentErrorType RcvQueue)
getQueue Connection
db) [ConnId]
connIds')
Text -> ExceptT AgentErrorType (ReaderT Env IO) ()
forall (m :: * -> *). (HasCallStack, MonadIO m) => Text -> m ()
logInfo (Text -> ExceptT AgentErrorType (ReaderT Env IO) ())
-> Text -> ExceptT AgentErrorType (ReaderT Env IO) ()
forall a b. (a -> b) -> a -> b
$ Text
"processNtfCmd, NSCSmpDelete - length rqs = " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Int -> Text
forall a. Show a => a -> Text
tshow ([RcvQueue] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [RcvQueue]
rqs)
([(ConnId, AgentErrorType)]
cErrs', [()]
_) <- ReaderT Env IO ([(ConnId, AgentErrorType)], [()])
-> ExceptT
AgentErrorType (ReaderT Env IO) ([(ConnId, AgentErrorType)], [()])
forall (m :: * -> *) a.
Monad m =>
m a -> ExceptT AgentErrorType m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (ReaderT Env IO ([(ConnId, AgentErrorType)], [()])
-> ExceptT
AgentErrorType (ReaderT Env IO) ([(ConnId, AgentErrorType)], [()]))
-> ReaderT Env IO ([(ConnId, AgentErrorType)], [()])
-> ExceptT
AgentErrorType (ReaderT Env IO) ([(ConnId, AgentErrorType)], [()])
forall a b. (a -> b) -> a -> b
$ (RcvQueue -> ConnId)
-> [RcvQueue]
-> [Either AgentErrorType ()]
-> ([(ConnId, AgentErrorType)], [()])
forall a b.
(a -> ConnId)
-> [a]
-> [Either AgentErrorType b]
-> ([(ConnId, AgentErrorType)], [b])
partitionErrs RcvQueue -> ConnId
forall q. SMPQueueRec q => q -> ConnId
qConnId [RcvQueue]
rqs ([Either AgentErrorType ()] -> ([(ConnId, AgentErrorType)], [()]))
-> ReaderT Env IO [Either AgentErrorType ()]
-> ReaderT Env IO ([(ConnId, AgentErrorType)], [()])
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> AgentClient
-> (Connection -> [IO ()])
-> ReaderT Env IO [Either AgentErrorType ()]
forall (t :: * -> *) a.
Traversable t =>
AgentClient
-> (Connection -> t (IO a)) -> AM' (t (Either AgentErrorType a))
withStoreBatch' AgentClient
c (\Connection
db -> (RcvQueue -> IO ()) -> [RcvQueue] -> [IO ()]
forall a b. (a -> b) -> [a] -> [b]
map (Connection -> RcvQueue -> IO ()
updateAction Connection
db) [RcvQueue]
rqs)
AgentClient
-> [(ConnId, AgentErrorType)]
-> ExceptT AgentErrorType (ReaderT Env IO) ()
forall (m :: * -> *).
MonadIO m =>
AgentClient -> [(ConnId, AgentErrorType)] -> m ()
notifyErrs AgentClient
c ([(ConnId, AgentErrorType)]
cErrs [(ConnId, AgentErrorType)]
-> [(ConnId, AgentErrorType)] -> [(ConnId, AgentErrorType)]
forall a. Semigroup a => a -> a -> a
<> [(ConnId, AgentErrorType)]
cErrs')
[RcvQueue] -> ExceptT AgentErrorType (ReaderT Env IO) ()
kickSMPWorkers [RcvQueue]
rqs
where
-- Load the primary receive queue of the connection with the given id,
-- converting a store failure into an agent error with 'storeError'.
getQueue :: DB.Connection -> ConnId -> IO (Either AgentErrorType RcvQueue)
getQueue db connId = first storeError <$> getPrimaryRcvQueue db connId
-- Record the @NSASMP NSASmpDelete@ action for the connection that owns the
-- given receive queue, via 'supervisorUpdateNtfAction'.
updateAction :: DB.Connection -> RcvQueue -> IO ()
updateAction db rq = supervisorUpdateNtfAction db (qConnId rq) (NSASMP NSASmpDelete)
NtfSupervisorCommand
NSCDeleteSub -> ExceptT AgentErrorType (ReaderT Env IO) [Either AgentErrorType ()]
-> ExceptT AgentErrorType (ReaderT Env IO) ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (ExceptT AgentErrorType (ReaderT Env IO) [Either AgentErrorType ()]
-> ExceptT AgentErrorType (ReaderT Env IO) ())
-> ExceptT
AgentErrorType (ReaderT Env IO) [Either AgentErrorType ()]
-> ExceptT AgentErrorType (ReaderT Env IO) ()
forall a b. (a -> b) -> a -> b
$ ReaderT Env IO [Either AgentErrorType ()]
-> ExceptT
AgentErrorType (ReaderT Env IO) [Either AgentErrorType ()]
forall (m :: * -> *) a.
Monad m =>
m a -> ExceptT AgentErrorType m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (ReaderT Env IO [Either AgentErrorType ()]
-> ExceptT
AgentErrorType (ReaderT Env IO) [Either AgentErrorType ()])
-> ReaderT Env IO [Either AgentErrorType ()]
-> ExceptT
AgentErrorType (ReaderT Env IO) [Either AgentErrorType ()]
forall a b. (a -> b) -> a -> b
$ AgentClient
-> (Connection -> [IO ()])
-> ReaderT Env IO [Either AgentErrorType ()]
forall (t :: * -> *) a.
Traversable t =>
AgentClient
-> (Connection -> t (IO a)) -> AM' (t (Either AgentErrorType a))
withStoreBatch' AgentClient
c ((Connection -> [IO ()])
-> ReaderT Env IO [Either AgentErrorType ()])
-> (Connection -> [IO ()])
-> ReaderT Env IO [Either AgentErrorType ()]
forall a b. (a -> b) -> a -> b
$ \Connection
db -> (ConnId -> IO ()) -> [ConnId] -> [IO ()]
forall a b. (a -> b) -> [a] -> [b]
map (Connection -> ConnId -> IO ()
deleteNtfSubscription' Connection
db) [ConnId]
connIds'
where
-- Call 'getNtfSMPWorker' with @hasWork = True@ once for every distinct SMP
-- server among the given receive queues. Servers are collected into a set
-- first, so a server shared by several queues is visited only once.
-- The agent client @c@ is bound by the enclosing definition.
kickSMPWorkers :: [RcvQueue] -> AM ()
kickSMPWorkers rqs = do
  let smpServers = S.fromList $ map qServer rqs
  lift $ mapM_ (getNtfSMPWorker True c) smpServers
-- | Obtain the notification-server worker for @server@.
--
-- Reads the 'ntfWorkers' map from the notification supervisor in the
-- environment and hands it to 'getAgentWorker' under the name @"ntf_ntf"@,
-- together with @hasWork@ and the worker action @runNtfWorker c server@.
--
-- NOTE(review): presumably 'getAgentWorker' returns the existing worker for
-- the key or starts a new one running the supplied action - confirm against
-- its definition.
getNtfNTFWorker :: Bool -> AgentClient -> NtfServer -> AM' Worker
getNtfNTFWorker hasWork c server = do
  ws <- asks $ ntfWorkers . ntfSupervisor
  getAgentWorker "ntf_ntf" hasWork c server ws $ runNtfWorker c server
-- | Obtain the SMP-side notification worker for @server@.
--
-- Reads the 'ntfSMPWorkers' map from the notification supervisor in the
-- environment and hands it to 'getAgentWorker' under the name @"ntf_smp"@,
-- together with @hasWork@ and the worker action @runNtfSMPWorker c server@.
--
-- NOTE(review): presumably 'getAgentWorker' returns the existing worker for
-- the key or starts a new one running the supplied action - confirm against
-- its definition.
getNtfSMPWorker :: Bool -> AgentClient -> SMPServer -> AM' Worker
getNtfSMPWorker hasWork c server = do
  ws <- asks $ ntfSMPWorkers . ntfSupervisor
  getAgentWorker "ntf_smp" hasWork c server ws $ runNtfSMPWorker c server
-- | Obtain the token-deletion worker for the notification server @server@.
--
-- Reads the 'ntfTknDelWorkers' map from the notification supervisor in the
-- environment and hands it to 'getAgentWorker' under the name
-- @"ntf_tkn_del"@, together with @hasWork@ and the worker action
-- @runNtfTknDelWorker c server@.
--
-- NOTE(review): presumably 'getAgentWorker' returns the existing worker for
-- the key or starts a new one running the supplied action - confirm against
-- its definition.
getNtfTknDelWorker :: Bool -> AgentClient -> NtfServer -> AM' Worker
getNtfTknDelWorker hasWork c server = do
  ws <- asks $ ntfTknDelWorkers . ntfSupervisor
  getAgentWorker "ntf_tkn_del" hasWork c server ws $ runNtfTknDelWorker c server
-- | Run @action@ with the notification server of the current token.
--
-- Fetches the token with 'getNtfToken'; when one is present, @action@ is
-- applied to its @ntfServer@ field. When there is no token nothing is run
-- and the result is @()@.
withTokenServer :: (NtfServer -> AM ()) -> AM ()
withTokenServer action = lift getNtfToken >>= mapM_ (\NtfToken {ntfServer} -> action ntfServer)
runNtfWorker :: AgentClient -> NtfServer -> Worker -> AM ()
runNtfWorker :: AgentClient
-> NtfServer
-> Worker
-> ExceptT AgentErrorType (ReaderT Env IO) ()
runNtfWorker AgentClient
c NtfServer
srv Worker {TMVar ()
doWork :: TMVar ()
$sel:doWork:Worker :: Worker -> TMVar ()
doWork} =
ExceptT AgentErrorType (ReaderT Env IO) ()
-> ExceptT AgentErrorType (ReaderT Env IO) ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (ExceptT AgentErrorType (ReaderT Env IO) ()
-> ExceptT AgentErrorType (ReaderT Env IO) ())
-> ExceptT AgentErrorType (ReaderT Env IO) ()
-> ExceptT AgentErrorType (ReaderT Env IO) ()
forall a b. (a -> b) -> a -> b
$ do
TMVar () -> ExceptT AgentErrorType (ReaderT Env IO) ()
forall (m :: * -> *). MonadIO m => TMVar () -> m ()
waitForWork TMVar ()
doWork
ReaderT Env IO (Either AgentErrorType ())
-> ExceptT AgentErrorType (ReaderT Env IO) ()
forall e (m :: * -> *) a. m (Either e a) -> ExceptT e m a
ExceptT (ReaderT Env IO (Either AgentErrorType ())
-> ExceptT AgentErrorType (ReaderT Env IO) ())
-> ReaderT Env IO (Either AgentErrorType ())
-> ExceptT AgentErrorType (ReaderT Env IO) ()
forall a b. (a -> b) -> a -> b
$ AgentClient
-> AgentOperation
-> (AgentClient -> IO ())
-> ReaderT Env IO (Either AgentErrorType ())
-> ReaderT Env IO (Either AgentErrorType ())
forall (m :: * -> *) a.
MonadUnliftIO m =>
AgentClient
-> AgentOperation -> (AgentClient -> IO ()) -> m a -> m a
agentOperationBracket AgentClient
c AgentOperation
AONtfNetwork AgentClient -> IO ()
throwWhenInactive (ReaderT Env IO (Either AgentErrorType ())
-> ReaderT Env IO (Either AgentErrorType ()))
-> ReaderT Env IO (Either AgentErrorType ())
-> ReaderT Env IO (Either AgentErrorType ())
forall a b. (a -> b) -> a -> b
$ ExceptT AgentErrorType (ReaderT Env IO) ()
-> ReaderT Env IO (Either AgentErrorType ())
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT ExceptT AgentErrorType (ReaderT Env IO) ()
runNtfOperation
where
-- One pass of the notification-server worker.
--
-- Fetches up to @ntfBatchSize@ (from the agent config) pending actions for
-- @srv@ with 'getNextNtfSubNTFActions' and processes them through
-- 'withWorkItems'. The batch is split by 'splitActions' into creates,
-- checks, deletes and rotates.
--
-- 'splitActions' drops only check actions whose timestamp is still in the
-- future, so all four lists being empty means nothing in the batch is due
-- yet. In that case the work is rescheduled using the timestamp of the first
-- item; otherwise each group is passed to 'retrySubActions' with its handler.
--
-- @c@, @srv@ and @doWork@ are bound by the enclosing definition.
runNtfOperation :: AM ()
runNtfOperation = do
  ntfBatchSize <- asks $ ntfBatchSize . config
  withWorkItems c doWork (withStore' c $ \db -> getNextNtfSubNTFActions db srv ntfBatchSize) $ \nextSubs -> do
    logInfo $ "runNtfWorker - length nextSubs = " <> tshow (length nextSubs)
    currTs <- liftIO getCurrentTime
    let (creates, checks, deletes, rotates) = splitActions currTs nextSubs
    if null creates && null checks && null deletes && null rotates
      then
        -- nextSubs is NonEmpty, so taking its head is total
        let (_, _, firstActionTs) = L.head nextSubs
         in lift $ rescheduleWork doWork currTs firstActionTs
      else do
        retrySubActions c creates createSubs
        retrySubActions c checks checkSubs
        retrySubActions c deletes deleteSubs
        retrySubActions c rotates rotateSubs
-- Partition a batch of pending actions into
-- @(creates, checks, deletes, rotates)@, keeping the input order within
-- each group (the batch is folded from the right and items are consed).
--
-- A check action is included only when its timestamp is not later than
-- @currTs@; checks scheduled in the future are dropped from the result.
-- Create, delete and rotate actions are always included.
splitActions :: UTCTime -> NonEmpty (NtfSubNTFAction, NtfSubscription, NtfActionTs) -> ([NtfSubscription], [NtfSubscription], [NtfSubscription], [NtfSubscription])
splitActions currTs = foldr addAction ([], [], [], [])
  where
    addAction (cmd, sub, ts) acc@(creates, checks, deletes, rotates) = case cmd of
      NSACreate -> (sub : creates, checks, deletes, rotates)
      NSACheck
        | ts <= currTs -> (creates, sub : checks, deletes, rotates)
        | otherwise -> acc
      NSADelete -> (creates, checks, sub : deletes, rotates)
      NSARotate -> (creates, checks, deletes, sub : rotates)
createSubs :: [NtfSubscription] -> AM' [NtfSubscription]
createSubs :: [NtfSubscription] -> AM' [NtfSubscription]
createSubs [NtfSubscription]
ntfSubs =
ReaderT Env IO (Maybe NtfToken)
getNtfToken ReaderT Env IO (Maybe NtfToken)
-> (Maybe NtfToken -> AM' [NtfSubscription])
-> AM' [NtfSubscription]
forall a b.
ReaderT Env IO a -> (a -> ReaderT Env IO b) -> ReaderT Env IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Just tkn :: NtfToken
tkn@NtfToken {NtfServer
$sel:ntfServer:NtfToken :: NtfToken -> NtfServer
ntfServer :: NtfServer
ntfServer, $sel:ntfTokenId:NtfToken :: NtfToken -> Maybe NtfSubscriptionId
ntfTokenId = Just NtfSubscriptionId
tknId, $sel:ntfTknStatus:NtfToken :: NtfToken -> NtfTknStatus
ntfTknStatus = NtfTknStatus
NTActive, $sel:ntfMode:NtfToken :: NtfToken -> NotificationsMode
ntfMode = NotificationsMode
NMInstant} -> do
[(NtfSubscription, Either AgentErrorType RcvQueue)]
subsRqs_ <- [NtfSubscription]
-> [Either AgentErrorType RcvQueue]
-> [(NtfSubscription, Either AgentErrorType RcvQueue)]
forall a b. [a] -> [b] -> [(a, b)]
zip [NtfSubscription]
ntfSubs ([Either AgentErrorType RcvQueue]
-> [(NtfSubscription, Either AgentErrorType RcvQueue)])
-> ReaderT Env IO [Either AgentErrorType RcvQueue]
-> ReaderT
Env IO [(NtfSubscription, Either AgentErrorType RcvQueue)]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> AgentClient
-> (Connection -> [IO (Either AgentErrorType RcvQueue)])
-> ReaderT Env IO [Either AgentErrorType RcvQueue]
forall (t :: * -> *) a.
Traversable t =>
AgentClient
-> (Connection -> t (IO (Either AgentErrorType a)))
-> AM' (t (Either AgentErrorType a))
withStoreBatch AgentClient
c (\Connection
db -> (NtfSubscription -> IO (Either AgentErrorType RcvQueue))
-> [NtfSubscription] -> [IO (Either AgentErrorType RcvQueue)]
forall a b. (a -> b) -> [a] -> [b]
map (Connection
-> NtfSubscription -> IO (Either AgentErrorType RcvQueue)
getQueue Connection
db) [NtfSubscription]
ntfSubs)
let ([(ConnId, AgentErrorType)]
errs1, [NtfSubscription]
subs_, [NewNtfEntity 'Subscription]
newSubs_) = NtfSubscriptionId
-> [(NtfSubscription, Either AgentErrorType RcvQueue)]
-> ([(ConnId, AgentErrorType)], [NtfSubscription],
[NewNtfEntity 'Subscription])
splitSubs NtfSubscriptionId
tknId [(NtfSubscription, Either AgentErrorType RcvQueue)]
subsRqs_
NtfServer
-> (AgentNtfServerStats -> TVar Int) -> [NtfSubscription] -> AM' ()
incStatByUserId NtfServer
ntfServer AgentNtfServerStats -> TVar Int
ntfCreateAttempts [NtfSubscription]
subs_
case ([NtfSubscription] -> Maybe (NonEmpty NtfSubscription)
forall a. [a] -> Maybe (NonEmpty a)
L.nonEmpty [NtfSubscription]
subs_, [NewNtfEntity 'Subscription]
-> Maybe (NonEmpty (NewNtfEntity 'Subscription))
forall a. [a] -> Maybe (NonEmpty a)
L.nonEmpty [NewNtfEntity 'Subscription]
newSubs_) of
(Just NonEmpty NtfSubscription
subs, Just NonEmpty (NewNtfEntity 'Subscription)
newSubs) -> do
NonEmpty (NtfSubscription, Either AgentErrorType NtfSubscriptionId)
rs <- NonEmpty NtfSubscription
-> NonEmpty (Either AgentErrorType NtfSubscriptionId)
-> NonEmpty
(NtfSubscription, Either AgentErrorType NtfSubscriptionId)
forall a b. NonEmpty a -> NonEmpty b -> NonEmpty (a, b)
L.zip NonEmpty NtfSubscription
subs (NonEmpty (Either AgentErrorType NtfSubscriptionId)
-> NonEmpty
(NtfSubscription, Either AgentErrorType NtfSubscriptionId))
-> ReaderT
Env IO (NonEmpty (Either AgentErrorType NtfSubscriptionId))
-> ReaderT
Env
IO
(NonEmpty
(NtfSubscription, Either AgentErrorType NtfSubscriptionId))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> AgentClient
-> NtfToken
-> NonEmpty (NewNtfEntity 'Subscription)
-> ReaderT
Env IO (NonEmpty (Either AgentErrorType NtfSubscriptionId))
agentNtfCreateSubscriptions AgentClient
c NtfToken
tkn NonEmpty (NewNtfEntity 'Subscription)
newSubs
let ([NtfSubscription]
ntfSubs', [(NtfSubscription, AgentErrorType)]
errs2, [(NtfSubscription, NtfSubscriptionId)]
nSubIds) = [(NtfSubscription, Either AgentErrorType NtfSubscriptionId)]
-> ([NtfSubscription], [(NtfSubscription, AgentErrorType)],
[(NtfSubscription, NtfSubscriptionId)])
forall a r.
[(a, Either AgentErrorType r)]
-> ([a], [(a, AgentErrorType)], [(a, r)])
splitResults ([(NtfSubscription, Either AgentErrorType NtfSubscriptionId)]
-> ([NtfSubscription], [(NtfSubscription, AgentErrorType)],
[(NtfSubscription, NtfSubscriptionId)]))
-> [(NtfSubscription, Either AgentErrorType NtfSubscriptionId)]
-> ([NtfSubscription], [(NtfSubscription, AgentErrorType)],
[(NtfSubscription, NtfSubscriptionId)])
forall a b. (a -> b) -> a -> b
$ NonEmpty (NtfSubscription, Either AgentErrorType NtfSubscriptionId)
-> [(NtfSubscription, Either AgentErrorType NtfSubscriptionId)]
forall a. NonEmpty a -> [a]
L.toList NonEmpty (NtfSubscription, Either AgentErrorType NtfSubscriptionId)
rs
subs' :: [NtfSubscription]
subs' = ((NtfSubscription, NtfSubscriptionId) -> NtfSubscription)
-> [(NtfSubscription, NtfSubscriptionId)] -> [NtfSubscription]
forall a b. (a -> b) -> [a] -> [b]
map (NtfSubscription, NtfSubscriptionId) -> NtfSubscription
forall a b. (a, b) -> a
fst [(NtfSubscription, NtfSubscriptionId)]
nSubIds
errs2' :: [(ConnId, AgentErrorType)]
errs2' = ((NtfSubscription, AgentErrorType) -> (ConnId, AgentErrorType))
-> [(NtfSubscription, AgentErrorType)]
-> [(ConnId, AgentErrorType)]
forall a b. (a -> b) -> [a] -> [b]
map ((NtfSubscription -> ConnId)
-> (NtfSubscription, AgentErrorType) -> (ConnId, AgentErrorType)
forall a b c. (a -> b) -> (a, c) -> (b, c)
forall (p :: * -> * -> *) a b c.
Bifunctor p =>
(a -> b) -> p a c -> p b c
first NtfSubscription -> ConnId
ntfSubConnId) [(NtfSubscription, AgentErrorType)]
errs2
NtfServer
-> (AgentNtfServerStats -> TVar Int) -> [NtfSubscription] -> AM' ()
incStatByUserId NtfServer
ntfServer AgentNtfServerStats -> TVar Int
ntfCreated [NtfSubscription]
subs'
NtfActionTs
ts <- IO NtfActionTs -> ReaderT Env IO NtfActionTs
forall a. IO a -> ReaderT Env IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO NtfActionTs
getCurrentTime
NominalDiffTime
int <- (Env -> NominalDiffTime) -> ReaderT Env IO NominalDiffTime
forall r (m :: * -> *) a. MonadReader r m => (r -> a) -> m a
asks ((Env -> NominalDiffTime) -> ReaderT Env IO NominalDiffTime)
-> (Env -> NominalDiffTime) -> ReaderT Env IO NominalDiffTime
forall a b. (a -> b) -> a -> b
$ AgentConfig -> NominalDiffTime
ntfSubFirstCheckInterval (AgentConfig -> NominalDiffTime)
-> (Env -> AgentConfig) -> Env -> NominalDiffTime
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Env -> AgentConfig
config
let checkTs :: NtfActionTs
checkTs = NominalDiffTime -> NtfActionTs -> NtfActionTs
addUTCTime NominalDiffTime
int NtfActionTs
ts
([(ConnId, AgentErrorType)]
errs3, [()]
_) <- (NtfSubscription -> ConnId)
-> [NtfSubscription]
-> [Either AgentErrorType ()]
-> ([(ConnId, AgentErrorType)], [()])
forall a b.
(a -> ConnId)
-> [a]
-> [Either AgentErrorType b]
-> ([(ConnId, AgentErrorType)], [b])
partitionErrs NtfSubscription -> ConnId
ntfSubConnId [NtfSubscription]
subs' ([Either AgentErrorType ()] -> ([(ConnId, AgentErrorType)], [()]))
-> ReaderT Env IO [Either AgentErrorType ()]
-> ReaderT Env IO ([(ConnId, AgentErrorType)], [()])
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> AgentClient
-> (Connection -> [IO ()])
-> ReaderT Env IO [Either AgentErrorType ()]
forall (t :: * -> *) a.
Traversable t =>
AgentClient
-> (Connection -> t (IO a)) -> AM' (t (Either AgentErrorType a))
withStoreBatch' AgentClient
c (\Connection
db -> ((NtfSubscription, NtfSubscriptionId) -> IO ())
-> [(NtfSubscription, NtfSubscriptionId)] -> [IO ()]
forall a b. (a -> b) -> [a] -> [b]
map (Connection
-> NtfActionTs -> (NtfSubscription, NtfSubscriptionId) -> IO ()
updateSubNSACheck Connection
db NtfActionTs
checkTs) [(NtfSubscription, NtfSubscriptionId)]
nSubIds)
AgentClient -> [(ConnId, AgentErrorType)] -> AM' ()
workerErrors AgentClient
c ([(ConnId, AgentErrorType)] -> AM' ())
-> [(ConnId, AgentErrorType)] -> AM' ()
forall a b. (a -> b) -> a -> b
$ [(ConnId, AgentErrorType)]
errs1 [(ConnId, AgentErrorType)]
-> [(ConnId, AgentErrorType)] -> [(ConnId, AgentErrorType)]
forall a. Semigroup a => a -> a -> a
<> [(ConnId, AgentErrorType)]
errs2' [(ConnId, AgentErrorType)]
-> [(ConnId, AgentErrorType)] -> [(ConnId, AgentErrorType)]
forall a. Semigroup a => a -> a -> a
<> [(ConnId, AgentErrorType)]
errs3
[NtfSubscription] -> AM' [NtfSubscription]
forall a. a -> ReaderT Env IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure [NtfSubscription]
ntfSubs'
(Maybe (NonEmpty NtfSubscription),
Maybe (NonEmpty (NewNtfEntity 'Subscription)))
_ -> AgentClient -> [(ConnId, AgentErrorType)] -> AM' ()
workerErrors AgentClient
c [(ConnId, AgentErrorType)]
errs1 AM' () -> [NtfSubscription] -> AM' [NtfSubscription]
forall (f :: * -> *) a b. Functor f => f a -> b -> f b
$> []
Maybe NtfToken
_ -> do
let errs :: [(ConnId, AgentErrorType)]
errs = (NtfSubscription -> (ConnId, AgentErrorType))
-> [NtfSubscription] -> [(ConnId, AgentErrorType)]
forall a b. (a -> b) -> [a] -> [b]
map (\NtfSubscription
sub -> (NtfSubscription -> ConnId
ntfSubConnId NtfSubscription
sub, String -> AgentErrorType
INTERNAL String
"NSACreate - no active token")) [NtfSubscription]
ntfSubs
AgentClient -> [(ConnId, AgentErrorType)] -> AM' ()
workerErrors AgentClient
c [(ConnId, AgentErrorType)]
errs
[NtfSubscription] -> AM' [NtfSubscription]
forall a. a -> ReaderT Env IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure []
where
-- Looks up the receive queue for the subscription's connection with
-- getPrimaryRcvQueue, converting a store error into an agent error
-- via storeError.
getQueue :: DB.Connection -> NtfSubscription -> IO (Either AgentErrorType RcvQueue)
getQueue db NtfSubscription {connId} =
  first storeError <$> getPrimaryRcvQueue db connId
-- Partitions subscriptions (paired with the result of loading their receive
-- queue) into:
--   * errors keyed by connection ID,
--   * subscriptions whose queue has notifier credentials,
--   * the new subscription entities built for those subscriptions.
-- The second and third lists are extended together, so they have the same
-- length and order. foldr preserves the order of the input list.
splitSubs :: NtfTokenId -> [(NtfSubscription, Either AgentErrorType RcvQueue)] -> ([(ConnId, AgentErrorType)], [NtfSubscription], [NewNtfEntity 'Subscription])
splitSubs tknId = foldr splitSub ([], [], [])
  where
    splitSub (sub, rq) (errs, subs, newSubs) = case rq of
      -- queue loaded and it has notifier credentials
      Right RcvQueue {clientNtfCreds = Just creds} ->
        (errs, sub : subs, toNewSub sub creds : newSubs)
      -- queue loaded, but without notifier credentials
      Right _ ->
        ((ntfSubConnId sub, INTERNAL "NSACreate - no notifier queue credentials") : errs, subs, newSubs)
      -- queue could not be loaded
      Left e ->
        ((ntfSubConnId sub, e) : errs, subs, newSubs)
    -- Builds the new subscription entity for token tknId from the
    -- subscription's SMP server and the queue's notifier ID and private key.
    toNewSub :: NtfSubscription -> ClientNtfCreds -> NewNtfEntity 'Subscription
    toNewSub NtfSubscription {smpServer} ClientNtfCreds {ntfPrivateKey, notifierId} =
      NewNtfSub tknId (SMPQueueNtf smpServer notifierId) ntfPrivateKey
-- Stores the subscription ID for the subscription, sets its status to
-- NASCreated NSNew and sets its next action to NSANtf NSACheck with
-- timestamp checkTs.
updateSubNSACheck :: DB.Connection -> UTCTime -> (NtfSubscription, NtfSubscriptionId) -> IO ()
updateSubNSACheck db checkTs (sub, nSubId) =
  updateNtfSubscription db sub' (NSANtf NSACheck) checkTs
  where
    sub' = sub {ntfSubId = Just nSubId, ntfSubStatus = NASCreated NSNew}
-- Handles a batch of subscriptions that are due for a check.
--
-- Only runs when there is a token with status NTActive and mode NMInstant;
-- otherwise every subscription is reported to workerErrors with
-- "NSACheck - no active token" and nothing is returned.
--
-- With such a token:
--   1. subscriptions without a subscription ID are reported as errors (errs1);
--   2. the remaining ones are sent to agentNtfCheckSubscriptions;
--   3. results are divided by splitResults into ntfSubs' (returned to the
--      caller as is - presumably the ones to retry, confirm against
--      splitResults), errors, and statuses;
--   4. subscriptions that failed with NTF _ SMP.AUTH are reset with
--      recreateNtfSub, other errors are reported (errs2');
--   5. subscriptions with a status are updated in the store by updateSub;
--   6. getNtfSMPWorker is called once for each distinct SMP server returned
--      by the store updates.
-- Store errors from steps 4 and 5 (errs4, errs3) are reported as well.
checkSubs :: [NtfSubscription] -> AM' [NtfSubscription]
checkSubs ntfSubs =
  getNtfToken >>= \case
    Just tkn@NtfToken {ntfServer, ntfTknStatus = NTActive, ntfMode = NMInstant} -> do
      let (errs1, subs_, subIds_) = splitSubs ntfSubs
      incStatByUserId ntfServer ntfCheckAttempts subs_
      case (L.nonEmpty subs_, L.nonEmpty subIds_) of
        (Just subs, Just subIds) -> do
          rs <- L.zip subs <$> agentNtfCheckSubscriptions c tkn subIds
          let (ntfSubs', errs2, nSubStatuses) = splitResults $ L.toList rs
              subs' = map fst nSubStatuses
              (errs2', authSubs) = partitionEithers $ map authOrError errs2
          incStatByUserId ntfServer ntfChecked subs'
          ts <- liftIO getCurrentTime
          int <- asks $ ntfSubCheckInterval . config
          let nextCheckTs = addUTCTime int ts
          -- subs' is map fst nSubStatuses, so store results line up with it
          (errs3, srvs) <-
            partitionErrs ntfSubConnId subs'
              <$> withStoreBatch' c (\db -> map (updateSub db ntfServer ts nextCheckTs) nSubStatuses)
          (errs4, srvs') <-
            partitionErrs ntfSubConnId authSubs
              <$> withStoreBatch' c (\db -> map (recreateNtfSub db ntfServer ts) authSubs)
          -- the set removes duplicate servers before getting workers
          mapM_ (getNtfSMPWorker True c) $ S.fromList (catMaybes srvs <> srvs')
          workerErrors c $ errs1 <> errs2' <> errs3 <> errs4
          pure ntfSubs'
        -- nothing to check: only report subscriptions without an ID
        _ -> workerErrors c errs1 $> []
    _ -> do
      let errs = map (\sub -> (ntfSubConnId sub, INTERNAL "NSACheck - no active token")) ntfSubs
      workerErrors c errs
      pure []
  where
    -- Right for subscriptions that failed with NTF _ SMP.AUTH (to be reset),
    -- Left with the connection ID and error for any other failure.
    authOrError :: (NtfSubscription, AgentErrorType) -> Either (ConnId, AgentErrorType) NtfSubscription
    authOrError = \case
      (sub, NTF _ SMP.AUTH) -> Right sub
      e -> Left $ first ntfSubConnId e
    -- Partitions subscriptions into errors (no subscription ID), the
    -- subscriptions that have an ID, and those IDs. The last two lists are
    -- extended together and keep the input order.
    splitSubs :: [NtfSubscription] -> ([(ConnId, AgentErrorType)], [NtfSubscription], [NtfSubscriptionId])
    splitSubs = foldr splitSub ([], [], [])
      where
        splitSub sub (errs, subs, subIds) = case sub of
          NtfSubscription {ntfSubId = Just subId} -> (errs, sub : subs, subId : subIds)
          _ -> ((ntfSubConnId sub, INTERNAL "NSACheck - no subscription ID") : errs, subs, subIds)
    -- When ntfShouldSubscribe holds for the reported status, stores the status
    -- as NASCreated and sets the next action to NSANtf NSACheck at nextCheckTs,
    -- returning Nothing. Otherwise resets the subscription with recreateNtfSub
    -- and returns its SMP server.
    updateSub :: DB.Connection -> NtfServer -> UTCTime -> UTCTime -> (NtfSubscription, NtfSubStatus) -> IO (Maybe SMPServer)
    updateSub db ntfServer ts nextCheckTs (sub, status)
      | ntfShouldSubscribe status =
          let sub' = sub {ntfSubStatus = NASCreated status}
           in Nothing <$ updateNtfSubscription db sub' (NSANtf NSACheck) nextCheckTs
      | otherwise = Just <$> recreateNtfSub db ntfServer ts sub
    -- Resets the subscription: sets the given notification server, clears the
    -- queue ID and subscription ID, sets status NASNew and the next action
    -- NSASMP NSASmpKey at ts. Returns the subscription's SMP server.
    recreateNtfSub :: DB.Connection -> NtfServer -> UTCTime -> NtfSubscription -> IO SMPServer
    recreateNtfSub db ntfServer ts sub@NtfSubscription {smpServer} =
      let sub' = sub {ntfServer, ntfQueueId = Nothing, ntfSubId = Nothing, ntfSubStatus = NASNew}
       in smpServer <$ updateNtfSubscription db sub' (NSASMP NSASmpKey) ts
-- Increments the notification server statistic chosen by sel, once per user,
-- by the number of subscriptions in ss that belong to that user.
-- Each user's increment runs in its own STM transaction.
incStatByUserId :: NtfServer -> (AgentNtfServerStats -> TVar Int) -> [NtfSubscription] -> AM' ()
incStatByUserId ntfServer sel ss =
  forM_ (M.assocs userIdsCounts) $ \(userId, count) ->
    atomically $ incNtfServerStat' c userId ntfServer sel count
  where
    -- number of subscriptions per user ID
    userIdsCounts = foldl' countUser M.empty ss
    countUser acc NtfSubscription {userId} = M.insertWith (+) userId 1 acc
-- Processes each subscription with deleteSub under runCatching and returns
-- the subscriptions for which that produced Just (named retrySubs_ in the
-- original, so presumably the ones to retry - confirm against deleteNtfSub).
deleteSubs :: [NtfSubscription] -> AM' [NtfSubscription]
deleteSubs ntfSubs = catMaybes <$> mapM (runCatching deleteSub) ntfSubs
  where
    -- Passes deleteNtfSub a continuation that clears the subscription ID,
    -- sets status NASOff and the next action NSASMP NSASmpDelete at the
    -- current time, then gets the SMP worker for the subscription's server.
    deleteSub :: NtfSubscription -> AM (Maybe NtfSubscription)
    deleteSub sub@NtfSubscription {smpServer} =
      deleteNtfSub sub $ do
        let sub' = sub {ntfSubId = Nothing, ntfSubStatus = NASOff}
        ts <- liftIO getCurrentTime
        withStore' c $ \db ->
          updateNtfSubscription db sub' (NSASMP NSASmpDelete) ts
        lift . void $ getNtfSMPWorker True c smpServer
-- Rotate a batch of notification subscriptions.
--
-- Each subscription is processed independently through 'runCatching', so an
-- error in one does not abort the others. The returned list holds only the
-- subscriptions for which the per-item action produced 'Just'.
-- NOTE(review): the binding name suggests these are the ones to retry; confirm
-- against the caller.
-- NOTE: @c@ is not bound here; it comes from the enclosing definition.
rotateSubs :: [NtfSubscription] -> AM' [NtfSubscription]
rotateSubs ntfSubs = do
  retrySubs_ <- mapM (runCatching rotateSub) ntfSubs
  pure $ catMaybes retrySubs_
  where
    -- Delete the subscription via 'deleteNtfSub'; when its continuation runs,
    -- remove the stored subscription record for the connection and enqueue an
    -- 'NSCCreate' command for the same connection on the supervisor queue.
    rotateSub :: NtfSubscription -> AM (Maybe NtfSubscription)
    rotateSub sub@NtfSubscription {connId} =
      deleteNtfSub sub $ do
        withStore' c $ \db -> deleteNtfSubscription db connId
        ns <- asks ntfSupervisor
        -- the list literal is a NonEmpty via OverloadedLists
        atomically $ writeTBQueue (ntfSubQ ns) (NSCCreate, [connId])
-- Run a per-subscription action in 'AM'' without letting its errors escape.
--
-- Any error caught by 'catchAllErrors' is reported with 'workerInternalError'
-- for the subscription's connection, and the result becomes 'Nothing'.
-- If the handler itself fails, the remaining 'Left' is also mapped to
-- 'Nothing' by @fromRight Nothing@.
-- NOTE: @c@ is not bound here; it comes from the enclosing definition.
runCatching :: (NtfSubscription -> AM (Maybe NtfSubscription)) -> NtfSubscription -> AM' (Maybe NtfSubscription)
runCatching action sub@NtfSubscription {connId} =
  fromRight Nothing
    <$> runExceptT (action sub `catchAllErrors` \e -> workerInternalError c connId (show e) $> Nothing)
-- Try to delete a subscription on the notification server, then run
-- @continue@.
--
-- Behaviour by case:
--   * no 'ntfSubId', or no token from 'getNtfToken': run @continue@ and
--     return 'Nothing';
--   * subscription id and token present: increment 'ntfDelAttempts', then call
--     'agentNtfDeleteSubscription';
--       - success: increment 'ntfDeleted', run @continue@, return 'Nothing';
--       - error satisfying 'temporaryOrHostError': return @Just sub@ WITHOUT
--         running @continue@;
--       - any other error: run @continue@ and return 'Nothing'.
--
-- NOTE: @c@ is not bound here; it comes from the enclosing definition.
deleteNtfSub :: NtfSubscription -> AM () -> AM (Maybe NtfSubscription)
deleteNtfSub sub@NtfSubscription {userId, ntfSubId} continue = case ntfSubId of
  Just nSubId ->
    lift getNtfToken >>= \case
      Just tkn@NtfToken {ntfServer} -> do
        atomically $ incNtfServerStat c userId ntfServer ntfDelAttempts
        tryAllErrors (agentNtfDeleteSubscription c nSubId tkn) >>= \case
          Right _ -> do
            atomically $ incNtfServerStat c userId ntfServer ntfDeleted
            continue'
          Left e
            | temporaryOrHostError e -> pure $ Just sub
            | otherwise -> continue'
      Nothing -> continue'
  _ -> continue'
  where
    -- run the caller's continuation and report "nothing left to retry"
    continue' = continue $> Nothing
-- Worker loop for notification-related SMP actions on one SMP server.
--
-- Each iteration blocks in 'waitForWork' on the worker's 'doWork' flag, then
-- runs 'runNtfSMPOperation' inside 'agentOperationBracket' for the
-- 'AONtfNetwork' operation, with 'throwWhenInactive' as the bracket's check.
-- The bracketed action is unwrapped with 'runExceptT' and re-wrapped with
-- 'ExceptT', so a 'Left' it produces propagates out of 'forever' and ends
-- the loop.
runNtfSMPWorker :: AgentClient -> SMPServer -> Worker -> AM ()
runNtfSMPWorker c srv Worker {doWork} = forever $ do
  waitForWork doWork
  ExceptT $ agentOperationBracket c AONtfNetwork throwWhenInactive $ runExceptT runNtfSMPOperation
where
-- Process one batch of pending SMP-side notification actions.
--
-- Reads the batch size from the agent config, fetches up to that many pending
-- actions for @srv@ with 'getNextNtfSubSMPActions', logs how many were
-- fetched, splits them into key creations and key deletions with
-- 'splitActions', and passes each group to 'retrySubActions' with the
-- matching handler.
-- NOTE: @c@, @srv@ and @doWork@ are not bound here; they come from the
-- enclosing definition.
runNtfSMPOperation :: AM ()
runNtfSMPOperation = do
  -- the local binding shadows the config field of the same name
  ntfBatchSize <- asks $ ntfBatchSize . config
  withWorkItems c doWork (withStore' c $ \db -> getNextNtfSubSMPActions db srv ntfBatchSize) $ \nextSubs -> do
    logInfo $ "runNtfSMPWorker - length nextSubs = " <> tshow (length nextSubs)
    let (creates, deletes) = splitActions nextSubs
    retrySubActions c creates createNotifierKeys
    retrySubActions c deletes deleteNotifierKeys
-- Partition pending actions into @(creates, deletes)@.
--
-- Subscriptions tagged 'NSASmpKey' go to the first list and those tagged
-- 'NSASmpDelete' to the second. A right fold is used, so each list keeps the
-- relative order of the input.
splitActions :: NonEmpty (NtfSubSMPAction, NtfSubscription) -> ([NtfSubscription], [NtfSubscription])
splitActions = foldr addAction ([], [])
  where
    addAction (cmd, sub) (creates, deletes) = case cmd of
      NSASmpKey -> (sub : creates, deletes)
      NSASmpDelete -> (creates, sub : deletes)
-- Enable notifications on the SMP queues of the given subscriptions.
--
-- This only proceeds when 'getNtfToken' returns a token with status
-- 'NTActive' and mode 'NMInstant'. In that case it:
--   1. builds one 'EnableQueueNtfReq' per subscription ('prepareQueueSmpKey');
--   2. sends the requests with 'enableQueuesNtfs' and splits the outcome with
--      'splitResults' into three groups;
--   3. stores credentials for the successes ('storeNtfSubCreds') in a single
--      store batch, collecting each subscription's notification server;
--   4. requests a worker per distinct notification server via
--      'getNtfNTFWorker';
--   5. reports the errors from steps 1-3 through 'workerErrors'.
-- The subscriptions from the first group of 'splitResults' are returned.
-- NOTE(review): presumably these are the requests to retry; confirm against
-- 'splitResults' and 'retrySubActions'.
--
-- With any other token state, every subscription is reported with
-- @INTERNAL "NSASmpKey - no active token"@ and an empty list is returned.
-- NOTE: @c@ is not bound here; it comes from the enclosing definition.
createNotifierKeys :: [NtfSubscription] -> AM' [NtfSubscription]
createNotifierKeys ntfSubs =
  getNtfToken >>= \case
    Just NtfToken {ntfTknStatus = NTActive, ntfMode = NMInstant} -> do
      (errs1, subRqKeys) <- prepareQueueSmpKey ntfSubs
      rs <- enableQueuesNtfs c subRqKeys
      let (subRqKeys', errs2, successes) = splitResults rs
          ntfSubs' = map eqnrNtfSub subRqKeys'
          errs2' = map (first (qConnId . eqnrRq)) errs2
      ts <- liftIO getCurrentTime
      (errs3, srvs) <- partitionErrs (qConnId . eqnrRq . fst) successes <$> withStoreBatch' c (\db -> map (storeNtfSubCreds db ts) successes)
      -- de-duplicate servers so each worker is requested once
      mapM_ (getNtfNTFWorker True c) $ S.fromList srvs
      workerErrors c $ errs1 <> errs2' <> errs3
      pure ntfSubs'
    _ -> do
      let errs = map (\sub -> (ntfSubConnId sub, INTERNAL "NSASmpKey - no active token")) ntfSubs
      workerErrors c errs
      pure []
  where
    -- Load the primary receive queue and generate fresh key pairs for every
    -- subscription in one store batch; failures are paired with the
    -- subscription's connection id.
    prepareQueueSmpKey :: [NtfSubscription] -> AM' ([(ConnId, AgentErrorType)], [EnableQueueNtfReq])
    prepareQueueSmpKey subs = do
      alg <- asks (rcvAuthAlg . config)
      g <- asks random
      partitionErrs ntfSubConnId subs <$> withStoreBatch c (\db -> map (getQueue db alg g) subs)
      where
        -- Build the request for one subscription; a 'StoreError' from
        -- 'getPrimaryRcvQueue' is converted with 'storeError'.
        getQueue :: DB.Connection -> C.AuthAlg -> TVar ChaChaDRG -> NtfSubscription -> IO (Either AgentErrorType EnableQueueNtfReq)
        getQueue db (C.AuthAlg a) g sub = fmap (first storeError) $ runExceptT $ do
          rq <- ExceptT $ getPrimaryRcvQueue db (ntfSubConnId sub)
          authKeyPair <- atomically $ C.generateAuthKeyPair a g
          rcvNtfKeyPair <- atomically $ C.generateKeyPair g
          pure (EnableQueueNtfReq sub rq authKeyPair rcvNtfKeyPair)
    -- Persist the credentials obtained for one successfully enabled queue:
    -- derive the DH secret from the server's public key and our private key,
    -- save the notifier credentials on the receive queue, and store the
    -- subscription with the notifier id and status 'NASKey', recording
    -- @NSANtf NSACreate@ as its next action. Returns the subscription's
    -- notification server.
    storeNtfSubCreds :: DB.Connection -> UTCTime -> (EnableQueueNtfReq, (SMP.NotifierId, SMP.RcvNtfPublicDhKey)) -> IO NtfServer
    storeNtfSubCreds db ts (EnableQueueNtfReq {eqnrNtfSub, eqnrAuthKeyPair = (ntfPublicKey, ntfPrivateKey), eqnrRcvKeyPair = (_, pk)}, (notifierId, srvPubDhKey)) = do
      let NtfSubscription {ntfServer} = eqnrNtfSub
          rcvNtfDhSecret = C.dh' srvPubDhKey pk
      setRcvQueueNtfCreds db (ntfSubConnId eqnrNtfSub) $ Just ClientNtfCreds {ntfPublicKey, ntfPrivateKey, notifierId, rcvNtfDhSecret}
      updateNtfSubscription db eqnrNtfSub {ntfQueueId = Just notifierId, ntfSubStatus = NASKey} (NSANtf NSACreate) ts
      pure ntfServer
-- Resets the notifier credentials of the queues behind the given subscriptions,
-- requests disabling notifications for those queues (disableQueuesNtfs) and
-- deletes the subscription records of the queues that were disabled.
-- `c` is the AgentClient bound by the enclosing definition; it is not a
-- parameter of this function.
-- Errors collected from the three steps are passed to workerErrors. The result
-- is the list of subscriptions taken from the first component of splitResults,
-- i.e. the requests whose error satisfied temporaryOrHostError.
deleteNotifierKeys :: [NtfSubscription] -> AM' [NtfSubscription]
deleteNotifierKeys ntfSubs = do
  (storeErrs, subRqs) <- partitionErrs ntfSubConnId ntfSubs <$> withStoreBatch c resetCredsBatch
  results <- disableQueuesNtfs c subRqs
  let (retryRqs, failedRqs, doneRqs) = splitResults results
      -- each request is a (subscription, receive queue) pair
      disabledRqs = map (\((_, rq), _) -> rq) doneRqs
      disableErrs = map (\((_, rq), e) -> (qConnId rq, e)) failedRqs
  (deleteErrs, _) <- partitionErrs qConnId disabledRqs <$> withStoreBatch' c (deleteSubsBatch disabledRqs)
  workerErrors c $ storeErrs <> disableErrs <> deleteErrs
  pure $ map fst retryRqs
  where
    resetCredsBatch db = map (resetCredsGetQueue db) ntfSubs
    deleteSubsBatch rqs db = map (deleteSub db) rqs
    -- Clears the stored notifier credentials for the connection, then loads its
    -- primary receive queue; store errors are converted with storeError.
    resetCredsGetQueue :: DB.Connection -> NtfSubscription -> IO (Either AgentErrorType DisableQueueNtfReq)
    resetCredsGetQueue db sub@NtfSubscription {connId} = do
      r <- runExceptT $ do
        liftIO $ setRcvQueueNtfCreds db connId Nothing
        (sub,) <$> ExceptT (getPrimaryRcvQueue db connId)
      pure $ first storeError r
    -- Deletes the notification subscription record of the queue's connection.
    deleteSub :: DB.Connection -> RcvQueue -> IO ()
    deleteSub db = deleteNtfSubscription db . qConnId
-- | Runs @action@ over the subscriptions repeatedly, using the configured
-- 'reconnectInterval', until it returns an empty list.
--
-- The list returned by @action@ replaces the pending list for the next
-- attempt. Before each attempt the loop waits via 'waitWhileSuspended' and
-- 'waitForUserNetwork'. An empty input list is a no-op.
retrySubActions :: AgentClient -> [NtfSubscription] -> ([NtfSubscription] -> AM' [NtfSubscription]) -> AM ()
retrySubActions _ [] _ = pure ()
retrySubActions c subs action = do
  pending <- newTVarIO subs
  interval <- asks (reconnectInterval . config)
  withRetryInterval interval $ \_ loop -> do
    liftIO $ waitWhileSuspended c
    liftIO $ waitForUserNetwork c
    remaining <- lift . action =<< readTVarIO pending
    unless (null remaining) $ do
      -- remember what is still outstanding before scheduling the next attempt
      atomically $ writeTVar pending remaining
      retryNetworkLoop c loop
-- | Partitions tagged results into three lists, preserving input order:
--
-- 1. items whose error satisfies 'temporaryOrHostError' (the error is dropped),
-- 2. items paired with any other error,
-- 3. items paired with their successful result.
splitResults :: [(a, Either AgentErrorType r)] -> ([a], [(a, AgentErrorType)], [(a, r)])
splitResults = foldr classify ([], [], [])
  where
    classify (x, result) (retries, failures, oks) = case result of
      Left e
        | temporaryOrHostError e -> (x : retries, failures, oks)
        | otherwise -> (retries, (x, e) : failures, oks)
      Right ok -> (retries, failures, (x, ok) : oks)
-- | Empties the @doWork@ flag, then forks a thread that sleeps for
-- @actionTs - ts@ and afterwards calls 'hasWorkToDo'' on the same flag.
rescheduleWork :: TMVar () -> UTCTime -> UTCTime -> AM' ()
rescheduleWork doWork ts actionTs = do
  _ <- atomically $ tryTakeTMVar doWork
  _ <- forkIO $ do
    liftIO . threadDelay' . diffToMicroseconds $ actionTs `diffUTCTime` ts
    atomically $ hasWorkToDo' doWork
  pure ()
-- | Ends the 'AONtfNetwork' agent operation, calls 'throwWhenInactive',
-- begins the operation again and then runs @loop@.
retryNetworkLoop :: AgentClient -> AM () -> AM ()
retryNetworkLoop c loop =
  atomically (endAgentOperation c AONtfNetwork)
    *> liftIO (throwWhenInactive c)
    *> atomically (beginAgentOperation c AONtfNetwork)
    *> loop
-- | For a non-empty list of per-connection errors: calls
-- 'setNullNtfSubscriptionAction' for every listed connection (results of the
-- store batch are discarded) and reports the errors with 'notifyErrs'.
-- Does nothing for an empty list.
workerErrors :: AgentClient -> [(ConnId, AgentErrorType)] -> AM' ()
workerErrors c connErrs
  | null connErrs = pure ()
  | otherwise = do
      _ <- withStoreBatch' c $ \db -> map (\(cId, _) -> setNullNtfSubscriptionAction db cId) connErrs
      notifyErrs c connErrs
-- | Calls 'setNullNtfSubscriptionAction' for the connection and then reports
-- the message through 'notifyInternalError'.
workerInternalError :: AgentClient -> ConnId -> String -> AM ()
workerInternalError c connId internalErrStr =
  withStore' c (`setNullNtfSubscriptionAction` connId)
    >> notifyInternalError c connId internalErrStr
-- | Logs the message as an error and writes an @ERR (INTERNAL msg)@ connection
-- event for @connId@ to the client's @subQ@ using 'nonBlockingWriteTBQueue'.
-- The first component of the transmission is left empty.
notifyInternalError :: MonadIO m => AgentClient -> ConnId -> String -> m ()
notifyInternalError AgentClient {subQ} connId internalErrStr = do
  logError (T.pack internalErrStr)
  liftIO $ nonBlockingWriteTBQueue subQ ("", connId, errEvt)
  where
    errEvt = AEvt SAEConn (ERR (INTERNAL internalErrStr))
-- | 'notifyInternalError' with an empty connection id.
notifyInternalError' :: MonadIO m => AgentClient -> String -> m ()
notifyInternalError' c = c `notifyInternalError` ""
{-# INLINE notifyInternalError' #-}
-- | Logs and reports a batch of per-connection errors as a single 'ERRS'
-- event via 'notifySub'. Does nothing when the list is empty.
notifyErrs :: MonadIO m => AgentClient -> [(ConnId, AgentErrorType)] -> m ()
notifyErrs c errs_ = case L.nonEmpty errs_ of
  Nothing -> pure ()
  Just errs -> do
    logError $ "notifyErrs: " <> tshow errs
    notifySub c (ERRS errs)
-- | Reads the current value of the supervisor's @ntfTkn@ variable from the
-- environment.
getNtfToken :: AM' (Maybe NtfToken)
getNtfToken = asks (ntfTkn . ntfSupervisor) >>= readTVarIO
-- | Stores the given token in the supervisor's @ntfTkn@ variable.
nsUpdateToken :: NtfSupervisor -> NtfToken -> STM ()
nsUpdateToken ns = writeTVar (ntfTkn ns) . Just
-- | Clears the supervisor's @ntfTkn@ variable (sets it to 'Nothing').
nsRemoveNtfToken :: NtfSupervisor -> STM ()
nsRemoveNtfToken ns = ntfTkn ns `writeTVar` Nothing
-- | Writes the command to the supervisor's @ntfSubQ@ queue, but only when
-- 'hasInstantNotifications' returns 'True'; otherwise the command is dropped.
sendNtfSubCommand :: NtfSupervisor -> (NtfSupervisorCommand, NonEmpty ConnId) -> IO ()
sendNtfSubCommand ns cmd = do
  instant <- hasInstantNotifications ns
  when instant . atomically $ writeTBQueue (ntfSubQ ns) cmd
-- | 'True' when the supervisor currently holds a token for which
-- 'instantNotifications' is 'True'; 'False' when there is no token.
hasInstantNotifications :: NtfSupervisor -> IO Bool
hasInstantNotifications ns = any instantNotifications <$> readTVarIO (ntfTkn ns)
-- | 'True' only for a token whose status is 'NTActive' and whose mode is
-- 'NMInstant'.
instantNotifications :: NtfToken -> Bool
instantNotifications NtfToken {ntfTknStatus, ntfMode} = case (ntfTknStatus, ntfMode) of
  (NTActive, NMInstant) -> True
  _ -> False
{-# INLINE instantNotifications #-}
-- | Removes the token from the store and from the supervisor state.
--
-- When the token has an id, it is also recorded with 'addNtfTokenToDelete'
-- in the same store call, and afterwards 'getNtfTknDelWorker' is invoked for
-- the token's server (its result is discarded).
deleteToken :: AgentClient -> NtfToken -> AM ()
deleteToken c tkn@NtfToken {ntfServer, ntfTokenId, ntfPrivKey} = do
  queuedForDeletion <- withStore' c $ \db -> do
    removeNtfToken db tkn
    maybe (pure False) (\tknId -> True <$ addNtfTokenToDelete db ntfServer ntfPrivKey tknId) ntfTokenId
  asks ntfSupervisor >>= atomically . nsRemoveNtfToken
  when queuedForDeletion . void . lift $ getNtfTknDelWorker True c ntfServer
-- | Worker loop that processes the queue of notification tokens to delete for server @srv@.
--
-- Each iteration blocks on the worker's 'doWork' flag ('waitForWork') and then runs one
-- pass of 'runNtfOperation' wrapped in 'agentOperationBracket' for 'AONtfNetwork'
-- (with 'throwWhenInactive' as the bracket's check action).
runNtfTknDelWorker :: AgentClient -> NtfServer -> Worker -> AM ()
runNtfTknDelWorker c srv Worker {doWork} = forever (waitForWork doWork >> bracketedPass)
  where
    -- One pass of the worker, run inside the agent operation bracket.
    bracketedPass :: AM ()
    bracketedPass =
      ExceptT . agentOperationBracket c AONtfNetwork throwWhenInactive $ runExceptT runNtfOperation

    -- Fetch the next queued token for this server (via 'withWork') and try to delete it,
    -- retrying according to the configured 'reconnectInterval'.
    runNtfOperation :: AM ()
    runNtfOperation =
      withWork c doWork (\db -> getNextNtfTokenToDelete db srv) $ \tkn -> do
        logInfo $ "runNtfTknDelWorker, nextTknToDelete " <> tshow tkn
        interval <- asks (reconnectInterval . config)
        withRetryInterval interval $ \_ retry -> do
          -- both waits happen before every attempt, including retries
          liftIO $ waitWhileSuspended c >> waitForUserNetwork c
          processTknToDelete tkn `catchAllErrors` retryTmpError retry tkn

    -- Error handler for a failed attempt: the error is always logged; errors accepted by
    -- 'temporaryOrHostError' re-enter the retry loop, any other error removes the queued
    -- record and is reported through 'notifyInternalError''.
    retryTmpError :: AM () -> NtfTokenToDelete -> AgentErrorType -> AM ()
    retryTmpError retry (tknDbId, _, _) err =
      logError ("ntf tkn del error: " <> tshow err) >> handleFailure
      where
        handleFailure
          | temporaryOrHostError err = retryNetworkLoop c retry
          | otherwise = do
              dropQueuedToken tknDbId
              notifyInternalError' c (show err)

    -- Ask the server to delete the token, then remove the queued record from the store.
    processTknToDelete :: NtfTokenToDelete -> AM ()
    processTknToDelete (tknDbId, ntfPrivKey, tknId) = do
      agentNtfDeleteToken c NRMBackground srv ntfPrivKey tknId
      dropQueuedToken tknDbId

    -- Remove the "token to delete" record with the given database id.
    dropQueuedToken tknDbId = withStore' c $ \db -> deleteNtfTokenToDelete db tknDbId
-- | Stop all workers registered in the notification supervisor.
--
-- For each of the three worker maps ('ntfWorkers', 'ntfSMPWorkers', 'ntfTknDelWorkers')
-- the map is atomically swapped for an empty one, and every worker that was in it is
-- then cancelled with 'cancelWorker'.
closeNtfSupervisor :: NtfSupervisor -> IO ()
closeNtfSupervisor ns = do
  -- the maps are keyed by different server types, so they cannot share one list
  cancelAll (ntfWorkers ns)
  cancelAll (ntfSMPWorkers ns)
  cancelAll (ntfTknDelWorkers ns)
  where
    -- Empty the map first, then cancel whatever workers it held.
    cancelAll workersVar =
      mapM_ (liftIO . cancelWorker) =<< atomically (swapTVar workersVar M.empty)
-- | Pick a notification server from the client's configured 'ntfServers'.
--
-- Returns 'Nothing' when no servers are configured and the only server when there is
-- exactly one. With several servers, one is chosen at a random index drawn from the
-- shared 'randomServer' generator, which is advanced in the same STM transaction.
getNtfServer :: AgentClient -> AM' (Maybe NtfServer)
getNtfServer c =
  readTVarIO (ntfServers c) >>= \case
    [] -> pure Nothing
    [srv] -> pure (Just srv)
    servers -> do
      genVar <- asks randomServer
      atomically $ stateTVar genVar $ \g ->
        -- the index is always within bounds: it is drawn from 0 .. length servers - 1
        let (ix, g') = randomR (0, length servers - 1) g
         in (Just (servers !! ix), g')