class PositionSelectionManager

The MyWorkerManager class implements the methods used to send data to the workers and to receive data from them.

Public Class Methods

create_positions_sets_for_tensorflow(path_folder, scores, tags) click to toggle source
# File lib/anncrsnp/position_selection_manager.rb, line 168
# Splits the scored positions into a training set and a validation set and
# writes them to 'training_set.csv' and 'validation_set.csv' inside path_folder.
#
# path_folder:: folder where both CSV files are written
# scores:: hash of genomic feature => array of scores, indexed like tags
# tags:: array with one tag per position; it is modified in place (-1 becomes 0)
#
# Every record is the list of feature scores of one position followed by its
# tag. The first row of each file is a header:
# [number of records, number of features, tag names...].
# Returns whatever the last write_set call returns.
def self.create_positions_sets_for_tensorflow(path_folder, scores, tags)
  validation_set_proportion = 0.2
  positions_number = tags.length
  validation_set_length = (positions_number * validation_set_proportion).to_i
  # Draw distinct 0-based positions for the validation set. Sampling over the
  # whole 0...positions_number range keeps the last position eligible, and the
  # flag array gives a constant-time membership test in the loop below.
  in_validation_set = Array.new(positions_number, false)
  (0...positions_number).to_a.sample(validation_set_length).each do |position|
    in_validation_set[position] = true
  end
  # tensorflow needs non-negative integers as tags, so the -1 tag used in the
  # AUC operation is replaced by 0 (in place, as callers may rely on it).
  tags.map! { |t| t == -1 ? 0 : t }
  genomic_features = scores.keys
  training_set = []
  validation_set = []
  tags.each_with_index do |tag, n|
    record = genomic_features.map { |gf| scores[gf][n] } # Create record position
    record << tag
    # Send record to the corresponding set
    (in_validation_set[n] ? validation_set : training_set) << record
  end
  tag_names = tags.uniq #TODO: improve to ensure exact correspondence
  training_set.unshift([training_set.length, genomic_features.length].concat(tag_names)) # set headers
  validation_set.unshift([validation_set.length, genomic_features.length].concat(tag_names)) # set headers
  write_set(training_set, File.join(path_folder, 'training_set.csv'))
  write_set(validation_set, File.join(path_folder, 'validation_set.csv'))
end
end_work_manager() click to toggle source

end_work_manager is executed at the end, once all the processing is done. You can use it to close files opened in init_work_manager.

# File lib/anncrsnp/position_selection_manager.rb, line 29
# Runs once all the work is done and writes the final result files into
# @@options[:selected_positions_folder].
#
# For every data type stored in @@all_data the scores are flattened into
# ["chr_position", score] pairs and ordered by that id. The ids of the first
# data type processed are used as the row ids for all of them.
#
# When @@groups is not empty, a tag vector is built (group 0 => -1, any other
# value, including a missing group, => 1), the 'AUCs' file is written unless
# @@options[:no_auc] is set, and the tensorflow sets are created.
# The 'all_data' table (one row per position, one column per data type) is
# always written.
def self.end_work_manager
  position_ids = []
  scores = {} # data type => scores ordered by position id
  $LOG.info "Create general scores table"
  @@all_data.each do |data, positions_info|
    id_score_pairs = positions_info.flat_map do |chr, position_info|
      position_info.map { |position, score| ["#{chr}_#{position}", score] }
    end
    id_score_pairs.sort! { |left, right| left[0] <=> right[0] }
    scores[data] = id_score_pairs.map { |pair| pair[1] }
    position_ids = id_score_pairs.map { |pair| pair[0] } if position_ids.empty?
  end

  unless @@groups.empty?
    # Tag vector aligned with the rows of the scores table
    tags = position_ids.map { |id| @@groups[id] == 0 ? -1 : 1 }
    unless @@options[:no_auc]
      $LOG.info "Calculating AUC for each genomic feature"
      aucs = get_aucs(tags, scores) # Area under the curve for each genomic feature
      auc_path = File.join(@@options[:selected_positions_folder], 'AUCs')
      File.open(auc_path, 'w') do |f|
        aucs.each { |data_type, auc| f.puts "#{data_type}\t#{auc.join("\t")}" }
      end
    end
    $LOG.info "Creating training files for tensorflow"
    create_positions_sets_for_tensorflow(@@options[:selected_positions_folder], scores, tags)
  end

  data_types = scores.keys
  table_path = File.join(@@options[:selected_positions_folder], 'all_data')
  # Final genomic feature scores table for the gold standard
  File.open(table_path, 'w') do |f|
    f.puts (['position'] + data_types).join("\t")
    position_ids.each_with_index do |id, i|
      row = [id] + data_types.map { |dt| scores[dt][i] }
      f.puts row.join("\t")
    end
  end
end
get_aucs(tags, scores) click to toggle source
# File lib/anncrsnp/position_selection_manager.rb, line 155
# Computes, for every data type, the AUC of its scores against the tags.
#
# tags:: array of tags, indexed like each scores array
# scores:: hash of data type => array of scores
#
# Returns a hash of data type => [ROC.auc result, URL of a GChart scatter
# chart built from the first two coordinates of the ROC curve points].
def self.get_aucs(tags, scores)
  scores.each_with_object({}) do |(data_type, feature_scores), aucs|
    # Pair every score with the tag found at the same index
    matrix = feature_scores.each_with_index.map { |score, i| [score, tags[i]] }
    pts = ROC.curve_points(matrix)
    auc = ROC.auc(matrix)
    chart = GChart.scatter(:data => [pts.map { |pt| pt[0] }, pts.map { |pt| pt[1] }])
    aucs[data_type] = [auc, chart.to_url]
  end
end
init_work_manager(options) click to toggle source

init_work_manager is executed at the start, prior to any processing. You can use init_work_manager to initialize global variables, open files, etc… Note that an instance of MyWorkerManager will be created for each worker connection, and thus, all global variables here should be class variables (starting with @@)

# File lib/anncrsnp/position_selection_manager.rb, line 18
# Initializes the state shared by all the manager instances before any work
# is sent.
#
# options:: hash of options; this method reads :selected_positions (file
#           given to load_selected_positions) and :preprocessed_data (folder
#           that contains the 'active_data' file)
#
# Sets @@positions and @@groups from the selected positions file,
# @@active_data with the chomped lines of 'active_data', resets the
# @@used_data / @@used_position cursors and empties @@all_data.
def self.init_work_manager(options)
  @@options = options
  @@positions, @@groups = load_selected_positions(@@options[:selected_positions])
  active_data_path = File.join(@@options[:preprocessed_data], 'active_data')
  # File.readlines opens, reads and closes the file, so no handle is leaked
  @@active_data = File.readlines(active_data_path).map { |item| item.chomp }
  @@used_data = 0
  @@used_position = 0
  @@all_data = {}
end
load_selected_positions(file_path) click to toggle source

CUSTOM ADDITIONAL METHODS

# File lib/anncrsnp/position_selection_manager.rb, line 130
# Reads the tab-separated file of selected positions.
#
# Each line holds a chromosome, a position and, optionally, a group.
# Blank lines are ignored.
#
# file_path:: path of the file to read
#
# Returns two values:
# * a hash of chromosome => sorted array of unique integer positions
# * a hash of "chr_position" => integer group, only for lines with a group
#   (the key uses the position text exactly as written in the file)
def self.load_selected_positions(file_path)
  selected_positions = {}
  groups = {}
  # File.foreach closes the file when the iteration finishes
  File.foreach(file_path) do |line|
    line = line.chomp
    next if line.strip.empty? # a blank line would register a nil chromosome
    chr, position, group = line.split("\t")
    groups["#{chr}_#{position}"] = group.to_i unless group.nil?
    (selected_positions[chr] ||= []) << position.to_i
  end
  # Remove duplicates once per chromosome instead of once per line
  selected_positions.each_value do |positions|
    positions.uniq!
    positions.sort!
  end
  return selected_positions, groups
end
write_set(set, path) click to toggle source
# File lib/anncrsnp/position_selection_manager.rb, line 207
# Writes a set of records to path, one record per line, with the fields of
# each record joined by commas. An existing file is overwritten.
#
# set:: array of records (each record is an array of fields)
# path:: path of the file to write
def self.write_set(set, path)
  File.open(path, 'w') do |out|
    set.each { |row| out.puts(row.join(',')) }
  end
end

Public Instance Methods

next_work() click to toggle source

The next_work method is called every time a worker needs new work. Here you can read data from disk. This method must return the work data, or nil if no more data is available.

# File lib/anncrsnp/position_selection_manager.rb, line 89
# Hands out the next unit of work: one chromosome of one data type.
#
# Returns [data type, chromosome, positions of that chromosome], or nil when
# every data type in @@active_data has been sent (nil tells the worker to
# disconnect). The chromosomes of @@positions are walked first; after the
# last one the cursor moves on to the next data type.
#
# If building the work fails, the error is printed and nil is returned, so
# an exception object is never handed to a worker as if it were work.
def next_work
  return nil if @@used_data >= @@active_data.length # worker signal disconnect

  chr = @@positions.keys[@@used_position]
  work = [@@active_data[@@used_data], chr, @@positions[chr]]
  @@used_position += 1
  if @@used_position >= @@positions.length
    @@used_data += 1
    @@used_position = 0
  end
  work
rescue StandardError => error
  # StandardError only: interrupts and exit requests must not be swallowed
  puts error.message
  puts error.backtrace
  nil
end
work_received(results) click to toggle source

work_received is executed each time a worker has finished a job. Here you can write results down to disk, perform some aggregated statistics, etc…

# File lib/anncrsnp/position_selection_manager.rb, line 115
# Stores the results sent back by a worker into @@all_data.
#
# results:: enumerable of data type => positions info pairs
#
# The first positions info received for a data type is stored as is; later
# ones are combined with it through merge, which builds a new hash.
def work_received(results)
  results.each do |data, positions_info|
    accumulated = @@all_data[data]
    @@all_data[data] = accumulated.nil? ? positions_info : accumulated.merge(positions_info)
  end
end
worker_initial_config() click to toggle source

worker_initial_config is used to send initial parameters to the workers. The method is executed once per worker.

# File lib/anncrsnp/position_selection_manager.rb, line 82
# Initial configuration sent to a worker: the options hash stored in
# @@options by init_work_manager.
def worker_initial_config
  @@options
end