class Droonga::Client::Connection::DroongaProtocol::Coolio::Receiver

Attributes

max_messages[RW]
on_error[W]

Public Class Methods

new(*args) click to toggle source
Calls superclass method
# File lib/droonga/client/connection/droonga-protocol/coolio.rb, line 119
def initialize(*args)
  super(*args) do |engine|
    @engines << engine
    handle_engine(engine)
  end
  @requests = {}
  @engines = []
  @max_messages = nil
end

Public Instance Methods

close() click to toggle source
Calls superclass method
# File lib/droonga/client/connection/droonga-protocol/coolio.rb, line 129
def close
  super
  engines = @engines.dup
  engines.each do |engine|
    engine.close
  end
end
droonga_name() click to toggle source
# File lib/droonga/client/connection/droonga-protocol/coolio.rb, line 145
def droonga_name
  "#{host}:#{port}/droonga"
end
host() click to toggle source
# File lib/droonga/client/connection/droonga-protocol/coolio.rb, line 137
def host
  @listen_socket.addr[3]
end
port() click to toggle source
# File lib/droonga/client/connection/droonga-protocol/coolio.rb, line 141
def port
  @listen_socket.addr[1]
end
received?(id) click to toggle source
# File lib/droonga/client/connection/droonga-protocol/coolio.rb, line 160
def received?(id)
  if @requests.key?(id)
    @requests[id][:received]
  else
    true
  end
end
register(id, &callback) click to toggle source
# File lib/droonga/client/connection/droonga-protocol/coolio.rb, line 149
def register(id, &callback)
  @requests[id] = {
    :received => false,
    :callback => callback,
  }
end
unregister(id) click to toggle source
# File lib/droonga/client/connection/droonga-protocol/coolio.rb, line 156
def unregister(id)
  @requests.delete(id)
end

Private Instance Methods

handle_engine(engine) click to toggle source
# File lib/droonga/client/connection/droonga-protocol/coolio.rb, line 169
def handle_engine(engine)
  unpacker = MessagePack::Unpacker.new
  n_messages = 0
  on_read = lambda do |data|
    unpacker.feed_each(data) do |fluent_message|
      unless fluent_message
        on_error(NilMessage.new("coolio / unpacker.feed_each"))
      end
      tag, time, droonga_message = fluent_message
      unless droonga_message
        on_error(NilMessage.new("coolio / unpacker.feed_each",
                                :fluent_message => fluent_message.inspect))
      end
      id = droonga_message["inReplyTo"]
      request = @requests[id]
      n_messages += 1
      if request
        request[:received] = true
        request[:callback].call(droonga_message)
      end
      if @max_messages and
           n_messages >= @max_messages
        unregister(id)
      end
    end
  end
  engine.on_read do |data|
    on_read.call(data)
  end

  on_close = lambda do
    @engines.delete(engine)
  end
  engine.on_close do
    on_close.call
  end
end
on_error(error) click to toggle source
# File lib/droonga/client/connection/droonga-protocol/coolio.rb, line 207
def on_error(error)
  @on_error.call(error) if @on_error
end