class Backburner::Workers::ThreadsOnFork

Attributes

garbage_after[RW]
is_child[RW]
shutdown[RW]
threads_number[RW]

Public Class Methods

child_pids() click to toggle source

return the pids of all alive children/forks

# File lib/backburner/workers/threads_on_fork.rb, line 11
# Returns the pids of the forks that are still running.
#
# A forked child does not track forks, so it always gets an empty list.
# Pids that no longer exist are pruned from the memoized @child_pids.
def child_pids
  return [] if is_child

  @child_pids ||= []
  alive = @child_pids.select do |pid|
    # Never report the current process as one of its own children.
    next false if pid.to_i == Process.pid

    begin
      # Signal 0 sends nothing; it only checks that the pid exists.
      Process.kill(0, pid)
      true
    rescue Errno::ESRCH
      false
    end
  end
  # Keep the very same array object when nothing had to be pruned.
  @child_pids = alive unless @child_pids == alive
  @child_pids
end
finish_forks() click to toggle source
# File lib/backburner/workers/threads_on_fork.rb, line 51
# Shuts down every fork: asks politely first (stop_forks), waits one second,
# then kills whatever is still alive (kill_forks) and reaps all children.
# Does nothing when called from inside a fork or when there are no forks.
def finish_forks
  return if is_child

  running = child_pids
  return if running.empty?

  puts "[ThreadsOnFork workers] Stopping forks: #{running.join(", ")}"
  stop_forks
  # Give the forks a moment to exit on their own.
  Kernel.sleep 1

  survivors = child_pids
  return if survivors.empty?

  puts "[ThreadsOnFork workers] Killing remaining forks: #{survivors.join(", ")}"
  kill_forks
  Process.waitall
end
kill_forks() click to toggle source

Sends a SIGKILL signal to all children. This is the same as an assassination: we are KILLING the forks that do not obey us.

# File lib/backburner/workers/threads_on_fork.rb, line 42
# Forcefully terminates every live fork with SIGKILL.
# Forks that have already exited are skipped silently.
def kill_forks
  child_pids.each do |pid|
    begin
      Process.kill("SIGKILL", pid)
    rescue Errno::ESRCH
      # The fork is already gone; nothing left to kill.
    end
  end
end
new(*args) click to toggle source

Custom initializer just to set @tubes_data

Calls superclass method Backburner::Worker::new
# File lib/backburner/workers/threads_on_fork.rb, line 70
# Custom initializer: creates an empty @tubes_data, forwards all arguments to
# the superclass initializer, then applies the per-tube options.
def initialize(*args)
  # Set before `super` runs.
  # NOTE(review): presumably the parent initializer calls process_tube_names,
  # which writes into @tubes_data — confirm against Backburner::Worker.
  @tubes_data = {}
  super
  process_tube_options
end
stop_forks() click to toggle source

Sends a SIGTERM signal to all children. This is the same as a normal exit: we are simply asking the children to exit.

# File lib/backburner/workers/threads_on_fork.rb, line 30
# Politely asks every live fork to exit by sending SIGTERM.
# Forks that have already exited are skipped silently.
def stop_forks
  child_pids.each do |pid|
    begin
      Process.kill("SIGTERM", pid)
    rescue Errno::ESRCH
      # The fork is already gone; nothing left to stop.
    end
  end
end

Public Instance Methods

coolest_exit() click to toggle source

Exits with Kernel.exit! to avoid running at_exit callbacks that belong to the parent process. We use exit code 99, which means the fork reached its garbage limit.

# File lib/backburner/workers/threads_on_fork.rb, line 227
# Terminates the current process immediately with exit status 99.
# Kernel.exit! is used instead of exit so at_exit handlers are skipped.
def coolest_exit
  Kernel.exit!(99)
end
create_thread(*args, &block) click to toggle source

Create a thread. Easy to test

# File lib/backburner/workers/threads_on_fork.rb, line 232
# Thin wrapper around Thread.new, kept as its own method so it is easy to
# stub in tests.
#
# @param args arguments handed to the thread block
# @return [Thread] the newly started thread
def create_thread(*args, &block)
  thread = Thread.new(*args, &block)
  thread
end
fork_and_watch(name) click to toggle source

Makes the fork and creates a thread to watch the child process. The exit code '99' means that the fork exited because of the garbage limit; any other code is an error.

# File lib/backburner/workers/threads_on_fork.rb, line 146
# Starts a watcher thread for one tube. Until the class-level shutdown flag is
# set, the thread forks a worker for the tube, waits for it to exit, and forks
# again. Any exit status other than 99 is logged as an error.
#
# @param name tube name handed to the watcher thread
# @return whatever create_thread returns
def fork_and_watch(name)
  create_thread(name) do |tube_name|
    until self.class.shutdown
      child_pid = fork_tube(tube_name)
      _pid, exit_status = wait_for_process(child_pid)

      # 99 = garbaged, the expected way for a fork to finish.
      next if exit_status.exitstatus == 99

      log_error("Catastrophic failure: tube #{tube_name} exited with code #{exit_status.exitstatus}.")
    end
  end
end
fork_inner(name) click to toggle source

At this point we are already in the forked child. We watch only the selected tube and change queue_config.max_job_retries if the tube configures it.

If we limit the number of threads to 1 it will just run in a loop without creating any extra thread.

# File lib/backburner/workers/threads_on_fork.rb, line 173
# Body of a forked worker for a single tube.
#
# Resolves the tube's settings (retries, garbage limit, thread count), falling
# back to the class-level defaults, then processes jobs either inline (one
# thread) or in several threads that each use their own connection. Always
# finishes by calling coolest_exit.
#
# @param name tube to watch
def fork_inner(name)
  settings = @tubes_data[name]
  if settings
    retry_limit = settings[:retries]
    queue_config.max_job_retries = retry_limit if retry_limit
  else
    # Unknown tube: register an empty settings hash so the lookups below work.
    settings = @tubes_data[name] = {}
  end

  @garbage_after  = settings[:garbage] || self.class.garbage_after
  @threads_number = (settings[:threads] || self.class.threads_number || 1).to_i
  @runs = 0

  if @threads_number != 1
    baseline = Thread.list.count
    @threads_number.times do
      create_thread do
        begin
          conn = new_connection
          watch_tube(name, conn)
          run_while_can(conn)
        ensure
          conn.close if conn
        end
      end
    end
    # Poll until the thread count is back down to where it started.
    sleep 0.1 until Thread.list.count <= baseline
  else
    # A single thread needs no extra threads: run in a loop right here.
    watch_tube(name)
    run_while_can
  end

  coolest_exit
end
fork_it(&blk) click to toggle source

Forks the specified block and adds the process to the child process pool. FIXME: If blk.call breaks, then the pid isn't added to child_pids and the process is never shut down.

# File lib/backburner/workers/threads_on_fork.rb, line 246
# Runs the given block in a forked child and records the child's pid in the
# class-level child_pids list.
#
# @return the pid returned by Kernel.fork
def fork_it(&blk)
  child_pid = Kernel.fork do
    # Everything in this block runs in the child process only.
    self.class.is_child = true
    $0 = "[ThreadsOnFork worker] parent: #{Process.ppid}"
    blk.call
  end
  self.class.child_pids << child_pid
  child_pid
end
fork_tube(name) click to toggle source

Forks a worker for the given tube. Kept as a separate method so it is easy to test.

# File lib/backburner/workers/threads_on_fork.rb, line 161
# Forks a child (via fork_it) that runs fork_inner for the given tube.
#
# @param name tube the forked worker will process
# @return whatever fork_it returns
def fork_tube(name)
  fork_it { fork_inner(name) }
end
on_reconnect(conn) click to toggle source
# File lib/backburner/workers/threads_on_fork.rb, line 220
# Re-watches the previously watched tube on the given connection.
# Does nothing when no tube has been watched yet.
#
# @param conn connection to watch the tube on
def on_reconnect(conn)
  return unless @watching_tube

  watch_tube(@watching_tube, conn)
end
prepare() click to toggle source
# File lib/backburner/workers/threads_on_fork.rb, line 122
# Resolves the list of tubes to work on and logs it.
#
# Uses the already assigned tube_names when present; otherwise falls back to
# Backburner.default_queues, or to all_existing_queues when no defaults are
# configured. Names are expanded with expand_tube_name and de-duplicated.
def prepare
  self.tube_names ||= Backburner.default_queues.any? ? Backburner.default_queues : all_existing_queues
  # Build a fresh array rather than map!/uniq! in place: Array() returns the
  # very same object for an array, so in-place edits would also rewrite
  # Backburner.default_queues (or the caller's array).
  self.tube_names = Array(tube_names).map { |name| expand_tube_name(name) }.uniq
  tube_display_names = tube_names.map do |name|
    # A tube taken from the defaults or from the existing queues may have no
    # @tubes_data entry; show it with an empty settings list.
    "#{name}:#{(@tubes_data[name] || {}).values}"
  end
  log_info "Working #{tube_names.size} queues: [ #{tube_display_names.join(', ')} ]"
end
process_tube_names(tube_names) click to toggle source

Processes the special tube_names of the ThreadsOnFork worker. The format is tube_name:custom_threads_limit:custom_garbage_limit:custom_retries. Any custom value can be omitted, so to set just custom_retries you would write 'tube_name:::10'.

@example

process_tube_names(['foo:10:5:1', 'bar:2::3', 'lol'])
=> ['foo', 'bar', 'lol']
# File lib/backburner/workers/threads_on_fork.rb, line 84
# Parses tube specs of the form "name:threads:garbage:retries".
#
# Each limit is optional; a missing or empty segment is stored as nil. The
# parsed limits are saved in @tubes_data under the expanded tube name.
#
# @param tube_names tube specs, passed through compact_tube_names first
# @return [Array, nil] the bare tube names (not expanded), or nil when
#   compact_tube_names returns nil
def process_tube_names(tube_names)
  names = compact_tube_names(tube_names)
  return nil if names.nil?

  names.map do |name|
    tube_name, *limits = name.split(":")
    # Absent segments simply are not in `limits`, so they destructure to nil
    # without relying on a rescued NoMethodError.
    threads_number, garbage_number, retries_number = limits.map do |limit|
      limit.empty? ? nil : limit.to_i
    end
    @tubes_data[expand_tube_name(tube_name)] = {
        :threads => threads_number,
        :garbage => garbage_number,
        :retries => retries_number
    }
    tube_name
  end
end
process_tube_options() click to toggle source

Processes the tube settings. This overrides the @tubes_data set by the process_tube_names method: if a tube is named 'super_job:5:20:10' and the tube class sets queue_jobs_limit to 10, the resulting limit will be 10. If a tube is known as an existing beanstalkd queue but not by a class, it is skipped.

# File lib/backburner/workers/threads_on_fork.rb, line 110
# Overlays the limits declared on each known queue class onto the matching
# @tubes_data entry. Queue classes whose tube has no @tubes_data entry are
# skipped, and a nil class-level limit never replaces an existing value.
def process_tube_options
  Backburner::Worker.known_queue_classes.each do |queue|
    tube_settings = @tubes_data[expand_tube_name(queue)]
    next if tube_settings.nil?

    class_limits = {
        :threads => queue.queue_jobs_limit,
        :garbage => queue.queue_garbage_limit,
        :retries => queue.queue_retry_limit
    }
    tube_settings.merge!(class_limits) do |_key, current, incoming|
      incoming.nil? ? current : incoming
    end
  end
end
run_while_can(conn = connection) click to toggle source

Run work_one_job while we can

# File lib/backburner/workers/threads_on_fork.rb, line 207
# Keeps calling work_one_job until @runs reaches @garbage_after.
# A nil @garbage_after means there is no limit.
#
# @param conn connection handed to work_one_job
def run_while_can(conn = connection)
  until !@garbage_after.nil? && @garbage_after <= @runs
    @runs += 1 # FIXME: Likely race condition
    work_one_job(conn)
  end
end
start(lock=true) click to toggle source

For each tube, calls fork_and_watch to create the fork. The lock argument defines whether this method should block or not.

# File lib/backburner/workers/threads_on_fork.rb, line 132
# Prepares the tube list and starts one fork watcher per tube.
#
# @param lock when truthy, the call never returns: it sleeps in an endless
#   loop after the watchers have been started
def start(lock=true)
  prepare
  tube_names.each { |name| fork_and_watch(name) }
  return unless lock

  # Park the calling thread forever.
  sleep 0.1 while true
end
wait_for_process(pid) click to toggle source

Wait for a specific process. Easy to test

# File lib/backburner/workers/threads_on_fork.rb, line 237
# Waits for the given child to exit and removes its pid from the class-level
# child_pids list. Kept as its own method so it is easy to stub in tests.
#
# @param pid pid of the child to wait for
# @return the result of Process.wait2(pid)
def wait_for_process(pid)
  Process.wait2(pid).tap { self.class.child_pids.delete(pid) }
end
watch_tube(name, conn = connection) click to toggle source

Shortcut for watching a tube on our beanstalk connection

# File lib/backburner/workers/threads_on_fork.rb, line 215
# Remembers the tube name in @watching_tube (read back by on_reconnect) and
# calls watch! for it on the connection's tubes.
#
# @param name tube to watch
# @param conn connection to watch the tube on
def watch_tube(name, conn = connection)
  @watching_tube = name
  tubes = conn.tubes
  tubes.watch!(name)
end