class Fluent::Plugin::SQLInput::TableElement
Attributes
log[R]
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_sql.rb, line 65 def configure(conf) super end
emit_next_records(last_record, limit)
click to toggle source
emits next records and returns the last record of emitted records
# File lib/fluent/plugin/in_sql.rb, line 129 def emit_next_records(last_record, limit) relation = @model if last_record && last_update_value = last_record[@update_column] relation = relation.where("#{@update_column} > ?", last_update_value) end relation = relation.order("#{@update_column} ASC") relation = relation.limit(limit) if limit > 0 now = Fluent::Engine.now me = Fluent::MultiEventStream.new relation.each do |obj| record = obj.serializable_hash rescue nil if record time = if @time_column && (tv = obj.read_attribute(@time_column)) normalized_time(tv, now) else now end me.add(time, record) last_record = record end end last_record = last_record.dup if last_record # some plugin rewrites record :( @router.emit_stream(@tag, me) return last_record end
init(tag_prefix, base_model, router, log)
click to toggle source
# File lib/fluent/plugin/in_sql.rb, line 69 def init(tag_prefix, base_model, router, log) @router = router @tag = "#{tag_prefix}.#{@tag}" if tag_prefix @log = log # creates a model for this table table_name = @table primary_key = @primary_key time_format = @time_format @model = Class.new(base_model) do self.table_name = table_name self.inheritance_column = '_never_use_' self.primary_key = primary_key if primary_key self.const_set(:TIME_FORMAT, time_format) #self.include_root_in_json = false def read_attribute_for_serialization(n) v = send(n) if v.respond_to?(:to_msgpack) v elsif v.is_a? Time v.strftime(self.class::TIME_FORMAT) else v.to_s end end end # ActiveRecord requires model class to have a name. class_name = table_name.gsub(/\./, "_").singularize.camelize base_model.const_set(class_name, @model) # Sets model_name otherwise ActiveRecord causes errors model_name = ActiveModel::Name.new(@model, nil, class_name) @model.define_singleton_method(:model_name) { model_name } # if update_column is not set, here uses primary key unless @update_column pk = @model.columns_hash[@model.primary_key] unless pk raise "Composite primary key is not supported. Set update_column parameter to <table> section." end @update_column = pk.name end end
normalized_time(tv, now)
click to toggle source
Make sure we always have a Fluent::EventTime object regardless of what comes in
# File lib/fluent/plugin/in_sql.rb, line 118 def normalized_time(tv, now) return Fluent::EventTime.from_time(tv) if tv.is_a?(Time) begin Fluent::EventTime.parse(tv.to_s) rescue log.warn "Message contains invalid timestamp, using current time instead (#{now.inspect})" now end end
read_attribute_for_serialization(n)
click to toggle source
self.include_root_in_json = false
# File lib/fluent/plugin/in_sql.rb, line 87 def read_attribute_for_serialization(n) v = send(n) if v.respond_to?(:to_msgpack) v elsif v.is_a? Time v.strftime(self.class::TIME_FORMAT) else v.to_s end end