class Droonga::Client::Connection::DroongaProtocol::Thread
Constants
- DEFAULT_TIMEOUT_SECONDS
Attributes
on_error[W]
Public Class Methods
new(host, port, tag, options={})
click to toggle source
# File lib/droonga/client/connection/droonga-protocol/thread.rb, line 49 def initialize(host, port, tag, options={}) @host = host @port = port @tag = tag default_options = { :timeout => 1, } @options = default_options.merge(options) @logger = Fluent::Logger::FluentLogger.new(@tag, @options) @timeout = @options[:timeout] end
Public Instance Methods
close()
click to toggle source
# File lib/droonga/client/connection/droonga-protocol/thread.rb, line 152 def close @logger.close end
request(message, options={}, &block)
click to toggle source
# File lib/droonga/client/connection/droonga-protocol/thread.rb, line 61 def request(message, options={}, &block) receiver = create_receiver receiver.on_error = lambda do |error| on_error(ReceiverError.new(error)) end message = message.dup message["replyTo"] = "#{receiver.host}:#{receiver.port}/droonga" send(message, options) sync = block.nil? if sync responses = [] receive(receiver, options) do |response| responses << response end if responses.size > 1 responses else responses.first end else thread = ::Thread.new do receive(receiver, options, &block) end Request.new(thread) end end
send(message, options={}, &block)
click to toggle source
# File lib/droonga/client/connection/droonga-protocol/thread.rb, line 148 def send(message, options={}, &block) @logger.post("message", message) end
subscribe(message, options={}, &block)
click to toggle source
# File lib/droonga/client/connection/droonga-protocol/thread.rb, line 89 def subscribe(message, options={}, &block) receiver = create_receiver receive_end_point = "#{receiver.host}:#{receiver.port}/droonga" message = message.dup message["replyTo"] = receive_end_point message["from"] = receive_end_point send(message, options) subscription_timeout = options[:subscription_timeout] max_messages = options[:max_messages] start = Time.now receive_options = { :timeout => options[:timeout] || DEFAULT_TIMEOUT_SECONDS, } n_messages = 0 sync = block.nil? if sync Enumerator.new do |yielder| loop do receiver.receive(receive_options) do |object| yielder << object n_messages += 1 end if max_messages and n_messages >= max_messages break end if subscription_timeout elapsed_seconds = Time.now - start break if elapsed_seconds >= subscription_timeout end end receiver.close end else thread = ::Thread.new do begin loop do receiver.receive(receive_options) do |message| block.call(message) n_messages += 1 end if max_messages and n_messages >= max_messages break end if subscription_timeout elapsed_seconds = Time.now - start break if elapsed_seconds >= subscription_timeout end end ensure receiver.close end end Request.new(thread) end end
Private Instance Methods
create_receiver()
click to toggle source
# File lib/droonga/client/connection/droonga-protocol/thread.rb, line 157 def create_receiver Receiver.new(:host => @options[:receiver_host], :port => @options[:receiver_port]) end
on_error(error)
click to toggle source
# File lib/droonga/client/connection/droonga-protocol/thread.rb, line 177 def on_error(error) @on_error.call(error) if @on_error end
receive(receiver, options) { |response| ... }
click to toggle source
# File lib/droonga/client/connection/droonga-protocol/thread.rb, line 162 def receive(receiver, options) timeout = options[:timeout] || @timeout receive_options = { :timeout => timeout, } begin receiver.receive(receive_options) do |response| yield(response) end ensure receiver.close end end