class OFlow::Actors::HttpServer

Provides a simple HTTP server that accepts requests which then trigger a flow. The execution flow should end back at the server so it can send a response to the requester.

Constants

STATUS_MESSAGES

Public Class Methods

new(task, options) click to toggle source
Calls superclass method
# File lib/oflow/actors/httpserver.rb, line 14
# Creates the actor, opens a non-blocking listening socket on @port and starts
# a background thread that accepts connections, parses each request with
# read_req and ships a box holding the request and a default response to every
# link of the task except the :success link.
#
# task:: the task this actor belongs to
# options:: actor options, passed unchanged to the superclass initializer
def initialize(task, options)
  super
  @sessions = {}
  # Request id counter. Nothing visible here assigns it before the accept loop
  # increments it; `||=` keeps any value the superclass may already have set.
  @count ||= 0

  # NOTE(review): @port is expected to be assigned by the time `super`
  # returns (set_options sets it) - confirm the superclass calls set_options.
  @server = TCPServer.new(@port)
  @server.fcntl(Fcntl::F_SETFL, @server.fcntl(Fcntl::F_GETFL, 0) | Fcntl::O_NONBLOCK)
  @server_loop = Thread.start(self) do |me|
    Thread.current[:name] = me.task.full_name() + '-server'
    while Task::CLOSING != task.state
      begin
        if Task::BLOCKED == task.state || Task::STOPPED == task.state
          sleep(0.1)
          next
        end
        session = @server.accept_nonblock()
        session.fcntl(Fcntl::F_SETFL, session.fcntl(Fcntl::F_GETFL, 0) | Fcntl::O_NONBLOCK)
        @count += 1

        # Reset explicitly: `while` does not open a new scope, so the value
        # from the previous iteration would otherwise still be visible.
        req = nil
        begin
          # if nil is returned the request was empty
          req = read_req(session, @count)
        ensure
          # No request was produced (empty input or a read error), so nothing
          # will ever reply on this socket; close it instead of leaking it.
          session.close if req.nil?
        end
        next if req.nil?

        @sessions[@count] = session
        resp = {
          status: 200,
          body: nil,
          headers: {
            'Content-Type' => 'text/html',
          }
        }
        box = new_event()
        box.contents[:request] = req
        box.contents[:response] = resp
        task.links.each_key do |key|
          # The :success link is only used by perform to signal completion.
          next if :success == key || 'success' == key
          begin
            task.ship(key, box)
          rescue BlockedError
            task.warn("Failed to ship request #{box.contents} to #{key}. Task blocked.")
          rescue BusyError
            task.warn("Failed to ship request #{box.contents} to #{key}. Task busy.")
          end
        end
      rescue IO::WaitReadable, Errno::EINTR
        # No pending connection; wait up to half a second for one.
        IO.select([@server], nil, nil, 0.5)
      rescue Exception => e
        # NOTE(review): rescuing Exception (not StandardError) is kept from the
        # original, presumably so nothing can end the accept loop - confirm.
        task.handle_error(e)
      end
    end
  end
end

Public Instance Methods

perform(op, box) click to toggle source
# File lib/oflow/actors/httpserver.rb, line 63
# Handles an operation shipped to this actor.
#
# :reply:: looks up the open session whose id is found in the box at
#          @req_id_path, writes the response found at @response_path to it as
#          an HTTP/1.1 message, then closes and forgets the session.
# :skip::  forgets the session for that request id and closes it without
#          writing a response.
#
# After either operation an empty box carrying the same tracker is shipped on
# the :success link when the task has one.
#
# op:: operation to perform, :reply or :skip
# box:: box holding the request id and, for :reply, the response hash with
#       :status, :body and :headers entries
#
# Raises NotFoundError when the session (or, for :reply, the response) is
# missing and OpError for any other operation.
def perform(op, box)
  case op
  when :reply
    req_id = box.get(@req_id_path)
    session = @sessions[req_id]
    raise NotFoundError.new(task.full_name, 'session', req_id) if session.nil?

    resp = box.get(@response_path)
    raise NotFoundError.new(task.full_name, 'response', @response_path) if resp.nil?

    begin
      body = resp[:body].to_s
      status = resp[:status]
      head = ["HTTP/1.1 #{status} #{STATUS_MESSAGES[status]}"]
      (resp[:headers] || {}).each do |k, v|
        head << "#{k}: #{v}"
      end
      # Content-Length counts bytes, not characters.
      head << "Content-Length: #{body.bytesize}"
      # write (not puts) so that no newline beyond the declared length is sent.
      session.write(head.join("\r\n") + "\r\n\r\n")
      session.write(body)
    ensure
      # Forget and close the session even when writing failed, so a broken
      # connection does not stay registered forever.
      @sessions.delete(req_id)
      session.close
    end
  when :skip
    req_id = box.get(@req_id_path)
    session = @sessions.delete(req_id)
    raise NotFoundError.new(task.full_name, 'session', req_id) if session.nil?

    # @sessions held the only reference this actor keeps to the socket, so
    # close it here rather than leaving the connection open until GC.
    session.close
  else
    raise OpError.new(task.full_name, op)
  end
  task.ship(:success, Box.new(nil, box.tracker)) unless task.links[:success].nil?
end
read_req(session, id) click to toggle source
# File lib/oflow/actors/httpserver.rb, line 105
# Reads one HTTP request from the session and returns it as a Hash.
#
# The hash holds :id, :method, :protocol, :path and :args (an array of the
# '='-split query pairs, or nil when there is no query string), one entry per
# header keyed by the header name as received, and :body when a positive
# Content-Length was given.
#
# session:: IO-like object to read the request from
# id:: identifier stored in the request under :id
#
# Returns nil when nothing could be read or when the request line does not
# contain at least a method and a target.
def read_req(session, id)
  line = session.gets()
  return if line.nil?

  parts = line.split(' ')
  # A blank or one-token request line has no target to parse; treat it like an
  # empty request instead of failing on a nil target.
  return if parts.size < 2

  req = {
    id: id,
    method: parts[0],
    protocol: parts[2],
  }
  path, arg_str = parts[1].split('?', 2)
  req[:path] = path
  req[:args] = arg_str.nil? ? nil : arg_str.split('&').map { |pair| pair.split('=') }

  # Read the header lines up to the blank line that ends them.
  len = 0
  while (line = session.gets())
    line = line.strip
    break if line.empty?

    key, value = line.split(':', 2)
    # Lines without a colon are not headers; ignore them.
    next if value.nil?

    value = value.strip
    # Header names are case-insensitive, so accept any capitalisation.
    if 'content-length' == key.downcase
      value = value.to_i
      len = value
    end
    req[key] = value
  end
  req[:body] = read_timeout(session, len, @read_timeout) if 0 < len
  req
end
read_timeout(session, len, timeout) click to toggle source
# File lib/oflow/actors/httpserver.rb, line 141
# Reads exactly len bytes from the session, waiting at most timeout seconds in
# total for the data to arrive.
#
# session:: IO to read from
# len:: number of bytes to read
# timeout:: overall time limit in seconds
#
# Returns the bytes read as a String. Raises Errno::EAGAIN when the time limit
# passes before len bytes are available; EOFError from the underlying read
# propagates when the peer closes early.
def read_timeout(session, len, timeout)
  buf = String.new
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  while buf.bytesize < len
    begin
      # Append each chunk; a body may arrive in several pieces.
      buf << session.read_nonblock(len - buf.bytesize)
    rescue IO::WaitReadable
      remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
      # Never hand IO.select a negative timeout; an expired deadline is a timeout.
      if remaining <= 0 || IO.select([session], nil, nil, remaining).nil?
        raise Errno::EAGAIN, 'timed out reading request body'
      end
    end
  end
  buf
end
set_options(options) click to toggle source
Calls superclass method
# File lib/oflow/actors/httpserver.rb, line 97
# Applies the superclass options, then stores the HTTP server settings in
# instance variables, using the defaults below for any option not supplied.
#
# options:: Hash that may contain :port, :req_id_path, :response_path and
#           :read_timeout
def set_options(options)
  super
  defaults = {
    port: 6060,
    req_id_path: 'request:id',
    response_path: 'response',
    read_timeout: 1.0,
  }
  defaults.each do |name, fallback|
    instance_variable_set("@#{name}", options.fetch(name, fallback))
  end
end