class SISFC::Simulation
Attributes
start_time[R]
Public Class Methods
new(opts = {})
click to toggle source
# File lib/sisfc/simulation.rb, line 18
#
# Builds a simulation from an options hash.
#
# Recognized keys in +opts+:
# * +:configuration+ - object that must respond to +latency_models+ here
#   (and to several other readers used by #evaluate_allocation)
# * +:evaluator+ - stored as-is; used later to compute the business impact
def initialize(opts = {})
  @configuration, @evaluator = opts.values_at(:configuration, :evaluator)

  # the latency manager is driven by the latency models found in the configuration
  @latency_manager = LatencyManager.new(@configuration.latency_models)
end
Public Instance Methods
evaluate_allocation(vm_allocation)
click to toggle source
# File lib/sisfc/simulation.rb, line 38
#
# Runs one complete simulation for the given VM allocation and returns its
# fitness.
#
# +vm_allocation+ is a collection of hashes, each read through the keys
# +:component_type+, +:vm_num+, +:dc_id+ and +:vm_size+.
#
# Returns:
# * Float::MAX as soon as a data center refuses a VM (+add_vm+ returns a
#   falsy value), after logging the offending data center id on $stderr;
# * otherwise the negated sum of the values returned by
#   <tt>@evaluator.evaluate_business_impact</tt>.
#
# Raises RuntimeError when event times go backwards, when no data center
# sits at the customer's location, when no VM of a required component type
# exists in any data center, or when a sampled transmission time is negative.
#
# Side effects: resets @current_time, @start_time, @vms and @event_queue, and
# prints an evaluation summary on standard output.
def evaluate_allocation(vm_allocation)
  # setup simulation start and current time
  @current_time = @start_time = @configuration.start_time

  # create data centers and store them in a repository
  data_center_repository = @configuration.data_centers.map { |dc_key, dc_opts|
    [dc_key, DataCenter.new(id: dc_key, **dc_opts)]
  }.to_h

  customer_repository = @configuration.customers
  workflow_type_repository = @configuration.workflow_types

  # initialize statistics
  stats = Statistics.new
  per_workflow_and_customer_stats = workflow_type_repository.keys.map { |wft_id|
    per_customer = customer_repository.keys.map { |c_id|
      custom = @configuration.custom_stats.find { |x| x[:customer_id] == c_id && x[:workflow_type_id] == wft_id }
      [c_id, Statistics.new(custom || {})]
    }.to_h
    [wft_id, per_customer]
  }.to_h
  reqs_received_per_workflow_and_customer = workflow_type_repository.keys.map { |wft_id|
    [wft_id, customer_repository.keys.map { |c_id| [c_id, 0] }.to_h]
  }.to_h

  # create VMs
  @vms = []
  vmid = 0
  vm_allocation.each do |opts|
    # setup service_time_distribution
    stdist = @configuration.service_component_types[opts[:component_type]][:service_time_distribution]

    # allocate the VMs
    opts[:vm_num].times do
      # create VM ...
      # if [ "Financial Transaction Server", "RDBMS C", "Queue Manager" ].include? opts[:component_type]
      #   vm = VM.new(vmid, opts[:dc_id], opts[:vm_size], stdist, trace: true, notes: "ct #{opts[:component_type]}")
      # else
      #   vm = VM.new(vmid, opts[:dc_id], opts[:vm_size], stdist)
      # end
      vm = VM.new(vmid, opts[:dc_id], opts[:vm_size], stdist)

      # ... add it to the vm list ...
      @vms << vm

      # ... and register it in the corresponding data center
      unless data_center_repository[opts[:dc_id]].add_vm(vm, opts[:component_type])
        # the data center id lives in the allocation entry; there is no
        # `dc_id` local in this scope, so it must be read from opts
        $stderr.puts "====== Unfeasible allocation at data center #{opts[:dc_id]} ======"
        $stderr.flush
        # here we return Float::MAX instead of, e.g., Float::INFINITY,
        # because the latter would break optimization tools. instead, we
        # want to have a very high but comparable value.
        return Float::MAX
      end

      # update vm id
      vmid += 1
    end
  end

  # create event queue
  @event_queue = SortedArray.new

  # puts "========== Simulation Start =========="

  # generate first request
  rg = RequestGenerator.new(@configuration.request_generation)
  req_attrs = rg.generate
  new_event(Event::ET_REQUEST_GENERATION, req_attrs, req_attrs[:generation_time], nil)

  # schedule end of simulation
  unless @configuration.end_time.nil?
    # puts "Simulation ends at: #{@configuration.end_time}"
    new_event(Event::ET_END_OF_SIMULATION, nil, @configuration.end_time, nil)
  end

  # calculate warmup threshold: only requests arriving after it feed the statistics
  warmup_threshold = @configuration.start_time + @configuration.warmup_duration.to_i

  requests_being_worked_on = 0
  requests_forwarded_to_other_dcs = 0
  current_event = 0

  # launch simulation
  until @event_queue.empty?
    e = @event_queue.shift

    current_event += 1
    # sanity check on simulation time flow
    if @current_time > e.time
      raise "Error: simulation time inconsistency for event #{current_event} " +
            "e.type=#{e.type} @current_time=#{@current_time}, e.time=#{e.time}"
    end

    @current_time = e.time

    case e.type
    when Event::ET_REQUEST_GENERATION
      req_attrs = e.data

      # find closest data center
      customer_location_id = customer_repository.dig(req_attrs[:customer_id], :location_id)
      dc_at_customer_location = data_center_repository.values.find { |dc| dc.location_id == customer_location_id }

      raise "No data center found at location id #{customer_location_id}!" unless dc_at_customer_location

      # find first component name for requested workflow
      workflow = workflow_type_repository[req_attrs[:workflow_type_id]]
      first_component_name = workflow[:component_sequence][0][:name]

      # prefer the data center at the customer's location; otherwise pick at
      # random among the data centers hosting the first component (nil if none)
      closest_dc =
        if dc_at_customer_location.has_vms_of_type?(first_component_name)
          dc_at_customer_location
        else
          data_center_repository.values.select { |dc| dc.has_vms_of_type?(first_component_name) }.sample
        end

      raise "Invalid configuration! No VMs of type #{first_component_name} found!" unless closest_dc

      arrival_time = @current_time + @latency_manager.sample_latency_between(customer_location_id, closest_dc.location_id)
      new_req = Request.new(req_attrs.merge!(initial_data_center_id: closest_dc.dcid,
                                             arrival_time: arrival_time))

      # schedule arrival of current request
      new_event(Event::ET_REQUEST_ARRIVAL, new_req, arrival_time, nil)

      # schedule generation of next request
      req_attrs = rg.generate
      new_event(Event::ET_REQUEST_GENERATION, req_attrs, req_attrs[:generation_time], nil)

    when Event::ET_REQUEST_ARRIVAL
      # get request
      req = e.data

      # find data center
      data_center = data_center_repository[req.data_center_id]

      # update reqs_received_per_workflow_and_customer
      reqs_received_per_workflow_and_customer[req.workflow_type_id][req.customer_id] += 1

      # find next component name
      workflow = workflow_type_repository[req.workflow_type_id]
      next_component_name = workflow[:component_sequence][req.next_step][:name]

      # if [5,6,9].include? req.workflow_type_id
      #   $stderr.puts "received request for wf #{req.workflow_type_id} at dc #{req.data_center_id}"
      #   $stderr.puts "next component name is #{next_component_name}"
      #   $stderr.flush
      # end

      # get random vm providing next service component type
      vm = data_center.get_random_vm(next_component_name)

      # if [5,6,9].include? req.workflow_type_id
      #   $stderr.puts "next vm is #{vm.__id__}"
      #   $stderr.flush
      # end

      # schedule request forwarding to vm
      new_event(Event::ET_REQUEST_FORWARDING, req, e.time, vm)

      # update stats
      if req.arrival_time > warmup_threshold
        # increase the number of requests being worked on
        requests_being_worked_on += 1

        # increase count of received requests
        stats.request_received

        # increase count of received requests in per_workflow_and_customer_stats
        per_workflow_and_customer_stats[req.workflow_type_id][req.customer_id].request_received

        # if stats.received % 10_000 == 0
        #   $stderr.puts "#{Thread.current.__id__} sisfc: Received #{stats.received} requests."
        #   $stderr.puts "#{Thread.current.__id__} sisfc: Working on #{requests_being_worked_on} requests."
        #   $stderr.flush
        # end
      end

    # Leave these events for when we add VM migration support
    # when Event::ET_VM_SUSPEND
    # when Event::ET_VM_RESUME

    when Event::ET_REQUEST_FORWARDING
      # get request
      req  = e.data
      time = e.time
      vm   = e.destination

      vm.new_request(self, req, time)

    when Event::ET_WORKFLOW_STEP_COMPLETED
      # retrieve request and vm
      req = e.data
      vm  = e.destination

      # tell the old vm that it can start processing another request
      vm.request_finished(self, e.time)

      # find data center and workflow
      data_center = data_center_repository[req.data_center_id]
      workflow    = workflow_type_repository[req.workflow_type_id]

      # check if there are other steps left to complete the workflow
      if req.next_step < workflow[:component_sequence].size
        # find next component name
        next_component_name = workflow[:component_sequence][req.next_step][:name]

        # get random VM providing next service component type
        new_vm = data_center.get_random_vm(next_component_name)

        # this is the request's time of arrival at the new VM
        forwarding_time = e.time

        # there might not be a VM of the type we need in the current data
        # center, so look in the other data centers
        unless new_vm
          # get list of other data centers, randomly picked
          other_dcs = data_center_repository.values.select { |x|
            x != data_center && x.has_vms_of_type?(next_component_name)
          }.shuffle

          other_dcs.each do |dc|
            new_vm = dc.get_random_vm(next_component_name)
            if new_vm
              # need to update data_center_id of request
              req.data_center_id = dc.dcid

              # keep track of transmission time
              transmission_time = @latency_manager.sample_latency_between(data_center.location_id,
                                                                          dc.location_id)

              unless transmission_time >= 0.0
                raise "Negative transmission time (#{transmission_time})!"
              end

              # if [5,6,9].include? req.workflow_type_id
              #   $stderr.puts "rerouting request for wf #{req.workflow_type_id} from dc #{data_center.dcid} to dc #{dc.dcid}"
              #   $stderr.puts "next component name is #{next_component_name}"
              #   $stderr.puts "transmission time is #{transmission_time}"
              #   $stderr.flush
              # end

              req.update_transfer_time(transmission_time)
              forwarding_time += transmission_time

              # update request's current data_center_id
              # NOTE(review): same value was already assigned a few lines
              # above; looks redundant, kept in case the setter or
              # update_transfer_time has side effects - confirm
              req.data_center_id = dc.dcid

              # keep track of number of requests forwarded to other data centers
              requests_forwarded_to_other_dcs += 1

              # we're done here
              break
            end
          end
        end

        # make sure we actually found a VM
        raise "Cannot find VM running a component of type " +
              "#{next_component_name} in any data center!" unless new_vm

        # if [5,6,9].include? req.workflow_type_id
        #   $stderr.puts "received request for wf #{req.workflow_type_id} at dc #{req.data_center_id}"
        #   $stderr.puts "next component name is #{next_component_name}"
        #   $stderr.puts "next vm is #{new_vm.__id__}"
        #   $stderr.flush
        # end

        # schedule request forwarding to vm
        new_event(Event::ET_REQUEST_FORWARDING, req, forwarding_time, new_vm)

      else # workflow is finished
        # calculate transmission time
        transmission_time = @latency_manager.sample_latency_between(
          # data center location
          data_center_repository[req.data_center_id].location_id,
          # customer location
          customer_repository.dig(req.customer_id, :location_id)
        )

        # if [5,6,9].include? req.workflow_type_id
        #   $stderr.puts "closing request for wf #{req.workflow_type_id}"
        #   $stderr.puts "e.time is #{e.time}"
        #   $stderr.puts "transmission_time is #{transmission_time}"
        #   $stderr.flush
        # end

        unless transmission_time >= 0.0
          raise "Negative transmission time (#{transmission_time})!"
        end

        # keep track of transmission time
        req.update_transfer_time(transmission_time)

        # schedule request closure
        new_event(Event::ET_REQUEST_CLOSURE, req, e.time + transmission_time, nil)
      end

    when Event::ET_REQUEST_CLOSURE
      # retrieve request
      req = e.data

      # request is closed
      req.finished_processing(e.time)

      # update stats
      if req.arrival_time > warmup_threshold
        # decrease the number of requests being worked on
        requests_being_worked_on -= 1

        # collect request statistics
        stats.record_request(req)

        # collect request statistics in per_workflow_and_customer_stats
        per_workflow_and_customer_stats[req.workflow_type_id][req.customer_id].record_request(req)
      end

    when Event::ET_END_OF_SIMULATION
      # puts "#{e.time}: end simulation"
      break

    end
  end

  # puts "========== Simulation Finished =========="

  costs = @evaluator.evaluate_business_impact(stats, per_workflow_and_customer_stats,
                                              vm_allocation)

  puts "====== Evaluating new allocation ======\n" +
       "costs: #{costs}\n" +
       "vm_allocation: #{vm_allocation.inspect}\n" +
       "stats: #{stats.to_s}\n" +
       "per_workflow_and_customer_stats: #{per_workflow_and_customer_stats.to_s}\n" +
       "=======================================\n"

  # we want to minimize the cost, so we define fitness as the opposite of
  # the sum of all costs incurred
  -costs.values.inject(0.0) { |total, cost| total + cost }
end
new_event(type, data, time, destination)
click to toggle source
# File lib/sisfc/simulation.rb, line 27
#
# Wraps the given attributes in an Event and pushes it onto @event_queue
# (the queue that #evaluate_allocation creates and drains).
# Returns whatever the queue's << returns.
def new_event(type, data, time, destination)
  @event_queue << Event.new(type, data, time, destination)
end
now()
click to toggle source
# File lib/sisfc/simulation.rb, line 33
#
# Current simulation time: the value of @current_time, which
# #evaluate_allocation sets to the configured start time and then to the time
# of each event it takes from the queue.
def now
  @current_time
end
Private Instance Methods
communication_latency_between(loc1, loc2)
click to toggle source
# File lib/sisfc/simulation.rb, line 387
#
# Samples the latency between two locations (listed among the private
# instance methods). Both location ids are converted with +to_i+ before being
# handed to the latency manager; its result is returned unchanged.
def communication_latency_between(loc1, loc2)
  from_id, to_id = [loc1, loc2].map(&:to_i)
  @latency_manager.sample_latency_between(from_id, to_id)
end