class LogStash::Inputs::Salesforce
This Logstash input plugin allows you to query Salesforce using SOQL and puts the results into Logstash, one row per event.
You can configure it to pull entire sObjects or only specific fields.
NOTE: This input plugin will stop after all the results of the query are processed and will need to be re-run to fetch new results. It does not utilize the streaming API.
In order to use this plugin, you will need to create a new SFDC Application (a connected app) that uses OAuth. More details can be found here: help.salesforce.com/apex/HTViewHelpDoc?id=connected_app_create.htm
You will also need a username, password, and security token for your Salesforce instance. More details for generating a token can be found here: help.salesforce.com/apex/HTViewHelpDoc?id=user_security_token.htm
In addition to specifying an sObject, you can also supply a list of API fields that will be used in the SOQL query.
Example
This example prints all the Salesforce Opportunities to standard output:
[source,ruby]
input {
  salesforce {
    client_id => 'OAUTH CLIENT ID FROM YOUR SFDC APP'
    client_secret => 'OAUTH CLIENT SECRET FROM YOUR SFDC APP'
    username => 'email@example.com'
    password => 'super-secret'
    security_token => 'SECURITY TOKEN FOR THIS USER'
    sfdc_object_name => 'Opportunity'
  }
}
output {
  stdout {
    codec => rubydebug
  }
}
Public Instance Methods
# File lib/logstash/inputs/salesforce.rb, line 99
# Prepares the plugin before the first run.
#
# Loads the restforce library, asks the client to describe the configured
# sObject (@sfdc_object_name), caches the field-name => field-type map in
# @sfdc_field_types and, when no explicit field list was configured
# (@sfdc_fields is empty), falls back to every described field.
def register
  require 'restforce'

  description = client.describe(@sfdc_object_name)
  @sfdc_field_types = get_field_types(description)
  return unless @sfdc_fields.empty?

  @sfdc_fields = get_all_fields
end
# File lib/logstash/inputs/salesforce.rb, line 107
# Executes the SOQL query built by get_query once and appends one event per
# returned row to +queue+.
#
# For every name in @sfdc_fields the value is read from the row:
# * nil values are skipped (no key is set on the event);
# * fields whose type in @sfdc_field_types is 'datetime' or 'date' are
#   converted with format_time;
# * all other values are stored unchanged.
# When @to_underscores is set, event keys are converted with underscore.
#
# queue - any object that accepts events through <<.
#
# Returns nil when the query yields no rows, otherwise the result of
# iterating the result set.
def run(queue)
  results = client.query(get_query)
  return unless results && results.first

  results.each do |result|
    event = LogStash::Event.new
    decorate(event)

    @sfdc_fields.each do |field|
      # Field names come from configuration; public_send keeps them from
      # reaching private methods of the row object (e.g. Kernel#exit), which
      # plain send would happily invoke.
      value = result.public_send(field)
      next if value.nil?

      event_key = @to_underscores ? underscore(field) : field
      case @sfdc_field_types[field]
      when 'datetime', 'date'
        event.set(event_key, format_time(value))
      else
        event.set(event_key, value)
      end
    end

    queue << event
  end
end
Private Instance Methods
# File lib/logstash/inputs/salesforce.rb, line 132
# Returns the memoised Restforce client, building it on first use from
# client_options. The tooling flavour is created when @use_tooling_api is
# set, the regular one otherwise.
def client
  @client ||= @use_tooling_api ? Restforce.tooling(client_options) : Restforce.new(client_options)
end
# File lib/logstash/inputs/salesforce.rb, line 141
# Builds the options hash handed to Restforce.
#
# Always contains the credentials (:username, :password, :security_token,
# :client_id, :client_secret). :host is added from @sfdc_instance_url, or
# set to "test.salesforce.com" when @use_test_sandbox is set. :api_version
# is added only when @api_version is set.
#
# Raises LogStash::ConfigurationError when both @sfdc_instance_url and
# @use_test_sandbox are set.
def client_options
  options = {
    :username       => @username,
    :password       => @password.value,
    :security_token => @security_token.value,
    :client_id      => @client_id,
    :client_secret  => @client_secret.value
  }

  # The authentication endpoint can come from only one of the two settings.
  if @sfdc_instance_url && @use_test_sandbox
    raise ::LogStash::ConfigurationError, "Both \"use_test_sandbox\" and \"sfdc_instance_url\" can't be set simultaneously. Please specify either \"use_test_sandbox\" or \"sfdc_instance_url\""
  end

  host = @sfdc_instance_url || (@use_test_sandbox ? "test.salesforce.com" : nil)
  options[:host] = host if host
  options[:api_version] = @api_version if @api_version
  options
end
# File lib/logstash/inputs/salesforce.rb, line 198
# Converts a time string into a LogStash::Timestamp.
#
# Salesforce can use different time formats, so Time.parse is used for
# flexibility; a sequence of DateTime.strptime calls would be an option if
# higher performance is ever required.
def format_time(string)
  parsed = Time.parse(string)
  LogStash::Timestamp.new(parsed)
end
# File lib/logstash/inputs/salesforce.rb, line 183
# Returns the names of every field recorded in @sfdc_field_types.
def get_all_fields
  @sfdc_field_types.keys
end
# File lib/logstash/inputs/salesforce.rb, line 173
# Builds a hash mapping each field name to its type from an object
# description.
#
# obj_desc - object whose #fields yields entries responding to #name and #type.
#
# Logs the resulting map at debug level when debug logging is enabled.
# Returns the name => type Hash.
def get_field_types(obj_desc)
  field_types = obj_desc.fields.each_with_object({}) do |field, types|
    types[field.name] = field.type
  end
  @logger.debug("Field types", :field_types => field_types.to_s) if @logger.debug?
  field_types
end
# File lib/logstash/inputs/salesforce.rb, line 162
# Assembles the SOQL query string from the plugin settings.
#
# Selects @sfdc_fields from @sfdc_object_name, appends a WHERE clause when
# @sfdc_filters is not empty, and orders by LastModifiedDate (descending)
# when that field is among the selected ones. The query is logged at debug
# level when debug logging is enabled.
#
# Returns the query String.
def get_query()
  clauses = ["SELECT", @sfdc_fields.join(','), "FROM", @sfdc_object_name]
  clauses.push("WHERE", @sfdc_filters) unless @sfdc_filters.empty?
  clauses << "ORDER BY LastModifiedDate DESC" if @sfdc_fields.include?('LastModifiedDate')

  query_str = clauses.join(" ")
  @logger.debug("SFDC Query", :query => query_str) if @logger.debug?
  query_str
end
From stackoverflow.com/a/1509957/4701287
# File lib/logstash/inputs/salesforce.rb, line 189
# Converts a CamelCase name to lower-case snake_case.
#
# camel_cased_word - any object; it is converted with #to_s first.
#
# "::" becomes "/", an underscore is inserted at acronym/word and
# lower-to-upper boundaries, dashes become underscores and the result is
# down-cased.
def underscore(camel_cased_word)
  word = camel_cased_word.to_s.gsub(/::/, '/')
  # Split a run of capitals from a following capitalised word: "HTTPServer" -> "HTTP_Server".
  word = word.gsub(/([A-Z]+)([A-Z][a-z])/, '\1_\2')
  # Split a lower-case letter or digit from a following capital: "fooBar" -> "foo_Bar".
  word = word.gsub(/([a-z\d])([A-Z])/, '\1_\2')
  word.tr("-", "_").downcase
end