class PositionSelectionWorker
MyWorker defines the behaviour of workers. Here is where the real processing takes place
Public Instance Methods
closing_worker()
click to toggle source
called once, when the worker is about to be closed
# File lib/anncrsnp/position_selection_worker.rb, line 77 def closing_worker end
get_info_to_search(objs)
click to toggle source
WORKER CUSTOM METHODS
# File lib/anncrsnp/position_selection_worker.rb, line 85 def get_info_to_search(objs) packs = {} datas = [] objs.each do |data, chr, positions| # Analyse which chromosomes and packs must be loaded datas << data if !datas.include?(data) positions.each do |position| pack = position/@options[:index_size] pack = pack * @options[:index_size] #puts "#{position} ==> #{pack}" query_chr = packs[chr] if query_chr.nil? packs[chr] = { pack => [position]} else query_pack = query_chr[pack] if query_pack.nil? query_chr[pack] = [position] else query_pack << position end end end end return packs, datas end
get_scores(chr_data, positions)
click to toggle source
# File lib/anncrsnp/position_selection_worker.rb, line 110 def get_scores(chr_data, positions) positions_scores = [] # Remove positions out of existing coordinates lower_limit = chr_data.first.first upper_limit = chr_data.last.first positions_scores.concat(positions.select{|pos| pos < lower_limit}.map{|pos| [pos, 0]}) #At the beginning filtered_positions = positions.select{|pos| pos >= lower_limit && pos <= upper_limit } #-------------------------------------------------------------------------------------------------- if !filtered_positions.empty? current_position = filtered_positions.shift chr_data.each do |coord, score| if coord == current_position positions_scores << [current_position, score] break if filtered_positions.empty? current_position = filtered_positions.shift elsif coord > current_position # We have encountered a gap and current position is in it while coord > current_position # drop positions within the gap positions_scores << [current_position, 0] break if filtered_positions.empty? current_position = filtered_positions.shift end break if filtered_positions.empty? end end end positions_scores.concat(positions.select{|pos| pos > upper_limit}.map{|pos| [pos, 0]}) # At the end return positions_scores end
process_object(objs)
click to toggle source
process_object
method is called for each received object. Be aware that objs is always an array, and you must iterate over it if you need to process it independently
The value returned here will be received by the work_received method at your worker_manager subclass.
# File lib/anncrsnp/position_selection_worker.rb, line 45 def process_object(objs) all_data = nil Benchmark.bm do |x| x.report('PosS'){ packs, datas = get_info_to_search(objs) all_data = {} datas.each do |data| selected_scores = {} packs.each do |chr, ps| scores = [] ps.each do |pack, positions| info_path = File.join(@options[:preprocessed_data], data, "#{chr}_#{pack}.gz") #puts info_path if File.exists?(info_path) chr_data = [] Zlib::GzipReader.open(info_path) {|gz| chr_data = JSON.parse(gz.read)} scores.concat(get_scores(chr_data, positions)) end end selected_scores[chr] = scores end all_data[data] = selected_scores end # return objs back to manager } end return all_data end
receive_initial_config(parameters)
click to toggle source
receive_initial_config
is called only once just after the first connection, when initial parameters are received from manager
# File lib/anncrsnp/position_selection_worker.rb, line 27 def receive_initial_config(parameters) @options = parameters # Reads the parameters # You can use worker logs at any time in this way: $WORKER_LOG.info "Params received" # save received parameters, if any # @params = parameters end
starting_worker()
click to toggle source
starting_worker
method is called one time at initialization and allows you to initialize your variables
# File lib/anncrsnp/position_selection_worker.rb, line 16 def starting_worker # You can use worker logs at any time in this way: $WORKER_LOG.info "Starting a worker" end