class Queight::Client
Constants
- PublishFailure
Public Class Methods
new(channel_pool)
click to toggle source
# File lib/queight/client.rb, line 7
# Builds a client around the given channel pool.
#
# channel_pool - object kept in @channel_pool; the other methods of this
#                class call with_channel, with_subscribe_channel,
#                with_transactional_channel and create_channel on it.
def initialize(channel_pool)
  @channel_pool = channel_pool
end
Public Instance Methods
bind(exchange, queue)
click to toggle source
# File lib/queight/client.rb, line 79
# Binds +queue+ to +exchange+ on a channel obtained from with_channel.
#
# exchange - object responding to bind(channel, queue).
# queue    - passed through to exchange.bind unchanged.
#
# Returns whatever with_channel returns for the block.
def bind(exchange, queue)
  with_channel { |borrowed| exchange.bind(borrowed, queue) }
end
declare(queue)
click to toggle source
# File lib/queight/client.rb, line 29
# Declares +queue+ by calling queue.declare with a channel obtained from
# with_channel.
#
# queue - object responding to declare(channel).
#
# Returns whatever with_channel returns for the block.
def declare(queue)
  with_channel { |borrowed| queue.declare(borrowed) }
end
delete_exchange(exchange)
click to toggle source
# File lib/queight/client.rb, line 99
# Deletes +exchange+ by calling exchange.delete with a channel obtained
# from with_channel.
#
# exchange - object responding to delete(channel).
#
# Returns whatever with_channel returns for the block.
def delete_exchange(exchange)
  with_channel { |borrowed| exchange.delete(borrowed) }
end
delete_queue(queue)
click to toggle source
# File lib/queight/client.rb, line 89
# Deletes +queue+ by calling queue.delete with a channel obtained from
# with_channel.
#
# queue - object responding to delete(channel).
#
# Returns whatever with_channel returns for the block.
def delete_queue(queue)
  with_channel { |borrowed| queue.delete(borrowed) }
end
message_count(queue)
click to toggle source
# File lib/queight/client.rb, line 85
# Asks +queue+ for its message count using a channel obtained from
# with_channel.
#
# queue - object responding to message_count(channel).
#
# Returns whatever with_channel returns for the block.
def message_count(queue)
  with_channel do |borrowed|
    queue.message_count(borrowed)
  end
end
publish(exchange, message, routing_key)
click to toggle source
# File lib/queight/client.rb, line 35
# Publishes +message+ to +exchange+ inside a channel transaction.
#
# The channel comes from with_transactional_channel. The method calls
# tx_select, publishes, then calls tx_commit and raises PublishFailure when
# tx_commit returns a falsy value.
#
# exchange    - object responding to publish(channel, message, routing_key).
# message     - passed through to exchange.publish unchanged.
# routing_key - passed through to exchange.publish unchanged.
#
# Raises PublishFailure when the commit reports failure, and re-raises any
# StandardError raised by exchange.publish.
def publish(exchange, message, routing_key)
  with_transactional_channel do |channel|
    channel.tx_select
    begin
      exchange.publish(channel, message, routing_key)
    rescue StandardError => error
      # Without a rollback the channel would be handed back with an open,
      # uncommitted transaction that a later commit on the same channel
      # could flush. NOTE(review): assumes the channel exposes tx_rollback
      # alongside tx_select/tx_commit (as Bunny::Channel does) -- confirm.
      begin
        channel.tx_rollback
      rescue StandardError
        # Best effort only: the publish error is the one callers need to see.
        nil
      end
      raise error
    end
    raise PublishFailure unless channel.tx_commit
  end
end
publish_to_queue(message, queue, message_options = {})
click to toggle source
# File lib/queight/client.rb, line 50
# Declares +queue+, then publishes +message+ through the exchange returned
# by Queight.default_exchange, using the queue's name as the routing key.
#
# message         - passed through to publish unchanged.
# queue           - object accepted by declare and responding to name.
# message_options - forwarded to Queight.default_exchange (default: {}).
#
# Returns whatever publish returns.
def publish_to_queue(message, queue, message_options = {})
  declare(queue)
  target_exchange = Queight.default_exchange(message_options)
  publish(target_exchange, message, queue.name)
end
publish_to_queue_without_transaction(message, queue, options = {})
click to toggle source
# File lib/queight/client.rb, line 55
# Declares +queue+, then publishes +message+ via publish_without_transaction
# through the exchange returned by Queight.default_exchange, using the
# queue's name as the routing key.
#
# message - passed through to publish_without_transaction unchanged.
# queue   - object accepted by declare and responding to name.
# options - forwarded to Queight.default_exchange (default: {}).
#
# Returns whatever publish_without_transaction returns.
def publish_to_queue_without_transaction(message, queue, options = {})
  declare(queue)
  target_exchange = Queight.default_exchange(options)
  publish_without_transaction(target_exchange, message, queue.name)
end
Also aliased as: publish_to_queue!
publish_without_transaction(exchange, message, routing_key)
click to toggle source
# File lib/queight/client.rb, line 43
# Publishes +message+ to +exchange+ on a channel obtained from with_channel.
# Unlike publish, this method makes no tx_select/tx_commit calls.
#
# exchange    - object responding to publish(channel, message, routing_key).
# message     - passed through to exchange.publish unchanged.
# routing_key - passed through to exchange.publish unchanged.
#
# Returns whatever with_channel returns for the block.
def publish_without_transaction(exchange, message, routing_key)
  with_channel { |borrowed| exchange.publish(borrowed, message, routing_key) }
end
Also aliased as: publish!
purge(queue)
click to toggle source
# File lib/queight/client.rb, line 95
# Purges +queue+ by calling queue.purge with a channel obtained from
# with_channel.
#
# queue - object responding to purge(channel).
#
# Returns whatever with_channel returns for the block.
def purge(queue)
  with_channel do |borrowed|
    queue.purge(borrowed)
  end
end
subscribe(queue, prefetch = 1, &block)
click to toggle source
# File lib/queight/client.rb, line 65
# Subscribes +block+ to +queue+ on a channel obtained from
# with_subscribe_channel.
#
# queue    - object responding to subscribe(channel, &block).
# prefetch - forwarded to with_subscribe_channel (default: 1).
# block    - handler forwarded to queue.subscribe.
#
# Returns whatever with_subscribe_channel returns for the block.
def subscribe(queue, prefetch = 1, &block)
  with_subscribe_channel(prefetch) { |borrowed| queue.subscribe(borrowed, &block) }
end
subscribe_non_blocking(queue, prefetch = 1, &block)
click to toggle source
# File lib/queight/client.rb, line 71
# Subscribes +block+ to +queue+ with :block => false and returns a handle
# wrapping the channel and the consumer.
#
# The channel is created directly with @channel_pool.create_channel rather
# than through one of the with_* helpers, and prefetch is then set on it
# explicitly.
#
# queue    - object responding to subscribe(channel, options, &block).
# prefetch - passed to create_channel and to channel.prefetch (default: 1).
# block    - handler forwarded to queue.subscribe.
#
# Returns a CancellableSubscriber built from the channel and the consumer.
def subscribe_non_blocking(queue, prefetch = 1, &block)
  dedicated = @channel_pool.create_channel(prefetch).tap do |created|
    created.prefetch(prefetch)
  end
  CancellableSubscriber.new(
    dedicated,
    queue.subscribe(dedicated, :block => false, &block)
  )
end
with_channel() { |channel| ... }
click to toggle source
# File lib/queight/client.rb, line 11
# Yields a channel supplied by @channel_pool.with_channel to the caller's
# block.
#
# Returns whatever @channel_pool.with_channel returns.
def with_channel
  @channel_pool.with_channel { |pooled| yield pooled }
end
with_subscribe_channel(prefetch) { |channel| ... }
click to toggle source
# File lib/queight/client.rb, line 23
# Yields a channel supplied by @channel_pool.with_subscribe_channel to the
# caller's block.
#
# prefetch - forwarded to @channel_pool.with_subscribe_channel.
#
# Returns whatever @channel_pool.with_subscribe_channel returns.
def with_subscribe_channel(prefetch)
  @channel_pool.with_subscribe_channel(prefetch) { |pooled| yield pooled }
end
with_transactional_channel() { |channel| ... }
click to toggle source
# File lib/queight/client.rb, line 17
# Yields a channel supplied by @channel_pool.with_transactional_channel to
# the caller's block.
#
# Returns whatever @channel_pool.with_transactional_channel returns.
def with_transactional_channel
  @channel_pool.with_transactional_channel { |pooled| yield pooled }
end