class LogStash::Outputs::SCACSV

SCACSV - based upon original Logstash CSV output.

Write events to disk in CSV format. Write a PI header as the first line in the file. Name the file per PI convention, based upon the first and last timestamps encountered.

Public Instance Methods

receive(event) click to toggle source
# File lib/logstash/outputs/scacsv.rb, line 121
  # Handles a single event.
  #
  # An event carrying a truthy 'SCAWindowMarker' field is never written; it only
  # closes (and renames) the current file when at least one record has been
  # written to it. Any other event is appended to the current file as one CSV
  # row, preceded by a header row when it is the first record of the file.
  #
  # After writing, the method updates @recordCount, @lastOutputTime, @startTime
  # and @endTime (and @currentOutputIntervalStartTime in interval mode), and
  # closes the file once @max_size records have been written (when @max_size > 0).
  #
  # Records are assumed to arrive in timestamp order.
  def receive(event)
    return unless output?(event)

    @logger.debug("in SCACSV receive")

    if event['SCAWindowMarker']
      # Markers are swallowed; they only end a file that already holds data.
      closeAndRenameCurrentFile if @recordCount >= 1
    else
      # Roll over first if this record belongs to a different interval than
      # the records already in the file.
      if @closeOnIntervalBoundaries && @recordCount >= 1
        incomingIntervalStart = snapTimestampToInterval(timestampAsEpochSeconds(event), @fileIntervalWidthSeconds)
        closeAndRenameCurrentFile if @currentOutputIntervalStartTime != incomingIntervalStart
      end

      @formattedPath = event.sprintf(@path)
      fd = open(@formattedPath)
      @logger.debug("SCACSVreceive - after opening fd=#{fd}")

      # First record of a file: emit the header line (explicit @header if
      # configured, otherwise the field names).
      fd.write((@header || @fields).to_csv(@csv_options)) if @recordCount == 0

      row = @fields.map { |fieldName| get_value(fieldName, event) }
      fd.write(row.to_csv(@csv_options))

      flush(fd)
      close_stale_files

      # Remember state for the watchdog and for the eventual rename.
      @recordCount += 1
      @lastOutputTime = Time.now

      if @closeOnIntervalBoundaries
        # Interval mode: start/end are the boundaries of the interval that the
        # first record of the file falls into.
        if @recordCount == 1
          @startTime = snapTimestampToInterval(timestampAsEpochSeconds(event), @fileIntervalWidthSeconds)
        end
        @endTime = @startTime + @fileIntervalWidthSeconds - 1
        @currentOutputIntervalStartTime = @startTime
      else
        # Free-running mode: start is the first record's timestamp, end tracks
        # the most recent record's timestamp.
        @startTime = event[@time_field] if @recordCount == 1
        @endTime = event[@time_field]
      end

      # Enough records collected: close the file out.
      closeAndRenameCurrentFile if (@max_size > 0) && (@recordCount >= max_size)
    end
  end
register() click to toggle source
Calls superclass method
# File lib/logstash/outputs/scacsv.rb, line 56
# Plugin initialisation. Calls the superclass register, then:
#  * converts the keys of @csv_options to symbols,
#  * resets the per-file state (@startTime, @endTime, @recordCount, @lastOutputTime),
#  * starts the watchdog thread running flushWatchdog(@flush_interval),
#  * translates @file_interval_width (case-insensitive MINUTE / FIVE / FIFTEEN /
#    HOUR / DAY) into @fileIntervalWidthSeconds and @closeOnIntervalBoundaries;
#    any other value disables interval-based closing,
#  * builds @df, a java.text.SimpleDateFormat for @time_field_format, unless
#    that format is "epoch" (in which case @df is nil).
def register
  super
  @csv_options = @csv_options.each_with_object({}) do |(key, value), symbolized|
    symbolized[key.to_sym] = value
  end

  # Placeholders used for the file name until real timestamps are seen.
  @startTime   = "missingStartTime"
  @endTime     = "missingEndTime"
  @recordCount = 0

  @lastOutputTime = 0
  @flushInterval = @flush_interval.to_i

  @timerThread = Thread.new { flushWatchdog(@flush_interval) }

  @currentOutputIntervalStartTime = 0

  # Supported interval names and their width in seconds.
  widthsInSeconds = {
    "MINUTE"  => 60,
    "FIVE"    => 300,
    "FIFTEEN" => 900,
    "HOUR"    => 3600,
    "DAY"     => 86400
  }
  configuredWidth = widthsInSeconds[@file_interval_width.upcase]
  @fileIntervalWidthSeconds = configuredWidth || 0
  @closeOnIntervalBoundaries = !configuredWidth.nil?

  @df = @time_field_format != "epoch" ? java.text.SimpleDateFormat.new(@time_field_format) : nil
end

Private Instance Methods

closeAndRenameCurrentFile() click to toggle source
# File lib/logstash/outputs/scacsv.rb, line 268
  # Closes every open output file and renames it, inside its own directory, to
  #   "<group>__<start>__<end>.csv"
  # where start/end are derived from @startTime / @endTime:
  #   1. when @time_field_format is not "epoch", both values are parsed with @df
  #      and replaced by the parsed date's getTime value;
  #   2. @tz_offset is added to each non-nil value, and 1000 is added to the end
  #      time when @increment_time is set;
  #   3. both are rendered to strings via formatOutputTime.
  # Any '/' in the resulting file name is stripped (and an error is logged).
  #
  # On success @recordCount is reset to 0 and @lastOutputTime set to now, so the
  # next record starts a fresh file with a header. Failures are logged and
  # swallowed per file; in that case the file stays closed but is not renamed.
  #
  # Cloned and changed from the 'file.rb' output. Although this iterates over
  # @files, the assumption is that there is only one file for the SCA CSV use.
  def closeAndRenameCurrentFile
    @files.each do |path, fd|
      # NOTE(review): Exception (rather than StandardError) is rescued below on
      # purpose - presumably so Java exceptions raised by @df under JRuby are
      # caught as well; confirm before narrowing.
      begin
        fd.close
        @files.delete(path) # so it will be forgotten and we can open it up again if needed
        @logger.debug("closeAndRenameCurrentFile #{path}", :fd => fd)

        begin # determine start & end times
          if @time_field_format != "epoch"
            # Not epoch, so a java timestamp format is expected: convert both
            # values. Parse both before assigning so a failure on the end time
            # leaves @startTime untouched.
            parsedStart = @df.parse(@startTime)
            parsedEnd   = @df.parse(@endTime)
            @startTime = parsedStart.getTime
            @endTime   = parsedEnd.getTime
          end

          # Numeric epoch values from here on.
          @startTime = @startTime.to_i + @tz_offset unless @startTime.nil?

          unless @endTime.nil?
            @endTime = @endTime.to_i + @tz_offset
            # The increment ensures the end time in the file name is after the
            # last data value (1000ms = 1sec).
            @endTime += 1000 if @increment_time
          end

          # Convert for output; the results are strings.
          @startTime = formatOutputTime(@startTime, @time_field_format, @timestamp_output_format, "noStartTime")
          @endTime   = formatOutputTime(@endTime,   @time_field_format, @timestamp_output_format, "noEndTime")
        rescue Exception => e
          @logger.error("Exception while flushing and closing files - preparing start/end time", :exception => e)
          raise
        end

        newFilename = "#{group}__#{@startTime}__#{@endTime}.csv"

        if newFilename.include?('/')
          @logger.error("New filename " + newFilename + " cannot contain / characters. Check the timestamp format. / characters stripped from filename")
          # Non-bang delete: always returns a String (delete! can return nil).
          newFilename = newFilename.delete('/')
        end

        directory = File.dirname(File.realdirpath(path))
        File.rename(File.join(directory, File.basename(path)), File.join(directory, newFilename))

        # Reset the record count so the next record picks up a new start time
        # and a header is written to the next file.
        @recordCount = 0
        @lastOutputTime = Time.now
      rescue Exception => e
        @logger.error("Exception while flushing and closing files.", :exception => e)
      end
    end
  end
epochAsJavaDate( epochTimestamp ) click to toggle source
# File lib/logstash/outputs/scacsv.rb, line 224
# Builds a java.util.Date from an epoch timestamp.
#
# A value whose string form is exactly 13 characters long is taken to be in
# milliseconds already; anything else (10 characters expected) is treated as
# seconds and multiplied by 1000.
def epochAsJavaDate(epochTimestamp)
  millis = epochTimestamp.to_i
  millis *= 1000 unless epochTimestamp.to_s.length == 13
  java.util.Date.new(millis)
end
flushWatchdog(delay) click to toggle source

This thread ensures that we output (close and rename) a file every so often

# File lib/logstash/outputs/scacsv.rb, line 104
# Body of the watchdog thread. Loops forever, once per second, and closes and
# renames the current file whenever at least `delay` seconds have passed since
# @lastOutputTime and at least one record has been written.
#
# delay - idle time in seconds (converted with to_i) after which the current
#         file is closed out.
def flushWatchdog(delay)
  @logger.debug("SCACSVFlushWatchdog - Last output time = #{@lastOutputTime}")

  while true
    @logger.debug("SCACSVFlushWatchdog - Time.now = #{Time.now} $lastOutputTime=#{@lastOutputTime} delay=#{delay}")

    idleDeadline = @lastOutputTime.to_i + delay.to_i
    if Time.now.to_i >= idleDeadline && @recordCount > 0
      @logger.debug("SCACSVFlushWatchdog - closeAndRenameCurrentFile")
      closeAndRenameCurrentFile
    end

    @logger.debug("SCACSVFlushWatchdog - Sleeping")
    sleep 1
  end
end
formatOutputTime( timestamp, time_field_format, timestamp_output_format, missingString ) click to toggle source
# File lib/logstash/outputs/scacsv.rb, line 236
# Renders a timestamp as the string used in the output file name.
#
# timestamp               - epoch value to render; nil yields "" (a debug line
#                           mentioning missingString is logged).
# time_field_format       - format of the incoming time field ("epoch" or a
#                           java SimpleDateFormat pattern).
# timestamp_output_format - "epoch" to emit the number as-is, "" to reuse
#                           time_field_format, otherwise an explicit java
#                           SimpleDateFormat pattern.
# missingString           - returned when any exception occurs while formatting.
#
# Returns a String.
def formatOutputTime(timestamp, time_field_format, timestamp_output_format, missingString)
  if timestamp.nil?
    @logger.debug("SCACSV " + missingString + " for  #{group}")
    return ""
  end

  # An empty output format means "same as the input field's format".
  effectiveFormat = timestamp_output_format == "" ? time_field_format : timestamp_output_format
  return timestamp.to_s if effectiveFormat == "epoch"

  java.text.SimpleDateFormat.new(effectiveFormat).format(epochAsJavaDate(timestamp))
rescue Exception => e
  @logger.error("Exception determining output file timestamp. " + missingString, :exception => e)
  missingString
end
get_value(name, event) click to toggle source
# File lib/logstash/outputs/scacsv.rb, line 213
# Looks up field `name` on `event` for output in a CSV cell.
# Hash values are serialised with to_json; every other value is returned as-is.
def get_value(name, event)
  fieldValue = event[name]
  fieldValue.is_a?(Hash) ? fieldValue.to_json : fieldValue
end
snapTimestampToInterval(timestamp,interval) click to toggle source
# File lib/logstash/outputs/scacsv.rb, line 208
# Rounds `timestamp` down to the start of the `interval`-wide bucket it falls
# into, i.e. (timestamp / interval) * interval. For Integer arguments the
# division floors, so the result is the largest multiple of `interval` that is
# not greater than `timestamp`.
def snapTimestampToInterval(timestamp, interval)
  wholeIntervals = timestamp / interval
  wholeIntervals * interval
end
teardown() click to toggle source
# File lib/logstash/outputs/scacsv.rb, line 346
# Shuts the output down: kills the watchdog thread, closes and renames the
# current file, then calls finished.
def teardown
  @logger.debug("SCACSV - Teardown: closing files")

  # Stop the watchdog before the final close/rename.
  watchdog = @timerThread
  Thread.kill(watchdog)

  closeAndRenameCurrentFile

  finished
end
timestampAsEpochSeconds(event) click to toggle source
# File lib/logstash/outputs/scacsv.rb, line 197
# Returns the event's time field (event[@time_field]) as integer epoch seconds.
#
# When @df (a date format built from @time_field_format) is set, the field is
# parsed with it and the parsed date's getTime value - milliseconds - is divided
# by 1000. When @df is nil the field is assumed to already hold epoch seconds
# and is simply converted with to_i.
#
# Previously the @df branch returned the parsed date object itself, which the
# callers cannot do integer arithmetic on.
def timestampAsEpochSeconds(event)
  # rmck: come back and remove global refs here!
  rawTime = event[@time_field]
  return rawTime.to_i if @df.nil?

  @df.parse(rawTime).getTime / 1000
end