class SidekiqStrategies::PollingStrategy

Attributes

poll_interval[RW]
poll_max_tries[RW]
poll_tries_before_airbrake[RW]

Public Class Methods

new(acc_handler, bns_handler, logger, airbrake_notifier, helper) click to toggle source
# File lib/sidekiq_strategies/polling_strategy.rb, line 7
def initialize(acc_handler, bns_handler, logger, airbrake_notifier, helper)
  @acc_handler = acc_handler
  @bns_handler = bns_handler
  @helper = helper

  @poll_interval = 1 # seconds
  @poll_max_tries = 600
  @poll_tries_before_airbrake = 180

  @logger = logger
  @airbrake_notifier = airbrake_notifier
end

Public Instance Methods

perform(sidekiq_jid) click to toggle source
# File lib/sidekiq_strategies/polling_strategy.rb, line 26
def perform(sidekiq_jid)

  @logger.info "processing transaction #{@acc_handler.transaction.id}"

  begin
    request_tube = @bns_handler.obtain_tube(@helper.request_tube_name)
    reply_tube = @bns_handler.obtain_tube(@helper.reply_tube_name)

    # There are previous data?
    beanstalkd_jid = @acc_handler.transaction.beanstalkd_jid
    if beanstalkd_jid != nil
      reply_job = reserve_existing_reply_job(reply_tube)
      request_job = @bns_handler.find_job(beanstalkd_jid)

      if reply_job != nil
        # do nothing: there was already a reply job for me
      else
        kick_existing_buried_job(beanstalkd_jid)
      end
    end

    # Save sidekiq job id
    @acc_handler.save_sidekiq_jid(sidekiq_jid)

    if ( ['reserved', 'awaiting_retry', 'waiting_at_worker'].include?(@acc_handler.account.state) || request_job!=nil || reply_job!=nil )
    
      if reply_job == nil

        # There are available watchers?
        validate_available_watchers(request_tube)

        # Is the provider available to process requests?
        validate_provider_status

      end

      if ( request_job==nil && reply_job==nil )
        @logger.info "putting job in #{@helper.request_tube_name} tube"
        @logger.info "expect response in #{@helper.reply_tube_name} tube"
        request_job_data = request_tube.put(Oj.dump(@helper.queue_message), pri: @acc_handler.priority, ttr: BeanstalkdHandler::TIME_TO_RUN)
        request_job = @bns_handler.find_job(request_job_data[:id])
        @acc_handler.save_beanstalkd_jid(request_job_data[:id])
      end

      if reply_job == nil
        reply_job = reserve_reply_job(reply_tube, request_tube, request_job)
      end

      # At this point all should be ok
      @logger.info "reply job obtained: #{reply_job}"

      if reply_job != nil
        process_job_response(reply_job, request_job)
      elsif reply_job==nil && @acc_handler.account.state!='error'
        mark_account_with_invalid_state
      end

    else
      mark_account_with_invalid_state
    end

  rescue Beaneater::NotConnected => exception
    @logger.info "beanstalkd service was unavailable"
    exception = XBP::Error::MessageQueueNotReachable.new("Beanstalk queue is not reachable")
    @acc_handler.queue_for_retry
    raise exception
  end

end
retries_exhausted(msg) click to toggle source
# File lib/sidekiq_strategies/polling_strategy.rb, line 20
def retries_exhausted(msg)
  @logger.warn "Failed #{msg['class']} with #{msg['args']}: #{msg['error_message']}"
  @acc_handler.save_exception(XBP::Error::ApiInternalError.new( msg['error_class'] + ': ' + msg['error_message'] ))
  @acc_handler.error
end

Private Instance Methods

kick_existing_buried_job(job_id) click to toggle source
# File lib/sidekiq_strategies/polling_strategy.rb, line 138
def kick_existing_buried_job(job_id)
  request_job = @bns_handler.find_job(job_id)
  if request_job != nil
    if request_job.stats.state == 'buried'
      request_job.kick
    elsif request_job.stats == 'ready'
      # Do nothing: soon some worker will process this job
    elsif request_job.stats == 'reserved'
      # Do nothing: some worker should be processing this job
    elsif request_job.stats == 'delayed'
      # Do nothing: this job suddenly will be back on 'ready' queue
    end
  else
    # Do nothing: if request_job doesn't exist it will be created
  end

  request_job
end
mark_account_with_invalid_state() click to toggle source
# File lib/sidekiq_strategies/polling_strategy.rb, line 98
def mark_account_with_invalid_state
  @logger.info "account was in an invalid state: #{@acc_handler.account.state}"
  exception = XBP::Error::AccountInvalidState.new("The account was in an invalid state: #{@acc_handler.account.state}.")
  @acc_handler.save_exception(exception)
  @acc_handler.error
end
process_job_response(reply_job, request_job) click to toggle source
# File lib/sidekiq_strategies/polling_strategy.rb, line 157
def process_job_response(reply_job, request_job)
  if @acc_handler.account.state == 'waiting_at_worker'
    @acc_handler.start_action
  end

  body = Oj.load(reply_job.body.to_s)

  if exception = body[:exception]
    if @helper.try_again?(exception)
      @acc_handler.queue_for_retry

      # Raise exception for retry
      if exception[:type]
        if exception[:type].kind_of?(String)
          raise Kernel.const_get(exception[:type]), exception[:message]
        else
          raise exception[:type], exception[:message]
        end
      else
        raise exception, exception[:message]
      end
    else
      @acc_handler.save_exception(exception)
      @acc_handler.error
    end
  else
    @acc_handler.save_success_response(body)
    @acc_handler.success
  end

  reply_job.delete
  ## request_job.delete
end
reserve_existing_reply_job(reply_tube) click to toggle source
# File lib/sidekiq_strategies/polling_strategy.rb, line 124
def reserve_existing_reply_job(reply_tube)
  reply_job = nil

  if reply_job_data = reply_tube.peek(:ready)
    reply_job = reply_tube.reserve

    if ( ['reserved', 'awaiting_retry', 'waiting_at_worker'].include?(@acc_handler.account.state) )
      @acc_handler.start_action
    end
  end

  reply_job
end
reserve_reply_job(reply_tube, request_tube, request_job) click to toggle source
# File lib/sidekiq_strategies/polling_strategy.rb, line 191
def reserve_reply_job(reply_tube, request_tube, request_job)
  job = nil
  poll_count = 0
  poll_done = false

  if request_tube.stats.current_waiting == 0
    @acc_handler.queue_for_wait_at_worker
  else
    @acc_handler.start_action
  end

  while !poll_done

    begin
      job = reply_tube.reserve(@poll_interval)
      poll_done = true
    rescue Beaneater::TimedOutError => ex

      begin
        if request_job.stats.state == 'ready'
          # Do nothing: wait another second or less
        elsif request_job.stats.state == 'reserved'
          if @acc_handler.account.state == 'waiting_at_worker'
            @acc_handler.start_action
          else
            # Do nothing: wait another second or less
          end
        elsif request_job.stats.state == 'buried'
          # Do nothing: reply tube should contain the response
        end
      rescue Beaneater::NotFoundError => ex
        # Request_job has been deleted, it is ok!
        # Reply tube should be ready
      end

    end

    poll_count += 1
    if poll_count >= @poll_max_tries
      poll_done = true
    elsif poll_count == @poll_tries_before_airbrake
      @airbrake_notifier.notify(:error_message => "Polling is taking more than #{@poll_tries_before_airbrake} seconds.")
    end

  end # polling

  if job == nil
    error_message = "Queue #{@helper.reply_tube_name} time out. Waited #{@poll_max_tries} seconds."
    exception = XBP::Error::MessageQueueTimeout.new(error_message)
    
    @logger.info error_message
    @acc_handler.save_exception(exception)
    @acc_handler.error
    @airbrake_notifier.notify(:error_message => error_message)
  end

  job
end
validate_available_watchers(tube) click to toggle source
# File lib/sidekiq_strategies/polling_strategy.rb, line 105
def validate_available_watchers(tube)
  if tube.stats.current_watching == 0
    exception = XBP::Error::WorkersUnavailable.new("No workers available for queue #{tube.name}.")
    @acc_handler.queue_for_retry
    raise exception
  end
end
validate_provider_status() click to toggle source
# File lib/sidekiq_strategies/polling_strategy.rb, line 113
def validate_provider_status
  # TODO: status == obtain_status
  # TODO: if status != 'ok'
  # TODO:   exception = XBP::Error::??
  # TODO:   @acc_handler.save_sidekiq_exception(exception)
  # TODO:   @acc_handler.error
  # TODO: else
  # TODO:   true
  # TODO: end
end