class Franz::Output::HTTP
Public Class Methods
new(opts={})
click to toggle source
Start a new output in the background. We’ll consume from the input queue and ship events via HTTP
.
@param [Hash] opts options for the output @option opts [Queue] :input ([]) “input” queue @option opts [Queue] :output ([]) “output” configuration
# File lib/franz/output/http.rb, line 21 def initialize opts={} opts = { logger: Logger.new(STDOUT), tags: [], input: [], output: { uri: 'http://user:pass@localhost:3000/v2/.theon', flush_size: 500, flush_interval: 10, ssl: { cert_file: nil, key_file: nil, ca_file: nil, verify_mode: nil } } }.deep_merge!(opts) @statz = opts[:statz] || Franz::Stats.new @statz.create :num_output, 0 @logger = opts[:logger] @stop = false @foreground = opts[:foreground] uri = opts[:output].delete(:uri) || opts[:output].delete(:server) @uri = URI(uri) @ssl = if @uri.scheme =~ /https/ opts[:output].delete :ssl end open_uri @flush_size = opts[:output][:flush_size] @flush_interval = opts[:output][:flush_interval] @lock = Mutex.new @messages = [] Thread.new do until @stop @lock.synchronize do flush_messages true end sleep @flush_interval end end @thread = Thread.new do until @stop event = opts[:input].shift unless opts[:tags].empty? event['tags'] ||= [] event['tags'] += opts[:tags] end payload = JSON::generate event @lock.synchronize do enqueue payload end log.debug \ event: 'publish', raw: event end end log.info event: 'output started' @thread.join if @foreground end
Public Instance Methods
join()
click to toggle source
Join the Output
thread. Effectively only once.
# File lib/franz/output/http.rb, line 95 def join return if @foreground @foreground = true @thread.join end
stop()
click to toggle source
Stop the Output
thread. Effectively only once.
# File lib/franz/output/http.rb, line 103 def stop return if @foreground @foreground = true @thread.kill log.info event: 'output stopped' end
Private Instance Methods
emit(body)
click to toggle source
# File lib/franz/output/http.rb, line 165 def emit body request = Net::HTTP::Post.new(@uri) request.basic_auth @uri.user, @uri.password if @uri.user request.body = body @http.request(request) rescue EOFError, Errno::ECONNREFUSED, Errno::EPIPE log.warn event: 'output dropped' open_uri sleep 1 retry end
enqueue(event)
click to toggle source
# File lib/franz/output/http.rb, line 150 def enqueue event @messages << event flush_messages end
flush_messages(force=false)
click to toggle source
# File lib/franz/output/http.rb, line 155 def flush_messages force=false size = @messages.length return if size.zero? if force || size >= @flush_size emit @messages.join("\n") @statz.inc :num_output, size @messages.clear end end
log()
click to toggle source
# File lib/franz/output/http.rb, line 112 def log ; @logger end
open_uri()
click to toggle source
# File lib/franz/output/http.rb, line 114 def open_uri @http = Net::HTTP.new(@uri.host, @uri.port) if @ssl @http.use_ssl = true if cert_file = @ssl['cert_file'] cert = File.read cert_file @http.cert = OpenSSL::X509::Certificate.new(cert) end if key_file = @ssl['key_file'] key = File.read key_file @http.key = OpenSSL::PKey::RSA.new(key) end if @ssl['ca_file'] @http.ca_file = @ssl['ca_file'] end case @ssl['verify_mode'] when 'verify_peer' @http.verify_mode = OpenSSL::SSL::VERIFY_PEER when 'verify_none' @http.verify_mode = OpenSSL::SSL::VERIFY_NONE when 'verify_client_once' @http.verify_mode = OpenSSL::SSL::VERIFY_CLIENT_ONCE when 'verify_fail_if_no_peer_cert' @http.verify_mode = OpenSSL::SSL::VERIFY_FAIL_IF_NO_PEER_CERT when nil else raise 'Invalid "verify_mode" specified' end end end