class Fluent::SQLInput::TableElement

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_sql.rb, line 70
def configure(conf)
  super
end
emit_next_records(last_record, limit) click to toggle source

emits next records and returns the last record of emitted records

# File lib/fluent/plugin/in_sql.rb, line 117
def emit_next_records(last_record, limit)
  relation = @model
  if last_record && last_update_value = last_record[@update_column]
    relation = relation.where("#{@update_column} > ?", last_update_value)
  end
  relation = relation.order("#{@update_column} ASC")
  relation = relation.limit(limit) if limit > 0

  now = Engine.now

  me = MultiEventStream.new
  relation.each do |obj|
    record = obj.serializable_hash rescue nil
    if record
      if @time_column && tv = obj.read_attribute(@time_column)
        if tv.is_a?(Time)
          time = tv.to_i
        else
          time = Time.parse(tv.to_s).to_i rescue now
        end
      else
        time = now
      end
      me.add(time, record)
      last_record = record
    end
  end

  last_record = last_record.dup if last_record  # some plugin rewrites record :(
  @router.emit_stream(@tag, me)

  return last_record
end
init(tag_prefix, base_model, router) click to toggle source
# File lib/fluent/plugin/in_sql.rb, line 74
def init(tag_prefix, base_model, router)
  @router = router
  @tag = "#{tag_prefix}.#{@tag}" if tag_prefix

  # creates a model for this table
  table_name = @table
  primary_key = @primary_key
  @model = Class.new(base_model) do
    self.table_name = table_name
    self.inheritance_column = '_never_use_'
    self.primary_key = primary_key if primary_key

    #self.include_root_in_json = false

    def read_attribute_for_serialization(n)
      v = send(n)
      if v.respond_to?(:to_msgpack)
        v
      else
        v.to_s
      end
    end
  end

  # ActiveRecord requires model class to have a name.
  class_name = table_name.singularize.camelize
  base_model.const_set(class_name, @model)

  # Sets model_name otherwise ActiveRecord causes errors
  model_name = ActiveModel::Name.new(@model, nil, class_name)
  @model.define_singleton_method(:model_name) { model_name }

  # if update_column is not set, here uses primary key
  unless @update_column
    pk = @model.columns_hash[@model.primary_key]
    unless pk
      raise "Composite primary key is not supported. Set update_column parameter to <table> section."
    end
    @update_column = pk.name
  end
end
read_attribute_for_serialization(n) click to toggle source

self.include_root_in_json = false

# File lib/fluent/plugin/in_sql.rb, line 88
def read_attribute_for_serialization(n)
  v = send(n)
  if v.respond_to?(:to_msgpack)
    v
  else
    v.to_s
  end
end