module LaGear::Bus

Public Class Methods

init_pool(size = ::Sidekiq.options[:concurrency], timeout = 3) click to toggle source
# File lib/la_gear/bus.rb, line 5
# Builds the publisher connection pool and stores it in the global
# +$publisher+, then checks out one connection to verify it.
#
# size    - value passed as the pool's +size:+ option; defaults to
#           ::Sidekiq.options[:concurrency].
# timeout - value passed as the pool's +timeout:+ option; defaults to 3.
#
# Raises RuntimeError ('Bus is lost!') when the object yielded by the pool
# is not a LaGear::Publisher.
# Returns whatever +$publisher.with+ returns.
def init_pool(size = ::Sidekiq.options[:concurrency], timeout = 3)
  # Each pooled connection is created by this factory block.
  $publisher = ConnectionPool.new(size: size, timeout: timeout) do
    ::LaGear::Publisher.new
  end

  # Sanity check: borrow one connection and make sure it is a publisher.
  $publisher.with do |candidate|
    next if candidate.is_a?(LaGear::Publisher)

    raise 'Bus is lost!'
  end
end
publish(routing_key, msg, la_gear_opts = {}, bunny_opts = {}, sidekiq_opts = {}) click to toggle source
# File lib/la_gear/bus.rb, line 18
# Publishes +msg+ under +routing_key+ through DelayablePublisher.
#
# routing_key  - key passed through NamespaceUtility.adjust_routing_key
#                together with +la_gear_opts+ before publishing.
# msg          - payload forwarded unchanged to the publisher.
# la_gear_opts - options used only for routing-key adjustment.
# bunny_opts   - options forwarded as the last argument of +publish+.
# sidekiq_opts - options handed to DelayablePublisher.sidekiq_delay.
#
# Returns the result of calling +publish+ on the object returned by
# +sidekiq_delay+ (presumably a Sidekiq delay proxy - confirm against
# DelayablePublisher).
def publish(routing_key, msg, la_gear_opts = {}, bunny_opts = {}, sidekiq_opts = {})
  adjusted_key = NamespaceUtility.adjust_routing_key(routing_key, la_gear_opts)
  delayed_publisher = DelayablePublisher.sidekiq_delay(sidekiq_opts)
  delayed_publisher.publish(adjusted_key, msg, bunny_opts)
end
publish_at(timestamp, routing_key, msg, la_gear_opts = {}, bunny_opts = {}, sidekiq_opts = {}) click to toggle source
# File lib/la_gear/bus.rb, line 30
# Publishes +msg+ under +routing_key+ through DelayablePublisher, using
# the proxy obtained from +sidekiq_delay_until+.
#
# timestamp    - passed as the first argument of sidekiq_delay_until
#                (presumably the time the job should run - confirm).
# routing_key  - key passed through NamespaceUtility.adjust_routing_key
#                together with +la_gear_opts+ before publishing.
# msg          - payload forwarded unchanged to the publisher.
# la_gear_opts - options used only for routing-key adjustment.
# bunny_opts   - options forwarded as the last argument of +publish+.
# sidekiq_opts - options handed to sidekiq_delay_until after +timestamp+.
#
# Returns the result of the proxied +publish+ call.
def publish_at(timestamp, routing_key, msg, la_gear_opts = {}, bunny_opts = {}, sidekiq_opts = {})
  adjusted_key = NamespaceUtility.adjust_routing_key(routing_key, la_gear_opts)
  scheduled_publisher = DelayablePublisher.sidekiq_delay_until(timestamp, sidekiq_opts)
  scheduled_publisher.publish(adjusted_key, msg, bunny_opts)
end
publish_in(interval, routing_key, msg, la_gear_opts = {}, bunny_opts = {}, sidekiq_opts = {}) click to toggle source
# File lib/la_gear/bus.rb, line 24
# Publishes +msg+ under +routing_key+ through DelayablePublisher, using
# the proxy obtained from +sidekiq_delay_for+.
#
# interval     - passed as the first argument of sidekiq_delay_for
#                (presumably how long to wait before running - confirm).
# routing_key  - key passed through NamespaceUtility.adjust_routing_key
#                together with +la_gear_opts+ before publishing.
# msg          - payload forwarded unchanged to the publisher.
# la_gear_opts - options used only for routing-key adjustment.
# bunny_opts   - options forwarded as the last argument of +publish+.
# sidekiq_opts - options handed to sidekiq_delay_for after +interval+.
#
# Returns the result of the proxied +publish+ call.
def publish_in(interval, routing_key, msg, la_gear_opts = {}, bunny_opts = {}, sidekiq_opts = {})
  adjusted_key = NamespaceUtility.adjust_routing_key(routing_key, la_gear_opts)
  deferred_publisher = DelayablePublisher.sidekiq_delay_for(interval, sidekiq_opts)
  deferred_publisher.publish(adjusted_key, msg, bunny_opts)
end
publish_local(routing_key, msg, la_gear_opts = {}) click to toggle source
# File lib/la_gear/bus.rb, line 36
# Hands +msg+ directly to the local worker for +routing_key+.
#
# routing_key  - key passed through NamespaceUtility.adjust_routing_key
#                together with +la_gear_opts+; the adjusted key is used to
#                look up the worker via NamespaceUtility.local_worker.
# msg          - object responding to +values+ (e.g. a Hash); its values
#                are splatted as the arguments of +perform_async+.
# la_gear_opts - options used only for routing-key adjustment.
#
# Returns the result of the worker's +perform_async+ call.
def publish_local(routing_key, msg, la_gear_opts = {})
  local_key = NamespaceUtility.adjust_routing_key(routing_key, la_gear_opts)
  worker = NamespaceUtility.local_worker(local_key)
  job_args = msg.values
  worker.perform_async(*job_args)
end
publish_local_in(routing_key, msg, la_gear_opts = {}, interval) click to toggle source
# File lib/la_gear/bus.rb, line 42
# Schedules +msg+ on the local worker for +routing_key+ via +perform_in+.
#
# routing_key  - key passed through NamespaceUtility.adjust_routing_key
#                together with +la_gear_opts+; the adjusted key is used to
#                look up the worker via NamespaceUtility.local_worker.
# msg          - object responding to +values+ (e.g. a Hash); its values
#                are splatted after +interval+ in the +perform_in+ call.
# la_gear_opts - optional; options used only for routing-key adjustment.
#                Because it sits before the required +interval+, a
#                three-argument call binds the third argument to +interval+.
# interval     - passed as the first argument of +perform_in+.
#
# Returns the result of the worker's +perform_in+ call.
def publish_local_in(routing_key, msg, la_gear_opts = {}, interval)
  local_key = NamespaceUtility.adjust_routing_key(routing_key, la_gear_opts)
  worker = NamespaceUtility.local_worker(local_key)
  job_args = msg.values
  worker.perform_in(interval, *job_args)
end

Private Instance Methods

init_pool(size = ::Sidekiq.options[:concurrency], timeout = 3) click to toggle source
# File lib/la_gear/bus.rb, line 5
# Builds the publisher connection pool and stores it in the global
# +$publisher+, then checks out one connection to verify it.
#
# size    - value passed as the pool's +size:+ option; defaults to
#           ::Sidekiq.options[:concurrency].
# timeout - value passed as the pool's +timeout:+ option; defaults to 3.
#
# Raises RuntimeError ('Bus is lost!') when the object yielded by the pool
# is not a LaGear::Publisher.
# Returns whatever +$publisher.with+ returns.
def init_pool(size = ::Sidekiq.options[:concurrency], timeout = 3)
  # Each pooled connection is created by this factory block.
  $publisher = ConnectionPool.new(size: size, timeout: timeout) do
    ::LaGear::Publisher.new
  end

  # Sanity check: borrow one connection and make sure it is a publisher.
  $publisher.with do |candidate|
    next if candidate.is_a?(LaGear::Publisher)

    raise 'Bus is lost!'
  end
end
publish(routing_key, msg, la_gear_opts = {}, bunny_opts = {}, sidekiq_opts = {}) click to toggle source
# File lib/la_gear/bus.rb, line 18
# Publishes +msg+ under +routing_key+ through DelayablePublisher.
#
# routing_key  - key passed through NamespaceUtility.adjust_routing_key
#                together with +la_gear_opts+ before publishing.
# msg          - payload forwarded unchanged to the publisher.
# la_gear_opts - options used only for routing-key adjustment.
# bunny_opts   - options forwarded as the last argument of +publish+.
# sidekiq_opts - options handed to DelayablePublisher.sidekiq_delay.
#
# Returns the result of calling +publish+ on the object returned by
# +sidekiq_delay+ (presumably a Sidekiq delay proxy - confirm against
# DelayablePublisher).
def publish(routing_key, msg, la_gear_opts = {}, bunny_opts = {}, sidekiq_opts = {})
  adjusted_key = NamespaceUtility.adjust_routing_key(routing_key, la_gear_opts)
  delayed_publisher = DelayablePublisher.sidekiq_delay(sidekiq_opts)
  delayed_publisher.publish(adjusted_key, msg, bunny_opts)
end
publish_at(timestamp, routing_key, msg, la_gear_opts = {}, bunny_opts = {}, sidekiq_opts = {}) click to toggle source
# File lib/la_gear/bus.rb, line 30
# Publishes +msg+ under +routing_key+ through DelayablePublisher, using
# the proxy obtained from +sidekiq_delay_until+.
#
# timestamp    - passed as the first argument of sidekiq_delay_until
#                (presumably the time the job should run - confirm).
# routing_key  - key passed through NamespaceUtility.adjust_routing_key
#                together with +la_gear_opts+ before publishing.
# msg          - payload forwarded unchanged to the publisher.
# la_gear_opts - options used only for routing-key adjustment.
# bunny_opts   - options forwarded as the last argument of +publish+.
# sidekiq_opts - options handed to sidekiq_delay_until after +timestamp+.
#
# Returns the result of the proxied +publish+ call.
def publish_at(timestamp, routing_key, msg, la_gear_opts = {}, bunny_opts = {}, sidekiq_opts = {})
  adjusted_key = NamespaceUtility.adjust_routing_key(routing_key, la_gear_opts)
  scheduled_publisher = DelayablePublisher.sidekiq_delay_until(timestamp, sidekiq_opts)
  scheduled_publisher.publish(adjusted_key, msg, bunny_opts)
end
publish_in(interval, routing_key, msg, la_gear_opts = {}, bunny_opts = {}, sidekiq_opts = {}) click to toggle source
# File lib/la_gear/bus.rb, line 24
# Publishes +msg+ under +routing_key+ through DelayablePublisher, using
# the proxy obtained from +sidekiq_delay_for+.
#
# interval     - passed as the first argument of sidekiq_delay_for
#                (presumably how long to wait before running - confirm).
# routing_key  - key passed through NamespaceUtility.adjust_routing_key
#                together with +la_gear_opts+ before publishing.
# msg          - payload forwarded unchanged to the publisher.
# la_gear_opts - options used only for routing-key adjustment.
# bunny_opts   - options forwarded as the last argument of +publish+.
# sidekiq_opts - options handed to sidekiq_delay_for after +interval+.
#
# Returns the result of the proxied +publish+ call.
def publish_in(interval, routing_key, msg, la_gear_opts = {}, bunny_opts = {}, sidekiq_opts = {})
  adjusted_key = NamespaceUtility.adjust_routing_key(routing_key, la_gear_opts)
  deferred_publisher = DelayablePublisher.sidekiq_delay_for(interval, sidekiq_opts)
  deferred_publisher.publish(adjusted_key, msg, bunny_opts)
end
publish_local(routing_key, msg, la_gear_opts = {}) click to toggle source
# File lib/la_gear/bus.rb, line 36
# Hands +msg+ directly to the local worker for +routing_key+.
#
# routing_key  - key passed through NamespaceUtility.adjust_routing_key
#                together with +la_gear_opts+; the adjusted key is used to
#                look up the worker via NamespaceUtility.local_worker.
# msg          - object responding to +values+ (e.g. a Hash); its values
#                are splatted as the arguments of +perform_async+.
# la_gear_opts - options used only for routing-key adjustment.
#
# Returns the result of the worker's +perform_async+ call.
def publish_local(routing_key, msg, la_gear_opts = {})
  local_key = NamespaceUtility.adjust_routing_key(routing_key, la_gear_opts)
  worker = NamespaceUtility.local_worker(local_key)
  job_args = msg.values
  worker.perform_async(*job_args)
end
publish_local_in(routing_key, msg, la_gear_opts = {}, interval) click to toggle source
# File lib/la_gear/bus.rb, line 42
# Schedules +msg+ on the local worker for +routing_key+ via +perform_in+.
#
# routing_key  - key passed through NamespaceUtility.adjust_routing_key
#                together with +la_gear_opts+; the adjusted key is used to
#                look up the worker via NamespaceUtility.local_worker.
# msg          - object responding to +values+ (e.g. a Hash); its values
#                are splatted after +interval+ in the +perform_in+ call.
# la_gear_opts - optional; options used only for routing-key adjustment.
#                Because it sits before the required +interval+, a
#                three-argument call binds the third argument to +interval+.
# interval     - passed as the first argument of +perform_in+.
#
# Returns the result of the worker's +perform_in+ call.
def publish_local_in(routing_key, msg, la_gear_opts = {}, interval)
  local_key = NamespaceUtility.adjust_routing_key(routing_key, la_gear_opts)
  worker = NamespaceUtility.local_worker(local_key)
  job_args = msg.values
  worker.perform_in(interval, *job_args)
end