class Fluent::CassandraUpsertor
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_cassandra_upsert.rb, line 35 def configure(conf) super # perform validations raise ConfigError, "params 'where_condition_upd' is require condition or primarykey for case update" if self.where_condition_upd.nil? @caseInsertValue = self.case_insert_value @caseUpdateValue = self.case_update_value @whereCondUpd = self.where_condition_upd end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_cassandra_upsert.rb, line 46 def format(tag, time, record) record.to_msgpack end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_cassandra_upsert.rb, line 30 def shutdown super @session.close if @session end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_cassandra_upsert.rb, line 25 def start super @session ||= get_session(self.host, self.port, self.keyspace, self.connect_timeout, self.username, self.password) end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_cassandra_upsert.rb, line 50 def write(chunk) chunk.msgpack_each { |record| whereCondition = prepareParameter(@whereCondUpd, record) cql = "select count(*) from #{self.keyspace}.#{self.tablename}" cql += " where " + whereCondition + ";" countRow = nil begin countRow = @session.execute(cql) rescue Exception => e $log.error "Cannot select Cassandra: #{e.message}\nTrace: #{e.backtrace.to_s}" raise e end countRow = getRowCount(countRow) if countRow > 0 @caseUpdateValue = prepareParameter(@caseUpdateValue, record) updateCassandra(@caseUpdateValue, whereCondition) else @caseInsertValue = prepareParameter(@caseInsertValue, record) insertCassandra(@caseInsertValue) end } end
Private Instance Methods
getRowCount(countRow)
click to toggle source
# File lib/fluent/plugin/out_cassandra_upsert.rb, line 117 def getRowCount(countRow) rc = 0 if countRow.length > 0 countRow.each do |row| rc = "#{row['count']}" end rc = rc.to_i end rc end
insertCassandra(insertVal)
click to toggle source
# File lib/fluent/plugin/out_cassandra_upsert.rb, line 81 def insertCassandra(insertVal) colIns = [] valIns = [] tmpStr = nil insertVal.split(",").each do |str| tmpStr = str.split("=") colIns.push(tmpStr[0]) valIns.push(tmpStr[1]) end cql = "INSERT INTO #{self.keyspace}.#{self.tablename} (#{colIns.join(',')}) VALUES (#{valIns.join(',')});" begin @session.execute(cql) rescue Exception => e $log.error "Cannot insert record Cassandra: #{e.message}\nTrace: #{e.backtrace.to_s}" raise e end end
prepareParameter(strOri,record)
click to toggle source
# File lib/fluent/plugin/out_cassandra_upsert.rb, line 128 def prepareParameter(strOri,record) tmpCondVal = {} tmpStr = nil count = 0 strOri.split(":").each do |str| if count > 0 tmpStr = str.gsub(/(;.*)/, '') tmpCondVal[tmpStr] = record[tmpStr] end count += 1 end tmpCondVal.each do |k,v| strOri= strOri.gsub(k,v) end strOri = strOri.gsub(':','') strOri = strOri.gsub(';','') strOri end
updateCassandra(updateVal, whereCondition)
click to toggle source
# File lib/fluent/plugin/out_cassandra_upsert.rb, line 103 def updateCassandra(updateVal, whereCondition) cql = "update #{self.keyspace}.#{self.tablename} set " cql += updateVal + " where " + whereCondition + ";" begin @session.execute(cql) rescue Exception => e $log.error "Cannot update record Cassandra: #{e.message}\nTrace: #{e.backtrace.to_s}" raise e end end