class Fluent::SQLInput

Constants

SKIP_TABLE_REGEXP

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_sql.rb, line 144
# Loads the plugin configuration.
#
# Delegates to the superclass first, then:
# * warns (twice, via the global logger) when @state_file is not set,
# * builds one configured TableElement for every element of +conf+ whose
#   name is 'table' and stores the list in @tables,
# * sets @all_tables to true when the 'all_tables' config key is truthy.
#
# conf:: the configuration element handed in by the framework; it must
#        respond to +elements+.
def configure(conf)
  super

  unless @state_file
    [
      "'state_file PATH' parameter is not set to a 'sql' source.",
      "this parameter is highly recommended to save the last rows to resume tailing."
    ].each { |message| $log.warn message }
  end

  table_sections = conf.elements.select { |element| element.name == 'table' }
  @tables = table_sections.map do |section|
    TableElement.new.tap { |table_element| table_element.configure(section) }
  end

  @all_tables = true if config['all_tables']
end
generate_schema(table, schema_name) click to toggle source
# File lib/fluent/plugin/in_sql.rb, line 262
# Builds an Avro record schema from the columns of a table, registers it
# with the schema registry and caches the result in Redis.
#
# table::       object whose +table+ method returns the database table name
#               (start passes a TableElement).
# schema_name:: record name of the schema; also the Redis key, and the
#               registry subject is "<schema_name>-value".
#
# Column types are chosen by substring match on the column's sql_type:
# 'bigint' => long; 'int' or 'bool' => int; 'float', 'double' or 'real' =>
# float; anything else => string. Every column becomes a ['null', type]
# union. Two extra fields, enchilada_timestamp (long) and
# enchilada_time_with_format (string), are appended to the schema but are
# not part of the stored 'field_types' map.
#
# NOTE(review): because matching is by substring, SQL types such as
# "interval" or "point" contain 'int' and therefore map to 'int' — confirm
# this is intended.
#
# Returns whatever set_schema_to_redis returns.
def generate_schema table, schema_name
  # Loaded lazily, only when a schema actually has to be generated.
  require "avro_turf"
  require 'avro_turf/messaging'
  require "avro/builder"
  messaging = AvroTurf::Messaging.new(registry_url: "http://#{KAFKA_SERVER}:8081")

  columns = @base_model.connection.columns(table.table)
  fields = columns.map do |column|
    sql_type = column.sql_type
    avro_type =
      if sql_type.include?('bigint')
        'long'
      elsif %w[int bool].any? { |needle| sql_type.include?(needle) }
        'int'
      elsif %w[float double real].any? { |needle| sql_type.include?(needle) }
        'float'
      else
        'string'
      end

    { 'name' => column.name, 'type' => ['null', avro_type] }
  end

  # Column name => non-null Avro type, captured before the extra fields
  # below are appended.
  field_types = fields.each_with_object({}) do |field, types|
    types[field['name']] = (field['type'] - ['null']).first
  end

  fields << { "name" => "enchilada_timestamp", "type" => "long" }
  fields << { "name" => "enchilada_time_with_format", "type" => "string" }

  schema_json = { type: "record", name: schema_name, fields: fields }.to_json

  # The registry client is read out of the messaging object's @registry
  # instance variable.
  registry = messaging.instance_variable_get('@registry')
  schema = Avro::Schema.parse(schema_json)
  schema_id = registry.register("#{schema_name}-value", schema)

  set_schema_to_redis(
    schema_name,
    'schema_json' => schema_json,
    'schema_id' => schema_id,
    'field_types' => field_types,
    'schema' => schema
  )
end
get_schema_from_redis_by_name(schema_name) click to toggle source
# File lib/fluent/plugin/in_sql.rb, line 308
# Looks up the cached schema stored under +schema_name+.
#
# schema_name:: Redis key to read.
#
# Returns whatever $redis.get returns for that key. set_schema_to_redis
# writes the value as a JSON string, and start only checks the result for
# truthiness to decide whether a schema must be generated.
def get_schema_from_redis_by_name schema_name
  $redis.get(schema_name)
end
init_redis() click to toggle source
# File lib/fluent/plugin/in_sql.rb, line 258
# Creates a Redis client with no explicit options and stores it in the
# global $redis, which get_schema_from_redis_by_name and
# set_schema_to_redis use for all reads and writes.
def init_redis
  $redis = Redis.new
end
set_schema_to_redis(schema_name, schema) click to toggle source
# File lib/fluent/plugin/in_sql.rb, line 304
# Caches +schema+ in Redis under the key +schema_name+.
#
# schema_name:: Redis key to write.
# schema::      object serialized with +to_json+ before being stored.
#
# Returns the result of $redis.set.
def set_schema_to_redis(schema_name, schema)
  serialized = schema.to_json
  $redis.set(schema_name, serialized)
end
shutdown() click to toggle source
# File lib/fluent/plugin/in_sql.rb, line 237
# Stops the polling thread and waits for it to finish.
#
# Setting @stop_flag makes the loop in thread_main exit at its next check.
# Joining the thread afterwards ensures a pass that is already in progress
# (emitting records and updating the state store) completes before
# shutdown returns, instead of being cut off when the process exits.
# Because thread_main sleeps @select_interval between checks, this call can
# block for up to roughly one interval plus one pass.
def shutdown
  @stop_flag = true

  # @thread is nil when start never ran (or failed before spawning it);
  # a thread must not join itself.
  @thread.join if @thread && @thread != Thread.current
end
start() click to toggle source
# File lib/fluent/plugin/in_sql.rb, line 167
# Prepares the plugin for polling and launches the background thread.
#
# Steps, in order:
# 1. choose the state store: in-memory when @state_file is nil, otherwise
#    one backed by @state_file;
# 2. create a dedicated ActiveRecord base class and connect it with this
#    plugin's adapter/host/port/database/username/password/socket settings;
# 3. when @all_tables is set, replace @tables with one TableElement per
#    database table that does not match SKIP_TABLE_REGEXP;
# 4. initialize Redis, then for every table make sure a schema is cached
#    (generating it if missing) and initialize the table element; tables
#    that raise during this step are logged and removed from @tables;
# 5. clear @stop_flag and spawn the thread that runs thread_main.
def start
  @state_store =
    if @state_file.nil?
      MemoryStateStore.new
    else
      StateStore.new(@state_file)
    end

  db_settings = {
    adapter: @adapter,
    host: @host,
    port: @port,
    database: @database,
    username: @username,
    password: @password,
    socket: @socket
  }

  # A private subclass of ActiveRecord::Base lets this plugin keep database
  # settings separate from ActiveRecord::Base itself. It is abstract because
  # it has no corresponding physical table.
  @base_model = Class.new(ActiveRecord::Base) { self.abstract_class = true }

  # ActiveRecord requires the base model to have a name. An anonymous class
  # takes the name of the first constant it is assigned to, so bind it to a
  # randomly numbered constant.
  SQLInput.const_set("BaseModel_#{rand(1 << 31)}", @base_model)

  # From here on the base model carries its own connection configuration.
  @base_model.establish_connection(db_settings)

  if @all_tables
    # Take the table list from the database itself, leaving out tables such
    # as "schema_migrations" that match SKIP_TABLE_REGEXP.
    wanted_names = @base_model.connection.tables.reject do |table_name|
      table_name.match(SKIP_TABLE_REGEXP)
    end

    @tables = wanted_names.map do |table_name|
      element = TableElement.new
      element_conf = {
        'table' => table_name,
        'tag' => table_name,
        'update_column' => nil
      }
      element.configure(element_conf)
      element
    end
  end

  init_redis

  # First six hex characters of the MD5 of the table's column-name list.
  column_fingerprint = lambda do |table_name|
    column_names = @base_model.connection.columns(table_name).map(&:name)
    Digest::MD5.new.hexdigest(column_names.to_s)[0..5]
  end

  # Drop every table whose schema lookup/generation or TableElement#init
  # raises; the block returns true for tables to remove.
  @tables.reject! do |table_element|
    begin
      schema_name = "#{SERVER_PREFIX}_#{@tag_prefix}_#{table_element.tag.presence || table_element.table}_#{column_fingerprint.call(table_element.table)}"
      generate_schema(table_element, schema_name) unless get_schema_from_redis_by_name(schema_name)

      table_element.init(@tag_prefix, @base_model, router)
      log.info "Selecting '#{table_element.table}' table"
      false
    rescue => error
      log.warn "Can't handle '#{table_element.table}' table. Ignoring.", :error => error.message, :error_class => error.class
      log.warn_backtrace error.backtrace
      true
    end
  end

  @stop_flag = false
  @thread = Thread.new { thread_main }
end
thread_main() click to toggle source
# File lib/fluent/plugin/in_sql.rb, line 241
# Polling loop executed on the background thread created by start.
#
# Until @stop_flag becomes truthy, each pass sleeps @select_interval and
# then, for every table in @tables, hands the table's last stored record
# and @select_limit to emit_next_records, stores the returned value as the
# table's new last record and calls update! on the state store. An error
# raised for one table is logged and does not stop the remaining tables or
# the loop itself.
def thread_main
  loop do
    break if @stop_flag

    sleep @select_interval

    @tables.each do |table_element|
      begin
        previous_record = @state_store.last_records[table_element.table]
        newest_record = table_element.emit_next_records(previous_record, @select_limit)
        @state_store.last_records[table_element.table] = newest_record
        @state_store.update!
      rescue => error
        log.error "unexpected error", :error => error.message, :error_class => error.class
        log.error_backtrace error.backtrace
      end
    end
  end
end