class Resqued::ListenerProxy
Controls a listener process from the master process.
Attributes
state[R]
Public Class Methods
new(state)
click to toggle source
Public.
# File lib/resqued/listener_proxy.rb, line 13 def initialize(state) @state = state end
Public Instance Methods
dispose()
click to toggle source
Public: wrap up all the things, this object is going home.
# File lib/resqued/listener_proxy.rb, line 20 def dispose if @state.master_socket @state.master_socket.close @state.master_socket = nil end end
kill(signal)
click to toggle source
Public: Stop the listener process.
# File lib/resqued/listener_proxy.rb, line 58 def kill(signal) log "kill -#{signal} #{pid}" Process.kill(signal.to_s, pid) end
pid()
click to toggle source
Public: The pid of the running listener process.
# File lib/resqued/listener_proxy.rb, line 33 def pid @state.pid end
read_pipe()
click to toggle source
Public: An IO to select on to check if there is incoming data available.
# File lib/resqued/listener_proxy.rb, line 28 def read_pipe @state.master_socket end
read_worker_status(options)
click to toggle source
Public: Check for updates on running worker information.
# File lib/resqued/listener_proxy.rb, line 74 def read_worker_status(options) on_activity = options[:on_activity] until @state.master_socket.nil? IO.select([@state.master_socket], nil, nil, 0) or return case line = @state.master_socket.readline when /^\+(\d+),(.*)$/ worker_pids[$1] = $2 on_activity&.worker_started($1) when /^-(\d+)$/ worker_pids.delete($1) on_activity&.worker_finished($1) when /^RUNNING/ on_activity&.listener_running(self) when "" break else log "Malformed data from listener: #{line.inspect}" end end rescue EOFError, Errno::ECONNRESET @state.master_socket.close @state.master_socket = nil end
run()
click to toggle source
Public: Start the listener process.
# File lib/resqued/listener_proxy.rb, line 38 def run return if pid listener_socket, master_socket = UNIXSocket.pair if @state.pid = fork # master listener_socket.close master_socket.close_on_exec = true log "Started listener #{@state.pid}" @state.master_socket = master_socket else # listener master_socket.close Master::TRAPS.each { |signal| trap(signal, "DEFAULT") rescue nil } Listener.new(@state.options.merge(socket: listener_socket)).exec exit end end
running_workers()
click to toggle source
Public: Get the list of workers running from this listener.
# File lib/resqued/listener_proxy.rb, line 64 def running_workers worker_pids.map { |pid, queue_key| { pid: pid, queue_key: queue_key } } end
worker_finished(pid)
click to toggle source
Public: Tell the listener process that a worker finished.
# File lib/resqued/listener_proxy.rb, line 99 def worker_finished(pid) return if @state.master_socket.nil? @state.master_socket.write_nonblock("#{pid}\n") rescue IO::WaitWritable log "Couldn't tell #{@state.pid} that #{pid} exited!" # Ignore it, maybe the next time it'll work. rescue Errno::EPIPE @state.master_socket.close @state.master_socket = nil end
worker_pids()
click to toggle source
Private: Map worker pids to queue names
# File lib/resqued/listener_proxy.rb, line 69 def worker_pids @state.worker_pids ||= {} end