class Fluent::FeedlyInput
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_feedly.rb, line 21 def initialize require 'feedlr' require 'digest/sha2' super end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_feedly.rb, line 28 def configure(conf) super if not @fetch_count >= 20 && @fetch_count <= 10000 raise Fluent::ConfigError, "Feedly: fetch_count param (#{@fetch_count}) should be between 20 and 10000." end @client = Feedlr::Client.new( oauth_access_token: @access_token, sandbox: @enable_sandbox ) end
fetch()
click to toggle source
# File lib/fluent/plugin/in_feedly.rb, line 69 def fetch @profile_id ||= @client.user_profile.id @state_store ||= StateStore.new(@state_file) @subscribe_categories.each do |category_name| category_id = "user/#{@profile_id}/category/#{category_name}" fetch_time_range = get_fetch_time_range loop do request_option = { count: @fetch_count, continuation: get_continuation_id, newerThan: fetch_time_range } cursor = @client.stream_entries_contents(category_id, request_option) if cursor.items.nil? return raise Feedlr::Error::ServerError, cursor end cursor.items.each do |item| Engine.emit(@tag, Engine.now, item) end log.info "Feedly: fetched articles.", articles: cursor.items.size, request_option: request_option set_continuation_id(cursor.continuation) break if get_continuation_id.nil? end end end
get_continuation_id()
click to toggle source
# File lib/fluent/plugin/in_feedly.rb, line 113 def get_continuation_id record = @state_store.get('continuation') if subscribe_categories_hash == record[:subscribe_categories_hash] return record[:id] else return nil end end
get_fetch_time_range()
click to toggle source
# File lib/fluent/plugin/in_feedly.rb, line 91 def get_fetch_time_range if @initial_loop @initial_loop = false range = @fetch_time_range_on_startup else range = @fetch_time_range end return (Time.now.to_i - range ) * 1000 end
run()
click to toggle source
# File lib/fluent/plugin/in_feedly.rb, line 49 def run @initial_loop = true loop do begin fetch rescue Feedlr::Error::Unauthorized, Feedlr::Error::Forbidden => e log.error "Feedly: unrecoverable error has occoured.", error: e.message, error_class: e.class log.error_backtrace e.backtrace break rescue => e log.error "Feedly: error has occoured. trying to retry after #{@run_interval} seconds.", error: e.message, error_class: e.class log.error_backtrace e.backtrace sleep @run_interval retry end sleep @run_interval end log.error "Feedly: stopped fetching process due to the previous error." end
set_continuation_id(continuation_id)
click to toggle source
# File lib/fluent/plugin/in_feedly.rb, line 105 def set_continuation_id(continuation_id) @state_store.set("continuation", { id: continuation_id, subscribe_categories_hash: subscribe_categories_hash }) @state_store.update! end
shutdown()
click to toggle source
# File lib/fluent/plugin/in_feedly.rb, line 45 def shutdown Thread.kill(@thread) end
start()
click to toggle source
# File lib/fluent/plugin/in_feedly.rb, line 41 def start @thread = Thread.new(&method(:run)) end
subscribe_categories_hash()
click to toggle source
# File lib/fluent/plugin/in_feedly.rb, line 101 def subscribe_categories_hash Digest::SHA512.digest(@subscribe_categories.sort.join('')) end