class LogStash::Inputs::Burrow

Constants

Schedule_types

Public Instance Methods

register() click to toggle source
# File lib/logstash/inputs/burrow.rb, line 35
def register
  @host = Socket.gethostname.force_encoding(Encoding::UTF_8)

  @logger.info("Registering burrow Input", :type => @type, :schedule => @schedule, :timeout => @timeout)

  if @api_version != "v3"
    raise LogStash::ConfigurationError, "at the moment only Burrow API version v3 is supported."
  end

  if @codec.class.config_name != "json"
    raise LogStash::ConfigurationError, "this plugin needs codec to be json."
  end

  setup_request!
end
run(queue) click to toggle source
# File lib/logstash/inputs/burrow.rb, line 126
def run(queue)
  setup_schedule(queue)
end
run_once(queue) click to toggle source
# File lib/logstash/inputs/burrow.rb, line 146
def run_once(queue)
  request(queue, @request)
end
setup_schedule(queue) click to toggle source
# File lib/logstash/inputs/burrow.rb, line 130
def setup_schedule(queue)
  #schedule hash must contain exactly one of the allowed keys
  msg_invalid_schedule = "Invalid config. schedule hash must contain " +
      "exactly one of the following keys - cron, at, every or in"
  raise Logstash::ConfigurationError, msg_invalid_schedule if @schedule.keys.length != 1
  schedule_type = @schedule.keys.first
  schedule_value = @schedule[schedule_type]
  raise LogStash::ConfigurationError, msg_invalid_schedule unless Schedule_types.include?(schedule_type)

  @scheduler = Rufus::Scheduler.new(:max_work_threads => 1)
  #as of v3.0.9, :first_in => :now doesn't work. Use the following workaround instead
  opts = schedule_type == "every" ? {:first_in => 0.01} : {}
  @scheduler.send(schedule_type, schedule_value, opts) {run_once(queue)}
  @scheduler.join
end
stop() click to toggle source
# File lib/logstash/inputs/burrow.rb, line 51
def stop
  Stud.stop!(@interval_thread) if @interval_thread
  @scheduler.stop if @scheduler
end

Private Instance Methods

decode_and_flush(codec, body, &yielder) click to toggle source
# File lib/logstash/inputs/burrow.rb, line 218
def decode_and_flush(codec, body, &yielder)
  codec.decode(body, &yielder)
  codec.flush(&yielder)
end
handle_decoded_event(queue, request, response, event, _execution_time) click to toggle source
# File lib/logstash/inputs/burrow.rb, line 225
def handle_decoded_event(queue, request, response, event, _execution_time)
  decorate(event)
  queue << event
rescue StandardError, java.lang.Exception => e
  @logger.error? && @logger.error("Error eventifying response!",
                                  :exception => e,
                                  :exception_message => e.message,
                                  :client_config => request,
                                  :response => response
  )
end
handle_failure(queue, request, exception, execution_time) click to toggle source

Beware, on old versions of manticore some uncommon failures are not handled

# File lib/logstash/inputs/burrow.rb, line 240
def handle_failure(queue, request, exception, execution_time)
  event = LogStash::Event.new

  event.tag("_http_request_failure")

  # This is also in the metadata, but we send it anyone because we want this
  # persisted by default, whereas metadata isn't. People don't like mysterious errors
  event.set("http_request_failure", {
      "request" => structure_request(request),
      "error" => exception.to_s,
      "backtrace" => exception.backtrace,
      "runtime_seconds" => execution_time
  })

  queue << event
rescue StandardError, java.lang.Exception => e
  @logger.error? && @logger.error("Cannot read URL or send the error as an event!",
                                  :exception => e,
                                  :exception_message => e.message,
                                  :exception_backtrace => e.backtrace)

  # If we are running in debug mode we can display more information about the
  # specific request which could give more details about the connection.
  @logger.debug? && @logger.debug("Cannot read URL or send the error as an event!",
                                  :exception => e,
                                  :exception_message => e.message,
                                  :exception_backtrace => e.backtrace,
                                  :client_config => request)
end
handle_success(queue, request, response, execution_time) click to toggle source
# File lib/logstash/inputs/burrow.rb, line 201
def handle_success(queue, request, response, execution_time)
  body = response.body
  # If there is a usable response. HEAD request are `nil` and empty get
  # responses come up as "" which will cause the codec to not yield anything
  if body && body.size > 0
    decode_and_flush(@codec, body) do |decoded|
      event = decoded
      handle_decoded_event(queue, request, response, event, execution_time)
    end
  else
    event = ::LogStash::Event.new
    handle_decoded_event(queue, request, response, event, execution_time)
  end
end
normalize_request(client_spec) click to toggle source
# File lib/logstash/inputs/burrow.rb, line 62
def normalize_request(client_spec)
  if client_spec.is_a?(String)
    res = [client_spec + ('/' unless client_spec.end_with?('/')) + @api_version << "/kafka"]
  elsif client_spec.is_a?(Hash)
    # The client will expect keys / values
    spec = Hash[client_spec.clone.map {|k, v| [k.to_sym, v]}] # symbolize keys

    # method and url aren't really part of the options, so we pull them out
    spec.delete(:method)
    url = spec.delete(:url)

    raise LogStash::ConfigurationError, "Invalid URL #{url}" unless URI::DEFAULT_PARSER.regexp[:ABS_URI].match(url)

    # Manticore wants auth options that are like {:auth => {:user => u, :pass => p}}
    # We allow that because earlier versions of this plugin documented that as the main way to
    # to do things, but now prefer top level "user", and "password" options
    # So, if the top level user/password are defined they are moved to the :auth key for manticore
    # if those attributes are already in :auth they still need to be transformed to symbols
    auth = spec[:auth]
    user = spec.delete(:user) || (auth && auth["user"])
    password = spec.delete(:password) || (auth && auth["password"])

    if user.nil? ^ password.nil?
      raise LogStash::ConfigurationError, "'user' and 'password' must both be specified for input HTTP poller!"
    end

    if user && password
      spec[:auth] = {
          user: user,
          pass: password,
          eager: true
      }
    end
    res = [url + ('/' unless url.end_with?('/')) + @api_version << "/kafka", spec]
  else
    raise LogStash::ConfigurationError, "Invalid URL or request spec: '#{client_spec}', expected a String or Hash!"
  end

  validate_request!(res)
  res
end
request(queue, request) click to toggle source
# File lib/logstash/inputs/burrow.rb, line 153
def request(queue, request)
  @logger.debug? && @logger.debug?("Fetching URL", :client_config => request)
  started = Time.now

  url, spec = request
  client.get(url, spec).
      on_success do |cluster_response|
    body = cluster_response.body
    if body && body.size > 0
      @codec.decode(body) do |clusters|
        cluster_list = clusters.get('clusters')
        @logger.debug? && @logger.debug?("found clusters", :cluster_list => cluster_list)
        cluster_list.each {|cluster|
          consumer_url = url + "/" << cluster + "/consumer"
          client.get(consumer_url, spec).
              on_success do |consumers_response|
            body = consumers_response.body
            if body && body.size > 0
              @codec.decode(body) do |consumers|
                consumer_list = consumers.get('consumers')
                @logger.debug? && @logger.debug?("found consumers", :consumer_list => consumer_list)
                consumer_list.each {|consumer|
                  lag_url = url + "/" + cluster + "/consumer" + "/" + consumer + "/lag"
                  client.get(lag_url, spec).
                      on_success do |consumer_lag_response|
                    handle_success(queue, [lag_url, spec], consumer_lag_response, Time.now - started)
                  end.
                      on_failure do |exception|
                    handle_failure(queue, [lag_url, spec], exception, Time.now - started)
                  end.call
                } unless consumer_list.nil?
              end
            end
          end.
              on_failure do |exception|
            handle_failure(queue, [consumer_url, spec], exception, Time.now - started)
          end.call
        } unless cluster_list.nil?
      end
    end
  end.
      on_failure do |exception|
    handle_failure(queue, request, exception, Time.now - started)
  end.call
end
setup_request!() click to toggle source
# File lib/logstash/inputs/burrow.rb, line 58
def setup_request!
  @request = normalize_request(client_config)
end
structure_request(request) click to toggle source

Turn [method, url, spec] request into a hash for friendlier logging / ES indexing

# File lib/logstash/inputs/burrow.rb, line 273
def structure_request(request)
  url, spec = request
  # Flatten everything into the 'spec' hash, also stringify any keys to normalize
  Hash[(spec || {}).merge({
                              "url" => url,
                          }).map {|k, v| [k.to_s, v]}]
end
validate_request!(request) click to toggle source
# File lib/logstash/inputs/burrow.rb, line 106
def validate_request!(request)
  url, spec = request

  raise LogStash::ConfigurationError, "Invalid URL #{url}" unless URI::DEFAULT_PARSER.regexp[:ABS_URI].match(url)

  raise LogStash::ConfigurationError, "No URL provided for request! #{url}" unless url
  if spec && spec[:auth]
    unless spec[:auth][:user]
      raise LogStash::ConfigurationError, "Auth was specified, but 'user' was not!"
    end
    unless spec[:auth][:pass]
      raise LogStash::ConfigurationError, "Auth was specified, but 'password' was not!"
    end
  end

  request
end