class Fluent::ViaqDataModelFilter
Constants
- DOT_REPLACE_CHAR_UNUSED
- NORMAL_LEVELS
canonical level values; see github.com/ViaQ/elasticsearch-templates/blob/master/namespaces/default.yml#L63
- NUM_FIELDS_UNLIMITED
- PRIORITY_LEVELS
maps numeric journal PRIORITY values to canonical level names
Public Instance Methods
add_elasticsearch_index_name_field(tag, time, record)
# File lib/fluent/plugin/filter_viaq_data_model.rb, line 389
def add_elasticsearch_index_name_field(tag, time, record)
  found = false
  @elasticsearch_index_names.each do |ein|
    if ein.matcher.match(tag)
      found = true
      return unless ein.enabled
      if ein.name_type == :operations_full || ein.name_type == :project_full || ein.name_type == :audit_full
        field_name = @elasticsearch_index_name_field
        need_time = true
      else
        field_name = @elasticsearch_index_prefix_field
        need_time = false
      end
      case ein.name_type
      when :audit_full, :audit_prefix
        prefix = ".audit"
      when :operations_full, :operations_prefix
        prefix = ".operations"
      when :project_full, :project_prefix
        name, uuid = nil
        unless record['kubernetes'].nil?
          k8s = record['kubernetes']
          name = k8s['namespace_name']
          uuid = k8s['namespace_id']
          if name.nil?
            log.error("record cannot use elasticsearch index name type #{ein.name_type}: record is missing kubernetes.namespace_name field: #{record}")
          end
          if uuid.nil?
            log.error("record cannot use elasticsearch index name type #{ein.name_type}: record is missing kubernetes.namespace_id field: #{record}")
          end
        else
          log.error("record cannot use elasticsearch index name type #{ein.name_type}: record is missing kubernetes field: #{record}")
        end
        if name.nil? || uuid.nil?
          name = @orphaned_namespace_name
        end
        prefix = name == @orphaned_namespace_name ? @orphaned_namespace_name : "project.#{name}.#{uuid}"
      end
      if ENV['CDM_DEBUG']
        unless tag == ENV['CDM_DEBUG_IGNORE_TAG']
          log.error("prefix #{prefix} need_time #{need_time} time #{record[@dest_time_name]}")
        end
      end
      if need_time
        ts = DateTime.parse(record[@dest_time_name])
        record[field_name] = prefix + "." + ts.strftime("%Y.%m.%d")
      else
        record[field_name] = prefix
      end
      if ENV['CDM_DEBUG']
        unless tag == ENV['CDM_DEBUG_IGNORE_TAG']
          log.error("record[#{field_name}] = #{record[field_name]}")
        end
      end
      break
    end
  end
  unless found
    if ENV['CDM_DEBUG']
      unless tag == ENV['CDM_DEBUG_IGNORE_TAG']
        log.error("no match for tag #{tag}")
      end
    end
  end
end
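For orientation, a minimal standalone sketch (not part of the plugin) of how a :project_full index name is composed, assuming the destination time field is '@timestamp' and using made-up namespace values:

  require 'date'

  # Hypothetical record shaped like the one the method above expects.
  record = {
    'kubernetes' => { 'namespace_name' => 'myproject',
                      'namespace_id'   => 'aaaa-bbbb-cccc' },
    '@timestamp' => '2017-07-27T17:27:46.216527+00:00'
  }

  # :project_full composes project.<namespace_name>.<namespace_id>.YYYY.MM.DD
  ts = DateTime.parse(record['@timestamp'])
  prefix = "project.#{record['kubernetes']['namespace_name']}.#{record['kubernetes']['namespace_id']}"
  puts prefix + "." + ts.strftime("%Y.%m.%d")
  # => project.myproject.aaaa-bbbb-cccc.2017.07.27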
add_pipeline_metadata(tag, time, record)
# File lib/fluent/plugin/filter_viaq_data_model.rb, line 374
def add_pipeline_metadata(tag, time, record)
  record['pipeline_metadata'] = {} unless record.key?('pipeline_metadata')
  pipeline_type = @pipeline_type.to_s
  # this will catch the case where pipeline_type doesn't exist, or is not a Hash
  record['pipeline_metadata'][pipeline_type] = {} unless record['pipeline_metadata'][pipeline_type].respond_to?(:fetch)
  record['pipeline_metadata'][pipeline_type]['ipaddr4'] = @ipaddr4
  if @ipaddr6
    record['pipeline_metadata'][pipeline_type]['ipaddr6'] = @ipaddr6
  end
  record['pipeline_metadata'][pipeline_type]['inputname'] = 'fluent-plugin-systemd'
  record['pipeline_metadata'][pipeline_type]['name'] = 'fluentd'
  record['pipeline_metadata'][pipeline_type]['received_at'] = Time.now.utc.to_datetime.rfc3339(6)
  record['pipeline_metadata'][pipeline_type]['version'] = @pipeline_version
end
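Roughly, the record ends up carrying a pipeline_metadata hash keyed by the pipeline type. The sketch below uses placeholder values for the instance variables and assumes a pipeline_type of collector:

  require 'date'

  # Placeholder values standing in for @ipaddr4, @ipaddr6 and @pipeline_version.
  record = {}
  record['pipeline_metadata'] = {
    'collector' => {
      'ipaddr4'     => '10.0.0.1',
      'ipaddr6'     => 'fe80::1',          # only set when IPADDR6 is non-empty
      'inputname'   => 'fluent-plugin-systemd',
      'name'        => 'fluentd',
      'received_at' => Time.now.utc.to_datetime.rfc3339(6),
      'version'     => '1.7.4 1.6.0'       # FLUENTD_VERSION + ' ' + DATA_VERSION
    }
  }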
check_for_match_and_format(tag, time, record)
# File lib/fluent/plugin/filter_viaq_data_model.rb, line 350
def check_for_match_and_format(tag, time, record)
  return unless @formatters
  return if @formatter_cache_nomatch[tag]
  fmtr = @formatter_cache[tag]
  unless fmtr
    idx = @formatters.index{|fmtr| fmtr.matcher.match(tag)}
    if idx && (fmtr = @formatters[idx]).enabled
      @formatter_cache[tag] = fmtr
    else
      @formatter_cache_nomatch[tag] = true
      return
    end
  end
  fmtr.fmtr_func.call(tag, time, record, fmtr)
  if record[@dest_time_name].nil? && record['time'].nil?
    record['time'] = Time.at(time).utc.to_datetime.rfc3339(6)
  end
  if fmtr.fmtr_remove_keys
    fmtr.fmtr_remove_keys.each{|k| record.delete(k)}
  end
end
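The two per-tag caches avoid rescanning @formatters for every event: the first matching, enabled formatter is remembered per tag, and tags with no match are remembered as well. A standalone sketch of the same caching idea, with hypothetical formatter entries:

  # Standalone sketch of the per-tag formatter cache idea (hypothetical names).
  formatters = [
    { pattern: /^system\.var\.log/, name: :sys_var_log },
    { pattern: /^kubernetes\./,     name: :k8s_json_file }
  ]
  cache = {}
  nomatch = {}

  def lookup(tag, formatters, cache, nomatch)
    return nil if nomatch[tag]
    cache[tag] ||= begin
      fmtr = formatters.find { |f| f[:pattern].match(tag) }
      nomatch[tag] = true unless fmtr
      fmtr
    end
  end

  p lookup('kubernetes.var.log.containers.foo', formatters, cache, nomatch)[:name]
  # => :k8s_json_file   (subsequent calls for the same tag hit the cache)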
configure(conf)
Calls superclass method
# File lib/fluent/plugin/filter_viaq_data_model.rb, line 164
def configure(conf)
  super
  @keep_fields = {}
  @default_keep_fields.each{|xx| @keep_fields[xx] = true}
  @extra_keep_fields.each{|xx| @keep_fields[xx] = true}
  @keep_empty_fields_hash = {}
  @keep_empty_fields.each do |xx|
    @keep_empty_fields_hash[xx] = true
    @keep_fields[xx] = true
  end
  if @use_undefined && @keep_fields.key?(@undefined_name)
    raise Fluent::ConfigError, "Do not put [#{@undefined_name}] in default_keep_fields or extra_keep_fields"
  end
  if (@rename_time || @rename_time_if_missing) && @use_undefined && !@keep_fields.key?(@src_time_name)
    raise Fluent::ConfigError, "Field [#{@src_time_name}] must be listed in default_keep_fields or extra_keep_fields"
  end
  @undefined_dot_replace_char = nil if @undefined_dot_replace_char == DOT_REPLACE_CHAR_UNUSED
  if @formatters
    @formatters.each do |fmtr|
      matcher = ViaqMatchClass.new(fmtr.tag, nil)
      fmtr.instance_eval{ @params[:matcher] = matcher }
      if fmtr.remove_keys
        fmtr.instance_eval{ @params[:fmtr_remove_keys] = fmtr.remove_keys.split(',') }
      else
        fmtr.instance_eval{ @params[:fmtr_remove_keys] = nil }
      end
      case fmtr.type
      when :sys_journal, :k8s_journal
        fmtr_func = method(:process_journal_fields)
      when :sys_var_log
        fmtr_func = method(:process_sys_var_log_fields)
      when :k8s_json_file
        fmtr_func = method(:process_k8s_json_file_fields)
      end
      fmtr.instance_eval{ @params[:fmtr_func] = fmtr_func }
      proc_k8s_ev = fmtr.process_kubernetes_events.nil? ? @process_kubernetes_events : fmtr.process_kubernetes_events
      fmtr.instance_eval{ @params[:process_kubernetes_events] = proc_k8s_ev }
    end
    @formatter_cache = {}
    @formatter_cache_nomatch = {}
  end
  begin
    @docker_hostname = File.open('/etc/docker-hostname') { |f| f.readline }.rstrip
  rescue
    @docker_hostname = ENV['NODE_NAME'] || nil
  end
  @ipaddr4 = ENV['IPADDR4'] || '127.0.0.1'
  @ipaddr6 = nil
  if ENV['IPADDR6'] && ENV['IPADDR6'].length > 0
    @ipaddr6 = ENV['IPADDR6']
  end
  @pipeline_version = (ENV['FLUENTD_VERSION'] || 'unknown fluentd version') + ' ' + (ENV['DATA_VERSION'] || 'unknown data version')
  # create the elasticsearch index name tag matchers
  unless @elasticsearch_index_names.empty?
    @elasticsearch_index_names.each do |ein|
      matcher = ViaqMatchClass.new(ein.tag, nil)
      ein.instance_eval{ @params[:matcher] = matcher }
    end
  end
end
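As a small illustration of the keep-field bookkeeping above (field names made up): default_keep_fields and extra_keep_fields are merged into one lookup hash, and keep_empty_fields entries are added to it as well so they also count as defined fields:

  # Illustration only, with made-up field names.
  default_keep_fields = ['message', 'hostname', '@timestamp']
  extra_keep_fields   = ['myfield']
  keep_empty_fields   = ['docker']

  keep_fields = {}
  (default_keep_fields + extra_keep_fields).each { |f| keep_fields[f] = true }

  keep_empty_fields_hash = {}
  keep_empty_fields.each do |f|
    keep_empty_fields_hash[f] = true
    keep_fields[f] = true          # empty-but-kept fields are also "defined" fields
  end

  p keep_fields.keys
  # => ["message", "hostname", "@timestamp", "myfield", "docker"]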
delempty(thing)
Recursively delete empty fields and empty lists/hashes from thing.
# File lib/fluent/plugin/filter_viaq_data_model.rb, line 242
def delempty(thing)
  if thing.respond_to?(:delete_if)
    if thing.kind_of? Hash
      thing.delete_if{|k,v| v.nil? || isempty(delempty(v)) || isempty(v)}
    else # assume single element iterable
      thing.delete_if{|elem| elem.nil? || isempty(delempty(elem)) || isempty(elem)}
    end
  end
  thing
end
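A usage sketch: the same delempty/isempty logic (restated standalone here) prunes nested empty strings, arrays and hashes, while values such as 0 that do not respond to empty? are kept:

  # Standalone restatement for illustration.
  def isempty(thing)
    thing.respond_to?(:empty?) && thing.empty?
  end

  def delempty(thing)
    if thing.respond_to?(:delete_if)
      if thing.kind_of?(Hash)
        thing.delete_if { |_k, v| v.nil? || isempty(delempty(v)) || isempty(v) }
      else
        thing.delete_if { |elem| elem.nil? || isempty(delempty(elem)) || isempty(elem) }
      end
    end
    thing
  end

  p delempty('a' => '', 'b' => { 'c' => [], 'd' => 'keep' }, 'count' => 0, 'list' => [nil, '', 'x'])
  # => {"b"=>{"d"=>"keep"}, "count"=>0, "list"=>["x"]}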
filter(tag, time, record)
# File lib/fluent/plugin/filter_viaq_data_model.rb, line 509
def filter(tag, time, record)
  if ENV['CDM_DEBUG']
    unless tag == ENV['CDM_DEBUG_IGNORE_TAG']
      log.error("input #{time} #{tag} #{record}")
    end
  end
  check_for_match_and_format(tag, time, record)
  add_pipeline_metadata(tag, time, record)
  handle_undefined_fields(tag, time, record)
  # remove the field from record if it is not in the list of fields to keep and
  # it is empty
  record.delete_if{|k,v| !@keep_empty_fields_hash.key?(k) && (v.nil? || isempty(delempty(v)) || isempty(v))}
  # probably shouldn't remove everything . . .
  log.warn("Empty record! tag [#{tag}] time [#{time}]") if record.empty?
  # rename the time field
  if (@rename_time || @rename_time_if_missing) && record.key?(@src_time_name)
    val = record.delete(@src_time_name)
    unless @rename_time_if_missing && record.key?(@dest_time_name)
      record[@dest_time_name] = val
    end
  end
  if !@elasticsearch_index_names.empty?
    add_elasticsearch_index_name_field(tag, time, record)
  elsif ENV['CDM_DEBUG']
    unless tag == ENV['CDM_DEBUG_IGNORE_TAG']
      log.error("not adding elasticsearch index name or prefix")
    end
  end
  if ENV['CDM_DEBUG']
    unless tag == ENV['CDM_DEBUG_IGNORE_TAG']
      log.error("output #{time} #{tag} #{record}")
    end
  end
  record
end
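Putting it together, filter formats the record by tag, adds pipeline metadata, folds undefined fields, drops empty fields, renames the time field, and finally adds the Elasticsearch index name. A hypothetical before/after pair, assuming rename_time is enabled with src_time_name 'time' and dest_time_name '@timestamp':

  # Hypothetical input/output pair; values are placeholders.
  input = {
    'message' => 'hello',
    'level'   => 'info',
    'time'    => '2017-07-27T17:27:46.216527+00:00',
    'docker'  => {}                # empty and not in keep_empty_fields -> dropped
  }

  output = {
    'message'    => 'hello',
    'level'      => 'info',
    '@timestamp' => '2017-07-27T17:27:46.216527+00:00',
    'pipeline_metadata' => { 'collector' => { 'name' => 'fluentd' } }  # abbreviated
  }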
handle_undefined_fields(tag, time, record)
# File lib/fluent/plugin/filter_viaq_data_model.rb, line 475
def handle_undefined_fields(tag, time, record)
  if @undefined_to_string || @use_undefined || @undefined_dot_replace_char || (@undefined_max_num_fields > NUM_FIELDS_UNLIMITED)
    # undefined contains all of the fields not in keep_fields
    undefined_keys = record.keys - @keep_fields.keys
    return if undefined_keys.empty?
    if @undefined_max_num_fields > NUM_FIELDS_UNLIMITED && undefined_keys.length > @undefined_max_num_fields
      undefined = {}
      undefined_keys.each{|k| undefined[k] = record.delete(k)}
      record[@undefined_name] = JSON.dump(undefined)
    else
      if @use_undefined
        record[@undefined_name] = {}
        modify_hsh = record[@undefined_name]
      else
        modify_hsh = record
      end
      undefined_keys.each do |k|
        origk = k
        if @use_undefined
          modify_hsh[k] = record.delete(k)
        end
        if @undefined_dot_replace_char && k.index('.')
          newk = k.gsub('.', @undefined_dot_replace_char)
          modify_hsh[newk] = modify_hsh.delete(k)
          k = newk
        end
        if @undefined_to_string && !modify_hsh[k].is_a?(String)
          modify_hsh[k] = JSON.dump(modify_hsh[k])
        end
      end
    end
  end
end
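A sketch of the common case (use_undefined enabled, undefined_dot_replace_char set to '_', no field-count limit hit), with made-up keep_fields and record contents:

  # Illustration only: keep_fields and the record are made up.
  keep_fields = { 'message' => true, '@timestamp' => true }
  record = {
    'message'    => 'hello',
    '@timestamp' => '2017-07-27T17:27:46.216527+00:00',
    'foo'        => 'bar',
    'a.b'        => 1
  }

  undefined = {}
  (record.keys - keep_fields.keys).each do |k|
    v = record.delete(k)
    k = k.gsub('.', '_') if k.index('.')   # dots are replaced in undefined keys
    undefined[k] = v
  end
  record['undefined'] = undefined

  p record
  # => {"message"=>"hello", "@timestamp"=>"2017-07-27T17:27:46.216527+00:00",
  #     "undefined"=>{"foo"=>"bar", "a_b"=>1}}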
isempty(thing)
If thing doesn't respond to empty?, assume it isn't empty; e.g. 0.respond_to?(:empty?) == false, so the Fixnum 0 is not considered empty.
# File lib/fluent/plugin/filter_viaq_data_model.rb, line 237
def isempty(thing)
  thing.respond_to?(:empty?) && thing.empty?
end
normalize_level(level, newlevel, priority=nil)
# File lib/fluent/plugin/filter_viaq_data_model.rb, line 293
def normalize_level(level, newlevel, priority=nil)
  # if the record already has a level field, and it looks like one of our well
  # known values, convert it to the canonical normalized form - otherwise,
  # preserve the value in string format
  retlevel = nil
  if !level.nil?
    unless (retlevel = NORMAL_LEVELS[level]) ||
           (level.respond_to?(:downcase) && (retlevel = NORMAL_LEVELS[level.downcase]))
      retlevel = level.to_s # don't know what it is - just convert to string
    end
  elsif !priority.nil?
    retlevel = PRIORITY_LEVELS[priority]
  else
    retlevel = NORMAL_LEVELS[newlevel]
  end
  retlevel || 'unknown'
end
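The lookup order is: the record's own level value (matched case-insensitively against NORMAL_LEVELS), then the journal PRIORITY value via PRIORITY_LEVELS, then the newlevel default, and finally 'unknown'. A standalone sketch with a deliberately reduced mapping:

  # Reduced, illustrative mappings -- the real NORMAL_LEVELS/PRIORITY_LEVELS
  # constants cover many more spellings.
  NORMAL   = { 'err' => 'err', 'error' => 'err', 'warn' => 'warning', 'info' => 'info' }
  PRIORITY = { 3 => 'err', 4 => 'warning', 6 => 'info' }

  def lookup_level(level, newlevel, priority = nil)
    if !level.nil?
      NORMAL[level] || (level.respond_to?(:downcase) && NORMAL[level.downcase]) || level.to_s
    elsif !priority.nil?
      PRIORITY[priority] || 'unknown'
    else
      NORMAL[newlevel] || 'unknown'
    end
  end

  puts lookup_level('ERROR', nil)   # => err      (case-insensitive match)
  puts lookup_level('custom', nil)  # => custom   (unknown values pass through as strings)
  puts lookup_level(nil, nil, 6)    # => info     (journal PRIORITY fallback)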
process_k8s_json_file_fields(tag, time, record, fmtr = nil)
# File lib/fluent/plugin/filter_viaq_data_model.rb, line 328
def process_k8s_json_file_fields(tag, time, record, fmtr = nil)
  record['message'] = record['message'] || record['log']
  record['level'] = normalize_level(record['level'], nil)
  if record.key?('kubernetes') && record['kubernetes'].respond_to?(:fetch) && \
     (k8shost = record['kubernetes'].fetch('host', nil))
    record['hostname'] = k8shost
  elsif @docker_hostname
    record['hostname'] = @docker_hostname
  end
  if record[@dest_time_name].nil? # e.g. already has @timestamp
    unless record['time'].nil?
      # convert from string - parses a wide variety of formats
      rectime = Time.parse(record['time'])
    else
      # convert from time_t
      rectime = Time.at(time)
    end
    record['time'] = rectime.utc.to_datetime.rfc3339(6)
  end
  transform_eventrouter(tag, record, fmtr)
end
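For a container JSON log this means: log is promoted to message when message is absent, level is normalized, hostname comes from kubernetes.host (falling back to the docker hostname), and the string time is converted to UTC RFC 3339 with microsecond precision. The time conversion on its own looks like:

  require 'time'
  require 'date'

  # Assuming the record carries a parseable 'time' string, as in the method above.
  rectime = Time.parse('2017-07-27 13:27:46.216527-04:00')
  puts rectime.utc.to_datetime.rfc3339(6)
  # => 2017-07-27T17:27:46.216527+00:00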
process_sys_var_log_fields(tag, time, record, fmtr = nil)
# File lib/fluent/plugin/filter_viaq_data_model.rb, line 311
def process_sys_var_log_fields(tag, time, record, fmtr = nil)
  record['systemd'] = {"t" => {"PID" => record['pid']}, "u" => {"SYSLOG_IDENTIFIER" => record['ident']}}
  if record[@dest_time_name].nil? # e.g. already has @timestamp
    # handle the case where the time reported in /var/log/messages is for a previous year
    timeobj = Time.at(time)
    if timeobj > Time.now
      timeobj = Time.new((timeobj.year - 1), timeobj.month, timeobj.day, timeobj.hour, timeobj.min, timeobj.sec, timeobj.utc_offset)
    end
    record['time'] = timeobj.utc.to_datetime.rfc3339(6)
  end
  if record['host'].eql?('localhost') && @docker_hostname
    record['hostname'] = @docker_hostname
  else
    record['hostname'] = record['host']
  end
end
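Syslog timestamps in /var/log/messages carry no year, so a parsed time that lands in the future is assumed to belong to the previous year. A standalone sketch of that adjustment:

  require 'date'

  # Illustration of the year-rollback guard used above.
  timeobj = Time.at(Time.now.to_i + 3600)   # pretend the parsed syslog time is 1h in the future
  if timeobj > Time.now
    timeobj = Time.new(timeobj.year - 1, timeobj.month, timeobj.day,
                       timeobj.hour, timeobj.min, timeobj.sec, timeobj.utc_offset)
  end
  puts timeobj.utc.to_datetime.rfc3339(6)   # same month/day/time, previous year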
shutdown()
Calls superclass method
# File lib/fluent/plugin/filter_viaq_data_model.rb, line 231
def shutdown
  super
end
start()
Calls superclass method
# File lib/fluent/plugin/filter_viaq_data_model.rb, line 227
def start
  super
end
transform_eventrouter(tag, record, fmtr)
# File lib/fluent/plugin/filter_viaq_data_model.rb, line 459
def transform_eventrouter(tag, record, fmtr)
  return if fmtr.nil? || !fmtr.process_kubernetes_events
  if record.key?("event") && record["event"].respond_to?(:key?)
    if record.key?("verb")
      record["event"]["verb"] = record.delete("verb")
    end
    record["kubernetes"] = {} unless record.key?("kubernetes")
    record["kubernetes"]["event"] = record.delete("event")
    if record["kubernetes"]["event"].key?('message')
      ((record['pipeline_metadata'] ||= {})[@pipeline_type.to_s] ||= {})['original_raw_message'] = record['message']
    end
    record['message'] = record["kubernetes"]["event"].delete("message")
    record['time'] = record["kubernetes"]["event"]["metadata"].delete("creationTimestamp")
  end
end
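For eventrouter records this moves the top-level event under kubernetes.event, preserves the original raw message under pipeline_metadata, and promotes the event's message and creationTimestamp. A hypothetical before/after pair, trimmed to the fields touched here and assuming a pipeline_type of collector:

  # Hypothetical eventrouter record, trimmed to the fields touched above.
  before = {
    'message' => '<raw eventrouter json>',
    'verb'    => 'ADDED',
    'event'   => {
      'message'  => 'Successfully assigned foo to node1',
      'metadata' => { 'creationTimestamp' => '2017-07-27T17:27:46Z' }
    }
  }

  after = {
    'message' => 'Successfully assigned foo to node1',
    'time'    => '2017-07-27T17:27:46Z',
    'kubernetes' => {
      'event' => { 'verb' => 'ADDED', 'metadata' => {} }
    },
    'pipeline_metadata' => {
      'collector' => { 'original_raw_message' => '<raw eventrouter json>' }
    }
  }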