class DigitalFabric::Service
Constants
- INVALID_HOST
Attributes
stats[R]
timer[R]
token[R]
Public Class Methods
new(token: )
click to toggle source
# File lib/tipi/digital_fabric/service.rb, line 12
# Creates a service with no mounted agents and zeroed counters.
#
# @param token [Object] the value an agent must present in its 'df-token'
#   request header for df_upgrade to accept it
def initialize(token:)
  @token = token
  @agents = {}
  @routes = {}
  # find_agent searches @route_keys and only recompiles it when
  # @routing_changed is set, so both need a usable value before the first
  # agent is mounted; otherwise a request arriving early hits nil.
  @route_keys = []
  @routing_changed = false
  @executive = nil
  @shutdown = false
  @counters = {
    connections: 0,
    http_requests: 0,
    errors: 0
  }
  @connection_count = 0
  @current_request_count = 0
  @http_latency_accumulator = 0
  @http_latency_counter = 0
  @http_latency_max = 0
  # The baseline snapshot is back-dated by one second; presumably so that the
  # first calculate_stats call never sees a zero elapsed interval.
  @last_counters = @counters.merge(stamp: Time.now.to_f - 1)
  @fiber = Fiber.current
  # @timer = Polyphony::Timer.new('service_timer', resolution: 5)
end
Public Instance Methods
calculate_stats()
click to toggle source
# File lib/tipi/digital_fabric/service.rb, line 31
# Builds a statistics snapshot covering the interval since the previous call.
#
# Side effects: replaces @last_counters with the current counters (stamped
# with the current time) and resets the latency accumulator, counter and
# maximum to zero.
#
# @return [Hash] a hash with :service, :backend and :process sub-hashes
def calculate_stats
  now = Time.now.to_f
  previous = @last_counters
  elapsed = now - previous[:stamp]
  connections_delta = @counters[:connections] - previous[:connections]
  requests_delta = @counters[:http_requests] - previous[:http_requests]
  errors_delta = @counters[:errors] - previous[:errors]
  @last_counters = @counters.merge(stamp: now)

  # Latency figures are per-interval, so they are read and then cleared.
  latency_avg =
    if @http_latency_counter == 0
      0
    else
      @http_latency_accumulator / @http_latency_counter
    end
  latency_max = @http_latency_max
  @http_latency_accumulator = 0
  @http_latency_counter = 0
  @http_latency_max = 0

  cpu, rss = pid_cpu_and_rss(Process.pid)
  backend = Thread.backend.stats
  objects = ObjectSpace.count_objects

  service_stats = {
    agent_count: @agents.size,
    connection_count: @connection_count,
    connection_rate: connections_delta / elapsed,
    error_rate: errors_delta / elapsed,
    http_request_rate: requests_delta / elapsed,
    latency_avg: latency_avg,
    latency_max: latency_max,
    pending_requests: @current_request_count,
  }
  backend_section = {
    op_rate: backend[:op_count] / elapsed,
    pending_ops: backend[:pending_ops],
    poll_rate: backend[:poll_count] / elapsed,
    runqueue_size: backend[:runqueue_size],
    runqueue_high_watermark: backend[:runqueue_max_length],
    switch_rate: backend[:switch_count] / elapsed,
  }
  process_stats = {
    cpu_usage: cpu,
    # rss is divided by 1024; nil (lookup failed) becomes 0.0 via to_f.
    rss: rss.to_f / 1024,
    objects_total: objects[:TOTAL],
    objects_free: objects[:FREE]
  }

  { service: service_stats, backend: backend_section, process: process_stats }
end
compile_agent_routes()
click to toggle source
# File lib/tipi/digital_fabric/service.rb, line 229
# Rebuilds the route => agent lookup table from the mounted agents and clears
# the @routing_changed flag.
#
# Agents are visited in reverse insertion order and a route is only assigned
# once, so when several agents share an equal route the most recently
# mounted one wins.
#
# @return [Array] the compiled route keys (also stored in @route_keys)
def compile_agent_routes
  @routing_changed = false
  @routes.clear
  @agents.reverse_each { |agent, route| @routes[route] ||= agent }
  @route_keys = @routes.keys
end
decr_connection_count()
click to toggle source
# File lib/tipi/digital_fabric/service.rb, line 102
# Decrements the open-connection gauge by one.
#
# @return [Integer] the updated connection count
def decr_connection_count
  @connection_count -= 1
end
df_upgrade(req)
click to toggle source
# File lib/tipi/digital_fabric/service.rb, line 182
# Handles a 'df' protocol upgrade: checks the request's 'df-token' header
# against the service token, writes the upgrade response to the connection
# and hands the request to a new AgentProxy.
#
# @param req the incoming request (responds to headers, respond and
#   adapter.conn)
# @return the result of req.respond when the token does not match, otherwise
#   the new AgentProxy
def df_upgrade(req)
  # Connected agents should not show up as pending requests: take this
  # request out of the gauge for the duration of the call. The ensure below
  # puts it back so the decrement in http_request's own ensure stays balanced.
  @current_request_count -= 1

  authorized = req.headers['df-token'] == @token
  return req.respond(nil, ':status' => Qeweney::Status::FORBIDDEN) unless authorized

  connection = req.adapter.conn
  connection << Protocol.df_upgrade_response
  AgentProxy.new(self, req)
ensure
  @current_request_count += 1
end
find_agent(req)
click to toggle source
# File lib/tipi/digital_fabric/service.rb, line 215
# Finds the agent whose route matches the request.
#
# A route matches when its :host equals the request host (':authority'
# header, falling back to 'host') or when its :path_regexp matches the
# ':path' header. The first matching route in @route_keys wins.
#
# @param req the incoming request (responds to headers)
# @return the matching agent, or nil when no route matches or no routes have
#   been compiled yet
def find_agent(req)
  compile_agent_routes if @routing_changed
  # @route_keys is only assigned by compile_agent_routes; if no agent has
  # been mounted yet there is nothing to search.
  return nil unless @route_keys

  # INVALID_HOST stands in for a missing host header so that a nil host can
  # never equal the nil :host of a path-only route.
  host = req.headers[':authority'] || req.headers['host'] || INVALID_HOST
  path = req.headers[':path']

  matched = @route_keys.find do |candidate|
    (host == candidate[:host]) || (path =~ candidate[:path_regexp])
  end
  matched && @routes[matched]
end
get_stats()
click to toggle source
# File lib/tipi/digital_fabric/service.rb, line 94
# Returns a fresh statistics snapshot by delegating to calculate_stats.
# Each call therefore also resets the per-interval counters that
# calculate_stats resets.
#
# @return [Hash] whatever calculate_stats returns
def get_stats
  calculate_stats
end
graceful_shutdown()
click to toggle source
# File lib/tipi/digital_fabric/service.rb, line 244
# Starts a graceful shutdown: sets @shutdown, asks every agent that supports
# it to shut down, drops the agents that do not, and then waits until the
# agent table is empty.
#
# The wait polls every 0.25 seconds inside move_on_after(60); that helper is
# defined outside this file and presumably abandons the wait after 60
# seconds.
def graceful_shutdown
  @shutdown = true

  # Iterate over a copy of the keys because entries are deleted on the way.
  @agents.keys.each do |agent|
    agent.respond_to?(:send_shutdown) ? agent.send_shutdown : @agents.delete(agent)
  end

  move_on_after(60) { sleep 0.25 until @agents.empty? }
end
http_request(req, allow_df_upgrade = false)
click to toggle source
# File lib/tipi/digital_fabric/service.rb, line 127
# Entry point for an incoming HTTP request: updates the counters, then either
# hands an upgrade request to upgrade_request or forwards the request to the
# agent selected by find_agent.
#
# @param req the incoming request
# @param allow_df_upgrade [Boolean] passed through to upgrade_request
# @return the result of whichever handler or responder ran
def http_request(req, allow_df_upgrade = false)
  @current_request_count += 1
  @counters[:http_requests] += 1
  # A request carrying the ':first' header counts as a new connection.
  @counters[:connections] += 1 if req.headers[':first']

  return upgrade_request(req, allow_df_upgrade) if req.upgrade_protocol

  inject_request_headers(req)
  if (agent = find_agent(req))
    agent.http_request(req)
  else
    # No agent is mounted for this host/path.
    @counters[:errors] += 1
    req.respond(nil, ':status' => Qeweney::Status::SERVICE_UNAVAILABLE)
  end
rescue IOError, SystemCallError, HTTP2::Error::StreamClosed
  # These are counted as errors but no response is attempted.
  @counters[:errors] += 1
rescue => error
  @counters[:errors] += 1
  puts '*' * 40
  p req
  p error
  puts error.backtrace.join("\n")
  req.respond(error.inspect, ':status' => Qeweney::Status::INTERNAL_SERVER_ERROR)
ensure
  @current_request_count -= 1
  # Once a shutdown has started, the connection is closed after each request.
  req.adapter.conn.close if @shutdown
end
incr_connection_count()
click to toggle source
# File lib/tipi/digital_fabric/service.rb, line 98
# Increments the open-connection gauge by one.
#
# @return [Integer] the updated connection count
def incr_connection_count
  @connection_count += 1
end
inject_request_headers(req)
click to toggle source
# File lib/tipi/digital_fabric/service.rb, line 156
# Adds proxy headers to the request before it is forwarded:
#
# - 'x-request-id' is always set to a new UUID
# - 'x-forwarded-for' is always overwritten with element 2 of the
#   connection's peeraddr(false)
# - 'x-forwarded-proto' is only filled in when absent: 'https' for an
#   OpenSSL::SSL::SSLSocket connection, 'http' otherwise
#
# @param req the incoming request (responds to headers and adapter.conn)
# @return [String] the resulting 'x-forwarded-proto' value
def inject_request_headers(req)
  headers = req.headers
  headers['x-request-id'] = SecureRandom.uuid

  socket = req.adapter.conn
  headers['x-forwarded-for'] = socket.peeraddr(false)[2]
  headers['x-forwarded-proto'] ||= (socket.is_a?(OpenSSL::SSL::SSLSocket) ? 'https' : 'http')
end
mount(route, agent)
click to toggle source
# File lib/tipi/digital_fabric/service.rb, line 195
# Registers an agent under the given route and flags the routing table for
# recompilation.
#
# Note that the route hash passed in is modified: when it has a :path, the
# compiled pattern is stored back into it under :path_regexp.
#
# @param route [Hash] route description; :path and :executive are read here
# @param agent the agent to register; it also becomes @executive when the
#   route's :executive entry is truthy
# @return [true]
def mount(route, agent)
  path = route[:path]
  route[:path_regexp] = path_regexp(path) if path
  @executive = agent if route[:executive]
  @agents[agent] = route
  @routing_changed = true
end
path_regexp(path)
click to toggle source
# File lib/tipi/digital_fabric/service.rb, line 240
# Builds the pattern used to match a request path against a mounted route.
#
# The pattern is a prefix match anchored with \A at the very start of the
# string; Ruby's ^ would also match after any newline inside the string.
# The path is interpolated as-is, so regexp metacharacters in it keep their
# special meaning.
#
# @param path [String] the route's path prefix
# @return [Regexp]
def path_regexp(path)
  /\A#{path}/
end
pid_cpu_and_rss(pid)
click to toggle source
# File lib/tipi/digital_fabric/service.rb, line 84
# Looks up CPU usage and resident set size for a process using `ps`.
#
# This is best-effort: any ordinary failure (ps missing, process not found,
# unparsable output, invalid pid) yields [nil, nil] instead of raising.
#
# @param pid [Integer] process id to inspect
# @return [Array] [cpu, rss] with cpu as a Float and rss as an Integer as
#   reported by ps, or [nil, nil] when the lookup fails
def pid_cpu_and_rss(pid)
  # Integer() rejects anything that is not a plain number, so no arbitrary
  # text can reach the shell through the interpolation.
  output = `ps -p #{Integer(pid)} -o %cpu,rss`
  # Line 0 is the column header; line 1 holds the values.
  cpu, rss = output.lines[1].split
  [cpu.to_f, rss.to_i]
rescue Polyphony::BaseException
  raise
rescue StandardError
  # Only ordinary errors are swallowed; signals, SystemExit and the like
  # propagate to the caller.
  [nil, nil]
end
record_latency_measurement(latency, req)
click to toggle source
# File lib/tipi/digital_fabric/service.rb, line 118
# Folds one request latency into the running totals used by calculate_stats
# and logs the request headers when the latency is not below 1.0.
#
# @param latency [Numeric] measured latency for the request
# @param req the request; only its headers are read, and only for slow
#   requests
# @return [nil]
def record_latency_measurement(latency, req)
  @http_latency_accumulator += latency
  @http_latency_counter += 1
  @http_latency_max = latency if @http_latency_max < latency

  puts format('slow request (%.1f): %p', latency, req.headers) unless latency < 1.0
end
total_request_count()
click to toggle source
# File lib/tipi/digital_fabric/service.rb, line 108
# Sums current_request_count over all mounted agents, skipping agents that
# do not respond to that method.
#
# @return [Integer] the combined count, 0 when no agent reports one
def total_request_count
  @agents.keys.reduce(0) do |total, agent|
    if agent.respond_to?(:current_request_count)
      total + agent.current_request_count
    else
      total
    end
  end
end
unmount(agent)
click to toggle source
# File lib/tipi/digital_fabric/service.rb, line 204
# Removes an agent from the service and flags the routing table for
# recompilation. Unknown agents are ignored.
#
# @param agent the agent to remove
# @return [true, nil] true when the agent was mounted, nil otherwise
def unmount(agent)
  route = @agents[agent]
  if route
    # Clear the executive slot when this agent's route was marked executive.
    @executive = nil if route[:executive]
    @agents.delete(agent)
    @routing_changed = true
  end
end
upgrade_request(req, allow_df_upgrade)
click to toggle source
# File lib/tipi/digital_fabric/service.rb, line 163
# Dispatches a protocol upgrade request.
#
# The 'df' protocol is handled by df_upgrade when allowed and refused with
# SERVICE_UNAVAILABLE otherwise. Any other protocol is forwarded to the agent
# selected by find_agent; when there is none, the error counter is bumped and
# SERVICE_UNAVAILABLE is returned.
#
# @param req the incoming request
# @param allow_df_upgrade [Boolean] whether 'df' upgrades are accepted
# @return the result of the handler or responder that ran
def upgrade_request(req, allow_df_upgrade)
  protocol = req.upgrade_protocol

  if protocol == 'df'
    return df_upgrade(req) if allow_df_upgrade

    return req.respond(nil, ':status' => Qeweney::Status::SERVICE_UNAVAILABLE)
  end

  agent = find_agent(req)
  return agent.http_upgrade(req, protocol) if agent

  @counters[:errors] += 1
  req.respond(nil, ':status' => Qeweney::Status::SERVICE_UNAVAILABLE)
end