class Fluent::Plugin::GithubActivities::Crawler

Constants

DEFAULT_INTERVAL
MERGE_COMMIT_MESSAGE_PATTERN
NO_INTERVAL

Attributes

interval_for_next_request[R]
log[R]
on_emit[W]
request_queue[R]
running[R]

Public Class Methods

new(options={})
# File lib/fluent/plugin/github-activities/crawler.rb, line 43
# Creates a crawler configured from +options+.
#
# Keys read from +options+:
#   :watching_users   - users to crawl; also handed to UsersManager (default: [])
#   :pos_storage      - handed to UsersManager for saved positions
#   :access_token     - stored for http_get's Authorization header
#   :include_commits_from_pull_request, :include_foreign_commits - filter flags
#   :request_queue    - initial queue of pending requests (default: [])
#   :default_interval - wait between requests (default: DEFAULT_INTERVAL)
#   :log              - logger used for debug/trace/error output
def initialize(options={})
  watching_users, pos_storage = options.values_at(:watching_users, :pos_storage)
  @users_manager = UsersManager.new(users: watching_users, pos_storage: pos_storage)

  @access_token   = options[:access_token]
  @watching_users = watching_users || []

  @include_commits_from_pull_request = options[:include_commits_from_pull_request]
  @include_foreign_commits           = options[:include_foreign_commits]

  @request_queue = options[:request_queue] || []

  @default_interval          = options[:default_interval] || DEFAULT_INTERVAL
  @interval_for_next_request = @default_interval

  @log     = options[:log] # Fluent::PluginLogger instance
  @running = true
end

Public Instance Methods

extra_request_headers(request)
# File lib/fluent/plugin/github-activities/crawler.rb, line 131
# Builds the conditional-request headers for +request+.
#
# An entity tag carried on the request itself (:previous_entity_tag) takes
# precedence. Otherwise, for events requests, the "entity_tag" stored in the
# user's saved position is used. Returns a new Hash, which is empty when no
# entity tag is available.
def extra_request_headers(request)
  entity_tag =
    if request[:previous_entity_tag]
      request[:previous_entity_tag]
    elsif request[:type] == TYPE_EVENTS
      position = @users_manager.position_for(request[:user])
      position["entity_tag"] if position
    end
  return {} unless entity_tag
  { "If-None-Match" => entity_tag }
end
process_commit(commit, push_event)
# File lib/fluent/plugin/github-activities/crawler.rb, line 213
# Handles one commit fetched for a push event.
#
# The commit is emitted as a "commit" record when its author login is known
# and either foreign commits are included or the author is a watched user.
# The commit is then attached to the matching entry in the push event's
# payload. Once every entry has its commit attached, the push event itself is
# emitted as "push".
#
# @param commit [Hash] commit JSON, or the placeholder built for failed fetches
# @param push_event [Hash] the PushEvent the commit belongs to
def process_commit(commit, push_event)
  log.debug("GithubActivities::Crawler: processing commit #{commit["sha"]}")
  # "author" may be null in commit JSON (e.g. when the commit email is not
  # linked to a GitHub account). Treat that as an unknown author. Raising
  # here would skip attaching the commit below, so the "push" record would
  # never be emitted.
  author = commit["author"]
  user = author["login"] if author

  if user && (@include_foreign_commits || watching_user?(user))
    commit[RELATED_USER_IMAGE_KEY] = push_event["actor"]["avatar_url"]
    organization = push_event["org"]
    commit[RELATED_ORGANIZATION_IMAGE_KEY] = organization["avatar_url"] if organization
    commit[RELATED_EVENT] = push_event.reject { |key, _| key == "payload" }
    emit("commit", commit)
  end

  commit_refs = push_event["payload"]["commits"]
  target_commit_ref = commit_refs.find { |commit_ref| commit_ref["sha"] == commit["sha"] }
  target_commit_ref["commit"] = commit if target_commit_ref

  # The push is complete once every commit ref has its fetched commit.
  emit("push", push_event) if commit_refs.all? { |commit_ref| commit_ref["commit"] }
end
process_create_event(event)
# File lib/fluent/plugin/github-activities/crawler.rb, line 301
# Emits a CreateEvent as a "branch" or "tag" record, chosen by the payload's
# "ref_type". Any other ref type is ignored, and nil is returned.
def process_create_event(event)
  ref_type = event["payload"]["ref_type"]
  emit(ref_type, event) if %w[branch tag].include?(ref_type)
end
process_issue_event(event)
# File lib/fluent/plugin/github-activities/crawler.rb, line 242
# Emits an IssuesEvent under a tag derived from its payload "action".
# Actions without an entry in the table below are ignored, and nil is
# returned.
def process_issue_event(event)
  tag = {
    "opened"     => "issue-open",
    "closed"     => "issue-close",
    "reopened"   => "issue-reopen",
    "assigned"   => "issue-assign",
    "unassigned" => "issue-unassign",
    "labeled"    => "issue-label",
    "unlabeled"  => "issue-unlabel",
  }[event["payload"]["action"]]
  emit(tag, event) if tag
end
process_issue_or_pull_request_comment_event(event)
# File lib/fluent/plugin/github-activities/crawler.rb, line 291
# Emits an IssueCommentEvent. The tag is "pull-request-comment" when the
# commented issue carries a "pull_request" entry, and "issue-comment"
# otherwise.
def process_issue_or_pull_request_comment_event(event)
  on_pull_request = event["payload"]["issue"]["pull_request"]
  emit(on_pull_request ? "pull-request-comment" : "issue-comment", event)
end
process_pull_request_event(event)
# File lib/fluent/plugin/github-activities/crawler.rb, line 262
# Emits a PullRequestEvent under a tag chosen from its payload "action".
# A closed pull request is reported as merged or cancelled according to
# payload["pull_request"]["merged"]. Other actions are ignored, and nil is
# returned.
def process_pull_request_event(event)
  payload = event["payload"]
  tag =
    case payload["action"]
    when "opened"   then "pull-request"
    when "reopened" then "pull-request-reopen"
    when "closed"
      payload["pull_request"]["merged"] ? "pull-request-merged" : "pull-request-cancelled"
    end
  emit(tag, event) if tag
end
process_push_event(event)
# File lib/fluent/plugin/github-activities/crawler.rb, line 196
# Queues one TYPE_COMMIT request per commit ref in a PushEvent, so that each
# commit can be fetched and handed to process_commit later.
#
# Nothing is queued once the crawler is stopped. Nothing is queued either
# when commits from pull requests are excluded and the push looks like a
# pull-request merge. Requests are queued in reverse payload order.
def process_push_event(event)
  return unless @running
  commits = event["payload"]["commits"]
  skip_merge = !@include_commits_from_pull_request &&
               push_event_from_merged_pull_request?(event)
  return if skip_merge

  commits.reverse.each do |ref|
    @request_queue.push(type: TYPE_COMMIT, uri: ref["url"], sha: ref["sha"], push: event)
  end
end
process_request()
# File lib/fluent/plugin/github-activities/crawler.rb, line 63
# Takes the next request off the queue and executes it.
#
# Returns false in two cases:
# - the queue is empty;
# - the request is not due yet (its :process_after time is in the future).
#   It is pushed back onto the end of the queue, and the next interval is
#   set to NO_INTERVAL.
# Returns true after a request has been sent and its response handled.
#
# Events requests are re-reserved after every response, including error
# responses, so one failed API call does not stop polling that user. A commit
# request whose fetch fails is completed with a placeholder commit, so the
# owning push event can still be emitted.
#
# Any StandardError is logged with log.error. In that case the return value
# is whatever log.error returns.
def process_request
  request = @request_queue.shift
  if request.nil?
    # An empty queue used to reach `nil[:process_after]` and be reported as
    # a NoMethodError. Wait the normal interval instead.
    @interval_for_next_request = @default_interval
    return false
  end

  log.debug("GithubActivities::Crawler: processing request: #{request.inspect}")
  if request[:process_after] && Time.now.to_i < request[:process_after]
    # Not due yet: requeue it and let the next request run without waiting.
    @request_queue.push(request)
    @interval_for_next_request = NO_INTERVAL
    return false
  end

  uri = request_uri(request)
  extra_headers = extra_request_headers(request)

  log.debug("GithubActivities::Crawler: requesting to #{uri.inspect}")
  response = http_get(uri, extra_headers)
  log.debug("GithubActivities::Crawler: response: #{response.inspect}")

  case response
  when Net::HTTPSuccess
    log.trace("GithubActivities::Crawler: Net::HTTPSuccess / request type: #{request[:type]}")
    body = JSON.parse(response.body)
    case request[:type]
    when TYPE_EVENTS
      events = body
      log.trace("GithubActivities::Crawler: events size: #{events.size}")
      process_user_events(request[:user], events)
      reserve_user_events(request[:user], previous_response: response)
      @users_manager.save_position_for(request[:user], entity_tag: response["ETag"])
    when TYPE_COMMIT
      process_commit(body, request[:push])
    end
  when Net::HTTPNotModified
    log.trace("GithubActivities::Crawler: Net::HTTPNotModified / request type: #{request[:type]}")
    case request[:type]
    when TYPE_EVENTS
      reserve_user_events(request[:user],
                          previous_response: response,
                          previous_entity_tag: extra_headers["If-None-Match"])
    end
  else
    log.trace("GithubActivities::Crawler: UnknownType / request type: #{request[:type]}")
    case request[:type]
    when TYPE_EVENTS
      # Without this, an error response (rate limit, 5xx, ...) dropped the
      # user's events request for good. The response is passed along just
      # as the other branches pass theirs.
      reserve_user_events(request[:user], previous_response: response)
    when TYPE_COMMIT
      fake_body = {
        "sha"    => request[:sha],
        "author" => {},
      }
      process_commit(fake_body, request[:push])
    end
  end
  @interval_for_next_request = @default_interval
  true
rescue StandardError => error
  log.error(error.inspect)
end
process_user_event(user, event)
# File lib/fluent/plugin/github-activities/crawler.rb, line 168
# Dispatches a single GitHub event to the matching handler.
#
# Before dispatching, the actor's avatar URL is copied onto the event under
# RELATED_USER_IMAGE_KEY. When the event has an organization, its avatar URL
# is copied under RELATED_ORGANIZATION_IMAGE_KEY. Event types without a
# dedicated handler are emitted with their raw type as the tag. Any
# StandardError is logged with log.fatal and not re-raised. +user+ is not
# used here.
# Event types: https://developer.github.com/v3/activity/events/types/
def process_user_event(user, event)
  event[RELATED_USER_IMAGE_KEY] = event["actor"]["avatar_url"]
  organization = event["org"]
  event[RELATED_ORGANIZATION_IMAGE_KEY] = organization["avatar_url"] if organization

  type = event["type"]
  case type
  when "PushEvent"          then process_push_event(event)
  when "IssuesEvent"        then process_issue_event(event)
  when "IssueCommentEvent"  then process_issue_or_pull_request_comment_event(event)
  when "PullRequestEvent"   then process_pull_request_event(event)
  when "CreateEvent"        then process_create_event(event)
  when "CommitCommentEvent" then emit("commit-comment", event)
  when "ForkEvent"          then emit("fork", event)
  else                           emit(type, event)
  end
rescue StandardError => error
  log.fatal(error)
end
process_user_events(user, events)
# File lib/fluent/plugin/github-activities/crawler.rb, line 150
# Processes a batch of events fetched for +user+, skipping ones already seen.
#
# An event is skipped when its "created_at" (parsed to epoch seconds) is not
# newer than the user's saved "last_event_timestamp". When no timestamp is
# saved, UsersManager::DEFAULT_LAST_EVENT_TIMESTAMP is used. The remaining
# events go to process_user_event oldest first, and the position is saved
# after each one. The last saved timestamp is therefore the newest processed
# event.
#
# @param user [String] the watched user the events belong to
# @param events [Array<Hash>] event JSON objects
def process_user_events(user, events)
  last_event_timestamp = UsersManager::DEFAULT_LAST_EVENT_TIMESTAMP
  position = @users_manager.position_for(user)
  if position && position["last_event_timestamp"]
    last_event_timestamp = position["last_event_timestamp"]
  end

  # Oldest first, so each save_position_for call moves the saved timestamp
  # forward. The previous newest-first order left the *oldest* new event as
  # the final saved position. Unless save_position_for keeps the maximum,
  # newer events from this batch could then be processed again on the next
  # poll. "created_at" is compared as a string, as before (assumes uniformly
  # formatted ISO 8601 UTC timestamps).
  events.sort_by { |event| event["created_at"] }.each do |event|
    timestamp = Time.parse(event["created_at"]).to_i
    next if timestamp <= last_event_timestamp
    process_user_event(user, event)
    @users_manager.save_position_for(user, last_event_timestamp: timestamp)
  end
end
push_event_from_merged_pull_request?(event)
# File lib/fluent/plugin/github-activities/crawler.rb, line 280
# Returns true when a PushEvent looks like the result of merging a pull
# request, i.e. the message of its last commit matches
# MERGE_COMMIT_MESSAGE_PATTERN.
#
# A push without commits, or whose last commit has no message, is not a
# merge. Previously an empty commit list raised NoMethodError on
# `nil["message"]`.
def push_event_from_merged_pull_request?(event)
  last_commit = (event["payload"]["commits"] || []).last
  return false unless last_commit

  message = last_commit["message"]
  return false unless message

  MERGE_COMMIT_MESSAGE_PATTERN.match?(message)
end
request_uri(request)
# File lib/fluent/plugin/github-activities/crawler.rb, line 121
# Returns the URI to fetch for +request+: the user's public events feed for
# events requests, and the request's own :uri for anything else.
def request_uri(request)
  case request[:type]
  when TYPE_EVENTS then user_activities(request[:user])
  else request[:uri]
  end
end
reserve_user_events(user, options={})
# File lib/fluent/plugin/github-activities/crawler.rb, line 145
# Builds the next events request for +user+ through UsersManager and queues
# it. +options+ is passed to new_events_request unchanged. The request is
# always built, but it is only queued while the crawler is running. Returns
# the queue when the request was queued, and nil otherwise.
def reserve_user_events(user, options={})
  next_request = @users_manager.new_events_request(user, options)
  return unless @running
  @request_queue.push(next_request)
end
stop()
# File lib/fluent/plugin/github-activities/crawler.rb, line 311
# Marks the crawler as stopped. After this, reserve_user_events and
# process_push_event no longer add requests to the queue. Requests already in
# the queue are left untouched.
def stop
  @running = false
end
watching_user?(user)
# File lib/fluent/plugin/github-activities/crawler.rb, line 238
# True when +user+ is one of the configured watching users.
def watching_user?(user)
  @watching_users.member?(user)
end

Private Instance Methods

emit(tag, record)
# File lib/fluent/plugin/github-activities/crawler.rb, line 324
# Traces the record, then hands it to the on_emit callback under +tag+.
# Without a callback the record is only traced and nil is returned.
# Otherwise the callback's return value is returned.
def emit(tag, record)
  log.trace("GithubActivities::Crawler: emit => #{tag}, #{record.inspect}")
  callback = @on_emit
  return unless callback
  callback.call(tag, record)
end
http_get(uri, extra_headers={})
# File lib/fluent/plugin/github-activities/crawler.rb, line 329
# Sends a GET request to +uri+ and returns the Net::HTTPResponse.
#
# +extra_headers+ are sent with the request. When an access token is
# configured, an "Authorization: token ..." header is added to a copy, so
# the caller's hash is not modified. TLS is used for https URIs.
def http_get(uri, extra_headers={})
  parsed_uri = URI(uri)
  headers = extra_headers.dup
  headers["Authorization"] = "token #{@access_token}" if @access_token

  http = Net::HTTP.new(parsed_uri.host, parsed_uri.port)
  http.use_ssl = parsed_uri.is_a?(URI::HTTPS)
  http.start do |connection|
    # request_uri keeps the query string, which #path alone dropped. It also
    # yields "/" for an empty path, which Net::HTTP::Get.new would reject.
    connection.request(Net::HTTP::Get.new(parsed_uri.request_uri, headers))
  end
end
user_activities(user)
# File lib/fluent/plugin/github-activities/crawler.rb, line 316
# URL of +user+'s public events feed on the GitHub API.
def user_activities(user)
  format("https://api.github.com/users/%s/events/public", user)
end
user_info(user)
# File lib/fluent/plugin/github-activities/crawler.rb, line 320
# URL of +user+'s profile resource on the GitHub API.
def user_info(user)
  format("https://api.github.com/users/%s", user)
end