# File lib/fluent/plugin/in_exec.rb, line 21 def initialize super require 'fluent/plugin/exec_util' require 'fluent/timezone' end
# File lib/fluent/plugin/in_exec.rb, line 48 def configure(conf) super if localtime = conf['localtime'] @localtime = true elsif utc = conf['utc'] @localtime = false end if conf['timezone'] @timezone = conf['timezone'] Fluent::Timezone.validate!(@timezone) end if !@tag && !@tag_key raise ConfigError, "'tag' or 'tag_key' option is required on exec input" end if @time_key if @time_format f = @time_format @time_parse_proc = Proc.new {|str| Time.strptime(str, f).to_i } else @time_parse_proc = Proc.new {|str| str.to_i } end end case @format when :tsv if @keys.empty? raise ConfigError, "keys option is required on exec input for tsv format" end @parser = ExecUtil::TSVParser.new(@keys, method(:on_message)) when :json @parser = ExecUtil::JSONParser.new(method(:on_message)) when :msgpack @parser = ExecUtil::MessagePackParser.new(method(:on_message)) end end
# File lib/fluent/plugin/in_exec.rb, line 120 def run @parser.call(@io) end
# File lib/fluent/plugin/in_exec.rb, line 124 def run_periodic until @finished begin sleep @run_interval io = IO.popen(@command, "r") @parser.call(io) Process.waitpid(io.pid) rescue log.error "exec failed to run or shutdown child process", :error => $!.to_s, :error_class => $!.class.to_s log.warn_backtrace $!.backtrace end end end
# File lib/fluent/plugin/in_exec.rb, line 99 def shutdown if @run_interval @finished = true @thread.join else begin Process.kill(:TERM, @pid) rescue #Errno::ECHILD, Errno::ESRCH, Errno::EPERM end if @thread.join(60) # TODO wait time return end begin Process.kill(:KILL, @pid) rescue #Errno::ECHILD, Errno::ESRCH, Errno::EPERM end @thread.join end end
# File lib/fluent/plugin/in_exec.rb, line 88 def start if @run_interval @finished = false @thread = Thread.new(&method(:run_periodic)) else @io = IO.popen(@command, "r") @pid = @io.pid @thread = Thread.new(&method(:run)) end end
# File lib/fluent/plugin/in_exec.rb, line 140 def on_message(record) if val = record.delete(@tag_key) tag = val else tag = @tag end if val = record.delete(@time_key) time = @time_parse_proc.call(val) else time = Engine.now end router.emit(tag, time, record) rescue => e log.error "exec failed to emit", :error => e.to_s, :error_class => e.class.to_s, :tag => tag, :record => Yajl.dump(record) end