class LogStash::Filters::Memcached

This filter provides facilities to interact with Memcached.

Attributes

cache[R]

Public Instance Methods

close() click to toggle source
# File lib/logstash/filters/memcached.rb, line 115
def close
  @connection_mutex.synchronize do
    @connected.make_false
    cache.close
  end
rescue => e
  # we basically ignore any error here as we may be trying to close an invalid
  # connection or if we close on shutdown we can also ignore any error
  logger.debug("error closing memcached connection", :message => e.message)
end
filter(event) click to toggle source
# File lib/logstash/filters/memcached.rb, line 93
def filter(event)
  unless connection_available?
    event.tag(@tag_on_failure)
    return
  end

  begin
    set_success = do_set(event)
    get_success = do_get(event)
    filter_matched(event) if (set_success || get_success)
  rescue Dalli::NetworkError, Dalli::RingError => e
    event.tag(@tag_on_failure)
    logger.error("memcached communication error",  hosts: @memcached_hosts, options: @memcached_options, message: e.message)
    close
  rescue => e
    meta = { :message => e.message }
    meta[:backtrace] = e.backtrace if logger.debug?
    logger.error("unexpected error", meta)
    event.tag(@tag_on_failure)
  end
end
register() click to toggle source
# File lib/logstash/filters/memcached.rb, line 78
def register
  raise(LogStash::ConfigurationError, "'ttl' option cannot be negative") if @ttl < 0

  @memcached_hosts = validate_connection_hosts
  @memcached_options = validate_connection_options
  begin
    @cache = new_connection(@memcached_hosts, @memcached_options)
  rescue => e
    logger.error("failed to connect to memcached", hosts: @memcached_hosts, options: @memcached_options, message: e.message)
    fail("failed to connect to memcached")
  end
  @connected = Concurrent::AtomicBoolean.new(true)
  @connection_mutex = Mutex.new
end

Private Instance Methods

connection_available?() click to toggle source
# File lib/logstash/filters/memcached.rb, line 193
def connection_available?
  # this method is called at every #filter method invocation and to minimize synchronization cost
  # only @connected if fist check. The tradeoff is that another worker connection could be in the
  # process of failing and @connected will not yet reflect that but this is acceptable for performance reason.
  return true if @connected.true?

  @connection_mutex.synchronize do
    # the reconnection process is exclusive and will not be be concurrently performed in another worker
    # by re-verifying the state of @connected from the exclusive code.
    return @connected.true? ? true : reconnect(@memcached_hosts, @memcached_options)
  end
end
context(hash={}) click to toggle source
# File lib/logstash/filters/memcached.rb, line 218
def context(hash={})
  @plugin_context ||= Hash.new.tap do |hash|
    hash[:namespace] = @namespace unless @namespace.nil? or @namespace.empty?
  end
  return hash if @plugin_context.empty?

  @plugin_context.merge(hash)
end
do_get(event) click to toggle source
# File lib/logstash/filters/memcached.rb, line 128
def do_get(event)
  return false unless @get && !@get.empty?

  event_fields_by_memcached_key = @get.each_with_object({}) do |(memcached_key_template, event_field), memo|
    memcached_key = event.sprintf(memcached_key_template)
    memo[memcached_key] = event_field
  end

  memcached_keys = event_fields_by_memcached_key.keys
  cache_hits_by_memcached_key = cache.get_multi(memcached_keys)

  cache_hits = 0
  event_fields_by_memcached_key.each do |memcached_key, event_field|
    value = cache_hits_by_memcached_key[memcached_key]
    if value.nil?
      logger.trace("cache:get miss", context(key: memcached_key))
    else
      logger.trace("cache:get hit", context(key: memcached_key, value: value))
      cache_hits += 1
      event.set(event_field, value)
    end
  end

  return cache_hits > 0
end
do_set(event) click to toggle source
# File lib/logstash/filters/memcached.rb, line 154
def do_set(event)
  return false unless @set && !@set.empty?

  values_by_memcached_key = @set.each_with_object({}) do |(event_field, memcached_key_template), memo|
    memcached_key = event.sprintf(memcached_key_template)
    value = event.get(event_field)

    memo[memcached_key] = value unless value.nil?
  end

  return false if values_by_memcached_key.empty?

  cache.multi do
    values_by_memcached_key.each do |memcached_key, value|
      logger.trace("cache:set", context(key: memcached_key, value: value))
      cache.set(memcached_key, value)
    end
  end

  return true
end
new_connection(hosts, options) click to toggle source
# File lib/logstash/filters/memcached.rb, line 176
def new_connection(hosts, options)
  logger.debug('connecting to memcached', context(hosts: hosts, options: options))
  Dalli::Client.new(hosts, options).tap { |client| client.alive! }
end
reconnect(hosts, options) click to toggle source

reconnect is not thread safe

# File lib/logstash/filters/memcached.rb, line 182
def reconnect(hosts, options)
  begin
    @cache = new_connection(hosts, options)
    @connected.make_true
  rescue => e
    logger.error("failed to reconnect to memcached", hosts: hosts, options: options, message: e.message)
    @connected.make_false
  end
  return @connected.value
end
validate_connection_hosts() click to toggle source
# File lib/logstash/filters/memcached.rb, line 213
def validate_connection_hosts
  raise(LogStash::ConfigurationError, "'hosts' cannot be empty") if @hosts.empty?
  @hosts.map(&:to_s)
end
validate_connection_options() click to toggle source
# File lib/logstash/filters/memcached.rb, line 206
def validate_connection_options
  {}.tap do |options|
    options[:expires_in] = @ttl
    options[:namespace] = @namespace unless @namespace.nil? || @namespace.empty?
  end
end