class QRPC::Server
Queue RPC server.
Queue RPC server.
Constants
- QRPC_POSTFIX_INPUT
Input queue postfix. @deprecated (since 0.2.0)
- QRPC_POSTFIX_OUTPUT
Output queue postfix. @deprecated (since 0.2.0)
- QRPC_PREFIX
Prefix for handled queues. @deprecated (since 0.2.0)
Public Class Methods
Finalizer handler. @param [Integer] id id of finalized instance
# File lib/qrpc/server.rb, line 143
#
# Finalizer handler.
#
# Looks the instance up in the class-level server registry by its id,
# removes it from the registry and runs its destructor. Ids which are
# not registered are ignored.
#
# @param [Integer] id  id of finalized instance
def self.finalize(id)
  # Delete the entry instead of only reading it: otherwise the registry
  # keeps referencing an already finalized server and a repeated call
  # would run the destructor again.
  server = @@servers.delete(id)
  server.finalize! if server
end
Constructor.
@param [Object] api some object which will be used as RPC API @param [Symbol] synchronicity API methods synchronicity @param [QRPC::Protocol::Abstract] protocol a protocol handling instance
# File lib/qrpc/server.rb, line 127
#
# Constructor.
#
# Stores the API object, the synchronicity mode and the protocol,
# prepares an empty output name cache, attaches the class-level
# finalizer to the new instance and registers the instance in the
# class-level server registry under its object id.
#
# @param [Object] api  some object which will be used as RPC API
# @param [Symbol] synchronicity  API methods synchronicity
# @param [QRPC::Protocol::Abstract] protocol  a protocol handling instance
def initialize(api, synchronicity = :synchronous, protocol = QRPC::default_protocol)
  @output_name_cache = {}
  @synchronicity = synchronicity
  @protocol = protocol
  @api = api

  # Destructor: the class-level +finalize+ handler is used as finalizer.
  finalizer = self.class.method(:finalize).to_proc
  ObjectSpace.define_finalizer(self, finalizer)

  # NOTE(review): the registry stores the instance itself, which
  # presumably keeps it reachable for as long as the entry exists --
  # confirm this is the intended lifetime.
  @@servers[object_id] = self
end
Public Instance Methods
Destructor.
# File lib/qrpc/server.rb, line 153
#
# Destructor.
#
# If an input queue was opened, subscribes it to "default", unsubscribes
# it from the server's input name and closes it. If an output queue was
# opened, switches it to "default" and closes it. Queues which were never
# opened are left untouched.
def finalize!
  unless @input_queue.nil?
    @input_queue.subscribe("default") do
      @input_queue.unsubscribe(@input_name.to_s) { @input_queue.close! }
    end
  end

  # NOTE(review): the input queue is closed with +close!+ while the
  # output queue is closed with +close+ -- confirm the asymmetry is
  # intended by the queue API.
  unless @output_queue.nil?
    @output_queue.use("default") { @output_queue.close }
  end
end
Returns input name.
@return [Symbol] input name @since 0.1.1
# File lib/qrpc/server.rb, line 270
#
# Returns input name.
#
# The name is built on first use from the queue prefix, the locator's
# queue name and the input postfix, joined by dashes, and is memoized
# afterwards.
#
# @return [Symbol] input name
# @since 0.1.1
def input_name
  @input_name ||= (QRPC::QUEUE_PREFIX + "-" + @locator.queue_name + "-" + QRPC::QUEUE_POSTFIX_INPUT).to_sym
end
Returns input queue. @param [Proc] block block to which the input queue will be given
# File lib/qrpc/server.rb, line 214
#
# Returns input queue.
#
# On first use the queue is obtained from the locator, subscribed to the
# server's input name and unsubscribed from "default" before it is handed
# to the block. On later calls it is only subscribed to the input name
# again and handed to the block.
#
# @param [Proc] block  block to which the input queue will be given
def input_queue(&block)
  first_use = @input_queue.nil?
  @input_queue = @locator.input_queue if first_use

  @input_queue.subscribe(input_name.to_s) do
    if first_use
      # A freshly obtained queue is additionally detached from "default".
      @input_queue.unsubscribe("default") { yield @input_queue }
    else
      yield @input_queue
    end
  end
end
Listens to the queue. (Blocking call which starts eventmachine.)
@param [QRPC::Locator] locator of the input queue @param [Hash] opts options for the server
# File lib/qrpc/server.rb, line 178
#
# Listens to the queue by running +start_listening+ inside +EM.run+.
#
# @param [QRPC::Locator] locator  of the input queue
# @param [Hash] opts  options for the server, passed through unchanged
def listen!(locator, opts = { })
  EM.run { start_listening(locator, opts) }
end
Returns output name for client name.
@param [String, Symbol] client client identifier @return [Symbol] output name
# File lib/qrpc/server.rb, line 249
#
# Returns output name for client name.
#
# The name is built from the queue prefix, the client identifier and the
# output postfix, joined by dashes. Results are kept in the output name
# cache, keyed by the symbolized client identifier.
#
# @param [String, Symbol] client  client identifier
# @return [Symbol] output name
def output_name(client)
  cache_key = client.to_sym

  # Build and remember the name only when the client is not cached yet.
  @output_name_cache[cache_key] ||=
    (QRPC::QUEUE_PREFIX + "-" + client.to_s + "-" + QRPC::QUEUE_POSTFIX_OUTPUT).to_sym
end
Returns output queue. @param [Proc] block block to which the output queue will be given
# File lib/qrpc/server.rb, line 234
#
# Returns output queue.
#
# The queue is obtained from the locator on first use and reused on
# later calls. When a block is given, the queue is also passed to it;
# the return value is the queue either way.
#
# @param [Proc] block  optional block to which the output queue will be given
# @return [Object] the output queue provided by the locator
def output_queue(&block)
  @output_queue = @locator.output_queue if @output_queue.nil?

  # The block is documented as receiving the queue, so hand it over
  # instead of silently ignoring it.
  yield @output_queue if block_given?

  @output_queue
end
Starts listening to the queue. (Blocking call which expects that eventmachine is already started.)
@param [QRPC::Locator] locator of the input queue @param [Hash] opts options for the server
# File lib/qrpc/server.rb, line 192
#
# Starts listening to the queue.
#
# Remembers the locator, creates a fresh job dispatcher, installs a
# periodic timer which empties the output name cache and then starts
# popping jobs from the input queue, handing each one to +process_job+.
#
# @param [QRPC::Locator] locator  of the input queue
# @param [Hash] opts  options for the server (not read by this method)
def start_listening(locator, opts = { })
  @locator = locator
  @dispatcher = QRPC::Server::Dispatcher.new

  # Cache cleaning timer; the interval 20 is passed to EventMachine as is.
  EM.add_periodic_timer(20) { @output_name_cache.clear }

  # Process input queue.
  # NOTE(review): the meaning of the +true+ argument of +pop+ is defined
  # by the queue implementation -- presumably continuous popping; confirm.
  input_queue do |queue|
    queue.pop(true) { |job| process_job(job) }
  end
end
Protected Instance Methods
Process one job.
# File lib/qrpc/server.rb, line 286
#
# Process one job.
#
# Wraps the raw job into a server job and hands it to the dispatcher.
# Unless the request is a notification, a callback is registered first
# which pushes the result to the client's output queue with the job's
# priority.
#
# @param [Object] job  raw job taken from the input queue
def process_job(job)
  wrapped = QRPC::Server::Job.new(@api, @synchronicity, job, @protocol)

  # No callback is registered for notifications, so nothing is pushed
  # to the output queue for them.
  unless wrapped.request.notification?
    wrapped.callback do |result|
      target = output_name(wrapped.client).to_s
      queue = output_queue
      queue.use(target) { queue.push(result, wrapped.priority) }
    end
  end

  @dispatcher.put(wrapped)
end