class OdpsDatahub::Serializer

Public Instance Methods

encodeBool(value) click to toggle source
# File lib/fluent/plugin/serialize/serializer.rb, line 32
# Encodes a truthy/falsy value as a single unsigned byte: 1 when +value+ is
# truthy, 0 otherwise.
#
# @param value [Object] any object; only its truthiness is used
# @return [String] one-byte binary string
def encodeBool(value)
  flag = if value then 1 else 0 end
  [flag].pack('C')
end
encodeDataTime(value) click to toggle source
# File lib/fluent/plugin/serialize/serializer.rb, line 59
# Encodes a datetime value by delegating to encodeSInt64; the value is passed
# through unchanged.
#
# @param value [Integer] value forwarded to encodeSInt64
# @return whatever encodeSInt64 returns for +value+
def encodeDataTime(value)
  encodeSInt64(value)
end
encodeDouble(value) click to toggle source
# File lib/fluent/plugin/serialize/serializer.rb, line 36
# Packs a number as an 8-byte double-precision float in little-endian byte
# order (pack directive 'E').
#
# @param value [Numeric] number to encode
# @return [String] 8-byte binary string
def encodeDouble(value)
  operand = [value]
  operand.pack('E')
end
encodeFixed32(value) click to toggle source
# File lib/fluent/plugin/serialize/serializer.rb, line 76
# Packs an integer as 4 bytes, unsigned 32-bit little-endian (pack
# directive 'V').
#
# @param value [Integer] number to encode
# @return [String] 4-byte binary string
def encodeFixed32(value)
  word = [value]
  word.pack('V')
end
encodeFixed64(value) click to toggle source
# File lib/fluent/plugin/serialize/serializer.rb, line 71
# Packs an integer as 8 bytes in little-endian order by emitting its low and
# high 32-bit words separately.
#
# @param value [Integer] number to encode
# @return [String] 8-byte binary string
def encodeFixed64(value)
  # Two 'V' words are used instead of 'Q', which is machine-dependent.
  low_word = value & 0xffff_ffff
  high_word = value >> 32
  [low_word, high_word].pack('VV')
end
encodeFixedString(value) click to toggle source
# File lib/fluent/plugin/serialize/serializer.rb, line 80
# Packs +value+ as 4 bytes, unsigned 32-bit little-endian (pack
# directive 'V').
#
# NOTE(review): despite the name, this packs an integer exactly like
# encodeFixed32 does and does not accept a String -- confirm the intent.
#
# @param value [Integer] number to encode
# @return [String] 4-byte binary string
def encodeFixedString(value)
  word = [value]
  word.pack('V')
end
encodeSInt64(value) click to toggle source
# File lib/fluent/plugin/serialize/serializer.rb, line 40
# Encodes a signed integer as a varint after zigzag mapping: non-negative n
# becomes n << 1 and negative n becomes ~(n << 1), so values of small
# magnitude map to small unsigned numbers.
#
# @param value [Integer] signed number to encode
# @return the result of ::Protobuf::Field::VarintField.encode for the mapped value
def encodeSInt64(value)
  doubled = value << 1
  zigzag = value >= 0 ? doubled : ~doubled
  ::Protobuf::Field::VarintField.encode(zigzag)
end
encodeString(value) click to toggle source
# File lib/fluent/plugin/serialize/serializer.rb, line 63
# Encodes a string as a length-delimited field body: a varint byte count
# followed by the string's bytes. The caller's string is not modified.
#
# @param value [String] text to encode
# @return [String] varint length prefix with the transcoded bytes appended
def encodeString(value)
  # Transcode a copy to the protobuf string encoding, dropping characters that
  # are invalid or cannot be represented.
  payload = value.encode(::Protobuf::Field::StringField::ENCODING, invalid: :replace, undef: :replace, replace: "")
  # Reinterpret as raw bytes so that #size below counts bytes, not characters.
  payload.force_encoding(::Protobuf::Field::BytesField::BYTES_ENCODING)
  length_prefix = ::Protobuf::Field::VarintField.encode(payload.size)
  length_prefix << payload
end
encodeUInt32(value) click to toggle source
# File lib/fluent/plugin/serialize/serializer.rb, line 48
# Encodes an integer as a base-128 varint: seven payload bits per byte, least
# significant group first, with the high bit set on every byte but the last.
#
# @param value [Integer] number to encode
# @return [String] binary string holding the varint bytes
def encodeUInt32(value)
  # Anything below 128 fits in a single byte with no continuation bit.
  return [value].pack('C') if value < 128

  remaining = value
  groups = []
  while remaining != 0
    groups.push((remaining & 0x7f) | 0x80)
    remaining >>= 7
  end
  # The final group terminates the varint, so clear its continuation bit.
  groups[-1] = groups.last & 0x7f
  groups.pack('C*')
end
serialize(upStream, recordList) click to toggle source
# File lib/fluent/plugin/serialize/serializer.rb, line 89
# Writes every record in +recordList+ to +upStream+, followed by a trailer
# holding the record count and a CRC32C computed over the per-record checksums.
#
# For each record, every column of the record's table schema whose cell is
# non-nil is emitted as a tag (field number = column index + 1) plus the
# encoded value, and folded into that record's CRC32C. Each record ends with
# the $TUNNEL_END_RECORD tag and the record's checksum.
#
# @param upStream [#write, #<<] destination stream; values are sent with
#   +write+ and tags (through writeTag) with +<<+
# @param recordList [Array] records responding to +getTableSchema+ and
#   +getValue(idx)+
# @return the result of the final +upStream.write+ call
# @raise [OdpsDatahubException] when +recordList+ is not an Array, or when a
#   column has a type other than the $ODPS_* types handled below
def serialize(upStream, recordList)
  unless recordList.is_a?(Array)
    raise OdpsDatahubException.new($INVALID_ARGUMENT, "param must be a array")
  end

  crc32cPack = ::Digest::CRC32c.new
  recordList.each do |record|
    crc32cRecord = ::Digest::CRC32c.new
    schema = record.getTableSchema
    schema.mCols.each do |col|
      cellValue = record.getValue(col.mIdx)
      # Nil cells are neither written nor included in the checksum.
      next if cellValue.nil?

      # Field numbers on the wire are the column index shifted by one.
      fieldId = col.mIdx + 1
      crc32cRecord.update(encodeFixed32(fieldId))
      case col.mType
      when $ODPS_BIGINT
        # Checksummed as fixed 64-bit, written as a signed varint.
        crc32cRecord.update(encodeFixed64(cellValue))
        writeTag(fieldId, ::Protobuf::WireType::VARINT, upStream)
        upStream.write(encodeSInt64(cellValue))
      when $ODPS_DOUBLE
        crc32cRecord.update(encodeDouble(cellValue))
        writeTag(fieldId, ::Protobuf::WireType::FIXED64, upStream)
        upStream.write(encodeDouble(cellValue))
      when $ODPS_BOOLEAN
        crc32cRecord.update(encodeBool(cellValue))
        writeTag(fieldId, ::Protobuf::WireType::VARINT, upStream)
        upStream.write(encodeBool(cellValue))
      when $ODPS_DATETIME
        # Checksummed as fixed 64-bit, written through encodeDataTime.
        crc32cRecord.update(encodeFixed64(cellValue))
        writeTag(fieldId, ::Protobuf::WireType::VARINT, upStream)
        upStream.write(encodeDataTime(cellValue))
      when $ODPS_STRING
        # The checksum covers the cell as given; the stream receives the
        # length-prefixed form produced by encodeString.
        crc32cRecord.update(cellValue)
        writeTag(fieldId, ::Protobuf::WireType::LENGTH_DELIMITED, upStream)
        upStream.write(encodeString(cellValue))
      else
        raise OdpsDatahubException.new($INVALID_ARGUMENT, "invalid mType")
      end
    end

    # Close the record with its checksum and fold that checksum into the
    # pack-level CRC.
    recordCrc = crc32cRecord.checksum.to_i
    writeTag($TUNNEL_END_RECORD, ::Protobuf::WireType::VARINT, upStream)
    upStream.write(encodeUInt32(recordCrc))
    crc32cPack.update(encodeFixed32(recordCrc))
  end

  # Trailer: record count, then the checksum over all record checksums.
  writeTag($TUNNEL_META_COUNT, ::Protobuf::WireType::VARINT, upStream)
  upStream.write(encodeSInt64(recordList.size))
  writeTag($TUNNEL_META_CHECKSUM, ::Protobuf::WireType::VARINT, upStream)
  upStream.write(encodeUInt32(crc32cPack.checksum))
end
writeTag(idx, type, stream) click to toggle source
# File lib/fluent/plugin/serialize/serializer.rb, line 84
# Appends a field tag to +stream+. The tag key holds the field number in the
# upper bits and the wire type in the low three bits, and is written as a
# varint.
#
# @param idx [Integer] field number
# @param type [Integer] wire type, OR-ed into the low bits of the key
# @param stream [#<<] destination the encoded key is appended to
# @return the result of +stream <<+
def writeTag(idx, type, stream)
  stream << ::Protobuf::Field::VarintField.encode((idx << 3) | type)
end