class Asynk::SyncPublisher
Public Class Methods
new(routing_key, params)
click to toggle source
# File lib/asynk/sync_publisher.rb, line 3 def initialize(routing_key, params) @routing_key = routing_key @params = params @message_id = (@params.delete(:message_id) || generate_message_id) @wait_timeout = (@params.delete(:timeout) || Asynk.config[:sync_publish_wait_timeout]) * 1000 @correlation_id = generate_message_id end
Public Instance Methods
generate_message_id(legnth = 8)
click to toggle source
# File lib/asynk/sync_publisher.rb, line 32 def generate_message_id(legnth = 8) SecureRandom.hex(legnth) end
send()
click to toggle source
# File lib/asynk/sync_publisher.rb, line 11 def send global_start_time = Asynk::Benchmark.start if Asynk.config[:publisher_execution_time] Asynk.broker.pool.with do |channel, exchange, reply_queue| exchange.publish(@params.to_json, message_id: @message_id, routing_key: @routing_key, correlation_id: @correlation_id, reply_to: reply_queue.name) start_time = Asynk::Benchmark.start while !@response do delivery_info, properties, payload = reply_queue.pop @response = payload if payload && properties[:correlation_id] == @correlation_id raise(RuntimeError.new('Timeout error reached')) if @wait_timeout <= Asynk::Benchmark.end(start_time) end end message = Asynk::Response.try_to_create_from_hash(@response) if Asynk.config[:publisher_execution_time] Asynk.logger.info "Sending sync message to #{@routing_key}:#{@message_id}. Completed In: #{Asynk::Benchmark.end(global_start_time)} ms." end message end