module ResqueSpec
Attributes
disable_ext[RW]
inline[RW]
Public Instance Methods
clear_all!()
click to toggle source
# File lib/tresque/resque_spec/resque_spec.rb, line 145
# Deletes every key reported by `Resque.redis.keys`, issuing one `del`
# call per key.
#
# @return [Array] the list of keys that were iterated
def clear_all!
  redis_keys = [*Resque.redis.keys]
  redis_keys.each { |redis_key| Resque.redis.del(redis_key) }
end
clear_locked!()
click to toggle source
# File lib/tresque/resque_spec/resque_spec.rb, line 139
# Deletes every key matching the `lock:*` pattern, one `del` call per key.
#
# @return [Array] the list of lock keys that were iterated
def clear_locked!
  lock_keys = [*Resque.redis.keys("lock:*")]
  lock_keys.each { |lock_key| Resque.redis.del(lock_key) }
end
delayed?(klass, *args)
click to toggle source
Check whether a delayed worker has been queued for `klass` with `*args`. Very slow: it checks every `delayed:*` key.
# File lib/tresque/resque_spec/resque_spec.rb, line 106
# Reports whether any list stored under a `delayed:*` key contains a JSON
# item whose 'class' equals `klass` (as a string) and whose 'args' equal
# `args`. Every item of every matching key may be parsed.
#
# @param klass [#to_s] worker class or class name
# @param args [Array] arguments the delayed item must carry
# @return [Boolean]
def delayed?(klass, *args)
  wanted_class = klass.to_s
  [*Resque.redis.keys("delayed:*")].any? do |delayed_key_name|
    [*Resque.redis.lrange(delayed_key_name, 0, -1)].any? do |raw_item|
      entry = JSON.parse(raw_item)
      entry['class'] == wanted_class && entry['args'] == args
    end
  end
end
delayed_key(klass, *args)
click to toggle source
Get back the key of the delayed worker if it exists; returns nil otherwise.
# File lib/tresque/resque_spec/resque_spec.rb, line 119
# Finds the first `delayed:*` key whose list contains a JSON item with a
# matching 'class' (compared as a string) and matching 'args'.
#
# @param klass [#to_s] worker class or class name
# @param args [Array] arguments the delayed item must carry
# @return [Object, nil] the matching key, or nil when nothing matches
def delayed_key(klass, *args)
  wanted_class = klass.to_s
  [*Resque.redis.keys("delayed:*")].find do |candidate_key|
    [*Resque.redis.lrange(candidate_key, 0, -1)].any? do |raw_item|
      entry = JSON.parse(raw_item)
      entry['class'] == wanted_class && entry['args'] == args
    end
  end
end
delete_all(queue_name)
click to toggle source
# File lib/tresque/resque_spec/resque_spec.rb, line 92
# Deletes the `queue:<queue_name>` key from Redis and then calls `reset!`.
#
# @param queue_name [#to_s] name interpolated into the Redis key
# @return whatever `reset!` returns
def delete_all(queue_name)
  Resque.redis.del("queue:#{queue_name}")
  reset!
end
dequeue(queue_name, klass, *args)
click to toggle source
# File lib/tresque/resque_spec/resque_spec.rb, line 27
# Removes stored jobs of `klass` from the named in-memory queue.
#
# With no `args`, every job of that class is removed; with `args`, only
# jobs of that class whose :args equal `args` are removed.
#
# @param queue_name [#to_s] queue to remove jobs from
# @param klass [#to_s] worker class or class name
# @param args [Array] optional argument list to match
# @return [Array] the queue array after removal
def dequeue(queue_name, klass, *args)
  class_name = klass.to_s
  queue_by_name(queue_name).delete_if do |job|
    # The class must always match. Without the parentheses, `&&` binds
    # tighter than `||`, so a job of any class with equal args was removed.
    job[:class] == class_name && (args.empty? || job[:args] == args)
  end
end
enqueue(queue_name, klass, *args)
click to toggle source
# File lib/tresque/resque_spec/resque_spec.rb, line 33
# Builds a payload from the class name and arguments and hands it to
# `perform_or_store` for the given queue.
#
# @param queue_name queue the job is destined for
# @param klass [#to_s] worker class or class name
# @param args [Array] job arguments
def enqueue(queue_name, klass, *args)
  payload = { class: klass.to_s, args: args }
  perform_or_store(queue_name, payload)
end
enqueue_at(time, klass, *args)
click to toggle source
# File lib/tresque/resque_spec/scheduler.rb, line 49
# Schedules `klass` at `time` on the queue name that
# `schedule_queue_name(klass)` returns.
#
# @param time time at which the job should run
# @param klass worker class
# @param args [Array] job arguments
def enqueue_at(time, klass, *args)
  target_queue = schedule_queue_name(klass)
  enqueue_at_with_queue(target_queue, time, klass, *args)
end
enqueue_at_with_queue(queue, time, klass, *args)
click to toggle source
# File lib/tresque/resque_spec/scheduler.rb, line 53
# Stores (or performs, depending on `perform_or_store`) a job scheduled for
# `time` on an explicitly named queue.
#
# @param queue queue the job is destined for
# @param time time at which the job should run; passed through `is_time?` first
# @param klass [#to_s] worker class or class name
# @param args [Array] job arguments
def enqueue_at_with_queue(queue, time, klass, *args)
  is_time?(time)
  payload = {
    class: klass.to_s,
    time: time,
    stored_at: Time.now,
    args: args
  }
  perform_or_store(queue, payload)
end
enqueue_in(time, klass, *args)
click to toggle source
# File lib/tresque/resque_spec/scheduler.rb, line 58
# Schedules `klass` to run `time` from now by delegating to `enqueue_at`.
#
# @param time offset added to `Time.now`
# @param klass worker class
# @param args [Array] job arguments
def enqueue_in(time, klass, *args)
  run_at = Time.now + time
  enqueue_at(run_at, klass, *args)
end
enqueue_in_with_queue(queue, time, klass, *args)
click to toggle source
# File lib/tresque/resque_spec/scheduler.rb, line 62
# Schedules `klass` on an explicit queue to run `time` from now by
# delegating to `enqueue_at_with_queue`.
#
# @param queue queue the job is destined for
# @param time offset added to `Time.now`
# @param klass worker class
# @param args [Array] job arguments
def enqueue_in_with_queue(queue, time, klass, *args)
  run_at = Time.now + time
  enqueue_at_with_queue(queue, run_at, klass, *args)
end
locked?(klass, *args)
click to toggle source
Check whether the worker has a lock with the provided args.
# File lib/tresque/resque_spec/resque_spec.rb, line 132
# Reports whether Redis holds a key named `lock:<klass>-<args joined by '-'>`.
#
# @param klass [#to_s] worker class or class name
# @param args [Array] arguments that make up the lock key
# @return [Boolean]
def locked?(klass, *args)
  lock_key = "lock:#{klass}-#{args.join('-')}"
  # Plain Ruby instead of ActiveSupport's `present?`, so this helper works
  # without that extension loaded; `[*...]` also tolerates a nil reply,
  # matching the other Redis helpers in this module.
  # NOTE(review): `keys` takes a pattern, so glob characters inside the
  # args would presumably be matched as wildcards — confirm if that matters.
  [*Resque.redis.keys(lock_key)].any?
end
peek(queue_name, start = 0, count = 1)
click to toggle source
# File lib/tresque/resque_spec/resque_spec.rb, line 74
# Returns a slice of the named in-memory queue without removing anything.
#
# @param queue_name [#to_s] queue to inspect
# @param start [Integer] index of the first job to return (default 0)
# @param count [Integer] number of jobs to return (default 1)
# @return [Array, nil] the requested jobs, per `Array#slice(start, count)`
def peek(queue_name, start = 0, count = 1)
  jobs = queue_by_name(queue_name)
  jobs[start, count]
end
perform_all(queue_name)
click to toggle source
# File lib/tresque/resque_spec/resque_spec.rb, line 41
# Repeatedly takes the next job from `shift_queue` and performs it, until
# `shift_queue` yields nothing.
#
# @param queue_name queue to drain
# @return [nil]
def perform_all(queue_name)
  loop do
    next_job = shift_queue(queue_name)
    break unless next_job

    perform(queue_name, next_job)
  end
end
perform_all!()
click to toggle source
# File lib/tresque/resque_spec/resque_spec.rb, line 98
# Calls `ResqueSpec.perform_all` for every queue name listed by
# `::TResque::Registry.queues`.
#
# @return the collection returned by `::TResque::Registry.queues`
def perform_all!
  registered = ::TResque::Registry.queues
  registered.each { |registered_queue| ResqueSpec.perform_all(registered_queue) }
end
perform_next(queue_name)
click to toggle source
# File lib/tresque/resque_spec/resque_spec.rb, line 37
# Takes the next job from `shift_queue` and performs it.
#
# @param queue_name queue to take the job from
# @return the result of `perform`, or nil when no job is available
def perform_next(queue_name)
  payload = shift_queue(queue_name)
  # `shift_queue` returns nil when nothing is ready; passing that nil on
  # would fail later when the payload is indexed, so stop here instead.
  return unless payload

  perform(queue_name, payload)
end
pop(queue_name)
click to toggle source
# File lib/tresque/resque_spec/resque_spec.rb, line 61
# Removes the next job from `shift_queue` and wraps it with `new_job`.
#
# @param queue_name queue to take the job from
# @return the object built by `new_job`, or nil when no job is available
def pop(queue_name)
  next_payload = shift_queue(queue_name)
  new_job(queue_name, next_payload) if next_payload
end
queue_by_name(name)
click to toggle source
# File lib/tresque/resque_spec/resque_spec.rb, line 66
# Looks up a queue in `queues`, keyed by the string form of `name`, so
# symbols and strings address the same queue.
#
# @param name [#to_s] queue name
# @return the entry stored in `queues` under that name
def queue_by_name(name)
  queue_key = name.to_s
  queues[queue_key]
end
queue_for(klass)
click to toggle source
# File lib/tresque/resque_spec/resque_spec.rb, line 70
# Returns the in-memory queue for a worker class, resolving the queue name
# through `queue_name`.
#
# @param klass worker class or class name
# @return the entry `queue_by_name` returns for that class's queue
def queue_for(klass)
  resolved_name = queue_name(klass)
  queue_by_name(resolved_name)
end
queue_name(klass)
click to toggle source
# File lib/tresque/resque_spec/resque_spec.rb, line 78
# Resolves the queue name for a worker class.
#
# A String is first resolved to a constant; if that lookup raises any
# StandardError the class is treated as nil. The name is then taken from
# `name_from_instance_var`, falling back to `name_from_queue_accessor`.
#
# @param klass [Class, String] worker class or its name
# @return the queue name
# @raise [Resque::NoQueueError] when neither source yields a name
def queue_name(klass)
  if klass.is_a?(String)
    klass = begin
      Kernel.const_get(klass)
    rescue StandardError
      nil
    end
  end

  resolved = name_from_instance_var(klass) || name_from_queue_accessor(klass)
  return resolved if resolved

  raise ::Resque::NoQueueError.new("Jobs must be placed onto a queue.")
end
queues()
click to toggle source
# File lib/tresque/resque_spec/resque_spec.rb, line 88
# Memoized registry of in-memory queues. Looking up an unknown name
# creates and stores an empty array for it.
#
# @return [Hash] queue name => array of stored job payloads
def queues
  @queues ||= Hash.new { |registry, queue_key| registry[queue_key] = [] }
end
remove_delayed(klass, *args)
click to toggle source
# File lib/tresque/resque_spec/scheduler.rb, line 66
# Removes every scheduled job of `klass` whose :args equal `args`.
#
# @param klass [#to_s] worker class
# @param args [Array] arguments the job must carry
# @return [Integer] number of removed jobs, to match Resque Scheduler behaviour
def remove_delayed(klass, *args)
  scheduled = queue_by_name(schedule_queue_name(klass))
  size_before = scheduled.length

  scheduled.delete_if { |job| job[:class] == klass.to_s && job[:args] == args }

  size_before - scheduled.length
end
reset!()
click to toggle source
# File lib/tresque/resque_spec/resque_spec.rb, line 151
# Restores a clean state: deletes all Redis keys via `clear_all!`, empties
# the in-memory queues, and switches inline mode off.
#
# @return [false] the value assigned to `inline`
def reset!
  clear_all!       # Redis-side cleanup runs first
  queues.clear     # then drop every in-memory queue
  self.inline = false
end
schedule_for(klass)
click to toggle source
# File lib/tresque/resque_spec/scheduler.rb, line 76
# Returns the in-memory queue that holds scheduled jobs for `klass`.
#
# @param klass worker class
# @return the entry `queue_by_name` returns for the schedule queue name
def schedule_for(klass)
  scheduled_name = schedule_queue_name(klass)
  queue_by_name(scheduled_name)
end
shift_queue(queue_name)
click to toggle source
# File lib/tresque/resque_spec/resque_spec.rb, line 47
# Removes and returns the first job in the queue that is due, skipping
# jobs scheduled for the future. A job is due when its :time, converted
# with `to_i`, is not later than the current time in whole seconds; a job
# without :time converts to 0 and is therefore always due.
#
# @param queue_name [#to_s] queue to take the job from
# @return [Hash, nil] the removed job payload, or nil when none is due
def shift_queue(queue_name)
  jobs = queue_by_name(queue_name) || []
  due_position = jobs.index { |job| job[:time].to_i <= Time.now.to_i }
  return nil unless due_position

  jobs.delete_at(due_position)
end
Private Instance Methods
is_time?(time)
click to toggle source
# File lib/tresque/resque_spec/scheduler.rb, line 82
# Converts `time` with `to_i`. Despite the predicate-style name, the result
# is the integer itself, not a boolean; an argument that does not respond
# to `to_i` makes the call raise.
#
# @param time [#to_i] value expected to be time-like
# @return [Integer] `time.to_i`
def is_time?(time)
  time.to_i
end
name_from_instance_var(klass)
click to toggle source
# File lib/tresque/resque_spec/resque_spec.rb, line 159
# Reads the queue name from the `@queue` instance variable of `klass`.
#
# @param klass object whose `@queue` is read
# @return the value of `@queue`, or nil when it is not set
def name_from_instance_var(klass)
  klass.instance_variable_get(:@queue)
end
name_from_queue_accessor(klass)
click to toggle source
# File lib/tresque/resque_spec/resque_spec.rb, line 163
# Reads the queue name from `klass.queue` when `klass` responds to `queue`.
#
# @param klass object that may expose a `queue` method
# @return the value of `klass.queue`, or false when there is no such method
def name_from_queue_accessor(klass)
  return false unless klass.respond_to?(:queue)

  klass.queue
end
new_job(queue_name, payload)
click to toggle source
# File lib/tresque/resque_spec/resque_spec.rb, line 167
# Builds a `Resque::Job` for the queue from a stored payload, after the
# payload has been converted by `payload_with_string_keys`.
#
# @param queue_name queue the job belongs to
# @param payload [Hash] stored job payload
# @return [Resque::Job]
def new_job(queue_name, payload)
  job_payload = payload_with_string_keys(payload)
  Resque::Job.new(queue_name, job_payload)
end
payload_with_string_keys(payload)
click to toggle source
# File lib/tresque/resque_spec/resque_spec.rb, line 191
# Converts a symbol-keyed stored payload into a string-keyed hash. The
# arguments are passed through `encode` and then `decode`; other keys of
# the payload (such as :time) are not copied.
#
# @param payload [Hash] payload with :class, :args and :stored_at
# @return [Hash] hash with 'class', 'args' and 'stored_at'
def payload_with_string_keys(payload)
  job_class = payload[:class]
  round_tripped_args = decode(encode(payload[:args]))

  {
    'class' => job_class,
    'args' => round_tripped_args,
    'stored_at' => payload[:stored_at]
  }
end
perform(queue_name, payload)
click to toggle source
# File lib/tresque/resque_spec/resque_spec.rb, line 171
# Performs a stored payload as a job, with ENV['QUEUE'] set to the queue
# name for the duration of the call and restored afterwards, even when the
# job raises.
#
# @param queue_name [#to_s, nil] queue the job belongs to
# @param payload [Hash] stored job payload
# @return the result of the job's `perform`
def perform(queue_name, payload)
  prev = ENV['QUEUE']
  # ENV only accepts String (or nil) values; queue names may be symbols
  # elsewhere in this module, so stringify here instead of raising TypeError.
  ENV['QUEUE'] = queue_name.nil? ? nil : queue_name.to_s
  new_job(queue_name, payload).perform
ensure
  ENV['QUEUE'] = prev
end
perform_or_store(queue_name, payload)
click to toggle source
# File lib/tresque/resque_spec/resque_spec.rb, line 179
# Runs the payload immediately when `inline` is truthy; otherwise appends
# it to the named queue via `store`.
#
# @param queue_name queue the job is destined for
# @param payload [Hash] job payload
def perform_or_store(queue_name, payload)
  inline ? perform(queue_name, payload) : store(queue_name, payload)
end
schedule_queue_name(klass)
click to toggle source
# File lib/tresque/resque_spec/scheduler.rb, line 86
# Name of the queue that holds scheduled jobs for `klass`. This uses the
# class's real queue name rather than a separate "<name>_scheduled" queue.
#
# @param klass worker class
# @return the value of `queue_name(klass)`
def schedule_queue_name(klass)
  # A dedicated "#{queue_name(klass)}_scheduled" queue is deliberately not used.
  real_queue = queue_name(klass)
  real_queue
end
store(queue_name, payload)
click to toggle source
# File lib/tresque/resque_spec/resque_spec.rb, line 187
# Appends a payload to the end of the named in-memory queue.
#
# @param queue_name [#to_s] queue to append to
# @param payload [Hash] job payload
# @return [Array] the queue after the append
def store(queue_name, payload)
  target_queue = queue_by_name(queue_name)
  target_queue.push(payload)
end