class Patriot::JobStore::InMemoryStore

A JobStore implementation that keeps all job data in memory.

Public Class Methods

new(store_id, config) click to toggle source

@see Patriot::JobStore::Base#initialize

# File lib/patriot/job_store/in_memory_store.rb, line 11
# Set up an empty store whose data lives entirely in instance variables.
#
# @param store_id [Object] identifier of this store (not referenced in this method)
# @param config [Object] configuration; kept as-is and also passed to create_logger
def initialize(store_id, config)
  @config = config
  @logger = create_logger(config)
  @mutex  = Mutex.new
  # job_id => job content
  @jobs = {}
  # state => list of job_ids in that state.
  # The initiator job id is registered as SUCCEEDED from the start.
  states = Patriot::JobStore::JobState
  @job_states = {
    states::INIT      => [],
    states::SUCCEEDED => [Patriot::JobStore::INITIATOR_JOB_ID],
    states::WAIT      => [],
    states::RUNNING   => [],
    states::SUSPEND   => [],
    states::FAILED    => [],
    states::DISCARDED => []
  }
  # job_id => products the job produces
  @producers = {}
  # job_id => products the job references
  @consumers = {}
  # job_id => array of the job's execution history records
  @job_history = {}
end

Public Instance Methods

acceptable?(job) click to toggle source

@see Patriot::JobStore::Base#acceptable?

# File lib/patriot/job_store/in_memory_store.rb, line 60
# Check that the given object can be stored in this job store.
#
# @param job [Object] candidate job
# @return [true] when job is a Patriot::JobStore::Job
# @raise [RuntimeError] when job is any other kind of object
def acceptable?(job)
  return true if job.is_a?(Patriot::JobStore::Job)

  raise "invalid class #{job.class}"
end
delete_job(job_id) click to toggle source

@see Patriot::JobStore::Base#delete_job

# File lib/patriot/job_store/in_memory_store.rb, line 302
# Remove a job from the state lists, the job table and both dependency tables.
# The whole removal runs while holding @mutex.
#
# @param job_id [#to_sym] id of the job to remove
# @return [Object, nil] the entry removed from @consumers, or nil if there was none
def delete_job(job_id)
  key = job_id.to_sym
  @mutex.synchronize do
    @job_states.each_value { |ids| ids.delete(key) }
    [@jobs, @producers].each { |table| table.delete(key) }
    @consumers.delete(key)
  end
end
find_jobs_by_state(state, opts = {}) click to toggle source

@see Patriot::JobStore::Base#find_jobs_by_state

# File lib/patriot/job_store/in_memory_store.rb, line 190
# List the ids of jobs in the given state, most recently added first.
# The initiator job id is never included.
#
# @param state [Object] a key of @job_states
# @param opts [Hash]
# @option opts [Integer] :limit maximum number of ids to return (default: all)
# @option opts [Integer] :offset number of records to skip, counted from the
#   newest record and applied before filtering (default: 0)
# @option opts [String, nil] :filter_exp pattern in which an unescaped '%' stands
#   for any string and an unescaped '_' for any single character; a missing or
#   nil value disables filtering
# @return [Array<String>] matching job ids
def find_jobs_by_state(state, opts = {})
  all_records = @job_states[state] - [Patriot::JobStore::INITIATOR_JOB_ID]
  size        = all_records.size
  opts        = {:limit => size, :offset => 0}.merge(opts)
  # An explicit :filter_exp => nil used to crash on nil.gsub; treat it as "no filter".
  filter_exp  = opts[:filter_exp]
  filter      = filter_exp.nil? ? nil : Regexp.new(filter_exp.gsub(/(?<!\\)%/, '.*').gsub(/(?<!\\)_/, '.'))
  result      = []
  (opts[:offset]...size).each do |i|
    break if result.size >= opts[:limit]
    # walk the list backwards so that the newest entries come first
    job_id = all_records[size - 1 - i].to_s
    next unless filter.nil? || filter.match(job_id)
    result << job_id
  end
  result
end
get_consumers(products, opts = {:include_attrs => [Patriot::Command::STATE_ATTR]}) click to toggle source

@see Patriot::JobStore::Base#get_consumers

# File lib/patriot/job_store/in_memory_store.rb, line 173
# Collect the jobs that reference at least one of the given products.
#
# @param products [Object, Array] a single product or a list of products
# @param opts [Hash]
# @option opts [Array] :include_attrs attributes handed to the job's
#   filter_attributes (empty when the key is missing from opts)
# @return [Array] filtered job entries, each with :job_id set to the job id as a
#   String; duplicates are removed
def get_consumers(products, opts = {:include_attrs => [Patriot::Command::STATE_ATTR]})
  opts     = {:include_attrs => []}.merge(opts)
  products = [products] unless products.is_a?(Array)
  found    = []
  products.each do |product|
    @consumers.each do |jid, required|
      next unless required.include?(product)
      entry = @jobs[jid].filter_attributes(opts[:include_attrs])
      entry[:job_id] = jid.to_s
      found << entry
    end
  end
  found.uniq
end
get_execution_history(job_id, opts = {}) click to toggle source

@see Patriot::JobStore::Base#get_execution_history

# File lib/patriot/job_store/in_memory_store.rb, line 207
# Fetch the execution history records kept for a job.
#
# The caller's options are merged over the defaults (they used to be overwritten
# and ignored). :order is accepted but has no effect in this method.
#
# @param job_id [#to_sym] id of the job
# @param opts [Hash]
# @option opts [Integer] :limit maximum number of records to return (default 1);
#   anything other than a non-negative Integer disables the limit
# @option opts [Symbol] :order accepted for compatibility (default :DESC)
# @return [Array<Hash>] history records, or an empty array when none are stored
def get_execution_history(job_id, opts = {})
  opts    = {:limit => 1, :order => :DESC}.merge(opts)
  history = @job_history[job_id.to_sym] || []
  limit   = opts[:limit]
  return history unless limit.is_a?(Integer) && limit >= 0
  history.first(limit)
end
get_graph(job_id, opts = {}) click to toggle source

Get the node and edge information needed to render the dependency graph of a job. @param [String] job_id JOB ID @param [Hash] opts options (:producer_depth and :consumer_depth) @return [Hash] {:nodes => nodes, :edges => edges}

# File lib/patriot/job_store/in_memory_store.rb, line 216
# Build the node and edge data of the dependency graph around a job.
#
# @param job_id [String] id of the job placed at depth 0
# @param opts [Hash]
# @option opts [Integer] :producer_depth depth passed to _set_dependency for :producers
# @option opts [Integer] :consumer_depth depth passed to _set_dependency for :consumers
# @return [Hash] {:nodes => Hash keyed by job id, :edges => Array of id pairs}
def get_graph(job_id, opts = {})
  job     = get(job_id)
  history = get_execution_history(job_id, {})[0]

  # the requested job itself is the root node
  root = {:job_id => job.job_id, :history => history, :depth => 0}.merge(job.attributes)

  nodes = {job_id => root}
  edges = []

  # producers are expanded first, then consumers; both fill nodes and edges in place
  {:producers => opts[:producer_depth], :consumers => opts[:consumer_depth]}.each do |direction, depth|
    _set_dependency(direction, depth, nodes, edges, root)
  end

  {:nodes => nodes, :edges => edges}
end
get_job(job_id) click to toggle source

@see Patriot::JobStore::Base#get_job

# File lib/patriot/job_store/in_memory_store.rb, line 149
# Look up a stored job by its id.
#
# @param job_id [String, nil] id of the job
# @return [Object, nil] the stored job; nil when job_id is nil or no such job exists
# @raise [RuntimeError] when job_id is neither nil nor a String
def get_job(job_id)
  case job_id
  when nil    then nil
  when String then @jobs[job_id.to_sym]
  else raise "string is expected but job_id is a #{job_id.class}"
  end
end
get_job_size(opts = {}) click to toggle source

@see Patriot::JobStore::Base#get_job_size

# File lib/patriot/job_store/in_memory_store.rb, line 290
# Count the jobs in each state.
#
# @param opts [Hash]
# @option opts [Array] :ignore_states states to leave out of the result (default: none)
# @return [Hash] state => number of jobs in that state
def get_job_size(opts = {})
  ignored   = {:ignore_states => []}.merge(opts)[:ignore_states]
  succeeded = Patriot::JobStore::JobState::SUCCEEDED
  @job_states.each_with_object({}) do |(state, ids), sizes|
    next if ignored.include?(state)
    # one entry is subtracted for SUCCEEDED; initialize seeds that list with the initiator job id
    sizes[state] = state == succeeded ? ids.size - 1 : ids.size
  end
end
get_job_tickets(host, nodes, options = {}) click to toggle source

@see Patriot::JobStore::Base#get_job_tickets

# File lib/patriot/job_store/in_memory_store.rb, line 66
# Create tickets for the waiting jobs that may be executed by the given host/nodes.
# Runs while holding @mutex.
#
# A waiting job is skipped when
# * it names an execution node that is not in +nodes+,
# * it names an execution host other than +host+,
# * its start datetime is set and has not passed yet,
# * it references products but no registered job produces any of them, or
# * at least one job producing a referenced product is not in the SUCCEEDED list.
#
# @param host [Object] host asking for tickets
# @param nodes [Object, Array] node or list of nodes asking for tickets
# @param options [Hash] not referenced in this method
# @return [Array<JobTicket>] one ticket per executable job, in WAIT-list order
def get_job_tickets(host, nodes, options = {})
  nodes = [nodes] unless nodes.is_a?(Array)
  @mutex.synchronize do
    tickets = []
    @job_states[Patriot::JobStore::JobState::WAIT].each do |wid|
      job       = @jobs[wid]
      exec_node = job[Patriot::Command::EXEC_NODE_ATTR]
      exec_host = job[Patriot::Command::EXEC_HOST_ATTR]
      start_at  = job[Patriot::Command::START_DATETIME_ATTR]

      # host / node / start time restrictions
      next if !exec_node.nil? && !nodes.include?(exec_node)
      next if !exec_host.nil? && !(host == exec_host)
      next if !start_at.nil?  && !(Time.now > start_at)

      # dependency check: every job producing a referenced product must have succeeded
      required = @consumers[wid] || []
      upstream = @producers.select { |_, prods| !(prods & required).empty? }.keys
      next if !required.empty? && upstream.empty?
      next unless upstream.all? { |pjid| @job_states[Patriot::JobStore::JobState::SUCCEEDED].include?(pjid) }

      tickets << JobTicket.new(wid.to_s, job.update_id, exec_node)
    end
    tickets
  end
end
get_producers(products, opts = {:include_attrs => [Patriot::Command::STATE_ATTR]}) click to toggle source

@see Patriot::JobStore::Base#get_producers

# File lib/patriot/job_store/in_memory_store.rb, line 156
# Collect the jobs that produce at least one of the given products.
#
# @param products [Object, Array] a single product or a list of products
# @param opts [Hash]
# @option opts [Array] :include_attrs attributes handed to the job's
#   filter_attributes (empty when the key is missing from opts)
# @return [Array] filtered job entries, each with :job_id set to the job id as a
#   String; duplicates are removed
def get_producers(products, opts = {:include_attrs => [Patriot::Command::STATE_ATTR]})
  opts     = {:include_attrs => []}.merge(opts)
  products = [products] unless products.is_a?(Array)
  found    = []
  products.each do |product|
    @producers.each do |jid, produced|
      next unless produced.include?(product)
      entry = @jobs[jid].filter_attributes(opts[:include_attrs])
      entry[:job_id] = jid.to_s
      found << entry
    end
  end
  found.uniq
end
offer_to_execute(job_ticket) click to toggle source

@see Patriot::JobStore::Base#offer_to_execute

# File lib/patriot/job_store/in_memory_store.rb, line 86
# Try to move a waiting job to RUNNING and hand out what is needed to run it.
# Runs while holding @mutex.
#
# @param job_ticket [Object] ticket providing job_id, update_id, exec_host,
#   exec_node and exec_thread
# @return [Hash, nil] {:execution_id => Integer, :command => command} on success;
#   nil when the WAIT -> RUNNING transition is refused
# @raise [RuntimeError] when the job table has no entry for the ticket
# @raise [Exception] anything raised while recording history or building the
#   command, re-raised after trying to move the job from RUNNING to FAILED
def offer_to_execute(job_ticket)
  job_id    = job_ticket.job_id.to_sym
  update_id = job_ticket.update_id
  @mutex.synchronize do
    if _check_and_set_state(job_id, update_id, Patriot::JobStore::JobState::WAIT, Patriot::JobStore::JobState::RUNNING)
      job = @jobs[job_id]
      raise "no entry found for #{job_ticket}" if job.nil?
      begin
        # TODO make the max number of histories configurable and keep multiple histories
        execution_id = Time.now.to_i
        record = {
          :id       => execution_id,
          :job_id   => job_id.to_s,
          :host     => job_ticket.exec_host,
          :node     => job_ticket.exec_node,
          :thread   => job_ticket.exec_thread,
          :begin_at => Time.now
        }
        # only the latest execution is kept
        @job_history[job_id] = [record]
        {:execution_id => execution_id, :command => job.to_command(@config)}
      rescue Exception => e
        _check_and_set_state(job_id, update_id, Patriot::JobStore::JobState::RUNNING, Patriot::JobStore::JobState::FAILED)
        raise e
      end
    else
      @logger.debug("execution of job: #{job_id} is skipped")
      nil
    end
  end
end
register(update_id, jobs) click to toggle source

@see Patriot::JobStore::Base#register

# File lib/patriot/job_store/in_memory_store.rb, line 30
# Store the given jobs. All jobs are validated first; the insert/update of every
# job then runs while holding @mutex.
#
# @param update_id [Object] update id handed to _upsert for each job
# @param jobs [Array] jobs to store
# @return [Array] the jobs argument
# @raise [RuntimeError] when acceptable? rejects a job (nothing has been stored then)
def register(update_id, jobs)
  jobs.each do |job|
    next if acceptable?(job)
    raise "#{job.job_id} is not acceptable"
  end
  @mutex.synchronize { jobs.each { |job| _upsert(update_id, job) } }
end
report_completion_status(job_ticket) click to toggle source

@see Patriot::JobStore::Base#report_completion_status

# File lib/patriot/job_store/in_memory_store.rb, line 115
# Record the outcome of an execution and move the job out of RUNNING.
# The history update and the state change run while holding @mutex.
#
# @param job_ticket [Object] ticket providing job_id, update_id, exit_code and description
# @return [Boolean] result of _check_and_set_state for RUNNING -> the state
#   mapped from the exit code
# @raise [RuntimeError] when the exit code is nil, has no entry in
#   EXIT_CODE_TO_STATE, or no history record exists for the job
def report_completion_status(job_ticket)
  job_id    = job_ticket.job_id.to_sym
  update_id = job_ticket.update_id
  exit_code = job_ticket.exit_code
  raise "exit code is not set " if exit_code.nil?
  state = Patriot::JobStore::EXIT_CODE_TO_STATE[exit_code]
  raise "invalid exit code #{exit_code} " if state.nil?
  @mutex.synchronize do
    # TODO save finish_time to history server
    history = @job_history[job_id]
    raise "illegal state job_history is not set for #{job_id}" if history.nil? || history.empty?
    # TODO make the max number of histories configurable and keep multiple histories
    finished = history[0].merge(:exit_code   => exit_code,
                                :end_at      => Time.now,
                                :description => job_ticket.description)
    @job_history[job_id] = [finished]
    _check_and_set_state(job_id, update_id, Patriot::JobStore::JobState::RUNNING, state)
  end
end
set_state(update_id, job_ids, new_state) click to toggle source

@see Patriot::JobStore::Base#set_state

# File lib/patriot/job_store/in_memory_store.rb, line 134
# Force the given jobs into a new state, regardless of their current state.
# Runs while holding @mutex.
#
# @param update_id [Object] not referenced by this implementation
# @param job_ids [Array<#to_sym>] ids of the jobs to change
# @param new_state [Object] a key of @job_states
# @return [Array<Symbol>] the job id list of new_state after the change
def set_state(update_id, job_ids, new_state)
  @mutex.synchronize do
    job_ids = job_ids.map do |jid|
      @jobs[jid.to_sym][Patriot::Command::STATE_ATTR] = new_state
      jid.to_sym
    end
    @job_states.each_key do |s|
      next if s == new_state
      @job_states[s] -= job_ids
    end
    # Union instead of concatenation: a job that is already in new_state is not
    # removed by the loop above, so `+=` would list its id twice.
    @job_states[new_state] |= job_ids
  end
end

Private Instance Methods

_check_and_set_state(job_id, update_id, prev_state, post_state) click to toggle source

Not thread safe; the caller should hold the lock around invocation. @param [Symbol] job_id @param [Integer] update_id @param [Integer] prev_state @param [Integer] post_state @return [Boolean] true if the state is changed, otherwise false

# File lib/patriot/job_store/in_memory_store.rb, line 320
# Change the state of a job only when the job is currently listed under
# prev_state and its stored update_id equals the given one.
# Takes no lock itself.
#
# @param job_id [Symbol] id of the job
# @param update_id [Integer] update id the stored job must carry
# @param prev_state [Integer] state the job must currently be in
# @param post_state [Integer] state to move the job to
# @return [Boolean] true if the state was changed, otherwise false
def _check_and_set_state(job_id, update_id, prev_state, post_state)
  in_prev_state = @job_states[prev_state].include?(job_id)
  if in_prev_state && @jobs[job_id].update_id == update_id
    _set_state(job_id, post_state)
    true
  else
    false
  end
end
_set_dependency(direction, depth, nodes, edges, base_job) click to toggle source

get dependency and set nodes and edges

@private @param [Symbol] direction :producers or :consumers @param [Integer] depth dependency depth to get @param [Hash] nodes nodes to set for dagre-d3 @param [Array] edges edges to set for dagre-d3 @param [Hash] base_job base job to get dependency

# File lib/patriot/job_store/in_memory_store.rb, line 257
# Walk the dependencies of base_job in one direction and add every job found
# to nodes and edges (both are modified in place).
#
# Recursion stops at a job whose recorded :depth equals +depth+.
#
# @param direction [Symbol] :producers or :consumers
# @param depth [Integer] depth at which expansion stops
# @param nodes [Hash] job id => node hash; must already contain base_job
# @param edges [Array] list of [from_id, to_id] pairs
# @param base_job [Hash] node whose dependencies are expanded
def _set_dependency(direction, depth, nodes, edges, base_job)
  base_id = base_job[:job_id]
  return if nodes[base_id][:depth] == depth

  base_job[direction].map do |depend_job|
    depend_id = depend_job[:job_id]
    job       = get(depend_id)
    history   = get_execution_history(depend_id, {})[0]

    node = {
      :job_id  => job.job_id,
      :history => history,
      :depth   => base_job[:depth] + 1
    }.merge(job.attributes)
    nodes[job.job_id] = node

    # the producer's id always goes first in an edge
    edge = direction == :producers ? [job.job_id, base_id] : [base_id, job.job_id]
    edges.push(edge)

    # continue from the job that was just added
    _set_dependency(direction, depth, nodes, edges, node)
  end
end
_set_state(job_id, new_state) click to toggle source

Set the state of a job. @param [String, Symbol] job_id @param [Integer] new_state new state of the job; pass nil to keep the current state

# File lib/patriot/job_store/in_memory_store.rb, line 331
# Move a job into a new state: update the state lists and the job's state attribute.
#
# @param job_id [#to_sym] id of the job
# @param new_state [Integer, nil] state to set; nil leaves everything unchanged
# @return [Array<Symbol>, nil] the job id list of new_state, or nil when new_state is nil
def _set_state(job_id, new_state)
  return if new_state.nil?
  key = job_id.to_sym
  # drop the id from the first state list that contains it
  holder = @job_states.each_value.find { |ids| ids.include?(key) }
  holder.delete(key) unless holder.nil?
  @jobs[key][Patriot::Command::STATE_ATTR] = new_state
  @job_states[new_state] << key
end
_upsert(update_id, job) click to toggle source
# File lib/patriot/job_store/in_memory_store.rb, line 37
# Insert a new job or replace a stored one, then refresh its dependency tables
# and state. Takes no lock itself.
#
# For a job that is already stored, the stored update_id is kept and a missing
# state is inherited from the stored job. A new job without a state starts as
# INIT. A job whose state is INIT is put into WAIT.
#
# @param update_id [Object] update id given to a newly inserted job
# @param job [Object] job to store
# @raise [RuntimeError] when a new job is inserted with a nil update_id
def _upsert(update_id, job)
  job_id     = job.job_id.to_sym
  state_attr = Patriot::Command::STATE_ATTR
  init       = Patriot::JobStore::JobState::INIT

  if @jobs.has_key?(job_id) # update
    existing = @jobs[job_id]
    job[state_attr] ||= existing[state_attr]
    job.update_id = existing.update_id
  else # insert
    job[state_attr] ||= init
    raise "update_id id should not be nil for new jobs" if update_id.nil?
    job.update_id = update_id
  end

  @jobs[job_id]      = job
  @producers[job_id] = job[Patriot::Command::PRODUCTS_ATTR] || []
  @consumers[job_id] = job[Patriot::Command::REQUISITES_ATTR] || []

  state = job[state_attr]
  _set_state(job_id, state == init ? Patriot::JobStore::JobState::WAIT : state)
end