{-# LANGUAGE CPP #-}
{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StrictData #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeOperators #-}
#if __GLASGOW_HASKELL__ == 810
{-# LANGUAGE UndecidableInstances #-}
#endif
{-# OPTIONS_GHC -fno-warn-ambiguous-fields #-}
module Simplex.Messaging.Server.Env.STM
( ServerConfig (..),
ServerStoreCfg (..),
SupportedStore,
StorePaths (..),
StartOptions (..),
Env (..),
Server (..),
ServerSubscribers (..),
SubscribedClients,
ProxyAgent (..),
Client (..),
ClientId,
ClientSub (..),
Sub (..),
ServerSub (..),
SubscriptionThread (..),
MsgStoreType,
MsgStore (..),
AStoreType (..),
VerifiedTransmission,
ResponseAndMessage,
newEnv,
mkJournalStoreConfig,
msgStore,
fromMsgStore,
newClient,
getServerClients,
getServerClient,
insertServerClient,
deleteServerClient,
getSubscribedClients,
getSubscribedClient,
upsertSubscribedClient,
lookupSubscribedClient,
lookupDeleteSubscribedClient,
deleteSubcribedClient,
sameClientId,
sameClient,
newSubscription,
newProhibitedSub,
defaultMsgQueueQuota,
defMsgExpirationDays,
defNtfExpirationHours,
defaultMessageExpiration,
defaultNtfExpiration,
defaultInactiveClientExpiration,
defaultProxyClientConcurrency,
defaultMaxJournalMsgCount,
defaultMaxJournalStateLines,
defaultIdleQueueInterval,
journalMsgStoreDepth,
readWriteQueueStore,
noPostgresExitStr,
noPostgresExit,
dbStoreCfg,
storeLogFile',
)
where
import Control.Concurrent (ThreadId)
import Control.Logger.Simple
import Control.Monad
import qualified Crypto.PubKey.RSA as RSA
import Crypto.Random
import Data.Int (Int64)
import Data.IntMap.Strict (IntMap)
import qualified Data.IntMap.Strict as IM
import Data.IntSet (IntSet)
import qualified Data.IntSet as IS
import Data.Kind (Constraint)
import Data.List (intercalate)
import Data.List.NonEmpty (NonEmpty)
import Data.Map.Strict (Map)
import Data.Maybe (isJust)
import qualified Data.Text as T
import Data.Time.Clock (getCurrentTime, nominalDay)
import Data.Time.Clock.System (SystemTime)
import qualified Data.X509 as X
import Data.X509.Validation (Fingerprint (..))
import GHC.TypeLits (TypeError)
import qualified GHC.TypeLits as TE
import Network.Socket (ServiceName)
import qualified Network.TLS as T
import Numeric.Natural
import Simplex.Messaging.Agent.Lock
import Simplex.Messaging.Agent.Store.Shared (MigrationConfirmation (..))
import Simplex.Messaging.Client.Agent (SMPClientAgent, SMPClientAgentConfig, newSMPClientAgent)
import Simplex.Messaging.Crypto (KeyHash (..))
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Protocol
import Simplex.Messaging.Server.Expiration
import Simplex.Messaging.Server.Information
import Simplex.Messaging.Server.MsgStore.Journal
import Simplex.Messaging.Server.MsgStore.STM
import Simplex.Messaging.Server.MsgStore.Types
import Simplex.Messaging.Server.NtfStore
import Simplex.Messaging.Server.QueueStore
import Simplex.Messaging.Server.QueueStore.Postgres.Config
import Simplex.Messaging.Server.QueueStore.STM (STMQueueStore, setStoreLog)
import Simplex.Messaging.Server.QueueStore.Types
import Simplex.Messaging.Server.Stats
import Simplex.Messaging.Server.StoreLog
import Simplex.Messaging.Server.StoreLog.ReadWrite
import Simplex.Messaging.SystemTime
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Transport (ASrvTransport, SMPVersion, THandleParams, TransportPeer (..), VersionRangeSMP)
import Simplex.Messaging.Transport.Server
import Simplex.Messaging.Util (ifM, whenM, ($>>=))
import System.Directory (doesFileExist)
import System.Exit (exitFailure)
import System.IO (IOMode (..))
import System.Mem.Weak (Weak)
import UnliftIO.STM
#if defined(dbServerPostgres)
import Simplex.Messaging.Server.MsgStore.Postgres
#endif
-- | Static server configuration.
--
-- The type parameter @s@ is threaded into 'ServerStoreCfg', so the
-- configuration value fixes which store configuration constructor is used.
data ServerConfig s = ServerConfig
  { transports :: [(ServiceName, ASrvTransport, AddHTTP)],
    smpHandshakeTimeout :: Int,
    tbqSize :: Natural,
    msgQueueQuota :: Int,
    maxJournalMsgCount :: Int,
    maxJournalStateLines :: Int,
    queueIdBytes :: Int,
    msgIdBytes :: Int,
    serverStoreCfg :: ServerStoreCfg s,
    storeNtfsFile :: Maybe FilePath,
    allowNewQueues :: Bool,
    newQueueBasicAuth :: Maybe BasicAuth,
    controlPortUserAuth :: Maybe BasicAuth,
    controlPortAdminAuth :: Maybe BasicAuth,
    dailyBlockQueueQuota :: Int,
    messageExpiration :: Maybe ExpirationConfig,
    expireMessagesOnStart :: Bool,
    expireMessagesOnSend :: Bool,
    idleQueueInterval :: Int64,
    notificationExpiration :: ExpirationConfig,
    inactiveClientExpiration :: Maybe ExpirationConfig,
    logStatsInterval :: Maybe Int64,
    logStatsStartTime :: Int64,
    serverStatsLogFile :: FilePath,
    serverStatsBackupFile :: Maybe FilePath,
    prometheusInterval :: Maybe Int,
    prometheusMetricsFile :: FilePath,
    ntfDeliveryInterval :: Int,
    pendingENDInterval :: Int,
    smpCredentials :: ServerCredentials,
    httpCredentials :: Maybe ServerCredentials,
    smpServerVRange :: VersionRangeSMP,
    transportConfig :: TransportServerConfig,
    controlPort :: Maybe ServiceName,
    smpAgentCfg :: SMPClientAgentConfig,
    allowSMPProxy :: Bool,
    serverClientConcurrency :: Int,
    information :: Maybe ServerPublicInfo,
    startOptions :: StartOptions
  }
-- | Options that apply to a single server start (carried in the
-- 'startOptions' field of 'ServerConfig').
data StartOptions = StartOptions
  { maintenance :: Bool,
    compactLog :: Bool,
    logLevel :: LogLevel,
    skipWarnings :: Bool,
    confirmMigrations :: MigrationConfirmation
  }
-- | Default message expiration period, in days
-- (converted to a TTL in 'defaultMessageExpiration').
defMsgExpirationDays :: Int64
defMsgExpirationDays = 21
-- | Default expiration settings for messages.
defaultMessageExpiration :: ExpirationConfig
defaultMessageExpiration =
  ExpirationConfig
    { -- days * 86400, i.e. the TTL is expressed in seconds
      ttl = defMsgExpirationDays * 86400,
      -- presumably also seconds (7200 s = 2 hours) - confirm against ExpirationConfig
      checkInterval = 7200
    }
-- | Default value for the 'idleQueueInterval' setting.
-- NOTE(review): presumably seconds (14400 s = 4 hours) - confirm at the use site.
defaultIdleQueueInterval :: Int64
defaultIdleQueueInterval = 14400
-- | Default notification expiration period, in hours
-- (converted to a TTL in 'defaultNtfExpiration').
defNtfExpirationHours :: Int64
defNtfExpirationHours = 24
-- | Default expiration settings for notifications.
defaultNtfExpiration :: ExpirationConfig
defaultNtfExpiration =
  ExpirationConfig
    { -- hours * 3600, i.e. the TTL is expressed in seconds
      ttl = defNtfExpirationHours * 3600,
      -- presumably also seconds (3600 s = 1 hour) - confirm against ExpirationConfig
      checkInterval = 3600
    }
-- | Default expiration settings for inactive clients.
-- NOTE(review): values are presumably seconds (21600 s = 6 hours,
-- 3600 s = 1 hour), matching the other expiration defaults - confirm.
defaultInactiveClientExpiration :: ExpirationConfig
defaultInactiveClientExpiration =
  ExpirationConfig
    { ttl = 21600,
      checkInterval = 3600
    }
-- | Default concurrency limit for proxy clients.
-- NOTE(review): presumably the default for 'serverClientConcurrency' - confirm.
defaultProxyClientConcurrency :: Int
defaultProxyClientConcurrency = 32
-- | Depth setting for the journal message store.
-- NOTE(review): presumably the directory nesting depth of journal storage - confirm.
journalMsgStoreDepth :: Int
journalMsgStoreDepth = 5
-- | Default value for the 'maxJournalStateLines' setting.
defaultMaxJournalStateLines :: Int
defaultMaxJournalStateLines = 16
-- | Default value for the 'maxJournalMsgCount' setting.
defaultMaxJournalMsgCount :: Int
defaultMaxJournalMsgCount = 256
-- | Default value for the 'msgQueueQuota' setting.
defaultMsgQueueQuota :: Int
defaultMsgQueueQuota = 128
-- | Default state tail size (not exported from this module).
-- NOTE(review): presumably a byte count used by the journal store config - confirm.
defaultStateTailSize :: Int
defaultStateTailSize = 512
-- | Server environment: the configuration together with the runtime state
-- (stores, credentials, statistics, sockets, client ID sequence, proxy agent).
--
-- The message store is held wrapped in 'MsgStore'; use 'msgStore' to get the
-- underlying store of type @s@.
data Env s = Env
  { config :: ServerConfig s,
    serverActive :: TVar Bool,
    serverInfo :: ServerInformation,
    server :: Server s,
    serverIdentity :: KeyHash,
    msgStore_ :: MsgStore s,
    ntfStore :: NtfStore,
    random :: TVar ChaChaDRG,
    tlsServerCreds :: T.Credential,
    httpServerCreds :: Maybe T.Credential,
    serverStats :: ServerStats,
    sockets :: TVar [(ServiceName, SocketState)],
    clientSeq :: TVar ClientId,
    proxyAgent :: ProxyAgent
  }
-- | Get the underlying message store from the environment,
-- unwrapping the 'MsgStore' held in 'msgStore_'.
msgStore :: Env s -> s
msgStore = fromMsgStore . msgStore_
{-# INLINE msgStore #-}
-- | Unwrap a 'MsgStore', returning the concrete store it carries.
-- Every constructor holds exactly one store of the indexed type @s@.
fromMsgStore :: MsgStore s -> s
fromMsgStore = \case
  StoreMemory s -> s
  StoreJournal s -> s
#if defined(dbServerPostgres)
  StoreDatabase s -> s
#endif
{-# INLINE fromMsgStore #-}
-- | Constraint that holds only for supported combinations of queue store
-- type and message store type.
--
-- Unsupported combinations reduce to a constraint containing a custom
-- 'TypeError' with an explanatory message. The accompanying @Int ~ Bool@
-- is an unsatisfiable equality - presumably there to guarantee the
-- constraint can never be solved; confirm the original intent.
type family SupportedStore (qs :: QSType) (ms :: MSType) :: Constraint where
  SupportedStore 'QSMemory 'MSMemory = ()
  SupportedStore 'QSMemory 'MSJournal = ()
  SupportedStore 'QSMemory 'MSPostgres =
    (Int ~ Bool, TypeError ('TE.Text "Storing messages in Postgres DB with queues in memory is not supported"))
  SupportedStore 'QSPostgres 'MSMemory =
    (Int ~ Bool, TypeError ('TE.Text "Storing messages in memory with queues in Postgres DB is not supported"))
  SupportedStore 'QSPostgres 'MSJournal = ()
#if defined(dbServerPostgres)
  SupportedStore 'QSPostgres 'MSPostgres = ()
#else
  SupportedStore 'QSPostgres 'MSPostgres =
    (Int ~ Bool, TypeError ('TE.Text "Server compiled without server_postgres flag"))
#endif
-- | Existentially wrapped pair of queue store and message store type
-- singletons. Pattern matching on 'ASType' brings into scope the evidence
-- that the combination is supported ('SupportedStore') and that the
-- corresponding 'MsgStoreType' is a 'MsgStoreClass' instance.
data AStoreType
  = forall qs ms.
    (SupportedStore qs ms, MsgStoreClass (MsgStoreType qs ms)) =>
    ASType (SQSType qs) (SMSType ms)
-- | Store configuration, indexed by the message store type it configures.
data ServerStoreCfg s where
  -- in-memory message store with optional store file paths
  SSCMemory :: Maybe StorePaths -> ServerStoreCfg STMMsgStore
  -- journal message store with queues in memory
  SSCMemoryJournal :: {storeLogFile :: FilePath, storeMsgsPath :: FilePath} -> ServerStoreCfg (JournalMsgStore 'QSMemory)
  -- journal message store with queues in Postgres
  SSCDatabaseJournal :: {storeCfg :: PostgresStoreCfg, storeMsgsPath' :: FilePath} -> ServerStoreCfg (JournalMsgStore 'QSPostgres)
#if defined(dbServerPostgres)
  -- Postgres message store
  SSCDatabase :: PostgresStoreCfg -> ServerStoreCfg PostgresMsgStore
#endif
-- | Postgres store configuration carried by the store config, if any.
-- Returns 'Nothing' for the configurations that have no database
-- ('SSCMemory' and 'SSCMemoryJournal').
dbStoreCfg :: ServerStoreCfg s -> Maybe PostgresStoreCfg
dbStoreCfg = \case
  SSCMemory _ -> Nothing
  SSCMemoryJournal {} -> Nothing
  SSCDatabaseJournal {storeCfg} -> Just storeCfg
#if defined(dbServerPostgres)
  SSCDatabase cfg -> Just cfg
#endif
-- | Path of the store log file for any store configuration, if there is one.
--
-- For 'SSCMemory' it comes from the optional 'StorePaths'; for
-- 'SSCMemoryJournal' it is always present; for the Postgres-backed
-- configurations it is the optional @dbStoreLogPath@ of 'PostgresStoreCfg'.
storeLogFile' :: ServerStoreCfg s -> Maybe FilePath
storeLogFile' = \case
  SSCMemory sp_ -> (\StorePaths {storeLogFile} -> storeLogFile) <$> sp_
  SSCMemoryJournal {storeLogFile} -> Just storeLogFile
  SSCDatabaseJournal {storeCfg = PostgresStoreCfg {dbStoreLogPath}} -> dbStoreLogPath
#if defined(dbServerPostgres)
  SSCDatabase (PostgresStoreCfg {dbStoreLogPath}) -> dbStoreLogPath
#endif
-- | File paths used with the in-memory store configuration ('SSCMemory'):
-- the store log file and an optional messages file.
data StorePaths = StorePaths {storeLogFile :: FilePath, storeMsgsFile :: Maybe FilePath}
-- | Maps a queue store type and message store type to the concrete
-- message store type. There is no equation for unsupported combinations.
type family MsgStoreType (qs :: QSType) (ms :: MSType) where
  MsgStoreType 'QSMemory 'MSMemory = STMMsgStore
  MsgStoreType qs 'MSJournal = JournalMsgStore qs
#if defined(dbServerPostgres)
  MsgStoreType 'QSPostgres 'MSPostgres = PostgresMsgStore
#endif
-- | Wrapper around a concrete message store; the constructor identifies
-- which kind of store the index @s@ is. Use 'fromMsgStore' to unwrap.
data MsgStore s where
  StoreMemory :: STMMsgStore -> MsgStore STMMsgStore
  StoreJournal :: JournalMsgStore qs -> MsgStore (JournalMsgStore qs)
#if defined(dbServerPostgres)
  StoreDatabase :: PostgresMsgStore -> MsgStore PostgresMsgStore
#endif
-- | Mutable server state: connected clients, two sets of subscribers
-- (one for queue subscriptions, one for notification subscriptions),
-- and a lock held in 'savingLock'.
data Server s = Server
  { clients :: ServerClients s,
    subscribers :: ServerSubscribers s,
    ntfSubscribers :: ServerSubscribers s,
    savingLock :: Lock
  }
-- | Connected clients held in a 'TVar' as an 'IntMap'.
-- NOTE(review): keys are presumably 'ClientId' values - confirm at the insert site.
newtype ServerClients s = ServerClients {serverClients :: TVar (IntMap (Client s))}
-- | Subscriber state of the server: a queue of subscription events paired
-- with a client ID, the subscribed clients indexed separately for queues
-- and for services, a running count of service subscriptions, the set of
-- subscribed client IDs, and pending events.
data ServerSubscribers s = ServerSubscribers
  { subQ :: TQueue (ClientSub, ClientId),
    queueSubscribers :: SubscribedClients s,
    serviceSubscribers :: SubscribedClients s,
    totalServiceSubs :: TVar Int64,
    -- NOTE(review): presumably a set of 'ClientId's - confirm
    subClients :: TVar IntSet,
    -- NOTE(review): presumably keyed by 'ClientId' - confirm
    pendingEvents :: TVar (IntMap (NonEmpty (EntityId, BrokerMsg)))
  }
-- | Subscribed clients indexed by entity ID. Each entry is a 'TVar' holding
-- the client currently subscribed to that entity, or 'Nothing'.
--
-- The constructor is not in the module export list; access goes through the
-- functions exported alongside the type (e.g. 'getSubscribedClient',
-- 'upsertSubscribedClient', 'lookupSubscribedClient').
data SubscribedClients s = SubscribedClients (TMap EntityId (TVar (Maybe (Client s))))
-- | Read the whole map of subscribed clients, outside of a transaction.
getSubscribedClients :: SubscribedClients s -> IO (Map EntityId (TVar (Maybe (Client s))))
getSubscribedClients (SubscribedClients cs) = readTVarIO cs
-- | Look up the client 'TVar' registered for an entity, in 'IO'
-- (via 'TM.lookupIO'). Returns 'Nothing' when the entity has no entry.
getSubscribedClient :: EntityId -> SubscribedClients s -> IO (Maybe (TVar (Maybe (Client s))))
getSubscribedClient entId (SubscribedClients cs) = TM.lookupIO entId cs
{-# INLINE getSubscribedClient #-}
-- | Register client @c@ as the subscriber of the entity.
--
-- Returns the previously stored value when it was replaced by a different
-- client, and 'Nothing' when there was no entry, when the stored value was
-- 'Nothing', or when the stored client has the same client ID as @c@.
upsertSubscribedClient :: EntityId -> Client s -> SubscribedClients s -> STM (Maybe (Client s))
upsertSubscribedClient entId c (SubscribedClients cs) =
  TM.lookup entId cs >>= \case
    -- no entry yet: create a new TVar holding this client
    Nothing -> Nothing <$ TM.insertM entId (newTVar (Just c)) cs
    Just cv ->
      readTVar cv >>= \case
        -- the same client is already stored: nothing to write
        Just c' | sameClientId c c' -> pure Nothing
        -- reuse the existing TVar, returning whatever it held before
        c_ -> c_ <$ writeTVar cv (Just c)
-- | Look up the client currently subscribed to the entity, inside 'STM'.
-- Returns 'Nothing' both when there is no entry and when the entry's
-- 'TVar' holds 'Nothing'.
lookupSubscribedClient :: EntityId -> SubscribedClients s -> STM (Maybe (Client s))
lookupSubscribedClient entId (SubscribedClients cs) = TM.lookup entId cs $>>= readTVar
{-# INLINE lookupSubscribedClient #-}
-- | Remove the entity's entry from the map and return the client it held.
-- The removed 'TVar' is also reset to 'Nothing' (via 'swapTVar').
lookupDeleteSubscribedClient :: EntityId -> SubscribedClients s -> STM (Maybe (Client s))
lookupDeleteSubscribedClient entId (SubscribedClients cs) =
  TM.lookupDelete entId cs $>>= (`swapTVar` Nothing)
{-# INLINE lookupDeleteSubscribedClient #-}
-- | Remove the subscription of client @c@ to the entity, but only if @c@ is
-- still the stored subscriber (compared by client ID via 'sameClient').
--
-- The map lookup is done in 'IO'; the check and the removal then run
-- together in a single 'STM' transaction.
deleteSubcribedClient :: EntityId -> Client s -> SubscribedClients s -> IO ()
deleteSubcribedClient entId c (SubscribedClients cs) =
  TM.lookupIO entId cs >>= mapM_ (\cv -> atomically $ whenM (sameClient c cv) $ delete cv)
  where
    -- clear the TVar and drop the map entry
    delete cv = do
      writeTVar cv Nothing
      TM.delete entId cs
-- | Whether two clients have the same 'clientId'.
sameClientId :: Client s -> Client s -> Bool
sameClientId c c' = clientId c == clientId c'
{-# INLINE sameClientId #-}
-- | Whether the 'TVar' currently holds a client with the same client ID
-- as @c@. An empty 'TVar' ('Nothing') yields 'False'.
sameClient :: Client s -> TVar (Maybe (Client s)) -> STM Bool
sameClient c cv = maybe False (sameClientId c) <$> readTVar cv
{-# INLINE sameClient #-}
-- | Subscription event queued in 'subQ' together with a client ID.
-- NOTE(review): the meaning of the individual 'ServiceId' / 'Int64'
-- arguments is not visible in this module - confirm at the use sites.
data ClientSub
  = CSClient QueueId (Maybe ServiceId) (Maybe ServiceId)
  | CSDeleted QueueId (Maybe ServiceId)
  | CSService ServiceId Int64
-- | Wrapper around the SMP client agent (in the 'Sender' role) held in
-- the 'proxyAgent' field of 'Env'.
newtype ProxyAgent = ProxyAgent
  { smpAgent :: SMPClientAgent 'Sender
  }
-- | Integer identifier of a client (the type of the 'clientId' field of
-- 'Client' and of the 'clientSeq' counter in 'Env').
type ClientId = Int
-- | State kept for one client: its ID, its subscriptions, counters and
-- flags for service subscriptions, its transmission queues, thread
-- bookkeeping, transport handle parameters, and timestamps.
data Client s = Client
  { clientId :: ClientId,
    subscriptions :: TMap RecipientId Sub,
    ntfSubscriptions :: TMap NotifierId (),
    serviceSubscribed :: TVar Bool,
    ntfServiceSubscribed :: TVar Bool,
    serviceSubsCount :: TVar Int64,
    ntfServiceSubsCount :: TVar Int64,
    rcvQ :: TBQueue (NonEmpty (VerifiedTransmission s)),
    sndQ :: TBQueue (NonEmpty (Transmission BrokerMsg), [Transmission BrokerMsg]),
    msgQ :: TBQueue (NonEmpty (Transmission BrokerMsg)),
    procThreads :: TVar Int,
    endThreads :: TVar (IntMap (Weak ThreadId)),
    endThreadSeq :: TVar Int,
    clientTHParams :: THandleParams SMPVersion 'TServer,
    connected :: TVar Bool,
    createdAt :: SystemTime,
    rcvActiveAt :: TVar SystemTime,
    sndActiveAt :: TVar SystemTime
  }
-- | A command transmission paired with the store queue and queue record it
-- relates to, when there is one.
-- NOTE(review): the name suggests this is produced after verification -
-- confirm where values are constructed.
type VerifiedTransmission s = (Maybe (StoreQueue s, QueueRec), Transmission Cmd)
-- | A broker transmission together with an optional second broker
-- transmission (by the name: a response and an accompanying message).
type ResponseAndMessage = (Transmission BrokerMsg, Maybe (Transmission BrokerMsg))
-- | Thread state of a subscription: either a mutable 'SubscriptionThread'
-- (as created by 'newSubscription') or 'ProhibitSub', which carries no
-- thread state (as created by 'newProhibitedSub').
data ServerSub = ServerSub (TVar SubscriptionThread) | ProhibitSub
-- | State stored inside 'ServerSub': no thread, a pending state, or a weak
-- reference to a thread.
-- NOTE(review): the exact lifecycle between these states is not visible here.
data SubscriptionThread = NoSub | SubPending | SubThread (Weak ThreadId)
-- | A single subscription held in a client's 'subscriptions' map.
data Sub = Sub
  { -- | Subscription thread state, or 'ProhibitSub'.
    subThread :: ServerSub,
    -- | Optional message ID and timestamp; starts as 'Nothing'
    -- (presumably the last delivered message - TODO confirm).
    delivered :: TVar (Maybe (MsgId, SystemSeconds))
  }
-- | Create empty server state: an empty client map, two independent
-- subscriber registries ('subscribers' and 'ntfSubscribers') and a lock
-- obtained from 'createLockIO'.
newServer :: IO (Server s)
newServer = do
  clients <- ServerClients <$> newTVarIO mempty
  subscribers <- newServerSubscribers
  ntfSubscribers <- newServerSubscribers
  savingLock <- createLockIO
  return Server {clients, subscribers, ntfSubscribers, savingLock}
-- | Read the current map of registered clients (a non-transactional
-- snapshot of the server's client 'TVar').
getServerClients :: Server s -> IO (IntMap (Client s))
getServerClients = readTVarIO . serverClients . clients
{-# INLINE getServerClients #-}
-- | Look up a registered client by its ID; 'Nothing' when no client with
-- that ID is currently in the server's client map.
getServerClient :: ClientId -> Server s -> IO (Maybe (Client s))
getServerClient cId s = IM.lookup cId <$> getServerClients s
{-# INLINE getServerClient #-}
-- | Register a client in the server's client map under its 'clientId'.
--
-- The 'connected' flag is checked and the map is updated in one STM
-- transaction: the client is inserted and 'True' is returned only if the
-- flag is 'True'; otherwise the map is left unchanged and 'False' is returned.
insertServerClient :: Client s -> Server s -> IO Bool
insertServerClient c@Client {clientId, connected} Server {clients} =
  atomically $
    ifM
      (readTVar connected)
      (True <$ modifyTVar' (serverClients clients) (IM.insert clientId c))
      (pure False)
{-# INLINE insertServerClient #-}
-- | Remove the client with the given ID from the server's client map
-- (no effect if the ID is not present).
deleteServerClient :: ClientId -> Server s -> IO ()
deleteServerClient cId Server {clients} = atomically $ modifyTVar' (serverClients clients) $ IM.delete cId
{-# INLINE deleteServerClient #-}
-- | Create an empty subscriber registry: an empty event queue, empty queue
-- and service subscriber maps, a zero service-subscription total, an empty
-- set of subscribed client IDs and no pending events.
newServerSubscribers :: IO (ServerSubscribers s)
newServerSubscribers = do
  subQ <- newTQueueIO
  queueSubscribers <- SubscribedClients <$> TM.emptyIO
  serviceSubscribers <- SubscribedClients <$> TM.emptyIO
  totalServiceSubs <- newTVarIO 0
  subClients <- newTVarIO IS.empty
  pendingEvents <- newTVarIO IM.empty
  pure ServerSubscribers {subQ, queueSubscribers, serviceSubscribers, totalServiceSubs, subClients, pendingEvents}
-- | Create the state for a new client.
--
-- Arguments: the client ID, the bound used for all three client queues
-- ('rcvQ', 'sndQ', 'msgQ'), the transport handle parameters, and the
-- creation time (also used as the initial 'rcvActiveAt' / 'sndActiveAt').
--
-- The client starts with no subscriptions, both service flags 'False',
-- all counters at 0, no end threads, and 'connected' set to 'True'.
newClient :: ClientId -> Natural -> THandleParams SMPVersion 'TServer -> SystemTime -> IO (Client s)
newClient clientId qSize clientTHParams createdAt = do
  subscriptions <- TM.emptyIO
  ntfSubscriptions <- TM.emptyIO
  serviceSubscribed <- newTVarIO False
  ntfServiceSubscribed <- newTVarIO False
  serviceSubsCount <- newTVarIO 0
  ntfServiceSubsCount <- newTVarIO 0
  rcvQ <- newTBQueueIO qSize
  sndQ <- newTBQueueIO qSize
  msgQ <- newTBQueueIO qSize
  procThreads <- newTVarIO 0
  endThreads <- newTVarIO IM.empty
  endThreadSeq <- newTVarIO 0
  connected <- newTVarIO True
  rcvActiveAt <- newTVarIO createdAt
  sndActiveAt <- newTVarIO createdAt
  return
    Client
      { clientId,
        subscriptions,
        ntfSubscriptions,
        serviceSubscribed,
        ntfServiceSubscribed,
        serviceSubsCount,
        ntfServiceSubsCount,
        rcvQ,
        sndQ,
        msgQ,
        procThreads,
        endThreads,
        endThreadSeq,
        clientTHParams,
        connected,
        createdAt,
        rcvActiveAt,
        sndActiveAt
      }
-- | Create a subscription whose thread state starts as the given
-- 'SubscriptionThread' and whose 'delivered' value starts as 'Nothing'.
newSubscription :: SubscriptionThread -> STM Sub
newSubscription st = do
  delivered <- newTVar Nothing
  subThread <- ServerSub <$> newTVar st
  return Sub {subThread, delivered}
-- | Create a subscription marked as 'ProhibitSub' (no thread state), with
-- 'delivered' starting as 'Nothing'.
newProhibitedSub :: STM Sub
newProhibitedSub = do
  delivered <- newTVar Nothing
  return Sub {subThread = ProhibitSub, delivered}
newEnv :: ServerConfig s -> IO (Env s)
newEnv :: forall s. ServerConfig s -> IO (Env s)
newEnv config :: ServerConfig s
config@ServerConfig {ServerCredentials
$sel:smpCredentials:ServerConfig :: forall s. ServerConfig s -> ServerCredentials
smpCredentials :: ServerCredentials
smpCredentials, Maybe ServerCredentials
$sel:httpCredentials:ServerConfig :: forall s. ServerConfig s -> Maybe ServerCredentials
httpCredentials :: Maybe ServerCredentials
httpCredentials, ServerStoreCfg s
$sel:serverStoreCfg:ServerConfig :: forall s. ServerConfig s -> ServerStoreCfg s
serverStoreCfg :: ServerStoreCfg s
serverStoreCfg, SMPClientAgentConfig
$sel:smpAgentCfg:ServerConfig :: forall s. ServerConfig s -> SMPClientAgentConfig
smpAgentCfg :: SMPClientAgentConfig
smpAgentCfg, Maybe ServerPublicInfo
$sel:information:ServerConfig :: forall s. ServerConfig s -> Maybe ServerPublicInfo
information :: Maybe ServerPublicInfo
information, Maybe ExpirationConfig
$sel:messageExpiration:ServerConfig :: forall s. ServerConfig s -> Maybe ExpirationConfig
messageExpiration :: Maybe ExpirationConfig
messageExpiration, Int64
$sel:idleQueueInterval:ServerConfig :: forall s. ServerConfig s -> Int64
idleQueueInterval :: Int64
idleQueueInterval, Int
$sel:msgQueueQuota:ServerConfig :: forall s. ServerConfig s -> Int
msgQueueQuota :: Int
msgQueueQuota, Int
$sel:maxJournalMsgCount:ServerConfig :: forall s. ServerConfig s -> Int
maxJournalMsgCount :: Int
maxJournalMsgCount, Int
$sel:maxJournalStateLines:ServerConfig :: forall s. ServerConfig s -> Int
maxJournalStateLines :: Int
maxJournalStateLines} = do
TVar AddHTTP
serverActive <- AddHTTP -> IO (TVar AddHTTP)
forall (m :: * -> *) a. MonadIO m => a -> m (TVar a)
newTVarIO AddHTTP
True
Server s
server <- IO (Server s)
forall s. IO (Server s)
newServer
MsgStore s
msgStore_ <- case ServerStoreCfg s
serverStoreCfg of
SSCMemory Maybe StorePaths
storePaths_ -> do
let storePath :: Maybe String
storePath = StorePaths -> Maybe String
storeMsgsFile (StorePaths -> Maybe String) -> Maybe StorePaths -> Maybe String
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Maybe StorePaths
storePaths_
STMMsgStore
ms <- MsgStoreConfig STMMsgStore -> IO STMMsgStore
forall s. MsgStoreClass s => MsgStoreConfig s -> IO s
newMsgStore STMStoreConfig {Maybe String
storePath :: Maybe String
$sel:storePath:STMStoreConfig :: Maybe String
storePath, $sel:quota:STMStoreConfig :: Int
quota = Int
msgQueueQuota}
Maybe StorePaths -> (StorePaths -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ Maybe StorePaths
storePaths_ ((StorePaths -> IO ()) -> IO ()) -> (StorePaths -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \StorePaths {$sel:storeLogFile:StorePaths :: StorePaths -> String
storeLogFile = String
f} -> (EntityId -> QueueRec -> IO STMQueue)
-> String -> STMQueueStore STMQueue -> IO ()
forall q.
StoreQueueClass q =>
(EntityId -> QueueRec -> IO q)
-> String -> STMQueueStore q -> IO ()
loadStoreLog (STMMsgStore
-> AddHTTP -> EntityId -> QueueRec -> IO (StoreQueue STMMsgStore)
forall s.
MsgStoreClass s =>
s -> AddHTTP -> EntityId -> QueueRec -> IO (StoreQueue s)
mkQueue STMMsgStore
ms AddHTTP
True) String
f (STMQueueStore STMQueue -> IO ())
-> STMQueueStore STMQueue -> IO ()
forall a b. (a -> b) -> a -> b
$ STMMsgStore -> QueueStore STMMsgStore
forall s. MsgStoreClass s => s -> QueueStore s
queueStore STMMsgStore
ms
MsgStore STMMsgStore -> IO (MsgStore STMMsgStore)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (MsgStore STMMsgStore -> IO (MsgStore STMMsgStore))
-> MsgStore STMMsgStore -> IO (MsgStore STMMsgStore)
forall a b. (a -> b) -> a -> b
$ STMMsgStore -> MsgStore STMMsgStore
StoreMemory STMMsgStore
ms
SSCMemoryJournal {String
$sel:storeLogFile:SSCMemory :: ServerStoreCfg (JournalMsgStore 'QSMemory) -> String
storeLogFile :: String
storeLogFile, String
$sel:storeMsgsPath:SSCMemory :: ServerStoreCfg (JournalMsgStore 'QSMemory) -> String
storeMsgsPath :: String
storeMsgsPath} -> do
Text -> IO ()
forall (m :: * -> *).
(?callStack::CallStack, MonadIO m) =>
Text -> m ()
logWarn (Text -> IO ()) -> Text -> IO ()
forall a b. (a -> b) -> a -> b
$
Text
"Journal message store is deprecated and will be removed soon.\n"
Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"Please migrate to in-memory storage using `journal export` command.\n"
Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"After that you can migrate to PostgreSQL using `database import` command."
let qsCfg :: QStoreCfg 'QSMemory
qsCfg = QStoreCfg 'QSMemory
MQStoreCfg
cfg :: JournalStoreConfig 'QSMemory
cfg = QStoreCfg 'QSMemory
-> String
-> Int
-> Int
-> Int
-> Int64
-> JournalStoreConfig 'QSMemory
forall (s :: QSType).
QStoreCfg s
-> String -> Int -> Int -> Int -> Int64 -> JournalStoreConfig s
mkJournalStoreConfig QStoreCfg 'QSMemory
qsCfg String
storeMsgsPath Int
msgQueueQuota Int
maxJournalMsgCount Int
maxJournalStateLines Int64
idleQueueInterval
JournalMsgStore 'QSMemory
ms <- MsgStoreConfig (JournalMsgStore 'QSMemory)
-> IO (JournalMsgStore 'QSMemory)
forall s. MsgStoreClass s => MsgStoreConfig s -> IO s
newMsgStore MsgStoreConfig (JournalMsgStore 'QSMemory)
JournalStoreConfig 'QSMemory
cfg
(EntityId -> QueueRec -> IO (JournalQueue 'QSMemory))
-> String -> STMQueueStore (JournalQueue 'QSMemory) -> IO ()
forall q.
StoreQueueClass q =>
(EntityId -> QueueRec -> IO q)
-> String -> STMQueueStore q -> IO ()
loadStoreLog (JournalMsgStore 'QSMemory
-> AddHTTP
-> EntityId
-> QueueRec
-> IO (StoreQueue (JournalMsgStore 'QSMemory))
forall s.
MsgStoreClass s =>
s -> AddHTTP -> EntityId -> QueueRec -> IO (StoreQueue s)
mkQueue JournalMsgStore 'QSMemory
ms AddHTTP
True) String
storeLogFile (STMQueueStore (JournalQueue 'QSMemory) -> IO ())
-> STMQueueStore (JournalQueue 'QSMemory) -> IO ()
forall a b. (a -> b) -> a -> b
$ JournalMsgStore 'QSMemory -> STMQueueStore (JournalQueue 'QSMemory)
stmQueueStore JournalMsgStore 'QSMemory
ms
MsgStore s -> IO (MsgStore s)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (MsgStore s -> IO (MsgStore s)) -> MsgStore s -> IO (MsgStore s)
forall a b. (a -> b) -> a -> b
$ JournalMsgStore 'QSMemory -> MsgStore (JournalMsgStore 'QSMemory)
forall (qs :: QSType).
JournalMsgStore qs -> MsgStore (JournalMsgStore qs)
StoreJournal JournalMsgStore 'QSMemory
ms
#if defined(dbServerPostgres)
SSCDatabaseJournal {storeCfg, storeMsgsPath'} -> do
let StartOptions {compactLog, confirmMigrations} = startOptions config
qsCfg = PQStoreCfg (storeCfg {confirmMigrations} :: PostgresStoreCfg)
cfg = mkJournalStoreConfig qsCfg storeMsgsPath' msgQueueQuota maxJournalMsgCount maxJournalStateLines idleQueueInterval
when compactLog $ compactDbStoreLog $ dbStoreLogPath storeCfg
StoreJournal <$> newMsgStore cfg
SSCDatabase storeCfg -> do
let StartOptions {compactLog, confirmMigrations} = startOptions config
cfg = PostgresMsgStoreCfg storeCfg {confirmMigrations} msgQueueQuota
when compactLog $ compactDbStoreLog $ dbStoreLogPath storeCfg
StoreDatabase <$> newMsgStore cfg
#else
SSCDatabaseJournal {} -> IO (MsgStore s)
forall a. IO a
noPostgresExit
#endif
NtfStore
ntfStore <- TMap EntityId (TVar [MsgNtf]) -> NtfStore
NtfStore (TMap EntityId (TVar [MsgNtf]) -> NtfStore)
-> IO (TMap EntityId (TVar [MsgNtf])) -> IO NtfStore
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO (TMap EntityId (TVar [MsgNtf]))
forall k a. IO (TMap k a)
TM.emptyIO
TVar ChaChaDRG
random <- IO (TVar ChaChaDRG)
C.newRandom
Credential
tlsServerCreds <- String -> ServerCredentials -> IO Credential
getCredentials String
"SMP" ServerCredentials
smpCredentials
Maybe Credential
httpServerCreds <- (ServerCredentials -> IO Credential)
-> Maybe ServerCredentials -> IO (Maybe Credential)
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> Maybe a -> m (Maybe b)
mapM (String -> ServerCredentials -> IO Credential
getCredentials String
"HTTPS") Maybe ServerCredentials
httpCredentials
(Credential -> IO ()) -> Maybe Credential -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ Credential -> IO ()
checkHTTPSCredentials Maybe Credential
httpServerCreds
Fingerprint MsgId
fp <- ServerCredentials -> IO Fingerprint
loadFingerprint ServerCredentials
smpCredentials
let serverIdentity :: KeyHash
serverIdentity = MsgId -> KeyHash
KeyHash MsgId
fp
ServerStats
serverStats <- UTCTime -> IO ServerStats
newServerStats (UTCTime -> IO ServerStats) -> IO UTCTime -> IO ServerStats
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< IO UTCTime
getCurrentTime
TVar [(String, SocketState)]
sockets <- [(String, SocketState)] -> IO (TVar [(String, SocketState)])
forall (m :: * -> *) a. MonadIO m => a -> m (TVar a)
newTVarIO []
TVar Int
clientSeq <- Int -> IO (TVar Int)
forall (m :: * -> *) a. MonadIO m => a -> m (TVar a)
newTVarIO Int
0
ProxyAgent
proxyAgent <- SMPClientAgentConfig -> TVar ChaChaDRG -> IO ProxyAgent
newSMPProxyAgent SMPClientAgentConfig
smpAgentCfg TVar ChaChaDRG
random
Env s -> IO (Env s)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure
Env
{ TVar AddHTTP
$sel:serverActive:Env :: TVar AddHTTP
serverActive :: TVar AddHTTP
serverActive,
ServerConfig s
$sel:config:Env :: ServerConfig s
config :: ServerConfig s
config,
ServerInformation
$sel:serverInfo:Env :: ServerInformation
serverInfo :: ServerInformation
serverInfo,
Server s
$sel:server:Env :: Server s
server :: Server s
server,
KeyHash
$sel:serverIdentity:Env :: KeyHash
serverIdentity :: KeyHash
serverIdentity,
MsgStore s
$sel:msgStore_:Env :: MsgStore s
msgStore_ :: MsgStore s
msgStore_,
NtfStore
$sel:ntfStore:Env :: NtfStore
ntfStore :: NtfStore
ntfStore,
TVar ChaChaDRG
$sel:random:Env :: TVar ChaChaDRG
random :: TVar ChaChaDRG
random,
Credential
$sel:tlsServerCreds:Env :: Credential
tlsServerCreds :: Credential
tlsServerCreds,
Maybe Credential
$sel:httpServerCreds:Env :: Maybe Credential
httpServerCreds :: Maybe Credential
httpServerCreds,
ServerStats
$sel:serverStats:Env :: ServerStats
serverStats :: ServerStats
serverStats,
TVar [(String, SocketState)]
$sel:sockets:Env :: TVar [(String, SocketState)]
sockets :: TVar [(String, SocketState)]
sockets,
TVar Int
$sel:clientSeq:Env :: TVar Int
clientSeq :: TVar Int
clientSeq,
ProxyAgent
$sel:proxyAgent:Env :: ProxyAgent
proxyAgent :: ProxyAgent
proxyAgent
}
where
loadStoreLog :: StoreQueueClass q => (RecipientId -> QueueRec -> IO q) -> FilePath -> STMQueueStore q -> IO ()
loadStoreLog :: forall q.
StoreQueueClass q =>
(EntityId -> QueueRec -> IO q)
-> String -> STMQueueStore q -> IO ()
loadStoreLog EntityId -> QueueRec -> IO q
mkQ String
f STMQueueStore q
st = do
Text -> IO ()
forall (m :: * -> *).
(?callStack::CallStack, MonadIO m) =>
Text -> m ()
logNote (Text -> IO ()) -> Text -> IO ()
forall a b. (a -> b) -> a -> b
$ Text
"restoring queues from file " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> String -> Text
T.pack String
f
StoreLog 'WriteMode
sl <- AddHTTP
-> (EntityId -> QueueRec -> IO q)
-> String
-> STMQueueStore q
-> IO (StoreLog 'WriteMode)
forall q.
StoreQueueClass q =>
AddHTTP
-> (EntityId -> QueueRec -> IO q)
-> String
-> STMQueueStore q
-> IO (StoreLog 'WriteMode)
readWriteQueueStore AddHTTP
False EntityId -> QueueRec -> IO q
mkQ String
f STMQueueStore q
st
STMQueueStore q -> StoreLog 'WriteMode -> IO ()
forall q. STMQueueStore q -> StoreLog 'WriteMode -> IO ()
setStoreLog STMQueueStore q
st StoreLog 'WriteMode
sl
#if defined(dbServerPostgres)
compactDbStoreLog = \case
Just f -> do
logNote $ "compacting queues in file " <> T.pack f
st <- newMsgStore STMStoreConfig {storePath = Nothing, quota = msgQueueQuota}
sl <- readWriteQueueStore False (mkQueue st False) f (queueStore st)
setStoreLog (queueStore st) sl
closeMsgStore st
Nothing -> do
logError "Error: `--compact-log` used without `db_store_log` INI option"
exitFailure
#endif
-- Loads the server credential described by `creds`, but only after checking that
-- every file it names exists. `protocol` is used to label the error output.
-- If any file is missing, the missing paths are printed (followed by a hint about
-- static pages and Let's Encrypt when `protocol` is "HTTPS") and the process exits
-- with a failure code instead of returning.
-- Type shown by the annotated listing: String -> ServerCredentials -> IO Credential.
getCredentials protocol creds =
  filterM (fmap not . doesFileExist) credFiles >>= \case
    [] -> loadServerCredential creds
    missing -> do
      putStrLn $ "----------\nError: no " <> protocol <> " credentials: " <> intercalate ", " missing
      when (protocol == "HTTPS") $
        mapM_ putStrLn ["Server should serve static pages to show connection links in the browser.", letsEncrypt]
      exitFailure
  where
    -- the certificate and private key files are always checked
    requiredFiles = [certificateFile creds, privateKeyFile creds]
    -- the CA certificate file is checked first, and only when one is configured
    credFiles = maybe requiredFiles (: requiredFiles) (caCertificateFile creds)
-- Accepts the credential only when the first certificate of the chain carries an RSA
-- public key whose `RSA.public_size` is at least 512; otherwise prints an error with
-- the Let's Encrypt hint and exits with a failure code. The private key is ignored.
-- NOTE(review): 512 is presumably a size in bytes, matching the "4096-bit" wording of
-- the message - confirm against the RSA library documentation.
-- Type shown by the annotated listing: Credential -> IO ().
checkHTTPSCredentials (X.CertificateChain chain, _) =
  unless (firstKeyIsLargeRSA certs) $ do
    putStrLn ("Error: unsupported HTTPS credentials, required 4096-bit RSA\n" <> letsEncrypt)
    exitFailure
  where
    -- unwrap the signed certificates, keeping the chain order
    certs = [X.signedObject (X.getSigned signed) | signed <- chain]
    -- only the head of the chain is inspected; an empty chain or a non-RSA key is rejected
    firstKeyIsLargeRSA = \case
      X.Certificate {X.certPubKey = X.PubKeyRSA rsaKey} : _ -> RSA.public_size rsaKey >= 512
      _ -> False
-- Hint appended to credential errors: the certbot command line for generating a
-- 4096-bit RSA certificate, followed by the closing "----------" separator line.
letsEncrypt :: String
letsEncrypt =
  "Use Let's Encrypt to generate: certbot certonly --standalone -d yourdomainname --key-type rsa --rsa-key-size 4096\n----------"
-- Public description of this server, assembled from values bound in the enclosing
-- scope (`information`, `config`, `messageExpiration`, `serverStoreCfg`), which is
-- not visible in this part of the file.
-- Type shown by the annotated listing: ServerInformation.
serverInfo = ServerInformation {information, config = publicConfig}
  where
    -- on the right-hand sides below, `config` and `messageExpiration` are the outer
    -- variables, not the record fields being assigned
    publicConfig =
      ServerPublicConfig
        { persistence = persistenceMode,
          messageExpiration = fmap ttl messageExpiration,
          statsEnabled = isJust (logStatsInterval config),
          newQueuesAllowed = allowNewQueues config,
          basicAuthEnabled = isJust (newQueueBasicAuth config)
        }
    -- memory store: no store paths -> memory only; a messages file -> messages;
    -- any other store paths -> queues. Every other store configuration -> messages.
    persistenceMode = case serverStoreCfg of
      SSCMemory Nothing -> SPMMemoryOnly
      SSCMemory (Just StorePaths {storeMsgsFile = Just _}) -> SPMMessages
      SSCMemory _ -> SPMQueues
      _ -> SPMMessages
-- | Print 'noPostgresExitStr' with 'putStrLn' and terminate the process with a
-- failure exit code; never returns a value.
noPostgresExit :: IO a
noPostgresExit = do
  putStrLn noPostgresExitStr
  exitFailure
-- | Two-line message printed by 'noPostgresExit': the binary has no PostgreSQL
-- support, and how to obtain one that does.
noPostgresExitStr :: String
noPostgresExitStr = problem ++ remedy
  where
    problem = "Error: server binary is compiled without support for PostgreSQL database.\n"
    remedy = "Please download `smp-server-postgres` or re-compile with `cabal build -fserver_postgres`."
-- | Build a 'JournalStoreConfig' from the caller-supplied settings.
--
-- Arguments, in order: queue store configuration, store path, message queue quota,
-- maximum journal message count, maximum journal state lines, idle queue interval.
-- The remaining fields are fixed here: @pathParts@ is 'journalMsgStoreDepth',
-- @stateTailSize@ is 'defaultStateTailSize', backups expire after 14 nominal days
-- and at least 2 backups are kept.
mkJournalStoreConfig :: QStoreCfg s -> FilePath -> Int -> Int -> Int -> Int64 -> JournalStoreConfig s
mkJournalStoreConfig qsCfg path msgQuota maxMsgs maxLines idleIvl =
  JournalStoreConfig
    { storePath = path,
      quota = msgQuota,
      pathParts = journalMsgStoreDepth,
      queueStoreCfg = qsCfg,
      maxMsgCount = maxMsgs,
      maxStateLines = maxLines,
      stateTailSize = defaultStateTailSize,
      idleInterval = idleIvl,
      expireBackupsAfter = backupRetention,
      keepMinBackups = minBackupsKept
    }
  where
    backupRetention = 14 * nominalDay
    minBackupsKept = 2
-- | Create a 'ProxyAgent' whose @smpAgent@ is a new SMP client agent, created via
-- 'newSMPClientAgent' with 'SSender', the given agent configuration and the given
-- random generator variable.
newSMPProxyAgent :: SMPClientAgentConfig -> TVar ChaChaDRG -> IO ProxyAgent
newSMPProxyAgent smpAgentCfg random =
  wrap <$> newSMPClientAgent SSender smpAgentCfg random
  where
    wrap agent = ProxyAgent {smpAgent = agent}
-- | Run 'readWriteStoreLog' on the given store log file and STM queue store, using
-- 'readQueueStore' (given the @tty@ flag and the queue constructor) as the reader and
-- 'writeQueueStore' as the writer. Returns the store log opened in write mode.
readWriteQueueStore :: forall q. StoreQueueClass q => Bool -> (RecipientId -> QueueRec -> IO q) -> FilePath -> STMQueueStore q -> IO (StoreLog 'WriteMode)
readWriteQueueStore tty mkQ logPath store =
  readWriteStoreLog loadQueues saveQueues logPath store
  where
    loadQueues = readQueueStore tty mkQ
    -- the type application pins the writer to this function's queue type
    saveQueues = writeQueueStore @q