class Protein::AMQPAdapter
Attributes
calls[RW]
reply_queue[R]
Public Class Methods
call(request_payload)
click to toggle source
# File lib/protein/amqp_adapter.rb, line 40
#
# Publishes +request_payload+ to the server queue and blocks the calling
# thread until the matching reply arrives on the reply queue or the
# configured timeout elapses.
#
# The pending call is registered in +calls+ BEFORE the request is published,
# so a reply that arrives immediately can always find its entry. The wait
# itself is a predicate loop bounded by a monotonic deadline, which makes it
# immune to lost and spurious wake-ups.
#
# @param request_payload the payload handed to the exchange's +publish+
# @return the reply payload stored for this call by the reply-queue consumer
# @raise [TransportError] when no reply arrived within +timeout+ milliseconds,
#   or when the reply is the "ESRV" marker
def call(request_payload)
  prepare_client

  call_id = SecureRandom.uuid
  timeout_ms = timeout
  mutex = Mutex.new
  condition = ConditionVariable.new

  call = Concurrent::Hash.new
  call[:mutex] = mutex
  call[:condition] = condition

  # Register first: once the request is on the wire the reply may come back
  # at any moment, and the consumer ignores replies it cannot look up.
  calls[call_id] = call

  begin
    @x.publish(request_payload,
      correlation_id: call_id,
      routing_key: @server_queue,
      reply_to: @reply_queue.name,
      expiration: timeout_ms)

    # A nil timeout means "wait indefinitely" (no deadline).
    deadline = timeout_ms && (Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout_ms * 0.001)

    mutex.synchronize do
      # Re-check the predicate around every wait: the reply may already be
      # there, and ConditionVariable#wait may return without a signal.
      until call.key?(:response)
        remaining = deadline && (deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC))
        break if remaining && remaining <= 0

        condition.wait(mutex, remaining)
      end
    end
  ensure
    # Always drop the bookkeeping entry, even if publishing or waiting raised.
    calls.delete(call_id)
  end

  response = call[:response]

  if response.nil?
    raise(TransportError, "timeout after #{timeout_ms}ms")
  elsif response == "ESRV"
    raise(TransportError, "failed to process the request")
  else
    response
  end
end
from_hash(hash)
click to toggle source
# File lib/protein/amqp_adapter.rb, line 8
#
# Applies the settings found in +hash+ through the matching setter-style
# methods. +:url+ and +:queue+ are applied only when their value is truthy;
# +:timeout+ is applied whenever the key is present, so an explicit nil is
# passed through as well.
#
# @param hash a Hash-like object with optional :url, :queue and :timeout keys
# @return the result of the +timeout+ call when :timeout is present, else nil
def from_hash(hash)
  new_url = hash[:url]
  url(new_url) if new_url

  new_queue = hash[:queue]
  queue(new_queue) if new_queue

  timeout(hash[:timeout]) if hash.key?(:timeout)
end
push(message_payload)
click to toggle source
# File lib/protein/amqp_adapter.rb, line 72
#
# Publishes +message_payload+ to the server queue without waiting for any
# reply. The client connection is prepared first if needed.
#
# @param message_payload the payload handed to the exchange's +publish+
# @return whatever the exchange's +publish+ returns
def push(message_payload)
  prepare_client

  publish_options = { routing_key: @server_queue }
  @x.publish(message_payload, **publish_options)
end
queue(queue = nil)
click to toggle source
# File lib/protein/amqp_adapter.rb, line 27
#
# Combined getter/setter for the queue name. A truthy argument is stored;
# the stored value is then returned.
#
# @param queue the new queue name, or nil to only read the current one
# @return the stored queue name
# @raise [DefinitionError] when no queue has been stored
def queue(queue = nil)
  @queue = queue if queue
  return @queue if @queue

  raise(DefinitionError, "queue is not defined")
end
serve(router)
click to toggle source
# File lib/protein/amqp_adapter.rb, line 79
#
# Connects to the broker at +url+ and serves RPC requests from +queue+,
# one message at a time (prefetch of 1, manual acknowledgement), handing each
# payload to +Processor.call+ together with +router+.
#
# TERM and INT exit immediately while idle; while a message is being
# processed they only set a flag so the current message is finished first.
# A StandardError escaping the subscription is logged and the subscription is
# restarted after 5 seconds, unless termination was requested.
#
# @param router passed through to +Processor.call+
# @raise [Bunny::TCPConnectionFailed] when the initial connection fails
def serve(router)
  @conn = Bunny.new(url)
  @terminating = false
  @processing = false

  begin
    @conn.start
  rescue Bunny::TCPConnectionFailed => e
    Protein.logger.error "RPC server connection error: #{e.inspect}"
    log_error(e)
    raise(e)
  end

  @ch = @conn.create_channel
  @ch.prefetch(1)
  @q = @ch.queue(queue)
  @x = @ch.default_exchange

  # Both signals share one policy: defer while busy, exit right away if idle.
  %w[TERM INT].each do |signal_name|
    Signal.trap(signal_name) do
      if @processing
        @terminating = true
      else
        exit
      end
    end
  end

  Protein.logger.info "Connected to #{url}, serving RPC calls from #{queue}"

  loop do
    begin
      @q.subscribe(block: true, manual_ack: true) do |delivery_info, properties, payload|
        @processing = true

        begin
          @error = nil
          response = Processor.call(router, payload)
        rescue Exception => error
          # Any failure is answered with the "ESRV" marker; the error itself
          # is re-raised below, after the message has been acknowledged.
          @error = error
          response = "ESRV"
        end

        if response
          @x.publish(response,
            routing_key: properties.reply_to,
            correlation_id: properties.correlation_id)
        end

        @ch.ack(delivery_info.delivery_tag)
        @processing = false

        # NOTE: this `break` belongs to the subscribe block, so it ends the
        # `subscribe` call only; the enclosing loop is left further below.
        break if @terminating

        if @error
          log_error(@error)
          raise(@error)
        end
      end

      # A deferred termination request ended the subscription: stop serving
      # instead of looping around and subscribing again.
      break if @terminating
    rescue StandardError => e
      @processing = false
      break if @terminating

      log_error(e)
      Protein.logger.error "RPC server error: #{e.inspect}, restarting the server in 5s..."
      sleep 5
    end
  end
end
timeout(timeout = :not_set)
click to toggle source
# File lib/protein/amqp_adapter.rb, line 32
#
# Combined getter/setter for the call timeout. Any argument other than the
# :not_set sentinel is stored, including nil. Until a value has been stored
# the default of 15_000 is returned.
#
# @param timeout the new timeout, or :not_set to only read the current one
# @return the stored timeout, or 15_000 when none was ever stored
def timeout(timeout = :not_set)
  @timeout = timeout unless timeout == :not_set
  return 15_000 unless instance_variable_defined?(:@timeout)

  @timeout
end
url(url = nil)
click to toggle source
# File lib/protein/amqp_adapter.rb, line 22
#
# Combined getter/setter for the broker URL. A truthy argument is stored;
# the stored value is then returned.
#
# @param url the new URL, or nil to only read the current one
# @return the stored URL
# @raise [DefinitionError] when no URL has been stored
def url(url = nil)
  @url = url if url
  return @url if @url

  raise(DefinitionError, "url is not defined")
end
Private Class Methods
log_error(error)
click to toggle source
# File lib/protein/amqp_adapter.rb, line 152
#
# Forwards +error+ to the error logger taken from +Protein.config+, if one
# is set. The logger is memoized in @error_logger once a truthy one is found.
#
# @param error the error object handed to the logger's +call+
# @return the logger's return value, or nil when no logger is set
def log_error(error)
  logger = (@error_logger ||= Protein.config.error_logger)
  return unless logger

  logger.call(error)
end
prepare_client()
click to toggle source
# File lib/protein/amqp_adapter.rb, line 157
#
# Lazily sets up the client side: connection, channel, default exchange, an
# exclusive server-named reply queue and the consumer that hands replies to
# the callers registered in +calls+. Does nothing once @conn is set.
#
# @conn doubles as the "already prepared" marker, so it is assigned only
# after every step has succeeded. If any step raises, the next invocation
# starts over instead of returning early with a half-initialised client.
#
# @return nil when the client was already prepared, otherwise the connection
def prepare_client
  return if @conn

  conn = Bunny.new(url)
  conn.start

  @ch = conn.create_channel
  @x = @ch.default_exchange
  @server_queue = queue
  @reply_queue = @ch.queue("", exclusive: true)
  @calls = Concurrent::Hash.new

  @reply_queue.subscribe do |_delivery_info, properties, payload|
    call = calls[properties[:correlation_id]]

    if call
      # Store the reply and wake the waiter under the call's mutex so the
      # waiter never observes the signal without the response.
      call[:mutex].synchronize do
        call[:response] = payload
        call[:condition].signal
      end
    end
  end

  # Publish the marker last: from here on the client counts as prepared.
  @conn = conn
end