module MessageDriver::Client

The client module is the primary client API for MessageDriver. It can either be included in a class that is using it, or used directly.

@example Included as a Module

class MyClass
  # Mixing in the client module makes its methods (such as publish) callable
  # from instances of this class.
  include MessageDriver::Client

  def do_work
    # Publishes the body 'Hi Mom!' to :my_destination through the mixed-in client.
    publish(:my_destination, 'Hi Mom!')
  end
end

@example Used Directly

class DirectClass
  # Looks up :my_queue by calling the client module directly, without
  # including it into this class.
  def use_directly
    MessageDriver::Client.find_destination(:my_queue)
  end
end

Public Class Methods

for_broker(name) click to toggle source

@private

# File lib/message_driver/client.rb, line 163
# Builds a new anonymous module bound to the broker called +name+.
#
# The module includes Client, extends itself (so the client methods can be
# called on the module as well as on classes that include it), and defines
# broker_name to return +name+.
#
# @param name the value the generated broker_name method returns
# @return [Module] a fresh module on every call
def for_broker(name)
  Module.new do
    include Client
    extend self

    define_method(:broker_name) { name }
  end
end

Public Instance Methods

[](index) click to toggle source

@return [Client] the client for the specified broker

@example

class MyClass
  # Client[:my_broker] returns the client for that broker, which is then
  # mixed into this class.
  include MessageDriver::Client[:my_broker]
end
# File lib/message_driver/client.rb, line 180
# Returns Broker.client(index), i.e. the client for the broker identified
# by +index+.
def [](index)
  Broker.client(index)
end
ack_message(message, options = {}) click to toggle source
# File lib/message_driver/client.rb, line 72
# Acknowledges +message+ by calling its #ack with +options+.
#
# @param message the message to acknowledge; must respond to #ack
# @param options [Hash] passed through unchanged to message.ack
# @return whatever message.ack returns
def ack_message(message, options = {})
  message.ack(options)
end
broker() click to toggle source

@return [Broker] the broker associated with this Client module

# File lib/message_driver/client.rb, line 153
# Resolves the broker for this client by passing broker_name to
# Broker.broker.
def broker
  Broker.broker(broker_name)
end
broker_name() click to toggle source

@return [Symbol] the name of the broker associated with this Client module

# File lib/message_driver/client.rb, line 158
# Name of the broker this client uses. This base implementation returns
# Broker::DEFAULT_BROKER_NAME; modules built by for_broker define their own
# broker_name that returns the name they were created with.
def broker_name
  Broker::DEFAULT_BROKER_NAME
end
clear_context() click to toggle source

@private

# File lib/message_driver/client.rb, line 144
# Invalidates the current context wrapper and clears the stored reference.
#
# The wrapper is looked up with fetch_context_wrapper(false); when that
# returns nil nothing else happens and nil is returned. Otherwise the wrapper
# is invalidated and set_context_wrapper(nil) is called.
def clear_context
  current = fetch_context_wrapper(false)
  return if current.nil?

  current.invalidate
  set_context_wrapper(nil)
end
consumer(key, &block) click to toggle source

@!group Defining and Looking Up Consumers

# File lib/message_driver/client.rb, line 48
# Forwards +key+ and the given block to broker.consumer and returns its result.
# NOTE(review): presumably this registers the block as the consumer named
# +key+ (the group is "Defining and Looking Up Consumers") — confirm in Broker.
def consumer(key, &block)
  broker.consumer(key, &block)
end
current_adapter_context(initialize = true) click to toggle source

@private

# File lib/message_driver/client.rb, line 127
# Returns the adapter context held by the current context wrapper.
#
# @param initialize passed straight to fetch_context_wrapper
# @return the wrapper's ctx, or nil when fetch_context_wrapper returns nil
def current_adapter_context(initialize = true)
  wrapper = fetch_context_wrapper(initialize)
  return nil if wrapper.nil?

  wrapper.ctx
end
dynamic_destination(dest_name, dest_options = {}, message_props = {}) click to toggle source

@!group Defining and Looking up Destinations

# File lib/message_driver/client.rb, line 28
# Asks the current adapter context to create a destination.
#
# @param dest_name forwarded as the first argument of create_destination
# @param dest_options [Hash] forwarded unchanged
# @param message_props [Hash] forwarded unchanged
# @return whatever the context's create_destination returns
def dynamic_destination(dest_name, dest_options = {}, message_props = {})
  context = current_adapter_context
  context.create_destination(dest_name, dest_options, message_props)
end
find_consumer(consumer) click to toggle source
# File lib/message_driver/client.rb, line 52
# Looks up +consumer+ by delegating to broker.find_consumer and returns
# its result.
def find_consumer(consumer)
  broker.find_consumer(consumer)
end
find_destination(destination_name) click to toggle source

(see MessageDriver::Broker#find_destination)

@note if destination_name is a {Destination::Base}, find_destination will just return that destination back
# File lib/message_driver/client.rb, line 35
# Resolves +destination_name+ to a destination.
#
# An argument that already is a Destination::Base is returned untouched;
# anything else is looked up through broker.find_destination.
def find_destination(destination_name)
  # Same Module#=== check that `case ... when Destination::Base` performs.
  return destination_name if Destination::Base === destination_name

  broker.find_destination(destination_name)
end
nack_message(message, options = {}) click to toggle source
# File lib/message_driver/client.rb, line 76
# Negatively acknowledges +message+ by calling its #nack with +options+.
#
# @param message the message to nack; must respond to #nack
# @param options [Hash] passed through unchanged to message.nack
# @return whatever message.nack returns
def nack_message(message, options = {})
  message.nack(options)
end
pop_message(destination, options = {}) click to toggle source

@!group Receiving Messages

# File lib/message_driver/client.rb, line 68
# Pops a message from +destination+.
#
# @param destination resolved with find_destination before use
# @param options [Hash] forwarded unchanged to the destination's pop_message
# @return whatever the destination's pop_message returns
def pop_message(destination, options = {})
  source = find_destination(destination)
  source.pop_message(options)
end
publish(destination, body, headers = {}, properties = {}) click to toggle source

@!group Sending Messages

# File lib/message_driver/client.rb, line 60
# Publishes a message to +destination+.
#
# @param destination resolved with find_destination before use
# @param body forwarded unchanged to the destination's publish
# @param headers [Hash] forwarded unchanged
# @param properties [Hash] forwarded unchanged
# @return whatever the destination's publish returns
def publish(destination, body, headers = {}, properties = {})
  target = find_destination(destination)
  target.publish(body, headers, properties)
end
subscribe(destination_name, consumer_name, options = {}) click to toggle source
# File lib/message_driver/client.rb, line 80
# Subscribes a named consumer to a destination.
#
# The consumer is looked up with find_consumer and handed to subscribe_with
# as its block.
#
# @param destination_name forwarded to subscribe_with
# @param consumer_name looked up through find_consumer
# @param options [Hash] forwarded to subscribe_with
# @return whatever subscribe_with returns
def subscribe(destination_name, consumer_name, options = {})
  subscribe_with(destination_name, options, &find_consumer(consumer_name))
end
subscribe_with(destination_name, options = {}, &consumer) click to toggle source
# File lib/message_driver/client.rb, line 85
# Subscribes the given block to a destination through the current adapter
# context.
#
# The destination is resolved first, then the current adapter context is
# fetched, matching the order callers have always observed.
#
# @param destination_name resolved with find_destination
# @param options [Hash] forwarded unchanged to the context's subscribe
# @return whatever the context's subscribe returns
def subscribe_with(destination_name, options = {}, &consumer)
  target = find_destination(destination_name)
  context = current_adapter_context
  context.subscribe(target, options, &consumer)
end
with_adapter_context(adapter_context) { || ... } click to toggle source

@private

# File lib/message_driver/client.rb, line 133
# Runs the block with +adapter_context+ installed as the current context.
#
# The wrapper that was current beforehand (looked up without creating a new
# one) is put back once the block finishes, whether it returns or raises.
# Both the install and the restore go through set_context_wrapper so there is
# a single place that writes the stored wrapper.
#
# @param adapter_context the context to wrap with build_context_wrapper
# @yield with the temporary wrapper in place
# @return the value of the block
def with_adapter_context(adapter_context)
  old_ctx = fetch_context_wrapper(false)
  set_context_wrapper(build_context_wrapper(adapter_context))
  begin
    yield
  ensure
    set_context_wrapper(old_ctx)
  end
end
with_message_transaction(options = {}) { || ... } click to toggle source

@!group Transaction Management

# File lib/message_driver/client.rb, line 94
# Runs the block inside a message transaction when the adapter supports one.
#
# The wrapper's transaction depth is incremented on entry and always
# decremented on exit. Only the outermost call (depth 1) begins and commits
# the transaction; nested calls simply yield. If the block raises a
# StandardError in the outermost call, the transaction is rolled back and the
# error is re-raised; a failure during rollback is logged and does not replace
# the original error. Adapters without transaction support just get a debug
# log line and the block is yielded to.
#
# @param options [Hash] passed to the context's begin_transaction
# @yield the work to perform
# @return the block's value for nested or non-transactional calls; the result
#   of commit_transaction for the outermost transactional call
def with_message_transaction(options = {})
  wrapper = fetch_context_wrapper
  wrapper.increment_transaction_depth
  begin
    unless wrapper.ctx.supports_transactions?
      logger.debug('this adapter does not support transactions')
      return yield
    end

    # Nested call: the outermost invocation owns begin/commit/rollback.
    return yield unless wrapper.transaction_depth == 1

    wrapper.ctx.begin_transaction(options)
    begin
      yield
    rescue
      begin
        wrapper.ctx.rollback_transaction
      rescue => rollback_error
        logger.error exception_to_str(rollback_error)
      end
      # Re-raise the block's error, not any rollback failure.
      raise
    end
    wrapper.ctx.commit_transaction
  ensure
    wrapper.decrement_transaction_depth
  end
end

Private Instance Methods

adapter() click to toggle source
# File lib/message_driver/client.rb, line 203
# Returns the adapter of this client's broker (broker.adapter).
def adapter
  broker.adapter
end
adapter_context_key() click to toggle source
# File lib/message_driver/client.rb, line 207
# Key under which this client's context wrapper is stored in Thread.current.
#
# Built from broker_name as :"<broker_name>_adapter_context" and memoized in
# an instance variable, so broker_name is only consulted on first use.
#
# @return [Symbol]
def adapter_context_key
  @__adapter_context_key ||= :"#{broker_name}_adapter_context"
end
build_context_wrapper(ctx = adapter.new_context) click to toggle source
# File lib/message_driver/client.rb, line 199
# Wraps +ctx+ in a new ContextWrapper. When +ctx+ is omitted, the default
# argument obtains one from adapter.new_context.
def build_context_wrapper(ctx = adapter.new_context)
  ContextWrapper.new(ctx)
end
fetch_context_wrapper(initialize = true) click to toggle source
# File lib/message_driver/client.rb, line 186
# Returns the context wrapper stored in Thread.current under
# adapter_context_key, replacing it when it is missing or no longer valid.
#
# @param initialize when truthy, a missing/invalid wrapper is replaced with a
#   freshly built one; when falsy it is replaced with nil
# @return the stored wrapper, the new wrapper, or nil
def fetch_context_wrapper(initialize = true)
  existing = Thread.current[adapter_context_key]
  return existing if !existing.nil? && existing.valid?

  replacement = initialize ? build_context_wrapper : nil
  Thread.current[adapter_context_key] = replacement
  replacement
end
for_broker(name) click to toggle source

@private

# File lib/message_driver/client.rb, line 163
# Builds a new anonymous module bound to the broker called +name+.
#
# The module includes Client, extends itself (so the client methods can be
# called on the module as well as on classes that include it), and defines
# broker_name to return +name+.
#
# @param name the value the generated broker_name method returns
# @return [Module] a fresh module on every call
def for_broker(name)
  Module.new do
    include Client
    extend self

    define_method(:broker_name) { name }
  end
end
set_context_wrapper(wrapper) click to toggle source
# File lib/message_driver/client.rb, line 195
# Stores +wrapper+ in Thread.current under adapter_context_key, replacing
# whatever was there; passing nil clears the stored wrapper.
def set_context_wrapper(wrapper)
  Thread.current[adapter_context_key] = wrapper
end