class Eloqua::Query

Usage:

client = Eloqua::BulkClient.new({ ... })
client.query("My Query/Export Name").

select("Activity.Asset.Id AS AssetId", "Activity.Id AS ActivityId").
from("activities").
where("Activity.CreatedAt > 2014-08-01", "Activity.Type = EmailOpen").
limit(3).
retain(3600).
execute().
wait(30).each do |row|
   puts row
end

If anything goes wrong at any stage of the chain, a Query::Error exception is raised with a human-readable error message, accompanied by the full HTTP response and status code when appropriate.

Public Class Methods

new(name, client) click to toggle source
# File lib/eloqua_api/bulk/query.rb, line 32
# Builds an empty query bound to a client.
#
# name   - stored as the export name (emitted by #to_h under 'name').
# client - object the query sends its define/retrieve/sync calls to.
def initialize(name, client)
  @name, @client = name, client

  # Nothing selected or filtered yet.
  @fields, @filter = {}, []

  # A retain of 0 means #to_h leaves 'secondsToRetainData' out.
  @retain = 0
  @resource = nil

  # Page size passed as :limit when rows are retrieved.
  @limit = 50_000

  # Set by #execute and #wait respectively.
  @uri = nil
  @sync = false
end

Public Instance Methods

count() click to toggle source
# File lib/eloqua_api/bulk/query.rb, line 100
# Returns the export's 'totalResults' value as an Integer.
#
# Raises Error if #wait has not completed successfully, or if the
# retrieve call does not answer 200 with a 'totalResults' entry.
def count
  raise Error, 'Wait must be called before calling count.' unless @sync

  reply = @client.send(retrieve_export_method, @uri, :limit => @limit)

  # Only look at the parsed body when the status code is the expected one.
  total = reply.parsed_response['totalResults'] if reply.code == 200
  raise Error.new("Failed to get count.", reply) unless total

  total.to_i
end
delete() click to toggle source
# File lib/eloqua_api/bulk/query.rb, line 44
# Asks the client to delete the export and then its definition, both
# addressed by the URI obtained from #execute.
#
# Returns self. Raises Error when #execute has not produced a URI yet.
def delete
  if @uri.nil?
    raise Error, 'Execute must be called before calling delete.'
  end

  tap do
    @client.delete_export(@uri)
    @client.delete_export_definition(@uri)
  end
end
each() { |item| ... } click to toggle source
# File lib/eloqua_api/bulk/query.rb, line 111
# Yields every item of the export to the block, fetching one page of
# @limit rows at a time and advancing the offset while the response
# reports 'hasMore'.
#
# Returns self. Raises Error when no block is given, when #wait has not
# completed, or when a page request does not answer 200 with a
# 'totalResults' entry.
def each
  raise Error, 'No block provided.' unless block_given?
  raise Error, 'Wait must be called before calling each.' unless @sync

  offset = 0
  more = true

  while more
    reply = @client.send(retrieve_export_method, @uri, :offset => offset, :limit => @limit)
    unless reply.code == 200 && reply.parsed_response['totalResults']
      raise Error.new("Failed to get rows for each.", reply)
    end

    page = reply.parsed_response
    rows = page['items']
    # A page without an Array under 'items' is simply skipped.
    rows.each { |row| yield row } if rows.is_a?(Array)

    more = page['hasMore']
    offset += @limit if more
  end

  self
end
execute() click to toggle source
# File lib/eloqua_api/bulk/query.rb, line 53
# Sends the export definition (#to_h) to the client and remembers the
# URI of the created export.
#
# Returns self. Raises Error when called twice, when no resource is set
# or the client lacks the define/retrieve methods for it, or when the
# client does not answer 201 with a 'uri' entry.
def execute
  raise Error, 'Execute cannot be called more than once.' unless @uri.nil?

  supported = @resource &&
              @client.respond_to?(define_export_method) &&
              @client.respond_to?(retrieve_export_method)
  raise Error, 'A valid resource must be defined before calling execute.' unless supported

  reply = @client.send(define_export_method, to_h)

  # The body is only consulted when the definition was actually created.
  created_uri = reply.parsed_response['uri'] if reply.code == 201
  raise Error.new("Could not execute query.", reply) unless created_uri

  @uri = created_uri
  self
end
from(resource) click to toggle source
# File lib/eloqua_api/bulk/query.rb, line 152
# Chooses the resource to export. String arguments are stored
# lower-cased; anything else leaves the current resource untouched.
#
# Returns self.
def from(resource)
  case resource
  when String
    @resource = resource.downcase
  end

  self
end
limit(n) click to toggle source
# File lib/eloqua_api/bulk/query.rb, line 147
# Sets the page size (coerced with to_i) used when retrieving rows.
#
# Returns self.
def limit(n)
  tap { @limit = n.to_i }
end
or(*conditions) click to toggle source
# File lib/eloqua_api/bulk/query.rb, line 198
# Adds the given conditions as an OR group: delegates to #where and
# wraps the combined clause as "OR ( ... )".
#
# Returns self.
def or(*conditions)
  where(*conditions) { |clause| "OR ( #{clause} )" }
  self
end
retain(secs) click to toggle source
# File lib/eloqua_api/bulk/query.rb, line 157
# Sets how many seconds the exported data should be retained (coerced
# with to_i). #to_h only emits the value when it is greater than zero.
#
# Returns self.
def retain(secs)
  seconds = secs.to_i
  @retain = seconds
  self
end
select(*selectors) click to toggle source
# File lib/eloqua_api/bulk/query.rb, line 162
# Adds fields to the export definition.
#
# Each selector is either "Some.Field AS Alias" (stored under Alias) or
# a bare "Some.Field" (stored under the field name with dots removed).
# A three-token selector whose middle token is not AS is ignored.
#
# Returns self.
def select(*selectors)
  picked = selectors.each_with_object({}) do |selector, acc|
    source, operator, alias_name = expression(selector)

    if operator.nil?
      # Not an "x OP y" expression: treat the whole selector as the field.
      acc[selector.gsub('.', '')] = "{{#{selector}}}"
    elsif operator.upcase == 'AS'
      acc[alias_name] = "{{#{source}}}"
    end
  end

  @fields.merge!(picked) unless picked.empty?

  self
end
to_h() click to toggle source
# File lib/eloqua_api/bulk/query.rb, line 138
# Builds the export definition hash sent by #execute: name, fields and
# the filter groups joined with single spaces. 'secondsToRetainData' is
# included only when a positive retain value was set.
def to_h
  payload = {
    'name'   => @name,
    'fields' => @fields,
    'filter' => @filter.join(' ')
  }
  payload['secondsToRetainData'] = @retain if @retain > 0
  payload
end
wait(secs) click to toggle source
# File lib/eloqua_api/bulk/query.rb, line 67
# Starts a sync for the executed export (once) and polls its status
# until it reports "success".
#
# secs - maximum number of status polls; polls are spaced one second
#        apart, and no sleep happens after the final poll.
#
# Returns self once the sync succeeded (and marks the query as synced).
# Raises Error when #execute has not been called, when the sync cannot
# be created (anything but 201 with status 'pending'), or when no poll
# reported success within the allowed attempts.
def wait(secs)
  raise Error, 'Execute must be called before wait.' if @uri.nil?

  response = nil
  # NOTE(review): presumably keeps the ivar defined before the nil? check
  # below, since #initialize does not set it - confirm.
  @sync_uri ||= nil

  # Create the sync only on the first call; later calls resume polling
  # the sync that already exists.
  if @sync_uri.nil?
    response = @client.syncs(@uri)
    unless response.code == 201 && response.parsed_response['status'] == 'pending'
      raise Error.new("Could not sync.", response)
    end

    @sync_uri = response.parsed_response['uri']
  end

  attempts = 0
  while attempts < secs
    attempts += 1

    response = @client.syncs_status(@sync_uri)
    if response.code == 200 && response.parsed_response['status'] == "success"
      @sync = true
      return self
    end

    # Sleeping after the last poll would only delay the timeout error.
    sleep 1 if attempts < secs
  end

  raise Error.new("Could not sync in #{secs} seconds.", response)
end
where(*conditions) { |filter| ... } click to toggle source
# File lib/eloqua_api/bulk/query.rb, line 179
# Adds a group of conditions to the filter.
#
# Every condition of the form "Field OP Value" becomes
# "'{{Field}}'OP'Value'" (operator upper-cased); conditions that do not
# parse are skipped. The conditions of one call are joined with " AND ".
#
# With a block, the joined clause is yielded and the block's result is
# stored as the group, so the block supplies its own connector (this is
# how #or produces "OR ( ... )"). Without a block, the first group is
# stored as-is and every later group is prefixed with "AND ", so that
# chained calls do not leave two groups side by side with no boolean
# operator once #to_h joins them with spaces.
#
# Returns self.
def where(*conditions)
  clauses = []

  conditions.each do |condition|
    l, op, r = expression(condition)
    clauses << "'{{#{l}}}'#{op.upcase}'#{r}'" if l
  end

  return self if clauses.empty?

  joined = clauses.join(" AND ")
  if block_given?
    @filter << yield(joined)
  elsif @filter.empty?
    @filter << joined
  else
    @filter << "AND #{joined}"
  end

  self
end

Protected Instance Methods

define_export_method() click to toggle source
# File lib/eloqua_api/bulk/query.rb, line 208
# Name of the client method that defines an export for the current
# resource, e.g. :define_activities_export.
#
# The name is rebuilt on every call: caching it with ||= would pin the
# first resource seen (even an invalid one checked by a failed #execute)
# and ignore any later #from. The ivar is still assigned so that it
# reflects the latest value.
def define_export_method
  @define_export_method = :"define_#{@resource}_export"
end
expression(condition) click to toggle source
# File lib/eloqua_api/bulk/query.rb, line 216
# Splits "left OP right" into [left, op, right].
#
# The first two tokens end at the first run of whitespace; the remainder
# (which may itself contain spaces) is the right-hand side. Surrounding
# whitespace is stripped first: with leading whitespace the lazy first
# capture would otherwise match the empty string and shift every token
# one position to the right.
#
# Returns [nil, nil, nil] when the input does not have three parts.
def expression(condition)
  parts = condition.to_s.strip.match(/^(.*?)\s+(.*?)\s+(.*?)$/)
  parts ? parts.captures : [nil, nil, nil]
end
retrieve_export_method() click to toggle source
# File lib/eloqua_api/bulk/query.rb, line 212
# Name of the client method that retrieves export rows for the current
# resource, e.g. :retrieve_activities_export.
#
# The name is rebuilt on every call: caching it with ||= would pin the
# first resource seen (even an invalid one checked by a failed #execute)
# and ignore any later #from. The ivar is still assigned so that it
# reflects the latest value.
def retrieve_export_method
  @retrieve_export_method = :"retrieve_#{@resource}_export"
end