module QueueDispatcher::ActsAsTaskQueue::SingletonMethods

Public Instance Methods

acts_as_task_queue_config() click to toggle source
# File lib/queue_dispatcher/acts_as_task_queue.rb, line 50
# Returns whatever is stored in the receiver's @acts_as_task_queue_config
# instance variable (nil when it has never been assigned).
def acts_as_task_queue_config
  instance_variable_get(:@acts_as_task_queue_config)
end
any_running?() click to toggle source

Are there any running task_queues?

# File lib/queue_dispatcher/acts_as_task_queue.rb, line 76
# Are there any running task_queues?
#
# A task_queue counts as running when it answers true to +running?+ or to
# +brand_new?+. The scan stops at the first such task_queue instead of
# querying every remaining one.
#
# Returns true or false.
def any_running?
  all.any? { |tq| tq.running? || tq.brand_new? }
end
find_or_create_by_name(name, options = {}) click to toggle source

Find a task_queue by its name which is not in state ‘error’, or create one if no such task_queue exists.

# File lib/queue_dispatcher/acts_as_task_queue.rb, line 100
# Look up the first task_queue called +name+ whose state is not 'error';
# when there is none, create one in state 'new'.
#
# name    - name of the task_queue to find or create.
# options - Hash; only :terminate_immediately is read, and it is handed to
#           the newly created record.
#
# Lookup and creation both happen inside a single +transaction+ block, whose
# value is the found or freshly created record.
def find_or_create_by_name(name, options = {})
  transaction do
    existing = where(name: name).where("state != 'error'").first
    existing || create(name: name, state: 'new', terminate_immediately: options[:terminate_immediately])
  end
end
get_next_pending() click to toggle source

Get next pending task_queue

# File lib/queue_dispatcher/acts_as_task_queue.rb, line 84
# Get next pending task_queue.
#
# Walks the task_queues ordered by id (loaded with +lock(true)+ inside a
# transaction) and picks the first one that is neither in state 'error' nor
# 'heartbeat' and whose +pid_running?+ is false. The chosen task_queue gets
# its pid set to the current process id before the transaction ends.
#
# Returns the claimed task_queue, or nil when none qualifies.
def get_next_pending
  task_queue = nil

  transaction do
    # Stop at the first candidate: the cheap state checks run first, and the
    # process lookup behind pid_running? is skipped for every queue after the
    # match.
    task_queue = order(:id).lock(true).all.to_a.detect do |tq|
      tq.state != 'error' && tq.state != 'heartbeat' && !tq.pid_running?
    end

    # Update pid inside the atomic transaction so that the next call of this
    # method will not hand out the same queue a second time.
    task_queue.update_attribute :pid, Process.pid if task_queue
  end

  task_queue
end
pid_running?(pid) click to toggle source

Check if a certain PID is still running and is a ruby process

# File lib/queue_dispatcher/acts_as_task_queue.rb, line 56
# Check if a certain PID is still running and is a ruby process.
#
# pid - process id to look up; a nil/false pid is never looked up.
#
# Returns the result of comparing the process-table entry's +comm+ with
# 'ruby', or false when there is no pid or no entry for it.
def pid_running?(pid)
  return false unless pid

  process_entry = Sys::ProcTable.ps(pid)
  # Assume that the process is still running if the command reported for it
  # is 'ruby'.
  process_entry ? process_entry.comm == 'ruby' : false
end
qd_running?() click to toggle source

Check if QueueDispatcher is running.

# File lib/queue_dispatcher/acts_as_task_queue.rb, line 68
# Check if QueueDispatcher is running.
#
# Looks at the TaskQueue records in state 'heartbeat' and reports true as
# soon as one of them has an +updated_at+ newer than one minute ago. The
# cut-off time is computed once and the scan stops at the first fresh
# heartbeat.
#
# Returns true or false.
def qd_running?
  threshold = 1.minute.ago
  TaskQueue.where(state: 'heartbeat').any? { |tq| tq.updated_at > threshold }
end
reset_immediately!() click to toggle source

Kill all running TaskQueues immediately and destroy them.

# File lib/queue_dispatcher/acts_as_task_queue.rb, line 108
# Kill all running TaskQueues immediately and destroy them.
#
# For every task_queue: set its state to 'aborted', send SIGKILL to its pid
# when +pid_running?+ is true, set every task that is neither 'successful'
# nor 'finished' to 'aborted', release the lock for each task, and finally
# destroy the task_queue.
#
# Returns the value of +all.each+.
def reset_immediately!
  all.each do |tq|
    tq.update_attributes state: 'aborted'

    # Kill the TaskQueue with SIGKILL
    if tq.pid_running?
      begin
        Process.kill 'KILL', tq.pid
      rescue Errno::ESRCH
        # The process exited between the pid_running? check and the signal.
        # It is already gone, so carry on with the cleanup instead of
        # aborting the reset of all remaining task_queues.
      end
    end

    # Update task_state to aborted and release all its locks
    tq.tasks.each do |task|
      task.update_attributes state: 'aborted' unless %w[successful finished].include?(task.state)
      tq.send(:release_lock_for, task)
    end
    tq.destroy
  end
end