class Fluent::SQLInput::TableElement
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_sql.rb, line 70 def configure(conf) super end
emit_next_records(last_record, limit)
click to toggle source
emits next records and returns the last record of emitted records
# File lib/fluent/plugin/in_sql.rb, line 117 def emit_next_records(last_record, limit) relation = @model if last_record && last_update_value = last_record[@update_column] relation = relation.where("#{@update_column} > ?", last_update_value) end relation = relation.order("#{@update_column} ASC") relation = relation.limit(limit) if limit > 0 now = Engine.now me = MultiEventStream.new relation.each do |obj| record = obj.serializable_hash rescue nil if record if @time_column && tv = obj.read_attribute(@time_column) if tv.is_a?(Time) time = tv.to_i else time = Time.parse(tv.to_s).to_i rescue now end else time = now end me.add(time, record) last_record = record end end last_record = last_record.dup if last_record # some plugin rewrites record :( @router.emit_stream(@tag, me) return last_record end
init(tag_prefix, base_model, router)
click to toggle source
# File lib/fluent/plugin/in_sql.rb, line 74 def init(tag_prefix, base_model, router) @router = router @tag = "#{tag_prefix}.#{@tag}" if tag_prefix # creates a model for this table table_name = @table primary_key = @primary_key @model = Class.new(base_model) do self.table_name = table_name self.inheritance_column = '_never_use_' self.primary_key = primary_key if primary_key #self.include_root_in_json = false def read_attribute_for_serialization(n) v = send(n) if v.respond_to?(:to_msgpack) v else v.to_s end end end # ActiveRecord requires model class to have a name. class_name = table_name.singularize.camelize base_model.const_set(class_name, @model) # Sets model_name otherwise ActiveRecord causes errors model_name = ActiveModel::Name.new(@model, nil, class_name) @model.define_singleton_method(:model_name) { model_name } # if update_column is not set, here uses primary key unless @update_column pk = @model.columns_hash[@model.primary_key] unless pk raise "Composite primary key is not supported. Set update_column parameter to <table> section." end @update_column = pk.name end end
read_attribute_for_serialization(n)
click to toggle source
self.include_root_in_json = false
# File lib/fluent/plugin/in_sql.rb, line 88 def read_attribute_for_serialization(n) v = send(n) if v.respond_to?(:to_msgpack) v else v.to_s end end