class Fluent::LogzioOutputBuffered

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_logzio_buffered.rb, line 14
def configure(conf)
  super
  $log.debug "Logzio url #{@endpoint_url}"
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_logzio_buffered.rb, line 34
def format(tag, time, record)
  [tag, time, record].to_msgpack
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_logzio_buffered.rb, line 30
def shutdown
  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_logzio_buffered.rb, line 19
def start
  super
  require 'net/http/persistent'
  @uri = URI @endpoint_url
  @http = Net::HTTP::Persistent.new 'fluent-plugin-logzio-ng', :ENV
  @http.headers['Content-Type'] = 'text/plain'
  @http.idle_timeout = @http_idle_timeout
  @http.socket_options << [Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, 1]
  $log.debug "Started logzio shipper.."
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_logzio_buffered.rb, line 38
def write(chunk)
  records = []

  chunk.msgpack_each {|tag,time,record|
    record['@timestamp'] ||= Time.at(time).iso8601(3) if @output_include_time
    record['fluentd_tags'] ||= tag.to_s if @output_include_tags
    records.push(Yajl.dump(record))
  }

  $log.debug "Got flush timeout, containing #{records.length} chunks"

  # Setting our request
  post = Net::HTTP::Post.new @uri.request_uri

  # Logz.io bulk http endpoint expecting log line with \n delimiter
  post.body = records.join("\n")

  begin
    response = @http.request @uri, post
    $log.debug "HTTP Response code #{response.code}"

    if response.code != '200'

      $log.debug "Got HTTP #{response.code} from logz.io, not giving up just yet"

      # If any other non-200, we will try to resend it after 2, 4 and 8 seconds. Then we will give up

      sleep_interval = 2
      @retry_count.times do |counter|

        $log.debug "Sleeping for #{sleep_interval} seconds, and trying again."

        sleep(sleep_interval)

        # Retry
        response = @http.request @uri, post

        # Sucecss, no further action is needed
        if response.code == 200

          $log.debug "Successfuly sent the failed bulk."

          # Breaking out
          break

        else

          # Doubling the sleep interval
          sleep_interval *= 2

          if counter == @retry_count - 1

            $log.error "Could not send your bulk after 3 tries. Sorry. Got HTTP #{response.code}"
          end
        end
      end
    end
  rescue StandardError => error
    $log.error "Error connecting to logzio. Got exception: #{error}"
  end
end