class LogStash::Inputs::Gemfire
Read events from a GemFire region and push them into the Logstash pipeline.
GemFire is an object database.
To use this plugin you need to add gemfire.jar to your CLASSPATH. Using format=json requires jackson.jar too; use of continuous queries requires antlr.jar.
Note: this plugin has only been tested with GemFire 7.0.
Public Instance Methods
close()
click to toggle source
# File lib/logstash/inputs/gemfire.rb, line 86
# Shuts this input down: closes the GemFire client cache if one exists,
# drops the reference to it, and then calls +finished+.
def close
  if @cache
    @cache.close
  end
  @cache = nil
  finished
end
register()
click to toggle source
# File lib/logstash/inputs/gemfire.rb, line 58
# Brings the GemFire Java classes referenced by this plugin into scope and
# logs that the input is being registered.
# NOTE(review): +import+ is presumably JRuby's Java import helper - confirm.
def register
  # Cache and region classes.
  import com.gemstone.gemfire.cache.AttributesMutator
  import com.gemstone.gemfire.cache.InterestResultPolicy
  import com.gemstone.gemfire.cache.client.ClientCacheFactory
  import com.gemstone.gemfire.cache.client.ClientRegionShortcut

  # Continuous-query classes.
  import com.gemstone.gemfire.cache.query.CqQuery
  import com.gemstone.gemfire.cache.query.CqAttributes
  import com.gemstone.gemfire.cache.query.CqAttributesFactory
  import com.gemstone.gemfire.cache.query.QueryService
  import com.gemstone.gemfire.cache.query.SelectResults

  # Used by deserialize_message when @serialization is "json".
  import com.gemstone.gemfire.pdx.JSONFormatter

  @logger.info("Registering input", :plugin => self)
end
run(queue)
click to toggle source
# File lib/logstash/inputs/gemfire.rb, line 73
# Connects to GemFire and subscribes to events; the listener callbacks
# append the resulting events to +queue+.
#
# queue - object that decoded events are appended to with +<<+
#
# Runs a continuous query when @query is set; otherwise registers interest
# in the keys matching @interest_regexp. Does nothing when the plugin is
# already terminating.
def run(queue)
  return if terminating?

  connect
  # connect returns early, without assigning @cache or @region, when a
  # shutdown is requested while it is retrying. Subscribing in that state
  # would call methods on nil, so stop here instead.
  return if terminating?

  @logstash_queue = queue
  if @query
    continuous_query(@query)
  else
    register_interest(@interest_regexp)
  end
end
Protected Instance Methods
afterCreate(event)
click to toggle source
CacheListener interface
# File lib/logstash/inputs/gemfire.rb, line 175
# CacheListener callback for a newly created entry: logs the region, key and
# new value at debug level, then passes the new value to process_event.
def afterCreate(event)
  region_name = event.getRegion.getName
  entry_key   = event.getKey
  new_value   = event.getNewValue
  @logger.debug("afterCreate #{region_name} #{entry_key} #{new_value}")

  process_event(
    event.getNewValue,
    "afterCreate",
    "gemfire://#{region_name}/#{entry_key}/afterCreate"
  )
end
afterDestroy(event)
click to toggle source
# File lib/logstash/inputs/gemfire.rb, line 184
# CacheListener callback for a destroyed entry: logs the region, key and the
# event's new value at debug level, then calls process_event with a nil
# value because there is no payload to forward.
def afterDestroy(event)
  region_name = event.getRegion.getName
  entry_key   = event.getKey
  new_value   = event.getNewValue
  @logger.debug("afterDestroy #{region_name} #{entry_key} #{new_value}")

  process_event(
    nil,
    "afterDestroy",
    "gemfire://#{region_name}/#{entry_key}/afterDestroy"
  )
end
afterRegionClear(event)
click to toggle source
# File lib/logstash/inputs/gemfire.rb, line 211
# Region-level listener callback; it only writes a debug log line.
def afterRegionClear(event)
  log_line = "afterRegionClear #{event}"
  @logger.debug(log_line)
end
afterRegionCreate(event)
click to toggle source
# File lib/logstash/inputs/gemfire.rb, line 207
# Region-level listener callback; it only writes a debug log line.
def afterRegionCreate(event)
  log_line = "afterRegionCreate #{event}"
  @logger.debug(log_line)
end
afterRegionDestroy(event)
click to toggle source
# File lib/logstash/inputs/gemfire.rb, line 215
# Region-level listener callback; it only writes a debug log line.
def afterRegionDestroy(event)
  log_line = "afterRegionDestroy #{event}"
  @logger.debug(log_line)
end
afterRegionInvalidate(event)
click to toggle source
# File lib/logstash/inputs/gemfire.rb, line 219
# Region-level listener callback; it only writes a debug log line.
def afterRegionInvalidate(event)
  log_line = "afterRegionInvalidate #{event}"
  @logger.debug(log_line)
end
afterRegionLive(event)
click to toggle source
# File lib/logstash/inputs/gemfire.rb, line 203
# Region-level listener callback; it only writes a debug log line.
def afterRegionLive(event)
  log_line = "afterRegionLive #{event}"
  @logger.debug(log_line)
end
afterUpdate(event)
click to toggle source
# File lib/logstash/inputs/gemfire.rb, line 193
# CacheListener callback for an updated entry: logs the region, key and the
# old -> new values at debug level, then passes the new value to
# process_event.
def afterUpdate(event)
  region_name = event.getRegion.getName
  entry_key   = event.getKey
  old_value   = event.getOldValue
  new_value   = event.getNewValue
  @logger.debug("afterUpdate #{region_name} #{entry_key} #{old_value} -> #{new_value}")

  process_event(
    event.getNewValue,
    "afterUpdate",
    "gemfire://#{region_name}/#{entry_key}/afterUpdate"
  )
end
connect()
click to toggle source
# File lib/logstash/inputs/gemfire.rb, line 93
# Creates the GemFire client cache (@cache) from @cache_name and
# @cache_xml_file, then looks up @region_name in it and stores the result in
# @region.
#
# Any StandardError raised while creating the cache is logged and the attempt
# is repeated after a one-second sleep. If the plugin is terminating when the
# error occurs, the method returns immediately instead, leaving @region
# unassigned.
def connect
  begin
    @logger.debug("Connecting to GemFire #{@cache_name}")
    factory = ClientCacheFactory.new.set("name", @cache_name)
    @cache = factory.set("cache-xml-file", @cache_xml_file).create
    @logger.debug("Created cache #{@cache.inspect}")
  rescue => error
    # Shutting down: give up rather than keep retrying.
    return if terminating?

    @logger.error("Gemfire connection error (during connect), will reconnect",
                  :exception => error, :backtrace => error.backtrace)
    sleep(1)
    retry
  end

  @region = @cache.getRegion(@region_name)
  @logger.debug("Created region #{@region.inspect}")
end
continuous_query(query)
click to toggle source
# File lib/logstash/inputs/gemfire.rb, line 118
# Registers this plugin as the CqListener of a new continuous query and
# executes it with initial results.
#
# query - query string handed to the GemFire query service
#
# The query is named "logstashCQ" followed by this object's object_id.
# Returns whatever executeWithInitialResults returns.
def continuous_query(query)
  query_service = @cache.getQueryService

  attributes_factory = CqAttributesFactory.new
  attributes_factory.addCqListener(self)
  cq_attributes = attributes_factory.create

  @logger.debug("Running continuous query #{query}")
  continuous = query_service.newCq("logstashCQ#{object_id}", query, cq_attributes)
  continuous.executeWithInitialResults
end
deserialize_message(message)
click to toggle source
# File lib/logstash/inputs/gemfire.rb, line 136
# Converts a raw GemFire value into the message handed to the codec.
#
# message - value taken from a GemFire event; may be nil
#
# When @serialization is "json" the value is converted with
# JSONFormatter.toJSON, and a nil/false value becomes the string "{}".
# For any other @serialization the value is returned unchanged.
def deserialize_message(message)
  return message unless @serialization == "json"
  return "{}" unless message

  JSONFormatter.toJSON(message)
end
onError(event)
click to toggle source
# File lib/logstash/inputs/gemfire.rb, line 167
# Error callback; it only writes the event to the debug log.
def onError(event)
  log_line = "onError #{event}"
  @logger.debug(log_line)
end
onEvent(event)
click to toggle source
CqListener interface
# File lib/logstash/inputs/gemfire.rb, line 159
# CqListener callback: logs the query operation, key and new value at debug
# level, then passes the new value to process_event.
def onEvent(event)
  entry_key = event.getKey
  new_value = event.getNewValue
  @logger.debug("onEvent #{event.getQueryOperation} #{entry_key} #{new_value}")

  process_event(
    event.getNewValue,
    "onEvent",
    "gemfire://query/#{entry_key}/#{event.getQueryOperation}"
  )
end
process_event(event, event_name)
click to toggle source
# File lib/logstash/inputs/gemfire.rb, line 144
# Turns one GemFire value into Logstash event(s) and appends them to
# @logstash_queue.
#
# event      - raw value taken from the GemFire event (nil for destroys)
# event_name - name of the listener callback that fired; currently unused
# source     - "gemfire://..." string built by the callbacks; currently
#              unused. Optional, so two-argument calls keep working.
#
# The listener callbacks (afterCreate, afterDestroy, afterUpdate, onEvent)
# all pass three arguments, so a two-parameter signature raises
# ArgumentError on every event; the third parameter is accepted for that
# reason.
def process_event(event, event_name, source = nil)
  message = deserialize_message(event)
  # Distinct block parameter name so the raw +event+ argument is not shadowed.
  @codec.decode(message) do |decoded|
    decorate(decoded)
    @logstash_queue << decoded
  end
end
register_interest(interest)
click to toggle source
# File lib/logstash/inputs/gemfire.rb, line 131
# Adds this plugin as a cache listener on @region and registers interest in
# the keys matching +interest+.
#
# interest - regular expression string passed to registerInterestRegex
#
# NOTE(review): the two trailing booleans are presumably isDurable (false)
# and receiveValues (true) - confirm against the GemFire Region API.
def register_interest(interest)
  mutator = @region.getAttributesMutator
  mutator.addCacheListener(self)

  @region.registerInterestRegex(
    interest,
    InterestResultPolicy::NONE,
    false,
    true
  )
end