class Elrpc::RPCService
Attributes
logger[RW]
socket_state[R]
Public Class Methods
new(name, socket, methods = nil)
click to toggle source
# File lib/elrpc.rb, line 291
# Builds the service state around an already-connected socket and starts the
# sender and receiver threads.
#
# name    - passed to Elrpc.get_logger_format to build the log timestamp format
# socket  - IO-like object read by receiver_loop and written by _send_message
# methods - optional collection; each element is handed to register_method
def initialize(name, socket, methods = nil)
  @logger = Logger.new(STDOUT)
  @logger.level = Elrpc::default_log_level
  @logger.datetime_format = Elrpc.get_logger_format(name)

  @methods = {}               # name -> Method
  @session = {}               # uid -> pending message (holds the reply block)
  @session_lock = Monitor.new

  @sending_queue = Queue.new  # CallMessage

  @socket = socket
  @socket_state_lock = Monitor.new
  @socket_state = :socket_opened

  @wait_lock = nil
  @wait_cv = nil
  @close_hooks = []

  methods.each { |m| register_method(m) } if methods

  # The pool has to exist before the I/O threads run: receiver_loop passes
  # incoming calls to @worker_pool as soon as the first message arrives.
  @worker_pool = WorkerPool.new(1, @logger)

  @sender_thread = Thread.start { sender_loop }
  @receiver_thread = Thread.start { receiver_loop }

  @logger.debug ":ready for I/O stream."
end
Public Instance Methods
_raise_connection_error(block)
click to toggle source
# File lib/elrpc.rb, line 349
# Reports a "ConnectionClosed" EPCStackError to +block+ through the worker
# pool instead of calling it on the current thread.
def _raise_connection_error(block)
  notifier = -> { block.call(EPCStackError.new("ConnectionClosed","Connection closed","")) }
  @worker_pool.invoke(notifier)
end
add_close_hook(&block)
click to toggle source
# File lib/elrpc.rb, line 438
# Appends the given block to the list of close hooks.
def add_close_hook(&block)
  @close_hooks.push(block)
end
alive?()
click to toggle source
# File lib/elrpc.rb, line 323
# True while the socket state is :socket_opened.
def alive?
  @socket_state == :socket_opened
end
call_method(name, *args)
click to toggle source
相手のメソッドを呼ぶ(同期版)
# File lib/elrpc.rb, line 357
# Synchronous variant of call_method_async: blocks the calling thread until
# the reply callback has run, then returns the value or raises the error that
# the callback received.
def call_method(name, *args)
  mutex = Mutex.new
  cv = ConditionVariable.new
  done = false
  ret = nil
  ex = nil
  call_method_async(name, *args) do |err, value|
    mutex.synchronize do
      ex = err
      ret = value
      done = true
      cv.signal
    end
  end
  mutex.synchronize do
    # Wait on a predicate, not on the bare signal: the callback may already
    # have fired before we get here, and a signal sent with no waiter is lost.
    cv.wait(mutex) until done
  end
  raise ex unless ex.nil?
  ret
end
call_method_async(name, *args, &block)
click to toggle source
相手のメソッドを呼ぶ block(err, value)
# File lib/elrpc.rb, line 339
# Queues a call to the peer's method +name+; +block+ is stored with the
# request as block(err, value). Returns the generated uid, or the result of
# _raise_connection_error when the socket is not open.
def call_method_async(name, *args, &block)
  unless @socket_state == :socket_opened
    return _raise_connection_error(block)
  end

  uid = Elrpc.gen_uid
  request = CallMessage.new(uid, name, args, block)
  # No contention is expected here, so the session table is not locked.
  @session[uid] = request
  @sending_queue.push(request)
  uid
end
def_method(name, argdoc=nil, docstring=nil, &block)
click to toggle source
register_method の簡易版
# File lib/elrpc.rb, line 333
# Shorthand for register_method: wraps the arguments and block in a Method
# and registers it.
def def_method(name, argdoc=nil, docstring=nil, &block)
  entry = Method.new(name, argdoc, docstring, &block)
  register_method(entry)
end
query_methods()
click to toggle source
接続相手のメソッド一覧を返す(同期版)
- [name, argdoc, docstring], …
# File lib/elrpc.rb, line 391
# Synchronous variant of query_methods_async: blocks the calling thread until
# the reply callback has run, then returns the value or raises the error that
# the callback received.
def query_methods
  mutex = Mutex.new
  cv = ConditionVariable.new
  done = false
  ret = nil
  ex = nil
  query_methods_async do |err, value|
    mutex.synchronize do
      ex = err
      ret = value
      done = true
      cv.signal
    end
  end
  mutex.synchronize do
    # Wait on a predicate, not on the bare signal: the callback may already
    # have fired before we get here, and a signal sent with no waiter is lost.
    cv.wait(mutex) until done
  end
  raise ex unless ex.nil?
  ret
end
query_methods_async(&block)
click to toggle source
接続相手のメソッド一覧を返す
- [name, argdoc, docstring], …
# File lib/elrpc.rb, line 380
# Queues a request for the peer's method list; +block+ is stored with the
# request as block(err, value). Returns the generated uid, or the result of
# _raise_connection_error when the socket is not open.
def query_methods_async(&block)
  return _raise_connection_error(block) unless @socket_state == :socket_opened

  uid = Elrpc.gen_uid
  request = MethodsMessage.new(uid, block)
  @session[uid] = request
  @sending_queue.push(request)
  uid
end
register_method(method)
click to toggle source
自分にメソッドを登録する
# File lib/elrpc.rb, line 328
# Registers +method+ on this side, keyed by method.name.
def register_method(method)
  @methods.store(method.name, method)
end
stop()
click to toggle source
# File lib/elrpc.rb, line 412
# Shuts the service down if the socket is still open: kills the worker pool,
# closes the socket, stops the sender via a nil queue entry, waits up to 4
# seconds for each I/O thread and fails the pending sessions. Always finishes
# by calling _wakeup.
def stop
  if @socket_state == :socket_opened
    @logger.debug "RPCService.stop: received!"
    @worker_pool.kill
    @socket_state = :socket_closing
    @socket.close
    @sending_queue.push(nil) # stop message
    [@sender_thread, @receiver_thread].each do |io_thread|
      # never join the thread we are running on
      io_thread.join(4) unless Thread.current == io_thread
    end
    _clear_waiting_sessions
    @socket_state = :socket_not_connected
  end
  _wakeup
  @logger.debug "RPCService.stop: completed"
end
wait()
click to toggle source
ソケットが相手から切断されるまでメインスレッドを止める
# File lib/elrpc.rb, line 429
# Blocks the calling thread until _wakeup signals the wait condition, then
# calls stop. Returns immediately (after calling stop) when the socket is no
# longer in the :socket_opened state.
def wait
  # Publish the condition variable before the lock: _wakeup only tests
  # @wait_lock, so @wait_cv must already be set once the lock is visible.
  @wait_cv = ConditionVariable.new
  @wait_lock = Mutex.new
  @wait_lock.synchronize do
    # A wake-up issued before the lock existed is never repeated, so when the
    # connection is already gone, blocking here would hang forever.
    @wait_cv.wait(@wait_lock) if @socket_state == :socket_opened
  end
  stop
end
Private Instance Methods
_call(ast)
click to toggle source
相手からメソッドを呼ばれた
# File lib/elrpc.rb, line 611
# Handles a call message from the peer. A registered method is run on the
# worker pool and its result (or the exception it raised) is queued as the
# reply; an unknown name is answered with an EPCErrorMessage.
def _call(ast)
  _, uid, name, args = ast
  @logger.debug ": called: Enter: #{name} : #{uid}"
  handler = @methods[name.to_sym]
  if handler
    job = lambda do
      reply =
        begin
          ReturnMessage.new(uid, handler.call(args))
        rescue => e
          @logger.debug ": called: Error!: #{name} : #{uid} : #{e}"
          @logger.debug e
          ErrorMessage.new(uid, [e.class.name, e.message, e.backtrace.join("\n")])
        end
      @sending_queue.push(reply)
    end
    @worker_pool.invoke(job)
  else
    # method not found
    @logger.debug ": called: Method not found: #{name} : #{uid} "
    @sending_queue.push(EPCErrorMessage.new(uid, "Not found the name: #{name}"))
  end
  @logger.debug ": called: Leave: #{name} : #{uid}"
end
_clear_waiting_sessions()
click to toggle source
RPC呼び出しで待ってるスレッドを全てエラーにして終了させる
# File lib/elrpc.rb, line 446
# Completes every pending session with a "ConnectionClosed" EPCStackError so
# that callers waiting on an RPC reply are released.
def _clear_waiting_sessions
  @session_lock.synchronize do
    pending_uids = @session.keys
    pending_uids.each do |uid|
      closed = EPCStackError.new("ConnectionClosed","EPC Connection closed","")
      _session_return(uid, closed, nil)
    end
  end
end
_epc_error(ast)
click to toggle source
相手からEPCエラーが返ってきた
# File lib/elrpc.rb, line 663
# Handles an epc-error message from the peer: completes the matching session
# with an EPCStackError, or logs an error when no session exists for the uid.
#
# The payload is accepted in two shapes:
#   * [classname, message, backtrace]
#   * a bare message (_call in this file passes a String to EPCErrorMessage),
#     which is wrapped as ["EPCError", message, ""]
# NOTE(review): "EPCError" is a placeholder class name for bare messages;
# confirm the preferred label.
def _epc_error(ast)
  _, uid, error = ast
  @logger.debug ": epc-error: Start: #{uid} : error = #{error}"
  if @session[uid]
    if error.is_a?(Array)
      classname, message, backtrace = error
    else
      # Indexing a String would yield single characters, not the fields.
      classname = "EPCError"
      message = error.to_s
      backtrace = ""
    end
    _session_return(uid, EPCStackError.new(classname, message, backtrace), nil)
  else
    @logger.error "Not found a session for #{uid}"
  end
  @logger.debug ": epc-error: End: #{uid}"
end
_query_methods(ast)
click to toggle source
相手から一覧要求があった
# File lib/elrpc.rb, line 676
# Handles a methods query from the peer: replies with one
# [name, argdoc, docstring] entry per registered method, or with an
# ErrorMessage if building the reply raises.
def _query_methods(ast)
  _, uid = ast
  @logger.debug ": query-methods: Start: #{uid}"
  begin
    listing = @methods.map { |_key, m| [m.name, m.argdoc, m.docstring] }
    @sending_queue.push(ReturnMessage.new(uid, listing))
  rescue => e
    @logger.warn ": query-method: Exception #{e.message}"
    @logger.warn e.backtrace.join("\n")
    failure = ErrorMessage.new(uid, [e.class.name, e.message, e.backtrace.join("\n")])
    @sending_queue.push(failure)
  end
  @logger.debug ": query-methods: End: #{uid}"
end
_return(ast)
click to toggle source
相手から返り値が返ってきた
# File lib/elrpc.rb, line 638
# Handles a return message from the peer: completes the matching session with
# the returned value, or logs an error when no session exists for the uid.
def _return(ast)
  _, uid, value = ast
  @logger.debug ": return: Start: #{uid} : value = #{value}"
  known = @session[uid]
  if known
    _session_return(uid, nil, value)
  else
    @logger.error "Not found a session for #{uid}"
  end
  @logger.debug ": return: End: #{uid}"
end
_return_error(ast)
click to toggle source
相手からアプリケーションエラーが返ってきた
# File lib/elrpc.rb, line 650
# Handles a return-error (application error) message from the peer: completes
# the matching session with an EPCRuntimeError, or logs an error when no
# session exists for the uid.
def _return_error(ast)
  _, uid, error = ast
  @logger.debug ": return-error: Start: #{uid} : error = #{error}"
  if @session[uid]
    # error : [classname, message, backtrace]
    failure = EPCRuntimeError.new(error[0], error[1], error[2])
    _session_return(uid, failure, nil)
  else
    @logger.error "Not found a session for #{uid}"
  end
  @logger.debug ": return-error: End: #{uid}"
end
_send_message(msg)
click to toggle source
相手にシリアライズされたデータを送る
# File lib/elrpc.rb, line 470
# Writes one framed message to the socket: a 6-digit hexadecimal byte length
# followed by the UTF-8 payload and a trailing newline, then flushes.
#
# Raises ArgumentError when the payload (including the newline) is longer
# than 0xffffff bytes, because the length would no longer fit the 6-digit
# header and the framing would be corrupted.
def _send_message(msg)
  payload = msg.encode("UTF-8") + "\n"
  len = payload.bytesize
  if len > 0xffffff
    raise ArgumentError, "message too large for the 6-digit length header: #{len} bytes"
  end
  @socket.write(sprintf("%06x%s", len, payload))
  @socket.flush
end
_session_return(uid, error, value)
click to toggle source
呼び出し元に値を返して、セッションをクリアする
# File lib/elrpc.rb, line 479
# Removes the session for +uid+ under the session lock and, if one existed,
# calls its block with (error, value) outside the lock.
def _session_return(uid, error, value)
  finished = @session_lock.synchronize { @session.delete(uid) }
  finished.block.call(error, value) if finished
end
_socket_state_lock() { || ... }
click to toggle source
# File lib/elrpc.rb, line 454
# Runs the given block while holding @socket_state_lock and returns the
# block's value.
def _socket_state_lock
  @socket_state_lock.synchronize { yield }
end
_wakeup()
click to toggle source
もし、メインスレッドが停止していれば再開させて終了させる
# File lib/elrpc.rb, line 461
# Signals the condition variable created by wait, if wait has been called, so
# the blocked thread can resume. Does nothing when no wait lock exists yet.
def _wakeup
  return unless @wait_lock

  @wait_lock.synchronize { @wait_cv.signal }
end
receiver_loop()
click to toggle source
# File lib/elrpc.rb, line 530
# Body of the receiver thread. Reads framed messages from the socket (6 hex
# digits of length, then the payload), parses each payload as a single
# s-expression and dispatches it by message type. On exit it fails every
# pending session and wakes up a thread blocked in wait.
def receiver_loop
  parser = Elparser::Parser.new
  loop do
    ast = nil # for error message and recovery
    uid = nil
    begin
      lenstr = @socket.read(6)
      # nil means EOF; a short header means EOF was hit in the middle of it.
      if lenstr.nil? || lenstr.bytesize < 6
        @logger.debug "[rcvloop] Socket closed!"
        @socket_state = :socket_not_connected
        break
      end
      len = lenstr.to_i(16)
      @logger.debug "Receiving a message : len=#{len}"
      body = @socket.read(len) # 1 means LF
      # A short body likewise means the peer went away mid-message.
      if body.nil? || body.bytesize < len
        @logger.debug "[rcvloop] Socket closed!"
        @socket_state = :socket_not_connected
        break
      end
      body.force_encoding("utf-8")
      @logger.debug "Parse : #{body}/#{body.encoding}"
      ast = parser.parse(body)
      raise "Unexpected multiple s-expression : #{body}" if ast.size != 1
      ast = ast[0].to_ruby
      uid = ast[1]
      case ast[0]
      when :call
        @logger.debug " received: CALL : #{uid}"
        _call(ast)
      when :return
        @logger.debug " received: RETURN: #{uid}"
        _return(ast)
      when :'return-error'
        @logger.debug " received: ERROR: #{uid}"
        _return_error(ast)
      when :'epc-error'
        @logger.debug " received: EPC_ERROR: #{uid}"
        _epc_error(ast)
      when :'methods'
        @logger.debug " received: METHODS: #{uid}"
        _query_methods(ast)
      else
        @logger.debug " Unknown message code. try to reset the connection. >> #{body}"
        @socket_state = :socket_closing
        @sending_queue.push nil # wakeup sender thread
        # Leave through the common exit path so pending sessions are failed
        # and a waiting thread is woken up.
        break
      end # case
      if @socket_state == :socket_closing
        @logger.debug "[receiver-thread] terminating..."
        break
      end
    rescue Exception => evar
      # NOTE(review): rescuing Exception also traps non-StandardError
      # exceptions raised in this thread; kept as in the original.
      @logger.debug "[rcvloop] Exception! #{evar}"
      mes = evar.message
      if uid && @session[uid]
        _session_return(uid, evar, nil)
      end
      if mes["close"] || mes["reset"]
        @logger.debug " [rcvloop] disconnected by the peer."
        break
      elsif evar.kind_of?(IOError)
        @logger.debug " [rcvloop] IOError."
        @socket_state = :socket_closing
        break
      else
        @logger.warn " [rcvloop] going to recover the communication." 
        bt = evar.backtrace.join("\n")
        @logger.warn " [rcvloop] #{bt}"
      end
    end # begin rescue
  end # loop
  @logger.debug "[receiver-thread] loop exit : #{@socket_state}"
  _clear_waiting_sessions
  _wakeup
  @logger.debug "[receiver-thread exit]--------------"
end
sender_loop()
click to toggle source
# File lib/elrpc.rb, line 490
# Body of the sender thread. Takes entries off the sending queue, encodes
# them and writes them to the socket until a nil entry arrives or the socket
# state becomes :socket_closing / :socket_not_connected. Failures are passed
# to the session of the entry being sent. Calls _wakeup on exit.
def sender_loop
  loop do
    begin
      entry = @sending_queue.shift
      if entry.nil?
        @logger.debug "Queue.shift received stop message."
        break
      end
      @logger.debug "Queue.shift [#{@sending_queue.size}] : #{entry.uid}"
      body = Elparser.encode( entry.to_ast )
      @logger.debug "Encode : #{body}"
      _send_message( body )
      @logger.debug " Queue -> sent #{entry.uid}"
    rescue Elparser::EncodingError => evar
      @logger.warn "[sendloop] #{evar.to_s} "
      failure = EPCStackError.new(evar.class.name, evar.message, evar.backtrace)
      _session_return(entry.uid, failure, nil) if entry
    rescue => evar
      mes = evar.message
      @logger.warn "[sendloop] #{evar.to_s} "
      if mes["abort"]
        @logger.warn " [sendloop] disconnected by the peer."
        @socket_state = :socket_not_connected
      elsif evar.class == IOError
        @logger.warn evar.backtrace.join("\n")
        @socket_state = :socket_closing
      end
      _session_return(entry.uid, evar, nil) if entry
    end # begin

    finished = @socket_state == :socket_closing || @socket_state == :socket_not_connected
    if finished
      @logger.debug "[sender-thread] terminating..."
      break
    end
  end # loop
  @logger.debug "[sender-thread] loop exit : #{@socket_state}"
  _wakeup
  @logger.debug "[sender-thread] exit--------------"
end