class Charrington::Insert

Constants

Error
EventNil
InsertFailed
TableNameNil

Attributes

columns[R]
connection[R]
driver[R]
enable_event_as_json_keyword[R]
event[RW]
event_as_json_keyword[R]
opts[R]
schema[R]
should_retry[RW]
table_name[R]
tracks[R]
transformer[R]

Public Class Methods

new(connection, event, opts = {}) click to toggle source
# File lib/logstash/outputs/charrington/insert.rb, line 28
# Builds an insert operation for a single event.
#
# @param connection [Object] connection later used to prepare the insert statements
# @param event [#to_hash, #[], #keys] event to insert; its "event" field names the target table
# @param opts [Hash] options; the keys read here are :transformer, :schema,
#   :enable_event_as_json_keyword, :event_as_json_keyword and :driver
# @raise [EventNil] when +event+ is nil
# @raise [TableNameNil] when the event's "event" field is missing or blank
def initialize(connection, event, opts = {})
  raise EventNil, "Event is nil" if event.nil?
  # Set the transformer first: create_tracks picks its column list through tracks_columns,
  # which depends on the transformer.
  @transformer = opts[:transformer]
  @event = event.to_hash
  @tracks = create_tracks(@event)
  event_name = event["event"].to_s.strip
  raise TableNameNil, "Table name is nil" if event_name.empty?

  @connection = connection
  # A missing (nil) schema is treated like an empty one: table names stay unqualified.
  schema_name = opts[:schema].to_s
  @schema = schema_name.empty? ? '' : "#{schema_name}."

  @table_name = underscore(event_name)

  @columns = event.keys.map { |key| underscore(key) }
  @should_retry = false
  @enable_event_as_json_keyword = opts[:enable_event_as_json_keyword]
  @event_as_json_keyword = opts[:event_as_json_keyword]
  @driver = opts[:driver]
  @opts = opts
end

Public Instance Methods

call() click to toggle source
# File lib/logstash/outputs/charrington/insert.rb, line 49
# Inserts the event into its table, then inserts the matching row into the tracks table.
#
# When the event insert fails with SQL state 42P01 the table is created, and with 42703 it
# is altered; the result of that step is stored in the should_retry attribute and returned.
# The prepared statement is always closed and cleanup always runs, whatever the outcome.
#
# @return [Object] the current should_retry value (false after a plain successful insert)
# @raise [InsertFailed] for any other SQL state or any other StandardError
def call
  self.logger.info "Attempting insert into table name: #{table_name}"
  insert_sql = insert_event_statement
  insert_stmt = connection.prepareStatement(insert_sql)
  # Log the SQL text that was handed to prepareStatement, not the statement object.
  self.logger.info "Insert statement passed into prepareStatement is: #{insert_sql}"
  insert_stmt = add_statement_event_params(insert_stmt, event)
  self.logger.info "Insert statement to be run is: #{insert_stmt.toString}"
  insert_stmt.execute

  self.logger.info "Attempting insert into tracks table"
  do_tracks_insert

  should_retry
rescue Java::JavaSql::SQLException => e
  case e.getSQLState()
  when "42P01"
    self.logger.info "Received Java::JavaSql::SQLException with error sql state of 42P01, moving to create table"
    # Go through the writer: a bare `should_retry = ...` would only create a local variable
    # and leave the attribute stale.
    self.should_retry = create_table
  when "42703"
    self.logger.info "Received Java::JavaSql::SQLException with error sql state of 42703, moving to alter table"
    self.should_retry = alter_table
  else
    raise InsertFailed, "SQLException (Charrington:Insert) #{e.message} #{insert_sql}"
  end
  should_retry
rescue => e
  raise InsertFailed, "SQLException (Charrington:Insert) #{e.message} #{insert_sql}"
ensure
  begin
    insert_stmt.close unless insert_stmt.nil?
  ensure
    # Run cleanup even when closing the statement itself raises.
    cleanup
  end
end
do_tracks_insert() click to toggle source
# File lib/logstash/outputs/charrington/insert.rb, line 82
# Inserts the tracks row built from the event.
#
# An SQLException is logged as an error instead of being raised; the prepared statement is
# closed in every case.
def do_tracks_insert
  sql = insert_tracks_statement
  prepared = connection.prepareStatement(sql)
  prepared = add_statement_event_params(prepared, tracks)
  self.logger.info("Insert tracks statment to be run: #{prepared.toString}")
  prepared.execute
rescue Java::JavaSql::SQLException => error
  self.logger.error("SQLException (Charrington:Insert) Insert tracks entry failed. #{error.message} #{sql}")
ensure
  prepared.close unless prepared.nil?
end

Private Instance Methods

add_statement_event_params(stmt, map) click to toggle source
# File lib/logstash/outputs/charrington/insert.rb, line 171
# Binds every value of +map+ to the matching positional parameter of +stmt+.
#
# Keys listed in @@timestamp_columns are parsed with parse_date and bound as timestamps,
# first with parse_date's default format and, if that raises a ParseException, with the
# format that has no fractional seconds. All other values are bound according to their
# Ruby class; anything unrecognised (including nil) is bound as a NULL string.
#
# @param stmt [Object] prepared statement offering the JDBC set* methods
# @param map [Hash] column name => value, in the same order as the statement's placeholders
# @return [Object] +stmt+, with all parameters bound
def add_statement_event_params(stmt, map)
  map.keys.each_with_index do |key, idx|
    pos = idx + 1 # statement parameters are numbered from 1
    value = map[key]

    if @@timestamp_columns.include?(key)
      begin
        stmt.setTimestamp(pos, parse_date(value))
      rescue java.text.ParseException
        stmt.setTimestamp(pos, parse_date(value, "yyyy-MM-dd'T'HH:mm:ss'Z'"))
      end
      next
    end

    case value
    when Time
      stmt.setString(pos, value.strftime(STRFTIME_FMT))
    when LogStash::Timestamp
      stmt.setString(pos, value.time.strftime(STRFTIME_FMT))
    when Integer
      # Values outside the signed 32-bit range need the 64-bit setter.
      if value > 2147483647 || value < -2147483648
        stmt.setLong(pos, value)
      else
        stmt.setInt(pos, value)
      end
    when BigDecimal
      stmt.setBigDecimal(pos, value.to_java)
    when Float
      # Ruby Floats are double precision; setFloat would narrow them to 32 bits.
      stmt.setDouble(pos, value)
    when String
      stmt.setString(pos, value[0,254]) # truncate at 254 string characters
    when Array, Hash
      stmt.setString(pos, value.to_json)
    when true, false
      stmt.setBoolean(pos, value)
    else
      stmt.setString(pos, nil)
    end
  end
  stmt
end
alter_table() click to toggle source
# File lib/logstash/outputs/charrington/insert.rb, line 124
# Alters the event table through the Alter*Table class matching the configured driver.
#
# @return [Object, nil] whatever the selected class's +call+ returns, or nil when the
#   driver is neither postgres nor redshift
def alter_table
  return Charrington::AlterPostgresTable.call(connection, event, schema, table_name, columns) if is_postgres?

  Charrington::AlterRedshiftTable.call(connection, event, schema, table_name, columns) if is_redshift?
end
arr_to_csv(arr) click to toggle source
# File lib/logstash/outputs/charrington/insert.rb, line 225
# Renders +arr+ as a parenthesised, comma-separated list, e.g. ["a", "b"] => "(a, b)".
#
# @param arr [Array] items to join
# @return [String]
def arr_to_csv(arr)
  "(#{arr.join(', ')})"
end
cleanup() click to toggle source
# File lib/logstash/outputs/charrington/insert.rb, line 132
# Empties the columns and tracks collections, skipping either one when it is not clearable.
def cleanup
  [@columns, @tracks].map { |collection| collection.clear if clearable(collection) }.last
end
clearable(obj) click to toggle source
# File lib/logstash/outputs/charrington/insert.rb, line 229
# Tells whether +obj+ is a Hash or an Array.
#
# @param obj [Object]
# @return [Boolean]
def clearable(obj)
  [Hash, Array].any? { |klass| obj.is_a?(klass) }
end
columns_text() click to toggle source
# File lib/logstash/outputs/charrington/insert.rb, line 146
# Column list of the event insert, rendered by arr_to_csv; computed once and then reused.
#
# @return [String]
def columns_text
  @columns_text = arr_to_csv(columns) unless @columns_text
  @columns_text
end
create_table() click to toggle source
# File lib/logstash/outputs/charrington/insert.rb, line 116
# Creates the event table through the Create*Table class matching the configured driver.
#
# @return [Object, nil] whatever the selected class's +call+ returns, or nil when the
#   driver is neither postgres nor redshift
def create_table
  return Charrington::CreatePostgresTable.call(connection, event, schema, table_name, columns, opts) if is_postgres?

  Charrington::CreateRedshiftTable.call(connection, event, schema, table_name, columns, opts) if is_redshift?
end
create_tracks(event) click to toggle source

Set Variables

# File lib/logstash/outputs/charrington/insert.rb, line 138
# Builds the tracks row: a copy of +event+ reduced to the keys listed in tracks_columns.
# The event passed in is left unmodified.
#
# @param event [Hash]
# @return [Hash]
def create_tracks(event)
  filtered = event.clone
  unwanted = filtered.keys.reject { |key| tracks_columns.include?(key) }
  unwanted.each { |key| filtered.delete(key) }
  filtered
end
execute(connection, sql) click to toggle source

SQL

# File lib/logstash/outputs/charrington/insert.rb, line 246
# Runs +sql+ on +connection+ after collapsing every whitespace run to a single space.
#
# PSQLException and SQLException are logged as errors rather than raised; the statement is
# closed in every case.
#
# @param connection [Object] connection offering prepareStatement
# @param sql [String] statement text
def execute(connection, sql)
  compact_sql = sql.gsub(/\s+/, " ").strip
  statement = connection.prepareStatement(compact_sql)
  statement.execute
rescue Java::OrgPostgresqlUtil::PSQLException => error
  self.logger.error("PSQLException: #{error.message}, with SQL: #{sql}")
rescue Java::JavaSql::SQLException => error
  self.logger.error("Redshift SQLException: #{error.message}, with SQL: #{sql}")
ensure
  statement.close unless statement.nil?
end
insert_event_statement() click to toggle source
# File lib/logstash/outputs/charrington/insert.rb, line 163
# SQL text of the parameterised INSERT for the event table.
#
# @return [String]
def insert_event_statement
  target = "#{schema}#{table_name}"
  column_list = columns_text
  placeholders = placeholder_text(columns)
  "INSERT INTO #{target} #{column_list} VALUES #{placeholders}"
end
insert_tracks_statement() click to toggle source
# File lib/logstash/outputs/charrington/insert.rb, line 167
# SQL text of the parameterised INSERT for the tracks table.
#
# @return [String]
def insert_tracks_statement
  schema_prefix = schema
  column_list = tracks_columns_text
  placeholders = placeholder_text(tracks.keys)
  "INSERT INTO #{schema_prefix}tracks #{column_list} VALUES #{placeholders}"
end
is_postgres?() click to toggle source
# File lib/logstash/outputs/charrington/insert.rb, line 104
# Whether the configured driver is 'postgres'.
#
# @return [Boolean]
def is_postgres?
  driver.eql?('postgres')
end
is_postgres_transform?() click to toggle source
# File lib/logstash/outputs/charrington/insert.rb, line 108
# Whether the configured transformer is 'postgres'.
#
# @return [Boolean]
def is_postgres_transform?
  transformer.eql?('postgres')
end
is_redshift?() click to toggle source
# File lib/logstash/outputs/charrington/insert.rb, line 100
# Whether the configured driver is 'redshift'.
#
# @return [Boolean]
def is_redshift?
  driver.eql?('redshift')
end
is_redshift_transform?() click to toggle source
# File lib/logstash/outputs/charrington/insert.rb, line 112
# Whether the configured transformer is 'redshift'.
#
# @return [Boolean]
def is_redshift_transform?
  transformer.eql?('redshift')
end
parse_date(date, fmt = "yyyy-MM-dd'T'HH:mm:ss.S'Z'") click to toggle source

Helpers

# File lib/logstash/outputs/charrington/insert.rb, line 219
# Parses +date+ with a java.text.SimpleDateFormat built from +fmt+ and wraps the resulting
# instant in a java.sql.Timestamp.
#
# @param date [String] text to parse
# @param fmt [String] SimpleDateFormat pattern; the default expects fractional seconds
# @return [java.sql.Timestamp]
def parse_date(date, fmt = "yyyy-MM-dd'T'HH:mm:ss.S'Z'")
  epoch_millis = java.text.SimpleDateFormat.new(fmt).parse(date).getTime
  java.sql.Timestamp.new(epoch_millis)
end
placeholder_text(columns) click to toggle source
# File lib/logstash/outputs/charrington/insert.rb, line 159
# Placeholder list for an INSERT: one "?" per entry of +columns+, rendered by arr_to_csv.
#
# @param columns [Array]
# @return [String]
def placeholder_text(columns)
  marks = value_placeholders(columns)
  arr_to_csv(marks)
end
tracks_columns() click to toggle source
# File lib/logstash/outputs/charrington/insert.rb, line 96
# Column list used to filter the tracks row: the redshift list when the transformer is
# redshift, the postgres list otherwise.
def tracks_columns
  return @@redshift_tracks_columns if is_redshift_transform?

  @@postgres_tracks_columns
end
tracks_columns_text() click to toggle source
# File lib/logstash/outputs/charrington/insert.rb, line 150
# Column list of the tracks insert, rendered by arr_to_csv from the keys of tracks;
# computed once and then reused.
#
# @return [String]
def tracks_columns_text
  @tracks_columns_text = arr_to_csv(tracks.keys) unless @tracks_columns_text
  @tracks_columns_text
end
underscore(str) click to toggle source
# File lib/logstash/outputs/charrington/insert.rb, line 233
# Converts +str+ into a lower-case, underscore-separated identifier.
#
# CamelCase boundaries become underscores, every run of characters outside a-z and 0-9
# collapses to one underscore, leading and trailing underscores are stripped, and the
# result is cut to at most 64 characters.
#
# @param str [String]
# @return [String]
def underscore(str)
  snake = str.gsub(/::/, '/')
  snake = snake.gsub(/([A-Z]+)([A-Z][a-z])/, '\1_\2')
  snake = snake.gsub(/([a-z\d])([A-Z])/, '\1_\2')
  snake = snake.downcase.gsub(/[^a-z0-9]+/, "_")
  snake = snake.gsub(/\A_+/, "").gsub(/_+\z/, "")
  snake[0, 64]
end
value_placeholders(columns) click to toggle source
# File lib/logstash/outputs/charrington/insert.rb, line 155
# One "?" placeholder per entry of +columns+.
#
# @param columns [Array]
# @return [Array<String>]
def value_placeholders(columns)
  Array.new(columns.length) { '?' }
end