class Emque::Consuming::Runner
Attributes
instance[RW]
control[RW]
options[RW]
persist[RW]
pid[RW]
pidfile[RW]
receivers[RW]
status[RW]
Public Class Methods
new(options = {})
click to toggle source
# File lib/emque/consuming/runner.rb, line 20
#
# Builds a runner: creates the control and status objects, stores +options+,
# copies matching options onto the application config, initializes the
# application logger and prepares the pidfile wrapper.
#
# options - Hash of runner options. Keys read here:
#           :daemon  - passed to the logger as +daemonized+ (false if absent).
#           :pidfile - pidfile path (falls back to +default_pidfile+).
def initialize(options = {})
  self.control = Emque::Consuming::Control.new
  self.options = options
  self.receivers = []
  self.status = Emque::Consuming::Status.new
  apply_options

  daemonized = options.fetch(:daemon, false)
  Emque::Consuming.application.initialize_logger(daemonized: daemonized)

  # Expose this runner through the class-level +instance+ accessor.
  self.class.instance = self

  # The fallback path is computed up front, whether or not :pidfile is given.
  fallback_pidfile = default_pidfile
  self.pidfile = options.fetch(:pidfile, fallback_pidfile)
  self.pid = Emque::Consuming::Pidfile.new(pidfile)
end
Public Instance Methods
app()
click to toggle source
Calls superclass method
Emque::Consuming::Helpers#app
# File lib/emque/consuming/runner.rb, line 34
#
# Returns whatever the superclass implementation
# (Emque::Consuming::Helpers#app) returns.
# NOTE(review): presumably re-declared here only to change its visibility;
# confirm against the Helpers module.
def app
  super()
end
console()
click to toggle source
# File lib/emque/consuming/runner.rb, line 38
#
# Opens an interactive Pry session. The pry library is required lazily,
# so it is only loaded when the console is actually requested.
def console
  require "pry"

  Pry.start
end
http?()
click to toggle source
# File lib/emque/consuming/runner.rb, line 43
#
# Returns true when the application config's +status+ setting equals :on.
# +setup_receivers+ and +set_process_title+ use this to decide whether the
# HTTP command receiver is part of the process.
def http?
  status_setting = config.status
  status_setting == :on
end
phased_restart()
click to toggle source
# File lib/emque/consuming/runner.rb, line 47
#
# Restarts the receivers one at a time, in registration order.
# A receiver is started again only when its +stop+ returned a truthy value.
#
# Returns the receivers collection.
def phased_restart
  receivers.each do |receiver|
    receiver.start if receiver.stop
  end
end
restart()
click to toggle source
# File lib/emque/consuming/runner.rb, line 51
#
# Stops the runner and then starts it again.
#
# +stop+ does not report success through its return value: its last
# expression is either a conditional +persist.exit+ (nil when the thread is
# already dead) or the result of a Transmitter call. Chaining with `&&`
# therefore could skip +start+ by accident, so the two calls are sequenced
# unconditionally.
#
# Returns whatever +start+ returns.
def restart
  stop
  start
end
restart_application()
click to toggle source
# File lib/emque/consuming/runner.rb, line 55
#
# Restarts the first registered receiver. +setup_receivers+ pushes +app+
# before any command receiver, so the first entry is the application.
def restart_application
  application = receivers.first
  application.restart
end
sock?()
click to toggle source
# File lib/emque/consuming/runner.rb, line 59
#
# Always returns true, so the unix socket command receiver is always
# enabled wherever this predicate is consulted.
def sock?
  true
end
start()
click to toggle source
# File lib/emque/consuming/runner.rb, line 63
#
# Boots the process: refuses to run twice, optionally daemonizes, writes the
# pidfile, sets the process title, starts every receiver and then blocks on
# the +persist+ thread until it is terminated.
#
# An Interrupt raised while starting or while blocked triggers +stop+.
def start
  exit_if_already_running!
  daemonize! if daemonize?
  write_pidfile!

  # Idle thread whose only job is to keep the process alive; +stop+ ends it.
  @persist = Thread.new do
    loop { sleep(1) }
  end

  set_process_title
  setup_receivers
  receivers.each { |receiver| receiver.start }
  persist.join
rescue Interrupt
  stop
end
stop(timeout: 5)
click to toggle source
# File lib/emque/consuming/runner.rb, line 76
#
# Shuts the runner down.
#
# When this runner owns a +persist+ thread, every receiver is stopped and the
# +persist+ thread is terminated. A watchdog thread forces termination of
# +persist+ if the receivers have not finished stopping within +timeout+
# seconds.
#
# When there is no +persist+ thread, a :stop command is sent through
# Emque::Consuming::Transmitter to the configured socket path instead
# (presumably to reach an already running process; confirm with callers).
#
# timeout - Seconds to wait for the receivers before forcing shutdown
#           (default: 5).
#
# Returns the terminated +persist+ thread, nil if it was already dead, or
# the Transmitter result in the socket branch.
def stop(timeout: 5)
  if persist
    watchdog = Thread.new do
      sleep timeout
      # Only report and force a shutdown that is actually still pending.
      if persist.alive?
        logger.error("Timeout Exceeded. Forcing Shutdown.")
        persist.exit
      end
    end

    receivers.each(&:stop)

    # Graceful path finished: cancel the watchdog so it neither lingers nor
    # logs a misleading timeout error afterwards.
    watchdog.kill

    logger.info("Graceful shutdown successful.")
    logger.info("#{config.app_name.capitalize} stopped.")
    persist.exit if persist.alive?
  else
    Emque::Consuming::Transmitter.send(
      :command => :stop,
      :socket_path => config.socket_path
    )
  end
end
Private Instance Methods
apply_options()
click to toggle source
# File lib/emque/consuming/runner.rb, line 100
#
# Copies runner options onto the application config.
#
# For each option, the value is assigned only when the config exposes a
# public writer for that name. The guard checks the writer ("name=") rather
# than the reader, because the writer is what gets invoked; a read-only
# config attribute is skipped instead of raising NoMethodError.
#
# Returns the options hash.
def apply_options
  options.each do |attr, val|
    writer = "#{attr}="
    config.public_send(writer, val) if config.respond_to?(writer)
  end
end
config()
click to toggle source
# File lib/emque/consuming/runner.rb, line 106
#
# Shortcut to the configuration object of the current
# Emque::Consuming application.
def config
  application = Emque::Consuming.application
  application.config
end
daemonize!()
click to toggle source
# File lib/emque/consuming/runner.rb, line 114
#
# Detaches the process from the controlling terminal and points its
# standard streams at the application logfile.
#
# Process.daemon is called with nochdir and noclose both true, so the
# working directory is kept and the standard streams are left for the
# explicit redirection below.
def daemonize!
  Process.daemon(true, true)

  [$stdout, $stderr].each do |stream|
    # Append to the logfile in binary mode; the temporary handle is closed
    # as soon as the stream has been reopened onto it.
    File.open(Emque::Consuming.application.logfile, "ab") { |log| stream.reopen(log) }
    stream.sync = true
  end

  $stdin.reopen("/dev/null")
end
daemonize?()
click to toggle source
# File lib/emque/consuming/runner.rb, line 110
#
# Returns the raw value of the :daemon option (nil when it was not given).
# +start+ treats it as a truthiness flag for daemonizing the process.
def daemonize?
  options[:daemon]
end
default_pidfile()
click to toggle source
# File lib/emque/consuming/runner.rb, line 127
#
# Builds the pidfile path used when no :pidfile option is supplied:
# <application root>/tmp/pids/<app_name>.pid
def default_pidfile
  root = Emque::Consuming.application.root
  filename = "#{config.app_name}.pid"
  File.join(root, "tmp", "pids", filename)
end
exit_if_already_running!()
click to toggle source
# File lib/emque/consuming/runner.rb, line 136
#
# Terminates the current process when the pidfile reports a running
# process. The explanation is sent both to the logger (error level) and to
# standard output before exiting. Does nothing otherwise.
def exit_if_already_running!
  return unless pid.running?

  messages = [
    "Pid file exists. Process #{pid} active.",
    "Please ensure app is not running."
  ]
  messages.each do |line|
    logger.error(line)
    $stdout.puts(line)
  end

  exit
end
set_process_title()
click to toggle source
# File lib/emque/consuming/runner.rb, line 150
#
# Sets the process title ($0) to the app name followed by a bracketed,
# " | "-separated summary: the pidfile path, the unix socket path when
# +sock?+ is true, and the status HTTP address when +http?+ is true.
#
# Returns the title string.
def set_process_title
  app_name = config.app_name

  segments = ["pidfile: #{pidfile}"]
  segments << "unix socket: #{config.socket_path}" if sock?
  segments << "http://#{config.status_host}:#{config.status_port}" if http?

  $0 = "#{app_name} [#{segments.join(' | ')}]"
end
setup_receivers()
click to toggle source
# File lib/emque/consuming/runner.rb, line 158
#
# Registers everything that +start+ will start: the application itself
# first, then the unix socket command receiver when +sock?+ is true and the
# HTTP command receiver when +http?+ is true.
def setup_receivers
  receivers.push(app)
  receivers.push(Emque::Consuming::CommandReceivers::UnixSocket.new) if sock?
  receivers.push(Emque::Consuming::CommandReceivers::HttpServer.new) if http?
end
write_pidfile!()
click to toggle source
# File lib/emque/consuming/runner.rb, line 164
#
# Writes the pidfile through the pid wrapper and registers an exit hook
# that removes the file when the process terminates. The path is read when
# the hook runs, not when it is registered.
def write_pidfile!
  pid.write

  at_exit do
    FileUtils.rm_f(pidfile)
  end
end