class Droonga::Client::Connection::DroongaProtocol::Thread

Constants

DEFAULT_TIMEOUT_SECONDS

Attributes

on_error [W] (write-only attribute)

Public Class Methods

new(host, port, tag, options={})
# File lib/droonga/client/connection/droonga-protocol/thread.rb, line 49
# Builds a connection that posts messages through a Fluent logger.
#
# host    - stored in @host.
# port    - stored in @port.
# tag     - stored in @tag and passed as the first argument to
#           Fluent::Logger::FluentLogger.new.
# options - Hash layered over the built-in defaults (:timeout => 1). The
#           merged Hash is kept in @options and also handed to the logger.
def initialize(host, port, tag, options={})
  @host, @port, @tag = host, port, tag
  # Caller-supplied entries take precedence over the defaults.
  @options = {:timeout => 1}.merge(options)
  @logger = Fluent::Logger::FluentLogger.new(@tag, @options)
  # Fallback timeout used by #receive when a call does not supply its own.
  @timeout = @options[:timeout]
end

Public Instance Methods

close()
# File lib/droonga/client/connection/droonga-protocol/thread.rb, line 152
# Closes the logger held in @logger and returns whatever the logger's
# #close returns.
def close
  logger = @logger
  logger.close
end
request(message, options={}, &block)
# File lib/droonga/client/connection/droonga-protocol/thread.rb, line 61
# Sends +message+ and gathers the response(s) delivered to a fresh receiver.
#
# message - Hash-like message. It is duplicated before "replyTo" is set, so
#           the caller's object is left untouched.
# options - Hash forwarded to #send and #receive.
# block   - optional. When given, #receive runs in a background thread and
#           passes the block along to it.
#
# Without a block, returns nil when nothing was received, the response
# itself when exactly one was received, or an Array of responses otherwise.
# With a block, returns a Request wrapping the receiving thread.
#
# Any exception raised while preparing or sending the message is re-raised
# after the receiver has been closed.
def request(message, options={}, &block)
  receiver = create_receiver

  # #receive closes the receiver once it runs. If preparing or sending the
  # message fails we never get that far, so close the receiver here instead
  # of leaking it.
  handed_over = false
  begin
    receiver.on_error = lambda do |error|
      on_error(ReceiverError.new(error))
    end
    message = message.dup
    message["replyTo"] = "#{receiver.host}:#{receiver.port}/droonga"
    send(message, options)
    handed_over = true
  ensure
    receiver.close unless handed_over
  end

  if block
    thread = ::Thread.new do
      receive(receiver, options, &block)
    end
    return Request.new(thread)
  end

  responses = []
  receive(receiver, options) do |response|
    responses << response
  end
  responses.size > 1 ? responses : responses.first
end
send(message, options={}, &block)
# File lib/droonga/client/connection/droonga-protocol/thread.rb, line 148
# Posts +message+ through the logger under the "message" label and returns
# the logger's result. +options+ and +block+ are accepted but not used here.
def send(message, options={}, &block)
  logger = @logger
  logger.post("message", message)
end
subscribe(message, options={}, &block)
# File lib/droonga/client/connection/droonga-protocol/thread.rb, line 89
# Sends +message+ and keeps receiving pushed messages until a stop condition
# is met.
#
# message - Hash-like message. It is duplicated before "replyTo" and "from"
#           are set to this subscription's receiver end point.
# options - Hash:
#           :timeout              - passed to each receiver.receive call;
#                                   DEFAULT_TIMEOUT_SECONDS when absent.
#           :max_messages         - stop once at least this many messages
#                                   have been delivered.
#           :subscription_timeout - stop once this many seconds have elapsed
#                                   since the subscription started.
#           Both stop conditions are checked only after a receive call
#           returns. With neither set, the loop never stops by itself.
# block   - optional. When given, it is called with every message from a
#           background thread.
#
# Without a block, returns an Enumerator that yields the messages as it is
# iterated. With a block, returns a Request wrapping the receiving thread.
# In both modes the receiver is closed when the receive loop ends, whether
# normally, by an exception, or because the consumer stopped iterating early.
def subscribe(message, options={}, &block)
  receiver = create_receiver

  # Until the receive loop below takes ownership, nothing else would close
  # the receiver if preparing or sending the message raises.
  handed_over = false
  begin
    receive_end_point = "#{receiver.host}:#{receiver.port}/droonga"
    message = message.dup
    message["replyTo"] = receive_end_point
    message["from"] = receive_end_point
    send(message, options)
    handed_over = true
  ensure
    receiver.close unless handed_over
  end

  subscription_timeout = options[:subscription_timeout]
  max_messages = options[:max_messages]
  start = Time.now
  receive_options = {
    :timeout => options[:timeout] || DEFAULT_TIMEOUT_SECONDS,
  }
  n_messages = 0

  # Shared stop condition, evaluated after every receive call: message limit
  # first, then elapsed time.
  finished = lambda do
    (max_messages && n_messages >= max_messages) ||
      (subscription_timeout && Time.now - start >= subscription_timeout)
  end

  if block
    thread = ::Thread.new do
      begin
        loop do
          receiver.receive(receive_options) do |received|
            block.call(received)
            n_messages += 1
          end
          break if finished.call
        end
      ensure
        receiver.close
      end
    end
    Request.new(thread)
  else
    Enumerator.new do |yielder|
      begin
        loop do
          receiver.receive(receive_options) do |received|
            yielder << received
            n_messages += 1
          end
          break if finished.call
        end
      ensure
        # Runs on errors and on early termination by the consumer as well.
        receiver.close
      end
    end
  end
end

Private Instance Methods

create_receiver()
# File lib/droonga/client/connection/droonga-protocol/thread.rb, line 157
# Instantiates a Receiver configured from the :receiver_host and
# :receiver_port entries of @options (either may be nil when not supplied).
def create_receiver
  receiver_host = @options[:receiver_host]
  receiver_port = @options[:receiver_port]
  Receiver.new(:host => receiver_host, :port => receiver_port)
end
on_error(error)
# File lib/droonga/client/connection/droonga-protocol/thread.rb, line 177
# Forwards +error+ to the handler stored in @on_error, if one has been set.
# Returns the handler's result, or nil when there is no handler.
def on_error(error)
  return unless @on_error

  @on_error.call(error)
end
receive(receiver, options) { |response| ... }
# File lib/droonga/client/connection/droonga-protocol/thread.rb, line 162
# Runs a single receive call on +receiver+, yielding every response to the
# caller's block, and always closes the receiver afterwards.
#
# receiver - object responding to #receive(options) and #close.
# options  - Hash; :timeout overrides the connection-wide @timeout.
#
# Returns whatever receiver.receive returns.
def receive(receiver, options)
  # Per-call timeout wins; otherwise fall back to the connection default.
  receive_options = {:timeout => options[:timeout] || @timeout}

  begin
    receiver.receive(receive_options) { |response| yield(response) }
  ensure
    receiver.close
  end
end