class ReceptorController::Client::DirectiveBlocking

Blocking directive for requests through POST /job Successful POST causes locking current thread until response from Kafka comes

Raises kind of ReceptorController::Client::Error in case of problems/timeout

Attributes

response_data[RW]
response_exception[RW]
response_lock[RW]
response_waiting[RW]

Public Class Methods

new(name:, account:, node_id:, payload:, client:, log_message_common: nil) click to toggle source
# File lib/receptor_controller/client/directive_blocking.rb, line 9
def initialize(name:, account:, node_id:, payload:, client:, log_message_common: nil)
  super
  self.response_lock = Mutex.new
  self.response_waiting = ConditionVariable.new
  self.response_data = nil
  self.response_exception = nil
end

Public Instance Methods

call(body = default_body) click to toggle source
# File lib/receptor_controller/client/directive_blocking.rb, line 17
def call(body = default_body)
  @url = JSON.parse(body[:payload])['url']
  response = connection.post(config.job_path, body.to_json)

  msg_id = JSON.parse(response.body)['id']

  logger.debug("Receptor response [#{ReceptorController::Client::Configuration.default.queue_persist_ref}]: registering message #{msg_id}, href_slug: #{log_message_common}")
  # registers message id for kafka responses
  response_worker.register_message(msg_id, self)
  wait_for_response(msg_id)
rescue Faraday::Error => e
  msg = receptor_log_msg("Directive #{name} failed (#{log_message_common}) [MSG: #{msg_id}]", account, node_id, e)
  raise ReceptorController::Client::ControllerResponseError.new(msg)
end
response_error(msg_id, response_code, err_message) click to toggle source
# File lib/receptor_controller/client/directive_blocking.rb, line 56
def response_error(msg_id, response_code, err_message)
  response_lock.synchronize do
    self.response_data = nil
    self.response_exception = ReceptorController::Client::ResponseError.new("#{err_message} (code: #{response_code}) (#{log_message_common}) [MSG: #{msg_id}]")
    response_waiting.signal
  end
end
response_success(msg_id, message_type, response) click to toggle source

TODO: Review when future plugins with more “response” messages come

# File lib/receptor_controller/client/directive_blocking.rb, line 43
def response_success(msg_id, message_type, response)
  response_lock.synchronize do
    if message_type == MESSAGE_TYPE_RESPONSE
      self.response_data = response
    elsif message_type == MESSAGE_TYPE_EOF
      response_waiting.signal
    else
      self.response_exception = ReceptorController::Client::UnknownResponseTypeError.new("#{log_message_common}[MSG: #{msg_id}]")
      response_waiting.signal
    end
  end
end
response_timeout(msg_id) click to toggle source
# File lib/receptor_controller/client/directive_blocking.rb, line 64
def response_timeout(msg_id)
  response_lock.synchronize do
    self.response_data = nil
    self.response_exception = ReceptorController::Client::ResponseTimeoutError.new("Timeout (#{log_message_common}) [MSG: #{msg_id}]")
    response_waiting.signal
  end
end
wait_for_response(_msg_id) click to toggle source
# File lib/receptor_controller/client/directive_blocking.rb, line 32
def wait_for_response(_msg_id)
  response_lock.synchronize do
    response_waiting.wait(response_lock)

    raise response_exception if response_failed?

    response_data.dup
  end
end

Private Instance Methods

connection() click to toggle source
# File lib/receptor_controller/client/directive_blocking.rb, line 76
def connection
  @connection ||= Faraday.new(config.controller_url, :headers => client.headers(account)) do |c|
    c.use(Faraday::Response::RaiseError)
    c.adapter(Faraday.default_adapter)
  end
end
response_failed?() click to toggle source
# File lib/receptor_controller/client/directive_blocking.rb, line 83
def response_failed?
  response_exception.present?
end