class LogStash::Outputs::Cassandra::SafeSubmitter

Submits parsed actions to Cassandra, with or without a retry mechanism.

Public Class Methods

new(options) click to toggle source
# File lib/logstash/outputs/cassandra/safe_submitter.rb, line 9
# Creates a submitter: stores the logger, starts with an empty cache of
# prepared statements and then opens the Cassandra session.
#
# @param options [Hash] string-keyed settings; 'logger' is read here and the
#   whole hash is handed to setup_cassandra_session
def initialize(options)
  # The logger is assigned before the session is set up because
  # get_retry_policy (reached through setup_cassandra_session) reads @logger.
  @logger = options['logger']
  @statement_cache = {}
  setup_cassandra_session(options)
end

Public Instance Methods

submit(actions) click to toggle source
# File lib/logstash/outputs/cassandra/safe_submitter.rb, line 15
# Converts the given actions into queries and executes them.
#
# @param actions [Enumerable] parsed actions, passed to prepare_queries
# @return the result of execute_queries_with_retries
def submit(actions)
  execute_queries_with_retries(prepare_queries(actions))
end

Private Instance Methods

execute_async(query, retries, queries) click to toggle source
# File lib/logstash/outputs/cassandra/safe_submitter.rb, line 106
# Sends a single query asynchronously and attaches a failure handler to the
# resulting future.
#
# On failure the handler logs the error. When the configured retry policy is
# a Backoff policy and it answers with a Retry decision, the query is pushed
# back onto +queries+ so that it gets executed again.
#
# @param query [Hash] holds the statement under :query and its bound values
#   under :arguments
# @param retries [Integer] retry count forwarded to the backoff policy
# @param queries [#<<] collection that receives the query when it is retried
# @return the future returned by the session
def execute_async(query, retries, queries)
  pending = @session.execute_async(query[:query], arguments: query[:arguments])
  pending.on_failure do |error|
    @logger.error('Failed to execute query', :query => query, :error => error)
    # Only the Backoff policy is consulted here; other policies never requeue.
    next unless @retry_policy.is_a?(::Cassandra::Retry::Policies::Backoff)

    verdict = @retry_policy.retry_with_backoff({ :retries => retries, :consistency => @consistency })
    queries << query if verdict.is_a?(::Cassandra::Retry::Decisions::Retry)
  end
  pending
end
execute_queries(queries, retries) click to toggle source
# File lib/logstash/outputs/cassandra/safe_submitter.rb, line 92
# Drains +queries+, sending every entry asynchronously, then waits for all
# of the futures that were obtained.
#
# A query whose submission raises a StandardError is logged and dropped; it
# is not put back on the queue here. Failure handlers registered by
# execute_async may push queries back onto +queries+ while this runs.
#
# @param queries [#pop, #empty?] pending queries; emptied by this method
# @param retries [Integer] current retry count, forwarded to execute_async
# @return [Array] the futures that were joined
def execute_queries(queries, retries)
  futures = []
  until queries.empty?
    query = queries.pop
    begin
      futures << execute_async(query, retries, queries)
    rescue StandardError => e
      # StandardError rather than Exception: interrupts, exit requests and
      # out-of-memory conditions must propagate instead of being logged away.
      @logger.error('Failed to send query', :query => query, :exception => e, :backtrace => e.backtrace)
    end
  end
  futures.each(&:join)
end
execute_queries_with_retries(queries) click to toggle source
# File lib/logstash/outputs/cassandra/safe_submitter.rb, line 84
# Repeatedly runs execute_queries until the queue stays empty. Each pass
# receives the number of passes already made as its retry count.
#
# @param queries [#empty?] pending queries; refilled by failure handlers when
#   a query is to be retried
# @return [nil]
def execute_queries_with_retries(queries)
  attempt = 0
  until queries.empty?
    execute_queries(queries, attempt)
    attempt += 1
  end
end
get_query(action) click to toggle source
# File lib/logstash/outputs/cassandra/safe_submitter.rb, line 71
    # Returns the prepared INSERT statement for an action, preparing it on
    # first use and serving it from @statement_cache afterwards. The cache is
    # keyed by the generated CQL text.
    #
    # @param action [Hash] 'table' names the target table; 'data' is a hash
    #   whose keys are used as the column list
    # @return the prepared statement stored in the cache for this CQL text
    def get_query(action)
      @logger.debug('generating query for action', :action => action)
      columns = action['data'].keys
      # One '?' bind marker per column, comma separated.
      placeholders = Array.new(columns.count, '?').join(', ')
      cql = "INSERT INTO #{action['table']} (#{columns.join(', ')})\nVALUES (#{placeholders})"
      # fetch only runs the block when the key is absent, so an entry that is
      # already cached (whatever its value) is never prepared again.
      @statement_cache.fetch(cql) do
        @logger.debug('preparing new query', :query => cql)
        @statement_cache[cql] = @session.prepare(cql)
      end
    end
get_retry_policy(retry_policy) click to toggle source
# File lib/logstash/outputs/cassandra/safe_submitter.rb, line 38
# Builds the retry policy object selected by the 'type' entry of the given
# configuration.
#
# @param retry_policy [Hash] string-keyed policy settings; 'type' selects the
#   policy and, for 'backoff', 'backoff_type', 'backoff_size' and
#   'retry_limit' are forwarded together with the logger
# @return the policy instance
# @raise [ArgumentError] when the type is not one of the known names
def get_retry_policy(retry_policy)
  policy_type = retry_policy['type']
  case policy_type
  when 'default' then ::Cassandra::Retry::Policies::Default.new
  when 'downgrading_consistency' then ::Cassandra::Retry::Policies::DowngradingConsistency.new
  # NOTE: the accepted key is spelled 'failthrough' although it selects the
  # Fallthrough policy.
  when 'failthrough' then ::Cassandra::Retry::Policies::Fallthrough.new
  when 'backoff'
    backoff_settings = {
      'backoff_type' => retry_policy['backoff_type'],
      'backoff_size' => retry_policy['backoff_size'],
      'retry_limit' => retry_policy['retry_limit'],
      'logger' => @logger
    }
    ::Cassandra::Retry::Policies::Backoff.new(backoff_settings)
  else
    raise ArgumentError, "unknown retry policy type: #{policy_type}"
  end
end
prepare_queries(actions) click to toggle source
# File lib/logstash/outputs/cassandra/safe_submitter.rb, line 56
# Converts actions into a queue of executable queries.
#
# Each non-nil action yields one entry holding the statement from get_query
# under :query and the values of action['data'] under :arguments. An action
# that raises a StandardError while being prepared is logged and skipped so
# that the remaining actions are still processed.
#
# @param actions [Enumerable] parsed actions; nil entries are ignored
# @return [Queue] the queries ready to be executed
def prepare_queries(actions)
  remaining_queries = Queue.new
  actions.each do |action|
    next unless action

    begin
      remaining_queries << { :query => get_query(action), :arguments => action['data'].values }
    rescue StandardError => e
      # StandardError rather than Exception: interrupts, exit requests and
      # out-of-memory conditions must propagate instead of being logged away.
      @logger.error('Failed to prepare query', :action => action, :exception => e, :backtrace => e.backtrace)
    end
  end
  remaining_queries
end
setup_cassandra_session(options) click to toggle source
# File lib/logstash/outputs/cassandra/safe_submitter.rb, line 21
# Resolves the retry policy and consistency from +options+, builds a cluster
# through the driver object found under options['cassandra'] and connects to
# the configured keyspace. The resulting session is kept in @session.
#
# @param options [Hash] string-keyed settings: 'retry_policy', 'consistency',
#   'cassandra', 'username', 'password', 'protocol_version', 'hosts', 'port',
#   'request_timeout', 'logger' and 'keyspace' are read
# @return the session returned by connect
def setup_cassandra_session(options)
  @retry_policy = get_retry_policy(options['retry_policy'])
  @consistency = options['consistency'].to_sym
  driver = options['cassandra']
  @session = driver.cluster(
    username: options['username'],
    password: options['password'],
    protocol_version: options['protocol_version'],
    hosts: options['hosts'],
    port: options['port'],
    consistency: @consistency,
    timeout: options['request_timeout'],
    retry_policy: @retry_policy,
    logger: options['logger']
  ).connect(options['keyspace'])
end