class LogStash::Inputs::CSVFile
Subclass of logstash-input-file that parses CSV lines, with support for first-line schemas. Set first_line_defines_columns => true to enable this behavior. Statically defined columns are also supported, a la logstash-filter-csv, via the columns param. first_line_defines_columns => true takes precedence, though.
Since multiple files may be being read by the same plugin instance, and each can have a distinct schema, this plugin records a schema per source file (as defined by the event's path attribute) in a hash. When it receives an event for a file it doesn't know, it reads/parses that file's first line to obtain the schema. This method supports resuming processing after logstash restarts in mid-file.
I considered extending logstash-filter-csv to do this, but felt that the only reliable way to support streaming csv reads was to explicitly read the schema from the file's schema row (and cache it so subsequent row performance for that file is good). Since we cannot count on a logstash filter having read-access to the file, or even on it processing events that originate from files, I rejected this approach. By definition, a file input plugin must have read-access to the file it's sourcing data from.
This plugin borrows most of its csv parsing logic from logstash-filter-csv.
This plugin extends logstash-input-file by overriding its decorate method. Note that logstash-input-plugin 0.0.10, released with Logstash 1.5, doesn't set the event's path element before calling decorate (which this plugin requires), so gemspec insists on logstash-input-file 1.1.0
Public Instance Methods
# File lib/logstash/inputs/csvfile.rb, line 187
# Records the column schema parsed from a source file's first line and
# marks it as freshly used so age-based cache scrubbing won't evict it.
def addSchemaToCache(path, schema)
  @fileColumns.store(path, schema)
  touchSchema(path)
end
# File lib/logstash/inputs/csvfile.rb, line 94
# Overrides logstash-input-file's decorate hook: parses the event's csv
# "message" into per-column event attributes. Column names come either from
# the source file's first line (first_line_defines_columns mode) or from the
# statically configured columns param. Parse errors tag "_csvparsefailure".
def decorate(event)
  super(event)

  message = event["message"]
  return if !message

  begin
    values = CSV.parse_line(message, :col_sep => @separator, :quote_char => @quote_char)
    # BUG FIX: CSV.parse_line returns nil for an empty line; calling .length
    # on nil raised NoMethodError and mis-tagged blank rows as parse failures.
    return if values.nil? || values.length == 0

    # Get names for the columns.
    if @first_line_defines_columns
      @logger.debug? && @logger.debug("handling csv in first_line_defines_columns mode", :message => message, :columns => @columns)
      cols = getSchemaForFile(event, values)
    else
      @logger.debug? && @logger.debug("handling csv in explicitly defined columns mode", :message => message, :columns => @columns)
      cols = @columns
    end

    # Determine where to write the new attributes
    if @target.nil?
      # Default is to write to the root of the event.
      dest = event
    else
      dest = event[@target] ||= {}
    end

    # Add the per-column attributes (as long as this isn't the event from the schema defining row).
    # Columns beyond the known schema get synthetic names: column1, column2, ...
    if !event["_csvmetadata"]
      values.each_index do |i|
        field_name = cols[i] || "column#{i+1}"
        dest[field_name] = values[i]
      end
    end
  rescue => e
    event.tag "_csvparsefailure"
    @logger.warn("Trouble parsing csv", :message => message, :exception => e)
    return
  end # begin
end
# File lib/logstash/inputs/csvfile.rb, line 183
# Looks up the previously recorded column schema for a source file path.
# Returns nil when no schema has been cached for that path.
def getCachedSchemaForFile(path)
  @fileColumns.fetch(path, nil)
end
# File lib/logstash/inputs/csvfile.rb, line 136
# Resolves the column-name schema for the file this event came from (via the
# event's "path"): serves it from the per-file cache when known, otherwise
# reads and parses the file's schema row and caches it. Returns [] when the
# event has no path, no usable schema row exists, or the event IS the schema
# row (which is additionally tagged with "_csvmetadata").
def getSchemaForFile(event, parsedValues)
  path = event["path"]
  if !path
    @logger.warn("No path in event. Cannot retrieve a schema for this event.")
    return []
  end

  @logger.debug? && @logger.debug("Getting schema for file", :path => path)

  schema = getCachedSchemaForFile(path)
  if schema
    @logger.debug? && @logger.debug("Using cached schema", :cols => schema)
    event["_schemacachetelemetry"]="cachedEntryUsed" if @add_schema_cache_telemetry_to_event
    touchSchema(path)
    return schema
  end

  @logger.debug? && @logger.debug("Event from unknown file/schema. Reading schema from that file.", :path => path)
  # Age out stale cache entries before adding a new one (0 disables scrubbing).
  scrubSchemaCache(event) if @max_cached_schema_age_hours > 0

  csvFileLine = readSchemaLineFromFile(path)
  if !csvFileLine || csvFileLine.length == 0
    @logger.warn("No suitable schema row found in file.", :path => path)
    return []
  end

  schema = CSV.parse_line(csvFileLine, :col_sep => @separator, :quote_char => @quote_char)
  addSchemaToCache(path, schema)
  @logger.debug? && @logger.debug("Schema read from file:", :path => path, :cols => schema)

  if @add_schema_cache_telemetry_to_event
    event["_schemacachetelemetry"]="newEntryCreated"
    event["_cache_touch_time"]=Time.now
  end

  # Special handling for the schema row event: tag _csvmetadata and don't return individual column attributes
  if @fileColumns[path].join == parsedValues.join
    # BUG FIX: this debug call referenced an undefined local `message`,
    # raising NameError whenever debug logging was enabled for the schema row.
    @logger.debug? && @logger.debug("Received the schema row event. Tagging w/ _csvmetadata", :message => event["message"])
    event["_csvmetadata"] = true
    return []
  else
    return schema
  end
end
# File lib/logstash/inputs/csvfile.rb, line 196
# Reads the schema candidate line from the file at path: the first non-empty
# line which, when @schema_pattern_to_match is configured, must also be
# newline-terminated (fully written) and match the pattern. Returns nil when
# the file runs out of lines before a candidate is found.
def readSchemaLineFromFile(path)
  candidate = ""
  File.open(path, "r") do |f|
    while candidate.empty? && (candidate = f.gets)
      next unless @schema_pattern_to_match
      # Discard incomplete or non-matching lines and keep scanning.
      unless candidate.end_with?("\n") && candidate.match(@schema_pattern_to_match)
        candidate = ""
      end
    end
  end
  candidate
end
# File lib/logstash/inputs/csvfile.rb, line 86
# Plugin lifecycle hook: initializes the per-file schema cache and its
# last-used timestamps, then delegates to logstash-input-file's register.
# Warns when age-based cache scrubbing is disabled, since the cache then
# only ever grows.
def register
  @fileColumns = {}
  @schemaTouchedTimes = {}
  super()
  if @max_cached_schema_age_hours <= 0
    @logger.warn("schema cache scrubbing disabled. Memory use will grow over time.")
  end
end
# File lib/logstash/inputs/csvfile.rb, line 210
# Evicts schema cache entries that haven't been used within
# @max_cached_schema_age_hours. When telemetry is enabled, records the
# cache size before and after the scrub on the triggering event.
def scrubSchemaCache(event)
  @logger.debug? && @logger.debug("Scrubbing schema cache", :size => @fileColumns.length)
  event["_schemacachetelemetryscrubbedbeforecount"] = @fileColumns.length if @add_schema_cache_telemetry_to_event

  max_age_seconds = @max_cached_schema_age_hours * 60 * 60
  now = Time.now

  # Collect expired paths first; mutating the hash mid-iteration is unsafe.
  stale = @schemaTouchedTimes.select do |filename, lastReadTime|
    expired = (lastReadTime + max_age_seconds) < now
    @logger.debug? && @logger.debug("Expiring schema for: ", :file => filename, :lastRead => lastReadTime) if expired
    expired
  end.keys

  stale.each do |filename|
    @fileColumns.delete(filename)
    @schemaTouchedTimes.delete(filename)
    @logger.debug? && @logger.debug("Deleted schema for: ", :file => filename)
  end

  event["_schemacachetelemetryscrubbedaftercount"] = @fileColumns.length if @add_schema_cache_telemetry_to_event
  @logger.debug? && @logger.debug("Done scrubbing schema cache", :size => @fileColumns.length)
end
# File lib/logstash/inputs/csvfile.rb, line 192
# Marks the schema for path as used right now, deferring its eviction
# by the age-based cache scrubber.
def touchSchema(path)
  @schemaTouchedTimes.store(path, Time.now)
end