class LogStash::Outputs::Datahub

Datahub output plugin

Constants

DatahubPackage

Attributes

shard_cursor[RW]

Used internally; not exposed as a configuration option. Cursor for distributing writes across shards.

Public Instance Methods

check_and_set_data(entry, field_type, index, event_map, column_name)

Checks the value and sets it into the entry. If parsing fails, the record is written to the dirty data file.

# File lib/logstash/outputs/datahub.rb, line 176
def check_and_set_data(entry, field_type, index, event_map, column_name)
  data = event_map[column_name]
  begin
    if field_type == DatahubPackage.common.data.FieldType::STRING
      entry.setString(index, data.to_s)
    elsif field_type == DatahubPackage.common.data.FieldType::BIGINT
      entry.setBigint(index, java.lang.Long.parseLong(data.to_s))
    elsif field_type == DatahubPackage.common.data.FieldType::DOUBLE
      entry.setDouble(index, java.lang.Double.parseDouble(data.to_s))
    elsif field_type == DatahubPackage.common.data.FieldType::BOOLEAN
      entry.setBoolean(index, java.lang.Boolean.parseBoolean(data.to_s))
    elsif field_type == DatahubPackage.common.data.FieldType::TIMESTAMP
      entry.setTimeStamp(index, java.lang.Long.parseLong(data.to_s))
    else
      raise "Unknown schema type of data"
    end
    return true
  rescue => e
    @logger.error "Parse data: " + column_name + "[" + data + "] failed, " + e.message
    # 数据格式有异常,根据配置参数确定是否续跑
    if !@dirty_data_continue
      @logger.error "Dirty data found, exit process now."
      puts "Dirty data found, exit process now."
      Process.exit(1)
      # Otherwise the ignored bad record is written straight to the dirty data file
    else
      write_as_dirty_data(event_map)
    end
    return false
  end
end
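
The per-type branches above delegate to the Java parsers, which raise on malformed input. The following pure-Ruby sketch (no DataHub SDK; the type symbols and helper name are illustrative only) shows the same coerce-or-divert pattern in isolation:

# Minimal sketch of the coerce-or-divert pattern used by check_and_set_data.
# Pure Ruby; field types are symbols here rather than FieldType constants.
COERCERS = {
  string:    ->(v) { v.to_s },
  bigint:    ->(v) { Integer(v.to_s) },   # raises ArgumentError on bad input
  double:    ->(v) { Float(v.to_s) },
  boolean:   ->(v) { v.to_s == "true" },
  timestamp: ->(v) { Integer(v.to_s) }    # epoch value as an integer
}

def coerce_field(type, value)
  COERCERS.fetch(type).call(value)
rescue ArgumentError, KeyError => e
  # In the plugin this is where the record would be diverted to the dirty data file.
  warn "Parse #{type} value #{value.inspect} failed: #{e.message}"
  nil
end
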
check_params()
# File lib/logstash/outputs/datahub.rb, line 138
def check_params()
  # If shard_id is configured, verify that the shard exists and is active
  if !@shard_id.empty?
    valid = false
    for i in 0...@shards.size
      shard_entry = @shards[i]
      if shard_entry.getShardId() == @shard_id && shard_entry.getState() == DatahubPackage.model.ShardState::ACTIVE
        valid = true
      end
    end
    if (!valid)
      @logger.error "Config shard_id not exists or state not active, check your config"
      raise "Config shard_id not exists or state not active, check your config"
    end
  end

  # Verify that every field in shard_keys exists in the topic schema
  if @shard_keys.size > 0
    for i in 0...@shard_keys.size
      shard_key = @shard_keys[i]
      if !@schema.containsField(shard_key)
        @logger.error "Config shard_keys contains one or one more unknown field, check your config"
        raise "Config shard_keys contains one or one more unknown field, check your config"
      end
    end
  end

  # When dirty_data_continue is enabled, a dirty data file must be specified
  if @dirty_data_continue
    if @dirty_data_file.to_s.chomp.length == 0
      raise "Dirty data file path can not be empty"
    end
  end

end
get_active_shards(shards)
# File lib/logstash/outputs/datahub.rb, line 226
def get_active_shards(shards)
  active_shards = []
  for i in 0...shards.size
    entry = shards.get(i)
    if entry.getState() == DatahubPackage.model.ShardState::ACTIVE
      active_shards.push(entry)
    end
  end
  return active_shards
end
get_next_shard_id()
# File lib/logstash/outputs/datahub.rb, line 237
def get_next_shard_id()
  if !@shard_id.empty?
    return @shard_id
    # Otherwise pick shards in round-robin order
  else
    idx = 0
    @@shard_lock.synchronize {
      idx = @shard_cursor % @shard_count
      @shard_cursor = idx + 1
    }
    shard_id = @shards[idx].getShardId()
    return shard_id
  end
end
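
When shard_id is not configured, writes rotate across the active shards using the synchronized cursor above. A standalone sketch of that rotation (pure Ruby; the class name is hypothetical):

# Round-robin shard selection, mirroring the cursor logic above (illustrative only).
class ShardCursor
  def initialize(shard_ids)
    @shard_ids = shard_ids
    @cursor    = 0
    @lock      = Mutex.new
  end

  def next_shard_id
    idx = @lock.synchronize do
      i = @cursor % @shard_ids.size
      @cursor = i + 1
      i
    end
    @shard_ids[idx]
  end
end

cursor = ShardCursor.new(%w[0 1 2])
cursor.next_shard_id   # => "0", then "1", "2", "0", ...
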
multi_receive(event_list)
# File lib/logstash/outputs/datahub.rb, line 252
def multi_receive(event_list)
  retry_count = 0
  begin
    entries = []
    shard_id = get_next_shard_id()

    event_list.each do |event|
      if event == LogStash::SHUTDOWN
        return
      end
      event_map = event.to_hash

      entry = DatahubPackage.model.RecordEntry::new(@schema)
      entry.putAttribute("srcId", event_map["host"].to_s)
      entry.putAttribute("ts", event_map["@timestamp"].to_s)
      entry.putAttribute("version", event_map["@version"].to_s)
      entry.putAttribute("srcType", "log")

      is_data_valid = false
      for i in 0...@columns_size do
        column_name = @columnnames[i]
        column_type = @columntypes[i]
        value = event_map[column_name]
        if value != nil
          is_data_valid = check_and_set_data(entry, column_type, i, event_map, column_name)
          break if !is_data_valid
        end
      end

      if is_data_valid
        if @shard_keys.size > 0
          hash_string = ""
          for i in 0...@shard_keys.size
            shard_key = @shard_keys[i]
            if event_map[shard_key] != nil
              hash_string += event_map[shard_key].to_s + ","
            end
          end
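          # Partition key: Java String#hashCode of the concatenated shard key values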
          hashed_value = java.lang.String.new(hash_string).hashCode()
          entry.setPartitionKey(hashed_value)
        else
          entry.setShardId(shard_id)
        end
        entries.push(entry)
      end
    end

    # puts "total: " + entries.size.to_s

    # Only submit when there is at least one entry
    if entries.size > 0
      put_result = @client.putRecords(@project_name, @topic_name, entries)
      if put_result.getFailedRecordCount() > 0
        @logger.info "Put " + put_result.getFailedRecordCount().to_s + " records to datahub failed, total " + entries.size().to_s
        sleep @retry_interval
        entries = put_result.getFailedRecords()
        raise "Write to datahub failed: " + entries.size.to_s
      else
        @logger.info "Put data to datahub success, total " + entries.size().to_s
      end
    end

  rescue DatahubPackage.exception.DatahubServiceException => e
    @logger.error "Flush data exception: " + e.message #+ " " + e.backtrace.inspect.to_s
    # Shard state has changed; reload the active shard list
    if e.getErrorCode() == "InvalidShardOperation"
      @shards = get_active_shards(@topic.listShard())
      @shard_count = @shards.size()

      if @shard_count == 0
        @logger.error "No active shard available, please check"
      end
    elsif e.getErrorCode() == nil
      sleep @retry_interval
    end
    retry_count += 1
    @logger.warn "Now retry: " + retry_count.to_s
    retry
  rescue => e
    @logger.error "Flush data exception: " + e.message + " " + e.backtrace.inspect.to_s

    # Negative retry_times: retry indefinitely
    if @retry_times < 0
      retry_count += 1
      @logger.warn "Now retry: " + retry_count.to_s
      # puts "Now retry..."
      sleep @retry_interval
      retry
    elsif @retry_times == 0
      @logger.error "Retry not work, now exit"
      Process.exit(1)
      # Positive retry_times: keep retrying up to the limit
    elsif @retry_times > 0
      retry_count += 1
      if retry_count > @retry_times
        @logger.warn "Retry over: " + @retry_times.to_s
        Process.exit(1)
      end
      @logger.warn "Now retry..."
      sleep @retry_interval
      retry
    end
  end
end
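
The rescue clauses implement a three-way retry policy driven by retry_times: a negative value retries forever, zero gives up immediately, and a positive value bounds the number of attempts, sleeping retry_interval between attempts. A condensed sketch of that policy (pure Ruby; it raises instead of calling Process.exit, and the helper name is illustrative):

# Sketch of the retry policy used by multi_receive (illustrative only).
def with_retries(retry_times, retry_interval)
  attempts = 0
  begin
    yield
  rescue => e
    raise e if retry_times == 0                           # fail immediately
    attempts += 1
    raise e if retry_times > 0 && attempts > retry_times  # retry budget exhausted
    sleep retry_interval
    retry
  end
end

# with_retries(3, 5) { client.putRecords(project, topic, entries) }   # hypothetical usage
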
register()
# File lib/logstash/outputs/datahub.rb, line 93
def register
  begin
    @account = DatahubPackage.auth.AliyunAccount::new(@access_id, @access_key)
    @conf = DatahubPackage.DatahubConfiguration::new(@account, @endpoint)
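    # Enable wire compression when deflate or lz4 is configured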
    if @compress_method == "deflate" || @compress_method == "lz4"
      @compression_format = DatahubPackage.model.compress.CompressionFormat.fromValue(@compress_method)
      @conf.setCompressionFormat(@compression_format)
    end

    @client = DatahubPackage.DatahubClient::new(@conf)
    @project = DatahubPackage.wrapper.Project::Builder.build(@project_name, @client)
    @topic = @project.getTopic(@topic_name)
    @shard_cursor = 0

    @shards = get_active_shards(@topic.listShard())
    @shard_count = @shards.size()

    result = @client.getTopic(@project_name, @topic_name)
    @schema = result.getRecordSchema()
    fields = @schema.getFields()
    @columns_size = fields.size
    @columnnames = []
    for i in 0...@columns_size
      @columnnames.push(fields[i].getName())
    end
    @columntypes = []
    for i in 0...@columns_size
      @columntypes.push(fields[i].getType())
    end

    # Validate parameters before accepting events
    check_params()

    if @shard_count == 0
      @logger.error "No active shard available, please check"
      raise "No active shard available, please check"
    end

    @logger.info "Init datahub success!"
  rescue => e
    @logger.error "Init failed!"  + e.message + " " + e.backtrace.inspect.to_s
    raise e
  end
end
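
Registration fails fast: any exception raised while building the client, loading the schema, or validating parameters is logged and re-raised, so Logstash aborts pipeline startup instead of running with a misconfigured output.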
write_as_dirty_data(event_map)

Dirty data file handling: appends a failed record to the dirty data file.

# File lib/logstash/outputs/datahub.rb, line 209
def write_as_dirty_data(event_map)
  dirty_file_part1_name = @dirty_data_file + ".part1"
  dirty_file_part2_name = @dirty_data_file + ".part2"

  # Serialize writes with the shared file lock
  @@file_lock.synchronize {
    dirty_file_part2 = File.open(dirty_file_part2_name, "a+")
    dirty_file_part2.puts(event_map.to_s)
    dirty_file_part2.close
    if File.size(dirty_file_part2_name) > @dirty_data_file_max_size / 2
      # Data is split across .part1 and .part2:
      # older records roll into .part1, new records keep going to .part2
      FileUtils.mv(dirty_file_part2_name, dirty_file_part1_name)
    end
  }
end
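
With this two-part scheme the dirty data kept on disk stays bounded at roughly dirty_data_file_max_size: once .part2 grows past half of the limit it replaces .part1, so the oldest dirty records are dropped while the most recent ones are retained.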