class Charrington::Process

Constants

Error
EventNil
ProcessFailed

Attributes

connection[R]
driver[R]
event[R]
max_retries[R]
opts[R]
retry_interval[RW]
retry_max_interval[R]
schema[R]
should_retry[RW]
transformer[R]

Public Class Methods

new(connection, event, opts={}) click to toggle source
# File lib/logstash/outputs/charrington/process.rb, line 17
def initialize(connection, event, opts={})
  raise EventNil, "Event is nil" if event.nil?
  @connection = connection
  @event = event.to_hash
  @opts = opts

  @max_retries = opts[:max_retries] || 10
  @retry_max_interval = opts[:retry_max_interval] || 2
  @retry_interval = opts[:retry_initial_interval] || 2
  @driver = opts[:driver]
  @transformer = opts[:transformer]

  @attempts = 1
  @should_retry = true
end

Public Instance Methods

call() click to toggle source
# File lib/logstash/outputs/charrington/process.rb, line 33
def call
  while should_retry do
    transformed = case transformer
                    when "redshift"
                      self.logger.info "Found transformer of #{transformer} for driver of #{driver} with event of: #{event}"
                      Charrington::TransformRedshift.call(event)
                    else
                      self.logger.info "Found transformer of #{transformer} for driver of #{driver} with event of: #{event}"
                      Charrington::TransformPostgres.call(event)
                    end
    self.logger.info "Transformed event into: #{transformed}"
    should_retry = Charrington::Insert.call(connection, transformed, opts)
    break if !should_retry

    @attempts += 1
    break if @attempts > max_retries

    # If we're retrying the action, sleep for the recommended interval
    # Double the interval for the next time through to achieve exponential backoff
    sleep_interval
  end
rescue => e
  raise ProcessFailed, e.message
ensure
  connection.close unless connection.nil?
  @event.clear if clearable(@event)
end

Private Instance Methods

clearable(obj) click to toggle source
# File lib/logstash/outputs/charrington/process.rb, line 69
def clearable(obj)
  obj.is_a? Hash or obj.is_a? Array
end
sleep_interval() click to toggle source
# File lib/logstash/outputs/charrington/process.rb, line 63
def sleep_interval
  sleep(retry_interval)
  doubled = retry_interval * 2
  retry_interval = doubled > retry_max_interval ? retry_max_interval : doubled
end