{-# LANGUAGE CPP #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeOperators #-}

module Simplex.Chat.Store.Delivery
  ( createMsgDeliveryTask,
    deleteGroupDeliveryTasks,
    deleteGroupDeliveryJobs,
    getPendingDeliveryTaskScopes,
    getNextDeliveryTask,
    getNextDeliveryTasks,
    updateDeliveryTaskStatus,
    setDeliveryTaskErrStatus,
    deleteDoneDeliveryTasks,
    createMsgDeliveryJob,
    getPendingDeliveryJobScopes,
    getNextDeliveryJob,
    updateDeliveryJobStatus,
    setDeliveryJobErrStatus,
    getGroupMembersByCursor,
    updateDeliveryJobCursor,
    deleteDoneDeliveryJobs,
  )
where

import Data.ByteString.Char8 (ByteString)
import Data.Int (Int64)
import Data.Text (Text)
import Data.Time.Clock (UTCTime, getCurrentTime)
import Simplex.Chat.Delivery
import Simplex.Chat.Protocol hiding (Binary)
import Simplex.Chat.Store.Shared
import Simplex.Chat.Types
import Simplex.Messaging.Agent.Store.AgentStore (getWorkItem, getWorkItems, maybeFirstRow)
import Simplex.Messaging.Agent.Store.DB (Binary (..), BoolInt (..))
import qualified Simplex.Messaging.Agent.Store.DB as DB
import Simplex.Messaging.Util (firstRow')
#if defined(dbPostgres)
import Database.PostgreSQL.Simple (In (..), Only (..), (:.) (..))
import Database.PostgreSQL.Simple.SqlQQ (sql)
#else
import Control.Monad.Except
import Data.Either (rights)
import Database.SQLite.Simple (Only (..), (:.) (..))
import Database.SQLite.Simple.QQ (sql)
import Simplex.Chat.Store.Groups (getGroupMemberById)
#endif

-- Flat database representation of a DeliveryJobScope, in column order:
-- worker_scope, job_scope_spec_tag, job_scope_include_pending, job_scope_support_gm_id.
-- Converted to and from DeliveryJobScope by jobScopeRow_ and toJobScope_.
type DeliveryJobScopeRow = (DeliveryWorkerScope, Maybe DeliveryJobSpecTag, Maybe BoolInt, Maybe GroupMemberId)

-- Flattens a delivery job scope into the four scope columns written to the
-- delivery_tasks / delivery_jobs tables. Columns that do not apply to the
-- given scope are Nothing (stored as NULL):
--   * group scope, delivery job: spec tag and include-pending flag are set;
--   * group scope, relay removed: only the spec tag is set;
--   * member support scope: only the support group member id is set.
-- toJobScope_ is the inverse of this function.
jobScopeRow_ :: DeliveryJobScope -> DeliveryJobScopeRow
jobScopeRow_ = \case
  DJSGroup {jobSpec = DJDeliveryJob {includePending}} ->
    (DWSGroup, Just DJSTDeliveryJob, Just (BI includePending), Nothing)
  DJSGroup {jobSpec = DJRelayRemoved} ->
    (DWSGroup, Just DJSTRelayRemoved, Nothing, Nothing)
  DJSMemberSupport {supportGMId} ->
    (DWSMemberSupport, Nothing, Nothing, Just supportGMId)

-- Rebuilds a delivery job scope from its four stored columns.
-- Only the exact column combinations produced by jobScopeRow_ are accepted;
-- any other combination yields Nothing, which callers report as an invalid
-- task/job.
toJobScope_ :: DeliveryJobScopeRow -> Maybe DeliveryJobScope
toJobScope_ = \case
  (DWSGroup, Just DJSTDeliveryJob, Just (BI includePending), Nothing) ->
    Just $ DJSGroup {jobSpec = DJDeliveryJob {includePending}}
  (DWSGroup, Just DJSTRelayRemoved, Nothing, Nothing) ->
    Just $ DJSGroup {jobSpec = DJRelayRemoved}
  (DWSMemberSupport, Nothing, Nothing, Just supportGMId) ->
    Just $ DJSMemberSupport {supportGMId}
  _ -> Nothing

-- Inserts a new delivery task for the group with status DTSNew.
-- The task's job scope is stored as the four scope columns (see jobScopeRow_);
-- created_at and updated_at are both set to the current time.
createMsgDeliveryTask :: DB.Connection -> GroupInfo -> GroupMember -> NewMessageDeliveryTask -> IO ()
createMsgDeliveryTask db gInfo sender newTask = do
  currentTs <- getCurrentTime
  DB.execute
    db
    [sql|
      INSERT INTO delivery_tasks (
        group_id,
        worker_scope, job_scope_spec_tag, job_scope_include_pending, job_scope_support_gm_id,
        sender_group_member_id, message_id, message_from_channel, task_status,
        created_at, updated_at
      ) VALUES (?,?,?,?,?,?,?,?,?,?,?)
    |]
    -- 1 (group) + 4 (scope row) + 6 (task fields) = 11 parameters
    ((Only groupId) :. jobScopeRow_ jobScope :. (groupMemberId' sender, messageId, BI messageFromChannel, DTSNew, currentTs, currentTs))
  where
    GroupInfo {groupId} = gInfo
    NewMessageDeliveryTask {messageId, jobScope, messageFromChannel} = newTask

-- Deletes all delivery_tasks rows that belong to the given group.
deleteGroupDeliveryTasks :: DB.Connection -> GroupInfo -> IO ()
deleteGroupDeliveryTasks db GroupInfo {groupId} =
  DB.execute db "DELETE FROM delivery_tasks WHERE group_id = ?" (Only groupId)

-- Deletes all delivery_jobs rows that belong to the given group.
deleteGroupDeliveryJobs :: DB.Connection -> GroupInfo -> IO ()
deleteGroupDeliveryJobs db GroupInfo {groupId} =
  DB.execute db "DELETE FROM delivery_jobs WHERE group_id = ?" (Only groupId)

-- Returns the distinct (group_id, worker_scope) keys that have at least one
-- delivery task with failed = 0 and status DTSNew.
getPendingDeliveryTaskScopes :: DB.Connection -> IO [DeliveryWorkerKey]
getPendingDeliveryTaskScopes db =
  DB.query
    db
    [sql|
      SELECT DISTINCT group_id, worker_scope
      FROM delivery_tasks
      WHERE failed = 0 AND task_status = ?
    |]
    (Only DTSNew)

-- Fetches the oldest (lowest delivery_task_id) task with failed = 0 and
-- status DTSNew for the given worker key (group id, worker scope).
-- The id lookup, the loader (getMsgDeliveryTask_) and the mark-failed action
-- (markDeliveryTaskFailed_) are handed to getWorkItem.
-- NOTE(review): presumably getWorkItem runs the mark-failed action when the
-- found item cannot be loaded — confirm against AgentStore.
getNextDeliveryTask :: DB.Connection -> DeliveryWorkerKey -> IO (Either StoreError (Maybe MessageDeliveryTask))
getNextDeliveryTask db deliveryKey =
  getWorkItem "delivery task" getTaskId (getMsgDeliveryTask_ db) (markDeliveryTaskFailed_ db)
  where
    (groupId, workerScope) = deliveryKey
    -- id of the next task to process, if any
    getTaskId :: IO (Maybe Int64)
    getTaskId =
      maybeFirstRow fromOnly $
        DB.query
          db
          [sql|
            SELECT delivery_task_id
            FROM delivery_tasks
            WHERE group_id = ? AND worker_scope = ?
              AND failed = 0 AND task_status = ?
            ORDER BY delivery_task_id ASC
            LIMIT 1
          |]
          (groupId, workerScope, DTSNew)

-- Row shape selected by getMsgDeliveryTask_: the task id, the four job scope
-- columns, then sender group member id, sender member id, sender display name,
-- message broker timestamp, message body and the message_from_channel flag.
type MessageDeliveryTaskRow = (Only Int64) :. DeliveryJobScopeRow :. (GroupMemberId, MemberId, ContactName, UTCTime, ChatMessage 'Json, BoolInt)

-- Loads a delivery task by id, joined with its message, the sending group
-- member and that member's profile (member_profile_id when set, otherwise
-- contact_profile_id).
-- Returns SEDeliveryTaskNotFound when no row matches, and
-- SEInvalidDeliveryTask when the stored scope columns do not form a valid
-- DeliveryJobScope (see toJobScope_).
getMsgDeliveryTask_ :: DB.Connection -> Int64 -> IO (Either StoreError MessageDeliveryTask)
getMsgDeliveryTask_ db taskId =
  firstRow' toTask (SEDeliveryTaskNotFound taskId) $
    DB.query
      db
      [sql|
        SELECT
          t.delivery_task_id,
          t.worker_scope, t.job_scope_spec_tag, t.job_scope_include_pending, t.job_scope_support_gm_id,
          m.group_member_id, m.member_id, p.display_name, msg.broker_ts, msg.msg_body, t.message_from_channel
        FROM delivery_tasks t
        JOIN messages msg ON msg.message_id = t.message_id
        JOIN group_members m ON m.group_member_id = t.sender_group_member_id
        JOIN contact_profiles p ON p.contact_profile_id = COALESCE(m.member_profile_id, m.contact_profile_id)
        WHERE t.delivery_task_id = ?
      |]
      (Only taskId)
  where
    -- converts a selected row to a task, validating the scope columns
    toTask :: MessageDeliveryTaskRow -> Either StoreError MessageDeliveryTask
    toTask ((Only taskId') :. jobScopeRow :. (senderGMId, senderMemberId, senderMemberName, brokerTs, chatMessage, BI messageFromChannel)) =
      case toJobScope_ jobScopeRow of
        Just jobScope ->
          Right $
            MessageDeliveryTask
              { taskId = taskId',
                jobScope,
                senderGMId,
                senderMemberId,
                senderMemberName,
                brokerTs,
                chatMessage,
                messageFromChannel
              }
        Nothing -> Left $ SEInvalidDeliveryTask taskId'

-- Sets failed = 1 on the delivery task with the given id; queries in this
-- module that select pending tasks filter on failed = 0, so the task is no
-- longer picked up by them.
markDeliveryTaskFailed_ :: DB.Connection -> Int64 -> IO ()
markDeliveryTaskFailed_ db taskId =
  DB.execute db "UPDATE delivery_tasks SET failed = 1 where delivery_task_id = ?" (Only taskId)

-- Loads all pending (failed = 0, status DTSNew) delivery tasks of the group
-- that share the job scope of the passed task, ordered by task id. For groups
-- that do not use relays the result is additionally restricted to tasks from
-- the same sender as the passed task.
-- TODO [channels fwd] possible optimization is to read and add tasks to batch iteratively to avoid reading too many tasks
-- passed MessageDeliveryTask defines the jobScope to search for
getNextDeliveryTasks :: DB.Connection -> GroupInfo -> MessageDeliveryTask -> IO (Either StoreError [Either StoreError MessageDeliveryTask])
getNextDeliveryTasks db gInfo task =
  getWorkItems "message delivery task" getTaskIds (getMsgDeliveryTask_ db) (markDeliveryTaskFailed_ db)
  where
    GroupInfo {groupId, useRelays} = gInfo
    MessageDeliveryTask {jobScope, senderGMId} = task
    -- Scope columns are nullable, so they are compared with
    -- IS NOT DISTINCT FROM (NULL-safe equality) rather than "=".
    getTaskIds :: IO [Int64]
    getTaskIds
      | isTrue useRelays =
          map fromOnly
            <$> DB.query
              db
              [sql|
                SELECT delivery_task_id
                FROM delivery_tasks
                WHERE group_id = ?
                  AND worker_scope = ?
                  AND job_scope_spec_tag IS NOT DISTINCT FROM ?
                  AND job_scope_include_pending IS NOT DISTINCT FROM ?
                  AND job_scope_support_gm_id IS NOT DISTINCT FROM ?
                  AND failed = 0
                  AND task_status = ?
                ORDER BY delivery_task_id ASC
              |]
              ((Only groupId) :. jobScopeRow_ jobScope :. (Only DTSNew))
      | otherwise =
          -- For fully connected groups we guarantee a singleSenderGMId for a delivery job by additionally filtering
          -- on sender_group_member_id here, so that the job can then retrieve less members as recipients,
          -- optimizing for this single sender (see processDeliveryJob -> fully connected group branch).
          -- We do this optimization in the job to decrease load on admins using mobile devices for clients.
          map fromOnly
            <$> DB.query
              db
              [sql|
                SELECT delivery_task_id
                FROM delivery_tasks
                WHERE group_id = ?
                  AND worker_scope = ?
                  AND job_scope_spec_tag IS NOT DISTINCT FROM ?
                  AND job_scope_include_pending IS NOT DISTINCT FROM ?
                  AND job_scope_support_gm_id IS NOT DISTINCT FROM ?
                  AND sender_group_member_id = ?
                  AND failed = 0
                  AND task_status = ?
                ORDER BY delivery_task_id ASC
              |]
              ((Only groupId) :. jobScopeRow_ jobScope :. (senderGMId, DTSNew))

-- Sets the status of a delivery task and clears its error reason
-- (task_err_reason is written as NULL).
updateDeliveryTaskStatus :: DB.Connection -> Int64 -> DeliveryTaskStatus -> IO ()
updateDeliveryTaskStatus db taskId status =
  updateDeliveryTaskStatus_ db taskId status Nothing

-- Marks a delivery task as DTSError and records the given error reason.
setDeliveryTaskErrStatus :: DB.Connection -> Int64 -> Text -> IO ()
setDeliveryTaskErrStatus db taskId errReason =
  updateDeliveryTaskStatus_ db taskId DTSError (Just errReason)

-- Writes task_status and task_err_reason for the task with the given id and
-- sets updated_at to the current time. A Nothing error reason is stored as NULL.
updateDeliveryTaskStatus_ :: DB.Connection -> Int64 -> DeliveryTaskStatus -> Maybe Text -> IO ()
updateDeliveryTaskStatus_ db taskId status errReason_ = do
  currentTs <- getCurrentTime
  DB.execute
    db
    "UPDATE delivery_tasks SET task_status = ?, task_err_reason = ?, updated_at = ? WHERE delivery_task_id = ?"
    (status, errReason_, currentTs, taskId)

-- Deletes delivery tasks created at or before the cutoff that are finished:
-- status DTSProcessed or DTSError, or flagged failed = 1.
deleteDoneDeliveryTasks :: DB.Connection -> UTCTime -> IO ()
deleteDoneDeliveryTasks db createdAtCutoff =
  DB.execute
    db
    [sql|
      DELETE FROM delivery_tasks
      WHERE created_at <= ?
        AND (task_status IN (?,?) OR failed = 1)
    |]
    (createdAtCutoff, DTSProcessed, DTSError)

-- Inserts a new delivery job for the group with status DJSPending.
-- The job scope is stored as the four scope columns (see jobScopeRow_), the
-- body is stored as a binary value, and created_at / updated_at are both set
-- to the current time. singleSenderGMId_ is stored as-is (NULL when Nothing).
createMsgDeliveryJob :: DB.Connection -> GroupInfo -> DeliveryJobScope -> Maybe GroupMemberId -> ByteString -> IO ()
createMsgDeliveryJob db gInfo jobScope singleSenderGMId_ body = do
  currentTs <- getCurrentTime
  DB.execute
    db
    [sql|
      INSERT INTO delivery_jobs (
        group_id,
        worker_scope, job_scope_spec_tag, job_scope_include_pending, job_scope_support_gm_id,
        single_sender_group_member_id, body, job_status, created_at, updated_at
      ) VALUES (?,?,?,?,?,?,?,?,?,?)
    |]
    -- 1 (group) + 4 (scope row) + 5 (job fields) = 10 parameters
    ((Only groupId) :. jobScopeRow_ jobScope :. (singleSenderGMId_, Binary body, DJSPending, currentTs, currentTs))
  where
    GroupInfo {groupId} = gInfo

-- Returns the distinct (group_id, worker_scope) keys that have at least one
-- delivery job with failed = 0 and status DJSPending.
getPendingDeliveryJobScopes :: DB.Connection -> IO [DeliveryWorkerKey]
getPendingDeliveryJobScopes db =
  DB.query
    db
    [sql|
      SELECT DISTINCT group_id, worker_scope
      FROM delivery_jobs
      WHERE failed = 0 AND job_status = ?
    |]
    (Only DJSPending)

-- Row shape selected when loading a delivery job (see getNextDeliveryJob):
-- the job id, the four job scope columns, then single_sender_group_member_id,
-- body and cursor_group_member_id.
type MessageDeliveryJobRow = (Only Int64) :. DeliveryJobScopeRow :. (Maybe GroupMemberId, Binary ByteString, Maybe GroupMemberId)

-- Fetches the oldest (lowest delivery_job_id) job with failed = 0 and status
-- DJSPending for the given worker key (group id, worker scope).
-- The id lookup, the loader and the mark-failed action are handed to getWorkItem.
-- NOTE(review): presumably getWorkItem runs the mark-failed action when the
-- found item cannot be loaded — confirm against AgentStore.
getNextDeliveryJob :: DB.Connection -> DeliveryWorkerKey -> IO (Either StoreError (Maybe MessageDeliveryJob))
getNextDeliveryJob db deliveryKey =
  getWorkItem "delivery job" getJobId getJob markJobFailed
  where
    (groupId, workerScope) = deliveryKey
    -- id of the next job to process, if any
    getJobId :: IO (Maybe Int64)
    getJobId =
      maybeFirstRow fromOnly $
        DB.query
          db
          [sql|
            SELECT delivery_job_id
            FROM delivery_jobs
            WHERE group_id = ? AND worker_scope = ?
              AND failed = 0 AND job_status = ?
            ORDER BY delivery_job_id ASC
            LIMIT 1
          |]
          (groupId, workerScope, DJSPending)
    -- Loads the job by id: SEDeliveryJobNotFound when no row matches,
    -- SEInvalidDeliveryJob when the scope columns are not a valid scope.
    getJob :: Int64 -> IO (Either StoreError MessageDeliveryJob)
    getJob jobId =
      firstRow' toDeliveryJob (SEDeliveryJobNotFound jobId) $
        DB.query
          db
          [sql|
            SELECT
              delivery_job_id,
              worker_scope, job_scope_spec_tag, job_scope_include_pending, job_scope_support_gm_id,
              single_sender_group_member_id, body, cursor_group_member_id
            FROM delivery_jobs
            WHERE delivery_job_id = ?
          |]
          (Only jobId)
      where
        toDeliveryJob :: MessageDeliveryJobRow -> Either StoreError MessageDeliveryJob
        toDeliveryJob ((Only jobId') :. jobScopeRow :. (singleSenderGMId_, Binary body, cursorGMId_)) =
          case toJobScope_ jobScopeRow of
            Just jobScope ->
              Right $
                MessageDeliveryJob
                  { jobId = jobId',
                    jobScope,
                    singleSenderGMId_,
                    body,
                    cursorGMId_
                  }
            Nothing -> Left $ SEInvalidDeliveryJob jobId'
    -- sets failed = 1 so the job is excluded by the failed = 0 filter above
    markJobFailed :: Int64 -> IO ()
    markJobFailed jobId =
      DB.execute db "UPDATE delivery_jobs SET failed = 1 where delivery_job_id = ?" (Only jobId)

-- Sets the status of a delivery job and clears its error reason
-- (job_err_reason is written as NULL).
updateDeliveryJobStatus :: DB.Connection -> Int64 -> DeliveryJobStatus -> IO ()
updateDeliveryJobStatus db jobId status =
  updateDeliveryJobStatus_ db jobId status Nothing

-- Marks a delivery job as DJSError and records the given error reason.
setDeliveryJobErrStatus :: DB.Connection -> Int64 -> Text -> IO ()
setDeliveryJobErrStatus db jobId errReason =
  updateDeliveryJobStatus_ db jobId DJSError (Just errReason)

-- Writes job_status and job_err_reason for the job with the given id and
-- sets updated_at to the current time. A Nothing error reason is stored as NULL.
updateDeliveryJobStatus_ :: DB.Connection -> Int64 -> DeliveryJobStatus -> Maybe Text -> IO ()
updateDeliveryJobStatus_ db jobId status errReason_ = do
  currentTs <- getCurrentTime
  DB.execute
    db
    "UPDATE delivery_jobs SET job_status = ?, job_err_reason = ?, updated_at = ? WHERE delivery_job_id = ?"
    (status, errReason_, currentTs, jobId)

-- TODO [channels fwd] possible improvement is to prioritize owners and "active" members
-- | Load the next page of group members, paging by @group_member_id@.
--
-- Member ids are selected from @group_members@ for the given group in
-- ascending @group_member_id@ order, limited to @count@ rows, keeping only:
--
--   * rows whose @contact_id@ is distinct from the user contact id,
--   * rows whose @group_member_id@ is distinct from @singleSenderGMId_@,
--   * rows whose @member_status@ is one of the six statuses listed in
--     @selectParams@ below.
--
-- With @Just cursor@ only ids strictly greater than the cursor are selected;
-- with @Nothing@ selection starts from the lowest id.
--
-- The selected ids are then resolved to full 'GroupMember' records using a
-- backend-specific lookup (see the CPP branches in the body).
getGroupMembersByCursor :: DB.Connection -> VersionRangeChat -> User -> GroupInfo -> Maybe GroupMemberId -> Maybe GroupMemberId -> Int -> IO [GroupMember]
getGroupMembersByCursor db vr user@User {userContactId} GroupInfo {groupId} cursorGMId_ singleSenderGMId_ count = do
  gmIds :: [Int64] <-
    map fromOnly <$> case cursorGMId_ of
      Nothing ->
        DB.query
          db
          (query <> orderLimit)
          (selectParams :. Only count)
      Just cursorGMId ->
        DB.query
          db
          (query <> " AND group_member_id > ?" <> orderLimit)
          (selectParams :. (cursorGMId, count))
#if defined(dbPostgres)
  -- NOTE(review): this lookup has no ORDER BY, so the ascending id order of
  -- gmIds is presumably not guaranteed in the result - confirm whether
  -- callers rely on the order (e.g. to derive the next cursor).
  map (toContactMember vr user) <$>
    DB.query
      db
      (groupMemberQuery <> " WHERE m.group_member_id IN ?")
      (Only (In gmIds))
#else
  -- Members are loaded one by one; ids whose lookup fails with a StoreError
  -- are dropped from the result (rights), not reported.
  rights <$> mapM (runExceptT . getGroupMemberById db vr user) gmIds
#endif
  where
    -- Bind parameters shared by both cursor cases, in placeholder order.
    selectParams =
      ( groupId,
        userContactId,
        singleSenderGMId_,
        GSMemIntroduced,
        GSMemIntroInvited,
        GSMemAccepted,
        GSMemAnnounced,
        GSMemConnected,
        GSMemComplete
      )
    query =
      [sql|
        SELECT group_member_id
        FROM group_members
        WHERE group_id = ?
          AND contact_id IS DISTINCT FROM ?
          AND group_member_id IS DISTINCT FROM ?
          AND member_status IN (?,?,?,?,?,?)
      |]
    orderLimit = " ORDER BY group_member_id ASC LIMIT ?"

-- | Store the given group member id in @cursor_group_member_id@ of the
-- delivery job with the given id, and set @updated_at@ to the current time.
updateDeliveryJobCursor :: DB.Connection -> Int64 -> GroupMemberId -> IO ()
updateDeliveryJobCursor db jobId cursorGMId = do
  currentTs <- getCurrentTime
  DB.execute
    db
    "UPDATE delivery_jobs SET cursor_group_member_id = ?, updated_at = ? WHERE delivery_job_id = ?"
    (cursorGMId, currentTs, jobId)

-- | Delete delivery jobs created at or before the cutoff time that are
-- either in status 'DJSComplete' or 'DJSError', or have @failed = 1@.
deleteDoneDeliveryJobs :: DB.Connection -> UTCTime -> IO ()
deleteDoneDeliveryJobs db createdAtCutoff =
  DB.execute
    db
    [sql|
      DELETE FROM delivery_jobs
      WHERE created_at <= ?
        AND (job_status IN (?,?) OR failed = 1)
    |]
    (createdAtCutoff, DJSComplete, DJSError)