class Drndump::DumpClient
Constants
- DEFAULT_MESSAGES_PER_SECOND
- MIN_REPORTED_THROUGHPUT
- ONE_HOUR_IN_SECONDS
- ONE_MINUTE_IN_SECONDS
Attributes
error_message[R]
n_forecasted_messages[R]
n_received_messages[R]
on_error[W]
on_finish[W]
on_progress[W]
Public Class Methods
new(params)
click to toggle source
# File lib/drndump/dump_client.rb, line 41
# Stores the connection settings found in +params+ and resets all progress
# counters and callbacks.
#
# params is indexed with the keys :host, :port, :tag, :dataset,
# :receiver_host and :receiver_port.
def initialize(params)
  # Where to connect and what to dump.
  @host, @port, @tag = params[:host], params[:port], params[:tag]
  @dataset = params[:dataset]
  @receiver_host, @receiver_port = params[:receiver_host], params[:receiver_port]

  # Progress bookkeeping.
  @n_forecasted_messages = @n_received_messages = 0
  @n_messages_per_second = DEFAULT_MESSAGES_PER_SECOND
  @error_message = nil

  # Optional callbacks; they are assigned later through the attribute writers.
  @on_finish = @on_progress = @on_error = nil
end
Public Instance Methods
formatted_remaining_time()
click to toggle source
# File lib/drndump/dump_client.rb, line 149
# Returns the value of remaining_seconds rendered as "HH:MM:SS".
def formatted_remaining_time
  rest = remaining_seconds

  # Peel off whole hours first, then whole minutes; what is left is seconds.
  whole_hours = (rest / ONE_HOUR_IN_SECONDS).floor
  rest -= whole_hours * ONE_HOUR_IN_SECONDS

  whole_minutes = (rest / ONE_MINUTE_IN_SECONDS).floor
  rest -= whole_minutes * ONE_MINUTE_IN_SECONDS

  format("%02i:%02i:%02i", whole_hours, whole_minutes, rest)
end
n_remaining_messages()
click to toggle source
# File lib/drndump/dump_client.rb, line 137
# Returns how many forecasted messages have not been received yet.
# The result never goes below zero, even when more messages were received
# than forecasted.
def n_remaining_messages
  outstanding = @n_forecasted_messages - @n_received_messages
  outstanding > 0 ? outstanding : 0
end
progress_percentage()
click to toggle source
# File lib/drndump/dump_client.rb, line 158
# Returns the dump progress as a whole percentage in the range 0..100.
#
# Returns 0 while nothing has been forecasted yet, and caps the result at
# 100 when more messages were received than forecasted.
def progress_percentage
  return 0 if @n_forecasted_messages.zero?

  # Multiply before dividing so the calculation stays exact for integer
  # counters. The previous float round trip (received.to_f / forecast * 100)
  # truncated e.g. 29/100 to 28 because 0.29 * 100 == 28.999999999999996.
  percentage = (@n_received_messages * 100 / @n_forecasted_messages).to_i
  [percentage, 100].min
end
recent_throughput()
click to toggle source
# File lib/drndump/dump_client.rb, line 115
# Returns a messages-per-second estimate, never lower than
# MIN_REPORTED_THROUGHPUT.
#
# A new sample is taken at most once per second; calls arriving sooner reuse
# the values stored by the previous sample.
#
# NOTE(review): the stored "previous" count is the last computed difference
# rather than the cumulative received count, and the divisor is always the
# time since @measure_start_time. This looks unintentional, but the
# behavior is kept exactly as it was - confirm the intended formula.
def recent_throughput
  current_time = Time.now
  difference = @n_received_messages - @previous_n_received_messages

  if current_time - @previous_measure_time < 1
    # Too soon since the last sample: fall back to the stored values.
    sample_time = @previous_measure_time
    sample_count = @previous_n_received_messages
  else
    sample_time = current_time
    sample_count = difference
    @previous_measure_time = sample_time
    @previous_n_received_messages = sample_count.to_f
  end

  throughput =
    if sample_time == @measure_start_time
      # No time has elapsed yet; avoid dividing by zero.
      0
    else
      sample_count / (sample_time - @measure_start_time)
    end

  [throughput, MIN_REPORTED_THROUGHPUT].max
end
remaining_seconds()
click to toggle source
# File lib/drndump/dump_client.rb, line 141
# Returns the estimated number of seconds (a Float) needed to receive the
# remaining messages, using the lower of the measured throughput and the
# configured messages-per-second rate.
def remaining_seconds
  measured = recent_throughput
  configured = @n_messages_per_second
  effective = configured < measured ? configured : measured
  n_remaining_messages.to_f / effective
end
run(options={}) { |restore_message| ... }
click to toggle source
# File lib/drndump/dump_client.rb, line 61
# Creates a Droonga::Client, subscribes to a "dump" request for @dataset and
# yields every message produced by handle_dump_message to the given block.
#
# options:
#   :backend, :loop       - passed through to Droonga::Client.new.
#   :messages_per_second  - value sent as "messagesPerSecond"; defaults to
#                           DEFAULT_MESSAGES_PER_SECOND and is raised to 1
#                           when smaller.
#
# Errors are reported through on_error and recorded in @error_message; the
# client is closed when one occurs.
#
# Returns the value of @error_message at the time subscribe returns.
def run(options={}, &block)
  extra_client_options = {
    :backend => options[:backend],
    :loop    => options[:loop],
  }
  @client = Droonga::Client.new(client_options.merge(extra_client_options))
  @client.on_error = lambda do |error|
    on_error(ClientError.new(error))
  end

  @n_dumpers = 0

  @n_messages_per_second = options[:messages_per_second] || DEFAULT_MESSAGES_PER_SECOND
  @n_messages_per_second = [@n_messages_per_second, 1].max

  # Reset the state used by recent_throughput.
  @measure_start_time = Time.now
  @previous_measure_time = @measure_start_time
  @previous_n_received_messages = 0.0

  dump_message = {
    "type"    => "dump",
    "dataset" => @dataset,
    "body"    => {
      "messagesPerSecond" => @n_messages_per_second,
    },
  }
  @client.subscribe(dump_message) do |message|
    begin
      on_progress(message)
      case message
      when Droonga::Client::Error
        # This used to call the undefined local `client`, which raised a
        # NameError that the rescue below reported instead of the real error.
        @client.close
        on_error(message)
        @error_message = message.to_s
      when Hash
        handle_dump_message(message, &block)
      when NilClass
        raise NilMessage.new("nil message in dump")
      else
        raise InvalidMessage.new("invalid message in dump",
                                 :message => message.inspect)
      end
    # NOTE(review): rescuing Exception also traps SignalException and
    # SystemExit. Kept as-is because the ancestry of NilMessage and
    # InvalidMessage is not visible here - confirm and narrow to
    # StandardError if possible.
    rescue Exception => exception
      @client.close
      on_error(exception)
      @error_message = exception.to_s
    end
  end

  @error_message
end
Private Instance Methods
client_options()
click to toggle source
# File lib/drndump/dump_client.rb, line 165
# Builds the base option hash handed to Droonga::Client.new from the
# connection settings stored on this instance. The protocol is always
# :droonga.
def client_options
  {
    host: @host,
    port: @port,
    tag: @tag,
    protocol: :droonga,
    receiver_host: @receiver_host,
    receiver_port: @receiver_port,
  }
end
convert_to_column_create_message(message)
click to toggle source
# File lib/drndump/dump_client.rb, line 245
# Converts a "dump.column" message into a "column_create" message.
#
# The body's "type" ("Scalar", "Vector" or "Index") together with its
# "vectorOptions" / "indexOptions" is translated into a "|"-joined "flags"
# string. For an "Index" column with non-empty "sources", a comma-joined
# "source" entry is added as well.
def convert_to_column_create_message(message)
  dumped = message["body"]
  column_kind = dumped["type"]
  index_options = nil

  flags =
    case column_kind
    when "Scalar"
      ["COLUMN_SCALAR"]
    when "Vector"
      vector_options = dumped["vectorOptions"] || {}
      vector_options["weight"] ? ["COLUMN_VECTOR", "WITH_WEIGHT"] : ["COLUMN_VECTOR"]
    when "Index"
      index_options = dumped["indexOptions"] || {}
      optional_flags = [
        ["section",  "WITH_SECTION"],
        ["weight",   "WITH_WEIGHT"],
        ["position", "WITH_POSITION"],
      ]
      enabled = optional_flags.select { |option, _flag| index_options[option] }
      ["COLUMN_INDEX"] + enabled.map { |_option, flag| flag }
    else
      []
    end

  create_body = {
    "table" => dumped["table"],
    "name"  => dumped["name"],
    "type"  => dumped["valueType"],
    "flags" => flags.join("|"),
  }

  # index_options is only set for "Index" columns.
  if index_options
    sources = index_options["sources"] || []
    create_body["source"] = sources.join(",") unless sources.empty?
  end

  {
    "type"    => "column_create",
    "dataset" => message["dataset"],
    "body"    => create_body,
  }
end
convert_to_table_create_message(message)
click to toggle source
# File lib/drndump/dump_client.rb, line 212
# Converts a "dump.table" message into a "table_create" message.
#
# The body's "type" selects the table key flag; an unrecognized type yields
# an empty "flags" string. "default_tokenizer" and "normalizer" are only
# included when the dumped body provides "tokenizer" / "normalizer".
def convert_to_table_create_message(message)
  dumped = message["body"]

  key_flag_for = {
    "Array"           => "TABLE_NO_KEY",
    "Hash"            => "TABLE_HASH_KEY",
    "PatriciaTrie"    => "TABLE_PAT_KEY",
    "DoubleArrayTrie" => "TABLE_DAT_KEY",
  }
  flags = [key_flag_for[dumped["type"]]].compact

  create_body = {
    "name"     => dumped["name"],
    "flags"    => flags.join("|"),
    "key_type" => dumped["keyType"],
  }

  tokenizer = dumped["tokenizer"]
  create_body["default_tokenizer"] = tokenizer if tokenizer

  normalizer = dumped["normalizer"]
  create_body["normalizer"] = normalizer if normalizer

  {
    "type"    => "table_create",
    "dataset" => message["dataset"],
    "body"    => create_body,
  }
end
handle_dump_message(message) { |table_create_message| ... }
click to toggle source
# File lib/drndump/dump_client.rb, line 176
# Dispatches one message received for the dump request, based on its "type".
#
# - "dump.table" / "dump.column" / "dump.record": counts the message as
#   received, converts it to a "table_create" / "column_create" / "add"
#   message and yields the converted message.
# - "dump.start" / "dump.end": tracks the number of running dumpers; when it
#   drops to zero or below, the client is closed and on_finish is called.
# - "dump.forecast": adds the body's "nMessages" to the forecasted total.
# - "dump.result" / "dump.error": a "statusCode" other than 200 closes the
#   client, reports the body through on_error and records @error_message.
#
# Any other type is ignored.
def handle_dump_message(message, &block)
  message_type = message["type"]

  case message_type
  when "dump.table", "dump.column", "dump.record"
    @n_received_messages += 1
    restore_message =
      case message_type
      when "dump.table"
        convert_to_table_create_message(message)
      when "dump.column"
        convert_to_column_create_message(message)
      else
        # A record is re-sent as an "add" of the same message, minus the
        # reply marker.
        message.dup.tap do |add_message|
          add_message.delete("inReplyTo")
          add_message["type"] = "add"
        end
      end
    yield(restore_message)
  when "dump.start"
    @n_dumpers += 1
  when "dump.end"
    @n_dumpers -= 1
    unless @n_dumpers > 0
      @client.close
      on_finish
    end
  when "dump.forecast"
    forecast = message["body"]
    @n_forecasted_messages += forecast["nMessages"]
  when "dump.result", "dump.error"
    unless message["statusCode"] == 200
      @client.close
      failure = message["body"]
      on_error(failure)
      @error_message = "#{failure['name']}: #{failure['message']}"
    end
  end
end
on_error(error)
click to toggle source
# File lib/drndump/dump_client.rb, line 294
# Invokes the registered error callback with +error+.
# Does nothing and returns nil when no callback is set.
def on_error(error)
  return unless @on_error

  @on_error.call(error)
end
on_finish()
click to toggle source
# File lib/drndump/dump_client.rb, line 286
# Invokes the registered finish callback.
# Does nothing and returns nil when no callback is set.
def on_finish
  return unless @on_finish

  @on_finish.call
end
on_progress(message)
click to toggle source
# File lib/drndump/dump_client.rb, line 290
# Invokes the registered progress callback with +message+.
# Does nothing and returns nil when no callback is set.
def on_progress(message)
  return unless @on_progress

  @on_progress.call(message)
end