class LogStash::Inputs::MongoDB
Constants
- SINCE_TABLE
Public Instance Methods
close()
click to toggle source
# File lib/logstash/inputs/mongodb.rb, line 387
# Shutdown hook. Nothing is released here; the method only records at debug
# level that the input is shutting down. Add tidy-up work here if it is ever
# needed.
#
# @return whatever the logger's +debug+ call returns
def close
  @logger.debug('Shutting down...')
end
flatten(my_hash)
click to toggle source
# File lib/logstash/inputs/mongodb.rb, line 195
# Collapses a nested hash into a single-level hash whose keys are the nested
# key path joined with underscores (e.g. {"a" => {"b" => 1}} becomes
# {"a_b" => 1}). Only Hash values are descended into; every other value
# (arrays included) is copied as-is. All resulting keys are Strings.
#
# @param my_hash the structure to flatten; anything that does not respond to
#   +each+ produces an empty result and a debug message
# @return [Hash] the flattened key/value pairs
def flatten(my_hash)
  flattened = {}
  @logger.debug("Raw Hash: #{my_hash}")

  if my_hash.respond_to?(:each)
    my_hash.each do |outer_key, outer_value|
      # Scalars (and anything that is not a Hash) are stored under their own key.
      unless outer_value.is_a?(Hash)
        flattened[outer_key.to_s] = outer_value
        next
      end

      outer_value.each do |inner_key, inner_value|
        prefix = "#{outer_key}_#{inner_key}"
        if inner_value.is_a?(Hash)
          # Deeper levels are handled by recursion; the recursive result is
          # already flat, so only the prefix has to be prepended.
          flatten(inner_value).each do |leaf_key, leaf_value|
            flattened["#{prefix}_#{leaf_key}"] = leaf_value
          end
        else
          flattened[prefix] = inner_value
        end
      end
    end
  else
    @logger.debug("Flatten [ERROR]: hash did not respond to :each")
  end

  @logger.debug("Flattened Hash: #{flattened}")
  flattened
end
get_all_tables(mongodb)
click to toggle source
# File lib/logstash/inputs/mongodb.rb, line 130
# Lists every collection name of the given database.
#
# The database handle passed by the caller is now honoured; previously the
# argument was ignored and @mongodb was always queried. @mongodb is still
# used as a fallback when nil is passed, so existing callers keep working.
#
# @param mongodb database handle responding to +collection_names+ (or nil)
# @return the value of +collection_names+ on the chosen handle
def get_all_tables(mongodb)
  (mongodb || @mongodb).collection_names
end
get_collection_names(mongodb, collection)
click to toggle source
# File lib/logstash/inputs/mongodb.rb, line 135
# Returns the names of all collections whose name matches +collection+,
# which is interpolated into a regular expression (so "events_" matches
# "events_20150320", and regex metacharacters in it are significant).
#
# The database handle passed by the caller is now honoured (falling back to
# @mongodb when nil is given); previously the argument was ignored. The
# pattern is compiled once instead of once per collection.
#
# @param mongodb database handle responding to +collection_names+ (or nil)
# @param collection [String] regular-expression source to match names against
# @return [Array] matching collection names, in the order the database lists them
def get_collection_names(mongodb, collection)
  pattern = /#{collection}/
  matching = (mongodb || @mongodb).collection_names.select { |coll| pattern =~ coll }
  matching.each do |coll|
    @logger.debug("Added #{coll} to the collection list as it matches our collection search")
  end
  matching
end
get_cursor_for_collection(mongodb, mongo_collection_name, last_id_object, batch_size)
click to toggle source
# File lib/logstash/inputs/mongodb.rb, line 147
# Builds a query for the next batch of documents in one collection: those
# whose _id is strictly greater than +last_id_object+, capped at
# +batch_size+ results.
#
# NOTE(review): the filter is always on _id, whatever since column the
# plugin is configured with - confirm this is intended.
# TODO (kept from the original): sort by the date inside the object id and
# take the first of the series, e.g.
#   db.events_20150320.find().limit(1).sort({ts:1})
#
# @param mongodb database handle responding to +collection+
# @param mongo_collection_name name of the collection to read
# @param last_id_object lower bound (exclusive) for _id
# @param batch_size maximum number of documents to return
# @return the limited result of +find+ on the collection
def get_cursor_for_collection(mongodb, mongo_collection_name, last_id_object, batch_size)
  newer_than_last = { :_id => { :$gt => last_id_object } }
  mongodb.collection(mongo_collection_name).find(newer_than_last).limit(batch_size)
end
get_placeholder(sqlitedb, since_table, mongodb, mongo_collection_name)
click to toggle source
# File lib/logstash/inputs/mongodb.rb, line 109
# Returns the stored placeholder (last processed position) for a collection,
# creating it via init_placeholder when none is stored yet.
#
# The previous implementation called +x[:place]+ on the dataset. On a Sequel
# dataset that is not column access: it runs a "first row WHERE place" query,
# so whether a placeholder "existed" depended on how the database evaluates
# the stored value as a boolean, the comparison with 0 was made against a row
# Hash, and up to four queries were issued. The rows are now fetched once and
# the :place values are inspected in Ruby.
#
# @param sqlitedb placeholder database; +sqlitedb[SINCE_TABLE]+ must yield a dataset
# @param since_table [String] prefix of the placeholder key
# @param mongodb database handle, only forwarded to init_placeholder
# @param mongo_collection_name [String] collection the placeholder belongs to
# @return the stored placeholder, or the freshly initialised one
def get_placeholder(sqlitedb, since_table, mongodb, mongo_collection_name)
  since = sqlitedb[SINCE_TABLE]
  rows = since.where(:table => "#{since_table}_#{mongo_collection_name}").all

  # A placeholder of nil or 0 counts as "not set", as before. The first row
  # carrying a usable value wins, in case the table holds several rows for
  # the same key.
  place = rows.map { |row| row[:place] }.find { |value| !value.nil? && value != 0 }

  if place.nil?
    first_entry_id = init_placeholder(sqlitedb, since_table, mongodb, mongo_collection_name)
    @logger.debug("FIRST ENTRY ID for #{mongo_collection_name} is #{first_entry_id}")
    first_entry_id
  else
    @logger.debug("placeholder already exists, it is #{place}")
    place
  end
end
init_placeholder(sqlitedb, since_table, mongodb, mongo_collection_name)
click to toggle source
# File lib/logstash/inputs/mongodb.rb, line 91
# Seeds the placeholder for a collection from its first document when sorted
# ascending by the since column, stores it in the since table and returns it.
# With since_type 'id' the value is stored as a String, otherwise it is
# converted with +to_i+.
#
# NOTE(review): if the query yields no document, the +[]+ call on the result
# fails - presumably the case for an empty collection; confirm against the
# driver.
#
# @param sqlitedb placeholder database
# @param since_table [String] prefix of the placeholder key
# @param mongodb database handle responding to +collection+
# @param mongo_collection_name [String] collection to seed the placeholder for
# @return the placeholder value that was stored
def init_placeholder(sqlitedb, since_table, mongodb, mongo_collection_name)
  @logger.debug("init placeholder for #{since_table}_#{mongo_collection_name}")
  since = sqlitedb[SINCE_TABLE]
  oldest_first = mongodb.collection(mongo_collection_name).find({}).sort(since_column => 1)
  first_entry = oldest_first.limit(1).first

  first_entry_id =
    if since_type == 'id'
      first_entry[since_column].to_s
    else
      first_entry[since_column].to_i
    end

  since.insert(:table => "#{since_table}_#{mongo_collection_name}", :place => first_entry_id)
  @logger.info("init placeholder for #{since_table}_#{mongo_collection_name}: #{first_entry}")
  first_entry_id
end
init_placeholder_table(sqlitedb)
click to toggle source
# File lib/logstash/inputs/mongodb.rb, line 79
# Makes sure the placeholder ("since") table exists, with a String column
# :table and an Int column :place.
#
# Uses +create_table?+, which only creates the table when it is missing, so
# the normal "already exists" case no longer relies on raising and rescuing
# an exception. The method stays best-effort: any other failure is still
# swallowed, but it is now logged with the real exception instead of being
# reported as "since table already exists".
#
# @param sqlitedb Sequel database handle for the placeholder store
# @return the result of +create_table?+, or of the logger call on failure
def init_placeholder_table(sqlitedb)
  sqlitedb.create_table?(SINCE_TABLE.to_s) do
    String :table
    Int :place
  end
rescue => e
  @logger.warn("could not create since table", :exception => e)
end
register()
click to toggle source
# File lib/logstash/inputs/mongodb.rb, line 169
# Prepares the input: loads the SQLite/Sequel libraries, connects to MongoDB
# using @uri, opens the placeholder database located at
# @placeholder_db_dir/@placeholder_db_name and builds the initial map of
# watched collections in @collection_data.
def register
  # Loaded here rather than at file level, as in the original.
  require "jdbc/sqlite3"
  require "sequel"

  sqlite_file = File.join(@placeholder_db_dir, @placeholder_db_name)
  mongo_client = Mongo::Client.new(@uri)

  @host = Socket.gethostname
  @logger.info("Registering MongoDB input")

  @mongodb = mongo_client.database
  @sqlitedb = Sequel.connect("jdbc:sqlite:#{sqlite_file}")

  # Should check to see if there are new matching tables at a predefined interval or on some trigger
  @collection_data = update_watched_collections(@mongodb, @collection, @sqlitedb)
end
run(queue)
click to toggle source
# File lib/logstash/inputs/mongodb.rb, line 225
# Main polling loop. Until +stop?+ returns true it walks every watched
# collection, fetches the documents newer than the stored placeholder, turns
# each one into a LogStash::Event (shaped according to @parse_method:
# 'flatten', 'dig' or 'simple'), pushes it onto +queue+ and records the new
# placeholder. After each pass the list of watched collections is refreshed
# and the loop sleeps.
#
# Changes compared with the previous version:
# * In 'flatten' mode the float pattern was compared with +==+, which
#   compares a Regexp with a String and is always false, so strings such as
#   "1.5" were never converted. It is now matched with +===+.
# * Removed +(v.is_a? Integer)+ inside the String branch (unreachable).
# * Removed +log_entry['_id'] = log_entry['_id'].to_s+: log_entry is a String
#   at that point, so the statement replaced the text "_id" with itself.
#
# NOTE(review): the sleep time doubles on every pass up to sleep_max and is
# never reset, whether or not documents were found; and when an exception is
# rescued the loop restarts without sleeping. Both are left unchanged here.
#
# @param queue receives each event via +<<+
def run(queue)
  sleep_min = 0.01
  sleep_max = 5
  sleeptime = sleep_min

  @logger.debug("Tailing MongoDB")
  @logger.debug("Collection data is: #{@collection_data}")

  until stop?
    begin
      @collection_data.each do |index, collection|
        collection_name = collection[:name]
        @logger.debug("collection_data is: #{@collection_data}")
        last_id = @collection_data[index][:last_id]

        # Convert the stored placeholder into the type used in the query:
        # an ObjectId for 'id', a Time for 'time' (unless it is still the
        # empty string), otherwise the raw value.
        last_id_object = last_id
        if since_type == 'id'
          last_id_object = BSON::ObjectId(last_id)
        elsif since_type == 'time'
          last_id_object = Time.at(last_id) if last_id != ''
        end

        cursor = get_cursor_for_collection(@mongodb, collection_name, last_id_object, batch_size)
        cursor.each do |doc|
          logdate = DateTime.parse(doc['_id'].generation_time.to_s)
          event = LogStash::Event.new("host" => @host)
          decorate(event)
          event.set("logdate", logdate.iso8601.force_encoding(Encoding::UTF_8))
          log_entry = doc.to_h.to_s
          event.set("log_entry", log_entry.force_encoding(Encoding::UTF_8))
          event.set("mongo_id", doc['_id'].to_s)
          @logger.debug("mongo_id: "+doc['_id'].to_s)

          # Extract the HOST_ID and PID from the MongoDB BSON::ObjectID
          if @unpack_mongo_id
            # Hex string -> bytes -> fields of 4, 3, 2 and 3 bytes; fields 1
            # and 2 are then read as native-endian unsigned 16-bit integers.
            doc_hex_bytes = doc['_id'].to_s.each_char.each_slice(2).map { |b| b.join.to_i(16) }
            doc_obj_bin = doc_hex_bytes.pack("C*").unpack("a4 a3 a2 a3")
            host_id = doc_obj_bin[1].unpack("S")
            process_id = doc_obj_bin[2].unpack("S")
            event.set('host_id', host_id.first.to_i)
            event.set('process_id', process_id.first.to_i)
          end

          if @parse_method == 'flatten'
            # Flatten the JSON so that the data is usable in Kibana
            flat_doc = flatten(doc)
            # Check for different types of expected values and add them to the event
            if flat_doc['info_message'] && (flat_doc['info_message'] =~ /collection stats: .+/)
              # Some custom stuff I'm having to do to fix formatting in past logs...
              sub_value = flat_doc['info_message'].sub("collection stats: ", "")
              JSON.parse(sub_value).each do |k1, v1|
                flat_doc["collection_stats_#{k1.to_s}"] = v1
              end
            end

            flat_doc.each do |k, v|
              @logger.debug("key: #{k.to_s} value: #{v.to_s}")
              if v.is_a? Numeric
                event.set(k.to_s, v)
              elsif v.is_a? Time
                event.set(k.to_s, v.iso8601)
              elsif v.is_a? String
                # Numeric-looking strings are stored as numbers.
                if v == "NaN"
                  event.set(k.to_s, Float::NAN)
                elsif /\A[-+]?\d+[.][\d]+\z/ === v
                  event.set(k.to_s, v.to_f)
                elsif /\A[-+]?\d+\z/ === v
                  event.set(k.to_s, v.to_i)
                else
                  event.set(k.to_s, v)
                end
              else
                # Of the remaining value types only "_id" and "tags" are
                # kept: stringified first, then "tags" is overwritten with
                # the original Array when it is one.
                if k.to_s == "_id" || k.to_s == "tags"
                  event.set(k.to_s, v.to_s)
                end
                if (k.to_s == "tags") && (v.is_a? Array)
                  event.set('tags', v)
                end
              end
            end
          elsif @parse_method == 'dig'
            # Dig into the JSON and flatten select elements
            doc.each do |k, v|
              if k != "_id"
                if (@dig_fields.include? k) && (v.respond_to? :each)
                  v.each do |kk, vv|
                    if (@dig_dig_fields.include? kk) && (vv.respond_to? :each)
                      vv.each do |kkk, vvv|
                        if /\A[-+]?\d+\z/ === vvv
                          event.set("#{k}_#{kk}_#{kkk}", vvv.to_i)
                        else
                          event.set("#{k}_#{kk}_#{kkk}", vvv.to_s)
                        end
                      end
                    else
                      if /\A[-+]?\d+\z/ === vv
                        event.set("#{k}_#{kk}", vv.to_i)
                      else
                        event.set("#{k}_#{kk}", vv.to_s)
                      end
                    end
                  end
                else
                  if /\A[-+]?\d+\z/ === v
                    event.set(k, v.to_i)
                  else
                    event.set(k, v.to_s)
                  end
                end
              end
            end
          elsif @parse_method == 'simple'
            doc.each do |k, v|
              if v.is_a? Numeric
                # NOTE(review): abs drops the sign of negative numbers -
                # confirm this is intended.
                event.set(k, v.abs)
              elsif v.is_a? Array
                event.set(k, v)
              elsif v == "NaN"
                event.set(k, Float::NAN)
              else
                event.set(k, v.to_s)
              end
            end
          end

          queue << event

          # Remember this document's position in the same representation
          # that init_placeholder stores.
          since_id = doc[since_column]
          if since_type == 'id'
            since_id = doc[since_column].to_s
          elsif since_type == 'time'
            since_id = doc[since_column].to_i
          end

          @collection_data[index][:last_id] = since_id
        end
        # Store the last-seen doc in the database
        update_placeholder(@sqlitedb, since_table, collection_name, @collection_data[index][:last_id])
      end
      @logger.debug("Updating watch collections")
      @collection_data = update_watched_collections(@mongodb, @collection, @sqlitedb)

      # Sleep before the next pass (the message is logged on every pass,
      # whether or not rows were found).
      @logger.debug("No new rows. Sleeping.", :time => sleeptime)
      sleeptime = [sleeptime * 2, sleep_max].min
      sleep(sleeptime)
    rescue => e
      @logger.warn('MongoDB Input threw an exception, restarting', :exception => e)
    end
  end
end
update_placeholder(sqlitedb, since_table, mongo_collection_name, place)
click to toggle source
# File lib/logstash/inputs/mongodb.rb, line 123
# Persists a new placeholder value for one collection by updating the :place
# column of the since-table rows keyed "<since_table>_<mongo_collection_name>".
#
# @param sqlitedb placeholder database
# @param since_table [String] prefix of the placeholder key
# @param mongo_collection_name [String] collection the placeholder belongs to
# @param place the new placeholder value
# @return the result of the dataset's +update+ call
def update_placeholder(sqlitedb, since_table, mongo_collection_name, place)
  #@logger.debug("updating placeholder for #{since_table}_#{mongo_collection_name} to #{place}")
  placeholder_rows = sqlitedb[SINCE_TABLE].where(:table => "#{since_table}_#{mongo_collection_name}")
  placeholder_rows.update(:place => place)
end
update_watched_collections(mongodb, collection, sqlitedb)
click to toggle source
# File lib/logstash/inputs/mongodb.rb, line 155
# Builds the map of collections to tail: every collection whose name matches
# +collection+ is paired with its stored (or freshly initialised)
# placeholder.
#
# The placeholder table is now ensured once per call instead of once per
# matching collection; as before, nothing is created when no collection
# matches.
#
# @param mongodb database handle, forwarded to the helpers
# @param collection [String] pattern selecting the collections to watch
# @param sqlitedb placeholder database
# @return [Hash] collection name => { :name => name, :last_id => placeholder }
def update_watched_collections(mongodb, collection, sqlitedb)
  collections = get_collection_names(mongodb, collection)
  init_placeholder_table(sqlitedb) unless collections.empty?

  collections.each_with_object({}) do |my_collection, collection_data|
    last_id = get_placeholder(sqlitedb, since_table, mongodb, my_collection)
    # Should a name be listed twice, the first entry is kept.
    collection_data[my_collection] ||= { :name => my_collection, :last_id => last_id }
  end
end