class Pione::Model::TaskWorkerBrokerModel

Attributes

task_workers[RW]
tuple_space[RW]
tuple_space_lock[R]

Public Class Methods

new() click to toggle source
Calls superclass method Rootage::Model::new
# File lib/pione/model/task-worker-broker-model.rb, line 8
def initialize
  super

  @spawnings = 0                  # number of current spawning task worker
  @task_worker_lock = Monitor.new # lock for task worker table
  @task_workers = Array.new       # known task worker fronts
  @tuple_space_lock = Monitor.new
  @tuple_space = Hash.new         # known tuple space table

  self[:spawn_task_worker] = true
end

Public Instance Methods

add_tuple_space(tuple_space) click to toggle source

Add the tuple space.

# File lib/pione/model/task-worker-broker-model.rb, line 21
def add_tuple_space(tuple_space)
  uuid = tuple_space.uuid

  # update tuple space table with the id
  @tuple_space_lock.synchronize {@tuple_space[uuid] = tuple_space}
end
create_task_worker(tuple_space) click to toggle source

Create a task worker for the tuple space. This method returns true if we suceeds to spawn the task worker, or returns false.

# File lib/pione/model/task-worker-broker-model.rb, line 30
def create_task_worker(tuple_space)
  res = true

  @task_worker_lock.synchronize do
    @spawnings += 1

    # spawn a new process of pione-task-worker command
    if self[:spawn_task_worker]
      # make task worker's parameters
      param = {
        :features => Global.features,
        :tuple_space_id => tuple_space.uuid
      }

      begin
        spawner = Command::PioneTaskWorker.spawn(self, param)
        @task_workers << spawner.child_front
        spawner.when_terminated {delete_task_worker(spawner.child_front)}
      rescue Command::SpawnError => e
        Log::Debug.system("Task worker broker agent failed to spawn a task worker: %s" % e.message)
        res = false
      end
    else
      @task_workers << Agent::TaskWorker.start(tuple_space, Global.expressional_features, @env)
    end

    @spawnings -= 1
  end

  return res
end
delete_dead_task_workers() click to toggle source

Delete all dead task workers.

# File lib/pione/model/task-worker-broker-model.rb, line 67
def delete_dead_task_workers
  @task_worker_lock.synchronize do
    @task_workers.delete_if do |worker|
      not(Util.ignore_exception {timeout(1) {worker.ping}})
    end
  end
end
delete_dead_tuple_spaces() click to toggle source

Delete all dead tuple spaces.

# File lib/pione/model/task-worker-broker-model.rb, line 76
def delete_dead_tuple_spaces
  @tuple_space_lock.synchronize do
    @tuple_space.delete_if do |_, space|
      not(Util.ignore_exception {timeout(1) {space.ping}})
    end
  end
end
delete_task_worker(worker) click to toggle source
# File lib/pione/model/task-worker-broker-model.rb, line 62
def delete_task_worker(worker)
  @task_worker_lock.synchronize {@task_workers.delete(worker)}
end
excess_task_workers() click to toggle source

Return excess number of task workers belong to the broker.

# File lib/pione/model/task-worker-broker-model.rb, line 85
def excess_task_workers
  @task_worker_lock.synchronize do
    @task_worker_size - @task_workers.size - @spawnings
  end
end
get_tuple_space(tuple_space_id) click to toggle source

Get the tuple space.

# File lib/pione/model/task-worker-broker-model.rb, line 92
def get_tuple_space(tuple_space_id)
  @tuple_space_lock.synchronize {@tuple_space[tuple_space_id]}
end
quantity() click to toggle source

Return number of task workers the broker manages.

# File lib/pione/model/task-worker-broker-model.rb, line 97
def quantity
  @task_worker_lock.synchronize {@task_workers.size}
end
terminate_task_worker_if(&condition) click to toggle source

Terminate first task worker that satisfies the condition.

# File lib/pione/model/task-worker-broker-model.rb, line 102
def terminate_task_worker_if(&condition)
  @task_worker_lock.synchronize do
    @task_workers.each do |worker|
      if condition.call(worker)
        worker.terminate
        @task_workers.delete(worker)
        return true
      end
    end
  end
  return false
end
tuple_spaces() click to toggle source

Return known tuple spaces.

# File lib/pione/model/task-worker-broker-model.rb, line 116
def tuple_spaces
  @tuple_space_lock.synchronize {@tuple_space.values}
end
update_tuple_spaces(tuple_spaces) click to toggle source

Update tuple space list.

# File lib/pione/model/task-worker-broker-model.rb, line 121
def update_tuple_spaces(tuple_spaces)
  @tuple_space_lock.synchronize do
    # clear and update tuple space list
    @tuple_space.clear
    tuple_spaces.each {|tuple_space| add_tuple_space(tuple_space)}

    Log::Debug.system do
      list = @tuple_space.values.map{|space| space.__drburi}
      "Task worker broker has updated tuple space table: %s" % [list]
    end
  end
end