class MessageDriver::Adapters::InMemoryAdapter::Destination

Public Instance Methods

handle_message_count() click to toggle source
# File lib/message_driver/adapters/in_memory_adapter.rb, line 34
# Reports how many messages are currently waiting on this destination.
#
# @return the +size+ of the collection returned by +message_queue+
def handle_message_count
  pending = message_queue
  pending.size
end
handle_pop_message(ctx, options = {}) click to toggle source
# File lib/message_driver/adapters/in_memory_adapter.rb, line 38
# Pops a single message off this destination on behalf of +ctx+.
#
# Pure delegation to +_fetch_message+; both arguments are forwarded as-is.
#
# @param ctx the adapter context requesting the message
# @param options [Hash] forwarded unchanged to +_fetch_message+
# @return whatever +_fetch_message+ returns (+nil+ when nothing is queued)
def handle_pop_message(ctx, options = {})
  popped = _fetch_message(ctx, options)
  popped
end
handle_publish(body, headers = {}, properties = {}) click to toggle source
# File lib/message_driver/adapters/in_memory_adapter.rb, line 49
# Publishes a message to this destination.
#
# The arguments are passed through the publish middleware, the result is
# wrapped in a Message (with a +nil+ context) and appended to the queue,
# and then a delivery pass is triggered.
#
# @param body the message payload as supplied by the caller
# @param headers [Hash] message headers
# @param properties [Hash] message properties
# @return the result of +_deliver_messages+
def handle_publish(body, headers = {}, properties = {})
  published_body, published_headers, published_properties =
    middleware.on_publish(body, headers, properties)

  # The caller-supplied +body+ is stored alongside the middleware output as
  # the message's raw body.
  envelope = Message.new(nil, self, published_body, published_headers, published_properties, body)
  message_queue << envelope

  _deliver_messages
end
handle_subscribe(options = {}, &consumer) click to toggle source
# File lib/message_driver/adapters/in_memory_adapter.rb, line 42
# Subscribes +consumer+ to this destination.
#
# A Subscription is built, registered with the adapter under this
# destination's name, and a delivery pass is run straight away.
#
# @param options [Hash] handed to the Subscription constructor
# @param consumer [Proc] block stored on the subscription
# @return the newly created Subscription
def handle_subscribe(options = {}, &consumer)
  Subscription.new(adapter, self, consumer, options).tap do |registered|
    adapter.add_subscription_for(name, registered)
    _deliver_messages
  end
end
subscriptions() click to toggle source
# File lib/message_driver/adapters/in_memory_adapter.rb, line 30
# Looks up the subscriptions registered for this destination.
#
# @return whatever the adapter's +subscriptions_for+ returns for this
#   destination's +name+
def subscriptions
  registered = adapter.subscriptions_for(name)
  registered
end

Private Instance Methods

_deliver_messages() click to toggle source
# File lib/message_driver/adapters/in_memory_adapter.rb, line 74
# Drains the queue into the registered subscriptions.
#
# Does nothing when there are no subscriptions, so queued messages stay put
# until someone subscribes. Otherwise messages are fetched one at a time
# until +_fetch_message+ returns +nil+.
#
# @return [nil]
def _deliver_messages
  return if subscriptions.empty?

  until (pending = _fetch_message(current_adapter_context)).nil?
    # A subscription is requested per message; the rotation between
    # subscribers is presumably handled by the adapter behind
    # +next_subscription+.
    next_subscription.deliver_message(pending)
  end
end
_fetch_message(ctx, _options = {}) click to toggle source
# File lib/message_driver/adapters/in_memory_adapter.rb, line 63
# Removes the next queued message and prepares it for a consumer.
#
# The stored message's body, headers and properties are run through the
# consume middleware, and a fresh Message bound to +ctx+ is built from the
# result. The stored body (before consume middleware) is kept as the new
# message's raw body.
#
# @param ctx the adapter context the returned message is bound to
# @param _options [Hash] accepted but not used here
# @return a new Message, or +nil+ when the queue is empty
def _fetch_message(ctx, _options = {})
  queued = message_queue.shift
  return nil if queued.nil?

  stored_body = queued.body
  consumed_body, consumed_headers, consumed_properties =
    middleware.on_consume(queued.body, queued.headers, queued.properties)

  Message.new(ctx, self, consumed_body, consumed_headers, consumed_properties, stored_body)
end
message_queue() click to toggle source
# File lib/message_driver/adapters/in_memory_adapter.rb, line 83
# Fetches the queue that backs this destination.
#
# @return whatever the adapter's +message_queue_for+ returns for this
#   destination's +name+
def message_queue
  backing_queue = adapter.message_queue_for(name)
  backing_queue
end
next_subscription() click to toggle source
# File lib/message_driver/adapters/in_memory_adapter.rb, line 59
# Asks the adapter which subscription should receive the next message.
#
# @return whatever the adapter's +next_subscription_for+ returns for this
#   destination's +name+
def next_subscription
  chosen = adapter.next_subscription_for(name)
  chosen
end