class Fluent::Plugin::MysqlReplicatorElasticsearchOutput

Constants

DEFAULT_BUFFER_TYPE
DEFAULT_TAG_FORMAT

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_mysql_replicator_elasticsearch.rb, line 25
def initialize
  super
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_mysql_replicator_elasticsearch.rb, line 29
def configure(conf)
  super

  if @tag_format.nil? || @tag_format == DEFAULT_TAG_FORMAT
    @tag_format = DEFAULT_TAG_FORMAT
  else
    @tag_format = Regexp.new(conf['tag_format'])
  end
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_mysql_replicator_elasticsearch.rb, line 43
def format(tag, time, record)
  [tag, time, record].to_msgpack
end
formatted_to_msgpack_binary?() click to toggle source
# File lib/fluent/plugin/out_mysql_replicator_elasticsearch.rb, line 55
def formatted_to_msgpack_binary?
  true
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/out_mysql_replicator_elasticsearch.rb, line 51
def multi_workers_ready?
  true
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_mysql_replicator_elasticsearch.rb, line 47
def shutdown
  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_mysql_replicator_elasticsearch.rb, line 39
def start
  super
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_mysql_replicator_elasticsearch.rb, line 59
def write(chunk)
  bulk_message = []

  chunk.msgpack_each do |tag, time, record|
    tag_parts = tag.match(@tag_format)
    target_index = tag_parts['index_name']
    target_type = tag_parts['type_name']
    id_key = tag_parts['primary_key']

    if tag_parts['event'] == 'delete'
      meta = { "delete" => {"_index" => target_index, "_type" => target_type, "_id" => record[id_key]} }
      bulk_message << Yajl::Encoder.encode(meta)
    else
      meta = { "index" => {"_index" => target_index, "_type" => target_type} }
      if id_key && record[id_key]
        meta['index']['_id'] = record[id_key]
      end
      bulk_message << Yajl::Encoder.encode(meta)
      bulk_message << Yajl::Encoder.encode(record)
    end
  end
  bulk_message << ""

  http = Net::HTTP.new(@host, @port.to_i)
  http.use_ssl = @ssl

  request = Net::HTTP::Post.new('/_bulk', {'content-type' => 'application/json; charset=utf-8'})
  if @username && @password
    request.basic_auth(@username, @password)
  end

  request.body = bulk_message.join("\n")
  http.request(request).value
end