class Fluent::Plugin::SQLInput::TableElement

Attributes

log[R]

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_sql.rb, line 65
def configure(conf)
  super
end
emit_next_records(last_record, limit) click to toggle source

emits next records and returns the last record of emitted records

# File lib/fluent/plugin/in_sql.rb, line 129
def emit_next_records(last_record, limit)
  relation = @model
  if last_record && last_update_value = last_record[@update_column]
    relation = relation.where("#{@update_column} > ?", last_update_value)
  end
  relation = relation.order("#{@update_column} ASC")
  relation = relation.limit(limit) if limit > 0

  now = Fluent::Engine.now

  me = Fluent::MultiEventStream.new
  relation.each do |obj|
    record = obj.serializable_hash rescue nil
    if record
      time =
        if @time_column && (tv = obj.read_attribute(@time_column))
          normalized_time(tv, now)
        else
          now
        end

      me.add(time, record)
      last_record = record
    end
  end

  last_record = last_record.dup if last_record  # some plugin rewrites record :(
  @router.emit_stream(@tag, me)

  return last_record
end
init(tag_prefix, base_model, router, log) click to toggle source
# File lib/fluent/plugin/in_sql.rb, line 69
def init(tag_prefix, base_model, router, log)
  @router = router
  @tag = "#{tag_prefix}.#{@tag}" if tag_prefix
  @log = log

  # creates a model for this table
  table_name = @table
  primary_key = @primary_key
  time_format = @time_format

  @model = Class.new(base_model) do
    self.table_name = table_name
    self.inheritance_column = '_never_use_'
    self.primary_key = primary_key if primary_key
    self.const_set(:TIME_FORMAT, time_format)

    #self.include_root_in_json = false

    def read_attribute_for_serialization(n)
      v = send(n)
      if v.respond_to?(:to_msgpack)
        v
      elsif v.is_a? Time
        v.strftime(self.class::TIME_FORMAT)
      else
        v.to_s
      end
    end
  end

  # ActiveRecord requires model class to have a name.
  class_name = table_name.gsub(/\./, "_").singularize.camelize
  base_model.const_set(class_name, @model)

  # Sets model_name otherwise ActiveRecord causes errors
  model_name = ActiveModel::Name.new(@model, nil, class_name)
  @model.define_singleton_method(:model_name) { model_name }

  # if update_column is not set, here uses primary key
  unless @update_column
    pk = @model.columns_hash[@model.primary_key]
    unless pk
      raise "Composite primary key is not supported. Set update_column parameter to <table> section."
    end
    @update_column = pk.name
  end
end
normalized_time(tv, now) click to toggle source

Make sure we always have a Fluent::EventTime object regardless of what comes in

# File lib/fluent/plugin/in_sql.rb, line 118
def normalized_time(tv, now)
  return Fluent::EventTime.from_time(tv) if tv.is_a?(Time)
  begin
    Fluent::EventTime.parse(tv.to_s)
  rescue
    log.warn "Message contains invalid timestamp, using current time instead (#{now.inspect})"
    now
  end
end
read_attribute_for_serialization(n) click to toggle source

self.include_root_in_json = false

# File lib/fluent/plugin/in_sql.rb, line 87
def read_attribute_for_serialization(n)
  v = send(n)
  if v.respond_to?(:to_msgpack)
    v
  elsif v.is_a? Time
    v.strftime(self.class::TIME_FORMAT)
  else
    v.to_s
  end
end