class Pluggaloid::Stream
Public Class Methods
new(enumerator)
click to toggle source
# File lib/pluggaloid/stream.rb, line 7 def initialize(enumerator) @enumerator = enumerator end
Public Instance Methods
buffer(sec)
click to toggle source
# File lib/pluggaloid/stream.rb, line 35 def buffer(sec) throttling_promise = nil buffer = [] Stream.new( Enumerator.new do |yielder| @enumerator.each do |item| buffer << item throttling_promise ||= Delayer.new(delay: sec) do yielder << buffer.freeze buffer = [] throttling_promise = nil end end end.lazy ) end
debounce(sec)
click to toggle source
# File lib/pluggaloid/stream.rb, line 21 def debounce(sec) throttling_promise = nil Stream.new( Enumerator.new do |yielder| @enumerator.each do |item| throttling_promise&.cancel throttling_promise = Delayer.new(delay: sec) do yielder << item end end end.lazy ) end
merge(*streams)
click to toggle source
# File lib/pluggaloid/stream.rb, line 52 def merge(*streams) Stream.new(Merge.new(self, *streams).lazy) end
throttle(sec)
click to toggle source
# File lib/pluggaloid/stream.rb, line 11 def throttle(sec) throttling = 0 @enumerator.select do |item| r0 = Process.clock_gettime(Process::CLOCK_MONOTONIC) if throttling <= r0 throttling = r0 + sec end end end