class Fluent::SensuOutput

Fluentd output plugin to send checks to sensu-client.

Constants

CHECK_NAME_PATTERN

Pattern for check names.

CRITICAL_PATTERN

Pattern for CRITICAL status.

OK_PATTERN

Pattern for OK status.

UNKNOWN_PATTERN

Pattern for UNKNOWN status.

WARNING_PATTERN

Pattern for WARNING status.

Public Class Methods

new() click to toggle source

Load modules.

Calls superclass method
# File lib/fluent/plugin/out_sensu.rb, line 95
def initialize
  super
  require 'json'
end

Private Class Methods

normalize_status(status_val) click to toggle source

Converts variations of status values to an integer.

# File lib/fluent/plugin/out_sensu.rb, line 284
def self.normalize_status(status_val)
  status_str = status_val.to_s
  return 0 if status_str =~ OK_PATTERN
  return 1 if status_str =~ WARNING_PATTERN
  return 2 if status_str =~ CRITICAL_PATTERN
  return 3 if status_str =~ UNKNOWN_PATTERN
  return nil
end

Public Instance Methods

configure(conf) click to toggle source

Read and validate the configuration.

Calls superclass method
# File lib/fluent/plugin/out_sensu.rb, line 102
def configure(conf)
  super
  reject_invalid_check_name
  reject_invalid_flapping_thresholds
end
format(tag, time, record) click to toggle source

Pack the tuple (tag, time, record).

# File lib/fluent/plugin/out_sensu.rb, line 145
def format(tag, time, record)
  [tag, time, record].to_msgpack
end
to_json(data) click to toggle source

Make a json from the data.

# File lib/fluent/plugin/out_sensu.rb, line 199
def to_json(data)
  raw_json = data.to_json.force_encoding(Encoding::BINARY)
  regex = /[\xc0-\xdf][\x80-\xbf]
          |[\xe0-\xef][\x80-\xbf]{2}
          |[\xf0-\xf7][\x80-\xbf]{3}/nx
  return raw_json.gsub(regex) { |ch|
    ch.unpack('U*').pack('n*').unpack('H*')[0].gsub(/.+/, %q(\\\\u\&))
  }
end
write(chunk) click to toggle source

Send a check result for each data.

# File lib/fluent/plugin/out_sensu.rb, line 151
def write(chunk)
  chunk.msgpack_each { |(tag, time, record)|
    payload = make_check_result(tag, time, record)
    send_check(@server, @port, payload)
  }
end

Private Instance Methods

add_attribute_if_present(check_result, name, value) click to toggle source

Adds an attribute to the check result if present.

# File lib/fluent/plugin/out_sensu.rb, line 211
def add_attribute_if_present(check_result, name, value)
  check_result[name] = value if value
end
determine_check_name(tag, record) click to toggle source

Determines “name” attribute of a check.

# File lib/fluent/plugin/out_sensu.rb, line 217
def determine_check_name(tag, record)
  # Field specified by check_name_field option
  if @check_name_field
    check_name = record[@check_name_field]
    return check_name if check_name =~ CHECK_NAME_PATTERN
    log.warn('Invalid check name in the field.' +
             ' Fallback to check_name option, tag,' +
             ' or constant "fluent-plugin-sensu".',
             :tag => tag,
             :check_name_field => @check_name_field,
             :value => check_name)
    # Fall through
  end

  # check_name option
  return @check_name if @check_name

  # Tag
  return tag if tag =~ CHECK_NAME_PATTERN
  log.warn('Invalid check name in the tag.' +
           'Fallback to the constant "fluent-plugin-sensu".',
           :tag => tag)

  # Default value
  return 'fluent-plugin-sensu'
end
determine_executed_time(tag, time, record) click to toggle source

Determines “executed” attribute of a check.

# File lib/fluent/plugin/out_sensu.rb, line 310
def determine_executed_time(tag, time, record)
  # Read the field
  if @check_executed_field
    executed = record[@check_executed_field]
    return executed if executed.is_a?(Integer)
    log.warn('the field for "executed" attribute is absent',
            :tag => tag, :check_executed_field => @check_executed_field)
  end

  # Default to the time of Fluentd data
  return time
end
determine_output(tag, record) click to toggle source

Determines “output” attribute of a check.

# File lib/fluent/plugin/out_sensu.rb, line 246
def determine_output(tag, record)
  # Read from the field
  if @check_output_field
    check_output = record[@check_output_field]
    return check_output if check_output
    log.warn('the field for "output" attribute is absent',
            :tag => tag, :check_output_field => @check_output_field)
    # Fall through
  end

  # Returns the option value
  return @check_output if @check_output

  # Default to JSON notation of the record
  return record.to_json
end
determine_source(tag, record) click to toggle source

Determines “source” attribute of a check.

# File lib/fluent/plugin/out_sensu.rb, line 295
def determine_source(tag, record)
  # Read the field
  if @check_source_field
    source = record[@check_source_field]
    return source if source
    log.warn('the field for "source" attribute is absent',
            :tag => tag, :check_source_field => @check_source_field)
  end

  # Default to "check_source" option or N/A
  return @check_source
end
determine_status(tag, record) click to toggle source

Determines “status” attribute of a check.

# File lib/fluent/plugin/out_sensu.rb, line 265
def determine_status(tag, record)
  # Read from the field
  if @check_status_field
    status_field_val = record[@check_status_field]
    if status_field_val
      check_status = SensuOutput.normalize_status(status_field_val)
      return check_status if check_status
    end
    log.warn('the field for "status" attribute is invalid',
            :tag => tag, :check_status_field => @check_status_field,
            :value => status_field_val)
  end

  # Returns the default
  return @check_status
end
make_check_result(tag, time, record) click to toggle source

Make a check result for the Fluentd data.

# File lib/fluent/plugin/out_sensu.rb, line 160
def make_check_result(tag, time, record)
  check_result = {
    'name' => determine_check_name(tag, record),
    'output' => determine_output(tag, record),
    'status' => determine_status(tag, record),
    'type' => @check_type,
    'handlers' => @check_handlers,
    'executed' => determine_executed_time(tag, time, record),
    'fluentd' => {
      'tag' => tag,
      'time' => time.to_i,
      'record' => record,
    },
  }
  add_attribute_if_present(
    check_result, 'ttl', @check_ttl)
  add_attribute_if_present(
    check_result, 'low_flap_threshold', @check_low_flap_threshold)
  add_attribute_if_present(
    check_result, 'high_flap_threshold', @check_high_flap_threshold)
  add_attribute_if_present(
    check_result, 'source', determine_source(tag, record))
  return check_result
end
reject_invalid_check_name() click to toggle source

Reject check_name option if invalid

# File lib/fluent/plugin/out_sensu.rb, line 110
def reject_invalid_check_name
  if @check_name && @check_name !~ CHECK_NAME_PATTERN
    raise ConfigError,
      "check_name must be a string consisting of one or more" +
      " ASCII alphanumerics, underscores, periods, and hyphens"
  end
end
reject_invalid_flapping_thresholds() click to toggle source

Reject invalid check_low_flap_threshold and check_high_flap_threshold

# File lib/fluent/plugin/out_sensu.rb, line 120
def reject_invalid_flapping_thresholds
  # Thresholds must be specified together or not specified at all.
  only_one_threshold_is_specified =
    @check_low_flap_threshold.nil? ^ @check_high_flap_threshold.nil?
  if only_one_threshold_is_specified
    raise ConfigError,
      "'check_low_flap_threshold' and 'check_high_flap_threshold'" +
      " must be specified togher, or not specified at all."
  end

  # Check 0 <= check_low_flap_threshold <= check_high_flap_threshold <= 100
  if @check_low_flap_threshold
    in_order = 0 <= @check_low_flap_threshold &&
      @check_low_flap_threshold <= @check_high_flap_threshold &&
      @check_high_flap_threshold <= 100
    if not in_order
      raise ConfigError,
        "the following condition must be true:" +
        " 0 <= check_low_flap_threshold <= check_high_flap_threshold <= 100"
    end
  end
end
send_check(server, port, check_result) click to toggle source

Send a check result to sensu-client.

# File lib/fluent/plugin/out_sensu.rb, line 187
def send_check(server, port, check_result)
  json = to_json(check_result)
  sensu_client = TCPSocket.open(@server, @port)
  begin
    sensu_client.puts(json)
  ensure
    sensu_client.close
  end
end