class Fluent::CassandraSelector
module Fluent
class Plugin::CassandraSelector < Plugin::Filter
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/filter_cassandra_selector.rb, line 36 def configure(conf) super raise ConfigError, "params 'field' or 'field_json' is require least once" if self.field_json.nil? && self.field.nil? end
filter(tag, time, record)
click to toggle source
# File lib/fluent/plugin/filter_cassandra_selector.rb, line 43 def filter(tag, time, record) dataList = nil cql = getCql(record) begin dataList = @session.execute(cql) rescue Exception => e $log.error "Cannot select Cassandra: #{e.message}\nTrace: #{e.backtrace.to_s}" raise e end if dataList.length == 1 dataList.each do |row| record = prepareRowToHash(row, record) end elsif dataList.length > 1 if self.field_json.nil? || self.field_json.empty? record["data_cassandra"] = dataList.rows.to_a else tmpListRec = [] tmpRec = nil dataList.each do |row| tmpRec = prepareRowToHash(row,{}) tmpListRec.push(tmpRec) end record["data_cassandra"] = tmpListRec end end record end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/filter_cassandra_selector.rb, line 31 def shutdown super @session.close if @session end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/filter_cassandra_selector.rb, line 26 def start super @session ||= get_session(self.host, self.port, self.keyspace, self.connect_timeout, self.username, self.password) end
Private Instance Methods
getCql(record)
click to toggle source
# File lib/fluent/plugin/filter_cassandra_selector.rb, line 105 def getCql(record) cql = "select " if self.field cql += self.field + "," end if self.field_json cql += self.field_json + "," end cql = cql.gsub(/,$/, '') cql += " from #{self.keyspace}.#{self.tablename}" if self.where_condition cql += " where "+prepareCondition(record) end cql += ";" cql end
getDataStrJson(jsonData, record)
click to toggle source
# File lib/fluent/plugin/filter_cassandra_selector.rb, line 93 def getDataStrJson(jsonData, record) if jsonData && !jsonData.empty? jsonData = eval(jsonData) jsonData.each do |k,v| record[k.to_s] = v end end record end
prepareCondition(record)
click to toggle source
# File lib/fluent/plugin/filter_cassandra_selector.rb, line 128 def prepareCondition(record) tmpCondVal = {} tmpStr = nil count = 0 self.where_condition.split(":").each do |str| if count > 0 tmpStr = str.gsub(/(;.*)/, '') tmpCondVal[tmpStr] = record[tmpStr] end count += 1 end tmpStr = self.where_condition tmpCondVal.each do |k,v| tmpStr = tmpStr.gsub(k,v) end tmpStr = tmpStr.gsub(':','') tmpStr = tmpStr.gsub(';','') tmpStr end
prepareRowToHash(row, record)
click to toggle source
# File lib/fluent/plugin/filter_cassandra_selector.rb, line 78 def prepareRowToHash(row, record) if self.field self.field.split(",").each do |col| record[col] = "#{row[col]}" end end if self.field_json self.field_json.split(",").each do |col| record = getDataStrJson("#{row[col]}", record) end end record end