class Fluent::Plugin::RedshiftOutputV2
Attributes
last_gz_path[R]
last_sql[R]
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_redshift_v2.rb, line 40
# Instantiates the plugin and loads its runtime dependencies.
# Requires are kept here (not at file top) so the plugin only pulls in
# aws-sdk/pg/etc. when it is actually constructed.
def initialize
  super
  # Same libraries, same order as before; require is idempotent.
  %w[aws-sdk zlib time tempfile pg json csv].each { |lib| require lib }
end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_redshift_v2.rb, line 52
# Validates the Fluentd configuration and precomputes derived settings:
# the normalized S3 path prefix, the PG connection hash, the field
# delimiter, the schema-qualified table name and the COPY SQL template.
#
# Fix: with the compact `class Fluent::Plugin::RedshiftOutputV2` form the
# lexical scope does not include Fluent, so the bare `ConfigError` here
# would raise NameError instead of the intended error; use
# Fluent::ConfigError, consistent with the other methods in this file.
def configure(conf)
  super
  if !check_credentials
    fail Fluent::ConfigError, "aws_key_id and aws_sec_key is required. or, use aws_iam_role instead."
  end
  # Normalize the S3 prefix: always trailing slash, never leading slash.
  @path = "#{@path}/" unless @path.end_with?('/')
  @path = @path[1..-1] if @path.start_with?('/')
  @utc = true if conf['utc']
  @db_conf = {
    host: @redshift_host,
    port: @redshift_port,
    dbname: @redshift_dbname,
    user: @redshift_user,
    password: @redshift_password,
    connect_timeout: @redshift_connect_timeout,
    # NOTE(review): resolves the host once at configure time — a later
    # DNS change requires a plugin restart; confirm acceptable.
    hostaddr: IPSocket.getaddress(@redshift_host)
  }
  @delimiter = determine_delimiter(@file_type) if @delimiter.nil? || @delimiter.empty?
  $log.debug format_log("redshift file_type:#{@file_type} delimiter:'#{@delimiter}'")
  @table_name_with_schema = [@redshift_schemaname, @redshift_tablename].compact.join('.')
  # Accept comma- and/or whitespace-separated column lists; nil disables
  # the explicit column list in the COPY statement.
  @redshift_copy_columns =
    if @redshift_copy_columns.to_s.empty?
      nil
    else
      @redshift_copy_columns.split(/[,\s]+/)
    end
  @copy_sql_template = build_redshift_copy_sql_template
  @s3_server_side_encryption = @s3_server_side_encryption.to_sym if @s3_server_side_encryption
end
copy_sql(s3_uri)
click to toggle source
# File lib/fluent/plugin/out_redshift_v2.rb, line 185
# Renders the prepared COPY template with the given S3 URI and records
# the final SQL in @last_sql (exposed via attr_reader for inspection).
def copy_sql(s3_uri)
  @last_sql = sprintf(@copy_sql_template, s3_uri)
end
create_gz_file(chunk)
click to toggle source
# File lib/fluent/plugin/out_redshift_v2.rb, line 126 def create_gz_file(chunk) tmp = Tempfile.new("s3-") tmp = if json? || msgpack? create_gz_file_from_structured_data(tmp, chunk) else create_gz_file_from_flat_data(tmp, chunk) end if tmp key = next_gz_path @bucket.put_object({ server_side_encryption: @s3_server_side_encryption, body: tmp, key: key }) tmp.close! @last_gz_path = key else $log.debug format_log("received no valid data. ") return false end end
exec_copy(s3_uri)
click to toggle source
# File lib/fluent/plugin/out_redshift_v2.rb, line 166
# Executes the COPY statement for the given S3 URI against Redshift.
# Returns true on success and false when Redshift reports a row-load
# failure (non-retriable bad data); any other RedshiftError is re-raised.
def exec_copy(s3_uri)
  $log.debug format_log("start copying. s3_uri=#{s3_uri}")
  begin
    @redshift_connection.exec copy_sql(s3_uri)
  rescue RedshiftError => e
    # "Load into table ... failed" means rows were rejected; retrying the
    # same chunk would fail identically, so report failure instead.
    raise e unless e.to_s =~ /^ERROR: Load into table '[^']+' failed\./
    $log.error format_log("failed to copy data into redshift due to load error. s3_uri=#{s3_uri}"), error:e.to_s
    return false
  end
  $log.info format_log("completed copying to redshift. s3_uri=#{s3_uri}")
  true
end
format(_tag, _time, record)
click to toggle source
# File lib/fluent/plugin/out_redshift_v2.rb, line 102
# Serializes one event into the buffer chunk. JSON/msgpack input is
# buffered as msgpack; flat input stores the raw record field as a line.
def format(_tag, _time, record)
  return record.to_msgpack if json?
  return { @record_log_tag => record }.to_msgpack if msgpack?
  "#{record[@record_log_tag]}\n"
end
format_log(message)
click to toggle source
# File lib/fluent/plugin/out_redshift_v2.rb, line 189
# Appends the configured log suffix (if any) to a log message.
def format_log(message)
  return message unless @log_suffix && !@log_suffix.empty?
  "#{message} #{@log_suffix}"
end
formatted_to_msgpack_binary()
click to toggle source
# File lib/fluent/plugin/out_redshift_v2.rb, line 197 def formatted_to_msgpack_binary true end
insert_logs(chunk)
click to toggle source
# File lib/fluent/plugin/out_redshift_v2.rb, line 121
# Gzips the chunk, uploads it to S3, and COPYs it into Redshift.
# Returns false when the chunk held no loadable data.
#
# Fix: create_gz_file returns false for an empty/unusable chunk; the
# original interpolated that false into the URI ("s3://bucket/false")
# and attempted a bogus COPY. Skip the COPY and report false instead.
def insert_logs(chunk)
  $log.debug format_log("start creating gz.")
  gz_path = create_gz_file(chunk)
  return false unless gz_path
  exec_copy s3_uri(gz_path)
end
next_gz_path()
click to toggle source
# File lib/fluent/plugin/out_redshift_v2.rb, line 151
# Builds the next unused S3 key: "<path><timestamp>_NN.gz", probing the
# bucket and incrementing NN until a free key is found.
def next_gz_path
  now = @utc ? Time.now.utc : Time.now
  stamp = now.strftime(@timestamp_key_format)
  seq = 0
  loop do
    candidate = "#{@path}#{stamp}_#{'%02d' % seq}.gz"
    return candidate unless @bucket.object(candidate).exists?
    seq += 1
  end
end
s3_uri(path)
click to toggle source
# File lib/fluent/plugin/out_redshift_v2.rb, line 181
# Builds the s3:// URI for a key in the configured bucket.
def s3_uri(path)
  sprintf("s3://%s/%s", @s3_bucket, path)
end
shutdown()
click to toggle source
# File lib/fluent/plugin/out_redshift_v2.rb, line 99 def shutdown end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_redshift_v2.rb, line 81 def start super options = {} if @aws_key_id && @aws_sec_key options = { access_key_id: @aws_key_id, secret_access_key: @aws_sec_key, force_path_style: true, region: @s3_region } end options[:endpoint] = @s3_endpoint if @s3_endpoint @client = Aws::S3::Client.new(options) @bucket = Aws::S3::Bucket.new @s3_bucket, client: @client @redshift_connection = RedshiftConnection.new(@db_conf) end
try_write(chunk)
click to toggle source
# File lib/fluent/plugin/out_redshift_v2.rb, line 116 def try_write(chunk) insert_logs chunk commit_write chunk.unique_id end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_redshift_v2.rb, line 112
# Synchronous write path: delegate the whole gzip/upload/COPY pipeline.
def write(chunk)
  insert_logs(chunk)
end
Private Instance Methods
build_redshift_copy_sql_template()
click to toggle source
# File lib/fluent/plugin/out_redshift_v2.rb, line 224
# Assembles the COPY statement template; '%s' is later replaced with the
# S3 URI by #copy_sql. Chooses static-key vs IAM-role credentials and
# appends ESCAPE for structured input unless already present in the
# user-supplied base options.
def build_redshift_copy_sql_template
  columns_clause = @redshift_copy_columns ? "(#{@redshift_copy_columns.join(',')})" : ''
  credentials_clause =
    if @aws_key_id && @aws_sec_key
      "CREDENTIALS 'aws_access_key_id=#{@aws_key_id};aws_secret_access_key=#{@aws_sec_key}'"
    else
      "CREDENTIALS 'aws_iam_role=#{@aws_iam_role}'"
    end
  needs_escape = !@redshift_copy_base_options.include?('ESCAPE') && (json? || msgpack?)
  escape_clause = needs_escape ? ' ESCAPE' : ''
  "copy #{@table_name_with_schema}#{columns_clause} from '%s' #{credentials_clause} delimiter '#{@delimiter}' GZIP#{escape_clause} #{@redshift_copy_base_options} #{@redshift_copy_options};"
end
check_credentials()
click to toggle source
# File lib/fluent/plugin/out_redshift_v2.rb, line 203
# True when either a static key pair or an IAM role is configured.
def check_credentials
  return true if @aws_key_id && @aws_sec_key
  return true if @aws_iam_role
  false
end
create_gz_file_from_flat_data(dst_file, chunk)
click to toggle source
# File lib/fluent/plugin/out_redshift_v2.rb, line 252
# Streams the raw chunk bytes through gzip into dst_file and returns it.
# Closing the GzipWriter flushes the trailer and closes dst_file too.
def create_gz_file_from_flat_data(dst_file, chunk)
  writer = Zlib::GzipWriter.new(dst_file)
  begin
    chunk.write_to(writer)
  ensure
    begin
      writer.close
    rescue
      # swallow close errors, as the original did (close rescue nil)
    end
  end
  dst_file
end
create_gz_file_from_structured_data(dst_file, chunk)
click to toggle source
# File lib/fluent/plugin/out_redshift_v2.rb, line 263 def create_gz_file_from_structured_data(dst_file, chunk) redshift_table_columns = fetch_table_columns if redshift_table_columns == nil raise "failed to fetch the redshift table definition." elsif redshift_table_columns.empty? $log.warn format_log("no table on redshift or cannot access table. table_name=#{@table_name_with_schema}") return nil end if @redshift_copy_columns unknown_colmns = @redshift_copy_columns - redshift_table_columns unless unknown_colmns.empty? fail Fluent::ConfigError, "missing columns included in redshift_copy_columns - missing columns:\"#{unknown_colmns.join(',')}\"" end end gzw = nil begin gzw = Zlib::GzipWriter.new(dst_file) chunk.msgpack_each do |record| next unless record begin tsv_text = hash_to_table_text(record, redshift_table_columns) gzw.write(tsv_text) if tsv_text and not tsv_text.empty? rescue => e text = record.is_a?(Hash) ? record[@record_log_tag] : record $log.error format_log("failed to create table text from #{@file_type}. text=(#{text})"), error:e.to_s $log.error_backtrace end end return nil unless gzw.pos > 0 ensure gzw.close rescue nil if gzw end dst_file end
determine_delimiter(file_type)
click to toggle source
# File lib/fluent/plugin/out_redshift_v2.rb, line 213
# Maps a file_type to its field delimiter; tab for everything except csv.
# Raises Fluent::ConfigError for unknown types.
def determine_delimiter(file_type)
  delimiters = { 'json' => "\t", 'msgpack' => "\t", 'tsv' => "\t", 'csv' => ',' }
  delimiters.fetch(file_type) do
    raise Fluent::ConfigError, "Invalid file_type:#{file_type}."
  end
end
escape_text_for_copy(val)
click to toggle source
# File lib/fluent/plugin/out_redshift_v2.rb, line 344
# Prefixes backslash, tab and newline with a backslash so Redshift COPY
# (with ESCAPE) treats them as literal characters, not delimiters.
def escape_text_for_copy(val)
  val.gsub(/[\\\t\n]/) { |ch| "\\#{ch}" }
end
fetch_columns_sql()
click to toggle source
# File lib/fluent/plugin/out_redshift_v2.rb, line 306
# Builds the INFORMATION_SCHEMA query for the target table's columns,
# remembering it in @last_sql.
# NOTE(review): table/schema names are interpolated unescaped — they come
# from operator-supplied config, not user input, but confirm.
def fetch_columns_sql
  parts = ["select column_name from INFORMATION_SCHEMA.COLUMNS where table_name = '#{@redshift_tablename}'"]
  parts << "and table_schema = '#{@redshift_schemaname}'" if @redshift_schemaname
  parts << "order by ordinal_position;"
  @last_sql = parts.join(' ')
end
fetch_table_columns()
click to toggle source
# File lib/fluent/plugin/out_redshift_v2.rb, line 300 def fetch_table_columns @redshift_connection.exec(fetch_columns_sql) do |result| result.map { |row| row['column_name'] } end end
generate_line_with_delimiter(val_list, delimiter)
click to toggle source
# File lib/fluent/plugin/out_redshift_v2.rb, line 329
# Joins one record's values into a delimited line ending in "\n".
# nil becomes the COPY NULL marker "\N", empty strings stay empty, and
# nested Hash/Array values are serialized as escaped JSON.
def generate_line_with_delimiter(val_list, delimiter)
  fields = val_list.map do |val|
    case val
    when nil then "\\N"
    when '' then ''
    when Hash, Array then escape_text_for_copy(JSON.generate(val))
    else escape_text_for_copy(val.to_s)
    end
  end
  "#{fields.join(delimiter)}\n"
end
hash_to_table_text(hash, redshift_table_columns)
click to toggle source
# File lib/fluent/plugin/out_redshift_v2.rb, line 314
# Converts one record hash into a delimited line following the table's
# column order. Returns '' for a nil record or when no record key matches
# any table column (logged as a warning).
#
# Fix: the original passed the bare name `delimiter` to
# generate_line_with_delimiter; no such local variable or reader exists
# in this class (only last_gz_path/last_sql are exposed), so that call
# raises NameError. Use the configured @delimiter instead.
def hash_to_table_text(hash, redshift_table_columns)
  return '' unless hash
  values = redshift_table_columns.map { |cn| hash[cn] }
  if values.compact.empty?
    $log.warn format_log("no data match for table columns on redshift. data=#{hash} table_columns=#{redshift_table_columns}")
    ''
  else
    generate_line_with_delimiter(values, @delimiter)
  end
end
json?()
click to toggle source
# File lib/fluent/plugin/out_redshift_v2.rb, line 244
# True when the configured file_type is JSON.
def json?
  @file_type.eql?('json')
end
msgpack?()
click to toggle source
# File lib/fluent/plugin/out_redshift_v2.rb, line 248
# True when the configured file_type is msgpack.
def msgpack?
  @file_type.eql?('msgpack')
end