class LogStash::Outputs::ClickHouse

Public Instance Methods

flush(events, close=false) click to toggle source
# File lib/logstash/outputs/clickhouse.rb, line 151
# Serialize a batch of buffered events and send it to ClickHouse as a single
# INSERT request body: one JSON object per line (the query uses FORMAT
# JSONEachRow).
#
# @param events [Array] buffered events; each must respond to #to_hash
# @param close [Boolean] accepted for the buffer's flush signature; unused here
def flush(events, close=false)
  return if events.nil? || events.empty?

  # One JSON document per line, after applying the configured mutations.
  documents = events.map { |event| "#{LogStash::Json.dump(mutate(event.to_hash))}\n" }.join

  hosts = get_host_addresses
  # Host resolution can fail (or resolve to nothing); without at least one
  # candidate there is nowhere to send the batch, so fail it explicitly
  # instead of crashing on hosts.sample / nil host.
  unless hosts.is_a?(Array) && !hosts.empty?
    log_failure("No ClickHouse hosts available to send the batch to",
      :size => documents.length)
    save_to_disk(documents) if @save_on_failure
    return
  end

  # Start on a random host to spread load; the full list is passed along so
  # make_request can fail over to other hosts.
  make_request(documents, hosts, @http_query, 1, 1, hosts.sample)
end
print_plugin_info() click to toggle source
register() click to toggle source
# File lib/logstash/outputs/clickhouse.rb, line 65
# Plugin setup: build the request-token queue used for backpressure, the
# INSERT query path, the host-resolving lambda, and the event buffer.
def register
  # Backpressure: make_request pops one token per request and blocks while
  # none are left; the request callbacks push the token back when they run.
  @request_tokens = SizedQueue.new(@pool_max)
  @pool_max.times { @request_tokens << true }
  @requests = []
  @http_query = "/?query=INSERT%20INTO%20#{table}%20FORMAT%20JSONEachRow"

  host_specs = http_hosts
  resolver = ShortNameResolver.new(ttl: @host_resolve_ttl_sec, logger: @logger)
  @hostnames_pool = parse_http_hosts(host_specs, resolver)

  buffer_initialize(
    max_items: @flush_size,
    max_interval: @idle_flush_time,
    logger: @logger
  )

  print_plugin_info
end

Private Instance Methods

delay_attempt(attempt_number, delay) click to toggle source
# File lib/logstash/outputs/clickhouse.rb, line 177
# Sleep before a retry.
#
# The wait is drawn uniformly between the backoff bound for this attempt and
# the bound for the next one, where bound(k) = max(delay * k * ln(k), 1).
# The wait therefore grows roughly like delay * n * ln(n) and is never below
# one second.
#
# @param attempt_number [Integer] retry counter; values below 1 count as 1
# @param delay [Numeric] scale factor (the @backoff_time setting)
def delay_attempt(attempt_number, delay)
  n = [attempt_number, 1].max
  bound = ->(k) { [delay * k * Math.log(k), 1].max }

  lower = bound.call(n)
  upper = bound.call(n + 1)

  # Randomize within the window so concurrent writers don't retry in lockstep.
  sleep rand(lower..upper)
end
get_host_addresses() click to toggle source
# File lib/logstash/outputs/clickhouse.rb, line 116
# Resolve the configured endpoints into concrete base URLs by invoking the
# lambda built in register.
#
# @return [Array<String>] resolved base URLs. On error this is an empty Array
#   (the error is logged), so callers always receive an Array they can
#   #sample / #pop safely.
def get_host_addresses()
  @hostnames_pool.call
rescue Exception => ex
  # NOTE(review): broad rescue kept from the original; under JRuby the
  # resolver may surface Java exceptions that are not StandardError —
  # confirm before narrowing to StandardError.
  @logger.error('Error while resolving host', :error => ex.to_s)
  []
end
log_failure(message, opts) click to toggle source

This is split into a separate method mainly to make it easier to test.

# File lib/logstash/outputs/clickhouse.rb, line 270
# Log an output failure at error level, prefixed with a fixed tag.
# Kept as its own method so tests can stub or observe failure reporting.
#
# @param message [#to_s] human-readable description
# @param opts [Hash] structured fields passed through to the logger
def log_failure(message, opts)
  tagged = "[HTTP Output Failure] #{message}"
  @logger.error(tagged, opts)
end
make_request(documents, hosts, query, con_count = 1, req_count = 1, host = "", uuid = SecureRandom.hex) click to toggle source
# File lib/logstash/outputs/clickhouse.rb, line 188
# Send one batch as an asynchronous POST and retry on failure.
#
# Retry policy implemented by the callbacks below:
# * Non-200 response: retry the same host until req_count reaches
#   @request_tolerance. Then log the failure, and save the batch to disk when
#   @save_on_failure is set.
# * Connection failure: retry the same host, and switch to the next entry
#   popped from +hosts+ once con_count reaches @automatic_retries. Give up
#   (log, optionally save) when +hosts+ is empty.
#
# @param documents [String] request body (newline-separated JSON rows)
# @param hosts [Array<String>] remaining candidate base URLs; popped on host switch
# @param query [String] path + query string appended to the host
# @param con_count [Integer] connection attempts made against the current host
# @param req_count [Integer] attempts that received a non-200 response so far
# @param host [String] base URL to use; "" (or nil) means take the next one from +hosts+
# @param uuid [String] identifier attached to this batch's log lines
def make_request(documents, hosts, query, con_count = 1, req_count = 1, host = "", uuid = SecureRandom.hex)
  host = hosts.pop if host.nil? || host.empty?

  # Nothing explicit and nothing left in the pool: fail the batch now instead
  # of crashing on nil + query.
  if host.nil?
    log_failure("No host available to send the batch to",
      :size => documents.length,
      :uuid => uuid)
    save_to_disk(documents) if @save_on_failure
    return
  end

  url = host + query

  # Block until a request token is free; the token queue bounds the number of
  # in-flight requests. Every path below must push the token back.
  token = @request_tokens.pop
  @logger.debug("Got token", :tokens => @request_tokens.length)

  begin
    request = client.send(:post, url, :body => documents, :headers => request_headers, :async => true)
  rescue Exception => e
    # NOTE(review): broad rescue kept from the original; under JRuby the HTTP
    # client may raise Java exceptions that are not StandardError — confirm
    # before narrowing.
    @logger.warn("An error occurred while indexing: #{e.message}")
    # No request object exists, so no callback will ever return this token;
    # return it here to avoid draining the pool.
    @request_tokens << token
    save_to_disk(documents) if @save_on_failure
    return
  end

  request.on_success do |response|
    # Return the token first so a retry below can acquire one.
    @request_tokens << token

    if response.code == 200
      @logger.debug("Successfully submitted",
        :size => documents.length,
        :response_code => response.code,
        :uuid => uuid)
    elsif req_count >= @request_tolerance
      log_failure(
          "Encountered non-200 HTTP code #{response.code}",
          :response_code => response.code,
          :url => url,
          :size => documents.length,
          :uuid => uuid)
      save_to_disk(documents) if @save_on_failure
    else
      @logger.info("Retrying request", :url => url, :message => response.message, :response => response.body, :uuid => uuid)
      delay_attempt(req_count, @backoff_time)
      make_request(documents, hosts, query, con_count, req_count + 1, host, uuid)
    end
  end

  request.on_failure do |exception|
    # Return the token first so a retry below can acquire one.
    @request_tokens << token

    if hosts.empty?
      log_failure("Could not access URL",
        :url => url,
        :method => @http_method,
        :headers => request_headers,
        :message => exception.message,
        :class => exception.class.name,
        :backtrace => exception.backtrace,
        :size => documents.length,
        :uuid => uuid)
      save_to_disk(documents) if @save_on_failure
      # `next`, not `return`: this block can run after make_request has
      # returned, where a `return` would raise LocalJumpError.
      next
    end

    # Too many connection attempts on this host: switch to the next one.
    if con_count >= @automatic_retries
      host = ""
      con_count = 0
    end

    @logger.info("Retrying connection", :url => url, :uuid => uuid)
    delay_attempt(con_count, @backoff_time)
    make_request(documents, hosts, query, con_count + 1, req_count, host, uuid)
  end

  client.execute!
end
mutate( src ) click to toggle source
# File lib/logstash/outputs/clickhouse.rb, line 129
# Build the output record from an event hash according to @mutations.
#
# Each @mutations entry maps a destination key to a rule:
# * String  — copy src[rule] unchanged;
# * Array   — [source_key, pattern, replacement]: copy src[source_key] with the
#   first match of Regexp.new(pattern) replaced by replacement.
# Entries whose source key is missing from +src+, and rules of any other type,
# are skipped. With no mutations configured, +src+ itself is returned.
def mutate( src )
  return src if @mutations.empty?

  @mutations.each_pair.with_object({}) do |(dst_key, rule), out|
    case rule
    when String
      out[dst_key] = src[rule] if src.key?(rule)
    when Array
      src_key, pattern, replacement = rule
      next unless src.key?(src_key)

      out[dst_key] = src[src_key].sub(Regexp.new(pattern), replacement)
    end
  end
end
parse_http_hosts(hosts, resolver) click to toggle source
# File lib/logstash/outputs/clickhouse.rb, line 93
# Build a zero-argument lambda that expands the configured endpoint URLs into
# concrete base URLs each time it is called.
#
# Endpoints whose host is a dotted-quad IPv4 literal are returned unchanged.
# For any other host, resolver.get_addresses(host) is called on every
# invocation, and one "scheme://address:port/path" URL is produced per
# returned address. Whether results are cached is up to the resolver.
#
# @param hosts [Array<String>] configured endpoint URLs
# @param resolver [#get_addresses] hostname -> Array of addresses
# @return [Proc] lambda returning Array<String>
def parse_http_hosts(hosts, resolver)
  ipv4_literal = /\A\d+\.\d+\.\d+\.\d+\z/

  -> do
    hosts.flat_map do |spec|
      uri = URI(spec)
      next [spec] if ipv4_literal.match?(uri.host)

      resolver.get_addresses(uri.host).map do |addr|
        "#{uri.scheme}://#{addr}:#{uri.port}#{uri.path}"
      end
    end
  end
end
receive(event) click to toggle source

This module currently does not support parallel requests, as that would circumvent the batching.

# File lib/logstash/outputs/clickhouse.rb, line 125
# Accept one event from the pipeline by handing it to the event buffer via
# buffer_receive. Nothing is sent to ClickHouse from here; sending happens in
# batches (see flush).
#
# @param event the incoming event
def receive(event)
  buffer_receive event
end
request_headers() click to toggle source
# File lib/logstash/outputs/clickhouse.rb, line 274
# Headers sent with each insert request: the configured @headers plus a
# default "Content-Type: application/json" when no truthy Content-Type is set.
#
# Works on a copy so the configured @headers hash is never modified. The
# original mutated it in place, which also raised if that hash was frozen.
#
# @return [Hash] a fresh headers hash
def request_headers()
  headers = (@headers || {}).dup
  headers["Content-Type"] ||= "application/json"
  headers
end
save_to_disk(documents) click to toggle source
# File lib/logstash/outputs/clickhouse.rb, line 165
# Append a failed batch to "<save_dir>/<table>_<save_file>" so it can be
# replayed later. This is best effort: I/O errors, including failures to
# open the file such as a missing directory or permission denied, are logged
# and swallowed.
#
# @param documents [String] the request body that could not be delivered
def save_to_disk(documents)
  file_name = "#{save_dir}/#{table}_#{save_file}"
  # Block form guarantees the file is closed even if write raises.
  File.open(file_name, "a") { |file| file.write(documents) }
rescue IOError, SystemCallError => e
  # Errno::* errors from File.open are SystemCallError, not IOError.
  log_failure("An error occurred while saving file to disk: #{e}",
                :file_name => file_name)
end