class RubyJob::InMemoryJobStore

Attributes

pause_starting_at[R]

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/ruby_job/in_memory_job_store.rb, line 11
# Builds an empty in-memory store.
#
# Delegates to the superclass initializer first, then sets up the state
# owned by this class:
# * @semaphore         - mutex used to serialize queue access and uuid allocation
# * @next_uuid         - counter behind #next_uuid, starting at 0
# * @pause_starting_at - pause time; nil means no pause has been requested
def initialize
  super

  @semaphore = Mutex.new
  @next_uuid = 0
  @pause_starting_at = nil
end

Public Instance Methods

dequeue(job) click to toggle source
# File lib/ruby_job/in_memory_job_store.rb, line 24
# Removes +job+ from the queue while holding the store's mutex.
#
# Returns whatever the queue's #delete returns for +job+.
def dequeue(job)
  @semaphore.synchronize do
    queue.delete(job)
  end
end
enqueue(job) click to toggle source
# File lib/ruby_job/in_memory_job_store.rb, line 18
# Adds +job+ to the queue while holding the store's mutex.
#
# The job must already carry a uuid; a job whose #uuid is nil or false is
# rejected with a RuntimeError before the queue is touched.
#
# Returns whatever the queue's #push returns.
def enqueue(job)
  job.uuid || raise('job does not have an assigned uuid')

  @semaphore.synchronize do
    queue.push(job)
  end
end
fetch() click to toggle source
# File lib/ruby_job/in_memory_job_store.rb, line 32
# Fetches the next job, using the strategy selected by the :wait option.
#
# With a truthy @options[:wait] this goes through #fetch_next_or_wait;
# otherwise it makes a single #fetch_next attempt.
# NOTE(review): @options is not assigned in this class — presumably the
# superclass initializer sets it; confirm.
def fetch
  return fetch_next unless @options[:wait]

  fetch_next_or_wait
end
next_uuid() click to toggle source
# File lib/ruby_job/in_memory_job_store.rb, line 40
# Allocates the next uuid.
#
# Increments the internal counter under the mutex and returns the new
# value, so the first call on a fresh store (counter at 0) returns 1.
def next_uuid
  @semaphore.synchronize do
    @next_uuid = @next_uuid + 1
  end
end
pause_at(time) click to toggle source
# File lib/ruby_job/in_memory_job_store.rb, line 28
# Records +time+ as the pause time.
#
# The stored value caps the due-time cutoff used by #fetch_next and is what
# #paused_before? compares against. Returns +time+.
def pause_at(time)
  @pause_starting_at = time
end
size() click to toggle source
# File lib/ruby_job/in_memory_job_store.rb, line 36
# Reports the queue's current #size.
#
# Unlike the mutating operations, this read does not take the mutex.
def size
  queue.size
end

Private Instance Methods

fetch_next() click to toggle source
# File lib/ruby_job/in_memory_job_store.rb, line 54
# Pops and returns the queue's top job if it is due; returns nil otherwise.
#
# A job is due when its start_at is no later than the cutoff: the earlier of
# the current time and the pause time (when one is set). The check and the
# pop run together under the mutex.
def fetch_next
  @semaphore.synchronize do
    head = queue.top
    next unless head # nothing queued

    starts_at = head.start_at
    cutoff = [Time.now, @pause_starting_at].compact.min
    queue.pop if starts_at <= cutoff
  end
end
fetch_next_or_wait() click to toggle source
# File lib/ruby_job/in_memory_job_store.rb, line 60
# Polls #fetch_next until it yields a job, sleeping @options[:wait_delay]
# between attempts.
#
# Polling also stops, without a job, once the store is paused as of the
# current time and @options[:wait] is no longer truthy. Both options are
# re-read on every iteration.
#
# Returns the fetched job, or the falsy result of the last attempt when
# polling stopped without one.
def fetch_next_or_wait
  fetched = nil
  loop do
    fetched = fetch_next
    break if fetched
    break if paused_before?(Time.now) && !@options[:wait]

    sleep(@options[:wait_delay])
  end
  fetched
end
paused_before?(time) click to toggle source
# File lib/ruby_job/in_memory_job_store.rb, line 50
# Tells whether a pause time is set and falls at or before +time+.
#
# Returns the unset (nil/false) pause value itself when no pause time is
# stored, otherwise the result of comparing the pause time with +time+.
def paused_before?(time)
  pause_time = @pause_starting_at
  return pause_time unless pause_time

  pause_time <= time
end
queue() click to toggle source
# File lib/ruby_job/in_memory_job_store.rb, line 46
# Returns the backing JobPriorityQueue, creating it on first use and
# reusing the same instance on later calls.
def queue
  return @queue if @queue

  @queue = JobPriorityQueue.new
end