class Fluent::Plugin::DocumentdbOutput
Constants
- DEFAULT_BUFFER_TYPE
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_documentdb.rb, line 41 def configure(conf) compat_parameters_convert(conf, :buffer) super raise Fluent::ConfigError, 'no docdb_endpoint' if @docdb_endpoint.empty? raise Fluent::ConfigError, 'no docdb_account_key' if @docdb_account_key.empty? raise Fluent::ConfigError, 'no docdb_database' if @docdb_database.empty? raise Fluent::ConfigError, 'no docdb_collection' if @docdb_collection.empty? if @add_time_field and @time_field_name.empty? raise Fluent::ConfigError, 'time_field_name must be set if add_time_field is true' end if @add_tag_field and @tag_field_name.empty? raise Fluent::ConfigError, 'tag_field_name must be set if add_tag_field is true' end if @partitioned_collection raise Fluent::ConfigError, 'partition_key must be set in partitioned collection mode' if @partition_key.empty? if (@auto_create_collection && @offer_throughput < AzureDocumentDB::PARTITIONED_COLL_MIN_THROUGHPUT) raise Fluent::ConfigError, sprintf("offer_throughput must be more than and equals to %s", AzureDocumentDB::PARTITIONED_COLL_MIN_THROUGHPUT) end end raise Fluent::ConfigError, "'tag' in chunk_keys is required." if not @chunk_key_tag @timef = Fluent::TimeFormatter.new(@time_format, @localtime) end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_documentdb.rb, line 113 def format(tag, time, record) record['id'] = SecureRandom.uuid if @add_time_field record[@time_field_name] = @timef.format(time) end if @add_tag_field record[@tag_field_name] = tag end record.to_msgpack end
formatted_to_msgpack_binary?()
click to toggle source
# File lib/fluent/plugin/out_documentdb.rb, line 124 def formatted_to_msgpack_binary? true end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/out_documentdb.rb, line 128 def multi_workers_ready? true end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_documentdb.rb, line 108 def shutdown super # destroy end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_documentdb.rb, line 66 def start super begin @client = nil if @partitioned_collection @client = AzureDocumentDB::PartitionedCollectionClient.new(@docdb_account_key,@docdb_endpoint) else @client = AzureDocumentDB::Client.new(@docdb_account_key,@docdb_endpoint) end ## initial operations for database res = @client.find_databases_by_name(@docdb_database) if( res[:body]["_count"].to_i == 0 ) raise "No database (#{docdb_database}) exists! Enable auto_create_database or create it by useself" if !@auto_create_database # create new database as it doesn't exists @client.create_database(@docdb_database) end ## initial operations for collection database_resource = @client.get_database_resource(@docdb_database) res = @client.find_collections_by_name(database_resource, @docdb_collection) if( res[:body]["_count"].to_i == 0 ) raise "No collection (#{docdb_collection}) exists! Enable auto_create_collection or create it by useself" if !@auto_create_collection # create new collection as it doesn't exists if @partitioned_collection partition_key_paths = ["/#{@partition_key}"] @client.create_collection(database_resource, @docdb_collection, partition_key_paths, @offer_throughput) else @client.create_collection(database_resource, @docdb_collection) end end @coll_resource = @client.get_collection_resource(database_resource, @docdb_collection) rescue Exception =>ex log.fatal "Error: '#{ex}'" exit! end end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_documentdb.rb, line 132 def write(chunk) chunk.msgpack_each { |record| unique_doc_identifier = record["id"] begin if @partitioned_collection @client.create_document(@coll_resource, unique_doc_identifier, record, @partition_key) else @client.create_document(@coll_resource, unique_doc_identifier, record) end rescue RestClient::ExceptionWithResponse => rcex exdict = JSON.parse(rcex.response) if exdict['code'] == 'Conflict' log.fatal "Duplicate Error: document #{unique_doc_identifier} already exists, data=>" + record.to_json else log.fatal "RestClient Error: '#{rcex.response}', data=>" + record.to_json end rescue => ex log.fatal "UnknownError: '#{ex}', uniqueid=>#{unique_doc_identifier}, data=>" + record.to_json end } end