class Fluent::Plugin::GithubActivitiesInput

Constants

DEFAULT_BASE_TAG
DEFAULT_CLIENTS
DEFAULT_STORAGE_TYPE

Public Instance Methods

configure(conf)
Calls the superclass implementation via `super` before applying the plugin-specific setup.
# File lib/fluent/plugin/in_github-activities.rb, line 50
# Finalizes the plugin settings once the superclass has processed +conf+.
#
# In order, this:
# * removes a single trailing "." from @base_tag,
# * appends the names returned by load_users_list to @users,
# * sets @n_clients to the smaller of @clients and the number of users,
# * multiplies @interval by @n_clients
#   (NOTE(review): presumably to keep the combined request rate of all
#   clients constant -- confirm),
# * creates @pos_storage from the one configured <storage> section.
#
# @param conf the configuration element; forwarded to the superclass by the
#   bare +super+ call
# @raise [Fluent::ConfigError] unless @storage_configs has exactly one entry
def configure(conf)
  super

  @base_tag = @base_tag.sub(/\.\z/, "")
  @users = @users + load_users_list
  @n_clients = [@clients, @users.size].min
  @interval *= @n_clients

  # The check deliberately stays after the assignments above so the
  # instance state at the point of failure is unchanged.
  if @storage_configs.size != 1
    raise Fluent::ConfigError, "You can define <storage> section at once"
  end

  section = @storage_configs.first
  @pos_storage = storage_create(
    usage: section.usage,
    conf: section.corresponding_config_element,
    default_type: DEFAULT_STORAGE_TYPE
  )
end
shutdown()
Calls the superclass implementation via `super` after the request queue has drained and the crawlers have been stopped.
# File lib/fluent/plugin/in_github-activities.rb, line 106
# Stops the plugin.
#
# Blocks until @request_queue is empty, logging the remaining queue size at
# debug level and sleeping @interval seconds between checks. Then calls
# +stop+ on every crawler in @crawlers and finally hands over to the
# superclass.
def shutdown
  loop do
    break if @request_queue.empty?

    log.debug(queue_size: @request_queue.size)
    sleep(@interval)
  end

  @crawlers.each { |crawler| crawler.stop }
  super
end
start()
Calls the superclass implementation via `super` before creating the request queue and the crawler threads.
# File lib/fluent/plugin/in_github-activities.rb, line 65
# Starts the plugin.
#
# After the superclass has started, this creates an empty request queue and
# crawler list, pushes every request returned by
# UsersManager#generate_initial_requests onto the queue, and then creates
# @n_clients threads through +thread_create+. Each thread builds its own
# Crawler, registers it in @crawlers, wires its +on_emit+ callback to the
# router (tag prefixed with @base_tag), and keeps processing requests until
# the crawler reports that it is no longer running.
def start
  super

  @request_queue = Queue.new
  @crawlers = []

  manager = ::Fluent::Plugin::GithubActivities::UsersManager.new({
    users: @users,
    pos_storage: @pos_storage,
  })
  manager.generate_initial_requests.each { |request| @request_queue.push(request) }

  @n_clients.times do |index|
    thread_create(:"in_github_activity_#{index}") do
      crawler = ::Fluent::Plugin::GithubActivities::Crawler.new({
        access_token: @access_token,
        watching_users: @users,
        include_commits_from_pull_request: @include_commits_from_pull_request,
        include_foreign_commits: @include_foreign_commits,
        pos_storage: @pos_storage,
        request_queue: @request_queue,
        default_interval: @interval,
        log: log
      })
      @crawlers << crawler

      # Events are emitted under "<base_tag>.<tag>" with the current engine time.
      crawler.on_emit = ->(tag, record) do
        router.emit("#{@base_tag}.#{tag}", Engine.now, record)
      end

      # Always process one request first; the running flag is only checked
      # afterwards, and the wait time is asked from the crawler each round.
      loop do
        crawler.process_request
        break unless crawler.running

        sleep(crawler.interval_for_next_request)
      end
    end
  end
end

Private Instance Methods

load_users_list()
# File lib/fluent/plugin/in_github-activities.rb, line 117
# Reads user names from the file named by @users_list.
#
# The file is split on "\n"; each entry has surrounding whitespace stripped
# and entries that end up empty are dropped.
#
# @return [Array<String>] the names found, or an empty array when
#   @users_list is not set or the file does not exist
def load_users_list
  return [] unless @users_list

  path = Pathname(@users_list)
  return [] unless path.exist?

  path.read.split("\n").map(&:strip).reject(&:empty?)
end