{-# LANGUAGE CPP #-}
{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StrictData #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeOperators #-}
#if __GLASGOW_HASKELL__ == 810
{-# LANGUAGE UndecidableInstances #-}
#endif
{-# OPTIONS_GHC -fno-warn-ambiguous-fields #-}

module Simplex.Messaging.Server.Env.STM
  ( ServerConfig (..),
    ServerStoreCfg (..),
    -- AServerStoreCfg (..),
    SupportedStore,
    StorePaths (..),
    StartOptions (..),
    Env (..),
    Server (..),
    ServerSubscribers (..),
    SubscribedClients,
    ProxyAgent (..),
    Client (..),
    ClientId,
    ClientSub (..),
    Sub (..),
    ServerSub (..),
    SubscriptionThread (..),
    MsgStoreType,
    MsgStore (..),
    AStoreType (..),
    VerifiedTransmission,
    ResponseAndMessage,
    newEnv,
    mkJournalStoreConfig,
    msgStore,
    fromMsgStore,
    newClient,
    getServerClients,
    getServerClient,
    insertServerClient,
    deleteServerClient,
    getSubscribedClients,
    getSubscribedClient,
    upsertSubscribedClient,
    lookupSubscribedClient,
    lookupDeleteSubscribedClient,
    deleteSubcribedClient,
    sameClientId,
    sameClient,
    newSubscription,
    newProhibitedSub,
    defaultMsgQueueQuota,
    defMsgExpirationDays,
    defNtfExpirationHours,
    defaultMessageExpiration,
    defaultNtfExpiration,
    defaultInactiveClientExpiration,
    defaultProxyClientConcurrency,
    defaultMaxJournalMsgCount,
    defaultMaxJournalStateLines,
    defaultIdleQueueInterval,
    journalMsgStoreDepth,
    readWriteQueueStore,
    noPostgresExitStr,
    noPostgresExit,
    dbStoreCfg,
    storeLogFile',
  )
where

import Control.Concurrent (ThreadId)
import Control.Logger.Simple
import Control.Monad
import qualified Crypto.PubKey.RSA as RSA
import Crypto.Random
import Data.Int (Int64)
import Data.IntMap.Strict (IntMap)
import qualified Data.IntMap.Strict as IM
import Data.IntSet (IntSet)
import qualified Data.IntSet as IS
import Data.Kind (Constraint)
import Data.List (intercalate)
import Data.List.NonEmpty (NonEmpty)
import Data.Map.Strict (Map)
import Data.Maybe (isJust)
import qualified Data.Text as T
import Data.Time.Clock (getCurrentTime, nominalDay)
import Data.Time.Clock.System (SystemTime)
import qualified Data.X509 as X
import Data.X509.Validation (Fingerprint (..))
import GHC.TypeLits (TypeError)
import qualified GHC.TypeLits as TE
import Network.Socket (ServiceName)
import qualified Network.TLS as T
import Numeric.Natural
import Simplex.Messaging.Agent.Lock
import Simplex.Messaging.Agent.Store.Shared (MigrationConfirmation (..))
import Simplex.Messaging.Client.Agent (SMPClientAgent, SMPClientAgentConfig, newSMPClientAgent)
import Simplex.Messaging.Crypto (KeyHash (..))
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Protocol
import Simplex.Messaging.Server.Expiration
import Simplex.Messaging.Server.Information
import Simplex.Messaging.Server.MsgStore.Journal
import Simplex.Messaging.Server.MsgStore.STM
import Simplex.Messaging.Server.MsgStore.Types
import Simplex.Messaging.Server.NtfStore
import Simplex.Messaging.Server.QueueStore
import Simplex.Messaging.Server.QueueStore.Postgres.Config
import Simplex.Messaging.Server.QueueStore.STM (STMQueueStore, setStoreLog)
import Simplex.Messaging.Server.QueueStore.Types
import Simplex.Messaging.Server.Stats
import Simplex.Messaging.Server.StoreLog
import Simplex.Messaging.Server.StoreLog.ReadWrite
import Simplex.Messaging.SystemTime
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Transport (ASrvTransport, SMPVersion, THandleParams, TransportPeer (..), VersionRangeSMP)
import Simplex.Messaging.Transport.Server
import Simplex.Messaging.Util (ifM, whenM, ($>>=))
import System.Directory (doesFileExist)
import System.Exit (exitFailure)
import System.IO (IOMode (..))
import System.Mem.Weak (Weak)
import UnliftIO.STM

#if defined(dbServerPostgres)
import Simplex.Messaging.Server.MsgStore.Postgres
#endif

data ServerConfig s = ServerConfig
  { forall s. ServerConfig s -> [(String, ASrvTransport, AddHTTP)]
transports :: [(ServiceName, ASrvTransport, AddHTTP)],
    forall s. ServerConfig s -> Int
smpHandshakeTimeout :: Int,
    forall s. ServerConfig s -> Natural
tbqSize :: Natural,
    forall s. ServerConfig s -> Int
msgQueueQuota :: Int,
    forall s. ServerConfig s -> Int
maxJournalMsgCount :: Int,
    forall s. ServerConfig s -> Int
maxJournalStateLines :: Int,
    forall s. ServerConfig s -> Int
queueIdBytes :: Int,
    forall s. ServerConfig s -> Int
msgIdBytes :: Int,
    forall s. ServerConfig s -> ServerStoreCfg s
serverStoreCfg :: ServerStoreCfg s,
    forall s. ServerConfig s -> Maybe String
storeNtfsFile :: Maybe FilePath,
    -- | set to False to prohibit creating new queues
    forall s. ServerConfig s -> AddHTTP
allowNewQueues :: Bool,
    -- | simple password that the clients need to pass in handshake to be able to create new queues
    forall s. ServerConfig s -> Maybe BasicAuth
newQueueBasicAuth :: Maybe BasicAuth,
    -- | control port passwords,
    forall s. ServerConfig s -> Maybe BasicAuth
controlPortUserAuth :: Maybe BasicAuth,
    forall s. ServerConfig s -> Maybe BasicAuth
controlPortAdminAuth :: Maybe BasicAuth,
    forall s. ServerConfig s -> Int
dailyBlockQueueQuota :: Int,
    -- | time after which the messages can be removed from the queues and check interval, seconds
    forall s. ServerConfig s -> Maybe ExpirationConfig
messageExpiration :: Maybe ExpirationConfig,
    forall s. ServerConfig s -> AddHTTP
expireMessagesOnStart :: Bool,
    forall s. ServerConfig s -> AddHTTP
expireMessagesOnSend :: Bool,
    -- | interval of inactivity after which journal queue is closed
    forall s. ServerConfig s -> Int64
idleQueueInterval :: Int64,
    -- | notification expiration interval (seconds)
    forall s. ServerConfig s -> ExpirationConfig
notificationExpiration :: ExpirationConfig,
    -- | time after which the socket with inactive client can be disconnected (without any messages or commands, incl. PING),
    -- and check interval, seconds
    forall s. ServerConfig s -> Maybe ExpirationConfig
inactiveClientExpiration :: Maybe ExpirationConfig,
    -- | log SMP server usage statistics, only aggregates are logged, seconds
    forall s. ServerConfig s -> Maybe Int64
logStatsInterval :: Maybe Int64,
    -- | time of the day when the stats are logged first, to log at consistent times,
    -- irrespective of when the server is started (seconds from 00:00 UTC)
    forall s. ServerConfig s -> Int64
logStatsStartTime :: Int64,
    -- | file to log stats
    forall s. ServerConfig s -> String
serverStatsLogFile :: FilePath,
    -- | file to save and restore stats
    forall s. ServerConfig s -> Maybe String
serverStatsBackupFile :: Maybe FilePath,
    -- | interval and file to save prometheus metrics
    forall s. ServerConfig s -> Maybe Int
prometheusInterval :: Maybe Int,
    forall s. ServerConfig s -> String
prometheusMetricsFile :: FilePath,
    -- | notification delivery interval
    forall s. ServerConfig s -> Int
ntfDeliveryInterval :: Int,
    -- | interval between sending pending END events to unsubscribed clients, seconds
    forall s. ServerConfig s -> Int
pendingENDInterval :: Int,
    forall s. ServerConfig s -> ServerCredentials
smpCredentials :: ServerCredentials,
    forall s. ServerConfig s -> Maybe ServerCredentials
httpCredentials :: Maybe ServerCredentials,
    -- | SMP client-server protocol version range
    forall s. ServerConfig s -> VersionRangeSMP
smpServerVRange :: VersionRangeSMP,
    -- | TCP transport config
    forall s. ServerConfig s -> TransportServerConfig
transportConfig :: TransportServerConfig,
    -- | run listener on control port
    forall s. ServerConfig s -> Maybe String
controlPort :: Maybe ServiceName,
    -- | SMP proxy config
    forall s. ServerConfig s -> SMPClientAgentConfig
smpAgentCfg :: SMPClientAgentConfig,
    forall s. ServerConfig s -> AddHTTP
allowSMPProxy :: Bool, -- auth is the same with `newQueueBasicAuth`
    forall s. ServerConfig s -> Int
serverClientConcurrency :: Int,
    -- | server public information
    forall s. ServerConfig s -> Maybe ServerPublicInfo
information :: Maybe ServerPublicInfo,
    forall s. ServerConfig s -> StartOptions
startOptions :: StartOptions
  }

data StartOptions = StartOptions
  { StartOptions -> AddHTTP
maintenance :: Bool,
    StartOptions -> AddHTTP
compactLog :: Bool,
    StartOptions -> LogLevel
logLevel :: LogLevel,
    StartOptions -> AddHTTP
skipWarnings :: Bool,
    StartOptions -> MigrationConfirmation
confirmMigrations :: MigrationConfirmation
  }

defMsgExpirationDays :: Int64
defMsgExpirationDays :: Int64
defMsgExpirationDays = Int64
21

defaultMessageExpiration :: ExpirationConfig
defaultMessageExpiration :: ExpirationConfig
defaultMessageExpiration =
  ExpirationConfig
    { ttl :: Int64
ttl = Int64
defMsgExpirationDays Int64 -> Int64 -> Int64
forall a. Num a => a -> a -> a
* Int64
86400, -- seconds
      checkInterval :: Int64
checkInterval = Int64
7200 -- seconds, 2 hours
    }

defaultIdleQueueInterval :: Int64
defaultIdleQueueInterval :: Int64
defaultIdleQueueInterval = Int64
14400 -- seconds, 4 hours

defNtfExpirationHours :: Int64
defNtfExpirationHours :: Int64
defNtfExpirationHours = Int64
24

defaultNtfExpiration :: ExpirationConfig
defaultNtfExpiration :: ExpirationConfig
defaultNtfExpiration =
  ExpirationConfig
    { ttl :: Int64
ttl = Int64
defNtfExpirationHours Int64 -> Int64 -> Int64
forall a. Num a => a -> a -> a
* Int64
3600, -- seconds
      checkInterval :: Int64
checkInterval = Int64
3600 -- seconds, 1 hour
    }

defaultInactiveClientExpiration :: ExpirationConfig
defaultInactiveClientExpiration :: ExpirationConfig
defaultInactiveClientExpiration =
  ExpirationConfig
    { ttl :: Int64
ttl = Int64
21600, -- seconds, 6 hours
      checkInterval :: Int64
checkInterval = Int64
3600 -- seconds, 1 hours
    }

defaultProxyClientConcurrency :: Int
defaultProxyClientConcurrency :: Int
defaultProxyClientConcurrency = Int
32

journalMsgStoreDepth :: Int
journalMsgStoreDepth :: Int
journalMsgStoreDepth = Int
5

defaultMaxJournalStateLines :: Int
defaultMaxJournalStateLines :: Int
defaultMaxJournalStateLines = Int
16

defaultMaxJournalMsgCount :: Int
defaultMaxJournalMsgCount :: Int
defaultMaxJournalMsgCount = Int
256

defaultMsgQueueQuota :: Int
defaultMsgQueueQuota :: Int
defaultMsgQueueQuota = Int
128

defaultStateTailSize :: Int
defaultStateTailSize :: Int
defaultStateTailSize = Int
512

data Env s = Env
  { forall s. Env s -> ServerConfig s
config :: ServerConfig s,
    forall s. Env s -> TVar AddHTTP
serverActive :: TVar Bool,
    forall s. Env s -> ServerInformation
serverInfo :: ServerInformation,
    forall s. Env s -> Server s
server :: Server s,
    forall s. Env s -> KeyHash
serverIdentity :: KeyHash,
    forall s. Env s -> MsgStore s
msgStore_ :: MsgStore s,
    forall s. Env s -> NtfStore
ntfStore :: NtfStore,
    forall s. Env s -> TVar ChaChaDRG
random :: TVar ChaChaDRG,
    forall s. Env s -> Credential
tlsServerCreds :: T.Credential,
    forall s. Env s -> Maybe Credential
httpServerCreds :: Maybe T.Credential,
    forall s. Env s -> ServerStats
serverStats :: ServerStats,
    forall s. Env s -> TVar [(String, SocketState)]
sockets :: TVar [(ServiceName, SocketState)],
    forall s. Env s -> TVar Int
clientSeq :: TVar ClientId,
    forall s. Env s -> ProxyAgent
proxyAgent :: ProxyAgent -- senders served on this proxy
  }

msgStore :: Env s -> s
msgStore :: forall s. Env s -> s
msgStore = MsgStore s -> s
forall s. MsgStore s -> s
fromMsgStore (MsgStore s -> s) -> (Env s -> MsgStore s) -> Env s -> s
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Env s -> MsgStore s
forall s. Env s -> MsgStore s
msgStore_
{-# INLINE msgStore #-}

fromMsgStore :: MsgStore s -> s
fromMsgStore :: forall s. MsgStore s -> s
fromMsgStore = \case
  StoreMemory STMMsgStore
s -> s
STMMsgStore
s
  StoreJournal JournalMsgStore qs
s -> s
JournalMsgStore qs
s
#if defined(dbServerPostgres)
  StoreDatabase s -> s
#endif
{-# INLINE fromMsgStore #-}

type family SupportedStore (qs :: QSType) (ms :: MSType) :: Constraint where
  SupportedStore 'QSMemory 'MSMemory = ()
  SupportedStore 'QSMemory 'MSJournal = ()
  SupportedStore 'QSMemory 'MSPostgres =
    (Int ~ Bool, TypeError ('TE.Text "Storing messages in Postgres DB with queues in memory is not supported"))
  SupportedStore 'QSPostgres 'MSMemory =
    (Int ~ Bool, TypeError ('TE.Text "Storing messages in memory with queues in Postgres DB is not supported"))
  SupportedStore 'QSPostgres 'MSJournal = ()
#if defined(dbServerPostgres)
  SupportedStore 'QSPostgres 'MSPostgres = ()
#else
  SupportedStore 'QSPostgres 'MSPostgres =
    (Int ~ Bool, TypeError ('TE.Text "Server compiled without server_postgres flag"))
#endif

data AStoreType =
  forall qs ms. (SupportedStore qs ms, MsgStoreClass (MsgStoreType qs ms)) =>
  ASType (SQSType qs) (SMSType ms)

data ServerStoreCfg s where
  SSCMemory :: Maybe StorePaths -> ServerStoreCfg STMMsgStore
  SSCMemoryJournal :: {ServerStoreCfg (JournalMsgStore 'QSMemory) -> String
storeLogFile :: FilePath, ServerStoreCfg (JournalMsgStore 'QSMemory) -> String
storeMsgsPath :: FilePath} -> ServerStoreCfg (JournalMsgStore 'QSMemory)
  SSCDatabaseJournal :: {ServerStoreCfg (JournalMsgStore 'QSPostgres) -> PostgresStoreCfg
storeCfg :: PostgresStoreCfg, ServerStoreCfg (JournalMsgStore 'QSPostgres) -> String
storeMsgsPath' :: FilePath} -> ServerStoreCfg (JournalMsgStore 'QSPostgres)
#if defined(dbServerPostgres)
  SSCDatabase :: PostgresStoreCfg -> ServerStoreCfg PostgresMsgStore
#endif

dbStoreCfg :: ServerStoreCfg s -> Maybe PostgresStoreCfg
dbStoreCfg :: forall s. ServerStoreCfg s -> Maybe PostgresStoreCfg
dbStoreCfg = \case
  SSCMemory Maybe StorePaths
_ -> Maybe PostgresStoreCfg
forall a. Maybe a
Nothing
  SSCMemoryJournal {} -> Maybe PostgresStoreCfg
forall a. Maybe a
Nothing
  SSCDatabaseJournal {PostgresStoreCfg
$sel:storeCfg:SSCMemory :: ServerStoreCfg (JournalMsgStore 'QSPostgres) -> PostgresStoreCfg
storeCfg :: PostgresStoreCfg
storeCfg} -> PostgresStoreCfg -> Maybe PostgresStoreCfg
forall a. a -> Maybe a
Just PostgresStoreCfg
storeCfg
#if defined(dbServerPostgres)
  SSCDatabase cfg -> Just cfg
#endif

storeLogFile' :: ServerStoreCfg s -> Maybe FilePath
storeLogFile' :: forall s. ServerStoreCfg s -> Maybe String
storeLogFile' = \case
  SSCMemory Maybe StorePaths
sp_ -> (\StorePaths {String
storeLogFile :: String
$sel:storeLogFile:StorePaths :: StorePaths -> String
storeLogFile} -> String
storeLogFile) (StorePaths -> String) -> Maybe StorePaths -> Maybe String
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe StorePaths
sp_
  SSCMemoryJournal {String
$sel:storeLogFile:SSCMemory :: ServerStoreCfg (JournalMsgStore 'QSMemory) -> String
storeLogFile :: String
storeLogFile} -> String -> Maybe String
forall a. a -> Maybe a
Just String
storeLogFile
  SSCDatabaseJournal {$sel:storeCfg:SSCMemory :: ServerStoreCfg (JournalMsgStore 'QSPostgres) -> PostgresStoreCfg
storeCfg = PostgresStoreCfg {Maybe String
dbStoreLogPath :: Maybe String
dbStoreLogPath :: PostgresStoreCfg -> Maybe String
dbStoreLogPath}} -> Maybe String
dbStoreLogPath
#if defined(dbServerPostgres)
  SSCDatabase (PostgresStoreCfg {dbStoreLogPath}) -> dbStoreLogPath
#endif

data StorePaths = StorePaths {StorePaths -> String
storeLogFile :: FilePath, StorePaths -> Maybe String
storeMsgsFile :: Maybe FilePath}

type family MsgStoreType (qs :: QSType) (ms :: MSType) where
  MsgStoreType 'QSMemory 'MSMemory = STMMsgStore
  MsgStoreType qs 'MSJournal = JournalMsgStore qs
#if defined(dbServerPostgres)
  MsgStoreType 'QSPostgres 'MSPostgres = PostgresMsgStore
#endif

data MsgStore s where
  StoreMemory :: STMMsgStore -> MsgStore STMMsgStore
  StoreJournal :: JournalMsgStore qs -> MsgStore (JournalMsgStore qs)
#if defined(dbServerPostgres)
  StoreDatabase :: PostgresMsgStore -> MsgStore PostgresMsgStore
#endif

data Server s = Server
  { forall s. Server s -> ServerClients s
clients :: ServerClients s,
    forall s. Server s -> ServerSubscribers s
subscribers :: ServerSubscribers s,
    forall s. Server s -> ServerSubscribers s
ntfSubscribers :: ServerSubscribers s,
    forall s. Server s -> Lock
savingLock :: Lock
  }

-- not exported, to prevent concurrent IntMap lookups inside STM transactions.
newtype ServerClients s = ServerClients {forall s. ServerClients s -> TVar (IntMap (Client s))
serverClients :: TVar (IntMap (Client s))}

data ServerSubscribers s = ServerSubscribers
  { forall s. ServerSubscribers s -> TQueue (ClientSub, Int)
subQ :: TQueue (ClientSub, ClientId),
    forall s. ServerSubscribers s -> SubscribedClients s
queueSubscribers :: SubscribedClients s,
    forall s. ServerSubscribers s -> SubscribedClients s
serviceSubscribers :: SubscribedClients s, -- service clients with long-term certificates that have subscriptions
    forall s. ServerSubscribers s -> TVar Int64
totalServiceSubs :: TVar Int64,
    forall s. ServerSubscribers s -> TVar IntSet
subClients :: TVar IntSet, -- clients with individual or service subscriptions
    forall s.
ServerSubscribers s
-> TVar (IntMap (NonEmpty (EntityId, BrokerMsg)))
pendingEvents :: TVar (IntMap (NonEmpty (EntityId, BrokerMsg)))
  }

-- not exported, to prevent accidental concurrent Map lookups inside STM transactions.
-- Map stores TVars with pointers to the clients rather than client ID to allow reading the same TVar
-- inside transactions to ensure that transaction is re-evaluated in case subscriber changes.
-- Storing Maybe allows to have continuity of subscription when the same user client disconnects and re-connects -
-- any STM transaction that reads subscribed client will re-evaluate in this case.
-- The subscriptions that were made at any point are not removed -
-- this is a better trade-off with intermittently connected mobile clients.
data SubscribedClients s = SubscribedClients (TMap EntityId (TVar (Maybe (Client s))))

getSubscribedClients :: SubscribedClients s -> IO (Map EntityId (TVar (Maybe (Client s))))
getSubscribedClients :: forall s.
SubscribedClients s -> IO (Map EntityId (TVar (Maybe (Client s))))
getSubscribedClients (SubscribedClients TMap EntityId (TVar (Maybe (Client s)))
cs) = TMap EntityId (TVar (Maybe (Client s)))
-> IO (Map EntityId (TVar (Maybe (Client s))))
forall (m :: * -> *) a. MonadIO m => TVar a -> m a
readTVarIO TMap EntityId (TVar (Maybe (Client s)))
cs

getSubscribedClient :: EntityId -> SubscribedClients s -> IO (Maybe (TVar (Maybe (Client s))))
getSubscribedClient :: forall s.
EntityId
-> SubscribedClients s -> IO (Maybe (TVar (Maybe (Client s))))
getSubscribedClient EntityId
entId (SubscribedClients TMap EntityId (TVar (Maybe (Client s)))
cs) = EntityId
-> TMap EntityId (TVar (Maybe (Client s)))
-> IO (Maybe (TVar (Maybe (Client s))))
forall k a. Ord k => k -> TMap k a -> IO (Maybe a)
TM.lookupIO EntityId
entId TMap EntityId (TVar (Maybe (Client s)))
cs
{-# INLINE getSubscribedClient #-}

-- insert subscribed and current client, return previously subscribed client if it is different
upsertSubscribedClient :: EntityId -> Client s -> SubscribedClients s -> STM (Maybe (Client s))
upsertSubscribedClient :: forall s.
EntityId
-> Client s -> SubscribedClients s -> STM (Maybe (Client s))
upsertSubscribedClient EntityId
entId Client s
c (SubscribedClients TMap EntityId (TVar (Maybe (Client s)))
cs) =
  EntityId
-> TMap EntityId (TVar (Maybe (Client s)))
-> STM (Maybe (TVar (Maybe (Client s))))
forall k a. Ord k => k -> TMap k a -> STM (Maybe a)
TM.lookup EntityId
entId TMap EntityId (TVar (Maybe (Client s)))
cs STM (Maybe (TVar (Maybe (Client s))))
-> (Maybe (TVar (Maybe (Client s))) -> STM (Maybe (Client s)))
-> STM (Maybe (Client s))
forall a b. STM a -> (a -> STM b) -> STM b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
    Maybe (TVar (Maybe (Client s)))
Nothing -> Maybe (Client s)
forall a. Maybe a
Nothing Maybe (Client s) -> STM () -> STM (Maybe (Client s))
forall a b. a -> STM b -> STM a
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ EntityId
-> STM (TVar (Maybe (Client s)))
-> TMap EntityId (TVar (Maybe (Client s)))
-> STM ()
forall k a. Ord k => k -> STM a -> TMap k a -> STM ()
TM.insertM EntityId
entId (Maybe (Client s) -> STM (TVar (Maybe (Client s)))
forall a. a -> STM (TVar a)
newTVar (Client s -> Maybe (Client s)
forall a. a -> Maybe a
Just Client s
c)) TMap EntityId (TVar (Maybe (Client s)))
cs
    Just TVar (Maybe (Client s))
cv ->
      TVar (Maybe (Client s)) -> STM (Maybe (Client s))
forall a. TVar a -> STM a
readTVar TVar (Maybe (Client s))
cv STM (Maybe (Client s))
-> (Maybe (Client s) -> STM (Maybe (Client s)))
-> STM (Maybe (Client s))
forall a b. STM a -> (a -> STM b) -> STM b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
        Just Client s
c' | Client s -> Client s -> AddHTTP
forall s. Client s -> Client s -> AddHTTP
sameClientId Client s
c Client s
c' -> Maybe (Client s) -> STM (Maybe (Client s))
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe (Client s)
forall a. Maybe a
Nothing
        Maybe (Client s)
c_ -> Maybe (Client s)
c_ Maybe (Client s) -> STM () -> STM (Maybe (Client s))
forall a b. a -> STM b -> STM a
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ TVar (Maybe (Client s)) -> Maybe (Client s) -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (Maybe (Client s))
cv (Client s -> Maybe (Client s)
forall a. a -> Maybe a
Just Client s
c)

lookupSubscribedClient :: EntityId -> SubscribedClients s -> STM (Maybe (Client s))
lookupSubscribedClient :: forall s. EntityId -> SubscribedClients s -> STM (Maybe (Client s))
lookupSubscribedClient EntityId
entId (SubscribedClients TMap EntityId (TVar (Maybe (Client s)))
cs) = EntityId
-> TMap EntityId (TVar (Maybe (Client s)))
-> STM (Maybe (TVar (Maybe (Client s))))
forall k a. Ord k => k -> TMap k a -> STM (Maybe a)
TM.lookup EntityId
entId TMap EntityId (TVar (Maybe (Client s)))
cs STM (Maybe (TVar (Maybe (Client s))))
-> (TVar (Maybe (Client s)) -> STM (Maybe (Client s)))
-> STM (Maybe (Client s))
forall (m :: * -> *) (f :: * -> *) a b.
(Monad m, Monad f, Traversable f) =>
m (f a) -> (a -> m (f b)) -> m (f b)
$>>= TVar (Maybe (Client s)) -> STM (Maybe (Client s))
forall a. TVar a -> STM a
readTVar
{-# INLINE lookupSubscribedClient #-}

-- lookup and delete currently subscribed client
lookupDeleteSubscribedClient :: EntityId -> SubscribedClients s -> STM (Maybe (Client s))
lookupDeleteSubscribedClient :: forall s. EntityId -> SubscribedClients s -> STM (Maybe (Client s))
lookupDeleteSubscribedClient EntityId
entId (SubscribedClients TMap EntityId (TVar (Maybe (Client s)))
cs) =
  EntityId
-> TMap EntityId (TVar (Maybe (Client s)))
-> STM (Maybe (TVar (Maybe (Client s))))
forall k a. Ord k => k -> TMap k a -> STM (Maybe a)
TM.lookupDelete EntityId
entId TMap EntityId (TVar (Maybe (Client s)))
cs STM (Maybe (TVar (Maybe (Client s))))
-> (TVar (Maybe (Client s)) -> STM (Maybe (Client s)))
-> STM (Maybe (Client s))
forall (m :: * -> *) (f :: * -> *) a b.
(Monad m, Monad f, Traversable f) =>
m (f a) -> (a -> m (f b)) -> m (f b)
$>>= (TVar (Maybe (Client s))
-> Maybe (Client s) -> STM (Maybe (Client s))
forall a. TVar a -> a -> STM a
`swapTVar` Maybe (Client s)
forall a. Maybe a
Nothing)
{-# INLINE lookupDeleteSubscribedClient #-}

deleteSubcribedClient :: EntityId -> Client s -> SubscribedClients s -> IO ()
deleteSubcribedClient :: forall s. EntityId -> Client s -> SubscribedClients s -> IO ()
deleteSubcribedClient EntityId
entId Client s
c (SubscribedClients TMap EntityId (TVar (Maybe (Client s)))
cs) =
  -- lookup of the subscribed client TVar can be in separate transaction,
  -- as long as the client is read in the same transaction -
  -- it prevents removing the next subscribed client and also avoids STM contention for the Map.
  EntityId
-> TMap EntityId (TVar (Maybe (Client s)))
-> IO (Maybe (TVar (Maybe (Client s))))
forall k a. Ord k => k -> TMap k a -> IO (Maybe a)
TM.lookupIO EntityId
entId TMap EntityId (TVar (Maybe (Client s)))
cs IO (Maybe (TVar (Maybe (Client s))))
-> (Maybe (TVar (Maybe (Client s))) -> IO ()) -> IO ()
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (TVar (Maybe (Client s)) -> IO ())
-> Maybe (TVar (Maybe (Client s))) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (\TVar (Maybe (Client s))
cv -> STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ STM AddHTTP -> STM () -> STM ()
forall (m :: * -> *). Monad m => m AddHTTP -> m () -> m ()
whenM (Client s -> TVar (Maybe (Client s)) -> STM AddHTTP
forall s. Client s -> TVar (Maybe (Client s)) -> STM AddHTTP
sameClient Client s
c TVar (Maybe (Client s))
cv) (STM () -> STM ()) -> STM () -> STM ()
forall a b. (a -> b) -> a -> b
$ TVar (Maybe (Client s)) -> STM ()
delete TVar (Maybe (Client s))
cv)
  where
    delete :: TVar (Maybe (Client s)) -> STM ()
delete TVar (Maybe (Client s))
cv = do
      TVar (Maybe (Client s)) -> Maybe (Client s) -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (Maybe (Client s))
cv Maybe (Client s)
forall a. Maybe a
Nothing
      EntityId -> TMap EntityId (TVar (Maybe (Client s))) -> STM ()
forall k a. Ord k => k -> TMap k a -> STM ()
TM.delete EntityId
entId TMap EntityId (TVar (Maybe (Client s)))
cs

sameClientId :: Client s -> (Client s) -> Bool
sameClientId :: forall s. Client s -> Client s -> AddHTTP
sameClientId Client s
c Client s
c' = Client s -> Int
forall s. Client s -> Int
clientId Client s
c Int -> Int -> AddHTTP
forall a. Eq a => a -> a -> AddHTTP
== Client s -> Int
forall s. Client s -> Int
clientId Client s
c'
{-# INLINE sameClientId #-}

sameClient :: Client s -> TVar (Maybe (Client s)) -> STM Bool
sameClient :: forall s. Client s -> TVar (Maybe (Client s)) -> STM AddHTTP
sameClient Client s
c TVar (Maybe (Client s))
cv = AddHTTP -> (Client s -> AddHTTP) -> Maybe (Client s) -> AddHTTP
forall b a. b -> (a -> b) -> Maybe a -> b
maybe AddHTTP
False (Client s -> Client s -> AddHTTP
forall s. Client s -> Client s -> AddHTTP
sameClientId Client s
c) (Maybe (Client s) -> AddHTTP)
-> STM (Maybe (Client s)) -> STM AddHTTP
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TVar (Maybe (Client s)) -> STM (Maybe (Client s))
forall a. TVar a -> STM a
readTVar TVar (Maybe (Client s))
cv
{-# INLINE sameClient #-}

data ClientSub
  = CSClient QueueId (Maybe ServiceId) (Maybe ServiceId) -- includes previous and new associated service IDs
  | CSDeleted QueueId (Maybe ServiceId) -- includes previously associated service IDs
  | CSService ServiceId Int64 -- only send END to idividual client subs on message delivery, not of SSUB/NSSUB

newtype ProxyAgent = ProxyAgent
  { ProxyAgent -> SMPClientAgent 'Sender
smpAgent :: SMPClientAgent 'Sender
  }

type ClientId = Int

data Client s = Client
  { forall s. Client s -> Int
clientId :: ClientId,
    forall s. Client s -> TMap EntityId Sub
subscriptions :: TMap RecipientId Sub,
    forall s. Client s -> TMap EntityId ()
ntfSubscriptions :: TMap NotifierId (),
    forall s. Client s -> TVar AddHTTP
serviceSubscribed :: TVar Bool, -- set independently of serviceSubsCount, to track whether service subscription command was received
    forall s. Client s -> TVar AddHTTP
ntfServiceSubscribed :: TVar Bool,
    forall s. Client s -> TVar Int64
serviceSubsCount :: TVar Int64, -- only one service can be subscribed, based on its certificate, this is subscription count
    forall s. Client s -> TVar Int64
ntfServiceSubsCount :: TVar Int64, -- only one service can be subscribed, based on its certificate, this is subscription count
    forall s. Client s -> TBQueue (NonEmpty (VerifiedTransmission s))
rcvQ :: TBQueue (NonEmpty (VerifiedTransmission s)),
    forall s.
Client s
-> TBQueue
     (NonEmpty (Transmission BrokerMsg), [Transmission BrokerMsg])
sndQ :: TBQueue (NonEmpty (Transmission BrokerMsg), [Transmission BrokerMsg]),
    forall s. Client s -> TBQueue (NonEmpty (Transmission BrokerMsg))
msgQ :: TBQueue (NonEmpty (Transmission BrokerMsg)),
    forall s. Client s -> TVar Int
procThreads :: TVar Int,
    forall s. Client s -> TVar (IntMap (Weak ThreadId))
endThreads :: TVar (IntMap (Weak ThreadId)),
    forall s. Client s -> TVar Int
endThreadSeq :: TVar Int,
    forall s. Client s -> THandleParams SMPVersion 'TServer
clientTHParams :: THandleParams SMPVersion 'TServer,
    forall s. Client s -> TVar AddHTTP
connected :: TVar Bool,
    forall s. Client s -> SystemTime
createdAt :: SystemTime,
    forall s. Client s -> TVar SystemTime
rcvActiveAt :: TVar SystemTime,
    forall s. Client s -> TVar SystemTime
sndActiveAt :: TVar SystemTime
  }

type VerifiedTransmission s = (Maybe (StoreQueue s, QueueRec), Transmission Cmd)

type ResponseAndMessage = (Transmission BrokerMsg, Maybe (Transmission BrokerMsg))

data ServerSub = ServerSub (TVar SubscriptionThread) | ProhibitSub

data SubscriptionThread = NoSub | SubPending | SubThread (Weak ThreadId)

data Sub = Sub
  { Sub -> ServerSub
subThread :: ServerSub, -- Nothing value indicates that sub
    Sub -> TVar (Maybe (MsgId, SystemSeconds))
delivered :: TVar (Maybe (MsgId, SystemSeconds))
  }

newServer :: IO (Server s)
newServer :: forall s. IO (Server s)
newServer = do
  ServerClients s
clients <- TVar (IntMap (Client s)) -> ServerClients s
forall s. TVar (IntMap (Client s)) -> ServerClients s
ServerClients (TVar (IntMap (Client s)) -> ServerClients s)
-> IO (TVar (IntMap (Client s))) -> IO (ServerClients s)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IntMap (Client s) -> IO (TVar (IntMap (Client s)))
forall (m :: * -> *) a. MonadIO m => a -> m (TVar a)
newTVarIO IntMap (Client s)
forall a. Monoid a => a
mempty
  ServerSubscribers s
subscribers <- IO (ServerSubscribers s)
forall s. IO (ServerSubscribers s)
newServerSubscribers
  ServerSubscribers s
ntfSubscribers <- IO (ServerSubscribers s)
forall s. IO (ServerSubscribers s)
newServerSubscribers
  Lock
savingLock <- IO Lock
createLockIO
  Server s -> IO (Server s)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Server {ServerClients s
$sel:clients:Server :: ServerClients s
clients :: ServerClients s
clients, ServerSubscribers s
$sel:subscribers:Server :: ServerSubscribers s
subscribers :: ServerSubscribers s
subscribers, ServerSubscribers s
$sel:ntfSubscribers:Server :: ServerSubscribers s
ntfSubscribers :: ServerSubscribers s
ntfSubscribers, Lock
$sel:savingLock:Server :: Lock
savingLock :: Lock
savingLock}

getServerClients :: Server s -> IO (IntMap (Client s))
getServerClients :: forall s. Server s -> IO (IntMap (Client s))
getServerClients = TVar (IntMap (Client s)) -> IO (IntMap (Client s))
forall (m :: * -> *) a. MonadIO m => TVar a -> m a
readTVarIO (TVar (IntMap (Client s)) -> IO (IntMap (Client s)))
-> (Server s -> TVar (IntMap (Client s)))
-> Server s
-> IO (IntMap (Client s))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ServerClients s -> TVar (IntMap (Client s))
forall s. ServerClients s -> TVar (IntMap (Client s))
serverClients (ServerClients s -> TVar (IntMap (Client s)))
-> (Server s -> ServerClients s)
-> Server s
-> TVar (IntMap (Client s))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Server s -> ServerClients s
forall s. Server s -> ServerClients s
clients
{-# INLINE getServerClients #-}

getServerClient :: ClientId -> Server s -> IO (Maybe (Client s))
getServerClient :: forall s. Int -> Server s -> IO (Maybe (Client s))
getServerClient Int
cId Server s
s = Int -> IntMap (Client s) -> Maybe (Client s)
forall a. Int -> IntMap a -> Maybe a
IM.lookup Int
cId (IntMap (Client s) -> Maybe (Client s))
-> IO (IntMap (Client s)) -> IO (Maybe (Client s))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Server s -> IO (IntMap (Client s))
forall s. Server s -> IO (IntMap (Client s))
getServerClients Server s
s
{-# INLINE getServerClient #-}

insertServerClient :: Client s -> Server s -> IO Bool
insertServerClient :: forall s. Client s -> Server s -> IO AddHTTP
insertServerClient c :: Client s
c@Client {Int
$sel:clientId:Client :: forall s. Client s -> Int
clientId :: Int
clientId, TVar AddHTTP
$sel:connected:Client :: forall s. Client s -> TVar AddHTTP
connected :: TVar AddHTTP
connected} Server {ServerClients s
$sel:clients:Server :: forall s. Server s -> ServerClients s
clients :: ServerClients s
clients} =
  STM AddHTTP -> IO AddHTTP
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM AddHTTP -> IO AddHTTP) -> STM AddHTTP -> IO AddHTTP
forall a b. (a -> b) -> a -> b
$
    STM AddHTTP -> STM AddHTTP -> STM AddHTTP -> STM AddHTTP
forall (m :: * -> *) a. Monad m => m AddHTTP -> m a -> m a -> m a
ifM
      (TVar AddHTTP -> STM AddHTTP
forall a. TVar a -> STM a
readTVar TVar AddHTTP
connected)
      (AddHTTP
True AddHTTP -> STM () -> STM AddHTTP
forall a b. a -> STM b -> STM a
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ TVar (IntMap (Client s))
-> (IntMap (Client s) -> IntMap (Client s)) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' (ServerClients s -> TVar (IntMap (Client s))
forall s. ServerClients s -> TVar (IntMap (Client s))
serverClients ServerClients s
clients) (Int -> Client s -> IntMap (Client s) -> IntMap (Client s)
forall a. Int -> a -> IntMap a -> IntMap a
IM.insert Int
clientId Client s
c))
      (AddHTTP -> STM AddHTTP
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure AddHTTP
False)
{-# INLINE insertServerClient #-}

deleteServerClient :: ClientId -> Server s -> IO ()
deleteServerClient :: forall s. Int -> Server s -> IO ()
deleteServerClient Int
cId Server {ServerClients s
$sel:clients:Server :: forall s. Server s -> ServerClients s
clients :: ServerClients s
clients} = STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar (IntMap (Client s))
-> (IntMap (Client s) -> IntMap (Client s)) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' (ServerClients s -> TVar (IntMap (Client s))
forall s. ServerClients s -> TVar (IntMap (Client s))
serverClients ServerClients s
clients) ((IntMap (Client s) -> IntMap (Client s)) -> STM ())
-> (IntMap (Client s) -> IntMap (Client s)) -> STM ()
forall a b. (a -> b) -> a -> b
$ Int -> IntMap (Client s) -> IntMap (Client s)
forall a. Int -> IntMap a -> IntMap a
IM.delete Int
cId
{-# INLINE deleteServerClient #-}

newServerSubscribers :: IO (ServerSubscribers s)
newServerSubscribers :: forall s. IO (ServerSubscribers s)
newServerSubscribers = do
  TQueue (ClientSub, Int)
subQ <- IO (TQueue (ClientSub, Int))
forall (m :: * -> *) a. MonadIO m => m (TQueue a)
newTQueueIO
  SubscribedClients s
queueSubscribers <- TMap EntityId (TVar (Maybe (Client s))) -> SubscribedClients s
forall s.
TMap EntityId (TVar (Maybe (Client s))) -> SubscribedClients s
SubscribedClients (TMap EntityId (TVar (Maybe (Client s))) -> SubscribedClients s)
-> IO (TMap EntityId (TVar (Maybe (Client s))))
-> IO (SubscribedClients s)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO (TMap EntityId (TVar (Maybe (Client s))))
forall k a. IO (TMap k a)
TM.emptyIO
  SubscribedClients s
serviceSubscribers <- TMap EntityId (TVar (Maybe (Client s))) -> SubscribedClients s
forall s.
TMap EntityId (TVar (Maybe (Client s))) -> SubscribedClients s
SubscribedClients (TMap EntityId (TVar (Maybe (Client s))) -> SubscribedClients s)
-> IO (TMap EntityId (TVar (Maybe (Client s))))
-> IO (SubscribedClients s)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO (TMap EntityId (TVar (Maybe (Client s))))
forall k a. IO (TMap k a)
TM.emptyIO
  TVar Int64
totalServiceSubs <- Int64 -> IO (TVar Int64)
forall (m :: * -> *) a. MonadIO m => a -> m (TVar a)
newTVarIO Int64
0
  TVar IntSet
subClients <- IntSet -> IO (TVar IntSet)
forall (m :: * -> *) a. MonadIO m => a -> m (TVar a)
newTVarIO IntSet
IS.empty
  TVar (IntMap (NonEmpty (EntityId, BrokerMsg)))
pendingEvents <- IntMap (NonEmpty (EntityId, BrokerMsg))
-> IO (TVar (IntMap (NonEmpty (EntityId, BrokerMsg))))
forall (m :: * -> *) a. MonadIO m => a -> m (TVar a)
newTVarIO IntMap (NonEmpty (EntityId, BrokerMsg))
forall a. IntMap a
IM.empty
  ServerSubscribers s -> IO (ServerSubscribers s)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ServerSubscribers {TQueue (ClientSub, Int)
$sel:subQ:ServerSubscribers :: TQueue (ClientSub, Int)
subQ :: TQueue (ClientSub, Int)
subQ, SubscribedClients s
$sel:queueSubscribers:ServerSubscribers :: SubscribedClients s
queueSubscribers :: SubscribedClients s
queueSubscribers, SubscribedClients s
$sel:serviceSubscribers:ServerSubscribers :: SubscribedClients s
serviceSubscribers :: SubscribedClients s
serviceSubscribers, TVar Int64
$sel:totalServiceSubs:ServerSubscribers :: TVar Int64
totalServiceSubs :: TVar Int64
totalServiceSubs, TVar IntSet
$sel:subClients:ServerSubscribers :: TVar IntSet
subClients :: TVar IntSet
subClients, TVar (IntMap (NonEmpty (EntityId, BrokerMsg)))
$sel:pendingEvents:ServerSubscribers :: TVar (IntMap (NonEmpty (EntityId, BrokerMsg)))
pendingEvents :: TVar (IntMap (NonEmpty (EntityId, BrokerMsg)))
pendingEvents}

newClient :: ClientId -> Natural -> THandleParams SMPVersion 'TServer -> SystemTime -> IO (Client s)
newClient :: forall s.
Int
-> Natural
-> THandleParams SMPVersion 'TServer
-> SystemTime
-> IO (Client s)
newClient Int
clientId Natural
qSize THandleParams SMPVersion 'TServer
clientTHParams SystemTime
createdAt = do
  TMap EntityId Sub
subscriptions <- IO (TMap EntityId Sub)
forall k a. IO (TMap k a)
TM.emptyIO
  TMap EntityId ()
ntfSubscriptions <- IO (TMap EntityId ())
forall k a. IO (TMap k a)
TM.emptyIO
  TVar AddHTTP
serviceSubscribed <- AddHTTP -> IO (TVar AddHTTP)
forall (m :: * -> *) a. MonadIO m => a -> m (TVar a)
newTVarIO AddHTTP
False
  TVar AddHTTP
ntfServiceSubscribed <- AddHTTP -> IO (TVar AddHTTP)
forall (m :: * -> *) a. MonadIO m => a -> m (TVar a)
newTVarIO AddHTTP
False
  TVar Int64
serviceSubsCount <- Int64 -> IO (TVar Int64)
forall (m :: * -> *) a. MonadIO m => a -> m (TVar a)
newTVarIO Int64
0
  TVar Int64
ntfServiceSubsCount <- Int64 -> IO (TVar Int64)
forall (m :: * -> *) a. MonadIO m => a -> m (TVar a)
newTVarIO Int64
0
  TBQueue
  (NonEmpty (Maybe (StoreQueue s, QueueRec), Transmission Cmd))
rcvQ <- Natural
-> IO
     (TBQueue
        (NonEmpty (Maybe (StoreQueue s, QueueRec), Transmission Cmd)))
forall (m :: * -> *) a. MonadIO m => Natural -> m (TBQueue a)
newTBQueueIO Natural
qSize
  TBQueue
  (NonEmpty (Transmission BrokerMsg), [Transmission BrokerMsg])
sndQ <- Natural
-> IO
     (TBQueue
        (NonEmpty (Transmission BrokerMsg), [Transmission BrokerMsg]))
forall (m :: * -> *) a. MonadIO m => Natural -> m (TBQueue a)
newTBQueueIO Natural
qSize
  TBQueue (NonEmpty (Transmission BrokerMsg))
msgQ <- Natural -> IO (TBQueue (NonEmpty (Transmission BrokerMsg)))
forall (m :: * -> *) a. MonadIO m => Natural -> m (TBQueue a)
newTBQueueIO Natural
qSize
  TVar Int
procThreads <- Int -> IO (TVar Int)
forall (m :: * -> *) a. MonadIO m => a -> m (TVar a)
newTVarIO Int
0
  TVar (IntMap (Weak ThreadId))
endThreads <- IntMap (Weak ThreadId) -> IO (TVar (IntMap (Weak ThreadId)))
forall (m :: * -> *) a. MonadIO m => a -> m (TVar a)
newTVarIO IntMap (Weak ThreadId)
forall a. IntMap a
IM.empty
  TVar Int
endThreadSeq <- Int -> IO (TVar Int)
forall (m :: * -> *) a. MonadIO m => a -> m (TVar a)
newTVarIO Int
0
  TVar AddHTTP
connected <- AddHTTP -> IO (TVar AddHTTP)
forall (m :: * -> *) a. MonadIO m => a -> m (TVar a)
newTVarIO AddHTTP
True
  TVar SystemTime
rcvActiveAt <- SystemTime -> IO (TVar SystemTime)
forall (m :: * -> *) a. MonadIO m => a -> m (TVar a)
newTVarIO SystemTime
createdAt
  TVar SystemTime
sndActiveAt <- SystemTime -> IO (TVar SystemTime)
forall (m :: * -> *) a. MonadIO m => a -> m (TVar a)
newTVarIO SystemTime
createdAt
  Client s -> IO (Client s)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return
    Client
      { Int
$sel:clientId:Client :: Int
clientId :: Int
clientId,
        TMap EntityId Sub
$sel:subscriptions:Client :: TMap EntityId Sub
subscriptions :: TMap EntityId Sub
subscriptions,
        TMap EntityId ()
$sel:ntfSubscriptions:Client :: TMap EntityId ()
ntfSubscriptions :: TMap EntityId ()
ntfSubscriptions,
        TVar AddHTTP
$sel:serviceSubscribed:Client :: TVar AddHTTP
serviceSubscribed :: TVar AddHTTP
serviceSubscribed,
        TVar AddHTTP
$sel:ntfServiceSubscribed:Client :: TVar AddHTTP
ntfServiceSubscribed :: TVar AddHTTP
ntfServiceSubscribed,
        TVar Int64
$sel:serviceSubsCount:Client :: TVar Int64
serviceSubsCount :: TVar Int64
serviceSubsCount,
        TVar Int64
$sel:ntfServiceSubsCount:Client :: TVar Int64
ntfServiceSubsCount :: TVar Int64
ntfServiceSubsCount,
        TBQueue
  (NonEmpty (Maybe (StoreQueue s, QueueRec), Transmission Cmd))
$sel:rcvQ:Client :: TBQueue
  (NonEmpty (Maybe (StoreQueue s, QueueRec), Transmission Cmd))
rcvQ :: TBQueue
  (NonEmpty (Maybe (StoreQueue s, QueueRec), Transmission Cmd))
rcvQ,
        TBQueue
  (NonEmpty (Transmission BrokerMsg), [Transmission BrokerMsg])
$sel:sndQ:Client :: TBQueue
  (NonEmpty (Transmission BrokerMsg), [Transmission BrokerMsg])
sndQ :: TBQueue
  (NonEmpty (Transmission BrokerMsg), [Transmission BrokerMsg])
sndQ,
        TBQueue (NonEmpty (Transmission BrokerMsg))
$sel:msgQ:Client :: TBQueue (NonEmpty (Transmission BrokerMsg))
msgQ :: TBQueue (NonEmpty (Transmission BrokerMsg))
msgQ,
        TVar Int
$sel:procThreads:Client :: TVar Int
procThreads :: TVar Int
procThreads,
        TVar (IntMap (Weak ThreadId))
$sel:endThreads:Client :: TVar (IntMap (Weak ThreadId))
endThreads :: TVar (IntMap (Weak ThreadId))
endThreads,
        TVar Int
$sel:endThreadSeq:Client :: TVar Int
endThreadSeq :: TVar Int
endThreadSeq,
        THandleParams SMPVersion 'TServer
$sel:clientTHParams:Client :: THandleParams SMPVersion 'TServer
clientTHParams :: THandleParams SMPVersion 'TServer
clientTHParams,
        TVar AddHTTP
$sel:connected:Client :: TVar AddHTTP
connected :: TVar AddHTTP
connected,
        SystemTime
$sel:createdAt:Client :: SystemTime
createdAt :: SystemTime
createdAt,
        TVar SystemTime
$sel:rcvActiveAt:Client :: TVar SystemTime
rcvActiveAt :: TVar SystemTime
rcvActiveAt,
        TVar SystemTime
$sel:sndActiveAt:Client :: TVar SystemTime
sndActiveAt :: TVar SystemTime
sndActiveAt
      }

newSubscription :: SubscriptionThread -> STM Sub
newSubscription :: SubscriptionThread -> STM Sub
newSubscription SubscriptionThread
st = do
  TVar (Maybe (MsgId, SystemSeconds))
delivered <- Maybe (MsgId, SystemSeconds)
-> STM (TVar (Maybe (MsgId, SystemSeconds)))
forall a. a -> STM (TVar a)
newTVar Maybe (MsgId, SystemSeconds)
forall a. Maybe a
Nothing
  ServerSub
subThread <- TVar SubscriptionThread -> ServerSub
ServerSub (TVar SubscriptionThread -> ServerSub)
-> STM (TVar SubscriptionThread) -> STM ServerSub
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> SubscriptionThread -> STM (TVar SubscriptionThread)
forall a. a -> STM (TVar a)
newTVar SubscriptionThread
st
  Sub -> STM Sub
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return Sub {ServerSub
$sel:subThread:Sub :: ServerSub
subThread :: ServerSub
subThread, TVar (Maybe (MsgId, SystemSeconds))
$sel:delivered:Sub :: TVar (Maybe (MsgId, SystemSeconds))
delivered :: TVar (Maybe (MsgId, SystemSeconds))
delivered}

newProhibitedSub :: STM Sub
newProhibitedSub :: STM Sub
newProhibitedSub = do
  TVar (Maybe (MsgId, SystemSeconds))
delivered <- Maybe (MsgId, SystemSeconds)
-> STM (TVar (Maybe (MsgId, SystemSeconds)))
forall a. a -> STM (TVar a)
newTVar Maybe (MsgId, SystemSeconds)
forall a. Maybe a
Nothing
  Sub -> STM Sub
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return Sub {$sel:subThread:Sub :: ServerSub
subThread = ServerSub
ProhibitSub, TVar (Maybe (MsgId, SystemSeconds))
$sel:delivered:Sub :: TVar (Maybe (MsgId, SystemSeconds))
delivered :: TVar (Maybe (MsgId, SystemSeconds))
delivered}

newEnv :: ServerConfig s -> IO (Env s)
newEnv :: forall s. ServerConfig s -> IO (Env s)
newEnv config :: ServerConfig s
config@ServerConfig {ServerCredentials
$sel:smpCredentials:ServerConfig :: forall s. ServerConfig s -> ServerCredentials
smpCredentials :: ServerCredentials
smpCredentials, Maybe ServerCredentials
$sel:httpCredentials:ServerConfig :: forall s. ServerConfig s -> Maybe ServerCredentials
httpCredentials :: Maybe ServerCredentials
httpCredentials, ServerStoreCfg s
$sel:serverStoreCfg:ServerConfig :: forall s. ServerConfig s -> ServerStoreCfg s
serverStoreCfg :: ServerStoreCfg s
serverStoreCfg, SMPClientAgentConfig
$sel:smpAgentCfg:ServerConfig :: forall s. ServerConfig s -> SMPClientAgentConfig
smpAgentCfg :: SMPClientAgentConfig
smpAgentCfg, Maybe ServerPublicInfo
$sel:information:ServerConfig :: forall s. ServerConfig s -> Maybe ServerPublicInfo
information :: Maybe ServerPublicInfo
information, Maybe ExpirationConfig
$sel:messageExpiration:ServerConfig :: forall s. ServerConfig s -> Maybe ExpirationConfig
messageExpiration :: Maybe ExpirationConfig
messageExpiration, Int64
$sel:idleQueueInterval:ServerConfig :: forall s. ServerConfig s -> Int64
idleQueueInterval :: Int64
idleQueueInterval, Int
$sel:msgQueueQuota:ServerConfig :: forall s. ServerConfig s -> Int
msgQueueQuota :: Int
msgQueueQuota, Int
$sel:maxJournalMsgCount:ServerConfig :: forall s. ServerConfig s -> Int
maxJournalMsgCount :: Int
maxJournalMsgCount, Int
$sel:maxJournalStateLines:ServerConfig :: forall s. ServerConfig s -> Int
maxJournalStateLines :: Int
maxJournalStateLines} = do
  TVar AddHTTP
serverActive <- AddHTTP -> IO (TVar AddHTTP)
forall (m :: * -> *) a. MonadIO m => a -> m (TVar a)
newTVarIO AddHTTP
True
  Server s
server <- IO (Server s)
forall s. IO (Server s)
newServer
  MsgStore s
msgStore_ <- case ServerStoreCfg s
serverStoreCfg of
    SSCMemory Maybe StorePaths
storePaths_ -> do
      let storePath :: Maybe String
storePath = StorePaths -> Maybe String
storeMsgsFile (StorePaths -> Maybe String) -> Maybe StorePaths -> Maybe String
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Maybe StorePaths
storePaths_
      STMMsgStore
ms <- MsgStoreConfig STMMsgStore -> IO STMMsgStore
forall s. MsgStoreClass s => MsgStoreConfig s -> IO s
newMsgStore STMStoreConfig {Maybe String
storePath :: Maybe String
$sel:storePath:STMStoreConfig :: Maybe String
storePath, $sel:quota:STMStoreConfig :: Int
quota = Int
msgQueueQuota}
      Maybe StorePaths -> (StorePaths -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ Maybe StorePaths
storePaths_ ((StorePaths -> IO ()) -> IO ()) -> (StorePaths -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \StorePaths {$sel:storeLogFile:StorePaths :: StorePaths -> String
storeLogFile = String
f} -> (EntityId -> QueueRec -> IO STMQueue)
-> String -> STMQueueStore STMQueue -> IO ()
forall q.
StoreQueueClass q =>
(EntityId -> QueueRec -> IO q)
-> String -> STMQueueStore q -> IO ()
loadStoreLog (STMMsgStore
-> AddHTTP -> EntityId -> QueueRec -> IO (StoreQueue STMMsgStore)
forall s.
MsgStoreClass s =>
s -> AddHTTP -> EntityId -> QueueRec -> IO (StoreQueue s)
mkQueue STMMsgStore
ms AddHTTP
True) String
f (STMQueueStore STMQueue -> IO ())
-> STMQueueStore STMQueue -> IO ()
forall a b. (a -> b) -> a -> b
$ STMMsgStore -> QueueStore STMMsgStore
forall s. MsgStoreClass s => s -> QueueStore s
queueStore STMMsgStore
ms
      MsgStore STMMsgStore -> IO (MsgStore STMMsgStore)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (MsgStore STMMsgStore -> IO (MsgStore STMMsgStore))
-> MsgStore STMMsgStore -> IO (MsgStore STMMsgStore)
forall a b. (a -> b) -> a -> b
$ STMMsgStore -> MsgStore STMMsgStore
StoreMemory STMMsgStore
ms
    SSCMemoryJournal {String
$sel:storeLogFile:SSCMemory :: ServerStoreCfg (JournalMsgStore 'QSMemory) -> String
storeLogFile :: String
storeLogFile, String
$sel:storeMsgsPath:SSCMemory :: ServerStoreCfg (JournalMsgStore 'QSMemory) -> String
storeMsgsPath :: String
storeMsgsPath} -> do
      Text -> IO ()
forall (m :: * -> *).
(?callStack::CallStack, MonadIO m) =>
Text -> m ()
logWarn (Text -> IO ()) -> Text -> IO ()
forall a b. (a -> b) -> a -> b
$
        Text
"Journal message store is deprecated and will be removed soon.\n"
          Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"Please migrate to in-memory storage using `journal export` command.\n"
          Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"After that you can migrate to PostgreSQL using `database import` command."
      let qsCfg :: QStoreCfg 'QSMemory
qsCfg = QStoreCfg 'QSMemory
MQStoreCfg
          cfg :: JournalStoreConfig 'QSMemory
cfg = QStoreCfg 'QSMemory
-> String
-> Int
-> Int
-> Int
-> Int64
-> JournalStoreConfig 'QSMemory
forall (s :: QSType).
QStoreCfg s
-> String -> Int -> Int -> Int -> Int64 -> JournalStoreConfig s
mkJournalStoreConfig QStoreCfg 'QSMemory
qsCfg String
storeMsgsPath Int
msgQueueQuota Int
maxJournalMsgCount Int
maxJournalStateLines Int64
idleQueueInterval
      JournalMsgStore 'QSMemory
ms <- MsgStoreConfig (JournalMsgStore 'QSMemory)
-> IO (JournalMsgStore 'QSMemory)
forall s. MsgStoreClass s => MsgStoreConfig s -> IO s
newMsgStore MsgStoreConfig (JournalMsgStore 'QSMemory)
JournalStoreConfig 'QSMemory
cfg
      (EntityId -> QueueRec -> IO (JournalQueue 'QSMemory))
-> String -> STMQueueStore (JournalQueue 'QSMemory) -> IO ()
forall q.
StoreQueueClass q =>
(EntityId -> QueueRec -> IO q)
-> String -> STMQueueStore q -> IO ()
loadStoreLog (JournalMsgStore 'QSMemory
-> AddHTTP
-> EntityId
-> QueueRec
-> IO (StoreQueue (JournalMsgStore 'QSMemory))
forall s.
MsgStoreClass s =>
s -> AddHTTP -> EntityId -> QueueRec -> IO (StoreQueue s)
mkQueue JournalMsgStore 'QSMemory
ms AddHTTP
True) String
storeLogFile (STMQueueStore (JournalQueue 'QSMemory) -> IO ())
-> STMQueueStore (JournalQueue 'QSMemory) -> IO ()
forall a b. (a -> b) -> a -> b
$ JournalMsgStore 'QSMemory -> STMQueueStore (JournalQueue 'QSMemory)
stmQueueStore JournalMsgStore 'QSMemory
ms
      MsgStore s -> IO (MsgStore s)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (MsgStore s -> IO (MsgStore s)) -> MsgStore s -> IO (MsgStore s)
forall a b. (a -> b) -> a -> b
$ JournalMsgStore 'QSMemory -> MsgStore (JournalMsgStore 'QSMemory)
forall (qs :: QSType).
JournalMsgStore qs -> MsgStore (JournalMsgStore qs)
StoreJournal JournalMsgStore 'QSMemory
ms
#if defined(dbServerPostgres)
    SSCDatabaseJournal {storeCfg, storeMsgsPath'} -> do
      let StartOptions {compactLog, confirmMigrations} = startOptions config
          qsCfg = PQStoreCfg (storeCfg {confirmMigrations} :: PostgresStoreCfg)
          cfg = mkJournalStoreConfig qsCfg storeMsgsPath' msgQueueQuota maxJournalMsgCount maxJournalStateLines idleQueueInterval
      when compactLog $ compactDbStoreLog $ dbStoreLogPath storeCfg
      StoreJournal <$> newMsgStore cfg
    SSCDatabase storeCfg -> do
      let StartOptions {compactLog, confirmMigrations} = startOptions config
          cfg = PostgresMsgStoreCfg storeCfg {confirmMigrations} msgQueueQuota
      when compactLog $ compactDbStoreLog $ dbStoreLogPath storeCfg
      StoreDatabase <$> newMsgStore cfg
#else
    SSCDatabaseJournal {} -> IO (MsgStore s)
forall a. IO a
noPostgresExit
#endif
  NtfStore
ntfStore <- TMap EntityId (TVar [MsgNtf]) -> NtfStore
NtfStore (TMap EntityId (TVar [MsgNtf]) -> NtfStore)
-> IO (TMap EntityId (TVar [MsgNtf])) -> IO NtfStore
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO (TMap EntityId (TVar [MsgNtf]))
forall k a. IO (TMap k a)
TM.emptyIO
  TVar ChaChaDRG
random <- IO (TVar ChaChaDRG)
C.newRandom
  Credential
tlsServerCreds <- String -> ServerCredentials -> IO Credential
getCredentials String
"SMP" ServerCredentials
smpCredentials
  Maybe Credential
httpServerCreds <- (ServerCredentials -> IO Credential)
-> Maybe ServerCredentials -> IO (Maybe Credential)
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> Maybe a -> m (Maybe b)
mapM (String -> ServerCredentials -> IO Credential
getCredentials String
"HTTPS") Maybe ServerCredentials
httpCredentials
  (Credential -> IO ()) -> Maybe Credential -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ Credential -> IO ()
checkHTTPSCredentials Maybe Credential
httpServerCreds
  Fingerprint MsgId
fp <- ServerCredentials -> IO Fingerprint
loadFingerprint ServerCredentials
smpCredentials
  let serverIdentity :: KeyHash
serverIdentity = MsgId -> KeyHash
KeyHash MsgId
fp
  ServerStats
serverStats <- UTCTime -> IO ServerStats
newServerStats (UTCTime -> IO ServerStats) -> IO UTCTime -> IO ServerStats
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< IO UTCTime
getCurrentTime
  TVar [(String, SocketState)]
sockets <- [(String, SocketState)] -> IO (TVar [(String, SocketState)])
forall (m :: * -> *) a. MonadIO m => a -> m (TVar a)
newTVarIO []
  TVar Int
clientSeq <- Int -> IO (TVar Int)
forall (m :: * -> *) a. MonadIO m => a -> m (TVar a)
newTVarIO Int
0
  ProxyAgent
proxyAgent <- SMPClientAgentConfig -> TVar ChaChaDRG -> IO ProxyAgent
newSMPProxyAgent SMPClientAgentConfig
smpAgentCfg TVar ChaChaDRG
random
  Env s -> IO (Env s)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure
    Env
      { TVar AddHTTP
$sel:serverActive:Env :: TVar AddHTTP
serverActive :: TVar AddHTTP
serverActive,
        ServerConfig s
$sel:config:Env :: ServerConfig s
config :: ServerConfig s
config,
        ServerInformation
$sel:serverInfo:Env :: ServerInformation
serverInfo :: ServerInformation
serverInfo,
        Server s
$sel:server:Env :: Server s
server :: Server s
server,
        KeyHash
$sel:serverIdentity:Env :: KeyHash
serverIdentity :: KeyHash
serverIdentity,
        MsgStore s
$sel:msgStore_:Env :: MsgStore s
msgStore_ :: MsgStore s
msgStore_,
        NtfStore
$sel:ntfStore:Env :: NtfStore
ntfStore :: NtfStore
ntfStore,
        TVar ChaChaDRG
$sel:random:Env :: TVar ChaChaDRG
random :: TVar ChaChaDRG
random,
        Credential
$sel:tlsServerCreds:Env :: Credential
tlsServerCreds :: Credential
tlsServerCreds,
        Maybe Credential
$sel:httpServerCreds:Env :: Maybe Credential
httpServerCreds :: Maybe Credential
httpServerCreds,
        ServerStats
$sel:serverStats:Env :: ServerStats
serverStats :: ServerStats
serverStats,
        TVar [(String, SocketState)]
$sel:sockets:Env :: TVar [(String, SocketState)]
sockets :: TVar [(String, SocketState)]
sockets,
        TVar Int
$sel:clientSeq:Env :: TVar Int
clientSeq :: TVar Int
clientSeq,
        ProxyAgent
$sel:proxyAgent:Env :: ProxyAgent
proxyAgent :: ProxyAgent
proxyAgent
      }
  where
    loadStoreLog :: StoreQueueClass q => (RecipientId -> QueueRec -> IO q) -> FilePath -> STMQueueStore q -> IO ()
    loadStoreLog :: forall q.
StoreQueueClass q =>
(EntityId -> QueueRec -> IO q)
-> String -> STMQueueStore q -> IO ()
loadStoreLog EntityId -> QueueRec -> IO q
mkQ String
f STMQueueStore q
st = do
      Text -> IO ()
forall (m :: * -> *).
(?callStack::CallStack, MonadIO m) =>
Text -> m ()
logNote (Text -> IO ()) -> Text -> IO ()
forall a b. (a -> b) -> a -> b
$ Text
"restoring queues from file " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> String -> Text
T.pack String
f
      StoreLog 'WriteMode
sl <- AddHTTP
-> (EntityId -> QueueRec -> IO q)
-> String
-> STMQueueStore q
-> IO (StoreLog 'WriteMode)
forall q.
StoreQueueClass q =>
AddHTTP
-> (EntityId -> QueueRec -> IO q)
-> String
-> STMQueueStore q
-> IO (StoreLog 'WriteMode)
readWriteQueueStore AddHTTP
False EntityId -> QueueRec -> IO q
mkQ String
f STMQueueStore q
st
      STMQueueStore q -> StoreLog 'WriteMode -> IO ()
forall q. STMQueueStore q -> StoreLog 'WriteMode -> IO ()
setStoreLog STMQueueStore q
st StoreLog 'WriteMode
sl
#if defined(dbServerPostgres)
    compactDbStoreLog = \case
      Just f -> do
        logNote $ "compacting queues in file " <> T.pack f
        st <- newMsgStore STMStoreConfig {storePath = Nothing, quota = msgQueueQuota}
        -- we don't need to have locks in the map
        sl <- readWriteQueueStore False (mkQueue st False) f (queueStore st)
        setStoreLog (queueStore st) sl
        closeMsgStore st
      Nothing -> do
        logError "Error: `--compact-log` used without `db_store_log` INI option"
        exitFailure
#endif
    getCredentials :: String -> ServerCredentials -> IO Credential
getCredentials String
protocol ServerCredentials
creds = do
      [String]
files <- IO [String]
missingCreds
      AddHTTP -> IO () -> IO ()
forall (f :: * -> *). Applicative f => AddHTTP -> f () -> f ()
unless ([String] -> AddHTTP
forall a. [a] -> AddHTTP
forall (t :: * -> *) a. Foldable t => t a -> AddHTTP
null [String]
files) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
        String -> IO ()
putStrLn (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$ String
"----------\nError: no " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
protocol String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
" credentials: " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String -> [String] -> String
forall a. [a] -> [[a]] -> [a]
intercalate String
", " [String]
files
        AddHTTP -> IO () -> IO ()
forall (f :: * -> *). Applicative f => AddHTTP -> f () -> f ()
when (String
protocol String -> String -> AddHTTP
forall a. Eq a => a -> a -> AddHTTP
== String
"HTTPS") (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
          String -> IO ()
putStrLn String
"Server should serve static pages to show connection links in the browser."
          String -> IO ()
putStrLn String
letsEncrypt
        IO ()
forall a. IO a
exitFailure
      ServerCredentials -> IO Credential
loadServerCredential ServerCredentials
creds
      where
        missingfile :: String -> IO [String]
missingfile String
f = (\AddHTTP
y -> [String
f | AddHTTP -> AddHTTP
not AddHTTP
y]) (AddHTTP -> [String]) -> IO AddHTTP -> IO [String]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> String -> IO AddHTTP
doesFileExist String
f
        missingCreds :: IO [String]
missingCreds = do
          let files :: [String]
files = ([String] -> [String])
-> (String -> [String] -> [String])
-> Maybe String
-> [String]
-> [String]
forall b a. b -> (a -> b) -> Maybe a -> b
maybe [String] -> [String]
forall a. a -> a
id (:) (ServerCredentials -> Maybe String
caCertificateFile ServerCredentials
creds) [ServerCredentials -> String
certificateFile ServerCredentials
creds, ServerCredentials -> String
privateKeyFile ServerCredentials
creds]
           in [[String]] -> [String]
forall (t :: * -> *) a. Foldable t => t [a] -> [a]
concat ([[String]] -> [String]) -> IO [[String]] -> IO [String]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (String -> IO [String]) -> [String] -> IO [[String]]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> [a] -> m [b]
mapM String -> IO [String]
missingfile [String]
files
    checkHTTPSCredentials :: Credential -> IO ()
checkHTTPSCredentials (X.CertificateChain [SignedExact Certificate]
cc, PrivKey
_k) =
      -- LetsEncrypt provides ECDSA with insecure curve p256 (https://safecurves.cr.yp.to)
      case (SignedExact Certificate -> Certificate)
-> [SignedExact Certificate] -> [Certificate]
forall a b. (a -> b) -> [a] -> [b]
map (Signed Certificate -> Certificate
forall a. (Show a, Eq a, ASN1Object a) => Signed a -> a
X.signedObject (Signed Certificate -> Certificate)
-> (SignedExact Certificate -> Signed Certificate)
-> SignedExact Certificate
-> Certificate
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SignedExact Certificate -> Signed Certificate
forall a. (Show a, Eq a, ASN1Object a) => SignedExact a -> Signed a
X.getSigned) [SignedExact Certificate]
cc of
        X.Certificate {certPubKey :: Certificate -> PubKey
X.certPubKey = X.PubKeyRSA PublicKey
rsa} : [Certificate]
_ca | PublicKey -> Int
RSA.public_size PublicKey
rsa Int -> Int -> AddHTTP
forall a. Ord a => a -> a -> AddHTTP
>= Int
512 -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
        [Certificate]
_ -> do
          String -> IO ()
putStrLn (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$ String
"Error: unsupported HTTPS credentials, required 4096-bit RSA\n" String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
letsEncrypt
          IO ()
forall a. IO a
exitFailure
    letsEncrypt :: String
letsEncrypt = String
"Use Let's Encrypt to generate: certbot certonly --standalone -d yourdomainname --key-type rsa --rsa-key-size 4096\n----------"
    serverInfo :: ServerInformation
serverInfo =
      ServerInformation
        { Maybe ServerPublicInfo
information :: Maybe ServerPublicInfo
$sel:information:ServerInformation :: Maybe ServerPublicInfo
information,
          $sel:config:ServerInformation :: ServerPublicConfig
config =
            ServerPublicConfig
              { ServerPersistenceMode
persistence :: ServerPersistenceMode
$sel:persistence:ServerPublicConfig :: ServerPersistenceMode
persistence,
                $sel:messageExpiration:ServerPublicConfig :: Maybe Int64
messageExpiration = ExpirationConfig -> Int64
ttl (ExpirationConfig -> Int64)
-> Maybe ExpirationConfig -> Maybe Int64
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe ExpirationConfig
messageExpiration,
                $sel:statsEnabled:ServerPublicConfig :: AddHTTP
statsEnabled = Maybe Int64 -> AddHTTP
forall a. Maybe a -> AddHTTP
isJust (Maybe Int64 -> AddHTTP) -> Maybe Int64 -> AddHTTP
forall a b. (a -> b) -> a -> b
$ ServerConfig s -> Maybe Int64
forall s. ServerConfig s -> Maybe Int64
logStatsInterval ServerConfig s
config,
                $sel:newQueuesAllowed:ServerPublicConfig :: AddHTTP
newQueuesAllowed = ServerConfig s -> AddHTTP
forall s. ServerConfig s -> AddHTTP
allowNewQueues ServerConfig s
config,
                $sel:basicAuthEnabled:ServerPublicConfig :: AddHTTP
basicAuthEnabled = Maybe BasicAuth -> AddHTTP
forall a. Maybe a -> AddHTTP
isJust (Maybe BasicAuth -> AddHTTP) -> Maybe BasicAuth -> AddHTTP
forall a b. (a -> b) -> a -> b
$ ServerConfig s -> Maybe BasicAuth
forall s. ServerConfig s -> Maybe BasicAuth
newQueueBasicAuth ServerConfig s
config
              }
        }
      where
        persistence :: ServerPersistenceMode
persistence = case ServerStoreCfg s
serverStoreCfg of
          SSCMemory Maybe StorePaths
sp_ -> case Maybe StorePaths
sp_ of
            Maybe StorePaths
Nothing -> ServerPersistenceMode
SPMMemoryOnly
            Just StorePaths {$sel:storeMsgsFile:StorePaths :: StorePaths -> Maybe String
storeMsgsFile = Just String
_} -> ServerPersistenceMode
SPMMessages
            Maybe StorePaths
_ -> ServerPersistenceMode
SPMQueues
          ServerStoreCfg s
_ -> ServerPersistenceMode
SPMMessages

noPostgresExit :: IO a
noPostgresExit :: forall a. IO a
noPostgresExit = String -> IO ()
putStrLn String
noPostgresExitStr IO () -> IO a -> IO a
forall a b. IO a -> IO b -> IO b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO a
forall a. IO a
exitFailure

noPostgresExitStr :: String
noPostgresExitStr :: String
noPostgresExitStr =
  String
"Error: server binary is compiled without support for PostgreSQL database.\n"
    String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
"Please download `smp-server-postgres` or re-compile with `cabal build -fserver_postgres`."

mkJournalStoreConfig :: QStoreCfg s -> FilePath -> Int -> Int -> Int -> Int64 -> JournalStoreConfig s
mkJournalStoreConfig :: forall (s :: QSType).
QStoreCfg s
-> String -> Int -> Int -> Int -> Int64 -> JournalStoreConfig s
mkJournalStoreConfig QStoreCfg s
queueStoreCfg String
storePath Int
msgQueueQuota Int
maxJournalMsgCount Int
maxJournalStateLines Int64
idleQueueInterval =
  JournalStoreConfig
    { String
storePath :: String
$sel:storePath:JournalStoreConfig :: String
storePath,
      $sel:quota:JournalStoreConfig :: Int
quota = Int
msgQueueQuota,
      $sel:pathParts:JournalStoreConfig :: Int
pathParts = Int
journalMsgStoreDepth,
      QStoreCfg s
queueStoreCfg :: QStoreCfg s
$sel:queueStoreCfg:JournalStoreConfig :: QStoreCfg s
queueStoreCfg,
      $sel:maxMsgCount:JournalStoreConfig :: Int
maxMsgCount = Int
maxJournalMsgCount,
      $sel:maxStateLines:JournalStoreConfig :: Int
maxStateLines = Int
maxJournalStateLines,
      $sel:stateTailSize:JournalStoreConfig :: Int
stateTailSize = Int
defaultStateTailSize,
      $sel:idleInterval:JournalStoreConfig :: Int64
idleInterval = Int64
idleQueueInterval,
      $sel:expireBackupsAfter:JournalStoreConfig :: NominalDiffTime
expireBackupsAfter = NominalDiffTime
14 NominalDiffTime -> NominalDiffTime -> NominalDiffTime
forall a. Num a => a -> a -> a
* NominalDiffTime
nominalDay,
      $sel:keepMinBackups:JournalStoreConfig :: Int
keepMinBackups = Int
2
    }

newSMPProxyAgent :: SMPClientAgentConfig -> TVar ChaChaDRG -> IO ProxyAgent
newSMPProxyAgent :: SMPClientAgentConfig -> TVar ChaChaDRG -> IO ProxyAgent
newSMPProxyAgent SMPClientAgentConfig
smpAgentCfg TVar ChaChaDRG
random = do
  SMPClientAgent 'Sender
smpAgent <- SParty 'Sender
-> SMPClientAgentConfig
-> TVar ChaChaDRG
-> IO (SMPClientAgent 'Sender)
forall (p :: Party).
SParty p
-> SMPClientAgentConfig -> TVar ChaChaDRG -> IO (SMPClientAgent p)
newSMPClientAgent SParty 'Sender
SSender SMPClientAgentConfig
smpAgentCfg TVar ChaChaDRG
random
  ProxyAgent -> IO ProxyAgent
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ProxyAgent {SMPClientAgent 'Sender
$sel:smpAgent:ProxyAgent :: SMPClientAgent 'Sender
smpAgent :: SMPClientAgent 'Sender
smpAgent}

readWriteQueueStore :: forall q. StoreQueueClass q => Bool -> (RecipientId -> QueueRec -> IO q) -> FilePath -> STMQueueStore q -> IO (StoreLog 'WriteMode)
readWriteQueueStore :: forall q.
StoreQueueClass q =>
AddHTTP
-> (EntityId -> QueueRec -> IO q)
-> String
-> STMQueueStore q
-> IO (StoreLog 'WriteMode)
readWriteQueueStore AddHTTP
tty EntityId -> QueueRec -> IO q
mkQ = (String -> STMQueueStore q -> IO ())
-> (StoreLog 'WriteMode -> STMQueueStore q -> IO ())
-> String
-> STMQueueStore q
-> IO (StoreLog 'WriteMode)
forall s.
(String -> s -> IO ())
-> (StoreLog 'WriteMode -> s -> IO ())
-> String
-> s
-> IO (StoreLog 'WriteMode)
readWriteStoreLog (AddHTTP
-> (EntityId -> QueueRec -> IO q)
-> String
-> STMQueueStore q
-> IO ()
forall q.
StoreQueueClass q =>
AddHTTP
-> (EntityId -> QueueRec -> IO q)
-> String
-> STMQueueStore q
-> IO ()
readQueueStore AddHTTP
tty EntityId -> QueueRec -> IO q
mkQ) (forall q.
StoreQueueClass q =>
StoreLog 'WriteMode -> STMQueueStore q -> IO ()
writeQueueStore @q)