class LogStash::Inputs::MongoDB

Constants

SINCE_TABLE

Public Instance Methods

close() click to toggle source
# File lib/logstash/inputs/mongodb.rb, line 387
# Logstash lifecycle hook invoked on pipeline shutdown.
# No sockets or handles are held by this plugin at this layer,
# so we only leave a trace in the debug log.
def close
  @logger.debug("Shutting down...")
end
flatten(my_hash) click to toggle source
# File lib/logstash/inputs/mongodb.rb, line 195
# Collapse a (possibly nested) hash into a single-level hash whose keys
# are the original key paths joined with "_": {"a" => {"b" => 1}}
# becomes {"a_b" => 1}. An argument that does not respond to :each is
# logged as an error and yields an empty hash.
def flatten(my_hash)
  flattened = {}
  @logger.debug("Raw Hash: #{my_hash}")
  unless my_hash.respond_to? :each
    @logger.debug("Flatten [ERROR]: hash did not respond to :each")
    @logger.debug("Flattened Hash: #{flattened}")
    return flattened
  end
  my_hash.each do |outer_key, outer_value|
    unless outer_value.is_a?(Hash)
      # Scalar (or non-hash) value: keep it under its stringified key.
      flattened[outer_key.to_s] = outer_value
      next
    end
    outer_value.each do |inner_key, inner_value|
      if inner_value.is_a?(Hash)
        # Deeper nesting: recurse, then prefix the resulting keys with
        # the path walked so far.
        flatten(inner_value).each do |leaf_key, leaf_value|
          flattened["#{outer_key}_#{inner_key}_#{leaf_key}"] = leaf_value
        end
      else
        flattened["#{outer_key}_#{inner_key}"] = inner_value
      end
    end
  end
  @logger.debug("Flattened Hash: #{flattened}")
  flattened
end
get_all_tables(mongodb) click to toggle source
# File lib/logstash/inputs/mongodb.rb, line 130
# Return every collection name in the given MongoDB database.
#
# BUGFIX: the +mongodb+ parameter was previously ignored in favour of
# the @mongodb instance variable. Use the argument the caller supplies;
# existing callers pass @mongodb, so behaviour is unchanged for them.
def get_all_tables(mongodb)
  mongodb.collection_names
end
get_collection_names(mongodb, collection) click to toggle source
# File lib/logstash/inputs/mongodb.rb, line 135
# Return the subset of collection names in +mongodb+ whose name matches
# +collection+, which is interpolated into an unanchored regular
# expression (this is the plugin's documented matching semantics, so the
# pattern is deliberately not escaped).
#
# BUGFIX: previously read @mongodb and ignored the +mongodb+ argument;
# existing callers pass @mongodb, so behaviour is preserved for them.
def get_collection_names(mongodb, collection)
  collection_names = []
  mongodb.collection_names.each do |coll|
    if /#{collection}/ =~ coll
      collection_names.push(coll)
      @logger.debug("Added #{coll} to the collection list as it matches our collection search")
    end
  end
  return collection_names
end
get_cursor_for_collection(mongodb, mongo_collection_name, last_id_object, batch_size) click to toggle source
# File lib/logstash/inputs/mongodb.rb, line 147
# Build a MongoDB cursor over +mongo_collection_name+ that yields at
# most +batch_size+ documents whose _id is strictly greater than
# +last_id_object+. ObjectIds sort in insertion order, so repeated
# calls with the last seen _id walk the collection forward.
def get_cursor_for_collection(mongodb, mongo_collection_name, last_id_object, batch_size)
  selector = { :_id => { :$gt => last_id_object } }
  mongodb.collection(mongo_collection_name).find(selector).limit(batch_size)
end
get_placeholder(sqlitedb, since_table, mongodb, mongo_collection_name) click to toggle source
# File lib/logstash/inputs/mongodb.rb, line 109
# Fetch the saved resume marker ("placeholder") for a collection from
# the SINCE_TABLE tracking table, seeding it via init_placeholder when
# no usable value exists yet.
#
# NOTE(review): `x` is a Sequel dataset (the result of `where`), not a
# row hash. `x[:place]` presumably relies on Sequel's Dataset#[]
# returning the first row matching the given condition — confirm
# against the Sequel version in use; if so, `x[:place]` is a row hash
# and `x[:place][:place]` is the stored marker column.
def get_placeholder(sqlitedb, since_table, mongodb, mongo_collection_name)
  since = sqlitedb[SINCE_TABLE]
  # Rows are keyed "<since_table>_<collection>", so one SQLite table
  # tracks every watched collection.
  x = since.where(:table => "#{since_table}_#{mongo_collection_name}")
  if x[:place].nil? || x[:place] == 0
    # No marker yet (or a zero sentinel): seed it from the oldest
    # document in the collection.
    first_entry_id = init_placeholder(sqlitedb, since_table, mongodb, mongo_collection_name)
    @logger.debug("FIRST ENTRY ID for #{mongo_collection_name} is #{first_entry_id}")
    return first_entry_id
  else
    @logger.debug("placeholder already exists, it is #{x[:place]}")
    return x[:place][:place]
  end
end
init_placeholder(sqlitedb, since_table, mongodb, mongo_collection_name) click to toggle source
# File lib/logstash/inputs/mongodb.rb, line 91
# Seed the resume marker for a collection: find the oldest document
# (ordered ascending by since_column), convert its marker to a String
# (since_type == 'id', i.e. ObjectId hex) or an Integer (time-based
# tracking), record it in SINCE_TABLE, and return it.
def init_placeholder(sqlitedb, since_table, mongodb, mongo_collection_name)
  tracking_key = "#{since_table}_#{mongo_collection_name}"
  @logger.debug("init placeholder for #{tracking_key}")
  since = sqlitedb[SINCE_TABLE]
  mongo_collection = mongodb.collection(mongo_collection_name)

  first_entry = mongo_collection.find({}).sort(since_column => 1).limit(1).first
  # id tracking stores the ObjectId as a hex string; anything else is
  # treated as an epoch-seconds integer.
  marker = since_type == 'id' ? first_entry[since_column].to_s : first_entry[since_column].to_i
  since.insert(:table => tracking_key, :place => marker)
  @logger.info("init placeholder for #{tracking_key}: #{first_entry}")
  marker
end
init_placeholder_table(sqlitedb) click to toggle source
# File lib/logstash/inputs/mongodb.rb, line 79
# Create the SINCE_TABLE bookkeeping table (one table-name/place row
# per watched collection). A failure to create is treated as "table
# already exists" and only logged.
def init_placeholder_table(sqlitedb)
  sqlitedb.create_table "#{SINCE_TABLE}" do
    String :table
    Int :place
  end
rescue
  @logger.debug("since table already exists")
end
register() click to toggle source
# File lib/logstash/inputs/mongodb.rb, line 169
# Logstash plugin lifecycle hook: connect to MongoDB, open (or create)
# the SQLite placeholder database used to persist per-collection resume
# markers, and build the initial list of watched collections.
def register
  # Lazy-loaded here so the gems are only required when the plugin is
  # actually used.
  require "jdbc/sqlite3"
  require "sequel"
  # Path comes from plugin configuration (@placeholder_db_dir and
  # @placeholder_db_name).
  placeholder_db_path = File.join(@placeholder_db_dir, @placeholder_db_name)
  conn = Mongo::Client.new(@uri)

  @host = Socket.gethostname
  @logger.info("Registering MongoDB input")

  @mongodb = conn.database
  # SQLite over JDBC via Sequel — needs the jdbc/sqlite3 gem loaded above.
  @sqlitedb = Sequel.connect("jdbc:sqlite:#{placeholder_db_path}")

  # Should check to see if there are new matching tables at a predefined interval or on some trigger
  @collection_data = update_watched_collections(@mongodb, @collection, @sqlitedb)
end
run(queue) click to toggle source
# File lib/logstash/inputs/mongodb.rb, line 225
# Main plugin loop: repeatedly drain new documents from every watched
# collection, turn each into a LogStash::Event pushed onto +queue+, and
# advance the per-collection placeholder (both in memory and in the
# SQLite tracking table). Backs off exponentially between empty
# iterations (sleep_min up to sleep_max seconds) and resets the backoff
# as soon as documents flow again.
def run(queue)
  sleep_min = 0.01
  sleep_max = 5
  sleeptime = sleep_min

  @logger.debug("Tailing MongoDB")
  @logger.debug("Collection data is: #{@collection_data}")

  until stop?
    begin
      rows_seen = 0
      @collection_data.each do |index, collection|
        collection_name = collection[:name]
        @logger.debug("collection_data is: #{@collection_data}")
        last_id = @collection_data[index][:last_id]

        # The stored placeholder is a hex string ('id' tracking) or an
        # epoch integer ('time' tracking); convert it to the object the
        # Mongo query needs. An empty string means "no time marker yet".
        last_id_object = last_id
        if since_type == 'id'
          last_id_object = BSON::ObjectId(last_id)
        elsif since_type == 'time'
          last_id_object = Time.at(last_id) if last_id != ''
        end

        cursor = get_cursor_for_collection(@mongodb, collection_name, last_id_object, batch_size)
        cursor.each do |doc|
          rows_seen += 1
          queue << build_event(doc)

          # Remember the newest marker so the next query resumes
          # strictly after this document.
          since_id = doc[since_column]
          if since_type == 'id'
            since_id = doc[since_column].to_s
          elsif since_type == 'time'
            since_id = doc[since_column].to_i
          end
          @collection_data[index][:last_id] = since_id
        end
        # Store the last-seen doc in the database
        update_placeholder(@sqlitedb, since_table, collection_name, @collection_data[index][:last_id])
      end
      @logger.debug("Updating watch collections")
      @collection_data = update_watched_collections(@mongodb, @collection, @sqlitedb)

      if rows_seen.zero?
        # Nothing found this iteration: back off, then sleep.
        # BUGFIX: the loop previously slept (with ever-growing backoff)
        # even while documents were flowing; now it only backs off on an
        # empty iteration and resets the backoff otherwise.
        @logger.debug("No new rows. Sleeping.", :time => sleeptime)
        sleeptime = [sleeptime * 2, sleep_max].min
        sleep(sleeptime)
      else
        sleeptime = sleep_min
      end
    rescue => e
      @logger.warn('MongoDB Input threw an exception, restarting', :exception => e)
    end
  end
end

# Convert one MongoDB document into a decorated LogStash::Event,
# including logdate (from the ObjectId generation time), the rendered
# document, the raw mongo_id, and any parse-method-specific fields.
def build_event(doc)
  logdate = DateTime.parse(doc['_id'].generation_time.to_s)
  event = LogStash::Event.new("host" => @host)
  decorate(event)
  event.set("logdate", logdate.iso8601.force_encoding(Encoding::UTF_8))
  # BUGFIX: the original stringified the whole document first and then
  # did `str['_id'] = str['_id'].to_s`, a no-op substring replacement on
  # a String. Stringify the ObjectId inside the hash *before* rendering
  # so log_entry shows the plain hex id instead of BSON::ObjectId(...).
  log_entry = doc.to_h
  log_entry['_id'] = log_entry['_id'].to_s
  event.set("log_entry", log_entry.to_s.force_encoding(Encoding::UTF_8))
  event.set("mongo_id", doc['_id'].to_s)
  @logger.debug("mongo_id: " + doc['_id'].to_s)

  # Extract the HOST_ID and PID from the MongoDB BSON::ObjectID
  extract_host_and_pid(event, doc) if @unpack_mongo_id

  case @parse_method
  when 'flatten' then apply_flatten(event, doc)
  when 'dig'     then apply_dig(event, doc)
  when 'simple'  then apply_simple(event, doc)
  end
  event
end

# Unpack host_id and process_id from the ObjectId hex string (legacy
# ObjectId layout: 4-byte timestamp, 3-byte machine id, 2-byte pid,
# 3-byte counter).
def extract_host_and_pid(event, doc)
  doc_hex_bytes = doc['_id'].to_s.each_char.each_slice(2).map { |b| b.join.to_i(16) }
  doc_obj_bin = doc_hex_bytes.pack("C*").unpack("a4 a3 a2 a3")
  host_id = doc_obj_bin[1].unpack("S")
  process_id = doc_obj_bin[2].unpack("S")
  event.set('host_id', host_id.first.to_i)
  event.set('process_id', process_id.first.to_i)
end

# Flatten the JSON so that the data is usable in Kibana, coercing each
# leaf value onto the event with a best-effort type.
def apply_flatten(event, doc)
  flat_doc = flatten(doc)
  # Check for different types of expected values and add them to the event
  if flat_doc['info_message'] && (flat_doc['info_message'] =~ /collection stats: .+/)
    # Some custom stuff I'm having to do to fix formatting in past logs...
    sub_value = flat_doc['info_message'].sub("collection stats: ", "")
    JSON.parse(sub_value).each do |k1, v1|
      flat_doc["collection_stats_#{k1.to_s}"] = v1
    end
  end

  flat_doc.each do |k, v|
    @logger.debug("key: #{k.to_s} value: #{v.to_s}")
    if v.is_a? Numeric
      event.set(k.to_s, v)
    elsif v.is_a? Time
      event.set(k.to_s, v.iso8601)
    elsif v.is_a? String
      if v == "NaN"
        event.set(k.to_s, Float::NAN)
      # BUGFIX: the original used `regexp == v`, and Regexp#== against a
      # String is always false, so decimal-looking strings were never
      # converted to Float. `===` performs the intended match.
      elsif /\A[-+]?\d+[.][\d]+\z/ === v
        event.set(k.to_s, v.to_f)
      elsif (/\A[-+]?\d+\z/ === v) || (v.is_a? Integer)
        event.set(k.to_s, v.to_i)
      else
        event.set(k.to_s, v)
      end
    else
      # Non-scalar leaves: only _id and tags are kept (tags also as an
      # Array when it is one).
      if k.to_s == "_id" || k.to_s == "tags"
        event.set(k.to_s, v.to_s)
      end
      if (k.to_s == "tags") && (v.is_a? Array)
        event.set('tags', v)
      end
    end
  end
end

# Dig into the JSON and flatten only selected elements (@dig_fields,
# then @dig_dig_fields one level deeper), coercing integer-looking
# strings to Integer and everything else to String.
def apply_dig(event, doc)
  doc.each do |k, v|
    next if k == "_id"
    if (@dig_fields.include? k) && (v.respond_to? :each)
      v.each do |kk, vv|
        if (@dig_dig_fields.include? kk) && (vv.respond_to? :each)
          vv.each do |kkk, vvv|
            if /\A[-+]?\d+\z/ === vvv
              event.set("#{k}_#{kk}_#{kkk}", vvv.to_i)
            else
              event.set("#{k}_#{kk}_#{kkk}", vvv.to_s)
            end
          end
        else
          if /\A[-+]?\d+\z/ === vv
            event.set("#{k}_#{kk}", vv.to_i)
          else
            event.set("#{k}_#{kk}", vv.to_s)
          end
        end
      end
    else
      if /\A[-+]?\d+\z/ === v
        event.set(k, v.to_i)
      else
        event.set(k, v.to_s)
      end
    end
  end
end

# Copy top-level fields onto the event with light coercion.
def apply_simple(event, doc)
  doc.each do |k, v|
    if v.is_a? Numeric
      # NOTE(review): abs drops the sign of numeric fields — preserved
      # from the original behaviour, but it looks suspicious; confirm
      # this is intended.
      event.set(k, v.abs)
    elsif v.is_a? Array
      event.set(k, v)
    elsif v == "NaN"
      event.set(k, Float::NAN)
    else
      event.set(k, v.to_s)
    end
  end
end
update_placeholder(sqlitedb, since_table, mongo_collection_name, place) click to toggle source
# File lib/logstash/inputs/mongodb.rb, line 123
# Persist +place+ (the last-seen document marker) for the given
# collection into the SINCE_TABLE tracking table.
def update_placeholder(sqlitedb, since_table, mongo_collection_name, place)
  tracking_key = "#{since_table}_#{mongo_collection_name}"
  sqlitedb[SINCE_TABLE].where(:table => tracking_key).update(:place => place)
end
update_watched_collections(mongodb, collection, sqlitedb) click to toggle source
# File lib/logstash/inputs/mongodb.rb, line 155
# Refresh the set of watched collections: every collection whose name
# matches +collection+ gets a placeholder seeded (if needed) and an
# entry in the returned hash of {:name, :last_id} descriptors keyed by
# collection name.
def update_watched_collections(mongodb, collection, sqlitedb)
  watched = {}
  get_collection_names(mongodb, collection).each do |name|
    # Ensure the bookkeeping table exists before reading placeholders.
    init_placeholder_table(sqlitedb)
    last_id = get_placeholder(sqlitedb, since_table, mongodb, name)
    watched[name] ||= { :name => name, :last_id => last_id }
  end
  watched
end