class LogStash::Filters::Augment
This filter will allow you to augment events in logstash from an external file source
Public Instance Methods
filter(event)
click to toggle source
# File lib/logstash/filters/augment.rb, line 157 def filter(event) load_or_refresh_dictionaries(false) return unless event.include?(@field) # Skip translation in case event does not have @event field. begin #If source field is array use first value and make sure source value is string source = event.get(@field).is_a?(Array) ? event.get(@field).first.to_s : event.get(@field).to_s row = lock_for_read { @dictionary[source] } if !row row = @default end return unless row # nothing to do if there's nothing to add if @only_fields only_fields.each { |k| event.set("#{@target}[#{k}]",row[v]) if row[v] } else row.each { |k,v| event.set("#{@target}[#{k}]",v) unless @exclude_keys[k] } end filter_matched(event) rescue Exception => e @logger.error("Something went wrong when attempting to augment from dictionary", :exception => e, :field => @field, :event => event) end end
register()
click to toggle source
# File lib/logstash/filters/augment.rb, line 113 def register @fileModifiedTime = Hash.new rw_lock = java.util.concurrent.locks.ReentrantReadWriteLock.new @read_lock = rw_lock.readLock @write_lock = rw_lock.writeLock if !@dictionary @dictionary = Hash.new end @dictionaries = @dictionary_path.nil? ? nil : (@dictionary_path.is_a?(Array) ? @dictionary_path : [ @dictionary_path ]) if @dictionary_path && !@dictionary.empty? raise LogStash::ConfigurationError, "The configuration options 'dictionary' and 'dictionary_path' are mutually exclusive" end if @csv_ignore_first_line && !@csv_header raise LogStash::ConfigurationError, "The parameter csv_header is required if csv_ignore_first_line = true" end load_or_refresh_dictionaries(true) @exclude_keys = Hash.new if @ignore_fields @ignore_fields.each { |k| @exclude_keys[k]=true } end # validate the dictionary is in the right format if @dictionary newdic = Hash.new @dictionary.each do |key,val| if val.is_a?(Array) newdic[key] = Hash[*val] elsif val.is_a?(Hash) newdic[key] = val else raise LogStash::ConfigurationError, "The dictionary must be a hash of string to dictionary. "+key+" is neither a "+val.class.to_s end end @dictionary = newdic end @logger.debug? and @logger.debug("#{self.class.name}: Dictionary - ", :dictionary => @dictionary) end
Private Instance Methods
cleanup_data(filename, tree, key_if_array, key_type, remove_key)
click to toggle source
# File lib/logstash/filters/augment.rb, line 223 def cleanup_data(filename, tree, key_if_array, key_type, remove_key) if tree.is_a?(Array) if !key_if_array raise LogStash::ConfigurationError, "The #{filename} file is an array, but #{key_type}_key is not set" end newTree = Hash.new tree.each do |v| newTree[v[key_if_array].to_s] = v if remove_key v.delete(key_if_array) end end tree = newTree end newTree = Hash.new tree.each { |k,v| newTree[k.to_s] = v if (v.is_a?(Object))} tree = newTree return tree end
load_csv(filename, raise_exception=false)
click to toggle source
# File lib/logstash/filters/augment.rb, line 255 def load_csv(filename, raise_exception=false) if !@initialized @initialized = true if @csv_first_line == 'auto' if @csv_header @csv_first_line = 'data' else @csv_first_line = 'header' end end if @csv_first_line == 'header' && @csv_header raise LogStash::ConfigurationError, "The csv_first_line is set to 'header' but csv_header is set" end if @csv_first_line == 'ignore' && !csv_header raise LogStash::ConfigurationError, "The csv_first_line is set to 'ignore' but csv_header is not set" end end csv_lines = CSV.read(filename,{ :col_sep => @csv_col_sep, :quote_char => @csv_quote_char }); if @csv_first_line == 'header' @csv_header = csv_lines.shift elsif @csv_first_line == 'ignore' csv_lines.shift end if @csv_key.nil? @csv_key = @csv_header[0]; end data = Hash.new csv_lines.each do |line| o = Hash.new line.zip(@csv_header).each do |value, header| o[header] = value end key = o[csv_key] if @csv_remove_key o.delete(csv_key) end data[key] = o end merge_dictionary!(filename, data) end
load_dictionary(filename, raise_exception=false)
click to toggle source
# File lib/logstash/filters/augment.rb, line 201 def load_dictionary(filename, raise_exception=false) if !File.exists?(filename) if raise_exception raise "Dictionary #{filename} does not exist" else @logger.warn("Dictionary #{filename} does not exist") return end end if @dictionary_type == 'yaml' || @dictionary_type == 'yml' || (@dictionary_type == 'auto' && /.y[a]?ml$/.match(filename)) load_yaml(filename,raise_exception) elsif @dictionary_type == 'json' || (@dictionary_type == 'auto' && filename.end_with?(".json")) load_json(filename,raise_exception) elsif @dictionary_type == 'csv' || (@dictionary_type == 'auto' && filename.end_with?(".csv")) load_csv(filename,raise_exception) else raise "#{self.class.name}: Dictionary #{filename} format not recognized from filename or dictionary_type" end rescue => e loading_exception(e, raise_exception) end
load_json(filename, raise_exception=false)
click to toggle source
# File lib/logstash/filters/augment.rb, line 249 def load_json(filename, raise_exception=false) json = JSON.parse(File.read(filename)) json = cleanup_data(filename, json, @json_key, 'json', @json_remove_key) merge_dictionary!(filename, json) end
load_or_refresh_dictionaries(raise_exception=false)
click to toggle source
# File lib/logstash/filters/augment.rb, line 319 def load_or_refresh_dictionaries(raise_exception=false) if ! @dictionaries return end if @refresh_interval < 0 && @dictionary_mtime # don't refresh if we aren't supposed to return end if (@next_refresh && @next_refresh > Time.now) return end lock_for_write do if ! @dictionary_mtime @dictionary_mtime = Hash.new end if (@next_refresh && @next_refresh > Time.now) return end @logger.info("checking for modified dictionary files") @dictionaries.each { |filename| refresh_dictionary(filename,raise_exception) } @next_refresh = Time.now + @refresh_interval end end
load_yaml(filename, raise_exception=false)
click to toggle source
# File lib/logstash/filters/augment.rb, line 243 def load_yaml(filename, raise_exception=false) yaml = YAML.load_file(filename) yaml = cleanup_data(filename, yaml, @yaml_key, 'yaml', @yaml_remove_key) merge_dictionary!(filename, yaml) end
loading_exception(e, raise_exception=false)
click to toggle source
# File lib/logstash/filters/augment.rb, line 301 def loading_exception(e, raise_exception=false) msg = "#{self.class.name}: #{e.message} when loading dictionary file" if raise_exception raise RuntimeError.new(msg) else @logger.warn("#{msg}, continuing with old dictionary") end end
lock_for_read() { || ... }
click to toggle source
# File lib/logstash/filters/augment.rb, line 184 def lock_for_read @read_lock.lock begin yield ensure @read_lock.unlock end end
lock_for_write() { || ... }
click to toggle source
# File lib/logstash/filters/augment.rb, line 193 def lock_for_write @write_lock.lock begin yield ensure @write_lock.unlock end end
merge_dictionary!(filename, data)
click to toggle source
# File lib/logstash/filters/augment.rb, line 296 def merge_dictionary!(filename, data) @logger.debug("Merging data from #{filename} = #{data}") @dictionary.merge!(data) end
refresh_dictionary(filename, raise_exception)
click to toggle source
# File lib/logstash/filters/augment.rb, line 310 def refresh_dictionary(filename, raise_exception) mtime = File.mtime(filename) if ! @dictionary_mtime[filename] || @dictionary_mtime[filename] != mtime @dictionary_mtime[filename] = mtime @logger.info("file #{filename} has been modified, reloading") load_dictionary(filename, raise_exception) end end