class Patriot::Worker::Base
@abstract base class for worker implementations
Attributes
config[RW]
cycle[RW]
host[RW]
job_store[RW]
started_at[R]
status[RW]
Public Class Methods
new(config)
click to toggle source
@param config [Patriot::Util::Config::Base]
# File lib/patriot/worker/base.rb, line 40 def initialize(config) raise "configuration is nil" if config.nil? @logger = create_logger(config) @config = config @job_store = create_jobstore(Patriot::JobStore::ROOT_STORE_ID, @config) @host = `hostname`.chomp @cycle = config.get('fetch_cycle', Patriot::Worker::DEFAULT_FETCH_CYCLE).to_i @fetch_limit = config.get('fetch_limit', Patriot::Worker::DEFAULT_FETCH_LIMIT).to_i @worker_name = config.get('worker_name', Patriot::Worker::DEFAULT_WORKER_NAME) @info_server = Patriot::Worker::InfoServer.new(self,@config) end
Public Instance Methods
execute_job(job_ticket)
click to toggle source
execute a job @param [Patriot::JobStore::JobTicket] job_ticket a ticket of job to be executed @return [Patriot::Command::ExitCode]
# File lib/patriot/worker/base.rb, line 55 def execute_job(job_ticket) job_ticket.exec_host = @host job_ticket.exec_node = Thread.current[:name] begin response = @job_store.offer_to_execute(job_ticket) rescue Exception => e @logger.error e return Patriot::Command::ExitCode::FAILED end # already executed by other node return Patriot::Command::ExitCode::SKIPPED if response.nil? @logger.info " executing job: #{job_ticket.job_id}" command = response[:command] job_ticket.execution_id = response[:execution_id] job_ticket.exit_code = Patriot::Command::ExitCode::FAILED begin command.execute job_ticket.exit_code = Patriot::Command::ExitCode::SUCCEEDED rescue Exception => e @logger.warn " job : #{job_ticket.job_id} failed" @logger.warn e job_ticket.description = e.to_s else job_ticket.description = command.description ensure begin execute_with_retry{ @job_store.report_completion_status(job_ticket) } rescue Exception => job_store_error @logger.error job_store_error end unless command.post_processors.nil? continue_post_processing = true command.post_processors.each do |pp| begin if continue_post_processing @logger.info "executing post process by #{pp}" continue_post_processing = continue_post_processing && pp.process(command, self, job_ticket) else @logger.info "skipping post process by #{pp}" end rescue Exception => post_process_error @logger.error "post process by #{pp} failed" @logger.error post_process_error end end end end return job_ticket.exit_code end
get_pid()
click to toggle source
@return [Integer] pid if the worker is running, otherwise nil
# File lib/patriot/worker/base.rb, line 108 def get_pid return Patriot::Worker.get_pid(@config) end
init_worker()
click to toggle source
should be overrided in sub class This method is for implementation-specific configuration
# File lib/patriot/worker/base.rb, line 147 def init_worker raise NotImplementedError end
request_shutdown()
click to toggle source
send a request graceful shutdown to a running worker @return [Boolean] true worker is running and request is sent, otherwise false
# File lib/patriot/worker/base.rb, line 114 def request_shutdown pid = get_pid if pid.nil? @logger.info("worker #{@worker_name} does not exist") return false end Process.kill(SIGNAL_FOR_GRACEFUL_SHUTDOWN[0], pid.to_i) return true end
run_worker()
click to toggle source
should be overrided in sub class Main loop in which the worker fetches and executes jobs should be implemented here
# File lib/patriot/worker/base.rb, line 153 def run_worker raise NotImplementedError end
start_worker()
click to toggle source
main entry point of worker processing
# File lib/patriot/worker/base.rb, line 125 def start_worker return unless get_pid.nil? @logger.info "starting worker #{@node}@#{@host}" pid_file = Patriot::Worker.get_pid_file(@config) File.open(pid_file, 'w') {|f| f.write($$)} # save pid for shutdown set_traps @info_server.start_server @started_at = Time.now @logger.info "initiating worker #{@node}@#{@host}" init_worker @status = Patriot::Worker::Status::ACTIVE @logger.info "start worker #{@node}@#{@host}" run_worker @logger.info "shutting down worker #{@node}@#{@host}" stop_worker # should be last since worker_admin judge availability from the info_server @info_server.shutdown_server end
stop_worker()
click to toggle source
should be overrided in sub class Tasks for tearing down the worker should be implemented here
# File lib/patriot/worker/base.rb, line 159 def stop_worker raise NotImplementedError end
Private Instance Methods
set_traps()
click to toggle source
# File lib/patriot/worker/base.rb, line 163 def set_traps Patriot::Worker::SIGNAL_FOR_GRACEFUL_SHUTDOWN.each do |s| Signal.trap(s) do @status = Patriot::Worker::Status::SHUTDOWN end end Patriot::Worker::SIGNAL_FOR_THREAD_DUMP.each do |s| # TODO may not work on Ruby 2.x Signal.trap(s) do # TODO output to separated stream Thread.list.each do |t| @logger.info("Thread #{t[:name]}\n#{t.backtrace.map{|bt| "\t#{bt}"}.join("\n")}") end end end end