class SidekiqStrategies::PollingStrategy
Attributes
poll_interval[RW]
poll_max_tries[RW]
poll_tries_before_airbrake[RW]
Public Class Methods
new(acc_handler, bns_handler, logger, airbrake_notifier, helper)
click to toggle source
# File lib/sidekiq_strategies/polling_strategy.rb, line 7 def initialize(acc_handler, bns_handler, logger, airbrake_notifier, helper) @acc_handler = acc_handler @bns_handler = bns_handler @helper = helper @poll_interval = 1 # seconds @poll_max_tries = 600 @poll_tries_before_airbrake = 180 @logger = logger @airbrake_notifier = airbrake_notifier end
Public Instance Methods
perform(sidekiq_jid)
click to toggle source
# File lib/sidekiq_strategies/polling_strategy.rb, line 26 def perform(sidekiq_jid) @logger.info "processing transaction #{@acc_handler.transaction.id}" begin request_tube = @bns_handler.obtain_tube(@helper.request_tube_name) reply_tube = @bns_handler.obtain_tube(@helper.reply_tube_name) # There are previous data? beanstalkd_jid = @acc_handler.transaction.beanstalkd_jid if beanstalkd_jid != nil reply_job = reserve_existing_reply_job(reply_tube) request_job = @bns_handler.find_job(beanstalkd_jid) if reply_job != nil # do nothing: there was already a reply job for me else kick_existing_buried_job(beanstalkd_jid) end end # Save sidekiq job id @acc_handler.save_sidekiq_jid(sidekiq_jid) if ( ['reserved', 'awaiting_retry', 'waiting_at_worker'].include?(@acc_handler.account.state) || request_job!=nil || reply_job!=nil ) if reply_job == nil # There are available watchers? validate_available_watchers(request_tube) # Is the provider available to process requests? validate_provider_status end if ( request_job==nil && reply_job==nil ) @logger.info "putting job in #{@helper.request_tube_name} tube" @logger.info "expect response in #{@helper.reply_tube_name} tube" request_job_data = request_tube.put(Oj.dump(@helper.queue_message), pri: @acc_handler.priority, ttr: BeanstalkdHandler::TIME_TO_RUN) request_job = @bns_handler.find_job(request_job_data[:id]) @acc_handler.save_beanstalkd_jid(request_job_data[:id]) end if reply_job == nil reply_job = reserve_reply_job(reply_tube, request_tube, request_job) end # At this point all should be ok @logger.info "reply job obtained: #{reply_job}" if reply_job != nil process_job_response(reply_job, request_job) elsif reply_job==nil && @acc_handler.account.state!='error' mark_account_with_invalid_state end else mark_account_with_invalid_state end rescue Beaneater::NotConnected => exception @logger.info "beanstalkd service was unavailable" exception = XBP::Error::MessageQueueNotReachable.new("Beanstalk queue is not reachable") @acc_handler.queue_for_retry raise exception end end
retries_exhausted(msg)
click to toggle source
# File lib/sidekiq_strategies/polling_strategy.rb, line 20 def retries_exhausted(msg) @logger.warn "Failed #{msg['class']} with #{msg['args']}: #{msg['error_message']}" @acc_handler.save_exception(XBP::Error::ApiInternalError.new( msg['error_class'] + ': ' + msg['error_message'] )) @acc_handler.error end
Private Instance Methods
kick_existing_buried_job(job_id)
click to toggle source
# File lib/sidekiq_strategies/polling_strategy.rb, line 138 def kick_existing_buried_job(job_id) request_job = @bns_handler.find_job(job_id) if request_job != nil if request_job.stats.state == 'buried' request_job.kick elsif request_job.stats == 'ready' # Do nothing: soon some worker will process this job elsif request_job.stats == 'reserved' # Do nothing: some worker should be processing this job elsif request_job.stats == 'delayed' # Do nothing: this job suddenly will be back on 'ready' queue end else # Do nothing: if request_job doesn't exist it will be created end request_job end
mark_account_with_invalid_state()
click to toggle source
# File lib/sidekiq_strategies/polling_strategy.rb, line 98 def mark_account_with_invalid_state @logger.info "account was in an invalid state: #{@acc_handler.account.state}" exception = XBP::Error::AccountInvalidState.new("The account was in an invalid state: #{@acc_handler.account.state}.") @acc_handler.save_exception(exception) @acc_handler.error end
process_job_response(reply_job, request_job)
click to toggle source
# File lib/sidekiq_strategies/polling_strategy.rb, line 157 def process_job_response(reply_job, request_job) if @acc_handler.account.state == 'waiting_at_worker' @acc_handler.start_action end body = Oj.load(reply_job.body.to_s) if exception = body[:exception] if @helper.try_again?(exception) @acc_handler.queue_for_retry # Raise exception for retry if exception[:type] if exception[:type].kind_of?(String) raise Kernel.const_get(exception[:type]), exception[:message] else raise exception[:type], exception[:message] end else raise exception, exception[:message] end else @acc_handler.save_exception(exception) @acc_handler.error end else @acc_handler.save_success_response(body) @acc_handler.success end reply_job.delete ## request_job.delete end
reserve_existing_reply_job(reply_tube)
click to toggle source
# File lib/sidekiq_strategies/polling_strategy.rb, line 124 def reserve_existing_reply_job(reply_tube) reply_job = nil if reply_job_data = reply_tube.peek(:ready) reply_job = reply_tube.reserve if ( ['reserved', 'awaiting_retry', 'waiting_at_worker'].include?(@acc_handler.account.state) ) @acc_handler.start_action end end reply_job end
reserve_reply_job(reply_tube, request_tube, request_job)
click to toggle source
# File lib/sidekiq_strategies/polling_strategy.rb, line 191 def reserve_reply_job(reply_tube, request_tube, request_job) job = nil poll_count = 0 poll_done = false if request_tube.stats.current_waiting == 0 @acc_handler.queue_for_wait_at_worker else @acc_handler.start_action end while !poll_done begin job = reply_tube.reserve(@poll_interval) poll_done = true rescue Beaneater::TimedOutError => ex begin if request_job.stats.state == 'ready' # Do nothing: wait another second or less elsif request_job.stats.state == 'reserved' if @acc_handler.account.state == 'waiting_at_worker' @acc_handler.start_action else # Do nothing: wait another second or less end elsif request_job.stats.state == 'buried' # Do nothing: reply tube should contain the response end rescue Beaneater::NotFoundError => ex # Request_job has been deleted, it is ok! # Reply tube should be ready end end poll_count += 1 if poll_count >= @poll_max_tries poll_done = true elsif poll_count == @poll_tries_before_airbrake @airbrake_notifier.notify(:error_message => "Polling is taking more than #{@poll_tries_before_airbrake} seconds.") end end # polling if job == nil error_message = "Queue #{@helper.reply_tube_name} time out. Waited #{@poll_max_tries} seconds." exception = XBP::Error::MessageQueueTimeout.new(error_message) @logger.info error_message @acc_handler.save_exception(exception) @acc_handler.error @airbrake_notifier.notify(:error_message => error_message) end job end
validate_available_watchers(tube)
click to toggle source
# File lib/sidekiq_strategies/polling_strategy.rb, line 105 def validate_available_watchers(tube) if tube.stats.current_watching == 0 exception = XBP::Error::WorkersUnavailable.new("No workers available for queue #{tube.name}.") @acc_handler.queue_for_retry raise exception end end
validate_provider_status()
click to toggle source
# File lib/sidekiq_strategies/polling_strategy.rb, line 113 def validate_provider_status # TODO: status == obtain_status # TODO: if status != 'ok' # TODO: exception = XBP::Error::?? # TODO: @acc_handler.save_sidekiq_exception(exception) # TODO: @acc_handler.error # TODO: else # TODO: true # TODO: end end