class BlackStack::Dispatcher

Attributes

allowing_function[RW]

Additional function to decide whether the worker can dispatch or not. Example: use this function when you want to decide based on the remaining credits of the client. It should return true or false. Keep it nil if you want it to always return true.

field_end_time[RW]
field_id[RW]
field_primary_key[RW]
field_start_time[RW]
field_time[RW]
field_times[RW]
finisher_function[RW]

Additional function to perform the update on a record to flag the finishing of the job. By default, the :field_end_time field is set to the current datetime. Keep this parameter nil if you want to use the default algorithm.

max_job_duration_minutes[RW]

Maximum number of minutes that a job should take to process. If :end_time is still nil that many minutes after :start_time, the job is considered failed or interrupted.

max_try_times[RW]

Maximum number of times that a record can start to process and fail (the :start_time field is not nil, but the :end_time field is still nil after :max_job_duration_minutes).

name[RW]
occupied_function[RW]

Additional function that returns an array of objects pending to be processed by a worker. It should return an array. Keep it nil if you want to run the default function.

queue_size[RW]

Maximum number of records assigned to a worker that have not started yet (the :start_time field is nil).

relauncher_function[RW]

Additional function to perform the update on a record in order to retry it. Keep this parameter nil if you want to use the default algorithm.

relaunching_function[RW]

Additional function to choose the records to retry. Keep this parameter nil if you want to use the default algorithm.

selecting_function[RW]

Additional function to choose the records to launch. It should return an array of IDs. Keep this parameter nil if you want to use the default algorithm.

starter_function[RW]

Additional function to perform the update on a record to flag the starting of the job. By default, the :field_start_time field is set to the current datetime and the :field_times counter is increased. Keep this parameter nil if you want to use the default algorithm.

table[RW]

Database information. :field_times, :field_start_time and :field_end_time may be nil.

Public Class Methods

new(h) click to toggle source

Set up the dispatcher configuration here.

# File lib/pampa_dispatcher.rb, line 54
# Set up the dispatcher configuration from the hash +h+.
#
# Each attribute is read from the key of the same name (as a symbol) in +h+;
# a key that is absent leaves the attribute nil. No validation is performed
# on the values.
#
# Parameters:
#   h - hash of configuration values, indexed by symbol.
def initialize(h)
  self.name = h[:name]

  # database information
  self.table = h[:table]
  self.field_primary_key = h[:field_primary_key]
  self.field_id = h[:field_id]
  self.field_time = h[:field_time]
  self.field_times = h[:field_times]
  self.field_start_time = h[:field_start_time]
  self.field_end_time = h[:field_end_time]

  # limits
  self.queue_size = h[:queue_size]
  self.max_job_duration_minutes = h[:max_job_duration_minutes]
  self.max_try_times = h[:max_try_times]

  # optional hooks; nil means "use the default algorithm"
  self.occupied_function = h[:occupied_function]
  self.allowing_function = h[:allowing_function]
  self.selecting_function = h[:selecting_function]
  self.relaunching_function = h[:relaunching_function]
  self.relauncher_function = h[:relauncher_function]
  # These two hooks have accessors and are consulted by #start and #finish,
  # so they are read from the configuration hash like every other hook.
  self.starter_function = h[:starter_function]
  self.finisher_function = h[:finisher_function]
end

Public Instance Methods

allowing(worker) click to toggle source

Decides whether the worker can dispatch or not. Example: use this function when you want to decide based on the remaining credits of the client. It always returns true when no allowing_function is configured.

# File lib/pampa_dispatcher.rb, line 99
# Decide whether +worker+ may receive records.
#
# When no allowing_function is configured the answer is always true.
# Otherwise the configured callable is invoked with the worker and this
# dispatcher, and its result is returned unchanged.
def allowing(worker)
  hook = allowing_function
  return true if hook.nil?

  # TODO: validate that the hook returns true or false
  hook.call(worker, self)
end
available_slots(worker) click to toggle source

Returns the number of free slots in the processing queue of this worker.

# File lib/pampa_dispatcher.rb, line 86
# Number of free slots in the queue of +worker+.
#
# Computed as queue_size minus the number of elements returned by
# occupied_slots(worker); the result is 0 when the queue already holds more
# than queue_size elements.
def available_slots(worker)
  pending = occupied_slots(worker).size
  capacity = queue_size
  return 0 if pending > capacity

  capacity - pending
end
finish(o) click to toggle source
# File lib/pampa_dispatcher.rb, line 162
# Flag the record +o+ as finished.
#
# With a finisher_function configured, that callable is invoked with the
# record and this dispatcher and its result is returned. Otherwise the
# column named by field_end_time (when configured) is set to now() and the
# record is saved; the result of o.save is returned.
def finish(o)
  custom = finisher_function
  return custom.call(o, self) unless custom.nil?

  end_column = field_end_time
  # now() is not defined in this block; presumably a project-wide helper
  # returning the current datetime.
  o[end_column.to_sym] = now unless end_column.nil?
  o.save
end
occupied_slots(worker) click to toggle source

Returns an array of objects pending to be processed by the worker. By default, it selects the records whose :field_id column equals worker.id and, when :field_start_time is configured, whose :field_start_time column is nil.

# File lib/pampa_dispatcher.rb, line 75
# Records currently assigned to +worker+ and still pending.
#
# With an occupied_function configured, that callable is invoked with the
# worker and this dispatcher and its result is returned as is. Otherwise the
# table is filtered by field_id == worker.id, plus field_start_time == nil
# when a start-time column is configured, and the result of .all is returned.
def occupied_slots(worker)
  custom = occupied_function
  # TODO: validate the value returned by the hook (available_slots calls
  # .size on it)
  return custom.call(worker, self) unless custom.nil?

  criteria = { field_id.to_sym => worker.id }
  start_column = field_start_time
  criteria[start_column.to_sym] = nil unless start_column.nil?
  table.where(criteria).all
end
relaunch(o) click to toggle source
# File lib/pampa_dispatcher.rb, line 144
# Default update that puts the record +o+ back in the queue.
#
# Clears the columns named by field_id and field_time, clears the start and
# end time columns when they are configured, then saves the record and
# returns the result of o.save.
def relaunch(o)
  # the reservation columns are always reset
  [field_id, field_time].each { |column| o[column.to_sym] = nil }

  # the start/end columns are optional
  [field_start_time, field_end_time].each do |column|
    o[column.to_sym] = nil unless column.nil?
  end

  o.save
end
relaunching(worker, n) click to toggle source
# File lib/pampa_dispatcher.rb, line 135
# Choose the records to retry for +worker+.
#
# With a relaunching_function configured, that callable is invoked with the
# worker, this dispatcher and +n+, and its result is returned as is.
# Otherwise each row of relaunching_dataset(worker, n) is mapped to its
# primary-key value.
def relaunching(worker, n)
  custom = relaunching_function
  # TODO: validate that the hook returns an array of strings
  return custom.call(worker, self, n) unless custom.nil?

  relaunching_dataset(worker, n).map { |row| row[field_primary_key.to_sym] }
end
relaunching_dataset(worker, n) click to toggle source

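Chooses the records to retry. Returns a dataset, limited to n rows, that selects only the primary-key column; relaunching maps that dataset to an array of IDs.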

# File lib/pampa_dispatcher.rb, line 128
    # Build the dataset of records to retry.
    #
    # Selects the primary-key column of the rows whose field_time column is
    # older than max_job_duration_minutes ago and, when field_end_time is
    # configured, whose end-time column IS NULL. At most +n+ rows are kept.
    # +worker+ is not used by this default implementation.
    def relaunching_dataset(worker, n)
      keys_only = table.select(field_primary_key.to_sym)

      # rows reserved before this moment are treated as stalled
      deadline = Time.now - 60 * max_job_duration_minutes.to_i
      stamp = deadline.strftime('%Y-%m-%d %H:%M:%S')
      stalled = keys_only.where("#{field_time} < '#{stamp}'")

      stalled = stalled.filter("#{field_end_time} IS NULL") unless field_end_time.nil?
      # NOTE(review): the retry cap below is commented out in the original,
      # so max_try_times is not applied here - confirm whether that is intended.
      # ds = ds.filter("( #{self.field_times.to_s} IS NULL OR #{self.field_times.to_s} < #{self.max_try_times.to_s} ) ") if !self.field_times.nil?
      stalled.limit(n)
    end
run_dispatch(worker) click to toggle source

Dispatches records. Returns the number of records dispatched.

# File lib/pampa_dispatcher.rb, line 189
# Assign pending records to +worker+ and return how many were assigned.
#
# Up to available_slots(worker) IDs are taken from selecting(worker, n). Each
# record is loaded by primary key, stamped with worker.id and now(), has its
# start/end time columns cleared when they are configured, and is saved.
# Returns 0 without selecting anything when the worker has no free slot.
def run_dispatch(worker)
  free = available_slots(worker)
  dispatched = 0
  return dispatched unless free > 0

  selecting(worker, free).each do |id|
    # counted before the record is loaded and saved
    dispatched += 1

    record = table.where(field_primary_key.to_sym => id).first
    record[field_id.to_sym] = worker.id
    record[field_time.to_sym] = now
    record[field_start_time.to_sym] = nil unless field_start_time.nil?
    record[field_end_time.to_sym] = nil unless field_end_time.nil?
    record.save

    # release resources after every record
    DB.disconnect
    GC.start
  end

  dispatched
end
run_relaunch(worker) click to toggle source

Relaunches the failed records.

# File lib/pampa_dispatcher.rb, line 172
# Put failed records back in the queue.
#
# Up to queue_size IDs are taken from relaunching(worker, queue_size). Each
# record is loaded by primary key and handed either to relauncher_function
# (called with the record and this dispatcher) or, when no such hook is
# configured, to the default #relaunch.
def run_relaunch(worker)
  relaunching(worker, queue_size).each do |id|
    record = table.where(field_primary_key.to_sym => id).first

    custom = relauncher_function
    custom.nil? ? relaunch(record) : custom.call(record, self)

    # release resources after every record
    DB.disconnect
    GC.start
  end
end
selecting(worker, n) click to toggle source
# File lib/pampa_dispatcher.rb, line 117
# Choose the records to dispatch to +worker+.
#
# With a selecting_function configured, that callable is invoked with the
# worker, this dispatcher and +n+, and its result is returned as is.
# Otherwise each row of selecting_dataset(worker, n) is mapped to its
# primary-key value.
def selecting(worker, n)
  custom = selecting_function
  # TODO: validate that the hook returns an array of strings
  return custom.call(worker, self, n) unless custom.nil?

  selecting_dataset(worker, n).map { |row| row[field_primary_key.to_sym] }
end
selecting_dataset(worker, n) click to toggle source

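Chooses the records to dispatch. Returns a dataset, limited to n rows, that selects only the primary-key column; selecting maps that dataset to an array of IDs.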

# File lib/pampa_dispatcher.rb, line 110
# Build the dataset of records to dispatch.
#
# Selects the primary-key column of the rows whose field_id column is nil.
# When field_end_time is configured the end-time column must be nil too, and
# when field_times is configured the counter must be NULL or lower than
# max_try_times. At most +n+ rows are kept. +worker+ is not used by this
# default implementation.
def selecting_dataset(worker, n)
  candidates = table.select(field_primary_key.to_sym).where(field_id.to_sym => nil)

  candidates = candidates.filter(field_end_time.to_sym => nil) unless field_end_time.nil?

  unless field_times.nil?
    retry_cap = "#{field_times} IS NULL OR #{field_times} < #{max_try_times}"
    candidates = candidates.filter(retry_cap)
  end

  candidates.limit(n)
end
start(o) click to toggle source
# File lib/pampa_dispatcher.rb, line 152
# Flag the record +o+ as started.
#
# With a starter_function configured, that callable is invoked with the
# record and this dispatcher and its result is returned. Otherwise the column
# named by field_start_time (when configured) is set to now(), the counter
# column named by field_times (when configured) is increased by one, and the
# record is saved; the result of o.save is returned.
def start(o)
  custom = starter_function
  return custom.call(o, self) unless custom.nil?

  o[field_start_time.to_sym] = now() unless field_start_time.nil?

  # field_times is optional, so the counter is guarded the same way as the
  # start-time column; calling to_sym on nil would raise NoMethodError.
  unless field_times.nil?
    counter = field_times.to_sym
    # to_i turns a nil counter (record never started before) into 0
    o[counter] = o[counter].to_i + 1
  end

  o.save
end