class EventMachine::Iterator

A simple iterator for concurrent asynchronous work.

Unlike Ruby’s built-in iterators, the end of each iteration is signaled manually rather than happening automatically when the yielded block returns. For example:

(0..10).each{ |num| }

becomes:

EM::Iterator.new(0..10).each{ |num,iter| iter.next }

This is especially useful when doing asynchronous work via reactor libraries and functions. For example, given a synchronous and an asynchronous HTTP API:

response = sync_http_get(url); ...
async_http_get(url){ |response| ... }

a synchronous iterator such as:

responses = urls.map{ |url| sync_http_get(url) }
...
puts 'all done!'

could be written as:

EM::Iterator.new(urls).map(proc{ |url,iter|
  async_http_get(url){ |res|
    iter.return(res)
  }
}, proc{ |responses|
  ...
  puts 'all done!'
})

Now you can take advantage of the asynchronous API to issue requests in parallel. For example, to fetch 10 URLs at a time, pass in a concurrency of 10:

EM::Iterator.new(urls, 10).each do |url,iter|
  async_http_get(url){ iter.next }
end
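
To make the concurrency limit concrete, here is a simplified pure-Ruby model of the bookkeeping. It is a sketch, not the library's implementation: the `ticks` array stands in for the reactor's next-tick queue, and every name in it (`ticks`, `next_tick`, `worker`, `in_flight`) is made up for illustration. With a concurrency of 2, at most two items are in flight at once, and each completion pulls the next item.

```ruby
ticks = []                               # stand-in for the reactor's next-tick queue
next_tick = proc { |blk| ticks << blk }  # hypothetical helper, not EM.next_tick

items = (1..6).to_a
concurrency = 2
in_flight = 0
max_in_flight = 0
done = []

worker = proc do
  unless items.empty?
    item = items.shift
    in_flight += 1
    max_in_flight = [max_in_flight, in_flight].max
    # Simulated async work: it completes on a later tick, which models the
    # user calling iter.next, and only then does the worker take another item.
    next_tick.call(proc {
      in_flight -= 1
      done << item
      worker.call
    })
  end
end

concurrency.times { worker.call }        # start one worker per concurrency slot
ticks.shift.call until ticks.empty?      # run the simulated reactor to completion

p max_in_flight
p done
```

Raising `concurrency` in this model raises the peak number of in-flight items accordingly; the order in which items are handed out does not change.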

Constants

Stop

Attributes

concurrency[R]

Public Class Methods

new(list, concurrency = 1)

Create a new parallel async iterator with specified concurrency.

i = EM::Iterator.new(1..100, 10)

will create an iterator over the range that processes 10 items at a time. Iteration is started via each, map or inject.

The list may be either an array-like object (anything that responds to to_a) or a proc that returns a new object to be processed each time it is called. If a proc is used, it must return EventMachine::Iterator::Stop to signal the end of the iteration.
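
The proc form can be sketched as follows. This is a minimal illustration that runs without the gem, so it uses a local `stop` object as a stand-in for EventMachine::Iterator::Stop; `queue`, `source` and `seen` are likewise hypothetical names. The drain loop mirrors the identity check (`equal?`) that the iterator applies to each returned item.

```ruby
# Stand-in sentinel so the sketch runs without the gem. With EventMachine
# loaded, the proc would return EventMachine::Iterator::Stop instead.
stop = Object.new

# A list proc: hands out one item per call, then the sentinel once drained.
queue = %w[a b c]
source = proc { queue.empty? ? stop : queue.shift }

# Drain the proc the way the iterator does: call it until the sentinel appears.
seen = []
loop do
  item = source.call
  break if item.equal?(stop)
  seen << item
end

p seen
```

Because the sentinel is compared by identity, the proc has to return the sentinel object itself, not a copy or a look-alike value.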

# File lib/em/iterator.rb, line 56
def initialize(list, concurrency = 1)
  raise ArgumentError, 'concurrency must be bigger than zero' unless (concurrency > 0)
  if list.respond_to?(:call)
    @list = nil
    @list_proc = list
  elsif list.respond_to?(:to_a)
    @list = list.to_a.dup
    @list_proc = nil
  else
    raise ArgumentError, 'argument must be a proc or an array'
  end
  @concurrency = concurrency

  @started = false
  @ended = false
end

Public Instance Methods

concurrency=(val)

Change the concurrency of this iterator. Workers are spawned or retired automatically to accommodate the new concurrency level.

# File lib/em/iterator.rb, line 76
def concurrency=(val)
  old = @concurrency
  @concurrency = val

  spawn_workers if val > old and @started and !@ended
end

each(foreach=nil, after=nil, &blk)

Iterate over a set of items using the specified block or proc.

EM::Iterator.new(1..100).each do |num, iter|
  puts num
  iter.next
end

An optional second proc is invoked after the iteration is complete.

EM::Iterator.new(1..100).each(
  proc{ |num,iter| iter.next },
  proc{ puts 'all done' }
)

# File lib/em/iterator.rb, line 98
def each(foreach=nil, after=nil, &blk)
  raise ArgumentError, 'proc or block required for iteration' unless foreach ||= blk
  raise RuntimeError, 'cannot iterate over an iterator more than once' if @started or @ended

  @started = true
  @pending = 0
  @workers = 0

  all_done = proc{
    after.call if after and @ended and @pending == 0
  }

  @process_next = proc{
    # p [:process_next, :pending=, @pending, :workers=, @workers, :ended=, @ended, :concurrency=, @concurrency, :list=, @list]
    unless @ended or @workers > @concurrency
      item = next_item()
      if item.equal?(Stop)
        @ended = true
        @workers -= 1
        all_done.call
      else
        @pending += 1

        is_done = false
        on_done = proc{
          raise RuntimeError, 'already completed this iteration' if is_done
          is_done = true

          @pending -= 1

          if @ended
            all_done.call
          else
            EM.next_tick(@process_next)
          end
        }
        class << on_done
          alias :next :call
        end

        foreach.call(item, on_done)
      end
    else
      @workers -= 1
    end
  }

  spawn_workers

  self
end

inject(obj, foreach, after)

Inject the results of an asynchronous iteration onto a given object.
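
One consequence of this design is worth spelling out: each iteration receives the accumulator as it stood when its item was dispatched, and the value passed to return replaces it. With a concurrency above 1, returning a fresh value can therefore overwrite a sibling's result, whereas mutating one shared object (as the hash-based example in this section does) cannot. The following pure-Ruby model illustrates this under the assumption that two items are dispatched before either completes; `in_flight`, `memo` and the procs are illustrative stand-ins, not library API.

```ruby
# Case 1: each iteration returns a brand-new value.
obj = 0
in_flight = []
[1, 2].each do |item|
  memo = obj                               # accumulator as seen at dispatch
  in_flight << proc { obj = memo + item }  # models iter.return(memo + item)
end
in_flight.each(&:call)                     # both complete afterwards
lost = obj

# Case 2: each iteration mutates the shared accumulator and returns it.
obj = []
in_flight = []
[1, 2].each do |item|
  memo = obj
  in_flight << proc { memo << item; obj = memo }  # models iter.return(memo)
end
in_flight.each(&:call)
kept = obj

p lost  # 2: the first update was overwritten
p kept  # [1, 2]
```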

EM::Iterator.new(%w[ pwd uptime uname date ], 2).inject({}, proc{ |hash,cmd,iter|
  EM.system(cmd){ |output,status|
    hash[cmd] = status.exitstatus == 0 ? output.strip : nil
    iter.return(hash)
  }
}, proc{ |results|
  p results
})

# File lib/em/iterator.rb, line 199
def inject(obj, foreach, after)
  each(proc{ |item,iter|
    is_done = false
    on_done = proc{ |res|
      raise RuntimeError, 'already returned a value for this iteration' if is_done
      is_done = true

      obj = res
      iter.next
    }
    class << on_done
      alias :return :call
      def next
        raise NoMethodError, 'must call #return on an inject iterator'
      end
    end

    foreach.call(obj, item, on_done)
  }, proc{
    after.call(obj)
  })
end

map(foreach, after)

Collect the results of an asynchronous iteration into an array.

EM::Iterator.new(%w[ pwd uptime uname date ], 2).map(proc{ |cmd,iter|
  EM.system(cmd){ |output,status|
    iter.return(output)
  }
}, proc{ |results|
  p results
})

# File lib/em/iterator.rb, line 160
def map(foreach, after)
  index = 0

  inject([], proc{ |results,item,iter|
    i = index
    index += 1

    is_done = false
    on_done = proc{ |res|
      raise RuntimeError, 'already returned a value for this iteration' if is_done
      is_done = true

      results[i] = res
      iter.return(results)
    }
    class << on_done
      alias :return :call
      def next
        raise NoMethodError, 'must call #return on a map iterator'
      end
    end

    foreach.call(item, on_done)
  }, proc{ |results|
    after.call(results)
  })
end
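
The listing above reserves a result slot (`i = index`) when an item is handed out, so the final array follows input order even when iterations finish out of order. Here is a small pure-Ruby model of just that index capture, with no reactor involved; `pending` and the upcasing "work" are hypothetical stand-ins for real asynchronous calls.

```ruby
results = []
pending = []   # completion callbacks, stored so they can fire out of order
index = 0

%w[a b c].each do |item|
  i = index    # slot reserved at dispatch time, as in map
  index += 1
  pending << proc { results[i] = item.upcase }  # models iter.return(value)
end

# Complete the work in reverse order, as slow and fast requests might.
pending.reverse_each(&:call)

p results  # ["A", "B", "C"]
```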

Private Instance Methods

next_item()

Return the next item from @list or @list_proc. Once the items in @list have run out, returns EM::Iterator::Stop. A list proc must return this value itself.

# File lib/em/iterator.rb, line 240
def next_item
  if @list_proc
    @list_proc.call
  else
    @list.empty? ? Stop : @list.shift
  end
end

spawn_workers()

Spawn workers to consume items from the iterator’s enumerator based on the current concurrency level.

# File lib/em/iterator.rb, line 226
def spawn_workers
  EM.next_tick(start_worker = proc{
    if @workers < @concurrency and !@ended
      # p [:spawning_worker, :workers=, @workers, :concurrency=, @concurrency, :ended=, @ended]
      @workers += 1
      @process_next.call
      EM.next_tick(start_worker)
    end
  })
  nil
end