class ConeyIsland::Worker
Public Class Methods
abandon_and_shutdown()
click to toggle source
# File lib/coney_island/worker.rb, line 211 def self.abandon_and_shutdown self.log.info("Lost RabbitMQ connection, abandoning current jobs and shutting down") self.clear_running_jobs self.shutdown('TERM') end
amqp_parameters()
click to toggle source
# File lib/coney_island/worker.rb, line 89 def self.amqp_parameters return @amqp_parameters if @amqp_paramenters.is_a? Hash if ConeyIsland.single_amqp_connection? @amqp_parameters = ConeyIsland.amqp_parameters else @amqp_parameters end @amqp_parameters end
amqp_parameters=(params)
click to toggle source
# File lib/coney_island/worker.rb, line 85 def self.amqp_parameters=(params) @amqp_parameters = params end
channel()
click to toggle source
# File lib/coney_island/worker.rb, line 81 def self.channel @channel end
clear_running_jobs()
click to toggle source
# File lib/coney_island/worker.rb, line 32 def self.clear_running_jobs @running_jobs = [] end
config()
click to toggle source
# File lib/coney_island/worker.rb, line 8 def self.config @config end
config=(config_hash)
click to toggle source
# File lib/coney_island/worker.rb, line 4 def self.config=(config_hash) @config = config_hash.symbolize_keys! end
delayed_jobs()
click to toggle source
# File lib/coney_island/worker.rb, line 36 def self.delayed_jobs @delayed_jobs ||= [] end
exchange()
click to toggle source
# File lib/coney_island/worker.rb, line 77 def self.exchange @exchange end
handle_incoming_message(metadata,payload)
click to toggle source
# File lib/coney_island/worker.rb, line 191 def self.handle_incoming_message(metadata,payload) args = JSON.parse(payload) job = Job.new(metadata, args) job.handle_job unless job.initialization_errors rescue StandardError => e metadata.ack if !ConeyIsland.running_inline? ConeyIsland.poke_the_badger(e, {code_source: 'ConeyIsland', job_payload: args}) self.log.error("ConeyIsland code error, not application code:\n#{e.inspect}\nARGS: #{args}") end
handle_missing_children()
click to toggle source
# File lib/coney_island/worker.rb, line 201 def self.handle_missing_children @child_pids.each do |child_pid| begin Process.kill 0, child_pid rescue Errno::ESRCH => e @child_pids.push Process.spawn("bundle exec coney_island #{@ticket}") end end end
initialize_background()
click to toggle source
# File lib/coney_island/worker.rb, line 52 def self.initialize_background ENV['NEW_RELIC_AGENT_ENABLED'] = 'false' ENV['NEWRELIC_ENABLE'] = 'false' @ticket = ARGV[0] @ticket ||= 'default' @log_io = self.config[:log] self.log = Logger.new(@log_io) @instance_config = self.config[:carousels][@ticket.to_sym] @prefetch_count = @instance_config[:prefetch_count] if @instance_config @prefetch_count ||= 20 @worker_count = @instance_config[:worker_count] if @instance_config @worker_count ||= 1 @child_count = @worker_count - 1 reset_child_pids @full_instance_name = @ticket self.log.level = self.config[:log_level] self.log.info("config: #{self.config}") end
initialize_rabbit(connection)
click to toggle source
# File lib/coney_island/worker.rb, line 161 def self.initialize_rabbit(connection) self.log.info('initializing rabbit connection with channel and queue...') @channel = AMQP::Channel.new(connection) @channel.on_error do |ch, channel_close| self.log.error "Worker Handling a channel-level exception." self.log.error "AMQP class id : #{channel_close.class_id}" self.log.error "AMQP method id: #{channel_close.method_id}" self.log.error "Status code : #{channel_close.reply_code}" self.log.error "Error message : #{channel_close.reply_text}" end @exchange = @channel.topic('coney_island') #send a heartbeat every 15 seconds to avoid aggresive network configurations that close quiet connections heartbeat_exchange = self.channel.fanout('coney_island_heartbeat') EventMachine.add_periodic_timer(15) do heartbeat_exchange.publish({:instance_name => @ticket}) self.handle_missing_children end self.channel.prefetch @prefetch_count @queue = self.channel.queue(@full_instance_name, auto_delete: false, durable: true) @queue.bind(self.exchange, routing_key: 'carousels.' + @ticket + '.#') if ConeyIsland::Submitter.amqp_connection.respond_to?(:connected?) && !ConeyIsland::Submitter.amqp_connection.connected? ConeyIsland::Submitter.handle_connection end @queue.subscribe(:ack => true) do |metadata,payload| self.handle_incoming_message(metadata,payload) end self.tcp_connection_retries = 0 end
log()
click to toggle source
# File lib/coney_island/worker.rb, line 12 def self.log @log ||= Logger.new(File.open(File::NULL, "w")) end
log=(log_thing)
click to toggle source
# File lib/coney_island/worker.rb, line 16 def self.log=(log_thing) @log = log_thing end
reset_child_pids()
click to toggle source
# File lib/coney_island/worker.rb, line 48 def self.reset_child_pids @child_pids = [] end
running_jobs()
click to toggle source
# File lib/coney_island/worker.rb, line 28 def self.running_jobs @running_jobs ||= [] end
shutdown(signal)
click to toggle source
# File lib/coney_island/worker.rb, line 217 def self.shutdown(signal) @shutting_down = true @child_pids.each do |child_pid| self.log.info("killing child #{child_pid}") Process.kill(signal, child_pid) end @queue.unsubscribe rescue nil self.delayed_jobs.each do |delayed_job| delayed_job.requeue_delay end EventMachine.add_periodic_timer(1) do if self.running_jobs.any? self.log.info("Waiting for #{self.running_jobs.length} requests to finish") else self.log.info("Shutting down coney island #{@ticket}") EventMachine.stop end end end
start()
click to toggle source
# File lib/coney_island/worker.rb, line 99 def self.start @child_count.times do child_pid = Process.fork unless child_pid self.log.info("started child for ticket #{@ticket} with pid #{Process.pid}") break end @child_pids.push child_pid end defined?(ActiveRecord::Base) and ActiveRecord::Base.establish_connection ConeyIsland::Submitter.handle_connection begin EventMachine.run do Signal.trap('INT') do self.shutdown('INT') end Signal.trap('TERM') do self.shutdown('TERM') end AMQP.connect(self.amqp_parameters) do |connection| self.log.info("Worker Connected to AMQP broker. Running #{AMQP::VERSION}") connection.on_error do |conn, connection_close| self.log.error "Worker Handling a connection-level exception." self.log.error "AMQP class id : #{connection_close.class_id}" self.log.error "AMQP method id: #{connection_close.method_id}" self.log.error "Status code : #{connection_close.reply_code}" self.log.error "Error message : #{connection_close.reply_text}" end #Handle a lost connection to rabbitMQ connection.on_tcp_connection_loss do |connection, settings| self.log.warn("Lost rabbit connection, attempting to reconnect...") connection.reconnect(true, 1) self.initialize_rabbit(connection) end self.initialize_rabbit(connection) end end rescue AMQP::TCPConnectionFailed, AMQP::PossibleAuthenticationFailureError => e self.tcp_connection_retries ||= 0 self.tcp_connection_retries += 1 if self.tcp_connection_retries >= ConeyIsland.tcp_connection_retry_limit message = "Failed to connect to RabbitMQ #{ConeyIsland.tcp_connection_retry_limit} times, bailing out" self.log.error(message) ConeyIsland.poke_the_badger(e, { code_source: 'ConeyIsland::Worker.start', reason: message} ) self.abandon_and_shutdown else message = "Worker Failed to connecto to RabbitMQ Attempt ##{self.tcp_connection_retries} time(s), trying again in #{ConeyIsland.tcp_connection_retry_interval(self.tcp_connection_retries)} seconds..." self.log.error(message) sleep(ConeyIsland.tcp_connection_retry_interval(self.tcp_connection_retries)) retry end end end
tcp_connection_retries()
click to toggle source
# File lib/coney_island/worker.rb, line 24 def self.tcp_connection_retries @tcp_connection_retries end
tcp_connection_retries=(number)
click to toggle source
# File lib/coney_island/worker.rb, line 20 def self.tcp_connection_retries=(number) @tcp_connection_retries = number end
ticket()
click to toggle source
# File lib/coney_island/worker.rb, line 40 def self.ticket @ticket end
ticket=(some_ticket)
click to toggle source
# File lib/coney_island/worker.rb, line 44 def self.ticket=(some_ticket) @ticket = some_ticket end