class Patriot::Worker::MultiNodeWorker
a worker implementation which can host multiple node on one process
Constants
- ANY
type of node
handle any jobs without label or same label
- ANY_EXCLUDE_TYPE_OWN
type of job
executed by any node
- ONLY_SPECIFIED_NODE
executed by only same labeled node
- OWN
handle only jobs with same label
- SUPPORTED_TYPES
supporte node types
- UNEXPECTED
unknown
Public Instance Methods
init_worker()
click to toggle source
@see Patriot::Worker::Base#init_worker
# File lib/patriot/worker/multi_node_worker.rb, line 29 def init_worker nodes = @config.get('nodes') raise "nodes are not configured" if nodes.nil? nodes = [nodes] unless nodes.is_a?(Array) @nodes = {} nodes.each do |n| node_config = get_node_config(@config, n) raise "node #{n} is not configured" if node_config.nil? @nodes[n] = {:queue => Queue.new }.merge(node_config) end end
run_worker()
click to toggle source
@see Patriot::Worker::Base#run_worker
# File lib/patriot/worker/multi_node_worker.rb, line 51 def run_worker @threads = [] # create node threads for job execution @nodes.each do |node,conf| 1.upto(conf[:threads]) do |i| @threads << create_thread(node, i, conf[:queue]) end end # start main thread for updating queues Thread.current[:name] = 'main' while(alive?) if @status == Patriot::Worker::Status::ACTIVE begin job_tickets = @job_store.get_job_tickets(@host, @nodes.keys, {:fetch_limit => @fetch_limit}) @logger.info "get #{job_tickets.size} jobs" update_queue(job_tickets) unless job_tickets.nil? rescue => e @logger.error e end end sleep @cycle end end
stop_worker()
click to toggle source
@see Patriot::Worker::Base#run_worker
# File lib/patriot/worker/multi_node_worker.rb, line 144 def stop_worker @status = Patriot::Worker::Status::SHUTDOWN @logger.info "terminating worker" @nodes.each do |node,conf| conf[:queue].clear 1.upto(conf[:threads]) {|i| conf[:queue].push(:TERM) } end @threads.each{|t| t.join} @logger.info "terminated" end
Private Instance Methods
alive?()
click to toggle source
check thread status
# File lib/patriot/worker/multi_node_worker.rb, line 104 def alive? @threads.each do |t| if t.status.nil? # thread stopped by some error @logger.error "#{t[:name]} : nil " @status = Patriot::Worker::Status::SHUTDOWN elsif t.status == false # stopped by signal @logger.debug "#{t[:name]} : false " @status = Patriot::Worker::Status::SHUTDOWN else @logger.debug "#{t[:name]} : #{t.status}" end end return @status != Patriot::Worker::Status::SHUTDOWN end
create_thread(node, thread_number, queue)
click to toggle source
# File lib/patriot/worker/multi_node_worker.rb, line 75 def create_thread(node, thread_number, queue) Thread.start(queue) do |q| Thread.current[:name] = "worker_#{node}_#{thread_number}" Thread.current[:node] = node begin while(@status != Patriot::Worker::Status::SHUTDOWN) job_ticket = q.pop if job_ticket == :TERM @logger.info "terminating" else @logger.debug "fetch job #{job_ticket.job_id}" Thread.current[Patriot::Worker::JOB_ID_IN_EXECUTION] = job_ticket.job_id execute_job(job_ticket) Thread.current[Patriot::Worker::JOB_ID_IN_EXECUTION] = nil end end rescue Exception => e @logger.error e raise e, "exception in worker thread" , $@ ensure @logger.info "terminated" interrupted_job = Thread.current[Patriot::Worker::JOB_ID_IN_EXECUTION] @logger.warn "job: #{interrupted_job} could be interrupted." unless interrupted_job.nil? end end end
get_node_config(config, n)
click to toggle source
# File lib/patriot/worker/multi_node_worker.rb, line 41 def get_node_config(config, n) type = config.get("node.#{n}.type") threads = config.get("node.#{n}.threads") raise "the number of threads for node #{n} is not set" if threads.nil? raise "unsupported node type: #{n} with #{type} " unless SUPPORTED_TYPES.include?(type) return {:type => type, :threads => threads.to_i} end
type_of_job(job_ticket)
click to toggle source
# File lib/patriot/worker/multi_node_worker.rb, line 136 def type_of_job(job_ticket) return ANY_EXCLUDE_TYPE_OWN if job_ticket.node.nil? return ONLY_SPECIFIED_NODE if @nodes.has_key?(job_ticket.node) return UNEXPECTED; end
update_queue(job_tickets)
click to toggle source
update job queues
# File lib/patriot/worker/multi_node_worker.rb, line 121 def update_queue(job_tickets) @nodes.each{|node,conf| conf[:queue].clear } job_tickets.each do |job_ticket| case type_of_job(job_ticket) when ANY_EXCLUDE_TYPE_OWN @nodes.each{|node,conf| conf[:queue].push(job_ticket) unless conf[:type]==OWN } when ONLY_SPECIFIED_NODE @nodes[job_ticket.node][:queue].push(job_ticket) else @logger.warn "receive unexpected job #{job_ticket.to_s}" end end end