class Shoryuken::Waiter::Querier
Public Class Methods
new()
click to toggle source
# File lib/shoryuken/waiter/querier.rb, line 7 def initialize delay = Shoryuken::Waiter.poll_delay logger.debug { "[Shoryuken::Waiter] Checking for delayed messages every #{delay} seconds" } @timer = every(delay) { poll } end
Private Instance Methods
poll()
click to toggle source
# File lib/shoryuken/waiter/querier.rb, line 15 def poll Shoryuken::Waiter.tables.each { |table| poll_table(table) } end
poll_table(table)
click to toggle source
# File lib/shoryuken/waiter/querier.rb, line 19 def poll_table(table) logger.debug { "[Shoryuken::Waiter] Looking for delayed messages in '#{table.table_name}' ready to be queued" } query_results(table).each do |response| items = response.items logger.debug { "[Shoryuken::Waiter] Found #{items.count} delayed messages in '#{table.table_name}'" } Shoryuken::Waiter::Enqueuer.enqueue_items(table, items) end end
query_options(threshold)
click to toggle source
# File lib/shoryuken/waiter/querier.rb, line 34 def query_options(threshold) { index_name: "scheduler-perform_at-index", select: "SPECIFIC_ATTRIBUTES", consistent_read: true, projection_expression: [ "perform_at", "sqs_message_body", "sqs_message_attributes" ].join(","), return_consumed_capacity: "NONE", key_condition_expression: [ "#H = :hashval", "#R < :rangeval" ].join(" AND "), expression_attribute_names: { "#H": "scheduler", "#R": "perform_at" }, expression_attribute_values: { ":hashval": Shoryuken::Waiter::TABLE_PRIMARY_ITEM_KEY_VALUE, ":rangeval": threshold } } end
query_results(table)
click to toggle source
# File lib/shoryuken/waiter/querier.rb, line 29 def query_results(table) threshold = (Time.now + Shoryuken::Waiter::MAX_QUEUE_DELAY).to_f table.query(query_options(threshold)) end