class Appfuel::Service::Worker

Public Class Methods

allow_all_action?() click to toggle source
# File lib/appfuel/service/worker.rb, line 31
# Reports the current value of the @allow_all_actions flag on the receiver.
#
# Elsewhere in this file the flag is assigned true by `inherited` and false
# by `register`.
#
# @return [Boolean, nil] the stored flag; nil when it was never assigned
def allow_all_action?
  @allow_all_actions
end
container_class_type() click to toggle source
# File lib/appfuel/service/worker.rb, line 27
# Names the container category used for this kind of class.
# NOTE(review): presumably combined with `top_container_key` when the
# container path is built — confirm against the container mixin.
#
# @return [String] always 'consumers'
def container_class_type
  'consumers'
end
inherited(klass) click to toggle source
# File lib/appfuel/service/worker.rb, line 10
# Subclass hook: registers every new subclass in the application container
# and appends its container key to the shared list of consumer keys.
#
# @param klass [Class] the subclass that was just defined; it must respond to
#   `app_container`, `top_container_key` and `container_class_path`
# @return [Boolean] true
def inherited(klass)
  container    = klass.app_container
  consumer_key = "#{klass.top_container_key}.consumer_keys"
  # Create the list of consumer keys the first time any subclass is seen.
  container.register(consumer_key, []) unless container.key?(consumer_key)

  key = klass.container_class_path
  container.register(key, klass)
  container[consumer_key] << key

  # Inside this hook `self` is the parent class. A bare
  # `@allow_all_actions = true` would therefore be stored on the parent,
  # silently undoing any `register` call the parent made, while the new
  # subclass would never receive the flag. Store it on the subclass instead.
  klass.instance_variable_set(:@allow_all_actions, true)
end
register(routes) click to toggle source
# File lib/appfuel/service/worker.rb, line 35
# Restricts the receiver to an explicit list of action routes and turns the
# allow-all flag off.
#
# @param routes [String, Array] a single route or a list of routes
# @return [Array] the stored list of routes
# @raise [RuntimeError] when routes is neither a String nor an Array
def register(routes)
  routes = [routes] if routes.is_a?(String)

  # Validate before touching any state: a rejected argument must not leave
  # the receiver with allow-all switched off and no routes registered.
  raise "register accepts only String or Array" unless routes.is_a?(Array)

  @allow_all_actions  = false
  @registered_actions = routes
end
registered?(route) click to toggle source
# File lib/appfuel/service/worker.rb, line 50
# Tells whether the given route may be handled by the receiver.
#
# @param route [String] action route to look up
# @return [Boolean] true when all actions are allowed or the route is in
#   `registered_actions`
def registered?(route)
  # The predicate defined in this file is `allow_all_action?`; the previously
  # used name `allowed_all_actions?` does not exist and raised NoMethodError.
  return true if allow_all_action?

  registered_actions.include?(route)
end
registered_actions() click to toggle source
# File lib/appfuel/service/worker.rb, line 46
# Lists the routes stored on the receiver, creating an empty list on first
# access so callers always get an Array back.
#
# @return [Array] the memoized list of registered routes
def registered_actions
  @registered_actions = [] unless @registered_actions
  @registered_actions
end
top_container_key() click to toggle source
# File lib/appfuel/service/worker.rb, line 23
# Root key used for worker entries in the application container; `inherited`
# prefixes the consumer key list with it.
#
# @return [String] always 'message_brokers'
def top_container_key
  'message_brokers'
end

Public Instance Methods

logger() click to toggle source
# File lib/appfuel/service/worker.rb, line 115
# Looks up the logger in the application container and caches it once a
# truthy value has been found.
#
# @return [Object] whatever is stored under :logger in `app_container`
def logger
  return @logger if @logger

  @logger = app_container[:logger]
end
publish_rpc(request, response) click to toggle source

Publish a response for the rpc request.

@param request [MsgRequest] @param response [Appfuel::Response] @return [nil]

# File lib/appfuel/service/worker.rb, line 105
# Sends the JSON form of a response back to the caller of an rpc request.
#
# The publish options reuse the request's correlation id, route the message
# to the request's reply_to value, and echo the action route in a header.
#
# @param request [MsgRequest] the request being answered
# @param response [Appfuel::Response] serialized with `to_json`
# @return [nil]
def publish_rpc(request, response)
  publish_opts = {}
  publish_opts[:correlation_id] = request.correlation_id
  publish_opts[:routing_key]    = request.reply_to
  publish_opts[:headers]        = { "action_route" => request.action_route }

  publish(response.to_json, publish_opts)
  nil
end
rpc?(properties) click to toggle source
# File lib/appfuel/service/worker.rb, line 96
# Decides whether a message expects a reply: it must carry both a
# correlation id and a reply_to value.
#
# @param properties [#correlation_id, #reply_to] message properties
# @return [Object, nil, false] the reply_to value when a correlation id is
#   present, otherwise the falsy correlation id
def rpc?(properties)
  correlation = properties.correlation_id
  return correlation unless correlation

  properties.reply_to
end
work_with_params(msg, delivery_info, properties) click to toggle source

Sneakers worker hook to handle messages from RabbitMQ

@param msg [String] JSON string of inputs @param delivery_info [Bunny::Delivery::Info] @param properties [Bunny::MessageProperties] @return [Object] the value returned by ack!

# File lib/appfuel/service/worker.rb, line 64
# Sneakers worker hook to handle messages from RabbitMQ.
#
# Builds a request from the raw message, checks its action route with
# `handle_invalid_routes`, dispatches it, logs a failed response and, for rpc
# messages, publishes the response. Every path finishes by returning `ack!`.
#
# @param msg [String] JSON string of inputs
# @param delivery_info [Bunny::Delivery::Info]
# @param properties [Bunny::MessageProperties]
# @return [Object] the value returned by `ack!`
def work_with_params(msg, delivery_info, properties)
  begin
    request  = create_request(msg, delivery_info, properties)
  rescue => e
    # Rebuild the request from an empty JSON payload so the failure can still
    # be reported with the original delivery info and properties.
    request = create_request('{}', delivery_info, properties)
    handle_exception("failed to build request", e, request)
    return ack!
  end

  # The block only runs when the route is rejected; the `return` inside it
  # leaves work_with_params itself, not just the block.
  handle_invalid_routes(request.action_route) do |exception|
    handle_exception("dispatch not allowed", exception, request)
    return ack!
  end

  begin
    response = dispatch(request, app_container)
  rescue => e
    handle_exception("failed to dispatch", e, request)
    return ack!
  end

  # A failed response is logged but still published to rpc callers below.
  if response.failure?
    logger.error "[#{request.action_route}] #{response.errors.format}"
  end

  if rpc?(properties)
    publish_rpc(request, response)
  end

  ack!
end

Private Instance Methods

create_error_response(key, err_msg) click to toggle source
# File lib/appfuel/service/worker.rb, line 145
# Builds an error response holding a single message under the given key.
#
# @param key [Object] key the error is filed under (handle_exception passes
#   the queue)
# @param err_msg [String] the error text
# @return [Object] the value returned by Appfuel::ResponseHandler#error
def create_error_response(key, err_msg)
  # The parameter is `err_msg`; the previous body referenced an undefined
  # `error_msg` and raised NameError on every call.
  Appfuel::ResponseHandler.new.error(key => [err_msg])
end
create_request(msg, delivery_info, properties) click to toggle source
# File lib/appfuel/service/worker.rb, line 149
# Wraps the raw message parts in an Appfuel::Service::MsgRequest.
#
# @param msg [String] raw payload; work_with_params passes either the
#   incoming message or '{}'
# @param delivery_info [Object] handed to MsgRequest.new unchanged
# @param properties [Object] handed to MsgRequest.new unchanged
# @return [Appfuel::Service::MsgRequest]
def create_request(msg, delivery_info, properties)
  Appfuel::Service::MsgRequest.new(msg, delivery_info, properties)
end
handle_exception(label, e, request) click to toggle source
# File lib/appfuel/service/worker.rb, line 136
# Logs a failure for the current queue and, when the originating message is
# an rpc call, publishes an error response back to the caller.
#
# @param label [String] short description of the step that failed
# @param e [Exception] the error whose message is reported
# @param request [#properties] the request being processed
# @return [Object, nil] nil for non-rpc messages, otherwise the result of
#   `publish_rpc`
def handle_exception(label, e, request)
  err_msg = "[queue #{queue}] #{label}: #{e.message}"
  logger.error err_msg
  return unless rpc?(request.properties)

  # The local is `err_msg`; the previous code passed an undefined `error_msg`
  # here, so reporting a failure for an rpc message raised NameError.
  publish_rpc(request, create_error_response(queue, err_msg))
end
handle_invalid_routes(route) { |e| ... } click to toggle source
# File lib/appfuel/service/worker.rb, line 121
# Checks a route against the routes registered on the worker class.
#
# A worker class with no registered routes accepts everything. For a rejected
# route a RuntimeError is built (not raised) and either yielded to the block
# or returned.
#
# @param route [String] the action route of the incoming request
# @yieldparam e [RuntimeError] describes the rejected route
# @return [false, RuntimeError, Object] false when the route is allowed;
#   otherwise the block's result, or the error itself when no block is given
def handle_invalid_routes(route)
  worker = self.class
  # Use the `route` argument: there is no `request` in this method's scope,
  # so the previous `request.action_route` raised NameError as soon as any
  # route had been registered.
  return false if worker.registered_actions.empty? || worker.registered?(route)

  error = RuntimeError.new("route #{route} not allowed for queue #{queue}")
  return error unless block_given?

  yield error
end