class Patriot::Worker::Base

@abstract base class for worker implementations

Attributes

config[RW]
cycle[RW]
host[RW]
job_store[RW]
started_at[R]
status[RW]

Public Class Methods

new(config) click to toggle source

@param config [Patriot::Util::Config::Base]

# File lib/patriot/worker/base.rb, line 40
def initialize(config)
  raise "configuration is nil" if config.nil?
  @logger      = create_logger(config)
  @config      = config
  @job_store   = create_jobstore(Patriot::JobStore::ROOT_STORE_ID, @config)
  @host        = `hostname`.chomp
  @cycle       = config.get('fetch_cycle', Patriot::Worker::DEFAULT_FETCH_CYCLE).to_i
  @fetch_limit = config.get('fetch_limit', Patriot::Worker::DEFAULT_FETCH_LIMIT).to_i
  @worker_name = config.get('worker_name', Patriot::Worker::DEFAULT_WORKER_NAME)
  @info_server = Patriot::Worker::InfoServer.new(self,@config)
end

Public Instance Methods

execute_job(job_ticket) click to toggle source

execute a job @param [Patriot::JobStore::JobTicket] job_ticket a ticket of job to be executed @return [Patriot::Command::ExitCode]

# File lib/patriot/worker/base.rb, line 55
def execute_job(job_ticket)
  job_ticket.exec_host   = @host
  job_ticket.exec_node   = Thread.current[:name]
  begin
    response = @job_store.offer_to_execute(job_ticket)
  rescue Exception => e
    @logger.error e
    return Patriot::Command::ExitCode::FAILED
  end

  # already executed by other node
  return Patriot::Command::ExitCode::SKIPPED if response.nil?

  @logger.info " executing job: #{job_ticket.job_id}"
  command                 = response[:command]
  job_ticket.execution_id = response[:execution_id]
  job_ticket.exit_code    = Patriot::Command::ExitCode::FAILED
  begin
    command.execute
    job_ticket.exit_code  = Patriot::Command::ExitCode::SUCCEEDED
  rescue Exception => e
    @logger.warn " job : #{job_ticket.job_id} failed"
    @logger.warn e
    job_ticket.description = e.to_s
  else
    job_ticket.description = command.description
  ensure
    begin
      execute_with_retry{ @job_store.report_completion_status(job_ticket) }
    rescue Exception => job_store_error
      @logger.error job_store_error
    end
    unless command.post_processors.nil?
      continue_post_processing = true
      command.post_processors.each do |pp|
        begin
          if continue_post_processing
            @logger.info "executing post process by #{pp}"
            continue_post_processing = continue_post_processing && pp.process(command, self, job_ticket)
          else
            @logger.info "skipping post process by #{pp}"
          end
        rescue Exception => post_process_error
          @logger.error "post process by #{pp} failed"
          @logger.error post_process_error
        end
      end
    end
  end
  return job_ticket.exit_code
end
get_pid() click to toggle source

@return [Integer] pid if the worker is running, otherwise nil

# File lib/patriot/worker/base.rb, line 108
def get_pid
  return Patriot::Worker.get_pid(@config)
end
init_worker() click to toggle source

should be overrided in sub class This method is for implementation-specific configuration

# File lib/patriot/worker/base.rb, line 147
def init_worker
  raise NotImplementedError
end
request_shutdown() click to toggle source

send a request graceful shutdown to a running worker @return [Boolean] true worker is running and request is sent, otherwise false

# File lib/patriot/worker/base.rb, line 114
def request_shutdown
  pid = get_pid
  if pid.nil?
    @logger.info("worker #{@worker_name} does not exist")
    return false
  end
  Process.kill(SIGNAL_FOR_GRACEFUL_SHUTDOWN[0], pid.to_i)
  return true
end
run_worker() click to toggle source

should be overrided in sub class Main loop in which the worker fetches and executes jobs should be implemented here

# File lib/patriot/worker/base.rb, line 153
def run_worker
  raise NotImplementedError
end
start_worker() click to toggle source

main entry point of worker processing

# File lib/patriot/worker/base.rb, line 125
def start_worker
  return unless get_pid.nil?
  @logger.info "starting worker #{@node}@#{@host}"
  pid_file = Patriot::Worker.get_pid_file(@config)
  File.open(pid_file, 'w') {|f| f.write($$)} # save pid for shutdown
  set_traps
  @info_server.start_server
  @started_at = Time.now
  @logger.info "initiating worker #{@node}@#{@host}"
  init_worker
  @status = Patriot::Worker::Status::ACTIVE
  @logger.info "start worker #{@node}@#{@host}"
  run_worker
  @logger.info "shutting down worker #{@node}@#{@host}"
  stop_worker
  # should be last since worker_admin judge availability from the info_server
  @info_server.shutdown_server
end
stop_worker() click to toggle source

should be overrided in sub class Tasks for tearing down the worker should be implemented here

# File lib/patriot/worker/base.rb, line 159
def stop_worker
  raise NotImplementedError
end

Private Instance Methods

set_traps() click to toggle source
# File lib/patriot/worker/base.rb, line 163
def set_traps
  Patriot::Worker::SIGNAL_FOR_GRACEFUL_SHUTDOWN.each do |s|
    Signal.trap(s) do
      @status = Patriot::Worker::Status::SHUTDOWN
    end
  end
  Patriot::Worker::SIGNAL_FOR_THREAD_DUMP.each do |s|
    # TODO may not work on Ruby 2.x
    Signal.trap(s) do
      # TODO output to separated stream
      Thread.list.each do |t|
        @logger.info("Thread #{t[:name]}\n#{t.backtrace.map{|bt| "\t#{bt}"}.join("\n")}")
      end
    end
  end
end