class Fluent::ODPSOutput::TableElement
TODO: Merge SQLInput’s TableElement
Attributes
client[R]
log[R]
partitionList[RW]
pattern[R]
writer[R]
Public Class Methods
new(pattern, log)
Calls superclass method
# File lib/fluent/plugin/out_odps.rb, line 63
def initialize(pattern, log)
  super()
  @pattern = MatchPattern.create(pattern)
  @log = log
  @writer = Array.new
end
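Because the constructor only captures the match pattern and a logger (the ODPS client and writers are created later by init), it can be exercised on its own. A rough sketch, assuming the plugin file and Fluentd's fluent/match are already loaded; the tag "odps.access_log" and the Logger stand-in are placeholders:

  require 'logger'

  # any object that responds to #info / #warn can serve as the log
  table = Fluent::ODPSOutput::TableElement.new("odps.access_log", Logger.new($stdout))
  table.pattern.match("odps.access_log")   # => true; tags are matched via Fluent::MatchPattern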
Public Instance Methods
close()
# File lib/fluent/plugin/out_odps.rb, line 277
def close()
  @client.loadShard(0)
end
configure(conf)
Initializes configuration data.
Calls superclass method
# File lib/fluent/plugin/out_odps.rb, line 71
def configure(conf)
  super
  @format_proc = Proc.new { |record|
    values = []
    @fields.split(',').each { |key|
      unless record.has_key?(key)
        @log.warn "the table "+@table+"'s field "+key+" has no matching key in the record"
      end
      values << record[key]
    }
    values
  }
end
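As an illustration of what @format_proc does with a record: fields is a comma-separated column list, and each record is flattened into an array of values in that order, with missing keys logged and contributing nil. The field names below are invented for the example:

  fields = "remote,method,path"     # hypothetical `fields` setting
  record = { "remote" => "127.0.0.1", "method" => "GET", "path" => "/index" }

  values = fields.split(',').map { |key| record[key] }
  # => ["127.0.0.1", "GET", "/index"]   (a missing key would yield nil plus a warning)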
import(chunk)
Imports the chunk's records into ODPS, grouping them by partition when one is configured.
# File lib/fluent/plugin/out_odps.rb, line 137
def import(chunk)
  records = []
  partitions = Hash.new
  chunk.msgpack_each { |tag, time, data|
    begin
      # if partition is not empty
      unless @partition.blank? then
        # if partition has params in it
        if @partition.include? "=${"
          # split partition
          partition_arrays = @partition.split(',')
          partition_name = ''
          i = 1
          for p in partition_arrays do
            # if partition is time formatted
            if p.include? "strftime"
              key = p[p.index("{")+1, p.index(".strftime")-1-p.index("{")]
              partition_column = p[0, p.index("=")]
              timeFormat = p[p.index("(")+2, p.index(")")-3-p.index("(")]
              if data.has_key?(key)
                if time_format == nil
                  partition_value = Time.parse(data[key]).strftime(timeFormat)
                else
                  partition_value = Time.strptime(data[key], time_format).strftime(timeFormat)
                end
                if i == 1
                  partition_name += partition_column+"="+partition_value
                else
                  partition_name += ","+partition_column+"="+partition_value
                end
              else
                raise "partition has no corresponding source key or the partition expression is wrong,"+data.to_s
              end
            else
              key = p[p.index("{")+1, p.index("}")-1-p.index("{")]
              partition_column = p[0, p.index("=")]
              if data.has_key?(key)
                partition_value = data[key]
                if i == 1
                  partition_name += partition_column+"="+partition_value
                else
                  partition_name += ","+partition_column+"="+partition_value
                end
              else
                raise "partition has no corresponding source key or the partition expression is wrong,"+data.to_s
              end
            end
            i += 1
          end
        else
          partition_name = @partition
        end
        if partitions[partition_name] == nil
          partitions[partition_name] = []
        end
        partitions[partition_name] << @format_proc.call(data)
      else
        records << @format_proc.call(data)
      end
    rescue => e
      raise "Failed to format the data:"+e.message
    end
  }
  begin
    # multi thread
    sendThread = Array.new
    unless @partition.blank? then
      partitions.each { |k, v|
        @log.info k
        # if the partition does not exist, create one
        unless @partitionList.include?(k)
          @client.addPartition(k)
          @partitionList << k
          @log.info "add partition "+k
        end
      }
      for thread in 0..@thread_number-1
        sendThread[thread] = Thread.start(thread) do |threadId|
          retryTime = 0
          begin
            partitions.each { |k, v|
              sendCount = v.size/@thread_number
              restCount = 0
              if threadId == @thread_number-1
                restCount = v.size%@thread_number
              end
              @writer[threadId].write(v[sendCount*threadId..sendCount*(threadId+1)+restCount-1], k)
              @log.info "Successfully imported "+(sendCount+restCount).to_s+" records to partition:"+k+",table:"+@table+" at threadId:"+threadId.to_s
            }
          rescue => e
            if retryTime > 0
              @log.info "Failed to write, retry in 2 sec. Error at threadId:"+threadId.to_s+" Msg:"+e.message
              sleep(2)
              retryTime -= 1
              retry
            else
              raise e
            end
          end
        end
      end
    else
      @log.info records.size.to_s+" records to be sent"
      for thread in 0..@thread_number-1
        sendThread[thread] = Thread.start(thread) do |threadId|
          retryTime = 0
          # send data from sendCount*threadId to sendCount*(threadId+1)-1
          sendCount = records.size/@thread_number
          restCount = 0
          if threadId == @thread_number-1
            restCount = records.size%@thread_number
          end
          begin
            @writer[threadId].write(records[sendCount*threadId..sendCount*(threadId+1)+restCount-1])
            @log.info "Successfully imported "+(sendCount+restCount).to_s+" records to table:"+@table+" at threadId:"+threadId.to_s
          rescue => e
            if retryTime > 0
              @log.info "Failed to write, retry in 2 sec. Error at threadId:"+threadId.to_s+" Msg:"+e.message
              sleep(2)
              retryTime -= 1
              retry
            else
              raise e
            end
          end
        end
      end
    end
    for thread in 0..@thread_number-1
      sendThread[thread].join
    end
  rescue => e
    # re-raise so that Fluentd's retry mechanism handles the failure
    raise "write records failed,"+e.message
  end
end
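The partition expression syntax implied by the parsing above: each comma-separated entry is either a literal column=value, a column=${key} lookup into the record, or a column=${key.strftime('FORMAT')} time expression. A rough sketch of how one record would resolve (field names and values are made up, and time_format is assumed unset, so Time.parse is used):

  require 'time'

  # hypothetical expression: pt=${date.strftime('%Y%m%d')},area=${area}
  record = { "date" => "2016-03-01 12:00:00", "area" => "cn" }

  pt_value       = Time.parse(record["date"]).strftime("%Y%m%d")   # "20160301"
  partition_name = "pt=#{pt_value},area=#{record["area"]}"         # "pt=20160301,area=cn"

  # import() groups formatted records under this name, calls addPartition for
  # names not yet in @partitionList, and splits each group across the
  # per-thread stream writers.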
init(config)
# File lib/fluent/plugin/out_odps.rb, line 85
def init(config)
  odpsConfig = OdpsDatahub::OdpsConfig.new(config[:aliyun_access_id],
                                           config[:aliyun_access_key],
                                           config[:aliyun_odps_endpoint],
                                           config[:aliyun_odps_hub_endpoint],
                                           config[:project])
  if @record_batch_size <= 0
    raise "the table "+@table+"'s record_batch_size must be greater than 0"
  end
  begin
    @client = OdpsDatahub::StreamClient.new(odpsConfig, config[:project], @table)
    @client.loadShard(@shard_number)
    allLoaded = false
    loadtime = 0
    while !allLoaded do
      count = 0
      # get json like [{"ShardId": "0","State": "loaded"},{"ShardId": "1","State": "loaded"}]
      @client.getShardStatus.each { |shard|
        if shard["State"] != "loaded"
          sleep(5)
          loadtime += 5
          break
        else
          count += 1
        end
        if count == @shard_number
          allLoaded = true
          @log.info "All shards are loaded successfully"
        end
        if loadtime >= 300
          raise "Load shard timeout"
        end
      }
    end
    for i in 0..@thread_number-1
      @writer[i] = @client.createStreamArrayWriter()
    end
    partitionMaps = @client.getPartitionList
    @partitionList = []
    for map in partitionMaps do
      partitionName = ''
      map.each { |k, v|
        partitionName += k+"="+v+","
      }
      @partitionList << partitionName.chomp(",")
    end
  rescue => e
    raise "loadShard failed,"+e.message
  end
end
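The config hash keys read here are the ones visible in the code. A minimal sketch with placeholder values (endpoints, credentials, and project name are illustrative only; table_element stands for a configured TableElement):

  config = {
    :aliyun_access_id         => "YOUR_ACCESS_ID",
    :aliyun_access_key        => "YOUR_ACCESS_KEY",
    :aliyun_odps_endpoint     => "<ODPS API endpoint>",
    :aliyun_odps_hub_endpoint => "<DataHub endpoint>",
    :project                  => "your_project"
  }
  table_element.init(config)   # loads shards, creates the stream writers, caches the partition list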