class EventHub::ActorHeartbeat
Heartbeat class
Public Class Methods
new(processor_instance)
click to toggle source
# File lib/eventhub/actor_heartbeat.rb, line 9 def initialize(processor_instance) @processor_instance = processor_instance async.start end
Public Instance Methods
cleanup()
click to toggle source
# File lib/eventhub/actor_heartbeat.rb, line 27 def cleanup EventHub.logger.info("Heartbeat is cleaning up...") publish(heartbeat(action: "stopped")) EventHub.logger.info("Heartbeat has sent a [stopped] beat") end
start()
click to toggle source
# File lib/eventhub/actor_heartbeat.rb, line 14 def start EventHub.logger.info("Heartbeat is starting...") every(60 * 60 * 24) { EventHub.logger.info("Actual actors: #{Celluloid::Actor.all.size}: #{Celluloid::Actor.all.map { |a| a.class }.join(", ")}") } publish(heartbeat(action: "started")) EventHub.logger.info("Heartbeat has sent [started] beat") loop do sleep Configuration.processor[:heartbeat_cycle_in_s] publish(heartbeat) end end
Private Instance Methods
addresses()
click to toggle source
# File lib/eventhub/actor_heartbeat.rb, line 91 def addresses interfaces = Socket.getifaddrs.select { |interface| !interface.addr.ipv4_loopback? && !interface.addr.ipv6_loopback? } interfaces.map { |interface| begin { interface: interface.name, host_name: Socket.gethostname, ip_address: interface.addr.ip_address } rescue nil # will be ignored end }.compact end
heartbeat(args = {action: "running"})
click to toggle source
# File lib/eventhub/actor_heartbeat.rb, line 52 def heartbeat(args = {action: "running"}) message = EventHub::Message.new message.origin_module_id = EventHub::Configuration.name message.origin_type = "processor" message.origin_site_id = "global" message.process_name = "event_hub.heartbeat" now = Time.now # message structure needs more changes message.body = { version: @processor_instance.send(:version), action: args[:action], pid: Process.pid, process_name: "event_hub.heartbeat", heartbeat: { started: now_stamp(started_at), stamp_last_beat: now_stamp(now), uptime_in_ms: (now - started_at) * 1000, heartbeat_cycle_in_ms: Configuration.processor[:heartbeat_cycle_in_s] * 1000, queues_consuming_from: EventHub::Configuration.processor[:listener_queues], queues_publishing_to: [EventHub::EH_X_INBOUND], # needs more dynamic in the future host: Socket.gethostname, addresses: addresses, messages: messages_statistics } } message.to_json end
messages_statistics()
click to toggle source
# File lib/eventhub/actor_heartbeat.rb, line 109 def messages_statistics { total: statistics.messages_total, successful: statistics.messages_successful, unsuccessful: statistics.messages_unsuccessful, average_size: statistics.messages_average_size, average_process_time_in_ms: statistics.messages_average_process_time * 1000, total_process_time_in_ms: statistics.messages_total_process_time * 1000 } end
publish(message)
click to toggle source
# File lib/eventhub/actor_heartbeat.rb, line 35 def publish(message) connection = create_bunny_connection connection.start channel = connection.create_channel channel.confirm_select exchange = channel.direct(EventHub::EH_X_INBOUND, durable: true) exchange.publish(message, persistent: true) success = channel.wait_for_confirms unless success raise "Published heartbeat message has "\ "not been confirmed by the server" end ensure connection&.close end
started_at()
click to toggle source
# File lib/eventhub/actor_heartbeat.rb, line 83 def started_at @processor_instance.started_at end
statistics()
click to toggle source
# File lib/eventhub/actor_heartbeat.rb, line 87 def statistics @processor_instance.statistics end