class Fluent::Plugin::SolrOutput
Constants
- DEFAULT_BUFFER_TYPE
- DEFAULT_COLLECTION
- DEFAULT_COMMIT_WITH_FLUSH
- DEFAULT_FLUSH_SIZE
- DEFAULT_IGNORE_UNDEFINED_FIELDS
- DEFAULT_MILLISECOND
- DEFAULT_TAG_FIELD
- DEFAULT_TIME_FIELD
- DEFAULT_TIME_FORMAT
- MODE_SOLRCLOUD
- MODE_STANDALONE
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_solr.rb, line 66
# Builds the plugin instance. No state is set up here; construction is
# delegated entirely to the superclass.
def initialize
  super
end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_solr.rb, line 70
# Applies the plugin configuration.
#
# Converts compat parameters for the :inject section, lets the superclass
# process +conf+, and then checks that 'tag' is one of the chunk keys.
#
# @param conf the configuration element handed to the plugin
# @raise [Fluent::ConfigError] when @chunk_key_tag is not set after the
#   superclass has processed the configuration
def configure(conf)
  compat_parameters_convert(conf, :inject)
  super

  # Nothing more to validate once the tag chunk key is present.
  return if @chunk_key_tag

  raise Fluent::ConfigError, "'tag' in chunk_keys is required."
end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_solr.rb, line 112
# Serializes one event for buffering as a msgpack-encoded [time, record] pair.
#
# @param tag the event tag (not used by this method)
# @param time the event time
# @param record the event record
# @return the result of calling +to_msgpack+ on the [time, record] array
def format(tag, time, record)
  event = [time, record]
  event.to_msgpack
end
formatted_to_msgpack_binary()
click to toggle source
# File lib/fluent/plugin/out_solr.rb, line 116
# Always answers true.
# NOTE(review): presumably signals that #format emits msgpack binary, which is
# what lets #write read the chunk with msgpack_each - confirm against the
# Fluentd output plugin API.
#
# @return [true]
def formatted_to_msgpack_binary
  true
end
get_fields()
click to toggle source
# File lib/fluent/plugin/out_solr.rb, line 214
# Fetches the names of the fields defined in the Solr schema.
#
# Queries 'schema/fields' on @solr (passing the collection explicitly in
# SolrCloud mode) and collects the 'name' of every entry in the response's
# 'fields' list.
#
# Failures are best-effort: any StandardError is logged as a warning together
# with its message, and whatever was collected so far (normally an empty
# array) is returned. Non-StandardError exceptions such as signals or
# SystemExit are deliberately not rescued.
#
# @return [Array] the field names, or an empty/partial array on failure
def get_fields
  fields = []

  begin
    response = nil

    if @mode == MODE_STANDALONE
      response = @solr.get 'schema/fields'
    elsif @mode == MODE_SOLRCLOUD
      response = @solr.get 'schema/fields', collection: @collection
    end

    # When no mode is set, response is still nil here and the lookup below
    # raises, which ends up in the warning branch.
    response['fields'].each { |field| fields << field['name'] }
    log.debug "Fields: #{fields}"
  rescue StandardError => e
    log.warn "An error occurred while getting fields: #{e.message}"
  end

  fields
end
get_unique_key()
click to toggle source
# File lib/fluent/plugin/out_solr.rb, line 195
# Fetches the name of the unique key field from the Solr schema.
#
# Queries 'schema/uniquekey' on @solr (passing the collection explicitly in
# SolrCloud mode) and reads 'uniqueKey' from the response.
#
# The default 'id' is returned when the request fails with a StandardError
# (logged as a warning with the error message) or when the response carries
# no 'uniqueKey' value, so callers never receive nil. Non-StandardError
# exceptions such as signals or SystemExit are deliberately not rescued.
#
# @return the unique key field name, 'id' by default
def get_unique_key
  unique_key = 'id'

  begin
    response = nil

    if @mode == MODE_STANDALONE
      response = @solr.get 'schema/uniquekey'
    elsif @mode == MODE_SOLRCLOUD
      response = @solr.get 'schema/uniquekey', collection: @collection
    end

    # Keep the default when the response has no usable value; a nil key would
    # otherwise be used as a field name when documents are built.
    unique_key = response['uniqueKey'] || unique_key
    log.debug "Unique key: #{unique_key}"
  rescue StandardError => e
    log.warn "An error occurred while getting unique key: #{e.message}"
  end

  unique_key
end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/out_solr.rb, line 120
# Always answers true.
# NOTE(review): presumably declares that this output can run under Fluentd's
# multi-worker mode - confirm against the Fluentd plugin API.
#
# @return [true]
def multi_workers_ready?
  true
end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_solr.rb, line 104
# Shuts the plugin down: runs the superclass shutdown first, then closes the
# ZooKeeper client if one was created (@zk is only set in SolrCloud mode).
def shutdown
  super

  @zk.close unless @zk.nil?
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_solr.rb, line 76
# Starts the plugin: picks the connection mode, creates the Solr client, and
# caches the schema's unique key and field names.
#
# Mode selection: a configured @base_url selects standalone mode; otherwise a
# configured @zk_host selects SolrCloud mode; with neither, @mode and @solr
# stay nil.
def start
  super

  @mode =
    if !@base_url.nil?
      MODE_STANDALONE
    elsif !@zk_host.nil?
      MODE_SOLRCLOUD
    end

  @solr = nil
  @zk = nil

  if @mode == MODE_STANDALONE
    # Join base URL and collection with exactly one slash between them.
    separator = @base_url.end_with?('/') ? '' : '/'
    @solr = RSolr.connect url: @base_url + separator + @collection
  elsif @mode == MODE_SOLRCLOUD
    @zk = ZK.new(@zk_host)
    cloud_connection = RSolr::Cloud::Connection.new(@zk)
    @solr = RSolr::Client.new(cloud_connection, read_timeout: 60, open_timeout: 60)
  end

  # Get unique key field from Solr
  @unique_key = get_unique_key

  # Get fields from Solr
  @fields = get_fields
end
update(documents)
click to toggle source
# File lib/fluent/plugin/out_solr.rb, line 182
# Sends a batch of documents to Solr.
#
# In standalone mode the documents are added with the commit parameter taken
# from @commit_with_flush; in SolrCloud mode the target collection is passed
# as well.
#
# Sending is best-effort: a StandardError is logged as a warning (including
# the error message) and not re-raised. Non-StandardError exceptions such as
# signals or SystemExit are deliberately not rescued.
# NOTE(review): because the error is swallowed, the caller cannot tell that
# the batch failed - confirm this is the intended delivery guarantee.
#
# @param documents [Array] the documents to add
def update(documents)
  if @mode == MODE_STANDALONE
    @solr.add documents, params: { commit: @commit_with_flush }
  elsif @mode == MODE_SOLRCLOUD
    @solr.add documents, collection: @collection, params: { commit: @commit_with_flush }
  end

  log.debug "Sent #{documents.count} document(s) to Solr"
rescue StandardError => e
  log.warn "An error occurred while sending #{documents.count} document(s) to Solr: #{e.message}"
end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_solr.rb, line 124
# Turns every event of a buffer chunk into a Solr document and sends the
# documents through #update in batches of at most @flush_size.
#
# For each event the record is passed through inject_values_to_record and then:
# * gets a random UUID under @unique_key unless that key is already present;
# * gets the chunk tag under @tag_field unless that key is already present;
# * gets @time_field rewritten as a UTC timestamp ('%FT%TZ', or with
#   milliseconds when @millisecond is set). The timestamp comes from the
#   record's own time field parsed with @time_format when present and
#   parseable; otherwise from the event time. A parse failure is logged as a
#   warning and the event time is used;
# * loses every key not listed in @fields when @ignore_undefined_fields is set.
#
# @param chunk the buffer chunk; must respond to +metadata.tag+ and
#   +msgpack_each+ (yielding time and record)
def write(chunk)
  documents = []

  # Get fluentd tag
  tag = chunk.metadata.tag

  chunk.msgpack_each do |time, record|
    record = inject_values_to_record(tag, time, record)

    # Set unique key and value
    record[@unique_key] = SecureRandom.uuid unless record.key?(@unique_key)

    # Set Fluentd tag to Solr tag field
    record[@tag_field] = tag unless record.key?(@tag_field)

    # Set time: start from the event time and prefer the record's own time
    # field when it can be parsed with the configured format.
    timestamp = Time.at(time).utc
    if record.key?(@time_field)
      begin
        timestamp = Time.strptime(record[@time_field], @time_format).utc
      rescue StandardError => e
        log.warn "An error occurred in parsing the time field: #{e.message}"
      end
    end

    # %L is the truncated millisecond of the second (000..999).
    time_pattern = @millisecond ? '%FT%T.%LZ' : '%FT%TZ'
    record[@time_field] = timestamp.strftime(time_pattern)

    # Ignore undefined fields (filter in one pass rather than deleting keys
    # while iterating over the same hash).
    record.select! { |key, _value| @fields.include?(key) } if @ignore_undefined_fields

    # Add record to documents
    documents << record

    # Update when flush size is reached
    if documents.count >= @flush_size
      update documents
      documents.clear
    end
  end

  # Update remaining documents
  update documents unless documents.empty?
end