class PDO::Worker

Public Class Methods

new(tasks, thread_num)
# File lib/pdo.rb, line 10
def initialize(tasks, thread_num)
# Stores the task source and the requested number of worker threads
# on the instance; nothing is started here.
  @tasks, @thread_num = tasks, thread_num
end

Public Instance Methods

execute(task)
# File lib/pdo.rb, line 62
def execute(task)
# execute forks a child process to run the task, saves the child's
# output (stdout with stderr merged into it) in thread-local
# variables, and finally raises the :new_data flag.
#
# task - a [target, cmd] pair. target is stored as-is under :target;
#        cmd is splatted into Kernel#exec.
#
# Returns target. Raises RuntimeError ("no task") when task is nil.

  raise "no task" if task.nil?

  target, cmd = task

  # IO.popen("-") forks exactly like the legacy open("|-") form, which
  # is deprecated since Ruby 3.3. The block runs in both processes:
  # child_io is the read end of the pipe in the parent, nil in the child.
  IO.popen("-") do |child_io|
    if child_io
      # The waitpid statement below causes the program to hang when
      # running some commands, such as wget.
      # waitpid child_io.pid
      Thread.current[:output] = child_io.readlines
      Thread.current[:target] = target
      # Raise the flag last: another thread polling :new_data must never
      # observe it before :output and :target belong to this task.
      Thread.current[:new_data] = true
      target
    else
      begin
        STDIN.close
        STDERR.reopen(STDOUT)
        exec(*cmd)
      rescue StandardError => e
        # exec never returns on success. On failure the exception must
        # not escape this block, otherwise the forked child would go on
        # running a copy of the parent's code. Report through the pipe
        # and leave without running at_exit handlers.
        STDERR.puts "exec failed: #{e.message}"
        STDERR.flush
        exit!(127)
      end
    end
  end

end
run()
# File lib/pdo.rb, line 15
def run
# run spawns up to @thread_num worker threads (never more than there
# are tasks) and lets them drain @tasks. Each worker executes one
# task, waits until the main thread has printed its data, then takes
# the next task. Meanwhile the main thread loops over the workers it
# spawned and prints the output of every worker whose :new_data flag
# is set.
#
# A worker stops when execute or @tasks.next raises a StandardError;
# this is logged as "No more task".
#
# Returns 1 when there are no tasks, otherwise nil.

  return 1 if @tasks.size == 0

  # Both sides wait with a short timed sleep. A bare Thread.stop paired
  # with Thread#run can lose the wakeup when the main thread clears the
  # flag before the worker has actually gone to sleep, which would
  # leave the worker stopped forever.
  poll_interval = 0.01

  n = [@thread_num, @tasks.size].min
  workers = 1.upto(n).map do
    Thread.new do
      loop do
        logger.info "#{Thread.current.object_id} started."
        begin
          execute(@tasks.next)
          # new data is ready; wait until the main thread has printed
          # it and cleared the flag.
          sleep poll_interval while Thread.current[:new_data]
        rescue
          logger.info "No more task for #{Thread.current.object_id}."
          break
        end
      end
    end
  end

  loop do
    # Sample liveness before draining so that data published just
    # before the last worker exited is still printed.
    finished = workers.none?(&:alive?)
    printed = false

    workers.each do |t|
      next unless t.key?(:output) && t[:new_data]

      logger.info "#{t.object_id} finished."
      puts "=== #{t[:target]} ==="
      t[:output].each { |x| puts x }
      # puts t[:output].join('').gsub("\n", ' | ').chomp(' | ')
      # clearing the flag releases the waiting worker
      t[:new_data] = false
      printed = true
    end

    break if finished

    # Yield the CPU instead of spinning while workers are busy.
    sleep poll_interval unless printed
  end
end