class LogStash::Outputs::SCACSV
SCACSV
- based upon original Logstash CSV output.
Write events to disk in CSV format. Write a PI header as the first line in the file. Name the file per PI convention, based upon the first and last timestamps encountered.
Public Instance Methods
receive(event)
click to toggle source
# File lib/logstash/outputs/scacsv.rb, line 121
# Handles a single event: either rolls the current output file over or appends
# the event to it as one CSV row.
#
# An event carrying 'SCAWindowMarker' is never written; it only triggers
# closeAndRenameCurrentFile when at least one record has been written. Any
# other event is written as a row built from @fields (preceded by a header row
# when it is the first record of the file), after which the bookkeeping used to
# name the file (@startTime, @endTime) and to decide on the next roll-over
# (@recordCount, @lastOutputTime, @currentOutputIntervalStartTime) is updated.
#
# event - the event to process; read through event[...] and event.sprintf.
def receive(event)
  return unless output?(event)

  @logger.debug("in SCACSV receive")

  if (event['SCAWindowMarker'])
    # just eat the marker - don't output it

    # if we had at least one record output, then close the file and move on
    if @recordCount >= 1
      closeAndRenameCurrentFile
    end
  else
    # Now see if we need to close file because of a new boundary:
    # the event snaps to a different interval start than the one recorded for
    # the records already written.
    if @closeOnIntervalBoundaries and @recordCount >= 1 and (@currentOutputIntervalStartTime != snapTimestampToInterval(timestampAsEpochSeconds(event),@fileIntervalWidthSeconds))
      closeAndRenameCurrentFile
    end

    @formattedPath = event.sprintf(@path)
    fd = open(@formattedPath)
    @logger.debug("SCACSVreceive - after opening fd=" + fd.to_s)

    if @recordCount == 0
      # output header on first line - note, need a minimum of one record for sensible output
      if @header then
        # csv_header = @fields.map { |name| name }
        fd.write(@header.to_csv(@csv_options))
      else
        # no explicit header configured: the field names serve as the header row
        fd.write(@fields.to_csv(@csv_options))
      end
    end

    csv_values = @fields.map {|name| get_value(name, event)}
    fd.write(csv_values.to_csv(@csv_options))

    flush(fd)
    close_stale_files

    # remember state
    @recordCount = @recordCount + 1
    @lastOutputTime = Time.now

    # capture the earliest - assumption is that records are in order
    if (@recordCount) == 1
      if !@closeOnIntervalBoundaries
        @startTime = event[@time_field]
      else
        # in interval mode the start time is the start of the event's interval
        @startTime = snapTimestampToInterval(timestampAsEpochSeconds(event),@fileIntervalWidthSeconds)
      end
    end

    # for every record, update endTime - again, assumption is that records are in order
    if !@closeOnIntervalBoundaries
      @endTime = event[@time_field]
    else
      @endTime = @startTime + @fileIntervalWidthSeconds - 1 # end of interval
    end

    #puts("After snapping. timestamp=" + event[@time_field].to_s + " startTime=" + @startTime.to_s + " endTime = " + @endTime.to_s)

    # remember start of boundary for next time
    if @closeOnIntervalBoundaries
      @currentOutputIntervalStartTime = @startTime
    end

    if ((@max_size > 0) and (@recordCount >= max_size))
      # Have enough records, close it out
      closeAndRenameCurrentFile
    end
  end
end
register()
click to toggle source
Calls superclass method
# File lib/logstash/outputs/scacsv.rb, line 56
# Initialises the plugin state after the superclass has registered:
# re-keys the CSV options by symbol, resets the file-naming bookkeeping,
# starts the flush watchdog thread, translates @file_interval_width into a
# width in seconds, and builds the date parser when the time field is not
# in "epoch" format.
def register
  super

  @csv_options = @csv_options.each_with_object({}) do |(key, value), by_symbol|
    by_symbol[key.to_sym] = value
  end

  # variables to hold the start and end times which we'll use to rename the files to
  @startTime = "missingStartTime"
  @endTime = "missingEndTime"
  @recordCount = 0

  @lastOutputTime = 0 # data time
  @flushInterval = @flush_interval.to_i
  @timerThread = Thread.new { flushWatchdog(@flush_interval) }

  @currentOutputIntervalStartTime = 0

  # Defaults apply to any interval name that is not listed below
  # (the width is not used in that case).
  @fileIntervalWidthSeconds = 0
  @closeOnIntervalBoundaries = false

  known_widths = {
    "MINUTE" => 60,
    "FIVE" => 300,
    "FIFTEEN" => 900,
    "HOUR" => 3600,
    "DAY" => 86400
  }
  width_seconds = known_widths[@file_interval_width.upcase]
  if width_seconds
    @fileIntervalWidthSeconds = width_seconds
    @closeOnIntervalBoundaries = true
  end

  # The parser is only needed when timestamps are not already epoch values.
  @df = nil
  @df = java.text.SimpleDateFormat.new(@time_field_format) if @time_field_format != "epoch"
end
Private Instance Methods
closeAndRenameCurrentFile()
click to toggle source
# File lib/logstash/outputs/scacsv.rb, line 268
# Closes every open output file and renames it, in its own directory, to
# "<group>__<start>__<end>.csv", then resets the record count so that the
# next record picks up a new start time and gets a header.
#
# Cloned and changed from the 'file.rb' operator. Even though this loops over
# @files, the assumption is that there is one file here for the SCA CSV use.
#
# Side effects: rewrites @startTime and @endTime with their formatted string
# forms, removes the closed file from @files, resets @recordCount to 0 and
# sets @lastOutputTime to the current time.
#
# Failures are logged through @logger and not propagated to the caller.
# NOTE(review): both handlers rescue Exception rather than StandardError;
# presumably so errors raised by the Java date parser are caught as well -
# confirm before narrowing.
def closeAndRenameCurrentFile
  @files.each do |path, fd|
    begin
      fd.close
      @files.delete(path) # so it will be forgotten and we can open it up again if needed
      @logger.debug("closeAndRenameCurrentFile #{path}", :fd => fd)

      # Now the various time adjustments
      begin
        # determine start&end times
        if @time_field_format != "epoch"
          # if not epoch, then we expect java timestamp format, so must convert
          # start/end times. Both are parsed before either is overwritten so a
          # failure on the end time leaves the start time untouched.
          start_millis = @df.parse(@startTime).getTime
          end_millis = @df.parse(@endTime).getTime
          @startTime = start_millis
          @endTime = end_millis
        end

        # Ensure epoch time from here on out
        @startTime = @startTime.to_i + @tz_offset unless @startTime.nil?

        unless @endTime.nil?
          @endTime = @endTime.to_i + @tz_offset
          # increment is used to ensure that the end-time on the filename is
          # after the last data value.
          # NOTE(review): 1000 is meant as 1000ms = 1sec, but @endTime may hold
          # epoch seconds at this point (e.g. epoch time fields) - confirm units.
          @endTime = @endTime.to_i + 1000 if @increment_time
        end

        # then do conversion for output
        @startTime = formatOutputTime( @startTime, @time_field_format, @timestamp_output_format, "noStartTime" )
        @endTime = formatOutputTime( @endTime, @time_field_format, @timestamp_output_format, "noEndTime" )
      rescue Exception => e
        @logger.error("Exception while flushing and closing files - preparing start/end time", :exception => e)
        raise
      end

      # timestamps are strings here
      newFilename = "#{group}__#{@startTime}__#{@endTime}.csv"

      if newFilename.include?('/')
        @logger.error("New filename " + newFilename + " cannot contain / characters. Check the timestamp format. / characters stripped from filename")
        # non-destructive delete: the bang form returns nil when nothing changed
        newFilename = newFilename.delete('/')
      end

      directory = File.dirname(File.realdirpath(path))
      File.rename(File.join(directory, File.basename(path)), File.join(directory, newFilename))

      # reset record count so we'll pick up new start time, and put a header
      # on next file when a new record comes in
      @recordCount = 0
      @lastOutputTime = Time.now
    rescue Exception => e
      @logger.error("Exception while flushing and closing files.", :exception => e)
    end
  end
end
epochAsJavaDate( epochTimestamp )
click to toggle source
# File lib/logstash/outputs/scacsv.rb, line 224
# Builds a java.util.Date from an epoch timestamp.
#
# A value whose string form is exactly 13 characters long is passed through
# as-is; any other value (should be 10 characters) is multiplied by 1000
# before being handed to the Date constructor.
#
# epochTimestamp - epoch value; anything responding to to_s and to_i.
#
# Returns the java.util.Date.
def epochAsJavaDate( epochTimestamp )
  millis = epochTimestamp.to_i
  millis *= 1000 unless epochTimestamp.to_s.length == 13
  java.util.Date.new(millis)
end
flushWatchdog(delay)
click to toggle source
This thread ensures that we output (close and rename) a file every so often
# File lib/logstash/outputs/scacsv.rb, line 104
# Body of the watchdog thread: once a second, closes and renames the current
# file when at least `delay` seconds have passed since @lastOutputTime and at
# least one record has been written. Never returns on its own.
#
# delay - number of seconds (converted with to_i) to wait after the last
#         output before the file is closed.
def flushWatchdog(delay)
  @logger.debug("SCACSVFlushWatchdog - Last output time = #{@lastOutputTime}")

  while true
    @logger.debug("SCACSVFlushWatchdog - Time.now = #{Time.now} $lastOutputTime=#{@lastOutputTime} delay=#{delay}")

    flush_due_at = @lastOutputTime.to_i + delay.to_i
    if Time.now.to_i >= flush_due_at && @recordCount > 0
      @logger.debug("SCACSVFlushWatchdog - closeAndRenameCurrentFile")
      closeAndRenameCurrentFile
    end

    @logger.debug("SCACSVFlushWatchdog - Sleeping")
    sleep 1
  end
end
formatOutputTime( timestamp, time_field_format, timestamp_output_format, missingString )
click to toggle source
# File lib/logstash/outputs/scacsv.rb, line 236
# Renders an epoch timestamp as the string used in the output file name.
#
# timestamp               - epoch value, or nil when no time is available.
# time_field_format       - "epoch" or a java SimpleDateFormat pattern; used
#                           only when timestamp_output_format is "".
# timestamp_output_format - "epoch", "" (use time_field_format) or an explicit
#                           java SimpleDateFormat pattern.
# missingString           - placeholder returned when the timestamp is nil or
#                           cannot be formatted.
#
# Returns the formatted String. Never raises: any failure is logged through
# @logger and missingString is returned instead.
def formatOutputTime( timestamp, time_field_format, timestamp_output_format, missingString )
  if timestamp.nil?
    @logger.debug("SCACSV " + missingString + " for #{group}")
    # Return the placeholder rather than an empty string, so the file name
    # still shows which timestamp was missing.
    return missingString
  end

  # An empty output format means: use the time_field format.
  effective_format = timestamp_output_format == "" ? time_field_format : timestamp_output_format
  return timestamp.to_s if effective_format == "epoch"

  # explicit java timeformat supplied
  java.text.SimpleDateFormat.new(effective_format).format(epochAsJavaDate(timestamp))
rescue Exception => e
  @logger.error("Exception determining output file timestamp. " + missingString, :exception => e)
  missingString
end
get_value(name, event)
click to toggle source
# File lib/logstash/outputs/scacsv.rb, line 213
# Looks up one field of the event for CSV output.
#
# name  - key passed to event[].
# event - the event (anything indexable with []).
#
# Returns the field value; a Hash value is returned as its JSON string.
def get_value(name, event)
  field = event[name]
  field.is_a?(Hash) ? field.to_json : field
end
snapTimestampToInterval(timestamp,interval)
click to toggle source
# File lib/logstash/outputs/scacsv.rb, line 208
# Snaps a timestamp to the start of the interval that contains it.
#
# timestamp - the timestamp to snap.
# interval  - the interval width, in the same unit as timestamp.
#
# Returns (timestamp / interval) * interval; with Integer arguments the
# division discards the remainder, which yields the interval start.
def snapTimestampToInterval(timestamp,interval)
  whole_intervals = timestamp / interval
  whole_intervals * interval
end
teardown()
click to toggle source
# File lib/logstash/outputs/scacsv.rb, line 346
# Shuts the output down: kills the watchdog thread held in @timerThread,
# closes and renames the current file, then calls finished.
def teardown
  @logger.debug("SCACSV - Teardown: closing files")

  # Stop the watchdog first, then do the final close ourselves.
  Thread.kill(@timerThread)
  closeAndRenameCurrentFile

  finished
end
timestampAsEpochSeconds(event)
click to toggle source
# File lib/logstash/outputs/scacsv.rb, line 197
# Reads the event's time field (event[@time_field]) as epoch seconds.
#
# When no date parser (@df) is set, the field is assumed to already hold
# epoch seconds and is converted with to_i. Otherwise the field is parsed
# with @df and the resulting date's millisecond value (getTime) is reduced to
# whole seconds, so that callers always receive a number they can do interval
# arithmetic on rather than a date object.
#
# event - the event to read the time field from.
#
# Returns the timestamp as an Integer number of epoch seconds.
def timestampAsEpochSeconds(event)
  # rmck: come back and remove global refs here!
  raw_time = event[@time_field]

  # when df not set, we assume epoch seconds
  return raw_time.to_i if @df.nil?

  @df.parse(raw_time).getTime / 1000
end