class Asynk::Worker
Public Class Methods
new(bunny_connection, consumer, instance_id)
click to toggle source
# File lib/asynk/worker.rb, line 5 def initialize(bunny_connection, consumer, instance_id) @instance_id = instance_id @consumer = consumer @ch = bunny_connection.create_channel @ch.prefetch(1) @default_exchange = @ch.default_exchange x = @ch.topic(Asynk.config[:mq_exchange]) q = @ch.queue(consumer.queue_name, @consumer.queue_options || {}) @consumer.routing_keys.each{ |routing_key| q.bind(x, routing_key: routing_key) } q.subscribe(@consumer.subscribe_arguments || {}, &method(:on_event)) Asynk.logger.info ["#{@consumer.name}:#{@instance_id} worker accepting connections. ", " mq_exchange: #{Asynk.config[:mq_exchange]}", " queue_name: #{@consumer.queue_name}", " queue_options: #{@consumer.queue_options}", " routing_keys: #{@consumer.routing_keys}" ].join("\r\n") end
Public Instance Methods
on_event(delivery_info, properties, payload)
click to toggle source
# File lib/asynk/worker.rb, line 27 def on_event(delivery_info, properties, payload) global_start_time = Asynk::Benchmark.start message = Asynk::Message.new(delivery_info, properties, payload) consumer_instance = @consumer.new(@ch, delivery_info) do |result| @default_exchange.publish(convert_to_valid_json_string(result), routing_key: properties.reply_to, correlation_id: properties.correlation_id) if Asynk.config[:respond_back_execution_time] Asynk.logger.info "Responded to message #{message.routing_key}:#{message.message_id} In: #{Asynk::Benchmark.end(global_start_time)} ms." end Asynk.logger.debug "#{@consumer.name}:#{@instance_id} Respoding to message id: #{message.message_id} with: #{result.to_s}. " end Asynk.logger.info "#{@consumer.name}:#{@instance_id} Got message #{message.routing_key}:#{message.message_id} with payload: #{message.body}" consumer_instance.invoke_processing(message) end
shutdown()
click to toggle source
# File lib/asynk/worker.rb, line 44 def shutdown Asynk.logger.info "#{@consumer.name}:#{@instance_id} stopping..." @ch.close end
Private Instance Methods
convert_to_valid_json_string(data)
click to toggle source
# File lib/asynk/worker.rb, line 50 def convert_to_valid_json_string(data) data = '' if data.nil? data.is_a?(String) ? data : data.to_json end