class OskieRpc::Processor

Public Class Methods

new(&block) click to toggle source
# File lib/oskie_rpc/processor.rb, line 3
def initialize(&block)
  @lock = Monitor.new
  @input_chain = create_input_chain
  @output_chain = create_output_chain
  @callbacks = {}
  @requests = SortedSet.new
  @state = :initializing
  block.call(self) if block
  raise MissingCallbackError, :message unless @callbacks[:message]
  raise MissingCallbackError, :request unless @callbacks[:request]
  raise MissingCallbackError, :output unless @callbacks[:output]
  @state = :initialized
end

Public Instance Methods

<<(bytes) click to toggle source
# File lib/oskie_rpc/processor.rb, line 17
def <<(bytes)
  @lock.synchronize do
    @input_chain << bytes
  end

  nil
end
deliver(delivery) click to toggle source
# File lib/oskie_rpc/processor.rb, line 34
def deliver(delivery)
  @lock.synchronize do
    raise InvalidStateError unless @state == :initialized

    if delivery.is_a?(Request)
      delivery.__request_sent
      @requests << delivery
    end

    @output_chain << delivery.dump
  end

  nil
end
heartbeat() click to toggle source
# File lib/oskie_rpc/processor.rb, line 49
def heartbeat
  @lock.synchronize do
    @requests.each do |request|
      if request.timed_out?
        @requests.delete(request)
        request.__response_failed
      else
        return
      end
    end
  end

  nil
end
on(name, &block) click to toggle source
# File lib/oskie_rpc/processor.rb, line 25
def on(name, &block)
  @lock.synchronize do
    raise InvalidStateError unless @state == :initializing
    @callbacks[name.to_sym] = block
  end

  nil
end

Private Instance Methods

create_input_chain() click to toggle source
# File lib/oskie_rpc/processor.rb, line 66
def create_input_chain
  FilterChain::Chain.new do |chain|
    chain.add(FilterChain::DemultiplexFilter.new)
    chain.add(FilterChain::DeserializeFilter.new(:format => :json))
    chain.add(FilterChain::ProcFilter.new { |payload| payload_handler(payload) })
    chain.add(FilterChain::Terminator.new { |delivery| delivery_handler(delivery) })
  end
end
create_output_chain() click to toggle source
# File lib/oskie_rpc/processor.rb, line 75
def create_output_chain
  FilterChain::Chain.new do |chain|
    chain.add(FilterChain::SerializeFilter.new(:format => :json))
    chain.add(FilterChain::MultiplexFilter.new)
    chain.add(FilterChain::Terminator.new { |bytes| output_handler(bytes) })
  end
end
delivery_handler(delivery) click to toggle source
# File lib/oskie_rpc/processor.rb, line 95
def delivery_handler(delivery)
  case delivery
  when Message then message_handler(delivery)
  when Request then request_handler(delivery)
  when Response then response_handler(delivery)
  else
    raise UnknownDeliveryClassError, message.class.name
  end
end
message_handler(message) click to toggle source
# File lib/oskie_rpc/processor.rb, line 105
def message_handler(message)
  @callbacks[:message].call(message)
end
output_handler(bytes) click to toggle source
# File lib/oskie_rpc/processor.rb, line 122
def output_handler(bytes)
  @callbacks[:output].call(bytes)
end
payload_handler(payload) click to toggle source
# File lib/oskie_rpc/processor.rb, line 83
def payload_handler(payload)
  message = case payload['type']
  when 'rpcMessage' then Message.new
  when 'rpcRequest' then Request.new
  when 'rpcResponse' then Response.new
  else
    raise UnknownPayloadTypeError, payload['type']
  end

  message.load(payload)
end
request_handler(request) click to toggle source
# File lib/oskie_rpc/processor.rb, line 109
def request_handler(request)
  request.__request_received(self)
  @callbacks[:request].call(request)
end
response_handler(response) click to toggle source
# File lib/oskie_rpc/processor.rb, line 114
def response_handler(response)
  request = @requests.find { |request| request.message_id == response.message_id }
  return unless request
  @requests.delete(request) 
  request.__response_received(response)
end