class Droonga::Client::Connection::DroongaProtocol::Thread::Receiver

Constants

BUFFER_SIZE

Attributes

on_error[W]

Public Class Methods

new(options={}) click to toggle source
# File lib/droonga/client/connection/droonga-protocol/thread.rb, line 184
def initialize(options={})
  host = options[:host] || Socket.gethostname
  port = options[:port] || 0
  @socket = TCPServer.new(host, port)
  @read_ios = [@socket]
  @client_handlers = {}
end

Public Instance Methods

close() click to toggle source
# File lib/droonga/client/connection/droonga-protocol/thread.rb, line 192
def close
  @socket.close
  @client_handlers.each_key do |client|
    client.close
  end
end
host() click to toggle source
# File lib/droonga/client/connection/droonga-protocol/thread.rb, line 199
def host
  @socket.addr[3]
end
port() click to toggle source
# File lib/droonga/client/connection/droonga-protocol/thread.rb, line 203
def port
  @socket.addr[1]
end
receive(options={}) { |object| ... } click to toggle source
# File lib/droonga/client/connection/droonga-protocol/thread.rb, line 208
def receive(options={}, &block)
  timeout = options[:timeout]
  catch do |tag|
    loop do
      start = Time.now
      readable_ios, = IO.select(@read_ios, nil, nil, timeout)
      break if readable_ios.nil?
      if timeout
        timeout -= (Time.now - start)
        timeout = 0 if timeout < 0
      end
      readable_ios.each do |readable_io|
        on_readable(readable_io) do |object|
          begin
            yield(object)
          rescue LocalJumpError
            throw(tag)
          end
        end
      end
    end
  end
end

Private Instance Methods

on_error(error) click to toggle source
# File lib/droonga/client/connection/droonga-protocol/thread.rb, line 265
def on_error(error)
  @on_error.call(error) if @on_error
end
on_readable(io) { |droonga_message| ... } click to toggle source
# File lib/droonga/client/connection/droonga-protocol/thread.rb, line 233
def on_readable(io)
  case io
  when @socket
    client = @socket.accept
    @read_ios << client
    @client_handlers[client] = lambda do
      unpacker = MessagePack::Unpacker.new
      begin
        data = client.read_nonblock(BUFFER_SIZE)
      rescue EOFError
        client.close
        @read_ios.delete(client)
        @client_handlers.delete(client)
      else
        unpacker.feed_each(data) do |fluent_message|
          unless fluent_message
            on_error(NilMessage.new("thread / unpacker.feed_each"))
          end
          tag, time, droonga_message = fluent_message
          unless droonga_message
            on_error(NilMessage.new("thread / unpacker.feed_each",
                                    :fluent_message => fluent_message))
          end
          yield(droonga_message)
        end
      end
    end
  else
    @client_handlers[io].call
  end
end