class OdpsDatahub::StreamClient

Attributes

mOdpsConfig[R]
mOdpsTable[R]
mOdpsTableSchema[R]
mProject[R]
mTable[R]

Public Class Methods

new(odpsConfig, project, table) click to toggle source
# File lib/fluent/plugin/stream_client.rb, line 30
# Builds a stream client for one table and loads that table's schema.
#
# When +project+ is nil or "", odpsConfig.defaultProjectName is used instead.
# An OdpsTable is created for the resolved project/table, then a GET is issued
# on the table resource with the $PARAM_QUERY parameter set to "meta"; the
# "Schema" entry of the JSON reply is wrapped in an OdpsTableSchema.
#
# odpsConfig:: configuration object; must respond to +defaultProjectName+
# project::    project name, or nil/"" to fall back to the configured default
# table::      table name
#
# Raises OdpsDatahubException (with the "Code" of the reply) when the response
# code is not "200".
def initialize(odpsConfig, project, table)
  @mOdpsConfig = odpsConfig
  @mProject = project
  @mTable = table
  @mShards = []
  @mProject = @mOdpsConfig.defaultProjectName if @mProject.nil? || @mProject == ""
  @mOdpsTable = OdpsTable.new(@mOdpsConfig, @mProject, @mTable)

  header = {}
  param = { $PARAM_QUERY => "meta" }
  conn = HttpConnection.new(@mOdpsConfig, header, param, getResource, "GET")
  res = conn.getResponse
  jsonTableMeta = JSON.parse(res.body)
  if res.code != "200"
    # Interpolate instead of concatenating: an error body without "Message"
    # must still surface as OdpsDatahubException, not as a TypeError.
    message = jsonTableMeta["Message"]
    raise OdpsDatahubException.new(jsonTableMeta["Code"], "initialize failed because #{message}")
  end
  @mOdpsTableSchema = OdpsTableSchema.new(jsonTableMeta["Schema"])
end

Public Instance Methods

addPartition(ptStr) click to toggle source

ptStr example: ‘dt=20150805,hh=08,mm=24’. Adds the partition if it does not already exist.

# File lib/fluent/plugin/stream_client.rb, line 58
# Forwards the partition spec string to the OdpsTable held in @mOdpsTable
# and returns whatever that call returns.
def addPartition(ptStr)
  table = @mOdpsTable
  return table.addPartition(ptStr)
end
createStreamArrayWriter(shardId = nil) click to toggle source
# File lib/fluent/plugin/stream_client.rb, line 70
# Builds a StreamWriter for this client's table, passing the table schema
# (@mOdpsTableSchema) as the final constructor argument.
#
# shardId:: forwarded to StreamWriter unchanged; defaults to nil
def createStreamArrayWriter(shardId = nil)
  resource = getResource
  writer = StreamWriter.new(@mOdpsConfig, @mProject, @mTable, resource, shardId, @mOdpsTableSchema)
  return writer
end
createStreamWriter(shardId = nil) click to toggle source
# File lib/fluent/plugin/stream_client.rb, line 66
# Builds a StreamWriter for this client's table without passing a schema.
#
# shardId:: forwarded to StreamWriter unchanged; defaults to nil
def createStreamWriter(shardId = nil)
  resource = getResource
  writer = StreamWriter.new(@mOdpsConfig, @mProject, @mTable, resource, shardId)
  return writer
end
getOdpsTableSchema() click to toggle source
# File lib/fluent/plugin/stream_client.rb, line 62
# Returns the OdpsTableSchema stored in @mOdpsTableSchema by the constructor.
def getOdpsTableSchema
  @mOdpsTableSchema
end
getPartitionList() click to toggle source

Gets the partitions and returns them as an array, e.g. [{“time”=>“2016”, “place”=>“china2”},{“time”=>“2015”, “place”=>“china”}]

# File lib/fluent/plugin/stream_client.rb, line 52
# Asks the OdpsTable held in @mOdpsTable for its partition list and returns
# that result unchanged.
def getPartitionList
  partitions = @mOdpsTable.getPartitionList
  return partitions
end
getShardStatus() click to toggle source

Returns the “ShardStatus” entry of the parsed JSON response, e.g. [{“ShardId”: “0”,“State”: “loaded”},{“ShardId”: “1”,“State”: “loaded”}]

# File lib/fluent/plugin/stream_client.rb, line 75
# Fetches the shard status of this client's table.
#
# Sends a GET to the table resource suffixed with "/shards", with the
# $PARAM_CURR_PROJECT parameter set to @mProject and $PARAM_SHARD_STATUS set
# to "".
#
# Returns the "ShardStatus" entry of the parsed JSON response.
# Raises OdpsDatahubException (with the "Code" of the reply) when the response
# code is not "200".
def getShardStatus
  header = {}
  param = {}
  param[$PARAM_CURR_PROJECT] = @mProject
  param[$PARAM_SHARD_STATUS] = ""

  conn = HttpConnection.new(@mOdpsConfig, header, param, getResource + "/shards", "GET")
  res = conn.getResponse
  json_obj = JSON.parse(res.body)
  if res.code != "200"
    # Interpolate instead of concatenating: an error body without "Message"
    # must still surface as OdpsDatahubException, not as a TypeError.
    message = json_obj["Message"]
    raise OdpsDatahubException.new(json_obj["Code"], "getShardStatus failed because #{message}")
  end
  json_obj["ShardStatus"]
end
loadShard(idx) click to toggle source
# File lib/fluent/plugin/stream_client.rb, line 90
# Posts a shard-load request for this client's table.
#
# Sends a POST to the table resource suffixed with "/shards", with the
# $PARAM_CURR_PROJECT parameter set to @mProject and $PARAM_SHARD_NUMBER set
# to +idx+.
#
# idx:: value sent as the shard number; must be non-nil and not negative
#
# Returns nil on success.
# Raises OdpsDatahubException with $INVALID_ARGUMENT when +idx+ is nil or
# negative, and with the "Code" of the reply when the response code is not
# "200".
def loadShard(idx)
  # A nil idx is reported as an invalid argument rather than crashing on nil < 0.
  if idx.nil? || idx < 0
    raise OdpsDatahubException.new($INVALID_ARGUMENT, "loadShard num invalid")
  end
  header = {}
  param = {}
  param[$PARAM_CURR_PROJECT] = @mProject
  param[$PARAM_SHARD_NUMBER] = idx
  conn = HttpConnection.new(@mOdpsConfig, header, param, getResource + "/shards", "POST")
  res = conn.getResponse
  if res.code != "200"
    # The body is only parsed on failure; a success body is never read.
    json_obj = JSON.parse(res.body)
    # Interpolate instead of concatenating: an error body without "Message"
    # must still surface as OdpsDatahubException, not as a TypeError.
    message = json_obj["Message"]
    raise OdpsDatahubException.new(json_obj["Code"], "loadShard failed because #{message}")
  end
end

Protected Instance Methods

getResource() click to toggle source
# File lib/fluent/plugin/stream_client.rb, line 107
# Builds the resource path for this client's table from @mProject and @mTable:
# "/projects/<project>/tables/<table>".
def getResource
  path = "/projects/" + @mProject
  path += "/tables/"
  path + @mTable
end