class Evoc::Experiment
Attributes
opts[RW]
Public Class Methods
new(opts = Hash.new)
click to toggle source
# File lib/evoc/experiment.rb, line 8
# Build an experiment from an options hash.
#
# @param opts [Hash] experiment configuration; keys read here:
#   :logger_level  - forwarded to Logging.set_level
#   :transactions  - path to the transaction history (optional; when given,
#                    the HistoryStore is initialized eagerly)
#   :case_id, :granularity - forwarded to HistoryStore.initialize
def initialize(opts = Hash.new)
  self.opts = opts

  # Configure log verbosity before anything else logs.
  Logging.set_level(self.opts[:logger_level])
  logger.debug "Initialized experiment with options: #{opts}"

  # Load the history up front when a transactions file was supplied.
  unless opts[:transactions].nil?
    Evoc::HistoryStore.initialize(path: self.opts[:transactions],
                                  case_id: self.opts[:case_id],
                                  granularity: self.opts[:granularity])
  end
end
Public Instance Methods
execute_scenarios()
click to toggle source
Execute a set of scenarios
@return: json lines stream to stdout (jsonlines.org/)
# File lib/evoc/experiment.rb, line 229
# Execute every scenario (cartesian product of the configured factors) for
# every query in the queries CSV.
#
# Queries CSV header:
#   tx_id,tx_index,tx,query,query_size,expected_outcome,expected_outcome_size
#
# @return: json lines stream to stdout (jsonlines.org/), one line per
#   (query x scenario x algorithm) combination; progress goes to stderr.
# @raise [Evoc::Exceptions::ConfigurationError] when a query is not a subset
#   of its transaction, or its transaction is missing from the history.
def execute_scenarios
  ######
  # Setup factors
  #####
  # Factor: model size aka model/learning/history size
  factor_model_size = self.opts[:model_size].nil? ? nil : self.opts[:model_size].map {|s| [ 'model_size',s ]}
  # Factor: Max size aka filtering constraint on history
  factor_max_size = self.opts[:max_size].nil? ? nil : self.opts[:max_size].map {|s| [ 'max_size',s ]}
  # Factor: Model age aka number of commits between query and last tx in history
  factor_model_age = self.opts[:model_age].nil? ? nil : self.opts[:model_age].map {|s| [ 'model_age',s ]}
  # Factor: Measures
  factor_measures = self.opts[:measures].map {|c| ['measures',c]}
  # Factor: Aggregator
  # if aggregation is requested, we also assume that no aggregation wants to be tested
  # the non-aggregation rows are specified with 'aggregator' == nil
  factor_aggregators = self.opts[:aggregators].nil? ? nil : (self.opts[:aggregators]+[nil]).map {|a| ['aggregator',a]}
  # deprecated factor, always set to 1 for backwards compatibility
  # factor_permutation = self.opts[:permutation].nil? ? nil : (1..self.opts[:permutation]).to_a.map {|p| [ 'permutation',p ]}
  factor_permutation = [[ 'permutation',1 ]]

  ####
  # Iterate over the queries provided and execute each query in each scenario
  #
  # Count the number of lines so we can properly format the output json
  num_lines = File.read(self.opts[:queries]).each_line.count-1
  current_line = 1
  # compact removes nil values (not used factors)
  factors = [factor_model_size,factor_max_size,factor_model_age,factor_measures,factor_permutation,factor_aggregators].compact
  num_of_scenarios = factors.inject(1) {|product,f| product * f.size}
  invalid_configuration = 0
  last_error = 'no errors'
  CSV.foreach(self.opts[:queries], headers: true) do |query|
    # abort if the failsafe file is present
    if !self.opts[:fail_safe].nil?
      # FIX: File.exists? is deprecated and removed in Ruby 3.2; use File.exist?
      if File.exist?(self.opts[:fail_safe])
        $stderr.puts "\nFail safe detected, exiting.."
        break
      end
    end
    # get query
    query_hash = query.to_h
    # convert query string to array of items
    query_hash['query'] = query_hash['query'].split(',').map(&:to_i)
    # verify query before executing
    tx = nil
    if tx = Evoc::HistoryStore.base_history.get_tx(id: query_hash['tx_id'],id_type: :id)
      if !(query_hash['query'] - tx.items).empty?
        raise Evoc::Exceptions::ConfigurationError.new "The query generated from #{query_hash['tx_id']} was not a subset of the same tx in the loaded history. The query was: '#{query_hash['query']}', the tx was '#{tx.items}'"
      end
    else
      raise Evoc::Exceptions::ConfigurationError.new "Could not find the tx: '#{query_hash['tx_id']}' from #{self.opts[:queries]} in the history #{self.opts[:transactions]}"
    end
    current_scenario = 1
    # - the splat operator '*' turns the array into parameters for #product
    # - the block form of #product makes it lazy (i.e., the whole cartesian product isn't generated at once)
    factors.first.product(*factors[1..-1]).each do |scenario|
      params = query_hash.merge(scenario.to_h)
      params[:case_id] = self.opts[:case_id]
      params[:granularity] = self.opts[:granularity]
      # initialize scenario
      s = Evoc::Scenario.new(params)
      scenario_stats = {}
      if self.opts[:stats]
        scenario_stats = s.stats
      end
      # Factor: Algorithm
      self.opts[:algorithms].each do |algorithm|
        s.algorithm = algorithm
        # Print progress to stderr
        STDERR.print "(#{self.opts[:case_id]}) Executing #{algorithm} on scenario #{current_scenario} of #{num_of_scenarios} on query #{current_line} of #{num_lines}"
        if invalid_configuration > 0
          STDERR.print " (scenarios skipped: #{invalid_configuration},last reason: #{last_error[0..20]}...) \r"
        else
          STDERR.print " \r"
        end
        begin
          Evoc::RecommendationCache.get_recommendation(algorithm: algorithm,
                                                      query: s.query,
                                                      model_start: s.model_start,
                                                      model_end: s.model_end,
                                                      max_size: s.max_size,
                                                      aggregator: s.aggregator,
                                                      measures: s.measures)
          Evoc::RecommendationCache.evaluate_last(evaluators: self.opts[:evaluators],
                                                 topk: self.opts[:topk],
                                                 unique_consequents: self.opts[:unique_consequents],
                                                 expected_outcome: s.expected_outcome,
                                                 measure_combination: s.measures)
          # build json line by merging hashes
          $stdout.puts s.to_h
            .merge(scenario_stats)
            .merge({topk: self.opts[:topk], date: tx.date})
            .merge(Evoc::RecommendationCache.to_h(measures: s.measures))
            .to_json
        rescue ArgumentError => e
          # invalid factor combination for this algorithm; count and continue
          invalid_configuration += 1
          last_error = e.message
        end
      end
      current_scenario += 1
    end
    current_line += 1
  end
  STDERR.puts "\n(#{self.opts[:case_id]}) DONE"
end
generate_queries()
click to toggle source
Generates a CSV of queries according to the given options
CSV HEADER:
tx_id, query
# File lib/evoc/experiment.rb, line 113
# Generates a CSV of queries according to the given options.
#
# CSV HEADER:
#   tx_id, query
#
# Reads transaction ids from opts[:transaction_ids_path] and writes the
# sampled queries as CSV to stdout; optionally dumps the id->name dictionary
# to opts[:write_dict].
# @raise [ArgumentError] when a sampled tx id is not present in the history.
def generate_queries
  # initialize a random number generator with fixed seed (reproducible sampling)
  rand = Random.new(self.opts[:seed])
  ##
  # write dict
  ##
  if path = self.opts[:write_dict]
    tmp = Tempfile.new('dict')
    begin
      tmp.puts("id,name")
      Evoc::HistoryStore.base_history.int_2_name.each do |id,name|
        tmp.puts("#{id},#{name}")
      end
      tmp.close
      # atomic-ish: write fully to a tempfile, then move into place
      FileUtils.mv(tmp.path,path)
    ensure
      tmp.close
      tmp.unlink
    end
  end
  ##
  # WRITE CSV HEADER
  CSV {|row| row << %W(tx_id query)}
  ###
  # Iterate over sampled tx ids
  CSV.foreach(self.opts[:transaction_ids_path], headers: true) do |row|
    tx_id = row['tx_id']
    ##
    # GET THE TRANSACTION
    if tx = Evoc::HistoryStore.base_history.get_tx(id: tx_id, id_type: :id)
      items = tx.items
      tx_size = items.size
      ##
      # SAMPLE QUERIES
      #
      # We have 3 different strategies, which may produce the same sizes,
      # but the same size does not need to be executed several times,
      # so duplicates are removed
      specified_sizes = []
      if !self.opts[:select].nil? then specified_sizes << self.opts[:select].map(&:to_i) end
      if !self.opts[:reverse_select].nil? then specified_sizes << self.opts[:reverse_select].map {|i| tx_size-i.to_i} end
      if !self.opts[:percentage].nil? then specified_sizes << self.opts[:percentage].map {|p| (p.to_f/100*tx_size).ceil} end
      # filter out non-positive sizes.
      # FIX: the original chained `flatten!.select!`, but Array#flatten!
      # returns nil when the array is already flat (e.g. when no size option
      # was given), which raised NoMethodError. Use non-bang forms instead.
      specified_sizes = specified_sizes.flatten.select {|s| s > 0}
      specified_sizes.uniq!
      random_sizes = []
      # NOTE(review): this uses a fresh unseeded Random instead of the seeded
      # `rand` above, which defeats reproducibility — confirm if intended.
      if self.opts[:random_select] then random_sizes << Random.new.rand(self.opts[:minimum_query_size]..(tx_size-1)) end
      sampled_queries = []
      # only specified sizes
      if random_sizes.empty? && !specified_sizes.empty?
        sampled_queries = specified_sizes.map {|s| items.sample(s, random: rand)}
      # only random sizes
      elsif !random_sizes.empty? && specified_sizes.empty?
        sampled_queries = random_sizes.map {|s| items.sample(s, random: rand)}
      # random + specified = randomly sample in range defined by specified
      # ex:
      #   specified = [1,3,10,20]
      #   tx size = 4
      #
      #   1. remove X in specified that are larger than or equal to 4
      #   2. randomly select X in specified = Y
      #   3. randomly select Y in tx
      elsif !random_sizes.empty? && !specified_sizes.empty?
        specified_sizes.select! {|s| (s < tx_size) && (s >= self.opts[:minimum_query_size])} #1.
        if randomly_sampled_size = specified_sizes.sample(random: rand) #2.
          sampled_queries = [items.sample(randomly_sampled_size, random: rand)] #3.
        end
      end
      if sampled_queries.empty?
        logger.warn "Unable to generate query from tx: #{items}, with params #{self.opts}"
      end
      ##
      # WRITE CSV
      sampled_queries.each do |query|
        # a query equal to the whole tx would have an empty expected outcome
        if query.size == tx_size
          logger.debug "The size of the sampled query was equal to the size of the transaction, skipping.. Tx ID: #{tx_id}. Query size: #{query.size}"
          next
        end
        if query.size < self.opts[:minimum_query_size]
          next
        end
        CSV {|row| row << [tx_id,query.join(',')]}
      end
    else
      raise ArgumentError, "The tx with id '#{tx_id}' was not found in the history: #{self.opts[:transactions]}, wrong file?"
    end
  end
  #TODO possibly move this to execution
  #############################################
  # Filter the generated queries if requested #
  #############################################
  # # # logger.debug "Number of queries before filtering: #{query_store.size}" # # if self.opts[:filter_expected_outcome] # STDERR.puts "Filtering expected outcome.." # init_size = query_store.size # query_store.filter_expected_outcome! # STDERR.puts "Had to remove #{init_size - query_store.size} queries as a result of the expected outcome now being empty" # end # if query_store.empty? # STDERR.puts "WARNING: Returning 0 queries (maybe increase sample size?)" # end
end
sample_transactions()
click to toggle source
# File lib/evoc/experiment.rb, line 19
# Sample transaction ids from the history, after applying the configured
# filtering switches (:recent, :recent_viable, :minimum_commit_size,
# :maximum_commit_size, :minimum_history).
#
# Sampling is driven by opts[:sample_groups]: each entry is either a tx size
# (Integer / numeric string) or '*' (sample from the whole pool).
#
# @return [Array] unique sampled tx ids (empty when everything was filtered out)
# @raise [ArgumentError] when a sample group is neither an Integer nor '*'
def sample_transactions
  # initialize a random number generator with fixed seed (reproducible sampling)
  rand = Random.new(self.opts[:seed])
  # by default we can sample from the whole history
  sampling_history = Evoc::HistoryStore.base_history
  STDERR.puts "Sampling transactions from a pool of #{sampling_history.size}.."
  sample = []
  #################################################################################
  # performing filtering steps on min/max commits size and minimum previous history
  #################################################################################
  if !self.opts[:recent].nil?
    size = sampling_history.size
    sampling_history = sampling_history[[0,size-self.opts[:recent]].max..-1]
    STDERR.puts " Filtering to the #{self.opts[:recent]} most recent transactions (new pool size: #{sampling_history.size})"
  end
  # filter out transactions larger than X
  if !self.opts[:minimum_commit_size].nil?
    sampling_history = sampling_history.select {|tx| tx.size >= self.opts[:minimum_commit_size]}
    STDERR.puts " Filtering to txes larger than or equal to #{self.opts[:minimum_commit_size]} (new pool size: #{sampling_history.size})"
  end
  if !self.opts[:maximum_commit_size].nil?
    sampling_history = sampling_history.select {|tx| tx.size <= self.opts[:maximum_commit_size]}
    STDERR.puts " Filtering to txes smaller than or equal to #{self.opts[:maximum_commit_size]} (new pool size: #{sampling_history.size})"
  end
  # only sample transactions that have at least 'minimum_history' previous history
  if !self.opts[:minimum_history].nil?
    sampling_history = sampling_history.select {|tx| tx.index >= self.opts[:minimum_history]}
    STDERR.puts " Filtering to txes with at least #{self.opts[:minimum_history]} previous txes (new pool size: #{sampling_history.size})"
  end
  if !self.opts[:recent_viable].nil?
    size = sampling_history.size
    sampling_history = sampling_history[[0,size-self.opts[:recent_viable]].max..-1]
    STDERR.puts " Filtering to the #{self.opts[:recent_viable]} most recent viable transactions (new pool size: #{sampling_history.size})"
  end
  filtering_switches = [:recent,:recent_viable,:minimum_commit_size,:maximum_commit_size,:minimum_history]
  if filtering_switches.any? {|s| !self.opts[s].nil?}
    if sampling_history.size == 0
      STDERR.puts "WARNING: All transactions were filtered out, unable to sample"
      return []
    end
  end
  if self.opts[:sample_size] > sampling_history.size
    STDERR.puts "WARNING: The sample size is larger than the available transactions"
  end
  ######################
  # performing sampling
  ######################
  # group the txes by size
  groups = sampling_history.group_by {|tx| tx.size}
  # sort the sample_groups option to reduce the need for maintaining control over which txes that have been sampled
  # i.e., random sampling is done first, then the sampled txes are removed from the sampling
  tx_sizes_to_sample_from = self.opts[:sample_groups].sort_by(&:to_s)
  tx_sizes_to_sample_from.each do |group_size|
    if group_size == '*'
      sampled_ids = sampling_history.map(&:id).sample(self.opts[:sample_size], random: rand)
      sample << sampled_ids
      STDERR.puts "Sampled #{sampled_ids.size} txes"
      # remove sampled txes from sampling_history
      filtered_hist = sampling_history.reject {|tx| sampled_ids.include? tx.id}
      sampling_history.clear
      filtered_hist.each {|tx| sampling_history << tx}
    # FIX: the original condition was `group_size.to_i`, which is ALWAYS
    # truthy in Ruby (even 0), so the else/raise branch below was
    # unreachable and invalid group specifiers fell through silently.
    # Accept Integers and strings of digits only.
    elsif group_size.to_s.match?(/\A\d+\z/)
      # check if there were any txes of this size
      if group = groups[group_size.to_i]
        if group.size < self.opts[:sample_size]
          logger.warn "Only #{group.size} transactions found of size #{group_size}, asked for #{self.opts[:sample_size]}"
        end
        sampled_ids = group.sample(self.opts[:sample_size], random: rand).map(&:id)
        sample << sampled_ids
        STDERR.puts "Sampled #{sampled_ids.size} txes of size #{group_size}"
      else
        logger.warn "No transactions found of size #{group_size}, asked for #{self.opts[:sample_size]} (minimum history: #{self.opts[:minimum_history]})"
      end
    else
      # FIX: `raise ArgumentError.new, "msg"` was malformed; use class + message.
      raise ArgumentError, "Tx size for sampling must either be specified by an Integer or '*' (was #{group_size}:#{group_size.class})"
    end
  end
  sample.flatten.uniq
end