class Object

Public Class Methods

client(args = {})
# File lib/rks/kafka.rb, line 7
# Returns a client built by calling +new+ with +config.brokers+.
#
# By default the instance is created once and cached in +@client+. Passing
# <tt>new: true</tt> builds and returns a fresh instance instead, leaving the
# cached one untouched.
#
# @param args [Hash] options; only the +:new+ key is read
# @return [Object] the cached or freshly built client
def client(args = {})
  return new(config.brokers) if args[:new]

  @client ||= new(config.brokers)
end
consumer()
# File lib/rks/kafka.rb, line 19
# Returns the consumer for +config.consumer_group_id+, asking +client+ for it
# on first use and caching the result in +@consumer+ afterwards.
#
# @return [Object] whatever +client.consumer+ returned on the first call
def consumer
  return @consumer if @consumer

  @consumer = client.consumer(group_id: config.consumer_group_id)
end
init(args = {})
# File lib/rks/logger.rb, line 6
# Builds a new instance from the default settings below, with any entries in
# +args+ overriding the default of the same key.
#
# @param args [Hash] overrides merged on top of the defaults
# @return [Object] the result of +new+ called with the merged settings
def init(args = {})
  defaults = {
    buffer_max_items: 5000,
    buffer_max_interval: 1,
    type: :multi_logger,
    # Only stdout is enabled. A file output was sketched but left disabled:
    #   {type: :file, path: "log/#{ENV['RKS_ENV']}.log"} if ENV["LOG_FILE"]
    outputs: [{ type: :stdout }]
  }

  new(defaults.merge(args))
end
producer()
# File lib/rks/kafka.rb, line 15
# Returns the producer obtained from +client+, creating it on first use and
# caching it in +@producer+ afterwards.
#
# @return [Object] whatever +client.producer+ returned on the first call
def producer
  return @producer if @producer

  @producer = client.producer
end

Public Instance Methods

camelize(str)
# File lib/rks/kafka.rb, line 43
# Converts a dash-separated name into CamelCase, e.g. "user-created" becomes
# "UserCreated". Each segment goes through String#capitalize, so letters after
# the first one in a segment are lower-cased.
#
# @param str [String] dash-separated name
# @return [String] the capitalized segments joined without a separator
def camelize(str)
  segments = str.split('-')
  segments.map(&:capitalize).join
end
execute_job(worker, cloned_args)
# File lib/rks/sidekiq.rb, line 4
# Runs +worker.perform+ with +cloned_args+ inside the application logger's
# +with_rescue_and_duration_worker+ wrapper.
#
# The correlation id is read from the "correlation_id" key of the first job
# argument when that argument is a Hash. Jobs with no arguments, or whose
# first argument is not a Hash, run with a nil correlation id instead of
# raising before +perform+ is reached.
#
# @param worker [Object] responds to +perform+ and +jid+
# @param cloned_args [Array] positional arguments for +perform+
# @return [Object] whatever the logger wrapper returns
def execute_job(worker, cloned_args)
  first_arg = cloned_args.first
  correlation_id = first_arg["correlation_id"] if first_arg.is_a?(Hash)

  Application.logger.with_rescue_and_duration_worker(correlation_id, worker.class.name, first_arg, worker.jid) do
    worker.perform(*cloned_args)
  end
end
produce(*args)
# File lib/rks/kafka.rb, line 28
# Serializes a payload and hands it to +original_produce+ on an
# environment-prefixed topic.
#
# Expected call shape: <tt>produce(payload, topic: "name", **options)</tt>.
#
# * +args[0]+ is the payload. It is round-tripped through JSON first, so
#   symbol keys become strings.
# * +args[1]+ is the options hash. +:topic+ is required; it is rewritten to
#   "<Application.config.env>-<topic>" before being forwarded. +:encoding+ is
#   consumed here and never forwarded: when it is +true+ or absent/nil the
#   payload is encoded through +Application.avro_registry+ using the camelized
#   topic as schema name; any other value sends the payload as a JSON string.
#   All remaining options are passed to +original_produce+ unchanged.
#
# The options hash is copied before being modified, so a hash owned by the
# caller can be reused for several calls without the topic being prefixed
# more than once.
#
# @return [Object] whatever +original_produce+ returns
def produce(*args)
  # Copy first: the topic rewrite and :encoding removal must stay local.
  options = args[1].dup
  topic = options[:topic]

  # Resolve the flag on its own. Only an explicit true, or leaving the option
  # out, selects Avro.
  encoding_option = options.delete(:encoding)
  avro = encoding_option.nil? || encoding_option == true

  payload = JSON.parse(JSON.dump(args[0]))
  options[:topic] = [Application.config.env, topic].join("-")

  message = if avro
    Application.avro_registry.encode(payload, schema_name: camelize(topic))
  else
    JSON.dump(payload)
  end

  original_produce(message, **options)
end
with_rescue_and_duration_command(correlation_id, actor, args) { ... }
# File lib/rks/logger.rb, line 45
# Runs the given block as a command, logging a "started" entry before it and
# a "finished" entry, with the wall-clock duration in seconds rounded to three
# decimals, after it.
#
# Any error raised along the way is logged through +Application.logger.fatal+
# with its class, message and backtrace, and the method then returns nil.
# SystemExit and SignalException (e.g. Interrupt) are re-raised untouched so
# that process shutdown is not swallowed.
#
# The block's value is held in a local variable, not on the logger instance,
# so concurrent calls on a shared logger cannot overwrite each other's result.
#
# @param correlation_id [Object] identifier copied into every log entry
# @param actor [Object] logged under the +command+ key
# @param args [Object] logged under the +args+ key of the "started" entry
# @yield the work to run and time
# @return [Object, nil] the block's value, or nil when an error was logged
def with_rescue_and_duration_command(correlation_id, actor, args)
  info correlation_id: correlation_id, status: "started", command: actor, args: args

  result = nil
  duration = Benchmark.measure { result = yield }
  info correlation_id: correlation_id, status: "finished", command: actor, duration: duration.real.round(3)

  result
rescue SystemExit, SignalException
  raise
rescue Exception => e
  Application.logger.fatal correlation_id: correlation_id, status: "failed", command: actor, error_name: e.class.to_s, error_message: e.message, error_detail: e.backtrace
  nil
end
with_rescue_and_duration_event(correlation_id, event, payload) { ... }
# File lib/rks/logger.rb, line 29
# Runs the given block as an event handler, logging a "started" entry before
# it and a "finished" entry, with the wall-clock duration in seconds rounded
# to three decimals, after it.
#
# If logging the payload raises Encoding::UndefinedConversionError, the
# "started" entry is logged again without the payload.
#
# Any other error raised along the way is logged through
# +Application.logger.fatal+ with its class, message and backtrace, and the
# method then returns nil. SystemExit and SignalException (e.g. Interrupt) are
# re-raised untouched so that process shutdown is not swallowed.
#
# The block's value is held in a local variable, not on the logger instance,
# so concurrent calls on a shared logger cannot overwrite each other's result.
#
# @param correlation_id [Object] identifier copied into every log entry
# @param event [Object] logged under the +event+ key
# @param payload [Object] logged under the +payload+ key of the "started" entry
# @yield the work to run and time
# @return [Object, nil] the block's value, or nil when an error was logged
def with_rescue_and_duration_event(correlation_id, event, payload)
  begin
    info correlation_id: correlation_id, status: "started", event: event, payload: payload
  rescue Encoding::UndefinedConversionError
    # The payload could not be converted for logging; log the start without it.
    info correlation_id: correlation_id, status: "started", event: event
  end

  result = nil
  duration = Benchmark.measure { result = yield }
  info correlation_id: correlation_id, status: "finished", event: event, duration: duration.real.round(3)

  result
rescue SystemExit, SignalException
  raise
rescue Exception => e
  Application.logger.fatal correlation_id: correlation_id, status: "failed", event: event, error_name: e.class.to_s, error_message: e.message, error_detail: e.backtrace
  nil
end
with_rescue_and_duration_worker(correlation_id, worker, args, jid) { ... }
# File lib/rks/logger.rb, line 56
# Runs the given block as a background job, logging a "started" entry before
# it and a "finished" entry, with the wall-clock duration in seconds rounded
# to three decimals, after it.
#
# Any error raised along the way is logged through +Application.logger.fatal+
# with its class, message and backtrace, and the method then returns nil.
# SystemExit and SignalException (e.g. Interrupt) are re-raised untouched so
# that process shutdown is not swallowed.
#
# The block's value is held in a local variable, not on the logger instance,
# so concurrent calls on a shared logger cannot overwrite each other's result.
#
# @param correlation_id [Object] identifier copied into every log entry
# @param worker [Object] logged under the +worker+ key
# @param args [Object] logged under the +args+ key of the "started" entry
# @param jid [Object] job id copied into every log entry
# @yield the work to run and time
# @return [Object, nil] the block's value, or nil when an error was logged
def with_rescue_and_duration_worker(correlation_id, worker, args, jid)
  info correlation_id: correlation_id, status: "started", worker: worker, jid: jid, args: args

  result = nil
  duration = Benchmark.measure { result = yield }
  info correlation_id: correlation_id, status: "finished", worker: worker, jid: jid, duration: duration.real.round(3)

  result
rescue SystemExit, SignalException
  raise
rescue Exception => e
  Application.logger.fatal correlation_id: correlation_id, status: "failed", worker: worker, jid: jid, error_name: e.class.to_s, error_message: e.message, error_detail: e.backtrace
  nil
end