class LogStash::Inputs::CouchDBChanges

This CouchDB input allows you to automatically stream events from the CouchDB http://guide.couchdb.org/draft/notifications.html[_changes] URI. Moreover, any "future" changes will automatically be streamed as well, making it easy to synchronize your CouchDB data with any target destination.

### Upsert and delete

You can use event metadata to allow for document deletion. All non-delete operations are treated as upserts.

### Starting at a Specific Sequence

The CouchDB input stores the last sequence number value in the location defined by `sequence_path`. You can use this fact to start or resume the stream at a particular sequence.

Constants

FEED

Declare these constants here.

INCLUDEDOCS

Public Instance Methods

register() click to toggle source
# File lib/logstash/inputs/couchdb_changes.rb, line 97
# Prepares the input before streaming starts.
#
# * Resolves @sequence_path, defaulting to ".couchdb_seq" inside the directory
#   named by the HOME environment variable when no path was configured.
# * Creates the SequenceDB::File used to persist the sequence number.
# * Derives the request path ("/<db>/_changes") and the URL scheme.
# * Picks the starting sequence: @initial_sequence when set (it is also
#   written to the sequence file), otherwise whatever the sequence file holds.
#
# Raises ArgumentError when neither sequence_path nor HOME is available.
def register
  require "logstash/util/buftok"

  if @sequence_path.nil?
    if ENV["HOME"].nil?
      @logger.error("No HOME environment variable set, I don't know where " \
                    "to keep track of the last sequence number. Either set " \
                    "HOME in your environment, or set sequence_path " \
                    "in your Logstash config.")
      raise ArgumentError, "sequence_path is not set and no HOME environment variable is available"
    end

    @sequence_path = File.join(ENV["HOME"], ".couchdb_seq")
    @logger.info("No sequence_path set, generating one...",
                 :sequence_path => @sequence_path)
  end

  @sequencedb = SequenceDB::File.new(@sequence_path)
  @path       = '/' + @db + '/_changes'
  @scheme     = @secure ? 'https' : 'http'

  if @initial_sequence.nil?
    @logger.info("No initial_sequence set, reading from filesystem ...",
                 :sequence_path => @sequence_path)
    @sequence = @sequencedb.read
  else
    # An explicitly configured starting point overrides the stored value.
    @logger.info("initial_sequence is set, writing to filesystem ...",
                 :initial_sequence => @initial_sequence, :sequence_path => @sequence_path)
    @sequencedb.write(@initial_sequence)
    @sequence = @initial_sequence
  end
end
run(queue) click to toggle source
# File lib/logstash/inputs/couchdb_changes.rb, line 150
# Streams the CouchDB _changes feed into +queue+ until stop? returns true.
#
# Every non-blank line received is passed to build_event. Each resulting
# event is decorated, pushed onto +queue+, and its "[@metadata][seq]" value is
# stored in @sequence and persisted through @sequencedb.
#
# The request URI is rebuilt for every connection attempt (including
# retries) so that the `since` parameter reflects the most recent @sequence
# rather than the value that was current when run was first called;
# otherwise a reconnect would request changes that were already emitted.
#
# Non-success HTTP status codes are raised as ArgumentError and handled by
# the rescue clauses below, which log the problem and retry when
# reconnect? allows it.
#
# queue - the object events are appended to with `<<`.
def run(queue)
  buffer = FileWatch::BufferedTokenizer.new
  @logger.info("Connecting to CouchDB _changes stream at:", :host => @host.to_s, :port => @port.to_s, :db => @db)
  until stop?
    begin
      # Built inside the begin block so that `retry` also refreshes `since`.
      uri = build_uri
      @logger.info("Using service uri :", :uri => uri)
      Net::HTTP.start(@host, @port, :use_ssl => (@secure == true), :ca_file => @ca_file) do |http|
        request = Net::HTTP::Get.new(uri.request_uri)
        request.basic_auth(@username, @password.value) if @username && @password
        http.request(request) do |response|
          # response.code is a String; comparing three-digit codes as strings
          # orders them the same way as their numeric values.
          code = response.code
          raise ArgumentError, "Server error! (response code: #{code})" if code >= "500"
          raise ArgumentError, "Authentication error! (response code: #{code})" if code == "401"
          raise ArgumentError, "Database not found! (response code: #{code})" if code == "404"
          raise ArgumentError, "Request error! (response code: #{code})" if code >= "400"
          response.read_body do |chunk|
            buffer.extract(chunk).each do |changes|
              # Put a "stop" check here. If we stop here, anything we've read, but
              # not written, will be read again since the @sequence change won't
              # have been written to the file, ensuring that it will pick up where
              # it left off.
              break if stop?
              # If no changes come since the last heartbeat period, a blank line is
              # sent as a sort of keep-alive.  We should ignore those.
              next if changes.chomp.empty?
              if event = build_event(changes)
                @logger.debug("event", :event => event.to_hash_with_metadata) if @logger.debug?
                decorate(event)
                queue << event
                @sequence = event.get("[@metadata][seq]")
                @sequencedb.write(@sequence.to_s)
              end
            end
          end
        end
      end
    # NOTE(review): when reconnect? returns a falsy value the rescue clauses
    # fall through and the enclosing loop immediately starts another attempt
    # without any delay — confirm this is intended.
    rescue Timeout::Error, Errno::EINVAL, Errno::ECONNRESET, EOFError, Errno::EHOSTUNREACH, Errno::ECONNREFUSED,
      Net::HTTPBadResponse, Net::HTTPHeaderSyntaxError, Net::ProtocolError, SocketError => e
      @logger.error("Connection problem encountered: Retrying connection in " + @reconnect_delay.to_s + " seconds...", :error => e.to_s, :host => @host.to_s, :port => @port.to_s, :db => @db)
      retry if reconnect?
    rescue Errno::EBADF => e
      @logger.error("Unable to connect: Bad file descriptor: ", :error => e.to_s)
      retry if reconnect?
    rescue ArgumentError => e
      @logger.error("Unable to connect to database", :db => @db, :error => e.to_s)
      retry if reconnect?
    end
  end
end

Private Instance Methods

build_event(changes) click to toggle source
# File lib/logstash/inputs/couchdb_changes.rb, line 215
# Builds a LogStash::Event from one line of the _changes feed.
#
# changes - a JSON string, parsed with LogStash::Json.load.
#
# Returns nil when the parsed data has a "last_seq" key or has no "doc".
# Otherwise returns an event whose "@metadata" carries the document "_id",
# an "action" ("delete" when the doc has a truthy "_deleted", else "update")
# and the change "seq". For non-deleted documents the event also carries the
# document under "doc" and sets "doc_as_upsert" to true; "_id" and "_rev" are
# removed from the doc unless @keep_id / @keep_revision are set.
def build_event(changes)
  # In lieu of a codec, build the event here
  data = LogStash::Json.load(changes)
  return nil if data.has_key?("last_seq")

  doc = data['doc']
  if doc.nil?
    @logger.debug("doc is nil", :data => data)
    return nil
  end

  metadata = { '_id' => doc['_id'] }
  hash = { '@metadata' => metadata }
  if doc['_deleted']
    metadata['action'] = 'delete'
  else
    metadata['action'] = 'update'
    # The parsed doc itself is trimmed in place and becomes the payload.
    doc.delete('_id') unless @keep_id
    doc.delete('_rev') unless @keep_revision
    hash['doc'] = doc
    hash['doc_as_upsert'] = true
  end
  metadata['seq'] = data['seq']

  event = LogStash::Event.new(hash)
  @logger.debug("event", :event => event.to_hash_with_metadata) if @logger.debug?
  event
end
build_uri() click to toggle source
# File lib/logstash/inputs/couchdb_changes.rb, line 202
# Builds the URI used to request the _changes feed.
#
# The query string carries FEED, INCLUDEDOCS and the current @sequence (as
# `since`), plus `timeout` when @timeout is set or `heartbeat` otherwise.
#
# URI::HTTP.build always produces an "http" URI regardless of a :scheme
# component, so the builder class is chosen from @scheme instead; this makes
# the returned (and logged) URI show "https" when @scheme is 'https'.
#
# Returns a URI::HTTPS when @scheme is 'https', otherwise a URI::HTTP.
def build_uri
  options = {:feed => FEED, :include_docs => INCLUDEDOCS, :since => @sequence}
  options = options.merge(@timeout ? {:timeout => @timeout} : {:heartbeat => @heartbeat})
  builder = @scheme == 'https' ? URI::HTTPS : URI::HTTP
  builder.build(:host => @host, :port => @port, :path => @path, :query => URI.encode_www_form(options))
end
reconnect?() click to toggle source
# File lib/logstash/inputs/couchdb_changes.rb, line 209
# Reports whether a failed connection should be retried.
#
# When @always_reconnect is truthy, first waits via
# Stud.stoppable_sleep(@reconnect_delay). Returns the value of
# @always_reconnect either way.
def reconnect?
  return @always_reconnect unless @always_reconnect

  Stud.stoppable_sleep(@reconnect_delay)
  @always_reconnect
end