{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedLists #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
{-# OPTIONS_GHC -fno-warn-ambiguous-fields #-}

module Simplex.Messaging.Agent.NtfSubSupervisor
  ( runNtfSupervisor,
    nsUpdateToken,
    nsRemoveNtfToken,
    sendNtfSubCommand,
    hasInstantNotifications,
    instantNotifications,
    deleteToken,
    closeNtfSupervisor,
    getNtfServer,
  )
where

import Control.Logger.Simple (logError, logInfo)
import Control.Monad
import Control.Monad.Reader
import Control.Monad.Trans.Except
import Crypto.Random (ChaChaDRG)
import Data.Bifunctor (first)
import Data.Either (fromRight, partitionEithers)
import Data.Functor (($>))
import Data.List (foldl')
import Data.List.NonEmpty (NonEmpty (..))
import qualified Data.List.NonEmpty as L
import qualified Data.Map.Strict as M
import Data.Maybe (catMaybes)
import qualified Data.Set as S
import qualified Data.Text as T
import Data.Time (UTCTime, addUTCTime, getCurrentTime)
import Data.Time.Clock (diffUTCTime)
import Simplex.Messaging.Agent.Client
import Simplex.Messaging.Agent.Env.SQLite
import Simplex.Messaging.Agent.Protocol
import Simplex.Messaging.Agent.RetryInterval
import Simplex.Messaging.Agent.Stats
import Simplex.Messaging.Agent.Store
import Simplex.Messaging.Agent.Store.AgentStore
import qualified Simplex.Messaging.Agent.Store.DB as DB
import Simplex.Messaging.Client (NetworkRequestMode (..), nonBlockingWriteTBQueue)
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Notifications.Protocol
import Simplex.Messaging.Notifications.Types
import Simplex.Messaging.Protocol (NtfServer, sameSrvAddr)
import qualified Simplex.Messaging.Protocol as SMP
import Simplex.Messaging.Util (catchAllErrors, catchAllErrors', diffToMicroseconds, threadDelay', tryAllErrors, tshow, whenM)
import System.Random (randomR)
import UnliftIO
import UnliftIO.Concurrent (forkIO)
import qualified UnliftIO.Exception as E

-- | Main loop of the notification subscription supervisor.
--
-- On start it runs 'startTknDelete' once; a failure there is reported via
-- 'notifyErr' and does not prevent the loop from starting.
--
-- It then loops forever: it takes the next command from the supervisor's
-- 'ntfSubQ' queue and passes it to 'processNtfCmd' inside
-- 'agentOperationBracket' for the 'AONtfNetwork' operation.
-- Errors returned by 'processNtfCmd' are caught with 'catchAllErrors'' and
-- exceptions escaping the bracket are caught by 'handleErr'; both are
-- reported as internal errors, so a single failing command does not
-- terminate the loop.
runNtfSupervisor :: AgentClient -> AM' ()
runNtfSupervisor c = do
  ns <- asks ntfSupervisor
  runExceptT startTknDelete >>= \case
    Left e -> notifyErr e
    Right _ -> pure ()
  forever $ do
    cmd <- atomically . readTBQueue $ ntfSubQ ns
    handleErr $ agentOperationBracket c AONtfNetwork waitUntilActive $
      processNtfCmd c cmd `catchAllErrors'` notifyErr
  where
    -- Reads the servers returned by 'getPendingDelTknServers' from the store
    -- and calls 'getNtfTknDelWorker' for each of them.
    -- NOTE(review): presumably this resumes token deletions left pending by a
    -- previous run - confirm against 'getNtfTknDelWorker'.
    startTknDelete :: AM ()
    startTknDelete = do
      pendingDelServers <- withStore' c getPendingDelTknServers
      lift . forM_ pendingDelServers $ getNtfTknDelWorker True c
    -- Catches any exception thrown by the wrapped action and reports it.
    handleErr :: AM' () -> AM' ()
    handleErr = E.handle $ \(e :: E.SomeException) -> notifyErr e
    -- Reports any showable error as an internal agent error.
    notifyErr :: (MonadIO m, Show a) => a -> m ()
    notifyErr e = notifyInternalError' c $ "runNtfSupervisor error " <> show e

-- | Split batch results into failures and successes.
--
-- Results are matched by position with the inputs @xs@ that produced them.
-- Each 'Left' error is paired with the 'ConnId' obtained by applying @f@ to
-- the corresponding input; 'Right' values are returned unchanged.
-- Relative order is preserved in both output lists.
--
-- The pairing uses 'zipWith', so if the two lists differ in length the extra
-- elements of the longer one are ignored.
partitionErrs :: (a -> ConnId) -> [a] -> [Either AgentErrorType b] -> ([(ConnId, AgentErrorType)], [b])
partitionErrs f xs = partitionEithers . zipWith (\x -> first (f x,)) xs
{-# INLINE partitionErrs #-}

-- | Extract the connection ID from a notification subscription record.
--
-- Defined as a standalone function so it can be passed as the key function
-- to 'partitionErrs'.
ntfSubConnId :: NtfSubscription -> ConnId
ntfSubConnId NtfSubscription {connId} = connId

processNtfCmd :: AgentClient -> (NtfSupervisorCommand, NonEmpty ConnId) -> AM ()
processNtfCmd :: AgentClient
-> (NtfSupervisorCommand, NonEmpty ConnId)
-> ExceptT AgentErrorType (ReaderT Env IO) ()
processNtfCmd AgentClient
c (NtfSupervisorCommand
cmd, NonEmpty ConnId
connIds) = do
  Text -> ExceptT AgentErrorType (ReaderT Env IO) ()
forall (m :: * -> *). (HasCallStack, MonadIO m) => Text -> m ()
logInfo (Text -> ExceptT AgentErrorType (ReaderT Env IO) ())
-> Text -> ExceptT AgentErrorType (ReaderT Env IO) ()
forall a b. (a -> b) -> a -> b
$ Text
"processNtfCmd - cmd = " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> NtfSupervisorCommand -> Text
forall a. Show a => a -> Text
tshow NtfSupervisorCommand
cmd
  let connIds' :: [ConnId]
connIds' = NonEmpty ConnId -> [ConnId]
forall a. NonEmpty a -> [a]
L.toList NonEmpty ConnId
connIds
  case NtfSupervisorCommand
cmd of
    NtfSupervisorCommand
NSCCreate -> do
      ([(ConnId, AgentErrorType)]
cErrs, [(RcvQueue, Maybe NtfSupervisorSub)]
rqSubActions) <- ReaderT
  Env
  IO
  ([(ConnId, AgentErrorType)], [(RcvQueue, Maybe NtfSupervisorSub)])
-> ExceptT
     AgentErrorType
     (ReaderT Env IO)
     ([(ConnId, AgentErrorType)], [(RcvQueue, Maybe NtfSupervisorSub)])
forall (m :: * -> *) a.
Monad m =>
m a -> ExceptT AgentErrorType m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (ReaderT
   Env
   IO
   ([(ConnId, AgentErrorType)], [(RcvQueue, Maybe NtfSupervisorSub)])
 -> ExceptT
      AgentErrorType
      (ReaderT Env IO)
      ([(ConnId, AgentErrorType)], [(RcvQueue, Maybe NtfSupervisorSub)]))
-> ReaderT
     Env
     IO
     ([(ConnId, AgentErrorType)], [(RcvQueue, Maybe NtfSupervisorSub)])
-> ExceptT
     AgentErrorType
     (ReaderT Env IO)
     ([(ConnId, AgentErrorType)], [(RcvQueue, Maybe NtfSupervisorSub)])
forall a b. (a -> b) -> a -> b
$ (ConnId -> ConnId)
-> [ConnId]
-> [Either AgentErrorType (RcvQueue, Maybe NtfSupervisorSub)]
-> ([(ConnId, AgentErrorType)],
    [(RcvQueue, Maybe NtfSupervisorSub)])
forall a b.
(a -> ConnId)
-> [a]
-> [Either AgentErrorType b]
-> ([(ConnId, AgentErrorType)], [b])
partitionErrs ConnId -> ConnId
forall a. a -> a
id [ConnId]
connIds' ([Either AgentErrorType (RcvQueue, Maybe NtfSupervisorSub)]
 -> ([(ConnId, AgentErrorType)],
     [(RcvQueue, Maybe NtfSupervisorSub)]))
-> ReaderT
     Env IO [Either AgentErrorType (RcvQueue, Maybe NtfSupervisorSub)]
-> ReaderT
     Env
     IO
     ([(ConnId, AgentErrorType)], [(RcvQueue, Maybe NtfSupervisorSub)])
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> AgentClient
-> (Connection
    -> [IO (Either AgentErrorType (RcvQueue, Maybe NtfSupervisorSub))])
-> ReaderT
     Env IO [Either AgentErrorType (RcvQueue, Maybe NtfSupervisorSub)]
forall (t :: * -> *) a.
Traversable t =>
AgentClient
-> (Connection -> t (IO (Either AgentErrorType a)))
-> AM' (t (Either AgentErrorType a))
withStoreBatch AgentClient
c (\Connection
db -> (ConnId
 -> IO (Either AgentErrorType (RcvQueue, Maybe NtfSupervisorSub)))
-> [ConnId]
-> [IO (Either AgentErrorType (RcvQueue, Maybe NtfSupervisorSub))]
forall a b. (a -> b) -> [a] -> [b]
map (Connection
-> ConnId
-> IO (Either AgentErrorType (RcvQueue, Maybe NtfSupervisorSub))
getQueueSub Connection
db) [ConnId]
connIds')
      AgentClient
-> [(ConnId, AgentErrorType)]
-> ExceptT AgentErrorType (ReaderT Env IO) ()
forall (m :: * -> *).
MonadIO m =>
AgentClient -> [(ConnId, AgentErrorType)] -> m ()
notifyErrs AgentClient
c [(ConnId, AgentErrorType)]
cErrs
      Text -> ExceptT AgentErrorType (ReaderT Env IO) ()
forall (m :: * -> *). (HasCallStack, MonadIO m) => Text -> m ()
logInfo (Text -> ExceptT AgentErrorType (ReaderT Env IO) ())
-> Text -> ExceptT AgentErrorType (ReaderT Env IO) ()
forall a b. (a -> b) -> a -> b
$ Text
"processNtfCmd, NSCCreate - length rqSubs = " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Int -> Text
forall a. Show a => a -> Text
tshow ([(RcvQueue, Maybe NtfSupervisorSub)] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [(RcvQueue, Maybe NtfSupervisorSub)]
rqSubActions)
      let ([RcvQueue]
ns, [(RcvQueue, NtfSubscription)]
rs, [SMPServer]
css, [NtfServer]
cns) = [(RcvQueue, Maybe NtfSupervisorSub)]
-> ([RcvQueue], [(RcvQueue, NtfSubscription)], [SMPServer],
    [NtfServer])
partitionQueueSubActions [(RcvQueue, Maybe NtfSupervisorSub)]
rqSubActions
      [RcvQueue] -> ExceptT AgentErrorType (ReaderT Env IO) ()
createNewSubs [RcvQueue]
ns
      [(RcvQueue, NtfSubscription)]
-> ExceptT AgentErrorType (ReaderT Env IO) ()
resetSubs [(RcvQueue, NtfSubscription)]
rs
      AM' () -> ExceptT AgentErrorType (ReaderT Env IO) ()
forall (m :: * -> *) a.
Monad m =>
m a -> ExceptT AgentErrorType m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (AM' () -> ExceptT AgentErrorType (ReaderT Env IO) ())
-> AM' () -> ExceptT AgentErrorType (ReaderT Env IO) ()
forall a b. (a -> b) -> a -> b
$ do
        (SMPServer -> AM' Worker) -> Set SMPServer -> AM' ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (Bool -> AgentClient -> SMPServer -> AM' Worker
getNtfSMPWorker Bool
True AgentClient
c) ([SMPServer] -> Set SMPServer
forall a. Ord a => [a] -> Set a
S.fromList [SMPServer]
css)
        (NtfServer -> AM' Worker) -> Set NtfServer -> AM' ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (Bool -> AgentClient -> NtfServer -> AM' Worker
getNtfNTFWorker Bool
True AgentClient
c) ([NtfServer] -> Set NtfServer
forall a. Ord a => [a] -> Set a
S.fromList [NtfServer]
cns)
      where
        getQueueSub ::
          DB.Connection ->
          ConnId ->
          IO (Either AgentErrorType (RcvQueue, Maybe NtfSupervisorSub))
        getQueueSub :: Connection
-> ConnId
-> IO (Either AgentErrorType (RcvQueue, Maybe NtfSupervisorSub))
getQueueSub Connection
db ConnId
connId = (Either StoreError (RcvQueue, Maybe NtfSupervisorSub)
 -> Either AgentErrorType (RcvQueue, Maybe NtfSupervisorSub))
-> IO (Either StoreError (RcvQueue, Maybe NtfSupervisorSub))
-> IO (Either AgentErrorType (RcvQueue, Maybe NtfSupervisorSub))
forall a b. (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ((StoreError -> AgentErrorType)
-> Either StoreError (RcvQueue, Maybe NtfSupervisorSub)
-> Either AgentErrorType (RcvQueue, Maybe NtfSupervisorSub)
forall a b c. (a -> b) -> Either a c -> Either b c
forall (p :: * -> * -> *) a b c.
Bifunctor p =>
(a -> b) -> p a c -> p b c
first StoreError -> AgentErrorType
storeError) (IO (Either StoreError (RcvQueue, Maybe NtfSupervisorSub))
 -> IO (Either AgentErrorType (RcvQueue, Maybe NtfSupervisorSub)))
-> IO (Either StoreError (RcvQueue, Maybe NtfSupervisorSub))
-> IO (Either AgentErrorType (RcvQueue, Maybe NtfSupervisorSub))
forall a b. (a -> b) -> a -> b
$ ExceptT StoreError IO (RcvQueue, Maybe NtfSupervisorSub)
-> IO (Either StoreError (RcvQueue, Maybe NtfSupervisorSub))
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (ExceptT StoreError IO (RcvQueue, Maybe NtfSupervisorSub)
 -> IO (Either StoreError (RcvQueue, Maybe NtfSupervisorSub)))
-> ExceptT StoreError IO (RcvQueue, Maybe NtfSupervisorSub)
-> IO (Either StoreError (RcvQueue, Maybe NtfSupervisorSub))
forall a b. (a -> b) -> a -> b
$ do
          RcvQueue
rq <- IO (Either StoreError RcvQueue) -> ExceptT StoreError IO RcvQueue
forall e (m :: * -> *) a. m (Either e a) -> ExceptT e m a
ExceptT (IO (Either StoreError RcvQueue) -> ExceptT StoreError IO RcvQueue)
-> IO (Either StoreError RcvQueue)
-> ExceptT StoreError IO RcvQueue
forall a b. (a -> b) -> a -> b
$ Connection -> ConnId -> IO (Either StoreError RcvQueue)
getPrimaryRcvQueue Connection
db ConnId
connId
          Maybe NtfSupervisorSub
sub <- IO (Maybe NtfSupervisorSub)
-> ExceptT StoreError IO (Maybe NtfSupervisorSub)
forall a. IO a -> ExceptT StoreError IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe NtfSupervisorSub)
 -> ExceptT StoreError IO (Maybe NtfSupervisorSub))
-> IO (Maybe NtfSupervisorSub)
-> ExceptT StoreError IO (Maybe NtfSupervisorSub)
forall a b. (a -> b) -> a -> b
$ Connection -> ConnId -> IO (Maybe NtfSupervisorSub)
getNtfSubscription Connection
db ConnId
connId
          (RcvQueue, Maybe NtfSupervisorSub)
-> ExceptT StoreError IO (RcvQueue, Maybe NtfSupervisorSub)
forall a. a -> ExceptT StoreError IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (RcvQueue
rq, Maybe NtfSupervisorSub
sub)
        createNewSubs :: [RcvQueue] -> AM ()
        createNewSubs :: [RcvQueue] -> ExceptT AgentErrorType (ReaderT Env IO) ()
createNewSubs [RcvQueue]
rqs = do
          (NtfServer -> ExceptT AgentErrorType (ReaderT Env IO) ())
-> ExceptT AgentErrorType (ReaderT Env IO) ()
withTokenServer ((NtfServer -> ExceptT AgentErrorType (ReaderT Env IO) ())
 -> ExceptT AgentErrorType (ReaderT Env IO) ())
-> (NtfServer -> ExceptT AgentErrorType (ReaderT Env IO) ())
-> ExceptT AgentErrorType (ReaderT Env IO) ()
forall a b. (a -> b) -> a -> b
$ \NtfServer
ntfServer -> do
            let newSubs :: [NtfSubscription]
newSubs = (RcvQueue -> NtfSubscription) -> [RcvQueue] -> [NtfSubscription]
forall a b. (a -> b) -> [a] -> [b]
map (NtfServer -> RcvQueue -> NtfSubscription
rqToNewSub NtfServer
ntfServer) [RcvQueue]
rqs
            ([(ConnId, AgentErrorType)]
cErrs, [()]
_) <- ReaderT Env IO ([(ConnId, AgentErrorType)], [()])
-> ExceptT
     AgentErrorType (ReaderT Env IO) ([(ConnId, AgentErrorType)], [()])
forall (m :: * -> *) a.
Monad m =>
m a -> ExceptT AgentErrorType m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (ReaderT Env IO ([(ConnId, AgentErrorType)], [()])
 -> ExceptT
      AgentErrorType (ReaderT Env IO) ([(ConnId, AgentErrorType)], [()]))
-> ReaderT Env IO ([(ConnId, AgentErrorType)], [()])
-> ExceptT
     AgentErrorType (ReaderT Env IO) ([(ConnId, AgentErrorType)], [()])
forall a b. (a -> b) -> a -> b
$ (NtfSubscription -> ConnId)
-> [NtfSubscription]
-> [Either AgentErrorType ()]
-> ([(ConnId, AgentErrorType)], [()])
forall a b.
(a -> ConnId)
-> [a]
-> [Either AgentErrorType b]
-> ([(ConnId, AgentErrorType)], [b])
partitionErrs NtfSubscription -> ConnId
ntfSubConnId [NtfSubscription]
newSubs ([Either AgentErrorType ()] -> ([(ConnId, AgentErrorType)], [()]))
-> ReaderT Env IO [Either AgentErrorType ()]
-> ReaderT Env IO ([(ConnId, AgentErrorType)], [()])
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> AgentClient
-> (Connection -> [IO (Either AgentErrorType ())])
-> ReaderT Env IO [Either AgentErrorType ()]
forall (t :: * -> *) a.
Traversable t =>
AgentClient
-> (Connection -> t (IO (Either AgentErrorType a)))
-> AM' (t (Either AgentErrorType a))
withStoreBatch AgentClient
c (\Connection
db -> (NtfSubscription -> IO (Either AgentErrorType ()))
-> [NtfSubscription] -> [IO (Either AgentErrorType ())]
forall a b. (a -> b) -> [a] -> [b]
map (Connection -> NtfSubscription -> IO (Either AgentErrorType ())
storeNewSub Connection
db) [NtfSubscription]
newSubs)
            AgentClient
-> [(ConnId, AgentErrorType)]
-> ExceptT AgentErrorType (ReaderT Env IO) ()
forall (m :: * -> *).
MonadIO m =>
AgentClient -> [(ConnId, AgentErrorType)] -> m ()
notifyErrs AgentClient
c [(ConnId, AgentErrorType)]
cErrs
            [RcvQueue] -> ExceptT AgentErrorType (ReaderT Env IO) ()
kickSMPWorkers [RcvQueue]
rqs
          where
            rqToNewSub :: NtfServer -> RcvQueue -> NtfSubscription
            rqToNewSub :: NtfServer -> RcvQueue -> NtfSubscription
rqToNewSub NtfServer
ntfServer RcvQueue {Int64
userId :: Int64
$sel:userId:RcvQueue :: forall (q :: DBStored). StoredRcvQueue q -> Int64
userId, ConnId
connId :: ConnId
$sel:connId:RcvQueue :: forall (q :: DBStored). StoredRcvQueue q -> ConnId
connId, SMPServer
server :: SMPServer
$sel:server:RcvQueue :: forall (q :: DBStored). StoredRcvQueue q -> SMPServer
server} = Int64
-> ConnId
-> SMPServer
-> Maybe NtfSubscriptionId
-> NtfServer
-> NtfAgentSubStatus
-> NtfSubscription
newNtfSubscription Int64
userId ConnId
connId SMPServer
server Maybe NtfSubscriptionId
forall a. Maybe a
Nothing NtfServer
ntfServer NtfAgentSubStatus
NASNew
            storeNewSub :: DB.Connection -> NtfSubscription -> IO (Either AgentErrorType ())
            storeNewSub :: Connection -> NtfSubscription -> IO (Either AgentErrorType ())
storeNewSub Connection
db NtfSubscription
sub = (StoreError -> AgentErrorType)
-> Either StoreError () -> Either AgentErrorType ()
forall a b c. (a -> b) -> Either a c -> Either b c
forall (p :: * -> * -> *) a b c.
Bifunctor p =>
(a -> b) -> p a c -> p b c
first StoreError -> AgentErrorType
storeError (Either StoreError () -> Either AgentErrorType ())
-> IO (Either StoreError ()) -> IO (Either AgentErrorType ())
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Connection
-> NtfSubscription -> NtfSubAction -> IO (Either StoreError ())
createNtfSubscription Connection
db NtfSubscription
sub (NtfSubSMPAction -> NtfSubAction
NSASMP NtfSubSMPAction
NSASmpKey)
        resetSubs :: [(RcvQueue, NtfSubscription)] -> AM ()
        resetSubs :: [(RcvQueue, NtfSubscription)]
-> ExceptT AgentErrorType (ReaderT Env IO) ()
resetSubs [(RcvQueue, NtfSubscription)]
rqSubs = do
          (NtfServer -> ExceptT AgentErrorType (ReaderT Env IO) ())
-> ExceptT AgentErrorType (ReaderT Env IO) ()
withTokenServer ((NtfServer -> ExceptT AgentErrorType (ReaderT Env IO) ())
 -> ExceptT AgentErrorType (ReaderT Env IO) ())
-> (NtfServer -> ExceptT AgentErrorType (ReaderT Env IO) ())
-> ExceptT AgentErrorType (ReaderT Env IO) ()
forall a b. (a -> b) -> a -> b
$ \NtfServer
ntfServer -> do
            let subsToReset :: [NtfSubscription]
subsToReset = ((RcvQueue, NtfSubscription) -> NtfSubscription)
-> [(RcvQueue, NtfSubscription)] -> [NtfSubscription]
forall a b. (a -> b) -> [a] -> [b]
map (NtfServer -> (RcvQueue, NtfSubscription) -> NtfSubscription
toResetSub NtfServer
ntfServer) [(RcvQueue, NtfSubscription)]
rqSubs
            ([(ConnId, AgentErrorType)]
cErrs, [()]
_) <- ReaderT Env IO ([(ConnId, AgentErrorType)], [()])
-> ExceptT
     AgentErrorType (ReaderT Env IO) ([(ConnId, AgentErrorType)], [()])
forall (m :: * -> *) a.
Monad m =>
m a -> ExceptT AgentErrorType m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (ReaderT Env IO ([(ConnId, AgentErrorType)], [()])
 -> ExceptT
      AgentErrorType (ReaderT Env IO) ([(ConnId, AgentErrorType)], [()]))
-> ReaderT Env IO ([(ConnId, AgentErrorType)], [()])
-> ExceptT
     AgentErrorType (ReaderT Env IO) ([(ConnId, AgentErrorType)], [()])
forall a b. (a -> b) -> a -> b
$ (NtfSubscription -> ConnId)
-> [NtfSubscription]
-> [Either AgentErrorType ()]
-> ([(ConnId, AgentErrorType)], [()])
forall a b.
(a -> ConnId)
-> [a]
-> [Either AgentErrorType b]
-> ([(ConnId, AgentErrorType)], [b])
partitionErrs NtfSubscription -> ConnId
ntfSubConnId [NtfSubscription]
subsToReset ([Either AgentErrorType ()] -> ([(ConnId, AgentErrorType)], [()]))
-> ReaderT Env IO [Either AgentErrorType ()]
-> ReaderT Env IO ([(ConnId, AgentErrorType)], [()])
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> AgentClient
-> (Connection -> [IO ()])
-> ReaderT Env IO [Either AgentErrorType ()]
forall (t :: * -> *) a.
Traversable t =>
AgentClient
-> (Connection -> t (IO a)) -> AM' (t (Either AgentErrorType a))
withStoreBatch' AgentClient
c (\Connection
db -> (NtfSubscription -> IO ()) -> [NtfSubscription] -> [IO ()]
forall a b. (a -> b) -> [a] -> [b]
map (Connection -> NtfSubscription -> IO ()
storeResetSub Connection
db) [NtfSubscription]
subsToReset)
            AgentClient
-> [(ConnId, AgentErrorType)]
-> ExceptT AgentErrorType (ReaderT Env IO) ()
forall (m :: * -> *).
MonadIO m =>
AgentClient -> [(ConnId, AgentErrorType)] -> m ()
notifyErrs AgentClient
c [(ConnId, AgentErrorType)]
cErrs
            let rqs :: [RcvQueue]
rqs = ((RcvQueue, NtfSubscription) -> RcvQueue)
-> [(RcvQueue, NtfSubscription)] -> [RcvQueue]
forall a b. (a -> b) -> [a] -> [b]
map (RcvQueue, NtfSubscription) -> RcvQueue
forall a b. (a, b) -> a
fst [(RcvQueue, NtfSubscription)]
rqSubs
            [RcvQueue] -> ExceptT AgentErrorType (ReaderT Env IO) ()
kickSMPWorkers [RcvQueue]
rqs
          where
            toResetSub :: NtfServer -> (RcvQueue, NtfSubscription) -> NtfSubscription
            toResetSub :: NtfServer -> (RcvQueue, NtfSubscription) -> NtfSubscription
toResetSub NtfServer
ntfServer (RcvQueue
rq, NtfSubscription
sub) =
              let RcvQueue {$sel:server:RcvQueue :: forall (q :: DBStored). StoredRcvQueue q -> SMPServer
server = SMPServer
smpServer} = RcvQueue
rq
               in NtfSubscription
sub {smpServer, ntfQueueId = Nothing, ntfServer, ntfSubId = Nothing, ntfSubStatus = NASNew}
            storeResetSub :: DB.Connection -> NtfSubscription -> IO ()
            storeResetSub :: Connection -> NtfSubscription -> IO ()
storeResetSub Connection
db NtfSubscription
sub = Connection -> NtfSubscription -> NtfSubAction -> IO ()
supervisorUpdateNtfSub Connection
db NtfSubscription
sub (NtfSubSMPAction -> NtfSubAction
NSASMP NtfSubSMPAction
NSASmpKey)
        partitionQueueSubActions ::
          [(RcvQueue, Maybe NtfSupervisorSub)] ->
          ( [RcvQueue], -- new subs
            [(RcvQueue, NtfSubscription)], -- reset subs
            [SMPServer], -- continue work (SMP)
            [NtfServer] -- continue work (Ntf)
          )
        partitionQueueSubActions :: [(RcvQueue, Maybe NtfSupervisorSub)]
-> ([RcvQueue], [(RcvQueue, NtfSubscription)], [SMPServer],
    [NtfServer])
partitionQueueSubActions = ((RcvQueue, Maybe NtfSupervisorSub)
 -> ([RcvQueue], [(RcvQueue, NtfSubscription)], [SMPServer],
     [NtfServer])
 -> ([RcvQueue], [(RcvQueue, NtfSubscription)], [SMPServer],
     [NtfServer]))
-> ([RcvQueue], [(RcvQueue, NtfSubscription)], [SMPServer],
    [NtfServer])
-> [(RcvQueue, Maybe NtfSupervisorSub)]
-> ([RcvQueue], [(RcvQueue, NtfSubscription)], [SMPServer],
    [NtfServer])
forall a b. (a -> b -> b) -> b -> [a] -> b
forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr (RcvQueue, Maybe NtfSupervisorSub)
-> ([RcvQueue], [(RcvQueue, NtfSubscription)], [SMPServer],
    [NtfServer])
-> ([RcvQueue], [(RcvQueue, NtfSubscription)], [SMPServer],
    [NtfServer])
forall {q :: DBStored} {b}.
SMPQueue (StoredRcvQueue q) =>
(StoredRcvQueue q,
 Maybe (NtfSubscription, Maybe (NtfSubAction, b)))
-> ([StoredRcvQueue q], [(StoredRcvQueue q, NtfSubscription)],
    [SMPServer], [NtfServer])
-> ([StoredRcvQueue q], [(StoredRcvQueue q, NtfSubscription)],
    [SMPServer], [NtfServer])
decideSubWork ([], [], [], [])
          where
            -- sub = Nothing, needs to be created
            decideSubWork :: (StoredRcvQueue q,
 Maybe (NtfSubscription, Maybe (NtfSubAction, b)))
-> ([StoredRcvQueue q], [(StoredRcvQueue q, NtfSubscription)],
    [SMPServer], [NtfServer])
-> ([StoredRcvQueue q], [(StoredRcvQueue q, NtfSubscription)],
    [SMPServer], [NtfServer])
decideSubWork (StoredRcvQueue q
rq, Maybe (NtfSubscription, Maybe (NtfSubAction, b))
Nothing) ([StoredRcvQueue q]
ns, [(StoredRcvQueue q, NtfSubscription)]
rs, [SMPServer]
css, [NtfServer]
cns) = (StoredRcvQueue q
rq StoredRcvQueue q -> [StoredRcvQueue q] -> [StoredRcvQueue q]
forall a. a -> [a] -> [a]
: [StoredRcvQueue q]
ns, [(StoredRcvQueue q, NtfSubscription)]
rs, [SMPServer]
css, [NtfServer]
cns)
            decideSubWork (StoredRcvQueue q
rq, Just (NtfSubscription
sub, Maybe (NtfSubAction, b)
subAction_)) ([StoredRcvQueue q]
ns, [(StoredRcvQueue q, NtfSubscription)]
rs, [SMPServer]
css, [NtfServer]
cns) =
              case (StoredRcvQueue q -> Maybe ClientNtfCreds
forall (q :: DBStored). StoredRcvQueue q -> Maybe ClientNtfCreds
clientNtfCreds StoredRcvQueue q
rq, NtfSubscription -> Maybe NtfSubscriptionId
ntfQueueId NtfSubscription
sub) of
                -- notifier ID created on SMP server (on ntf server subscription can be registered or not yet),
                -- need to clarify action
                (Just ClientNtfCreds {NtfSubscriptionId
notifierId :: NtfSubscriptionId
$sel:notifierId:ClientNtfCreds :: ClientNtfCreds -> NtfSubscriptionId
notifierId}, Just NtfSubscriptionId
ntfQueueId')
                  | SMPServer -> SMPServer -> Bool
forall (p :: ProtocolType).
ProtocolServer p -> ProtocolServer p -> Bool
sameSrvAddr (StoredRcvQueue q -> SMPServer
forall q. SMPQueue q => q -> SMPServer
qServer StoredRcvQueue q
rq) SMPServer
subSMPServer Bool -> Bool -> Bool
&& NtfSubscriptionId
notifierId NtfSubscriptionId -> NtfSubscriptionId -> Bool
forall a. Eq a => a -> a -> Bool
== NtfSubscriptionId
ntfQueueId' -> ([StoredRcvQueue q], [(StoredRcvQueue q, NtfSubscription)],
 [SMPServer], [NtfServer])
contOrReset
                  | Bool
otherwise -> ([StoredRcvQueue q], [(StoredRcvQueue q, NtfSubscription)],
 [SMPServer], [NtfServer])
reset
                (Maybe ClientNtfCreds
Nothing, Maybe NtfSubscriptionId
Nothing) -> ([StoredRcvQueue q], [(StoredRcvQueue q, NtfSubscription)],
 [SMPServer], [NtfServer])
contOrReset
                (Maybe ClientNtfCreds, Maybe NtfSubscriptionId)
_ -> ([StoredRcvQueue q], [(StoredRcvQueue q, NtfSubscription)],
 [SMPServer], [NtfServer])
reset
              where
                NtfSubscription {$sel:ntfServer:NtfSubscription :: NtfSubscription -> NtfServer
ntfServer = NtfServer
subNtfServer, $sel:smpServer:NtfSubscription :: NtfSubscription -> SMPServer
smpServer = SMPServer
subSMPServer} = NtfSubscription
sub
                contOrReset :: ([StoredRcvQueue q], [(StoredRcvQueue q, NtfSubscription)],
 [SMPServer], [NtfServer])
contOrReset = case Maybe (NtfSubAction, b)
subAction_ of
                  -- action was set to NULL after worker internal error
                  Maybe (NtfSubAction, b)
Nothing -> ([StoredRcvQueue q], [(StoredRcvQueue q, NtfSubscription)],
 [SMPServer], [NtfServer])
reset
                  Just (NtfSubAction
action, b
_)
                    -- subscription was marked for deletion / is being deleted
                    | NtfSubAction -> Bool
isDeleteNtfSubAction NtfSubAction
action -> ([StoredRcvQueue q], [(StoredRcvQueue q, NtfSubscription)],
 [SMPServer], [NtfServer])
reset
                    -- continue work on subscription (e.g. supervisor was repeatedly tasked with creating a subscription)
                    | Bool
otherwise -> case NtfSubAction
action of
                        NSASMP NtfSubSMPAction
_ -> ([StoredRcvQueue q]
ns, [(StoredRcvQueue q, NtfSubscription)]
rs, StoredRcvQueue q -> SMPServer
forall q. SMPQueue q => q -> SMPServer
qServer StoredRcvQueue q
rq SMPServer -> [SMPServer] -> [SMPServer]
forall a. a -> [a] -> [a]
: [SMPServer]
css, [NtfServer]
cns)
                        NSANtf NtfSubNTFAction
_ -> ([StoredRcvQueue q]
ns, [(StoredRcvQueue q, NtfSubscription)]
rs, [SMPServer]
css, NtfServer
subNtfServer NtfServer -> [NtfServer] -> [NtfServer]
forall a. a -> [a] -> [a]
: [NtfServer]
cns)
                reset :: ([StoredRcvQueue q], [(StoredRcvQueue q, NtfSubscription)],
 [SMPServer], [NtfServer])
reset = ([StoredRcvQueue q]
ns, (StoredRcvQueue q
rq, NtfSubscription
sub) (StoredRcvQueue q, NtfSubscription)
-> [(StoredRcvQueue q, NtfSubscription)]
-> [(StoredRcvQueue q, NtfSubscription)]
forall a. a -> [a] -> [a]
: [(StoredRcvQueue q, NtfSubscription)]
rs, [SMPServer]
css, [NtfServer]
cns)
    NtfSupervisorCommand
NSCSmpDelete -> do
      ([(ConnId, AgentErrorType)]
cErrs, [RcvQueue]
rqs) <- ReaderT Env IO ([(ConnId, AgentErrorType)], [RcvQueue])
-> ExceptT
     AgentErrorType
     (ReaderT Env IO)
     ([(ConnId, AgentErrorType)], [RcvQueue])
forall (m :: * -> *) a.
Monad m =>
m a -> ExceptT AgentErrorType m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (ReaderT Env IO ([(ConnId, AgentErrorType)], [RcvQueue])
 -> ExceptT
      AgentErrorType
      (ReaderT Env IO)
      ([(ConnId, AgentErrorType)], [RcvQueue]))
-> ReaderT Env IO ([(ConnId, AgentErrorType)], [RcvQueue])
-> ExceptT
     AgentErrorType
     (ReaderT Env IO)
     ([(ConnId, AgentErrorType)], [RcvQueue])
forall a b. (a -> b) -> a -> b
$ (ConnId -> ConnId)
-> [ConnId]
-> [Either AgentErrorType RcvQueue]
-> ([(ConnId, AgentErrorType)], [RcvQueue])
forall a b.
(a -> ConnId)
-> [a]
-> [Either AgentErrorType b]
-> ([(ConnId, AgentErrorType)], [b])
partitionErrs ConnId -> ConnId
forall a. a -> a
id [ConnId]
connIds' ([Either AgentErrorType RcvQueue]
 -> ([(ConnId, AgentErrorType)], [RcvQueue]))
-> ReaderT Env IO [Either AgentErrorType RcvQueue]
-> ReaderT Env IO ([(ConnId, AgentErrorType)], [RcvQueue])
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> AgentClient
-> (Connection -> [IO (Either AgentErrorType RcvQueue)])
-> ReaderT Env IO [Either AgentErrorType RcvQueue]
forall (t :: * -> *) a.
Traversable t =>
AgentClient
-> (Connection -> t (IO (Either AgentErrorType a)))
-> AM' (t (Either AgentErrorType a))
withStoreBatch AgentClient
c (\Connection
db -> (ConnId -> IO (Either AgentErrorType RcvQueue))
-> [ConnId] -> [IO (Either AgentErrorType RcvQueue)]
forall a b. (a -> b) -> [a] -> [b]
map (Connection -> ConnId -> IO (Either AgentErrorType RcvQueue)
getQueue Connection
db) [ConnId]
connIds')
      Text -> ExceptT AgentErrorType (ReaderT Env IO) ()
forall (m :: * -> *). (HasCallStack, MonadIO m) => Text -> m ()
logInfo (Text -> ExceptT AgentErrorType (ReaderT Env IO) ())
-> Text -> ExceptT AgentErrorType (ReaderT Env IO) ()
forall a b. (a -> b) -> a -> b
$ Text
"processNtfCmd, NSCSmpDelete - length rqs = " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Int -> Text
forall a. Show a => a -> Text
tshow ([RcvQueue] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [RcvQueue]
rqs)
      ([(ConnId, AgentErrorType)]
cErrs', [()]
_) <- ReaderT Env IO ([(ConnId, AgentErrorType)], [()])
-> ExceptT
     AgentErrorType (ReaderT Env IO) ([(ConnId, AgentErrorType)], [()])
forall (m :: * -> *) a.
Monad m =>
m a -> ExceptT AgentErrorType m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (ReaderT Env IO ([(ConnId, AgentErrorType)], [()])
 -> ExceptT
      AgentErrorType (ReaderT Env IO) ([(ConnId, AgentErrorType)], [()]))
-> ReaderT Env IO ([(ConnId, AgentErrorType)], [()])
-> ExceptT
     AgentErrorType (ReaderT Env IO) ([(ConnId, AgentErrorType)], [()])
forall a b. (a -> b) -> a -> b
$ (RcvQueue -> ConnId)
-> [RcvQueue]
-> [Either AgentErrorType ()]
-> ([(ConnId, AgentErrorType)], [()])
forall a b.
(a -> ConnId)
-> [a]
-> [Either AgentErrorType b]
-> ([(ConnId, AgentErrorType)], [b])
partitionErrs RcvQueue -> ConnId
forall q. SMPQueueRec q => q -> ConnId
qConnId [RcvQueue]
rqs ([Either AgentErrorType ()] -> ([(ConnId, AgentErrorType)], [()]))
-> ReaderT Env IO [Either AgentErrorType ()]
-> ReaderT Env IO ([(ConnId, AgentErrorType)], [()])
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> AgentClient
-> (Connection -> [IO ()])
-> ReaderT Env IO [Either AgentErrorType ()]
forall (t :: * -> *) a.
Traversable t =>
AgentClient
-> (Connection -> t (IO a)) -> AM' (t (Either AgentErrorType a))
withStoreBatch' AgentClient
c (\Connection
db -> (RcvQueue -> IO ()) -> [RcvQueue] -> [IO ()]
forall a b. (a -> b) -> [a] -> [b]
map (Connection -> RcvQueue -> IO ()
updateAction Connection
db) [RcvQueue]
rqs)
      AgentClient
-> [(ConnId, AgentErrorType)]
-> ExceptT AgentErrorType (ReaderT Env IO) ()
forall (m :: * -> *).
MonadIO m =>
AgentClient -> [(ConnId, AgentErrorType)] -> m ()
notifyErrs AgentClient
c ([(ConnId, AgentErrorType)]
cErrs [(ConnId, AgentErrorType)]
-> [(ConnId, AgentErrorType)] -> [(ConnId, AgentErrorType)]
forall a. Semigroup a => a -> a -> a
<> [(ConnId, AgentErrorType)]
cErrs')
      [RcvQueue] -> ExceptT AgentErrorType (ReaderT Env IO) ()
kickSMPWorkers [RcvQueue]
rqs
      where
        -- Load the primary receive queue of the connection, converting a store
        -- error into an agent error with 'storeError'.
        getQueue :: DB.Connection -> ConnId -> IO (Either AgentErrorType RcvQueue)
        getQueue db connId = first storeError <$> getPrimaryRcvQueue db connId
        -- Set the notification subscription action of the queue's connection
        -- to the SMP delete action (NSASMP NSASmpDelete).
        updateAction :: DB.Connection -> RcvQueue -> IO ()
        updateAction db rq = supervisorUpdateNtfAction db (qConnId rq) (NSASMP NSASmpDelete)
    NtfSupervisorCommand
NSCDeleteSub -> ExceptT AgentErrorType (ReaderT Env IO) [Either AgentErrorType ()]
-> ExceptT AgentErrorType (ReaderT Env IO) ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (ExceptT AgentErrorType (ReaderT Env IO) [Either AgentErrorType ()]
 -> ExceptT AgentErrorType (ReaderT Env IO) ())
-> ExceptT
     AgentErrorType (ReaderT Env IO) [Either AgentErrorType ()]
-> ExceptT AgentErrorType (ReaderT Env IO) ()
forall a b. (a -> b) -> a -> b
$ ReaderT Env IO [Either AgentErrorType ()]
-> ExceptT
     AgentErrorType (ReaderT Env IO) [Either AgentErrorType ()]
forall (m :: * -> *) a.
Monad m =>
m a -> ExceptT AgentErrorType m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (ReaderT Env IO [Either AgentErrorType ()]
 -> ExceptT
      AgentErrorType (ReaderT Env IO) [Either AgentErrorType ()])
-> ReaderT Env IO [Either AgentErrorType ()]
-> ExceptT
     AgentErrorType (ReaderT Env IO) [Either AgentErrorType ()]
forall a b. (a -> b) -> a -> b
$ AgentClient
-> (Connection -> [IO ()])
-> ReaderT Env IO [Either AgentErrorType ()]
forall (t :: * -> *) a.
Traversable t =>
AgentClient
-> (Connection -> t (IO a)) -> AM' (t (Either AgentErrorType a))
withStoreBatch' AgentClient
c ((Connection -> [IO ()])
 -> ReaderT Env IO [Either AgentErrorType ()])
-> (Connection -> [IO ()])
-> ReaderT Env IO [Either AgentErrorType ()]
forall a b. (a -> b) -> a -> b
$ \Connection
db -> (ConnId -> IO ()) -> [ConnId] -> [IO ()]
forall a b. (a -> b) -> [a] -> [b]
map (Connection -> ConnId -> IO ()
deleteNtfSubscription' Connection
db) [ConnId]
connIds'
  where
    -- Request the notification SMP worker (with the hasWork flag set to True)
    -- for every SMP server used by the given queues. Servers are collected
    -- into a Set first, so each server is requested once even when several
    -- queues share it.
    kickSMPWorkers :: [RcvQueue] -> AM ()
    kickSMPWorkers rqs = do
      let smpServers = S.fromList $ map qServer rqs
      lift $ mapM_ (getNtfSMPWorker True c) smpServers

-- | Obtain, via 'getAgentWorker', the "ntf_ntf" worker for the given
-- notification server from the supervisor's 'ntfWorkers' map.
--
-- The worker body passed to 'getAgentWorker' is 'runNtfWorker' for this client
-- and server; @hasWork@ is forwarded unchanged.
-- NOTE(review): whether an existing worker is reused or a new one is started
-- is decided inside 'getAgentWorker' (not visible here).
getNtfNTFWorker :: Bool -> AgentClient -> NtfServer -> AM' Worker
getNtfNTFWorker hasWork c server = do
  ws <- asks $ ntfWorkers . ntfSupervisor
  getAgentWorker "ntf_ntf" hasWork c server ws $ runNtfWorker c server

-- | Obtain, via 'getAgentWorker', the "ntf_smp" worker for the given SMP
-- server from the supervisor's 'ntfSMPWorkers' map.
--
-- The worker body passed to 'getAgentWorker' is 'runNtfSMPWorker' for this
-- client and server; @hasWork@ is forwarded unchanged.
-- NOTE(review): whether an existing worker is reused or a new one is started
-- is decided inside 'getAgentWorker' (not visible here).
getNtfSMPWorker :: Bool -> AgentClient -> SMPServer -> AM' Worker
getNtfSMPWorker hasWork c server = do
  ws <- asks $ ntfSMPWorkers . ntfSupervisor
  getAgentWorker "ntf_smp" hasWork c server ws $ runNtfSMPWorker c server

-- | Obtain, via 'getAgentWorker', the "ntf_tkn_del" worker for the given
-- notification server from the supervisor's 'ntfTknDelWorkers' map.
--
-- The worker body passed to 'getAgentWorker' is 'runNtfTknDelWorker' for this
-- client and server; @hasWork@ is forwarded unchanged.
-- NOTE(review): whether an existing worker is reused or a new one is started
-- is decided inside 'getAgentWorker' (not visible here).
getNtfTknDelWorker :: Bool -> AgentClient -> NtfServer -> AM' Worker
getNtfTknDelWorker hasWork c server = do
  ws <- asks $ ntfTknDelWorkers . ntfSupervisor
  getAgentWorker "ntf_tkn_del" hasWork c server ws $ runNtfTknDelWorker c server

-- | Run the action with the notification server of the current token.
--
-- The token is read with 'getNtfToken'; when it returns 'Nothing' the action
-- is not run and the result is @()@ ('mapM_' over 'Maybe').
withTokenServer :: (NtfServer -> AM ()) -> AM ()
withTokenServer action = lift getNtfToken >>= mapM_ (\NtfToken {ntfServer} -> action ntfServer)

runNtfWorker :: AgentClient -> NtfServer -> Worker -> AM ()
runNtfWorker :: AgentClient
-> NtfServer
-> Worker
-> ExceptT AgentErrorType (ReaderT Env IO) ()
runNtfWorker AgentClient
c NtfServer
srv Worker {TMVar ()
doWork :: TMVar ()
$sel:doWork:Worker :: Worker -> TMVar ()
doWork} =
  ExceptT AgentErrorType (ReaderT Env IO) ()
-> ExceptT AgentErrorType (ReaderT Env IO) ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (ExceptT AgentErrorType (ReaderT Env IO) ()
 -> ExceptT AgentErrorType (ReaderT Env IO) ())
-> ExceptT AgentErrorType (ReaderT Env IO) ()
-> ExceptT AgentErrorType (ReaderT Env IO) ()
forall a b. (a -> b) -> a -> b
$ do
    TMVar () -> ExceptT AgentErrorType (ReaderT Env IO) ()
forall (m :: * -> *). MonadIO m => TMVar () -> m ()
waitForWork TMVar ()
doWork
    ReaderT Env IO (Either AgentErrorType ())
-> ExceptT AgentErrorType (ReaderT Env IO) ()
forall e (m :: * -> *) a. m (Either e a) -> ExceptT e m a
ExceptT (ReaderT Env IO (Either AgentErrorType ())
 -> ExceptT AgentErrorType (ReaderT Env IO) ())
-> ReaderT Env IO (Either AgentErrorType ())
-> ExceptT AgentErrorType (ReaderT Env IO) ()
forall a b. (a -> b) -> a -> b
$ AgentClient
-> AgentOperation
-> (AgentClient -> IO ())
-> ReaderT Env IO (Either AgentErrorType ())
-> ReaderT Env IO (Either AgentErrorType ())
forall (m :: * -> *) a.
MonadUnliftIO m =>
AgentClient
-> AgentOperation -> (AgentClient -> IO ()) -> m a -> m a
agentOperationBracket AgentClient
c AgentOperation
AONtfNetwork AgentClient -> IO ()
throwWhenInactive (ReaderT Env IO (Either AgentErrorType ())
 -> ReaderT Env IO (Either AgentErrorType ()))
-> ReaderT Env IO (Either AgentErrorType ())
-> ReaderT Env IO (Either AgentErrorType ())
forall a b. (a -> b) -> a -> b
$ ExceptT AgentErrorType (ReaderT Env IO) ()
-> ReaderT Env IO (Either AgentErrorType ())
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT ExceptT AgentErrorType (ReaderT Env IO) ()
runNtfOperation
  where
    -- One pass of the worker: fetch the next NTF actions for this server from
    -- the store (the configured ntfBatchSize is passed as the batch size) and
    -- process them.
    --
    -- splitActions leaves out NSACheck items whose timestamp is later than the
    -- current time. If that leaves nothing to do, work is rescheduled using the
    -- timestamp of the first fetched item; otherwise the create, check, delete
    -- and rotate batches are run, in that order, through retrySubActions.
    runNtfOperation :: AM ()
    runNtfOperation = do
      ntfBatchSize <- asks $ ntfBatchSize . config
      withWorkItems c doWork (withStore' c $ \db -> getNextNtfSubNTFActions db srv ntfBatchSize) $ \nextSubs -> do
        logInfo $ "runNtfWorker - length nextSubs = " <> tshow (length nextSubs)
        currTs <- liftIO getCurrentTime
        let (creates, checks, deletes, rotates) = splitActions currTs nextSubs
        if null creates && null checks && null deletes && null rotates
          then
            -- nothing is due yet: wait until the first fetched action's time
            let (_, _, firstActionTs) = L.head nextSubs
             in lift $ rescheduleWork doWork currTs firstActionTs
          else do
            retrySubActions c creates createSubs
            retrySubActions c checks checkSubs
            retrySubActions c deletes deleteSubs
            retrySubActions c rotates rotateSubs
    -- Partition the fetched actions into (creates, checks, deletes, rotates),
    -- keeping the input order within each list (right fold that conses).
    -- NSACheck items are included only when their timestamp is not later than
    -- currTs; other action kinds are included regardless of their timestamp.
    splitActions :: UTCTime -> NonEmpty (NtfSubNTFAction, NtfSubscription, NtfActionTs) -> ([NtfSubscription], [NtfSubscription], [NtfSubscription], [NtfSubscription])
    splitActions currTs = foldr addAction ([], [], [], [])
      where
        addAction (cmd, sub, ts) acc@(creates, checks, deletes, rotates) = case cmd of
          NSACreate -> (sub : creates, checks, deletes, rotates)
          NSACheck
            | ts <= currTs -> (creates, sub : checks, deletes, rotates)
            -- check is scheduled for later: leave it out of this pass
            | otherwise -> acc
          NSADelete -> (creates, checks, sub : deletes, rotates)
          NSARotate -> (creates, checks, deletes, sub : rotates)
    createSubs :: [NtfSubscription] -> AM' [NtfSubscription]
    createSubs :: [NtfSubscription] -> AM' [NtfSubscription]
createSubs [NtfSubscription]
ntfSubs =
      ReaderT Env IO (Maybe NtfToken)
getNtfToken ReaderT Env IO (Maybe NtfToken)
-> (Maybe NtfToken -> AM' [NtfSubscription])
-> AM' [NtfSubscription]
forall a b.
ReaderT Env IO a -> (a -> ReaderT Env IO b) -> ReaderT Env IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
        Just tkn :: NtfToken
tkn@NtfToken {NtfServer
$sel:ntfServer:NtfToken :: NtfToken -> NtfServer
ntfServer :: NtfServer
ntfServer, $sel:ntfTokenId:NtfToken :: NtfToken -> Maybe NtfSubscriptionId
ntfTokenId = Just NtfSubscriptionId
tknId, $sel:ntfTknStatus:NtfToken :: NtfToken -> NtfTknStatus
ntfTknStatus = NtfTknStatus
NTActive, $sel:ntfMode:NtfToken :: NtfToken -> NotificationsMode
ntfMode = NotificationsMode
NMInstant} -> do
          [(NtfSubscription, Either AgentErrorType RcvQueue)]
subsRqs_ <- [NtfSubscription]
-> [Either AgentErrorType RcvQueue]
-> [(NtfSubscription, Either AgentErrorType RcvQueue)]
forall a b. [a] -> [b] -> [(a, b)]
zip [NtfSubscription]
ntfSubs ([Either AgentErrorType RcvQueue]
 -> [(NtfSubscription, Either AgentErrorType RcvQueue)])
-> ReaderT Env IO [Either AgentErrorType RcvQueue]
-> ReaderT
     Env IO [(NtfSubscription, Either AgentErrorType RcvQueue)]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> AgentClient
-> (Connection -> [IO (Either AgentErrorType RcvQueue)])
-> ReaderT Env IO [Either AgentErrorType RcvQueue]
forall (t :: * -> *) a.
Traversable t =>
AgentClient
-> (Connection -> t (IO (Either AgentErrorType a)))
-> AM' (t (Either AgentErrorType a))
withStoreBatch AgentClient
c (\Connection
db -> (NtfSubscription -> IO (Either AgentErrorType RcvQueue))
-> [NtfSubscription] -> [IO (Either AgentErrorType RcvQueue)]
forall a b. (a -> b) -> [a] -> [b]
map (Connection
-> NtfSubscription -> IO (Either AgentErrorType RcvQueue)
getQueue Connection
db) [NtfSubscription]
ntfSubs)
          let ([(ConnId, AgentErrorType)]
errs1, [NtfSubscription]
subs_, [NewNtfEntity 'Subscription]
newSubs_) = NtfSubscriptionId
-> [(NtfSubscription, Either AgentErrorType RcvQueue)]
-> ([(ConnId, AgentErrorType)], [NtfSubscription],
    [NewNtfEntity 'Subscription])
splitSubs NtfSubscriptionId
tknId [(NtfSubscription, Either AgentErrorType RcvQueue)]
subsRqs_
          NtfServer
-> (AgentNtfServerStats -> TVar Int) -> [NtfSubscription] -> AM' ()
incStatByUserId NtfServer
ntfServer AgentNtfServerStats -> TVar Int
ntfCreateAttempts [NtfSubscription]
subs_
          case ([NtfSubscription] -> Maybe (NonEmpty NtfSubscription)
forall a. [a] -> Maybe (NonEmpty a)
L.nonEmpty [NtfSubscription]
subs_, [NewNtfEntity 'Subscription]
-> Maybe (NonEmpty (NewNtfEntity 'Subscription))
forall a. [a] -> Maybe (NonEmpty a)
L.nonEmpty [NewNtfEntity 'Subscription]
newSubs_) of
            (Just NonEmpty NtfSubscription
subs, Just NonEmpty (NewNtfEntity 'Subscription)
newSubs) -> do
              NonEmpty (NtfSubscription, Either AgentErrorType NtfSubscriptionId)
rs <- NonEmpty NtfSubscription
-> NonEmpty (Either AgentErrorType NtfSubscriptionId)
-> NonEmpty
     (NtfSubscription, Either AgentErrorType NtfSubscriptionId)
forall a b. NonEmpty a -> NonEmpty b -> NonEmpty (a, b)
L.zip NonEmpty NtfSubscription
subs (NonEmpty (Either AgentErrorType NtfSubscriptionId)
 -> NonEmpty
      (NtfSubscription, Either AgentErrorType NtfSubscriptionId))
-> ReaderT
     Env IO (NonEmpty (Either AgentErrorType NtfSubscriptionId))
-> ReaderT
     Env
     IO
     (NonEmpty
        (NtfSubscription, Either AgentErrorType NtfSubscriptionId))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> AgentClient
-> NtfToken
-> NonEmpty (NewNtfEntity 'Subscription)
-> ReaderT
     Env IO (NonEmpty (Either AgentErrorType NtfSubscriptionId))
agentNtfCreateSubscriptions AgentClient
c NtfToken
tkn NonEmpty (NewNtfEntity 'Subscription)
newSubs
              let ([NtfSubscription]
ntfSubs', [(NtfSubscription, AgentErrorType)]
errs2, [(NtfSubscription, NtfSubscriptionId)]
nSubIds) = [(NtfSubscription, Either AgentErrorType NtfSubscriptionId)]
-> ([NtfSubscription], [(NtfSubscription, AgentErrorType)],
    [(NtfSubscription, NtfSubscriptionId)])
forall a r.
[(a, Either AgentErrorType r)]
-> ([a], [(a, AgentErrorType)], [(a, r)])
splitResults ([(NtfSubscription, Either AgentErrorType NtfSubscriptionId)]
 -> ([NtfSubscription], [(NtfSubscription, AgentErrorType)],
     [(NtfSubscription, NtfSubscriptionId)]))
-> [(NtfSubscription, Either AgentErrorType NtfSubscriptionId)]
-> ([NtfSubscription], [(NtfSubscription, AgentErrorType)],
    [(NtfSubscription, NtfSubscriptionId)])
forall a b. (a -> b) -> a -> b
$ NonEmpty (NtfSubscription, Either AgentErrorType NtfSubscriptionId)
-> [(NtfSubscription, Either AgentErrorType NtfSubscriptionId)]
forall a. NonEmpty a -> [a]
L.toList NonEmpty (NtfSubscription, Either AgentErrorType NtfSubscriptionId)
rs
                  subs' :: [NtfSubscription]
subs' = ((NtfSubscription, NtfSubscriptionId) -> NtfSubscription)
-> [(NtfSubscription, NtfSubscriptionId)] -> [NtfSubscription]
forall a b. (a -> b) -> [a] -> [b]
map (NtfSubscription, NtfSubscriptionId) -> NtfSubscription
forall a b. (a, b) -> a
fst [(NtfSubscription, NtfSubscriptionId)]
nSubIds
                  errs2' :: [(ConnId, AgentErrorType)]
errs2' = ((NtfSubscription, AgentErrorType) -> (ConnId, AgentErrorType))
-> [(NtfSubscription, AgentErrorType)]
-> [(ConnId, AgentErrorType)]
forall a b. (a -> b) -> [a] -> [b]
map ((NtfSubscription -> ConnId)
-> (NtfSubscription, AgentErrorType) -> (ConnId, AgentErrorType)
forall a b c. (a -> b) -> (a, c) -> (b, c)
forall (p :: * -> * -> *) a b c.
Bifunctor p =>
(a -> b) -> p a c -> p b c
first NtfSubscription -> ConnId
ntfSubConnId) [(NtfSubscription, AgentErrorType)]
errs2
              NtfServer
-> (AgentNtfServerStats -> TVar Int) -> [NtfSubscription] -> AM' ()
incStatByUserId NtfServer
ntfServer AgentNtfServerStats -> TVar Int
ntfCreated [NtfSubscription]
subs'
              NtfActionTs
ts <- IO NtfActionTs -> ReaderT Env IO NtfActionTs
forall a. IO a -> ReaderT Env IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO NtfActionTs
getCurrentTime
              NominalDiffTime
int <- (Env -> NominalDiffTime) -> ReaderT Env IO NominalDiffTime
forall r (m :: * -> *) a. MonadReader r m => (r -> a) -> m a
asks ((Env -> NominalDiffTime) -> ReaderT Env IO NominalDiffTime)
-> (Env -> NominalDiffTime) -> ReaderT Env IO NominalDiffTime
forall a b. (a -> b) -> a -> b
$ AgentConfig -> NominalDiffTime
ntfSubFirstCheckInterval (AgentConfig -> NominalDiffTime)
-> (Env -> AgentConfig) -> Env -> NominalDiffTime
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Env -> AgentConfig
config
              let checkTs :: NtfActionTs
checkTs = NominalDiffTime -> NtfActionTs -> NtfActionTs
addUTCTime NominalDiffTime
int NtfActionTs
ts
              ([(ConnId, AgentErrorType)]
errs3, [()]
_) <- (NtfSubscription -> ConnId)
-> [NtfSubscription]
-> [Either AgentErrorType ()]
-> ([(ConnId, AgentErrorType)], [()])
forall a b.
(a -> ConnId)
-> [a]
-> [Either AgentErrorType b]
-> ([(ConnId, AgentErrorType)], [b])
partitionErrs NtfSubscription -> ConnId
ntfSubConnId [NtfSubscription]
subs' ([Either AgentErrorType ()] -> ([(ConnId, AgentErrorType)], [()]))
-> ReaderT Env IO [Either AgentErrorType ()]
-> ReaderT Env IO ([(ConnId, AgentErrorType)], [()])
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> AgentClient
-> (Connection -> [IO ()])
-> ReaderT Env IO [Either AgentErrorType ()]
forall (t :: * -> *) a.
Traversable t =>
AgentClient
-> (Connection -> t (IO a)) -> AM' (t (Either AgentErrorType a))
withStoreBatch' AgentClient
c (\Connection
db -> ((NtfSubscription, NtfSubscriptionId) -> IO ())
-> [(NtfSubscription, NtfSubscriptionId)] -> [IO ()]
forall a b. (a -> b) -> [a] -> [b]
map (Connection
-> NtfActionTs -> (NtfSubscription, NtfSubscriptionId) -> IO ()
updateSubNSACheck Connection
db NtfActionTs
checkTs) [(NtfSubscription, NtfSubscriptionId)]
nSubIds)
              AgentClient -> [(ConnId, AgentErrorType)] -> AM' ()
workerErrors AgentClient
c ([(ConnId, AgentErrorType)] -> AM' ())
-> [(ConnId, AgentErrorType)] -> AM' ()
forall a b. (a -> b) -> a -> b
$ [(ConnId, AgentErrorType)]
errs1 [(ConnId, AgentErrorType)]
-> [(ConnId, AgentErrorType)] -> [(ConnId, AgentErrorType)]
forall a. Semigroup a => a -> a -> a
<> [(ConnId, AgentErrorType)]
errs2' [(ConnId, AgentErrorType)]
-> [(ConnId, AgentErrorType)] -> [(ConnId, AgentErrorType)]
forall a. Semigroup a => a -> a -> a
<> [(ConnId, AgentErrorType)]
errs3
              [NtfSubscription] -> AM' [NtfSubscription]
forall a. a -> ReaderT Env IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure [NtfSubscription]
ntfSubs'
            (Maybe (NonEmpty NtfSubscription),
 Maybe (NonEmpty (NewNtfEntity 'Subscription)))
_ -> AgentClient -> [(ConnId, AgentErrorType)] -> AM' ()
workerErrors AgentClient
c [(ConnId, AgentErrorType)]
errs1 AM' () -> [NtfSubscription] -> AM' [NtfSubscription]
forall (f :: * -> *) a b. Functor f => f a -> b -> f b
$> []
        Maybe NtfToken
_ -> do
          let errs :: [(ConnId, AgentErrorType)]
errs = (NtfSubscription -> (ConnId, AgentErrorType))
-> [NtfSubscription] -> [(ConnId, AgentErrorType)]
forall a b. (a -> b) -> [a] -> [b]
map (\NtfSubscription
sub -> (NtfSubscription -> ConnId
ntfSubConnId NtfSubscription
sub, String -> AgentErrorType
INTERNAL String
"NSACreate - no active token")) [NtfSubscription]
ntfSubs
          AgentClient -> [(ConnId, AgentErrorType)] -> AM' ()
workerErrors AgentClient
c [(ConnId, AgentErrorType)]
errs
          [NtfSubscription] -> AM' [NtfSubscription]
forall a. a -> ReaderT Env IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure []
      where
        -- Loads the primary receive queue of the subscription's connection,
        -- converting a store error into an agent error via storeError.
        getQueue :: DB.Connection -> NtfSubscription -> IO (Either AgentErrorType RcvQueue)
        getQueue db NtfSubscription {connId} = first storeError <$> getPrimaryRcvQueue db connId
        -- Splits subscriptions paired with their queue lookup results into:
        --   1. errors keyed by connection ID (failed queue lookup, or queue without notifier credentials),
        --   2. subscriptions that can be created,
        --   3. the matching new-subscription requests, in the same order as (2).
        -- foldr keeps the relative order of the input in every component.
        splitSubs :: NtfTokenId -> [(NtfSubscription, Either AgentErrorType RcvQueue)] -> ([(ConnId, AgentErrorType)], [NtfSubscription], [NewNtfEntity 'Subscription])
        splitSubs tknId = foldr splitSub ([], [], [])
          where
            splitSub (sub, rq) (errs, subs, newSubs) = case rq of
              Right RcvQueue {clientNtfCreds = Just creds} -> (errs, sub : subs, toNewSub sub creds : newSubs)
              -- the queue exists, but it has no notifier credentials
              Right _ -> ((ntfSubConnId sub, INTERNAL "NSACreate - no notifier queue credentials") : errs, subs, newSubs)
              Left e -> ((ntfSubConnId sub, e) : errs, subs, newSubs)
            -- Builds the request for the token from the subscription's SMP server and the queue's notifier credentials.
            toNewSub :: NtfSubscription -> ClientNtfCreds -> NewNtfEntity 'Subscription
            toNewSub NtfSubscription {smpServer} ClientNtfCreds {ntfPrivateKey, notifierId} =
              NewNtfSub tknId (SMPQueueNtf smpServer notifierId) ntfPrivateKey
        -- Stores the subscription ID returned for a newly created subscription,
        -- sets its status to NASCreated NSNew and schedules the NSACheck action at checkTs.
        updateSubNSACheck :: DB.Connection -> UTCTime -> (NtfSubscription, NtfSubscriptionId) -> IO ()
        updateSubNSACheck db checkTs (sub, nSubId) =
          updateNtfSubscription db sub {ntfSubId = Just nSubId, ntfSubStatus = NASCreated NSNew} (NSANtf NSACheck) checkTs
    -- Checks a batch of notification subscriptions via agentNtfCheckSubscriptions.
    --
    -- Runs only when the current token is active (NTActive) and in instant mode (NMInstant);
    -- otherwise every subscription is reported with "NSACheck - no active token" and [] is returned.
    -- Subscriptions without a subscription ID are reported as errors and excluded from the request.
    -- Returns the first component of splitResults (presumably the subscriptions to retry - confirm against splitResults).
    checkSubs :: [NtfSubscription] -> AM' [NtfSubscription]
    checkSubs ntfSubs =
      getNtfToken >>= \case
        Just tkn@NtfToken {ntfServer, ntfTknStatus = NTActive, ntfMode = NMInstant} -> do
          let (errs1, subs_, subIds_) = splitSubs ntfSubs
          incStatByUserId ntfServer ntfCheckAttempts subs_
          case (L.nonEmpty subs_, L.nonEmpty subIds_) of
            (Just subs, Just subIds) -> do
              rs <- L.zip subs <$> agentNtfCheckSubscriptions c tkn subIds
              let (ntfSubs', errs2, nSubStatuses) = splitResults $ L.toList rs
                  subs' = map fst nSubStatuses
                  -- AUTH errors are not reported, these subscriptions are re-created instead
                  (errs2', authSubs) = partitionEithers $ map splitAuthErr errs2
              incStatByUserId ntfServer ntfChecked subs'
              ts <- liftIO getCurrentTime
              int <- asks $ ntfSubCheckInterval . config
              let nextCheckTs = addUTCTime int ts
              (errs3, srvs) <-
                partitionErrs ntfSubConnId subs'
                  <$> withStoreBatch' c (\db -> map (updateSub db ntfServer ts nextCheckTs) nSubStatuses)
              (errs4, srvs') <-
                partitionErrs ntfSubConnId authSubs
                  <$> withStoreBatch' c (\db -> map (recreateNtfSub db ntfServer ts) authSubs)
              -- get the SMP worker once per server that has re-created subscriptions
              mapM_ (getNtfSMPWorker True c) $ S.fromList (catMaybes srvs <> srvs')
              workerErrors c $ errs1 <> errs2' <> errs3 <> errs4
              pure ntfSubs'
            _ -> workerErrors c errs1 $> []
        _ -> do
          let errs = map (\sub -> (ntfSubConnId sub, INTERNAL "NSACheck - no active token")) ntfSubs
          workerErrors c errs
          pure []
      where
        -- Separates subscriptions that have a subscription ID (returned together with their IDs, in the same order)
        -- from those that do not (returned as errors keyed by connection ID).
        splitSubs :: [NtfSubscription] -> ([(ConnId, AgentErrorType)], [NtfSubscription], [NtfSubscriptionId])
        splitSubs = foldr splitSub ([], [], [])
          where
            splitSub sub (errs, subs, subIds) = case sub of
              NtfSubscription {ntfSubId = Just subId} -> (errs, sub : subs, subId : subIds)
              _ -> ((ntfSubConnId sub, INTERNAL "NSACheck - no subscription ID") : errs, subs, subIds)
        -- Right for a subscription that failed with NTF AUTH, Left with the error keyed by connection ID otherwise.
        splitAuthErr :: (NtfSubscription, AgentErrorType) -> Either (ConnId, AgentErrorType) NtfSubscription
        splitAuthErr = \case
          (sub, NTF _ SMP.AUTH) -> Right sub
          e -> Left $ first ntfSubConnId e
        -- Saves the checked status and schedules the next check, returning Nothing;
        -- when the status fails ntfShouldSubscribe, re-creates the subscription and returns its SMP server.
        updateSub :: DB.Connection -> NtfServer -> UTCTime -> UTCTime -> (NtfSubscription, NtfSubStatus) -> IO (Maybe SMPServer)
        updateSub db ntfServer ts nextCheckTs (sub, status)
          | ntfShouldSubscribe status =
              let sub' = sub {ntfSubStatus = NASCreated status}
               in Nothing <$ updateNtfSubscription db sub' (NSANtf NSACheck) nextCheckTs
          -- ntf server stopped subscribing to this queue
          | otherwise = Just <$> recreateNtfSub db ntfServer ts sub
        -- Resets the subscription to NASNew (clearing the queue and subscription IDs) on the given ntf server
        -- and schedules the NSASmpKey action at ts; returns the subscription's SMP server.
        recreateNtfSub :: DB.Connection -> NtfServer -> UTCTime -> NtfSubscription -> IO SMPServer
        recreateNtfSub db ntfServer ts sub@NtfSubscription {smpServer} =
          let sub' = sub {ntfServer, ntfQueueId = Nothing, ntfSubId = Nothing, ntfSubStatus = NASNew}
           in smpServer <$ updateNtfSubscription db sub' (NSASMP NSASmpKey) ts
    -- Adds the number of the passed subscriptions to the ntf server statistic chosen by sel,
    -- separately for each user ID (one increment by that user's subscription count).
    incStatByUserId :: NtfServer -> (AgentNtfServerStats -> TVar Int) -> [NtfSubscription] -> AM' ()
    incStatByUserId ntfServer sel ss =
      forM_ (M.assocs userIdsCounts) $ \(userId, count) ->
        atomically $ incNtfServerStat' c userId ntfServer sel count
      where
        -- number of subscriptions per user ID
        userIdsCounts = foldl' (\acc NtfSubscription {userId} -> M.insertWith (+) userId 1 acc) M.empty ss
    -- NSADelete and NSARotate are deprecated, but their processing is kept for legacy db records;
    -- these actions are not batched.
    -- Processes subscription deletions one by one (not batched), each wrapped in runCatching.
    -- Returns the subscriptions for which deleteNtfSub produced Just
    -- (presumably the ones to retry - confirm against deleteNtfSub / runCatching).
    deleteSubs :: [NtfSubscription] -> AM' [NtfSubscription]
    deleteSubs ntfSubs = do
      retrySubs_ <- mapM (runCatching deleteSub) ntfSubs
      pure $ catMaybes retrySubs_
      where
        deleteSub :: NtfSubscription -> AM (Maybe NtfSubscription)
        deleteSub sub@NtfSubscription {smpServer} =
          -- the continuation passed to deleteNtfSub clears the subscription ID, sets status NASOff,
          -- schedules the NSASmpDelete action and gets the worker for the subscription's SMP server
          deleteNtfSub sub $ do
            let sub' = sub {ntfSubId = Nothing, ntfSubStatus = NASOff}
            ts <- liftIO getCurrentTime
            withStore' c $ \db -> updateNtfSubscription db sub' (NSASMP NSASmpDelete) ts
            lift . void $ getNtfSMPWorker True c smpServer
    -- Runs 'rotateSub' for every subscription (errors are handled by 'runCatching')
    -- and returns only the subscriptions for which a 'Just' was produced.
    rotateSubs :: [NtfSubscription] -> AM' [NtfSubscription]
    rotateSubs ntfSubs = catMaybes <$> mapM (runCatching rotateSub) ntfSubs
      where
        -- Deletes the subscription via 'deleteNtfSub'; the continuation removes the
        -- stored subscription record for the connection and queues an 'NSCCreate'
        -- command for the same connection on the supervisor queue.
        rotateSub :: NtfSubscription -> AM (Maybe NtfSubscription)
        rotateSub sub@NtfSubscription {connId} =
          deleteNtfSub sub $ do
            withStore' c $ \db -> deleteNtfSubscription db connId
            ns <- asks ntfSupervisor
            atomically $ writeTBQueue (ntfSubQ ns) (NSCCreate, [connId])
    -- Runs @action@ for one subscription in 'AM'' (no error channel).
    -- Any error raised by the action is reported with 'workerInternalError' for the
    -- subscription's connection and replaced with 'Nothing'; if the handler itself
    -- fails, the resulting 'Left' is also mapped to 'Nothing' by 'fromRight'.
    runCatching :: (NtfSubscription -> AM (Maybe NtfSubscription)) -> NtfSubscription -> AM' (Maybe NtfSubscription)
    runCatching action sub@NtfSubscription {connId} =
      fromRight Nothing
        <$> runExceptT (action sub `catchAllErrors` \e -> workerInternalError c connId (show e) $> Nothing)
    -- deleteNtfSub is only used in NSADelete and NSARotate, so also deprecated
    --
    -- Attempts to delete the subscription on the notification server and then runs @continue@.
    -- Returns 'Just' the subscription when it should be retried, 'Nothing' otherwise:
    --
    -- * no 'ntfSubId', or no token: @continue@ is run without contacting the server;
    -- * server deletion succeeds: 'ntfDeleted' is counted and @continue@ is run;
    -- * server deletion fails with a temporary or host error: @continue@ is NOT run
    --   and the subscription is returned for retry;
    -- * any other failure: @continue@ is run.
    --
    -- 'ntfDelAttempts' is counted before every server deletion attempt.
    deleteNtfSub :: NtfSubscription -> AM () -> AM (Maybe NtfSubscription)
    deleteNtfSub sub@NtfSubscription {userId, ntfSubId} continue = case ntfSubId of
      Just nSubId ->
        lift getNtfToken >>= \case
          Just tkn@NtfToken {ntfServer} -> do
            atomically $ incNtfServerStat c userId ntfServer ntfDelAttempts
            tryAllErrors (agentNtfDeleteSubscription c nSubId tkn) >>= \case
              Right _ -> do
                atomically $ incNtfServerStat c userId ntfServer ntfDeleted
                continue'
              Left e
                | temporaryOrHostError e -> pure $ Just sub -- don't continue, retry
                | otherwise -> continue'
          Nothing -> continue'
      _ -> continue'
      where
        continue' = continue $> Nothing -- continue without retry

-- | Worker loop for one SMP server.
--
-- Repeats forever: waits until 'doWork' signals work, then runs one
-- @runNtfSMPOperation@ inside 'agentOperationBracket' for 'AONtfNetwork'
-- (with 'throwWhenInactive'). The operation's 'Either' result is re-wrapped with
-- 'ExceptT', so an error it returns propagates out of the loop.
runNtfSMPWorker :: AgentClient -> SMPServer -> Worker -> AM ()
runNtfSMPWorker c srv Worker {doWork} = forever $ do
  waitForWork doWork
  ExceptT $ agentOperationBracket c AONtfNetwork throwWhenInactive $ runExceptT runNtfSMPOperation
  where
    -- One pass of the worker: loads up to 'ntfBatchSize' (from the agent config)
    -- pending SMP actions for @srv@ through 'withWorkItems', logs how many were
    -- loaded, splits them into key creations and key deletions, and runs each
    -- group through 'retrySubActions'.
    runNtfSMPOperation :: AM ()
    runNtfSMPOperation = do
      ntfBatchSize <- asks $ ntfBatchSize . config
      withWorkItems c doWork (withStore' c $ \db -> getNextNtfSubSMPActions db srv ntfBatchSize) $ \nextSubs -> do
        logInfo $ "runNtfSMPWorker - length nextSubs = " <> tshow (length nextSubs)
        let (creates, deletes) = splitActions nextSubs
        retrySubActions c creates createNotifierKeys
        retrySubActions c deletes deleteNotifierKeys
    -- Partitions the loaded actions by command: subscriptions with 'NSASmpKey' go to
    -- the first list, those with 'NSASmpDelete' to the second.
    -- The right fold with cons keeps the relative order of the input in both lists.
    splitActions :: NonEmpty (NtfSubSMPAction, NtfSubscription) -> ([NtfSubscription], [NtfSubscription])
    splitActions = foldr addAction ([], [])
      where
        addAction (cmd, sub) (creates, deletes) = case cmd of
          NSASmpKey -> (sub : creates, deletes)
          NSASmpDelete -> (creates, sub : deletes)
    -- Enables notifications on the SMP queues of the given subscriptions.
    --
    -- Requires a token with status 'NTActive' and mode 'NMInstant'; otherwise an
    -- INTERNAL error is reported for every subscription and nothing is returned.
    --
    -- With a suitable token:
    --
    -- 1. builds one 'EnableQueueNtfReq' per subscription ('prepareQueueSmpKey');
    -- 2. sends them with 'enableQueuesNtfs' and splits the replies with 'splitResults';
    -- 3. stores credentials for the successful requests ('storeNtfSubCreds');
    -- 4. obtains an NTF worker for each distinct notification server that was stored;
    -- 5. reports the errors collected in steps 1-3 via 'workerErrors'.
    --
    -- Returns the subscriptions of the first component of 'splitResults'
    -- (NOTE(review): presumably the requests that should be retried - confirm against splitResults).
    createNotifierKeys :: [NtfSubscription] -> AM' [NtfSubscription]
    createNotifierKeys ntfSubs =
      getNtfToken >>= \case
        Just NtfToken {ntfTknStatus = NTActive, ntfMode = NMInstant} -> do
          (errs1, subRqKeys) <- prepareQueueSmpKey ntfSubs
          rs <- enableQueuesNtfs c subRqKeys
          let (subRqKeys', errs2, successes) = splitResults rs
              ntfSubs' = map eqnrNtfSub subRqKeys'
              errs2' = map (first (qConnId . eqnrRq)) errs2
          ts <- liftIO getCurrentTime
          (errs3, srvs) <- partitionErrs (qConnId . eqnrRq . fst) successes <$> withStoreBatch' c (\db -> map (storeNtfSubCreds db ts) successes)
          -- the Set removes duplicate servers, so each worker is requested once
          mapM_ (getNtfNTFWorker True c) $ S.fromList srvs
          workerErrors c $ errs1 <> errs2' <> errs3
          pure ntfSubs'
        _ -> do
          let errs = map (\sub -> (ntfSubConnId sub, INTERNAL "NSASmpKey - no active token")) ntfSubs
          workerErrors c errs
          pure []
      where
        -- Loads the primary receive queue and generates fresh key pairs for every
        -- subscription in one store batch; failures are paired with the connection ID.
        prepareQueueSmpKey :: [NtfSubscription] -> AM' ([(ConnId, AgentErrorType)], [EnableQueueNtfReq])
        prepareQueueSmpKey subs = do
          alg <- asks (rcvAuthAlg . config)
          g <- asks random
          partitionErrs ntfSubConnId subs <$> withStoreBatch c (\db -> map (getQueue db alg g) subs)
          where
            -- Builds the request for one subscription: its primary receive queue, a new
            -- auth key pair for the configured algorithm, and a new key pair for DH.
            -- Store errors are converted with 'storeError'.
            getQueue :: DB.Connection -> C.AuthAlg -> TVar ChaChaDRG -> NtfSubscription -> IO (Either AgentErrorType EnableQueueNtfReq)
            getQueue db (C.AuthAlg a) g sub = fmap (first storeError) $ runExceptT $ do
              rq <- ExceptT $ getPrimaryRcvQueue db (ntfSubConnId sub)
              authKeyPair <- atomically $ C.generateAuthKeyPair a g
              rcvNtfKeyPair <- atomically $ C.generateKeyPair g
              pure (EnableQueueNtfReq sub rq authKeyPair rcvNtfKeyPair)
        -- Persists the result of a successful request: saves the queue's notification
        -- credentials (auth keys, notifier ID, and the DH secret computed from the
        -- server's public key and our private key), then updates the subscription with
        -- the notifier ID and status 'NASKey', scheduling 'NSANtf' 'NSACreate' at @ts@.
        -- Returns the subscription's notification server.
        storeNtfSubCreds :: DB.Connection -> UTCTime -> (EnableQueueNtfReq, (SMP.NotifierId, SMP.RcvNtfPublicDhKey)) -> IO NtfServer
        storeNtfSubCreds db ts (EnableQueueNtfReq {eqnrNtfSub, eqnrAuthKeyPair = (ntfPublicKey, ntfPrivateKey), eqnrRcvKeyPair = (_, pk)}, (notifierId, srvPubDhKey)) = do
          let NtfSubscription {ntfServer} = eqnrNtfSub
              rcvNtfDhSecret = C.dh' srvPubDhKey pk
          setRcvQueueNtfCreds db (ntfSubConnId eqnrNtfSub) $ Just ClientNtfCreds {ntfPublicKey, ntfPrivateKey, notifierId, rcvNtfDhSecret}
          updateNtfSubscription db eqnrNtfSub {ntfQueueId = Just notifierId, ntfSubStatus = NASKey} (NSANtf NSACreate) ts
          pure ntfServer
    -- Disable notifications for the queues of the given subscriptions and
    -- delete the subscriptions that were disabled successfully.
    --
    -- Steps:
    --   1. In one store batch, reset the notification credentials of each
    --      connection and load its primary receive queue.
    --   2. Send the disable requests via 'disableQueuesNtfs'.
    --   3. Delete the stored subscription of every queue that was disabled.
    --   4. Report all errors collected in steps 1-3 via 'workerErrors'.
    --
    -- Returns the subscriptions that 'splitResults' put into its first
    -- component (requests that failed with 'temporaryOrHostError' errors).
    deleteNotifierKeys :: [NtfSubscription] -> AM' [NtfSubscription]
    deleteNotifierKeys ntfSubs = do
      (errs1, subRqs) <- partitionErrs ntfSubConnId ntfSubs <$> withStoreBatch c (\db -> map (resetCredsGetQueue db) ntfSubs)
      rs <- disableQueuesNtfs c subRqs
      let (subRqs', errs2, successes) = splitResults rs
          -- subscriptions to hand back to the caller
          ntfSubs' = map fst subRqs'
          -- key the remaining errors by the connection of the request's queue
          errs2' = map (first (qConnId . snd)) errs2
          disabledRqs = map (snd . fst) successes
      (errs3, _) <- partitionErrs qConnId disabledRqs <$> withStoreBatch' c (\db -> map (deleteSub db) disabledRqs)
      workerErrors c $ errs1 <> errs2' <> errs3
      pure ntfSubs'
      where
        -- Clear the stored notification credentials for the subscription's
        -- connection and pair the subscription with its primary receive queue.
        -- Store errors are converted with 'storeError'.
        resetCredsGetQueue :: DB.Connection -> NtfSubscription -> IO (Either AgentErrorType DisableQueueNtfReq)
        resetCredsGetQueue db sub@NtfSubscription {connId} = fmap (first storeError) $ runExceptT $ do
          liftIO $ setRcvQueueNtfCreds db connId Nothing
          rq <- ExceptT $ getPrimaryRcvQueue db connId
          pure (sub, rq)
        -- Delete the stored notification subscription of the queue's connection.
        deleteSub :: DB.Connection -> RcvQueue -> IO ()
        deleteSub db rq = deleteNtfSubscription db (qConnId rq)

-- | Run @action@ over the subscriptions repeatedly until it returns no
-- subscriptions to retry.
--
-- Does nothing for an empty list. Otherwise the pending subscriptions are
-- kept in a 'TVar'; each iteration waits while the agent is suspended and for
-- the user network, runs @action@ on the pending list, and, if anything is
-- returned, stores it as the new pending list and continues via
-- 'retryNetworkLoop' using the 'reconnectInterval' from the agent config.
retrySubActions :: AgentClient -> [NtfSubscription] -> ([NtfSubscription] -> AM' [NtfSubscription]) -> AM ()
retrySubActions _ [] _ = pure ()
retrySubActions c subs action = do
  v <- newTVarIO subs
  ri <- asks $ reconnectInterval . config
  withRetryInterval ri $ \_ loop -> do
    liftIO $ waitWhileSuspended c
    liftIO $ waitForUserNetwork c
    subs' <- readTVarIO v
    retrySubs <- lift $ action subs'
    unless (null retrySubs) $ do
      -- remember only what still has to be retried before the next iteration
      atomically $ writeTVar v retrySubs
      retryNetworkLoop c loop

-- | Split per-item results into three lists, preserving the input order:
--
--   1. items whose error satisfies 'temporaryOrHostError' (the error is dropped),
--   2. items with any other error, paired with that error,
--   3. items that succeeded, paired with their result.
--
--                                                (temporary errs, other errs, successes)
splitResults :: [(a, Either AgentErrorType r)] -> ([a], [(a, AgentErrorType)], [(a, r)])
splitResults = foldr addRes ([], [], [])
  where
    addRes (a, r_) (as, errs, rs) = case r_ of
      Right r -> (as, errs, (a, r) : rs)
      Left e
        | temporaryOrHostError e -> (a : as, errs, rs)
        | otherwise -> (as, (a, e) : errs, rs)

-- | Postpone work until @actionTs@.
--
-- Empties the @doWork@ 'TMVar' (if it was full) and forks a thread that
-- sleeps for the difference between @actionTs@ and @ts@ and then calls
-- 'hasWorkToDo'' on @doWork@.
rescheduleWork :: TMVar () -> UTCTime -> UTCTime -> AM' ()
rescheduleWork doWork ts actionTs = do
  void . atomically $ tryTakeTMVar doWork
  void . forkIO $ do
    liftIO $ threadDelay' $ diffToMicroseconds $ diffUTCTime actionTs ts
    atomically $ hasWorkToDo' doWork

-- | Continue a retry loop: end the 'AONtfNetwork' agent operation, run
-- 'throwWhenInactive' on the client, begin the 'AONtfNetwork' operation
-- again, and then run @loop@.
retryNetworkLoop :: AgentClient -> AM () -> AM ()
retryNetworkLoop c loop = do
  atomically $ endAgentOperation c AONtfNetwork
  liftIO $ throwWhenInactive c
  atomically $ beginAgentOperation c AONtfNetwork
  loop

-- | Handle per-connection worker errors.
--
-- For a non-empty list, calls 'setNullNtfSubscriptionAction' for every
-- listed connection in one store batch (the batch results are discarded) and
-- then reports the errors with 'notifyErrs'. Does nothing for an empty list.
workerErrors :: AgentClient -> [(ConnId, AgentErrorType)] -> AM' ()
workerErrors c connErrs =
  unless (null connErrs) $ do
    void $ withStoreBatch' c (\db -> map (setNullNtfSubscriptionAction db . fst) connErrs)
    notifyErrs c connErrs

-- | Handle an internal worker error for one connection: call
-- 'setNullNtfSubscriptionAction' for the connection in the store, then
-- report the error string with 'notifyInternalError'.
workerInternalError :: AgentClient -> ConnId -> String -> AM ()
workerInternalError c connId internalErrStr = do
  withStore' c $ \db -> setNullNtfSubscriptionAction db connId
  notifyInternalError c connId internalErrStr

-- TODO change error

-- | Log the error string and write an @ERR (INTERNAL …)@ connection event for
-- @connId@ to the client's @subQ@ using 'nonBlockingWriteTBQueue'. The first
-- component of the written transmission is the empty string.
notifyInternalError :: MonadIO m => AgentClient -> ConnId -> String -> m ()
notifyInternalError AgentClient {subQ} connId internalErrStr = do
  logError $ T.pack internalErrStr
  liftIO $ nonBlockingWriteTBQueue subQ ("", connId, AEvt SAEConn $ ERR $ INTERNAL internalErrStr)

-- | 'notifyInternalError' with an empty connection id.
notifyInternalError' :: MonadIO m => AgentClient -> String -> m ()
notifyInternalError' c = notifyInternalError c ""
{-# INLINE notifyInternalError' #-}

-- | Report a batch of per-connection errors: when the list is non-empty, log
-- it (prefixed with @"notifyErrs: "@) and send a single 'ERRS' event via
-- 'notifySub'. An empty list produces no log entry and no event.
notifyErrs :: MonadIO m => AgentClient -> [(ConnId, AgentErrorType)] -> m ()
notifyErrs c errs_ = forM_ (L.nonEmpty errs_) $ \errs -> do
  logError $ "notifyErrs: " <> tshow errs
  notifySub c $ ERRS errs

-- | Read the current value of the notification supervisor's token 'TVar'
-- ('ntfTkn') from the environment.
getNtfToken :: AM' (Maybe NtfToken)
getNtfToken = asks (ntfTkn . ntfSupervisor) >>= readTVarIO

-- | Store the given token in the supervisor's token 'TVar' ('ntfTkn').
nsUpdateToken :: NtfSupervisor -> NtfToken -> STM ()
nsUpdateToken ns tkn = writeTVar (ntfTkn ns) $ Just tkn

-- | Clear the supervisor's token 'TVar' ('ntfTkn') by writing 'Nothing'.
nsRemoveNtfToken :: NtfSupervisor -> STM ()
nsRemoveNtfToken ns = writeTVar (ntfTkn ns) Nothing

-- | Enqueue a supervisor command into 'ntfSubQ', but only when
-- 'hasInstantNotifications' holds for the supervisor; otherwise the command
-- is dropped.
--
-- The check and the queue write are two separate steps, not one STM
-- transaction.
sendNtfSubCommand :: NtfSupervisor -> (NtfSupervisorCommand, NonEmpty ConnId) -> IO ()
sendNtfSubCommand ns cmd =
  whenM (hasInstantNotifications ns) $ atomically $ writeTBQueue (ntfSubQ ns) cmd

-- | Whether the supervisor currently holds a token for which
-- 'instantNotifications' is 'True'. Returns 'False' when there is no token.
hasInstantNotifications :: NtfSupervisor -> IO Bool
hasInstantNotifications ns = maybe False instantNotifications <$> readTVarIO (ntfTkn ns)

-- | 'True' only for a token whose status is 'NTActive' and whose mode is
-- 'NMInstant'; 'False' for every other token.
instantNotifications :: NtfToken -> Bool
instantNotifications NtfToken {ntfTknStatus = NTActive, ntfMode = NMInstant} = True
instantNotifications _ = False
{-# INLINE instantNotifications #-}

-- | Remove a notification token locally and schedule its deletion.
--
-- In a single store action the token is removed with 'removeNtfToken' and,
-- if it has a token id, recorded with 'addNtfTokenToDelete'. The token is
-- then cleared from the supervisor ('nsRemoveNtfToken'). When a deletion was
-- recorded, 'getNtfTknDelWorker' is called for the token's server and its
-- result is discarded.
deleteToken :: AgentClient -> NtfToken -> AM ()
deleteToken c tkn@NtfToken {ntfServer, ntfTokenId, ntfPrivKey} = do
  setToDelete <- withStore' c $ \db -> do
    removeNtfToken db tkn
    case ntfTokenId of
      Just tknId -> addNtfTokenToDelete db ntfServer ntfPrivKey tknId $> True
      -- no token id, so nothing is recorded for deletion
      Nothing -> pure False
  ns <- asks ntfSupervisor
  atomically $ nsRemoveNtfToken ns
  when setToDelete $ void $ lift $ getNtfTknDelWorker True c ntfServer

-- | Worker loop that processes notification tokens queued for deletion on
-- the given notification server.
--
-- Every iteration blocks on the worker's @doWork@ flag and then runs one
-- deletion pass inside 'agentOperationBracket' for 'AONtfNetwork'.
runNtfTknDelWorker :: AgentClient -> NtfServer -> Worker -> AM ()
runNtfTknDelWorker c srv Worker {doWork} = forever $ do
  waitForWork doWork
  ExceptT . agentOperationBracket c AONtfNetwork throwWhenInactive $ runExceptT deletionPass
  where
    -- Fetch the next queued token for this server (if any) and process it.
    deletionPass :: AM ()
    deletionPass =
      withWork c doWork (\db -> getNextNtfTokenToDelete db srv) deleteWithRetry

    -- Attempt the deletion under the configured reconnect retry interval.
    deleteWithRetry :: NtfTokenToDelete -> AM ()
    deleteWithRetry tkn = do
      logInfo $ "runNtfTknDelWorker, nextTknToDelete " <> tshow tkn
      interval <- asks (reconnectInterval . config)
      withRetryInterval interval $ \_ retry -> do
        liftIO (waitWhileSuspended c)
        liftIO (waitForUserNetwork c)
        catchAllErrors (deleteOnServer tkn) (onFailure retry tkn)

    -- Log the error, then either retry (temporary or host error) or drop the
    -- queued record and report the error via 'notifyInternalError''.
    onFailure :: AM () -> NtfTokenToDelete -> AgentErrorType -> AM ()
    onFailure retry (tknDbId, _, _) err = do
      logError $ "ntf tkn del error: " <> tshow err
      recover
      where
        recover
          | temporaryOrHostError err = retryNetworkLoop c retry
          | otherwise = do
              dropQueued tknDbId
              notifyInternalError' c (show err)

    -- Delete the token on the server, then remove its queued record.
    deleteOnServer :: NtfTokenToDelete -> AM ()
    deleteOnServer (tknDbId, ntfPrivKey, tknId) = do
      agentNtfDeleteToken c NRMBackground srv ntfPrivKey tknId
      dropQueued tknDbId

    -- Remove the queued "token to delete" record from the store.
    dropQueued tknDbId = withStore' c (`deleteNtfTokenToDelete` tknDbId)

-- | Stop all workers held by the supervisor.
--
-- Each of the three worker maps ('ntfWorkers', 'ntfSMPWorkers',
-- 'ntfTknDelWorkers') is atomically swapped for an empty map, and every
-- worker that was in it is passed to 'cancelWorker'.
closeNtfSupervisor :: NtfSupervisor -> IO ()
closeNtfSupervisor ns = do
  stopWorkers (ntfWorkers ns)
  stopWorkers (ntfSMPWorkers ns)
  stopWorkers (ntfTknDelWorkers ns)
  where
    -- Empty the map first so the cancelled workers are no longer reachable
    -- through it, then cancel whatever it contained.
    stopWorkers workersVar = do
      stopped <- atomically $ swapTVar workersVar M.empty
      forM_ stopped $ liftIO . cancelWorker

-- | Choose a notification server from the client's configured list.
--
-- Returns 'Nothing' when the list is empty and the only server when there is
-- exactly one. Otherwise an index is drawn with 'randomR' from the
-- @randomServer@ generator held in the environment, and the generator is
-- updated in the same STM transaction.
getNtfServer :: AgentClient -> AM' (Maybe NtfServer)
getNtfServer c =
  readTVarIO (ntfServers c) >>= \case
    [] -> pure Nothing
    [srv] -> pure (Just srv)
    srvs -> do
      gen <- asks randomServer
      atomically $ stateTVar gen (pickFrom srvs)
  where
    -- The index is drawn from [0, length - 1], so '!!' stays within bounds.
    pickFrom srvs g =
      let (i, g') = randomR (0, length srvs - 1) g
       in (Just (srvs !! i), g')