class Charrington::Insert
Constants
- Error
- EventNil
- InsertFailed
- TableNameNil
Attributes
columns[R]
connection[R]
driver[R]
enable_event_as_json_keyword[R]
event[RW]
event_as_json_keyword[R]
opts[R]
schema[R]
should_retry[RW]
table_name[R]
tracks[R]
transformer[R]
Public Class Methods
new(connection, event, opts = {})
click to toggle source
# File lib/logstash/outputs/charrington/insert.rb, line 28 def initialize(connection, event, opts = {}) raise EventNil, "Table name is nil" if event.nil? @transformer = opts[:transformer] @event = event.to_hash @tracks = create_tracks(@event) event_name = event["event"].to_s.strip raise TableNameNil, "Table name is nil" if event_name.empty? @connection = connection @schema = opts[:schema].empty? ? '' : "#{opts[:schema]}." @table_name = underscore(event_name) @columns = event.keys.map{|x| underscore(x)} @should_retry = false @enable_event_as_json_keyword = opts[:enable_event_as_json_keyword] @event_as_json_keyword = opts[:event_as_json_keyword] @driver = opts[:driver] @opts = opts end
Public Instance Methods
call()
click to toggle source
# File lib/logstash/outputs/charrington/insert.rb, line 49 def call self.logger.info "Attempting insert into table name: #{table_name}" insert_sql = insert_event_statement insert_stmt = connection.prepareStatement(insert_sql) self.logger.info "Insert statement passed into prepareStatement is: #{insert_stmt}" insert_stmt = add_statement_event_params(insert_stmt, event) self.logger.info "Insert statement to be run is: #{insert_stmt.toString}" insert_stmt.execute self.logger.info "Attempting insert into tracks table" do_tracks_insert should_retry rescue Java::JavaSql::SQLException => e case e.getSQLState() when "42P01" self.logger.info "Received Java::JavaSql::SQLException with error sql state of 42P01, moving to create table" should_retry = create_table when "42703" self.logger.info "Received Java::JavaSql::SQLException with error sql state of 42703, moving to alter table" should_retry = alter_table else raise InsertFailed, "SQLException (Charrington:Insert) #{e.message} #{insert_sql}" end should_retry rescue => e raise InsertFailed, "SQLException (Charrington:Insert) #{e.message} #{insert_sql}" ensure insert_stmt.close unless insert_stmt.nil? cleanup end
do_tracks_insert()
click to toggle source
# File lib/logstash/outputs/charrington/insert.rb, line 82 def do_tracks_insert tracks_sql = insert_tracks_statement tracks_stmt = connection.prepareStatement(tracks_sql) tracks_stmt = add_statement_event_params(tracks_stmt, tracks) self.logger.info "Insert tracks statment to be run: #{tracks_stmt.toString}" tracks_stmt.execute rescue Java::JavaSql::SQLException => e self.logger.error("SQLException (Charrington:Insert) Insert tracks entry failed. #{e.message} #{tracks_sql}") ensure tracks_stmt.close unless tracks_stmt.nil? end
Private Instance Methods
add_statement_event_params(stmt, map)
click to toggle source
# File lib/logstash/outputs/charrington/insert.rb, line 171 def add_statement_event_params(stmt, map) values = [] map.keys.each_with_index do |key, idx| pos = idx + 1 value = map[key] values << value if @@timestamp_columns.include?(key) begin time = parse_date(value) stmt.setTimestamp(pos, time) next rescue java.text.ParseException => e time = parse_date(value, "yyyy-MM-dd'T'HH:mm:ss'Z'") stmt.setTimestamp(pos, time) next end end case value when Time stmt.setString(pos, value.strftime(STRFTIME_FMT)) when LogStash::Timestamp stmt.setString(pos, value.time.strftime(STRFTIME_FMT)) when Integer if value > 2147483647 || value < -2147483648 stmt.setLong(pos, value) else stmt.setInt(pos, value) end when BigDecimal stmt.setBigDecimal(pos, value.to_java) when Float stmt.setFloat(pos, value) when String stmt.setString(pos, value[0,254]) # truncate at 254 string characters when Array, Hash stmt.setString(pos, value.to_json) when true, false stmt.setBoolean(pos, value) else stmt.setString(pos, nil) end end stmt end
alter_table()
click to toggle source
# File lib/logstash/outputs/charrington/insert.rb, line 124 def alter_table if is_postgres? Charrington::AlterPostgresTable.call(connection, event, schema, table_name, columns) elsif is_redshift? Charrington::AlterRedshiftTable.call(connection, event, schema, table_name, columns) end end
arr_to_csv(arr)
click to toggle source
# File lib/logstash/outputs/charrington/insert.rb, line 225 def arr_to_csv(arr) '(' + arr.join(', ') + ')' end
cleanup()
click to toggle source
# File lib/logstash/outputs/charrington/insert.rb, line 132 def cleanup @columns.clear if clearable(@columns) @tracks.clear if clearable(@tracks) end
clearable(obj)
click to toggle source
# File lib/logstash/outputs/charrington/insert.rb, line 229 def clearable(obj) obj.is_a? Hash or obj.is_a? Array end
columns_text()
click to toggle source
# File lib/logstash/outputs/charrington/insert.rb, line 146 def columns_text @columns_text ||= arr_to_csv(columns) end
create_table()
click to toggle source
# File lib/logstash/outputs/charrington/insert.rb, line 116 def create_table if is_postgres? Charrington::CreatePostgresTable.call(connection, event, schema, table_name, columns, opts) elsif is_redshift? Charrington::CreateRedshiftTable.call(connection, event, schema, table_name, columns, opts) end end
create_tracks(event)
click to toggle source
Set Variables
# File lib/logstash/outputs/charrington/insert.rb, line 138 def create_tracks(event) tracks = event.clone tracks.keys.each do |key| tracks.delete(key) unless tracks_columns.include?(key) end tracks end
execute(connection, sql)
click to toggle source
SQL
# File lib/logstash/outputs/charrington/insert.rb, line 246 def execute(connection, sql) statement = connection.prepareStatement( sql.gsub(/\s+/, " ").strip ) statement.execute() rescue Java::OrgPostgresqlUtil::PSQLException => e self.logger.error "PSQLException: #{e.message}, with SQL: #{sql}" rescue Java::JavaSql::SQLException => e self.logger.error "Redshift SQLException: #{e.message}, with SQL: #{sql}" ensure statement.close unless statement.nil? end
insert_event_statement()
click to toggle source
# File lib/logstash/outputs/charrington/insert.rb, line 163 def insert_event_statement "INSERT INTO #{schema}#{table_name} #{columns_text} VALUES #{placeholder_text(columns)}" end
insert_tracks_statement()
click to toggle source
# File lib/logstash/outputs/charrington/insert.rb, line 167 def insert_tracks_statement "INSERT INTO #{schema}tracks #{tracks_columns_text} VALUES #{placeholder_text(tracks.keys)}" end
is_postgres?()
click to toggle source
# File lib/logstash/outputs/charrington/insert.rb, line 104 def is_postgres? driver == 'postgres' end
is_postgres_transform?()
click to toggle source
# File lib/logstash/outputs/charrington/insert.rb, line 108 def is_postgres_transform? transformer == 'postgres' end
is_redshift?()
click to toggle source
# File lib/logstash/outputs/charrington/insert.rb, line 100 def is_redshift? driver == 'redshift' end
is_redshift_transform?()
click to toggle source
# File lib/logstash/outputs/charrington/insert.rb, line 112 def is_redshift_transform? transformer == 'redshift' end
parse_date(date, fmt = "yyyy-MM-dd'T'HH:mm:ss.S'Z'")
click to toggle source
Helpers
# File lib/logstash/outputs/charrington/insert.rb, line 219 def parse_date(date, fmt = "yyyy-MM-dd'T'HH:mm:ss.S'Z'") format = java.text.SimpleDateFormat.new(fmt) parsed = format.parse(date) java.sql.Timestamp.new(parsed.getTime) end
placeholder_text(columns)
click to toggle source
# File lib/logstash/outputs/charrington/insert.rb, line 159 def placeholder_text(columns) arr_to_csv(value_placeholders(columns)) end
tracks_columns()
click to toggle source
# File lib/logstash/outputs/charrington/insert.rb, line 96 def tracks_columns is_redshift_transform? ? @@redshift_tracks_columns : @@postgres_tracks_columns end
tracks_columns_text()
click to toggle source
# File lib/logstash/outputs/charrington/insert.rb, line 150 def tracks_columns_text @tracks_columns_text ||= arr_to_csv(tracks.keys) end
underscore(str)
click to toggle source
# File lib/logstash/outputs/charrington/insert.rb, line 233 def underscore(str) str. gsub(/::/, '/'). gsub(/([A-Z]+)([A-Z][a-z])/,'\1_\2'). gsub(/([a-z\d])([A-Z])/,'\1_\2'). downcase. gsub(/[^a-z0-9]+/, "_"). gsub(/\A_+/, ""). gsub(/_+\z/, "")[0,64] end
value_placeholders(columns)
click to toggle source
# File lib/logstash/outputs/charrington/insert.rb, line 155 def value_placeholders(columns) ('?' * columns.length).split('') end