class Fluent::MongoKpiOutput
Constants
- DAY
- HOUR
- MIN
- UNIT_DAYS
- UNIT_HOURS
- UNIT_MINUTES
Attributes
collections_opts[R]
connection_opts[R]
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_mongokpi.rb, line 41 def initialize super require 'date' require 'mongo' require 'msgpack' end
Public Instance Methods
configure(conf)
click to toggle source
This method is called before starting. ‘conf’ is a Hash that includes configuration parameters. If the configuration is invalid, raise Fluent::ConfigError.
Calls superclass method
# File lib/fluent/plugin/out_mongokpi.rb, line 51 def configure(conf) super @kpi_unit = get_kpi_unit(@kpi_unit) @connection_opts = {} @connection_opts[:w] = @write_concern unless @write_concern.nil? @connection_opts[:name] = @name unless @name.nil? @connection_opts[:read] = @read unless @read.nil? @connection_opts[:refresh_mode] = @refresh_mode unless @refresh_mode.nil? @connection_opts[:refresh_interval] = @refresh_interval unless @refresh_interval.nil? @collections_opts = {} if @capped_size > 0 @collections_opts[:capped] = true @collections_opts[:size] = @capped_size @collections_opts[:max] = @capped_max if @capped_max else @collections_opts[:capped] = false end end
convert_collection_name(collection_name, yyyymmdd)
click to toggle source
# File lib/fluent/plugin/out_mongokpi.rb, line 148 def convert_collection_name(collection_name, yyyymmdd) return collection_name.sub('yyyymmdd', yyyymmdd.to_s) end
count_up(kpi_type, kpi_unit, doc, record, count_name, time)
click to toggle source
# File lib/fluent/plugin/out_mongokpi.rb, line 224 def count_up(kpi_type, kpi_unit, doc, record, count_name, time) doc[count_name] += 1 if 'access' == kpi_type response_time = record.key?(@f_response_time) ? record[@f_response_time].to_i : 0 if response_time >= @response_threshold doc['countOver'] += 1 end case record[@f_code].to_i / 100 when 1 then doc['count1xx'] += 1 when 2 then doc['count2xx'] += 1 when 3 then doc['count3xx'] += 1 when 4 then doc['count4xx'] += 1 when 5 then doc['count5xx'] += 1 end if doc['responseTimeMax'] < response_time doc['responseTimeMax'] = response_time end if doc['responseTimeMin'] > response_time doc['responseTimeMin'] = response_time end doc['responseTimeSum'] += response_time doc['counter'][time.strftime('%S').to_i] += 1 if MIN == kpi_unit end return doc end
format(tag, time, record)
click to toggle source
This method is called when an event is reached. Convert event to a raw string.
def format(tag, time, record)
[tag, time, record].to_json + "\n"
end
optionally, you can use to_msgpack to serialize the object.
def format(tag, time, record)
[tag, time, record].to_msgpack
end
# File lib/fluent/plugin/out_mongokpi.rb, line 101 def format(tag, time, record) [tag, time, record].to_msgpack end
get_client(address, connection_opts)
click to toggle source
# File lib/fluent/plugin/out_mongokpi.rb, line 122 def get_client(address, connection_opts) begin if address.include?(',') return Mongo::MongoReplicaSetClient.new(address.split(','), connection_opts) else host_port = address.split(':', 2) return Mongo::MongoClient.new(host_port[0], host_port[1], collections_opts) end rescue Mongo::ConnectionFailure => e $log.fatal "Failed to connect to 'mongod'. Please restart 'fluentd' after 'mongod' started: #{e}" exit! rescue Mongo::OperationFailure => e $log.fatal "Operation failed. Probably, 'mongod' needs an authentication: #{e}" exit! end end
get_collection(collection_name, yyyymmdd)
click to toggle source
# File lib/fluent/plugin/out_mongokpi.rb, line 139 def get_collection(collection_name, yyyymmdd) converted_collection_name = convert_collection_name(collection_name, yyyymmdd) if @current_collection.nil? || @current_collection.name != converted_collection_name $log.info "Start using collection: #{converted_collection_name}" @current_collection = get_collection_from_db(@client, @db, converted_collection_name, @collections_opts) end return @current_collection end
get_collection_from_db(client, db_name, collection_name, collections_opts)
click to toggle source
# File lib/fluent/plugin/out_mongokpi.rb, line 152 def get_collection_from_db(client, db_name, collection_name, collections_opts) return client.db(db_name).collection(collection_name, @collections_opts) end
get_doc(kpi_type, kpi_unit, count_key, count_key_value, count_name, time)
click to toggle source
# File lib/fluent/plugin/out_mongokpi.rb, line 189 def get_doc(kpi_type, kpi_unit, count_key, count_key_value, count_name, time) doc = {} doc[count_key] = count_key_value time_field_hash = {} time_field_hash['yyyymmdd'] = time.strftime('%Y%m%d').to_i if HOUR == kpi_unit time_field_hash['hh'] = time.strftime('%H').to_i elsif MIN == kpi_unit time_field_hash['hh'] = time.strftime('%H').to_i time_field_hash['mm'] = time.strftime('%M').to_i end doc['time_field_hash'] = time_field_hash doc[count_name] = 0 if 'access' == kpi_type doc['countOver'] = 0 doc['count1xx'] = 0 doc['count2xx'] = 0 doc['count3xx'] = 0 doc['count4xx'] = 0 doc['count5xx'] = 0 doc['responseTimeAve'] = 0 doc['responseTimeMax'] = 0 doc['responseTimeMin'] = 100000000 doc['responseTimeSum'] = 0 doc['okRatio'] = 0.0000 if MIN == kpi_unit doc['qpsAve'] = 0 doc['qpsMax'] = 0 doc['qpsMin'] = 100000000 doc['counter'] = Array.new(60, 0) end end return doc end
get_insert_doc_hash(kpi_type, kpi_unit, time_key, time_format, count_key, count_name, chunk)
click to toggle source
# File lib/fluent/plugin/out_mongokpi.rb, line 156 def get_insert_doc_hash(kpi_type, kpi_unit, time_key, time_format, count_key, count_name, chunk) hash_counter = {} chunk.msgpack_each { |tag, time, record| $log.debug record tmp_time = time_key.nil? ? Time.at(time) : time_format.nil? ? DateTime.parse(record[time_key]) : DateTime.strptime(record[time_key], time_format) # with count_key if 'none' != count_key count_key_value = '' count_key.split(',').each { |x| count_key_value += record[x].to_s } key_str = count_key_value + get_time_key_value(kpi_unit, tmp_time) doc = hash_counter.key?(key_str) ? hash_counter[key_str] : get_doc(kpi_type, kpi_unit, count_key, count_key_value, count_name, tmp_time) hash_counter[key_str] = count_up(kpi_type, kpi_unit, doc, record, count_name, tmp_time) end # total total_key_str = 'total' + get_time_key_value(kpi_unit, tmp_time) total = hash_counter.key?(total_key_str) ? hash_counter[total_key_str] : get_doc(kpi_type, kpi_unit, count_key, 'total', count_name, tmp_time) hash_counter[total_key_str] = count_up(kpi_type, kpi_unit, total, record, count_name, tmp_time) } return hash_counter end
get_kpi_unit(kpi_unit)
click to toggle source
# File lib/fluent/plugin/out_mongokpi.rb, line 70 def get_kpi_unit(kpi_unit) case kpi_unit when *UNIT_DAYS then return DAY when *UNIT_HOURS then return HOUR else return MIN end end
get_time_key_value(kpi_unit, time)
click to toggle source
# File lib/fluent/plugin/out_mongokpi.rb, line 181 def get_time_key_value(kpi_unit, time) case kpi_unit when DAY then return time.strftime('%Y%m%d') when HOUR then return time.strftime('%Y%m%d%H') else return time.strftime('%Y%m%d%H%M') end end
insert(kpi_type, kpi_unit, collection_name, count_key, count_name, doc_hash)
click to toggle source
2.5 or less stackoverflow.com/questions/8508663/calculate-max-value-in-an-atomic-findandmodify-operation TODO improve for Mongo 2.6 $min, $max field update operators jira.mongodb.org/browse/SERVER-1534 jira.mongodb.org/browse/DOCS-2012
# File lib/fluent/plugin/out_mongokpi.rb, line 256 def insert(kpi_type, kpi_unit, collection_name, count_key, count_name, doc_hash) begin doc_hash.each { |key, doc| $log.debug "key: #{key}, doc: #{doc}" collection = get_collection(collection_name, doc['time_field_hash']['yyyymmdd']) select_hash = doc['time_field_hash'] select_hash['_id'] = key select_hash[count_key] = doc[count_key] if 'access' == kpi_type # initialize the target doc # Without this, "0" in "counter.0' can be regarded as an child element, not as an array element. if MIN == kpi_unit collection.update( select_hash, {'$setOnInsert' => {'counter' => Array.new(60, 0)}}, {:upsert => true} ) end # main update increment_hash = {} increment_hash[count_name] = doc[count_name] increment_hash['countOver'] = doc['countOver'] increment_hash['count1xx'] = doc['count1xx'] increment_hash['count2xx'] = doc['count2xx'] increment_hash['count3xx'] = doc['count3xx'] increment_hash['count4xx'] = doc['count4xx'] increment_hash['count5xx'] = doc['count5xx'] if MIN == kpi_unit for sec in 0..59 increment_hash['counter.' + sec.to_s] = doc['counter'][sec] end end increment_hash['responseTimeSum'] = doc['responseTimeSum'] collection.update( select_hash, {'$inc' => increment_hash}, {:upsert => true} ) # add supplemental fields using existing data # NOTICE: this operation is not atomic, then the field value is not reliable in highly distributed processing. updated_result = collection.find({'_id' => key}) if updated_result.nil? $log.info "there is no updated result for the key: #{key}" if updated_result.nil? continue end updated_doc = updated_result.to_a[0] set_hash = {} set_hash['responseTimeAve'] = updated_doc['responseTimeSum'] / updated_doc[count_name] if !updated_doc['responseTimeMax'].nil? && updated_doc['responseTimeMax'] > doc['responseTimeMax'] set_hash['responseTimeMax'] = updated_doc['responseTimeMax'] else set_hash['responseTimeMax'] = doc['responseTimeMax'] end if !updated_doc['responseTimeMin'].nil? && updated_doc['responseTimeMin'] < doc['responseTimeMin'] set_hash['responseTimeMin'] = updated_doc['responseTimeMin'] else set_hash['responseTimeMin'] = doc['responseTimeMin'] end set_hash['okRatio'] = ((updated_doc[count_name] - updated_doc['countOver']).to_f / updated_doc[count_name]).round(4) if MIN == kpi_unit set_hash['qpsAve'] = (updated_doc['counter'].inject(0.0){|r,i| r+=i } / updated_doc['counter'].size).round set_hash['qpsMax'] = updated_doc['counter'].max set_hash['qpsMin'] = updated_doc['counter'].min end collection.update( select_hash, { '$set' => set_hash} ) else collection.update( select_hash, {'$inc' => {count_name => doc[count_name]}}, {:upsert => true} ) end } rescue Mongo::OperationFailure => e raise e end end
shutdown()
click to toggle source
This method is called when shutting down. Shutdown the thread and close sockets or files here.
Calls superclass method
# File lib/fluent/plugin/out_mongokpi.rb, line 87 def shutdown @client.db.connection.close super end
start()
click to toggle source
This method is called when starting. Open sockets or files here.
Calls superclass method
# File lib/fluent/plugin/out_mongokpi.rb, line 80 def start super @client = get_client(@address, @connection_opts) end
write(chunk)
click to toggle source
This method is called every flush interval. write the buffer chunk to files or databases here. 'chunk' is a buffer chunk that includes multiple formatted events. You can use 'data = chunk.read' to get all events and 'chunk.open {|io| ... }' to get IO object.
def write(chunk)
data = chunk.read print data
end
optionally, you can use chunk.msgpack_each to deserialize objects.
def write(chunk)
chunk.msgpack_each {|(tag,time,record)| }
end
# File lib/fluent/plugin/out_mongokpi.rb, line 117 def write(chunk) doc_hash = get_insert_doc_hash(@kpi_type, @kpi_unit, @time_key, @time_format, @count_key, @count_name, chunk) insert(@kpi_type, @kpi_unit, @collection, @count_key, @count_name, doc_hash) end