class Tengine::Core::HeartbeatWatcher
Public Class Methods
new(argv)
click to toggle source
# File lib/tengine/core/heartbeat_watcher.rb, line 20
# Builds a watcher from command-line arguments.
#
# Generates a UUID for this instance, parses +argv+ into @config, assembles the
# option hash that is later handed to Daemons.run_proc, and disables method
# tracing unless @config[:verbose] is set.
#
# Any exception raised along the way is printed to standard output (class,
# message and backtrace) and then re-raised unchanged.
def initialize argv
  @uuid   = UUID.new.generate
  @config = Tengine::Core::Config::HeartbeatWatcher.parse argv

  options = { :app_name => 'tengine_heartbeat_watcher' }
  options[:ARGV]     = [@config[:action]]
  # Stay in the foreground unless the process section asks for a daemon.
  options[:ontop]    = !@config[:process][:daemon]
  options[:multiple] = true
  options[:dir_mode] = :normal
  options[:dir]      = File.expand_path(@config[:process][:pid_dir])
  @daemonize_options = options

  Tengine::Core::MethodTraceable.disabled = !@config[:verbose]
rescue Exception => error
  # Report only; the bare raise below hands the same exception to the caller.
  puts "[#{error.class.name}] #{error.message}\n #{error.backtrace.join("\n ")}"
  raise
end
Public Instance Methods
pid()
click to toggle source
# File lib/tengine/core/heartbeat_watcher.rb, line 37
# Returns the identifier of this process in the form
# "process:<MM_SERVER_NAME>/<pid>".
#
# The string is computed on first use and cached in @pid. When the
# MM_SERVER_NAME environment variable is unset the name part is empty.
def pid
  @pid ||= "process:#{ENV["MM_SERVER_NAME"]}/#{Process.pid}"
end
run(__file__)
click to toggle source
# File lib/tengine/core/heartbeat_watcher.rb, line 81
# Dispatches on @config[:action] (converted to a Symbol).
#
# :start   -> start_daemon
# :stop    -> stop_daemon
# :restart -> stop_daemon followed by start_daemon
#
# Any other action does nothing and returns nil. +__file__+ is passed through
# to the daemon helpers untouched.
def run(__file__)
  action = @config[:action].to_sym
  if action == :start
    start_daemon(__file__)
  elsif action == :stop
    stop_daemon(__file__)
  elsif action == :restart
    stop_daemon(__file__)
    start_daemon(__file__)
  end
end
search_for_invalid_heartbeat() { |i| ... }
click to toggle source
# File lib/tengine/core/heartbeat_watcher.rb, line 68
# Looks up heartbeat events older than their configured expiry and yields each
# one to the caller's block.
#
# For every (name, settings) pair in @config[:heartbeat] a query is built for
# events of type "<name>.heartbeat.tengine" whose occurred_at is at or before
# (now - settings[:expire]). The query results are flattened and walked with
# each_next_tick (defined outside this method); nil entries are skipped.
def search_for_invalid_heartbeat
  now = Time.now
  heartbeat_settings = @config[:heartbeat].to_hash
  queries = heartbeat_settings.each_pair.map do |name, settings|
    deadline = now - settings[:expire]
    Tengine::Core::Event.where(
      :event_type_name => "#{name}.heartbeat.tengine",
      :occurred_at.lte => deadline
    )
  end
  queries.flatten.each_next_tick do |event|
    yield event if event
  end
end
send_invalidate_event(type, e0)
click to toggle source
# File lib/tengine/core/heartbeat_watcher.rb, line 54
# Fires an error-level event of type +type+ derived from the expired
# heartbeat event +e0+.
#
# The new event copies e0's document, minus the persistence bookkeeping fields
# (_id, confirmed, updated_at, created_at, lock_version), with event_type_name
# replaced by +type+ and level set to Tengine::Event::LEVELS_INV[:error].
# An info line describing the expiration is logged first. The sender
# connection is kept open after firing.
def send_invalidate_event type, e0
  attributes = e0.as_document.symbolize_keys
  Tengine.logger.info "Heartbeat expiration detected! for #{e0.event_type_name} of #{e0.source_name}: last seen #{e0.occurred_at} (#{(Time.now - e0.occurred_at).to_f} secs before)"

  [:_id, :confirmed, :updated_at, :created_at, :lock_version].each do |field|
    attributes.delete field
  end
  attributes[:event_type_name] = type
  attributes[:level] = Tengine::Event::LEVELS_INV[:error]

  expired_event = Tengine::Event.new attributes
  sender.fire expired_event, :keep_connection => true
end
send_last_event()
click to toggle source
# File lib/tengine/core/heartbeat_watcher.rb, line 45
# Fires the final "finished.process.hbw.tengine" event (info level, keyed by
# this watcher's UUID, with this process as both source and sender) and then
# stops the sender.
def send_last_event
  event_sender = sender
  identity = pid
  event_sender.fire(
    "finished.process.hbw.tengine",
    :key => @uuid,
    :source_name => identity,
    :sender_name => identity,
    :occurred_at => Time.now,
    :level_key => :info,
    :keep_connection => true
  )
  event_sender.stop
end
send_periodic_event()
click to toggle source
# File lib/tengine/core/heartbeat_watcher.rb, line 50
# Fires this watcher's own "hbw.heartbeat.tengine" event: debug level, keyed
# by the watcher's UUID, with this process as both source and sender. The
# connection is kept open and the retry count is set to zero.
def send_periodic_event
  event_sender = sender
  identity = pid
  event_sender.fire(
    "hbw.heartbeat.tengine",
    :key => @uuid,
    :source_name => identity,
    :sender_name => identity,
    :occurred_at => Time.now,
    :level_key => :debug,
    :keep_connection => true,
    :retry_count => 0
  )
end
sender()
click to toggle source
# File lib/tengine/core/heartbeat_watcher.rb, line 41
# Returns the event sender used by this watcher, creating it on first use.
#
# The sender wraps a Tengine::Mq::Suite built from @config[:event_queue] and
# is cached in @sender for subsequent calls.
def sender
  @sender ||= begin
    mq_suite = Tengine::Mq::Suite.new(@config[:event_queue])
    Tengine::Event::Sender.new(mq_suite)
  end
end
shutdown()
click to toggle source
# File lib/tengine/core/heartbeat_watcher.rb, line 107
# Inside an EM.run block, cancels whichever of the two periodic timers
# (@invalidate, @periodic) have been set and then sends the final event.
def shutdown
  EM.run do
    [@invalidate, @periodic].each do |timer|
      EM.cancel_timer timer if timer
    end
    send_last_event
  end
end
start()
click to toggle source
# File lib/tengine/core/heartbeat_watcher.rb, line 115 def start @config.setup_loggers Mongoid.configure do |c| c.send :load_configuration, @config[:db] end require 'amqp' Mongoid.logger = AMQP::Session.logger = Tengine.logger EM.run do sender.wait_for_connection do @invalidate = EM.add_periodic_timer 1 do # !!! MAGIC NUMBER search_for_invalid_heartbeat do |obj| type = case obj.event_type_name when /job|core|hbw|resourcew|atd/ then "expired.#$&.heartbeat.tengine" end EM.next_tick do send_invalidate_event type, obj end end end int = @config[:heartbeat][:hbw][:interval].to_i if int and int > 0 @periodic = EM.add_periodic_timer int do send_periodic_event end end end end end
start_daemon(__file__)
click to toggle source
# File lib/tengine/core/heartbeat_watcher.rb, line 93
# Launches the watcher through Daemons.run_proc.
#
# The process is named after the basename of +__file__+ and is configured by
# @daemonize_options (which already carries the pid directory). The working
# directory at call time is captured up front and restored around #start
# inside the Daemons block, so #start runs in the caller's directory whatever
# directory the block is invoked from.
#
# @param __file__ [String] path of the launching script
def start_daemon(__file__)
  fname = File.basename __file__
  cwd = Dir.getwd
  Daemons.run_proc(fname, @daemonize_options) do
    Dir.chdir(cwd) { start }
  end
end
stop_daemon(__file__)
click to toggle source
# File lib/tengine/core/heartbeat_watcher.rb, line 102
# Invokes Daemons.run_proc for the process named after the basename of
# +__file__+, without a block, using @daemonize_options (whose :ARGV entry
# carries the configured action).
def stop_daemon(__file__)
  Daemons.run_proc(File.basename(__file__), @daemonize_options)
end