class ActionSubscriber::Middleware::Env
Constants
- ACK_INSTRUMENT_KEY
- NACK_INSTRUMENT_KEY
- REJECT_INSTRUMENT_KEY
Attributes
action[R]
channel[R]
content_type[R]
encoded_payload[R]
exchange[R]
headers[R]
message_id[R]
payload[RW]
queue[R]
routing_key[R]
subscriber[R]
Public Class Methods
new(subscriber, encoded_payload, properties)
click to toggle source
@param subscriber [Class] the class that will handle this message @param encoded_payload
[String] the payload as it was received from RabbitMQ @param properties [Hash] that must contain the following keys (as symbols)
:channel => RabbitMQ channel for doing acknowledgement :content_type => String :delivery_tag => String (the message identifier to send back to rabbitmq for acknowledgement) :exchange => String :headers => Hash[ String => String ] :message_id => String :routing_key => String
# File lib/action_subscriber/middleware/env.rb, line 34 def initialize(subscriber, encoded_payload, properties) @action = properties.fetch(:action) @channel = properties[:channel] @content_type = properties.fetch(:content_type) @delivery_tag = properties.fetch(:delivery_tag) @encoded_payload = encoded_payload @exchange = properties.fetch(:exchange) @has_been_acked = false @has_been_nacked = false @has_been_rejected = false @headers = properties.fetch(:headers, {}) @message_id = properties[:message_id].presence || ::SecureRandom.hex(3) @queue = properties.fetch(:queue) @routing_key = properties.fetch(:routing_key) @subscriber = subscriber @uses_acknowledgements = properties.fetch(:uses_acknowledgements, false) end
Public Instance Methods
acknowledge()
click to toggle source
# File lib/action_subscriber/middleware/env.rb, line 52 def acknowledge fail ::RuntimeError, "you can't acknowledge messages under the polling API" unless @channel return true if @has_been_acked acknowledge_multiple_messages = false @has_been_acked = true instrument_for(ACK_INSTRUMENT_KEY) do @channel.ack(@delivery_tag, acknowledge_multiple_messages) end true end
channel_open?()
click to toggle source
# File lib/action_subscriber/middleware/env.rb, line 63 def channel_open? return false unless @channel @channel.open? end
nack()
click to toggle source
# File lib/action_subscriber/middleware/env.rb, line 68 def nack fail ::RuntimeError, "you can't acknowledge messages under the polling API" unless @channel return true if @has_been_nacked nack_multiple_messages = false requeue_message = true @has_been_nacked = true instrument_for(NACK_INSTRUMENT_KEY) do @channel.nack(@delivery_tag, nack_multiple_messages, requeue_message) end true end
reject()
click to toggle source
# File lib/action_subscriber/middleware/env.rb, line 80 def reject fail ::RuntimeError, "you can't acknowledge messages under the polling API" unless @channel return true if @has_been_rejected requeue_message = true @has_been_rejected = true instrument_for(REJECT_INSTRUMENT_KEY) do @channel.reject(@delivery_tag, requeue_message) end true end
safe_acknowledge()
click to toggle source
# File lib/action_subscriber/middleware/env.rb, line 91 def safe_acknowledge acknowledge if uses_acknowledgements? && channel_open? && !has_used_delivery_tag? end
safe_nack()
click to toggle source
# File lib/action_subscriber/middleware/env.rb, line 95 def safe_nack nack if uses_acknowledgements? && channel_open? && !has_used_delivery_tag? end
safe_reject()
click to toggle source
# File lib/action_subscriber/middleware/env.rb, line 99 def safe_reject reject if uses_acknowledgements? && channel_open? && !has_used_delivery_tag? end
to_hash()
click to toggle source
# File lib/action_subscriber/middleware/env.rb, line 103 def to_hash { :action => action, :content_type => content_type, :exchange => exchange, :routing_key => routing_key, :payload => payload } end
Also aliased as: to_h
Private Instance Methods
has_used_delivery_tag?()
click to toggle source
# File lib/action_subscriber/middleware/env.rb, line 116 def has_used_delivery_tag? @has_been_acked || @has_been_nacked || @has_been_rejected end
instrument_for(instrumentation_key) { || ... }
click to toggle source
# File lib/action_subscriber/middleware/env.rb, line 120 def instrument_for(instrumentation_key) ::ActiveSupport::Notifications.instrument(instrumentation_key, :subscriber => subscriber.to_s, :routing_key => routing_key, :queue => queue) do yield end end
uses_acknowledgements?()
click to toggle source
# File lib/action_subscriber/middleware/env.rb, line 126 def uses_acknowledgements? @uses_acknowledgements end