class Droonga::Client::Connection::DroongaProtocol::Thread::Receiver
Constants
- BUFFER_SIZE
Attributes
on_error[W]
Public Class Methods
new(options={})
click to toggle source
# File lib/droonga/client/connection/droonga-protocol/thread.rb, line 184
# Opens the listening socket and sets up the bookkeeping used by #receive.
#
# @param options [Hash]
# @option options [String] :host address to bind; defaults to
#   Socket.gethostname.
# @option options [Integer] :port port to bind; defaults to 0.
def initialize(options={})
  host = options[:host] || Socket.gethostname
  port = options[:port] || 0
  @socket = TCPServer.new(host, port)
  # IOs handed to IO.select; accepted clients are appended later.
  @read_ios = [@socket]
  # Maps each accepted client to the callable that reads from it.
  @client_handlers = {}
  # Read by #on_error; stays nil until the on_error= writer is used.
  @on_error = nil
end
Public Instance Methods
close()
click to toggle source
# File lib/droonga/client/connection/droonga-protocol/thread.rb, line 192
# Closes the listening socket and every client that is still registered
# in @client_handlers.
def close
  @socket.close
  @client_handlers.each_key do |client|
    client.close
  end
end
host()
click to toggle source
# File lib/droonga/client/connection/droonga-protocol/thread.rb, line 199
# Returns element 3 of the listening socket's address array.
# NOTE(review): in Ruby's IPSocket#addr layout that slot is the numeric
# address -- confirm this is what callers expect.
def host
  @socket.addr[3]
end
port()
click to toggle source
# File lib/droonga/client/connection/droonga-protocol/thread.rb, line 203
# Returns element 1 of the listening socket's address array.
# NOTE(review): in Ruby's IPSocket#addr layout that slot is the port
# number -- confirm this is what callers expect.
def port
  @socket.addr[1]
end
receive(options={}) { |object| ... }
click to toggle source
# File lib/droonga/client/connection/droonga-protocol/thread.rb, line 208
# Waits on the registered IOs and yields every object that #on_readable
# produces, until IO.select times out.
#
# @param options [Hash]
# @option options [Numeric, nil] :timeout seconds to wait in total; nil
#   makes each IO.select call wait without a time limit.
# @yield [object] each object produced by #on_readable.
def receive(options={}, &block)
  timeout = options[:timeout]
  catch do |tag|
    loop do
      # A monotonic clock keeps the remaining-timeout arithmetic correct
      # even when the wall clock is adjusted while we are waiting.
      started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      readable_ios, = IO.select(@read_ios, nil, nil, timeout)
      # IO.select returns nil on timeout.
      break if readable_ios.nil?
      if timeout
        # Only the time that is still left is spent on the next wait.
        timeout -= Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at
        timeout = 0 if timeout < 0
      end
      readable_ios.each do |readable_io|
        on_readable(readable_io) do |object|
          begin
            yield(object)
          rescue LocalJumpError
            # NOTE(review): presumably raised when the yielded-to block can
            # no longer jump out (or no block was given); the whole receive
            # loop is abandoned in that case.
            throw(tag)
          end
        end
      end
    end
  end
end
Private Instance Methods
on_error(error)
click to toggle source
# File lib/droonga/client/connection/droonga-protocol/thread.rb, line 265
# Passes +error+ to the registered error callback, if one is set.
#
# @param error [Object] the error to report.
# @return the callback's result, or nil when no callback is set.
def on_error(error)
  return unless @on_error

  @on_error.call(error)
end
on_readable(io) { |droonga_message| ... }
click to toggle source
# File lib/droonga/client/connection/droonga-protocol/thread.rb, line 233
# Handles one IO that was reported as readable.
#
# When +io+ is the listening socket, the pending connection is accepted and
# registered in @read_ios and @client_handlers. For any other +io+, the
# handler stored for it reads one chunk and passes every unpacked message to
# the block given to *this* call.
#
# @param io [IO] the listening socket or a previously accepted client.
# @yield [droonga_message] the third element of each unpacked message.
def on_readable(io, &block)
  case io
  when @socket
    client = @socket.accept
    @read_ios << client
    # One unpacker per connection: it keeps the bytes of an incomplete
    # message between reads, so a message split across chunks is not lost.
    unpacker = MessagePack::Unpacker.new
    # The handler takes the consumer block at call time, so messages reach
    # the block of the current call rather than the one that was active
    # when the connection was accepted.
    @client_handlers[client] = lambda do |&on_message|
      begin
        data = client.read_nonblock(BUFFER_SIZE)
      rescue EOFError
        # The peer closed the connection: stop watching this client.
        client.close
        @read_ios.delete(client)
        @client_handlers.delete(client)
      else
        unpacker.feed_each(data) do |fluent_message|
          unless fluent_message
            on_error(NilMessage.new("thread / unpacker.feed_each"))
          end
          _tag, _time, droonga_message = fluent_message
          unless droonga_message
            on_error(NilMessage.new("thread / unpacker.feed_each",
                                    :fluent_message => fluent_message))
          end
          on_message.call(droonga_message)
        end
      end
    end
  else
    @client_handlers[io].call(&block)
  end
end