class Patriot::JobStore::InMemoryStore
a JobStore
implementation on memory
Public Class Methods
@see Patriot::JobStore::Base#initialize
# File lib/patriot/job_store/in_memory_store.rb, line 11 def initialize(store_id, config) @config = config @logger = create_logger(config) @mutex = Mutex.new @jobs = {} # hash from job_id to job content in hash # hash from state to list of job_id @job_states = {Patriot::JobStore::JobState::INIT => [], Patriot::JobStore::JobState::SUCCEEDED => [Patriot::JobStore::INITIATOR_JOB_ID], Patriot::JobStore::JobState::WAIT => [], Patriot::JobStore::JobState::RUNNING => [], Patriot::JobStore::JobState::SUSPEND => [], Patriot::JobStore::JobState::FAILED => [], Patriot::JobStore::JobState::DISCARDED => []} @producers = {} # hash from job_id to produces products @consumers = {} # hash from job_id to referece products @job_history = {} # hash from job_id to a array of its execution hisotry end
Public Instance Methods
@see Patriot::JobStore::Base#acceptable?
# File lib/patriot/job_store/in_memory_store.rb, line 60 def acceptable?(job) raise "invalid class #{job.class}" unless job.is_a?(Patriot::JobStore::Job) return true end
@see Patriot::JobStore::Base#delete_job
# File lib/patriot/job_store/in_memory_store.rb, line 302 def delete_job(job_id) job_id = job_id.to_sym @mutex.synchronize do @job_states.each{|s,js| js.delete(job_id)} @jobs.delete(job_id) @producers.delete(job_id) @consumers.delete(job_id) end end
@see Patriot::JobStore::Base#find_jobs_by_state
# File lib/patriot/job_store/in_memory_store.rb, line 190 def find_jobs_by_state(state, opts = {}) all_records = @job_states[state] - [Patriot::JobStore::INITIATOR_JOB_ID] size = all_records.size opts = {:limit => size, :offset => 0}.merge(opts) filter = opts.has_key?(:filter_exp) ? Regexp.new(opts[:filter_exp].gsub(/(?<!\\)%/,'.*').gsub(/(?<!\\)_/,'.')) : nil result = [] opts[:offset].upto(size).each do |i| break if i >= size break if result.size >= opts[:limit] job_id = all_records[size - 1 - i].to_s next if !filter.nil? && !filter.match(job_id) result << job_id end return result end
@see Patriot::JobStore::Base#get_consumers
# File lib/patriot/job_store/in_memory_store.rb, line 173 def get_consumers(products, opts = {:include_attrs => [Patriot::Command::STATE_ATTR]}) opts = {:include_attrs => []}.merge(opts) products = [products] unless products.is_a?(Array) consumers = [] products.each{|product| @consumers.map{|pid, prods| if prods.include?(product) job = @jobs[pid].filter_attributes(opts[:include_attrs]) job[:job_id] = pid.to_s consumers.push(job) end } } return consumers.uniq end
@see Patriot::JobStore::Base#get_execution_history
# File lib/patriot/job_store/in_memory_store.rb, line 207 def get_execution_history(job_id, opts = {}) opts = {:limit => 1, :order => :DESC} return @job_history[job_id.to_sym] || [] end
get nodes and edges information to render graph @param [String] job_id JOB ID @param [Hash] opts options @return [Array] [nodes, edges]
# File lib/patriot/job_store/in_memory_store.rb, line 216 def get_graph(job_id, opts = {}) job = get(job_id) history = get_execution_history(job_id, {})[0] hashed_job = { :job_id => job.job_id, :history => history, :depth => 0 }.merge(job.attributes) # set self node nodes = {job_id => hashed_job} edges = [] _set_dependency( :producers, opts[:producer_depth], nodes, edges, hashed_job ) _set_dependency( :consumers, opts[:consumer_depth], nodes, edges, hashed_job ) return {:nodes => nodes, :edges => edges} end
@see Patriot::JobStore::Base#get_job
# File lib/patriot/job_store/in_memory_store.rb, line 149 def get_job(job_id) return nil if job_id.nil? raise "string is expected but job_id is a #{job_id.class}" unless job_id.is_a?(String) return @jobs[job_id.to_sym] end
@see Patriot::JobStore::Base#get_job_size
# File lib/patriot/job_store/in_memory_store.rb, line 290 def get_job_size(opts = {}) opts = {:ignore_states => []}.merge(opts) sizes = {} @job_states.each do |s,js| next if opts[:ignore_states].include?(s) sizes[s] = js.size sizes[s] = sizes[s] -1 if s == Patriot::JobStore::JobState::SUCCEEDED end return sizes end
@see Patriot::JobStore::Base#get_job_tickets
# File lib/patriot/job_store/in_memory_store.rb, line 66 def get_job_tickets(host, nodes, options = {}) nodes = [nodes] unless nodes.is_a?(Array) @mutex.synchronize do return @job_states[Patriot::JobStore::JobState::WAIT].map do |wid| job = @jobs[wid] # check host and node next unless job[Patriot::Command::EXEC_NODE_ATTR].nil? || nodes.include?(job[Patriot::Command::EXEC_NODE_ATTR]) next unless job[Patriot::Command::EXEC_HOST_ATTR].nil? || host == job[Patriot::Command::EXEC_HOST_ATTR] next unless job[Patriot::Command::START_DATETIME_ATTR].nil? || Time.now > job[Patriot::Command::START_DATETIME_ATTR] # check dependency reference = @consumers[wid] || [] producers = @producers.map{|pid, prods| pid unless (prods & reference).empty?}.compact next if !reference.empty? && producers.empty? # no producer exists next if producers.any?{|pjid| !@job_states[Patriot::JobStore::JobState::SUCCEEDED].include?(pjid)} JobTicket.new(wid.to_s, job.update_id, job[Patriot::Command::EXEC_NODE_ATTR]) end.compact end end
@see Patriot::JobStore::Base#get_producers
# File lib/patriot/job_store/in_memory_store.rb, line 156 def get_producers(products, opts = {:include_attrs => [Patriot::Command::STATE_ATTR]}) opts = {:include_attrs => []}.merge(opts) products = [products] unless products.is_a?(Array) producers = [] products.each{|product| @producers.map{|pid, prods| if prods.include?(product) job = @jobs[pid].filter_attributes(opts[:include_attrs]) job[:job_id] = pid.to_s producers.push(job) end } } return producers.uniq end
@see Patriot::JobStore::Base#offer_to_execute
# File lib/patriot/job_store/in_memory_store.rb, line 86 def offer_to_execute(job_ticket) job_id = job_ticket.job_id.to_sym update_id = job_ticket.update_id @mutex.synchronize do unless _check_and_set_state(job_id, update_id, Patriot::JobStore::JobState::WAIT, Patriot::JobStore::JobState::RUNNING) @logger.debug("execution of job: #{job_id} is skipped") return end job = @jobs[job_id] raise "no entry found for #{job_ticket}" if job.nil? begin # TODO make the max number of histories configurable and keep multiple histories execution_id = Time.now.to_i @job_history[job_id] = [{:id => execution_id, :job_id => job_id.to_s, :host => job_ticket.exec_host, :node => job_ticket.exec_node, :thread => job_ticket.exec_thread, :begin_at => Time.now }] return {:execution_id => execution_id, :command => job.to_command(@config)} rescue Exception => e _check_and_set_state(job_id, update_id, Patriot::JobStore::JobState::RUNNING, Patriot::JobStore::JobState::FAILED) raise e end end end
@see Patriot::JobStore::Base#register
# File lib/patriot/job_store/in_memory_store.rb, line 30 def register(update_id, jobs) jobs.each{|job| raise "#{job.job_id} is not acceptable" unless acceptable?(job) } @mutex.synchronize do jobs.each {|job| _upsert(update_id, job) } end end
@see Patriot::JobStore::Base#report_completion_status
# File lib/patriot/job_store/in_memory_store.rb, line 115 def report_completion_status(job_ticket) job_id = job_ticket.job_id.to_sym update_id = job_ticket.update_id exit_code = job_ticket.exit_code raise "exit code is not set " if exit_code.nil? state = Patriot::JobStore::EXIT_CODE_TO_STATE[exit_code] raise "invalid exit code #{exit_code} " if state.nil? @mutex.synchronize do # TODO save finish_time to history server last_history = @job_history[job_id] raise "illegal state job_history is not set for #{job_id}" if last_history.nil? || last_history.empty? last_history = last_history[0] # TODO make the max number of histories configurable and keep multiple histories @job_history[job_id] = [last_history.merge({:exit_code => exit_code, :end_at => Time.now, :description => job_ticket.description})] return _check_and_set_state(job_id, update_id, Patriot::JobStore::JobState::RUNNING, state) end end
@see Patriot::JobStore::Base#set_state
# File lib/patriot/job_store/in_memory_store.rb, line 134 def set_state(update_id, job_ids, new_state) @mutex.synchronize do job_ids = job_ids.map do |jid| @jobs[jid.to_sym][Patriot::Command::STATE_ATTR] = new_state jid.to_sym end @job_states.each do |s,jobs| next if s == new_state @job_states[s] -= job_ids end @job_states[new_state] += job_ids end end
Private Instance Methods
not thread safe. should be locked around invocation @param [Symbol] job_id @param [Integer] update_id @param [Integer] prev_state @param [Integer] post_state @return [Boolean] true if state is changed, otherwise false
# File lib/patriot/job_store/in_memory_store.rb, line 320 def _check_and_set_state(job_id, update_id, prev_state, post_state) return false unless @job_states[prev_state].include?(job_id) return false unless @jobs[job_id].update_id == update_id _set_state(job_id, post_state) return true end
get dependency and set nodes and edges
@private @param [Symbol] direction :producers or :consumers @param [Integer] depth dependency depth to get @param [Hash] nodes nodes to set for dager-d3 @param [Array] edges edges to set for dager-d3 @param [Hash] base_job base job to get dependency
# File lib/patriot/job_store/in_memory_store.rb, line 257 def _set_dependency(direction, depth, nodes, edges, base_job) return if nodes[base_job[:job_id]][:depth] == depth base_job[direction].map{|depend_job| job = get(depend_job[:job_id]) history = get_execution_history(depend_job[:job_id], {})[0] hashed_job = { :job_id => job.job_id, :history => history, :depth => base_job[:depth] + 1 }.merge(job.attributes) nodes[job.job_id] = hashed_job if direction == :producers edges.push([job.job_id, base_job[:job_id]]) else edges.push([base_job[:job_id], job.job_id]) end # call recursively _set_dependency( direction, depth, nodes, edges, hashed_job ) } end
set job state @param [String] job_id @param [Integer] new_state new state of the job. set nil to keep_state
# File lib/patriot/job_store/in_memory_store.rb, line 331 def _set_state(job_id, new_state) return if new_state.nil? job_id = job_id.to_sym @job_states.each do |s,jobs| deleted_id = jobs.delete(job_id) break unless deleted_id.nil? end @jobs[job_id][Patriot::Command::STATE_ATTR] = new_state @job_states[new_state] << job_id end
# File lib/patriot/job_store/in_memory_store.rb, line 37 def _upsert(update_id, job) job_id = job.job_id.to_sym if @jobs.has_key?(job_id) # update original = @jobs[job_id] job[Patriot::Command::STATE_ATTR] ||= original[Patriot::Command::STATE_ATTR] job.update_id = original.update_id else # insert job[Patriot::Command::STATE_ATTR] ||= Patriot::JobStore::JobState::INIT raise "update_id id should not be nil for new jobs" if update_id.nil? job.update_id = update_id end @jobs[job_id] = job @producers[job_id] = job[Patriot::Command::PRODUCTS_ATTR] || [] @consumers[job_id] = job[Patriot::Command::REQUISITES_ATTR] || [] if job[Patriot::Command::STATE_ATTR] == Patriot::JobStore::JobState::INIT _set_state(job_id, Patriot::JobStore::JobState::WAIT) else _set_state(job_id, job[Patriot::Command::STATE_ATTR]) end end