module QueueDispatcher::ActsAsTaskQueue::InstanceMethods

Public Instance Methods

acts_as_task_queue_tasks() click to toggle source
# File lib/queue_dispatcher/acts_as_task_queue.rb, line 127
def acts_as_task_queue_tasks
  self.send(acts_as_task_queue_config.task_class_name.pluralize)
end
all_done?() click to toggle source

Are all tasks executed?

# File lib/queue_dispatcher/acts_as_task_queue.rb, line 235
def all_done?
  ! pending_tasks? || empty?
end
brand_new?() click to toggle source

Return true, if the task_queue is in state new and is not older 30 seconds

# File lib/queue_dispatcher/acts_as_task_queue.rb, line 213
def brand_new?
  state == 'new' && (Time.now - created_at) < 30.seconds
end
destroy_if_all_done!() click to toggle source

Destroy the queue if it has no pending jobs

# File lib/queue_dispatcher/acts_as_task_queue.rb, line 266
def destroy_if_all_done!
  transaction do
    queue = TaskQueue.where(:id => self.id).lock(true).first
    queue.destroy if queue && queue.all_done?
  end
end
empty?() click to toggle source

Return true if there are no tasks in this taskqueue

# File lib/queue_dispatcher/acts_as_task_queue.rb, line 219
def empty?
  acts_as_task_queue_tasks.empty?
end
kill() click to toggle source

Kill a task_queue

# File lib/queue_dispatcher/acts_as_task_queue.rb, line 260
def kill
  Process.kill('HUP', pid) if pid
end
pending?() click to toggle source

Return true, if the task_queue has pending jobs and is running but no job is running

# File lib/queue_dispatcher/acts_as_task_queue.rb, line 247
def pending?
  ts = task_states
  (ts == :new || ts == :pending || ts == :acquire_lock) && self.running?
end
pending_tasks?() click to toggle source

Are there any running or pending tasks in the queue?

# File lib/queue_dispatcher/acts_as_task_queue.rb, line 225
def pending_tasks?
  transaction do
    queue = TaskQueue.where(:id => self.id).lock(true).first
    states = determine_state_of_task_array queue.acts_as_task_queue_tasks.lock(true)
    states[:running] || states[:pending] || states[:acquire_lock] || states[:init_queue]
  end
end
pid_running?() click to toggle source

Return true, if the command of the process with pid ‘self.pid’ is ‘ruby’

# File lib/queue_dispatcher/acts_as_task_queue.rb, line 201
def pid_running?
  self.class.pid_running?(self.pid)
end
pop(args = {}) click to toggle source

Get the next ready to run task out of the queue. Consider the priority and the dependent tasks, which is defined in the association defined on top of this model.

# File lib/queue_dispatcher/acts_as_task_queue.rb, line 140
def pop(args = {})
  task      = nil
  log_debug = acts_as_task_queue_config.debug

  transaction do
    # Find next pending task, where all dependent tasks are executed
    all_tasks = acts_as_task_queue_tasks.lock(true).all
    pos       = 0
    while task.nil? && pos < all_tasks.to_a.count do
      t = all_tasks[pos]
      if t.dependent_tasks_executed?
        task = t if t.state == 'new'
      else
        log :msg => "Task #{t.id}: Waiting for dependent tasks #{t.dependent_tasks.map{|dt| dt.id}.join ','}...", :sev => :debug if log_debug
      end
      pos += 1
    end

    # Remove task from current queue
    if task
      if args[:remove_task].nil? || args[:remove_task]
        task.update_attribute :task_queue_id, nil
      else
        task.update_attribute :state, 'new_popped'
      end
    end
  end

  task
end
push(task) click to toggle source

Put a new task into the queue

# File lib/queue_dispatcher/acts_as_task_queue.rb, line 133
def push(task)
  acts_as_task_queue_tasks << task
end
reloading_config?() click to toggle source

Return true, if the task_queue is in state ‘reloading_config’

# File lib/queue_dispatcher/acts_as_task_queue.rb, line 254
def reloading_config?
  pid_running? && state == 'reloading_config'
end
remove_finished_tasks!() click to toggle source

Remove finished tasks from queue

# File lib/queue_dispatcher/acts_as_task_queue.rb, line 275
def remove_finished_tasks!
  trasnaction do
    tasks.each{ |t| t.update_attribute(:task_queue_id, nil) if t.executed? }
  end
end
run!(args = {}) click to toggle source

Execute all tasks in the queue

# File lib/queue_dispatcher/acts_as_task_queue.rb, line 283
def run!(args = {})
  task          = nil
  @logger       = args[:logger] || Logger.new("#{File.expand_path(Rails.root)}/log/task_queue.log")
  finish_state  = 'aborted'
  task_queue    = self
  print_log     = args[:print_log]

  task_queue.update_attribute :state, 'running'

  # Set logger in engine
  @engine.logger = @logger if defined?(@engine) && @engine.methods.include?(:logger=)
  log :msg => "#{name}: Starting TaskQueue #{task_queue.id}...", :print_log => print_log

  # Init. Pop first task from queue, to show init_queue-state
  task = task_queue.pop(:remove_task => false)
  task.update_attribute :state, 'init_queue' if task
  init

  # Put task, which was used for showing the init_queue-state, back into the task_queue
  task.update_attributes :state => 'new', :task_queue_id => task_queue.id if task
  task_queue.reload

  # Ensure, that each task_queue is executed at least once, even if there are no tasks inside at the time it is started (this
  # can happen, if there are a lot of DB activities...)
  first_run = true
  # Loop as long as the task_queue exists with states 'running' and until the task_queue has pending tasks
  while task_queue && task_queue.state == 'running' && (task_queue.pending_tasks? || first_run) do
    first_run = false

    # Pop next task from queue
    task = task_queue.pop(:remove_task => (! acts_as_task_queue_config.leave_running_tasks_in_queue))

    if task
      if task.new?
        # Start
        task.update_attributes :state => 'acquire_lock', :perc_finished => 0
        get_lock_for task
        log :msg => "#{name}: Starting task #{task.id} (#{task.payload.class.name}.#{task.method_name})...", :print_log => print_log
        task.update_attributes :state => 'running'

        # Execute the method defined in task.method
        if task.payload.methods.map(&:to_sym).include?(task.method_name.to_sym)
          if task.dependent_tasks_had_errors
            error_msg = 'Dependent tasks had errors!'
            log :msg => error_msg,
                :sev => :warn, 
                :print_log => print_log
            result = QueueDispatcher::RcAndMsg.bad_rc error_msg
          else
            payload = task.payload
            payload.logger = @logger if payload.methods.include?(:logger=) || payload.methods.include?('logger=')
            result = task.execute!
          end
        else
          error_msg = "unknown method '#{task.method_name}' for #{task.payload.class.name}!"
          log :msg => error_msg,
              :sev => :warn,
              :print_log => print_log
          result = QueueDispatcher::RcAndMsg.bad_rc error_msg
        end

        # Change task state according to the return code and remove it from the queue
        task.update_state_and_exec_callbacks(result, false, logger)
        cleanup_locks_after_error_for task
        task.update_attribute :task_queue_id, nil unless acts_as_task_queue_config.leave_finished_tasks_in_queue
        log :msg => "#{name}: Task #{task.id} (#{task.payload.class.name}.#{task.method_name}) finished with state '#{task.state}'.", :print_log => print_log

        # Wait between tasks
        sleep acts_as_task_queue_config.task_finish_wait_time
      end
    else
      # We couldn't fetch a task out of the queue but there should still exists some. Maybe some are waiting for dependent tasks.
      # Sleep some time before trying it again.
      sleep acts_as_task_queue_config.poll_time
    end

    # Interrupts
    handle_interrupts print_log: print_log

    # Reload task_queue to get all updates
    task_queue = TaskQueue.find_by_id task_queue.id

    # If all tasks are finished, a config reload will be executed at the end of this method. To avoid too much config reloads,
    # wait some time before continuing. Maybe, some more tasks will added to the queue?!
    wait_time = 0
    unless task_queue.nil? || task_queue.terminate_immediately
      until task_queue.nil? || task_queue.pending_tasks? || wait_time >= acts_as_task_queue_config.idle_wait_time || task_queue.state != 'running' do
        sleep acts_as_task_queue_config.poll_time
        wait_time += acts_as_task_queue_config.poll_time
        task_queue = TaskQueue.find_by_id task_queue.id
      end
    end

    # Reset logger since this got lost by reloading the task_queue
    task_queue.logger = @logger if task_queue
  end

  # Reload config if last task was not a config reload
  config_reload_required = cleanup_before_auto_reload
  if config_reload_required
    task_queue.update_attributes :state => 'reloading_config' if task_queue
    reload_config task, print_log: print_log
  end

  # Loop has ended
  log :msg => "#{name}: TaskQueue has ended!", :print_log => print_log
  finish_state = 'stopped'
rescue => exception
  # Error handler
  backtrace = exception.backtrace.join("\n  ")
  log :msg => "Fatal error in method 'run!': #{$!}\n  #{backtrace}", :sev => :error, :print_log => print_log
  puts "Fatal error in method 'run!': #{$!}\n#{backtrace}"
  task.update_state_and_exec_callbacks(QueueDispatcher::RcAndMsg.bad_rc("Fatal error: #{$!}"), false, logger) if task
  cleanup_locks_after_error_for task if task
  task.update_attributes state: 'error' if task && task.state != 'finished'
ensure
  # Reload task and task_queue, to ensure the objects are up to date
  task_queue = TaskQueue.find_by_id task_queue.id if task_queue
  task       = Task.find_by_id task.id if task

  # Delete task_queue
  task_queue.destroy_if_all_done! if task_queue

  # Update states of task and task_queue
  task.update_attributes :state => 'aborted' if task && task.state == 'running'
  task_queue.update_attributes :state => finish_state, :pid   => nil if task_queue

  # Clean up
  deinit
end
running?() click to toggle source

Return true, if the task_queue is still running

# File lib/queue_dispatcher/acts_as_task_queue.rb, line 207
def running?
  state == 'running' && pid_running?
end
task_states() click to toggle source

Returns the state of this task list (:stopped or :running)

# File lib/queue_dispatcher/acts_as_task_queue.rb, line 173
def task_states
  states = determine_state_of_task_array acts_as_task_queue_tasks

  if states[:empty]
    nil
  elsif states[:running]
    :running
  elsif states[:init_queue]
    :init_queue
  elsif states[:pending]
    :pending
  elsif states[:acquire_lock]
    :acquire_lock
  elsif states[:error]
    :error
  elsif states[:aborted]
    :aborted
  elsif states[:new]
    :new
  elsif states[:successful]
    :successful
  else
    :unknown
  end
end
working?() click to toggle source

Return true, if the task_queue is working or has pending jobs

# File lib/queue_dispatcher/acts_as_task_queue.rb, line 241
def working?
  self.task_states == :running && self.running?
end

Private Instance Methods

acts_as_task_queue_config() click to toggle source
# File lib/queue_dispatcher/acts_as_task_queue.rb, line 420
def acts_as_task_queue_config
  self.class.acts_as_task_queue_config
end
cleanup_before_auto_reload() click to toggle source

Here you can add clean up tasks which will be executed before the auto-reload at the end of the task-queue execution. This is handy if you want to remove the virtual-flag from objects for example. Return true, when a config reload is needed.

# File lib/queue_dispatcher/acts_as_task_queue.rb, line 476
def cleanup_before_auto_reload
  true
end
cleanup_locks_after_error_for(task) click to toggle source

Clean up locks after an error occured

# File lib/queue_dispatcher/acts_as_task_queue.rb, line 469
def cleanup_locks_after_error_for(task)
  release_lock_for task
end
deinit() click to toggle source

Deinitialize

# File lib/queue_dispatcher/acts_as_task_queue.rb, line 487
def deinit
end
determine_state_of_task_array(task_array) click to toggle source
# File lib/queue_dispatcher/acts_as_task_queue.rb, line 425
def determine_state_of_task_array(task_array)
  successful = true
  new = true
  pending = false
  error = false
  aborted = false
  running = false
  acquire_lock = false
  init_queue = false

  task_array.each do |task|
    running = true if task.state == 'running'
    acquire_lock = true if task.state == 'acquire_lock'
    successful = false unless task.state == 'finished' || task.state == 'successful'
    new = false unless task.state == 'new' || task.state == 'new_popped' || task.state == 'build'
    pending = true if (task.state == 'new' || task.state == 'new_popped' || task.state == 'build' || task.state == 'pending')
    error = true if task.state == 'error'
    aborted = true if task.state == 'aborted'
    init_queue = true if task.state == 'init_queue'
  end

  {:running      => running,
   :acquire_lock => acquire_lock,
   :successful   => successful,
   :new          => new,
   :pending      => pending,
   :error        => error,
   :aborted      => aborted,
   :empty        => task_array.empty?,
   :init_queu    => init_queue}
end
get_lock_for(task) click to toggle source

Get Lock

# File lib/queue_dispatcher/acts_as_task_queue.rb, line 459
def get_lock_for(task)
end
handle_interrupts(args = {}) click to toggle source

Interrupt handler

# File lib/queue_dispatcher/acts_as_task_queue.rb, line 498
def handle_interrupts(args = {})
  interrupts.each { |int| send int.to_sym }
  update_attributes :interrupts => []
rescue => exception
  backtrace = exception.backtrace.join("\n  ")
  log :msg => "Fatal error in method 'handle_interrupts': #{$!}\n  #{backtrace}", :sev => :error, :print_log => args[:print_log]
end
init() click to toggle source

Initialize

# File lib/queue_dispatcher/acts_as_task_queue.rb, line 482
def init
end
release_lock_for(task) click to toggle source

Release Lock

# File lib/queue_dispatcher/acts_as_task_queue.rb, line 464
def release_lock_for(task)
end
reload_config(last_task, args = {}) click to toggle source

Reload config

# File lib/queue_dispatcher/acts_as_task_queue.rb, line 492
def reload_config(last_task, args = {})
  #log :msg => "#{name}: Reloading config...", :print_log => args[:print_log]
end