class RocketJob::Worker

Worker

A worker runs on a single operating system thread Is usually started under a Rocket Job server process.

Attributes

current_filter[RW]
id[RW]
name[R]
server_name[R]

Public Class Methods

new(id: 0, server_name: "inline:0") click to toggle source
# File lib/rocket_job/worker.rb, line 19
def initialize(id: 0, server_name: "inline:0")
  @id             = id
  @server_name    = server_name
  @name           = "#{server_name}:#{id}"
  @re_check_start = Time.now
  @current_filter = Config.filter || {}
end

Public Instance Methods

add_to_current_filter(filter) click to toggle source

Add the supplied filter to the current filter.

# File lib/rocket_job/worker.rb, line 165
def add_to_current_filter(filter)
  filter.each_pair do |k, v|
    current_filter[k] =
      if (previous = current_filter[k])
        v.is_a?(Array) ? previous + v : v
      else
        v
      end
  end
  current_filter
end
alive?() click to toggle source
# File lib/rocket_job/worker.rb, line 27
def alive?
  true
end
backtrace() click to toggle source
# File lib/rocket_job/worker.rb, line 31
def backtrace
  Thread.current.backtrace
end
find_and_assign_job() click to toggle source

Finds the next job to work on in priority based order and assigns it to this worker.

Applies the current filter to exclude filtered jobs.

Returns nil if no jobs are available for processing.

# File lib/rocket_job/worker.rb, line 153
def find_and_assign_job
  SemanticLogger.silence(:info) do
    scheduled = RocketJob::Job.where(run_at: nil).or(:run_at.lte => Time.now)
    working   = RocketJob::Job.queued.or(state: "running", sub_state: "processing")
    query     = RocketJob::Job.and(working, scheduled)
    query     = query.and(current_filter) unless current_filter.blank?
    update    = {"$set" => {"worker_name" => name, "state" => "running"}}
    query.sort(priority: 1, _id: 1).find_one_and_update(update, bypass_document_validation: true)
  end
end
join(*_args) click to toggle source
# File lib/rocket_job/worker.rb, line 35
def join(*_args)
  true
end
kill() click to toggle source
# File lib/rocket_job/worker.rb, line 39
def kill
  true
end
next_available_job() click to toggle source

Returns [RocketJob::Job] the next job available for processing. Returns [nil] if no job is available for processing.

Notes:

  • Destroys expired jobs

  • Runs job throttles and skips the job if it is throttled.

    • Adding that filter to the current filter to exclude from subsequent polling.

# File lib/rocket_job/worker.rb, line 106
def next_available_job
  until shutdown?
    job = find_and_assign_job
    return unless job

    if job.expired?
      job.fail_on_exception! do
        job.worker_name = name
        job.destroy
        logger.info("Destroyed expired job.")
      end
      next
    end

    # Batch Job that is already started?
    # Batch has its own throttles for slices.
    return job if job.running?

    # Should this job be throttled?
    next if job.fail_on_exception! { throttled_job?(job) }
    # Job failed during throttle execution?
    next if job.failed?

    # Start this job!
    job.fail_on_exception! { job.start!(name) }
    return job if job.running?
  end
end
random_wait_interval() click to toggle source

Returns [Float] a randomized poll interval in seconds up to the maximum configured poll interval.

# File lib/rocket_job/worker.rb, line 178
def random_wait_interval
  rand(Config.max_poll_seconds * 1000) / 1000
end
reset_filter_if_expired() click to toggle source

Resets the current job filter if the relevant time interval has passed

# File lib/rocket_job/worker.rb, line 90
def reset_filter_if_expired
  # Only clear out the current_filter after every `re_check_seconds`
  time = Time.now
  return unless (time - @re_check_start) > Config.re_check_seconds

  @re_check_start     = time
  self.current_filter = Config.filter || {}
end
run() click to toggle source

Process jobs until it shuts down

Params

worker_id [Integer]
  The number of this worker for logging purposes
# File lib/rocket_job/worker.rb, line 61
def run
  Thread.current.name = format("rocketjob %03i", id)
  logger.info "Started"

  until shutdown?
    sleep_seconds = Config.max_poll_seconds
    reset_filter_if_expired
    job = next_available_job

    # Returns true when work was completed, but no other work is available
    if job&.rocket_job_work(self, false)
      # Return the database connections for this thread back to the connection pool
      ActiveRecord::Base.clear_active_connections! if defined?(ActiveRecord::Base)

      # Stagger workers so that they don't all poll at the same time.
      sleep_seconds = random_wait_interval
    end

    wait_for_shutdown?(sleep_seconds)
  end

  logger.info "Stopping"
rescue Exception => e
  logger.fatal("Unhandled exception in job processing thread", e)
ensure
  ActiveRecord::Base.clear_active_connections! if defined?(ActiveRecord::Base)
end
shutdown!() click to toggle source
# File lib/rocket_job/worker.rb, line 47
def shutdown!
  true
end
shutdown?() click to toggle source
# File lib/rocket_job/worker.rb, line 43
def shutdown?
  false
end
throttled_job?(job) click to toggle source

Whether the supplied job has been throttled and should be ignored.

# File lib/rocket_job/worker.rb, line 136
def throttled_job?(job)
  # Evaluate job throttles, if any.
  filter = job.rocket_job_throttles.matching_filter(job)
  return false unless filter

  add_to_current_filter(filter)
  # Restore retrieved job so that other workers can process it later
  job.set(worker_name: nil, state: :queued)
  true
end
wait_for_shutdown?(_timeout = nil) click to toggle source

Returns [true|false] whether the shutdown indicator was set

# File lib/rocket_job/worker.rb, line 52
def wait_for_shutdown?(_timeout = nil)
  false
end