module Shifty::DSL

Public Instance Methods

batch_worker(options = {gathering: 1}, &block) click to toggle source
# File lib/shifty/dsl.rb, line 66
def batch_worker(options = {gathering: 1}, &block)
  ensure_regular_arity(block) if block
  batch_full = block ||
    Proc.new { |_, batch| batch.size >= options[:gathering] }

  batch_context = BatchContext.new({ batch_full: batch_full })

  Worker.new(context: batch_context) do |value, supply, context|
    if value
      context.collection = [value]
      until context.batch_complete?(
          context.collection.last,
          context.collection
      )
        context.collection << supply.shift
      end
      context.collection.compact
    end
  end
end
filter_worker(argument=nil, &block) click to toggle source
# File lib/shifty/dsl.rb, line 44
def filter_worker(argument=nil, &block)
  if (block && argument.respond_to?(:call))
    throw_with 'You cannot supply two callables'
  end
  callable = argument.respond_to?(:call) ? argument : block
  ensure_callable(callable)

  Worker.new do |value, supply|
    while value && !callable.call(value) do
      value = supply.shift
    end
    value
  end
end
handoff(something) click to toggle source
# File lib/shifty/dsl.rb, line 122
def handoff(something)
  Fiber.yield something
end
relay_worker(&block) click to toggle source
# File lib/shifty/dsl.rb, line 23
def relay_worker(&block)
  ensure_regular_arity(block)

  Worker.new do |value|
    value && block.call(value)
  end
end
side_worker(mode=:normal, &block) click to toggle source
# File lib/shifty/dsl.rb, line 31
def side_worker(mode=:normal, &block)
  ensure_regular_arity(block)

  Worker.new do |value|
    value.tap do |v|
      used_value = mode == :hardened ?
        Marshal.load(Marshal.dump(v)) : v

      v && block.call(used_value)
    end
  end
end
source_worker(argument=nil, &block) click to toggle source
# File lib/shifty/dsl.rb, line 5
def source_worker(argument=nil, &block)
  ensure_correct_arity_for!(argument, block)

  series = series_from(argument)
  callable = setup_callable_for(block, series)

  return Worker.new(&callable) if series.nil?

  Worker.new do
    series.each(&callable)

    while true do
      handoff nil
    end
  end

end
splitter_worker(&block) click to toggle source
# File lib/shifty/dsl.rb, line 87
def splitter_worker(&block)
  ensure_regular_arity(block)

  Worker.new do |value|
    if value.nil?
      value
    else
      parts = [block.call(value)].flatten
      while parts.size > 1 do
        handoff parts.shift
      end
      parts.shift
    end
  end
end
trailing_worker(trail_length=2) click to toggle source
# File lib/shifty/dsl.rb, line 103
def trailing_worker(trail_length=2)
  trail = []
  Worker.new do |value, supply|
    if value
      trail.unshift value
      if trail.size >= trail_length
        trail.pop
      end
      while trail.size < trail_length
        trail.unshift supply.shift
      end

      trail
    else
      value
    end
  end
end

Private Instance Methods

ensure_callable(callable) click to toggle source
# File lib/shifty/dsl.rb, line 132
def ensure_callable(callable)
  unless callable && callable.respond_to?(:call)
    throw_with 'You must supply a callable'
  end
end
ensure_correct_arity_for!(argument, block) click to toggle source

only valid for source_worker

# File lib/shifty/dsl.rb, line 146
def ensure_correct_arity_for!(argument, block)
  return unless block
  if argument
    ensure_regular_arity(block)
  else
    if block.arity > 0
      throw_with \
        'Source worker cannot accept any arguments (arity == 0)'
    end
  end
end
ensure_regular_arity(block) click to toggle source
# File lib/shifty/dsl.rb, line 138
def ensure_regular_arity(block)
  if block.arity != 1
    throw_with \
      "Worker must accept exactly one argument (arity == 1)"
  end
end
series_from(series) click to toggle source
# File lib/shifty/dsl.rb, line 158
def series_from(series)
  return if series.nil?
  case
  when series.respond_to?(:to_a)
    series.to_a
  when series.respond_to?(:scan)
    series.scan(/./)
  else
    [series]
  end
end
setup_callable_for(block, series) click to toggle source
# File lib/shifty/dsl.rb, line 170
def setup_callable_for(block, series)
  return block unless series
  if block
    return Proc.new { |value| handoff block.call(value) }
  else
    return Proc.new { |value| handoff value }
  end
end
throw_with(*msg) click to toggle source
# File lib/shifty/dsl.rb, line 128
def throw_with(*msg)
  raise WorkerInitializationError.new([msg].flatten.join(' '))
end