class Pione::Notification::TaskWorkerBrokerRecipient

TaskWorkerBrokerRecipient is a recipient for task worker broker agent.

Public Class Methods

new(model, front_uri, listener_uri) click to toggle source

@param model [TaskWorkerBrokerModel]

task worker broker model

@param front_uri [URI]

URI of command front

@param listener_uri [URI]

URI of notification listener
Calls superclass method Pione::Notification::Recipient::new
# File lib/pione/notification/task-worker-broker-recipient.rb, line 11
def initialize(model, front_uri, listener_uri)
  super(front_uri, listener_uri)

  @model = model
  @tuple_space = {}
  @lock = Mutex.new

  # update broker's tuple spaces
  @thread = Thread.new do
    while true
      sleep 1
      clean
      update_broker
    end
  end
end

Public Instance Methods

clean() click to toggle source

Clean tuple space table.

# File lib/pione/notification/task-worker-broker-recipient.rb, line 68
def clean
  @lock.synchronize do
    now = Time.now
    dtime = Global.tuple_space_disconnection_time
    @tuple_space.delete_if {|_, holder| (now - holder[:last_time]) > dtime}
  end
end
get_tuple_space(uri) click to toggle source

Get a tuple space from front server at the URI.

# File lib/pione/notification/task-worker-broker-recipient.rb, line 53
def get_tuple_space(uri)
  # build a reference to provider front
  front = DRb::DRbObject.new_with_uri(uri)

  # return the tuple space reference
  Timeout.timeout(3) {front.tuple_space}
rescue Timeout::Error
  Log::Debug.notification do
    'tuple_space notfication ignored the provider "%s" that seems to be something bad' % front.uri
  end
rescue DRb::DRbConnError, DRbPatch::ReplyReaderError => e
  Log::Debug.notification('The tuple space at "%s" disconnected: %s' % [front.uri, e.message])
end
receive_tuple_space(message) click to toggle source

Receive a “tupele space” message.

# File lib/pione/notification/task-worker-broker-recipient.rb, line 35
def receive_tuple_space(message)
  uri = message["front"]
  if @tuple_space.has_key?(uri)
    @lock.synchronize {@tuple_space[uri][:last_time] = Time.now}
  else
    if tuple_space = get_tuple_space(uri)
      @lock.synchronize do
        @tuple_space[uri] = {:last_time => Time.now, :tuple_space => tuple_space}
      end
    end
  end
end
terminate() click to toggle source

Terminate the recipient.

# File lib/pione/notification/task-worker-broker-recipient.rb, line 29
def terminate
  super
  @thread.terminate
end
update_broker() click to toggle source

Update the tuple space list of broker.

# File lib/pione/notification/task-worker-broker-recipient.rb, line 77
def update_broker
  @lock.synchronize do
    tuple_spaces = @tuple_space.values.map {|holder| holder[:tuple_space]}
    @model.update_tuple_spaces(tuple_spaces)
  end
end