{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}
module Simplex.Messaging.Client.Agent
( SMPClientAgent (..),
SMPClientAgentConfig (..),
SMPClientAgentEvent (..),
OwnServer,
defaultSMPClientAgentConfig,
newSMPClientAgent,
getSMPServerClient'',
getConnectedSMPServerClient,
closeSMPClientAgent,
lookupSMPServerClient,
isOwnServer,
subscribeServiceNtfs,
subscribeQueuesNtfs,
activeClientSession',
removeActiveSub,
removeActiveSubs,
removePendingSub,
removePendingSubs,
)
where
import Control.Concurrent (forkIO)
import Control.Concurrent.Async (Async, uninterruptibleCancel)
import Control.Concurrent.STM (retry)
import Control.Logger.Simple
import Control.Monad
import Control.Monad.Except
import Control.Monad.IO.Unlift
import Control.Monad.Trans.Except
import Crypto.Random (ChaChaDRG)
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import Data.Constraint (Dict (..))
import Data.Int (Int64)
import Data.List.NonEmpty (NonEmpty)
import qualified Data.List.NonEmpty as L
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as M
import Data.Maybe (isJust, isNothing)
import qualified Data.Set as S
import Data.Text.Encoding
import Data.Time.Clock (NominalDiffTime, UTCTime, addUTCTime, getCurrentTime)
import Numeric.Natural
import Simplex.Messaging.Agent.RetryInterval
import Simplex.Messaging.Client
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Protocol
( BrokerMsg,
ErrorType,
NotifierId,
NtfPrivateAuthKey,
Party (..),
PartyI,
ProtocolServer (..),
QueueId,
SMPServer,
SParty (..),
ServiceParty,
serviceParty,
partyServiceRole
)
import Simplex.Messaging.Session
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Transport
import Simplex.Messaging.Util (catchAll_, ifM, safeDecodeUtf8, toChunks, tshow, whenM, ($>>=), (<$$>))
import System.Timeout (timeout)
import UnliftIO (async)
import qualified UnliftIO.Exception as E
import UnliftIO.STM
type SMPClientVar = SessionVar (Either (SMPClientError, Maybe UTCTime) (OwnServer, SMPClient))
data SMPClientAgentEvent
= CAConnected SMPServer (Maybe ServiceId)
| CADisconnected SMPServer (NonEmpty QueueId)
| CASubscribed SMPServer (Maybe ServiceId) (NonEmpty QueueId)
| CASubError SMPServer (NonEmpty (QueueId, SMPClientError))
| CAServiceDisconnected SMPServer (ServiceId, Int64)
| CAServiceSubscribed SMPServer (ServiceId, Int64) Int64
| CAServiceSubError SMPServer (ServiceId, Int64) SMPClientError
| CAServiceUnavailable SMPServer (ServiceId, Int64)
data SMPClientAgentConfig = SMPClientAgentConfig
{ SMPClientAgentConfig -> ProtocolClientConfig SMPVersion
smpCfg :: ProtocolClientConfig SMPVersion,
SMPClientAgentConfig -> RetryInterval
reconnectInterval :: RetryInterval,
SMPClientAgentConfig -> NominalDiffTime
persistErrorInterval :: NominalDiffTime,
SMPClientAgentConfig -> Natural
msgQSize :: Natural,
SMPClientAgentConfig -> Natural
agentQSize :: Natural,
SMPClientAgentConfig -> Int
agentSubsBatchSize :: Int,
SMPClientAgentConfig -> [ByteString]
ownServerDomains :: [ByteString]
}
defaultSMPClientAgentConfig :: SMPClientAgentConfig
defaultSMPClientAgentConfig :: SMPClientAgentConfig
defaultSMPClientAgentConfig =
SMPClientAgentConfig
{ $sel:smpCfg:SMPClientAgentConfig :: ProtocolClientConfig SMPVersion
smpCfg = ProtocolClientConfig SMPVersion
defaultSMPClientConfig,
$sel:reconnectInterval:SMPClientAgentConfig :: RetryInterval
reconnectInterval =
RetryInterval
{ initialInterval :: Int64
initialInterval = Int64
second,
increaseAfter :: Int64
increaseAfter = Int64
10 Int64 -> Int64 -> Int64
forall a. Num a => a -> a -> a
* Int64
second,
maxInterval :: Int64
maxInterval = Int64
10 Int64 -> Int64 -> Int64
forall a. Num a => a -> a -> a
* Int64
second
},
$sel:persistErrorInterval:SMPClientAgentConfig :: NominalDiffTime
persistErrorInterval = NominalDiffTime
30,
$sel:msgQSize:SMPClientAgentConfig :: Natural
msgQSize = Natural
2048,
$sel:agentQSize:SMPClientAgentConfig :: Natural
agentQSize = Natural
2048,
$sel:agentSubsBatchSize:SMPClientAgentConfig :: Int
agentSubsBatchSize = Int
1360,
$sel:ownServerDomains:SMPClientAgentConfig :: [ByteString]
ownServerDomains = []
}
where
second :: Int64
second = Int64
1000000
data SMPClientAgent p = SMPClientAgent
{ forall (p :: Party). SMPClientAgent p -> SMPClientAgentConfig
agentCfg :: SMPClientAgentConfig,
forall (p :: Party). SMPClientAgent p -> SParty p
agentParty :: SParty p,
forall (p :: Party). SMPClientAgent p -> TVar Bool
active :: TVar Bool,
forall (p :: Party). SMPClientAgent p -> UTCTime
startedAt :: UTCTime,
forall (p :: Party).
SMPClientAgent p
-> TBQueue (ServerTransmissionBatch SMPVersion ErrorType BrokerMsg)
msgQ :: TBQueue (ServerTransmissionBatch SMPVersion ErrorType BrokerMsg),
forall (p :: Party).
SMPClientAgent p -> TBQueue SMPClientAgentEvent
agentQ :: TBQueue SMPClientAgentEvent,
forall (p :: Party). SMPClientAgent p -> TVar ChaChaDRG
randomDrg :: TVar ChaChaDRG,
forall (p :: Party).
SMPClientAgent p -> TMap SMPServer SMPClientVar
smpClients :: TMap SMPServer SMPClientVar,
forall (p :: Party).
SMPClientAgent p -> TMap ByteString (Bool, SMPClient)
smpSessions :: TMap SessionId (OwnServer, SMPClient),
forall (p :: Party).
SMPClientAgent p
-> TMap SMPServer (TVar (Maybe ((ServiceId, Int64), ByteString)))
activeServiceSubs :: TMap SMPServer (TVar (Maybe ((ServiceId, Int64), SessionId))),
forall (p :: Party).
SMPClientAgent p
-> TMap SMPServer (TMap ServiceId (ByteString, APrivateAuthKey))
activeQueueSubs :: TMap SMPServer (TMap QueueId (SessionId, C.APrivateAuthKey)),
forall (p :: Party).
SMPClientAgent p
-> TMap SMPServer (TVar (Maybe (ServiceId, Int64)))
pendingServiceSubs :: TMap SMPServer (TVar (Maybe (ServiceId, Int64))),
forall (p :: Party).
SMPClientAgent p -> TMap SMPServer (TMap ServiceId APrivateAuthKey)
pendingQueueSubs :: TMap SMPServer (TMap QueueId C.APrivateAuthKey),
forall (p :: Party).
SMPClientAgent p -> TMap SMPServer (SessionVar (Async ()))
smpSubWorkers :: TMap SMPServer (SessionVar (Async ())),
forall (p :: Party). SMPClientAgent p -> TVar Int
workerSeq :: TVar Int
}
type OwnServer = Bool
newSMPClientAgent :: SParty p -> SMPClientAgentConfig -> TVar ChaChaDRG -> IO (SMPClientAgent p)
newSMPClientAgent :: forall (p :: Party).
SParty p
-> SMPClientAgentConfig -> TVar ChaChaDRG -> IO (SMPClientAgent p)
newSMPClientAgent SParty p
agentParty agentCfg :: SMPClientAgentConfig
agentCfg@SMPClientAgentConfig {Natural
$sel:msgQSize:SMPClientAgentConfig :: SMPClientAgentConfig -> Natural
msgQSize :: Natural
msgQSize, Natural
$sel:agentQSize:SMPClientAgentConfig :: SMPClientAgentConfig -> Natural
agentQSize :: Natural
agentQSize} TVar ChaChaDRG
randomDrg = do
TVar Bool
active <- Bool -> IO (TVar Bool)
forall (m :: * -> *) a. MonadIO m => a -> m (TVar a)
newTVarIO Bool
True
UTCTime
startedAt <- IO UTCTime
getCurrentTime
TBQueue
((Int64, SMPServer, Maybe ByteString), Version SMPVersion,
ByteString,
NonEmpty (ServiceId, ServerTransmission ErrorType BrokerMsg))
msgQ <- Natural
-> IO
(TBQueue
((Int64, SMPServer, Maybe ByteString), Version SMPVersion,
ByteString,
NonEmpty (ServiceId, ServerTransmission ErrorType BrokerMsg)))
forall (m :: * -> *) a. MonadIO m => Natural -> m (TBQueue a)
newTBQueueIO Natural
msgQSize
TBQueue SMPClientAgentEvent
agentQ <- Natural -> IO (TBQueue SMPClientAgentEvent)
forall (m :: * -> *) a. MonadIO m => Natural -> m (TBQueue a)
newTBQueueIO Natural
agentQSize
TMap SMPServer SMPClientVar
smpClients <- IO (TMap SMPServer SMPClientVar)
forall k a. IO (TMap k a)
TM.emptyIO
TMap ByteString (Bool, SMPClient)
smpSessions <- IO (TMap ByteString (Bool, SMPClient))
forall k a. IO (TMap k a)
TM.emptyIO
TMap SMPServer (TVar (Maybe ((ServiceId, Int64), ByteString)))
activeServiceSubs <- IO (TMap SMPServer (TVar (Maybe ((ServiceId, Int64), ByteString))))
forall k a. IO (TMap k a)
TM.emptyIO
TMap SMPServer (TMap ServiceId (ByteString, APrivateAuthKey))
activeQueueSubs <- IO (TMap SMPServer (TMap ServiceId (ByteString, APrivateAuthKey)))
forall k a. IO (TMap k a)
TM.emptyIO
TMap SMPServer (TVar (Maybe (ServiceId, Int64)))
pendingServiceSubs <- IO (TMap SMPServer (TVar (Maybe (ServiceId, Int64))))
forall k a. IO (TMap k a)
TM.emptyIO
TMap SMPServer (TMap ServiceId APrivateAuthKey)
pendingQueueSubs <- IO (TMap SMPServer (TMap ServiceId APrivateAuthKey))
forall k a. IO (TMap k a)
TM.emptyIO
TMap SMPServer (SessionVar (Async ()))
smpSubWorkers <- IO (TMap SMPServer (SessionVar (Async ())))
forall k a. IO (TMap k a)
TM.emptyIO
TVar Int
workerSeq <- Int -> IO (TVar Int)
forall (m :: * -> *) a. MonadIO m => a -> m (TVar a)
newTVarIO Int
0
SMPClientAgent p -> IO (SMPClientAgent p)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure
SMPClientAgent
{ SMPClientAgentConfig
$sel:agentCfg:SMPClientAgent :: SMPClientAgentConfig
agentCfg :: SMPClientAgentConfig
agentCfg,
SParty p
$sel:agentParty:SMPClientAgent :: SParty p
agentParty :: SParty p
agentParty,
TVar Bool
$sel:active:SMPClientAgent :: TVar Bool
active :: TVar Bool
active,
UTCTime
$sel:startedAt:SMPClientAgent :: UTCTime
startedAt :: UTCTime
startedAt,
TBQueue (ServerTransmissionBatch SMPVersion ErrorType BrokerMsg)
TBQueue
((Int64, SMPServer, Maybe ByteString), Version SMPVersion,
ByteString,
NonEmpty (ServiceId, ServerTransmission ErrorType BrokerMsg))
$sel:msgQ:SMPClientAgent :: TBQueue (ServerTransmissionBatch SMPVersion ErrorType BrokerMsg)
msgQ :: TBQueue
((Int64, SMPServer, Maybe ByteString), Version SMPVersion,
ByteString,
NonEmpty (ServiceId, ServerTransmission ErrorType BrokerMsg))
msgQ,
TBQueue SMPClientAgentEvent
$sel:agentQ:SMPClientAgent :: TBQueue SMPClientAgentEvent
agentQ :: TBQueue SMPClientAgentEvent
agentQ,
TVar ChaChaDRG
$sel:randomDrg:SMPClientAgent :: TVar ChaChaDRG
randomDrg :: TVar ChaChaDRG
randomDrg,
TMap SMPServer SMPClientVar
$sel:smpClients:SMPClientAgent :: TMap SMPServer SMPClientVar
smpClients :: TMap SMPServer SMPClientVar
smpClients,
TMap ByteString (Bool, SMPClient)
$sel:smpSessions:SMPClientAgent :: TMap ByteString (Bool, SMPClient)
smpSessions :: TMap ByteString (Bool, SMPClient)
smpSessions,
TMap SMPServer (TVar (Maybe ((ServiceId, Int64), ByteString)))
$sel:activeServiceSubs:SMPClientAgent :: TMap SMPServer (TVar (Maybe ((ServiceId, Int64), ByteString)))
activeServiceSubs :: TMap SMPServer (TVar (Maybe ((ServiceId, Int64), ByteString)))
activeServiceSubs,
TMap SMPServer (TMap ServiceId (ByteString, APrivateAuthKey))
$sel:activeQueueSubs:SMPClientAgent :: TMap SMPServer (TMap ServiceId (ByteString, APrivateAuthKey))
activeQueueSubs :: TMap SMPServer (TMap ServiceId (ByteString, APrivateAuthKey))
activeQueueSubs,
TMap SMPServer (TVar (Maybe (ServiceId, Int64)))
$sel:pendingServiceSubs:SMPClientAgent :: TMap SMPServer (TVar (Maybe (ServiceId, Int64)))
pendingServiceSubs :: TMap SMPServer (TVar (Maybe (ServiceId, Int64)))
pendingServiceSubs,
TMap SMPServer (TMap ServiceId APrivateAuthKey)
$sel:pendingQueueSubs:SMPClientAgent :: TMap SMPServer (TMap ServiceId APrivateAuthKey)
pendingQueueSubs :: TMap SMPServer (TMap ServiceId APrivateAuthKey)
pendingQueueSubs,
TMap SMPServer (SessionVar (Async ()))
$sel:smpSubWorkers:SMPClientAgent :: TMap SMPServer (SessionVar (Async ()))
smpSubWorkers :: TMap SMPServer (SessionVar (Async ()))
smpSubWorkers,
TVar Int
$sel:workerSeq:SMPClientAgent :: TVar Int
workerSeq :: TVar Int
workerSeq
}
getSMPServerClient' :: SMPClientAgent p -> SMPServer -> ExceptT SMPClientError IO SMPClient
getSMPServerClient' :: forall (p :: Party).
SMPClientAgent p
-> SMPServer -> ExceptT SMPClientError IO SMPClient
getSMPServerClient' SMPClientAgent p
ca SMPServer
srv = (Bool, SMPClient) -> SMPClient
forall a b. (a, b) -> b
snd ((Bool, SMPClient) -> SMPClient)
-> ExceptT SMPClientError IO (Bool, SMPClient)
-> ExceptT SMPClientError IO SMPClient
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> SMPClientAgent p
-> SMPServer -> ExceptT SMPClientError IO (Bool, SMPClient)
forall (p :: Party).
SMPClientAgent p
-> SMPServer -> ExceptT SMPClientError IO (Bool, SMPClient)
getSMPServerClient'' SMPClientAgent p
ca SMPServer
srv
{-# INLINE getSMPServerClient' #-}
getSMPServerClient'' :: SMPClientAgent p -> SMPServer -> ExceptT SMPClientError IO (OwnServer, SMPClient)
getSMPServerClient'' :: forall (p :: Party).
SMPClientAgent p
-> SMPServer -> ExceptT SMPClientError IO (Bool, SMPClient)
getSMPServerClient'' ca :: SMPClientAgent p
ca@SMPClientAgent {SMPClientAgentConfig
$sel:agentCfg:SMPClientAgent :: forall (p :: Party). SMPClientAgent p -> SMPClientAgentConfig
agentCfg :: SMPClientAgentConfig
agentCfg, TMap SMPServer SMPClientVar
$sel:smpClients:SMPClientAgent :: forall (p :: Party).
SMPClientAgent p -> TMap SMPServer SMPClientVar
smpClients :: TMap SMPServer SMPClientVar
smpClients, TMap ByteString (Bool, SMPClient)
$sel:smpSessions:SMPClientAgent :: forall (p :: Party).
SMPClientAgent p -> TMap ByteString (Bool, SMPClient)
smpSessions :: TMap ByteString (Bool, SMPClient)
smpSessions, TVar Int
$sel:workerSeq:SMPClientAgent :: forall (p :: Party). SMPClientAgent p -> TVar Int
workerSeq :: TVar Int
workerSeq} SMPServer
srv = do
UTCTime
ts <- IO UTCTime -> ExceptT SMPClientError IO UTCTime
forall a. IO a -> ExceptT SMPClientError IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO UTCTime
getCurrentTime
STM (Either SMPClientVar SMPClientVar)
-> ExceptT SMPClientError IO (Either SMPClientVar SMPClientVar)
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (UTCTime -> STM (Either SMPClientVar SMPClientVar)
getClientVar UTCTime
ts) ExceptT SMPClientError IO (Either SMPClientVar SMPClientVar)
-> (Either SMPClientVar SMPClientVar
-> ExceptT SMPClientError IO (Bool, SMPClient))
-> ExceptT SMPClientError IO (Bool, SMPClient)
forall a b.
ExceptT SMPClientError IO a
-> (a -> ExceptT SMPClientError IO b)
-> ExceptT SMPClientError IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (SMPClientVar -> ExceptT SMPClientError IO (Bool, SMPClient))
-> (SMPClientVar -> ExceptT SMPClientError IO (Bool, SMPClient))
-> Either SMPClientVar SMPClientVar
-> ExceptT SMPClientError IO (Bool, SMPClient)
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either (IO (Either SMPClientError (Bool, SMPClient))
-> ExceptT SMPClientError IO (Bool, SMPClient)
forall e (m :: * -> *) a. m (Either e a) -> ExceptT e m a
ExceptT (IO (Either SMPClientError (Bool, SMPClient))
-> ExceptT SMPClientError IO (Bool, SMPClient))
-> (SMPClientVar -> IO (Either SMPClientError (Bool, SMPClient)))
-> SMPClientVar
-> ExceptT SMPClientError IO (Bool, SMPClient)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SMPClientVar -> IO (Either SMPClientError (Bool, SMPClient))
newSMPClient) SMPClientVar -> ExceptT SMPClientError IO (Bool, SMPClient)
waitForSMPClient
where
getClientVar :: UTCTime -> STM (Either SMPClientVar SMPClientVar)
getClientVar :: UTCTime -> STM (Either SMPClientVar SMPClientVar)
getClientVar = TVar Int
-> SMPServer
-> TMap SMPServer SMPClientVar
-> UTCTime
-> STM (Either SMPClientVar SMPClientVar)
forall k a.
Ord k =>
TVar Int
-> k
-> TMap k (SessionVar a)
-> UTCTime
-> STM (Either (SessionVar a) (SessionVar a))
getSessVar TVar Int
workerSeq SMPServer
srv TMap SMPServer SMPClientVar
smpClients
waitForSMPClient :: SMPClientVar -> ExceptT SMPClientError IO (OwnServer, SMPClient)
waitForSMPClient :: SMPClientVar -> ExceptT SMPClientError IO (Bool, SMPClient)
waitForSMPClient SMPClientVar
v = do
let ProtocolClientConfig {$sel:networkConfig:ProtocolClientConfig :: forall v. ProtocolClientConfig v -> NetworkConfig
networkConfig = NetworkConfig {NetworkTimeout
tcpConnectTimeout :: NetworkTimeout
$sel:tcpConnectTimeout:NetworkConfig :: NetworkConfig -> NetworkTimeout
tcpConnectTimeout}} = SMPClientAgentConfig -> ProtocolClientConfig SMPVersion
smpCfg SMPClientAgentConfig
agentCfg
Maybe (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient))
smpClient_ <- IO
(Maybe (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient)))
-> ExceptT
SMPClientError
IO
(Maybe (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient)))
forall a. IO a -> ExceptT SMPClientError IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO
(Maybe (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient)))
-> ExceptT
SMPClientError
IO
(Maybe (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient))))
-> IO
(Maybe (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient)))
-> ExceptT
SMPClientError
IO
(Maybe (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient)))
forall a b. (a -> b) -> a -> b
$ NetworkTimeout -> NetworkRequestMode -> Int
netTimeoutInt NetworkTimeout
tcpConnectTimeout NetworkRequestMode
NRMBackground Int
-> IO (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient))
-> IO
(Maybe (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient)))
forall a. Int -> IO a -> IO (Maybe a)
`timeout` STM (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient))
-> IO (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient))
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (TMVar (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient))
-> STM (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient))
forall a. TMVar a -> STM a
readTMVar (TMVar (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient))
-> STM (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient)))
-> TMVar (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient))
-> STM (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient))
forall a b. (a -> b) -> a -> b
$ SMPClientVar
-> TMVar (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient))
forall a. SessionVar a -> TMVar a
sessionVar SMPClientVar
v)
case Maybe (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient))
smpClient_ of
Just (Right (Bool, SMPClient)
smpClient) -> (Bool, SMPClient) -> ExceptT SMPClientError IO (Bool, SMPClient)
forall a. a -> ExceptT SMPClientError IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Bool, SMPClient)
smpClient
Just (Left (SMPClientError
e, Maybe UTCTime
ts_)) -> case Maybe UTCTime
ts_ of
Maybe UTCTime
Nothing -> SMPClientError -> ExceptT SMPClientError IO (Bool, SMPClient)
forall (m :: * -> *) e a. Monad m => e -> ExceptT e m a
throwE SMPClientError
e
Just UTCTime
ts ->
ExceptT SMPClientError IO Bool
-> ExceptT SMPClientError IO (Bool, SMPClient)
-> ExceptT SMPClientError IO (Bool, SMPClient)
-> ExceptT SMPClientError IO (Bool, SMPClient)
forall (m :: * -> *) a. Monad m => m Bool -> m a -> m a -> m a
ifM
((UTCTime
ts UTCTime -> UTCTime -> Bool
forall a. Ord a => a -> a -> Bool
<) (UTCTime -> Bool)
-> ExceptT SMPClientError IO UTCTime
-> ExceptT SMPClientError IO Bool
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO UTCTime -> ExceptT SMPClientError IO UTCTime
forall a. IO a -> ExceptT SMPClientError IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO UTCTime
getCurrentTime)
(STM () -> ExceptT SMPClientError IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (SMPClientVar -> SMPServer -> TMap SMPServer SMPClientVar -> STM ()
forall k a.
Ord k =>
SessionVar a -> k -> TMap k (SessionVar a) -> STM ()
removeSessVar SMPClientVar
v SMPServer
srv TMap SMPServer SMPClientVar
smpClients) ExceptT SMPClientError IO ()
-> ExceptT SMPClientError IO (Bool, SMPClient)
-> ExceptT SMPClientError IO (Bool, SMPClient)
forall a b.
ExceptT SMPClientError IO a
-> ExceptT SMPClientError IO b -> ExceptT SMPClientError IO b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> SMPClientAgent p
-> SMPServer -> ExceptT SMPClientError IO (Bool, SMPClient)
forall (p :: Party).
SMPClientAgent p
-> SMPServer -> ExceptT SMPClientError IO (Bool, SMPClient)
getSMPServerClient'' SMPClientAgent p
ca SMPServer
srv)
(SMPClientError -> ExceptT SMPClientError IO (Bool, SMPClient)
forall (m :: * -> *) e a. Monad m => e -> ExceptT e m a
throwE SMPClientError
e)
Maybe (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient))
Nothing -> SMPClientError -> ExceptT SMPClientError IO (Bool, SMPClient)
forall (m :: * -> *) e a. Monad m => e -> ExceptT e m a
throwE SMPClientError
forall err. ProtocolClientError err
PCEResponseTimeout
newSMPClient :: SMPClientVar -> IO (Either SMPClientError (OwnServer, SMPClient))
newSMPClient :: SMPClientVar -> IO (Either SMPClientError (Bool, SMPClient))
newSMPClient SMPClientVar
v = do
Either SMPClientError SMPClient
r <- SMPClientAgent p
-> SMPServer
-> SMPClientVar
-> IO (Either SMPClientError SMPClient)
forall (p :: Party).
SMPClientAgent p
-> SMPServer
-> SMPClientVar
-> IO (Either SMPClientError SMPClient)
connectClient SMPClientAgent p
ca SMPServer
srv SMPClientVar
v IO (Either SMPClientError SMPClient)
-> (IOException -> IO (Either SMPClientError SMPClient))
-> IO (Either SMPClientError SMPClient)
forall (m :: * -> *) e a.
(MonadUnliftIO m, Exception e) =>
m a -> (e -> m a) -> m a
`E.catch` (Either SMPClientError SMPClient
-> IO (Either SMPClientError SMPClient)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either SMPClientError SMPClient
-> IO (Either SMPClientError SMPClient))
-> (IOException -> Either SMPClientError SMPClient)
-> IOException
-> IO (Either SMPClientError SMPClient)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SMPClientError -> Either SMPClientError SMPClient
forall a b. a -> Either a b
Left (SMPClientError -> Either SMPClientError SMPClient)
-> (IOException -> SMPClientError)
-> IOException
-> Either SMPClientError SMPClient
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IOException -> SMPClientError
forall err. IOException -> ProtocolClientError err
PCEIOError)
case Either SMPClientError SMPClient
r of
Right SMPClient
smp -> do
Text -> IO ()
forall (m :: * -> *).
(?callStack::CallStack, MonadIO m) =>
Text -> m ()
logInfo (Text -> IO ()) -> (ByteString -> Text) -> ByteString -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Text
decodeUtf8 (ByteString -> IO ()) -> ByteString -> IO ()
forall a b. (a -> b) -> a -> b
$ ByteString
"Agent connected to " ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> SMPServer -> ByteString
showServer SMPServer
srv
let !owned :: Bool
owned = SMPClientAgent p -> SMPServer -> Bool
forall (p :: Party). SMPClientAgent p -> SMPServer -> Bool
isOwnServer SMPClientAgent p
ca SMPServer
srv
!c :: (Bool, SMPClient)
c = (Bool
owned, SMPClient
smp)
STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
TMVar (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient))
-> Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient)
-> STM ()
forall a. TMVar a -> a -> STM ()
putTMVar (SMPClientVar
-> TMVar (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient))
forall a. SessionVar a -> TMVar a
sessionVar SMPClientVar
v) ((Bool, SMPClient)
-> Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient)
forall a b. b -> Either a b
Right (Bool, SMPClient)
c)
ByteString
-> (Bool, SMPClient) -> TMap ByteString (Bool, SMPClient) -> STM ()
forall k a. Ord k => k -> a -> TMap k a -> STM ()
TM.insert (THandleParams SMPVersion 'TClient -> ByteString
forall v (p :: TransportPeer). THandleParams v p -> ByteString
sessionId (THandleParams SMPVersion 'TClient -> ByteString)
-> THandleParams SMPVersion 'TClient -> ByteString
forall a b. (a -> b) -> a -> b
$ SMPClient -> THandleParams SMPVersion 'TClient
forall v err msg.
ProtocolClient v err msg -> THandleParams v 'TClient
thParams SMPClient
smp) (Bool, SMPClient)
c TMap ByteString (Bool, SMPClient)
smpSessions
let serviceId_ :: Maybe ServiceId
serviceId_ = (\THClientService {ServiceId
serviceId :: ServiceId
$sel:serviceId:THClientService :: forall k. THClientService' k -> ServiceId
serviceId} -> ServiceId
serviceId) (THClientService' PrivateKeyEd25519 -> ServiceId)
-> Maybe (THClientService' PrivateKeyEd25519) -> Maybe ServiceId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> SMPClient -> Maybe (THClientService' PrivateKeyEd25519)
smpClientService SMPClient
smp
SMPClientAgent p -> SMPClientAgentEvent -> IO ()
forall (m :: * -> *) (p :: Party).
MonadIO m =>
SMPClientAgent p -> SMPClientAgentEvent -> m ()
notify SMPClientAgent p
ca (SMPClientAgentEvent -> IO ()) -> SMPClientAgentEvent -> IO ()
forall a b. (a -> b) -> a -> b
$ SMPServer -> Maybe ServiceId -> SMPClientAgentEvent
CAConnected SMPServer
srv Maybe ServiceId
serviceId_
Either SMPClientError (Bool, SMPClient)
-> IO (Either SMPClientError (Bool, SMPClient))
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either SMPClientError (Bool, SMPClient)
-> IO (Either SMPClientError (Bool, SMPClient)))
-> Either SMPClientError (Bool, SMPClient)
-> IO (Either SMPClientError (Bool, SMPClient))
forall a b. (a -> b) -> a -> b
$ (Bool, SMPClient) -> Either SMPClientError (Bool, SMPClient)
forall a b. b -> Either a b
Right (Bool, SMPClient)
c
Left SMPClientError
e -> do
let ei :: NominalDiffTime
ei = SMPClientAgentConfig -> NominalDiffTime
persistErrorInterval SMPClientAgentConfig
agentCfg
if NominalDiffTime
ei NominalDiffTime -> NominalDiffTime -> Bool
forall a. Eq a => a -> a -> Bool
== NominalDiffTime
0
then STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
TMVar (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient))
-> Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient)
-> STM ()
forall a. TMVar a -> a -> STM ()
putTMVar (SMPClientVar
-> TMVar (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient))
forall a. SessionVar a -> TMVar a
sessionVar SMPClientVar
v) ((SMPClientError, Maybe UTCTime)
-> Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient)
forall a b. a -> Either a b
Left (SMPClientError
e, Maybe UTCTime
forall a. Maybe a
Nothing))
SMPClientVar -> SMPServer -> TMap SMPServer SMPClientVar -> STM ()
forall k a.
Ord k =>
SessionVar a -> k -> TMap k (SessionVar a) -> STM ()
removeSessVar SMPClientVar
v SMPServer
srv TMap SMPServer SMPClientVar
smpClients
else do
UTCTime
ts <- NominalDiffTime -> UTCTime -> UTCTime
addUTCTime NominalDiffTime
ei (UTCTime -> UTCTime) -> IO UTCTime -> IO UTCTime
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO UTCTime -> IO UTCTime
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO UTCTime
getCurrentTime
STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMVar (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient))
-> Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient)
-> STM ()
forall a. TMVar a -> a -> STM ()
putTMVar (SMPClientVar
-> TMVar (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient))
forall a. SessionVar a -> TMVar a
sessionVar SMPClientVar
v) ((SMPClientError, Maybe UTCTime)
-> Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient)
forall a b. a -> Either a b
Left (SMPClientError
e, UTCTime -> Maybe UTCTime
forall a. a -> Maybe a
Just UTCTime
ts))
SMPClientAgent p -> SMPServer -> IO ()
forall (p :: Party). SMPClientAgent p -> SMPServer -> IO ()
reconnectClient SMPClientAgent p
ca SMPServer
srv
Either SMPClientError (Bool, SMPClient)
-> IO (Either SMPClientError (Bool, SMPClient))
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either SMPClientError (Bool, SMPClient)
-> IO (Either SMPClientError (Bool, SMPClient)))
-> Either SMPClientError (Bool, SMPClient)
-> IO (Either SMPClientError (Bool, SMPClient))
forall a b. (a -> b) -> a -> b
$ SMPClientError -> Either SMPClientError (Bool, SMPClient)
forall a b. a -> Either a b
Left SMPClientError
e
isOwnServer :: SMPClientAgent p -> SMPServer -> OwnServer
isOwnServer :: forall (p :: Party). SMPClientAgent p -> SMPServer -> Bool
isOwnServer SMPClientAgent {SMPClientAgentConfig
$sel:agentCfg:SMPClientAgent :: forall (p :: Party). SMPClientAgent p -> SMPClientAgentConfig
agentCfg :: SMPClientAgentConfig
agentCfg} ProtocolServer {NonEmpty TransportHost
host :: NonEmpty TransportHost
$sel:host:ProtocolServer :: forall (p :: ProtocolType).
ProtocolServer p -> NonEmpty TransportHost
host} =
let srv :: ByteString
srv = TransportHost -> ByteString
forall a. StrEncoding a => a -> ByteString
strEncode (TransportHost -> ByteString) -> TransportHost -> ByteString
forall a b. (a -> b) -> a -> b
$ NonEmpty TransportHost -> TransportHost
forall a. NonEmpty a -> a
L.head NonEmpty TransportHost
host
in (ByteString -> Bool) -> [ByteString] -> Bool
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Bool
any (\ByteString
s -> ByteString
s ByteString -> ByteString -> Bool
forall a. Eq a => a -> a -> Bool
== ByteString
srv Bool -> Bool -> Bool
|| Char -> ByteString -> ByteString
B.cons Char
'.' ByteString
s ByteString -> ByteString -> Bool
`B.isSuffixOf` ByteString
srv) (SMPClientAgentConfig -> [ByteString]
ownServerDomains SMPClientAgentConfig
agentCfg)
connectClient :: SMPClientAgent p -> SMPServer -> SMPClientVar -> IO (Either SMPClientError SMPClient)
connectClient :: forall (p :: Party).
SMPClientAgent p
-> SMPServer
-> SMPClientVar
-> IO (Either SMPClientError SMPClient)
connectClient ca :: SMPClientAgent p
ca@SMPClientAgent {SMPClientAgentConfig
$sel:agentCfg:SMPClientAgent :: forall (p :: Party). SMPClientAgent p -> SMPClientAgentConfig
agentCfg :: SMPClientAgentConfig
agentCfg, TMap SMPServer SMPClientVar
$sel:smpClients:SMPClientAgent :: forall (p :: Party).
SMPClientAgent p -> TMap SMPServer SMPClientVar
smpClients :: TMap SMPServer SMPClientVar
smpClients, TMap ByteString (Bool, SMPClient)
$sel:smpSessions:SMPClientAgent :: forall (p :: Party).
SMPClientAgent p -> TMap ByteString (Bool, SMPClient)
smpSessions :: TMap ByteString (Bool, SMPClient)
smpSessions, TBQueue (ServerTransmissionBatch SMPVersion ErrorType BrokerMsg)
$sel:msgQ:SMPClientAgent :: forall (p :: Party).
SMPClientAgent p
-> TBQueue (ServerTransmissionBatch SMPVersion ErrorType BrokerMsg)
msgQ :: TBQueue (ServerTransmissionBatch SMPVersion ErrorType BrokerMsg)
msgQ, TVar ChaChaDRG
$sel:randomDrg:SMPClientAgent :: forall (p :: Party). SMPClientAgent p -> TVar ChaChaDRG
randomDrg :: TVar ChaChaDRG
randomDrg, UTCTime
$sel:startedAt:SMPClientAgent :: forall (p :: Party). SMPClientAgent p -> UTCTime
startedAt :: UTCTime
startedAt} SMPServer
srv SMPClientVar
v =
TVar ChaChaDRG
-> NetworkRequestMode
-> TransportSession BrokerMsg
-> ProtocolClientConfig SMPVersion
-> [HostName]
-> Maybe
(TBQueue (ServerTransmissionBatch SMPVersion ErrorType BrokerMsg))
-> UTCTime
-> (SMPClient -> IO ())
-> IO (Either SMPClientError SMPClient)
forall v err msg.
Protocol v err msg =>
TVar ChaChaDRG
-> NetworkRequestMode
-> TransportSession msg
-> ProtocolClientConfig v
-> [HostName]
-> Maybe (TBQueue (ServerTransmissionBatch v err msg))
-> UTCTime
-> (ProtocolClient v err msg -> IO ())
-> IO (Either (ProtocolClientError err) (ProtocolClient v err msg))
getProtocolClient TVar ChaChaDRG
randomDrg NetworkRequestMode
NRMBackground (Int64
1, ProtocolServer (ProtoType BrokerMsg)
SMPServer
srv, Maybe ByteString
forall a. Maybe a
Nothing) (SMPClientAgentConfig -> ProtocolClientConfig SMPVersion
smpCfg SMPClientAgentConfig
agentCfg) [] (TBQueue
((Int64, SMPServer, Maybe ByteString), Version SMPVersion,
ByteString,
NonEmpty (ServiceId, ServerTransmission ErrorType BrokerMsg))
-> Maybe
(TBQueue
((Int64, SMPServer, Maybe ByteString), Version SMPVersion,
ByteString,
NonEmpty (ServiceId, ServerTransmission ErrorType BrokerMsg)))
forall a. a -> Maybe a
Just TBQueue (ServerTransmissionBatch SMPVersion ErrorType BrokerMsg)
TBQueue
((Int64, SMPServer, Maybe ByteString), Version SMPVersion,
ByteString,
NonEmpty (ServiceId, ServerTransmission ErrorType BrokerMsg))
msgQ) UTCTime
startedAt SMPClient -> IO ()
clientDisconnected
where
clientDisconnected :: SMPClient -> IO ()
clientDisconnected :: SMPClient -> IO ()
clientDisconnected SMPClient
smp = do
SMPClient
-> IO
(Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
removeClientAndSubs SMPClient
smp IO
(Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
-> ((Maybe (ServiceId, Int64),
Maybe (Map ServiceId APrivateAuthKey))
-> IO ())
-> IO ()
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
-> IO ()
serverDown
Text -> IO ()
forall (m :: * -> *).
(?callStack::CallStack, MonadIO m) =>
Text -> m ()
logInfo (Text -> IO ()) -> (ByteString -> Text) -> ByteString -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Text
decodeUtf8 (ByteString -> IO ()) -> ByteString -> IO ()
forall a b. (a -> b) -> a -> b
$ ByteString
"Agent disconnected from " ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> SMPServer -> ByteString
showServer SMPServer
srv
removeClientAndSubs :: SMPClient -> IO (Maybe (ServiceId, Int64), Maybe (Map QueueId C.APrivateAuthKey))
removeClientAndSubs :: SMPClient
-> IO
(Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
removeClientAndSubs SMPClient
smp = do
Maybe (TVar (Maybe ((ServiceId, Int64), ByteString)))
sVar_ <- SMPServer
-> TMap SMPServer (TVar (Maybe ((ServiceId, Int64), ByteString)))
-> IO (Maybe (TVar (Maybe ((ServiceId, Int64), ByteString))))
forall k a. Ord k => k -> TMap k a -> IO (Maybe a)
TM.lookupIO SMPServer
srv (TMap SMPServer (TVar (Maybe ((ServiceId, Int64), ByteString)))
-> IO (Maybe (TVar (Maybe ((ServiceId, Int64), ByteString)))))
-> TMap SMPServer (TVar (Maybe ((ServiceId, Int64), ByteString)))
-> IO (Maybe (TVar (Maybe ((ServiceId, Int64), ByteString))))
forall a b. (a -> b) -> a -> b
$ SMPClientAgent p
-> TMap SMPServer (TVar (Maybe ((ServiceId, Int64), ByteString)))
forall (p :: Party).
SMPClientAgent p
-> TMap SMPServer (TVar (Maybe ((ServiceId, Int64), ByteString)))
activeServiceSubs SMPClientAgent p
ca
Maybe (TMap ServiceId (ByteString, APrivateAuthKey))
qVar_ <- SMPServer
-> TMap SMPServer (TMap ServiceId (ByteString, APrivateAuthKey))
-> IO (Maybe (TMap ServiceId (ByteString, APrivateAuthKey)))
forall k a. Ord k => k -> TMap k a -> IO (Maybe a)
TM.lookupIO SMPServer
srv (TMap SMPServer (TMap ServiceId (ByteString, APrivateAuthKey))
-> IO (Maybe (TMap ServiceId (ByteString, APrivateAuthKey))))
-> TMap SMPServer (TMap ServiceId (ByteString, APrivateAuthKey))
-> IO (Maybe (TMap ServiceId (ByteString, APrivateAuthKey)))
forall a b. (a -> b) -> a -> b
$ SMPClientAgent p
-> TMap SMPServer (TMap ServiceId (ByteString, APrivateAuthKey))
forall (p :: Party).
SMPClientAgent p
-> TMap SMPServer (TMap ServiceId (ByteString, APrivateAuthKey))
activeQueueSubs SMPClientAgent p
ca
STM
(Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
-> IO
(Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM
(Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
-> IO
(Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey)))
-> STM
(Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
-> IO
(Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
forall a b. (a -> b) -> a -> b
$ do
ByteString -> TMap ByteString (Bool, SMPClient) -> STM ()
forall k a. Ord k => k -> TMap k a -> STM ()
TM.delete ByteString
sessId TMap ByteString (Bool, SMPClient)
smpSessions
SMPClientVar -> SMPServer -> TMap SMPServer SMPClientVar -> STM ()
forall k a.
Ord k =>
SessionVar a -> k -> TMap k (SessionVar a) -> STM ()
removeSessVar SMPClientVar
v SMPServer
srv TMap SMPServer SMPClientVar
smpClients
Maybe (ServiceId, Int64)
sSub <- Maybe (TVar (Maybe ((ServiceId, Int64), ByteString)))
-> STM (Maybe (TVar (Maybe ((ServiceId, Int64), ByteString))))
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe (TVar (Maybe ((ServiceId, Int64), ByteString)))
sVar_ STM (Maybe (TVar (Maybe ((ServiceId, Int64), ByteString))))
-> (TVar (Maybe ((ServiceId, Int64), ByteString))
-> STM (Maybe (ServiceId, Int64)))
-> STM (Maybe (ServiceId, Int64))
forall (m :: * -> *) (f :: * -> *) a b.
(Monad m, Monad f, Traversable f) =>
m (f a) -> (a -> m (f b)) -> m (f b)
$>>= TVar (Maybe ((ServiceId, Int64), ByteString))
-> STM (Maybe (ServiceId, Int64))
updateServiceSub
Maybe (Map ServiceId APrivateAuthKey)
qSubs <- Maybe (TMap ServiceId (ByteString, APrivateAuthKey))
-> STM (Maybe (TMap ServiceId (ByteString, APrivateAuthKey)))
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe (TMap ServiceId (ByteString, APrivateAuthKey))
qVar_ STM (Maybe (TMap ServiceId (ByteString, APrivateAuthKey)))
-> (TMap ServiceId (ByteString, APrivateAuthKey)
-> STM (Maybe (Map ServiceId APrivateAuthKey)))
-> STM (Maybe (Map ServiceId APrivateAuthKey))
forall (m :: * -> *) (f :: * -> *) a b.
(Monad m, Monad f, Traversable f) =>
m (f a) -> (a -> m (f b)) -> m (f b)
$>>= TMap ServiceId (ByteString, APrivateAuthKey)
-> STM (Maybe (Map ServiceId APrivateAuthKey))
updateQueueSubs
(Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
-> STM
(Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe (ServiceId, Int64)
sSub, Maybe (Map ServiceId APrivateAuthKey)
qSubs)
where
sessId :: ByteString
sessId = THandleParams SMPVersion 'TClient -> ByteString
forall v (p :: TransportPeer). THandleParams v p -> ByteString
sessionId (THandleParams SMPVersion 'TClient -> ByteString)
-> THandleParams SMPVersion 'TClient -> ByteString
forall a b. (a -> b) -> a -> b
$ SMPClient -> THandleParams SMPVersion 'TClient
forall v err msg.
ProtocolClient v err msg -> THandleParams v 'TClient
thParams SMPClient
smp
updateServiceSub :: TVar (Maybe ((ServiceId, Int64), ByteString))
-> STM (Maybe (ServiceId, Int64))
updateServiceSub TVar (Maybe ((ServiceId, Int64), ByteString))
sVar = do
Maybe (ServiceId, Int64)
serviceSub_ <- TVar (Maybe ((ServiceId, Int64), ByteString))
-> (Maybe ((ServiceId, Int64), ByteString)
-> (Maybe (ServiceId, Int64),
Maybe ((ServiceId, Int64), ByteString)))
-> STM (Maybe (ServiceId, Int64))
forall s a. TVar s -> (s -> (a, s)) -> STM a
stateTVar TVar (Maybe ((ServiceId, Int64), ByteString))
sVar ((Maybe ((ServiceId, Int64), ByteString)
-> (Maybe (ServiceId, Int64),
Maybe ((ServiceId, Int64), ByteString)))
-> STM (Maybe (ServiceId, Int64)))
-> (Maybe ((ServiceId, Int64), ByteString)
-> (Maybe (ServiceId, Int64),
Maybe ((ServiceId, Int64), ByteString)))
-> STM (Maybe (ServiceId, Int64))
forall a b. (a -> b) -> a -> b
$ \case
Just ((ServiceId, Int64)
serviceSub, ByteString
sessId') | ByteString
sessId ByteString -> ByteString -> Bool
forall a. Eq a => a -> a -> Bool
== ByteString
sessId' -> ((ServiceId, Int64) -> Maybe (ServiceId, Int64)
forall a. a -> Maybe a
Just (ServiceId, Int64)
serviceSub, Maybe ((ServiceId, Int64), ByteString)
forall a. Maybe a
Nothing)
Maybe ((ServiceId, Int64), ByteString)
s -> (Maybe (ServiceId, Int64)
forall a. Maybe a
Nothing, Maybe ((ServiceId, Int64), ByteString)
s)
Bool -> STM () -> STM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Maybe (ServiceId, Int64) -> Bool
forall a. Maybe a -> Bool
isJust Maybe (ServiceId, Int64)
serviceSub_) (STM () -> STM ()) -> STM () -> STM ()
forall a b. (a -> b) -> a -> b
$ SMPClientAgent p -> SMPServer -> Maybe (ServiceId, Int64) -> STM ()
forall (p :: Party).
SMPClientAgent p -> SMPServer -> Maybe (ServiceId, Int64) -> STM ()
setPendingServiceSub SMPClientAgent p
ca SMPServer
srv Maybe (ServiceId, Int64)
serviceSub_
Maybe (ServiceId, Int64) -> STM (Maybe (ServiceId, Int64))
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe (ServiceId, Int64)
serviceSub_
updateQueueSubs :: TMap ServiceId (ByteString, APrivateAuthKey)
-> STM (Maybe (Map ServiceId APrivateAuthKey))
updateQueueSubs TMap ServiceId (ByteString, APrivateAuthKey)
qVar = do
Map ServiceId APrivateAuthKey
subs <- ((ByteString, APrivateAuthKey) -> APrivateAuthKey)
-> Map ServiceId (ByteString, APrivateAuthKey)
-> Map ServiceId APrivateAuthKey
forall a b k. (a -> b) -> Map k a -> Map k b
M.map (ByteString, APrivateAuthKey) -> APrivateAuthKey
forall a b. (a, b) -> b
snd (Map ServiceId (ByteString, APrivateAuthKey)
-> Map ServiceId APrivateAuthKey)
-> STM (Map ServiceId (ByteString, APrivateAuthKey))
-> STM (Map ServiceId APrivateAuthKey)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TMap ServiceId (ByteString, APrivateAuthKey)
-> (Map ServiceId (ByteString, APrivateAuthKey)
-> (Map ServiceId (ByteString, APrivateAuthKey),
Map ServiceId (ByteString, APrivateAuthKey)))
-> STM (Map ServiceId (ByteString, APrivateAuthKey))
forall s a. TVar s -> (s -> (a, s)) -> STM a
stateTVar TMap ServiceId (ByteString, APrivateAuthKey)
qVar (((ByteString, APrivateAuthKey) -> Bool)
-> Map ServiceId (ByteString, APrivateAuthKey)
-> (Map ServiceId (ByteString, APrivateAuthKey),
Map ServiceId (ByteString, APrivateAuthKey))
forall a k. (a -> Bool) -> Map k a -> (Map k a, Map k a)
M.partition ((ByteString
sessId ByteString -> ByteString -> Bool
forall a. Eq a => a -> a -> Bool
==) (ByteString -> Bool)
-> ((ByteString, APrivateAuthKey) -> ByteString)
-> (ByteString, APrivateAuthKey)
-> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (ByteString, APrivateAuthKey) -> ByteString
forall a b. (a, b) -> a
fst))
if Map ServiceId APrivateAuthKey -> Bool
forall k a. Map k a -> Bool
M.null Map ServiceId APrivateAuthKey
subs
then Maybe (Map ServiceId APrivateAuthKey)
-> STM (Maybe (Map ServiceId APrivateAuthKey))
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe (Map ServiceId APrivateAuthKey)
forall a. Maybe a
Nothing
else Map ServiceId APrivateAuthKey
-> Maybe (Map ServiceId APrivateAuthKey)
forall a. a -> Maybe a
Just Map ServiceId APrivateAuthKey
subs Maybe (Map ServiceId APrivateAuthKey)
-> STM () -> STM (Maybe (Map ServiceId APrivateAuthKey))
forall a b. a -> STM b -> STM a
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ TMap SMPServer (TMap ServiceId APrivateAuthKey)
-> SMPServer -> Map ServiceId APrivateAuthKey -> STM ()
forall s.
TMap SMPServer (TMap ServiceId s)
-> SMPServer -> Map ServiceId s -> STM ()
addSubs_ (SMPClientAgent p -> TMap SMPServer (TMap ServiceId APrivateAuthKey)
forall (p :: Party).
SMPClientAgent p -> TMap SMPServer (TMap ServiceId APrivateAuthKey)
pendingQueueSubs SMPClientAgent p
ca) SMPServer
srv Map ServiceId APrivateAuthKey
subs
serverDown :: (Maybe (ServiceId, Int64), Maybe (Map QueueId C.APrivateAuthKey)) -> IO ()
serverDown :: (Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
-> IO ()
serverDown (Maybe (ServiceId, Int64)
sSub, Maybe (Map ServiceId APrivateAuthKey)
qSubs) = do
((ServiceId, Int64) -> IO ()) -> Maybe (ServiceId, Int64) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (SMPClientAgent p -> SMPClientAgentEvent -> IO ()
forall (m :: * -> *) (p :: Party).
MonadIO m =>
SMPClientAgent p -> SMPClientAgentEvent -> m ()
notify SMPClientAgent p
ca (SMPClientAgentEvent -> IO ())
-> ((ServiceId, Int64) -> SMPClientAgentEvent)
-> (ServiceId, Int64)
-> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SMPServer -> (ServiceId, Int64) -> SMPClientAgentEvent
CAServiceDisconnected SMPServer
srv) Maybe (ServiceId, Int64)
sSub
let qIds :: Maybe (NonEmpty ServiceId)
qIds = [ServiceId] -> Maybe (NonEmpty ServiceId)
forall a. [a] -> Maybe (NonEmpty a)
L.nonEmpty ([ServiceId] -> Maybe (NonEmpty ServiceId))
-> (Map ServiceId APrivateAuthKey -> [ServiceId])
-> Map ServiceId APrivateAuthKey
-> Maybe (NonEmpty ServiceId)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Map ServiceId APrivateAuthKey -> [ServiceId]
forall k a. Map k a -> [k]
M.keys (Map ServiceId APrivateAuthKey -> Maybe (NonEmpty ServiceId))
-> Maybe (Map ServiceId APrivateAuthKey)
-> Maybe (NonEmpty ServiceId)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Maybe (Map ServiceId APrivateAuthKey)
qSubs
(NonEmpty ServiceId -> IO ())
-> Maybe (NonEmpty ServiceId) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (SMPClientAgent p -> SMPClientAgentEvent -> IO ()
forall (m :: * -> *) (p :: Party).
MonadIO m =>
SMPClientAgent p -> SMPClientAgentEvent -> m ()
notify SMPClientAgent p
ca (SMPClientAgentEvent -> IO ())
-> (NonEmpty ServiceId -> SMPClientAgentEvent)
-> NonEmpty ServiceId
-> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SMPServer -> NonEmpty ServiceId -> SMPClientAgentEvent
CADisconnected SMPServer
srv) Maybe (NonEmpty ServiceId)
qIds
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Maybe (ServiceId, Int64) -> Bool
forall a. Maybe a -> Bool
isJust Maybe (ServiceId, Int64)
sSub Bool -> Bool -> Bool
|| Maybe (NonEmpty ServiceId) -> Bool
forall a. Maybe a -> Bool
isJust Maybe (NonEmpty ServiceId)
qIds) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ SMPClientAgent p -> SMPServer -> IO ()
forall (p :: Party). SMPClientAgent p -> SMPServer -> IO ()
reconnectClient SMPClientAgent p
ca SMPServer
srv
reconnectClient :: SMPClientAgent p -> SMPServer -> IO ()
reconnectClient :: forall (p :: Party). SMPClientAgent p -> SMPServer -> IO ()
reconnectClient ca :: SMPClientAgent p
ca@SMPClientAgent {TVar Bool
$sel:active:SMPClientAgent :: forall (p :: Party). SMPClientAgent p -> TVar Bool
active :: TVar Bool
active, SMPClientAgentConfig
$sel:agentCfg:SMPClientAgent :: forall (p :: Party). SMPClientAgent p -> SMPClientAgentConfig
agentCfg :: SMPClientAgentConfig
agentCfg, TMap SMPServer (SessionVar (Async ()))
$sel:smpSubWorkers:SMPClientAgent :: forall (p :: Party).
SMPClientAgent p -> TMap SMPServer (SessionVar (Async ()))
smpSubWorkers :: TMap SMPServer (SessionVar (Async ()))
smpSubWorkers, TVar Int
$sel:workerSeq:SMPClientAgent :: forall (p :: Party). SMPClientAgent p -> TVar Int
workerSeq :: TVar Int
workerSeq} SMPServer
srv = do
UTCTime
ts <- IO UTCTime
getCurrentTime
IO Bool -> IO () -> IO ()
forall (m :: * -> *). Monad m => m Bool -> m () -> m ()
whenM (TVar Bool -> IO Bool
forall (m :: * -> *) a. MonadIO m => TVar a -> m a
readTVarIO TVar Bool
active) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ STM
(Maybe (Either (SessionVar (Async ())) (SessionVar (Async ()))))
-> IO
(Maybe (Either (SessionVar (Async ())) (SessionVar (Async ()))))
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (UTCTime
-> STM
(Maybe (Either (SessionVar (Async ())) (SessionVar (Async ()))))
getWorkerVar UTCTime
ts) IO (Maybe (Either (SessionVar (Async ())) (SessionVar (Async ()))))
-> (Maybe (Either (SessionVar (Async ())) (SessionVar (Async ())))
-> IO ())
-> IO ()
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (Either (SessionVar (Async ())) (SessionVar (Async ())) -> IO ())
-> Maybe (Either (SessionVar (Async ())) (SessionVar (Async ())))
-> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ ((SessionVar (Async ()) -> IO ())
-> (SessionVar (Async ()) -> IO ())
-> Either (SessionVar (Async ())) (SessionVar (Async ()))
-> IO ()
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either SessionVar (Async ()) -> IO ()
newSubWorker (\SessionVar (Async ())
_ -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()))
where
getWorkerVar :: UTCTime
-> STM
(Maybe (Either (SessionVar (Async ())) (SessionVar (Async ()))))
getWorkerVar UTCTime
ts =
STM Bool
-> STM
(Maybe (Either (SessionVar (Async ())) (SessionVar (Async ()))))
-> STM
(Maybe (Either (SessionVar (Async ())) (SessionVar (Async ()))))
-> STM
(Maybe (Either (SessionVar (Async ())) (SessionVar (Async ()))))
forall (m :: * -> *) a. Monad m => m Bool -> m a -> m a -> m a
ifM
((Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
-> Bool
forall {a} {k} {a}. (Maybe a, Maybe (Map k a)) -> Bool
noPending ((Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
-> Bool)
-> STM
(Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
-> STM Bool
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (forall a. SMPServer -> TMap SMPServer a -> STM (Maybe a))
-> (forall a. TVar a -> STM a)
-> STM
(Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
forall (m :: * -> *).
Monad m =>
(forall a. SMPServer -> TMap SMPServer a -> m (Maybe a))
-> (forall a. TVar a -> m a)
-> m (Maybe (ServiceId, Int64),
Maybe (Map ServiceId APrivateAuthKey))
getPending SMPServer -> TMap SMPServer a -> STM (Maybe a)
forall a. SMPServer -> TMap SMPServer a -> STM (Maybe a)
forall k a. Ord k => k -> TMap k a -> STM (Maybe a)
TM.lookup TVar a -> STM a
forall a. TVar a -> STM a
readTVar)
(Maybe (Either (SessionVar (Async ())) (SessionVar (Async ())))
-> STM
(Maybe (Either (SessionVar (Async ())) (SessionVar (Async ()))))
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe (Either (SessionVar (Async ())) (SessionVar (Async ())))
forall a. Maybe a
Nothing)
(Either (SessionVar (Async ())) (SessionVar (Async ()))
-> Maybe (Either (SessionVar (Async ())) (SessionVar (Async ())))
forall a. a -> Maybe a
Just (Either (SessionVar (Async ())) (SessionVar (Async ()))
-> Maybe (Either (SessionVar (Async ())) (SessionVar (Async ()))))
-> STM (Either (SessionVar (Async ())) (SessionVar (Async ())))
-> STM
(Maybe (Either (SessionVar (Async ())) (SessionVar (Async ()))))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TVar Int
-> SMPServer
-> TMap SMPServer (SessionVar (Async ()))
-> UTCTime
-> STM (Either (SessionVar (Async ())) (SessionVar (Async ())))
forall k a.
Ord k =>
TVar Int
-> k
-> TMap k (SessionVar a)
-> UTCTime
-> STM (Either (SessionVar a) (SessionVar a))
getSessVar TVar Int
workerSeq SMPServer
srv TMap SMPServer (SessionVar (Async ()))
smpSubWorkers UTCTime
ts)
newSubWorker :: SessionVar (Async ()) -> IO ()
newSubWorker :: SessionVar (Async ()) -> IO ()
newSubWorker SessionVar (Async ())
v = do
Async ()
a <- IO () -> IO (Async ())
forall (m :: * -> *) a. MonadUnliftIO m => m a -> m (Async a)
async (IO () -> IO (Async ())) -> IO () -> IO (Async ())
forall a b. (a -> b) -> a -> b
$ IO (Either SomeException ()) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (Either SomeException ()) -> IO ())
-> IO (Either SomeException ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ IO () -> IO (Either SomeException ())
forall (m :: * -> *) a.
MonadUnliftIO m =>
m a -> m (Either SomeException a)
E.tryAny (IO () -> IO (Either SomeException ()))
-> IO () -> IO (Either SomeException ())
forall a b. (a -> b) -> a -> b
$ SessionVar (Async ()) -> IO ()
runSubWorker SessionVar (Async ())
v
STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMVar (Async ()) -> Async () -> STM ()
forall a. TMVar a -> a -> STM ()
putTMVar (SessionVar (Async ()) -> TMVar (Async ())
forall a. SessionVar a -> TMVar a
sessionVar SessionVar (Async ())
v) Async ()
a
runSubWorker :: SessionVar (Async ()) -> IO ()
runSubWorker SessionVar (Async ())
v =
RetryInterval -> (Int64 -> IO () -> IO ()) -> IO ()
forall (m :: * -> *) a.
MonadIO m =>
RetryInterval -> (Int64 -> m a -> m a) -> m a
withRetryInterval (SMPClientAgentConfig -> RetryInterval
reconnectInterval SMPClientAgentConfig
agentCfg) ((Int64 -> IO () -> IO ()) -> IO ())
-> (Int64 -> IO () -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Int64
_ IO ()
loop -> do
(Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
subs <- STM
(Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
-> IO
(Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM
(Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
-> IO
(Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey)))
-> STM
(Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
-> IO
(Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
forall a b. (a -> b) -> a -> b
$ do
(Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
s <- (forall a. SMPServer -> TMap SMPServer a -> STM (Maybe a))
-> (forall a. TVar a -> STM a)
-> STM
(Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
forall (m :: * -> *).
Monad m =>
(forall a. SMPServer -> TMap SMPServer a -> m (Maybe a))
-> (forall a. TVar a -> m a)
-> m (Maybe (ServiceId, Int64),
Maybe (Map ServiceId APrivateAuthKey))
getPending SMPServer -> TMap SMPServer a -> STM (Maybe a)
forall a. SMPServer -> TMap SMPServer a -> STM (Maybe a)
forall k a. Ord k => k -> TMap k a -> STM (Maybe a)
TM.lookup TVar a -> STM a
forall a. TVar a -> STM a
readTVar
Bool -> STM () -> STM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when ((Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
-> Bool
forall {a} {k} {a}. (Maybe a, Maybe (Map k a)) -> Bool
noPending (Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
s) (STM () -> STM ()) -> STM () -> STM ()
forall a b. (a -> b) -> a -> b
$ SessionVar (Async ()) -> STM ()
cleanup SessionVar (Async ())
v
(Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
-> STM
(Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
s
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless ((Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
-> Bool
forall {a} {k} {a}. (Maybe a, Maybe (Map k a)) -> Bool
noPending (Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
subs) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ IO Bool -> IO () -> IO ()
forall (m :: * -> *). Monad m => m Bool -> m () -> m ()
whenM (TVar Bool -> IO Bool
forall (m :: * -> *) a. MonadIO m => TVar a -> m a
readTVarIO TVar Bool
active) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
IO (Maybe (Either SMPClientError ())) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (Maybe (Either SMPClientError ())) -> IO ())
-> IO (Maybe (Either SMPClientError ())) -> IO ()
forall a b. (a -> b) -> a -> b
$ NetworkTimeout -> NetworkRequestMode -> Int
netTimeoutInt NetworkTimeout
tcpConnectTimeout NetworkRequestMode
NRMBackground Int
-> IO (Either SMPClientError ())
-> IO (Maybe (Either SMPClientError ()))
forall a. Int -> IO a -> IO (Maybe a)
`timeout` ExceptT SMPClientError IO () -> IO (Either SMPClientError ())
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (SMPClientAgent p
-> SMPServer
-> (Maybe (ServiceId, Int64),
Maybe (Map ServiceId APrivateAuthKey))
-> ExceptT SMPClientError IO ()
forall (p :: Party).
SMPClientAgent p
-> SMPServer
-> (Maybe (ServiceId, Int64),
Maybe (Map ServiceId APrivateAuthKey))
-> ExceptT SMPClientError IO ()
reconnectSMPClient SMPClientAgent p
ca SMPServer
srv (Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
subs)
IO ()
loop
ProtocolClientConfig {$sel:networkConfig:ProtocolClientConfig :: forall v. ProtocolClientConfig v -> NetworkConfig
networkConfig = NetworkConfig {NetworkTimeout
$sel:tcpConnectTimeout:NetworkConfig :: NetworkConfig -> NetworkTimeout
tcpConnectTimeout :: NetworkTimeout
tcpConnectTimeout}} = SMPClientAgentConfig -> ProtocolClientConfig SMPVersion
smpCfg SMPClientAgentConfig
agentCfg
noPending :: (Maybe a, Maybe (Map k a)) -> Bool
noPending (Maybe a
sSub, Maybe (Map k a)
qSubs) = Maybe a -> Bool
forall a. Maybe a -> Bool
isNothing Maybe a
sSub Bool -> Bool -> Bool
&& Bool -> (Map k a -> Bool) -> Maybe (Map k a) -> Bool
forall b a. b -> (a -> b) -> Maybe a -> b
maybe Bool
True Map k a -> Bool
forall k a. Map k a -> Bool
M.null Maybe (Map k a)
qSubs
getPending :: Monad m => (forall a. SMPServer -> TMap SMPServer a -> m (Maybe a)) -> (forall a. TVar a -> m a) -> m (Maybe (ServiceId, Int64), Maybe (Map QueueId C.APrivateAuthKey))
getPending :: forall (m :: * -> *).
Monad m =>
(forall a. SMPServer -> TMap SMPServer a -> m (Maybe a))
-> (forall a. TVar a -> m a)
-> m (Maybe (ServiceId, Int64),
Maybe (Map ServiceId APrivateAuthKey))
getPending forall a. SMPServer -> TMap SMPServer a -> m (Maybe a)
lkup forall a. TVar a -> m a
rd = do
Maybe (ServiceId, Int64)
sSub <- SMPServer
-> TMap SMPServer (TVar (Maybe (ServiceId, Int64)))
-> m (Maybe (TVar (Maybe (ServiceId, Int64))))
forall a. SMPServer -> TMap SMPServer a -> m (Maybe a)
lkup SMPServer
srv (SMPClientAgent p
-> TMap SMPServer (TVar (Maybe (ServiceId, Int64)))
forall (p :: Party).
SMPClientAgent p
-> TMap SMPServer (TVar (Maybe (ServiceId, Int64)))
pendingServiceSubs SMPClientAgent p
ca) m (Maybe (TVar (Maybe (ServiceId, Int64))))
-> (TVar (Maybe (ServiceId, Int64))
-> m (Maybe (ServiceId, Int64)))
-> m (Maybe (ServiceId, Int64))
forall (m :: * -> *) (f :: * -> *) a b.
(Monad m, Monad f, Traversable f) =>
m (f a) -> (a -> m (f b)) -> m (f b)
$>>= TVar (Maybe (ServiceId, Int64)) -> m (Maybe (ServiceId, Int64))
forall a. TVar a -> m a
rd
Maybe (Map ServiceId APrivateAuthKey)
qSubs <- SMPServer
-> TMap SMPServer (TMap ServiceId APrivateAuthKey)
-> m (Maybe (TMap ServiceId APrivateAuthKey))
forall a. SMPServer -> TMap SMPServer a -> m (Maybe a)
lkup SMPServer
srv (SMPClientAgent p -> TMap SMPServer (TMap ServiceId APrivateAuthKey)
forall (p :: Party).
SMPClientAgent p -> TMap SMPServer (TMap ServiceId APrivateAuthKey)
pendingQueueSubs SMPClientAgent p
ca) m (Maybe (TMap ServiceId APrivateAuthKey))
-> (Maybe (TMap ServiceId APrivateAuthKey)
-> m (Maybe (Map ServiceId APrivateAuthKey)))
-> m (Maybe (Map ServiceId APrivateAuthKey))
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (TMap ServiceId APrivateAuthKey
-> m (Map ServiceId APrivateAuthKey))
-> Maybe (TMap ServiceId APrivateAuthKey)
-> m (Maybe (Map ServiceId APrivateAuthKey))
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> Maybe a -> m (Maybe b)
mapM TMap ServiceId APrivateAuthKey -> m (Map ServiceId APrivateAuthKey)
forall a. TVar a -> m a
rd
(Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
-> m (Maybe (ServiceId, Int64),
Maybe (Map ServiceId APrivateAuthKey))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe (ServiceId, Int64)
sSub, Maybe (Map ServiceId APrivateAuthKey)
qSubs)
cleanup :: SessionVar (Async ()) -> STM ()
cleanup :: SessionVar (Async ()) -> STM ()
cleanup SessionVar (Async ())
v = do
STM Bool -> STM () -> STM ()
forall (m :: * -> *). Monad m => m Bool -> m () -> m ()
whenM (TMVar (Async ()) -> STM Bool
forall a. TMVar a -> STM Bool
isEmptyTMVar (TMVar (Async ()) -> STM Bool) -> TMVar (Async ()) -> STM Bool
forall a b. (a -> b) -> a -> b
$ SessionVar (Async ()) -> TMVar (Async ())
forall a. SessionVar a -> TMVar a
sessionVar SessionVar (Async ())
v) STM ()
forall a. STM a
retry
SessionVar (Async ())
-> SMPServer -> TMap SMPServer (SessionVar (Async ())) -> STM ()
forall k a.
Ord k =>
SessionVar a -> k -> TMap k (SessionVar a) -> STM ()
removeSessVar SessionVar (Async ())
v SMPServer
srv TMap SMPServer (SessionVar (Async ()))
smpSubWorkers
reconnectSMPClient :: forall p. SMPClientAgent p -> SMPServer -> (Maybe (ServiceId, Int64), Maybe (Map QueueId C.APrivateAuthKey)) -> ExceptT SMPClientError IO ()
reconnectSMPClient :: forall (p :: Party).
SMPClientAgent p
-> SMPServer
-> (Maybe (ServiceId, Int64),
Maybe (Map ServiceId APrivateAuthKey))
-> ExceptT SMPClientError IO ()
reconnectSMPClient ca :: SMPClientAgent p
ca@SMPClientAgent {SMPClientAgentConfig
$sel:agentCfg:SMPClientAgent :: forall (p :: Party). SMPClientAgent p -> SMPClientAgentConfig
agentCfg :: SMPClientAgentConfig
agentCfg, SParty p
$sel:agentParty:SMPClientAgent :: forall (p :: Party). SMPClientAgent p -> SParty p
agentParty :: SParty p
agentParty} SMPServer
srv (Maybe (ServiceId, Int64)
sSub_, Maybe (Map ServiceId APrivateAuthKey)
qSubs_) =
SMPClientAgent p
-> SMPServer
-> (SMPClient -> ExceptT SMPClientError IO ())
-> ExceptT SMPClientError IO ()
forall (p :: Party) a.
SMPClientAgent p
-> SMPServer
-> (SMPClient -> ExceptT SMPClientError IO a)
-> ExceptT SMPClientError IO a
withSMP SMPClientAgent p
ca SMPServer
srv ((SMPClient -> ExceptT SMPClientError IO ())
-> ExceptT SMPClientError IO ())
-> (SMPClient -> ExceptT SMPClientError IO ())
-> ExceptT SMPClientError IO ()
forall a b. (a -> b) -> a -> b
$ \SMPClient
smp -> IO () -> ExceptT SMPClientError IO ()
forall a. IO a -> ExceptT SMPClientError IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ExceptT SMPClientError IO ())
-> IO () -> ExceptT SMPClientError IO ()
forall a b. (a -> b) -> a -> b
$ case SParty p -> Maybe (Dict (PartyI p, ServiceParty p))
forall (p :: Party).
SParty p -> Maybe (Dict (PartyI p, ServiceParty p))
serviceParty SParty p
agentParty of
Just Dict (PartyI p, ServiceParty p)
Dict -> (PartyI p, ServiceParty p) => SMPClient -> IO ()
SMPClient -> IO ()
resubscribe SMPClient
smp
Maybe (Dict (PartyI p, ServiceParty p))
Nothing -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
where
resubscribe :: (PartyI p, ServiceParty p) => SMPClient -> IO ()
resubscribe :: (PartyI p, ServiceParty p) => SMPClient -> IO ()
resubscribe SMPClient
smp = do
((ServiceId, Int64) -> IO ()) -> Maybe (ServiceId, Int64) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (SMPClientAgent p
-> SMPClient -> SMPServer -> (ServiceId, Int64) -> IO ()
forall (p :: Party).
(PartyI p, ServiceParty p) =>
SMPClientAgent p
-> SMPClient -> SMPServer -> (ServiceId, Int64) -> IO ()
smpSubscribeService SMPClientAgent p
ca SMPClient
smp SMPServer
srv) Maybe (ServiceId, Int64)
sSub_
Maybe (Map ServiceId APrivateAuthKey)
-> (Map ServiceId APrivateAuthKey -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ Maybe (Map ServiceId APrivateAuthKey)
qSubs_ ((Map ServiceId APrivateAuthKey -> IO ()) -> IO ())
-> (Map ServiceId APrivateAuthKey -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Map ServiceId APrivateAuthKey
qSubs -> do
Maybe (Map ServiceId (ByteString, APrivateAuthKey))
currSubs_ <- (TMap ServiceId (ByteString, APrivateAuthKey)
-> IO (Map ServiceId (ByteString, APrivateAuthKey)))
-> Maybe (TMap ServiceId (ByteString, APrivateAuthKey))
-> IO (Maybe (Map ServiceId (ByteString, APrivateAuthKey)))
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> Maybe a -> m (Maybe b)
mapM TMap ServiceId (ByteString, APrivateAuthKey)
-> IO (Map ServiceId (ByteString, APrivateAuthKey))
forall (m :: * -> *) a. MonadIO m => TVar a -> m a
readTVarIO (Maybe (TMap ServiceId (ByteString, APrivateAuthKey))
-> IO (Maybe (Map ServiceId (ByteString, APrivateAuthKey))))
-> IO (Maybe (TMap ServiceId (ByteString, APrivateAuthKey)))
-> IO (Maybe (Map ServiceId (ByteString, APrivateAuthKey)))
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< SMPServer
-> TMap SMPServer (TMap ServiceId (ByteString, APrivateAuthKey))
-> IO (Maybe (TMap ServiceId (ByteString, APrivateAuthKey)))
forall k a. Ord k => k -> TMap k a -> IO (Maybe a)
TM.lookupIO SMPServer
srv (SMPClientAgent p
-> TMap SMPServer (TMap ServiceId (ByteString, APrivateAuthKey))
forall (p :: Party).
SMPClientAgent p
-> TMap SMPServer (TMap ServiceId (ByteString, APrivateAuthKey))
activeQueueSubs SMPClientAgent p
ca)
let [(ServiceId, APrivateAuthKey)]
qSubs' :: [(QueueId, C.APrivateAuthKey)] =
([(ServiceId, APrivateAuthKey)] -> [(ServiceId, APrivateAuthKey)])
-> (Map ServiceId (ByteString, APrivateAuthKey)
-> [(ServiceId, APrivateAuthKey)]
-> [(ServiceId, APrivateAuthKey)])
-> Maybe (Map ServiceId (ByteString, APrivateAuthKey))
-> [(ServiceId, APrivateAuthKey)]
-> [(ServiceId, APrivateAuthKey)]
forall b a. b -> (a -> b) -> Maybe a -> b
maybe [(ServiceId, APrivateAuthKey)] -> [(ServiceId, APrivateAuthKey)]
forall a. a -> a
id (\Map ServiceId (ByteString, APrivateAuthKey)
currSubs -> ((ServiceId, APrivateAuthKey) -> Bool)
-> [(ServiceId, APrivateAuthKey)] -> [(ServiceId, APrivateAuthKey)]
forall a. (a -> Bool) -> [a] -> [a]
filter ((ServiceId -> Map ServiceId (ByteString, APrivateAuthKey) -> Bool
forall k a. Ord k => k -> Map k a -> Bool
`M.notMember` Map ServiceId (ByteString, APrivateAuthKey)
currSubs) (ServiceId -> Bool)
-> ((ServiceId, APrivateAuthKey) -> ServiceId)
-> (ServiceId, APrivateAuthKey)
-> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (ServiceId, APrivateAuthKey) -> ServiceId
forall a b. (a, b) -> a
fst)) Maybe (Map ServiceId (ByteString, APrivateAuthKey))
currSubs_ ([(ServiceId, APrivateAuthKey)] -> [(ServiceId, APrivateAuthKey)])
-> [(ServiceId, APrivateAuthKey)] -> [(ServiceId, APrivateAuthKey)]
forall a b. (a -> b) -> a -> b
$ Map ServiceId APrivateAuthKey -> [(ServiceId, APrivateAuthKey)]
forall k a. Map k a -> [(k, a)]
M.assocs Map ServiceId APrivateAuthKey
qSubs
(NonEmpty (ServiceId, APrivateAuthKey) -> IO ())
-> [NonEmpty (ServiceId, APrivateAuthKey)] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (forall (p :: Party).
ServiceParty p =>
SMPClientAgent p
-> SMPClient
-> SMPServer
-> NonEmpty (ServiceId, APrivateAuthKey)
-> IO ()
smpSubscribeQueues @p SMPClientAgent p
ca SMPClient
smp SMPServer
srv) ([NonEmpty (ServiceId, APrivateAuthKey)] -> IO ())
-> [NonEmpty (ServiceId, APrivateAuthKey)] -> IO ()
forall a b. (a -> b) -> a -> b
$ Int
-> [(ServiceId, APrivateAuthKey)]
-> [NonEmpty (ServiceId, APrivateAuthKey)]
forall a. Int -> [a] -> [NonEmpty a]
toChunks (SMPClientAgentConfig -> Int
agentSubsBatchSize SMPClientAgentConfig
agentCfg) [(ServiceId, APrivateAuthKey)]
qSubs'
notify :: MonadIO m => SMPClientAgent p -> SMPClientAgentEvent -> m ()
notify :: forall (m :: * -> *) (p :: Party).
MonadIO m =>
SMPClientAgent p -> SMPClientAgentEvent -> m ()
notify SMPClientAgent p
ca SMPClientAgentEvent
evt = STM () -> m ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> m ()) -> STM () -> m ()
forall a b. (a -> b) -> a -> b
$ TBQueue SMPClientAgentEvent -> SMPClientAgentEvent -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue (SMPClientAgent p -> TBQueue SMPClientAgentEvent
forall (p :: Party).
SMPClientAgent p -> TBQueue SMPClientAgentEvent
agentQ SMPClientAgent p
ca) SMPClientAgentEvent
evt
{-# INLINE notify #-}
getConnectedSMPServerClient :: SMPClientAgent p -> SMPServer -> IO (Maybe (Either SMPClientError (OwnServer, SMPClient)))
getConnectedSMPServerClient :: forall (p :: Party).
SMPClientAgent p
-> SMPServer
-> IO (Maybe (Either SMPClientError (Bool, SMPClient)))
getConnectedSMPServerClient SMPClientAgent {TMap SMPServer SMPClientVar
$sel:smpClients:SMPClientAgent :: forall (p :: Party).
SMPClientAgent p -> TMap SMPServer SMPClientVar
smpClients :: TMap SMPServer SMPClientVar
smpClients} SMPServer
srv =
STM
(Maybe
(SMPClientVar,
Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient)))
-> IO
(Maybe
(SMPClientVar,
Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient)))
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (SMPServer
-> TMap SMPServer SMPClientVar -> STM (Maybe SMPClientVar)
forall k a. Ord k => k -> TMap k a -> STM (Maybe a)
TM.lookup SMPServer
srv TMap SMPServer SMPClientVar
smpClients STM (Maybe SMPClientVar)
-> (SMPClientVar
-> STM
(Maybe
(SMPClientVar,
Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient))))
-> STM
(Maybe
(SMPClientVar,
Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient)))
forall (m :: * -> *) (f :: * -> *) a b.
(Monad m, Monad f, Traversable f) =>
m (f a) -> (a -> m (f b)) -> m (f b)
$>>= \SMPClientVar
v -> (SMPClientVar
v,) (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient)
-> (SMPClientVar,
Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient)))
-> STM
(Maybe (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient)))
-> STM
(Maybe
(SMPClientVar,
Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient)))
forall (f :: * -> *) (g :: * -> *) a b.
(Functor f, Functor g) =>
(a -> b) -> f (g a) -> f (g b)
<$$> TMVar (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient))
-> STM
(Maybe (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient)))
forall a. TMVar a -> STM (Maybe a)
tryReadTMVar (SMPClientVar
-> TMVar (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient))
forall a. SessionVar a -> TMVar a
sessionVar SMPClientVar
v))
IO
(Maybe
(SMPClientVar,
Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient)))
-> ((SMPClientVar,
Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient))
-> IO (Maybe (Either SMPClientError (Bool, SMPClient))))
-> IO (Maybe (Either SMPClientError (Bool, SMPClient)))
forall (m :: * -> *) (f :: * -> *) a b.
(Monad m, Monad f, Traversable f) =>
m (f a) -> (a -> m (f b)) -> m (f b)
$>>= \case
(SMPClientVar
_, Right (Bool, SMPClient)
r) -> Maybe (Either SMPClientError (Bool, SMPClient))
-> IO (Maybe (Either SMPClientError (Bool, SMPClient)))
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe (Either SMPClientError (Bool, SMPClient))
-> IO (Maybe (Either SMPClientError (Bool, SMPClient))))
-> Maybe (Either SMPClientError (Bool, SMPClient))
-> IO (Maybe (Either SMPClientError (Bool, SMPClient)))
forall a b. (a -> b) -> a -> b
$ Either SMPClientError (Bool, SMPClient)
-> Maybe (Either SMPClientError (Bool, SMPClient))
forall a. a -> Maybe a
Just (Either SMPClientError (Bool, SMPClient)
-> Maybe (Either SMPClientError (Bool, SMPClient)))
-> Either SMPClientError (Bool, SMPClient)
-> Maybe (Either SMPClientError (Bool, SMPClient))
forall a b. (a -> b) -> a -> b
$ (Bool, SMPClient) -> Either SMPClientError (Bool, SMPClient)
forall a b. b -> Either a b
Right (Bool, SMPClient)
r
(SMPClientVar
v, Left (SMPClientError
e, Maybe UTCTime
ts_)) ->
Maybe UTCTime -> IO (Maybe UTCTime)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe UTCTime
ts_ IO (Maybe UTCTime)
-> (UTCTime
-> IO (Maybe (Either SMPClientError (Bool, SMPClient))))
-> IO (Maybe (Either SMPClientError (Bool, SMPClient)))
forall (m :: * -> *) (f :: * -> *) a b.
(Monad m, Monad f, Traversable f) =>
m (f a) -> (a -> m (f b)) -> m (f b)
$>>= \UTCTime
ts ->
IO Bool
-> IO (Maybe (Either SMPClientError (Bool, SMPClient)))
-> IO (Maybe (Either SMPClientError (Bool, SMPClient)))
-> IO (Maybe (Either SMPClientError (Bool, SMPClient)))
forall (m :: * -> *) a. Monad m => m Bool -> m a -> m a -> m a
ifM
((UTCTime
ts UTCTime -> UTCTime -> Bool
forall a. Ord a => a -> a -> Bool
<) (UTCTime -> Bool) -> IO UTCTime -> IO Bool
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO UTCTime -> IO UTCTime
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO UTCTime
getCurrentTime)
(Maybe (Either SMPClientError (Bool, SMPClient))
forall a. Maybe a
Nothing Maybe (Either SMPClientError (Bool, SMPClient))
-> IO () -> IO (Maybe (Either SMPClientError (Bool, SMPClient)))
forall a b. a -> IO b -> IO a
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (SMPClientVar -> SMPServer -> TMap SMPServer SMPClientVar -> STM ()
forall k a.
Ord k =>
SessionVar a -> k -> TMap k (SessionVar a) -> STM ()
removeSessVar SMPClientVar
v SMPServer
srv TMap SMPServer SMPClientVar
smpClients))
(Maybe (Either SMPClientError (Bool, SMPClient))
-> IO (Maybe (Either SMPClientError (Bool, SMPClient)))
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe (Either SMPClientError (Bool, SMPClient))
-> IO (Maybe (Either SMPClientError (Bool, SMPClient))))
-> Maybe (Either SMPClientError (Bool, SMPClient))
-> IO (Maybe (Either SMPClientError (Bool, SMPClient)))
forall a b. (a -> b) -> a -> b
$ Either SMPClientError (Bool, SMPClient)
-> Maybe (Either SMPClientError (Bool, SMPClient))
forall a. a -> Maybe a
Just (Either SMPClientError (Bool, SMPClient)
-> Maybe (Either SMPClientError (Bool, SMPClient)))
-> Either SMPClientError (Bool, SMPClient)
-> Maybe (Either SMPClientError (Bool, SMPClient))
forall a b. (a -> b) -> a -> b
$ SMPClientError -> Either SMPClientError (Bool, SMPClient)
forall a b. a -> Either a b
Left SMPClientError
e)
lookupSMPServerClient :: SMPClientAgent p -> SessionId -> IO (Maybe (OwnServer, SMPClient))
lookupSMPServerClient :: forall (p :: Party).
SMPClientAgent p -> ByteString -> IO (Maybe (Bool, SMPClient))
lookupSMPServerClient SMPClientAgent {TMap ByteString (Bool, SMPClient)
$sel:smpSessions:SMPClientAgent :: forall (p :: Party).
SMPClientAgent p -> TMap ByteString (Bool, SMPClient)
smpSessions :: TMap ByteString (Bool, SMPClient)
smpSessions} ByteString
sessId = ByteString
-> TMap ByteString (Bool, SMPClient)
-> IO (Maybe (Bool, SMPClient))
forall k a. Ord k => k -> TMap k a -> IO (Maybe a)
TM.lookupIO ByteString
sessId TMap ByteString (Bool, SMPClient)
smpSessions
closeSMPClientAgent :: SMPClientAgent p -> IO ()
closeSMPClientAgent :: forall (p :: Party). SMPClientAgent p -> IO ()
closeSMPClientAgent SMPClientAgent p
c = do
STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar Bool -> Bool -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar (SMPClientAgent p -> TVar Bool
forall (p :: Party). SMPClientAgent p -> TVar Bool
active SMPClientAgent p
c) Bool
False
SMPClientAgent p -> IO ()
forall (p :: Party). SMPClientAgent p -> IO ()
closeSMPServerClients SMPClientAgent p
c
STM (Map SMPServer (SessionVar (Async ())))
-> IO (Map SMPServer (SessionVar (Async ())))
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (TMap SMPServer (SessionVar (Async ()))
-> Map SMPServer (SessionVar (Async ()))
-> STM (Map SMPServer (SessionVar (Async ())))
forall a. TVar a -> a -> STM a
swapTVar (SMPClientAgent p -> TMap SMPServer (SessionVar (Async ()))
forall (p :: Party).
SMPClientAgent p -> TMap SMPServer (SessionVar (Async ()))
smpSubWorkers SMPClientAgent p
c) Map SMPServer (SessionVar (Async ()))
forall k a. Map k a
M.empty) IO (Map SMPServer (SessionVar (Async ())))
-> (Map SMPServer (SessionVar (Async ())) -> IO ()) -> IO ()
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (SessionVar (Async ()) -> IO ())
-> Map SMPServer (SessionVar (Async ())) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ SessionVar (Async ()) -> IO ()
cancelReconnect
where
cancelReconnect :: SessionVar (Async ()) -> IO ()
cancelReconnect :: SessionVar (Async ()) -> IO ()
cancelReconnect SessionVar (Async ())
v = IO ThreadId -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO ThreadId -> IO ()) -> (IO () -> IO ThreadId) -> IO () -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO () -> IO ThreadId
forkIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ STM (Async ()) -> IO (Async ())
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (TMVar (Async ()) -> STM (Async ())
forall a. TMVar a -> STM a
readTMVar (TMVar (Async ()) -> STM (Async ()))
-> TMVar (Async ()) -> STM (Async ())
forall a b. (a -> b) -> a -> b
$ SessionVar (Async ()) -> TMVar (Async ())
forall a. SessionVar a -> TMVar a
sessionVar SessionVar (Async ())
v) IO (Async ()) -> (Async () -> IO ()) -> IO ()
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Async () -> IO ()
forall a. Async a -> IO ()
uninterruptibleCancel
closeSMPServerClients :: SMPClientAgent p -> IO ()
closeSMPServerClients :: forall (p :: Party). SMPClientAgent p -> IO ()
closeSMPServerClients SMPClientAgent p
c = STM (Map SMPServer SMPClientVar) -> IO (Map SMPServer SMPClientVar)
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (SMPClientAgent p -> TMap SMPServer SMPClientVar
forall (p :: Party).
SMPClientAgent p -> TMap SMPServer SMPClientVar
smpClients SMPClientAgent p
c TMap SMPServer SMPClientVar
-> Map SMPServer SMPClientVar -> STM (Map SMPServer SMPClientVar)
forall a. TVar a -> a -> STM a
`swapTVar` Map SMPServer SMPClientVar
forall k a. Map k a
M.empty) IO (Map SMPServer SMPClientVar)
-> (Map SMPServer SMPClientVar -> IO ()) -> IO ()
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (SMPClientVar -> IO ThreadId)
-> Map SMPServer SMPClientVar -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (IO () -> IO ThreadId
forkIO (IO () -> IO ThreadId)
-> (SMPClientVar -> IO ()) -> SMPClientVar -> IO ThreadId
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SMPClientVar -> IO ()
forall {a} {a} {v} {err} {msg}.
SessionVar (Either a (a, ProtocolClient v err msg)) -> IO ()
closeClient)
where
closeClient :: SessionVar (Either a (a, ProtocolClient v err msg)) -> IO ()
closeClient SessionVar (Either a (a, ProtocolClient v err msg))
v =
STM (Either a (a, ProtocolClient v err msg))
-> IO (Either a (a, ProtocolClient v err msg))
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (TMVar (Either a (a, ProtocolClient v err msg))
-> STM (Either a (a, ProtocolClient v err msg))
forall a. TMVar a -> STM a
readTMVar (TMVar (Either a (a, ProtocolClient v err msg))
-> STM (Either a (a, ProtocolClient v err msg)))
-> TMVar (Either a (a, ProtocolClient v err msg))
-> STM (Either a (a, ProtocolClient v err msg))
forall a b. (a -> b) -> a -> b
$ SessionVar (Either a (a, ProtocolClient v err msg))
-> TMVar (Either a (a, ProtocolClient v err msg))
forall a. SessionVar a -> TMVar a
sessionVar SessionVar (Either a (a, ProtocolClient v err msg))
v) IO (Either a (a, ProtocolClient v err msg))
-> (Either a (a, ProtocolClient v err msg) -> IO ()) -> IO ()
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Right (a
_, ProtocolClient v err msg
smp) -> ProtocolClient v err msg -> IO ()
forall v err msg. ProtocolClient v err msg -> IO ()
closeProtocolClient ProtocolClient v err msg
smp IO () -> IO () -> IO ()
forall a. IO a -> IO a -> IO a
`catchAll_` () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
Either a (a, ProtocolClient v err msg)
_ -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
cancelActions :: Foldable f => TVar (f (Async ())) -> IO ()
cancelActions :: forall (f :: * -> *). Foldable f => TVar (f (Async ())) -> IO ()
cancelActions TVar (f (Async ()))
as = TVar (f (Async ())) -> IO (f (Async ()))
forall (m :: * -> *) a. MonadIO m => TVar a -> m a
readTVarIO TVar (f (Async ()))
as IO (f (Async ())) -> (f (Async ()) -> IO ()) -> IO ()
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (Async () -> IO ()) -> f (Async ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ Async () -> IO ()
forall a. Async a -> IO ()
uninterruptibleCancel
withSMP :: SMPClientAgent p -> SMPServer -> (SMPClient -> ExceptT SMPClientError IO a) -> ExceptT SMPClientError IO a
withSMP :: forall (p :: Party) a.
SMPClientAgent p
-> SMPServer
-> (SMPClient -> ExceptT SMPClientError IO a)
-> ExceptT SMPClientError IO a
withSMP SMPClientAgent p
ca SMPServer
srv SMPClient -> ExceptT SMPClientError IO a
action = (SMPClientAgent p
-> SMPServer -> ExceptT SMPClientError IO SMPClient
forall (p :: Party).
SMPClientAgent p
-> SMPServer -> ExceptT SMPClientError IO SMPClient
getSMPServerClient' SMPClientAgent p
ca SMPServer
srv ExceptT SMPClientError IO SMPClient
-> (SMPClient -> ExceptT SMPClientError IO a)
-> ExceptT SMPClientError IO a
forall a b.
ExceptT SMPClientError IO a
-> (a -> ExceptT SMPClientError IO b)
-> ExceptT SMPClientError IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= SMPClient -> ExceptT SMPClientError IO a
action) ExceptT SMPClientError IO a
-> (SMPClientError -> ExceptT SMPClientError IO a)
-> ExceptT SMPClientError IO a
forall (m :: * -> *) e a e'.
Monad m =>
ExceptT e m a -> (e -> ExceptT e' m a) -> ExceptT e' m a
`catchE` SMPClientError -> ExceptT SMPClientError IO a
forall a. SMPClientError -> ExceptT SMPClientError IO a
logSMPError
where
logSMPError :: SMPClientError -> ExceptT SMPClientError IO a
logSMPError :: forall a. SMPClientError -> ExceptT SMPClientError IO a
logSMPError SMPClientError
e = do
Text -> ExceptT SMPClientError IO ()
forall (m :: * -> *).
(?callStack::CallStack, MonadIO m) =>
Text -> m ()
logInfo (Text -> ExceptT SMPClientError IO ())
-> Text -> ExceptT SMPClientError IO ()
forall a b. (a -> b) -> a -> b
$ Text
"SMP error (" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> ByteString -> Text
safeDecodeUtf8 (SMPServer -> ByteString
forall a. StrEncoding a => a -> ByteString
strEncode SMPServer
srv) Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"): " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> SMPClientError -> Text
forall a. Show a => a -> Text
tshow SMPClientError
e
SMPClientError -> ExceptT SMPClientError IO a
forall (m :: * -> *) e a. Monad m => e -> ExceptT e m a
throwE SMPClientError
e
subscribeQueuesNtfs :: SMPClientAgent 'NotifierService -> SMPServer -> NonEmpty (NotifierId, NtfPrivateAuthKey) -> IO ()
subscribeQueuesNtfs :: SMPClientAgent 'NotifierService
-> SMPServer -> NonEmpty (ServiceId, APrivateAuthKey) -> IO ()
subscribeQueuesNtfs = SMPClientAgent 'NotifierService
-> SMPServer -> NonEmpty (ServiceId, APrivateAuthKey) -> IO ()
forall (p :: Party).
ServiceParty p =>
SMPClientAgent p
-> SMPServer -> NonEmpty (ServiceId, APrivateAuthKey) -> IO ()
subscribeQueues_
{-# INLINE subscribeQueuesNtfs #-}
subscribeQueues_ :: ServiceParty p => SMPClientAgent p -> SMPServer -> NonEmpty (QueueId, C.APrivateAuthKey) -> IO ()
subscribeQueues_ :: forall (p :: Party).
ServiceParty p =>
SMPClientAgent p
-> SMPServer -> NonEmpty (ServiceId, APrivateAuthKey) -> IO ()
subscribeQueues_ SMPClientAgent p
ca SMPServer
srv NonEmpty (ServiceId, APrivateAuthKey)
subs = do
STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ SMPClientAgent p
-> SMPServer -> [(ServiceId, APrivateAuthKey)] -> STM ()
forall (p :: Party).
SMPClientAgent p
-> SMPServer -> [(ServiceId, APrivateAuthKey)] -> STM ()
addPendingSubs SMPClientAgent p
ca SMPServer
srv ([(ServiceId, APrivateAuthKey)] -> STM ())
-> [(ServiceId, APrivateAuthKey)] -> STM ()
forall a b. (a -> b) -> a -> b
$ NonEmpty (ServiceId, APrivateAuthKey)
-> [(ServiceId, APrivateAuthKey)]
forall a. NonEmpty a -> [a]
L.toList NonEmpty (ServiceId, APrivateAuthKey)
subs
ExceptT SMPClientError IO SMPClient
-> IO (Either SMPClientError SMPClient)
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (SMPClientAgent p
-> SMPServer -> ExceptT SMPClientError IO SMPClient
forall (p :: Party).
SMPClientAgent p
-> SMPServer -> ExceptT SMPClientError IO SMPClient
getSMPServerClient' SMPClientAgent p
ca SMPServer
srv) IO (Either SMPClientError SMPClient)
-> (Either SMPClientError SMPClient -> IO ()) -> IO ()
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Right SMPClient
smp -> SMPClientAgent p
-> SMPClient
-> SMPServer
-> NonEmpty (ServiceId, APrivateAuthKey)
-> IO ()
forall (p :: Party).
ServiceParty p =>
SMPClientAgent p
-> SMPClient
-> SMPServer
-> NonEmpty (ServiceId, APrivateAuthKey)
-> IO ()
smpSubscribeQueues SMPClientAgent p
ca SMPClient
smp SMPServer
srv NonEmpty (ServiceId, APrivateAuthKey)
subs
Left SMPClientError
_ -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
smpSubscribeQueues :: ServiceParty p => SMPClientAgent p -> SMPClient -> SMPServer -> NonEmpty (QueueId, C.APrivateAuthKey) -> IO ()
smpSubscribeQueues :: forall (p :: Party).
ServiceParty p =>
SMPClientAgent p
-> SMPClient
-> SMPServer
-> NonEmpty (ServiceId, APrivateAuthKey)
-> IO ()
smpSubscribeQueues SMPClientAgent p
ca SMPClient
smp SMPServer
srv NonEmpty (ServiceId, APrivateAuthKey)
subs = do
NonEmpty (Either SMPClientError (Maybe ServiceId))
rs <- case SMPClientAgent p -> SParty p
forall (p :: Party). SMPClientAgent p -> SParty p
agentParty SMPClientAgent p
ca of
SParty p
SRecipientService -> SMPClient
-> NonEmpty (ServiceId, APrivateAuthKey)
-> IO (NonEmpty (Either SMPClientError (Maybe ServiceId)))
subscribeSMPQueues SMPClient
smp NonEmpty (ServiceId, APrivateAuthKey)
subs
SParty p
SNotifierService -> SMPClient
-> NonEmpty (ServiceId, APrivateAuthKey)
-> IO (NonEmpty (Either SMPClientError (Maybe ServiceId)))
subscribeSMPQueuesNtfs SMPClient
smp NonEmpty (ServiceId, APrivateAuthKey)
subs
Maybe
(Bool, [(ServiceId, SMPClientError)],
([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
[ServiceId])
rs' <-
STM
(Maybe
(Bool, [(ServiceId, SMPClientError)],
([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
[ServiceId]))
-> IO
(Maybe
(Bool, [(ServiceId, SMPClientError)],
([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
[ServiceId]))
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM
(Maybe
(Bool, [(ServiceId, SMPClientError)],
([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
[ServiceId]))
-> IO
(Maybe
(Bool, [(ServiceId, SMPClientError)],
([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
[ServiceId])))
-> STM
(Maybe
(Bool, [(ServiceId, SMPClientError)],
([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
[ServiceId]))
-> IO
(Maybe
(Bool, [(ServiceId, SMPClientError)],
([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
[ServiceId]))
forall a b. (a -> b) -> a -> b
$
STM Bool
-> STM
(Maybe
(Bool, [(ServiceId, SMPClientError)],
([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
[ServiceId]))
-> STM
(Maybe
(Bool, [(ServiceId, SMPClientError)],
([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
[ServiceId]))
-> STM
(Maybe
(Bool, [(ServiceId, SMPClientError)],
([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
[ServiceId]))
forall (m :: * -> *) a. Monad m => m Bool -> m a -> m a -> m a
ifM
(SMPClientAgent p -> SMPClient -> SMPServer -> STM Bool
forall (p :: Party).
SMPClientAgent p -> SMPClient -> SMPServer -> STM Bool
activeClientSession SMPClientAgent p
ca SMPClient
smp SMPServer
srv)
((Bool, [(ServiceId, SMPClientError)],
([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
[ServiceId])
-> Maybe
(Bool, [(ServiceId, SMPClientError)],
([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
[ServiceId])
forall a. a -> Maybe a
Just ((Bool, [(ServiceId, SMPClientError)],
([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
[ServiceId])
-> Maybe
(Bool, [(ServiceId, SMPClientError)],
([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
[ServiceId]))
-> STM
(Bool, [(ServiceId, SMPClientError)],
([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
[ServiceId])
-> STM
(Maybe
(Bool, [(ServiceId, SMPClientError)],
([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
[ServiceId]))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> NonEmpty (Either SMPClientError (Maybe ServiceId))
-> STM
(Bool, [(ServiceId, SMPClientError)],
([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
[ServiceId])
processSubscriptions NonEmpty (Either SMPClientError (Maybe ServiceId))
rs)
(Maybe
(Bool, [(ServiceId, SMPClientError)],
([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
[ServiceId])
-> STM
(Maybe
(Bool, [(ServiceId, SMPClientError)],
([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
[ServiceId]))
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe
(Bool, [(ServiceId, SMPClientError)],
([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
[ServiceId])
forall a. Maybe a
Nothing)
case Maybe
(Bool, [(ServiceId, SMPClientError)],
([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
[ServiceId])
rs' of
Just (Bool
tempErrs, [(ServiceId, SMPClientError)]
finalErrs, ([(ServiceId, (ByteString, APrivateAuthKey))]
qOks, [ServiceId]
sQs), [ServiceId]
_) -> do
(SMPServer -> NonEmpty ServiceId -> SMPClientAgentEvent)
-> [ServiceId] -> IO ()
forall a.
(SMPServer -> NonEmpty a -> SMPClientAgentEvent) -> [a] -> IO ()
notify_ (SMPServer
-> Maybe ServiceId -> NonEmpty ServiceId -> SMPClientAgentEvent
`CASubscribed` Maybe ServiceId
forall a. Maybe a
Nothing) ([ServiceId] -> IO ()) -> [ServiceId] -> IO ()
forall a b. (a -> b) -> a -> b
$ ((ServiceId, (ByteString, APrivateAuthKey)) -> ServiceId)
-> [(ServiceId, (ByteString, APrivateAuthKey))] -> [ServiceId]
forall a b. (a -> b) -> [a] -> [b]
map (ServiceId, (ByteString, APrivateAuthKey)) -> ServiceId
forall a b. (a, b) -> a
fst [(ServiceId, (ByteString, APrivateAuthKey))]
qOks
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Maybe ServiceId -> Bool
forall a. Maybe a -> Bool
isJust Maybe ServiceId
smpServiceId) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ (SMPServer -> NonEmpty ServiceId -> SMPClientAgentEvent)
-> [ServiceId] -> IO ()
forall a.
(SMPServer -> NonEmpty a -> SMPClientAgentEvent) -> [a] -> IO ()
notify_ (SMPServer
-> Maybe ServiceId -> NonEmpty ServiceId -> SMPClientAgentEvent
`CASubscribed` Maybe ServiceId
smpServiceId) [ServiceId]
sQs
(SMPServer
-> NonEmpty (ServiceId, SMPClientError) -> SMPClientAgentEvent)
-> [(ServiceId, SMPClientError)] -> IO ()
forall a.
(SMPServer -> NonEmpty a -> SMPClientAgentEvent) -> [a] -> IO ()
notify_ SMPServer
-> NonEmpty (ServiceId, SMPClientError) -> SMPClientAgentEvent
CASubError [(ServiceId, SMPClientError)]
finalErrs
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
tempErrs (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ SMPClientAgent p -> SMPServer -> IO ()
forall (p :: Party). SMPClientAgent p -> SMPServer -> IO ()
reconnectClient SMPClientAgent p
ca SMPServer
srv
Maybe
(Bool, [(ServiceId, SMPClientError)],
([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
[ServiceId])
Nothing -> SMPClientAgent p -> SMPServer -> IO ()
forall (p :: Party). SMPClientAgent p -> SMPServer -> IO ()
reconnectClient SMPClientAgent p
ca SMPServer
srv
where
processSubscriptions :: NonEmpty (Either SMPClientError (Maybe ServiceId)) -> STM (Bool, [(QueueId, SMPClientError)], ([(QueueId, (SessionId, C.APrivateAuthKey))], [QueueId]), [QueueId])
processSubscriptions :: NonEmpty (Either SMPClientError (Maybe ServiceId))
-> STM
(Bool, [(ServiceId, SMPClientError)],
([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
[ServiceId])
processSubscriptions NonEmpty (Either SMPClientError (Maybe ServiceId))
rs = do
Map ServiceId APrivateAuthKey
pending <- STM (Map ServiceId APrivateAuthKey)
-> (TMap ServiceId APrivateAuthKey
-> STM (Map ServiceId APrivateAuthKey))
-> Maybe (TMap ServiceId APrivateAuthKey)
-> STM (Map ServiceId APrivateAuthKey)
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (Map ServiceId APrivateAuthKey
-> STM (Map ServiceId APrivateAuthKey)
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Map ServiceId APrivateAuthKey
forall k a. Map k a
M.empty) TMap ServiceId APrivateAuthKey
-> STM (Map ServiceId APrivateAuthKey)
forall a. TVar a -> STM a
readTVar (Maybe (TMap ServiceId APrivateAuthKey)
-> STM (Map ServiceId APrivateAuthKey))
-> STM (Maybe (TMap ServiceId APrivateAuthKey))
-> STM (Map ServiceId APrivateAuthKey)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< SMPServer
-> TMap SMPServer (TMap ServiceId APrivateAuthKey)
-> STM (Maybe (TMap ServiceId APrivateAuthKey))
forall k a. Ord k => k -> TMap k a -> STM (Maybe a)
TM.lookup SMPServer
srv (SMPClientAgent p -> TMap SMPServer (TMap ServiceId APrivateAuthKey)
forall (p :: Party).
SMPClientAgent p -> TMap SMPServer (TMap ServiceId APrivateAuthKey)
pendingQueueSubs SMPClientAgent p
ca)
let acc :: (Bool, [(ServiceId, SMPClientError)],
([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
[ServiceId])
acc@(Bool
_, [(ServiceId, SMPClientError)]
_, ([(ServiceId, (ByteString, APrivateAuthKey))]
qOks, [ServiceId]
sQs), [ServiceId]
notPending) = (((ServiceId, APrivateAuthKey),
Either SMPClientError (Maybe ServiceId))
-> (Bool, [(ServiceId, SMPClientError)],
([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
[ServiceId])
-> (Bool, [(ServiceId, SMPClientError)],
([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
[ServiceId]))
-> (Bool, [(ServiceId, SMPClientError)],
([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
[ServiceId])
-> NonEmpty
((ServiceId, APrivateAuthKey),
Either SMPClientError (Maybe ServiceId))
-> (Bool, [(ServiceId, SMPClientError)],
([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
[ServiceId])
forall a b. (a -> b -> b) -> b -> NonEmpty a -> b
forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr (Map ServiceId APrivateAuthKey
-> ((ServiceId, APrivateAuthKey),
Either SMPClientError (Maybe ServiceId))
-> (Bool, [(ServiceId, SMPClientError)],
([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
[ServiceId])
-> (Bool, [(ServiceId, SMPClientError)],
([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
[ServiceId])
groupSub Map ServiceId APrivateAuthKey
pending) (Bool
False, [], ([], []), []) (NonEmpty (ServiceId, APrivateAuthKey)
-> NonEmpty (Either SMPClientError (Maybe ServiceId))
-> NonEmpty
((ServiceId, APrivateAuthKey),
Either SMPClientError (Maybe ServiceId))
forall a b. NonEmpty a -> NonEmpty b -> NonEmpty (a, b)
L.zip NonEmpty (ServiceId, APrivateAuthKey)
subs NonEmpty (Either SMPClientError (Maybe ServiceId))
rs)
Bool -> STM () -> STM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless ([(ServiceId, (ByteString, APrivateAuthKey))] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [(ServiceId, (ByteString, APrivateAuthKey))]
qOks) (STM () -> STM ()) -> STM () -> STM ()
forall a b. (a -> b) -> a -> b
$ SMPClientAgent p
-> SMPServer
-> [(ServiceId, (ByteString, APrivateAuthKey))]
-> STM ()
forall (p :: Party).
SMPClientAgent p
-> SMPServer
-> [(ServiceId, (ByteString, APrivateAuthKey))]
-> STM ()
addActiveSubs SMPClientAgent p
ca SMPServer
srv [(ServiceId, (ByteString, APrivateAuthKey))]
qOks
Bool -> STM () -> STM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless ([ServiceId] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [ServiceId]
sQs) (STM () -> STM ()) -> STM () -> STM ()
forall a b. (a -> b) -> a -> b
$ Maybe ServiceId -> (ServiceId -> STM ()) -> STM ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ Maybe ServiceId
smpServiceId ((ServiceId -> STM ()) -> STM ())
-> (ServiceId -> STM ()) -> STM ()
forall a b. (a -> b) -> a -> b
$ \ServiceId
serviceId ->
SMPClientAgent p
-> SMPServer -> ((ServiceId, Int64), ByteString) -> STM ()
forall (p :: Party).
SMPClientAgent p
-> SMPServer -> ((ServiceId, Int64), ByteString) -> STM ()
updateActiveServiceSub SMPClientAgent p
ca SMPServer
srv ((ServiceId
serviceId, Int -> Int64
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int -> Int64) -> Int -> Int64
forall a b. (a -> b) -> a -> b
$ [ServiceId] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [ServiceId]
sQs), ByteString
sessId)
Bool -> STM () -> STM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless ([ServiceId] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [ServiceId]
notPending) (STM () -> STM ()) -> STM () -> STM ()
forall a b. (a -> b) -> a -> b
$ SMPClientAgent p -> SMPServer -> [ServiceId] -> STM ()
forall (p :: Party).
SMPClientAgent p -> SMPServer -> [ServiceId] -> STM ()
removePendingSubs SMPClientAgent p
ca SMPServer
srv [ServiceId]
notPending
(Bool, [(ServiceId, SMPClientError)],
([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
[ServiceId])
-> STM
(Bool, [(ServiceId, SMPClientError)],
([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
[ServiceId])
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Bool, [(ServiceId, SMPClientError)],
([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
[ServiceId])
acc
sessId :: ByteString
sessId = THandleParams SMPVersion 'TClient -> ByteString
forall v (p :: TransportPeer). THandleParams v p -> ByteString
sessionId (THandleParams SMPVersion 'TClient -> ByteString)
-> THandleParams SMPVersion 'TClient -> ByteString
forall a b. (a -> b) -> a -> b
$ SMPClient -> THandleParams SMPVersion 'TClient
forall v err msg.
ProtocolClient v err msg -> THandleParams v 'TClient
thParams SMPClient
smp
smpServiceId :: Maybe ServiceId
smpServiceId = (\THClientService {ServiceId
$sel:serviceId:THClientService :: forall k. THClientService' k -> ServiceId
serviceId :: ServiceId
serviceId} -> ServiceId
serviceId) (THClientService' PrivateKeyEd25519 -> ServiceId)
-> Maybe (THClientService' PrivateKeyEd25519) -> Maybe ServiceId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> SMPClient -> Maybe (THClientService' PrivateKeyEd25519)
smpClientService SMPClient
smp
groupSub ::
Map QueueId C.APrivateAuthKey ->
((QueueId, C.APrivateAuthKey), Either SMPClientError (Maybe ServiceId)) ->
(Bool, [(QueueId, SMPClientError)], ([(QueueId, (SessionId, C.APrivateAuthKey))], [QueueId]), [QueueId]) ->
(Bool, [(QueueId, SMPClientError)], ([(QueueId, (SessionId, C.APrivateAuthKey))], [QueueId]), [QueueId])
groupSub :: Map ServiceId APrivateAuthKey
-> ((ServiceId, APrivateAuthKey),
Either SMPClientError (Maybe ServiceId))
-> (Bool, [(ServiceId, SMPClientError)],
([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
[ServiceId])
-> (Bool, [(ServiceId, SMPClientError)],
([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
[ServiceId])
groupSub Map ServiceId APrivateAuthKey
pending ((ServiceId
qId, APrivateAuthKey
pk), Either SMPClientError (Maybe ServiceId)
r) acc :: (Bool, [(ServiceId, SMPClientError)],
([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
[ServiceId])
acc@(!Bool
tempErrs, [(ServiceId, SMPClientError)]
finalErrs, oks :: ([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId])
oks@([(ServiceId, (ByteString, APrivateAuthKey))]
qOks, [ServiceId]
sQs), [ServiceId]
notPending) = case Either SMPClientError (Maybe ServiceId)
r of
Right Maybe ServiceId
serviceId_
| ServiceId -> Map ServiceId APrivateAuthKey -> Bool
forall k a. Ord k => k -> Map k a -> Bool
M.member ServiceId
qId Map ServiceId APrivateAuthKey
pending ->
let oks' :: ([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId])
oks' = case (Maybe ServiceId
smpServiceId, Maybe ServiceId
serviceId_) of
(Just ServiceId
sId, Just ServiceId
sId') | ServiceId
sId ServiceId -> ServiceId -> Bool
forall a. Eq a => a -> a -> Bool
== ServiceId
sId' -> ([(ServiceId, (ByteString, APrivateAuthKey))]
qOks, ServiceId
qId ServiceId -> [ServiceId] -> [ServiceId]
forall a. a -> [a] -> [a]
: [ServiceId]
sQs)
(Maybe ServiceId, Maybe ServiceId)
_ -> ((ServiceId
qId, (ByteString
sessId, APrivateAuthKey
pk)) (ServiceId, (ByteString, APrivateAuthKey))
-> [(ServiceId, (ByteString, APrivateAuthKey))]
-> [(ServiceId, (ByteString, APrivateAuthKey))]
forall a. a -> [a] -> [a]
: [(ServiceId, (ByteString, APrivateAuthKey))]
qOks, [ServiceId]
sQs)
in (Bool
tempErrs, [(ServiceId, SMPClientError)]
finalErrs, ([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId])
oks', ServiceId
qId ServiceId -> [ServiceId] -> [ServiceId]
forall a. a -> [a] -> [a]
: [ServiceId]
notPending)
| Bool
otherwise -> (Bool, [(ServiceId, SMPClientError)],
([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
[ServiceId])
acc
Left SMPClientError
e
| SMPClientError -> Bool
forall err. ProtocolClientError err -> Bool
temporaryClientError SMPClientError
e -> (Bool
True, [(ServiceId, SMPClientError)]
finalErrs, ([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId])
oks, [ServiceId]
notPending)
| Bool
otherwise -> (Bool
tempErrs, (ServiceId
qId, SMPClientError
e) (ServiceId, SMPClientError)
-> [(ServiceId, SMPClientError)] -> [(ServiceId, SMPClientError)]
forall a. a -> [a] -> [a]
: [(ServiceId, SMPClientError)]
finalErrs, ([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId])
oks, ServiceId
qId ServiceId -> [ServiceId] -> [ServiceId]
forall a. a -> [a] -> [a]
: [ServiceId]
notPending)
notify_ :: (SMPServer -> NonEmpty a -> SMPClientAgentEvent) -> [a] -> IO ()
notify_ :: forall a.
(SMPServer -> NonEmpty a -> SMPClientAgentEvent) -> [a] -> IO ()
notify_ SMPServer -> NonEmpty a -> SMPClientAgentEvent
evt [a]
qs = (NonEmpty a -> IO ()) -> Maybe (NonEmpty a) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (SMPClientAgent p -> SMPClientAgentEvent -> IO ()
forall (m :: * -> *) (p :: Party).
MonadIO m =>
SMPClientAgent p -> SMPClientAgentEvent -> m ()
notify SMPClientAgent p
ca (SMPClientAgentEvent -> IO ())
-> (NonEmpty a -> SMPClientAgentEvent) -> NonEmpty a -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SMPServer -> NonEmpty a -> SMPClientAgentEvent
evt SMPServer
srv) (Maybe (NonEmpty a) -> IO ()) -> Maybe (NonEmpty a) -> IO ()
forall a b. (a -> b) -> a -> b
$ [a] -> Maybe (NonEmpty a)
forall a. [a] -> Maybe (NonEmpty a)
L.nonEmpty [a]
qs
subscribeServiceNtfs :: SMPClientAgent 'NotifierService -> SMPServer -> (ServiceId, Int64) -> IO ()
subscribeServiceNtfs :: SMPClientAgent 'NotifierService
-> SMPServer -> (ServiceId, Int64) -> IO ()
subscribeServiceNtfs = SMPClientAgent 'NotifierService
-> SMPServer -> (ServiceId, Int64) -> IO ()
forall (p :: Party).
(PartyI p, ServiceParty p) =>
SMPClientAgent p -> SMPServer -> (ServiceId, Int64) -> IO ()
subscribeService_
{-# INLINE subscribeServiceNtfs #-}
subscribeService_ :: (PartyI p, ServiceParty p) => SMPClientAgent p -> SMPServer -> (ServiceId, Int64) -> IO ()
subscribeService_ :: forall (p :: Party).
(PartyI p, ServiceParty p) =>
SMPClientAgent p -> SMPServer -> (ServiceId, Int64) -> IO ()
subscribeService_ SMPClientAgent p
ca SMPServer
srv (ServiceId, Int64)
serviceSub = do
STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ SMPClientAgent p -> SMPServer -> Maybe (ServiceId, Int64) -> STM ()
forall (p :: Party).
SMPClientAgent p -> SMPServer -> Maybe (ServiceId, Int64) -> STM ()
setPendingServiceSub SMPClientAgent p
ca SMPServer
srv (Maybe (ServiceId, Int64) -> STM ())
-> Maybe (ServiceId, Int64) -> STM ()
forall a b. (a -> b) -> a -> b
$ (ServiceId, Int64) -> Maybe (ServiceId, Int64)
forall a. a -> Maybe a
Just (ServiceId, Int64)
serviceSub
ExceptT SMPClientError IO SMPClient
-> IO (Either SMPClientError SMPClient)
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (SMPClientAgent p
-> SMPServer -> ExceptT SMPClientError IO SMPClient
forall (p :: Party).
SMPClientAgent p
-> SMPServer -> ExceptT SMPClientError IO SMPClient
getSMPServerClient' SMPClientAgent p
ca SMPServer
srv) IO (Either SMPClientError SMPClient)
-> (Either SMPClientError SMPClient -> IO ()) -> IO ()
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Right SMPClient
smp -> SMPClientAgent p
-> SMPClient -> SMPServer -> (ServiceId, Int64) -> IO ()
forall (p :: Party).
(PartyI p, ServiceParty p) =>
SMPClientAgent p
-> SMPClient -> SMPServer -> (ServiceId, Int64) -> IO ()
smpSubscribeService SMPClientAgent p
ca SMPClient
smp SMPServer
srv (ServiceId, Int64)
serviceSub
Left SMPClientError
_ -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
smpSubscribeService :: (PartyI p, ServiceParty p) => SMPClientAgent p -> SMPClient -> SMPServer -> (ServiceId, Int64) -> IO ()
smpSubscribeService :: forall (p :: Party).
(PartyI p, ServiceParty p) =>
SMPClientAgent p
-> SMPClient -> SMPServer -> (ServiceId, Int64) -> IO ()
smpSubscribeService SMPClientAgent p
ca SMPClient
smp SMPServer
srv serviceSub :: (ServiceId, Int64)
serviceSub@(ServiceId
serviceId, Int64
_) = case SMPClient -> Maybe (THClientService' PrivateKeyEd25519)
smpClientService SMPClient
smp of
Just THClientService' PrivateKeyEd25519
service | THClientService' PrivateKeyEd25519 -> Bool
serviceAvailable THClientService' PrivateKeyEd25519
service -> IO ()
subscribe
Maybe (THClientService' PrivateKeyEd25519)
_ -> IO ()
notifyUnavailable
where
subscribe :: IO ()
subscribe = do
Either SMPClientError Int64
r <- ExceptT SMPClientError IO Int64 -> IO (Either SMPClientError Int64)
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (ExceptT SMPClientError IO Int64
-> IO (Either SMPClientError Int64))
-> ExceptT SMPClientError IO Int64
-> IO (Either SMPClientError Int64)
forall a b. (a -> b) -> a -> b
$ SMPClient -> SParty p -> ExceptT SMPClientError IO Int64
forall (p :: Party).
(PartyI p, ServiceParty p) =>
SMPClient -> SParty p -> ExceptT SMPClientError IO Int64
subscribeService SMPClient
smp (SParty p -> ExceptT SMPClientError IO Int64)
-> SParty p -> ExceptT SMPClientError IO Int64
forall a b. (a -> b) -> a -> b
$ SMPClientAgent p -> SParty p
forall (p :: Party). SMPClientAgent p -> SParty p
agentParty SMPClientAgent p
ca
Bool
ok <-
STM Bool -> IO Bool
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM Bool -> IO Bool) -> STM Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$
STM Bool -> STM Bool -> STM Bool -> STM Bool
forall (m :: * -> *) a. Monad m => m Bool -> m a -> m a -> m a
ifM
(SMPClientAgent p -> SMPClient -> SMPServer -> STM Bool
forall (p :: Party).
SMPClientAgent p -> SMPClient -> SMPServer -> STM Bool
activeClientSession SMPClientAgent p
ca SMPClient
smp SMPServer
srv)
(Bool
True Bool -> STM () -> STM Bool
forall a b. a -> STM b -> STM a
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ Either SMPClientError Int64 -> STM ()
processSubscription Either SMPClientError Int64
r)
(Bool -> STM Bool
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
False)
if Bool
ok
then case Either SMPClientError Int64
r of
Right Int64
n -> SMPClientAgent p -> SMPClientAgentEvent -> IO ()
forall (m :: * -> *) (p :: Party).
MonadIO m =>
SMPClientAgent p -> SMPClientAgentEvent -> m ()
notify SMPClientAgent p
ca (SMPClientAgentEvent -> IO ()) -> SMPClientAgentEvent -> IO ()
forall a b. (a -> b) -> a -> b
$ SMPServer -> (ServiceId, Int64) -> Int64 -> SMPClientAgentEvent
CAServiceSubscribed SMPServer
srv (ServiceId, Int64)
serviceSub Int64
n
Left SMPClientError
e
| SMPClientError -> Bool
smpClientServiceError SMPClientError
e -> IO ()
notifyUnavailable
| SMPClientError -> Bool
forall err. ProtocolClientError err -> Bool
temporaryClientError SMPClientError
e -> SMPClientAgent p -> SMPServer -> IO ()
forall (p :: Party). SMPClientAgent p -> SMPServer -> IO ()
reconnectClient SMPClientAgent p
ca SMPServer
srv
| Bool
otherwise -> SMPClientAgent p -> SMPClientAgentEvent -> IO ()
forall (m :: * -> *) (p :: Party).
MonadIO m =>
SMPClientAgent p -> SMPClientAgentEvent -> m ()
notify SMPClientAgent p
ca (SMPClientAgentEvent -> IO ()) -> SMPClientAgentEvent -> IO ()
forall a b. (a -> b) -> a -> b
$ SMPServer
-> (ServiceId, Int64) -> SMPClientError -> SMPClientAgentEvent
CAServiceSubError SMPServer
srv (ServiceId, Int64)
serviceSub SMPClientError
e
else SMPClientAgent p -> SMPServer -> IO ()
forall (p :: Party). SMPClientAgent p -> SMPServer -> IO ()
reconnectClient SMPClientAgent p
ca SMPServer
srv
processSubscription :: Either SMPClientError Int64 -> STM ()
processSubscription = (Int64 -> STM ()) -> Either SMPClientError Int64 -> STM ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ ((Int64 -> STM ()) -> Either SMPClientError Int64 -> STM ())
-> (Int64 -> STM ()) -> Either SMPClientError Int64 -> STM ()
forall a b. (a -> b) -> a -> b
$ \Int64
n -> do
SMPClientAgent p
-> SMPServer -> Maybe ((ServiceId, Int64), ByteString) -> STM ()
forall (p :: Party).
SMPClientAgent p
-> SMPServer -> Maybe ((ServiceId, Int64), ByteString) -> STM ()
setActiveServiceSub SMPClientAgent p
ca SMPServer
srv (Maybe ((ServiceId, Int64), ByteString) -> STM ())
-> Maybe ((ServiceId, Int64), ByteString) -> STM ()
forall a b. (a -> b) -> a -> b
$ ((ServiceId, Int64), ByteString)
-> Maybe ((ServiceId, Int64), ByteString)
forall a. a -> Maybe a
Just ((ServiceId
serviceId, Int64
n), ByteString
sessId)
SMPClientAgent p -> SMPServer -> Maybe (ServiceId, Int64) -> STM ()
forall (p :: Party).
SMPClientAgent p -> SMPServer -> Maybe (ServiceId, Int64) -> STM ()
setPendingServiceSub SMPClientAgent p
ca SMPServer
srv Maybe (ServiceId, Int64)
forall a. Maybe a
Nothing
serviceAvailable :: THClientService' PrivateKeyEd25519 -> Bool
serviceAvailable THClientService {SMPServiceRole
serviceRole :: SMPServiceRole
$sel:serviceRole:THClientService :: forall k. THClientService' k -> SMPServiceRole
serviceRole, $sel:serviceId:THClientService :: forall k. THClientService' k -> ServiceId
serviceId = ServiceId
serviceId'} =
ServiceId
serviceId ServiceId -> ServiceId -> Bool
forall a. Eq a => a -> a -> Bool
== ServiceId
serviceId' Bool -> Bool -> Bool
&& SParty p -> SMPServiceRole
forall (p :: Party). ServiceParty p => SParty p -> SMPServiceRole
partyServiceRole (SMPClientAgent p -> SParty p
forall (p :: Party). SMPClientAgent p -> SParty p
agentParty SMPClientAgent p
ca) SMPServiceRole -> SMPServiceRole -> Bool
forall a. Eq a => a -> a -> Bool
== SMPServiceRole
serviceRole
notifyUnavailable :: IO ()
notifyUnavailable = do
STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ SMPClientAgent p -> SMPServer -> Maybe (ServiceId, Int64) -> STM ()
forall (p :: Party).
SMPClientAgent p -> SMPServer -> Maybe (ServiceId, Int64) -> STM ()
setPendingServiceSub SMPClientAgent p
ca SMPServer
srv Maybe (ServiceId, Int64)
forall a. Maybe a
Nothing
SMPClientAgent p -> SMPClientAgentEvent -> IO ()
forall (m :: * -> *) (p :: Party).
MonadIO m =>
SMPClientAgent p -> SMPClientAgentEvent -> m ()
notify SMPClientAgent p
ca (SMPClientAgentEvent -> IO ()) -> SMPClientAgentEvent -> IO ()
forall a b. (a -> b) -> a -> b
$ SMPServer -> (ServiceId, Int64) -> SMPClientAgentEvent
CAServiceUnavailable SMPServer
srv (ServiceId, Int64)
serviceSub
sessId :: ByteString
sessId = THandleParams SMPVersion 'TClient -> ByteString
forall v (p :: TransportPeer). THandleParams v p -> ByteString
sessionId (THandleParams SMPVersion 'TClient -> ByteString)
-> THandleParams SMPVersion 'TClient -> ByteString
forall a b. (a -> b) -> a -> b
$ SMPClient -> THandleParams SMPVersion 'TClient
forall v err msg.
ProtocolClient v err msg -> THandleParams v 'TClient
thParams SMPClient
smp
activeClientSession' :: SMPClientAgent p -> SessionId -> SMPServer -> STM Bool
activeClientSession' :: forall (p :: Party).
SMPClientAgent p -> ByteString -> SMPServer -> STM Bool
activeClientSession' SMPClientAgent p
ca ByteString
sessId SMPServer
srv = Maybe (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient))
-> Bool
sameSess (Maybe (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient))
-> Bool)
-> STM
(Maybe (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient)))
-> STM Bool
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> SMPServer
-> TMap SMPServer SMPClientVar
-> STM
(Maybe (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient)))
forall k a. Ord k => k -> TMap k (SessionVar a) -> STM (Maybe a)
tryReadSessVar SMPServer
srv (SMPClientAgent p -> TMap SMPServer SMPClientVar
forall (p :: Party).
SMPClientAgent p -> TMap SMPServer SMPClientVar
smpClients SMPClientAgent p
ca)
where
sameSess :: Maybe (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient))
-> Bool
sameSess = \case
Just (Right (Bool
_, SMPClient
smp')) -> ByteString
sessId ByteString -> ByteString -> Bool
forall a. Eq a => a -> a -> Bool
== THandleParams SMPVersion 'TClient -> ByteString
forall v (p :: TransportPeer). THandleParams v p -> ByteString
sessionId (SMPClient -> THandleParams SMPVersion 'TClient
forall v err msg.
ProtocolClient v err msg -> THandleParams v 'TClient
thParams SMPClient
smp')
Maybe (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient))
_ -> Bool
False
activeClientSession :: SMPClientAgent p -> SMPClient -> SMPServer -> STM Bool
activeClientSession :: forall (p :: Party).
SMPClientAgent p -> SMPClient -> SMPServer -> STM Bool
activeClientSession SMPClientAgent p
ca = SMPClientAgent p -> ByteString -> SMPServer -> STM Bool
forall (p :: Party).
SMPClientAgent p -> ByteString -> SMPServer -> STM Bool
activeClientSession' SMPClientAgent p
ca (ByteString -> SMPServer -> STM Bool)
-> (SMPClient -> ByteString) -> SMPClient -> SMPServer -> STM Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. THandleParams SMPVersion 'TClient -> ByteString
forall v (p :: TransportPeer). THandleParams v p -> ByteString
sessionId (THandleParams SMPVersion 'TClient -> ByteString)
-> (SMPClient -> THandleParams SMPVersion 'TClient)
-> SMPClient
-> ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SMPClient -> THandleParams SMPVersion 'TClient
forall v err msg.
ProtocolClient v err msg -> THandleParams v 'TClient
thParams
showServer :: SMPServer -> ByteString
showServer :: SMPServer -> ByteString
showServer ProtocolServer {NonEmpty TransportHost
$sel:host:ProtocolServer :: forall (p :: ProtocolType).
ProtocolServer p -> NonEmpty TransportHost
host :: NonEmpty TransportHost
host, HostName
port :: HostName
$sel:port:ProtocolServer :: forall (p :: ProtocolType). ProtocolServer p -> HostName
port} =
NonEmpty TransportHost -> ByteString
forall a. StrEncoding a => a -> ByteString
strEncode NonEmpty TransportHost
host ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> HostName -> ByteString
B.pack (if HostName -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null HostName
port then HostName
"" else Char
':' Char -> HostName -> HostName
forall a. a -> [a] -> [a]
: HostName
port)
addActiveSubs :: SMPClientAgent p -> SMPServer -> [(QueueId, (SessionId, C.APrivateAuthKey))] -> STM ()
addActiveSubs :: forall (p :: Party).
SMPClientAgent p
-> SMPServer
-> [(ServiceId, (ByteString, APrivateAuthKey))]
-> STM ()
addActiveSubs = TMap SMPServer (TMap ServiceId (ByteString, APrivateAuthKey))
-> SMPServer
-> [(ServiceId, (ByteString, APrivateAuthKey))]
-> STM ()
forall s.
TMap SMPServer (TMap ServiceId s)
-> SMPServer -> [(ServiceId, s)] -> STM ()
addSubsList_ (TMap SMPServer (TMap ServiceId (ByteString, APrivateAuthKey))
-> SMPServer
-> [(ServiceId, (ByteString, APrivateAuthKey))]
-> STM ())
-> (SMPClientAgent p
-> TMap SMPServer (TMap ServiceId (ByteString, APrivateAuthKey)))
-> SMPClientAgent p
-> SMPServer
-> [(ServiceId, (ByteString, APrivateAuthKey))]
-> STM ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SMPClientAgent p
-> TMap SMPServer (TMap ServiceId (ByteString, APrivateAuthKey))
forall (p :: Party).
SMPClientAgent p
-> TMap SMPServer (TMap ServiceId (ByteString, APrivateAuthKey))
activeQueueSubs
{-# INLINE addActiveSubs #-}
addPendingSubs :: SMPClientAgent p -> SMPServer -> [(QueueId, C.APrivateAuthKey)] -> STM ()
addPendingSubs :: forall (p :: Party).
SMPClientAgent p
-> SMPServer -> [(ServiceId, APrivateAuthKey)] -> STM ()
addPendingSubs = TMap SMPServer (TMap ServiceId APrivateAuthKey)
-> SMPServer -> [(ServiceId, APrivateAuthKey)] -> STM ()
forall s.
TMap SMPServer (TMap ServiceId s)
-> SMPServer -> [(ServiceId, s)] -> STM ()
addSubsList_ (TMap SMPServer (TMap ServiceId APrivateAuthKey)
-> SMPServer -> [(ServiceId, APrivateAuthKey)] -> STM ())
-> (SMPClientAgent p
-> TMap SMPServer (TMap ServiceId APrivateAuthKey))
-> SMPClientAgent p
-> SMPServer
-> [(ServiceId, APrivateAuthKey)]
-> STM ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SMPClientAgent p -> TMap SMPServer (TMap ServiceId APrivateAuthKey)
forall (p :: Party).
SMPClientAgent p -> TMap SMPServer (TMap ServiceId APrivateAuthKey)
pendingQueueSubs
{-# INLINE addPendingSubs #-}
addSubsList_ :: TMap SMPServer (TMap QueueId s) -> SMPServer -> [(QueueId, s)] -> STM ()
addSubsList_ :: forall s.
TMap SMPServer (TMap ServiceId s)
-> SMPServer -> [(ServiceId, s)] -> STM ()
addSubsList_ TMap SMPServer (TMap ServiceId s)
subs SMPServer
srv [(ServiceId, s)]
ss = TMap SMPServer (TMap ServiceId s)
-> SMPServer -> Map ServiceId s -> STM ()
forall s.
TMap SMPServer (TMap ServiceId s)
-> SMPServer -> Map ServiceId s -> STM ()
addSubs_ TMap SMPServer (TMap ServiceId s)
subs SMPServer
srv (Map ServiceId s -> STM ()) -> Map ServiceId s -> STM ()
forall a b. (a -> b) -> a -> b
$ [(ServiceId, s)] -> Map ServiceId s
forall k a. Ord k => [(k, a)] -> Map k a
M.fromList [(ServiceId, s)]
ss
addSubs_ :: TMap SMPServer (TMap QueueId s) -> SMPServer -> Map QueueId s -> STM ()
addSubs_ :: forall s.
TMap SMPServer (TMap ServiceId s)
-> SMPServer -> Map ServiceId s -> STM ()
addSubs_ TMap SMPServer (TMap ServiceId s)
subs SMPServer
srv Map ServiceId s
ss =
SMPServer
-> TMap SMPServer (TMap ServiceId s)
-> STM (Maybe (TMap ServiceId s))
forall k a. Ord k => k -> TMap k a -> STM (Maybe a)
TM.lookup SMPServer
srv TMap SMPServer (TMap ServiceId s)
subs STM (Maybe (TMap ServiceId s))
-> (Maybe (TMap ServiceId s) -> STM ()) -> STM ()
forall a b. STM a -> (a -> STM b) -> STM b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Just TMap ServiceId s
m -> Map ServiceId s -> TMap ServiceId s -> STM ()
forall k a. Ord k => Map k a -> TMap k a -> STM ()
TM.union Map ServiceId s
ss TMap ServiceId s
m
Maybe (TMap ServiceId s)
_ -> SMPServer
-> STM (TMap ServiceId s)
-> TMap SMPServer (TMap ServiceId s)
-> STM ()
forall k a. Ord k => k -> STM a -> TMap k a -> STM ()
TM.insertM SMPServer
srv (Map ServiceId s -> STM (TMap ServiceId s)
forall a. a -> STM (TVar a)
newTVar Map ServiceId s
ss) TMap SMPServer (TMap ServiceId s)
subs
setActiveServiceSub :: SMPClientAgent p -> SMPServer -> Maybe ((ServiceId, Int64), SessionId) -> STM ()
setActiveServiceSub :: forall (p :: Party).
SMPClientAgent p
-> SMPServer -> Maybe ((ServiceId, Int64), ByteString) -> STM ()
setActiveServiceSub = (SMPClientAgent p
-> TMap SMPServer (TVar (Maybe ((ServiceId, Int64), ByteString))))
-> SMPClientAgent p
-> SMPServer
-> Maybe ((ServiceId, Int64), ByteString)
-> STM ()
forall (p :: Party) sub.
(SMPClientAgent p -> TMap SMPServer (TVar (Maybe sub)))
-> SMPClientAgent p -> SMPServer -> Maybe sub -> STM ()
setServiceSub_ SMPClientAgent p
-> TMap SMPServer (TVar (Maybe ((ServiceId, Int64), ByteString)))
forall (p :: Party).
SMPClientAgent p
-> TMap SMPServer (TVar (Maybe ((ServiceId, Int64), ByteString)))
activeServiceSubs
{-# INLINE setActiveServiceSub #-}
setPendingServiceSub :: SMPClientAgent p -> SMPServer -> Maybe (ServiceId, Int64) -> STM ()
setPendingServiceSub :: forall (p :: Party).
SMPClientAgent p -> SMPServer -> Maybe (ServiceId, Int64) -> STM ()
setPendingServiceSub = (SMPClientAgent p
-> TMap SMPServer (TVar (Maybe (ServiceId, Int64))))
-> SMPClientAgent p
-> SMPServer
-> Maybe (ServiceId, Int64)
-> STM ()
forall (p :: Party) sub.
(SMPClientAgent p -> TMap SMPServer (TVar (Maybe sub)))
-> SMPClientAgent p -> SMPServer -> Maybe sub -> STM ()
setServiceSub_ SMPClientAgent p
-> TMap SMPServer (TVar (Maybe (ServiceId, Int64)))
forall (p :: Party).
SMPClientAgent p
-> TMap SMPServer (TVar (Maybe (ServiceId, Int64)))
pendingServiceSubs
{-# INLINE setPendingServiceSub #-}
setServiceSub_ ::
(SMPClientAgent p -> TMap SMPServer (TVar (Maybe sub))) ->
SMPClientAgent p ->
SMPServer ->
Maybe sub ->
STM ()
setServiceSub_ :: forall (p :: Party) sub.
(SMPClientAgent p -> TMap SMPServer (TVar (Maybe sub)))
-> SMPClientAgent p -> SMPServer -> Maybe sub -> STM ()
setServiceSub_ SMPClientAgent p -> TMap SMPServer (TVar (Maybe sub))
subsSel SMPClientAgent p
ca SMPServer
srv Maybe sub
sub =
SMPServer
-> TMap SMPServer (TVar (Maybe sub))
-> STM (Maybe (TVar (Maybe sub)))
forall k a. Ord k => k -> TMap k a -> STM (Maybe a)
TM.lookup SMPServer
srv (SMPClientAgent p -> TMap SMPServer (TVar (Maybe sub))
subsSel SMPClientAgent p
ca) STM (Maybe (TVar (Maybe sub)))
-> (Maybe (TVar (Maybe sub)) -> STM ()) -> STM ()
forall a b. STM a -> (a -> STM b) -> STM b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Just TVar (Maybe sub)
v -> TVar (Maybe sub) -> Maybe sub -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (Maybe sub)
v Maybe sub
sub
Maybe (TVar (Maybe sub))
Nothing -> SMPServer
-> STM (TVar (Maybe sub))
-> TMap SMPServer (TVar (Maybe sub))
-> STM ()
forall k a. Ord k => k -> STM a -> TMap k a -> STM ()
TM.insertM SMPServer
srv (Maybe sub -> STM (TVar (Maybe sub))
forall a. a -> STM (TVar a)
newTVar Maybe sub
sub) (SMPClientAgent p -> TMap SMPServer (TVar (Maybe sub))
subsSel SMPClientAgent p
ca)
updateActiveServiceSub :: SMPClientAgent p -> SMPServer -> ((ServiceId, Int64), SessionId) -> STM ()
updateActiveServiceSub :: forall (p :: Party).
SMPClientAgent p
-> SMPServer -> ((ServiceId, Int64), ByteString) -> STM ()
updateActiveServiceSub SMPClientAgent p
ca SMPServer
srv sub :: ((ServiceId, Int64), ByteString)
sub@((ServiceId
serviceId', Int64
n'), ByteString
sessId') =
SMPServer
-> TMap SMPServer (TVar (Maybe ((ServiceId, Int64), ByteString)))
-> STM (Maybe (TVar (Maybe ((ServiceId, Int64), ByteString))))
forall k a. Ord k => k -> TMap k a -> STM (Maybe a)
TM.lookup SMPServer
srv (SMPClientAgent p
-> TMap SMPServer (TVar (Maybe ((ServiceId, Int64), ByteString)))
forall (p :: Party).
SMPClientAgent p
-> TMap SMPServer (TVar (Maybe ((ServiceId, Int64), ByteString)))
activeServiceSubs SMPClientAgent p
ca) STM (Maybe (TVar (Maybe ((ServiceId, Int64), ByteString))))
-> (Maybe (TVar (Maybe ((ServiceId, Int64), ByteString)))
-> STM ())
-> STM ()
forall a b. STM a -> (a -> STM b) -> STM b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Just TVar (Maybe ((ServiceId, Int64), ByteString))
v -> TVar (Maybe ((ServiceId, Int64), ByteString))
-> (Maybe ((ServiceId, Int64), ByteString)
-> Maybe ((ServiceId, Int64), ByteString))
-> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar (Maybe ((ServiceId, Int64), ByteString))
v ((Maybe ((ServiceId, Int64), ByteString)
-> Maybe ((ServiceId, Int64), ByteString))
-> STM ())
-> (Maybe ((ServiceId, Int64), ByteString)
-> Maybe ((ServiceId, Int64), ByteString))
-> STM ()
forall a b. (a -> b) -> a -> b
$ \case
Just ((ServiceId
serviceId, Int64
n), ByteString
sessId) | ServiceId
serviceId ServiceId -> ServiceId -> Bool
forall a. Eq a => a -> a -> Bool
== ServiceId
serviceId' Bool -> Bool -> Bool
&& ByteString
sessId ByteString -> ByteString -> Bool
forall a. Eq a => a -> a -> Bool
== ByteString
sessId' ->
((ServiceId, Int64), ByteString)
-> Maybe ((ServiceId, Int64), ByteString)
forall a. a -> Maybe a
Just ((ServiceId
serviceId, Int64
n Int64 -> Int64 -> Int64
forall a. Num a => a -> a -> a
+ Int64
n'), ByteString
sessId)
Maybe ((ServiceId, Int64), ByteString)
_ -> ((ServiceId, Int64), ByteString)
-> Maybe ((ServiceId, Int64), ByteString)
forall a. a -> Maybe a
Just ((ServiceId, Int64), ByteString)
sub
Maybe (TVar (Maybe ((ServiceId, Int64), ByteString)))
Nothing -> SMPServer
-> STM (TVar (Maybe ((ServiceId, Int64), ByteString)))
-> TMap SMPServer (TVar (Maybe ((ServiceId, Int64), ByteString)))
-> STM ()
forall k a. Ord k => k -> STM a -> TMap k a -> STM ()
TM.insertM SMPServer
srv (Maybe ((ServiceId, Int64), ByteString)
-> STM (TVar (Maybe ((ServiceId, Int64), ByteString)))
forall a. a -> STM (TVar a)
newTVar (Maybe ((ServiceId, Int64), ByteString)
-> STM (TVar (Maybe ((ServiceId, Int64), ByteString))))
-> Maybe ((ServiceId, Int64), ByteString)
-> STM (TVar (Maybe ((ServiceId, Int64), ByteString)))
forall a b. (a -> b) -> a -> b
$ ((ServiceId, Int64), ByteString)
-> Maybe ((ServiceId, Int64), ByteString)
forall a. a -> Maybe a
Just ((ServiceId, Int64), ByteString)
sub) (SMPClientAgent p
-> TMap SMPServer (TVar (Maybe ((ServiceId, Int64), ByteString)))
forall (p :: Party).
SMPClientAgent p
-> TMap SMPServer (TVar (Maybe ((ServiceId, Int64), ByteString)))
activeServiceSubs SMPClientAgent p
ca)
removeActiveSub :: SMPClientAgent p -> SMPServer -> QueueId -> STM ()
removeActiveSub :: forall (p :: Party).
SMPClientAgent p -> SMPServer -> ServiceId -> STM ()
removeActiveSub = TMap SMPServer (TMap ServiceId (ByteString, APrivateAuthKey))
-> SMPServer -> ServiceId -> STM ()
forall s.
TMap SMPServer (TMap ServiceId s)
-> SMPServer -> ServiceId -> STM ()
removeSub_ (TMap SMPServer (TMap ServiceId (ByteString, APrivateAuthKey))
-> SMPServer -> ServiceId -> STM ())
-> (SMPClientAgent p
-> TMap SMPServer (TMap ServiceId (ByteString, APrivateAuthKey)))
-> SMPClientAgent p
-> SMPServer
-> ServiceId
-> STM ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SMPClientAgent p
-> TMap SMPServer (TMap ServiceId (ByteString, APrivateAuthKey))
forall (p :: Party).
SMPClientAgent p
-> TMap SMPServer (TMap ServiceId (ByteString, APrivateAuthKey))
activeQueueSubs
{-# INLINE removeActiveSub #-}
removePendingSub :: SMPClientAgent p -> SMPServer -> QueueId -> STM ()
removePendingSub :: forall (p :: Party).
SMPClientAgent p -> SMPServer -> ServiceId -> STM ()
removePendingSub = TMap SMPServer (TMap ServiceId APrivateAuthKey)
-> SMPServer -> ServiceId -> STM ()
forall s.
TMap SMPServer (TMap ServiceId s)
-> SMPServer -> ServiceId -> STM ()
removeSub_ (TMap SMPServer (TMap ServiceId APrivateAuthKey)
-> SMPServer -> ServiceId -> STM ())
-> (SMPClientAgent p
-> TMap SMPServer (TMap ServiceId APrivateAuthKey))
-> SMPClientAgent p
-> SMPServer
-> ServiceId
-> STM ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SMPClientAgent p -> TMap SMPServer (TMap ServiceId APrivateAuthKey)
forall (p :: Party).
SMPClientAgent p -> TMap SMPServer (TMap ServiceId APrivateAuthKey)
pendingQueueSubs
{-# INLINE removePendingSub #-}
removeSub_ :: TMap SMPServer (TMap QueueId s) -> SMPServer -> QueueId -> STM ()
removeSub_ :: forall s.
TMap SMPServer (TMap ServiceId s)
-> SMPServer -> ServiceId -> STM ()
removeSub_ TMap SMPServer (TMap ServiceId s)
subs SMPServer
srv ServiceId
s = SMPServer
-> TMap SMPServer (TMap ServiceId s)
-> STM (Maybe (TMap ServiceId s))
forall k a. Ord k => k -> TMap k a -> STM (Maybe a)
TM.lookup SMPServer
srv TMap SMPServer (TMap ServiceId s)
subs STM (Maybe (TMap ServiceId s))
-> (Maybe (TMap ServiceId s) -> STM ()) -> STM ()
forall a b. STM a -> (a -> STM b) -> STM b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (TMap ServiceId s -> STM ()) -> Maybe (TMap ServiceId s) -> STM ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (ServiceId -> TMap ServiceId s -> STM ()
forall k a. Ord k => k -> TMap k a -> STM ()
TM.delete ServiceId
s)
removeActiveSubs :: SMPClientAgent p -> SMPServer -> [QueueId] -> STM ()
removeActiveSubs :: forall (p :: Party).
SMPClientAgent p -> SMPServer -> [ServiceId] -> STM ()
removeActiveSubs = TMap SMPServer (TMap ServiceId (ByteString, APrivateAuthKey))
-> SMPServer -> [ServiceId] -> STM ()
forall s.
TMap SMPServer (TMap ServiceId s)
-> SMPServer -> [ServiceId] -> STM ()
removeSubs_ (TMap SMPServer (TMap ServiceId (ByteString, APrivateAuthKey))
-> SMPServer -> [ServiceId] -> STM ())
-> (SMPClientAgent p
-> TMap SMPServer (TMap ServiceId (ByteString, APrivateAuthKey)))
-> SMPClientAgent p
-> SMPServer
-> [ServiceId]
-> STM ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SMPClientAgent p
-> TMap SMPServer (TMap ServiceId (ByteString, APrivateAuthKey))
forall (p :: Party).
SMPClientAgent p
-> TMap SMPServer (TMap ServiceId (ByteString, APrivateAuthKey))
activeQueueSubs
{-# INLINE removeActiveSubs #-}
removePendingSubs :: SMPClientAgent p -> SMPServer -> [QueueId] -> STM ()
removePendingSubs :: forall (p :: Party).
SMPClientAgent p -> SMPServer -> [ServiceId] -> STM ()
removePendingSubs = TMap SMPServer (TMap ServiceId APrivateAuthKey)
-> SMPServer -> [ServiceId] -> STM ()
forall s.
TMap SMPServer (TMap ServiceId s)
-> SMPServer -> [ServiceId] -> STM ()
removeSubs_ (TMap SMPServer (TMap ServiceId APrivateAuthKey)
-> SMPServer -> [ServiceId] -> STM ())
-> (SMPClientAgent p
-> TMap SMPServer (TMap ServiceId APrivateAuthKey))
-> SMPClientAgent p
-> SMPServer
-> [ServiceId]
-> STM ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SMPClientAgent p -> TMap SMPServer (TMap ServiceId APrivateAuthKey)
forall (p :: Party).
SMPClientAgent p -> TMap SMPServer (TMap ServiceId APrivateAuthKey)
pendingQueueSubs
{-# INLINE removePendingSubs #-}
removeSubs_ :: TMap SMPServer (TMap QueueId s) -> SMPServer -> [QueueId] -> STM ()
removeSubs_ :: forall s.
TMap SMPServer (TMap ServiceId s)
-> SMPServer -> [ServiceId] -> STM ()
removeSubs_ TMap SMPServer (TMap ServiceId s)
subs SMPServer
srv [ServiceId]
qs = SMPServer
-> TMap SMPServer (TMap ServiceId s)
-> STM (Maybe (TMap ServiceId s))
forall k a. Ord k => k -> TMap k a -> STM (Maybe a)
TM.lookup SMPServer
srv TMap SMPServer (TMap ServiceId s)
subs STM (Maybe (TMap ServiceId s))
-> (Maybe (TMap ServiceId s) -> STM ()) -> STM ()
forall a b. STM a -> (a -> STM b) -> STM b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (TMap ServiceId s -> STM ()) -> Maybe (TMap ServiceId s) -> STM ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (TMap ServiceId s -> (Map ServiceId s -> Map ServiceId s) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
`modifyTVar'` (Map ServiceId s -> Set ServiceId -> Map ServiceId s
forall k a. Ord k => Map k a -> Set k -> Map k a
`M.withoutKeys` [ServiceId] -> Set ServiceId
forall a. Ord a => [a] -> Set a
S.fromList [ServiceId]
qs))