class PositionSelectionManager
The MyWorkerManager class implements the methods used to send data to workers and to receive data from them.
Public Class Methods
# File lib/anncrsnp/position_selection_manager.rb, line 168
# Splits the scored positions into a training set and a validation set and
# writes them to 'training_set.csv' and 'validation_set.csv' inside path_folder.
#
# path_folder:: folder where both CSV files are written.
# scores::      hash mapping each genomic feature to an array of scores,
#               indexed by position.
# tags::        array with one tag per position. It is modified in place:
#               every -1 is replaced by 0.
#
# Each record holds the scores of one position (in scores.keys order) followed
# by its tag. The first row of each file is a header: number of records,
# number of genomic features and then the distinct tags.
def self.create_positions_sets_for_tensorflow(path_folder, scores, tags)
  validation_set_proportion = 0.2
  positions_number = tags.length
  validation_set_length = (positions_number * validation_set_proportion).to_i

  # Draw the validation positions uniformly and without repetition among ALL
  # 0-based indexes, so the last position is eligible too. A hash gives
  # constant-time membership checks while the records are distributed below.
  validation_set_positions = {}
  (0...positions_number).to_a.sample(validation_set_length).each do |position|
    validation_set_positions[position] = true
  end

  # tensorflow needs non-negative integers as tags, so the -1 tag used by the
  # AUC computation is mapped to 0.
  tags.map! { |tag| tag == -1 ? 0 : tag }

  genomic_features = scores.keys
  training_set = []
  validation_set = []
  tags.each_with_index do |tag, n|
    record = genomic_features.map { |gf| scores[gf][n] }
    record << tag
    # Send the record to its corresponding set
    if validation_set_positions.key?(n)
      validation_set << record
    else
      training_set << record
    end
  end

  tag_names = tags.uniq # TODO: improve to ensure exact correspondence
  training_set.unshift([training_set.length, genomic_features.length].concat(tag_names)) # set headers
  validation_set.unshift([validation_set.length, genomic_features.length].concat(tag_names)) # set headers
  write_set(training_set, File.join(path_folder, 'training_set.csv'))
  write_set(validation_set, File.join(path_folder, 'validation_set.csv'))
end
end_work_manager
is executed at the end, when all processing is done. You can use it to close files opened in init_work_manager.
# File lib/anncrsnp/position_selection_manager.rb, line 29
# Builds the final score table from the results gathered in @@all_data and
# writes it to the 'all_data' file inside @@options[:selected_positions_folder].
#
# When @@groups is not empty it also builds a tag per position (-1 when the
# group is 0, 1 otherwise), writes the 'AUCs' file unless @@options[:no_auc]
# is set, and creates the training files for tensorflow.
def self.end_work_manager
  positions_ids = []
  scores = {}

  # Create genomic features table
  $LOG.info "Create general scores table"
  @@all_data.each do |data, positions_info|
    data_scores = positions_info.flat_map do |chr, position_info|
      position_info.map { |position, score| ["#{chr}_#{position}", score] }
    end
    data_scores.sort! { |left, right| left.first <=> right.first }
    scores[data] = data_scores.map(&:last)
    # The position ids are taken from the first data type processed
    positions_ids = data_scores.map(&:first) if positions_ids.empty?
  end

  unless @@groups.empty?
    # Create the tag vector, in the same order as the scores table
    tags = positions_ids.map { |id| @@groups[id] == 0 ? -1 : 1 }

    unless @@options[:no_auc]
      $LOG.info "Calculating AUC for each genomic feature"
      aucs = get_aucs(tags, scores) # Area under curve for each genomic feature
      aucs_path = File.join(@@options[:selected_positions_folder], 'AUCs')
      File.open(aucs_path, 'w') do |f|
        aucs.each { |data_type, auc| f.puts "#{data_type}\t#{auc.join("\t")}" }
      end
    end

    $LOG.info "Creating training files for tensorflow"
    create_positions_sets_for_tensorflow(@@options[:selected_positions_folder], scores, tags)
  end

  # Final genomic feature scores table for the gold standard
  data_types = scores.keys
  all_data_path = File.join(@@options[:selected_positions_folder], 'all_data')
  File.open(all_data_path, 'w') do |f|
    f.puts ['position'].concat(data_types).join("\t")
    positions_ids.each_with_index do |id, i|
      row = [id] + data_types.map { |dt| scores[dt][i] }
      f.puts row.join("\t")
    end
  end
end
# File lib/anncrsnp/position_selection_manager.rb, line 155
# Returns a hash that maps each data type in scores to a two-element array:
# the value of ROC.auc and the result of GChart.scatter(...).to_url
# (presumably a chart URL for the ROC curve points; confirm against GChart).
#
# tags::   array of tags, indexed like each score array.
# scores:: hash mapping each data type to its array of scores.
def self.get_aucs(tags, scores)
  scores.each_with_object({}) do |(data_type, feature_scores), aucs|
    # Pair every score with the tag found at the same index
    matrix = feature_scores.each_with_index.map { |score, i| [score, tags[i]] }
    pts = ROC.curve_points(matrix)
    auc = ROC.auc(matrix)
    x_values = pts.map { |pt| pt[0] }
    y_values = pts.map { |pt| pt[1] }
    chart_url = GChart.scatter(:data => [x_values, y_values]).to_url
    aucs[data_type] = [auc, chart_url]
  end
end
init_work_manager
is executed at the start, prior to any processing. You can use init_work_manager
to initialize global variables, open files, etc… Note that an instance of MyWorkerManager will be created for each worker connection, and thus, all global variables here should be class variables (starting with @@)
# File lib/anncrsnp/position_selection_manager.rb, line 18
# Initializes the class-level state shared by every manager instance.
#
# options:: hash of options. This method reads :selected_positions (path given
#           to load_selected_positions) and :preprocessed_data (folder that
#           contains the 'active_data' file).
def self.init_work_manager(options)
  @@options = options
  @@positions, @@groups = load_selected_positions(@@options[:selected_positions])
  active_data_path = File.join(@@options[:preprocessed_data], 'active_data')
  # File.readlines opens, reads and closes the file, so no handle is left open
  @@active_data = File.readlines(active_data_path).map(&:chomp)
  @@used_data = 0
  @@used_position = 0
  @@all_data = {}
end
CUSTOM ADDITIONAL METHODS
# File lib/anncrsnp/position_selection_manager.rb, line 130
# Loads a tab-separated file with one position per line:
# chromosome, position and an optional group.
#
# file_path:: path of the file to read.
#
# Returns two hashes: the first maps each chromosome to its sorted array of
# unique integer positions; the second maps "chr_position" to the integer
# group, only for the lines that have a group column.
def self.load_selected_positions(file_path)
  selected_positions = {}
  groups = {}
  # File.foreach closes the file once the block has finished
  File.foreach(file_path) do |line|
    line.chomp!
    # A blank line would otherwise register position 0 under a nil chromosome
    next if line.empty?

    chr, position, group = line.split("\t")
    groups["#{chr}_#{position}"] = group.to_i unless group.nil?
    (selected_positions[chr] ||= []) << position.to_i
  end
  # Remove duplicates once per chromosome instead of after every insertion
  selected_positions.each_value do |positions|
    positions.uniq!
    positions.sort!
  end
  [selected_positions, groups]
end
# File lib/anncrsnp/position_selection_manager.rb, line 207
# Writes every record of set to the file at path, one line per record with
# its fields joined by commas.
def self.write_set(set, path)
  File.open(path, 'w') do |file|
    set.each { |row| file.puts(row.join(',')) }
  end
end
Public Instance Methods
next_work
method is called every time a worker needs new work. Here you can read data from disk. This method must return the work data, or nil if no more data is available.
# File lib/anncrsnp/position_selection_manager.rb, line 89
# Returns the next work unit, [data type, chromosome, positions of that
# chromosome], or nil when every active data type has been dispatched.
#
# Chromosomes are walked in @@positions key order; once the last one has been
# handed out, the cursor moves on to the next active data type.
#
# If an error is raised while building the work unit, it is printed and nil is
# returned, so the exception object is never handed to a worker as work.
def next_work
  work = nil
  begin
    if @@used_data < @@active_data.length
      chr = @@positions.keys[@@used_position]
      work = [@@active_data[@@used_data], chr, @@positions[chr]]
      @@used_position += 1
      if @@used_position >= @@positions.length
        @@used_data += 1
        @@used_position = 0
      end
    end
    # Otherwise work stays nil: the worker disconnect signal
  rescue StandardError => error
    # Only StandardError is rescued so signals and exit requests still propagate
    puts error.message
    puts error.backtrace
    work = nil
  end
  work
end
work_received
is executed each time a worker has finished a job. Here you can write results down to disk, perform some aggregated statistics, etc…
# File lib/anncrsnp/position_selection_manager.rb, line 115
# Stores the results sent back by a worker in @@all_data.
#
# results:: pairs of data type and positions info. The first positions info
#           received for a data type is stored as is; later ones are merged
#           into what is already stored.
def work_received(results)
  results.each do |data, positions_info|
    previous = @@all_data[data]
    @@all_data[data] = previous.nil? ? positions_info : previous.merge(positions_info)
  end
end
worker_initial_config
is used to send initial parameters to workers. The method is executed once per worker.
# File lib/anncrsnp/position_selection_manager.rb, line 82
# Returns the options stored in @@options as the initial configuration
# for a worker.
def worker_initial_config
  @@options
end