class Fluent::SomeOutput

Public Instance Methods

configure(conf) click to toggle source

This method is called before starting.

Calls superclass method
# File lib/fluent/plugin/out_coralogix.rb, line 33
def configure(conf)
  super
  begin
    @currentWorker = 0;
    @workers = {}
    @app_name_from_config = DEFAULT_appname;
    @sub_name_from_config = DEFAULT_subsystemname;
    i = 0
    log.info "configure #{number_of_workers} workers"
    until i == number_of_workers do
      @workers[i.to_s] = {}
      log.info "init worker ##{i}"
      private_key = get_private_key
      if !appname.start_with?("$") && !subsystemname.start_with?("$")
        @app_name_from_config = config.fetch("APP_NAME", appname)
        @sub_name_from_config = config.fetch("SUB_SYSTEM", subsystemname)
        @workers[i.to_s]["#{@app_name_from_config}.#{@sub_name_from_config}"] = CoralogixLogger.new private_key, @app_name_from_config, @sub_name_from_config, false, "FluentD (#{version?})", force_compression, proxy ? proxy.to_h.map { |k,v| [k.to_s,v] }.to_h : Hash.new, log
      else
        @workers[i.to_s] = {}
      end
      i+=1
    end

    unless endpoint.nil?
      ENV["CORALOGIX_LOG_URL"] = "https://#{endpoint}/api/v1/logs"
      ENV["CORALOGIX_TIME_DELTA_URL"] = "https://#{endpoint}/sdk/v1/time"
    end

  rescue Exception => e
    log.error "Failed to configure: #{e}"
  end
end
emit(tag, es, chain) click to toggle source

This method is called when an event reaches Fluentd. 'es' is a Fluent::EventStream object that includes multiple events. You can use 'es.each {|time,record| … }' to retrieve events. 'chain' is an object that manages transactions. Call 'chain.next' at appropriate points and rollback if it raises an exception.

NOTE! This method is called by Fluentd's main thread so you should not write slow routine here. It causes Fluentd's performance degression.

# File lib/fluent/plugin/out_coralogix.rb, line 141
def emit(tag, es, chain)
  chain.next
  es.each { |time, record|
    logger = get_logger(record)

    log_record = log_key_name != nil ? record.fetch(log_key_name, record) : record
    log_record = is_json ? log_record.to_json : log_record
    log_record = log_record.to_s.empty? ? record : log_record

    timestamp = record.fetch(timestamp_key_name, nil)
    if (timestamp.nil?)
      logger.debug log_record
    else
      begin
        float_timestamp = DateTime.parse(timestamp.to_s).to_time.to_f * 1000
        logger.debug log_record, nil, timestamp: float_timestamp
      rescue Exception => e
        logger.debug log_record
      end
    end
  }
end
extract(record, key, default) click to toggle source
# File lib/fluent/plugin/out_coralogix.rb, line 78
def extract record, key, default
  begin
    res = record
    return key unless key.start_with?("$")
    key[1..-1].split(".").each do |k|
      res = res.fetch(k, nil)
      return default if res == nil
    end
    return res
  rescue Exception => e
    log.error "Failed to extract #{key}: #{e}"
    return default
  end
end
get_app_sub_name(record) click to toggle source
# File lib/fluent/plugin/out_coralogix.rb, line 94
def get_app_sub_name(record)
  app_name = extract(record, appname, DEFAULT_appname)
  sub_name = extract(record, subsystemname, DEFAULT_subsystemname)
  return app_name, sub_name
end
get_logger(record) click to toggle source
# File lib/fluent/plugin/out_coralogix.rb, line 100
def get_logger(record)
  private_key = get_private_key
  if !appname.start_with?("$") && !subsystemname.start_with?("$")
    app_name = @app_name_from_config
    sub_name = @sub_name_from_config
  else
    app_name, sub_name = get_app_sub_name(record)
  end
  
  # YK@2020-11-26T10:56 - We had encountered a case in which this value reached above 7K and the value of worker became null
  if @currentWorker >= number_of_workers
    @currentWorker = 0
  end
  worker = @workers[@currentWorker.to_s]
  @currentWorker+=1;
  if !worker.key?("#{app_name}.#{sub_name}")
    worker["#{app_name}.#{sub_name}"] = CoralogixLogger.new private_key, app_name, sub_name, false, "FluentD (#{version?})", force_compression, proxy ? proxy.to_h.map { |k, v| [k.to_s, v] }.to_h : Hash.new, log
  end

  return worker["#{app_name}.#{sub_name}"]
  
end
get_private_key() click to toggle source
# File lib/fluent/plugin/out_coralogix.rb, line 66
def get_private_key
  return config.fetch("PRIVATE_KEY", privatekey)
end
shutdown() click to toggle source

This method is called when shutting down.

Calls superclass method
# File lib/fluent/plugin/out_coralogix.rb, line 130
def shutdown
  super
end
start() click to toggle source

This method is called when starting.

Calls superclass method
# File lib/fluent/plugin/out_coralogix.rb, line 125
def start
  super
end
version?() click to toggle source
# File lib/fluent/plugin/out_coralogix.rb, line 70
def version?
  begin
    Gem.loaded_specs['fluent-plugin-coralogix'].version.to_s
  rescue Exception => e
    return '0.0.0'
  end
end