class LogStash::Filters::Memcached
This filter provides facilities to interact with Memcached.
Attributes
cache[R]
Public Instance Methods
close()
click to toggle source
# File lib/logstash/filters/memcached.rb, line 115
# Flags the connection as unavailable and closes the memcached client.
#
# Both steps run while holding @connection_mutex. Any StandardError raised
# along the way is logged at debug level and otherwise discarded.
def close
  begin
    @connection_mutex.synchronize do
      @connected.make_false
      cache.close
    end
  rescue => e
    # Errors are deliberately ignored here: the connection being closed may
    # already be invalid, and when closing on shutdown there is nothing left
    # to act on.
    logger.debug("error closing memcached connection", message: e.message)
  end
end
filter(event)
click to toggle source
# File lib/logstash/filters/memcached.rb, line 93
# Applies the configured `set` and `get` operations to one event.
#
# When no connection is available the event is tagged with @tag_on_failure
# and nothing else happens. Otherwise the set step runs first, then the get
# step, and `filter_matched` is called if either one reported success.
#
# Dalli network/ring errors tag the event, are logged, and trigger `close`.
# Any other StandardError is logged (with a backtrace only when debug
# logging is enabled) and tags the event.
def filter(event)
  unless connection_available?
    event.tag(@tag_on_failure)
    return
  end

  begin
    stored = do_set(event)
    fetched = do_get(event)
    filter_matched(event) if stored || fetched
  rescue Dalli::NetworkError, Dalli::RingError => e
    event.tag(@tag_on_failure)
    logger.error(
      "memcached communication error",
      hosts: @memcached_hosts,
      options: @memcached_options,
      message: e.message
    )
    close
  rescue => e
    details =
      if logger.debug?
        { message: e.message, backtrace: e.backtrace }
      else
        { message: e.message }
      end
    logger.error("unexpected error", details)
    event.tag(@tag_on_failure)
  end
end
register()
click to toggle source
# File lib/logstash/filters/memcached.rb, line 78
# Validates the plugin settings and opens the initial memcached connection.
#
# Raises LogStash::ConfigurationError when @ttl is negative. A failure to
# connect is logged and re-raised as a RuntimeError. On success the
# connection state flag and the mutex guarding reconnects are created.
def register
  if @ttl < 0
    raise LogStash::ConfigurationError, "'ttl' option cannot be negative"
  end

  @memcached_hosts = validate_connection_hosts
  @memcached_options = validate_connection_options

  begin
    @cache = new_connection(@memcached_hosts, @memcached_options)
  rescue => e
    logger.error(
      "failed to connect to memcached",
      hosts: @memcached_hosts,
      options: @memcached_options,
      message: e.message
    )
    raise "failed to connect to memcached"
  end

  @connected = Concurrent::AtomicBoolean.new(true)
  @connection_mutex = Mutex.new
end
Private Instance Methods
connection_available?()
click to toggle source
# File lib/logstash/filters/memcached.rb, line 193
# Reports whether a usable connection exists, attempting a reconnect if not.
#
# Returns true when @connected is already set, otherwise the result of
# `reconnect`.
def connection_available?
  # This runs on every #filter invocation, so to keep synchronization cost
  # low only @connected is checked first. The tradeoff is that another
  # worker's connection could be in the process of failing without
  # @connected reflecting it yet; that is accepted for performance reasons.
  return true if @connected.true?

  @connection_mutex.synchronize do
    # @connected is re-checked under the lock so that a reconnect is
    # exclusive and is not performed concurrently by another worker.
    @connected.true? || reconnect(@memcached_hosts, @memcached_options)
  end
end
context(hash={})
click to toggle source
# File lib/logstash/filters/memcached.rb, line 218
# Builds the metadata hash attached to log entries.
#
# The plugin-wide part (currently only :namespace, and only when @namespace
# is neither nil nor empty) is computed once and memoized in @plugin_context.
#
# hash - per-call entries; on a key clash these win over the plugin-wide ones.
#
# Returns `hash` itself when there is no plugin-wide context, otherwise a new
# hash combining the plugin-wide entries with `hash`.
def context(hash={})
  # The block variable is named differently from the method parameter so it
  # no longer shadows `hash`.
  @plugin_context ||= {}.tap do |plugin_context|
    plugin_context[:namespace] = @namespace unless @namespace.nil? || @namespace.empty?
  end
  return hash if @plugin_context.empty?

  @plugin_context.merge(hash)
end
do_get(event)
click to toggle source
# File lib/logstash/filters/memcached.rb, line 128
# Looks up every configured `get` key in memcached and copies the hits onto
# the event.
#
# @get maps a memcached key template to the event field that receives the
# value. Templates are expanded against the event with `event.sprintf`.
#
# event - the event being filtered.
#
# Returns true when at least one key was found, false otherwise (including
# when @get is unset or empty).
def do_get(event)
  return false if !@get || @get.empty?

  # Several templates can expand to the same memcached key, so each key maps
  # to a list of target fields; a plain key => field hash would keep only the
  # last field and silently leave the others unpopulated.
  event_fields_by_memcached_key = {}
  @get.each do |memcached_key_template, event_field|
    memcached_key = event.sprintf(memcached_key_template)
    (event_fields_by_memcached_key[memcached_key] ||= []) << event_field
  end

  memcached_keys = event_fields_by_memcached_key.keys
  cache_hits_by_memcached_key = cache.get_multi(memcached_keys)

  cache_hits = 0
  event_fields_by_memcached_key.each do |memcached_key, event_fields|
    value = cache_hits_by_memcached_key[memcached_key]
    if value.nil?
      logger.trace("cache:get miss", context(key: memcached_key))
    else
      logger.trace("cache:get hit", context(key: memcached_key, value: value))
      cache_hits += 1
      event_fields.each { |event_field| event.set(event_field, value) }
    end
  end

  cache_hits > 0
end
do_set(event)
click to toggle source
# File lib/logstash/filters/memcached.rb, line 154
# Writes the configured event fields to memcached.
#
# @set maps an event field to the memcached key template under which its
# value is stored. Templates are expanded with `event.sprintf`; fields whose
# value is nil are skipped.
#
# event - the event being filtered.
#
# Returns true when at least one value was written, false when @set is unset
# or empty or when every referenced field was nil.
def do_set(event)
  return false if !@set || @set.empty?

  pending_writes = {}
  @set.each do |event_field, key_template|
    target_key = event.sprintf(key_template)
    field_value = event.get(event_field)
    pending_writes[target_key] = field_value unless field_value.nil?
  end
  return false if pending_writes.empty?

  # All writes are issued inside a single `cache.multi` block.
  cache.multi do
    pending_writes.each_pair do |target_key, field_value|
      logger.trace("cache:set", context(key: target_key, value: field_value))
      cache.set(target_key, field_value)
    end
  end

  true
end
new_connection(hosts, options)
click to toggle source
# File lib/logstash/filters/memcached.rb, line 176
# Creates a Dalli client for the given hosts and options.
#
# `alive!` is called on the new client before it is handed back, so an
# exception it raises propagates to the caller.
#
# hosts   - the hosts passed to Dalli::Client.new.
# options - the options hash passed to Dalli::Client.new.
#
# Returns the Dalli::Client.
def new_connection(hosts, options)
  logger.debug('connecting to memcached', context(hosts: hosts, options: options))

  client = Dalli::Client.new(hosts, options)
  client.alive!
  client
end
reconnect(hosts, options)
click to toggle source
reconnect is not thread-safe; call it only while holding the connection mutex.
# File lib/logstash/filters/memcached.rb, line 182
# Replaces @cache with a freshly created connection and updates @connected.
#
# A failure to connect is logged and leaves @connected false; it is not
# re-raised.
#
# hosts   - the hosts to connect to.
# options - the connection options.
#
# Returns the resulting value of @connected.
def reconnect(hosts, options)
  begin
    @cache = new_connection(hosts, options)
    @connected.make_true
  rescue => e
    logger.error(
      "failed to reconnect to memcached",
      hosts: hosts,
      options: options,
      message: e.message
    )
    @connected.make_false
  end

  @connected.value
end
validate_connection_hosts()
click to toggle source
# File lib/logstash/filters/memcached.rb, line 213
# Checks the configured hosts and normalizes them to strings.
#
# Raises LogStash::ConfigurationError when @hosts is empty.
#
# Returns a new array holding each entry of @hosts converted with `to_s`.
def validate_connection_hosts
  if @hosts.empty?
    raise LogStash::ConfigurationError, "'hosts' cannot be empty"
  end

  @hosts.map { |host| host.to_s }
end
validate_connection_options()
click to toggle source
# File lib/logstash/filters/memcached.rb, line 206
# Builds the options hash used when creating the memcached connection.
#
# Returns a hash with :expires_in set to @ttl, plus :namespace when
# @namespace is neither nil nor empty.
def validate_connection_options
  options = { expires_in: @ttl }
  options[:namespace] = @namespace if !@namespace.nil? && !@namespace.empty?
  options
end