class LogStash::Outputs::Riak

Riak is a distributed k/v store from Basho. It's based on the Dynamo model.

Public Instance Methods

receive(event) click to toggle source
# File lib/logstash/outputs/riak.rb, line 122
def receive(event)
  
  
  @bucket.each do |b|
    # setup our bucket(s)
    bukkit = @client.bucket(event.sprintf(b))
    # Disable bucket props for now
    # Need to detect params passed that should be converted to int
    # otherwise setting props fails =(
    # Logstash syntax only supports strings and bools
    # likely fix is to either hack in is_numeric?
    # or whitelist certain params and call to_i
    ##@logger.debug("Setting bucket props", :props => @bucket_props)
    ##bukkit.props = @bucket_props if @bucket_props
    ##@logger.debug("Bucket", :bukkit => bukkit.inspect)
   
    if @enable_search
      @logger.debug("Enable search requested", :bucket => bukkit.inspect)
      # Check if search is enabled
      @logger.debug("Checking bucket status", :search_enabled => bukkit.is_indexed?)
      bukkit.enable_index! unless bukkit.is_indexed?
      @logger.debug("Rechecking bucket status", :search_enabled => bukkit.is_indexed?)
    end
    @key_name.nil? ? evt_key=nil : evt_key=event.sprintf(@key_name)
    evt = Riak::RObject.new(bukkit, evt_key)
    @logger.debug("RObject", :robject => evt.to_s)
    begin
      evt.content_type = "application/json"
      evt.data = event
      if @indices
        @indices.each do |k|
          idx_name = "#{k.gsub('@','')}_bin"
          @logger.debug("Riak index name", :idx => idx_name)
          @logger.info("Indexes", :indexes => evt.indexes.to_s)
          evt.indexes[idx_name] << event.sprintf("%{#{k}}")
        end
      end
      evt.store
    rescue Exception => e
      @logger.warn("Exception storing", :message => e.message)
    end
  end
end
register() click to toggle source

Metadata (NYI) Allow the user to set custom metadata on the object Should consider converting logstash data to metadata as well

# File lib/logstash/outputs/riak.rb, line 95
def register
  require 'riak'
  riak_opts = {}
  cluster_nodes = Array.new
  @logger.debug("Setting protocol", :protocol => @proto)
  proto_type = "#{@proto}_port".to_sym
  @nodes.each do |node,port|
    @logger.debug("Adding node", :node => node, :port => port)
    cluster_nodes << {:host => node, proto_type => port}
  end
  @logger.debug("Cluster nodes", :nodes => cluster_nodes)
  if @enable_ssl
    @logger.debug("SSL requested")
    if @ssl_opts
      @logger.debug("SSL options provided", @ssl_opts)
      riak_opts.merge!(@ssl_opts.inject({}) {|h,(k,v)| h[k.to_sym] = v; h})
    else
      riak_opts.merge!({:ssl => true})
    end
  @logger.debug("Riak options:", :riak_opts => riak_opts)
  end
  riak_opts.merge!({:nodes => cluster_nodes})
  @logger.debug("Riak options:", :riak_opts => riak_opts)
  @client = Riak::Client.new(riak_opts)
end