class ConeyIsland::Submitter
TODO: Refactor this to instantiate and use instance methods for ease of testing and thread safety.
Public Class Methods
amqp_connection()
click to toggle source
# File lib/coney_island/submitter.rb, line 146
# Returns the value of @connection (the same instance variable that
# .connection reads and .connection= writes).
def self.amqp_connection
  @connection
end
amqp_parameters()
click to toggle source
# File lib/coney_island/submitter.rb, line 88
# Returns the AMQP parameters used to connect: ConeyIsland.amqp_parameters
# when ConeyIsland.single_amqp_connection? is truthy, otherwise the value
# previously stored through .amqp_parameters=.
def self.amqp_parameters
  return ConeyIsland.amqp_parameters if ConeyIsland.single_amqp_connection?

  @amqp_parameters
end
amqp_parameters=(params)
click to toggle source
# File lib/coney_island/submitter.rb, line 84
# Stores submitter-specific AMQP parameters in @amqp_parameters.
def self.amqp_parameters=(params)
  @amqp_parameters = params
end
channel()
click to toggle source
# File lib/coney_island/submitter.rb, line 68
# Returns the value of @channel (assigned by .create_channel).
def self.channel
  @channel
end
connected?()
click to toggle source
# File lib/coney_island/submitter.rb, line 96
# Returns false when no connection object is set; otherwise returns whatever
# the connection's own #connected? reports.
def self.connected?
  conn = connection
  return false unless conn

  conn.connected?
end
connection()
click to toggle source
# File lib/coney_island/submitter.rb, line 60
# Returns the value of @connection.
def self.connection
  @connection
end
connection=(conn)
click to toggle source
# File lib/coney_island/submitter.rb, line 56
# Stores the given connection object in @connection.
def self.connection=(conn)
  @connection = conn
end
create_channel()
click to toggle source
# File lib/coney_island/submitter.rb, line 72
# Asks the current connection for a new channel, stores it in @channel and
# returns it.
def self.create_channel
  conn = connection
  @channel = conn.create_channel
end
delay_exchange()
click to toggle source
# File lib/coney_island/submitter.rb, line 80
# Returns the value of @delay_exchange (assigned by .initialize_rabbit).
def self.delay_exchange
  @delay_exchange
end
exchange()
click to toggle source
# File lib/coney_island/submitter.rb, line 76
# Returns the value of @exchange (assigned by .initialize_rabbit).
def self.exchange
  @exchange
end
handle_connection()
click to toggle source
# File lib/coney_island/submitter.rb, line 100
# Opens a Bunny connection using .amqp_parameters and, on success, sets up the
# channel and exchanges via .initialize_rabbit and resets the retry counter.
#
# Failure handling:
# * Bunny::TCPConnectionFailed / Bunny::PossibleAuthenticationFailureError:
#   the attempt is retried after ConeyIsland.tcp_connection_retry_interval
#   seconds until ConeyIsland.tcp_connection_retry_limit attempts have failed;
#   then the error is logged, reported through ConeyIsland.poke_the_badger and
#   @connection is cleared.
# * Bunny::ConnectionLevelException / Bunny::ChannelLevelException: logged
#   only; neither a retry nor .initialize_rabbit happens.
def self.handle_connection
  Rails.logger.info("ConeyIsland::Submitter.handle_connection connecting...")
  self.connection = Bunny.new(self.amqp_parameters)
  self.start_connection
rescue Bunny::TCPConnectionFailed, Bunny::PossibleAuthenticationFailureError => e
  self.tcp_connection_retries ||= 0
  self.tcp_connection_retries += 1
  if self.tcp_connection_retries >= ConeyIsland.tcp_connection_retry_limit
    message = "Submitter Failed to connect to RabbitMQ #{ConeyIsland.tcp_connection_retry_limit} times, bailing out"
    Rails.logger.error(message)
    ConeyIsland.poke_the_badger(e, { code_source: 'ConeyIsland::Submitter.handle_connection', reason: message })
    @connection = nil
  else
    # Compute the interval once so the logged value is the value actually slept.
    interval = ConeyIsland.tcp_connection_retry_interval(self.tcp_connection_retries)
    message = "Failed to connect to RabbitMQ Attempt ##{self.tcp_connection_retries} time(s), trying again in #{interval} seconds..."
    Rails.logger.error(message)
    sleep(interval)
    retry
  end
rescue Bunny::ConnectionLevelException
  Rails.logger.error "Submitter Handling a connection-level exception."
  # Rails.logger.error "Bunny class id : #{e.connection_close.class_id}"
  # Rails.logger.error "Bunny method id: #{e.connection_close.method_id}"
  # Rails.logger.error "Status code : #{e.connection_close.reply_code}"
  # Rails.logger.error "Error message : #{e.connection_close.reply_text}"
rescue Bunny::ChannelLevelException => e
  Rails.logger.error "Submitter Handling a channel-level exception."
  Rails.logger.error "Bunny class id : #{e.channel_close.class_id}"
  Rails.logger.error "Bunny method id: #{e.channel_close.method_id}"
  Rails.logger.error "Status code : #{e.channel_close.reply_code}"
  Rails.logger.error "Error message : #{e.channel_close.reply_text}"
else
  # Only reached when the connection started without raising.
  self.initialize_rabbit
  self.tcp_connection_retries = 0
end
initialize_rabbit()
click to toggle source
# File lib/coney_island/submitter.rb, line 139
# Creates a fresh channel, declares the 'coney_island' and
# 'coney_island_delay' topic exchanges on it, and resets the cache of
# declared delay queues.
def self.initialize_rabbit
  create_channel
  @exchange = channel.topic('coney_island')
  @delay_exchange = channel.topic('coney_island_delay')
  @delay_queue = {}
end
jobs_cache()
click to toggle source
# File lib/coney_island/submitter.rb, line 31
# Returns the memoized JobsCache, building it on first use.
def self.jobs_cache
  @jobs_cache = JobsCache.new unless @jobs_cache
  @jobs_cache
end
publish_job(args, job_id = nil)
click to toggle source
# File lib/coney_island/submitter.rb, line 150
# Validates a job description and either runs it inline or publishes it to
# RabbitMQ.
#
# args   - Array of [klass, method_name, job_args]; job_args is an optional
#          Hash. Its 'delay' and 'work_queue' entries (String or Symbol keys)
#          are routing options and are not forwarded as part of the job
#          payload.
# job_id - optional identifier forwarded to the publish helpers.
#
# The caller's job_args Hash is never modified: a string-keyed copy is built
# first. 'klass' and 'method_name' are written after that copy is made, so a
# caller-supplied :klass / :method_name entry cannot override the validated
# values.
#
# Returns true.
# Raises ConeyIsland::JobArgumentError when klass is not a Class/Module or
# method_name is not a String/Symbol.
def self.publish_job(args, job_id = nil)
  klass, method_name, job_args = *args

  # Class is a subclass of Module, so one check covers both.
  unless klass.is_a?(Module)
    raise ConeyIsland::JobArgumentError, "Expected #{klass} to be a Class or Module"
  end
  unless method_name.is_a?(String) || method_name.is_a?(Symbol)
    raise ConeyIsland::JobArgumentError, "Expected #{method_name} to be a String or a Symbol"
  end

  # Work on a copy so deleting routing options below does not leak to the caller.
  job_args = (job_args || {}).stringify_keys
  job_args['klass'] = klass.name
  job_args['method_name'] = method_name

  # Extract non job args
  delay = job_args.delete('delay')
  work_queue = job_args.delete('work_queue')

  # Class-level settings come next, when the class mixes in Performer.
  if klass.included_modules.include?(Performer)
    delay ||= klass.get_coney_settings[:delay]
    work_queue ||= klass.get_coney_settings[:work_queue]
  end

  # Library-wide defaults are the last resort.
  work_queue ||= ConeyIsland.default_settings[:work_queue]
  delay ||= ConeyIsland.default_settings[:delay]

  if running_inline?
    # No broker involved: execute the job right here.
    ConeyIsland::Job.new(nil, job_args).handle_job
  else
    handle_connection unless connected?
    if delay && delay.to_i > 0
      publish_to_delay_queue(job_id, job_args, work_queue, delay)
    else
      publish_to_queue(exchange, job_id, job_args, work_queue)
    end
  end
  true
end
run_inline()
click to toggle source
# File lib/coney_island/submitter.rb, line 11
# Switches the submitter to inline mode by setting @run_inline to true.
def self.run_inline
  @run_inline = true
end
running_inline?()
click to toggle source
# File lib/coney_island/submitter.rb, line 19
# Reports the inline-mode flag as a strict boolean (true/false, never nil).
def self.running_inline?
  @run_inline ? true : false
end
start_connection()
click to toggle source
# File lib/coney_island/submitter.rb, line 64
# Calls #start on the object held in @connection and returns its result.
def self.start_connection
  @connection.start
end
stop_running_inline()
click to toggle source
# File lib/coney_island/submitter.rb, line 15
# Leaves inline mode by setting @run_inline to false.
def self.stop_running_inline
  @run_inline = false
end
submit(*args)
click to toggle source
# File lib/coney_island/submitter.rb, line 35
# Entry point for submitting a job. While caching_jobs? is truthy the
# arguments are handed to cache_job; otherwise they are passed, as one Array,
# to submit!.
def self.submit(*args)
  return cache_job(*args) if caching_jobs?

  submit!(args)
end
submit!(args, job_id = nil)
click to toggle source
# File lib/coney_island/submitter.rb, line 43
# Logs the submission and hands the job to publish_job. Any StandardError is
# logged, reported through ConeyIsland.poke_the_badger, and re-raised only
# when the submitter is running inline; otherwise it is swallowed and nil is
# returned.
def self.submit!(args, job_id = nil)
  Rails.logger.info "Submitting job #{job_id}: #{args}"
  publish_job(args, job_id)
rescue StandardError => error
  Rails.logger.error(error)
  report = {
    code_source: "ConeyIsland::Submitter.submit!",
    message: "Error submitting job",
    job_args: args
  }
  ConeyIsland.poke_the_badger(error, report)
  raise error if running_inline?
end
tcp_connection_retries()
click to toggle source
# File lib/coney_island/submitter.rb, line 27
# Returns the value of @tcp_connection_retries (nil until it is first set).
def self.tcp_connection_retries
  @tcp_connection_retries
end
tcp_connection_retries=(number)
click to toggle source
# File lib/coney_island/submitter.rb, line 23
# Stores the given number in @tcp_connection_retries.
def self.tcp_connection_retries=(number)
  @tcp_connection_retries = number
end
Protected Class Methods
publish_to_delay_queue(job_id, job_args, work_queue, delay)
click to toggle source
Publishes a job to a delayed queue exchange
# File lib/coney_island/submitter.rb, line 202
# Publishes a job to the delay exchange so that it is dead-lettered to the
# 'coney_island' exchange once its TTL expires.
#
# job_id     - identifier forwarded to publish_to_queue.
# job_args   - job payload forwarded to publish_to_queue.
# work_queue - name of the target work queue.
# delay      - delay in seconds; an Integer, a Float or a numeric String.
#
# One durable queue per (work_queue, delay) pair is declared and bound on
# first use and then remembered in @delay_queue. The queue is remembered only
# after the bind call has returned, so a failed bind is attempted again on the
# next call.
def self.publish_to_delay_queue(job_id, job_args, work_queue, delay)
  @delay_queue ||= {}
  queues_for_work_queue = (@delay_queue[work_queue] ||= {})

  queues_for_work_queue[delay] ||= begin
    # Convert seconds to whole milliseconds. A String delay would otherwise be
    # repeated 1000 times by String#*, and a Float delay would yield a Float TTL.
    ttl_ms = (delay.to_f * 1000).round
    queue = channel.queue(
      "#{work_queue}_delayed_#{delay}",
      auto_delete: false,
      durable: true,
      arguments: { 'x-dead-letter-exchange' => 'coney_island', 'x-message-ttl' => ttl_ms }
    )
    queue.bind(delay_exchange, routing_key: "carousels.#{work_queue}.#{delay}")
    queue
  end

  publish_to_queue(delay_exchange, job_id, job_args, "#{work_queue}.#{delay}")
end
publish_to_queue(exchange, job_id, job_args, queue)
click to toggle source
Publishes a job to a given exchange
# File lib/coney_island/submitter.rb, line 217
# Serializes job_args to JSON and publishes it on the given exchange with the
# routing key "carousels.<queue>". The block handed to #publish removes
# job_id from RequestStore.store[:jobs] when that store exists and job_id is
# present.
#
# NOTE(review): the cleanup only runs if exchange#publish yields to its block.
# Bunny's Exchange#publish does not appear to accept one, so the cached entry
# may never be removed here; confirm against the Bunny version in use.
def self.publish_to_queue(exchange, job_id, job_args, queue)
  payload = job_args.to_json
  options = { routing_key: "carousels.#{queue}" }
  exchange.publish(payload, options) do
    cached_jobs = RequestStore.store[:jobs]
    cached_jobs.delete(job_id) if cached_jobs && job_id.present?
  end
end