{-# LANGUAGE DataKinds #-}
{-# LANGUAGE NamedFieldPuns #-}
module Simplex.Messaging.Transport.HTTP2.Server where
import Control.Concurrent.Async (Async, async, uninterruptibleCancel)
import Control.Concurrent.STM
import Control.Monad
import Data.Time.Clock.System (getSystemTime, systemSeconds)
import Network.HPACK (BufferSize)
import Network.HTTP2.Server (Request, Response)
import qualified Network.HTTP2.Server as H
import Network.Socket
import qualified Network.TLS as T
import Numeric.Natural (Natural)
import Simplex.Messaging.Server.Expiration
import Simplex.Messaging.Transport (ALPN, SessionId, TLS, closeConnection, tlsALPN, tlsUniq)
import Simplex.Messaging.Transport.HTTP2
import Simplex.Messaging.Transport.Server (ServerCredentials, TransportServerConfig (..), loadServerCredential, runTransportServer)
import Simplex.Messaging.Util (threadDelay')
import UnliftIO (finally)
import UnliftIO.Concurrent (forkIO, killThread)
-- | Per-request handler run by the HTTP2 servers in this module.
--
-- It receives the session id and the optional ALPN value of the TLS connection
-- (taken with 'tlsUniq' and 'tlsALPN' in 'runHTTP2ServerWith_'), the HTTP2 request,
-- and a callback that sends the response.
type HTTP2ServerFunc = SessionId -> Maybe ALPN -> Request -> (Response -> IO ()) -> IO ()
-- | Settings consumed by 'getHTTP2Server'.
data HTTP2ServerConfig = HTTP2ServerConfig
  { -- | Capacity of the bounded request queue (passed to 'newTBQueueIO').
    qSize :: Natural,
    -- | Port / service name handed to the transport server.
    http2Port :: ServiceName,
    -- | Buffer size passed to 'withHTTP2' for each connection.
    bufferSize :: BufferSize,
    -- | Second argument of 'getHTTP2Body' when reading a request body
    -- (presumably how much of the body is read eagerly - confirm against 'getHTTP2Body').
    bodyHeadSize :: Int,
    -- | TLS parameters handed to the transport server.
    serverSupported :: T.Supported,
    -- | Credentials source; loaded with 'loadServerCredential' at start-up.
    https2Credentials :: ServerCredentials,
    -- | Transport-level configuration handed to 'runTransportServer'.
    transportConfig :: TransportServerConfig
  }
  deriving (Show)
-- | A request as placed on the queue by 'getHTTP2Server', together with
-- everything needed to answer it.
data HTTP2Request = HTTP2Request
  { -- | Session id of the TLS connection the request arrived on.
    sessionId :: SessionId,
    -- | ALPN value of that TLS connection, if any.
    sessionALPN :: Maybe ALPN,
    -- | The HTTP2 request itself.
    request :: Request,
    -- | Request body as returned by 'getHTTP2Body'.
    reqBody :: HTTP2Body,
    -- | Callback that sends the response for this request.
    sendResponse :: Response -> IO ()
  }
-- | Handle of a server started with 'getHTTP2Server'.
data HTTP2Server = HTTP2Server
  { -- | Background thread running the server; cancelled by 'closeHTTP2Server'.
    action :: Async (),
    -- | Bounded queue that receives every incoming request.
    reqQ :: TBQueue HTTP2Request
  }
-- | Start an HTTP2 server that funnels every incoming request into one bounded queue.
--
-- Loads the TLS credential from 'https2Credentials', then runs 'runHTTP2Server' in a
-- background 'Async' with no inactivity expiration ('Nothing') and a no-op
-- client-finished callback. The call returns only after the @started@ 'TMVar'
-- (handed down to the transport server) has been filled; its 'Bool' value is
-- discarded here.
--
-- For each request the handler reads the body with 'getHTTP2Body' (passing
-- 'bodyHeadSize') and writes an 'HTTP2Request' to 'reqQ'. 'writeTBQueue' blocks the
-- handler while the queue is full, i.e. already holds 'qSize' items.
getHTTP2Server :: HTTP2ServerConfig -> IO HTTP2Server
getHTTP2Server HTTP2ServerConfig {qSize, http2Port, bufferSize, bodyHeadSize, serverSupported, https2Credentials, transportConfig} = do
  srvCreds <- loadServerCredential https2Credentials
  started <- newEmptyTMVarIO
  reqQ <- newTBQueueIO qSize
  action <- async $
    runHTTP2Server started http2Port bufferSize serverSupported srvCreds transportConfig Nothing (const $ pure ()) $ \sessionId sessionALPN r sendResponse -> do
      reqBody <- getHTTP2Body r bodyHeadSize
      atomically $ writeTBQueue reqQ HTTP2Request {sessionId, sessionALPN, request = r, reqBody, sendResponse}
  -- Wait for the server thread to signal through @started@ before handing out the handle.
  void . atomically $ takeTMVar started
  pure HTTP2Server {action, reqQ}
-- | Stop a server started with 'getHTTP2Server' by cancelling its background
-- thread with 'uninterruptibleCancel'.
closeHTTP2Server :: HTTP2Server -> IO ()
closeHTTP2Server = uninterruptibleCancel . action
-- | Run an HTTP2 server on top of the TLS transport server.
--
-- The connection-accepting part is 'runTransportServer' applied to @started@,
-- the port, the TLS parameters, the credential and the transport config; everything
-- per-connection (optional inactivity expiration, the client-finished callback,
-- buffer size and the request handler) is delegated to 'runHTTP2ServerWith_'.
runHTTP2Server :: TMVar Bool -> ServiceName -> BufferSize -> T.Supported -> T.Credential -> TransportServerConfig -> Maybe ExpirationConfig -> (SessionId -> IO ()) -> HTTP2ServerFunc -> IO ()
runHTTP2Server started port bufferSize srvSupported srvCreds transportConfig expCfg_ clientFinished =
  runHTTP2ServerWith_ expCfg_ clientFinished bufferSize setup
  where
    -- Accepts TLS connections and runs the supplied per-connection action on each.
    setup = runTransportServer started port srvSupported srvCreds transportConfig
-- | 'runHTTP2ServerWith_' with no inactivity expiration and a client-finished
-- callback that does nothing.
--
-- The caller supplies the buffer size, a @setup@ function that is given the
-- per-connection TLS action, and the request handler.
runHTTP2ServerWith :: BufferSize -> ((TLS p -> IO ()) -> a) -> HTTP2ServerFunc -> a
runHTTP2ServerWith = runHTTP2ServerWith_ Nothing (\_sessId -> pure ())
-- | Build the per-connection HTTP2 action and hand it to @setup@.
--
-- For every TLS connection:
--
-- * a 'TVar' holding the time of the last request is created, initialised to the
--   current system time;
-- * when an 'ExpirationConfig' is given, a thread is forked that periodically closes
--   the connection once it has been inactive for too long;
-- * the HTTP2 session is run via 'withHTTP2', which also receives
--   @clientFinished@ applied to the session id (presumably invoked when the session
--   ends - confirm against 'withHTTP2');
-- * the expiration thread, if any, is killed when the session action exits,
--   normally or with an exception.
runHTTP2ServerWith_ :: Maybe ExpirationConfig -> (SessionId -> IO ()) -> BufferSize -> ((TLS p -> IO ()) -> a) -> HTTP2ServerFunc -> a
runHTTP2ServerWith_ expCfg_ clientFinished bufferSize setup http2Server = setup $ \tls -> do
  activeAt <- newTVarIO =<< getSystemTime
  tid_ <- mapM (forkIO . expireInactiveClient tls activeAt) expCfg_
  withHTTP2 bufferSize (run tls activeAt) (clientFinished $ tlsUniq tls) tls `finally` mapM_ killThread tid_
  where
    -- Serve requests on one connection: record the arrival time, then call the handler.
    run tls activeAt cfg = H.run H.defaultServerConfig cfg $ \req _aux sendResp -> do
      getSystemTime >>= atomically . writeTVar activeAt
      -- The handler only sends plain responses, so the push-promise list is always empty.
      http2Server (tlsUniq tls) (tlsALPN tls) req (`sendResp` [])
    -- Every 'checkInterval' (scaled by 1000000 for 'threadDelay''), compare the last
    -- activity time with the cut-off from 'expireBeforeEpoch'; close the connection and
    -- stop once the last activity is older than the cut-off, otherwise keep polling.
    expireInactiveClient tls activeAt expCfg = loop
      where
        loop = do
          threadDelay' $ checkInterval expCfg * 1000000
          old <- expireBeforeEpoch expCfg
          ts <- readTVarIO activeAt
          if systemSeconds ts < old
            then closeConnection tls
            else loop