class GorgService::RabbitmqEnvBuilder

Public Class Methods

new(conn:nil, event_exchange:"", app_id:"", deferred_time: 10000, listened_routing_keys: [], prefetch: 1) click to toggle source
# File lib/gorg_service/rabbitmq_env_builder.rb, line 4
def initialize(conn:nil, event_exchange:"", app_id:"", deferred_time: 10000, listened_routing_keys: [], prefetch: 1)
  @_conn=conn
  @app_id=app_id
  @event_exchange_name=event_exchange
  @deferred_time=deferred_time
  @delayed_queues={}
  @listened_routing_keys=listened_routing_keys
  set_logger
end

Public Instance Methods

ch() click to toggle source
# File lib/gorg_service/rabbitmq_env_builder.rb, line 20
def ch
  unless (@_ch && @_ch.status == :open)
    @_ch=conn.create_channel
    @_ch.prefetch(1)
  end
  @_ch
end
Also aliased as: channel
channel()
Alias for: ch
conn() click to toggle source
# File lib/gorg_service/rabbitmq_env_builder.rb, line 14
def conn
  @_conn.start unless @_conn.connected?
  @_conn
end
Also aliased as: connection
connection()
Alias for: conn
delayed_in_exchange() click to toggle source
# File lib/gorg_service/rabbitmq_env_builder.rb, line 42
def delayed_in_exchange
 ch.topic("#{@app_id}_delayed_in_x", :durable => true)
end
delayed_out_exchange() click to toggle source
# File lib/gorg_service/rabbitmq_env_builder.rb, line 46
def delayed_out_exchange
  ch.fanout("#{@app_id}_delayed_out_x", :durable => true)
end
delayed_queue_for(routing_key) click to toggle source
# File lib/gorg_service/rabbitmq_env_builder.rb, line 62
def delayed_queue_for routing_key
  @delayed_queues[routing_key]||= create_delayed_queue_for(routing_key)
end
event_exchange() click to toggle source
# File lib/gorg_service/rabbitmq_env_builder.rb, line 37
def event_exchange
  ch.topic(@event_exchange_name, :durable => true)
end
find_exchange_by_name(name, type: 'topic', opts: {}) click to toggle source
# File lib/gorg_service/rabbitmq_env_builder.rb, line 66
def find_exchange_by_name(name, type: 'topic', opts: {})
  begin
    ch.send(type,name,opts)
  rescue Bunny::PreconditionFailed => e
    regex=/PRECONDITION_FAILED - inequivalent arg '(?<arg>.*)' for exchange '(?<exchange>.*)' in vhost '(?<vhost>.*)': received '(?<our>.*)' but current is '(?<their>.*)'/
    match=regex.match(e.message)

    case match[:arg]
      when "type"
        find_exchange_by_name(name,type: match[:their],opts: opts)
      else
        find_exchange_by_name(name,type: type,opts: opts.merge({ match[:arg].to_sym =>  match[:their]}))
    end
  end
end
job_queue() click to toggle source
# File lib/gorg_service/rabbitmq_env_builder.rb, line 50
def job_queue
  GorgService.logger.debug "Listened keys :#{@listened_routing_keys}"
  q=ch.queue("#{@app_id}_job_q", :durable => true)
  q.bind delayed_out_exchange
  @listened_routing_keys.each do |rk|
    q.bind(event_exchange, :routing_key => rk)
  end
  q.bind(reply_exchange, :routing_key => "#")
  q.bind(request_exchange, :routing_key => "#")
  q
end
reply_exchange() click to toggle source
# File lib/gorg_service/rabbitmq_env_builder.rb, line 33
def reply_exchange
  ch.topic("#{@app_id}.reply", :durable => true)
end
request_exchange() click to toggle source
# File lib/gorg_service/rabbitmq_env_builder.rb, line 29
def request_exchange
  ch.topic("#{@app_id}.request", :durable => true)
end

Private Instance Methods

create_delayed_queue_for(routing_key) click to toggle source
# File lib/gorg_service/rabbitmq_env_builder.rb, line 92
def create_delayed_queue_for(routing_key)
  q_name="#{@app_id}_#{routing_key.gsub(".","-")}_deferred_q"

  begin
    q=ch.queue(q_name,
      durable: true,
      arguments: {
          'x-message-ttl' => @deferred_time,
          'x-dead-letter-exchange' => delayed_out_exchange.name,
          'x-dead-letter-routing-key' => routing_key,
        }
      )

    q.bind(delayed_in_exchange.name, :routing_key => routing_key)
  rescue Bunny::PreconditionFailed => e
    if e.message.start_with?("PRECONDITION_FAILED - inequivalent arg")
      GorgService.logger.fatal("Mismatching configuration : admin action necessary. Error was : #{e.message}")
      raise "MismatchingConfig"
    end
    raise
  end
end
set_logger() click to toggle source
# File lib/gorg_service/rabbitmq_env_builder.rb, line 84
def set_logger
  x=ch.fanout("log", :durable => true)
  x.bind(event_exchange, :routing_key => "#")
  x.bind(reply_exchange, :routing_key => "#")
  x.bind(request_exchange, :routing_key => "#")
  x.bind(delayed_in_exchange, :routing_key => "#")
end