class LogStash::Outputs::Cassandra::SafeSubmitter
Responsible for submitting parsed actions to cassandra (with or without a retry mechanism)
Public Class Methods
new(options)
click to toggle source
# File lib/logstash/outputs/cassandra/safe_submitter.rb, line 9 def initialize(options) @statement_cache = {} @logger = options['logger'] setup_cassandra_session(options) end
Public Instance Methods
submit(actions)
click to toggle source
# File lib/logstash/outputs/cassandra/safe_submitter.rb, line 15 def submit(actions) queries = prepare_queries(actions) execute_queries_with_retries(queries) end
Private Instance Methods
execute_async(query, retries, queries)
click to toggle source
# File lib/logstash/outputs/cassandra/safe_submitter.rb, line 106 def execute_async(query, retries, queries) future = @session.execute_async(query[:query], arguments: query[:arguments]) future.on_failure { |error| @logger.error('Failed to execute query', :query => query, :error => error) if @retry_policy.is_a?(::Cassandra::Retry::Policies::Backoff) decision = @retry_policy.retry_with_backoff({ :retries => retries, :consistency => @consistency }) if decision.is_a?(::Cassandra::Retry::Decisions::Retry) queries << query end end } future end
execute_queries(queries, retries)
click to toggle source
# File lib/logstash/outputs/cassandra/safe_submitter.rb, line 92 def execute_queries(queries, retries) futures = [] while queries.length > 0 query = queries.pop begin future = execute_async(query, retries, queries) futures << future rescue Exception => e @logger.error('Failed to send query', :query => query, :exception => e, :backtrace => e.backtrace) end end futures.each(&:join) end
execute_queries_with_retries(queries)
click to toggle source
# File lib/logstash/outputs/cassandra/safe_submitter.rb, line 84 def execute_queries_with_retries(queries) retries = 0 while queries.length > 0 execute_queries(queries, retries) retries += 1 end end
get_query(action)
click to toggle source
# File lib/logstash/outputs/cassandra/safe_submitter.rb, line 71 def get_query(action) @logger.debug('generating query for action', :action => action) action_data = action['data'] query = "INSERT INTO #{action['table']} (#{action_data.keys.join(', ')}) VALUES (#{('?' * action_data.keys.count).split(//) * ', '})" unless @statement_cache.has_key?(query) @logger.debug('preparing new query', :query => query) @statement_cache[query] = @session.prepare(query) end @statement_cache[query] end
get_retry_policy(retry_policy)
click to toggle source
# File lib/logstash/outputs/cassandra/safe_submitter.rb, line 38 def get_retry_policy(retry_policy) case retry_policy['type'] when 'default' return ::Cassandra::Retry::Policies::Default.new when 'downgrading_consistency' return ::Cassandra::Retry::Policies::DowngradingConsistency.new when 'failthrough' return ::Cassandra::Retry::Policies::Fallthrough.new when 'backoff' return ::Cassandra::Retry::Policies::Backoff.new({ 'backoff_type' => retry_policy['backoff_type'], 'backoff_size' => retry_policy['backoff_size'], 'retry_limit' => retry_policy['retry_limit'], 'logger' => @logger }) else raise ArgumentError, "unknown retry policy type: #{retry_policy['type']}" end end
prepare_queries(actions)
click to toggle source
# File lib/logstash/outputs/cassandra/safe_submitter.rb, line 56 def prepare_queries(actions) remaining_queries = Queue.new actions.each do |action| begin if action query = get_query(action) remaining_queries << { :query => query, :arguments => action['data'].values } end rescue Exception => e @logger.error('Failed to prepare query', :action => action, :exception => e, :backtrace => e.backtrace) end end remaining_queries end
setup_cassandra_session(options)
click to toggle source
# File lib/logstash/outputs/cassandra/safe_submitter.rb, line 21 def setup_cassandra_session(options) @retry_policy = get_retry_policy(options['retry_policy']) @consistency = options['consistency'].to_sym cluster = options['cassandra'].cluster( username: options['username'], password: options['password'], protocol_version: options['protocol_version'], hosts: options['hosts'], port: options['port'], consistency: @consistency, timeout: options['request_timeout'], retry_policy: @retry_policy, logger: options['logger'] ) @session = cluster.connect(options['keyspace']) end