class Tengine::Core::Scheduler

Public Class Methods

new(argv) click to toggle source
# File lib/tengine/core/scheduler.rb, line 22
# Builds a scheduler instance from command-line style arguments.
#
# Generates a UUID for this instance, parses +argv+ with
# Tengine::Core::Config::Atd, assembles the option hash that is later handed
# to Daemons.run_proc, and sets Tengine::Core::MethodTraceable.disabled to the
# negation of the :verbose setting.
#
# Any exception raised on the way is printed to standard output (class name,
# message and backtrace) and then re-raised unchanged.
# NOTE(review): the rescue covers Exception, so non-StandardError exceptions
# are reported (and re-raised) here as well.
#
# @param argv [Array] passed through as-is to Tengine::Core::Config::Atd.parse
def initialize(argv)
  @uuid = UUID.new.generate
  @config = Tengine::Core::Config::Atd.parse(argv)

  # Keys are filled in one by one; the resulting hash is what
  # start_daemon/stop_daemon pass to Daemons.run_proc.
  options = {}
  options[:app_name] = 'tengine_atd'
  options[:ARGV]     = [@config[:action]]
  options[:ontop]    = !@config[:process][:daemon]
  options[:multiple] = true
  options[:dir_mode] = :normal
  options[:dir]      = File.expand_path(@config[:process][:pid_dir])
  @daemonize_options = options

  Tengine::Core::MethodTraceable.disabled = !@config[:verbose]
rescue Exception => error
  puts "[#{error.class.name}] #{error.message}\n  " << error.backtrace.join("\n  ")
  raise
end

Public Instance Methods

mark_schedule_done(sched) click to toggle source
# File lib/tengine/core/scheduler.rb, line 62
# Switches a schedule entry from SCHEDULED to FIRED.
#
# Several atd processes may be running on several machines, in which case it
# is very likely that more than one of them updates the same entry at the
# same time. Even then this must not end in an error. The change is issued
# as an update_all restricted to documents that have the given _id and are
# still in SCHEDULED status.
#
# @param sched the schedule entry; only its +id+ is read here
# @return whatever update_all returns
def mark_schedule_done(sched)
  model = Tengine::Core::Schedule

  # Write options come from the safemode helper applied to the collection.
  session = model.with(:safe => safemode(model.collection))
  still_scheduled = session.where(:_id => sched.id, :status => model::SCHEDULED)

  still_scheduled.update_all(:status => model::FIRED)
end
pid() click to toggle source
# File lib/tengine/core/scheduler.rb, line 44
# Identifier string for this process, of the form
# "process:<MM_SERVER_NAME>/<process id>".
#
# The value is computed on first use from the MM_SERVER_NAME environment
# variable and Process.pid, then cached in @pid.
#
# @return [String]
def pid
  return @pid if @pid

  @pid = sprintf("process:%s/%d", ENV["MM_SERVER_NAME"], Process.pid)
end
run(__file__) click to toggle source
# File lib/tengine/core/scheduler.rb, line 85
# Dispatches on the configured :action.
#
# * :start   - start_daemon
# * :stop    - stop_daemon
# * :restart - stop_daemon followed by start_daemon
#
# Any other action does nothing and yields nil.
#
# @param __file__ path handed on to start_daemon / stop_daemon
# @return the result of the last helper called, or nil for other actions
def run(__file__)
  action = @config[:action].to_sym

  if action == :start
    start_daemon(__file__)
  elsif action == :stop
    stop_daemon(__file__)
  elsif action == :restart
    stop_daemon(__file__)
    start_daemon(__file__)
  end
end
search_for_schedule() { |i| ... } click to toggle source
# File lib/tengine/core/scheduler.rb, line 76
# Looks up schedule entries whose scheduled_at is not later than the current
# time and whose status is still SCHEDULED, and yields each of them to the
# given block through the criteria's each_next_tick.
#
# @yield [entry] one matching schedule entry at a time
# @return whatever each_next_tick returns
def search_for_schedule
  due = Tengine::Core::Schedule.where(
    :scheduled_at.lte => Time.now,
    :status => Tengine::Core::Schedule::SCHEDULED
  )

  due.each_next_tick { |entry| yield entry }
end
send_last_event() click to toggle source
# File lib/tengine/core/scheduler.rb, line 48
# Fires the "finished.process.atd.tengine" event and then stops the sender.
#
# The event is keyed by this instance's UUID, uses pid as both source and
# sender name, and is sent at :info level with :keep_connection => true.
#
# @return whatever sender.stop returns
def send_last_event
  sender.fire(
    "finished.process.atd.tengine",
    :key => @uuid,
    :source_name => pid,
    :sender_name => pid,
    :occurred_at => Time.now,
    :level_key => :info,
    :keep_connection => true
  )
  sender.stop
end
send_periodic_event() click to toggle source
# File lib/tengine/core/scheduler.rb, line 53
# Fires the "atd.heartbeat.tengine" event.
#
# The event is keyed by this instance's UUID, uses pid as both source and
# sender name, and is sent at :debug level with :keep_connection => true and
# :retry_count => 0.
#
# @return whatever sender.fire returns
def send_periodic_event
  sender.fire(
    "atd.heartbeat.tengine",
    :key => @uuid,
    :source_name => pid,
    :sender_name => pid,
    :occurred_at => Time.now,
    :level_key => :debug,
    :keep_connection => true,
    :retry_count => 0
  )
end
send_scheduled_event(sched) click to toggle source
# File lib/tengine/core/scheduler.rb, line 57
# Logs that a schedule entry has come due and fires its event.
#
# The fired event takes its type name, source name and properties from the
# entry, is keyed by the entry's _id, names this process (pid) as sender, and
# is sent at :info level with :keep_connection => true.
#
# @param sched the due schedule entry (reads scheduled_at, event_type_name,
#   source_name, _id and properties)
# @return whatever sender.fire returns
def send_scheduled_event(sched)
  Tengine.logger.info(
    "Scheduled time (#{sched.scheduled_at}) has come.  Now firing #{sched.event_type_name} for #{sched.source_name}"
  )

  sender.fire(
    sched.event_type_name,
    :key => sched._id,
    :source_name => sched.source_name,
    :sender_name => pid,
    :occurred_at => Time.now,
    :level_key => :info,
    :keep_connection => true,
    :properties => sched.properties
  )
end
sender() click to toggle source
# File lib/tengine/core/scheduler.rb, line 40
# Event sender used by this scheduler, created on first use.
#
# Wraps a Tengine::Mq::Suite built from the :event_queue configuration in a
# Tengine::Event::Sender and caches it in @sender.
#
# @return the cached Tengine::Event::Sender
def sender
  return @sender if @sender

  suite = Tengine::Mq::Suite.new(@config[:event_queue])
  @sender = Tengine::Event::Sender.new(suite)
end
shutdown() click to toggle source
# File lib/tengine/core/scheduler.rb, line 112
# Cancels the scheduler's periodic timers and sends the final event.
#
# Inside an EM.run block, cancels the heartbeat timer (@periodic) and the
# schedule-polling timer (@invalidate) when they are set, then calls
# send_last_event.
#
# The polling timer is cancelled too, so that the poll loop can no longer
# fire scheduled events once shutdown has begun; previously only the
# heartbeat timer was cancelled.
def shutdown
  EM.run do
    [@periodic, @invalidate].each do |timer|
      EM.cancel_timer timer if timer
    end
    send_last_event
  end
end
start() click to toggle source
# File lib/tengine/core/scheduler.rb, line 119
# Main loop of the scheduler.
#
# Sets up loggers from the configuration, loads the :db settings into
# Mongoid, requires amqp and points both the AMQP session logger and the
# Mongoid logger at Tengine.logger. It then enters the EventMachine reactor
# and, once the sender reports a connection, installs:
#
# * a 1-second periodic timer (kept in @invalidate) that looks for due
#   schedule entries, fires each one and marks it as done;
# * a heartbeat timer (kept in @periodic) that calls send_periodic_event,
#   only when the configured heartbeat interval is greater than zero.
def start
  @config.setup_loggers

  Mongoid.configure { |mongoid| mongoid.send(:load_configuration, @config[:db]) }

  require 'amqp'
  shared_logger = Tengine.logger
  AMQP::Session.logger = shared_logger
  Mongoid.logger = shared_logger

  EM.run do
    sender.wait_for_connection do
      # The polling period of 1 second is hard-coded (a known magic number).
      @invalidate = EM.add_periodic_timer(1) do
        search_for_schedule do |due|
          send_scheduled_event(due)
          mark_schedule_done(due)
        end
      end

      # to_i always yields an Integer, so only the sign needs checking.
      heartbeat_interval = @config[:heartbeat][:atd][:interval].to_i
      if heartbeat_interval > 0
        @periodic = EM.add_periodic_timer(heartbeat_interval) { send_periodic_event }
      end
    end
  end
end
start_daemon(__file__) click to toggle source
# File lib/tengine/core/scheduler.rb, line 97
# Launches the scheduler through Daemons.run_proc.
#
# The process name is the basename of +__file__+ and the options are the
# @daemonize_options prepared in the constructor. The working directory is
# captured before handing over to Daemons, and start runs inside a
# Dir.chdir back to it.
# NOTE(review): presumably this is needed because daemonizing changes the
# working directory - confirm against the Daemons documentation.
#
# @param __file__ path whose basename is used as the daemon's name
# @return whatever Daemons.run_proc returns
def start_daemon(__file__)
  launch_dir = Dir.getwd

  Daemons.run_proc(File.basename(__file__), @daemonize_options) do
    Dir.chdir(launch_dir) { start }
  end
end
stop_daemon(__file__) click to toggle source
# File lib/tengine/core/scheduler.rb, line 107
# Calls Daemons.run_proc without a block, using the basename of +__file__+
# as the process name and the @daemonize_options prepared in the constructor.
#
# @param __file__ path whose basename is used as the daemon's name
# @return whatever Daemons.run_proc returns
def stop_daemon(__file__)
  Daemons.run_proc(File.basename(__file__), @daemonize_options)
end