class Uc::EventStream

Attributes

debug_output[RW]
queue_name[R]

Public Class Methods

new(queue_name) click to toggle source
# File lib/uc/event_stream.rb, line 11
# Builds an event stream bound to a named message queue.
#
# Only the name is remembered here; nothing else is set up by the
# constructor.
#
# @param queue_name the name later handed to ::Uc::Mqueue.new by #mq
def initialize(queue_name)
  @queue_name = queue_name
end

Public Instance Methods

close_connections() click to toggle source
# File lib/uc/event_stream.rb, line 95
# Closes the cached writer, if one was ever opened, and forgets it.
#
# Reads @writer directly instead of going through #writer: that accessor
# memoizes with `@writer ||= mq.nb_writer`, so calling it here would create
# a brand-new writer only to close it straight away.
#
# @return [nil]
def close_connections
  @writer.close if @writer
  @writer = nil
end
debug(msg) click to toggle source
# File lib/uc/event_stream.rb, line 19
# Publishes +msg+ as a :debug event.
#
# @param msg the message forwarded unchanged to #pub
# @return whatever #pub returns
def debug(msg)
  pub(:debug, msg)
end
expect(event_type, timeout: 30, recreate: true) { || ... } click to toggle source
# File lib/uc/event_stream.rb, line 38
# Runs the given block while a background thread waits for +event_type+.
#
# The queue is optionally recreated and then cleared before the waiter
# starts. After the block returns, the waiter is joined and any error it
# recorded under its :error thread-local is raised. Every StandardError
# leaving this method goes through #uc_error first, and the waiter thread
# is always killed on the way out.
#
# @param event_type the event type the waiter should look for
# @param timeout forwarded to #wait_in_background as the regular timeout
#   (the first receive uses a fixed first_timeout of 50)
# @param recreate [Boolean] whether to call mq.recreate before clearing
# @yield the action expected to produce the event
# @return [nil]
def expect(event_type, timeout: 30, recreate: true, &block)
  mq.recreate if recreate
  mq.clear
  waiter = wait_in_background(event_type, timeout, output: true, first_timeout: 50)
  yield
  waiter.join
  failure = waiter[:error]
  raise failure if failure
rescue => error
  raise uc_error(error)
ensure
  # waiter is nil when setup failed before the thread was started.
  waiter.kill if waiter
end
fatal(msg) click to toggle source
# File lib/uc/event_stream.rb, line 27
# Publishes +msg+ as a :fatal event.
#
# @param msg the message forwarded unchanged to #pub
# @return whatever #pub returns
def fatal(msg)
  pub(:fatal, msg)
end
info(msg) click to toggle source
# File lib/uc/event_stream.rb, line 15
# Publishes +msg+ as an :info event.
#
# @param msg the message forwarded unchanged to #pub
# @return whatever #pub returns
def info(msg)
  pub(:info, msg)
end
print(event) click to toggle source
pub(type, msg) click to toggle source
# File lib/uc/event_stream.rb, line 31
# Wraps +type+ and +msg+ in an Event and sends it through the writer.
#
# The event is serialized with the queue's msg_size passed to Event#to_s.
# ENOENT, EAGAIN, EACCES and EMSGSIZE are not propagated: they are printed
# to standard output as "<class> <message>" and the method returns the
# result of that puts call.
#
# @param type the event type handed to Event.new
# @param msg the event message handed to Event.new
def pub(type, msg)
  event = Event.new(type, msg)
  sink = writer
  sink.send(event.to_s(mq.msg_size))
rescue Errno::ENOENT, Errno::EAGAIN, Errno::EACCES, Errno::EMSGSIZE => error
  puts "#{error.class} #{error.message}"
end
wait(event_type, timeout, output: false, first_timeout: nil) click to toggle source
# File lib/uc/event_stream.rb, line 53
# Reads events from the queue until one of the requested type arrives.
#
# Each received message is parsed with Event.parse. When +output+ is set,
# every event (including the matching one) is handed to #print. Once the
# match is found, a "success" line containing its msg is written with puts.
#
# @param event_type the wanted type; compared by its to_s form
# @param timeout passed to the reader's receive for every message after
#   the first
# @param output [Boolean] whether to print each event as it arrives
# @param first_timeout timeout for the first receive only; falls back to
#   +timeout+ when nil
# @return [true]
def wait(event_type, timeout, output: false, first_timeout: nil)
  wanted = event_type.to_s
  buffer = ""
  event = ""
  receive_timeout = first_timeout || timeout
  mq.reader do |reader|
    loop do
      reader.receive(buffer, receive_timeout)
      # Only the very first receive may use the special first_timeout.
      receive_timeout = timeout
      event = Event.parse(buffer)
      print(event) if output
      break if event.type == wanted
    end
  end
  puts "#{"success".green.bold} #{event.msg}"
  true
end
wait_in_background(event_type, timeout, **kwargs) click to toggle source
# File lib/uc/event_stream.rb, line 71
# Starts #wait on a new thread and returns that thread immediately.
#
# A StandardError raised by #wait does not escape the thread: it is stored
# in the thread-local :error slot and the thread's value becomes false.
# Otherwise the thread's value is whatever #wait returned.
#
# @param event_type forwarded to #wait
# @param timeout forwarded to #wait
# @param kwargs keyword options forwarded to #wait
# @return [Thread] the thread running the wait
def wait_in_background(event_type, timeout, **kwargs)
  Thread.new(event_type, timeout, kwargs) do |type, limit, options|
    begin
      wait(type, limit, **options)
    rescue => error
      Thread.current[:error] = error
      false
    end
  end
end
warn(msg) click to toggle source
# File lib/uc/event_stream.rb, line 23
# Publishes +msg+ as a :warn event.
#
# @param msg the message forwarded unchanged to #pub
# @return whatever #pub returns
def warn(msg)
  pub(:warn, msg)
end

Private Instance Methods

mq() click to toggle source
# File lib/uc/event_stream.rb, line 103
# Returns the ::Uc::Mqueue for this stream's queue name, building it on
# first use and reusing the same object afterwards.
def mq
  return @mq if @mq

  @mq = ::Uc::Mqueue.new(@queue_name)
end
uc_error(e) click to toggle source
# File lib/uc/event_stream.rb, line 111
# Translates selected Errno exceptions into a ::Uc::Error with a
# descriptive message.
#
# @param e the exception to translate
# @return [::Uc::Error] for EACCES, ENOENT and ETIMEDOUT; otherwise +e+
#   itself, unchanged
def uc_error(e)
  msg =
    case e
    when Errno::EACCES then "unable to setup message queue"
    when Errno::ENOENT then "message queue deleted"
    when Errno::ETIMEDOUT then "timeout reached while waiting for server ready msg"
    end

  msg ? ::Uc::Error.new(msg) : e
end
writer() click to toggle source
# File lib/uc/event_stream.rb, line 107
# Returns the writer obtained from mq.nb_writer, requesting it on first
# use and reusing the same object afterwards.
def writer
  return @writer if @writer

  @writer = mq.nb_writer
end