{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
{-# OPTIONS_GHC -fno-warn-ambiguous-fields #-}
module Simplex.FileTransfer.Agent
( startXFTPWorkers,
startXFTPSndWorkers,
closeXFTPAgent,
toFSFilePath,
xftpReceiveFile',
xftpDeleteRcvFile',
xftpDeleteRcvFiles',
xftpSendFile',
xftpSendDescription',
deleteSndFileInternal,
deleteSndFilesInternal,
deleteSndFileRemote,
deleteSndFilesRemote,
)
where
import Control.Logger.Simple (logError)
import Control.Monad
import Control.Monad.Except
import Control.Monad.Reader
import Control.Monad.Trans.Except
import Data.Bifunctor (first)
import qualified Data.ByteString.Char8 as B
import qualified Data.ByteString.Lazy.Char8 as LB
import Data.Coerce (coerce)
import Data.Composition ((.:))
import Data.Either (partitionEithers, rights)
import Data.Int (Int64)
import Data.List (foldl', partition, sortOn)
import qualified Data.List.NonEmpty as L
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as M
import Data.Maybe (fromMaybe, mapMaybe)
import qualified Data.Set as S
import Data.Text (Text, pack)
import Data.Time.Clock (getCurrentTime)
import Data.Time.Format (defaultTimeLocale, formatTime)
import Simplex.FileTransfer.Chunks (toKB)
import Simplex.FileTransfer.Client (XFTPChunkSpec (..), getChunkDigest, prepareChunkSizes, prepareChunkSpecs, singleChunkSize)
import Simplex.FileTransfer.Crypto
import Simplex.FileTransfer.Description
import Simplex.FileTransfer.Protocol (FileParty (..), SFileParty (..))
import Simplex.FileTransfer.Transport (XFTPRcvChunkSpec (..))
import qualified Simplex.FileTransfer.Transport as XFTP
import Simplex.FileTransfer.Types
import qualified Simplex.FileTransfer.Types as FT
import Simplex.FileTransfer.Util (removePath, uniqueCombine)
import Simplex.Messaging.Agent.Client
import Simplex.Messaging.Agent.Env.SQLite
import Simplex.Messaging.Agent.Protocol
import Simplex.Messaging.Agent.RetryInterval
import Simplex.Messaging.Agent.Stats
import Simplex.Messaging.Agent.Store.AgentStore
import qualified Simplex.Messaging.Agent.Store.DB as DB
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Crypto.File (CryptoFile (..), CryptoFileArgs)
import qualified Simplex.Messaging.Crypto.File as CF
import qualified Simplex.Messaging.Crypto.Lazy as LC
import Simplex.Messaging.Encoding
import Simplex.Messaging.Encoding.String (strDecode, strEncode)
import Simplex.Messaging.Protocol (ProtocolServer, ProtocolType (..), XFTPServer)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Util (allFinally, catchAll_, catchAllErrors, liftError, tshow, unlessM, whenM)
import System.FilePath (takeFileName, (</>))
import UnliftIO
import UnliftIO.Directory
import qualified UnliftIO.Exception as E
startXFTPWorkers :: AgentClient -> Maybe FilePath -> AM ()
startXFTPWorkers :: AgentClient -> Maybe String -> AM ()
startXFTPWorkers = Bool -> AgentClient -> Maybe String -> AM ()
startXFTPWorkers_ Bool
True
{-# INLINE startXFTPWorkers #-}
startXFTPSndWorkers :: AgentClient -> Maybe FilePath -> AM ()
startXFTPSndWorkers :: AgentClient -> Maybe String -> AM ()
startXFTPSndWorkers = Bool -> AgentClient -> Maybe String -> AM ()
startXFTPWorkers_ Bool
False
{-# INLINE startXFTPSndWorkers #-}
startXFTPWorkers_ :: Bool -> AgentClient -> Maybe FilePath -> AM ()
startXFTPWorkers_ :: Bool -> AgentClient -> Maybe String -> AM ()
startXFTPWorkers_ Bool
allWorkers AgentClient
c Maybe String
workDir = do
TVar (Maybe String)
wd <- (Env -> TVar (Maybe String))
-> ExceptT AgentErrorType (ReaderT Env IO) (TVar (Maybe String))
forall r (m :: * -> *) a. MonadReader r m => (r -> a) -> m a
asks ((Env -> TVar (Maybe String))
-> ExceptT AgentErrorType (ReaderT Env IO) (TVar (Maybe String)))
-> (Env -> TVar (Maybe String))
-> ExceptT AgentErrorType (ReaderT Env IO) (TVar (Maybe String))
forall a b. (a -> b) -> a -> b
$ XFTPAgent -> TVar (Maybe String)
xftpWorkDir (XFTPAgent -> TVar (Maybe String))
-> (Env -> XFTPAgent) -> Env -> TVar (Maybe String)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Env -> XFTPAgent
xftpAgent
STM () -> AM ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> AM ()) -> STM () -> AM ()
forall a b. (a -> b) -> a -> b
$ TVar (Maybe String) -> Maybe String -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (Maybe String)
wd Maybe String
workDir
AgentConfig
cfg <- (Env -> AgentConfig)
-> ExceptT AgentErrorType (ReaderT Env IO) AgentConfig
forall r (m :: * -> *) a. MonadReader r m => (r -> a) -> m a
asks Env -> AgentConfig
config
Bool -> AM () -> AM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
allWorkers (AM () -> AM ()) -> AM () -> AM ()
forall a b. (a -> b) -> a -> b
$ AgentConfig -> AM ()
startRcvFiles AgentConfig
cfg
AgentConfig -> AM ()
startSndFiles AgentConfig
cfg
Bool -> AM () -> AM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
allWorkers (AM () -> AM ()) -> AM () -> AM ()
forall a b. (a -> b) -> a -> b
$ AgentConfig -> AM ()
startDelFiles AgentConfig
cfg
where
startRcvFiles :: AgentConfig -> AM ()
startRcvFiles :: AgentConfig -> AM ()
startRcvFiles AgentConfig {NominalDiffTime
rcvFilesTTL :: NominalDiffTime
$sel:rcvFilesTTL:AgentConfig :: AgentConfig -> NominalDiffTime
rcvFilesTTL} = do
[XFTPServer]
pendingRcvServers <- AgentClient -> (Connection -> IO [XFTPServer]) -> AM [XFTPServer]
forall a. AgentClient -> (Connection -> IO a) -> AM a
withStore' AgentClient
c (Connection -> NominalDiffTime -> IO [XFTPServer]
`getPendingRcvFilesServers` NominalDiffTime
rcvFilesTTL)
ReaderT Env IO () -> AM ()
forall (m :: * -> *) a.
Monad m =>
m a -> ExceptT AgentErrorType m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (ReaderT Env IO () -> AM ())
-> ((XFTPServer -> ReaderT Env IO ()) -> ReaderT Env IO ())
-> (XFTPServer -> ReaderT Env IO ())
-> AM ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [XFTPServer]
-> (XFTPServer -> ReaderT Env IO ()) -> ReaderT Env IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [XFTPServer]
pendingRcvServers ((XFTPServer -> ReaderT Env IO ()) -> AM ())
-> (XFTPServer -> ReaderT Env IO ()) -> AM ()
forall a b. (a -> b) -> a -> b
$ \XFTPServer
s -> AgentClient -> Maybe XFTPServer -> ReaderT Env IO ()
resumeXFTPRcvWork AgentClient
c (XFTPServer -> Maybe XFTPServer
forall a. a -> Maybe a
Just XFTPServer
s)
ReaderT Env IO () -> AM ()
forall (m :: * -> *) a.
Monad m =>
m a -> ExceptT AgentErrorType m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (ReaderT Env IO () -> AM ()) -> ReaderT Env IO () -> AM ()
forall a b. (a -> b) -> a -> b
$ AgentClient -> Maybe XFTPServer -> ReaderT Env IO ()
resumeXFTPRcvWork AgentClient
c Maybe XFTPServer
forall a. Maybe a
Nothing
startSndFiles :: AgentConfig -> AM ()
startSndFiles :: AgentConfig -> AM ()
startSndFiles AgentConfig {NominalDiffTime
sndFilesTTL :: NominalDiffTime
$sel:sndFilesTTL:AgentConfig :: AgentConfig -> NominalDiffTime
sndFilesTTL} = do
ReaderT Env IO () -> AM ()
forall (m :: * -> *) a.
Monad m =>
m a -> ExceptT AgentErrorType m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (ReaderT Env IO () -> AM ()) -> ReaderT Env IO () -> AM ()
forall a b. (a -> b) -> a -> b
$ AgentClient -> Maybe XFTPServer -> ReaderT Env IO ()
resumeXFTPSndWork AgentClient
c Maybe XFTPServer
forall a. Maybe a
Nothing
[XFTPServer]
pendingSndServers <- AgentClient -> (Connection -> IO [XFTPServer]) -> AM [XFTPServer]
forall a. AgentClient -> (Connection -> IO a) -> AM a
withStore' AgentClient
c (Connection -> NominalDiffTime -> IO [XFTPServer]
`getPendingSndFilesServers` NominalDiffTime
sndFilesTTL)
ReaderT Env IO () -> AM ()
forall (m :: * -> *) a.
Monad m =>
m a -> ExceptT AgentErrorType m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (ReaderT Env IO () -> AM ())
-> ((XFTPServer -> ReaderT Env IO ()) -> ReaderT Env IO ())
-> (XFTPServer -> ReaderT Env IO ())
-> AM ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [XFTPServer]
-> (XFTPServer -> ReaderT Env IO ()) -> ReaderT Env IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [XFTPServer]
pendingSndServers ((XFTPServer -> ReaderT Env IO ()) -> AM ())
-> (XFTPServer -> ReaderT Env IO ()) -> AM ()
forall a b. (a -> b) -> a -> b
$ \XFTPServer
s -> AgentClient -> Maybe XFTPServer -> ReaderT Env IO ()
resumeXFTPSndWork AgentClient
c (XFTPServer -> Maybe XFTPServer
forall a. a -> Maybe a
Just XFTPServer
s)
startDelFiles :: AgentConfig -> AM ()
startDelFiles :: AgentConfig -> AM ()
startDelFiles AgentConfig {NominalDiffTime
$sel:rcvFilesTTL:AgentConfig :: AgentConfig -> NominalDiffTime
rcvFilesTTL :: NominalDiffTime
rcvFilesTTL} = do
[XFTPServer]
pendingDelServers <- AgentClient -> (Connection -> IO [XFTPServer]) -> AM [XFTPServer]
forall a. AgentClient -> (Connection -> IO a) -> AM a
withStore' AgentClient
c (Connection -> NominalDiffTime -> IO [XFTPServer]
`getPendingDelFilesServers` NominalDiffTime
rcvFilesTTL)
ReaderT Env IO () -> AM ()
forall (m :: * -> *) a.
Monad m =>
m a -> ExceptT AgentErrorType m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (ReaderT Env IO () -> AM ())
-> ((XFTPServer -> ReaderT Env IO ()) -> ReaderT Env IO ())
-> (XFTPServer -> ReaderT Env IO ())
-> AM ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [XFTPServer]
-> (XFTPServer -> ReaderT Env IO ()) -> ReaderT Env IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [XFTPServer]
pendingDelServers ((XFTPServer -> ReaderT Env IO ()) -> AM ())
-> (XFTPServer -> ReaderT Env IO ()) -> AM ()
forall a b. (a -> b) -> a -> b
$ AgentClient -> XFTPServer -> ReaderT Env IO ()
resumeXFTPDelWork AgentClient
c
closeXFTPAgent :: XFTPAgent -> IO ()
closeXFTPAgent :: XFTPAgent -> IO ()
closeXFTPAgent XFTPAgent
a = do
TVar (Map (Maybe XFTPServer) Worker) -> IO ()
forall {m :: * -> *} {k}. MonadIO m => TVar (Map k Worker) -> m ()
stopWorkers (TVar (Map (Maybe XFTPServer) Worker) -> IO ())
-> TVar (Map (Maybe XFTPServer) Worker) -> IO ()
forall a b. (a -> b) -> a -> b
$ XFTPAgent -> TVar (Map (Maybe XFTPServer) Worker)
xftpRcvWorkers XFTPAgent
a
TVar (Map (Maybe XFTPServer) Worker) -> IO ()
forall {m :: * -> *} {k}. MonadIO m => TVar (Map k Worker) -> m ()
stopWorkers (TVar (Map (Maybe XFTPServer) Worker) -> IO ())
-> TVar (Map (Maybe XFTPServer) Worker) -> IO ()
forall a b. (a -> b) -> a -> b
$ XFTPAgent -> TVar (Map (Maybe XFTPServer) Worker)
xftpSndWorkers XFTPAgent
a
TVar (Map XFTPServer Worker) -> IO ()
forall {m :: * -> *} {k}. MonadIO m => TVar (Map k Worker) -> m ()
stopWorkers (TVar (Map XFTPServer Worker) -> IO ())
-> TVar (Map XFTPServer Worker) -> IO ()
forall a b. (a -> b) -> a -> b
$ XFTPAgent -> TVar (Map XFTPServer Worker)
xftpDelWorkers XFTPAgent
a
where
stopWorkers :: TVar (Map k Worker) -> m ()
stopWorkers TVar (Map k Worker)
workers = STM (Map k Worker) -> m (Map k Worker)
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (TVar (Map k Worker) -> Map k Worker -> STM (Map k Worker)
forall a. TVar a -> a -> STM a
swapTVar TVar (Map k Worker)
workers Map k Worker
forall k a. Map k a
M.empty) m (Map k Worker) -> (Map k Worker -> m ()) -> m ()
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (Worker -> m ()) -> Map k Worker -> m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> (Worker -> IO ()) -> Worker -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Worker -> IO ()
cancelWorker)
xftpReceiveFile' :: AgentClient -> UserId -> ValidFileDescription 'FRecipient -> Maybe CryptoFileArgs -> Bool -> AM RcvFileId
xftpReceiveFile' :: AgentClient
-> Int64
-> ValidFileDescription 'FRecipient
-> Maybe CryptoFileArgs
-> Bool
-> AM AEntityId
xftpReceiveFile' AgentClient
c Int64
userId (ValidFileDescription fd :: FileDescription 'FRecipient
fd@FileDescription {[FileChunk]
chunks :: [FileChunk]
$sel:chunks:FileDescription :: forall (p :: FileParty). FileDescription p -> [FileChunk]
chunks, Maybe RedirectFileInfo
redirect :: Maybe RedirectFileInfo
$sel:redirect:FileDescription :: forall (p :: FileParty).
FileDescription p -> Maybe RedirectFileInfo
redirect}) Maybe CryptoFileArgs
cfArgs Bool
approvedRelays = do
TVar ChaChaDRG
g <- (Env -> TVar ChaChaDRG)
-> ExceptT AgentErrorType (ReaderT Env IO) (TVar ChaChaDRG)
forall r (m :: * -> *) a. MonadReader r m => (r -> a) -> m a
asks Env -> TVar ChaChaDRG
random
String
prefixPath <- ReaderT Env IO String
-> ExceptT AgentErrorType (ReaderT Env IO) String
forall (m :: * -> *) a.
Monad m =>
m a -> ExceptT AgentErrorType m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (ReaderT Env IO String
-> ExceptT AgentErrorType (ReaderT Env IO) String)
-> ReaderT Env IO String
-> ExceptT AgentErrorType (ReaderT Env IO) String
forall a b. (a -> b) -> a -> b
$ String -> ReaderT Env IO String
getPrefixPath String
"rcv.xftp"
String -> AM ()
forall (m :: * -> *). MonadIO m => String -> m ()
createDirectory String
prefixPath
let relPrefixPath :: String
relPrefixPath = String -> String
takeFileName String
prefixPath
relTmpPath :: String
relTmpPath = String
relPrefixPath String -> String -> String
</> String
"xftp.encrypted"
relSavePath :: String
relSavePath = String
relPrefixPath String -> String -> String
</> String
"xftp.decrypted"
ReaderT Env IO () -> AM ()
forall (m :: * -> *) a.
Monad m =>
m a -> ExceptT AgentErrorType m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (ReaderT Env IO () -> AM ()) -> ReaderT Env IO () -> AM ()
forall a b. (a -> b) -> a -> b
$ String -> ReaderT Env IO ()
forall (m :: * -> *). MonadIO m => String -> m ()
createDirectory (String -> ReaderT Env IO ())
-> ReaderT Env IO String -> ReaderT Env IO ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< String -> ReaderT Env IO String
toFSFilePath String
relTmpPath
ReaderT Env IO () -> AM ()
forall (m :: * -> *) a.
Monad m =>
m a -> ExceptT AgentErrorType m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (ReaderT Env IO () -> AM ()) -> ReaderT Env IO () -> AM ()
forall a b. (a -> b) -> a -> b
$ String -> ReaderT Env IO ()
createEmptyFile (String -> ReaderT Env IO ())
-> ReaderT Env IO String -> ReaderT Env IO ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< String -> ReaderT Env IO String
toFSFilePath String
relSavePath
let saveFile :: CryptoFile
saveFile = String -> Maybe CryptoFileArgs -> CryptoFile
CryptoFile String
relSavePath Maybe CryptoFileArgs
cfArgs
AEntityId
fId <- case Maybe RedirectFileInfo
redirect of
Maybe RedirectFileInfo
Nothing -> AgentClient
-> (Connection -> IO (Either StoreError AEntityId)) -> AM AEntityId
forall a.
AgentClient -> (Connection -> IO (Either StoreError a)) -> AM a
withStore AgentClient
c ((Connection -> IO (Either StoreError AEntityId)) -> AM AEntityId)
-> (Connection -> IO (Either StoreError AEntityId)) -> AM AEntityId
forall a b. (a -> b) -> a -> b
$ \Connection
db -> Connection
-> TVar ChaChaDRG
-> Int64
-> FileDescription 'FRecipient
-> String
-> String
-> CryptoFile
-> Bool
-> IO (Either StoreError AEntityId)
createRcvFile Connection
db TVar ChaChaDRG
g Int64
userId FileDescription 'FRecipient
fd String
relPrefixPath String
relTmpPath CryptoFile
saveFile Bool
approvedRelays
Just RedirectFileInfo
_ -> do
let relTmpPathRedirect :: String
relTmpPathRedirect = String
relPrefixPath String -> String -> String
</> String
"xftp.redirect-encrypted"
relSavePathRedirect :: String
relSavePathRedirect = String
relPrefixPath String -> String -> String
</> String
"xftp.redirect-decrypted"
ReaderT Env IO () -> AM ()
forall (m :: * -> *) a.
Monad m =>
m a -> ExceptT AgentErrorType m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (ReaderT Env IO () -> AM ()) -> ReaderT Env IO () -> AM ()
forall a b. (a -> b) -> a -> b
$ String -> ReaderT Env IO ()
forall (m :: * -> *). MonadIO m => String -> m ()
createDirectory (String -> ReaderT Env IO ())
-> ReaderT Env IO String -> ReaderT Env IO ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< String -> ReaderT Env IO String
toFSFilePath String
relTmpPathRedirect
ReaderT Env IO () -> AM ()
forall (m :: * -> *) a.
Monad m =>
m a -> ExceptT AgentErrorType m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (ReaderT Env IO () -> AM ()) -> ReaderT Env IO () -> AM ()
forall a b. (a -> b) -> a -> b
$ String -> ReaderT Env IO ()
createEmptyFile (String -> ReaderT Env IO ())
-> ReaderT Env IO String -> ReaderT Env IO ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< String -> ReaderT Env IO String
toFSFilePath String
relSavePathRedirect
CryptoFileArgs
cfArgsRedirect <- STM CryptoFileArgs
-> ExceptT AgentErrorType (ReaderT Env IO) CryptoFileArgs
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM CryptoFileArgs
-> ExceptT AgentErrorType (ReaderT Env IO) CryptoFileArgs)
-> STM CryptoFileArgs
-> ExceptT AgentErrorType (ReaderT Env IO) CryptoFileArgs
forall a b. (a -> b) -> a -> b
$ TVar ChaChaDRG -> STM CryptoFileArgs
CF.randomArgs TVar ChaChaDRG
g
let saveFileRedirect :: CryptoFile
saveFileRedirect = String -> Maybe CryptoFileArgs -> CryptoFile
CryptoFile String
relSavePathRedirect (Maybe CryptoFileArgs -> CryptoFile)
-> Maybe CryptoFileArgs -> CryptoFile
forall a b. (a -> b) -> a -> b
$ CryptoFileArgs -> Maybe CryptoFileArgs
forall a. a -> Maybe a
Just CryptoFileArgs
cfArgsRedirect
AgentClient
-> (Connection -> IO (Either StoreError AEntityId)) -> AM AEntityId
forall a.
AgentClient -> (Connection -> IO (Either StoreError a)) -> AM a
withStore AgentClient
c ((Connection -> IO (Either StoreError AEntityId)) -> AM AEntityId)
-> (Connection -> IO (Either StoreError AEntityId)) -> AM AEntityId
forall a b. (a -> b) -> a -> b
$ \Connection
db -> Connection
-> TVar ChaChaDRG
-> Int64
-> FileDescription 'FRecipient
-> String
-> String
-> CryptoFile
-> String
-> CryptoFile
-> Bool
-> IO (Either StoreError AEntityId)
createRcvFileRedirect Connection
db TVar ChaChaDRG
g Int64
userId FileDescription 'FRecipient
fd String
relPrefixPath String
relTmpPathRedirect CryptoFile
saveFileRedirect String
relTmpPath CryptoFile
saveFile Bool
approvedRelays
[FileChunk] -> (FileChunk -> AM ()) -> AM ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [FileChunk]
chunks (AgentClient -> FileChunk -> AM ()
downloadChunk AgentClient
c)
AEntityId -> AM AEntityId
forall a. a -> ExceptT AgentErrorType (ReaderT Env IO) a
forall (f :: * -> *) a. Applicative f => a -> f a
pure AEntityId
fId
downloadChunk :: AgentClient -> FileChunk -> AM ()
downloadChunk :: AgentClient -> FileChunk -> AM ()
downloadChunk AgentClient
c FileChunk {$sel:replicas:FileChunk :: FileChunk -> [FileChunkReplica]
replicas = (FileChunkReplica {XFTPServer
server :: XFTPServer
$sel:server:FileChunkReplica :: FileChunkReplica -> XFTPServer
server} : [FileChunkReplica]
_)} = do
ReaderT Env IO () -> AM ()
forall (m :: * -> *) a.
Monad m =>
m a -> ExceptT AgentErrorType m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (ReaderT Env IO () -> AM ())
-> (AM' Worker -> ReaderT Env IO ()) -> AM' Worker -> AM ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. AM' Worker -> ReaderT Env IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (AM' Worker -> AM ()) -> AM' Worker -> AM ()
forall a b. (a -> b) -> a -> b
$ Bool -> AgentClient -> Maybe XFTPServer -> AM' Worker
getXFTPRcvWorker Bool
True AgentClient
c (XFTPServer -> Maybe XFTPServer
forall a. a -> Maybe a
Just XFTPServer
server)
downloadChunk AgentClient
_ FileChunk
_ = AgentErrorType -> AM ()
forall (m :: * -> *) e a. Monad m => e -> ExceptT e m a
throwE (AgentErrorType -> AM ()) -> AgentErrorType -> AM ()
forall a b. (a -> b) -> a -> b
$ String -> AgentErrorType
INTERNAL String
"no replicas"
getPrefixPath :: String -> AM' FilePath
getPrefixPath :: String -> ReaderT Env IO String
getPrefixPath String
suffix = do
String
workPath <- ReaderT Env IO String
getXFTPWorkPath
UTCTime
ts <- IO UTCTime -> ReaderT Env IO UTCTime
forall a. IO a -> ReaderT Env IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO UTCTime
getCurrentTime
let isoTime :: String
isoTime = TimeLocale -> String -> UTCTime -> String
forall t. FormatTime t => TimeLocale -> String -> t -> String
formatTime TimeLocale
defaultTimeLocale String
"%Y%m%d_%H%M%S_%6q" UTCTime
ts
String -> String -> ReaderT Env IO String
forall (m :: * -> *). MonadIO m => String -> String -> m String
uniqueCombine String
workPath (String
isoTime String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
"_" String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
suffix)
toFSFilePath :: FilePath -> AM' FilePath
toFSFilePath :: String -> ReaderT Env IO String
toFSFilePath String
f = (String -> String -> String
</> String
f) (String -> String)
-> ReaderT Env IO String -> ReaderT Env IO String
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> ReaderT Env IO String
getXFTPWorkPath
createEmptyFile :: FilePath -> AM' ()
createEmptyFile :: String -> ReaderT Env IO ()
createEmptyFile String
fPath = IO () -> ReaderT Env IO ()
forall a. IO a -> ReaderT Env IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ReaderT Env IO ()) -> IO () -> ReaderT Env IO ()
forall a b. (a -> b) -> a -> b
$ String -> AEntityId -> IO ()
B.writeFile String
fPath AEntityId
""
resumeXFTPRcvWork :: AgentClient -> Maybe XFTPServer -> AM' ()
resumeXFTPRcvWork :: AgentClient -> Maybe XFTPServer -> ReaderT Env IO ()
resumeXFTPRcvWork = AM' Worker -> ReaderT Env IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (AM' Worker -> ReaderT Env IO ())
-> (AgentClient -> Maybe XFTPServer -> AM' Worker)
-> AgentClient
-> Maybe XFTPServer
-> ReaderT Env IO ()
forall c d a b. (c -> d) -> (a -> b -> c) -> a -> b -> d
.: Bool -> AgentClient -> Maybe XFTPServer -> AM' Worker
getXFTPRcvWorker Bool
False
getXFTPRcvWorker :: Bool -> AgentClient -> Maybe XFTPServer -> AM' Worker
getXFTPRcvWorker :: Bool -> AgentClient -> Maybe XFTPServer -> AM' Worker
getXFTPRcvWorker Bool
hasWork AgentClient
c Maybe XFTPServer
server = do
TVar (Map (Maybe XFTPServer) Worker)
ws <- (Env -> TVar (Map (Maybe XFTPServer) Worker))
-> ReaderT Env IO (TVar (Map (Maybe XFTPServer) Worker))
forall r (m :: * -> *) a. MonadReader r m => (r -> a) -> m a
asks ((Env -> TVar (Map (Maybe XFTPServer) Worker))
-> ReaderT Env IO (TVar (Map (Maybe XFTPServer) Worker)))
-> (Env -> TVar (Map (Maybe XFTPServer) Worker))
-> ReaderT Env IO (TVar (Map (Maybe XFTPServer) Worker))
forall a b. (a -> b) -> a -> b
$ XFTPAgent -> TVar (Map (Maybe XFTPServer) Worker)
xftpRcvWorkers (XFTPAgent -> TVar (Map (Maybe XFTPServer) Worker))
-> (Env -> XFTPAgent)
-> Env
-> TVar (Map (Maybe XFTPServer) Worker)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Env -> XFTPAgent
xftpAgent
String
-> Bool
-> AgentClient
-> Maybe XFTPServer
-> TVar (Map (Maybe XFTPServer) Worker)
-> (Worker -> AM ())
-> AM' Worker
forall k e (m :: * -> *).
(Ord k, Show k, AnyError e, MonadUnliftIO m) =>
String
-> Bool
-> AgentClient
-> k
-> TMap k Worker
-> (Worker -> ExceptT e m ())
-> m Worker
getAgentWorker String
"xftp_rcv" Bool
hasWork AgentClient
c Maybe XFTPServer
server TVar (Map (Maybe XFTPServer) Worker)
ws ((Worker -> AM ()) -> AM' Worker)
-> (Worker -> AM ()) -> AM' Worker
forall a b. (a -> b) -> a -> b
$
(Worker -> AM ())
-> (XFTPServer -> Worker -> AM ())
-> Maybe XFTPServer
-> Worker
-> AM ()
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (AgentClient -> Worker -> AM ()
runXFTPRcvLocalWorker AgentClient
c) (AgentClient -> XFTPServer -> Worker -> AM ()
runXFTPRcvWorker AgentClient
c) Maybe XFTPServer
server
runXFTPRcvWorker :: AgentClient -> XFTPServer -> Worker -> AM ()
runXFTPRcvWorker :: AgentClient -> XFTPServer -> Worker -> AM ()
runXFTPRcvWorker AgentClient
c XFTPServer
srv Worker {TMVar ()
doWork :: TMVar ()
$sel:doWork:Worker :: Worker -> TMVar ()
doWork} = do
AgentConfig
cfg <- (Env -> AgentConfig)
-> ExceptT AgentErrorType (ReaderT Env IO) AgentConfig
forall r (m :: * -> *) a. MonadReader r m => (r -> a) -> m a
asks Env -> AgentConfig
config
AM () -> AM ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (AM () -> AM ()) -> AM () -> AM ()
forall a b. (a -> b) -> a -> b
$ do
ReaderT Env IO () -> AM ()
forall (m :: * -> *) a.
Monad m =>
m a -> ExceptT AgentErrorType m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (ReaderT Env IO () -> AM ()) -> ReaderT Env IO () -> AM ()
forall a b. (a -> b) -> a -> b
$ TMVar () -> ReaderT Env IO ()
forall (m :: * -> *). MonadIO m => TMVar () -> m ()
waitForWork TMVar ()
doWork
IO () -> AM ()
forall a. IO a -> ExceptT AgentErrorType (ReaderT Env IO) a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> AM ()) -> IO () -> AM ()
forall a b. (a -> b) -> a -> b
$ AgentClient -> IO ()
assertAgentForeground AgentClient
c
AgentConfig -> AM ()
runXFTPOperation AgentConfig
cfg
where
runXFTPOperation :: AgentConfig -> AM ()
runXFTPOperation :: AgentConfig -> AM ()
runXFTPOperation AgentConfig {NominalDiffTime
$sel:rcvFilesTTL:AgentConfig :: AgentConfig -> NominalDiffTime
rcvFilesTTL :: NominalDiffTime
rcvFilesTTL, $sel:reconnectInterval:AgentConfig :: AgentConfig -> RetryInterval
reconnectInterval = RetryInterval
ri, Int
xftpConsecutiveRetries :: Int
$sel:xftpConsecutiveRetries:AgentConfig :: AgentConfig -> Int
xftpConsecutiveRetries} =
AgentClient
-> TMVar ()
-> (Connection
-> IO
(Either StoreError (Maybe (RcvFileChunk, Bool, Maybe AEntityId))))
-> ((RcvFileChunk, Bool, Maybe AEntityId) -> AM ())
-> AM ()
forall a.
AgentClient
-> TMVar ()
-> (Connection -> IO (Either StoreError (Maybe a)))
-> (a -> AM ())
-> AM ()
withWork AgentClient
c TMVar ()
doWork (\Connection
db -> Connection
-> XFTPServer
-> NominalDiffTime
-> IO
(Either StoreError (Maybe (RcvFileChunk, Bool, Maybe AEntityId)))
getNextRcvChunkToDownload Connection
db XFTPServer
srv NominalDiffTime
rcvFilesTTL) (((RcvFileChunk, Bool, Maybe AEntityId) -> AM ()) -> AM ())
-> ((RcvFileChunk, Bool, Maybe AEntityId) -> AM ()) -> AM ()
forall a b. (a -> b) -> a -> b
$ \case
(RcvFileChunk {Int64
rcvFileId :: Int64
$sel:rcvFileId:RcvFileChunk :: RcvFileChunk -> Int64
rcvFileId, AEntityId
rcvFileEntityId :: AEntityId
$sel:rcvFileEntityId:RcvFileChunk :: RcvFileChunk -> AEntityId
rcvFileEntityId, String
fileTmpPath :: String
$sel:fileTmpPath:RcvFileChunk :: RcvFileChunk -> String
fileTmpPath, $sel:replicas:RcvFileChunk :: RcvFileChunk -> [RcvFileChunkReplica]
replicas = []}, Bool
_, Maybe AEntityId
redirectEntityId_) ->
AgentClient
-> Int64
-> AEntityId
-> Maybe AEntityId
-> Maybe String
-> AgentErrorType
-> AM ()
rcvWorkerInternalError AgentClient
c Int64
rcvFileId AEntityId
rcvFileEntityId Maybe AEntityId
redirectEntityId_ (String -> Maybe String
forall a. a -> Maybe a
Just String
fileTmpPath) (String -> AgentErrorType
INTERNAL String
"chunk has no replicas")
(fc :: RcvFileChunk
fc@RcvFileChunk {Int64
userId :: Int64
$sel:userId:RcvFileChunk :: RcvFileChunk -> Int64
userId, Int64
$sel:rcvFileId:RcvFileChunk :: RcvFileChunk -> Int64
rcvFileId :: Int64
rcvFileId, AEntityId
$sel:rcvFileEntityId:RcvFileChunk :: RcvFileChunk -> AEntityId
rcvFileEntityId :: AEntityId
rcvFileEntityId, FileDigest
digest :: FileDigest
$sel:digest:RcvFileChunk :: RcvFileChunk -> FileDigest
digest, String
$sel:fileTmpPath:RcvFileChunk :: RcvFileChunk -> String
fileTmpPath :: String
fileTmpPath, $sel:replicas:RcvFileChunk :: RcvFileChunk -> [RcvFileChunkReplica]
replicas = replica :: RcvFileChunkReplica
replica@RcvFileChunkReplica {Int64
rcvChunkReplicaId :: Int64
$sel:rcvChunkReplicaId:RcvFileChunkReplica :: RcvFileChunkReplica -> Int64
rcvChunkReplicaId, XFTPServer
server :: XFTPServer
$sel:server:RcvFileChunkReplica :: RcvFileChunkReplica -> XFTPServer
server, Maybe Int64
delay :: Maybe Int64
$sel:delay:RcvFileChunkReplica :: RcvFileChunkReplica -> Maybe Int64
delay} : [RcvFileChunkReplica]
_}, Bool
approvedRelays, Maybe AEntityId
redirectEntityId_) -> do
let ri' :: RetryInterval
ri' = RetryInterval
-> (Int64 -> RetryInterval) -> Maybe Int64 -> RetryInterval
forall b a. b -> (a -> b) -> Maybe a -> b
maybe RetryInterval
ri (\Int64
d -> RetryInterval
ri {initialInterval = d, increaseAfter = 0}) Maybe Int64
delay
Int -> RetryInterval -> (Int64 -> AM () -> AM ()) -> AM ()
forall (m :: * -> *).
MonadIO m =>
Int -> RetryInterval -> (Int64 -> m () -> m ()) -> m ()
withRetryIntervalLimit Int
xftpConsecutiveRetries RetryInterval
ri' ((Int64 -> AM () -> AM ()) -> AM ())
-> (Int64 -> AM () -> AM ()) -> AM ()
forall a b. (a -> b) -> a -> b
$ \Int64
delay' AM ()
loop -> do
IO () -> AM ()
forall a. IO a -> ExceptT AgentErrorType (ReaderT Env IO) a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> AM ()) -> IO () -> AM ()
forall a b. (a -> b) -> a -> b
$ AgentClient -> IO ()
waitWhileSuspended AgentClient
c
IO () -> AM ()
forall a. IO a -> ExceptT AgentErrorType (ReaderT Env IO) a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> AM ()) -> IO () -> AM ()
forall a b. (a -> b) -> a -> b
$ AgentClient -> IO ()
waitForUserNetwork AgentClient
c
STM () -> AM ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> AM ()) -> STM () -> AM ()
forall a b. (a -> b) -> a -> b
$ AgentClient
-> Int64
-> XFTPServer
-> (AgentXFTPServerStats -> TVar Int)
-> STM ()
incXFTPServerStat AgentClient
c Int64
userId XFTPServer
srv AgentXFTPServerStats -> TVar Int
downloadAttempts
RcvFileChunk -> RcvFileChunkReplica -> Bool -> AM ()
downloadFileChunk RcvFileChunk
fc RcvFileChunkReplica
replica Bool
approvedRelays
AM () -> (AgentErrorType -> AM ()) -> AM ()
forall e (m :: * -> *) a.
(AnyError e, MonadUnliftIO m) =>
ExceptT e m a -> (e -> ExceptT e m a) -> ExceptT e m a
`catchAllErrors` \AgentErrorType
e -> Text -> AM () -> AM () -> AgentErrorType -> AM ()
forall a. Text -> AM a -> AM a -> AgentErrorType -> AM a
retryOnError Text
"XFTP rcv worker" (AM () -> AgentErrorType -> Int64 -> AM ()
forall {b}.
ExceptT AgentErrorType (ReaderT Env IO) b
-> AgentErrorType
-> Int64
-> ExceptT AgentErrorType (ReaderT Env IO) b
retryLoop AM ()
loop AgentErrorType
e Int64
delay') (AgentErrorType -> AM ()
retryDone AgentErrorType
e) AgentErrorType
e
where
retryLoop :: ExceptT AgentErrorType (ReaderT Env IO) b
-> AgentErrorType
-> Int64
-> ExceptT AgentErrorType (ReaderT Env IO) b
retryLoop ExceptT AgentErrorType (ReaderT Env IO) b
loop AgentErrorType
e Int64
replicaDelay = do
(AM () -> (AgentErrorType -> AM ()) -> AM ())
-> (AgentErrorType -> AM ()) -> AM () -> AM ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip AM () -> (AgentErrorType -> AM ()) -> AM ()
forall e (m :: * -> *) a.
(AnyError e, MonadUnliftIO m) =>
ExceptT e m a -> (e -> ExceptT e m a) -> ExceptT e m a
catchAllErrors (\AgentErrorType
_ -> () -> AM ()
forall a. a -> ExceptT AgentErrorType (ReaderT Env IO) a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()) (AM () -> AM ()) -> AM () -> AM ()
forall a b. (a -> b) -> a -> b
$ do
Bool -> AM () -> AM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (AgentErrorType -> Bool
serverHostError AgentErrorType
e) (AM () -> AM ()) -> AM () -> AM ()
forall a b. (a -> b) -> a -> b
$ AgentClient -> AEntityId -> AEvent 'AERcvFile -> AM ()
forall (m :: * -> *) (e :: AEntity).
(MonadIO m, AEntityI e) =>
AgentClient -> AEntityId -> AEvent e -> m ()
notify AgentClient
c (AEntityId -> Maybe AEntityId -> AEntityId
forall a. a -> Maybe a -> a
fromMaybe AEntityId
rcvFileEntityId Maybe AEntityId
redirectEntityId_) (AgentErrorType -> AEvent 'AERcvFile
RFWARN AgentErrorType
e)
IO () -> AM ()
forall a. IO a -> ExceptT AgentErrorType (ReaderT Env IO) a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> AM ()) -> IO () -> AM ()
forall a b. (a -> b) -> a -> b
$ AgentClient -> Int64 -> XFTPServer -> FileDigest -> IO ()
closeXFTPServerClient AgentClient
c Int64
userId XFTPServer
server FileDigest
digest
AgentClient -> (Connection -> IO ()) -> AM ()
forall a. AgentClient -> (Connection -> IO a) -> AM a
withStore' AgentClient
c ((Connection -> IO ()) -> AM ()) -> (Connection -> IO ()) -> AM ()
forall a b. (a -> b) -> a -> b
$ \Connection
db -> Connection -> Int64 -> Int64 -> IO ()
updateRcvChunkReplicaDelay Connection
db Int64
rcvChunkReplicaId Int64
replicaDelay
IO () -> AM ()
forall a. IO a -> ExceptT AgentErrorType (ReaderT Env IO) a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> AM ()) -> IO () -> AM ()
forall a b. (a -> b) -> a -> b
$ AgentClient -> IO ()
assertAgentForeground AgentClient
c
ExceptT AgentErrorType (ReaderT Env IO) b
loop
retryDone :: AgentErrorType -> AM ()
retryDone AgentErrorType
e = do
STM () -> AM ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> AM ())
-> ((AgentXFTPServerStats -> TVar Int) -> STM ())
-> (AgentXFTPServerStats -> TVar Int)
-> AM ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. AgentClient
-> Int64
-> XFTPServer
-> (AgentXFTPServerStats -> TVar Int)
-> STM ()
incXFTPServerStat AgentClient
c Int64
userId XFTPServer
srv ((AgentXFTPServerStats -> TVar Int) -> AM ())
-> (AgentXFTPServerStats -> TVar Int) -> AM ()
forall a b. (a -> b) -> a -> b
$ case AgentErrorType
e of
XFTP String
_ XFTPErrorType
XFTP.AUTH -> AgentXFTPServerStats -> TVar Int
downloadAuthErrs
AgentErrorType
_ -> AgentXFTPServerStats -> TVar Int
downloadErrs
AgentClient
-> Int64
-> AEntityId
-> Maybe AEntityId
-> Maybe String
-> AgentErrorType
-> AM ()
rcvWorkerInternalError AgentClient
c Int64
rcvFileId AEntityId
rcvFileEntityId Maybe AEntityId
redirectEntityId_ (String -> Maybe String
forall a. a -> Maybe a
Just String
fileTmpPath) AgentErrorType
e
downloadFileChunk :: RcvFileChunk -> RcvFileChunkReplica -> Bool -> AM ()
downloadFileChunk :: RcvFileChunk -> RcvFileChunkReplica -> Bool -> AM ()
downloadFileChunk RcvFileChunk {Int64
$sel:userId:RcvFileChunk :: RcvFileChunk -> Int64
userId :: Int64
userId, Int64
$sel:rcvFileId:RcvFileChunk :: RcvFileChunk -> Int64
rcvFileId :: Int64
rcvFileId, AEntityId
$sel:rcvFileEntityId:RcvFileChunk :: RcvFileChunk -> AEntityId
rcvFileEntityId :: AEntityId
rcvFileEntityId, Int64
rcvChunkId :: Int64
$sel:rcvChunkId:RcvFileChunk :: RcvFileChunk -> Int64
rcvChunkId, Int
chunkNo :: Int
$sel:chunkNo:RcvFileChunk :: RcvFileChunk -> Int
chunkNo, FileSize Word32
chunkSize :: FileSize Word32
$sel:chunkSize:RcvFileChunk :: RcvFileChunk -> FileSize Word32
chunkSize, FileDigest
$sel:digest:RcvFileChunk :: RcvFileChunk -> FileDigest
digest :: FileDigest
digest, String
$sel:fileTmpPath:RcvFileChunk :: RcvFileChunk -> String
fileTmpPath :: String
fileTmpPath} RcvFileChunkReplica
replica Bool
approvedRelays = do
ExceptT AgentErrorType (ReaderT Env IO) Bool -> AM () -> AM ()
forall (m :: * -> *). Monad m => m Bool -> m () -> m ()
unlessM ((Bool
approvedRelays Bool -> Bool -> Bool
||) (Bool -> Bool)
-> ExceptT AgentErrorType (ReaderT Env IO) Bool
-> ExceptT AgentErrorType (ReaderT Env IO) Bool
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> ExceptT AgentErrorType (ReaderT Env IO) Bool
ipAddressProtected') (AM () -> AM ()) -> AM () -> AM ()
forall a b. (a -> b) -> a -> b
$ AgentErrorType -> AM ()
forall (m :: * -> *) e a. Monad m => e -> ExceptT e m a
throwE (AgentErrorType -> AM ()) -> AgentErrorType -> AM ()
forall a b. (a -> b) -> a -> b
$ FileErrorType -> AgentErrorType
FILE FileErrorType
NOT_APPROVED
String
fsFileTmpPath <- ReaderT Env IO String
-> ExceptT AgentErrorType (ReaderT Env IO) String
forall (m :: * -> *) a.
Monad m =>
m a -> ExceptT AgentErrorType m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (ReaderT Env IO String
-> ExceptT AgentErrorType (ReaderT Env IO) String)
-> ReaderT Env IO String
-> ExceptT AgentErrorType (ReaderT Env IO) String
forall a b. (a -> b) -> a -> b
$ String -> ReaderT Env IO String
toFSFilePath String
fileTmpPath
String
chunkPath <- String -> String -> ExceptT AgentErrorType (ReaderT Env IO) String
forall (m :: * -> *). MonadIO m => String -> String -> m String
uniqueCombine String
fsFileTmpPath (String -> ExceptT AgentErrorType (ReaderT Env IO) String)
-> String -> ExceptT AgentErrorType (ReaderT Env IO) String
forall a b. (a -> b) -> a -> b
$ Int -> String
forall a. Show a => a -> String
show Int
chunkNo
let chSize :: Word32
chSize = FileSize Word32 -> Word32
forall a. FileSize a -> a
unFileSize FileSize Word32
chunkSize
chunkSpec :: XFTPRcvChunkSpec
chunkSpec = String -> Word32 -> AEntityId -> XFTPRcvChunkSpec
XFTPRcvChunkSpec String
chunkPath Word32
chSize (FileDigest -> AEntityId
unFileDigest FileDigest
digest)
relChunkPath :: String
relChunkPath = String
fileTmpPath String -> String -> String
</> String -> String
takeFileName String
chunkPath
AgentClient
-> Int64
-> FileDigest
-> RcvFileChunkReplica
-> XFTPRcvChunkSpec
-> AM ()
agentXFTPDownloadChunk AgentClient
c Int64
userId FileDigest
digest RcvFileChunkReplica
replica XFTPRcvChunkSpec
chunkSpec
IO () -> AM ()
forall a. IO a -> ExceptT AgentErrorType (ReaderT Env IO) a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> AM ()) -> IO () -> AM ()
forall a b. (a -> b) -> a -> b
$ AgentClient -> IO ()
waitUntilForeground AgentClient
c
(AEntityId
entityId, Bool
complete, AEvent 'AERcvFile
progress) <- AgentClient
-> (Connection
-> IO (Either StoreError (AEntityId, Bool, AEvent 'AERcvFile)))
-> AM (AEntityId, Bool, AEvent 'AERcvFile)
forall a.
AgentClient -> (Connection -> IO (Either StoreError a)) -> AM a
withStore AgentClient
c ((Connection
-> IO (Either StoreError (AEntityId, Bool, AEvent 'AERcvFile)))
-> AM (AEntityId, Bool, AEvent 'AERcvFile))
-> (Connection
-> IO (Either StoreError (AEntityId, Bool, AEvent 'AERcvFile)))
-> AM (AEntityId, Bool, AEvent 'AERcvFile)
forall a b. (a -> b) -> a -> b
$ \Connection
db -> ExceptT StoreError IO (AEntityId, Bool, AEvent 'AERcvFile)
-> IO (Either StoreError (AEntityId, Bool, AEvent 'AERcvFile))
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (ExceptT StoreError IO (AEntityId, Bool, AEvent 'AERcvFile)
-> IO (Either StoreError (AEntityId, Bool, AEvent 'AERcvFile)))
-> ExceptT StoreError IO (AEntityId, Bool, AEvent 'AERcvFile)
-> IO (Either StoreError (AEntityId, Bool, AEvent 'AERcvFile))
forall a b. (a -> b) -> a -> b
$ do
IO () -> ExceptT StoreError IO ()
forall a. IO a -> ExceptT StoreError IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ExceptT StoreError IO ())
-> IO () -> ExceptT StoreError IO ()
forall a b. (a -> b) -> a -> b
$ Connection -> Int64 -> IO ()
lockRcvFileForUpdate Connection
db Int64
rcvFileId
IO () -> ExceptT StoreError IO ()
forall a. IO a -> ExceptT StoreError IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ExceptT StoreError IO ())
-> IO () -> ExceptT StoreError IO ()
forall a b. (a -> b) -> a -> b
$ Connection -> Int64 -> Int64 -> String -> IO ()
updateRcvFileChunkReceived Connection
db (RcvFileChunkReplica -> Int64
rcvChunkReplicaId RcvFileChunkReplica
replica) Int64
rcvChunkId String
relChunkPath
RcvFile {$sel:size:RcvFile :: RcvFile -> FileSize Int64
size = FileSize Int64
currentSize, [RcvFileChunk]
chunks :: [RcvFileChunk]
$sel:chunks:RcvFile :: RcvFile -> [RcvFileChunk]
chunks, Maybe RcvFileRedirect
redirect :: Maybe RcvFileRedirect
$sel:redirect:RcvFile :: RcvFile -> Maybe RcvFileRedirect
redirect} <- IO (Either StoreError RcvFile) -> ExceptT StoreError IO RcvFile
forall e (m :: * -> *) a. m (Either e a) -> ExceptT e m a
ExceptT (IO (Either StoreError RcvFile) -> ExceptT StoreError IO RcvFile)
-> IO (Either StoreError RcvFile) -> ExceptT StoreError IO RcvFile
forall a b. (a -> b) -> a -> b
$ Connection -> Int64 -> IO (Either StoreError RcvFile)
getRcvFile Connection
db Int64
rcvFileId
let rcvd :: Int64
rcvd = [RcvFileChunk] -> Int64
receivedSize [RcvFileChunk]
chunks
complete :: Bool
complete = (RcvFileChunk -> Bool) -> [RcvFileChunk] -> Bool
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Bool
all RcvFileChunk -> Bool
chunkReceived [RcvFileChunk]
chunks
(AEntityId
entityId, Int64
total) = case Maybe RcvFileRedirect
redirect of
Maybe RcvFileRedirect
Nothing -> (AEntityId
rcvFileEntityId, Int64
currentSize)
Just RcvFileRedirect {$sel:redirectFileInfo:RcvFileRedirect :: RcvFileRedirect -> RedirectFileInfo
redirectFileInfo = RedirectFileInfo {$sel:size:RedirectFileInfo :: RedirectFileInfo -> FileSize Int64
size = FileSize Int64
finalSize}, AEntityId
redirectEntityId :: AEntityId
$sel:redirectEntityId:RcvFileRedirect :: RcvFileRedirect -> AEntityId
redirectEntityId} -> (AEntityId
redirectEntityId, Int64
finalSize)
IO () -> ExceptT StoreError IO ()
forall a. IO a -> ExceptT StoreError IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ExceptT StoreError IO ())
-> (IO () -> IO ()) -> IO () -> ExceptT StoreError IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
complete (IO () -> ExceptT StoreError IO ())
-> IO () -> ExceptT StoreError IO ()
forall a b. (a -> b) -> a -> b
$ Connection -> Int64 -> RcvFileStatus -> IO ()
updateRcvFileStatus Connection
db Int64
rcvFileId RcvFileStatus
RFSReceived
(AEntityId, Bool, AEvent 'AERcvFile)
-> ExceptT StoreError IO (AEntityId, Bool, AEvent 'AERcvFile)
forall a. a -> ExceptT StoreError IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (AEntityId
entityId, Bool
complete, Int64 -> Int64 -> AEvent 'AERcvFile
RFPROG Int64
rcvd Int64
total)
STM () -> AM ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> AM ()) -> STM () -> AM ()
forall a b. (a -> b) -> a -> b
$ AgentClient
-> Int64
-> XFTPServer
-> (AgentXFTPServerStats -> TVar Int)
-> STM ()
incXFTPServerStat AgentClient
c Int64
userId XFTPServer
srv AgentXFTPServerStats -> TVar Int
downloads
STM () -> AM ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> AM ()) -> STM () -> AM ()
forall a b. (a -> b) -> a -> b
$ AgentClient
-> Int64
-> XFTPServer
-> (AgentXFTPServerStats -> TVar Int64)
-> Int64
-> STM ()
incXFTPServerSizeStat AgentClient
c Int64
userId XFTPServer
srv AgentXFTPServerStats -> TVar Int64
downloadsSize (Word32 -> Int64
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Word32 -> Int64) -> Word32 -> Int64
forall a b. (a -> b) -> a -> b
$ Word32 -> Word32
forall a. Integral a => a -> a
toKB Word32
chSize)
AgentClient -> AEntityId -> AEvent 'AERcvFile -> AM ()
forall (m :: * -> *) (e :: AEntity).
(MonadIO m, AEntityI e) =>
AgentClient -> AEntityId -> AEvent e -> m ()
notify AgentClient
c AEntityId
entityId AEvent 'AERcvFile
progress
Bool -> AM () -> AM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
complete (AM () -> AM ()) -> (AM' Worker -> AM ()) -> AM' Worker -> AM ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ReaderT Env IO () -> AM ()
forall (m :: * -> *) a.
Monad m =>
m a -> ExceptT AgentErrorType m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (ReaderT Env IO () -> AM ())
-> (AM' Worker -> ReaderT Env IO ()) -> AM' Worker -> AM ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. AM' Worker -> ReaderT Env IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (AM' Worker -> AM ()) -> AM' Worker -> AM ()
forall a b. (a -> b) -> a -> b
$
Bool -> AgentClient -> Maybe XFTPServer -> AM' Worker
getXFTPRcvWorker Bool
True AgentClient
c Maybe XFTPServer
forall a. Maybe a
Nothing
where
ipAddressProtected' :: AM Bool
ipAddressProtected' :: ExceptT AgentErrorType (ReaderT Env IO) Bool
ipAddressProtected' = do
NetworkConfig
cfg <- IO NetworkConfig
-> ExceptT AgentErrorType (ReaderT Env IO) NetworkConfig
forall a. IO a -> ExceptT AgentErrorType (ReaderT Env IO) a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO NetworkConfig
-> ExceptT AgentErrorType (ReaderT Env IO) NetworkConfig)
-> IO NetworkConfig
-> ExceptT AgentErrorType (ReaderT Env IO) NetworkConfig
forall a b. (a -> b) -> a -> b
$ AgentClient -> IO NetworkConfig
getFastNetworkConfig AgentClient
c
Bool -> ExceptT AgentErrorType (ReaderT Env IO) Bool
forall a. a -> ExceptT AgentErrorType (ReaderT Env IO) a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Bool -> ExceptT AgentErrorType (ReaderT Env IO) Bool)
-> Bool -> ExceptT AgentErrorType (ReaderT Env IO) Bool
forall a b. (a -> b) -> a -> b
$ NetworkConfig -> XFTPServer -> Bool
forall (p :: ProtocolType).
NetworkConfig -> ProtocolServer p -> Bool
ipAddressProtected NetworkConfig
cfg XFTPServer
srv
receivedSize :: [RcvFileChunk] -> Int64
receivedSize :: [RcvFileChunk] -> Int64
receivedSize = (Int64 -> RcvFileChunk -> Int64)
-> Int64 -> [RcvFileChunk] -> Int64
forall b a. (b -> a -> b) -> b -> [a] -> b
forall (t :: * -> *) b a.
Foldable t =>
(b -> a -> b) -> b -> t a -> b
foldl' (\Int64
sz RcvFileChunk
ch -> Int64
sz Int64 -> Int64 -> Int64
forall a. Num a => a -> a -> a
+ RcvFileChunk -> Int64
forall {b}. Num b => RcvFileChunk -> b
receivedChunkSize RcvFileChunk
ch) Int64
0
receivedChunkSize :: RcvFileChunk -> b
receivedChunkSize ch :: RcvFileChunk
ch@RcvFileChunk {$sel:chunkSize:RcvFileChunk :: RcvFileChunk -> FileSize Word32
chunkSize = FileSize Word32
s}
| RcvFileChunk -> Bool
chunkReceived RcvFileChunk
ch = Word32 -> b
forall a b. (Integral a, Num b) => a -> b
fromIntegral (FileSize Word32 -> Word32
forall a. FileSize a -> a
unFileSize FileSize Word32
s)
| Bool
otherwise = b
0
chunkReceived :: RcvFileChunk -> Bool
chunkReceived RcvFileChunk {[RcvFileChunkReplica]
$sel:replicas:RcvFileChunk :: RcvFileChunk -> [RcvFileChunkReplica]
replicas :: [RcvFileChunkReplica]
replicas} = (RcvFileChunkReplica -> Bool) -> [RcvFileChunkReplica] -> Bool
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Bool
any RcvFileChunkReplica -> Bool
received [RcvFileChunkReplica]
replicas
withRetryIntervalLimit :: forall m. MonadIO m => Int -> RetryInterval -> (Int64 -> m () -> m ()) -> m ()
withRetryIntervalLimit :: forall (m :: * -> *).
MonadIO m =>
Int -> RetryInterval -> (Int64 -> m () -> m ()) -> m ()
withRetryIntervalLimit Int
maxN RetryInterval
ri Int64 -> m () -> m ()
action =
RetryInterval -> (Int -> Int64 -> m () -> m ()) -> m ()
forall (m :: * -> *) a.
MonadIO m =>
RetryInterval -> (Int -> Int64 -> m a -> m a) -> m a
withRetryIntervalCount RetryInterval
ri ((Int -> Int64 -> m () -> m ()) -> m ())
-> (Int -> Int64 -> m () -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \Int
n Int64
delay m ()
loop ->
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
n Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Int
maxN) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ Int64 -> m () -> m ()
action Int64
delay m ()
loop
retryOnError :: Text -> AM a -> AM a -> AgentErrorType -> AM a
retryOnError :: forall a. Text -> AM a -> AM a -> AgentErrorType -> AM a
retryOnError Text
name AM a
loop AM a
done AgentErrorType
e = do
Text -> AM ()
forall (m :: * -> *).
(?callStack::CallStack, MonadIO m) =>
Text -> m ()
logError (Text -> AM ()) -> Text -> AM ()
forall a b. (a -> b) -> a -> b
$ Text
name Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" error: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> AgentErrorType -> Text
forall a. Show a => a -> Text
tshow AgentErrorType
e
if AgentErrorType -> Bool
temporaryOrHostError AgentErrorType
e
then AM a
loop
else AM a
done
rcvWorkerInternalError :: AgentClient -> DBRcvFileId -> RcvFileId -> Maybe RcvFileId -> Maybe FilePath -> AgentErrorType -> AM ()
rcvWorkerInternalError :: AgentClient
-> Int64
-> AEntityId
-> Maybe AEntityId
-> Maybe String
-> AgentErrorType
-> AM ()
rcvWorkerInternalError AgentClient
c Int64
rcvFileId AEntityId
rcvFileEntityId Maybe AEntityId
redirectEntityId_ Maybe String
tmpPath AgentErrorType
err = do
ReaderT Env IO () -> AM ()
forall (m :: * -> *) a.
Monad m =>
m a -> ExceptT AgentErrorType m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (ReaderT Env IO () -> AM ()) -> ReaderT Env IO () -> AM ()
forall a b. (a -> b) -> a -> b
$ Maybe String -> (String -> ReaderT Env IO ()) -> ReaderT Env IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ Maybe String
tmpPath (String -> ReaderT Env IO ()
forall (m :: * -> *). MonadIO m => String -> m ()
removePath (String -> ReaderT Env IO ())
-> (String -> ReaderT Env IO String) -> String -> ReaderT Env IO ()
forall (m :: * -> *) b c a.
Monad m =>
(b -> m c) -> (a -> m b) -> a -> m c
<=< String -> ReaderT Env IO String
toFSFilePath)
AgentClient -> (Connection -> IO ()) -> AM ()
forall a. AgentClient -> (Connection -> IO a) -> AM a
withStore' AgentClient
c ((Connection -> IO ()) -> AM ()) -> (Connection -> IO ()) -> AM ()
forall a b. (a -> b) -> a -> b
$ \Connection
db -> Connection -> Int64 -> String -> IO ()
updateRcvFileError Connection
db Int64
rcvFileId (AgentErrorType -> String
forall a. Show a => a -> String
show AgentErrorType
err)
AgentClient -> AEntityId -> AEvent 'AERcvFile -> AM ()
forall (m :: * -> *) (e :: AEntity).
(MonadIO m, AEntityI e) =>
AgentClient -> AEntityId -> AEvent e -> m ()
notify AgentClient
c (AEntityId -> Maybe AEntityId -> AEntityId
forall a. a -> Maybe a -> a
fromMaybe AEntityId
rcvFileEntityId Maybe AEntityId
redirectEntityId_) (AgentErrorType -> AEvent 'AERcvFile
RFERR AgentErrorType
err)
runXFTPRcvLocalWorker :: AgentClient -> Worker -> AM ()
runXFTPRcvLocalWorker :: AgentClient -> Worker -> AM ()
runXFTPRcvLocalWorker AgentClient
c Worker {TMVar ()
$sel:doWork:Worker :: Worker -> TMVar ()
doWork :: TMVar ()
doWork} = do
AgentConfig
cfg <- (Env -> AgentConfig)
-> ExceptT AgentErrorType (ReaderT Env IO) AgentConfig
forall r (m :: * -> *) a. MonadReader r m => (r -> a) -> m a
asks Env -> AgentConfig
config
AM () -> AM ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (AM () -> AM ()) -> AM () -> AM ()
forall a b. (a -> b) -> a -> b
$ do
ReaderT Env IO () -> AM ()
forall (m :: * -> *) a.
Monad m =>
m a -> ExceptT AgentErrorType m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (ReaderT Env IO () -> AM ()) -> ReaderT Env IO () -> AM ()
forall a b. (a -> b) -> a -> b
$ TMVar () -> ReaderT Env IO ()
forall (m :: * -> *). MonadIO m => TMVar () -> m ()
waitForWork TMVar ()
doWork
IO () -> AM ()
forall a. IO a -> ExceptT AgentErrorType (ReaderT Env IO) a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> AM ()) -> IO () -> AM ()
forall a b. (a -> b) -> a -> b
$ AgentClient -> IO ()
assertAgentForeground AgentClient
c
AgentConfig -> AM ()
runXFTPOperation AgentConfig
cfg
where
runXFTPOperation :: AgentConfig -> AM ()
runXFTPOperation :: AgentConfig -> AM ()
runXFTPOperation AgentConfig {NominalDiffTime
$sel:rcvFilesTTL:AgentConfig :: AgentConfig -> NominalDiffTime
rcvFilesTTL :: NominalDiffTime
rcvFilesTTL} =
AgentClient
-> TMVar ()
-> (Connection -> IO (Either StoreError (Maybe RcvFile)))
-> (RcvFile -> AM ())
-> AM ()
forall a.
AgentClient
-> TMVar ()
-> (Connection -> IO (Either StoreError (Maybe a)))
-> (a -> AM ())
-> AM ()
withWork AgentClient
c TMVar ()
doWork (Connection
-> NominalDiffTime -> IO (Either StoreError (Maybe RcvFile))
`getNextRcvFileToDecrypt` NominalDiffTime
rcvFilesTTL) ((RcvFile -> AM ()) -> AM ()) -> (RcvFile -> AM ()) -> AM ()
forall a b. (a -> b) -> a -> b
$
\f :: RcvFile
f@RcvFile {Int64
rcvFileId :: Int64
$sel:rcvFileId:RcvFile :: RcvFile -> Int64
rcvFileId, AEntityId
rcvFileEntityId :: AEntityId
$sel:rcvFileEntityId:RcvFile :: RcvFile -> AEntityId
rcvFileEntityId, Maybe String
tmpPath :: Maybe String
$sel:tmpPath:RcvFile :: RcvFile -> Maybe String
tmpPath, Maybe RcvFileRedirect
$sel:redirect:RcvFile :: RcvFile -> Maybe RcvFileRedirect
redirect :: Maybe RcvFileRedirect
redirect} ->
RcvFile -> AM ()
decryptFile RcvFile
f AM () -> (AgentErrorType -> AM ()) -> AM ()
forall e (m :: * -> *) a.
(AnyError e, MonadUnliftIO m) =>
ExceptT e m a -> (e -> ExceptT e m a) -> ExceptT e m a
`catchAllErrors` AgentClient
-> Int64
-> AEntityId
-> Maybe AEntityId
-> Maybe String
-> AgentErrorType
-> AM ()
rcvWorkerInternalError AgentClient
c Int64
rcvFileId AEntityId
rcvFileEntityId (RcvFileRedirect -> AEntityId
redirectEntityId (RcvFileRedirect -> AEntityId)
-> Maybe RcvFileRedirect -> Maybe AEntityId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe RcvFileRedirect
redirect) Maybe String
tmpPath
decryptFile :: RcvFile -> AM ()
decryptFile :: RcvFile -> AM ()
decryptFile RcvFile {Int64
$sel:rcvFileId:RcvFile :: RcvFile -> Int64
rcvFileId :: Int64
rcvFileId, AEntityId
$sel:rcvFileEntityId:RcvFile :: RcvFile -> AEntityId
rcvFileEntityId :: AEntityId
rcvFileEntityId, FileSize Int64
$sel:size:RcvFile :: RcvFile -> FileSize Int64
size :: FileSize Int64
size, FileDigest
digest :: FileDigest
$sel:digest:RcvFile :: RcvFile -> FileDigest
digest, SbKey
key :: SbKey
$sel:key:RcvFile :: RcvFile -> SbKey
key, CbNonce
nonce :: CbNonce
$sel:nonce:RcvFile :: RcvFile -> CbNonce
nonce, Maybe String
$sel:tmpPath:RcvFile :: RcvFile -> Maybe String
tmpPath :: Maybe String
tmpPath, CryptoFile
saveFile :: CryptoFile
$sel:saveFile:RcvFile :: RcvFile -> CryptoFile
saveFile, RcvFileStatus
status :: RcvFileStatus
$sel:status:RcvFile :: RcvFile -> RcvFileStatus
status, [RcvFileChunk]
$sel:chunks:RcvFile :: RcvFile -> [RcvFileChunk]
chunks :: [RcvFileChunk]
chunks, Maybe RcvFileRedirect
$sel:redirect:RcvFile :: RcvFile -> Maybe RcvFileRedirect
redirect :: Maybe RcvFileRedirect
redirect} = do
let CryptoFile String
savePath Maybe CryptoFileArgs
cfArgs = CryptoFile
saveFile
String
fsSavePath <- ReaderT Env IO String
-> ExceptT AgentErrorType (ReaderT Env IO) String
forall (m :: * -> *) a.
Monad m =>
m a -> ExceptT AgentErrorType m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (ReaderT Env IO String
-> ExceptT AgentErrorType (ReaderT Env IO) String)
-> ReaderT Env IO String
-> ExceptT AgentErrorType (ReaderT Env IO) String
forall a b. (a -> b) -> a -> b
$ String -> ReaderT Env IO String
toFSFilePath String
savePath
ReaderT Env IO () -> AM ()
forall (m :: * -> *) a.
Monad m =>
m a -> ExceptT AgentErrorType m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (ReaderT Env IO () -> AM ())
-> (ReaderT Env IO () -> ReaderT Env IO ())
-> ReaderT Env IO ()
-> AM ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Bool -> ReaderT Env IO () -> ReaderT Env IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (RcvFileStatus
status RcvFileStatus -> RcvFileStatus -> Bool
forall a. Eq a => a -> a -> Bool
== RcvFileStatus
RFSDecrypting) (ReaderT Env IO () -> AM ()) -> ReaderT Env IO () -> AM ()
forall a b. (a -> b) -> a -> b
$
ReaderT Env IO Bool -> ReaderT Env IO () -> ReaderT Env IO ()
forall (m :: * -> *). Monad m => m Bool -> m () -> m ()
whenM (String -> ReaderT Env IO Bool
forall (m :: * -> *). MonadIO m => String -> m Bool
doesFileExist String
fsSavePath) (String -> ReaderT Env IO ()
forall (m :: * -> *). MonadIO m => String -> m ()
removeFile String
fsSavePath ReaderT Env IO () -> ReaderT Env IO () -> ReaderT Env IO ()
forall a b.
ReaderT Env IO a -> ReaderT Env IO b -> ReaderT Env IO b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> String -> ReaderT Env IO ()
createEmptyFile String
fsSavePath)
AgentClient -> (Connection -> IO ()) -> AM ()
forall a. AgentClient -> (Connection -> IO a) -> AM a
withStore' AgentClient
c ((Connection -> IO ()) -> AM ()) -> (Connection -> IO ()) -> AM ()
forall a b. (a -> b) -> a -> b
$ \Connection
db -> Connection -> Int64 -> RcvFileStatus -> IO ()
updateRcvFileStatus Connection
db Int64
rcvFileId RcvFileStatus
RFSDecrypting
[String]
chunkPaths <- [RcvFileChunk] -> AM [String]
getChunkPaths [RcvFileChunk]
chunks
Int64
encSize <- IO Int64 -> ExceptT AgentErrorType (ReaderT Env IO) Int64
forall a. IO a -> ExceptT AgentErrorType (ReaderT Env IO) a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Int64 -> ExceptT AgentErrorType (ReaderT Env IO) Int64)
-> IO Int64 -> ExceptT AgentErrorType (ReaderT Env IO) Int64
forall a b. (a -> b) -> a -> b
$ (Int64 -> String -> IO Int64) -> Int64 -> [String] -> IO Int64
forall (t :: * -> *) (m :: * -> *) b a.
(Foldable t, Monad m) =>
(b -> a -> m b) -> b -> t a -> m b
foldM (\Int64
s String
path -> (Int64
s Int64 -> Int64 -> Int64
forall a. Num a => a -> a -> a
+) (Int64 -> Int64) -> (Integer -> Int64) -> Integer -> Int64
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Integer -> Int64
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Integer -> Int64) -> IO Integer -> IO Int64
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> String -> IO Integer
forall (m :: * -> *). MonadIO m => String -> m Integer
getFileSize String
path) Int64
0 [String]
chunkPaths
Bool -> AM () -> AM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int64 -> FileSize Int64
forall a. a -> FileSize a
FileSize Int64
encSize FileSize Int64 -> FileSize Int64 -> Bool
forall a. Eq a => a -> a -> Bool
/= FileSize Int64
size) (AM () -> AM ()) -> AM () -> AM ()
forall a b. (a -> b) -> a -> b
$ AgentErrorType -> AM ()
forall (m :: * -> *) e a. Monad m => e -> ExceptT e m a
throwE (AgentErrorType -> AM ()) -> AgentErrorType -> AM ()
forall a b. (a -> b) -> a -> b
$ String -> XFTPErrorType -> AgentErrorType
XFTP String
"" XFTPErrorType
XFTP.SIZE
AEntityId
encDigest <- IO AEntityId -> AM AEntityId
forall a. IO a -> ExceptT AgentErrorType (ReaderT Env IO) a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO AEntityId -> AM AEntityId) -> IO AEntityId -> AM AEntityId
forall a b. (a -> b) -> a -> b
$ LazyByteString -> AEntityId
LC.sha512Hash (LazyByteString -> AEntityId) -> IO LazyByteString -> IO AEntityId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [String] -> IO LazyByteString
readChunks [String]
chunkPaths
Bool -> AM () -> AM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (AEntityId -> FileDigest
FileDigest AEntityId
encDigest FileDigest -> FileDigest -> Bool
forall a. Eq a => a -> a -> Bool
/= FileDigest
digest) (AM () -> AM ()) -> AM () -> AM ()
forall a b. (a -> b) -> a -> b
$ AgentErrorType -> AM ()
forall (m :: * -> *) e a. Monad m => e -> ExceptT e m a
throwE (AgentErrorType -> AM ()) -> AgentErrorType -> AM ()
forall a b. (a -> b) -> a -> b
$ String -> XFTPErrorType -> AgentErrorType
XFTP String
"" XFTPErrorType
XFTP.DIGEST
let destFile :: CryptoFile
destFile = String -> Maybe CryptoFileArgs -> CryptoFile
CryptoFile String
fsSavePath Maybe CryptoFileArgs
cfArgs
ExceptT AgentErrorType (ReaderT Env IO) CryptoFile -> AM ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (ExceptT AgentErrorType (ReaderT Env IO) CryptoFile -> AM ())
-> ExceptT AgentErrorType (ReaderT Env IO) CryptoFile -> AM ()
forall a b. (a -> b) -> a -> b
$ (FTCryptoError -> AgentErrorType)
-> ExceptT FTCryptoError IO CryptoFile
-> ExceptT AgentErrorType (ReaderT Env IO) CryptoFile
forall (m :: * -> *) e e' a.
MonadIO m =>
(e -> e') -> ExceptT e IO a -> ExceptT e' m a
liftError (FileErrorType -> AgentErrorType
FILE (FileErrorType -> AgentErrorType)
-> (FTCryptoError -> FileErrorType)
-> FTCryptoError
-> AgentErrorType
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> FileErrorType
FILE_IO (String -> FileErrorType)
-> (FTCryptoError -> String) -> FTCryptoError -> FileErrorType
forall b c a. (b -> c) -> (a -> b) -> a -> c
. FTCryptoError -> String
forall a. Show a => a -> String
show) (ExceptT FTCryptoError IO CryptoFile
-> ExceptT AgentErrorType (ReaderT Env IO) CryptoFile)
-> ExceptT FTCryptoError IO CryptoFile
-> ExceptT AgentErrorType (ReaderT Env IO) CryptoFile
forall a b. (a -> b) -> a -> b
$ Int64
-> [String]
-> SbKey
-> CbNonce
-> (Text -> ExceptT String IO CryptoFile)
-> ExceptT FTCryptoError IO CryptoFile
decryptChunks Int64
encSize [String]
chunkPaths SbKey
key CbNonce
nonce ((Text -> ExceptT String IO CryptoFile)
-> ExceptT FTCryptoError IO CryptoFile)
-> (Text -> ExceptT String IO CryptoFile)
-> ExceptT FTCryptoError IO CryptoFile
forall a b. (a -> b) -> a -> b
$ \Text
_ -> CryptoFile -> ExceptT String IO CryptoFile
forall a. a -> ExceptT String IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure CryptoFile
destFile
case Maybe RcvFileRedirect
redirect of
Maybe RcvFileRedirect
Nothing -> do
AgentClient -> AEntityId -> AEvent 'AERcvFile -> AM ()
forall (m :: * -> *) (e :: AEntity).
(MonadIO m, AEntityI e) =>
AgentClient -> AEntityId -> AEvent e -> m ()
notify AgentClient
c AEntityId
rcvFileEntityId (AEvent 'AERcvFile -> AM ()) -> AEvent 'AERcvFile -> AM ()
forall a b. (a -> b) -> a -> b
$ String -> AEvent 'AERcvFile
RFDONE String
fsSavePath
ReaderT Env IO () -> AM ()
forall (m :: * -> *) a.
Monad m =>
m a -> ExceptT AgentErrorType m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (ReaderT Env IO () -> AM ()) -> ReaderT Env IO () -> AM ()
forall a b. (a -> b) -> a -> b
$ Maybe String -> (String -> ReaderT Env IO ()) -> ReaderT Env IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ Maybe String
tmpPath (String -> ReaderT Env IO ()
forall (m :: * -> *). MonadIO m => String -> m ()
removePath (String -> ReaderT Env IO ())
-> (String -> ReaderT Env IO String) -> String -> ReaderT Env IO ()
forall (m :: * -> *) b c a.
Monad m =>
(b -> m c) -> (a -> m b) -> a -> m c
<=< String -> ReaderT Env IO String
toFSFilePath)
IO () -> AM ()
forall a. IO a -> ExceptT AgentErrorType (ReaderT Env IO) a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> AM ()) -> IO () -> AM ()
forall a b. (a -> b) -> a -> b
$ AgentClient -> IO ()
waitUntilForeground AgentClient
c
AgentClient -> (Connection -> IO ()) -> AM ()
forall a. AgentClient -> (Connection -> IO a) -> AM a
withStore' AgentClient
c (Connection -> Int64 -> IO ()
`updateRcvFileComplete` Int64
rcvFileId)
Just RcvFileRedirect {RedirectFileInfo
$sel:redirectFileInfo:RcvFileRedirect :: RcvFileRedirect -> RedirectFileInfo
redirectFileInfo :: RedirectFileInfo
redirectFileInfo, Int64
redirectDbId :: Int64
$sel:redirectDbId:RcvFileRedirect :: RcvFileRedirect -> Int64
redirectDbId} -> do
let RedirectFileInfo {$sel:size:RedirectFileInfo :: RedirectFileInfo -> FileSize Int64
size = FileSize Int64
redirectSize, $sel:digest:RedirectFileInfo :: RedirectFileInfo -> FileDigest
digest = FileDigest
redirectDigest} = RedirectFileInfo
redirectFileInfo
ReaderT Env IO () -> AM ()
forall (m :: * -> *) a.
Monad m =>
m a -> ExceptT AgentErrorType m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (ReaderT Env IO () -> AM ()) -> ReaderT Env IO () -> AM ()
forall a b. (a -> b) -> a -> b
$ Maybe String -> (String -> ReaderT Env IO ()) -> ReaderT Env IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ Maybe String
tmpPath (String -> ReaderT Env IO ()
forall (m :: * -> *). MonadIO m => String -> m ()
removePath (String -> ReaderT Env IO ())
-> (String -> ReaderT Env IO String) -> String -> ReaderT Env IO ()
forall (m :: * -> *) b c a.
Monad m =>
(b -> m c) -> (a -> m b) -> a -> m c
<=< String -> ReaderT Env IO String
toFSFilePath)
IO () -> AM ()
forall a. IO a -> ExceptT AgentErrorType (ReaderT Env IO) a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> AM ()) -> IO () -> AM ()
forall a b. (a -> b) -> a -> b
$ AgentClient -> IO ()
waitUntilForeground AgentClient
c
AgentClient -> (Connection -> IO ()) -> AM ()
forall a. AgentClient -> (Connection -> IO a) -> AM a
withStore' AgentClient
c (Connection -> Int64 -> IO ()
`updateRcvFileComplete` Int64
rcvFileId)
LazyByteString
yaml <- (FTCryptoError -> AgentErrorType)
-> ExceptT FTCryptoError IO LazyByteString
-> ExceptT AgentErrorType (ReaderT Env IO) LazyByteString
forall (m :: * -> *) e e' a.
MonadIO m =>
(e -> e') -> ExceptT e IO a -> ExceptT e' m a
liftError (FileErrorType -> AgentErrorType
FILE (FileErrorType -> AgentErrorType)
-> (FTCryptoError -> FileErrorType)
-> FTCryptoError
-> AgentErrorType
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> FileErrorType
FILE_IO (String -> FileErrorType)
-> (FTCryptoError -> String) -> FTCryptoError -> FileErrorType
forall b c a. (b -> c) -> (a -> b) -> a -> c
. FTCryptoError -> String
forall a. Show a => a -> String
show) (CryptoFile -> ExceptT FTCryptoError IO LazyByteString
CF.readFile (CryptoFile -> ExceptT FTCryptoError IO LazyByteString)
-> CryptoFile -> ExceptT FTCryptoError IO LazyByteString
forall a b. (a -> b) -> a -> b
$ String -> Maybe CryptoFileArgs -> CryptoFile
CryptoFile String
fsSavePath Maybe CryptoFileArgs
cfArgs) ExceptT AgentErrorType (ReaderT Env IO) LazyByteString
-> AM () -> ExceptT AgentErrorType (ReaderT Env IO) LazyByteString
forall e (m :: * -> *) a b.
(AnyError e, MonadUnliftIO m) =>
ExceptT e m a -> ExceptT e m b -> ExceptT e m a
`allFinally` (ReaderT Env IO () -> AM ()
forall (m :: * -> *) a.
Monad m =>
m a -> ExceptT AgentErrorType m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (ReaderT Env IO () -> AM ()) -> ReaderT Env IO () -> AM ()
forall a b. (a -> b) -> a -> b
$ String -> ReaderT Env IO String
toFSFilePath String
fsSavePath ReaderT Env IO String
-> (String -> ReaderT Env IO ()) -> ReaderT Env IO ()
forall a b.
ReaderT Env IO a -> (a -> ReaderT Env IO b) -> ReaderT Env IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= String -> ReaderT Env IO ()
forall (m :: * -> *). MonadIO m => String -> m ()
removePath)
next :: FileDescription 'FRecipient
next@FileDescription {$sel:chunks:FileDescription :: forall (p :: FileParty). FileDescription p -> [FileChunk]
chunks = [FileChunk]
nextChunks} <- case AEntityId -> Either String (ValidFileDescription 'FRecipient)
forall a. StrEncoding a => AEntityId -> Either String a
strDecode (LazyByteString -> AEntityId
LB.toStrict LazyByteString
yaml) of
Left String
_ -> AgentErrorType
-> ExceptT
AgentErrorType (ReaderT Env IO) (FileDescription 'FRecipient)
forall (m :: * -> *) e a. Monad m => e -> ExceptT e m a
throwE (AgentErrorType
-> ExceptT
AgentErrorType (ReaderT Env IO) (FileDescription 'FRecipient))
-> (FileErrorType -> AgentErrorType)
-> FileErrorType
-> ExceptT
AgentErrorType (ReaderT Env IO) (FileDescription 'FRecipient)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. FileErrorType -> AgentErrorType
FILE (FileErrorType
-> ExceptT
AgentErrorType (ReaderT Env IO) (FileDescription 'FRecipient))
-> FileErrorType
-> ExceptT
AgentErrorType (ReaderT Env IO) (FileDescription 'FRecipient)
forall a b. (a -> b) -> a -> b
$ String -> FileErrorType
REDIRECT String
"decode error"
Right (ValidFileDescription fd :: FileDescription 'FRecipient
fd@FileDescription {$sel:size:FileDescription :: forall (p :: FileParty). FileDescription p -> FileSize Int64
size = FileSize Int64
dstSize, $sel:digest:FileDescription :: forall (p :: FileParty). FileDescription p -> FileDigest
digest = FileDigest
dstDigest})
| FileSize Int64
dstSize FileSize Int64 -> FileSize Int64 -> Bool
forall a. Eq a => a -> a -> Bool
/= FileSize Int64
redirectSize -> AgentErrorType
-> ExceptT
AgentErrorType (ReaderT Env IO) (FileDescription 'FRecipient)
forall (m :: * -> *) e a. Monad m => e -> ExceptT e m a
throwE (AgentErrorType
-> ExceptT
AgentErrorType (ReaderT Env IO) (FileDescription 'FRecipient))
-> (FileErrorType -> AgentErrorType)
-> FileErrorType
-> ExceptT
AgentErrorType (ReaderT Env IO) (FileDescription 'FRecipient)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. FileErrorType -> AgentErrorType
FILE (FileErrorType
-> ExceptT
AgentErrorType (ReaderT Env IO) (FileDescription 'FRecipient))
-> FileErrorType
-> ExceptT
AgentErrorType (ReaderT Env IO) (FileDescription 'FRecipient)
forall a b. (a -> b) -> a -> b
$ String -> FileErrorType
REDIRECT String
"size mismatch"
| FileDigest
dstDigest FileDigest -> FileDigest -> Bool
forall a. Eq a => a -> a -> Bool
/= FileDigest
redirectDigest -> AgentErrorType
-> ExceptT
AgentErrorType (ReaderT Env IO) (FileDescription 'FRecipient)
forall (m :: * -> *) e a. Monad m => e -> ExceptT e m a
throwE (AgentErrorType
-> ExceptT
AgentErrorType (ReaderT Env IO) (FileDescription 'FRecipient))
-> (FileErrorType -> AgentErrorType)
-> FileErrorType
-> ExceptT
AgentErrorType (ReaderT Env IO) (FileDescription 'FRecipient)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. FileErrorType -> AgentErrorType
FILE (FileErrorType
-> ExceptT
AgentErrorType (ReaderT Env IO) (FileDescription 'FRecipient))
-> FileErrorType
-> ExceptT
AgentErrorType (ReaderT Env IO) (FileDescription 'FRecipient)
forall a b. (a -> b) -> a -> b
$ String -> FileErrorType
REDIRECT String
"digest mismatch"
| Bool
otherwise -> FileDescription 'FRecipient
-> ExceptT
AgentErrorType (ReaderT Env IO) (FileDescription 'FRecipient)
forall a. a -> ExceptT AgentErrorType (ReaderT Env IO) a
forall (f :: * -> *) a. Applicative f => a -> f a
pure FileDescription 'FRecipient
fd
AgentClient -> (Connection -> IO (Either StoreError ())) -> AM ()
forall a.
AgentClient -> (Connection -> IO (Either StoreError a)) -> AM a
withStore AgentClient
c ((Connection -> IO (Either StoreError ())) -> AM ())
-> (Connection -> IO (Either StoreError ())) -> AM ()
forall a b. (a -> b) -> a -> b
$ \Connection
db -> Connection
-> Int64
-> FileDescription 'FRecipient
-> IO (Either StoreError ())
updateRcvFileRedirect Connection
db Int64
redirectDbId FileDescription 'FRecipient
next
[FileChunk] -> (FileChunk -> AM ()) -> AM ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [FileChunk]
nextChunks (AgentClient -> FileChunk -> AM ()
downloadChunk AgentClient
c)
where
getChunkPaths :: [RcvFileChunk] -> AM [FilePath]
getChunkPaths :: [RcvFileChunk] -> AM [String]
getChunkPaths [] = [String] -> AM [String]
forall a. a -> ExceptT AgentErrorType (ReaderT Env IO) a
forall (f :: * -> *) a. Applicative f => a -> f a
pure []
getChunkPaths (RcvFileChunk {$sel:chunkTmpPath:RcvFileChunk :: RcvFileChunk -> Maybe String
chunkTmpPath = Just String
path} : [RcvFileChunk]
cs) = do
[String]
ps <- [RcvFileChunk] -> AM [String]
getChunkPaths [RcvFileChunk]
cs
String
fsPath <- ReaderT Env IO String
-> ExceptT AgentErrorType (ReaderT Env IO) String
forall (m :: * -> *) a.
Monad m =>
m a -> ExceptT AgentErrorType m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (ReaderT Env IO String
-> ExceptT AgentErrorType (ReaderT Env IO) String)
-> ReaderT Env IO String
-> ExceptT AgentErrorType (ReaderT Env IO) String
forall a b. (a -> b) -> a -> b
$ String -> ReaderT Env IO String
toFSFilePath String
path
[String] -> AM [String]
forall a. a -> ExceptT AgentErrorType (ReaderT Env IO) a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ([String] -> AM [String]) -> [String] -> AM [String]
forall a b. (a -> b) -> a -> b
$ String
fsPath String -> [String] -> [String]
forall a. a -> [a] -> [a]
: [String]
ps
getChunkPaths (RcvFileChunk {$sel:chunkTmpPath:RcvFileChunk :: RcvFileChunk -> Maybe String
chunkTmpPath = Maybe String
Nothing} : [RcvFileChunk]
_cs) =
AgentErrorType -> AM [String]
forall (m :: * -> *) e a. Monad m => e -> ExceptT e m a
throwE (AgentErrorType -> AM [String]) -> AgentErrorType -> AM [String]
forall a b. (a -> b) -> a -> b
$ String -> AgentErrorType
INTERNAL String
"no chunk path"
xftpDeleteRcvFile' :: AgentClient -> RcvFileId -> AM' ()
xftpDeleteRcvFile' :: AgentClient -> AEntityId -> ReaderT Env IO ()
xftpDeleteRcvFile' AgentClient
c AEntityId
rcvFileEntityId = AgentClient -> [AEntityId] -> ReaderT Env IO ()
xftpDeleteRcvFiles' AgentClient
c [AEntityId
rcvFileEntityId]
xftpDeleteRcvFiles' :: AgentClient -> [RcvFileId] -> AM' ()
xftpDeleteRcvFiles' :: AgentClient -> [AEntityId] -> ReaderT Env IO ()
xftpDeleteRcvFiles' AgentClient
c [AEntityId]
rcvFileEntityIds = do
[RcvFile]
rcvFiles <- [Either AgentErrorType RcvFile] -> [RcvFile]
forall a b. [Either a b] -> [b]
rights ([Either AgentErrorType RcvFile] -> [RcvFile])
-> ReaderT Env IO [Either AgentErrorType RcvFile]
-> ReaderT Env IO [RcvFile]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> AgentClient
-> (Connection -> [IO (Either AgentErrorType RcvFile)])
-> ReaderT Env IO [Either AgentErrorType RcvFile]
forall (t :: * -> *) a.
Traversable t =>
AgentClient
-> (Connection -> t (IO (Either AgentErrorType a)))
-> AM' (t (Either AgentErrorType a))
withStoreBatch AgentClient
c (\Connection
db -> (AEntityId -> IO (Either AgentErrorType RcvFile))
-> [AEntityId] -> [IO (Either AgentErrorType RcvFile)]
forall a b. (a -> b) -> [a] -> [b]
map ((Either StoreError RcvFile -> Either AgentErrorType RcvFile)
-> IO (Either StoreError RcvFile)
-> IO (Either AgentErrorType RcvFile)
forall a b. (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ((StoreError -> AgentErrorType)
-> Either StoreError RcvFile -> Either AgentErrorType RcvFile
forall a b c. (a -> b) -> Either a c -> Either b c
forall (p :: * -> * -> *) a b c.
Bifunctor p =>
(a -> b) -> p a c -> p b c
first StoreError -> AgentErrorType
storeError) (IO (Either StoreError RcvFile)
-> IO (Either AgentErrorType RcvFile))
-> (AEntityId -> IO (Either StoreError RcvFile))
-> AEntityId
-> IO (Either AgentErrorType RcvFile)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Connection -> AEntityId -> IO (Either StoreError RcvFile)
getRcvFileByEntityId Connection
db) [AEntityId]
rcvFileEntityIds)
[[RcvFile]]
redirects <- [Either AgentErrorType [RcvFile]] -> [[RcvFile]]
forall a b. [Either a b] -> [b]
rights ([Either AgentErrorType [RcvFile]] -> [[RcvFile]])
-> ReaderT Env IO [Either AgentErrorType [RcvFile]]
-> ReaderT Env IO [[RcvFile]]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (Connection -> Int64 -> IO [RcvFile])
-> [RcvFile] -> ReaderT Env IO [Either AgentErrorType [RcvFile]]
forall a.
(Connection -> Int64 -> IO a)
-> [RcvFile] -> AM' [Either AgentErrorType a]
batchFiles Connection -> Int64 -> IO [RcvFile]
getRcvFileRedirects [RcvFile]
rcvFiles
let ([RcvFile]
toDelete, [RcvFile]
toMarkDeleted) = (RcvFile -> Bool) -> [RcvFile] -> ([RcvFile], [RcvFile])
forall a. (a -> Bool) -> [a] -> ([a], [a])
partition RcvFile -> Bool
fileComplete ([RcvFile] -> ([RcvFile], [RcvFile]))
-> [RcvFile] -> ([RcvFile], [RcvFile])
forall a b. (a -> b) -> a -> b
$ [[RcvFile]] -> [RcvFile]
forall (t :: * -> *) a. Foldable t => t [a] -> [a]
concat [[RcvFile]]
redirects [RcvFile] -> [RcvFile] -> [RcvFile]
forall a. Semigroup a => a -> a -> a
<> [RcvFile]
rcvFiles
ReaderT Env IO [Either AgentErrorType ()] -> ReaderT Env IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (ReaderT Env IO [Either AgentErrorType ()] -> ReaderT Env IO ())
-> ReaderT Env IO [Either AgentErrorType ()] -> ReaderT Env IO ()
forall a b. (a -> b) -> a -> b
$ (Connection -> Int64 -> IO ())
-> [RcvFile] -> ReaderT Env IO [Either AgentErrorType ()]
forall a.
(Connection -> Int64 -> IO a)
-> [RcvFile] -> AM' [Either AgentErrorType a]
batchFiles Connection -> Int64 -> IO ()
deleteRcvFile' [RcvFile]
toDelete
ReaderT Env IO [Either AgentErrorType ()] -> ReaderT Env IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (ReaderT Env IO [Either AgentErrorType ()] -> ReaderT Env IO ())
-> ReaderT Env IO [Either AgentErrorType ()] -> ReaderT Env IO ()
forall a b. (a -> b) -> a -> b
$ (Connection -> Int64 -> IO ())
-> [RcvFile] -> ReaderT Env IO [Either AgentErrorType ()]
forall a.
(Connection -> Int64 -> IO a)
-> [RcvFile] -> AM' [Either AgentErrorType a]
batchFiles Connection -> Int64 -> IO ()
updateRcvFileDeleted [RcvFile]
toMarkDeleted
String
workPath <- ReaderT Env IO String
getXFTPWorkPath
IO () -> ReaderT Env IO ()
forall a. IO a -> ReaderT Env IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ReaderT Env IO ())
-> ((RcvFile -> IO ()) -> IO ())
-> (RcvFile -> IO ())
-> ReaderT Env IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [RcvFile] -> (RcvFile -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [RcvFile]
toDelete ((RcvFile -> IO ()) -> ReaderT Env IO ())
-> (RcvFile -> IO ()) -> ReaderT Env IO ()
forall a b. (a -> b) -> a -> b
$ \RcvFile {String
prefixPath :: String
$sel:prefixPath:RcvFile :: RcvFile -> String
prefixPath} ->
(String -> IO ()
forall (m :: * -> *). MonadIO m => String -> m ()
removePath (String -> IO ()) -> (String -> String) -> String -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (String
workPath String -> String -> String
</>)) String
prefixPath IO () -> IO () -> IO ()
forall a. IO a -> IO a -> IO a
`catchAll_` () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
where
fileComplete :: RcvFile -> Bool
fileComplete RcvFile {RcvFileStatus
$sel:status:RcvFile :: RcvFile -> RcvFileStatus
status :: RcvFileStatus
status} = RcvFileStatus
status RcvFileStatus -> RcvFileStatus -> Bool
forall a. Eq a => a -> a -> Bool
== RcvFileStatus
RFSComplete Bool -> Bool -> Bool
|| RcvFileStatus
status RcvFileStatus -> RcvFileStatus -> Bool
forall a. Eq a => a -> a -> Bool
== RcvFileStatus
RFSError
batchFiles :: (DB.Connection -> DBRcvFileId -> IO a) -> [RcvFile] -> AM' [Either AgentErrorType a]
batchFiles :: forall a.
(Connection -> Int64 -> IO a)
-> [RcvFile] -> AM' [Either AgentErrorType a]
batchFiles Connection -> Int64 -> IO a
f [RcvFile]
rcvFiles = AgentClient
-> (Connection -> [IO a]) -> AM' [Either AgentErrorType a]
forall (t :: * -> *) a.
Traversable t =>
AgentClient
-> (Connection -> t (IO a)) -> AM' (t (Either AgentErrorType a))
withStoreBatch' AgentClient
c ((Connection -> [IO a]) -> AM' [Either AgentErrorType a])
-> (Connection -> [IO a]) -> AM' [Either AgentErrorType a]
forall a b. (a -> b) -> a -> b
$ \Connection
db -> (RcvFile -> IO a) -> [RcvFile] -> [IO a]
forall a b. (a -> b) -> [a] -> [b]
map (\RcvFile {Int64
$sel:rcvFileId:RcvFile :: RcvFile -> Int64
rcvFileId :: Int64
rcvFileId} -> Connection -> Int64 -> IO a
f Connection
db Int64
rcvFileId) [RcvFile]
rcvFiles
notify :: forall m e. (MonadIO m, AEntityI e) => AgentClient -> AEntityId -> AEvent e -> m ()
notify :: forall (m :: * -> *) (e :: AEntity).
(MonadIO m, AEntityI e) =>
AgentClient -> AEntityId -> AEvent e -> m ()
notify AgentClient
c AEntityId
entId AEvent e
cmd = STM () -> m ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> m ()) -> STM () -> m ()
forall a b. (a -> b) -> a -> b
$ TBQueue ATransmission -> ATransmission -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue (AgentClient -> TBQueue ATransmission
subQ AgentClient
c) (AEntityId
"", AEntityId
entId, SAEntity e -> AEvent e -> AEvt
forall (e :: AEntity). AEntityI e => SAEntity e -> AEvent e -> AEvt
AEvt (forall (e :: AEntity). AEntityI e => SAEntity e
sAEntity @e) AEvent e
cmd)
xftpSendFile' :: AgentClient -> UserId -> CryptoFile -> Int -> AM SndFileId
xftpSendFile' :: AgentClient -> Int64 -> CryptoFile -> Int -> AM AEntityId
xftpSendFile' AgentClient
c Int64
userId CryptoFile
file Int
numRecipients = do
TVar ChaChaDRG
g <- (Env -> TVar ChaChaDRG)
-> ExceptT AgentErrorType (ReaderT Env IO) (TVar ChaChaDRG)
forall r (m :: * -> *) a. MonadReader r m => (r -> a) -> m a
asks Env -> TVar ChaChaDRG
random
String
prefixPath <- ReaderT Env IO String
-> ExceptT AgentErrorType (ReaderT Env IO) String
forall (m :: * -> *) a.
Monad m =>
m a -> ExceptT AgentErrorType m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (ReaderT Env IO String
-> ExceptT AgentErrorType (ReaderT Env IO) String)
-> ReaderT Env IO String
-> ExceptT AgentErrorType (ReaderT Env IO) String
forall a b. (a -> b) -> a -> b
$ String -> ReaderT Env IO String
getPrefixPath String
"snd.xftp"
String -> AM ()
forall (m :: * -> *). MonadIO m => String -> m ()
createDirectory String
prefixPath
let relPrefixPath :: String
relPrefixPath = String -> String
takeFileName String
prefixPath
SbKey
key <- STM SbKey -> ExceptT AgentErrorType (ReaderT Env IO) SbKey
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM SbKey -> ExceptT AgentErrorType (ReaderT Env IO) SbKey)
-> STM SbKey -> ExceptT AgentErrorType (ReaderT Env IO) SbKey
forall a b. (a -> b) -> a -> b
$ TVar ChaChaDRG -> STM SbKey
C.randomSbKey TVar ChaChaDRG
g
CbNonce
nonce <- STM CbNonce -> ExceptT AgentErrorType (ReaderT Env IO) CbNonce
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM CbNonce -> ExceptT AgentErrorType (ReaderT Env IO) CbNonce)
-> STM CbNonce -> ExceptT AgentErrorType (ReaderT Env IO) CbNonce
forall a b. (a -> b) -> a -> b
$ TVar ChaChaDRG -> STM CbNonce
C.randomCbNonce TVar ChaChaDRG
g
AEntityId
fId <- AgentClient
-> (Connection -> IO (Either StoreError AEntityId)) -> AM AEntityId
forall a.
AgentClient -> (Connection -> IO (Either StoreError a)) -> AM a
withStore AgentClient
c ((Connection -> IO (Either StoreError AEntityId)) -> AM AEntityId)
-> (Connection -> IO (Either StoreError AEntityId)) -> AM AEntityId
forall a b. (a -> b) -> a -> b
$ \Connection
db -> Connection
-> TVar ChaChaDRG
-> Int64
-> CryptoFile
-> Int
-> String
-> SbKey
-> CbNonce
-> Maybe RedirectFileInfo
-> IO (Either StoreError AEntityId)
createSndFile Connection
db TVar ChaChaDRG
g Int64
userId CryptoFile
file Int
numRecipients String
relPrefixPath SbKey
key CbNonce
nonce Maybe RedirectFileInfo
forall a. Maybe a
Nothing
ReaderT Env IO () -> AM ()
forall (m :: * -> *) a.
Monad m =>
m a -> ExceptT AgentErrorType m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (ReaderT Env IO () -> AM ())
-> (AM' Worker -> ReaderT Env IO ()) -> AM' Worker -> AM ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. AM' Worker -> ReaderT Env IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (AM' Worker -> AM ()) -> AM' Worker -> AM ()
forall a b. (a -> b) -> a -> b
$ Bool -> AgentClient -> Maybe XFTPServer -> AM' Worker
getXFTPSndWorker Bool
True AgentClient
c Maybe XFTPServer
forall a. Maybe a
Nothing
AEntityId -> AM AEntityId
forall a. a -> ExceptT AgentErrorType (ReaderT Env IO) a
forall (f :: * -> *) a. Applicative f => a -> f a
pure AEntityId
fId
xftpSendDescription' :: AgentClient -> UserId -> ValidFileDescription 'FRecipient -> Int -> AM SndFileId
xftpSendDescription' :: AgentClient
-> Int64 -> ValidFileDescription 'FRecipient -> Int -> AM AEntityId
xftpSendDescription' AgentClient
c Int64
userId (ValidFileDescription fdDirect :: FileDescription 'FRecipient
fdDirect@FileDescription {FileSize Int64
$sel:size:FileDescription :: forall (p :: FileParty). FileDescription p -> FileSize Int64
size :: FileSize Int64
size, FileDigest
$sel:digest:FileDescription :: forall (p :: FileParty). FileDescription p -> FileDigest
digest :: FileDigest
digest}) Int
numRecipients = do
TVar ChaChaDRG
g <- (Env -> TVar ChaChaDRG)
-> ExceptT AgentErrorType (ReaderT Env IO) (TVar ChaChaDRG)
forall r (m :: * -> *) a. MonadReader r m => (r -> a) -> m a
asks Env -> TVar ChaChaDRG
random
String
prefixPath <- ReaderT Env IO String
-> ExceptT AgentErrorType (ReaderT Env IO) String
forall (m :: * -> *) a.
Monad m =>
m a -> ExceptT AgentErrorType m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (ReaderT Env IO String
-> ExceptT AgentErrorType (ReaderT Env IO) String)
-> ReaderT Env IO String
-> ExceptT AgentErrorType (ReaderT Env IO) String
forall a b. (a -> b) -> a -> b
$ String -> ReaderT Env IO String
getPrefixPath String
"snd.xftp"
String -> AM ()
forall (m :: * -> *). MonadIO m => String -> m ()
createDirectory String
prefixPath
let relPrefixPath :: String
relPrefixPath = String -> String
takeFileName String
prefixPath
let directYaml :: String
directYaml = String
prefixPath String -> String -> String
</> String
"direct.yaml"
CryptoFileArgs
cfArgs <- STM CryptoFileArgs
-> ExceptT AgentErrorType (ReaderT Env IO) CryptoFileArgs
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM CryptoFileArgs
-> ExceptT AgentErrorType (ReaderT Env IO) CryptoFileArgs)
-> STM CryptoFileArgs
-> ExceptT AgentErrorType (ReaderT Env IO) CryptoFileArgs
forall a b. (a -> b) -> a -> b
$ TVar ChaChaDRG -> STM CryptoFileArgs
CF.randomArgs TVar ChaChaDRG
g
let file :: CryptoFile
file = String -> Maybe CryptoFileArgs -> CryptoFile
CryptoFile String
directYaml (CryptoFileArgs -> Maybe CryptoFileArgs
forall a. a -> Maybe a
Just CryptoFileArgs
cfArgs)
(FTCryptoError -> AgentErrorType)
-> ExceptT FTCryptoError IO () -> AM ()
forall (m :: * -> *) e e' a.
MonadIO m =>
(e -> e') -> ExceptT e IO a -> ExceptT e' m a
liftError (FileErrorType -> AgentErrorType
FILE (FileErrorType -> AgentErrorType)
-> (FTCryptoError -> FileErrorType)
-> FTCryptoError
-> AgentErrorType
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> FileErrorType
FILE_IO (String -> FileErrorType)
-> (FTCryptoError -> String) -> FTCryptoError -> FileErrorType
forall b c a. (b -> c) -> (a -> b) -> a -> c
. FTCryptoError -> String
forall a. Show a => a -> String
show) (ExceptT FTCryptoError IO () -> AM ())
-> ExceptT FTCryptoError IO () -> AM ()
forall a b. (a -> b) -> a -> b
$ CryptoFile -> LazyByteString -> ExceptT FTCryptoError IO ()
CF.writeFile CryptoFile
file (AEntityId -> LazyByteString
LB.fromStrict (AEntityId -> LazyByteString) -> AEntityId -> LazyByteString
forall a b. (a -> b) -> a -> b
$ FileDescription 'FRecipient -> AEntityId
forall a. StrEncoding a => a -> AEntityId
strEncode FileDescription 'FRecipient
fdDirect)
SbKey
key <- STM SbKey -> ExceptT AgentErrorType (ReaderT Env IO) SbKey
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM SbKey -> ExceptT AgentErrorType (ReaderT Env IO) SbKey)
-> STM SbKey -> ExceptT AgentErrorType (ReaderT Env IO) SbKey
forall a b. (a -> b) -> a -> b
$ TVar ChaChaDRG -> STM SbKey
C.randomSbKey TVar ChaChaDRG
g
CbNonce
nonce <- STM CbNonce -> ExceptT AgentErrorType (ReaderT Env IO) CbNonce
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM CbNonce -> ExceptT AgentErrorType (ReaderT Env IO) CbNonce)
-> STM CbNonce -> ExceptT AgentErrorType (ReaderT Env IO) CbNonce
forall a b. (a -> b) -> a -> b
$ TVar ChaChaDRG -> STM CbNonce
C.randomCbNonce TVar ChaChaDRG
g
AEntityId
fId <- AgentClient
-> (Connection -> IO (Either StoreError AEntityId)) -> AM AEntityId
forall a.
AgentClient -> (Connection -> IO (Either StoreError a)) -> AM a
withStore AgentClient
c ((Connection -> IO (Either StoreError AEntityId)) -> AM AEntityId)
-> (Connection -> IO (Either StoreError AEntityId)) -> AM AEntityId
forall a b. (a -> b) -> a -> b
$ \Connection
db -> Connection
-> TVar ChaChaDRG
-> Int64
-> CryptoFile
-> Int
-> String
-> SbKey
-> CbNonce
-> Maybe RedirectFileInfo
-> IO (Either StoreError AEntityId)
createSndFile Connection
db TVar ChaChaDRG
g Int64
userId CryptoFile
file Int
numRecipients String
relPrefixPath SbKey
key CbNonce
nonce (Maybe RedirectFileInfo -> IO (Either StoreError AEntityId))
-> Maybe RedirectFileInfo -> IO (Either StoreError AEntityId)
forall a b. (a -> b) -> a -> b
$ RedirectFileInfo -> Maybe RedirectFileInfo
forall a. a -> Maybe a
Just RedirectFileInfo {FileSize Int64
$sel:size:RedirectFileInfo :: FileSize Int64
size :: FileSize Int64
size, FileDigest
$sel:digest:RedirectFileInfo :: FileDigest
digest :: FileDigest
digest}
ReaderT Env IO () -> AM ()
forall (m :: * -> *) a.
Monad m =>
m a -> ExceptT AgentErrorType m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (ReaderT Env IO () -> AM ())
-> (AM' Worker -> ReaderT Env IO ()) -> AM' Worker -> AM ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. AM' Worker -> ReaderT Env IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (AM' Worker -> AM ()) -> AM' Worker -> AM ()
forall a b. (a -> b) -> a -> b
$ Bool -> AgentClient -> Maybe XFTPServer -> AM' Worker
getXFTPSndWorker Bool
True AgentClient
c Maybe XFTPServer
forall a. Maybe a
Nothing
AEntityId -> AM AEntityId
forall a. a -> ExceptT AgentErrorType (ReaderT Env IO) a
forall (f :: * -> *) a. Applicative f => a -> f a
pure AEntityId
fId
resumeXFTPSndWork :: AgentClient -> Maybe XFTPServer -> AM' ()
resumeXFTPSndWork :: AgentClient -> Maybe XFTPServer -> ReaderT Env IO ()
resumeXFTPSndWork = AM' Worker -> ReaderT Env IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (AM' Worker -> ReaderT Env IO ())
-> (AgentClient -> Maybe XFTPServer -> AM' Worker)
-> AgentClient
-> Maybe XFTPServer
-> ReaderT Env IO ()
forall c d a b. (c -> d) -> (a -> b -> c) -> a -> b -> d
.: Bool -> AgentClient -> Maybe XFTPServer -> AM' Worker
getXFTPSndWorker Bool
False
getXFTPSndWorker :: Bool -> AgentClient -> Maybe XFTPServer -> AM' Worker
getXFTPSndWorker :: Bool -> AgentClient -> Maybe XFTPServer -> AM' Worker
getXFTPSndWorker Bool
hasWork AgentClient
c Maybe XFTPServer
server = do
TVar (Map (Maybe XFTPServer) Worker)
ws <- (Env -> TVar (Map (Maybe XFTPServer) Worker))
-> ReaderT Env IO (TVar (Map (Maybe XFTPServer) Worker))
forall r (m :: * -> *) a. MonadReader r m => (r -> a) -> m a
asks ((Env -> TVar (Map (Maybe XFTPServer) Worker))
-> ReaderT Env IO (TVar (Map (Maybe XFTPServer) Worker)))
-> (Env -> TVar (Map (Maybe XFTPServer) Worker))
-> ReaderT Env IO (TVar (Map (Maybe XFTPServer) Worker))
forall a b. (a -> b) -> a -> b
$ XFTPAgent -> TVar (Map (Maybe XFTPServer) Worker)
xftpSndWorkers (XFTPAgent -> TVar (Map (Maybe XFTPServer) Worker))
-> (Env -> XFTPAgent)
-> Env
-> TVar (Map (Maybe XFTPServer) Worker)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Env -> XFTPAgent
xftpAgent
String
-> Bool
-> AgentClient
-> Maybe XFTPServer
-> TVar (Map (Maybe XFTPServer) Worker)
-> (Worker -> AM ())
-> AM' Worker
forall k e (m :: * -> *).
(Ord k, Show k, AnyError e, MonadUnliftIO m) =>
String
-> Bool
-> AgentClient
-> k
-> TMap k Worker
-> (Worker -> ExceptT e m ())
-> m Worker
getAgentWorker String
"xftp_snd" Bool
hasWork AgentClient
c Maybe XFTPServer
server TVar (Map (Maybe XFTPServer) Worker)
ws ((Worker -> AM ()) -> AM' Worker)
-> (Worker -> AM ()) -> AM' Worker
forall a b. (a -> b) -> a -> b
$
(Worker -> AM ())
-> (XFTPServer -> Worker -> AM ())
-> Maybe XFTPServer
-> Worker
-> AM ()
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (AgentClient -> Worker -> AM ()
runXFTPSndPrepareWorker AgentClient
c) (AgentClient -> XFTPServer -> Worker -> AM ()
runXFTPSndWorker AgentClient
c) Maybe XFTPServer
server
runXFTPSndPrepareWorker :: AgentClient -> Worker -> AM ()
runXFTPSndPrepareWorker :: AgentClient -> Worker -> AM ()
runXFTPSndPrepareWorker AgentClient
c Worker {TMVar ()
$sel:doWork:Worker :: Worker -> TMVar ()
doWork :: TMVar ()
doWork} = do
AgentConfig
cfg <- (Env -> AgentConfig)
-> ExceptT AgentErrorType (ReaderT Env IO) AgentConfig
forall r (m :: * -> *) a. MonadReader r m => (r -> a) -> m a
asks Env -> AgentConfig
config
AM () -> AM ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (AM () -> AM ()) -> AM () -> AM ()
forall a b. (a -> b) -> a -> b
$ do
ReaderT Env IO () -> AM ()
forall (m :: * -> *) a.
Monad m =>
m a -> ExceptT AgentErrorType m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (ReaderT Env IO () -> AM ()) -> ReaderT Env IO () -> AM ()
forall a b. (a -> b) -> a -> b
$ TMVar () -> ReaderT Env IO ()
forall (m :: * -> *). MonadIO m => TMVar () -> m ()
waitForWork TMVar ()
doWork
IO () -> AM ()
forall a. IO a -> ExceptT AgentErrorType (ReaderT Env IO) a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> AM ()) -> IO () -> AM ()
forall a b. (a -> b) -> a -> b
$ AgentClient -> IO ()
assertAgentForeground AgentClient
c
AgentConfig -> AM ()
runXFTPOperation AgentConfig
cfg
where
runXFTPOperation :: AgentConfig -> AM ()
runXFTPOperation :: AgentConfig -> AM ()
runXFTPOperation cfg :: AgentConfig
cfg@AgentConfig {NominalDiffTime
$sel:sndFilesTTL:AgentConfig :: AgentConfig -> NominalDiffTime
sndFilesTTL :: NominalDiffTime
sndFilesTTL} =
AgentClient
-> TMVar ()
-> (Connection -> IO (Either StoreError (Maybe SndFile)))
-> (SndFile -> AM ())
-> AM ()
forall a.
AgentClient
-> TMVar ()
-> (Connection -> IO (Either StoreError (Maybe a)))
-> (a -> AM ())
-> AM ()
withWork AgentClient
c TMVar ()
doWork (Connection
-> NominalDiffTime -> IO (Either StoreError (Maybe SndFile))
`getNextSndFileToPrepare` NominalDiffTime
sndFilesTTL) ((SndFile -> AM ()) -> AM ()) -> (SndFile -> AM ()) -> AM ()
forall a b. (a -> b) -> a -> b
$
\f :: SndFile
f@SndFile {Int64
sndFileId :: Int64
$sel:sndFileId:SndFile :: SndFile -> Int64
sndFileId, AEntityId
sndFileEntityId :: AEntityId
$sel:sndFileEntityId:SndFile :: SndFile -> AEntityId
sndFileEntityId, Maybe String
prefixPath :: Maybe String
$sel:prefixPath:SndFile :: SndFile -> Maybe String
prefixPath} ->
AgentConfig -> SndFile -> AM ()
prepareFile AgentConfig
cfg SndFile
f AM () -> (AgentErrorType -> AM ()) -> AM ()
forall e (m :: * -> *) a.
(AnyError e, MonadUnliftIO m) =>
ExceptT e m a -> (e -> ExceptT e m a) -> ExceptT e m a
`catchAllErrors` AgentClient
-> Int64 -> AEntityId -> Maybe String -> AgentErrorType -> AM ()
sndWorkerInternalError AgentClient
c Int64
sndFileId AEntityId
sndFileEntityId Maybe String
prefixPath
prepareFile :: AgentConfig -> SndFile -> AM ()
prepareFile :: AgentConfig -> SndFile -> AM ()
prepareFile AgentConfig
_ SndFile {$sel:prefixPath:SndFile :: SndFile -> Maybe String
prefixPath = Maybe String
Nothing} =
AgentErrorType -> AM ()
forall (m :: * -> *) e a. Monad m => e -> ExceptT e m a
throwE (AgentErrorType -> AM ()) -> AgentErrorType -> AM ()
forall a b. (a -> b) -> a -> b
$ String -> AgentErrorType
INTERNAL String
"no prefix path"
prepareFile AgentConfig
cfg sndFile :: SndFile
sndFile@SndFile {Int64
$sel:sndFileId:SndFile :: SndFile -> Int64
sndFileId :: Int64
sndFileId, AEntityId
$sel:sndFileEntityId:SndFile :: SndFile -> AEntityId
sndFileEntityId :: AEntityId
sndFileEntityId, Int64
userId :: Int64
$sel:userId:SndFile :: SndFile -> Int64
userId, $sel:prefixPath:SndFile :: SndFile -> Maybe String
prefixPath = Just String
ppath, SndFileStatus
status :: SndFileStatus
$sel:status:SndFile :: SndFile -> SndFileStatus
status} = do
SndFile {Int
numRecipients :: Int
$sel:numRecipients:SndFile :: SndFile -> Int
numRecipients, [SndFileChunk]
chunks :: [SndFileChunk]
$sel:chunks:SndFile :: SndFile -> [SndFileChunk]
chunks} <-
if SndFileStatus
status SndFileStatus -> SndFileStatus -> Bool
forall a. Eq a => a -> a -> Bool
/= SndFileStatus
SFSEncrypted
then do
String
fsEncPath <- ReaderT Env IO String
-> ExceptT AgentErrorType (ReaderT Env IO) String
forall (m :: * -> *) a.
Monad m =>
m a -> ExceptT AgentErrorType m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (ReaderT Env IO String
-> ExceptT AgentErrorType (ReaderT Env IO) String)
-> (String -> ReaderT Env IO String)
-> String
-> ExceptT AgentErrorType (ReaderT Env IO) String
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> ReaderT Env IO String
toFSFilePath (String -> ExceptT AgentErrorType (ReaderT Env IO) String)
-> String -> ExceptT AgentErrorType (ReaderT Env IO) String
forall a b. (a -> b) -> a -> b
$ String -> String
sndFileEncPath String
ppath
Bool -> AM () -> AM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (SndFileStatus
status SndFileStatus -> SndFileStatus -> Bool
forall a. Eq a => a -> a -> Bool
== SndFileStatus
SFSEncrypting) (AM () -> AM ()) -> (AM () -> AM ()) -> AM () -> AM ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ExceptT AgentErrorType (ReaderT Env IO) Bool -> AM () -> AM ()
forall (m :: * -> *). Monad m => m Bool -> m () -> m ()
whenM (String -> ExceptT AgentErrorType (ReaderT Env IO) Bool
forall (m :: * -> *). MonadIO m => String -> m Bool
doesFileExist String
fsEncPath) (AM () -> AM ()) -> AM () -> AM ()
forall a b. (a -> b) -> a -> b
$
String -> AM ()
forall (m :: * -> *). MonadIO m => String -> m ()
removeFile String
fsEncPath
AgentClient -> (Connection -> IO ()) -> AM ()
forall a. AgentClient -> (Connection -> IO a) -> AM a
withStore' AgentClient
c ((Connection -> IO ()) -> AM ()) -> (Connection -> IO ()) -> AM ()
forall a b. (a -> b) -> a -> b
$ \Connection
db -> Connection -> Int64 -> SndFileStatus -> IO ()
updateSndFileStatus Connection
db Int64
sndFileId SndFileStatus
SFSEncrypting
(FileDigest
digest, [(XFTPChunkSpec, FileDigest)]
chunkSpecsDigests) <- SndFile -> String -> AM (FileDigest, [(XFTPChunkSpec, FileDigest)])
encryptFileForUpload SndFile
sndFile String
fsEncPath
AgentClient
-> (Connection -> IO (Either StoreError SndFile))
-> ExceptT AgentErrorType (ReaderT Env IO) SndFile
forall a.
AgentClient -> (Connection -> IO (Either StoreError a)) -> AM a
withStore AgentClient
c ((Connection -> IO (Either StoreError SndFile))
-> ExceptT AgentErrorType (ReaderT Env IO) SndFile)
-> (Connection -> IO (Either StoreError SndFile))
-> ExceptT AgentErrorType (ReaderT Env IO) SndFile
forall a b. (a -> b) -> a -> b
$ \Connection
db -> do
Connection -> Int64 -> IO ()
lockSndFileForUpdate Connection
db Int64
sndFileId
Connection
-> Int64 -> FileDigest -> [(XFTPChunkSpec, FileDigest)] -> IO ()
updateSndFileEncrypted Connection
db Int64
sndFileId FileDigest
digest [(XFTPChunkSpec, FileDigest)]
chunkSpecsDigests
Connection -> Int64 -> IO (Either StoreError SndFile)
getSndFile Connection
db Int64
sndFileId
else SndFile -> ExceptT AgentErrorType (ReaderT Env IO) SndFile
forall a. a -> ExceptT AgentErrorType (ReaderT Env IO) a
forall (f :: * -> *) a. Applicative f => a -> f a
pure SndFile
sndFile
let numRecipients' :: Int
numRecipients' = Int -> Int -> Int
forall a. Ord a => a -> a -> a
min Int
numRecipients Int
maxRecipients
let ([SndFileChunk]
pendingChunks, [XFTPServer]
preparedSrvs) = [Either SndFileChunk XFTPServer] -> ([SndFileChunk], [XFTPServer])
forall a b. [Either a b] -> ([a], [b])
partitionEithers ([Either SndFileChunk XFTPServer]
-> ([SndFileChunk], [XFTPServer]))
-> [Either SndFileChunk XFTPServer]
-> ([SndFileChunk], [XFTPServer])
forall a b. (a -> b) -> a -> b
$ (SndFileChunk -> Either SndFileChunk XFTPServer)
-> [SndFileChunk] -> [Either SndFileChunk XFTPServer]
forall a b. (a -> b) -> [a] -> [b]
map SndFileChunk -> Either SndFileChunk XFTPServer
srvOrPendingChunk [SndFileChunk]
chunks
[XFTPServer]
srvs <- [SndFileChunk]
-> (SndFileChunk
-> ExceptT AgentErrorType (ReaderT Env IO) XFTPServer)
-> AM [XFTPServer]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM [SndFileChunk]
pendingChunks ((SndFileChunk
-> ExceptT AgentErrorType (ReaderT Env IO) XFTPServer)
-> AM [XFTPServer])
-> (SndFileChunk
-> ExceptT AgentErrorType (ReaderT Env IO) XFTPServer)
-> AM [XFTPServer]
forall a b. (a -> b) -> a -> b
$ Int
-> SndFileChunk
-> ExceptT AgentErrorType (ReaderT Env IO) XFTPServer
createChunk Int
numRecipients'
let allSrvs :: Set XFTPServer
allSrvs = [XFTPServer] -> Set XFTPServer
forall a. Ord a => [a] -> Set a
S.fromList ([XFTPServer] -> Set XFTPServer) -> [XFTPServer] -> Set XFTPServer
forall a b. (a -> b) -> a -> b
$ [XFTPServer]
preparedSrvs [XFTPServer] -> [XFTPServer] -> [XFTPServer]
forall a. Semigroup a => a -> a -> a
<> [XFTPServer]
srvs
ReaderT Env IO () -> AM ()
forall (m :: * -> *) a.
Monad m =>
m a -> ExceptT AgentErrorType m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (ReaderT Env IO () -> AM ()) -> ReaderT Env IO () -> AM ()
forall a b. (a -> b) -> a -> b
$ Set XFTPServer -> (XFTPServer -> AM' Worker) -> ReaderT Env IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ Set XFTPServer
allSrvs ((XFTPServer -> AM' Worker) -> ReaderT Env IO ())
-> (XFTPServer -> AM' Worker) -> ReaderT Env IO ()
forall a b. (a -> b) -> a -> b
$ \XFTPServer
srv -> Bool -> AgentClient -> Maybe XFTPServer -> AM' Worker
getXFTPSndWorker Bool
True AgentClient
c (XFTPServer -> Maybe XFTPServer
forall a. a -> Maybe a
Just XFTPServer
srv)
AgentClient -> (Connection -> IO ()) -> AM ()
forall a. AgentClient -> (Connection -> IO a) -> AM a
withStore' AgentClient
c ((Connection -> IO ()) -> AM ()) -> (Connection -> IO ()) -> AM ()
forall a b. (a -> b) -> a -> b
$ \Connection
db -> Connection -> Int64 -> SndFileStatus -> IO ()
updateSndFileStatus Connection
db Int64
sndFileId SndFileStatus
SFSUploading
where
AgentConfig {$sel:xftpMaxRecipientsPerRequest:AgentConfig :: AgentConfig -> Int
xftpMaxRecipientsPerRequest = Int
maxRecipients, $sel:messageRetryInterval:AgentConfig :: AgentConfig -> RetryInterval2
messageRetryInterval = RetryInterval2
ri} = AgentConfig
cfg
encryptFileForUpload :: SndFile -> FilePath -> AM (FileDigest, [(XFTPChunkSpec, FileDigest)])
encryptFileForUpload :: SndFile -> String -> AM (FileDigest, [(XFTPChunkSpec, FileDigest)])
encryptFileForUpload SndFile {SbKey
key :: SbKey
$sel:key:SndFile :: SndFile -> SbKey
key, CbNonce
nonce :: CbNonce
$sel:nonce:SndFile :: SndFile -> CbNonce
nonce, CryptoFile
srcFile :: CryptoFile
$sel:srcFile:SndFile :: SndFile -> CryptoFile
srcFile, Maybe RedirectFileInfo
redirect :: Maybe RedirectFileInfo
$sel:redirect:SndFile :: SndFile -> Maybe RedirectFileInfo
redirect} String
fsEncPath = do
let CryptoFile {String
filePath :: String
filePath :: CryptoFile -> String
filePath} = CryptoFile
srcFile
fileName :: Text
fileName = String -> Text
pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ String -> String
takeFileName String
filePath
Int64
fileSize <- IO Int64 -> ExceptT AgentErrorType (ReaderT Env IO) Int64
forall a. IO a -> ExceptT AgentErrorType (ReaderT Env IO) a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Int64 -> ExceptT AgentErrorType (ReaderT Env IO) Int64)
-> IO Int64 -> ExceptT AgentErrorType (ReaderT Env IO) Int64
forall a b. (a -> b) -> a -> b
$ Integer -> Int64
forall a. Num a => Integer -> a
fromInteger (Integer -> Int64) -> IO Integer -> IO Int64
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> CryptoFile -> IO Integer
CF.getFileContentsSize CryptoFile
srcFile
Bool -> AM () -> AM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int64
fileSize Int64 -> Int64 -> Bool
forall a. Ord a => a -> a -> Bool
> Int64
maxFileSizeHard) (AM () -> AM ()) -> AM () -> AM ()
forall a b. (a -> b) -> a -> b
$ AgentErrorType -> AM ()
forall (m :: * -> *) e a. Monad m => e -> ExceptT e m a
throwE (AgentErrorType -> AM ()) -> AgentErrorType -> AM ()
forall a b. (a -> b) -> a -> b
$ FileErrorType -> AgentErrorType
FILE FileErrorType
FT.SIZE
let fileHdr :: AEntityId
fileHdr = FileHeader -> AEntityId
forall a. Encoding a => a -> AEntityId
smpEncode FileHeader {Text
fileName :: Text
$sel:fileName:FileHeader :: Text
fileName, $sel:fileExtra:FileHeader :: Maybe Text
fileExtra = Maybe Text
forall a. Maybe a
Nothing}
fileSize' :: Int64
fileSize' = Int -> Int64
forall a b. (Integral a, Num b) => a -> b
fromIntegral (AEntityId -> Int
B.length AEntityId
fileHdr) Int64 -> Int64 -> Int64
forall a. Num a => a -> a -> a
+ Int64
fileSize
payloadSize :: Int64
payloadSize = Int64
fileSize' Int64 -> Int64 -> Int64
forall a. Num a => a -> a -> a
+ Int64
fileSizeLen Int64 -> Int64 -> Int64
forall a. Num a => a -> a -> a
+ Int64
authTagSize
[Word32]
chunkSizes <- case Maybe RedirectFileInfo
redirect of
Maybe RedirectFileInfo
Nothing -> [Word32] -> ExceptT AgentErrorType (ReaderT Env IO) [Word32]
forall a. a -> ExceptT AgentErrorType (ReaderT Env IO) a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ([Word32] -> ExceptT AgentErrorType (ReaderT Env IO) [Word32])
-> [Word32] -> ExceptT AgentErrorType (ReaderT Env IO) [Word32]
forall a b. (a -> b) -> a -> b
$ Int64 -> [Word32]
prepareChunkSizes Int64
payloadSize
Just RedirectFileInfo
_ -> case Int64 -> Maybe Word32
singleChunkSize Int64
payloadSize of
Maybe Word32
Nothing -> AgentErrorType -> ExceptT AgentErrorType (ReaderT Env IO) [Word32]
forall (m :: * -> *) e a. Monad m => e -> ExceptT e m a
throwE (AgentErrorType
-> ExceptT AgentErrorType (ReaderT Env IO) [Word32])
-> AgentErrorType
-> ExceptT AgentErrorType (ReaderT Env IO) [Word32]
forall a b. (a -> b) -> a -> b
$ FileErrorType -> AgentErrorType
FILE FileErrorType
FT.SIZE
Just Word32
chunkSize -> [Word32] -> ExceptT AgentErrorType (ReaderT Env IO) [Word32]
forall a. a -> ExceptT AgentErrorType (ReaderT Env IO) a
forall (f :: * -> *) a. Applicative f => a -> f a
pure [Word32
chunkSize]
let encSize :: Int64
encSize = [Int64] -> Int64
forall a. Num a => [a] -> a
forall (t :: * -> *) a. (Foldable t, Num a) => t a -> a
sum ([Int64] -> Int64) -> [Int64] -> Int64
forall a b. (a -> b) -> a -> b
$ (Word32 -> Int64) -> [Word32] -> [Int64]
forall a b. (a -> b) -> [a] -> [b]
map Word32 -> Int64
forall a b. (Integral a, Num b) => a -> b
fromIntegral [Word32]
chunkSizes
AM () -> AM ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (AM () -> AM ()) -> AM () -> AM ()
forall a b. (a -> b) -> a -> b
$ (FTCryptoError -> AgentErrorType)
-> ExceptT FTCryptoError IO () -> AM ()
forall (m :: * -> *) e e' a.
MonadIO m =>
(e -> e') -> ExceptT e IO a -> ExceptT e' m a
liftError (FileErrorType -> AgentErrorType
FILE (FileErrorType -> AgentErrorType)
-> (FTCryptoError -> FileErrorType)
-> FTCryptoError
-> AgentErrorType
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> FileErrorType
FILE_IO (String -> FileErrorType)
-> (FTCryptoError -> String) -> FTCryptoError -> FileErrorType
forall b c a. (b -> c) -> (a -> b) -> a -> c
. FTCryptoError -> String
forall a. Show a => a -> String
show) (ExceptT FTCryptoError IO () -> AM ())
-> ExceptT FTCryptoError IO () -> AM ()
forall a b. (a -> b) -> a -> b
$ CryptoFile
-> AEntityId
-> SbKey
-> CbNonce
-> Int64
-> Int64
-> String
-> ExceptT FTCryptoError IO ()
encryptFile CryptoFile
srcFile AEntityId
fileHdr SbKey
key CbNonce
nonce Int64
fileSize' Int64
encSize String
fsEncPath
AEntityId
digest <- IO AEntityId -> AM AEntityId
forall a. IO a -> ExceptT AgentErrorType (ReaderT Env IO) a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO AEntityId -> AM AEntityId) -> IO AEntityId -> AM AEntityId
forall a b. (a -> b) -> a -> b
$ LazyByteString -> AEntityId
LC.sha512Hash (LazyByteString -> AEntityId) -> IO LazyByteString -> IO AEntityId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> String -> IO LazyByteString
LB.readFile String
fsEncPath
let chunkSpecs :: [XFTPChunkSpec]
chunkSpecs = String -> [Word32] -> [XFTPChunkSpec]
prepareChunkSpecs String
fsEncPath [Word32]
chunkSizes
[AEntityId]
chunkDigests <- IO [AEntityId]
-> ExceptT AgentErrorType (ReaderT Env IO) [AEntityId]
forall a. IO a -> ExceptT AgentErrorType (ReaderT Env IO) a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO [AEntityId]
-> ExceptT AgentErrorType (ReaderT Env IO) [AEntityId])
-> IO [AEntityId]
-> ExceptT AgentErrorType (ReaderT Env IO) [AEntityId]
forall a b. (a -> b) -> a -> b
$ (XFTPChunkSpec -> IO AEntityId)
-> [XFTPChunkSpec] -> IO [AEntityId]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> [a] -> m [b]
mapM XFTPChunkSpec -> IO AEntityId
getChunkDigest [XFTPChunkSpec]
chunkSpecs
(FileDigest, [(XFTPChunkSpec, FileDigest)])
-> AM (FileDigest, [(XFTPChunkSpec, FileDigest)])
forall a. a -> ExceptT AgentErrorType (ReaderT Env IO) a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (AEntityId -> FileDigest
FileDigest AEntityId
digest, [XFTPChunkSpec] -> [FileDigest] -> [(XFTPChunkSpec, FileDigest)]
forall a b. [a] -> [b] -> [(a, b)]
zip [XFTPChunkSpec]
chunkSpecs ([FileDigest] -> [(XFTPChunkSpec, FileDigest)])
-> [FileDigest] -> [(XFTPChunkSpec, FileDigest)]
forall a b. (a -> b) -> a -> b
$ [AEntityId] -> [FileDigest]
forall a b. Coercible a b => a -> b
coerce [AEntityId]
chunkDigests)
srvOrPendingChunk :: SndFileChunk -> Either SndFileChunk (ProtocolServer 'PXFTP)
srvOrPendingChunk :: SndFileChunk -> Either SndFileChunk XFTPServer
srvOrPendingChunk ch :: SndFileChunk
ch@SndFileChunk {[SndFileChunkReplica]
replicas :: [SndFileChunkReplica]
$sel:replicas:SndFileChunk :: SndFileChunk -> [SndFileChunkReplica]
replicas} = case [SndFileChunkReplica]
replicas of
[] -> SndFileChunk -> Either SndFileChunk XFTPServer
forall a b. a -> Either a b
Left SndFileChunk
ch
SndFileChunkReplica {XFTPServer
server :: XFTPServer
$sel:server:SndFileChunkReplica :: SndFileChunkReplica -> XFTPServer
server} : [SndFileChunkReplica]
_ -> XFTPServer -> Either SndFileChunk XFTPServer
forall a b. b -> Either a b
Right XFTPServer
server
createChunk :: Int -> SndFileChunk -> AM (ProtocolServer 'PXFTP)
createChunk :: Int
-> SndFileChunk
-> ExceptT AgentErrorType (ReaderT Env IO) XFTPServer
createChunk Int
numRecipients' SndFileChunk
ch = do
IO () -> AM ()
forall a. IO a -> ExceptT AgentErrorType (ReaderT Env IO) a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> AM ()) -> IO () -> AM ()
forall a b. (a -> b) -> a -> b
$ AgentClient -> IO ()
assertAgentForeground AgentClient
c
(NewSndChunkReplica
replica, ProtoServerWithAuth XFTPServer
srv Maybe BasicAuth
_) <- ExceptT
AgentErrorType
(ReaderT Env IO)
(NewSndChunkReplica, ProtoServerWithAuth 'PXFTP)
tryCreate
AgentClient -> (Connection -> IO ()) -> AM ()
forall a. AgentClient -> (Connection -> IO a) -> AM a
withStore' AgentClient
c ((Connection -> IO ()) -> AM ()) -> (Connection -> IO ()) -> AM ()
forall a b. (a -> b) -> a -> b
$ \Connection
db -> Connection -> SndFileChunk -> NewSndChunkReplica -> IO ()
createSndFileReplica Connection
db SndFileChunk
ch NewSndChunkReplica
replica
XFTPServer -> ExceptT AgentErrorType (ReaderT Env IO) XFTPServer
forall a. a -> ExceptT AgentErrorType (ReaderT Env IO) a
forall (f :: * -> *) a. Applicative f => a -> f a
pure XFTPServer
srv
where
tryCreate :: ExceptT
AgentErrorType
(ReaderT Env IO)
(NewSndChunkReplica, ProtoServerWithAuth 'PXFTP)
tryCreate = do
TVar (Set TransportHost)
triedHosts <- Set TransportHost
-> ExceptT
AgentErrorType (ReaderT Env IO) (TVar (Set TransportHost))
forall (m :: * -> *) a. MonadIO m => a -> m (TVar a)
newTVarIO Set TransportHost
forall a. Set a
S.empty
let AgentClient {TMap Int64 (UserServers 'PXFTP)
xftpServers :: TMap Int64 (UserServers 'PXFTP)
$sel:xftpServers:AgentClient :: AgentClient -> TMap Int64 (UserServers 'PXFTP)
xftpServers} = AgentClient
c
Int
userSrvCount <- IO Int -> ExceptT AgentErrorType (ReaderT Env IO) Int
forall a. IO a -> ExceptT AgentErrorType (ReaderT Env IO) a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Int -> ExceptT AgentErrorType (ReaderT Env IO) Int)
-> IO Int -> ExceptT AgentErrorType (ReaderT Env IO) Int
forall a b. (a -> b) -> a -> b
$ Maybe (UserServers 'PXFTP) -> Int
forall a. Maybe a -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length (Maybe (UserServers 'PXFTP) -> Int)
-> IO (Maybe (UserServers 'PXFTP)) -> IO Int
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Int64
-> TMap Int64 (UserServers 'PXFTP)
-> IO (Maybe (UserServers 'PXFTP))
forall k a. Ord k => k -> TMap k a -> IO (Maybe a)
TM.lookupIO Int64
userId TMap Int64 (UserServers 'PXFTP)
xftpServers
RetryInterval
-> (Int
-> Int64
-> ExceptT
AgentErrorType
(ReaderT Env IO)
(NewSndChunkReplica, ProtoServerWithAuth 'PXFTP)
-> ExceptT
AgentErrorType
(ReaderT Env IO)
(NewSndChunkReplica, ProtoServerWithAuth 'PXFTP))
-> ExceptT
AgentErrorType
(ReaderT Env IO)
(NewSndChunkReplica, ProtoServerWithAuth 'PXFTP)
forall (m :: * -> *) a.
MonadIO m =>
RetryInterval -> (Int -> Int64 -> m a -> m a) -> m a
withRetryIntervalCount (RetryInterval2 -> RetryInterval
riFast RetryInterval2
ri) ((Int
-> Int64
-> ExceptT
AgentErrorType
(ReaderT Env IO)
(NewSndChunkReplica, ProtoServerWithAuth 'PXFTP)
-> ExceptT
AgentErrorType
(ReaderT Env IO)
(NewSndChunkReplica, ProtoServerWithAuth 'PXFTP))
-> ExceptT
AgentErrorType
(ReaderT Env IO)
(NewSndChunkReplica, ProtoServerWithAuth 'PXFTP))
-> (Int
-> Int64
-> ExceptT
AgentErrorType
(ReaderT Env IO)
(NewSndChunkReplica, ProtoServerWithAuth 'PXFTP)
-> ExceptT
AgentErrorType
(ReaderT Env IO)
(NewSndChunkReplica, ProtoServerWithAuth 'PXFTP))
-> ExceptT
AgentErrorType
(ReaderT Env IO)
(NewSndChunkReplica, ProtoServerWithAuth 'PXFTP)
forall a b. (a -> b) -> a -> b
$ \Int
n Int64
_ ExceptT
AgentErrorType
(ReaderT Env IO)
(NewSndChunkReplica, ProtoServerWithAuth 'PXFTP)
loop -> do
IO () -> AM ()
forall a. IO a -> ExceptT AgentErrorType (ReaderT Env IO) a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> AM ()) -> IO () -> AM ()
forall a b. (a -> b) -> a -> b
$ AgentClient -> IO ()
waitWhileSuspended AgentClient
c
IO () -> AM ()
forall a. IO a -> ExceptT AgentErrorType (ReaderT Env IO) a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> AM ()) -> IO () -> AM ()
forall a b. (a -> b) -> a -> b
$ AgentClient -> IO ()
waitForUserNetwork AgentClient
c
let triedAllSrvs :: Bool
triedAllSrvs = Int
n Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
userSrvCount
TVar (Set TransportHost)
-> ExceptT
AgentErrorType
(ReaderT Env IO)
(NewSndChunkReplica, ProtoServerWithAuth 'PXFTP)
createWithNextSrv TVar (Set TransportHost)
triedHosts
ExceptT
AgentErrorType
(ReaderT Env IO)
(NewSndChunkReplica, ProtoServerWithAuth 'PXFTP)
-> (AgentErrorType
-> ExceptT
AgentErrorType
(ReaderT Env IO)
(NewSndChunkReplica, ProtoServerWithAuth 'PXFTP))
-> ExceptT
AgentErrorType
(ReaderT Env IO)
(NewSndChunkReplica, ProtoServerWithAuth 'PXFTP)
forall e (m :: * -> *) a.
(AnyError e, MonadUnliftIO m) =>
ExceptT e m a -> (e -> ExceptT e m a) -> ExceptT e m a
`catchAllErrors` \AgentErrorType
e -> Text
-> ExceptT
AgentErrorType
(ReaderT Env IO)
(NewSndChunkReplica, ProtoServerWithAuth 'PXFTP)
-> ExceptT
AgentErrorType
(ReaderT Env IO)
(NewSndChunkReplica, ProtoServerWithAuth 'PXFTP)
-> AgentErrorType
-> ExceptT
AgentErrorType
(ReaderT Env IO)
(NewSndChunkReplica, ProtoServerWithAuth 'PXFTP)
forall a. Text -> AM a -> AM a -> AgentErrorType -> AM a
retryOnError Text
"XFTP prepare worker" (ExceptT
AgentErrorType
(ReaderT Env IO)
(NewSndChunkReplica, ProtoServerWithAuth 'PXFTP)
-> Bool
-> AgentErrorType
-> ExceptT
AgentErrorType
(ReaderT Env IO)
(NewSndChunkReplica, ProtoServerWithAuth 'PXFTP)
forall {m :: * -> *} {e} {b}.
(AnyError e, MonadUnliftIO m) =>
ExceptT e m b -> Bool -> AgentErrorType -> ExceptT e m b
retryLoop ExceptT
AgentErrorType
(ReaderT Env IO)
(NewSndChunkReplica, ProtoServerWithAuth 'PXFTP)
loop Bool
triedAllSrvs AgentErrorType
e) (AgentErrorType
-> ExceptT
AgentErrorType
(ReaderT Env IO)
(NewSndChunkReplica, ProtoServerWithAuth 'PXFTP)
forall (m :: * -> *) e a. Monad m => e -> ExceptT e m a
throwE AgentErrorType
e) AgentErrorType
e
where
retryLoop :: ExceptT e m b -> Bool -> AgentErrorType -> ExceptT e m b
retryLoop ExceptT e m b
loop Bool
triedAllSrvs AgentErrorType
e = do
(ExceptT e m () -> (e -> ExceptT e m ()) -> ExceptT e m ())
-> (e -> ExceptT e m ()) -> ExceptT e m () -> ExceptT e m ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip ExceptT e m () -> (e -> ExceptT e m ()) -> ExceptT e m ()
forall e (m :: * -> *) a.
(AnyError e, MonadUnliftIO m) =>
ExceptT e m a -> (e -> ExceptT e m a) -> ExceptT e m a
catchAllErrors (\e
_ -> () -> ExceptT e m ()
forall a. a -> ExceptT e m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()) (ExceptT e m () -> ExceptT e m ())
-> ExceptT e m () -> ExceptT e m ()
forall a b. (a -> b) -> a -> b
$ do
Bool -> ExceptT e m () -> ExceptT e m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Bool
triedAllSrvs Bool -> Bool -> Bool
&& AgentErrorType -> Bool
serverHostError AgentErrorType
e) (ExceptT e m () -> ExceptT e m ())
-> ExceptT e m () -> ExceptT e m ()
forall a b. (a -> b) -> a -> b
$ AgentClient -> AEntityId -> AEvent 'AESndFile -> ExceptT e m ()
forall (m :: * -> *) (e :: AEntity).
(MonadIO m, AEntityI e) =>
AgentClient -> AEntityId -> AEvent e -> m ()
notify AgentClient
c AEntityId
sndFileEntityId (AEvent 'AESndFile -> ExceptT e m ())
-> AEvent 'AESndFile -> ExceptT e m ()
forall a b. (a -> b) -> a -> b
$ AgentErrorType -> AEvent 'AESndFile
SFWARN AgentErrorType
e
IO () -> ExceptT e m ()
forall a. IO a -> ExceptT e m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ExceptT e m ()) -> IO () -> ExceptT e m ()
forall a b. (a -> b) -> a -> b
$ AgentClient -> IO ()
assertAgentForeground AgentClient
c
ExceptT e m b
loop
createWithNextSrv :: TVar (Set TransportHost)
-> ExceptT
AgentErrorType
(ReaderT Env IO)
(NewSndChunkReplica, ProtoServerWithAuth 'PXFTP)
createWithNextSrv TVar (Set TransportHost)
triedHosts = do
Bool
deleted <- AgentClient
-> (Connection -> IO Bool)
-> ExceptT AgentErrorType (ReaderT Env IO) Bool
forall a. AgentClient -> (Connection -> IO a) -> AM a
withStore' AgentClient
c ((Connection -> IO Bool)
-> ExceptT AgentErrorType (ReaderT Env IO) Bool)
-> (Connection -> IO Bool)
-> ExceptT AgentErrorType (ReaderT Env IO) Bool
forall a b. (a -> b) -> a -> b
$ \Connection
db -> Connection -> Int64 -> IO Bool
getSndFileDeleted Connection
db Int64
sndFileId
Bool -> AM () -> AM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
deleted (AM () -> AM ()) -> AM () -> AM ()
forall a b. (a -> b) -> a -> b
$ AgentErrorType -> AM ()
forall (m :: * -> *) e a. Monad m => e -> ExceptT e m a
throwE (AgentErrorType -> AM ()) -> AgentErrorType -> AM ()
forall a b. (a -> b) -> a -> b
$ FileErrorType -> AgentErrorType
FILE FileErrorType
NO_FILE
AgentClient
-> Int64
-> (UserServers 'PXFTP
-> NonEmpty (Maybe Int64, ProtoServerWithAuth 'PXFTP))
-> TVar (Set TransportHost)
-> [XFTPServer]
-> (ProtoServerWithAuth 'PXFTP
-> ExceptT
AgentErrorType
(ReaderT Env IO)
(NewSndChunkReplica, ProtoServerWithAuth 'PXFTP))
-> ExceptT
AgentErrorType
(ReaderT Env IO)
(NewSndChunkReplica, ProtoServerWithAuth 'PXFTP)
forall (p :: ProtocolType) a.
(ProtocolTypeI p, UserProtocol p) =>
AgentClient
-> Int64
-> (UserServers p -> NonEmpty (Maybe Int64, ProtoServerWithAuth p))
-> TVar (Set TransportHost)
-> [ProtocolServer p]
-> (ProtoServerWithAuth p -> AM a)
-> AM a
withNextSrv AgentClient
c Int64
userId UserServers 'PXFTP
-> NonEmpty (Maybe Int64, ProtoServerWithAuth 'PXFTP)
forall (p :: ProtocolType).
UserServers p -> NonEmpty (Maybe Int64, ProtoServerWithAuth p)
storageSrvs TVar (Set TransportHost)
triedHosts [] ((ProtoServerWithAuth 'PXFTP
-> ExceptT
AgentErrorType
(ReaderT Env IO)
(NewSndChunkReplica, ProtoServerWithAuth 'PXFTP))
-> ExceptT
AgentErrorType
(ReaderT Env IO)
(NewSndChunkReplica, ProtoServerWithAuth 'PXFTP))
-> (ProtoServerWithAuth 'PXFTP
-> ExceptT
AgentErrorType
(ReaderT Env IO)
(NewSndChunkReplica, ProtoServerWithAuth 'PXFTP))
-> ExceptT
AgentErrorType
(ReaderT Env IO)
(NewSndChunkReplica, ProtoServerWithAuth 'PXFTP)
forall a b. (a -> b) -> a -> b
$ \ProtoServerWithAuth 'PXFTP
srvAuth -> do
NewSndChunkReplica
replica <- AgentClient
-> SndFileChunk
-> Int
-> ProtoServerWithAuth 'PXFTP
-> AM NewSndChunkReplica
agentXFTPNewChunk AgentClient
c SndFileChunk
ch Int
numRecipients' ProtoServerWithAuth 'PXFTP
srvAuth
(NewSndChunkReplica, ProtoServerWithAuth 'PXFTP)
-> ExceptT
AgentErrorType
(ReaderT Env IO)
(NewSndChunkReplica, ProtoServerWithAuth 'PXFTP)
forall a. a -> ExceptT AgentErrorType (ReaderT Env IO) a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (NewSndChunkReplica
replica, ProtoServerWithAuth 'PXFTP
srvAuth)
sndWorkerInternalError :: AgentClient -> DBSndFileId -> SndFileId -> Maybe FilePath -> AgentErrorType -> AM ()
sndWorkerInternalError :: AgentClient
-> Int64 -> AEntityId -> Maybe String -> AgentErrorType -> AM ()
sndWorkerInternalError AgentClient
c Int64
sndFileId AEntityId
sndFileEntityId Maybe String
prefixPath AgentErrorType
err = do
ReaderT Env IO () -> AM ()
forall (m :: * -> *) a.
Monad m =>
m a -> ExceptT AgentErrorType m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (ReaderT Env IO () -> AM ())
-> ((String -> ReaderT Env IO ()) -> ReaderT Env IO ())
-> (String -> ReaderT Env IO ())
-> AM ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Maybe String -> (String -> ReaderT Env IO ()) -> ReaderT Env IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ Maybe String
prefixPath ((String -> ReaderT Env IO ()) -> AM ())
-> (String -> ReaderT Env IO ()) -> AM ()
forall a b. (a -> b) -> a -> b
$ String -> ReaderT Env IO ()
forall (m :: * -> *). MonadIO m => String -> m ()
removePath (String -> ReaderT Env IO ())
-> (String -> ReaderT Env IO String) -> String -> ReaderT Env IO ()
forall (m :: * -> *) b c a.
Monad m =>
(b -> m c) -> (a -> m b) -> a -> m c
<=< String -> ReaderT Env IO String
toFSFilePath
AgentClient -> (Connection -> IO ()) -> AM ()
forall a. AgentClient -> (Connection -> IO a) -> AM a
withStore' AgentClient
c ((Connection -> IO ()) -> AM ()) -> (Connection -> IO ()) -> AM ()
forall a b. (a -> b) -> a -> b
$ \Connection
db -> Connection -> Int64 -> String -> IO ()
updateSndFileError Connection
db Int64
sndFileId (AgentErrorType -> String
forall a. Show a => a -> String
show AgentErrorType
err)
AgentClient -> AEntityId -> AEvent 'AESndFile -> AM ()
forall (m :: * -> *) (e :: AEntity).
(MonadIO m, AEntityI e) =>
AgentClient -> AEntityId -> AEvent e -> m ()
notify AgentClient
c AEntityId
sndFileEntityId (AEvent 'AESndFile -> AM ()) -> AEvent 'AESndFile -> AM ()
forall a b. (a -> b) -> a -> b
$ AgentErrorType -> AEvent 'AESndFile
SFERR AgentErrorType
err
runXFTPSndWorker :: AgentClient -> XFTPServer -> Worker -> AM ()
runXFTPSndWorker :: AgentClient -> XFTPServer -> Worker -> AM ()
runXFTPSndWorker AgentClient
c XFTPServer
srv Worker {TMVar ()
$sel:doWork:Worker :: Worker -> TMVar ()
doWork :: TMVar ()
doWork} = do
AgentConfig
cfg <- (Env -> AgentConfig)
-> ExceptT AgentErrorType (ReaderT Env IO) AgentConfig
forall r (m :: * -> *) a. MonadReader r m => (r -> a) -> m a
asks Env -> AgentConfig
config
AM () -> AM ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (AM () -> AM ()) -> AM () -> AM ()
forall a b. (a -> b) -> a -> b
$ do
ReaderT Env IO () -> AM ()
forall (m :: * -> *) a.
Monad m =>
m a -> ExceptT AgentErrorType m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (ReaderT Env IO () -> AM ()) -> ReaderT Env IO () -> AM ()
forall a b. (a -> b) -> a -> b
$ TMVar () -> ReaderT Env IO ()
forall (m :: * -> *). MonadIO m => TMVar () -> m ()
waitForWork TMVar ()
doWork
IO () -> AM ()
forall a. IO a -> ExceptT AgentErrorType (ReaderT Env IO) a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> AM ()) -> IO () -> AM ()
forall a b. (a -> b) -> a -> b
$ AgentClient -> IO ()
assertAgentForeground AgentClient
c
AgentConfig -> AM ()
runXFTPOperation AgentConfig
cfg
where
runXFTPOperation :: AgentConfig -> AM ()
runXFTPOperation :: AgentConfig -> AM ()
runXFTPOperation cfg :: AgentConfig
cfg@AgentConfig {NominalDiffTime
$sel:sndFilesTTL:AgentConfig :: AgentConfig -> NominalDiffTime
sndFilesTTL :: NominalDiffTime
sndFilesTTL, $sel:reconnectInterval:AgentConfig :: AgentConfig -> RetryInterval
reconnectInterval = RetryInterval
ri, Int
$sel:xftpConsecutiveRetries:AgentConfig :: AgentConfig -> Int
xftpConsecutiveRetries :: Int
xftpConsecutiveRetries} = do
AgentClient
-> TMVar ()
-> (Connection -> IO (Either StoreError (Maybe SndFileChunk)))
-> (SndFileChunk -> AM ())
-> AM ()
forall a.
AgentClient
-> TMVar ()
-> (Connection -> IO (Either StoreError (Maybe a)))
-> (a -> AM ())
-> AM ()
withWork AgentClient
c TMVar ()
doWork (\Connection
db -> Connection
-> XFTPServer
-> NominalDiffTime
-> IO (Either StoreError (Maybe SndFileChunk))
getNextSndChunkToUpload Connection
db XFTPServer
srv NominalDiffTime
sndFilesTTL) ((SndFileChunk -> AM ()) -> AM ())
-> (SndFileChunk -> AM ()) -> AM ()
forall a b. (a -> b) -> a -> b
$ \case
SndFileChunk {Int64
sndFileId :: Int64
$sel:sndFileId:SndFileChunk :: SndFileChunk -> Int64
sndFileId, AEntityId
sndFileEntityId :: AEntityId
$sel:sndFileEntityId:SndFileChunk :: SndFileChunk -> AEntityId
sndFileEntityId, String
filePrefixPath :: String
$sel:filePrefixPath:SndFileChunk :: SndFileChunk -> String
filePrefixPath, $sel:replicas:SndFileChunk :: SndFileChunk -> [SndFileChunkReplica]
replicas = []} -> AgentClient
-> Int64 -> AEntityId -> Maybe String -> AgentErrorType -> AM ()
sndWorkerInternalError AgentClient
c Int64
sndFileId AEntityId
sndFileEntityId (String -> Maybe String
forall a. a -> Maybe a
Just String
filePrefixPath) (String -> AgentErrorType
INTERNAL String
"chunk has no replicas")
fc :: SndFileChunk
fc@SndFileChunk {Int64
userId :: Int64
$sel:userId:SndFileChunk :: SndFileChunk -> Int64
userId, Int64
$sel:sndFileId:SndFileChunk :: SndFileChunk -> Int64
sndFileId :: Int64
sndFileId, AEntityId
$sel:sndFileEntityId:SndFileChunk :: SndFileChunk -> AEntityId
sndFileEntityId :: AEntityId
sndFileEntityId, String
$sel:filePrefixPath:SndFileChunk :: SndFileChunk -> String
filePrefixPath :: String
filePrefixPath, FileDigest
digest :: FileDigest
$sel:digest:SndFileChunk :: SndFileChunk -> FileDigest
digest, $sel:replicas:SndFileChunk :: SndFileChunk -> [SndFileChunkReplica]
replicas = replica :: SndFileChunkReplica
replica@SndFileChunkReplica {Int64
sndChunkReplicaId :: Int64
$sel:sndChunkReplicaId:SndFileChunkReplica :: SndFileChunkReplica -> Int64
sndChunkReplicaId, XFTPServer
$sel:server:SndFileChunkReplica :: SndFileChunkReplica -> XFTPServer
server :: XFTPServer
server, Maybe Int64
delay :: Maybe Int64
$sel:delay:SndFileChunkReplica :: SndFileChunkReplica -> Maybe Int64
delay} : [SndFileChunkReplica]
_} -> do
let ri' :: RetryInterval
ri' = RetryInterval
-> (Int64 -> RetryInterval) -> Maybe Int64 -> RetryInterval
forall b a. b -> (a -> b) -> Maybe a -> b
maybe RetryInterval
ri (\Int64
d -> RetryInterval
ri {initialInterval = d, increaseAfter = 0}) Maybe Int64
delay
Int -> RetryInterval -> (Int64 -> AM () -> AM ()) -> AM ()
forall (m :: * -> *).
MonadIO m =>
Int -> RetryInterval -> (Int64 -> m () -> m ()) -> m ()
withRetryIntervalLimit Int
xftpConsecutiveRetries RetryInterval
ri' ((Int64 -> AM () -> AM ()) -> AM ())
-> (Int64 -> AM () -> AM ()) -> AM ()
forall a b. (a -> b) -> a -> b
$ \Int64
delay' AM ()
loop -> do
IO () -> AM ()
forall a. IO a -> ExceptT AgentErrorType (ReaderT Env IO) a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> AM ()) -> IO () -> AM ()
forall a b. (a -> b) -> a -> b
$ AgentClient -> IO ()
waitWhileSuspended AgentClient
c
IO () -> AM ()
forall a. IO a -> ExceptT AgentErrorType (ReaderT Env IO) a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> AM ()) -> IO () -> AM ()
forall a b. (a -> b) -> a -> b
$ AgentClient -> IO ()
waitForUserNetwork AgentClient
c
STM () -> AM ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> AM ()) -> STM () -> AM ()
forall a b. (a -> b) -> a -> b
$ AgentClient
-> Int64
-> XFTPServer
-> (AgentXFTPServerStats -> TVar Int)
-> STM ()
incXFTPServerStat AgentClient
c Int64
userId XFTPServer
srv AgentXFTPServerStats -> TVar Int
uploadAttempts
AgentConfig -> SndFileChunk -> SndFileChunkReplica -> AM ()
uploadFileChunk AgentConfig
cfg SndFileChunk
fc SndFileChunkReplica
replica
AM () -> (AgentErrorType -> AM ()) -> AM ()
forall e (m :: * -> *) a.
(AnyError e, MonadUnliftIO m) =>
ExceptT e m a -> (e -> ExceptT e m a) -> ExceptT e m a
`catchAllErrors` \AgentErrorType
e -> Text -> AM () -> AM () -> AgentErrorType -> AM ()
forall a. Text -> AM a -> AM a -> AgentErrorType -> AM a
retryOnError Text
"XFTP snd worker" (AM () -> AgentErrorType -> Int64 -> AM ()
forall {b}.
ExceptT AgentErrorType (ReaderT Env IO) b
-> AgentErrorType
-> Int64
-> ExceptT AgentErrorType (ReaderT Env IO) b
retryLoop AM ()
loop AgentErrorType
e Int64
delay') (AgentErrorType -> AM ()
retryDone AgentErrorType
e) AgentErrorType
e
where
retryLoop :: ExceptT AgentErrorType (ReaderT Env IO) b
-> AgentErrorType
-> Int64
-> ExceptT AgentErrorType (ReaderT Env IO) b
retryLoop ExceptT AgentErrorType (ReaderT Env IO) b
loop AgentErrorType
e Int64
replicaDelay = do
(AM () -> (AgentErrorType -> AM ()) -> AM ())
-> (AgentErrorType -> AM ()) -> AM () -> AM ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip AM () -> (AgentErrorType -> AM ()) -> AM ()
forall e (m :: * -> *) a.
(AnyError e, MonadUnliftIO m) =>
ExceptT e m a -> (e -> ExceptT e m a) -> ExceptT e m a
catchAllErrors (\AgentErrorType
_ -> () -> AM ()
forall a. a -> ExceptT AgentErrorType (ReaderT Env IO) a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()) (AM () -> AM ()) -> AM () -> AM ()
forall a b. (a -> b) -> a -> b
$ do
Bool -> AM () -> AM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (AgentErrorType -> Bool
serverHostError AgentErrorType
e) (AM () -> AM ()) -> AM () -> AM ()
forall a b. (a -> b) -> a -> b
$ AgentClient -> AEntityId -> AEvent 'AESndFile -> AM ()
forall (m :: * -> *) (e :: AEntity).
(MonadIO m, AEntityI e) =>
AgentClient -> AEntityId -> AEvent e -> m ()
notify AgentClient
c AEntityId
sndFileEntityId (AEvent 'AESndFile -> AM ()) -> AEvent 'AESndFile -> AM ()
forall a b. (a -> b) -> a -> b
$ AgentErrorType -> AEvent 'AESndFile
SFWARN AgentErrorType
e
IO () -> AM ()
forall a. IO a -> ExceptT AgentErrorType (ReaderT Env IO) a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> AM ()) -> IO () -> AM ()
forall a b. (a -> b) -> a -> b
$ AgentClient -> Int64 -> XFTPServer -> FileDigest -> IO ()
closeXFTPServerClient AgentClient
c Int64
userId XFTPServer
server FileDigest
digest
AgentClient -> (Connection -> IO ()) -> AM ()
forall a. AgentClient -> (Connection -> IO a) -> AM a
withStore' AgentClient
c ((Connection -> IO ()) -> AM ()) -> (Connection -> IO ()) -> AM ()
forall a b. (a -> b) -> a -> b
$ \Connection
db -> Connection -> Int64 -> Int64 -> IO ()
updateSndChunkReplicaDelay Connection
db Int64
sndChunkReplicaId Int64
replicaDelay
IO () -> AM ()
forall a. IO a -> ExceptT AgentErrorType (ReaderT Env IO) a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> AM ()) -> IO () -> AM ()
forall a b. (a -> b) -> a -> b
$ AgentClient -> IO ()
assertAgentForeground AgentClient
c
ExceptT AgentErrorType (ReaderT Env IO) b
loop
retryDone :: AgentErrorType -> AM ()
retryDone AgentErrorType
e = do
STM () -> AM ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> AM ()) -> STM () -> AM ()
forall a b. (a -> b) -> a -> b
$ AgentClient
-> Int64
-> XFTPServer
-> (AgentXFTPServerStats -> TVar Int)
-> STM ()
incXFTPServerStat AgentClient
c Int64
userId XFTPServer
srv AgentXFTPServerStats -> TVar Int
uploadErrs
AgentClient
-> Int64 -> AEntityId -> Maybe String -> AgentErrorType -> AM ()
sndWorkerInternalError AgentClient
c Int64
sndFileId AEntityId
sndFileEntityId (String -> Maybe String
forall a. a -> Maybe a
Just String
filePrefixPath) AgentErrorType
e
uploadFileChunk :: AgentConfig -> SndFileChunk -> SndFileChunkReplica -> AM ()
uploadFileChunk :: AgentConfig -> SndFileChunk -> SndFileChunkReplica -> AM ()
uploadFileChunk AgentConfig {$sel:xftpMaxRecipientsPerRequest:AgentConfig :: AgentConfig -> Int
xftpMaxRecipientsPerRequest = Int
maxRecipients} sndFileChunk :: SndFileChunk
sndFileChunk@SndFileChunk {Int64
$sel:sndFileId:SndFileChunk :: SndFileChunk -> Int64
sndFileId :: Int64
sndFileId, Int64
$sel:userId:SndFileChunk :: SndFileChunk -> Int64
userId :: Int64
userId, $sel:chunkSpec:SndFileChunk :: SndFileChunk -> XFTPChunkSpec
chunkSpec = chunkSpec :: XFTPChunkSpec
chunkSpec@XFTPChunkSpec {String
filePath :: String
$sel:filePath:XFTPChunkSpec :: XFTPChunkSpec -> String
filePath, $sel:chunkSize:XFTPChunkSpec :: XFTPChunkSpec -> Word32
chunkSize = Word32
chSize}, $sel:digest:SndFileChunk :: SndFileChunk -> FileDigest
digest = FileDigest
chunkDigest} SndFileChunkReplica
replica = do
replica' :: SndFileChunkReplica
replica'@SndFileChunkReplica {Int64
$sel:sndChunkReplicaId:SndFileChunkReplica :: SndFileChunkReplica -> Int64
sndChunkReplicaId :: Int64
sndChunkReplicaId} <- SndFileChunk -> SndFileChunkReplica -> AM SndFileChunkReplica
addRecipients SndFileChunk
sndFileChunk SndFileChunkReplica
replica
String
fsFilePath <- ReaderT Env IO String
-> ExceptT AgentErrorType (ReaderT Env IO) String
forall (m :: * -> *) a.
Monad m =>
m a -> ExceptT AgentErrorType m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (ReaderT Env IO String
-> ExceptT AgentErrorType (ReaderT Env IO) String)
-> ReaderT Env IO String
-> ExceptT AgentErrorType (ReaderT Env IO) String
forall a b. (a -> b) -> a -> b
$ String -> ReaderT Env IO String
toFSFilePath String
filePath
ExceptT AgentErrorType (ReaderT Env IO) Bool -> AM () -> AM ()
forall (m :: * -> *). Monad m => m Bool -> m () -> m ()
unlessM (String -> ExceptT AgentErrorType (ReaderT Env IO) Bool
forall (m :: * -> *). MonadIO m => String -> m Bool
doesFileExist String
fsFilePath) (AM () -> AM ()) -> AM () -> AM ()
forall a b. (a -> b) -> a -> b
$ AgentErrorType -> AM ()
forall (m :: * -> *) e a. Monad m => e -> ExceptT e m a
throwE (AgentErrorType -> AM ()) -> AgentErrorType -> AM ()
forall a b. (a -> b) -> a -> b
$ FileErrorType -> AgentErrorType
FILE FileErrorType
NO_FILE
let chunkSpec' :: XFTPChunkSpec
chunkSpec' = XFTPChunkSpec
chunkSpec {filePath = fsFilePath} :: XFTPChunkSpec
IO () -> AM ()
forall a. IO a -> ExceptT AgentErrorType (ReaderT Env IO) a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> AM ()) -> IO () -> AM ()
forall a b. (a -> b) -> a -> b
$ AgentClient -> IO ()
assertAgentForeground AgentClient
c
AgentClient
-> Int64
-> FileDigest
-> SndFileChunkReplica
-> XFTPChunkSpec
-> AM ()
agentXFTPUploadChunk AgentClient
c Int64
userId FileDigest
chunkDigest SndFileChunkReplica
replica' XFTPChunkSpec
chunkSpec'
IO () -> AM ()
forall a. IO a -> ExceptT AgentErrorType (ReaderT Env IO) a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> AM ()) -> IO () -> AM ()
forall a b. (a -> b) -> a -> b
$ AgentClient -> IO ()
waitUntilForeground AgentClient
c
sf :: SndFile
sf@SndFile {AEntityId
$sel:sndFileEntityId:SndFile :: SndFile -> AEntityId
sndFileEntityId :: AEntityId
sndFileEntityId, Maybe String
$sel:prefixPath:SndFile :: SndFile -> Maybe String
prefixPath :: Maybe String
prefixPath, [SndFileChunk]
$sel:chunks:SndFile :: SndFile -> [SndFileChunk]
chunks :: [SndFileChunk]
chunks} <- AgentClient
-> (Connection -> IO (Either StoreError SndFile))
-> ExceptT AgentErrorType (ReaderT Env IO) SndFile
forall a.
AgentClient -> (Connection -> IO (Either StoreError a)) -> AM a
withStore AgentClient
c ((Connection -> IO (Either StoreError SndFile))
-> ExceptT AgentErrorType (ReaderT Env IO) SndFile)
-> (Connection -> IO (Either StoreError SndFile))
-> ExceptT AgentErrorType (ReaderT Env IO) SndFile
forall a b. (a -> b) -> a -> b
$ \Connection
db -> do
Connection -> Int64 -> IO ()
lockSndFileForUpdate Connection
db Int64
sndFileId
Connection -> Int64 -> SndFileReplicaStatus -> IO ()
updateSndChunkReplicaStatus Connection
db Int64
sndChunkReplicaId SndFileReplicaStatus
SFRSUploaded
Connection -> Int64 -> IO (Either StoreError SndFile)
getSndFile Connection
db Int64
sndFileId
let uploaded :: Int64
uploaded = [SndFileChunk] -> Int64
uploadedSize [SndFileChunk]
chunks
total :: Int64
total = [SndFileChunk] -> Int64
totalSize [SndFileChunk]
chunks
complete :: Bool
complete = (SndFileChunk -> Bool) -> [SndFileChunk] -> Bool
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Bool
all SndFileChunk -> Bool
chunkUploaded [SndFileChunk]
chunks
STM () -> AM ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> AM ()) -> STM () -> AM ()
forall a b. (a -> b) -> a -> b
$ AgentClient
-> Int64
-> XFTPServer
-> (AgentXFTPServerStats -> TVar Int)
-> STM ()
incXFTPServerStat AgentClient
c Int64
userId XFTPServer
srv AgentXFTPServerStats -> TVar Int
uploads
STM () -> AM ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> AM ()) -> STM () -> AM ()
forall a b. (a -> b) -> a -> b
$ AgentClient
-> Int64
-> XFTPServer
-> (AgentXFTPServerStats -> TVar Int64)
-> Int64
-> STM ()
incXFTPServerSizeStat AgentClient
c Int64
userId XFTPServer
srv AgentXFTPServerStats -> TVar Int64
uploadsSize (Word32 -> Int64
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Word32 -> Int64) -> Word32 -> Int64
forall a b. (a -> b) -> a -> b
$ Word32 -> Word32
forall a. Integral a => a -> a
toKB Word32
chSize)
AgentClient -> AEntityId -> AEvent 'AESndFile -> AM ()
forall (m :: * -> *) (e :: AEntity).
(MonadIO m, AEntityI e) =>
AgentClient -> AEntityId -> AEvent e -> m ()
notify AgentClient
c AEntityId
sndFileEntityId (AEvent 'AESndFile -> AM ()) -> AEvent 'AESndFile -> AM ()
forall a b. (a -> b) -> a -> b
$ Int64 -> Int64 -> AEvent 'AESndFile
SFPROG Int64
uploaded Int64
total
Bool -> AM () -> AM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
complete (AM () -> AM ()) -> AM () -> AM ()
forall a b. (a -> b) -> a -> b
$ do
(ValidFileDescription 'FSender
sndDescr, [ValidFileDescription 'FRecipient]
rcvDescrs) <- SndFile
-> AM
(ValidFileDescription 'FSender, [ValidFileDescription 'FRecipient])
sndFileToDescrs SndFile
sf
AgentClient -> AEntityId -> AEvent 'AESndFile -> AM ()
forall (m :: * -> *) (e :: AEntity).
(MonadIO m, AEntityI e) =>
AgentClient -> AEntityId -> AEvent e -> m ()
notify AgentClient
c AEntityId
sndFileEntityId (AEvent 'AESndFile -> AM ()) -> AEvent 'AESndFile -> AM ()
forall a b. (a -> b) -> a -> b
$ ValidFileDescription 'FSender
-> [ValidFileDescription 'FRecipient] -> AEvent 'AESndFile
SFDONE ValidFileDescription 'FSender
sndDescr [ValidFileDescription 'FRecipient]
rcvDescrs
ReaderT Env IO () -> AM ()
forall (m :: * -> *) a.
Monad m =>
m a -> ExceptT AgentErrorType m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (ReaderT Env IO () -> AM ())
-> ((String -> ReaderT Env IO ()) -> ReaderT Env IO ())
-> (String -> ReaderT Env IO ())
-> AM ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Maybe String -> (String -> ReaderT Env IO ()) -> ReaderT Env IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ Maybe String
prefixPath ((String -> ReaderT Env IO ()) -> AM ())
-> (String -> ReaderT Env IO ()) -> AM ()
forall a b. (a -> b) -> a -> b
$ String -> ReaderT Env IO ()
forall (m :: * -> *). MonadIO m => String -> m ()
removePath (String -> ReaderT Env IO ())
-> (String -> ReaderT Env IO String) -> String -> ReaderT Env IO ()
forall (m :: * -> *) b c a.
Monad m =>
(b -> m c) -> (a -> m b) -> a -> m c
<=< String -> ReaderT Env IO String
toFSFilePath
AgentClient -> (Connection -> IO ()) -> AM ()
forall a. AgentClient -> (Connection -> IO a) -> AM a
withStore' AgentClient
c ((Connection -> IO ()) -> AM ()) -> (Connection -> IO ()) -> AM ()
forall a b. (a -> b) -> a -> b
$ \Connection
db -> Connection -> Int64 -> IO ()
updateSndFileComplete Connection
db Int64
sndFileId
where
addRecipients :: SndFileChunk -> SndFileChunkReplica -> AM SndFileChunkReplica
addRecipients :: SndFileChunk -> SndFileChunkReplica -> AM SndFileChunkReplica
addRecipients ch :: SndFileChunk
ch@SndFileChunk {Int
numRecipients :: Int
$sel:numRecipients:SndFileChunk :: SndFileChunk -> Int
numRecipients} cr :: SndFileChunkReplica
cr@SndFileChunkReplica {Int64
$sel:sndChunkReplicaId:SndFileChunkReplica :: SndFileChunkReplica -> Int64
sndChunkReplicaId :: Int64
sndChunkReplicaId, [(ChunkReplicaId, APrivateAuthKey)]
rcvIdsKeys :: [(ChunkReplicaId, APrivateAuthKey)]
$sel:rcvIdsKeys:SndFileChunkReplica :: SndFileChunkReplica -> [(ChunkReplicaId, APrivateAuthKey)]
rcvIdsKeys}
| [(ChunkReplicaId, APrivateAuthKey)] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [(ChunkReplicaId, APrivateAuthKey)]
rcvIdsKeys Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
numRecipients = AgentErrorType -> AM SndFileChunkReplica
forall (m :: * -> *) e a. Monad m => e -> ExceptT e m a
throwE (AgentErrorType -> AM SndFileChunkReplica)
-> AgentErrorType -> AM SndFileChunkReplica
forall a b. (a -> b) -> a -> b
$ String -> AgentErrorType
INTERNAL (String
"too many recipients, sndChunkReplicaId = " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Int64 -> String
forall a. Show a => a -> String
show Int64
sndChunkReplicaId)
| [(ChunkReplicaId, APrivateAuthKey)] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [(ChunkReplicaId, APrivateAuthKey)]
rcvIdsKeys Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
numRecipients = SndFileChunkReplica -> AM SndFileChunkReplica
forall a. a -> ExceptT AgentErrorType (ReaderT Env IO) a
forall (f :: * -> *) a. Applicative f => a -> f a
pure SndFileChunkReplica
cr
| Bool
otherwise = do
let numRecipients' :: Int
numRecipients' = Int -> Int -> Int
forall a. Ord a => a -> a -> a
min (Int
numRecipients Int -> Int -> Int
forall a. Num a => a -> a -> a
- [(ChunkReplicaId, APrivateAuthKey)] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [(ChunkReplicaId, APrivateAuthKey)]
rcvIdsKeys) Int
maxRecipients
NonEmpty (ChunkReplicaId, APrivateAuthKey)
rcvIdsKeys' <- AgentClient
-> Int64
-> FileDigest
-> SndFileChunkReplica
-> Int
-> AM (NonEmpty (ChunkReplicaId, APrivateAuthKey))
agentXFTPAddRecipients AgentClient
c Int64
userId FileDigest
chunkDigest SndFileChunkReplica
cr Int
numRecipients'
SndFileChunkReplica
cr' <- AgentClient
-> (Connection -> IO SndFileChunkReplica) -> AM SndFileChunkReplica
forall a. AgentClient -> (Connection -> IO a) -> AM a
withStore' AgentClient
c ((Connection -> IO SndFileChunkReplica) -> AM SndFileChunkReplica)
-> (Connection -> IO SndFileChunkReplica) -> AM SndFileChunkReplica
forall a b. (a -> b) -> a -> b
$ \Connection
db -> Connection
-> SndFileChunkReplica
-> [(ChunkReplicaId, APrivateAuthKey)]
-> IO SndFileChunkReplica
addSndChunkReplicaRecipients Connection
db SndFileChunkReplica
cr ([(ChunkReplicaId, APrivateAuthKey)] -> IO SndFileChunkReplica)
-> [(ChunkReplicaId, APrivateAuthKey)] -> IO SndFileChunkReplica
forall a b. (a -> b) -> a -> b
$ NonEmpty (ChunkReplicaId, APrivateAuthKey)
-> [(ChunkReplicaId, APrivateAuthKey)]
forall a. NonEmpty a -> [a]
L.toList NonEmpty (ChunkReplicaId, APrivateAuthKey)
rcvIdsKeys'
SndFileChunk -> SndFileChunkReplica -> AM SndFileChunkReplica
addRecipients SndFileChunk
ch SndFileChunkReplica
cr'
sndFileToDescrs :: SndFile -> AM (ValidFileDescription 'FSender, [ValidFileDescription 'FRecipient])
sndFileToDescrs :: SndFile
-> AM
(ValidFileDescription 'FSender, [ValidFileDescription 'FRecipient])
sndFileToDescrs SndFile {$sel:digest:SndFile :: SndFile -> Maybe FileDigest
digest = Maybe FileDigest
Nothing} = AgentErrorType
-> AM
(ValidFileDescription 'FSender, [ValidFileDescription 'FRecipient])
forall (m :: * -> *) e a. Monad m => e -> ExceptT e m a
throwE (AgentErrorType
-> AM
(ValidFileDescription 'FSender,
[ValidFileDescription 'FRecipient]))
-> AgentErrorType
-> AM
(ValidFileDescription 'FSender, [ValidFileDescription 'FRecipient])
forall a b. (a -> b) -> a -> b
$ String -> AgentErrorType
INTERNAL String
"snd file has no digest"
sndFileToDescrs SndFile {$sel:chunks:SndFile :: SndFile -> [SndFileChunk]
chunks = []} = AgentErrorType
-> AM
(ValidFileDescription 'FSender, [ValidFileDescription 'FRecipient])
forall (m :: * -> *) e a. Monad m => e -> ExceptT e m a
throwE (AgentErrorType
-> AM
(ValidFileDescription 'FSender,
[ValidFileDescription 'FRecipient]))
-> AgentErrorType
-> AM
(ValidFileDescription 'FSender, [ValidFileDescription 'FRecipient])
forall a b. (a -> b) -> a -> b
$ String -> AgentErrorType
INTERNAL String
"snd file has no chunks"
sndFileToDescrs SndFile {$sel:digest:SndFile :: SndFile -> Maybe FileDigest
digest = Just FileDigest
digest, SbKey
$sel:key:SndFile :: SndFile -> SbKey
key :: SbKey
key, CbNonce
$sel:nonce:SndFile :: SndFile -> CbNonce
nonce :: CbNonce
nonce, $sel:chunks:SndFile :: SndFile -> [SndFileChunk]
chunks = chunks :: [SndFileChunk]
chunks@(SndFileChunk
fstChunk : [SndFileChunk]
_), Maybe RedirectFileInfo
$sel:redirect:SndFile :: SndFile -> Maybe RedirectFileInfo
redirect :: Maybe RedirectFileInfo
redirect} = do
let chunkSize :: FileSize Word32
chunkSize = Word32 -> FileSize Word32
forall a. a -> FileSize a
FileSize (Word32 -> FileSize Word32) -> Word32 -> FileSize Word32
forall a b. (a -> b) -> a -> b
$ SndFileChunk -> Word32
sndChunkSize SndFileChunk
fstChunk
size :: FileSize Int64
size = Int64 -> FileSize Int64
forall a. a -> FileSize a
FileSize (Int64 -> FileSize Int64) -> Int64 -> FileSize Int64
forall a b. (a -> b) -> a -> b
$ [Int64] -> Int64
forall a. Num a => [a] -> a
forall (t :: * -> *) a. (Foldable t, Num a) => t a -> a
sum ([Int64] -> Int64) -> [Int64] -> Int64
forall a b. (a -> b) -> a -> b
$ (SndFileChunk -> Int64) -> [SndFileChunk] -> [Int64]
forall a b. (a -> b) -> [a] -> [b]
map (Word32 -> Int64
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Word32 -> Int64)
-> (SndFileChunk -> Word32) -> SndFileChunk -> Int64
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SndFileChunk -> Word32
sndChunkSize) [SndFileChunk]
chunks
[FileChunk]
sndDescrChunks <- (SndFileChunk -> ExceptT AgentErrorType (ReaderT Env IO) FileChunk)
-> [SndFileChunk]
-> ExceptT AgentErrorType (ReaderT Env IO) [FileChunk]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> [a] -> m [b]
mapM SndFileChunk -> ExceptT AgentErrorType (ReaderT Env IO) FileChunk
toSndDescrChunk [SndFileChunk]
chunks
let fdSnd :: FileDescription 'FSender
fdSnd = FileDescription {$sel:party:FileDescription :: SFileParty 'FSender
party = SFileParty 'FSender
SFSender, FileSize Int64
$sel:size:FileDescription :: FileSize Int64
size :: FileSize Int64
size, FileDigest
$sel:digest:FileDescription :: FileDigest
digest :: FileDigest
digest, SbKey
key :: SbKey
$sel:key:FileDescription :: SbKey
key, CbNonce
nonce :: CbNonce
$sel:nonce:FileDescription :: CbNonce
nonce, FileSize Word32
chunkSize :: FileSize Word32
$sel:chunkSize:FileDescription :: FileSize Word32
chunkSize, $sel:chunks:FileDescription :: [FileChunk]
chunks = [FileChunk]
sndDescrChunks, $sel:redirect:FileDescription :: Maybe RedirectFileInfo
redirect = Maybe RedirectFileInfo
forall a. Maybe a
Nothing}
ValidFileDescription 'FSender
validFdSnd <- (String
-> ExceptT
AgentErrorType (ReaderT Env IO) (ValidFileDescription 'FSender))
-> (ValidFileDescription 'FSender
-> ExceptT
AgentErrorType (ReaderT Env IO) (ValidFileDescription 'FSender))
-> Either String (ValidFileDescription 'FSender)
-> ExceptT
AgentErrorType (ReaderT Env IO) (ValidFileDescription 'FSender)
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either (AgentErrorType
-> ExceptT
AgentErrorType (ReaderT Env IO) (ValidFileDescription 'FSender)
forall (m :: * -> *) e a. Monad m => e -> ExceptT e m a
throwE (AgentErrorType
-> ExceptT
AgentErrorType (ReaderT Env IO) (ValidFileDescription 'FSender))
-> (String -> AgentErrorType)
-> String
-> ExceptT
AgentErrorType (ReaderT Env IO) (ValidFileDescription 'FSender)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> AgentErrorType
INTERNAL) ValidFileDescription 'FSender
-> ExceptT
AgentErrorType (ReaderT Env IO) (ValidFileDescription 'FSender)
forall a. a -> ExceptT AgentErrorType (ReaderT Env IO) a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either String (ValidFileDescription 'FSender)
-> ExceptT
AgentErrorType (ReaderT Env IO) (ValidFileDescription 'FSender))
-> Either String (ValidFileDescription 'FSender)
-> ExceptT
AgentErrorType (ReaderT Env IO) (ValidFileDescription 'FSender)
forall a b. (a -> b) -> a -> b
$ FileDescription 'FSender
-> Either String (ValidFileDescription 'FSender)
forall (p :: FileParty).
FileDescription p -> Either String (ValidFileDescription p)
validateFileDescription FileDescription 'FSender
fdSnd
let fdRcv :: FileDescription 'FRecipient
fdRcv = FileDescription {$sel:party:FileDescription :: SFileParty 'FRecipient
party = SFileParty 'FRecipient
SFRecipient, FileSize Int64
$sel:size:FileDescription :: FileSize Int64
size :: FileSize Int64
size, FileDigest
$sel:digest:FileDescription :: FileDigest
digest :: FileDigest
digest, SbKey
key :: SbKey
$sel:key:FileDescription :: SbKey
key, CbNonce
nonce :: CbNonce
$sel:nonce:FileDescription :: CbNonce
nonce, FileSize Word32
chunkSize :: FileSize Word32
$sel:chunkSize:FileDescription :: FileSize Word32
chunkSize, $sel:chunks:FileDescription :: [FileChunk]
chunks = [], Maybe RedirectFileInfo
$sel:redirect:FileDescription :: Maybe RedirectFileInfo
redirect :: Maybe RedirectFileInfo
redirect}
fdRcvs :: [FileDescription 'FRecipient]
fdRcvs = FileDescription 'FRecipient
-> [SndFileChunk] -> [FileDescription 'FRecipient]
createRcvFileDescriptions FileDescription 'FRecipient
fdRcv [SndFileChunk]
chunks
[ValidFileDescription 'FRecipient]
validFdRcvs <- (String
-> ExceptT
AgentErrorType (ReaderT Env IO) [ValidFileDescription 'FRecipient])
-> ([ValidFileDescription 'FRecipient]
-> ExceptT
AgentErrorType (ReaderT Env IO) [ValidFileDescription 'FRecipient])
-> Either String [ValidFileDescription 'FRecipient]
-> ExceptT
AgentErrorType (ReaderT Env IO) [ValidFileDescription 'FRecipient]
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either (AgentErrorType
-> ExceptT
AgentErrorType (ReaderT Env IO) [ValidFileDescription 'FRecipient]
forall (m :: * -> *) e a. Monad m => e -> ExceptT e m a
throwE (AgentErrorType
-> ExceptT
AgentErrorType (ReaderT Env IO) [ValidFileDescription 'FRecipient])
-> (String -> AgentErrorType)
-> String
-> ExceptT
AgentErrorType (ReaderT Env IO) [ValidFileDescription 'FRecipient]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> AgentErrorType
INTERNAL) [ValidFileDescription 'FRecipient]
-> ExceptT
AgentErrorType (ReaderT Env IO) [ValidFileDescription 'FRecipient]
forall a. a -> ExceptT AgentErrorType (ReaderT Env IO) a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either String [ValidFileDescription 'FRecipient]
-> ExceptT
AgentErrorType (ReaderT Env IO) [ValidFileDescription 'FRecipient])
-> Either String [ValidFileDescription 'FRecipient]
-> ExceptT
AgentErrorType (ReaderT Env IO) [ValidFileDescription 'FRecipient]
forall a b. (a -> b) -> a -> b
$ (FileDescription 'FRecipient
-> Either String (ValidFileDescription 'FRecipient))
-> [FileDescription 'FRecipient]
-> Either String [ValidFileDescription 'FRecipient]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> [a] -> m [b]
mapM FileDescription 'FRecipient
-> Either String (ValidFileDescription 'FRecipient)
forall (p :: FileParty).
FileDescription p -> Either String (ValidFileDescription p)
validateFileDescription [FileDescription 'FRecipient]
fdRcvs
(ValidFileDescription 'FSender, [ValidFileDescription 'FRecipient])
-> AM
(ValidFileDescription 'FSender, [ValidFileDescription 'FRecipient])
forall a. a -> ExceptT AgentErrorType (ReaderT Env IO) a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ValidFileDescription 'FSender
validFdSnd, [ValidFileDescription 'FRecipient]
validFdRcvs)
toSndDescrChunk :: SndFileChunk -> AM FileChunk
toSndDescrChunk :: SndFileChunk -> ExceptT AgentErrorType (ReaderT Env IO) FileChunk
toSndDescrChunk SndFileChunk {$sel:replicas:SndFileChunk :: SndFileChunk -> [SndFileChunkReplica]
replicas = []} = AgentErrorType -> ExceptT AgentErrorType (ReaderT Env IO) FileChunk
forall (m :: * -> *) e a. Monad m => e -> ExceptT e m a
throwE (AgentErrorType
-> ExceptT AgentErrorType (ReaderT Env IO) FileChunk)
-> AgentErrorType
-> ExceptT AgentErrorType (ReaderT Env IO) FileChunk
forall a b. (a -> b) -> a -> b
$ String -> AgentErrorType
INTERNAL String
"snd file chunk has no replicas"
toSndDescrChunk ch :: SndFileChunk
ch@SndFileChunk {Int
chunkNo :: Int
$sel:chunkNo:SndFileChunk :: SndFileChunk -> Int
chunkNo, $sel:digest:SndFileChunk :: SndFileChunk -> FileDigest
digest = FileDigest
chDigest, $sel:replicas:SndFileChunk :: SndFileChunk -> [SndFileChunkReplica]
replicas = (SndFileChunkReplica {XFTPServer
$sel:server:SndFileChunkReplica :: SndFileChunkReplica -> XFTPServer
server :: XFTPServer
server, ChunkReplicaId
replicaId :: ChunkReplicaId
$sel:replicaId:SndFileChunkReplica :: SndFileChunkReplica -> ChunkReplicaId
replicaId, APrivateAuthKey
replicaKey :: APrivateAuthKey
$sel:replicaKey:SndFileChunkReplica :: SndFileChunkReplica -> APrivateAuthKey
replicaKey} : [SndFileChunkReplica]
_)} = do
let chunkSize :: FileSize Word32
chunkSize = Word32 -> FileSize Word32
forall a. a -> FileSize a
FileSize (Word32 -> FileSize Word32) -> Word32 -> FileSize Word32
forall a b. (a -> b) -> a -> b
$ SndFileChunk -> Word32
sndChunkSize SndFileChunk
ch
replicas :: [FileChunkReplica]
replicas = [FileChunkReplica {XFTPServer
$sel:server:FileChunkReplica :: XFTPServer
server :: XFTPServer
server, ChunkReplicaId
replicaId :: ChunkReplicaId
$sel:replicaId:FileChunkReplica :: ChunkReplicaId
replicaId, APrivateAuthKey
replicaKey :: APrivateAuthKey
$sel:replicaKey:FileChunkReplica :: APrivateAuthKey
replicaKey}]
FileChunk -> ExceptT AgentErrorType (ReaderT Env IO) FileChunk
forall a. a -> ExceptT AgentErrorType (ReaderT Env IO) a
forall (f :: * -> *) a. Applicative f => a -> f a
pure FileChunk {Int
chunkNo :: Int
$sel:chunkNo:FileChunk :: Int
chunkNo, $sel:digest:FileChunk :: FileDigest
digest = FileDigest
chDigest, FileSize Word32
chunkSize :: FileSize Word32
$sel:chunkSize:FileChunk :: FileSize Word32
chunkSize, [FileChunkReplica]
$sel:replicas:FileChunk :: [FileChunkReplica]
replicas :: [FileChunkReplica]
replicas}
createRcvFileDescriptions :: FileDescription 'FRecipient -> [SndFileChunk] -> [FileDescription 'FRecipient]
createRcvFileDescriptions :: FileDescription 'FRecipient
-> [SndFileChunk] -> [FileDescription 'FRecipient]
createRcvFileDescriptions FileDescription 'FRecipient
fd [SndFileChunk]
sndChunks = ([FileChunk] -> FileDescription 'FRecipient)
-> [[FileChunk]] -> [FileDescription 'FRecipient]
forall a b. (a -> b) -> [a] -> [b]
map (\[FileChunk]
chunks -> (FileDescription 'FRecipient
fd :: (FileDescription 'FRecipient)) {chunks}) [[FileChunk]]
rcvChunks
where
rcvReplicas :: [SentRecipientReplica]
rcvReplicas :: [SentRecipientReplica]
rcvReplicas = (SndFileChunk -> [SentRecipientReplica])
-> [SndFileChunk] -> [SentRecipientReplica]
forall (t :: * -> *) a b. Foldable t => (a -> [b]) -> t a -> [b]
concatMap SndFileChunk -> [SentRecipientReplica]
toSentRecipientReplicas [SndFileChunk]
sndChunks
toSentRecipientReplicas :: SndFileChunk -> [SentRecipientReplica]
toSentRecipientReplicas :: SndFileChunk -> [SentRecipientReplica]
toSentRecipientReplicas ch :: SndFileChunk
ch@SndFileChunk {Int
$sel:chunkNo:SndFileChunk :: SndFileChunk -> Int
chunkNo :: Int
chunkNo, FileDigest
$sel:digest:SndFileChunk :: SndFileChunk -> FileDigest
digest :: FileDigest
digest, [SndFileChunkReplica]
$sel:replicas:SndFileChunk :: SndFileChunk -> [SndFileChunkReplica]
replicas :: [SndFileChunkReplica]
replicas} =
let chunkSize :: FileSize Word32
chunkSize = Word32 -> FileSize Word32
forall a. a -> FileSize a
FileSize (Word32 -> FileSize Word32) -> Word32 -> FileSize Word32
forall a b. (a -> b) -> a -> b
$ SndFileChunk -> Word32
sndChunkSize SndFileChunk
ch
in (SndFileChunkReplica -> [SentRecipientReplica])
-> [SndFileChunkReplica] -> [SentRecipientReplica]
forall (t :: * -> *) a b. Foldable t => (a -> [b]) -> t a -> [b]
concatMap
( \SndFileChunkReplica {XFTPServer
$sel:server:SndFileChunkReplica :: SndFileChunkReplica -> XFTPServer
server :: XFTPServer
server, [(ChunkReplicaId, APrivateAuthKey)]
$sel:rcvIdsKeys:SndFileChunkReplica :: SndFileChunkReplica -> [(ChunkReplicaId, APrivateAuthKey)]
rcvIdsKeys :: [(ChunkReplicaId, APrivateAuthKey)]
rcvIdsKeys} ->
(Int -> (ChunkReplicaId, APrivateAuthKey) -> SentRecipientReplica)
-> [Int]
-> [(ChunkReplicaId, APrivateAuthKey)]
-> [SentRecipientReplica]
forall a b c. (a -> b -> c) -> [a] -> [b] -> [c]
zipWith
(\Int
rcvNo (ChunkReplicaId
replicaId, APrivateAuthKey
replicaKey) -> SentRecipientReplica {Int
chunkNo :: Int
$sel:chunkNo:SentRecipientReplica :: Int
chunkNo, XFTPServer
server :: XFTPServer
$sel:server:SentRecipientReplica :: XFTPServer
server, Int
rcvNo :: Int
$sel:rcvNo:SentRecipientReplica :: Int
rcvNo, ChunkReplicaId
replicaId :: ChunkReplicaId
$sel:replicaId:SentRecipientReplica :: ChunkReplicaId
replicaId, APrivateAuthKey
replicaKey :: APrivateAuthKey
$sel:replicaKey:SentRecipientReplica :: APrivateAuthKey
replicaKey, FileDigest
digest :: FileDigest
$sel:digest:SentRecipientReplica :: FileDigest
digest, FileSize Word32
chunkSize :: FileSize Word32
$sel:chunkSize:SentRecipientReplica :: FileSize Word32
chunkSize})
[Int
1 ..]
[(ChunkReplicaId, APrivateAuthKey)]
rcvIdsKeys
)
[SndFileChunkReplica]
replicas
rcvChunks :: [[FileChunk]]
rcvChunks :: [[FileChunk]]
rcvChunks = (Map Int FileChunk -> [FileChunk])
-> [Map Int FileChunk] -> [[FileChunk]]
forall a b. (a -> b) -> [a] -> [b]
map ([FileChunk] -> [FileChunk]
sortChunks ([FileChunk] -> [FileChunk])
-> (Map Int FileChunk -> [FileChunk])
-> Map Int FileChunk
-> [FileChunk]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Map Int FileChunk -> [FileChunk]
forall k a. Map k a -> [a]
M.elems) ([Map Int FileChunk] -> [[FileChunk]])
-> [Map Int FileChunk] -> [[FileChunk]]
forall a b. (a -> b) -> a -> b
$ Map Int (Map Int FileChunk) -> [Map Int FileChunk]
forall k a. Map k a -> [a]
M.elems (Map Int (Map Int FileChunk) -> [Map Int FileChunk])
-> Map Int (Map Int FileChunk) -> [Map Int FileChunk]
forall a b. (a -> b) -> a -> b
$ (Map Int (Map Int FileChunk)
-> SentRecipientReplica -> Map Int (Map Int FileChunk))
-> Map Int (Map Int FileChunk)
-> [SentRecipientReplica]
-> Map Int (Map Int FileChunk)
forall b a. (b -> a -> b) -> b -> [a] -> b
forall (t :: * -> *) b a.
Foldable t =>
(b -> a -> b) -> b -> t a -> b
foldl' Map Int (Map Int FileChunk)
-> SentRecipientReplica -> Map Int (Map Int FileChunk)
addRcvChunk Map Int (Map Int FileChunk)
forall k a. Map k a
M.empty [SentRecipientReplica]
rcvReplicas
sortChunks :: [FileChunk] -> [FileChunk]
sortChunks :: [FileChunk] -> [FileChunk]
sortChunks = (FileChunk -> FileChunk) -> [FileChunk] -> [FileChunk]
forall a b. (a -> b) -> [a] -> [b]
map FileChunk -> FileChunk
reverseReplicas ([FileChunk] -> [FileChunk])
-> ([FileChunk] -> [FileChunk]) -> [FileChunk] -> [FileChunk]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (FileChunk -> Int) -> [FileChunk] -> [FileChunk]
forall b a. Ord b => (a -> b) -> [a] -> [a]
sortOn (\FileChunk {Int
$sel:chunkNo:FileChunk :: FileChunk -> Int
chunkNo :: Int
chunkNo} -> Int
chunkNo)
reverseReplicas :: FileChunk -> FileChunk
reverseReplicas ch :: FileChunk
ch@FileChunk {[FileChunkReplica]
$sel:replicas:FileChunk :: FileChunk -> [FileChunkReplica]
replicas :: [FileChunkReplica]
replicas} = (FileChunk
ch :: FileChunk) {replicas = reverse replicas}
addRcvChunk :: Map Int (Map Int FileChunk) -> SentRecipientReplica -> Map Int (Map Int FileChunk)
addRcvChunk :: Map Int (Map Int FileChunk)
-> SentRecipientReplica -> Map Int (Map Int FileChunk)
addRcvChunk Map Int (Map Int FileChunk)
m SentRecipientReplica {Int
$sel:chunkNo:SentRecipientReplica :: SentRecipientReplica -> Int
chunkNo :: Int
chunkNo, XFTPServer
$sel:server:SentRecipientReplica :: SentRecipientReplica -> XFTPServer
server :: XFTPServer
server, Int
$sel:rcvNo:SentRecipientReplica :: SentRecipientReplica -> Int
rcvNo :: Int
rcvNo, ChunkReplicaId
$sel:replicaId:SentRecipientReplica :: SentRecipientReplica -> ChunkReplicaId
replicaId :: ChunkReplicaId
replicaId, APrivateAuthKey
$sel:replicaKey:SentRecipientReplica :: SentRecipientReplica -> APrivateAuthKey
replicaKey :: APrivateAuthKey
replicaKey, FileDigest
$sel:digest:SentRecipientReplica :: SentRecipientReplica -> FileDigest
digest :: FileDigest
digest, FileSize Word32
$sel:chunkSize:SentRecipientReplica :: SentRecipientReplica -> FileSize Word32
chunkSize :: FileSize Word32
chunkSize} =
(Maybe (Map Int FileChunk) -> Maybe (Map Int FileChunk))
-> Int
-> Map Int (Map Int FileChunk)
-> Map Int (Map Int FileChunk)
forall k a.
Ord k =>
(Maybe a -> Maybe a) -> k -> Map k a -> Map k a
M.alter (Map Int FileChunk -> Maybe (Map Int FileChunk)
forall a. a -> Maybe a
Just (Map Int FileChunk -> Maybe (Map Int FileChunk))
-> (Maybe (Map Int FileChunk) -> Map Int FileChunk)
-> Maybe (Map Int FileChunk)
-> Maybe (Map Int FileChunk)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Maybe (Map Int FileChunk) -> Map Int FileChunk
addOrChangeRecipient) Int
rcvNo Map Int (Map Int FileChunk)
m
where
addOrChangeRecipient :: Maybe (Map Int FileChunk) -> Map Int FileChunk
addOrChangeRecipient :: Maybe (Map Int FileChunk) -> Map Int FileChunk
addOrChangeRecipient = \case
Just Map Int FileChunk
m' -> (Maybe FileChunk -> Maybe FileChunk)
-> Int -> Map Int FileChunk -> Map Int FileChunk
forall k a.
Ord k =>
(Maybe a -> Maybe a) -> k -> Map k a -> Map k a
M.alter (FileChunk -> Maybe FileChunk
forall a. a -> Maybe a
Just (FileChunk -> Maybe FileChunk)
-> (Maybe FileChunk -> FileChunk)
-> Maybe FileChunk
-> Maybe FileChunk
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Maybe FileChunk -> FileChunk
addOrChangeChunk) Int
chunkNo Map Int FileChunk
m'
Maybe (Map Int FileChunk)
_ -> Int -> FileChunk -> Map Int FileChunk
forall k a. k -> a -> Map k a
M.singleton Int
chunkNo (FileChunk -> Map Int FileChunk) -> FileChunk -> Map Int FileChunk
forall a b. (a -> b) -> a -> b
$ FileChunk {Int
$sel:chunkNo:FileChunk :: Int
chunkNo :: Int
chunkNo, FileDigest
$sel:digest:FileChunk :: FileDigest
digest :: FileDigest
digest, FileSize Word32
$sel:chunkSize:FileChunk :: FileSize Word32
chunkSize :: FileSize Word32
chunkSize, $sel:replicas:FileChunk :: [FileChunkReplica]
replicas = [FileChunkReplica
replica']}
addOrChangeChunk :: Maybe FileChunk -> FileChunk
addOrChangeChunk :: Maybe FileChunk -> FileChunk
addOrChangeChunk = \case
Just ch :: FileChunk
ch@FileChunk {[FileChunkReplica]
$sel:replicas:FileChunk :: FileChunk -> [FileChunkReplica]
replicas :: [FileChunkReplica]
replicas} -> FileChunk
ch {replicas = replica' : replicas}
Maybe FileChunk
_ -> FileChunk {Int
$sel:chunkNo:FileChunk :: Int
chunkNo :: Int
chunkNo, FileDigest
$sel:digest:FileChunk :: FileDigest
digest :: FileDigest
digest, FileSize Word32
$sel:chunkSize:FileChunk :: FileSize Word32
chunkSize :: FileSize Word32
chunkSize, $sel:replicas:FileChunk :: [FileChunkReplica]
replicas = [FileChunkReplica
replica']}
replica' :: FileChunkReplica
replica' = FileChunkReplica {XFTPServer
$sel:server:FileChunkReplica :: XFTPServer
server :: XFTPServer
server, ChunkReplicaId
$sel:replicaId:FileChunkReplica :: ChunkReplicaId
replicaId :: ChunkReplicaId
replicaId, APrivateAuthKey
$sel:replicaKey:FileChunkReplica :: APrivateAuthKey
replicaKey :: APrivateAuthKey
replicaKey}
uploadedSize :: [SndFileChunk] -> Int64
uploadedSize :: [SndFileChunk] -> Int64
uploadedSize = (Int64 -> SndFileChunk -> Int64)
-> Int64 -> [SndFileChunk] -> Int64
forall b a. (b -> a -> b) -> b -> [a] -> b
forall (t :: * -> *) b a.
Foldable t =>
(b -> a -> b) -> b -> t a -> b
foldl' (\Int64
sz SndFileChunk
ch -> Int64
sz Int64 -> Int64 -> Int64
forall a. Num a => a -> a -> a
+ SndFileChunk -> Int64
forall {b}. Num b => SndFileChunk -> b
uploadedChunkSize SndFileChunk
ch) Int64
0
uploadedChunkSize :: SndFileChunk -> b
uploadedChunkSize SndFileChunk
ch
| SndFileChunk -> Bool
chunkUploaded SndFileChunk
ch = Word32 -> b
forall a b. (Integral a, Num b) => a -> b
fromIntegral (SndFileChunk -> Word32
sndChunkSize SndFileChunk
ch)
| Bool
otherwise = b
0
totalSize :: [SndFileChunk] -> Int64
totalSize :: [SndFileChunk] -> Int64
totalSize = (Int64 -> SndFileChunk -> Int64)
-> Int64 -> [SndFileChunk] -> Int64
forall b a. (b -> a -> b) -> b -> [a] -> b
forall (t :: * -> *) b a.
Foldable t =>
(b -> a -> b) -> b -> t a -> b
foldl' (\Int64
sz SndFileChunk
ch -> Int64
sz Int64 -> Int64 -> Int64
forall a. Num a => a -> a -> a
+ Word32 -> Int64
forall a b. (Integral a, Num b) => a -> b
fromIntegral (SndFileChunk -> Word32
sndChunkSize SndFileChunk
ch)) Int64
0
chunkUploaded :: SndFileChunk -> Bool
chunkUploaded :: SndFileChunk -> Bool
chunkUploaded SndFileChunk {[SndFileChunkReplica]
$sel:replicas:SndFileChunk :: SndFileChunk -> [SndFileChunkReplica]
replicas :: [SndFileChunkReplica]
replicas} =
(SndFileChunkReplica -> Bool) -> [SndFileChunkReplica] -> Bool
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Bool
any (\SndFileChunkReplica {SndFileReplicaStatus
replicaStatus :: SndFileReplicaStatus
$sel:replicaStatus:SndFileChunkReplica :: SndFileChunkReplica -> SndFileReplicaStatus
replicaStatus} -> SndFileReplicaStatus
replicaStatus SndFileReplicaStatus -> SndFileReplicaStatus -> Bool
forall a. Eq a => a -> a -> Bool
== SndFileReplicaStatus
SFRSUploaded) [SndFileChunkReplica]
replicas
deleteSndFileInternal :: AgentClient -> SndFileId -> AM' ()
deleteSndFileInternal :: AgentClient -> AEntityId -> ReaderT Env IO ()
deleteSndFileInternal AgentClient
c AEntityId
sndFileEntityId = AgentClient -> [AEntityId] -> ReaderT Env IO ()
deleteSndFilesInternal AgentClient
c [AEntityId
sndFileEntityId]
deleteSndFilesInternal :: AgentClient -> [SndFileId] -> AM' ()
deleteSndFilesInternal :: AgentClient -> [AEntityId] -> ReaderT Env IO ()
deleteSndFilesInternal AgentClient
c [AEntityId]
sndFileEntityIds = do
[SndFile]
sndFiles <- [Either AgentErrorType SndFile] -> [SndFile]
forall a b. [Either a b] -> [b]
rights ([Either AgentErrorType SndFile] -> [SndFile])
-> ReaderT Env IO [Either AgentErrorType SndFile]
-> ReaderT Env IO [SndFile]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> AgentClient
-> (Connection -> [IO (Either AgentErrorType SndFile)])
-> ReaderT Env IO [Either AgentErrorType SndFile]
forall (t :: * -> *) a.
Traversable t =>
AgentClient
-> (Connection -> t (IO (Either AgentErrorType a)))
-> AM' (t (Either AgentErrorType a))
withStoreBatch AgentClient
c (\Connection
db -> (AEntityId -> IO (Either AgentErrorType SndFile))
-> [AEntityId] -> [IO (Either AgentErrorType SndFile)]
forall a b. (a -> b) -> [a] -> [b]
map ((Either StoreError SndFile -> Either AgentErrorType SndFile)
-> IO (Either StoreError SndFile)
-> IO (Either AgentErrorType SndFile)
forall a b. (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ((StoreError -> AgentErrorType)
-> Either StoreError SndFile -> Either AgentErrorType SndFile
forall a b c. (a -> b) -> Either a c -> Either b c
forall (p :: * -> * -> *) a b c.
Bifunctor p =>
(a -> b) -> p a c -> p b c
first StoreError -> AgentErrorType
storeError) (IO (Either StoreError SndFile)
-> IO (Either AgentErrorType SndFile))
-> (AEntityId -> IO (Either StoreError SndFile))
-> AEntityId
-> IO (Either AgentErrorType SndFile)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Connection -> AEntityId -> IO (Either StoreError SndFile)
getSndFileByEntityId Connection
db) [AEntityId]
sndFileEntityIds)
let ([SndFile]
toDelete, [SndFile]
toMarkDeleted) = (SndFile -> Bool) -> [SndFile] -> ([SndFile], [SndFile])
forall a. (a -> Bool) -> [a] -> ([a], [a])
partition SndFile -> Bool
fileComplete [SndFile]
sndFiles
String
workPath <- ReaderT Env IO String
getXFTPWorkPath
IO () -> ReaderT Env IO ()
forall a. IO a -> ReaderT Env IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ReaderT Env IO ())
-> ((SndFile -> IO ()) -> IO ())
-> (SndFile -> IO ())
-> ReaderT Env IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [SndFile] -> (SndFile -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [SndFile]
toDelete ((SndFile -> IO ()) -> ReaderT Env IO ())
-> (SndFile -> IO ()) -> ReaderT Env IO ()
forall a b. (a -> b) -> a -> b
$ \SndFile {Maybe String
$sel:prefixPath:SndFile :: SndFile -> Maybe String
prefixPath :: Maybe String
prefixPath} ->
(String -> IO ()) -> Maybe String -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (String -> IO ()
forall (m :: * -> *). MonadIO m => String -> m ()
removePath (String -> IO ()) -> (String -> String) -> String -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (String
workPath String -> String -> String
</>)) Maybe String
prefixPath IO () -> IO () -> IO ()
forall a. IO a -> IO a -> IO a
`catchAll_` () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
(Connection -> Int64 -> IO ()) -> [SndFile] -> ReaderT Env IO ()
forall a.
(Connection -> Int64 -> IO a) -> [SndFile] -> ReaderT Env IO ()
batchFiles_ Connection -> Int64 -> IO ()
deleteSndFile' [SndFile]
toDelete
(Connection -> Int64 -> IO ()) -> [SndFile] -> ReaderT Env IO ()
forall a.
(Connection -> Int64 -> IO a) -> [SndFile] -> ReaderT Env IO ()
batchFiles_ Connection -> Int64 -> IO ()
updateSndFileDeleted [SndFile]
toMarkDeleted
where
fileComplete :: SndFile -> Bool
fileComplete SndFile {SndFileStatus
$sel:status:SndFile :: SndFile -> SndFileStatus
status :: SndFileStatus
status} = SndFileStatus
status SndFileStatus -> SndFileStatus -> Bool
forall a. Eq a => a -> a -> Bool
== SndFileStatus
SFSComplete Bool -> Bool -> Bool
|| SndFileStatus
status SndFileStatus -> SndFileStatus -> Bool
forall a. Eq a => a -> a -> Bool
== SndFileStatus
SFSError
batchFiles_ :: (DB.Connection -> DBSndFileId -> IO a) -> [SndFile] -> AM' ()
batchFiles_ :: forall a.
(Connection -> Int64 -> IO a) -> [SndFile] -> ReaderT Env IO ()
batchFiles_ Connection -> Int64 -> IO a
f [SndFile]
sndFiles = ReaderT Env IO [Either AgentErrorType a] -> ReaderT Env IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (ReaderT Env IO [Either AgentErrorType a] -> ReaderT Env IO ())
-> ReaderT Env IO [Either AgentErrorType a] -> ReaderT Env IO ()
forall a b. (a -> b) -> a -> b
$ AgentClient
-> (Connection -> [IO a])
-> ReaderT Env IO [Either AgentErrorType a]
forall (t :: * -> *) a.
Traversable t =>
AgentClient
-> (Connection -> t (IO a)) -> AM' (t (Either AgentErrorType a))
withStoreBatch' AgentClient
c ((Connection -> [IO a])
-> ReaderT Env IO [Either AgentErrorType a])
-> (Connection -> [IO a])
-> ReaderT Env IO [Either AgentErrorType a]
forall a b. (a -> b) -> a -> b
$ \Connection
db -> (SndFile -> IO a) -> [SndFile] -> [IO a]
forall a b. (a -> b) -> [a] -> [b]
map (\SndFile {Int64
$sel:sndFileId:SndFile :: SndFile -> Int64
sndFileId :: Int64
sndFileId} -> Connection -> Int64 -> IO a
f Connection
db Int64
sndFileId) [SndFile]
sndFiles
deleteSndFileRemote :: AgentClient -> UserId -> SndFileId -> ValidFileDescription 'FSender -> AM' ()
deleteSndFileRemote :: AgentClient
-> Int64
-> AEntityId
-> ValidFileDescription 'FSender
-> ReaderT Env IO ()
deleteSndFileRemote AgentClient
c Int64
userId AEntityId
sndFileEntityId ValidFileDescription 'FSender
sfd = AgentClient
-> Int64
-> [(AEntityId, ValidFileDescription 'FSender)]
-> ReaderT Env IO ()
deleteSndFilesRemote AgentClient
c Int64
userId [(AEntityId
sndFileEntityId, ValidFileDescription 'FSender
sfd)]
deleteSndFilesRemote :: AgentClient -> UserId -> [(SndFileId, ValidFileDescription 'FSender)] -> AM' ()
deleteSndFilesRemote :: AgentClient
-> Int64
-> [(AEntityId, ValidFileDescription 'FSender)]
-> ReaderT Env IO ()
deleteSndFilesRemote AgentClient
c Int64
userId [(AEntityId, ValidFileDescription 'FSender)]
sndFileIdsDescrs = do
AgentClient -> [AEntityId] -> ReaderT Env IO ()
deleteSndFilesInternal AgentClient
c (((AEntityId, ValidFileDescription 'FSender) -> AEntityId)
-> [(AEntityId, ValidFileDescription 'FSender)] -> [AEntityId]
forall a b. (a -> b) -> [a] -> [b]
map (AEntityId, ValidFileDescription 'FSender) -> AEntityId
forall a b. (a, b) -> a
fst [(AEntityId, ValidFileDescription 'FSender)]
sndFileIdsDescrs) ReaderT Env IO ()
-> (SomeException -> ReaderT Env IO ()) -> ReaderT Env IO ()
forall (m :: * -> *) a.
MonadUnliftIO m =>
m a -> (SomeException -> m a) -> m a
`E.catchAny` (AgentClient -> AEntityId -> AEvent 'AESndFile -> ReaderT Env IO ()
forall (m :: * -> *) (e :: AEntity).
(MonadIO m, AEntityI e) =>
AgentClient -> AEntityId -> AEvent e -> m ()
notify AgentClient
c AEntityId
"" (AEvent 'AESndFile -> ReaderT Env IO ())
-> (SomeException -> AEvent 'AESndFile)
-> SomeException
-> ReaderT Env IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. AgentErrorType -> AEvent 'AESndFile
SFERR (AgentErrorType -> AEvent 'AESndFile)
-> (SomeException -> AgentErrorType)
-> SomeException
-> AEvent 'AESndFile
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> AgentErrorType
INTERNAL (String -> AgentErrorType)
-> (SomeException -> String) -> SomeException -> AgentErrorType
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SomeException -> String
forall a. Show a => a -> String
show)
let rs :: [(FileChunkReplica, FileDigest)]
rs = ((AEntityId, ValidFileDescription 'FSender)
-> [(FileChunkReplica, FileDigest)])
-> [(AEntityId, ValidFileDescription 'FSender)]
-> [(FileChunkReplica, FileDigest)]
forall (t :: * -> *) a b. Foldable t => (a -> [b]) -> t a -> [b]
concatMap ((FileChunk -> Maybe (FileChunkReplica, FileDigest))
-> [FileChunk] -> [(FileChunkReplica, FileDigest)]
forall a b. (a -> Maybe b) -> [a] -> [b]
mapMaybe FileChunk -> Maybe (FileChunkReplica, FileDigest)
chunkReplica ([FileChunk] -> [(FileChunkReplica, FileDigest)])
-> ((AEntityId, ValidFileDescription 'FSender) -> [FileChunk])
-> (AEntityId, ValidFileDescription 'FSender)
-> [(FileChunkReplica, FileDigest)]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ValidFileDescription 'FSender -> [FileChunk]
forall {p :: FileParty}. ValidFileDescription p -> [FileChunk]
fdChunks (ValidFileDescription 'FSender -> [FileChunk])
-> ((AEntityId, ValidFileDescription 'FSender)
-> ValidFileDescription 'FSender)
-> (AEntityId, ValidFileDescription 'FSender)
-> [FileChunk]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (AEntityId, ValidFileDescription 'FSender)
-> ValidFileDescription 'FSender
forall a b. (a, b) -> b
snd) [(AEntityId, ValidFileDescription 'FSender)]
sndFileIdsDescrs
ReaderT Env IO [Either AgentErrorType ()] -> ReaderT Env IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (ReaderT Env IO [Either AgentErrorType ()] -> ReaderT Env IO ())
-> ReaderT Env IO [Either AgentErrorType ()] -> ReaderT Env IO ()
forall a b. (a -> b) -> a -> b
$ AgentClient
-> (Connection -> [IO ()])
-> ReaderT Env IO [Either AgentErrorType ()]
forall (t :: * -> *) a.
Traversable t =>
AgentClient
-> (Connection -> t (IO a)) -> AM' (t (Either AgentErrorType a))
withStoreBatch' AgentClient
c (\Connection
db -> ((FileChunkReplica, FileDigest) -> IO ())
-> [(FileChunkReplica, FileDigest)] -> [IO ()]
forall a b. (a -> b) -> [a] -> [b]
map ((FileChunkReplica -> FileDigest -> IO ())
-> (FileChunkReplica, FileDigest) -> IO ()
forall a b c. (a -> b -> c) -> (a, b) -> c
uncurry ((FileChunkReplica -> FileDigest -> IO ())
-> (FileChunkReplica, FileDigest) -> IO ())
-> (FileChunkReplica -> FileDigest -> IO ())
-> (FileChunkReplica, FileDigest)
-> IO ()
forall a b. (a -> b) -> a -> b
$ Connection -> Int64 -> FileChunkReplica -> FileDigest -> IO ()
createDeletedSndChunkReplica Connection
db Int64
userId) [(FileChunkReplica, FileDigest)]
rs)
let servers :: Set XFTPServer
servers = [XFTPServer] -> Set XFTPServer
forall a. Ord a => [a] -> Set a
S.fromList ([XFTPServer] -> Set XFTPServer) -> [XFTPServer] -> Set XFTPServer
forall a b. (a -> b) -> a -> b
$ ((FileChunkReplica, FileDigest) -> XFTPServer)
-> [(FileChunkReplica, FileDigest)] -> [XFTPServer]
forall a b. (a -> b) -> [a] -> [b]
map (\(FileChunkReplica {XFTPServer
$sel:server:FileChunkReplica :: FileChunkReplica -> XFTPServer
server :: XFTPServer
server}, FileDigest
_) -> XFTPServer
server) [(FileChunkReplica, FileDigest)]
rs
(XFTPServer -> AM' Worker) -> Set XFTPServer -> ReaderT Env IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (Bool -> AgentClient -> XFTPServer -> AM' Worker
getXFTPDelWorker Bool
True AgentClient
c) Set XFTPServer
servers
where
fdChunks :: ValidFileDescription p -> [FileChunk]
fdChunks (ValidFileDescription FileDescription {[FileChunk]
$sel:chunks:FileDescription :: forall (p :: FileParty). FileDescription p -> [FileChunk]
chunks :: [FileChunk]
chunks}) = [FileChunk]
chunks
chunkReplica :: FileChunk -> Maybe (FileChunkReplica, FileDigest)
chunkReplica :: FileChunk -> Maybe (FileChunkReplica, FileDigest)
chunkReplica = \case
FileChunk {FileDigest
$sel:digest:FileChunk :: FileChunk -> FileDigest
digest :: FileDigest
digest, $sel:replicas:FileChunk :: FileChunk -> [FileChunkReplica]
replicas = FileChunkReplica
replica : [FileChunkReplica]
_} -> (FileChunkReplica, FileDigest)
-> Maybe (FileChunkReplica, FileDigest)
forall a. a -> Maybe a
Just (FileChunkReplica
replica, FileDigest
digest)
FileChunk
_ -> Maybe (FileChunkReplica, FileDigest)
forall a. Maybe a
Nothing
resumeXFTPDelWork :: AgentClient -> XFTPServer -> AM' ()
resumeXFTPDelWork :: AgentClient -> XFTPServer -> ReaderT Env IO ()
resumeXFTPDelWork = AM' Worker -> ReaderT Env IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (AM' Worker -> ReaderT Env IO ())
-> (AgentClient -> XFTPServer -> AM' Worker)
-> AgentClient
-> XFTPServer
-> ReaderT Env IO ()
forall c d a b. (c -> d) -> (a -> b -> c) -> a -> b -> d
.: Bool -> AgentClient -> XFTPServer -> AM' Worker
getXFTPDelWorker Bool
False
getXFTPDelWorker :: Bool -> AgentClient -> XFTPServer -> AM' Worker
getXFTPDelWorker :: Bool -> AgentClient -> XFTPServer -> AM' Worker
getXFTPDelWorker Bool
hasWork AgentClient
c XFTPServer
server = do
TVar (Map XFTPServer Worker)
ws <- (Env -> TVar (Map XFTPServer Worker))
-> ReaderT Env IO (TVar (Map XFTPServer Worker))
forall r (m :: * -> *) a. MonadReader r m => (r -> a) -> m a
asks ((Env -> TVar (Map XFTPServer Worker))
-> ReaderT Env IO (TVar (Map XFTPServer Worker)))
-> (Env -> TVar (Map XFTPServer Worker))
-> ReaderT Env IO (TVar (Map XFTPServer Worker))
forall a b. (a -> b) -> a -> b
$ XFTPAgent -> TVar (Map XFTPServer Worker)
xftpDelWorkers (XFTPAgent -> TVar (Map XFTPServer Worker))
-> (Env -> XFTPAgent) -> Env -> TVar (Map XFTPServer Worker)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Env -> XFTPAgent
xftpAgent
String
-> Bool
-> AgentClient
-> XFTPServer
-> TVar (Map XFTPServer Worker)
-> (Worker -> AM ())
-> AM' Worker
forall k e (m :: * -> *).
(Ord k, Show k, AnyError e, MonadUnliftIO m) =>
String
-> Bool
-> AgentClient
-> k
-> TMap k Worker
-> (Worker -> ExceptT e m ())
-> m Worker
getAgentWorker String
"xftp_del" Bool
hasWork AgentClient
c XFTPServer
server TVar (Map XFTPServer Worker)
ws ((Worker -> AM ()) -> AM' Worker)
-> (Worker -> AM ()) -> AM' Worker
forall a b. (a -> b) -> a -> b
$ AgentClient -> XFTPServer -> Worker -> AM ()
runXFTPDelWorker AgentClient
c XFTPServer
server
runXFTPDelWorker :: AgentClient -> XFTPServer -> Worker -> AM ()
runXFTPDelWorker :: AgentClient -> XFTPServer -> Worker -> AM ()
runXFTPDelWorker AgentClient
c XFTPServer
srv Worker {TMVar ()
$sel:doWork:Worker :: Worker -> TMVar ()
doWork :: TMVar ()
doWork} = do
AgentConfig
cfg <- (Env -> AgentConfig)
-> ExceptT AgentErrorType (ReaderT Env IO) AgentConfig
forall r (m :: * -> *) a. MonadReader r m => (r -> a) -> m a
asks Env -> AgentConfig
config
AM () -> AM ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (AM () -> AM ()) -> AM () -> AM ()
forall a b. (a -> b) -> a -> b
$ do
ReaderT Env IO () -> AM ()
forall (m :: * -> *) a.
Monad m =>
m a -> ExceptT AgentErrorType m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (ReaderT Env IO () -> AM ()) -> ReaderT Env IO () -> AM ()
forall a b. (a -> b) -> a -> b
$ TMVar () -> ReaderT Env IO ()
forall (m :: * -> *). MonadIO m => TMVar () -> m ()
waitForWork TMVar ()
doWork
IO () -> AM ()
forall a. IO a -> ExceptT AgentErrorType (ReaderT Env IO) a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> AM ()) -> IO () -> AM ()
forall a b. (a -> b) -> a -> b
$ AgentClient -> IO ()
assertAgentForeground AgentClient
c
AgentConfig -> AM ()
runXFTPOperation AgentConfig
cfg
where
runXFTPOperation :: AgentConfig -> AM ()
runXFTPOperation :: AgentConfig -> AM ()
runXFTPOperation AgentConfig {NominalDiffTime
$sel:rcvFilesTTL:AgentConfig :: AgentConfig -> NominalDiffTime
rcvFilesTTL :: NominalDiffTime
rcvFilesTTL, $sel:reconnectInterval:AgentConfig :: AgentConfig -> RetryInterval
reconnectInterval = RetryInterval
ri, Int
$sel:xftpConsecutiveRetries:AgentConfig :: AgentConfig -> Int
xftpConsecutiveRetries :: Int
xftpConsecutiveRetries} = do
AgentClient
-> TMVar ()
-> (Connection
-> IO (Either StoreError (Maybe DeletedSndChunkReplica)))
-> (DeletedSndChunkReplica -> AM ())
-> AM ()
forall a.
AgentClient
-> TMVar ()
-> (Connection -> IO (Either StoreError (Maybe a)))
-> (a -> AM ())
-> AM ()
withWork AgentClient
c TMVar ()
doWork (\Connection
db -> Connection
-> XFTPServer
-> NominalDiffTime
-> IO (Either StoreError (Maybe DeletedSndChunkReplica))
getNextDeletedSndChunkReplica Connection
db XFTPServer
srv NominalDiffTime
rcvFilesTTL) DeletedSndChunkReplica -> AM ()
processDeletedReplica
where
processDeletedReplica :: DeletedSndChunkReplica -> AM ()
processDeletedReplica replica :: DeletedSndChunkReplica
replica@DeletedSndChunkReplica {Int64
deletedSndChunkReplicaId :: Int64
$sel:deletedSndChunkReplicaId:DeletedSndChunkReplica :: DeletedSndChunkReplica -> Int64
deletedSndChunkReplicaId, Int64
userId :: Int64
$sel:userId:DeletedSndChunkReplica :: DeletedSndChunkReplica -> Int64
userId, XFTPServer
server :: XFTPServer
$sel:server:DeletedSndChunkReplica :: DeletedSndChunkReplica -> XFTPServer
server, FileDigest
chunkDigest :: FileDigest
$sel:chunkDigest:DeletedSndChunkReplica :: DeletedSndChunkReplica -> FileDigest
chunkDigest, Maybe Int64
delay :: Maybe Int64
$sel:delay:DeletedSndChunkReplica :: DeletedSndChunkReplica -> Maybe Int64
delay} = do
let ri' :: RetryInterval
ri' = RetryInterval
-> (Int64 -> RetryInterval) -> Maybe Int64 -> RetryInterval
forall b a. b -> (a -> b) -> Maybe a -> b
maybe RetryInterval
ri (\Int64
d -> RetryInterval
ri {initialInterval = d, increaseAfter = 0}) Maybe Int64
delay
Int -> RetryInterval -> (Int64 -> AM () -> AM ()) -> AM ()
forall (m :: * -> *).
MonadIO m =>
Int -> RetryInterval -> (Int64 -> m () -> m ()) -> m ()
withRetryIntervalLimit Int
xftpConsecutiveRetries RetryInterval
ri' ((Int64 -> AM () -> AM ()) -> AM ())
-> (Int64 -> AM () -> AM ()) -> AM ()
forall a b. (a -> b) -> a -> b
$ \Int64
delay' AM ()
loop -> do
IO () -> AM ()
forall a. IO a -> ExceptT AgentErrorType (ReaderT Env IO) a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> AM ()) -> IO () -> AM ()
forall a b. (a -> b) -> a -> b
$ AgentClient -> IO ()
waitWhileSuspended AgentClient
c
IO () -> AM ()
forall a. IO a -> ExceptT AgentErrorType (ReaderT Env IO) a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> AM ()) -> IO () -> AM ()
forall a b. (a -> b) -> a -> b
$ AgentClient -> IO ()
waitForUserNetwork AgentClient
c
STM () -> AM ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> AM ()) -> STM () -> AM ()
forall a b. (a -> b) -> a -> b
$ AgentClient
-> Int64
-> XFTPServer
-> (AgentXFTPServerStats -> TVar Int)
-> STM ()
incXFTPServerStat AgentClient
c Int64
userId XFTPServer
srv AgentXFTPServerStats -> TVar Int
deleteAttempts
AM ()
deleteChunkReplica
AM () -> (AgentErrorType -> AM ()) -> AM ()
forall e (m :: * -> *) a.
(AnyError e, MonadUnliftIO m) =>
ExceptT e m a -> (e -> ExceptT e m a) -> ExceptT e m a
`catchAllErrors` \AgentErrorType
e -> Text -> AM () -> AM () -> AgentErrorType -> AM ()
forall a. Text -> AM a -> AM a -> AgentErrorType -> AM a
retryOnError Text
"XFTP del worker" (AM () -> AgentErrorType -> Int64 -> AM ()
forall {b}.
ExceptT AgentErrorType (ReaderT Env IO) b
-> AgentErrorType
-> Int64
-> ExceptT AgentErrorType (ReaderT Env IO) b
retryLoop AM ()
loop AgentErrorType
e Int64
delay') (AgentErrorType -> AM ()
retryDone AgentErrorType
e) AgentErrorType
e
where
retryLoop :: ExceptT AgentErrorType (ReaderT Env IO) b
-> AgentErrorType
-> Int64
-> ExceptT AgentErrorType (ReaderT Env IO) b
retryLoop ExceptT AgentErrorType (ReaderT Env IO) b
loop AgentErrorType
e Int64
replicaDelay = do
(AM () -> (AgentErrorType -> AM ()) -> AM ())
-> (AgentErrorType -> AM ()) -> AM () -> AM ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip AM () -> (AgentErrorType -> AM ()) -> AM ()
forall e (m :: * -> *) a.
(AnyError e, MonadUnliftIO m) =>
ExceptT e m a -> (e -> ExceptT e m a) -> ExceptT e m a
catchAllErrors (\AgentErrorType
_ -> () -> AM ()
forall a. a -> ExceptT AgentErrorType (ReaderT Env IO) a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()) (AM () -> AM ()) -> AM () -> AM ()
forall a b. (a -> b) -> a -> b
$ do
Bool -> AM () -> AM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (AgentErrorType -> Bool
serverHostError AgentErrorType
e) (AM () -> AM ()) -> AM () -> AM ()
forall a b. (a -> b) -> a -> b
$ AgentClient -> AEntityId -> AEvent 'AESndFile -> AM ()
forall (m :: * -> *) (e :: AEntity).
(MonadIO m, AEntityI e) =>
AgentClient -> AEntityId -> AEvent e -> m ()
notify AgentClient
c AEntityId
"" (AEvent 'AESndFile -> AM ()) -> AEvent 'AESndFile -> AM ()
forall a b. (a -> b) -> a -> b
$ AgentErrorType -> AEvent 'AESndFile
SFWARN AgentErrorType
e
IO () -> AM ()
forall a. IO a -> ExceptT AgentErrorType (ReaderT Env IO) a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> AM ()) -> IO () -> AM ()
forall a b. (a -> b) -> a -> b
$ AgentClient -> Int64 -> XFTPServer -> FileDigest -> IO ()
closeXFTPServerClient AgentClient
c Int64
userId XFTPServer
server FileDigest
chunkDigest
AgentClient -> (Connection -> IO ()) -> AM ()
forall a. AgentClient -> (Connection -> IO a) -> AM a
withStore' AgentClient
c ((Connection -> IO ()) -> AM ()) -> (Connection -> IO ()) -> AM ()
forall a b. (a -> b) -> a -> b
$ \Connection
db -> Connection -> Int64 -> Int64 -> IO ()
updateDeletedSndChunkReplicaDelay Connection
db Int64
deletedSndChunkReplicaId Int64
replicaDelay
IO () -> AM ()
forall a. IO a -> ExceptT AgentErrorType (ReaderT Env IO) a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> AM ()) -> IO () -> AM ()
forall a b. (a -> b) -> a -> b
$ AgentClient -> IO ()
assertAgentForeground AgentClient
c
ExceptT AgentErrorType (ReaderT Env IO) b
loop
retryDone :: AgentErrorType -> AM ()
retryDone AgentErrorType
e = do
STM () -> AM ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> AM ()) -> STM () -> AM ()
forall a b. (a -> b) -> a -> b
$ AgentClient
-> Int64
-> XFTPServer
-> (AgentXFTPServerStats -> TVar Int)
-> STM ()
incXFTPServerStat AgentClient
c Int64
userId XFTPServer
srv AgentXFTPServerStats -> TVar Int
deleteErrs
AgentClient -> Int64 -> AgentErrorType -> AM ()
delWorkerInternalError AgentClient
c Int64
deletedSndChunkReplicaId AgentErrorType
e
deleteChunkReplica :: AM ()
deleteChunkReplica = do
AgentClient -> Int64 -> DeletedSndChunkReplica -> AM ()
agentXFTPDeleteChunk AgentClient
c Int64
userId DeletedSndChunkReplica
replica
AgentClient -> (Connection -> IO ()) -> AM ()
forall a. AgentClient -> (Connection -> IO a) -> AM a
withStore' AgentClient
c ((Connection -> IO ()) -> AM ()) -> (Connection -> IO ()) -> AM ()
forall a b. (a -> b) -> a -> b
$ \Connection
db -> Connection -> Int64 -> IO ()
deleteDeletedSndChunkReplica Connection
db Int64
deletedSndChunkReplicaId
STM () -> AM ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> AM ()) -> STM () -> AM ()
forall a b. (a -> b) -> a -> b
$ AgentClient
-> Int64
-> XFTPServer
-> (AgentXFTPServerStats -> TVar Int)
-> STM ()
incXFTPServerStat AgentClient
c Int64
userId XFTPServer
srv AgentXFTPServerStats -> TVar Int
deletions
delWorkerInternalError :: AgentClient -> Int64 -> AgentErrorType -> AM ()
delWorkerInternalError :: AgentClient -> Int64 -> AgentErrorType -> AM ()
delWorkerInternalError AgentClient
c Int64
deletedSndChunkReplicaId AgentErrorType
e = do
AgentClient -> (Connection -> IO ()) -> AM ()
forall a. AgentClient -> (Connection -> IO a) -> AM a
withStore' AgentClient
c ((Connection -> IO ()) -> AM ()) -> (Connection -> IO ()) -> AM ()
forall a b. (a -> b) -> a -> b
$ \Connection
db -> Connection -> Int64 -> IO ()
deleteDeletedSndChunkReplica Connection
db Int64
deletedSndChunkReplicaId
AgentClient -> AEntityId -> AEvent 'AESndFile -> AM ()
forall (m :: * -> *) (e :: AEntity).
(MonadIO m, AEntityI e) =>
AgentClient -> AEntityId -> AEvent e -> m ()
notify AgentClient
c AEntityId
"" (AEvent 'AESndFile -> AM ()) -> AEvent 'AESndFile -> AM ()
forall a b. (a -> b) -> a -> b
$ AgentErrorType -> AEvent 'AESndFile
SFERR AgentErrorType
e
assertAgentForeground :: AgentClient -> IO ()
assertAgentForeground :: AgentClient -> IO ()
assertAgentForeground AgentClient
c = do
AgentClient -> IO ()
throwWhenInactive AgentClient
c
AgentClient -> IO ()
waitUntilForeground AgentClient
c