class OdpsDatahub::OdpsTable

Public Class Methods

new(odpsConfig, projectName, tableName) click to toggle source
# File lib/fluent/plugin/odps/odps_table.rb, line 105
# Creates a handle on one table of one ODPS project. No I/O happens here;
# the stored values are used later to build request URLs and to construct
# HttpConnection objects.
#
# @param odpsConfig  configuration object passed through to HttpConnection
# @param projectName project that owns the table (used in request paths/SQL)
# @param tableName   table name inside that project
def initialize(odpsConfig, projectName, tableName)
  @mOdpsConfig, @mProjectName = odpsConfig, projectName
  @mTableName = tableName
end

Public Instance Methods

addPartition(ptStr) click to toggle source

ptStr example: 'dt=20150805,hh=08,mm=24'. Adds the partition if it does not already exist.

# File lib/fluent/plugin/odps/odps_table.rb, line 158
# Creates a partition on this table if it does not exist yet, by running
#   ALTER TABLE <project>.<table> ADD IF NOT EXISTS PARTITION (k1='v1',...);
# through runSQL.
#
# @param ptStr [String] comma-separated key=value pairs,
#   e.g. "dt=20150805,hh=08,mm=24"
# @return whatever runSQL returns
# @raise [RuntimeError] if ptStr is nil/empty, a pair is not exactly
#   key=value, or a value contains a single quote
def addPartition(ptStr)
  specs = ptStr.to_s.split(',').map do |pt|
    ptkv = pt.split('=')
    raise "invalid partition spec: " + pt if ptkv.size != 2
    key, value = ptkv
    # The value is embedded in a single-quoted SQL literal; a quote would end
    # the literal early and corrupt (or inject into) the statement.
    raise "invalid partition value: " + pt if value.include?("'")
    "#{key}='#{value}'"
  end
  # Without any pair the statement would degenerate to "... PARTITION);".
  raise "invalid partition spec: empty" if specs.empty?

  sqlstr = "ALTER TABLE " + @mProjectName + "." + @mTableName +
           " ADD IF NOT EXISTS PARTITION (" + specs.join(',') + ");"
  runSQL("SQLAddPartitionTask", sqlstr)
end
genJobXml(name, priority, comment, taskStr, runMode='sequence') click to toggle source

TODO: support multiple tasks

# File lib/fluent/plugin/odps/odps_table.rb, line 201
# Builds the job XML wrapping a single task; a pure pass-through to
# XmlTemplate.getJobXml with the arguments in the same order.
#
# @param name     job name
# @param priority job priority (runSQL passes '9')
# @param comment  job comment text
# @param taskStr  task XML to embed (runSQL passes XmlTemplate.getTaskXml output)
# @param runMode  run mode, 'sequence' unless given
# @return the value returned by XmlTemplate.getJobXml
# TODO: accept several tasks instead of a single task string.
def genJobXml(name, priority, comment, taskStr, runMode='sequence')
  XmlTemplate.getJobXml(name, priority, comment, taskStr, runMode)
end
getPartitionList() click to toggle source

Gets the table's partitions and returns an array of hashes, e.g. [{"time"=>"2016", "place"=>"china2"}, {"time"=>"2015", "place"=>"china"}]

# File lib/fluent/plugin/odps/odps_table.rb, line 112
# Lists every partition of the table. Pages are fetched with
# GET /projects/<project>/tables/<table>, and the server's <Marker> is
# followed until no further marker is returned.
#
# @return [Array<Hash>] one hash per <Partition>, mapping each child
#   element's "Name" attribute to its "Value" attribute, e.g.
#   [{"time"=>"2016", "place"=>"china2"}, {"time"=>"2015", "place"=>"china"}]
#   If a page request is answered with a non-200 status, the partitions
#   gathered so far are returned. The commented-out raise suggests this is
#   how a non-partitioned table yields [] — TODO confirm.
def getPartitionList
  partitionList = []
  url = "/projects/" + @mProjectName + "/tables/" + @mTableName
  lastMarker = nil

  loop do
    param = {
      $PARAM_CURR_PROJECT => @mProjectName,
      $PARAM_EXPECT_MARKER => true,
      $PARAM_PARTITIONS => ""
    }
    param[$PARAM_MARKER] = lastMarker unless lastMarker.nil?

    conn = HttpConnection.new(@mOdpsConfig, {}, param, url, "GET", "", true)
    res = conn.getResponse
    # Deliberately best-effort: stop and return what we have.
    return partitionList if res.code != "200"
    #raise OdpsDatahubException.new($INVALID_ARGUMENT, "This not a partitioned table")

    doc = REXML::Document.new(res.body.to_s)

    doc.root.get_elements("Partition").each do |partition|
      partitionInfo = {}
      partition.elements.each do |column|
        partitionInfo[column.attributes["Name"]] = column.attributes["Value"]
      end
      partitionList.push(partitionInfo)
    end

    # A missing <Marker> element, or one without text, means this was the
    # last page. The previous condition-less `elsif` only worked because Ruby
    # parsed the assignment as its condition, and a missing element crashed.
    markerXml = doc.root.get_elements("Marker").first
    lastMarker = markerXml && markerXml.text
    break if lastMarker.nil? || lastMarker.empty?
  end
  partitionList
end
getTaskResult(instanceurl, name) click to toggle source
# File lib/fluent/plugin/odps/odps_table.rb, line 236
# Fetches an instance's task results (GET <instanceurl> with a "result"
# query parameter) and returns the result text of the task named +name+.
#
# @param instanceurl [String] instance path, e.g. "/projects/p/instances/<id>"
# @param name task name to look up (compared after to_s)
# @return [String, nil] the contents of the task's <Result> element. CDATA
#   sections are concatenated; if there are none, the element's text is used.
#   Returns nil when no task matches or the task has no <Result> element.
def getTaskResult(instanceurl, name)
  params = { 'result' => "" }
  conn = HttpConnection.new(@mOdpsConfig, {}, params, instanceurl, 'GET', "", true)
  doc = REXML::Document.new(conn.getResponse.body.to_s)

  wanted = name.to_s
  task = doc.root.elements.to_a('Tasks/Task').find { |e| e.elements['Name'].text == wanted }
  return nil if task.nil?

  result = task.elements['Result']
  return nil if result.nil?

  # cdatas returns an Array; Array#to_s would yield inspect output such as
  # ["msg"], so join the raw CDATA contents instead.
  cdata = result.cdatas.map(&:value).join
  cdata.empty? ? result.text.to_s : cdata
end
runSQL(taskName, sqlstring) click to toggle source
# File lib/fluent/plugin/odps/odps_table.rb, line 174
# Submits +sqlstring+ as a single SQL task of a new job instance
# (POST /projects/<project>/instances). It then blocks in
# waitForSQLComplete until the instance terminates.
#
# @param taskName [String] name given to the SQL task
# @param sqlstring [String] SQL statement to execute
# @return whatever waitForSQLComplete returns
# @raise [RuntimeError] if the POST is not answered with status 200, or if a
#   non-empty response body does not mention "Instance" (the body becomes the
#   error message)
def runSQL(taskName, sqlstring)
  task_xml = XmlTemplate.getTaskXml(taskName, sqlstring)
  job_xml = genJobXml('arbitriary_job', '9', "", task_xml)

  headers = {
    'Content-Type' => 'application/xml',
    'Content-MD5' => Digest::MD5.hexdigest(job_xml),
    # Content-Length is a byte count; String#size counts characters and is
    # wrong as soon as the SQL contains non-ASCII text.
    'Content-Length' => job_xml.bytesize.to_s
  }

  url = "/projects/" + @mProjectName + "/instances"
  conn = HttpConnection.new(@mOdpsConfig, headers, {}, url, 'POST', job_xml, true)
  res = conn.getResponse
  if res.code != '200'
    raise "SQL task " + taskName.to_s + " failed with error " + res.code.to_s
  end

  # The response is presumably a Net::HTTPResponse. Its to_hash has
  # lower-cased keys mapping to arrays of values (waitForSQLComplete reads
  # 'location' the same way), so 'Content-Length' never matched before.
  contentLength = Array(res.to_hash['content-length']).first
  if contentLength != "0" && !res.body.to_s.include?("Instance")
    raise res.body.to_s
  end

  waitForSQLComplete(res)
end
waitForSQLComplete(res) click to toggle source
# File lib/fluent/plugin/odps/odps_table.rb, line 206
# Polls the instance created by runSQL (GET with a "taskstatus" query
# parameter) until its <Status> is "Terminated". It then checks every
# Tasks/Task entry and raises for the first one whose status is not
# "Success".
#
# @param res response of the instance-creation POST; the last path segment
#   of its "location" header is taken as the instance id
# @param pollInterval [Numeric] seconds to sleep between status polls
#   (default 5)
# @raise [RuntimeError] if the location header is missing, or with the
#   failing task's result text (or a generic message when none is
#   available) for the first task not in "Success"
def waitForSQLComplete(res, pollInterval = 5)
  location = Array(res.to_hash['location']).first
  raise "instance location missing from response" if location.nil?
  instanceurl = "/projects/" + @mProjectName + "/instances" + "/" + location.split('/')[-1]

  params = { 'taskstatus' => "" }
  doc = nil
  loop do
    conn = HttpConnection.new(@mOdpsConfig, {}, params, instanceurl, 'GET', "", true)
    doc = REXML::Document.new(conn.getResponse.body.to_s)
    break if doc.root.elements["Status"].text == 'Terminated'
    # Sleep on every non-terminal status, not only Running/Suspended; any
    # other status used to re-poll immediately in a tight loop.
    sleep(pollInterval)
  end

  doc.root.elements.each('Tasks/Task') do |e|
    status = e.elements['Status'].text
    next if status.to_s == 'Success'

    name = e.elements['Name'].text
    message = getTaskResult(instanceurl, name.to_s)
    # `raise` needs a String (or exception); fall back to a readable message
    # instead of failing with TypeError when no result text is available.
    unless message.is_a?(String) && !message.empty?
      message = "task #{name} finished with status #{status}"
    end
    raise message
  end
end