class Fluent::BigObjectOutput_AVRO::TableElement

Attributes

mpattern[R]

Public Class Methods

new(log, bo_hostname, bo_port) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_bigobject_avro.rb, line 28
def initialize(log, bo_hostname, bo_port)
  super()
  @log = log
  @bo_hostname = bo_hostname
  @bo_port = bo_port
  @bo_url="http://#{@bo_hostname}:#{@bo_port}/cmd"
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_bigobject_avro.rb, line 36
def configure(conf)
  super

  @avro_schema = Avro::Schema.parse(File.open(@schema_file, "rb").read)
  @avro_writer = Avro::IO::DatumWriter.new(@avro_schema)

  @mpattern = Fluent::MatchPattern.create(pattern)
  @mapping = (@column_mapping==nil)? nil:parse_column_mapping(@column_mapping)
  @log.info("column mapping for #{pattern} - #{@mapping}")
  @format_proc = Proc.new { |record|
    if (@mapping==nil)
      record
    else
      new_record = {}
      @mapping.each { |k, c|
        new_record[c] = record[k]
        }
      new_record
    end
  }
end
send_binary(chunk) click to toggle source

Send data to Bigobject using binary AVRO

# File lib/fluent/plugin/out_bigobject_avro.rb, line 59
def send_binary(chunk)
  
  buffer = StringIO.new()      
  dw = Avro::DataFile::Writer.new(buffer, @avro_writer, @avro_schema)
  i=0
  chunk.msgpack_each { |tag, time, data|
     data = @format_proc.call(data)
     dw<<data
     i+=1
  }
  dw.flush

  begin
    socket = TCPSocket.open(@bo_hostname, @bo_port)
    begin
      #timeout=60
      opt = [1, 60].pack('I!I!')  # { int l_onoff; int l_linger; }
      socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, opt)
  
      opt = [60, 0].pack('L!L!')  # struct timeval
      socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, opt)
      socket.write(buffer.string)
    ensure
      socket.close
    end
    
  rescue Exception => e 
      @log.error(e.message)  
      raise "Failed to send_binary: #{e.message}"
  end
  @log.debug("sending #{i} rows to bigobject via avro")
end
to_s() click to toggle source
# File lib/fluent/plugin/out_bigobject_avro.rb, line 92
def to_s
  "pattern:#{pattern}, column_mapping:#{column_mapping}"
end

Private Instance Methods

parse_column_mapping(column_mapping_conf) click to toggle source
# File lib/fluent/plugin/out_bigobject_avro.rb, line 97
def parse_column_mapping(column_mapping_conf)
  mapping = {}
  column_mapping_conf.split(',').each { |column_map|
    key, column = column_map.strip.split(':', 2)
    column = key if column.nil?
    mapping[key] = column
  }
  mapping
end