class LogStash::Outputs::TreasureData

Constants

IMPORT_SIZE_LIMIT
RECORD_KEYS_LIMIT
RECORD_SIZE_LIMIT
UUID_FORMAT
VERSION
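
These limits are plain Ruby constants and can be read directly. A minimal sketch, assuming the gem is on the load path:

  require 'logstash/outputs/treasure_data'

  td = LogStash::Outputs::TreasureData
  puts td::VERSION
  puts td::RECORD_KEYS_LIMIT   # per-record key-count cap (see receive)
  puts td::RECORD_SIZE_LIMIT   # per-record serialized-size cap (see receive)
  puts td::IMPORT_SIZE_LIMIT
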

Public Instance Methods

close()
# File lib/logstash/outputs/treasure_data.rb, line 164
def close
  # drain any buffered events before the plugin shuts down
  buffer_flush(final: true)
end
ensure_table_exists(client, database, table)
# File lib/logstash/outputs/treasure_data.rb, line 142
def ensure_table_exists(client, database, table)
  begin
    client.create_log_table(database, table)
  rescue TreasureData::NotFoundError
    # the database itself is missing: create it, then retry the table
    client.create_database(database)
    client.create_log_table(database, table)
  rescue TreasureData::AlreadyExistsError
    # table was created concurrently: nothing to do
  end
end
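
The same create-if-missing sequence can be exercised directly with td-client. A minimal sketch; the API key is read from the environment, and the database and table names are placeholders:

  require 'td-client'

  client = TreasureData::Client.new(ENV['TD_API_KEY'], ssl: true)
  begin
    client.create_database('my_database')   # raises AlreadyExistsError if present
  rescue TreasureData::AlreadyExistsError
  end
  client.create_log_table('my_database', 'my_table')
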
flush(events, teardown = false)
# File lib/logstash/outputs/treasure_data.rb, line 104
def flush(events, teardown = false)
  @logger.debug "flushing #{events} events (may include chunk uuid)"
  return if events.size < 1
  if UUID_FORMAT !~ events.first
    new_uuid = @uuid.generate
    @logger.debug "adding chunk uuid #{new_uuid}"
    events.unshift new_uuid
  end

  uuid = events.first
  io = StringIO.new
  @logger.debug "gzipping chunk #{uuid}"
  Zlib::GzipWriter.wrap(io){|f|
    events.each do |row|
      unless row == uuid
        f.write row
      end
    end
    f.finish
  }
  data = io.string
  @logger.debug "sending gzipped chunk #{uuid}, #{data.bytesize} bytes"
  begin
    @client.import(@database, @table, "msgpack.gz", data, data.bytesize, uuid)

  rescue TreasureData::NotFoundError
    raise unless @auto_create_table

    @logger.info "creating missing table #{@table} on database #{@database} for chunk #{uuid}"
    ensure_table_exists(@client, @database, @table)
    @logger.info "retrying upload chunk #{uuid}"
    retry
  end

  @logger.debug "done #{uuid}"
end
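
The payload flush builds (msgpack rows concatenated, then gzipped) can be reproduced outside the plugin. A minimal sketch; the row contents are made up, and the final import call is shown commented out:

  require 'msgpack'
  require 'zlib'
  require 'stringio'
  require 'uuid'

  uuid = UUID.new.generate   # chunk id, passed to import for idempotent retries
  rows = [{ 'time' => Time.now.to_i, 'message' => 'hello' }]
  io = StringIO.new
  Zlib::GzipWriter.wrap(io) { |gz| rows.each { |r| gz.write r.to_msgpack } }
  data = io.string
  # client.import('my_database', 'my_table', 'msgpack.gz', data, data.bytesize, uuid)
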
on_flush_error(e)
# File lib/logstash/outputs/treasure_data.rb, line 154
def on_flush_error(e)
  @logger.warn "flush error #{e.class}: #{e.message}"
end
on_full_buffer_error(opts={})
# File lib/logstash/outputs/treasure_data.rb, line 159
def on_full_buffer_error(opts={})
  @logger.warn "buffer exceeds limits: pending:#{opts[:pending_count]}, outgoing:#{opts[:outgoing_count]}"
end
receive(event)
# File lib/logstash/outputs/treasure_data.rb, line 85
def receive(event)
  record = event.clone
  @logger.debug "receive a event"

  record['time'] ||= (record.timestamp.to_i || Time.now.to_i)
  if record.to_hash.size > RECORD_KEYS_LIMIT
    raise "Too many number of keys (#{record.keys.size} keys)"
  end

  row = record.to_msgpack
  if row.bytesize > RECORD_SIZE_LIMIT
    raise "Too large record (#{row.bytesize} bytes with keys:#{record.keys.join(',')})"
  end

  buffer_receive(row)
  @logger.debug "buffered a event"
end
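
The per-record guards can be checked in isolation as well. A minimal sketch mirroring the limits above; the record is hypothetical:

  require 'msgpack'
  require 'logstash/outputs/treasure_data'

  limits = LogStash::Outputs::TreasureData
  record = { 'time' => Time.now.to_i, 'message' => 'hello' }
  row = record.to_msgpack
  raise 'too many keys'    if record.size > limits::RECORD_KEYS_LIMIT
  raise 'record too large' if row.bytesize > limits::RECORD_SIZE_LIMIT
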
register()
# File lib/logstash/outputs/treasure_data.rb, line 58
def register
  @empty_gz_data = TreasureData::API.create_empty_gz_data
  @user_agent = "logstash-output-treasure_data: #{VERSION}".freeze
  @uuid = UUID.new

  TreasureData::API.validate_database_name(@database)
  TreasureData::API.validate_table_name(@table)

  client_opts = {
    ssl: @use_ssl,
    http_proxy: @http_proxy,
    user_agent: @user_agent,
    endpoint: @endpoint,
    connect_timeout: @connect_timeout,
    read_timeout: @read_timeout,
    send_timeout: @send_timeout
  }
  @client = TreasureData::Client.new(@apikey.value, client_opts)

  buffer_initialize(
    max_items: @flush_size,
    max_interval: @flush_interval,
    logger: @logger
  )
end
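
The client setup performed here can be reproduced standalone. A minimal sketch; the endpoint and timeout values are illustrative placeholders, not necessarily the plugin's defaults:

  require 'td-client'

  TreasureData::API.validate_database_name('my_database')
  TreasureData::API.validate_table_name('my_table')

  client = TreasureData::Client.new(
    ENV['TD_API_KEY'],
    ssl:             true,
    endpoint:        'api.treasuredata.com',
    connect_timeout: 60,
    read_timeout:    600,
    send_timeout:    600
  )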