class Uc::EventStream
Attributes
debug_output[RW]
queue_name[R]
Public Class Methods
new(queue_name)
click to toggle source
# File lib/uc/event_stream.rb, line 11
# Creates an event stream for the given queue name.
#
# queue_name:: stored unchanged in +@queue_name+; no validation is done here.
def initialize(queue_name)
  @queue_name = queue_name
end
Public Instance Methods
close_connections()
click to toggle source
# File lib/uc/event_stream.rb, line 95
# Closes the cached queue writer, if one was opened, and forgets it.
#
# The instance variable is inspected directly instead of going through the
# lazy +writer+ accessor: that accessor opens a writer on first use, so
# calling it here would open a brand-new writer only to close it again
# (and could raise queue errors during what should be a no-op teardown).
#
# Returns nil.
def close_connections
  @writer.close if @writer
  @writer = nil
end
debug(msg)
click to toggle source
# File lib/uc/event_stream.rb, line 19
# Publishes +msg+ through +pub+ with the +:debug+ event type.
def debug(msg)
  pub(:debug, msg)
end
expect(event_type, timeout: 30, recreate: true) { || ... }
click to toggle source
# File lib/uc/event_stream.rb, line 38
# Runs the caller's block while a background thread waits for +event_type+.
#
# Sequence: recreate the queue (when +recreate+ is truthy), clear it, start
# the background waiter with output enabled and a first receive timeout of
# 50, yield to the caller, join the waiter, then re-raise any error the
# waiter recorded under its +:error+ thread-local.
#
# Any StandardError raised along the way is passed through +uc_error+
# before being raised again. The waiter thread is always killed on exit.
#
# event_type:: event type to wait for
# timeout::    timeout handed to the waiter for receives after the first
# recreate::   whether to call +mq.recreate+ before clearing the queue
def expect(event_type, timeout: 30, recreate: true, &block)
  mq.recreate if recreate
  mq.clear
  waiter = wait_in_background(event_type, timeout, output: true, first_timeout: 50)
  yield
  waiter.join
  failure = waiter[:error]
  raise failure if failure
rescue => e
  raise uc_error(e)
ensure
  # waiter is nil when setup failed before the thread was started.
  waiter.kill if waiter
end
fatal(msg)
click to toggle source
# File lib/uc/event_stream.rb, line 27
# Publishes +msg+ through +pub+ with the +:fatal+ event type.
def fatal(msg)
  pub(:fatal, msg)
end
info(msg)
click to toggle source
# File lib/uc/event_stream.rb, line 15
# Publishes +msg+ through +pub+ with the +:info+ event type.
def info(msg)
  pub(:info, msg)
end
print(event)
click to toggle source
# File lib/uc/event_stream.rb, line 82
# Renders a received event according to its +type+ string.
#
# * "info"  - the message is written as-is
# * "warn"  - the message is written behind a yellow bold "warn" label
# * "debug" - the message is written behind a blue bold "debug" label,
#             but only when +debug_output+ is truthy
# * "fatal" - raises ::Uc::Error carrying the message
#
# Any other type is ignored.
def print(event)
  case event.type
  when "info"
    puts event.msg
  when "warn"
    label = "warn".yellow.bold
    puts "#{label} #{event.msg}"
  when "debug"
    if debug_output
      label = "debug".blue.bold
      puts "#{label} #{event.msg}"
    end
  when "fatal"
    raise ::Uc::Error, event.msg
  end
end
pub(type, msg)
click to toggle source
# File lib/uc/event_stream.rb, line 31
# Wraps +type+ and +msg+ in an Event and sends its string form (built with
# the queue's +msg_size+) through the writer.
#
# Errno::ENOENT, Errno::EAGAIN, Errno::EACCES and Errno::EMSGSIZE are not
# propagated: the exception class and message are printed instead, so a
# failed publish does not abort the caller.
def pub(type, msg)
  event = Event.new(type, msg)
  sink = writer
  payload = event.to_s(mq.msg_size)
  sink.send(payload)
rescue Errno::ENOENT, Errno::EAGAIN, Errno::EACCES, Errno::EMSGSIZE => failure
  puts "#{failure.class} #{failure.message}"
end
wait(event_type, timeout, output: false, first_timeout: nil)
click to toggle source
# File lib/uc/event_stream.rb, line 53
# Blocks until an event whose type equals +event_type+ is read from the
# queue, then prints a green bold "success" line with that event's message.
#
# event_type::    wanted type; compared as a string (+to_s+ is applied)
# timeout::       timeout passed to every receive after the first
# output::        when truthy, each parsed event is passed to +print+
# first_timeout:: timeout for the first receive only; falls back to +timeout+
#
# Returns true. Errors raised by the reader or by +print+ propagate.
def wait(event_type, timeout, output: false, first_timeout: nil)
  wanted = event_type.to_s
  # Buffer handed to receive and parsed afterwards; presumably filled in place.
  buffer = ""
  event = ""
  receive_timeout = first_timeout || timeout
  mq.reader do |reader|
    loop do
      reader.receive(buffer, receive_timeout)
      # Only the very first receive uses first_timeout.
      receive_timeout = timeout
      event = Event.parse(buffer)
      print(event) if output
      break if event.type == wanted
    end
  end
  puts "#{"success".green.bold} #{event.msg}"
  true
end
wait_in_background(event_type, timeout, **kwargs)
click to toggle source
# File lib/uc/event_stream.rb, line 71
# Starts a thread that runs +wait+ with the given arguments and returns
# that thread immediately.
#
# A StandardError raised by +wait+ does not escape the thread: it is stored
# in the thread-local +:error+ slot and the thread's value becomes false.
def wait_in_background(event_type, timeout, **kwargs)
  Thread.new do
    begin
      wait(event_type, timeout, **kwargs)
    rescue => failure
      Thread.current[:error] = failure
      false
    end
  end
end
warn(msg)
click to toggle source
# File lib/uc/event_stream.rb, line 23
# Publishes +msg+ through +pub+ with the +:warn+ event type.
def warn(msg)
  pub(:warn, msg)
end
Private Instance Methods
mq()
click to toggle source
# File lib/uc/event_stream.rb, line 103
# Returns the ::Uc::Mqueue for +@queue_name+, building it on first use and
# reusing the same object afterwards.
def mq
  return @mq if @mq

  @mq = ::Uc::Mqueue.new(@queue_name)
end
uc_error(e)
click to toggle source
# File lib/uc/event_stream.rb, line 111
# Translates selected Errno exceptions into a ::Uc::Error with a fixed
# message:
#
# * Errno::EACCES    - "unable to setup message queue"
# * Errno::ENOENT    - "message queue deleted"
# * Errno::ETIMEDOUT - "timeout reached while waiting for server ready msg"
#
# Any other exception is returned unchanged. Nothing is raised here; the
# caller decides what to do with the returned exception object.
def uc_error(e)
  msg =
    case e
    when Errno::EACCES then "unable to setup message queue"
    when Errno::ENOENT then "message queue deleted"
    when Errno::ETIMEDOUT then "timeout reached while waiting for server ready msg"
    end

  msg ? ::Uc::Error.new(msg) : e
end
writer()
click to toggle source
# File lib/uc/event_stream.rb, line 107
# Returns the cached queue writer, obtaining it from +mq.nb_writer+ on
# first use.
def writer
  return @writer if @writer

  @writer = mq.nb_writer
end