{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}

module Simplex.Messaging.Client.Agent
  ( SMPClientAgent (..),
    SMPClientAgentConfig (..),
    SMPClientAgentEvent (..),
    OwnServer,
    defaultSMPClientAgentConfig,
    newSMPClientAgent,
    getSMPServerClient'',
    getConnectedSMPServerClient,
    closeSMPClientAgent,
    lookupSMPServerClient,
    isOwnServer,
    subscribeServiceNtfs,
    subscribeQueuesNtfs,
    activeClientSession',
    removeActiveSub,
    removeActiveSubs,
    removePendingSub,
    removePendingSubs,
  )
where

import Control.Concurrent (forkIO)
import Control.Concurrent.Async (Async, uninterruptibleCancel)
import Control.Concurrent.STM (retry)
import Control.Logger.Simple
import Control.Monad
import Control.Monad.Except
import Control.Monad.IO.Unlift
import Control.Monad.Trans.Except
import Crypto.Random (ChaChaDRG)
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import Data.Constraint (Dict (..))
import Data.Int (Int64)
import Data.List.NonEmpty (NonEmpty)
import qualified Data.List.NonEmpty as L
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as M
import Data.Maybe (isJust, isNothing)
import qualified Data.Set as S
import Data.Text.Encoding
import Data.Time.Clock (NominalDiffTime, UTCTime, addUTCTime, getCurrentTime)
import Numeric.Natural
import Simplex.Messaging.Agent.RetryInterval
import Simplex.Messaging.Client
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Protocol
  ( BrokerMsg,
    ErrorType,
    NotifierId,
    NtfPrivateAuthKey,
    Party (..),
    PartyI,
    ProtocolServer (..),
    QueueId,
    SMPServer,
    SParty (..),
    ServiceParty,
    serviceParty,
    partyServiceRole
  )
import Simplex.Messaging.Session
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Transport
import Simplex.Messaging.Util (catchAll_, ifM, safeDecodeUtf8, toChunks, tshow, whenM, ($>>=), (<$$>))
import System.Timeout (timeout)
import UnliftIO (async)
import qualified UnliftIO.Exception as E
import UnliftIO.STM

type SMPClientVar = SessionVar (Either (SMPClientError, Maybe UTCTime) (OwnServer, SMPClient))

data SMPClientAgentEvent
  = CAConnected SMPServer (Maybe ServiceId)
  | CADisconnected SMPServer (NonEmpty QueueId)
  | CASubscribed SMPServer (Maybe ServiceId) (NonEmpty QueueId)
  | CASubError SMPServer (NonEmpty (QueueId, SMPClientError))
  | CAServiceDisconnected SMPServer (ServiceId, Int64)
  | CAServiceSubscribed SMPServer (ServiceId, Int64) Int64
  | CAServiceSubError SMPServer (ServiceId, Int64) SMPClientError
  -- CAServiceUnavailable is used when service ID in pending subscription is different from the current service in connection.
  -- This will require resubscribing to all queues associated with this service ID individually, creating new associations.
  -- It may happen if, for example, SMP server deletes service information (e.g. via downgrade and upgrade)
  -- and assigns different service ID to the service certificate.
  | CAServiceUnavailable SMPServer (ServiceId, Int64)

data SMPClientAgentConfig = SMPClientAgentConfig
  { SMPClientAgentConfig -> ProtocolClientConfig SMPVersion
smpCfg :: ProtocolClientConfig SMPVersion,
    SMPClientAgentConfig -> RetryInterval
reconnectInterval :: RetryInterval,
    SMPClientAgentConfig -> NominalDiffTime
persistErrorInterval :: NominalDiffTime,
    SMPClientAgentConfig -> Natural
msgQSize :: Natural,
    SMPClientAgentConfig -> Natural
agentQSize :: Natural,
    SMPClientAgentConfig -> Int
agentSubsBatchSize :: Int,
    SMPClientAgentConfig -> [ByteString]
ownServerDomains :: [ByteString]
  }

defaultSMPClientAgentConfig :: SMPClientAgentConfig
defaultSMPClientAgentConfig :: SMPClientAgentConfig
defaultSMPClientAgentConfig =
  SMPClientAgentConfig
    { $sel:smpCfg:SMPClientAgentConfig :: ProtocolClientConfig SMPVersion
smpCfg = ProtocolClientConfig SMPVersion
defaultSMPClientConfig,
      $sel:reconnectInterval:SMPClientAgentConfig :: RetryInterval
reconnectInterval =
        RetryInterval
          { initialInterval :: Int64
initialInterval = Int64
second,
            increaseAfter :: Int64
increaseAfter = Int64
10 Int64 -> Int64 -> Int64
forall a. Num a => a -> a -> a
* Int64
second,
            maxInterval :: Int64
maxInterval = Int64
10 Int64 -> Int64 -> Int64
forall a. Num a => a -> a -> a
* Int64
second
          },
      $sel:persistErrorInterval:SMPClientAgentConfig :: NominalDiffTime
persistErrorInterval = NominalDiffTime
30, -- seconds
      $sel:msgQSize:SMPClientAgentConfig :: Natural
msgQSize = Natural
2048,
      $sel:agentQSize:SMPClientAgentConfig :: Natural
agentQSize = Natural
2048,
      $sel:agentSubsBatchSize:SMPClientAgentConfig :: Int
agentSubsBatchSize = Int
1360,
      $sel:ownServerDomains:SMPClientAgentConfig :: [ByteString]
ownServerDomains = []
    }
  where
    second :: Int64
second = Int64
1000000

data SMPClientAgent p = SMPClientAgent
  { forall (p :: Party). SMPClientAgent p -> SMPClientAgentConfig
agentCfg :: SMPClientAgentConfig,
    forall (p :: Party). SMPClientAgent p -> SParty p
agentParty :: SParty p,
    forall (p :: Party). SMPClientAgent p -> TVar Bool
active :: TVar Bool,
    forall (p :: Party). SMPClientAgent p -> UTCTime
startedAt :: UTCTime,
    forall (p :: Party).
SMPClientAgent p
-> TBQueue (ServerTransmissionBatch SMPVersion ErrorType BrokerMsg)
msgQ :: TBQueue (ServerTransmissionBatch SMPVersion ErrorType BrokerMsg),
    forall (p :: Party).
SMPClientAgent p -> TBQueue SMPClientAgentEvent
agentQ :: TBQueue SMPClientAgentEvent,
    forall (p :: Party). SMPClientAgent p -> TVar ChaChaDRG
randomDrg :: TVar ChaChaDRG,
    forall (p :: Party).
SMPClientAgent p -> TMap SMPServer SMPClientVar
smpClients :: TMap SMPServer SMPClientVar,
    forall (p :: Party).
SMPClientAgent p -> TMap ByteString (Bool, SMPClient)
smpSessions :: TMap SessionId (OwnServer, SMPClient),
    -- Only one service subscription can exist per server with this agent.
    -- With correctly functioning SMP server, queue and service subscriptions can't be
    -- active at the same time.
    forall (p :: Party).
SMPClientAgent p
-> TMap SMPServer (TVar (Maybe ((ServiceId, Int64), ByteString)))
activeServiceSubs :: TMap SMPServer (TVar (Maybe ((ServiceId, Int64), SessionId))),
    forall (p :: Party).
SMPClientAgent p
-> TMap SMPServer (TMap ServiceId (ByteString, APrivateAuthKey))
activeQueueSubs :: TMap SMPServer (TMap QueueId (SessionId, C.APrivateAuthKey)),
    -- Pending service subscriptions can co-exist with pending queue subscriptions
    -- on the same SMP server during subscriptions being transitioned from per-queue to service.
    forall (p :: Party).
SMPClientAgent p
-> TMap SMPServer (TVar (Maybe (ServiceId, Int64)))
pendingServiceSubs :: TMap SMPServer (TVar (Maybe (ServiceId, Int64))),
    forall (p :: Party).
SMPClientAgent p -> TMap SMPServer (TMap ServiceId APrivateAuthKey)
pendingQueueSubs :: TMap SMPServer (TMap QueueId C.APrivateAuthKey),
    forall (p :: Party).
SMPClientAgent p -> TMap SMPServer (SessionVar (Async ()))
smpSubWorkers :: TMap SMPServer (SessionVar (Async ())),
    forall (p :: Party). SMPClientAgent p -> TVar Int
workerSeq :: TVar Int
  }

type OwnServer = Bool

newSMPClientAgent :: SParty p -> SMPClientAgentConfig -> TVar ChaChaDRG -> IO (SMPClientAgent p)
newSMPClientAgent :: forall (p :: Party).
SParty p
-> SMPClientAgentConfig -> TVar ChaChaDRG -> IO (SMPClientAgent p)
newSMPClientAgent SParty p
agentParty agentCfg :: SMPClientAgentConfig
agentCfg@SMPClientAgentConfig {Natural
$sel:msgQSize:SMPClientAgentConfig :: SMPClientAgentConfig -> Natural
msgQSize :: Natural
msgQSize, Natural
$sel:agentQSize:SMPClientAgentConfig :: SMPClientAgentConfig -> Natural
agentQSize :: Natural
agentQSize} TVar ChaChaDRG
randomDrg = do
  TVar Bool
active <- Bool -> IO (TVar Bool)
forall (m :: * -> *) a. MonadIO m => a -> m (TVar a)
newTVarIO Bool
True
  UTCTime
startedAt <- IO UTCTime
getCurrentTime
  TBQueue
  ((Int64, SMPServer, Maybe ByteString), Version SMPVersion,
   ByteString,
   NonEmpty (ServiceId, ServerTransmission ErrorType BrokerMsg))
msgQ <- Natural
-> IO
     (TBQueue
        ((Int64, SMPServer, Maybe ByteString), Version SMPVersion,
         ByteString,
         NonEmpty (ServiceId, ServerTransmission ErrorType BrokerMsg)))
forall (m :: * -> *) a. MonadIO m => Natural -> m (TBQueue a)
newTBQueueIO Natural
msgQSize
  TBQueue SMPClientAgentEvent
agentQ <- Natural -> IO (TBQueue SMPClientAgentEvent)
forall (m :: * -> *) a. MonadIO m => Natural -> m (TBQueue a)
newTBQueueIO Natural
agentQSize
  TMap SMPServer SMPClientVar
smpClients <- IO (TMap SMPServer SMPClientVar)
forall k a. IO (TMap k a)
TM.emptyIO
  TMap ByteString (Bool, SMPClient)
smpSessions <- IO (TMap ByteString (Bool, SMPClient))
forall k a. IO (TMap k a)
TM.emptyIO
  TMap SMPServer (TVar (Maybe ((ServiceId, Int64), ByteString)))
activeServiceSubs <- IO (TMap SMPServer (TVar (Maybe ((ServiceId, Int64), ByteString))))
forall k a. IO (TMap k a)
TM.emptyIO
  TMap SMPServer (TMap ServiceId (ByteString, APrivateAuthKey))
activeQueueSubs <- IO (TMap SMPServer (TMap ServiceId (ByteString, APrivateAuthKey)))
forall k a. IO (TMap k a)
TM.emptyIO
  TMap SMPServer (TVar (Maybe (ServiceId, Int64)))
pendingServiceSubs <- IO (TMap SMPServer (TVar (Maybe (ServiceId, Int64))))
forall k a. IO (TMap k a)
TM.emptyIO
  TMap SMPServer (TMap ServiceId APrivateAuthKey)
pendingQueueSubs <- IO (TMap SMPServer (TMap ServiceId APrivateAuthKey))
forall k a. IO (TMap k a)
TM.emptyIO
  TMap SMPServer (SessionVar (Async ()))
smpSubWorkers <- IO (TMap SMPServer (SessionVar (Async ())))
forall k a. IO (TMap k a)
TM.emptyIO
  TVar Int
workerSeq <- Int -> IO (TVar Int)
forall (m :: * -> *) a. MonadIO m => a -> m (TVar a)
newTVarIO Int
0
  SMPClientAgent p -> IO (SMPClientAgent p)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure
    SMPClientAgent
      { SMPClientAgentConfig
$sel:agentCfg:SMPClientAgent :: SMPClientAgentConfig
agentCfg :: SMPClientAgentConfig
agentCfg,
        SParty p
$sel:agentParty:SMPClientAgent :: SParty p
agentParty :: SParty p
agentParty,
        TVar Bool
$sel:active:SMPClientAgent :: TVar Bool
active :: TVar Bool
active,
        UTCTime
$sel:startedAt:SMPClientAgent :: UTCTime
startedAt :: UTCTime
startedAt,
        TBQueue (ServerTransmissionBatch SMPVersion ErrorType BrokerMsg)
TBQueue
  ((Int64, SMPServer, Maybe ByteString), Version SMPVersion,
   ByteString,
   NonEmpty (ServiceId, ServerTransmission ErrorType BrokerMsg))
$sel:msgQ:SMPClientAgent :: TBQueue (ServerTransmissionBatch SMPVersion ErrorType BrokerMsg)
msgQ :: TBQueue
  ((Int64, SMPServer, Maybe ByteString), Version SMPVersion,
   ByteString,
   NonEmpty (ServiceId, ServerTransmission ErrorType BrokerMsg))
msgQ,
        TBQueue SMPClientAgentEvent
$sel:agentQ:SMPClientAgent :: TBQueue SMPClientAgentEvent
agentQ :: TBQueue SMPClientAgentEvent
agentQ,
        TVar ChaChaDRG
$sel:randomDrg:SMPClientAgent :: TVar ChaChaDRG
randomDrg :: TVar ChaChaDRG
randomDrg,
        TMap SMPServer SMPClientVar
$sel:smpClients:SMPClientAgent :: TMap SMPServer SMPClientVar
smpClients :: TMap SMPServer SMPClientVar
smpClients,
        TMap ByteString (Bool, SMPClient)
$sel:smpSessions:SMPClientAgent :: TMap ByteString (Bool, SMPClient)
smpSessions :: TMap ByteString (Bool, SMPClient)
smpSessions,
        TMap SMPServer (TVar (Maybe ((ServiceId, Int64), ByteString)))
$sel:activeServiceSubs:SMPClientAgent :: TMap SMPServer (TVar (Maybe ((ServiceId, Int64), ByteString)))
activeServiceSubs :: TMap SMPServer (TVar (Maybe ((ServiceId, Int64), ByteString)))
activeServiceSubs,
        TMap SMPServer (TMap ServiceId (ByteString, APrivateAuthKey))
$sel:activeQueueSubs:SMPClientAgent :: TMap SMPServer (TMap ServiceId (ByteString, APrivateAuthKey))
activeQueueSubs :: TMap SMPServer (TMap ServiceId (ByteString, APrivateAuthKey))
activeQueueSubs,
        TMap SMPServer (TVar (Maybe (ServiceId, Int64)))
$sel:pendingServiceSubs:SMPClientAgent :: TMap SMPServer (TVar (Maybe (ServiceId, Int64)))
pendingServiceSubs :: TMap SMPServer (TVar (Maybe (ServiceId, Int64)))
pendingServiceSubs,
        TMap SMPServer (TMap ServiceId APrivateAuthKey)
$sel:pendingQueueSubs:SMPClientAgent :: TMap SMPServer (TMap ServiceId APrivateAuthKey)
pendingQueueSubs :: TMap SMPServer (TMap ServiceId APrivateAuthKey)
pendingQueueSubs,
        TMap SMPServer (SessionVar (Async ()))
$sel:smpSubWorkers:SMPClientAgent :: TMap SMPServer (SessionVar (Async ()))
smpSubWorkers :: TMap SMPServer (SessionVar (Async ()))
smpSubWorkers,
        TVar Int
$sel:workerSeq:SMPClientAgent :: TVar Int
workerSeq :: TVar Int
workerSeq
      }

-- | Get or create SMP client for SMPServer
getSMPServerClient' :: SMPClientAgent p -> SMPServer -> ExceptT SMPClientError IO SMPClient
getSMPServerClient' :: forall (p :: Party).
SMPClientAgent p
-> SMPServer -> ExceptT SMPClientError IO SMPClient
getSMPServerClient' SMPClientAgent p
ca SMPServer
srv = (Bool, SMPClient) -> SMPClient
forall a b. (a, b) -> b
snd ((Bool, SMPClient) -> SMPClient)
-> ExceptT SMPClientError IO (Bool, SMPClient)
-> ExceptT SMPClientError IO SMPClient
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> SMPClientAgent p
-> SMPServer -> ExceptT SMPClientError IO (Bool, SMPClient)
forall (p :: Party).
SMPClientAgent p
-> SMPServer -> ExceptT SMPClientError IO (Bool, SMPClient)
getSMPServerClient'' SMPClientAgent p
ca SMPServer
srv
{-# INLINE getSMPServerClient' #-}

getSMPServerClient'' :: SMPClientAgent p -> SMPServer -> ExceptT SMPClientError IO (OwnServer, SMPClient)
getSMPServerClient'' :: forall (p :: Party).
SMPClientAgent p
-> SMPServer -> ExceptT SMPClientError IO (Bool, SMPClient)
getSMPServerClient'' ca :: SMPClientAgent p
ca@SMPClientAgent {SMPClientAgentConfig
$sel:agentCfg:SMPClientAgent :: forall (p :: Party). SMPClientAgent p -> SMPClientAgentConfig
agentCfg :: SMPClientAgentConfig
agentCfg, TMap SMPServer SMPClientVar
$sel:smpClients:SMPClientAgent :: forall (p :: Party).
SMPClientAgent p -> TMap SMPServer SMPClientVar
smpClients :: TMap SMPServer SMPClientVar
smpClients, TMap ByteString (Bool, SMPClient)
$sel:smpSessions:SMPClientAgent :: forall (p :: Party).
SMPClientAgent p -> TMap ByteString (Bool, SMPClient)
smpSessions :: TMap ByteString (Bool, SMPClient)
smpSessions, TVar Int
$sel:workerSeq:SMPClientAgent :: forall (p :: Party). SMPClientAgent p -> TVar Int
workerSeq :: TVar Int
workerSeq} SMPServer
srv = do
  UTCTime
ts <- IO UTCTime -> ExceptT SMPClientError IO UTCTime
forall a. IO a -> ExceptT SMPClientError IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO UTCTime
getCurrentTime
  STM (Either SMPClientVar SMPClientVar)
-> ExceptT SMPClientError IO (Either SMPClientVar SMPClientVar)
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (UTCTime -> STM (Either SMPClientVar SMPClientVar)
getClientVar UTCTime
ts) ExceptT SMPClientError IO (Either SMPClientVar SMPClientVar)
-> (Either SMPClientVar SMPClientVar
    -> ExceptT SMPClientError IO (Bool, SMPClient))
-> ExceptT SMPClientError IO (Bool, SMPClient)
forall a b.
ExceptT SMPClientError IO a
-> (a -> ExceptT SMPClientError IO b)
-> ExceptT SMPClientError IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (SMPClientVar -> ExceptT SMPClientError IO (Bool, SMPClient))
-> (SMPClientVar -> ExceptT SMPClientError IO (Bool, SMPClient))
-> Either SMPClientVar SMPClientVar
-> ExceptT SMPClientError IO (Bool, SMPClient)
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either (IO (Either SMPClientError (Bool, SMPClient))
-> ExceptT SMPClientError IO (Bool, SMPClient)
forall e (m :: * -> *) a. m (Either e a) -> ExceptT e m a
ExceptT (IO (Either SMPClientError (Bool, SMPClient))
 -> ExceptT SMPClientError IO (Bool, SMPClient))
-> (SMPClientVar -> IO (Either SMPClientError (Bool, SMPClient)))
-> SMPClientVar
-> ExceptT SMPClientError IO (Bool, SMPClient)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SMPClientVar -> IO (Either SMPClientError (Bool, SMPClient))
newSMPClient) SMPClientVar -> ExceptT SMPClientError IO (Bool, SMPClient)
waitForSMPClient
  where
    getClientVar :: UTCTime -> STM (Either SMPClientVar SMPClientVar)
    getClientVar :: UTCTime -> STM (Either SMPClientVar SMPClientVar)
getClientVar = TVar Int
-> SMPServer
-> TMap SMPServer SMPClientVar
-> UTCTime
-> STM (Either SMPClientVar SMPClientVar)
forall k a.
Ord k =>
TVar Int
-> k
-> TMap k (SessionVar a)
-> UTCTime
-> STM (Either (SessionVar a) (SessionVar a))
getSessVar TVar Int
workerSeq SMPServer
srv TMap SMPServer SMPClientVar
smpClients

    waitForSMPClient :: SMPClientVar -> ExceptT SMPClientError IO (OwnServer, SMPClient)
    waitForSMPClient :: SMPClientVar -> ExceptT SMPClientError IO (Bool, SMPClient)
waitForSMPClient SMPClientVar
v = do
      let ProtocolClientConfig {$sel:networkConfig:ProtocolClientConfig :: forall v. ProtocolClientConfig v -> NetworkConfig
networkConfig = NetworkConfig {NetworkTimeout
tcpConnectTimeout :: NetworkTimeout
$sel:tcpConnectTimeout:NetworkConfig :: NetworkConfig -> NetworkTimeout
tcpConnectTimeout}} = SMPClientAgentConfig -> ProtocolClientConfig SMPVersion
smpCfg SMPClientAgentConfig
agentCfg
      Maybe (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient))
smpClient_ <- IO
  (Maybe (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient)))
-> ExceptT
     SMPClientError
     IO
     (Maybe (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient)))
forall a. IO a -> ExceptT SMPClientError IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO
   (Maybe (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient)))
 -> ExceptT
      SMPClientError
      IO
      (Maybe (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient))))
-> IO
     (Maybe (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient)))
-> ExceptT
     SMPClientError
     IO
     (Maybe (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient)))
forall a b. (a -> b) -> a -> b
$ NetworkTimeout -> NetworkRequestMode -> Int
netTimeoutInt NetworkTimeout
tcpConnectTimeout NetworkRequestMode
NRMBackground Int
-> IO (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient))
-> IO
     (Maybe (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient)))
forall a. Int -> IO a -> IO (Maybe a)
`timeout` STM (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient))
-> IO (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient))
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (TMVar (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient))
-> STM (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient))
forall a. TMVar a -> STM a
readTMVar (TMVar (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient))
 -> STM (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient)))
-> TMVar (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient))
-> STM (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient))
forall a b. (a -> b) -> a -> b
$ SMPClientVar
-> TMVar (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient))
forall a. SessionVar a -> TMVar a
sessionVar SMPClientVar
v)
      case Maybe (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient))
smpClient_ of
        Just (Right (Bool, SMPClient)
smpClient) -> (Bool, SMPClient) -> ExceptT SMPClientError IO (Bool, SMPClient)
forall a. a -> ExceptT SMPClientError IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Bool, SMPClient)
smpClient
        Just (Left (SMPClientError
e, Maybe UTCTime
ts_)) -> case Maybe UTCTime
ts_ of
          Maybe UTCTime
Nothing -> SMPClientError -> ExceptT SMPClientError IO (Bool, SMPClient)
forall (m :: * -> *) e a. Monad m => e -> ExceptT e m a
throwE SMPClientError
e
          Just UTCTime
ts ->
            ExceptT SMPClientError IO Bool
-> ExceptT SMPClientError IO (Bool, SMPClient)
-> ExceptT SMPClientError IO (Bool, SMPClient)
-> ExceptT SMPClientError IO (Bool, SMPClient)
forall (m :: * -> *) a. Monad m => m Bool -> m a -> m a -> m a
ifM
              ((UTCTime
ts UTCTime -> UTCTime -> Bool
forall a. Ord a => a -> a -> Bool
<) (UTCTime -> Bool)
-> ExceptT SMPClientError IO UTCTime
-> ExceptT SMPClientError IO Bool
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO UTCTime -> ExceptT SMPClientError IO UTCTime
forall a. IO a -> ExceptT SMPClientError IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO UTCTime
getCurrentTime)
              (STM () -> ExceptT SMPClientError IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (SMPClientVar -> SMPServer -> TMap SMPServer SMPClientVar -> STM ()
forall k a.
Ord k =>
SessionVar a -> k -> TMap k (SessionVar a) -> STM ()
removeSessVar SMPClientVar
v SMPServer
srv TMap SMPServer SMPClientVar
smpClients) ExceptT SMPClientError IO ()
-> ExceptT SMPClientError IO (Bool, SMPClient)
-> ExceptT SMPClientError IO (Bool, SMPClient)
forall a b.
ExceptT SMPClientError IO a
-> ExceptT SMPClientError IO b -> ExceptT SMPClientError IO b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> SMPClientAgent p
-> SMPServer -> ExceptT SMPClientError IO (Bool, SMPClient)
forall (p :: Party).
SMPClientAgent p
-> SMPServer -> ExceptT SMPClientError IO (Bool, SMPClient)
getSMPServerClient'' SMPClientAgent p
ca SMPServer
srv)
              (SMPClientError -> ExceptT SMPClientError IO (Bool, SMPClient)
forall (m :: * -> *) e a. Monad m => e -> ExceptT e m a
throwE SMPClientError
e)
        Maybe (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient))
Nothing -> SMPClientError -> ExceptT SMPClientError IO (Bool, SMPClient)
forall (m :: * -> *) e a. Monad m => e -> ExceptT e m a
throwE SMPClientError
forall err. ProtocolClientError err
PCEResponseTimeout

    newSMPClient :: SMPClientVar -> IO (Either SMPClientError (OwnServer, SMPClient))
    newSMPClient :: SMPClientVar -> IO (Either SMPClientError (Bool, SMPClient))
newSMPClient SMPClientVar
v = do
      Either SMPClientError SMPClient
r <- SMPClientAgent p
-> SMPServer
-> SMPClientVar
-> IO (Either SMPClientError SMPClient)
forall (p :: Party).
SMPClientAgent p
-> SMPServer
-> SMPClientVar
-> IO (Either SMPClientError SMPClient)
connectClient SMPClientAgent p
ca SMPServer
srv SMPClientVar
v IO (Either SMPClientError SMPClient)
-> (IOException -> IO (Either SMPClientError SMPClient))
-> IO (Either SMPClientError SMPClient)
forall (m :: * -> *) e a.
(MonadUnliftIO m, Exception e) =>
m a -> (e -> m a) -> m a
`E.catch` (Either SMPClientError SMPClient
-> IO (Either SMPClientError SMPClient)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either SMPClientError SMPClient
 -> IO (Either SMPClientError SMPClient))
-> (IOException -> Either SMPClientError SMPClient)
-> IOException
-> IO (Either SMPClientError SMPClient)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SMPClientError -> Either SMPClientError SMPClient
forall a b. a -> Either a b
Left (SMPClientError -> Either SMPClientError SMPClient)
-> (IOException -> SMPClientError)
-> IOException
-> Either SMPClientError SMPClient
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IOException -> SMPClientError
forall err. IOException -> ProtocolClientError err
PCEIOError)
      case Either SMPClientError SMPClient
r of
        Right SMPClient
smp -> do
          Text -> IO ()
forall (m :: * -> *).
(?callStack::CallStack, MonadIO m) =>
Text -> m ()
logInfo (Text -> IO ()) -> (ByteString -> Text) -> ByteString -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Text
decodeUtf8 (ByteString -> IO ()) -> ByteString -> IO ()
forall a b. (a -> b) -> a -> b
$ ByteString
"Agent connected to " ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> SMPServer -> ByteString
showServer SMPServer
srv
          let !owned :: Bool
owned = SMPClientAgent p -> SMPServer -> Bool
forall (p :: Party). SMPClientAgent p -> SMPServer -> Bool
isOwnServer SMPClientAgent p
ca SMPServer
srv
              !c :: (Bool, SMPClient)
c = (Bool
owned, SMPClient
smp)
          STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
            TMVar (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient))
-> Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient)
-> STM ()
forall a. TMVar a -> a -> STM ()
putTMVar (SMPClientVar
-> TMVar (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient))
forall a. SessionVar a -> TMVar a
sessionVar SMPClientVar
v) ((Bool, SMPClient)
-> Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient)
forall a b. b -> Either a b
Right (Bool, SMPClient)
c)
            ByteString
-> (Bool, SMPClient) -> TMap ByteString (Bool, SMPClient) -> STM ()
forall k a. Ord k => k -> a -> TMap k a -> STM ()
TM.insert (THandleParams SMPVersion 'TClient -> ByteString
forall v (p :: TransportPeer). THandleParams v p -> ByteString
sessionId (THandleParams SMPVersion 'TClient -> ByteString)
-> THandleParams SMPVersion 'TClient -> ByteString
forall a b. (a -> b) -> a -> b
$ SMPClient -> THandleParams SMPVersion 'TClient
forall v err msg.
ProtocolClient v err msg -> THandleParams v 'TClient
thParams SMPClient
smp) (Bool, SMPClient)
c TMap ByteString (Bool, SMPClient)
smpSessions
          let serviceId_ :: Maybe ServiceId
serviceId_ = (\THClientService {ServiceId
serviceId :: ServiceId
$sel:serviceId:THClientService :: forall k. THClientService' k -> ServiceId
serviceId} -> ServiceId
serviceId) (THClientService' PrivateKeyEd25519 -> ServiceId)
-> Maybe (THClientService' PrivateKeyEd25519) -> Maybe ServiceId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> SMPClient -> Maybe (THClientService' PrivateKeyEd25519)
smpClientService SMPClient
smp
          SMPClientAgent p -> SMPClientAgentEvent -> IO ()
forall (m :: * -> *) (p :: Party).
MonadIO m =>
SMPClientAgent p -> SMPClientAgentEvent -> m ()
notify SMPClientAgent p
ca (SMPClientAgentEvent -> IO ()) -> SMPClientAgentEvent -> IO ()
forall a b. (a -> b) -> a -> b
$ SMPServer -> Maybe ServiceId -> SMPClientAgentEvent
CAConnected SMPServer
srv Maybe ServiceId
serviceId_
          Either SMPClientError (Bool, SMPClient)
-> IO (Either SMPClientError (Bool, SMPClient))
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either SMPClientError (Bool, SMPClient)
 -> IO (Either SMPClientError (Bool, SMPClient)))
-> Either SMPClientError (Bool, SMPClient)
-> IO (Either SMPClientError (Bool, SMPClient))
forall a b. (a -> b) -> a -> b
$ (Bool, SMPClient) -> Either SMPClientError (Bool, SMPClient)
forall a b. b -> Either a b
Right (Bool, SMPClient)
c
        Left SMPClientError
e -> do
          let ei :: NominalDiffTime
ei = SMPClientAgentConfig -> NominalDiffTime
persistErrorInterval SMPClientAgentConfig
agentCfg
          if NominalDiffTime
ei NominalDiffTime -> NominalDiffTime -> Bool
forall a. Eq a => a -> a -> Bool
== NominalDiffTime
0
            then STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
              TMVar (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient))
-> Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient)
-> STM ()
forall a. TMVar a -> a -> STM ()
putTMVar (SMPClientVar
-> TMVar (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient))
forall a. SessionVar a -> TMVar a
sessionVar SMPClientVar
v) ((SMPClientError, Maybe UTCTime)
-> Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient)
forall a b. a -> Either a b
Left (SMPClientError
e, Maybe UTCTime
forall a. Maybe a
Nothing))
              SMPClientVar -> SMPServer -> TMap SMPServer SMPClientVar -> STM ()
forall k a.
Ord k =>
SessionVar a -> k -> TMap k (SessionVar a) -> STM ()
removeSessVar SMPClientVar
v SMPServer
srv TMap SMPServer SMPClientVar
smpClients
            else do
              UTCTime
ts <- NominalDiffTime -> UTCTime -> UTCTime
addUTCTime NominalDiffTime
ei (UTCTime -> UTCTime) -> IO UTCTime -> IO UTCTime
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO UTCTime -> IO UTCTime
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO UTCTime
getCurrentTime
              STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMVar (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient))
-> Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient)
-> STM ()
forall a. TMVar a -> a -> STM ()
putTMVar (SMPClientVar
-> TMVar (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient))
forall a. SessionVar a -> TMVar a
sessionVar SMPClientVar
v) ((SMPClientError, Maybe UTCTime)
-> Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient)
forall a b. a -> Either a b
Left (SMPClientError
e, UTCTime -> Maybe UTCTime
forall a. a -> Maybe a
Just UTCTime
ts))
          SMPClientAgent p -> SMPServer -> IO ()
forall (p :: Party). SMPClientAgent p -> SMPServer -> IO ()
reconnectClient SMPClientAgent p
ca SMPServer
srv
          Either SMPClientError (Bool, SMPClient)
-> IO (Either SMPClientError (Bool, SMPClient))
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either SMPClientError (Bool, SMPClient)
 -> IO (Either SMPClientError (Bool, SMPClient)))
-> Either SMPClientError (Bool, SMPClient)
-> IO (Either SMPClientError (Bool, SMPClient))
forall a b. (a -> b) -> a -> b
$ SMPClientError -> Either SMPClientError (Bool, SMPClient)
forall a b. a -> Either a b
Left SMPClientError
e

isOwnServer :: SMPClientAgent p -> SMPServer -> OwnServer
isOwnServer :: forall (p :: Party). SMPClientAgent p -> SMPServer -> Bool
isOwnServer SMPClientAgent {SMPClientAgentConfig
$sel:agentCfg:SMPClientAgent :: forall (p :: Party). SMPClientAgent p -> SMPClientAgentConfig
agentCfg :: SMPClientAgentConfig
agentCfg} ProtocolServer {NonEmpty TransportHost
host :: NonEmpty TransportHost
$sel:host:ProtocolServer :: forall (p :: ProtocolType).
ProtocolServer p -> NonEmpty TransportHost
host} =
  let srv :: ByteString
srv = TransportHost -> ByteString
forall a. StrEncoding a => a -> ByteString
strEncode (TransportHost -> ByteString) -> TransportHost -> ByteString
forall a b. (a -> b) -> a -> b
$ NonEmpty TransportHost -> TransportHost
forall a. NonEmpty a -> a
L.head NonEmpty TransportHost
host
   in (ByteString -> Bool) -> [ByteString] -> Bool
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Bool
any (\ByteString
s -> ByteString
s ByteString -> ByteString -> Bool
forall a. Eq a => a -> a -> Bool
== ByteString
srv Bool -> Bool -> Bool
|| Char -> ByteString -> ByteString
B.cons Char
'.' ByteString
s ByteString -> ByteString -> Bool
`B.isSuffixOf` ByteString
srv) (SMPClientAgentConfig -> [ByteString]
ownServerDomains SMPClientAgentConfig
agentCfg)

-- | Run an SMP client for SMPClientVar
connectClient :: SMPClientAgent p -> SMPServer -> SMPClientVar -> IO (Either SMPClientError SMPClient)
connectClient :: forall (p :: Party).
SMPClientAgent p
-> SMPServer
-> SMPClientVar
-> IO (Either SMPClientError SMPClient)
connectClient ca :: SMPClientAgent p
ca@SMPClientAgent {SMPClientAgentConfig
$sel:agentCfg:SMPClientAgent :: forall (p :: Party). SMPClientAgent p -> SMPClientAgentConfig
agentCfg :: SMPClientAgentConfig
agentCfg, TMap SMPServer SMPClientVar
$sel:smpClients:SMPClientAgent :: forall (p :: Party).
SMPClientAgent p -> TMap SMPServer SMPClientVar
smpClients :: TMap SMPServer SMPClientVar
smpClients, TMap ByteString (Bool, SMPClient)
$sel:smpSessions:SMPClientAgent :: forall (p :: Party).
SMPClientAgent p -> TMap ByteString (Bool, SMPClient)
smpSessions :: TMap ByteString (Bool, SMPClient)
smpSessions, TBQueue (ServerTransmissionBatch SMPVersion ErrorType BrokerMsg)
$sel:msgQ:SMPClientAgent :: forall (p :: Party).
SMPClientAgent p
-> TBQueue (ServerTransmissionBatch SMPVersion ErrorType BrokerMsg)
msgQ :: TBQueue (ServerTransmissionBatch SMPVersion ErrorType BrokerMsg)
msgQ, TVar ChaChaDRG
$sel:randomDrg:SMPClientAgent :: forall (p :: Party). SMPClientAgent p -> TVar ChaChaDRG
randomDrg :: TVar ChaChaDRG
randomDrg, UTCTime
$sel:startedAt:SMPClientAgent :: forall (p :: Party). SMPClientAgent p -> UTCTime
startedAt :: UTCTime
startedAt} SMPServer
srv SMPClientVar
v =
  TVar ChaChaDRG
-> NetworkRequestMode
-> TransportSession BrokerMsg
-> ProtocolClientConfig SMPVersion
-> [HostName]
-> Maybe
     (TBQueue (ServerTransmissionBatch SMPVersion ErrorType BrokerMsg))
-> UTCTime
-> (SMPClient -> IO ())
-> IO (Either SMPClientError SMPClient)
forall v err msg.
Protocol v err msg =>
TVar ChaChaDRG
-> NetworkRequestMode
-> TransportSession msg
-> ProtocolClientConfig v
-> [HostName]
-> Maybe (TBQueue (ServerTransmissionBatch v err msg))
-> UTCTime
-> (ProtocolClient v err msg -> IO ())
-> IO (Either (ProtocolClientError err) (ProtocolClient v err msg))
getProtocolClient TVar ChaChaDRG
randomDrg NetworkRequestMode
NRMBackground (Int64
1, ProtocolServer (ProtoType BrokerMsg)
SMPServer
srv, Maybe ByteString
forall a. Maybe a
Nothing) (SMPClientAgentConfig -> ProtocolClientConfig SMPVersion
smpCfg SMPClientAgentConfig
agentCfg) [] (TBQueue
  ((Int64, SMPServer, Maybe ByteString), Version SMPVersion,
   ByteString,
   NonEmpty (ServiceId, ServerTransmission ErrorType BrokerMsg))
-> Maybe
     (TBQueue
        ((Int64, SMPServer, Maybe ByteString), Version SMPVersion,
         ByteString,
         NonEmpty (ServiceId, ServerTransmission ErrorType BrokerMsg)))
forall a. a -> Maybe a
Just TBQueue (ServerTransmissionBatch SMPVersion ErrorType BrokerMsg)
TBQueue
  ((Int64, SMPServer, Maybe ByteString), Version SMPVersion,
   ByteString,
   NonEmpty (ServiceId, ServerTransmission ErrorType BrokerMsg))
msgQ) UTCTime
startedAt SMPClient -> IO ()
clientDisconnected
  where
    clientDisconnected :: SMPClient -> IO ()
    clientDisconnected :: SMPClient -> IO ()
clientDisconnected SMPClient
smp = do
      SMPClient
-> IO
     (Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
removeClientAndSubs SMPClient
smp IO
  (Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
-> ((Maybe (ServiceId, Int64),
     Maybe (Map ServiceId APrivateAuthKey))
    -> IO ())
-> IO ()
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
-> IO ()
serverDown
      Text -> IO ()
forall (m :: * -> *).
(?callStack::CallStack, MonadIO m) =>
Text -> m ()
logInfo (Text -> IO ()) -> (ByteString -> Text) -> ByteString -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Text
decodeUtf8 (ByteString -> IO ()) -> ByteString -> IO ()
forall a b. (a -> b) -> a -> b
$ ByteString
"Agent disconnected from " ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> SMPServer -> ByteString
showServer SMPServer
srv

    removeClientAndSubs :: SMPClient -> IO (Maybe (ServiceId, Int64), Maybe (Map QueueId C.APrivateAuthKey))
    removeClientAndSubs :: SMPClient
-> IO
     (Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
removeClientAndSubs SMPClient
smp = do
      -- Looking up subscription vars outside of STM transaction to reduce re-evaluation.
      -- It is possible because these vars are never removed, they are only added.
      Maybe (TVar (Maybe ((ServiceId, Int64), ByteString)))
sVar_ <- SMPServer
-> TMap SMPServer (TVar (Maybe ((ServiceId, Int64), ByteString)))
-> IO (Maybe (TVar (Maybe ((ServiceId, Int64), ByteString))))
forall k a. Ord k => k -> TMap k a -> IO (Maybe a)
TM.lookupIO SMPServer
srv (TMap SMPServer (TVar (Maybe ((ServiceId, Int64), ByteString)))
 -> IO (Maybe (TVar (Maybe ((ServiceId, Int64), ByteString)))))
-> TMap SMPServer (TVar (Maybe ((ServiceId, Int64), ByteString)))
-> IO (Maybe (TVar (Maybe ((ServiceId, Int64), ByteString))))
forall a b. (a -> b) -> a -> b
$ SMPClientAgent p
-> TMap SMPServer (TVar (Maybe ((ServiceId, Int64), ByteString)))
forall (p :: Party).
SMPClientAgent p
-> TMap SMPServer (TVar (Maybe ((ServiceId, Int64), ByteString)))
activeServiceSubs SMPClientAgent p
ca
      Maybe (TMap ServiceId (ByteString, APrivateAuthKey))
qVar_ <- SMPServer
-> TMap SMPServer (TMap ServiceId (ByteString, APrivateAuthKey))
-> IO (Maybe (TMap ServiceId (ByteString, APrivateAuthKey)))
forall k a. Ord k => k -> TMap k a -> IO (Maybe a)
TM.lookupIO SMPServer
srv (TMap SMPServer (TMap ServiceId (ByteString, APrivateAuthKey))
 -> IO (Maybe (TMap ServiceId (ByteString, APrivateAuthKey))))
-> TMap SMPServer (TMap ServiceId (ByteString, APrivateAuthKey))
-> IO (Maybe (TMap ServiceId (ByteString, APrivateAuthKey)))
forall a b. (a -> b) -> a -> b
$ SMPClientAgent p
-> TMap SMPServer (TMap ServiceId (ByteString, APrivateAuthKey))
forall (p :: Party).
SMPClientAgent p
-> TMap SMPServer (TMap ServiceId (ByteString, APrivateAuthKey))
activeQueueSubs SMPClientAgent p
ca
      STM
  (Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
-> IO
     (Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM
   (Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
 -> IO
      (Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey)))
-> STM
     (Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
-> IO
     (Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
forall a b. (a -> b) -> a -> b
$ do
        ByteString -> TMap ByteString (Bool, SMPClient) -> STM ()
forall k a. Ord k => k -> TMap k a -> STM ()
TM.delete ByteString
sessId TMap ByteString (Bool, SMPClient)
smpSessions
        SMPClientVar -> SMPServer -> TMap SMPServer SMPClientVar -> STM ()
forall k a.
Ord k =>
SessionVar a -> k -> TMap k (SessionVar a) -> STM ()
removeSessVar SMPClientVar
v SMPServer
srv TMap SMPServer SMPClientVar
smpClients
        Maybe (ServiceId, Int64)
sSub <- Maybe (TVar (Maybe ((ServiceId, Int64), ByteString)))
-> STM (Maybe (TVar (Maybe ((ServiceId, Int64), ByteString))))
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe (TVar (Maybe ((ServiceId, Int64), ByteString)))
sVar_ STM (Maybe (TVar (Maybe ((ServiceId, Int64), ByteString))))
-> (TVar (Maybe ((ServiceId, Int64), ByteString))
    -> STM (Maybe (ServiceId, Int64)))
-> STM (Maybe (ServiceId, Int64))
forall (m :: * -> *) (f :: * -> *) a b.
(Monad m, Monad f, Traversable f) =>
m (f a) -> (a -> m (f b)) -> m (f b)
$>>= TVar (Maybe ((ServiceId, Int64), ByteString))
-> STM (Maybe (ServiceId, Int64))
updateServiceSub
        Maybe (Map ServiceId APrivateAuthKey)
qSubs <- Maybe (TMap ServiceId (ByteString, APrivateAuthKey))
-> STM (Maybe (TMap ServiceId (ByteString, APrivateAuthKey)))
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe (TMap ServiceId (ByteString, APrivateAuthKey))
qVar_ STM (Maybe (TMap ServiceId (ByteString, APrivateAuthKey)))
-> (TMap ServiceId (ByteString, APrivateAuthKey)
    -> STM (Maybe (Map ServiceId APrivateAuthKey)))
-> STM (Maybe (Map ServiceId APrivateAuthKey))
forall (m :: * -> *) (f :: * -> *) a b.
(Monad m, Monad f, Traversable f) =>
m (f a) -> (a -> m (f b)) -> m (f b)
$>>= TMap ServiceId (ByteString, APrivateAuthKey)
-> STM (Maybe (Map ServiceId APrivateAuthKey))
updateQueueSubs
        (Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
-> STM
     (Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe (ServiceId, Int64)
sSub, Maybe (Map ServiceId APrivateAuthKey)
qSubs)
      where
        sessId :: ByteString
sessId = THandleParams SMPVersion 'TClient -> ByteString
forall v (p :: TransportPeer). THandleParams v p -> ByteString
sessionId (THandleParams SMPVersion 'TClient -> ByteString)
-> THandleParams SMPVersion 'TClient -> ByteString
forall a b. (a -> b) -> a -> b
$ SMPClient -> THandleParams SMPVersion 'TClient
forall v err msg.
ProtocolClient v err msg -> THandleParams v 'TClient
thParams SMPClient
smp
        updateServiceSub :: TVar (Maybe ((ServiceId, Int64), ByteString))
-> STM (Maybe (ServiceId, Int64))
updateServiceSub TVar (Maybe ((ServiceId, Int64), ByteString))
sVar = do -- (sub, sessId')
          -- We don't change active subscription in case session ID is different from disconnected client
          Maybe (ServiceId, Int64)
serviceSub_ <- TVar (Maybe ((ServiceId, Int64), ByteString))
-> (Maybe ((ServiceId, Int64), ByteString)
    -> (Maybe (ServiceId, Int64),
        Maybe ((ServiceId, Int64), ByteString)))
-> STM (Maybe (ServiceId, Int64))
forall s a. TVar s -> (s -> (a, s)) -> STM a
stateTVar TVar (Maybe ((ServiceId, Int64), ByteString))
sVar ((Maybe ((ServiceId, Int64), ByteString)
  -> (Maybe (ServiceId, Int64),
      Maybe ((ServiceId, Int64), ByteString)))
 -> STM (Maybe (ServiceId, Int64)))
-> (Maybe ((ServiceId, Int64), ByteString)
    -> (Maybe (ServiceId, Int64),
        Maybe ((ServiceId, Int64), ByteString)))
-> STM (Maybe (ServiceId, Int64))
forall a b. (a -> b) -> a -> b
$ \case
            Just ((ServiceId, Int64)
serviceSub, ByteString
sessId') | ByteString
sessId ByteString -> ByteString -> Bool
forall a. Eq a => a -> a -> Bool
== ByteString
sessId' -> ((ServiceId, Int64) -> Maybe (ServiceId, Int64)
forall a. a -> Maybe a
Just (ServiceId, Int64)
serviceSub, Maybe ((ServiceId, Int64), ByteString)
forall a. Maybe a
Nothing)
            Maybe ((ServiceId, Int64), ByteString)
s -> (Maybe (ServiceId, Int64)
forall a. Maybe a
Nothing, Maybe ((ServiceId, Int64), ByteString)
s)
          -- We don't reset pending subscription to Nothing here to avoid any race conditions
          -- with subsequent client sessions that might have set pending already.
          Bool -> STM () -> STM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Maybe (ServiceId, Int64) -> Bool
forall a. Maybe a -> Bool
isJust Maybe (ServiceId, Int64)
serviceSub_) (STM () -> STM ()) -> STM () -> STM ()
forall a b. (a -> b) -> a -> b
$ SMPClientAgent p -> SMPServer -> Maybe (ServiceId, Int64) -> STM ()
forall (p :: Party).
SMPClientAgent p -> SMPServer -> Maybe (ServiceId, Int64) -> STM ()
setPendingServiceSub SMPClientAgent p
ca SMPServer
srv Maybe (ServiceId, Int64)
serviceSub_
          Maybe (ServiceId, Int64) -> STM (Maybe (ServiceId, Int64))
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe (ServiceId, Int64)
serviceSub_
        updateQueueSubs :: TMap ServiceId (ByteString, APrivateAuthKey)
-> STM (Maybe (Map ServiceId APrivateAuthKey))
updateQueueSubs TMap ServiceId (ByteString, APrivateAuthKey)
qVar = do
          -- removing subscriptions that have matching sessionId to disconnected client
          -- and keep the other ones (they can be made by the new client)
          Map ServiceId APrivateAuthKey
subs <- ((ByteString, APrivateAuthKey) -> APrivateAuthKey)
-> Map ServiceId (ByteString, APrivateAuthKey)
-> Map ServiceId APrivateAuthKey
forall a b k. (a -> b) -> Map k a -> Map k b
M.map (ByteString, APrivateAuthKey) -> APrivateAuthKey
forall a b. (a, b) -> b
snd (Map ServiceId (ByteString, APrivateAuthKey)
 -> Map ServiceId APrivateAuthKey)
-> STM (Map ServiceId (ByteString, APrivateAuthKey))
-> STM (Map ServiceId APrivateAuthKey)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TMap ServiceId (ByteString, APrivateAuthKey)
-> (Map ServiceId (ByteString, APrivateAuthKey)
    -> (Map ServiceId (ByteString, APrivateAuthKey),
        Map ServiceId (ByteString, APrivateAuthKey)))
-> STM (Map ServiceId (ByteString, APrivateAuthKey))
forall s a. TVar s -> (s -> (a, s)) -> STM a
stateTVar TMap ServiceId (ByteString, APrivateAuthKey)
qVar (((ByteString, APrivateAuthKey) -> Bool)
-> Map ServiceId (ByteString, APrivateAuthKey)
-> (Map ServiceId (ByteString, APrivateAuthKey),
    Map ServiceId (ByteString, APrivateAuthKey))
forall a k. (a -> Bool) -> Map k a -> (Map k a, Map k a)
M.partition ((ByteString
sessId ByteString -> ByteString -> Bool
forall a. Eq a => a -> a -> Bool
==) (ByteString -> Bool)
-> ((ByteString, APrivateAuthKey) -> ByteString)
-> (ByteString, APrivateAuthKey)
-> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (ByteString, APrivateAuthKey) -> ByteString
forall a b. (a, b) -> a
fst))
          if Map ServiceId APrivateAuthKey -> Bool
forall k a. Map k a -> Bool
M.null Map ServiceId APrivateAuthKey
subs
            then Maybe (Map ServiceId APrivateAuthKey)
-> STM (Maybe (Map ServiceId APrivateAuthKey))
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe (Map ServiceId APrivateAuthKey)
forall a. Maybe a
Nothing
            else Map ServiceId APrivateAuthKey
-> Maybe (Map ServiceId APrivateAuthKey)
forall a. a -> Maybe a
Just Map ServiceId APrivateAuthKey
subs Maybe (Map ServiceId APrivateAuthKey)
-> STM () -> STM (Maybe (Map ServiceId APrivateAuthKey))
forall a b. a -> STM b -> STM a
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ TMap SMPServer (TMap ServiceId APrivateAuthKey)
-> SMPServer -> Map ServiceId APrivateAuthKey -> STM ()
forall s.
TMap SMPServer (TMap ServiceId s)
-> SMPServer -> Map ServiceId s -> STM ()
addSubs_ (SMPClientAgent p -> TMap SMPServer (TMap ServiceId APrivateAuthKey)
forall (p :: Party).
SMPClientAgent p -> TMap SMPServer (TMap ServiceId APrivateAuthKey)
pendingQueueSubs SMPClientAgent p
ca) SMPServer
srv Map ServiceId APrivateAuthKey
subs

    serverDown :: (Maybe (ServiceId, Int64), Maybe (Map QueueId C.APrivateAuthKey)) -> IO ()
    serverDown :: (Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
-> IO ()
serverDown (Maybe (ServiceId, Int64)
sSub, Maybe (Map ServiceId APrivateAuthKey)
qSubs) = do
      ((ServiceId, Int64) -> IO ()) -> Maybe (ServiceId, Int64) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (SMPClientAgent p -> SMPClientAgentEvent -> IO ()
forall (m :: * -> *) (p :: Party).
MonadIO m =>
SMPClientAgent p -> SMPClientAgentEvent -> m ()
notify SMPClientAgent p
ca (SMPClientAgentEvent -> IO ())
-> ((ServiceId, Int64) -> SMPClientAgentEvent)
-> (ServiceId, Int64)
-> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SMPServer -> (ServiceId, Int64) -> SMPClientAgentEvent
CAServiceDisconnected SMPServer
srv) Maybe (ServiceId, Int64)
sSub
      let qIds :: Maybe (NonEmpty ServiceId)
qIds = [ServiceId] -> Maybe (NonEmpty ServiceId)
forall a. [a] -> Maybe (NonEmpty a)
L.nonEmpty ([ServiceId] -> Maybe (NonEmpty ServiceId))
-> (Map ServiceId APrivateAuthKey -> [ServiceId])
-> Map ServiceId APrivateAuthKey
-> Maybe (NonEmpty ServiceId)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Map ServiceId APrivateAuthKey -> [ServiceId]
forall k a. Map k a -> [k]
M.keys (Map ServiceId APrivateAuthKey -> Maybe (NonEmpty ServiceId))
-> Maybe (Map ServiceId APrivateAuthKey)
-> Maybe (NonEmpty ServiceId)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Maybe (Map ServiceId APrivateAuthKey)
qSubs
      (NonEmpty ServiceId -> IO ())
-> Maybe (NonEmpty ServiceId) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (SMPClientAgent p -> SMPClientAgentEvent -> IO ()
forall (m :: * -> *) (p :: Party).
MonadIO m =>
SMPClientAgent p -> SMPClientAgentEvent -> m ()
notify SMPClientAgent p
ca (SMPClientAgentEvent -> IO ())
-> (NonEmpty ServiceId -> SMPClientAgentEvent)
-> NonEmpty ServiceId
-> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SMPServer -> NonEmpty ServiceId -> SMPClientAgentEvent
CADisconnected SMPServer
srv) Maybe (NonEmpty ServiceId)
qIds
      Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Maybe (ServiceId, Int64) -> Bool
forall a. Maybe a -> Bool
isJust Maybe (ServiceId, Int64)
sSub Bool -> Bool -> Bool
|| Maybe (NonEmpty ServiceId) -> Bool
forall a. Maybe a -> Bool
isJust Maybe (NonEmpty ServiceId)
qIds) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ SMPClientAgent p -> SMPServer -> IO ()
forall (p :: Party). SMPClientAgent p -> SMPServer -> IO ()
reconnectClient SMPClientAgent p
ca SMPServer
srv

-- | Spawn reconnect worker if needed
reconnectClient :: SMPClientAgent p -> SMPServer -> IO ()
reconnectClient :: forall (p :: Party). SMPClientAgent p -> SMPServer -> IO ()
reconnectClient ca :: SMPClientAgent p
ca@SMPClientAgent {TVar Bool
$sel:active:SMPClientAgent :: forall (p :: Party). SMPClientAgent p -> TVar Bool
active :: TVar Bool
active, SMPClientAgentConfig
$sel:agentCfg:SMPClientAgent :: forall (p :: Party). SMPClientAgent p -> SMPClientAgentConfig
agentCfg :: SMPClientAgentConfig
agentCfg, TMap SMPServer (SessionVar (Async ()))
$sel:smpSubWorkers:SMPClientAgent :: forall (p :: Party).
SMPClientAgent p -> TMap SMPServer (SessionVar (Async ()))
smpSubWorkers :: TMap SMPServer (SessionVar (Async ()))
smpSubWorkers, TVar Int
$sel:workerSeq:SMPClientAgent :: forall (p :: Party). SMPClientAgent p -> TVar Int
workerSeq :: TVar Int
workerSeq} SMPServer
srv = do
  UTCTime
ts <- IO UTCTime
getCurrentTime
  IO Bool -> IO () -> IO ()
forall (m :: * -> *). Monad m => m Bool -> m () -> m ()
whenM (TVar Bool -> IO Bool
forall (m :: * -> *) a. MonadIO m => TVar a -> m a
readTVarIO TVar Bool
active) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ STM
  (Maybe (Either (SessionVar (Async ())) (SessionVar (Async ()))))
-> IO
     (Maybe (Either (SessionVar (Async ())) (SessionVar (Async ()))))
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (UTCTime
-> STM
     (Maybe (Either (SessionVar (Async ())) (SessionVar (Async ()))))
getWorkerVar UTCTime
ts) IO (Maybe (Either (SessionVar (Async ())) (SessionVar (Async ()))))
-> (Maybe (Either (SessionVar (Async ())) (SessionVar (Async ())))
    -> IO ())
-> IO ()
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (Either (SessionVar (Async ())) (SessionVar (Async ())) -> IO ())
-> Maybe (Either (SessionVar (Async ())) (SessionVar (Async ())))
-> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ ((SessionVar (Async ()) -> IO ())
-> (SessionVar (Async ()) -> IO ())
-> Either (SessionVar (Async ())) (SessionVar (Async ()))
-> IO ()
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either SessionVar (Async ()) -> IO ()
newSubWorker (\SessionVar (Async ())
_ -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()))
  where
    getWorkerVar :: UTCTime
-> STM
     (Maybe (Either (SessionVar (Async ())) (SessionVar (Async ()))))
getWorkerVar UTCTime
ts =
      STM Bool
-> STM
     (Maybe (Either (SessionVar (Async ())) (SessionVar (Async ()))))
-> STM
     (Maybe (Either (SessionVar (Async ())) (SessionVar (Async ()))))
-> STM
     (Maybe (Either (SessionVar (Async ())) (SessionVar (Async ()))))
forall (m :: * -> *) a. Monad m => m Bool -> m a -> m a -> m a
ifM
        ((Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
-> Bool
forall {a} {k} {a}. (Maybe a, Maybe (Map k a)) -> Bool
noPending ((Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
 -> Bool)
-> STM
     (Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
-> STM Bool
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (forall a. SMPServer -> TMap SMPServer a -> STM (Maybe a))
-> (forall a. TVar a -> STM a)
-> STM
     (Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
forall (m :: * -> *).
Monad m =>
(forall a. SMPServer -> TMap SMPServer a -> m (Maybe a))
-> (forall a. TVar a -> m a)
-> m (Maybe (ServiceId, Int64),
      Maybe (Map ServiceId APrivateAuthKey))
getPending SMPServer -> TMap SMPServer a -> STM (Maybe a)
forall a. SMPServer -> TMap SMPServer a -> STM (Maybe a)
forall k a. Ord k => k -> TMap k a -> STM (Maybe a)
TM.lookup TVar a -> STM a
forall a. TVar a -> STM a
readTVar)
        (Maybe (Either (SessionVar (Async ())) (SessionVar (Async ())))
-> STM
     (Maybe (Either (SessionVar (Async ())) (SessionVar (Async ()))))
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe (Either (SessionVar (Async ())) (SessionVar (Async ())))
forall a. Maybe a
Nothing) -- prevent race with cleanup and adding pending queues in another call
        (Either (SessionVar (Async ())) (SessionVar (Async ()))
-> Maybe (Either (SessionVar (Async ())) (SessionVar (Async ())))
forall a. a -> Maybe a
Just (Either (SessionVar (Async ())) (SessionVar (Async ()))
 -> Maybe (Either (SessionVar (Async ())) (SessionVar (Async ()))))
-> STM (Either (SessionVar (Async ())) (SessionVar (Async ())))
-> STM
     (Maybe (Either (SessionVar (Async ())) (SessionVar (Async ()))))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TVar Int
-> SMPServer
-> TMap SMPServer (SessionVar (Async ()))
-> UTCTime
-> STM (Either (SessionVar (Async ())) (SessionVar (Async ())))
forall k a.
Ord k =>
TVar Int
-> k
-> TMap k (SessionVar a)
-> UTCTime
-> STM (Either (SessionVar a) (SessionVar a))
getSessVar TVar Int
workerSeq SMPServer
srv TMap SMPServer (SessionVar (Async ()))
smpSubWorkers UTCTime
ts)
    newSubWorker :: SessionVar (Async ()) -> IO ()
    newSubWorker :: SessionVar (Async ()) -> IO ()
newSubWorker SessionVar (Async ())
v = do
      Async ()
a <- IO () -> IO (Async ())
forall (m :: * -> *) a. MonadUnliftIO m => m a -> m (Async a)
async (IO () -> IO (Async ())) -> IO () -> IO (Async ())
forall a b. (a -> b) -> a -> b
$ IO (Either SomeException ()) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (Either SomeException ()) -> IO ())
-> IO (Either SomeException ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ IO () -> IO (Either SomeException ())
forall (m :: * -> *) a.
MonadUnliftIO m =>
m a -> m (Either SomeException a)
E.tryAny (IO () -> IO (Either SomeException ()))
-> IO () -> IO (Either SomeException ())
forall a b. (a -> b) -> a -> b
$ SessionVar (Async ()) -> IO ()
runSubWorker SessionVar (Async ())
v
      STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMVar (Async ()) -> Async () -> STM ()
forall a. TMVar a -> a -> STM ()
putTMVar (SessionVar (Async ()) -> TMVar (Async ())
forall a. SessionVar a -> TMVar a
sessionVar SessionVar (Async ())
v) Async ()
a
    runSubWorker :: SessionVar (Async ()) -> IO ()
runSubWorker SessionVar (Async ())
v =
      RetryInterval -> (Int64 -> IO () -> IO ()) -> IO ()
forall (m :: * -> *) a.
MonadIO m =>
RetryInterval -> (Int64 -> m a -> m a) -> m a
withRetryInterval (SMPClientAgentConfig -> RetryInterval
reconnectInterval SMPClientAgentConfig
agentCfg) ((Int64 -> IO () -> IO ()) -> IO ())
-> (Int64 -> IO () -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Int64
_ IO ()
loop -> do
        (Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
subs <- STM
  (Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
-> IO
     (Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM
   (Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
 -> IO
      (Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey)))
-> STM
     (Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
-> IO
     (Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
forall a b. (a -> b) -> a -> b
$ do
          (Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
s <- (forall a. SMPServer -> TMap SMPServer a -> STM (Maybe a))
-> (forall a. TVar a -> STM a)
-> STM
     (Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
forall (m :: * -> *).
Monad m =>
(forall a. SMPServer -> TMap SMPServer a -> m (Maybe a))
-> (forall a. TVar a -> m a)
-> m (Maybe (ServiceId, Int64),
      Maybe (Map ServiceId APrivateAuthKey))
getPending SMPServer -> TMap SMPServer a -> STM (Maybe a)
forall a. SMPServer -> TMap SMPServer a -> STM (Maybe a)
forall k a. Ord k => k -> TMap k a -> STM (Maybe a)
TM.lookup TVar a -> STM a
forall a. TVar a -> STM a
readTVar
          Bool -> STM () -> STM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when ((Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
-> Bool
forall {a} {k} {a}. (Maybe a, Maybe (Map k a)) -> Bool
noPending (Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
s) (STM () -> STM ()) -> STM () -> STM ()
forall a b. (a -> b) -> a -> b
$ SessionVar (Async ()) -> STM ()
cleanup SessionVar (Async ())
v
          (Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
-> STM
     (Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
s
        Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless ((Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
-> Bool
forall {a} {k} {a}. (Maybe a, Maybe (Map k a)) -> Bool
noPending (Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
subs) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ IO Bool -> IO () -> IO ()
forall (m :: * -> *). Monad m => m Bool -> m () -> m ()
whenM (TVar Bool -> IO Bool
forall (m :: * -> *) a. MonadIO m => TVar a -> m a
readTVarIO TVar Bool
active) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
          IO (Maybe (Either SMPClientError ())) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (Maybe (Either SMPClientError ())) -> IO ())
-> IO (Maybe (Either SMPClientError ())) -> IO ()
forall a b. (a -> b) -> a -> b
$ NetworkTimeout -> NetworkRequestMode -> Int
netTimeoutInt NetworkTimeout
tcpConnectTimeout NetworkRequestMode
NRMBackground Int
-> IO (Either SMPClientError ())
-> IO (Maybe (Either SMPClientError ()))
forall a. Int -> IO a -> IO (Maybe a)
`timeout` ExceptT SMPClientError IO () -> IO (Either SMPClientError ())
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (SMPClientAgent p
-> SMPServer
-> (Maybe (ServiceId, Int64),
    Maybe (Map ServiceId APrivateAuthKey))
-> ExceptT SMPClientError IO ()
forall (p :: Party).
SMPClientAgent p
-> SMPServer
-> (Maybe (ServiceId, Int64),
    Maybe (Map ServiceId APrivateAuthKey))
-> ExceptT SMPClientError IO ()
reconnectSMPClient SMPClientAgent p
ca SMPServer
srv (Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
subs)
          IO ()
loop
    ProtocolClientConfig {$sel:networkConfig:ProtocolClientConfig :: forall v. ProtocolClientConfig v -> NetworkConfig
networkConfig = NetworkConfig {NetworkTimeout
$sel:tcpConnectTimeout:NetworkConfig :: NetworkConfig -> NetworkTimeout
tcpConnectTimeout :: NetworkTimeout
tcpConnectTimeout}} = SMPClientAgentConfig -> ProtocolClientConfig SMPVersion
smpCfg SMPClientAgentConfig
agentCfg
    noPending :: (Maybe a, Maybe (Map k a)) -> Bool
noPending (Maybe a
sSub, Maybe (Map k a)
qSubs) = Maybe a -> Bool
forall a. Maybe a -> Bool
isNothing Maybe a
sSub Bool -> Bool -> Bool
&& Bool -> (Map k a -> Bool) -> Maybe (Map k a) -> Bool
forall b a. b -> (a -> b) -> Maybe a -> b
maybe Bool
True Map k a -> Bool
forall k a. Map k a -> Bool
M.null Maybe (Map k a)
qSubs
    getPending :: Monad m => (forall a. SMPServer -> TMap SMPServer a -> m (Maybe a)) -> (forall a. TVar a -> m a) -> m (Maybe (ServiceId, Int64), Maybe (Map QueueId C.APrivateAuthKey))
    getPending :: forall (m :: * -> *).
Monad m =>
(forall a. SMPServer -> TMap SMPServer a -> m (Maybe a))
-> (forall a. TVar a -> m a)
-> m (Maybe (ServiceId, Int64),
      Maybe (Map ServiceId APrivateAuthKey))
getPending forall a. SMPServer -> TMap SMPServer a -> m (Maybe a)
lkup forall a. TVar a -> m a
rd = do
      Maybe (ServiceId, Int64)
sSub <- SMPServer
-> TMap SMPServer (TVar (Maybe (ServiceId, Int64)))
-> m (Maybe (TVar (Maybe (ServiceId, Int64))))
forall a. SMPServer -> TMap SMPServer a -> m (Maybe a)
lkup SMPServer
srv (SMPClientAgent p
-> TMap SMPServer (TVar (Maybe (ServiceId, Int64)))
forall (p :: Party).
SMPClientAgent p
-> TMap SMPServer (TVar (Maybe (ServiceId, Int64)))
pendingServiceSubs SMPClientAgent p
ca) m (Maybe (TVar (Maybe (ServiceId, Int64))))
-> (TVar (Maybe (ServiceId, Int64))
    -> m (Maybe (ServiceId, Int64)))
-> m (Maybe (ServiceId, Int64))
forall (m :: * -> *) (f :: * -> *) a b.
(Monad m, Monad f, Traversable f) =>
m (f a) -> (a -> m (f b)) -> m (f b)
$>>= TVar (Maybe (ServiceId, Int64)) -> m (Maybe (ServiceId, Int64))
forall a. TVar a -> m a
rd
      Maybe (Map ServiceId APrivateAuthKey)
qSubs <- SMPServer
-> TMap SMPServer (TMap ServiceId APrivateAuthKey)
-> m (Maybe (TMap ServiceId APrivateAuthKey))
forall a. SMPServer -> TMap SMPServer a -> m (Maybe a)
lkup SMPServer
srv (SMPClientAgent p -> TMap SMPServer (TMap ServiceId APrivateAuthKey)
forall (p :: Party).
SMPClientAgent p -> TMap SMPServer (TMap ServiceId APrivateAuthKey)
pendingQueueSubs SMPClientAgent p
ca) m (Maybe (TMap ServiceId APrivateAuthKey))
-> (Maybe (TMap ServiceId APrivateAuthKey)
    -> m (Maybe (Map ServiceId APrivateAuthKey)))
-> m (Maybe (Map ServiceId APrivateAuthKey))
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (TMap ServiceId APrivateAuthKey
 -> m (Map ServiceId APrivateAuthKey))
-> Maybe (TMap ServiceId APrivateAuthKey)
-> m (Maybe (Map ServiceId APrivateAuthKey))
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> Maybe a -> m (Maybe b)
mapM TMap ServiceId APrivateAuthKey -> m (Map ServiceId APrivateAuthKey)
forall a. TVar a -> m a
rd
      (Maybe (ServiceId, Int64), Maybe (Map ServiceId APrivateAuthKey))
-> m (Maybe (ServiceId, Int64),
      Maybe (Map ServiceId APrivateAuthKey))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe (ServiceId, Int64)
sSub, Maybe (Map ServiceId APrivateAuthKey)
qSubs)
    cleanup :: SessionVar (Async ()) -> STM ()
    cleanup :: SessionVar (Async ()) -> STM ()
cleanup SessionVar (Async ())
v = do
      -- Here we wait until TMVar is not empty to prevent worker cleanup happening before worker is added to TMVar.
      -- Not waiting may result in terminated worker remaining in the map.
      STM Bool -> STM () -> STM ()
forall (m :: * -> *). Monad m => m Bool -> m () -> m ()
whenM (TMVar (Async ()) -> STM Bool
forall a. TMVar a -> STM Bool
isEmptyTMVar (TMVar (Async ()) -> STM Bool) -> TMVar (Async ()) -> STM Bool
forall a b. (a -> b) -> a -> b
$ SessionVar (Async ()) -> TMVar (Async ())
forall a. SessionVar a -> TMVar a
sessionVar SessionVar (Async ())
v) STM ()
forall a. STM a
retry
      SessionVar (Async ())
-> SMPServer -> TMap SMPServer (SessionVar (Async ())) -> STM ()
forall k a.
Ord k =>
SessionVar a -> k -> TMap k (SessionVar a) -> STM ()
removeSessVar SessionVar (Async ())
v SMPServer
srv TMap SMPServer (SessionVar (Async ()))
smpSubWorkers

reconnectSMPClient :: forall p. SMPClientAgent p -> SMPServer -> (Maybe (ServiceId, Int64), Maybe (Map QueueId C.APrivateAuthKey)) -> ExceptT SMPClientError IO ()
reconnectSMPClient :: forall (p :: Party).
SMPClientAgent p
-> SMPServer
-> (Maybe (ServiceId, Int64),
    Maybe (Map ServiceId APrivateAuthKey))
-> ExceptT SMPClientError IO ()
reconnectSMPClient ca :: SMPClientAgent p
ca@SMPClientAgent {SMPClientAgentConfig
$sel:agentCfg:SMPClientAgent :: forall (p :: Party). SMPClientAgent p -> SMPClientAgentConfig
agentCfg :: SMPClientAgentConfig
agentCfg, SParty p
$sel:agentParty:SMPClientAgent :: forall (p :: Party). SMPClientAgent p -> SParty p
agentParty :: SParty p
agentParty} SMPServer
srv (Maybe (ServiceId, Int64)
sSub_, Maybe (Map ServiceId APrivateAuthKey)
qSubs_) =
  SMPClientAgent p
-> SMPServer
-> (SMPClient -> ExceptT SMPClientError IO ())
-> ExceptT SMPClientError IO ()
forall (p :: Party) a.
SMPClientAgent p
-> SMPServer
-> (SMPClient -> ExceptT SMPClientError IO a)
-> ExceptT SMPClientError IO a
withSMP SMPClientAgent p
ca SMPServer
srv ((SMPClient -> ExceptT SMPClientError IO ())
 -> ExceptT SMPClientError IO ())
-> (SMPClient -> ExceptT SMPClientError IO ())
-> ExceptT SMPClientError IO ()
forall a b. (a -> b) -> a -> b
$ \SMPClient
smp -> IO () -> ExceptT SMPClientError IO ()
forall a. IO a -> ExceptT SMPClientError IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ExceptT SMPClientError IO ())
-> IO () -> ExceptT SMPClientError IO ()
forall a b. (a -> b) -> a -> b
$ case SParty p -> Maybe (Dict (PartyI p, ServiceParty p))
forall (p :: Party).
SParty p -> Maybe (Dict (PartyI p, ServiceParty p))
serviceParty SParty p
agentParty of
    Just Dict (PartyI p, ServiceParty p)
Dict -> (PartyI p, ServiceParty p) => SMPClient -> IO ()
SMPClient -> IO ()
resubscribe SMPClient
smp
    Maybe (Dict (PartyI p, ServiceParty p))
Nothing -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
  where
    resubscribe :: (PartyI p, ServiceParty p) => SMPClient -> IO ()
    resubscribe :: (PartyI p, ServiceParty p) => SMPClient -> IO ()
resubscribe SMPClient
smp = do
      ((ServiceId, Int64) -> IO ()) -> Maybe (ServiceId, Int64) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (SMPClientAgent p
-> SMPClient -> SMPServer -> (ServiceId, Int64) -> IO ()
forall (p :: Party).
(PartyI p, ServiceParty p) =>
SMPClientAgent p
-> SMPClient -> SMPServer -> (ServiceId, Int64) -> IO ()
smpSubscribeService SMPClientAgent p
ca SMPClient
smp SMPServer
srv) Maybe (ServiceId, Int64)
sSub_
      Maybe (Map ServiceId APrivateAuthKey)
-> (Map ServiceId APrivateAuthKey -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ Maybe (Map ServiceId APrivateAuthKey)
qSubs_ ((Map ServiceId APrivateAuthKey -> IO ()) -> IO ())
-> (Map ServiceId APrivateAuthKey -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Map ServiceId APrivateAuthKey
qSubs -> do
        Maybe (Map ServiceId (ByteString, APrivateAuthKey))
currSubs_ <- (TMap ServiceId (ByteString, APrivateAuthKey)
 -> IO (Map ServiceId (ByteString, APrivateAuthKey)))
-> Maybe (TMap ServiceId (ByteString, APrivateAuthKey))
-> IO (Maybe (Map ServiceId (ByteString, APrivateAuthKey)))
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> Maybe a -> m (Maybe b)
mapM TMap ServiceId (ByteString, APrivateAuthKey)
-> IO (Map ServiceId (ByteString, APrivateAuthKey))
forall (m :: * -> *) a. MonadIO m => TVar a -> m a
readTVarIO (Maybe (TMap ServiceId (ByteString, APrivateAuthKey))
 -> IO (Maybe (Map ServiceId (ByteString, APrivateAuthKey))))
-> IO (Maybe (TMap ServiceId (ByteString, APrivateAuthKey)))
-> IO (Maybe (Map ServiceId (ByteString, APrivateAuthKey)))
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< SMPServer
-> TMap SMPServer (TMap ServiceId (ByteString, APrivateAuthKey))
-> IO (Maybe (TMap ServiceId (ByteString, APrivateAuthKey)))
forall k a. Ord k => k -> TMap k a -> IO (Maybe a)
TM.lookupIO SMPServer
srv (SMPClientAgent p
-> TMap SMPServer (TMap ServiceId (ByteString, APrivateAuthKey))
forall (p :: Party).
SMPClientAgent p
-> TMap SMPServer (TMap ServiceId (ByteString, APrivateAuthKey))
activeQueueSubs SMPClientAgent p
ca)
        let [(ServiceId, APrivateAuthKey)]
qSubs' :: [(QueueId, C.APrivateAuthKey)] =
              ([(ServiceId, APrivateAuthKey)] -> [(ServiceId, APrivateAuthKey)])
-> (Map ServiceId (ByteString, APrivateAuthKey)
    -> [(ServiceId, APrivateAuthKey)]
    -> [(ServiceId, APrivateAuthKey)])
-> Maybe (Map ServiceId (ByteString, APrivateAuthKey))
-> [(ServiceId, APrivateAuthKey)]
-> [(ServiceId, APrivateAuthKey)]
forall b a. b -> (a -> b) -> Maybe a -> b
maybe [(ServiceId, APrivateAuthKey)] -> [(ServiceId, APrivateAuthKey)]
forall a. a -> a
id (\Map ServiceId (ByteString, APrivateAuthKey)
currSubs -> ((ServiceId, APrivateAuthKey) -> Bool)
-> [(ServiceId, APrivateAuthKey)] -> [(ServiceId, APrivateAuthKey)]
forall a. (a -> Bool) -> [a] -> [a]
filter ((ServiceId -> Map ServiceId (ByteString, APrivateAuthKey) -> Bool
forall k a. Ord k => k -> Map k a -> Bool
`M.notMember` Map ServiceId (ByteString, APrivateAuthKey)
currSubs) (ServiceId -> Bool)
-> ((ServiceId, APrivateAuthKey) -> ServiceId)
-> (ServiceId, APrivateAuthKey)
-> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (ServiceId, APrivateAuthKey) -> ServiceId
forall a b. (a, b) -> a
fst)) Maybe (Map ServiceId (ByteString, APrivateAuthKey))
currSubs_ ([(ServiceId, APrivateAuthKey)] -> [(ServiceId, APrivateAuthKey)])
-> [(ServiceId, APrivateAuthKey)] -> [(ServiceId, APrivateAuthKey)]
forall a b. (a -> b) -> a -> b
$ Map ServiceId APrivateAuthKey -> [(ServiceId, APrivateAuthKey)]
forall k a. Map k a -> [(k, a)]
M.assocs Map ServiceId APrivateAuthKey
qSubs
        (NonEmpty (ServiceId, APrivateAuthKey) -> IO ())
-> [NonEmpty (ServiceId, APrivateAuthKey)] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (forall (p :: Party).
ServiceParty p =>
SMPClientAgent p
-> SMPClient
-> SMPServer
-> NonEmpty (ServiceId, APrivateAuthKey)
-> IO ()
smpSubscribeQueues @p SMPClientAgent p
ca SMPClient
smp SMPServer
srv) ([NonEmpty (ServiceId, APrivateAuthKey)] -> IO ())
-> [NonEmpty (ServiceId, APrivateAuthKey)] -> IO ()
forall a b. (a -> b) -> a -> b
$ Int
-> [(ServiceId, APrivateAuthKey)]
-> [NonEmpty (ServiceId, APrivateAuthKey)]
forall a. Int -> [a] -> [NonEmpty a]
toChunks (SMPClientAgentConfig -> Int
agentSubsBatchSize SMPClientAgentConfig
agentCfg) [(ServiceId, APrivateAuthKey)]
qSubs'

notify :: MonadIO m => SMPClientAgent p -> SMPClientAgentEvent -> m ()
notify :: forall (m :: * -> *) (p :: Party).
MonadIO m =>
SMPClientAgent p -> SMPClientAgentEvent -> m ()
notify SMPClientAgent p
ca SMPClientAgentEvent
evt = STM () -> m ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> m ()) -> STM () -> m ()
forall a b. (a -> b) -> a -> b
$ TBQueue SMPClientAgentEvent -> SMPClientAgentEvent -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue (SMPClientAgent p -> TBQueue SMPClientAgentEvent
forall (p :: Party).
SMPClientAgent p -> TBQueue SMPClientAgentEvent
agentQ SMPClientAgent p
ca) SMPClientAgentEvent
evt
{-# INLINE notify #-}

-- Returns already connected client for proxying messages or Nothing if client is absent, not connected yet or stores expired error.
-- If Nothing is return proxy will spawn a new thread to wait or to create another client connection to destination relay.
getConnectedSMPServerClient :: SMPClientAgent p -> SMPServer -> IO (Maybe (Either SMPClientError (OwnServer, SMPClient)))
getConnectedSMPServerClient :: forall (p :: Party).
SMPClientAgent p
-> SMPServer
-> IO (Maybe (Either SMPClientError (Bool, SMPClient)))
getConnectedSMPServerClient SMPClientAgent {TMap SMPServer SMPClientVar
$sel:smpClients:SMPClientAgent :: forall (p :: Party).
SMPClientAgent p -> TMap SMPServer SMPClientVar
smpClients :: TMap SMPServer SMPClientVar
smpClients} SMPServer
srv =
  STM
  (Maybe
     (SMPClientVar,
      Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient)))
-> IO
     (Maybe
        (SMPClientVar,
         Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient)))
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (SMPServer
-> TMap SMPServer SMPClientVar -> STM (Maybe SMPClientVar)
forall k a. Ord k => k -> TMap k a -> STM (Maybe a)
TM.lookup SMPServer
srv TMap SMPServer SMPClientVar
smpClients STM (Maybe SMPClientVar)
-> (SMPClientVar
    -> STM
         (Maybe
            (SMPClientVar,
             Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient))))
-> STM
     (Maybe
        (SMPClientVar,
         Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient)))
forall (m :: * -> *) (f :: * -> *) a b.
(Monad m, Monad f, Traversable f) =>
m (f a) -> (a -> m (f b)) -> m (f b)
$>>= \SMPClientVar
v -> (SMPClientVar
v,) (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient)
 -> (SMPClientVar,
     Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient)))
-> STM
     (Maybe (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient)))
-> STM
     (Maybe
        (SMPClientVar,
         Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient)))
forall (f :: * -> *) (g :: * -> *) a b.
(Functor f, Functor g) =>
(a -> b) -> f (g a) -> f (g b)
<$$> TMVar (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient))
-> STM
     (Maybe (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient)))
forall a. TMVar a -> STM (Maybe a)
tryReadTMVar (SMPClientVar
-> TMVar (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient))
forall a. SessionVar a -> TMVar a
sessionVar SMPClientVar
v)) -- Nothing: client is absent or not connected yet
    IO
  (Maybe
     (SMPClientVar,
      Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient)))
-> ((SMPClientVar,
     Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient))
    -> IO (Maybe (Either SMPClientError (Bool, SMPClient))))
-> IO (Maybe (Either SMPClientError (Bool, SMPClient)))
forall (m :: * -> *) (f :: * -> *) a b.
(Monad m, Monad f, Traversable f) =>
m (f a) -> (a -> m (f b)) -> m (f b)
$>>= \case
      (SMPClientVar
_, Right (Bool, SMPClient)
r) -> Maybe (Either SMPClientError (Bool, SMPClient))
-> IO (Maybe (Either SMPClientError (Bool, SMPClient)))
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe (Either SMPClientError (Bool, SMPClient))
 -> IO (Maybe (Either SMPClientError (Bool, SMPClient))))
-> Maybe (Either SMPClientError (Bool, SMPClient))
-> IO (Maybe (Either SMPClientError (Bool, SMPClient)))
forall a b. (a -> b) -> a -> b
$ Either SMPClientError (Bool, SMPClient)
-> Maybe (Either SMPClientError (Bool, SMPClient))
forall a. a -> Maybe a
Just (Either SMPClientError (Bool, SMPClient)
 -> Maybe (Either SMPClientError (Bool, SMPClient)))
-> Either SMPClientError (Bool, SMPClient)
-> Maybe (Either SMPClientError (Bool, SMPClient))
forall a b. (a -> b) -> a -> b
$ (Bool, SMPClient) -> Either SMPClientError (Bool, SMPClient)
forall a b. b -> Either a b
Right (Bool, SMPClient)
r
      (SMPClientVar
v, Left (SMPClientError
e, Maybe UTCTime
ts_)) ->
        Maybe UTCTime -> IO (Maybe UTCTime)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe UTCTime
ts_ IO (Maybe UTCTime)
-> (UTCTime
    -> IO (Maybe (Either SMPClientError (Bool, SMPClient))))
-> IO (Maybe (Either SMPClientError (Bool, SMPClient)))
forall (m :: * -> *) (f :: * -> *) a b.
(Monad m, Monad f, Traversable f) =>
m (f a) -> (a -> m (f b)) -> m (f b)
$>>= \UTCTime
ts ->
          -- proxy will create a new connection if ts_ is Nothing
          IO Bool
-> IO (Maybe (Either SMPClientError (Bool, SMPClient)))
-> IO (Maybe (Either SMPClientError (Bool, SMPClient)))
-> IO (Maybe (Either SMPClientError (Bool, SMPClient)))
forall (m :: * -> *) a. Monad m => m Bool -> m a -> m a -> m a
ifM
            ((UTCTime
ts UTCTime -> UTCTime -> Bool
forall a. Ord a => a -> a -> Bool
<) (UTCTime -> Bool) -> IO UTCTime -> IO Bool
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO UTCTime -> IO UTCTime
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO UTCTime
getCurrentTime) -- error persistence interval period expired?
            (Maybe (Either SMPClientError (Bool, SMPClient))
forall a. Maybe a
Nothing Maybe (Either SMPClientError (Bool, SMPClient))
-> IO () -> IO (Maybe (Either SMPClientError (Bool, SMPClient)))
forall a b. a -> IO b -> IO a
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (SMPClientVar -> SMPServer -> TMap SMPServer SMPClientVar -> STM ()
forall k a.
Ord k =>
SessionVar a -> k -> TMap k (SessionVar a) -> STM ()
removeSessVar SMPClientVar
v SMPServer
srv TMap SMPServer SMPClientVar
smpClients)) -- proxy will create a new connection
            (Maybe (Either SMPClientError (Bool, SMPClient))
-> IO (Maybe (Either SMPClientError (Bool, SMPClient)))
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe (Either SMPClientError (Bool, SMPClient))
 -> IO (Maybe (Either SMPClientError (Bool, SMPClient))))
-> Maybe (Either SMPClientError (Bool, SMPClient))
-> IO (Maybe (Either SMPClientError (Bool, SMPClient)))
forall a b. (a -> b) -> a -> b
$ Either SMPClientError (Bool, SMPClient)
-> Maybe (Either SMPClientError (Bool, SMPClient))
forall a. a -> Maybe a
Just (Either SMPClientError (Bool, SMPClient)
 -> Maybe (Either SMPClientError (Bool, SMPClient)))
-> Either SMPClientError (Bool, SMPClient)
-> Maybe (Either SMPClientError (Bool, SMPClient))
forall a b. (a -> b) -> a -> b
$ SMPClientError -> Either SMPClientError (Bool, SMPClient)
forall a b. a -> Either a b
Left SMPClientError
e) -- not expired, returning error

lookupSMPServerClient :: SMPClientAgent p -> SessionId -> IO (Maybe (OwnServer, SMPClient))
lookupSMPServerClient :: forall (p :: Party).
SMPClientAgent p -> ByteString -> IO (Maybe (Bool, SMPClient))
lookupSMPServerClient SMPClientAgent {TMap ByteString (Bool, SMPClient)
$sel:smpSessions:SMPClientAgent :: forall (p :: Party).
SMPClientAgent p -> TMap ByteString (Bool, SMPClient)
smpSessions :: TMap ByteString (Bool, SMPClient)
smpSessions} ByteString
sessId = ByteString
-> TMap ByteString (Bool, SMPClient)
-> IO (Maybe (Bool, SMPClient))
forall k a. Ord k => k -> TMap k a -> IO (Maybe a)
TM.lookupIO ByteString
sessId TMap ByteString (Bool, SMPClient)
smpSessions

closeSMPClientAgent :: SMPClientAgent p -> IO ()
closeSMPClientAgent :: forall (p :: Party). SMPClientAgent p -> IO ()
closeSMPClientAgent SMPClientAgent p
c = do
  STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar Bool -> Bool -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar (SMPClientAgent p -> TVar Bool
forall (p :: Party). SMPClientAgent p -> TVar Bool
active SMPClientAgent p
c) Bool
False
  SMPClientAgent p -> IO ()
forall (p :: Party). SMPClientAgent p -> IO ()
closeSMPServerClients SMPClientAgent p
c
  STM (Map SMPServer (SessionVar (Async ())))
-> IO (Map SMPServer (SessionVar (Async ())))
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (TMap SMPServer (SessionVar (Async ()))
-> Map SMPServer (SessionVar (Async ()))
-> STM (Map SMPServer (SessionVar (Async ())))
forall a. TVar a -> a -> STM a
swapTVar (SMPClientAgent p -> TMap SMPServer (SessionVar (Async ()))
forall (p :: Party).
SMPClientAgent p -> TMap SMPServer (SessionVar (Async ()))
smpSubWorkers SMPClientAgent p
c) Map SMPServer (SessionVar (Async ()))
forall k a. Map k a
M.empty) IO (Map SMPServer (SessionVar (Async ())))
-> (Map SMPServer (SessionVar (Async ())) -> IO ()) -> IO ()
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (SessionVar (Async ()) -> IO ())
-> Map SMPServer (SessionVar (Async ())) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ SessionVar (Async ()) -> IO ()
cancelReconnect
  where
    cancelReconnect :: SessionVar (Async ()) -> IO ()
    cancelReconnect :: SessionVar (Async ()) -> IO ()
cancelReconnect SessionVar (Async ())
v = IO ThreadId -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO ThreadId -> IO ()) -> (IO () -> IO ThreadId) -> IO () -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO () -> IO ThreadId
forkIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ STM (Async ()) -> IO (Async ())
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (TMVar (Async ()) -> STM (Async ())
forall a. TMVar a -> STM a
readTMVar (TMVar (Async ()) -> STM (Async ()))
-> TMVar (Async ()) -> STM (Async ())
forall a b. (a -> b) -> a -> b
$ SessionVar (Async ()) -> TMVar (Async ())
forall a. SessionVar a -> TMVar a
sessionVar SessionVar (Async ())
v) IO (Async ()) -> (Async () -> IO ()) -> IO ()
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Async () -> IO ()
forall a. Async a -> IO ()
uninterruptibleCancel

closeSMPServerClients :: SMPClientAgent p -> IO ()
closeSMPServerClients :: forall (p :: Party). SMPClientAgent p -> IO ()
closeSMPServerClients SMPClientAgent p
c = STM (Map SMPServer SMPClientVar) -> IO (Map SMPServer SMPClientVar)
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (SMPClientAgent p -> TMap SMPServer SMPClientVar
forall (p :: Party).
SMPClientAgent p -> TMap SMPServer SMPClientVar
smpClients SMPClientAgent p
c TMap SMPServer SMPClientVar
-> Map SMPServer SMPClientVar -> STM (Map SMPServer SMPClientVar)
forall a. TVar a -> a -> STM a
`swapTVar` Map SMPServer SMPClientVar
forall k a. Map k a
M.empty) IO (Map SMPServer SMPClientVar)
-> (Map SMPServer SMPClientVar -> IO ()) -> IO ()
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (SMPClientVar -> IO ThreadId)
-> Map SMPServer SMPClientVar -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (IO () -> IO ThreadId
forkIO (IO () -> IO ThreadId)
-> (SMPClientVar -> IO ()) -> SMPClientVar -> IO ThreadId
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SMPClientVar -> IO ()
forall {a} {a} {v} {err} {msg}.
SessionVar (Either a (a, ProtocolClient v err msg)) -> IO ()
closeClient)
  where
    closeClient :: SessionVar (Either a (a, ProtocolClient v err msg)) -> IO ()
closeClient SessionVar (Either a (a, ProtocolClient v err msg))
v =
      STM (Either a (a, ProtocolClient v err msg))
-> IO (Either a (a, ProtocolClient v err msg))
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (TMVar (Either a (a, ProtocolClient v err msg))
-> STM (Either a (a, ProtocolClient v err msg))
forall a. TMVar a -> STM a
readTMVar (TMVar (Either a (a, ProtocolClient v err msg))
 -> STM (Either a (a, ProtocolClient v err msg)))
-> TMVar (Either a (a, ProtocolClient v err msg))
-> STM (Either a (a, ProtocolClient v err msg))
forall a b. (a -> b) -> a -> b
$ SessionVar (Either a (a, ProtocolClient v err msg))
-> TMVar (Either a (a, ProtocolClient v err msg))
forall a. SessionVar a -> TMVar a
sessionVar SessionVar (Either a (a, ProtocolClient v err msg))
v) IO (Either a (a, ProtocolClient v err msg))
-> (Either a (a, ProtocolClient v err msg) -> IO ()) -> IO ()
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
        Right (a
_, ProtocolClient v err msg
smp) -> ProtocolClient v err msg -> IO ()
forall v err msg. ProtocolClient v err msg -> IO ()
closeProtocolClient ProtocolClient v err msg
smp IO () -> IO () -> IO ()
forall a. IO a -> IO a -> IO a
`catchAll_` () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
        Either a (a, ProtocolClient v err msg)
_ -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()

cancelActions :: Foldable f => TVar (f (Async ())) -> IO ()
cancelActions :: forall (f :: * -> *). Foldable f => TVar (f (Async ())) -> IO ()
cancelActions TVar (f (Async ()))
as = TVar (f (Async ())) -> IO (f (Async ()))
forall (m :: * -> *) a. MonadIO m => TVar a -> m a
readTVarIO TVar (f (Async ()))
as IO (f (Async ())) -> (f (Async ()) -> IO ()) -> IO ()
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (Async () -> IO ()) -> f (Async ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ Async () -> IO ()
forall a. Async a -> IO ()
uninterruptibleCancel

withSMP :: SMPClientAgent p -> SMPServer -> (SMPClient -> ExceptT SMPClientError IO a) -> ExceptT SMPClientError IO a
withSMP :: forall (p :: Party) a.
SMPClientAgent p
-> SMPServer
-> (SMPClient -> ExceptT SMPClientError IO a)
-> ExceptT SMPClientError IO a
withSMP SMPClientAgent p
ca SMPServer
srv SMPClient -> ExceptT SMPClientError IO a
action = (SMPClientAgent p
-> SMPServer -> ExceptT SMPClientError IO SMPClient
forall (p :: Party).
SMPClientAgent p
-> SMPServer -> ExceptT SMPClientError IO SMPClient
getSMPServerClient' SMPClientAgent p
ca SMPServer
srv ExceptT SMPClientError IO SMPClient
-> (SMPClient -> ExceptT SMPClientError IO a)
-> ExceptT SMPClientError IO a
forall a b.
ExceptT SMPClientError IO a
-> (a -> ExceptT SMPClientError IO b)
-> ExceptT SMPClientError IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= SMPClient -> ExceptT SMPClientError IO a
action) ExceptT SMPClientError IO a
-> (SMPClientError -> ExceptT SMPClientError IO a)
-> ExceptT SMPClientError IO a
forall (m :: * -> *) e a e'.
Monad m =>
ExceptT e m a -> (e -> ExceptT e' m a) -> ExceptT e' m a
`catchE` SMPClientError -> ExceptT SMPClientError IO a
forall a. SMPClientError -> ExceptT SMPClientError IO a
logSMPError
  where
    logSMPError :: SMPClientError -> ExceptT SMPClientError IO a
    logSMPError :: forall a. SMPClientError -> ExceptT SMPClientError IO a
logSMPError SMPClientError
e = do
      Text -> ExceptT SMPClientError IO ()
forall (m :: * -> *).
(?callStack::CallStack, MonadIO m) =>
Text -> m ()
logInfo (Text -> ExceptT SMPClientError IO ())
-> Text -> ExceptT SMPClientError IO ()
forall a b. (a -> b) -> a -> b
$ Text
"SMP error (" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> ByteString -> Text
safeDecodeUtf8 (SMPServer -> ByteString
forall a. StrEncoding a => a -> ByteString
strEncode SMPServer
srv) Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"): " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> SMPClientError -> Text
forall a. Show a => a -> Text
tshow SMPClientError
e
      SMPClientError -> ExceptT SMPClientError IO a
forall (m :: * -> *) e a. Monad m => e -> ExceptT e m a
throwE SMPClientError
e

subscribeQueuesNtfs :: SMPClientAgent 'NotifierService -> SMPServer -> NonEmpty (NotifierId, NtfPrivateAuthKey) -> IO ()
subscribeQueuesNtfs :: SMPClientAgent 'NotifierService
-> SMPServer -> NonEmpty (ServiceId, APrivateAuthKey) -> IO ()
subscribeQueuesNtfs = SMPClientAgent 'NotifierService
-> SMPServer -> NonEmpty (ServiceId, APrivateAuthKey) -> IO ()
forall (p :: Party).
ServiceParty p =>
SMPClientAgent p
-> SMPServer -> NonEmpty (ServiceId, APrivateAuthKey) -> IO ()
subscribeQueues_
{-# INLINE subscribeQueuesNtfs #-}

subscribeQueues_ :: ServiceParty p => SMPClientAgent p -> SMPServer -> NonEmpty (QueueId, C.APrivateAuthKey) -> IO ()
subscribeQueues_ :: forall (p :: Party).
ServiceParty p =>
SMPClientAgent p
-> SMPServer -> NonEmpty (ServiceId, APrivateAuthKey) -> IO ()
subscribeQueues_ SMPClientAgent p
ca SMPServer
srv NonEmpty (ServiceId, APrivateAuthKey)
subs = do
  STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ SMPClientAgent p
-> SMPServer -> [(ServiceId, APrivateAuthKey)] -> STM ()
forall (p :: Party).
SMPClientAgent p
-> SMPServer -> [(ServiceId, APrivateAuthKey)] -> STM ()
addPendingSubs SMPClientAgent p
ca SMPServer
srv ([(ServiceId, APrivateAuthKey)] -> STM ())
-> [(ServiceId, APrivateAuthKey)] -> STM ()
forall a b. (a -> b) -> a -> b
$ NonEmpty (ServiceId, APrivateAuthKey)
-> [(ServiceId, APrivateAuthKey)]
forall a. NonEmpty a -> [a]
L.toList NonEmpty (ServiceId, APrivateAuthKey)
subs
  ExceptT SMPClientError IO SMPClient
-> IO (Either SMPClientError SMPClient)
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (SMPClientAgent p
-> SMPServer -> ExceptT SMPClientError IO SMPClient
forall (p :: Party).
SMPClientAgent p
-> SMPServer -> ExceptT SMPClientError IO SMPClient
getSMPServerClient' SMPClientAgent p
ca SMPServer
srv) IO (Either SMPClientError SMPClient)
-> (Either SMPClientError SMPClient -> IO ()) -> IO ()
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
    Right SMPClient
smp -> SMPClientAgent p
-> SMPClient
-> SMPServer
-> NonEmpty (ServiceId, APrivateAuthKey)
-> IO ()
forall (p :: Party).
ServiceParty p =>
SMPClientAgent p
-> SMPClient
-> SMPServer
-> NonEmpty (ServiceId, APrivateAuthKey)
-> IO ()
smpSubscribeQueues SMPClientAgent p
ca SMPClient
smp SMPServer
srv NonEmpty (ServiceId, APrivateAuthKey)
subs
    Left SMPClientError
_ -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure () -- no call to reconnectClient - failing getSMPServerClient' does that

smpSubscribeQueues :: ServiceParty p => SMPClientAgent p -> SMPClient -> SMPServer -> NonEmpty (QueueId, C.APrivateAuthKey) -> IO ()
smpSubscribeQueues :: forall (p :: Party).
ServiceParty p =>
SMPClientAgent p
-> SMPClient
-> SMPServer
-> NonEmpty (ServiceId, APrivateAuthKey)
-> IO ()
smpSubscribeQueues SMPClientAgent p
ca SMPClient
smp SMPServer
srv NonEmpty (ServiceId, APrivateAuthKey)
subs = do
  NonEmpty (Either SMPClientError (Maybe ServiceId))
rs <- case SMPClientAgent p -> SParty p
forall (p :: Party). SMPClientAgent p -> SParty p
agentParty SMPClientAgent p
ca of
    SParty p
SRecipientService -> SMPClient
-> NonEmpty (ServiceId, APrivateAuthKey)
-> IO (NonEmpty (Either SMPClientError (Maybe ServiceId)))
subscribeSMPQueues SMPClient
smp NonEmpty (ServiceId, APrivateAuthKey)
subs
    SParty p
SNotifierService -> SMPClient
-> NonEmpty (ServiceId, APrivateAuthKey)
-> IO (NonEmpty (Either SMPClientError (Maybe ServiceId)))
subscribeSMPQueuesNtfs SMPClient
smp NonEmpty (ServiceId, APrivateAuthKey)
subs
  Maybe
  (Bool, [(ServiceId, SMPClientError)],
   ([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
   [ServiceId])
rs' <-
    STM
  (Maybe
     (Bool, [(ServiceId, SMPClientError)],
      ([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
      [ServiceId]))
-> IO
     (Maybe
        (Bool, [(ServiceId, SMPClientError)],
         ([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
         [ServiceId]))
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM
   (Maybe
      (Bool, [(ServiceId, SMPClientError)],
       ([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
       [ServiceId]))
 -> IO
      (Maybe
         (Bool, [(ServiceId, SMPClientError)],
          ([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
          [ServiceId])))
-> STM
     (Maybe
        (Bool, [(ServiceId, SMPClientError)],
         ([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
         [ServiceId]))
-> IO
     (Maybe
        (Bool, [(ServiceId, SMPClientError)],
         ([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
         [ServiceId]))
forall a b. (a -> b) -> a -> b
$
      STM Bool
-> STM
     (Maybe
        (Bool, [(ServiceId, SMPClientError)],
         ([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
         [ServiceId]))
-> STM
     (Maybe
        (Bool, [(ServiceId, SMPClientError)],
         ([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
         [ServiceId]))
-> STM
     (Maybe
        (Bool, [(ServiceId, SMPClientError)],
         ([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
         [ServiceId]))
forall (m :: * -> *) a. Monad m => m Bool -> m a -> m a -> m a
ifM
        (SMPClientAgent p -> SMPClient -> SMPServer -> STM Bool
forall (p :: Party).
SMPClientAgent p -> SMPClient -> SMPServer -> STM Bool
activeClientSession SMPClientAgent p
ca SMPClient
smp SMPServer
srv)
        ((Bool, [(ServiceId, SMPClientError)],
 ([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
 [ServiceId])
-> Maybe
     (Bool, [(ServiceId, SMPClientError)],
      ([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
      [ServiceId])
forall a. a -> Maybe a
Just ((Bool, [(ServiceId, SMPClientError)],
  ([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
  [ServiceId])
 -> Maybe
      (Bool, [(ServiceId, SMPClientError)],
       ([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
       [ServiceId]))
-> STM
     (Bool, [(ServiceId, SMPClientError)],
      ([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
      [ServiceId])
-> STM
     (Maybe
        (Bool, [(ServiceId, SMPClientError)],
         ([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
         [ServiceId]))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> NonEmpty (Either SMPClientError (Maybe ServiceId))
-> STM
     (Bool, [(ServiceId, SMPClientError)],
      ([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
      [ServiceId])
processSubscriptions NonEmpty (Either SMPClientError (Maybe ServiceId))
rs)
        (Maybe
  (Bool, [(ServiceId, SMPClientError)],
   ([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
   [ServiceId])
-> STM
     (Maybe
        (Bool, [(ServiceId, SMPClientError)],
         ([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
         [ServiceId]))
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe
  (Bool, [(ServiceId, SMPClientError)],
   ([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
   [ServiceId])
forall a. Maybe a
Nothing)
  case Maybe
  (Bool, [(ServiceId, SMPClientError)],
   ([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
   [ServiceId])
rs' of
    Just (Bool
tempErrs, [(ServiceId, SMPClientError)]
finalErrs, ([(ServiceId, (ByteString, APrivateAuthKey))]
qOks, [ServiceId]
sQs), [ServiceId]
_) -> do
      (SMPServer -> NonEmpty ServiceId -> SMPClientAgentEvent)
-> [ServiceId] -> IO ()
forall a.
(SMPServer -> NonEmpty a -> SMPClientAgentEvent) -> [a] -> IO ()
notify_ (SMPServer
-> Maybe ServiceId -> NonEmpty ServiceId -> SMPClientAgentEvent
`CASubscribed` Maybe ServiceId
forall a. Maybe a
Nothing) ([ServiceId] -> IO ()) -> [ServiceId] -> IO ()
forall a b. (a -> b) -> a -> b
$ ((ServiceId, (ByteString, APrivateAuthKey)) -> ServiceId)
-> [(ServiceId, (ByteString, APrivateAuthKey))] -> [ServiceId]
forall a b. (a -> b) -> [a] -> [b]
map (ServiceId, (ByteString, APrivateAuthKey)) -> ServiceId
forall a b. (a, b) -> a
fst [(ServiceId, (ByteString, APrivateAuthKey))]
qOks
      Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Maybe ServiceId -> Bool
forall a. Maybe a -> Bool
isJust Maybe ServiceId
smpServiceId) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ (SMPServer -> NonEmpty ServiceId -> SMPClientAgentEvent)
-> [ServiceId] -> IO ()
forall a.
(SMPServer -> NonEmpty a -> SMPClientAgentEvent) -> [a] -> IO ()
notify_ (SMPServer
-> Maybe ServiceId -> NonEmpty ServiceId -> SMPClientAgentEvent
`CASubscribed` Maybe ServiceId
smpServiceId) [ServiceId]
sQs
      (SMPServer
 -> NonEmpty (ServiceId, SMPClientError) -> SMPClientAgentEvent)
-> [(ServiceId, SMPClientError)] -> IO ()
forall a.
(SMPServer -> NonEmpty a -> SMPClientAgentEvent) -> [a] -> IO ()
notify_ SMPServer
-> NonEmpty (ServiceId, SMPClientError) -> SMPClientAgentEvent
CASubError [(ServiceId, SMPClientError)]
finalErrs
      Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
tempErrs (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ SMPClientAgent p -> SMPServer -> IO ()
forall (p :: Party). SMPClientAgent p -> SMPServer -> IO ()
reconnectClient SMPClientAgent p
ca SMPServer
srv
    Maybe
  (Bool, [(ServiceId, SMPClientError)],
   ([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
   [ServiceId])
Nothing -> SMPClientAgent p -> SMPServer -> IO ()
forall (p :: Party). SMPClientAgent p -> SMPServer -> IO ()
reconnectClient SMPClientAgent p
ca SMPServer
srv
  where
    processSubscriptions :: NonEmpty (Either SMPClientError (Maybe ServiceId)) -> STM (Bool, [(QueueId, SMPClientError)], ([(QueueId, (SessionId, C.APrivateAuthKey))], [QueueId]), [QueueId])
    processSubscriptions :: NonEmpty (Either SMPClientError (Maybe ServiceId))
-> STM
     (Bool, [(ServiceId, SMPClientError)],
      ([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
      [ServiceId])
processSubscriptions NonEmpty (Either SMPClientError (Maybe ServiceId))
rs = do
      Map ServiceId APrivateAuthKey
pending <- STM (Map ServiceId APrivateAuthKey)
-> (TMap ServiceId APrivateAuthKey
    -> STM (Map ServiceId APrivateAuthKey))
-> Maybe (TMap ServiceId APrivateAuthKey)
-> STM (Map ServiceId APrivateAuthKey)
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (Map ServiceId APrivateAuthKey
-> STM (Map ServiceId APrivateAuthKey)
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Map ServiceId APrivateAuthKey
forall k a. Map k a
M.empty) TMap ServiceId APrivateAuthKey
-> STM (Map ServiceId APrivateAuthKey)
forall a. TVar a -> STM a
readTVar (Maybe (TMap ServiceId APrivateAuthKey)
 -> STM (Map ServiceId APrivateAuthKey))
-> STM (Maybe (TMap ServiceId APrivateAuthKey))
-> STM (Map ServiceId APrivateAuthKey)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< SMPServer
-> TMap SMPServer (TMap ServiceId APrivateAuthKey)
-> STM (Maybe (TMap ServiceId APrivateAuthKey))
forall k a. Ord k => k -> TMap k a -> STM (Maybe a)
TM.lookup SMPServer
srv (SMPClientAgent p -> TMap SMPServer (TMap ServiceId APrivateAuthKey)
forall (p :: Party).
SMPClientAgent p -> TMap SMPServer (TMap ServiceId APrivateAuthKey)
pendingQueueSubs SMPClientAgent p
ca)
      let acc :: (Bool, [(ServiceId, SMPClientError)],
 ([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
 [ServiceId])
acc@(Bool
_, [(ServiceId, SMPClientError)]
_, ([(ServiceId, (ByteString, APrivateAuthKey))]
qOks, [ServiceId]
sQs), [ServiceId]
notPending) = (((ServiceId, APrivateAuthKey),
  Either SMPClientError (Maybe ServiceId))
 -> (Bool, [(ServiceId, SMPClientError)],
     ([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
     [ServiceId])
 -> (Bool, [(ServiceId, SMPClientError)],
     ([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
     [ServiceId]))
-> (Bool, [(ServiceId, SMPClientError)],
    ([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
    [ServiceId])
-> NonEmpty
     ((ServiceId, APrivateAuthKey),
      Either SMPClientError (Maybe ServiceId))
-> (Bool, [(ServiceId, SMPClientError)],
    ([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
    [ServiceId])
forall a b. (a -> b -> b) -> b -> NonEmpty a -> b
forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr (Map ServiceId APrivateAuthKey
-> ((ServiceId, APrivateAuthKey),
    Either SMPClientError (Maybe ServiceId))
-> (Bool, [(ServiceId, SMPClientError)],
    ([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
    [ServiceId])
-> (Bool, [(ServiceId, SMPClientError)],
    ([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
    [ServiceId])
groupSub Map ServiceId APrivateAuthKey
pending) (Bool
False, [], ([], []), []) (NonEmpty (ServiceId, APrivateAuthKey)
-> NonEmpty (Either SMPClientError (Maybe ServiceId))
-> NonEmpty
     ((ServiceId, APrivateAuthKey),
      Either SMPClientError (Maybe ServiceId))
forall a b. NonEmpty a -> NonEmpty b -> NonEmpty (a, b)
L.zip NonEmpty (ServiceId, APrivateAuthKey)
subs NonEmpty (Either SMPClientError (Maybe ServiceId))
rs)
      Bool -> STM () -> STM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless ([(ServiceId, (ByteString, APrivateAuthKey))] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [(ServiceId, (ByteString, APrivateAuthKey))]
qOks) (STM () -> STM ()) -> STM () -> STM ()
forall a b. (a -> b) -> a -> b
$ SMPClientAgent p
-> SMPServer
-> [(ServiceId, (ByteString, APrivateAuthKey))]
-> STM ()
forall (p :: Party).
SMPClientAgent p
-> SMPServer
-> [(ServiceId, (ByteString, APrivateAuthKey))]
-> STM ()
addActiveSubs SMPClientAgent p
ca SMPServer
srv [(ServiceId, (ByteString, APrivateAuthKey))]
qOks
      Bool -> STM () -> STM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless ([ServiceId] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [ServiceId]
sQs) (STM () -> STM ()) -> STM () -> STM ()
forall a b. (a -> b) -> a -> b
$ Maybe ServiceId -> (ServiceId -> STM ()) -> STM ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ Maybe ServiceId
smpServiceId ((ServiceId -> STM ()) -> STM ())
-> (ServiceId -> STM ()) -> STM ()
forall a b. (a -> b) -> a -> b
$ \ServiceId
serviceId ->
        SMPClientAgent p
-> SMPServer -> ((ServiceId, Int64), ByteString) -> STM ()
forall (p :: Party).
SMPClientAgent p
-> SMPServer -> ((ServiceId, Int64), ByteString) -> STM ()
updateActiveServiceSub SMPClientAgent p
ca SMPServer
srv ((ServiceId
serviceId, Int -> Int64
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int -> Int64) -> Int -> Int64
forall a b. (a -> b) -> a -> b
$ [ServiceId] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [ServiceId]
sQs), ByteString
sessId)
      Bool -> STM () -> STM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless ([ServiceId] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [ServiceId]
notPending) (STM () -> STM ()) -> STM () -> STM ()
forall a b. (a -> b) -> a -> b
$ SMPClientAgent p -> SMPServer -> [ServiceId] -> STM ()
forall (p :: Party).
SMPClientAgent p -> SMPServer -> [ServiceId] -> STM ()
removePendingSubs SMPClientAgent p
ca SMPServer
srv [ServiceId]
notPending
      (Bool, [(ServiceId, SMPClientError)],
 ([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
 [ServiceId])
-> STM
     (Bool, [(ServiceId, SMPClientError)],
      ([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
      [ServiceId])
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Bool, [(ServiceId, SMPClientError)],
 ([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
 [ServiceId])
acc
    sessId :: ByteString
sessId = THandleParams SMPVersion 'TClient -> ByteString
forall v (p :: TransportPeer). THandleParams v p -> ByteString
sessionId (THandleParams SMPVersion 'TClient -> ByteString)
-> THandleParams SMPVersion 'TClient -> ByteString
forall a b. (a -> b) -> a -> b
$ SMPClient -> THandleParams SMPVersion 'TClient
forall v err msg.
ProtocolClient v err msg -> THandleParams v 'TClient
thParams SMPClient
smp
    smpServiceId :: Maybe ServiceId
smpServiceId = (\THClientService {ServiceId
$sel:serviceId:THClientService :: forall k. THClientService' k -> ServiceId
serviceId :: ServiceId
serviceId} -> ServiceId
serviceId) (THClientService' PrivateKeyEd25519 -> ServiceId)
-> Maybe (THClientService' PrivateKeyEd25519) -> Maybe ServiceId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> SMPClient -> Maybe (THClientService' PrivateKeyEd25519)
smpClientService SMPClient
smp
    groupSub ::
      Map QueueId C.APrivateAuthKey ->
      ((QueueId, C.APrivateAuthKey), Either SMPClientError (Maybe ServiceId)) ->
      (Bool, [(QueueId, SMPClientError)], ([(QueueId, (SessionId, C.APrivateAuthKey))], [QueueId]), [QueueId]) ->
      (Bool, [(QueueId, SMPClientError)], ([(QueueId, (SessionId, C.APrivateAuthKey))], [QueueId]), [QueueId])
    groupSub :: Map ServiceId APrivateAuthKey
-> ((ServiceId, APrivateAuthKey),
    Either SMPClientError (Maybe ServiceId))
-> (Bool, [(ServiceId, SMPClientError)],
    ([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
    [ServiceId])
-> (Bool, [(ServiceId, SMPClientError)],
    ([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
    [ServiceId])
groupSub Map ServiceId APrivateAuthKey
pending ((ServiceId
qId, APrivateAuthKey
pk), Either SMPClientError (Maybe ServiceId)
r) acc :: (Bool, [(ServiceId, SMPClientError)],
 ([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
 [ServiceId])
acc@(!Bool
tempErrs, [(ServiceId, SMPClientError)]
finalErrs, oks :: ([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId])
oks@([(ServiceId, (ByteString, APrivateAuthKey))]
qOks, [ServiceId]
sQs), [ServiceId]
notPending) = case Either SMPClientError (Maybe ServiceId)
r of
      Right Maybe ServiceId
serviceId_
        | ServiceId -> Map ServiceId APrivateAuthKey -> Bool
forall k a. Ord k => k -> Map k a -> Bool
M.member ServiceId
qId Map ServiceId APrivateAuthKey
pending ->
            let oks' :: ([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId])
oks' = case (Maybe ServiceId
smpServiceId, Maybe ServiceId
serviceId_) of
                  (Just ServiceId
sId, Just ServiceId
sId') | ServiceId
sId ServiceId -> ServiceId -> Bool
forall a. Eq a => a -> a -> Bool
== ServiceId
sId' -> ([(ServiceId, (ByteString, APrivateAuthKey))]
qOks, ServiceId
qId ServiceId -> [ServiceId] -> [ServiceId]
forall a. a -> [a] -> [a]
: [ServiceId]
sQs)
                  (Maybe ServiceId, Maybe ServiceId)
_ -> ((ServiceId
qId, (ByteString
sessId, APrivateAuthKey
pk)) (ServiceId, (ByteString, APrivateAuthKey))
-> [(ServiceId, (ByteString, APrivateAuthKey))]
-> [(ServiceId, (ByteString, APrivateAuthKey))]
forall a. a -> [a] -> [a]
: [(ServiceId, (ByteString, APrivateAuthKey))]
qOks, [ServiceId]
sQs)
             in (Bool
tempErrs, [(ServiceId, SMPClientError)]
finalErrs, ([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId])
oks', ServiceId
qId ServiceId -> [ServiceId] -> [ServiceId]
forall a. a -> [a] -> [a]
: [ServiceId]
notPending)
        | Bool
otherwise -> (Bool, [(ServiceId, SMPClientError)],
 ([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId]),
 [ServiceId])
acc
      Left SMPClientError
e
        | SMPClientError -> Bool
forall err. ProtocolClientError err -> Bool
temporaryClientError SMPClientError
e -> (Bool
True, [(ServiceId, SMPClientError)]
finalErrs, ([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId])
oks, [ServiceId]
notPending)
        | Bool
otherwise -> (Bool
tempErrs, (ServiceId
qId, SMPClientError
e) (ServiceId, SMPClientError)
-> [(ServiceId, SMPClientError)] -> [(ServiceId, SMPClientError)]
forall a. a -> [a] -> [a]
: [(ServiceId, SMPClientError)]
finalErrs, ([(ServiceId, (ByteString, APrivateAuthKey))], [ServiceId])
oks, ServiceId
qId ServiceId -> [ServiceId] -> [ServiceId]
forall a. a -> [a] -> [a]
: [ServiceId]
notPending)
    notify_ :: (SMPServer -> NonEmpty a -> SMPClientAgentEvent) -> [a] -> IO ()
    notify_ :: forall a.
(SMPServer -> NonEmpty a -> SMPClientAgentEvent) -> [a] -> IO ()
notify_ SMPServer -> NonEmpty a -> SMPClientAgentEvent
evt [a]
qs = (NonEmpty a -> IO ()) -> Maybe (NonEmpty a) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (SMPClientAgent p -> SMPClientAgentEvent -> IO ()
forall (m :: * -> *) (p :: Party).
MonadIO m =>
SMPClientAgent p -> SMPClientAgentEvent -> m ()
notify SMPClientAgent p
ca (SMPClientAgentEvent -> IO ())
-> (NonEmpty a -> SMPClientAgentEvent) -> NonEmpty a -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SMPServer -> NonEmpty a -> SMPClientAgentEvent
evt SMPServer
srv) (Maybe (NonEmpty a) -> IO ()) -> Maybe (NonEmpty a) -> IO ()
forall a b. (a -> b) -> a -> b
$ [a] -> Maybe (NonEmpty a)
forall a. [a] -> Maybe (NonEmpty a)
L.nonEmpty [a]
qs

subscribeServiceNtfs :: SMPClientAgent 'NotifierService -> SMPServer -> (ServiceId, Int64) -> IO ()
subscribeServiceNtfs :: SMPClientAgent 'NotifierService
-> SMPServer -> (ServiceId, Int64) -> IO ()
subscribeServiceNtfs = SMPClientAgent 'NotifierService
-> SMPServer -> (ServiceId, Int64) -> IO ()
forall (p :: Party).
(PartyI p, ServiceParty p) =>
SMPClientAgent p -> SMPServer -> (ServiceId, Int64) -> IO ()
subscribeService_
{-# INLINE subscribeServiceNtfs #-}

subscribeService_ :: (PartyI p, ServiceParty p) => SMPClientAgent p -> SMPServer -> (ServiceId, Int64) -> IO ()
subscribeService_ :: forall (p :: Party).
(PartyI p, ServiceParty p) =>
SMPClientAgent p -> SMPServer -> (ServiceId, Int64) -> IO ()
subscribeService_ SMPClientAgent p
ca SMPServer
srv (ServiceId, Int64)
serviceSub = do
  STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ SMPClientAgent p -> SMPServer -> Maybe (ServiceId, Int64) -> STM ()
forall (p :: Party).
SMPClientAgent p -> SMPServer -> Maybe (ServiceId, Int64) -> STM ()
setPendingServiceSub SMPClientAgent p
ca SMPServer
srv (Maybe (ServiceId, Int64) -> STM ())
-> Maybe (ServiceId, Int64) -> STM ()
forall a b. (a -> b) -> a -> b
$ (ServiceId, Int64) -> Maybe (ServiceId, Int64)
forall a. a -> Maybe a
Just (ServiceId, Int64)
serviceSub
  ExceptT SMPClientError IO SMPClient
-> IO (Either SMPClientError SMPClient)
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (SMPClientAgent p
-> SMPServer -> ExceptT SMPClientError IO SMPClient
forall (p :: Party).
SMPClientAgent p
-> SMPServer -> ExceptT SMPClientError IO SMPClient
getSMPServerClient' SMPClientAgent p
ca SMPServer
srv) IO (Either SMPClientError SMPClient)
-> (Either SMPClientError SMPClient -> IO ()) -> IO ()
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
    Right SMPClient
smp -> SMPClientAgent p
-> SMPClient -> SMPServer -> (ServiceId, Int64) -> IO ()
forall (p :: Party).
(PartyI p, ServiceParty p) =>
SMPClientAgent p
-> SMPClient -> SMPServer -> (ServiceId, Int64) -> IO ()
smpSubscribeService SMPClientAgent p
ca SMPClient
smp SMPServer
srv (ServiceId, Int64)
serviceSub
    Left SMPClientError
_ -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure () -- no call to reconnectClient - failing getSMPServerClient' does that

smpSubscribeService :: (PartyI p, ServiceParty p) => SMPClientAgent p -> SMPClient -> SMPServer -> (ServiceId, Int64) -> IO ()
smpSubscribeService :: forall (p :: Party).
(PartyI p, ServiceParty p) =>
SMPClientAgent p
-> SMPClient -> SMPServer -> (ServiceId, Int64) -> IO ()
smpSubscribeService SMPClientAgent p
ca SMPClient
smp SMPServer
srv serviceSub :: (ServiceId, Int64)
serviceSub@(ServiceId
serviceId, Int64
_) = case SMPClient -> Maybe (THClientService' PrivateKeyEd25519)
smpClientService SMPClient
smp of
  Just THClientService' PrivateKeyEd25519
service | THClientService' PrivateKeyEd25519 -> Bool
serviceAvailable THClientService' PrivateKeyEd25519
service -> IO ()
subscribe
  Maybe (THClientService' PrivateKeyEd25519)
_ -> IO ()
notifyUnavailable
  where
    subscribe :: IO ()
subscribe = do
      Either SMPClientError Int64
r <- ExceptT SMPClientError IO Int64 -> IO (Either SMPClientError Int64)
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (ExceptT SMPClientError IO Int64
 -> IO (Either SMPClientError Int64))
-> ExceptT SMPClientError IO Int64
-> IO (Either SMPClientError Int64)
forall a b. (a -> b) -> a -> b
$ SMPClient -> SParty p -> ExceptT SMPClientError IO Int64
forall (p :: Party).
(PartyI p, ServiceParty p) =>
SMPClient -> SParty p -> ExceptT SMPClientError IO Int64
subscribeService SMPClient
smp (SParty p -> ExceptT SMPClientError IO Int64)
-> SParty p -> ExceptT SMPClientError IO Int64
forall a b. (a -> b) -> a -> b
$ SMPClientAgent p -> SParty p
forall (p :: Party). SMPClientAgent p -> SParty p
agentParty SMPClientAgent p
ca
      Bool
ok <-
        STM Bool -> IO Bool
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM Bool -> IO Bool) -> STM Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$
          STM Bool -> STM Bool -> STM Bool -> STM Bool
forall (m :: * -> *) a. Monad m => m Bool -> m a -> m a -> m a
ifM
            (SMPClientAgent p -> SMPClient -> SMPServer -> STM Bool
forall (p :: Party).
SMPClientAgent p -> SMPClient -> SMPServer -> STM Bool
activeClientSession SMPClientAgent p
ca SMPClient
smp SMPServer
srv)
            (Bool
True Bool -> STM () -> STM Bool
forall a b. a -> STM b -> STM a
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ Either SMPClientError Int64 -> STM ()
processSubscription Either SMPClientError Int64
r)
            (Bool -> STM Bool
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
False)
      if Bool
ok
        then case Either SMPClientError Int64
r of
          Right Int64
n -> SMPClientAgent p -> SMPClientAgentEvent -> IO ()
forall (m :: * -> *) (p :: Party).
MonadIO m =>
SMPClientAgent p -> SMPClientAgentEvent -> m ()
notify SMPClientAgent p
ca (SMPClientAgentEvent -> IO ()) -> SMPClientAgentEvent -> IO ()
forall a b. (a -> b) -> a -> b
$ SMPServer -> (ServiceId, Int64) -> Int64 -> SMPClientAgentEvent
CAServiceSubscribed SMPServer
srv (ServiceId, Int64)
serviceSub Int64
n
          Left SMPClientError
e
            | SMPClientError -> Bool
smpClientServiceError SMPClientError
e -> IO ()
notifyUnavailable
            | SMPClientError -> Bool
forall err. ProtocolClientError err -> Bool
temporaryClientError SMPClientError
e -> SMPClientAgent p -> SMPServer -> IO ()
forall (p :: Party). SMPClientAgent p -> SMPServer -> IO ()
reconnectClient SMPClientAgent p
ca SMPServer
srv
            | Bool
otherwise -> SMPClientAgent p -> SMPClientAgentEvent -> IO ()
forall (m :: * -> *) (p :: Party).
MonadIO m =>
SMPClientAgent p -> SMPClientAgentEvent -> m ()
notify SMPClientAgent p
ca (SMPClientAgentEvent -> IO ()) -> SMPClientAgentEvent -> IO ()
forall a b. (a -> b) -> a -> b
$ SMPServer
-> (ServiceId, Int64) -> SMPClientError -> SMPClientAgentEvent
CAServiceSubError SMPServer
srv (ServiceId, Int64)
serviceSub SMPClientError
e
        else SMPClientAgent p -> SMPServer -> IO ()
forall (p :: Party). SMPClientAgent p -> SMPServer -> IO ()
reconnectClient SMPClientAgent p
ca SMPServer
srv
    processSubscription :: Either SMPClientError Int64 -> STM ()
processSubscription = (Int64 -> STM ()) -> Either SMPClientError Int64 -> STM ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ ((Int64 -> STM ()) -> Either SMPClientError Int64 -> STM ())
-> (Int64 -> STM ()) -> Either SMPClientError Int64 -> STM ()
forall a b. (a -> b) -> a -> b
$ \Int64
n -> do
      SMPClientAgent p
-> SMPServer -> Maybe ((ServiceId, Int64), ByteString) -> STM ()
forall (p :: Party).
SMPClientAgent p
-> SMPServer -> Maybe ((ServiceId, Int64), ByteString) -> STM ()
setActiveServiceSub SMPClientAgent p
ca SMPServer
srv (Maybe ((ServiceId, Int64), ByteString) -> STM ())
-> Maybe ((ServiceId, Int64), ByteString) -> STM ()
forall a b. (a -> b) -> a -> b
$ ((ServiceId, Int64), ByteString)
-> Maybe ((ServiceId, Int64), ByteString)
forall a. a -> Maybe a
Just ((ServiceId
serviceId, Int64
n), ByteString
sessId)
      SMPClientAgent p -> SMPServer -> Maybe (ServiceId, Int64) -> STM ()
forall (p :: Party).
SMPClientAgent p -> SMPServer -> Maybe (ServiceId, Int64) -> STM ()
setPendingServiceSub SMPClientAgent p
ca SMPServer
srv Maybe (ServiceId, Int64)
forall a. Maybe a
Nothing
    serviceAvailable :: THClientService' PrivateKeyEd25519 -> Bool
serviceAvailable THClientService {SMPServiceRole
serviceRole :: SMPServiceRole
$sel:serviceRole:THClientService :: forall k. THClientService' k -> SMPServiceRole
serviceRole, $sel:serviceId:THClientService :: forall k. THClientService' k -> ServiceId
serviceId = ServiceId
serviceId'} =
      ServiceId
serviceId ServiceId -> ServiceId -> Bool
forall a. Eq a => a -> a -> Bool
== ServiceId
serviceId' Bool -> Bool -> Bool
&& SParty p -> SMPServiceRole
forall (p :: Party). ServiceParty p => SParty p -> SMPServiceRole
partyServiceRole (SMPClientAgent p -> SParty p
forall (p :: Party). SMPClientAgent p -> SParty p
agentParty SMPClientAgent p
ca) SMPServiceRole -> SMPServiceRole -> Bool
forall a. Eq a => a -> a -> Bool
== SMPServiceRole
serviceRole
    notifyUnavailable :: IO ()
notifyUnavailable = do
      STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ SMPClientAgent p -> SMPServer -> Maybe (ServiceId, Int64) -> STM ()
forall (p :: Party).
SMPClientAgent p -> SMPServer -> Maybe (ServiceId, Int64) -> STM ()
setPendingServiceSub SMPClientAgent p
ca SMPServer
srv Maybe (ServiceId, Int64)
forall a. Maybe a
Nothing
      SMPClientAgent p -> SMPClientAgentEvent -> IO ()
forall (m :: * -> *) (p :: Party).
MonadIO m =>
SMPClientAgent p -> SMPClientAgentEvent -> m ()
notify SMPClientAgent p
ca (SMPClientAgentEvent -> IO ()) -> SMPClientAgentEvent -> IO ()
forall a b. (a -> b) -> a -> b
$ SMPServer -> (ServiceId, Int64) -> SMPClientAgentEvent
CAServiceUnavailable SMPServer
srv (ServiceId, Int64)
serviceSub -- this will resubscribe all queues directly
    sessId :: ByteString
sessId = THandleParams SMPVersion 'TClient -> ByteString
forall v (p :: TransportPeer). THandleParams v p -> ByteString
sessionId (THandleParams SMPVersion 'TClient -> ByteString)
-> THandleParams SMPVersion 'TClient -> ByteString
forall a b. (a -> b) -> a -> b
$ SMPClient -> THandleParams SMPVersion 'TClient
forall v err msg.
ProtocolClient v err msg -> THandleParams v 'TClient
thParams SMPClient
smp

activeClientSession' :: SMPClientAgent p -> SessionId -> SMPServer -> STM Bool
activeClientSession' :: forall (p :: Party).
SMPClientAgent p -> ByteString -> SMPServer -> STM Bool
activeClientSession' SMPClientAgent p
ca ByteString
sessId SMPServer
srv = Maybe (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient))
-> Bool
sameSess (Maybe (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient))
 -> Bool)
-> STM
     (Maybe (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient)))
-> STM Bool
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> SMPServer
-> TMap SMPServer SMPClientVar
-> STM
     (Maybe (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient)))
forall k a. Ord k => k -> TMap k (SessionVar a) -> STM (Maybe a)
tryReadSessVar SMPServer
srv (SMPClientAgent p -> TMap SMPServer SMPClientVar
forall (p :: Party).
SMPClientAgent p -> TMap SMPServer SMPClientVar
smpClients SMPClientAgent p
ca)
  where
    sameSess :: Maybe (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient))
-> Bool
sameSess = \case
      Just (Right (Bool
_, SMPClient
smp')) -> ByteString
sessId ByteString -> ByteString -> Bool
forall a. Eq a => a -> a -> Bool
== THandleParams SMPVersion 'TClient -> ByteString
forall v (p :: TransportPeer). THandleParams v p -> ByteString
sessionId (SMPClient -> THandleParams SMPVersion 'TClient
forall v err msg.
ProtocolClient v err msg -> THandleParams v 'TClient
thParams SMPClient
smp')
      Maybe (Either (SMPClientError, Maybe UTCTime) (Bool, SMPClient))
_ -> Bool
False

activeClientSession :: SMPClientAgent p -> SMPClient -> SMPServer -> STM Bool
activeClientSession :: forall (p :: Party).
SMPClientAgent p -> SMPClient -> SMPServer -> STM Bool
activeClientSession SMPClientAgent p
ca = SMPClientAgent p -> ByteString -> SMPServer -> STM Bool
forall (p :: Party).
SMPClientAgent p -> ByteString -> SMPServer -> STM Bool
activeClientSession' SMPClientAgent p
ca (ByteString -> SMPServer -> STM Bool)
-> (SMPClient -> ByteString) -> SMPClient -> SMPServer -> STM Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. THandleParams SMPVersion 'TClient -> ByteString
forall v (p :: TransportPeer). THandleParams v p -> ByteString
sessionId (THandleParams SMPVersion 'TClient -> ByteString)
-> (SMPClient -> THandleParams SMPVersion 'TClient)
-> SMPClient
-> ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SMPClient -> THandleParams SMPVersion 'TClient
forall v err msg.
ProtocolClient v err msg -> THandleParams v 'TClient
thParams

showServer :: SMPServer -> ByteString
showServer :: SMPServer -> ByteString
showServer ProtocolServer {NonEmpty TransportHost
$sel:host:ProtocolServer :: forall (p :: ProtocolType).
ProtocolServer p -> NonEmpty TransportHost
host :: NonEmpty TransportHost
host, HostName
port :: HostName
$sel:port:ProtocolServer :: forall (p :: ProtocolType). ProtocolServer p -> HostName
port} =
  NonEmpty TransportHost -> ByteString
forall a. StrEncoding a => a -> ByteString
strEncode NonEmpty TransportHost
host ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> HostName -> ByteString
B.pack (if HostName -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null HostName
port then HostName
"" else Char
':' Char -> HostName -> HostName
forall a. a -> [a] -> [a]
: HostName
port)

addActiveSubs :: SMPClientAgent p -> SMPServer -> [(QueueId, (SessionId, C.APrivateAuthKey))] -> STM ()
addActiveSubs :: forall (p :: Party).
SMPClientAgent p
-> SMPServer
-> [(ServiceId, (ByteString, APrivateAuthKey))]
-> STM ()
addActiveSubs = TMap SMPServer (TMap ServiceId (ByteString, APrivateAuthKey))
-> SMPServer
-> [(ServiceId, (ByteString, APrivateAuthKey))]
-> STM ()
forall s.
TMap SMPServer (TMap ServiceId s)
-> SMPServer -> [(ServiceId, s)] -> STM ()
addSubsList_ (TMap SMPServer (TMap ServiceId (ByteString, APrivateAuthKey))
 -> SMPServer
 -> [(ServiceId, (ByteString, APrivateAuthKey))]
 -> STM ())
-> (SMPClientAgent p
    -> TMap SMPServer (TMap ServiceId (ByteString, APrivateAuthKey)))
-> SMPClientAgent p
-> SMPServer
-> [(ServiceId, (ByteString, APrivateAuthKey))]
-> STM ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SMPClientAgent p
-> TMap SMPServer (TMap ServiceId (ByteString, APrivateAuthKey))
forall (p :: Party).
SMPClientAgent p
-> TMap SMPServer (TMap ServiceId (ByteString, APrivateAuthKey))
activeQueueSubs
{-# INLINE addActiveSubs #-}

addPendingSubs :: SMPClientAgent p -> SMPServer -> [(QueueId, C.APrivateAuthKey)] -> STM ()
addPendingSubs :: forall (p :: Party).
SMPClientAgent p
-> SMPServer -> [(ServiceId, APrivateAuthKey)] -> STM ()
addPendingSubs = TMap SMPServer (TMap ServiceId APrivateAuthKey)
-> SMPServer -> [(ServiceId, APrivateAuthKey)] -> STM ()
forall s.
TMap SMPServer (TMap ServiceId s)
-> SMPServer -> [(ServiceId, s)] -> STM ()
addSubsList_ (TMap SMPServer (TMap ServiceId APrivateAuthKey)
 -> SMPServer -> [(ServiceId, APrivateAuthKey)] -> STM ())
-> (SMPClientAgent p
    -> TMap SMPServer (TMap ServiceId APrivateAuthKey))
-> SMPClientAgent p
-> SMPServer
-> [(ServiceId, APrivateAuthKey)]
-> STM ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SMPClientAgent p -> TMap SMPServer (TMap ServiceId APrivateAuthKey)
forall (p :: Party).
SMPClientAgent p -> TMap SMPServer (TMap ServiceId APrivateAuthKey)
pendingQueueSubs
{-# INLINE addPendingSubs #-}

addSubsList_ :: TMap SMPServer (TMap QueueId s) -> SMPServer -> [(QueueId, s)] -> STM ()
addSubsList_ :: forall s.
TMap SMPServer (TMap ServiceId s)
-> SMPServer -> [(ServiceId, s)] -> STM ()
addSubsList_ TMap SMPServer (TMap ServiceId s)
subs SMPServer
srv [(ServiceId, s)]
ss = TMap SMPServer (TMap ServiceId s)
-> SMPServer -> Map ServiceId s -> STM ()
forall s.
TMap SMPServer (TMap ServiceId s)
-> SMPServer -> Map ServiceId s -> STM ()
addSubs_ TMap SMPServer (TMap ServiceId s)
subs SMPServer
srv (Map ServiceId s -> STM ()) -> Map ServiceId s -> STM ()
forall a b. (a -> b) -> a -> b
$ [(ServiceId, s)] -> Map ServiceId s
forall k a. Ord k => [(k, a)] -> Map k a
M.fromList [(ServiceId, s)]
ss
  -- where
  --   ss' = M.fromList $ map (first (party,)) ss

addSubs_ :: TMap SMPServer (TMap QueueId s) -> SMPServer -> Map QueueId s -> STM ()
addSubs_ :: forall s.
TMap SMPServer (TMap ServiceId s)
-> SMPServer -> Map ServiceId s -> STM ()
addSubs_ TMap SMPServer (TMap ServiceId s)
subs SMPServer
srv Map ServiceId s
ss =
  SMPServer
-> TMap SMPServer (TMap ServiceId s)
-> STM (Maybe (TMap ServiceId s))
forall k a. Ord k => k -> TMap k a -> STM (Maybe a)
TM.lookup SMPServer
srv TMap SMPServer (TMap ServiceId s)
subs STM (Maybe (TMap ServiceId s))
-> (Maybe (TMap ServiceId s) -> STM ()) -> STM ()
forall a b. STM a -> (a -> STM b) -> STM b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
    Just TMap ServiceId s
m -> Map ServiceId s -> TMap ServiceId s -> STM ()
forall k a. Ord k => Map k a -> TMap k a -> STM ()
TM.union Map ServiceId s
ss TMap ServiceId s
m
    Maybe (TMap ServiceId s)
_ -> SMPServer
-> STM (TMap ServiceId s)
-> TMap SMPServer (TMap ServiceId s)
-> STM ()
forall k a. Ord k => k -> STM a -> TMap k a -> STM ()
TM.insertM SMPServer
srv (Map ServiceId s -> STM (TMap ServiceId s)
forall a. a -> STM (TVar a)
newTVar Map ServiceId s
ss) TMap SMPServer (TMap ServiceId s)
subs

setActiveServiceSub :: SMPClientAgent p -> SMPServer -> Maybe ((ServiceId, Int64), SessionId) -> STM ()
setActiveServiceSub :: forall (p :: Party).
SMPClientAgent p
-> SMPServer -> Maybe ((ServiceId, Int64), ByteString) -> STM ()
setActiveServiceSub = (SMPClientAgent p
 -> TMap SMPServer (TVar (Maybe ((ServiceId, Int64), ByteString))))
-> SMPClientAgent p
-> SMPServer
-> Maybe ((ServiceId, Int64), ByteString)
-> STM ()
forall (p :: Party) sub.
(SMPClientAgent p -> TMap SMPServer (TVar (Maybe sub)))
-> SMPClientAgent p -> SMPServer -> Maybe sub -> STM ()
setServiceSub_ SMPClientAgent p
-> TMap SMPServer (TVar (Maybe ((ServiceId, Int64), ByteString)))
forall (p :: Party).
SMPClientAgent p
-> TMap SMPServer (TVar (Maybe ((ServiceId, Int64), ByteString)))
activeServiceSubs
{-# INLINE setActiveServiceSub #-}

setPendingServiceSub :: SMPClientAgent p -> SMPServer -> Maybe (ServiceId, Int64) -> STM ()
setPendingServiceSub :: forall (p :: Party).
SMPClientAgent p -> SMPServer -> Maybe (ServiceId, Int64) -> STM ()
setPendingServiceSub = (SMPClientAgent p
 -> TMap SMPServer (TVar (Maybe (ServiceId, Int64))))
-> SMPClientAgent p
-> SMPServer
-> Maybe (ServiceId, Int64)
-> STM ()
forall (p :: Party) sub.
(SMPClientAgent p -> TMap SMPServer (TVar (Maybe sub)))
-> SMPClientAgent p -> SMPServer -> Maybe sub -> STM ()
setServiceSub_ SMPClientAgent p
-> TMap SMPServer (TVar (Maybe (ServiceId, Int64)))
forall (p :: Party).
SMPClientAgent p
-> TMap SMPServer (TVar (Maybe (ServiceId, Int64)))
pendingServiceSubs
{-# INLINE setPendingServiceSub #-}

setServiceSub_ ::
  (SMPClientAgent p -> TMap SMPServer (TVar (Maybe sub))) ->
  SMPClientAgent p ->
  SMPServer ->
  Maybe sub ->
  STM ()
setServiceSub_ :: forall (p :: Party) sub.
(SMPClientAgent p -> TMap SMPServer (TVar (Maybe sub)))
-> SMPClientAgent p -> SMPServer -> Maybe sub -> STM ()
setServiceSub_ SMPClientAgent p -> TMap SMPServer (TVar (Maybe sub))
subsSel SMPClientAgent p
ca SMPServer
srv Maybe sub
sub =
  SMPServer
-> TMap SMPServer (TVar (Maybe sub))
-> STM (Maybe (TVar (Maybe sub)))
forall k a. Ord k => k -> TMap k a -> STM (Maybe a)
TM.lookup SMPServer
srv (SMPClientAgent p -> TMap SMPServer (TVar (Maybe sub))
subsSel SMPClientAgent p
ca) STM (Maybe (TVar (Maybe sub)))
-> (Maybe (TVar (Maybe sub)) -> STM ()) -> STM ()
forall a b. STM a -> (a -> STM b) -> STM b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
    Just TVar (Maybe sub)
v -> TVar (Maybe sub) -> Maybe sub -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (Maybe sub)
v Maybe sub
sub
    Maybe (TVar (Maybe sub))
Nothing -> SMPServer
-> STM (TVar (Maybe sub))
-> TMap SMPServer (TVar (Maybe sub))
-> STM ()
forall k a. Ord k => k -> STM a -> TMap k a -> STM ()
TM.insertM SMPServer
srv (Maybe sub -> STM (TVar (Maybe sub))
forall a. a -> STM (TVar a)
newTVar Maybe sub
sub) (SMPClientAgent p -> TMap SMPServer (TVar (Maybe sub))
subsSel SMPClientAgent p
ca)

updateActiveServiceSub :: SMPClientAgent p -> SMPServer -> ((ServiceId, Int64), SessionId) -> STM ()
updateActiveServiceSub :: forall (p :: Party).
SMPClientAgent p
-> SMPServer -> ((ServiceId, Int64), ByteString) -> STM ()
updateActiveServiceSub SMPClientAgent p
ca SMPServer
srv sub :: ((ServiceId, Int64), ByteString)
sub@((ServiceId
serviceId', Int64
n'), ByteString
sessId') =
  SMPServer
-> TMap SMPServer (TVar (Maybe ((ServiceId, Int64), ByteString)))
-> STM (Maybe (TVar (Maybe ((ServiceId, Int64), ByteString))))
forall k a. Ord k => k -> TMap k a -> STM (Maybe a)
TM.lookup SMPServer
srv (SMPClientAgent p
-> TMap SMPServer (TVar (Maybe ((ServiceId, Int64), ByteString)))
forall (p :: Party).
SMPClientAgent p
-> TMap SMPServer (TVar (Maybe ((ServiceId, Int64), ByteString)))
activeServiceSubs SMPClientAgent p
ca) STM (Maybe (TVar (Maybe ((ServiceId, Int64), ByteString))))
-> (Maybe (TVar (Maybe ((ServiceId, Int64), ByteString)))
    -> STM ())
-> STM ()
forall a b. STM a -> (a -> STM b) -> STM b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
    Just TVar (Maybe ((ServiceId, Int64), ByteString))
v -> TVar (Maybe ((ServiceId, Int64), ByteString))
-> (Maybe ((ServiceId, Int64), ByteString)
    -> Maybe ((ServiceId, Int64), ByteString))
-> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar (Maybe ((ServiceId, Int64), ByteString))
v ((Maybe ((ServiceId, Int64), ByteString)
  -> Maybe ((ServiceId, Int64), ByteString))
 -> STM ())
-> (Maybe ((ServiceId, Int64), ByteString)
    -> Maybe ((ServiceId, Int64), ByteString))
-> STM ()
forall a b. (a -> b) -> a -> b
$ \case
      Just ((ServiceId
serviceId, Int64
n), ByteString
sessId) | ServiceId
serviceId ServiceId -> ServiceId -> Bool
forall a. Eq a => a -> a -> Bool
== ServiceId
serviceId' Bool -> Bool -> Bool
&& ByteString
sessId ByteString -> ByteString -> Bool
forall a. Eq a => a -> a -> Bool
== ByteString
sessId' ->
        ((ServiceId, Int64), ByteString)
-> Maybe ((ServiceId, Int64), ByteString)
forall a. a -> Maybe a
Just ((ServiceId
serviceId, Int64
n Int64 -> Int64 -> Int64
forall a. Num a => a -> a -> a
+ Int64
n'), ByteString
sessId)
      Maybe ((ServiceId, Int64), ByteString)
_ -> ((ServiceId, Int64), ByteString)
-> Maybe ((ServiceId, Int64), ByteString)
forall a. a -> Maybe a
Just ((ServiceId, Int64), ByteString)
sub
    Maybe (TVar (Maybe ((ServiceId, Int64), ByteString)))
Nothing -> SMPServer
-> STM (TVar (Maybe ((ServiceId, Int64), ByteString)))
-> TMap SMPServer (TVar (Maybe ((ServiceId, Int64), ByteString)))
-> STM ()
forall k a. Ord k => k -> STM a -> TMap k a -> STM ()
TM.insertM SMPServer
srv (Maybe ((ServiceId, Int64), ByteString)
-> STM (TVar (Maybe ((ServiceId, Int64), ByteString)))
forall a. a -> STM (TVar a)
newTVar (Maybe ((ServiceId, Int64), ByteString)
 -> STM (TVar (Maybe ((ServiceId, Int64), ByteString))))
-> Maybe ((ServiceId, Int64), ByteString)
-> STM (TVar (Maybe ((ServiceId, Int64), ByteString)))
forall a b. (a -> b) -> a -> b
$ ((ServiceId, Int64), ByteString)
-> Maybe ((ServiceId, Int64), ByteString)
forall a. a -> Maybe a
Just ((ServiceId, Int64), ByteString)
sub) (SMPClientAgent p
-> TMap SMPServer (TVar (Maybe ((ServiceId, Int64), ByteString)))
forall (p :: Party).
SMPClientAgent p
-> TMap SMPServer (TVar (Maybe ((ServiceId, Int64), ByteString)))
activeServiceSubs SMPClientAgent p
ca)

removeActiveSub :: SMPClientAgent p -> SMPServer -> QueueId -> STM ()
removeActiveSub :: forall (p :: Party).
SMPClientAgent p -> SMPServer -> ServiceId -> STM ()
removeActiveSub = TMap SMPServer (TMap ServiceId (ByteString, APrivateAuthKey))
-> SMPServer -> ServiceId -> STM ()
forall s.
TMap SMPServer (TMap ServiceId s)
-> SMPServer -> ServiceId -> STM ()
removeSub_ (TMap SMPServer (TMap ServiceId (ByteString, APrivateAuthKey))
 -> SMPServer -> ServiceId -> STM ())
-> (SMPClientAgent p
    -> TMap SMPServer (TMap ServiceId (ByteString, APrivateAuthKey)))
-> SMPClientAgent p
-> SMPServer
-> ServiceId
-> STM ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SMPClientAgent p
-> TMap SMPServer (TMap ServiceId (ByteString, APrivateAuthKey))
forall (p :: Party).
SMPClientAgent p
-> TMap SMPServer (TMap ServiceId (ByteString, APrivateAuthKey))
activeQueueSubs
{-# INLINE removeActiveSub #-}

removePendingSub :: SMPClientAgent p -> SMPServer -> QueueId -> STM ()
removePendingSub :: forall (p :: Party).
SMPClientAgent p -> SMPServer -> ServiceId -> STM ()
removePendingSub = TMap SMPServer (TMap ServiceId APrivateAuthKey)
-> SMPServer -> ServiceId -> STM ()
forall s.
TMap SMPServer (TMap ServiceId s)
-> SMPServer -> ServiceId -> STM ()
removeSub_ (TMap SMPServer (TMap ServiceId APrivateAuthKey)
 -> SMPServer -> ServiceId -> STM ())
-> (SMPClientAgent p
    -> TMap SMPServer (TMap ServiceId APrivateAuthKey))
-> SMPClientAgent p
-> SMPServer
-> ServiceId
-> STM ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SMPClientAgent p -> TMap SMPServer (TMap ServiceId APrivateAuthKey)
forall (p :: Party).
SMPClientAgent p -> TMap SMPServer (TMap ServiceId APrivateAuthKey)
pendingQueueSubs
{-# INLINE removePendingSub #-}

removeSub_ :: TMap SMPServer (TMap QueueId s) -> SMPServer -> QueueId -> STM ()
removeSub_ :: forall s.
TMap SMPServer (TMap ServiceId s)
-> SMPServer -> ServiceId -> STM ()
removeSub_ TMap SMPServer (TMap ServiceId s)
subs SMPServer
srv ServiceId
s = SMPServer
-> TMap SMPServer (TMap ServiceId s)
-> STM (Maybe (TMap ServiceId s))
forall k a. Ord k => k -> TMap k a -> STM (Maybe a)
TM.lookup SMPServer
srv TMap SMPServer (TMap ServiceId s)
subs STM (Maybe (TMap ServiceId s))
-> (Maybe (TMap ServiceId s) -> STM ()) -> STM ()
forall a b. STM a -> (a -> STM b) -> STM b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (TMap ServiceId s -> STM ()) -> Maybe (TMap ServiceId s) -> STM ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (ServiceId -> TMap ServiceId s -> STM ()
forall k a. Ord k => k -> TMap k a -> STM ()
TM.delete ServiceId
s)

removeActiveSubs :: SMPClientAgent p -> SMPServer -> [QueueId] -> STM ()
removeActiveSubs :: forall (p :: Party).
SMPClientAgent p -> SMPServer -> [ServiceId] -> STM ()
removeActiveSubs = TMap SMPServer (TMap ServiceId (ByteString, APrivateAuthKey))
-> SMPServer -> [ServiceId] -> STM ()
forall s.
TMap SMPServer (TMap ServiceId s)
-> SMPServer -> [ServiceId] -> STM ()
removeSubs_ (TMap SMPServer (TMap ServiceId (ByteString, APrivateAuthKey))
 -> SMPServer -> [ServiceId] -> STM ())
-> (SMPClientAgent p
    -> TMap SMPServer (TMap ServiceId (ByteString, APrivateAuthKey)))
-> SMPClientAgent p
-> SMPServer
-> [ServiceId]
-> STM ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SMPClientAgent p
-> TMap SMPServer (TMap ServiceId (ByteString, APrivateAuthKey))
forall (p :: Party).
SMPClientAgent p
-> TMap SMPServer (TMap ServiceId (ByteString, APrivateAuthKey))
activeQueueSubs
{-# INLINE removeActiveSubs #-}

removePendingSubs :: SMPClientAgent p -> SMPServer -> [QueueId] -> STM ()
removePendingSubs :: forall (p :: Party).
SMPClientAgent p -> SMPServer -> [ServiceId] -> STM ()
removePendingSubs = TMap SMPServer (TMap ServiceId APrivateAuthKey)
-> SMPServer -> [ServiceId] -> STM ()
forall s.
TMap SMPServer (TMap ServiceId s)
-> SMPServer -> [ServiceId] -> STM ()
removeSubs_ (TMap SMPServer (TMap ServiceId APrivateAuthKey)
 -> SMPServer -> [ServiceId] -> STM ())
-> (SMPClientAgent p
    -> TMap SMPServer (TMap ServiceId APrivateAuthKey))
-> SMPClientAgent p
-> SMPServer
-> [ServiceId]
-> STM ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SMPClientAgent p -> TMap SMPServer (TMap ServiceId APrivateAuthKey)
forall (p :: Party).
SMPClientAgent p -> TMap SMPServer (TMap ServiceId APrivateAuthKey)
pendingQueueSubs
{-# INLINE removePendingSubs #-}

removeSubs_ :: TMap SMPServer (TMap QueueId s) -> SMPServer -> [QueueId] -> STM ()
removeSubs_ :: forall s.
TMap SMPServer (TMap ServiceId s)
-> SMPServer -> [ServiceId] -> STM ()
removeSubs_ TMap SMPServer (TMap ServiceId s)
subs SMPServer
srv [ServiceId]
qs = SMPServer
-> TMap SMPServer (TMap ServiceId s)
-> STM (Maybe (TMap ServiceId s))
forall k a. Ord k => k -> TMap k a -> STM (Maybe a)
TM.lookup SMPServer
srv TMap SMPServer (TMap ServiceId s)
subs STM (Maybe (TMap ServiceId s))
-> (Maybe (TMap ServiceId s) -> STM ()) -> STM ()
forall a b. STM a -> (a -> STM b) -> STM b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (TMap ServiceId s -> STM ()) -> Maybe (TMap ServiceId s) -> STM ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (TMap ServiceId s -> (Map ServiceId s -> Map ServiceId s) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
`modifyTVar'` (Map ServiceId s -> Set ServiceId -> Map ServiceId s
forall k a. Ord k => Map k a -> Set k -> Map k a
`M.withoutKeys` [ServiceId] -> Set ServiceId
forall a. Ord a => [a] -> Set a
S.fromList [ServiceId]
qs))