class Fluent::ODPSOutput
Public Class Methods
# File lib/fluent/plugin/out_odps.rb, line 24
def initialize
  super
  require 'time'
  require_relative 'stream_client'
  @compressor = nil
end
Public Instance Methods
This method is called before starting. ‘conf’ is a Hash that includes configuration parameters. If the configuration is invalid, raise Fluent::ConfigError.
# File lib/fluent/plugin/out_odps.rb, line 286
def configure(conf)
  super
  print "configure"
  # You can also refer raw parameter via conf[name].
  @tables = []
  conf.elements.select { |e|
    e.name == 'table'
  }.each { |e|
    te = TableElement.new(e.arg, log)
    te.configure(e)
    if e.arg.empty?
      log.warn "no table definition"
    else
      @tables << te
    end
  }
  if @tables.empty?
    raise ConfigError, "There is no <table>. <table> is required"
  end
end
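The selection and validation steps in configure can be tried outside Fluentd with a minimal sketch. Element, ConfigError and collect_tables below are hypothetical stand-ins invented for illustration (they are not the plugin's or Fluentd's classes), and the sketch collects the raw table arguments instead of building TableElement objects.

```ruby
# Hypothetical stand-ins: not Fluentd's config element or Fluent::ConfigError.
Element = Struct.new(:name, :arg)
class ConfigError < StandardError; end

# Keep only <table> elements, skip those with an empty argument
# (recording a warning), and fail if nothing usable remains.
def collect_tables(elements, warnings = [])
  tables = []
  elements.select { |e| e.name == 'table' }.each do |e|
    if e.arg.empty?
      warnings << 'no table definition'
    else
      tables << e.arg
    end
  end
  raise ConfigError, 'There is no <table>. <table> is required' if tables.empty?
  tables
end

elements = [
  Element.new('table', 'access.**'),
  Element.new('table', ''),   # skipped with a warning
  Element.new('buffer', '')   # ignored: not a <table> element
]
warnings = []
selected = collect_tables(elements, warnings)
puts selected.inspect
puts warnings.inspect
```

Under these assumptions, a configuration whose only <table> elements have empty arguments is rejected the same way as one with no <table> at all.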
# File lib/fluent/plugin/out_odps.rb, line 357
def emit(tag, es, chain)
  super(tag, es, chain, format_tag(tag))
end
This method is called when an event reaches Fluentd. It converts the event to a raw string.
# File lib/fluent/plugin/out_odps.rb, line 336
def format(tag, time, record)
  [tag, time, record].to_json + "\n"
end
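To see what a formatted event looks like, here is a small sketch that reproduces the same expression outside the plugin. The function name format_event and the sample tag, timestamp and record are made up for illustration.

```ruby
require 'json'

# Hypothetical stand-in for the plugin's format method.
def format_event(tag, time, record)
  [tag, time, record].to_json + "\n"
end

line = format_event('odps.access', 1_400_000_000, { 'uid' => 42, 'path' => '/index' })
puts line

# Each buffered line is a JSON array, so parsing it recovers the triple.
tag, time, record = JSON.parse(line)
puts record['path']
```

Because every event ends with a newline, a chunk can be split on line boundaries and each line parsed independently.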
# File lib/fluent/plugin/out_odps.rb, line 365
def format_tag(tag)
  if @remove_tag_prefix
    tag.gsub(@remove_tag_prefix, '')
  else
    tag
  end
end
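One detail of format_tag is worth illustrating: gsub replaces every match of its pattern, not only a match at the start of the tag. The sketch below assumes the prefix is given as a plain String (the source shown here does not say whether @remove_tag_prefix is a String or a Regexp); strip_with_gsub and strip_prefix_only are hypothetical helper names, and the anchored variant is an alternative offered for comparison, not the plugin's behaviour.

```ruby
# Mirrors the documented logic: gsub removes every occurrence of the pattern.
def strip_with_gsub(tag, pattern)
  pattern ? tag.gsub(pattern, '') : tag
end

# Alternative that only removes the pattern at the start of the tag.
def strip_prefix_only(tag, prefix)
  prefix ? tag.sub(/\A#{Regexp.escape(prefix)}/, '') : tag
end

tag = 'odps.app.odps.log'
puts strip_with_gsub(tag, 'odps.')    # prints "app.log"
puts strip_prefix_only(tag, 'odps.')  # prints "app.odps.log"
puts strip_with_gsub(tag, nil)        # prints "odps.app.odps.log"
```

If the configured value is a Regexp anchored with \A, gsub already behaves like the second helper.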
This method is called when shutting down. Shutdown the thread and close sockets or files here.
# File lib/fluent/plugin/out_odps.rb, line 327
def shutdown
  super
  @tables.reject! do |te|
    te.close()
  end
end
This method is called when starting. Open sockets or files here.
# File lib/fluent/plugin/out_odps.rb, line 309
def start
  super
  config = {
    :aliyun_access_id => @aliyun_access_id,
    :aliyun_access_key => @aliyun_access_key,
    :project => @project,
    :aliyun_odps_endpoint => @aliyun_odps_endpoint,
    :aliyun_odps_hub_endpoint => @aliyun_odps_hub_endpoint,
  }
  # Initialize each table object
  @tables.each { |te|
    te.init(config)
  }
  log.info "the table object size is "+@tables.size.to_s
end
This method is called every flush interval. Write the buffer chunk to files or databases here. ‘chunk’ is a buffer chunk that includes multiple formatted events. You can use ‘data = chunk.read’ to get all events and ‘chunk.open {|io| … }’ to get IO objects.
NOTE! This method is called by an internal thread, not Fluentd’s main thread, so waiting on I/O here does not block other plugins.
# File lib/fluent/plugin/out_odps.rb, line 347
def write(chunk)
  # For each table, pick the table object whose pattern matches the chunk key
  @tables.each { |table|
    if table.pattern.match(chunk.key)
      log.info "Begin to import the data and the table_match is "+chunk.key
      return table.import(chunk)
    end
  }
end
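The first-match dispatch in write can be sketched in isolation. FakeChunk, FakeTable and dispatch are hypothetical stand-ins invented for this example; the real table objects and buffer chunks come from the plugin and Fluentd. One difference is deliberate: the sketch returns nil when nothing matches, whereas the method above falls through to the return value of each, which is the @tables array itself.

```ruby
# Hypothetical stand-ins for a buffer chunk and a table object.
FakeChunk = Struct.new(:key, :data)
FakeTable = Struct.new(:name, :pattern) do
  def import(chunk)
    "#{name} <- #{chunk.data}"
  end
end

def dispatch(tables, chunk)
  tables.each do |table|
    # `return` inside the block leaves dispatch itself,
    # so only the first matching table imports the chunk.
    return table.import(chunk) if table.pattern.match(chunk.key)
  end
  nil # no table matched
end

tables = [
  FakeTable.new('access_log', /\Aaccess\./),
  FakeTable.new('error_log', /\Aerror\./)
]
puts dispatch(tables, FakeChunk.new('access.web', 'row1'))       # prints "access_log <- row1"
puts dispatch(tables, FakeChunk.new('error.web', 'row2'))        # prints "error_log <- row2"
puts dispatch(tables, FakeChunk.new('debug.web', 'row3')).inspect # prints "nil"
```

Since the first match wins, the order of the table definitions matters whenever two patterns can match the same chunk key.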