class Fluent::CephInput
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_ceph.rb, line 10 def initialize super end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_ceph.rb, line 22 def configure(conf) super @commands = Hash.new if @arguments.include? "," @arguments.split(",").each do |argument| @commands[argument.strip] = "#{@ceph_path} #{argument.strip} --format json" end else @commands[@arguments.strip] = "#{@ceph_path} #{@arguments.strip} --format json" end @hostname = `#{@hostname_command}`.chomp! end
emit_message()
click to toggle source
# File lib/fluent/plugin/in_ceph.rb, line 35 def emit_message @output = Hash.new @pids = Array.new @commands.each do |argument, command| io = IO.popen(command, "r") pid = io.pid json = io.read.strip @output[argument] = json && json.length >= 2 ? JSON.parse(json) : nil Process.detach(pid) Process.kill(:TERM, pid) @pids.push pid end @output['eventtime'] = DateTime.parse(Time.now.to_s).strftime("%d/%m/%Y %H:%M:%S") router.emit(@tag.dup, Engine.now, @output) end
restart()
click to toggle source
# File lib/fluent/plugin/in_ceph.rb, line 77 def restart @pids.each { |pid| Process.detach(@pid) Process.kill(:TERM, @pid) } @tw.detach @tw = TimerWatcher.new(@delay, true, &method(:emit_message)) @tw.attach(@loop) end
run()
click to toggle source
# File lib/fluent/plugin/in_ceph.rb, line 68 def run begin @loop.run rescue $log.error "unexpected error", :error=>$!.to_s $log.error_backtrace end end
shutdown()
click to toggle source
# File lib/fluent/plugin/in_ceph.rb, line 58 def shutdown @pids.each { |pid| Process.detach(pid) Process.kill(:TERM, pid) } @tw.detach @loop.stop @thread.join end
start()
click to toggle source
# File lib/fluent/plugin/in_ceph.rb, line 51 def start @loop = Coolio::Loop.new @tw = TimerWatcher.new(@granularity, true, &method(:emit_message)) @tw.attach(@loop) @thread = Thread.new(&method(:run)) end