class QRPC::Client::Dispatcher

Queue RPC client dispatcher (worker). @since 0.3.0

Public Class Methods

finalize(id) click to toggle source

Finalizer handler. @param [Integer] id id of finalized instance

# File lib/qrpc/client/dispatcher.rb, line 117
# Finalizer handler.
#
# Looks up the instance registered in +@@clients+ under the given id and
# calls +finalize!+ on it. Does nothing when no such id is registered.
#
# @param [Integer] id  id of the finalized instance
def self.finalize(id)
    return unless @@clients.has_key?(id)

    @@clients[id].finalize!
end
new(locator, generator = QRPC::default_generator, protocol = QRPC::default_protocol) click to toggle source

Constructor.

@param [QRPC::Locator] locator locator of the output queue @param [QRPC::Generator] generator ID generator @param [QRPC::Protocol::Abstract] protocol protocol of the session

# File lib/qrpc/client/dispatcher.rb, line 100
# Constructor.
#
# Stores the given collaborators, starts with an empty job table and
# pooling switched off, then registers the class-level finalizer for this
# instance and records the instance in +@@clients+ under its object id.
#
# NOTE(review): +@@clients+ keeps a reference to the instance itself;
# confirm that this does not keep the instance alive and so prevent the
# finalizer from firing before interpreter exit.
#
# @param [QRPC::Locator] locator  locator of the output queue
# @param [QRPC::Generator] generator  ID generator
# @param [QRPC::Protocol::Abstract] protocol  protocol of the session
def initialize(locator, generator = QRPC::default_generator, protocol = QRPC::default_protocol)
    @locator, @generator, @protocol = locator, generator, protocol
    @jobs = { }
    @pooling = false

    # Destructor
    destructor = self.class.method(:finalize).to_proc
    ObjectSpace.define_finalizer(self, destructor)
    @@clients[self.object_id] = self
end

Public Instance Methods

create_job(name, args, priority = QRPC::DEFAULT_PRIORITY, &block) click to toggle source

Creates job associated to this client session.

@param [Symbol] name name of the method of the job @param [Array] args arguments of the method call @param [Integer] priority job priority @param [Proc] block result returning callback @return [QRPC::Client::Job] new job

# File lib/qrpc/client/dispatcher.rb, line 153
# Creates a job associated with this client session.
#
# The job is built from this client's id, the call description and the
# generator and protocol held by this client; the block is forwarded to
# the job constructor.
#
# @param [Symbol] name  name of the method of the job
# @param [Array] args  arguments of the method call
# @param [Integer] priority  job priority
# @param [Proc] block  result returning callback
# @return [QRPC::Client::Job] new job
def create_job(name, args, priority = QRPC::DEFAULT_PRIORITY, &block)
    job_class = Client::Job
    job_class.new(self.id, name, args, priority, @generator, @protocol, &block)
end
finalize!() click to toggle source

Destructor.

# File lib/qrpc/client/dispatcher.rb, line 127
# Destructor.
#
# If an input queue was opened, subscribes it back to "default",
# unsubscribes it from this client's input name and closes it. If an
# output queue was opened, switches it to "default" and closes it.
# Queues that were never opened are left untouched.
def finalize!
    unless @input_queue.nil?
        @input_queue.subscribe("default") do
            @input_queue.unsubscribe(@input_name.to_s) { @input_queue.close! }
        end
    end

    return if @output_queue.nil?

    @output_queue.use("default") { @output_queue.close! }
end
id() click to toggle source

Returns client (or maybe session is better) ID. @return [Symbol] client (session) ID

# File lib/qrpc/client/dispatcher.rb, line 279
# Returns client (session) ID.
#
# The ID is obtained from the generator on first use and remembered in
# +@id+ for subsequent calls.
#
# @return [Symbol] client (session) ID
def id
    @id = @generator.generate(self) if @id.nil?
    @id
end
input_name() click to toggle source

Returns input name. @return [Symbol] input queue name

# File lib/qrpc/client/dispatcher.rb, line 215
# Returns input name.
#
# The name is composed on first use from the queue prefix, this client's
# id and the *output* postfix constant, joined by dashes, and is then
# remembered in +@input_name+.
#
# @return [Symbol] input queue name
def input_name
    return @input_name unless @input_name.nil?

    composed = QRPC::QUEUE_PREFIX + "-" + self.id.to_s + "-" + QRPC::QUEUE_POSTFIX_OUTPUT
    @input_name = composed.to_sym
end
input_queue() { |input_queue| ... } click to toggle source

Returns input queue. @param [Proc] block block to which the input queue will be given

# File lib/qrpc/client/dispatcher.rb, line 228
# Yields the input queue to the given block.
#
# On first use the queue is fetched from the locator, subscribed to this
# client's input name and unsubscribed from "default" before the block
# runs. On later calls the stored queue is only subscribed to the input
# name again before the block runs.
#
# @param [Proc] block  block to which the input queue will be given
def input_queue(&block)
    first_use = @input_queue.nil?
    @input_queue = @locator.input_queue if first_use

    @input_queue.subscribe(self.input_name.to_s) do
        if first_use
            @input_queue.unsubscribe("default") { yield @input_queue }
        else
            yield @input_queue
        end
    end
end
output_name() click to toggle source

Returns output name. @return [Symbol] output queue name

# File lib/qrpc/client/dispatcher.rb, line 248
# Returns output name.
#
# The name is composed on first use from the queue prefix, the locator's
# queue name and the *input* postfix constant, joined by dashes, and is
# then remembered in +@output_name+.
#
# @return [Symbol] output queue name
def output_name
    return @output_name unless @output_name.nil?

    composed = QRPC::QUEUE_PREFIX + "-" + @locator.queue_name + "-" + QRPC::QUEUE_POSTFIX_INPUT
    @output_name = composed.to_sym
end
output_queue() { |output_queue| ... } click to toggle source

Returns output queue. @param [Proc] block block to which the output queue will be given

# File lib/qrpc/client/dispatcher.rb, line 261
# Yields the output queue to the given block.
#
# The queue is fetched from the locator on first use and stored in
# +@output_queue+. On every call it is switched to this client's output
# name before the block runs.
#
# @param [Proc] block  block to which the output queue will be given
def output_queue(&block)
    @output_queue = @locator.output_queue if @output_queue.nil?

    @output_queue.use(self.output_name.to_s) { yield @output_queue }
end
pool!() click to toggle source

Starts input (results) pooling.

# File lib/qrpc/client/dispatcher.rb, line 182
# Starts input (results) pooling.
#
# Builds a handler that parses each message through the protocol's
# response class and, when the response carries an id matching a pending
# job, hands the response to that job and removes it from the pending
# table. The handler is attached to the input queue via +pop+ and the
# client is then marked as pooling.
def pool!

    # Results processing logic
    on_result = proc do |raw|
        reply = @protocol.response.parse(raw)
        next if reply.id.nil?

        # Non-integer ids are looked up by symbol
        key = reply.id
        key = key.to_sym unless key.is_a?(Integer)
        next unless @jobs.include?(key)

        @jobs[key].assign_result(reply)
        @jobs.delete(key)
    end

    # Runs processor for each job (expects recurring #pop)
    self.input_queue { |source| source.pop(true, &on_result) }

    @pooling = true

end
put(job) click to toggle source

Puts job to client. @param [QRPC::Client::Job] job job to put into the output queue

# File lib/qrpc/client/dispatcher.rb, line 161
# Puts job to client.
#
# Jobs that are not notifications are remembered in the pending table
# under their id (non-integer ids are converted to symbols). Pooling of
# results is started if it is not running yet and there is at least one
# pending job. Finally the serialized job is pushed to the output queue.
#
# @param [QRPC::Client::Job] job  job to put into the output queue
def put(job)
    unless job.notification?
        key = job.id
        key = key.to_sym unless key.is_a?(Integer)

        @jobs[key] = job
    end

    self.pool! if !@pooling && !@jobs.empty?

    self.output_queue { |target| target.push(job.serialize) }
end