module ActionSubscriber::MarchHare::Subscriber

Public Instance Methods

cancel_consumers!()
# File lib/action_subscriber/march_hare/subscriber.rb, line 7
# Cancels every consumer in march_hare_consumers that does not already report
# cancelled?, then calls shutdown on each pool exposed by
# ::ActionSubscriber::ThreadPools.threadpools.
#
# Returns the value of iterating the threadpools collection.
def cancel_consumers!
  # Select the still-active consumers first, then cancel them.
  active_consumers = march_hare_consumers.reject { |consumer| consumer.cancelled? }
  active_consumers.each { |consumer| consumer.cancel }

  # Only the pool object is needed here; its name is ignored.
  ::ActionSubscriber::ThreadPools.threadpools.each { |_name, pool| pool.shutdown }
end
march_hare_consumers()
# File lib/action_subscriber/march_hare/subscriber.rb, line 15
# Memoized list of consumers; created as an empty Array on first access and
# returned as the same object on every later call.
def march_hare_consumers
  return @march_hare_consumers if @march_hare_consumers

  @march_hare_consumers = []
end
setup_subscriptions!()
# File lib/action_subscriber/march_hare/subscriber.rb, line 19
# Builds one subscription entry (a Hash with :route and :queue) per route and
# appends it to subscriptions.
#
# Raises ::RuntimeError if subscriptions already contains entries, so the
# queues are only ever set up once.
def setup_subscriptions!
  unless subscriptions.empty?
    raise ::RuntimeError, "you cannot setup queues multiple times, this should only happen once at startup"
  end

  routes.each { |route| subscriptions << { :route => route, :queue => setup_queue(route) } }
end
start_subscribers!()
# File lib/action_subscriber/march_hare/subscriber.rb, line 29
# Starts a subscriber for every entry currently held in subscriptions, in order.
def start_subscribers!
  subscriptions.each { |entry| start_subscriber_for_subscription(entry) }
end

Private Instance Methods

_normalized_headers(metadata)
# File lib/action_subscriber/march_hare/subscriber.rb, line 83
# Copies the message headers into a new Hash whose values are all Strings.
#
# metadata - object responding to #headers.
#
# Returns an empty Hash when metadata.headers is nil/false; otherwise a Hash
# with the same keys and each value converted with #to_s.
def _normalized_headers(metadata)
  return {} unless metadata.headers

  normalized = {}
  metadata.headers.each { |key, raw_value| normalized[key] = raw_value.to_s }
  normalized
end
setup_queue(route)
# File lib/action_subscriber/march_hare/subscriber.rb, line 75
# Declares the queue described by +route+ on a newly created channel and binds
# it to the route's topic exchange.
#
# route - object providing #exchange, #queue, #durable and #routing_key.
#
# Returns the bound queue object.
def setup_queue(route)
  channel = ::ActionSubscriber::RabbitConnection.with_connection do |connection|
    connection.create_channel
  end
  topic_exchange = channel.topic(route.exchange)

  channel.queue(route.queue, :durable => route.durable).tap do |declared_queue|
    declared_queue.bind(topic_exchange, :routing_key => route.routing_key)
  end
end
start_subscriber_for_subscription(subscription)
# File lib/action_subscriber/march_hare/subscriber.rb, line 37
# Subscribes to the queue held in +subscription+ and records the resulting
# consumer in march_hare_consumers.
#
# subscription - Hash with :route and :queue entries.
#
# Each delivered message is instrumented, wrapped in a Middleware::Env and
# handed to run_env together with the route's threadpool.
#
# Returns the result of appending the new consumer to march_hare_consumers.
def start_subscriber_for_subscription(subscription)
  route = subscription[:route]
  queue = subscription[:queue]

  # Prefetch is only applied for routes that use acknowledgements.
  if route.acknowledgements?
    queue.channel.prefetch = route.prefetch
  end

  threadpool = ::ActionSubscriber::ThreadPools.threadpools.fetch(route.threadpool_name)
  subscribe_options = route.queue_subscription_options

  if ::ActionSubscriber.configuration.resubscribe_on_consumer_cancellation
    # On cancellation: forget the consumer, close its channel and hand the
    # subscription to safely_restart_subscriber.
    subscribe_options[:on_cancellation] = lambda do |cancelled_consumer|
      ::ActionSubscriber.logger.warn("Cancellation received for queue consumer: #{queue.name}, rebuilding subscription...")
      march_hare_consumers.delete(cancelled_consumer)
      queue.channel.close
      safely_restart_subscriber(subscription)
    end
  end

  new_consumer = queue.subscribe(subscribe_options) do |metadata, encoded_payload|
    ::ActiveSupport::Notifications.instrument(
      "received_event.action_subscriber",
      :payload_size => encoded_payload.bytesize,
      :queue => queue.name
    )

    # Delivery details passed to the middleware env alongside the raw payload.
    message_properties = {
      :action => route.action,
      :channel => queue.channel,
      :content_type => metadata.content_type,
      :delivery_tag => metadata.delivery_tag,
      :exchange => metadata.exchange,
      :headers => _normalized_headers(metadata),
      :message_id => metadata.message_id,
      :routing_key => metadata.routing_key,
      :queue => queue.name,
      :uses_acknowledgements => route.acknowledgements?,
    }
    message_env = ::ActionSubscriber::Middleware::Env.new(route.subscriber, encoded_payload, message_properties)
    run_env(message_env, threadpool)
  end

  march_hare_consumers.push(new_consumer)
end