class Beaneater::Jobs

Represents collection of job-related commands.

Constants

MAX_RETRIES

Number of retries to process a job.

RELEASE_DELAY

Delay in seconds before to make job ready again.

RESERVE_TIMEOUT

Number of seconds to wait for a job before checking a different server.

Attributes

client[R]

@!attribute processors

@return [Array<Proc>] returns Collection of proc to handle beanstalkd jobs

@!attribute client

@return [Beaneater] returns the client instance

@!attribute current_job

@return [Beaneater] returns the currently processing job in the process loop
current_job[R]

@!attribute processors

@return [Array<Proc>] returns Collection of proc to handle beanstalkd jobs

@!attribute client

@return [Beaneater] returns the client instance

@!attribute current_job

@return [Beaneater] returns the currently processing job in the process loop
processors[R]

@!attribute processors

@return [Array<Proc>] returns Collection of proc to handle beanstalkd jobs

@!attribute client

@return [Beaneater] returns the client instance

@!attribute current_job

@return [Beaneater] returns the currently processing job in the process loop

Public Class Methods

new(client) click to toggle source

Creates new jobs instance.

@param [Beaneater] client The beaneater client instance. @example

Beaneater::Jobs.new(@client)
# File lib/beaneater/job/collection.rb, line 32
def initialize(client)
  @client = client
end

Public Instance Methods

[](id)
Alias for: find
find(id) click to toggle source

Peek (or find) job by id from beanstalkd.

@param [Integer] id Job id to find @return [Beaneater::Job] Job matching given id @example

@beaneater.jobs[123] # => <Beaneater::Job>
@beaneater.jobs.find(123) # => <Beaneater::Job>
@beaneater.jobs.peek(123) # => <Beaneater::Job>

@api public

# File lib/beaneater/job/collection.rb, line 53
def find(id)
  res = transmit("peek #{id}")
  Job.new(client, res)
rescue Beaneater::NotFoundError
  nil
end
Also aliased as: peek, []
peek(id)
Alias for: find
process!(options={}) click to toggle source

Watch, reserve, process and delete or bury or release jobs.

@param [Hash{String => Integer}] options Settings for processing @option options [Integer] release_delay Delay in seconds before to make job ready again @option options [Integer] reserve_timeout Number of seconds to wait for a job before checking a different server

@api public

# File lib/beaneater/job/collection.rb, line 106
def process!(options={})
  release_delay = options.delete(:release_delay) || RELEASE_DELAY
  reserve_timeout = options.delete(:reserve_timeout) || RESERVE_TIMEOUT
  client.tubes.watch!(*processors.keys)
  while !stop? do
    begin
      @current_job = client.tubes.reserve(reserve_timeout)
      processor = processors[@current_job.tube]
      begin
        processor[:block].call(@current_job)
        @current_job.delete
      rescue *processor[:retry_on]
        if @current_job.stats.releases < processor[:max_retries]
          @current_job.release(:delay => release_delay)
        end
      end
    rescue AbortProcessingError
      break
    rescue Beaneater::JobNotReserved, Beaneater::NotFoundError, Beaneater::TimedOutError
      retry
    rescue StandardError # handles unspecified errors
      @current_job.bury if @current_job
    ensure # bury if still reserved
      @current_job.bury if @current_job && @current_job.exists? && @current_job.reserved?
      @current_job = nil
    end
  end
end
register(tube_name, options={}, &block) click to toggle source

Register a processor to handle beanstalkd job on particular tube.

@param [String] tube_name Tube name @param [Hash{String=>RuntimeError}] options settings for processor @param [Proc] block Process beanstalkd job @option options [Integer] max_retries Number of retries to process a job @option options [Array<RuntimeError>] retry_on Collection of errors to rescue and re-run processor

@example

@beanstalk.jobs.register('some-tube', :retry_on => [SomeError]) do |job|
  do_something(job)
end

@beanstalk.jobs.register('other-tube') do |job|
  do_something_else(job)
end

@api public

# File lib/beaneater/job/collection.rb, line 80
def register(tube_name, options={}, &block)
  @processors ||= {}
  max_retries = options[:max_retries] || MAX_RETRIES
  retry_on = Array(options[:retry_on])
  @processors[tube_name.to_s] = { :block => block, :retry_on => retry_on, :max_retries => max_retries }
end
stop!() click to toggle source

Sets flag to indicate that process loop should stop after current job

# File lib/beaneater/job/collection.rb, line 88
def stop!
  @stop = true
end
stop?() click to toggle source

Returns whether the process loop should stop

@return [Boolean] if true the loop should stop after current processing

# File lib/beaneater/job/collection.rb, line 95
def stop?
  !!@stop
end
transmit(command, options={}) click to toggle source

Delegates transmit to the connection object.

@see Beaneater::Connection#transmit

# File lib/beaneater/job/collection.rb, line 39
def transmit(command, options={})
  client.connection.transmit(command, **options)
end