class ReceptorController::Client::DirectiveNonBlocking
Non-blocking directive for requests sent through POST /job. The directive's call returns either the message ID or nil.
Callback blocks can be specified for handling responses. @example:
receiver = <object with methods below> directive .on_success do |msg_id, response| receiver.process_response(msg_id, response) end .on_error do |msg_id, code, response| receiver.process_error(msg_id, code, response) end .on_timeout do |msg_id| receiver.process_timeout(msg_id) end .on_eof do |msg_id| receiver.process_eof(msg_id) end .on_eof do |msg_id| logger.debug("[#{msg_id}] EOF message received") end directive.call
Public Class Methods
new(name:, account:, node_id:, payload:, client:, log_message_common: nil)
click to toggle source
Calls superclass method
ReceptorController::Client::Directive::new
# File lib/receptor_controller/client/directive_non_blocking.rb, line 30
# Builds a non-blocking directive.
#
# Every keyword argument is forwarded unchanged to the superclass
# initializer (bare +super+). On top of that, this method prepares the
# callback registries and the state used to hold back EOF handling.
def initialize(name:, account:, node_id:, payload:, client:, log_message_common: nil)
  super

  # One independent list of blocks per response kind; filled by the on_* methods.
  @success_callbacks, @eof_callbacks, @timeout_callbacks, @error_callbacks = Array.new(4) { [] }

  # Counter of response threads that have been started and not yet finished.
  @responses_count = Concurrent::AtomicFixnum.new

  # Lock and condition variable the EOF thread uses to wait on that counter.
  @eof_lock = Mutex.new
  @eof_wait = ConditionVariable.new
end
Public Instance Methods
call(body = default_body)
click to toggle source
Entrypoint for request
# File lib/receptor_controller/client/directive_non_blocking.rb, line 44
# Entrypoint for the request: POSTs +body+ (serialized as JSON) to the
# configured job URL and, on success, registers the returned message id
# with the response worker so later responses are routed to this directive.
#
# Never raises StandardError: every failure is logged and reported as nil.
#
# @param body the request payload; defaults to +default_body+
# @return the 'id' field of the parsed JSON response, or nil when the
#   request failed (non-success HTTP status, Faraday error or any other
#   StandardError)
def call(body = default_body)
  response = Faraday.post(config.job_url, body.to_json, client.headers(account))

  unless response.success?
    logger.error(receptor_log_msg("Directive #{name} failed (#{log_message_common}): HTTP #{response.status}", account, node_id))
    return nil
  end

  JSON.parse(response.body)['id'].tap do |msg_id|
    # registers message id for kafka responses
    response_worker.register_message(msg_id, self)
    logger.debug("Receptor response [#{ReceptorController::Client::Configuration.default.queue_persist_ref}]: registering message #{msg_id}, href_slug: #{log_message_common}")
  end
rescue Faraday::Error => e
  # Transport-level failure of the POST itself.
  logger.error(receptor_log_msg("Directive #{name} failed (#{log_message_common}). POST /job error", account, node_id, e))
  nil
rescue => e
  # Anything else, e.g. a response body that cannot be parsed.
  logger.error(receptor_log_msg("Directive #{name} failed (#{log_message_common})", account, node_id, e))
  nil
end
on_eof(&block)
click to toggle source
# File lib/receptor_controller/client/directive_non_blocking.rb, line 71
# Registers a block to be invoked with the message id when an EOF message
# arrives. Calling without a block registers nothing.
#
# @return [self] so that registrations can be chained
def on_eof(&block)
  @eof_callbacks.push(block) unless block.nil?
  self
end
on_error(&block)
click to toggle source
# File lib/receptor_controller/client/directive_non_blocking.rb, line 81
# Registers a block to be invoked with the message id, response code and
# response when an error response arrives. Calling without a block
# registers nothing.
#
# @return [self] so that registrations can be chained
def on_error(&block)
  @error_callbacks.push(block) unless block.nil?
  self
end
on_success(&block)
click to toggle source
# File lib/receptor_controller/client/directive_non_blocking.rb, line 66
# Registers a block to be invoked with the message id and response for
# every successful (non-EOF) response. Calling without a block registers
# nothing.
#
# @return [self] so that registrations can be chained
def on_success(&block)
  @success_callbacks.push(block) unless block.nil?
  self
end
on_timeout(&block)
click to toggle source
# File lib/receptor_controller/client/directive_non_blocking.rb, line 76
# Registers a block to be invoked with the message id when the response
# times out. Calling without a block registers nothing.
#
# @return [self] so that registrations can be chained
def on_timeout(&block)
  @timeout_callbacks.push(block) unless block.nil?
  self
end
response_error(msg_id, response_code, response)
click to toggle source
Handles error responses in threads. EOF processing waits until all response threads are finished.
# File lib/receptor_controller/client/directive_non_blocking.rb, line 102
# Dispatches an error response: inside a response thread, every registered
# error callback is called in registration order with the message id, the
# response code and the response.
#
# @return whatever +response_thread+ returns
def response_error(msg_id, response_code, response)
  response_thread do
    @error_callbacks.each do |callback|
      callback.call(msg_id, response_code, response)
    end
  end
end
response_success(msg_id, message_type, response)
click to toggle source
Handles successful responses in threads. EOF processing waits until all response threads are finished.
# File lib/receptor_controller/client/directive_non_blocking.rb, line 88
# Dispatches a successful response.
#
# An EOF message runs the EOF callbacks (message id only) through
# +eof_thread+; any other message type runs the success callbacks
# (message id and response) through +response_thread+.
#
# @return whatever the chosen thread helper returns
def response_success(msg_id, message_type, response)
  if message_type == MESSAGE_TYPE_EOF
    return eof_thread do
      @eof_callbacks.each { |callback| callback.call(msg_id) }
    end
  end

  response_thread do
    @success_callbacks.each { |callback| callback.call(msg_id, response) }
  end
end
response_timeout(msg_id)
click to toggle source
Error state: no response was received within `Configuration.response_timeout`.
# File lib/receptor_controller/client/directive_non_blocking.rb, line 109
# Dispatches a timeout: inside a response thread, every registered timeout
# callback is called in registration order with the message id.
#
# @return whatever +response_thread+ returns
def response_timeout(msg_id)
  response_thread do
    @timeout_callbacks.each do |callback|
      callback.call(msg_id)
    end
  end
end
Private Instance Methods
eof_thread() { || ... }
click to toggle source
Messages in Kafka are received serialized and EOF is always last,
so @responses_count stays positive
until all responses are processed.
# File lib/receptor_controller/client/directive_non_blocking.rb, line 135
# Runs the given block in a new thread once no response thread is pending.
#
# The thread blocks on @eof_wait for as long as @responses_count is
# positive. The counter is re-checked after every wake-up (loop instead of
# a single +if+): a condition variable may wake a waiter spuriously, and in
# that case the EOF block must not run while responses are still being
# processed.
#
# @yield invoked with no arguments after the wait is over
# @return [Thread] the thread that waits and then yields
def eof_thread
  Thread.new do
    @eof_lock.synchronize do
      @eof_wait.wait(@eof_lock) while @responses_count.value > 0
    end

    yield
  end
end
response_thread() { || ... }
click to toggle source
Responses are processed in threads to be able to call subrequests. The EOF response is blocked by a thread-safe counter.
# File lib/receptor_controller/client/directive_non_blocking.rb, line 119
# Runs the given block in a new thread while accounting for it in
# @responses_count.
#
# The counter is incremented before the thread is created and decremented
# when the block finishes, whether normally or by raising. When the counter
# reaches zero, @eof_wait is signalled under @eof_lock so that a waiting
# EOF thread can continue.
#
# @yield invoked with no arguments inside the new thread
# @return [Thread] the thread running the block
def response_thread
  @responses_count.increment

  Thread.new do
    begin
      yield
    ensure
      @responses_count.decrement
      @eof_lock.synchronize { @eof_wait.signal if @responses_count.value.zero? }
    end
  end
end