module QueueDispatcher::ActsAsTaskQueue::InstanceMethods
Public Instance Methods
# File lib/queue_dispatcher/acts_as_task_queue.rb, line 127 def acts_as_task_queue_tasks self.send(acts_as_task_queue_config.task_class_name.pluralize) end
Are all tasks executed?
# File lib/queue_dispatcher/acts_as_task_queue.rb, line 235 def all_done? ! pending_tasks? || empty? end
Return true, if the task_queue is in state new and is not older 30 seconds
# File lib/queue_dispatcher/acts_as_task_queue.rb, line 213 def brand_new? state == 'new' && (Time.now - created_at) < 30.seconds end
Destroy the queue if it has no pending jobs
# File lib/queue_dispatcher/acts_as_task_queue.rb, line 266 def destroy_if_all_done! transaction do queue = TaskQueue.where(:id => self.id).lock(true).first queue.destroy if queue && queue.all_done? end end
Return true if there are no tasks in this taskqueue
# File lib/queue_dispatcher/acts_as_task_queue.rb, line 219 def empty? acts_as_task_queue_tasks.empty? end
Kill a task_queue
# File lib/queue_dispatcher/acts_as_task_queue.rb, line 260 def kill Process.kill('HUP', pid) if pid end
Return true, if the task_queue has pending jobs and is running but no job is running
# File lib/queue_dispatcher/acts_as_task_queue.rb, line 247 def pending? ts = task_states (ts == :new || ts == :pending || ts == :acquire_lock) && self.running? end
Are there any running or pending tasks in the queue?
# File lib/queue_dispatcher/acts_as_task_queue.rb, line 225 def pending_tasks? transaction do queue = TaskQueue.where(:id => self.id).lock(true).first states = determine_state_of_task_array queue.acts_as_task_queue_tasks.lock(true) states[:running] || states[:pending] || states[:acquire_lock] || states[:init_queue] end end
Return true, if the command of the process with pid ‘self.pid’ is ‘ruby’
# File lib/queue_dispatcher/acts_as_task_queue.rb, line 201 def pid_running? self.class.pid_running?(self.pid) end
Get the next ready to run task out of the queue. Consider the priority and the dependent tasks, which is defined in the association defined on top of this model.
# File lib/queue_dispatcher/acts_as_task_queue.rb, line 140 def pop(args = {}) task = nil log_debug = acts_as_task_queue_config.debug transaction do # Find next pending task, where all dependent tasks are executed all_tasks = acts_as_task_queue_tasks.lock(true).all pos = 0 while task.nil? && pos < all_tasks.to_a.count do t = all_tasks[pos] if t.dependent_tasks_executed? task = t if t.state == 'new' else log :msg => "Task #{t.id}: Waiting for dependent tasks #{t.dependent_tasks.map{|dt| dt.id}.join ','}...", :sev => :debug if log_debug end pos += 1 end # Remove task from current queue if task if args[:remove_task].nil? || args[:remove_task] task.update_attribute :task_queue_id, nil else task.update_attribute :state, 'new_popped' end end end task end
Put a new task into the queue
# File lib/queue_dispatcher/acts_as_task_queue.rb, line 133 def push(task) acts_as_task_queue_tasks << task end
Return true, if the task_queue is in state ‘reloading_config’
# File lib/queue_dispatcher/acts_as_task_queue.rb, line 254 def reloading_config? pid_running? && state == 'reloading_config' end
Remove finished tasks from queue
# File lib/queue_dispatcher/acts_as_task_queue.rb, line 275 def remove_finished_tasks! trasnaction do tasks.each{ |t| t.update_attribute(:task_queue_id, nil) if t.executed? } end end
Execute all tasks in the queue
# File lib/queue_dispatcher/acts_as_task_queue.rb, line 283 def run!(args = {}) task = nil @logger = args[:logger] || Logger.new("#{File.expand_path(Rails.root)}/log/task_queue.log") finish_state = 'aborted' task_queue = self print_log = args[:print_log] task_queue.update_attribute :state, 'running' # Set logger in engine @engine.logger = @logger if defined?(@engine) && @engine.methods.include?(:logger=) log :msg => "#{name}: Starting TaskQueue #{task_queue.id}...", :print_log => print_log # Init. Pop first task from queue, to show init_queue-state task = task_queue.pop(:remove_task => false) task.update_attribute :state, 'init_queue' if task init # Put task, which was used for showing the init_queue-state, back into the task_queue task.update_attributes :state => 'new', :task_queue_id => task_queue.id if task task_queue.reload # Ensure, that each task_queue is executed at least once, even if there are no tasks inside at the time it is started (this # can happen, if there are a lot of DB activities...) first_run = true # Loop as long as the task_queue exists with states 'running' and until the task_queue has pending tasks while task_queue && task_queue.state == 'running' && (task_queue.pending_tasks? || first_run) do first_run = false # Pop next task from queue task = task_queue.pop(:remove_task => (! acts_as_task_queue_config.leave_running_tasks_in_queue)) if task if task.new? # Start task.update_attributes :state => 'acquire_lock', :perc_finished => 0 get_lock_for task log :msg => "#{name}: Starting task #{task.id} (#{task.payload.class.name}.#{task.method_name})...", :print_log => print_log task.update_attributes :state => 'running' # Execute the method defined in task.method if task.payload.methods.map(&:to_sym).include?(task.method_name.to_sym) if task.dependent_tasks_had_errors error_msg = 'Dependent tasks had errors!' log :msg => error_msg, :sev => :warn, :print_log => print_log result = QueueDispatcher::RcAndMsg.bad_rc error_msg else payload = task.payload payload.logger = @logger if payload.methods.include?(:logger=) || payload.methods.include?('logger=') result = task.execute! end else error_msg = "unknown method '#{task.method_name}' for #{task.payload.class.name}!" log :msg => error_msg, :sev => :warn, :print_log => print_log result = QueueDispatcher::RcAndMsg.bad_rc error_msg end # Change task state according to the return code and remove it from the queue task.update_state_and_exec_callbacks(result, false, logger) cleanup_locks_after_error_for task task.update_attribute :task_queue_id, nil unless acts_as_task_queue_config.leave_finished_tasks_in_queue log :msg => "#{name}: Task #{task.id} (#{task.payload.class.name}.#{task.method_name}) finished with state '#{task.state}'.", :print_log => print_log # Wait between tasks sleep acts_as_task_queue_config.task_finish_wait_time end else # We couldn't fetch a task out of the queue but there should still exists some. Maybe some are waiting for dependent tasks. # Sleep some time before trying it again. sleep acts_as_task_queue_config.poll_time end # Interrupts handle_interrupts print_log: print_log # Reload task_queue to get all updates task_queue = TaskQueue.find_by_id task_queue.id # If all tasks are finished, a config reload will be executed at the end of this method. To avoid too much config reloads, # wait some time before continuing. Maybe, some more tasks will added to the queue?! wait_time = 0 unless task_queue.nil? || task_queue.terminate_immediately until task_queue.nil? || task_queue.pending_tasks? || wait_time >= acts_as_task_queue_config.idle_wait_time || task_queue.state != 'running' do sleep acts_as_task_queue_config.poll_time wait_time += acts_as_task_queue_config.poll_time task_queue = TaskQueue.find_by_id task_queue.id end end # Reset logger since this got lost by reloading the task_queue task_queue.logger = @logger if task_queue end # Reload config if last task was not a config reload config_reload_required = cleanup_before_auto_reload if config_reload_required task_queue.update_attributes :state => 'reloading_config' if task_queue reload_config task, print_log: print_log end # Loop has ended log :msg => "#{name}: TaskQueue has ended!", :print_log => print_log finish_state = 'stopped' rescue => exception # Error handler backtrace = exception.backtrace.join("\n ") log :msg => "Fatal error in method 'run!': #{$!}\n #{backtrace}", :sev => :error, :print_log => print_log puts "Fatal error in method 'run!': #{$!}\n#{backtrace}" task.update_state_and_exec_callbacks(QueueDispatcher::RcAndMsg.bad_rc("Fatal error: #{$!}"), false, logger) if task cleanup_locks_after_error_for task if task task.update_attributes state: 'error' if task && task.state != 'finished' ensure # Reload task and task_queue, to ensure the objects are up to date task_queue = TaskQueue.find_by_id task_queue.id if task_queue task = Task.find_by_id task.id if task # Delete task_queue task_queue.destroy_if_all_done! if task_queue # Update states of task and task_queue task.update_attributes :state => 'aborted' if task && task.state == 'running' task_queue.update_attributes :state => finish_state, :pid => nil if task_queue # Clean up deinit end
Return true, if the task_queue is still running
# File lib/queue_dispatcher/acts_as_task_queue.rb, line 207 def running? state == 'running' && pid_running? end
Returns the state of this task list (:stopped or :running)
# File lib/queue_dispatcher/acts_as_task_queue.rb, line 173 def task_states states = determine_state_of_task_array acts_as_task_queue_tasks if states[:empty] nil elsif states[:running] :running elsif states[:init_queue] :init_queue elsif states[:pending] :pending elsif states[:acquire_lock] :acquire_lock elsif states[:error] :error elsif states[:aborted] :aborted elsif states[:new] :new elsif states[:successful] :successful else :unknown end end
Return true, if the task_queue is working or has pending jobs
# File lib/queue_dispatcher/acts_as_task_queue.rb, line 241 def working? self.task_states == :running && self.running? end
Private Instance Methods
# File lib/queue_dispatcher/acts_as_task_queue.rb, line 420 def acts_as_task_queue_config self.class.acts_as_task_queue_config end
Here you can add clean up tasks which will be executed before the auto-reload at the end of the task-queue execution. This is handy if you want to remove the virtual-flag from objects for example. Return true, when a config reload is needed.
# File lib/queue_dispatcher/acts_as_task_queue.rb, line 476 def cleanup_before_auto_reload true end
Clean up locks after an error occured
# File lib/queue_dispatcher/acts_as_task_queue.rb, line 469 def cleanup_locks_after_error_for(task) release_lock_for task end
Deinitialize
# File lib/queue_dispatcher/acts_as_task_queue.rb, line 487 def deinit end
# File lib/queue_dispatcher/acts_as_task_queue.rb, line 425 def determine_state_of_task_array(task_array) successful = true new = true pending = false error = false aborted = false running = false acquire_lock = false init_queue = false task_array.each do |task| running = true if task.state == 'running' acquire_lock = true if task.state == 'acquire_lock' successful = false unless task.state == 'finished' || task.state == 'successful' new = false unless task.state == 'new' || task.state == 'new_popped' || task.state == 'build' pending = true if (task.state == 'new' || task.state == 'new_popped' || task.state == 'build' || task.state == 'pending') error = true if task.state == 'error' aborted = true if task.state == 'aborted' init_queue = true if task.state == 'init_queue' end {:running => running, :acquire_lock => acquire_lock, :successful => successful, :new => new, :pending => pending, :error => error, :aborted => aborted, :empty => task_array.empty?, :init_queu => init_queue} end
Get Lock
# File lib/queue_dispatcher/acts_as_task_queue.rb, line 459 def get_lock_for(task) end
Interrupt handler
# File lib/queue_dispatcher/acts_as_task_queue.rb, line 498 def handle_interrupts(args = {}) interrupts.each { |int| send int.to_sym } update_attributes :interrupts => [] rescue => exception backtrace = exception.backtrace.join("\n ") log :msg => "Fatal error in method 'handle_interrupts': #{$!}\n #{backtrace}", :sev => :error, :print_log => args[:print_log] end
Initialize
# File lib/queue_dispatcher/acts_as_task_queue.rb, line 482 def init end
Release Lock
# File lib/queue_dispatcher/acts_as_task_queue.rb, line 464 def release_lock_for(task) end
Reload config
# File lib/queue_dispatcher/acts_as_task_queue.rb, line 492 def reload_config(last_task, args = {}) #log :msg => "#{name}: Reloading config...", :print_log => args[:print_log] end