{-# LANGUAGE CPP #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeOperators #-}
module Simplex.Chat.Store.Delivery
( createMsgDeliveryTask,
deleteGroupDeliveryTasks,
deleteGroupDeliveryJobs,
getPendingDeliveryTaskScopes,
getNextDeliveryTask,
getNextDeliveryTasks,
updateDeliveryTaskStatus,
setDeliveryTaskErrStatus,
deleteDoneDeliveryTasks,
createMsgDeliveryJob,
getPendingDeliveryJobScopes,
getNextDeliveryJob,
updateDeliveryJobStatus,
setDeliveryJobErrStatus,
getGroupMembersByCursor,
updateDeliveryJobCursor,
deleteDoneDeliveryJobs,
)
where
import Data.ByteString.Char8 (ByteString)
import Data.Int (Int64)
import Data.Text (Text)
import Data.Time.Clock (UTCTime, getCurrentTime)
import Simplex.Chat.Delivery
import Simplex.Chat.Protocol hiding (Binary)
import Simplex.Chat.Store.Shared
import Simplex.Chat.Types
import Simplex.Messaging.Agent.Store.AgentStore (getWorkItem, getWorkItems, maybeFirstRow)
import Simplex.Messaging.Agent.Store.DB (Binary (..), BoolInt (..))
import qualified Simplex.Messaging.Agent.Store.DB as DB
import Simplex.Messaging.Util (firstRow')
#if defined(dbPostgres)
import Database.PostgreSQL.Simple (In (..), Only (..), (:.) (..))
import Database.PostgreSQL.Simple.SqlQQ (sql)
#else
import Control.Monad.Except
import Data.Either (rights)
import Database.SQLite.Simple (Only (..), (:.) (..))
import Database.SQLite.Simple.QQ (sql)
import Simplex.Chat.Store.Groups (getGroupMemberById)
#endif
type DeliveryJobScopeRow = (DeliveryWorkerScope, Maybe DeliveryJobSpecTag, Maybe BoolInt, Maybe GroupMemberId)
jobScopeRow_ :: DeliveryJobScope -> DeliveryJobScopeRow
jobScopeRow_ :: DeliveryJobScope -> DeliveryJobScopeRow
jobScopeRow_ = \case
DJSGroup {DeliveryJobSpec
jobSpec :: DeliveryJobSpec
jobSpec :: DeliveryJobScope -> DeliveryJobSpec
jobSpec} -> case DeliveryJobSpec
jobSpec of
DJDeliveryJob {Bool
includePending :: Bool
includePending :: DeliveryJobSpec -> Bool
includePending} -> (DeliveryWorkerScope
DWSGroup, DeliveryJobSpecTag -> Maybe DeliveryJobSpecTag
forall a. a -> Maybe a
Just DeliveryJobSpecTag
DJSTDeliveryJob, BoolInt -> Maybe BoolInt
forall a. a -> Maybe a
Just (Bool -> BoolInt
BI Bool
includePending), Maybe GroupMemberId
forall a. Maybe a
Nothing)
DeliveryJobSpec
DJRelayRemoved -> (DeliveryWorkerScope
DWSGroup, DeliveryJobSpecTag -> Maybe DeliveryJobSpecTag
forall a. a -> Maybe a
Just DeliveryJobSpecTag
DJSTRelayRemoved, Maybe BoolInt
forall a. Maybe a
Nothing, Maybe GroupMemberId
forall a. Maybe a
Nothing)
DJSMemberSupport {GroupMemberId
supportGMId :: GroupMemberId
supportGMId :: DeliveryJobScope -> GroupMemberId
supportGMId} -> (DeliveryWorkerScope
DWSMemberSupport, Maybe DeliveryJobSpecTag
forall a. Maybe a
Nothing, Maybe BoolInt
forall a. Maybe a
Nothing, GroupMemberId -> Maybe GroupMemberId
forall a. a -> Maybe a
Just GroupMemberId
supportGMId)
toJobScope_ :: DeliveryJobScopeRow -> Maybe DeliveryJobScope
toJobScope_ :: DeliveryJobScopeRow -> Maybe DeliveryJobScope
toJobScope_ = \case
(DeliveryWorkerScope
DWSGroup, Just DeliveryJobSpecTag
DJSTDeliveryJob, Just (BI Bool
includePending), Maybe GroupMemberId
Nothing) -> DeliveryJobScope -> Maybe DeliveryJobScope
forall a. a -> Maybe a
Just (DeliveryJobScope -> Maybe DeliveryJobScope)
-> DeliveryJobScope -> Maybe DeliveryJobScope
forall a b. (a -> b) -> a -> b
$ DJSGroup {jobSpec :: DeliveryJobSpec
jobSpec = DJDeliveryJob {Bool
includePending :: Bool
includePending :: Bool
includePending}}
(DeliveryWorkerScope
DWSGroup, Just DeliveryJobSpecTag
DJSTRelayRemoved, Maybe BoolInt
Nothing, Maybe GroupMemberId
Nothing) -> DeliveryJobScope -> Maybe DeliveryJobScope
forall a. a -> Maybe a
Just (DeliveryJobScope -> Maybe DeliveryJobScope)
-> DeliveryJobScope -> Maybe DeliveryJobScope
forall a b. (a -> b) -> a -> b
$ DJSGroup {jobSpec :: DeliveryJobSpec
jobSpec = DeliveryJobSpec
DJRelayRemoved}
(DeliveryWorkerScope
DWSMemberSupport, Maybe DeliveryJobSpecTag
Nothing, Maybe BoolInt
Nothing, Just GroupMemberId
supportGMId) -> DeliveryJobScope -> Maybe DeliveryJobScope
forall a. a -> Maybe a
Just (DeliveryJobScope -> Maybe DeliveryJobScope)
-> DeliveryJobScope -> Maybe DeliveryJobScope
forall a b. (a -> b) -> a -> b
$ DJSMemberSupport {GroupMemberId
supportGMId :: GroupMemberId
supportGMId :: GroupMemberId
supportGMId}
DeliveryJobScopeRow
_ -> Maybe DeliveryJobScope
forall a. Maybe a
Nothing
createMsgDeliveryTask :: DB.Connection -> GroupInfo -> GroupMember -> NewMessageDeliveryTask -> IO ()
createMsgDeliveryTask :: Connection
-> GroupInfo -> GroupMember -> NewMessageDeliveryTask -> IO ()
createMsgDeliveryTask Connection
db GroupInfo
gInfo GroupMember
sender NewMessageDeliveryTask
newTask = do
UTCTime
currentTs <- IO UTCTime
getCurrentTime
Connection
-> Query
-> (Only GroupMemberId
:. (DeliveryJobScopeRow
:. (GroupMemberId, GroupMemberId, BoolInt, DeliveryTaskStatus,
UTCTime, UTCTime)))
-> IO ()
forall q. ToRow q => Connection -> Query -> q -> IO ()
DB.execute
Connection
db
[sql|
INSERT INTO delivery_tasks (
group_id,
worker_scope, job_scope_spec_tag, job_scope_include_pending, job_scope_support_gm_id,
sender_group_member_id, message_id, message_from_channel, task_status,
created_at, updated_at
) VALUES (?,?,?,?,?,?,?,?,?,?,?)
|]
((GroupMemberId -> Only GroupMemberId
forall a. a -> Only a
Only GroupMemberId
groupId) Only GroupMemberId
-> (DeliveryJobScopeRow
:. (GroupMemberId, GroupMemberId, BoolInt, DeliveryTaskStatus,
UTCTime, UTCTime))
-> Only GroupMemberId
:. (DeliveryJobScopeRow
:. (GroupMemberId, GroupMemberId, BoolInt, DeliveryTaskStatus,
UTCTime, UTCTime))
forall h t. h -> t -> h :. t
:. DeliveryJobScope -> DeliveryJobScopeRow
jobScopeRow_ DeliveryJobScope
jobScope DeliveryJobScopeRow
-> (GroupMemberId, GroupMemberId, BoolInt, DeliveryTaskStatus,
UTCTime, UTCTime)
-> DeliveryJobScopeRow
:. (GroupMemberId, GroupMemberId, BoolInt, DeliveryTaskStatus,
UTCTime, UTCTime)
forall h t. h -> t -> h :. t
:. (GroupMember -> GroupMemberId
groupMemberId' GroupMember
sender, GroupMemberId
messageId, Bool -> BoolInt
BI Bool
messageFromChannel, DeliveryTaskStatus
DTSNew, UTCTime
currentTs, UTCTime
currentTs))
where
GroupInfo {GroupMemberId
groupId :: GroupMemberId
groupId :: GroupInfo -> GroupMemberId
groupId} = GroupInfo
gInfo
NewMessageDeliveryTask {GroupMemberId
messageId :: GroupMemberId
messageId :: NewMessageDeliveryTask -> GroupMemberId
messageId, DeliveryJobScope
jobScope :: DeliveryJobScope
jobScope :: NewMessageDeliveryTask -> DeliveryJobScope
jobScope, Bool
messageFromChannel :: Bool
messageFromChannel :: NewMessageDeliveryTask -> Bool
messageFromChannel} = NewMessageDeliveryTask
newTask
deleteGroupDeliveryTasks :: DB.Connection -> GroupInfo -> IO ()
deleteGroupDeliveryTasks :: Connection -> GroupInfo -> IO ()
deleteGroupDeliveryTasks Connection
db GroupInfo {GroupMemberId
groupId :: GroupInfo -> GroupMemberId
groupId :: GroupMemberId
groupId} =
Connection -> Query -> Only GroupMemberId -> IO ()
forall q. ToRow q => Connection -> Query -> q -> IO ()
DB.execute Connection
db Query
"DELETE FROM delivery_tasks WHERE group_id = ?" (GroupMemberId -> Only GroupMemberId
forall a. a -> Only a
Only GroupMemberId
groupId)
deleteGroupDeliveryJobs :: DB.Connection -> GroupInfo -> IO ()
deleteGroupDeliveryJobs :: Connection -> GroupInfo -> IO ()
deleteGroupDeliveryJobs Connection
db GroupInfo {GroupMemberId
groupId :: GroupInfo -> GroupMemberId
groupId :: GroupMemberId
groupId} =
Connection -> Query -> Only GroupMemberId -> IO ()
forall q. ToRow q => Connection -> Query -> q -> IO ()
DB.execute Connection
db Query
"DELETE FROM delivery_jobs WHERE group_id = ?" (GroupMemberId -> Only GroupMemberId
forall a. a -> Only a
Only GroupMemberId
groupId)
getPendingDeliveryTaskScopes :: DB.Connection -> IO [DeliveryWorkerKey]
getPendingDeliveryTaskScopes :: Connection -> IO [DeliveryWorkerKey]
getPendingDeliveryTaskScopes Connection
db =
Connection
-> Query -> Only DeliveryTaskStatus -> IO [DeliveryWorkerKey]
forall q r.
(ToRow q, FromRow r) =>
Connection -> Query -> q -> IO [r]
DB.query
Connection
db
[sql|
SELECT DISTINCT group_id, worker_scope
FROM delivery_tasks
WHERE failed = 0 AND task_status = ?
|]
(DeliveryTaskStatus -> Only DeliveryTaskStatus
forall a. a -> Only a
Only DeliveryTaskStatus
DTSNew)
getNextDeliveryTask :: DB.Connection -> DeliveryWorkerKey -> IO (Either StoreError (Maybe MessageDeliveryTask))
getNextDeliveryTask :: Connection
-> DeliveryWorkerKey
-> IO (Either StoreError (Maybe MessageDeliveryTask))
getNextDeliveryTask Connection
db DeliveryWorkerKey
deliveryKey = do
String
-> IO (Maybe GroupMemberId)
-> (GroupMemberId -> IO (Either StoreError MessageDeliveryTask))
-> (GroupMemberId -> IO ())
-> IO (Either StoreError (Maybe MessageDeliveryTask))
forall i e a.
(Show i, AnyStoreError e) =>
String
-> IO (Maybe i)
-> (i -> IO (Either e a))
-> (i -> IO ())
-> IO (Either e (Maybe a))
getWorkItem String
"delivery task" IO (Maybe GroupMemberId)
getTaskId (Connection
-> GroupMemberId -> IO (Either StoreError MessageDeliveryTask)
getMsgDeliveryTask_ Connection
db) (Connection -> GroupMemberId -> IO ()
markDeliveryTaskFailed_ Connection
db)
where
(GroupMemberId
groupId, DeliveryWorkerScope
workerScope) = DeliveryWorkerKey
deliveryKey
getTaskId :: IO (Maybe Int64)
getTaskId :: IO (Maybe GroupMemberId)
getTaskId =
(Only GroupMemberId -> GroupMemberId)
-> IO [Only GroupMemberId] -> IO (Maybe GroupMemberId)
forall (f :: * -> *) a b.
Functor f =>
(a -> b) -> f [a] -> f (Maybe b)
maybeFirstRow Only GroupMemberId -> GroupMemberId
forall a. Only a -> a
fromOnly (IO [Only GroupMemberId] -> IO (Maybe GroupMemberId))
-> IO [Only GroupMemberId] -> IO (Maybe GroupMemberId)
forall a b. (a -> b) -> a -> b
$
Connection
-> Query
-> (GroupMemberId, DeliveryWorkerScope, DeliveryTaskStatus)
-> IO [Only GroupMemberId]
forall q r.
(ToRow q, FromRow r) =>
Connection -> Query -> q -> IO [r]
DB.query
Connection
db
[sql|
SELECT delivery_task_id
FROM delivery_tasks
WHERE group_id = ? AND worker_scope = ?
AND failed = 0 AND task_status = ?
ORDER BY delivery_task_id ASC
LIMIT 1
|]
(GroupMemberId
groupId, DeliveryWorkerScope
workerScope, DeliveryTaskStatus
DTSNew)
type MessageDeliveryTaskRow = (Only Int64) :. DeliveryJobScopeRow :. (GroupMemberId, MemberId, ContactName, UTCTime, ChatMessage 'Json, BoolInt)
getMsgDeliveryTask_ :: DB.Connection -> Int64 -> IO (Either StoreError MessageDeliveryTask)
getMsgDeliveryTask_ :: Connection
-> GroupMemberId -> IO (Either StoreError MessageDeliveryTask)
getMsgDeliveryTask_ Connection
db GroupMemberId
taskId =
(MessageDeliveryTaskRow -> Either StoreError MessageDeliveryTask)
-> StoreError
-> IO [MessageDeliveryTaskRow]
-> IO (Either StoreError MessageDeliveryTask)
forall a e b. (a -> Either e b) -> e -> IO [a] -> IO (Either e b)
firstRow' MessageDeliveryTaskRow -> Either StoreError MessageDeliveryTask
toTask (GroupMemberId -> StoreError
SEDeliveryTaskNotFound GroupMemberId
taskId) (IO [MessageDeliveryTaskRow]
-> IO (Either StoreError MessageDeliveryTask))
-> IO [MessageDeliveryTaskRow]
-> IO (Either StoreError MessageDeliveryTask)
forall a b. (a -> b) -> a -> b
$
Connection
-> Query -> Only GroupMemberId -> IO [MessageDeliveryTaskRow]
forall q r.
(ToRow q, FromRow r) =>
Connection -> Query -> q -> IO [r]
DB.query
Connection
db
[sql|
SELECT
t.delivery_task_id,
t.worker_scope, t.job_scope_spec_tag, t.job_scope_include_pending, t.job_scope_support_gm_id,
m.group_member_id, m.member_id, p.display_name, msg.broker_ts, msg.msg_body, t.message_from_channel
FROM delivery_tasks t
JOIN messages msg ON msg.message_id = t.message_id
JOIN group_members m ON m.group_member_id = t.sender_group_member_id
JOIN contact_profiles p ON p.contact_profile_id = COALESCE(m.member_profile_id, m.contact_profile_id)
WHERE t.delivery_task_id = ?
|]
(GroupMemberId -> Only GroupMemberId
forall a. a -> Only a
Only GroupMemberId
taskId)
where
toTask :: MessageDeliveryTaskRow -> Either StoreError MessageDeliveryTask
toTask :: MessageDeliveryTaskRow -> Either StoreError MessageDeliveryTask
toTask ((Only GroupMemberId
taskId') :. DeliveryJobScopeRow
jobScopeRow :. (GroupMemberId
senderGMId, MemberId
senderMemberId, Text
senderMemberName, UTCTime
brokerTs, ChatMessage 'Json
chatMessage, BI Bool
messageFromChannel)) =
case DeliveryJobScopeRow -> Maybe DeliveryJobScope
toJobScope_ DeliveryJobScopeRow
jobScopeRow of
Just DeliveryJobScope
jobScope -> MessageDeliveryTask -> Either StoreError MessageDeliveryTask
forall a b. b -> Either a b
Right (MessageDeliveryTask -> Either StoreError MessageDeliveryTask)
-> MessageDeliveryTask -> Either StoreError MessageDeliveryTask
forall a b. (a -> b) -> a -> b
$ MessageDeliveryTask {taskId :: GroupMemberId
taskId = GroupMemberId
taskId', DeliveryJobScope
jobScope :: DeliveryJobScope
jobScope :: DeliveryJobScope
jobScope, GroupMemberId
senderGMId :: GroupMemberId
senderGMId :: GroupMemberId
senderGMId, MemberId
senderMemberId :: MemberId
senderMemberId :: MemberId
senderMemberId, Text
senderMemberName :: Text
senderMemberName :: Text
senderMemberName, UTCTime
brokerTs :: UTCTime
brokerTs :: UTCTime
brokerTs, ChatMessage 'Json
chatMessage :: ChatMessage 'Json
chatMessage :: ChatMessage 'Json
chatMessage, Bool
messageFromChannel :: Bool
messageFromChannel :: Bool
messageFromChannel}
Maybe DeliveryJobScope
Nothing -> StoreError -> Either StoreError MessageDeliveryTask
forall a b. a -> Either a b
Left (StoreError -> Either StoreError MessageDeliveryTask)
-> StoreError -> Either StoreError MessageDeliveryTask
forall a b. (a -> b) -> a -> b
$ GroupMemberId -> StoreError
SEInvalidDeliveryTask GroupMemberId
taskId'
markDeliveryTaskFailed_ :: DB.Connection -> Int64 -> IO ()
markDeliveryTaskFailed_ :: Connection -> GroupMemberId -> IO ()
markDeliveryTaskFailed_ Connection
db GroupMemberId
taskId =
Connection -> Query -> Only GroupMemberId -> IO ()
forall q. ToRow q => Connection -> Query -> q -> IO ()
DB.execute Connection
db Query
"UPDATE delivery_tasks SET failed = 1 where delivery_task_id = ?" (GroupMemberId -> Only GroupMemberId
forall a. a -> Only a
Only GroupMemberId
taskId)
getNextDeliveryTasks :: DB.Connection -> GroupInfo -> MessageDeliveryTask -> IO (Either StoreError [Either StoreError MessageDeliveryTask])
getNextDeliveryTasks :: Connection
-> GroupInfo
-> MessageDeliveryTask
-> IO (Either StoreError [Either StoreError MessageDeliveryTask])
getNextDeliveryTasks Connection
db GroupInfo
gInfo MessageDeliveryTask
task =
String
-> IO [GroupMemberId]
-> (GroupMemberId -> IO (Either StoreError MessageDeliveryTask))
-> (GroupMemberId -> IO ())
-> IO (Either StoreError [Either StoreError MessageDeliveryTask])
forall i e a.
(Show i, AnyStoreError e) =>
String
-> IO [i]
-> (i -> IO (Either e a))
-> (i -> IO ())
-> IO (Either e [Either e a])
getWorkItems String
"message delivery task" IO [GroupMemberId]
getTaskIds (Connection
-> GroupMemberId -> IO (Either StoreError MessageDeliveryTask)
getMsgDeliveryTask_ Connection
db) (Connection -> GroupMemberId -> IO ()
markDeliveryTaskFailed_ Connection
db)
where
GroupInfo {GroupMemberId
groupId :: GroupInfo -> GroupMemberId
groupId :: GroupMemberId
groupId, BoolDef
useRelays :: BoolDef
useRelays :: GroupInfo -> BoolDef
useRelays} = GroupInfo
gInfo
MessageDeliveryTask {DeliveryJobScope
jobScope :: MessageDeliveryTask -> DeliveryJobScope
jobScope :: DeliveryJobScope
jobScope, GroupMemberId
senderGMId :: MessageDeliveryTask -> GroupMemberId
senderGMId :: GroupMemberId
senderGMId} = MessageDeliveryTask
task
getTaskIds :: IO [Int64]
getTaskIds :: IO [GroupMemberId]
getTaskIds
| BoolDef -> Bool
isTrue BoolDef
useRelays =
(Only GroupMemberId -> GroupMemberId)
-> [Only GroupMemberId] -> [GroupMemberId]
forall a b. (a -> b) -> [a] -> [b]
map Only GroupMemberId -> GroupMemberId
forall a. Only a -> a
fromOnly
([Only GroupMemberId] -> [GroupMemberId])
-> IO [Only GroupMemberId] -> IO [GroupMemberId]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Connection
-> Query
-> (Only GroupMemberId
:. (DeliveryJobScopeRow :. Only DeliveryTaskStatus))
-> IO [Only GroupMemberId]
forall q r.
(ToRow q, FromRow r) =>
Connection -> Query -> q -> IO [r]
DB.query
Connection
db
[sql|
SELECT delivery_task_id
FROM delivery_tasks
WHERE group_id = ?
AND worker_scope = ?
AND job_scope_spec_tag IS NOT DISTINCT FROM ?
AND job_scope_include_pending IS NOT DISTINCT FROM ?
AND job_scope_support_gm_id IS NOT DISTINCT FROM ?
AND failed = 0
AND task_status = ?
ORDER BY delivery_task_id ASC
|]
((GroupMemberId -> Only GroupMemberId
forall a. a -> Only a
Only GroupMemberId
groupId) Only GroupMemberId
-> (DeliveryJobScopeRow :. Only DeliveryTaskStatus)
-> Only GroupMemberId
:. (DeliveryJobScopeRow :. Only DeliveryTaskStatus)
forall h t. h -> t -> h :. t
:. DeliveryJobScope -> DeliveryJobScopeRow
jobScopeRow_ DeliveryJobScope
jobScope DeliveryJobScopeRow
-> Only DeliveryTaskStatus
-> DeliveryJobScopeRow :. Only DeliveryTaskStatus
forall h t. h -> t -> h :. t
:. (DeliveryTaskStatus -> Only DeliveryTaskStatus
forall a. a -> Only a
Only DeliveryTaskStatus
DTSNew))
| Bool
otherwise =
(Only GroupMemberId -> GroupMemberId)
-> [Only GroupMemberId] -> [GroupMemberId]
forall a b. (a -> b) -> [a] -> [b]
map Only GroupMemberId -> GroupMemberId
forall a. Only a -> a
fromOnly
([Only GroupMemberId] -> [GroupMemberId])
-> IO [Only GroupMemberId] -> IO [GroupMemberId]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Connection
-> Query
-> (Only GroupMemberId
:. (DeliveryJobScopeRow :. (GroupMemberId, DeliveryTaskStatus)))
-> IO [Only GroupMemberId]
forall q r.
(ToRow q, FromRow r) =>
Connection -> Query -> q -> IO [r]
DB.query
Connection
db
[sql|
SELECT delivery_task_id
FROM delivery_tasks
WHERE group_id = ?
AND worker_scope = ?
AND job_scope_spec_tag IS NOT DISTINCT FROM ?
AND job_scope_include_pending IS NOT DISTINCT FROM ?
AND job_scope_support_gm_id IS NOT DISTINCT FROM ?
AND sender_group_member_id = ?
AND failed = 0
AND task_status = ?
ORDER BY delivery_task_id ASC
|]
((GroupMemberId -> Only GroupMemberId
forall a. a -> Only a
Only GroupMemberId
groupId) Only GroupMemberId
-> (DeliveryJobScopeRow :. (GroupMemberId, DeliveryTaskStatus))
-> Only GroupMemberId
:. (DeliveryJobScopeRow :. (GroupMemberId, DeliveryTaskStatus))
forall h t. h -> t -> h :. t
:. DeliveryJobScope -> DeliveryJobScopeRow
jobScopeRow_ DeliveryJobScope
jobScope DeliveryJobScopeRow
-> (GroupMemberId, DeliveryTaskStatus)
-> DeliveryJobScopeRow :. (GroupMemberId, DeliveryTaskStatus)
forall h t. h -> t -> h :. t
:. (GroupMemberId
senderGMId, DeliveryTaskStatus
DTSNew))
updateDeliveryTaskStatus :: DB.Connection -> Int64 -> DeliveryTaskStatus -> IO ()
updateDeliveryTaskStatus :: Connection -> GroupMemberId -> DeliveryTaskStatus -> IO ()
updateDeliveryTaskStatus Connection
db GroupMemberId
taskId DeliveryTaskStatus
status = Connection
-> GroupMemberId -> DeliveryTaskStatus -> Maybe Text -> IO ()
updateDeliveryTaskStatus_ Connection
db GroupMemberId
taskId DeliveryTaskStatus
status Maybe Text
forall a. Maybe a
Nothing
setDeliveryTaskErrStatus :: DB.Connection -> Int64 -> Text -> IO ()
setDeliveryTaskErrStatus :: Connection -> GroupMemberId -> Text -> IO ()
setDeliveryTaskErrStatus Connection
db GroupMemberId
taskId Text
errReason = Connection
-> GroupMemberId -> DeliveryTaskStatus -> Maybe Text -> IO ()
updateDeliveryTaskStatus_ Connection
db GroupMemberId
taskId DeliveryTaskStatus
DTSError (Text -> Maybe Text
forall a. a -> Maybe a
Just Text
errReason)
updateDeliveryTaskStatus_ :: DB.Connection -> Int64 -> DeliveryTaskStatus -> Maybe Text -> IO ()
updateDeliveryTaskStatus_ :: Connection
-> GroupMemberId -> DeliveryTaskStatus -> Maybe Text -> IO ()
updateDeliveryTaskStatus_ Connection
db GroupMemberId
taskId DeliveryTaskStatus
status Maybe Text
errReason_ = do
UTCTime
currentTs <- IO UTCTime
getCurrentTime
Connection
-> Query
-> (DeliveryTaskStatus, Maybe Text, UTCTime, GroupMemberId)
-> IO ()
forall q. ToRow q => Connection -> Query -> q -> IO ()
DB.execute
Connection
db
Query
"UPDATE delivery_tasks SET task_status = ?, task_err_reason = ?, updated_at = ? WHERE delivery_task_id = ?"
(DeliveryTaskStatus
status, Maybe Text
errReason_, UTCTime
currentTs, GroupMemberId
taskId)
deleteDoneDeliveryTasks :: DB.Connection -> UTCTime -> IO ()
deleteDoneDeliveryTasks :: Connection -> UTCTime -> IO ()
deleteDoneDeliveryTasks Connection
db UTCTime
createdAtCutoff = do
Connection
-> Query
-> (UTCTime, DeliveryTaskStatus, DeliveryTaskStatus)
-> IO ()
forall q. ToRow q => Connection -> Query -> q -> IO ()
DB.execute
Connection
db
[sql|
DELETE FROM delivery_tasks
WHERE created_at <= ?
AND (task_status IN (?,?) OR failed = 1)
|]
(UTCTime
createdAtCutoff, DeliveryTaskStatus
DTSProcessed, DeliveryTaskStatus
DTSError)
createMsgDeliveryJob :: DB.Connection -> GroupInfo -> DeliveryJobScope -> Maybe GroupMemberId -> ByteString -> IO ()
createMsgDeliveryJob :: Connection
-> GroupInfo
-> DeliveryJobScope
-> Maybe GroupMemberId
-> ByteString
-> IO ()
createMsgDeliveryJob Connection
db GroupInfo
gInfo DeliveryJobScope
jobScope Maybe GroupMemberId
singleSenderGMId_ ByteString
body = do
UTCTime
currentTs <- IO UTCTime
getCurrentTime
Connection
-> Query
-> (Only GroupMemberId
:. (DeliveryJobScopeRow
:. (Maybe GroupMemberId, Binary ByteString, DeliveryJobStatus,
UTCTime, UTCTime)))
-> IO ()
forall q. ToRow q => Connection -> Query -> q -> IO ()
DB.execute
Connection
db
[sql|
INSERT INTO delivery_jobs (
group_id,
worker_scope, job_scope_spec_tag, job_scope_include_pending, job_scope_support_gm_id,
single_sender_group_member_id, body, job_status, created_at, updated_at
) VALUES (?,?,?,?,?,?,?,?,?,?)
|]
((GroupMemberId -> Only GroupMemberId
forall a. a -> Only a
Only GroupMemberId
groupId) Only GroupMemberId
-> (DeliveryJobScopeRow
:. (Maybe GroupMemberId, Binary ByteString, DeliveryJobStatus,
UTCTime, UTCTime))
-> Only GroupMemberId
:. (DeliveryJobScopeRow
:. (Maybe GroupMemberId, Binary ByteString, DeliveryJobStatus,
UTCTime, UTCTime))
forall h t. h -> t -> h :. t
:. DeliveryJobScope -> DeliveryJobScopeRow
jobScopeRow_ DeliveryJobScope
jobScope DeliveryJobScopeRow
-> (Maybe GroupMemberId, Binary ByteString, DeliveryJobStatus,
UTCTime, UTCTime)
-> DeliveryJobScopeRow
:. (Maybe GroupMemberId, Binary ByteString, DeliveryJobStatus,
UTCTime, UTCTime)
forall h t. h -> t -> h :. t
:. (Maybe GroupMemberId
singleSenderGMId_, ByteString -> Binary ByteString
forall a. a -> Binary a
Binary ByteString
body, DeliveryJobStatus
DJSPending, UTCTime
currentTs, UTCTime
currentTs))
where
GroupInfo {GroupMemberId
groupId :: GroupInfo -> GroupMemberId
groupId :: GroupMemberId
groupId} = GroupInfo
gInfo
getPendingDeliveryJobScopes :: DB.Connection -> IO [DeliveryWorkerKey]
getPendingDeliveryJobScopes :: Connection -> IO [DeliveryWorkerKey]
getPendingDeliveryJobScopes Connection
db =
Connection
-> Query -> Only DeliveryJobStatus -> IO [DeliveryWorkerKey]
forall q r.
(ToRow q, FromRow r) =>
Connection -> Query -> q -> IO [r]
DB.query
Connection
db
[sql|
SELECT DISTINCT group_id, worker_scope
FROM delivery_jobs
WHERE failed = 0 AND job_status = ?
|]
(DeliveryJobStatus -> Only DeliveryJobStatus
forall a. a -> Only a
Only DeliveryJobStatus
DJSPending)
type MessageDeliveryJobRow = (Only Int64) :. DeliveryJobScopeRow :. (Maybe GroupMemberId, Binary ByteString, Maybe GroupMemberId)
getNextDeliveryJob :: DB.Connection -> DeliveryWorkerKey -> IO (Either StoreError (Maybe MessageDeliveryJob))
getNextDeliveryJob :: Connection
-> DeliveryWorkerKey
-> IO (Either StoreError (Maybe MessageDeliveryJob))
getNextDeliveryJob Connection
db DeliveryWorkerKey
deliveryKey = do
String
-> IO (Maybe GroupMemberId)
-> (GroupMemberId -> IO (Either StoreError MessageDeliveryJob))
-> (GroupMemberId -> IO ())
-> IO (Either StoreError (Maybe MessageDeliveryJob))
forall i e a.
(Show i, AnyStoreError e) =>
String
-> IO (Maybe i)
-> (i -> IO (Either e a))
-> (i -> IO ())
-> IO (Either e (Maybe a))
getWorkItem String
"delivery job" IO (Maybe GroupMemberId)
getJobId GroupMemberId -> IO (Either StoreError MessageDeliveryJob)
getJob GroupMemberId -> IO ()
markJobFailed
where
(GroupMemberId
groupId, DeliveryWorkerScope
workerScope) = DeliveryWorkerKey
deliveryKey
getJobId :: IO (Maybe Int64)
getJobId :: IO (Maybe GroupMemberId)
getJobId =
(Only GroupMemberId -> GroupMemberId)
-> IO [Only GroupMemberId] -> IO (Maybe GroupMemberId)
forall (f :: * -> *) a b.
Functor f =>
(a -> b) -> f [a] -> f (Maybe b)
maybeFirstRow Only GroupMemberId -> GroupMemberId
forall a. Only a -> a
fromOnly (IO [Only GroupMemberId] -> IO (Maybe GroupMemberId))
-> IO [Only GroupMemberId] -> IO (Maybe GroupMemberId)
forall a b. (a -> b) -> a -> b
$
Connection
-> Query
-> (GroupMemberId, DeliveryWorkerScope, DeliveryJobStatus)
-> IO [Only GroupMemberId]
forall q r.
(ToRow q, FromRow r) =>
Connection -> Query -> q -> IO [r]
DB.query
Connection
db
[sql|
SELECT delivery_job_id
FROM delivery_jobs
WHERE group_id = ? AND worker_scope = ?
AND failed = 0 AND job_status = ?
ORDER BY delivery_job_id ASC
LIMIT 1
|]
(GroupMemberId
groupId, DeliveryWorkerScope
workerScope, DeliveryJobStatus
DJSPending)
getJob :: Int64 -> IO (Either StoreError MessageDeliveryJob)
getJob :: GroupMemberId -> IO (Either StoreError MessageDeliveryJob)
getJob GroupMemberId
jobId =
(MessageDeliveryJobRow -> Either StoreError MessageDeliveryJob)
-> StoreError
-> IO [MessageDeliveryJobRow]
-> IO (Either StoreError MessageDeliveryJob)
forall a e b. (a -> Either e b) -> e -> IO [a] -> IO (Either e b)
firstRow' MessageDeliveryJobRow -> Either StoreError MessageDeliveryJob
toDeliveryJob (GroupMemberId -> StoreError
SEDeliveryJobNotFound GroupMemberId
jobId) (IO [MessageDeliveryJobRow]
-> IO (Either StoreError MessageDeliveryJob))
-> IO [MessageDeliveryJobRow]
-> IO (Either StoreError MessageDeliveryJob)
forall a b. (a -> b) -> a -> b
$
Connection
-> Query -> Only GroupMemberId -> IO [MessageDeliveryJobRow]
forall q r.
(ToRow q, FromRow r) =>
Connection -> Query -> q -> IO [r]
DB.query
Connection
db
[sql|
SELECT
delivery_job_id,
worker_scope, job_scope_spec_tag, job_scope_include_pending, job_scope_support_gm_id,
single_sender_group_member_id, body, cursor_group_member_id
FROM delivery_jobs
WHERE delivery_job_id = ?
|]
(GroupMemberId -> Only GroupMemberId
forall a. a -> Only a
Only GroupMemberId
jobId)
where
toDeliveryJob :: MessageDeliveryJobRow -> Either StoreError MessageDeliveryJob
toDeliveryJob :: MessageDeliveryJobRow -> Either StoreError MessageDeliveryJob
toDeliveryJob ((Only GroupMemberId
jobId') :. DeliveryJobScopeRow
jobScopeRow :. (Maybe GroupMemberId
singleSenderGMId_, Binary ByteString
body, Maybe GroupMemberId
cursorGMId_)) =
case DeliveryJobScopeRow -> Maybe DeliveryJobScope
toJobScope_ DeliveryJobScopeRow
jobScopeRow of
Just DeliveryJobScope
jobScope -> MessageDeliveryJob -> Either StoreError MessageDeliveryJob
forall a b. b -> Either a b
Right (MessageDeliveryJob -> Either StoreError MessageDeliveryJob)
-> MessageDeliveryJob -> Either StoreError MessageDeliveryJob
forall a b. (a -> b) -> a -> b
$ MessageDeliveryJob {jobId :: GroupMemberId
jobId = GroupMemberId
jobId', DeliveryJobScope
jobScope :: DeliveryJobScope
jobScope :: DeliveryJobScope
jobScope, Maybe GroupMemberId
singleSenderGMId_ :: Maybe GroupMemberId
singleSenderGMId_ :: Maybe GroupMemberId
singleSenderGMId_, ByteString
body :: ByteString
body :: ByteString
body, Maybe GroupMemberId
cursorGMId_ :: Maybe GroupMemberId
cursorGMId_ :: Maybe GroupMemberId
cursorGMId_}
Maybe DeliveryJobScope
Nothing -> StoreError -> Either StoreError MessageDeliveryJob
forall a b. a -> Either a b
Left (StoreError -> Either StoreError MessageDeliveryJob)
-> StoreError -> Either StoreError MessageDeliveryJob
forall a b. (a -> b) -> a -> b
$ GroupMemberId -> StoreError
SEInvalidDeliveryJob GroupMemberId
jobId'
markJobFailed :: Int64 -> IO ()
markJobFailed :: GroupMemberId -> IO ()
markJobFailed GroupMemberId
jobId =
Connection -> Query -> Only GroupMemberId -> IO ()
forall q. ToRow q => Connection -> Query -> q -> IO ()
DB.execute Connection
db Query
"UPDATE delivery_jobs SET failed = 1 where delivery_job_id = ?" (GroupMemberId -> Only GroupMemberId
forall a. a -> Only a
Only GroupMemberId
jobId)
updateDeliveryJobStatus :: DB.Connection -> Int64 -> DeliveryJobStatus -> IO ()
updateDeliveryJobStatus :: Connection -> GroupMemberId -> DeliveryJobStatus -> IO ()
updateDeliveryJobStatus Connection
db GroupMemberId
jobId DeliveryJobStatus
status = Connection
-> GroupMemberId -> DeliveryJobStatus -> Maybe Text -> IO ()
updateDeliveryJobStatus_ Connection
db GroupMemberId
jobId DeliveryJobStatus
status Maybe Text
forall a. Maybe a
Nothing
setDeliveryJobErrStatus :: DB.Connection -> Int64 -> Text -> IO ()
setDeliveryJobErrStatus :: Connection -> GroupMemberId -> Text -> IO ()
setDeliveryJobErrStatus Connection
db GroupMemberId
jobId Text
errReason = Connection
-> GroupMemberId -> DeliveryJobStatus -> Maybe Text -> IO ()
updateDeliveryJobStatus_ Connection
db GroupMemberId
jobId DeliveryJobStatus
DJSError (Text -> Maybe Text
forall a. a -> Maybe a
Just Text
errReason)
updateDeliveryJobStatus_ :: DB.Connection -> Int64 -> DeliveryJobStatus -> Maybe Text -> IO ()
updateDeliveryJobStatus_ :: Connection
-> GroupMemberId -> DeliveryJobStatus -> Maybe Text -> IO ()
updateDeliveryJobStatus_ Connection
db GroupMemberId
jobId DeliveryJobStatus
status Maybe Text
errReason_ = do
UTCTime
currentTs <- IO UTCTime
getCurrentTime
Connection
-> Query
-> (DeliveryJobStatus, Maybe Text, UTCTime, GroupMemberId)
-> IO ()
forall q. ToRow q => Connection -> Query -> q -> IO ()
DB.execute
Connection
db
Query
"UPDATE delivery_jobs SET job_status = ?, job_err_reason = ?, updated_at = ? WHERE delivery_job_id = ?"
(DeliveryJobStatus
status, Maybe Text
errReason_, UTCTime
currentTs, GroupMemberId
jobId)
getGroupMembersByCursor :: DB.Connection -> VersionRangeChat -> User -> GroupInfo -> Maybe GroupMemberId -> Maybe GroupMemberId -> Int -> IO [GroupMember]
getGroupMembersByCursor :: Connection
-> VersionRangeChat
-> User
-> GroupInfo
-> Maybe GroupMemberId
-> Maybe GroupMemberId
-> Int
-> IO [GroupMember]
getGroupMembersByCursor Connection
db VersionRangeChat
vr user :: User
user@User {GroupMemberId
userContactId :: GroupMemberId
userContactId :: User -> GroupMemberId
userContactId} GroupInfo {GroupMemberId
groupId :: GroupInfo -> GroupMemberId
groupId :: GroupMemberId
groupId} Maybe GroupMemberId
cursorGMId_ Maybe GroupMemberId
singleSenderGMId_ Int
count = do
[GroupMemberId]
gmIds :: [Int64] <-
(Only GroupMemberId -> GroupMemberId)
-> [Only GroupMemberId] -> [GroupMemberId]
forall a b. (a -> b) -> [a] -> [b]
map Only GroupMemberId -> GroupMemberId
forall a. Only a -> a
fromOnly ([Only GroupMemberId] -> [GroupMemberId])
-> IO [Only GroupMemberId] -> IO [GroupMemberId]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> case Maybe GroupMemberId
cursorGMId_ of
Maybe GroupMemberId
Nothing ->
Connection
-> Query
-> ((GroupMemberId, GroupMemberId, Maybe GroupMemberId,
GroupMemberStatus, GroupMemberStatus, GroupMemberStatus,
GroupMemberStatus, GroupMemberStatus, GroupMemberStatus)
:. Only Int)
-> IO [Only GroupMemberId]
forall q r.
(ToRow q, FromRow r) =>
Connection -> Query -> q -> IO [r]
DB.query
Connection
db
(Query
query Query -> Query -> Query
forall a. Semigroup a => a -> a -> a
<> Query
orderLimit)
( (GroupMemberId
groupId, GroupMemberId
userContactId, Maybe GroupMemberId
singleSenderGMId_, GroupMemberStatus
GSMemIntroduced, GroupMemberStatus
GSMemIntroInvited, GroupMemberStatus
GSMemAccepted, GroupMemberStatus
GSMemAnnounced, GroupMemberStatus
GSMemConnected, GroupMemberStatus
GSMemComplete)
(GroupMemberId, GroupMemberId, Maybe GroupMemberId,
GroupMemberStatus, GroupMemberStatus, GroupMemberStatus,
GroupMemberStatus, GroupMemberStatus, GroupMemberStatus)
-> Only Int
-> (GroupMemberId, GroupMemberId, Maybe GroupMemberId,
GroupMemberStatus, GroupMemberStatus, GroupMemberStatus,
GroupMemberStatus, GroupMemberStatus, GroupMemberStatus)
:. Only Int
forall h t. h -> t -> h :. t
:. (Int -> Only Int
forall a. a -> Only a
Only Int
count)
)
Just GroupMemberId
cursorGMId ->
Connection
-> Query
-> ((GroupMemberId, GroupMemberId, Maybe GroupMemberId,
GroupMemberStatus, GroupMemberStatus, GroupMemberStatus,
GroupMemberStatus, GroupMemberStatus, GroupMemberStatus)
:. (GroupMemberId, Int))
-> IO [Only GroupMemberId]
forall q r.
(ToRow q, FromRow r) =>
Connection -> Query -> q -> IO [r]
DB.query
Connection
db
(Query
query Query -> Query -> Query
forall a. Semigroup a => a -> a -> a
<> Query
" AND group_member_id > ?" Query -> Query -> Query
forall a. Semigroup a => a -> a -> a
<> Query
orderLimit)
( (GroupMemberId
groupId, GroupMemberId
userContactId, Maybe GroupMemberId
singleSenderGMId_, GroupMemberStatus
GSMemIntroduced, GroupMemberStatus
GSMemIntroInvited, GroupMemberStatus
GSMemAccepted, GroupMemberStatus
GSMemAnnounced, GroupMemberStatus
GSMemConnected, GroupMemberStatus
GSMemComplete)
(GroupMemberId, GroupMemberId, Maybe GroupMemberId,
GroupMemberStatus, GroupMemberStatus, GroupMemberStatus,
GroupMemberStatus, GroupMemberStatus, GroupMemberStatus)
-> (GroupMemberId, Int)
-> (GroupMemberId, GroupMemberId, Maybe GroupMemberId,
GroupMemberStatus, GroupMemberStatus, GroupMemberStatus,
GroupMemberStatus, GroupMemberStatus, GroupMemberStatus)
:. (GroupMemberId, Int)
forall h t. h -> t -> h :. t
:. (GroupMemberId
cursorGMId, Int
count)
)
#if defined(dbPostgres)
map (toContactMember vr user) <$>
DB.query
db
(groupMemberQuery <> " WHERE m.group_member_id IN ?")
(Only (In gmIds))
#else
[Either StoreError GroupMember] -> [GroupMember]
forall a b. [Either a b] -> [b]
rights ([Either StoreError GroupMember] -> [GroupMember])
-> IO [Either StoreError GroupMember] -> IO [GroupMember]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (GroupMemberId -> IO (Either StoreError GroupMember))
-> [GroupMemberId] -> IO [Either StoreError GroupMember]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> [a] -> m [b]
mapM (ExceptT StoreError IO GroupMember
-> IO (Either StoreError GroupMember)
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (ExceptT StoreError IO GroupMember
-> IO (Either StoreError GroupMember))
-> (GroupMemberId -> ExceptT StoreError IO GroupMember)
-> GroupMemberId
-> IO (Either StoreError GroupMember)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Connection
-> VersionRangeChat
-> User
-> GroupMemberId
-> ExceptT StoreError IO GroupMember
getGroupMemberById Connection
db VersionRangeChat
vr User
user) [GroupMemberId]
gmIds
#endif
where
query :: Query
query =
[sql|
SELECT group_member_id
FROM group_members
WHERE group_id = ?
AND contact_id IS DISTINCT FROM ?
AND group_member_id IS DISTINCT FROM ?
AND member_status IN (?,?,?,?,?,?)
|]
orderLimit :: Query
orderLimit = Query
" ORDER BY group_member_id ASC LIMIT ?"
updateDeliveryJobCursor :: DB.Connection -> Int64 -> GroupMemberId -> IO ()
updateDeliveryJobCursor :: Connection -> GroupMemberId -> GroupMemberId -> IO ()
updateDeliveryJobCursor Connection
db GroupMemberId
jobId GroupMemberId
cursorGMId = do
UTCTime
currentTs <- IO UTCTime
getCurrentTime
Connection
-> Query -> (GroupMemberId, UTCTime, GroupMemberId) -> IO ()
forall q. ToRow q => Connection -> Query -> q -> IO ()
DB.execute
Connection
db
Query
"UPDATE delivery_jobs SET cursor_group_member_id = ?, updated_at = ? WHERE delivery_job_id = ?"
(GroupMemberId
cursorGMId, UTCTime
currentTs, GroupMemberId
jobId)
deleteDoneDeliveryJobs :: DB.Connection -> UTCTime -> IO ()
deleteDoneDeliveryJobs :: Connection -> UTCTime -> IO ()
deleteDoneDeliveryJobs Connection
db UTCTime
createdAtCutoff = do
Connection
-> Query
-> (UTCTime, DeliveryJobStatus, DeliveryJobStatus)
-> IO ()
forall q. ToRow q => Connection -> Query -> q -> IO ()
DB.execute
Connection
db
[sql|
DELETE FROM delivery_jobs
WHERE created_at <= ?
AND (job_status IN (?,?) OR failed = 1)
|]
(UTCTime
createdAtCutoff, DeliveryJobStatus
DJSComplete, DeliveryJobStatus
DJSError)