class OFlow::Actors::HttpServer
Provides a simple HTTP server that accepts requests which then trigger a flow. The execution flow should end back at the server so it can sent a response to the requester.
Constants
- STATUS_MESSAGES
Public Class Methods
new(task, options)
click to toggle source
Calls superclass method
# File lib/oflow/actors/httpserver.rb, line 14 def initialize(task, options) super @sessions = { } @server = TCPServer.new(@port) @server.fcntl(Fcntl::F_SETFL, @server.fcntl(Fcntl::F_GETFL, 0) | Fcntl::O_NONBLOCK) @server_loop = Thread.start(self) do |me| Thread.current[:name] = me.task.full_name() + '-server' while Task::CLOSING != task.state begin if Task::BLOCKED == task.state || Task::STOPPED == task.state sleep(0.1) next end session = @server.accept_nonblock() session.fcntl(Fcntl::F_SETFL, session.fcntl(Fcntl::F_GETFL, 0) | Fcntl::O_NONBLOCK) @count += 1 # if nil is returned the request was empty next if (req = read_req(session, @count)).nil? @sessions[@count] = session resp = { status: 200, body: nil, headers: { 'Content-Type' => 'text/html', } } box = new_event() box.contents[:request] = req box.contents[:response] = resp task.links.each_key do |key| continue if :success == key || 'success' == key begin task.ship(key, box) rescue BlockedError task.warn("Failed to ship timer #{box.contents} to #{key}. Task blocked.") rescue BusyError task.warn("Failed to ship timer #{box.contents} to #{key}. Task busy.") end end rescue IO::WaitReadable, Errno::EINTR IO.select([@server], nil, nil, 0.5) rescue Exception => e task.handle_error(e) end end end end
Public Instance Methods
perform(op, box)
click to toggle source
# File lib/oflow/actors/httpserver.rb, line 63 def perform(op, box) case op when :reply req_id = box.get(@req_id_path) if (session = @sessions[req_id]).nil? raise NotFoundError.new(task.full_name, 'session', req_id) end if (resp = box.get(@response_path)).nil? raise NotFoundError.new(task.full_name, 'response', @response_path) end body = resp[:body] body = '' if body.nil? status = resp[:status] headers = ["HTTP/1.1 #{status} {STATUS_MESSAGES[status]}"] resp[:headers].each do |k,v| headers << "#{k}: #{v}" end headers << "Content-Length: #{body.length}\r\n\r\n" session.puts headers.join("\r\n") session.puts body session.close @sessions.delete(req_id) when :skip req_id = box.get(@req_id_path) if (session = @sessions[req_id]).nil? raise NotFoundError.new(task.full_name, 'session', req_id) end @sessions.delete(req_id) else raise OpError.new(task.full_name, op) end task.ship(:success, Box.new(nil, box.tracker)) unless task.links[:success].nil? end
read_req(session, id)
click to toggle source
# File lib/oflow/actors/httpserver.rb, line 105 def read_req(session, id) req = { id: id, } line = session.gets() return if line.nil? parts = line.split(' ') req[:method] = parts[0] req[:protocol] = parts[2] path, arg_str = parts[1].split('?', 2) req[:path] = path args = nil unless arg_str.nil? args = arg_str.split('&').map { |pair| pair.split('=') } end req[:args] = args # Read the rest of the lines and the body if there is one. len = 0 while line = session.gets() line.strip! break if 0 == line.size parts = line.split(':', 2) next unless 2 == parts.size key = parts[0] value = parts[1].strip() if 'Content-Length' == key value = value.to_i len = value end req[key] = value end req[:body] = read_timeout(session, len, @read_timeout) if 0 < len req end
read_timeout(session, len, timeout)
click to toggle source
# File lib/oflow/actors/httpserver.rb, line 141 def read_timeout(session, len, timeout) str = '' done = Time.now() + timeout loop do begin str = session.readpartial(len, str) break if str.size == len rescue Errno::EAGAIN => e raise e if IO.select([session], nil, nil, done - Time.now()).nil? retry end end str end
set_options(options)
click to toggle source
Calls superclass method
# File lib/oflow/actors/httpserver.rb, line 97 def set_options(options) super @port = options.fetch(:port, 6060) @req_id_path = options.fetch(:req_id_path, 'request:id') @response_path = options.fetch(:response_path, 'response') @read_timeout = options.fetch(:read_timeout, 1.0) end