class Fluent::MongoKpiOutput

Constants

DAY
HOUR
MIN
UNIT_DAYS
UNIT_HOURS
UNIT_MINUTES

Attributes

collections_opts[R]
connection_opts[R]

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_mongokpi.rb, line 41
# Instantiates the output plugin, then loads the libraries it depends on
# (date, mongo, msgpack) at construction time rather than at file load.
def initialize
  super
  %w[date mongo msgpack].each { |library| require library }
end

Public Instance Methods

configure(conf) click to toggle source

This method is called before starting. `conf` is a Hash that includes the configuration parameters. If the configuration is invalid, raise Fluent::ConfigError.

Calls superclass method
# File lib/fluent/plugin/out_mongokpi.rb, line 51
# Normalizes the configured KPI unit and derives the option hashes that are
# later handed to the MongoDB driver.
#
# conf - configuration, passed through to the superclass.
#
# Sets:
# * @kpi_unit         - the result of get_kpi_unit(@kpi_unit).
# * @connection_opts  - :w, :name, :read, :refresh_mode and :refresh_interval,
#                       keeping only the ones that are non-nil.
# * @collections_opts - { :capped => true, :size, [:max] } when @capped_size is
#                       positive, otherwise { :capped => false }.
def configure(conf)
  super
  @kpi_unit = get_kpi_unit(@kpi_unit)

  # Only options that were explicitly set (non-nil) are forwarded.
  @connection_opts = {
    :w                => @write_concern,
    :name             => @name,
    :read             => @read,
    :refresh_mode     => @refresh_mode,
    :refresh_interval => @refresh_interval
  }.reject { |_option, value| value.nil? }

  @collections_opts =
    if @capped_size > 0
      capped = { :capped => true, :size => @capped_size }
      capped[:max] = @capped_max if @capped_max
      capped
    else
      { :capped => false }
    end
end
convert_collection_name(collection_name, yyyymmdd) click to toggle source
# File lib/fluent/plugin/out_mongokpi.rb, line 148
# Substitutes the first literal "yyyymmdd" placeholder in +collection_name+
# with the string form of +yyyymmdd+,
# e.g. ("kpi_yyyymmdd", 20140101) -> "kpi_20140101".
# Names without the placeholder are returned unchanged.
def convert_collection_name(collection_name, yyyymmdd)
  date_stamp = yyyymmdd.to_s
  collection_name.sub('yyyymmdd', date_stamp)
end
count_up(kpi_type, kpi_unit, doc, record, count_name, time) click to toggle source
# File lib/fluent/plugin/out_mongokpi.rb, line 224
# Adds one event to an aggregated KPI document (as built by get_doc) and
# returns that same, mutated document.
#
# Always increments +count_name+. For the 'access' KPI type it also:
# * counts the event in 'countOver' when its response time (field
#   @f_response_time, 0 when the field is absent) is >= @response_threshold,
# * counts it in count1xx..count5xx by the status code (@f_code) divided by
#   100; other classes are ignored,
# * folds the response time into responseTimeMax / responseTimeMin /
#   responseTimeSum,
# * for the MIN unit, increments counter[second of +time+].
def count_up(kpi_type, kpi_unit, doc, record, count_name, time)
  doc[count_name] += 1
  return doc unless 'access' == kpi_type

  elapsed = record.key?(@f_response_time) ? record[@f_response_time].to_i : 0
  doc['countOver'] += 1 if elapsed >= @response_threshold

  status_field = {
    1 => 'count1xx',
    2 => 'count2xx',
    3 => 'count3xx',
    4 => 'count4xx',
    5 => 'count5xx'
  }[record[@f_code].to_i / 100]
  doc[status_field] += 1 unless status_field.nil?

  doc['responseTimeMax'] = elapsed if doc['responseTimeMax'] < elapsed
  doc['responseTimeMin'] = elapsed if doc['responseTimeMin'] > elapsed
  doc['responseTimeSum'] += elapsed
  doc['counter'][time.strftime('%S').to_i] += 1 if MIN == kpi_unit
  doc
end
format(tag, time, record) click to toggle source
This method is called when an event arrives.
It converts the event to a raw string, for example:

def format(tag, time, record)

[tag, time, record].to_json + "\n"

end

Optionally, you can use to_msgpack to serialize the object:

def format(tag, time, record)

[tag, time, record].to_msgpack

end

# File lib/fluent/plugin/out_mongokpi.rb, line 101
# Serializes one event as a msgpack-encoded [tag, time, record] triple.
# get_insert_doc_hash reads these triples back via chunk.msgpack_each.
def format(tag, time, record)
  event = [tag, time, record]
  event.to_msgpack
end
get_client(address, connection_opts) click to toggle source
# File lib/fluent/plugin/out_mongokpi.rb, line 122
# Opens a MongoDB client for +address+.
#
# address         - "host[:port]" for a single server, or a comma-separated
#                   list of "host:port" seeds for a replica set (surrounding
#                   whitespace around each seed is ignored).
# connection_opts - driver options (e.g. :w, :read) forwarded to the client.
#
# Returns a Mongo::MongoReplicaSetClient or Mongo::MongoClient. On
# Mongo::ConnectionFailure / Mongo::OperationFailure it logs a fatal message
# and terminates the process with exit!.
def get_client(address, connection_opts)
  if address.include?(',')
    Mongo::MongoReplicaSetClient.new(address.split(',').map(&:strip), connection_opts)
  else
    # Port stays nil when the address has no ":port" part.
    host, port = address.split(':', 2)
    # Bug fix: previously passed the capped-collection options
    # (collections_opts) here, silently dropping write concern, read
    # preference and the other connection options.
    Mongo::MongoClient.new(host, port, connection_opts)
  end
rescue Mongo::ConnectionFailure => e
  $log.fatal "Failed to connect to 'mongod'. Please restart 'fluentd' after 'mongod' started: #{e}"
  exit!
rescue Mongo::OperationFailure => e
  $log.fatal "Operation failed. Probably, 'mongod' needs an authentication: #{e}"
  exit!
end
get_collection(collection_name, yyyymmdd) click to toggle source
# File lib/fluent/plugin/out_mongokpi.rb, line 139
# Returns the collection for the day +yyyymmdd+ (name built by
# convert_collection_name). The cached @current_collection is reused while
# its name matches; otherwise a new one is fetched through
# get_collection_from_db and the switch is logged.
def get_collection(collection_name, yyyymmdd)
  wanted = convert_collection_name(collection_name, yyyymmdd)
  stale = @current_collection.nil? || @current_collection.name != wanted
  return @current_collection unless stale

  $log.info "Start using collection: #{wanted}"
  @current_collection = get_collection_from_db(@client, @db, wanted, @collections_opts)
end
get_collection_from_db(client, db_name, collection_name, collections_opts) click to toggle source
# File lib/fluent/plugin/out_mongokpi.rb, line 152
# Returns collection +collection_name+ of database +db_name+ from +client+,
# passing +collections_opts+ (e.g. capped-collection settings) to the driver.
#
# Bug fix: the +collections_opts+ argument was previously ignored in favour
# of @collections_opts; the argument is now honoured, as the signature implies.
def get_collection_from_db(client, db_name, collection_name, collections_opts)
  client.db(db_name).collection(collection_name, collections_opts)
end
get_doc(kpi_type, kpi_unit, count_key, count_key_value, count_name, time) click to toggle source
# File lib/fluent/plugin/out_mongokpi.rb, line 189
# Builds an empty aggregation document for one (count key value, time slot).
#
# The document holds +count_key+ => +count_key_value+, a 'time_field_hash'
# with the slot's yyyymmdd (plus hh for HOUR, hh and mm for MIN), and
# +count_name+ => 0. The 'access' KPI type also gets zeroed status-code and
# response-time fields; with the MIN unit it additionally gets qps* fields
# and a 60-slot per-second 'counter'. Min-tracking fields start at 100000000
# so that the first observed value replaces them.
def get_doc(kpi_type, kpi_unit, count_key, count_key_value, count_name, time)
  time_fields = { 'yyyymmdd' => time.strftime('%Y%m%d').to_i }
  if HOUR == kpi_unit
    time_fields['hh'] = time.strftime('%H').to_i
  elsif MIN == kpi_unit
    time_fields.update('hh' => time.strftime('%H').to_i, 'mm' => time.strftime('%M').to_i)
  end

  doc = { count_key => count_key_value, 'time_field_hash' => time_fields, count_name => 0 }
  return doc unless 'access' == kpi_type

  doc.update(
    'countOver'       => 0,
    'count1xx'        => 0,
    'count2xx'        => 0,
    'count3xx'        => 0,
    'count4xx'        => 0,
    'count5xx'        => 0,
    'responseTimeAve' => 0,
    'responseTimeMax' => 0,
    'responseTimeMin' => 100000000,
    'responseTimeSum' => 0,
    'okRatio'         => 0.0
  )
  if MIN == kpi_unit
    doc.update(
      'qpsAve'  => 0,
      'qpsMax'  => 0,
      'qpsMin'  => 100000000,
      'counter' => Array.new(60, 0)
    )
  end
  doc
end
get_insert_doc_hash(kpi_type, kpi_unit, time_key, time_format, count_key, count_name, chunk) click to toggle source
# File lib/fluent/plugin/out_mongokpi.rb, line 156
# Aggregates the events of a buffer chunk into KPI documents keyed by
# "<count key value><time slot>" (slot from get_time_key_value), plus one
# "total<time slot>" document per slot.
#
# The event time is the event timestamp (Time.at) when +time_key+ is nil;
# otherwise record[time_key] is parsed with DateTime.strptime(+time_format+)
# when a format is given, or DateTime.parse when it is not. When +count_key+
# is not 'none', its comma-separated field names select the record values
# that are concatenated into the grouping value.
#
# NOTE(review): the values are concatenated without a separator, so e.g.
# 'a' + 'bc' and 'ab' + 'c' end up in the same document. Confirm this is
# acceptable; changing it would change the stored _id / key values.
#
# Returns a Hash of { key => aggregated doc }.
def get_insert_doc_hash(kpi_type, kpi_unit, time_key, time_format, count_key, count_name, chunk)
  aggregated = {}
  # Creates the document for +label+'s time slot on first use, then counts the event.
  accumulate = lambda do |label, record, event_time|
    slot_key = label + get_time_key_value(kpi_unit, event_time)
    doc = aggregated.fetch(slot_key) { get_doc(kpi_type, kpi_unit, count_key, label, count_name, event_time) }
    aggregated[slot_key] = count_up(kpi_type, kpi_unit, doc, record, count_name, event_time)
  end

  chunk.msgpack_each do |_tag, time, record|
    $log.debug record
    event_time =
      if time_key.nil?
        Time.at(time)
      elsif time_format.nil?
        DateTime.parse(record[time_key])
      else
        DateTime.strptime(record[time_key], time_format)
      end

    unless 'none' == count_key
      label = count_key.split(',').inject('') { |joined, field| joined + record[field].to_s }
      accumulate.call(label, record, event_time)
    end
    accumulate.call('total', record, event_time)
  end
  aggregated
end
get_kpi_unit(kpi_unit) click to toggle source
# File lib/fluent/plugin/out_mongokpi.rb, line 70
# Maps a configured unit name to DAY or HOUR when it matches (via ===) an
# entry of UNIT_DAYS or UNIT_HOURS, respectively; anything else means MIN.
def get_kpi_unit(kpi_unit)
  case kpi_unit
  when *UNIT_DAYS  then DAY
  when *UNIT_HOURS then HOUR
  else MIN
  end
end
get_time_key_value(kpi_unit, time) click to toggle source
# File lib/fluent/plugin/out_mongokpi.rb, line 181
# Formats +time+ as the slot suffix used in aggregation keys: yyyymmdd for
# DAY, yyyymmddHH for HOUR, and yyyymmddHHMM for any other unit.
def get_time_key_value(kpi_unit, time)
  pattern =
    case kpi_unit
    when DAY  then '%Y%m%d'
    when HOUR then '%Y%m%d%H'
    else           '%Y%m%d%H%M'
    end
  time.strftime(pattern)
end
insert(kpi_type, kpi_unit, collection_name, count_key, count_name, doc_hash) click to toggle source

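Written for MongoDB 2.5 or earlier, which has no atomic operator for updating a field to a maximum or minimum value (see stackoverflow.com/questions/8508663/calculate-max-value-in-an-atomic-findandmodify-operation). TODO: improve this for MongoDB 2.6 using the $min and $max field update operators (jira.mongodb.org/browse/SERVER-1534, jira.mongodb.org/browse/DOCS-2012).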

# File lib/fluent/plugin/out_mongokpi.rb, line 256
# Persists the aggregated KPI documents produced by get_insert_doc_hash.
#
# kpi_type        - 'access' enables the response-time / status-code fields;
#                   any other value only increments +count_name+.
# kpi_unit        - aggregation unit; MIN additionally maintains the
#                   per-second 'counter' array and the qps* fields.
# collection_name - collection name template passed to get_collection.
# count_key       - name of the grouping field stored in the selector.
# count_name      - name of the counter field.
# doc_hash        - { document _id => aggregated doc }.
#
# Each document is upserted with $inc. For 'access' KPIs, the stored document
# is then read back to derive the average, min/max and ok ratio, which are
# written with $set. That read-modify-write is NOT atomic, so the derived
# fields are not reliable under highly distributed processing. It exists
# because MongoDB 2.5 and earlier lack atomic $min/$max update operators.
# TODO: use $min/$max (MongoDB 2.6+).
def insert(kpi_type, kpi_unit, collection_name, count_key, count_name, doc_hash)
  doc_hash.each do |key, doc|
    $log.debug "key: #{key}, doc: #{doc}"
    collection = get_collection(collection_name, doc['time_field_hash']['yyyymmdd'])
    # Copy the time fields so building the selector does not mutate +doc+.
    select_hash = doc['time_field_hash'].dup
    select_hash['_id'] = key
    select_hash[count_key] = doc[count_key]

    unless 'access' == kpi_type
      collection.update(
        select_hash,
        {'$inc' => {count_name => doc[count_name]}},
        {:upsert => true}
      )
      next
    end

    if MIN == kpi_unit
      # Initialize the target doc first. Without this, "0" in "counter.0" can
      # be regarded as a child element, not as an array element.
      collection.update(
        select_hash,
        {'$setOnInsert' => {'counter' => Array.new(60, 0)}},
        {:upsert => true}
      )
    end

    # Main update: atomically add this batch's counts.
    increment_hash = {count_name => doc[count_name]}
    %w[countOver count1xx count2xx count3xx count4xx count5xx].each do |field|
      increment_hash[field] = doc[field]
    end
    if MIN == kpi_unit
      60.times { |sec| increment_hash["counter.#{sec}"] = doc['counter'][sec] }
    end
    increment_hash['responseTimeSum'] = doc['responseTimeSum']
    collection.update(
      select_hash,
      {'$inc' => increment_hash},
      {:upsert => true}
    )

    # Supplemental fields derived from the stored document (not atomic).
    # find returns a cursor even when nothing matches, so test the fetched
    # document itself. (Previously this checked the cursor for nil and then
    # called `continue`, which is not a Ruby keyword.)
    updated_doc = collection.find({'_id' => key}).to_a.first
    if updated_doc.nil?
      $log.info "there is no updated result for the key: #{key}"
      next
    end

    set_hash = {}
    set_hash['responseTimeAve'] = updated_doc['responseTimeSum'] / updated_doc[count_name]
    if !updated_doc['responseTimeMax'].nil? && updated_doc['responseTimeMax'] > doc['responseTimeMax']
      set_hash['responseTimeMax'] = updated_doc['responseTimeMax']
    else
      set_hash['responseTimeMax'] = doc['responseTimeMax']
    end
    if !updated_doc['responseTimeMin'].nil? && updated_doc['responseTimeMin'] < doc['responseTimeMin']
      set_hash['responseTimeMin'] = updated_doc['responseTimeMin']
    else
      set_hash['responseTimeMin'] = doc['responseTimeMin']
    end
    set_hash['okRatio'] = ((updated_doc[count_name] - updated_doc['countOver']).to_f / updated_doc[count_name]).round(4)
    if MIN == kpi_unit
      per_second = updated_doc['counter']
      set_hash['qpsAve'] = (per_second.inject(0.0) { |total, n| total + n } / per_second.size).round
      set_hash['qpsMax'] = per_second.max
      set_hash['qpsMin'] = per_second.min
    end
    collection.update(
      select_hash,
      {'$set' => set_hash}
    )
  end
end
shutdown() click to toggle source

This method is called when shutting down. Shut down threads and close sockets or files here.

Calls superclass method
# File lib/fluent/plugin/out_mongokpi.rb, line 87
# Closes the MongoDB connection opened in start, then runs the superclass
# shutdown hook.
#
# The superclass hook always runs: when no client was created (@client is
# nil) the close is skipped, and if closing raises, the error propagates only
# after super has run.
def shutdown
  @client.db.connection.close unless @client.nil?
ensure
  super
end
start() click to toggle source

This method is called when starting. Open sockets or files here.

Calls superclass method
# File lib/fluent/plugin/out_mongokpi.rb, line 80
# Runs the superclass start hook, then opens the MongoDB client for
# @address with the driver options prepared in configure (@connection_opts).
def start
  super
  address = @address
  options = @connection_opts
  @client = get_client(address, options)
end
write(chunk) click to toggle source
This method is called every flush interval. Write the buffer chunk to files or databases here.
'chunk' is a buffer chunk that includes multiple formatted events.
You can use 'data = chunk.read' to get all events and 'chunk.open {|io| ... }' to get an IO object, for example:

def write(chunk)

data = chunk.read
print data

end

Optionally, you can use chunk.msgpack_each to deserialize objects:

def write(chunk)

chunk.msgpack_each {|(tag,time,record)|
}

end

# File lib/fluent/plugin/out_mongokpi.rb, line 117
# Flush hook: aggregates the chunk's events into KPI documents
# (get_insert_doc_hash) and upserts them into the configured collection
# (insert).
def write(chunk)
  aggregated = get_insert_doc_hash(
    @kpi_type, @kpi_unit, @time_key, @time_format, @count_key, @count_name, chunk
  )
  insert(@kpi_type, @kpi_unit, @collection, @count_key, @count_name, aggregated)
end