module ResqueSpec

Attributes

disable_ext[RW]
inline[RW]

Public Instance Methods

clear_all!() click to toggle source
# File lib/tresque/resque_spec/resque_spec.rb, line 145
def clear_all!
  [*Resque.redis.keys].each do |key|
    Resque.redis.del(key)
  end
end
clear_locked!() click to toggle source
# File lib/tresque/resque_spec/resque_spec.rb, line 139
def clear_locked!
  [*Resque.redis.keys("lock:*")].each do |key|
    Resque.redis.del(key)
  end
end
delayed?(klass, *args) click to toggle source

Check if we have queued a delayed worker for `klass` with `*args` Very slow, checks every `delayed` key

# File lib/tresque/resque_spec/resque_spec.rb, line 106
def delayed?(klass, *args)
  klass = klass.to_s unless klass.is_a? String
  [*Resque.redis.keys("delayed:*")].each do |key|
    [*Resque.redis.lrange(key, 0, -1)].each do |item|
      parsed_item = JSON.parse(item)
      return true if parsed_item['class'] == klass && parsed_item['args'] == [*args]
    end
  end

  false
end
delayed_key(klass, *args) click to toggle source

Get back the key of the delayed worker if it exists

# File lib/tresque/resque_spec/resque_spec.rb, line 119
def delayed_key(klass, *args)
  klass = klass.to_s unless klass.is_a? String
  [*Resque.redis.keys("delayed:*")].each do |key|
    [*Resque.redis.lrange(key, 0, -1)].each do |item|
      parsed_item = JSON.parse(item)
      return key if parsed_item['class'] == klass && parsed_item['args'] == [*args]
    end
  end

  nil
end
delete_all(queue_name) click to toggle source
# File lib/tresque/resque_spec/resque_spec.rb, line 92
def delete_all(queue_name)
  queue = "queue:#{queue_name}"
  Resque.redis.del(queue)
  reset!
end
dequeue(queue_name, klass, *args) click to toggle source
# File lib/tresque/resque_spec/resque_spec.rb, line 27
def dequeue(queue_name, klass, *args)
  queue_by_name(queue_name).delete_if do |job|
    job[:class] == klass.to_s && args.empty? || job[:args] == args
  end
end
enqueue(queue_name, klass, *args) click to toggle source
# File lib/tresque/resque_spec/resque_spec.rb, line 33
def enqueue(queue_name, klass, *args)
  perform_or_store(queue_name, :class => klass.to_s, :args => args)
end
enqueue_at(time, klass, *args) click to toggle source
# File lib/tresque/resque_spec/scheduler.rb, line 49
def enqueue_at(time, klass, *args)
  enqueue_at_with_queue(schedule_queue_name(klass), time, klass, *args)
end
enqueue_at_with_queue(queue, time, klass, *args) click to toggle source
# File lib/tresque/resque_spec/scheduler.rb, line 53
def enqueue_at_with_queue(queue, time, klass, *args)
  is_time?(time)
  perform_or_store(queue, :class => klass.to_s, :time  => time, :stored_at => Time.now, :args => args)
end
enqueue_in(time, klass, *args) click to toggle source
# File lib/tresque/resque_spec/scheduler.rb, line 58
def enqueue_in(time, klass, *args)
  enqueue_at(Time.now + time, klass, *args)
end
enqueue_in_with_queue(queue, time, klass, *args) click to toggle source
# File lib/tresque/resque_spec/scheduler.rb, line 62
def enqueue_in_with_queue(queue, time, klass, *args)
  enqueue_at_with_queue(queue, Time.now + time, klass, *args)
end
locked?(klass, *args) click to toggle source

check if the worker has a lock with the provided args

# File lib/tresque/resque_spec/resque_spec.rb, line 132
def locked?(klass, *args)
  klass = klass.to_s unless klass.is_a? String

  key = "lock:#{klass}-#{[*args].join('-')}"
  Resque.redis.keys(key).present?
end
peek(queue_name, start = 0, count = 1) click to toggle source
# File lib/tresque/resque_spec/resque_spec.rb, line 74
def peek(queue_name, start = 0, count = 1)
  queue_by_name(queue_name).slice(start, count)
end
perform_all(queue_name) click to toggle source
# File lib/tresque/resque_spec/resque_spec.rb, line 41
def perform_all(queue_name)
  while job = shift_queue(queue_name)
    perform(queue_name, job)
  end
end
perform_all!() click to toggle source
# File lib/tresque/resque_spec/resque_spec.rb, line 98
def perform_all!
  ::TResque::Registry.queues.each do |queue_name|
    ResqueSpec.perform_all(queue_name)
  end
end
perform_next(queue_name) click to toggle source
# File lib/tresque/resque_spec/resque_spec.rb, line 37
def perform_next(queue_name)
  perform(queue_name, shift_queue(queue_name))
end
pop(queue_name) click to toggle source
# File lib/tresque/resque_spec/resque_spec.rb, line 61
def pop(queue_name)
  return unless payload = shift_queue(queue_name)
  new_job(queue_name, payload)
end
queue_by_name(name) click to toggle source
# File lib/tresque/resque_spec/resque_spec.rb, line 66
def queue_by_name(name)
  queues[name.to_s]
end
queue_for(klass) click to toggle source
# File lib/tresque/resque_spec/resque_spec.rb, line 70
def queue_for(klass)
  queue_by_name(queue_name(klass))
end
queue_name(klass) click to toggle source
# File lib/tresque/resque_spec/resque_spec.rb, line 78
def queue_name(klass)
  if klass.is_a?(String)
    klass = Kernel.const_get(klass) rescue nil
  end

  name_from_instance_var(klass) or
    name_from_queue_accessor(klass) or
      raise ::Resque::NoQueueError.new("Jobs must be placed onto a queue.")
end
queues() click to toggle source
# File lib/tresque/resque_spec/resque_spec.rb, line 88
def queues
  @queues ||= Hash.new {|h,k| h[k] = []}
end
remove_delayed(klass, *args) click to toggle source
# File lib/tresque/resque_spec/scheduler.rb, line 66
def remove_delayed(klass, *args)
  sched_queue = queue_by_name(schedule_queue_name(klass))
  count_before_remove = sched_queue.length
  sched_queue.delete_if do |job|
    job[:class] == klass.to_s && job[:args] == args
  end
  # Return number of removed items to match Resque Scheduler behaviour
  count_before_remove - sched_queue.length
end
reset!() click to toggle source
# File lib/tresque/resque_spec/resque_spec.rb, line 151
def reset!
  clear_all!
  queues.clear
  self.inline = false
end
schedule_for(klass) click to toggle source
# File lib/tresque/resque_spec/scheduler.rb, line 76
def schedule_for(klass)
  queue_by_name(schedule_queue_name(klass))
end
shift_queue(queue_name) click to toggle source
# File lib/tresque/resque_spec/resque_spec.rb, line 47
def shift_queue(queue_name)
  # pass scheduled ones
  array = queue_by_name(queue_name) || []
  index = nil
  array.each_with_index do |hash, i|
    if hash[:time].to_i <= Time.now.to_i
      index = i
      break
    end
  end
  return nil unless index
  array.delete_at(index)
end

Private Instance Methods

is_time?(time) click to toggle source
# File lib/tresque/resque_spec/scheduler.rb, line 82
def is_time?(time)
  time.to_i
end
name_from_instance_var(klass) click to toggle source
# File lib/tresque/resque_spec/resque_spec.rb, line 159
def name_from_instance_var(klass)
  klass.instance_variable_get(:@queue)
end
name_from_queue_accessor(klass) click to toggle source
# File lib/tresque/resque_spec/resque_spec.rb, line 163
def name_from_queue_accessor(klass)
  klass.respond_to?(:queue) and klass.queue
end
new_job(queue_name, payload) click to toggle source
# File lib/tresque/resque_spec/resque_spec.rb, line 167
def new_job(queue_name, payload)
  Resque::Job.new(queue_name, payload_with_string_keys(payload))
end
payload_with_string_keys(payload) click to toggle source
# File lib/tresque/resque_spec/resque_spec.rb, line 191
def payload_with_string_keys(payload)
  {
    'class' => payload[:class],
    'args' => decode(encode(payload[:args])),
    'stored_at' => payload[:stored_at]
  }
end
perform(queue_name, payload) click to toggle source
# File lib/tresque/resque_spec/resque_spec.rb, line 171
def perform(queue_name, payload)
  prev = ENV['QUEUE']
  ENV['QUEUE'] = queue_name
  new_job(queue_name, payload).perform
ensure
  ENV['QUEUE'] = prev
end
perform_or_store(queue_name, payload) click to toggle source
# File lib/tresque/resque_spec/resque_spec.rb, line 179
def perform_or_store(queue_name, payload)
  if inline
    perform(queue_name, payload)
  else
    store(queue_name, payload)
  end
end
schedule_queue_name(klass) click to toggle source
# File lib/tresque/resque_spec/scheduler.rb, line 86
def schedule_queue_name(klass)
  # "#{queue_name(klass)}_scheduled"
  # we use the real one
  queue_name(klass)
end
store(queue_name, payload) click to toggle source
# File lib/tresque/resque_spec/resque_spec.rb, line 187
def store(queue_name, payload)
  queue_by_name(queue_name) << payload
end