class Pione::Notification::TaskWorkerBrokerRecipient
TaskWorkerBrokerRecipient
is a recipient for task worker broker agent.
Public Class Methods
new(model, front_uri, listener_uri)
click to toggle source
@param model [TaskWorkerBrokerModel]
task worker broker model
@param front_uri [URI]
URI of command front
@param listener_uri [URI]
URI of notification listener
Calls superclass method
Pione::Notification::Recipient::new
# File lib/pione/notification/task-worker-broker-recipient.rb, line 11 def initialize(model, front_uri, listener_uri) super(front_uri, listener_uri) @model = model @tuple_space = {} @lock = Mutex.new # update broker's tuple spaces @thread = Thread.new do while true sleep 1 clean update_broker end end end
Public Instance Methods
clean()
click to toggle source
Clean tuple space table.
# File lib/pione/notification/task-worker-broker-recipient.rb, line 68 def clean @lock.synchronize do now = Time.now dtime = Global.tuple_space_disconnection_time @tuple_space.delete_if {|_, holder| (now - holder[:last_time]) > dtime} end end
get_tuple_space(uri)
click to toggle source
Get a tuple space from front server at the URI
.
# File lib/pione/notification/task-worker-broker-recipient.rb, line 53 def get_tuple_space(uri) # build a reference to provider front front = DRb::DRbObject.new_with_uri(uri) # return the tuple space reference Timeout.timeout(3) {front.tuple_space} rescue Timeout::Error Log::Debug.notification do 'tuple_space notfication ignored the provider "%s" that seems to be something bad' % front.uri end rescue DRb::DRbConnError, DRbPatch::ReplyReaderError => e Log::Debug.notification('The tuple space at "%s" disconnected: %s' % [front.uri, e.message]) end
receive_tuple_space(message)
click to toggle source
Receive a “tupele space” message.
# File lib/pione/notification/task-worker-broker-recipient.rb, line 35 def receive_tuple_space(message) uri = message["front"] if @tuple_space.has_key?(uri) @lock.synchronize {@tuple_space[uri][:last_time] = Time.now} else if tuple_space = get_tuple_space(uri) @lock.synchronize do @tuple_space[uri] = {:last_time => Time.now, :tuple_space => tuple_space} end end end end
terminate()
click to toggle source
Terminate the recipient.
Calls superclass method
Pione::Notification::Recipient#terminate
# File lib/pione/notification/task-worker-broker-recipient.rb, line 29 def terminate super @thread.terminate end
update_broker()
click to toggle source
Update the tuple space list of broker.
# File lib/pione/notification/task-worker-broker-recipient.rb, line 77 def update_broker @lock.synchronize do tuple_spaces = @tuple_space.values.map {|holder| holder[:tuple_space]} @model.update_tuple_spaces(tuple_spaces) end end