class PortAuthority::Agent
Scaffolding class for agents
Public Class Methods
Common agent process initialization. Loads the configuration, installs the common signal handlers and initializes the runtime variables. Runs the actual agent through the +run+ method, which child classes implement, and handles any +StandardError+ that +run+ leaves uncaught.
# File lib/port-authority/agent.rb, line 16
def initialize
  Thread.current[:name] = 'main'       # name main thread
  @@_exit = false                      # prepare exit flag
  @@_semaphores = { log: Mutex.new }   # init semaphores
  @@_threads = {}                      # init threads
  Signal.trap('INT') { exit!(1) }      # end immediately
  Signal.trap('TERM') { end! }         # end gracefully
  Config.load! || exit!(1)             # load config or die
  begin                                # all-wrapping exception ;)
    run                                # hook to child class
  rescue StandardError => e
    Logger.alert "UNCAUGHT EXCEPTION IN THREAD main! Dying! X.X"
    Logger.alert [' ', "#{e.class}:", e.message].join(' ')
    e.backtrace.each {|line| Logger.debug " #{line}"}
    exit! 1
  end
end
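The constructor above follows a template-method shape: the base class prepares shared state and then hands control to the child's +run+. The sketch below illustrates only that shape. +MiniAgent+ and +HelloAgent+ are hypothetical stand-ins, not part of the library, and the config loading, signal traps and Logger calls of the real class are left out on purpose.

```ruby
# Hypothetical stand-in for the scaffolding class; not the real PortAuthority::Agent.
class MiniAgent
  attr_reader :error

  def initialize
    Thread.current[:name] = 'main' # name the main thread
    @exit = false                  # exit flag
    begin
      run                          # hook implemented by the child class
    rescue StandardError => e
      @error = e                   # the real class logs the error and exits instead
    end
  end

  # Has the exit flag been raised?
  def exit?
    @exit
  end

  # Raise the exit flag.
  def end!
    @exit = true
  end
end

# A child agent only has to provide the run method.
class HelloAgent < MiniAgent
  attr_reader :ticks

  def run
    @ticks = 0
    until exit?
      @ticks += 1
      end! if @ticks == 3 # ask the loop to stop after three iterations
    end
  end
end

agent = HelloAgent.new
puts agent.ticks
```

Because +run+ is called from inside the constructor, creating the child object is enough to start the agent in this sketch.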
Public Instance Methods
Raise the exit flag
# File lib/port-authority/agent.rb, line 81
def end!
  @@_exit = true
end
Has the exit flag been raised?
# File lib/port-authority/agent.rb, line 76
def exit?
  @@_exit
end
Return hostname.
# File lib/port-authority/agent.rb, line 149
def hostname
  @hostname ||= Socket.gethostname
end
Create a named +Mutex+ semaphore
# File lib/port-authority/agent.rb, line 86
def sem_create(name)
  # Hash[] needs the key and the value together, as in thr_create
  @@_semaphores.merge!(Hash[name.to_sym, Mutex.new])
end
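As a rough illustration of what a named-semaphore registry does, the sketch below keeps Mutex objects in a plain Hash keyed by symbol. The +register+ helper and the local +semaphores+ hash are hypothetical names used only for this example; they stand in for +sem_create+ and the class-level registry.

```ruby
# Hypothetical helper: add a Mutex under the given name and return the registry.
def register(semaphores, name)
  semaphores.merge!(name.to_sym => Mutex.new)
end

semaphores = { log: Mutex.new } # same starting point as the agent constructor
register(semaphores, 'etcd')    # strings are converted to symbols

puts semaphores.keys.inspect
```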
Set up the agent process. Initializes logging, sets the system process parameters and optionally daemonizes the process.
There are 4 optional parameters:
:name - +String+ Agent name. Defaults to the last segment of the child agent's class name, downcased.
:root - +Bool+ Require the process to run as root. Defaults to +false+.
:daemonize - +Bool+ Daemonize the process. Defaults to +false+.
:nice - +Int+ Nice level of the process. Defaults to +0+.
# File lib/port-authority/agent.rb, line 43
def setup(args = {})
  name = args[:name] || self.class.to_s.downcase.split('::').last
  args[:root] ||= false
  args[:daemonize] ||= false
  args[:nice] ||= 0
  Logger.init! @@_semaphores[:log], name
  Logger.info 'Starting main thread'
  Logger.debug 'Setting process name'
  if RUBY_VERSION >= '2.1'
    Process.setproctitle("pa-#{name}-agent")
  else
    $0 = "pa-#{name}-agent"
  end
  if args[:root] && Process.uid != 0
    Logger.alert 'Must run under root user!'
    exit! 1
  end
  Logger.debug 'Setting CPU nice level'
  Process.setpriority(Process::PRIO_PROCESS, 0, args[:nice])
  if args[:daemonize]
    Logger.info 'Daemonizing process'
    if RUBY_VERSION < '1.9'
      exit if fork
      Process.setsid
      exit if fork
      Dir.chdir('/')
    else
      Process.daemon
    end
  end
end
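The default agent name comes from the expression on the first line of +setup+. The sketch below isolates that expression so its effect on a namespaced class is easy to see. +default_agent_name+ and +Demo::ServiceList+ are hypothetical names invented for this example.

```ruby
# Hypothetical helper reproducing the default-name expression used by setup.
def default_agent_name(klass)
  klass.to_s.downcase.split('::').last
end

# Hypothetical namespaced class standing in for a child agent.
module Demo
  class ServiceList; end
end

name = default_agent_name(Demo::ServiceList)
puts "pa-#{name}-agent" # the process title format used by setup
```

Only the last namespace segment survives, so two agents with the same class name in different modules would end up with the same default name.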
Create a named +Thread+ together with a +Mutex+ semaphore of the same name. The given +&block+ runs inside the thread in a loop, once per +interval+, until the exit flag is raised.
The method requires 3 parameters:
name - +Symbol+ Thread/Mutex name.
interval - +Integer+ Thread loop interval, in seconds.
+&block+ - +Proc+ Block of code to run.
# File lib/port-authority/agent.rb, line 98
def thr_create(name, interval, &block)
  @@_semaphores.merge!(Hash[name.to_sym, Mutex.new])
  @@_threads.merge!(Hash[name.to_sym, Thread.new do
    Thread.current[:name] = name.to_s
    Logger.info "Starting thread #{Thread.current[:name]}"
    begin
      until exit?
        yield block
        sleep interval
      end
      Logger.info "Ending thread #{Thread.current[:name]}"
    rescue StandardError => e
      Logger.alert "UNCAUGHT EXCEPTION IN THREAD #{Thread.current[:name]}"
      Logger.alert [' ', "#{e.class}:", e.message].join(' ')
      e.backtrace.each {|line| Logger.debug " #{line}"}
      end!
    end
  end
  ])
end
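The loop inside +thr_create+ is a periodic worker: call the block, sleep, and repeat until an exit flag is raised. The sketch below shows that pattern in isolation. +PeriodicWorker+ is a hypothetical class written for this example; it keeps its own stop flag instead of the shared exit flag, and it does no logging.

```ruby
# Hypothetical periodic worker; mirrors the loop shape of thr_create only.
class PeriodicWorker
  def initialize(name, interval, &block)
    @stop = false
    @thread = Thread.new do
      Thread.current[:name] = name.to_s # label the thread, as thr_create does
      until @stop
        block.call
        sleep interval
      end
    end
  end

  # Raise the stop flag and wait for the loop to finish its current round.
  def stop
    @stop = true
    @thread.join
  end
end

counter = 0
worker = PeriodicWorker.new(:ticker, 0.01) { counter += 1 }
sleep 0.1
worker.stop
puts "block ran #{counter} times"
```

The flag is only checked between rounds, so stopping can take up to one full interval plus the running time of the block.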
Run thread-safe code. The +name+ parameter can be omitted when the method is called from within a block of thread code. In that case the Mutex named after the current thread is used.
The method accepts the following parameters:
name - +Symbol+ Mutex name.
+&block+ - +Proc+ Block of code to run.
# File lib/port-authority/agent.rb, line 127
def thr_safe(name=Thread.current[:name].to_sym, &block)
  @@_semaphores[name.to_sym].synchronize do
    yield block
  end
end
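The default argument of +thr_safe+ is evaluated in the calling thread, which is how a thread finds "its own" Mutex without naming it. The sketch below demonstrates that lookup with several threads sharing one counter. +SEMAPHORES+, +safely+ and +count_in_parallel+ are hypothetical names for this example and are not part of the library.

```ruby
# Hypothetical registry standing in for the class-level semaphore hash.
SEMAPHORES = { counter: Mutex.new }

# Hypothetical equivalent of thr_safe: the default name is taken from the
# calling thread's :name entry at call time.
def safely(name = Thread.current[:name].to_sym, &block)
  SEMAPHORES[name.to_sym].synchronize(&block)
end

# Increment a shared total from several threads, guarding each update.
def count_in_parallel(threads: 4, rounds: 1000)
  total = 0
  Array.new(threads) do
    Thread.new do
      Thread.current[:name] = 'counter' # lets safely pick the :counter Mutex
      rounds.times { safely { total += 1 } }
    end
  end.each(&:join)
  total
end

puts count_in_parallel
```

A thread whose +:name+ has no matching registry entry would get +nil+ back from the lookup and fail on +synchronize+, so the name and the registered Mutex have to stay in step.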
Start named thread. If the name is omitted, applies to all spawned threads ;)
# File lib/port-authority/agent.rb, line 135
def thr_start(name=nil)
  return @@_threads[name].run if name
  @@_threads.each_value(&:run)
end
Wait for named thread to finish. If the name is omitted, applies to all spawned threads ;)
# File lib/port-authority/agent.rb, line 143
def thr_wait(name=nil)
  return @@_threads[name].join if name
  @@_threads.each_value(&:join)
end
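The "one name or everything" convention used by +thr_wait+ can be tried out on a plain Hash of threads. In the sketch below, +wait_for+ and the +threads+ hash are hypothetical stand-ins for +thr_wait+ and the class-level thread registry.

```ruby
# Hypothetical helper mirroring thr_wait: join one named thread, or all of them.
def wait_for(threads, name = nil)
  return threads[name].join if name
  threads.each_value(&:join)
end

results = []
threads = {
  fast: Thread.new { results << :fast },
  slow: Thread.new { sleep 0.05; results << :slow }
}

wait_for(threads, :fast) # block until the :fast thread is done
wait_for(threads)        # block until every registered thread is done
puts results.sort.inspect
```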