class LogStash::Inputs::MongoDB
Constants
- SINCE_TABLE
Public Instance Methods
close()
# File lib/logstash/inputs/mongodb.rb, line 526
def close
  # If needed, use this to tidy up on shutdown
  @logger.debug("Shutting down...")
end
flatten(my_hash)
# File lib/logstash/inputs/mongodb.rb, line 229
def flatten(my_hash)
  new_hash = {}
  @logger.debug("Raw Hash: #{my_hash}")
  if my_hash.respond_to? :each
    my_hash.each do |k1,v1|
      if v1.is_a?(Hash)
        v1.each do |k2,v2|
          if v2.is_a?(Hash)
            # Found a nested hash; flatten it recursively and prefix its keys
            result = flatten(v2)
            result.each do |k3,v3|
              new_hash[k1.to_s+"_"+k2.to_s+"_"+k3.to_s] = v3
            end
          else
            new_hash[k1.to_s+"_"+k2.to_s] = v2
          end
        end
      else
        # Value is not a hash; copy it through unchanged
        new_hash[k1.to_s] = v1
      end
    end
  else
    @logger.debug("Flatten [ERROR]: hash did not respond to :each")
  end
  @logger.debug("Flattened Hash: #{new_hash}")
  return new_hash
end
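The flattened keys are the nesting path joined with underscores, with deeper levels handled by the recursive call. A minimal sketch with a hypothetical nested document:

  nested = { "attributes" => { "hostName" => "csap-dev01", "id" => "service_30" }, "level" => "INFO" }
  flatten(nested)
  # => {"attributes_hostName"=>"csap-dev01", "attributes_id"=>"service_30", "level"=>"INFO"}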
get_all_tables(mongodb)
# File lib/logstash/inputs/mongodb.rb, line 140
def get_all_tables(mongodb)
  # Note: the mongodb argument is ignored; the instance-wide @mongodb client is queried
  return @mongodb.collection_names
end
get_collection_names(mongodb, collection)
# File lib/logstash/inputs/mongodb.rb, line 145
def get_collection_names(mongodb, collection)
  collection_names = []
  @mongodb.collection_names.each do |coll|
    @logger.info("printing the collection names #{coll}")
    if /#{collection}/ =~ coll
      collection_names.push(coll)
      @logger.debug("Added #{coll} to the collection list as it matches our collection search")
    end
  end
  return collection_names
end
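Because the configured name is interpolated into a regex, a partial collection name matches every collection containing it. A sketch with hypothetical collection names:

  # Assuming @mongodb.collection_names returns ["events_20150320", "events_20150321", "system.indexes"]:
  get_collection_names(@mongodb, "events")
  # => ["events_20150320", "events_20150321"]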
get_collection_names_with_exactmatch(mongodb, collection)
# File lib/logstash/inputs/mongodb.rb, line 158
def get_collection_names_with_exactmatch(mongodb, collection)
  collection_names = []
  @mongodb.collection_names.each do |coll|
    @logger.info("printing the collection names #{coll}")
    if collection == coll
      collection_names.push(coll)
      @logger.info("Added #{coll} to the collection list as it matches our collection search")
    end
  end
  return collection_names
end
get_cursor_for_collection(mongodb, mongo_collection_name, last_id_object, batch_size)
# File lib/logstash/inputs/mongodb.rb, line 172
def get_cursor_for_collection(mongodb, mongo_collection_name, last_id_object, batch_size)
  collection = mongodb.collection(mongo_collection_name)
  # Need to make this sort by date in object id then get the first of the series
  # db.events_20150320.find().limit(1).sort({ts:1})
  # Hard-coded customization for the time being. The commented variant below also
  # filtered on 'attributes.hostName' => /dev/, but regex searches were taking an
  # eternity in MongoDB.
  #return collection.find('$and' => [{:_id => {:$gt => last_id_object}}, {'attributes.id'=>'service_30'}, {'attributes.hostName' => /dev/}]).limit(batch_size)
  return collection.find('$and' => [{:_id => {:$gt => last_id_object}}, {'attributes.id'=>'service_30'}]).limit(batch_size)
end
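A sketch of driving the cursor directly; the collection name, resume ObjectId, and batch size below are hypothetical:

  last_id_object = BSON::ObjectId.from_string("507f1f77bcf86cd799439011")  # hypothetical resume point
  cursor = get_cursor_for_collection(@mongodb, "events_20150320", last_id_object, 100)
  cursor.each { |doc| @logger.debug("fetched #{doc['_id']}") }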
get_placeholder(sqlitedb, since_table, mongodb, mongo_collection_name)
# File lib/logstash/inputs/mongodb.rb, line 119
def get_placeholder(sqlitedb, since_table, mongodb, mongo_collection_name)
  since = sqlitedb[SINCE_TABLE]
  x = since.where(:table => "#{since_table}_#{mongo_collection_name}")
  if x[:place].nil? || x[:place] == 0
    first_entry_id = init_placeholder(sqlitedb, since_table, mongodb, mongo_collection_name)
    @logger.debug("FIRST ENTRY ID for #{mongo_collection_name} is #{first_entry_id}")
    return first_entry_id
  else
    @logger.debug("placeholder already exists, it is #{x[:place]}")
    return x[:place][:place]
  end
end
init_placeholder(sqlitedb, since_table, mongodb, mongo_collection_name)
# File lib/logstash/inputs/mongodb.rb, line 100
def init_placeholder(sqlitedb, since_table, mongodb, mongo_collection_name)
  @logger.debug("init placeholder for #{since_table}_#{mongo_collection_name}")
  since = sqlitedb[SINCE_TABLE]
  mongo_collection = mongodb.collection(mongo_collection_name)
  # The commented variant below also filtered on 'attributes.hostName' => /dev/,
  # but the regex was taking an eternity in MongoDB.
  #first_entry = mongo_collection.find('$and' => [{'attributes.id'=>'service_30'}, {'attributes.hostName' => /dev/}]).sort(since_column => 1).limit(1).first
  first_entry = mongo_collection.find('$and' => [{'attributes.id'=>'service_30'}]).sort(since_column => 1).limit(1).first
  first_entry_id = ''
  if since_type == 'id'
    first_entry_id = first_entry[since_column].to_s
  else
    first_entry_id = first_entry[since_column].to_i
  end
  since.insert(:table => "#{since_table}_#{mongo_collection_name}", :place => first_entry_id)
  @logger.info("init placeholder for #{since_table}_#{mongo_collection_name}: #{first_entry}")
  return first_entry_id
end
init_placeholder_table(sqlitedb)
# File lib/logstash/inputs/mongodb.rb, line 88
def init_placeholder_table(sqlitedb)
  begin
    sqlitedb.create_table "#{SINCE_TABLE}" do
      String :table
      Int :place
    end
  rescue
    @logger.debug("since table already exists")
  end
end
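Idempotence here relies on the bare rescue: on a second call create_table raises because the table already exists, and the error is swallowed. A sketch of inspecting the stored placeholders through the same Sequel handle (the row shown is hypothetical):

  @sqlitedb[SINCE_TABLE].all
  # => [{:table=>"since_table_events_20150320", :place=>"507f1f77bcf86cd799439011"}]  # hypothetical row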
register()
# File lib/logstash/inputs/mongodb.rb, line 202
def register
  require "jdbc/sqlite3"
  require "sequel"
  placeholder_db_path = File.join(@placeholder_db_dir, @placeholder_db_name)
  conn = Mongo::Client.new(@uri)
  @host = Socket.gethostname
  @logger.info("Registering MongoDB input - now included dbname in the config file")
  #@mongodb = conn.database
  @mongodb = conn.use(@dbname).database
  @sqlitedb = Sequel.connect("jdbc:sqlite:#{placeholder_db_path}")
  # Should check to see if there are new matching tables at a predefined interval or on some trigger
  @collection_data = update_watched_collections(@mongodb, @collection, @sqlitedb)
end
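A minimal stand-alone sketch of the same wiring, showing how conn.use(@dbname) selects the configured database; the URI, database name, and placeholder path are hypothetical:

  require "mongo"
  require "sequel"
  require "jdbc/sqlite3"

  conn     = Mongo::Client.new("mongodb://localhost:27017")       # hypothetical @uri
  mongodb  = conn.use("csap_metrics").database                    # hypothetical @dbname
  sqlitedb = Sequel.connect("jdbc:sqlite:/tmp/logstash_since.db") # hypothetical placeholder path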
run(queue)
# File lib/logstash/inputs/mongodb.rb, line 345
def run(queue)
  sleep_min = 0.01
  sleep_max = 5
  sleeptime = sleep_min

  @logger.debug("Tailing MongoDB")
  @logger.debug("Collection data is: #{@collection_data}")

  while true && !stop?
    begin
      @collection_data.each do |index, collection|
        collection_name = collection[:name]
        @logger.debug("collection_data is: #{@collection_data}")
        last_id = @collection_data[index][:last_id]
        #@logger.debug("last_id is #{last_id}", :index => index, :collection => collection_name)
        # get batch of events starting at the last_place if it is set
        last_id_object = last_id
        if since_type == 'id'
          last_id_object = BSON::ObjectId(last_id)
        elsif since_type == 'time'
          if last_id != ''
            last_id_object = Time.at(last_id)
          end
        end
        cursor = get_cursor_for_collection(@mongodb, collection_name, last_id_object, batch_size)
        cursor.each do |doc|
          logdate = DateTime.parse(doc['_id'].generation_time.to_s)
          event = LogStash::Event.new("host" => @host)
          decorate(event)
          event.set("logdate", logdate.iso8601.force_encoding(Encoding::UTF_8))
          transformedjson = transformcsapjson(doc)
          log_entry = doc.to_h.to_s
          log_entry['_id'] = log_entry['_id'].to_s
          #event.set("log_entry", transformedjson.force_encoding(Encoding::UTF_8))
          event.set("log_entry", transformedjson)
          event.set("mongo_id", doc['_id'].to_s)
          # Derive the CSAP environment from the host name
          if doc['attributes']['hostName'].include?("dev")
            event.set("csap_env", "dev")
          elsif doc['attributes']['hostName'].include?("stg")
            event.set("csap_env", "stg")
          elsif doc['attributes']['hostName'].include?("lt") && doc['attributes']['hostName'].include?("prf")
            event.set("csap_env", "lt")
          elsif doc['attributes']['hostName'].include?("prd")
            event.set("csap_env", "prd")
          end
          @logger.debug("mongo_id: "+doc['_id'].to_s)
          #@logger.debug("EVENT looks like: "+event.to_s)
          #@logger.debug("Sent message: "+doc.to_h.to_s)

          # Extract the HOST_ID and PID from the MongoDB BSON::ObjectId
          if @unpack_mongo_id
            doc_hex_bytes = doc['_id'].to_s.each_char.each_slice(2).map {|b| b.join.to_i(16) }
            doc_obj_bin = doc_hex_bytes.pack("C*").unpack("a4 a3 a2 a3")
            host_id = doc_obj_bin[1].unpack("S")
            process_id = doc_obj_bin[2].unpack("S")
            event.set('host_id', host_id.first.to_i)
            event.set('process_id', process_id.first.to_i)
          end

          if @parse_method == 'flatten'
            # Flatten the JSON so that the data is usable in Kibana
            flat_doc = flatten(doc)
            # Check for different types of expected values and add them to the event
            if flat_doc['info_message'] && (flat_doc['info_message'] =~ /collection stats: .+/)
              # Some custom stuff I'm having to do to fix formatting in past logs...
              sub_value = flat_doc['info_message'].sub("collection stats: ", "")
              JSON.parse(sub_value).each do |k1,v1|
                flat_doc["collection_stats_#{k1.to_s}"] = v1
              end
            end
            flat_doc.each do |k,v|
              # Check for an integer
              @logger.debug("key: #{k.to_s} value: #{v.to_s}")
              if v.is_a? Numeric
                event.set(k.to_s, v)
              elsif v.is_a? Time
                event.set(k.to_s, v.iso8601)
              elsif v.is_a? String
                if v == "NaN"
                  event.set(k.to_s, Float::NAN)
                elsif /\A[-+]?\d+[.][\d]+\z/ === v
                  event.set(k.to_s, v.to_f)
                elsif (/\A[-+]?\d+\z/ === v) || (v.is_a? Integer)
                  event.set(k.to_s, v.to_i)
                else
                  event.set(k.to_s, v)
                end
              else
                if k.to_s == "_id" || k.to_s == "tags"
                  event.set(k.to_s, v.to_s)
                end
                if (k.to_s == "tags") && (v.is_a? Array)
                  event.set('tags', v)
                end
              end
            end
          elsif @parse_method == 'dig'
            # Dig into the JSON and flatten select elements
            doc.each do |k, v|
              if k != "_id"
                if (@dig_fields.include? k) && (v.respond_to? :each)
                  v.each do |kk, vv|
                    if (@dig_dig_fields.include? kk) && (vv.respond_to? :each)
                      vv.each do |kkk, vvv|
                        if /\A[-+]?\d+\z/ === vvv
                          event.set("#{k}_#{kk}_#{kkk}", vvv.to_i)
                        else
                          event.set("#{k}_#{kk}_#{kkk}", vvv.to_s)
                        end
                      end
                    else
                      if /\A[-+]?\d+\z/ === vv
                        event.set("#{k}_#{kk}", vv.to_i)
                      else
                        event.set("#{k}_#{kk}", vv.to_s)
                      end
                    end
                  end
                else
                  if /\A[-+]?\d+\z/ === v
                    event.set(k, v.to_i)
                  else
                    event.set(k, v.to_s)
                  end
                end
              end
            end
          elsif @parse_method == 'simple'
            doc.each do |k, v|
              if v.is_a? Numeric
                event.set(k, v.abs)
              elsif v.is_a? Array
                event.set(k, v)
              elsif v == "NaN"
                event.set(k, Float::NAN)
              else
                event.set(k, v.to_s)
              end
            end
          end

          queue << event
          since_id = doc[since_column]
          if since_type == 'id'
            since_id = doc[since_column].to_s
          elsif since_type == 'time'
            since_id = doc[since_column].to_i
          end
          @collection_data[index][:last_id] = since_id
        end
        # Store the last-seen doc in the database
        update_placeholder(@sqlitedb, since_table, collection_name, @collection_data[index][:last_id])
      end
      @logger.debug("Updating watch collections")
      @collection_data = update_watched_collections(@mongodb, @collection, @sqlitedb)

      # nothing found in that iteration
      # sleep a bit
      @logger.debug("No new rows. Sleeping.", :time => sleeptime)
      sleeptime = [sleeptime * 2, sleep_max].min
      sleep(sleeptime)
    rescue => e
      @logger.warn('MongoDB Input threw an exception, restarting', :exception => e)
    end
  end
end
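The unpack_mongo_id branch assumes the legacy BSON ObjectId layout of 4 timestamp bytes, 3 machine-id bytes, 2 process-id bytes, and 3 counter bytes. A minimal sketch of the same decomposition, using a hypothetical id:

  hex   = "507f1f77bcf86cd799439011"  # hypothetical ObjectId
  bytes = hex.each_char.each_slice(2).map { |b| b.join.to_i(16) }
  ts, machine, pid, counter = bytes.pack("C*").unpack("a4 a3 a2 a3")
  host_id    = machine.unpack("S").first  # first two machine-id bytes as a 16-bit int
  process_id = pid.unpack("S").first      # pid bytes as a 16-bit int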
transformcsapjson(doc)
# File lib/logstash/inputs/mongodb.rb, line 261
def transformcsapjson(doc)
  @logger.info("transforming document format #{doc['_id']}")
  hash = doc.to_hash
  hostname = hash[:attributes][:hostName]
  service_names = []
  total = hash[:data][:timeStamp].size()
  hash[:data].each do |k,v|
    if k.include? "topCpu_"
      service_names.push(k.partition('topCpu_').last)
    end
  end
  service_count = service_names.size()
  @logger.debug("service count #{service_count}")
  jsonstimestamp = []
  total.times do |count|
    service_count.times do |servicecount|
      begin
        timestamp = "timestamp: #{hash[:data][:timeStamp][count]},"
        timestampformatted = "timestampformatted: #{Time.at(hash[:data][:timeStamp][count].to_f/1000.0).utc.strftime("%Y-%m-%d %H:%M:%S.%3N")},"
      rescue => e
        @logger.warn('MongoDB Input threw an exception while processing timestamp', :exception => e)
        timestamp = ""
        timestampformatted = ""
      end
      begin
        hostName = "hostName: #{hostname},"
      rescue => e
        @logger.warn('MongoDB Input threw an exception while processing hostName', :exception => e)
        hostName = ""
      end
      begin
        serviceName = "serviceName: #{service_names[servicecount]},"
      rescue => e
        @logger.warn('MongoDB Input threw an exception while processing serviceName', :exception => e)
        serviceName = ""
      end
      begin
        cpu = "cpu: #{hash[:data]["topCpu_" + service_names[servicecount]][count]},"
      rescue => e
        @logger.warn('MongoDB Input threw an exception while processing cpu', :exception => e)
        cpu = ""
      end
      begin
        memory = "memory: #{hash[:data]["rssMemory_" + service_names[servicecount]][count]},"
      rescue => e
        @logger.warn('MongoDB Input threw an exception while processing memory', :exception => e)
        memory = ""
      end
      begin
        diskRead = "diskRead: #{hash[:data]["diskReadKb_" + service_names[servicecount]][count]},"
      rescue => e
        @logger.warn('MongoDB Input threw an exception while processing diskRead', :exception => e)
        diskRead = ""
      end
      begin
        diskWrite = "diskWrite: #{hash[:data]["diskWriteKb_" + service_names[servicecount]][count]},"
      rescue => e
        @logger.warn('MongoDB Input threw an exception while processing diskWrite', :exception => e)
        diskWrite = ""
      end
      begin
        diskUtil = "diskUtil: #{hash[:data]["diskUtil_" + service_names[servicecount]][count]}"
      rescue => e
        @logger.warn('MongoDB Input threw an exception while processing diskUtil', :exception => e)
        diskUtil = ""
      end
      jsontext = timestamp + timestampformatted + hostName + serviceName + cpu + memory + diskRead + diskWrite + diskUtil
      jsonstimestamp.push("{" + jsontext + "}")
    end
  end
  resultjson = jsonstimestamp
  @logger.debug("the transformed json #{resultjson}")
  return resultjson
end
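Each element of the returned array is a brace-wrapped string of key: value pairs rather than strict JSON (keys and string values are unquoted, and fields are joined without spaces). A hypothetical element:

  # "{timestamp: 1426867200000,timestampformatted: 2015-03-20 16:00:00.000,hostName: csap-dev01,serviceName: data,cpu: 2,memory: 512,diskRead: 0,diskWrite: 4,diskUtil: 1}"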
update_placeholder(sqlitedb, since_table, mongo_collection_name, place)
# File lib/logstash/inputs/mongodb.rb, line 133
def update_placeholder(sqlitedb, since_table, mongo_collection_name, place)
  #@logger.debug("updating placeholder for #{since_table}_#{mongo_collection_name} to #{place}")
  since = sqlitedb[SINCE_TABLE]
  since.where(:table => "#{since_table}_#{mongo_collection_name}").update(:place => place)
end
update_watched_collections(mongodb, collection, sqlitedb)
# File lib/logstash/inputs/mongodb.rb, line 182
def update_watched_collections(mongodb, collection, sqlitedb)
  collections = []
  if isCollectionExactMatch == true
    collections = get_collection_names_with_exactmatch(mongodb, collection)
  else
    collections = get_collection_names(mongodb, collection)
  end
  collection_data = {}
  collections.each do |my_collection|
    init_placeholder_table(sqlitedb)
    last_id = get_placeholder(sqlitedb, since_table, mongodb, my_collection)
    if !collection_data[my_collection]
      collection_data[my_collection] = { :name => my_collection, :last_id => last_id }
    end
  end
  return collection_data
end
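The returned hash is keyed by collection name, which is exactly what run() iterates as index. A hypothetical result for a collection setting of "events":

  # {
  #   "events_20150320" => { :name => "events_20150320", :last_id => "507f1f77bcf86cd799439011" },
  #   "events_20150321" => { :name => "events_20150321", :last_id => "507f191e810c19729de860ea" }
  # }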