class Fluent::FeedlyInput

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_feedly.rb, line 21
# Loads the libraries this plugin relies on, then hands control to the
# superclass initializer.
def initialize
  # Required at instantiation time rather than at file load time.
  require 'digest/sha2'
  require 'feedlr'

  super
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_feedly.rb, line 28
# Validates the plugin settings and builds the Feedly API client.
#
# The superclass configure runs first; afterwards @fetch_count must lie in
# the inclusive range 20..10000, and a Feedlr::Client built from
# @access_token and @enable_sandbox is stored in @client.
#
# @param conf the configuration object, passed through to the superclass
# @raise [Fluent::ConfigError] when @fetch_count is outside 20..10000
def configure(conf)
  super

  unless @fetch_count.between?(20, 10000)
    raise Fluent::ConfigError, "Feedly: fetch_count param (#{@fetch_count}) should be between 20 and 10000."
  end

  @client = Feedlr::Client.new(oauth_access_token: @access_token, sandbox: @enable_sandbox)
end
fetch() click to toggle source
# File lib/fluent/plugin/in_feedly.rb, line 69
# Fetches entries for every subscribed category and emits them as events.
#
# The profile id and the state store are created on first use and cached in
# @profile_id and @state_store. Each category stream is requested page by
# page: every item of a page is emitted under @tag, and the continuation id
# returned with the page is persisted through set_continuation_id. Paging for
# a category stops once get_continuation_id returns nil.
#
# @raise [Feedlr::Error::ServerError] when a response has nil items
def fetch
  @profile_id ||= @client.user_profile.id
  @state_store ||= StateStore.new(@state_file)
  # Resolve the time window once per run. get_fetch_time_range clears
  # @initial_loop on its first call, so asking once per category would apply
  # the startup window to the first category only.
  fetch_time_range = get_fetch_time_range
  @subscribe_categories.each do |category_name|
    category_id = "user/#{@profile_id}/category/#{category_name}"
    loop do
      request_option = { count: @fetch_count, continuation: get_continuation_id, newerThan: fetch_time_range }
      cursor = @client.stream_entries_contents(category_id, request_option)
      # The response object itself is handed to the exception as its message.
      raise Feedlr::Error::ServerError, cursor if cursor.items.nil?

      cursor.items.each { |item| Engine.emit(@tag, Engine.now, item) }
      log.info "Feedly: fetched articles.", articles: cursor.items.size, request_option: request_option
      set_continuation_id(cursor.continuation)
      break if get_continuation_id.nil?
    end
  end
end
get_continuation_id() click to toggle source
# File lib/fluent/plugin/in_feedly.rb, line 113
# Returns the stored continuation id, or nil when the stored record was
# written for a different set of subscribed categories.
#
# The record is read from @state_store under the 'continuation' key and is
# only trusted when its :subscribe_categories_hash equals the current
# subscribe_categories_hash.
def get_continuation_id
  stored = @state_store.get('continuation')
  stored[:id] if subscribe_categories_hash == stored[:subscribe_categories_hash]
end
get_fetch_time_range() click to toggle source
# File lib/fluent/plugin/in_feedly.rb, line 91
# Returns the lower time bound for a fetch: the current epoch seconds minus
# the look-back window, multiplied by 1000.
#
# While @initial_loop is set, the window is @fetch_time_range_on_startup and
# the flag is cleared; otherwise the window is @fetch_time_range.
def get_fetch_time_range
  first_pass = @initial_loop
  @initial_loop = false if first_pass
  lookback = first_pass ? @fetch_time_range_on_startup : @fetch_time_range
  (Time.now.to_i - lookback) * 1000
end
run() click to toggle source
# File lib/fluent/plugin/in_feedly.rb, line 49
# Polling loop: calls fetch, waits @run_interval seconds, and repeats.
#
# @initial_loop is set before the first pass. An Unauthorized or Forbidden
# error from Feedlr is logged and ends the loop. Any other StandardError is
# logged and the loop carries on, so the next fetch happens after the usual
# @run_interval wait.
def run
  @initial_loop = true
  loop do
    begin
      fetch
    rescue Feedlr::Error::Unauthorized, Feedlr::Error::Forbidden => fatal
      log.error "Feedly: unrecoverable error has occoured.", error: fatal.message, error_class: fatal.class
      log.error_backtrace fatal.backtrace
      break
    rescue => failure
      log.error "Feedly: error has occoured. trying to retry after #{@run_interval} seconds.", error: failure.message, error_class: failure.class
      log.error_backtrace failure.backtrace
      # No explicit retry: the sleep below provides the wait, and the next
      # iteration calls fetch again.
    end
    sleep @run_interval
  end
  log.error "Feedly: stopped fetching process due to the previous error."
end
set_continuation_id(continuation_id) click to toggle source
# File lib/fluent/plugin/in_feedly.rb, line 105
# Stores the continuation id in @state_store under the "continuation" key,
# together with the current subscribe_categories_hash, then calls update! on
# the store.
#
# @param continuation_id the continuation id to remember (may be nil)
# @return whatever @state_store.update! returns
def set_continuation_id(continuation_id)
  record = {
    id: continuation_id,
    subscribe_categories_hash: subscribe_categories_hash
  }
  @state_store.set("continuation", record)
  @state_store.update!
end
shutdown() click to toggle source
# File lib/fluent/plugin/in_feedly.rb, line 45
# Stops the worker thread held in @thread.
#
# Does nothing when no thread has been stored, so calling shutdown without a
# preceding start is safe. Otherwise the thread is killed and then joined, so
# it has finished by the time this method returns.
#
# @return [Thread, nil] the stopped thread, or nil when there was none
def shutdown
  return unless @thread

  @thread.kill
  # kill only requests termination; join waits until the thread has exited.
  @thread.join
end
start() click to toggle source
# File lib/fluent/plugin/in_feedly.rb, line 41
# Spawns a thread that executes run and keeps it in @thread.
#
# @return [Thread] the newly created thread
def start
  @thread = Thread.new { run }
end
subscribe_categories_hash() click to toggle source
# File lib/fluent/plugin/in_feedly.rb, line 101
# Fingerprint of the configured categories: the raw SHA-512 digest of the
# sorted names in @subscribe_categories joined without a separator.
# Sorting makes the result independent of the configured order.
#
# @return [String] the binary digest
def subscribe_categories_hash
  canonical = @subscribe_categories.sort.join('')
  Digest::SHA512.digest(canonical)
end