class Drndump::DumpClient

Constants

DEFAULT_MESSAGES_PER_SECOND
MIN_REPORTED_THROUGHPUT
ONE_HOUR_IN_SECONDS
ONE_MINUTE_IN_SECONDS

Attributes

error_message[R]
n_forecasted_messages[R]
n_received_messages[R]
on_error[W]
on_finish[W]
on_progress[W]

Public Class Methods

new(params) click to toggle source
# File lib/drndump/dump_client.rb, line 41
# Builds a dump client from a parameter hash.
#
# Reads +:host+, +:port+, +:tag+, +:dataset+, +:receiver_host+ and
# +:receiver_port+ from +params+ and stores them as given. Nothing is
# connected here; the stored values are only used later (client_options
# forwards the host/port/tag/receiver values, and run sends @dataset).
#
# Message counters start at zero, the rate starts at
# DEFAULT_MESSAGES_PER_SECOND, and no callbacks are registered.
def initialize(params)
  # Target of the dump request.
  @host, @port = params[:host], params[:port]
  @tag, @dataset = params[:tag], params[:dataset]

  # Receiver endpoint handed to the underlying client.
  @receiver_host, @receiver_port = params[:receiver_host], params[:receiver_port]

  # Progress bookkeeping.
  @n_forecasted_messages, @n_received_messages = 0, 0
  @n_messages_per_second = DEFAULT_MESSAGES_PER_SECOND

  # Last recorded failure, if any.
  @error_message = nil

  # Callbacks stay unset until assigned through the attribute writers.
  @on_finish, @on_progress, @on_error = nil, nil, nil
end

Public Instance Methods

formatted_remaining_time() click to toggle source
# File lib/drndump/dump_client.rb, line 149
# Formats remaining_seconds as "HH:MM:SS".
#
# Each field is zero-padded to at least two digits; the hours field simply
# grows wider when the estimate exceeds 99 hours. Fractional seconds are
# dropped by the integer format directive.
def formatted_remaining_time
  remainder = remaining_seconds

  # Peel off whole hours, then whole minutes; what is left is seconds.
  whole_hours = (remainder / ONE_HOUR_IN_SECONDS).floor
  remainder -= whole_hours * ONE_HOUR_IN_SECONDS

  whole_minutes = (remainder / ONE_MINUTE_IN_SECONDS).floor
  remainder -= whole_minutes * ONE_MINUTE_IN_SECONDS

  format("%02i:%02i:%02i", whole_hours, whole_minutes, remainder)
end
n_remaining_messages() click to toggle source
# File lib/drndump/dump_client.rb, line 137
# Number of forecasted messages not yet received.
#
# Never negative: when more messages have arrived than were forecast,
# the result is 0.
def n_remaining_messages
  outstanding = @n_forecasted_messages - @n_received_messages
  outstanding < 0 ? 0 : outstanding
end
progress_percentage() click to toggle source
# File lib/drndump/dump_client.rb, line 158
# Progress as a whole-number percentage of received versus forecasted
# messages.
#
# Returns 0 while nothing has been forecast, and never more than 100 even
# if more messages arrive than were forecast. The fractional part is
# truncated.
def progress_percentage
  forecasted = @n_forecasted_messages
  return 0 if forecasted.zero?

  ratio = @n_received_messages.to_f / forecasted
  percentage = (ratio * 100).to_i
  percentage > 100 ? 100 : percentage
end
recent_throughput() click to toggle source
# File lib/drndump/dump_client.rb, line 115
# Estimates message throughput, never reporting less than
# MIN_REPORTED_THROUGHPUT.
#
# Side effect: when at least one second has passed since
# @previous_measure_time, both @previous_measure_time and
# @previous_n_received_messages are overwritten.
#
# Depends on @measure_start_time, @previous_measure_time and
# @previous_n_received_messages; in the code visible here those are
# assigned in run, not in initialize.
#
# NOTE(review): @previous_n_received_messages is overwritten with the
# computed difference rather than with the cumulative
# @n_received_messages, and the divisor is the time since
# @measure_start_time rather than since the previous sample. This looks
# inconsistent with a "recent" rate -- confirm the intended semantics
# before relying on the value.
def recent_throughput
  now = Time.now
  # Received count minus the value stored at the previous sample.
  n_messages = @n_received_messages - @previous_n_received_messages

  if now - @previous_measure_time < 1
    # Less than one second since the previous sample: fall back to the
    # stored time and stored value instead of taking a new sample.
    now = @previous_measure_time
    n_messages = @previous_n_received_messages
  else
    # Take a new sample; the stored value becomes a Float so the division
    # below is floating-point on later calls that reuse it.
    @previous_measure_time = now
    @previous_n_received_messages = n_messages.to_f
  end

  if now == @measure_start_time
    # No time has elapsed yet; avoid dividing by zero.
    actual_throughput = 0
  else
    elapsed_seconds = now - @measure_start_time
    actual_throughput = n_messages / elapsed_seconds
  end

  # Apply the lower bound so callers never see a smaller rate.
  [actual_throughput, MIN_REPORTED_THROUGHPUT].max
end
remaining_seconds() click to toggle source
# File lib/drndump/dump_client.rb, line 141
# Estimated seconds until the remaining messages have been received.
#
# Uses the smaller of recent_throughput and @n_messages_per_second as the
# rate, and divides n_remaining_messages by it as a Float.
def remaining_seconds
  measured_rate = recent_throughput
  rate_limit = @n_messages_per_second
  # Keep the measured rate on a tie, mirroring [measured, limit].min.
  effective_rate = rate_limit < measured_rate ? rate_limit : measured_rate

  n_remaining_messages.to_f / effective_rate
end
run(options={}) { |restore_message| ... } click to toggle source
# File lib/drndump/dump_client.rb, line 61
# Requests a dump of @dataset and yields restore messages to the block.
#
# A Droonga::Client is created from client_options merged with the
# +:backend+ and +:loop+ entries of +options+. The client is subscribed
# with a "dump" request, and every message it delivers is first passed to
# the on_progress callback and then dispatched by class:
#
# * Droonga::Client::Error -- the client is closed, the error is reported
#   through on_error and recorded in @error_message.
# * Hash -- handed to handle_dump_message; each restore message it
#   produces is yielded to the block given to this method.
# * nil / anything else -- raises NilMessage / InvalidMessage, which the
#   rescue below reports like any other failure.
#
# options::
#   +:backend+, +:loop+ are forwarded to Droonga::Client.new.
#   +:messages_per_second+ is sent as "messagesPerSecond"; defaults to
#   DEFAULT_MESSAGES_PER_SECOND and is raised to 1 if smaller.
#
# Returns the value of @error_message at the time subscribe returns
# (nil when no error has been recorded by then).
def run(options={}, &block)
  extra_client_options = {
    :backend => options[:backend],
    :loop    => options[:loop],
  }
  @client = Droonga::Client.new(client_options.merge(extra_client_options))
  @client.on_error = lambda do |error|
    on_error(ClientError.new(error))
  end

  # Incremented on "dump.start" and decremented on "dump.end" by
  # handle_dump_message.
  @n_dumpers = 0

  @n_messages_per_second = options[:messages_per_second] || DEFAULT_MESSAGES_PER_SECOND
  @n_messages_per_second = [@n_messages_per_second, 1].max

  # Measurement state read by recent_throughput.
  @measure_start_time = Time.now
  @previous_measure_time = @measure_start_time
  @previous_n_received_messages = 0.0

  dump_message = {
    "type"    => "dump",
    "dataset" => @dataset,
    "body"    => {
      "messagesPerSecond" => @n_messages_per_second,
    },
  }
  @client.subscribe(dump_message) do |message|
    begin
      on_progress(message)
      case message
      when Droonga::Client::Error
        # Must go through @client: there is no `client` method or local,
        # so a bare `client.close` raised NameError and the rescue below
        # reported that instead of the real client error.
        @client.close
        on_error(message)
        @error_message = message.to_s
      when Hash
        handle_dump_message(message) do |restore_message|
          yield(restore_message)
        end
      when NilClass
        raise NilMessage.new("nil message in dump")
      else
        raise InvalidMessage.new("invalid message in dump",
                                 :message => message.inspect)
      end
    # NOTE(review): rescuing Exception also traps interrupts and exit
    # requests raised while a message is being handled. Kept as-is because
    # narrowing it would change which failures reach on_error.
    rescue Exception => exception
      @client.close
      on_error(exception)
      @error_message = exception.to_s
    end
  end

  @error_message
end

Private Instance Methods

client_options() click to toggle source
# File lib/drndump/dump_client.rb, line 165
# Options passed to Droonga::Client.new by run.
#
# Combines the connection and receiver settings stored at construction
# time with the fixed :droonga protocol.
def client_options
  {
    host: @host,
    port: @port,
    tag: @tag,
    protocol: :droonga,
    receiver_host: @receiver_host,
    receiver_port: @receiver_port,
  }
end
convert_to_column_create_message(message) click to toggle source
# File lib/drndump/dump_client.rb, line 245
# Converts a "dump.column" message into a "column_create" message.
#
# The result keeps the source "dataset" and carries a body with "table",
# "name", "type" (taken from the source "valueType") and "flags".
#
# "flags" is a "|"-joined list chosen by the source body's "type":
# * "Scalar" -> COLUMN_SCALAR
# * "Vector" -> COLUMN_VECTOR, plus WITH_WEIGHT when
#   vectorOptions["weight"] is truthy
# * "Index"  -> COLUMN_INDEX, plus WITH_SECTION / WITH_WEIGHT /
#   WITH_POSITION for the matching truthy indexOptions entries
# * anything else -> empty string
#
# For "Index" columns with a non-empty indexOptions["sources"], a
# comma-joined "source" entry is added as well.
def convert_to_column_create_message(message)
  dumped = message["body"]
  column_type = dumped["type"]

  flag_names =
    case column_type
    when "Scalar"
      ["COLUMN_SCALAR"]
    when "Vector"
      vector_options = dumped["vectorOptions"] || {}
      names = ["COLUMN_VECTOR"]
      names << "WITH_WEIGHT" if vector_options["weight"]
      names
    when "Index"
      index_options = dumped["indexOptions"] || {}
      names = ["COLUMN_INDEX"]
      names << "WITH_SECTION"  if index_options["section"]
      names << "WITH_WEIGHT"   if index_options["weight"]
      names << "WITH_POSITION" if index_options["position"]
      names
    else
      []
    end

  create_body = {
    "table" => dumped["table"],
    "name"  => dumped["name"],
    "type"  => dumped["valueType"],
    "flags" => flag_names.join("|"),
  }

  # Only index columns carry source columns.
  if column_type == "Index"
    source_columns = (dumped["indexOptions"] || {})["sources"] || []
    create_body["source"] = source_columns.join(",") unless source_columns.empty?
  end

  {
    "type"    => "column_create",
    "dataset" => message["dataset"],
    "body"    => create_body,
  }
end
convert_to_table_create_message(message) click to toggle source
# File lib/drndump/dump_client.rb, line 212
# Converts a "dump.table" message into a "table_create" message.
#
# The result keeps the source "dataset" and carries a body with "name",
# "flags" and "key_type" (from the source "keyType"). "flags" is derived
# from the source body's "type" and is an empty string for any type not
# listed below. "default_tokenizer" and "normalizer" are added only when
# the source body has a truthy "tokenizer" / "normalizer".
def convert_to_table_create_message(message)
  dumped = message["body"]

  key_flag_by_table_type = {
    "Array"           => "TABLE_NO_KEY",
    "Hash"            => "TABLE_HASH_KEY",
    "PatriciaTrie"    => "TABLE_PAT_KEY",
    "DoubleArrayTrie" => "TABLE_DAT_KEY",
  }

  create_body = {
    "name"     => dumped["name"],
    "flags"    => key_flag_by_table_type[dumped["type"]] || "",
    "key_type" => dumped["keyType"],
  }

  tokenizer = dumped["tokenizer"]
  create_body["default_tokenizer"] = tokenizer if tokenizer

  normalizer = dumped["normalizer"]
  create_body["normalizer"] = normalizer if normalizer

  {
    "type"    => "table_create",
    "dataset" => message["dataset"],
    "body"    => create_body,
  }
end
handle_dump_message(message) { |table_create_message| ... } click to toggle source
# File lib/drndump/dump_client.rb, line 176
# Dispatches one Hash message from the dump subscription by its "type".
#
# * "dump.forecast" adds body["nMessages"] to @n_forecasted_messages.
# * "dump.start" / "dump.end" maintain @n_dumpers; when it drops to zero
#   or below, the client is closed and on_finish is invoked.
# * "dump.table", "dump.column" and "dump.record" each count one received
#   message and yield the corresponding restore message (table_create,
#   column_create, or an "add" copy of the record without "inReplyTo").
# * "dump.result" / "dump.error" with a "statusCode" other than 200 close
#   the client, report the body through on_error and record
#   "<name>: <message>" in @error_message.
#
# Any other type is ignored.
def handle_dump_message(message, &block)
  case message["type"]
  when "dump.forecast"
    @n_forecasted_messages += message["body"]["nMessages"]
  when "dump.start"
    @n_dumpers += 1
  when "dump.end"
    @n_dumpers -= 1
    if @n_dumpers <= 0
      @client.close
      on_finish
    end
  when "dump.table"
    @n_received_messages += 1
    yield(convert_to_table_create_message(message))
  when "dump.column"
    @n_received_messages += 1
    yield(convert_to_column_create_message(message))
  when "dump.record"
    @n_received_messages += 1
    # Work on a copy so the incoming message is left untouched.
    restore_message = message.dup
    restore_message["type"] = "add"
    restore_message.delete("inReplyTo")
    yield(restore_message)
  when "dump.result", "dump.error"
    unless message["statusCode"] == 200
      failure = message["body"]
      @client.close
      on_error(failure)
      @error_message = "#{failure['name']}: #{failure['message']}"
    end
  end
end
on_error(error) click to toggle source
# File lib/drndump/dump_client.rb, line 294
# Passes +error+ to the registered on_error callback.
#
# Does nothing and returns nil when no callback has been assigned;
# otherwise returns whatever the callback returns.
def on_error(error)
  return unless @on_error

  @on_error.call(error)
end
on_finish() click to toggle source
# File lib/drndump/dump_client.rb, line 286
# Invokes the registered on_finish callback with no arguments.
#
# Does nothing and returns nil when no callback has been assigned;
# otherwise returns whatever the callback returns.
def on_finish
  return unless @on_finish

  @on_finish.call
end
on_progress(message) click to toggle source
# File lib/drndump/dump_client.rb, line 290
# Passes +message+ to the registered on_progress callback.
#
# Does nothing and returns nil when no callback has been assigned;
# otherwise returns whatever the callback returns.
def on_progress(message)
  return unless @on_progress

  @on_progress.call(message)
end