class ActionSubscriber::Middleware::Env

Constants

ACK_INSTRUMENT_KEY
NACK_INSTRUMENT_KEY
REJECT_INSTRUMENT_KEY

Attributes

action[R]
channel[R]
content_type[R]
encoded_payload[R]
exchange[R]
headers[R]
message_id[R]
payload[RW]
queue[R]
routing_key[R]
subscriber[R]

Public Class Methods

new(subscriber, encoded_payload, properties) click to toggle source

@param subscriber [Class] the class that will handle this message
@param encoded_payload [String] the payload as it was received from RabbitMQ
@param properties [Hash] that must contain the following keys (as symbols)

:channel => RabbitMQ channel for doing acknowledgement
:content_type => String
:delivery_tag => String (the message identifier to send back to rabbitmq for acknowledgement)
:exchange => String
:headers => Hash[ String => String ]
:message_id => String
:routing_key => String
# File lib/action_subscriber/middleware/env.rb, line 34
# Builds the env for a single delivered message.
#
# @param subscriber [Class] the class that will handle this message
# @param encoded_payload [String] the payload exactly as it was received
# @param properties [Hash] symbol-keyed delivery metadata.
#   Required (KeyError when absent): :action, :content_type, :delivery_tag,
#   :exchange, :queue, :routing_key.
#   Optional: :channel (nil when absent), :headers (defaults to {}),
#   :message_id (a random 6-hex-digit id is generated when blank),
#   :uses_acknowledgements (defaults to false).
# @raise [KeyError] when a required key is missing from properties
def initialize(subscriber, encoded_payload, properties)
  # Values handed in directly by the caller.
  @subscriber = subscriber
  @encoded_payload = encoded_payload

  # Nothing has been sent back to the broker for this delivery tag yet.
  @has_been_acked = @has_been_nacked = @has_been_rejected = false

  # Delivery metadata. The lookups stay in their original relative order so
  # the first missing required key is still the one reported.
  @action = properties.fetch(:action)
  @channel = properties[:channel]
  @content_type = properties.fetch(:content_type)
  @delivery_tag = properties.fetch(:delivery_tag)
  @exchange = properties.fetch(:exchange)
  # Only a missing key falls back to {}; an explicit nil is stored as-is.
  @headers = properties.fetch(:headers) { {} }
  supplied_message_id = properties[:message_id].presence
  @message_id = supplied_message_id || ::SecureRandom.hex(3)
  @queue = properties.fetch(:queue)
  @routing_key = properties.fetch(:routing_key)
  @uses_acknowledgements = properties.fetch(:uses_acknowledgements) { false }
end

Public Instance Methods

acknowledge() click to toggle source
# File lib/action_subscriber/middleware/env.rb, line 52
# Acks this message's delivery tag on the channel (single message only).
#
# Repeated calls are no-ops: once the env has been marked as acked, the
# channel is not contacted again.
#
# @return [true]
# @raise [RuntimeError] when the env has no channel
def acknowledge
  raise ::RuntimeError, "you can't acknowledge messages under the polling API" unless @channel

  unless @has_been_acked
    # The flag is flipped before talking to the channel, so the env counts as
    # acked even if the ack call itself raises.
    @has_been_acked = true
    instrument_for(ACK_INSTRUMENT_KEY) do
      # Second argument `false`: do not ack multiple messages.
      @channel.ack(@delivery_tag, false)
    end
  end

  true
end
channel_open?() click to toggle source
# File lib/action_subscriber/middleware/env.rb, line 63
# Reports whether this env has a channel and that channel says it is open.
#
# @return [Boolean, Object] false when there is no channel; otherwise
#   whatever the channel's `open?` returns
def channel_open?
  @channel ? @channel.open? : false
end
nack() click to toggle source
# File lib/action_subscriber/middleware/env.rb, line 68
# Nacks this message's delivery tag on the channel, asking for a requeue.
#
# Repeated calls are no-ops once the env has been marked as nacked.
#
# @return [true]
# @raise [RuntimeError] when the env has no channel (the message text says
#   "acknowledge" here as well)
def nack
  raise ::RuntimeError, "you can't acknowledge messages under the polling API" unless @channel
  return true if @has_been_nacked

  # Mark first, then notify the channel.
  @has_been_nacked = true
  multiple = false # only this delivery tag
  requeue = true   # requeue flag passed to the channel
  instrument_for(NACK_INSTRUMENT_KEY) { @channel.nack(@delivery_tag, multiple, requeue) }

  true
end
reject() click to toggle source
# File lib/action_subscriber/middleware/env.rb, line 80
# Rejects this message's delivery tag on the channel, asking for a requeue.
#
# Repeated calls are no-ops once the env has been marked as rejected.
#
# @return [true]
# @raise [RuntimeError] when the env has no channel (the message text says
#   "acknowledge" here as well)
def reject
  raise ::RuntimeError, "you can't acknowledge messages under the polling API" unless @channel

  unless @has_been_rejected
    # Mark first, then notify the channel.
    @has_been_rejected = true
    instrument_for(REJECT_INSTRUMENT_KEY) do
      # Second argument `true`: requeue flag passed to the channel.
      @channel.reject(@delivery_tag, true)
    end
  end

  true
end
safe_acknowledge() click to toggle source
# File lib/action_subscriber/middleware/env.rb, line 91
# Acknowledges the message only when it is safe to do so: acknowledgements
# are in use, the channel is open, and the delivery tag has not already been
# acked, nacked or rejected.
#
# @return [true, nil] the result of #acknowledge, or nil when skipped
def safe_acknowledge
  return unless uses_acknowledgements?
  return unless channel_open?
  return if has_used_delivery_tag?

  acknowledge
end
safe_nack() click to toggle source
# File lib/action_subscriber/middleware/env.rb, line 95
# Nacks the message only when it is safe to do so: acknowledgements are in
# use, the channel is open, and the delivery tag has not already been acked,
# nacked or rejected.
#
# @return [true, nil] the result of #nack, or nil when skipped
def safe_nack
  return unless uses_acknowledgements?
  return unless channel_open?
  return if has_used_delivery_tag?

  nack
end
safe_reject() click to toggle source
# File lib/action_subscriber/middleware/env.rb, line 99
# Rejects the message only when it is safe to do so: acknowledgements are in
# use, the channel is open, and the delivery tag has not already been acked,
# nacked or rejected.
#
# @return [true, nil] the result of #reject, or nil when skipped
def safe_reject
  return unless uses_acknowledgements?
  return unless channel_open?
  return if has_used_delivery_tag?

  reject
end
to_h()
Alias for: to_hash
to_hash() click to toggle source
# File lib/action_subscriber/middleware/env.rb, line 103
# Summarises the env as a plain Hash.
#
# Only the action, content type, exchange, routing key and payload are
# included, in that key order.
#
# @return [Hash{Symbol => Object}]
def to_hash
  summary = {}
  summary[:action] = action
  summary[:content_type] = content_type
  summary[:exchange] = exchange
  summary[:routing_key] = routing_key
  summary[:payload] = payload
  summary
end
Also aliased as: to_h

Private Instance Methods

has_used_delivery_tag?() click to toggle source
# File lib/action_subscriber/middleware/env.rb, line 116
# Tells whether the delivery tag has already been consumed by an ack, a nack
# or a reject on this env.
#
# @return [Boolean] the first truthy flag in ack, nack, reject order, or
#   the reject flag when none is set
def has_used_delivery_tag?
  return @has_been_acked if @has_been_acked
  return @has_been_nacked if @has_been_nacked

  @has_been_rejected
end
instrument_for(instrumentation_key) { || ... } click to toggle source
# File lib/action_subscriber/middleware/env.rb, line 120
# Runs the given block inside an ActiveSupport::Notifications event.
#
# The event payload carries the subscriber (as a string), the routing key
# and the queue of this env.
#
# @param instrumentation_key [Object] the event name handed to `instrument`
# @yield the work to be measured; called with no arguments
# @return [Object] whatever `instrument` returns
def instrument_for(instrumentation_key)
  event_payload = {
    :subscriber => subscriber.to_s,
    :routing_key => routing_key,
    :queue => queue,
  }

  ::ActiveSupport::Notifications.instrument(instrumentation_key, event_payload) { yield }
end
uses_acknowledgements?() click to toggle source
# File lib/action_subscriber/middleware/env.rb, line 126
# Reader for the acknowledgement flag that the safe_* methods consult before
# touching the channel.
#
# @return [Object] the stored @uses_acknowledgements value (set in the
#   constructor from the :uses_acknowledgements property, false by default)
def uses_acknowledgements?
  @uses_acknowledgements
end