class Fluent::Plugin::CassandraJsonOutput
Public Instance Methods
close()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_cassandra_json.rb, line 89
# Runs the superclass close hook, then closes the Cassandra cluster handle
# stored in @cluster.
#
# @cluster is only assigned in #start, so it can still be nil when close runs
# without a successful start; in that case there is nothing to release and
# the call is skipped instead of raising NoMethodError on nil.
def close
  super
  @cluster.close if @cluster
end
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_cassandra_json.rb, line 64
# Validates the configuration, normalizes the cluster options and creates the
# JSON formatter used by #format.
#
# After this method returns, @cluster_options has symbol keys and always
# carries :hosts and :port; :username and :password are added only when the
# corresponding setting is present.
#
# @param conf configuration element; its first "format" sub-element (nil when
#   there is none) is handed to the formatter
# @raise [Fluent::ConfigError] when @hosts is empty
def configure(conf)
  super

  raise Fluent::ConfigError, "`hosts` must have at least one host" if @hosts.empty?

  # Symbolize the option keys, then let the dedicated settings override any
  # same-named entries from the free-form options hash.
  @cluster_options = @cluster_options.each_with_object({}) do |(key, value), options|
    options[key.to_sym] = value
  end
  @cluster_options[:hosts] = @hosts
  @cluster_options[:port] = @port
  @cluster_options[:username] = @username if @username
  @cluster_options[:password] = @password if @password

  formatter_config = conf.elements("format").first
  @formatter = formatter_create(usage: 'out_cassandra_for_insert', type: 'json', conf: formatter_config)
end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_cassandra_json.rb, line 95
# Serializes one event: the record is first passed through
# inject_values_to_record, and the result is rendered by the formatter
# created in #configure.
#
# @param tag    event tag, forwarded to both steps
# @param time   event time, forwarded to both steps
# @param record event record
# @return whatever @formatter.format returns for the injected record
def format(tag, time, record)
  @formatter.format(tag, time, inject_values_to_record(tag, time, record))
end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/out_cassandra_json.rb, line 100
# Unconditionally reports true.
# NOTE(review): presumably queried by Fluentd to decide whether this plugin
# may run under multiple workers - confirm against the plugin API.
#
# @return [true]
def multi_workers_ready?
  true
end
session()
click to toggle source
# File lib/fluent/plugin/out_cassandra_json.rb, line 85
# Returns the Cassandra session cached under the :session key of
# Thread.current, connecting through @cluster with @keyspace the first time
# it is requested in that context.
#
# @return the cached (or freshly connected) session
def session
  cached = Thread.current[:session]
  return cached if cached

  Thread.current[:session] = @cluster.connect(@keyspace)
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_cassandra_json.rb, line 79
# Runs the superclass start hook, then builds the Cassandra cluster handle
# from the options prepared in #configure and keeps it in @cluster for
# #session and #close.
def start
  super

  @cluster = Cassandra.cluster(@cluster_options)
end
try_write(chunk)
click to toggle source
# File lib/fluent/plugin/out_cassandra_json.rb, line 104
# Writes one buffer chunk to Cassandra by issuing one asynchronous
# `INSERT ... JSON` statement per line of the chunk.
#
# An empty chunk is committed immediately. Otherwise the chunk is committed
# or rolled back from the completion callback of the combined future: it is
# committed when no error was reported or when @skip_invalid_rows is set,
# and rolled back in every other case.
#
# @param chunk buffer chunk; this method uses its empty?, unique_id, metadata
#   and open (yielding an enumerable of lines) methods
# @return nil for an empty chunk, otherwise the result of registering the
#   completion callback on the combined future
def try_write(chunk)
  if chunk.empty?
    commit_write(chunk.unique_id)
    return
  end

  keyspace = extract_placeholders(@keyspace, chunk.metadata)
  table = extract_placeholders(@table, chunk.metadata)

  # Everything around the JSON payload is the same for every row of the
  # chunk, so it is assembled once instead of once per line.
  statement_head = "INSERT INTO #{keyspace}.#{table} JSON '"
  clauses = []
  clauses << " DEFAULT UNSET" if @default_unset
  clauses << " IF NOT EXISTS" if @if_not_exists
  clauses << " USING TTL #{@ttl}" if @ttl && @ttl > 0
  statement_tail = "'#{clauses.join}"

  futures = chunk.open do |io|
    io.map do |line|
      record = line.chomp
      # Single quotes are doubled so the payload cannot terminate the CQL
      # string literal it is embedded in.
      cql = "#{statement_head}#{record.gsub("'", "''")}#{statement_tail}"
      @log.debug(cql)

      future = session.execute_async(cql, consistency: @consistency, idempotent: @idempotent)
      future.on_failure do |error|
        if @skip_invalid_rows
          @log.warn("failed to insert", record: record, error: error)
        else
          @log.error("failed to insert", record: record, error: error)
        end
      end

      # Collect the future itself rather than relying on the return value of
      # the callback registration.
      future
    end
  end

  combined = Cassandra::Future.all(futures)
  combined.on_complete do |_value, error|
    if error.nil? || @skip_invalid_rows
      commit_write(chunk.unique_id)
    else
      rollback_write(chunk.unique_id)
    end
  end
end