class Fyrehose::Reactor
Public Class Methods
run(host, port, opts = {}, &block)
click to toggle source
# File lib/fyrehose/reactor.rb, line 5
#
# Starts an EventMachine loop, connects to host:port using
# Fyrehose::Reactor as the connection handler, and evaluates the given
# block in the context of that connection (instance_eval), so the block
# can call the reactor's instance methods without an explicit receiver.
#
# host  - forwarded to EventMachine.connect
# port  - forwarded to EventMachine.connect
# opts  - accepted for interface compatibility; not read by this method
# block - required; evaluated with the connection object as self
#
# Raises Fyrehose::Error when no block is given.
def self.run(host, port, opts = {}, &block)
  raise Fyrehose::Error, "missing proc{ |channel,data| } for #run" unless block

  EventMachine.run do
    reactor = EventMachine.connect(host, port, Fyrehose::Reactor)
    reactor.instance_eval(&block)
  end
end
Public Instance Methods
deliver(channel, data)
click to toggle source
# File lib/fyrehose/reactor.rb, line 21
#
# Sends a data frame for the given channel over the connection.
#
# The frame written is "#<txid> @<channel> *<length> <payload>\n", where
# txid comes from Fyrehose.next_txid and payload is data.to_s.
#
# The length is the payload's size in bytes, not characters: String#size
# counts characters, which under-reports multibyte (e.g. UTF-8) payloads.
# NOTE(review): assumes the peer reads the length prefix as a byte count;
# confirm against the protocol.
#
# channel - interpolated into the frame as-is
# data    - payload; converted with to_s so the length always describes
#           exactly what is interpolated into the frame
def deliver(channel, data)
  txid = Fyrehose.next_txid
  payload = data.to_s
  send_data("##{txid} @#{channel} *#{payload.bytesize} #{payload}\n")
end
on_message(&block)
click to toggle source
# File lib/fyrehose/reactor.rb, line 39
#
# Registers a callback by appending the given block to @callbacks.
# Returns @callbacks.
#
# A block is required: without this check a nil would be stored and only
# fail later, when something tries to call it.
#
# Raises Fyrehose::Error when no block is given.
def on_message(&block)
  raise Fyrehose::Error, "missing block for #on_message" unless block

  @callbacks << block
end
post_init()
click to toggle source
# File lib/fyrehose/reactor.rb, line 16
#
# Sets up per-connection state: a fresh Fyrehose::InputStream that
# receive_data appends to, and an empty list of message callbacks.
def post_init
  @input_stream = Fyrehose::InputStream.new
  @callbacks    = []
end
receive_data(chunk)
click to toggle source
# File lib/fyrehose/reactor.rb, line 43
#
# Appends the incoming chunk to @input_stream, then walks the messages the
# stream yields. Messages whose :type is not :data are skipped; for every
# other message each registered callback is called with the message's
# :channel and :body, in registration order.
def receive_data(chunk)
  @input_stream << chunk

  @input_stream.each do |message|
    next if message[:type] != :data

    @callbacks.each { |handler| handler.call(message[:channel], message[:body]) }
  end
end
set_flags(channel, flags)
click to toggle source
# File lib/fyrehose/reactor.rb, line 26
#
# Sends a flags frame for the given channel over the connection. The frame
# written is "#<txid> @<channel> +<flags>\n", where txid comes from
# Fyrehose.next_txid.
def set_flags(channel, flags)
  frame = "##{Fyrehose.next_txid} @#{channel} +#{flags}\n"
  send_data(frame)
end
subscribe(channel)
click to toggle source
# File lib/fyrehose/reactor.rb, line 31
#
# Subscribes to the given channel by setting its flags to 1.
# Returns whatever set_flags returns.
def subscribe(channel)
  set_flags(channel, 1)
end
unsubscribe(channel)
click to toggle source
# File lib/fyrehose/reactor.rb, line 35
#
# Unsubscribes from the given channel by setting its flags to 0.
# Returns whatever set_flags returns.
def unsubscribe(channel)
  set_flags(channel, 0)
end