class Pmux::LogView::LogParser
Public Class Methods
new(jobs_cache_dir_path, cache_expire)
click to toggle source
# File lib/pmux-logview/log_parser.rb, line 34
#
# Creates a parser bound to a job-cache directory.
#
# jobs_cache_dir_path:: directory under which per-user cache files are kept
# cache_expire::        lifetime added to a cached job's "parsed_time" when
#                       deciding whether the entry has expired
def initialize(jobs_cache_dir_path, cache_expire)
  @logger = LoggerWrapper.instance
  @cache_expire_time = cache_expire
  @jobs_cache_dir_path = jobs_cache_dir_path
end
Public Instance Methods
expire_cache(cache, now)
click to toggle source
# File lib/pmux-logview/log_parser.rb, line 152
#
# Builds a copy of the cache without the entries whose lifetime has elapsed.
#
# cache:: hash shaped like { "jobs" => { job_id => job_hash } }
# now::   current time, in the same unit as job["parsed_time"]
#
# Returns [new_cache, need_save_cache]. need_save_cache is true when at least
# one entry was dropped. The cache passed in is not modified.
def expire_cache(cache, now)
  jobs = cache["jobs"] || {}
  kept = jobs.reject do |_job_id, job|
    parsed_time = job["parsed_time"]
    # An entry without a parse timestamp cannot be aged, so it is treated as
    # expired (and re-parsed later) instead of raising on nil + Integer.
    parsed_time.nil? || parsed_time + @cache_expire_time < now
  end
  [{ "jobs" => kept }, kept.length != jobs.length]
end
fast_parse(file, job_id, parsed_time)
click to toggle source
# File lib/pmux-logview/log_parser.rb, line 62
#
# Reads a job log once and collects only the summary fields needed for the
# job list.
#
# The file is treated as a sequence of sections separated by lines matching
# @@document_re; "section N" below means "after the Nth separator".
#
# file::        path of the job log
# job_id::      stored as job["job_id"]
# parsed_time:: stored as job["parsed_time"]
#
# Returns [job, cachable]. cachable becomes true once a finish timestamp or
# an error status line has been seen in section 3. When no finish timestamp
# was found, job["end_time"] carries a progress string instead ("--- %",
# "100%" or "<n>%").
def fast_parse(file, job_id, parsed_time)
  cachable = false
  job = {"end_time" => nil, "elapsed_time" => nil, "finished_tasks" => 0, "job_id" => job_id, "parsed_time" => parsed_time}
  task_cnt = 0
  start_time = nil
  end_time = nil
  doc1_buffer = ""
  doc_cnt = 0
  # Round-trips through UTF-16BE so that byte sequences which are not valid
  # UTF-8 are replaced with "?".
  scrub = lambda do |text|
    text.to_s.encode("UTF-16BE", "UTF-8", :invalid => :replace, :undef => :replace, :replace => '?').encode("UTF-8")
  end
  File.open(file) do |f|
    f.each_line do |ln|
      if @@document_re =~ ln
        if doc_cnt == 1
          # NOTE(review): YAML.load is applied to log content; if these logs
          # can be written by an untrusted party, switch to a restricted
          # loader — confirm where the logs come from.
          new_doc = YAML.load(doc1_buffer)
          # The header may be empty or lack :mapper; full_parse tolerates
          # that, so do the same here instead of calling encode on nil.
          mapper = new_doc.is_a?(Hash) ? new_doc[:mapper] : nil
          job["mapper"] = scrub.call(mapper) unless mapper.nil?
        end
        doc_cnt += 1
      elsif doc_cnt == 1 && @@start_at_re =~ ln
        # The split assumes the line looks like ":key: value" (empty first
        # field) — presumably symbol-keyed YAML; confirm against the writer.
        _, _key, value = ln.split(":", 3)
        start_time = DateTime::parse(value.strip())
        job["start_time_msec"] = start_time.strftime("%Q")
        job["start_time"] = start_time.strftime("%Y-%m-%d %H:%M:%S")
      elsif doc_cnt == 1 && @@tasks_re =~ ln
        _, key, value = ln.split(":", 3)
        job[key] = value.strip().to_i()
      elsif doc_cnt == 1
        # Everything else in section 1 is kept for the YAML parse above.
        doc1_buffer += ln
      elsif doc_cnt == 2 && @@task_id_re =~ ln
        task_cnt += 1
      elsif doc_cnt == 3 && @@finish_at_re =~ ln
        _, _key, value = ln.split(":", 3)
        end_time = DateTime::parse(value.strip())
        job["end_time_msec"] = end_time.strftime("%Q")
        job["end_time"] = end_time.strftime("%Y-%m-%d %H:%M:%S")
        cachable = true
      elsif doc_cnt == 3 && @@error_status_re =~ ln
        _, key, value = ln.split(":", 3)
        job[key] = value.strip()
        cachable = true
      elsif doc_cnt == 3 && @@error_message_re =~ ln
        _, key, value = ln.split(":", 3)
        job[key] = scrub.call(value.strip().gsub(@@quote_re, ""))
      end
    end
  end
  job["finished_tasks"] = task_cnt
  # DateTime subtraction yields days; 86400 converts to seconds.
  job["elapsed_time"] = ((end_time - start_time) * 86400).to_f if !start_time.nil? && !end_time.nil?
  if end_time.nil?
    # No finish timestamp: report progress in the "end_time" slot.
    if job["map_tasks"].nil?
      job["end_time"] = "--- %"
    elsif job["map_tasks"] == 0
      job["end_time"] = "100%"
    else
      job["end_time"] = ((100 * job["finished_tasks"]) / job["map_tasks"]).to_s + "%"
    end
  end
  return [job, cachable]
end
full_parse(file_path)
click to toggle source
# File lib/pmux-logview/log_parser.rb, line 217
#
# Parses a job log into its individual documents.
#
# The file is treated as a sequence of sections separated by lines matching
# @@document_re; "section N" below means "after the Nth separator".
# Section 1 is loaded with YAML, section 2 is collected as
# { task_id => { field => value } }, section 3 as { key => value }.
#
# file_path:: path of the job log
#
# Returns the array of parsed documents. A result holding one or two
# documents is padded with nil up to three entries; an empty result stays
# empty.
def full_parse(file_path)
  documents = []
  # Round-trips through UTF-16BE so that byte sequences which are not valid
  # UTF-8 are replaced with "?".
  scrub = lambda do |value|
    value.to_s.encode("UTF-16BE", "UTF-8", :invalid => :replace, :undef => :replace, :replace => '?').encode("UTF-8")
  end
  File.open(file_path) do |f|
    doc1_buffer = ""
    doc1_param = {}
    doc_cnt = 0
    new_doc = nil
    task_id = nil
    f.each_line do |ln|
      if @@document_re =~ ln
        if doc_cnt == 1
          # NOTE(review): YAML.load is applied to log content; if these logs
          # can be written by an untrusted party, switch to a restricted
          # loader — confirm where the logs come from.
          loaded = YAML.load(doc1_buffer)
          # An empty header does not load as a Hash; fall back to an empty
          # document rather than failing on the assignments below.
          new_doc = loaded.is_a?(Hash) ? loaded : {}
          ["job_started_at", "invoked_at", "start_time"].each do |name|
            new_doc[name.to_sym] = doc1_param[name] if doc1_param[name]
          end
          [:mapper, :reducer].each do |field|
            new_doc[field] = scrub.call(new_doc[field]) if new_doc[field]
          end
          params = new_doc[:params]
          if params.is_a?(Hash)
            [:mapper, :job_name].each do |field|
              params[field] = scrub.call(params[field]) if params[field]
            end
          end
        end
        if !new_doc.nil?
          # A trailing task entry with fewer than 5 fields is dropped
          # (presumably still being written — confirm).
          new_doc.delete(task_id) if doc_cnt == 2 && !task_id.nil? && new_doc[task_id].length < 5
          documents.push(new_doc)
        end
        doc_cnt += 1
        new_doc = {}
        task_id = nil
      elsif doc_cnt == 1 && @@start_at_re =~ ln
        # The split assumes the line looks like ":key: value" (empty first
        # field) — presumably symbol-keyed YAML; confirm against the writer.
        _, key, value = ln.split(":", 3)
        doc1_param[key] = DateTime::parse(value.strip())
      elsif doc_cnt == 1
        doc1_buffer += ln
      elsif doc_cnt == 2 && @@task_id_re =~ ln
        task_id, _rest = ln.split(":", 2)
        new_doc[task_id] = {}
      elsif doc_cnt == 2 && !task_id.nil? && @@elapse_re =~ ln
        # Field lines seen before any task id have nowhere to go; they are
        # skipped instead of raising on new_doc[nil].
        key, value = ln.split(":", 2)
        new_doc[task_id][key.strip()] = value.strip().to_f()
      elsif doc_cnt == 2 && !task_id.nil? && @@task_re =~ ln
        key, value = ln.split(":", 2)
        new_doc[task_id][key.strip()] = value.strip().gsub(@@quote_re, "")
      elsif doc_cnt == 3 && @@finish_at_re =~ ln
        _, key, value = ln.split(":", 3)
        new_doc[key] = DateTime::parse(value.strip())
      elsif doc_cnt == 3 && @@futter_re =~ ln
        _, key, value = ln.split(":", 3)
        new_doc[key] = value.strip().gsub(@@quote_re, "")
      end
    end
    if !new_doc.nil?
      # Same cleanup as at a separator. The task_id guard matters when the
      # file ends inside section 2 before any task line was written.
      new_doc.delete(task_id) if doc_cnt == 2 && !task_id.nil? && new_doc[task_id].length < 5
      documents.push(new_doc)
    end
    # Pad a partial result (1 or 2 documents) with nil up to 3 entries.
    documents.push(nil) while documents.length == 1 || documents.length == 2
  end
  return documents
end
get_files(user, log_path, data, now)
click to toggle source
# File lib/pmux-logview/log_parser.rb, line 44
#
# Lists the "*.yml" files under ~user/log_path, newest modification time
# first.
#
# user::     account name appended to "~" for home-directory expansion
# log_path:: path component(s) below the home directory; nested arrays are
#            flattened by Array#join
# data::     request hash. When "start_time_msec" and/or "end_time_msec"
#            (milliseconds since the epoch) is present, only files whose
#            mtime lies inside that inclusive range are returned. Otherwise
#            the newest (page + 1) * nitems files are returned, using
#            data["page"] and data["nitems"].
# now::      upper bound of the range when no "end_time_msec" is given
#
# Returns an array of file paths.
def get_files(user, log_path, data, now)
  start_msec = data["start_time_msec"]
  end_msec = data["end_time_msec"]
  ranged = !(start_msec.nil? && end_msec.nil?)

  # Time.at(seconds, microseconds): split each bound into whole seconds and
  # its own sub-second remainder. The end bound must use end_msec for both
  # parts, otherwise a missing start bound raises on nil.
  stime = start_msec.nil? ? Time.at(0) : Time.at(start_msec / 1000, (start_msec % 1000) * 1000)
  etime = end_msec.nil? ? now : Time.at(end_msec / 1000, (end_msec % 1000) * 1000)

  pattern = File.expand_path(["~" + user, log_path, "*.yml"].join(File::SEPARATOR))
  file_list = Dir.glob(pattern)
    .map { |path| [path, File.mtime(path)] }
    .sort_by { |_path, mtime| mtime }
    .reverse
    .select { |_path, mtime| stime <= mtime && mtime <= etime }
    .map { |path, _mtime| path }

  # Without a time range the list is cut to the pages requested so far.
  file_list = file_list.slice(0, (data["page"] + 1) * data["nitems"]) unless ranged
  file_list
end
get_log_paths()
click to toggle source
# File lib/pmux-logview/log_parser.rb, line 40
#
# Returns the log path list: a one-element array holding @@pmux_log_path.
def get_log_paths
  [@@pmux_log_path]
end
load_cache(file_path)
click to toggle source
# File lib/pmux-logview/log_parser.rb, line 120
#
# Reads the marshalled job cache from file_path under a shared lock.
#
# file_path:: path of the cache file
#
# Returns the stored cache, or an empty cache ({ "jobs" => {} }) when the
# file does not exist, cannot be read or unmarshalled, or does not contain a
# hash with a "jobs" hash. Every case except "file does not exist" logs a
# warning.
def load_cache(file_path)
  empty_cache = { "jobs" => {} }
  return empty_cache if !File.exist?(file_path)
  begin
    loaded = File.open(file_path, "rb") do |f|
      f.flock(File::LOCK_SH)
      begin
        Marshal.load(f)
      ensure
        f.flock(File::LOCK_UN)
      end
    end
    # Marshal.load returns whatever was stored; callers index cache["jobs"],
    # so anything with a different layout is rejected here.
    return loaded if loaded.is_a?(Hash) && loaded["jobs"].is_a?(Hash)
    @logger.warn("cannot load cache file (#{file_path})")
    return empty_cache
  rescue
    @logger.warn("cannot load cache file (#{file_path})")
    return empty_cache
  end
end
parse_log_dispatcher(user)
click to toggle source
# File lib/pmux-logview/log_parser.rb, line 307
#
# Returns the trailing lines of the dispatcher log found at
# ~user/@@pmux_log_path/@@dispatcher_log.
#
# At most the last @@max_dispatcher_log_size bytes are read. When reading
# starts in the middle of the file, the first line read is discarded because
# it may be cut off. A file that fits in the window is returned in full.
#
# user:: account name appended to "~" for home-directory expansion
#
# Returns an array of lines. Errors from opening the file propagate.
def parse_log_dispatcher(user)
  file_path = File.expand_path(["~" + user, @@pmux_log_path, @@dispatcher_log].join(File::SEPARATOR))
  File.open(file_path) do |f|
    begin
      f.seek(-@@max_dispatcher_log_size, IO::SEEK_END)
    rescue StandardError
      # The seek fails when the file is shorter than the window; reading
      # then simply starts at the beginning.
    end
    starts_mid_file = f.pos > 0
    lines = f.readlines
    # Only a read that began mid-file can start with a partial line.
    lines.shift if starts_mid_file
    lines
  end
end
parse_log_job(user, data)
click to toggle source
# File lib/pmux-logview/log_parser.rb, line 181
#
# Builds the job list for a user, reusing cached summaries where possible.
#
# Result structure:
#   {
#     "jobs" => {
#       id1 => { <job> },
#       ...
#     },
#   }
#
# user:: account whose logs are read; also the cache file name inside
#        @jobs_cache_dir_path
# data:: request hash, passed on to get_files and pickup_job
#
# Returns the hash produced by pickup_job.
def parse_log_job(user, data)
  now = Time.now
  epoch = now.to_i
  cache_file = [@jobs_cache_dir_path, user].join(File::SEPARATOR)
  live_cache, dirty = expire_cache(load_cache(cache_file), epoch)

  collected = { "jobs" => {} }
  ids = []
  get_files(user, get_log_paths, data, now).each do |file|
    job_id = File::basename(file).sub(".yml", "")
    if live_cache["jobs"].key?(job_id)
      job = live_cache["jobs"][job_id]
    else
      job, cachable = fast_parse(file, job_id, epoch)
      # Only jobs that fast_parse reports as cachable are stored.
      if cachable
        live_cache["jobs"][job_id] = job
        dirty = true
      end
    end
    collected["jobs"][job_id] = job
    ids.push([job_id, job["start_time_msec"]])
  end

  save_cache(cache_file, live_cache) if dirty
  pickup_job(collected, ids, data)
end
parse_log_job_detail(user, job_id)
click to toggle source
# File lib/pmux-logview/log_parser.rb, line 282
#
# Loads the full detail of one job.
#
# The log "<job_id>.yml" is looked up under ~user/@@pmux_log_path first and
# under ~user/@@pmux_old_log_path second.
#
# user::   account name appended to "~" for home-directory expansion
# job_id:: job identifier; ".yml" is appended to form the file name
#          NOTE(review): job_id is joined into the path unchecked — confirm
#          that callers never pass a value containing path separators.
#
# Returns nil when neither file exists. Otherwise returns a three-element
# array of the documents from full_parse, with {} substituted for missing
# ones. Document 0 gains :job_started_at_msec and document 2 gains
# "job_finished_at_msec" when the corresponding timestamp is present.
def parse_log_job_detail(user, job_id)
  locate = lambda do |dir|
    candidate = File.expand_path(["~" + user, dir, job_id + ".yml"].join(File::SEPARATOR))
    File.exist?(candidate) ? candidate : nil
  end
  file_path = locate.call(@@pmux_log_path) || locate.call(@@pmux_old_log_path)
  return nil if file_path.nil?

  docs = full_parse(file_path)

  header = docs[0]
  if !header.nil? && !header[:job_started_at].nil?
    header[:job_started_at_msec] = header[:job_started_at].strftime("%Q")
  end

  footer = docs[2]
  if !footer.nil? && !footer["job_finished_at"].nil?
    footer["job_finished_at_msec"] = footer["job_finished_at"].strftime("%Q")
  end

  (0..2).map { |idx| docs[idx].nil? ? {} : docs[idx] }
end
pickup_job(src, src_ids, data)
click to toggle source
# File lib/pmux-logview/log_parser.rb, line 165
#
# Orders the collected jobs by start time and selects the ones to return.
#
# src::     { "jobs" => { job_id => job_hash } }
# src_ids:: array of [job_id, start_time_msec] pairs
# data::    request hash. "sort_order" == "desc" reverses the ascending
#           order. When neither "start_time_msec" nor "end_time_msec" is
#           given, the first page * nitems jobs are skipped, using
#           data["page"] and data["nitems"].
#
# Returns { "jobs" => { job_id => job_hash } } with keys inserted in the
# selected order.
def pickup_job(src, src_ids, data)
  parse_data = { "jobs" => {} }
  # to_i gives a numeric ordering and maps a missing start time (nil) to 0,
  # so one job without a start time cannot make the sort raise.
  ordered = src_ids.sort_by { |_job_id, start_msec| start_msec.to_i }.map { |job_id, _start_msec| job_id }
  ordered = ordered.reverse if data["sort_order"] == "desc"

  start_idx = 0
  if data["start_time_msec"].nil? && data["end_time_msec"].nil?
    start_idx = data["page"] * data["nitems"]
  end

  # A start index past the end yields nil from the slice, i.e. nothing.
  (ordered[start_idx..-1] || []).each do |job_id|
    parse_data["jobs"][job_id] = src["jobs"][job_id]
  end
  parse_data
end
save_cache(file_path, cache)
click to toggle source
# File lib/pmux-logview/log_parser.rb, line 136
#
# Writes the job cache to file_path in Marshal format under an exclusive
# lock, creating @jobs_cache_dir_path first when it does not exist.
#
# file_path:: path of the cache file (created if missing)
# cache::     object to serialise
#
# A failure while opening or writing the file is logged as a warning and
# not raised. A failure while creating the directory is raised.
def save_cache(file_path, cache)
  unless File.exist?(@jobs_cache_dir_path)
    FileUtils.mkdir_p(@jobs_cache_dir_path)
  end
  begin
    File.open(file_path, File::RDWR | File::CREAT, 0644) do |out|
      out.flock(File::LOCK_EX)
      # The file is opened without truncation so the lock can be taken
      # first; the content is overwritten from the start and the leftover
      # tail of a longer previous cache is cut off afterwards.
      out.rewind
      Marshal.dump(cache, out)
      out.flush
      out.truncate(out.pos)
      out.flock(File::LOCK_UN)
    end
  rescue
    @logger.warn("cannot save cache file (#{file_path})")
  end
end
set_jobs_cache_dir_path(jobs_cache_dir_path)
click to toggle source
# File lib/pmux-logview/log_parser.rb, line 326
#
# Replaces the directory in which the job cache files are kept.
#
# jobs_cache_dir_path:: new cache directory path
#
# Returns the assigned path.
def set_jobs_cache_dir_path(jobs_cache_dir_path)
  @jobs_cache_dir_path = jobs_cache_dir_path
end