class Octopus::Proxy
Attributes
Public Class Methods
# File lib/octopus/proxy.rb, line 19
# Builds a proxy whose settings are wrapped in an Octopus::ProxyConfig.
#
# @param config the raw configuration; defaults to Octopus.config
def initialize(config = Octopus.config)
  wrapped_config = Octopus::ProxyConfig.new(config)
  self.proxy_config = wrapped_config
end
Public Instance Methods
# File lib/octopus/proxy.rb, line 114
# Makes sure the schema-migrations table exists on +shard+: if the shard's
# connection reports the table as missing, it is initialized.
#
# @param shard the shard to check
# @return the truthy result of +table_exists?+, or otherwise whatever
#   +initialize_schema_migrations_table+ returns
def check_schema_migrations(shard)
  migrations_table = ActiveRecord::Migrator.schema_migrations_table_name
  already_there = OctopusModel.using(shard).connection.table_exists?(migrations_table)
  already_there || OctopusModel.using(shard).connection.initialize_schema_migrations_table
end
# File lib/octopus/proxy.rb, line 107
# Resets the proxy state: the current shard goes back to the master shard and
# the current model, group and block are cleared.
#
# @return [nil]
def clean_connection_proxy
  self.current_shard = Octopus.master_shard
  # Cleared one by one, in this order, through the regular setters.
  [:current_model=, :current_group=, :block=].each { |setter| send(setter, nil) }
  nil
end
# File lib/octopus/proxy.rb, line 157
# Calls +release_connection+ on every pool visited by with_each_healthy_shard.
def clear_active_connections!
  with_each_healthy_shard { |pool| pool.release_connection }
end
# File lib/octopus/proxy.rb, line 161
# Calls +disconnect!+ on every pool visited by with_each_healthy_shard.
def clear_all_connections!
  with_each_healthy_shard { |pool| pool.disconnect! }
end
# File lib/octopus/proxy.rb, line 153
# Clears the query cache on each pool that is currently connected.
# Pools that are not connected are left untouched.
def clear_query_cache
  with_each_healthy_shard do |pool|
    safe_connection(pool).clear_query_cache if pool.connected?
  end
end
# File lib/octopus/proxy.rb, line 165
# @return [Boolean] true when at least one entry in +shards+ reports itself
#   as connected
def connected?
  shards.any? do |_shard_name, pool|
    pool.connected?
  end
end
# File lib/octopus/proxy.rb, line 138
# @return the +shards+ entry registered under the current shard
def connection_pool
  pools = shards
  pools[current_shard]
end
# File lib/octopus/proxy.rb, line 185
# Tells whether the current model counts as replicated: replication must be
# enabled, and either the model itself is replicated or everything is
# (+fully_replicated?+).
#
# @return a truthy value when replicated, otherwise nil/false
def current_model_replicated?
  replication_enabled = replicated
  return replication_enabled unless replication_enabled

  # current_model may be nil, hence +try+.
  current_model.try(:replicated) || fully_replicated?
end
# File lib/octopus/proxy.rb, line 55
# Routes +delete+ through the legacy dispatch logic.
def delete(*args, &block)
  query_method = 'delete'
  legacy_method_missing_logic(query_method, *args, &block)
end
# File lib/octopus/proxy.rb, line 148
# Disables the query cache on each pool that is currently connected.
def disable_query_cache!
  with_each_healthy_shard do |pool|
    safe_connection(pool).disable_query_cache! if pool.connected?
  end
end
# File lib/octopus/proxy.rb, line 143
# Clears the query cache first, then enables it on each pool that is
# currently connected.
def enable_query_cache!
  clear_query_cache
  with_each_healthy_shard do |pool|
    safe_connection(pool).enable_query_cache! if pool.connected?
  end
end
# File lib/octopus/proxy.rb, line 36
# Runs +sql+ on the currently selected connection.
#
# The connection is picked *before* the proxy state is (possibly) reset, so
# the statement still goes to the shard that was selected on entry.
#
# @param sql the statement to execute
# @param name optional name passed through to the connection
def execute(sql, name = nil)
  connection = select_connection
  if should_clean_connection_proxy?('execute')
    clean_connection_proxy
  end
  connection.execute(sql, name)
end
# File lib/octopus/proxy.rb, line 42
# Forwards an insert to the currently selected connection.
#
# The connection is picked before the proxy state is (possibly) reset.
# All arguments are passed to the connection's +insert+ unchanged.
def insert(arel, name = nil, pk = nil, id_value = nil, sequence_name = nil, binds = [])
  connection = select_connection
  if should_clean_connection_proxy?('insert')
    clean_connection_proxy
  end
  connection.insert(arel, name, pk, id_value, sequence_name, binds)
end
# File lib/octopus/proxy.rb, line 130
# Any method not defined on the proxy is handed to the legacy dispatch logic.
def method_missing(method, *args, &block)
  missing_name = method
  legacy_method_missing_logic(missing_name, *args, &block)
end
# File lib/octopus/proxy.rb, line 134
# The proxy responds to a method when it defines it itself or when the
# currently selected connection does.
#
# @param method the method name to look up
# @param include_private whether private methods are considered
def respond_to?(method, include_private = false)
  handled_by_proxy = super
  handled_by_proxy || select_connection.respond_to?(method, include_private)
end
# File lib/octopus/proxy.rb, line 83
# Yields with +shard+ as the current shard, with +block+ set to +shard+ for
# the duration (see keeping_connection_proxy).
#
# @param shard the shard to run the given block on
# @return the value of the given block
def run_queries_on_shard(shard, &_block)
  keeping_connection_proxy(shard) { using_shard(shard) { yield } }
end
Rails 3.1 sets automatic_reconnect to false when it removes a connection pool. Octopus
can potentially retain a reference to a closed connection pool. Previously, that would work since the pool would just reconnect, but in Rails 3.1 the flag prevents this.
# File lib/octopus/proxy.rb, line 71
# Returns a connection from +connection_pool+, making sure the pool is
# allowed to reconnect.
#
# If the pool is not connected yet and the master shard's connection has its
# query cache enabled, the cache is enabled on this pool's connection as well.
#
# @param connection_pool the pool to take a connection from
# @return the pool's connection
def safe_connection(connection_pool)
  connection_pool.automatic_reconnect ||= true

  inherit_query_cache = !connection_pool.connected? &&
                        shards[Octopus.master_shard].connection.query_cache_enabled
  connection_pool.connection.enable_query_cache! if inherit_query_cache

  connection_pool.connection
end
# File lib/octopus/proxy.rb, line 59
# Routes +select_all+ through the legacy dispatch logic.
def select_all(*args, &block)
  query_method = 'select_all'
  legacy_method_missing_logic(query_method, *args, &block)
end
# File lib/octopus/proxy.rb, line 79
# @return the connection for the pool registered under +shard_name+,
#   obtained via safe_connection
def select_connection
  target_pool = shards[shard_name]
  safe_connection(target_pool)
end
# File lib/octopus/proxy.rb, line 63
# Routes +select_value+ through the legacy dispatch logic.
def select_value(*args, &block)
  query_method = 'select_value'
  legacy_method_missing_logic(query_method, *args, &block)
end
# File lib/octopus/proxy.rb, line 103
# Runs the given block once per distinct +shards+ entry: shard names that map
# to the same entry are de-duplicated.
#
# @return [Array] the block's result for each shard
def send_queries_to_all_shards(&block)
  distinct_names = shard_names.uniq { |name| shards[name] }
  send_queries_to_multiple_shards(distinct_names, &block)
end
# File lib/octopus/proxy.rb, line 97
# Runs the given block on every shard of +group+ while +group+ is the
# current group.
#
# @param group the group whose shards receive the queries
# @return [Array] the block's result for each shard
def send_queries_to_group(group, &block)
  using_group(group) { send_queries_to_multiple_shards(shards_for_group(group), &block) }
end
# File lib/octopus/proxy.rb, line 91
# Runs the given block on each of +shards+ in turn.
#
# @param shards the shards to run the block on
# @return [Array] the block's result for each shard, in order
def send_queries_to_multiple_shards(shards, &block)
  shards.map { |target_shard| run_queries_on_shard(target_shard, &block) }
end
# File lib/octopus/proxy.rb, line 173
# Sends +method+ to the balancer registered for the current slave group of
# the current shard.
def send_queries_to_shard_slave_group(method, *args, &block)
  balancer = shards_slave_groups[current_shard][current_slave_group]
  send_queries_to_balancer(balancer, method, *args, &block)
end
# File lib/octopus/proxy.rb, line 181
# Sends +method+ to the balancer registered for the current slave group.
def send_queries_to_slave_group(method, *args, &block)
  balancer = slave_groups[current_slave_group]
  send_queries_to_balancer(balancer, method, *args, &block)
end
# File lib/octopus/proxy.rb, line 169
# True when slaves may be used for +method+ and the current shard has an
# entry for the current slave group.
def should_send_queries_to_shard_slave_group?(method)
  slaves_allowed = should_use_slaves_for_method?(method)
  return slaves_allowed unless slaves_allowed

  # Either lookup may come back nil, hence +try+.
  groups_for_shard = shards_slave_groups.try(:[], current_shard)
  groups_for_shard.try(:[], current_slave_group).present?
end
# File lib/octopus/proxy.rb, line 177
# True when slaves may be used for +method+ and +slave_groups+ has an entry
# for the current slave group.
def should_send_queries_to_slave_group?(method)
  slaves_allowed = should_use_slaves_for_method?(method)
  return slaves_allowed unless slaves_allowed

  slave_groups.try(:[], current_slave_group).present?
end
# File lib/octopus/proxy.rb, line 120
# Opens a transaction on the selected connection.
#
# When the setup is not sharded and the current model is replicated, the
# transaction is run on the master shard instead of whichever shard is
# currently selected.
#
# @param options passed through to the connection's +transaction+
def transaction(options = {}, &block)
  return select_connection.transaction(options, &block) if sharded || !current_model_replicated?

  run_queries_on_shard(Octopus.master_shard) do
    select_connection.transaction(options, &block)
  end
end
# File lib/octopus/proxy.rb, line 48
# Forwards an update to the currently selected connection.
#
# The connection is picked before the proxy state is (possibly) reset.
def update(arel, name = nil, binds = [])
  connection = select_connection
  # The legacy should_clean_connection_proxy? check is deliberately asked
  # about 'insert' here: an update is treated the same way as an insert.
  if should_clean_connection_proxy?('insert')
    clean_connection_proxy
  end
  connection.update(arel, name, binds)
end
Protected Instance Methods
Temporarily block cleaning connection proxy and run the block
@see Octopus::Proxy#should_clean_connection_proxy? @see Octopus::Proxy#clean_connection_proxy
# File lib/octopus/proxy.rb, line 305
# Sets +block+ to +shard+ while the given block runs and restores the
# previous value afterwards, even if the block raises.
#
# @param shard the value +block+ holds during the yield
# @return the value of the given block
def keeping_connection_proxy(shard, &_block)
  previous_block = block
  begin
    self.block = shard
    yield
  ensure
    # A falsy previous value is restored as nil.
    self.block = previous_block || nil
  end
end
@thiagopradi - This legacy method missing logic will be kept for a while for compatibility and will be removed when Octopus
1.0 is released. We are planning to migrate to a much more stable logic for the Proxy
that doesn't require method missing.
# File lib/octopus/proxy.rb, line 194
# Decides where +method+ is sent. The first matching rule wins:
#
# 1. the proxy should be cleaned: send to the selected connection, resetting
#    the proxy state before the call
# 2. a slave group of the current shard applies
# 3. a slave group applies
# 4. replicated databases apply: send to the selected slave
# 5. otherwise: send to the selected connection
#
# @param method name of the connection method to invoke
# @return whatever the invoked connection method returns
def legacy_method_missing_logic(method, *args, &block)
  if should_clean_connection_proxy?(method)
    # Pick the connection first; cleaning resets the current shard.
    connection = select_connection
    clean_connection_proxy
    return connection.send(method, *args, &block)
  end

  if should_send_queries_to_shard_slave_group?(method)
    return send_queries_to_shard_slave_group(method, *args, &block)
  end

  if should_send_queries_to_slave_group?(method)
    return send_queries_to_slave_group(method, *args, &block)
  end

  if should_send_queries_to_replicated_databases?(method)
    return send_queries_to_selected_slave(method, *args, &block)
  end

  result = select_connection.send(method, *args, &block)
  # Tag result sets with the shard they came from.
  result.current_shard = shard_name if result.instance_of?(ActiveRecord::Result)
  result
end
Temporarily switch `current_shard` to the next slave in a slave group and send queries to it while preserving `current_shard`
# File lib/octopus/proxy.rb, line 285
# Asks +balancer+ for its next slave (passing the current load-balance
# options) and sends +method+ to that slave.
def send_queries_to_balancer(balancer, method, *args, &block)
  next_slave = balancer.next(current_load_balance_options)
  send_queries_to_slave(next_slave, method, *args, &block)
end
# File lib/octopus/proxy.rb, line 256
# Chooses the shard a query should be read from and sends +method+ there.
#
# When the current model is replicated, or everything is
# (+fully_replicated?+), the next slave from the load balancer is used;
# otherwise the query goes to the master shard.
#
# @param method name of the connection method to invoke
# @return whatever send_queries_to_slave returns
def send_queries_to_selected_slave(method, *args, &block)
  # current_model can be nil (clean_connection_proxy resets it), so use +try+
  # as current_model_replicated? does; calling +replicated+ on nil would raise
  # NoMethodError even when fully_replicated? is true.
  selected_slave =
    if current_model.try(:replicated) || fully_replicated?
      slaves_load_balancer.next(current_load_balance_options)
    else
      Octopus.master_shard
    end

  send_queries_to_slave(selected_slave, method, *args, &block)
end
Temporarily switch `current_shard` to the specified slave and send queries to it while preserving `current_shard`
# File lib/octopus/proxy.rb, line 291
# Sends +method+ to the selected connection while +slave+ is the current
# shard; the previous shard is restored by using_shard.
#
# @param slave the shard to query
# @return the query result; an ActiveRecord::Result is tagged with +slave+
def send_queries_to_slave(slave, method, *args, &block)
  using_shard(slave) do
    result = select_connection.send(method, *args, &block)
    result.current_shard = slave if result.instance_of?(ActiveRecord::Result)
    result
  end
end
# File lib/octopus/proxy.rb, line 247
# Decides whether the proxy state should be reset around +method+.
#
# All of the following must hold: the method name contains insert, select or
# execute; the current model is not replicated; and +block+ is either unset
# or differs from the current shard.
#
# @return a truthy value when the proxy should be cleaned, otherwise nil/false
def should_clean_connection_proxy?(method)
  name_match = method.to_s =~ /insert|select|execute/
  return name_match unless name_match
  return false if current_model_replicated?

  !block || block != current_shard
end
Try to use slaves if and only if `replicated: true` is specified in `shards.yml` and no slave groups are defined
# File lib/octopus/proxy.rb, line 252
# True when replication is enabled, the method name contains "select",
# +block+ is not set, and no slave groups are defined.
#
# @return a truthy value when the query may go to a replicated database,
#   otherwise nil/false
def should_send_queries_to_replicated_databases?(method)
  replication_enabled = replicated
  return replication_enabled unless replication_enabled

  select_match = method.to_s =~ /select/
  return select_match unless select_match
  return false if block

  !slaves_grouped?
end
We should use slaves if and only if it's safe to do so.
We can safely use slaves when: (1) `replicated: true` is specified in `shards.yml` (2) The current model is `replicated()`, or `fully_replicated: true` is specified in `shards.yml` which means that
all models are `replicated()`
(3) It's a SELECT query while ensuring that we revert `current_shard` from the selected slave to the (shard's) master not to make queries other than SELECT leak to the slave.
# File lib/octopus/proxy.rb, line 275
# Slaves may be used when the current model is replicated and the method
# name contains "select".
#
# @return the match position (truthy) when slaves may be used, otherwise
#   nil/false
def should_use_slaves_for_method?(method)
  model_replicated = current_model_replicated?
  return model_replicated unless model_replicated

  method.to_s =~ /select/
end
# File lib/octopus/proxy.rb, line 279
# @return [Boolean] whether any slave groups are present
def slaves_grouped?
  groups = slave_groups
  groups.present?
end
Temporarily switch `current_group` and run the block
# File lib/octopus/proxy.rb, line 335
# Makes +group+ the current group while the given block runs and restores
# the previous group afterwards, even if the block raises.
#
# @param group the group to switch to
# @return the value of the given block
def using_group(group, &_block)
  previous_group = current_group
  begin
    self.current_group = group
    yield
  ensure
    self.current_group = previous_group
  end
end
Temporarily switch `current_shard` and run the block
# File lib/octopus/proxy.rb, line 317
# Makes +shard+ the current shard while the given block runs.
#
# The switch is skipped when there is a current model that does not allow
# +shard+. The previous shard, slave group and load-balance options are
# restored afterwards, even if the block raises.
#
# @param shard the shard to switch to
# @return the value of the given block
def using_shard(shard, &_block)
  previous_shard = current_shard
  previous_slave_group = current_slave_group
  previous_load_balance_options = current_load_balance_options

  begin
    if !current_model || current_model.allowed_shard?(shard)
      self.current_shard = shard
    end
    yield
  ensure
    self.current_shard = previous_shard
    self.current_slave_group = previous_slave_group
    self.current_load_balance_options = previous_load_balance_options
  end
end
Ensure that a single failing slave doesn't take down the entire application
# File lib/octopus/proxy.rb, line 217
# Yields every entry of +shards+, then every pool known to ActiveRecord's
# connection handler (skipping the one equal to +shards[:master]+).
#
# An error raised for one pool is logged and swallowed when
# Octopus.robust_environment? is true, so the remaining pools are still
# visited; otherwise it is re-raised.
#
# @yield [pool] each connection pool in turn
def with_each_healthy_shard
  shards.each do |shard_name, shard_pool|
    begin
      yield(shard_pool)
    rescue => e
      raise unless Octopus.robust_environment?
      Octopus.logger.error "Error on shard #{shard_name}: #{e.message}"
    end
  end

  ActiveRecord::Base.connection_handler.connection_pool_list.each do |pool|
    # Already handled by the loop above.
    # NOTE(review): the key is hard-coded to :master while other methods use
    # Octopus.master_shard - confirm this is intended.
    next if pool == shards[:master]

    begin
      yield(pool)
    rescue => e
      raise unless Octopus.robust_environment?
      Octopus.logger.error "Error on pool (spec: #{pool.spec}): #{e.message}"
    end
  end
end