class Pluggaloid::Stream

Public Class Methods

new(enumerator) click to toggle source
# File lib/pluggaloid/stream.rb, line 7
def initialize(enumerator)
  @enumerator = enumerator
end

Public Instance Methods

buffer(sec) click to toggle source
# File lib/pluggaloid/stream.rb, line 35
def buffer(sec)
  throttling_promise = nil
  buffer = []
  Stream.new(
    Enumerator.new do |yielder|
      @enumerator.each do |item|
        buffer << item
        throttling_promise ||= Delayer.new(delay: sec) do
          yielder << buffer.freeze
          buffer = []
          throttling_promise = nil
        end
      end
    end.lazy
  )
end
debounce(sec) click to toggle source
# File lib/pluggaloid/stream.rb, line 21
def debounce(sec)
  throttling_promise = nil
  Stream.new(
    Enumerator.new do |yielder|
      @enumerator.each do |item|
        throttling_promise&.cancel
        throttling_promise = Delayer.new(delay: sec) do
          yielder << item
        end
      end
    end.lazy
  )
end
merge(*streams) click to toggle source
# File lib/pluggaloid/stream.rb, line 52
def merge(*streams)
  Stream.new(Merge.new(self, *streams).lazy)
end
throttle(sec) click to toggle source
# File lib/pluggaloid/stream.rb, line 11
def throttle(sec)
  throttling = 0
  @enumerator.select do |item|
    r0 = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    if throttling <= r0
      throttling = r0 + sec
    end
  end
end