class Appfuel::Service::Worker
Public Class Methods
allow_all_action?()
click to toggle source
# File lib/appfuel/service/worker.rb, line 31 def allow_all_action? @allow_all_actions end
container_class_type()
click to toggle source
# File lib/appfuel/service/worker.rb, line 27 def container_class_type 'consumers' end
inherited(klass)
click to toggle source
# File lib/appfuel/service/worker.rb, line 10 def inherited(klass) container = klass.app_container consumer_key = "#{klass.top_container_key}.consumer_keys" unless container.key?(consumer_key) container.register(consumer_key, []) end key = klass.container_class_path container.register(key, klass) container[consumer_key] << key @allow_all_actions = true end
register(routes)
click to toggle source
# File lib/appfuel/service/worker.rb, line 35 def register(routes) @allow_all_actions = false routes = [routes] if routes.is_a?(String) unless routes.is_a?(Array) fail "register accepts only String or Array" end @registered_actions = routes end
registered?(route)
click to toggle source
# File lib/appfuel/service/worker.rb, line 50 def registered?(route) return true if allowed_all_actions? == true registered_actions.include?(route) end
registered_actions()
click to toggle source
# File lib/appfuel/service/worker.rb, line 46 def registered_actions @registered_actions ||= [] end
top_container_key()
click to toggle source
# File lib/appfuel/service/worker.rb, line 23 def top_container_key "message_brokers" end
Public Instance Methods
logger()
click to toggle source
# File lib/appfuel/service/worker.rb, line 115 def logger @logger ||= app_container[:logger] end
publish_rpc(request, response)
click to toggle source
Publish a response for the rpc request.
@param request [MsgRequest] @param respons [Appfuel::Response] @return [Nil]
# File lib/appfuel/service/worker.rb, line 105 def publish_rpc(request, response) options = { correlation_id: request.correlation_id, routing_key: request.reply_to, headers: { "action_route" => request.action_route } } publish(response.to_json, options) nil end
rpc?(properties)
click to toggle source
# File lib/appfuel/service/worker.rb, line 96 def rpc?(properties) properties.correlation_id && properties.reply_to end
work_with_params(msg, delivery_info, properties)
click to toggle source
Sneakers worker hook to handle messages from RabbitMQ
@param msg [String] JSON string of inputs @param delivery_info [Bunny::Delivery::Info] @param properties [Bunny::MessageProperties] @return [Appfuel::Response]
# File lib/appfuel/service/worker.rb, line 64 def work_with_params(msg, delivery_info, properties) begin request = create_request(msg, delivery_info, properties) rescue => e request = create_request('{}', delivery_info, properties) handle_exception("failed to build request", e, request) return ack! end handle_invalid_routes(request.action_route) do |exception| handle_exception("dispatch not allowed", exception, request) return ack! end begin response = dispatch(request, app_container) rescue => e handle_exception("failed to dispatch", e, request) return ack! end if response.failure? logger.error "[#{request.action_route}] #{response.errors.format}" end if rpc?(properties) publish_rpc(request, response) end ack! end
Private Instance Methods
create_error_response(key, err_msg)
click to toggle source
# File lib/appfuel/service/worker.rb, line 145 def create_error_response(key, err_msg) Appfuel::ResponseHandler.new.error(key => [error_msg]) end
create_request(msg, delivery_info, properties)
click to toggle source
# File lib/appfuel/service/worker.rb, line 149 def create_request(msg, delivery_info, properties) Appfuel::Service::MsgRequest.new(msg, delivery_info, properties) end
handle_exception(label, e, request)
click to toggle source
# File lib/appfuel/service/worker.rb, line 136 def handle_exception(label, e, request) err_msg = "[queue #{queue}] #{label}: #{e.message}" logger.error err_msg if rpc?(request.properties) response = create_error_response(queue, error_msg) publish_rpc(request, response) end end
handle_invalid_routes(route) { |e| ... }
click to toggle source
# File lib/appfuel/service/worker.rb, line 121 def handle_invalid_routes(route) if self.class.registered_actions.empty? || self.class.registered?(request.action_route) return false end msg = "route #{request.action_route} not allowed for queue #{queue}" e = RuntimeError.new(msg) if block_given? yield e else return e end end