class Nifi

Constants

DEFAULT_ACYNC
DEFAULT_DEBUG
DEFAULT_HOST
DEFAULT_PORT
DEFAULT_SCHEMA

Public Class Methods

new(*args) click to toggle source
# File lib/nifi_sdk_ruby.rb, line 35
def initialize(*args)

  args = args.reduce Hash.new, :merge

  @@schema        = args[:schema] ? args[:schema] : DEFAULT_SCHEMA
  @@host          = args[:host] ? args[:host] : DEFAULT_HOST
  @@port          = args[:port] ? args[:port] : DEFAULT_PORT
  @@base_url      = @@schema + '://' + @@host + ':' + @@port.to_s + '/nifi-api'
  @@debug         = DEFAULT_DEBUG
  @@async         = DEFAULT_ACYNC
  @@sdk_name      = 'ruby'
  @@sdk_version   = NifiSdkRuby::VERSION
  @@client_id     = SecureRandom.uuid
  @@cert          = args[:cert] ? args[:cert] : nil
  @@cert_key      = args[:cert_key] ? args[:cert_key] : nil
  @@cert_password = args[:cert_password] ? args[:cert_key] : nil
end

Private Class Methods

exists() click to toggle source
# File lib/nifi_sdk_ruby.rb, line 418
def self.exists
  base_url = @@base_url + '/resources'
  res = self.http_client(base_url)
  return res['resources']
end
get(resource) click to toggle source
# File lib/nifi_sdk_ruby.rb, line 424
def self.get(resource)
  base_url = @@base_url + resource
  self.http_client(base_url)
end
http_client(url, method = 'GET', params = nil, filename = nil) click to toggle source
# File lib/nifi_sdk_ruby.rb, line 429
def self.http_client(url, method = 'GET', params = nil, filename = nil)
  c = Curl::Easy.new
  #c.http_auth_types              = :basic
  #c.username                     = @@api_key
  #c.password                     = ''
  c.url                         = url
  c.useragent                   = @@sdk_name + '_' << @@sdk_version
  c.headers['NIFI-SDK-Name']    = @@sdk_name
  c.headers['NIFI-SDK-Version'] = @@sdk_version
  c.ssl_verify_peer             = false
  c.ssl_verify_host             = false

  if @@schema == 'https' and @@cert
    c.cert  = @@cert
  end
  if @@schema == 'https' and @@cert_key
    c.cert_key = @@cert_key
  end
  if @@schema == 'https' and @@cert_password
    c.certpassword = @@cert_password
  end

  #c.verbose                     = true

  case method
    when 'GET'
      c.get
    when 'POSTRAW'
      c.headers['Content-Type'] = 'application/json'
      c.post(params)
    when 'POST'
      c.multipart_form_post = true
      c.post(params)
    when 'PUT'
      c.headers['Content-Type'] = 'application/json'
      c.put(params)
    when 'DELETE'
      c.delete
    else
      raise ArgumentError.new('HTTP method ' << method << ' not supported.')
  end

  if c.response_code.to_s.match(/20./) and not c.body_str.empty?
    begin
      JSON.parse(c.body_str)
    rescue
      if c.content_type == 'application/xml'
        JSON.parse(Hash.from_xml(c.body_str).to_json)
      end
    end
  else
    puts c.response_code.to_s
    puts c.body_str
  end
end

Public Instance Methods

create_process_group(*args) click to toggle source
# File lib/nifi_sdk_ruby.rb, line 140
def create_process_group(*args)
  args = args.reduce Hash.new, :merge

  if args[:name].nil?
    raise ArgumentError.new('name params is mandatory.')
  end
  name = args[:name].to_s
  if self.process_group_by_name? name
    raise ArgumentError.new('The process group ' << name << ' already exists')
  end

  params = '{"revision":{"clientId":"' << @@client_id + '","version":0},"component":{"name":"' << name <<
      '","position":{"x":274.54776144527517,"y":-28.886681059739686}}}'

  process_group = args[:id] ? args[:id] : 'root'
  base_url = @@base_url + '/process-groups/' << process_group << '/process-groups'
  self.class.http_client(base_url, 'POSTRAW', params)
end
create_template_instance(*args) click to toggle source
# File lib/nifi_sdk_ruby.rb, line 292
def create_template_instance(*args)
  args = args.reduce Hash.new, :merge

  if args[:id].nil? and args[:name].nil?
    raise ArgumentError.new('either specify id of the template or it\'s name ')
  end

  if args[:name]
    raise StandardError.new('Could not find template called ' << args[:name]) unless template_by_name?(args[:name])
    id = get_template_by_name(args[:name])[0][0]
  else
    raise StandardError.new('Could not find template with id ' << args[:id]) unless template_by_id?(args[:id])
    id = args[:id]
  end

  originX = args[:originX] ? args[:originX].to_s : '0.0'
  originY = args[:originY] ? args[:originY].to_s : '0.0'
  process_group = args[:process_group_id] ? args[:process_group_id] : 'root'
  params = '{"templateId": "' << id << '", "originX": ' << originX << ', "originY": ' << originY << '}'
  base_url = @@base_url + '/process-groups/' << process_group << '/template-instance'
  self.class.http_client(base_url, 'POSTRAW', params)
end
delete_process_group(id = nil) click to toggle source
# File lib/nifi_sdk_ruby.rb, line 159
def delete_process_group(id = nil)

  if id.nil?
    raise ArgumentError.new('id is mandatory.')
  end

  base_url = @@base_url + '/process-groups/' << id << '?clientId=' << @@client_id + '&version=1'
  self.class.http_client(base_url, 'DELETE')
end
delete_template(id = nil) click to toggle source
# File lib/nifi_sdk_ruby.rb, line 355
def delete_template(id = nil)

  if id.nil?
    raise ArgumentError.new('id is mandatory.')
  end

  base_url = @@base_url + '/templates/' << id
  self.class.http_client(base_url, 'DELETE')
end
get_api_key() click to toggle source
# File lib/nifi_sdk_ruby.rb, line 85
def get_api_key
  @@api_key
end
get_async() click to toggle source
# File lib/nifi_sdk_ruby.rb, line 81
def get_async
  @@async
end
get_base_url() click to toggle source
# File lib/nifi_sdk_ruby.rb, line 97
def get_base_url
  @@base_url
end
get_conection_status(id = nil) click to toggle source
# File lib/nifi_sdk_ruby.rb, line 112
def get_conection_status(id = nil)

  if id.nil?
    raise ArgumentError.new('name params is mandatory.')
  end

  base_url = @@base_url + "/flow/connections/#{id}/status"
  self.class.http_client(base_url)
end
get_conection_status_history(id = nil) click to toggle source
# File lib/nifi_sdk_ruby.rb, line 122
def get_conection_status_history(id = nil)

  if id.nil?
    raise ArgumentError.new('name params is mandatory.')
  end

  base_url = @@base_url + "/flow/connections/#{id}/status/history"
  self.class.http_client(base_url)
end
get_debug() click to toggle source
# File lib/nifi_sdk_ruby.rb, line 65
def get_debug
  @@debug
end
get_flow_status() click to toggle source
# File lib/nifi_sdk_ruby.rb, line 106
def get_flow_status()

  base_url = @@base_url + '/flow/status'
  self.class.http_client(base_url)
end
get_host() click to toggle source
# File lib/nifi_sdk_ruby.rb, line 93
def get_host
  @@host
end
get_process(id = nil) click to toggle source
# File lib/nifi_sdk_ruby.rb, line 188
def get_process(id = nil)
  if id.nil?
    raise ArgumentError.new('id is mandatory.')
  end

  url = @@base_url + '/processors/' << id
  self.class.http_client(url)
end
get_process_group(id = nil) click to toggle source
# File lib/nifi_sdk_ruby.rb, line 132
def get_process_group(id = nil)

  process_group = id ? id : 'root'

  base_url = @@base_url + '/process-groups/' << process_group
  self.class.http_client(base_url)
end
get_process_group_by_name(name = nil) click to toggle source
# File lib/nifi_sdk_ruby.rb, line 169
def get_process_group_by_name(name = nil)

  if name.nil?
    raise ArgumentError.new('name is mandatory.')
  end

  res = self.class.exists

  pg = res.select do |r|
    r['name'] == name and r['identifier'] =~ /process-groups/
  end

  if pg.count == 1
    self.class.get pg[0]['identifier']
  else
    raise ArgumentError.new('Unable to locate group with name ' << name)
  end
end
get_resources() click to toggle source
# File lib/nifi_sdk_ruby.rb, line 101
def get_resources()
  base_url = @@base_url + '/resources'
  self.class.http_client(base_url)
end
get_schema() click to toggle source
# File lib/nifi_sdk_ruby.rb, line 89
def get_schema
  @@schema
end
get_template_by_name(name = nil) click to toggle source
# File lib/nifi_sdk_ruby.rb, line 365
def get_template_by_name(name = nil)

  if name.nil?
    raise ArgumentError.new('name is mandatory.')
  end

  res = self.class.exists

  t = res.select do |r|
    r['name'] == name and r['identifier'] =~ /^\/templates\//
  end


  if t.count == 1
    t[0]['identifier'].scan(/\/templates\/(.*)/)[0][0]
  else
    raise StandardError.new('Unable to locate template with name ' << name)
  end

end
process_group_by_name?(name = nil) click to toggle source
# File lib/nifi_sdk_ruby.rb, line 277
def process_group_by_name?(name = nil)

  if name.nil?
    raise ArgumentError.new('name is mandatory.')
  end

  res = self.class.exists

  pg = res.select do |r|
    r['name'] == name and r['identifier'] =~ /process-groups/
  end

  pg.count == 1 ? true : false
end
set_async(async = nil) click to toggle source
# File lib/nifi_sdk_ruby.rb, line 69
def set_async(async = nil)
  if async.nil?
    raise ArgumentError.new('missing async')
  end

  if !(!!async == async)
    raise TypeError.new('async must be a boolean')
  end

  @@async = async
end
set_debug(debug = nil) click to toggle source
# File lib/nifi_sdk_ruby.rb, line 53
def set_debug(debug = nil)
  if debug.nil?
    raise ArgumentError.new('missing debug')
  end

  if !(!!debug == debug)
    raise TypeError.new('debug must be a boolean')
  end

  @@debug = debug
end
start_process(*args) click to toggle source
# File lib/nifi_sdk_ruby.rb, line 197
def start_process(*args)
  args = args.reduce Hash.new, :merge
  if args[:id].nil? or args[:version].nil?
    raise ArgumentError.new('id and version params are mandatory')
  end

  id = args[:id].to_s
  version = args[:version].to_s

  params = {
    revision:{
      version: version
    },
    id: id,
    component:{
      id: id,
      state: 'RUNNING'
    },
    status:{
      runStatus: 'Running'
    }
  }
  update_process(id: id, update_json: params)
end
stop_process(*args) click to toggle source
# File lib/nifi_sdk_ruby.rb, line 222
def stop_process(*args)
  args = args.reduce Hash.new, :merge
  if args[:id].nil? or args[:version].nil?
    raise ArgumentError.new('id and version params are mandatory')
  end

  id = args[:id].to_s
  version = args[:version].to_s

  params = {
    revision:{
      version: version
    },
    id: id,
    component:{
      id: id,
      state: 'STOPPED'
    },
    status:{
      runStatus: 'Stopped'
    }
  }
  update_process(id: id, update_json: params)
end
template_by_id?(id = nil) click to toggle source
# File lib/nifi_sdk_ruby.rb, line 401
def template_by_id?(id = nil)

  if id.nil?
    raise ArgumentError.new('id is mandatory.')
  end

  res = self.class.exists

  pg = res.select do |r|
    r['identifier'] == '/templates/' << id
  end

  pg.count == 1 ? true : false
end
template_by_name?(name = nil) click to toggle source
# File lib/nifi_sdk_ruby.rb, line 386
def template_by_name?(name = nil)

  if name.nil?
    raise ArgumentError.new('name is mandatory.')
  end

  res = self.class.exists

  pg = res.select do |r|
    r['name'] == name and r['identifier'] =~ /templates/
  end

  pg.count == 1 ? true : false
end
update_process(*args) click to toggle source

NiFi has a classic Optimistic Locking pattern implementation which means that your call will only be accepted and processed as valid if you specify the right version of the component in the revision -> version field

ArgumentError is raised if you haven't specified required arguments or they are invalid

Updates the process by id using given JSON string or Hash Params: :id => the id of the process :update_json => json with updated values, could be either a JSON string or Ruby Hash

# File lib/nifi_sdk_ruby.rb, line 257
def update_process(*args)
  args = args.reduce Hash.new, :merge
  if args[:id].nil? or args[:update_json].nil?
    raise ArgumentError.new('id and update_json params are mandatory')
  end

  id = args[:id].to_s
  case args[:update_json]
    when Hash
      params = args[:update_json].to_json
    when String
      params = args[:update_json]
    else
      raise ArgumentError.new('update_json param must be either a Hash or a String')
  end

  base_url = @@base_url + '/processors/' << id
  self.class.http_client(base_url, 'PUT', params)
end
upload_template(*args) click to toggle source
# File lib/nifi_sdk_ruby.rb, line 315
def upload_template(*args)

  args = args.reduce Hash.new, :merge

  if args[:path].nil?
    raise ArgumentError.new('path params is mandatory.')
  end
  path = args[:path]

  if path =~ URI::regexp

    download_s = open(path)
    download_t = '/tmp/' << download_s.base_uri.to_s.split('/')[-1]
    IO.copy_stream(download_s, download_t)
    path = download_t
  end

  if not File.file? path or not File.readable? path
    raise IOError.new('Access to ' <<path << ' failed')
  end

  t = File.open(path) { |f| Nokogiri::XML(f) }
  name = t.xpath('//template/name').text

  if self.template_by_name? name
    self.delete_template self.get_template_by_name name
    #raise StandardError.new('The template ' << name << ' already exists')
  end

  params = Array.new
  params << Curl::PostField.file('template', path)

  process_group = args[:id] ? args[:id] : 'root'

  base_url = @@base_url + '/process-groups/' << process_group << '/templates/upload'
  res = self.class.http_client(base_url, 'POST', params)

  return res['templateEntity']['template']
end