class Fluent::BigObjectOutput::TableElement
Attributes
mpattern[R]
Public Class Methods
new(log, bo_hostname, bo_port, tag_format)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_bigobject.rb, line 34 def initialize(log, bo_hostname, bo_port, tag_format) super() @log = log @bo_hostname = bo_hostname @bo_port = bo_port @bo_url="http://#{@bo_hostname}:#{@bo_port}/cmd" @tag_format = tag_format end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_bigobject.rb, line 43 def configure(conf) super @mpattern = Fluent::MatchPattern.create(pattern) @mapping = (@column_mapping==nil)? nil:parse_column_mapping(@column_mapping) @log.info("column mapping for #{table} - #{@mapping}") @format_proc = Proc.new { |record| if (@mapping==nil) record else new_record = {} @mapping.each { |k, c| new_record[c] = record[k] } new_record end } end
getPkeyValue(value)
click to toggle source
# File lib/fluent/plugin/out_bigobject.rb, line 62 def getPkeyValue(value) if (@bo_primary_key_is_int) return value else return"\"#{value}\"" end end
send(chunk)
click to toggle source
Send Data to Bigobject using Restful API
# File lib/fluent/plugin/out_bigobject.rb, line 71 def send(chunk) insertStmts = Array.new deleteStmts = Array.new columns = nil chunk.msgpack_each { |tag, time, data| tag_parts = tag.match(@tag_format) target_event = tag_parts['event'] id_key = tag_parts['primary_key'] keys = Array.new values = Array.new data = @format_proc.call(data) data.keys.sort.each do |key| keys << key values << data[key].to_json end if (target_event=='insert') if columns.to_s.empty? columns = "(#{keys.join(",")})" end insertStmts.push("(#{values.join(",")})") elsif (target_event=='update') pkey="" updates = Array.new keys.zip(values) { |key, value| if (key==id_key) pkey = getPkeyValue(value) else updates.push("#{key}=#{value}") end } sendStmt = "UPDATE #{table} SET #{updates.join(",")} WHERE #{id_key}=#{pkey}" sendBO(@bo_url, sendStmt) elsif (target_event=='delete') keys.zip(values) { |key, value| if (key==id_key) pkey = getPkeyValue(value) end deleteStmts.push("#{id_key}=#{pkey}") } end } if insertStmts.length>0 sendStmt = "INSERT INTO #{@table} #{columns} VALUES " + insertStmts.join(",") sendBO(@bo_url, sendStmt) @log.debug("sending #{insertStmts.length} rows to bigobject for insert via Restful API") end if deleteStmts.length>0 sendStmt = "DELETE FROM #{@table} WHERE " + deleteStmts.join(" or ") sendBO(@bo_url, sendStmt) @log.debug("sending #{deleteStmts.length} rows to bigobject for delete via Restful API") end end
to_s()
click to toggle source
# File lib/fluent/plugin/out_bigobject.rb, line 129 def to_s "table:#{table}, column_mapping:#{column_mapping}, pattern:#{pattern}" end
Private Instance Methods
formatRequest(stmt)
click to toggle source
# File lib/fluent/plugin/out_bigobject.rb, line 144 def formatRequest(stmt) params = Hash.new params['Stmt'] = stmt if @bo_workspace.to_s!='' params['Workspace'] = @bo_workspace end if @bo_opts.to_s!='' params['Opts'] = @bo_opts end return params end
parse_column_mapping(column_mapping_conf)
click to toggle source
# File lib/fluent/plugin/out_bigobject.rb, line 134 def parse_column_mapping(column_mapping_conf) mapping = {} column_mapping_conf.split(',').each { |column_map| key, column = column_map.strip.split(':', 2) column = key if column.nil? mapping[key] = column } mapping end
sendBO(bourl, sendStmt)
click to toggle source
# File lib/fluent/plugin/out_bigobject.rb, line 157 def sendBO(bourl, sendStmt) params = formatRequest(sendStmt) begin resp = RestClient.post bourl, params.to_json, :content_type =>:json, :accept =>:json @log.debug("resp= #{resp.body}") rescue Exception => e @log.error(e.message) raise "Failed to sendBO: #{e.message}" end parsed = JSON.parse(resp) err = parsed['Err'] if (err.to_s!='') @log.error("[BigObject] #{err}") end end