class Shoryuken::Waiter::Enqueuer

Public Class Methods

enqueue_items(table, items) click to toggle source

TODO If there is ever support for tables that don’t map 1:1 to queues, a query may return items intended for many queues, so in order to be able to batch them with send_messages, they would need to be sorted by queue first

TODO If the actor were to crash in the middle of a batch, messages could get lost

# File lib/shoryuken/waiter/enqueuer.rb, line 13
# Enqueues the delayed +items+ read from +table+ onto their SQS queue.
#
# The destination queue is taken from the first item's
# "sqs_message_body" / "queue_name" and is used for the whole collection.
# Each item is turned into a message via +message+ and the results are
# handed to +send_messages+. Does nothing when +items+ is empty.
def enqueue_items(table, items)
  return if items.empty?

  # Every item is sent to the queue named by the first one.
  first_body = items.first["sqs_message_body"]
  target_queue = Shoryuken::Client.queues(first_body["queue_name"])

  outgoing = items.map { |entry| message(table, entry) }
  send_messages(target_queue, outgoing)
end

Private Class Methods

delete_item(table, item) click to toggle source

TODO Maybe use BatchWriteItem to delete many items at once

# File lib/shoryuken/waiter/enqueuer.rb, line 53
# Deletes the delayed-job item described by +item+ from +table+.
#
# The item is addressed by its "scheduler" / "job_id" key, with the job id
# read from item["sqs_message_body"]["job_id"].
#
# The delete is conditional on the item still existing. An unconditional
# DynamoDB delete succeeds even when the item is already gone, which would
# make the Aws::DynamoDB::Errors::ConditionalCheckFailedException rescue in
# item_deleted? unreachable and let two concurrent callers both believe
# they removed (and may enqueue) the same item.
#
# Raises Aws::DynamoDB::Errors::ConditionalCheckFailedException when the
# item no longer exists.
def delete_item(table, item)
  table.delete_item(
    key: {
      scheduler: Shoryuken::Waiter::TABLE_PRIMARY_ITEM_KEY_VALUE,
      job_id: item["sqs_message_body"]["job_id"]
    },
    # Placeholder name keeps the expression safe from reserved-word clashes.
    condition_expression: "attribute_exists(#job_id)",
    expression_attribute_names: { "#job_id" => "job_id" },
    return_values: "NONE",
    return_consumed_capacity: "NONE",
    return_item_collection_metrics: "NONE"
  )
end
item_deleted?(table, item) click to toggle source
# File lib/shoryuken/waiter/enqueuer.rb, line 44
# Tries to delete +item+ from +table+ and reports whether that worked.
#
# Returns true (after writing a debug log line) when delete_item completes,
# and false when it raises
# Aws::DynamoDB::Errors::ConditionalCheckFailedException.
#
# NOTE(review): DynamoDB only raises that exception for deletes that carry
# a condition expression - confirm delete_item supplies one.
def item_deleted?(table, item)
  delete_item(table, item)
rescue Aws::DynamoDB::Errors::ConditionalCheckFailedException
  false
else
  logger.debug { "[Shoryuken::Waiter] Deleting 1 delayed message from '#{table.table_name}'" }
  true
end
message(table, item) click to toggle source
# File lib/shoryuken/waiter/enqueuer.rb, line 24
# Builds the SQS message hash for +item+, provided item_deleted? reports
# that the item was removed from +table+.
#
# Returns a Hash with :message_body, :delay_seconds and
# :message_attributes, or nil when item_deleted? returns false.
#
# :delay_seconds is the whole number of seconds from now until
# item["perform_at"], floored at zero. :message_attributes mirrors
# item["sqs_message_attributes"] with each attribute's inner keys
# converted to symbols.
def message(table, item)
  return unless item_deleted?(table, item)

  # An overdue item gives a negative difference. SQS rejects a negative
  # delay, and by this point the item has already been deleted, so a
  # rejected send would lose the message; deliver it immediately instead.
  remaining = (Time.at(item["perform_at"]) - Time.now).to_i

  attributes = item["sqs_message_attributes"].map do |name, attribute|
    [name, attribute.map { |key, value| [key.to_sym, value] }.to_h]
  end.to_h

  # TODO Only return the message if it has all the parts it needs
  {
    message_body: item["sqs_message_body"],
    delay_seconds: [remaining, 0].max,
    message_attributes: attributes
  }
end
send_messages(queue, messages) click to toggle source
# File lib/shoryuken/waiter/enqueuer.rb, line 37
# Sends +messages+ to +queue+ in batches of at most ten.
#
# nil entries are dropped before batching. An info log line is written
# for each batch just before it is passed to queue.send_messages.
def send_messages(queue, messages)
  deliverable = messages.compact

  deliverable.each_slice(10) do |batch|
    logger.info { "[Shoryuken::Waiter] Queueing #{batch.count} delayed messages to '#{queue.name}'" }
    queue.send_messages(batch)
  end
end