class LogStash::Filters::Collate

Collate events by time or count.

The original goal of this filter was to merge logs from different sources by log time. For example, in real-time log collection, logs can be collated in batches of 3000 events or every 30 seconds, whichever comes first.

The config looks like this:

[source,ruby]

filter {

collate {
  count => 3000
  interval => "30s"
  order => "ascending"
}

}

Public Instance Methods

filter(event) { |collatedEvent| ... } click to toggle source
# File lib/logstash/filters/collate.rb, line 52
# Buffers incoming events and emits them in timestamp order once a batch is
# ready.
#
# An event that does not yet carry the "collated" tag is cancelled and a clone
# of it is appended to the collating buffer. When the buffer length reaches
# @count the buffer is sorted via #collate; a scheduled job (see #register)
# may also have sorted it in the meantime. Whenever the buffer is marked as
# collated, every buffered event is popped, tagged "collated", passed to
# filter_matched and yielded to the caller's block.
#
# Events that already carry the "collated" tag are left untouched.
#
# event - the event to process, or LogStash::SHUTDOWN to fire the scheduled
#         job one last time and unschedule it.
def filter(event)
  @logger.info("do collate filter")

  if event == LogStash::SHUTDOWN
    @job.trigger
    @job.unschedule
    @logger.info("collate filter thread shutdown.")
    return
  end

  # Collated events are marked with a "collated" tag when they are emitted, so
  # only events without that tag are cancelled here and queued for collation.
  tags = event["tags"]
  return unless tags.nil? || !tags.include?("collated")
  event.cancel

  @mutex.synchronize do
    @collatingArray << event.clone

    collate if @collatingArray.length == @count

    if @collatingDone
      # Drain from the tail: #collate orders the buffer so that popping
      # produces the configured order.
      while (ready = @collatingArray.pop)
        ready["tags"] = [] if ready["tags"].nil?
        ready["tags"] << "collated"
        filter_matched(ready)
        yield ready
      end
      # The batch has been emitted; wait for the next collation.
      @collatingDone = false
    end
  end
end
flush(options = {}) click to toggle source

Flush any pending messages.

# File lib/logstash/filters/collate.rb, line 101
# Returns the buffered events if a collation has completed, otherwise an
# empty array.
#
# When @collatingDone is set, every event is popped from the collating buffer,
# tagged "collated" and collected into the result; the flag is then cleared.
#
# The flag is checked and cleared while holding @mutex, the same lock under
# which #filter and the scheduled job call #collate. Clearing it after the
# lock was released could overwrite a collation that completed in between,
# leaving those events waiting for the next collation.
#
# NOTE(review): unlike #filter, drained events are not passed to
# filter_matched here — confirm whether that is intentional.
#
# options - accepted for interface compatibility; not read by this method.
#
# Returns an Array of the drained events (possibly empty).
def flush(options = {})
  events = []
  @mutex.synchronize do
    if @collatingDone
      while (collated_event = @collatingArray.pop)
        collated_event["tags"] = [] if collated_event["tags"].nil?
        collated_event["tags"] << "collated"
        events << collated_event
      end
      # Reset inside the critical section so no concurrent collation is lost.
      @collatingDone = false
    end
  end
  events
end
register() click to toggle source
# File lib/logstash/filters/collate.rb, line 35
# Prepares the filter's state and starts the periodic collation job.
#
# Creates the lock, the empty collating buffer and the "collation done" flag,
# then schedules a Rufus job that runs every @interval. Each run logs
# "Scheduler Activated" and calls #collate while holding @mutex.
def register
  # Loaded here rather than at file load time.
  require "thread"
  require "rufus/scheduler"

  @collatingArray = []
  @collatingDone = false
  @mutex = Mutex.new

  @scheduler = Rufus::Scheduler.new
  @job = @scheduler.every(@interval) do
    @logger.info("Scheduler Activated")
    @mutex.synchronize { collate }
  end
end

Private Instance Methods

collate() click to toggle source
# File lib/logstash/filters/collate.rb, line 89
# Sorts the collating buffer by event timestamp and marks it as collated.
#
# The buffer is consumed with Array#pop (see #filter and #flush), i.e. from
# the tail. For "ascending" order it is therefore stored newest-first so that
# popping yields the oldest event first; for any other @order it is stored
# oldest-first.
def collate
  newest_first = (@order == "ascending")

  @collatingArray.sort! do |left, right|
    # call .to_i for now until https://github.com/elasticsearch/logstash/issues/2052 is fixed
    cmp = left.timestamp.to_i <=> right.timestamp.to_i
    newest_first ? -cmp : cmp
  end

  @collatingDone = true
end