class Timmy::CommandStreamer

Public Class Methods

new(command) click to toggle source
# File lib/timmy/command_streamer.rb, line 12
# Prepares a streamer for one command.
#
# @param command [Object] kept as given; the instance-level #stream splats it
#   into Open3.popen3, so an Array of program + arguments is the expected shape
def initialize(command)
  # Hand-off point between the reader threads and the consuming loop.
  @queue = Queue.new
  @command = command
end
stream(command, &block) click to toggle source
# File lib/timmy/command_streamer.rb, line 7
# Convenience entry point: builds a streamer for +command+ and runs it
# right away, handing the given block on to the instance-level #stream.
#
# @param command [Object] passed unchanged to the constructor
# @return whatever the instance-level #stream returns
def stream(command, &block)
  streamer = new(command)
  streamer.stream(&block)
end

Public Instance Methods

stream(&block) click to toggle source
# File lib/timmy/command_streamer.rb, line 17
# Runs the command and hands every line it prints to the block.
#
# The block is called as +block.call(type, line)+ where +type+ is +:stdout+
# or +:stderr+. The call returns once both output streams have reached EOF
# and the reader threads have been joined.
#
# @yield [type, line] one call per line read from the child process
# @return [Array<Thread>] the joined reader threads (result of join_readers)
def stream(&block)
  Open3.popen3(*@command) do |stdin, stdout, stderr, _wait_thr|
    # Nothing is ever written to the child, so close its stdin up front.
    # Otherwise a command that reads stdin waits forever for input, never
    # closes its output streams, and the readers below never see EOF.
    stdin.close

    start_readers(stdout: stdout, stderr: stderr)
    pop_lines_from_active_readers(block)
    join_readers
  end
end

Private Instance Methods

join_readers() click to toggle source
# File lib/timmy/command_streamer.rb, line 54
# Blocks until every reader thread stored in @readers has finished.
#
# @return [Array<Thread>] the threads that were joined
def join_readers
  threads = @readers.values
  threads.each { |thread| thread.join }
end
pop_lines_from_active_readers(delegate) click to toggle source
# File lib/timmy/command_streamer.rb, line 42
# Drains @queue until every active reader has signalled completion.
#
# Each queue entry is a [type, line] pair. A truthy line is forwarded to
# +delegate+; a nil line is the end-of-stream marker for that type, which
# is then removed from @active_readers.
#
# @param delegate [#call] invoked as delegate.call(type, line)
# @return [nil]
def pop_lines_from_active_readers(delegate)
  until @active_readers.none?
    source, text = @queue.pop

    unless text
      # End-of-stream marker: this reader has nothing more to deliver.
      @active_readers.delete(source)
      next
    end

    delegate.call(source, text)
  end
end
start_readers(streams) click to toggle source
# File lib/timmy/command_streamer.rb, line 27
# Spawns one reader thread per stream.
#
# Every thread pushes [type, line] onto @queue for each line it reads and
# finishes with [type, nil] once the stream returns nil from #gets. The
# threads are kept in @readers (type => Thread) and the types are recorded
# in @active_readers.
#
# @param streams [Hash] maps a stream type to an object responding to #gets
# @return [Array] the stream types, as stored in @active_readers
def start_readers(streams)
  @readers = streams.each_with_object({}) do |(type, stream), threads|
    threads[type] = Thread.new do
      while (text = stream.gets)
        @queue << [type, text]
      end

      # End-of-stream marker for this type.
      @queue << [type, nil]
    end
  end

  @active_readers = @readers.keys
end