class ReceptorController::Client::DirectiveNonBlocking

Non-blocking directive for requests through POST /job Directive's call returns either message ID or nil

Callback blocks can be specified for handling responses @example:

receiver = <object with methods below>
directive
  .on_success do |msg_id, response|
    receiver.process_response(msg_id, response)
  end
  .on_error do |msg_id, code, response|
    receiver.process_error(msg_id, code, response)
  end
  .on_timeout do |msg_id|
    receiver.process_timeout(msg_id)
  end
  .on_eof do |msg_id|
    receiver.process_eof(msg_id)
  end
  .on_eof do |msg_id|
    logger.debug("[#{msg_id}] EOF message received")
  end

directive.call

Public Class Methods

new(name:, account:, node_id:, payload:, client:, log_message_common: nil) click to toggle source
# File lib/receptor_controller/client/directive_non_blocking.rb, line 30
def initialize(name:, account:, node_id:, payload:, client:, log_message_common: nil)
  super

  @success_callbacks = []
  @eof_callbacks     = []
  @timeout_callbacks = []
  @error_callbacks   = []

  @responses_count = Concurrent::AtomicFixnum.new
  @eof_lock        = Mutex.new
  @eof_wait        = ConditionVariable.new
end

Public Instance Methods

call(body = default_body) click to toggle source

Entrypoint for request

# File lib/receptor_controller/client/directive_non_blocking.rb, line 44
def call(body = default_body)
  response = Faraday.post(config.job_url, body.to_json, client.headers(account))
  if response.success?
    msg_id = JSON.parse(response.body)['id']

    # registers message id for kafka responses
    response_worker.register_message(msg_id, self)
    logger.debug("Receptor response [#{ReceptorController::Client::Configuration.default.queue_persist_ref}]: registering message #{msg_id}, href_slug: #{log_message_common}")

    msg_id
  else
    logger.error(receptor_log_msg("Directive #{name} failed (#{log_message_common}): HTTP #{response.status}", account, node_id))
    nil
  end
rescue Faraday::Error => e
  logger.error(receptor_log_msg("Directive #{name} failed (#{log_message_common}). POST /job error", account, node_id, e))
  nil
rescue => e
  logger.error(receptor_log_msg("Directive #{name} failed (#{log_message_common})", account, node_id, e))
  nil
end
on_eof(&block) click to toggle source
# File lib/receptor_controller/client/directive_non_blocking.rb, line 71
def on_eof(&block)
  @eof_callbacks << block if block_given?
  self
end
on_error(&block) click to toggle source
# File lib/receptor_controller/client/directive_non_blocking.rb, line 81
def on_error(&block)
  @error_callbacks << block if block_given?
  self
end
on_success(&block) click to toggle source
# File lib/receptor_controller/client/directive_non_blocking.rb, line 66
def on_success(&block)
  @success_callbacks << block if block_given?
  self
end
on_timeout(&block) click to toggle source
# File lib/receptor_controller/client/directive_non_blocking.rb, line 76
def on_timeout(&block)
  @timeout_callbacks << block if block_given?
  self
end
response_error(msg_id, response_code, response) click to toggle source

Handles error responses in Threads EOF processing waits until all threads are finished

# File lib/receptor_controller/client/directive_non_blocking.rb, line 102
def response_error(msg_id, response_code, response)
  response_thread do
    @error_callbacks.each { |block| block.call(msg_id, response_code, response) }
  end
end
response_success(msg_id, message_type, response) click to toggle source

Handles successful responses in Threads EOF processing waits until all response threads are finished

# File lib/receptor_controller/client/directive_non_blocking.rb, line 88
def response_success(msg_id, message_type, response)
  if message_type == MESSAGE_TYPE_EOF
    eof_thread do
      @eof_callbacks.each { |block| block.call(msg_id) }
    end
  else
    response_thread do
      @success_callbacks.each { |block| block.call(msg_id, response) }
    end
  end
end
response_timeout(msg_id) click to toggle source

Error state: Any response wasn't received in `Configuration.response_timeout`

# File lib/receptor_controller/client/directive_non_blocking.rb, line 109
def response_timeout(msg_id)
  response_thread do
    @timeout_callbacks.each { |block| block.call(msg_id) }
  end
end

Private Instance Methods

eof_thread() { || ... } click to toggle source

Messages in kafka are received serialized, EOF is always last

> @responses_count has to be always positive

until all responses are processed
# File lib/receptor_controller/client/directive_non_blocking.rb, line 135
def eof_thread
  Thread.new do
    @eof_lock.synchronize do
      @eof_wait.wait(@eof_lock) if @responses_count.value > 0
    end

    yield
  end
end
response_thread() { || ... } click to toggle source

Responses are processed in threads to be able to call subrequests EOF response is blocked by thread-safe counter

# File lib/receptor_controller/client/directive_non_blocking.rb, line 119
def response_thread
  @responses_count.increment

  Thread.new do
    yield
  ensure
    @responses_count.decrement
    @eof_lock.synchronize do
      @eof_wait.signal if @responses_count.value == 0
    end
  end
end