class ActivePublisher::Async::InMemoryAdapter::ConsumerThread

Constants

CHANNEL_CLOSED_ERRORS
NETWORK_ERRORS
PRECONDITION_ERRORS

Attributes

channel[R]
flush_max[R]
last_tick_at[R]
queue[R]
sampled_queue_size[R]
thread[R]

Public Class Methods

new(listen_queue) click to toggle source
# File lib/active_publisher/async/in_memory_adapter/consumer_thread.rb, line 27
# Sets up a consumer for +listen_queue+ and starts its worker thread right away.
#
# listen_queue - the queue this consumer reads from; it must respond to +size+.
def initialize(listen_queue)
  @queue = listen_queue
  # Initial size sample, taken from the queue we were just handed.
  @sampled_queue_size = listen_queue.size
  # The batch limit is read from the global configuration once, at construction time.
  @flush_max = ::ActivePublisher.configuration.messages_per_batch

  update_last_tick_at
  start_thread
end

Public Instance Methods

alive?() click to toggle source
# File lib/active_publisher/async/in_memory_adapter/consumer_thread.rb, line 36
# Tells whether the worker thread exists and is still running.
#
# Returns the (falsy) value of @thread when no thread is set, otherwise the
# result of Thread#alive?.
def alive?
  return @thread unless @thread

  @thread.alive?
end
kill() click to toggle source
# File lib/active_publisher/async/in_memory_adapter/consumer_thread.rb, line 40
# Kills the worker thread, if there is one, and drops the reference to it.
#
# Returns nil.
def kill
  if @thread
    @thread.kill
  end

  @thread = nil
end

Private Instance Methods

await_network_reconnect() click to toggle source
# File lib/active_publisher/async/in_memory_adapter/consumer_thread.rb, line 47
# Pauses the current thread to give the network connection time to recover.
#
# Sleeps for RabbitConnection::NETWORK_RECOVERY_INTERVAL when that constant's
# namespace is loaded, and for 0.1 seconds otherwise.
def await_network_reconnect
  pause =
    if defined?(ActivePublisher::RabbitConnection)
      ::ActivePublisher::RabbitConnection::NETWORK_RECOVERY_INTERVAL
    else
      0.1
    end

  sleep pause
end
cleanup_up_channel() click to toggle source
# File lib/active_publisher/async/in_memory_adapter/consumer_thread.rb, line 55
# Closes the current channel when there is one.
#
# Any StandardError raised while doing so is handed to the configured error
# handler instead of propagating.
def cleanup_up_channel
  channel.close unless channel.nil?
rescue => error
  context = {:status => "Cleaning up the channel"}
  ::ActivePublisher.configuration.error_handler.call(error, context)
end
handle_current_messages_on_unknown_error(current_messages) click to toggle source
# File lib/active_publisher/async/in_memory_adapter/consumer_thread.rb, line 62
# Fallback for a failed batch: tries to publish every message individually
# through ::ActivePublisher.publish.
#
# current_messages - Array of messages (each responding to route, payload,
#                    exchange_name and options). It is modified IN PLACE:
#                    messages that were published, or that failed with one of
#                    PRECONDITION_ERRORS, are removed. Whatever remains in the
#                    array afterwards has not been delivered.
#
# Returns current_messages.
# Raises any of CHANNEL_CLOSED_ERRORS unchanged; messages not yet attempted
# (and the one that failed) stay in current_messages.
def handle_current_messages_on_unknown_error(current_messages)
  # Iterate over a snapshot. Removing entries from the very array being walked
  # shifts the remaining elements down, which makes `each` skip the message
  # that follows every removed one.
  current_messages.dup.each do |message|
    # Degrade to single message publish ... or at least attempt to
    begin
      ::ActivePublisher.publish(message.route, message.payload, message.exchange_name, message.options)
      # Remove by identity so that an equal-but-distinct message that has not
      # been attempted yet is not discarded along with this one.
      current_messages.delete_if { |pending| pending.equal?(message) }
    rescue *CHANNEL_CLOSED_ERRORS
      # If the channel is bad, raise!
      raise
    rescue *PRECONDITION_ERRORS => error
      # Delete messages if rabbitmq cannot declare the exchange (or some other precondition failed).
      ::ActivePublisher.configuration.error_handler.call(error, {:reason => "precondition failed", :message => message})
      current_messages.delete_if { |pending| pending.equal?(message) }
    rescue => other_error
      # Report and keep the message in current_messages so the caller can still deal with it.
      ::ActivePublisher.configuration.error_handler.call(other_error, {:route => message.route, :payload => message.payload, :exchange_name => message.exchange_name, :options => message.options})
    end
  end

  current_messages
end
make_channel() click to toggle source
# File lib/active_publisher/async/in_memory_adapter/consumer_thread.rb, line 81
# Builds a new in-memory adapter channel.
#
# Publisher confirms are switched on for the channel when the global
# configuration asks for them.
#
# Returns the new channel.
def make_channel
  ::ActivePublisher::Async::InMemoryAdapter::Channel.new.tap do |new_channel|
    new_channel.confirm_select if ::ActivePublisher.configuration.publisher_confirms
  end
end
publish_all(exchange_name, messages) click to toggle source
# File lib/active_publisher/async/in_memory_adapter/consumer_thread.rb, line 140
# Publishes a batch of messages to one topic exchange on the current channel.
#
# exchange_name - name of the topic exchange every message must target.
# messages      - enumerable of messages (route, payload, exchange_name, options).
#
# Each publish is wrapped in a "message_published.active_publisher"
# instrumentation event. After the whole batch is sent, wait_for_confirms is called.
#
# Returns whatever wait_for_confirms returns.
# Raises ::ActivePublisher::ExchangeMismatchError as soon as a message targets
# a different exchange; messages before it have already been published.
def publish_all(exchange_name, messages)
  topic_exchange = channel.topic(exchange_name)

  messages.each do |msg|
    if msg.exchange_name != exchange_name
      fail ::ActivePublisher::ExchangeMismatchError, "bulk publish messages must match publish_all exchange_name"
    end

    event_payload = {:route => msg.route, :message_count => 1}
    ::ActiveSupport::Notifications.instrument("message_published.active_publisher", event_payload) do
      publish_options = ::ActivePublisher.publishing_options(msg.route, msg.options || {})
      topic_exchange.publish(msg.payload, publish_options)
    end
  end

  wait_for_confirms
end
start_consuming_thread() click to toggle source
# File lib/active_publisher/async/in_memory_adapter/consumer_thread.rb, line 96
# Body of the worker thread: repeatedly pops a batch of messages off the queue
# and publishes them, grouped by exchange name. It runs until an error escapes
# the loop; the channel is cleaned up on the way out.
#
# Error handling per batch:
# * CHANNEL_CLOSED_ERRORS   - re-raised immediately.
# * NETWORK_ERRORS          - waits via await_network_reconnect, then the loop continues.
# * any other StandardError - reported to the configured error handler, the
#   remaining messages are retried one-by-one, then the error is re-raised.
# In every case, messages still held in the current batch are put back on the queue.
def start_consuming_thread
  loop do
    # Sample the queue size so we don't shutdown when messages are in flight.
    @sampled_queue_size = queue.size
    current_messages = queue.pop_up_to(flush_max, :timeout => 0.1)
    update_last_tick_at
    # If the queue is empty, we should continue to update to "last_tick_at" time.
    next if current_messages.nil?

    # We only look at active publisher messages. Everything else is dropped.
    current_messages.select! { |message| message.is_a?(::ActivePublisher::Message) }

    begin
      # Open the channel inside the protected section: the batch has already
      # been removed from the queue, so a failure here must go through the same
      # rescue/ensure handling as a failed publish or the messages would be lost.
      @channel ||= make_channel

      # Only open a single connection for each group of messages to an exchange
      current_messages.group_by(&:exchange_name).each do |exchange_name, messages|
        publish_all(exchange_name, messages)
        # Published messages leave the batch so they are never requeued below.
        current_messages -= messages
      end
    rescue *CHANNEL_CLOSED_ERRORS
      # If the channel is bad, raise without sending one-by-one!
      raise
    rescue *NETWORK_ERRORS
      # Sleep because connection is down
      await_network_reconnect
    rescue => unknown_error
      ::ActivePublisher.configuration.error_handler.call(unknown_error, {:number_of_messages => current_messages.size})

      # Attempt to deliver a message one-by-one. Raise if a closed channel error appears.
      handle_current_messages_on_unknown_error(current_messages)

      # TODO: Find a way to bubble this out of the thread for logging purposes.
      # Reraise the error out of the publisher loop. The Supervisor will restart the consumer.
      raise unknown_error
    ensure
      # Always requeue anything that gets stuck.
      queue.concat(current_messages) if current_messages && !current_messages.empty?
    end
  end
ensure
  cleanup_up_channel
end
start_thread() click to toggle source
# File lib/active_publisher/async/in_memory_adapter/consumer_thread.rb, line 91
# Spawns the worker thread running start_consuming_thread, unless one is
# already alive.
#
# Returns the new Thread, or nil when an existing thread is still alive.
def start_thread
  @thread = ::Thread.new { start_consuming_thread } unless alive?
end
update_last_tick_at() click to toggle source
# File lib/active_publisher/async/in_memory_adapter/consumer_thread.rb, line 87
# Stamps @last_tick_at with the current wall-clock time.
#
# Returns the Time that was recorded.
def update_last_tick_at
  now = ::Time.now
  @last_tick_at = now
end
wait_for_confirms() click to toggle source
# File lib/active_publisher/async/in_memory_adapter/consumer_thread.rb, line 152
# Waits for the broker to confirm outstanding publishes when the channel uses
# publisher confirms.
#
# The wait is wrapped in a "publishes_confirmed.active_publisher"
# instrumentation event and uses the configured publisher_confirms_timeout.
#
# Returns true when the channel does not use publisher confirms, otherwise the
# result of the instrumented block.
def wait_for_confirms
  if channel.using_publisher_confirms?
    ::ActiveSupport::Notifications.instrument("publishes_confirmed.active_publisher") do
      confirm_timeout = ::ActivePublisher.configuration.publisher_confirms_timeout
      channel.wait_for_confirms(confirm_timeout)
    end
  else
    true
  end
end