class LogStash::Outputs::HttpCodec
Constants
- RETRYABLE_MANTICORE_EXCEPTIONS
- VALID_METHODS
Attributes
is_batch[RW]
Public Instance Methods
close()
click to toggle source
# File lib/logstash/outputs/httpcodec.rb, line 279 def close @timer.cancel client.close end
log_error_response(response, url, event)
click to toggle source
# File lib/logstash/outputs/httpcodec.rb, line 154 def log_error_response(response, url, event) log_failure( "Encountered non-2xx HTTP code #{response.code}", :response_code => response.code, :url => url, :event => event ) end
log_retryable_response(response)
click to toggle source
# File lib/logstash/outputs/httpcodec.rb, line 146 def log_retryable_response(response) if (response.code == 429) @logger.debug? && @logger.debug("Encountered a 429 response, will retry. This is not serious, just flow control via HTTP") else @logger.warn("Encountered a retryable HTTP request in HTTP output, will retry", :code => response.code, :body => response.body) end end
multi_receive(events)
click to toggle source
# File lib/logstash/outputs/httpcodec.rb, line 128 def multi_receive(events) return if events.empty? send_events(events) end
register()
click to toggle source
# File lib/logstash/outputs/httpcodec.rb, line 92 def register @codec.on_event do |event, payload| payload end @http_method = @http_method.to_sym # We count outstanding requests with this queue # This queue tracks the requests to create backpressure # When this queue is empty no new requests may be sent, # tokens must be added back by the client on success @request_tokens = SizedQueue.new(@pool_max) @pool_max.times {|t| @request_tokens << true } @requests = Array.new if @content_type.nil? case @format when "form" ; @content_type = "application/x-www-form-urlencoded" when "json" ; @content_type = "application/json" when "json_batch" ; @content_type = "application/json" when "message" ; @content_type = "text/plain" when "codec" ; @content_type = "text/plain" end end @is_batch = @format == "json_batch" @headers["Content-Type"] = @content_type validate_format! # Run named Timer as daemon thread @timer = java.util.Timer.new("HTTP Output #{self.params['id']}", true) end
send_event(event, attempt)
click to toggle source
# File lib/logstash/outputs/httpcodec.rb, line 231 def send_event(event, attempt) body = event_body(event) # Send the request url = @is_batch ? @url : event.sprintf(@url) headers = @is_batch ? @headers : event_headers(event) # Compress the body and add appropriate header if @http_compression == true headers["Content-Encoding"] = "gzip" body = gzip(body) end # Create an async request response = client.send(@http_method, url, :body => body, :headers => headers).call if !response_success?(response) if retryable_response?(response) log_retryable_response(response) return :retry, event, attempt else log_error_response(response, url, event) return :failure, event, attempt end else return :success, event, attempt end rescue => exception will_retry = retryable_exception?(exception) log_failure("Could not fetch URL", :url => url, :method => @http_method, :body => body, :headers => headers, :message => exception.message, :class => exception.class.name, :backtrace => exception.backtrace, :will_retry => will_retry ) if will_retry return :retry, event, attempt else return :failure, event, attempt end end
send_events(events)
click to toggle source
# File lib/logstash/outputs/httpcodec.rb, line 163 def send_events(events) successes = java.util.concurrent.atomic.AtomicInteger.new(0) failures = java.util.concurrent.atomic.AtomicInteger.new(0) retries = java.util.concurrent.atomic.AtomicInteger.new(0) event_count = @is_batch ? 1 : events.size pending = Queue.new if @is_batch pending << [events, 0] else events.each {|e| pending << [e, 0]} end while popped = pending.pop break if popped == :done event, attempt = popped action, event, attempt = send_event(event, attempt) begin action = :failure if action == :retry && !@retry_failed case action when :success successes.incrementAndGet when :retry retries.incrementAndGet next_attempt = attempt+1 sleep_for = sleep_for_attempt(next_attempt) @logger.info("Retrying http request, will sleep for #{sleep_for} seconds") timer_task = RetryTimerTask.new(pending, event, next_attempt) @timer.schedule(timer_task, sleep_for*1000) when :failure failures.incrementAndGet else raise "Unknown action #{action}" end if action == :success || action == :failure if successes.get+failures.get == event_count pending << :done end end rescue => e # This should never happen unless there's a flat out bug in the code @logger.error("Error sending HTTP Request", :class => e.class.name, :message => e.message, :backtrace => e.backtrace) failures.incrementAndGet raise e end end rescue => e @logger.error("Error in http output loop", :class => e.class.name, :message => e.message, :backtrace => e.backtrace) raise e end
sleep_for_attempt(attempt)
click to toggle source
# File lib/logstash/outputs/httpcodec.rb, line 225 def sleep_for_attempt(attempt) sleep_for = attempt**2 sleep_for = sleep_for <= 60 ? sleep_for : 60 (sleep_for/2) + (rand(0..sleep_for)/2) end
Private Instance Methods
convert_mapping(mapping, event)
click to toggle source
# File lib/logstash/outputs/httpcodec.rb, line 331 def convert_mapping(mapping, event) if mapping.is_a?(Hash) mapping.reduce({}) do |acc, kv| k, v = kv acc[k] = convert_mapping(v, event) acc end elsif mapping.is_a?(Array) mapping.map { |elem| convert_mapping(elem, event) } else event.sprintf(mapping) end end
custom_headers(event)
click to toggle source
# File lib/logstash/outputs/httpcodec.rb, line 357 def custom_headers(event) return nil unless @headers @headers.reduce({}) do |acc,kv| k,v = kv acc[k] = event.sprintf(v) acc end end
encode(hash)
click to toggle source
TODO Extract this to a codec
# File lib/logstash/outputs/httpcodec.rb, line 368 def encode(hash) return hash.collect do |key, value| CGI.escape(key) + "=" + CGI.escape(value.to_s) end.join("&") end
event_body(event)
click to toggle source
Format the HTTP body
# File lib/logstash/outputs/httpcodec.rb, line 306 def event_body(event) # TODO: Create an HTTP post data codec, use that here if @format == "json" LogStash::Json.dump(map_event(event)) elsif @format == "message" event.sprintf(@message) elsif @format == "codec" @codec.encode(event) elsif @format == "json_batch" LogStash::Json.dump(event.map {|e| map_event(e) }) else encode(map_event(event)) end end
event_headers(event)
click to toggle source
# File lib/logstash/outputs/httpcodec.rb, line 353 def event_headers(event) custom_headers(event) || {} end
gzip(data)
click to toggle source
gzip data
# File lib/logstash/outputs/httpcodec.rb, line 322 def gzip(data) gz = StringIO.new gz.set_encoding("BINARY") z = Zlib::GzipWriter.new(gz) z.write(data) z.close gz.string end
log_failure(message, opts)
click to toggle source
This is split into a separate method mostly to help testing
# File lib/logstash/outputs/httpcodec.rb, line 301 def log_failure(message, opts) @logger.error("[HTTP Output Failure] #{message}", opts) end
map_event(event)
click to toggle source
# File lib/logstash/outputs/httpcodec.rb, line 345 def map_event(event) if @mapping convert_mapping(@mapping, event) else event.to_hash end end
response_success?(response)
click to toggle source
# File lib/logstash/outputs/httpcodec.rb, line 286 def response_success?(response) code = response.code return true if @ignorable_codes && @ignorable_codes.include?(code) return code >= 200 && code <= 299 end
retryable_exception?(exception)
click to toggle source
# File lib/logstash/outputs/httpcodec.rb, line 296 def retryable_exception?(exception) RETRYABLE_MANTICORE_EXCEPTIONS.any? {|me| exception.is_a?(me) } end
retryable_response?(response)
click to toggle source
# File lib/logstash/outputs/httpcodec.rb, line 292 def retryable_response?(response) @retryable_codes && @retryable_codes.include?(response.code) end
validate_format!()
click to toggle source
# File lib/logstash/outputs/httpcodec.rb, line 375 def validate_format! if @format == "message" if @message.nil? raise "message must be set if message format is used" end if @content_type.nil? raise "content_type must be set if message format is used" end unless @mapping.nil? @logger.warn "mapping is not supported and will be ignored if message format is used" end end end