module LaGear::Bus
Public Class Methods
init_pool(size = ::Sidekiq.options[:concurrency], timeout = 3)
click to toggle source
# File lib/la_gear/bus.rb, line 5 def init_pool(size = ::Sidekiq.options[:concurrency], timeout = 3) $publisher = ConnectionPool.new( size: size, timeout: timeout ) { ::LaGear::Publisher.new } $publisher.with do |bus| fail 'Bus is lost!' unless bus.is_a?(LaGear::Publisher) end end
publish(routing_key, msg, la_gear_opts = {}, bunny_opts = {}, sidekiq_opts = {})
click to toggle source
# File lib/la_gear/bus.rb, line 18 def publish(routing_key, msg, la_gear_opts = {}, bunny_opts = {}, sidekiq_opts = {}) routing_key = NamespaceUtility.adjust_routing_key(routing_key, la_gear_opts) DelayablePublisher.sidekiq_delay(sidekiq_opts).publish(routing_key, msg, bunny_opts) end
publish_at(timestamp, routing_key, msg, la_gear_opts = {}, bunny_opts = {}, sidekiq_opts = {})
click to toggle source
# File lib/la_gear/bus.rb, line 30 def publish_at(timestamp, routing_key, msg, la_gear_opts = {}, bunny_opts = {}, sidekiq_opts = {}) routing_key = NamespaceUtility.adjust_routing_key(routing_key, la_gear_opts) DelayablePublisher.sidekiq_delay_until(timestamp, sidekiq_opts).publish(routing_key, msg, bunny_opts) end
publish_in(interval, routing_key, msg, la_gear_opts = {}, bunny_opts = {}, sidekiq_opts = {})
click to toggle source
# File lib/la_gear/bus.rb, line 24 def publish_in(interval, routing_key, msg, la_gear_opts = {}, bunny_opts = {}, sidekiq_opts = {}) routing_key = NamespaceUtility.adjust_routing_key(routing_key, la_gear_opts) DelayablePublisher.sidekiq_delay_for(interval, sidekiq_opts).publish(routing_key, msg, bunny_opts) end
publish_local(routing_key, msg, la_gear_opts = {})
click to toggle source
# File lib/la_gear/bus.rb, line 36 def publish_local(routing_key, msg, la_gear_opts = {}) routing_key = NamespaceUtility.adjust_routing_key(routing_key, la_gear_opts) NamespaceUtility.local_worker(routing_key).perform_async(*msg.values) end
publish_local_in(routing_key, msg, la_gear_opts = {}, interval)
click to toggle source
# File lib/la_gear/bus.rb, line 42 def publish_local_in(routing_key, msg, la_gear_opts = {}, interval) routing_key = NamespaceUtility.adjust_routing_key(routing_key, la_gear_opts) NamespaceUtility.local_worker(routing_key).perform_in(interval, *msg.values) end
Private Instance Methods
init_pool(size = ::Sidekiq.options[:concurrency], timeout = 3)
click to toggle source
# File lib/la_gear/bus.rb, line 5 def init_pool(size = ::Sidekiq.options[:concurrency], timeout = 3) $publisher = ConnectionPool.new( size: size, timeout: timeout ) { ::LaGear::Publisher.new } $publisher.with do |bus| fail 'Bus is lost!' unless bus.is_a?(LaGear::Publisher) end end
publish(routing_key, msg, la_gear_opts = {}, bunny_opts = {}, sidekiq_opts = {})
click to toggle source
# File lib/la_gear/bus.rb, line 18 def publish(routing_key, msg, la_gear_opts = {}, bunny_opts = {}, sidekiq_opts = {}) routing_key = NamespaceUtility.adjust_routing_key(routing_key, la_gear_opts) DelayablePublisher.sidekiq_delay(sidekiq_opts).publish(routing_key, msg, bunny_opts) end
publish_at(timestamp, routing_key, msg, la_gear_opts = {}, bunny_opts = {}, sidekiq_opts = {})
click to toggle source
# File lib/la_gear/bus.rb, line 30 def publish_at(timestamp, routing_key, msg, la_gear_opts = {}, bunny_opts = {}, sidekiq_opts = {}) routing_key = NamespaceUtility.adjust_routing_key(routing_key, la_gear_opts) DelayablePublisher.sidekiq_delay_until(timestamp, sidekiq_opts).publish(routing_key, msg, bunny_opts) end
publish_in(interval, routing_key, msg, la_gear_opts = {}, bunny_opts = {}, sidekiq_opts = {})
click to toggle source
# File lib/la_gear/bus.rb, line 24 def publish_in(interval, routing_key, msg, la_gear_opts = {}, bunny_opts = {}, sidekiq_opts = {}) routing_key = NamespaceUtility.adjust_routing_key(routing_key, la_gear_opts) DelayablePublisher.sidekiq_delay_for(interval, sidekiq_opts).publish(routing_key, msg, bunny_opts) end
publish_local(routing_key, msg, la_gear_opts = {})
click to toggle source
# File lib/la_gear/bus.rb, line 36 def publish_local(routing_key, msg, la_gear_opts = {}) routing_key = NamespaceUtility.adjust_routing_key(routing_key, la_gear_opts) NamespaceUtility.local_worker(routing_key).perform_async(*msg.values) end
publish_local_in(routing_key, msg, la_gear_opts = {}, interval)
click to toggle source
# File lib/la_gear/bus.rb, line 42 def publish_local_in(routing_key, msg, la_gear_opts = {}, interval) routing_key = NamespaceUtility.adjust_routing_key(routing_key, la_gear_opts) NamespaceUtility.local_worker(routing_key).perform_in(interval, *msg.values) end