class Fluent::Plugin::SolrOutput

Constants

DEFAULT_BUFFER_TYPE
DEFAULT_COLLECTION
DEFAULT_COMMIT_WITH_FLUSH
DEFAULT_FLUSH_SIZE
DEFAULT_IGNORE_UNDEFINED_FIELDS
DEFAULT_MILLISECOND
DEFAULT_TAG_FIELD
DEFAULT_TIME_FIELD
DEFAULT_TIME_FORMAT
MODE_SOLRCLOUD
MODE_STANDALONE

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_solr.rb, line 66
def initialize
  super
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_solr.rb, line 70
def configure(conf)
  compat_parameters_convert(conf, :inject)
  super
  raise Fluent::ConfigError, "'tag' in chunk_keys is required." if not @chunk_key_tag
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_solr.rb, line 112
def format(tag, time, record)
  [time, record].to_msgpack
end
formatted_to_msgpack_binary() click to toggle source
# File lib/fluent/plugin/out_solr.rb, line 116
def formatted_to_msgpack_binary
  true
end
get_fields() click to toggle source
# File lib/fluent/plugin/out_solr.rb, line 214
def get_fields
  fields = []

  begin
    response = nil

    if @mode == MODE_STANDALONE then
      response = @solr.get 'schema/fields'
    elsif @mode == MODE_SOLRCLOUD then
      response = @solr.get 'schema/fields', collection: @collection
    end
    response['fields'].each do |field|
      fields.push(field['name'])
    end
    log.debug "Fields: #{fields}"
  rescue Exception
    log.warn 'An error occurred while getting fields'
  end

  return fields
end
get_unique_key() click to toggle source
# File lib/fluent/plugin/out_solr.rb, line 195
def get_unique_key
  unique_key = 'id'

  begin
    response = nil
    if @mode == MODE_STANDALONE then
      response = @solr.get 'schema/uniquekey'
    elsif @mode == MODE_SOLRCLOUD then
      response = @solr.get 'schema/uniquekey', collection: @collection
    end
    unique_key = response['uniqueKey']
    log.debug "Unique key: #{unique_key}"
  rescue Exception
    log.warn 'An error occurred while getting unique key'
  end

  return unique_key
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/out_solr.rb, line 120
def multi_workers_ready?
  true
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_solr.rb, line 104
def shutdown
  super

  unless @zk.nil? then
    @zk.close
  end
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_solr.rb, line 76
def start
  super

  @mode = nil
  if ! @base_url.nil? then
    @mode = MODE_STANDALONE
  elsif ! @zk_host.nil?
    @mode = MODE_SOLRCLOUD
  end

  @solr = nil
  @zk = nil

  if @mode == MODE_STANDALONE then
    @solr = RSolr.connect :url => @base_url.end_with?('/') ? @base_url + @collection : @base_url + '/' + @collection
  elsif @mode == MODE_SOLRCLOUD then
    @zk = ZK.new(@zk_host)
    cloud_connection = RSolr::Cloud::Connection.new(@zk)
    @solr = RSolr::Client.new(cloud_connection, read_timeout: 60, open_timeout: 60)
  end

  # Get unique key field from Solr
  @unique_key = get_unique_key

  # Get fields from Solr
  @fields = get_fields
end
update(documents) click to toggle source
# File lib/fluent/plugin/out_solr.rb, line 182
def update(documents)
  begin
    if @mode == MODE_STANDALONE then
      @solr.add documents, :params => {:commit => @commit_with_flush}
    elsif @mode == MODE_SOLRCLOUD then
      @solr.add documents, collection: @collection, :params => {:commit => @commit_with_flush}
    end
    log.debug "Sent #{documents.count} document(s) to Solr"
  rescue Exception
    log.warn "An error occurred while sending #{documents.count} document(s) to Solr"
  end
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_solr.rb, line 124
def write(chunk)
  documents = []

  # Get fluentd tag
  tag = chunk.metadata.tag

  chunk.msgpack_each do |time, record|
    record = inject_values_to_record(tag, time, record)

    # Set unique key and value
    unless record.has_key?(@unique_key) then
      record.merge!({@unique_key => SecureRandom.uuid})
    end

    # Set Fluentd tag to Solr tag field
    unless record.has_key?(@tag_field) then
      record.merge!({@tag_field => tag})
    end

    # Set time
    tmp_time = Time.at(time).utc
    if record.has_key?(@time_field) then
      # Parsing the time field in the record by the specified format.
      begin
        tmp_time = Time.strptime(record[@time_field], @time_format).utc
      rescue Exception => e
        log.warn "An error occurred in parsing the time field: #{e.message}"
      end
    end
    if @millisecond then
      record.merge!({@time_field => '%s.%03dZ' % [tmp_time.strftime('%FT%T'), tmp_time.usec / 1000.0]})
    else
      record.merge!({@time_field => tmp_time.strftime('%FT%TZ')})
    end

    # Ignore undefined fields
    if @ignore_undefined_fields then
      record.each_key do |key|
        unless @fields.include?(key) then
          record.delete(key)
        end
      end
    end

    # Add record to documents
    documents << record

    # Update when flash size is reached
    if documents.count >= @flush_size
      update documents
      documents.clear
    end
  end

  # Update remaining documents
  update documents unless documents.empty?
end