class PositionSelectionWorker

MyWorker defines the behaviour of workers. Here is where the real processing takes place

Public Instance Methods

closing_worker() click to toggle source

called once, when the worker is about to be closed

# File lib/anncrsnp/position_selection_worker.rb, line 77
def closing_worker

end
get_scores(chr_data, positions) click to toggle source
# File lib/anncrsnp/position_selection_worker.rb, line 110
def get_scores(chr_data, positions)
  positions_scores = []
  # Remove positions out of existing coordinates
  lower_limit = chr_data.first.first
  upper_limit = chr_data.last.first
  positions_scores.concat(positions.select{|pos| pos < lower_limit}.map{|pos| [pos, 0]}) #At the beginning
  filtered_positions = positions.select{|pos| pos >= lower_limit && pos <= upper_limit }
  #--------------------------------------------------------------------------------------------------
  if !filtered_positions.empty?
    current_position = filtered_positions.shift
    chr_data.each do |coord, score|
      if coord == current_position
        positions_scores << [current_position,  score]
        break if filtered_positions.empty?
        current_position = filtered_positions.shift
      elsif coord > current_position # We have encountered a gap and current position is in it
        while coord > current_position # drop positions within the gap
          positions_scores << [current_position,  0]
          break if filtered_positions.empty?
          current_position = filtered_positions.shift
        end
        break if filtered_positions.empty?
      end
    end
  end
  
  positions_scores.concat(positions.select{|pos| pos > upper_limit}.map{|pos| [pos, 0]}) # At the end

  return positions_scores
end
process_object(objs) click to toggle source

process_object method is called for each received object. Be aware that objs is always an array, and you must iterate over it if you need to process it independently

The value returned here will be received by the work_received method at your worker_manager subclass.

# File lib/anncrsnp/position_selection_worker.rb, line 45
def process_object(objs)
  all_data = nil
  Benchmark.bm do |x|
  x.report('PosS'){

    packs, datas = get_info_to_search(objs) 
    all_data = {}
    datas.each do |data|
      selected_scores = {}
      packs.each do |chr, ps|
        scores = []
        ps.each do |pack, positions|
          info_path = File.join(@options[:preprocessed_data], data, "#{chr}_#{pack}.gz")
          #puts info_path
          if File.exists?(info_path)
            chr_data = []
            Zlib::GzipReader.open(info_path) {|gz| chr_data = JSON.parse(gz.read)}
            scores.concat(get_scores(chr_data, positions)) 
          end
        end
        selected_scores[chr] = scores 
      end
      all_data[data] = selected_scores
    end
    # return objs back to manager

  }
  end
  return all_data
end
receive_initial_config(parameters) click to toggle source

receive_initial_config is called only once just after the first connection, when initial parameters are received from manager

# File lib/anncrsnp/position_selection_worker.rb, line 27
def receive_initial_config(parameters)
  @options = parameters
  # Reads the parameters

  # You can use worker logs at any time in this way:
  $WORKER_LOG.info "Params received"

  # save received parameters, if any
  # @params = parameters
end
starting_worker() click to toggle source

starting_worker method is called one time at initialization and allows you to initialize your variables

# File lib/anncrsnp/position_selection_worker.rb, line 16
def starting_worker

  # You can use worker logs at any time in this way:
  $WORKER_LOG.info "Starting a worker"

end