class BcdbOut

Constants

DEFAULT_BUFFER_TYPE
DEFAULT_FORMATTER

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_bcdb.rb, line 35
# Builds the plugin instance. No plugin-specific state is set up here; the
# call is forwarded unchanged to the superclass constructor.
def initialize
  super
end

Public Instance Methods

bcdb_authorise() click to toggle source
# File lib/fluent/plugin/out_bcdb.rb, line 144
# Ensures a usable OAuth access token is cached in @token_oauth.
#
# A cached token whose expiry time (@expires_token) is still in the future is
# reused without any network traffic. Otherwise the credentials held in
# @username, @password, @client_id, @client_secret and @grant_type are POSTed
# as form data to @auth_url, and the returned 'access_token' / 'expires_in'
# values are stored in @token_oauth / @expires_token.
#
# Transport or parsing errors are logged and retried a bounded number of
# times (with a 2 second pause between attempts) instead of recursing forever.
#
# @return [Boolean] true when a token is available; false when the server
#   answered with code 5000 (bad credentials) or every attempt failed.
def bcdb_authorise
  # Reuse the cached token while it has not expired yet.
  return true if @token_oauth && @expires_token && Time.now.utc < @expires_token

  auth_uri = URI.parse(@auth_url)
  auth_data = {
    :username => @username,
    :password => @password,
    :client_id => @client_id,
    :client_secret => @client_secret,
    :grant_type => @grant_type
  }
  max_attempts = 3
  attempts = 0

  begin
    attempts += 1
    https = Net::HTTP.new(auth_uri.host, auth_uri.port)
    https.use_ssl = auth_uri.scheme == 'https'

    request = Net::HTTP::Post.new(auth_uri.path)
    request.set_form_data(auth_data)
    request['Content-Type'] = "application/x-www-form-urlencoded"
    resp = https.request(request)
    # NOTE(review): this writes the raw token response to the debug log.
    log.debug("#{resp.body}")
    bcdb_response = JSON.parse(resp.body)

    if bcdb_response["code"] == 5000
      log.error("Authentification failed please check your credentials")
      return false
    end

    @token_oauth = bcdb_response['access_token']
    @expires_token = Time.now.utc + bcdb_response['expires_in'].to_i
    true
  rescue => e
    log.error("Error Makeing Authorization Request to BCDB. Error: #{e.message} | Backtrace: #{e.backtrace}")
    if attempts < max_attempts
      sleep(2)
      retry
    end
    # Give up after the last attempt so the caller is not blocked forever.
    false
  end
end
bcdb_update_schema(data, cached_keys=false) click to toggle source
# File lib/fluent/plugin/out_bcdb.rb, line 182
# Publishes a JSON schema for the entity @bcdb_entity to @create_schema_url,
# declaring every given key as a string property.
#
# When +cached_keys+ is truthy the schema is sent with PUT. Otherwise it is
# sent with POST first and, if the response code is 4009 or 4000, sent again
# with PUT.
#
# @param data [Array] property names to declare in the schema
# @param cached_keys [Boolean] whether a schema was already published
# @return [Array] two elements: +data+ and +true+. The second element is
#   always true; the server response is logged but not inspected for success.
def bcdb_update_schema(data, cached_keys=false)
  schema_uri = URI.parse(@create_schema_url)
  schema_properties = {}
  data.each do |key|
    log.debug("KEY #{key.inspect}")
    # Use the key itself for $id/title; the hash entry does not exist yet
    # while it is being built, so reading it back would interpolate nil.
    name = key.to_s
    schema_properties[name] = {
      :"$id" => "/properties/#{name}",
      :type => "string",
      :title => "The #{name} Schema"
    }
  end
  schema_data = {
    :type => "object",
    :"$id" => @bcdb_entity,
    :"$schema" => "http://json-schema.org/draft-07/schema#",
    :title => "The Root Schema",
    :properties => schema_properties,
    :autoId => true
  }
  body = JSON(schema_data)

  if cached_keys
    request = bcdb_url(schema_uri, 'put', body)
  else
    request = bcdb_url(schema_uri, 'post', body)
    res = JSON.parse(request.body)["code"]
    # 4009 / 4000 are answered by re-sending the schema as an update.
    request = bcdb_url(schema_uri, 'put', body) if res == 4009 || res == 4000
  end
  log.debug("UPDATE SCHEMA: #{body}")

  log.debug("UPDATE SCHEMA RESPONSE: #{request.body}")
  [data, true]
end
bcdb_url(uri,type,body) click to toggle source
# File lib/fluent/plugin/out_bcdb.rb, line 217
# Sends a JSON request carrying the cached OAuth token (@token_oauth) as a
# Bearer authorization header and returns the HTTP response.
#
# @param uri [URI] target; host, port, scheme and path are read from it
# @param type [String, Symbol] 'post' or 'put' (case-insensitive)
# @param body [String] request body, sent as application/json
# @return [Net::HTTPResponse] the response object
# @raise [ArgumentError] when +type+ is neither post nor put
def bcdb_url(uri,type,body)
  request_class =
    case type.to_s.downcase
    when 'post' then Net::HTTP::Post
    when 'put' then Net::HTTP::Put
    else
      # Fail with a clear message instead of a NoMethodError on nil later on.
      raise ArgumentError, "unsupported HTTP method for BCDB request: #{type.inspect}"
    end

  request = request_class.new(uri.path)
  request.body = body
  request['Content-Type'] = "application/json"
  request['authorization'] = "Bearer #{@token_oauth}"

  connection = Net::HTTP.new(uri.host, uri.port)
  connection.use_ssl = uri.scheme == 'https'
  connection.request(request)
end
bulk_request_format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_bcdb.rb, line 432
# Formats one event by delegating to @formatter and returns whatever the
# formatter returns. #configure installs this method as #format (via
# alias_method on the singleton class) when @bulk_request is set.
def bulk_request_format(tag, time, record)
  @formatter.format(tag, time, record)
end
compress_body(req, data) click to toggle source
# File lib/fluent/plugin/out_bcdb.rb, line 271
# Gzip-compresses +data+ into the request body and marks the request with a
# "Content-Encoding: gzip" header. Does nothing (and returns nil) unless
# @compress_request is set.
#
# @param req [#[]=, #body=] request to modify
# @param data [String] payload to compress
def compress_body(req, data)
  if @compress_request
    sink = StringIO.new
    writer = Zlib::GzipWriter.new(sink)
    writer << data

    req['Content-Encoding'] = "gzip"
    # Closing the writer flushes the gzip trailer into the StringIO.
    writer.close
    req.body = sink.string
  end
end
configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_bcdb.rb, line 105
# Applies the plugin configuration and derives the runtime settings used by
# the request methods.
#
# @param conf the configuration element handed in by the framework; only
#   conf.elements('format') is read directly here
# @raise [Fluent::ConfigError] when buffering is enabled (@buffered) but
#   @chunk_key_tag is not set
def configure(conf)
  # NOTE(review): presumably maps legacy buffer/formatter parameters onto the
  # current section layout -- confirm against the Fluentd compat helpers.
  compat_parameters_convert(conf, :buffer, :formatter)
  super
  # The schema URL must be built first: @base_url is rewritten just below to
  # point at the data endpoint.
  @create_schema_url = "#{@base_url}" + "/data/catalog/_JsonSchema/" + "#{@bcdb_entity}"
  if @bulk_request
      @base_url =  "#{@base_url}" + "/data/bulkAsync/" + "#{@bcdb_entity}"
  else
      @base_url = "#{@base_url}" + "/data/" + "#{@bcdb_entity}"
  end

  # Fetch an OAuth token up front; this performs a network request.
  bcdb_authorise() if @authentication == :oauth

  @ssl_verify_mode = if @ssl_no_verify
                       OpenSSL::SSL::VERIFY_NONE
                     else
                       OpenSSL::SSL::VERIFY_PEER
                     end

  @ca_file = @cacert_file
  # nil means "no request sent yet"; send_request uses it for rate limiting.
  @last_request_time = nil
  raise Fluent::ConfigError, "'tag' in chunk_keys is required." if !@chunk_key_tag && @buffered

  # Assignment in the condition is intentional: remember the <format> section
  # (if any) and create a formatter only when one is present.
  if @formatter_config = conf.elements('format').first
    @formatter = formatter_create
  end

  # Install the matching #format implementation on this instance only.
  if @bulk_request
    class << self
      alias_method :format, :bulk_request_format
    end
    # Bulk mode always uses a JSON formatter, replacing any formatter created
    # from a <format> section above.
    @formatter = formatter_create(type: :json)
    @serializer = :x_ndjson # secret settings for bulk_request
  else
    class << self
      alias_method :format, :split_request_format
    end
  end
end
create_request(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_bcdb.rb, line 330
# Builds the HTTP request object for one record (or one bulk payload).
#
# The request class is looked up on Net::HTTP from @http_method (e.g. :post
# resolves to Net::HTTP::Post); body and headers are filled in by #set_body
# and #set_header.
#
# @return [Array] two elements: the request and the parsed target URI
def create_request(tag, time, record)
  uri = URI.parse(format_url(tag, time, record))
  request_class = Net::HTTP.const_get(@http_method.to_s.capitalize)
  req = request_class.new(uri.request_uri)

  set_body(req, tag, time, record)
  set_header(req, tag, time, record)
  log.trace("CREATE REQUEST: #{req}, #{uri}")

  [req, uri]
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_bcdb.rb, line 424
# Placeholder that returns nil. #configure replaces it on each instance by
# aliasing #format to #bulk_request_format or #split_request_format, so this
# body only runs if #format is called before that aliasing has happened.
def format(tag, time, record)
  # For safety.
end
format_url(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_bcdb.rb, line 241
# Returns the target URL for a request. The arguments are currently unused:
# every request goes to @base_url.
def format_url(tag, time, record)
  @base_url
end
formatted_to_msgpack_binary?() click to toggle source
# File lib/fluent/plugin/out_bcdb.rb, line 436
# Reports whether #format yields msgpack binary: true in per-record mode,
# false when @bulk_request is set.
#
# @return [Boolean]
def formatted_to_msgpack_binary?
  !@bulk_request
end
handle_record(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_bcdb.rb, line 407
# Sends a single record: optionally runs it through @formatter (only when a
# <format> section was configured), builds the request and dispatches it.
#
# @return whatever #send_request returns
def handle_record(tag, time, record)
  record = @formatter.format(tag, time, record) if @formatter_config

  request, target = create_request(tag, time, record)
  send_request(request, target)
end
handle_records(tag, time, chunk) click to toggle source
# File lib/fluent/plugin/out_bcdb.rb, line 415
# Sends the whole content of a buffer chunk as one request.
#
# @param chunk [#read] buffer chunk whose raw content becomes the payload
# @return whatever #send_request returns
def handle_records(tag, time, chunk)
  payload = chunk.read
  request, target = create_request(tag, time, payload)
  send_request(request, target)
end
http_opts(uri) click to toggle source
# File lib/fluent/plugin/out_bcdb.rb, line 340
# Builds the option hash passed to Net::HTTP.start for +uri+.
#
# :use_ssl follows the URI scheme and :verify_mode is added for https only.
# :ca_file, :cert and :key are added only when the corresponding path
# (@ca_file, @client_cert_path, @private_key_path) names an existing file.
# Unset (nil) or empty paths are skipped instead of raising a TypeError.
#
# @param uri [URI]
# @return [Hash] options for Net::HTTP.start
def http_opts(uri)
  existing_file = lambda { |path| !path.nil? && !path.to_s.empty? && File.file?(path) }

  opts = {
    :use_ssl => uri.scheme == 'https'
  }
  opts[:verify_mode] = @ssl_verify_mode if opts[:use_ssl]
  opts[:ca_file] = @ca_file if existing_file.call(@ca_file)
  if existing_file.call(@client_cert_path)
    opts[:cert] = OpenSSL::X509::Certificate.new(File.read(@client_cert_path))
  end
  if existing_file.call(@private_key_path)
    opts[:key] = OpenSSL::PKey.read(File.read(@private_key_path), @private_key_passphrase)
  end
  opts
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/out_bcdb.rb, line 444
# Always returns true.
# NOTE(review): presumably the Fluentd hook declaring that the plugin may run
# under multiple workers -- confirm against the Fluentd plugin API.
def multi_workers_ready?
  true
end
prefer_buffered_processing() click to toggle source
# File lib/fluent/plugin/out_bcdb.rb, line 420
# Returns the @buffered setting.
# NOTE(review): presumably selects between the buffered (#write) and the
# non-buffered (#process) code path -- confirm against the Fluentd plugin API.
def prefer_buffered_processing
  @buffered
end
process(tag, es) click to toggle source
# File lib/fluent/plugin/out_bcdb.rb, line 448
# Non-buffered entry point: sends every event of the stream as its own
# request through #handle_record.
#
# @param tag [String] event tag
# @param es [#each] event stream yielding (time, record) pairs
# @return the result of +es.each+
def process(tag, es)
  log.trace("TRACE PROCESS: #{tag}, #{es}")
  es.each { |event_time, event| handle_record(tag, event_time, event) }
end
proxies() click to toggle source
# File lib/fluent/plugin/out_bcdb.rb, line 351
# Looks up the proxy URL from the environment, checking HTTPS_PROXY,
# HTTP_PROXY, http_proxy and https_proxy in that order.
#
# @return [String, nil] the first variable that is set, or nil when none is
def proxies
  %w[HTTPS_PROXY HTTP_PROXY http_proxy https_proxy].each do |name|
    value = ENV[name]
    return value if value
  end
  nil
end
send_request(req, uri) click to toggle source
# File lib/fluent/plugin/out_bcdb.rb, line 355
# Sends +req+ to +uri+, applying authentication, optional proxying and
# client-side rate limiting.
#
# * If @rate_limit_msec is non-zero and less than that many milliseconds have
#   passed since the previous request, the request is dropped (logged at
#   info level) and nil is returned.
# * The authorization header is chosen from @authentication (:basic,
#   :bearer, :jwt or :oauth); any other value sends no credentials.
# * When a proxy is found by #proxies the connection goes through it.
# * Exceptions raised while sending are logged and re-raised only when
#   @raise_on_error is set.
# * A missing or non-2xx response is logged as a warning, unless its status
#   code is listed in @recoverable_status_codes, in which case
#   RecoverableResponse is raised.
#
# @param req [Net::HTTPRequest] prepared request
# @param uri [URI] target of the request
# @raise [RecoverableResponse] for status codes in @recoverable_status_codes
def send_request(req, uri)
  rate_limited = @rate_limit_msec != 0 && !@last_request_time.nil?
  if rate_limited && (Time.now.to_f - @last_request_time) * 1000.0 < @rate_limit_msec
    log.info('Dropped request due to rate limiting')
    return
  end

  res = nil

  begin
    case @authentication
    when :basic
      req.basic_auth(@username, @password)
    when :bearer
      req['authorization'] = "bearer #{@token}"
    when :jwt
      req['authorization'] = "jwt #{@token}"
    when :oauth
      req['authorization'] = "Bearer #{@token_oauth}"
    end
    @last_request_time = Time.now.to_f

    if (proxy = proxies)
      proxy_uri = URI.parse(proxy)

      res = Net::HTTP.start(uri.host, uri.port,
                            proxy_uri.host, proxy_uri.port, proxy_uri.user, proxy_uri.password,
                            **http_opts(uri)) { |http| http.request(req) }
    else
      res = Net::HTTP.start(uri.host, uri.port, **http_opts(uri)) { |http| http.request(req) }
    end
    # Log bodies on both the proxied and the direct path; guard against a
    # missing response so logging itself can never raise.
    log.debug("REQUEST BODY: #{req.body}")
    log.debug("RESPONSE BODY: #{res.body if res}")
  rescue => e # rescue all StandardErrors
    # server didn't respond
    log.warn "Net::HTTP.#{req.method.capitalize} raises exception: #{e.class}, '#{e.message}'"
    raise e if @raise_on_error
  else
    unless res.is_a?(Net::HTTPSuccess)
      res_summary = res ? "#{res.code} #{res.message} #{res.body}" : "res=nil"
      # Only consult the status code when there actually is a response.
      if res && @recoverable_status_codes.include?(res.code.to_i)
        raise RecoverableResponse, res_summary
      else
        log.warn "failed to #{req.method} #{uri} (#{res_summary})"
      end
    end
  end
end
set_body(req, tag, time, record) click to toggle source
# File lib/fluent/plugin/out_bcdb.rb, line 245
# Fills the request body according to @serializer:
# :json, :text, :raw and :x_ndjson dispatch to the matching set_*_body
# helper; any other value sends the record as form data.
#
# @return the request that was passed in
def set_body(req, tag, time, record)
  case @serializer
  when :json     then set_json_body(req, record)
  when :text     then set_text_body(req, record)
  when :raw      then set_raw_body(req, record)
  when :x_ndjson then set_bulk_body(req, record)
  else                req.set_form_data(record)
  end
  req
end
set_bulk_body(req, data) click to toggle source
# File lib/fluent/plugin/out_bcdb.rb, line 303
# Builds the body of a bulk request: a JSON document of the form
# {"records": [...]}.
#
# +data+ is either a String holding one JSON document per line (the content
# of a buffer chunk) or a single record that responds to #keys. Before the
# body is written the OAuth token is refreshed and, when the set of record
# keys differs from the cached one (@keys / @cached_keys), the remote schema
# is updated through #bcdb_update_schema.
#
# @param req [#body=, #[]=] request to fill
# @param data [String, Hash] newline-delimited JSON or a single record
# @return whatever #compress_body returns
def set_bulk_body(req, data)
  bcdb_authorise()
  if data.is_a? String
    # Blank lines carry no record; skip them instead of failing in JSON.parse.
    records = data.split("\n")
                  .reject { |line| line.strip.empty? }
                  .map { |line| JSON.parse(line) }
    flat_keys = records.flat_map(&:keys).uniq
    unless @cached_keys && @keys.sort == flat_keys.sort
      @keys, @cached_keys = bcdb_update_schema(flat_keys, @cached_keys)
    end
    payload = { :records => records }
    # Rebuild the endpoint only when a base is actually available in
    # @base_url_; with an unset base the URL would lose its scheme and host.
    unless @base_url_.to_s.empty?
      @base_url = "#{@base_url_}" + "/data/bulkAsync/" + "#{@bcdb_entity}"
    end
  else
    log.debug("DATA: #{data.inspect}")
    unless @cached_keys && @keys.sort == data.keys.sort
      @keys, @cached_keys = bcdb_update_schema(data.keys, @cached_keys)
    end
    payload = { :records => [data] }
  end
  req.body = Yajl.dump(payload)
  # The records are wrapped in one JSON envelope, so this is plain JSON
  # rather than application/x-ndjson.
  req['Content-Type'] = 'application/json'
  compress_body(req, req.body)
end
set_header(req, tag, time, record) click to toggle source
# File lib/fluent/plugin/out_bcdb.rb, line 260
# Copies every entry of @custom_headers (when configured) onto the request.
#
# @return the request that was passed in
def set_header(req, tag, time, record)
  return req unless @custom_headers

  @custom_headers.each { |name, value| req[name] = value }
  req
end
set_json_body(req, data) click to toggle source
# File lib/fluent/plugin/out_bcdb.rb, line 280
# Serialises one record as the JSON request body.
#
# Refreshes the OAuth token first and updates the remote schema whenever the
# record's key set differs from the cached one (@keys / @cached_keys).
#
# @param req [#body=, #[]=] request to fill
# @param data [Hash] record to send
# @return whatever #compress_body returns
def set_json_body(req, data)
  bcdb_authorise()

  schema_is_current = @cached_keys && @keys.sort == data.keys.sort
  @keys, @cached_keys = bcdb_update_schema(data.keys, @cached_keys) unless schema_is_current

  req.body = Yajl.dump(data)
  req['Content-Type'] = "application/json"
  compress_body(req, req.body)
end
set_raw_body(req, data) click to toggle source
# File lib/fluent/plugin/out_bcdb.rb, line 297
# Sends the record's string form (+to_s+) as an opaque binary body.
#
# @return whatever #compress_body returns
def set_raw_body(req, data)
  raw = data.to_s
  req['Content-Type'] = 'application/octet-stream'
  req.body = raw
  compress_body(req, req.body)
end
set_text_body(req, data) click to toggle source
# File lib/fluent/plugin/out_bcdb.rb, line 291
# Sends the record's "message" field as a plain-text body.
#
# @param data [#[]] record; only data["message"] is read
# @return whatever #compress_body returns
def set_text_body(req, data)
  message = data["message"]
  req['Content-Type'] = 'text/plain'
  req.body = message
  compress_body(req, req.body)
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_bcdb.rb, line 237
# Lifecycle hook; adds no plugin-specific teardown and defers entirely to the
# superclass implementation.
def shutdown
  super
end
split_request_format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_bcdb.rb, line 428
# Serialises one event as a msgpack-encoded [time, record] pair; +tag+ is not
# part of the output. #configure installs this method as #format when
# @bulk_request is not set, and #write reads the pairs back with
# chunk.msgpack_each.
def split_request_format(tag, time, record)
  [time, record].to_msgpack
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_bcdb.rb, line 233
# Lifecycle hook; adds no plugin-specific startup work and defers entirely to
# the superclass implementation.
def start
  super
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_bcdb.rb, line 455
# Buffered entry point: sends the content of one buffer chunk.
#
# The target URL is re-derived for every chunk from the unexpanded URL
# template, so placeholders are resolved against the current chunk rather
# than keeping the values of the first chunk ever written.
#
# In bulk mode (@bulk_request) the whole chunk goes out as one request via
# #handle_records; otherwise every (time, record) pair stored in the chunk is
# sent individually via #handle_record.
#
# @param chunk buffer chunk; must provide +metadata.tag+ and, depending on
#   the mode, +read+ or +msgpack_each+
def write(chunk)
  tag = chunk.metadata.tag
  # Remember the URL as configured, before any placeholder expansion.
  @base_url_template ||= @base_url
  @base_url = extract_placeholders(@base_url_template, chunk)
  if @bulk_request
    time = Fluent::Engine.now
    handle_records(tag, time, chunk)
  else
    chunk.msgpack_each do |time, record|
      handle_record(tag, time, record)
    end
  end
end