module ManageIQ::Messaging::Stomp::BackgroundJob
Private Instance Methods
run_job(options)
click to toggle source
# File lib/manageiq/messaging/stomp/background_job.rb, line 40 def run_job(options) assert_options(options, [:class_name, :method_name]) instance_id = options[:instance_id] args = options[:args] miq_callback = options[:miq_callback] obj = Object.const_get(options[:class_name]) obj = obj.find(instance_id) if instance_id msg_timeout = 600 # TODO: configurable per message result = Timeout.timeout(msg_timeout) do obj.send(options[:method_name], *args) end run_job(miq_callback) if miq_callback result end
subscribe_background_job_impl(options)
click to toggle source
# File lib/manageiq/messaging/stomp/background_job.rb, line 7 def subscribe_background_job_impl(options) queue_name, headers = queue_for_subscribe(options) subscribe(queue_name, headers) do |msg| begin ack(msg) assert_options(msg.headers, ['class_name', 'message_type']) msg_options = decode_body(msg.headers, msg.body) msg_options = {} if msg_options.empty? logger.info("Processing background job: queue(#{queue_name}), job(#{msg_options.inspect}), headers(#{msg.headers})") result = run_job(msg_options.merge(:class_name => msg.headers['class_name'], :method_name => msg.headers['message_type'])) logger.info("Background job completed") correlation_ref = msg.headers['correlation_id'] send_response(options[:service], correlation_ref, result) if correlation_ref rescue Timeout::Error logger.warn("Background job timed out") if Object.const_defined?('ActiveRecord::Base') begin logger.info("Reconnecting to DB after timeout error during queue deliver") ActiveRecord::Base.connection.reconnect! rescue => err logger.error("Error encountered during <ActiveRecord::Base.connection.reconnect!> error:#{err.class.name}: #{err.message}") end end rescue => e logger.error("Background job error: #{e.message}") logger.error(e.backtrace.join("\n")) end end end