class LogStash::Inputs::RabbitMQ
Pull events from a www.rabbitmq.com/[RabbitMQ] queue.
The default settings will create an entirely transient queue and listen for all messages by default. If you need durability or any other advanced settings, please set the appropriate options
This plugin uses the rubymarchhare.info/[March Hare] library for interacting with the RabbitMQ
server. Most configuration options map directly to standard RabbitMQ
and AMQP concepts. The www.rabbitmq.com/amqp-0-9-1-reference.html[AMQP 0-9-1 reference guide] and other parts of the RabbitMQ
documentation are useful for deeper understanding.
The properties of messages received will be stored in the `[@metadata]` field if the `@metadata_enabled` setting is checked. Note that storing metadata may degrade performance. The following properties may be available (in most cases dependent on whether they were set by the sender):
-
app-id
-
cluster-id
-
consumer-tag
-
content-encoding
-
content-type
-
correlation-id
-
delivery-mode
-
exchange
-
expiration
-
message-id
-
priority
-
redeliver
-
reply-to
-
routing-key
-
timestamp
-
type
-
user-id
For example, to get the RabbitMQ
message's timestamp property into the Logstash event's `@timestamp` field, use the date filter to parse the `[@metadata][timestamp]` field:
- source,ruby
-
filter {
if [@metadata][rabbitmq_properties][timestamp] { date { match => ["[@metadata][rabbitmq_properties][timestamp]", "UNIX"] } }
}
Additionally, any message headers will be saved in the `[@metadata]` field.
Constants
- INTERNAL_QUEUE_POISON
- MESSAGE_PROPERTIES
The properties to extract from each message and store in a @metadata field.
Technically the exchange, redeliver, and routing-key properties belong to the envelope and not the message but we ignore that distinction here. However, we extract the headers separately via
get_headers
even though the header table technically is a message property.Freezing all strings so that code modifying the event's @metadata field can't touch them.
If updating this list, remember to update the documentation above too.
Public Instance Methods
# File lib/logstash/inputs/rabbitmq.rb, line 194 def bind_exchange! if @exchange if @exchange_type # Only declare the exchange if @exchange_type is set! @logger.info? && @logger.info("Declaring exchange '#{@exchange}' with type #{@exchange_type}") @hare_info.exchange = declare_exchange!(@hare_info.channel, @exchange, @exchange_type, @durable) end @hare_info.queue.bind(@exchange, :routing_key => @key) end end
# File lib/logstash/inputs/rabbitmq.rb, line 217 def consume! @consumer = @hare_info.queue.build_consumer(:on_cancellation => Proc.new { on_cancellation }) do |metadata, data| @internal_queue.put [metadata, data] end begin @hare_info.queue.subscribe_with(@consumer, :manual_ack => @ack) rescue MarchHare::Exception => e @logger.warn("Could not subscribe to queue! Will retry in #{@subscription_retry_interval_seconds} seconds", :queue => @queue) sleep @subscription_retry_interval_seconds retry end internal_queue_consume! end
# File lib/logstash/inputs/rabbitmq.rb, line 208 def declare_queue @hare_info.channel.queue(@queue, :durable => @durable, :auto_delete => @auto_delete, :exclusive => @exclusive, :passive => @passive, :arguments => @arguments) end
# File lib/logstash/inputs/rabbitmq.rb, line 204 def declare_queue! @hare_info.queue = declare_queue() end
# File lib/logstash/inputs/rabbitmq.rb, line 234 def internal_queue_consume! i=0 last_delivery_tag=nil while true payload = @internal_queue.poll(10, TimeUnit::MILLISECONDS) if !payload # Nothing in the queue if last_delivery_tag # And we have unacked stuff @hare_info.channel.ack(last_delivery_tag, true) if @ack i=0 last_delivery_tag = nil end next end break if payload == INTERNAL_QUEUE_POISON metadata, data = payload @codec.decode(data) do |event| decorate(event) if @metadata_enabled event.set("[@metadata][rabbitmq_headers]", get_headers(metadata)) event.set("[@metadata][rabbitmq_properties]", get_properties(metadata)) end @output_queue << event if event end i += 1 if i >= @prefetch_count @hare_info.channel.ack(metadata.delivery_tag, true) if @ack i = 0 last_delivery_tag = nil else last_delivery_tag = metadata.delivery_tag end end end
# File lib/logstash/inputs/rabbitmq.rb, line 287 def on_cancellation if !stop? # If this isn't already part of a regular stop @logger.info("Received basic.cancel from #{rabbitmq_settings[:host]}, shutting down.") stop end end
# File lib/logstash/inputs/rabbitmq.rb, line 170 def register @internal_queue = java.util.concurrent.ArrayBlockingQueue.new(@prefetch_count*2) end
# File lib/logstash/inputs/rabbitmq.rb, line 174 def run(output_queue) setup! @output_queue = output_queue consume! end
# File lib/logstash/inputs/rabbitmq.rb, line 180 def setup! connect! declare_queue! bind_exchange! @hare_info.channel.prefetch = @prefetch_count rescue => e @logger.warn("Error while setting up connection for rabbitmq input! Will retry.", :message => e.message, :class => e.class.name, :location => e.backtrace.first) sleep_for_retry retry end
# File lib/logstash/inputs/rabbitmq.rb, line 278 def shutdown_consumer return unless @consumer @hare_info.channel.basic_cancel(@consumer.consumer_tag) until @consumer.terminated? @logger.info("Waiting for rabbitmq consumer to terminate before stopping!", :params => self.params) sleep 1 end end
# File lib/logstash/inputs/rabbitmq.rb, line 272 def stop @internal_queue.put(INTERNAL_QUEUE_POISON) shutdown_consumer close_connection end
Private Instance Methods
# File lib/logstash/inputs/rabbitmq.rb, line 295 def get_headers(metadata) metadata.headers || {} end
# File lib/logstash/inputs/rabbitmq.rb, line 300 def get_properties(metadata) MESSAGE_PROPERTIES.reduce({}) do |acc, name| # The method names obviously can't contain hyphens. value = metadata.send(name.gsub("-", "_")) if value # The AMQP 0.9.1 timestamp field only has second resolution # so storing milliseconds serves no purpose and might give # the incorrect impression of a higher resolution. acc[name] = name != "timestamp" ? value : value.getTime / 1000 end acc end end