class Fluent::ZabbixAgentInput

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_zabbix_agent.rb, line 28
def initialize
  super
  require 'socket'
  require 'zabbix_protocol'
  require 'json'
  require 'open-uri'
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_zabbix_agent.rb, line 36
def configure(conf)
  super

  if @items.nil? and @items_file.nil?
    raise Fluent::ConfigError, 'One of "items" or "items_file" is required'
  elsif @items and @items_file
    raise Fluent::ConfigError, %!It isn't possible to specify both of items" and "items_file"!
  end

  if @items_file
    @items = {}

    if @items_file =~ %r{\A(https?|ftp)://}
      file = open(@items_file, &:read)
      json = JSON.load(file)
      @items.update(json) if json
    else
      Dir.glob(@items_file) do |path|
        file = File.read(path)
        json = JSON.load(file)
        @items.update(json) if json
      end
    end
  end

  if @items.empty? and not @allow_items_empty
    raise Fluent::ConfigError, '"items" or "items_file" is empty'
  end

  @items.keys.each do |key|
    value = @items[key]
    @items[key] = key if value.nil?
  end

  if @include_hostname
    @extra.update(@hostname_key => hostname)
  end
end
shutdown() click to toggle source
# File lib/fluent/plugin/in_zabbix_agent.rb, line 84
def shutdown
  @loop.watchers.each(&:detach)
  @loop.stop

  # XXX: Comment out for exit soon. Is it OK?
  #@thread.join
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_zabbix_agent.rb, line 75
def start
  super

  @loop = Coolio::Loop.new
  timer = TimerWatcher.new(@interval, true, log, &method(:fetch_items))
  @loop.attach(timer)
  @thread = Thread.new(&method(:run))
end

Private Instance Methods

emit_items(value_by_item) click to toggle source
# File lib/fluent/plugin/in_zabbix_agent.rb, line 137
def emit_items(value_by_item)
  if @bulk
    records = value_by_item.map do |key, value|
      {key => value}
    end

    bulk_record = records.inject({}) {|r, i| r.merge(i) }
    router.emit(@tag, Fluent::Engine.now, bulk_record.merge(extra))
  else
    records = value_by_item.map do |key, value|
      if key.is_a?(Hash)
        key.merge(@item_value_key => value)
      else
        {@item_key_key => key, @item_value_key => value}
      end
    end

    records.each do |rcrd|
      router.emit(@tag, Fluent::Engine.now, rcrd.merge(extra))
    end
  end
end
fetch_items() click to toggle source
# File lib/fluent/plugin/in_zabbix_agent.rb, line 101
def fetch_items
  value_by_item = {}

  @items.each do |key, record_key|
    begin
      value = zabbix_get(key)

      if value =~ /\AZBX_(NOTSUPPORTED|ERROR)\x00/
        log.warn("#{key}: #{value}")
      else
        value_by_item[record_key] = value
      end
    rescue => e
      log.warn("#{key}: #{e.message}")
      log.warn_backtrace(e.backtrace)
    end
  end

  unless value_by_item.empty?
    emit_items(value_by_item)
  end
end
hostname() click to toggle source
# File lib/fluent/plugin/in_zabbix_agent.rb, line 160
def hostname
  `hostname`.strip
end
run() click to toggle source
# File lib/fluent/plugin/in_zabbix_agent.rb, line 94
def run
  @loop.run
rescue => e
  log.error(e.message)
  log.error_backtrace(e.backtrace)
end
zabbix_get(key) click to toggle source
# File lib/fluent/plugin/in_zabbix_agent.rb, line 124
def zabbix_get(key)
  value = nil

  TCPSocket.open(@agent_host, @agent_port) do |sock|
    sock.setsockopt(Socket::SOL_SOCKET,Socket::SO_REUSEADDR, true)
    sock.write ZabbixProtocol.dump(key + "\n")
    sock.close_write
    value = ZabbixProtocol.load(sock.read)
  end

  value
end