class Elrpc::RPCService

Attributes

logger[RW]
socket_state[R]

Public Class Methods

new(name, socket, methods = nil) click to toggle source
# File lib/elrpc.rb, line 291
def initialize(name, socket, methods = nil)
      @logger = Logger.new(STDOUT)
      @logger.level = Elrpc::default_log_level
  @logger.datetime_format = Elrpc.get_logger_format(name)

  @methods = Hash.new # name -> Method
  @session = Hash.new # uid -> proc
  @session_lock = Monitor.new

      @sending_queue = Queue.new # CallMessage

  @socket = socket
  @socket_state_lock = Monitor.new
      @socket_state = :socket_opened

  @wait_lock = nil
  @wait_cv = nil
  @close_hooks = []

  if methods then
    methods.each do |m|
      register_method(m)
    end
  end

  @sender_thread = Thread.start { sender_loop }
  @receiver_thread = Thread.start { receiver_loop }
  @worker_pool = WorkerPool.new(1, @logger)

  @logger.debug ":ready for I/O stream."
end

Public Instance Methods

_raise_connection_error(block) click to toggle source
# File lib/elrpc.rb, line 349
def _raise_connection_error(block)
  job = lambda do
    block.call(EPCStackError.new("ConnectionClosed","Connection closed",""))
  end
  @worker_pool.invoke(job)
end
add_close_hook(&block) click to toggle source
# File lib/elrpc.rb, line 438
def add_close_hook(&block)
  @close_hooks << block
end
alive?() click to toggle source
# File lib/elrpc.rb, line 323
def alive?
  return @socket_state == :socket_opened
end
call_method(name, *args) click to toggle source

相手のメソッドを呼ぶ(同期版)

# File lib/elrpc.rb, line 357
def call_method(name, *args)
  mutex = Mutex.new
  cv = ConditionVariable.new
  ret = nil
  ex = nil
  call_method_async(name, *args) do |err, value|
    mutex.synchronize do
      ex = err
      ret = value
      cv.signal
    end
  end
  mutex.synchronize do
    cv.wait(mutex)
  end
  if !ex.nil?
    raise ex
  end
  return ret
end
call_method_async(name, *args, &block) click to toggle source

相手のメソッドを呼ぶ block(err, value)

# File lib/elrpc.rb, line 339
def call_method_async(name, *args, &block)
  return _raise_connection_error(block) if @socket_state != :socket_opened
  uid = Elrpc.gen_uid
  msg = CallMessage.new(uid, name, args, block)
  # ここは競合しないのでロックしない
  @session[uid] = msg
  @sending_queue.push(msg)
  uid
end
def_method(name, argdoc=nil, docstring=nil, &block) click to toggle source

register_method の簡易版

# File lib/elrpc.rb, line 333
def def_method(name, argdoc=nil, docstring=nil, &block)
  register_method(Method.new(name, argdoc, docstring, &block))
end
query_methods() click to toggle source

接続相手のメソッド一覧を返す(同期版)

[name, argdoc, docstring], …
# File lib/elrpc.rb, line 391
def query_methods
  mutex = Mutex.new
  cv = ConditionVariable.new
  ret = nil
  ex = nil
  query_methods_async do |err, value|
    mutex.synchronize do
      ex = err
      ret = value
      cv.signal
    end
  end
  mutex.synchronize do
    cv.wait(mutex)
  end
  if !ex.nil?
    raise ex
  end
  return ret
end
query_methods_async(&block) click to toggle source

接続相手のメソッド一覧を返す

[name, argdoc, docstring], …
# File lib/elrpc.rb, line 380
def query_methods_async(&block)
  return _raise_connection_error(block) if @socket_state != :socket_opened
  uid = Elrpc.gen_uid
  msg = MethodsMessage.new(uid, block)
  @session[uid] = msg
  @sending_queue.push(msg)
  uid
end
register_method(method) click to toggle source

自分にメソッドを登録する

# File lib/elrpc.rb, line 328
def register_method(method)
  @methods[method.name] = method
end
stop() click to toggle source
# File lib/elrpc.rb, line 412
def stop
  if @socket_state == :socket_opened then
    @logger.debug "RPCService.stop: received!"
    @worker_pool.kill
    @socket_state = :socket_closing
    @socket.close
    @sending_queue << nil # stop message
    @sender_thread.join(4) unless Thread.current == @sender_thread
    @receiver_thread.join(4) unless Thread.current == @receiver_thread
    _clear_waiting_sessions
    @socket_state = :socket_not_connected
  end
  _wakeup
  @logger.debug "RPCService.stop: completed"
end
wait() click to toggle source

ソケットが相手から切断されるまでメインスレッドを止める

# File lib/elrpc.rb, line 429
def wait
  @wait_lock = Mutex.new
  @wait_cv = ConditionVariable.new
  @wait_lock.synchronize do
    @wait_cv.wait(@wait_lock)
  end
  stop
end

Private Instance Methods

_call(ast) click to toggle source

相手からメソッドを呼ばれた

# File lib/elrpc.rb, line 611
def _call(ast)
  _, uid, name, args = ast
  @logger.debug ": called: Enter: #{name} : #{uid}"
  method = @methods[name.to_sym]
  if method then
    task = -> do
      msg = nil
      begin
        ret = method.call(args)
        msg = ReturnMessage.new(uid, ret)
      rescue => e
        @logger.debug ": called: Error!: #{name} : #{uid} : #{e}"
        @logger.debug e
        msg = ErrorMessage.new(uid, [e.class.name, e.message, e.backtrace.join("\n")])
      end
      @sending_queue.push(msg)
    end
    @worker_pool.invoke(task)
  else
    # method not found
    @logger.debug ": called: Method not found: #{name} : #{uid} "
    @sending_queue.push(EPCErrorMessage.new(uid, "Not found the name: #{name}"))
  end # if
  @logger.debug ": called: Leave: #{name} : #{uid}"
end
_clear_waiting_sessions() click to toggle source

RPC呼び出しで待ってるスレッドを全てエラーにして終了させる

# File lib/elrpc.rb, line 446
def _clear_waiting_sessions
  @session_lock.synchronize do
    @session.keys.each do |uid|
      _session_return(uid, EPCStackError.new("ConnectionClosed","EPC Connection closed",""), nil)
    end
  end
end
_epc_error(ast) click to toggle source

相手からEPCエラーが返ってきた

# File lib/elrpc.rb, line 663
def _epc_error(ast)
  _, uid, error = ast
  @logger.debug ": epc-error: Start: #{uid} : error = #{error}"
  if @session[uid] then
    # error : [classname, message, backtrace]
    _session_return(uid, EPCStackError.new(error[0], error[1], error[2]), nil)
  else
    @logger.error "Not found a session for #{uid}"
  end
  @logger.debug ": epc-error: End: #{uid}"
end
_query_methods(ast) click to toggle source

相手から一覧要求があった

# File lib/elrpc.rb, line 676
def _query_methods(ast)
  _, uid = ast
  @logger.debug ": query-methods: Start: #{uid}"
  begin
    list = @methods.map do |k,m|
      [m.name, m.argdoc, m.docstring]
    end
    msg = ReturnMessage.new(uid, list)
    @sending_queue.push(msg)
  rescue => e
    @logger.warn ": query-method: Exception #{e.message}"
    @logger.warn e.backtrace.join("\n")
    msg = ErrorMessage.new(uid, [e.class.name, e.message, e.backtrace.join("\n")])
    @sending_queue.push(msg)
  end
  @logger.debug ": query-methods: End: #{uid}"
end
_return(ast) click to toggle source

相手から返り値が返ってきた

# File lib/elrpc.rb, line 638
def _return(ast)
  _, uid, value = ast
  @logger.debug ": return: Start: #{uid} : value = #{value}"
  if @session[uid] then
    _session_return(uid, nil, value)
  else
    @logger.error "Not found a session for #{uid}"
  end
  @logger.debug ": return: End: #{uid}"
end
_return_error(ast) click to toggle source

相手からアプリケーションエラーが返ってきた

# File lib/elrpc.rb, line 650
def _return_error(ast)
  _, uid, error = ast
  @logger.debug ": return-error: Start: #{uid} : error = #{error}"
  if @session[uid] then
    # error : [classname, message, backtrace]
    _session_return(uid, EPCRuntimeError.new(error[0], error[1], error[2]), nil)
  else
    @logger.error "Not found a session for #{uid}"
  end
  @logger.debug ": return-error: End: #{uid}"
end
_send_message(msg) click to toggle source

相手にシリアライズされたデータを送る

# File lib/elrpc.rb, line 470
def _send_message(msg)
  msg = msg.encode("UTF-8") + "\n"
  len = msg.bytesize
  body = sprintf("%06x%s",len,msg)
  @socket.write(body)
  @socket.flush
end
_session_return(uid, error, value) click to toggle source

呼び出し元に値を返して、セッションをクリアする

# File lib/elrpc.rb, line 479
def _session_return(uid, error, value)
  m = nil
  @session_lock.synchronize do
    m = @session[uid]
    @session.delete(uid)
  end
  if m then
    m.block.call(error, value)
  end
end
_socket_state_lock() { || ... } click to toggle source
# File lib/elrpc.rb, line 454
def _socket_state_lock
  @socket_state_lock.synchronize do
    yield
  end
end
_wakeup() click to toggle source

もし、メインスレッドが停止していれば再開させて終了させる

# File lib/elrpc.rb, line 461
def _wakeup
  if @wait_lock
    @wait_lock.synchronize do
      @wait_cv.signal
    end
  end
end
receiver_loop() click to toggle source
# File lib/elrpc.rb, line 530
def receiver_loop
  parser = Elparser::Parser.new
      loop do
    ast = nil # for error message and recovery
    uid = nil
            begin
              lenstr = @socket.read(6)
      if lenstr.nil? then
        @logger.debug "[rcvloop] Socket closed!"
        @socket_state = :socket_not_connected
        break
      end
      len = lenstr.to_i(16)
              @logger.debug "Receiving a message : len=#{len}"
      body = @socket.read(len) # 1 means LF
      if body.nil? then
        @logger.debug "[rcvloop] Socket closed!"
        @socket_state = :socket_not_connected
        break
      end
      body.force_encoding("utf-8")
      @logger.debug "Parse : #{body}/#{body.encoding}"
      ast = parser.parse(body)
      raise "Unexpected multiple s-expression : #{body}" if ast.size != 1
      ast = ast[0].to_ruby
      uid = ast[1]
              case ast[0]
              when :call
                    @logger.debug "  received: CALL : #{uid}"
        _call(ast)
              when :return
                    @logger.debug "  received: RETURN: #{uid}"
        _return(ast)
              when :'return-error'
                    @logger.debug "  received: ERROR: #{uid}"
        _return_error(ast)
              when :'epc-error'
                    @logger.debug "  received: EPC_ERROR: #{uid}"
        _epc_error(ast)
              when :'methods'
                    @logger.debug "  received: METHODS: #{uid}"
        _query_methods(ast)
              else
                    @logger.debug "  Unknown message code. try to reset the connection. >> #{body}"
        @socket_state = :socket_closing
        @sending_queue.push nil # wakeup sender thread
                    return
              end # case
      if @socket_state == :socket_closing then
        @logger.debug "[receiver-thread] terminating..."
        break
      end
            rescue Exception => evar
      @logger.debug "[rcvloop] Exception! #{evar}"
              mes = evar.message
              if uid && @session[uid] then
        _session_return(uid, evar, nil)
              end
              if mes["close"] || mes["reset"] then
                    @logger.debug "  [rcvloop] disconnected by the peer."
                    break
              elsif evar.kind_of?(IOError) then
                    @logger.debug "  [rcvloop] IOError."
        @socket_state = :socket_closing
                    break
              else
                    @logger.warn "  [rcvloop] going to recover the communication."
                    bt = evar.backtrace.join("\n")
                    @logger.warn "  [rcvloop] #{bt}"
              end
            end # begin rescue
    ast = nil
    uid = nil
      end # loop
  @logger.debug "[receiver-thread] loop exit : #{@socket_state}"
  _clear_waiting_sessions
  _wakeup
  @logger.debug "[receiver-thread exit]--------------"
end
sender_loop() click to toggle source
# File lib/elrpc.rb, line 490
def sender_loop
      loop do
            begin
      entry = @sending_queue.shift
      if entry.nil? then
        @logger.debug "Queue.shift received stop message."
        break
      end
      @logger.debug "Queue.shift [#{@sending_queue.size}] : #{entry.uid}"
      body = Elparser.encode( entry.to_ast )
      @logger.debug "Encode : #{body}"
      _send_message( body )
      @logger.debug "  Queue -> sent #{entry.uid}"
            rescue Elparser::EncodingError => evar
              @logger.warn "[sendloop] #{evar.to_s}  "
      err = EPCStackError.new(evar.class.name, evar.message, evar.backtrace)
              _session_return(entry.uid, err, nil) if entry
            rescue => evar
              mes = evar.message
              @logger.warn "[sendloop] #{evar.to_s}  "
              if mes["abort"] then
                    @logger.warn "  [sendloop] disconnected by the peer."
        @socket_state = :socket_not_connected
              elsif evar.class == IOError then
                    @logger.warn evar.backtrace.join("\n")
        @socket_state = :socket_closing
              end
              _session_return(entry.uid, evar, nil) if entry
            end # begin
    if @socket_state == :socket_closing || 
        @socket_state == :socket_not_connected then
      @logger.debug "[sender-thread] terminating..."
      break
    end
      end # loop
  @logger.debug "[sender-thread] loop exit : #{@socket_state}"
  _wakeup
  @logger.debug "[sender-thread] exit--------------"
end