class Seqtrim
SEQTRIM_VERSION_REVISION=27 SEQTRIM_VERSION_STAGE = 'b' $SEQTRIM_VERSION = “2.0.0#{SEQTRIM_VERSION_STAGE}#{SEQTRIM_VERSION_REVISION}”
Public Class Methods
exit_status()
click to toggle source
# File lib/seqtrimnext/classes/seqtrim.rb, line 20 def self.exit_status return SeqtrimWorkManager.exit_status end
new(options)
click to toggle source
# File lib/seqtrimnext/classes/seqtrim.rb, line 157 def initialize(options) # ,options[:fasta],options[:qual],,,, params_path=options[:template] ip=options[:server_ip] port=options[:port] workers=options[:workers] only_workers=options[:only_workers] chunk_size = options[:chunk_size] use_json = options[:json] # check for checkpoint if File.exists?(ScbiMapreduce::CHECKPOINT_FILE) if !options[:use_checkpoint] STDERR.puts "ERROR: A checkpoint file exists, either delete it or provide -C flag to use it" exit(-1) end end # it is the server part if !only_workers then cd_hit_input_file = nil # TODO - FIX seqtrim to not iterate two times over input, so STDIN can be used sequence_readers=[] # open sequence reader and expand input files paths if options[:fastq] # choose fastq quality format format=:sanger case options[:format] when 'sanger' format = :sanger when 'illumina15' format = :ilumina when 'illumina18' format = :sanger end seqs_path='' $LOG.info("Used FastQ format for input files: #{format}") # iterate files options[:fastq].each do |fastq_file| if fastq_file=='-' seqs_path = STDIN else seqs_path = File.expand_path(fastq_file) end sequence_readers << FastqFile.new(seqs_path,'r',format, true) end cd_hit_input_file = seqs_path else seqs_path = File.expand_path(options[:fasta]) cd_hit_input_file = seqs_path qual_path = File.expand_path(options[:qual]) if qual_path sequence_readers << FastaQualFile.new(options[:fasta],options[:qual],true) end $LOG.info "Loading params" # Reads the parameter's file params = Params.new(params_path) $LOG.info "Checking global params" if !check_global_params(params) exit(-1) end # Load actions $LOG.info "Loading actions" action_manager = ActionManager.new() # load plugins plugin_list = params.get_param('plugin_list') # puts in plugin_list the plugins's array $LOG.info "Loading plugins [#{plugin_list}]" plugin_manager = PluginManager.new(plugin_list,params) # creates an instance from PluginManager. This must storage the plugins and load it # load plugin params $LOG.info "Check plugin params" if !plugin_manager.check_plugins_params(params) then $LOG.error "Plugin check failed" # save used params to file params.save_file('used_params.txt') exit(-1) end if !Dir.exists?(OUTPUT_PATH) Dir.mkdir(OUTPUT_PATH) end # Extract global stats if params.get_param('generate_initial_stats').to_s=='true' $LOG.info "Calculatings stats" ExtractStats.new(sequence_readers,params) else $LOG.info "Skipping calculatings stats phase." end # save used params to file params.save_file(File.join(OUTPUT_PATH,'used_params.txt')) piro_on = (params.get_param('next_generation_sequences').to_s=='true') params.load_mids(params.get_param('mids_db')) params.load_ab_adapters(params.get_param('adapters_ab_db')) params.load_adapters(params.get_param('adapters_db')) params.load_linkers(params.get_param('linkers_db')) #execute cd-hit if params.get_param('remove_clonality').to_s=='true' cmd=get_custom_cdhit(cd_hit_input_file,params) if cmd.empty? cmd=get_cd_hit_cmd(cd_hit_input_file,workers,$SEQTRIMNEXT_INIT) end $LOG.info "Executing cd-hit-454: #{cmd}" if !File.exists?('clusters.fasta.clstr') system(cmd) end if File.exists?('clusters.fasta.clstr') params.load_repeated_seqs('clusters.fasta.clstr') else $LOG.error("Exiting due to not found clusters.fasta.clstr. Maybe cd-hit failed. Check cd-hit.out") exit(-1) end end ############ SCBI DRB ########### # port = 50000 # ip = "10.250.255.6" # port = 50000 # ip = "localhost" # # workers=20 # only_workers=false # launch work manager end # end only_workers custom_worker_file = File.join(File.dirname(__FILE__), 'em_classes','seqtrim_worker.rb') $LOG.info "Workers:\n#{workers}" if only_workers then worker_launcher = ScbiMapreduce::WorkerLauncher.new(ip,port, workers, custom_worker_file, STDOUT) worker_launcher.launch_workers_and_wait else $LOG.info 'Starting server' SeqtrimWorkManager.init_work_manager(sequence_readers, params,chunk_size,use_json,options[:skip_output],options[:write_in_gzip]) begin cpus=1 if RUBY_PLATFORM.downcase.include?("darwin") cpus=`hwprefs -cpu_count`.chomp.to_i else cpus=`grep processor /proc/cpuinfo |wc -l`.chomp.to_i end rescue cpus=1 end # if workers is an integer, reduce it by one (because of the server) begin Integer(workers) if workers>1 && workers<cpus workers-=1 end rescue if workers.count>1 && workers.count<cpus workers.shift end end # launch processor server passing the ip, port and all required params # server = Server.new(ip,port, workers, SeqtrimWorkManager,custom_worker_file, STDOUT,File.join($SEQTRIM_PATH,'init_env')) # server = ScbiMapreduce::Manager.new(ip,port, workers, SeqtrimWorkManager,custom_worker_file, STDOUT,'~/.seqtrimnext') server = ScbiMapreduce::Manager.new(ip,port, workers, SeqtrimWorkManager,custom_worker_file, STDOUT,$SEQTRIMNEXT_INIT) server.chunk_size=chunk_size server.checkpointing=true server.keep_order=true server.retry_stuck_jobs=true server.start_server # close sequence reader sequence_readers.each do |file| file.close end if SeqtrimWorkManager.exit_status>=0 $LOG.info "Exit status: #{SeqtrimWorkManager.exit_status}" else $LOG.error "Exit status: #{SeqtrimWorkManager.exit_status}" end $LOG.info 'Closing server' end ############ SCBI DRB ########### end
Public Instance Methods
check_global_params(params)
click to toggle source
# File lib/seqtrimnext/classes/seqtrim.rb, line 100 def check_global_params(params) errors=[] # check plugin list comment='Plugins applied to every sequence, separated by commas. Order is important' # default_value='PluginLowHighSize,PluginMids,PluginIndeterminations,PluginAbAdapters,PluginContaminants,PluginLinker,PluginVectors,PluginLowQuality' # params.check_param(errors,'plugin_list','String',default_value,comment) params.check_param(errors,'plugin_list','PluginList',nil,comment) comment='Should SeqTrimNext analysis be based on NGS? (if setting to false, a classic Sanger sequencing is considered)' default_value='true' params.check_param(errors,'next_generation_sequences','String',default_value,comment) comment='Remove duplicated (clonal) sequences (using CD-HIT 454)' default_value='true' params.check_param(errors,'remove_clonality','String',default_value,comment) comment='Custom parameters used by CD-HIT-454 (leave empty to let seqtrimnext decide). Execute "cd-hit-454 help" in command line to see a list of parameters' default_value='' params.check_param(errors,'cdhit_custom_parameters','String',default_value,comment) comment='Generate initial stats' default_value='true' params.check_param(errors,'generate_initial_stats','String',default_value,comment) comment='Minimum insert size for every trimmed sequence' default_value = 40 params.check_param(errors,'min_insert_size_trimmed','Integer',default_value,comment) comment='Minimum insert size for each end of paired-end reads; true paired-ends have both single-ends longer than this value' default_value = 40 params.check_param(errors,'min_insert_size_paired','Integer',default_value,comment) comment='Do not reject unexpectedly long sequences found in the raw data' default_value='true' params.check_param(errors,'accept_very_long_sequences','String',default_value,comment) comment='Seqtrim version' default_value=Seqtrimnext::SEQTRIM_VERSION params.check_param(errors,'seqtrim_version','String',default_value,comment) if !errors.empty? $LOG.error 'Please, define the following global parameters in params file:' errors.each do |error| $LOG.error ' -' + error end #end each end #end if return errors.empty? end
get_cd_hit_cmd(cd_hit_input_file,workers,init_file_path)
click to toggle source
# File lib/seqtrimnext/classes/seqtrim.rb, line 50 def get_cd_hit_cmd(cd_hit_input_file,workers,init_file_path) num_cpus_cdhit=1 cmd='' # if workers is an integer, reduce it by one in the server begin Integer(workers) num_cpus_cdhit = workers cmd = "cd-hit-454 -i #{cd_hit_input_file} -o clusters.fasta -M #{num_cpus_cdhit*1000} -T #{num_cpus_cdhit} > cd-hit-454.out" rescue Exception => exception #not an integer, send via ssh to other machine # puts exception worker_hash={};workers.map{|e| worker_hash[e] = (worker_hash[e]||0) +1} max_worker = worker_hash.sort_by{|k,v| -v}.first puts "Found these workers: #{worker_hash.sort_by{|k,v| -v}}" num_cpus_cdhit=max_worker[1] init='' cd='' cmd = "cd-hit-454 -i #{cd_hit_input_file} -o clusters.fasta -M #{num_cpus_cdhit*1000} -T #{num_cpus_cdhit} > cd-hit-454.out" # worker is different to current machine, send over ssh if max_worker[0]!= workers[0] if File.exists?(init_file_path) init=". #{init_file_path}; " end pwd=`pwd`.chomp cd ='' if File.exists?(pwd) cd = "cd #{pwd}; " end cmd = "ssh #{max_worker[0]} \"#{init} #{cd} #{cmd}\"" end end return cmd end
get_custom_cdhit(cd_hit_input_file,params)
click to toggle source
First of all, reads the file's parameters, where are the values of all parameters and the 'plugin_list' that specifies the order of execution from the plugins.
Secondly, loads the plugins in a folder .
Thirdly, checks if parameter's file have the number of parameters necessary for every plugin that is going to be executed.
After that, creates a thread's pool of a determinate number of workers, e.g. 10 threads, reads the sequences from files 'fasta' , until now without qualities, and executes the plugins over the sequences in the pool of threads
# File lib/seqtrimnext/classes/seqtrim.rb, line 34 def get_custom_cdhit(cd_hit_input_file,params) cmd='' begin cdhit_custom_parameters=params.get_param('cdhit_custom_parameters').strip if !cdhit_custom_parameters.nil? and !cdhit_custom_parameters.empty? cmd = "cd-hit-454 -i #{cd_hit_input_file} -o clusters.fasta #{cdhit_custom_parameters} > cd-hit-454.out" end rescue Exception => exception #not an integer, send via ssh to other machine cmd='' end return cmd end