module Shifty::DSL
Public Instance Methods
batch_worker(options = {gathering: 1}, &block)
click to toggle source
# File lib/shifty/dsl.rb, line 66 def batch_worker(options = {gathering: 1}, &block) ensure_regular_arity(block) if block batch_full = block || Proc.new { |_, batch| batch.size >= options[:gathering] } batch_context = BatchContext.new({ batch_full: batch_full }) Worker.new(context: batch_context) do |value, supply, context| if value context.collection = [value] until context.batch_complete?( context.collection.last, context.collection ) context.collection << supply.shift end context.collection.compact end end end
filter_worker(argument=nil, &block)
click to toggle source
# File lib/shifty/dsl.rb, line 44 def filter_worker(argument=nil, &block) if (block && argument.respond_to?(:call)) throw_with 'You cannot supply two callables' end callable = argument.respond_to?(:call) ? argument : block ensure_callable(callable) Worker.new do |value, supply| while value && !callable.call(value) do value = supply.shift end value end end
handoff(something)
click to toggle source
# File lib/shifty/dsl.rb, line 122 def handoff(something) Fiber.yield something end
relay_worker(&block)
click to toggle source
# File lib/shifty/dsl.rb, line 23 def relay_worker(&block) ensure_regular_arity(block) Worker.new do |value| value && block.call(value) end end
side_worker(mode=:normal, &block)
click to toggle source
# File lib/shifty/dsl.rb, line 31 def side_worker(mode=:normal, &block) ensure_regular_arity(block) Worker.new do |value| value.tap do |v| used_value = mode == :hardened ? Marshal.load(Marshal.dump(v)) : v v && block.call(used_value) end end end
source_worker(argument=nil, &block)
click to toggle source
# File lib/shifty/dsl.rb, line 5 def source_worker(argument=nil, &block) ensure_correct_arity_for!(argument, block) series = series_from(argument) callable = setup_callable_for(block, series) return Worker.new(&callable) if series.nil? Worker.new do series.each(&callable) while true do handoff nil end end end
splitter_worker(&block)
click to toggle source
# File lib/shifty/dsl.rb, line 87 def splitter_worker(&block) ensure_regular_arity(block) Worker.new do |value| if value.nil? value else parts = [block.call(value)].flatten while parts.size > 1 do handoff parts.shift end parts.shift end end end
trailing_worker(trail_length=2)
click to toggle source
# File lib/shifty/dsl.rb, line 103 def trailing_worker(trail_length=2) trail = [] Worker.new do |value, supply| if value trail.unshift value if trail.size >= trail_length trail.pop end while trail.size < trail_length trail.unshift supply.shift end trail else value end end end
Private Instance Methods
ensure_callable(callable)
click to toggle source
# File lib/shifty/dsl.rb, line 132 def ensure_callable(callable) unless callable && callable.respond_to?(:call) throw_with 'You must supply a callable' end end
ensure_correct_arity_for!(argument, block)
click to toggle source
only valid for source_worker
# File lib/shifty/dsl.rb, line 146 def ensure_correct_arity_for!(argument, block) return unless block if argument ensure_regular_arity(block) else if block.arity > 0 throw_with \ 'Source worker cannot accept any arguments (arity == 0)' end end end
ensure_regular_arity(block)
click to toggle source
# File lib/shifty/dsl.rb, line 138 def ensure_regular_arity(block) if block.arity != 1 throw_with \ "Worker must accept exactly one argument (arity == 1)" end end
series_from(series)
click to toggle source
# File lib/shifty/dsl.rb, line 158 def series_from(series) return if series.nil? case when series.respond_to?(:to_a) series.to_a when series.respond_to?(:scan) series.scan(/./) else [series] end end
setup_callable_for(block, series)
click to toggle source
# File lib/shifty/dsl.rb, line 170 def setup_callable_for(block, series) return block unless series if block return Proc.new { |value| handoff block.call(value) } else return Proc.new { |value| handoff value } end end
throw_with(*msg)
click to toggle source
# File lib/shifty/dsl.rb, line 128 def throw_with(*msg) raise WorkerInitializationError.new([msg].flatten.join(' ')) end