class LogStash::Inputs::MongoDB

Constants

SINCE_TABLE

Public Instance Methods

close()
# File lib/logstash/inputs/mongodb.rb, line 526
def close
  # If needed, use this to tidy up on shutdown
  @logger.debug("Shutting down...")
end
flatten(my_hash)
# File lib/logstash/inputs/mongodb.rb, line 229
def flatten(my_hash)
  new_hash = {}
  @logger.debug("Raw Hash: #{my_hash}")
  if my_hash.respond_to? :each
    my_hash.each do |k1,v1|
      if v1.is_a?(Hash)
        v1.each do |k2,v2|
          if v2.is_a?(Hash)
            # puts "Found a nested hash"
            result = flatten(v2)
            result.each do |k3,v3|
              new_hash[k1.to_s+"_"+k2.to_s+"_"+k3.to_s] = v3
            end
            # puts "result: "+result.to_s+" k2: "+k2.to_s+" v2: "+v2.to_s
          else
            new_hash[k1.to_s+"_"+k2.to_s] = v2
          end
        end
      else
        # puts "Key: "+k1.to_s+" is not a hash"
        new_hash[k1.to_s] = v1
      end
    end
  else
    @logger.debug("Flatten [ERROR]: hash did not respond to :each")
  end
  @logger.debug("Flattened Hash: #{new_hash}")
  return new_hash
end
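
A minimal usage sketch (hash contents hypothetical, and the call assumes a
configured plugin instance since the method logs via @logger): nested keys are
joined with underscores, so two levels of nesting become "parent_child_grandchild".

  flatten({ "attributes" => { "host" => { "name" => "csap-dev-01" } }, "count" => 3 })
  # => { "attributes_host_name" => "csap-dev-01", "count" => 3 }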
get_all_tables(mongodb)
# File lib/logstash/inputs/mongodb.rb, line 140
def get_all_tables(mongodb)
  return mongodb.collection_names
end
get_collection_names(mongodb, collection)
# File lib/logstash/inputs/mongodb.rb, line 145
def get_collection_names(mongodb, collection)
  collection_names = []
  mongodb.collection_names.each do |coll|
    @logger.info("Found collection: #{coll}")
    if /#{collection}/ =~ coll
      collection_names.push(coll)
      @logger.debug("Added #{coll} to the collection list as it matches our collection search")
    end
  end
  return collection_names
end
get_collection_names_with_exactmatch(mongodb, collection)
# File lib/logstash/inputs/mongodb.rb, line 158
def get_collection_names_with_exactmatch(mongodb, collection)
  collection_names = []
  mongodb.collection_names.each do |coll|
    @logger.info("Found collection: #{coll}")
    if collection == coll
      collection_names.push(coll)
      @logger.info("Added #{coll} to the collection list as it matches our collection search")
    end
  end
  return collection_names
end
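
A quick sketch of the difference between the two lookups above, assuming a
database containing collections named events_20150320 and events_20150321
(hypothetical names):

  get_collection_names(@mongodb, "events")
  # => ["events_20150320", "events_20150321"]   # the name is matched as a regex
  get_collection_names_with_exactmatch(@mongodb, "events_20150320")
  # => ["events_20150320"]                      # the name is compared literally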
get_cursor_for_collection(mongodb, mongo_collection_name, last_id_object, batch_size)
# File lib/logstash/inputs/mongodb.rb, line 172
def get_cursor_for_collection(mongodb, mongo_collection_name, last_id_object, batch_size)
  collection = mongodb.collection(mongo_collection_name)
  # TODO: sort by the date embedded in the ObjectId and start from the first of the series,
  # e.g. db.events_20150320.find().limit(1).sort({ts:1})
  # Hardcoded customization for the time being; the hostName regex clause is disabled
  # because regex searches take too long in MongoDB:
  #   return collection.find('$and' => [{:_id => {:$gt => last_id_object}}, {'attributes.id' => 'service_30'}, {'attributes.hostName' => /dev/}]).limit(batch_size)
  return collection.find('$and' => [{:_id => {:$gt => last_id_object}}, {'attributes.id' => 'service_30'}]).limit(batch_size)
end
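
This is the tailing query the plugin is built around: each batch resumes from
the last _id seen. A minimal sketch of the same pattern, minus the hardcoded
'attributes.id' filter (collection name and ObjectId are hypothetical):

  collection = @mongodb.collection('events_20150320')
  last_seen  = BSON::ObjectId('55180f3361746f70730e0000')
  collection.find(:_id => { :$gt => last_seen }).limit(100).each do |doc|
    puts doc['_id']
  end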
get_placeholder(sqlitedb, since_table, mongodb, mongo_collection_name)
# File lib/logstash/inputs/mongodb.rb, line 119
def get_placeholder(sqlitedb, since_table, mongodb, mongo_collection_name)
  since = sqlitedb[SINCE_TABLE]
  row = since.where(:table => "#{since_table}_#{mongo_collection_name}").first
  if row.nil? || row[:place].nil? || row[:place] == 0
    first_entry_id = init_placeholder(sqlitedb, since_table, mongodb, mongo_collection_name)
    @logger.debug("FIRST ENTRY ID for #{mongo_collection_name} is #{first_entry_id}")
    return first_entry_id
  else
    @logger.debug("placeholder already exists, it is #{row[:place]}")
    return row[:place]
  end
end
init_placeholder(sqlitedb, since_table, mongodb, mongo_collection_name)
# File lib/logstash/inputs/mongodb.rb, line 100
def init_placeholder(sqlitedb, since_table, mongodb, mongo_collection_name)
  @logger.debug("init placeholder for #{since_table}_#{mongo_collection_name}")
  since = sqlitedb[SINCE_TABLE]
  mongo_collection = mongodb.collection(mongo_collection_name)

  # The hostName regex clause is disabled because regex searches take too long in MongoDB:
  #   first_entry = mongo_collection.find('$and' => [{'attributes.id' => 'service_30'}, {'attributes.hostName' => /dev/}]).sort(since_column => 1).limit(1).first
  first_entry = mongo_collection.find('$and' => [{'attributes.id' => 'service_30'}]).sort(since_column => 1).limit(1).first
  first_entry_id = ''
  if since_type == 'id'
    first_entry_id = first_entry[since_column].to_s
  else
    first_entry_id = first_entry[since_column].to_i
  end
  since.insert(:table => "#{since_table}_#{mongo_collection_name}", :place => first_entry_id)
  @logger.info("init placeholder for #{since_table}_#{mongo_collection_name}: #{first_entry}")
  return first_entry_id
end
init_placeholder_table(sqlitedb)
# File lib/logstash/inputs/mongodb.rb, line 88
def init_placeholder_table(sqlitedb)
  begin
    sqlitedb.create_table "#{SINCE_TABLE}" do
      String :table
      Integer :place
    end
  rescue
    @logger.debug("since table already exists")
  end
end
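
The since table holds one row per watched collection, keyed by
"#{since_table}_#{mongo_collection_name}". A minimal sketch of the placeholder
round trip with Sequel against an in-memory SQLite database (the table name and
values are hypothetical; SINCE_TABLE's actual value is not shown on this page):

  require "sequel"

  db = Sequel.sqlite   # in-memory database; needs the sqlite3 adapter
  db.create_table(:since_table) do
    String :table
    Integer :place
  end
  since = db[:since_table]
  since.insert(:table => "logstash_since_events_20150320", :place => 1426867200)
  since.where(:table => "logstash_since_events_20150320").update(:place => 1426870800)
  puts since.where(:table => "logstash_since_events_20150320").first[:place]   # 1426870800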
register()
# File lib/logstash/inputs/mongodb.rb, line 202
def register
  require "jdbc/sqlite3"
  require "sequel"
  placeholder_db_path = File.join(@placeholder_db_dir, @placeholder_db_name)
  conn = Mongo::Client.new(@uri)

  @host = Socket.gethostname
  @logger.info("Registering MongoDB input - now included dbname in the config file")

  #@mongodb = conn.database
  @mongodb = conn.use(@dbname).database
  @sqlitedb = Sequel.connect("jdbc:sqlite:#{placeholder_db_path}")

  # Should check to see if there are new matching tables at a predefined interval or on some trigger
  @collection_data = update_watched_collections(@mongodb, @collection, @sqlitedb)
end
run(queue)
# File lib/logstash/inputs/mongodb.rb, line 345
def run(queue)
  sleep_min = 0.01
  sleep_max = 5
  sleeptime = sleep_min

  @logger.debug("Tailing MongoDB")
  @logger.debug("Collection data is: #{@collection_data}")

  until stop?
    begin
      @collection_data.each do |index, collection|
        collection_name = collection[:name]
        @logger.debug("collection_data is: #{@collection_data}")
        last_id = @collection_data[index][:last_id]
        #@logger.debug("last_id is #{last_id}", :index => index, :collection => collection_name)
        # get batch of events starting at the last_place if it is set


        last_id_object = last_id
        if since_type == 'id'
          last_id_object = BSON::ObjectId(last_id)
        elsif since_type == 'time'
          if last_id != ''
            last_id_object = Time.at(last_id)
          end
        end
        cursor = get_cursor_for_collection(@mongodb, collection_name, last_id_object, batch_size)
        cursor.each do |doc|
          logdate = DateTime.parse(doc['_id'].generation_time.to_s)
          event = LogStash::Event.new("host" => @host)
          decorate(event)
          event.set("logdate",logdate.iso8601.force_encoding(Encoding::UTF_8))
          transformedjson = transformcsapjson(doc)
          log_entry = doc.to_h
          log_entry['_id'] = log_entry['_id'].to_s  # stringify the ObjectId (log_entry is currently unused; the log_entry field is built by transformcsapjson)
          #event.set("log_entry",transformedjson.force_encoding(Encoding::UTF_8))
          event.set("log_entry",transformedjson)
          event.set("mongo_id",doc['_id'].to_s)
          
          host_name = doc['attributes']['hostName']
          if host_name.include?("dev")
            event.set("csap_env","dev")
          elsif host_name.include?("stg")
            event.set("csap_env","stg")
          elsif host_name.include?("lt") && host_name.include?("prf")
            event.set("csap_env","lt")
          elsif host_name.include?("prd")
            event.set("csap_env","prd")
          end
          
          @logger.debug("mongo_id: "+doc['_id'].to_s)
          #@logger.debug("EVENT looks like: "+event.to_s)
          #@logger.debug("Sent message: "+doc.to_h.to_s)
          #@logger.debug("EVENT looks like: "+event.to_s)
          # Extract the HOST_ID and PID from the MongoDB BSON::ObjectID
          if @unpack_mongo_id
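            # A BSON ObjectId is 12 bytes: a 4-byte epoch timestamp, a 3-byte
            # machine identifier, a 2-byte process id and a 3-byte counter
            # (the classic pre-MongoDB-3.4 layout, which the unpack below splits apart)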
            doc_hex_bytes = doc['_id'].to_s.each_char.each_slice(2).map {|b| b.join.to_i(16) }
            doc_obj_bin = doc_hex_bytes.pack("C*").unpack("a4 a3 a2 a3")
            host_id = doc_obj_bin[1].unpack("S")
            process_id = doc_obj_bin[2].unpack("S")
            event.set('host_id',host_id.first.to_i)
            event.set('process_id',process_id.first.to_i)
          end

          if @parse_method == 'flatten'
            # Flatten the JSON so that the data is usable in Kibana
            flat_doc = flatten(doc)
            # Check for different types of expected values and add them to the event
            if flat_doc['info_message'] && (flat_doc['info_message']  =~ /collection stats: .+/)
              # Some custom stuff I'm having to do to fix formatting in past logs...
              sub_value = flat_doc['info_message'].sub("collection stats: ", "")
              JSON.parse(sub_value).each do |k1,v1|
                flat_doc["collection_stats_#{k1.to_s}"] = v1
              end
            end

            flat_doc.each do |k,v|
              # Check for an integer
              @logger.debug("key: #{k.to_s} value: #{v.to_s}")
              if v.is_a? Numeric
                event.set(k.to_s,v)
              elsif v.is_a? Time
                event.set(k.to_s,v.iso8601)

              elsif v.is_a? String
                if v == "NaN"
                  event.set(k.to_s, Float::NAN)
                elsif /\A[-+]?\d+[.]\d+\z/ === v
                  event.set(k.to_s, v.to_f)
                elsif /\A[-+]?\d+\z/ === v
                  event.set(k.to_s, v.to_i)
                else
                  event.set(k.to_s, v)
                end
              else
                if k.to_s == "tags" && (v.is_a? Array)
                  event.set('tags', v)
                elsif k.to_s == "_id" || k.to_s == "tags"
                  event.set(k.to_s, v.to_s)
                end
              end
            end
          elsif @parse_method == 'dig'
            # Dig into the JSON and flatten select elements
            doc.each do |k, v|
              if k != "_id"
                if (@dig_fields.include? k) && (v.respond_to? :each)
                  v.each do |kk, vv|
                    if (@dig_dig_fields.include? kk) && (vv.respond_to? :each)
                      vv.each do |kkk, vvv|
                        if /\A[-+]?\d+\z/ === vvv
                          event.set("#{k}_#{kk}_#{kkk}",vvv.to_i)
                        else
                          event.set("#{k}_#{kk}_#{kkk}", vvv.to_s)
                        end
                      end
                    else
                      if /\A[-+]?\d+\z/ === vv
                        event.set("#{k}_#{kk}", vv.to_i)
                      else
                        event.set("#{k}_#{kk}",vv.to_s)
                      end
                    end
                  end
                else
                  if /\A[-+]?\d+\z/ === v
                    event.set(k,v.to_i)
                  else
                    event.set(k,v.to_s)
                  end
                end
              end
            end
          elsif @parse_method == 'simple'
            doc.each do |k, v|
              if v.is_a? Numeric
                event.set(k, v.abs)  # note: stores the absolute value, dropping any sign
              elsif v.is_a? Array
                event.set(k, v)
              elsif v == "NaN"
                event.set(k, Float::NAN)
              else
                event.set(k, v.to_s)
              end
            end
          end

          queue << event

          since_id = doc[since_column]
          if since_type == 'id'
            since_id = doc[since_column].to_s
          elsif since_type == 'time'
            since_id = doc[since_column].to_i
          end

          @collection_data[index][:last_id] = since_id
        end
        # Store the last-seen doc in the database
        update_placeholder(@sqlitedb, since_table, collection_name, @collection_data[index][:last_id])
      end
      @logger.debug("Updating watch collections")
      @collection_data = update_watched_collections(@mongodb, @collection, @sqlitedb)

      # Sleep between polling iterations, backing off up to sleep_max
      @logger.debug("Sleeping before next poll.", :time => sleeptime)
      sleeptime = [sleeptime * 2, sleep_max].min
      sleep(sleeptime)
    rescue => e
      @logger.warn('MongoDB Input threw an exception, restarting', :exception => e)
    end
  end
end
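
In the 'flatten' branch above, string values are coerced by regex before being
set on the event. A standalone sketch of that coercion (the method name is
hypothetical):

  def coerce_numeric(v)
    return Float::NAN if v == "NaN"
    return v.to_f if /\A[-+]?\d+[.]\d+\z/ === v   # "12.5" -> 12.5
    return v.to_i if /\A[-+]?\d+\z/ === v         # "-42"  -> -42
    v                                             # anything else stays a String
  end

  coerce_numeric("12.5")   # => 12.5
  coerce_numeric("-42")    # => -42
  coerce_numeric("dev")    # => "dev"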
transformcsapjson(doc)
# File lib/logstash/inputs/mongodb.rb, line 261
def transformcsapjson(doc)
  @logger.info("transforming document format #{doc['_id']}")
  hash = doc.to_hash
  hostname = hash[:attributes][:hostName]
  service_names = []
  total = hash[:data][:timeStamp].size
  hash[:data].each do |k,v|
    if k.include? "topCpu_"
      service_names.push(k.partition('topCpu_').last)
    end
  end
  service_count = service_names.size
  @logger.debug("service count #{service_count}")
  jsonstimestamp = []
  total.times do |count|
    service_count.times do |servicecount|
      begin
        timestamp = "timestamp: #{hash[:data][:timeStamp][count]},"
        timestampformatted = "timestampformatted: #{Time.at(hash[:data][:timeStamp][count].to_f/1000.0).utc.strftime("%Y-%m-%d %H:%M:%S.%3N")},"
      rescue => e
        @logger.warn('MongoDB Input threw an exception while processing timestamp', :exception => e)
        timestamp = ""
        timestampformatted = ""
      end
      
      begin
        hostName = "hostName: #{hostname},"
      rescue => e
        @logger.warn('MongoDB Input threw an exception while processing hostName', :exception => e)
        hostName = ""
      end
      
      begin
        serviceName = "serviceName: #{service_names[servicecount]},"
      rescue => e
        @logger.warn('MongoDB Input threw an exception while processing serviceName', :exception => e)
        serviceName = ""
      end
      
      begin
        cpu = "cpu: #{hash[:data]["topCpu_" + service_names[servicecount]][count]},"
      rescue => e
        @logger.warn('MongoDB Input threw an exception while processing cpu', :exception => e)
        cpu = ""
      end

      begin
        memory = "memory: #{hash[:data]["rssMemory_" + service_names[servicecount]][count]},"
      rescue => e
        @logger.warn('MongoDB Input threw an exception while processing memory', :exception => e)
        memory = ""
      end

      begin
        diskRead = "diskRead: #{hash[:data]["diskReadKb_" + service_names[servicecount]][count]},"
      rescue => e
        @logger.warn('MongoDB Input threw an exception while processing diskRead', :exception => e)
        diskRead = ""
      end

      begin
        diskWrite = "diskWrite: #{hash[:data]["diskWriteKb_" + service_names[servicecount]][count]},"
      rescue => e
        @logger.warn('MongoDB Input threw an exception while processing diskWrite', :exception => e)
        diskWrite = ""
      end
      
      begin
        diskUtil = "diskUtil: #{hash[:data]["diskUtil_" + service_names[servicecount]][count]}"
      rescue => e
        @logger.warn('MongoDB Input threw an exception while processing diskUtil', :exception => e)
        diskUtil = ""
      end

      jsontext = timestamp + timestampformatted + hostName + serviceName + cpu + memory + diskRead + diskWrite + diskUtil
      jsonstimestamp.push("{" + jsontext + "}")
    end
  end
  @logger.debug("the transformed json #{jsonstimestamp}")
  return jsonstimestamp
end
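
Each element of the returned array is a brace-wrapped string of "key: value"
pairs rather than strict JSON (keys and string values are unquoted), one entry
per service per timestamp sample. A hypothetical element:

  {timestamp: 1426867200000,timestampformatted: 2015-03-20 16:00:00.000,hostName: csap-dev-01,serviceName: service_30,cpu: 12,memory: 2048,diskRead: 5,diskWrite: 9,diskUtil: 3}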
update_placeholder(sqlitedb, since_table, mongo_collection_name, place)
# File lib/logstash/inputs/mongodb.rb, line 133
def update_placeholder(sqlitedb, since_table, mongo_collection_name, place)
  #@logger.debug("updating placeholder for #{since_table}_#{mongo_collection_name} to #{place}")
  since = sqlitedb[SINCE_TABLE]
  since.where(:table => "#{since_table}_#{mongo_collection_name}").update(:place => place)
end
update_watched_collections(mongodb, collection, sqlitedb)
# File lib/logstash/inputs/mongodb.rb, line 182
def update_watched_collections(mongodb, collection, sqlitedb)
  collections = []
  if isCollectionExactMatch
    collections = get_collection_names_with_exactmatch(mongodb, collection)
  else
    collections = get_collection_names(mongodb, collection)
  end
  
  collection_data = {}
  collections.each do |my_collection|
    init_placeholder_table(sqlitedb)
    last_id = get_placeholder(sqlitedb, since_table, mongodb, my_collection)
    if !collection_data[my_collection]
      collection_data[my_collection] = { :name => my_collection, :last_id => last_id }
    end
  end
  return collection_data
end
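
The returned hash is keyed by collection name and consumed by run(), which
reads :name and :last_id from each entry. A hypothetical result for a single
matched collection:

  {
    "events_20150320" => { :name => "events_20150320", :last_id => "55180f3361746f70730e0000" }
  }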