class LogStash::Inputs::CouchDBChanges
This CouchDB input allows you to automatically stream events from the CouchDB http://guide.couchdb.org/draft/notifications.html[_changes] URI. Moreover, any “future” changes will automatically be streamed as well, making it easy to synchronize your CouchDB data with any target destination.
### Upsert and delete
You can use event metadata to allow for document deletion. All non-delete operations are treated as upserts.
### Starting at a Specific Sequence
The CouchDB input stores the last sequence number value in the location defined by `sequence_path`. You can use this fact to start or resume the stream at a particular sequence.
Constants
- FEED
Declare these constants here.
- INCLUDEDOCS
Public Instance Methods
register()
click to toggle source
# File lib/logstash/inputs/couchdb_changes.rb, line 97
#
# Prepares the input before #run is called: resolves where the sequence
# value is persisted, derives the request path and scheme, and decides the
# sequence to start from.
#
# When @sequence_path is nil it defaults to "$HOME/.couchdb_seq".
# When @initial_sequence is set it is written to the sequence store and used
# as the starting sequence; otherwise the starting sequence is read back from
# the store.
#
# Raises ArgumentError when @sequence_path is nil and HOME is not set in the
# environment.
def register
  # Loaded here rather than at file load time, as in the original.
  require "logstash/util/buftok"

  if @sequence_path.nil?
    home = ENV["HOME"]
    if home.nil?
      @logger.error("No HOME environment variable set, I don't know where " \
                    "to keep track of the files I'm watching. Either set " \
                    "HOME in your environment, or set sequence_path in " \
                    "your Logstash config.")
      raise ArgumentError, "sequence_path is not set and the HOME environment variable is undefined"
    end
    @sequence_path = File.join(home, ".couchdb_seq")
    @logger.info("No sequence_path set, generating one...",
                 :sequence_path => @sequence_path)
  end

  @sequencedb = SequenceDB::File.new(@sequence_path)
  @path       = '/' + @db + '/_changes'
  @scheme     = @secure ? 'https' : 'http'

  if @initial_sequence.nil?
    @logger.info("No initial_sequence set, reading from filesystem ...",
                 :sequence_path => @sequence_path)
    @sequence = @sequencedb.read
  else
    # An explicit starting point overrides whatever was persisted earlier.
    @logger.info("initial_sequence is set, writing to filesystem ...",
                 :initial_sequence => @initial_sequence, :sequence_path => @sequence_path)
    @sequencedb.write(@initial_sequence)
    @sequence = @initial_sequence
  end
end
run(queue)
click to toggle source
# File lib/logstash/inputs/couchdb_changes.rb, line 150
#
# Streams the CouchDB _changes feed and pushes one event per change onto
# +queue+ until stop? returns true.
#
# Every (re)connection gets a fresh line tokenizer and a URI rebuilt from the
# current @sequence, so a reconnect resumes after the last sequence recorded
# through @sequencedb instead of replaying from the sequence the run started
# with, and a partial line left over from a dropped connection is never glued
# onto the first line of the next stream.
#
# queue - receives each decorated event via <<.
#
# HTTP status codes >= 400 are raised as ArgumentError inside the request and
# handled by the rescue clauses below, as are the listed network errors; each
# handler logs and retries when reconnect? allows it.
def run(queue)
  @logger.info("Connecting to CouchDB _changes stream at:", :host => @host.to_s, :port => @port.to_s, :db => @db)
  until stop?
    begin
      # Created inside the begin block so that `retry` re-evaluates both.
      buffer = FileWatch::BufferedTokenizer.new
      uri = build_uri
      @logger.info("Using service uri :", :uri => uri)
      Net::HTTP.start(@host, @port, :use_ssl => (@secure == true), :ca_file => @ca_file) do |http|
        request = Net::HTTP::Get.new(uri.request_uri)
        request.basic_auth(@username, @password.value) if @username && @password
        http.request request do |response|
          # Status codes are compared as strings, most specific checks first.
          raise ArgumentError, :message => "Server error!", :response_code => response.code if response.code >= "500"
          raise ArgumentError, :message => "Authentication error!", :response_code => response.code if response.code == "401"
          raise ArgumentError, :message => "Database not found!", :response_code => response.code if response.code == "404"
          raise ArgumentError, :message => "Request error!", :response_code => response.code if response.code >= "400"
          response.read_body do |chunk|
            buffer.extract(chunk).each do |changes|
              # Put a "stop" check here. If we stop here, anything we've read, but
              # not written, will be read again since the @sequence change won't
              # have been written to the file, ensuring that it will pick up where
              # it left off.
              break if stop?
              # If no changes come since the last heartbeat period, a blank line is
              # sent as a sort of keep-alive. We should ignore those.
              next if changes.chomp.empty?

              event = build_event(changes)
              next unless event

              @logger.debug("event", :event => event.to_hash_with_metadata) if @logger.debug?
              decorate(event)
              queue << event
              # Record progress only after the event has been queued.
              @sequence = event.get("[@metadata][seq]")
              @sequencedb.write(@sequence.to_s)
            end
          end
        end
      end
    rescue Timeout::Error, Errno::EINVAL, Errno::ECONNRESET, EOFError, Errno::EHOSTUNREACH, Errno::ECONNREFUSED,
           Net::HTTPBadResponse, Net::HTTPHeaderSyntaxError, Net::ProtocolError, SocketError => e
      @logger.error("Connection problem encountered: Retrying connection in #{@reconnect_delay} seconds...",
                    :error => e.to_s, :host => @host.to_s, :port => @port.to_s, :db => @db)
      retry if reconnect?
    rescue Errno::EBADF => e
      @logger.error("Unable to connect: Bad file descriptor: ", :error => e.to_s)
      retry if reconnect?
    rescue ArgumentError => e
      @logger.error("Unable to connect to database", :db => @db, :error => e.to_s)
      retry if reconnect?
    end
  end
end
Private Instance Methods
build_event(changes)
click to toggle source
# File lib/logstash/inputs/couchdb_changes.rb, line 215
#
# Builds a LogStash::Event from one line of the _changes feed.
#
# changes - a JSON string, parsed with LogStash::Json.load.
#
# Returns nil when the parsed line has a "last_seq" key or has no "doc";
# otherwise returns the event. The event's @metadata carries the document
# "_id", the "seq" value and an "action" of 'delete' (when doc["_deleted"] is
# truthy) or 'update'. For updates the document is stored under "doc" with
# "_id" / "_rev" removed unless @keep_id / @keep_revision are set, and
# "doc_as_upsert" is set to true.
def build_event(changes)
  # In lieu of a codec, build the event here
  data = LogStash::Json.load(changes)
  return nil if data.key?("last_seq")

  doc = data['doc']
  if doc.nil?
    @logger.debug("doc is nil", :data => data)
    return nil
  end

  # '_id' is captured here, before it may be stripped from the document below.
  hash = { '@metadata' => { '_id' => doc['_id'] } }
  if doc['_deleted']
    hash['@metadata']['action'] = 'delete'
  else
    hash['doc'] = doc
    hash['@metadata']['action'] = 'update'
    doc.delete('_id') unless @keep_id
    hash['doc_as_upsert'] = true
    doc.delete('_rev') unless @keep_revision
  end
  hash['@metadata']['seq'] = data['seq']

  event = LogStash::Event.new(hash)
  @logger.debug("event", :event => event.to_hash_with_metadata) if @logger.debug?
  event
end
build_uri()
click to toggle source
# File lib/logstash/inputs/couchdb_changes.rb, line 202
#
# Assembles the URI for the _changes request from @scheme, @host, @port and
# @path. The query string always carries feed, include_docs and since (the
# current @sequence), followed by timeout when @timeout is set, or heartbeat
# otherwise.
def build_uri
  keepalive = @timeout ? { :timeout => @timeout } : { :heartbeat => @heartbeat }
  params = { :feed => FEED, :include_docs => INCLUDEDOCS, :since => @sequence }.merge(keepalive)

  URI::HTTP.build(
    :scheme => @scheme,
    :host   => @host,
    :port   => @port,
    :path   => @path,
    :query  => URI.encode_www_form(params)
  )
end
reconnect?()
click to toggle source
# File lib/logstash/inputs/couchdb_changes.rb, line 209
#
# Tells the caller whether a failed connection should be retried. When
# @always_reconnect is truthy, first waits @reconnect_delay via
# Stud.stoppable_sleep. Returns the value of @always_reconnect.
def reconnect?
  return @always_reconnect unless @always_reconnect

  Stud.stoppable_sleep(@reconnect_delay)
  @always_reconnect
end