class Toiler::Actor::Fetcher
Actor that polls for messages only when processors are ready; otherwise it stays idle.
Constants
- FETCH_LIMIT
Attributes
concurrency[RW]
executing[RW]
free_processors[RW]
queue[RW]
visibility_timeout[RW]
wait[RW]
waiting_messages[RW]
Public Class Methods
new(queue, client, count)
click to toggle source
# File lib/toiler/actor/fetcher.rb, line 15
# Sets up the fetcher state for one queue and schedules the first poll.
#
# queue  - queue identifier; passed to Toiler::Aws::Queue.new and used as the
#          key into Toiler.worker_class_registry.
# client - client object handed through to Toiler::Aws::Queue.new.
# count  - number of processors; seeds both free_processors and concurrency.
def initialize(queue, client, count)
  debug "Initializing Fetcher for queue #{queue}..."
  @queue = Toiler::Aws::Queue.new queue, client
  # The wait value is sent as wait_time_seconds on every receive call.
  # Amazon SQS rejects values above 20, so the fallback must not exceed it
  # (assumes Toiler::Aws::Queue forwards the value to SQS unchanged).
  @wait = Toiler.options[:wait] || 20
  @free_processors = count
  @batch = Toiler.worker_class_registry[queue].batch?
  @visibility_timeout = @queue.visibility_timeout
  @executing = false
  @waiting_messages = 0
  @concurrency = count
  debug "Finished initializing Fetcher for queue #{queue}"
  # Kick off the polling loop by messaging ourselves.
  tell :poll_messages
end
Public Instance Methods
default_executor()
click to toggle source
# File lib/toiler/actor/fetcher.rb, line 29
# Executor this actor runs on: the global fast executor from Concurrent.
def default_executor
  executor = Concurrent.global_fast_executor
  executor
end
executing?()
click to toggle source
# File lib/toiler/actor/fetcher.rb, line 44
# Reports the current value of the @executing flag, which on_message raises
# while it is handling a message and lowers afterwards.
def executing?
  flag = @executing
  flag
end
on_message(msg)
click to toggle source
# File lib/toiler/actor/fetcher.rb, line 33
# Actor entry point: dispatches an incoming message to the method it names.
#
# msg - either a bare method name or an array of [method_name, *arguments].
#
# The @executing flag is true for the duration of the dispatch and is always
# reset to false afterwards. Errors are logged instead of propagated.
def on_message(msg)
  @executing = true
  handler, *params = msg
  __send__(handler, *params)
rescue StandardError, SystemStackError => err
  # SystemStackError is rescued as well so that a "stack level too deep"
  # caused by our own misbehaviour can be recovered from.
  error "Fetcher #{queue.name} raised exception #{err.class}: #{err.message}\n#{err.backtrace.join("\n")}"
ensure
  @executing = false
end
Private Instance Methods
assign_messages(messages)
click to toggle source
# File lib/toiler/actor/fetcher.rb, line 113
# Hands received messages to the processor pool, consuming one free
# processor per unit of work.
#
# messages - the collection of messages that was received.
#
# In batch mode the whole collection is sent as a single unit of work, so
# one processor is consumed and the logged count is 1.
def assign_messages(messages)
  work_items = batch? ? [messages] : messages
  work_items.each do |item|
    processor_pool.tell([:process, visibility_timeout, item])
    @free_processors -= 1
  end
  debug("Fetcher #{queue.name} assigned #{work_items.count} messages")
end
batch?()
click to toggle source
# File lib/toiler/actor/fetcher.rb, line 50
# Returns the @batch flag captured at initialization from the worker class
# registered for this queue.
def batch?
  flag = @batch
  flag
end
full_batch?(max_number_of_messages)
click to toggle source
# File lib/toiler/actor/fetcher.rb, line 105
# Decides whether a request of the given size counts as a "full" batch.
#
# max_number_of_messages - size of the request being considered.
#
# True when the size equals FETCH_LIMIT, or when it is at least 10% of the
# configured concurrency.
def full_batch?(max_number_of_messages)
  return true if max_number_of_messages == FETCH_LIMIT

  threshold = concurrency * 0.1
  max_number_of_messages >= threshold
end
max_messages()
click to toggle source
# File lib/toiler/actor/fetcher.rb, line 60
# Number of messages to request in the next poll.
#
# Batch mode always requests FETCH_LIMIT; otherwise the request is capped by
# whichever is smaller, FETCH_LIMIT or the number of free processors.
def max_messages
  if batch?
    FETCH_LIMIT
  else
    [FETCH_LIMIT, free_processors].min
  end
end
poll_future(max_number_of_messages)
click to toggle source
# File lib/toiler/actor/fetcher.rb, line 64
# Starts an asynchronous receive on the queue.
#
# max_number_of_messages - upper bound on messages for this receive call.
#
# Returns the future created by Concurrent::Promises.future; its value is
# whatever queue.receive_messages returns.
def poll_future(max_number_of_messages)
  Concurrent::Promises.future do
    # Built inside the future so `wait` is read when the block runs.
    request = {
      attribute_names: %w[All],
      message_attribute_names: %w[All],
      wait_time_seconds: wait,
      max_number_of_messages: max_number_of_messages
    }
    queue.receive_messages(**request)
  end
end
poll_messages()
click to toggle source
# File lib/toiler/actor/fetcher.rb, line 77
# Issues one asynchronous receive if polling is currently warranted, and
# wires the callbacks that keep the polling loop going.
#
# Nothing happens when should_poll? is false, or when a request is already
# outstanding and the next one would not be a full batch.
#
# The requested size is added to @waiting_messages up front and released
# again (via a :release_messages message) once the receive settles, whether
# it succeeded or failed.
def poll_messages
  return unless should_poll?

  max_number_of_messages = max_messages
  return if waiting_messages.positive? && !full_batch?(max_number_of_messages)

  @waiting_messages += max_number_of_messages
  debug "Fetcher #{queue.name} polling messages..."
  future = poll_future max_number_of_messages

  future.on_rejection! do |reason|
    # Surface the failure instead of retrying silently; without this a
    # persistently failing receive loops with no trace in the logs.
    error "Fetcher #{queue.name} failed to poll messages: #{reason.inspect}"
    tell [:release_messages, max_number_of_messages]
    tell :poll_messages
  end

  future.on_fulfillment! do |msgs|
    tell [:assign_messages, msgs] unless msgs.nil? || msgs.empty?
    tell [:release_messages, max_number_of_messages]
    tell :poll_messages
  end

  # defer method execution to avoid recursion
  tell :poll_messages if should_poll?
end
processor_finished()
click to toggle source
# File lib/toiler/actor/fetcher.rb, line 54
# Handles the signal that a processor has finished: frees one processor slot
# and asks the fetcher to consider polling again.
def processor_finished
  debug("Fetcher #{queue.name} received processor finished signal...")
  @free_processors = @free_processors + 1
  tell(:poll_messages)
end
processor_pool()
click to toggle source
# File lib/toiler/actor/fetcher.rb, line 109
# Processor pool for this queue, looked up through Toiler.processor_pool on
# first use and cached in @processor_pool afterwards.
def processor_pool
  return @processor_pool if @processor_pool

  @processor_pool = Toiler.processor_pool(queue.name)
end
release_messages(messages)
click to toggle source
# File lib/toiler/actor/fetcher.rb, line 73
# Subtracts a settled request from the count of messages being waited on.
#
# messages - the number of messages to release (the size that was requested).
def release_messages(messages)
  @waiting_messages = @waiting_messages - messages
end
should_poll?()
click to toggle source
# File lib/toiler/actor/fetcher.rb, line 101
# True when half of the free processors exceeds the number of messages
# already being waited on.
#
# NOTE(review): if free_processors is an Integer the division truncates, so
# a single free processor yields 0 and this never returns true - confirm
# that is intended.
def should_poll?
  half_of_free = free_processors / 2
  half_of_free > waiting_messages
end