{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}

module Simplex.Messaging.Server.StoreLog.ReadWrite
  ( writeQueueStore,
    readQueueStore,
  )
where

import Control.Concurrent.STM
import Control.Logger.Simple
import Control.Monad
import Control.Monad.IO.Class
import Control.Monad.Trans.Except
import qualified Data.ByteString.Char8 as B
import qualified Data.Text as T
import Data.Text.Encoding (decodeLatin1)
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Protocol (ASubscriberParty (..), ErrorType, RecipientId, SParty (..))
import Simplex.Messaging.Server.QueueStore (QueueRec, ServiceRec (..))
import Simplex.Messaging.Server.QueueStore.STM (STMQueueStore (..), STMService (..))
import Simplex.Messaging.Server.QueueStore.Types
import Simplex.Messaging.Server.StoreLog
import Simplex.Messaging.Util (tshow)
import System.IO

-- | Write the contents of an in-memory queue store to a store log.
--
-- Every service currently held in the @services@ map of the store is passed
-- to 'logNewService' first; then every queue visited by 'withLoadedQueues' is
-- passed to 'logCreateQueue' together with its recipient ID.
--
-- A queue whose 'queueRec' variable holds 'Nothing' produces no log record.
writeQueueStore :: forall q. StoreQueueClass q => StoreLog 'WriteMode -> STMQueueStore q -> IO ()
writeQueueStore s st = do
  readTVarIO (services st) >>= mapM_ (logNewService s . serviceRec)
  withLoadedQueues st writeQueue
  where
    -- 'mapM_' over the 'Maybe' skips queues that have no record.
    writeQueue :: q -> IO ()
    writeQueue q = readTVarIO (queueRec q) >>= mapM_ (logCreateQueue s $ recipientId q)

-- | Replay a store log file into an in-memory queue store.
--
-- Each line supplied by 'readLogLines' is decoded with 'strDecode' as a
-- 'StoreLogRecord' and applied to the store. Failures never throw from this
-- function's own code:
--
-- * a line that fails to decode is printed to stdout, prefixed with
--   @"Error parsing log: "@;
-- * a store operation that returns 'Left' is reported with 'logError';
-- * an operation on a queue whose record is already absent is reported with
--   'logWarn' and skipped.
--
-- Arguments: @tty@ and the file path @f@ are forwarded unchanged to
-- 'readLogLines' (NOTE(review): @tty@ presumably toggles progress output --
-- confirm against 'readLogLines'); @mkQ@ builds a queue value from its
-- recipient ID and record; @st@ is the store that is populated.
readQueueStore :: forall q. StoreQueueClass q => Bool -> (RecipientId -> QueueRec -> IO q) -> FilePath -> STMQueueStore q -> IO ()
readQueueStore tty mkQ f st = readLogLines tty f $ const processLine
  where
    processLine :: B.ByteString -> IO ()
    processLine s = either printError procLogRecord (strDecode s)
      where
        -- Apply one decoded record to the store.
        procLogRecord :: StoreLogRecord -> IO ()
        procLogRecord = \case
          CreateQueue rId qr -> addQueue_ st mkQ rId qr >>= qError rId "CreateQueue"
          CreateLink rId lnkId d -> withQueue rId "CreateLink" $ \q -> addQueueLinkData st q lnkId d
          DeleteLink rId -> withQueue rId "DeleteLink" $ \q -> deleteQueueLinkData st q
          SecureQueue qId sKey -> withQueue qId "SecureQueue" $ \q -> secureQueue st q sKey
          UpdateKeys rId rKeys -> withQueue rId "UpdateKeys" $ \q -> updateKeys st q rKeys
          AddNotifier qId ntfCreds -> withQueue qId "AddNotifier" $ \q -> addQueueNotifier st q ntfCreds
          SuspendQueue qId -> withQueue qId "SuspendQueue" $ suspendQueue st
          BlockQueue qId info -> withQueue qId "BlockQueue" $ \q -> blockQueue st q info
          UnblockQueue qId -> withQueue qId "UnblockQueue" $ unblockQueue st
          DeleteQueue qId -> withQueue qId "DeleteQueue" $ deleteStoreQueue st
          DeleteNotifier qId -> withQueue qId "DeleteNotifier" $ deleteQueueNotifier st
          UpdateTime qId t -> withQueue qId "UpdateTime" $ \q -> updateQueueTime st q t
          NewService sr@ServiceRec {serviceId} -> do
            let errPfx = "STORE: getCreateService, stored service " <> decodeLatin1 (strEncode serviceId) <> ", "
            -- The ID returned by the store must match the ID recorded in the log.
            getCreateService @q st sr >>= \case
              Right serviceId'
                | serviceId == serviceId' -> pure ()
                | otherwise -> logError $ errPfx <> "created with the wrong ID " <> decodeLatin1 (strEncode serviceId')
              Left e -> logError $ errPfx <> tshow e
          QueueService rId (ASP party) serviceId -> withQueue rId "QueueService" $ \q -> setQueueService st q party serviceId
        -- Report an undecodable line together with the raw line contents.
        printError :: String -> IO ()
        printError e = B.putStrLn $ "Error parsing log: " <> B.pack e <> " - " <> s
        -- Look the queue up as a recipient queue and run the action on it,
        -- unless its record is already 'Nothing'; any 'Left' result (from the
        -- lookup or from the action) is logged via 'qError'.
        withQueue :: forall a. RecipientId -> T.Text -> (q -> IO (Either ErrorType a)) -> IO ()
        withQueue qId op a = runExceptT go >>= qError qId op
          where
            go = do
              q <- ExceptT $ getQueue_ st (const mkQ) SRecipient qId
              liftIO (readTVarIO $ queueRec q) >>= \case
                Nothing -> logWarn $ logPfx qId op <> "already deleted"
                Just _ -> void $ ExceptT $ a q
        -- Log a failed store operation; a successful result is discarded.
        qError qId op = \case
          Left e -> logError $ logPfx qId op <> tshow e
          Right _ -> pure ()
        -- Common prefix of log messages: operation name and encoded queue ID.
        logPfx qId op = "STORE: " <> op <> ", stored queue " <> decodeLatin1 (strEncode qId) <> ", "