class Droonga::Client::Connection::DroongaProtocol::Coolio

Attributes

on_error[W]

Public Class Methods

new(host, port, tag, options={}) click to toggle source
# File lib/droonga/client/connection/droonga-protocol/coolio.rb, line 212
def initialize(host, port, tag, options={})
  @host = host
  @port = port
  @tag = tag
  default_options = {
  }
  @options = default_options.merge(options)
  @loop = options[:loop] || ::Coolio::Loop.default

  @sender = Sender.connect(@host, @port)
  @sender.attach(@loop)
  @receiver_host = @options[:receiver_host] || Socket.gethostname
  @receiver_port = @options[:receiver_port] || 0
  @receiver = Receiver.new(@receiver_host, @receiver_port)
  @receiver.on_error = lambda do |error|
    on_error(ReceiverError.new(error))
  end
  @receiver.attach(@loop)
end

Public Instance Methods

close() click to toggle source
# File lib/droonga/client/connection/droonga-protocol/coolio.rb, line 309
def close
  @sender.close
  @receiver.close
end
on_error(error) click to toggle source
# File lib/droonga/client/connection/droonga-protocol/coolio.rb, line 314
def on_error(error)
  @on_error.call(error) if @on_error
end
request(message, options={}) { |error| ... } click to toggle source
# File lib/droonga/client/connection/droonga-protocol/coolio.rb, line 232
def request(message, options={}, &block)
  message = message.merge("replyTo" => @receiver.droonga_name)
  send(message, options) do |error|
    yield(error)
  end

  sync = block.nil?
  if sync
    response = nil
    block = lambda do |_response|
      response = _response
    end
  end
  id = message["id"]
  @receiver.register(id) do |response|
    @receiver.unregister(id)
    block.call(response)
  end
  request = Request.new(@receiver, id, @loop)
  if sync
    request.wait
    response
  else
    request
  end
end
send(message, options={}) { |error| ... } click to toggle source
# File lib/droonga/client/connection/droonga-protocol/coolio.rb, line 299
def send(message, options={}, &block)
  @sender.send("#{@tag}.message", message) do
    host = @sender.peeraddr[3]
    port = @sender.peeraddr[1]
    detail = message
    error = ConnectionError.new(host, port, detail)
    yield(error) if block_given?
  end
end
subscribe(message, options={}) { |error| ... } click to toggle source
# File lib/droonga/client/connection/droonga-protocol/coolio.rb, line 259
def subscribe(message, options={}, &block)
  message = message.merge("replyTo" => @receiver.droonga_name,
                          "from" => @receiver.droonga_name)
  send(message, options) do |error|
    yield(error)
  end

  id = message["id"]
  request_options = {
    :subscription_timeout => options[:subscription_timeout],
  }
  @receiver.max_messages = options[:max_messages]
  request = InfiniteRequest.new(@loop, request_options)
  request.on_timeout = lambda do
    @receiver.unregister(id)
  end
  sync = block.nil?
  if sync
    yielder = nil
    buffer = []
    @receiver.register(id) do |response|
      if yielder
        while (old_response = buffer.shift)
          yielder << old_response
        end
        yielder << response
      else
        buffer << response
      end
    end
    Enumerator.new do |_yielder|
      yielder = _yielder
      request.wait
    end
  else
    @receiver.register(id, &block)
    request
  end
end