class ReceptorController::Client::ResponseWorker

ResponseWorker is listening on Kafka topic platform.receptor-controller.responses (@see Configuration.queue_topic) It asynchronously receives responses requested by POST /job to receptor controller. Request and response is paired by message ID (response of POST /job and 'in_response_to' value in kafka response here)

Successful responses are at least two:

Registered messages without response are removed after timeout (Configuration.response_timeout)

All type of responses/timeout can be sent to registered callbacks (@see :register_message)

Use “start” and “stop” methods to start/stop listening on Kafka

Constants

EOF
INITIALIZATION_TIMEOUT
RESPONSE

Attributes

config[RW]
lock[RW]
logger[RW]
received_messages[RW]
registered_messages[RW]
started[RW]
started?[R]
timeout_lock[RW]
workers[RW]

Public Class Methods

new(config, logger) click to toggle source
# File lib/receptor_controller/client/response_worker.rb, line 31
def initialize(config, logger)
  self.config              = config
  self.lock                = Mutex.new
  self.timeout_lock        = Mutex.new
  self.logger              = logger
  self.registered_messages = Concurrent::Map.new
  self.received_messages   = Concurrent::Array.new
  self.started             = Concurrent::AtomicBoolean.new(false)
  self.workers             = {}
end

Public Instance Methods

register_message(msg_id, receiver, response_callback: :response_success, timeout_callback: :response_timeout, error_callback: :response_error) click to toggle source

Registers message_id received by request, Defines response and timeout callback methods

@param msg_id [String] UUID @param receiver [Object] any object implementing callbacks @param response_callback [Symbol] name of receiver's method processing responses @param timeout_callback [Symbol] name of receiver's method processing timeout [optional] @param error_callback [Symbol] name of receiver's method processing errors [optional]

# File lib/receptor_controller/client/response_worker.rb, line 78
def register_message(msg_id, receiver, response_callback: :response_success, timeout_callback: :response_timeout, error_callback: :response_error)
  registered_messages[msg_id] = {:receiver          => receiver,
                                 :response_callback => response_callback,
                                 :timeout_callback  => timeout_callback,
                                 :error_callback    => error_callback,
                                 :last_checked_at   => Time.now.utc}
end
start() click to toggle source

Start listening on Kafka

# File lib/receptor_controller/client/response_worker.rb, line 43
def start
  init_lock, init_wait = Mutex.new, ConditionVariable.new

  lock.synchronize do
    return if started.value

    default_messaging_opts # Thread-safe init

    started.value         = true
    workers[:maintenance] = Thread.new { check_timeouts while started.value }
    workers[:listener]    = Thread.new { listen while started.value }
  end

  logger.info("Receptor Response worker started...")
end
stop() click to toggle source

Stop listener

# File lib/receptor_controller/client/response_worker.rb, line 60
def stop
  lock.synchronize do
    return unless started.value

    started.value = false
    workers[:listener]&.terminate
    workers[:maintenance]&.join
  end
end

Private Instance Methods

check_timeouts(threshold = config.response_timeout) click to toggle source
# File lib/receptor_controller/client/response_worker.rb, line 170
def check_timeouts(threshold = config.response_timeout)
  expired = []
  #
  # STEP 1 Collect expired messages
  #
  registered_messages.each_pair do |message_id, callbacks|
    timeout_lock.synchronize do
      if callbacks[:last_checked_at] < Time.now.utc - threshold
        expired << message_id
      end
    end
  end

  #
  # STEP 2 Remove expired messages, send timeout callbacks
  #
  expired.each do |message_id|
    callbacks = registered_messages.delete(message_id)
    if callbacks[:receiver].respond_to?(callbacks[:timeout_callback])
      callbacks[:receiver].send(callbacks[:timeout_callback], message_id)
    end
  end

  sleep(config.response_timeout_poll_time)
rescue => err
  logger.error("Exception in maintenance worker: #{err}\n#{err.backtrace.join("\n")}")
end
default_messaging_opts() click to toggle source
# File lib/receptor_controller/client/response_worker.rb, line 237
def default_messaging_opts
  return @default_messaging_opts if @default_messaging_opts

  @default_messaging_opts = {
    :host       => config.queue_host,
    :port       => config.queue_port,
    :protocol   => :Kafka,
    :client_ref => "receptor_client-responses-#{Time.now.to_i}", # A reference string to identify the client
  }
end
gzipped?(data) click to toggle source

GZIP recognition tools.ietf.org/html/rfc1952#page-5

# File lib/receptor_controller/client/response_worker.rb, line 200
def gzipped?(data)
  sign = data.to_s.bytes[0..1]

  sign[0] == '0x1f'.hex && sign[1] == '0x8b'.hex
end
listen() click to toggle source
# File lib/receptor_controller/client/response_worker.rb, line 91
def listen
  # Open a connection to the messaging service
  client = ManageIQ::Messaging::Client.open(default_messaging_opts)

  client.subscribe_topic(queue_opts) do |message|
    process_message(message)
  end
rescue => err
  logger.error(response_log("Exception in kafka listener: #{err}\n#{err.backtrace.join("\n")}"))
ensure
  client&.close
end
log_received_message(callbacks, message_id, response) click to toggle source
# File lib/receptor_controller/client/response_worker.rb, line 248
def log_received_message(callbacks, message_id, response)
  log_all = (ENV["LOG_ALL_RECEPTOR_MESSAGES"] || 0).to_i != 0
  if log_all || (!log_all && callbacks.present?)
    logger.debug(response_log("Received message #{message_id}: serial: #{response["serial"]}, type: #{response['message_type']}, payload: #{response['payload'] || "n/a"}"))
  end
end
process_message(message) click to toggle source
# File lib/receptor_controller/client/response_worker.rb, line 104
def process_message(message)
  response = JSON.parse(message.payload)

  if (message_id = response['in_response_to'])
    callbacks = registered_messages[message_id]
    log_received_message(callbacks, message_id, response)

    if callbacks.present?
      # Reset last_checked_at to avoid timeout in multi-response messages
      reset_last_checked_at(callbacks)

      if response['code'] == 0
        #
        # Response OK
        #
        message_type = response['message_type'] # "response" (with data) or "eof" (without data)
        payload = response['payload']
        callbacks[:received_msgs] ? callbacks[:received_msgs] += 1 : callbacks[:received_msgs] = 1

        case message_type
        when EOF
          # Store how many messages are needed to be received for this request
          callbacks[:total_msgs] = response["serial"]
        when RESPONSE
          payload = unpack_payload(payload) if payload.kind_of?(String)
          callbacks[:msg_size] ? callbacks[:msg_size] += payload.size : callbacks[:msg_size] = payload.size
          callbacks[:receiver].send(callbacks[:response_callback], message_id, message_type, payload)
        else
          # Send the callback to release the thread.
          logger.warn(response_log("Unexpected type | message #{message_id}, type: #{message_type}"))
          callbacks[:receiver].send(callbacks[:response_callback], message_id, message_type, payload)
        end

        # We received all the messages, complete the message.
        if callbacks[:received_msgs] == callbacks[:total_msgs]
          registered_messages.delete(message_id)
          callbacks[:receiver].send(callbacks[:response_callback], message_id, EOF, payload)
          logger.debug(response_log("Message #{message_id} complete, total bytes: #{callbacks[:msg_size]}"))
        end

        logger.debug(response_log("OK | message: #{message_id}, serial: #{response["serial"]}, type: #{message_type}, payload: #{payload || "n/a"}"))
      else
        #
        # Response Error
        #
        registered_messages.delete(message_id)

        logger.error(response_log("ERROR | message #{message_id} (#{response})"))

        callbacks[:receiver].send(callbacks[:error_callback], message_id, response['code'], response['payload'])
      end
    elsif (ENV["LOG_ALL_RECEPTOR_MESSAGES"] || 0)&.to_i != 0
      # noop, it's not error if not registered, can be processed by another pod
      logger.debug(response_log("NOT REGISTERED | #{message_id} (#{response['code']})"))
    end
  else
    logger.error(response_log("MISSING | Message id (in_response_to) not received! #{response}"))
  end
rescue JSON::ParserError => e
  logger.error(response_log("Failed to parse Kafka response (#{e.message})\n#{message.payload}"))
rescue => e
  logger.error(response_log("#{e}\n#{e.backtrace.join("\n")}"))
ensure
  message.ack unless config.queue_auto_ack
end
queue_opts() click to toggle source

No persist_ref here, because all instances (pods) needs to receive kafka message TODO: temporary changed to unique persist_ref

# File lib/receptor_controller/client/response_worker.rb, line 227
def queue_opts
  return @queue_opts if @queue_opts

  @queue_opts               = {:service  => config.queue_topic,
                               :auto_ack => config.queue_auto_ack}
  @queue_opts[:max_bytes]   = config.queue_max_bytes if config.queue_max_bytes
  @queue_opts[:persist_ref] = ENV['HOSTNAME']
  @queue_opts
end
reset_last_checked_at(callbacks) click to toggle source

Reset last_checked_at to avoid timeout in multi-response messages

# File lib/receptor_controller/client/response_worker.rb, line 219
def reset_last_checked_at(callbacks)
  timeout_lock.synchronize do
    callbacks[:last_checked_at] = Time.now.utc
  end
end
response_log(message) click to toggle source
# File lib/receptor_controller/client/response_worker.rb, line 255
def response_log(message)
  "Receptor Response [#{queue_opts[:persist_ref]}]: #{message}"
end
unpack_payload(data) click to toggle source

Tries to decompress String response If not a gzip, it's a String error from receptor node

# File lib/receptor_controller/client/response_worker.rb, line 208
def unpack_payload(data)
  decoded = Base64.decode64(data)
  if gzipped?(decoded)
    gz = Zlib::GzipReader.new(StringIO.new(decoded))
    JSON.parse(gz.read)
  else
    data
  end
end