class Pmux::LogView::LogParser

Public Class Methods

new(jobs_cache_dir_path, cache_expire) click to toggle source
# File lib/pmux-logview/log_parser.rb, line 34
# Creates a parser bound to one job-cache directory.
#
# @param jobs_cache_dir_path [String] directory holding the per-user cache
#   files (parse_log_job joins it with the user name)
# @param cache_expire [Integer] number of seconds a cached job summary stays
#   valid; expire_cache compares it against each job's "parsed_time"
def initialize jobs_cache_dir_path, cache_expire
  @logger = LoggerWrapper.instance
  @cache_expire_time = cache_expire
  @jobs_cache_dir_path = jobs_cache_dir_path
end

Public Instance Methods

expire_cache(cache, now) click to toggle source
# File lib/pmux-logview/log_parser.rb, line 152
# Removes cached job summaries that are older than @cache_expire_time seconds.
#
# @param cache [Hash] { "jobs" => { job_id => job } }; each job has "parsed_time"
# @param now [Integer] current time in epoch seconds
# @return [Array(Hash, Boolean)] a new cache containing only the still-valid
#   jobs, and true when at least one job was dropped (the cache file must be
#   rewritten)
def expire_cache cache, now
  all_jobs = cache["jobs"]
  fresh_jobs = all_jobs.reject { |_job_id, job| job["parsed_time"] + @cache_expire_time < now }
  [{ "jobs" => fresh_jobs }, fresh_jobs.size != all_jobs.size]
end
fast_parse(file, job_id, parsed_time) click to toggle source
# File lib/pmux-logview/log_parser.rb, line 62
# Quickly scans one job log for the summary fields shown in the job list.
#
# The log is read line by line. Lines matching @@document_re separate
# documents, and doc_cnt counts the separators seen so far:
#   1 - job header: the start-time and task-count lines are picked out by
#       regex; all other lines are buffered and YAML-loaded to read :mapper
#   2 - task section: only lines matching @@task_id_re are counted
#   3 - job footer: finish time, error status and error message
#
# @param file [String] path of the job log
# @param job_id [String] stored as "job_id" in the result
# @param parsed_time [Integer] stored as "parsed_time" (expire_cache uses it
#   to age cache entries)
# @return [Array(Hash, Boolean)] the job summary, and whether it is final
#   enough to cache (true once a finish time or an error status was seen)
def fast_parse file, job_id, parsed_time
  cachable = false
  job = {"end_time" => nil, "elapsed_time" => nil, "finished_tasks" => 0, "job_id" => job_id, "parsed_time" => parsed_time}
  task_cnt = 0
  start_time = nil
  end_time = nil
  doc1_buffer = "".dup  # appended with <<, so it must be a mutable string
  doc_cnt = 0
  File.open(file) do |f|
    f.each_line do |ln|
      if @@document_re =~ ln
        if doc_cnt == 1
          # NOTE(review): YAML.load on the log contents; acceptable only while
          # job logs are trusted local files.
          new_doc = YAML.load(doc1_buffer)
          # The header may lack :mapper (or not be a mapping at all). Skip it
          # instead of raising NoMethodError, and stringify it like full_parse.
          mapper = new_doc[:mapper] if new_doc.is_a?(Hash)
          # Round-tripping through UTF-16BE replaces invalid/undefined UTF-8
          # byte sequences with '?'.
          job["mapper"] = mapper.to_s.encode("UTF-16BE", "UTF-8", :invalid => :replace, :undef => :replace, :replace => '?').encode("UTF-8") if mapper
        end
        doc_cnt += 1
      elsif doc_cnt == 1 && @@start_at_re =~ ln
        _, _, value = ln.split(":", 3)
        start_time = DateTime::parse(value.strip)
        job["start_time_msec"] = start_time.strftime("%Q")
        job["start_time"] = start_time.strftime("%Y-%m-%d %H:%M:%S")
      elsif doc_cnt == 1 && @@tasks_re =~ ln
        _, key, value = ln.split(":", 3)
        job[key] = value.strip.to_i
      elsif doc_cnt == 1
        doc1_buffer << ln
      elsif doc_cnt == 2 && @@task_id_re =~ ln
        task_cnt += 1
      elsif doc_cnt == 3 && @@finish_at_re =~ ln
        _, _, value = ln.split(":", 3)
        end_time = DateTime::parse(value.strip)
        job["end_time_msec"] = end_time.strftime("%Q")
        job["end_time"] = end_time.strftime("%Y-%m-%d %H:%M:%S")
        cachable = true
      elsif doc_cnt == 3 && @@error_status_re =~ ln
        _, key, value = ln.split(":", 3)
        job[key] = value.strip
        cachable = true
      elsif doc_cnt == 3 && @@error_message_re =~ ln
        _, key, value = ln.split(":", 3)
        job[key] = value.strip.gsub(@@quote_re, "").encode("UTF-16BE", "UTF-8", :invalid => :replace, :undef => :replace, :replace => '?').encode("UTF-8")
      end
    end
  end
  job["finished_tasks"] = task_cnt
  # DateTime subtraction yields days; convert to seconds.
  job["elapsed_time"] = ((end_time - start_time) * 86400).to_f if !start_time.nil? && !end_time.nil?
  if end_time.nil?
    # Still running: show progress as a percentage of map tasks instead of an end time.
    if job["map_tasks"].nil?
      job["end_time"] = "--- %"
    elsif job["map_tasks"] == 0
      job["end_time"] = "100%"
    else
      job["end_time"] = ((100 * job["finished_tasks"]) / job["map_tasks"]).to_s + "%"
    end
  end
  return [job, cachable]
end
full_parse(file_path) click to toggle source
# File lib/pmux-logview/log_parser.rb, line 217
# Parses every document of a job log for the job detail view.
#
# @param file_path [String] path of the job log
# @return [Array] one entry per document in file order. When at least one
#   document was read, the array is padded with nil to three entries; an
#   empty file yields []. The entries are:
#   [0] header Hash (YAML-loaded, symbol keys), plus :job_started_at /
#       :invoked_at / :start_time parsed as DateTime when present
#   [1] task Hash keyed by each task-id line's prefix; each value is a Hash
#       of that task's fields
#   [2] footer Hash with String keys (finish time as DateTime, other values
#       as unquoted strings)
def full_parse file_path
  # Round-tripping through UTF-16BE replaces invalid/undefined UTF-8 byte
  # sequences with '?', so the result is always valid UTF-8.
  to_utf8 = lambda do |value|
    value.to_s.encode("UTF-16BE", "UTF-8", :invalid => :replace, :undef => :replace, :replace => '?').encode("UTF-8")
  end
  documents = []
  File.open(file_path) do |f|
    doc1_buffer = "".dup  # appended with <<, so it must be a mutable string
    doc1_param = {}
    doc_cnt = 0
    new_doc = nil
    task_id = nil
    f.each_line do |ln|
      if @@document_re =~ ln
        if doc_cnt == 1
          # NOTE(review): YAML.load on the log contents; acceptable only while
          # job logs are trusted local files.
          new_doc = YAML.load(doc1_buffer)
          %w[job_started_at invoked_at start_time].each do |name|
            new_doc[name.to_sym] = doc1_param[name] if doc1_param[name]
          end
          new_doc[:mapper] = to_utf8.call(new_doc[:mapper]) if new_doc[:mapper]
          new_doc[:reducer] = to_utf8.call(new_doc[:reducer]) if new_doc[:reducer]
          params = new_doc[:params] if new_doc.key?(:params)
          params[:mapper] = to_utf8.call(params[:mapper]) if params && params[:mapper]
          params[:job_name] = to_utf8.call(params[:job_name]) if params && params[:job_name]
        end
        if !new_doc.nil?
          # Drop a trailing task entry with fewer than 5 fields (presumably
          # still being written).
          new_doc.delete(task_id) if doc_cnt == 2 && !task_id.nil? && new_doc[task_id].length < 5
          documents.push(new_doc)
        end
        doc_cnt += 1
        new_doc = {}
        task_id = nil
      elsif doc_cnt == 1 && @@start_at_re =~ ln
        _, key, value = ln.split(":", 3)
        doc1_param[key] = DateTime::parse(value.strip)
      elsif doc_cnt == 1
        doc1_buffer << ln
      elsif doc_cnt == 2 && @@task_id_re =~ ln
        task_id, _ = ln.split(":", 2)
        new_doc[task_id] = {}
      elsif doc_cnt == 2 && @@elapse_re =~ ln
        key, value = ln.split(":", 2)
        new_doc[task_id][key.strip] = value.strip.to_f
      elsif doc_cnt == 2 && @@task_re =~ ln
        key, value = ln.split(":", 2)
        new_doc[task_id][key.strip] = value.strip.gsub(@@quote_re, "")
      elsif doc_cnt == 3 && @@finish_at_re =~ ln
        _, key, value = ln.split(":", 3)
        new_doc[key] = DateTime::parse(value.strip)
      elsif doc_cnt == 3 && @@futter_re =~ ln
        _, key, value = ln.split(":", 3)
        new_doc[key] = value.strip.gsub(@@quote_re, "")
      end
    end
    if !new_doc.nil?
      # Same trailing-task check as above. task_id is nil when the file ends
      # in a task section that has no entries yet, so it must be guarded here too.
      new_doc.delete(task_id) if doc_cnt == 2 && !task_id.nil? && new_doc[task_id].length < 5
      documents.push(new_doc)
    end
    # Pad to header/tasks/footer so callers can index 0..2 uniformly.
    documents.push(nil) while documents.length.between?(1, 2)
  end
  documents
end
get_files(user, log_path, data, now) click to toggle source
# File lib/pmux-logview/log_parser.rb, line 44
# Lists job log files (*.yml) under ~user/log_path, newest first by mtime.
#
# If data has "start_time_msec" and/or "end_time_msec" (epoch milliseconds),
# only files whose mtime lies in [start, end] are returned, and no paging is
# applied. Otherwise all files modified up to `now` are considered, and the
# list is cut to the first (data["page"] + 1) * data["nitems"] entries.
#
# @param user [String] account whose home directory holds the logs
# @param log_path [String, Array<String>] log directory relative to the home
#   directory (an Array is flattened by Array#join)
# @param data [Hash] request options as described above
# @param now [Time] default upper bound for the mtime filter
# @return [Array<String>] matching file paths
def get_files user, log_path, data, now
  # Build both bounds with the same helper, so each bound's sub-second part
  # comes from its own value. Both bounds are independently optional.
  to_time = lambda { |msec| Time.at(msec / 1000, (msec % 1000) * 1000) }
  start_msec = data["start_time_msec"]
  end_msec = data["end_time_msec"]
  # An explicit time range replaces paging.
  need_slice = start_msec.nil? && end_msec.nil?
  stime = start_msec.nil? ? Time.at(0) : to_time.call(start_msec)
  etime = end_msec.nil? ? now : to_time.call(end_msec)
  pattern = File.expand_path(["~" + user, log_path, "*.yml"].join(File::SEPARATOR))
  by_mtime = Dir.glob(pattern).map { |f| [f, File.mtime(f)] }.sort_by { |_, mtime| mtime }.reverse
  file_list = by_mtime.select { |_, mtime| stime <= mtime && mtime <= etime }.map(&:first)
  if need_slice
    item_count = (data["page"] + 1) * data["nitems"]
    file_list = file_list.slice(0, item_count)
  end
  file_list
end
get_log_paths() click to toggle source
# File lib/pmux-logview/log_parser.rb, line 40
# Log directories, relative to the user's home directory, that parse_log_job
# searches for job logs. Returned as a list; currently only the current log
# directory is included.
def get_log_paths
  [@@pmux_log_path]
end
load_cache(file_path) click to toggle source
# File lib/pmux-logview/log_parser.rb, line 120
# Reads one user's job cache (a Marshal dump written by save_cache).
#
# Falls back to an empty cache, { "jobs" => {} }, in any of these cases:
# the file does not exist, it cannot be read or unmarshalled, or its contents
# do not have the { "jobs" => Hash } shape that expire_cache iterates over.
# Read and shape failures are logged as warnings.
# NOTE(review): Marshal.load is only safe because this file is written by
# save_cache into @jobs_cache_dir_path; never point it at user-supplied files.
#
# @param file_path [String] cache file path
# @return [Hash] { "jobs" => { job_id => job } }
def load_cache file_path
  empty_cache = { "jobs" => {} }
  return empty_cache if !File.exist?(file_path)
  begin
    loaded = File.open(file_path, "rb") do |f|
      # Shared lock: waits while save_cache holds LOCK_EX and rewrites the file.
      f.flock(File::LOCK_SH)
      begin
        Marshal.load(f)
      ensure
        f.flock(File::LOCK_UN)
      end
    end
  rescue
    @logger.warn("cannot load cache file (#{file_path})")
    return empty_cache
  end
  return loaded if loaded.is_a?(Hash) && loaded["jobs"].is_a?(Hash)
  @logger.warn("cannot load cache file (#{file_path})")
  empty_cache
end
parse_log_dispatcher(user) click to toggle source
# File lib/pmux-logview/log_parser.rb, line 307
# Returns the tail of the user's dispatcher log as an array of lines.
#
# Reads at most the last @@max_dispatcher_log_size bytes of
# ~user/@@pmux_log_path/@@dispatcher_log. If the file is larger than that,
# reading starts mid-file and the first (possibly partial) line is discarded.
# A smaller file is returned whole. Errors opening the file (for example, a
# missing file) propagate to the caller.
#
# @param user [String] account whose home directory holds the log
# @return [Array<String>] log lines, each including its line terminator
def parse_log_dispatcher user
  file_path = File.expand_path(["~" + user, @@pmux_log_path, @@dispatcher_log].join(File::SEPARATOR))
  File.open(file_path) do |f|
    offset = f.size - @@max_dispatcher_log_size
    # Skip ahead, and drop the line cut by the jump, only when the file
    # exceeds the limit. For a smaller file the first line is complete and
    # must be kept.
    f.seek(offset, IO::SEEK_SET) if offset > 0
    lines = []
    while (ln = f.gets)
      lines.push(ln)
    end
    lines.shift if offset > 0
    lines
  end
end
parse_log_job(user, data) click to toggle source
# File lib/pmux-logview/log_parser.rb, line 181
# Builds the job list for one user.
#
# Job summaries are taken from the per-user cache file
# (@jobs_cache_dir_path/<user>) when present and not expired. Any other log is
# fast_parse'd, and is added to the cache once fast_parse reports it as
# cachable. The cache file is rewritten only if an entry expired or was added.
#
# @param user [String] account whose logs are listed
# @param data [Hash] request options, passed to get_files and pickup_job
#   ("start_time_msec", "end_time_msec", "page", "nitems", "sort_order")
# @return [Hash] { "jobs" => { job_id => job summary } } for the requested page
def parse_log_job user, data
  now = Time.now
  cache_path = [@jobs_cache_dir_path, user].join(File::SEPARATOR)
  live_cache, dirty = expire_cache(load_cache(cache_path), now.to_i)
  collected = { "jobs" => {} }
  id_and_start = []
  get_files(user, get_log_paths, data, now).each do |file|
    job_id = File::basename(file).sub(".yml", "")
    if live_cache["jobs"].key?(job_id)
      job = live_cache["jobs"][job_id]
    else
      job, cachable = fast_parse(file, job_id, now.to_i)
      if cachable
        live_cache["jobs"][job_id] = job
        dirty = true
      end
    end
    collected["jobs"][job_id] = job
    id_and_start << [job_id, job["start_time_msec"]]
  end
  save_cache(cache_path, live_cache) if dirty
  pickup_job(collected, id_and_start, data)
end
parse_log_job_detail(user, job_id) click to toggle source
# File lib/pmux-logview/log_parser.rb, line 282
# Loads the full detail of one job for the job detail view.
#
# The log <job_id>.yml is looked up under ~user/@@pmux_log_path first and
# under ~user/@@pmux_old_log_path second.
#
# @param user [String] account whose home directory holds the logs
# @param job_id [String] job log name without the ".yml" extension
# @return [Array(Hash, Hash, Hash), nil] header, tasks and footer as returned
#   by full_parse (a missing document becomes {}). Epoch-millisecond strings
#   are added as :job_started_at_msec (header) and "job_finished_at_msec"
#   (footer). Returns nil when no log file exists.
# NOTE(review): job_id is joined into the path unchecked; if it can come from
#   a request, values containing "/" or ".." can reach outside the log directory.
def parse_log_job_detail user, job_id
  log_file_in = lambda { |dir| File.expand_path(["~" + user, dir, job_id + ".yml"].join(File::SEPARATOR)) }
  file_path = log_file_in.call(@@pmux_log_path)
  unless File.exist?(file_path)
    file_path = log_file_in.call(@@pmux_old_log_path)
    return nil unless File.exist?(file_path)
  end
  docs = full_parse(file_path)
  header = docs[0]
  unless header.nil? || header[:job_started_at].nil?
    header[:job_started_at_msec] = header[:job_started_at].strftime("%Q")
  end
  footer = docs[2]
  unless footer.nil? || footer["job_finished_at"].nil?
    footer["job_finished_at_msec"] = footer["job_finished_at"].strftime("%Q")
  end
  (0..2).map { |i| docs[i].nil? ? {} : docs[i] }
end
pickup_job(src, src_ids, data) click to toggle source
# File lib/pmux-logview/log_parser.rb, line 165
# Orders the collected jobs by start time and keeps the requested page.
#
# @param src [Hash] { "jobs" => { job_id => job } }
# @param src_ids [Array<Array(String, String)>] [job_id, start_time_msec]
#   pairs. start_time_msec is an epoch-millisecond string, or nil when the log
#   had no start line yet.
# @param data [Hash] "sort_order" ("desc" for newest first), plus paging
#   ("page", "nitems"). Paging is ignored when "start_time_msec" or
#   "end_time_msec" is given.
# @return [Hash] { "jobs" => { job_id => job } } in display order, starting
#   at page * nitems (get_files already limited how many jobs were collected)
def pickup_job src, src_ids, data
  parse_data = { "jobs" => {} }
  # Sort numerically. Jobs without a start time sort as oldest (nil.to_i == 0)
  # instead of making sort_by raise on a nil/String comparison.
  ordered = src_ids.sort_by { |_, start_msec| start_msec.to_i }.map(&:first)
  ordered.reverse! if data["sort_order"] == "desc"
  skip = 0
  if data["start_time_msec"].nil? && data["end_time_msec"].nil?
    skip = [data["page"] * data["nitems"], 0].max
  end
  ordered.drop(skip).each { |job_id| parse_data["jobs"][job_id] = src["jobs"][job_id] }
  parse_data
end
save_cache(file_path, cache) click to toggle source
# File lib/pmux-logview/log_parser.rb, line 136
# Writes one user's job cache as a Marshal dump, on a best-effort basis.
#
# Every failure is logged and swallowed, including failure to create
# @jobs_cache_dir_path. parse_log_job can rebuild any entry with fast_parse,
# so a failed save must not fail the request.
#
# @param file_path [String] cache file path (normally inside @jobs_cache_dir_path)
# @param cache [Hash] { "jobs" => { job_id => job } }
def save_cache file_path, cache
  FileUtils.mkdir_p(@jobs_cache_dir_path) if !File.exist?(@jobs_cache_dir_path)
  File.open(file_path, File::RDWR|File::CREAT, 0644) do |f|
    f.flock(File::LOCK_EX)
    # Truncate only after the exclusive lock is held (rather than opening with
    # File::TRUNC). This way a reader holding LOCK_SH in load_cache never sees
    # the file emptied underneath it.
    f.rewind
    Marshal.dump(cache, f)
    f.flush
    f.truncate(f.pos)
    f.flock(File::LOCK_UN)
  end
rescue
  @logger.warn("cannot save cache file (#{file_path})")
end
set_jobs_cache_dir_path(jobs_cache_dir_path) click to toggle source
# File lib/pmux-logview/log_parser.rb, line 326
# Redirects later cache loads and saves to a different directory.
# Returns the new path (the value of the assignment).
def set_jobs_cache_dir_path(jobs_cache_dir_path)
  @jobs_cache_dir_path = jobs_cache_dir_path
end