class Evoc::Experiment

Attributes

opts[RW]

Public Class Methods

new(opts = Hash.new) click to toggle source
# File lib/evoc/experiment.rb, line 8
# Build an experiment from an options hash.
#
# Sets the logging level from opts[:logger_level] and, when a transactions
# path is supplied, initializes the shared Evoc::HistoryStore from it.
def initialize(opts = Hash.new)
  self.opts = opts
  # configure logging first so everything below can log
  Logging.set_level(self.opts[:logger_level])
  logger.debug "Initialized experiment with options: #{opts}"
  # set up the history only when a transactions file was given
  unless self.opts[:transactions].nil?
    Evoc::HistoryStore.initialize(path: self.opts[:transactions],
                                  case_id: self.opts[:case_id],
                                  granularity: self.opts[:granularity])
  end
end

Public Instance Methods

execute_scenarios() click to toggle source

Execute a set of scenarios

@return: a JSON Lines stream written to stdout (see https://jsonlines.org/)

# File lib/evoc/experiment.rb, line 229
# Execute a set of scenarios: each query read from the queries CSV is run
# under every combination of the configured factors (model size, max size,
# model age, measures, permutation, aggregator) and every algorithm.
#
# @return: json lines stream to stdout (jsonlines.org/)
def execute_scenarios

  ######
  # Setup factors
  #####

  # Factor: model size aka model/learning/history size
  factor_model_size = self.opts[:model_size].nil? ? nil : self.opts[:model_size].map {|s| [ 'model_size',s ]}
  # Factor: Max size aka filtering constraint on history
  factor_max_size = self.opts[:max_size].nil? ? nil : self.opts[:max_size].map {|s| [ 'max_size',s ]}
  # Factor: Model age aka number of commits between query and last tx in history
  factor_model_age = self.opts[:model_age].nil? ? nil : self.opts[:model_age].map {|s| [ 'model_age',s ]}
  # Factor: Measures
  factor_measures = self.opts[:measures].map {|c| ['measures',c]}
  # Factor: Aggregator
  # if aggregation is requested, we also assume that no aggregation wants to be tested
  # the non-aggregation rows are specified with 'aggregator' == nil
  factor_aggregators = self.opts[:aggregators].nil? ? nil : (self.opts[:aggregators]+[nil]).map {|a| ['aggregator',a]}
  # deprecated factor, always set to 1 for backwards compatibility
  # factor_permutation = self.opts[:permutation].nil? ? nil : (1..self.opts[:permutation]).to_a.map {|p| [ 'permutation',p ]}
  factor_permutation = [[ 'permutation',1 ]]

  ####
  # Iterate over the queries provided and execute each query in each scenario
  #
  # queries CSV header:
  # tx_id,tx_index,tx,query,query_size,expected_outcome,expected_outcome_size

  # Count the number of lines so we can properly format the output json
  # (File.foreach counts lazily instead of slurping the whole file into memory)
  num_lines = File.foreach(self.opts[:queries]).count - 1
  current_line = 1

  # compact removes nil values (not used factors)
  factors = [factor_model_size,factor_max_size,factor_model_age,factor_measures,factor_permutation,factor_aggregators].compact
  num_of_scenarios = factors.inject(1) {|product,f| product * f.size}
  invalid_configuration = 0
  last_error = 'no errors'
  CSV.foreach(self.opts[:queries], headers: true) do |query|
    # abort if the failsafe file is present
    # (File.exist? -- File.exists? was deprecated and removed in Ruby 3.2)
    if !self.opts[:fail_safe].nil?
      if File.exist?(self.opts[:fail_safe])
        $stderr.puts "\nFail safe detected, exiting.."
        break
      end
    end
    # get query
    query_hash = query.to_h
    # convert query string to array of items
    query_hash['query'] = query_hash['query'].split(',').map(&:to_i)
    # verify query before executing
    tx = nil
    if tx = Evoc::HistoryStore.base_history.get_tx(id: query_hash['tx_id'],id_type: :id)
      if !(query_hash['query'] - tx.items).empty?
        raise Evoc::Exceptions::ConfigurationError.new "The query generated from #{query_hash['tx_id']} was not a subset of the same tx in the loaded history. The query was: '#{query_hash['query']}', the tx was '#{tx.items}'"
      end
    else
      raise Evoc::Exceptions::ConfigurationError.new "Could not find the tx: '#{query_hash['tx_id']}' from #{self.opts[:queries]} in the history #{self.opts[:transactions]}"
    end

    current_scenario = 1
    # - the splat operator '*' turns the array into parameters for #product
    # - passing a block to #product yields each combination one at a time
    #   instead of materializing the whole cartesian product in memory
    factors.first.product(*factors[1..-1]) do |scenario|
      params = query_hash.merge(scenario.to_h)
      params[:case_id] = self.opts[:case_id]
      params[:granularity] = self.opts[:granularity]
      # initialize scenario
      s = Evoc::Scenario.new(params)
      scenario_stats = {}
      if self.opts[:stats]
        scenario_stats = s.stats
      end
      # Factor: Algorithm
      self.opts[:algorithms].each do |algorithm|
        s.algorithm = algorithm
        # Print progress to stderr
        STDERR.print "(#{self.opts[:case_id]}) Executing #{algorithm} on scenario #{current_scenario} of #{num_of_scenarios} on query #{current_line} of #{num_lines}"
        if invalid_configuration > 0
          STDERR.print " (scenarios skipped: #{invalid_configuration},last reason: #{last_error[0..20]}...)                             \r"
        else
          STDERR.print "                                \r"
        end

        begin
          Evoc::RecommendationCache.get_recommendation(algorithm: algorithm,
                                                       query: s.query,
                                                       model_start: s.model_start,
                                                       model_end: s.model_end,
                                                       max_size: s.max_size,
                                                       aggregator: s.aggregator,
                                                       measures: s.measures)
          Evoc::RecommendationCache.evaluate_last(evaluators: self.opts[:evaluators],
                                                  topk: self.opts[:topk],
                                                  unique_consequents: self.opts[:unique_consequents],
                                                  expected_outcome: s.expected_outcome,
                                                  measure_combination: s.measures)

          # build json line by merging hashes
          $stdout.puts s.to_h
                         .merge(scenario_stats)
                         .merge({topk: self.opts[:topk], date: tx.date})
                         .merge(Evoc::RecommendationCache.to_h(measures: s.measures))
                         .to_json
        rescue ArgumentError => e
          # invalid factor combinations are counted and surfaced in the
          # progress line rather than aborting the whole experiment
          invalid_configuration += 1
          last_error = e.message
        end
      end
      current_scenario += 1
    end
    current_line += 1
  end
  STDERR.puts "\n(#{self.opts[:case_id]}) DONE"
end
generate_queries() click to toggle source

Generates a CSV of queries according to the given options

CSV HEADER:

tx_id, query

# File lib/evoc/experiment.rb, line 113
# Generates a CSV of queries (tx_id,query) on stdout according to the given
# options, sampling query item-sets from each transaction listed in
# opts[:transaction_ids_path]. Optionally writes an id->name dictionary to
# opts[:write_dict].
#
# Raises ArgumentError when a sampled tx id is not present in the history.
def generate_queries
  # initialize a random number generator with fixed seed
  rand = Random.new(self.opts[:seed])
  ##
  # write dict
  ##
  if path = self.opts[:write_dict]
    tmp = Tempfile.new('dict')
    begin
      tmp.puts("id,name")
      Evoc::HistoryStore.base_history.int_2_name.each do |id,name|
        tmp.puts("#{id},#{name}")
      end
      tmp.close
      # move into place atomically once fully written
      FileUtils.mv(tmp.path,path)
    ensure
      tmp.close
      tmp.unlink
    end
  end
  ##
  # WRITE CSV HEADER
  CSV {|row| row << %W(tx_id query)}

  ###
  # Iterate over sampled tx ids
  CSV.foreach(self.opts[:transaction_ids_path], headers: true) do |row|
    tx_id = row['tx_id']
    ##
    # GET THE TRANSACTION
    if tx = Evoc::HistoryStore.base_history.get_tx(id: tx_id, id_type: :id)
      items = tx.items
      tx_size = items.size
      ##
      # SAMPLE QUERIES
      #
      # We have 3 different strategies, which may produce the same sizes,
      # but the same size does not need to be executed several times,
      # so duplicates are removed
      specified_sizes = []
      if !self.opts[:select].nil? then specified_sizes << self.opts[:select].map(&:to_i) end
      if !self.opts[:reverse_select].nil? then specified_sizes << self.opts[:reverse_select].map {|i| tx_size-i.to_i} end
      if !self.opts[:percentage].nil? then specified_sizes << self.opts[:percentage].map {|p| (p.to_f/100*tx_size).ceil} end
      # flatten the per-strategy size lists and filter out non-positive sizes.
      # (non-destructive flatten: Array#flatten! returns nil when the array is
      # already flat -- e.g. when no strategy was configured -- which made the
      # old `flatten!.select!` chain raise NoMethodError on nil)
      specified_sizes = specified_sizes.flatten.select {|s| s > 0}
      specified_sizes.uniq!

      random_sizes = []
      # NOTE(review): this uses a fresh, unseeded RNG instead of the seeded
      # `rand` above -- confirm whether :random_select is meant to be reproducible
      if self.opts[:random_select] then random_sizes << Random.new.rand(self.opts[:minimum_query_size]..(tx_size-1)) end

      sampled_queries = []
      # only specified sizes
      if random_sizes.empty? && !specified_sizes.empty?
        sampled_queries = specified_sizes.map {|s| items.sample(s, random: rand)}
      # only random sizes
      elsif !random_sizes.empty? && specified_sizes.empty?
        sampled_queries = random_sizes.map {|s| items.sample(s, random: rand)}
      # random + specified = randomly sample in range defined by specified
      # ex:
      # specified = [1,3,10,20]
      # tx size = 4
      #
      # 1. remove X in specified that are larger than or equal to 4
      # 2. randomly select X in specified = Y
      # 3. randomly select Y in tx
      elsif !random_sizes.empty? && !specified_sizes.empty?
        specified_sizes.select! {|s| (s < tx_size) && (s >= self.opts[:minimum_query_size])} #1.
        if randomly_sampled_size = specified_sizes.sample(random: rand) #2.
          sampled_queries = [items.sample(randomly_sampled_size, random: rand)] #3.
        end
      end

      if sampled_queries.empty?
        logger.warn "Unable to generate query from tx: #{items}, with params #{self.opts}"
      end

      ##
      # WRITE CSV
      sampled_queries.each do |query|
        # a query equal to the whole tx would leave an empty expected outcome
        if query.size == tx_size
          logger.debug "The size of the sampled query was equal to the size of the transaction, skipping.. Tx ID: #{tx_id}. Query size: #{query.size}"
          next
        end
        if query.size < self.opts[:minimum_query_size]
          next
        end
        CSV {|row| row << [tx_id,query.join(',')]}
      end
    else
      raise ArgumentError, "The tx with id '#{tx_id}' was not found in the history: #{self.opts[:transactions]}, wrong file?"
    end
  end

  #TODO  possibly move this to execution
  #############################################
  # Filter the generated queries if requested #
  #############################################
  #
  #
  #      logger.debug "Number of queries before filtering: #{query_store.size}"
  #
  #      if self.opts[:filter_expected_outcome]
  #        STDERR.puts "Filtering expected outcome.."
  #        init_size = query_store.size
  #        query_store.filter_expected_outcome!
  #        STDERR.puts "Had to remove #{init_size - query_store.size} queries as a result of the expected outcome now being empty"
  #      end
  #      if query_store.empty?
  #        STDERR.puts "WARNING: Returning 0 queries (maybe increase sample size?)"
  #      end
end
sample_transactions() click to toggle source
# File lib/evoc/experiment.rb, line 19
# Sample transaction ids from the loaded history, after applying the
# configured filtering switches (:recent, :minimum_commit_size,
# :maximum_commit_size, :minimum_history, :recent_viable). Sampling is
# grouped by tx size via opts[:sample_groups] ('*' means sample across all
# sizes; an integer string samples only txes of that size).
#
# Returns a flat, de-duplicated array of sampled tx ids.
# Raises ArgumentError if a sample group is neither an Integer nor '*'.
def sample_transactions
  # initialize a random number generator with fixed seed
  rand = Random.new(self.opts[:seed])
  # by default we can sample from the whole history
  sampling_history = Evoc::HistoryStore.base_history
  STDERR.puts "Sampling transactions from a pool of #{sampling_history.size}.."
  sample = []

  #################################################################################
  # performing filtering steps on min/max commits size and minimum previous history
  #################################################################################

  if !self.opts[:recent].nil?
    size = sampling_history.size
    sampling_history = sampling_history[[0,size-self.opts[:recent]].max..-1]
    STDERR.puts "    Filtering to the #{self.opts[:recent]} most recent transactions (new pool size: #{sampling_history.size})"
  end
  # filter out transactions larger than X
  if !self.opts[:minimum_commit_size].nil?
    sampling_history = sampling_history.select {|tx| tx.size >= self.opts[:minimum_commit_size]}
    STDERR.puts "    Filtering to txes larger than or equal to #{self.opts[:minimum_commit_size]} (new pool size: #{sampling_history.size})"
  end
  if !self.opts[:maximum_commit_size].nil?
    sampling_history = sampling_history.select {|tx| tx.size <= self.opts[:maximum_commit_size]}
    STDERR.puts "    Filtering to txes smaller than or equal to #{self.opts[:maximum_commit_size]} (new pool size: #{sampling_history.size})"
  end
  # only sample transactions that have at least 'minimum_history' previous history
  if !self.opts[:minimum_history].nil?
    sampling_history = sampling_history.select {|tx| tx.index >= self.opts[:minimum_history]}
    STDERR.puts "    Filtering to txes with at least #{self.opts[:minimum_history]} previous txes (new pool size: #{sampling_history.size})"
  end
  if !self.opts[:recent_viable].nil?
    size = sampling_history.size
    sampling_history = sampling_history[[0,size-self.opts[:recent_viable]].max..-1]
    STDERR.puts "    Filtering to the #{self.opts[:recent_viable]} most recent viable transactions (new pool size: #{sampling_history.size})"
  end

  filtering_switches = [:recent,:recent_viable,:minimum_commit_size,:maximum_commit_size,:minimum_history]
  if filtering_switches.any? {|s| !self.opts[s].nil?}
    if sampling_history.size == 0
      STDERR.puts "WARNING: All transactions were filtered out, unable to sample"
      return []
    end
  end

  if self.opts[:sample_size] > sampling_history.size
    STDERR.puts "WARNING: The sample size is larger than the available transactions"
  end

  ######################
  # performing sampling
  ######################

  # group the txes by size
  groups = sampling_history.group_by {|tx| tx.size}
  # sort the sample_groups option to reduce the need for maintaining control over which txes that have been sampled
  # i.e., random sampling is done first, then the sampled txes are removed from the sampling
  tx_sizes_to_sample_from = self.opts[:sample_groups].sort_by(&:to_s)
  tx_sizes_to_sample_from.each do |group_size|
    if group_size == '*'
      sampled_ids = sampling_history.map(&:id).sample(self.opts[:sample_size], random: rand)
      sample << sampled_ids
      STDERR.puts "Sampled #{sampled_ids.size} txes"
      # remove sampled txes from sampling_history
      filtered_hist = sampling_history.reject {|tx| sampled_ids.include? tx.id}
      sampling_history.clear
      filtered_hist.each {|tx| sampling_history << tx}
    elsif group_size.to_s =~ /\A\d+\z/
      # (the old test `elsif group_size.to_i` was always truthy -- every
      # Integer, including 0, is truthy in Ruby -- so the error branch below
      # was unreachable and malformed group sizes were silently treated as 0)
      # check if there were any txes of this size
      if group = groups[group_size.to_i]
        if group.size < self.opts[:sample_size]
          logger.warn "Only #{group.size} transactions found of size #{group_size}, asked for #{self.opts[:sample_size]}"
        end
        sampled_ids = group.sample(self.opts[:sample_size], random: rand).map(&:id)
        sample << sampled_ids
        STDERR.puts "Sampled #{sampled_ids.size} txes of size #{group_size}"
      else
        logger.warn "No transactions found of size #{group_size}, asked for #{self.opts[:sample_size]} (minimum history: #{self.opts[:minimum_history]})"
      end
    else
      raise ArgumentError, "Tx size for sampling must either be specified by an Integer or '*' (was #{group_size}:#{group_size.class})"
    end
  end
  sample.flatten.uniq
end