class WAB::IO::Engine
Public Class Methods
new(shell, tcnt)
click to toggle source
Starts the engine by creating a listener on STDIN. Processing threads are also created to handle the processing of requests.
- tcnt
-
processing thread count
# File lib/wab/io/engine.rb, line 13 def initialize(shell, tcnt) @shell = shell @last_rid = 0 @pending = {} @lock = Thread::Mutex.new @queue = Queue.new tcnt = 1 if 0 >= tcnt @tcnt = tcnt @timeout_thread = nil end
Public Instance Methods
process_msg(native)
click to toggle source
return false to exit loop
# File lib/wab/io/engine.rb, line 111 def process_msg(native) # exit loop if an interrupt (api less than 0) return false if native[:api] < 0 rid = native[:rid] body = native[:body] return send_error(rid, 'No body in request.') if body.nil? data = @shell.data(body, false) data.detect controller = @shell.controller(data) return send_error(rid, 'No handler found.') if controller.nil? reply_body = nil op = body[:op] path = body[:path] query = body[:query] begin if 'NEW' == op && controller.respond_to?(:create) log_operation_with_body('controller.create', path, query, body) if @shell.info? reply_body = controller.create(path, query, data.get(:content)) elsif 'GET' == op && controller.respond_to?(:read) log_operation('controller.read', path, query) if @shell.info? reply_body = controller.read(path, query) elsif 'DEL' == op && controller.respond_to?(:delete) log_operation('controller.delete', path, query) if @shell.info? reply_body = controller.delete(path, query) elsif 'MOD' == op && controller.respond_to?(:update) log_operation_with_body('controller.update', path, query, body) if @shell.info? reply_body = controller.update(path, query, data.get(:content)) else reply_body = controller.handle(data) end rescue Exception => e return send_error(rid, "#{e.class}: #{e.message}", e.backtrace) end # If reply_body is nil then it is async. unless reply_body.nil? reply_body = reply_body.native if reply_body.is_a?(WAB::Data) msg = {rid: rid, api: 2, body: reply_body} @shell.info("=> view: #{Oj.dump(msg, mode: :wab)}") if @shell.info? $stdout.puts(@shell.data(msg).json) $stdout.flush end true end
request(tql, timeout)
click to toggle source
Send request to the model portion of the system.
- tql
-
the body of the message which should be JSON-TQL as a native Hash
# File lib/wab/io/engine.rb, line 78 def request(tql, timeout) call = Call.new(timeout) @lock.synchronize { @last_rid += 1 call.rid = @last_rid.to_s @pending[call.rid] = call } msg = {rid: call.rid, api: 3, body: tql} @shell.info("=> model: #{Oj.dump(msg, mode: :wab)}") if @shell.info? data = @shell.data(msg, true) # Send the message. Make sure to flush to assure it gets sent. $stdout.puts(data.json) $stdout.flush # Wait for either the response to arrive or for a timeout. In both # cases #run should be called on the thread. Sleep is used instead of # stop to avoid a race condition where a response arrives before the # thread is stopped. sleep(0.1) while call.result.nil? call.result end
send_error(rid, msg, bt=nil)
click to toggle source
# File lib/wab/io/engine.rb, line 102 def send_error(rid, msg, bt=nil) body = { code: -1, error: msg } body[:backtrace] = bt unless bt.nil? $stdout.puts(@shell.data({rid: rid, api: 2, body: body}).json) $stdout.flush true end
shutdown(msg)
click to toggle source
# File lib/wab/io/engine.rb, line 68 def shutdown(msg) # TBD kill timeout thread Thread.kill(@timeout_thread) # tell processing threads to shutdown. @tcnt.times { @queue.push(msg) } end
start()
click to toggle source
# File lib/wab/io/engine.rb, line 24 def start() proc_threads = [] @tcnt.times { proc_threads << Thread.new { while true begin break unless process_msg(@queue.pop) rescue Exception => e $stderr.puts WAB::Impl.format_error(e) end end } } @timeout_thread = Thread.new { timeout_check } Oj.load($stdin, mode: :wab, symbol_keys: true) { |msg| api = msg[:api] @shell.info("=> controller #{Oj.dump(msg, mode: :wab)}") if @shell.info? case api when 1 @queue.push(msg) when 4 rid = msg[:rid] call = nil @lock.synchronize { call = @pending.delete(rid) } unless call.nil? call.result = msg[:body] call.thread.run end when -2, -3, -6, -15 shutdown(msg) break when -9 Thread.kill(@timeout_thread) proc_threads.each { |t| Thread.kill(t) } Process.exit(0) else $stderr.puts WAB::Impl.format_error("Invalid api value (#{api}) in message.") end } end
timeout_check()
click to toggle source
# File lib/wab/io/engine.rb, line 158 def timeout_check() while true sleep(0.5) timed_out = [] now = Time.now @lock.synchronize { @pending.delete_if { |_rid,call| if call.giveup <= now timed_out << call true else false end } } timed_out.each { |call| body = { code: -1, error: "Timed out waiting for #{call.rid}." } unless call.nil? call.result = body call.thread.run end } end end
Private Instance Methods
log_operation(caller, path, query)
click to toggle source
# File lib/wab/io/engine.rb, line 185 def log_operation(caller, path, query) @shell.info("=> #{caller}(#{path.join('/')}#{query})") end
log_operation_with_body(caller, path, query, body)
click to toggle source
# File lib/wab/io/engine.rb, line 189 def log_operation_with_body(caller, path, query, body) @shell.info("=> #{caller}(#{path.join('/')}#{query}, #{Oj.dump(body[:content], mode: :wab)})") end