class LogStash::Filters::Multiline

This filter will collapse multiline messages from a single source into one Logstash event.

The original goal of this filter was to allow joining of multi-line messages from files into a single event. For example, joining Java exception and stack trace messages into a single event.

NOTE: This filter will not work with multiple worker threads (for example, `-w 2` on the Logstash command line).

The config looks like this:

[source,ruby]

filter {

multiline {
  pattern => "pattern, a regexp"
  negate => boolean
  what => "previous" or "next"
}

}

The `pattern` should be a regexp (<<plugins-filters-grok,grok>> patterns are supported) which matches what you believe to be an indicator that the field is part of an event consisting of multiple lines of log data.

The `what` must be `previous` or `next` and indicates the relation to the multi-line event.

The `negate` option can be `true` or `false` (defaults to `false`). If `true`, a message that does not match the pattern constitutes a match of the multiline filter and the `what` setting is applied; conversely, a message that does match the pattern is not treated as a match.

For example, Java stack traces are multiline and usually have the message starting at the far left, with each subsequent line indented. Do this:

[source,ruby]

filter {

multiline {
  pattern => "^\s"
  what => "previous"
}

}

This says that any line starting with whitespace belongs to the previous line.

Another example is C line continuations (backslash). Here's how to do that:

[source,ruby]

filter {

multiline {
  pattern => "\\$"
  what => "next"
}

}

This says that any line ending with a backslash should be combined with the following line.

Constants

MULTILINE_TAG

Public Class Methods

new(config = {}) click to toggle source
Calls superclass method
# File lib/logstash/filters/multiline.rb, line 124
# Build the filter and set up the state it keeps between events.
#
# @param config [Hash] plugin settings, passed unchanged to the superclass
def initialize(config = {})
  super

  # Per-stream buffers of events that are waiting to be joined; this filter
  # must keep state from one event to the next.
  @pending = {}

  # The filter cannot be parallelized because message order cannot be
  # guaranteed across threads: line #2 could be processed before line #1.
  @threadsafe = false
end

Private Class Methods

event_hash_merge!(dst, src, dups_key = nil) click to toggle source

Merge the data hashes of two events: src is merged into dst, and duplicate values are handled for dups_key.

@param dst [Hash] the event to merge into; dst will be mutated
@param src [Hash] the event to merge into dst
@param dups_key [String] the field key whose duplicate values are kept
@return [Hash] the mutated dst

# File lib/logstash/filters/multiline.rb, line 288
# Merge the data hash of one event (src) into another (dst), in place.
#
# Keys present only in src are copied as-is. When both sides hold a Hash, the
# two are merged recursively. Otherwise both values are combined into a list:
# values under dups_key are concatenated (duplicates kept), and every other key
# is de-duplicated with a set union. If neither side was an Array and the
# combined list holds exactly one value, that value is stored unwrapped.
#
# @param dst [Hash] the event data to merge into; mutated
# @param src [Hash] the event data to merge from
# @param dups_key [String, nil] field whose duplicate values are kept
# @return [Hash] the mutated dst
def self.event_hash_merge!(dst, src, dups_key = nil)
  # Wrap a value as a list without Kernel#Array's to_ary/to_a coercion, which
  # would explode a Hash (or any object defining to_a) into its parts.
  # nil contributes no value, matching Array(nil) == [].
  as_list = lambda do |value|
    case value
    when nil then []
    when Array then value
    else [value]
    end
  end

  src.each do |key, svalue|
    unless dst.key?(key)
      dst[key] = svalue
      next
    end

    dvalue = dst[key]
    dst[key] =
      if dvalue.is_a?(Hash) && svalue.is_a?(Hash)
        event_hash_merge!(dvalue, svalue, dups_key)
      else
        left = as_list.call(dvalue)
        right = as_list.call(svalue)
        combined = key == dups_key ? left + right : left | right
        # the result is always a list; unwrap a lone value when neither input
        # was an Array so scalar fields stay scalar
        if dvalue.is_a?(Array) || svalue.is_a?(Array) || combined.size != 1
          combined
        else
          combined.first
        end
      end
  end

  dst
end

Public Instance Methods

close() click to toggle source
# File lib/logstash/filters/multiline.rb, line 210
# Shutdown hook; this filter holds no resources that need releasing here.
def close
end
filter(event) click to toggle source
# File lib/logstash/filters/multiline.rb, line 167
# Test the event's source field against the compiled pattern and pass the
# (negate-adjusted) result to the strategy bound as multiline_filter!.
#
# When the source field is an Array, only its first element is tested.
#
# @param event [LogStash::Event] the incoming event
def filter(event)
  value = event.get(@source)
  subject = value.is_a?(Array) ? value.first : value
  grok_match = @grok.match(subject)
  # add negate option: when negated, a non-match counts as a match and vice versa
  match = grok_match ? !@negate : @negate

  @logger.debug? && @logger.debug("Multiline", :pattern => @pattern, :message => value, :match => match, :negate => @negate)

  multiline_filter!(event, match)

  filter_matched(event) unless event.cancelled?
end
flush(options = {}) click to toggle source

Flush any pending messages. Called at a regular interval without options, and at pipeline shutdown with the :final => true option.

@param options [Hash]
@option options [Boolean] :final => true to signal a final shutdown flush
@return [Array<LogStash::Event>] list of flushed events

# File lib/logstash/filters/multiline.rb, line 184
# Flush any pending messages.
#
# Called at a regular interval without options, and at pipeline shutdown with
# :final => true.
#
# @param options [Hash]
# @option options [Boolean] :final true to signal a final shutdown flush, which
#   releases every pending stream regardless of its age
# @return [Array<LogStash::Event>] one merged, uncancelled event per expired stream
def flush(options = {})
  # note that thread safety concerns are not necessary here because the multiline filter
  # is not thread safe thus cannot be run in multiple filterworker threads and flushing
  # is called by the same thread
  final = options[:final]
  # one clock reading so every stream is aged against the same instant
  now = Time.now

  # The filter methods empty a stream's buffer (pending.clear) but keep its key;
  # prune those entries so @pending does not grow with every stream identity seen.
  @pending.delete_if { |_key, events| events.empty? }

  # select expired streams (all of them on a final flush); age is measured from
  # the @timestamp of the oldest buffered event
  expired = @pending.select do |_key, events|
    final || (now - events.first.get("@timestamp").time) >= @max_age
  end

  # return list of uncancelled expired events
  expired.map do |key, events|
    @pending.delete(key)
    event = merge(events)
    event.uncancel
    filter_matched(event)
    event
  end
end
register() click to toggle source
# File lib/logstash/filters/multiline.rb, line 137
# Load grok pattern files, compile the configured pattern, and bind
# multiline_filter! to the strategy chosen by the `what` setting.
#
# @raise [ArgumentError] if `what` is neither "previous" nor "next"
def register
  require "grok-pure" # rubygem 'jls-grok'

  @grok = Grok.new

  # class-level pattern paths first, then the configured patterns_dir entries
  @patterns_dir = @@patterns_path.to_a + @patterns_dir
  @patterns_dir.each do |location|
    # a directory stands for every file inside it; anything else is used as a glob
    glob = File.directory?(location) ? File.join(location, "*") : location
    Dir.glob(glob).each do |pattern_file|
      @logger.info("Grok loading patterns from file", :path => pattern_file)
      @grok.add_patterns_from_file(pattern_file)
    end
  end

  @grok.compile(@pattern)

  strategy = { "previous" => :previous_filter!, "next" => :next_filter! }[@what]
  # we should never get here since @what is validated at config
  raise(ArgumentError, "Unknown multiline 'what' value") if strategy.nil?
  singleton_class.send(:alias_method, :multiline_filter!, strategy)

  @logger.debug("Registered multiline plugin", :type => @type, :config => @config)
end

Private Instance Methods

merge(events) click to toggle source

Merge a list of events. The @timestamp of the resulting merged event comes from the “oldest” event (events.first). All @source field values are de-duplicated depending on @allow_duplicates and joined with `"\n"`; all other fields are de-duplicated.

@param events [Array<Event>] the list of events to merge
@return [Event] the resulting merged event

# File lib/logstash/filters/multiline.rb, line 269
# Merge a list of events into a single event.
#
# The merged @timestamp is taken from the first (oldest) event. The @source
# values keep duplicates when @allow_duplicates is set (otherwise they are
# de-duplicated) and are joined with "\n"; all other fields are de-duplicated.
#
# @param events [Array<Event>] the events to merge, oldest first
# @return [Event] the resulting merged event
def merge(events)
  keep_dups_for = @allow_duplicates ? @source : nil

  # event_hash_merge! mutates and returns its first argument, so a single
  # accumulator hash collects every event's data
  combined = {}
  events.each do |evt|
    self.class.event_hash_merge!(combined, evt.to_hash_with_metadata, keep_dups_for)
  end

  # the merged event keeps the @timestamp of the first event in the sequence
  combined["@timestamp"] = Array(combined["@timestamp"]).first
  # collapse all @source field values into one newline-separated string
  combined[@source] = Array(combined[@source]).join("\n")

  LogStash::Event.new(combined)
end
next_filter!(event, match) click to toggle source
# File lib/logstash/filters/multiline.rb, line 243
# "next" strategy: a matching line belongs together with the line after it.
#
# A matching event is tagged, buffered for its stream and cancelled. A
# non-matching event closes any buffered group: the buffered events plus this
# one are merged into it (including @metadata) and the buffer is emptied. A
# non-matching event with nothing buffered passes through untouched.
#
# @param event [LogStash::Event] the incoming event
# @param match [Boolean] whether the event matched (negate already applied)
def next_filter!(event, match)
  stream_key = event.sprintf(@stream_identity)
  buffered = (@pending[stream_key] ||= [])

  if match
    event.tag(MULTILINE_TAG)
    buffered << event
    event.cancel
  elsif !buffered.empty?
    buffered << event
    combined = merge(buffered)
    event.overwrite(combined)
    # overwrite does not copy the metadata, so carry it over explicitly
    event.set("@metadata", combined.get("@metadata"))
    buffered.clear
  end
end
previous_filter!(event, match) click to toggle source
# File lib/logstash/filters/multiline.rb, line 216
# "previous" strategy: a matching line belongs to the line before it.
#
# A matching event is tagged, buffered for its stream and cancelled. A
# non-matching event starts a new group. If nothing is buffered, the event
# itself is buffered and cancelled. Otherwise the buffered group is merged into
# this event (including @metadata), which is emitted, and a copy of the event's
# original data becomes the new buffered group.
#
# @param event [LogStash::Event] the incoming event
# @param match [Boolean] whether the event matched (negate already applied)
def previous_filter!(event, match)
  stream_key = event.sprintf(@stream_identity)
  buffered = (@pending[stream_key] ||= [])

  if match
    # the previous line is part of this event: append it and cancel it
    event.tag(MULTILINE_TAG)
    buffered << event
    event.cancel
  elsif buffered.empty?
    # nothing to close yet: hold this event as the start of a group
    buffered << event
    event.cancel
  else
    # snapshot this line before the event is overwritten with the merged group
    snapshot = event.to_hash_with_metadata
    combined = merge(buffered)
    event.overwrite(combined)
    # overwrite does not copy the metadata, so carry it over explicitly
    event.set("@metadata", combined.get("@metadata"))
    buffered.clear # reuse the same array
    buffered << LogStash::Event.new(snapshot)
  end
end