class ReceptorController::Client::ResponseWorker
ResponseWorker
is listening on Kafka topic platform.receptor-controller.responses (@see Configuration.queue_topic) It asynchronously receives responses requested by POST /job to receptor controller. Request and response is paired by message ID (response of POST /job and 'in_response_to' value in kafka response here)
Successful responses are at least two:
-
1+ of 'response' type, containing data
-
1 of 'eof' type, signalizing end of transmission
Registered messages without response are removed after timeout (Configuration.response_timeout)
All type of responses/timeout can be sent to registered callbacks (@see :register_message)
Use “start” and “stop” methods to start/stop listening on Kafka
Constants
- EOF
- INITIALIZATION_TIMEOUT
- RESPONSE
Attributes
Public Class Methods
# File lib/receptor_controller/client/response_worker.rb, line 31 def initialize(config, logger) self.config = config self.lock = Mutex.new self.timeout_lock = Mutex.new self.logger = logger self.registered_messages = Concurrent::Map.new self.received_messages = Concurrent::Array.new self.started = Concurrent::AtomicBoolean.new(false) self.workers = {} end
Public Instance Methods
Registers message_id received by request, Defines response and timeout callback methods
@param msg_id [String] UUID @param receiver [Object] any object implementing callbacks @param response_callback [Symbol] name of receiver's method processing responses @param timeout_callback [Symbol] name of receiver's method processing timeout [optional] @param error_callback [Symbol] name of receiver's method processing errors [optional]
# File lib/receptor_controller/client/response_worker.rb, line 78 def register_message(msg_id, receiver, response_callback: :response_success, timeout_callback: :response_timeout, error_callback: :response_error) registered_messages[msg_id] = {:receiver => receiver, :response_callback => response_callback, :timeout_callback => timeout_callback, :error_callback => error_callback, :last_checked_at => Time.now.utc} end
Start listening on Kafka
# File lib/receptor_controller/client/response_worker.rb, line 43 def start init_lock, init_wait = Mutex.new, ConditionVariable.new lock.synchronize do return if started.value default_messaging_opts # Thread-safe init started.value = true workers[:maintenance] = Thread.new { check_timeouts while started.value } workers[:listener] = Thread.new { listen while started.value } end logger.info("Receptor Response worker started...") end
Stop listener
# File lib/receptor_controller/client/response_worker.rb, line 60 def stop lock.synchronize do return unless started.value started.value = false workers[:listener]&.terminate workers[:maintenance]&.join end end
Private Instance Methods
# File lib/receptor_controller/client/response_worker.rb, line 170 def check_timeouts(threshold = config.response_timeout) expired = [] # # STEP 1 Collect expired messages # registered_messages.each_pair do |message_id, callbacks| timeout_lock.synchronize do if callbacks[:last_checked_at] < Time.now.utc - threshold expired << message_id end end end # # STEP 2 Remove expired messages, send timeout callbacks # expired.each do |message_id| callbacks = registered_messages.delete(message_id) if callbacks[:receiver].respond_to?(callbacks[:timeout_callback]) callbacks[:receiver].send(callbacks[:timeout_callback], message_id) end end sleep(config.response_timeout_poll_time) rescue => err logger.error("Exception in maintenance worker: #{err}\n#{err.backtrace.join("\n")}") end
# File lib/receptor_controller/client/response_worker.rb, line 237 def default_messaging_opts return @default_messaging_opts if @default_messaging_opts @default_messaging_opts = { :host => config.queue_host, :port => config.queue_port, :protocol => :Kafka, :client_ref => "receptor_client-responses-#{Time.now.to_i}", # A reference string to identify the client } end
GZIP recognition tools.ietf.org/html/rfc1952#page-5
# File lib/receptor_controller/client/response_worker.rb, line 200 def gzipped?(data) sign = data.to_s.bytes[0..1] sign[0] == '0x1f'.hex && sign[1] == '0x8b'.hex end
# File lib/receptor_controller/client/response_worker.rb, line 91 def listen # Open a connection to the messaging service client = ManageIQ::Messaging::Client.open(default_messaging_opts) client.subscribe_topic(queue_opts) do |message| process_message(message) end rescue => err logger.error(response_log("Exception in kafka listener: #{err}\n#{err.backtrace.join("\n")}")) ensure client&.close end
# File lib/receptor_controller/client/response_worker.rb, line 248 def log_received_message(callbacks, message_id, response) log_all = (ENV["LOG_ALL_RECEPTOR_MESSAGES"] || 0).to_i != 0 if log_all || (!log_all && callbacks.present?) logger.debug(response_log("Received message #{message_id}: serial: #{response["serial"]}, type: #{response['message_type']}, payload: #{response['payload'] || "n/a"}")) end end
# File lib/receptor_controller/client/response_worker.rb, line 104 def process_message(message) response = JSON.parse(message.payload) if (message_id = response['in_response_to']) callbacks = registered_messages[message_id] log_received_message(callbacks, message_id, response) if callbacks.present? # Reset last_checked_at to avoid timeout in multi-response messages reset_last_checked_at(callbacks) if response['code'] == 0 # # Response OK # message_type = response['message_type'] # "response" (with data) or "eof" (without data) payload = response['payload'] callbacks[:received_msgs] ? callbacks[:received_msgs] += 1 : callbacks[:received_msgs] = 1 case message_type when EOF # Store how many messages are needed to be received for this request callbacks[:total_msgs] = response["serial"] when RESPONSE payload = unpack_payload(payload) if payload.kind_of?(String) callbacks[:msg_size] ? callbacks[:msg_size] += payload.size : callbacks[:msg_size] = payload.size callbacks[:receiver].send(callbacks[:response_callback], message_id, message_type, payload) else # Send the callback to release the thread. logger.warn(response_log("Unexpected type | message #{message_id}, type: #{message_type}")) callbacks[:receiver].send(callbacks[:response_callback], message_id, message_type, payload) end # We received all the messages, complete the message. if callbacks[:received_msgs] == callbacks[:total_msgs] registered_messages.delete(message_id) callbacks[:receiver].send(callbacks[:response_callback], message_id, EOF, payload) logger.debug(response_log("Message #{message_id} complete, total bytes: #{callbacks[:msg_size]}")) end logger.debug(response_log("OK | message: #{message_id}, serial: #{response["serial"]}, type: #{message_type}, payload: #{payload || "n/a"}")) else # # Response Error # registered_messages.delete(message_id) logger.error(response_log("ERROR | message #{message_id} (#{response})")) callbacks[:receiver].send(callbacks[:error_callback], message_id, response['code'], response['payload']) end elsif (ENV["LOG_ALL_RECEPTOR_MESSAGES"] || 0)&.to_i != 0 # noop, it's not error if not registered, can be processed by another pod logger.debug(response_log("NOT REGISTERED | #{message_id} (#{response['code']})")) end else logger.error(response_log("MISSING | Message id (in_response_to) not received! #{response}")) end rescue JSON::ParserError => e logger.error(response_log("Failed to parse Kafka response (#{e.message})\n#{message.payload}")) rescue => e logger.error(response_log("#{e}\n#{e.backtrace.join("\n")}")) ensure message.ack unless config.queue_auto_ack end
No persist_ref here, because all instances (pods) needs to receive kafka message TODO: temporary changed to unique persist_ref
# File lib/receptor_controller/client/response_worker.rb, line 227 def queue_opts return @queue_opts if @queue_opts @queue_opts = {:service => config.queue_topic, :auto_ack => config.queue_auto_ack} @queue_opts[:max_bytes] = config.queue_max_bytes if config.queue_max_bytes @queue_opts[:persist_ref] = ENV['HOSTNAME'] @queue_opts end
Reset last_checked_at to avoid timeout in multi-response messages
# File lib/receptor_controller/client/response_worker.rb, line 219 def reset_last_checked_at(callbacks) timeout_lock.synchronize do callbacks[:last_checked_at] = Time.now.utc end end
# File lib/receptor_controller/client/response_worker.rb, line 255 def response_log(message) "Receptor Response [#{queue_opts[:persist_ref]}]: #{message}" end
Tries to decompress String response If not a gzip, it's a String error from receptor node
# File lib/receptor_controller/client/response_worker.rb, line 208 def unpack_payload(data) decoded = Base64.decode64(data) if gzipped?(decoded) gz = Zlib::GzipReader.new(StringIO.new(decoded)) JSON.parse(gz.read) else data end end