class LogStash::Outputs::MongodbUpsertCustom

This output writes events to MongoDB, upserting each document against a filter built from the configured update keys, with optional bulk buffering.

Public Instance Methods

close()
# File lib/logstash/outputs/mongodb_upsert_custom.rb, line 207
def close
  @closed.make_true   # signal the bulk flush thread to stop
  @bulk_thread.wakeup # interrupt its sleep so shutdown is prompt
  @bulk_thread.join   # wait for the final bulk flush to finish
end
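
The close/flush handshake above pairs an atomic flag with a sleeping worker
thread: flipping the flag, waking the thread, and joining it allows one final
flush before shutdown. A minimal standalone sketch of the same pattern (the
names and the 5-second interval are illustrative, not plugin config):

  require "concurrent"

  closed = Concurrent::AtomicBoolean.new(false)
  worker = Thread.new do
    while closed.false?
      sleep(5)          # interruptible wait, standing in for @bulk_interval
      puts "flush pass" # stands in for the synchronized bulk flush
    end
  end

  closed.make_true      # ask the worker to stop
  worker.wakeup         # cut its sleep short
  worker.join           # block until the final pass returns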
receive(event)
# File lib/logstash/outputs/mongodb_upsert_custom.rb, line 108
def receive(event)
  begin
    # {}.merge(other) so we don't taint the event hash innards
    document = {}.merge(event.to_hash)
    if !@isodate
      timestamp = event.timestamp
      if timestamp
        # Store the timestamp as its JSON string form rather than as a BSON date
        document["@timestamp"] = timestamp.to_json
      else
        @logger.warn("Cannot set MongoDB document `@timestamp` field because it does not exist in the event", :event => event)
      end
    end

    if @date_keys
      keys = @date_keys.to_s.split(",")
      document.each do |key, value|
        if keys.include?(key)
          document[key] = LogStash::Timestamp.new(value)
        end
      end
    end

    if @update_keys
      # Only the first two comma-separated keys are used; together they form
      # the filter for the upsert (e.g. update_keys => "app_id,run_id").
      filterkeys = @update_keys.to_s.split(",")
      @filter_key1 = filterkeys[0]
      @filter_key2 = filterkeys[1]
    end

    if @generateId
      document["_id"] = BSON::ObjectId.new
    end

    collection = event.sprintf(@collection)
    if @bulk
      @@mutex.synchronize do
        @documents[collection] ||= []
        @documents[collection].push(document)
        if @documents[collection].length >= @bulk_size
          @documents[collection].each do |doc_record|
            criteria = Hash.new
            criteria[@filter_key1] = doc_record[@filter_key1]
            criteria[@filter_key2] = doc_record[@filter_key2]
            if @is_checkpoint
              if doc_record['type'] == @@log_type['OUTPUT'] || doc_record['type'] == @@log_type['INPUT']
                if doc_record['time_stamp']
                  criteria['time_stamp'] = { '$lte': doc_record['time_stamp'] }
                end
                @db[collection].find(criteria).update_many({ "$set": doc_record }, :upsert => true)
              end
            else
              if doc_record['type'] == @@log_type['OUTPUT']
                @db[collection].find(criteria).update_one(doc_record, :upsert => true)
              end
            end
          end
          @documents.delete(collection)
        end
      end
    else
      criteria = Hash.new
      criteria[@filter_key1] = document[@filter_key1]
      criteria[@filter_key2] = document[@filter_key2]
      if @is_checkpoint
        if document['type'] == @@log_type['OUTPUT'] || document['type'] == @@log_type['INPUT']
          if document['time_stamp']
            criteria['time_stamp'] = { '$lte': document["time_stamp"] }
          end
          @db[collection].find(criteria).update_many({ "$set": document }, :upsert => true)
        end
      else
        if document['type'] == @@log_type['OUTPUT']
          @db[collection].find(criteria).update_one(document, :upsert => true)
        end
      end
    end
  rescue => e
    if e.message =~ /^E11000/
      # On a duplicate key error, skip the insert.
      # We could check if the duplicate key err is the _id key
      # and generate a new primary key.
      # If the duplicate key error is on another field, we have no way
      # to fix the issue.
      @logger.warn("Skipping insert because of a duplicate key error", :event => event, :exception => e)
    else
      @logger.warn("Failed to send event to MongoDB, retrying in #{@retry_delay.to_s} seconds", :event => event, :exception => e)
      sleep(@retry_delay)
      retry
    end
  end
end
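
Given update_keys => "app_id,run_id" (hypothetical key names), receive()
derives its upsert filter as in this sketch, including the time_stamp guard
applied in checkpoint mode:

  update_keys = "app_id,run_id"
  document    = { "app_id" => 7, "run_id" => 42,
                  "type" => "OUTPUT", "time_stamp" => 1_700_000_000 }

  key1, key2 = update_keys.split(",")
  criteria   = { key1 => document[key1], key2 => document[key2] }
  # => {"app_id"=>7, "run_id"=>42}

  # Checkpoint mode only overwrites rows that are not newer than the event:
  if document["time_stamp"]
    criteria["time_stamp"] = { "$lte" => document["time_stamp"] }
  end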
register()
# File lib/logstash/outputs/mongodb_upsert_custom.rb, line 61
def register
  if @bulk_size > 1000
    raise LogStash::ConfigurationError, "Bulk size must not exceed 1000, currently '#{@bulk_size}'"
  end

  Mongo::Logger.logger = @logger
  # The connection URI is expected to name the target database, so the
  # client handle can be used directly for @db[collection] lookups.
  @db = Mongo::Client.new(@uri)

  @closed = Concurrent::AtomicBoolean.new(false) # flipped by close() to stop the flush thread
  @documents = {}                                # per-collection buffers used in bulk mode
  @bulk_thread = Thread.new(@bulk_interval) do |bulk_interval|
    while @closed.false? do
      sleep(bulk_interval)

      @@mutex.synchronize do
        # Flush every non-empty buffer, applying the same upsert rules as receive().
        @documents.each do |collection, values|
          if values.length > 0
            values.each do |value|
              criteria = Hash.new
              criteria[@filter_key1] = value[@filter_key1]
              criteria[@filter_key2] = value[@filter_key2]
              if @is_checkpoint
                if value['type'] == @@log_type['OUTPUT'] || value['type'] == @@log_type['INPUT']
                  if value['time_stamp']
                    criteria['time_stamp'] = { '$lte': value["time_stamp"] }
                  end
                  @db[collection].find(criteria).update_many({ "$set": value }, :upsert => true)
                end
              else
                if value['type'] == @@log_type['OUTPUT']
                  @db[collection].find(criteria).update_one(value, :upsert => true)
                end
              end
            end
            @documents.delete(collection)
          end
        end
      end
    end
  end
end
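
For reference, the view-based upserts issued above reduce to the following
standalone sketch (URI, collection, and document values are illustrative; the
URI must name the target database for @db[collection] to resolve against it):

  require "mongo"

  db  = Mongo::Client.new("mongodb://localhost:27017/logs")
  doc = { "app_id" => 7, "run_id" => 42, "type" => "OUTPUT" }

  # Replacement-style upsert, as in the non-checkpoint branch:
  db[:events].find("app_id" => 7, "run_id" => 42)
             .update_one(doc, :upsert => true)

  # $set-style upsert, as in the checkpoint branch:
  db[:events].find("app_id" => 7, "run_id" => 42)
             .update_many({ "$set" => doc }, :upsert => true)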