class Resqued::ListenerProxy

Controls a listener process from the master process.

Attributes

state[R]

Public Class Methods

new(state) click to toggle source

Public: Create a proxy around the given listener state object.

# File lib/resqued/listener_proxy.rb, line 13
# Public: Build a proxy around a listener state object.
#
# state - The object kept in @state. The other methods of this class read
#         and write its pid, master_socket, options and worker_pids
#         attributes.
def initialize(state)
  @state = state
end

Public Instance Methods

dispose() click to toggle source

Public: Clean up before this proxy is discarded: close the master socket, if one is open, and clear it from the state.

# File lib/resqued/listener_proxy.rb, line 20
# Public: Release the master socket held in the state, if there is one.
#
# Closes the socket and clears @state.master_socket. Does nothing when no
# socket is set, so it is safe to call more than once.
#
# Returns nil.
def dispose
  socket = @state.master_socket
  return unless socket

  socket.close
  @state.master_socket = nil
end
kill(signal) click to toggle source

Public: Stop the listener process.

# File lib/resqued/listener_proxy.rb, line 58
# Public: Send a signal to the listener process.
#
# signal - The signal to send; it is converted with #to_s before being
#          handed to Process.kill (e.g. :TERM or "QUIT").
#
# Logs the equivalent `kill` command line first.
#
# Returns the result of Process.kill.
def kill(signal)
  signal_name = signal.to_s
  target = pid
  log "kill -#{signal_name} #{target}"
  Process.kill(signal_name, target)
end
pid() click to toggle source

Public: The pid of the running listener process.

# File lib/resqued/listener_proxy.rb, line 33
# Public: The pid recorded on the state object.
#
# Returns @state.pid unchanged.
def pid
  @state.pid
end
read_pipe() click to toggle source

Public: An IO to select on to check if there is incoming data available.

# File lib/resqued/listener_proxy.rb, line 28
# Public: The IO to select on when waiting for data from the listener.
#
# Returns @state.master_socket, which is nil when no socket is set.
def read_pipe
  @state.master_socket
end
read_worker_status(options) click to toggle source

Public: Check for updates on running worker information.

# File lib/resqued/listener_proxy.rb, line 74
# Public: Check for updates on running worker information.
#
# Reads every line currently available on the master socket and applies it:
#
#   "+<pid>,<queue>"  records the worker and calls on_activity.worker_started
#   "-<pid>"          forgets the worker and calls on_activity.worker_finished
#   "RUNNING..."      calls on_activity.listener_running with this proxy
#   anything else     is logged as malformed
#
# options - Hash with an optional :on_activity entry, an object that
#           receives the callbacks above. Callbacks are skipped when it
#           is nil.
#
# When the socket reports EOF or a connection reset, it is closed and
# cleared from the state.
#
# Returns nil.
def read_worker_status(options)
  observer = options[:on_activity]
  until (socket = @state.master_socket).nil?
    # Zero timeout: only ask whether something is readable right now.
    return unless IO.select([socket], nil, nil, 0)

    line = socket.readline
    if (started = /^\+(\d+),(.*)$/.match(line))
      worker_pids[started[1]] = started[2]
      observer&.worker_started(started[1])
    elsif (finished = /^-(\d+)$/.match(line))
      worker_pids.delete(finished[1])
      observer&.worker_finished(finished[1])
    elsif /^RUNNING/.match(line)
      observer&.listener_running(self)
    elsif line == ""
      break
    else
      log "Malformed data from listener: #{line.inspect}"
    end
  end
rescue EOFError, Errno::ECONNRESET
  @state.master_socket.close
  @state.master_socket = nil
end
run() click to toggle source

Public: Start the listener process.

# File lib/resqued/listener_proxy.rb, line 38
# Public: Start the listener process.
#
# Does nothing when a pid is already recorded. Otherwise creates a
# UNIXSocket pair and forks: the parent keeps one end as
# @state.master_socket, and the child passes the other end to a new
# Listener.
def run
  # A pid is already recorded, so do not start a second listener.
  return if pid

  listener_socket, master_socket = UNIXSocket.pair
  # fork's return value is stored as the pid; it is truthy only in the parent.
  if @state.pid = fork
    # master
    # Keep only the master end of the pair in this process.
    listener_socket.close
    master_socket.close_on_exec = true
    log "Started listener #{@state.pid}"
    @state.master_socket = master_socket
  else
    # listener
    # Keep only the listener end of the pair in this process.
    master_socket.close
    # Reset every signal in Master::TRAPS to its default handler; a failure
    # to reset one is ignored.
    Master::TRAPS.each { |signal| trap(signal, "DEFAULT") rescue nil }
    Listener.new(@state.options.merge(socket: listener_socket)).exec
    # Reached only if the call above returns.
    exit
  end
end
running_workers() click to toggle source

Public: Get the list of workers running from this listener.

# File lib/resqued/listener_proxy.rb, line 64
# Public: Get the list of workers running from this listener.
#
# Returns an Array with one Hash per known worker, each of the form
# { pid: <pid>, queue_key: <queue key> }, in the order the workers are
# stored in worker_pids.
def running_workers
  worker_pids.each_with_object([]) do |(worker_pid, queue), workers|
    workers << { pid: worker_pid, queue_key: queue }
  end
end
worker_finished(pid) click to toggle source

Public: Tell the listener process that a worker finished.

# File lib/resqued/listener_proxy.rb, line 99
# Public: Tell the listener process that a worker finished.
#
# pid - The pid of the finished worker; sent to the listener as "<pid>\n".
#
# Does nothing when no master socket is set. The write is non-blocking: if
# the socket cannot accept data right now, the notification is dropped and
# a message is logged. If the listener end of the socket is gone (EPIPE or
# ECONNRESET), the master socket is closed and cleared from the state.
#
# Returns the result of write_nonblock on success, nil otherwise.
def worker_finished(pid)
  socket = @state.master_socket
  return if socket.nil?

  socket.write_nonblock("#{pid}\n")
rescue IO::WaitWritable
  log "Couldn't tell #{@state.pid} that #{pid} exited!"
  # Ignore it, maybe the next time it'll work.
rescue Errno::EPIPE, Errno::ECONNRESET
  # The listener is gone. ECONNRESET is handled like EPIPE, matching what
  # read_worker_status does for the same socket, instead of propagating.
  @state.master_socket.close
  @state.master_socket = nil
end
worker_pids() click to toggle source

Private: Map worker pids to queue names

# File lib/resqued/listener_proxy.rb, line 69
# Private: Map worker pids to queue names.
#
# The map lives on the state object; an empty Hash is stored there the
# first time it is needed.
#
# Returns the Hash held in @state.worker_pids.
def worker_pids
  known = @state.worker_pids
  return known if known

  @state.worker_pids = {}
end