class Pione::Agent::NotificationListener
Attributes
notification_handlers[R]
notification handler threads
Public Class Methods
new(model, uri)
@param model [NotificationListenerModel]
notification-listener model
@param uri [URI]
listening port
Calls superclass method
Pione::Agent::BasicAgent::new
# File lib/pione/agent/notification-listener.rb, line 15
def initialize(model, uri)
  super()
  @uri = uri
  @model = model
  @notification_handlers = ThreadGroup.new
  @lock = Mutex.new
end
Public Instance Methods
transit_to_init()
Initialize the agent.
# File lib/pione/agent/notification-listener.rb, line 37
def transit_to_init
  Log::SystemLog.info('Notification listener starts listening notification messages on "%s".' % @uri)
  @receiver = Notification::Receiver.new(@uri)
end
transit_to_receive()
Receive a notification message and spawn a handler thread for it. If receiving fails, reopen the receiver socket.
# File lib/pione/agent/notification-listener.rb, line 43
def transit_to_receive
  # receive a notification
  transmitter_host, message = @receiver.receive

  # handle the notification in a new thread
  thread = Util::FreeThreadGenerator.generate do
    handle_notification(transmitter_host, message)
  end
  @notification_handlers.add(thread)
rescue StandardError => e
  # transmitter_host is nil when the receive call itself raised
  Log::Debug.notification("Receiver agent has received bad data from %s, so it ignored and reopen socket." % transmitter_host)
  @receiver.reopen
end
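The pattern in transit_to_receive (one handler thread per message, tracked in a ThreadGroup) can be sketched with the standard library alone. This is a minimal illustration, not PIONE code: plain Thread.new is assumed in place of Util::FreeThreadGenerator, and the names handlers, results, and messages are hypothetical.

```ruby
# Hypothetical stand-ins for the listener state; not taken from PIONE.
handlers = ThreadGroup.new   # plays the role of @notification_handlers
results  = Queue.new         # thread-safe sink for handled messages

messages = ["a", "b", "c"]
messages.each do |message|
  # One thread per message, mirroring handle_notification being run per notification.
  thread = Thread.new { results << message.upcase }
  handlers.add(thread)       # move the thread into the group so it can be found later
end

# ThreadGroup#list returns only threads that are still alive,
# so joining them waits for every unfinished handler.
handlers.list.each(&:join)

handled = Array.new(messages.size) { results.pop }.sort
puts handled.inspect
```

Because ThreadGroup#list omits finished threads, the group needs no explicit cleanup as handlers complete.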
transit_to_terminate()
Kill the notification handler threads and close the receiver socket.
# File lib/pione/agent/notification-listener.rb, line 58
def transit_to_terminate
  # kill threads of notification handler
  @notification_handlers.list.each {|thread| thread.kill.join}
  # close socket
  @receiver.close unless @receiver.closed?
end
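The shutdown sequence above (kill each handler thread, wait for it to finish, then close the socket only if it is still open) can be tried in isolation. The following is a rough sketch under stated assumptions: a plain UDPSocket stands in for Notification::Receiver, whose real interface is not shown here, and workers and socket are hypothetical names.

```ruby
require "socket"

# Hypothetical stand-ins; the real agent uses @notification_handlers and @receiver.
workers = ThreadGroup.new
3.times { workers.add(Thread.new { sleep }) }  # threads that would block forever

socket = UDPSocket.new  # assumed substitute for the receiver socket

alive_before = workers.list.size

# Thread#kill returns the thread itself, so join can be chained to wait
# until the thread has actually terminated.
workers.list.each { |thread| thread.kill.join }

alive_after = workers.list.size

# Guard against closing twice, as transit_to_terminate does.
socket.close unless socket.closed?

puts "alive before: #{alive_before}, after: #{alive_after}"
```

Chaining join after kill matters because kill only requests termination; without the join, the socket could be closed while a handler is still unwinding.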
Private Instance Methods
handle_notification(transmitter_host, message)
Forward the notification message to every recipient and remove the recipients that fail to respond.
# File lib/pione/agent/notification-listener.rb, line 72
def handle_notification(transmitter_host, message)
  # notify the message to recipients
  bad_recipients = @model.recipients.each_with_object([]) do |uri, bad_recipients|
    begin
      Timeout.timeout(3) {DRb::DRbObject.new_with_uri(uri).notify(message)}
    rescue Timeout::Error, DRb::DRbConnError, DRbPatch::ReplyReaderError => e
      Log::Debug.notification("Notification recipient %s disconnected: %s" % [uri, e.message])
      bad_recipients << uri
    end
  end

  # delete bad recipients
  @model.delete_recipient(*bad_recipients)
end
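The way handle_notification collects unresponsive recipients with each_with_object and a per-call timeout can be illustrated without a DRb server. In this sketch the FakeRecipient class, the druby URIs, and the recipients hash are all hypothetical; a local object with an artificial delay is assumed in place of DRb::DRbObject.new_with_uri(uri), and the timeout is shortened from 3 seconds to 0.1.

```ruby
require "timeout"

# Hypothetical recipient that simulates a remote object with a fixed latency.
class FakeRecipient
  attr_reader :received

  def initialize(delay)
    @delay = delay
    @received = []
  end

  def notify(message)
    sleep(@delay)          # simulate network latency
    @received << message
  end
end

# Made-up URIs mapped to stand-in recipients.
recipients = {
  "druby://fast:1" => FakeRecipient.new(0),
  "druby://slow:2" => FakeRecipient.new(1),
}

# Notify every recipient; accumulate the URIs whose call timed out.
bad = recipients.each_with_object([]) do |(uri, recipient), failed|
  begin
    Timeout.timeout(0.1) { recipient.notify("hello") }
  rescue Timeout::Error
    failed << uri
  end
end

# Remove the unresponsive recipients in one pass after the loop.
bad.each { |uri| recipients.delete(uri) }

puts "removed: #{bad.inspect}"
```

Collecting failures first and deleting afterwards avoids mutating the recipient collection while it is being iterated.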