class Fluent::BigObjectOutput::TableElement

Attributes

mpattern[R]

Public Class Methods

new(log, bo_hostname, bo_port, tag_format) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_bigobject.rb, line 34
# Creates a table element that talks to a single BigObject endpoint.
#
# log::         logger object kept in @log
# bo_hostname:: host part of the BigObject command URL
# bo_port::     port part of the BigObject command URL
# tag_format::  kept in @tag_format as given
def initialize(log, bo_hostname, bo_port, tag_format)
  super()
  @log = log
  @bo_hostname, @bo_port = bo_hostname, bo_port
  # Endpoint derived from host and port: http://<host>:<port>/cmd
  @bo_url = "http://#{bo_hostname}:#{bo_port}/cmd"
  @tag_format = tag_format
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_bigobject.rb, line 43
# Applies the configuration, then prepares the match pattern, the optional
# column mapping and the record-formatting proc used when sending.
#
# conf:: configuration element, forwarded unchanged to the superclass
def configure(conf)
  super

  @mpattern = Fluent::MatchPattern.create(pattern)
  @mapping =
    if @column_mapping.nil?
      nil
    else
      parse_column_mapping(@column_mapping)
    end
  @log.info("column mapping for #{table} - #{@mapping}")

  # Without a mapping the record passes through untouched; with one, only the
  # mapped keys survive, renamed to their target columns.
  @format_proc = proc do |record|
    next record if @mapping.nil?

    @mapping.each_with_object({}) do |(source_key, column), renamed|
      renamed[column] = record[source_key]
    end
  end
end
getPkeyValue(value) click to toggle source
# File lib/fluent/plugin/out_bigobject.rb, line 62
# Renders a primary-key value for use in a statement: returned unchanged when
# @bo_primary_key_is_int is truthy, otherwise wrapped in double quotes.
def getPkeyValue(value)
  return value if @bo_primary_key_is_int

  %("#{value}")
end
send(chunk) click to toggle source

Sends data to BigObject using its RESTful API.

# File lib/fluent/plugin/out_bigobject.rb, line 71
# Sends the events of one chunk to BigObject through sendBO.
#
# Each event tag is matched against @tag_format; the named captures +event+
# and +primary_key+ select the operation and the key column:
#
# * insert - rows are collected and sent as one multi-row INSERT after the loop
# * update - one UPDATE statement is sent immediately for each event
# * delete - one condition per event is collected and sent as a single DELETE
#   after the loop; an event whose record lacks the key column is logged and
#   skipped instead of producing an empty "key=" condition
#
# NOTE(review): this method overrides Ruby's built-in +send+ for instances of
# the class; the name is kept so existing callers continue to work.
# NOTE(review): statements are assembled by interpolating event data; column
# names and values are not escaped here.
#
# chunk:: object responding to +msgpack_each+, yielding (tag, time, data)
def send(chunk)
  insert_rows = []
  delete_conditions = []
  columns = nil

  chunk.msgpack_each do |tag, _time, data|
    tag_parts = tag.match(@tag_format)
    target_event = tag_parts['event']
    id_key = tag_parts['primary_key']

    record = @format_proc.call(data)
    # Keys are sorted so every row lists its values in the same column order.
    keys = record.keys.sort
    values = keys.map { |key| record[key].to_json }

    case target_event
    when 'insert'
      # The column list is taken from the first insert event of the chunk.
      columns ||= "(#{keys.join(',')})"
      insert_rows.push("(#{values.join(',')})")
    when 'update'
      pkey = ''
      updates = []
      keys.zip(values) do |key, value|
        if key == id_key
          pkey = getPkeyValue(value)
        else
          updates.push("#{key}=#{value}")
        end
      end
      sendBO(@bo_url, "UPDATE #{table} SET #{updates.join(',')} WHERE #{id_key}=#{pkey}")
    when 'delete'
      unless record.key?(id_key)
        @log.error("[BigObject] delete event without primary key #{id_key}; skipped")
        next
      end
      # Exactly one condition per event, built from the key column only.
      delete_conditions.push("#{id_key}=#{getPkeyValue(record[id_key].to_json)}")
    end
  end

  unless insert_rows.empty?
    sendBO(@bo_url, "INSERT INTO #{table}  #{columns} VALUES " + insert_rows.join(','))
    @log.debug("sending #{insert_rows.length} rows to bigobject for insert via Restful API")
  end

  unless delete_conditions.empty?
    sendBO(@bo_url, "DELETE FROM #{table} WHERE " + delete_conditions.join(' or '))
    @log.debug("sending #{delete_conditions.length} rows to bigobject for delete via Restful API")
  end
end
to_s() click to toggle source
# File lib/fluent/plugin/out_bigobject.rb, line 129
# One-line summary of this element: table name, column mapping and pattern.
def to_s
  labelled = {
    'table' => table,
    'column_mapping' => column_mapping,
    'pattern' => pattern
  }
  labelled.map { |label, value| "#{label}:#{value}" }.join(', ')
end

Private Instance Methods

formatRequest(stmt) click to toggle source
# File lib/fluent/plugin/out_bigobject.rb, line 144
# Builds the request parameters for one statement.
#
# 'Stmt' is always present; 'Workspace' and 'Opts' are added only when
# @bo_workspace / @bo_opts have a non-empty string form.
#
# stmt:: statement text to send
# Returns a Hash with string keys.
def formatRequest(stmt)
  request = { 'Stmt' => stmt }
  request['Workspace'] = @bo_workspace unless @bo_workspace.to_s.empty?
  request['Opts'] = @bo_opts unless @bo_opts.to_s.empty?
  request
end
parse_column_mapping(column_mapping_conf) click to toggle source
# File lib/fluent/plugin/out_bigobject.rb, line 134
# Parses a column mapping such as "key1:col1, key2" into a Hash of
# record key => column name.
#
# * entries are separated by ","; key and column by the first ":" of an entry
# * whitespace around keys and columns is removed
# * an entry without a column (or with an empty one) maps the key to itself
# * entries without a key (blank segments, e.g. from a trailing comma) are
#   skipped, so the result never contains a nil or empty key
#
# column_mapping_conf:: String holding the mapping configuration
# Returns a Hash with String keys and values.
def parse_column_mapping(column_mapping_conf)
  column_mapping_conf.split(',').each_with_object({}) do |entry, mapping|
    key, column = entry.split(':', 2).map(&:strip)
    next if key.nil? || key.empty?

    mapping[key] = (column.nil? || column.empty?) ? key : column
  end
end
sendBO(bourl, sendStmt) click to toggle source
# File lib/fluent/plugin/out_bigobject.rb, line 157
# Posts one statement to BigObject as JSON and logs any error it reports.
#
# The request parameters come from formatRequest. A failure while building or
# posting the request is logged and re-raised as a RuntimeError whose message
# starts with "Failed to sendBO: ". Only StandardError is wrapped this way, so
# Interrupt, SystemExit and similar propagate unchanged. A non-empty 'Err'
# field in the parsed response is logged but does not raise.
#
# bourl::    URL the request is posted to
# sendStmt:: statement text
def sendBO(bourl, sendStmt)
  params = formatRequest(sendStmt)
  begin
    resp = RestClient.post(bourl, params.to_json, content_type: :json, accept: :json)
    @log.debug("resp= #{resp.body}")
  rescue StandardError => e
    @log.error(e.message)
    raise "Failed to sendBO: #{e.message}"
  end

  err = JSON.parse(resp)['Err']
  @log.error("[BigObject] #{err}") unless err.to_s.empty?
end