class Tengine::Core::HeartbeatWatcher

Public Class Methods

new(argv) click to toggle source
# File lib/tengine/core/heartbeat_watcher.rb, line 20
# Builds a heartbeat watcher from command-line arguments.
#
# Generates a UUID for this instance, parses +argv+ into @config and
# prepares the option hash that start_daemon / stop_daemon hand to
# Daemons.run_proc. Method tracing is disabled unless @config[:verbose]
# is truthy.
#
# Any exception raised while setting up is printed (class, message and
# backtrace) to standard output and then re-raised unchanged.
#
# @param argv command-line arguments, passed as-is to
#   Tengine::Core::Config::HeartbeatWatcher.parse
def initialize(argv)
  @uuid = UUID.new.generate
  @config = Tengine::Core::Config::HeartbeatWatcher.parse(argv)
  @daemonize_options = {
    :app_name => 'tengine_heartbeat_watcher',
    :ARGV     => [@config[:action]],
    :ontop    => !@config[:process][:daemon], # stay in the foreground unless daemon mode is requested
    :multiple => true,
    :dir_mode => :normal,
    :dir      => File.expand_path(@config[:process][:pid_dir])
  }
  Tengine::Core::MethodTraceable.disabled = !@config[:verbose]
rescue Exception => error
  # Deliberately broad: the failure is only reported here, then propagated.
  trace = error.backtrace.join("\n  ")
  puts "[#{error.class.name}] #{error.message}\n  #{trace}"
  raise
end

Public Instance Methods

pid() click to toggle source
# File lib/tengine/core/heartbeat_watcher.rb, line 37
# Identifier of this watcher process, in the form
# "process:<MM_SERVER_NAME>/<OS process id>".
#
# The value is computed on first use and cached in @pid, so later
# changes to the MM_SERVER_NAME environment variable are not picked up.
#
# @return [String]
def pid
  return @pid if @pid

  server_name = ENV["MM_SERVER_NAME"]
  @pid = format("process:%s/%d", server_name, Process.pid)
end
run(__file__) click to toggle source
# File lib/tengine/core/heartbeat_watcher.rb, line 81
# Dispatches on the configured action (@config[:action]).
#
# * start   - starts the daemon
# * stop    - stops the daemon
# * restart - stops, then starts the daemon
#
# Any other action does nothing and yields nil.
#
# @param __file__ path of the launching script, forwarded to
#   start_daemon / stop_daemon
# @return whatever the last daemon call returned, or nil
def run(__file__)
  action = @config[:action].to_sym

  return stop_daemon(__file__) if action == :stop
  return nil unless [:start, :restart].include?(action)

  stop_daemon(__file__) if action == :restart
  start_daemon(__file__)
end
search_for_invalid_heartbeat() { |i| ... } click to toggle source
# File lib/tengine/core/heartbeat_watcher.rb, line 68
# Looks up heartbeat events that are older than their configured expiry
# and yields each of them to the caller's block.
#
# For every entry of @config[:heartbeat] a query is built for events whose
# type is "<entry>.heartbeat.tengine" and whose occurred_at is at or before
# (now - entry[:expire]). The flattened results are walked with
# each_next_tick; nil/false entries are skipped.
#
# @yield [event] each matching event
def search_for_invalid_heartbeat
  now = Time.now
  queries = @config[:heartbeat].to_hash.each_pair.map do |kind, settings|
    Tengine::Core::Event.where(
      :event_type_name => "#{kind}.heartbeat.tengine",
      :occurred_at.lte => now - settings[:expire]
    )
  end
  queries.flatten.each_next_tick { |event| yield event if event }
end
send_invalidate_event(type, e0) click to toggle source
# File lib/tengine/core/heartbeat_watcher.rb, line 54
# Fires an error-level event of the given type, derived from an expired
# heartbeat event.
#
# The new event copies the attributes of +e0+ (its document with symbol
# keys), minus :_id, :confirmed, :updated_at, :created_at and
# :lock_version, with :event_type_name replaced by +type+ and :level set
# to the error level. The detection is also written to the Tengine logger.
#
# @param type event type name for the event to fire
# @param e0   the heartbeat event that was found to be expired
# @return the result of sender.fire
def send_invalidate_event(type, e0)
  attrs = e0.as_document.symbolize_keys
  Tengine.logger.info "Heartbeat expiration detected! for #{e0.event_type_name} of #{e0.source_name}: last seen #{e0.occurred_at} (#{(Time.now - e0.occurred_at).to_f} secs before)"

  # Drop the attributes that must not be carried over from the stored event.
  [:_id, :confirmed, :updated_at, :created_at, :lock_version].each do |key|
    attrs.delete key
  end

  attrs[:event_type_name] = type
  attrs[:level] = Tengine::Event::LEVELS_INV[:error]
  sender.fire Tengine::Event.new(attrs), :keep_connection => true
end
send_last_event() click to toggle source
# File lib/tengine/core/heartbeat_watcher.rb, line 45
# Fires the "finished.process.hbw.tengine" event for this watcher and then
# stops the sender.
#
# The event is keyed by this instance's UUID, uses pid as both source and
# sender name, and is sent at :info level with the connection kept open
# for the fire call.
#
# @return the result of sender.stop
def send_last_event
  me = pid
  sender.fire(
    "finished.process.hbw.tengine",
    :key => @uuid,
    :source_name => me,
    :sender_name => me,
    :occurred_at => Time.now,
    :level_key => :info,
    :keep_connection => true
  )
  sender.stop
end
send_periodic_event() click to toggle source
# File lib/tengine/core/heartbeat_watcher.rb, line 50
# Fires this watcher's own heartbeat event, "hbw.heartbeat.tengine".
#
# The event is keyed by this instance's UUID, uses pid as both source and
# sender name, and is sent at :debug level with :retry_count => 0 and the
# connection kept open.
#
# @return the result of sender.fire
def send_periodic_event
  me = pid
  sender.fire(
    "hbw.heartbeat.tengine",
    :key => @uuid,
    :source_name => me,
    :sender_name => me,
    :occurred_at => Time.now,
    :level_key => :debug,
    :keep_connection => true,
    :retry_count => 0
  )
end
sender() click to toggle source
# File lib/tengine/core/heartbeat_watcher.rb, line 41
# The event sender used by this watcher.
#
# Created on first use from an MQ suite configured with
# @config[:event_queue], then cached in @sender and reused.
#
# @return [Tengine::Event::Sender]
def sender
  return @sender if @sender

  mq_suite = Tengine::Mq::Suite.new(@config[:event_queue])
  @sender = Tengine::Event::Sender.new(mq_suite)
end
shutdown() click to toggle source
# File lib/tengine/core/heartbeat_watcher.rb, line 107
# Cancels whichever of the two timers (@invalidate, @periodic) were set up
# by start, then sends the final "finished" event via send_last_event.
# All of this happens inside an EM.run block.
def shutdown
  EM.run do
    [@invalidate, @periodic].each do |timer|
      EM.cancel_timer timer if timer
    end
    send_last_event
  end
end
start() click to toggle source
# File lib/tengine/core/heartbeat_watcher.rb, line 115
# Runs the watcher's main loop.
#
# Sets up loggers, loads the Mongoid configuration from @config[:db],
# points the Mongoid and AMQP session loggers at Tengine.logger and then
# enters the EventMachine reactor. Once the sender has a connection:
#
# * every second, expired heartbeats are looked up and, for each one, an
#   "expired.<kind>.heartbeat.tengine" event is fired on the next tick;
# * if @config[:heartbeat][:hbw][:interval] is a positive number, the
#   watcher's own heartbeat is fired at that interval.
#
# The timer handles are kept in @invalidate and @periodic so that shutdown
# can cancel them.
def start
  @config.setup_loggers

  Mongoid.configure { |mongoid| mongoid.send :load_configuration, @config[:db] }

  require 'amqp'
  Mongoid.logger = AMQP::Session.logger = Tengine.logger

  EM.run do
    sender.wait_for_connection do
      @invalidate = EM.add_periodic_timer(1) do # !!! MAGIC NUMBER
        search_for_invalid_heartbeat do |stale|
          # NOTE(review): expired_type is nil when the event type name matches
          # none of the known kinds; the event is still sent in that case.
          expired_type =
            case stale.event_type_name
            when /job|core|hbw|resourcew|atd/
              "expired.#$&.heartbeat.tengine"
            end
          EM.next_tick { send_invalidate_event expired_type, stale }
        end
      end

      # to_i always yields an Integer, so only the sign needs checking.
      interval = @config[:heartbeat][:hbw][:interval].to_i
      if interval > 0
        @periodic = EM.add_periodic_timer(interval) { send_periodic_event }
      end
    end
  end
end
start_daemon(__file__) click to toggle source
# File lib/tengine/core/heartbeat_watcher.rb, line 93
# Starts the watcher through Daemons.run_proc.
#
# The process is registered under the basename of +__file__+ with the
# options prepared in @daemonize_options (which already carry the expanded
# pid directory). The working directory at call time is captured and
# re-entered inside the daemon block before start is run.
#
# @param __file__ path of the launching script; only its basename is used
# @return the result of Daemons.run_proc
def start_daemon(__file__)
  fname = File.basename __file__
  # Captured up front so that start runs in the directory the command was
  # launched from.
  cwd = Dir.getwd
  Daemons.run_proc(fname, @daemonize_options) do
    Dir.chdir(cwd) { start }
  end
end
stop_daemon(__file__) click to toggle source
# File lib/tengine/core/heartbeat_watcher.rb, line 102
# Invokes Daemons.run_proc for the basename of +__file__+ with
# @daemonize_options and no block; the action Daemons performs is the one
# stored under :ARGV in those options.
#
# @param __file__ path of the launching script; only its basename is used
# @return the result of Daemons.run_proc
def stop_daemon(__file__)
  Daemons.run_proc(File.basename(__file__), @daemonize_options)
end