class Beanstalk::MockConnection

Public Class Methods

new(addr, default_tube=nil) click to toggle source
Calls superclass method
# File lib/beanstalk-client-rspec.rb, line 7
# Runs the superclass initializer with the same arguments, remembers the
# default tube, and then resets the mock's state via #reset!.
#
# addr         - forwarded unchanged to the superclass initializer.
# default_tube - forwarded to the superclass and stored in @default_tube
#                (nil when omitted).
def initialize(addr, default_tube=nil)
  super(addr, default_tube)
  @default_tube = default_tube
  reset!
end

Public Instance Methods

clear!() click to toggle source

Tests use this to reset state.

# File lib/beanstalk-client-rspec.rb, line 31
# Wipes the mock's job storage and bookkeeping so a test can start clean.
#
# Replaces the tube table with an empty Hash, restarts the job id counter at
# zero, installs fresh mutexes and clears the waiting flag. Unlike #reset!,
# @last_used and @watch_list are left untouched.
def clear!
  # Mock-specific state.
  @id_mutex = Mutex.new
  @tubes = {}
  @id = 0

  # State normally initialised by the superclass.
  # (@tube_mutex used to be assigned twice; once is enough.)
  @mutex = Mutex.new
  @tube_mutex = Mutex.new
  @waiting = false
end
connect() click to toggle source
# File lib/beanstalk-client-rspec.rb, line 43
# Deliberate no-op: the mock must never open a real connection.
# Always returns nil.
def connect
  nil
end
reserve(timeout=nil) click to toggle source

TODO: Put the job onto a reservation queue and deal with bury, etc.

# File lib/beanstalk-client-rspec.rb, line 48
# Reserves the next ready job from the watched tubes.
#
# Tubes are tried in @watch_list order and the first one with a ready job
# wins. Exactly one job is moved from that tube's 'ready' queue to its
# 'reserved' list and its 'reserves' counter is incremented. (Previously the
# loop kept going after a hit, so one job was pulled from *every* watched
# tube and all but the last were stranded in 'reserved'.)
#
# The mock never blocks: queues are popped in non-blocking mode.
#
# timeout - when truthy and no job is ready, Beanstalk::TimedOut is raised;
#           when nil/false and no job is ready, nil is returned.
#
# Returns a Job built from the reserved job's id and body, or nil.
def reserve(timeout=nil)
  job = nil
  @watch_list.each do |tube_name|
    tube = @tubes[tube_name]
    # Unknown tube, or a tube that never had anything put on it.
    next if tube.nil? || tube['ready'].nil?

    begin
      # Non-blocking pop: raises ThreadError when the queue is empty.
      job = tube['ready'].pop(true)
    rescue ThreadError
      next
    end

    job['reserves'] += 1
    (tube['reserved'] ||= []).push job
    # Stop at the first job so only one job is ever reserved per call.
    break
  end

  if job.nil?
    raise Beanstalk::TimedOut if timeout
    return nil
  end

  Job.new(self, job['id'], job['body'])
end
reset!() click to toggle source

Tests use this to reset state.

# File lib/beanstalk-client-rspec.rb, line 14
# Returns the mock to its freshly-constructed state.
#
# Empties the tube table, restarts the job id counter at zero, installs fresh
# mutexes, clears the waiting flag, and points both the used tube and the
# watch list back at 'default'. When a default tube was supplied at
# construction time it is then re-applied through #use and #watch.
def reset!
  # Mock-specific state.
  @id_mutex = Mutex.new
  @tubes = {}
  @id = 0

  # State normally initialised by the superclass.
  # (@tube_mutex used to be assigned twice; once is enough.)
  @mutex = Mutex.new
  @tube_mutex = Mutex.new
  @waiting = false
  @last_used = 'default'
  @watch_list = [@last_used]

  if @default_tube
    use(@default_tube)
    watch(@default_tube)
  end
end

Private Instance Methods

interact(cmd, rfmt) click to toggle source
# File lib/beanstalk-client-rspec.rb, line 76
# Stand-in for the wire-level exchange: interprets a protocol command string
# against the in-memory tubes and returns a canned response.
#
# cmd  - the raw command text (e.g. "put 0 0 120 5\r\nhello\r\n").
# rfmt - the expected response format; only used in the error message for
#        unsupported commands.
#
# Supported commands and results:
#   watch / ignore      -> [current watch list size]
#   use <tube>          -> [tube name]
#   list-tubes-watched  -> the watch list
#   put                 -> [new job id]; the job is queued as ready on the
#                          tube named by @last_used
#   delete <id>         -> removes the job from every tube's reserved list
#   stats-job <id>      -> the reserved job's hash merged with
#                          'state', 'age', 'file' and 'time-left'
#   release <id> ..     -> moves the reserved job back to its tube's ready
#                          queue and bumps its 'releases' counter
#
# For delete and release, and for stats-job when no reserved job matches, the
# value returned is the tubes hash, as it was before.
#
# Raises RuntimeError for any other command.
#
# Commands are anchored with \A rather than ^ so that a line inside a put
# body (e.g. one starting with "watch ") cannot be mistaken for a command.
# Job ids are compared as Integers against the String-keyed job hashes that
# put stores.
def interact(cmd, rfmt)
  case cmd
  when /\Awatch (\S+)/, /\Aignore/
    [@watch_list.size]
  when /\Ause (\S+)/
    [$1]
  when /\Alist-tubes-watched/
    @watch_list
  when /\Aput (\d+) (\d+) (\d+) \d+\r\n(.*)\r\n/m
    pri = $1
    delay = $2
    ttr = $3
    body = $4

    id = @id_mutex.synchronize { @id += 1 }
    job = {
      'id'         => id,
      'pri'        => pri,
      'delay'      => delay,
      'ttr'        => ttr,
      'body'       => body.to_s,
      'created_at' => Time.now,
      'file'       => 0,
      'reserves'   => 0,
      'timeouts'   => 0,
      'releases'   => 0,
      'buries'     => 0,
      'kicks'      => 0,
    }
    @tubes[@last_used] ||= {}
    (@tubes[@last_used]['ready'] ||= Queue.new) << job
    [id]
  when /\Adelete (\d+)/
    id = $1.to_i
    @tubes.each_pair do |_tube_name, states|
      reserved = states['reserved']
      reserved.delete_if { |entry| entry['id'] == id } if reserved
    end
  when /\Astats-job (\d+)/
    id = $1.to_i
    @tubes.each_pair do |_tube_name, states|
      reserved = states['reserved']
      next unless reserved

      found = reserved.find { |entry| entry['id'] == id }
      next unless found

      return found.merge(
        'state'     => 'reserved',
        'age'       => (Time.now - found['created_at']),
        'file'      => 0,
        'time-left' => 1
      )
    end
  when /\Arelease (\d+) (\d+) (\d+)/
    id = $1.to_i
    @tubes.each_pair do |_tube_name, states|
      reserved = states['reserved']
      next unless reserved

      index = reserved.index { |entry| entry['id'] == id }
      next unless index

      # Take the matching job itself out of the reserved list (delete_if
      # would have returned the jobs that were *kept*).
      released = reserved.delete_at(index)
      released['releases'] += 1
      (states['ready'] ||= Queue.new) << released
    end
  else
    raise "Need to define #{cmd} #{rfmt} for interact"
  end
end