class PreprocessingWorker
PreprocessingWorker defines the behaviour of the workers that download and preprocess feature data files. This is where the real processing takes place.
Public Instance Methods
closing_worker()
click to toggle source
called once, when the worker is about to be closed
# File lib/anncrsnp/preprocessing_worker.rb, line 77
# Shutdown hook, invoked once right before the worker is closed.
# This implementation performs no cleanup and returns nil.
def closing_worker
  nil
end
download_data(link, cols, header, format, temp)
click to toggle source
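Downloads the file referenced by link into the temp folder and returns its local path. Only http links are downloaded, and a file already present from a previous run is reused. For any other protocol nothing is downloaded and nil is returned.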
# File lib/anncrsnp/preprocessing_worker.rb, line 87
# Downloads the file referenced by +link+ into the +temp+ folder.
#
# Only http links are fetched (through get_http_data). If the target file
# already exists from a previous execution it is reused and not fetched
# again. Any other protocol -- including ftp, which is not implemented --
# is reported as unsupported in the worker log.
#
# @param link [String] source URL, e.g. "http://host/path/file.gz"
# @param cols [Object] unused here; kept for the caller's signature
# @param header [Object] unused here; kept for the caller's signature
# @param format [Object] unused here; kept for the caller's signature
# @param temp [String] folder where the downloaded file is stored
# @return [String, nil] local path of the file, or nil when the protocol
#   is not supported
def download_data(link, cols, header, format, temp)
  # Limit to 2 parts so a "://" inside the path does not truncate the URL.
  protocol, url = link.split('://', 2)
  unless protocol == 'http'
    # ftp previously fell through silently; warn like any other protocol.
    $WORKER_LOG.info "WARNING: protocol: #{protocol} in link: #{link} is not supported"
    return nil
  end

  temp_file = File.join(temp, url.split('/').last)
  if File.exist?(temp_file)
    $WORKER_LOG.info "Link was downloaded in a previous execution. Skipping download #{link}"
  else
    # Log before fetching so a failing download can be attributed to this link.
    $WORKER_LOG.info "Downloading #{link}"
    get_http_data(url, temp_file)
  end
  temp_file
end
extract_data(format, temp, folder)
click to toggle source
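Parses a downloaded temporary file with the parser selected for the given format, writing the results into folder. Only gzip-compressed (.gz) files are currently processed.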
# File lib/anncrsnp/preprocessing_worker.rb, line 115
# Parses the downloaded file +temp+ with the FileParser class registered for
# +format+; the parser is built with +folder+ and @options[:index_size].
#
# Only files whose name contains ".gz" are parsed (via get_gz). Other files
# are now reported with a warning instead of being skipped silently.
# The parser's write_compressed_data is always called to flush buffered data.
#
# @param format [Object] key passed to FileParser.select
# @param temp [String] path of the downloaded temporary file
# @param folder [String] output folder handed to the parser
# @return [Hash] always empty -- results go through the parser, not the
#   return value (kept for backward compatibility)
def extract_data(format, temp, folder)
  parser = FileParser.select(format).new(folder, @options[:index_size])
  $WORKER_LOG.info "Processing temporal file #{temp}"
  if temp.include?('.gz')
    get_gz(temp, parser)
  else
    $WORKER_LOG.info "WARNING: Temporal file #{temp} is not gzip compressed (.gz), its content will not be processed"
  end
  parser.write_compressed_data # Write remaining buffered data
  $WORKER_LOG.info "End processing temporal file #{temp}"
  {}
end
get_gz(temp, parser)
click to toggle source
# File lib/anncrsnp/preprocessing_worker.rb, line 131
# Streams every line of the gzip file +temp+ into +parser+ (line endings
# removed with chomp).
#
# Zlib::GzipReader stops at the end of the first gzip member, which silently
# truncated multi-member files (e.g. concatenated or block-gzipped data).
# This reads member after member: GzipReader#unused returns the bytes it
# buffered past the end of the current member, so the file position is moved
# back by that amount before the next member is opened.
# An empty or non-gzip file still raises Zlib::GzipFile::Error.
#
# @param temp [String] path of the gzip file
# @param parser [#parse] receives each line
# @return [nil]
def get_gz(temp, parser)
  File.open(temp, 'rb') do |file|
    loop do
      gz = Zlib::GzipReader.new(file)
      gz.each_line { |line| parser.parse(line.chomp) }
      leftover = gz.unused
      gz.finish # closes the reader but keeps +file+ open
      file.pos -= leftover.bytesize if leftover
      break if file.eof?
    end
  end
  nil
end
get_http_data(url, temp)
click to toggle source
# File lib/anncrsnp/preprocessing_worker.rb, line 105
# Fetches "http://<url>" and stores the body at +temp+.
#
# The body is streamed into "<temp>.part" and renamed to +temp+ only after
# the transfer completes. Previously +temp+ was created before the request,
# so a failed download left an empty or partial file that download_data
# later mistook for a completed one. On any error the partial file is
# removed and the error is re-raised.
#
# Uses URI.open (open-uri, which this worker already relies on): plain
# Kernel#open no longer opens URLs on Ruby >= 3.0.
#
# @param url [String] URL without the scheme, e.g. "host/path/file.gz"
# @param temp [String] destination file path
# @return [Integer] number of bytes written
def get_http_data(url, temp)
  partial = "#{temp}.part"
  bytes = nil
  URI.open("http://#{url}", 'rb') do |remote|
    # copy_stream avoids loading the whole response into memory at once
    bytes = File.open(partial, 'wb') { |local| IO.copy_stream(remote, local) }
  end
  File.rename(partial, temp)
  bytes
rescue StandardError
  File.delete(partial) if partial && File.exist?(partial)
  raise
end
process_object(objs)
click to toggle source
process_object
method is called for each batch of received objects. Note that objs is always an array; iterate over it if you need to process its elements independently.
The value returned here will be received by the work_received method at your worker_manager subclass.
# File lib/anncrsnp/preprocessing_worker.rb, line 49
# Processes a batch of download jobs sent by the manager, timed with
# Benchmark under the label 'Prep'.
#
# Each element of +objs+ is destructured as [link, feature, cols, header,
# format]. The file is downloaded into @options[:temp]/<feature>. Unless
# @options[:downloaded_only] is set, it is then parsed into
# @options[:preprocessed_data]/<feature>.
#
# download_data returns nil for unsupported protocols; that case used to
# crash the whole batch with File.exist?(nil) and is now logged and skipped.
#
# @param objs [Array<Array>] jobs to process
# @return [Array] always empty -- nothing is sent back to the manager
def process_object(objs)
  Benchmark.bm do |x|
    x.report('Prep') do
      FileParser.load
      objs.each do |link, feature, cols, header, format| # iterate over all objects received
        $WORKER_LOG.info "Processing link: #{feature}, #{format}, #{link}"
        ft_folder = File.join(@options[:preprocessed_data], feature)
        ft_temp_folder = File.join(@options[:temp], feature)
        temp_file = download_data(link, cols, header, format, ft_temp_folder)
        if @options[:downloaded_only]
          $WORKER_LOG.info "Download only mode, skipping processing temp files"
        elsif temp_file && File.exist?(temp_file)
          extract_data(format, temp_file, ft_folder)
        else
          $WORKER_LOG.info "WARNING: Temporal file #{temp_file} have not been downloaded for feature #{feature} so it will be skipped"
        end
      end
    end
  end
  []
end
receive_initial_config(parameters)
click to toggle source
receive_initial_config
is called only once just after the first connection, when initial parameters are received from manager
# File lib/anncrsnp/preprocessing_worker.rb, line 31
# Stores the configuration sent by the manager right after the first
# connection. process_object and extract_data later read it through
# @options; the arrival is recorded in the worker log.
def receive_initial_config(parameters)
  @options = parameters
  $WORKER_LOG.info('Params received')
end
starting_worker()
click to toggle source
starting_worker
method is called one time at initialization and allows you to initialize your variables
# File lib/anncrsnp/preprocessing_worker.rb, line 20
# Initialization hook, run a single time when the worker starts.
# It only records the start in the worker log.
def starting_worker
  banner = 'Starting a worker'
  $WORKER_LOG.info(banner)
end