class Fluent::Plugin::LogzioOutputBuffered
Public Instance Methods
compress(string)
click to toggle source
# File lib/fluent/plugin/out_logzio_buffered.rb, line 175
# Gzip-compresses +string+ in memory.
#
# @param string [String] payload to compress
# @return [String] the gzip stream, tagged as binary (ASCII-8BIT)
def compress(string)
  # Start from an empty, mutable, binary String. The first argument of
  # StringIO.new is the buffer's initial content (not an open mode), so a
  # literal such as "w" would be a UTF-8 - and possibly frozen - buffer.
  buffer = StringIO.new(String.new)
  # The block form closes the writer (flushing the gzip trailer) even when
  # the write raises.
  Zlib::GzipWriter.wrap(buffer) { |gz| gz.write(string) }
  buffer.string
end
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_logzio_buffered.rb, line 25
# Applies the plugin configuration (calls the superclass method first) and,
# when proxy settings are present in +conf+, exports them through ENV.
#
# @param conf [#[]] configuration element; only the presence of the
#   'proxy_uri' and 'proxy_cert' keys is inspected here
def configure(conf)
  super
  compat_parameters_convert(conf, :buffer)

  log.debug "Logz.io URL #{@endpoint_url}"

  # conf key => [environment variable to set, value to export]
  proxy_exports = {
    'proxy_uri' => ['http_proxy', @proxy_uri],
    'proxy_cert' => ['SSL_CERT_FILE', @proxy_cert]
  }

  proxy_exports.each do |conf_key, (env_name, value)|
    next unless conf[conf_key]

    log.debug "Proxy #{value}"
    ENV[env_name] = value
  end
end
encode_chunk(chunk) { |records, bulk_size| ... }
click to toggle source
# File lib/fluent/plugin/out_logzio_buffered.rb, line 85
# Serializes every event of +chunk+ to JSON and groups the results into
# bulks that stay within @bulk_limit bytes once joined with "\n".
#
# Events are enriched in place: '@timestamp' is added when
# @output_include_time is set and the tag is stored under
# @output_tags_fieldname when @output_include_tags is set (existing values
# are kept). Records that cannot be serialized, or that are larger than
# @bulk_limit on their own, are logged and skipped.
#
# @param chunk [#each] yields (tag, time, record) triples
# @yieldparam records [Array<String>] JSON documents of one bulk (never empty)
# @yieldparam bulk_size [Integer] byte length of records.join("\n")
def encode_chunk(chunk)
  records = []
  bulk_size = 0

  chunk.each do |tag, time, record|
    record['@timestamp'] ||= Time.at(time).iso8601(3) if @output_include_time
    record[@output_tags_fieldname] ||= tag.to_s if @output_include_tags

    begin
      json_record = Yajl.dump(record)
      # The limit is a byte budget, so measure bytes rather than characters.
      record_size = json_record.bytesize
    rescue StandardError => e
      log.error "Adding record #{record} to buffer failed. Exception: #{e}"
      next
    end

    # Compare the record alone against the limit: the separator byte only
    # matters when the record shares a bulk with earlier ones.
    if record_size > @bulk_limit
      if @bulk_limit_warning_limit.is_a?(Integer)
        log.warn "Record with size #{record_size} exceeds #{@bulk_limit} and can't be sent to Logz.io. Record starts with (truncated at #{@bulk_limit_warning_limit} characters): #{json_record[0,@bulk_limit_warning_limit]}"
        # Send the full message to debug facility
        log.debug "Record with size #{record_size} exceeds #{@bulk_limit} and can't be sent to Logz.io. Record is: #{json_record}"
      else
        log.warn "Record with size #{record_size} exceeds #{@bulk_limit} and can't be sent to Logz.io. Record is: #{json_record}"
      end
      next
    end

    # One extra byte for the "\n" joining this record to the previous one.
    separator_size = records.empty? ? 0 : 1

    if bulk_size + separator_size + record_size > @bulk_limit
      yield(records, bulk_size)
      records = []
      bulk_size = 0
      separator_size = 0 # the record now opens a fresh bulk
    end

    records.push(json_record)
    bulk_size += separator_size + record_size
  end

  # An Array is always truthy; check for content so no empty bulk is emitted.
  yield(records, bulk_size) unless records.empty?
end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_logzio_buffered.rb, line 70
# Packs one event as a msgpack-encoded [tag, time, record] triple, with the
# time expressed as a Float.
#
# @param tag [Object] event tag, stored as given
# @param time [Fluent::EventTime, Numeric] event time
# @param record [Object] event payload, stored as given
# @return [String] result of Array#to_msgpack
def format(tag, time, record)
  # EventTime converts itself; plain numerics are promoted to Float.
  sec_frac = time.is_a?(Fluent::EventTime) ? time.to_f : time * 1.0
  [tag, sec_frac, record].to_msgpack
end
formatted_to_msgpack_binary?()
click to toggle source
# File lib/fluent/plugin/out_logzio_buffered.rb, line 62
# Always answers true.
# NOTE(review): presumably signals to Fluentd that #format emits msgpack
# binary - confirm against the Fluentd output plugin API.
#
# @return [true]
def formatted_to_msgpack_binary?
  true
end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/out_logzio_buffered.rb, line 66
# Always answers true.
# NOTE(review): presumably declares the plugin safe to run under multiple
# Fluentd workers - confirm against the Fluentd output plugin API.
#
# @return [true]
def multi_workers_ready?
  true
end
send_bulk(bulk_records, bulk_size)
click to toggle source
# File lib/fluent/plugin/out_logzio_buffered.rb, line 123
# POSTs one bulk of JSON records to the configured endpoint, retrying on
# connection errors and on unexpected HTTP statuses.
#
# Up to @retry_count attempts are made; the pause between attempts starts at
# @retry_sleep seconds and doubles each time. HTTP 200 ends the loop as a
# success; 401 and 400 are logged and not retried. When every attempt fails
# the bulk is logged and dropped (no exception is raised to the caller).
#
# @param bulk_records [Array<String>] serialized records, joined with "\n"
# @param bulk_size [Integer] size of the bulk, used for logging only
# @return [void]
def send_bulk(bulk_records, bulk_size)
  log.debug "Sending a bulk of #{bulk_records.size} records, size #{bulk_size}B to Logz.io"

  # Setting our request
  post = Net::HTTP::Post.new @uri.request_uri

  # Logz.io bulk http endpoint expecting log line with \n delimiter
  post.body = bulk_records.join("\n")
  post.body = compress(post.body) if @gzip

  sleep_interval = @retry_sleep

  begin
    @retry_count.times do |counter|
      should_retry = true
      begin
        response = @http.request @uri, post
        case response.code
        when '200'
          log.debug "Successfully sent bulk of #{bulk_records.size} records, size #{bulk_size}B to Logz.io"
          should_retry = false
        when '401'
          log.error "You are not authorized with Logz.io! Token OK? dropping logs..."
          should_retry = false
        when '400'
          log.info "Got 400 code from Logz.io. This means that some of your logs are too big, or badly formatted. Response: #{response.body}"
          should_retry = false
        else
          log.warn "Got HTTP #{response.code} from Logz.io, not giving up just yet (Try #{counter + 1}/#{@retry_count})"
        end
      rescue StandardError => e
        log.warn "Error connecting to Logz.io. Got exception: #{e} (Try #{counter + 1}/#{@retry_count})"
      end

      return unless should_retry

      # Last attempt failed: report and give up without a pointless sleep.
      if counter == @retry_count - 1
        log.error "Could not send your bulk after #{@retry_count} tries Sorry! Your bulk is: #{post.body}"
        break
      end

      sleep(sleep_interval)
      sleep_interval *= 2
    end
  rescue StandardError => e
    # Only StandardError is absorbed here; signals, SystemExit and other
    # non-standard exceptions must reach the caller.
    log.error "Got unexpected exception! Here: #{e}"
  end
end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_logzio_buffered.rb, line 58
# Shuts the plugin down; adds nothing beyond the superclass behaviour.
def shutdown
  super
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_logzio_buffered.rb, line 43
# Starts the plugin (superclass first), then builds the persistent HTTP
# client stored in @http and parses @endpoint_url into @uri.
def start
  super
  require 'net/http/persistent'

  @uri = URI(@endpoint_url)

  # proxy: :ENV makes the client pick its proxy settings up from ENV.
  @http = Net::HTTP::Persistent.new(name: 'fluent-plugin-logzio', proxy: :ENV)
  @http.headers['Content-Type'] = 'text/plain'
  @http.headers['Content-Encoding'] = 'gzip' if @gzip
  @http.idle_timeout = @http_idle_timeout
  # Enable keep-alive on the underlying sockets.
  @http.socket_options.push([Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, 1])

  log.debug "Started Logz.io shipper.."
end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_logzio_buffered.rb, line 79
# Encodes +chunk+ into bulks via #encode_chunk and hands each bulk to
# #send_bulk as it is produced.
#
# @param chunk [Object] buffer chunk, passed straight to #encode_chunk
def write(chunk)
  encode_chunk(chunk) do |records, size|
    send_bulk(records, size)
  end
end