class DigitalFabric::Agent
Constants
- UPGRADE_REQUEST
Public Class Methods
new(server_url, route, token)
click to toggle source
# File lib/tipi/digital_fabric/agent.rb, line 12 def initialize(server_url, route, token) @server_url = server_url @route = route @token = token @requests = {} @long_running_requests = {} @name = '<unknown>' end
Public Instance Methods
connect_and_process_incoming_requests()
click to toggle source
# File lib/tipi/digital_fabric/agent.rb, line 41 def connect_and_process_incoming_requests # log 'Connecting...' @socket = connect_to_server @last_recv = @last_send = Time.now df_upgrade @connected = true @msgpack_reader = MessagePack::Unpacker.new process_incoming_requests rescue IOError, Errno::ECONNREFUSED, Errno::ECONNRESET, Errno::EPIPE, TimeoutError log 'Disconnected' if @connected @connected = nil end
connect_to_server()
click to toggle source
# File lib/tipi/digital_fabric/agent.rb, line 56 def connect_to_server if @server_url =~ /^([^\:]+)\:(\d+)$/ host = Regexp.last_match(1) port = Regexp.last_match(2) Polyphony::Net.tcp_connect(host, port) else UNIXSocket.new(@server_url) end end
df_upgrade()
click to toggle source
# File lib/tipi/digital_fabric/agent.rb, line 76 def df_upgrade @socket << format(UPGRADE_REQUEST, @token, mount_point) while (line = @socket.gets) break if line.chomp.empty? end # log 'Connection upgraded' end
get_http_request_body(id, limit)
click to toggle source
# File lib/tipi/digital_fabric/agent.rb, line 201 def get_http_request_body(id, limit) send_df_message(Protocol.http_get_request_body(id, limit)) receive end
http_request(req)
click to toggle source
default handler for HTTP request
# File lib/tipi/digital_fabric/agent.rb, line 221 def http_request(req) req.respond(nil, { ':status': Qeweney::Status::SERVICE_UNAVAILABLE }) end
is_long_running_request_response?(msg)
click to toggle source
# File lib/tipi/digital_fabric/agent.rb, line 152 def is_long_running_request_response?(msg) case msg[Protocol::Attribute::KIND] when Protocol::HTTP_UPGRADE true when Protocol::HTTP_RESPONSE !msg[Protocol::Attribute::HttpResponse::COMPLETE] end end
keep_alive()
click to toggle source
# File lib/tipi/digital_fabric/agent.rb, line 107 def keep_alive return unless @connected now = Time.now if now - @last_send >= Protocol::SEND_TIMEOUT send_df_message(Protocol.ping) end # if now - @last_recv >= Protocol::RECV_TIMEOUT # raise TimeoutError # end rescue IOError, SystemCallError => e # transmit exception to fiber running the agent @fiber.raise(e) end
log(msg)
click to toggle source
# File lib/tipi/digital_fabric/agent.rb, line 94 def log(msg) puts "#{Time.now} (#{@name}) #{msg}" end
mount_point()
click to toggle source
# File lib/tipi/digital_fabric/agent.rb, line 84 def mount_point if @route[:host] "host=#{@route[:host]}" elsif @route[:path] "path=#{@route[:path]}" else nil end end
prepare_http_request(msg)
click to toggle source
# File lib/tipi/digital_fabric/agent.rb, line 185 def prepare_http_request(msg) headers = msg[Protocol::Attribute::HttpRequest::HEADERS] body_chunk = msg[Protocol::Attribute::HttpRequest::BODY_CHUNK] complete = msg[Protocol::Attribute::HttpRequest::COMPLETE] req = Qeweney::Request.new(headers, RequestAdapter.new(self, msg)) req.buffer_body_chunk(body_chunk) if body_chunk req end
process_incoming_requests()
click to toggle source
# File lib/tipi/digital_fabric/agent.rb, line 98 def process_incoming_requests @socket.feed_loop(@msgpack_reader, :feed_each) do |msg| recv_df_message(msg) return if @shutdown && @requests.empty? end rescue IOError, SystemCallError, TimeoutError # ignore end
recv_df_message(msg)
click to toggle source
# File lib/tipi/digital_fabric/agent.rb, line 122 def recv_df_message(msg) @last_recv = Time.now case msg[Protocol::Attribute::KIND] when Protocol::SHUTDOWN recv_shutdown when Protocol::HTTP_REQUEST recv_http_request(msg) when Protocol::HTTP_REQUEST_BODY recv_http_request_body(msg) when Protocol::WS_REQUEST recv_ws_request(msg) when Protocol::CONN_DATA, Protocol::CONN_CLOSE, Protocol::WS_DATA, Protocol::WS_CLOSE fiber = @requests[msg[Protocol::Attribute::ID]] fiber << msg if fiber end end
recv_http_request(msg)
click to toggle source
# File lib/tipi/digital_fabric/agent.rb, line 168 def recv_http_request(msg) req = prepare_http_request(msg) id = msg[Protocol::Attribute::ID] @requests[id] = spin("#{Fiber.current.tag}.#{id}") do http_request(req) rescue IOError, Errno::ECONNREFUSED, Errno::EPIPE # ignore rescue Polyphony::Terminate => e req.respond(nil, { ':status' => Qeweney::Status::SERVICE_UNAVAILABLE }) if Fiber.current.graceful_shutdown? raise e ensure @requests.delete(id) @long_running_requests.delete(id) @fiber.terminate if @shutdown && @requests.empty? end end
recv_http_request_body(msg)
click to toggle source
# File lib/tipi/digital_fabric/agent.rb, line 194 def recv_http_request_body(msg) fiber = @requests[msg[Protocol::Attribute::ID]] return unless fiber fiber << msg[Protocol::Attribute::HttpRequestBody::BODY] end
recv_shutdown()
click to toggle source
# File lib/tipi/digital_fabric/agent.rb, line 161 def recv_shutdown # puts "Received shutdown message (#{@requests.size} pending requests)" # puts " (Long running requests: #{@long_running_requests.size})" @shutdown = true @long_running_requests.values.each { |f| f.terminate(true) } end
recv_ws_request(msg)
click to toggle source
# File lib/tipi/digital_fabric/agent.rb, line 206 def recv_ws_request(msg) req = Qeweney::Request.new(msg[Protocol::Attribute::WS::HEADERS], RequestAdapter.new(self, msg)) id = msg[Protocol::Attribute::ID] @requests[id] = @long_running_requests[id] = spin("#{Fiber.current.tag}.#{id}-ws") do ws_request(req) rescue IOError, Errno::ECONNREFUSED, Errno::EPIPE # ignore ensure @requests.delete(id) @long_running_requests.delete(id) @fiber.terminate if @shutdown && @requests.empty? end end
run()
click to toggle source
# File lib/tipi/digital_fabric/agent.rb, line 29 def run @fiber = Fiber.current @keep_alive_timer = spin_loop("#{@fiber.tag}-keep_alive", interval: 5) { keep_alive } while true connect_and_process_incoming_requests return if @shutdown sleep 5 end ensure @keep_alive_timer.stop end
send_df_message(msg)
click to toggle source
# File lib/tipi/digital_fabric/agent.rb, line 140 def send_df_message(msg) # we mark long-running requests by applying simple heuristics to sent DF # messages. This is so we can correctly stop long-running requests # upon graceful shutdown if is_long_running_request_response?(msg) id = msg[Protocol::Attribute::ID] @long_running_requests[id] = @requests[id] end @last_send = Time.now @socket << msg.to_msgpack end
ws_request(req)
click to toggle source
default handler for WS request
# File lib/tipi/digital_fabric/agent.rb, line 226 def ws_request(req) req.respond(nil, { ':status': Qeweney::Status::SERVICE_UNAVAILABLE }) end