class Thread::Pipe

A pipe lets you execute various tasks on a set of data in parallel, each datum inserted in the pipe is passed along through queues to the various functions composing the pipe, the final result is inserted in the final queue.

Public Class Methods

finalizer(tasks) click to toggle source

@private

# File lib/thread/pipe.rb, line 67
def self.finalizer(tasks)
        proc {
                tasks.each(&:kill)
        }
end
new(input = Queue.new, output = Queue.new) click to toggle source

Create a pipe using the optionally passed objects as input and output queue.

The objects must respond to enq and deq, and block on deq.

# File lib/thread/pipe.rb, line 57
def initialize(input = Queue.new, output = Queue.new)
        @tasks = []

        @input  = input
        @output = output

        ObjectSpace.define_finalizer self, self.class.finalizer(@tasks)
end

Public Instance Methods

<<(data)
Alias for: enq
deq(non_block = false) click to toggle source

Get an element from the output queue.

# File lib/thread/pipe.rb, line 106
def deq(non_block = false)
        @output.deq(non_block)
end
Also aliased as: pop, ~
empty?() click to toggle source

Check if the pipe is empty.

# File lib/thread/pipe.rb, line 89
def empty?
        @input.empty? && @output.empty? && @tasks.all?(&:empty?)
end
enq(data) click to toggle source

Insert data in the pipe.

# File lib/thread/pipe.rb, line 94
def enq(data)
        return if @tasks.empty?

        @input.enq data

        self
end
Also aliased as: push, <<
pop(non_block = false)
Alias for: deq
push(data)
Alias for: enq
|(func) click to toggle source

Add a task to the pipe, it must respond to call and arity, and arity must return 1.

# File lib/thread/pipe.rb, line 75
def |(func)
        if func.arity != 1
                raise ArgumentError, 'wrong arity'
        end

        Task.new(func, (@tasks.empty? ? @input : Queue.new), @output).tap {|t|
                @tasks.last.output = t.input unless @tasks.empty?
                @tasks << t
        }

        self
end
~(non_block = false)
Alias for: deq