class LittleMonster::Core::Job::Orchrestator
Attributes
job[R]
logger[R]
Public Class Methods
new(job)
click to toggle source
# File lib/little_monster/core/job_orchrestator.rb, line 6 def initialize(job) @job = job @logger = @job.logger end
Public Instance Methods
abort_job(error)
click to toggle source
Methods that work both on tasks and callbacks
# File lib/little_monster/core/job_orchrestator.rb, line 138 def abort_job(error) logger.debug 'notifiying abort...' if @job.callback_running? logger.info "[type:finish_callback] [status:error] data: #{@job.data.to_h[:outputs]}" @job.notify_callback :error, exception: error # if callback is not on_error, raise exception to run on_error if @job.current_action != :on_error # set status on pending because we are sending the job back to the queue @job.status = :pending raise CallbackFailedError, '[type:callback_fail_error]' end else @job.notify_task :error, exception: error logger.info "[type:finish_task] [status:error] data: #{@job.data.to_h[:outputs]}" end @job.status = :error end
build_task(task_symbol)
click to toggle source
# File lib/little_monster/core/job_orchrestator.rb, line 114 def build_task(task_symbol) task = @job.task_class_for(task_symbol).new(@job.data) task.send(:set_default_values, data: @job.data, job_id: @job.id, job_logger: logger, cancelled_callback: @job.method(:is_cancelled?), retries: @job.retries, max_retries: @job.max_retries, retry_callback: @job.method(:retry?)) task end
cancel()
click to toggle source
# File lib/little_monster/core/job_orchrestator.rb, line 127 def cancel logger.debug 'notifiying cancel...' @job.notify_task :cancelled logger.info "[type:finish_task] [status:cancelled] data: #{@job.data.to_h[:outputs]}" @job.status = :cancelled end
do_retry(error)
click to toggle source
# File lib/little_monster/core/job_orchrestator.rb, line 173 def do_retry(error) if @job.retry? logger.debug "Retry ##{@job.retries} of #{@job.max_retries}" @job.retries += 1 logger.debug 'notifiying retry' if @job.callback_running? @job.notify_callback :pending, retries: @job.retries, exception: error logger.info '[type:callback_retry]' else @job.notify_task :pending, retries: @job.retries, exception: error logger.info '[type:task_retry]' end @job.status = :pending logger.info "[type:job_retry] data: #{@job.data.to_h[:outputs]}" raise JobRetryError, "doing retry #{@job.retries} of #{@job.max_retries}" else logger.debug 'job has reached max retries' if @job.callback_running? logger.info '[type:callback_max_retries]' else logger.info '[type:task_max_retries]' end logger.info "[type:job_max_retries] [retries:#{@job.max_retries}]" abort_job(error) end end
handle_error(error)
click to toggle source
# File lib/little_monster/core/job_orchrestator.rb, line 159 def handle_error(error) raise error if LittleMonster.env.development? logger.error "[type:error] [error_type:#{error.class}][message:#{error.message.dump}] \n #{error.backtrace.to_a.join("\n\t")}" @job.error = @job.serialize_error error if error.is_a?(FatalTaskError) || error.is_a?(NameError) logger.debug 'error is fatal, aborting run' return abort_job(error) end do_retry(error) end
run()
click to toggle source
# File lib/little_monster/core/job_orchrestator.rb, line 11 def run # notifies status as running and then restores old_status if it is an ending status last_status = @job.status @job.status = :running @job.notify_status if Job::ENDED_STATUS.include? last_status @job.status = last_status else run_tasks logger.default_tags.delete(:current_task) # reset retries so retries don't mix between tasks and callbacks @job.retries = 0 end run_callback logger.info "[type:job_finish] [status:#{@job.status}] data: #{@job.data.to_h[:outputs]}" ensure options = {} options[:data] = @job.data if @job.ended_status? @job.notify_status options end
run_callback()
click to toggle source
# File lib/little_monster/core/job_orchrestator.rb, line 84 def run_callback @job.current_action = @job.callback_to_run return if @job.current_action.nil? logger.default_tags[:callback] = @job.current_action @job.notify_callback :running logger.info "[type:start_callback] data: #{@job.data.to_h[:outputs]}" begin logger.default_tags[:type] = 'callback_log' @job.public_send(@job.current_action) ensure logger.default_tags.delete(:type) end logger.info "[type:finish_callback] [status:success] data: #{@job.data.to_h[:outputs]}" @job.notify_callback :success @job.current_action = nil @job.retries = 0 logger.default_tags.delete(:callback) rescue APIUnreachableError => e logger.error "[type:api_unreachable] [message:#{e.message.dump}]" raise e rescue StandardError => e logger.debug "[type:standard_error] an error was catched with [message:#{e.message.dump}]" handle_error e end
run_tasks()
click to toggle source
# File lib/little_monster/core/job_orchrestator.rb, line 35 def run_tasks @job.tasks_to_run.each do |task_name| @job.current_action = task_name @job.notify_task :running logger.default_tags[:current_task] = @job.current_action logger.info "[type:start_task] data: #{@job.data.to_h[:outputs]}" begin raise LittleMonster::CancelError if @job.is_cancelled? task = build_task(task_name) task.run # data is sent only on task success @job.notify_task :success, data: @job.data logger.info "[type:finish_task] [status:success] data: #{@job.data.to_h[:outputs]}" if @job.mock? @job.runned_tasks[task_name] = {} @job.runned_tasks[task_name][:instance] = task @job.runned_tasks[task_name][:data] = @job.data.to_h[:outputs].to_h.dup end rescue APIUnreachableError => e logger.error "[type:api_unreachable] [message:#{e.message.dump}]" raise e rescue CancelError => e logger.info '[type:cancel] job was cancelled' cancel return rescue StandardError => e logger.debug "[type:standard_error] an error was catched with [message:#{e.message.dump}]" begin task.error e unless e.is_a? NameError rescue StandardError => task_error ensure handle_error task_error || e return end end @job.retries = 0 # Hago esto para que despues de succesful un task resete retries end @job.current_action = nil @job.status = :success end