class LogStash::Inputs::Gemfire

Push events to a GemFire region.

GemFire is an object database.

To use this plugin you need to add gemfire.jar to your CLASSPATH. Using format=json requires jackson.jar too; use of continuous queries requires antlr.jar.

Note: this plugin has only been tested with GemFire 7.0.

Public Instance Methods

close() click to toggle source
# File lib/logstash/inputs/gemfire.rb, line 86
def close
  @cache.close if @cache
  @cache = nil
  finished
end
register() click to toggle source
# File lib/logstash/inputs/gemfire.rb, line 58
def register
  import com.gemstone.gemfire.cache.AttributesMutator
  import com.gemstone.gemfire.cache.InterestResultPolicy
  import com.gemstone.gemfire.cache.client.ClientCacheFactory
  import com.gemstone.gemfire.cache.client.ClientRegionShortcut
  import com.gemstone.gemfire.cache.query.CqQuery
  import com.gemstone.gemfire.cache.query.CqAttributes
  import com.gemstone.gemfire.cache.query.CqAttributesFactory
  import com.gemstone.gemfire.cache.query.QueryService
  import com.gemstone.gemfire.cache.query.SelectResults
  import com.gemstone.gemfire.pdx.JSONFormatter

  @logger.info("Registering input", :plugin => self)
end
run(queue) click to toggle source
# File lib/logstash/inputs/gemfire.rb, line 73
def run(queue)
  return if terminating?
  connect

  @logstash_queue = queue

  if @query
    continuous_query(@query)
  else
    register_interest(@interest_regexp)
  end
end

Protected Instance Methods

afterCreate(event) click to toggle source

CacheListener interface

# File lib/logstash/inputs/gemfire.rb, line 175
def afterCreate(event)
  regionName = event.getRegion.getName
  key = event.getKey
  newValue = event.getNewValue
  @logger.debug("afterCreate #{regionName} #{key} #{newValue}")

  process_event(event.getNewValue, "afterCreate", "gemfire://#{regionName}/#{key}/afterCreate")
end
afterDestroy(event) click to toggle source
# File lib/logstash/inputs/gemfire.rb, line 184
def afterDestroy(event)
  regionName = event.getRegion.getName
  key = event.getKey
  newValue = event.getNewValue
  @logger.debug("afterDestroy #{regionName} #{key} #{newValue}")

  process_event(nil, "afterDestroy", "gemfire://#{regionName}/#{key}/afterDestroy")
end
afterRegionClear(event) click to toggle source
# File lib/logstash/inputs/gemfire.rb, line 211
def afterRegionClear(event)
  @logger.debug("afterRegionClear #{event}")
end
afterRegionCreate(event) click to toggle source
# File lib/logstash/inputs/gemfire.rb, line 207
def afterRegionCreate(event)
  @logger.debug("afterRegionCreate #{event}")
end
afterRegionDestroy(event) click to toggle source
# File lib/logstash/inputs/gemfire.rb, line 215
def afterRegionDestroy(event)
  @logger.debug("afterRegionDestroy #{event}")
end
afterRegionInvalidate(event) click to toggle source
# File lib/logstash/inputs/gemfire.rb, line 219
def afterRegionInvalidate(event)
  @logger.debug("afterRegionInvalidate #{event}")
end
afterRegionLive(event) click to toggle source
# File lib/logstash/inputs/gemfire.rb, line 203
def afterRegionLive(event)
  @logger.debug("afterRegionLive #{event}")
end
afterUpdate(event) click to toggle source
# File lib/logstash/inputs/gemfire.rb, line 193
def afterUpdate(event)
  regionName = event.getRegion.getName
  key = event.getKey
  oldValue = event.getOldValue
  newValue = event.getNewValue
  @logger.debug("afterUpdate #{regionName} #{key} #{oldValue} -> #{newValue}")

  process_event(event.getNewValue, "afterUpdate", "gemfire://#{regionName}/#{key}/afterUpdate")
end
connect() click to toggle source
# File lib/logstash/inputs/gemfire.rb, line 93
def connect
  begin
    @logger.debug("Connecting to GemFire #{@cache_name}")

    @cache = ClientCacheFactory.new.
      set("name", @cache_name).
      set("cache-xml-file", @cache_xml_file).create
    @logger.debug("Created cache #{@cache.inspect}")

  rescue => e
    if terminating?
      return
    else
      @logger.error("Gemfire connection error (during connect), will reconnect",
                    :exception => e, :backtrace => e.backtrace)
      sleep(1)
      retry
    end
  end

  @region = @cache.getRegion(@region_name);
  @logger.debug("Created region #{@region.inspect}")
end
continuous_query(query) click to toggle source
# File lib/logstash/inputs/gemfire.rb, line 118
def continuous_query(query)
  qs = @cache.getQueryService

  cqAf = CqAttributesFactory.new
  cqAf.addCqListener(self)
  cqa = cqAf.create

  @logger.debug("Running continuous query #{query}")
  cq = qs.newCq("logstashCQ" + self.object_id.to_s, query, cqa)

  cq.executeWithInitialResults
end
deserialize_message(message) click to toggle source
# File lib/logstash/inputs/gemfire.rb, line 136
def deserialize_message(message)
  if @serialization == "json"
    message ? JSONFormatter.toJSON(message) : "{}"
  else
    message
  end
end
onError(event) click to toggle source
# File lib/logstash/inputs/gemfire.rb, line 167
def onError(event)
  @logger.debug("onError #{event}")
end
onEvent(event) click to toggle source

CqListener interface

# File lib/logstash/inputs/gemfire.rb, line 159
def onEvent(event)
  key = event.getKey
  newValue = event.getNewValue
  @logger.debug("onEvent #{event.getQueryOperation} #{key} #{newValue}")

  process_event(event.getNewValue, "onEvent", "gemfire://query/#{key}/#{event.getQueryOperation}")
end
process_event(event, event_name) click to toggle source
# File lib/logstash/inputs/gemfire.rb, line 144
def process_event(event, event_name)
  message = deserialize_message(event)
  @codec.decode(message) do |event|
    decorate(event)
    @logstash_queue << event
  end
end
register_interest(interest) click to toggle source
# File lib/logstash/inputs/gemfire.rb, line 131
def register_interest(interest)
  @region.getAttributesMutator.addCacheListener(self)
  @region.registerInterestRegex(interest, InterestResultPolicy::NONE, false, true)
end