class Mongar::Replica

Attributes

columns[RW]
created_finder[RW]
db_time_selector[RW]
deleted_finder[RW]
destination[RW]
logger[RW]
mongodb_name[RW]
source[RW]
updated_finder[RW]

Public Class Methods

new(args = {}) click to toggle source
# File lib/mongar/replica.rb, line 8
# Builds a replica and installs the default finder procs.
#
# args - options Hash. Keys read here:
#        :logger       - logger to use; defaults to Logger.new(nil)
#        :source       - stored as +source+
#        :destination  - stored as +destination+; when present, its +replica+
#                        attribute is pointed back at this instance
#        :mongodb_name - stored as +mongodb_name+; defaults to :default
#
# The three default finders call +find+ / +find_every_with_deleted+ without a
# receiver; they are meant to be evaluated with another object as self (the
# #find method runs them through source.instance_exec).
def initialize(args = {})
  self.columns      = []
  self.logger       = args[:logger] || Logger.new(nil)
  self.mongodb_name = args[:mongodb_name] || :default
  self.source       = args[:source]
  self.destination  = args[:destination]

  # Back-reference so the destination can reach its replica.
  destination.replica = self if destination

  # Rows whose deleted_at is later than the given time.
  self.deleted_finder = lambda do |since|
    find_every_with_deleted(:conditions => ["deleted_at > ?", since])
  end

  # Non-deleted rows created after the given time.
  self.created_finder = Proc.new do |since|
    find(:all, :conditions => ["created_at > ? AND deleted_at IS NULL", since])
  end

  # Non-deleted rows updated after the given time.
  self.updated_finder = Proc.new do |since|
    find(:all, :conditions => ["updated_at > ? AND deleted_at IS NULL", since])
  end
end

Public Instance Methods

column(name, &block) click to toggle source
# File lib/mongar/replica.rb, line 99
# Declares a column and appends it to +columns+.
#
# name  - handed to Mongar::Column.new as the :name option
# block - optional; instance_eval'd on the new column so the caller can
#         configure it in place
#
# Returns the newly created Mongar::Column.
def column(name, &block)
  Mongar::Column.new(:name => name).tap do |col|
    col.instance_eval(&block) if block
    columns << col
  end
end
current_time_on_database_server() click to toggle source
# File lib/mongar/replica.rb, line 191
# Returns the current time as reported for the source.
#
# When no @db_time_selector has been set, delegates to
# default_time_selector(source); otherwise the selector proc is evaluated
# with the source as self and its result is returned as-is.
def current_time_on_database_server
  if @db_time_selector.nil?
    default_time_selector(source)
  else
    source.instance_exec(&@db_time_selector)
  end
end
default_time_selector(object) click to toggle source
# File lib/mongar/replica.rb, line 176
# Asks the database behind +object+ for its current UTC time.
#
# object - anything responding to +connection+; the adapter is identified
#          by the last segment of the connection's class name
#
# Known adapters: MysqlAdapter, Mysql2Adapter and SQLServerAdapter.
#
# Returns a Time, or nil when the adapter is not recognised or the value
# obtained from the connection is not a Time instance.
def default_time_selector(object)
  class_name = object.connection.class.to_s
  # Strip any namespace ("A::B::MysqlAdapter" -> "MysqlAdapter").
  adapter = class_name[/::([^:]+)$/, 1] || class_name

  time =
    case adapter
    when 'MysqlAdapter'
      # The row comes back as a string, so tag it as UTC before parsing.
      Time.parse(object.connection.execute("SELECT UTC_TIMESTAMP()").fetch_row.first + " UTC")
    when 'Mysql2Adapter'
      object.connection.execute("SELECT UTC_TIMESTAMP()").first.first
    when 'SQLServerAdapter'
      object.connection.select_one("SELECT getutcdate() AS date")['date']
    end

  time if time.is_a?(Time)
end
do_full_refresh?(last_replicated_time = nil) click to toggle source
# File lib/mongar/replica.rb, line 136
# Decides whether the next run should be a full refresh rather than an
# incremental sync.
#
# last_replicated_time - time of the last replication; when omitted (or nil)
#                        it is read from destination.last_replicated_at
#
# A full refresh is always due when there is no last replication time, or
# when it equals the 1/1/1902 sentinel timestamp. Otherwise the configured
# @full_refresh condition decides:
#   nil      - never refresh
#   Proc     - evaluated with the source as self and the last replication
#              time as argument; its result is returned
#   anything else - treated as a number of seconds; refresh once more than
#              that much time has passed since the last replication
#
# Returns a truthy value when a full refresh should be performed.
def do_full_refresh?(last_replicated_time = nil)
  last_replicated_time ||= destination.last_replicated_at

  # Nothing replicated yet (nil or the sentinel): always start from scratch.
  # Handling nil here makes any later nil check on this value redundant.
  return true if last_replicated_time.nil? || last_replicated_time == Time.parse("1/1/1902 00:00:00")

  if @full_refresh.nil?
    false
  elsif @full_refresh.is_a?(Proc)
    source.instance_exec last_replicated_time, &@full_refresh
  else
    (Time.now - last_replicated_time) > @full_refresh
  end
end
find(type, last_replicated_time) click to toggle source
# File lib/mongar/replica.rb, line 152
# Runs the finder registered for +type+ against the source.
#
# type                 - prefix of the finder accessor to use; the method
#                        looked up is "<type>_finder"
# last_replicated_time - passed to the finder proc as its only argument
#
# The finder proc is evaluated with the source as self.
#
# Returns whatever the finder returns, or [] when no finder is set or the
# finder returns nil/false.
def find(type, last_replicated_time)
  logger.debug " * Find #{type} from last replicated time #{last_replicated_time}"

  finder = send(:"#{type}_finder")
  if finder.nil?
    []
  else
    source.instance_exec(last_replicated_time, &finder) || []
  end
end
full_refresh(condition = nil) click to toggle source
# File lib/mongar/replica.rb, line 110
# Reads or configures the full-refresh condition.
#
# condition - nil to read the current setting, otherwise a Hash holding
#             either :if or :every; :if takes precedence when both are given
#
# Returns the stored condition.
# Raises StandardError when a Hash is given that has neither key set.
def full_refresh(condition = nil)
  return @full_refresh if condition.nil?

  @full_refresh = condition[:if] || condition[:every] ||
    raise(StandardError, 'You must specify either :if or :every as a condition for full refresh')
end
locked?() click to toggle source
# File lib/mongar/replica.rb, line 195
# Tells whether the destination reported activity within the last 300 seconds.
#
# Returns false when destination.last_activity_at is nil, otherwise true
# only if fewer than 300 seconds have passed since that time.
def locked?
  last_activity = destination.last_activity_at
  !last_activity.nil? && (Time.now - last_activity) < 300
end
mongodb() click to toggle source
# File lib/mongar/replica.rb, line 131
# Looks up this replica's database in Mongar::Mongo.databases by
# +mongodb_name+ and memoizes the result in @mongodb.
#
# Returns nil when +mongodb_name+ is not set.
def mongodb
  if mongodb_name
    @mongodb ||= Mongar::Mongo.databases[mongodb_name]
  end
end
mongodb_name=(val) click to toggle source
# File lib/mongar/replica.rb, line 126
# Sets the database name and clears the memoized @mongodb so the next
# #mongodb call looks the database up again.
#
# val - the new database name
def mongodb_name=(val)
  @mongodb_name = val
  @mongodb = nil
  # Keep the assigned value as the method result for send-style callers.
  val
end
primary_index() click to toggle source
# File lib/mongar/replica.rb, line 106
# Returns the first column whose +primary_index?+ is truthy, or nil when
# no column qualifies.
def primary_index
  columns.detect(&:primary_index?)
end
run() click to toggle source
# File lib/mongar/replica.rb, line 28
# Performs one replication pass.
#
# Steps, in order:
#   1. Bail out (returning nil) when the replica is locked.
#   2. Read the database server time and move it back one second.
#   3. Either do a full refresh (mark everything pending deletion, sync
#      :created_or_updated from the 1/1/1902 sentinel, then delete whatever
#      is still pending) or an incremental sync of :deleted,
#      :created_or_updated and :updated since destination.last_replicated_at.
#   4. Store the adjusted server time as destination.last_replicated_at and
#      reset destination.last_activity_at to nil.
def run
  logger.info "Replicating #{source.to_s} to #{mongodb.name}.#{destination.name}"

  if locked?
    logger.info " * Skipping locked replica"
    return
  end

  server_time = current_time_on_database_server
  logger.debug " * Time on server #{server_time}"

  # Step back one second so changes made within the current second are
  # picked up by the next run instead of being missed.
  server_time -= 1 unless server_time.nil?

  if do_full_refresh?
    logger.info " * Full refresh"

    # Anything not touched by the sync below is still pending afterwards
    # and gets removed.
    destination.mark_all_items_pending_deletion!
    run_sync_for([:created_or_updated], Time.parse('1/1/1902 00:00:00'))
    destination.delete_all_items_pending_deletion!
  else
    since = destination.last_replicated_at
    logger.info " * Incremental updates since #{since}"
    run_sync_for([:deleted, :created_or_updated, :updated], since)
  end

  logger.debug " * Setting last_replicated_at to #{server_time}"
  destination.last_replicated_at = server_time
  destination.last_activity_at = nil
end
run_sync_for(types, last_replicated_at) click to toggle source
# File lib/mongar/replica.rb, line 62
# Pushes the requested kinds of changes from the source to the destination.
#
# types              - Array of symbols selecting the phases to run; the
#                      recognised values are :deleted, :created,
#                      :created_or_updated and :updated
# last_replicated_at - forwarded to #find for every phase
#
# Phases always run in the fixed order below, regardless of the order of
# +types+. Note that :created_or_updated reads from the :created finder.
def run_sync_for(types, last_replicated_at)
  logger.info "   * Syncing #{types.join(', ')} items"

  # Deletions: remove by primary key.
  if types.include?(:deleted)
    find(:deleted, last_replicated_at).each do |item|
      destination.delete! source_object_to_primary_key_hash(item)
    end
  end

  # Plain creates.
  if types.include?(:created)
    find(:created, last_replicated_at).each do |item|
      destination.create! source_object_to_hash(item)
    end
  end

  # Upserts, fed by the :created finder.
  if types.include?(:created_or_updated)
    find(:created, last_replicated_at).each do |item|
      destination.create_or_update! source_object_to_primary_key_hash(item), source_object_to_hash(item)
    end
  end

  # Updates of existing items, addressed by primary key.
  if types.include?(:updated)
    find(:updated, last_replicated_at).each do |item|
      destination.update! source_object_to_primary_key_hash(item), source_object_to_hash(item)
    end
  end
end
source_object_to_hash(object) click to toggle source
# File lib/mongar/replica.rb, line 86
# Converts a source object into a Hash with one entry per declared column.
#
# object - the source record; for every column, the method named after the
#          column is sent to it
#
# Returns a Hash keyed by the symbolized column name, each value being the
# column's +transform_this+ applied to the raw attribute.
def source_object_to_hash(object)
  columns.each_with_object({}) do |column, doc|
    key = column.name.to_sym
    doc[key] = column.transform_this(object.send(key))
  end
end
source_object_to_primary_key_hash(object) click to toggle source
# File lib/mongar/replica.rb, line 94
# Builds the single-entry Hash that identifies a source object by its
# primary index column.
#
# object - the source record; the method named after the primary index
#          column is sent to it
#
# Returns a Hash whose only key is the column's name (as stored on the
# column, not symbolized) and whose value is the transformed attribute.
def source_object_to_primary_key_hash(object)
  key_column = primary_index
  raw_value = object.send(key_column.name.to_sym)
  { key_column.name => key_column.transform_this(raw_value) }
end
use_mongodb(name) click to toggle source
# File lib/mongar/replica.rb, line 122
# Selects the MongoDB database this replica uses by assigning +name+ to
# +mongodb_name+ (method-call form of the setter).
#
# name - the database name to store
def use_mongodb(name)
  self.mongodb_name = name
end