class Fluent::Plugin::RedshiftOutputV2

Attributes

last_gz_path[R]
last_sql[R]

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_redshift_v2.rb, line 40
# Builds the plugin instance.
#
# Runs the superclass initializer first, then loads the libraries the plugin
# depends on. The libraries are required here, at instantiation time, in the
# order listed.
def initialize
  super

  %w[aws-sdk zlib time tempfile pg json csv].each { |library| require library }
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_redshift_v2.rb, line 52
# Validates and normalizes the plugin configuration.
#
# After the superclass has processed +conf+, this method:
# * rejects the configuration unless check_credentials passes;
# * normalizes @path so it ends with "/" and has one leading "/" stripped;
# * sets @utc to true when conf['utc'] is set;
# * builds @db_conf, the connection settings hash (the host name is resolved
#   to an address once, here, via IPSocket.getaddress);
# * falls back to determine_delimiter(@file_type) when no delimiter is given;
# * derives @table_name_with_schema, splits @redshift_copy_columns on commas
#   and whitespace (nil when blank), and pre-builds @copy_sql_template;
# * converts @s3_server_side_encryption to a Symbol when present.
#
# @param conf the configuration object passed on to the superclass; only
#   conf['utc'] is read directly here
# @raise [Fluent::ConfigError] when neither a key pair nor an IAM role is set
def configure(conf)
  super
  unless check_credentials
    # Fully qualified like the other raise sites in this class, so the constant
    # resolves regardless of how the class body is lexically nested.
    fail Fluent::ConfigError, "aws_key_id and aws_sec_key is required. or, use aws_iam_role instead."
  end
  @path = "#{@path}/" unless @path.end_with?('/')
  @path = @path[1..-1] if @path.start_with?('/')
  @utc = true if conf['utc']
  @db_conf = {
    host: @redshift_host,
    port: @redshift_port,
    dbname: @redshift_dbname,
    user: @redshift_user,
    password: @redshift_password,
    connect_timeout: @redshift_connect_timeout,
    hostaddr: IPSocket.getaddress(@redshift_host)
  }
  @delimiter = determine_delimiter(@file_type) if @delimiter.nil? || @delimiter.empty?
  $log.debug format_log("redshift file_type:#{@file_type} delimiter:'#{@delimiter}'")
  @table_name_with_schema = [@redshift_schemaname, @redshift_tablename].compact.join('.')
  @redshift_copy_columns = if @redshift_copy_columns.to_s.empty?
                             nil
                           else
                             @redshift_copy_columns.split(/[,\s]+/)
                           end
  # Must run after @delimiter, @table_name_with_schema and
  # @redshift_copy_columns are final: the template interpolates all three.
  @copy_sql_template = build_redshift_copy_sql_template
  @s3_server_side_encryption = @s3_server_side_encryption.to_sym if @s3_server_side_encryption
end
copy_sql(s3_uri) click to toggle source
# File lib/fluent/plugin/out_redshift_v2.rb, line 185
# Renders the COPY statement for one S3 object.
#
# Substitutes +s3_uri+ into @copy_sql_template and remembers the result in
# @last_sql.
#
# @param s3_uri [String] value substituted for the template's placeholder
# @return [String] the rendered SQL statement
def copy_sql(s3_uri)
  statement = @copy_sql_template % s3_uri
  @last_sql = statement
  statement
end
create_gz_file(chunk) click to toggle source
# File lib/fluent/plugin/out_redshift_v2.rb, line 126
# Compresses +chunk+ into a temporary gzip file and uploads it to @bucket.
#
# Structured file types (json?/msgpack?) go through
# create_gz_file_from_structured_data, everything else through
# create_gz_file_from_flat_data. On success the object key is stored in
# @last_gz_path.
#
# The temporary file is always closed and unlinked before returning, including
# when the builder yields nothing or when the upload raises.
#
# @param chunk the buffer chunk handed to the gz builders
# @return [String, false] the uploaded object key, or false when the chunk
#   produced no data to upload
def create_gz_file(chunk)
  tmp = Tempfile.new("s3-")
  # Keep the Tempfile in its own variable: the builders may return nil, and the
  # handle is still needed for cleanup in that case.
  gz_file =
    if json? || msgpack?
      create_gz_file_from_structured_data(tmp, chunk)
    else
      create_gz_file_from_flat_data(tmp, chunk)
    end

  unless gz_file
    $log.debug format_log("received no valid data. ")
    return false
  end

  key = next_gz_path
  @bucket.put_object({
      server_side_encryption: @s3_server_side_encryption,
      body: gz_file,
      key: key
    })

  @last_gz_path = key
ensure
  # tmp is nil only if Tempfile.new itself raised.
  tmp.close! if tmp
end
exec_copy(s3_uri) click to toggle source
# File lib/fluent/plugin/out_redshift_v2.rb, line 166
# Executes the COPY statement for +s3_uri+ on @redshift_connection.
#
# A RedshiftError whose message matches the "Load into table ... failed."
# pattern is logged and reported as false; any other RedshiftError is
# re-raised to the caller.
#
# @param s3_uri [String] S3 location passed to copy_sql
# @return [Boolean] true when the copy completed, false on a load error
# @raise [RedshiftError] for errors that are not load failures
def exec_copy(s3_uri)
  $log.debug format_log("start copying. s3_uri=#{s3_uri}")
  begin
    @redshift_connection.exec copy_sql(s3_uri)
    $log.info format_log("completed copying to redshift. s3_uri=#{s3_uri}")
    true
  rescue RedshiftError => e
    load_failure = e.to_s =~ /^ERROR:  Load into table '[^']+' failed\./
    raise e unless load_failure

    $log.error format_log("failed to copy data into redshift due to load error. s3_uri=#{s3_uri}"), error:e.to_s
    false
  end
end
format(_tag, _time, record) click to toggle source
# File lib/fluent/plugin/out_redshift_v2.rb, line 102
# Serializes one record for buffering.
#
# * json?    -> the record itself, via to_msgpack
# * msgpack? -> the record wrapped under @record_log_tag, via to_msgpack
# * otherwise -> the value stored under @record_log_tag followed by a newline
#
# @param _tag unused
# @param _time unused
# @param record the event record
# @return [String] the serialized record
def format(_tag, _time, record)
  return record.to_msgpack if json?

  if msgpack?
    wrapped = { @record_log_tag => record }
    return wrapped.to_msgpack
  end

  "#{record[@record_log_tag]}\n"
end
format_log(message) click to toggle source
# File lib/fluent/plugin/out_redshift_v2.rb, line 189
# Appends @log_suffix to a log message.
#
# @param message [String] the message to decorate
# @return [String] "message suffix" when @log_suffix is set and non-empty,
#   otherwise +message+ itself
def format_log(message)
  suffix = @log_suffix
  return message if !suffix || suffix.empty?

  "#{message} #{suffix}"
end
formatted_to_msgpack_binary() click to toggle source
# File lib/fluent/plugin/out_redshift_v2.rb, line 197
# Always answers true.
#
# NOTE(review): presumably a hook read by the Fluentd output base class to
# learn that #format emits msgpack binary - confirm against the Fluentd
# version in use. Note that #format returns a plain text line when the file
# type is neither json nor msgpack.
#
# @return [true]
def formatted_to_msgpack_binary
  true
end
insert_logs(chunk) click to toggle source
# File lib/fluent/plugin/out_redshift_v2.rb, line 121
# Uploads +chunk+ as a gzip object and copies it into Redshift.
#
# create_gz_file returns false when the chunk yields no data to upload. In
# that case the COPY is skipped, because there is no object to load and the
# URI would otherwise be built from the literal "false".
#
# @param chunk the buffer chunk to load
# @return [Boolean] the result of exec_copy, or false when nothing was uploaded
def insert_logs(chunk)
  $log.debug format_log("start creating gz.")
  gz_path = create_gz_file(chunk)
  return false unless gz_path

  exec_copy s3_uri(gz_path)
end
next_gz_path() click to toggle source
# File lib/fluent/plugin/out_redshift_v2.rb, line 151
# Picks the first unused object key for the current timestamp.
#
# Keys look like "<@path><timestamp>_<NN>.gz", where the timestamp is the
# current time (UTC when @utc is set) formatted with @timestamp_key_format and
# NN is a two-digit counter starting at 00. The counter is incremented until
# @bucket reports that no object exists under the key.
#
# @return [String] the first key that does not exist in the bucket
def next_gz_path
  now = Time.now
  now = now.utc if @utc
  stamp = now.strftime(@timestamp_key_format)

  candidate = ->(seq) { "#{@path}#{stamp}_#{'%02d' % seq}.gz" }
  seq = 0
  seq += 1 while @bucket.object(candidate.call(seq)).exists?
  candidate.call(seq)
end
s3_uri(path) click to toggle source
# File lib/fluent/plugin/out_redshift_v2.rb, line 181
# Builds the s3:// URI of an object in @s3_bucket.
#
# @param path the object key inside the bucket
# @return [String] "s3://<bucket>/<path>"
def s3_uri(path)
  's3://' + @s3_bucket.to_s + '/' + path.to_s
end
shutdown() click to toggle source
# File lib/fluent/plugin/out_redshift_v2.rb, line 99
# Shuts the plugin down.
#
# Delegates to the superclass, matching #start and #configure, so the base
# class can run its own shutdown step. An empty override would silently skip
# that step.
def shutdown
  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_redshift_v2.rb, line 81
# Starts the plugin: creates the S3 client, the bucket handle and the
# Redshift connection.
#
# Explicit credentials, path-style addressing and @s3_region are passed to the
# S3 client only when both @aws_key_id and @aws_sec_key are set; otherwise the
# client is created with no options other than the optional endpoint.
def start
  super

  client_options =
    if @aws_key_id && @aws_sec_key
      {
        access_key_id: @aws_key_id,
        secret_access_key: @aws_sec_key,
        force_path_style: true,
        region: @s3_region
      }
    else
      {}
    end
  client_options[:endpoint] = @s3_endpoint if @s3_endpoint

  @client = Aws::S3::Client.new(client_options)
  @bucket = Aws::S3::Bucket.new(@s3_bucket, client: @client)
  @redshift_connection = RedshiftConnection.new(@db_conf)
end
try_write(chunk) click to toggle source
# File lib/fluent/plugin/out_redshift_v2.rb, line 116
# Loads +chunk+ and then commits it.
#
# The commit is issued regardless of the value returned by insert_logs.
#
# @param chunk the buffer chunk; its unique_id is passed to commit_write
# @return the value returned by commit_write
def try_write(chunk)
  insert_logs(chunk)
  commit_write(chunk.unique_id)
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_redshift_v2.rb, line 112
# Loads +chunk+ into Redshift by delegating to insert_logs.
#
# @param chunk the buffer chunk to load
# @return the value returned by insert_logs
def write(chunk)
  insert_logs(chunk)
end

Private Instance Methods

build_redshift_copy_sql_template() click to toggle source
# File lib/fluent/plugin/out_redshift_v2.rb, line 224
# Builds the COPY statement template, leaving a single '%s' placeholder for
# the S3 URI.
#
# * The column list is emitted only when @redshift_copy_columns is set.
# * Credentials use the access key pair when both parts are present, otherwise
#   @aws_iam_role.
# * " ESCAPE" is added for json/msgpack file types unless
#   @redshift_copy_base_options already contains 'ESCAPE'.
#
# @return [String] the SQL template
def build_redshift_copy_sql_template
  column_list = @redshift_copy_columns ? "(#{@redshift_copy_columns.join(",")})" : ''

  credentials =
    if @aws_key_id && @aws_sec_key
      "CREDENTIALS 'aws_access_key_id=#{@aws_key_id};aws_secret_access_key=#{@aws_sec_key}'"
    else
      "CREDENTIALS 'aws_iam_role=#{@aws_iam_role}'"
    end

  add_escape = !@redshift_copy_base_options.include?('ESCAPE') && (json? || msgpack?)
  escape = add_escape ? " ESCAPE" : ''

  [
    "copy #{@table_name_with_schema}#{column_list} from '%s' #{credentials}",
    " delimiter '#{@delimiter}' GZIP#{escape} #{@redshift_copy_base_options} #{@redshift_copy_options};"
  ].join
end
check_credentials() click to toggle source
# File lib/fluent/plugin/out_redshift_v2.rb, line 203
# Tells whether usable AWS credentials were configured.
#
# @return [Boolean] true when both @aws_key_id and @aws_sec_key are set, or
#   when @aws_iam_role is set; false otherwise
def check_credentials
  has_key_pair = @aws_key_id && @aws_sec_key
  has_key_pair || @aws_iam_role ? true : false
end
create_gz_file_from_flat_data(dst_file, chunk) click to toggle source
# File lib/fluent/plugin/out_redshift_v2.rb, line 252
# Gzips the raw contents of +chunk+ into +dst_file+.
#
# The gzip writer is always closed, and errors raised while closing it are
# deliberately ignored.
#
# @param dst_file IO-like destination wrapped by the gzip writer
# @param chunk object responding to write_to(io)
# @return +dst_file+
def create_gz_file_from_flat_data(dst_file, chunk)
  writer = Zlib::GzipWriter.new(dst_file)
  begin
    chunk.write_to(writer)
  ensure
    begin
      writer.close
    rescue StandardError
      nil
    end
  end
  dst_file
end
create_gz_file_from_structured_data(dst_file, chunk) click to toggle source
# File lib/fluent/plugin/out_redshift_v2.rb, line 263
# Converts the records of +chunk+ to delimited text and gzips them into
# +dst_file+.
#
# The table's column list is fetched first. Each non-nil record yielded by
# chunk.msgpack_each is rendered with hash_to_table_text; records that fail to
# render are logged and skipped. The gzip writer is always closed, and errors
# raised while closing it are deliberately ignored.
#
# @param dst_file IO-like destination wrapped by the gzip writer
# @param chunk object responding to msgpack_each
# @return +dst_file+, or nil when the table has no columns or nothing was
#   written
# @raise [RuntimeError] when the column list could not be fetched (nil)
# @raise [Fluent::ConfigError] when @redshift_copy_columns names columns that
#   are not in the table
def create_gz_file_from_structured_data(dst_file, chunk)
  table_columns = fetch_table_columns
  raise "failed to fetch the redshift table definition." if table_columns.nil?

  if table_columns.empty?
    $log.warn format_log("no table on redshift or cannot access table. table_name=#{@table_name_with_schema}")
    return nil
  end

  if @redshift_copy_columns
    missing_columns = @redshift_copy_columns - table_columns
    unless missing_columns.empty?
      fail Fluent::ConfigError, "missing columns included in redshift_copy_columns - missing columns:\"#{missing_columns.join(',')}\""
    end
  end

  writer = Zlib::GzipWriter.new(dst_file)
  begin
    chunk.msgpack_each do |record|
      next unless record

      begin
        line = hash_to_table_text(record, table_columns)
        writer.write(line) if line && !line.empty?
      rescue => e
        text = record.is_a?(Hash) ? record[@record_log_tag] : record
        $log.error format_log("failed to create table text from #{@file_type}. text=(#{text})"), error:e.to_s
        $log.error_backtrace
      end
    end
    # pos is read before the writer is closed by the ensure clause.
    writer.pos > 0 ? dst_file : nil
  ensure
    begin
      writer.close
    rescue StandardError
      nil
    end
  end
end
determine_delimiter(file_type) click to toggle source
# File lib/fluent/plugin/out_redshift_v2.rb, line 213
# Chooses the default column delimiter for a file type.
#
# @param file_type [String] one of 'json', 'msgpack', 'tsv' or 'csv'
# @return [String] a tab for json/msgpack/tsv, a comma for csv
# @raise [Fluent::ConfigError] for any other file type
def determine_delimiter(file_type)
  return "\t" if %w[json msgpack tsv].include?(file_type)
  return ',' if file_type == 'csv'

  raise Fluent::ConfigError, "Invalid file_type:#{file_type}."
end
escape_text_for_copy(val) click to toggle source
# File lib/fluent/plugin/out_redshift_v2.rb, line 344
# Escapes a value for the delimited COPY input.
#
# Every backslash, tab and newline is prefixed with a backslash; all other
# characters are left untouched.
#
# @param val [String] the raw text
# @return [String] a new string with the three characters escaped
def escape_text_for_copy(val)
  val.gsub(/[\\\t\n]/) { |char| "\\#{char}" }
end
fetch_columns_sql() click to toggle source
# File lib/fluent/plugin/out_redshift_v2.rb, line 306
# Builds the query that lists the target table's columns in ordinal order.
#
# The schema filter is added only when @redshift_schemaname is set. The
# statement is also remembered in @last_sql.
#
# @return [String] the SQL statement
def fetch_columns_sql
  schema_filter = @redshift_schemaname ? " and table_schema = '#{@redshift_schemaname}'" : ''
  @last_sql = "select column_name from INFORMATION_SCHEMA.COLUMNS where table_name = '#{@redshift_tablename}'" \
              "#{schema_filter} order by ordinal_position;"
end
fetch_table_columns() click to toggle source
# File lib/fluent/plugin/out_redshift_v2.rb, line 300
# Runs fetch_columns_sql on @redshift_connection and extracts the
# 'column_name' value of every row handed to the block.
#
# @return whatever @redshift_connection.exec returns for the block
#   (NOTE(review): presumably the block's array of column names - confirm
#   against RedshiftConnection#exec)
def fetch_table_columns
  sql = fetch_columns_sql
  @redshift_connection.exec(sql) { |rows| rows.map { |row| row['column_name'] } }
end
generate_line_with_delimiter(val_list, delimiter) click to toggle source
# File lib/fluent/plugin/out_redshift_v2.rb, line 329
# Renders one row of values as a delimited, newline-terminated line.
#
# * nil            -> the literal \N
# * empty string   -> empty field
# * Hash or Array  -> JSON text, escaped with escape_text_for_copy
# * anything else  -> to_s, escaped with escape_text_for_copy
#
# @param val_list [Array] the row's values in column order
# @param delimiter [String] the field separator
# @return [String] the joined line including the trailing newline
def generate_line_with_delimiter(val_list, delimiter)
  fields = val_list.map do |val|
    if val.nil?
      "\\N"
    elsif '' == val
      ''
    elsif val.is_a?(Hash) || val.is_a?(Array)
      escape_text_for_copy(JSON.generate(val))
    else
      escape_text_for_copy(val.to_s)
    end
  end
  "#{fields.join(delimiter)}\n"
end
hash_to_table_text(hash, redshift_table_columns) click to toggle source
# File lib/fluent/plugin/out_redshift_v2.rb, line 314
# Renders a record as one delimited line in table-column order.
#
# Values are looked up by column name. When the record is nil/false, or when
# none of the table columns has a non-nil value in it, an empty string is
# returned; the latter case is also logged as a warning.
#
# @param hash the record, indexed by column name
# @param redshift_table_columns [Array<String>] the table's column names
# @return [String] the rendered line, or '' when there is nothing to write
def hash_to_table_text(hash, redshift_table_columns)
  return '' unless hash

  values = redshift_table_columns.map { |column| hash[column] }
  return generate_line_with_delimiter(values, delimiter) unless values.compact.empty?

  $log.warn format_log("no data match for table columns on redshift. data=#{hash} table_columns=#{redshift_table_columns}")
  ''
end
json?() click to toggle source
# File lib/fluent/plugin/out_redshift_v2.rb, line 244
# Whether the configured file type is JSON.
#
# @return [Boolean] true when @file_type equals the string 'json'
def json?
  @file_type == 'json'
end
msgpack?() click to toggle source
# File lib/fluent/plugin/out_redshift_v2.rb, line 248
# Whether the configured file type is msgpack.
#
# @return [Boolean] true when @file_type equals the string 'msgpack'
def msgpack?
  @file_type == 'msgpack'
end