class LogStash::Filters::KV

This filter helps automatically parse messages (or specific event fields) which are of the `foo=bar` variety.

For example, if you have a log message which contains `ip=1.2.3.4 error=REFUSED`, you can parse those automatically by configuring:

source,ruby

filter {

kv { }

}

The above will result in a message of `ip=1.2.3.4 error=REFUSED` having the fields `ip: 1.2.3.4` and `error: REFUSED`.

This is great for postfix, iptables, and other types of logs that tend towards `key=value` syntax.

You can configure any arbitrary strings to split your data on, in case your data is not structured using `=` signs and whitespace. For example, this filter can also be used to parse query parameters like `foo=bar&baz=fizz` by setting the `field_split` parameter to `&`.

Constants

EMPTY_STRING
TRANSFORM_CAPITALIZE_KEY
TRANSFORM_LOWERCASE_KEY

Constants used for transform check

TRANSFORM_UPPERCASE_KEY

Public Instance Methods

close() click to toggle source
# File lib/logstash/filters/kv.rb, line 466
# Intentionally empty: this method performs no work when called.
# NOTE(review): presumably the plugin shutdown hook invoked by the pipeline —
# confirm against the filter base class.
def close
end
filter(event) click to toggle source
# File lib/logstash/filters/kv.rb, line 434
# Extracts key/value pairs from the configured source field of +event+ and
# writes them back onto the event.
#
# The pairs are stored under @target when it is set; otherwise every pair
# becomes its own field on the event. If nothing was extracted (even after
# merging in @default_keys) the event is left untouched. A parse timeout tags
# the event with @tag_on_timeout; any other error is logged and the event is
# tagged with each entry of @tag_on_failure.
#
# @param event [LogStash::Event] the event to read from and modify in place
def filter(event)
  value = event.get(@source)

  # Skip the Timeout wrapper (and the closure it needs) when no timeout is configured.
  if @timeout_seconds > 0.0
    parsed = Timeout.timeout(@timeout_seconds, TimeoutException) { parse_value(value, event) }
  else
    parsed = parse_value(value, event)
  end

  # Parsed pairs win over defaults; defaults only fill in the missing keys.
  fields = @default_keys.merge(parsed)
  return if fields.empty?

  if @target
    if event.include?(@target) && @logger.debug?
      @logger.debug("Overwriting existing target field", field: @target, existing_value: event.get(@target))
    end
    event.set(@target, fields)
  else
    fields.each_pair { |name, content| event.set(name, content) }
  end

  filter_matched(event)
rescue TimeoutException
  logger.warn("Timeout reached in KV filter with value #{summarize(value)}")
  event.tag(@tag_on_timeout)
rescue => ex
  details = { :exception => ex.message }
  # The backtrace is only attached when debug logging is on.
  details[:backtrace] = ex.backtrace if logger.debug?
  logger.warn('Exception while parsing KV', details)
  @tag_on_failure.each { |failure_tag| event.tag(failure_tag) }
end
register() click to toggle source
# File lib/logstash/filters/kv.rb, line 346
# Validates the split-related options and precompiles every regexp the filter
# uses at runtime (@trim_value_re, @trim_key_re, @remove_char_value_re,
# @remove_char_key_re, @scan_re, @value_split_re), then converts
# @timeout_millis into @timeout_seconds.
#
# @raise [LogStash::ConfigurationError] when `value_split` is empty, or when
#   `field_split_pattern` / `value_split_pattern` is set to an empty string
def register
  # Too late to set the regexp interruptible flag, at least warn if it is not set.
  require 'java'
  if java.lang.System.getProperty("jruby.regexp.interruptible") != "true"
    logger.warn("KV Filter registered without jruby interruptible regular expressions enabled (`-Djruby.regexp.interruptible=true`); timeouts may not be respected.")
  end

  # NOTE(review): unlike the two pattern options below, @value_split is not
  # nil-guarded here — presumably it always has a default; confirm in the config declaration.
  if @value_split.empty?
    raise LogStash::ConfigurationError, I18n.t(
      "logstash.runner.configuration.invalid_plugin_register",
      :plugin => "filter",
      :type => "kv",
      :error => "Configuration option 'value_split' must be a non-empty string"
    )
  end

  # The pattern options are optional; they are only rejected when given but empty.
  if @field_split_pattern && @field_split_pattern.empty?
    raise LogStash::ConfigurationError, I18n.t(
        "logstash.runner.configuration.invalid_plugin_register",
        :plugin => "filter",
        :type => "kv",
        :error => "Configuration option 'field_split_pattern' must be a non-empty string"
    )
  end

  if @value_split_pattern && @value_split_pattern.empty?
    raise LogStash::ConfigurationError, I18n.t(
        "logstash.runner.configuration.invalid_plugin_register",
        :plugin => "filter",
        :type => "kv",
        :error => "Configuration option 'value_split_pattern' must be a non-empty string"
    )
  end

  # Trim regexps strip runs of the configured characters from either end.
  # NOTE(review): `^` / `$` anchor at line boundaries in Ruby, so on a
  # multi-line string these match at every line start/end, not only at the
  # ends of the whole string.
  @trim_value_re = Regexp.new("^[#{@trim_value}]+|[#{@trim_value}]+$") if @trim_value
  @trim_key_re = Regexp.new("^[#{@trim_key}]+|[#{@trim_key}]+$") if @trim_key

  # Remove-char regexps match any single occurrence of the configured characters.
  @remove_char_value_re = Regexp.new("[#{@remove_char_value}]") if @remove_char_value
  @remove_char_key_re = Regexp.new("[#{@remove_char_key}]") if @remove_char_key

  optional_whitespace = / */
  eof = /$/

  # An explicit *_pattern option is used as-is; otherwise the plain split
  # string is injected into a character class.
  field_split_pattern = Regexp::compile(@field_split_pattern || "[#{@field_split}]")
  value_split_pattern = Regexp::compile(@value_split_pattern || "[#{@value_split}]")

  # in legacy-compatible lenient mode, the value splitter can be wrapped in optional whitespace
  if @whitespace == 'lenient'
    value_split_pattern = /#{optional_whitespace}#{value_split_pattern}#{optional_whitespace}/
  end

  # a key is a _captured_ sequence of characters or escaped spaces before optional whitespace
  # and followed by either a `value_split`, a `field_split`, or EOF.
  # The lookahead-based matcher is used only when either *_pattern option
  # appears in original_params; otherwise the charclass-based matcher is used.
  key_pattern = (original_params.include?('value_split_pattern') || original_params.include?('field_split_pattern')) ?
                    unquoted_capture_until_pattern(value_split_pattern, field_split_pattern) :
                    unquoted_capture_until_charclass(@value_split + @field_split)

  value_pattern = begin
    # each component expression within value_pattern _must_ capture exactly once.
    value_patterns = []

    value_patterns << quoted_capture(%q(")) # quoted double
    value_patterns << quoted_capture(%q(')) # quoted single
    if @include_brackets
      value_patterns << quoted_capture('(', ')') # bracketed paren
      value_patterns << quoted_capture('[', ']') # bracketed square
      value_patterns << quoted_capture('<', '>') # bracketed angle
    end

    # an unquoted value is a _captured_ sequence of characters or escaped spaces before a `field_split` or EOF.
    value_patterns << (original_params.include?('field_split_pattern') ?
                           unquoted_capture_until_pattern(field_split_pattern) :
                           unquoted_capture_until_charclass(@field_split))

    Regexp.union(value_patterns)
  end

  # Full pair matcher: key, value splitter, optional value, then a field splitter or end of line.
  @scan_re = /#{key_pattern}#{value_split_pattern}#{value_pattern}?#{Regexp::union(field_split_pattern, eof)}/
  # Kept separately so has_value_splitter? can test for a splitter on its own.
  @value_split_re = value_split_pattern

  @logger.debug? && @logger.debug("KV scan regex", :regex => @scan_re.inspect)

  # divide by float to allow fractional seconds, the Timeout class timeout value is in seconds but the underlying
  # executor resolution is in microseconds so fractional second parameter down to microseconds is possible.
  # see https://github.com/jruby/jruby/blob/9.2.7.0/core/src/main/java/org/jruby/ext/timeout/Timeout.java#L125
  @timeout_seconds = @timeout_millis / 1000.0
end

Private Instance Methods

has_value_splitter?(s) click to toggle source
# File lib/logstash/filters/kv.rb, line 506
# Tests whether +s+ contains the value splitter by matching it against
# @value_split_re (assigned in #register).
#
# @param s [String] the text to test
# @return [Integer, nil] for a String, the index of the first match, or nil
#   when there is none; the caller in #parse only relies on truthiness
def has_value_splitter?(s)
  s =~ @value_split_re
end
parse(text, event, kv_keys) click to toggle source

Parses the given `text`, using the `event` for context, into the provided `kv_keys` hash

@param text [String] the text to parse
@param event [LogStash::Event] the event from which to extract context (e.g., sprintf vs (in|ex)clude keys)
@param kv_keys [Hash{String=>Object}] the hash in which to inject found key/value pairs

@return [void]

# File lib/logstash/filters/kv.rb, line 569
# Scans +text+ for key/value pairs and merges them into +kv_keys+.
#
# Each pair found by @scan_re goes through key trimming / char removal /
# transformation, include/exclude filtering, prefixing, and the matching value
# processing. When a key occurs more than once its values are collected into
# an Array. With @recursive set, a value that itself contains pairs is
# replaced by the Hash of those pairs.
#
# @param text [String] the text to parse
# @param event [LogStash::Event] used to expand sprintf references in
#   @include_keys, @exclude_keys and @prefix
# @param kv_keys [Hash{String=>Object}] accumulator, mutated in place
# @return [void]
def parse(text, event, kv_keys)
  # short circuit parsing if the text does not contain the @value_split
  return unless has_value_splitter?(text)

  # Interpret dynamic keys for @include_keys and @exclude_keys
  include_keys = @include_keys.map { |key| event.sprintf(key) }
  exclude_keys = @exclude_keys.map { |key| event.sprintf(key) }

  text.scan(@scan_re) do |key, *value_candidates|
    # The value is the first capture group that actually matched, if any.
    value = value_candidates.compact.first || EMPTY_STRING
    next if value.empty? && !@allow_empty_values

    key = key.gsub(@trim_key_re, EMPTY_STRING) if @trim_key
    key = key.gsub(@remove_char_key_re, EMPTY_STRING) if @remove_char_key
    key = transform(key, @transform_key) if @transform_key

    # Bail out as per the values of include_keys and exclude_keys
    # (an empty include list means "include everything").
    next if !include_keys.empty? && !include_keys.include?(key)
    next if exclude_keys.include?(key)

    key = event.sprintf(@prefix) + key

    value = value.gsub(@trim_value_re, EMPTY_STRING) if @trim_value
    value = value.gsub(@remove_char_value_re, EMPTY_STRING) if @remove_char_value
    value = transform(value, @transform_value) if @transform_value

    # Skip values already recorded for this key when duplicates are not
    # allowed. A single stored value must be compared with ==: calling
    # include? on a stored String would be a substring test and would wrongly
    # drop e.g. "foo" after "foobar".
    if !@allow_duplicate_values && kv_keys.key?(key)
      existing = kv_keys[key]
      already_seen = existing.is_a?(Array) ? existing.include?(value) : existing == value
      next if already_seen
    end

    # recursively get more kv pairs from the value
    if @recursive
      inner_kv = {}
      parse(value, event, inner_kv)
      value = inner_kv unless inner_kv.empty?
    end

    # Repeated keys accumulate their values in an Array, in order of appearance.
    if kv_keys.key?(key)
      if kv_keys[key].is_a?(Array)
        kv_keys[key].push(value)
      else
        kv_keys[key] = [kv_keys[key], value]
      end
    else
      kv_keys[key] = value
    end
  end
end
parse_value(value, event) click to toggle source
# File lib/logstash/filters/kv.rb, line 471
# Builds a fresh Hash of key/value pairs extracted from +value+.
#
# A String is parsed directly, an Array has each of its elements parsed into
# the same Hash, and nil yields an empty Hash. Any other type is reported via
# a warning and also yields an empty Hash.
#
# @param value [String, Array, nil, Object] the raw source field content
# @param event [LogStash::Event] passed through to #parse for context
# @return [Hash] the extracted pairs (possibly empty)
def parse_value(value, event)
  pairs = {}
  return pairs if value.nil?

  case value
  when Array  then value.each { |element| parse(element, event, pairs) }
  when String then parse(value, event, pairs)
  else
    @logger.warn("kv filter has no support for this type of data", :type => value.class, :value => value)
  end

  pairs
end
quoted_capture(quote_sequence, close_quote_sequence=quote_sequence) click to toggle source

Helper function for generating single-capture `Regexp` that, when matching a string bound by the given quotes or brackets, will capture the content that is between the quotes or brackets.

@api private
@param quote_sequence [String] a character sequence that begins a quoted expression
@param close_quote_sequence [String] a character sequence that ends a quoted expression; (default: quote_sequence)
@return [Regexp] with a single capture group representing content that is between the given quotes

# File lib/logstash/filters/kv.rb, line 517
# Builds a single-capture Regexp matching text enclosed by the given opening
# and closing sequences; the capture group holds what lies between them.
#
# @api private
# @param quote_sequence [String] sequence that opens the quoted expression
# @param close_quote_sequence [String] sequence that closes it (default: quote_sequence)
# @return [Regexp] with one capture group for the enclosed content
# @raise [RuntimeError] when either sequence is nil or empty
def quoted_capture(quote_sequence, close_quote_sequence=quote_sequence)
  if quote_sequence.nil? || quote_sequence.empty?
    fail('quote_sequence must be non-empty!')
  end
  if close_quote_sequence.nil? || close_quote_sequence.empty?
    fail('close_quote_sequence must be non-empty!')
  end

  escaped_close = Regexp.quote(close_quote_sequence)
  opener = Regexp.new(Regexp.quote(quote_sequence))
  closer = Regexp.new(escaped_close)

  # The enclosed content is any run of characters that are either
  # backslash-escaped or not part of the closing sequence.
  content = unquoted_capture_until_charclass(escaped_close)

  # The content group is optional so that an empty pair of quotes still matches.
  Regexp.new("#{opener}#{content}?#{closer}")
end
summarize(value) click to toggle source

@overload summarize(value)

@param value [Array]
@return [String]

@overload summarize(value)

@param value [String]
@return [String]
# File lib/logstash/filters/kv.rb, line 494
# Produces a short, log-safe description of +value+.
#
# A String (or anything else, via #to_s) is rendered as its escaped dump
# wrapped in backticks; content of 255 bytes or more is cut down to its first
# 255 characters and flagged as truncated. An Array is rendered as a bracketed,
# comma-separated list in which every entry is summarized on its own, so one
# oversized entry cannot hide the others.
#
# @overload summarize(value)
#   @param value [Array]
#   @return [String]
# @overload summarize(value)
#   @param value [String]
#   @return [String]
def summarize(value)
  if value.kind_of?(Array)
    # Return the per-entry summary explicitly; without the return it would be
    # computed and then thrown away.
    return "[#{value.map { |entry| summarize(entry.to_s) }.join(', ')}]"
  end

  value = value.to_s
  return "`#{value.dump}`" if value.bytesize < 255

  # value[0, 255] is exactly 255 characters, matching what the message says.
  "(entry too large to show; showing first 255 characters) `#{value[0, 255].dump}`[...]"
end
transform(text, method) click to toggle source
# File lib/logstash/filters/kv.rb, line 551
# Applies the case transformation selected by +method+ to +text+.
#
# @param text [String] the key or value to transform
# @param method [Object] one of TRANSFORM_LOWERCASE_KEY,
#   TRANSFORM_UPPERCASE_KEY or TRANSFORM_CAPITALIZE_KEY
# @return [String, nil] the transformed copy, or nil when +method+ is none of
#   the three known transforms
def transform(text, method)
  case method
  when TRANSFORM_LOWERCASE_KEY  then text.downcase
  when TRANSFORM_UPPERCASE_KEY  then text.upcase
  when TRANSFORM_CAPITALIZE_KEY then text.capitalize
  end
end
unquoted_capture_until_charclass(charclass) click to toggle source

Helper function for generating capturing `Regexp` that will efficiently match any sequence of characters that are either backslash-escaped or do not belong to the given charclass.

@api private
@param charclass [String] characters to be injected directly into a regexp charclass; special characters must be pre-escaped.
@return [Regexp]

# File lib/logstash/filters/kv.rb, line 547
# Builds a capturing Regexp for one or more characters that are each either
# backslash-escaped or outside the given character class.
#
# @api private
# @param charclass [String] inserted verbatim into a negated `[^...]` class,
#   so special characters must already be escaped by the caller
# @return [Regexp] with a single capture group
def unquoted_capture_until_charclass(charclass)
  # Either an escaped character (backslash plus anything) or one character not in the class.
  allowed_char = "(?:\\\\.|[^#{charclass}])"
  Regexp.new("(#{allowed_char}+)")
end
unquoted_capture_until_pattern(*patterns) click to toggle source

Helper function for generating capturing `Regexp` that will match any sequence of characters that are either backslash-escaped OR NOT matching any of the given pattern(s)

@api private
@param patterns [Array<Regexp>] one or more patterns at which the capture must stop
@return [Regexp]

# File lib/logstash/filters/kv.rb, line 536
# Builds a capturing Regexp for one or more characters, consumed one at a time
# (a backslash-escaped pair counts as one), stopping at the first position
# where any of the given patterns would match.
#
# @api private
# @param patterns [Array<Regexp>] the stop patterns; several are combined
#   with Regexp.union, a single one is used as-is
# @return [Regexp] with a single capture group
def unquoted_capture_until_pattern(*patterns)
  stop_pattern = if patterns.size > 1
                   Regexp.union(patterns)
                 else
                   patterns.first
                 end

  # The negative lookahead is re-checked before every character consumed.
  Regexp.new("((?:(?!#{stop_pattern})(?:\\\\.|.))+)")
end