class Thread::Pipe
A pipe lets you execute various tasks on a set of data in parallel, each datum inserted in the pipe is passed along through queues to the various functions composing the pipe, the final result is inserted in the final queue.
Public Class Methods
finalizer(tasks)
click to toggle source
@private
# File lib/thread/pipe.rb, line 67 def self.finalizer(tasks) proc { tasks.each(&:kill) } end
new(input = Queue.new, output = Queue.new)
click to toggle source
Create a pipe using the optionally passed objects as input and output queue.
The objects must respond to enq
and deq
, and block on deq
.
# File lib/thread/pipe.rb, line 57 def initialize(input = Queue.new, output = Queue.new) @tasks = [] @input = input @output = output ObjectSpace.define_finalizer self, self.class.finalizer(@tasks) end
Public Instance Methods
deq(non_block = false)
click to toggle source
Get an element from the output queue.
# File lib/thread/pipe.rb, line 106 def deq(non_block = false) @output.deq(non_block) end
empty?()
click to toggle source
Check if the pipe is empty.
# File lib/thread/pipe.rb, line 89 def empty? @input.empty? && @output.empty? && @tasks.all?(&:empty?) end
enq(data)
click to toggle source
Insert data in the pipe.
# File lib/thread/pipe.rb, line 94 def enq(data) return if @tasks.empty? @input.enq data self end
|(func)
click to toggle source
Add a task to the pipe, it must respond to call and arity, and arity must return 1.
# File lib/thread/pipe.rb, line 75 def |(func) if func.arity != 1 raise ArgumentError, 'wrong arity' end Task.new(func, (@tasks.empty? ? @input : Queue.new), @output).tap {|t| @tasks.last.output = t.input unless @tasks.empty? @tasks << t } self end