class PDO::Worker
Public Class Methods
new(tasks, thread_num)
click to toggle source
# File lib/pdo.rb, line 10
#
# Stores the work list and the requested degree of parallelism.
#
# tasks      - the collection of tasks to run; #run calls +size+ and +next+
#              on it, and #execute destructures each task as [target, cmd].
# thread_num - the maximum number of worker threads #run may start.
def initialize(tasks, thread_num)
  @tasks = tasks
  @thread_num = thread_num
end
Public Instance Methods
execute(task)
click to toggle source
# File lib/pdo.rb, line 62
#
# Runs one task in a forked child process and records what the child
# printed in thread-local variables of the calling thread.
#
# task - a two-element array [target, cmd]. cmd is splatted into
#        Kernel#exec, so it may be a single command string or an array of
#        program name and arguments.
#
# On return the calling thread carries:
#   :output   - array of lines read from the child (stdout and stderr are
#               merged into the same pipe)
#   :target   - the task's target
#   :new_data - true, marking the two values above as fresh
#
# Raises RuntimeError ("no task") when task is nil.
def execute(task)
  raise "no task" if task.nil?

  target, cmd = task

  # IO.popen("-") forks: the block runs in the parent with the read end of
  # the pipe and in the child with nil. It replaces open("|-"), whose
  # pipe/fork form is deprecated in recent Ruby versions.
  IO.popen("-") do |child_io|
    if child_io
      # The waitpid statement below causes the program to hang when
      # running some commands, such as wget.
      # waitpid child_io.pid
      lines = child_io.readlines
      Thread.current[:output] = lines
      Thread.current[:target] = target
      # Raise the flag last so that anyone polling :new_data never sees it
      # set while :output or :target still belong to the previous task.
      Thread.current[:new_data] = true
    else
      begin
        STDIN.close
        STDERR.reopen(STDOUT)
        exec(*cmd)
      rescue StandardError => e
        # exec only returns by raising (command not found, bad arguments).
        # Report through the pipe and leave immediately; otherwise the
        # forked child would keep running a copy of the parent's program.
        STDOUT.puts "exec failed: #{e.message}"
        STDOUT.flush
        exit!(127)
      end
    end
  end
end
run()
click to toggle source
# File lib/pdo.rb, line 15
#
# Runs every task on a pool of worker threads and prints each task's
# output from the calling thread.
#
# At most @thread_num workers are started, and never more than there are
# tasks. Each worker repeatedly takes the next task from @tasks, passes it
# to #execute, and then hands the :target and :output that #execute left in
# its thread-local variables to the calling thread through a queue. The
# calling thread blocks on that queue, so it neither spins while waiting
# nor depends on which other threads exist in the process.
#
# Returns 1 when there are no tasks, nil otherwise.
def run
  return 1 if @tasks.size == 0

  worker_count = [@thread_num, @tasks.size].min
  results = Queue.new

  workers = (1..worker_count).map do
    Thread.new do
      begin
        loop do
          logger.info "#{Thread.current.object_id} started."
          begin
            execute(@tasks.next)
          rescue
            # Any StandardError ends this worker: #execute raises when it is
            # handed nil. NOTE(review): a genuine failure inside #execute is
            # reported the same way as running out of tasks.
            logger.info "No more task for #{Thread.current.object_id}."
            break
          end

          next unless Thread.current[:new_data]

          results << [Thread.current.object_id,
                      Thread.current[:target],
                      Thread.current[:output]]
          Thread.current[:new_data] = false
        end
      ensure
        # A nil entry tells the calling thread that this worker is finished,
        # even if it is dying from an unexpected error.
        results << nil
      end
    end
  end

  # Printing happens only here, so the header and the output lines of one
  # task are never interleaved with those of another.
  pending = workers.size
  while pending > 0
    result = results.pop
    if result.nil?
      pending -= 1
      next
    end

    worker_id, target, output = result
    logger.info "#{worker_id} finished."
    puts "=== #{target} ==="
    output.each { |line| puts line }
    # puts output.join('').gsub("\n", ' | ').chomp(' | ')
  end

  nil
end