class Fluent::CephInput

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_ceph.rb, line 10
# Builds the plugin instance. No plugin-specific state is set up here; the
# bare `super` simply forwards to the superclass initializer.
def initialize
  super
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_ceph.rb, line 22
# Prepares the plugin from its configuration.
#
# After delegating to the superclass, builds @commands: a hash mapping each
# (stripped) entry of the comma-separated @arguments string to the shell
# command "<@ceph_path> <argument> --format json". Finally runs
# @hostname_command and stores its output, minus a trailing newline, in
# @hostname.
#
# conf - the configuration object, passed through to the superclass.
def configure(conf)
  super
  @commands = {}
  # A value without a comma is treated as a single argument (this also keeps
  # the historical behaviour for an empty string, where split would yield []).
  arguments = @arguments.include?(",") ? @arguments.split(",") : [@arguments]
  arguments.each do |argument|
    name = argument.strip
    @commands[name] = "#{@ceph_path} #{name} --format json"
  end
  # Use chomp, not chomp!: the bang variant returns nil when there is no
  # trailing newline to remove, which would leave @hostname nil.
  @hostname = `#{@hostname_command}`.chomp
end
emit_message() click to toggle source
# File lib/fluent/plugin/in_ceph.rb, line 35
# Runs every configured command, collects the parsed JSON output of each and
# emits a single record through the router.
#
# The record (@output) maps each key of @commands to the parsed JSON its
# command printed, or nil when the output is shorter than two characters,
# is not valid JSON, or the command could not be started. An 'eventtime'
# entry formatted as "dd/mm/YYYY HH:MM:SS" is added before emitting.
#
# @pids holds the pid of a command only while that command is running.
def emit_message
  @output = {}
  @pids = []
  @commands.each do |argument, command|
    begin
      # The block form closes the pipe and waits for the child, so no file
      # descriptor or zombie process is left behind and no signal has to be
      # sent to a process that has already finished.
      json = IO.popen(command, "r") do |io|
        pid = io.pid
        @pids.push(pid)
        begin
          io.read.strip
        ensure
          @pids.delete(pid)
        end
      end
      @output[argument] = json.length >= 2 ? JSON.parse(json) : nil
    rescue JSON::ParserError, SystemCallError => e
      # One failing command must not abort the whole emit (an exception
      # escaping from here would propagate out of the timer callback).
      $log.warn "ceph command failed", :command => command, :error => e.to_s
      @output[argument] = nil
    end
  end
  @output['eventtime'] = Time.now.strftime("%d/%m/%Y %H:%M:%S")
  router.emit(@tag.dup, Engine.now, @output)
end
restart() click to toggle source
# File lib/fluent/plugin/in_ceph.rb, line 77
# Terminates any command processes recorded in @pids, then replaces the
# current timer with a fresh repeating one (interval @delay) that calls
# emit_message, attached to the same event loop.
#
# NOTE(review): the interval here is @delay, not @granularity; whether that
# is intentional cannot be determined from this method alone — confirm.
def restart
  # Array() tolerates @pids being nil when no emit has happened yet.
  Array(@pids).each do |pid|
    begin
      Process.kill(:TERM, pid)
      # Reap the child in the background so it does not linger as a zombie.
      Process.detach(pid)
    rescue Errno::ESRCH, Errno::ECHILD
      # The process has already exited; nothing left to stop.
    end
  end
  @tw.detach
  @tw = TimerWatcher.new(@delay, true, &method(:emit_message))
  @tw.attach(@loop)
end
run() click to toggle source
# File lib/fluent/plugin/in_ceph.rb, line 68
# Thread body: drives the event loop until it stops. Any StandardError that
# escapes the loop is logged (message plus backtrace) instead of being
# re-raised.
def run
  @loop.run
rescue => e
  $log.error "unexpected error", :error => e.to_s
  $log.error_backtrace
end
shutdown() click to toggle source
# File lib/fluent/plugin/in_ceph.rb, line 58
# Stops the plugin: terminates any command processes recorded in @pids,
# detaches the timer, stops the event loop and waits for the worker thread
# to finish.
def shutdown
  # Array() tolerates @pids being nil, which is the case when shutdown is
  # requested before emit_message has ever run.
  Array(@pids).each do |pid|
    begin
      Process.kill(:TERM, pid)
      # Reap the child in the background so it does not linger as a zombie.
      Process.detach(pid)
    rescue Errno::ESRCH, Errno::ECHILD
      # Already exited; must not prevent the loop and thread teardown below.
    end
  end
  @tw.detach
  @loop.stop
  @thread.join
end
start() click to toggle source
# File lib/fluent/plugin/in_ceph.rb, line 51
# Starts the plugin: creates a new Coolio event loop, attaches a repeating
# timer (interval @granularity) whose callback is emit_message, and spawns
# the thread that executes #run. Returns that thread.
def start
  @loop = Coolio::Loop.new
  on_tick = method(:emit_message)
  @tw = TimerWatcher.new(@granularity, true, &on_tick)
  @tw.attach(@loop)
  worker = method(:run)
  @thread = Thread.new(&worker)
end