module ActionSubscriber::Bunny::Subscriber

Public Instance Methods

bunny_consumers() click to toggle source
# File lib/action_subscriber/bunny/subscriber.rb, line 7
def bunny_consumers
  @bunny_consumers ||= []
end
cancel_consumers!() click to toggle source
# File lib/action_subscriber/bunny/subscriber.rb, line 11
def cancel_consumers!
  bunny_consumers.each(&:cancel)
  ::ActionSubscriber::ThreadPools.threadpools.each do |name, threadpool|
    threadpool.shutdown
  end
end
setup_subscriptions!() click to toggle source
# File lib/action_subscriber/bunny/subscriber.rb, line 18
def setup_subscriptions!
  fail ::RuntimeError, "you cannot setup queues multiple times, this should only happen once at startup" unless subscriptions.empty?
  routes.each do |route|
    subscriptions << {
      :route => route,
      :queue => setup_queue(route),
    }
  end
end
start_subscribers!() click to toggle source
# File lib/action_subscriber/bunny/subscriber.rb, line 28
def start_subscribers!
  subscriptions.each do |subscription|
    start_subscriber_for_subscription(subscription)
  end
end

Private Instance Methods

setup_queue(route) click to toggle source
# File lib/action_subscriber/bunny/subscriber.rb, line 76
def setup_queue(route)
  channel = ::ActionSubscriber::RabbitConnection.with_connection{|connection| connection.create_channel(nil, 1) }
  exchange = channel.topic(route.exchange)
  queue = channel.queue(route.queue, :durable => route.durable)
  queue.bind(exchange, :routing_key => route.routing_key)
  queue
end
start_subscriber_for_subscription(subscription) click to toggle source
# File lib/action_subscriber/bunny/subscriber.rb, line 36
def start_subscriber_for_subscription(subscription)
  route = subscription[:route]
  queue = subscription[:queue]
  channel = queue.channel
  threadpool = ::ActionSubscriber::ThreadPools.threadpools.fetch(route.threadpool_name)
  channel.prefetch(route.prefetch) if route.acknowledgements?
  consumer = ::Bunny::Consumer.new(channel, queue, channel.generate_consumer_tag, !route.acknowledgements?)

  if ::ActionSubscriber.configuration.resubscribe_on_consumer_cancellation
    # Add cancellation callback to rebuild subscriber on cancel.
    consumer.on_cancellation do
      ::ActionSubscriber.logger.warn "Cancellation received for queue consumer: #{queue.name}, rebuilding subscription..."
      bunny_consumers.delete(consumer)
      channel.close
      safely_restart_subscriber(subscription)
    end
  end

  consumer.on_delivery do |delivery_info, properties, encoded_payload|
    ::ActiveSupport::Notifications.instrument "received_event.action_subscriber", :payload_size => encoded_payload.bytesize, :queue => queue.name
    properties = {
      :action => route.action,
      :channel => queue.channel,
      :content_type => properties.content_type,
      :delivery_tag => delivery_info.delivery_tag,
      :exchange => delivery_info.exchange,
      :headers => properties.headers,
      :message_id => properties.message_id,
      :routing_key => delivery_info.routing_key,
      :queue => queue.name,
      :uses_acknowledgements => route.acknowledgements?,
    }
    env = ::ActionSubscriber::Middleware::Env.new(route.subscriber, encoded_payload, properties)
    run_env(env, threadpool)
  end

  bunny_consumers << consumer
  queue.subscribe_with(consumer)
end