class Shoryuken::Later::Poller

Attributes

table_name[R]

Public Class Methods

new(table_name) click to toggle source
# File lib/shoryuken/later/poller.rb, line 10
def initialize(table_name)
  @table_name = table_name
end

Public Instance Methods

poll() click to toggle source
# File lib/shoryuken/later/poller.rb, line 14
def poll
  watchdog('Later::Poller#poll died') do
    started_at = Time.now
    
    logger.debug { "Polling for scheduled messages in '#{table_name}'" }
    
    begin
      while item = next_item
        id = item['id']
        logger.info "Found message #{id} from '#{table_name}'"
        if sent_msg = process_item(item)
          logger.debug { "Enqueued message #{id} from '#{table_name}'" }
        else
          logger.debug { "Skipping already queued message #{id} from '#{table_name}'" }
        end
      end
  
      logger.debug { "Poller for '#{table_name}' completed in #{elapsed(started_at)} ms" }
    rescue => ex
      logger.error "Error fetching message: #{ex}"
      logger.error ex.backtrace.first
    end
  end
end

Private Instance Methods

client() click to toggle source
# File lib/shoryuken/later/poller.rb, line 41
def client
  Shoryuken::Later::Client
end
next_item() click to toggle source

Fetches the next available item from the schedule table.

# File lib/shoryuken/later/poller.rb, line 46
def next_item
  client.first_item table_name, 'perform_at' => {
          attribute_value_list: [ (Time.now + Shoryuken::Later::MAX_QUEUE_DELAY).to_i ],
          comparison_operator:  'LT'
        }
end
process_item(item) click to toggle source

Processes an item and enqueues it (unless another actor has already enqueued it).

# File lib/shoryuken/later/poller.rb, line 54
def process_item(item)
  time, worker_class, args, id = item.values_at('perform_at','shoryuken_class','shoryuken_args','id')
  
  worker_class = worker_class.constantize
  args = JSON.parse(args)
  time = Time.at(time)
  queue_name = item['shoryuken_queue']
  
  # Conditionally delete an item prior to enqueuing it, ensuring only one actor may enqueue it.
  begin client.delete_item table_name, item
  rescue Aws::DynamoDB::Errors::ConditionalCheckFailedException => e
    # Item was already deleted, so it does not need to be queued.
    return
  end

  # Now the item is safe to be enqueued, since the conditional delete succeeded.
  body, options = args.values_at('body','options')
  if queue_name.nil?
    worker_class.perform_in(time, body, options)
    
  # For compatibility with Shoryuken's ActiveJob adapter, support an explicit queue name.
  else
    delay = (time - Time.now).to_i
    body = JSON.dump(body) if body.is_a? Hash
    options[:delay_seconds] = delay if delay > 0
    options[:message_body] = body
    options[:message_attributes] ||= {}
    options[:message_attributes]['shoryuken_class'] = { string_value: worker_class.to_s, data_type: 'String' }
    Shoryuken::Client.queues(queue_name).send_message(options)
  end
end