class Toiler::Actor::Fetcher

Actor that polls for messages only when processors are ready; otherwise it stays idle.

Constants

FETCH_LIMIT

Attributes

concurrency[RW]
executing[RW]
free_processors[RW]
queue[RW]
visibility_timeout[RW]
wait[RW]
waiting_messages[RW]

Public Class Methods

new(queue, client, count) click to toggle source
# File lib/toiler/actor/fetcher.rb, line 15
# Sets up the fetcher state for one queue and schedules the first poll.
#
# queue  - queue identifier; passed to Toiler::Aws::Queue.new and used as the
#          key into Toiler.worker_class_registry.
# client - client object handed to Toiler::Aws::Queue.new.
# count  - processor count; seeds both the free-processor counter and the
#          concurrency value.
def initialize(queue, client, count)
  debug "Initializing Fetcher for queue #{queue}..."

  # Plain counters and flags: nothing here touches a collaborator.
  @concurrency = count
  @free_processors = count
  @waiting_messages = 0
  @executing = false

  # Collaborator lookups, kept in their original relative order.
  @queue = Toiler::Aws::Queue.new(queue, client)
  @wait = Toiler.options[:wait] || 60 # falls back to 60 when no :wait option is set
  @batch = Toiler.worker_class_registry[queue].batch?
  @visibility_timeout = @queue.visibility_timeout

  debug "Finished initializing Fetcher for queue #{queue}"

  # Queue the first poll as a message to ourselves.
  tell :poll_messages
end

Public Instance Methods

default_executor() click to toggle source
# File lib/toiler/actor/fetcher.rb, line 29
# Returns the executor for this actor: Concurrent.global_fast_executor.
# NOTE(review): presumably an override of the actor framework's executor
# hook -- confirm against the superclass.
def default_executor
  Concurrent.global_fast_executor
end
executing?() click to toggle source
# File lib/toiler/actor/fetcher.rb, line 44
# Returns the current value of @executing, which on_message sets to true
# while a message is being handled and resets to false afterwards.
def executing?
  @executing
end
on_message(msg) click to toggle source
# File lib/toiler/actor/fetcher.rb, line 33
# Dispatches an incoming actor message to the method it names.
#
# msg - either a bare method name or an array of [method_name, *arguments].
#
# Returns whatever the dispatched method returns. If the method raises a
# StandardError or SystemStackError, the exception is logged through `error`
# instead of being propagated. @executing is true for the duration of the
# call and is always reset to false afterwards.
def on_message(msg)
  @executing = true
  handler, *params = msg
  send(handler, *params)
rescue StandardError, SystemStackError => e
  # SystemStackError is rescued on purpose: if we misbehave and recurse too
  # deeply, the fetcher should still be able to recover.
  summary = "Fetcher #{queue.name} raised exception #{e.class}: #{e.message}"
  error "#{summary}\n#{e.backtrace.join("\n")}"
ensure
  @executing = false
end

Private Instance Methods

assign_messages(messages) click to toggle source
# File lib/toiler/actor/fetcher.rb, line 113
# Hands the messages from a successful poll to the processor pool.
#
# messages - collection of received messages. For a batch worker the whole
#            collection is sent to a single processor; otherwise every
#            message is sent to its own processor.
#
# The free-processor counter is decremented once per dispatched unit of work.
# The debug line reports the number of messages received, so a batch of N
# messages is logged as N (previously the batch wrapper was counted, which
# always logged 1).
def assign_messages(messages)
  # In batch mode the entire collection is one unit of work.
  work_units = batch? ? [messages] : messages
  work_units.each do |unit|
    processor_pool.tell [:process, visibility_timeout, unit]
    @free_processors -= 1
  end
  debug "Fetcher #{queue.name} assigned #{messages.count} messages"
end
batch?() click to toggle source
# File lib/toiler/actor/fetcher.rb, line 50
# Returns the batch flag captured at construction time from the worker class
# registered for this queue (see initialize).
def batch?
  @batch
end
full_batch?(max_number_of_messages) click to toggle source
# File lib/toiler/actor/fetcher.rb, line 105
# Decides whether a request of the given size is large enough to be worth
# issuing while another request is already outstanding.
#
# max_number_of_messages - size of the request being considered.
#
# Returns true when the size equals FETCH_LIMIT, or when it is at least one
# tenth of the configured concurrency.
def full_batch?(max_number_of_messages)
  return true if max_number_of_messages == FETCH_LIMIT

  max_number_of_messages >= concurrency * 0.1
end
max_messages() click to toggle source
# File lib/toiler/actor/fetcher.rb, line 60
# Number of messages to request in the next poll.
#
# Batch workers always ask for FETCH_LIMIT messages; otherwise the request is
# the smaller of FETCH_LIMIT and the current number of free processors.
def max_messages
  return FETCH_LIMIT if batch?

  available = free_processors
  available < FETCH_LIMIT ? available : FETCH_LIMIT
end
poll_future(max_number_of_messages) click to toggle source
# File lib/toiler/actor/fetcher.rb, line 64
# Starts an asynchronous receive on the queue.
#
# max_number_of_messages - upper bound passed to the receive call.
#
# Returns the object produced by Concurrent::Promises.future; its block calls
# queue.receive_messages asking for all attributes and all message
# attributes, using `wait` as the wait time in seconds.
def poll_future(max_number_of_messages)
  Concurrent::Promises.future do
    # Built inside the block so `wait` is read when the future runs.
    request = {
      attribute_names: %w[All],
      message_attribute_names: %w[All],
      wait_time_seconds: wait,
      max_number_of_messages: max_number_of_messages
    }
    queue.receive_messages(**request)
  end
end
poll_messages() click to toggle source
# File lib/toiler/actor/fetcher.rb, line 77
# Issues one asynchronous receive request if polling is currently warranted.
#
# Does nothing when should_poll? is false, or when a request is already
# outstanding and the next request would not count as a full batch.
# Otherwise it reserves the requested amount in @waiting_messages, starts the
# request, and wires callbacks that send follow-up messages back to this
# actor once the request settles.
def poll_messages
  return unless should_poll?

  batch_size = max_messages
  # Something is already in flight: only stack another request when it is
  # big enough to be worthwhile.
  return if waiting_messages.positive? && !full_batch?(batch_size)

  # Reserve capacity up front; release_messages gives it back later.
  @waiting_messages += batch_size

  debug "Fetcher #{queue.name} polling messages..."
  request = poll_future(batch_size)

  # Failure: give the reservation back and try again.
  request.on_rejection! do
    tell [:release_messages, batch_size]
    tell :poll_messages
  end

  # Success: dispatch whatever arrived, then release and poll again.
  request.on_fulfillment! do |received|
    tell [:assign_messages, received] unless received.nil? || received.empty?
    tell [:release_messages, batch_size]
    tell :poll_messages
  end

  # Re-enter through the mailbox instead of calling ourselves, to avoid
  # recursion.
  tell :poll_messages if should_poll?
end
processor_finished() click to toggle source
# File lib/toiler/actor/fetcher.rb, line 54
# Handles the signal that a processor has finished its work: returns one
# slot to the free-processor counter and asks the fetcher to poll again.
def processor_finished
  debug "Fetcher #{queue.name} received processor finished signal..."
  @free_processors = @free_processors + 1
  tell :poll_messages
end
processor_pool() click to toggle source
# File lib/toiler/actor/fetcher.rb, line 109
# Returns the processor pool for this queue, looked up through
# Toiler.processor_pool on first use and cached in @processor_pool afterwards.
def processor_pool
  return @processor_pool if @processor_pool

  @processor_pool = Toiler.processor_pool(queue.name)
end
release_messages(messages) click to toggle source
# File lib/toiler/actor/fetcher.rb, line 73
# Gives back a reservation previously added to @waiting_messages.
#
# messages - the amount to subtract from the waiting counter.
#
# Returns the new value of the counter.
def release_messages(messages)
  @waiting_messages = @waiting_messages - messages
end
should_poll?() click to toggle source
# File lib/toiler/actor/fetcher.rb, line 101
# Whether another receive request should be issued right now.
#
# True while the amount already reserved in waiting_messages is strictly
# below half of the free processors.
# NOTE(review): assuming both counters are Integers, the division rounds
# down, so with a single free processor this is 0 > 0 and returns false --
# confirm that is intended for a concurrency of 1.
def should_poll?
  half_free = free_processors / 2
  waiting_messages < half_free
end