class ActivePublisher::Async::InMemoryAdapter::ConsumerThread
Constants
- CHANNEL_CLOSED_ERRORS
- NETWORK_ERRORS
- PRECONDITION_ERRORS
Attributes
channel[R]
flush_max[R]
last_tick_at[R]
queue[R]
sampled_queue_size[R]
thread[R]
Public Class Methods
new(listen_queue)
click to toggle source
# File lib/active_publisher/async/in_memory_adapter/consumer_thread.rb, line 27 def initialize(listen_queue) @queue = listen_queue @sampled_queue_size = queue.size @flush_max = ::ActivePublisher.configuration.messages_per_batch update_last_tick_at start_thread end
Public Instance Methods
alive?()
click to toggle source
# File lib/active_publisher/async/in_memory_adapter/consumer_thread.rb, line 36 def alive? @thread && @thread.alive? end
kill()
click to toggle source
# File lib/active_publisher/async/in_memory_adapter/consumer_thread.rb, line 40 def kill @thread.kill if @thread @thread = nil end
Private Instance Methods
await_network_reconnect()
click to toggle source
# File lib/active_publisher/async/in_memory_adapter/consumer_thread.rb, line 47 def await_network_reconnect if defined?(ActivePublisher::RabbitConnection) sleep ::ActivePublisher::RabbitConnection::NETWORK_RECOVERY_INTERVAL else sleep 0.1 end end
cleanup_up_channel()
click to toggle source
# File lib/active_publisher/async/in_memory_adapter/consumer_thread.rb, line 55 def cleanup_up_channel return if channel.nil? channel.close rescue => error ::ActivePublisher.configuration.error_handler.call(error, {:status => "Cleaning up the channel"}) end
handle_current_messages_on_unknown_error(current_messages)
click to toggle source
# File lib/active_publisher/async/in_memory_adapter/consumer_thread.rb, line 62 def handle_current_messages_on_unknown_error(current_messages) current_messages.each do |message| # Degrade to single message publish ... or at least attempt to begin ::ActivePublisher.publish(message.route, message.payload, message.exchange_name, message.options) current_messages.delete(message) rescue *CHANNEL_CLOSED_ERRORS # If the channel is bad, raise! raise rescue *PRECONDITION_ERRORS => error # Delete messages if rabbitmq cannot declare the exchange (or somet other precondition failed). ::ActivePublisher.configuration.error_handler.call(error, {:reason => "precondition failed", :message => message}) current_messages.delete(message) rescue => other_error ::ActivePublisher.configuration.error_handler.call(other_error, {:route => message.route, :payload => message.payload, :exchange_name => message.exchange_name, :options => message.options}) end end end
make_channel()
click to toggle source
# File lib/active_publisher/async/in_memory_adapter/consumer_thread.rb, line 81 def make_channel channel = ::ActivePublisher::Async::InMemoryAdapter::Channel.new channel.confirm_select if ::ActivePublisher.configuration.publisher_confirms channel end
publish_all(exchange_name, messages)
click to toggle source
# File lib/active_publisher/async/in_memory_adapter/consumer_thread.rb, line 140 def publish_all(exchange_name, messages) exchange = channel.topic(exchange_name) messages.each do |message| fail ::ActivePublisher::ExchangeMismatchError, "bulk publish messages must match publish_all exchange_name" if message.exchange_name != exchange_name ::ActiveSupport::Notifications.instrument "message_published.active_publisher", :route => message.route, :message_count => 1 do options = ::ActivePublisher.publishing_options(message.route, message.options || {}) exchange.publish(message.payload, options) end end wait_for_confirms end
start_consuming_thread()
click to toggle source
# File lib/active_publisher/async/in_memory_adapter/consumer_thread.rb, line 96 def start_consuming_thread loop do # Sample the queue size so we don't shutdown when messages are in flight. @sampled_queue_size = queue.size current_messages = queue.pop_up_to(flush_max, :timeout => 0.1) update_last_tick_at # If the queue is empty, we should continue to update to "last_tick_at" time. next if current_messages.nil? @channel ||= make_channel # We only look at active publisher messages. Everything else is dropped. current_messages.select! { |message| message.is_a?(::ActivePublisher::Message) } begin # Only open a single connection for each group of messages to an exchange current_messages.group_by(&:exchange_name).each do |exchange_name, messages| publish_all(exchange_name, messages) current_messages -= messages end rescue *CHANNEL_CLOSED_ERRORS # If the channel is bad, raise without sending one-by-one! raise rescue *NETWORK_ERRORS # Sleep because connection is down await_network_reconnect rescue => unknown_error ::ActivePublisher.configuration.error_handler.call(unknown_error, {:number_of_messages => current_messages.size}) # Attempt to deliver a message one-by-one. Raise if a closed channel error appears. handle_current_messages_on_unknown_error(current_messages) # TODO: Find a way to bubble this out of the thread for logging purposes. # Reraise the error out of the publisher loop. The Supervisor will restart the consumer. raise unknown_error ensure # Always requeue anything that gets stuck. queue.concat(current_messages) if current_messages && !current_messages.empty? end end ensure cleanup_up_channel end
start_thread()
click to toggle source
# File lib/active_publisher/async/in_memory_adapter/consumer_thread.rb, line 91 def start_thread return if alive? @thread = ::Thread.new { start_consuming_thread } end
update_last_tick_at()
click to toggle source
# File lib/active_publisher/async/in_memory_adapter/consumer_thread.rb, line 87 def update_last_tick_at @last_tick_at = ::Time.now end
wait_for_confirms()
click to toggle source
# File lib/active_publisher/async/in_memory_adapter/consumer_thread.rb, line 152 def wait_for_confirms return true unless channel.using_publisher_confirms? ::ActiveSupport::Notifications.instrument "publishes_confirmed.active_publisher" do channel.wait_for_confirms(::ActivePublisher.configuration.publisher_confirms_timeout) end end