# File lib/mongar/replica.rb, line 106
# Returns the first entry of +columns+ that answers truthy to
# +primary_index?+, or nil when no column does.
def primary_index
  columns.find(&:primary_index?)
end
class Mongar::Replica
Attributes
columns[RW]
created_finder[RW]
db_time_selector[RW]
deleted_finder[RW]
destination[RW]
logger[RW]
mongodb_name[RW]
source[RW]
updated_finder[RW]
Public Class Methods
new(args = {})
click to toggle source
# File lib/mongar/replica.rb, line 8
# Builds a replica from an options hash.
#
# Keys read from +args+:
#   :logger       - stored in +logger+; falls back to Logger.new(nil)
#   :source       - stored in +source+
#   :destination  - stored in +destination+; when truthy it also receives
#                   this replica through its +replica=+ writer
#   :mongodb_name - stored in +mongodb_name+; falls back to :default
#
# +columns+ starts out empty, and default deleted/created/updated finders
# are installed. Each finder takes the last replicated time as its only
# argument. The deleted finder is a lambda; the other two are plain procs.
def initialize(args = {})
  self.logger       = args[:logger] || Logger.new(nil)
  self.source       = args[:source]
  self.destination  = args[:destination]
  self.mongodb_name = args[:mongodb_name] || :default
  self.columns      = []

  # Give the destination a back-reference to this replica.
  destination.replica = self if destination

  self.deleted_finder = ->(last_replicated_at) {
    find_every_with_deleted(:conditions => ["deleted_at > ?", last_replicated_at])
  }

  self.created_finder = proc do |last_replicated_at|
    find(:all, :conditions => ["created_at > ? AND deleted_at IS NULL", last_replicated_at])
  end

  self.updated_finder = proc do |last_replicated_at|
    find(:all, :conditions => ["updated_at > ? AND deleted_at IS NULL", last_replicated_at])
  end
end
Public Instance Methods
column(name, &block)
click to toggle source
# File lib/mongar/replica.rb, line 99
# Declares a column on this replica.
#
# Creates a Mongar::Column with the given +name+, evaluates the optional
# block in the context of that column (via instance_eval), appends the
# column to +columns+ and returns it.
def column(name, &block)
  Mongar::Column.new(:name => name).tap do |new_column|
    new_column.instance_eval(&block) if block_given?
    columns << new_column
  end
end
current_time_on_database_server()
click to toggle source
# File lib/mongar/replica.rb, line 191
# Returns the current time as seen by the source database.
#
# When no custom @db_time_selector is set, delegates to
# default_time_selector(source). Otherwise the selector is executed in the
# context of +source+ (instance_exec) and its result is returned as-is.
def current_time_on_database_server
  return default_time_selector(source) if @db_time_selector.nil?

  source.instance_exec(&@db_time_selector)
end
default_time_selector(object)
click to toggle source
# File lib/mongar/replica.rb, line 176
# Asks the database behind +object+ for its current UTC time.
#
# The adapter is identified by the last "::"-separated segment of the
# connection's class name (or the whole name when it has no namespace).
# Supported adapters are MysqlAdapter, Mysql2Adapter and SQLServerAdapter.
#
# Returns the value only if it is a Time; any other result, including an
# unrecognised adapter, yields nil.
def default_time_selector(object)
  adapter_name = object.connection.class.to_s
  adapter_name = adapter_name[/::([^:]+)$/, 1] || adapter_name

  time =
    case adapter_name
    when 'MysqlAdapter'
      # The first column of the fetched row is concatenated with " UTC"
      # and parsed into a Time.
      Time.parse(object.connection.execute("SELECT UTC_TIMESTAMP()").fetch_row.first + " UTC")
    when 'Mysql2Adapter'
      object.connection.execute("SELECT UTC_TIMESTAMP()").first.first
    when 'SQLServerAdapter'
      object.connection.select_one("SELECT getutcdate() AS date")['date']
    end

  time if time.is_a?(Time)
end
do_full_refresh?(last_replicated_time = nil)
click to toggle source
# File lib/mongar/replica.rb, line 136
# Decides whether the next replication should be a full refresh.
#
# last_replicated_time - time of the previous replication; when nil it is
#                        read from destination.last_replicated_at.
#
# Returns true when there is no previous replication time, or when that
# time equals the "1/1/1902 00:00:00" sentinel. Otherwise:
#   * no full-refresh condition configured -> false
#   * condition is a Proc -> result of running it on +source+
#     (instance_exec) with the last replicated time as argument
#   * anything else -> treated as a number of seconds; true once more than
#     that many seconds have elapsed since the last replication
def do_full_refresh?(last_replicated_time = nil)
  last_replicated_time ||= destination.last_replicated_at

  # A nil time is fully handled here, so no later branch has to re-check it.
  return true if last_replicated_time.nil? || last_replicated_time == Time.parse("1/1/1902 00:00:00")
  return false if @full_refresh.nil?

  if @full_refresh.is_a?(Proc)
    source.instance_exec(last_replicated_time, &@full_refresh)
  else
    (Time.now - last_replicated_time) > @full_refresh
  end
end
find(type, last_replicated_time)
click to toggle source
# File lib/mongar/replica.rb, line 152
# Looks up source items of the given +type+ changed since
# +last_replicated_time+.
#
# The finder is fetched by calling the "<type>_finder" method on this
# replica. A nil finder yields an empty array. Otherwise the finder is run
# in the context of +source+ (instance_exec) with the last replicated time
# as its argument; a nil/false result is replaced by an empty array.
def find(type, last_replicated_time)
  logger.debug " * Find #{type} from last replicated time #{last_replicated_time}"

  finder = send("#{type}_finder".to_sym)
  return [] if finder.nil?

  found = source.instance_exec(last_replicated_time, &finder)
  found || []
end
full_refresh(condition = nil)
click to toggle source
# File lib/mongar/replica.rb, line 110
# Reads or sets the full-refresh condition.
#
# Called without an argument (or with nil) it returns the stored condition.
# Called with a hash it stores, in order of preference, the value under
# :if or the value under :every, and returns what was stored.
#
# Raises StandardError when the hash provides neither key.
def full_refresh(condition = nil)
  return @full_refresh if condition.nil?

  @full_refresh =
    condition[:if] ||
    condition[:every] ||
    raise(StandardError, 'You must specify either :if or :every as a condition for full refresh')
end
locked?()
click to toggle source
# File lib/mongar/replica.rb, line 195
# Tells whether the destination showed activity recently enough for this
# replica to be considered locked.
#
# Returns false when destination.last_activity_at is nil; otherwise true
# only if fewer than 300 seconds have passed since that time.
def locked?
  last_activity = destination.last_activity_at
  return false if last_activity.nil?

  seconds_since_activity = Time.now - last_activity
  seconds_since_activity < 300
end
mongodb()
click to toggle source
# File lib/mongar/replica.rb, line 131
# Returns the database registered in Mongar::Mongo.databases under
# +mongodb_name+, memoised in @mongodb after the first successful lookup.
# Returns nil when +mongodb_name+ is nil or false.
def mongodb
  if mongodb_name
    @mongodb ||= Mongar::Mongo.databases[mongodb_name]
  end
end
mongodb_name=(val)
click to toggle source
# File lib/mongar/replica.rb, line 126
# Sets the name of the MongoDB database to use.
#
# Also resets the @mongodb instance variable to nil, so a value cached
# under the previous name is not kept.
def mongodb_name=(val)
  # Clear the cache first; the name assignment stays last so the method
  # still evaluates to +val+.
  @mongodb = nil
  @mongodb_name = val
end
primary_index()
click to toggle source
run()
click to toggle source
# File lib/mongar/replica.rb, line 28
# Performs one replication pass from +source+ to +destination+.
#
# Steps:
#   1. Return early (nil) when the replica is locked.
#   2. Read the database server time and move it back one second.
#   3. Full refresh: mark every destination item as pending deletion,
#      create-or-update everything since the "1/1/1902 00:00:00" sentinel,
#      then delete whatever is still pending.
#      Incremental: sync deleted, created-or-updated and updated items
#      since destination.last_replicated_at.
#   4. Store the adjusted server time as the new last_replicated_at and
#      reset last_activity_at to nil.
def run
  logger.info "Replicating #{source.to_s} to #{mongodb.name}.#{destination.name}"

  if locked?
    logger.info " * Skipping locked replica"
    return
  end

  server_time = current_time_on_database_server
  logger.debug " * Time on server #{server_time}"

  # Step back one second so changes made mid-second are not missed.
  server_time = server_time - 1 unless server_time.nil?

  if do_full_refresh?
    logger.info " * Full refresh"

    destination.mark_all_items_pending_deletion!
    run_sync_for([:created_or_updated], Time.parse('1/1/1902 00:00:00'))
    destination.delete_all_items_pending_deletion!
  else
    since = destination.last_replicated_at
    logger.info " * Incremental updates since #{since}"

    run_sync_for([:deleted, :created_or_updated, :updated], since)
  end

  logger.debug " * Setting last_replicated_at to #{server_time}"
  destination.last_replicated_at = server_time
  destination.last_activity_at = nil
end
run_sync_for(types, last_replicated_at)
click to toggle source
# File lib/mongar/replica.rb, line 62
# Pushes the requested kinds of changes to the destination.
#
# types              - array containing any of :deleted, :created,
#                      :created_or_updated, :updated
# last_replicated_at - time handed to every finder
#
# The kinds are always processed in the order deleted, created,
# created_or_updated, updated, regardless of their order in +types+.
# Note that :created_or_updated uses the :created finder and sends each
# item through create_or_update!.
def run_sync_for(types, last_replicated_at)
  logger.info " * Syncing #{types.join(', ')} items"

  if types.include?(:deleted)
    find(:deleted, last_replicated_at).each do |removed|
      destination.delete!(source_object_to_primary_key_hash(removed))
    end
  end

  if types.include?(:created)
    find(:created, last_replicated_at).each do |record|
      destination.create!(source_object_to_hash(record))
    end
  end

  if types.include?(:created_or_updated)
    find(:created, last_replicated_at).each do |record|
      destination.create_or_update!(source_object_to_primary_key_hash(record), source_object_to_hash(record))
    end
  end

  if types.include?(:updated)
    find(:updated, last_replicated_at).each do |record|
      destination.update!(source_object_to_primary_key_hash(record), source_object_to_hash(record))
    end
  end
end
source_object_to_hash(object)
click to toggle source
# File lib/mongar/replica.rb, line 86
# Converts a source object into a hash with one entry per declared column.
#
# Each key is the column name as a symbol. Each value is read from
# +object+ by calling the method of that name and is passed through the
# column's +transform_this+.
def source_object_to_hash(object)
  columns.each_with_object({}) do |column, attributes|
    key = column.name.to_sym
    attributes[key] = column.transform_this(object.send(key))
  end
end
source_object_to_primary_key_hash(object)
click to toggle source
# File lib/mongar/replica.rb, line 94
# Builds a single-entry hash identifying +object+ by its primary index.
#
# The key is the primary index column's name exactly as stored (it is not
# converted to a symbol). The value is read from +object+ by calling the
# method of that name and is passed through the column's +transform_this+.
def source_object_to_primary_key_hash(object)
  key_column = primary_index
  raw_value = object.send(key_column.name.to_sym)

  { key_column.name => key_column.transform_this(raw_value) }
end
use_mongodb(name)
click to toggle source
# File lib/mongar/replica.rb, line 122
# Selects the MongoDB database by name.
#
# Assigns +name+ through the +mongodb_name=+ writer and evaluates to
# +name+.
def use_mongodb(name)
  self.mongodb_name = name
end