class Beaneater::Tube

Beanstalk tube which contains jobs which can be inserted, reserved, et al.

Attributes

client[R]

@!attribute name

@return [String] name of the tube

@!attribute client

@return [Beaneater] returns the client instance
name[R]

@!attribute name

@return [String] name of the tube

@!attribute client

@return [Beaneater] returns the client instance

Public Class Methods

new(client, name) click to toggle source

Fetches the specified tube.

@param [Beaneater] client The beaneater client instance. @param [String] name The name for this tube. @example

Beaneater::Tube.new(@client, 'tube-name')
# File lib/beaneater/tube/record.rb, line 18
def initialize(client, name)
  @client = client
  @name = name.to_s
  @mutex = Mutex.new
end

Public Instance Methods

clear() click to toggle source

Clears all unreserved jobs in all states from the tube

@example

@tube.clear
# File lib/beaneater/tube/record.rb, line 132
def clear
  client.tubes.watch!(self.name)
  %w(delayed buried ready).each do |state|
    while job = self.peek(state.to_sym)
      begin
        job.delete
      rescue Beaneater::UnexpectedResponse, Beaneater::NotFoundError
        # swallow any issues
      end
    end
  end
  client.tubes.ignore(name)
rescue Beaneater::NotIgnoredError
  # swallow any issues
end
inspect()
Alias for: to_s
kick(bounds=1) click to toggle source

Kick specified number of jobs from buried to ready state.

@param [Integer] bounds The number of jobs to kick. @return [Hash{String => String, Number}] Beanstalkd command response @example

@tube.kick(5)

@api public

# File lib/beaneater/tube/record.rb, line 99
def kick(bounds=1)
  safe_use { transmit("kick #{bounds}") }
end
pause(delay) click to toggle source

Pause the execution of this tube for specified `delay`.

@param [Integer] delay Number of seconds to delay tube execution @return [Array<Hash{String => String, Number}>] Beanstalkd command response @example

@tube.pause(10)

@api public

# File lib/beaneater/tube/record.rb, line 123
def pause(delay)
  transmit("pause-tube #{name} #{delay}")
end
peek(state) click to toggle source

Peek at next job within this tube in given `state`.

@param [String] state The job state to peek at (`ready`, `buried`, `delayed`) @return [Beaneater::Job] The next job within this tube. @example

@tube.peek(:ready) # => <Beaneater::Job id=5 body=foo>

@api public

# File lib/beaneater/tube/record.rb, line 66
def peek(state)
  safe_use do
    res = transmit("peek-#{state}")
    Job.new(client, res)
  end
rescue Beaneater::NotFoundError
  # Return nil if not found
  nil
end
put(body, options={}) click to toggle source

Inserts job with specified body onto tube.

@param [String] body The data to store with this job. @param [Hash{String => Integer}] options The settings associated with this job. @option options [Integer] pri priority for this job @option options [Integer] ttr time to respond for this job @option options [Integer] delay delay for this job @return [Hash{String => String, Number}] beanstalkd command response @example

@tube.put "data", :pri => 1000, :ttr => 10, :delay => 5

@api public

# File lib/beaneater/tube/record.rb, line 43
def put(body, options={})
  safe_use do
    serialized_body = config.job_serializer.call(body)

    options = {
      :pri   => config.default_put_pri,
      :delay => config.default_put_delay,
      :ttr   => config.default_put_ttr
    }.merge(options)

    cmd_options = "#{options[:pri]} #{options[:delay]} #{options[:ttr]} #{serialized_body.bytesize}"
    transmit("put #{cmd_options}\r\n#{serialized_body}")
  end
end
reserve(timeout=nil, &block) click to toggle source

Reserves the next job from tube.

@param [Integer] timeout Number of seconds before timing out @param [Proc] block Callback to perform on reserved job @yield [job] Job that was reserved. @return [Beaneater::Job] Job that was reserved. @example

@tube.reserve # => <Beaneater::Job id=5 body=foo>

@api public

# File lib/beaneater/tube/record.rb, line 86
def reserve(timeout=nil, &block)
  client.tubes.watch!(self.name)
  client.tubes.reserve(timeout, &block)
end
stats() click to toggle source

Returns related stats for this tube.

@return [Beaneater::StatStruct] Struct of tube related values @example

@tube.stats.current_jobs_delayed # => 24

@api public

# File lib/beaneater/tube/record.rb, line 110
def stats
  res = transmit("stats-tube #{name}")
  StatStruct.from_hash(res[:body])
end
to_s() click to toggle source

String representation of tube.

@return [String] Representation of tube including name. @example

@tube.to_s # => "#<Beaneater::Tube name=foo>"
# File lib/beaneater/tube/record.rb, line 154
def to_s
  "#<Beaneater::Tube name=#{name.inspect}>"
end
Also aliased as: inspect
transmit(command, options={}) click to toggle source

Delegates transmit to the connection object.

@see Beaneater::Connection#transmit

# File lib/beaneater/tube/record.rb, line 27
def transmit(command, options={})
  client.connection.transmit(command, **options)
end

Protected Instance Methods

config() click to toggle source

Returns configuration options for beaneater

@return [Beaneater::Configuration] configuration object

# File lib/beaneater/tube/record.rb, line 180
def config
  Beaneater.configuration
end
safe_use() { || ... } click to toggle source

Transmits a beanstalk command that requires this tube to be set as used.

@param [Proc] block Beanstalk command to transmit. @return [Object] Result of block passed @example

safe_use { transmit("kick 1") }
  # => "Response to kick command"
# File lib/beaneater/tube/record.rb, line 169
def safe_use(&block)
  @mutex.lock
  client.tubes.use(self.name)
  yield
ensure
  @mutex.unlock
end