class Object
Public Class Methods
client(args = {})
click to toggle source
# File lib/rks/kafka.rb, line 7
# Returns an instance built with <tt>new(config.brokers)</tt>.
#
# When <tt>args[:new]</tt> is truthy a fresh instance is built on every call
# and the memoized one is left untouched. Otherwise the first instance built
# is cached in +@client+ and returned on subsequent calls.
def client(args = {})
  return new(config.brokers) if args[:new]

  @client ||= new(config.brokers)
end
consumer()
click to toggle source
# File lib/rks/kafka.rb, line 19
# Returns the memoized consumer, creating it on first use from +client+ with
# the group id taken from +config.consumer_group_id+.
def consumer
  return @consumer if @consumer

  kafka = client
  @consumer = kafka.consumer(group_id: config.consumer_group_id)
end
init(args = {})
click to toggle source
# File lib/rks/logger.rb, line 6
# Builds a new instance from the default logger options below, with any
# entries in +args+ overriding the defaults key by key.
def init(args = {})
  defaults = {
    buffer_max_items: 5000,
    buffer_max_interval: 1,
    type: :multi_logger,
    outputs: [
      { type: :stdout }
      # {type: :file, path: "log/#{ENV['RKS_ENV']}.log"} if ENV["LOG_FILE"]
    ]
  }

  new(defaults.merge(args))
end
producer()
click to toggle source
# File lib/rks/kafka.rb, line 15
# Returns the memoized producer, asking +client+ for one on first use.
def producer
  return @producer if @producer

  @producer = client.producer
end
Public Instance Methods
camelize(str)
click to toggle source
# File lib/rks/kafka.rb, line 43
# Converts a dash-separated name into CamelCase: the string is split on "-",
# each segment is capitalized (first character upcased, the rest downcased)
# and the segments are concatenated, e.g. "order-created" => "OrderCreated".
def camelize(str)
  segments = str.split('-')
  segments.map(&:capitalize).join
end
execute_job(worker, cloned_args)
click to toggle source
# File lib/rks/sidekiq.rb, line 4
# Runs <tt>worker.perform(*cloned_args)</tt> inside
# <tt>Application.logger.with_rescue_and_duration_worker</tt>, so the job is
# logged together with its correlation id, worker class name, first argument
# and jid.
#
# The correlation id is read from the "correlation_id" key of the first job
# argument when that argument is a Hash. Jobs with no arguments, or whose
# first argument is not a Hash, are logged with a nil correlation id instead
# of raising before the job gets a chance to run.
#
# Returns whatever the logger wrapper returns.
def execute_job(worker, cloned_args)
  first_arg = cloned_args.first
  correlation_id = first_arg["correlation_id"] if first_arg.is_a?(Hash)

  Application.logger.with_rescue_and_duration_worker(correlation_id, worker.class.name, first_arg, worker.jid) do
    worker.perform(*cloned_args)
  end
end
produce(*args)
click to toggle source
# File lib/rks/kafka.rb, line 28
# Encodes a payload and hands it to +original_produce+ (presumably the
# aliased, unpatched producer method - confirm against the alias site).
#
# args[0] - the payload. It is round-tripped through JSON first, so symbol
#           keys become strings before encoding.
# args[1] - options Hash. +:topic+ is read and rewritten to
#           "<Application.config.env>-<topic>". +:encoding+ is consumed here:
#           when it is +true+ or absent the payload is encoded through
#           <tt>Application.avro_registry</tt> using the camelized topic as
#           the schema name; any other value sends plain JSON. Every other
#           option is forwarded to +original_produce+ as a keyword argument.
#
# The options Hash is copied before it is modified, so a caller can reuse
# the same Hash for several calls without the topic being prefixed twice.
def produce(*args)
  options = args[1].dup
  encoding_flag = options.delete(:encoding)
  # Avro is the default: only an explicit non-true value turns it off.
  use_avro = encoding_flag.nil? || encoding_flag == true

  topic = options[:topic]
  payload = JSON.parse(JSON.dump(args[0]))
  options[:topic] = [Application.config.env, topic].join("-")

  encoded =
    if use_avro
      # The schema name comes from the un-prefixed topic.
      Application.avro_registry.encode(payload, schema_name: camelize(topic))
    else
      JSON.dump(payload)
    end

  original_produce(encoded, **options)
end
with_rescue_and_duration_command(correlation_id, actor, args) { || ... }
click to toggle source
# File lib/rks/logger.rb, line 45
# Runs the given block as a "command", logging a "started" entry before it,
# and a "finished" entry with the elapsed wall-clock seconds (rounded to
# three decimals) after it.
#
# correlation_id - value logged with every entry.
# actor          - value logged under +command+.
# args           - value logged under +args+ on the "started" entry.
#
# Returns the block's value. If a StandardError is raised (by the block or by
# the logging calls) a "failed" entry is written through
# <tt>Application.logger.fatal</tt> and nil is returned. Any other exception
# (Interrupt, SystemExit, ...) is logged the same way and then re-raised so
# that signals and process exit are not swallowed.
def with_rescue_and_duration_command(correlation_id, actor, args)
  info correlation_id: correlation_id, status: "started", command: actor, args: args

  # A local, not an instance variable: the block's value must not leak
  # between concurrent calls made on the same logger.
  started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  result = yield
  elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at

  info correlation_id: correlation_id, status: "finished", command: actor, duration: elapsed.round(3)
  result
rescue Exception => e
  Application.logger.fatal correlation_id: correlation_id, status: "failed", command: actor, error_name: e.class.to_s, error_message: e.message, error_detail: e.backtrace
  # Only ordinary errors are absorbed; process-level exceptions propagate.
  raise unless e.is_a?(StandardError)

  nil
end
with_rescue_and_duration_event(correlation_id, event, payload) { || ... }
click to toggle source
# File lib/rks/logger.rb, line 29
# Runs the given block as an "event" handler, logging a "started" entry
# before it, and a "finished" entry with the elapsed wall-clock seconds
# (rounded to three decimals) after it.
#
# correlation_id - value logged with every entry.
# event          - value logged under +event+.
# payload        - value logged under +payload+ on the "started" entry. If
#                  logging it raises Encoding::UndefinedConversionError the
#                  "started" entry is written again without the payload.
#
# Returns the block's value. If a StandardError is raised (by the block or by
# the logging calls) a "failed" entry is written through
# <tt>Application.logger.fatal</tt> and nil is returned. Any other exception
# (Interrupt, SystemExit, ...) is logged the same way and then re-raised so
# that signals and process exit are not swallowed.
def with_rescue_and_duration_event(correlation_id, event, payload)
  begin
    info correlation_id: correlation_id, status: "started", event: event, payload: payload
  rescue Encoding::UndefinedConversionError
    # The payload could not be converted for logging; log without it.
    info correlation_id: correlation_id, status: "started", event: event
  end

  # A local, not an instance variable: the block's value must not leak
  # between concurrent calls made on the same logger.
  started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  result = yield
  elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at

  info correlation_id: correlation_id, status: "finished", event: event, duration: elapsed.round(3)
  result
rescue Exception => e
  Application.logger.fatal correlation_id: correlation_id, status: "failed", event: event, error_name: e.class.to_s, error_message: e.message, error_detail: e.backtrace
  # Only ordinary errors are absorbed; process-level exceptions propagate.
  raise unless e.is_a?(StandardError)

  nil
end
with_rescue_and_duration_worker(correlation_id, worker, args, jid) { || ... }
click to toggle source
# File lib/rks/logger.rb, line 56
# Runs the given block as a background "worker" job, logging a "started"
# entry before it, and a "finished" entry with the elapsed wall-clock
# seconds (rounded to three decimals) after it.
#
# correlation_id - value logged with every entry.
# worker         - value logged under +worker+.
# args           - value logged under +args+ on the "started" entry.
# jid            - value logged under +jid+ with every entry.
#
# Returns the block's value. If a StandardError is raised (by the block or by
# the logging calls) a "failed" entry is written through
# <tt>Application.logger.fatal</tt> and nil is returned. Any other exception
# (Interrupt, SystemExit, ...) is logged the same way and then re-raised so
# that signals and process exit are not swallowed.
def with_rescue_and_duration_worker(correlation_id, worker, args, jid)
  info correlation_id: correlation_id, status: "started", worker: worker, jid: jid, args: args

  # A local, not an instance variable: this method is invoked on the shared
  # Application.logger, so an ivar could hand one job's result to another
  # job running concurrently.
  started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  result = yield
  elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at

  info correlation_id: correlation_id, status: "finished", worker: worker, jid: jid, duration: elapsed.round(3)
  result
rescue Exception => e
  Application.logger.fatal correlation_id: correlation_id, status: "failed", worker: worker, jid: jid, error_name: e.class.to_s, error_message: e.message, error_detail: e.backtrace
  # Only ordinary errors are absorbed; process-level exceptions propagate.
  raise unless e.is_a?(StandardError)

  nil
end