class CouchdbToSql::Changes

Constants

COUCHDB_HEARTBEAT
INACTIVITY_TIMEOUT
RECONNECT_TIMEOUT

Attributes

handlers[R]
highest_sequence[RW]
schemas[R]
source[R]

Public Class Methods

new(opts = '', &block) click to toggle source

Start a new Changes instance by connecting to the provided CouchDB to see if the database exists.

# File lib/couchdb_to_sql/changes.rb, line 15
# Builds a Changes instance: resolves the CouchDB database via
# `CouchRest.database(opts)`, prepares an HTTP client, and evaluates the
# given block on the new instance while DSL mode is switched on.
#
# @param opts passed unchanged to `CouchRest.database`
# @yield configuration block, run with `instance_eval` on this instance
# @raise [RuntimeError] when no block is supplied
def initialize(opts = '', &block)
  raise 'Block required for changes!' if block.nil?

  @skip_seqs = Set.new
  @handlers = []
  @schemas = {}
  @source = CouchRest.database(opts)
  @http = HTTPClient.new.tap do |client|
    # HTTP traffic is dumped to STDOUT only when DEBUG is present in ENV.
    client.debug_dev = STDOUT if ENV.key?('DEBUG')
  end

  log_info 'Connected to CouchDB'

  # Every DSL flag starts out disabled; the block below may enable them.
  @ember_pouch_mode = @fail_on_unhandled_document = @upsert_mode = false

  # While @dsl_mode is true the flag methods act as setters.
  @dsl_mode = true
  instance_eval(&block)
  @dsl_mode = false
end

Public Instance Methods

database(opts = nil) click to toggle source

@note Dual-purpose method, accepts configuration of database or returns a previous definition.

# File lib/couchdb_to_sql/changes.rb, line 79
# Configures the SQL connection (when +opts+ is given) or returns the
# connection configured earlier.
#
# With +opts+, a connection is opened through `Sequel.connect` unless one is
# already memoized, and `find_or_create_sequence_number` is then invoked.
# When SEQUEL_LOG_LEVEL is present in ENV, a logger with that level is
# attached to a newly opened connection.
#
# @param opts connection settings handed to `Sequel.connect`, or nil to read
# @return the memoized connection (nil if none has been configured)
def database(opts = nil)
  return @database unless opts

  unless @database
    connection = Sequel.connect(opts)
    if ENV.key?('SEQUEL_LOG_LEVEL')
      sequel_logger = LoggingLibrary::LoggerFactory.create(self.class.name)
      sequel_logger.level = ENV['SEQUEL_LOG_LEVEL'].to_s.downcase.to_sym
      connection.logger = sequel_logger
    end
    @database = connection
  end

  find_or_create_sequence_number
  @database
end
document(filter = {}, &block) click to toggle source
# File lib/couchdb_to_sql/changes.rb, line 95
# Registers a document handler built from +filter+ and the given block.
#
# @param filter [Hash] forwarded to `DocumentHandler.new`
# @return [Array] the handler list, including the newly added handler
def document(filter = {}, &block)
  handler = DocumentHandler.new(self, filter, &block)
  @handlers.push(handler)
end
ember_pouch_mode() click to toggle source

Sets the `ember_pouch_mode` flag. In `ember-pouch` mode, all the data fields are expected to reside within a `data` node in the document. More information on `ember-pouch` can be found [here](github.com/nolanlawson/ember-pouch).

@note Dual-purpose method, accepts configuration of setting or returns a previous definition.

# File lib/couchdb_to_sql/changes.rb, line 44
# Dual-purpose accessor for the `ember_pouch_mode` flag: outside DSL mode it
# reports the current value, inside DSL mode it switches the flag on.
#
# @return [Boolean] the (possibly just enabled) flag value
def ember_pouch_mode
  return @ember_pouch_mode unless @dsl_mode

  @ember_pouch_mode ||= true
end
fail_on_unhandled_document() click to toggle source

Sets the “fail on unhandled document” flag, which will turn log errors into runtime exceptions if an unhandled document is encountered.

@note Dual-purpose method, accepts configuration of setting or returns a previous definition.

# File lib/couchdb_to_sql/changes.rb, line 69
# Dual-purpose accessor for the "fail on unhandled document" flag: outside
# DSL mode it reports the current value, inside DSL mode it switches the
# flag on.
#
# @return [Boolean] the (possibly just enabled) flag value
def fail_on_unhandled_document
  return @fail_on_unhandled_document unless @dsl_mode

  @fail_on_unhandled_document ||= true
end
log_debug(message) click to toggle source
# File lib/couchdb_to_sql/changes.rb, line 118
# Writes +message+ at debug level, prefixed with the source database name.
#
# @param message [#to_s] text to log
def log_debug(message)
  line = "#{source.name}: #{message}"
  logger.debug(line)
end
log_error(message) click to toggle source
# File lib/couchdb_to_sql/changes.rb, line 126
# Writes +message+ at error level, prefixed with the source database name.
#
# @param message [#to_s] text to log
def log_error(message)
  line = "#{source.name}: #{message}"
  logger.error(line)
end
log_info(message) click to toggle source
# File lib/couchdb_to_sql/changes.rb, line 122
# Writes +message+ at info level, prefixed with the source database name.
#
# @param message [#to_s] text to log
def log_info(message)
  line = "#{source.name}: #{message}"
  logger.info(line)
end
schema(name) click to toggle source

END DSL

# File lib/couchdb_to_sql/changes.rb, line 107
# Returns the Schema for table +name+, building and caching it on first use.
# The cache is keyed by the symbolized name.
#
# @param name [String, Symbol] table name
# @return [Schema] cached schema object
def schema(name)
  key = name.to_sym
  cached = @schemas[key]
  return cached if cached

  @schemas[key] = Schema.new(database, name)
end
skip_seqs_file(file_path) click to toggle source
# File lib/couchdb_to_sql/changes.rb, line 99
# Reads a JSON file of sequence values and adds them to the set of sequences
# that `process_row` ignores.
#
# @param file_path [String] path to the JSON file
# @return [Set] the combined skip set
def skip_seqs_file(file_path)
  listed = Set.new(JSON.parse(File.read(file_path)))
  @skip_seqs = @skip_seqs | listed
end
start() click to toggle source

Start listening to the CouchDB changes feed. By this stage we should have a sequence id so we know where to start from and all the filters should have been prepared.

# File lib/couchdb_to_sql/changes.rb, line 114
# Starts consuming the CouchDB changes feed by delegating to
# `perform_request`, which requires `highest_sequence` to be set already.
def start
  perform_request
end
upsert_mode() click to toggle source

Sets the `upsert_mode` flag. When running in upsert mode, Sequel's `insert_conflict` support is used. More information about it can be found [here](sequel.jeremyevans.net/rdoc/files/doc/postgresql_rdoc.html#label-INSERT+ON+CONFLICT+Support).

@note Dual-purpose method, accepts configuration of setting or returns a previous definition.

# File lib/couchdb_to_sql/changes.rb, line 57
# Dual-purpose accessor for the `upsert_mode` flag: outside DSL mode it
# reports the current value, inside DSL mode it switches the flag on.
#
# @return [Boolean] the (possibly just enabled) flag value
def upsert_mode
  return @upsert_mode unless @dsl_mode

  @upsert_mode ||= true
end

Protected Instance Methods

create_sequence_table() click to toggle source
# File lib/couchdb_to_sql/changes.rb, line 279
# Creates the bookkeeping table that stores, per CouchDB database name, the
# highest processed sequence together with created/updated timestamps.
def create_sequence_table
  table_name = CouchdbToSql::COUCHDB_TO_SQL_SEQUENCES_TABLE

  database.create_table(table_name) do
    # One row per source database, keyed by its name.
    String :couchdb_database_name, primary_key: true
    # Stored as a string; defaults to '0' when no value is supplied.
    String :highest_sequence, default: '0', null: false
    DateTime :created_at
    DateTime :updated_at
  end
end
ember_pouch_transform_document(doc) click to toggle source
# File lib/couchdb_to_sql/changes.rb, line 237
# Flattens a document that carries its fields under a `data` key.
#
# When `data` is present, `id` is set to the part of `_id` after the first
# `_2_` separator, the `data` entry is removed from +doc+ (the argument is
# mutated), and a new hash combining +doc+ with the former `data` content is
# returned. Documents without `data` are returned untouched.
#
# @param doc [Hash] document taken from a changes row
# @return [Hash] flattened document, or +doc+ itself when there is no `data`
def ember_pouch_transform_document(doc)
  return doc unless doc.key?('data')

  doc['id'] = doc['_id'].split('_2_', 2).last
  payload = doc.delete('data')
  doc.merge(payload)
end
fetch_document_from(row) click to toggle source
# File lib/couchdb_to_sql/changes.rb, line 227
# Extracts the `doc` entry from a changes row, applying the ember-pouch
# transformation when `ember_pouch_mode` is enabled.
#
# @param row [Hash] parsed changes row
# @return [Hash] the (possibly transformed) document
# @raise [KeyError] when the row has no `doc` key
def fetch_document_from(row)
  document = row.fetch('doc')
  ember_pouch_mode ? ember_pouch_transform_document(document) : document
end
find_document_handlers(document) click to toggle source
# File lib/couchdb_to_sql/changes.rb, line 246
# Selects the registered handlers whose `handles?` accepts +document+.
#
# @param document [Hash] document to match
# @return [Array] matching handlers, in registration order
def find_document_handlers(document)
  @handlers.select do |handler|
    handler.handles?(document)
  end
end
find_or_create_sequence_number() click to toggle source
# File lib/couchdb_to_sql/changes.rb, line 250
# Loads the highest processed sequence for this source database into
# `highest_sequence`, creating the bookkeeping table and/or this source's row
# when they are missing.
#
# The row is inserted whenever it is absent, not only right after the table
# has been created. Otherwise a pre-existing table without a row for this
# source would leave the plain `UPDATE` in `update_sequence_table` matching
# zero rows, and progress would never be stored.
def find_or_create_sequence_number
  create_sequence_table unless database.table_exists?(CouchdbToSql::COUCHDB_TO_SQL_SEQUENCES_TABLE)

  scoped = sequence_table.where(couchdb_database_name: source.name)
  row = scoped.first

  unless row
    # highest_sequence is omitted so the column default applies.
    sequence_table.insert(couchdb_database_name: source.name, created_at: DateTime.now)
    row = scoped.first
  end

  self.highest_sequence = (row ? row.fetch(:highest_sequence) : '0')
end
logger() click to toggle source
# File lib/couchdb_to_sql/changes.rb, line 292
# Returns the logger exposed by `CouchdbToSql.logger`; used by the `log_*`
# helpers of this class.
def logger
  CouchdbToSql.logger
end
perform_request() click to toggle source
# File lib/couchdb_to_sql/changes.rb, line 132
# Consumes the continuous `_changes` feed of the source database and hands
# every received row to `process_row`, reconnecting after failures.
#
# Differences from a naive implementation, both deliberate:
# * The query is rebuilt for every (re)connection so `since` reflects the
#   current `highest_sequence` rather than the value at first start.
# * Incoming chunks are buffered and only complete, newline-terminated lines
#   are parsed, so a JSON line split across two chunks is not lost. Blank
#   lines are skipped.
#
# @raise [RuntimeError] when `highest_sequence` has not been set
def perform_request # rubocop:disable AbcSize
  raise 'Internal error: Highest_sequence is expected to be non-nil' unless highest_sequence

  log_info "listening to changes feed from sequence number: #{highest_sequence}"

  url = File.join(source.root.to_s, '_changes')
  uri = URI.parse(url)

  # Authenticate?
  @http.set_auth(source.root, uri.user, uri.password) if uri.user.present? && uri.password.present?

  num_rows = 0
  loop do
    begin
      # Make sure the request has the latest sequence on every (re)connect
      query = {
        feed: 'continuous',
        heartbeat: COUCHDB_HEARTBEAT * 1000,
        include_docs: true,
        since: highest_sequence
      }

      # Holds the trailing, not yet newline-terminated part of the stream.
      pending = ''.dup

      # Perform the actual request for chunked content
      @http.get_content(url, query) do |chunk|
        pending << chunk
        # With limit -1 the last element is always the unterminated tail
        # (an empty string when the chunk ended on a newline).
        lines = pending.split("\n", -1)
        pending = lines.pop || ''.dup

        lines.each do |line|
          next if line.empty?

          process_row(JSON.parse(line))

          num_rows += 1
          log_info "Processed #{num_rows} rows" if (num_rows % 10_000).zero?
        end
      end
    rescue StandardError => e
      # NOTE(review): this also catches errors raised by process_row
      # (including the one raised for unhandled documents); confirm that
      # retrying is the intended reaction in that case.
      log_error "exception occurred: #{e.message}"
      log_error "connection ended, attempting to reconnect in #{RECONNECT_TIMEOUT}s..."
      sleep RECONNECT_TIMEOUT
    end
  end
rescue HTTPClient::TimeoutError, HTTPClient::BadResponseError => e
  # NOTE(review): errors raised inside the loop are already handled by the
  # inner rescue; this clause only covers code outside of it.
  log_error "connection failed: #{e.message}, attempting to reconnect in #{RECONNECT_TIMEOUT}s..."
  sleep RECONNECT_TIMEOUT
  retry
end
process_row(row) click to toggle source
# File lib/couchdb_to_sql/changes.rb, line 177
# Applies one parsed row of the changes feed to the SQL database.
#
# Rows whose id starts with `_design` and rows whose sequence is contained in
# `@skip_seqs` are ignored. Rows with an id are processed inside a database
# transaction that also stores the new sequence. Rows without an id that
# carry `last_seq` are only logged.
#
# The `_design` check is anchored to the start of the whole id; a per-line
# `^` anchor would also match ids that contain `_design` after a newline.
#
# @param row [Hash] parsed JSON row from the changes feed
# @raise [InvalidDataError] for an unhandled document when
#   `fail_on_unhandled_document` is enabled
def process_row(row)
  id = row['id']
  seq = row['seq']

  return if id.to_s.start_with?('_design')
  return if @skip_seqs.include?(seq)

  if id
    # Wrap the whole request in a transaction
    database.transaction do
      doc = fetch_document_from(row)

      if row['deleted']
        process_deleted_document(doc, id, seq)
      else
        process_changed_document(doc, id, seq)
      end

      update_sequence_table(seq) # transaction
    end
  elsif row['last_seq']
    log_info "received last seq: #{row['last_seq']}"
  end
end

# Handles a row flagged as deleted: matching handlers mark the document as
# deleted; when no handler matches, every handler is asked to delete it.
def process_deleted_document(doc, id, seq)
  log_info "received DELETE seq. #{seq} id: #{id}"

  document_handlers = find_document_handlers(doc)
  if document_handlers.empty?
    log_info "Found deletion without type-identifying field, (id: '#{id}'), removing " \
              'data from SQL/Postgres.'
    log_info 'Trying all handlers...'
    handlers.each { |handler| handler.delete(doc) }
  else
    document_handlers.each { |handler| handler.mark_as_deleted(doc) }
  end
end

# Handles a created/updated document: each matching handler deletes the
# previous entries and inserts the document again. An unhandled document is
# logged, or raises when `fail_on_unhandled_document` is enabled.
def process_changed_document(doc, id, seq)
  log_debug "received CHANGE seq. #{seq} id: #{id}"

  document_handlers = find_document_handlers(doc)
  if document_handlers.empty?
    message = 'No document handlers found for document. ' \
      "Document data: #{doc.inspect}, seq: #{seq}, source: #{@source.name}"
    raise InvalidDataError, message if fail_on_unhandled_document

    log_error message
  end

  document_handlers.each do |handler|
    # Delete all previous entries of doc, then re-create
    handler.delete(doc)
    handler.insert(doc)
  end
end
sequence_table() click to toggle source
# File lib/couchdb_to_sql/changes.rb, line 288
# Returns the dataset for the sequence bookkeeping table, looked up on the
# configured database via `CouchdbToSql::COUCHDB_TO_SQL_SEQUENCES_TABLE`.
def sequence_table
  database[CouchdbToSql::COUCHDB_TO_SQL_SEQUENCES_TABLE]
end
update_sequence_table(new_highest_sequence) click to toggle source
# File lib/couchdb_to_sql/changes.rb, line 260
# Persists +new_highest_sequence+ for this source database and mirrors it in
# `highest_sequence`.
#
# In upsert mode the row is inserted or, on conflict on the database name,
# updated. Otherwise the existing row is updated in place. Both paths now
# write `updated_at`; previously only the upsert path did, so the column was
# never refreshed in the default mode.
#
# @param new_highest_sequence sequence value of the row just processed
# @return the value that was stored
def update_sequence_table(new_highest_sequence)
  now = DateTime.now

  if upsert_mode
    data = {
      couchdb_database_name: source.name,
      highest_sequence: new_highest_sequence,
      updated_at: now
    }
    sequence_table
      .insert_conflict(target: :couchdb_database_name, update: data)
      .insert(data.merge(created_at: now))
  else
    sequence_table
      .where(couchdb_database_name: source.name)
      .update(highest_sequence: new_highest_sequence, updated_at: now)
  end

  self.highest_sequence = new_highest_sequence
end