class LogStash::Outputs::TreasureData
Constants
- IMPORT_SIZE_LIMIT
- RECORD_KEYS_LIMIT
- RECORD_SIZE_LIMIT
- UUID_FORMAT
- VERSION
Public Instance Methods
close()
click to toggle source
# File lib/logstash/outputs/treasure_data.rb, line 164 def close buffer_flush(final: true) end
ensure_table_exists(client, database, table)
click to toggle source
# File lib/logstash/outputs/treasure_data.rb, line 142 def ensure_table_exists(client, database, table) begin client.create_log_table(database, table) rescue TreasureData::NotFoundError => e client.create_database(database) client.create_log_table(database, table) rescue TreasureData::AlreadyExistsError => e # ignore end end
flush(events, teardown = false)
click to toggle source
# File lib/logstash/outputs/treasure_data.rb, line 104 def flush(events, teardown = false) @logger.debug "flushing #{events} events (may include chunk uuid)" return if events.size < 1 if UUID_FORMAT !~ events.first new_uuid = @uuid.generate @logger.debug "adding chunk uuid #{new_uuid}" events.unshift new_uuid end uuid = events.first io = StringIO.new @logger.debug "gzipping chunk #{uuid}" Zlib::GzipWriter.wrap(io){|f| events.each do |row| unless row == uuid f.write row end end f.finish } data = io.string @logger.debug "sending gzipped chunk #{uuid}, #{data.bytesize} bytes" begin @client.import(@database, @table, "msgpack.gz", data, data.bytesize, uuid) rescue TreasureData::NotFoundError => e raise unless @auto_create_table @logger.info "creating missing table #{@table} on database #{@database} for chunk #{uuid}" ensure_table_exists(@client, @database, @table) @logger.info "retrying upload chunk #{uuid}" retry end @logger.debug "done #{uuid}" end
on_flush_error(e)
click to toggle source
# File lib/logstash/outputs/treasure_data.rb, line 154 def on_flush_error(e) @logger.warn "flush error #{e.class}: #{e.message}" end
on_full_buffer_error(opts={})
click to toggle source
# File lib/logstash/outputs/treasure_data.rb, line 159 def on_full_buffer_error(opts={}) @logger.warn "buffer exceeds limits: pending:#{opts[:pending_count]}, outgoing:#{opts[:outgoing_count]}" end
receive(event)
click to toggle source
# File lib/logstash/outputs/treasure_data.rb, line 85 def receive(event) record = event.clone @logger.debug "receive a event" record['time'] ||= (record.timestamp.to_i || Time.now.to_i) if record.to_hash.size > RECORD_KEYS_LIMIT raise "Too many number of keys (#{record.keys.size} keys)" end row = record.to_msgpack if row.bytesize > RECORD_SIZE_LIMIT raise "Too large record (#{row.bytesize} bytes with keys:#{record.keys.join(',')})" end buffer_receive(row) @logger.debug "buffered a event" end
register()
click to toggle source
# File lib/logstash/outputs/treasure_data.rb, line 58 def register @empty_gz_data = TreasureData::API.create_empty_gz_data @user_agent = "logstash-output-treasure_data: #{VERSION}".freeze @uuid = UUID.new TreasureData::API.validate_database_name(@database) TreasureData::API.validate_table_name(@table) client_opts = { ssl: @use_ssl, http_proxy: @http_proxy, user_agent: @user_agent, endpoint: @endpoint, connect_timeout: @connect_timeout, read_timeout: @read_timeout, send_timeout: @send_timeout } @client = TreasureData::Client.new(@apikey.value, client_opts) buffer_initialize( max_items: @flush_size, max_interval: @flush_interval, logger: @logger ) end