class LogStash::Outputs::Datahub
Datahub output plugin
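The option names below are taken from the instance variables referenced by the methods on this page (@access_id, @endpoint, @shard_keys, and so on). The declarations are a hedged sketch of how they would typically appear in a Logstash output plugin; the :validate types and :required flags are assumptions, not the plugin's actual declarations.

  # Hedged reconstruction; option names match the @-variables used below,
  # but the :validate types and :required flags shown here are assumptions.
  config :access_id,           :validate => :string, :required => true
  config :access_key,          :validate => :string, :required => true
  config :endpoint,            :validate => :string, :required => true
  config :project_name,        :validate => :string, :required => true
  config :topic_name,          :validate => :string, :required => true
  config :shard_id,            :validate => :string
  config :shard_keys,          :validate => :array
  config :dirty_data_continue, :validate => :boolean
  config :dirty_data_file,     :validate => :string
  config :dirty_data_file_max_size, :validate => :number
  config :retry_times,         :validate => :number
  config :retry_interval,      :validate => :number
  config :compress_method,     :validate => :string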
Constants
- DatahubPackage
Attributes
shard_cursor[RW]
Used internally, not configurable; the cursor used to distribute records across shards.
Public Instance Methods
check_and_set_data(entry, field_type, index, event_map, column_name)
Checks the value and sets it into the entry. If parsing fails, the record is written to the dirty data file.
# File lib/logstash/outputs/datahub.rb, line 176
def check_and_set_data(entry, field_type, index, event_map, column_name)
  data = event_map[column_name]
  begin
    if field_type == DatahubPackage.common.data.FieldType::STRING
      entry.setString(index, data.to_s)
    elsif field_type == DatahubPackage.common.data.FieldType::BIGINT
      entry.setBigint(index, java.lang.Long.parseLong(data.to_s))
    elsif field_type == DatahubPackage.common.data.FieldType::DOUBLE
      entry.setDouble(index, java.lang.Double.parseDouble(data.to_s))
    elsif field_type == DatahubPackage.common.data.FieldType::BOOLEAN
      entry.setBoolean(index, java.lang.Boolean.parseBoolean(data.to_s))
    elsif field_type == DatahubPackage.common.data.FieldType::TIMESTAMP
      entry.setTimeStamp(index, java.lang.Long.parseLong(data.to_s))
    else
      raise "Unknown schema type of data"
    end
    return true
  rescue => e
    @logger.error "Parse data: " + column_name + "[" + data + "] failed, " + e.message
    # The record is malformed; whether to keep running depends on configuration
    if !@dirty_data_continue
      @logger.error "Dirty data found, exit process now."
      puts "Dirty data found, exit process now."
      Process.exit(1)
    # Ignored bad records are written straight to the dirty data file
    else
      write_as_dirty_data(event_map)
    end
    return false
  end
end
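As a hedged illustration of the conversion above, suppose a BIGINT column receives a value that cannot be parsed; the entry, index, and event map below are hypothetical and would come from the surrounding plugin code:

  # Hypothetical call; entry and index come from the caller in multi_receive.
  event_map = { "uid" => "42", "latency" => "not-a-number" }

  # "42" parses as a long, so the value is set on the entry and true is returned.
  check_and_set_data(entry, DatahubPackage.common.data.FieldType::BIGINT, 0, event_map, "uid")

  # "not-a-number" fails Long.parseLong; with dirty_data_continue enabled the whole
  # event map is appended to the dirty data file and false is returned, otherwise
  # the process exits.
  check_and_set_data(entry, DatahubPackage.common.data.FieldType::BIGINT, 1, event_map, "latency")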
check_params()
# File lib/logstash/outputs/datahub.rb, line 138
def check_params()
  # If shard_id is configured, verify that the shard exists and is active
  if !@shard_id.empty?
    valid = false
    for i in 0...@shards.size
      shard_entry = @shards[i]
      if shard_entry.getShardId() == @shard_id && shard_entry.getState() == DatahubPackage.model.ShardState::ACTIVE
        valid = true
      end
    end
    if (!valid)
      @logger.error "Config shard_id not exists or state not active, check your config"
      raise "Config shard_id not exists or state not active, check your config"
    end
  end

  # Verify that every field listed in shard_keys exists in the topic schema
  if @shard_keys.size > 0
    for i in 0...@shard_keys.size
      shard_key = @shard_keys[i]
      if !@schema.containsField(shard_key)
        @logger.error "Config shard_keys contains one or one more unknown field, check your config"
        raise "Config shard_keys contains one or one more unknown field, check your config"
      end
    end
  end

  # If dirty_data_continue is enabled, a dirty data file must be specified
  if @dirty_data_continue
    if @dirty_data_file.to_s.chomp.length == 0
      raise "Dirty data file path can not be empty"
    end
  end
end
get_active_shards(shards)
# File lib/logstash/outputs/datahub.rb, line 226
def get_active_shards(shards)
  active_shards = []
  for i in 0...shards.size
    entry = shards.get(i)
    if entry.getState() == DatahubPackage.model.ShardState::ACTIVE
      active_shards.push(entry)
    end
  end
  return active_shards
end
get_next_shard_id()
# File lib/logstash/outputs/datahub.rb, line 237
def get_next_shard_id()
  if !@shard_id.empty?
    return @shard_id
  # Otherwise write to shards in round-robin order
  else
    idx = 0
    @@shard_lock.synchronize {
      idx = @shard_cursor % @shard_count
      @shard_cursor = idx + 1
    }
    shard_id = @shards[idx].getShardId()
    return shard_id
  end
end
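A hedged walk-through of the cursor arithmetic, assuming three active shards with ids "0", "1", "2" and no fixed shard_id configured:

  # Hypothetical shard ids; the cursor is advanced under @@shard_lock.
  # Starting state: @shard_cursor = 0, @shard_count = 3
  get_next_shard_id()   # idx = 0 % 3 = 0  => "0", cursor becomes 1
  get_next_shard_id()   # idx = 1 % 3 = 1  => "1", cursor becomes 2
  get_next_shard_id()   # idx = 2 % 3 = 2  => "2", cursor becomes 3
  get_next_shard_id()   # idx = 3 % 3 = 0  => "0", cursor wraps back to 1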
multi_receive(event_list)
# File lib/logstash/outputs/datahub.rb, line 252
def multi_receive(event_list)
  retry_count = 0
  begin
    entries = []
    shard_id = get_next_shard_id()

    event_list.each do |event|
      if event == LogStash::SHUTDOWN
        return
      end
      event_map = event.to_hash

      entry = DatahubPackage.model.RecordEntry::new(@schema)
      entry.putAttribute("srcId", event_map["host"].to_s)
      entry.putAttribute("ts", event_map["@timestamp"].to_s)
      entry.putAttribute("version", event_map["@version"].to_s)
      entry.putAttribute("srcType", "log")

      is_data_valid = false
      for i in 0...@columns_size do
        column_name = @columnnames[i]
        column_type = @columntypes[i]
        value = event_map[column_name]
        if value != nil
          is_data_valid = check_and_set_data(entry, column_type, i, event_map, column_name)
          break if !is_data_valid
        end
      end

      if is_data_valid
        if @shard_keys.size > 0
          hash_string = ""
          for i in 0...@shard_keys.size
            shard_key = @shard_keys[i]
            if event_map[shard_key] != nil
              hash_string += event_map[shard_key].to_s + ","
            end
          end
          hashed_value = java.lang.String.new(hash_string).hashCode()
          entry.setPartitionKey(hashed_value)
        else
          entry.setShardId(shard_id)
        end
        entries.push(entry)
      end
    end

    # puts "total: " + entries.size.to_s
    # Only submit when the list is non-empty
    if entries.size > 0
      put_result = @client.putRecords(@project_name, @topic_name, entries)
      if put_result.getFailedRecordCount() > 0
        @logger.info "Put " + put_result.getFailedRecordCount().to_s + " records to datahub failed, total " + entries.size().to_s
        sleep @retry_interval
        entries = put_result.getFailedRecords()
        raise "Write to datahub failed: " + entries.size.to_s
      else
        @logger.info "Put data to datahub success, total " + entries.size().to_s
      end
    end
  rescue DatahubPackage.exception.DatahubServiceException => e
    @logger.error "Flush data exception: " + e.message #+ " " + e.backtrace.inspect.to_s
    # Shard state has changed; reload the shard list
    if e.getErrorCode() == "InvalidShardOperation"
      @shards = get_active_shards(@topic.listShard())
      @shard_count = @shards.size()
      if @shard_count == 0
        @logger.error "No active shard available, please check"
      end
    elsif e.getErrorCode() == nil
      sleep @retry_interval
    end
    retry_count += 1
    @logger.warn "Now retry: " + retry_count.to_s
    retry
  rescue => e
    @logger.error "Flush data exception: " + e.message + " " + e.backtrace.inspect.to_s

    # Retry indefinitely
    if @retry_times < 0
      retry_count += 1
      @logger.warn "Now retry: " + retry_count.to_s
      # puts "Now retry..."
      sleep @retry_interval
      retry
    elsif @retry_times == 0
      @logger.error "Retry not work, now exit"
      Process.exit(1)
    # Retry up to @retry_times, then exit
    elsif @retry_times > 0
      retry_count += 1
      if retry_count > @retry_times
        @logger.warn "Retry over: " + @retry_times.to_s
        Process.exit(1)
      end
      @logger.warn "Now retry..."
      sleep @retry_interval
      retry
    end
  end
end
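When shard_keys is configured, multi_receive concatenates the configured field values with commas and uses the Java String hash code of that string as the partition key. A hedged sketch of that step with hypothetical field names and values:

  # Hypothetical event and shard_keys; mirrors the hashing step in multi_receive above.
  shard_keys = ["user_id", "region"]
  event_map  = { "user_id" => "1001", "region" => "cn-hangzhou" }

  hash_string = ""
  shard_keys.each do |k|
    hash_string += event_map[k].to_s + "," if event_map[k] != nil
  end
  # hash_string == "1001,cn-hangzhou,"
  hashed_value = java.lang.String.new(hash_string).hashCode()
  entry.setPartitionKey(hashed_value)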
register()
# File lib/logstash/outputs/datahub.rb, line 93
def register
  begin
    @account = DatahubPackage.auth.AliyunAccount::new(@access_id, @access_key)
    @conf = DatahubPackage.DatahubConfiguration::new(@account, @endpoint)
    if @compress_method == "deflate" || @compress_method == "lz4"
      @compression_format = DatahubPackage.model.compress.CompressionFormat.fromValue(@compress_method)
      @conf.setCompressionFormat(@compression_format)
    end
    @client = DatahubPackage.DatahubClient::new(@conf)
    @project = DatahubPackage.wrapper.Project::Builder.build(@project_name, @client)
    @topic = @project.getTopic(@topic_name)
    @shard_cursor = 0

    @shards = get_active_shards(@topic.listShard())
    @shard_count = @shards.size()

    result = @client.getTopic(@project_name, @topic_name)
    @schema = result.getRecordSchema()
    fields = @schema.getFields()
    @columns_size = fields.size
    @columnnames = []
    for i in 0...@columns_size
      @columnnames.push(fields[i].getName())
    end
    @columntypes = []
    for i in 0...@columns_size
      @columntypes.push(fields[i].getType())
    end

    # Validate parameters up front
    check_params()

    if @shard_count == 0
      @logger.error "No active shard available, please check"
      raise "No active shard available, please check"
    end

    @logger.info "Init datahub success!"
  rescue => e
    @logger.error "Init failed!" + e.message + " " + e.backtrace.inspect.to_s
    raise e
  end
end
write_as_dirty_data(event_amp)
Dirty data file handling.
# File lib/logstash/outputs/datahub.rb, line 209
def write_as_dirty_data(event_amp)
  dirty_file_part1_name = @dirty_data_file + ".part1"
  dirty_file_part2_name = @dirty_data_file + ".part2"

  # Write under a lock
  @@file_lock.synchronize {
    dirty_file_part2 = File.open(dirty_file_part2_name, "a+")
    dirty_file_part2.puts(event_amp.to_s)
    dirty_file_part2.close
    if File.size(dirty_file_part2_name) > @dirty_data_file_max_size / 2
      # .part1 and .part2 each hold half of the data:
      # older records go to .part1, new records go to .part2
      FileUtils.mv(dirty_file_part2_name, dirty_file_part1_name)
    end
  }
end
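A hedged note on the rotation above: records always append to <dirty_data_file>.part2; once .part2 grows past half of dirty_data_file_max_size it is renamed to .part1, overwriting the previous .part1, so roughly dirty_data_file_max_size bytes of the most recent dirty data are retained. The values below are hypothetical:

  # Hypothetical configuration.
  # @dirty_data_file          = "/tmp/datahub_dirty.log"
  # @dirty_data_file_max_size = 1_000_000   # bytes

  # Appends to /tmp/datahub_dirty.log.part2; when .part2 exceeds 500_000 bytes it is
  # moved to /tmp/datahub_dirty.log.part1 and the next write starts a fresh .part2.
  write_as_dirty_data({ "message" => "unparseable record" })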