class Arachni::Reactor::Iterator

@note Pretty much an `EventMachine::Iterator` rip-off.

A simple iterator for concurrent asynchronous work.

Unlike Ruby's built-in iterators, the end of the current iteration cycle is signaled manually, instead of happening automatically after the yielded block finishes executing.

@example Direct initialization.

Iterator.new( reactor, 0..10 ).each { |num, iterator| iterator.next }

@example Reactor factory.

reactor.create_iterator( 0..10 ).each { |num, iterator| iterator.next }

@author Tasos “Zapotek” Laskos <tasos.laskos@gmail.com>

Attributes

concurrency[R]

@return [Integer]

reactor[R]

@return [Reactor]

Public Class Methods

new( reactor, list, concurrency = 1 ) click to toggle source

@example Create a new parallel async iterator with specified concurrency.

i = Iterator.new( reactor, 1..100, 10 )

@param [Reactor] reactor

@param [#to_a] list

List to iterate.

@param [Integer] concurrency

Parallel workers to spawn.
# File lib/arachni/reactor/iterator.rb, line 47
# Sets up the iterator without starting any work.
#
# @param [Reactor] reactor
#   Stored as-is in `@reactor`.
# @param [#to_a] list
#   Items to iterate; converted with `#to_a` and copied, so the caller's
#   collection object is not the one stored.
# @param [Integer] concurrency
#   Parallel workers to spawn, must be greater than zero.
#
# @raise [ArgumentError]
#   If `list` does not respond to `#to_a` or `concurrency` is not
#   greater than zero.
def initialize( reactor, list, concurrency = 1 )
    unless list.respond_to?( :to_a )
        raise ArgumentError, 'argument must be an array'
    end

    unless concurrency > 0
        raise ArgumentError, 'concurrency must be bigger than zero'
    end

    @concurrency = concurrency
    @reactor     = reactor

    # Private copy of the items.
    @list = list.to_a.dup

    # Neither started nor finished yet.
    @started = @ended = false
end

Public Instance Methods

concurrency=( val ) click to toggle source

Change the concurrency of this iterator. Workers will automatically be spawned or destroyed to accommodate the new concurrency level.

@param [Integer] val

New concurrency.
# File lib/arachni/reactor/iterator.rb, line 64
# Updates the concurrency level of this iterator.
#
# The new value is always stored. Extra workers are requested via
# `spawn_workers` only when the level was raised while the iteration has
# started and has not yet ended.
#
# @param [Integer] val
#   New concurrency.
#
# @return [Integer]
#   The given `val`.
def concurrency=( val )
    previous, @concurrency = @concurrency, val

    raised = val > previous
    spawn_workers if raised && @started && !@ended

    val
end
each( foreach = nil, after = nil, &block ) click to toggle source

@example Iterate over a set of items using the specified block or proc.

Iterator.new( reactor, 1..100 ).each do |num, iterator|
    puts num
    iterator.next
end

@example An optional second proc is invoked after the iteration is complete.

Iterator.new( reactor, 1..100 ).each(
    proc { |num, iterator| iterator.next },
    proc { puts 'all done' }
)
# File lib/arachni/reactor/iterator.rb, line 86
# Starts the iteration, handing each item to the given proc or block
# together with a completion handle.
#
# The handler receives `(item, handle)` and must call `handle.next`
# (an alias of `handle.call`) exactly once to signal that the item has been
# processed; a second call raises `RuntimeError`.
#
# @param [Proc, nil] foreach
#   Handler for each item; the block is used when this is not given.
# @param [Proc, nil] after
#   Called without arguments once the list has been exhausted and no
#   items are pending.
#
# @return [Iterator] `self`
#
# @raise [ArgumentError] If neither a proc nor a block was given.
# @raise [RuntimeError] If the iterator has already been started or ended.
def each( foreach = nil, after = nil, &block )
    foreach ||= block
    raise ArgumentError, 'Proc or Block required for iteration.' unless foreach
    raise RuntimeError, 'Cannot iterate over an iterator more than once.' if @started || @ended

    @started = true
    @pending = 0
    @workers = 0

    # Fires the `after` callback only once everything has been consumed and
    # every handed-out item has been acknowledged.
    finish = proc do
        after.call if after && @ended && @pending == 0
    end

    @process_next = proc do
        if @ended || @workers > @concurrency
            # Iteration is over or there are more workers than allowed:
            # retire this worker.
            @workers -= 1
        elsif @list.empty?
            # Nothing left to hand out: mark the end and retire.
            @ended    = true
            @workers -= 1

            finish.call
        else
            entry     = @list.shift
            @pending += 1

            acknowledged = false
            advance      = proc do
                raise RuntimeError, 'Already completed this iteration.' if acknowledged
                acknowledged = true

                @pending -= 1

                # Either wrap up or schedule the next item for this worker.
                @ended ? finish.call : @reactor.next_tick( &@process_next )
            end

            # Expose `#next` on the handle as an alias of `#call`.
            advance.singleton_class.send( :alias_method, :next, :call )

            foreach.call( entry, advance )
        end
    end

    spawn_workers

    self
end
inject( object, foreach, after ) click to toggle source

@example Inject the results of an asynchronous iteration onto a given object.

Iterator.new( reactor, %w(one two three four), 2 ).inject( {},
    proc do |hash, string, iterator|
        hash.merge!( string => string.size )
        iterator.return( hash )
    end,
    proc do |results|
        p results
    end
)

@param [Object] object

@param [Proc] foreach

`Proc` to handle each entry.

@param [Proc] after

`Proc` to handle the results.
# File lib/arachni/reactor/iterator.rb, line 204
# Runs the iteration through {#each}, threading an accumulator through the
# handler.
#
# The handler receives `(accumulator, item, handle)` and must call
# `handle.return( value )` exactly once; `value` becomes the accumulator
# seen by later invocations and, eventually, by `after`. Calling
# `handle.next` raises `NoMethodError` and a second `#return` raises
# `RuntimeError`.
#
# @param [Object] object
#   Initial accumulator.
# @param [Proc] foreach
#   `Proc` to handle each entry.
# @param [Proc] after
#   `Proc` to handle the results; called with the final accumulator.
#
# @return Whatever {#each} returns.
def inject( object, foreach, after )
    memo = object

    per_item = proc do |item, iter|
        returned = false
        handle   = proc do |value|
            raise RuntimeError, 'Already returned a value for this iteration.' if returned
            returned = true

            memo = value
            iter.next
        end

        # `#return` is the only supported way to complete an item.
        handle.singleton_class.send( :alias_method, :return, :call )

        def handle.next
            raise NoMethodError, 'Must call #return on an inject iterator.'
        end

        foreach.call( memo, item, handle )
    end

    on_finish = proc { after.call( memo ) }

    each( per_item, on_finish )
end
map( foreach, after ) click to toggle source

@example Collect the results of an asynchronous iteration into an array.

Iterator.new( reactor, %w(one two three four), 2 ).map(
    proc do |string, iterator|
        iterator.return( string.size )
    end,
    proc do |results|
        p results
    end
)

@param [Proc] foreach

`Proc` to handle each entry.

@param [Proc] after

`Proc` to handle the results.
# File lib/arachni/reactor/iterator.rb, line 154
# Collects one value per item into an array, built on top of {#inject}.
#
# The handler receives `(item, handle)` and must call
# `handle.return( value )` exactly once. Each value is stored at the
# position at which its item was handed out, regardless of the order in
# which the handles are completed. Calling `handle.next` raises
# `NoMethodError` and a second `#return` raises `RuntimeError`.
#
# @param [Proc] foreach
#   `Proc` to handle each entry.
# @param [Proc] after
#   `Proc` to handle the results; called with the results array.
#
# @return Whatever {#inject} returns.
def map( foreach, after )
    position = 0

    collect = proc do |results, item, iter|
        # Reserve this item's slot before any asynchronous work happens.
        slot      = position
        position += 1

        stored = false
        store  = proc do |value|
            raise RuntimeError, 'Already returned a value for this iteration.' if stored
            stored = true

            results[slot] = value
            iter.return( results )
        end

        # `#return` is the only supported way to complete an item.
        store.singleton_class.send( :alias_method, :return, :call )

        def store.next
            raise NoMethodError, 'Must call #return on a map iterator.'
        end

        foreach.call( item, store )
    end

    deliver = proc { |results| after.call( results ) }

    inject( [], collect, deliver )
end

Private Instance Methods

spawn_workers() click to toggle source

Spawn workers to consume items from the iterator's enumerator based on the current concurrency level.

# File lib/arachni/reactor/iterator.rb, line 236
# Schedules worker start-up on the reactor.
#
# On each tick one worker is started (by incrementing `@workers` and calling
# `@process_next`) and the same task is re-scheduled, until the number of
# workers reaches `@concurrency` or the iteration has ended.
#
# @return [nil]
def spawn_workers
    spawner = proc do |task|
        unless @ended || @workers >= @concurrency
            @workers += 1
            @process_next.call

            # Come back on the next tick to see if another worker is needed.
            @reactor.next_tick( &task.to_proc )
        end
    end

    @reactor.next_tick( &spawner )

    nil
end