class MessageDriver::Adapters::InMemoryAdapter::Destination
Public Instance Methods
handle_message_count()
click to toggle source
# File lib/message_driver/adapters/in_memory_adapter.rb, line 34 def handle_message_count message_queue.size end
handle_pop_message(ctx, options = {})
click to toggle source
# File lib/message_driver/adapters/in_memory_adapter.rb, line 38 def handle_pop_message(ctx, options = {}) _fetch_message(ctx, options) end
handle_publish(body, headers = {}, properties = {})
click to toggle source
# File lib/message_driver/adapters/in_memory_adapter.rb, line 49 def handle_publish(body, headers = {}, properties = {}) raw_body = body b, h, p = middleware.on_publish(body, headers, properties) msg = Message.new(nil, self, b, h, p, raw_body) message_queue << msg _deliver_messages end
handle_subscribe(options = {}, &consumer)
click to toggle source
# File lib/message_driver/adapters/in_memory_adapter.rb, line 42 def handle_subscribe(options = {}, &consumer) subscription = Subscription.new(adapter, self, consumer, options) adapter.add_subscription_for(name, subscription) _deliver_messages subscription end
subscriptions()
click to toggle source
# File lib/message_driver/adapters/in_memory_adapter.rb, line 30 def subscriptions adapter.subscriptions_for(name) end
Private Instance Methods
_deliver_messages()
click to toggle source
# File lib/message_driver/adapters/in_memory_adapter.rb, line 74 def _deliver_messages unless subscriptions.empty? until (msg = _fetch_message(current_adapter_context)).nil? sub = next_subscription # this actually cycles through the subscriptions sub.deliver_message(msg) end end end
_fetch_message(ctx, _options = {})
click to toggle source
# File lib/message_driver/adapters/in_memory_adapter.rb, line 63 def _fetch_message(ctx, _options = {}) message = message_queue.shift if message.nil? nil else raw_body = message.body b, h, p = middleware.on_consume(message.body, message.headers, message.properties) Message.new(ctx, self, b, h, p, raw_body) end end
message_queue()
click to toggle source
# File lib/message_driver/adapters/in_memory_adapter.rb, line 83 def message_queue adapter.message_queue_for(name) end
next_subscription()
click to toggle source
# File lib/message_driver/adapters/in_memory_adapter.rb, line 59 def next_subscription adapter.next_subscription_for(name) end