class GoodDataMarketo::Stream

Attributes

client[R]

Public Class Methods

new(web_method, request, config = {})
# File lib/gooddata_marketo/models/streams.rb, line 12
# Streams every chunk of a Marketo call into memory.
#
# Issues the first request through +start_stream+, then keeps calling
# +next_stream_request+ while the remaining count (@count) is positive.
# Once streaming ends, every chunk held in @storage (kept as a JSON string
# while streaming) is parsed in place into a Hash with symbolized keys.
#
# The helpers are invoked as regular instance methods of this class rather
# than being re-defined inside the constructor on every instantiation.
#
# @param web_method operation name handed through to the client's +call+
# @param request [Hash] request body sent with every call
# @param config [Hash] options; +start_stream+ reads +:client+ from it
def initialize(web_method, request, config = {})
  # Reset the failure counter before the first request is made;
  # next_stream_request compares against it when deciding whether to retry.
  @timeouts = 0

  start_stream(web_method, request, config)

  # to_i keeps the guard well-defined if start_stream returned without
  # setting @count (its rescue branch does not assign it).
  next_stream_request(web_method, request) while @count.to_i > 0

  @timer_end = Time.now
  if GoodDataMarketo.logging
    puts "#{@timer_end} => End Streaming: #{web_method}"
    puts "#{Time.now} => Stream duration: #{((@timer_end - @timer_start)/60).round} minutes."
  end

  # Replace each stored JSON string with its parsed Hash, in place.
  @storage.map!.with_index(1) do |chunk, position|
    puts "#{Time.now} => Hashing streams in storage: #{position}" if GoodDataMarketo.logging
    JSON.parse(chunk, :symbolize_names => true)
  end
end

Public Instance Methods

next_stream_request(web_method, request)

Fetches the next chunk of the stream, paging by offset or stream id depending on the stream response from Marketo.

# File lib/gooddata_marketo/models/streams.rb, line 58
# Fetches the next chunk of the stream and advances the paging state.
#
# The chunk is appended to @storage as a JSON string, @count is updated with
# the number of records still outstanding, and @stream_id or @offset is
# refreshed from the response so the following call continues from there.
#
# A failing client call is retried; every failure is counted in @timeouts and
# once three failures have been recorded the last error is re-raised instead
# of being retried forever.
#
# @param web_method operation name handed through to the client's +call+
# @param request [Hash] base request; it is copied, never mutated
# @raise [StandardError] the last client error once the retry budget is spent
# @raise [RuntimeError] when the client returns no response
def next_stream_request(web_method, request)
  # Work on a copy so paging keys never leak into the caller's hash.
  request = {}.merge(request)

  request[:stream_position] = @stream_id if @stream_id

  # Continue from the offset reported by the previous chunk. Writing the
  # offset only into the local copy after the call would lose it when the
  # method returns, so it is applied here from the instance state.
  request[:start_position] = { :offset => @offset } if @offset_monitor && @offset

  # Below 1000 outstanding records, ask only for what is left.
  request[:batch_size] = @count.to_s if @count < 1000

  begin
    chunk_from_stream = @client.call(web_method, request)
    raise "ERROR: No response from Marketo based on query: #{request}" unless chunk_from_stream
  rescue StandardError => e
    puts e if GoodDataMarketo.logging
    # Count the failure so the retry is bounded.
    @timeouts = @timeouts.to_i + 1
    retry if @timeouts < 3
    raise
  end

  @storage << chunk_from_stream.to_json

  # FOR GET LEAD ACTIVITIES. The return count and remaining count are not stacked. Request 100 and it says 8 remaining, request a batch of 8 and it says 100 remaining. No stream id.
  if chunk_from_stream[:return_count]
    @count -= chunk_from_stream[:return_count].to_i
  else
    @count = chunk_from_stream[:remaining_count].to_i
  end

  puts "#{Time.now} => Stream:#{web_method}:#{@count} remain." if GoodDataMarketo.logging

  new_stream_position = chunk_from_stream[:new_stream_position]
  if new_stream_position.is_a?(String)
    @stream_id = new_stream_position
    @stream_id_monitor = true
    @offset_monitor = false
  else
    @offset_monitor = true
    @stream_id_monitor = false
    @offset = chunk_from_stream[:new_start_position][:offset]
  end
end
start_stream(web_method, request, config)
# File lib/gooddata_marketo/models/streams.rb, line 14
# Makes the first request of a stream and records how paging must continue.
#
# Stores the client taken from +config[:client]+, resets @storage, and keeps
# the first response as a JSON string. When the response carries a String
# +:new_stream_position+ the stream is tracked by stream id; otherwise it is
# tracked by the +:offset+ found under +:new_start_position+. @count is set
# from +:remaining_count+.
#
# @param web_method operation name handed through to the client's +call+
# @param request [Hash] request body for the first call
# @param config [Hash] options; only +:client+ is read here
# @raise [RuntimeError] when the client returns no response
def start_stream(web_method, request, config)
  @timer_start = Time.now
  puts "#{@timer_start} => Start Streaming:#{web_method}" if GoodDataMarketo.logging

  @client = config[:client]
  @storage = []

  # The first response reveals which paging scheme Marketo uses for this request.
  first_chunk = @client.call(web_method, request)
  unless first_chunk
    raise "ERROR: No response from Marketo based on query: #{request}"
  end

  begin
    @storage << first_chunk.to_json

    if first_chunk[:new_stream_position].is_a?(String)
      # Paging by stream id.
      @stream_id = first_chunk[:new_stream_position]
      @stream_id_monitor, @offset_monitor = true, false
    else
      # Paging by offset.
      first_chunk.delete(:start_position)
      @offset_monitor, @stream_id_monitor = true, false
      @offset = first_chunk[:new_start_position][:offset]
    end

    @count = first_chunk[:remaining_count].to_i

    puts "#{Time.now} => Stream:#{web_method}:#{@count} remain." if GoodDataMarketo.logging
  rescue Timeout::Error => timeout_error
    client.load.log('TIMEOUT') if client.load
    puts timeout_error if GoodDataMarketo.logging
  end
end
storage()
# File lib/gooddata_marketo/models/streams.rb, line 127
# Chunks collected by the stream. While streaming each entry is a JSON
# string; once the constructor has finished, each entry has been parsed into
# a Hash with symbolized keys.
#
# @return [Array, nil] the collected chunks, or nil before streaming started
def storage; @storage; end