module Fluent::Plugin::FestivalProxy

Public Instance Methods

add_location(result, resource) click to toggle source
# File lib/fluent/plugin/festival_proxy.rb, line 143
def add_location(result, resource)
  if resource.require_location
    log.debug "get_data (location): request #{get_data_request(resource_path(resource.path))}, #{get_data_header.inspect}"
    get_sensor_res = @https.get(get_data_request(resource_path(resource.path)), get_data_header)
    return result if !error_handler(get_sensor_res, "get_data failed.")
    log.debug "get_data: #{get_sensor_res.body}"
    sensor = JSON.parse(get_sensor_res.body)
    # TODO: arbitrary geographicArea type should be supported
    return result.merge({
      "location": {
        "lon": JSON.parse(sensor["location"]["geographicArea"])["coordinates"][0],
        "lat": JSON.parse(sensor["location"]["geographicArea"])["coordinates"][1]
      }
    })
  elsif !resource.fixed_location.nil?
    log.debug "set fixed_location: #{resource.fixed_location.inspect}"
    return result.merge({
      "location": {
        "lon": resource.fixed_location[0].to_f,
        "lat": resource.fixed_location[1].to_f
      }
    })
  end
  return result
end
create_session() click to toggle source
# File lib/fluent/plugin/festival_proxy.rb, line 65
def create_session
  return @session if valid_session?
  @session_req ||= create_session_request
  session_res = @https.request(@session_req)
  return nil if !error_handler(session_res, 'create_session failed.')
  # access_token is returned as follows
  # {"access_token":"XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX","expires_in":86400}
  @session = JSON.parse(session_res.body)
  # the expiration time is set to 5 minutes before expiration
  @session_expires_in = Time.now + @session["expires_in"] - 5 * 60
end
create_session_request() click to toggle source
# File lib/fluent/plugin/festival_proxy.rb, line 58
def create_session_request
  session_req = Net::HTTP::Post.new(@uri + '/festival/eaas/security/token')
  session_req.body = {email: @email, password: @password}.to_json
  session_req.content_type = 'application/json'
  session_req
end
delete_session() click to toggle source

TODO: to be implemented

# File lib/fluent/plugin/festival_proxy.rb, line 83
def delete_session
  #return if !valid_session?
  #del_session_res = @https.request(delete_session_request)
  #error_handler(del_session_res, 'delete_session failed.')
end
delete_session_request() click to toggle source

TODO: to be implemented

# File lib/fluent/plugin/festival_proxy.rb, line 78
def delete_session_request
  #Net::HTTP::Delete.new(@uri + "/session/#{@session_key}")
end
error_handler(response, message) click to toggle source
# File lib/fluent/plugin/festival_proxy.rb, line 28
def error_handler(response, message)
  if response.code != "200"
    log.error error: message
    log.debug "code: #{response.code}"
    log.debug "message: #{response.message}"
    log.debug "body: #{response.body}"
    return false
  end
  if @api_type == "sensinact"
    body = JSON.parse(response.body)
    if body["statusCode"] != 200
      log.error error: message
      log.debug "code: #{body["statusCode"]}"
      log.debug "body: #{response.body}"
      return false
    end
  end
  return true
end
get_data() click to toggle source
# File lib/fluent/plugin/festival_proxy.rb, line 169
def get_data
  if !valid_session? && @api_type == 'festival'
    return nil if create_session.nil?
    log.debug "session #{@session} created."
  end
  data = []
  #require 'pry-byebug'
  log.debug "@resources: #{@resources.inspect}"
  @resources.each do |resource|
    case resource_type(resource.path)
    when "current_data" then
      log.debug "get_data: request #{get_data_request(resource.path)}, #{get_data_header.inspect}"
      get_data_res = @https.get(get_data_request(resource.path), get_data_header)
      next if !error_handler(get_data_res,"get_data failed.")
      log.debug "get_data: #{get_data_res.body}"
      result = 
        case @api_type
        when 'festival'
          {
            "resourceName": resource.path,
            "dataValue": JSON.parse(get_data_res.body)["dataValue"]
          }
        when 'sensinact'
          {
            "resourceName": resource.path,
            "dataValue": JSON.parse(get_data_res.body)["response"]["value"],
            "timestamp": (Time.at(JSON.parse(get_data_res.body)["response"]["timestamp"].to_f/1000, JSON.parse(get_data_res.body)["response"]["timestamp"].to_f%1000*1000) if @use_sensor_time)
          }.reject {|k, v| v.nil?}
        else
          return nil
        end
      data << add_location(result, resource)
    when "historical_data" then
      log.error "historical_data is not supported yet"
      next
    else
      log.error "The other resource type is not supported yet"
      log.error "resource_type: #{resource_type(resource.path)}"
      next
    end
  end
  if data.size > 1
    return data
  end
  data[0]
end
get_data_header() click to toggle source
# File lib/fluent/plugin/festival_proxy.rb, line 115
def get_data_header
  header = {
    "Accept": "application/json"
  }
  if @api_type == 'festival'
    return header.merge("X-Auth-Token": @session["access_token"])
  end
  header
end
get_data_request(path) click to toggle source
type: "current_data", "historical_data"

def target_path(type)

if !@aggregator_id.nil? && !@testbed_id.nil? && !@resource_id.nil?
  return "/festival/eaas/experimentation/aggregators/#{@aggregator_id}/testbeds/#{@testbed_id}/resources/#{@resource_id}/#{type}"
else
  raise Fluent::ConfigError, "aggregator_id, testbed_id and resource_id must be specified."
end

end

# File lib/fluent/plugin/festival_proxy.rb, line 98
def get_data_request(path)
  #get_data_req = @uri + target_path(type)

  get_data_req = @uri +
    case @api_type
    when 'sensinact' 
      Pathname("/sensinact/providers/#{WEBrick::HTTPUtils.escape(path)}").cleanpath.to_s
    else # default (festival)
      Pathname("/festival/eaas/experimentation/#{WEBrick::HTTPUtils.escape(path)}").cleanpath.to_s
    end
  #get_data_req.query = URI.encode_www_form(get_data_params)
  log.debug "#{get_data_req}"
  # currently time window is automatically updated
  #@from = Time.now.iso8601
  get_data_req
end
get_historical_data() click to toggle source

curl –request GET \ –url 'sensinact-cea.ddns.net:8099/festival/driver/testbeds/jose/resources/hyogo001_barometer-info-valueasfloat/historical_data?startDate=2017-03-01T00%3A05%3A55Z' \ –header 'cache-control: no-cache' \ –header 'content-type: application/json' \ –header 'postman-token: 6bceac9e-5d14-3c9e-d34c-acbd4922ebfc' \ –header 'userid: me' \ –data '{“options”:{“rows”:20}}'

# File lib/fluent/plugin/festival_proxy.rb, line 223
def get_historical_data
  # TODO: to be implemented
end
resource_path(path) click to toggle source
# File lib/fluent/plugin/festival_proxy.rb, line 134
def resource_path(path)
  case @api_type
  when 'sensinact'
    path
  else
    Pathname(path).dirname.to_s
  end
end
resource_type(path) click to toggle source
# File lib/fluent/plugin/festival_proxy.rb, line 125
def resource_type(path)
  case @api_type
  when 'sensinact'
    "current_data"
  else
    Pathname(path).basename.to_s
  end
end
shutdown_proxy() click to toggle source
# File lib/fluent/plugin/festival_proxy.rb, line 22
def shutdown_proxy
  log.debug "shutdown_proxy #{@session.inspect}"
  delete_session
  @https.finish() if @https.active?
end
start_proxy() click to toggle source
# File lib/fluent/plugin/festival_proxy.rb, line 13
def start_proxy
  log.debug "start festival proxy #{@api_uri}"

  @uri = URI.parse(@api_uri)
  @https = Net::HTTP.new(@uri.host, @uri.port)
  @https.use_ssl = (@uri.scheme == 'https')
  @session = nil
end
valid_session?() click to toggle source
# File lib/fluent/plugin/festival_proxy.rb, line 48
def valid_session?
  # TODO validate @session by FESTIVAL EaaS API
  if !@session.nil?
    if Time.now < @session_expires_in
      return true
    end
  end
  return false
end