class Kuroko2::ExecutionLogger::CloudWatchLogs

Constants

MAX_RETRY_COUNT
RETRY_ERRORS

Attributes

client[R]

Public Class Methods

new(stream_name:, group_name:, region: 'ap-northeast-1') click to toggle source
# File lib/autoload/kuroko2/execution_logger/cloud_watch_logs.rb, line 12
def initialize(stream_name:, group_name:, region: 'ap-northeast-1')
  @client = Aws::CloudWatchLogs::Client.new(region: region)

  @group_name    = group_name
  @stream_name   = stream_name
  @put_log_token = nil
  @get_log_token = nil
end

Public Instance Methods

get_logs(token = @get_log_token) click to toggle source
# File lib/autoload/kuroko2/execution_logger/cloud_watch_logs.rb, line 65
def get_logs(token = @get_log_token)
  response = client.get_log_events({
    log_group_name: @group_name,
    log_stream_name: @stream_name,
    next_token: token,
    start_from_head: true,
  })

  @get_log_token = response.next_forward_token
  response
rescue Aws::CloudWatchLogs::Errors::ResourceNotFoundException
  raise ExecutionLogger::NotFound
end
put_logs(events) click to toggle source
# File lib/autoload/kuroko2/execution_logger/cloud_watch_logs.rb, line 25
def put_logs(events)
  exception_cb = lambda do |exception|
    Kuroko2.logger.warn("#{exception.class} #{exception.message} #{events}")

    case exception
    when Aws::CloudWatchLogs::Errors::InvalidSequenceTokenException
      old_token = @put_log_token
      new_token = exception.message.match(%r{\AThe given sequenceToken is invalid. The next expected sequenceToken is:\s*(\w+)\z})[1]
      if new_token
        @put_log_token = new_token
        Kuroko2.logger.warn("Refreshed sequenceToken from '#{old_token}' to '#{@put_log_token}'")
      end
    when Aws::CloudWatchLogs::Errors::ResourceNotFoundException
      create_log_stream
    when Aws::CloudWatchLogs::Errors::ThrottlingException
      sleep(0.5)
    end
  end

  retry_options = {
    exception_cb: exception_cb,
    on: RETRY_ERRORS,
    tries: MAX_RETRY_COUNT,
    sleep: 0,
  }

  Retryable.retryable(retry_options) do
    response = client.put_log_events(
      log_group_name: @group_name,
      log_stream_name: @stream_name,
      log_events: events,
      sequence_token: @put_log_token,
    )
    @put_log_token = response.data[:next_sequence_token]

    Kuroko2.logger.debug("Put logs: #{@group_name} #{@stream_name} / #{response.data}")
    response
  end
end
send_log(message) click to toggle source
# File lib/autoload/kuroko2/execution_logger/cloud_watch_logs.rb, line 21
def send_log(message)
  put_logs([{ timestamp: timestamp_now, message: message.to_json }])
end

Private Instance Methods

create_log_stream() click to toggle source
# File lib/autoload/kuroko2/execution_logger/cloud_watch_logs.rb, line 85
def create_log_stream
  Kuroko2.logger.info("Create log stream: #{@group_name} #{@stream_name}")
  client.create_log_stream(log_group_name: @group_name, log_stream_name: @stream_name)
rescue Aws::CloudWatchLogs::Errors::ResourceAlreadyExistsException
  warn "Log stream '#{@stream_name}' already exists"
end
timestamp_now() click to toggle source
# File lib/autoload/kuroko2/execution_logger/cloud_watch_logs.rb, line 81
def timestamp_now
  (Time.current.to_f * 1000).to_i # milliseconds
end