class LogStash::Outputs::Amqp

Constants

MQTYPES

Public Class Methods

new(url, config={}, &block) click to toggle source
Calls superclass method LogStash::Outputs::Base::new
# File lib/logstash/outputs/amqp.rb, line 11
def initialize(url, config={}, &block)
  super

  @mq = nil
  @bulk_prefix = nil

  # Handle path /<vhost>/<type>/<name> or /<type>/<name>
  # vhost allowed to contain slashes
  if @url.path =~ %r{^/((.*)/)?([^/]+)/([^/]+)}
    unused, @vhost, @mqtype, @name = $~.captures
  else
    raise "amqp urls must have a path of /<type>/name or /vhost/<type>/name where <type> is #{MQTYPES.join(", ")}"
  end

  if !MQTYPES.include?(@mqtype)
    raise "Invalid type '#{@mqtype}' must be one of #{MQTYPES.join(", ")}"
  end
end

Public Instance Methods

receive(event) click to toggle source
# File lib/logstash/outputs/amqp.rb, line 64
def receive(event)
  @logger.debug(["Sending event", { :url => @url, :event => event }])
  if @bulk_prefix
    @target.publish(@bulk_prefix + event.to_json + "\n")
  else
    @target.publish(event.to_json)
  end
end
receive_raw(raw) click to toggle source

This is used by the ElasticSearch AMQP/River output.

# File lib/logstash/outputs/amqp.rb, line 75
def receive_raw(raw)
  if @target == nil
    raise "had trouble registering AMQP URL #{@url.to_s}, @target is nil"
  end

  @target.publish(raw)
end
register() click to toggle source
# File lib/logstash/outputs/amqp.rb, line 31
def register
  @logger.info("Registering output #{@url}")
  query_args = @url.query ? CGI.parse(@url.query) : {}
  amqpsettings = {
    :vhost => (@vhost or "/"),
    :host => @url.host,
    :port => (@url.port or 5672),
  }
  amqpsettings[:user] = @url.user if @url.user
  amqpsettings[:pass] = @url.password if @url.password
  amqpsettings[:logging] = query_args.include? "debug"
  @logger.debug("Connecting with AMQP settings #{amqpsettings.inspect} to set up #{@mqtype.inspect} exchange #{@name.inspect}")
  @amqp = AMQP.connect(amqpsettings)
  @mq = MQ.new(@amqp)
  @target = nil

  if @urlopts.include? "es_index" and @urlopts.include? "es_type"
    @bulk_prefix = { "index" => { "_index" => @urlopts["es_index"], "_type" => @urlopts["es_type"] } }.to_json + "\n"
    @logger.debug "Preparing ElasticSearch bulk API header for injection: #{@bulk_prefix.inspect}"
  end

  @durable = @urlopts["durable"] ? true : false
  case @mqtype
    when "fanout"
      @target = @mq.fanout(@name, :durable => @durable)
    when "direct"
      @target = @mq.direct(@name, :durable => @durable)
    when "topic"
      @target = @mq.topic(@name, :durable => @durable)
  end # case @mqtype
end