class PreprocessingWorker

PreprocessingWorker defines the behaviour of the workers. This is where the real processing takes place.

Public Instance Methods

closing_worker() click to toggle source

Called once, when the worker is about to be closed.

# File lib/anncrsnp/preprocessing_worker.rb, line 77
# Shutdown hook, called once when the worker is about to be closed.
# This worker performs no cleanup here.
#
# @return [nil]
def closing_worker
  nil
end
download_data(link, cols, header, format, temp) click to toggle source

Download protocols

# File lib/anncrsnp/preprocessing_worker.rb, line 87
# Resolves +link+ to a local file inside +temp+, downloading it when it is
# not already there.
#
# Only links whose protocol is 'http' are downloaded. Every other protocol
# (including 'ftp', for which no download is implemented) is reported in the
# worker log and skipped.
#
# @param link [String] full link, e.g. 'http://host/path/file.gz'
# @param cols [Object] not used by this method
# @param header [Object] not used by this method
# @param format [Object] not used by this method
# @param temp [String] folder in which the downloaded file is stored
# @return [String, nil] path of the local file, or nil when nothing could be
#   downloaded for this link
def download_data(link, cols, header, format, temp)
  # Limit the split so that a later '://' inside the URL stays in the URL part.
  protocol, url = link.split('://', 2)

  unless protocol == 'http'
    $WORKER_LOG.info "WARNING: protocol: #{protocol} in link: #{link} is not supported"
    return nil
  end

  # The local file is named after the last path segment of the URL.
  file_name = url.to_s.split('/').last
  if file_name.nil?
    $WORKER_LOG.info "WARNING: link: #{link} has no file name so it will be skipped"
    return nil
  end

  temp_file = File.join(temp, file_name)
  if File.exist?(temp_file)
    $WORKER_LOG.info "Link was downloaded in a previous execution. Skipping download #{link}"
  else
    # Log before starting so a failing download is still traceable in the log.
    $WORKER_LOG.info "Downloading #{link}"
    get_http_data(url, temp_file)
  end
  temp_file
end
extract_data(format, temp, folder) click to toggle source

File decompression methods

# File lib/anncrsnp/preprocessing_worker.rb, line 115
# Feeds a downloaded temporary file through the parser selected for +format+.
#
# The parser class comes from FileParser.select and is built with the output
# folder and the :index_size option. Only files whose path contains '.gz' are
# read (through get_gz); any other file is not read at all. The parser is
# always asked to write out whatever it still has buffered.
#
# @param format [Object] format identifier handed to FileParser.select
# @param temp [String] path of the temporary (downloaded) file
# @param folder [String] output folder handed to the parser
# @return [Hash] always an empty hash
def extract_data(format, temp, folder)
  parser = FileParser.select(format).new(folder, @options[:index_size])

  $WORKER_LOG.info "Processing temporal file #{temp}"
  get_gz(temp, parser) if temp.include?('.gz')
  parser.write_compressed_data # flush data still held by the parser
  $WORKER_LOG.info "End processing temporal file #{temp}"

  {}
end
get_gz(temp, parser) click to toggle source
# File lib/anncrsnp/preprocessing_worker.rb, line 131
# Streams a gzip-compressed file line by line into +parser+.
#
# Each line is passed to parser.parse with its trailing line terminator
# removed.
#
# NOTE(review): Zlib::GzipReader presumably stops at the end of the first gzip
# member, so multi-member archives may be read only partially - confirm
# against the input files in use.
#
# @param temp [String] path of the gzip file
# @param parser [#parse] object receiving every decompressed line
def get_gz(temp, parser)
  Zlib::GzipReader.open(temp) do |reader|
    reader.each_line { |raw_line| parser.parse(raw_line.chomp) }
  end
end
get_http_data(url, temp) click to toggle source
# File lib/anncrsnp/preprocessing_worker.rb, line 105
# Downloads "http://<url>" into the file +temp+.
#
# The body is first written to "<temp>.part" and renamed to +temp+ only after
# the transfer has completed. A failed or interrupted download therefore never
# leaves a truncated file at +temp+ that a later run could mistake for a
# finished download.
#
# The remote resource is opened through URI#open (provided by open-uri)
# rather than Kernel#open, which no longer accepts URLs on Ruby 3.0 and later.
#
# @param url [String] address without the leading 'http://'
# @param temp [String] destination path of the downloaded file
# @return [Integer] number of bytes written
# @raise [StandardError] any error raised while connecting, reading or writing
def get_http_data(url, temp)
  partial = "#{temp}.part"
  bytes = File.open(partial, 'wb') do |saved_file|
    URI.parse("http://#{url}").open('rb') do |read_file|
      # Stream the body instead of loading all of it into memory.
      IO.copy_stream(read_file, saved_file)
    end
  end
  File.rename(partial, temp)
  bytes
ensure
  # After a successful rename the partial file no longer exists.
  File.delete(partial) if partial && File.exist?(partial)
end
process_object(objs) click to toggle source

The process_object method is called for each received object. Be aware that objs is always an array, so you must iterate over it if you need to process each element independently.

The value returned here will be received by the work_received method at your worker_manager subclass.

# File lib/anncrsnp/preprocessing_worker.rb, line 49
# Downloads and, unless running in download-only mode, preprocesses every
# link received from the manager. The whole batch is timed with a
# Benchmark.bm report labelled 'Prep'.
#
# For each entry the file is fetched with download_data into
# <@options[:temp]>/<feature> and then handed to extract_data, which receives
# <@options[:preprocessed_data]>/<feature> as output folder. Entries for which
# no local file is available (download_data returned nil, or the file does not
# exist) are reported in the worker log and skipped, so one bad link does not
# abort the rest of the batch.
#
# @param objs [Array<Array>] entries of the form
#   [link, feature, cols, header, format]
# @return [Array] always an empty array (nothing is sent back to the manager)
def process_object(objs)
  Benchmark.bm do |x|
    x.report('Prep') do
      FileParser.load
      objs.each do |link, feature, cols, header, format| # iterate over all objects received
        $WORKER_LOG.info "Processing link: #{feature}, #{format}, #{link}"
        ft_folder = File.join(@options[:preprocessed_data], feature)
        ft_temp_folder = File.join(@options[:temp], feature)
        temp_file = download_data(link, cols, header, format, ft_temp_folder)

        if @options[:downloaded_only]
          $WORKER_LOG.info "Download only mode, skipping processing temp files"
        elsif temp_file && File.exist?(temp_file)
          # temp_file is nil when download_data could not handle the link;
          # File.exist?(nil) would raise TypeError, hence the explicit check.
          extract_data(format, temp_file, ft_folder)
        else
          $WORKER_LOG.info "WARNING: Temporal file #{temp_file} have not been downloaded for feature #{feature} so it will be skipped"
        end
      end
    end
  end
  # Nothing is returned to the manager.
  []
end
receive_initial_config(parameters) click to toggle source

receive_initial_config is called only once just after the first connection, when initial parameters are received from manager

# File lib/anncrsnp/preprocessing_worker.rb, line 31
# Stores the initial parameters sent by the manager. Called only once, just
# after the first connection.
#
# The parameters are kept in @options, from which the other methods of this
# worker read their settings.
#
# @param parameters [Object] configuration received from the manager
def receive_initial_config(parameters)
  # Keep the manager's configuration for later use.
  @options = parameters

  # The worker log is available from here on.
  $WORKER_LOG.info("Params received")
end
starting_worker() click to toggle source

The starting_worker method is called once at initialization and allows you to initialize your variables.

# File lib/anncrsnp/preprocessing_worker.rb, line 20
# Start-up hook, called once at initialization. This worker only records the
# start in the worker log; no variables are initialized here.
def starting_worker
  $WORKER_LOG.info("Starting a worker")
end