class QRPC::Server

Queue RPC server.

Queue RPC server.

Constants

QRPC_POSTFIX_INPUT

Input queue postfix. @deprecated (since 0.2.0)

QRPC_POSTFIX_OUTPUT

Output queue postfix. @deprecated (since 0.2.0)

QRPC_PREFIX

Prefix for handled queues. @deprecated (since 0.2.0)

Public Class Methods

finalize(id) click to toggle source

Finalizer handler. @param [Integer] id id of finalized instance

# File lib/qrpc/server.rb, line 143
def self.finalize(id)
    if @@servers.has_key? id
        @@servers[id].finalize!
    end
end
new(api, synchronicity = :synchronous, protocol = QRPC::default_protocol) click to toggle source

Constructor.

@param [Object] api some object which will be used as RPC API @param [Symbol] synchronicity API methods synchronicity @param [QRPC::Protocol::Abstract] protocol a protocol handling instance

# File lib/qrpc/server.rb, line 127
def initialize(api, synchronicity = :synchronous, protocol = QRPC::default_protocol)
    @api = api
    @protocol = protocol
    @synchronicity = synchronicity
    @output_name_cache = { }
    
    # Destructor
    ObjectSpace.define_finalizer(self, self.class.method(:finalize).to_proc)
    @@servers[self.object_id] = self
end

Public Instance Methods

finalize!() click to toggle source

Destructor.

# File lib/qrpc/server.rb, line 153
def finalize!
    if not @input_queue.nil?
        @input_queue.subscribe("default") do
            @input_queue.unsubscribe(@input_name.to_s) do
                @input_queue.close!
            end
        end
    end
    
    if not @output_queue.nil?
        @output_queue.use("default") do
            @output_queue.close
        end
    end
end
input_name() click to toggle source

Returns input name.

@return [Symbol] input name @since 0.1.1

# File lib/qrpc/server.rb, line 270
def input_name
    if @input_name.nil?
        @input_name = (QRPC::QUEUE_PREFIX + "-" + @locator.queue_name + "-" + QRPC::QUEUE_POSTFIX_INPUT).to_sym
    end
    
    return @input_name
end
input_queue() { |input_queue| ... } click to toggle source

Returns input queue. @param [Proc] block block to which will be input queue given

# File lib/qrpc/server.rb, line 214
def input_queue(&block)
    if @input_queue.nil?
        @input_queue = @locator.input_queue 
        @input_queue.subscribe(self.input_name.to_s) do
            @input_queue.unsubscribe("default") do
                yield @input_queue 
            end
        end
    else
        @input_queue.subscribe(self.input_name.to_s) do
            yield @input_queue
        end
    end
end
listen!(locator, opts = { }) click to toggle source

Listens to the queue. (Blocking call which starts eventmachine.)

@param [QRPC::Locator] locator of the input queue @param [Hash] opts options for the server

# File lib/qrpc/server.rb, line 178
def listen!(locator, opts = { })
    EM.run do
        self.start_listening(locator, opts)
    end
end
output_name(client) click to toggle source

Returns output name for client name.

@param [String, Symbol] client client identifier @return [Symbol] output name

# File lib/qrpc/server.rb, line 249
def output_name(client)
    client_index = client.to_sym
    
    if not @output_name_cache.include? client_index
       output_name = QRPC::QUEUE_PREFIX + "-" + client.to_s + "-" + QRPC::QUEUE_POSTFIX_OUTPUT
       output_name = output_name.to_sym
       @output_name_cache[client_index] = output_name
    else
        output_name = @output_name_cache[client_index]
    end
       
    return output_name
end
output_queue(&block) click to toggle source

Returns output queue. @param [Proc] block block to which will be output queue given

# File lib/qrpc/server.rb, line 234
def output_queue(&block)
    if @output_queue.nil?
        @output_queue = @locator.output_queue
    else
        @output_queue
    end
end
start_listening(locator, opts = { }) click to toggle source

Starts listening to the queue. (Blocking queue which expect, eventmachine is started.)

@param [QRPC::Locator] locator of the input queue @param [Hash] opts options for the server

# File lib/qrpc/server.rb, line 192
def start_listening(locator, opts = { })
    @locator = locator
    @dispatcher = QRPC::Server::Dispatcher::new

    # Cache cleaning dispatcher
    EM::add_periodic_timer(20) do
        @output_name_cache.clear
    end

    # Process input queue
    self.input_queue do |queue|
        queue.pop(true) do |job|
            self.process_job(job)
        end
    end
end

Protected Instance Methods

process_job(job) click to toggle source

Process one job.

# File lib/qrpc/server.rb, line 286
def process_job(job)
    our_job = QRPC::Server::Job::new(@api, @synchronicity, job, @protocol)
    if not our_job.request.notification?
        our_job.callback do |result|
            output_name = self.output_name(our_job.client)
            output_queue = self.output_queue
            output_queue.use(output_name.to_s) do
                output_queue.push(result, our_job.priority)
            end
        end
    end
    
    @dispatcher.put(our_job)
end