class GoChanel::Pipe

Attributes

pools[RW]

Public Class Methods

new() click to toggle source
# File lib/go_chanel/pipe.rb, line 10
def initialize
  @pools = []
end
|(pool) click to toggle source
# File lib/go_chanel/pipe.rb, line 4
def self.|(pool)
  pipe = Pipe.new
  pipe.pools << pool
  pipe
end

Public Instance Methods

run(*args) click to toggle source
# File lib/go_chanel/pipe.rb, line 19
def run(*args)
  raise PipeException.new("at_leaset pipe two pools") if @pools.count < 2
  concurrence_chan = GoChanel::Chanel.new(@pools.count)

  chans = build_chans

  start_pool = @pools.shift
  last_pool = @pools.pop

  Thread.new(args) do |params|
    begin
      GoChanel::Task.start_task(chans[0], *params, &start_pool.proc)
    rescue => e
      GoChanel::ErrNotifier.notify(e)
    ensure
      concurrence_chan.push(true)
    end
  end

  #no head and tail
  @pools.each_with_index do |pool, i|
    Thread.new do
      begin
        GoChanel::Task.normal_task(chans[i], chans[i + 1], pool.size, &pool.proc)
      rescue => e
        GoChanel::ErrNotifier.notify(e)
      ensure
        concurrence_chan.push(true)
      end
    end
  end

  Thread.new do
    begin
      GoChanel::Task.end_task(chans[-1], last_pool.size, &last_pool.proc)
    rescue => e
      GoChanel::ErrNotifier.notify(e)
    ensure
      concurrence_chan.push(true)
    end
  end

  concurrence_chan.cap.times do
    concurrence_chan.pop
  end
end
|(pool) click to toggle source
# File lib/go_chanel/pipe.rb, line 14
def |(pool)
  @pools << pool
  self
end

Private Instance Methods

build_chans() click to toggle source
# File lib/go_chanel/pipe.rb, line 68
def build_chans
  chans = []

  @pools.each_with_index do |pool, i|
    next if i == 0
    chans << GoChanel::Chanel.new(pool.size)
  end

  chans
end