class Fluent::BigObjectOutput_AVRO::TableElement
Attributes
mpattern[R]
Public Class Methods
new(log, bo_hostname, bo_port)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_bigobject_avro.rb, line 28
# Creates a table element bound to a single BigObject endpoint.
#
# @param log         logger object; other methods of this class call
#                    #info, #debug and #error on it
# @param bo_hostname host that send_binary opens its TCP connection to
# @param bo_port     port that send_binary opens its TCP connection to
def initialize(log, bo_hostname, bo_port)
  # Explicit empty parentheses: the superclass initializer receives no arguments.
  super()
  @log, @bo_hostname, @bo_port = log, bo_hostname, bo_port
  # Command URL derived from the stored host and port.
  @bo_url = "http://#{@bo_hostname}:#{@bo_port}/cmd"
end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_bigobject_avro.rb, line 36
# Loads the Avro schema, compiles the match pattern and prepares the
# per-record column mapping used when sending data.
#
# @param conf configuration element; forwarded unchanged to the superclass
def configure(conf)
  super

  # File.binread opens, reads and closes the schema file in one call, so no
  # file handle is left open waiting for the garbage collector.
  @avro_schema = Avro::Schema.parse(File.binread(@schema_file))
  @avro_writer = Avro::IO::DatumWriter.new(@avro_schema)

  @mpattern = Fluent::MatchPattern.create(pattern)
  @mapping = @column_mapping.nil? ? nil : parse_column_mapping(@column_mapping)
  @log.info("column mapping for #{pattern} - #{@mapping}")

  # Without a mapping, records pass through untouched. With one, each record
  # is rebuilt from the mapped keys only, stored under their column names.
  @format_proc = proc do |record|
    if @mapping.nil?
      record
    else
      @mapping.each_with_object({}) do |(key, column), new_record|
        new_record[column] = record[key]
      end
    end
  end
end
send_binary(chunk)
click to toggle source
Send data to Bigobject using binary AVRO
# File lib/fluent/plugin/out_bigobject_avro.rb, line 59
# Serializes every record of +chunk+ into an in-memory Avro data file and
# writes the result to the BigObject server over a TCP socket.
#
# @param chunk object responding to #msgpack_each, yielding (tag, time, record)
# @raise [RuntimeError] when connecting to or writing to the server fails;
#   the original error message is logged and included in the new message
def send_binary(chunk)
  buffer = StringIO.new
  dw = Avro::DataFile::Writer.new(buffer, @avro_writer, @avro_schema)

  rows = 0
  chunk.msgpack_each do |_tag, _time, data|
    # Apply the column mapping prepared in configure before encoding.
    dw << @format_proc.call(data)
    rows += 1
  end
  # Flush before buffer.string is read below.
  dw.flush

  begin
    socket = TCPSocket.open(@bo_hostname, @bo_port)
    begin
      # struct linger { int l_onoff; int l_linger; } -- linger enabled, 60 seconds
      opt = [1, 60].pack('I!I!')
      socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, opt)
      # struct timeval -- 60 second send timeout
      opt = [60, 0].pack('L!L!')
      socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, opt)
      socket.write(buffer.string)
    ensure
      socket.close
    end
  rescue StandardError => e
    # Only StandardError is rewrapped: Interrupt, SystemExit, NoMemoryError and
    # other non-standard exceptions propagate unchanged instead of being
    # disguised as a send failure.
    @log.error(e.message)
    raise "Failed to send_binary: #{e.message}"
  end

  @log.debug("sending #{rows} rows to bigobject via avro")
end
to_s()
click to toggle source
# File lib/fluent/plugin/out_bigobject_avro.rb, line 92
# Human-readable summary of this element: its pattern and column mapping.
#
# @return [String]
def to_s
  format('pattern:%s, column_mapping:%s', pattern, column_mapping)
end
Private Instance Methods
parse_column_mapping(column_mapping_conf)
click to toggle source
# File lib/fluent/plugin/out_bigobject_avro.rb, line 97
# Parses a comma-separated mapping such as "key1:col1,key2" into a Hash of
# record key => column name.
#
# * An entry without a column part ("key2" or "key2:") maps the key to itself.
# * Whitespace around keys and column names is ignored.
# * Blank entries (e.g. from "a:b,,c") and entries with an empty key are skipped.
# * Only the first ':' separates key from column, so "a:b:c" maps "a" to "b:c".
#
# @param column_mapping_conf [String] raw mapping text
# @return [Hash{String => String}]
def parse_column_mapping(column_mapping_conf)
  column_mapping_conf.split(',').each_with_object({}) do |column_map, mapping|
    key, column = column_map.split(':', 2).map(&:strip)
    # A blank entry yields no key at all; storing it would add a nil/empty key.
    next if key.nil? || key.empty?

    mapping[key] = (column.nil? || column.empty?) ? key : column
  end
end