class Queight::Client

Constants

PublishFailure

Public Class Methods

new(channel_pool) click to toggle source
# File lib/queight/client.rb, line 7
# Builds a client around a channel pool.
#
# @param channel_pool [Object] collaborator that the other methods of this
#   class call for channel checkout (+with_channel+,
#   +with_transactional_channel+, +with_subscribe_channel+) and for
#   +create_channel+.
def initialize(channel_pool)
  @channel_pool = channel_pool
end

Public Instance Methods

bind(exchange, queue) click to toggle source
# File lib/queight/client.rb, line 79
# Asks +exchange+ to bind +queue+ using a channel obtained from
# +with_channel+.
#
# @param exchange [#bind] receives +bind(channel, queue)+
# @param queue [Object] passed through to +exchange.bind+ untouched
# @return [Object] whatever +with_channel+ returns for the block
def bind(exchange, queue)
  with_channel { |ch| exchange.bind(ch, queue) }
end
declare(queue) click to toggle source
# File lib/queight/client.rb, line 29
# Asks +queue+ to declare itself on a channel obtained from +with_channel+.
#
# @param queue [#declare] receives +declare(channel)+
# @return [Object] whatever +with_channel+ returns for the block
def declare(queue)
  with_channel { |ch| queue.declare(ch) }
end
delete_exchange(exchange) click to toggle source
# File lib/queight/client.rb, line 99
# Asks +exchange+ to delete itself using a channel obtained from
# +with_channel+.
#
# @param exchange [#delete] receives +delete(channel)+
# @return [Object] whatever +with_channel+ returns for the block
def delete_exchange(exchange)
  with_channel { |ch| exchange.delete(ch) }
end
delete_queue(queue) click to toggle source
# File lib/queight/client.rb, line 89
# Asks +queue+ to delete itself using a channel obtained from
# +with_channel+.
#
# @param queue [#delete] receives +delete(channel)+
# @return [Object] whatever +with_channel+ returns for the block
def delete_queue(queue)
  with_channel { |ch| queue.delete(ch) }
end
message_count(queue) click to toggle source
# File lib/queight/client.rb, line 85
# Asks +queue+ for its message count using a channel obtained from
# +with_channel+.
#
# @param queue [#message_count] receives +message_count(channel)+
# @return [Object] whatever +with_channel+ returns for the block
def message_count(queue)
  with_channel do |ch|
    queue.message_count(ch)
  end
end
publish(exchange, message, routing_key) click to toggle source
# File lib/queight/client.rb, line 35
# Publishes +message+ through +exchange+ inside a channel transaction.
#
# The sequence on the yielded channel is: +tx_select+, the publish itself,
# then +tx_commit+.
#
# @param exchange [#publish] receives +publish(channel, message, routing_key)+
# @param message [Object] passed through to +exchange.publish+
# @param routing_key [Object] passed through to +exchange.publish+
# @raise [PublishFailure] when +tx_commit+ returns a falsy value
# @return [Object] whatever +with_transactional_channel+ returns for the block
def publish(exchange, message, routing_key)
  with_transactional_channel do |tx_channel|
    tx_channel.tx_select
    exchange.publish(tx_channel, message, routing_key)
    committed = tx_channel.tx_commit
    raise PublishFailure unless committed
  end
end
publish!(exchange, message, routing_key)
Alias for: publish_without_transaction
publish_to_queue(message, queue, message_options = {}) click to toggle source
# File lib/queight/client.rb, line 50
# Declares +queue+, then publishes +message+ via +publish+ on the exchange
# returned by +Queight.default_exchange+, using the queue's name as the
# routing key.
#
# @param message [Object] passed through to +publish+
# @param queue [#name] declared first; its +name+ becomes the routing key
# @param message_options [Hash] handed to +Queight.default_exchange+
# @return [Object] the return value of +publish+
def publish_to_queue(message, queue, message_options = {})
  declare(queue)
  target_exchange = Queight.default_exchange(message_options)
  routing_key = queue.name
  publish(target_exchange, message, routing_key)
end
publish_to_queue!(message, queue, options = {})
Alias for: publish_to_queue_without_transaction
publish_to_queue_without_transaction(message, queue, options = {}) click to toggle source
# File lib/queight/client.rb, line 55
# Declares +queue+, then publishes +message+ via
# +publish_without_transaction+ on the exchange returned by
# +Queight.default_exchange+, using the queue's name as the routing key.
#
# @param message [Object] passed through to +publish_without_transaction+
# @param queue [#name] declared first; its +name+ becomes the routing key
# @param options [Hash] handed to +Queight.default_exchange+
# @return [Object] the return value of +publish_without_transaction+
def publish_to_queue_without_transaction(message, queue, options = {})
  declare(queue)
  target_exchange = Queight.default_exchange(options)
  routing_key = queue.name
  publish_without_transaction(target_exchange, message, routing_key)
end
Also aliased as: publish_to_queue!
publish_without_transaction(exchange, message, routing_key) click to toggle source
# File lib/queight/client.rb, line 43
# Publishes +message+ through +exchange+ on a channel obtained from
# +with_channel+; no +tx_select+ / +tx_commit+ calls are made here.
#
# @param exchange [#publish] receives +publish(channel, message, routing_key)+
# @param message [Object] passed through to +exchange.publish+
# @param routing_key [Object] passed through to +exchange.publish+
# @return [Object] whatever +with_channel+ returns for the block
def publish_without_transaction(exchange, message, routing_key)
  with_channel { |ch| exchange.publish(ch, message, routing_key) }
end
Also aliased as: publish!
purge(queue) click to toggle source
# File lib/queight/client.rb, line 95
# Asks +queue+ to purge itself using a channel obtained from +with_channel+.
#
# @param queue [#purge] receives +purge(channel)+
# @return [Object] whatever +with_channel+ returns for the block
def purge(queue)
  with_channel do |ch|
    queue.purge(ch)
  end
end
subscribe(queue, prefetch = 1, &block) click to toggle source
# File lib/queight/client.rb, line 65
# Subscribes the given block to +queue+ on a channel obtained from
# +with_subscribe_channel(prefetch)+.
#
# @param queue [#subscribe] receives +subscribe(channel, &block)+
# @param prefetch [Object] forwarded to +with_subscribe_channel+ (defaults to 1)
# @yield forwarded unchanged to +queue.subscribe+
# @return [Object] whatever +with_subscribe_channel+ returns for the block
def subscribe(queue, prefetch = 1, &block)
  with_subscribe_channel(prefetch) { |ch| queue.subscribe(ch, &block) }
end
subscribe_non_blocking(queue, prefetch = 1, &block) click to toggle source
# File lib/queight/client.rb, line 71
# Subscribes the given block to +queue+ with +:block => false+ on a channel
# created directly from the pool, and wraps the channel and the resulting
# consumer in a CancellableSubscriber.
#
# The channel is obtained with +create_channel(prefetch)+ and then has
# +prefetch(prefetch)+ called on it before subscribing.
#
# @param queue [#subscribe] receives +subscribe(channel, :block => false, &block)+
# @param prefetch [Object] forwarded to +create_channel+ and +channel.prefetch+
#   (defaults to 1)
# @yield forwarded unchanged to +queue.subscribe+
# @return [CancellableSubscriber] built from the channel and the consumer
def subscribe_non_blocking(queue, prefetch = 1, &block)
  dedicated_channel = @channel_pool.create_channel(prefetch).tap do |ch|
    ch.prefetch(prefetch)
  end
  subscription = queue.subscribe(dedicated_channel, :block => false, &block)

  CancellableSubscriber.new(dedicated_channel, subscription)
end
with_channel() { |channel| ... } click to toggle source
# File lib/queight/client.rb, line 11
# Delegates to the pool's +with_channel+ and hands the yielded channel to
# the caller's block.
#
# @yieldparam channel [Object] the channel yielded by the pool
# @return [Object] whatever the pool's +with_channel+ returns
def with_channel
  @channel_pool.with_channel { |pooled| yield(pooled) }
end
with_subscribe_channel(prefetch) { |channel| ... } click to toggle source
# File lib/queight/client.rb, line 23
# Delegates to the pool's +with_subscribe_channel+ and hands the yielded
# channel to the caller's block.
#
# @param prefetch [Object] forwarded to the pool unchanged
# @yieldparam channel [Object] the channel yielded by the pool
# @return [Object] whatever the pool's +with_subscribe_channel+ returns
def with_subscribe_channel(prefetch)
  @channel_pool.with_subscribe_channel(prefetch) { |pooled| yield(pooled) }
end
with_transactional_channel() { |channel| ... } click to toggle source
# File lib/queight/client.rb, line 17
# Delegates to the pool's +with_transactional_channel+ and hands the yielded
# channel to the caller's block.
#
# @yieldparam channel [Object] the channel yielded by the pool
# @return [Object] whatever the pool's +with_transactional_channel+ returns
def with_transactional_channel
  @channel_pool.with_transactional_channel { |pooled| yield(pooled) }
end