class Fluent::Plugin::ExecFilterOutput
Constants
- COMPAT_EXTRACT_PARAMS
- COMPAT_FORMAT_PARAMS
- COMPAT_INJECT_PARAMS
- COMPAT_PARSE_PARAMS
- ExecutedProcess
- KEYS_FOR_IN_AND_OUT
- NEWLINE
Attributes
formatter[R]
parser[R]
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
Fluent::Compat::Output#configure
# File lib/fluent/plugin/out_exec_filter.rb, line 130
# Validates the configuration and derives the runtime settings of the plugin.
#
# Legacy parameters are converted first, then the superclass is configured.
# Afterwards the formatter and parser are created, the tag prefix strings are
# precomputed and +child_respawn+ is translated into the integer @respawns
# (0 = never, -1 = unlimited, N = the given number).
#
# @param conf the configuration element of this plugin
# @raise [Fluent::ConfigError] when neither 'tag' nor '<extract> tag_key' is
#   set, or when child_respawn is not one of the accepted values
def configure(conf)
  exec_filter_compat_parameters_convert!(conf)
  compat_parameters_convert(conf, :buffer)

  # When a section specifies time_format but no time_type, default the
  # time_type to 'string'. Applied to <inject> first, then <extract>.
  %w[inject extract].each do |section_name|
    section = conf.elements(section_name).first
    section['time_type'] ||= 'string' if section && section.has_key?('time_format')
  end

  super

  if !@tag && (!@extract_config || !@extract_config.tag_key)
    raise Fluent::ConfigError, "'tag' or '<extract> tag_key </extract>' option is required on exec_filter output"
  end

  @formatter = formatter_create
  @parser = parser_create

  if @remove_prefix
    @removed_prefix_string = @remove_prefix + '.'
    @removed_length = @removed_prefix_string.length
  end
  @added_prefix_string = @add_prefix + '.' if @add_prefix

  @respawns =
    case @child_respawn
    when nil, 'none', '0'
      0
    when 'inf', '-1'
      -1
    when /\A\d+\z/
      # \A and \z anchor the whole value; ^ and $ would accept any multi-line
      # string that merely contains one all-digit line.
      @child_respawn.to_i
    else
      raise Fluent::ConfigError, "child_respawn option argument invalid: none(or 0), inf(or -1) or positive number"
    end

  @suppress_error_log_interval ||= 0
  @next_log_time = Time.now.to_i
end
exec_filter_compat_parameters_convert!(conf)
click to toggle source
# File lib/fluent/plugin/out_exec_filter.rb, line 116
# Converts legacy flat parameters in +conf+ (modified in place).
#
# For every key of KEYS_FOR_IN_AND_OUT present in +conf+, its value is copied
# onto each of the keys mapped to it. The compat parameters are then copied
# into the 'inject', 'format', 'parse' and 'extract' subsections.
#
# @param conf the plugin configuration element
def exec_filter_compat_parameters_convert!(conf)
  KEYS_FOR_IN_AND_OUT.each_pair do |shared_key, target_keys|
    next unless conf.has_key?(shared_key)

    target_keys.each { |target_key| conf[target_key] = conf[shared_key] }
  end

  exec_filter_compat_parameters_copy_to_subsection!(conf, 'inject', COMPAT_INJECT_PARAMS)
  exec_filter_compat_parameters_copy_to_subsection!(conf, 'format', COMPAT_FORMAT_PARAMS)
  exec_filter_compat_parameters_copy_to_subsection!(conf, 'parse', COMPAT_PARSE_PARAMS)
  exec_filter_compat_parameters_copy_to_subsection!(conf, 'extract', COMPAT_EXTRACT_PARAMS)
end
exec_filter_compat_parameters_copy_to_subsection!(conf, subsection_name, params)
click to toggle source
# File lib/fluent/plugin/out_exec_filter.rb, line 106
# Appends a new +subsection_name+ subsection to +conf+, built from legacy
# flat parameters.
#
# Nothing happens when +conf+ already has such a subsection, or when none of
# the legacy keys in +params+ is present in +conf+.
#
# @param conf the plugin configuration element (modified in place)
# @param subsection_name [String] name of the subsection to create
# @param params [Hash] maps a legacy key to the key used inside the subsection
def exec_filter_compat_parameters_copy_to_subsection!(conf, subsection_name, params)
  return unless conf.elements(subsection_name).empty?

  copied = params.each_with_object({}) do |(compat, current), attrs|
    attrs[current] = conf[compat] if conf.has_key?(compat)
  end
  # An empty result means no legacy key was present at all.
  return if copied.empty?

  conf.elements << Fluent::Config::Element.new(subsection_name, '', copied, [])
end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_exec_filter.rb, line 263
# Serializes one event with the configured formatter.
#
# The tag is passed through tag_remove_prefix and the record through
# inject_values_to_record before formatting. For a :text_per_line formatter
# the output is chomped and terminated with NEWLINE; any other formatter's
# output is returned untouched.
#
# @param tag [String] event tag
# @param time event time
# @param record event record
# @return the formatted event
def format(tag, time, record)
  stripped_tag = tag_remove_prefix(tag)
  injected = inject_values_to_record(stripped_tag, time, record)

  per_line = @formatter.formatter_type == :text_per_line
  formatted = @formatter.format(stripped_tag, time, injected)
  return formatted unless per_line

  formatted.chomp + NEWLINE
end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/out_exec_filter.rb, line 176
# Reports whether this plugin is ready for multi-worker use.
#
# @return [true] always
def multi_workers_ready?
  true
end
on_record(time, record)
click to toggle source
# File lib/fluent/plugin/out_exec_filter.rb, line 304
# Emits one parsed record through the router.
#
# The tag comes from the record (prefixed with @added_prefix_string when
# @add_prefix is set) and falls back to @tag. A missing time is taken from the
# record, or else the current event time. If anything raises, the error is
# logged (at most once per @suppress_error_log_interval seconds when that is
# non-zero) and the event is routed as an error event when tag, time and
# record are all available.
#
# @param time event time, or nil to derive it
# @param record parsed record
def on_record(time, record)
  # The sequential assignments are deliberate: the rescue clause below reads
  # whatever +tag+ and +time+ hold at the moment of failure.
  tag = extract_tag_from_record(record)
  if tag && @add_prefix
    tag = @added_prefix_string + tag
  end
  tag = @tag unless tag
  time ||= extract_time_from_record(record) || Fluent::EventTime.now
  router.emit(tag, time, record)
rescue => error
  log_due = @suppress_error_log_interval == 0 || Time.now.to_i > @next_log_time
  if log_due
    log.error "exec_filter failed to emit", record: Yajl.dump(record), error: error
    log.error_backtrace error.backtrace
    @next_log_time = Time.now.to_i + @suppress_error_log_interval
  end
  if tag && time && record
    router.emit_error_event(tag, time, record, error)
  end
end
run(io)
click to toggle source
# File lib/fluent/plugin/out_exec_filter.rb, line 286
# Reads +io+ as binary data and feeds every parsed record to on_record.
#
# The reading strategy depends on what the parser offers, checked in this
# order: parse_io (parser consumes the IO itself), parse_partial_data (blocks
# of @read_block_size until EOF), a :text_per_line parser (one chomped line at
# a time), otherwise the whole stream is read and parsed at once.
#
# @param io [IO] stream to read from
def run(io)
  io.set_encoding(Encoding::ASCII_8BIT)
  emit = method(:on_record)

  if @parser.implement?(:parse_io)
    @parser.parse_io(io, &emit)
  elsif @parser.implement?(:parse_partial_data)
    @parser.parse_partial_data(io.readpartial(@read_block_size), &emit) until io.eof?
  elsif @parser.parser_type == :text_per_line
    io.each_line { |line| @parser.parse(line.chomp, &emit) }
  else
    @parser.parse(io.read, &emit)
  end
end
start()
click to toggle source
Calls superclass method
Fluent::Compat::Output#start
# File lib/fluent/plugin/out_exec_filter.rb, line 182 def start super @children_mutex = Mutex.new @children = [] @rr = 0 exit_callback = ->(status){ c = @children.find{|child| child.pid == status.pid } if c unless self.stopped? log.warn "child process exits with error code", code: status.to_i, status: status.exitstatus, signal: status.termsig end c.mutex.synchronize do (c.writeio && c.writeio.close) rescue nil (c.readio && c.readio.close) rescue nil c.pid = c.readio = c.writeio = nil end end } child_process_callback = ->(index, readio, writeio){ pid = child_process_id c = @children[index] writeio.sync = true c.mutex.synchronize do c.pid = pid c.respawns = @respawns c.readio = readio c.writeio = writeio end run(readio) } execute_child_process = ->(index){ child_process_execute("out_exec_filter_child#{index}".to_sym, @command, on_exit_callback: exit_callback) do |readio, writeio| child_process_callback.call(index, readio, writeio) end } @children_mutex.synchronize do @num_children.times do |i| @children << ExecutedProcess.new(Mutex.new, nil, 0, nil, nil) execute_child_process.call(i) end end if @respawns != 0 thread_create(:out_exec_filter_respawn_monitor) do while thread_current_running? @children.each_with_index do |c, i| if c.mutex && c.mutex.synchronize{ c.pid.nil? && c.respawns != 0 } respawns = c.mutex.synchronize do c.respawns -= 1 if c.respawns > 0 c.respawns end log.info "respawning child process", num: i, respawn_counter: respawns execute_child_process.call(i) end end sleep 0.2 end end end end
tag_remove_prefix(tag)
click to toggle source
# File lib/fluent/plugin/out_exec_filter.rb, line 252
# Strips the configured prefix (including its trailing dot) from +tag+.
#
# The tag is returned unchanged when @remove_prefix is not set or when the
# tag does not begin with @removed_prefix_string.
#
# @param tag [String] event tag
# @return [String] the tag without the prefix, or the tag itself
def tag_remove_prefix(tag)
  return tag unless @remove_prefix

  prefixed = tag[0, @removed_length] == @removed_prefix_string
  strippable = (prefixed && tag.length > @removed_length) || tag == @removed_prefix_string
  return tag unless strippable

  tag[@removed_length..-1] || ''
end
terminate()
click to toggle source
Calls superclass method
Fluent::Plugin::Output#terminate
# File lib/fluent/plugin/out_exec_filter.rb, line 247
# Replaces the list of child process entries with a fresh empty array, then
# hands over to the superclass.
def terminate
  @children = []

  super
end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_exec_filter.rb, line 273
# Writes +chunk+ to the next usable child process, chosen round-robin.
#
# Starting after the child used last (@rr), each child is tried at most once.
# A child is usable when it has both a pid and a write IO.
#
# @param chunk buffer chunk; must respond to write_to(io)
# @return [nil]
# @raise [RuntimeError] when no child is usable, including when there are no
#   children at all
def write(chunk)
  children = @children
  child_count = children.length

  # With zero children this loop does not run, so the method raises the
  # RuntimeError below rather than a ZeroDivisionError from the modulo.
  child_count.times do
    r = @rr = (@rr + 1) % child_count
    child = children[r]
    writeio = child.pid && child.writeio
    next unless writeio

    chunk.write_to(writeio)
    return
  end

  raise "no healthy child processes exist"
end