class Fyrehose::Reactor

Public Class Methods

run(host, port, opts = {}, &block) click to toggle source
# File lib/fyrehose/reactor.rb, line 5
def self.run(host, port, opts = {}, &block)
  unless block
    raise Fyrehose::Error.new("missing proc{ |channel,data| } for #run")
  end

  EventMachine.run do
    reactor = EventMachine.connect(host, port, Fyrehose::Reactor)
    reactor.instance_eval(&block)
  end
end

Public Instance Methods

deliver(channel, data) click to toggle source
# File lib/fyrehose/reactor.rb, line 21
def deliver(channel, data)
  txid = Fyrehose.next_txid
  send_data("##{txid} @#{channel} *#{data.size} #{data}\n")
end
on_message(&block) click to toggle source
# File lib/fyrehose/reactor.rb, line 39
def on_message(&block)
  @callbacks << block
end
post_init() click to toggle source
# File lib/fyrehose/reactor.rb, line 16
def post_init
  @input_stream = Fyrehose::InputStream.new
  @callbacks = []
end
receive_data(chunk) click to toggle source
# File lib/fyrehose/reactor.rb, line 43
def receive_data(chunk)
  @input_stream << chunk

  @input_stream.each do |msg|
    next unless msg[:type] == :data

    @callbacks.each do |block|
      block.call(msg[:channel], msg[:body])
    end
  end
end
set_flags(channel, flags) click to toggle source
# File lib/fyrehose/reactor.rb, line 26
def set_flags(channel, flags)
  txid = Fyrehose.next_txid
  send_data("##{txid} @#{channel} +#{flags}\n")
end
subscribe(channel) click to toggle source
# File lib/fyrehose/reactor.rb, line 31
def subscribe(channel)
  set_flags(channel, 1)
end
unsubscribe(channel) click to toggle source
# File lib/fyrehose/reactor.rb, line 35
def unsubscribe(channel)
  set_flags(channel, 0)
end