class SeqtrimWorkManager

TODO - Pasar secuencias en grupos, y hacer blast en grupos de secuencias.

Public Class Methods

end_work_manager() click to toggle source
# File lib/seqtrimnext/classes/em_classes/seqtrim_work_manager.rb, line 93
def self.end_work_manager

  # if initial files doesn't exists, create it
  if !File.exists?(File.join(OUTPUT_PATH,'initial_stats.json'))
    File.open(File.join(OUTPUT_PATH,'initial_stats.json'),'w') do |f|
      f.puts JSON.pretty_generate(@@ongoing_stats)
    end
  end

  # load stats
  #r=File.read(STATS_PATH)
  #stats=JSON::parse(r)

  stats=@@full_stats

  # make graphs
  gs=GraphStats.new(stats)

  #close all files
  if @@use_json
    @@json_output.close
  end
  @@errors_file.close

  @@files.each do |k,file|
    file.close
  end

end
exit_status() click to toggle source
# File lib/seqtrimnext/classes/em_classes/seqtrim_work_manager.rb, line 89
def self.exit_status
  return @@exit_status
end
global_error_received(error_exception) click to toggle source
# File lib/seqtrimnext/classes/em_classes/seqtrim_work_manager.rb, line 123
def self.global_error_received(error_exception)
  $LOG.error "Global error:\n" + error_exception.message + ":\n" +error_exception.backtrace.join("\n")
  @@errors_file.puts "Global error:\n" + error_exception.message + ":\n" +error_exception.backtrace.join("\n")
  @@errors_file.puts "="*60
  @@exit_status=-1
  SeqtrimWorkManager.controlled_exit
end
init_work_manager(sequence_readers, params, chunk_size = 100, use_json=false, skip_output=false, write_in_gzip=true) click to toggle source
# File lib/seqtrimnext/classes/em_classes/seqtrim_work_manager.rb, line 16
def self.init_work_manager(sequence_readers, params, chunk_size = 100, use_json=false, skip_output=false, write_in_gzip=true)
  @@full_stats={}
  @@params= params
  @@exit = false
  @@exit_status=0

  @@ongoing_stats={}
  @@ongoing_stats[:sequence_count] = 0
  @@ongoing_stats[:smallest_sequence_size] = 900000000000000
  @@ongoing_stats[:biggest_sequence_size] = 0

  @@skip_output=skip_output
  @@write_in_gzip=write_in_gzip

  @@chunk_size = chunk_size

  checkpoint_exists=File.exists?(ScbiMapreduce::CHECKPOINT_FILE)

  # @@use_qual = !qual_path.nil? and File.exists?(qual_path)
  @@open_mode='w'
  if checkpoint_exists
    @@open_mode = 'a'
  end

  #open input file
  @@sequence_readers=sequence_readers

  # @@use_qual = @@fqr.with_qual?
  # @@use_json = use_json

  @@params.set_param('use_qual',@@sequence_readers.first.with_qual?)
  @@params.set_param('use_json',use_json)
  @@params.set_param('tuple_size',@@sequence_readers.count)

  @@use_json=use_json

  @@sequence_readers.each do |sequence_reader|
    sequence_reader.rewind
  end

  # open output files

  if !Dir.exists?(OUTPUT_PATH)
    Dir.mkdir(OUTPUT_PATH)
  end

  @@files={}

  # @@rejected_output_file=File.open(File.join(OUTPUT_PATH,'rejected.txt'),@@open_mode)

  # seqs_with_errors
  @@errors_file=File.open('errors.txt',@@open_mode)

  if @@use_json
    @@json_output=File.open('results.json',@@open_mode)
  end

  @@json_separator=''

  @@paired_output_files={}

  @@sequences_output_files={}

  @@low_complexity_output_files={}

  @@sffinfo_files={}

  @@low_sffinfo_files={}

  @@tuple_id=0

end
work_manager_finished() click to toggle source
# File lib/seqtrimnext/classes/em_classes/seqtrim_work_manager.rb, line 131
def self.work_manager_finished
  @@full_stats['scbi_mapreduce']=@@stats

  puts "FULL STATS:\n" +JSON.pretty_generate(@@full_stats)

  # create stats file
  f = File.open(STATS_PATH,'w')
  f.puts JSON.pretty_generate(@@full_stats)
  f.close
end

Public Instance Methods

error_received(worker_error, obj) click to toggle source
# File lib/seqtrimnext/classes/em_classes/seqtrim_work_manager.rb, line 142
def error_received(worker_error, obj)
  @@errors_file.puts "Error while processing object #{obj.inspect}\n" + worker_error.original_exception.message + ":\n" +worker_error.original_exception.backtrace.join("\n")
  @@errors_file.puts "="*60
  @@exit_status=-1
  SeqtrimWorkManager.controlled_exit

end
get_file(file_name) click to toggle source
# File lib/seqtrimnext/classes/em_classes/seqtrim_work_manager.rb, line 345
def get_file(file_name)
  res_file = @@files[file_name]

  # if file is not already open, create it
  if res_file.nil?
    # create dir if necessary
    dir = File.dirname(file_name)
    if !File.exists?(dir)
      FileUtils.mkdir_p(dir)
    end

    # open file
    if @@write_in_gzip && file_name.upcase.index('.FASTQ')
      file=File.open(file_name+'.gz',@@open_mode)
      res_file=Zlib::GzipWriter.new(file)
    else
      res_file=File.open(file_name,@@open_mode)
    end

    # save it in hash for next use
    @@files[file_name]=res_file
  end

  return res_file
end
get_next_seq_from_file(file) click to toggle source
# File lib/seqtrimnext/classes/em_classes/seqtrim_work_manager.rb, line 243
def get_next_seq_from_file(file)
  # find a valid and no repeated sequence in file
  begin

    n,f,q,c = file.next_seq

    if !n.nil? && @@params.repeated_seq?(n)
      @@full_stats.add_stats({'sequences' => {'count' => {'rejected' => 1}}})
      @@full_stats.add_stats({'sequences' => {'rejected' => {'repeated' => 1}}})

      get_file(File.join(OUTPUT_PATH,'rejected.txt')).puts('>'+n+ ' repeated')

    end

    if !n.nil?
      @@ongoing_stats[:sequence_count] += 1
      @@ongoing_stats[:smallest_sequence_size] = [f.size, @@ongoing_stats[:smallest_sequence_size]].min
      @@ongoing_stats[:biggest_sequence_size] = [f.size, @@ongoing_stats[:smallest_sequence_size]].max

      @@full_stats.add_stats({'sequences' => {'count' => {'input_count' => 1}}})
    end

  end while (!n.nil? && @@params.repeated_seq?(n))

  return n,f,q,c

end
load_user_checkpoint(checkpoint) click to toggle source
Calls superclass method
# File lib/seqtrimnext/classes/em_classes/seqtrim_work_manager.rb, line 159
def load_user_checkpoint(checkpoint)
  # load full_stats from file !!!!!!!!!!!!!

  if File.exists?(STATS_PATH)

    # load stats
    text = File.read(STATS_PATH)

    # wipe text
    # text=text.grep(/^\s*[^#]/).to_s

    # decode json
    @@full_stats = JSON.parse(text)
  end

  # reset count stats since they are repeated by checkpointing

  # {
  #   "sequences": {
  #   "count": {
  #     "input_count": 1600,
  #     "output_seqs": 933,
  #     "rejected": 67
  #   },
  #   "rejected": {
  #     "short insert": 39,
  #     "contaminated": 26,
  #     "unexpected vector": 2
  #   }
  # }
  # }

  if @@full_stats['sequences']
    if @@full_stats['sequences']['count']
      # set input count to 0
      @@full_stats['sequences']['count']['input_count']=0

      # do not remove outputseqs
      # @@full_stats['sequences']['count']['output_seqs']=0
    end

    # remove rejected due to repetitions from rejected count
    if @@full_stats['sequences']['rejected']

      # it there are repeated
      if (@@full_stats['sequences']['rejected']['repeated'])

        # if repeated count > 0 and there count exists
        if (@@full_stats['sequences']['rejected']['repeated'] > 0) and @@full_stats['sequences']['count']

          # discount repeated from rejected, since they are going to be added again by checkout process
          @@full_stats['sequences']['count']['rejected'] -= @@full_stats['sequences']['rejected']['repeated']
        end

        # set repeated to 0
        @@full_stats['sequences']['rejected']['repeated']=0
      end
    end
  end


  # puts "Loaded Stats"
  # puts "FULL STATS:\n" +JSON.pretty_generate(@@full_stats)

  # TODO - remove sequences from rejected file that were added by cloned

  super
  # return checkpoint
end
next_work() click to toggle source
# File lib/seqtrimnext/classes/em_classes/seqtrim_work_manager.rb, line 271
def next_work

  if @@exit
    return nil
  end

  tuple=[]
  order_in_tuple=0

  @@tuple_id += 1
  tuple_size=@@sequence_readers.count

  @@sequence_readers.each do |sequence_reader|
    n,f,q,c = get_next_seq_from_file(sequence_reader)

    if !n.nil?
      seq=SequenceWithAction.new(n,f.upcase,q,c)
      seq.tuple_id=@@tuple_id
      seq.order_in_tuple=order_in_tuple
      seq.tuple_size=tuple_size
      tuple << seq
      order_in_tuple+=1
    end

  end

  if tuple_size>1
    # check duplicated names
    names = tuple.map{|s| s.seq_name}
    
    if names.uniq.count!=tuple_size
      # puts "NAMES EQUAL IN TUPLE"
      tuple.each_with_index do |seq,i|
        # puts seq.class # seq_name
        seq.seq_name = "#{seq.seq_name}/#{i+1}"
      end
    end
  end

  # tuple is complete
  if tuple.count==tuple_size
    return tuple
  else
    return nil
  end

end
save_files(obj) click to toggle source
# File lib/seqtrimnext/classes/em_classes/seqtrim_work_manager.rb, line 337
def save_files(obj)
  files=obj.output_files
  files.each do |file_name,content|
    f=get_file(file_name)
    f.puts content
  end
end
save_user_checkpoint() click to toggle source
# File lib/seqtrimnext/classes/em_classes/seqtrim_work_manager.rb, line 229
def save_user_checkpoint

  f = File.open(STATS_PATH,'w')
  f.puts JSON.pretty_generate(@@full_stats)
  f.close

end
too_many_errors_received() click to toggle source
# File lib/seqtrimnext/classes/em_classes/seqtrim_work_manager.rb, line 150
def too_many_errors_received
  $LOG.error "Too many errors: #{@@error_count} errors on #{@@count} executed sequences, exiting before finishing"
  @@exit_status=-1
end
trash_checkpointed_work() click to toggle source

read a work that will not be processed, only to skip until checkpoint

# File lib/seqtrimnext/classes/em_classes/seqtrim_work_manager.rb, line 239
def trash_checkpointed_work
  warn "Deprecated: trash_checkpointed_work was deprecated, it is automatic now"
end
work_received(obj) click to toggle source
# File lib/seqtrimnext/classes/em_classes/seqtrim_work_manager.rb, line 320
def work_received(obj)

  res = obj

  # collect stats
  @@full_stats.add_stats(obj.stats)

  # print output in screen
  if !@@skip_output
    puts obj.output_text
  end

  # save results to files
  save_files(obj)

end
worker_initial_config() click to toggle source
# File lib/seqtrimnext/classes/em_classes/seqtrim_work_manager.rb, line 155
def worker_initial_config
  return @@params
end