module ActionSubscriber::DSL

Public Instance Methods

_run_action_at_least_once_with_filters(env, action) click to toggle source
# File lib/action_subscriber/dsl.rb, line 176
    # Runs +action+ through the filter chain and acknowledges the message only
    # after the action has finished.
    #
    # When anything in the main body raises a StandardError, the message is
    # passed to ::ActionSubscriber::MessageRetry.redeliver_message_with_backoff,
    # the current delivery is acknowledged, and the original error is re-raised.
    #
    # In every case the +ensure+ section rejects the message if it was not
    # acknowledged; if neither acknowledge nor reject returned a truthy value,
    # a notice is written to $stdout and TERM is sent to the current process.
    #
    # @param env [Object] must respond to +acknowledge+ and +reject+
    # @param action [Symbol, String] forwarded to _run_action_with_filters
    # @return [Object] the value returned by +env.acknowledge+ on success
    # @raise [StandardError] whatever the action (or acknowledge) raised
    def _run_action_at_least_once_with_filters(env, action)
      acked = false
      rejected = false

      _run_action_with_filters(env, action)

      # Success path: acknowledge after the work is done.
      acked = env.acknowledge
    rescue
      # Failure path: schedule a redelivery, acknowledge this delivery, then
      # let the original exception continue to the caller.
      ::ActionSubscriber::MessageRetry.redeliver_message_with_backoff(env)
      acked = env.acknowledge

      raise
    ensure
      rejected = env.reject unless acked

      unless rejected || acked
        notice = <<-UNREJECTABLE
          CANNOT ACKNOWLEDGE OR REJECT THE MESSAGE

          This is a exceptional state for ActionSubscriber to enter and puts the current
          Process in the position of "I can't get new work from RabbitMQ, but also
          can't acknowledge or reject the work that I currently have" ... While rare
          this state can happen.

          Instead of continuing to try to process the message ActionSubscriber is
          sending a Kill signal to the current running process to gracefully shutdown
          so that the RabbitMQ server will purge any outstanding acknowledgements. If
          you are running a process monitoring tool (like Upstart) the Subscriber
          process will be restarted and be able to take on new work.

          ** Running a process monitoring tool like Upstart is recommended for this reason **
        UNREJECTABLE

        $stdout << notice
        Process.kill(:TERM, Process.pid)
      end
    end
_run_action_at_most_once_with_filters(env, action) click to toggle source
# File lib/action_subscriber/dsl.rb, line 145
    # Acknowledges the message first and then runs +action+ through the filter
    # chain, so an error raised by the action does not trigger a reject once
    # the acknowledgement has succeeded.
    #
    # The +ensure+ section rejects the message if the acknowledgement did not
    # return a truthy value; if the reject is not truthy either, a notice is
    # written to $stdout and TERM is sent to the current process.
    #
    # @param env [Object] must respond to +acknowledge+ and +reject+
    # @param action [Symbol, String] forwarded to _run_action_with_filters
    # @return [Object] whatever _run_action_with_filters returns
    def _run_action_at_most_once_with_filters(env, action)
      rejected = false
      acked = false

      # Acknowledge up front; the flag stays false if this call raises.
      acked = env.acknowledge

      _run_action_with_filters(env, action)
    ensure
      rejected = env.reject unless acked

      unless rejected || acked
        notice = <<-UNREJECTABLE
          CANNOT ACKNOWLEDGE OR REJECT THE MESSAGE

          This is a exceptional state for ActionSubscriber to enter and puts the current
          Process in the position of "I can't get new work from RabbitMQ, but also
          can't acknowledge or reject the work that I currently have" ... While rare
          this state can happen.

          Instead of continuing to try to process the message ActionSubscriber is
          sending a Kill signal to the current running process to gracefully shutdown
          so that the RabbitMQ server will purge any outstanding acknowledgements. If
          you are running a process monitoring tool (like Upstart) the Subscriber
          process will be restarted and be able to take on new work.

          ** Running a process monitoring tool like Upstart is recommended for this reason **
        UNREJECTABLE

        $stdout << notice
        Process.kill(:TERM, Process.pid)
      end
    end
_run_action_with_filters(env, action) click to toggle source
# File lib/action_subscriber/dsl.rb, line 131
# Builds a subscriber from +env+ and invokes +action+ on it, wrapped by every
# around filter whose +matches(action)+ is truthy.
#
# The chain is assembled from the last registered filter to the first, so the
# first matching filter ends up outermost. Each filter callback is invoked on
# the subscriber with the next step of the chain passed as its block.
#
# @param env [Object] passed to +self.new+
# @param action [Symbol, String] public method to call on the subscriber
# @return [Object] the value returned by the outermost step of the chain
def _run_action_with_filters(env, action)
  subscriber = self.new(env)
  chain = proc { subscriber.public_send(action) }

  around_filters.reverse.each do |filter|
    next unless filter.matches(action)

    # Capture the current chain in a per-iteration local before wrapping it.
    inner = chain
    chain = proc { subscriber.send(filter.callback_method, &inner) }
  end

  chain.call
end
acknowledge_messages?() click to toggle source
# File lib/action_subscriber/dsl.rb, line 54
# Reports whether message acknowledgement is switched on.
#
# @return [Boolean] true when @_acknowledge_messages is truthy; false when it
#   is nil or false (including when it was never assigned)
def acknowledge_messages?
  @_acknowledge_messages ? true : false
end
around_filter(callback_method, options = nil) click to toggle source
# File lib/action_subscriber/dsl.rb, line 58
# Registers an around filter built from +callback_method+ and +options+.
#
# The new Filter is handed to conditionally_add_filter!, which decides
# whether it is actually stored.
#
# @param callback_method [Symbol, String] passed to Filter.new
# @param options [Object, nil] passed to Filter.new unchanged
# @return [Array] the current list of around filters
def around_filter(callback_method, options = nil)
  conditionally_add_filter!(Filter.new(callback_method, options))
  around_filters
end
around_filters() click to toggle source
# File lib/action_subscriber/dsl.rb, line 68
# Returns the list of registered around filters, creating an empty list the
# first time it is requested (or whenever the stored value is nil/false).
#
# @return [Array] the memoized filter list; the same object on each call
def around_filters
  return @_around_filters if @_around_filters

  @_around_filters = []
end
at_least_once!() click to toggle source
# File lib/action_subscriber/dsl.rb, line 36
# Switches on message acknowledgement and marks the at-least-once mode by
# setting both @_acknowledge_messages and @_at_least_once to true.
#
# @return [true]
def at_least_once!
  @_acknowledge_messages = @_at_least_once = true
end
at_least_once?() click to toggle source
# File lib/action_subscriber/dsl.rb, line 41
# Reports whether the at-least-once flag has been set.
#
# @return [Boolean] true when @_at_least_once is truthy, false otherwise
def at_least_once?
  @_at_least_once ? true : false
end
at_most_once!() click to toggle source
# File lib/action_subscriber/dsl.rb, line 45
# Switches on message acknowledgement and marks the at-most-once mode by
# setting both @_acknowledge_messages and @_at_most_once to true.
#
# @return [true]
def at_most_once!
  @_acknowledge_messages = @_at_most_once = true
end
at_most_once?() click to toggle source
# File lib/action_subscriber/dsl.rb, line 50
# Reports whether the at-most-once flag has been set.
#
# @return [Boolean] true when @_at_most_once is truthy, false otherwise
def at_most_once?
  @_at_most_once ? true : false
end
conditionally_add_filter!(filter) click to toggle source
# File lib/action_subscriber/dsl.rb, line 64
# Appends +filter+ to the around filter list unless a filter with the same
# +callback_method+ is already present. Only the callback name is compared.
#
# @param filter [#callback_method] the filter to register
# @return [Array, nil] the filter list when the filter was appended, nil when
#   it was skipped as a duplicate
def conditionally_add_filter!(filter)
  already_registered = around_filters.any? do |existing|
    existing.callback_method == filter.callback_method
  end
  return if already_registered

  around_filters << filter
end
exchange(*names)
Alias for: exchange_names
exchange_names(*names) click to toggle source

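Explicitly add one or more exchange names for this subscriber. Returns the accumulated names, or the configured default exchange when none have been set.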

# File lib/action_subscriber/dsl.rb, line 74
# Adds the given exchange names to this subscriber and returns the
# accumulated list.
#
# Names may be passed individually or as (nested) arrays; each one is
# converted with +to_s+. Called without arguments it acts as a reader.
#
# @param names [Array<#to_s, Array, nil>] exchange names to add; nil entries
#   are ignored
# @return [Array<String>] the unique names added so far, or a one-element
#   array holding ::ActionSubscriber.config.default_exchange when no name
#   has been added
def exchange_names(*names)
  @_exchange_names ||= []
  # Drop nils *before* stringifying: nil.to_s is "", which the compact on the
  # way out could never remove, so a nil used to become an empty exchange name
  # and also suppressed the default-exchange fallback.
  @_exchange_names += names.flatten.compact.map(&:to_s)

  return [::ActionSubscriber.config.default_exchange] if @_exchange_names.empty?

  @_exchange_names.compact.uniq
end
Also aliased as: exchange
manual_acknowledgement!() click to toggle source
# File lib/action_subscriber/dsl.rb, line 86
# Switches on message acknowledgement and marks the manual mode by setting
# both @_acknowledge_messages and the manual flag to true.
#
# NOTE: the flag's instance variable is spelled @_manual_acknowedgement (sic);
# manual_acknowledgement? reads the same spelling, so it must not be
# "corrected" in only one place.
#
# @return [true]
def manual_acknowledgement!
  @_acknowledge_messages = @_manual_acknowedgement = true
end
manual_acknowledgement?() click to toggle source
# File lib/action_subscriber/dsl.rb, line 91
# Reports whether the manual-acknowledgement flag has been set.
#
# The flag lives in @_manual_acknowedgement (sic), the same spelling written
# by manual_acknowledgement!.
#
# @return [Boolean] true when the flag is truthy, false otherwise
def manual_acknowledgement?
  @_manual_acknowedgement ? true : false
end
no_acknowledgement!() click to toggle source
# File lib/action_subscriber/dsl.rb, line 95
# Switches message acknowledgement off by setting @_acknowledge_messages to
# false, so acknowledge_messages? reports false afterwards.
#
# NOTE(review): only @_acknowledge_messages is touched here; the mode flags
# set by at_least_once!, at_most_once! and manual_acknowledgement! are left
# as they were — confirm whether that is intended.
#
# @return [false]
def no_acknowledgement!
  @_acknowledge_messages = false
end
publisher(name = nil)
Alias for: remote_application_name
queue_for(method, queue_name) click to toggle source

Explicitly set the name of a queue for the given method route

Ex.

queue_for :created, "derp.derp"
queue_for :updated, "foo.bar"
# File lib/action_subscriber/dsl.rb, line 105
# Records +queue_name+ as the queue for the given method route, creating the
# backing hash on first use. A later call for the same +method+ overwrites
# the earlier entry.
#
# @param method [Object] key identifying the route (e.g. :created)
# @param queue_name [Object] queue name to store for that route
# @return [Object] +queue_name+
def queue_for(method, queue_name)
  mapping = (@_queue_names ||= {})
  mapping[method] = queue_name
end
queue_names() click to toggle source
# File lib/action_subscriber/dsl.rb, line 110
# Returns the hash of explicitly configured queue names, creating an empty
# hash when none is stored yet.
#
# @return [Hash] the memoized mapping; the same object on each call
def queue_names
  @_queue_names = {} unless @_queue_names
  @_queue_names
end
remote_application_name(name = nil) click to toggle source
# File lib/action_subscriber/dsl.rb, line 114
# Combined reader/writer for the remote application name.
#
# With a truthy +name+ the value is stored and returned; with nil or false
# the stored value is returned unchanged.
#
# @param name [Object, nil] new name, or nil to just read
# @return [Object, nil] the stored name (nil if none was ever set)
def remote_application_name(name = nil)
  return @_remote_application_name unless name

  @_remote_application_name = name
end
Also aliased as: publisher
routing_key_for(method, routing_key_name) click to toggle source

Explicitly set the whole routing key to use for a given method route.

# File lib/action_subscriber/dsl.rb, line 122
# Records +routing_key_name+ as the full routing key for the given method
# route, creating the backing hash on first use. A later call for the same
# +method+ overwrites the earlier entry.
#
# @param method [Object] key identifying the route
# @param routing_key_name [Object] routing key to store for that route
# @return [Object] +routing_key_name+
def routing_key_for(method, routing_key_name)
  mapping = (@_routing_key_names ||= {})
  mapping[method] = routing_key_name
end
routing_key_names() click to toggle source
# File lib/action_subscriber/dsl.rb, line 127
# Returns the hash of explicitly configured routing keys, creating an empty
# hash when none is stored yet.
#
# @return [Hash] the memoized mapping; the same object on each call
def routing_key_names
  @_routing_key_names = {} unless @_routing_key_names
  @_routing_key_names
end
run_action_with_filters(env, action) click to toggle source
# File lib/action_subscriber/dsl.rb, line 213
# Entry point for running +action+: picks the runner that matches the
# configured delivery mode. at_least_once? is checked first, then
# at_most_once?; when neither is set the plain filter chain runs.
#
# @param env [Object] forwarded to the selected runner
# @param action [Symbol, String] forwarded to the selected runner
# @return [Object] whatever the selected runner returns
def run_action_with_filters(env, action)
  return _run_action_at_least_once_with_filters(env, action) if at_least_once?
  return _run_action_at_most_once_with_filters(env, action) if at_most_once?

  _run_action_with_filters(env, action)
end