class Pione::Agent::NotificationListener

Attributes

notification_handlers[R]

notification handler threads

Public Class Methods

new(model, uri) click to toggle source

@param model [NotificationListenerModel]

notification-listener model

@param uri [URI]

listening port
Calls superclass method Pione::Agent::BasicAgent::new
# File lib/pione/agent/notification-listener.rb, line 15
# Create a notification listener agent.
#
# @param model [NotificationListenerModel]
#   notification-listener model
# @param uri [URI]
#   listening port
def initialize(model, uri)
  # the superclass initializer is invoked without arguments
  super()

  # collaborators handed in by the caller
  @model = model
  @uri = uri

  # internal state: a mutex and the group holding notification handler threads
  @lock = Mutex.new
  @notification_handlers = ThreadGroup.new
end

Public Instance Methods

transit_to_init() click to toggle source

Initialize the agent.

# File lib/pione/agent/notification-listener.rb, line 37
# Initialize the agent: announce the listening URI in the system log and
# create the notification receiver bound to that URI.
#
# @return [Notification::Receiver] the receiver stored in @receiver
def transit_to_init
  startup_notice = 'Notification listener starts listening notification messages on "%s".' % @uri
  Log::SystemLog.info(startup_notice)

  @receiver = Notification::Receiver.new(@uri)
end
transit_to_receive() click to toggle source

Receive notification messages and make a message handler thread.

# File lib/pione/agent/notification-listener.rb, line 43
# Receive one notification message and hand it over to a new handler thread.
#
# The handler thread is registered in @notification_handlers. When receiving
# the message or starting the handler raises a StandardError, the failure is
# written to the debug log and the receiver socket is reopened.
def transit_to_receive
  # receive a notification
  transmitter_host, message = @receiver.receive

  # handle the notification in new thread
  thread = Util::FreeThreadGenerator.generate do
    handle_notification(transmitter_host, message)
  end
  @notification_handlers.add(thread)
rescue StandardError => e
  # transmitter_host is still nil when the receive call itself failed, so
  # fall back to a placeholder instead of referencing an undefined name
  sender = transmitter_host || "unknown host"
  Log::Debug.notification(
    "Receiver agent has received bad data from %s, so it ignored and reopen socket (%s: %s)." % [sender, e.class, e.message]
  )
  @receiver.reopen
end
transit_to_terminate() click to toggle source

Kill the notification handler threads and close the receiver socket.

# File lib/pione/agent/notification-listener.rb, line 58
# Terminate the agent: kill every live notification handler thread, then
# close the receiver socket if one exists and is still open.
def transit_to_terminate
  # kill threads of notification handler and wait until each has finished
  @notification_handlers.list.each { |thread| thread.kill.join }

  # close socket; @receiver is only assigned in transit_to_init, so it is nil
  # when the agent terminates before initialization
  @receiver.close if @receiver && !@receiver.closed?
end

Private Instance Methods

handle_notification(transmitter_host, message) click to toggle source

Deliver the notification message to the recipients and delete the recipients that could not be reached.

# File lib/pione/agent/notification-listener.rb, line 72
# Deliver the notification message to every recipient of the model.
#
# Each recipient URI from @model.recipients is contacted through DRb with a
# 3 second timeout. Recipients that time out or fail with a connection or
# reply-reader error are logged and afterwards deleted from the model.
#
# @param transmitter_host
#   host the message came from (not used in this method)
# @param message
#   notification message passed to each recipient's notify method
def handle_notification(transmitter_host, message)
  unreachable = []

  # notify the message to recipients, collecting the ones that fail
  @model.recipients.each do |recipient_uri|
    begin
      Timeout.timeout(3) do
        DRb::DRbObject.new_with_uri(recipient_uri).notify(message)
      end
    rescue Timeout::Error, DRb::DRbConnError, DRbPatch::ReplyReaderError => error
      Log::Debug.notification("Notification recipient %s disconnected: %s" % [recipient_uri, error.message])
      unreachable << recipient_uri
    end
  end

  # delete bad recipients
  @model.delete_recipient(*unreachable)
end